-
Notifications
You must be signed in to change notification settings - Fork 872
Implement a standard cache. #3077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7835683
950197c
cff96ab
003fcc9
a208a1b
157a600
b41639f
fe31475
4e0147e
ba471bd
9a25859
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
package metrics

import "github.com/sei-protocol/sei-chain/sei-db/common/unit"

// Shared histogram bucket boundaries for use across the codebase.
// The OTel defaults are too coarse for meaningful percentile queries in Grafana.

// LatencyBuckets covers 10μs to 5 minutes — wide enough for both fast key
// lookups and slow compactions/flushes without needing per-metric tuning.
// Values are in seconds (0.00001 == 10μs), per the range comments below.
var LatencyBuckets = []float64{
	0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005, // 10μs–500μs
	0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, // 1ms–50ms
	0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, // 100ms–5min
}

// ByteSizeBuckets covers 256B to 1GB for data size histograms.
// Boundaries step by powers of four, giving a few buckets per decade
// without exploding histogram cardinality.
var ByteSizeBuckets = []float64{
	256, unit.KB, 4 * unit.KB, 16 * unit.KB, 64 * unit.KB, 256 * unit.KB,
	unit.MB, 4 * unit.MB, 16 * unit.MB, 64 * unit.MB, 256 * unit.MB, unit.GB,
}

// CountBuckets covers 1 to 1M for per-operation step/iteration counts.
var CountBuckets = []float64{
	1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 100000, 1000000,
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,194 @@ | ||
| package dbcache | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/sei-protocol/sei-chain/sei-db/common/threading" | ||
| "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" | ||
| ) | ||
|
|
||
// Compile-time assertion that *cache satisfies the Cache interface.
var _ Cache = (*cache)(nil)

// A standard implementation of a flatcache.
// Keys are mapped to one of a fixed set of shards via shardManager, so
// concurrent operations on different shards touch independent state.
type cache struct {
	// Governs the lifetime of background work submitted by this cache.
	// NOTE(review): per the PR discussion, this ctx is owned and cancelled
	// by FlatKV on shutdown — the cache itself exposes no Close().
	ctx context.Context

	// A utility for assigning keys to shard indices.
	shardManager *shardManager

	// The shards in the cache.
	shards []*shard

	// A pool for asynchronous reads.
	readPool threading.Pool

	// A pool for miscellaneous operations that are neither computationally intensive nor IO bound.
	miscPool threading.Pool
}
|
|
||
| // Creates a new Cache. If cacheName is non-empty, OTel metrics are enabled and the | ||
| // background size scrape runs every metricsScrapeInterval. | ||
| func NewStandardCache( | ||
| ctx context.Context, | ||
| // The number of shards in the cache. Must be a power of two and greater than 0. | ||
| shardCount uint64, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this fixed size? how does user choose the shardCount
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the PR that integrates this with FlatKV, I introduce configuration for FlatKV that allows for each of the 5 DBs to be individually configured, meaning that we'll be able to tune the configuration of each DB's cache for that DB's workload. That PR will follow this one. |
||
| // The maximum size of the cache, in bytes. | ||
| maxSize uint64, | ||
| // A work pool for reading from the DB. | ||
| readPool threading.Pool, | ||
| // A work pool for miscellaneous operations that are neither computationally intensive nor IO bound. | ||
| miscPool threading.Pool, | ||
| // The estimated overhead per entry, in bytes. This is used to calculate the maximum size of the cache. | ||
| // This value should be derived experimentally, and may differ between different builds and architectures. | ||
| estimatedOverheadPerEntry uint64, | ||
| // Name used as the "cache" attribute on metrics. Empty string disables metrics. | ||
| cacheName string, | ||
| // How often to scrape cache size for metrics. Ignored if cacheName is empty. | ||
| metricsScrapeInterval time.Duration, | ||
| ) (Cache, error) { | ||
| if shardCount == 0 || (shardCount&(shardCount-1)) != 0 { | ||
| return nil, ErrNumShardsNotPowerOfTwo | ||
| } | ||
| if maxSize == 0 { | ||
| return nil, fmt.Errorf("maxSize must be greater than 0") | ||
| } | ||
|
|
||
| shardManager, err := newShardManager(shardCount) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create shard manager: %w", err) | ||
| } | ||
| sizePerShard := maxSize / shardCount | ||
| if sizePerShard == 0 { | ||
| return nil, fmt.Errorf("maxSize must be greater than shardCount") | ||
| } | ||
|
|
||
| shards := make([]*shard, shardCount) | ||
| for i := uint64(0); i < shardCount; i++ { | ||
| shards[i], err = NewShard(ctx, readPool, sizePerShard, estimatedOverheadPerEntry) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create shard: %w", err) | ||
| } | ||
| } | ||
|
|
||
| c := &cache{ | ||
| ctx: ctx, | ||
| shardManager: shardManager, | ||
| shards: shards, | ||
| readPool: readPool, | ||
| miscPool: miscPool, | ||
| } | ||
|
|
||
| if cacheName != "" { | ||
| metrics := newCacheMetrics(ctx, cacheName, metricsScrapeInterval, c.getCacheSizeInfo) | ||
| for _, s := range c.shards { | ||
| s.metrics = metrics | ||
| } | ||
| } | ||
|
|
||
| return c, nil | ||
| } | ||
|
|
||
| func (c *cache) getCacheSizeInfo() (bytes uint64, entries uint64) { | ||
| for _, s := range c.shards { | ||
| b, e := s.getSizeInfo() | ||
| bytes += b | ||
| entries += e | ||
| } | ||
| return bytes, entries | ||
| } | ||
|
|
||
| func (c *cache) BatchSet(updates []CacheUpdate) error { | ||
| // Sort entries by shard index so each shard is locked only once. | ||
| shardMap := make(map[uint64][]CacheUpdate) | ||
| for i := range updates { | ||
| idx := c.shardManager.Shard(updates[i].Key) | ||
| shardMap[idx] = append(shardMap[idx], updates[i]) | ||
| } | ||
|
|
||
| var wg sync.WaitGroup | ||
| for shardIndex, shardEntries := range shardMap { | ||
| wg.Add(1) | ||
| err := c.miscPool.Submit(c.ctx, func() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The closure passed to miscPool.Submit calls wg.Done() directly after c.shards[shardIndex].BatchSet(shardEntries). If the shard's BatchSet panics, wg.Done() is never called, and wg.Wait() hangs forever, deadlocking the calling goroutine?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My general ethos is to treat https://gobyexample.com/range-over-iterators That being said, deferring release of the waitgroup isn't expensive, so no harm doing it. Change made. |
||
| defer wg.Done() | ||
| c.shards[shardIndex].BatchSet(shardEntries) | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to submit batch set: %w", err) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we handle partial failures here, if some shards succeed, some shards fail, what should the user do?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any failure from this method is unlikely to be recoverable. If paired with a DB, failure to update the cache can result in "read your writes" consistency breaking down. From the documentation on the cache interface, the conditions under which the cache is allowed to return an error is limited: The actual place that might fail is submission to the work pool: I also added the following to the |
||
| } | ||
| } | ||
| wg.Wait() | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (c *cache) BatchGet(read Reader, keys map[string]types.BatchGetResult) error { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. semantic inconsistency between Get and BatchGet for nil-value entries Set(key, nil) stores the entry with statusAvailable and a nil value (TestCacheSetNilValue).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Calling |
||
| work := make(map[uint64]map[string]types.BatchGetResult) | ||
| for key := range keys { | ||
| idx := c.shardManager.Shard([]byte(key)) | ||
| if work[idx] == nil { | ||
| work[idx] = make(map[string]types.BatchGetResult) | ||
| } | ||
| work[idx][key] = types.BatchGetResult{} | ||
| } | ||
|
|
||
| var wg sync.WaitGroup | ||
| for shardIndex, subMap := range work { | ||
| wg.Add(1) | ||
|
|
||
| err := c.miscPool.Submit(c.ctx, func() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to be safe, each closure needs to capture the variables explicitly via local copies. Prior to Go 1.22, range variables are reused across iterations, so all closures could end up operating on the same (final) shard, which could be a bug. Same for batchSet
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've got painful memories of that footgun. 😅 The current codebase is unable to compile with any go version before 1.22. In this code, we utilize the new iterator pattern that was added as an experimental feature in 1.22 (requiring a special experimental flag to be set at compile time), and added as a standard feature in 1.23. Meaning that it should be impossible to compile this in a way that ends up using the wrong variable capture semantics. With this in mind, would you still like me to make the change? Willing to make the change if you feel strongly on the topic.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, if we will hit compile errors, probably OK to just keep it today |
||
| defer wg.Done() | ||
| err := c.shards[shardIndex].BatchGet(read, subMap) | ||
| if err != nil { | ||
| for key := range subMap { | ||
| subMap[key] = types.BatchGetResult{Error: err} | ||
| } | ||
| } | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to submit batch get: %w", err) | ||
| } | ||
| } | ||
| wg.Wait() | ||
|
|
||
| for _, subMap := range work { | ||
| for key, result := range subMap { | ||
| keys[key] = result | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (c *cache) Delete(key []byte) { | ||
| shardIndex := c.shardManager.Shard(key) | ||
| shard := c.shards[shardIndex] | ||
| shard.Delete(key) | ||
| } | ||
|
|
||
| func (c *cache) Get(read Reader, key []byte, updateLru bool) ([]byte, bool, error) { | ||
| shardIndex := c.shardManager.Shard(key) | ||
| shard := c.shards[shardIndex] | ||
|
|
||
| value, ok, err := shard.Get(read, key, updateLru) | ||
| if err != nil { | ||
| return nil, false, fmt.Errorf("failed to get value from shard: %w", err) | ||
| } | ||
| if !ok { | ||
| return nil, false, nil | ||
| } | ||
| return value, ok, nil | ||
| } | ||
|
|
||
| func (c *cache) Set(key []byte, value []byte) { | ||
| shardIndex := c.shardManager.Shard(key) | ||
| shard := c.shards[shardIndex] | ||
|
|
||
| if value == nil { | ||
| shard.Delete(key) | ||
| } else { | ||
| shard.Set(key, value) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we manage the cache context? For example, do we need a Close()/Shutdown() method on the cache interface to stop the collectLoop goroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not in this PR (since integration will happen in the next PR), but the context is managed by FlatKV, and is cancelled when FlatKV is shut down.
The cancel method above will cause the ctx used by the worker pools and by the DB caches to have their contexts cancelled, thus freeing up all threading resources in these utilities.