Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sei-db/common/metrics/buckets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package metrics

import "github.com/sei-protocol/sei-chain/sei-db/common/unit"

// Shared histogram bucket boundaries for use across the codebase.
// The OTel defaults are too coarse for meaningful percentile queries in Grafana.

// LatencyBuckets covers 10μs to 5 minutes — wide enough for both fast key
// lookups and slow compactions/flushes without needing per-metric tuning.
var LatencyBuckets = []float64{
	// 10μs–500μs: fast in-memory key lookups.
	0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005,
	// 1ms–50ms: typical small I/O operations.
	0.001, 0.0025, 0.005, 0.01, 0.025, 0.05,
	// 100ms–5min: slow compactions, flushes, and other background work.
	0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300,
}

// ByteSizeBuckets covers 256B to 1GB for data size histograms.
var ByteSizeBuckets = []float64{
	256,
	unit.KB,
	4 * unit.KB,
	16 * unit.KB,
	64 * unit.KB,
	256 * unit.KB,
	unit.MB,
	4 * unit.MB,
	16 * unit.MB,
	64 * unit.MB,
	256 * unit.MB,
	unit.GB,
}

// CountBuckets covers 1 to 1M for per-operation step/iteration counts.
var CountBuckets = []float64{
	1, 5, 10,
	50, 100, 500,
	1000, 5000, 10000,
	100000, 1000000,
}
7 changes: 7 additions & 0 deletions sei-db/common/threading/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,12 @@ type Pool interface {
// If Submit is called concurrently with or after shutdown (i.e. when ctx is done/cancelled), the task may
// be silently dropped. Callers that need a guarantee of execution must
// ensure Submit happens-before shutdown.
//
// This method is permitted to return an error only under the following conditions:
// - the pool is shutting down (i.e. its context is done/cancelled)
// - the provided ctx parameter is done/cancelled before this method returns
// - invalid input (e.g. the task is nil)
//
// If this method returns an error, the task may or may not have been executed.
Submit(ctx context.Context, task func()) error
}
48 changes: 48 additions & 0 deletions sei-db/db_engine/dbcache/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package dbcache

import (
"context"
"fmt"
"time"

"github.com/sei-protocol/sei-chain/sei-db/common/threading"
"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
)

Expand All @@ -22,6 +27,9 @@ type Reader func(key []byte) (value []byte, found bool, err error)
// - the Reader method returns an error (for methods that accept a Reader)
// - the cache is shutting down
// - the cache's work pools are shutting down
//
// Cache errors are generally not recoverable, and it should be assumed that a cache that has returned an error
// is in a corrupted state, and should be discarded.
type Cache interface {

// Get returns the value for the given key, or (nil, false, nil) if not found.
Expand Down Expand Up @@ -64,6 +72,14 @@ type Cache interface {
BatchSet(updates []CacheUpdate) error
}

// DefaultEstimatedOverheadPerEntry is a rough per-entry fixed heap-overhead estimate
// (in bytes) for a cache entry on 64-bit platforms (amd64/arm64). The figure sums the
// shardEntry struct (48 B), a list.Element (48 B), an lruQueueEntry (32 B), two
// map-entry costs (~64 B), string allocation rounding (~16 B), plus a margin for the
// duplicate key copy held by the LRU. It was derived from static analysis of Go size
// classes and map bucket layout; validate experimentally on your target platform.
const DefaultEstimatedOverheadPerEntry uint64 = 250

// CacheUpdate describes a single key-value mutation to apply to the cache.
type CacheUpdate struct {
// The key to update.
Expand All @@ -76,3 +92,35 @@ type CacheUpdate struct {
// IsDelete reports whether this update removes the key rather than storing a
// value: a nil Value marks the update as a delete.
func (u *CacheUpdate) IsDelete() bool {
	isDelete := u.Value == nil
	return isDelete
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we manage the cache context. like do we need a Close()/Shutdown() to the cache interface for collectLoop goroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not in this PR (since integration will happen in the next PR), but the context is managed by FlatKV, and is cancelled when FlatKV is shut down.

// Close closes all database instances, cancels the store's context to
// stop background goroutines (pools, caches, metrics), and releases the
// file lock.
func (s *CommitStore) Close() error {
	err := s.closeDBsOnly()
	s.cancel()

        // ...

The cancel method above will cause the ctx used by the worker pools and by the DB caches to have their contexts cancelled, thus freeing up all threading resources in these utilities.

// BuildCache creates a new Cache. A maxSize of zero produces a no-op cache;
// otherwise a standard sharded cache is constructed with the given settings.
func BuildCache(
	ctx context.Context,
	shardCount uint64,
	maxSize uint64,
	readPool threading.Pool,
	miscPool threading.Pool,
	estimatedOverheadPerEntry uint64,
	cacheName string,
	metricsScrapeInterval time.Duration,
) (Cache, error) {
	// A zero byte budget means caching is disabled entirely.
	if maxSize == 0 {
		return NewNoOpCache(), nil
	}

	c, err := NewStandardCache(
		ctx,
		shardCount,
		maxSize,
		readPool,
		miscPool,
		estimatedOverheadPerEntry,
		cacheName,
		metricsScrapeInterval,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create cache: %w", err)
	}
	return c, nil
}
194 changes: 194 additions & 0 deletions sei-db/db_engine/dbcache/cache_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package dbcache

import (
"context"
"fmt"
"sync"
"time"

"github.com/sei-protocol/sei-chain/sei-db/common/threading"
"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
)

// Compile-time assertion that *cache satisfies the Cache interface.
var _ Cache = (*cache)(nil)

// A standard implementation of a flatcache. Keys are mapped to shards by
// shardManager, and batch operations fan work out per shard so that each
// shard's lock is taken at most once per batch (see BatchSet/BatchGet).
type cache struct {
	// ctx governs the lifetime of background work; when cancelled, the work
	// pools stop accepting tasks and cache operations begin returning errors.
	ctx context.Context

	// A utility for assigning keys to shard indices.
	shardManager *shardManager

	// The shards in the cache.
	shards []*shard

	// A pool for asynchronous reads.
	readPool threading.Pool

	// A pool for miscellaneous operations that are neither computationally intensive nor IO bound.
	miscPool threading.Pool
}

// Creates a new Cache. If cacheName is non-empty, OTel metrics are enabled and the
// background size scrape runs every metricsScrapeInterval.
func NewStandardCache(
ctx context.Context,
// The number of shards in the cache. Must be a power of two and greater than 0.
shardCount uint64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this fixed size? how does user choose the shardCount

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PR that integrates this with FlatKV, I introduce configuration for FlatKV that allows for each of the 5 DBs to be individually configured, meaning that we'll be able to tune the configuration of each DB's cache for that DB's workload. That PR will follow this one.

// The maximum size of the cache, in bytes.
maxSize uint64,
// A work pool for reading from the DB.
readPool threading.Pool,
// A work pool for miscellaneous operations that are neither computationally intensive nor IO bound.
miscPool threading.Pool,
// The estimated overhead per entry, in bytes. This is used to calculate the maximum size of the cache.
// This value should be derived experimentally, and may differ between different builds and architectures.
estimatedOverheadPerEntry uint64,
// Name used as the "cache" attribute on metrics. Empty string disables metrics.
cacheName string,
// How often to scrape cache size for metrics. Ignored if cacheName is empty.
metricsScrapeInterval time.Duration,
) (Cache, error) {
if shardCount == 0 || (shardCount&(shardCount-1)) != 0 {
return nil, ErrNumShardsNotPowerOfTwo
}
if maxSize == 0 {
return nil, fmt.Errorf("maxSize must be greater than 0")
}

shardManager, err := newShardManager(shardCount)
if err != nil {
return nil, fmt.Errorf("failed to create shard manager: %w", err)
}
sizePerShard := maxSize / shardCount
if sizePerShard == 0 {
return nil, fmt.Errorf("maxSize must be greater than shardCount")
}

shards := make([]*shard, shardCount)
for i := uint64(0); i < shardCount; i++ {
shards[i], err = NewShard(ctx, readPool, sizePerShard, estimatedOverheadPerEntry)
if err != nil {
return nil, fmt.Errorf("failed to create shard: %w", err)
}
}

c := &cache{
ctx: ctx,
shardManager: shardManager,
shards: shards,
readPool: readPool,
miscPool: miscPool,
}

if cacheName != "" {
metrics := newCacheMetrics(ctx, cacheName, metricsScrapeInterval, c.getCacheSizeInfo)
for _, s := range c.shards {
s.metrics = metrics
}
}

return c, nil
}

// getCacheSizeInfo reports the aggregate byte size and entry count summed
// across all shards; used as the scrape callback for cache metrics.
func (c *cache) getCacheSizeInfo() (bytes uint64, entries uint64) {
	for i := range c.shards {
		shardBytes, shardEntries := c.shards[i].getSizeInfo()
		bytes += shardBytes
		entries += shardEntries
	}
	return bytes, entries
}

func (c *cache) BatchSet(updates []CacheUpdate) error {
// Sort entries by shard index so each shard is locked only once.
shardMap := make(map[uint64][]CacheUpdate)
for i := range updates {
idx := c.shardManager.Shard(updates[i].Key)
shardMap[idx] = append(shardMap[idx], updates[i])
}

var wg sync.WaitGroup
for shardIndex, shardEntries := range shardMap {
wg.Add(1)
err := c.miscPool.Submit(c.ctx, func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closure passed to miscPool.Submit calls wg.Done() directly after c.shards[shardIndex].BatchSet(shardEntries). If the shard's BatchSet panics, wg.Done() is never called, and wg.Wait() hangs forever, deadlocking the calling goroutine?

Copy link
Contributor Author

@cody-littley cody-littley Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My general ethos is to treat panic as an unrecoverable error, and to not worry too much about things like deadlocked goroutines and to just let the entire process tear itself down, since this sort of blocked goroutine doesn't stop the process from terminating. Sometimes it makes sense to catch a panic in order to do final logging/cleanup, but I think attempting to recover the go process after a panic is almost always a yucky pattern.

https://gobyexample.com/range-over-iterators

That being said, deferring release of the waitgroup isn't expensive, so no harm doing it. Change made.

defer wg.Done()
c.shards[shardIndex].BatchSet(shardEntries)
})
if err != nil {
return fmt.Errorf("failed to submit batch set: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle partial failures here, if some shards succeed, some shards fail, what should the user do?

Copy link
Contributor Author

@cody-littley cody-littley Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any failure from this method is unlikely to be recoverable. If paired with a DB, failure to update the cache can result in "read your writes" consistency breaking down.

From the documentation on the cache interface, the conditions under which the cache is allowed to return an error is limited:

// Although several methods on this interface return errors, the conditions when a cache
// is permitted to actually return an error is limited at the API level. A cache method
// may return an error under the following conditions:
// - malformed input (e.g. a nil key)
// - the Reader method returns an error (for methods that accpet a Reader)
// - the cache is shutting down
// - the cache's work pools are shutting down

The actual place that might fail is submission to the work pool: err := c.miscPool.Submit(c.ctx, ...). Pools have similar semantics, and can only return errors under very specific circumstances. This wasn't documented, so I added the following to the Pool.Submit() godoc:

	// ...
	//
	// This method is permitted to return an error only under the following conditions:
	// - the pool is shutting down (i.e. its context is done/cancelled)
	// - the provided ctx parameter is done/cancelled before this method returns
	// - invalid input (e.g. the task is nil)
	//
	// If this method returns an error, the task may or may not have been executed.
	Submit(ctx context.Context, task func()) error

I also added the following to the Cache doucmentation:

// ...
//
// Cache errors are are generally not recoverable, and it should be assumed that a cache that has returned an error
// is in a corrupted state, and should be discarded.
type Cache interface {

}
}
wg.Wait()

return nil
}

func (c *cache) BatchGet(read Reader, keys map[string]types.BatchGetResult) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semantic inconsistency between Get and BatchGet for nil-value entries

Set(key, nil) stores the entry with statusAvailable and a nil value (TestCacheSetNilValue).

  • Get returns (nil, true, nil) — key found, value is nil.
  • BatchGet stores BatchGetResult{Value: nil}, and IsFound() returns false — key not found.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Calling Set(key, nil) is now equivalent to calling Delete(key).

work := make(map[uint64]map[string]types.BatchGetResult)
for key := range keys {
idx := c.shardManager.Shard([]byte(key))
if work[idx] == nil {
work[idx] = make(map[string]types.BatchGetResult)
}
work[idx][key] = types.BatchGetResult{}
}

var wg sync.WaitGroup
for shardIndex, subMap := range work {
wg.Add(1)

err := c.miscPool.Submit(c.ctx, func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be safe, each closure needs to capture the variables explicitly via local copies. Prior to Go 1.22, range variables are reused across iterations, so all closures could end up operating on the same (final) shard, which could be a bug.

Same for batchSet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got painful memories of that footgun. 😅

The current codebase is unable to compile with any go version before 1.22. In this code, we utilize the new iterator pattern that was added as an experimental feature in 1.22 (requiring a special experimental flag to be set at compile time), and added as a standard feature in 1.23. Meaning that it should be impossible to compile this in a way that ends up using the wrong variable capture semantics.

With this in mind, would you still like me to make the change? Willing to make the change if you feel strongly on the topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, if we will hit compile errors, probably OK to just keep it today

defer wg.Done()
err := c.shards[shardIndex].BatchGet(read, subMap)
if err != nil {
for key := range subMap {
subMap[key] = types.BatchGetResult{Error: err}
}
}
})
if err != nil {
return fmt.Errorf("failed to submit batch get: %w", err)
}
}
wg.Wait()

for _, subMap := range work {
for key, result := range subMap {
keys[key] = result
}
}

return nil
}

// Delete removes the given key from the shard that owns it.
func (c *cache) Delete(key []byte) {
	c.shards[c.shardManager.Shard(key)].Delete(key)
}

// Get returns the value for the given key, or (nil, false, nil) if not found,
// delegating to the shard that owns the key. updateLru controls whether the
// lookup refreshes the key's LRU position.
func (c *cache) Get(read Reader, key []byte, updateLru bool) ([]byte, bool, error) {
	owner := c.shards[c.shardManager.Shard(key)]

	value, found, err := owner.Get(read, key, updateLru)
	switch {
	case err != nil:
		return nil, false, fmt.Errorf("failed to get value from shard: %w", err)
	case !found:
		return nil, false, nil
	default:
		return value, true, nil
	}
}

// Set stores value under key in the owning shard. A nil value is treated as a
// delete, so Set(key, nil) is equivalent to Delete(key).
func (c *cache) Set(key []byte, value []byte) {
	owner := c.shards[c.shardManager.Shard(key)]

	if value == nil {
		owner.Delete(key)
		return
	}
	owner.Set(key, value)
}
Loading
Loading