Utility for migrating from memiavl to flatKV #3275

Merged
cody-littley merged 27 commits into main from cjl/migration-2 on Apr 23, 2026

Conversation

@cody-littley
Contributor

Describe your changes and provide context

This PR introduces a utility for migrating data from database A to database B. It's generic so that the migration logic is encapsulated in one place, and we can reuse the utility if we ever need another migration of a similar nature.
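
For context, the utility is generic over a few small plug-in points. A rough sketch of their shapes, inferred from how they're called in migration_manager.go (quoted in the review below); the exact definitions live elsewhere in the PR and may differ in detail:

```go
// Sketch only: signatures inferred from call sites in migration_manager.go.
type DBReader func(store string, key []byte) (value []byte, found bool, err error)
type DBWriter func(changesets []*proto.NamedChangeSet) error

type MigrationIterator interface {
	// NextBatch returns up to batchSize pairs still to migrate, plus the
	// boundary that applies once those pairs land in the new DB.
	NextBatch(batchSize int) (values []MigrationValue, boundary MigrationBoundary, err error)
	// SetBoundary positions the iterator just past a previously persisted
	// boundary, e.g. when resuming after a restart.
	SetBoundary(boundary MigrationBoundary)
}

// MigrationValue carries at least ModuleName, Key, and Value, as used by
// ApplyChangeSets below.
```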

Testing performed to validate your change

Lots of unit tests.

@github-actions

github-actions Bot commented Apr 20, 2026

The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).

Build      Format     Lint       Breaking   Updated (UTC)
✅ passed  ✅ passed  ✅ passed  ✅ passed  Apr 23, 2026, 7:56 PM

@codecov

codecov Bot commented Apr 20, 2026

Codecov Report

❌ Patch coverage is 78.59779% with 116 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.30%. Comparing base (321e02d) to head (3e2c8e4).
⚠️ Report is 3 commits behind head on main.

Files with missing lines                               | Patch % | Lines
sei-db/state_db/sc/migration/migration_metrics.go      | 10.71%  | 100 Missing ⚠️
sei-db/state_db/sc/migration/migration_manager.go      | 96.17%  | 4 Missing and 4 partials ⚠️
...tate_db/sc/migration/memiavl_migration_iterator.go  | 92.30%  | 3 Missing and 3 partials ⚠️
sei-db/state_db/sc/migration/migration_types.go        | 80.00%  | 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3275      +/-   ##
==========================================
+ Coverage   59.11%   59.30%   +0.18%     
==========================================
  Files        2085     2076       -9     
  Lines      170874   169824    -1050     
==========================================
- Hits       101019   100706     -313     
+ Misses      61059    60363     -696     
+ Partials     8796     8755      -41     
Flag         | Coverage Δ
sei-chain-pr | 78.59% <78.59%> (?)
sei-db       | 70.41% <ø> (ø)

Flags with carried forward coverage won't be shown.

Files with missing lines                               | Coverage Δ
sei-db/state_db/sc/migration/migration_boundary.go     | 100.00% <100.00%> (ø)
...b/state_db/sc/migration/mock_migration_iterator.go  | 100.00% <100.00%> (ø)
sei-db/state_db/sc/migration/migration_types.go        | 80.00% <80.00%> (ø)
...tate_db/sc/migration/memiavl_migration_iterator.go  | 92.30% <92.30%> (ø)
sei-db/state_db/sc/migration/migration_manager.go      | 96.17% <96.17%> (ø)
sei-db/state_db/sc/migration/migration_metrics.go      | 10.71% <10.71%> (ø)

... and 113 files with indirect coverage changes


newErr = e
newDone = true
case <-ctx.Done():
return ctx.Err()
Contributor

Do we need to drain both channels before returning here?

Contributor Author

If the context gets cancelled, this is basically a rugpull, and my goal is to tear down everything to avoid deadlocked threads. IMO, we should treat an unexpected context cancel as equivalent to "we're crashing now, free up everything in preparation for that".

The most useful time for this sort of teardown behavior is in unit tests. If we always use t.Context(), then we ensure that we don't leave behind deadlocked threads when a test ends. The goal is to make it so that each thread has a fast teardown mechanism for when we want to tear it down.
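
A minimal sketch of that pattern, with a hypothetical worker: the goroutine selects on ctx.Done(), and the test passes t.Context() (Go 1.24+), which is cancelled when the test ends, so the goroutine is released instead of deadlocking:

```go
package migration_test

import (
	"context"
	"testing"
)

// worker is a hypothetical goroutine body that blocks on a channel but can
// always be torn down via context cancellation.
func worker(ctx context.Context, jobs <-chan int) error {
	for {
		select {
		case j, ok := <-jobs:
			if !ok {
				return nil
			}
			_ = j // process the job
		case <-ctx.Done():
			// Rugpull teardown: abandon in-flight work rather than
			// leave a goroutine blocked forever.
			return ctx.Err()
		}
	}
}

func TestWorkerTearsDown(t *testing.T) {
	jobs := make(chan int) // never closed: only cancellation can end the worker
	go func() { _ = worker(t.Context(), jobs) }()
	// ... exercise the worker; when the test returns, t.Context() is
	// cancelled and the goroutine exits instead of deadlocking ...
}
```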

case MigrationComplete:
return len(treeNames)
}
return sort.Search(len(treeNames), func(i int) bool {
Contributor

I don't think the tree names are guaranteed to be sorted?

Contributor Author

They are currently sorted, but you make a good point: it's fragile for us to assume they will always be sorted. I added a defensive sort just in case.
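
Something along these lines (helper and parameter names hypothetical); sort.Search silently returns wrong answers on unsorted input, so sorting a copy first is cheap insurance:

```go
import "sort"

// boundaryIndex returns the index of the first tree name >= boundaryTree.
// Hypothetical helper illustrating the defensive sort: sort.Search requires
// sorted input, so we sort a copy rather than trust the caller's ordering.
func boundaryIndex(treeNames []string, boundaryTree string) int {
	names := append([]string(nil), treeNames...) // copy to avoid mutating the input
	sort.Strings(names)
	return sort.Search(len(names), func(i int) bool {
		return names[i] >= boundaryTree
	})
}
```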

// Write the value to the new DB.
putPair(newDBPairsByStore, value.ModuleName, &proto.KVPair{Key: value.Key, Value: value.Value})
// Delete the value from the old DB.
putPair(oldDBPairsByStore, value.ModuleName, &proto.KVPair{Key: value.Key, Delete: true})
Contributor

Would recommend adding a metric for how many keys are migrated successfully.

Contributor Author

Metrics added. The count metric will get reset after a restart or a state sync, but that should be ok as long as we are aware that it works that way.
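
The reset-on-restart behavior follows from the counter being purely in-memory. A sketch of those semantics; the internals here are illustrative, though the ReportKeysMigrated signature matches the call site in migration_manager.go below:

```go
import "sync/atomic"

// MigrationMetrics sketch: a process-local counter, so it restarts from zero
// after a reboot or state sync.
type MigrationMetrics struct {
	keysMigrated atomic.Int64
}

// ReportKeysMigrated records a migrated batch. Nil-safe, matching how
// migration_manager.go calls its metrics sink through nil-safe methods.
func (m *MigrationMetrics) ReportKeysMigrated(keys, keyBytes, valueBytes int64) {
	if m == nil {
		return
	}
	m.keysMigrated.Add(keys)
	_ = keyBytes // a real sink would also export byte counters
	_ = valueBytes
}
```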

Contributor

do we want to commit the binaries here?

Contributor Author

No, we don't! Removed.

This happened when I switched branches. It was in the .gitignore on that branch, but not this one.

Contributor

same here

Contributor Author

removed

// positioned at the start (boundary defaults to MigrationBoundaryNotStarted).
// If autoRebuild is true, the iterator re-reads from Data before every
// NextBatch call, so external mutations are picked up automatically.
func NewMapMigrationIterator(data map[string]map[string][]byte, autoRebuild bool) *MapMigrationIterator {
Contributor

Looks like this is only being used in the test files? How about renaming it to mock_migration_iterator_test.go?

Contributor Author

renamed

if m == nil || m.boundarySnapshot == nil {
return
}
go func() {
Contributor

The goroutine is fire-and-forget. Do we want to add a done chan struct{} or sync.WaitGroup and expose a Close() method or something?

Contributor Author

The goroutine dies when the context is cancelled. But it's not tricky to add an explicit Close() method. Added.
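
For reference, one common shape for this (a sketch, not necessarily the merged code): pair the context cancel with a done channel so Close blocks until the goroutine has actually exited. Names here are hypothetical:

```go
import (
	"context"
	"time"
)

// boundaryReporter is a hypothetical wrapper around the snapshot goroutine.
type boundaryReporter struct {
	cancel context.CancelFunc
	done   chan struct{}
}

func startBoundaryReporter(ctx context.Context, snapshot func()) *boundaryReporter {
	ctx, cancel := context.WithCancel(ctx)
	r := &boundaryReporter{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(r.done)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				snapshot()
			case <-ctx.Done():
				return // context cancel still tears the goroutine down
			}
		}
	}()
	return r
}

// Close stops the goroutine and waits for it to exit.
func (r *boundaryReporter) Close() {
	r.cancel()
	<-r.done
}
```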

Comment on lines +1 to +454
package migration

import (
"context"
"encoding/binary"
"errors"
"fmt"
"sort"
"time"

"github.com/sei-protocol/sei-chain/sei-db/proto"
"github.com/sei-protocol/seilog"
)

var logger = seilog.NewLogger("db", "state-db", "sc", "migration")

// MigrationManager handles migration from one database to another,
// routing reads and writes during the course of the migration.
//
// MigrationManager supports concurrent Read calls. ApplyChangeSets must not
// run concurrently with Read or with itself.
//
// A migration manager has two states: "migrating" and "passthrough".
//
// 1. Migrating: active migration. Reads split across old/new DBs by
// boundary, writes are routed across the boundary and applied to
// both DBs in parallel. Each block, N keys are deleted from the old DB
// and written to the new DB.
// 2. Passthrough: All reads/writes forwarded directly
// to the new DB. No boundary, no iterator.
type MigrationManager struct {

// For reading values out of the old database. May be nil once the
// manager is in the passthrough state (post-finalization or
// constructed at targetVersion).
oldDBReader DBReader

// For writing values to the old database. May be nil in passthrough
// (see oldDBReader).
oldDBWriter DBWriter

// For reading values out of the new database.
newDBReader DBReader

// For writing values to the new database.
newDBWriter DBWriter

// For iterating through key-value pairs to migrate in the old
// database. May be nil in passthrough.
iterator MigrationIterator

// The boundary of the migration. All keys to the left of (or equal
// to) the boundary are considered migrated. In passthrough this is
// pinned to MigrationBoundaryComplete, though Read short-circuits
// via migrationFinished before consulting the boundary.
boundary MigrationBoundary

// The number of key-value pairs to migrate after each write operation.
migrationBatchSize int

// The version we want to migrate to.
targetVersion uint64

// If true, then the migration has been fully completed.
migrationFinished bool

// Optional metrics sink. May be nil; all calls on this field go
// through nil-safe methods on *MigrationMetrics.
metrics *MigrationMetrics
}

// Handles the migration of data from one database to another.
func NewMigrationManager(
// The number of key-value pairs to migrate after each write operation. Must be > 0.
migrationBatchSize int,
// The migration version the stored data is expected to be at on entry. If no prior migration
// version is stored in the DB, startVersion should be 0.
startVersion uint64,
// The migration version after the migration is complete.
// Must be strictly greater than startVersion.
targetVersion uint64,
// For reading values out of the old database. May be nil iff the new
// DB already reports targetVersion.
oldDBReader DBReader,
// For writing values to the old database. May be nil iff the new DB
// already reports targetVersion.
oldDBWriter DBWriter,
// For reading values out of the new database.
newDBReader DBReader,
// For writing values to the new database.
newDBWriter DBWriter,
// For iterating through key-value pairs to migrate in the old
// database. May be nil iff the new DB already reports targetVersion.
iterator MigrationIterator,
// Optional metrics sink. Pass nil to disable metric emission.
metrics *MigrationMetrics,
) (*MigrationManager, error) {

// Always-required handles and parameters.
if newDBReader == nil {
return nil, errors.New("newDBReader must not be nil")
}
if newDBWriter == nil {
return nil, errors.New("newDBWriter must not be nil")
}
if migrationBatchSize <= 0 {
return nil, fmt.Errorf("migration batch size must be positive, got %d", migrationBatchSize)
}
if startVersion >= targetVersion {
return nil, fmt.Errorf("startVersion (%d) must be strictly less than targetVersion (%d)",
startVersion, targetVersion)
}

// Look up the version from the new DB first. If it's already at
// targetVersion the migration has completed on a prior boot and we
// don't need the old DB for anything.
currentMigrationVersion, versionKnown, err := readVersionFromDB(newDBReader)
if err != nil {
return nil, fmt.Errorf("failed to read migration version from new DB: %w", err)
}

if versionKnown {
if currentMigrationVersion == targetVersion {
// Passthrough path, migration already complete.
logger.Info("migration manager constructed in passthrough mode", "targetVersion", targetVersion)
metrics.SetVersion(targetVersion)
metrics.SetBoundary(MigrationBoundaryComplete)
return &MigrationManager{
newDBReader: newDBReader,
newDBWriter: newDBWriter,
boundary: MigrationBoundaryComplete,
migrationBatchSize: migrationBatchSize,
targetVersion: targetVersion,
migrationFinished: true,
metrics: metrics,
}, nil
}
if currentMigrationVersion != startVersion {
return nil, fmt.Errorf(
"unexpected migration version in new DB: expected %d (start) or %d (target), got %d",
startVersion, targetVersion, currentMigrationVersion)
}
}

// Migration is not complete, so we can't tolerate nil old DB accessors.
if oldDBReader == nil {
return nil, errors.New("oldDBReader must not be nil when new DB is not at targetVersion")
}
if oldDBWriter == nil {
return nil, errors.New("oldDBWriter must not be nil when new DB is not at targetVersion")
}
if iterator == nil {
return nil, errors.New("iterator must not be nil when new DB is not at targetVersion")
}

if !versionKnown {
// The version wasn't in the new DB, so read it from the old DB.
currentMigrationVersion, _, err = readVersionFromDB(oldDBReader)
if err != nil {
return nil, fmt.Errorf("failed to read migration version from old DB: %w", err)
}
if currentMigrationVersion != startVersion {
return nil, fmt.Errorf(
"unexpected migration version in old DB: expected %d, got %d", startVersion, currentMigrationVersion)
}
}

boundary, err := readMigrationBoundary(newDBReader)
if err != nil {
return nil, fmt.Errorf("failed to read migration boundary: %w", err)
}
iterator.SetBoundary(boundary)

logger.Info("initialized migration manager",
"startVersion", startVersion,
"targetVersion", targetVersion,
"boundary", boundary.String())

metrics.SetVersion(currentMigrationVersion)
metrics.SetBoundary(boundary)

return &MigrationManager{
oldDBReader: oldDBReader,
oldDBWriter: oldDBWriter,
newDBReader: newDBReader,
newDBWriter: newDBWriter,
iterator: iterator,
boundary: boundary,
migrationBatchSize: migrationBatchSize,
targetVersion: targetVersion,
metrics: metrics,
}, nil
}

// readMigrationBoundary reads the current migration boundary from the new
// database, or returns MigrationBoundaryNotStarted if none is stored yet.
func readMigrationBoundary(newDBReader DBReader) (MigrationBoundary, error) {
boundaryBytes, ok, err := newDBReader(MigrationStore, []byte(MigrationBoundaryKey))
if err != nil {
return MigrationBoundary{}, fmt.Errorf("failed to get migration boundary: %w", err)
}
if !ok {
return MigrationBoundaryNotStarted, nil
}
boundary, err := DeserializeMigrationBoundary(boundaryBytes)
if err != nil {
return MigrationBoundary{}, fmt.Errorf("failed to deserialize migration boundary: %w", err)
}
return boundary, nil
}

// readVersionFromDB reads MigrationVersionKey from the given DB's
// MigrationStore, returning (version, present, error). An absent key is
// reported as (0, false, nil) so the caller can distinguish "not set"
// from "explicitly zero".
//
// This helper takes a raw DBReader rather than going through
// MigrationManager.Read because the MigrationStore is reserved for
// internal use and MigrationManager.Read rejects reads against it.
func readVersionFromDB(reader DBReader) (uint64, bool, error) {
data, ok, err := reader(MigrationStore, []byte(MigrationVersionKey))
if err != nil {
return 0, false, err
}
if !ok {
return 0, false, nil
}
if len(data) != 8 {
return 0, false, fmt.Errorf(
"expected 8-byte migration version, got %d bytes", len(data))
}
return binary.BigEndian.Uint64(data), true, nil
}

// IsAtVersion reports whether the DB reached by reader is currently at the
// given migration version. An absent MigrationVersionKey is interpreted as
// version 0.
//
// Intended for callers that need to decide, before constructing a
// MigrationManager, whether to bother opening the legacy/old DB at all:
//
// atTarget, err := migration.IsAtVersion(newReader, targetVersion)
// if err != nil { /* handle */ }
// if atTarget {
// // Skip opening the old DB; just go straight to the new one.
// }
//
// This is a pure lookup; it does not mutate state or call any finalizer.
func IsAtVersion(reader DBReader, version uint64) (bool, error) {
v, _, err := readVersionFromDB(reader)
if err != nil {
return false, err
}
return v == version, nil
}

// Read a value from the database. If the requested value is migrated, read it from the new database.
// Otherwise, read it from the old database.
//
// Reads targeting MigrationStore are rejected with an error: that store
// is reserved for the manager's own bookkeeping.
//
// In passthrough (migrationFinished=true), all reads route to the new DB.
//
// Not safe to call concurrently with ApplyChangeSets.
func (m *MigrationManager) Read(store string, key []byte) ([]byte, bool, error) {
if store == MigrationStore {
// The migration module is reserved for internal use, do not permit outer scope reads from it.
return nil, false, fmt.Errorf("reads from the 'migration' module are not permitted")
}
if m.migrationFinished {
// We've finished the migration, all reads should go to the new DB.
return m.newDBReader(store, key)
}
if m.boundary.IsMigrated(store, key) {
// We are mid-migration and this key has already been migrated, read it from the new DB.
return m.newDBReader(store, key)
}
// We are mid-migration and this key has not been migrated, read it from the old DB.
return m.oldDBReader(store, key)
}

// ApplyChangeSets applies a batch of change sets to the database.
//
// Not safe to call concurrently with Read or itself.
func (m *MigrationManager) ApplyChangeSets(ctx context.Context, changesets []*proto.NamedChangeSet) error {
start := time.Now()
defer func() {
m.metrics.RecordApplyDuration(time.Since(start))
}()

if changesets == nil {
changesets = make([]*proto.NamedChangeSet, 0)
}
for _, cs := range changesets {
if cs.Name == MigrationStore {
// The migration module is reserved for internal use, do not permit outer scope writes to it.
return fmt.Errorf("writes to internal migration store %q are not permitted", MigrationStore)
}
}

if m.migrationFinished {
// Passthrough: migration is complete.
if err := m.newDBWriter(changesets); err != nil {
return fmt.Errorf("failed to apply changes to new database: %w", err)
}
return nil
}

// Get the next batch of keys to migrate.
valuesToMigrate, newBoundary, err := m.iterator.NextBatch(m.migrationBatchSize)
if err != nil {
return fmt.Errorf("failed to get next batch: %w", err)
}
m.boundary = newBoundary
m.migrationFinished = newBoundary.Status() == MigrationComplete
m.metrics.SetBoundary(newBoundary)

// Pairs destined for each DB, grouped by store name and keyed by KVPair.Key.
oldDBPairsByStore := make(map[string]map[string]*proto.KVPair)
newDBPairsByStore := make(map[string]map[string]*proto.KVPair)

// Create change sets that move the values to migrate from the old DB to the new DB.
var keyBytesThisBatch, valueBytesThisBatch int64
for _, value := range valuesToMigrate {
keyBytesThisBatch += int64(len(value.Key))
valueBytesThisBatch += int64(len(value.Value))
// Write the value to the new DB.
putPair(newDBPairsByStore, value.ModuleName, &proto.KVPair{Key: value.Key, Value: value.Value})
// Delete the value from the old DB.
putPair(oldDBPairsByStore, value.ModuleName, &proto.KVPair{Key: value.Key, Delete: true})
}
m.metrics.ReportKeysMigrated(int64(len(valuesToMigrate)), keyBytesThisBatch, valueBytesThisBatch)

// For each pair in the original change sets, route to the appropriate database.
// These must overwrite migrated values, so it's important to do this after we've collected
// the change set for the migrated values.
for _, changeSet := range changesets {
for _, pair := range changeSet.Changeset.Pairs {
if m.boundary.IsMigrated(changeSet.Name, pair.Key) {
putPair(newDBPairsByStore, changeSet.Name, pair)
} else {
putPair(oldDBPairsByStore, changeSet.Name, pair)
}
}
}

oldDBChangeSet := flattenPairsByStore(oldDBPairsByStore)
newDBChangeSets := flattenPairsByStore(newDBPairsByStore)

if m.migrationFinished {
// On the final block of the migration, update the migration version and delete the boundary.
versionBytes := make([]byte, 8)
binary.BigEndian.PutUint64(versionBytes, m.targetVersion)
newDBChangeSets = append(newDBChangeSets, &proto.NamedChangeSet{
Name: MigrationStore,
Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{
{Key: []byte(MigrationVersionKey), Value: versionBytes},
{Key: []byte(MigrationBoundaryKey), Delete: true},
}},
})
// Mirror the on-disk version bump in the in-memory metric so the
// version gauge and the boundary-snapshot loop see the
// completion at the same moment the DB does.
m.metrics.SetVersion(m.targetVersion)
} else {
// On every other block of the migration, update the boundary.
newDBChangeSets = append(newDBChangeSets, &proto.NamedChangeSet{
Name: MigrationStore,
Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{
{Key: []byte(MigrationBoundaryKey), Value: newBoundary.Serialize()},
}},
})
}

// Apply changes to each database in parallel.
oldDBErr := make(chan error, 1)
newDBErr := make(chan error, 1)
go func() {
err := m.oldDBWriter(oldDBChangeSet)
if err != nil {
err = fmt.Errorf("failed to apply changes to old database: %w", err)
}
oldDBErr <- err
}()
go func() {
err := m.newDBWriter(newDBChangeSets)
if err != nil {
err = fmt.Errorf("failed to apply changes to new database: %w", err)
}
newDBErr <- err
}()

// Wait for both writers to finish.
var oldErr, newErr error
oldDone, newDone := false, false
for !oldDone || !newDone {
select {
case e := <-oldDBErr:
oldErr = e
oldDone = true
case e := <-newDBErr:
newErr = e
newDone = true
case <-ctx.Done():
return ctx.Err()
}
}

if err := errors.Join(oldErr, newErr); err != nil {
return fmt.Errorf("failed to apply changes to databases: %w", err)
}

return nil
}

// putPair inserts pair into dest under (store, pair.Key), creating the inner
// map on demand. Later writes to the same (store, key) overwrite earlier ones.
func putPair(dest map[string]map[string]*proto.KVPair, store string, pair *proto.KVPair) {
byKey, ok := dest[store]
if !ok {
byKey = make(map[string]*proto.KVPair)
dest[store] = byKey
}
byKey[string(pair.Key)] = pair
}

// flattenPairsByStore collapses a store-keyed map of (key -> KVPair) into one
// NamedChangeSet per store, with stores and pairs emitted in sorted order for
// deterministic downstream writes.
func flattenPairsByStore(pairsByStore map[string]map[string]*proto.KVPair) []*proto.NamedChangeSet {
storeNames := make([]string, 0, len(pairsByStore))
for name := range pairsByStore {
storeNames = append(storeNames, name)
}
sort.Strings(storeNames)

changeSets := make([]*proto.NamedChangeSet, 0, len(storeNames))
for _, name := range storeNames {
byKey := pairsByStore[name]
pairs := make([]*proto.KVPair, 0, len(byKey))
for _, pair := range byKey {
pairs = append(pairs, pair)
}
sort.Slice(pairs, func(i, j int) bool {
return string(pairs[i].Key) < string(pairs[j].Key)
})
changeSets = append(changeSets, &proto.NamedChangeSet{
Name: name,
Changeset: proto.ChangeSet{Pairs: pairs},
})
}
return changeSets
}
Contributor

During block processing, could queries arrive while ApplyChangeSets runs? Do we need an RWMutex here?

Contributor Author

Requires discussion.

Although it is currently thread safe to read values concurrently with a write, we don't have an API for atomic batch reads. Even without this code, a batch read might return an inconsistent slice of data (i.e. some data from block N, other data from block N+1).

Contributor Author

Added a mutex to make this safe.
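
A sketch of that guard, assuming a sync.RWMutex field (mu) added to MigrationManager: Reads share the lock while ApplyChangeSets takes it exclusively, so the boundary and both DBs can't move mid-read. The unexported helpers stand in for the routing logic shown in the file above and are hypothetical:

```go
import "sync"

// Illustrative only: mu is a sync.RWMutex field added to MigrationManager,
// and read/applyChangeSets are the unguarded bodies shown earlier.
func (m *MigrationManager) Read(store string, key []byte) ([]byte, bool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.read(store, key)
}

func (m *MigrationManager) ApplyChangeSets(ctx context.Context, changesets []*proto.NamedChangeSet) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.applyChangeSets(ctx, changesets)
}
```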

buf := make([]byte, 1+4+len(nameBytes)+len(mb.key))
buf[0] = byte(mb.status)
binary.BigEndian.PutUint32(buf[1:5], uint32(len(nameBytes))) //nolint:gosec // module names are short strings
copy(buf[5:5+len(nameBytes)], nameBytes)
Contributor

The number 5 (1-byte status + 4-byte length prefix) appears ~8 times across Serialize and DeserializeMigrationBoundary. Consider adding a single constant (like const headerSize = 5).

Contributor Author

done
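
For the record, the shape of that change (a sketch; field names are approximated from the snippet quoted above):

```go
import "encoding/binary"

// headerSize = 1 status byte + 4-byte big-endian module-name length prefix.
const headerSize = 1 + 4

func (mb MigrationBoundary) Serialize() []byte {
	nameBytes := []byte(mb.moduleName) // field name approximated
	buf := make([]byte, headerSize+len(nameBytes)+len(mb.key))
	buf[0] = byte(mb.status)
	binary.BigEndian.PutUint32(buf[1:headerSize], uint32(len(nameBytes))) //nolint:gosec // module names are short strings
	copy(buf[headerSize:headerSize+len(nameBytes)], nameBytes)
	copy(buf[headerSize+len(nameBytes):], mb.key)
	return buf
}
```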

@cody-littley cody-littley added this pull request to the merge queue Apr 23, 2026
Merged via the queue into main with commit afbc900 Apr 23, 2026
38 checks passed
@cody-littley cody-littley deleted the cjl/migration-2 branch April 23, 2026 21:08