perf(compression): pool zstd src/dst buffers in stream upload #2804
ValentaTomas wants to merge 2 commits into
zstdCompressor.compress allocated make([]byte, 0, len(src)) per frame (2 MiB at default config). The buffer survived until the part upload completed and nothing recycled it, which drove ~40% of total alloc bytes and ~3% of CPU per the compression-audit pprof. Pull dst from a sync.Pool sized at the requested frame width and recycle it in the part-upload completion goroutine, after UploadPart returns. Recycling there is safe because retryablehttp replays happen inside client.Do, which returns before UploadPart does.
readLoop allocated make([]byte, frameSize) per frame, ~32% of total alloc bytes per the compression-audit pprof. Pull source buffers from a sync.Pool sized at frame width and recycle them inside addFrame's goroutine once the compressor has consumed the bytes (the deferred Put runs on both success and error paths). Couples cleanly with the B6 dst pool: both add the same coordination idiom.
PR Summary: Medium Risk. Overview: It also pools the zstd [...] Reviewed by Cursor Bugbot for commit 7923cc2.
❌ 4 tests failed (4 flaky tests).
Code Review
The putDstBuf function incorrectly stores the address of a local slice header in the sync.Pool, which leads to memory corruption because the stack pointer becomes invalid once the function returns. The pool should store the slice directly to ensure proper reuse and memory safety.
```go
// dstBufPool holds frame-sized scratch buffers reused for zstd EncodeAll dst.
// Caller recycles via putDstBuf after the part upload completes.
var dstBufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0)
		return &b
	},
}

func getDstBuf(want int) []byte {
	b := dstBufPool.Get().(*[]byte)
	if cap(*b) < want {
		*b = make([]byte, 0, want)
	}
	return (*b)[:0]
}

func putDstBuf(b []byte) {
	if cap(b) == 0 {
		return
	}
	b = b[:0]
	dstBufPool.Put(&b)
}
```
The putDstBuf function incorrectly stores the address of its local parameter b in the sync.Pool. Since b is a slice header passed by value, &b points to the stack, which becomes invalid once the function returns, leading to memory corruption when the pointer is later retrieved from the pool. Additionally, getDstBuf returns the slice by value, losing the association with the pointer retrieved from the pool and preventing proper reuse. To fix this correctly while adhering to the compressor interface, the pool should store []byte directly instead of *[]byte.
```go
// dstBufPool holds frame-sized scratch buffers reused for zstd EncodeAll dst.
// Caller recycles via putDstBuf after the part upload completes.
var dstBufPool = sync.Pool{
	New: func() any {
		return make([]byte, 0)
	},
}

func getDstBuf(want int) []byte {
	b, _ := dstBufPool.Get().([]byte)
	if cap(b) < want {
		b = make([]byte, 0, want)
	}
	return b[:0]
}

func putDstBuf(b []byte) {
	if cap(b) == 0 {
		return
	}
	dstBufPool.Put(b[:0])
}
```
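One way to compare the two pool shapes is to measure allocations per Get/Put cycle. The sketch below is an illustration with hypothetical names (`valuePool`, `pointerPool`, and the two round-trip helpers), not code from this PR. It rests on two facts about Go: a local whose address is taken escapes to the heap, so a pointer stored in a `sync.Pool` stays valid after the function returns; and converting a slice to `any` generally heap-allocates a copy of the slice header. The measured numbers depend on the Go version and GC timing, so none are asserted here.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// valuePool stores []byte values directly (hypothetical, for comparison).
var valuePool = sync.Pool{
	New: func() any { return make([]byte, 0, 1024) },
}

// pointerPool stores *[]byte (hypothetical, for comparison).
var pointerPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 1024)
		return &b
	},
}

// roundTripValue puts the slice itself back; the conversion to `any`
// on Put generally allocates a new slice header.
func roundTripValue() {
	b := valuePool.Get().([]byte)
	b = append(b[:0], 'x')
	valuePool.Put(b[:0])
}

// roundTripPointer keeps the pointer returned by Get and puts that same
// pointer back, so the cycle does not need a new header.
func roundTripPointer() {
	p := pointerPool.Get().(*[]byte)
	*p = append((*p)[:0], 'x')
	pointerPool.Put(p)
}

func main() {
	fmt.Println("value pool allocs per cycle:  ", testing.AllocsPerRun(1000, roundTripValue))
	fmt.Println("pointer pool allocs per cycle:", testing.AllocsPerRun(1000, roundTripPointer))
}
```

The pointer form only avoids the per-cycle allocation if the caller holds on to the pointer it got from `Get`. A helper that returns the slice by value and later calls `Put(&b)` on a parameter allocates a fresh header each time, much like the value form.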
Closing for now. This was originally opened to illustrate the necessary changes for reference.
Pulls the two pool fixes out of #2803 so the correctness PR can land independently.
- Pool the EncodeAll dst buffer; recycle in the part-upload completion goroutine. Audit pprof had it at ~40% of total alloc bytes and ~3% of CPU.
- addFrame now takes a *[]byte + valid length and recycles via defer once the compressor has consumed it.

Local bench (M1 Pro, BenchmarkCompress/w4_unlimited, -benchtime=2s):

Race tests on the full storage package pass.
Independent of the other compression-audit fixes; can land before or after #2803.
How much further could the pipeline copy be reduced (analysis, not in scope here)
Tracing the full upload path with these fixes applied:
uploadPartSlices already does the right thing: bytes.NewReader + io.MultiReader are pure wrappers around our dstBuf slices, MD5 is computed in place over each slice, and retryablehttp.ReaderFunc is replayed on retry without buffering. So no extra copy occurs between dstBuf and the HTTP body.

What's left, and whether it's recoverable:

1. io.ReadFull(in, *buf) in readLoop (kernel page cache → srcBuf). Recoverable via mmap of the source file and passing mmap'd slices into addFrame. Bounded gain: at a ~50 GB/s memcpy ceiling, ~80 ms on a 4 GiB memfile vs ~3 s of zstd cost at workers=4. Plus extra lifetime/munmap plumbing (mmap regions can't go in sync.Pool). Not worth it given current zstd-bound numbers.
2. zstd.EncodeAll src → dst. This is the compression itself, not a "copy". The streaming alternative zstd.Writer could pipe directly into an io.Pipe reader serving as the HTTP body, eliminating dstBuf, but: [...] frameEncodeWorkers parallelism, ~4× slower at workers=4) or stage frames somewhere, i.e. the dstBuf you just removed.
3. bytes.Reader.Read → http transport write buffer. Could be avoided with a custom RoundTripper using net.Buffers.WriteTo (writev(2)), but crypto/tls.Conn.Write reads the body in chunks regardless because each chunk is encrypted into a TLS record, so the gain over HTTPS rounds back to zero.
4. [...] copy_from_user. Owned by Go runtime / kernel.

Net: after B6 + O4, the only user-space copy that's both recoverable and architecturally simple is the kernel→srcBuf one (option 1), and it's tiny. True zero-copy would require giving up either multipart amortization or frame-level parallelism. Both make the pipeline slower, not faster.