feat: add S3-compatible storage backend #32
Conversation
Add an S3Store implementation that batches multiple heights into single S3 objects (chunks) to reduce object count and request costs. Writes are buffered in memory and flushed at checkpoint boundaries (SetSyncState).

- New S3Config with bucket, prefix, region, endpoint, chunk_size
- Store factory in main.go (sqlite/s3 switch)
- 13 mock-based unit tests covering round-trips, chunk boundaries, buffer read-through, idempotent writes, commitment index lookups
- Config validation for S3-specific required fields

Also includes profiling support, backfill progress tracking, and metrics formatting fixes from the working tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
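For orientation, here is a minimal sketch of how the new backend gets constructed from configuration. The struct fields and the `NewS3Store` signature come from this PR; the bucket, prefix, region, and endpoint values below are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/evstack/apex/config"
	"github.com/evstack/apex/pkg/store"
)

func main() {
	ctx := context.Background()

	// Placeholder values; ChunkSize controls how many heights share one S3 object.
	cfg := &config.S3Config{
		Bucket:    "apex-blobs",
		Prefix:    "mainnet",
		Region:    "us-east-1",
		Endpoint:  "", // leave empty for AWS S3; set for MinIO or other S3-compatible services
		ChunkSize: 64,
	}

	s, err := store.NewS3Store(ctx, cfg)
	if err != nil {
		log.Fatalf("open S3 store: %v", err)
	}
	// Close flushes any buffered writes to S3.
	defer func() { _ = s.Close() }()
}
```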
📝 Walkthrough

Adds pprof profiling server support, an S3 storage backend alongside SQLite, expanded storage/config validation, backfill-stage metrics and wiring, hex 0x-prefix stripping for CLI hex inputs, and refactors main startup to centralize profiling, data-source, and store initialization.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Run as runIndexer
participant Profile as profile.Server
participant StoreFactory as openStore
participant DataSrcFactory as openDataSource
participant Coordinator as Coordinator
Run->>Profile: setupProfiling(cfg)
alt profiling enabled
Profile-->>Run: *Server
Run->>Profile: Start()
end
Run->>StoreFactory: openStore(ctx, cfg)
StoreFactory-->>Run: Store (SQLite or S3)
Run->>DataSrcFactory: openDataSource(ctx, cfg)
DataSrcFactory-->>Run: DataSource + ProofForwarder
Run->>Coordinator: New Coordinator(store, datasource, metrics)
Coordinator-->>Run: start coordination/backfill/stream
Run->>Profile: on shutdown -> Shutdown(ctx)
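The diagram's openStore step is the sqlite/s3 factory mentioned in the PR description. A minimal sketch of what that switch can look like in cmd/apex/main.go is below; the cfg.Storage field names and the NewSQLiteStore constructor are assumptions for illustration, while NewS3Store matches the constructor added in this PR.

```go
// openStore picks the storage backend. The cfg.Storage field names (Backend,
// SQLite.Path) and NewSQLiteStore are assumed; NewS3Store comes from the diff.
func openStore(ctx context.Context, cfg *config.Config) (store.Store, error) {
	switch cfg.Storage.Backend {
	case "s3":
		return store.NewS3Store(ctx, &cfg.Storage.S3)
	case "sqlite", "":
		return store.NewSQLiteStore(cfg.Storage.SQLite.Path)
	default:
		return nil, fmt.Errorf("unknown storage backend %q", cfg.Storage.Backend)
	}
}
```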
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning)
Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances Apex's data persistence capabilities by introducing an S3-compatible storage backend, offering a scalable alternative to the existing SQLite database. It also improves observability with new profiling endpoints and detailed backfill progress metrics, making the system more robust and easier to monitor. The changes include updates to configuration, core application logic for storage selection, and new utility functions for data handling.
Actionable comments posted: 5
🧹 Nitpick comments (7)
pkg/store/s3.go (3)
296-317: `GetBlobs` performs unbounded in-memory accumulation. For a large height range spanning many chunks, `fetchBlobChunks` downloads and deserializes all matching S3 objects into memory before applying `offset`/`limit`. If the caller requests a small page from a large range, this is wasteful and could OOM. Worth noting as a known limitation or adding chunk-level short-circuiting in the future.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/s3.go` around lines 296 - 317, GetBlobs currently accumulates all matching S3 objects in memory (via fetchBlobChunks) then applies offset/limit, which can OOM for large ranges; modify the flow so fetchBlobChunks supports early termination by taking offset and limit (or a targetCount = offset+limit) and yields or returns blobs incrementally, and short-circuit once enough blobs have been collected (respecting collectBufferedBlobs and deduplicateBlobs interactions), so applyOffsetLimit receives at most the required window; update S3Store.GetBlobs to pass offset/limit (or targetCount) into fetchBlobChunks and stop fetching more S3 chunks when the required number of post-deduplication blobs is reached.
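A rough sketch of the chunk-level short-circuiting suggested above, reusing the helpers already present in pkg/store/s3.go. The targetCount plumbing (GetBlobs would pass offset+limit) is illustrative, not part of the PR.

```go
// fetchBlobChunksLimited is an illustrative variant of fetchBlobChunks that
// stops pulling further chunk objects once targetCount blobs have been
// gathered. targetCount <= 0 preserves the current "fetch everything" behavior.
func (s *S3Store) fetchBlobChunksLimited(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, targetCount int) ([]types.Blob, error) {
	firstChunk := s.chunkStart(startHeight)
	lastChunk := s.chunkStart(endHeight)

	var allBlobs []types.Blob
	for cs := firstChunk; cs <= lastChunk; cs += s.chunkSize {
		key := s.key("blobs", ns.String(), chunkFileName(cs))
		data, err := s.getObject(ctx, key)
		if err != nil {
			if isNotFound(err) {
				continue
			}
			return nil, fmt.Errorf("get blob chunk at %d: %w", cs, err)
		}

		blobs, err := decodeS3Blobs(data)
		if err != nil {
			return nil, err
		}
		for i := range blobs {
			if blobs[i].Height >= startHeight && blobs[i].Height <= endHeight && blobs[i].Namespace == ns {
				allBlobs = append(allBlobs, blobs[i])
			}
		}
		// Chunks are visited in ascending height order, so once enough rows for
		// the requested window exist we can stop issuing further S3 reads.
		if targetCount > 0 && len(allBlobs) >= targetCount {
			break
		}
	}
	return allBlobs, nil
}
```

GetBlobs would still deduplicate against buffered blobs afterwards, so the early stop is an optimization of S3 round-trips rather than an exact page cut.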
92-150: Significant code duplication between `NewS3Store` and `NewS3StoreWithStaticCredentials`. Both constructors share identical logic for endpoint options, chunk-size defaulting, and `newS3Store` wiring. Consider extracting the shared setup into a helper.

Sketch: extract shared logic

+func buildS3Client(ctx context.Context, cfg *config.S3Config, extraOpts ...func(*awsconfig.LoadOptions) error) (s3Client, uint64, error) {
+	opts := append([]func(*awsconfig.LoadOptions) error{}, extraOpts...)
+	if cfg.Region != "" {
+		opts = append(opts, awsconfig.WithRegion(cfg.Region))
+	}
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
+	if err != nil {
+		return nil, 0, fmt.Errorf("load AWS config: %w", err)
+	}
+	var s3Opts []func(*s3.Options)
+	if cfg.Endpoint != "" {
+		s3Opts = append(s3Opts, func(o *s3.Options) {
+			o.BaseEndpoint = aws.String(cfg.Endpoint)
+			o.UsePathStyle = true
+		})
+	}
+	client := s3.NewFromConfig(awsCfg, s3Opts...)
+	chunkSize := uint64(cfg.ChunkSize)
+	if chunkSize == 0 {
+		chunkSize = 64
+	}
+	return client, chunkSize, nil
+}
+
 func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
-	// ... duplicated code ...
+	client, chunkSize, err := buildS3Client(ctx, cfg)
+	if err != nil {
+		return nil, err
+	}
+	return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/s3.go` around lines 92 - 150, Extract the duplicated setup in NewS3Store and NewS3StoreWithStaticCredentials into a small helper that accepts the loaded awsCfg and cfg and returns the constructed *S3Store (or the s3.Client and chunkSize) so both constructors only differ in how they build the awsconfig.LoadOptions (credentials provider vs not). Specifically, move creation of s3Opts (cfg.Endpoint handling, BaseEndpoint/UsePathStyle), the s3.NewFromConfig call, the chunkSize defaulting (cfg.ChunkSize -> default 64) and the final newS3Store(...) call into a helper (referencing s3.NewFromConfig, cfg.Endpoint, chunkSize and newS3Store) and have NewS3Store and NewS3StoreWithStaticCredentials just build opts, call awsconfig.LoadDefaultConfig, then call that helper with awsCfg and cfg.
380-399: Manual lock/unlock in `GetBlobByCommitment` is fragile — consider restructuring. The manual `Lock()`/`Unlock()` pattern (lines 387-399) with an early unlock-and-return at line 393-394 works but is easy to break during maintenance. A small helper or extracting the locked section into its own function would be safer.

Sketch: extract locked lookup

+func (s *S3Store) findCommitInBuffer(commitHex string) *types.Blob {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, entry := range s.commitBuf {
+		if entry.commitmentHex == commitHex {
+			ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
+			if err == nil {
+				if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
+					return b
+				}
+			}
+		}
+	}
+	return nil
+}

 func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) {
 	start := time.Now()
 	defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }()
 	commitHex := hex.EncodeToString(commitment)
-	s.mu.Lock()
-	for _, entry := range s.commitBuf {
-		...
-	}
-	s.mu.Unlock()
+	if b := s.findCommitInBuffer(commitHex); b != nil {
+		return b, nil
+	}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/s3.go` around lines 380 - 399, The manual mu.Lock()/Unlock() in S3Store.GetBlobByCommitment is fragile; extract the buffer lookup into a helper that handles locking and returns the found blob (if any) to avoid early-return while holding the mutex. Add a method on S3Store (e.g. getBlobFromCommitBuf(commitHex string) (*types.Blob, bool)) that acquires s.mu, iterates s.commitBuf, calls s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index) where appropriate, releases s.mu, and returns the blob and true when found (nil,false otherwise). Then replace the manual lock/unlock block in GetBlobByCommitment with a single call to that helper and handle the returned blob normally.

cmd/apex/main.go (1)
216-221: Type-switch for metrics wiring is fragile. Adding a third store backend requires remembering to update this switch. Consider defining a small optional interface (e.g., `MetricsAware`) and using a single type assertion instead:

♻️ Proposed refactor

-	// Wire metrics into the store.
-	switch s := db.(type) {
-	case *store.SQLiteStore:
-		s.SetMetrics(rec)
-	case *store.S3Store:
-		s.SetMetrics(rec)
-	}
+	// Wire metrics into the store.
+	if ma, ok := db.(interface{ SetMetrics(metrics.Recorder) }); ok {
+		ma.SetMetrics(rec)
+	}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/apex/main.go` around lines 216 - 221, Create a small optional interface named MetricsAware (with SetMetrics(*recType) or matching signature used by existing stores) and use a single type assertion against db instead of the switch; e.g., define interface MetricsAware { SetMetrics(...same params...) }, then replace the switch on db.(type) with if ma, ok := db.(MetricsAware); ok { ma.SetMetrics(rec) } so any store implementing SetMetrics (e.g., store.SQLiteStore, store.S3Store) will be wired automatically without editing this site when new backends are added.

config/load.go (1)
153-179: Validation function mutates config (sets default `ChunkSize`). `validateStorage` silently assigns `s.S3.ChunkSize = 64` on line 166 when the value is zero. Mixing mutation into a `validate*` function is surprising. Consider moving the default into `DefaultConfig()` or a separate `applyDefaults` step so `validate*` stays purely read-only.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/load.go` around lines 153 - 179, validateStorage currently mutates the config by setting s.S3.ChunkSize = 64 when zero; change this by removing any assignment from validateStorage so it only inspects StorageConfig/S3 fields and returns errors, and instead set the default ChunkSize in the config initialization path (e.g., in DefaultConfig() or a new applyDefaults(cfg *Config) function that sets StorageConfig.S3.ChunkSize = 64 when zero). Keep the existing validation checks (e.g., chunk size positivity) in validateStorage, and ensure callers invoke the defaults-applier (DefaultConfig or applyDefaults) before calling validateStorage.
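A minimal sketch of the suggested split. The applyDefaults name and the cfg.Storage field path are illustrative; the 64-height default mirrors the fallback currently applied inside validateStorage.

```go
// applyDefaults fills optional settings before validation runs, so that
// validateStorage can stay purely read-only.
func applyDefaults(cfg *Config) {
	if cfg.Storage.S3.ChunkSize == 0 {
		cfg.Storage.S3.ChunkSize = 64 // heights per S3 chunk object
	}
}
```

The loader would then call applyDefaults(cfg) before running the validate* functions.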
pkg/store/s3_test.go (1)

66-555: Consider adding a concurrent-access test to exercise the `sync.Mutex` in `S3Store`. The `S3Store` struct uses a `sync.Mutex` for buffer access, and the coding guidelines require running tests with `-race`. A test that concurrently calls `PutBlobs`/`GetBlob` (and maybe `SetSyncState`) would increase confidence in the locking correctness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/s3_test.go` around lines 66 - 555, Add a new test (e.g. TestS3Store_ConcurrentAccess) that concurrently exercises S3Store's mutex by launching multiple goroutines which call PutBlobs, GetBlob (and optionally SetSyncState) repeatedly against the same store instance returned by newTestS3Store; use a sync.WaitGroup to coordinate, randomize or vary heights/namespaces, and assert no errors or panics and that retrieved blobs match expected data; run this test under the race detector (-race) to validate the locking in methods PutBlobs, GetBlob, and SetSyncState.
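One possible shape for such a test, meant for pkg/store/s3_test.go. newTestS3Store is assumed to be this file's existing mock-backed helper (its real signature may differ), and a zero-value namespace is enough to exercise the shared buffers under -race.

```go
func TestS3Store_ConcurrentAccess(t *testing.T) {
	// newTestS3Store is assumed to return an *S3Store backed by the mock
	// s3Client used elsewhere in this file; adjust to its real signature.
	s := newTestS3Store(t)
	ctx := context.Background()

	var ns types.Namespace // zero value is sufficient to exercise the locking

	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		g := g
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 50; i++ {
				h := uint64(g*1000 + i)
				if err := s.PutBlobs(ctx, []types.Blob{{Height: h, Namespace: ns, Index: 0, Data: []byte("x")}}); err != nil {
					t.Errorf("PutBlobs: %v", err)
					return
				}
				// Freshly written blobs are served from the in-memory buffer.
				if _, err := s.GetBlob(ctx, ns, h, 0); err != nil {
					t.Errorf("GetBlob: %v", err)
					return
				}
			}
		}()
	}
	wg.Wait()
}
```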
config/config.go (1)

44-50: `ChunkSize` type differs between config and store — minor consistency concern. `S3Config.ChunkSize` is `int` (line 49) while `S3Store.chunkSize` is `uint64` (line 82 in `pkg/store/s3.go`), with explicit conversion at the boundary (`pkg/store/s3.go:112`). Validation in `config/load.go:165-170` ensures the value is non-negative before conversion, so the type cast is safe. However, the differing types add a maintenance risk. Since YAML naturally decodes into `int` anyway, consider keeping both as `int` for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/config.go` around lines 44 - 50, S3Config.ChunkSize is int while S3Store.chunkSize is uint64, causing an unnecessary type conversion; change S3Store.chunkSize from uint64 to int (symbol: S3Store.chunkSize) and remove the explicit cast where S3Config.ChunkSize is assigned (the conversion currently near the S3 store initialization), and adjust any downstream uses that assumed uint64 to work with int (or cast locally where truly needed) so types remain consistent with the YAML-decoded int and existing validation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/store/s3.go`:
- Around line 166-168: The SetMetrics method on S3Store writes s.metrics without
synchronization while metric-recording defer closures (which read s.metrics)
access it concurrently; fix this by making access to s.metrics
thread-safe—either (a) add a sync.RWMutex on S3Store and take a write lock in
S3Store.SetMetrics and read locks in the defer closures that reference
s.metrics, or (b) replace s.metrics with an atomic.Value storing the metrics.
Update S3Store.SetMetrics to use the chosen synchronization and update all
metric reads (the defer closures that reference s.metrics) to load under the
same mechanism; alternatively, if you prefer not to change runtime behavior,
document clearly on S3Store.SetMetrics that it must be called before any other
method and make s.metrics immutable after construction.
- Around line 540-609: The flush function can run concurrently causing
read-modify-write races in flushBlobChunk/flushHeaderChunk; add a dedicated
mutex on S3Store (e.g., flushMu sync.Mutex) and acquire it at the start of
S3Store.flush (s.flushMu.Lock(); defer s.flushMu.Unlock()) to serialize all
flush() invocations so only one flush reads/merges/writes chunks at a time;
update the S3Store struct to include flushMu and ensure callers
(Close/SetSyncState) that trigger flush continue to call the same method so the
serialization takes effect.
- Around line 228-259: PutNamespace currently grabs S3Store.s.mu and holds it
across network I/O (getObject/putObject), blocking all other store ops; change
it to use a dedicated namespace mutex (e.g., add S3Store.nsMu) or no global s.mu
so only namespace operations are serialized: acquire nsMu before reading/writing
the namespaces object, release it before returning, and leave s.mu out of this
function; ensure you still handle S3 not-found, JSON unmarshal/marshal, and
return errors as before, so only the mutex protecting namespaces (used around
the read-modify-write of key "meta/namespaces.json" in PutNamespace) is changed.
- Around line 644-674: Pre-allocate the merged slice when there is no existing
chunk to avoid the prealloc linter warning: in flushHeaderChunk, before the loop
over newHeaders, set merged to a preallocated slice when existing == nil (e.g.,
make([]types.Header, 0, len(newHeaders))), and keep the current behavior of
assigning merged = decodeS3Headers(existing) when existing != nil; then append
newHeaders into merged and proceed as before (this targets function
flushHeaderChunk, variables merged, existing, newHeaders, and decodeS3Headers).
In `@pkg/sync/backfill.go`:
- Around line 51-69: The progress reporter can produce +Inf/negative ETA when
done==0 (rate==0); update the ticker select block to guard against zero
divisions by checking done==0 and elapsed.Seconds() > 0 before computing rate
and remaining (variables: processedHeights, startTime, totalHeights, rate,
remaining), and when rate <= 0 set a safe fallback (e.g., rate = 0 and eta
string = "unknown" or "-" ) so you never convert +Inf to time.Duration; then use
those safe rate/eta strings in the b.log.Info() call (keep the same log fields
like "height", "target", "progress", "rate", "eta", "backfill progress").
---
Nitpick comments:
In `@cmd/apex/main.go`:
- Around line 216-221: Create a small optional interface named MetricsAware
(with SetMetrics(*recType) or matching signature used by existing stores) and
use a single type assertion against db instead of the switch; e.g., define
interface MetricsAware { SetMetrics(...same params...) }, then replace the
switch on db.(type) with if ma, ok := db.(MetricsAware); ok { ma.SetMetrics(rec)
} so any store implementing SetMetrics (e.g., store.SQLiteStore, store.S3Store)
will be wired automatically without editing this site when new backends are
added.
In `@config/config.go`:
- Around line 44-50: S3Config.ChunkSize is int while S3Store.chunkSize is
uint64, causing an unnecessary type conversion; change S3Store.chunkSize from
uint64 to int (symbol: S3Store.chunkSize) and remove the explicit cast where
S3Config.ChunkSize is assigned (the conversion currently near the S3 store
initialization), and adjust any downstream uses that assumed uint64 to work with
int (or cast locally where truly needed) so types remain consistent with the
YAML-decoded int and existing validation.
In `@config/load.go`:
- Around line 153-179: validateStorage currently mutates the config by setting
s.S3.ChunkSize = 64 when zero; change this by removing any assignment from
validateStorage so it only inspects StorageConfig/S3 fields and returns errors,
and instead set the default ChunkSize in the config initialization path (e.g.,
in DefaultConfig() or a new applyDefaults(cfg *Config) function that sets
StorageConfig.S3.ChunkSize = 64 when zero). Keep the existing validation checks
(e.g., chunk size positivity) in validateStorage, and ensure callers invoke the
defaults-applier (DefaultConfig or applyDefaults) before calling
validateStorage.
In `@pkg/store/s3_test.go`:
- Around line 66-555: Add a new test (e.g. TestS3Store_ConcurrentAccess) that
concurrently exercises S3Store's mutex by launching multiple goroutines which
call PutBlobs, GetBlob (and optionally SetSyncState) repeatedly against the same
store instance returned by newTestS3Store; use a sync.WaitGroup to coordinate,
randomize or vary heights/namespaces, and assert no errors or panics and that
retrieved blobs match expected data; run this test under the race detector
(-race) to validate the locking in methods PutBlobs, GetBlob, and SetSyncState.
In `@pkg/store/s3.go`:
- Around line 296-317: GetBlobs currently accumulates all matching S3 objects in
memory (via fetchBlobChunks) then applies offset/limit, which can OOM for large
ranges; modify the flow so fetchBlobChunks supports early termination by taking
offset and limit (or a targetCount = offset+limit) and yields or returns blobs
incrementally, and short-circuit once enough blobs have been collected
(respecting collectBufferedBlobs and deduplicateBlobs interactions), so
applyOffsetLimit receives at most the required window; update S3Store.GetBlobs
to pass offset/limit (or targetCount) into fetchBlobChunks and stop fetching
more S3 chunks when the required number of post-deduplication blobs is reached.
- Around line 92-150: Extract the duplicated setup in NewS3Store and
NewS3StoreWithStaticCredentials into a small helper that accepts the loaded
awsCfg and cfg and returns the constructed *S3Store (or the s3.Client and
chunkSize) so both constructors only differ in how they build the
awsconfig.LoadOptions (credentials provider vs not). Specifically, move creation
of s3Opts (cfg.Endpoint handling, BaseEndpoint/UsePathStyle), the
s3.NewFromConfig call, the chunkSize defaulting (cfg.ChunkSize -> default 64)
and the final newS3Store(...) call into a helper (referencing s3.NewFromConfig,
cfg.Endpoint, chunkSize and newS3Store) and have NewS3Store and
NewS3StoreWithStaticCredentials just build opts, call
awsconfig.LoadDefaultConfig, then call that helper with awsCfg and cfg.
- Around line 380-399: The manual mu.Lock()/Unlock() in
S3Store.GetBlobByCommitment is fragile; extract the buffer lookup into a helper
that handles locking and returns the found blob (if any) to avoid early-return
while holding the mutex. Add a method on S3Store (e.g.
getBlobFromCommitBuf(commitHex string) (*types.Blob, bool)) that acquires s.mu,
iterates s.commitBuf, calls s.findBlobInBufferLocked(ns, entry.pointer.Height,
entry.pointer.Index) where appropriate, releases s.mu, and returns the blob and
true when found (nil,false otherwise). Then replace the manual lock/unlock block
in GetBlobByCommitment with a single call to that helper and handle the returned
blob normally.
func (s *S3Store) SetMetrics(m metrics.Recorder) {
	s.metrics = m
}
SetMetrics is not thread-safe.
s.metrics is read without holding the lock in all metric-recording defer closures (e.g., lines 194, 218, 265, etc.), but SetMetrics writes it without synchronization. If SetMetrics is called after the store is in use, this is a data race. Either document that SetMetrics must be called before any other method, or protect the field.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/store/s3.go` around lines 166 - 168, The SetMetrics method on S3Store
writes s.metrics without synchronization while metric-recording defer closures
(which read s.metrics) access it concurrently; fix this by making access to
s.metrics thread-safe—either (a) add a sync.RWMutex on S3Store and take a write
lock in S3Store.SetMetrics and read locks in the defer closures that reference
s.metrics, or (b) replace s.metrics with an atomic.Value storing the metrics.
Update S3Store.SetMetrics to use the chosen synchronization and update all
metric reads (the defer closures that reference s.metrics) to load under the
same mechanism; alternatively, if you prefer not to change runtime behavior,
document clearly on S3Store.SetMetrics that it must be called before any other
method and make s.metrics immutable after construction.
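A small sketch of option (a) from this comment, using a dedicated RWMutex. The metricsMu field and the recorder() helper are illustrative names; every metric-recording defer closure in the file would then read s.recorder() instead of s.metrics.

```go
type S3Store struct {
	// existing fields unchanged

	metricsMu sync.RWMutex
	metrics   metrics.Recorder
}

// SetMetrics installs the recorder; with the RWMutex it is safe to call even
// after the store is already serving requests.
func (s *S3Store) SetMetrics(m metrics.Recorder) {
	s.metricsMu.Lock()
	defer s.metricsMu.Unlock()
	s.metrics = m
}

// recorder is what the defer closures would read instead of s.metrics.
func (s *S3Store) recorder() metrics.Recorder {
	s.metricsMu.RLock()
	defer s.metricsMu.RUnlock()
	return s.metrics
}
```

Usage inside a method would then be `defer func() { s.recorder().ObserveStoreQueryDuration("PutBlobs", time.Since(start)) }()`.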
Code Review
The pull request introduces an S3-compatible storage backend, profiling support, backfill progress tracking, and metrics formatting fixes. The changes are well-structured, adding new configurations, store implementations, and integrating them into the existing system. The addition of StripHexPrefix in pkg/types/types.go is a good utility function. The S3 store implementation includes buffering and flushing logic, which is crucial for optimizing S3 interactions. The new profiling server and enhanced metrics for backfill stages provide valuable observability. Overall, the changes enhance the system's flexibility and monitoring capabilities.
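The StripHexPrefix helper mentioned here is not shown in this excerpt; a 0x-prefix stripper of that name typically amounts to something like the following (an assumption about the implementation, not the actual code from pkg/types/types.go):

```go
// StripHexPrefix removes a leading "0x" or "0X" from hex input, if present,
// and leaves other strings untouched.
func StripHexPrefix(s string) string {
	if len(s) >= 2 && s[0] == '0' && (s[1] == 'x' || s[1] == 'X') {
		return s[2:]
	}
	return s
}
```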
if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
	return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
}
A new validation check is added for ProfilingConfig, ensuring that profiling.listen_addr is provided if profiling is enabled. This prevents potential runtime errors if the profiling server cannot bind to an address.
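For context on the profiling side: the profile.Server referenced in the walkthrough is not shown here, but a minimal pprof server bound to the validated listen_addr could look roughly like this. This is an illustrative sketch using the standard net/http/pprof handlers; the constructor and field names are assumptions.

```go
package profile

import (
	"context"
	"net/http"
	"net/http/pprof"
	"time"
)

// Server exposes the standard pprof endpoints on a dedicated listener.
type Server struct {
	srv *http.Server
}

// New wires the pprof handlers onto their own mux so the profiling listener
// stays separate from the main API.
func New(listenAddr string) *Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	return &Server{srv: &http.Server{
		Addr:              listenAddr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
	}}
}

// Start blocks serving pprof traffic until Shutdown is called.
func (s *Server) Start() error { return s.srv.ListenAndServe() }

// Shutdown drains the listener within the given context.
func (s *Server) Shutdown(ctx context.Context) error { return s.srv.Shutdown(ctx) }
```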
close(stopProgress)
<-progressStopped
return fmt.Errorf("checkpoint at height %d: %w", batchEnd, err)
Similar to the previous error handling, the progress reporting goroutine is gracefully shut down if an error occurs during checkpointing. This ensures resource cleanup.
close(stopProgress)
<-progressStopped
return err
The stopProgress channel is closed and progressStopped is waited upon before returning an error. This ensures that the progress reporting goroutine is properly shut down when an error occurs during batch processing, preventing goroutine leaks.
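Both of these hunks tear down the backfill progress reporter on error. Tying them to the ETA guard suggested in the prompt section earlier, the reporter goroutine could look roughly like this. This is a sketch only: the Backfiller receiver, atomic counter, and slog-style logger are assumptions about pkg/sync/backfill.go, while the field names height/target/rate/eta come from the review prompt.

```go
// reportProgress logs backfill throughput until stopProgress is closed, then
// signals progressStopped so the caller can wait for a clean shutdown.
// done == 0 and zero elapsed time are guarded so the ETA never becomes +Inf.
func (b *Backfiller) reportProgress(stopProgress <-chan struct{}, progressStopped chan<- struct{}, startTime time.Time, totalHeights uint64, processed *atomic.Uint64) {
	defer close(progressStopped)
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stopProgress:
			return
		case <-ticker.C:
			done := processed.Load()
			elapsed := time.Since(startTime)

			rate := 0.0
			eta := "unknown"
			if done > 0 && elapsed.Seconds() > 0 {
				rate = float64(done) / elapsed.Seconds()
				if done < totalHeights {
					remaining := float64(totalHeights-done) / rate
					eta = time.Duration(remaining * float64(time.Second)).Round(time.Second).String()
				} else {
					eta = "0s"
				}
			}

			b.log.Info("backfill progress",
				"height", done,
				"target", totalHeights,
				"rate", rate,
				"eta", eta,
			)
		}
	}
}
```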
| package store | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/hex" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "sort" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go-v2/aws" | ||
| awsconfig "github.com/aws/aws-sdk-go-v2/config" | ||
| "github.com/aws/aws-sdk-go-v2/credentials" | ||
| "github.com/aws/aws-sdk-go-v2/service/s3" | ||
| s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" | ||
|
|
||
| "github.com/evstack/apex/config" | ||
| "github.com/evstack/apex/pkg/metrics" | ||
| "github.com/evstack/apex/pkg/types" | ||
| ) | ||
|
|
||
| // s3Client abstracts the S3 operations used by S3Store, enabling mock-based testing. | ||
| type s3Client interface { | ||
| GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error) | ||
| PutObject(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3.Options)) (*s3.PutObjectOutput, error) | ||
| } | ||
|
|
||
| // blobChunkKey identifies a blob chunk in S3 by namespace and chunk start height. | ||
| type blobChunkKey struct { | ||
| namespace types.Namespace | ||
| chunkStart uint64 | ||
| } | ||
|
|
||
| // commitEntry is a buffered commitment index write. | ||
| type commitEntry struct { | ||
| commitmentHex string | ||
| pointer commitPointer | ||
| } | ||
|
|
||
| // commitPointer is the JSON content of a commitment index object. | ||
| type commitPointer struct { | ||
| Namespace string `json:"namespace"` | ||
| Height uint64 `json:"height"` | ||
| Index int `json:"index"` | ||
| } | ||
|
|
||
| // s3Blob is the JSON representation of a blob in an S3 chunk object. | ||
| type s3Blob struct { | ||
| Height uint64 `json:"height"` | ||
| Namespace string `json:"namespace"` | ||
| Data []byte `json:"data"` | ||
| Commitment []byte `json:"commitment"` | ||
| ShareVersion uint32 `json:"share_version"` | ||
| Signer []byte `json:"signer"` | ||
| Index int `json:"index"` | ||
| } | ||
|
|
||
| // s3Header is the JSON representation of a header in an S3 chunk object. | ||
| type s3Header struct { | ||
| Height uint64 `json:"height"` | ||
| Hash []byte `json:"hash"` | ||
| DataHash []byte `json:"data_hash"` | ||
| Time time.Time `json:"time"` | ||
| RawHeader []byte `json:"raw_header"` | ||
| } | ||
|
|
||
| // maxFlushConcurrency bounds parallel S3 I/O during flush. | ||
| const maxFlushConcurrency = 8 | ||
|
|
||
| // S3Store implements Store using an S3-compatible object store. | ||
| // Writes are buffered in memory and flushed to S3 at checkpoint boundaries | ||
| // (SetSyncState calls and Close). | ||
| type S3Store struct { | ||
| client s3Client | ||
| bucket string | ||
| prefix string | ||
| chunkSize uint64 | ||
| metrics metrics.Recorder | ||
|
|
||
| mu sync.Mutex | ||
| blobBuf map[blobChunkKey][]types.Blob | ||
| headerBuf map[uint64][]*types.Header | ||
| commitBuf []commitEntry | ||
| } | ||
|
|
||
| // NewS3Store creates a new S3Store from the given config. | ||
| func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) { | ||
| opts := []func(*awsconfig.LoadOptions) error{} | ||
| if cfg.Region != "" { | ||
| opts = append(opts, awsconfig.WithRegion(cfg.Region)) | ||
| } | ||
| awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("load AWS config: %w", err) | ||
| } | ||
|
|
||
| var s3Opts []func(*s3.Options) | ||
| if cfg.Endpoint != "" { | ||
| s3Opts = append(s3Opts, func(o *s3.Options) { | ||
| o.BaseEndpoint = aws.String(cfg.Endpoint) | ||
| o.UsePathStyle = true | ||
| }) | ||
| } | ||
|
|
||
| client := s3.NewFromConfig(awsCfg, s3Opts...) | ||
|
|
||
| chunkSize := uint64(cfg.ChunkSize) | ||
| if chunkSize == 0 { | ||
| chunkSize = 64 | ||
| } | ||
|
|
||
| return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil | ||
| } | ||
|
|
||
| // NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials. | ||
| // Useful for testing against MinIO or other S3-compatible services. | ||
| func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) { | ||
| opts := []func(*awsconfig.LoadOptions) error{ | ||
| awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)), | ||
| } | ||
| if cfg.Region != "" { | ||
| opts = append(opts, awsconfig.WithRegion(cfg.Region)) | ||
| } | ||
| awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("load AWS config: %w", err) | ||
| } | ||
|
|
||
| var s3Opts []func(*s3.Options) | ||
| if cfg.Endpoint != "" { | ||
| s3Opts = append(s3Opts, func(o *s3.Options) { | ||
| o.BaseEndpoint = aws.String(cfg.Endpoint) | ||
| o.UsePathStyle = true | ||
| }) | ||
| } | ||
|
|
||
| client := s3.NewFromConfig(awsCfg, s3Opts...) | ||
|
|
||
| chunkSize := uint64(cfg.ChunkSize) | ||
| if chunkSize == 0 { | ||
| chunkSize = 64 | ||
| } | ||
|
|
||
| return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil | ||
| } | ||
|
|
||
| // newS3Store creates an S3Store with an injected client (for testing). | ||
| func newS3Store(client s3Client, bucket, prefix string, chunkSize uint64) *S3Store { | ||
| return &S3Store{ | ||
| client: client, | ||
| bucket: bucket, | ||
| prefix: strings.TrimSuffix(prefix, "/"), | ||
| chunkSize: chunkSize, | ||
| metrics: metrics.Nop(), | ||
| blobBuf: make(map[blobChunkKey][]types.Blob), | ||
| headerBuf: make(map[uint64][]*types.Header), | ||
| } | ||
| } | ||
|
|
||
| // SetMetrics sets the metrics recorder. | ||
| func (s *S3Store) SetMetrics(m metrics.Recorder) { | ||
| s.metrics = m | ||
| } | ||
|
|
||
| // chunkStart returns the chunk start height for a given height. | ||
| func (s *S3Store) chunkStart(height uint64) uint64 { | ||
| return (height / s.chunkSize) * s.chunkSize | ||
| } | ||
|
|
||
| // key builds an S3 object key with the configured prefix. | ||
| func (s *S3Store) key(parts ...string) string { | ||
| if s.prefix == "" { | ||
| return strings.Join(parts, "/") | ||
| } | ||
| return s.prefix + "/" + strings.Join(parts, "/") | ||
| } | ||
|
|
||
| func chunkFileName(start uint64) string { | ||
| return fmt.Sprintf("chunk_%016d.json", start) | ||
| } | ||
|
|
||
| // --- Write methods (buffer) --- | ||
|
|
||
| func (s *S3Store) PutBlobs(ctx context.Context, blobs []types.Blob) error { | ||
| if len(blobs) == 0 { | ||
| return nil | ||
| } | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("PutBlobs", time.Since(start)) }() | ||
|
|
||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| for i := range blobs { | ||
| b := &blobs[i] | ||
| key := blobChunkKey{namespace: b.Namespace, chunkStart: s.chunkStart(b.Height)} | ||
| s.blobBuf[key] = append(s.blobBuf[key], *b) | ||
|
|
||
| s.commitBuf = append(s.commitBuf, commitEntry{ | ||
| commitmentHex: hex.EncodeToString(b.Commitment), | ||
| pointer: commitPointer{ | ||
| Namespace: b.Namespace.String(), | ||
| Height: b.Height, | ||
| Index: b.Index, | ||
| }, | ||
| }) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (s *S3Store) PutHeader(ctx context.Context, header *types.Header) error { | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("PutHeader", time.Since(start)) }() | ||
|
|
||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| cs := s.chunkStart(header.Height) | ||
| s.headerBuf[cs] = append(s.headerBuf[cs], header) | ||
| return nil | ||
| } | ||
|
|
||
| func (s *S3Store) PutNamespace(ctx context.Context, ns types.Namespace) error { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| key := s.key("meta", "namespaces.json") | ||
|
|
||
| existing, err := s.getObject(ctx, key) | ||
| if err != nil && !isNotFound(err) { | ||
| return fmt.Errorf("get namespaces: %w", err) | ||
| } | ||
|
|
||
| namespaces := make([]string, 0, 1) | ||
| if existing != nil { | ||
| if err := json.Unmarshal(existing, &namespaces); err != nil { | ||
| return fmt.Errorf("decode namespaces: %w", err) | ||
| } | ||
| } | ||
|
|
||
| nsHex := ns.String() | ||
| for _, n := range namespaces { | ||
| if n == nsHex { | ||
| return nil | ||
| } | ||
| } | ||
| namespaces = append(namespaces, nsHex) | ||
|
|
||
| data, err := json.Marshal(namespaces) | ||
| if err != nil { | ||
| return fmt.Errorf("encode namespaces: %w", err) | ||
| } | ||
| return s.putObject(ctx, key, data) | ||
| } | ||
|
|
||
| // --- Read methods --- | ||
|
|
||
| func (s *S3Store) GetBlob(ctx context.Context, ns types.Namespace, height uint64, index int) (*types.Blob, error) { | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("GetBlob", time.Since(start)) }() | ||
|
|
||
| // Check buffer first. | ||
| if b := s.findBlobInBuffer(ns, height, index); b != nil { | ||
| return b, nil | ||
| } | ||
|
|
||
| cs := s.chunkStart(height) | ||
| key := s.key("blobs", ns.String(), chunkFileName(cs)) | ||
|
|
||
| data, err := s.getObject(ctx, key) | ||
| if err != nil { | ||
| if isNotFound(err) { | ||
| return nil, ErrNotFound | ||
| } | ||
| return nil, fmt.Errorf("get blob chunk: %w", err) | ||
| } | ||
|
|
||
| blobs, err := decodeS3Blobs(data) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| for i := range blobs { | ||
| if blobs[i].Height == height && blobs[i].Index == index && blobs[i].Namespace == ns { | ||
| return &blobs[i], nil | ||
| } | ||
| } | ||
| return nil, ErrNotFound | ||
| } | ||
|
|
||
| func (s *S3Store) GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, limit, offset int) ([]types.Blob, error) { | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobs", time.Since(start)) }() | ||
|
|
||
| buffered := s.collectBufferedBlobs(ns, startHeight, endHeight) | ||
|
|
||
| s3Blobs, err := s.fetchBlobChunks(ctx, ns, startHeight, endHeight) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| allBlobs := deduplicateBlobs(append(buffered, s3Blobs...)) | ||
|
|
||
| sort.Slice(allBlobs, func(i, j int) bool { | ||
| if allBlobs[i].Height != allBlobs[j].Height { | ||
| return allBlobs[i].Height < allBlobs[j].Height | ||
| } | ||
| return allBlobs[i].Index < allBlobs[j].Index | ||
| }) | ||
|
|
||
| return applyOffsetLimit(allBlobs, offset, limit), nil | ||
| } | ||
|
|
||
| // collectBufferedBlobs returns in-buffer blobs matching the namespace and height range. | ||
| func (s *S3Store) collectBufferedBlobs(ns types.Namespace, startHeight, endHeight uint64) []types.Blob { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| var result []types.Blob | ||
| for key, bufs := range s.blobBuf { | ||
| if key.namespace != ns { | ||
| continue | ||
| } | ||
| for i := range bufs { | ||
| if bufs[i].Height >= startHeight && bufs[i].Height <= endHeight { | ||
| result = append(result, bufs[i]) | ||
| } | ||
| } | ||
| } | ||
| return result | ||
| } | ||
|
|
||
| // fetchBlobChunks reads all S3 blob chunks overlapping the height range for a namespace. | ||
| func (s *S3Store) fetchBlobChunks(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64) ([]types.Blob, error) { | ||
| firstChunk := s.chunkStart(startHeight) | ||
| lastChunk := s.chunkStart(endHeight) | ||
|
|
||
| var allBlobs []types.Blob | ||
| for cs := firstChunk; cs <= lastChunk; cs += s.chunkSize { | ||
| key := s.key("blobs", ns.String(), chunkFileName(cs)) | ||
| data, err := s.getObject(ctx, key) | ||
| if err != nil { | ||
| if isNotFound(err) { | ||
| continue | ||
| } | ||
| return nil, fmt.Errorf("get blob chunk at %d: %w", cs, err) | ||
| } | ||
|
|
||
| blobs, err := decodeS3Blobs(data) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| for i := range blobs { | ||
| if blobs[i].Height >= startHeight && blobs[i].Height <= endHeight && blobs[i].Namespace == ns { | ||
| allBlobs = append(allBlobs, blobs[i]) | ||
| } | ||
| } | ||
| } | ||
| return allBlobs, nil | ||
| } | ||
|
|
||
| func applyOffsetLimit(items []types.Blob, offset, limit int) []types.Blob { | ||
| if offset > 0 { | ||
| if offset >= len(items) { | ||
| return nil | ||
| } | ||
| items = items[offset:] | ||
| } | ||
| if limit > 0 && limit < len(items) { | ||
| items = items[:limit] | ||
| } | ||
| return items | ||
| } | ||
|
|
||
| func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) { | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }() | ||
|
|
||
| // Check buffer first. | ||
| commitHex := hex.EncodeToString(commitment) | ||
|
|
||
| s.mu.Lock() | ||
| for _, entry := range s.commitBuf { | ||
| if entry.commitmentHex == commitHex { | ||
| ns, err := types.NamespaceFromHex(entry.pointer.Namespace) | ||
| if err == nil { | ||
| if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil { | ||
| s.mu.Unlock() | ||
| return b, nil | ||
| } | ||
| } | ||
| } | ||
| } | ||
| s.mu.Unlock() | ||
|
|
||
| // Look up commitment index in S3. | ||
| key := s.key("index", "commitments", commitHex+".json") | ||
| data, err := s.getObject(ctx, key) | ||
| if err != nil { | ||
| if isNotFound(err) { | ||
| return nil, ErrNotFound | ||
| } | ||
| return nil, fmt.Errorf("get commitment index: %w", err) | ||
| } | ||
|
|
||
| var ptr commitPointer | ||
| if err := json.Unmarshal(data, &ptr); err != nil { | ||
| return nil, fmt.Errorf("decode commitment pointer: %w", err) | ||
| } | ||
|
|
||
| ns, err := types.NamespaceFromHex(ptr.Namespace) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("parse namespace from commitment index: %w", err) | ||
| } | ||
|
|
||
| return s.GetBlob(ctx, ns, ptr.Height, ptr.Index) | ||
| } | ||
|
|
||
| func (s *S3Store) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("GetHeader", time.Since(start)) }() | ||
|
|
||
| // Check buffer first. | ||
| if h := s.findHeaderInBuffer(height); h != nil { | ||
| return h, nil | ||
| } | ||
|
|
||
| cs := s.chunkStart(height) | ||
| key := s.key("headers", chunkFileName(cs)) | ||
|
|
||
| data, err := s.getObject(ctx, key) | ||
| if err != nil { | ||
| if isNotFound(err) { | ||
| return nil, ErrNotFound | ||
| } | ||
| return nil, fmt.Errorf("get header chunk: %w", err) | ||
| } | ||
|
|
||
| headers, err := decodeS3Headers(data) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| for i := range headers { | ||
| if headers[i].Height == height { | ||
| return &headers[i], nil | ||
| } | ||
| } | ||
| return nil, ErrNotFound | ||
| } | ||
|
|
||
| func (s *S3Store) GetNamespaces(ctx context.Context) ([]types.Namespace, error) { | ||
| key := s.key("meta", "namespaces.json") | ||
| data, err := s.getObject(ctx, key) | ||
| if err != nil { | ||
| if isNotFound(err) { | ||
| return nil, nil | ||
| } | ||
| return nil, fmt.Errorf("get namespaces: %w", err) | ||
| } | ||
|
|
||
| var hexList []string | ||
| if err := json.Unmarshal(data, &hexList); err != nil { | ||
| return nil, fmt.Errorf("decode namespaces: %w", err) | ||
| } | ||
|
|
||
| namespaces := make([]types.Namespace, 0, len(hexList)) | ||
| for _, h := range hexList { | ||
| ns, err := types.NamespaceFromHex(h) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("parse namespace %q: %w", h, err) | ||
| } | ||
| namespaces = append(namespaces, ns) | ||
| } | ||
| return namespaces, nil | ||
| } | ||
|
|
||
| func (s *S3Store) GetSyncState(ctx context.Context) (*types.SyncStatus, error) { | ||
| start := time.Now() | ||
| defer func() { s.metrics.ObserveStoreQueryDuration("GetSyncState", time.Since(start)) }() | ||
|
|
||
| key := s.key("meta", "sync_state.json") | ||
| data, err := s.getObject(ctx, key) | ||
| if err != nil { | ||
| if isNotFound(err) { | ||
| return nil, ErrNotFound | ||
| } | ||
| return nil, fmt.Errorf("get sync state: %w", err) | ||
| } | ||
|
|
||
| var state struct { | ||
| State int `json:"state"` | ||
| LatestHeight uint64 `json:"latest_height"` | ||
| NetworkHeight uint64 `json:"network_height"` | ||
| } | ||
| if err := json.Unmarshal(data, &state); err != nil { | ||
| return nil, fmt.Errorf("decode sync state: %w", err) | ||
| } | ||
| return &types.SyncStatus{ | ||
| State: types.SyncState(state.State), | ||
| LatestHeight: state.LatestHeight, | ||
| NetworkHeight: state.NetworkHeight, | ||
| }, nil | ||
| } | ||
|
|
||
| func (s *S3Store) SetSyncState(ctx context.Context, status types.SyncStatus) error { | ||
| // Flush buffered data first — this is the checkpoint boundary. | ||
| if err := s.flush(ctx); err != nil { | ||
| return fmt.Errorf("flush before sync state: %w", err) | ||
| } | ||
|
|
||
| data, err := json.Marshal(struct { | ||
| State int `json:"state"` | ||
| LatestHeight uint64 `json:"latest_height"` | ||
| NetworkHeight uint64 `json:"network_height"` | ||
| }{ | ||
| State: int(status.State), | ||
| LatestHeight: status.LatestHeight, | ||
| NetworkHeight: status.NetworkHeight, | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("encode sync state: %w", err) | ||
| } | ||
|
|
||
| key := s.key("meta", "sync_state.json") | ||
| return s.putObject(ctx, key, data) | ||
| } | ||
|
|
||
| func (s *S3Store) Close() error { | ||
| return s.flush(context.Background()) | ||
| } | ||
|
|
||
| // --- Flush --- | ||
|
|
||
| // flush drains the write buffer to S3. Called at checkpoint boundaries. | ||
| func (s *S3Store) flush(ctx context.Context) error { | ||
| s.mu.Lock() | ||
| blobBuf := s.blobBuf | ||
| headerBuf := s.headerBuf | ||
| commitBuf := s.commitBuf | ||
| s.blobBuf = make(map[blobChunkKey][]types.Blob) | ||
| s.headerBuf = make(map[uint64][]*types.Header) | ||
| s.commitBuf = nil | ||
| s.mu.Unlock() | ||
|
|
||
| if len(blobBuf) == 0 && len(headerBuf) == 0 && len(commitBuf) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // Use a semaphore to bound concurrency. | ||
| sem := make(chan struct{}, maxFlushConcurrency) | ||
| var ( | ||
| mu sync.Mutex | ||
| errs []error | ||
| ) | ||
| addErr := func(err error) { | ||
| mu.Lock() | ||
| errs = append(errs, err) | ||
| mu.Unlock() | ||
| } | ||
|
|
||
| var wg sync.WaitGroup | ||
|
|
||
| // Flush blob chunks. | ||
| for key, blobs := range blobBuf { | ||
| wg.Add(1) | ||
| go func(key blobChunkKey, blobs []types.Blob) { | ||
| defer wg.Done() | ||
| sem <- struct{}{} | ||
| defer func() { <-sem }() | ||
| if err := s.flushBlobChunk(ctx, key, blobs); err != nil { | ||
| addErr(err) | ||
| } | ||
| }(key, blobs) | ||
| } | ||
|
|
||
| // Flush header chunks. | ||
| for cs, headers := range headerBuf { | ||
| wg.Add(1) | ||
| go func(cs uint64, headers []*types.Header) { | ||
| defer wg.Done() | ||
| sem <- struct{}{} | ||
| defer func() { <-sem }() | ||
| if err := s.flushHeaderChunk(ctx, cs, headers); err != nil { | ||
| addErr(err) | ||
| } | ||
| }(cs, headers) | ||
| } | ||
|
|
||
| // Flush commitment indices. | ||
| for _, entry := range commitBuf { | ||
| wg.Add(1) | ||
| go func(e commitEntry) { | ||
| defer wg.Done() | ||
| sem <- struct{}{} | ||
| defer func() { <-sem }() | ||
| if err := s.flushCommitIndex(ctx, e); err != nil { | ||
| addErr(err) | ||
| } | ||
| }(entry) | ||
| } | ||
|
|
||
| wg.Wait() | ||
| return errors.Join(errs...) | ||
| } | ||
|
|
||
| func (s *S3Store) flushBlobChunk(ctx context.Context, key blobChunkKey, newBlobs []types.Blob) error { | ||
| objKey := s.key("blobs", key.namespace.String(), chunkFileName(key.chunkStart)) | ||
|
|
||
| // Read existing chunk (may 404). | ||
| existing, err := s.getObject(ctx, objKey) | ||
| if err != nil && !isNotFound(err) { | ||
| return fmt.Errorf("read blob chunk for merge: %w", err) | ||
| } | ||
|
|
||
| var merged []types.Blob | ||
| if existing != nil { | ||
| merged, err = decodeS3Blobs(existing) | ||
| if err != nil { | ||
| return fmt.Errorf("decode existing blob chunk: %w", err) | ||
| } | ||
| } | ||
|
|
||
| merged = append(merged, newBlobs...) | ||
| merged = deduplicateBlobs(merged) | ||
| sort.Slice(merged, func(i, j int) bool { | ||
| if merged[i].Height != merged[j].Height { | ||
| return merged[i].Height < merged[j].Height | ||
| } | ||
| return merged[i].Index < merged[j].Index | ||
| }) | ||
|
|
||
| data, err := encodeS3Blobs(merged) | ||
| if err != nil { | ||
| return fmt.Errorf("encode blob chunk: %w", err) | ||
| } | ||
| return s.putObject(ctx, objKey, data) | ||
| } | ||
|
|
||
| func (s *S3Store) flushHeaderChunk(ctx context.Context, cs uint64, newHeaders []*types.Header) error { | ||
| objKey := s.key("headers", chunkFileName(cs)) | ||
|
|
||
| existing, err := s.getObject(ctx, objKey) | ||
| if err != nil && !isNotFound(err) { | ||
| return fmt.Errorf("read header chunk for merge: %w", err) | ||
| } | ||
|
|
||
| var merged []types.Header | ||
| if existing != nil { | ||
| merged, err = decodeS3Headers(existing) | ||
| if err != nil { | ||
| return fmt.Errorf("decode existing header chunk: %w", err) | ||
| } | ||
| } | ||
|
|
||
| for _, h := range newHeaders { | ||
| merged = append(merged, *h) | ||
| } | ||
|
|
||
| merged = deduplicateHeaders(merged) | ||
| sort.Slice(merged, func(i, j int) bool { | ||
| return merged[i].Height < merged[j].Height | ||
| }) | ||
|
|
||
| data, err := encodeS3Headers(merged) | ||
| if err != nil { | ||
| return fmt.Errorf("encode header chunk: %w", err) | ||
| } | ||
| return s.putObject(ctx, objKey, data) | ||
| } | ||
|
|
||
| func (s *S3Store) flushCommitIndex(ctx context.Context, e commitEntry) error { | ||
| key := s.key("index", "commitments", e.commitmentHex+".json") | ||
| data, err := json.Marshal(e.pointer) | ||
| if err != nil { | ||
| return fmt.Errorf("encode commitment index: %w", err) | ||
| } | ||
| return s.putObject(ctx, key, data) | ||
| } | ||
|
|
||
| // --- Buffer lookup helpers --- | ||
|
|
||
| func (s *S3Store) findBlobInBuffer(ns types.Namespace, height uint64, index int) *types.Blob { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| return s.findBlobInBufferLocked(ns, height, index) | ||
| } | ||
|
|
||
| func (s *S3Store) findBlobInBufferLocked(ns types.Namespace, height uint64, index int) *types.Blob { | ||
| key := blobChunkKey{namespace: ns, chunkStart: s.chunkStart(height)} | ||
| for i := range s.blobBuf[key] { | ||
| b := &s.blobBuf[key][i] | ||
| if b.Height == height && b.Index == index { | ||
| cp := *b | ||
| return &cp | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (s *S3Store) findHeaderInBuffer(height uint64) *types.Header { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| cs := s.chunkStart(height) | ||
| for _, h := range s.headerBuf[cs] { | ||
| if h.Height == height { | ||
| cp := *h | ||
| return &cp | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // --- S3 helpers --- | ||
|
|
||
| func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) { | ||
| out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ | ||
| Bucket: aws.String(s.bucket), | ||
| Key: aws.String(key), | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer out.Body.Close() //nolint:errcheck | ||
| return io.ReadAll(out.Body) | ||
| } | ||
|
|
||
| func (s *S3Store) putObject(ctx context.Context, key string, data []byte) error { | ||
| _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ | ||
| Bucket: aws.String(s.bucket), | ||
| Key: aws.String(key), | ||
| Body: bytes.NewReader(data), | ||
| ContentType: aws.String("application/json"), | ||
| }) | ||
| return err | ||
| } | ||
|
|
||
| // isNotFound returns true if the error indicates an S3 NoSuchKey. | ||
| func isNotFound(err error) bool { | ||
| var nsk *s3types.NoSuchKey | ||
| if errors.As(err, &nsk) { | ||
| return true | ||
| } | ||
| // Some S3-compatible services return NotFound via the HTTP status code | ||
| // wrapped in a smithy-go OperationError rather than a typed NoSuchKey error. | ||
| var nsb *s3types.NotFound | ||
| return errors.As(err, &nsb) | ||
| } | ||
|
|
||
| // --- Encoding/decoding --- | ||
|
|
||
| func encodeS3Blobs(blobs []types.Blob) ([]byte, error) { | ||
| out := make([]s3Blob, len(blobs)) | ||
| for i, b := range blobs { | ||
| out[i] = s3Blob{ | ||
| Height: b.Height, | ||
| Namespace: b.Namespace.String(), | ||
| Data: b.Data, | ||
| Commitment: b.Commitment, | ||
| ShareVersion: b.ShareVersion, | ||
| Signer: b.Signer, | ||
| Index: b.Index, | ||
| } | ||
| } | ||
| return json.Marshal(out) | ||
| } | ||
|
|
||
| func decodeS3Blobs(data []byte) ([]types.Blob, error) { | ||
| var raw []s3Blob | ||
| if err := json.Unmarshal(data, &raw); err != nil { | ||
| return nil, fmt.Errorf("decode blobs JSON: %w", err) | ||
| } | ||
| blobs := make([]types.Blob, len(raw)) | ||
| for i, r := range raw { | ||
| ns, err := types.NamespaceFromHex(r.Namespace) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("decode blob namespace: %w", err) | ||
| } | ||
| blobs[i] = types.Blob{ | ||
| Height: r.Height, | ||
| Namespace: ns, | ||
| Data: r.Data, | ||
| Commitment: r.Commitment, | ||
| ShareVersion: r.ShareVersion, | ||
| Signer: r.Signer, | ||
| Index: r.Index, | ||
| } | ||
| } | ||
| return blobs, nil | ||
| } | ||
|
|
||
| func encodeS3Headers(headers []types.Header) ([]byte, error) { | ||
| out := make([]s3Header, len(headers)) | ||
| for i, h := range headers { | ||
| out[i] = s3Header{ | ||
| Height: h.Height, | ||
| Hash: h.Hash, | ||
| DataHash: h.DataHash, | ||
| Time: h.Time, | ||
| RawHeader: h.RawHeader, | ||
| } | ||
| } | ||
| return json.Marshal(out) | ||
| } | ||
|
|
||
| func decodeS3Headers(data []byte) ([]types.Header, error) { | ||
| var raw []s3Header | ||
| if err := json.Unmarshal(data, &raw); err != nil { | ||
| return nil, fmt.Errorf("decode headers JSON: %w", err) | ||
| } | ||
| headers := make([]types.Header, len(raw)) | ||
| for i, r := range raw { | ||
| headers[i] = types.Header{ | ||
| Height: r.Height, | ||
| Hash: r.Hash, | ||
| DataHash: r.DataHash, | ||
| Time: r.Time, | ||
| RawHeader: r.RawHeader, | ||
| } | ||
| } | ||
| return headers, nil | ||
| } | ||
|
|
||
| // --- Deduplication --- | ||
|
|
||
| func deduplicateBlobs(blobs []types.Blob) []types.Blob { | ||
| type blobKey struct { | ||
| height uint64 | ||
| namespace types.Namespace | ||
| index int | ||
| } | ||
| seen := make(map[blobKey]struct{}, len(blobs)) | ||
| out := make([]types.Blob, 0, len(blobs)) | ||
| for _, b := range blobs { | ||
| k := blobKey{height: b.Height, namespace: b.Namespace, index: b.Index} | ||
| if _, ok := seen[k]; ok { | ||
| continue | ||
| } | ||
| seen[k] = struct{}{} | ||
| out = append(out, b) | ||
| } | ||
| return out | ||
| } | ||
|
|
||
| func deduplicateHeaders(headers []types.Header) []types.Header { | ||
| seen := make(map[uint64]struct{}, len(headers)) | ||
| out := make([]types.Header, 0, len(headers)) | ||
| for _, h := range headers { | ||
| if _, ok := seen[h.Height]; ok { | ||
| continue | ||
| } | ||
| seen[h.Height] = struct{}{} | ||
| out = append(out, h) | ||
| } | ||
| return out | ||
| } |
This new file introduces the S3Store implementation, providing an S3-compatible object store backend for Apex. It includes buffering, chunking, and flushing logic to optimize interactions with S3, along with methods for reading and writing blobs, headers, namespaces, and sync state. This is a significant new feature for flexible storage options.
package store
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/evstack/apex/config"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/types"
)
// s3Client abstracts the S3 operations used by S3Store, enabling mock-based testing.
type s3Client interface {
GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}
// blobChunkKey identifies a blob chunk in S3 by namespace and chunk start height.
type blobChunkKey struct {
namespace types.Namespace
chunkStart uint64
}
// commitEntry is a buffered commitment index write.
type commitEntry struct {
commitmentHex string
pointer commitPointer
}
// commitPointer is the JSON content of a commitment index object.
type commitPointer struct {
Namespace string `json:"namespace"`
Height uint64 `json:"height"`
Index int `json:"index"`
}
// s3Blob is the JSON representation of a blob in an S3 chunk object.
type s3Blob struct {
Height uint64 `json:"height"`
Namespace string `json:"namespace"`
Data []byte `json:"data"`
Commitment []byte `json:"commitment"`
ShareVersion uint32 `json:"share_version"`
Signer []byte `json:"signer"`
Index int `json:"index"`
}
// s3Header is the JSON representation of a header in an S3 chunk object.
type s3Header struct {
Height uint64 `json:"height"`
Hash []byte `json:"hash"`
DataHash []byte `json:"data_hash"`
Time time.Time `json:"time"`
RawHeader []byte `json:"raw_header"`
}
// maxFlushConcurrency bounds parallel S3 I/O during flush.
const maxFlushConcurrency = 8
// S3Store implements Store using an S3-compatible object store.
// Writes are buffered in memory and flushed to S3 at checkpoint boundaries
// (SetSyncState calls and Close).
type S3Store struct {
client s3Client
bucket string
prefix string
chunkSize uint64
metrics metrics.Recorder
mu sync.Mutex
blobBuf map[blobChunkKey][]types.Blob
headerBuf map[uint64][]*types.Header
commitBuf []commitEntry
}
// NewS3Store creates a new S3Store from the given config.
func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
opts := []func(*awsconfig.LoadOptions) error{}
if cfg.Region != "" {
opts = append(opts, awsconfig.WithRegion(cfg.Region))
}
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("load AWS config: %w", err)
}
var s3Opts []func(*s3.Options)
if cfg.Endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = aws.String(cfg.Endpoint)
o.UsePathStyle = true
})
}
client := s3.NewFromConfig(awsCfg, s3Opts...)
chunkSize := uint64(cfg.ChunkSize)
if chunkSize == 0 {
chunkSize = 64
}
return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
}
// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials.
// Useful for testing against MinIO or other S3-compatible services.
func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) {
opts := []func(*awsconfig.LoadOptions) error{
awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)),
}
if cfg.Region != "" {
opts = append(opts, awsconfig.WithRegion(cfg.Region))
}
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("load AWS config: %w", err)
}
var s3Opts []func(*s3.Options)
if cfg.Endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = aws.String(cfg.Endpoint)
o.UsePathStyle = true
})
}
client := s3.NewFromConfig(awsCfg, s3Opts...)
chunkSize := uint64(cfg.ChunkSize)
if chunkSize == 0 {
chunkSize = 64
}
return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
}
// newS3Store creates an S3Store with an injected client (for testing).
func newS3Store(client s3Client, bucket, prefix string, chunkSize uint64) *S3Store {
return &S3Store{
client: client,
bucket: bucket,
prefix: strings.TrimSuffix(prefix, "/"),
chunkSize: chunkSize,
metrics: metrics.Nop(),
blobBuf: make(map[blobChunkKey][]types.Blob),
headerBuf: make(map[uint64][]*types.Header),
}
}
// SetMetrics sets the metrics recorder.
func (s *S3Store) SetMetrics(m metrics.Recorder) {
s.metrics = m
}
// chunkStart returns the chunk start height for a given height.
func (s *S3Store) chunkStart(height uint64) uint64 {
return (height / s.chunkSize) * s.chunkSize
}
// key builds an S3 object key with the configured prefix.
func (s *S3Store) key(parts ...string) string {
if s.prefix == "" {
return strings.Join(parts, "/")
}
return s.prefix + "/" + strings.Join(parts, "/")
}
func chunkFileName(start uint64) string {
return fmt.Sprintf("chunk_%016d.json", start)
}
// --- Write methods (buffer) ---
func (s *S3Store) PutBlobs(ctx context.Context, blobs []types.Blob) error {
if len(blobs) == 0 {
return nil
}
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("PutBlobs", time.Since(start)) }()
s.mu.Lock()
defer s.mu.Unlock()
for i := range blobs {
b := &blobs[i]
key := blobChunkKey{namespace: b.Namespace, chunkStart: s.chunkStart(b.Height)}
s.blobBuf[key] = append(s.blobBuf[key], *b)
s.commitBuf = append(s.commitBuf, commitEntry{
commitmentHex: hex.EncodeToString(b.Commitment),
pointer: commitPointer{
Namespace: b.Namespace.String(),
Height: b.Height,
Index: b.Index,
},
})
}
return nil
}
func (s *S3Store) PutHeader(ctx context.Context, header *types.Header) error {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("PutHeader", time.Since(start)) }()
s.mu.Lock()
defer s.mu.Unlock()
cs := s.chunkStart(header.Height)
s.headerBuf[cs] = append(s.headerBuf[cs], header)
return nil
}
func (s *S3Store) PutNamespace(ctx context.Context, ns types.Namespace) error {
s.mu.Lock()
defer s.mu.Unlock()
key := s.key("meta", "namespaces.json")
existing, err := s.getObject(ctx, key)
if err != nil && !isNotFound(err) {
return fmt.Errorf("get namespaces: %w", err)
}
namespaces := make([]string, 0, 1)
if existing != nil {
if err := json.Unmarshal(existing, &namespaces); err != nil {
return fmt.Errorf("decode namespaces: %w", err)
}
}
nsHex := ns.String()
for _, n := range namespaces {
if n == nsHex {
return nil
}
}
namespaces = append(namespaces, nsHex)
data, err := json.Marshal(namespaces)
if err != nil {
return fmt.Errorf("encode namespaces: %w", err)
}
return s.putObject(ctx, key, data)
}
// --- Read methods ---
func (s *S3Store) GetBlob(ctx context.Context, ns types.Namespace, height uint64, index int) (*types.Blob, error) {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("GetBlob", time.Since(start)) }()
// Check buffer first.
if b := s.findBlobInBuffer(ns, height, index); b != nil {
return b, nil
}
cs := s.chunkStart(height)
key := s.key("blobs", ns.String(), chunkFileName(cs))
data, err := s.getObject(ctx, key)
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("get blob chunk: %w", err)
}
blobs, err := decodeS3Blobs(data)
if err != nil {
return nil, err
}
for i := range blobs {
if blobs[i].Height == height && blobs[i].Index == index && blobs[i].Namespace == ns {
return &blobs[i], nil
}
}
return nil, ErrNotFound
}
func (s *S3Store) GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, limit, offset int) ([]types.Blob, error) {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobs", time.Since(start)) }()
buffered := s.collectBufferedBlobs(ns, startHeight, endHeight)
s3Blobs, err := s.fetchBlobChunks(ctx, ns, startHeight, endHeight)
if err != nil {
return nil, err
}
allBlobs := deduplicateBlobs(append(buffered, s3Blobs...))
sort.Slice(allBlobs, func(i, j int) bool {
if allBlobs[i].Height != allBlobs[j].Height {
return allBlobs[i].Height < allBlobs[j].Height
}
return allBlobs[i].Index < allBlobs[j].Index
})
return applyOffsetLimit(allBlobs, offset, limit), nil
}
// collectBufferedBlobs returns in-buffer blobs matching the namespace and height range.
func (s *S3Store) collectBufferedBlobs(ns types.Namespace, startHeight, endHeight uint64) []types.Blob {
s.mu.Lock()
defer s.mu.Unlock()
var result []types.Blob
for key, bufs := range s.blobBuf {
if key.namespace != ns {
continue
}
for i := range bufs {
if bufs[i].Height >= startHeight && bufs[i].Height <= endHeight {
result = append(result, bufs[i])
}
}
}
return result
}
// fetchBlobChunks reads all S3 blob chunks overlapping the height range for a namespace.
func (s *S3Store) fetchBlobChunks(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64) ([]types.Blob, error) {
firstChunk := s.chunkStart(startHeight)
lastChunk := s.chunkStart(endHeight)
var allBlobs []types.Blob
for cs := firstChunk; cs <= lastChunk; cs += s.chunkSize {
key := s.key("blobs", ns.String(), chunkFileName(cs))
data, err := s.getObject(ctx, key)
if err != nil {
if isNotFound(err) {
continue
}
return nil, fmt.Errorf("get blob chunk at %d: %w", cs, err)
}
blobs, err := decodeS3Blobs(data)
if err != nil {
return nil, err
}
for i := range blobs {
if blobs[i].Height >= startHeight && blobs[i].Height <= endHeight && blobs[i].Namespace == ns {
allBlobs = append(allBlobs, blobs[i])
}
}
}
return allBlobs, nil
}
func applyOffsetLimit(items []types.Blob, offset, limit int) []types.Blob {
if offset > 0 {
if offset >= len(items) {
return nil
}
items = items[offset:]
}
if limit > 0 && limit < len(items) {
items = items[:limit]
}
return items
}
func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }()
// Check buffer first.
commitHex := hex.EncodeToString(commitment)
s.mu.Lock()
for _, entry := range s.commitBuf {
if entry.commitmentHex == commitHex {
ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
if err == nil {
if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
s.mu.Unlock()
return b, nil
}
}
}
}
s.mu.Unlock()
// Look up commitment index in S3.
key := s.key("index", "commitments", commitHex+".json")
data, err := s.getObject(ctx, key)
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("get commitment index: %w", err)
}
var ptr commitPointer
if err := json.Unmarshal(data, &ptr); err != nil {
return nil, fmt.Errorf("decode commitment pointer: %w", err)
}
ns, err := types.NamespaceFromHex(ptr.Namespace)
if err != nil {
return nil, fmt.Errorf("parse namespace from commitment index: %w", err)
}
return s.GetBlob(ctx, ns, ptr.Height, ptr.Index)
}
func (s *S3Store) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("GetHeader", time.Since(start)) }()
// Check buffer first.
if h := s.findHeaderInBuffer(height); h != nil {
return h, nil
}
cs := s.chunkStart(height)
key := s.key("headers", chunkFileName(cs))
data, err := s.getObject(ctx, key)
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("get header chunk: %w", err)
}
headers, err := decodeS3Headers(data)
if err != nil {
return nil, err
}
for i := range headers {
if headers[i].Height == height {
return &headers[i], nil
}
}
return nil, ErrNotFound
}
func (s *S3Store) GetNamespaces(ctx context.Context) ([]types.Namespace, error) {
key := s.key("meta", "namespaces.json")
data, err := s.getObject(ctx, key)
if err != nil {
if isNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("get namespaces: %w", err)
}
var hexList []string
if err := json.Unmarshal(data, &hexList); err != nil {
return nil, fmt.Errorf("decode namespaces: %w", err)
}
namespaces := make([]types.Namespace, 0, len(hexList))
for _, h := range hexList {
ns, err := types.NamespaceFromHex(h)
if err != nil {
return nil, fmt.Errorf("parse namespace %q: %w", h, err)
}
namespaces = append(namespaces, ns)
}
return namespaces, nil
}
func (s *S3Store) GetSyncState(ctx context.Context) (*types.SyncStatus, error) {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("GetSyncState", time.Since(start)) }()
key := s.key("meta", "sync_state.json")
data, err := s.getObject(ctx, key)
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("get sync state: %w", err)
}
var state struct {
State int `json:"state"`
LatestHeight uint64 `json:"latest_height"`
NetworkHeight uint64 `json:"network_height"`
}
if err := json.Unmarshal(data, &state); err != nil {
return nil, fmt.Errorf("decode sync state: %w", err)
}
return &types.SyncStatus{
State: types.SyncState(state.State),
LatestHeight: state.LatestHeight,
NetworkHeight: state.NetworkHeight,
}, nil
}
func (s *S3Store) SetSyncState(ctx context.Context, status types.SyncStatus) error {
// Flush buffered data first — this is the checkpoint boundary.
if err := s.flush(ctx); err != nil {
return fmt.Errorf("flush before sync state: %w", err)
}
data, err := json.Marshal(struct {
State int `json:"state"`
LatestHeight uint64 `json:"latest_height"`
NetworkHeight uint64 `json:"network_height"`
}{
State: int(status.State),
LatestHeight: status.LatestHeight,
NetworkHeight: status.NetworkHeight,
})
if err != nil {
return fmt.Errorf("encode sync state: %w", err)
}
key := s.key("meta", "sync_state.json")
return s.putObject(ctx, key, data)
}
func (s *S3Store) Close() error {
return s.flush(context.Background())
}
// --- Flush ---
// flush drains the write buffer to S3. Called at checkpoint boundaries.
func (s *S3Store) flush(ctx context.Context) error {
s.mu.Lock()
blobBuf := s.blobBuf
headerBuf := s.headerBuf
commitBuf := s.commitBuf
s.blobBuf = make(map[blobChunkKey][]types.Blob)
s.headerBuf = make(map[uint64][]*types.Header)
s.commitBuf = nil
s.mu.Unlock()
if len(blobBuf) == 0 && len(headerBuf) == 0 && len(commitBuf) == 0 {
return nil
}
// Use a semaphore to bound concurrency.
sem := make(chan struct{}, maxFlushConcurrency)
var (
mu sync.Mutex
errs []error
)
addErr := func(err error) {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
var wg sync.WaitGroup
// Flush blob chunks.
for key, blobs := range blobBuf {
wg.Add(1)
go func(key blobChunkKey, blobs []types.Blob) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
if err := s.flushBlobChunk(ctx, key, blobs); err != nil {
addErr(err)
}
}(key, blobs)
}
// Flush header chunks.
for cs, headers := range headerBuf {
wg.Add(1)
go func(cs uint64, headers []*types.Header) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
if err := s.flushHeaderChunk(ctx, cs, headers); err != nil {
addErr(err)
}
}(cs, headers)
}
// Flush commitment indices.
for _, entry := range commitBuf {
wg.Add(1)
go func(e commitEntry) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
if err := s.flushCommitIndex(ctx, e); err != nil {
addErr(err)
}
}(entry)
}
wg.Wait()
return errors.Join(errs...)
}
func (s *S3Store) flushBlobChunk(ctx context.Context, key blobChunkKey, newBlobs []types.Blob) error {
objKey := s.key("blobs", key.namespace.String(), chunkFileName(key.chunkStart))
// Read existing chunk (may 404).
existing, err := s.getObject(ctx, objKey)
if err != nil && !isNotFound(err) {
return fmt.Errorf("read blob chunk for merge: %w", err)
}
var merged []types.Blob
if existing != nil {
merged, err = decodeS3Blobs(existing)
if err != nil {
return fmt.Errorf("decode existing blob chunk: %w", err)
}
}
merged = append(merged, newBlobs...)
merged = deduplicateBlobs(merged)
sort.Slice(merged, func(i, j int) bool {
if merged[i].Height != merged[j].Height {
return merged[i].Height < merged[j].Height
}
return merged[i].Index < merged[j].Index
})
data, err := encodeS3Blobs(merged)
if err != nil {
return fmt.Errorf("encode blob chunk: %w", err)
}
return s.putObject(ctx, objKey, data)
}
func (s *S3Store) flushHeaderChunk(ctx context.Context, cs uint64, newHeaders []*types.Header) error {
objKey := s.key("headers", chunkFileName(cs))
existing, err := s.getObject(ctx, objKey)
if err != nil && !isNotFound(err) {
return fmt.Errorf("read header chunk for merge: %w", err)
}
var merged []types.Header
if existing != nil {
merged, err = decodeS3Headers(existing)
if err != nil {
return fmt.Errorf("decode existing header chunk: %w", err)
}
}
for _, h := range newHeaders {
merged = append(merged, *h)
}
merged = deduplicateHeaders(merged)
sort.Slice(merged, func(i, j int) bool {
return merged[i].Height < merged[j].Height
})
data, err := encodeS3Headers(merged)
if err != nil {
return fmt.Errorf("encode header chunk: %w", err)
}
return s.putObject(ctx, objKey, data)
}
func (s *S3Store) flushCommitIndex(ctx context.Context, e commitEntry) error {
key := s.key("index", "commitments", e.commitmentHex+".json")
data, err := json.Marshal(e.pointer)
if err != nil {
return fmt.Errorf("encode commitment index: %w", err)
}
return s.putObject(ctx, key, data)
}
// --- Buffer lookup helpers ---
func (s *S3Store) findBlobInBuffer(ns types.Namespace, height uint64, index int) *types.Blob {
s.mu.Lock()
defer s.mu.Unlock()
return s.findBlobInBufferLocked(ns, height, index)
}
func (s *S3Store) findBlobInBufferLocked(ns types.Namespace, height uint64, index int) *types.Blob {
key := blobChunkKey{namespace: ns, chunkStart: s.chunkStart(height)}
for i := range s.blobBuf[key] {
b := &s.blobBuf[key][i]
if b.Height == height && b.Index == index {
cp := *b
return &cp
}
}
return nil
}
func (s *S3Store) findHeaderInBuffer(height uint64) *types.Header {
s.mu.Lock()
defer s.mu.Unlock()
cs := s.chunkStart(height)
for _, h := range s.headerBuf[cs] {
if h.Height == height {
cp := *h
return &cp
}
}
return nil
}
// --- S3 helpers ---
func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) {
out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, err
}
defer out.Body.Close() //nolint:errcheck
return io.ReadAll(out.Body)
}
func (s *S3Store) putObject(ctx context.Context, key string, data []byte) error {
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
ContentType: aws.String("application/json"),
})
return err
}
// isNotFound returns true if the error indicates an S3 NoSuchKey.
func isNotFound(err error) bool {
var nsk *s3types.NoSuchKey
if errors.As(err, &nsk) {
return true
}
// Some S3-compatible services return NotFound via the HTTP status code
// wrapped in a smithy-go OperationError rather than a typed NoSuchKey error.
var nsb *s3types.NotFound
return errors.As(err, &nsb)
}
// --- Encoding/decoding ---
func encodeS3Blobs(blobs []types.Blob) ([]byte, error) {
out := make([]s3Blob, len(blobs))
for i, b := range blobs {
out[i] = s3Blob{
Height: b.Height,
Namespace: b.Namespace.String(),
Data: b.Data,
Commitment: b.Commitment,
ShareVersion: b.ShareVersion,
Signer: b.Signer,
Index: b.Index,
}
}
return json.Marshal(out)
}
func decodeS3Blobs(data []byte) ([]types.Blob, error) {
var raw []s3Blob
if err := json.Unmarshal(data, &raw); err != nil {
return nil, fmt.Errorf("decode blobs JSON: %w", err)
}
blobs := make([]types.Blob, len(raw))
for i, r := range raw {
ns, err := types.NamespaceFromHex(r.Namespace)
if err != nil {
return nil, fmt.Errorf("decode blob namespace: %w", err)
}
blobs[i] = types.Blob{
Height: r.Height,
Namespace: ns,
Data: r.Data,
Commitment: r.Commitment,
ShareVersion: r.ShareVersion,
Signer: r.Signer,
Index: r.Index,
}
}
return blobs, nil
}
func encodeS3Headers(headers []types.Header) ([]byte, error) {
out := make([]s3Header, len(headers))
for i, h := range headers {
out[i] = s3Header{
Height: h.Height,
Hash: h.Hash,
DataHash: h.DataHash,
Time: h.Time,
RawHeader: h.RawHeader,
}
}
return json.Marshal(out)
}
func decodeS3Headers(data []byte) ([]types.Header, error) {
var raw []s3Header
if err := json.Unmarshal(data, &raw); err != nil {
return nil, fmt.Errorf("decode headers JSON: %w", err)
}
headers := make([]types.Header, len(raw))
for i, r := range raw {
headers[i] = types.Header{
Height: r.Height,
Hash: r.Hash,
DataHash: r.DataHash,
Time: r.Time,
RawHeader: r.RawHeader,
}
}
return headers, nil
}
// --- Deduplication ---
func deduplicateBlobs(blobs []types.Blob) []types.Blob {
type blobKey struct {
height uint64
namespace types.Namespace
index int
}
seen := make(map[blobKey]struct{}, len(blobs))
out := make([]types.Blob, 0, len(blobs))
for _, b := range blobs {
k := blobKey{height: b.Height, namespace: b.Namespace, index: b.Index}
if _, ok := seen[k]; ok {
continue
}
seen[k] = struct{}{}
out = append(out, b)
}
return out
}
func deduplicateHeaders(headers []types.Header) []types.Header {
seen := make(map[uint64]struct{}, len(headers))
out := make([]types.Header, 0, len(headers))
for _, h := range headers {
if _, ok := seen[h.Height]; ok {
continue
}
seen[h.Height] = struct{}{}
out = append(out, h)
}
return out
}

r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond)
r.IncBackfillStageErrors("store_header")

Calls to the new ObserveBackfillStageDuration and IncBackfillStageErrors methods are added to the TestNopRecorderDoesNotPanic test. This ensures that the nopRecorder correctly handles these new methods without panicking.
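A plausible shape for that test, assuming the recorder returned by metrics.Nop() exposes the two new methods and that the test sits in an external metrics_test package; the actual test file is not shown in this view.

package metrics_test

import (
	"testing"
	"time"

	"github.com/evstack/apex/pkg/metrics"
)

// Sketch of the no-panic test: every Recorder method, including the new
// backfill-stage ones, must be safe to call on the nop implementation.
func TestNopRecorderDoesNotPanic(t *testing.T) {
	r := metrics.Nop()
	r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond)
	r.IncBackfillStageErrors("store_header")
}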
- Document SetMetrics must-call-before-use contract
- Use dedicated nsMu for PutNamespace to avoid holding buffer mutex during S3 network I/O
- Add flushMu to serialize flush() calls, preventing concurrent read-modify-write races on S3 chunk objects
- Pre-allocate merged slice in flushHeaderChunk (lint fix)
- Guard division by zero in backfill progress when done==0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
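The flushMu item deserves a closer look: flush() performs read-merge-write cycles on shared chunk objects, so two overlapping flushes could silently drop each other's merges. The snippet below is a minimal, self-contained sketch of the two-mutex pattern the commit describes, with a hot-path mutex guarding the buffers and a separate flush mutex serializing the slow I/O phase. The names bufferedStore, put, and flush are illustrative, not the actual diff.

package main

import (
	"fmt"
	"sync"
)

// bufferedStore illustrates the locking split: mu protects the in-memory
// buffer on the write hot path, while flushMu ensures only one flush at a
// time performs the remote read-merge-write.
type bufferedStore struct {
	mu      sync.Mutex
	flushMu sync.Mutex
	buf     []string
}

func (s *bufferedStore) put(v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.buf = append(s.buf, v)
}

func (s *bufferedStore) flush() {
	s.flushMu.Lock() // serialize flushes so remote chunk merges never race
	defer s.flushMu.Unlock()

	s.mu.Lock() // swap the buffer out quickly; writers are blocked only here
	pending := s.buf
	s.buf = nil
	s.mu.Unlock()

	// Remote upload would happen here, outside the buffer mutex.
	fmt.Printf("flushing %d entries\n", len(pending))
}

func main() {
	s := &bufferedStore{}
	s.put("a")
	s.put("b")
	s.flush()
}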
Actionable comments posted: 4
🧹 Nitpick comments (3)
pkg/sync/backfill.go (1)
190-196: Inconsistent metrics recording pattern for `store_header` and `store_blobs`.
`fetch_header` (lines 183–184) records the duration unconditionally right after the call, before the error check — clean, single observation point. `store_header` and `store_blobs` instead duplicate the `ObserveBackfillStageDuration` call in both branches (error path and fall-through), which is correct but verbose and inconsistent:
// store_header — duration recorded twice (once per branch)
stageStart = time.Now()
if err := b.store.PutHeader(ctx, hdr); err != nil {
b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) // ← in error branch
b.metrics.IncBackfillStageErrors("store_header")
return ...
}
b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) // ← in success branch
Adopt the same pattern as `fetch_header` to record duration once before the error check:
♻️ Proposed refactor
stageStart = time.Now()
- if err := b.store.PutHeader(ctx, hdr); err != nil {
- b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
- b.metrics.IncBackfillStageErrors("store_header")
- return fmt.Errorf("put header: %w", err)
- }
- b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
+ putHdrErr := b.store.PutHeader(ctx, hdr)
+ b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
+ if putHdrErr != nil {
+ b.metrics.IncBackfillStageErrors("store_header")
+ return fmt.Errorf("put header: %w", putHdrErr)
+ }
// ...
stageStart = time.Now()
- if err := b.store.PutBlobs(ctx, blobs); err != nil {
- b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart))
- b.metrics.IncBackfillStageErrors("store_blobs")
- return fmt.Errorf("put blobs: %w", err)
- }
- b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart))
+ putBlobsErr := b.store.PutBlobs(ctx, blobs)
+ b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart))
+ if putBlobsErr != nil {
+ b.metrics.IncBackfillStageErrors("store_blobs")
+ return fmt.Errorf("put blobs: %w", putBlobsErr)
+ }
Also applies to: 208-215
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sync/backfill.go` around lines 190 - 196, The metrics for store_header are recorded twice; change to the same pattern used in fetch_header: start the timer (stageStart = time.Now()), call b.store.PutHeader(ctx, hdr), immediately call b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) before checking the error, then if err != nil call b.metrics.IncBackfillStageErrors("store_header") and return the wrapped error; apply the identical single-observation refactor to the store_blobs/PutBlobs block (the code around where PutBlobs is called and where b.metrics.IncBackfillStageErrors("store_blobs") is incremented).
pkg/store/s3.go (2)
384-426: Refactor manual `mu` lock/unlock in `GetBlobByCommitment` to use a closure with `defer`.
The explicit `s.mu.Unlock()` before the early `return` (line 397) and the post-loop `s.mu.Unlock()` (line 403) are correct today but won't survive a panic inside the loop (no deferred release), and any future edit that adds an additional exit path from the loop body risks a lock leak.
♻️ Proposed refactor using a helper closure
func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) {
start := time.Now()
defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }()
// Check buffer first.
commitHex := hex.EncodeToString(commitment)
- s.mu.Lock()
- for _, entry := range s.commitBuf {
- if entry.commitmentHex == commitHex {
- ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
- if err == nil {
- if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
- s.mu.Unlock()
- return b, nil
- }
- }
- }
- }
- s.mu.Unlock()
+ if b := func() *types.Blob {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for _, entry := range s.commitBuf {
+ if entry.commitmentHex != commitHex {
+ continue
+ }
+ ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
+ if err == nil {
+ if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
+ return b
+ }
+ }
+ }
+ return nil
+ }(); b != nil {
+ return b, nil
+ }
// Look up commitment index in S3.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/s3.go` around lines 384 - 426, GetBlobByCommitment manually locks s.mu then unlocks in two places; change this to acquire the lock inside a short closure (or anonymous func) so you can defer s.mu.Unlock() to guarantee release on any return/panic. Specifically, wrap the loop that scans s.commitBuf and the call to s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index) in a closure that does s.mu.Lock(); defer s.mu.Unlock(); and returns the found *types.Blob (or nil) so the outer function can return early without needing explicit Unlock calls; update callers inside GetBlobByCommitment accordingly.
95-153: Extract the shared constructor body to eliminate ~50 lines of duplication.
`NewS3Store` and `NewS3StoreWithStaticCredentials` differ by exactly one `opts` entry. Everything else — region option, endpoint option, `LoadDefaultConfig`, S3 client construction, chunk-size defaulting, and `newS3Store` call — is copy-pasted.
♻️ Proposed refactor
-// NewS3Store creates a new S3Store from the given config.
-func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
- opts := []func(*awsconfig.LoadOptions) error{}
- if cfg.Region != "" {
- opts = append(opts, awsconfig.WithRegion(cfg.Region))
- }
- awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
- if err != nil {
- return nil, fmt.Errorf("load AWS config: %w", err)
- }
-
- var s3Opts []func(*s3.Options)
- if cfg.Endpoint != "" {
- s3Opts = append(s3Opts, func(o *s3.Options) {
- o.BaseEndpoint = aws.String(cfg.Endpoint)
- o.UsePathStyle = true
- })
- }
-
- client := s3.NewFromConfig(awsCfg, s3Opts...)
-
- chunkSize := uint64(cfg.ChunkSize)
- if chunkSize == 0 {
- chunkSize = 64
- }
-
- return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
-}
+// NewS3Store creates a new S3Store from the given config.
+func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
+ return buildS3Store(ctx, cfg)
+}
-// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials.
-// Useful for testing against MinIO or other S3-compatible services.
-func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) {
- opts := []func(*awsconfig.LoadOptions) error{
- awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)),
- }
- if cfg.Region != "" {
- opts = append(opts, awsconfig.WithRegion(cfg.Region))
- }
- awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
- if err != nil {
- return nil, fmt.Errorf("load AWS config: %w", err)
- }
-
- var s3Opts []func(*s3.Options)
- if cfg.Endpoint != "" {
- s3Opts = append(s3Opts, func(o *s3.Options) {
- o.BaseEndpoint = aws.String(cfg.Endpoint)
- o.UsePathStyle = true
- })
- }
-
- client := s3.NewFromConfig(awsCfg, s3Opts...)
-
- chunkSize := uint64(cfg.ChunkSize)
- if chunkSize == 0 {
- chunkSize = 64
- }
-
- return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
-}
+// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials.
+// Useful for testing against MinIO or other S3-compatible services.
+func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) {
+ return buildS3Store(ctx, cfg,
+ awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)),
+ )
+}
+
+func buildS3Store(ctx context.Context, cfg *config.S3Config, extraOpts ...func(*awsconfig.LoadOptions) error) (*S3Store, error) {
+ opts := append(extraOpts, []func(*awsconfig.LoadOptions) error{}...)
+ if cfg.Region != "" {
+ opts = append(opts, awsconfig.WithRegion(cfg.Region))
+ }
+ awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
+ if err != nil {
+ return nil, fmt.Errorf("load AWS config: %w", err)
+ }
+ var s3Opts []func(*s3.Options)
+ if cfg.Endpoint != "" {
+ s3Opts = append(s3Opts, func(o *s3.Options) {
+ o.BaseEndpoint = aws.String(cfg.Endpoint)
+ o.UsePathStyle = true
+ })
+ }
+ chunkSize := uint64(cfg.ChunkSize)
+ if chunkSize == 0 {
+ chunkSize = 64
+ }
+ return newS3Store(s3.NewFromConfig(awsCfg, s3Opts...), cfg.Bucket, cfg.Prefix, chunkSize), nil
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/s3.go` around lines 95 - 153, Both NewS3Store and NewS3StoreWithStaticCredentials duplicate the same constructor body; extract the shared logic into a helper (e.g. buildS3Store or newS3StoreFromLoadOptions) that accepts ctx, cfg and a variadic slice of awsconfig LoadOptions (or []func(*awsconfig.LoadOptions) error), runs awsconfig.LoadDefaultConfig, applies the endpoint s3.Options, constructs the s3 client via s3.NewFromConfig, computes the chunkSize default, and returns newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize); then have NewS3Store call that helper with no extra opts and NewS3StoreWithStaticCredentials prepend the credentials provider opt (credentials.NewStaticCredentialsProvider(...)) before calling the same helper, removing the duplicated code around awsconfig.LoadDefaultConfig, s3.NewFromConfig, chunkSize handling and the final newS3Store call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/store/s3.go`:
- Around line 156-177: newS3Store can be constructed with chunkSize==0 which
causes a divide-by-zero in chunkStart; add a guard in newS3Store to set a
sensible default (e.g., 64) when the passed chunkSize is zero (or use an
existing DefaultChunkSize constant), so that chunkSize is always non-zero before
storing it on the S3Store struct and before chunkStart is ever called (a minimal sketch of this guard appears after this prompt list).
- Around line 545-560: The flush() function swaps buffers and releases s.mu
before performing S3 puts, causing transient ErrNotFound for concurrent reads;
fix by adding "flushing" snapshot maps (e.g., flushingBlobBuf,
flushingHeaderBuf, flushingCommitBuf) that flush() populates under flushMu
before releasing s.mu and clears after the upload goroutines complete (after
wg.Wait()), and update read-path helpers (findBlobInBufferLocked,
findHeaderInBuffer, and any GetBlobByCommitment lookup) to consult buffer ->
flushing snapshot -> S3 (holding flushMu appropriately while reading the
flushing snapshot) so readers can see in-flight data until flush finishes.
- Around line 729-738: The getObject method in S3Store currently calls
io.ReadAll(out.Body) which can OOM on very large or corrupt objects; update
S3Store.getObject to wrap out.Body with an io.LimitReader using a defined max
size constant (e.g., maxObjectRead = 256<<20) and read through that reader, and
if the read hits the limit return a clear error indicating the object is too
large instead of allocating unbounded memory; ensure you still close out.Body
and return the read bytes and propagated errors from the limited read.
In `@pkg/sync/backfill.go`:
- Around line 114-115: The final throughput calculation in backfill.go divides
by elapsed.Seconds() which can be zero and produce +Inf; modify the block that
computes elapsed := time.Since(startTime) and rate := float64(totalHeights) /
elapsed.Seconds() to guard against zero elapsed time (e.g., if elapsed.Seconds()
== 0 set rate = 0) or use a small non-zero fallback before dividing; update the
variables elapsed, rate, totalHeights and startTime in that function accordingly
so the log never prints +Inf.
---
Duplicate comments:
In `@pkg/sync/backfill.go`:
- Around line 55-60: The existing guard that checks if done == 0 inside the
ticker loop (the block that logs "backfill progress: waiting for first batch"
with b.log.Info().Uint64("target", toHeight).Msg(...)) is correct and requires
no change; keep the early continue to avoid performing the rate/remaining
arithmetic when no heights have been processed yet.
---
Nitpick comments:
In `@pkg/store/s3.go`:
- Around line 384-426: GetBlobByCommitment manually locks s.mu then unlocks in
two places; change this to acquire the lock inside a short closure (or anonymous
func) so you can defer s.mu.Unlock() to guarantee release on any return/panic.
Specifically, wrap the loop that scans s.commitBuf and the call to
s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index) in a
closure that does s.mu.Lock(); defer s.mu.Unlock(); and returns the found
*types.Blob (or nil) so the outer function can return early without needing
explicit Unlock calls; update callers inside GetBlobByCommitment accordingly.
- Around line 95-153: Both NewS3Store and NewS3StoreWithStaticCredentials
duplicate the same constructor body; extract the shared logic into a helper
(e.g. buildS3Store or newS3StoreFromLoadOptions) that accepts ctx, cfg and a
variadic slice of awsconfig LoadOptions (or []func(*awsconfig.LoadOptions)
error), runs awsconfig.LoadDefaultConfig, applies the endpoint s3.Options,
constructs the s3 client via s3.NewFromConfig, computes the chunkSize default,
and returns newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize); then have
NewS3Store call that helper with no extra opts and
NewS3StoreWithStaticCredentials prepend the credentials provider opt
(credentials.NewStaticCredentialsProvider(...)) before calling the same helper,
removing the duplicated code around awsconfig.LoadDefaultConfig,
s3.NewFromConfig, chunkSize handling and the final newS3Store call.
In `@pkg/sync/backfill.go`:
- Around line 190-196: The metrics for store_header are recorded twice; change
to the same pattern used in fetch_header: start the timer (stageStart =
time.Now()), call b.store.PutHeader(ctx, hdr), immediately call
b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
before checking the error, then if err != nil call
b.metrics.IncBackfillStageErrors("store_header") and return the wrapped error;
apply the identical single-observation refactor to the store_blobs/PutBlobs
block (the code around where PutBlobs is called and where
b.metrics.IncBackfillStageErrors("store_blobs") is incremented).
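To make the first inline comment above concrete, here is a minimal sketch of the chunk-size guard it asks for, written against the newS3Store constructor shown earlier on this page. The constant name defaultChunkSize is illustrative, and the real fix may instead reuse the 64 fallback already applied in NewS3Store.

// defaultChunkSize mirrors the fallback used by the exported constructors.
const defaultChunkSize = 64

// newS3Store creates an S3Store with an injected client (for testing).
func newS3Store(client s3Client, bucket, prefix string, chunkSize uint64) *S3Store {
	if chunkSize == 0 {
		chunkSize = defaultChunkSize // keep chunkStart's division well-defined
	}
	return &S3Store{
		client:    client,
		bucket:    bucket,
		prefix:    strings.TrimSuffix(prefix, "/"),
		chunkSize: chunkSize,
		metrics:   metrics.Nop(),
		blobBuf:   make(map[blobChunkKey][]types.Blob),
		headerBuf: make(map[uint64][]*types.Header),
	}
}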
func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) {
out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, err
}
defer out.Body.Close() //nolint:errcheck
return io.ReadAll(out.Body)
io.ReadAll with no size limit — large or corrupt chunks can cause OOM.
Blob and header chunks are append-only: each flushBlobChunk / flushHeaderChunk reads the existing S3 object, appends new entries, deduplicates, and writes back a larger object. Over time (especially for popular namespaces with small chunkSize or a large number of indices within a chunk range) these objects can grow substantially. io.ReadAll with no guard will materialise the entire object into a single heap buffer.
A misconfigured chunkSize (e.g., chunkSize = 1) or a corrupt oversized object can exhaust available memory with no user-visible error short of an OOM kill.
🛡️ Proposed fix: cap with `io.LimitReader`
Define a reasonable upper bound (e.g., 256 MiB) and hard-error on objects that exceed it rather than silently allocating:
+const maxChunkBytes = 256 << 20 // 256 MiB hard cap per chunk object
func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) {
out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, err
}
defer out.Body.Close() //nolint:errcheck
- return io.ReadAll(out.Body)
+ lr := io.LimitReader(out.Body, maxChunkBytes+1)
+ data, err := io.ReadAll(lr)
+ if err != nil {
+ return nil, err
+ }
+ if int64(len(data)) > maxChunkBytes {
+ return nil, fmt.Errorf("chunk object %q exceeds size limit (%d bytes)", key, maxChunkBytes)
+ }
+ return data, nil
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/store/s3.go` around lines 729 - 738, The getObject method in S3Store
currently calls io.ReadAll(out.Body) which can OOM on very large or corrupt
objects; update S3Store.getObject to wrap out.Body with an io.LimitReader using
a defined max size constant (e.g., maxObjectRead = 256<<20) and read through
that reader, and if the read hits the limit return a clear error indicating the
object is too large instead of allocating unbounded memory; ensure you still
close out.Body and return the read bytes and propagated errors from the limited
read.