Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions sei-db/db_engine/rocksdb/mvcc/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,7 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte)
prefix := storePrefix(storeKey)
start, end = util.IterateWithPrefix(prefix, start, end)

readOpts, cleanup := newTSReadOptions(version)
itr := db.storage.NewIteratorCF(readOpts, db.cfHandle)
return NewRocksDBIterator(itr, cleanup, prefix, start, end, version, db.earliestVersion, false), nil
return newIterator(db.storage, db.cfHandle, prefix, start, end, version, db.earliestVersion, false), nil
}

func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) {
Expand All @@ -341,9 +339,7 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
prefix := storePrefix(storeKey)
start, end = util.IterateWithPrefix(prefix, start, end)

readOpts, cleanup := newTSReadOptions(version)
itr := db.storage.NewIteratorCF(readOpts, db.cfHandle)
return NewRocksDBIterator(itr, cleanup, prefix, start, end, version, db.earliestVersion, true), nil
return newIterator(db.storage, db.cfHandle, prefix, start, end, version, db.earliestVersion, true), nil
}

// Import loads the initial version of the state in parallel with numWorkers goroutines
Expand Down Expand Up @@ -404,25 +400,13 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
return false, err
}

var startTs [TimestampSize]byte
binary.LittleEndian.PutUint64(startTs[:], uint64(0))

var endTs [TimestampSize]byte
binary.LittleEndian.PutUint64(endTs[:], uint64(latestVersion))

// Set timestamp lower and upper bound to iterate over all keys in db
readOpts := grocksdb.NewDefaultReadOptions()
readOpts.SetIterStartTimestamp(startTs[:])
readOpts.SetTimestamp(endTs[:])

itr := db.storage.NewIteratorCF(readOpts, db.cfHandle)
rocksItr := NewRocksDBIterator(itr, func() { readOpts.Destroy() }, prefix, start, end, latestVersion, 1, false)
rocksItr := newRangeIterator(db.storage, db.cfHandle, prefix, start, end, 0, latestVersion, 1)
defer func() { _ = rocksItr.Close() }()

for rocksItr.Valid() {
key := rocksItr.Key()
value := rocksItr.Value()
version := int64(binary.LittleEndian.Uint64(itr.Timestamp().Data()))
version := int64(binary.LittleEndian.Uint64(rocksItr.Timestamp().Data()))

// Call callback fn
if fn(key, value, version) {
Expand Down
30 changes: 29 additions & 1 deletion sei-db/db_engine/rocksdb/mvcc/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package mvcc

import (
"bytes"
"encoding/binary"

"github.com/linxGnu/grocksdb"
"github.com/sei-protocol/sei-chain/sei-db/state_db/ss/types"
Expand All @@ -21,7 +22,30 @@ type iterator struct {
invalid bool
}

func NewRocksDBIterator(source *grocksdb.Iterator, cleanup func(), prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator {
// newIterator creates a versioned iterator. ReadOptions are created and owned
// internally — destroyed when Close() is called.
func newIterator(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle, prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator {
readOpts, cleanup := newTSReadOptions(version)
source := storage.NewIteratorCF(readOpts, cfHandle)
return buildIterator(source, cleanup, prefix, start, end, version, earliestVersion, reverse)
}

// newRangeIterator creates an iterator over a version range. ReadOptions are
// created and owned internally — destroyed when Close() is called.
func newRangeIterator(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle, prefix, start, end []byte, startVersion, endVersion int64, earliestVersion int64) *iterator {
readOpts := grocksdb.NewDefaultReadOptions()
var startTs [TimestampSize]byte
binary.LittleEndian.PutUint64(startTs[:], uint64(startVersion))
var endTs [TimestampSize]byte
binary.LittleEndian.PutUint64(endTs[:], uint64(endVersion))
readOpts.SetIterStartTimestamp(startTs[:])
readOpts.SetTimestamp(endTs[:])

source := storage.NewIteratorCF(readOpts, cfHandle)
return buildIterator(source, func() { readOpts.Destroy() }, prefix, start, end, endVersion, earliestVersion, false)
}

func buildIterator(source *grocksdb.Iterator, cleanup func(), prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator {
// Return invalid iterator if requested iterator height is lower than earliest version after pruning
if version < earliestVersion {
return &iterator{
Expand Down Expand Up @@ -141,6 +165,10 @@ func (itr *iterator) Value() []byte {
return copyAndFreeSlice(itr.source.Value())
}

func (itr *iterator) Timestamp() *grocksdb.Slice {
return itr.source.Timestamp()
}

func (itr iterator) Next() {
if itr.invalid {
return
Expand Down
Loading