From 080facc17f7368e84be1e2c65ff7ba915f2196e9 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Sat, 25 Apr 2026 23:31:43 +0900
Subject: [PATCH 1/3] perf(s3): align Raft entry size with MaxSizePerMsg via s3ChunkBatchOps=4
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Before this change, S3 PutObject built one Raft proposal per
s3ChunkBatchOps (16) chunks at s3ChunkSize (1 MiB), so each Raft entry
was ~16 MiB. Combined with PR #593's defaults (MaxSizePerMsg=4 MiB,
MaxInflightMsgs=1024) the result was:

- etcd/raft's util.go:limitSize sends a *single* entry that exceeds
  MaxSizePerMsg as a solo MsgApp (this is documented behaviour: the
  "as an exception, if the size of the first entry exceeds maxSize, a
  non-empty slice with just this entry is returned" branch).
- One 16 MiB entry per MsgApp means PR #593's advertised per-peer
  memory bound — `MaxInflight × MaxSizePerMsg = 1024 × 4 MiB = 4 GiB` —
  silently grew to `1024 × 16 MiB = 16 GiB / peer`, with the three-node
  leader-side worst case at 32 GiB.
- Each WAL fsync had to flush 16 MiB of writes per entry, increasing
  tail latency on slow disks and amplifying follower fall-behind
  windows.

s3ChunkBatchOps=4 caps each Dispatch at 4 × 1 MiB = 4 MiB plus a few
hundred bytes of protobuf overhead. The Raft entry now fits exactly
within MaxSizePerMsg, so:

- The leader-side per-peer worst case matches the bound PR #593
  documents (4 GiB / peer). For S3-heavy workloads the cluster-side
  worst case drops from 32 GiB to 8 GiB on a 3-node deployment.
- Each WAL fsync writes 4 MiB instead of 16 MiB; per-entry sync latency
  falls 4× and the WAL group-commit path that landed in PR #600
  amortises the higher commit count.
- A 5 GiB PUT now produces 1280 Raft commits instead of 320 (4×
  growth). The follower apply loop and the read-side prefix scan
  (scanAllDeltaElems via s3keys.BlobKey) handle the same total bytes
  and the same set of chunk keys, just spread across 4× more Raft
  entries; on-disk locality is unchanged because the underlying Pebble
  keys still sort by manifest order.

No reads or stored manifests are affected — s3ChunkBatchOps is purely a
write-path batching parameter; manifests carry their own ChunkSizes
slice that mirrors what was actually committed.

Build / vet / lint clean. S3 unit tests pass.

Follow-ups (separate PRs):

- docs/design: admission control on concurrent S3 PUT body bytes so the
  leader-side memory bound has a hard ceiling regardless of client
  concurrency.
- docs/design: Raft snapshot strategy / blob bypass so large objects do
  not bloat snapshot transfers when a follower falls behind.
---
 adapter/s3.go | 28 +++++++++++++++++++++-------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/adapter/s3.go b/adapter/s3.go
index 8c888cce2..f1854b7f3 100644
--- a/adapter/s3.go
+++ b/adapter/s3.go
@@ -35,13 +35,27 @@ const (
 	s3LeaderHealthPath = "/healthz/leader"
 	s3HealthMaxRequestBodyBytes = 1024
 	s3ChunkSize = 1 << 20
-	s3ChunkBatchOps = 16
-	s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
-	s3DefaultRegion = "us-east-1"
-	s3MaxKeys = 1000
-	s3ListPageSize = 256
-	s3ManifestCleanupTimeout = 2 * time.Minute
-	s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
+	// s3ChunkBatchOps caps how many s3ChunkSize chunks fit in a single
+	// coordinator.Dispatch call. The Raft entry produced by Dispatch is
+	// roughly s3ChunkBatchOps × s3ChunkSize plus protobuf overhead, so
+	// 4 × 1 MiB = 4 MiB matches the post-PR-#593 default
+	// `MaxSizePerMsg = 4 MiB`. This alignment matters because
+	// etcd/raft sends a single entry that is *larger* than
+	// MaxSizePerMsg as a solo MsgApp (see util.go:limitSize), bypassing
+	// the documented `MaxInflight × MaxSizePerMsg` per-peer memory
+	// bound. With 16 × 1 MiB = 16 MiB entries the worst-case leader
+	// buffer was 1024 × 16 MiB = 16 GiB / peer; at 4 × 1 MiB the bound
+	// drops to 1024 × 4 MiB = 4 GiB / peer and matches the cap PR #593
+	// advertises. Per-PUT Raft commit count grows 4× (a 5 GiB PUT goes
+	// from 320 to 1280 entries) — absorbed by the WAL group commit
+	// landed in PR #600 and the smaller per-entry fsync.
+	s3ChunkBatchOps = 4
+	s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
+	s3DefaultRegion = "us-east-1"
+	s3MaxKeys = 1000
+	s3ListPageSize = 256
+	s3ManifestCleanupTimeout = 2 * time.Minute
+	s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
 	s3TxnRetryInitialBackoff = 2 * time.Millisecond
 	s3TxnRetryMaxBackoff = 32 * time.Millisecond

From 5338c054c7114af96f9d3815c41f8a33ea2ced60 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Sat, 25 Apr 2026 23:47:02 +0900
Subject: [PATCH 2/3] perf(s3): tighten chunk batch + split metadata batch (Codex P2 + Gemini medium)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Address two PR #636 review concerns:

Codex P2 — `4 × 1 MiB = 4 MiB` exactly is *over* `MaxSizePerMsg` once
protobuf framing overhead is counted. Each pb.Mutation carries the Op
tag, Key tag + bytes, and Value length prefix; the pb.Request envelope
wraps them; marshalRaftCommand prepends one byte. With a normal-length
objectKey the encoded entry is ~4 MiB + 300 B, with a 1 KiB objectKey
it grows to ~4 MiB + 4 KiB. Either way the entry falls into etcd/raft's
util.go:limitSize oversized-first-entry path (the documented "as an
exception" branch), bypassing the `MaxInflight × MaxSizePerMsg`
per-peer memory bound the prior commit was trying to enforce.

Fix: lower s3ChunkBatchOps from 4 to 3. `3 × 1 MiB = 3 MiB`, plus a few
hundred bytes of framing, still leaves ~1 MiB of headroom even for
kilobyte-scale keys, so the entry stays strictly under MaxSizePerMsg
and the batched-MsgApp path applies. Per-PUT Raft commit count grows
~5× from the pre-PR-#636 baseline (5 GiB PUT: 320 → ~1707 entries) but
each fsync is ~5× smaller; the WAL group commit that landed in PR #600
absorbs the rate.

Gemini medium — the previous commit also forced the cleanup paths
(cleanupPartBlobsAsync, deleteByPrefix, cleanupManifestBlobs,
appendPartBlobKeys) onto the same 4-op batch even though they carry no
chunk payload. Those ops are pure-key Dels; a data-write-sized batch
amplifies cleanup latency for large objects (a 5 GiB cleanup needs
~1707 batches at the new 3-op data-write size).

Fix: introduce s3MetaBatchOps = 64. With BlobKey-shaped keys ~100 B
each, 64 keys × 100 B ≈ 6 KiB per batch, still nearly three orders of
magnitude under MaxSizePerMsg. A 5 GiB-object cleanup commits ~80
batches instead of ~1707, so orphaned-blob garbage collection finishes
proportionally faster and does not amplify Raft load relative to the
data-write path.

Updates the cleanup callsites (adapter/s3.go:1879, 1900, 1933, 2206,
2227) plus the appendPartBlobKeys loop helper.

Tests:

- TestS3ChunkBatchFitsInRaftMaxSize builds a worst-case PutObject batch
  (1 KiB objectKey, fully-populated 1 MiB chunk values), marshals via
  the same protobuf path the real Dispatch uses, and asserts the
  encoded entry plus the marshalRaftCommand 1-byte framing prefix is
  strictly under MaxSizePerMsg = 4 MiB.
  A second assertion pins the headroom margin at >= 64 KiB so a future
  bump in s3ChunkBatchOps or s3ChunkSize is caught at PR time.
- TestS3MetaBatchFitsInRaftMaxSize is the equivalent invariant for
  s3MetaBatchOps on the cleanup path.

Build / vet / lint clean. S3 unit tests pass.
---
 adapter/s3.go                  |  55 ++++++++++-----
 adapter/s3_chunk_batch_test.go | 139 +++++++++++++++++++++++++++++++++
 2 files changed, 175 insertions(+), 19 deletions(-)
 create mode 100644 adapter/s3_chunk_batch_test.go

diff --git a/adapter/s3.go b/adapter/s3.go
index f1854b7f3..39d45dcff 100644
--- a/adapter/s3.go
+++ b/adapter/s3.go
@@ -36,20 +36,37 @@ const (
 	s3HealthMaxRequestBodyBytes = 1024
 	s3ChunkSize = 1 << 20
 	// s3ChunkBatchOps caps how many s3ChunkSize chunks fit in a single
-	// coordinator.Dispatch call. The Raft entry produced by Dispatch is
-	// roughly s3ChunkBatchOps × s3ChunkSize plus protobuf overhead, so
-	// 4 × 1 MiB = 4 MiB matches the post-PR-#593 default
-	// `MaxSizePerMsg = 4 MiB`. This alignment matters because
-	// etcd/raft sends a single entry that is *larger* than
-	// MaxSizePerMsg as a solo MsgApp (see util.go:limitSize), bypassing
-	// the documented `MaxInflight × MaxSizePerMsg` per-peer memory
-	// bound. With 16 × 1 MiB = 16 MiB entries the worst-case leader
-	// buffer was 1024 × 16 MiB = 16 GiB / peer; at 4 × 1 MiB the bound
-	// drops to 1024 × 4 MiB = 4 GiB / peer and matches the cap PR #593
-	// advertises. Per-PUT Raft commit count grows 4× (a 5 GiB PUT goes
-	// from 320 to 1280 entries) — absorbed by the WAL group commit
-	// landed in PR #600 and the smaller per-entry fsync.
-	s3ChunkBatchOps = 4
+	// coordinator.Dispatch call on the data-write path
+	// (PutObject / UploadPart). Sized so the resulting Raft entry stays
+	// strictly under the post-PR-#593 default `MaxSizePerMsg = 4 MiB`
+	// even after protobuf framing overhead — each pb.Mutation carries
+	// the Op tag, Key tag + bytes, and Value length prefix; the
+	// pb.Request envelope wraps them; marshalRaftCommand prepends one
+	// byte. Empirically the per-Mutation overhead is ~60 B for normal
+	// keys and grows linearly with the bucket / objectKey length, so
+	// `4 × 1 MiB = 4 MiB` exactly is *over* MaxSizePerMsg in practice
+	// and falls into etcd/raft's util.go:limitSize oversized-first-
+	// entry path, bypassing the documented
+	// `MaxInflight × MaxSizePerMsg` per-peer memory bound. Capping at
+	// `3 × 1 MiB ≈ 3 MiB + few hundred bytes` leaves ~1 MiB of headroom
+	// even with kilobyte-scale object keys, so the entry rides the
+	// normal batched-MsgApp path and the bound holds. Per-PUT Raft
+	// commit count grows ~5× from the pre-PR-#636 baseline (a 5 GiB
+	// PUT goes from 320 → ~1707 entries) but each fsync is ~5×
+	// smaller; the WAL group commit landed in PR #600 absorbs the
+	// higher commit rate. See TestS3ChunkBatchFitsInRaftMaxSize
+	// for the encoded-size invariant.
+	s3ChunkBatchOps = 3
+	// s3MetaBatchOps batches key-only Del / scan ops on cleanup paths
+	// (cleanupPartBlobsAsync, deleteByPrefix, cleanupManifestBlobs).
+	// These ops carry no chunk payload, so the MaxSizePerMsg cap
+	// translates to a pure key-count budget: 64 BlobKey-shaped keys
+	// × ~100 B each ≈ 6 KiB per batch, three orders of magnitude
+	// under the 4 MiB limit. Keeping this batch large means a
+	// 5 GiB-object cleanup commits ~80 batches instead of ~1707, so
+	// orphaned-blob garbage collection finishes proportionally faster
+	// and does not amplify Raft load relative to the data-write path.
+	s3MetaBatchOps = 64
 	s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
 	s3DefaultRegion = "us-east-1"
 	s3MaxKeys = 1000
@@ -1890,7 +1907,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
 	defer func() { <-s.cleanupSem }()
 	ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
 	defer cancel()
-	pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
 	flush := func() {
 		if len(pending) == 0 {
 			return
@@ -1911,7 +1928,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
 				Op: kv.Del,
 				Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion),
 			})
-			if len(pending) >= s3ChunkBatchOps {
+			if len(pending) >= s3MetaBatchOps {
 				flush()
 			}
 		}
@@ -1944,7 +1961,7 @@ func (s *S3Server) deleteByPrefix(ctx context.Context, prefix []byte, bucket str
 	for {
 		readTS := s.readTS()
 		readPin := s.pinReadTS(readTS)
-		kvs, err := s.store.ScanAt(ctx, cursor, end, s3ChunkBatchOps, readTS)
+		kvs, err := s.store.ScanAt(ctx, cursor, end, s3MetaBatchOps, readTS)
 		readPin.Release()
 		if err != nil {
 			slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
@@ -2203,7 +2220,7 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
 	if s == nil || manifest == nil || manifest.UploadID == "" || s.coordinator == nil {
 		return
 	}
-	pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
 	flush := func() {
 		if len(pending) == 0 {
 			return
@@ -2238,7 +2255,7 @@ func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string,
 				Op: kv.Del,
 				Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
 			})
-			if len(pending) >= s3ChunkBatchOps {
+			if len(pending) >= s3MetaBatchOps {
 				flush()
 			}
 		}
diff --git a/adapter/s3_chunk_batch_test.go b/adapter/s3_chunk_batch_test.go
new file mode 100644
index 000000000..6cc6489d1
--- /dev/null
+++ b/adapter/s3_chunk_batch_test.go
@@ -0,0 +1,139 @@
+package adapter
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/bootjp/elastickv/internal/s3keys"
+	pb "github.com/bootjp/elastickv/proto"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
+)
+
+// raftMaxSizePerMsgPostPR593 is the post-PR-#593 default for
+// etcd/raft's MaxSizePerMsg setting. Hardcoded here (rather than
+// imported from internal/raftengine/etcd) so the test does not pull
+// the engine package; the value is intentionally duplicated and pinned
+// because the entire point of this test is to detect when an S3 batch
+// silently grows past it.
+const raftMaxSizePerMsgPostPR593 = 4 << 20
+
+// TestS3ChunkBatchFitsInRaftMaxSize is the byte-budget invariant the
+// s3ChunkBatchOps comment advertises: a worst-case S3 PutObject /
+// UploadPart batch must encode strictly under
+// raftMaxSizePerMsgPostPR593, plus the 1-byte raft framing prefix
+// added by marshalRaftCommand.
+//
+// Without this guard, raising s3ChunkBatchOps or growing s3ChunkSize
+// would silently route Raft entries through etcd/raft's
+// oversized-first-entry path (util.go:limitSize, the "as an
+// exception, if the size of the first entry exceeds maxSize, a
+// non-empty slice with just this entry is returned" branch), which
+// inflates the documented `MaxInflight × MaxSizePerMsg` per-peer
+// memory bound.
+func TestS3ChunkBatchFitsInRaftMaxSize(t *testing.T) {
+	t.Parallel()
+
+	// Worst-case key: a kilobyte-scale objectKey amplifies the
+	// per-Mutation envelope. Choose 1 KiB to model a deeply nested
+	// S3 path; longer keys are unusual but the headroom is generous.
+	bucket := "test-bucket"
+	objectKey := strings.Repeat("a", 1024)
+	uploadID := "upload-12345678901234567890"
+	const generation uint64 = 1
+	const partNo uint64 = 1
+
+	// Use a fully populated, non-zero chunk value; the bytes field is
+	// length-delimited, so all s3ChunkSize bytes count toward the size.
+	value := make([]byte, s3ChunkSize)
+	for i := range value {
+		value[i] = 0xAB
+	}
+
+	muts := make([]*pb.Mutation, 0, s3ChunkBatchOps)
+	for i := uint64(0); i < uint64(s3ChunkBatchOps); i++ {
+		key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
+		muts = append(muts, &pb.Mutation{
+			Op: pb.Op_PUT,
+			Key: key,
+			Value: value,
+		})
+	}
+
+	req := &pb.Request{
+		IsTxn: false,
+		Phase: pb.Phase_NONE,
+		Ts: 1234567890,
+		Mutations: muts,
+	}
+
+	encoded, err := proto.Marshal(req)
+	require.NoError(t, err)
+
+	// marshalRaftCommand prepends one framing byte (raftEncodeSingle
+	// or raftEncodeBatch). Account for it explicitly.
+	const raftFramingPrefix = 1
+	totalEntrySize := len(encoded) + raftFramingPrefix
+
+	require.Lessf(t,
+		totalEntrySize, raftMaxSizePerMsgPostPR593,
+		"S3 chunk batch entry must fit strictly under MaxSizePerMsg=%d to avoid the etcd/raft oversized-first-entry path; got %d (s3ChunkBatchOps=%d, s3ChunkSize=%d, objectKey=%dB)",
+		raftMaxSizePerMsgPostPR593, totalEntrySize, s3ChunkBatchOps, s3ChunkSize, len(objectKey),
+	)
+
+	// Sanity: the headroom should be meaningful (at least 64 KiB) so
+	// future small bumps in key length or Request envelope fields do
+	// not silently push past the limit. The s3ChunkBatchOps comment
+	// points at this test for exactly this invariant.
+	const minHeadroom = 64 << 10
+	require.Greaterf(t,
+		raftMaxSizePerMsgPostPR593-totalEntrySize, minHeadroom,
+		"S3 chunk batch headroom under MaxSizePerMsg has fallen below %d B (got %d B); reduce s3ChunkBatchOps or s3ChunkSize",
+		minHeadroom, raftMaxSizePerMsgPostPR593-totalEntrySize,
+	)
+}
+
+// TestS3MetaBatchFitsInRaftMaxSize is the same byte-budget invariant
+// for the cleanup paths (cleanupPartBlobsAsync, deleteByPrefix,
+// cleanupManifestBlobs). These ops carry no chunk payload so the
+// batch is dominated by key bytes; even at the worst-case key length
+// the total stays well under the cap. The test pins the headroom
+// margin so a future bump in s3MetaBatchOps that pushes too far is
+// caught at PR time.
+func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
+	t.Parallel()
+
+	bucket := "test-bucket"
+	objectKey := strings.Repeat("a", 1024)
+	uploadID := "upload-12345678901234567890"
+	const generation uint64 = 1
+	const partNo uint64 = 1
+
+	muts := make([]*pb.Mutation, 0, s3MetaBatchOps)
+	for i := uint64(0); i < uint64(s3MetaBatchOps); i++ {
+		key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
+		muts = append(muts, &pb.Mutation{
+			Op: pb.Op_DEL,
+			Key: key,
+		})
+	}
+
+	req := &pb.Request{
+		IsTxn: false,
+		Phase: pb.Phase_NONE,
+		Ts: 1234567890,
+		Mutations: muts,
+	}
+
+	encoded, err := proto.Marshal(req)
+	require.NoError(t, err)
+
+	const raftFramingPrefix = 1
+	totalEntrySize := len(encoded) + raftFramingPrefix
+
+	require.Lessf(t,
+		totalEntrySize, raftMaxSizePerMsgPostPR593,
+		"S3 meta batch entry must fit under MaxSizePerMsg=%d; got %d (s3MetaBatchOps=%d, objectKey=%dB)",
+		raftMaxSizePerMsgPostPR593, totalEntrySize, s3MetaBatchOps, len(objectKey),
+	)
+}

From 97b4fc580b9ca840ab0002c0dee40ccef3a3c799 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Sat, 25 Apr 2026 23:58:56 +0900
Subject: [PATCH 3/3] fix(s3): pass appendPartBlobKeys' pending by pointer so flush observes appends
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Gemini high — the previous round split cleanup paths onto
s3MetaBatchOps but missed that `appendPartBlobKeys` still took
`pending []*kv.Elem[kv.OP]` by value. The flush closure inside
`cleanupManifestBlobs` captures the outer `pending` slice header;
appends performed inside the helper update the local header only, so
flush sees length 0 every iteration and silently no-ops. After the
helper returns, the caller reassigns `pending` from the helper's
return value — but by then the entire part's chunks have accumulated
into one slice that bypasses the s3MetaBatchOps cap entirely.

For a manifest with thousands of chunks (e.g. a 5 GiB-object cleanup
with ~5120 chunks), the broken helper builds one giant batch and hands
it to coordinator.Dispatch in a single call, re-introducing exactly
the OOM / oversized-MsgApp risk the s3MetaBatchOps cap was meant to
prevent. This is the same shape as the pre-PR-#636 behaviour
s3ChunkBatchOps was tightening, just on the cleanup side.

Fix: take `pending` by pointer (`*[]*kv.Elem[kv.OP]`) and write
through it. Now appends inside the helper update the variable the
flush closure reads, so threshold-triggered flushes fire correctly and
the slice length never exceeds s3MetaBatchOps.

Test:

- TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps builds a part
  with 2 × s3MetaBatchOps + 7 chunks, calls the helper directly with a
  recording flush closure, and asserts flush fired exactly twice with
  batch sizes [s3MetaBatchOps, s3MetaBatchOps] plus 7 trailing entries
  left for the caller's tail flush. The test catches the bug: with the
  value-passing version the closure always saw length 0, so every
  recorded batch size would have been 0 and, once the helper's local
  slice crossed the cap, flush would have fired on every subsequent
  append; both the flush-count and batch-size assertions fail loudly.

Build / vet / lint clean. All S3 + cleanup tests pass.
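
For reviewers, a minimal standalone Go sketch (illustrative only, not
project code; names are made up) of the slice-header capture semantics
the fix relies on:

    package main

    import "fmt"

    // byValue appends to its own copy of the slice header, so the
    // caller's variable (which the flush closure reads) stays at
    // length 0: flush still fires, but it drains nothing.
    func byValue(pending []int, flush func()) {
        for i := 0; i < 8; i++ {
            pending = append(pending, i)
            if len(pending) >= 4 {
                flush()
            }
        }
    }

    // byPointer writes through the caller's variable, so the closure
    // observes the growth and each flush drains a full batch.
    func byPointer(pending *[]int, flush func()) {
        for i := 0; i < 8; i++ {
            *pending = append(*pending, i)
            if len(*pending) >= 4 {
                flush()
            }
        }
    }

    func main() {
        var pending []int
        flush := func() {
            fmt.Printf("flush drains %d\n", len(pending))
            pending = pending[:0]
        }
        byValue(pending, flush)    // "flush drains 0", five times
        byPointer(&pending, flush) // "flush drains 4", twice
    }

The real helper takes more parameters and kv-typed elements, but the
capture behaviour is identical.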
---
 adapter/s3.go                  | 24 ++++++++++-----
 adapter/s3_chunk_batch_test.go | 55 ++++++++++++++++++++++++++++++++++
 2 files changed, 72 insertions(+), 7 deletions(-)

diff --git a/adapter/s3.go b/adapter/s3.go
index 39d45dcff..6e9302368 100644
--- a/adapter/s3.go
+++ b/adapter/s3.go
@@ -2237,29 +2237,39 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
 		pending = pending[:0]
 	}
 	for _, part := range manifest.Parts {
-		var ok bool
-		if pending, ok = s.appendPartBlobKeys(pending, bucket, generation, objectKey, manifest.UploadID, part, flush); !ok {
+		if !s.appendPartBlobKeys(&pending, bucket, generation, objectKey, manifest.UploadID, part, flush) {
 			return
 		}
 	}
 	flush()
 }
 
-func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) ([]*kv.Elem[kv.OP], bool) {
+// appendPartBlobKeys queues every blob-chunk Del for one manifest part
+// onto *pending and triggers flush whenever the batch reaches
+// s3MetaBatchOps. The slice is taken by pointer so that the caller's
+// `flush` closure (which captures pending from the enclosing
+// cleanupManifestBlobs scope) observes appends performed here. A
+// previous value-passing version silently no-op'd flush — flush saw
+// the outer `pending` whose header still pointed at length 0, and the
+// helper accumulated every chunk into one batch on return, defeating
+// the s3MetaBatchOps cap and re-opening the OOM / oversized-MsgApp
+// risk the cap was meant to bound. See
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps for the regression guard.
+func (s *S3Server) appendPartBlobKeys(pending *[]*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) bool {
 	for chunkNo := range part.ChunkSizes {
 		chunkIndex, err := uint64FromInt(chunkNo)
 		if err != nil {
-			return pending, false
+			return false
 		}
-		pending = append(pending, &kv.Elem[kv.OP]{
+		*pending = append(*pending, &kv.Elem[kv.OP]{
 			Op: kv.Del,
 			Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
 		})
-		if len(pending) >= s3MetaBatchOps {
+		if len(*pending) >= s3MetaBatchOps {
 			flush()
 		}
 	}
-	return pending, true
+	return true
 }
 
 //nolint:cyclop // Proxying depends on root, bucket, and object-level leadership decisions.
diff --git a/adapter/s3_chunk_batch_test.go b/adapter/s3_chunk_batch_test.go
index 6cc6489d1..77ee67ab5 100644
--- a/adapter/s3_chunk_batch_test.go
+++ b/adapter/s3_chunk_batch_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/bootjp/elastickv/internal/s3keys"
+	"github.com/bootjp/elastickv/kv"
 	pb "github.com/bootjp/elastickv/proto"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/proto"
@@ -137,3 +138,57 @@ func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
 		raftMaxSizePerMsgPostPR593, totalEntrySize, s3MetaBatchOps, len(objectKey),
 	)
 }
+
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps is the
+// regression guard for the slice-by-value bug Gemini caught: a
+// previous version of appendPartBlobKeys took `pending` by value, so
+// the flush closure (captured from cleanupManifestBlobs's enclosing
+// scope) saw the outer slice header at length 0 and drained nothing,
+// silently accumulating every chunk into one giant batch. This test
+// pins the contract that the helper drains via flush exactly every
+// s3MetaBatchOps appends, never building a slice longer than the cap.
+func TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps(t *testing.T) {
+	t.Parallel()
+
+	// Build a manifest part with chunkCount > 2× s3MetaBatchOps so the
+	// flush closure must fire at least twice, plus a tail flush from
+	// the caller's final flush() in cleanupManifestBlobs.
+	const chunkCount = 2*s3MetaBatchOps + 7
+	chunkSizes := make([]uint64, chunkCount)
+	for i := range chunkSizes {
+		chunkSizes[i] = 1
+	}
+	part := s3ObjectPart{
+		PartNo: 1,
+		PartVersion: 1,
+		ChunkSizes: chunkSizes,
+	}
+
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
+	flushCalls := 0
+	flushBatchSizes := make([]int, 0, 4)
+	flush := func() {
+		// Mirror cleanupManifestBlobs's flush: record the batch size
+		// and then truncate. If the helper's pointer plumbing is
+		// broken, len(pending) here would always be 0 and the
+		// recorded batch sizes would never match s3MetaBatchOps.
+		flushCalls++
+		flushBatchSizes = append(flushBatchSizes, len(pending))
+		pending = pending[:0]
+	}
+
+	srv := (*S3Server)(nil) // method body does not touch s
+	ok := srv.appendPartBlobKeys(&pending, "bucket", 1, "key", "upload", part, flush)
+	require.True(t, ok)
+
+	// Exactly two threshold-triggered flushes inside the helper:
+	// at append #s3MetaBatchOps and #2×s3MetaBatchOps. The 7-entry
+	// remainder is left in pending for the caller's tail flush().
+	require.Equal(t, 2, flushCalls,
+		"expected flush to fire twice (at append %d and %d); slice-by-value bug regressed?",
+		s3MetaBatchOps, 2*s3MetaBatchOps)
+	require.Equal(t, []int{s3MetaBatchOps, s3MetaBatchOps}, flushBatchSizes,
+		"each flush must drain exactly s3MetaBatchOps entries; pending must not silently overflow the cap")
+	require.Len(t, pending, 7,
+		"trailing 7 entries should remain for the caller's final flush()")
+}