From 2a36fce31c49b5d018e3cefc6f1b97a1ed5a6bfa Mon Sep 17 00:00:00 2001 From: blindchaser Date: Mon, 16 Mar 2026 00:43:48 -0400 Subject: [PATCH 1/6] feat: per-db lthash --- sei-db/state_db/sc/flatkv/importer.go | 3 + sei-db/state_db/sc/flatkv/keys.go | 44 +- sei-db/state_db/sc/flatkv/keys_test.go | 32 +- .../state_db/sc/flatkv/perdb_lthash_test.go | 636 ++++++++++++++++++ sei-db/state_db/sc/flatkv/snapshot.go | 5 + sei-db/state_db/sc/flatkv/store.go | 52 +- sei-db/state_db/sc/flatkv/store_catchup.go | 3 + sei-db/state_db/sc/flatkv/store_meta.go | 138 +++- sei-db/state_db/sc/flatkv/store_write.go | 46 +- 9 files changed, 924 insertions(+), 35 deletions(-) create mode 100644 sei-db/state_db/sc/flatkv/perdb_lthash_test.go diff --git a/sei-db/state_db/sc/flatkv/importer.go b/sei-db/state_db/sc/flatkv/importer.go index 6dfb7eea3c..32530f5256 100644 --- a/sei-db/state_db/sc/flatkv/importer.go +++ b/sei-db/state_db/sc/flatkv/importer.go @@ -76,6 +76,9 @@ func (imp *KVImporter) Close() error { imp.store.committedVersion = imp.version imp.store.committedLtHash = imp.store.workingLtHash.Clone() + for dbDir, h := range imp.store.perDBWorkingLtHash { + imp.store.perDBCommittedLtHash[dbDir] = h.Clone() + } if err := imp.store.commitGlobalMetadata(imp.version, imp.store.committedLtHash); err != nil { return fmt.Errorf("import global metadata: %w", err) } diff --git a/sei-db/state_db/sc/flatkv/keys.go b/sei-db/state_db/sc/flatkv/keys.go index b5ef53963d..432ff5444b 100644 --- a/sei-db/state_db/sc/flatkv/keys.go +++ b/sei-db/state_db/sc/flatkv/keys.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/binary" "fmt" + + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/lthash" ) // DBLocalMetaKey is the key for per-DB local metadata. @@ -28,31 +30,53 @@ const ( BalanceLen = 32 NonceLen = 8 - // localMetaSize is the serialized size of LocalMeta (version = 8 bytes) - localMetaSize = 8 + // localMetaVersionOnly is the serialized size of the old format (version only). + localMetaVersionOnly = 8 + // localMetaWithLtHash is the serialized size with LtHash (version + 2048 bytes). + localMetaWithLtHash = localMetaVersionOnly + lthash.LtHashBytes ) // LocalMeta stores per-DB version tracking metadata. // Stored inside each DB at DBLocalMetaKey (0x00). type LocalMeta struct { - CommittedVersion int64 // Current committed version in this DB + CommittedVersion int64 // Current committed version in this DB + LtHash *lthash.LtHash // nil for old format (version-only) } -// MarshalLocalMeta encodes LocalMeta as fixed 8 bytes (big-endian). +// MarshalLocalMeta encodes LocalMeta to bytes. +// If LtHash is non-nil: 8 + 2048 = 2056 bytes. Otherwise: 8 bytes (backward compat). func MarshalLocalMeta(m *LocalMeta) []byte { - buf := make([]byte, localMetaSize) + if m.LtHash != nil { + buf := make([]byte, localMetaWithLtHash) + binary.BigEndian.PutUint64(buf, uint64(m.CommittedVersion)) //nolint:gosec // version is always non-negative + m.LtHash.MarshalTo(buf[localMetaVersionOnly:]) + return buf + } + buf := make([]byte, localMetaVersionOnly) binary.BigEndian.PutUint64(buf, uint64(m.CommittedVersion)) //nolint:gosec // version is always non-negative return buf } // UnmarshalLocalMeta decodes LocalMeta from bytes. +// Accepts 8-byte (old, LtHash=nil) and 2056-byte (new, with LtHash) formats. func UnmarshalLocalMeta(data []byte) (*LocalMeta, error) { - if len(data) != localMetaSize { - return nil, fmt.Errorf("invalid LocalMeta size: got %d, want %d", len(data), localMetaSize) + switch len(data) { + case localMetaVersionOnly: + return &LocalMeta{ + CommittedVersion: int64(binary.BigEndian.Uint64(data)), //nolint:gosec // version won't exceed int64 max + }, nil + case localMetaWithLtHash: + h, err := lthash.Unmarshal(data[localMetaVersionOnly:]) + if err != nil { + return nil, fmt.Errorf("unmarshal LocalMeta LtHash: %w", err) + } + return &LocalMeta{ + CommittedVersion: int64(binary.BigEndian.Uint64(data[:localMetaVersionOnly])), //nolint:gosec // version won't exceed int64 max + LtHash: h, + }, nil + default: + return nil, fmt.Errorf("invalid LocalMeta size: got %d, want %d or %d", len(data), localMetaVersionOnly, localMetaWithLtHash) } - return &LocalMeta{ - CommittedVersion: int64(binary.BigEndian.Uint64(data)), //nolint:gosec // version won't exceed int64 max - }, nil } // Address is an EVM address (20 bytes). diff --git a/sei-db/state_db/sc/flatkv/keys_test.go b/sei-db/state_db/sc/flatkv/keys_test.go index b4dc64368b..d1f0e5d8bc 100644 --- a/sei-db/state_db/sc/flatkv/keys_test.go +++ b/sei-db/state_db/sc/flatkv/keys_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "testing" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/lthash" "github.com/stretchr/testify/require" ) @@ -241,31 +242,52 @@ func TestLocalMetaSerialization(t *testing.T) { t.Run("RoundTripZero", func(t *testing.T) { original := &LocalMeta{CommittedVersion: 0} encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaSize, len(encoded)) + require.Equal(t, localMetaVersionOnly, len(encoded)) decoded, err := UnmarshalLocalMeta(encoded) require.NoError(t, err) require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) + require.Nil(t, decoded.LtHash) }) t.Run("RoundTripPositive", func(t *testing.T) { original := &LocalMeta{CommittedVersion: 12345} encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaSize, len(encoded)) + require.Equal(t, localMetaVersionOnly, len(encoded)) decoded, err := UnmarshalLocalMeta(encoded) require.NoError(t, err) require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) + require.Nil(t, decoded.LtHash) }) t.Run("RoundTripMaxInt64", func(t *testing.T) { original := &LocalMeta{CommittedVersion: math.MaxInt64} encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaSize, len(encoded)) + require.Equal(t, localMetaVersionOnly, len(encoded)) decoded, err := UnmarshalLocalMeta(encoded) require.NoError(t, err) require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) + require.Nil(t, decoded.LtHash) + }) + + t.Run("RoundTripWithLtHash", func(t *testing.T) { + h := lthash.New() + h.MixIn(func() *lthash.LtHash { + pairs := []lthash.KVPairWithLastValue{{Key: []byte("k"), Value: []byte("v")}} + r, _ := lthash.ComputeLtHash(nil, pairs) + return r + }()) + original := &LocalMeta{CommittedVersion: 42, LtHash: h} + encoded := MarshalLocalMeta(original) + require.Equal(t, localMetaWithLtHash, len(encoded)) + + decoded, err := UnmarshalLocalMeta(encoded) + require.NoError(t, err) + require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) + require.NotNil(t, decoded.LtHash) + require.True(t, original.LtHash.Equal(decoded.LtHash)) }) t.Run("InvalidLength", func(t *testing.T) { @@ -274,8 +296,8 @@ func TestLocalMetaSerialization(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "invalid LocalMeta size") - // Too long - _, err = UnmarshalLocalMeta(make([]byte, localMetaSize+1)) + // Neither old nor new format + _, err = UnmarshalLocalMeta(make([]byte, localMetaVersionOnly+1)) require.Error(t, err) require.Contains(t, err.Error(), "invalid LocalMeta size") }) diff --git a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go new file mode 100644 index 0000000000..b4f14366c2 --- /dev/null +++ b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go @@ -0,0 +1,636 @@ +package flatkv + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/pebbledb" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/lthash" + scTypes "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types" + iavl "github.com/sei-protocol/sei-chain/sei-iavl/proto" + "github.com/stretchr/testify/require" +) + +// fullScanPerDBLtHash computes LtHash for each data DB individually via full scan. +func fullScanPerDBLtHash(t *testing.T, s *CommitStore) map[string]*lthash.LtHash { + t.Helper() + result := make(map[string]*lthash.LtHash, 4) + for dbDir, db := range map[string]types.KeyValueDB{ + accountDBDir: s.accountDB, + codeDBDir: s.codeDB, + storageDBDir: s.storageDB, + legacyDBDir: s.legacyDB, + } { + h, err := fullScanDBLtHash(db) + require.NoError(t, err) + result[dbDir] = h + } + return result +} + +// verifyPerDBLtHash checks that the in-memory per-DB working hashes +// match a full scan of each respective database. +func verifyPerDBLtHash(t *testing.T, s *CommitStore) { + t.Helper() + scanned := fullScanPerDBLtHash(t, s) + for dbDir, scanHash := range scanned { + require.True(t, s.perDBWorkingLtHash[dbDir].Equal(scanHash), + "per-DB LtHash mismatch for %s:\n working: %x\n fullscan: %x", + dbDir, s.perDBWorkingLtHash[dbDir].Checksum(), scanHash.Checksum()) + } +} + +// commitMixedState applies changesets with data across all 4 DB types. +func commitMixedState(t *testing.T, s *CommitStore, round int) { + t.Helper() + addr := addrN(byte(round)) + slot := slotN(byte(round)) + legacyKey := append([]byte{0x09}, addr[:]...) + + cs1 := namedCS( + noncePair(addr, uint64(round)), + codeHashPair(addr, codeHashN(byte(round))), + codePair(addr, []byte{0x60, 0x80, byte(round)}), + storagePair(addr, slot, []byte{byte(round), 0xAA}), + ) + cs2 := makeChangeSet(legacyKey, []byte{byte(round), 0xBB}, false) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1, cs2})) + _, err := s.Commit() + require.NoError(t, err) +} + +// deletePerDBKeysFromMetadataDB simulates a pre-upgrade store by removing +// per-DB LTHash keys from metadataDB. +func deletePerDBKeysFromMetadataDB(t *testing.T, metadataDBPath string) { + t.Helper() + db, err := pebbledb.Open(context.Background(), metadataDBPath, types.OpenOptions{}, false) + require.NoError(t, err) + for _, metaKey := range perDBLtHashKeys { + _ = db.Delete([]byte(metaKey), types.WriteOptions{}) + } + require.NoError(t, db.Close()) +} + +// simulateUpgrade removes per-DB LTHash keys from both the snapshot and +// working directory's metadataDB, and removes SNAPSHOT_BASE to force re-clone. +func simulateUpgrade(t *testing.T, flatkvDir string) { + t.Helper() + snapDir, _, err := currentSnapshotDir(flatkvDir) + require.NoError(t, err) + deletePerDBKeysFromMetadataDB(t, filepath.Join(snapDir, metadataDir)) + + workingMetaPath := filepath.Join(flatkvDir, workingDirName, metadataDir) + if _, err := os.Stat(workingMetaPath); err == nil { + deletePerDBKeysFromMetadataDB(t, workingMetaPath) + } + _ = os.Remove(filepath.Join(flatkvDir, workingDirName, snapshotBaseFile)) +} + +// Test 1: Crash recovery with global/local skew -- verify per-DB LTHash +// is correct after catchup replays the skewed version. +func TestPerDBLtHashSkewRecovery(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + verifyPerDBLtHash(t, s1) + require.NoError(t, s1.Close()) + + // Tamper with accountDB's LocalMeta to simulate incomplete commit + // (accountDB thinks it's at v1, but global says v2) + flatkvDir := dbDir + snapDir, _, err := currentSnapshotDir(flatkvDir) + require.NoError(t, err) + + accountDBPath := filepath.Join(snapDir, accountDBDir) + db, err := pebbledb.Open(t.Context(), accountDBPath, types.OpenOptions{}, false) + require.NoError(t, err) + lagMeta := &LocalMeta{CommittedVersion: 1} + require.NoError(t, db.Set(DBLocalMetaKey, MarshalLocalMeta(lagMeta), types.WriteOptions{Sync: true})) + require.NoError(t, db.Close()) + + // Reopen -- should detect skew and catchup + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + defer s2.Close() + + require.Equal(t, int64(2), s2.Version()) + verifyPerDBLtHash(t, s2) + verifyLtHashAtHeight(t, s2, 2) +} + +// Test 2: Upgrade from old format -- delete per-DB keys from metadataDB, +// reopen, verify backfill produces correct per-DB hashes AFTER catchup. +func TestPerDBLtHashUpgradeBackfill(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + commitMixedState(t, s1, 3) + verifyPerDBLtHash(t, s1) + require.NoError(t, s1.Close()) + + // Simulate pre-upgrade format by removing per-DB keys + simulateUpgrade(t, dbDir) + + // Reopen -- triggers backfill + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + defer s2.Close() + + require.Equal(t, int64(3), s2.Version()) + require.False(t, s2.needsPerDBBackfill, "backfill should have cleared the flag") + verifyPerDBLtHash(t, s2) + verifyLtHashAtHeight(t, s2, 3) +} + +// Test 3: Upgrade + skew combined -- old format + tampered LocalMeta version, +// verify backfill after catchup still produces correct per-DB hashes. +func TestPerDBLtHashUpgradeWithSkew(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + require.NoError(t, s1.Close()) + + flatkvDir := dbDir + + // Simulate upgrade + simulateUpgrade(t, flatkvDir) + + // Tamper accountDB in snapshot to simulate crash skew (version 1 instead of 2) + snapDir, _, err := currentSnapshotDir(flatkvDir) + require.NoError(t, err) + accountDBPath := filepath.Join(snapDir, accountDBDir) + db, err := pebbledb.Open(t.Context(), accountDBPath, types.OpenOptions{}, false) + require.NoError(t, err) + lagMeta := &LocalMeta{CommittedVersion: 1} + require.NoError(t, db.Set(DBLocalMetaKey, MarshalLocalMeta(lagMeta), types.WriteOptions{Sync: true})) + require.NoError(t, db.Close()) + + // Reopen -- catchup replays v2 with zero per-DB hashes, then backfill scans + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + defer s2.Close() + + require.Equal(t, int64(2), s2.Version()) + verifyPerDBLtHash(t, s2) + verifyLtHashAtHeight(t, s2, 2) +} + +// Test 4: Crash between catchup metadata flush and backfill -- simulate crash +// after catchup commitGlobalMetadata but before backfill persist, reopen and +// verify per-DB keys are still absent so backfill re-triggers. +func TestPerDBLtHashCrashBetweenCatchupAndBackfill(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + commitMixedState(t, s1, 3) + require.NoError(t, s1.Close()) + + flatkvDir := dbDir + simulateUpgrade(t, flatkvDir) + + // Open without calling backfill by doing a manual partial open: + // open() calls loadGlobalMetadata which sets needsPerDBBackfill = true. + // Then catchup runs, calling commitGlobalMetadata -- it must NOT write per-DB keys. + // We then close without running backfill to simulate a crash. + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + require.NoError(t, s2.open()) + require.True(t, s2.needsPerDBBackfill, "backfill flag should be set after load") + require.NoError(t, s2.catchup(0)) + require.NoError(t, s2.Close()) + + // Verify per-DB keys are still absent in the working dir's metadataDB. + // (open() cloned from snapshot which has keys deleted.) + workingMetaPath := filepath.Join(flatkvDir, workingDirName, metadataDir) + mdb, err := pebbledb.Open(t.Context(), workingMetaPath, types.OpenOptions{}, false) + require.NoError(t, err) + for dbName, metaKey := range perDBLtHashKeys { + _, getErr := mdb.Get([]byte(metaKey)) + require.Error(t, getErr, "per-DB key %s (%s) should still be absent after crash window", metaKey, dbName) + } + require.NoError(t, mdb.Close()) + + // Full reopen -- should re-trigger backfill + _ = os.Remove(filepath.Join(flatkvDir, workingDirName, snapshotBaseFile)) + s3 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s3.LoadVersion(0, false) + require.NoError(t, err) + defer s3.Close() + + require.Equal(t, int64(3), s3.Version()) + require.False(t, s3.needsPerDBBackfill) + verifyPerDBLtHash(t, s3) + verifyLtHashAtHeight(t, s3, 3) +} + +// Test 5: Per-DB full scan verification after restart. +func TestPerDBLtHashPersistenceAfterReopen(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + for i := 1; i <= 10; i++ { + commitMixedState(t, s1, i) + } + verifyPerDBLtHash(t, s1) + require.NoError(t, s1.Close()) + + // Reopen and verify + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + defer s2.Close() + + require.Equal(t, int64(10), s2.Version()) + verifyPerDBLtHash(t, s2) + verifyLtHashAtHeight(t, s2, 10) + + // Also verify committed hashes match working hashes (just opened, no pending writes) + for dbDir, wh := range s2.perDBWorkingLtHash { + ch := s2.perDBCommittedLtHash[dbDir] + require.True(t, wh.Equal(ch), + "per-DB committed and working hashes should match on open for %s", dbDir) + } +} + +// Test 6: ReadOnly open does not persist backfill. +func TestPerDBLtHashReadOnlyNoPersist(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + require.NoError(t, s1.Close()) + + simulateUpgrade(t, dbDir) + + // Open as readonly -- backfill runs in memory + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + roStore, err := s2.LoadVersion(0, true) + require.NoError(t, err) + roCS := roStore.(*CommitStore) + verifyPerDBLtHash(t, roCS) + require.NoError(t, roStore.Close()) + + // Open as read-write -- should still need backfill since readonly didn't persist + s3 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s3.LoadVersion(0, false) + require.NoError(t, err) + defer s3.Close() + + require.False(t, s3.needsPerDBBackfill, "backfill should have run and persisted on RW open") + verifyPerDBLtHash(t, s3) + verifyLtHashAtHeight(t, s3, 2) +} + +// Test 7: Verify per-DB LTHash alongside global in the incremental 100-block test. +func TestPerDBLtHashIncrementalEqualsFullScan(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + // Blocks 1-10: accounts + storage + code + legacy + for i := 1; i <= 10; i++ { + addr := addrN(byte(i)) + slot := slotN(byte(i)) + legacyKey := append([]byte{0x09}, addr[:]...) + + cs1 := namedCS( + noncePair(addr, uint64(i)), + storagePair(addr, slot, []byte{byte(i), 0xAA}), + ) + cs2 := makeChangeSet(legacyKey, []byte{byte(i)}, false) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1, cs2})) + commitAndCheck(t, s) + } + verifyPerDBLtHash(t, s) + verifyLtHashAtHeight(t, s, 10) + + // Blocks 11-15: deploy code + for i := 11; i <= 15; i++ { + addr := addrN(byte(i - 10)) + ch := codeHashN(byte(i)) + cs := namedCS( + codeHashPair(addr, ch), + codePair(addr, []byte{0x60, 0x80, byte(i)}), + ) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) + commitAndCheck(t, s) + } + verifyPerDBLtHash(t, s) + + // Blocks 16-20: update storage + delete some storage + for i := 16; i <= 18; i++ { + addr := addrN(byte(i - 15)) + slot := slotN(byte(i - 15)) + cs := namedCS(storagePair(addr, slot, []byte{byte(i), 0xBB})) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) + commitAndCheck(t, s) + } + for i := 19; i <= 20; i++ { + addr := addrN(byte(i - 15)) + slot := slotN(byte(i - 15)) + cs := namedCS(storageDeletePair(addr, slot)) + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) + commitAndCheck(t, s) + } + verifyPerDBLtHash(t, s) + verifyLtHashAtHeight(t, s, 20) +} + +// Test: sum of per-DB hashes equals global hash (homomorphic property). +func TestPerDBLtHashSumEqualsGlobal(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + for i := 1; i <= 5; i++ { + commitMixedState(t, s, i) + } + + // Compute the "sum" of all per-DB hashes + sumHash := lthash.New() + for _, dbDir := range []string{accountDBDir, codeDBDir, storageDBDir, legacyDBDir} { + sumHash.MixIn(s.perDBWorkingLtHash[dbDir]) + } + + require.True(t, s.workingLtHash.Equal(sumHash), + "sum of per-DB LtHashes should equal global LtHash:\n global: %x\n sum: %x", + s.workingLtHash.Checksum(), sumHash.Checksum()) +} + +// Test: per-DB hashes are correct after catchup with WAL replay. +func TestPerDBLtHashCatchupReplay(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + require.NoError(t, s1.WriteSnapshot("")) + + commitMixedState(t, s1, 3) + commitMixedState(t, s1, 4) + commitMixedState(t, s1, 5) + verifyPerDBLtHash(t, s1) + + // Save expected hashes for comparison + expectedPerDB := make(map[string][32]byte, 4) + for dbDir, h := range s1.perDBWorkingLtHash { + expectedPerDB[dbDir] = h.Checksum() + } + require.NoError(t, s1.Close()) + + // Reopen: catchup from v2 snapshot through v3,v4,v5 via WAL + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + defer s2.Close() + + require.Equal(t, int64(5), s2.Version()) + for dbDir, expectedCS := range expectedPerDB { + actualCS := s2.perDBWorkingLtHash[dbDir].Checksum() + require.Equal(t, expectedCS, actualCS, + "per-DB LtHash mismatch for %s after catchup", dbDir) + } + verifyPerDBLtHash(t, s2) +} + +// Test: per-DB LtHash with empty blocks doesn't drift. +func TestPerDBLtHashEmptyBlocks(t *testing.T) { + s := setupTestStore(t) + defer s.Close() + + commitMixedState(t, s, 1) + checksums := make(map[string][32]byte) + for dbDir, h := range s.perDBWorkingLtHash { + checksums[dbDir] = h.Checksum() + } + + // 5 empty blocks + for i := 0; i < 5; i++ { + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{namedCS()})) + commitAndCheck(t, s) + } + + for dbDir, expected := range checksums { + actual := s.perDBWorkingLtHash[dbDir].Checksum() + require.Equal(t, expected, actual, + "empty blocks should not change per-DB LtHash for %s", dbDir) + } +} + +// Test: per-DB hashes after import via Importer. +func TestPerDBLtHashAfterImport(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s.LoadVersion(0, false) + require.NoError(t, err) + + imp, err := s.Importer(1) + require.NoError(t, err) + + for i := byte(1); i <= 5; i++ { + addr := addrN(i) + slot := slotN(i) + imp.AddNode(&scTypes.SnapshotNode{Key: storagePair(addr, slot, []byte{i, 0xAA}).Key, Value: storagePair(addr, slot, []byte{i, 0xAA}).Value, Height: 0}) + imp.AddNode(&scTypes.SnapshotNode{Key: noncePair(addr, uint64(i)).Key, Value: noncePair(addr, uint64(i)).Value, Height: 0}) + } + require.NoError(t, imp.Close()) + + verifyPerDBLtHash(t, s) + verifyLtHashAtHeight(t, s, 1) + + for dbDir, wh := range s.perDBWorkingLtHash { + ch := s.perDBCommittedLtHash[dbDir] + require.True(t, wh.Equal(ch), + "per-DB committed and working hashes should match after import for %s", dbDir) + } + require.NoError(t, s.Close()) +} + +// Test: per-DB hashes survive rollback. +func TestPerDBLtHashRollback(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s, 1) + commitMixedState(t, s, 2) + commitMixedState(t, s, 3) + require.NoError(t, s.WriteSnapshot("")) + + commitMixedState(t, s, 4) + commitMixedState(t, s, 5) + + // Rollback to v3 + require.NoError(t, s.Rollback(3)) + require.Equal(t, int64(3), s.Version()) + verifyPerDBLtHash(t, s) + verifyLtHashAtHeight(t, s, 3) + + require.NoError(t, s.Close()) +} + +// Test: rollback on a store that was upgraded (missing per-DB keys in metadataDB). +// This exercises the backfill path inside Rollback. +func TestPerDBLtHashRollbackAfterUpgrade(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s, 1) + commitMixedState(t, s, 2) + commitMixedState(t, s, 3) + require.NoError(t, s.WriteSnapshot("")) + + commitMixedState(t, s, 4) + commitMixedState(t, s, 5) + require.NoError(t, s.Close()) + + simulateUpgrade(t, dbDir) + + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + + require.NoError(t, s2.Rollback(3)) + require.Equal(t, int64(3), s2.Version()) + verifyPerDBLtHash(t, s2) + verifyLtHashAtHeight(t, s2, 3) + + require.NoError(t, s2.Close()) +} + +// Test: backfill persists correct LtHash to each DB's LocalMeta. +func TestPerDBLtHashBackfillUpdatesLocalMeta(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s1.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s1, 1) + commitMixedState(t, s1, 2) + require.NoError(t, s1.Close()) + + simulateUpgrade(t, dbDir) + + // Reopen -- triggers backfill which should also update LocalMeta + s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err = s2.LoadVersion(0, false) + require.NoError(t, err) + defer s2.Close() + + // Verify LocalMeta has LtHash for each DB + for dbDir, meta := range s2.localMeta { + require.NotNil(t, meta.LtHash, + "LocalMeta.LtHash should be set for %s after backfill", dbDir) + expected := s2.perDBWorkingLtHash[dbDir] + require.True(t, meta.LtHash.Equal(expected), + "LocalMeta.LtHash should match working hash for %s:\n meta: %x\n working: %x", + dbDir, meta.LtHash.Checksum(), expected.Checksum()) + } +} + +// Test: per-DB keys are present in metadataDB after normal commit cycle. +func TestPerDBLtHashPersistedInMetadataDB(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s.LoadVersion(0, false) + require.NoError(t, err) + + commitMixedState(t, s, 1) + commitMixedState(t, s, 2) + + // Read per-DB keys directly from metadataDB + for dbDir, metaKey := range perDBLtHashKeys { + data, err := s.metadataDB.Get([]byte(metaKey)) + require.NoError(t, err, "per-DB key %s should exist in metadataDB after commit", dbDir) + h, err := lthash.Unmarshal(data) + require.NoError(t, err) + require.True(t, s.perDBCommittedLtHash[dbDir].Equal(h), + "metadataDB per-DB hash should match committed hash for %s", dbDir) + } + + require.NoError(t, s.Close()) +} + +func TestPerDBLtHashAfterDirectImport(t *testing.T) { + dir := t.TempDir() + dbDir := filepath.Join(dir, flatkvRootDir) + + s := NewCommitStore(t.Context(), dbDir, DefaultConfig()) + _, err := s.LoadVersion(0, false) + require.NoError(t, err) + + var pairs []*iavl.KVPair + for i := byte(1); i <= 10; i++ { + addr := addrN(i) + slot := slotN(i) + pairs = append(pairs, + storagePair(addr, slot, []byte{i, 0xAA}), + noncePair(addr, uint64(i)), + ) + } + + cs := &proto.NamedChangeSet{ + Name: "evm", + Changeset: iavl.ChangeSet{Pairs: pairs}, + } + require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs})) + commitAndCheck(t, s) + + verifyPerDBLtHash(t, s) + verifyLtHashAtHeight(t, s, 1) + require.NoError(t, s.Close()) +} diff --git a/sei-db/state_db/sc/flatkv/snapshot.go b/sei-db/state_db/sc/flatkv/snapshot.go index 93f0a328e3..ecf91cdb9a 100644 --- a/sei-db/state_db/sc/flatkv/snapshot.go +++ b/sei-db/state_db/sc/flatkv/snapshot.go @@ -592,6 +592,11 @@ func (s *CommitStore) Rollback(targetVersion int64) error { if err := s.catchup(targetVersion); err != nil { return fmt.Errorf("catchup after rollback: %w", err) } + if s.needsPerDBBackfill { + if err := s.backfillPerDBLtHashes(true); err != nil { + return fmt.Errorf("rollback per-DB LtHash backfill: %w", err) + } + } if s.committedVersion != targetVersion { return fmt.Errorf("rollback failed: wanted version %d but reached %d (WAL may be incomplete)", diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index e37cbc8437..f6afdfe284 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -45,10 +45,22 @@ const ( // Metadata DB keys MetaGlobalVersion = "_meta/version" // Global committed version watermark (8 bytes) MetaGlobalLtHash = "_meta/hash" // Global LtHash (2048 bytes) + MetaAccountLtHash = "_meta/hash/account" + MetaCodeLtHash = "_meta/hash/code" + MetaStorageLtHash = "_meta/hash/storage" + MetaLegacyLtHash = "_meta/hash/legacy" flatkvMeterName = "seidb_flatkv" ) +// perDBLtHashKeys maps DB dir names to their per-DB LTHash keys in metadataDB. +var perDBLtHashKeys = map[string]string{ + accountDBDir: MetaAccountLtHash, + codeDBDir: MetaCodeLtHash, + storageDBDir: MetaStorageLtHash, + legacyDBDir: MetaLegacyLtHash, +} + // pendingKVWrite tracks a buffered key-value write for code/storage DBs. type pendingKVWrite struct { key []byte // Internal DB key @@ -89,6 +101,12 @@ type CommitStore struct { committedLtHash *lthash.LtHash workingLtHash *lthash.LtHash + // Per-DB LTHash tracking. Authoritative copies live in metadataDB; + // secondary copies are written to each DB's LocalMeta for integrity verification. + perDBCommittedLtHash map[string]*lthash.LtHash + perDBWorkingLtHash map[string]*lthash.LtHash + needsPerDBBackfill bool // true when per-DB keys are absent from metadataDB (upgrade) + // Pending writes buffer // accountWrites: key = address string (20 bytes), value = AccountValue // codeWrites/storageWrites/legacyWrites: key = internal DB key string, value = raw bytes @@ -134,7 +152,19 @@ func NewCommitStore( pendingChangeSets: make([]*proto.NamedChangeSet, 0), committedLtHash: lthash.New(), workingLtHash: lthash.New(), - phaseTimer: metrics.NewPhaseTimer(meter, "seidb_main_thread"), + perDBCommittedLtHash: map[string]*lthash.LtHash{ + accountDBDir: lthash.New(), + codeDBDir: lthash.New(), + storageDBDir: lthash.New(), + legacyDBDir: lthash.New(), + }, + perDBWorkingLtHash: map[string]*lthash.LtHash{ + accountDBDir: lthash.New(), + codeDBDir: lthash.New(), + storageDBDir: lthash.New(), + legacyDBDir: lthash.New(), + }, + phaseTimer: metrics.NewPhaseTimer(meter, "seidb_main_thread"), } } @@ -271,6 +301,11 @@ func (s *CommitStore) openReadOnly(targetVersion int64) error { if err := s.catchup(targetVersion); err != nil { return fmt.Errorf("readonly catchup: %w", err) } + if s.needsPerDBBackfill { + if err := s.backfillPerDBLtHashes(false); err != nil { + return fmt.Errorf("readonly per-DB LtHash backfill: %w", err) + } + } if targetVersion > 0 && s.committedVersion != targetVersion { return fmt.Errorf("readonly version mismatch: requested %d, reached %d", @@ -299,7 +334,15 @@ func (s *CommitStore) openTo(catchupTarget int64) error { if err := s.open(); err != nil { return err } - return s.catchup(catchupTarget) + if err := s.catchup(catchupTarget); err != nil { + return err + } + if s.needsPerDBBackfill { + if err := s.backfillPerDBLtHashes(true); err != nil { + return fmt.Errorf("per-DB LtHash backfill: %w", err) + } + } + return nil } // open opens all database instances. @@ -497,6 +540,11 @@ func (s *CommitStore) loadGlobalMetadata() error { s.committedLtHash = lthash.New() s.workingLtHash = lthash.New() } + + if err := s.loadPerDBLtHashes(); err != nil { + return fmt.Errorf("failed to load per-DB LtHashes: %w", err) + } + return nil } diff --git a/sei-db/state_db/sc/flatkv/store_catchup.go b/sei-db/state_db/sc/flatkv/store_catchup.go index e0cee41ab5..1bf255238d 100644 --- a/sei-db/state_db/sc/flatkv/store_catchup.go +++ b/sei-db/state_db/sc/flatkv/store_catchup.go @@ -146,6 +146,9 @@ func (s *CommitStore) catchup(targetVersion int64) error { s.committedVersion = entry.Version s.committedLtHash = s.workingLtHash.Clone() + for dbDir, h := range s.perDBWorkingLtHash { + s.perDBCommittedLtHash[dbDir] = h.Clone() + } s.clearPendingWrites() replayed++ diff --git a/sei-db/state_db/sc/flatkv/store_meta.go b/sei-db/state_db/sc/flatkv/store_meta.go index dc5bd7f185..9fcd1a8247 100644 --- a/sei-db/state_db/sc/flatkv/store_meta.go +++ b/sei-db/state_db/sc/flatkv/store_meta.go @@ -1,6 +1,7 @@ package flatkv import ( + "bytes" "encoding/binary" "fmt" "math" @@ -55,17 +56,19 @@ func (s *CommitStore) loadGlobalLtHash() (*lthash.LtHash, error) { return lthash.Unmarshal(data) } -// commitGlobalMetadata atomically commits global version and LtHash to metadata DB. -// This is the global watermark written AFTER all per-DB commits succeed. +// commitGlobalMetadata atomically commits global version, global LtHash, +// and per-DB LtHashes to metadata DB. +// +// When needsPerDBBackfill is true (upgrade in progress), per-DB keys are +// omitted to prevent persisting wrong zero-based hashes before the full-scan +// backfill completes. func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) error { batch := s.metadataDB.NewBatch() defer func() { _ = batch.Close() }() - // Encode version (version should always be non-negative in practice) versionBuf := make([]byte, 8) binary.BigEndian.PutUint64(versionBuf, uint64(version)) //nolint:gosec // version is always non-negative - // Write global metadata if err := batch.Set([]byte(MetaGlobalVersion), versionBuf); err != nil { return fmt.Errorf("failed to set global version: %w", err) } @@ -75,7 +78,132 @@ func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) e return fmt.Errorf("failed to set global lthash: %w", err) } - // Atomic commit with fsync + if !s.needsPerDBBackfill { + for dbDir, metaKey := range perDBLtHashKeys { + if h := s.perDBCommittedLtHash[dbDir]; h != nil { + if err := batch.Set([]byte(metaKey), h.Marshal()); err != nil { + return fmt.Errorf("failed to set %s lthash: %w", dbDir, err) + } + } + } + } + return batch.Commit(types.WriteOptions{Sync: s.config.Fsync}) +} + +// loadPerDBLtHashes reads per-DB LtHashes from metadataDB. +// If any key is missing, sets needsPerDBBackfill and initializes all to zero. +func (s *CommitStore) loadPerDBLtHashes() error { + loaded := make(map[string]*lthash.LtHash, len(perDBLtHashKeys)) + var missing []string + + for dbDir, metaKey := range perDBLtHashKeys { + data, err := s.metadataDB.Get([]byte(metaKey)) + if errorutils.IsNotFound(err) { + missing = append(missing, dbDir) + continue + } + if err != nil { + return fmt.Errorf("failed to read %s lthash: %w", dbDir, err) + } + h, err := lthash.Unmarshal(data) + if err != nil { + return fmt.Errorf("failed to unmarshal %s lthash: %w", dbDir, err) + } + loaded[dbDir] = h + } + + if len(missing) > 0 { + s.needsPerDBBackfill = true + for dbDir := range perDBLtHashKeys { + s.perDBCommittedLtHash[dbDir] = lthash.New() + s.perDBWorkingLtHash[dbDir] = lthash.New() + } + logger.Info("per-DB LtHash keys missing from metadataDB, will backfill after catchup", + "missing", missing, "found", len(loaded)) + return nil + } + + for dbDir, h := range loaded { + s.perDBCommittedLtHash[dbDir] = h + s.perDBWorkingLtHash[dbDir] = h.Clone() + } + return nil +} + +// fullScanDBLtHash computes the LtHash of a single data DB by iterating +// all KV pairs (excluding the LocalMeta key at 0x00). +func fullScanDBLtHash(db types.KeyValueDB) (*lthash.LtHash, error) { + iter, err := db.NewIter(&types.IterOptions{ + LowerBound: metaKeyLowerBound(), + }) + if err != nil { + return nil, fmt.Errorf("fullScanDBLtHash: new iter: %w", err) + } + defer iter.Close() + + var pairs []lthash.KVPairWithLastValue + for iter.First(); iter.Valid(); iter.Next() { + key := bytes.Clone(iter.Key()) + value := bytes.Clone(iter.Value()) + pairs = append(pairs, lthash.KVPairWithLastValue{ + Key: key, + Value: value, + }) + } + if err := iter.Error(); err != nil { + return nil, fmt.Errorf("fullScanDBLtHash: iter error: %w", err) + } + + result, _ := lthash.ComputeLtHash(nil, pairs) + return result, nil +} + +// backfillPerDBLtHashes computes per-DB LtHashes via full scan of each data DB +// and optionally persists them to metadataDB and each DB's LocalMeta. +// Must be called AFTER catchup when all DBs are at a consistent version. +func (s *CommitStore) backfillPerDBLtHashes(persist bool) error { + dataDBs := map[string]types.KeyValueDB{ + accountDBDir: s.accountDB, + codeDBDir: s.codeDB, + storageDBDir: s.storageDB, + legacyDBDir: s.legacyDB, + } + + for dbDir, db := range dataDBs { + h, err := fullScanDBLtHash(db) + if err != nil { + return fmt.Errorf("backfill %s: %w", dbDir, err) + } + s.perDBCommittedLtHash[dbDir] = h + s.perDBWorkingLtHash[dbDir] = h.Clone() + } + + if persist { + batch := s.metadataDB.NewBatch() + defer func() { _ = batch.Close() }() + for dbDir, metaKey := range perDBLtHashKeys { + if err := batch.Set([]byte(metaKey), s.perDBCommittedLtHash[dbDir].Marshal()); err != nil { + return fmt.Errorf("backfill persist %s to metadataDB: %w", dbDir, err) + } + } + if err := batch.Commit(types.WriteOptions{Sync: s.config.Fsync}); err != nil { + return fmt.Errorf("backfill metadataDB commit: %w", err) + } + + for dbDir, db := range dataDBs { + meta := &LocalMeta{ + CommittedVersion: s.localMeta[dbDir].CommittedVersion, + LtHash: s.perDBCommittedLtHash[dbDir], + } + if err := db.Set(DBLocalMetaKey, MarshalLocalMeta(meta), types.WriteOptions{Sync: s.config.Fsync}); err != nil { + return fmt.Errorf("backfill persist %s LocalMeta: %w", dbDir, err) + } + s.localMeta[dbDir] = meta + } + } + s.needsPerDBBackfill = false + logger.Info("per-DB LtHash backfill complete", "persist", persist, "version", s.committedVersion) + return nil } diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index 94ed592f61..ea8be3c521 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -215,15 +215,29 @@ func (s *CommitStore) ApplyChangeSets(cs []*proto.NamedChangeSet) error { s.phaseTimer.SetPhase("apply_change_compute_lt_hash") - // Combine all pairs and update working LtHash - allPairs := append(storagePairs, accountPairs...) - allPairs = append(allPairs, codePairs...) - allPairs = append(allPairs, legacyPairs...) - - if len(allPairs) > 0 { - newLtHash, _ := lthash.ComputeLtHash(s.workingLtHash, allPairs) - s.workingLtHash = newLtHash + // Per-DB LTHash updates + type dbPairs struct { + dir string + pairs []lthash.KVPairWithLastValue } + for _, dp := range [4]dbPairs{ + {storageDBDir, storagePairs}, + {accountDBDir, accountPairs}, + {codeDBDir, codePairs}, + {legacyDBDir, legacyPairs}, + } { + if len(dp.pairs) > 0 { + newHash, _ := lthash.ComputeLtHash(s.perDBWorkingLtHash[dp.dir], dp.pairs) + s.perDBWorkingLtHash[dp.dir] = newHash + } + } + + // Global LTHash = sum of per-DB hashes (homomorphic property) + global := lthash.New() + for _, h := range s.perDBWorkingLtHash { + global.MixIn(h) + } + s.workingLtHash = global s.phaseTimer.SetPhase("apply_change_done") return nil @@ -258,6 +272,9 @@ func (s *CommitStore) Commit() (int64, error) { s.phaseTimer.SetPhase("commit_update_lt_hash") s.committedVersion = version s.committedLtHash = s.workingLtHash.Clone() + for dbDir, h := range s.perDBWorkingLtHash { + s.perDBCommittedLtHash[dbDir] = h.Clone() + } // Step 4: Persist global metadata to metadata DB (always every block) s.phaseTimer.SetPhase("commit_write_metadata") @@ -345,9 +362,9 @@ func (s *CommitStore) commitBatches(version int64) error { } } - // Update local meta atomically with data (same batch) newLocalMeta := &LocalMeta{ CommittedVersion: version, + LtHash: s.perDBWorkingLtHash[accountDBDir], } if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { return fmt.Errorf("accountDB local meta set: %w", err) @@ -373,9 +390,9 @@ func (s *CommitStore) commitBatches(version int64) error { } } - // Update local meta atomically with data (same batch) newLocalMeta := &LocalMeta{ CommittedVersion: version, + LtHash: s.perDBWorkingLtHash[codeDBDir], } if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { return fmt.Errorf("codeDB local meta set: %w", err) @@ -401,9 +418,9 @@ func (s *CommitStore) commitBatches(version int64) error { } } - // Update local meta atomically with data (same batch) newLocalMeta := &LocalMeta{ CommittedVersion: version, + LtHash: s.perDBWorkingLtHash[storageDBDir], } if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { return fmt.Errorf("storageDB local meta set: %w", err) @@ -431,6 +448,7 @@ func (s *CommitStore) commitBatches(version int64) error { newLocalMeta := &LocalMeta{ CommittedVersion: version, + LtHash: s.perDBWorkingLtHash[legacyDBDir], } if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { return fmt.Errorf("legacyDB local meta set: %w", err) @@ -462,9 +480,11 @@ func (s *CommitStore) commitBatches(version int64) error { } // Update in-memory local meta after all commits succeed - newLocalMeta := &LocalMeta{CommittedVersion: version} for _, p := range pending { - s.localMeta[p.dbDir] = newLocalMeta + s.localMeta[p.dbDir] = &LocalMeta{ + CommittedVersion: version, + LtHash: s.perDBWorkingLtHash[p.dbDir], + } } return nil } From a9d6c3d418dc497a2d18a55639f18e0cac91b069 Mon Sep 17 00:00:00 2001 From: blindchaser Date: Mon, 16 Mar 2026 01:29:13 -0400 Subject: [PATCH 2/6] simplfy --- sei-db/state_db/sc/flatkv/importer.go | 5 +- .../state_db/sc/flatkv/perdb_lthash_test.go | 304 ++---------------- sei-db/state_db/sc/flatkv/snapshot.go | 5 - sei-db/state_db/sc/flatkv/store.go | 54 +--- sei-db/state_db/sc/flatkv/store_catchup.go | 5 +- sei-db/state_db/sc/flatkv/store_meta.go | 132 ++------ sei-db/state_db/sc/flatkv/store_write.go | 10 +- 7 files changed, 78 insertions(+), 437 deletions(-) diff --git a/sei-db/state_db/sc/flatkv/importer.go b/sei-db/state_db/sc/flatkv/importer.go index 32530f5256..27dfb00eaa 100644 --- a/sei-db/state_db/sc/flatkv/importer.go +++ b/sei-db/state_db/sc/flatkv/importer.go @@ -75,10 +75,7 @@ func (imp *KVImporter) Close() error { } imp.store.committedVersion = imp.version - imp.store.committedLtHash = imp.store.workingLtHash.Clone() - for dbDir, h := range imp.store.perDBWorkingLtHash { - imp.store.perDBCommittedLtHash[dbDir] = h.Clone() - } + imp.store.snapshotLtHashes() if err := imp.store.commitGlobalMetadata(imp.version, imp.store.committedLtHash); err != nil { return fmt.Errorf("import global metadata: %w", err) } diff --git a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go index b4f14366c2..3994e5c42c 100644 --- a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go +++ b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go @@ -1,8 +1,7 @@ package flatkv import ( - "context" - "os" + "bytes" "path/filepath" "testing" @@ -15,6 +14,31 @@ import ( "github.com/stretchr/testify/require" ) +// testFullScanDBLtHash computes the LtHash of a single data DB by iterating +// all KV pairs (excluding the LocalMeta key at 0x00). Test-only helper. +func testFullScanDBLtHash(t *testing.T, db types.KeyValueDB) *lthash.LtHash { + t.Helper() + iter, err := db.NewIter(&types.IterOptions{ + LowerBound: metaKeyLowerBound(), + }) + require.NoError(t, err) + defer iter.Close() + + var pairs []lthash.KVPairWithLastValue + for iter.First(); iter.Valid(); iter.Next() { + pairs = append(pairs, lthash.KVPairWithLastValue{ + Key: bytes.Clone(iter.Key()), + Value: bytes.Clone(iter.Value()), + }) + } + require.NoError(t, iter.Error()) + result, _ := lthash.ComputeLtHash(nil, pairs) + if result == nil { + return lthash.New() + } + return result +} + // fullScanPerDBLtHash computes LtHash for each data DB individually via full scan. func fullScanPerDBLtHash(t *testing.T, s *CommitStore) map[string]*lthash.LtHash { t.Helper() @@ -25,9 +49,7 @@ func fullScanPerDBLtHash(t *testing.T, s *CommitStore) map[string]*lthash.LtHash storageDBDir: s.storageDB, legacyDBDir: s.legacyDB, } { - h, err := fullScanDBLtHash(db) - require.NoError(t, err) - result[dbDir] = h + result[dbDir] = testFullScanDBLtHash(t, db) } return result } @@ -63,34 +85,7 @@ func commitMixedState(t *testing.T, s *CommitStore, round int) { require.NoError(t, err) } -// deletePerDBKeysFromMetadataDB simulates a pre-upgrade store by removing -// per-DB LTHash keys from metadataDB. -func deletePerDBKeysFromMetadataDB(t *testing.T, metadataDBPath string) { - t.Helper() - db, err := pebbledb.Open(context.Background(), metadataDBPath, types.OpenOptions{}, false) - require.NoError(t, err) - for _, metaKey := range perDBLtHashKeys { - _ = db.Delete([]byte(metaKey), types.WriteOptions{}) - } - require.NoError(t, db.Close()) -} - -// simulateUpgrade removes per-DB LTHash keys from both the snapshot and -// working directory's metadataDB, and removes SNAPSHOT_BASE to force re-clone. -func simulateUpgrade(t *testing.T, flatkvDir string) { - t.Helper() - snapDir, _, err := currentSnapshotDir(flatkvDir) - require.NoError(t, err) - deletePerDBKeysFromMetadataDB(t, filepath.Join(snapDir, metadataDir)) - - workingMetaPath := filepath.Join(flatkvDir, workingDirName, metadataDir) - if _, err := os.Stat(workingMetaPath); err == nil { - deletePerDBKeysFromMetadataDB(t, workingMetaPath) - } - _ = os.Remove(filepath.Join(flatkvDir, workingDirName, snapshotBaseFile)) -} - -// Test 1: Crash recovery with global/local skew -- verify per-DB LTHash +// Test: Crash recovery with global/local skew -- verify per-DB LTHash // is correct after catchup replays the skewed version. func TestPerDBLtHashSkewRecovery(t *testing.T) { dir := t.TempDir() @@ -129,131 +124,7 @@ func TestPerDBLtHashSkewRecovery(t *testing.T) { verifyLtHashAtHeight(t, s2, 2) } -// Test 2: Upgrade from old format -- delete per-DB keys from metadataDB, -// reopen, verify backfill produces correct per-DB hashes AFTER catchup. -func TestPerDBLtHashUpgradeBackfill(t *testing.T) { - dir := t.TempDir() - dbDir := filepath.Join(dir, flatkvRootDir) - - s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err := s1.LoadVersion(0, false) - require.NoError(t, err) - - commitMixedState(t, s1, 1) - commitMixedState(t, s1, 2) - commitMixedState(t, s1, 3) - verifyPerDBLtHash(t, s1) - require.NoError(t, s1.Close()) - - // Simulate pre-upgrade format by removing per-DB keys - simulateUpgrade(t, dbDir) - - // Reopen -- triggers backfill - s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err = s2.LoadVersion(0, false) - require.NoError(t, err) - defer s2.Close() - - require.Equal(t, int64(3), s2.Version()) - require.False(t, s2.needsPerDBBackfill, "backfill should have cleared the flag") - verifyPerDBLtHash(t, s2) - verifyLtHashAtHeight(t, s2, 3) -} - -// Test 3: Upgrade + skew combined -- old format + tampered LocalMeta version, -// verify backfill after catchup still produces correct per-DB hashes. -func TestPerDBLtHashUpgradeWithSkew(t *testing.T) { - dir := t.TempDir() - dbDir := filepath.Join(dir, flatkvRootDir) - - s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err := s1.LoadVersion(0, false) - require.NoError(t, err) - - commitMixedState(t, s1, 1) - commitMixedState(t, s1, 2) - require.NoError(t, s1.Close()) - - flatkvDir := dbDir - - // Simulate upgrade - simulateUpgrade(t, flatkvDir) - - // Tamper accountDB in snapshot to simulate crash skew (version 1 instead of 2) - snapDir, _, err := currentSnapshotDir(flatkvDir) - require.NoError(t, err) - accountDBPath := filepath.Join(snapDir, accountDBDir) - db, err := pebbledb.Open(t.Context(), accountDBPath, types.OpenOptions{}, false) - require.NoError(t, err) - lagMeta := &LocalMeta{CommittedVersion: 1} - require.NoError(t, db.Set(DBLocalMetaKey, MarshalLocalMeta(lagMeta), types.WriteOptions{Sync: true})) - require.NoError(t, db.Close()) - - // Reopen -- catchup replays v2 with zero per-DB hashes, then backfill scans - s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err = s2.LoadVersion(0, false) - require.NoError(t, err) - defer s2.Close() - - require.Equal(t, int64(2), s2.Version()) - verifyPerDBLtHash(t, s2) - verifyLtHashAtHeight(t, s2, 2) -} - -// Test 4: Crash between catchup metadata flush and backfill -- simulate crash -// after catchup commitGlobalMetadata but before backfill persist, reopen and -// verify per-DB keys are still absent so backfill re-triggers. -func TestPerDBLtHashCrashBetweenCatchupAndBackfill(t *testing.T) { - dir := t.TempDir() - dbDir := filepath.Join(dir, flatkvRootDir) - - s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err := s1.LoadVersion(0, false) - require.NoError(t, err) - - commitMixedState(t, s1, 1) - commitMixedState(t, s1, 2) - commitMixedState(t, s1, 3) - require.NoError(t, s1.Close()) - - flatkvDir := dbDir - simulateUpgrade(t, flatkvDir) - - // Open without calling backfill by doing a manual partial open: - // open() calls loadGlobalMetadata which sets needsPerDBBackfill = true. - // Then catchup runs, calling commitGlobalMetadata -- it must NOT write per-DB keys. - // We then close without running backfill to simulate a crash. - s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - require.NoError(t, s2.open()) - require.True(t, s2.needsPerDBBackfill, "backfill flag should be set after load") - require.NoError(t, s2.catchup(0)) - require.NoError(t, s2.Close()) - - // Verify per-DB keys are still absent in the working dir's metadataDB. - // (open() cloned from snapshot which has keys deleted.) - workingMetaPath := filepath.Join(flatkvDir, workingDirName, metadataDir) - mdb, err := pebbledb.Open(t.Context(), workingMetaPath, types.OpenOptions{}, false) - require.NoError(t, err) - for dbName, metaKey := range perDBLtHashKeys { - _, getErr := mdb.Get([]byte(metaKey)) - require.Error(t, getErr, "per-DB key %s (%s) should still be absent after crash window", metaKey, dbName) - } - require.NoError(t, mdb.Close()) - - // Full reopen -- should re-trigger backfill - _ = os.Remove(filepath.Join(flatkvDir, workingDirName, snapshotBaseFile)) - s3 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err = s3.LoadVersion(0, false) - require.NoError(t, err) - defer s3.Close() - - require.Equal(t, int64(3), s3.Version()) - require.False(t, s3.needsPerDBBackfill) - verifyPerDBLtHash(t, s3) - verifyLtHashAtHeight(t, s3, 3) -} - -// Test 5: Per-DB full scan verification after restart. +// Test: Per-DB full scan verification after restart. func TestPerDBLtHashPersistenceAfterReopen(t *testing.T) { dir := t.TempDir() dbDir := filepath.Join(dir, flatkvRootDir) @@ -278,7 +149,6 @@ func TestPerDBLtHashPersistenceAfterReopen(t *testing.T) { verifyPerDBLtHash(t, s2) verifyLtHashAtHeight(t, s2, 10) - // Also verify committed hashes match working hashes (just opened, no pending writes) for dbDir, wh := range s2.perDBWorkingLtHash { ch := s2.perDBCommittedLtHash[dbDir] require.True(t, wh.Equal(ch), @@ -286,46 +156,11 @@ func TestPerDBLtHashPersistenceAfterReopen(t *testing.T) { } } -// Test 6: ReadOnly open does not persist backfill. -func TestPerDBLtHashReadOnlyNoPersist(t *testing.T) { - dir := t.TempDir() - dbDir := filepath.Join(dir, flatkvRootDir) - - s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err := s1.LoadVersion(0, false) - require.NoError(t, err) - - commitMixedState(t, s1, 1) - commitMixedState(t, s1, 2) - require.NoError(t, s1.Close()) - - simulateUpgrade(t, dbDir) - - // Open as readonly -- backfill runs in memory - s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - roStore, err := s2.LoadVersion(0, true) - require.NoError(t, err) - roCS := roStore.(*CommitStore) - verifyPerDBLtHash(t, roCS) - require.NoError(t, roStore.Close()) - - // Open as read-write -- should still need backfill since readonly didn't persist - s3 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err = s3.LoadVersion(0, false) - require.NoError(t, err) - defer s3.Close() - - require.False(t, s3.needsPerDBBackfill, "backfill should have run and persisted on RW open") - verifyPerDBLtHash(t, s3) - verifyLtHashAtHeight(t, s3, 2) -} - -// Test 7: Verify per-DB LTHash alongside global in the incremental 100-block test. +// Test: Verify per-DB LTHash alongside global in the incremental multi-block test. func TestPerDBLtHashIncrementalEqualsFullScan(t *testing.T) { s := setupTestStore(t) defer s.Close() - // Blocks 1-10: accounts + storage + code + legacy for i := 1; i <= 10; i++ { addr := addrN(byte(i)) slot := slotN(byte(i)) @@ -342,7 +177,6 @@ func TestPerDBLtHashIncrementalEqualsFullScan(t *testing.T) { verifyPerDBLtHash(t, s) verifyLtHashAtHeight(t, s, 10) - // Blocks 11-15: deploy code for i := 11; i <= 15; i++ { addr := addrN(byte(i - 10)) ch := codeHashN(byte(i)) @@ -355,7 +189,6 @@ func TestPerDBLtHashIncrementalEqualsFullScan(t *testing.T) { } verifyPerDBLtHash(t, s) - // Blocks 16-20: update storage + delete some storage for i := 16; i <= 18; i++ { addr := addrN(byte(i - 15)) slot := slotN(byte(i - 15)) @@ -383,7 +216,6 @@ func TestPerDBLtHashSumEqualsGlobal(t *testing.T) { commitMixedState(t, s, i) } - // Compute the "sum" of all per-DB hashes sumHash := lthash.New() for _, dbDir := range []string{accountDBDir, codeDBDir, storageDBDir, legacyDBDir} { sumHash.MixIn(s.perDBWorkingLtHash[dbDir]) @@ -412,14 +244,12 @@ func TestPerDBLtHashCatchupReplay(t *testing.T) { commitMixedState(t, s1, 5) verifyPerDBLtHash(t, s1) - // Save expected hashes for comparison expectedPerDB := make(map[string][32]byte, 4) for dbDir, h := range s1.perDBWorkingLtHash { expectedPerDB[dbDir] = h.Checksum() } require.NoError(t, s1.Close()) - // Reopen: catchup from v2 snapshot through v3,v4,v5 via WAL s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) _, err = s2.LoadVersion(0, false) require.NoError(t, err) @@ -445,7 +275,6 @@ func TestPerDBLtHashEmptyBlocks(t *testing.T) { checksums[dbDir] = h.Checksum() } - // 5 empty blocks for i := 0; i < 5; i++ { require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{namedCS()})) commitAndCheck(t, s) @@ -473,8 +302,10 @@ func TestPerDBLtHashAfterImport(t *testing.T) { for i := byte(1); i <= 5; i++ { addr := addrN(i) slot := slotN(i) - imp.AddNode(&scTypes.SnapshotNode{Key: storagePair(addr, slot, []byte{i, 0xAA}).Key, Value: storagePair(addr, slot, []byte{i, 0xAA}).Value, Height: 0}) - imp.AddNode(&scTypes.SnapshotNode{Key: noncePair(addr, uint64(i)).Key, Value: noncePair(addr, uint64(i)).Value, Height: 0}) + sp := storagePair(addr, slot, []byte{i, 0xAA}) + np := noncePair(addr, uint64(i)) + imp.AddNode(&scTypes.SnapshotNode{Key: sp.Key, Value: sp.Value}) + imp.AddNode(&scTypes.SnapshotNode{Key: np.Key, Value: np.Value}) } require.NoError(t, imp.Close()) @@ -506,7 +337,6 @@ func TestPerDBLtHashRollback(t *testing.T) { commitMixedState(t, s, 4) commitMixedState(t, s, 5) - // Rollback to v3 require.NoError(t, s.Rollback(3)) require.Equal(t, int64(3), s.Version()) verifyPerDBLtHash(t, s) @@ -515,71 +345,6 @@ func TestPerDBLtHashRollback(t *testing.T) { require.NoError(t, s.Close()) } -// Test: rollback on a store that was upgraded (missing per-DB keys in metadataDB). -// This exercises the backfill path inside Rollback. -func TestPerDBLtHashRollbackAfterUpgrade(t *testing.T) { - dir := t.TempDir() - dbDir := filepath.Join(dir, flatkvRootDir) - - s := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err := s.LoadVersion(0, false) - require.NoError(t, err) - - commitMixedState(t, s, 1) - commitMixedState(t, s, 2) - commitMixedState(t, s, 3) - require.NoError(t, s.WriteSnapshot("")) - - commitMixedState(t, s, 4) - commitMixedState(t, s, 5) - require.NoError(t, s.Close()) - - simulateUpgrade(t, dbDir) - - s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err = s2.LoadVersion(0, false) - require.NoError(t, err) - - require.NoError(t, s2.Rollback(3)) - require.Equal(t, int64(3), s2.Version()) - verifyPerDBLtHash(t, s2) - verifyLtHashAtHeight(t, s2, 3) - - require.NoError(t, s2.Close()) -} - -// Test: backfill persists correct LtHash to each DB's LocalMeta. -func TestPerDBLtHashBackfillUpdatesLocalMeta(t *testing.T) { - dir := t.TempDir() - dbDir := filepath.Join(dir, flatkvRootDir) - - s1 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err := s1.LoadVersion(0, false) - require.NoError(t, err) - - commitMixedState(t, s1, 1) - commitMixedState(t, s1, 2) - require.NoError(t, s1.Close()) - - simulateUpgrade(t, dbDir) - - // Reopen -- triggers backfill which should also update LocalMeta - s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) - _, err = s2.LoadVersion(0, false) - require.NoError(t, err) - defer s2.Close() - - // Verify LocalMeta has LtHash for each DB - for dbDir, meta := range s2.localMeta { - require.NotNil(t, meta.LtHash, - "LocalMeta.LtHash should be set for %s after backfill", dbDir) - expected := s2.perDBWorkingLtHash[dbDir] - require.True(t, meta.LtHash.Equal(expected), - "LocalMeta.LtHash should match working hash for %s:\n meta: %x\n working: %x", - dbDir, meta.LtHash.Checksum(), expected.Checksum()) - } -} - // Test: per-DB keys are present in metadataDB after normal commit cycle. func TestPerDBLtHashPersistedInMetadataDB(t *testing.T) { dir := t.TempDir() @@ -592,7 +357,6 @@ func TestPerDBLtHashPersistedInMetadataDB(t *testing.T) { commitMixedState(t, s, 1) commitMixedState(t, s, 2) - // Read per-DB keys directly from metadataDB for dbDir, metaKey := range perDBLtHashKeys { data, err := s.metadataDB.Get([]byte(metaKey)) require.NoError(t, err, "per-DB key %s should exist in metadataDB after commit", dbDir) diff --git a/sei-db/state_db/sc/flatkv/snapshot.go b/sei-db/state_db/sc/flatkv/snapshot.go index ecf91cdb9a..93f0a328e3 100644 --- a/sei-db/state_db/sc/flatkv/snapshot.go +++ b/sei-db/state_db/sc/flatkv/snapshot.go @@ -592,11 +592,6 @@ func (s *CommitStore) Rollback(targetVersion int64) error { if err := s.catchup(targetVersion); err != nil { return fmt.Errorf("catchup after rollback: %w", err) } - if s.needsPerDBBackfill { - if err := s.backfillPerDBLtHashes(true); err != nil { - return fmt.Errorf("rollback per-DB LtHash backfill: %w", err) - } - } if s.committedVersion != targetVersion { return fmt.Errorf("rollback failed: wanted version %d but reached %d (WAL may be incomplete)", diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index f6afdfe284..ff7af02acc 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -105,7 +105,6 @@ type CommitStore struct { // secondary copies are written to each DB's LocalMeta for integrity verification. perDBCommittedLtHash map[string]*lthash.LtHash perDBWorkingLtHash map[string]*lthash.LtHash - needsPerDBBackfill bool // true when per-DB keys are absent from metadataDB (upgrade) // Pending writes buffer // accountWrites: key = address string (20 bytes), value = AccountValue @@ -141,30 +140,20 @@ func NewCommitStore( meter := otel.Meter(flatkvMeterName) return &CommitStore{ - ctx: ctx, - config: cfg, - dbDir: dbDir, - localMeta: make(map[string]*LocalMeta), - accountWrites: make(map[string]*pendingAccountWrite), - codeWrites: make(map[string]*pendingKVWrite), - storageWrites: make(map[string]*pendingKVWrite), - legacyWrites: make(map[string]*pendingKVWrite), - pendingChangeSets: make([]*proto.NamedChangeSet, 0), - committedLtHash: lthash.New(), - workingLtHash: lthash.New(), - perDBCommittedLtHash: map[string]*lthash.LtHash{ - accountDBDir: lthash.New(), - codeDBDir: lthash.New(), - storageDBDir: lthash.New(), - legacyDBDir: lthash.New(), - }, - perDBWorkingLtHash: map[string]*lthash.LtHash{ - accountDBDir: lthash.New(), - codeDBDir: lthash.New(), - storageDBDir: lthash.New(), - legacyDBDir: lthash.New(), - }, - phaseTimer: metrics.NewPhaseTimer(meter, "seidb_main_thread"), + ctx: ctx, + config: cfg, + dbDir: dbDir, + localMeta: make(map[string]*LocalMeta), + accountWrites: make(map[string]*pendingAccountWrite), + codeWrites: make(map[string]*pendingKVWrite), + storageWrites: make(map[string]*pendingKVWrite), + legacyWrites: make(map[string]*pendingKVWrite), + pendingChangeSets: make([]*proto.NamedChangeSet, 0), + committedLtHash: lthash.New(), + workingLtHash: lthash.New(), + perDBCommittedLtHash: newPerDBLtHashMap(), + perDBWorkingLtHash: newPerDBLtHashMap(), + phaseTimer: metrics.NewPhaseTimer(meter, "seidb_main_thread"), } } @@ -301,11 +290,6 @@ func (s *CommitStore) openReadOnly(targetVersion int64) error { if err := s.catchup(targetVersion); err != nil { return fmt.Errorf("readonly catchup: %w", err) } - if s.needsPerDBBackfill { - if err := s.backfillPerDBLtHashes(false); err != nil { - return fmt.Errorf("readonly per-DB LtHash backfill: %w", err) - } - } if targetVersion > 0 && s.committedVersion != targetVersion { return fmt.Errorf("readonly version mismatch: requested %d, reached %d", @@ -334,15 +318,7 @@ func (s *CommitStore) openTo(catchupTarget int64) error { if err := s.open(); err != nil { return err } - if err := s.catchup(catchupTarget); err != nil { - return err - } - if s.needsPerDBBackfill { - if err := s.backfillPerDBLtHashes(true); err != nil { - return fmt.Errorf("per-DB LtHash backfill: %w", err) - } - } - return nil + return s.catchup(catchupTarget) } // open opens all database instances. diff --git a/sei-db/state_db/sc/flatkv/store_catchup.go b/sei-db/state_db/sc/flatkv/store_catchup.go index 1bf255238d..ac34a18dcc 100644 --- a/sei-db/state_db/sc/flatkv/store_catchup.go +++ b/sei-db/state_db/sc/flatkv/store_catchup.go @@ -145,10 +145,7 @@ func (s *CommitStore) catchup(targetVersion int64) error { } s.committedVersion = entry.Version - s.committedLtHash = s.workingLtHash.Clone() - for dbDir, h := range s.perDBWorkingLtHash { - s.perDBCommittedLtHash[dbDir] = h.Clone() - } + s.snapshotLtHashes() s.clearPendingWrites() replayed++ diff --git a/sei-db/state_db/sc/flatkv/store_meta.go b/sei-db/state_db/sc/flatkv/store_meta.go index 9fcd1a8247..3b540918f8 100644 --- a/sei-db/state_db/sc/flatkv/store_meta.go +++ b/sei-db/state_db/sc/flatkv/store_meta.go @@ -1,7 +1,6 @@ package flatkv import ( - "bytes" "encoding/binary" "fmt" "math" @@ -58,10 +57,6 @@ func (s *CommitStore) loadGlobalLtHash() (*lthash.LtHash, error) { // commitGlobalMetadata atomically commits global version, global LtHash, // and per-DB LtHashes to metadata DB. -// -// When needsPerDBBackfill is true (upgrade in progress), per-DB keys are -// omitted to prevent persisting wrong zero-based hashes before the full-scan -// backfill completes. func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) error { batch := s.metadataDB.NewBatch() defer func() { _ = batch.Close() }() @@ -78,12 +73,10 @@ func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) e return fmt.Errorf("failed to set global lthash: %w", err) } - if !s.needsPerDBBackfill { - for dbDir, metaKey := range perDBLtHashKeys { - if h := s.perDBCommittedLtHash[dbDir]; h != nil { - if err := batch.Set([]byte(metaKey), h.Marshal()); err != nil { - return fmt.Errorf("failed to set %s lthash: %w", dbDir, err) - } + for dbDir, metaKey := range perDBLtHashKeys { + if h := s.perDBCommittedLtHash[dbDir]; h != nil { + if err := batch.Set([]byte(metaKey), h.Marshal()); err != nil { + return fmt.Errorf("failed to set %s lthash: %w", dbDir, err) } } } @@ -91,16 +84,31 @@ func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) e return batch.Commit(types.WriteOptions{Sync: s.config.Fsync}) } +// newPerDBLtHashMap returns a map with a fresh zero LtHash for each data DB. +func newPerDBLtHashMap() map[string]*lthash.LtHash { + m := make(map[string]*lthash.LtHash, len(perDBLtHashKeys)) + for dbDir := range perDBLtHashKeys { + m[dbDir] = lthash.New() + } + return m +} + +// snapshotLtHashes clones working hashes (global + per-DB) into committed state. +func (s *CommitStore) snapshotLtHashes() { + s.committedLtHash = s.workingLtHash.Clone() + for dbDir, h := range s.perDBWorkingLtHash { + s.perDBCommittedLtHash[dbDir] = h.Clone() + } +} + // loadPerDBLtHashes reads per-DB LtHashes from metadataDB. -// If any key is missing, sets needsPerDBBackfill and initializes all to zero. +// If a key is not found (fresh start), initializes to zero. func (s *CommitStore) loadPerDBLtHashes() error { - loaded := make(map[string]*lthash.LtHash, len(perDBLtHashKeys)) - var missing []string - for dbDir, metaKey := range perDBLtHashKeys { data, err := s.metadataDB.Get([]byte(metaKey)) if errorutils.IsNotFound(err) { - missing = append(missing, dbDir) + s.perDBCommittedLtHash[dbDir] = lthash.New() + s.perDBWorkingLtHash[dbDir] = lthash.New() continue } if err != nil { @@ -110,100 +118,8 @@ func (s *CommitStore) loadPerDBLtHashes() error { if err != nil { return fmt.Errorf("failed to unmarshal %s lthash: %w", dbDir, err) } - loaded[dbDir] = h - } - - if len(missing) > 0 { - s.needsPerDBBackfill = true - for dbDir := range perDBLtHashKeys { - s.perDBCommittedLtHash[dbDir] = lthash.New() - s.perDBWorkingLtHash[dbDir] = lthash.New() - } - logger.Info("per-DB LtHash keys missing from metadataDB, will backfill after catchup", - "missing", missing, "found", len(loaded)) - return nil - } - - for dbDir, h := range loaded { s.perDBCommittedLtHash[dbDir] = h s.perDBWorkingLtHash[dbDir] = h.Clone() } return nil } - -// fullScanDBLtHash computes the LtHash of a single data DB by iterating -// all KV pairs (excluding the LocalMeta key at 0x00). -func fullScanDBLtHash(db types.KeyValueDB) (*lthash.LtHash, error) { - iter, err := db.NewIter(&types.IterOptions{ - LowerBound: metaKeyLowerBound(), - }) - if err != nil { - return nil, fmt.Errorf("fullScanDBLtHash: new iter: %w", err) - } - defer iter.Close() - - var pairs []lthash.KVPairWithLastValue - for iter.First(); iter.Valid(); iter.Next() { - key := bytes.Clone(iter.Key()) - value := bytes.Clone(iter.Value()) - pairs = append(pairs, lthash.KVPairWithLastValue{ - Key: key, - Value: value, - }) - } - if err := iter.Error(); err != nil { - return nil, fmt.Errorf("fullScanDBLtHash: iter error: %w", err) - } - - result, _ := lthash.ComputeLtHash(nil, pairs) - return result, nil -} - -// backfillPerDBLtHashes computes per-DB LtHashes via full scan of each data DB -// and optionally persists them to metadataDB and each DB's LocalMeta. -// Must be called AFTER catchup when all DBs are at a consistent version. -func (s *CommitStore) backfillPerDBLtHashes(persist bool) error { - dataDBs := map[string]types.KeyValueDB{ - accountDBDir: s.accountDB, - codeDBDir: s.codeDB, - storageDBDir: s.storageDB, - legacyDBDir: s.legacyDB, - } - - for dbDir, db := range dataDBs { - h, err := fullScanDBLtHash(db) - if err != nil { - return fmt.Errorf("backfill %s: %w", dbDir, err) - } - s.perDBCommittedLtHash[dbDir] = h - s.perDBWorkingLtHash[dbDir] = h.Clone() - } - - if persist { - batch := s.metadataDB.NewBatch() - defer func() { _ = batch.Close() }() - for dbDir, metaKey := range perDBLtHashKeys { - if err := batch.Set([]byte(metaKey), s.perDBCommittedLtHash[dbDir].Marshal()); err != nil { - return fmt.Errorf("backfill persist %s to metadataDB: %w", dbDir, err) - } - } - if err := batch.Commit(types.WriteOptions{Sync: s.config.Fsync}); err != nil { - return fmt.Errorf("backfill metadataDB commit: %w", err) - } - - for dbDir, db := range dataDBs { - meta := &LocalMeta{ - CommittedVersion: s.localMeta[dbDir].CommittedVersion, - LtHash: s.perDBCommittedLtHash[dbDir], - } - if err := db.Set(DBLocalMetaKey, MarshalLocalMeta(meta), types.WriteOptions{Sync: s.config.Fsync}); err != nil { - return fmt.Errorf("backfill persist %s LocalMeta: %w", dbDir, err) - } - s.localMeta[dbDir] = meta - } - } - - s.needsPerDBBackfill = false - logger.Info("per-DB LtHash backfill complete", "persist", persist, "version", s.committedVersion) - return nil -} diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index ea8be3c521..fb4d7a5b2c 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -233,11 +233,10 @@ func (s *CommitStore) ApplyChangeSets(cs []*proto.NamedChangeSet) error { } // Global LTHash = sum of per-DB hashes (homomorphic property) - global := lthash.New() + s.workingLtHash.Reset() for _, h := range s.perDBWorkingLtHash { - global.MixIn(h) + s.workingLtHash.MixIn(h) } - s.workingLtHash = global s.phaseTimer.SetPhase("apply_change_done") return nil @@ -271,10 +270,7 @@ func (s *CommitStore) Commit() (int64, error) { // Step 3: Update in-memory committed state s.phaseTimer.SetPhase("commit_update_lt_hash") s.committedVersion = version - s.committedLtHash = s.workingLtHash.Clone() - for dbDir, h := range s.perDBWorkingLtHash { - s.perDBCommittedLtHash[dbDir] = h.Clone() - } + s.snapshotLtHashes() // Step 4: Persist global metadata to metadata DB (always every block) s.phaseTimer.SetPhase("commit_write_metadata") From 84e1cbec44ba6a8b0be385e6a51966b793aaeb7b Mon Sep 17 00:00:00 2001 From: blindchaser Date: Mon, 16 Mar 2026 15:32:01 -0400 Subject: [PATCH 3/6] add warning log for No lattice hash found for DB --- sei-db/state_db/sc/flatkv/store_meta.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sei-db/state_db/sc/flatkv/store_meta.go b/sei-db/state_db/sc/flatkv/store_meta.go index 3b540918f8..e66ac764e3 100644 --- a/sei-db/state_db/sc/flatkv/store_meta.go +++ b/sei-db/state_db/sc/flatkv/store_meta.go @@ -107,6 +107,7 @@ func (s *CommitStore) loadPerDBLtHashes() error { for dbDir, metaKey := range perDBLtHashKeys { data, err := s.metadataDB.Get([]byte(metaKey)) if errorutils.IsNotFound(err) { + logger.Warn("No lattice hash found for DB, initializing to fresh hash", "db", dbDir) s.perDBCommittedLtHash[dbDir] = lthash.New() s.perDBWorkingLtHash[dbDir] = lthash.New() continue From b6c9641ba85f5a127b5ab3d63e333d4ca1b24184 Mon Sep 17 00:00:00 2001 From: blindchaser Date: Tue, 17 Mar 2026 10:27:43 -0400 Subject: [PATCH 4/6] refactor(flatkv): store per-DB LtHash in LocalMeta instead of metadataDB --- sei-db/state_db/sc/flatkv/importer.go | 2 +- .../state_db/sc/flatkv/perdb_lthash_test.go | 88 +++++++++++-------- sei-db/state_db/sc/flatkv/store.go | 67 +++++++------- sei-db/state_db/sc/flatkv/store_catchup.go | 2 +- sei-db/state_db/sc/flatkv/store_meta.go | 49 ++--------- sei-db/state_db/sc/flatkv/store_write.go | 6 +- 6 files changed, 99 insertions(+), 115 deletions(-) diff --git a/sei-db/state_db/sc/flatkv/importer.go b/sei-db/state_db/sc/flatkv/importer.go index 27dfb00eaa..6dfb7eea3c 100644 --- a/sei-db/state_db/sc/flatkv/importer.go +++ b/sei-db/state_db/sc/flatkv/importer.go @@ -75,7 +75,7 @@ func (imp *KVImporter) Close() error { } imp.store.committedVersion = imp.version - imp.store.snapshotLtHashes() + imp.store.committedLtHash = imp.store.workingLtHash.Clone() if err := imp.store.commitGlobalMetadata(imp.version, imp.store.committedLtHash); err != nil { return fmt.Errorf("import global metadata: %w", err) } diff --git a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go index 3994e5c42c..2b1317ef0f 100644 --- a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go +++ b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go @@ -2,6 +2,7 @@ package flatkv import ( "bytes" + "encoding/binary" "path/filepath" "testing" @@ -67,26 +68,29 @@ func verifyPerDBLtHash(t *testing.T, s *CommitStore) { } // commitMixedState applies changesets with data across all 4 DB types. -func commitMixedState(t *testing.T, s *CommitStore, round int) { +// round must be in [0, 255] since it is used as a byte to derive unique addresses/slots. +func commitMixedState(t *testing.T, s *CommitStore, round byte) { t.Helper() - addr := addrN(byte(round)) - slot := slotN(byte(round)) + addr := addrN(round) + slot := slotN(round) legacyKey := append([]byte{0x09}, addr[:]...) cs1 := namedCS( noncePair(addr, uint64(round)), - codeHashPair(addr, codeHashN(byte(round))), - codePair(addr, []byte{0x60, 0x80, byte(round)}), - storagePair(addr, slot, []byte{byte(round), 0xAA}), + codeHashPair(addr, codeHashN(round)), + codePair(addr, []byte{0x60, 0x80, round}), + storagePair(addr, slot, []byte{round, 0xAA}), ) - cs2 := makeChangeSet(legacyKey, []byte{byte(round), 0xBB}, false) + cs2 := makeChangeSet(legacyKey, []byte{round, 0xBB}, false) require.NoError(t, s.ApplyChangeSets([]*proto.NamedChangeSet{cs1, cs2})) _, err := s.Commit() require.NoError(t, err) } -// Test: Crash recovery with global/local skew -- verify per-DB LTHash -// is correct after catchup replays the skewed version. +// Test: Crash recovery where metadataDB is behind data DBs. +// Simulates a crash after commitBatches (step 2) but before +// commitGlobalMetadata (step 4) by rolling back metadataDB's +// global version. Data DBs and their LocalMeta remain at v2. func TestPerDBLtHashSkewRecovery(t *testing.T) { dir := t.TempDir() dbDir := filepath.Join(dir, flatkvRootDir) @@ -100,20 +104,21 @@ func TestPerDBLtHashSkewRecovery(t *testing.T) { verifyPerDBLtHash(t, s1) require.NoError(t, s1.Close()) - // Tamper with accountDB's LocalMeta to simulate incomplete commit - // (accountDB thinks it's at v1, but global says v2) + // Roll back metadataDB global version to 1 to simulate crash + // after commitBatches completed but before commitGlobalMetadata. flatkvDir := dbDir snapDir, _, err := currentSnapshotDir(flatkvDir) require.NoError(t, err) - accountDBPath := filepath.Join(snapDir, accountDBDir) - db, err := pebbledb.Open(t.Context(), accountDBPath, types.OpenOptions{}, false) + metaDBPath := filepath.Join(snapDir, metadataDir) + db, err := pebbledb.Open(t.Context(), metaDBPath, types.OpenOptions{}, false) require.NoError(t, err) - lagMeta := &LocalMeta{CommittedVersion: 1} - require.NoError(t, db.Set(DBLocalMetaKey, MarshalLocalMeta(lagMeta), types.WriteOptions{Sync: true})) + versionBuf := make([]byte, 8) + binary.BigEndian.PutUint64(versionBuf, 1) + require.NoError(t, db.Set([]byte(MetaGlobalVersion), versionBuf, types.WriteOptions{Sync: true})) require.NoError(t, db.Close()) - // Reopen -- should detect skew and catchup + // Reopen -- catchup should replay version 2 from WAL s2 := NewCommitStore(t.Context(), dbDir, DefaultConfig()) _, err = s2.LoadVersion(0, false) require.NoError(t, err) @@ -133,7 +138,7 @@ func TestPerDBLtHashPersistenceAfterReopen(t *testing.T) { _, err := s1.LoadVersion(0, false) require.NoError(t, err) - for i := 1; i <= 10; i++ { + for i := byte(1); i <= 10; i++ { commitMixedState(t, s1, i) } verifyPerDBLtHash(t, s1) @@ -149,10 +154,13 @@ func TestPerDBLtHashPersistenceAfterReopen(t *testing.T) { verifyPerDBLtHash(t, s2) verifyLtHashAtHeight(t, s2, 10) - for dbDir, wh := range s2.perDBWorkingLtHash { - ch := s2.perDBCommittedLtHash[dbDir] - require.True(t, wh.Equal(ch), - "per-DB committed and working hashes should match on open for %s", dbDir) + for _, dbDir := range dataDBDirs { + wh := s2.perDBWorkingLtHash[dbDir] + meta := s2.localMeta[dbDir] + require.NotNil(t, meta.LtHash, + "LocalMeta LtHash should be loaded for %s", dbDir) + require.True(t, wh.Equal(meta.LtHash), + "per-DB working hash should match LocalMeta LtHash on open for %s", dbDir) } } @@ -212,7 +220,7 @@ func TestPerDBLtHashSumEqualsGlobal(t *testing.T) { s := setupTestStore(t) defer s.Close() - for i := 1; i <= 5; i++ { + for i := byte(1); i <= 5; i++ { commitMixedState(t, s, i) } @@ -312,10 +320,13 @@ func TestPerDBLtHashAfterImport(t *testing.T) { verifyPerDBLtHash(t, s) verifyLtHashAtHeight(t, s, 1) - for dbDir, wh := range s.perDBWorkingLtHash { - ch := s.perDBCommittedLtHash[dbDir] - require.True(t, wh.Equal(ch), - "per-DB committed and working hashes should match after import for %s", dbDir) + for _, dbDir := range dataDBDirs { + wh := s.perDBWorkingLtHash[dbDir] + meta := s.localMeta[dbDir] + require.NotNil(t, meta.LtHash, + "LocalMeta LtHash should exist after import for %s", dbDir) + require.True(t, wh.Equal(meta.LtHash), + "per-DB working hash should match LocalMeta LtHash after import for %s", dbDir) } require.NoError(t, s.Close()) } @@ -345,8 +356,8 @@ func TestPerDBLtHashRollback(t *testing.T) { require.NoError(t, s.Close()) } -// Test: per-DB keys are present in metadataDB after normal commit cycle. -func TestPerDBLtHashPersistedInMetadataDB(t *testing.T) { +// Test: per-DB LtHashes are persisted in each DB's LocalMeta after normal commit cycle. +func TestPerDBLtHashPersistedInLocalMeta(t *testing.T) { dir := t.TempDir() dbDir := filepath.Join(dir, flatkvRootDir) @@ -357,13 +368,20 @@ func TestPerDBLtHashPersistedInMetadataDB(t *testing.T) { commitMixedState(t, s, 1) commitMixedState(t, s, 2) - for dbDir, metaKey := range perDBLtHashKeys { - data, err := s.metadataDB.Get([]byte(metaKey)) - require.NoError(t, err, "per-DB key %s should exist in metadataDB after commit", dbDir) - h, err := lthash.Unmarshal(data) - require.NoError(t, err) - require.True(t, s.perDBCommittedLtHash[dbDir].Equal(h), - "metadataDB per-DB hash should match committed hash for %s", dbDir) + dbInstances := map[string]types.KeyValueDB{ + accountDBDir: s.accountDB, + codeDBDir: s.codeDB, + storageDBDir: s.storageDB, + legacyDBDir: s.legacyDB, + } + for _, dbDirName := range dataDBDirs { + db := dbInstances[dbDirName] + meta, err := loadLocalMeta(db) + require.NoError(t, err, "LocalMeta should be readable for %s", dbDirName) + require.NotNil(t, meta.LtHash, + "LocalMeta LtHash should be non-nil for %s", dbDirName) + require.True(t, s.perDBWorkingLtHash[dbDirName].Equal(meta.LtHash), + "LocalMeta LtHash should match working hash for %s", dbDirName) } require.NoError(t, s.Close()) diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index ff7af02acc..45d1736444 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -45,21 +45,12 @@ const ( // Metadata DB keys MetaGlobalVersion = "_meta/version" // Global committed version watermark (8 bytes) MetaGlobalLtHash = "_meta/hash" // Global LtHash (2048 bytes) - MetaAccountLtHash = "_meta/hash/account" - MetaCodeLtHash = "_meta/hash/code" - MetaStorageLtHash = "_meta/hash/storage" - MetaLegacyLtHash = "_meta/hash/legacy" flatkvMeterName = "seidb_flatkv" ) -// perDBLtHashKeys maps DB dir names to their per-DB LTHash keys in metadataDB. -var perDBLtHashKeys = map[string]string{ - accountDBDir: MetaAccountLtHash, - codeDBDir: MetaCodeLtHash, - storageDBDir: MetaStorageLtHash, - legacyDBDir: MetaLegacyLtHash, -} +// dataDBDirs lists all data DB directory names (used for per-DB LtHash iteration). +var dataDBDirs = []string{accountDBDir, codeDBDir, storageDBDir, legacyDBDir} // pendingKVWrite tracks a buffered key-value write for code/storage DBs. type pendingKVWrite struct { @@ -101,10 +92,10 @@ type CommitStore struct { committedLtHash *lthash.LtHash workingLtHash *lthash.LtHash - // Per-DB LTHash tracking. Authoritative copies live in metadataDB; - // secondary copies are written to each DB's LocalMeta for integrity verification. - perDBCommittedLtHash map[string]*lthash.LtHash - perDBWorkingLtHash map[string]*lthash.LtHash + // Per-DB working LTHash tracking. Authoritative copies live in each + // DB's LocalMeta (atomically committed with data). On startup the + // working hashes are loaded from LocalMeta. + perDBWorkingLtHash map[string]*lthash.LtHash // Pending writes buffer // accountWrites: key = address string (20 bytes), value = AccountValue @@ -140,20 +131,19 @@ func NewCommitStore( meter := otel.Meter(flatkvMeterName) return &CommitStore{ - ctx: ctx, - config: cfg, - dbDir: dbDir, - localMeta: make(map[string]*LocalMeta), - accountWrites: make(map[string]*pendingAccountWrite), - codeWrites: make(map[string]*pendingKVWrite), - storageWrites: make(map[string]*pendingKVWrite), - legacyWrites: make(map[string]*pendingKVWrite), - pendingChangeSets: make([]*proto.NamedChangeSet, 0), - committedLtHash: lthash.New(), - workingLtHash: lthash.New(), - perDBCommittedLtHash: newPerDBLtHashMap(), - perDBWorkingLtHash: newPerDBLtHashMap(), - phaseTimer: metrics.NewPhaseTimer(meter, "seidb_main_thread"), + ctx: ctx, + config: cfg, + dbDir: dbDir, + localMeta: make(map[string]*LocalMeta), + accountWrites: make(map[string]*pendingAccountWrite), + codeWrites: make(map[string]*pendingKVWrite), + storageWrites: make(map[string]*pendingKVWrite), + legacyWrites: make(map[string]*pendingKVWrite), + pendingChangeSets: make([]*proto.NamedChangeSet, 0), + committedLtHash: lthash.New(), + workingLtHash: lthash.New(), + perDBWorkingLtHash: newPerDBLtHashMap(), + phaseTimer: metrics.NewPhaseTimer(meter, "seidb_main_thread"), } } @@ -517,8 +507,23 @@ func (s *CommitStore) loadGlobalMetadata() error { s.workingLtHash = lthash.New() } - if err := s.loadPerDBLtHashes(); err != nil { - return fmt.Errorf("failed to load per-DB LtHashes: %w", err) + // Load per-DB LtHashes from each DB's LocalMeta (already loaded in openDBs). + // If any DB's version is behind the global version (partial commit or + // corruption), lower committedVersion so catchup replays from there. + for _, dbDir := range dataDBDirs { + meta := s.localMeta[dbDir] + if meta != nil && meta.LtHash != nil { + s.perDBWorkingLtHash[dbDir] = meta.LtHash.Clone() + } else { + s.perDBWorkingLtHash[dbDir] = lthash.New() + } + if meta != nil && meta.CommittedVersion < s.committedVersion { + logger.Warn("DB LocalMeta version behind global version, will catchup", + "db", dbDir, + "localVersion", meta.CommittedVersion, + "globalVersion", s.committedVersion) + s.committedVersion = meta.CommittedVersion + } } return nil diff --git a/sei-db/state_db/sc/flatkv/store_catchup.go b/sei-db/state_db/sc/flatkv/store_catchup.go index ac34a18dcc..e0cee41ab5 100644 --- a/sei-db/state_db/sc/flatkv/store_catchup.go +++ b/sei-db/state_db/sc/flatkv/store_catchup.go @@ -145,7 +145,7 @@ func (s *CommitStore) catchup(targetVersion int64) error { } s.committedVersion = entry.Version - s.snapshotLtHashes() + s.committedLtHash = s.workingLtHash.Clone() s.clearPendingWrites() replayed++ diff --git a/sei-db/state_db/sc/flatkv/store_meta.go b/sei-db/state_db/sc/flatkv/store_meta.go index e66ac764e3..2833ec06c9 100644 --- a/sei-db/state_db/sc/flatkv/store_meta.go +++ b/sei-db/state_db/sc/flatkv/store_meta.go @@ -55,8 +55,9 @@ func (s *CommitStore) loadGlobalLtHash() (*lthash.LtHash, error) { return lthash.Unmarshal(data) } -// commitGlobalMetadata atomically commits global version, global LtHash, -// and per-DB LtHashes to metadata DB. +// commitGlobalMetadata atomically commits global version and global LtHash +// to metadata DB. Per-DB LtHashes are stored in each DB's LocalMeta +// (committed atomically with data in commitBatches). func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) error { batch := s.metadataDB.NewBatch() defer func() { _ = batch.Close() }() @@ -73,54 +74,14 @@ func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) e return fmt.Errorf("failed to set global lthash: %w", err) } - for dbDir, metaKey := range perDBLtHashKeys { - if h := s.perDBCommittedLtHash[dbDir]; h != nil { - if err := batch.Set([]byte(metaKey), h.Marshal()); err != nil { - return fmt.Errorf("failed to set %s lthash: %w", dbDir, err) - } - } - } - return batch.Commit(types.WriteOptions{Sync: s.config.Fsync}) } // newPerDBLtHashMap returns a map with a fresh zero LtHash for each data DB. func newPerDBLtHashMap() map[string]*lthash.LtHash { - m := make(map[string]*lthash.LtHash, len(perDBLtHashKeys)) - for dbDir := range perDBLtHashKeys { + m := make(map[string]*lthash.LtHash, len(dataDBDirs)) + for _, dbDir := range dataDBDirs { m[dbDir] = lthash.New() } return m } - -// snapshotLtHashes clones working hashes (global + per-DB) into committed state. -func (s *CommitStore) snapshotLtHashes() { - s.committedLtHash = s.workingLtHash.Clone() - for dbDir, h := range s.perDBWorkingLtHash { - s.perDBCommittedLtHash[dbDir] = h.Clone() - } -} - -// loadPerDBLtHashes reads per-DB LtHashes from metadataDB. -// If a key is not found (fresh start), initializes to zero. -func (s *CommitStore) loadPerDBLtHashes() error { - for dbDir, metaKey := range perDBLtHashKeys { - data, err := s.metadataDB.Get([]byte(metaKey)) - if errorutils.IsNotFound(err) { - logger.Warn("No lattice hash found for DB, initializing to fresh hash", "db", dbDir) - s.perDBCommittedLtHash[dbDir] = lthash.New() - s.perDBWorkingLtHash[dbDir] = lthash.New() - continue - } - if err != nil { - return fmt.Errorf("failed to read %s lthash: %w", dbDir, err) - } - h, err := lthash.Unmarshal(data) - if err != nil { - return fmt.Errorf("failed to unmarshal %s lthash: %w", dbDir, err) - } - s.perDBCommittedLtHash[dbDir] = h - s.perDBWorkingLtHash[dbDir] = h.Clone() - } - return nil -} diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index fb4d7a5b2c..a84ace8324 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -234,8 +234,8 @@ func (s *CommitStore) ApplyChangeSets(cs []*proto.NamedChangeSet) error { // Global LTHash = sum of per-DB hashes (homomorphic property) s.workingLtHash.Reset() - for _, h := range s.perDBWorkingLtHash { - s.workingLtHash.MixIn(h) + for _, dir := range dataDBDirs { + s.workingLtHash.MixIn(s.perDBWorkingLtHash[dir]) } s.phaseTimer.SetPhase("apply_change_done") @@ -270,7 +270,7 @@ func (s *CommitStore) Commit() (int64, error) { // Step 3: Update in-memory committed state s.phaseTimer.SetPhase("commit_update_lt_hash") s.committedVersion = version - s.snapshotLtHashes() + s.committedLtHash = s.workingLtHash.Clone() // Step 4: Persist global metadata to metadata DB (always every block) s.phaseTimer.SetPhase("commit_write_metadata") From 12db911cebd6f98902aee827501a97a0dd09f422 Mon Sep 17 00:00:00 2001 From: blindchaser Date: Wed, 18 Mar 2026 12:47:43 -0400 Subject: [PATCH 5/6] compute fresh hash and swap --- sei-db/state_db/sc/flatkv/store_write.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index a84ace8324..848affa509 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -232,11 +232,14 @@ func (s *CommitStore) ApplyChangeSets(cs []*proto.NamedChangeSet) error { } } - // Global LTHash = sum of per-DB hashes (homomorphic property) - s.workingLtHash.Reset() + // Global LTHash = sum of per-DB hashes (homomorphic property). + // Compute into a fresh hash and swap to avoid a transient empty state + // on workingLtHash (safe for future pipelining / async callers). + globalHash := lthash.New() for _, dir := range dataDBDirs { - s.workingLtHash.MixIn(s.perDBWorkingLtHash[dir]) + globalHash.MixIn(s.perDBWorkingLtHash[dir]) } + s.workingLtHash = globalHash s.phaseTimer.SetPhase("apply_change_done") return nil From 5255a87c831c6b819b6b4afe996dfb6b8a35f2c7 Mon Sep 17 00:00:00 2001 From: blindchaser Date: Wed, 18 Mar 2026 13:47:58 -0400 Subject: [PATCH 6/6] refactor(flatkv): use typed MetaKey with separate keys for per-DB metadata --- sei-db/state_db/sc/flatkv/exporter.go | 14 ++-- sei-db/state_db/sc/flatkv/iterator.go | 64 +++++++++++---- sei-db/state_db/sc/flatkv/keys.go | 76 +++++------------ sei-db/state_db/sc/flatkv/keys_test.go | 82 ++----------------- .../sc/flatkv/lthash_correctness_test.go | 7 +- .../state_db/sc/flatkv/perdb_lthash_test.go | 17 ++-- sei-db/state_db/sc/flatkv/snapshot.go | 2 +- sei-db/state_db/sc/flatkv/snapshot_test.go | 21 ++--- sei-db/state_db/sc/flatkv/store.go | 4 - sei-db/state_db/sc/flatkv/store_meta.go | 69 ++++++++++++---- sei-db/state_db/sc/flatkv/store_meta_test.go | 28 +++---- sei-db/state_db/sc/flatkv/store_write.go | 36 +++----- sei-db/state_db/sc/flatkv/store_write_test.go | 15 ++-- 13 files changed, 186 insertions(+), 249 deletions(-) diff --git a/sei-db/state_db/sc/flatkv/exporter.go b/sei-db/state_db/sc/flatkv/exporter.go index c57b51070d..60413672e6 100644 --- a/sei-db/state_db/sc/flatkv/exporter.go +++ b/sei-db/state_db/sc/flatkv/exporter.go @@ -89,6 +89,10 @@ func (e *KVExporter) Next() (interface{}, error) { continue } + if isMetaKey(e.currentIter.Key()) { + e.currentIter.Next() + continue + } key := bytes.Clone(e.currentIter.Key()) value := bytes.Clone(e.currentIter.Value()) e.currentIter.Next() @@ -123,10 +127,8 @@ func (e *KVExporter) Close() error { return nil } -// openIterForDB returns an iterator over all user data in the given DB, -// excluding internal metadata. metaKeyLowerBound() returns {0x00, 0x00} which -// skips the single-byte DBLocalMetaKey ({0x00}) while including all user keys -// (minimum 20 bytes for an EVM address). +// openIterForDB returns an iterator over all user data in the given DB. +// Metadata keys are filtered out by isMetaKey() in the iteration loop. func (e *KVExporter) openIterForDB(db exportDBKind) (dbtypes.KeyValueDBIterator, error) { var kvDB dbtypes.KeyValueDB switch db { @@ -144,9 +146,7 @@ func (e *KVExporter) openIterForDB(db exportDBKind) (dbtypes.KeyValueDBIterator, if kvDB == nil { return nil, nil } - return kvDB.NewIter(&dbtypes.IterOptions{ - LowerBound: metaKeyLowerBound(), - }) + return kvDB.NewIter(&dbtypes.IterOptions{}) } func (e *KVExporter) convertToNodes(db exportDBKind, key, value []byte) ([]*types.SnapshotNode, error) { diff --git a/sei-db/state_db/sc/flatkv/iterator.go b/sei-db/state_db/sc/flatkv/iterator.go index eeaa6ab121..6ea8557c86 100644 --- a/sei-db/state_db/sc/flatkv/iterator.go +++ b/sei-db/state_db/sc/flatkv/iterator.go @@ -1,7 +1,6 @@ package flatkv import ( - "bytes" "fmt" "github.com/sei-protocol/sei-chain/sei-db/common/evm" @@ -54,11 +53,6 @@ func newDBIterator(db types.KeyValueDB, kind evm.EVMKeyKind, start, end []byte) return &emptyIterator{} } - // Exclude metadata key (0x00) - if internalStart == nil { - internalStart = metaKeyLowerBound() - } - iter, err := db.NewIter(&types.IterOptions{ LowerBound: internalStart, UpperBound: internalEnd, @@ -79,11 +73,6 @@ func newDBIterator(db types.KeyValueDB, kind evm.EVMKeyKind, start, end []byte) func newDBPrefixIterator(db types.KeyValueDB, kind evm.EVMKeyKind, internalPrefix []byte, externalPrefix []byte) Iterator { internalEnd := PrefixEnd(internalPrefix) - // Exclude metadata key (0x00) - if internalPrefix == nil || bytes.Compare(internalPrefix, metaKeyLowerBound()) < 0 { - internalPrefix = metaKeyLowerBound() - } - iter, err := db.NewIter(&types.IterOptions{ LowerBound: internalPrefix, UpperBound: internalEnd, @@ -132,14 +121,22 @@ func (it *dbIterator) First() bool { if it.closed { return false } - return it.iter.First() + if !it.iter.First() { + return false + } + it.skipMetaForward() + return it.iter.Valid() } func (it *dbIterator) Last() bool { if it.closed { return false } - return it.iter.Last() + if !it.iter.Last() { + return false + } + it.skipMetaBackward() + return it.iter.Valid() } func (it *dbIterator) SeekGE(key []byte) bool { @@ -153,7 +150,11 @@ func (it *dbIterator) SeekGE(key []byte) bool { return false } - return it.iter.SeekGE(internalKey) + if !it.iter.SeekGE(internalKey) { + return false + } + it.skipMetaForward() + return it.iter.Valid() } func (it *dbIterator) SeekLT(key []byte) bool { @@ -167,21 +168,50 @@ func (it *dbIterator) SeekLT(key []byte) bool { return false } - return it.iter.SeekLT(internalKey) + if !it.iter.SeekLT(internalKey) { + return false + } + it.skipMetaBackward() + return it.iter.Valid() } func (it *dbIterator) Next() bool { if it.closed { return false } - return it.iter.Next() + if !it.iter.Next() { + return false + } + it.skipMetaForward() + return it.iter.Valid() } func (it *dbIterator) Prev() bool { if it.closed { return false } - return it.iter.Prev() + if !it.iter.Prev() { + return false + } + it.skipMetaBackward() + return it.iter.Valid() +} + +// skipMetaForward advances past any _meta/ keys. +// On I/O error Valid() becomes false and the loop exits; +// the caller surfaces the error via Error(). +func (it *dbIterator) skipMetaForward() { + for it.iter.Valid() && isMetaKey(it.iter.Key()) { + it.iter.Next() + } +} + +// skipMetaBackward retreats past any _meta/ keys. +// Error handling mirrors skipMetaForward. +func (it *dbIterator) skipMetaBackward() { + for it.iter.Valid() && isMetaKey(it.iter.Key()) { + it.iter.Prev() + } } func (it *dbIterator) Key() []byte { diff --git a/sei-db/state_db/sc/flatkv/keys.go b/sei-db/state_db/sc/flatkv/keys.go index 432ff5444b..070ca104f2 100644 --- a/sei-db/state_db/sc/flatkv/keys.go +++ b/sei-db/state_db/sc/flatkv/keys.go @@ -8,19 +8,28 @@ import ( "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/lthash" ) -// DBLocalMetaKey is the key for per-DB local metadata. -// It is a single-byte key (0x00), which cannot collide with any valid user key -// because all user keys have minimum length of 20 bytes (EVM address). -// -// Invariant: All user keys are >= 20 bytes (address=20, storage=52). -var DBLocalMetaKey = []byte{0x00} +const metaKeyPrefix = "_meta/" + +const ( + metaVersion = metaKeyPrefix + "version" + metaLtHash = metaKeyPrefix + "hash" +) -// metaKeyLowerBound returns the iterator lower bound that excludes DBLocalMetaKey. -// Lexicographically: 0x00 (1 byte) < 0x00,0x00 (2 bytes) < any user key (>=20 bytes). -// This ensures metadata key is excluded while all user keys (even those starting -// with 0x00) are included. -func metaKeyLowerBound() []byte { - return []byte{0x00, 0x00} +var ( + metaKeyPrefixBytes = []byte(metaKeyPrefix) + metaVersionKey = []byte(metaVersion) + metaLtHashKey = []byte(metaLtHash) +) + +// isMetaKey reports whether key is a per-DB internal metadata key (not user data). +// +// Safety: _meta/ keys are 10–13 bytes; the shortest user key is 20 bytes +// (an EVM address). Prefix collision would require an address starting with +// 0x5F6D657461 ("_meta") — probability ~2^-48 for random addresses and +// negligible even under CREATE2 brute-force. Legacy DB keys must not use +// the _meta/ prefix. +func isMetaKey(key []byte) bool { + return bytes.HasPrefix(key, metaKeyPrefixBytes) } const ( @@ -29,56 +38,15 @@ const ( SlotLen = 32 BalanceLen = 32 NonceLen = 8 - - // localMetaVersionOnly is the serialized size of the old format (version only). - localMetaVersionOnly = 8 - // localMetaWithLtHash is the serialized size with LtHash (version + 2048 bytes). - localMetaWithLtHash = localMetaVersionOnly + lthash.LtHashBytes ) // LocalMeta stores per-DB version tracking metadata. -// Stored inside each DB at DBLocalMetaKey (0x00). +// Version is stored at _meta/version, LtHash at _meta/hash. type LocalMeta struct { CommittedVersion int64 // Current committed version in this DB LtHash *lthash.LtHash // nil for old format (version-only) } -// MarshalLocalMeta encodes LocalMeta to bytes. -// If LtHash is non-nil: 8 + 2048 = 2056 bytes. Otherwise: 8 bytes (backward compat). -func MarshalLocalMeta(m *LocalMeta) []byte { - if m.LtHash != nil { - buf := make([]byte, localMetaWithLtHash) - binary.BigEndian.PutUint64(buf, uint64(m.CommittedVersion)) //nolint:gosec // version is always non-negative - m.LtHash.MarshalTo(buf[localMetaVersionOnly:]) - return buf - } - buf := make([]byte, localMetaVersionOnly) - binary.BigEndian.PutUint64(buf, uint64(m.CommittedVersion)) //nolint:gosec // version is always non-negative - return buf -} - -// UnmarshalLocalMeta decodes LocalMeta from bytes. -// Accepts 8-byte (old, LtHash=nil) and 2056-byte (new, with LtHash) formats. -func UnmarshalLocalMeta(data []byte) (*LocalMeta, error) { - switch len(data) { - case localMetaVersionOnly: - return &LocalMeta{ - CommittedVersion: int64(binary.BigEndian.Uint64(data)), //nolint:gosec // version won't exceed int64 max - }, nil - case localMetaWithLtHash: - h, err := lthash.Unmarshal(data[localMetaVersionOnly:]) - if err != nil { - return nil, fmt.Errorf("unmarshal LocalMeta LtHash: %w", err) - } - return &LocalMeta{ - CommittedVersion: int64(binary.BigEndian.Uint64(data[:localMetaVersionOnly])), //nolint:gosec // version won't exceed int64 max - LtHash: h, - }, nil - default: - return nil, fmt.Errorf("invalid LocalMeta size: got %d, want %d or %d", len(data), localMetaVersionOnly, localMetaWithLtHash) - } -} - // Address is an EVM address (20 bytes). type Address [AddressLen]byte diff --git a/sei-db/state_db/sc/flatkv/keys_test.go b/sei-db/state_db/sc/flatkv/keys_test.go index d1f0e5d8bc..ad81d5360d 100644 --- a/sei-db/state_db/sc/flatkv/keys_test.go +++ b/sei-db/state_db/sc/flatkv/keys_test.go @@ -5,7 +5,6 @@ import ( "math/rand" "testing" - "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/lthash" "github.com/stretchr/testify/require" ) @@ -238,78 +237,11 @@ func TestFlatKVTypeConversions(t *testing.T) { }) } -func TestLocalMetaSerialization(t *testing.T) { - t.Run("RoundTripZero", func(t *testing.T) { - original := &LocalMeta{CommittedVersion: 0} - encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaVersionOnly, len(encoded)) - - decoded, err := UnmarshalLocalMeta(encoded) - require.NoError(t, err) - require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) - require.Nil(t, decoded.LtHash) - }) - - t.Run("RoundTripPositive", func(t *testing.T) { - original := &LocalMeta{CommittedVersion: 12345} - encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaVersionOnly, len(encoded)) - - decoded, err := UnmarshalLocalMeta(encoded) - require.NoError(t, err) - require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) - require.Nil(t, decoded.LtHash) - }) - - t.Run("RoundTripMaxInt64", func(t *testing.T) { - original := &LocalMeta{CommittedVersion: math.MaxInt64} - encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaVersionOnly, len(encoded)) - - decoded, err := UnmarshalLocalMeta(encoded) - require.NoError(t, err) - require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) - require.Nil(t, decoded.LtHash) - }) - - t.Run("RoundTripWithLtHash", func(t *testing.T) { - h := lthash.New() - h.MixIn(func() *lthash.LtHash { - pairs := []lthash.KVPairWithLastValue{{Key: []byte("k"), Value: []byte("v")}} - r, _ := lthash.ComputeLtHash(nil, pairs) - return r - }()) - original := &LocalMeta{CommittedVersion: 42, LtHash: h} - encoded := MarshalLocalMeta(original) - require.Equal(t, localMetaWithLtHash, len(encoded)) - - decoded, err := UnmarshalLocalMeta(encoded) - require.NoError(t, err) - require.Equal(t, original.CommittedVersion, decoded.CommittedVersion) - require.NotNil(t, decoded.LtHash) - require.True(t, original.LtHash.Equal(decoded.LtHash)) - }) - - t.Run("InvalidLength", func(t *testing.T) { - // Too short - _, err := UnmarshalLocalMeta([]byte{0x00}) - require.Error(t, err) - require.Contains(t, err.Error(), "invalid LocalMeta size") - - // Neither old nor new format - _, err = UnmarshalLocalMeta(make([]byte, localMetaVersionOnly+1)) - require.Error(t, err) - require.Contains(t, err.Error(), "invalid LocalMeta size") - }) - - t.Run("BigEndianEncoding", func(t *testing.T) { - // Verify big-endian encoding: version 0x0102030405060708 - meta := &LocalMeta{CommittedVersion: 0x0102030405060708} - encoded := MarshalLocalMeta(meta) - - // Big-endian: most significant byte first - require.Equal(t, byte(0x01), encoded[0]) - require.Equal(t, byte(0x02), encoded[1]) - require.Equal(t, byte(0x08), encoded[7]) - }) +func TestIsMetaKey(t *testing.T) { + require.True(t, isMetaKey(metaVersionKey)) + require.True(t, isMetaKey(metaLtHashKey)) + require.True(t, isMetaKey([]byte("_meta/future"))) + require.False(t, isMetaKey([]byte{0x00})) + require.False(t, isMetaKey(AccountKey(Address{0x01}))) + require.False(t, isMetaKey(StorageKey(Address{0x01}, Slot{0x02}))) } diff --git a/sei-db/state_db/sc/flatkv/lthash_correctness_test.go b/sei-db/state_db/sc/flatkv/lthash_correctness_test.go index 96dd9aa719..1cf7e0fd3c 100644 --- a/sei-db/state_db/sc/flatkv/lthash_correctness_test.go +++ b/sei-db/state_db/sc/flatkv/lthash_correctness_test.go @@ -22,12 +22,13 @@ func fullScanLtHash(t *testing.T, s *CommitStore) *lthash.LtHash { var pairs []lthash.KVPairWithLastValue scanDB := func(db types.KeyValueDB) { - iter, err := db.NewIter(&types.IterOptions{ - LowerBound: metaKeyLowerBound(), - }) + iter, err := db.NewIter(&types.IterOptions{}) require.NoError(t, err) defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { + if isMetaKey(iter.Key()) { + continue + } key := bytes.Clone(iter.Key()) value := bytes.Clone(iter.Value()) pairs = append(pairs, lthash.KVPairWithLastValue{ diff --git a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go index 2b1317ef0f..6d22b82a16 100644 --- a/sei-db/state_db/sc/flatkv/perdb_lthash_test.go +++ b/sei-db/state_db/sc/flatkv/perdb_lthash_test.go @@ -2,7 +2,6 @@ package flatkv import ( "bytes" - "encoding/binary" "path/filepath" "testing" @@ -16,17 +15,18 @@ import ( ) // testFullScanDBLtHash computes the LtHash of a single data DB by iterating -// all KV pairs (excluding the LocalMeta key at 0x00). Test-only helper. +// all KV pairs (excluding _meta/ metadata keys). Test-only helper. func testFullScanDBLtHash(t *testing.T, db types.KeyValueDB) *lthash.LtHash { t.Helper() - iter, err := db.NewIter(&types.IterOptions{ - LowerBound: metaKeyLowerBound(), - }) + iter, err := db.NewIter(&types.IterOptions{}) require.NoError(t, err) defer iter.Close() var pairs []lthash.KVPairWithLastValue for iter.First(); iter.Valid(); iter.Next() { + if isMetaKey(iter.Key()) { + continue + } pairs = append(pairs, lthash.KVPairWithLastValue{ Key: bytes.Clone(iter.Key()), Value: bytes.Clone(iter.Value()), @@ -106,16 +106,13 @@ func TestPerDBLtHashSkewRecovery(t *testing.T) { // Roll back metadataDB global version to 1 to simulate crash // after commitBatches completed but before commitGlobalMetadata. - flatkvDir := dbDir - snapDir, _, err := currentSnapshotDir(flatkvDir) + snapDir, _, err := currentSnapshotDir(dbDir) require.NoError(t, err) metaDBPath := filepath.Join(snapDir, metadataDir) db, err := pebbledb.Open(t.Context(), metaDBPath, types.OpenOptions{}, false) require.NoError(t, err) - versionBuf := make([]byte, 8) - binary.BigEndian.PutUint64(versionBuf, 1) - require.NoError(t, db.Set([]byte(MetaGlobalVersion), versionBuf, types.WriteOptions{Sync: true})) + require.NoError(t, db.Set(metaVersionKey, versionToBytes(1), types.WriteOptions{Sync: true})) require.NoError(t, db.Close()) // Reopen -- catchup should replay version 2 from WAL diff --git a/sei-db/state_db/sc/flatkv/snapshot.go b/sei-db/state_db/sc/flatkv/snapshot.go index 93f0a328e3..d95e2b6453 100644 --- a/sei-db/state_db/sc/flatkv/snapshot.go +++ b/sei-db/state_db/sc/flatkv/snapshot.go @@ -380,7 +380,7 @@ func (s *CommitStore) migrateFlatLayout(flatkvDir string) (string, error) { var version int64 metaPath := filepath.Join(flatkvDir, metadataDir) if tmpMeta, err := pebbledb.Open(s.ctx, metaPath, types.OpenOptions{}, s.config.EnablePebbleMetrics); err == nil { - verData, verErr := tmpMeta.Get([]byte(MetaGlobalVersion)) + verData, verErr := tmpMeta.Get(metaVersionKey) _ = tmpMeta.Close() if verErr == nil && len(verData) == 8 { version = int64(binary.BigEndian.Uint64(verData)) //nolint:gosec // block height, always < MaxInt64 diff --git a/sei-db/state_db/sc/flatkv/snapshot_test.go b/sei-db/state_db/sc/flatkv/snapshot_test.go index 6178490b46..c795084162 100644 --- a/sei-db/state_db/sc/flatkv/snapshot_test.go +++ b/sei-db/state_db/sc/flatkv/snapshot_test.go @@ -314,8 +314,7 @@ func TestOpenVersionValidation(t *testing.T) { accountDBPath := filepath.Join(snapDir, accountDBDir) db, err := pebbledb.Open(t.Context(), accountDBPath, types.OpenOptions{}, false) require.NoError(t, err) - lagMeta := &LocalMeta{CommittedVersion: 1} - require.NoError(t, db.Set(DBLocalMetaKey, MarshalLocalMeta(lagMeta), types.WriteOptions{Sync: true})) + require.NoError(t, db.Set(metaVersionKey, versionToBytes(1), types.WriteOptions{Sync: true})) require.NoError(t, db.Close()) // Phase 3: reopen - should detect skew and catchup @@ -1399,13 +1398,13 @@ func TestGlobalMetadataCorruption(t *testing.T) { workingMeta := filepath.Join(dbDir, "working", metadataDir) db, err := pebbledb.Open(context.Background(), workingMeta, types.OpenOptions{}, false) require.NoError(t, err) - require.NoError(t, db.Set([]byte(MetaGlobalVersion), []byte{0xFF, 0xFF, 0xFF}, types.WriteOptions{Sync: true})) + require.NoError(t, db.Set(metaVersionKey, []byte{0xFF, 0xFF, 0xFF}, types.WriteOptions{Sync: true})) require.NoError(t, db.Close()) snapMeta := filepath.Join(dbDir, snapshotName(1), metadataDir) db2, err := pebbledb.Open(context.Background(), snapMeta, types.OpenOptions{}, false) require.NoError(t, err) - require.NoError(t, db2.Set([]byte(MetaGlobalVersion), []byte{0xFF, 0xFF, 0xFF}, types.WriteOptions{Sync: true})) + require.NoError(t, db2.Set(metaVersionKey, []byte{0xFF, 0xFF, 0xFF}, types.WriteOptions{Sync: true})) require.NoError(t, db2.Close()) _ = os.Remove(filepath.Join(dbDir, "working", snapshotBaseFile)) @@ -1461,18 +1460,18 @@ func TestLocalMetaCorruption(t *testing.T) { require.NoError(t, s.WriteSnapshot("")) require.NoError(t, s.Close()) - // Corrupt accountDB LocalMeta in working dir: write 3 garbage bytes (expected 8). + // Corrupt accountDB meta version in working dir: write 3 garbage bytes (expected 8). workingAccount := filepath.Join(dbDir, "working", accountDBDir) db, err := pebbledb.Open(context.Background(), workingAccount, types.OpenOptions{}, false) require.NoError(t, err) - require.NoError(t, db.Set(DBLocalMetaKey, []byte{0xDE, 0xAD, 0xFF}, types.WriteOptions{Sync: true})) + require.NoError(t, db.Set(metaVersionKey, []byte{0xDE, 0xAD, 0xFF}, types.WriteOptions{Sync: true})) require.NoError(t, db.Close()) // Same corruption in the snapshot dir. snapAccount := filepath.Join(dbDir, snapshotName(1), accountDBDir) db2, err := pebbledb.Open(context.Background(), snapAccount, types.OpenOptions{}, false) require.NoError(t, err) - require.NoError(t, db2.Set(DBLocalMetaKey, []byte{0xDE, 0xAD, 0xFF}, types.WriteOptions{Sync: true})) + require.NoError(t, db2.Set(metaVersionKey, []byte{0xDE, 0xAD, 0xFF}, types.WriteOptions{Sync: true})) require.NoError(t, db2.Close()) // Remove SNAPSHOT_BASE to force re-clone from corrupted snapshot. @@ -1480,8 +1479,8 @@ func TestLocalMetaCorruption(t *testing.T) { s2 := NewCommitStore(context.Background(), dbDir, DefaultConfig()) _, err = s2.LoadVersion(0, false) - require.Error(t, err, "open should fail when LocalMeta is corrupted (invalid size)") - require.Contains(t, err.Error(), "invalid LocalMeta size") + require.Error(t, err, "open should fail when meta version is corrupted") + require.Contains(t, err.Error(), "invalid meta version length") } // TestWALSegmentCorruption simulates WAL data loss caused by segment corruption. @@ -1505,9 +1504,7 @@ func TestWALSegmentCorruption(t *testing.T) { workingMeta := filepath.Join(dbDir, "working", metadataDir) mdb, err := pebbledb.Open(context.Background(), workingMeta, types.OpenOptions{}, false) require.NoError(t, err) - versionBuf := make([]byte, 8) - versionBuf[7] = 1 // version = 1 - require.NoError(t, mdb.Set([]byte(MetaGlobalVersion), versionBuf, types.WriteOptions{Sync: true})) + require.NoError(t, mdb.Set(metaVersionKey, versionToBytes(1), types.WriteOptions{Sync: true})) require.NoError(t, mdb.Close()) // Corrupt WAL segments: tidwall/wal will auto-truncate, losing all entries. diff --git a/sei-db/state_db/sc/flatkv/store.go b/sei-db/state_db/sc/flatkv/store.go index 45d1736444..c14e0a86b1 100644 --- a/sei-db/state_db/sc/flatkv/store.go +++ b/sei-db/state_db/sc/flatkv/store.go @@ -42,10 +42,6 @@ const ( readOnlyDirPrefix = "readonly-" - // Metadata DB keys - MetaGlobalVersion = "_meta/version" // Global committed version watermark (8 bytes) - MetaGlobalLtHash = "_meta/hash" // Global LtHash (2048 bytes) - flatkvMeterName = "seidb_flatkv" ) diff --git a/sei-db/state_db/sc/flatkv/store_meta.go b/sei-db/state_db/sc/flatkv/store_meta.go index 2833ec06c9..dcd6966119 100644 --- a/sei-db/state_db/sc/flatkv/store_meta.go +++ b/sei-db/state_db/sc/flatkv/store_meta.go @@ -10,22 +10,66 @@ import ( "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/flatkv/lthash" ) -// loadLocalMeta loads the local metadata from a DB, or returns default if not present. +// versionToBytes encodes a non-negative version as 8-byte big-endian. +// Panics on negative input to catch programming errors early. +// Only called from internal commit/test paths — never with untrusted input. +func versionToBytes(v int64) []byte { + if v < 0 { + panic(fmt.Sprintf("flatkv: negative version %d", v)) + } + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(v)) //nolint:gosec // guarded above + return b +} + +// loadLocalMeta loads per-DB metadata by reading separate keys. func loadLocalMeta(db types.KeyValueDB) (*LocalMeta, error) { - val, err := db.Get(DBLocalMetaKey) + meta := &LocalMeta{} + + versionData, err := db.Get(metaVersionKey) + if err != nil { + if errorutils.IsNotFound(err) { + return &LocalMeta{CommittedVersion: 0}, nil + } + return nil, fmt.Errorf("could not read meta version: %w", err) + } + if len(versionData) != 8 { + return nil, fmt.Errorf("invalid meta version length: got %d, want 8", len(versionData)) + } + meta.CommittedVersion = int64(binary.BigEndian.Uint64(versionData)) //nolint:gosec // version won't exceed int64 max + + hashData, err := db.Get(metaLtHashKey) if err != nil && !errorutils.IsNotFound(err) { - return nil, fmt.Errorf("could not get DBLocalMetaKey: %w", err) + return nil, fmt.Errorf("could not read meta hash: %w", err) } - if errorutils.IsNotFound(err) || val == nil { - return &LocalMeta{CommittedVersion: 0}, nil + if err == nil && hashData != nil { + h, err := lthash.Unmarshal(hashData) + if err != nil { + return nil, fmt.Errorf("unmarshal meta hash: %w", err) + } + meta.LtHash = h } - return UnmarshalLocalMeta(val) + + return meta, nil +} + +// writeLocalMetaToBatch writes per-DB metadata (version + LtHash) as separate keys. +func writeLocalMetaToBatch(batch types.Batch, version int64, ltHash *lthash.LtHash) error { + if err := batch.Set(metaVersionKey, versionToBytes(version)); err != nil { + return fmt.Errorf("set meta version: %w", err) + } + if ltHash != nil { + if err := batch.Set(metaLtHashKey, ltHash.Marshal()); err != nil { + return fmt.Errorf("set meta hash: %w", err) + } + } + return nil } // loadGlobalVersion reads the global committed version from metadata DB. // Returns 0 if not found (fresh start). func (s *CommitStore) loadGlobalVersion() (int64, error) { - data, err := s.metadataDB.Get([]byte(MetaGlobalVersion)) + data, err := s.metadataDB.Get(metaVersionKey) if errorutils.IsNotFound(err) { return 0, nil } @@ -45,7 +89,7 @@ func (s *CommitStore) loadGlobalVersion() (int64, error) { // loadGlobalLtHash reads the global committed LtHash from metadata DB. // Returns nil if not found (fresh start). func (s *CommitStore) loadGlobalLtHash() (*lthash.LtHash, error) { - data, err := s.metadataDB.Get([]byte(MetaGlobalLtHash)) + data, err := s.metadataDB.Get(metaLtHashKey) if errorutils.IsNotFound(err) { return nil, nil } @@ -62,15 +106,10 @@ func (s *CommitStore) commitGlobalMetadata(version int64, hash *lthash.LtHash) e batch := s.metadataDB.NewBatch() defer func() { _ = batch.Close() }() - versionBuf := make([]byte, 8) - binary.BigEndian.PutUint64(versionBuf, uint64(version)) //nolint:gosec // version is always non-negative - - if err := batch.Set([]byte(MetaGlobalVersion), versionBuf); err != nil { + if err := batch.Set(metaVersionKey, versionToBytes(version)); err != nil { return fmt.Errorf("failed to set global version: %w", err) } - - lthashBytes := hash.Marshal() - if err := batch.Set([]byte(MetaGlobalLtHash), lthashBytes); err != nil { + if err := batch.Set(metaLtHashKey, hash.Marshal()); err != nil { return fmt.Errorf("failed to set global lthash: %w", err) } diff --git a/sei-db/state_db/sc/flatkv/store_meta_test.go b/sei-db/state_db/sc/flatkv/store_meta_test.go index 425e660828..91597da338 100644 --- a/sei-db/state_db/sc/flatkv/store_meta_test.go +++ b/sei-db/state_db/sc/flatkv/store_meta_test.go @@ -2,6 +2,7 @@ package flatkv import ( "context" + "encoding/binary" "path/filepath" "testing" @@ -31,29 +32,24 @@ func TestLoadLocalMeta(t *testing.T) { db := setupTestDB(t) defer db.Close() - // Write metadata - original := &LocalMeta{CommittedVersion: 42} - err := db.Set(DBLocalMetaKey, MarshalLocalMeta(original), types.WriteOptions{}) - require.NoError(t, err) + require.NoError(t, db.Set(metaVersionKey, versionToBytes(42), types.WriteOptions{})) // Load it back loaded, err := loadLocalMeta(db) require.NoError(t, err) - require.Equal(t, original.CommittedVersion, loaded.CommittedVersion) + require.Equal(t, int64(42), loaded.CommittedVersion) + require.Nil(t, loaded.LtHash) }) - t.Run("CorruptedMeta_ReturnsError", func(t *testing.T) { + t.Run("CorruptedVersion_ReturnsError", func(t *testing.T) { db := setupTestDB(t) defer db.Close() - // Write invalid data (wrong size) - err := db.Set(DBLocalMetaKey, []byte{0x01, 0x02}, types.WriteOptions{}) - require.NoError(t, err) + require.NoError(t, db.Set(metaVersionKey, []byte{0x01, 0x02}, types.WriteOptions{})) - // Should fail to load - _, err = loadLocalMeta(db) + _, err := loadLocalMeta(db) require.Error(t, err) - require.Contains(t, err.Error(), "invalid LocalMeta size") + require.Contains(t, err.Error(), "invalid meta version length") }) } @@ -74,11 +70,9 @@ func TestStoreCommitBatchesUpdatesLocalMeta(t *testing.T) { require.Equal(t, int64(1), s.localMeta[storageDBDir].CommittedVersion) // Verify it's persisted in DB - data, err := s.storageDB.Get(DBLocalMetaKey) - require.NoError(t, err) - meta, err := UnmarshalLocalMeta(data) + data, err := s.storageDB.Get(metaVersionKey) require.NoError(t, err) - require.Equal(t, int64(1), meta.CommittedVersion) + require.Equal(t, int64(1), int64(binary.BigEndian.Uint64(data))) } func TestStoreMetadataOperations(t *testing.T) { @@ -144,7 +138,7 @@ func TestStoreMetadataOperations(t *testing.T) { defer s.Close() // Write invalid data (wrong size) - err := s.metadataDB.Set([]byte(MetaGlobalVersion), []byte{0x01}, types.WriteOptions{}) + err := s.metadataDB.Set(metaVersionKey, []byte{0x01}, types.WriteOptions{}) require.NoError(t, err) // Should return error diff --git a/sei-db/state_db/sc/flatkv/store_write.go b/sei-db/state_db/sc/flatkv/store_write.go index 848affa509..400f9a1945 100644 --- a/sei-db/state_db/sc/flatkv/store_write.go +++ b/sei-db/state_db/sc/flatkv/store_write.go @@ -361,12 +361,8 @@ func (s *CommitStore) commitBatches(version int64) error { } } - newLocalMeta := &LocalMeta{ - CommittedVersion: version, - LtHash: s.perDBWorkingLtHash[accountDBDir], - } - if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { - return fmt.Errorf("accountDB local meta set: %w", err) + if err := writeLocalMetaToBatch(batch, version, s.perDBWorkingLtHash[accountDBDir]); err != nil { + return fmt.Errorf("accountDB local meta: %w", err) } pending = append(pending, pendingCommit{accountDBDir, batch}) } @@ -389,12 +385,8 @@ func (s *CommitStore) commitBatches(version int64) error { } } - newLocalMeta := &LocalMeta{ - CommittedVersion: version, - LtHash: s.perDBWorkingLtHash[codeDBDir], - } - if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { - return fmt.Errorf("codeDB local meta set: %w", err) + if err := writeLocalMetaToBatch(batch, version, s.perDBWorkingLtHash[codeDBDir]); err != nil { + return fmt.Errorf("codeDB local meta: %w", err) } pending = append(pending, pendingCommit{codeDBDir, batch}) } @@ -417,12 +409,8 @@ func (s *CommitStore) commitBatches(version int64) error { } } - newLocalMeta := &LocalMeta{ - CommittedVersion: version, - LtHash: s.perDBWorkingLtHash[storageDBDir], - } - if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { - return fmt.Errorf("storageDB local meta set: %w", err) + if err := writeLocalMetaToBatch(batch, version, s.perDBWorkingLtHash[storageDBDir]); err != nil { + return fmt.Errorf("storageDB local meta: %w", err) } pending = append(pending, pendingCommit{storageDBDir, batch}) } @@ -445,12 +433,8 @@ func (s *CommitStore) commitBatches(version int64) error { } } - newLocalMeta := &LocalMeta{ - CommittedVersion: version, - LtHash: s.perDBWorkingLtHash[legacyDBDir], - } - if err := batch.Set(DBLocalMetaKey, MarshalLocalMeta(newLocalMeta)); err != nil { - return fmt.Errorf("legacyDB local meta set: %w", err) + if err := writeLocalMetaToBatch(batch, version, s.perDBWorkingLtHash[legacyDBDir]); err != nil { + return fmt.Errorf("legacyDB local meta: %w", err) } pending = append(pending, pendingCommit{legacyDBDir, batch}) } @@ -478,11 +462,11 @@ func (s *CommitStore) commitBatches(version int64) error { } } - // Update in-memory local meta after all commits succeed + // Update in-memory local meta after all commits succeed. for _, p := range pending { s.localMeta[p.dbDir] = &LocalMeta{ CommittedVersion: version, - LtHash: s.perDBWorkingLtHash[p.dbDir], + LtHash: s.perDBWorkingLtHash[p.dbDir].Clone(), } } return nil diff --git a/sei-db/state_db/sc/flatkv/store_write_test.go b/sei-db/state_db/sc/flatkv/store_write_test.go index 247fc1626a..f075c7568d 100644 --- a/sei-db/state_db/sc/flatkv/store_write_test.go +++ b/sei-db/state_db/sc/flatkv/store_write_test.go @@ -101,11 +101,9 @@ func TestStoreWriteAllDBs(t *testing.T) { "codeDB": s.codeDB, "legacyDB": s.legacyDB, } { - raw, err := db.Get(DBLocalMetaKey) - require.NoError(t, err, "%s LocalMeta read", name) - meta, err := UnmarshalLocalMeta(raw) - require.NoError(t, err) - require.Equal(t, int64(1), meta.CommittedVersion, "%s persisted LocalMeta", name) + raw, err := db.Get(metaVersionKey) + require.NoError(t, err, "%s meta version read", name) + require.Equal(t, int64(1), int64(binary.BigEndian.Uint64(raw)), "%s persisted version", name) } // Verify storage data was written @@ -1274,14 +1272,15 @@ func TestAccountValueEncodingTransition(t *testing.T) { func countLiveEntries(t *testing.T, db types.KeyValueDB) int { t.Helper() - iter, err := db.NewIter(&types.IterOptions{ - LowerBound: metaKeyLowerBound(), - }) + iter, err := db.NewIter(&types.IterOptions{}) require.NoError(t, err) defer iter.Close() count := 0 for iter.First(); iter.Valid(); iter.Next() { + if isMetaKey(iter.Key()) { + continue + } count++ } require.NoError(t, iter.Error())