77 changes: 59 additions & 18 deletions adapter/s3.go
@@ -35,13 +35,44 @@ const (
s3LeaderHealthPath = "/healthz/leader"
s3HealthMaxRequestBodyBytes = 1024
s3ChunkSize = 1 << 20
s3ChunkBatchOps = 16
s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
s3DefaultRegion = "us-east-1"
s3MaxKeys = 1000
s3ListPageSize = 256
s3ManifestCleanupTimeout = 2 * time.Minute
s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
// s3ChunkBatchOps caps how many s3ChunkSize chunks fit in a single
// coordinator.Dispatch call on the data-write path
// (PutObject / UploadPart). Sized so the resulting Raft entry stays
// strictly under the post-PR-#593 default `MaxSizePerMsg = 4 MiB`
// even after protobuf framing overhead — each pb.Mutation carries
// the Op tag, Key tag + bytes, and Value length prefix; the
// pb.Request envelope wraps them; marshalRaftCommand prepends one
// byte. Empirically the per-Mutation overhead is ~60 B for normal
// keys and grows linearly with the bucket / objectKey length, so
// a `4 × 1 MiB` batch fills the cap with payload alone, so with
// overhead it lands *over* MaxSizePerMsg in practice
// and falls into etcd/raft's util.go:limitSize oversized-first-
// entry path, bypassing the documented
// `MaxInflight × MaxSizePerMsg` per-peer memory bound. Capping at
// `3 × 1 MiB` (about 3 MiB plus a few hundred encoded bytes) leaves ~1 MiB of headroom
// even with kilobyte-scale object keys, so the entry rides the
// normal batched-MsgApp path and the bound holds. Per-PUT Raft
// commit count grows ~5× from the pre-PR-#636 baseline (a 5 GiB
// PUT goes from 320 → ~1707 entries) but each fsync is ~5×
// smaller; the WAL group commit that landed in PR #600 absorbs the
// higher commit rate. See TestS3ChunkBatchFitsInRaftMaxSize
// for the encoded-size invariant.
s3ChunkBatchOps = 3
// s3MetaBatchOps batches key-only Del / scan ops on cleanup paths
// (cleanupPartBlobsAsync, deleteByPrefix, cleanupManifestBlobs).
// These ops carry no chunk payload, so the MaxSizePerMsg cap
// translates to a pure key-count budget: 64 BlobKey-shaped keys
// × ~100 B each ≈ 6 KiB per batch, three orders of magnitude
// under the 4 MiB limit. Keeping this batch large means a
// 5 GiB-object cleanup commits ~80 batches instead of ~1707, so
// orphaned-blob garbage collection finishes proportionally faster
// and does not amplify Raft load relative to the data-write path.
s3MetaBatchOps = 64
s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
s3DefaultRegion = "us-east-1"
s3MaxKeys = 1000
s3ListPageSize = 256
s3ManifestCleanupTimeout = 2 * time.Minute
s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.

s3TxnRetryInitialBackoff = 2 * time.Millisecond
s3TxnRetryMaxBackoff = 32 * time.Millisecond
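
For concreteness, the commit-count figures quoted in the s3ChunkBatchOps and s3MetaBatchOps comments are plain ceiling division over the constants above; a standalone sketch (constant values copied from this hunk rather than imported):

package main

import "fmt"

// ceilDiv mirrors the batch-count arithmetic cited in the s3.go comments.
func ceilDiv(a, b int) int { return (a + b - 1) / b }

func main() {
	const (
		chunkSize = 1 << 20 // s3ChunkSize
		maxObject = 5 << 30 // s3MaxObjectSizeBytes, 5 GiB
		oldBatch  = 16      // s3ChunkBatchOps before this change
		newBatch  = 3       // s3ChunkBatchOps after this change
		metaBatch = 64      // s3MetaBatchOps
	)
	chunks := maxObject / chunkSize         // 5120 chunks in a worst-case PUT
	fmt.Println(ceilDiv(chunks, oldBatch))  // 320 Raft entries per 5 GiB PUT (old)
	fmt.Println(ceilDiv(chunks, newBatch))  // 1707 entries (new), each ~5x smaller
	fmt.Println(ceilDiv(chunks, metaBatch)) // 80 key-only cleanup batches
}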
@@ -1876,7 +1907,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
defer func() { <-s.cleanupSem }()
ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
defer cancel()
pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
flush := func() {
if len(pending) == 0 {
return
@@ -1897,7 +1928,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
Op: kv.Del,
Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion),
})
if len(pending) >= s3ChunkBatchOps {
if len(pending) >= s3MetaBatchOps {
flush()
}
}
@@ -1930,7 +1961,7 @@ func (s *S3Server) deleteByPrefix(ctx context.Context, prefix []byte, bucket str
for {
readTS := s.readTS()
readPin := s.pinReadTS(readTS)
kvs, err := s.store.ScanAt(ctx, cursor, end, s3ChunkBatchOps, readTS)
kvs, err := s.store.ScanAt(ctx, cursor, end, s3MetaBatchOps, readTS)
P1: Keep blob cleanup scan page size independent of delete batch

deleteByPrefix now uses s3MetaBatchOps for ScanAt, so cleanup of blob prefixes scans 64 full chunk KVs per loop. In this codebase, ScanAt materializes values (e.g. store/mvcc_store.go clones Value in collectScanResults), and blob values are ~1 MiB each, so one cleanup worker can allocate/read ~64 MiB per page and up to ~1 GiB with 16 concurrent cleanup workers. This is a regression from the smaller scan page and can cause significant memory/GC and I/O spikes during abort/manifest cleanup even though the code only needs keys for deletes; consider keeping scan size small (or key-only scan) while still batching delete dispatches at 64.
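
A minimal sketch of that decoupling, assuming a hypothetical s3CleanupScanPageSize constant and reusing the ScanAt call and kv.Elem shape visible in this diff (the scan result's field name and the flush wiring are assumptions, not the repo's API):

// Hypothetical: a small scan page bounds materialized chunk values to
// ~16 MiB per loop, independent of the 64-op delete batch.
const s3CleanupScanPageSize = 16

// Inside the deleteByPrefix loop:
kvs, err := s.store.ScanAt(ctx, cursor, end, s3CleanupScanPageSize, readTS)
readPin.Release()
if err != nil {
	return err
}
for _, pair := range kvs {
	pending = append(pending, &kv.Elem[kv.OP]{
		Op:  kv.Del,
		Key: pair.Key, // field name assumed; only the key is needed, the ~1 MiB value is discarded
	})
	if len(pending) >= s3MetaBatchOps {
		flush() // still dispatches 64 Dels per Raft entry
	}
}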


readPin.Release()
if err != nil {
slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
@@ -2189,7 +2220,7 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
if s == nil || manifest == nil || manifest.UploadID == "" || s.coordinator == nil {
return
}
pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
flush := func() {
if len(pending) == 0 {
return
@@ -2206,29 +2237,39 @@
pending = pending[:0]
}
for _, part := range manifest.Parts {
var ok bool
if pending, ok = s.appendPartBlobKeys(pending, bucket, generation, objectKey, manifest.UploadID, part, flush); !ok {
if !s.appendPartBlobKeys(&pending, bucket, generation, objectKey, manifest.UploadID, part, flush) {
return
}
}
flush()
}

func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) ([]*kv.Elem[kv.OP], bool) {
// appendPartBlobKeys queues every blob-chunk Del for one manifest part
// onto *pending and triggers flush whenever the batch reaches
// s3MetaBatchOps. The slice is taken by pointer so that the caller's
// `flush` closure (which captures pending from the enclosing
// cleanupManifestBlobs scope) observes appends performed here. A
// previous value-passing version silently no-op'd flush — flush saw
// the outer `pending` whose header still pointed at length 0, and the
// helper accumulated every chunk into one batch on return, defeating
// the s3MetaBatchOps cap and re-opening the OOM / oversized-MsgApp
// risk the cap was meant to bound. See
// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps for the
// regression guard.
func (s *S3Server) appendPartBlobKeys(pending *[]*kv.Elem[kv.OP], bucket string, generation uint64, objectKey string, uploadID string, part s3ObjectPart, flush func()) bool {
for chunkNo := range part.ChunkSizes {
chunkIndex, err := uint64FromInt(chunkNo)
if err != nil {
return pending, false
return false
}
pending = append(pending, &kv.Elem[kv.OP]{
*pending = append(*pending, &kv.Elem[kv.OP]{
Op: kv.Del,
Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
})
if len(pending) >= s3ChunkBatchOps {
if len(*pending) >= s3MetaBatchOps {
flush()
}
}
return pending, true
return true
}

//nolint:cyclop // Proxying depends on root, bucket, and object-level leadership decisions.
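
The failure mode described in the appendPartBlobKeys comment is generic Go slice-header semantics rather than anything adapter-specific; a self-contained illustration (hypothetical names, independent of the repo's types):

package main

import "fmt"

// appendByValue receives a copy of the slice header. The append grows
// only the copy's length; the caller's header, and any closure that
// captured it, still reports length 0.
func appendByValue(p []string, flush func()) {
	p = append(p, "k1", "k2")
	flush() // prints 0: flush captured the caller's header, not p
}

func main() {
	pending := make([]string, 0, 8)
	flush := func() { fmt.Println("flush sees", len(pending), "entries") }
	appendByValue(pending, flush)
	// Taking *[]string instead, as appendPartBlobKeys now does, lets the
	// closure observe every append.
}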
194 changes: 194 additions & 0 deletions adapter/s3_chunk_batch_test.go
@@ -0,0 +1,194 @@
package adapter

import (
"strings"
"testing"

"github.com/bootjp/elastickv/internal/s3keys"
"github.com/bootjp/elastickv/kv"
pb "github.com/bootjp/elastickv/proto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

// raftMaxSizePerMsgPostPR593 is the post-PR-#593 default for
// etcd/raft's MaxSizePerMsg setting. Hardcoded here (rather than
// imported from internal/raftengine/etcd) so the test does not pull
// the engine package; the value is intentionally duplicated and pinned
// because the entire point of this test is to detect when an S3 batch
// silently grows past it.
const raftMaxSizePerMsgPostPR593 = 4 << 20

// TestS3ChunkBatchFitsInRaftMaxSize is the byte-budget invariant the
// s3ChunkBatchOps comment advertises: a worst-case S3 PutObject /
// UploadPart batch must encode strictly under
// raftMaxSizePerMsgPostPR593, plus the 1-byte raft framing prefix
// added by marshalRaftCommand.
//
// Without this guard, raising s3ChunkBatchOps or growing s3ChunkSize
// would silently route Raft entries through etcd/raft's
// oversized-first-entry path (util.go:limitSize, the "as an
// exception, if the size of the first entry exceeds maxSize, a
// non-empty slice with just this entry is returned" branch), which
// inflates the documented `MaxInflight × MaxSizePerMsg` per-peer
// memory bound.
func TestS3ChunkBatchFitsInRaftMaxSize(t *testing.T) {
t.Parallel()

// Worst-case key: a kilobyte-scale objectKey amplifies the
// per-Mutation envelope. Choose 1 KiB to model a deeply nested
// S3 path; longer keys are unusual but the headroom is generous.
bucket := "test-bucket"
objectKey := strings.Repeat("a", 1024)
uploadID := "upload-12345678901234567890"
const generation uint64 = 1
const partNo uint64 = 1

// Fill the chunk value with non-zero bytes so protobuf does not
// elide trailing zeros and underestimate the encoded size.
value := make([]byte, s3ChunkSize)
for i := range value {
value[i] = 0xAB
}

muts := make([]*pb.Mutation, 0, s3ChunkBatchOps)
for i := uint64(0); i < uint64(s3ChunkBatchOps); i++ {
key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
muts = append(muts, &pb.Mutation{
Op: pb.Op_PUT,
Key: key,
Value: value,
})
}

req := &pb.Request{
IsTxn: false,
Phase: pb.Phase_NONE,
Ts: 1234567890,
Mutations: muts,
}

encoded, err := proto.Marshal(req)
require.NoError(t, err)

// marshalRaftCommand prepends one framing byte (raftEncodeSingle
// or raftEncodeBatch). Account for it explicitly.
const raftFramingPrefix = 1
totalEntrySize := len(encoded) + raftFramingPrefix

require.Lessf(t,
totalEntrySize, raftMaxSizePerMsgPostPR593,
"S3 chunk batch entry must fit strictly under MaxSizePerMsg=%d to avoid the etcd/raft oversized-first-entry path; got %d (s3ChunkBatchOps=%d, s3ChunkSize=%d, objectKey=%dB)",
raftMaxSizePerMsgPostPR593, totalEntrySize, s3ChunkBatchOps, s3ChunkSize, len(objectKey),
)

// Sanity: the headroom should be meaningful (at least 64 KiB) so
// future small bumps in key length or Request envelope fields do
	// not silently push past the limit. The s3ChunkBatchOps comment
	// promises ~1 MiB of headroom; this 64 KiB floor is deliberately
	// looser so minor envelope growth does not flake the test.
const minHeadroom = 64 << 10
require.Greaterf(t,
raftMaxSizePerMsgPostPR593-totalEntrySize, minHeadroom,
"S3 chunk batch headroom under MaxSizePerMsg has fallen below %d B (got %d B); reduce s3ChunkBatchOps or s3ChunkSize",
minHeadroom, raftMaxSizePerMsgPostPR593-totalEntrySize,
)
}
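
As a behavioral model of the etcd/raft rule both size tests guard against: entries accumulate until maxSize would be exceeded, except that an oversized first entry is still sent alone. The sketch below paraphrases the documented limitSize semantics; it is not the upstream source:

// limitSizeModel returns how many entries of the given encoded sizes fit
// in one message under maxSize, with the quoted exception: the first
// entry always rides, even when it alone exceeds maxSize. That lone
// oversized entry is exactly what an over-budget S3 batch would become.
func limitSizeModel(entrySizes []int, maxSize int) int {
	if len(entrySizes) == 0 {
		return 0
	}
	total, n := entrySizes[0], 1
	for _, sz := range entrySizes[1:] {
		if total+sz > maxSize {
			break
		}
		total += sz
		n++
	}
	return n
}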

// TestS3MetaBatchFitsInRaftMaxSize is the same byte-budget invariant
// for the cleanup paths (cleanupPartBlobsAsync, deleteByPrefix,
// cleanupManifestBlobs). These ops carry no chunk payload so the
// batch is dominated by key bytes; even at the worst-case key length
// the total stays well under the cap. The test pins the headroom
// margin so a future bump in s3MetaBatchOps that pushes too far is
// caught at PR time.
func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
t.Parallel()

bucket := "test-bucket"
objectKey := strings.Repeat("a", 1024)
uploadID := "upload-12345678901234567890"
const generation uint64 = 1
const partNo uint64 = 1

muts := make([]*pb.Mutation, 0, s3MetaBatchOps)
for i := uint64(0); i < uint64(s3MetaBatchOps); i++ {
key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i)
muts = append(muts, &pb.Mutation{
Op: pb.Op_DEL,
Key: key,
})
}

req := &pb.Request{
IsTxn: false,
Phase: pb.Phase_NONE,
Ts: 1234567890,
Mutations: muts,
}

encoded, err := proto.Marshal(req)
require.NoError(t, err)

const raftFramingPrefix = 1
totalEntrySize := len(encoded) + raftFramingPrefix

require.Lessf(t,
totalEntrySize, raftMaxSizePerMsgPostPR593,
"S3 meta batch entry must fit under MaxSizePerMsg=%d; got %d (s3MetaBatchOps=%d, objectKey=%dB)",
raftMaxSizePerMsgPostPR593, totalEntrySize, s3MetaBatchOps, len(objectKey),
)
}

// TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps is the
// regression guard for the slice-by-value bug Gemini caught: a
// previous version of appendPartBlobKeys took `pending` by value, so
// the flush closure (captured from cleanupManifestBlobs's enclosing
// scope) saw the outer slice header at length 0 and never fired,
// silently accumulating every chunk into one giant batch. This test
// pins the contract that the helper drains via flush exactly every
// s3MetaBatchOps appends, never building a slice longer than the cap.
func TestAppendPartBlobKeys_FlushFiresEveryS3MetaBatchOps(t *testing.T) {
t.Parallel()

// Build a manifest part with chunkCount > 2× s3MetaBatchOps so the
// flush closure must fire at least twice, plus a tail flush from
// the caller's final flush() in cleanupManifestBlobs.
const chunkCount = 2*s3MetaBatchOps + 7
chunkSizes := make([]uint64, chunkCount)
for i := range chunkSizes {
chunkSizes[i] = 1
}
part := s3ObjectPart{
PartNo: 1,
PartVersion: 1,
ChunkSizes: chunkSizes,
}

pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
flushCalls := 0
flushBatchSizes := make([]int, 0, 4)
flush := func() {
// Mirror cleanupManifestBlobs's flush: record the batch size
// and then truncate. If the helper's pointer plumbing is
// broken, len(pending) here would always be 0 and the
// recorded batch sizes would never match s3MetaBatchOps.
flushCalls++
flushBatchSizes = append(flushBatchSizes, len(pending))
pending = pending[:0]
}

srv := (*S3Server)(nil) // method body does not touch s
ok := srv.appendPartBlobKeys(&pending, "bucket", 1, "key", "upload", part, flush)
require.True(t, ok)

// Exactly two threshold-triggered flushes inside the helper:
// at append #s3MetaBatchOps and #2×s3MetaBatchOps. The 7-entry
// remainder is left in pending for the caller's tail flush().
require.Equal(t, 2, flushCalls,
"expected flush to fire twice (at append %d and %d); slice-by-value bug regressed?",
s3MetaBatchOps, 2*s3MetaBatchOps)
require.Equal(t, []int{s3MetaBatchOps, s3MetaBatchOps}, flushBatchSizes,
"each flush must drain exactly s3MetaBatchOps entries; pending must not silently overflow the cap")
require.Len(t, pending, 7,
"trailing 7 entries should remain for the caller's final flush()")
}