
feat: add S3-compatible storage backend #32

Merged
tac0turtle merged 2 commits into main from feat/s3-storage-backend on Feb 19, 2026

Conversation

@tac0turtle (Contributor) commented Feb 19, 2026

Add an S3Store implementation that batches multiple heights into single S3 objects (chunks) to reduce object count and request costs. Writes are buffered in memory and flushed at checkpoint boundaries (SetSyncState).

  • New S3Config with bucket, prefix, region, endpoint, chunk_size
  • Store factory in main.go (sqlite/s3 switch)
  • 13 mock-based unit tests covering round-trips, chunk boundaries, buffer read-through, idempotent writes, commitment index lookups
  • Config validation for S3-specific required fields

Also includes profiling support, backfill progress tracking, and metrics formatting fixes from the working tree.
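
For illustration, a minimal sketch of what the sqlite/s3 switch in openStore might look like. The SQLite constructor name and path field are assumptions; only NewS3Store and the Storage.Type/Storage.S3 config fields come from this PR.

package main

import (
	"context"
	"fmt"

	"github.com/evstack/apex/config"
	"github.com/evstack/apex/pkg/store"
)

// openStore picks the storage backend from config. Sketch only:
// store.NewSQLiteStore and cfg.Storage.Path are assumed names.
func openStore(ctx context.Context, cfg *config.Config) (store.Store, error) {
	switch cfg.Storage.Type {
	case "s3":
		// Buffers writes in memory and flushes at checkpoint boundaries.
		return store.NewS3Store(ctx, cfg.Storage.S3)
	case "sqlite", "":
		return store.NewSQLiteStore(cfg.Storage.Path) // assumed constructor and field
	default:
		return nil, fmt.Errorf("unsupported storage type %q", cfg.Storage.Type)
	}
}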

Overview

Summary by CodeRabbit

  • New Features

    • Added S3-compatible object storage option alongside SQLite.
    • Enabled configurable profiling (pprof) endpoint for performance inspection.
    • Backfill now reports per-stage metrics and periodic progress/ETA.
    • Hex inputs now accept and strip 0x/0X prefixes automatically.
  • Tests

    • Added tests for profiling configuration validation and comprehensive S3 storage behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai bot commented Feb 19, 2026

📝 Walkthrough


Adds pprof profiling server support, an S3 storage backend alongside SQLite, expanded storage/config validation, backfill-stage metrics and wiring, hex 0x-prefix stripping for CLI hex inputs, and refactors main startup to centralize profiling, data-source, and store initialization.

Changes

Cohort / File(s) Summary
Configuration
config/config.go, config/load.go, config/load_test.go
Adds ProfilingConfig and S3Config; adds StorageConfig.Type and Storage.S3; moves storage validation to validateStorage; validates profiling.listen_addr when enabled; adds tests for profiling validation.
CLI hex handling
pkg/types/types.go, cmd/apex/blob_cmd.go
Adds StripHexPrefix and updates namespace/commitment hex parsing in blob commands to strip optional 0x/0X prefixes before decoding.
Startup & wiring
cmd/apex/main.go
Refactors runIndexer wiring: introduces setupProfiling(), openDataSource(), openStore() to centralize initialization and integrate profiling server into lifecycle and graceful shutdown.
Profiling server
pkg/profile/server.go
New pprof HTTP Server type with NewServer, Start, and Shutdown methods to run/stop profiling endpoints.
S3 storage backend
pkg/store/s3.go, pkg/store/s3_test.go
Adds S3Store implementation with buffered writes, chunked encoding, flush pipeline, read/write APIs, sync state handling, metrics attachment, and comprehensive tests including an in-memory mock S3.
Metrics API & tests
pkg/metrics/metrics.go, pkg/metrics/metrics_test.go
Adds backfill-stage metrics to Recorder (ObserveBackfillStageDuration, IncBackfillStageErrors); implements on PromRecorder and updates tests to assert new metrics.
Backfill metrics integration
pkg/sync/backfill.go, pkg/sync/coordinator.go
Wires metrics.Recorder into Backfiller; records per-stage durations/errors, periodic progress logging, throughput and ETA reporting.
Dependencies
go.mod
Adds AWS SDK v2 components and related modules to support S3 implementation.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Run as runIndexer
  participant Profile as profile.Server
  participant StoreFactory as openStore
  participant DataSrcFactory as openDataSource
  participant Coordinator as Coordinator

  Run->>Profile: setupProfiling(cfg)
  alt profiling enabled
    Profile-->>Run: *Server
    Run->>Profile: Start()
  end
  Run->>StoreFactory: openStore(ctx, cfg)
  StoreFactory-->>Run: Store (SQLite or S3)
  Run->>DataSrcFactory: openDataSource(ctx, cfg)
  DataSrcFactory-->>Run: DataSource + ProofForwarder
  Run->>Coordinator: New Coordinator(store, datasource, metrics)
  Coordinator-->>Run: start coordination/backfill/stream
  Run->>Profile: on shutdown -> Shutdown(ctx)
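For context, a rough sketch of a pprof server with the NewServer/Start/Shutdown shape the diagram describes. This is illustrative only; the actual pkg/profile/server.go may differ, and the Start signature here is an assumption.

package profile

import (
	"context"
	"net/http"
	"net/http/pprof"
)

// Server exposes the standard net/http/pprof endpoints on a dedicated listener.
type Server struct {
	srv *http.Server
}

// NewServer registers the pprof handlers on addr.
func NewServer(addr string) *Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	return &Server{srv: &http.Server{Addr: addr, Handler: mux}}
}

// Start serves in the background and reports fatal errors on errCh.
func (s *Server) Start(errCh chan<- error) {
	go func() {
		if err := s.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			errCh <- err
		}
	}()
}

// Shutdown stops the listener, bounded by ctx.
func (s *Server) Shutdown(ctx context.Context) error {
	return s.srv.Shutdown(ctx)
}

In runIndexer this would only be started when profiling is enabled in config and shut down alongside the other components, as shown in the diagram.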

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐰 I nibbled prefixes, peeled the 0x rind,

Set up a server to profile and mind,
I hopped through S3 buckets, buffered and neat,
Counted backfill beats and metrics sweet,
A tiny rabbit cheers the code complete. 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 24.49%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The PR title 'feat: add S3-compatible storage backend' clearly summarizes the main change, introducing S3 storage support; the changeset implements S3Store, S3Config, and related wiring throughout the codebase, so the title is aligned with the primary objective.



@gemini-code-assist

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances Apex's data persistence capabilities by introducing an S3-compatible storage backend, offering a scalable alternative to the existing SQLite database. It also improves observability with new profiling endpoints and detailed backfill progress metrics, making the system more robust and easier to monitor. The changes include updates to configuration, core application logic for storage selection, and new utility functions for data handling.

Highlights

  • S3-Compatible Storage Backend: Introduced a new S3Store implementation that allows Apex to use S3-compatible object storage for blobs and headers. This backend batches multiple heights into single S3 objects (chunks) to reduce object count and request costs. Writes are buffered in memory and flushed at checkpoint boundaries (SetSyncState).
  • Configurable Storage Type: The storage backend is now configurable via the StorageConfig in config.go, allowing users to switch between the default SQLite and the new S3 backend. New configuration options for S3, including bucket, prefix, region, endpoint, and chunk size, have been added.
  • Profiling Support: Added support for pprof profiling endpoints, enabling performance analysis. A new ProfilingConfig allows enabling and configuring the profiling HTTP server.
  • Enhanced Backfill Metrics and Progress Tracking: Implemented detailed metrics for backfill operations, including duration and errors per stage (e.g., fetching headers, storing blobs). Periodic progress logging has also been added to provide better visibility into long-running backfill processes.
  • Hex String Utility: A new utility function StripHexPrefix was added to the types package to robustly handle hex-encoded strings, removing optional '0x' or '0X' prefixes, improving parsing consistency.
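
The helper itself is small; a sketch of the described behavior follows (the committed version in pkg/types/types.go may differ in details):

// StripHexPrefix removes an optional "0x" or "0X" prefix before hex decoding.
func StripHexPrefix(s string) string {
	if len(s) >= 2 && s[0] == '0' && (s[1] == 'x' || s[1] == 'X') {
		return s[2:]
	}
	return s
}

Callers such as NamespaceFromHex and the blob commands then hand the stripped string to the hex decoder.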


Changelog
  • cmd/apex/blob_cmd.go
    • Imported types package.
    • Updated hex decoding calls to use types.StripHexPrefix for improved robustness.
  • cmd/apex/main.go
    • Imported profile package.
    • Added setupProfiling function to initialize and start the pprof server.
    • Introduced openDataSource and openStore functions to abstract data source and storage initialization.
    • Modified runIndexer to use openStore for storage backend selection and setupProfiling.
    • Updated gracefulShutdown to include the profiling server.
  • config/config.go
    • Added ProfilingConfig struct for pprof settings.
    • Updated StorageConfig to include a Type field and an embedded S3Config pointer.
    • Added S3Config struct for S3-specific configuration parameters.
    • Updated DefaultConfig to include default values for StorageConfig.Type and ProfilingConfig.
  • config/load.go
    • Updated the default configuration YAML comments for storage and added a new profiling section.
    • Implemented validateStorage function to enforce S3 and SQLite configuration rules.
    • Modified the main validate function to use validateStorage and validate ProfilingConfig.
  • config/load_test.go
    • Added TestDefaultConfigProfilingDefaults to verify default profiling settings.
    • Added TestLoadProfilingEnabledRequiresListenAddr to test validation for profiling configuration.
  • go.mod
    • Added new dependencies for AWS SDK v2 (aws-sdk-go-v2, config, credentials, service/s3).
  • go.sum
    • Updated with checksums for the newly added AWS SDK v2 dependencies.
  • pkg/metrics/metrics.go
    • Extended Recorder interface with ObserveBackfillStageDuration and IncBackfillStageErrors.
    • Implemented new backfill metrics in PromRecorder to track stage durations and errors.
  • pkg/metrics/metrics_test.go
    • Added test cases to verify the registration and functionality of new backfill metrics.
  • pkg/profile/server.go
    • Added new file defining Server struct and methods to serve pprof endpoints over HTTP.
  • pkg/store/s3.go
    • Added new file implementing S3Store for S3-compatible object storage.
    • Implemented buffering and chunking logic for blobs and headers.
    • Provided methods for PutBlobs, PutHeader, PutNamespace, GetBlob, GetBlobs, GetBlobByCommitment, GetHeader, GetNamespaces, GetSyncState, SetSyncState, and Close.
    • Included helper functions for S3 object key generation, encoding/decoding, and error handling.
  • pkg/store/s3_test.go
    • Added new file containing mock-based unit tests for the S3Store implementation.
    • Tests cover basic CRUD operations, buffer read-through, chunk boundaries, idempotent puts, and namespace handling.
  • pkg/sync/backfill.go
    • Added metrics.Recorder to the Backfiller struct.
    • Integrated backfill progress tracking and logging with ETA calculation.
    • Instrumented processHeight with metrics to record duration and errors for each backfill stage.
  • pkg/sync/coordinator.go
    • Passed the metrics recorder to the Backfiller instance during initialization.
  • pkg/types/types.go
    • Added StripHexPrefix utility function to remove '0x' or '0X' from hex strings.
    • Updated NamespaceFromHex to utilize StripHexPrefix for consistent parsing.
Activity
  • The pull request introduces a new S3-compatible storage backend, profiling support, and enhanced backfill metrics.
  • The author has provided a detailed description outlining the new S3Store implementation, its buffering mechanism, configuration, and testing coverage.
  • The PR also mentions including profiling support, backfill progress tracking, and metrics formatting fixes from the working tree.
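
To make the per-stage backfill instrumentation concrete, here is a hypothetical sketch of how one stage could be wrapped. The stage name, the b.source.GetHeader call, and the method signatures are assumptions; only the Recorder method names come from this PR.

// Hypothetical wrapper around a single backfill stage; the "fetch_header"
// label and the data-source call are illustrative.
func (b *Backfiller) fetchHeaderStage(ctx context.Context, height uint64) (*types.Header, error) {
	start := time.Now()
	header, err := b.source.GetHeader(ctx, height)
	b.metrics.ObserveBackfillStageDuration("fetch_header", time.Since(start))
	if err != nil {
		b.metrics.IncBackfillStageErrors("fetch_header")
		return nil, fmt.Errorf("fetch header %d: %w", height, err)
	}
	return header, nil
}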

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (7)
pkg/store/s3.go (3)

296-317: GetBlobs performs unbounded in-memory accumulation.

For a large height range spanning many chunks, fetchBlobChunks downloads and deserializes all matching S3 objects into memory before applying offset/limit. If the caller requests a small page from a large range, this is wasteful and could OOM. Worth noting as a known limitation or adding chunk-level short-circuiting in the future.
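
One possible shape for chunk-level short-circuiting (sketch only; fetchBlobChunk is a hypothetical single-chunk helper split out of the existing fetchBlobChunks, and buffered blobs would still need to be merged in, as the prompt below notes):

// Stop pulling further chunks once offset+limit blobs survive deduplication.
target := 0
if limit > 0 {
	target = offset + limit
}
var collected []types.Blob
for cs := s.chunkStart(startHeight); cs <= s.chunkStart(endHeight); cs += s.chunkSize {
	chunk, err := s.fetchBlobChunk(ctx, ns, cs, startHeight, endHeight) // hypothetical helper
	if err != nil {
		return nil, err
	}
	collected = deduplicateBlobs(append(collected, chunk...))
	if target > 0 && len(collected) >= target {
		break // later chunks only add higher heights past the requested page
	}
}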

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 296 - 317, GetBlobs currently accumulates all
matching S3 objects in memory (via fetchBlobChunks) then applies offset/limit,
which can OOM for large ranges; modify the flow so fetchBlobChunks supports
early termination by taking offset and limit (or a targetCount = offset+limit)
and yields or returns blobs incrementally, and short-circuit once enough blobs
have been collected (respecting collectBufferedBlobs and deduplicateBlobs
interactions), so applyOffsetLimit receives at most the required window; update
S3Store.GetBlobs to pass offset/limit (or targetCount) into fetchBlobChunks and
stop fetching more S3 chunks when the required number of post-deduplication
blobs is reached.

92-150: Significant code duplication between NewS3Store and NewS3StoreWithStaticCredentials.

Both constructors share identical logic for endpoint options, chunk-size defaulting, and newS3Store wiring. Consider extracting the shared setup into a helper.

Sketch: extract shared logic
+func buildS3Client(ctx context.Context, cfg *config.S3Config, extraOpts ...func(*awsconfig.LoadOptions) error) (s3Client, uint64, error) {
+	opts := append([]func(*awsconfig.LoadOptions) error{}, extraOpts...)
+	if cfg.Region != "" {
+		opts = append(opts, awsconfig.WithRegion(cfg.Region))
+	}
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
+	if err != nil {
+		return nil, 0, fmt.Errorf("load AWS config: %w", err)
+	}
+	var s3Opts []func(*s3.Options)
+	if cfg.Endpoint != "" {
+		s3Opts = append(s3Opts, func(o *s3.Options) {
+			o.BaseEndpoint = aws.String(cfg.Endpoint)
+			o.UsePathStyle = true
+		})
+	}
+	client := s3.NewFromConfig(awsCfg, s3Opts...)
+	chunkSize := uint64(cfg.ChunkSize)
+	if chunkSize == 0 {
+		chunkSize = 64
+	}
+	return client, chunkSize, nil
+}
+
 func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
-	// ... duplicated code ...
+	client, chunkSize, err := buildS3Client(ctx, cfg)
+	if err != nil {
+		return nil, err
+	}
+	return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 92 - 150, Extract the duplicated setup in
NewS3Store and NewS3StoreWithStaticCredentials into a small helper that accepts
the loaded awsCfg and cfg and returns the constructed *S3Store (or the s3.Client
and chunkSize) so both constructors only differ in how they build the
awsconfig.LoadOptions (credentials provider vs not). Specifically, move creation
of s3Opts (cfg.Endpoint handling, BaseEndpoint/UsePathStyle), the
s3.NewFromConfig call, the chunkSize defaulting (cfg.ChunkSize -> default 64)
and the final newS3Store(...) call into a helper (referencing s3.NewFromConfig,
cfg.Endpoint, chunkSize and newS3Store) and have NewS3Store and
NewS3StoreWithStaticCredentials just build opts, call
awsconfig.LoadDefaultConfig, then call that helper with awsCfg and cfg.

380-399: Manual lock/unlock in GetBlobByCommitment is fragile — consider restructuring.

The manual Lock()/Unlock() pattern (lines 387-399) with an early unlock-and-return at line 393-394 works but is easy to break during maintenance. A small helper or extracting the locked section into its own function would be safer.

Sketch: extract locked lookup
+func (s *S3Store) findCommitInBuffer(commitHex string) *types.Blob {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, entry := range s.commitBuf {
+		if entry.commitmentHex == commitHex {
+			ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
+			if err == nil {
+				if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
+					return b
+				}
+			}
+		}
+	}
+	return nil
+}

 func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) {
 	start := time.Now()
 	defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }()

 	commitHex := hex.EncodeToString(commitment)
-	s.mu.Lock()
-	for _, entry := range s.commitBuf {
-		...
-	}
-	s.mu.Unlock()
+	if b := s.findCommitInBuffer(commitHex); b != nil {
+		return b, nil
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 380 - 399, The manual mu.Lock()/Unlock() in
S3Store.GetBlobByCommitment is fragile; extract the buffer lookup into a helper
that handles locking and returns the found blob (if any) to avoid early-return
while holding the mutex. Add a method on S3Store (e.g.
getBlobFromCommitBuf(commitHex string) (*types.Blob, bool)) that acquires s.mu,
iterates s.commitBuf, calls s.findBlobInBufferLocked(ns, entry.pointer.Height,
entry.pointer.Index) where appropriate, releases s.mu, and returns the blob and
true when found (nil,false otherwise). Then replace the manual lock/unlock block
in GetBlobByCommitment with a single call to that helper and handle the returned
blob normally.
cmd/apex/main.go (1)

216-221: Type-switch for metrics wiring is fragile.

Adding a third store backend requires remembering to update this switch. Consider defining a small optional interface (e.g., MetricsAware) and using a single type assertion instead:

♻️ Proposed refactor
-	// Wire metrics into the store.
-	switch s := db.(type) {
-	case *store.SQLiteStore:
-		s.SetMetrics(rec)
-	case *store.S3Store:
-		s.SetMetrics(rec)
-	}
+	// Wire metrics into the store.
+	if ma, ok := db.(interface{ SetMetrics(metrics.Recorder) }); ok {
+		ma.SetMetrics(rec)
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/apex/main.go` around lines 216 - 221, Create a small optional interface
named MetricsAware (with SetMetrics(*recType) or matching signature used by
existing stores) and use a single type assertion against db instead of the
switch; e.g., define interface MetricsAware { SetMetrics(...same params...) },
then replace the switch on db.(type) with if ma, ok := db.(MetricsAware); ok {
ma.SetMetrics(rec) } so any store implementing SetMetrics (e.g.,
store.SQLiteStore, store.S3Store) will be wired automatically without editing
this site when new backends are added.
config/load.go (1)

153-179: Validation function mutates config (sets default ChunkSize).

validateStorage silently assigns s.S3.ChunkSize = 64 on line 166 when the value is zero. Mixing mutation into a validate* function is surprising. Consider moving the default into DefaultConfig() or a separate applyDefaults step so validate* stays purely read-only.
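
A minimal sketch of that split, assuming the Storage/S3 field names from config.Config:

// applyDefaults fills storage defaults before validation runs; sketch only.
func applyDefaults(cfg *Config) {
	if cfg.Storage.Type == "s3" && cfg.Storage.S3 != nil && cfg.Storage.S3.ChunkSize == 0 {
		cfg.Storage.S3.ChunkSize = 64
	}
}

validateStorage would then only report errors (missing bucket, negative chunk size) without mutating the config.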

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@config/load.go` around lines 153 - 179, validateStorage currently mutates the
config by setting s.S3.ChunkSize = 64 when zero; change this by removing any
assignment from validateStorage so it only inspects StorageConfig/S3 fields and
returns errors, and instead set the default ChunkSize in the config
initialization path (e.g., in DefaultConfig() or a new applyDefaults(cfg
*Config) function that sets StorageConfig.S3.ChunkSize = 64 when zero). Keep the
existing validation checks (e.g., chunk size positivity) in validateStorage, and
ensure callers invoke the defaults-applier (DefaultConfig or applyDefaults)
before calling validateStorage.
pkg/store/s3_test.go (1)

66-555: Consider adding a concurrent-access test to exercise the sync.Mutex in S3Store.

The S3Store struct uses a sync.Mutex for buffer access, and the coding guidelines require running tests with -race. A test that concurrently calls PutBlobs/GetBlob (and maybe SetSyncState) would increase confidence in the locking correctness.
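
A minimal sketch of such a test; newTestS3Store and the namespace value are assumptions standing in for whatever helpers s3_test.go already provides:

func TestS3StoreConcurrentAccess(t *testing.T) {
	s := newTestS3Store(t) // assumed existing helper returning an *S3Store over the mock client
	ctx := context.Background()
	ns := types.Namespace{} // assumed: any valid namespace used elsewhere in the tests

	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for h := uint64(g*100 + 1); h <= uint64(g*100+50); h++ {
				blob := types.Blob{Namespace: ns, Height: h, Index: 0, Data: []byte("x")}
				if err := s.PutBlobs(ctx, []types.Blob{blob}); err != nil {
					t.Errorf("PutBlobs(%d): %v", h, err)
				}
				if _, err := s.GetBlob(ctx, ns, h, 0); err != nil {
					t.Errorf("GetBlob(%d): %v", h, err)
				}
			}
		}(g)
	}
	wg.Wait()
}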

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3_test.go` around lines 66 - 555, Add a new test (e.g.
TestS3Store_ConcurrentAccess) that concurrently exercises S3Store's mutex by
launching multiple goroutines which call PutBlobs, GetBlob (and optionally
SetSyncState) repeatedly against the same store instance returned by
newTestS3Store; use a sync.WaitGroup to coordinate, randomize or vary
heights/namespaces, and assert no errors or panics and that retrieved blobs
match expected data; run this test under the race detector (-race) to validate
the locking in methods PutBlobs, GetBlob, and SetSyncState.
config/config.go (1)

44-50: ChunkSize type differs between config and store—minor consistency concern.

S3Config.ChunkSize is int (line 49) while S3Store.chunkSize is uint64 (line 82 in pkg/store/s3.go), with explicit conversion at the boundary (pkg/store/s3.go:112). Validation in config/load.go:165-170 ensures the value is non-negative before conversion, so the type cast is safe. However, the differing types add a maintenance risk. Since YAML naturally decodes into int anyway, consider keeping both as int for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@config/config.go` around lines 44 - 50, S3Config.ChunkSize is int while
S3Store.chunkSize is uint64, causing an unnecessary type conversion; change
S3Store.chunkSize from uint64 to int (symbol: S3Store.chunkSize) and remove the
explicit cast where S3Config.ChunkSize is assigned (the conversion currently
near the S3 store initialization), and adjust any downstream uses that assumed
uint64 to work with int (or cast locally where truly needed) so types remain
consistent with the YAML-decoded int and existing validation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/store/s3.go`:
- Around line 166-168: The SetMetrics method on S3Store writes s.metrics without
synchronization while metric-recording defer closures (which read s.metrics)
access it concurrently; fix this by making access to s.metrics
thread-safe—either (a) add a sync.RWMutex on S3Store and take a write lock in
S3Store.SetMetrics and read locks in the defer closures that reference
s.metrics, or (b) replace s.metrics with an atomic.Value storing the metrics.
Update S3Store.SetMetrics to use the chosen synchronization and update all
metric reads (the defer closures that reference s.metrics) to load under the
same mechanism; alternatively, if you prefer not to change runtime behavior,
document clearly on S3Store.SetMetrics that it must be called before any other
method and make s.metrics immutable after construction.
- Around line 540-609: The flush function can run concurrently causing
read-modify-write races in flushBlobChunk/flushHeaderChunk; add a dedicated
mutex on S3Store (e.g., flushMu sync.Mutex) and acquire it at the start of
S3Store.flush (s.flushMu.Lock(); defer s.flushMu.Unlock()) to serialize all
flush() invocations so only one flush reads/merges/writes chunks at a time;
update the S3Store struct to include flushMu and ensure callers
(Close/SetSyncState) that trigger flush continue to call the same method so the
serialization takes effect.
- Around line 228-259: PutNamespace currently grabs S3Store.s.mu and holds it
across network I/O (getObject/putObject), blocking all other store ops; change
it to use a dedicated namespace mutex (e.g., add S3Store.nsMu) or no global s.mu
so only namespace operations are serialized: acquire nsMu before reading/writing
the namespaces object, release it before returning, and leave s.mu out of this
function; ensure you still handle S3 not-found, JSON unmarshal/marshal, and
return errors as before, so only the mutex protecting namespaces (used around
the read-modify-write of key "meta/namespaces.json" in PutNamespace) is changed.
- Around line 644-674: Pre-allocate the merged slice when there is no existing
chunk to avoid the prealloc linter warning: in flushHeaderChunk, before the loop
over newHeaders, set merged to a preallocated slice when existing == nil (e.g.,
make([]types.Header, 0, len(newHeaders))), and keep the current behavior of
assigning merged = decodeS3Headers(existing) when existing != nil; then append
newHeaders into merged and proceed as before (this targets function
flushHeaderChunk, variables merged, existing, newHeaders, and decodeS3Headers).

In `@pkg/sync/backfill.go`:
- Around line 51-69: The progress reporter can produce +Inf/negative ETA when
done==0 (rate==0); update the ticker select block to guard against zero
divisions by checking done==0 and elapsed.Seconds() > 0 before computing rate
and remaining (variables: processedHeights, startTime, totalHeights, rate,
remaining), and when rate <= 0 set a safe fallback (e.g., rate = 0 and eta
string = "unknown" or "-" ) so you never convert +Inf to time.Duration; then use
those safe rate/eta strings in the b.log.Info() call (keep the same log fields
like "height", "target", "progress", "rate", "eta", "backfill progress").


Comment on lines +166 to +168
func (s *S3Store) SetMetrics(m metrics.Recorder) {
	s.metrics = m
}


⚠️ Potential issue | 🟡 Minor

SetMetrics is not thread-safe.

s.metrics is read without holding the lock in all metric-recording defer closures (e.g., lines 194, 218, 265, etc.), but SetMetrics writes it without synchronization. If SetMetrics is called after the store is in use, this is a data race. Either document that SetMetrics must be called before any other method, or protect the field.
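
For instance, the RWMutex option could look roughly like this (sketch; metricsMu is a new field, and the metric-recording defer closures would call rec() instead of reading s.metrics directly):

// Sketch: guard the recorder with its own RWMutex so SetMetrics can be called
// at any time without racing readers.
func (s *S3Store) SetMetrics(m metrics.Recorder) {
	s.metricsMu.Lock()
	defer s.metricsMu.Unlock()
	s.metrics = m
}

func (s *S3Store) rec() metrics.Recorder {
	s.metricsMu.RLock()
	defer s.metricsMu.RUnlock()
	return s.metrics
}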

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 166 - 168, The SetMetrics method on S3Store
writes s.metrics without synchronization while metric-recording defer closures
(which read s.metrics) access it concurrently; fix this by making access to
s.metrics thread-safe—either (a) add a sync.RWMutex on S3Store and take a write
lock in S3Store.SetMetrics and read locks in the defer closures that reference
s.metrics, or (b) replace s.metrics with an atomic.Value storing the metrics.
Update S3Store.SetMetrics to use the chosen synchronization and update all
metric reads (the defer closures that reference s.metrics) to load under the
same mechanism; alternatively, if you prefer not to change runtime behavior,
document clearly on S3Store.SetMetrics that it must be called before any other
method and make s.metrics immutable after construction.

@gemini-code-assist bot left a comment


Code Review

The pull request introduces an S3-compatible storage backend, profiling support, backfill progress tracking, and metrics formatting fixes. The changes are well-structured, adding new configurations, store implementations, and integrating them into the existing system. The addition of StripHexPrefix in pkg/types/types.go is a good utility function. The S3 store implementation includes buffering and flushing logic, which is crucial for optimizing S3 interactions. The new profiling server and enhanced metrics for backfill stages provide valuable observability. Overall, the changes enhance the system's flexibility and monitoring capabilities.

Comment on lines +206 to +208
if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
	return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
}


high

A new validation check is added for ProfilingConfig, ensuring that profiling.listen_addr is provided if profiling is enabled. This prevents potential runtime errors if the profiling server cannot bind to an address.

Suggested change
if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
	return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
}
if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
	return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
}

Comment on lines +93 to 95
close(stopProgress)
<-progressStopped
return fmt.Errorf("checkpoint at height %d: %w", batchEnd, err)


high

Similar to the previous error handling, the progress reporting goroutine is gracefully shut down if an error occurs during checkpointing. This ensures resource cleanup.

Suggested change
close(stopProgress)
<-progressStopped
return fmt.Errorf("checkpoint at height %d: %w", batchEnd, err)
close(stopProgress)
<-progressStopped

Comment on lines +80 to 82
close(stopProgress)
<-progressStopped
return err


high

The stopProgress channel is closed and progressStopped is waited upon before returning an error. This ensures that the progress reporting goroutine is properly shut down when an error occurs during batch processing, preventing goroutine leaks.

Suggested change
close(stopProgress)
<-progressStopped
return err
close(stopProgress)
<-progressStopped

Comment on lines 1 to 861


high

This new file introduces the S3Store implementation, providing an S3-compatible object store backend for Apex. It includes buffering, chunking, and flushing logic to optimize interactions with S3, along with methods for reading and writing blobs, headers, namespaces, and sync state. This is a significant new feature for flexible storage options.
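To make the chunking scheme concrete, here is a small self-contained sketch of how a height maps to an S3 object key under this layout. Only the integer chunk arithmetic and the blobs/<namespace>/chunk_<start>.json naming are taken from the file below; the prefix, namespace value, and height are made-up placeholders.

package main

import "fmt"

// chunkStart mirrors the store's integer arithmetic: heights are grouped into
// fixed-size ranges and each range becomes one S3 object.
func chunkStart(height, chunkSize uint64) uint64 {
	return (height / chunkSize) * chunkSize
}

func main() {
	const chunkSize = 64 // the default used when chunk_size is unset

	// Hypothetical values for illustration only.
	prefix := "apex"
	namespace := "0000000000000000000000000000000000000000000000000000ab01"
	height := uint64(1234)

	start := chunkStart(height, chunkSize) // 1234/64 = 19, 19*64 = 1216
	fmt.Printf("%s/blobs/%s/chunk_%016d.json\n", prefix, namespace, start)
	// Output: apex/blobs/0000.../chunk_0000000000001216.json
}

With chunk_size 64, heights 1216 through 1279 all land in the same object, so a range read over those heights costs a single GetObject instead of one request per height.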

package store

import (
	"bytes"
	"context"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"

	"github.com/evstack/apex/config"
	"github.com/evstack/apex/pkg/metrics"
	"github.com/evstack/apex/pkg/types"
)

// s3Client abstracts the S3 operations used by S3Store, enabling mock-based testing.
type s3Client interface {
	GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	PutObject(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

// blobChunkKey identifies a blob chunk in S3 by namespace and chunk start height.
type blobChunkKey struct {
	namespace  types.Namespace
	chunkStart uint64
}

// commitEntry is a buffered commitment index write.
type commitEntry struct {
	commitmentHex string
	pointer       commitPointer
}

// commitPointer is the JSON content of a commitment index object.
type commitPointer struct {
	Namespace string `json:"namespace"`
	Height    uint64 `json:"height"`
	Index     int    `json:"index"`
}

// s3Blob is the JSON representation of a blob in an S3 chunk object.
type s3Blob struct {
	Height       uint64 `json:"height"`
	Namespace    string `json:"namespace"`
	Data         []byte `json:"data"`
	Commitment   []byte `json:"commitment"`
	ShareVersion uint32 `json:"share_version"`
	Signer       []byte `json:"signer"`
	Index        int    `json:"index"`
}

// s3Header is the JSON representation of a header in an S3 chunk object.
type s3Header struct {
	Height    uint64    `json:"height"`
	Hash      []byte    `json:"hash"`
	DataHash  []byte    `json:"data_hash"`
	Time      time.Time `json:"time"`
	RawHeader []byte    `json:"raw_header"`
}

// maxFlushConcurrency bounds parallel S3 I/O during flush.
const maxFlushConcurrency = 8

// S3Store implements Store using an S3-compatible object store.
// Writes are buffered in memory and flushed to S3 at checkpoint boundaries
// (SetSyncState calls and Close).
type S3Store struct {
	client    s3Client
	bucket    string
	prefix    string
	chunkSize uint64
	metrics   metrics.Recorder

	mu        sync.Mutex
	blobBuf   map[blobChunkKey][]types.Blob
	headerBuf map[uint64][]*types.Header
	commitBuf []commitEntry
}

// NewS3Store creates a new S3Store from the given config.
func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
	opts := []func(*awsconfig.LoadOptions) error{}
	if cfg.Region != "" {
		opts = append(opts, awsconfig.WithRegion(cfg.Region))
	}
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
	if err != nil {
		return nil, fmt.Errorf("load AWS config: %w", err)
	}

	var s3Opts []func(*s3.Options)
	if cfg.Endpoint != "" {
		s3Opts = append(s3Opts, func(o *s3.Options) {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
			o.UsePathStyle = true
		})
	}

	client := s3.NewFromConfig(awsCfg, s3Opts...)

	chunkSize := uint64(cfg.ChunkSize)
	if chunkSize == 0 {
		chunkSize = 64
	}

	return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
}

// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials.
// Useful for testing against MinIO or other S3-compatible services.
func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) {
	opts := []func(*awsconfig.LoadOptions) error{
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)),
	}
	if cfg.Region != "" {
		opts = append(opts, awsconfig.WithRegion(cfg.Region))
	}
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
	if err != nil {
		return nil, fmt.Errorf("load AWS config: %w", err)
	}

	var s3Opts []func(*s3.Options)
	if cfg.Endpoint != "" {
		s3Opts = append(s3Opts, func(o *s3.Options) {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
			o.UsePathStyle = true
		})
	}

	client := s3.NewFromConfig(awsCfg, s3Opts...)

	chunkSize := uint64(cfg.ChunkSize)
	if chunkSize == 0 {
		chunkSize = 64
	}

	return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
}

// newS3Store creates an S3Store with an injected client (for testing).
func newS3Store(client s3Client, bucket, prefix string, chunkSize uint64) *S3Store {
	return &S3Store{
		client:    client,
		bucket:    bucket,
		prefix:    strings.TrimSuffix(prefix, "/"),
		chunkSize: chunkSize,
		metrics:   metrics.Nop(),
		blobBuf:   make(map[blobChunkKey][]types.Blob),
		headerBuf: make(map[uint64][]*types.Header),
	}
}

// SetMetrics sets the metrics recorder.
func (s *S3Store) SetMetrics(m metrics.Recorder) {
	s.metrics = m
}

// chunkStart returns the chunk start height for a given height.
func (s *S3Store) chunkStart(height uint64) uint64 {
	return (height / s.chunkSize) * s.chunkSize
}

// key builds an S3 object key with the configured prefix.
func (s *S3Store) key(parts ...string) string {
	if s.prefix == "" {
		return strings.Join(parts, "/")
	}
	return s.prefix + "/" + strings.Join(parts, "/")
}

func chunkFileName(start uint64) string {
	return fmt.Sprintf("chunk_%016d.json", start)
}

// --- Write methods (buffer) ---

func (s *S3Store) PutBlobs(ctx context.Context, blobs []types.Blob) error {
	if len(blobs) == 0 {
		return nil
	}
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("PutBlobs", time.Since(start)) }()

	s.mu.Lock()
	defer s.mu.Unlock()

	for i := range blobs {
		b := &blobs[i]
		key := blobChunkKey{namespace: b.Namespace, chunkStart: s.chunkStart(b.Height)}
		s.blobBuf[key] = append(s.blobBuf[key], *b)

		s.commitBuf = append(s.commitBuf, commitEntry{
			commitmentHex: hex.EncodeToString(b.Commitment),
			pointer: commitPointer{
				Namespace: b.Namespace.String(),
				Height:    b.Height,
				Index:     b.Index,
			},
		})
	}
	return nil
}

func (s *S3Store) PutHeader(ctx context.Context, header *types.Header) error {
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("PutHeader", time.Since(start)) }()

	s.mu.Lock()
	defer s.mu.Unlock()

	cs := s.chunkStart(header.Height)
	s.headerBuf[cs] = append(s.headerBuf[cs], header)
	return nil
}

func (s *S3Store) PutNamespace(ctx context.Context, ns types.Namespace) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := s.key("meta", "namespaces.json")

	existing, err := s.getObject(ctx, key)
	if err != nil && !isNotFound(err) {
		return fmt.Errorf("get namespaces: %w", err)
	}

	namespaces := make([]string, 0, 1)
	if existing != nil {
		if err := json.Unmarshal(existing, &namespaces); err != nil {
			return fmt.Errorf("decode namespaces: %w", err)
		}
	}

	nsHex := ns.String()
	for _, n := range namespaces {
		if n == nsHex {
			return nil
		}
	}
	namespaces = append(namespaces, nsHex)

	data, err := json.Marshal(namespaces)
	if err != nil {
		return fmt.Errorf("encode namespaces: %w", err)
	}
	return s.putObject(ctx, key, data)
}

// --- Read methods ---

func (s *S3Store) GetBlob(ctx context.Context, ns types.Namespace, height uint64, index int) (*types.Blob, error) {
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("GetBlob", time.Since(start)) }()

	// Check buffer first.
	if b := s.findBlobInBuffer(ns, height, index); b != nil {
		return b, nil
	}

	cs := s.chunkStart(height)
	key := s.key("blobs", ns.String(), chunkFileName(cs))

	data, err := s.getObject(ctx, key)
	if err != nil {
		if isNotFound(err) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("get blob chunk: %w", err)
	}

	blobs, err := decodeS3Blobs(data)
	if err != nil {
		return nil, err
	}

	for i := range blobs {
		if blobs[i].Height == height && blobs[i].Index == index && blobs[i].Namespace == ns {
			return &blobs[i], nil
		}
	}
	return nil, ErrNotFound
}

func (s *S3Store) GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, limit, offset int) ([]types.Blob, error) {
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobs", time.Since(start)) }()

	buffered := s.collectBufferedBlobs(ns, startHeight, endHeight)

	s3Blobs, err := s.fetchBlobChunks(ctx, ns, startHeight, endHeight)
	if err != nil {
		return nil, err
	}

	allBlobs := deduplicateBlobs(append(buffered, s3Blobs...))

	sort.Slice(allBlobs, func(i, j int) bool {
		if allBlobs[i].Height != allBlobs[j].Height {
			return allBlobs[i].Height < allBlobs[j].Height
		}
		return allBlobs[i].Index < allBlobs[j].Index
	})

	return applyOffsetLimit(allBlobs, offset, limit), nil
}

// collectBufferedBlobs returns in-buffer blobs matching the namespace and height range.
func (s *S3Store) collectBufferedBlobs(ns types.Namespace, startHeight, endHeight uint64) []types.Blob {
	s.mu.Lock()
	defer s.mu.Unlock()

	var result []types.Blob
	for key, bufs := range s.blobBuf {
		if key.namespace != ns {
			continue
		}
		for i := range bufs {
			if bufs[i].Height >= startHeight && bufs[i].Height <= endHeight {
				result = append(result, bufs[i])
			}
		}
	}
	return result
}

// fetchBlobChunks reads all S3 blob chunks overlapping the height range for a namespace.
func (s *S3Store) fetchBlobChunks(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64) ([]types.Blob, error) {
	firstChunk := s.chunkStart(startHeight)
	lastChunk := s.chunkStart(endHeight)

	var allBlobs []types.Blob
	for cs := firstChunk; cs <= lastChunk; cs += s.chunkSize {
		key := s.key("blobs", ns.String(), chunkFileName(cs))
		data, err := s.getObject(ctx, key)
		if err != nil {
			if isNotFound(err) {
				continue
			}
			return nil, fmt.Errorf("get blob chunk at %d: %w", cs, err)
		}

		blobs, err := decodeS3Blobs(data)
		if err != nil {
			return nil, err
		}
		for i := range blobs {
			if blobs[i].Height >= startHeight && blobs[i].Height <= endHeight && blobs[i].Namespace == ns {
				allBlobs = append(allBlobs, blobs[i])
			}
		}
	}
	return allBlobs, nil
}

func applyOffsetLimit(items []types.Blob, offset, limit int) []types.Blob {
	if offset > 0 {
		if offset >= len(items) {
			return nil
		}
		items = items[offset:]
	}
	if limit > 0 && limit < len(items) {
		items = items[:limit]
	}
	return items
}

func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) {
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }()

	// Check buffer first.
	commitHex := hex.EncodeToString(commitment)

	s.mu.Lock()
	for _, entry := range s.commitBuf {
		if entry.commitmentHex == commitHex {
			ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
			if err == nil {
				if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
					s.mu.Unlock()
					return b, nil
				}
			}
		}
	}
	s.mu.Unlock()

	// Look up commitment index in S3.
	key := s.key("index", "commitments", commitHex+".json")
	data, err := s.getObject(ctx, key)
	if err != nil {
		if isNotFound(err) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("get commitment index: %w", err)
	}

	var ptr commitPointer
	if err := json.Unmarshal(data, &ptr); err != nil {
		return nil, fmt.Errorf("decode commitment pointer: %w", err)
	}

	ns, err := types.NamespaceFromHex(ptr.Namespace)
	if err != nil {
		return nil, fmt.Errorf("parse namespace from commitment index: %w", err)
	}

	return s.GetBlob(ctx, ns, ptr.Height, ptr.Index)
}

func (s *S3Store) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("GetHeader", time.Since(start)) }()

	// Check buffer first.
	if h := s.findHeaderInBuffer(height); h != nil {
		return h, nil
	}

	cs := s.chunkStart(height)
	key := s.key("headers", chunkFileName(cs))

	data, err := s.getObject(ctx, key)
	if err != nil {
		if isNotFound(err) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("get header chunk: %w", err)
	}

	headers, err := decodeS3Headers(data)
	if err != nil {
		return nil, err
	}
	for i := range headers {
		if headers[i].Height == height {
			return &headers[i], nil
		}
	}
	return nil, ErrNotFound
}

func (s *S3Store) GetNamespaces(ctx context.Context) ([]types.Namespace, error) {
	key := s.key("meta", "namespaces.json")
	data, err := s.getObject(ctx, key)
	if err != nil {
		if isNotFound(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("get namespaces: %w", err)
	}

	var hexList []string
	if err := json.Unmarshal(data, &hexList); err != nil {
		return nil, fmt.Errorf("decode namespaces: %w", err)
	}

	namespaces := make([]types.Namespace, 0, len(hexList))
	for _, h := range hexList {
		ns, err := types.NamespaceFromHex(h)
		if err != nil {
			return nil, fmt.Errorf("parse namespace %q: %w", h, err)
		}
		namespaces = append(namespaces, ns)
	}
	return namespaces, nil
}

func (s *S3Store) GetSyncState(ctx context.Context) (*types.SyncStatus, error) {
	start := time.Now()
	defer func() { s.metrics.ObserveStoreQueryDuration("GetSyncState", time.Since(start)) }()

	key := s.key("meta", "sync_state.json")
	data, err := s.getObject(ctx, key)
	if err != nil {
		if isNotFound(err) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("get sync state: %w", err)
	}

	var state struct {
		State         int    `json:"state"`
		LatestHeight  uint64 `json:"latest_height"`
		NetworkHeight uint64 `json:"network_height"`
	}
	if err := json.Unmarshal(data, &state); err != nil {
		return nil, fmt.Errorf("decode sync state: %w", err)
	}
	return &types.SyncStatus{
		State:         types.SyncState(state.State),
		LatestHeight:  state.LatestHeight,
		NetworkHeight: state.NetworkHeight,
	}, nil
}

func (s *S3Store) SetSyncState(ctx context.Context, status types.SyncStatus) error {
	// Flush buffered data first — this is the checkpoint boundary.
	if err := s.flush(ctx); err != nil {
		return fmt.Errorf("flush before sync state: %w", err)
	}

	data, err := json.Marshal(struct {
		State         int    `json:"state"`
		LatestHeight  uint64 `json:"latest_height"`
		NetworkHeight uint64 `json:"network_height"`
	}{
		State:         int(status.State),
		LatestHeight:  status.LatestHeight,
		NetworkHeight: status.NetworkHeight,
	})
	if err != nil {
		return fmt.Errorf("encode sync state: %w", err)
	}

	key := s.key("meta", "sync_state.json")
	return s.putObject(ctx, key, data)
}

func (s *S3Store) Close() error {
	return s.flush(context.Background())
}

// --- Flush ---

// flush drains the write buffer to S3. Called at checkpoint boundaries.
func (s *S3Store) flush(ctx context.Context) error {
	s.mu.Lock()
	blobBuf := s.blobBuf
	headerBuf := s.headerBuf
	commitBuf := s.commitBuf
	s.blobBuf = make(map[blobChunkKey][]types.Blob)
	s.headerBuf = make(map[uint64][]*types.Header)
	s.commitBuf = nil
	s.mu.Unlock()

	if len(blobBuf) == 0 && len(headerBuf) == 0 && len(commitBuf) == 0 {
		return nil
	}

	// Use a semaphore to bound concurrency.
	sem := make(chan struct{}, maxFlushConcurrency)
	var (
		mu   sync.Mutex
		errs []error
	)
	addErr := func(err error) {
		mu.Lock()
		errs = append(errs, err)
		mu.Unlock()
	}

	var wg sync.WaitGroup

	// Flush blob chunks.
	for key, blobs := range blobBuf {
		wg.Add(1)
		go func(key blobChunkKey, blobs []types.Blob) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			if err := s.flushBlobChunk(ctx, key, blobs); err != nil {
				addErr(err)
			}
		}(key, blobs)
	}

	// Flush header chunks.
	for cs, headers := range headerBuf {
		wg.Add(1)
		go func(cs uint64, headers []*types.Header) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			if err := s.flushHeaderChunk(ctx, cs, headers); err != nil {
				addErr(err)
			}
		}(cs, headers)
	}

	// Flush commitment indices.
	for _, entry := range commitBuf {
		wg.Add(1)
		go func(e commitEntry) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			if err := s.flushCommitIndex(ctx, e); err != nil {
				addErr(err)
			}
		}(entry)
	}

	wg.Wait()
	return errors.Join(errs...)
}

func (s *S3Store) flushBlobChunk(ctx context.Context, key blobChunkKey, newBlobs []types.Blob) error {
	objKey := s.key("blobs", key.namespace.String(), chunkFileName(key.chunkStart))

	// Read existing chunk (may 404).
	existing, err := s.getObject(ctx, objKey)
	if err != nil && !isNotFound(err) {
		return fmt.Errorf("read blob chunk for merge: %w", err)
	}

	var merged []types.Blob
	if existing != nil {
		merged, err = decodeS3Blobs(existing)
		if err != nil {
			return fmt.Errorf("decode existing blob chunk: %w", err)
		}
	}

	merged = append(merged, newBlobs...)
	merged = deduplicateBlobs(merged)
	sort.Slice(merged, func(i, j int) bool {
		if merged[i].Height != merged[j].Height {
			return merged[i].Height < merged[j].Height
		}
		return merged[i].Index < merged[j].Index
	})

	data, err := encodeS3Blobs(merged)
	if err != nil {
		return fmt.Errorf("encode blob chunk: %w", err)
	}
	return s.putObject(ctx, objKey, data)
}

func (s *S3Store) flushHeaderChunk(ctx context.Context, cs uint64, newHeaders []*types.Header) error {
	objKey := s.key("headers", chunkFileName(cs))

	existing, err := s.getObject(ctx, objKey)
	if err != nil && !isNotFound(err) {
		return fmt.Errorf("read header chunk for merge: %w", err)
	}

	var merged []types.Header
	if existing != nil {
		merged, err = decodeS3Headers(existing)
		if err != nil {
			return fmt.Errorf("decode existing header chunk: %w", err)
		}
	}

	for _, h := range newHeaders {
		merged = append(merged, *h)
	}

	merged = deduplicateHeaders(merged)
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].Height < merged[j].Height
	})

	data, err := encodeS3Headers(merged)
	if err != nil {
		return fmt.Errorf("encode header chunk: %w", err)
	}
	return s.putObject(ctx, objKey, data)
}

func (s *S3Store) flushCommitIndex(ctx context.Context, e commitEntry) error {
	key := s.key("index", "commitments", e.commitmentHex+".json")
	data, err := json.Marshal(e.pointer)
	if err != nil {
		return fmt.Errorf("encode commitment index: %w", err)
	}
	return s.putObject(ctx, key, data)
}

// --- Buffer lookup helpers ---

func (s *S3Store) findBlobInBuffer(ns types.Namespace, height uint64, index int) *types.Blob {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.findBlobInBufferLocked(ns, height, index)
}

func (s *S3Store) findBlobInBufferLocked(ns types.Namespace, height uint64, index int) *types.Blob {
	key := blobChunkKey{namespace: ns, chunkStart: s.chunkStart(height)}
	for i := range s.blobBuf[key] {
		b := &s.blobBuf[key][i]
		if b.Height == height && b.Index == index {
			cp := *b
			return &cp
		}
	}
	return nil
}

func (s *S3Store) findHeaderInBuffer(height uint64) *types.Header {
	s.mu.Lock()
	defer s.mu.Unlock()

	cs := s.chunkStart(height)
	for _, h := range s.headerBuf[cs] {
		if h.Height == height {
			cp := *h
			return &cp
		}
	}
	return nil
}

// --- S3 helpers ---

func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) {
	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, err
	}
	defer out.Body.Close() //nolint:errcheck
	return io.ReadAll(out.Body)
}

func (s *S3Store) putObject(ctx context.Context, key string, data []byte) error {
	_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(key),
		Body:        bytes.NewReader(data),
		ContentType: aws.String("application/json"),
	})
	return err
}

// isNotFound returns true if the error indicates an S3 NoSuchKey.
func isNotFound(err error) bool {
	var nsk *s3types.NoSuchKey
	if errors.As(err, &nsk) {
		return true
	}
	// Some S3-compatible services return NotFound via the HTTP status code
	// wrapped in a smithy-go OperationError rather than a typed NoSuchKey error.
	var nsb *s3types.NotFound
	return errors.As(err, &nsb)
}

// --- Encoding/decoding ---

func encodeS3Blobs(blobs []types.Blob) ([]byte, error) {
	out := make([]s3Blob, len(blobs))
	for i, b := range blobs {
		out[i] = s3Blob{
			Height:       b.Height,
			Namespace:    b.Namespace.String(),
			Data:         b.Data,
			Commitment:   b.Commitment,
			ShareVersion: b.ShareVersion,
			Signer:       b.Signer,
			Index:        b.Index,
		}
	}
	return json.Marshal(out)
}

func decodeS3Blobs(data []byte) ([]types.Blob, error) {
	var raw []s3Blob
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil, fmt.Errorf("decode blobs JSON: %w", err)
	}
	blobs := make([]types.Blob, len(raw))
	for i, r := range raw {
		ns, err := types.NamespaceFromHex(r.Namespace)
		if err != nil {
			return nil, fmt.Errorf("decode blob namespace: %w", err)
		}
		blobs[i] = types.Blob{
			Height:       r.Height,
			Namespace:    ns,
			Data:         r.Data,
			Commitment:   r.Commitment,
			ShareVersion: r.ShareVersion,
			Signer:       r.Signer,
			Index:        r.Index,
		}
	}
	return blobs, nil
}

func encodeS3Headers(headers []types.Header) ([]byte, error) {
	out := make([]s3Header, len(headers))
	for i, h := range headers {
		out[i] = s3Header{
			Height:    h.Height,
			Hash:      h.Hash,
			DataHash:  h.DataHash,
			Time:      h.Time,
			RawHeader: h.RawHeader,
		}
	}
	return json.Marshal(out)
}

func decodeS3Headers(data []byte) ([]types.Header, error) {
	var raw []s3Header
	if err := json.Unmarshal(data, &raw); err != nil {
		return nil, fmt.Errorf("decode headers JSON: %w", err)
	}
	headers := make([]types.Header, len(raw))
	for i, r := range raw {
		headers[i] = types.Header{
			Height:    r.Height,
			Hash:      r.Hash,
			DataHash:  r.DataHash,
			Time:      r.Time,
			RawHeader: r.RawHeader,
		}
	}
	return headers, nil
}

// --- Deduplication ---

func deduplicateBlobs(blobs []types.Blob) []types.Blob {
	type blobKey struct {
		height    uint64
		namespace types.Namespace
		index     int
	}
	seen := make(map[blobKey]struct{}, len(blobs))
	out := make([]types.Blob, 0, len(blobs))
	for _, b := range blobs {
		k := blobKey{height: b.Height, namespace: b.Namespace, index: b.Index}
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, b)
	}
	return out
}

func deduplicateHeaders(headers []types.Header) []types.Header {
	seen := make(map[uint64]struct{}, len(headers))
	out := make([]types.Header, 0, len(headers))
	for _, h := range headers {
		if _, ok := seen[h.Height]; ok {
			continue
		}
		seen[h.Height] = struct{}{}
		out = append(out, h)
	}
	return out
}

Comment on lines +70 to +71
r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond)
r.IncBackfillStageErrors("store_header")


medium

Calls to the new ObserveBackfillStageDuration and IncBackfillStageErrors methods are added to the TestNopRecorderDoesNotPanic test. This ensures that the nopRecorder correctly handles these new methods without panicking.

Suggested change
r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond)
r.IncBackfillStageErrors("store_header")
r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond)
r.IncBackfillStageErrors("store_header")
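For context, a no-op recorder of this shape is simply a struct whose methods do nothing, so exercising the new stage metrics on it is safe by construction. The sketch below is illustrative only: the two method names come from the snippet above, while the Recorder interface and test body are assumptions rather than the project's actual definitions.

package metricsdemo

import (
	"testing"
	"time"
)

// Recorder is a hypothetical subset of the metrics interface used here.
type Recorder interface {
	ObserveBackfillStageDuration(stage string, d time.Duration)
	IncBackfillStageErrors(stage string)
}

// nopRecorder silently discards every observation.
type nopRecorder struct{}

func (nopRecorder) ObserveBackfillStageDuration(string, time.Duration) {}
func (nopRecorder) IncBackfillStageErrors(string)                      {}

// TestNopRecorderDoesNotPanic calls each new method once; any panic fails the test.
func TestNopRecorderDoesNotPanic(t *testing.T) {
	var r Recorder = nopRecorder{}
	r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond)
	r.IncBackfillStageErrors("store_header")
}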

- Document SetMetrics must-call-before-use contract
- Use dedicated nsMu for PutNamespace to avoid holding buffer mutex
  during S3 network I/O
- Add flushMu to serialize flush() calls, preventing concurrent
  read-modify-write races on S3 chunk objects
- Pre-allocate merged slice in flushHeaderChunk (lint fix)
- Guard division by zero in backfill progress when done==0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
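As a rough sketch of the locking split these bullet points describe (the buffer mutex held only for the in-memory swap, a separate flushMu serializing whole flush cycles), assuming generic field and type names rather than Apex's actual ones:

package storedemo

import (
	"context"
	"errors"
	"sync"
)

// bufferedStore is a sketch; the field and type names are assumptions.
type bufferedStore struct {
	mu      sync.Mutex          // guards buf; held only for in-memory swaps
	flushMu sync.Mutex          // serializes entire flush cycles
	buf     map[string][][]byte // pending writes keyed by object key
}

// flush drains the buffer through put. flushMu guarantees two flushes never
// interleave their read-merge-write cycles against the same object key, while
// mu is released before any network I/O so writers are never blocked on S3.
func (s *bufferedStore) flush(ctx context.Context, put func(context.Context, string, [][]byte) error) error {
	s.flushMu.Lock()
	defer s.flushMu.Unlock()

	s.mu.Lock()
	pending := s.buf
	s.buf = make(map[string][][]byte)
	s.mu.Unlock()

	var errs []error
	for key, items := range pending {
		if err := put(ctx, key, items); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}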

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (3)
pkg/sync/backfill.go (1)

190-196: Inconsistent metrics recording pattern for store_header and store_blobs.

fetch_header (lines 183–184) records the duration unconditionally right after the call, before the error check — clean, single observation point. store_header and store_blobs instead duplicate the ObserveBackfillStageDuration call in both branches (error path and fall-through), which is correct but verbose and inconsistent:

// store_header — duration recorded twice (once per branch)
stageStart = time.Now()
if err := b.store.PutHeader(ctx, hdr); err != nil {
    b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) // ← in error branch
    b.metrics.IncBackfillStageErrors("store_header")
    return ...
}
b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) // ← in success branch

Adopt the same pattern as fetch_header to record duration once before the error check:

♻️ Proposed refactor
 	stageStart = time.Now()
-	if err := b.store.PutHeader(ctx, hdr); err != nil {
-		b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
-		b.metrics.IncBackfillStageErrors("store_header")
-		return fmt.Errorf("put header: %w", err)
-	}
-	b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
+	putHdrErr := b.store.PutHeader(ctx, hdr)
+	b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
+	if putHdrErr != nil {
+		b.metrics.IncBackfillStageErrors("store_header")
+		return fmt.Errorf("put header: %w", putHdrErr)
+	}

 	// ...

 		stageStart = time.Now()
-		if err := b.store.PutBlobs(ctx, blobs); err != nil {
-			b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart))
-			b.metrics.IncBackfillStageErrors("store_blobs")
-			return fmt.Errorf("put blobs: %w", err)
-		}
-		b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart))
+		putBlobsErr := b.store.PutBlobs(ctx, blobs)
+		b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart))
+		if putBlobsErr != nil {
+			b.metrics.IncBackfillStageErrors("store_blobs")
+			return fmt.Errorf("put blobs: %w", putBlobsErr)
+		}

Also applies to: 208-215

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/backfill.go` around lines 190 - 196, The metrics for store_header
are recorded twice; change to the same pattern used in fetch_header: start the
timer (stageStart = time.Now()), call b.store.PutHeader(ctx, hdr), immediately
call b.metrics.ObserveBackfillStageDuration("store_header",
time.Since(stageStart)) before checking the error, then if err != nil call
b.metrics.IncBackfillStageErrors("store_header") and return the wrapped error;
apply the identical single-observation refactor to the store_blobs/PutBlobs
block (the code around where PutBlobs is called and where
b.metrics.IncBackfillStageErrors("store_blobs") is incremented).
pkg/store/s3.go (2)

384-426: Refactor manual mu lock/unlock in GetBlobByCommitment to use a closure with defer.

The explicit s.mu.Unlock() before the early return (line 397) and the post-loop s.mu.Unlock() (line 403) are correct today but won't survive a panic inside the loop (no deferred release), and any future edit that adds an additional exit path from the loop body risks a lock leak.

♻️ Proposed refactor using a helper closure
 func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) {
 	start := time.Now()
 	defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }()

 	// Check buffer first.
 	commitHex := hex.EncodeToString(commitment)

-	s.mu.Lock()
-	for _, entry := range s.commitBuf {
-		if entry.commitmentHex == commitHex {
-			ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
-			if err == nil {
-				if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
-					s.mu.Unlock()
-					return b, nil
-				}
-			}
-		}
-	}
-	s.mu.Unlock()
+	if b := func() *types.Blob {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		for _, entry := range s.commitBuf {
+			if entry.commitmentHex != commitHex {
+				continue
+			}
+			ns, err := types.NamespaceFromHex(entry.pointer.Namespace)
+			if err == nil {
+				if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil {
+					return b
+				}
+			}
+		}
+		return nil
+	}(); b != nil {
+		return b, nil
+	}

 	// Look up commitment index in S3.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 384 - 426, GetBlobByCommitment manually locks
s.mu then unlocks in two places; change this to acquire the lock inside a short
closure (or anonymous func) so you can defer s.mu.Unlock() to guarantee release
on any return/panic. Specifically, wrap the loop that scans s.commitBuf and the
call to s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index)
in a closure that does s.mu.Lock(); defer s.mu.Unlock(); and returns the found
*types.Blob (or nil) so the outer function can return early without needing
explicit Unlock calls; update callers inside GetBlobByCommitment accordingly.

95-153: Extract the shared constructor body to eliminate ~50 lines of duplication.

NewS3Store and NewS3StoreWithStaticCredentials differ by exactly one opts entry. Everything else — region option, endpoint option, LoadDefaultConfig, S3 client construction, chunk-size defaulting, and newS3Store call — is copy-pasted.

♻️ Proposed refactor
-// NewS3Store creates a new S3Store from the given config.
-func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
-	opts := []func(*awsconfig.LoadOptions) error{}
-	if cfg.Region != "" {
-		opts = append(opts, awsconfig.WithRegion(cfg.Region))
-	}
-	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
-	if err != nil {
-		return nil, fmt.Errorf("load AWS config: %w", err)
-	}
-
-	var s3Opts []func(*s3.Options)
-	if cfg.Endpoint != "" {
-		s3Opts = append(s3Opts, func(o *s3.Options) {
-			o.BaseEndpoint = aws.String(cfg.Endpoint)
-			o.UsePathStyle = true
-		})
-	}
-
-	client := s3.NewFromConfig(awsCfg, s3Opts...)
-
-	chunkSize := uint64(cfg.ChunkSize)
-	if chunkSize == 0 {
-		chunkSize = 64
-	}
-
-	return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
-}
+// NewS3Store creates a new S3Store from the given config.
+func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) {
+	return buildS3Store(ctx, cfg)
+}
 
-// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials.
-// Useful for testing against MinIO or other S3-compatible services.
-func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) {
-	opts := []func(*awsconfig.LoadOptions) error{
-		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)),
-	}
-	if cfg.Region != "" {
-		opts = append(opts, awsconfig.WithRegion(cfg.Region))
-	}
-	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
-	if err != nil {
-		return nil, fmt.Errorf("load AWS config: %w", err)
-	}
-
-	var s3Opts []func(*s3.Options)
-	if cfg.Endpoint != "" {
-		s3Opts = append(s3Opts, func(o *s3.Options) {
-			o.BaseEndpoint = aws.String(cfg.Endpoint)
-			o.UsePathStyle = true
-		})
-	}
-
-	client := s3.NewFromConfig(awsCfg, s3Opts...)
-
-	chunkSize := uint64(cfg.ChunkSize)
-	if chunkSize == 0 {
-		chunkSize = 64
-	}
-
-	return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil
-}
+// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials.
+// Useful for testing against MinIO or other S3-compatible services.
+func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) {
+	return buildS3Store(ctx, cfg,
+		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)),
+	)
+}
+
+func buildS3Store(ctx context.Context, cfg *config.S3Config, extraOpts ...func(*awsconfig.LoadOptions) error) (*S3Store, error) {
+	opts := append(extraOpts, []func(*awsconfig.LoadOptions) error{}...)
+	if cfg.Region != "" {
+		opts = append(opts, awsconfig.WithRegion(cfg.Region))
+	}
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("load AWS config: %w", err)
+	}
+	var s3Opts []func(*s3.Options)
+	if cfg.Endpoint != "" {
+		s3Opts = append(s3Opts, func(o *s3.Options) {
+			o.BaseEndpoint = aws.String(cfg.Endpoint)
+			o.UsePathStyle = true
+		})
+	}
+	chunkSize := uint64(cfg.ChunkSize)
+	if chunkSize == 0 {
+		chunkSize = 64
+	}
+	return newS3Store(s3.NewFromConfig(awsCfg, s3Opts...), cfg.Bucket, cfg.Prefix, chunkSize), nil
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 95 - 153, Both NewS3Store and
NewS3StoreWithStaticCredentials duplicate the same constructor body; extract the
shared logic into a helper (e.g. buildS3Store or newS3StoreFromLoadOptions) that
accepts ctx, cfg and a variadic slice of awsconfig LoadOptions (or
[]func(*awsconfig.LoadOptions) error), runs awsconfig.LoadDefaultConfig, applies
the endpoint s3.Options, constructs the s3 client via s3.NewFromConfig, computes
the chunkSize default, and returns newS3Store(client, cfg.Bucket, cfg.Prefix,
chunkSize); then have NewS3Store call that helper with no extra opts and
NewS3StoreWithStaticCredentials prepend the credentials provider opt
(credentials.NewStaticCredentialsProvider(...)) before calling the same helper,
removing the duplicated code around awsconfig.LoadDefaultConfig,
s3.NewFromConfig, chunkSize handling and the final newS3Store call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/store/s3.go`:
- Around line 156-177: newS3Store can be constructed with chunkSize==0 which
causes a divide-by-zero in chunkStart; add a guard in newS3Store to set a
sensible default (e.g., 64) when the passed chunkSize is zero (or use an
existing DefaultChunkSize constant), so that chunkSize is always non-zero before
storing it on the S3Store struct and before chunkStart is ever called.
- Around line 545-560: The flush() function swaps buffers and releases s.mu
before performing S3 puts, causing transient ErrNotFound for concurrent reads;
fix by adding "flushing" snapshot maps (e.g., flushingBlobBuf,
flushingHeaderBuf, flushingCommitBuf) that flush() populates under flushMu
before releasing s.mu and clears after the upload goroutines complete (after
wg.Wait()), and update read-path helpers (findBlobInBufferLocked,
findHeaderInBuffer, and any GetBlobByCommitment lookup) to consult buffer ->
flushing snapshot -> S3 (holding flushMu appropriately while reading the
flushing snapshot) so readers can see in-flight data until flush finishes.
- Around line 729-738: The getObject method in S3Store currently calls
io.ReadAll(out.Body) which can OOM on very large or corrupt objects; update
S3Store.getObject to wrap out.Body with an io.LimitReader using a defined max
size constant (e.g., maxObjectRead = 256<<20) and read through that reader, and
if the read hits the limit return a clear error indicating the object is too
large instead of allocating unbounded memory; ensure you still close out.Body
and return the read bytes and propagated errors from the limited read.

In `@pkg/sync/backfill.go`:
- Around line 114-115: The final throughput calculation in backfill.go divides
by elapsed.Seconds() which can be zero and produce +Inf; modify the block that
computes elapsed := time.Since(startTime) and rate := float64(totalHeights) /
elapsed.Seconds() to guard against zero elapsed time (e.g., if elapsed.Seconds()
== 0 set rate = 0) or use a small non-zero fallback before dividing; update the
variables elapsed, rate, totalHeights and startTime in that function accordingly
so the log never prints +Inf.

---

Duplicate comments:
In `@pkg/sync/backfill.go`:
- Around line 55-60: The existing guard that checks if done == 0 inside the
ticker loop (the block that logs "backfill progress: waiting for first batch"
with b.log.Info().Uint64("target", toHeight).Msg(...)) is correct and requires
no change; keep the early continue to avoid performing the rate/remaining
arithmetic when no heights have been processed yet.

---

Nitpick comments:
In `@pkg/store/s3.go`:
- Around line 384-426: GetBlobByCommitment manually locks s.mu then unlocks in
two places; change this to acquire the lock inside a short closure (or anonymous
func) so you can defer s.mu.Unlock() to guarantee release on any return/panic.
Specifically, wrap the loop that scans s.commitBuf and the call to
s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index) in a
closure that does s.mu.Lock(); defer s.mu.Unlock(); and returns the found
*types.Blob (or nil) so the outer function can return early without needing
explicit Unlock calls; update callers inside GetBlobByCommitment accordingly.
- Around line 95-153: Both NewS3Store and NewS3StoreWithStaticCredentials
duplicate the same constructor body; extract the shared logic into a helper
(e.g. buildS3Store or newS3StoreFromLoadOptions) that accepts ctx, cfg and a
variadic slice of awsconfig LoadOptions (or []func(*awsconfig.LoadOptions)
error), runs awsconfig.LoadDefaultConfig, applies the endpoint s3.Options,
constructs the s3 client via s3.NewFromConfig, computes the chunkSize default,
and returns newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize); then have
NewS3Store call that helper with no extra opts and
NewS3StoreWithStaticCredentials prepend the credentials provider opt
(credentials.NewStaticCredentialsProvider(...)) before calling the same helper,
removing the duplicated code around awsconfig.LoadDefaultConfig,
s3.NewFromConfig, chunkSize handling and the final newS3Store call.

In `@pkg/sync/backfill.go`:
- Around line 190-196: The metrics for store_header are recorded twice; change
to the same pattern used in fetch_header: start the timer (stageStart =
time.Now()), call b.store.PutHeader(ctx, hdr), immediately call
b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart))
before checking the error, then if err != nil call
b.metrics.IncBackfillStageErrors("store_header") and return the wrapped error;
apply the identical single-observation refactor to the store_blobs/PutBlobs
block (the code around where PutBlobs is called and where
b.metrics.IncBackfillStageErrors("store_blobs") is incremented).
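To illustrate the read path the flush-race prompt above describes (live buffer, then an in-flight flushing snapshot, then S3), here is a compact sketch; every name is hypothetical, and the per-chunk keying and locking of the real store are omitted.

package readpath

import "context"

// lookupStore sketches the three-tier read-through; all names are hypothetical.
type lookupStore struct {
	buf      map[uint64][]byte // live write buffer
	flushing map[uint64][]byte // snapshot currently being uploaded
	fetch    func(ctx context.Context, height uint64) ([]byte, error) // S3 read
}

// get consults the buffer, then the in-flight snapshot, then the object store,
// so readers never observe a transient gap while a flush is in progress.
func (s *lookupStore) get(ctx context.Context, height uint64) ([]byte, error) {
	if v, ok := s.buf[height]; ok {
		return v, nil
	}
	if v, ok := s.flushing[height]; ok {
		return v, nil
	}
	return s.fetch(ctx, height)
}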

Comment on lines +729 to +738
func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) {
	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, err
	}
	defer out.Body.Close() //nolint:errcheck
	return io.ReadAll(out.Body)


⚠️ Potential issue | 🟠 Major

io.ReadAll with no size limit — large or corrupt chunks can cause OOM.

Blob and header chunks are append-only: each flushBlobChunk / flushHeaderChunk reads the existing S3 object, appends new entries, deduplicates, and writes back a larger object. Over time (especially for popular namespaces, a large chunkSize, or a large number of indices within a chunk range) these objects can grow substantially. io.ReadAll with no guard will materialise the entire object into a single heap buffer.

A misconfigured, very large chunkSize or a corrupt oversized object can exhaust available memory with no user-visible error short of an OOM kill.

🛡️ Proposed fix: cap with `io.LimitReader`

Define a reasonable upper bound (e.g., 256 MiB) and hard-error on objects that exceed it rather than silently allocating:

+const maxChunkBytes = 256 << 20 // 256 MiB hard cap per chunk object

 func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) {
 	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
 		Bucket: aws.String(s.bucket),
 		Key:    aws.String(key),
 	})
 	if err != nil {
 		return nil, err
 	}
 	defer out.Body.Close() //nolint:errcheck
-	return io.ReadAll(out.Body)
+	lr := io.LimitReader(out.Body, maxChunkBytes+1)
+	data, err := io.ReadAll(lr)
+	if err != nil {
+		return nil, err
+	}
+	if int64(len(data)) > maxChunkBytes {
+		return nil, fmt.Errorf("chunk object %q exceeds size limit (%d bytes)", key, maxChunkBytes)
+	}
+	return data, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/s3.go` around lines 729 - 738, The getObject method in S3Store
currently calls io.ReadAll(out.Body) which can OOM on very large or corrupt
objects; update S3Store.getObject to wrap out.Body with an io.LimitReader using
a defined max size constant (e.g., maxObjectRead = 256<<20) and read through
that reader, and if the read hits the limit return a clear error indicating the
object is too large instead of allocating unbounded memory; ensure you still
close out.Body and return the read bytes and propagated errors from the limited
read.

@tac0turtle tac0turtle merged commit a7e41d6 into main Feb 19, 2026
4 checks passed
@tac0turtle tac0turtle deleted the feat/s3-storage-backend branch February 19, 2026 21:06