From 1cb65435a3b60699f17a823163d4bbd434f93148 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Mon, 25 May 2026 15:18:37 +0300 Subject: [PATCH 1/8] feat: implement `stcs` --- compaction/stcs.go | 108 ++++++++++++++++++++++++++++++++++++++++ compaction/stcs_test.go | 97 ++++++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 compaction/stcs.go create mode 100644 compaction/stcs_test.go diff --git a/compaction/stcs.go b/compaction/stcs.go new file mode 100644 index 00000000..54d9843f --- /dev/null +++ b/compaction/stcs.go @@ -0,0 +1,108 @@ +package compaction + +import ( + "cmp" + "slices" + + "github.com/ozontech/seq-db/frac" +) + +type strategySTCS struct { + // To trigger compaction of bucket there must be + // at least [mergeTrigger] fractions. + mergeTrigger int + + // At most this many fractions are compacted from a single bucket + // per compaction iteration. + mergeFanIn int + mergeFanOutSize uint64 + + // Fraction size must be within [bucketLowerbound, bucketUpperbound] * avg(bucket) + // to be considered part of the bucket. + bucketLowerbound float64 + bucketUpperbound float64 +} + +func (s strategySTCS) Pick(candidates []frac.Fraction) []frac.Fraction { + if len(candidates) < s.mergeTrigger { + return nil + } + + sorted := slices.Clone(candidates) + slices.SortFunc(sorted, func(a, b frac.Fraction) int { + return cmp.Compare(a.Info().IndexOnDisk, b.Info().IndexOnDisk) + }) + + buckets := s.group(sorted) + // We are interested in buckets with the most amount of fractions. + // Usually, these are the lowest tiers where all freshly sealed fractions end up. 
+ slices.SortFunc(buckets, func(x, y []frac.Fraction) int { + return -cmp.Compare(len(x), len(y)) + }) + + for _, bucket := range buckets { + if len(bucket) < s.mergeTrigger { + continue + } + + fracs := bucket[:min(len(bucket), s.mergeFanIn)] + if picked := s.takeUntilSize(fracs); len(picked) > 0 { + return picked + } + } + + return nil +} + +func (s strategySTCS) group(sorted []frac.Fraction) [][]frac.Fraction { + var ( + sum uint64 + current []frac.Fraction + buckets [][]frac.Fraction + ) + + for _, f := range sorted { + size := f.Info().IndexOnDisk + + if len(current) == 0 { + current = append(current, f) + sum = size + continue + } + + avg := float64(sum) / float64(len(current)) + fsize := float64(size) + + lower := avg * s.bucketLowerbound + upper := avg * s.bucketUpperbound + + if lower <= fsize && fsize <= upper { + current = append(current, f) + sum += size + continue + } + + buckets = append(buckets, current) + current = []frac.Fraction{f} + sum = size + } + + if len(current) > 0 { + buckets = append(buckets, current) + } + + return buckets +} + +func (s strategySTCS) takeUntilSize(fracs []frac.Fraction) []frac.Fraction { + var picked uint64 + + for i := range fracs { + picked += fracs[i].Info().IndexOnDisk + if picked >= s.mergeFanOutSize { + return fracs[:i] + } + } + + return fracs +} diff --git a/compaction/stcs_test.go b/compaction/stcs_test.go new file mode 100644 index 00000000..b846855a --- /dev/null +++ b/compaction/stcs_test.go @@ -0,0 +1,97 @@ +package compaction + +import ( + "context" + "math" + "testing" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/seq" + "github.com/stretchr/testify/require" +) + +type mockFraction struct { + indexOnDisk uint64 +} + +func (m *mockFraction) Info() *common.Info { + return &common.Info{IndexOnDisk: m.indexOnDisk} +} + +func (m *mockFraction) IsIntersecting(seq.MID, seq.MID) bool { + return false +} + 
+func (m *mockFraction) Contains(seq.MID) bool { + return false +} + +func (m *mockFraction) Fetch(context.Context, []seq.ID) ([][]byte, error) { + return nil, nil +} + +func (m *mockFraction) Search(context.Context, processor.SearchParams) (*seq.QPR, error) { + return nil, nil +} + +func (m *mockFraction) FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) { + return nil, nil +} + +func makeFracs(sizes ...uint64) []frac.Fraction { + out := make([]frac.Fraction, len(sizes)) + for i, s := range sizes { + out[i] = &mockFraction{indexOnDisk: s} + } + return out +} + +func TestSTCS_Pick(t *testing.T) { + s := strategySTCS{ + mergeTrigger: 4, + mergeFanIn: 32, + mergeFanOutSize: math.MaxUint64, + bucketLowerbound: 0.5, + bucketUpperbound: 1.5, + } + + t.Run("not-enough-candidates", func(t *testing.T) { + for n := range s.mergeTrigger { + require.Nil(t, s.Pick(makeFracs(make([]uint64, n)...))) + } + }) + + t.Run("requirement-not-met", func(t *testing.T) { + // Each fraction size is 10x the previous. + // They land in different buckets and no bucket with [mergeTrigger] fractions exists. + require.Nil(t, s.Pick(makeFracs(100, 1000, 10000, 100000))) + }) + + t.Run("one-bucket", func(t *testing.T) { + require.Len(t, s.Pick(makeFracs(1000, 1000, 1000, 1000)), 4) + }) + + t.Run("largest-bucket", func(t *testing.T) { + fracs := s.Pick(makeFracs( + 1000, 1000, + 100000, 100000, 100000, 100000, 100000, // Will take this bucket. 
+ )) + + require.Len(t, fracs, 5) + for _, f := range fracs { + require.Equal(t, uint64(100000), f.Info().IndexOnDisk) + } + }) + + t.Run("cap-at-fan-in", func(t *testing.T) { + sizes := make([]uint64, s.mergeFanIn+10) + + for i := range sizes { + sizes[i] = 5000 + } + + require.Len(t, s.Pick(makeFracs(sizes...)), s.mergeFanIn) + }) +} From 3118a7d06296bd5f18d6c2d25e4a7d6f25b1d762 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Wed, 27 May 2026 12:12:46 +0300 Subject: [PATCH 2/8] feat: add `FractionName` method for `FracManager` --- fracmanager/fracmanager.go | 14 ++++++++++++-- fracmanager/fraction_provider.go | 7 ++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 5dc808c7..9ac3c642 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -11,6 +11,7 @@ import ( "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" @@ -114,6 +115,12 @@ func (cs *CompactionSnapshot) Destroy() { } } +func (fm *FracManager) FractionName() string { + filePath := fileBasePattern + fm.lc.provider.nextFractionID() + baseFilePath := filepath.Join(fm.lc.provider.config.DataDir, filePath) + return baseFilePath +} + func (fm *FracManager) SealedFractionsSnapshot() []*frac.Sealed { return fm.lc.registry.sealedSnapshot() } @@ -126,8 +133,11 @@ func (fm *FracManager) ClaimForCompaction(names []string) (*CompactionSnapshot, return &CompactionSnapshot{claimed: claimed}, nil } -func (fm *FracManager) SubstituteWithSealed(produced *frac.Sealed, snapshot *CompactionSnapshot) { - fm.lc.registry.substituteWithSealed(produced, snapshot.claimed...) 
+func (fm *FracManager) SubstituteWithSealed(produced *sealed.PreloadedData, snapshot *CompactionSnapshot) { + fm.lc.registry.substituteWithSealed( + fm.lc.provider.NewSealedPreloaded(produced.Info.Path, produced), + snapshot.claimed..., + ) } func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) { diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index a3609b85..556eb2f1 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -5,6 +5,7 @@ import ( "io" "math/rand" "path/filepath" + "sync" "time" "github.com/oklog/ulid/v2" @@ -37,8 +38,10 @@ type fractionProvider struct { cacheProvider *CacheMaintainer // Cache provider for data access optimization activeIndexer *frac.ActiveIndexer // Indexer for active fractions readLimiter *storage.ReadLimiter // Read rate limiter - ulidEntropy io.Reader // Entropy source for ULID generation skipMaskProvider skipMaskProvider + + mu sync.Mutex + ulidEntropy io.Reader // Entropy source for ULID generation } func newFractionProvider( @@ -113,6 +116,8 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn // IMPORTANT: This method is not thread-safe. 
When used in concurrent environments, // external synchronization must be provided to avoid ID collisions func (fp *fractionProvider) nextFractionID() string { + fp.mu.Lock() + defer fp.mu.Unlock() return ulid.MustNew(ulid.Timestamp(time.Now()), fp.ulidEntropy).String() } From e0628ddc45f6f1ed4af6947fe8d28875d9a33673 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Wed, 27 May 2026 12:13:49 +0300 Subject: [PATCH 3/8] feat: first iteration on `planner` --- compaction/planner.go | 244 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 compaction/planner.go diff --git a/compaction/planner.go b/compaction/planner.go new file mode 100644 index 00000000..73b146ce --- /dev/null +++ b/compaction/planner.go @@ -0,0 +1,244 @@ +package compaction + +import ( + "context" + "maps" + "slices" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/alecthomas/units" + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/sealed" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" +) + +const ( + // TODO(dkharms): Move this options to config. + compactionTick = time.Second + compactionWindow = 24 * time.Hour +) + +type task struct { + bin time.Time + filename string + snapshot *fracmanager.CompactionSnapshot + onComplete func(result, error) +} + +type result struct { + filename string + consumed *fracmanager.CompactionSnapshot + produced *sealed.PreloadedData +} + +type planner struct { + wg sync.WaitGroup + ctx context.Context + done chan struct{} + + fm *fracmanager.FracManager + tasks chan task + + mu sync.RWMutex + // inflight tracks active compactions for each time-bin. + // We cannot have concurrent compactions within one time-bin for correctness purposes. 
+ inflight map[time.Time]struct{} + + stats map[time.Time]int +} + +func NewPlanner(ctx context.Context, fm *fracmanager.FracManager) *planner { + p := planner{ + ctx: ctx, + done: make(chan struct{}), + + fm: fm, + + tasks: make(chan task), + + inflight: make(map[time.Time]struct{}), + stats: make(map[time.Time]int), + } + + p.init() + return &p +} + +func (p *planner) init() { + p.wg.Go(func() { + t := time.NewTicker(compactionTick) + + for { + select { + case <-p.ctx.Done(): + close(p.tasks) + return + + case <-p.done: + close(p.tasks) + return + + case <-t.C: + task, ok := p.pick() + if !ok { + continue + } + + select { + case p.tasks <- task: + case <-time.NewTimer(time.Second).C: + // If all executor workers are busy for some long period + // we want to drop the task because it might contain stale decision. + } + } + } + }) +} + +func (p *planner) close() { + close(p.done) +} + +func (p *planner) pick() (task, bool) { + names := func(fracs []frac.Fraction) []string { + fnames := make([]string, len(fracs)) + for i := range fracs { + fnames[i] = fracs[i].Info().Name() + } + return fnames + } + + snapshot := p.fm.SealedFractionsSnapshot() + bins := p.distribute(compactionWindow, snapshot) + times := p.prioritize(bins) + + p.mu.Lock() + defer p.mu.Unlock() + + // NOTE(dkharms): This lock guards [inflight] map. + // Maybe I can find another way to signal from worker that time-bin is free? + + for _, t := range times { + if _, ok := p.inflight[t]; ok { + // There is on-going compaction within this time-bin. + continue + } + + // TODO(dkharms): Move this options to config. + picked := strategySTCS{ + mergeTrigger: 4, + mergeFanIn: 32, + mergeFanOutSize: 128 * uint64(units.MiB), + bucketLowerbound: 0.5, + bucketUpperbound: 1.5, + }.Pick(bins[t].fracs) + + if len(picked) == 0 { + // No candidates were found. 
+ continue + } + + csnapshot, err := p.fm.ClaimForCompaction(names(picked)) + if err != nil { + continue + } + + p.inflight[t] = struct{}{} + + return task{ + bin: t, + + filename: p.fm.FractionName(), + snapshot: csnapshot, + + onComplete: func(r result, err error) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.inflight, t) + + if err != nil { + logger.Error( + "failed to compact fractions", + zap.Error(err), + zap.Any("snapshot", csnapshot), + ) + return + } + + if r.produced == nil { + logger.Info( + "compaction did not produce fraction", + zap.Any("snapshot", csnapshot), + ) + return + } + + // TODO(dkharms): Is it fine to substitute and delete? + // We need somehow substitute and delete atomically. + p.fm.SubstituteWithSealed(r.produced, csnapshot) + csnapshot.Destroy() + }, + }, true + } + + return task{}, false +} + +type timestampBin struct { + t time.Time + fracs []frac.Fraction +} + +func (p *planner) distribute(window time.Duration, fracs []*frac.Sealed) map[time.Time]timestampBin { + bins := make(map[time.Time]timestampBin) + + for _, f := range fracs { + from, to := f.Info().From.Time(), f.Info().To.Time() + + // Do not handle fractions which have + // too wide date-range. + if to.Sub(from) > window { + continue + } + + bin := from.Truncate(window) + tb := bins[bin] + + tb.t = bin + tb.fracs = append(tb.fracs, f) + + bins[bin] = tb + } + + return bins +} + +func (p *planner) prioritize(bins map[time.Time]timestampBin) []time.Time { + // NOTE(dkharms): What other strategies we can use here? + // (*) Prioritize by change rate; + // (*) Prioritize by amount of fractions; + + ordered := slices.Collect(maps.Keys(bins)) + + // Order timestamp-bins by the change-rate. + // We will prioritize bins with higher change rate. 
+ slices.SortFunc(ordered, func(x, y time.Time) int { + xold, xnew := p.stats[x], len(bins[x].fracs) + yold, ynew := p.stats[y], len(bins[y].fracs) + xchange, ychange := xnew-xold, ynew-yold + + p.stats[x], p.stats[y] = xnew, ynew + + if xchange == ychange { + return -x.Compare(y) + } + + return -(xchange - ychange) + }) + + return ordered +} From 688cb8cc827b4e432a9a09e7ca5857640c4f9226 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Wed, 27 May 2026 12:14:07 +0300 Subject: [PATCH 4/8] feat: first iteration on `Executor` --- compaction/executor.go | 89 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 compaction/executor.go diff --git a/compaction/executor.go b/compaction/executor.go new file mode 100644 index 00000000..faabbc38 --- /dev/null +++ b/compaction/executor.go @@ -0,0 +1,89 @@ +package compaction + +import ( + "sync" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/logger" +) + +type Executor struct { + workers int + wg sync.WaitGroup + p *planner +} + +// FIXME(dkharms): I need to pass here [common.SealParams]. 
+func NewExecutor(workers int, p *planner) *Executor { + e := Executor{workers: workers, p: p} + e.init() + return &e +} + +func (e *Executor) Close() { + e.p.close() + e.wg.Wait() +} + +func (e *Executor) init() { + for range e.workers { + e.wg.Go(func() { + for t := range e.p.tasks { + logger.Info( + "got new compaction task", + zap.Time("bin", t.bin), + zap.Any("snapshot", t.snapshot), + ) + t.onComplete(e.compact(t)) + } + }) + } +} + +func (e *Executor) compact(t task) (result, error) { + var ( + names []string + srcs []Source + ) + + for _, f := range t.snapshot.Fractions() { + names = append(names, f.Info().Name()) + srcs = append(srcs, frac.NewSealedSource(f)) + } + + logger.Info( + "compacting fractions", + zap.Strings("names", names), + ) + + preloaded, err := Merge(t.filename, common.SealParams{}, srcs...) + return result{filename: t.filename, consumed: t.snapshot, produced: preloaded}, err +} + +type noopexecutor struct{} + +func (noopexecutor) Compact(t task) (result, error) { + var ( + sum int + cnt int + names []string + ) + + for _, f := range t.snapshot.Fractions() { + cnt += 1 + sum += int(f.Info().IndexOnDisk) + names = append(names, f.Info().Name()) + } + + logger.Info( + "picked fractions", + zap.Any("names", names), + zap.Int("size", sum), + zap.Int("count", cnt), + ) + + return result{}, nil +} From 14001ce495deb7e72527d67cb7629597a58cdcb0 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Wed, 27 May 2026 12:14:26 +0300 Subject: [PATCH 5/8] feat: add compaction executor startup --- storeapi/store.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/storeapi/store.go b/storeapi/store.go index dd53079e..c3ee0aca 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -8,6 +8,7 @@ import ( "go.uber.org/atomic" + "github.com/ozontech/seq-db/compaction" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" @@ -31,6 +32,7 @@ type Store struct { fracManagerStop 
func() SkipMaskManager *skipmaskmanager.SkipMaskManager + Executor *compaction.Executor isStopped atomic.Bool } @@ -66,12 +68,14 @@ func NewStore( } skipMaskManager := skipmaskmanager.New(ctx, c.SkipMaskManagerConfig, skipMaskParams, mappingProvider) - fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli, skipMaskManager) if err != nil { return nil, fmt.Errorf("loading fractions error: %w", err) } + planner := compaction.NewPlanner(ctx, fracManager) + executor := compaction.NewExecutor(10, planner) + skipMaskManager.Start(fracManager) return &Store{ @@ -82,6 +86,7 @@ func NewStore( FracManager: fracManager, fracManagerStop: stop, SkipMaskManager: skipMaskManager, + Executor: executor, isStopped: atomic.Bool{}, }, nil } @@ -107,6 +112,7 @@ func (s *Store) Stop() { s.grpcServer.Stop(ctx) s.fracManagerStop() s.SkipMaskManager.Stop() + s.Executor.Close() logger.Info("store stopped") } From b7ae92a97252c6419aea35dcf9217b41ea5e1c6b Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Wed, 27 May 2026 12:27:22 +0300 Subject: [PATCH 6/8] refactor: use local `fraction` interface --- compaction/planner.go | 9 +++++++-- compaction/stcs.go | 18 ++++++++---------- compaction/stcs_test.go | 31 ++++--------------------------- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/compaction/planner.go b/compaction/planner.go index 73b146ce..c3744608 100644 --- a/compaction/planner.go +++ b/compaction/planner.go @@ -11,11 +11,16 @@ import ( "github.com/alecthomas/units" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" ) +type fraction interface { + Info() *common.Info +} + const ( // TODO(dkharms): Move this options to config. 
compactionTick = time.Second @@ -104,7 +109,7 @@ func (p *planner) close() { } func (p *planner) pick() (task, bool) { - names := func(fracs []frac.Fraction) []string { + names := func(fracs []fraction) []string { fnames := make([]string, len(fracs)) for i := range fracs { fnames[i] = fracs[i].Info().Name() @@ -190,7 +195,7 @@ func (p *planner) pick() (task, bool) { type timestampBin struct { t time.Time - fracs []frac.Fraction + fracs []fraction } func (p *planner) distribute(window time.Duration, fracs []*frac.Sealed) map[time.Time]timestampBin { diff --git a/compaction/stcs.go b/compaction/stcs.go index 54d9843f..192951b8 100644 --- a/compaction/stcs.go +++ b/compaction/stcs.go @@ -3,8 +3,6 @@ package compaction import ( "cmp" "slices" - - "github.com/ozontech/seq-db/frac" ) type strategySTCS struct { @@ -23,20 +21,20 @@ type strategySTCS struct { bucketUpperbound float64 } -func (s strategySTCS) Pick(candidates []frac.Fraction) []frac.Fraction { +func (s strategySTCS) Pick(candidates []fraction) []fraction { if len(candidates) < s.mergeTrigger { return nil } sorted := slices.Clone(candidates) - slices.SortFunc(sorted, func(a, b frac.Fraction) int { + slices.SortFunc(sorted, func(a, b fraction) int { return cmp.Compare(a.Info().IndexOnDisk, b.Info().IndexOnDisk) }) buckets := s.group(sorted) // We are interested in buckets with the most amount of fractions. // Usually, these are the lowest tiers where all freshly sealed fractions end up. 
- slices.SortFunc(buckets, func(x, y []frac.Fraction) int { + slices.SortFunc(buckets, func(x, y []fraction) int { return -cmp.Compare(len(x), len(y)) }) @@ -54,11 +52,11 @@ func (s strategySTCS) Pick(candidates []frac.Fraction) []frac.Fraction { return nil } -func (s strategySTCS) group(sorted []frac.Fraction) [][]frac.Fraction { +func (s strategySTCS) group(sorted []fraction) [][]fraction { var ( sum uint64 - current []frac.Fraction - buckets [][]frac.Fraction + current []fraction + buckets [][]fraction ) for _, f := range sorted { @@ -83,7 +81,7 @@ func (s strategySTCS) group(sorted []frac.Fraction) [][]frac.Fraction { } buckets = append(buckets, current) - current = []frac.Fraction{f} + current = []fraction{f} sum = size } @@ -94,7 +92,7 @@ func (s strategySTCS) group(sorted []frac.Fraction) [][]frac.Fraction { return buckets } -func (s strategySTCS) takeUntilSize(fracs []frac.Fraction) []frac.Fraction { +func (s strategySTCS) takeUntilSize(fracs []fraction) []fraction { var picked uint64 for i := range fracs { diff --git a/compaction/stcs_test.go b/compaction/stcs_test.go index b846855a..3e79f6bc 100644 --- a/compaction/stcs_test.go +++ b/compaction/stcs_test.go @@ -1,15 +1,12 @@ package compaction import ( - "context" "math" "testing" - "github.com/ozontech/seq-db/frac" - "github.com/ozontech/seq-db/frac/common" - "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/seq" "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/frac/common" ) type mockFraction struct { @@ -20,28 +17,8 @@ func (m *mockFraction) Info() *common.Info { return &common.Info{IndexOnDisk: m.indexOnDisk} } -func (m *mockFraction) IsIntersecting(seq.MID, seq.MID) bool { - return false -} - -func (m *mockFraction) Contains(seq.MID) bool { - return false -} - -func (m *mockFraction) Fetch(context.Context, []seq.ID) ([][]byte, error) { - return nil, nil -} - -func (m *mockFraction) Search(context.Context, processor.SearchParams) (*seq.QPR, error) { - 
return nil, nil -} - -func (m *mockFraction) FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) { - return nil, nil -} - -func makeFracs(sizes ...uint64) []frac.Fraction { - out := make([]frac.Fraction, len(sizes)) +func makeFracs(sizes ...uint64) []fraction { + out := make([]fraction, len(sizes)) for i, s := range sizes { out[i] = &mockFraction{indexOnDisk: s} } From ec7be15f8ea7430e2cae09d144a4291abcaa4d52 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Wed, 27 May 2026 15:04:16 +0300 Subject: [PATCH 7/8] refactor: move to `frac_test` package --- compaction/executor.go | 30 +---- compaction/metrics.go | 1 + compaction/planner.go | 27 ++--- compaction/stcs_test.go | 2 +- frac/active_indexer_test.go | 9 +- frac/fraction_concurrency_test.go | 21 ++-- frac/fraction_test.go | 193 ++++++++++++++++++++++++------ frac/index_cache.go | 2 +- 8 files changed, 194 insertions(+), 91 deletions(-) create mode 100644 compaction/metrics.go diff --git a/compaction/executor.go b/compaction/executor.go index faabbc38..d3b1a55c 100644 --- a/compaction/executor.go +++ b/compaction/executor.go @@ -7,6 +7,7 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/logger" ) @@ -43,7 +44,7 @@ func (e *Executor) init() { } } -func (e *Executor) compact(t task) (result, error) { +func (e *Executor) compact(t task) (*sealed.PreloadedData, error) { var ( names []string srcs []Source @@ -60,30 +61,5 @@ func (e *Executor) compact(t task) (result, error) { ) preloaded, err := Merge(t.filename, common.SealParams{}, srcs...) 
- return result{filename: t.filename, consumed: t.snapshot, produced: preloaded}, err -} - -type noopexecutor struct{} - -func (noopexecutor) Compact(t task) (result, error) { - var ( - sum int - cnt int - names []string - ) - - for _, f := range t.snapshot.Fractions() { - cnt += 1 - sum += int(f.Info().IndexOnDisk) - names = append(names, f.Info().Name()) - } - - logger.Info( - "picked fractions", - zap.Any("names", names), - zap.Int("size", sum), - zap.Int("count", cnt), - ) - - return result{}, nil + return preloaded, err } diff --git a/compaction/metrics.go b/compaction/metrics.go new file mode 100644 index 00000000..d1d3cde1 --- /dev/null +++ b/compaction/metrics.go @@ -0,0 +1 @@ +package compaction diff --git a/compaction/planner.go b/compaction/planner.go index c3744608..3d46e282 100644 --- a/compaction/planner.go +++ b/compaction/planner.go @@ -10,7 +10,6 @@ import ( "go.uber.org/zap" "github.com/alecthomas/units" - "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/fracmanager" @@ -31,13 +30,7 @@ type task struct { bin time.Time filename string snapshot *fracmanager.CompactionSnapshot - onComplete func(result, error) -} - -type result struct { - filename string - consumed *fracmanager.CompactionSnapshot - produced *sealed.PreloadedData + onComplete func(*sealed.PreloadedData, error) } type planner struct { @@ -96,7 +89,7 @@ func (p *planner) init() { select { case p.tasks <- task: case <-time.NewTimer(time.Second).C: - // If all executor workers are busy for some long period + // If all executor workers are busy for some long period of time, // we want to drop the task because it might contain stale decision. 
} } @@ -117,7 +110,13 @@ func (p *planner) pick() (task, bool) { return fnames } - snapshot := p.fm.SealedFractionsSnapshot() + fractions := p.fm.SealedFractionsSnapshot() + snapshot := make([]fraction, len(fractions)) + + for i := range fractions { + snapshot[i] = fractions[i] + } + bins := p.distribute(compactionWindow, snapshot) times := p.prioritize(bins) @@ -160,7 +159,7 @@ func (p *planner) pick() (task, bool) { filename: p.fm.FractionName(), snapshot: csnapshot, - onComplete: func(r result, err error) { + onComplete: func(s *sealed.PreloadedData, err error) { p.mu.Lock() defer p.mu.Unlock() delete(p.inflight, t) @@ -174,7 +173,7 @@ func (p *planner) pick() (task, bool) { return } - if r.produced == nil { + if s == nil { logger.Info( "compaction did not produce fraction", zap.Any("snapshot", csnapshot), @@ -184,7 +183,7 @@ func (p *planner) pick() (task, bool) { // TODO(dkharms): Is it fine to substitute and delete? // We need somehow substitute and delete atomically. - p.fm.SubstituteWithSealed(r.produced, csnapshot) + p.fm.SubstituteWithSealed(s, csnapshot) csnapshot.Destroy() }, }, true @@ -198,7 +197,7 @@ type timestampBin struct { fracs []fraction } -func (p *planner) distribute(window time.Duration, fracs []*frac.Sealed) map[time.Time]timestampBin { +func (p *planner) distribute(window time.Duration, fracs []fraction) map[time.Time]timestampBin { bins := make(map[time.Time]timestampBin) for _, f := range fracs { diff --git a/compaction/stcs_test.go b/compaction/stcs_test.go index 3e79f6bc..5729c971 100644 --- a/compaction/stcs_test.go +++ b/compaction/stcs_test.go @@ -41,7 +41,7 @@ func TestSTCS_Pick(t *testing.T) { }) t.Run("requirement-not-met", func(t *testing.T) { - // Each fraction size is 10x the previous. + // Each Fraction size is 10x the previous. // They land in different buckets and no bucket with [mergeTrigger] fractions exists. 
require.Nil(t, s.Pick(makeFracs(100, 1000, 10000, 100000))) }) diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index a1200a7c..812b2763 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -1,4 +1,4 @@ -package frac +package frac_test import ( "bytes" @@ -12,6 +12,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric/stopwatch" @@ -76,20 +77,20 @@ func getTestProcessor() *indexer.Processor { func BenchmarkIndexer(b *testing.B) { logger.SetLevel(zapcore.FatalLevel) - idx, stop := NewActiveIndexer(8, 8) + idx, stop := frac.NewActiveIndexer(8, 8) defer stop() allLogs, err := readFileAllAtOnce(filepath.Join(common.TestDataDir, "k8s.logs")) readers := splitLogsToBulks(allLogs, 1000) assert.NoError(b, err) - active := NewActive( + active := frac.NewActive( filepath.Join(b.TempDir(), "test"), idx, storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index b37cf2a3..addf376d 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -1,4 +1,4 @@ -package frac +package frac_test import ( "fmt" @@ -14,6 +14,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/indexer" @@ -39,16 +40,16 @@ func TestConcurrentAppendAndQuery(t *testing.T) { fracPath := filepath.Join(tmpDir, "test_fraction") defer testcommon.RemoveDir(fracPath) - activeIndexer, stop := NewActiveIndexer(numIndexWorkers, 1000) + activeIndexer, stop := frac.NewActiveIndexer(numIndexWorkers, 1000) defer stop() - active := 
NewActive( + active := frac.NewActive( fracPath, activeIndexer, storage.NewReadLimiter(numReaders/2, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) @@ -154,7 +155,7 @@ const ( kafka = "kafka" ) -func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) { +func readTest(t *testing.T, fraction frac.Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) { readersGroup, ctx := errgroup.WithContext(t.Context()) type queryFilter func(doc *testDoc) bool @@ -332,7 +333,7 @@ func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time. return docs, bulks, fromTime, toTime } -func seal(active *Active) (*Sealed, error) { +func seal(active *frac.Active) (*frac.Sealed, error) { sealParams := common.SealParams{ IDsZstdLevel: 1, LIDsZstdLevel: 1, @@ -342,7 +343,7 @@ func seal(active *Active) (*Sealed, error) { DocBlocksZstdLevel: 1, DocBlockSize: 128 * int(units.KiB), } - activeSealingSource, err := NewActiveSealingSource(active, sealParams) + activeSealingSource, err := frac.NewActiveSealingSource(active, sealParams) if err != nil { return nil, err } @@ -351,13 +352,13 @@ func seal(active *Active) (*Sealed, error) { return nil, err } - sealed := NewSealedPreloaded( + sealed := frac.NewSealedPreloaded( active.BaseFileName, preloaded, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 0fee4795..b4bbe08a 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1,7 +1,9 @@ -package frac +package frac_test import ( "context" + cryptorand "crypto/rand" + "encoding/hex" "fmt" "math" "math/rand/v2" @@ -20,6 +22,8 @@ import ( "github.com/stretchr/testify/suite" 
"github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/compaction" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/indexer" @@ -42,20 +46,20 @@ func (testSkipMaskProvider) RemoveFrac(_ string) {} type FractionTestSuite struct { suite.Suite tmpDir string - config *Config + config *frac.Config mapping seq.Mapping tokenizers map[seq.TokenizerType]tokenizer.Tokenizer - activeIndexer *ActiveIndexer + activeIndexer *frac.ActiveIndexer stopIndexer func() sealParams common.SealParams - fraction Fraction + fraction frac.Fraction insertDocuments func(docs ...[]string) } func (s *FractionTestSuite) SetupSuiteCommon() { - s.activeIndexer, s.stopIndexer = NewActiveIndexer(4, 10) + s.activeIndexer, s.stopIndexer = frac.NewActiveIndexer(4, 10) } func (s *FractionTestSuite) TearDownSuiteCommon() { @@ -63,7 +67,7 @@ func (s *FractionTestSuite) TearDownSuiteCommon() { } func (s *FractionTestSuite) SetupTestCommon() { - s.config = &Config{} + s.config = &frac.Config{} s.tokenizers = map[seq.TokenizerType]tokenizer.Tokenizer{ seq.TokenizerTypeKeyword: tokenizer.NewKeywordTokenizer(20, false, true), seq.TokenizerTypeText: tokenizer.NewTextTokenizer(20, false, true, 100), @@ -110,6 +114,12 @@ func (s *FractionTestSuite) TearDownTestCommon() { s.NoError(err, "Failed to remove tmp dir") } +func randomHex(n int) string { + b := make([]byte, (n+1)/2) + cryptorand.Read(b) + return hex.EncodeToString(b)[:n] +} + func (s *FractionTestSuite) TestSearchKeyword() { docs := []string{ /*0*/ `{"timestamp":"2000-01-01T13:00:25Z","service":"service_a","message":"first message some text","trace_id":"abcdef","source":"prod01","level":"1"}`, @@ -1789,7 +1799,7 @@ func (s *FractionTestSuite) TestMIDDistribution() { s.insertDocuments(docs) - _, ok := s.fraction.(*Active) + _, ok := s.fraction.(*frac.Active) if ok { s.Require().Nil(s.fraction.Info().Distribution, "active fraction has MID 
distribution") return @@ -1828,15 +1838,15 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match") switch s.fraction.(type) { - case *Active: + case *frac.Active: s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(400), "meta on disk doesn't match. actual value: %d", info.MetaOnDisk) s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") - case *Sealed: + case *frac.Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) - case *Remote: + case *frac.Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1400), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) @@ -2035,9 +2045,10 @@ func (s *FractionTestSuite) AssertHist( } } -func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { - baseName := filepath.Join(s.tmpDir, "test_fraction") - active := NewActive( +func (s *FractionTestSuite) newActive(bulks ...[]string) *frac.Active { + baseName := filepath.Join(s.tmpDir, randomHex(12)) + + active := frac.NewActive( baseName, s.activeIndexer, storage.NewReadLimiter(1, nil), @@ -2081,20 +2092,20 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { return active } -func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { +func (s *FractionTestSuite) newSealed(bulks ...[]string) *frac.Sealed { active := s.newActive(bulks...) 
- activeSealingSource, err := NewActiveSealingSource(active, s.sealParams) + activeSealingSource, err := frac.NewActiveSealingSource(active, s.sealParams) s.Require().NoError(err, "Sealing source creation failed") preloaded, err := sealing.Seal(activeSealingSource, s.sealParams) s.Require().NoError(err, "Sealing failed") - sealed := NewSealedPreloaded( + sealed := frac.NewSealedPreloaded( active.BaseFileName, preloaded, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), s.config, testSkipMaskProvider{}, @@ -2127,7 +2138,7 @@ func (s *ActiveFractionTestSuite) SetupTest() { } func (s *ActiveFractionTestSuite) TearDownTest() { - if active, ok := s.fraction.(*Active); ok { + if active, ok := s.fraction.(*frac.Active); ok { active.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Active type") @@ -2145,7 +2156,7 @@ ActiveReplayedFractionTestSuite run tests for active fraction which was replayed */ type ActiveReplayedFractionTestSuite struct { FractionTestSuite - originalFrac *Active + originalFrac *frac.Active } func (s *ActiveReplayedFractionTestSuite) SetupSuite() { @@ -2166,26 +2177,29 @@ func (s *ActiveReplayedFractionTestSuite) SetupTest() { } } -func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction { - fracFileName := frac.BaseFileName - s.originalFrac = frac - replayedFrac := NewActive( +func (s *ActiveReplayedFractionTestSuite) Replay(f *frac.Active) frac.Fraction { + s.originalFrac = f + fracFileName := f.BaseFileName + + replayedFrac := frac.NewActive( fracFileName, s.activeIndexer, storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) + err := replayedFrac.Replay(context.Background()) s.Require().NoError(err, "replay failed") + return replayedFrac } func (s *ActiveReplayedFractionTestSuite) TearDownTest() { s.originalFrac.Release() - if active, ok := 
s.fraction.(*Active); ok { + if active, ok := s.fraction.(*frac.Active); ok { active.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Active type") @@ -2220,7 +2234,7 @@ func (s *SealedFractionTestSuite) SetupTest() { } func (s *SealedFractionTestSuite) TearDownTest() { - if sealed, ok := s.fraction.(*Sealed); ok { + if sealed, ok := s.fraction.(*frac.Sealed); ok { sealed.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Sealed type") @@ -2256,7 +2270,7 @@ func (s *SealedLoadedFractionTestSuite) SetupTest() { } func (s *SealedLoadedFractionTestSuite) TearDownTest() { - if sealed, ok := s.fraction.(*Sealed); ok { + if sealed, ok := s.fraction.(*frac.Sealed); ok { sealed.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Sealed type") @@ -2268,14 +2282,14 @@ func (s *SealedLoadedFractionTestSuite) TearDownSuite() { s.TearDownSuiteCommon() } -func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Sealed { +func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *frac.Sealed { sealed := s.newSealed(bulks...) 
sealed.Release() - sealed = NewSealed( + sealed = frac.NewSealed( sealed.BaseFileName, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), nil, s.config, @@ -2332,13 +2346,13 @@ func (s *RemoteFractionTestSuite) SetupTest() { s.Require().NoError(err, "offload failed") s.Require().True(offloaded, "didn't offload frac") - remoteFrac := NewRemote( + remoteFrac := frac.NewRemote( context.Background(), sealed.BaseFileName, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), - sealed.info, + sealed.Info(), s.config, s3cli, testSkipMaskProvider{}, @@ -2350,7 +2364,7 @@ func (s *RemoteFractionTestSuite) SetupTest() { } func (s *RemoteFractionTestSuite) TearDownTest() { - if remote, ok := s.fraction.(*Remote); ok { + if remote, ok := s.fraction.(*frac.Remote); ok { remote.Suicide() } else { s.Require().Nil(s.fraction, "fraction is not of Remote type") @@ -2364,6 +2378,113 @@ func (s *RemoteFractionTestSuite) TearDownSuite() { s.s3server.Close() } +type CompactedFractionTestSuite struct { + FractionTestSuite +} + +func (s *CompactedFractionTestSuite) SetupSuite() { + s.SetupSuiteCommon() +} + +func (s *CompactedFractionTestSuite) SetupTest() { + s.SetupTestCommon() + + s.insertDocuments = func(bulks ...[]string) { + if s.fraction != nil { + s.Require().Fail("can insert docs only once") + } + s.fraction = s.newCompacted(bulks...) + } +} + +func (s *CompactedFractionTestSuite) TearDownTest() { + if sealed, ok := s.fraction.(*frac.Sealed); ok { + sealed.Release() + } else { + s.Require().Nil(s.fraction, "fraction is not of Sealed type") + } + s.TearDownTestCommon() +} + +func (s *CompactedFractionTestSuite) TearDownSuite() { + s.TearDownSuiteCommon() +} + +// newCompacted flattens all bulks into one doc list, re-splits it into small bulks, +// seals each bulk as a separate fraction, and folds them one by one into a single fraction with compaction.Merge.
+func (s *CompactedFractionTestSuite) newCompacted(bulks ...[]string) *frac.Sealed { + // Flatten all documents because we are going to reorganize them. + var docs []string + for _, b := range bulks { + docs = append(docs, b...) + } + + var ( + reorganized [][]string + bulkSize = max(len(docs)/32, 1) + ) + + for i := 0; i < len(docs); i += bulkSize { + reorganized = append( + reorganized, + docs[i:min(i+bulkSize, len(docs))], + ) + } + + merged := s.newSealed(reorganized[0]) + for i, bulk := range reorganized[1:] { + current := s.newSealed(bulk) + + mergedBase := filepath.Join( + s.tmpDir, + fmt.Sprintf("merged-%d", i), + ) + + preloaded, err := compaction.Merge( + mergedBase, s.sealParams, + frac.NewSealedSource(merged), + frac.NewSealedSource(current), + ) + + s.Require().NoError(err) + merged = frac.NewSealedPreloaded( + mergedBase, + preloaded, + storage.NewReadLimiter(1, nil), + frac.NewIndexCache(), + cache.NewCache[[]byte](nil, nil), + s.config, + testSkipMaskProvider{}, + ) + } + + return merged +} + +// TestFractionInfo overrides the base test because DocsOnDisk is larger in a +// merged fraction (sum of two source docs files) and MIDsDistribution is not +// populated by compaction.Merge.
+func (s *CompactedFractionTestSuite) TestFractionInfo() { + docs := []string{ + `{"timestamp":"2000-01-01T13:00:25Z","service":"service_a","message":"first message some text", "container":"gateway"}`, + `{"timestamp":"2000-01-01T13:00:32Z","service":"service_b","message":"second message other text", "container":"kube-proxy"}`, + `{"timestamp":"2000-01-01T13:00:43Z","service":"service_c","message":"third message other text", "container":"gateway"}`, + `{"timestamp":"2000-01-01T13:00:53Z","service":"service_a","message":"fourth message some text", "container":"kube-proxy"}`, + `{"timestamp":"2000-01-01T13:00:54Z","service":"service_c","message":"apple","container":"kube-scheduler"}`, + } + + s.insertDocuments(docs) + + info := s.fraction.Info() + + s.Require().Equal(uint32(5), info.DocsTotal, "doc total doesn't match") + s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match") + s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match") + s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match") + s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match") + s.Require().True(info.IndexOnDisk > 0, "index on disk should be non-zero") +} + func TestActiveFractionTestSuite(t *testing.T) { suite.Run(t, new(ActiveFractionTestSuite)) } @@ -2383,3 +2504,7 @@ func TestSealedLoadedFractionTestSuite(t *testing.T) { func TestRemoteFractionTestSuite(t *testing.T) { suite.Run(t, new(RemoteFractionTestSuite)) } + +func TestCompactedFractionTestSuite(t *testing.T) { + suite.Run(t, new(CompactedFractionTestSuite)) +} diff --git a/frac/index_cache.go b/frac/index_cache.go index 043e8c5c..f270f209 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -7,7 +7,7 @@ import ( "github.com/ozontech/seq-db/frac/sealed/token" ) -func newIndexCache() *IndexCache { +func NewIndexCache() *IndexCache { return &IndexCache{ LegacyRegistry: cache.NewCache[[]byte](nil, nil), From ace0b9487ca7e34bf69c56ec260db67655306c2c Mon 
Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Fri, 29 May 2026 12:37:00 +0300 Subject: [PATCH 8/8] fix: respect `merge_trigger` --- compaction/executor.go | 6 +----- compaction/planner.go | 27 +++++++++++++-------------- compaction/stcs.go | 2 +- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/compaction/executor.go b/compaction/executor.go index d3b1a55c..13fcbeab 100644 --- a/compaction/executor.go +++ b/compaction/executor.go @@ -33,11 +33,6 @@ func (e *Executor) init() { for range e.workers { e.wg.Go(func() { for t := range e.p.tasks { - logger.Info( - "got new compaction task", - zap.Time("bin", t.bin), - zap.Any("snapshot", t.snapshot), - ) t.onComplete(e.compact(t)) } }) @@ -57,6 +52,7 @@ func (e *Executor) compact(t task) (*sealed.PreloadedData, error) { logger.Info( "compacting fractions", + zap.Time("bin", t.bin), zap.Strings("names", names), ) diff --git a/compaction/planner.go b/compaction/planner.go index 3d46e282..487890b6 100644 --- a/compaction/planner.go +++ b/compaction/planner.go @@ -102,14 +102,6 @@ func (p *planner) close() { } func (p *planner) pick() (task, bool) { - names := func(fracs []fraction) []string { - fnames := make([]string, len(fracs)) - for i := range fracs { - fnames[i] = fracs[i].Info().Name() - } - return fnames - } - fractions := p.fm.SealedFractionsSnapshot() snapshot := make([]fraction, len(fractions)) @@ -123,12 +115,11 @@ func (p *planner) pick() (task, bool) { p.mu.Lock() defer p.mu.Unlock() - // NOTE(dkharms): This lock guards [inflight] map. - // Maybe I can find another way to signal from worker that time-bin is free? - for _, t := range times { if _, ok := p.inflight[t]; ok { - // There is on-going compaction within this time-bin. + // NOTE(dkharms): Currently we allow only one ongoing compaction + // per time-bin; however, this might not be the best idea, + // so I will revise it later.
continue } @@ -168,7 +159,7 @@ func (p *planner) pick() (task, bool) { logger.Error( "failed to compact fractions", zap.Error(err), - zap.Any("snapshot", csnapshot), + zap.Any("snapshot", names(csnapshot.Fractions())), ) return } @@ -176,7 +167,7 @@ func (p *planner) pick() (task, bool) { if s == nil { logger.Info( "compaction did not produce fraction", - zap.Any("snapshot", csnapshot), + zap.Any("snapshot", names(csnapshot.Fractions())), ) return } @@ -246,3 +237,11 @@ func (p *planner) prioritize(bins map[time.Time]timestampBin) []time.Time { return ordered } + +func names[T interface{ Info() *common.Info }, S ~[]T](fracs S) []string { + fnames := make([]string, len(fracs)) + for i := range fracs { + fnames[i] = fracs[i].Info().Name() + } + return fnames +} diff --git a/compaction/stcs.go b/compaction/stcs.go index 192951b8..3c002fc2 100644 --- a/compaction/stcs.go +++ b/compaction/stcs.go @@ -44,7 +44,7 @@ func (s strategySTCS) Pick(candidates []fraction) []fraction { } fracs := bucket[:min(len(bucket), s.mergeFanIn)] - if picked := s.takeUntilSize(fracs); len(picked) > 0 { + if picked := s.takeUntilSize(fracs); len(picked) >= s.mergeTrigger { return picked } }