diff --git a/adapter/s3.go b/adapter/s3.go
index 8c888cce2..6e9302368 100644
--- a/adapter/s3.go
+++ b/adapter/s3.go
@@ -35,13 +35,44 @@ const (
 	s3LeaderHealthPath          = "/healthz/leader"
 	s3HealthMaxRequestBodyBytes = 1024
 	s3ChunkSize                 = 1 << 20
-	s3ChunkBatchOps             = 16
-	s3XMLNamespace              = "http://s3.amazonaws.com/doc/2006-03-01/"
-	s3DefaultRegion             = "us-east-1"
-	s3MaxKeys                   = 1000
-	s3ListPageSize              = 256
-	s3ManifestCleanupTimeout    = 2 * time.Minute
-	s3MaxObjectSizeBytes        = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
+	// s3ChunkBatchOps caps how many s3ChunkSize chunks fit in a single
+	// coordinator.Dispatch call on the data-write path
+	// (PutObject / UploadPart). Sized so the resulting Raft entry stays
+	// strictly under the post-PR-#593 default `MaxSizePerMsg = 4 MiB`
+	// even after protobuf framing overhead — each pb.Mutation carries
+	// the Op tag, Key tag + bytes, and Value length prefix; the
+	// pb.Request envelope wraps them; marshalRaftCommand prepends one
+	// byte. Empirically the per-Mutation overhead is ~60 B for normal
+	// keys and grows linearly with the bucket / objectKey length, so
+	// `4 × 1 MiB = 4 MiB` exactly is *over* MaxSizePerMsg in practice
+	// and falls into etcd/raft's util.go:limitSize oversized-first-
+	// entry path, bypassing the documented
+	// `MaxInflight × MaxSizePerMsg` per-peer memory bound. Capping at
+	// `3 × 1 MiB ≈ 3 MiB + a few hundred bytes` leaves ~1 MiB of headroom
+	// even with kilobyte-scale object keys, so the entry rides the
+	// normal batched-MsgApp path and the bound holds. Per-PUT Raft
+	// commit count grows ~5× from the pre-PR-#636 baseline (a 5 GiB
+	// PUT goes from 320 → ~1707 entries) but each fsync is ~5×
+	// smaller; the WAL group commit that landed in PR #600 absorbs
+	// the higher commit rate. See TestS3ChunkBatchFitsInRaftMaxSize
+	// for the encoded-size invariant.
+	s3ChunkBatchOps = 3
+	// s3MetaBatchOps batches key-only Del / scan ops on cleanup paths
+	// (cleanupPartBlobsAsync, deleteByPrefix, cleanupManifestBlobs).
+	// These ops carry no chunk payload, so the MaxSizePerMsg cap
+	// translates to a pure key-count budget: 64 BlobKey-shaped keys
+	// × ~100 B each ≈ 6 KiB per batch, three orders of magnitude
+	// under the 4 MiB limit. Keeping this batch large means a
+	// 5 GiB-object cleanup commits ~80 batches instead of ~1707, so
+	// orphaned-blob garbage collection finishes proportionally faster
+	// and does not amplify Raft load relative to the data-write path.
+	s3MetaBatchOps = 64
+	s3XMLNamespace           = "http://s3.amazonaws.com/doc/2006-03-01/"
+	s3DefaultRegion          = "us-east-1"
+	s3MaxKeys                = 1000
+	s3ListPageSize           = 256
+	s3ManifestCleanupTimeout = 2 * time.Minute
+	s3MaxObjectSizeBytes     = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
 
 	s3TxnRetryInitialBackoff = 2 * time.Millisecond
 	s3TxnRetryMaxBackoff     = 32 * time.Millisecond
@@ -1876,7 +1907,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
 	defer func() { <-s.cleanupSem }()
 	ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
 	defer cancel()
-	pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
 	flush := func() {
 		if len(pending) == 0 {
 			return
@@ -1897,7 +1928,7 @@
 			Op:  kv.Del,
 			Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion),
 		})
-		if len(pending) >= s3ChunkBatchOps {
+		if len(pending) >= s3MetaBatchOps {
 			flush()
 		}
 	}
@@ -1930,7 +1961,7 @@ func (s *S3Server) deleteByPrefix(ctx context.Context, prefix []byte, bucket str
 	for {
 		readTS := s.readTS()
 		readPin := s.pinReadTS(readTS)
-		kvs, err := s.store.ScanAt(ctx, cursor, end, s3ChunkBatchOps, readTS)
+		kvs, err := s.store.ScanAt(ctx, cursor, end, s3MetaBatchOps, readTS)
 		readPin.Release()
 		if err != nil {
 			slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
@@ -2189,7 +2220,7 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
 	if s == nil || manifest == nil || manifest.UploadID == "" || s.coordinator == nil {
 		return
 	}
-	pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
 	flush := func() {
 		if len(pending) == 0 {
 			return
@@ -2206,29 +2237,39 @@
 		pending = pending[:0]
 	}
 	for _, part := range manifest.Parts {
-		var ok bool
-		if pending, ok = s.appendPartBlobKeys(pending, bucket, generation, objectKey, manifest.UploadID, part, flush); !ok {
+		if !s.appendPartBlobKeys(&pending, bucket, generation, objectKey, manifest.UploadID, part, flush) {
 			return
 		}
 	}
 	flush()
 }
 
-func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) ([]*kv.Elem[kv.OP], bool) {
+// appendPartBlobKeys queues every blob-chunk Del for one manifest part
+// onto *pending and triggers flush whenever the batch reaches
+// s3MetaBatchOps. The slice is taken by pointer so that the caller's
+// `flush` closure (which captures pending from the enclosing
+// cleanupManifestBlobs scope) observes appends performed here. A
+// previous value-passing version silently no-op'd flush — flush saw
+// the outer `pending` whose header still pointed at length 0, and the
+// helper accumulated every chunk into one batch on return, defeating
+// the s3MetaBatchOps cap and re-opening the OOM / oversized-MsgApp
+// risk the cap was meant to bound. See
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps for the regression guard.
+func (s *S3Server) appendPartBlobKeys(pending *[]*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) bool {
 	for chunkNo := range part.ChunkSizes {
 		chunkIndex, err := uint64FromInt(chunkNo)
 		if err != nil {
-			return pending, false
+			return false
 		}
-		pending = append(pending, &kv.Elem[kv.OP]{
+		*pending = append(*pending, &kv.Elem[kv.OP]{
 			Op:  kv.Del,
 			Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
 		})
-		if len(pending) >= s3ChunkBatchOps {
+		if len(*pending) >= s3MetaBatchOps {
 			flush()
 		}
 	}
-	return pending, true
+	return true
 }
 
 //nolint:cyclop // Proxying depends on root, bucket, and object-level leadership decisions.
diff --git a/adapter/s3_chunk_batch_test.go b/adapter/s3_chunk_batch_test.go
new file mode 100644
index 000000000..77ee67ab5
--- /dev/null
+++ b/adapter/s3_chunk_batch_test.go
@@ -0,0 +1,194 @@
+package adapter
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/bootjp/elastickv/internal/s3keys"
+	"github.com/bootjp/elastickv/kv"
+	pb "github.com/bootjp/elastickv/proto"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
+)
+
+// raftMaxSizePerMsgPostPR593 is the post-PR-#593 default for
+// etcd/raft's MaxSizePerMsg setting. Hardcoded here (rather than
+// imported from internal/raftengine/etcd) so the test does not pull
+// the engine package; the value is intentionally duplicated and pinned
+// because the entire point of this test is to detect when an S3 batch
+// silently grows past it.
+const raftMaxSizePerMsgPostPR593 = 4 << 20
+
+// TestS3ChunkBatchFitsInRaftMaxSize is the byte-budget invariant the
+// s3ChunkBatchOps comment advertises: a worst-case S3 PutObject /
+// UploadPart batch must encode strictly under
+// raftMaxSizePerMsgPostPR593, including the 1-byte raft framing
+// prefix added by marshalRaftCommand.
+//
+// Without this guard, raising s3ChunkBatchOps or growing s3ChunkSize
+// would silently route Raft entries through etcd/raft's
+// oversized-first-entry path (util.go:limitSize, the "as an
+// exception, if the size of the first entry exceeds maxSize, a
+// non-empty slice with just this entry is returned" branch), which
+// inflates the documented `MaxInflight × MaxSizePerMsg` per-peer
+// memory bound.
+func TestS3ChunkBatchFitsInRaftMaxSize(t *testing.T) {
+	t.Parallel()
+
+	// Worst-case key: a kilobyte-scale objectKey amplifies the
+	// per-Mutation envelope. Choose 1 KiB to model a deeply nested
+	// S3 path; longer keys are unusual but the headroom is generous.
+	bucket := "test-bucket"
+	objectKey := strings.Repeat("a", 1024)
+	uploadID := "upload-12345678901234567890"
+	const generation uint64 = 1
+	const partNo uint64 = 1
+
+	// Fill the chunk value with non-zero bytes so it resembles real
+	// chunk data; a zero-filled buffer would encode to the same length.
+	value := make([]byte, s3ChunkSize)
+	for i := range value {
+		value[i] = 0xAB
+	}
+
+	muts := make([]*pb.Mutation, 0, s3ChunkBatchOps)
+	for i := uint64(0); i < uint64(s3ChunkBatchOps); i++ {
+		key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
+		muts = append(muts, &pb.Mutation{
+			Op:    pb.Op_PUT,
+			Key:   key,
+			Value: value,
+		})
+	}
+
+	req := &pb.Request{
+		IsTxn:     false,
+		Phase:     pb.Phase_NONE,
+		Ts:        1234567890,
+		Mutations: muts,
+	}
+
+	encoded, err := proto.Marshal(req)
+	require.NoError(t, err)
+
+	// marshalRaftCommand prepends one framing byte (raftEncodeSingle
+	// or raftEncodeBatch). Account for it explicitly.
+	const raftFramingPrefix = 1
+	totalEntrySize := len(encoded) + raftFramingPrefix
+
+	require.Lessf(t,
+		totalEntrySize, raftMaxSizePerMsgPostPR593,
+		"S3 chunk batch entry must fit strictly under MaxSizePerMsg=%d to avoid the etcd/raft oversized-first-entry path; got %d (s3ChunkBatchOps=%d, s3ChunkSize=%d, objectKey=%dB)",
+		raftMaxSizePerMsgPostPR593, totalEntrySize, s3ChunkBatchOps, s3ChunkSize, len(objectKey),
+	)
+
+	// Sanity: the headroom should be meaningful (at least 64 KiB) so
+	// future small bumps in key length or Request envelope fields do
+	// not silently push past the limit. The s3ChunkBatchOps comment
+	// promises ~1 MiB of headroom; this floor is deliberately looser.
+	const minHeadroom = 64 << 10
+	require.Greaterf(t,
+		raftMaxSizePerMsgPostPR593-totalEntrySize, minHeadroom,
+		"S3 chunk batch headroom under MaxSizePerMsg has fallen below %d B (got %d B); reduce s3ChunkBatchOps or s3ChunkSize",
+		minHeadroom, raftMaxSizePerMsgPostPR593-totalEntrySize,
+	)
+}
+
+// TestS3MetaBatchFitsInRaftMaxSize is the same byte-budget invariant
+// for the cleanup paths (cleanupPartBlobsAsync, deleteByPrefix,
+// cleanupManifestBlobs). These ops carry no chunk payload so the
+// batch is dominated by key bytes; even at the worst-case key length
+// the total stays well under the cap. The test pins that cap so a
+// future bump in s3MetaBatchOps that pushes past it is caught at PR
+// time.
+func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
+	t.Parallel()
+
+	bucket := "test-bucket"
+	objectKey := strings.Repeat("a", 1024)
+	uploadID := "upload-12345678901234567890"
+	const generation uint64 = 1
+	const partNo uint64 = 1
+
+	muts := make([]*pb.Mutation, 0, s3MetaBatchOps)
+	for i := uint64(0); i < uint64(s3MetaBatchOps); i++ {
+		key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
+		muts = append(muts, &pb.Mutation{
+			Op:  pb.Op_DEL,
+			Key: key,
+		})
+	}
+
+	req := &pb.Request{
+		IsTxn:     false,
+		Phase:     pb.Phase_NONE,
+		Ts:        1234567890,
+		Mutations: muts,
+	}
+
+	encoded, err := proto.Marshal(req)
+	require.NoError(t, err)
+
+	const raftFramingPrefix = 1
+	totalEntrySize := len(encoded) + raftFramingPrefix
+
+	require.Lessf(t,
+		totalEntrySize, raftMaxSizePerMsgPostPR593,
+		"S3 meta batch entry must fit under MaxSizePerMsg=%d; got %d (s3MetaBatchOps=%d, objectKey=%dB)",
+		raftMaxSizePerMsgPostPR593, totalEntrySize, s3MetaBatchOps, len(objectKey),
+	)
+}
+
+// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps is the
+// regression guard for the slice-by-value bug Gemini caught: a
+// previous version of appendPartBlobKeys took `pending` by value, so
+// the flush closure (captured from cleanupManifestBlobs's enclosing
+// scope) saw the outer slice header at length 0 and never fired,
+// silently accumulating every chunk into one giant batch. This test
+// pins the contract that the helper drains via flush exactly every
+// s3MetaBatchOps appends, never building a slice longer than the cap.
+func TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps(t *testing.T) {
+	t.Parallel()
+
+	// Build a manifest part with chunkCount > 2× s3MetaBatchOps so the
+	// flush closure must fire at least twice, plus a tail flush from
+	// the caller's final flush() in cleanupManifestBlobs.
+	const chunkCount = 2*s3MetaBatchOps + 7
+	chunkSizes := make([]uint64, chunkCount)
+	for i := range chunkSizes {
+		chunkSizes[i] = 1
+	}
+	part := s3ObjectPart{
+		PartNo:      1,
+		PartVersion: 1,
+		ChunkSizes:  chunkSizes,
+	}
+
+	pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
+	flushCalls := 0
+	flushBatchSizes := make([]int, 0, 4)
+	flush := func() {
+		// Mirror cleanupManifestBlobs's flush: record the batch size
+		// and then truncate. If the helper's pointer plumbing is
+		// broken, len(pending) here would always be 0 and the
+		// recorded batch sizes would never match s3MetaBatchOps.
+		flushCalls++
+		flushBatchSizes = append(flushBatchSizes, len(pending))
+		pending = pending[:0]
+	}
+
+	srv := (*S3Server)(nil) // method body does not touch s
+	ok := srv.appendPartBlobKeys(&pending, "bucket", 1, "key", "upload", part, flush)
+	require.True(t, ok)
+
+	// Exactly two threshold-triggered flushes inside the helper:
+	// at append #s3MetaBatchOps and #2×s3MetaBatchOps. The 7-entry
+	// remainder is left in pending for the caller's tail flush().
+	require.Equal(t, 2, flushCalls,
+		"expected flush to fire twice (at append %d and %d); slice-by-value bug regressed?",
+		s3MetaBatchOps, 2*s3MetaBatchOps)
+	require.Equal(t, []int{s3MetaBatchOps, s3MetaBatchOps}, flushBatchSizes,
+		"each flush must drain exactly s3MetaBatchOps entries; pending must not silently overflow the cap")
+	require.Len(t, pending, 7,
+		"trailing 7 entries should remain for the caller's final flush()")
+}
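
Aside (not part of the patch): the appendPartBlobKeys comment above hinges on a detail of Go slices: a slice passed by value gives the callee a private copy of the slice header, so a closure over the caller's variable never sees the callee's appends. The following self-contained sketch reproduces that failure mode and the pointer-based fix using toy types only; batchCap, buggyAppend, and fixedAppend are illustrative stand-ins, not elastickv code.

package main

import "fmt"

const batchCap = 4 // stand-in for s3MetaBatchOps

// buggyAppend mirrors the old signature: pending arrives by value, so the
// caller's variable (and the flush closure over it) never observes the
// appends made here; flush keeps seeing length 0 and drains nothing.
func buggyAppend(pending []int, n int, flush func()) []int {
	for i := 0; i < n; i++ {
		pending = append(pending, i)
		if len(pending) >= batchCap {
			flush()
		}
	}
	return pending
}

// fixedAppend mirrors the new signature: appending through the pointer
// updates the caller's variable, so flush drains a full batch at the cap.
func fixedAppend(pending *[]int, n int, flush func()) {
	for i := 0; i < n; i++ {
		*pending = append(*pending, i)
		if len(*pending) >= batchCap {
			flush()
		}
	}
}

func main() {
	run := func(name string, do func(p *[]int, flush func())) {
		pending := make([]int, 0, batchCap)
		flushed := 0
		flush := func() { // mirrors the caller-side flush closure
			flushed += len(pending)
			pending = pending[:0]
		}
		do(&pending, flush)
		fmt.Printf("%s: flushed=%d leftover=%d\n", name, flushed, len(pending))
	}
	run("buggy", func(p *[]int, flush func()) { *p = buggyAppend(*p, 10, flush) })
	run("fixed", func(p *[]int, flush func()) { fixedAppend(p, 10, flush) })
}

Running it prints flushed=0 leftover=10 for the buggy variant and flushed=8 leftover=2 for the fixed one, which is the same drained-every-cap contract that TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps pins for the real helper.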