Conversation
Add an S3Store implementation that batches multiple heights into single S3 objects (chunks) to reduce object count and request costs. Writes are buffered in memory and flushed at checkpoint boundaries (SetSyncState).
- New S3Config with bucket, prefix, region, endpoint, chunk_size
- Store factory in main.go (sqlite/s3 switch)
- 13 mock-based unit tests covering round-trips, chunk boundaries, buffer read-through, idempotent writes, commitment index lookups
- Config validation for S3-specific required fields

Also includes profiling support, backfill progress tracking, and metrics formatting fixes from the working tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
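For context on the chunking scheme, here is a minimal sketch of how heights could map to chunk objects; the chunkFor/chunkKey helpers and the key layout are assumptions for illustration, not the actual S3Store code.

```go
// Illustrative sketch only: chunkFor/chunkKey and the key layout are
// hypothetical, not the actual S3Store implementation.
package main

import "fmt"

// chunkFor returns the index of the chunk object that contains height,
// assuming chunkSize heights are packed into each S3 object.
func chunkFor(height, chunkSize uint64) uint64 {
	return height / chunkSize
}

// chunkKey builds an S3 object key for a chunk under the configured prefix.
func chunkKey(prefix string, chunk uint64) string {
	return fmt.Sprintf("%s/headers/chunk-%08d", prefix, chunk)
}

func main() {
	// With chunk_size = 100, heights 100-199 all land in chunk 1,
	// so backfilling 100 heights costs one PutObject instead of 100.
	fmt.Println(chunkKey("apex", chunkFor(150, 100))) // apex/headers/chunk-00000001
}
```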
- Document SetMetrics must-call-before-use contract
- Use dedicated nsMu for PutNamespace to avoid holding buffer mutex during S3 network I/O
- Add flushMu to serialize flush() calls, preventing concurrent read-modify-write races on S3 chunk objects
- Pre-allocate merged slice in flushHeaderChunk (lint fix)
- Guard division by zero in backfill progress when done==0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
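A rough sketch of the locking discipline these fixes describe, with the buffer swapped out under the buffer mutex and S3 I/O done only under flushMu; the struct fields and the writeChunks helper are illustrative, not the real S3Store.

```go
package store

import (
	"context"
	"sync"
)

// Sketch only: field and method names are illustrative.
type s3Store struct {
	mu      sync.Mutex        // guards the in-memory write buffer
	flushMu sync.Mutex        // serializes flush() so chunk read-modify-write never overlaps
	buffer  map[uint64][]byte // height -> encoded payload awaiting flush
}

func (s *s3Store) flush(ctx context.Context) error {
	s.flushMu.Lock()
	defer s.flushMu.Unlock()

	// Swap the buffer out while holding mu, then release it before any
	// network I/O so writers are never blocked on S3 latency.
	s.mu.Lock()
	pending := s.buffer
	s.buffer = make(map[uint64][]byte)
	s.mu.Unlock()

	return s.writeChunks(ctx, pending) // S3 calls happen outside mu
}

// writeChunks stands in for the chunked PutObject logic.
func (s *s3Store) writeChunks(ctx context.Context, pending map[uint64][]byte) error {
	_, _ = ctx, pending
	return nil
}
```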
Caution: Review failed. The pull request is closed.

📝 Walkthrough

Introduces subscriber capacity limits and enforcement in the notifier, updates notifier APIs to return errors on subscribe failures, adds HTTP server Read/Write timeouts sourced from config, expands configuration options and validation, and implements retry-with-backoff for transient RPC errors in the fetcher.
Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant gRPC as gRPC Handler
    participant Service
    participant Notifier
    Client->>gRPC: Subscribe request
    gRPC->>Service: call BlobSubscribe()/HeaderSubscribe()
    Service->>Notifier: Subscribe(namespaces)
    alt maxSubscribers exceeded
        Notifier-->>Service: error (ResourceExhausted)
        Service-->>gRPC: return error
        gRPC-->>Client: ResourceExhausted
    else success
        Notifier-->>Service: subscription
        Service-->>gRPC: subscription returned
        gRPC-->>Client: stream events
    end
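For readers who prefer code to diagrams, a minimal sketch of the capacity check the subscription path implies; the Notifier internals shown here (fields, error text) are assumptions, and only the error-on-limit behaviour follows the walkthrough.

```go
package api

import (
	"fmt"
	"sync"
)

// Sketch only: the real Notifier differs; this shows the subscriber cap check.
type Event struct{ Height uint64 }

type Subscription struct {
	id     uint64
	events chan Event
}

type Notifier struct {
	mu             sync.Mutex
	nextID         uint64
	bufferSize     int
	maxSubscribers int
	subscribers    map[uint64]*Subscription
}

func (n *Notifier) Subscribe(namespaces [][]byte) (*Subscription, error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if len(n.subscribers) >= n.maxSubscribers {
		// Surfaced to the gRPC handler, which maps it to codes.ResourceExhausted.
		return nil, fmt.Errorf("subscriber limit reached (%d)", n.maxSubscribers)
	}
	sub := &Subscription{id: n.nextID, events: make(chan Event, n.bufferSize)}
	n.nextID++
	n.subscribers[sub.id] = sub
	return sub, nil
}
```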
sequenceDiagram
    participant Fetcher
    participant RPCNode
    participant Logger
    Fetcher->>RPCNode: perform RPC call
    alt transient error (timeout/EOF/net)
        RPCNode-->>Fetcher: error
        Fetcher->>Logger: log retry attempt
        Fetcher->>RPCNode: retry with backoff (retryDelay)
        RPCNode-->>Fetcher: response or final error
    else success
        RPCNode-->>Fetcher: response
    end
    Fetcher-->>Caller: response or error
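And a sketch of the retry loop the second diagram depicts. The names callRawWithRetry, retryDelay, and defaultRPCRetryDelay appear elsewhere in this thread, but the constants, signature, and transient-error classification below are guesses for illustration, not the fetcher's actual code; the real retryDelay also adds jitter, discussed further down.

```go
package fetch

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"net"
	"time"
)

// Sketch only: constants and classification are assumptions.
const (
	maxRPCRetries        = 3
	defaultRPCRetryDelay = 500 * time.Millisecond
)

// isTransient reports whether an RPC failure looks retryable.
func isTransient(err error) bool {
	var netErr net.Error
	return errors.Is(err, io.EOF) ||
		errors.Is(err, context.DeadlineExceeded) ||
		(errors.As(err, &netErr) && netErr.Timeout())
}

// retryDelay grows the wait exponentially with the attempt number
// (jitter omitted here for brevity).
func retryDelay(attempt int) time.Duration {
	return defaultRPCRetryDelay * time.Duration(1<<attempt)
}

// callRawWithRetry retries a raw JSON-RPC call on transient errors only.
func callRawWithRetry(ctx context.Context, call func(context.Context) (json.RawMessage, error)) (json.RawMessage, error) {
	var lastErr error
	for attempt := 0; attempt <= maxRPCRetries; attempt++ {
		raw, err := call(ctx)
		if err == nil {
			return raw, nil
		}
		lastErr = err
		if !isTransient(err) {
			break // permanent error: surface immediately
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(retryDelay(attempt)):
		}
	}
	return nil, lastErr
}
```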
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly improves the performance, robustness, and configurability of the Apex indexer. Key changes include the introduction of an S3-compatible storage backend for scalable data persistence, integration of pprof for performance diagnostics, and enhanced RPC client resilience with retry mechanisms. Additionally, API subscription management is made more robust with configurable limits, and default synchronization parameters have been tuned for better indexing speed. These updates collectively aim to make Apex more efficient and reliable in various operational environments.
6bde87a to a09f24b
Code Review
This pull request introduces significant performance and scalability improvements, including an S3 storage backend, RPC call retries, and more detailed metrics and profiling. The code is well-structured, with good refactoring of the main application logic and configuration validation. I've identified a few areas for improvement, mainly concerning potential race conditions in the new S3 store when used in a multi-instance setup, and a minor weakness in the RPC retry jitter implementation.
I am having trouble creating individual review comments. Click here to see my feedback.
pkg/store/s3.go (619-650)
The flushBlobChunk function implements a read-modify-write pattern on S3 objects. This is not atomic and can lead to race conditions and data loss if multiple indexer processes are configured to write to the same S3 bucket and prefix. The flushMu mutex only prevents races within a single process.
A more robust approach for handling concurrent writes to S3 is to use conditional puts with ETags (see the sketch after this comment). The pattern would be:
1. GetObject to get the current object and its ETag.
2. Perform the merge in memory.
3. PutObject with the If-Match condition set to the ETag from step 1.
4. If the PutObject fails with a PreconditionFailed error, it means another process modified the object. Retry the entire read-modify-write cycle.

This same issue applies to flushHeaderChunk and PutNamespace.
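A hedged sketch of that read-modify-write loop; objectStore, PutIfMatch, and ErrPreconditionFailed are stand-ins rather than real SDK calls, since the exact conditional-write API depends on the S3 client and on backend support for If-Match.

```go
package store

import (
	"bytes"
	"context"
	"errors"
)

// Hypothetical stand-ins: a real implementation would use the S3 client's
// GetObject ETag and a conditional put (If-Match) where supported, and
// would cap the number of retries.
var ErrPreconditionFailed = errors.New("precondition failed")

type objectStore interface {
	Get(ctx context.Context, key string) (data []byte, etag string, err error)
	PutIfMatch(ctx context.Context, key string, data []byte, etag string) error
}

// mergeChunk repeats the read-modify-write cycle until no other writer
// changes the object between the read and the conditional write.
func mergeChunk(ctx context.Context, s objectStore, key string, update []byte,
	merge func(existing, update []byte) []byte) error {
	for {
		existing, etag, err := s.Get(ctx, key)
		if err != nil {
			return err
		}
		merged := merge(existing, update)
		if bytes.Equal(merged, existing) {
			return nil // nothing new to write
		}
		err = s.PutIfMatch(ctx, key, merged, etag)
		if errors.Is(err, ErrPreconditionFailed) {
			continue // another writer won the race; redo the whole cycle
		}
		return err
	}
}
```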
cmd/apex/main.go (216-221)
Using a type switch to set metrics on different store implementations works, but it's not easily extensible. If more store types are added in the future, this switch statement will need to be modified.
A more maintainable approach would be to define an optional interface, for example:
type MeteredStore interface {
    SetMetrics(metrics.Recorder)
}

Then you could simply do a type assertion:

if ms, ok := db.(MeteredStore); ok {
    ms.SetMetrics(rec)
}

This decouples runIndexer from the concrete store implementations.
config.yaml (9)
The example configuration contains a hardcoded IP address 100.97.172.78:36668. While this is a private IP (in the Carrier-Grade NAT range), default configuration files should ideally use localhost or a placeholder like your-celestia-node-ip:26658 to avoid confusion and prevent accidental connection attempts to internal infrastructure.
celestia_node_url: "http://localhost:26658"

pkg/fetch/celestia_node.go (301-312)
The current jitter implementation in retryDelay uses time.Now().UnixNano(). This is not a good source of randomness, especially if multiple goroutines call this function at nearly the same time, as they might get very similar jitter values, partially defeating the purpose of jitter. Using math/rand would provide better random distribution. Remember to add import "math/rand" and seed it once at application startup.
func retryDelay(attempt int) time.Duration {
    base := defaultRPCRetryDelay * time.Duration(1<<attempt)
    jitterCap := base / 2
    if jitterCap <= 0 {
        return base
    }
    // Note: math/rand should be seeded once at application startup.
    jitter := rand.Int63n(int64(jitterCap))
    return base + time.Duration(jitter)
}
/gemini review

@CodeRabbit review

✅ Actions performed: Review triggered.
Code Review
The pull request introduces several important performance and reliability enhancements, such as HTTP timeouts, increased synchronization concurrency, and a robust retry mechanism for RPC calls. These changes significantly improve the indexer's resilience under load. I have identified a redundant validation check in the configuration loading logic and a minor improvement for the jitter implementation in the retry logic.
if err := validateProfiling(&cfg.Profiling); err != nil {
    return err
}
pkg/fetch/celestia_node.go (Outdated)
n := time.Now().UnixNano()
if n < 0 {
    n = -n
}
return base + time.Duration(n%int64(jitterCap))
Using time.Now().UnixNano() % jitterCap is not a recommended way to generate jitter, as it can produce predictable patterns if calls are frequent. It is better to use a proper random number generator. Since this project targets Go 1.22+, math/rand/v2 is preferred for its improved API and performance.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
docs/running.md (1)
70-81: ⚠️ Potential issue | 🟡 Minor
Full Config Reference is missing newly added fields.
The "Full Config Reference" section doesn't document three new config fields introduced in this PR:
- rpc.read_timeout and rpc.write_timeout (defaulting to 30s in config/config.go)
- subscription.max_subscribers (defaulting to 1024 in config/config.go)

These are present in the defaultConfigYAML template in config/load.go but omitted from the user-facing docs here.

Proposed fix
 rpc:
   listen_addr: ":8080"        # JSON-RPC + health
   grpc_listen_addr: ":9090"   # gRPC
+  read_timeout: 30            # HTTP read timeout in seconds
+  write_timeout: 30           # HTTP write timeout in seconds
 sync:
   start_height: 0             # 0 = genesis
   batch_size: 256             # headers per backfill batch
   concurrency: 12             # concurrent fetch workers
 subscription:
   buffer_size: 64             # event buffer per subscriber
+  max_subscribers: 1024       # maximum concurrent subscribers

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/running.md` around lines 70 - 81: Docs missing three new config fields that exist in defaultConfigYAML; update the "Full Config Reference" in docs/running.md to include rpc.read_timeout and rpc.write_timeout (defaults from config/config.go set to 30s) and subscription.max_subscribers (default 1024), matching the entries in config/load.go's defaultConfigYAML and using the same names and default values so the user-facing docs align with the actual defaults.

config/load.go (1)
205-210: 🛠️ Refactor suggestion | 🟠 Major
Duplicate profiling validation.

The profiling.listen_addr check on lines 208–210 is already performed inside validateProfiling (lines 264–268), which is called on line 205. The inline check is now dead code.

Proposed fix

 if err := validateProfiling(&cfg.Profiling); err != nil {
 	return err
 }
-if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
-	return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
-}
 if !validLogLevels[cfg.Log.Level] {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/load.go` around lines 205 - 210: The code performs duplicate validation of profiling.listen_addr: validateProfiling(&cfg.Profiling) already enforces that Profiling.ListenAddr is set when Profiling.Enabled is true, so remove the redundant inline check that follows the call (the if block referencing cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" and its fmt.Errorf return). Keep the validateProfiling function as the single validation point for Profiling configuration (refer to validateProfiling and cfg.Profiling fields) and ensure no other duplicate checks remain in load.go.

pkg/fetch/celestia_node.go (1)
183-198: ⚠️ Potential issue | 🟡 Minor
Included cannot use callRawWithRetry due to return type mismatch with the ProofForwarder interface.

The ProofForwarder.Included signature returns (bool, error), but callRawWithRetry only wraps functions returning (json.RawMessage, error). This is a hard constraint from the interface contract. However, GetProof does return json.RawMessage and could potentially benefit from the retry pattern like GetHeader, GetBlobs, and GetNetworkHead. If proof operations intentionally skip retry logic (e.g., different semantics or latency requirements), a comment would clarify the design.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 183 - 198, The Included method currently cannot use callRawWithRetry because callRawWithRetry only wraps functions returning (json.RawMessage, error) while ProofForwarder.Included must return (bool, error); update the code by either (A) adding a thin retry helper (e.g., callBoolWithRetry) that mirrors callRawWithRetry but handles (bool, error) and use it from CelestiaNodeFetcher.Included, or (B) if proofs intentionally skip retries, add a short clarifying comment above CelestiaNodeFetcher.Included explaining why retry semantics are omitted; additionally consider applying callRawWithRetry to GetProof (which returns json.RawMessage) to reuse retry logic used by GetHeader/GetBlobs/GetNetworkHead. Ensure you reference and update the functions/methods: callRawWithRetry, CelestiaNodeFetcher.Included, CelestiaNodeFetcher.GetProof, and ProofForwarder.Included.
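One possible shape for option (A), assuming the isTransient and retryDelay helpers sketched earlier in this thread exist in the same package; the generic signature below is an assumption, not existing fetcher code.

```go
// Sketch of option (A): one generic wrapper that both callRawWithRetry and a
// bool-returning Included path could share. isTransient, retryDelay, and
// maxRPCRetries are assumed to exist as sketched earlier in this thread.
func callWithRetry[T any](ctx context.Context, call func(context.Context) (T, error)) (T, error) {
	var zero T
	var lastErr error
	for attempt := 0; attempt <= maxRPCRetries; attempt++ {
		v, err := call(ctx)
		if err == nil {
			return v, nil
		}
		lastErr = err
		if !isTransient(err) {
			break // permanent error: do not retry
		}
		select {
		case <-ctx.Done():
			return zero, ctx.Err()
		case <-time.After(retryDelay(attempt)):
		}
	}
	return zero, lastErr
}
```

Included could then delegate to callWithRetry with a closure returning (bool, error), mirroring how the raw-message paths use callRawWithRetry.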
🧹 Nitpick comments (4)
config/load_test.go (1)
10-26: Test name no longer reflects its scope; new defaults lack coverage.
TestDefaultConfigProfilingDefaults now also asserts sync defaults. Consider renaming it (e.g., TestDefaultConfigDefaults) and adding assertions for the other new fields: RPC.ReadTimeout, RPC.WriteTimeout, and Subscription.MaxSubscribers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/load_test.go` around lines 10 - 26, The test TestDefaultConfigProfilingDefaults now checks sync defaults too and its name is misleading; rename the test (e.g., TestDefaultConfigDefaults) and extend its assertions to cover the newly added defaults from DefaultConfig: verify cfg.RPC.ReadTimeout and cfg.RPC.WriteTimeout equal the expected default durations and verify cfg.Subscription.MaxSubscribers equals its expected default integer; update any error messages in t.Fatal/t.Fatalf to reference the new test name and the exact expected values to make failures clear.pkg/api/grpc/server_test.go (1)
21-33: Consider extracting shared mock types to reduce duplication across test packages.

The mockStore and mockFetcher types are nearly identical across pkg/api/service_test.go, pkg/api/jsonrpc/server_test.go, and pkg/api/grpc/server_test.go. A shared internal/testutil (or pkg/api/apitest) package with reusable mock implementations would reduce maintenance burden when the Store or DataFetcher interfaces evolve.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server_test.go` around lines 21 - 33, The mockStore and mockFetcher implementations (types mockStore, mockFetcher and constructor newMockStore / newMockFetcher) are duplicated across service_test.go, jsonrpc/server_test.go and grpc/server_test.go; extract these shared test mocks into a single test helper package (e.g. internal/testutil or pkg/api/apitest) containing the mock types, constructors and any helper methods, update each test file to import and use those mock types (replace local mockStore/mockFetcher declarations and newMockStore/newMockFetcher calls with the shared versions), and run tests to ensure imports and interfaces (Store, DataFetcher) are satisfied so future interface changes are fixed in one place.pkg/api/notifier_test.go (1)
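A sketch of what such a shared helper package might look like; the method set shown is a guess at the Store/DataFetcher surface and would need to match the real interfaces.

```go
// Package apitest is a hypothetical shared home for the test doubles; the
// GetHeader/GetBlobs methods are placeholders for whatever Store and
// DataFetcher actually require.
package apitest

import "context"

type MockStore struct {
	Headers map[uint64][]byte
}

func NewMockStore() *MockStore {
	return &MockStore{Headers: make(map[uint64][]byte)}
}

func (m *MockStore) GetHeader(ctx context.Context, height uint64) ([]byte, error) {
	return m.Headers[height], nil
}

type MockFetcher struct {
	Blobs map[uint64][][]byte
}

func NewMockFetcher() *MockFetcher {
	return &MockFetcher{Blobs: make(map[uint64][][]byte)}
}

func (m *MockFetcher) GetBlobs(ctx context.Context, height uint64) ([][]byte, error) {
	return m.Blobs[height], nil
}
```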
209-225: Good coverage of the max-subscribers limit; consider adding cleanup and using t.Fatal for the first two subscribes.

Two minor observations:
- Subscriptions 1 and 2 are never unsubscribed. While not harmful in a short-lived test, adding defer n.Unsubscribe(...) would be consistent with every other test in this file and would exercise the cleanup path.
- Using t.Errorf for the first two subscribes means the test continues even if the notifier is fundamentally broken. t.Fatalf would fail fast and make diagnostics clearer (matching the pattern in other tests).

Proposed cleanup
 func TestNotifierMaxSubscribers(t *testing.T) {
 	n := NewNotifier(1, 2, zerolog.Nop())
-	_, err1 := n.Subscribe(nil)
-	_, err2 := n.Subscribe(nil)
+	sub1, err1 := n.Subscribe(nil)
+	sub2, err2 := n.Subscribe(nil)
 	_, err3 := n.Subscribe(nil)
 	if err1 != nil {
-		t.Errorf("first subscribe failed: %v", err1)
+		t.Fatalf("first subscribe failed: %v", err1)
 	}
+	defer n.Unsubscribe(sub1)
 	if err2 != nil {
-		t.Errorf("second subscribe failed: %v", err2)
+		t.Fatalf("second subscribe failed: %v", err2)
 	}
+	defer n.Unsubscribe(sub2)
 	if err3 == nil {
 		t.Error("third subscribe should have failed")
 	}
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/notifier_test.go` around lines 209 - 225, In TestNotifierMaxSubscribers, ensure early failures abort the test and subscribers are cleaned up: after calling NewNotifier and n.Subscribe (the first two calls), use t.Fatalf (or t.Fatal) instead of t.Errorf when err1/err2 != nil so the test fails fast, and immediately defer n.Unsubscribe(...) for each successful subscription returned by Subscribe to exercise the cleanup path and match the pattern in other tests.pkg/fetch/celestia_node.go (1)
301-312: Consider using math/rand/v2 for jitter instead of UnixNano.

time.Now().UnixNano() is not random — concurrent calls in the same nanosecond get identical jitter, which defeats its purpose. For a single-fetcher scenario this is unlikely to matter, but rand.IntN (or rand.Int64N) from math/rand/v2 is trivially correct and avoids the subtlety.

Proposed fix
+import "math/rand/v2" + func retryDelay(attempt int) time.Duration { base := defaultRPCRetryDelay * time.Duration(1<<attempt) jitterCap := base / 2 if jitterCap <= 0 { return base } - n := time.Now().UnixNano() - if n < 0 { - n = -n - } - return base + time.Duration(n%int64(jitterCap)) + return base + time.Duration(rand.Int64N(int64(jitterCap))) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 301 - 312: The jitter calculation in retryDelay uses time.Now().UnixNano(), which can produce identical values across concurrent calls; replace that logic with math/rand/v2's random integer generator: compute jitter := rand.Int64N(int64(jitterCap)) and return base + time.Duration(jitter). math/rand/v2's global functions need no explicit seeding. Keep the existing base and jitterCap calculations (defaultRPCRetryDelay referenced) and handle jitterCap <= 0 as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@config/load.go`:
- Around line 67-69: The generated config template in config/load.go currently
leaves write_timeout commented out which misleads users; update the template so
write_timeout: 30 is active (uncomment the line) to match DefaultConfig() and
the behavior of Load(), ensuring the generated YAML shows both read_timeout and
write_timeout with explicit values.
---
Outside diff comments:
In `@config/load.go`:
- Around line 205-210: The code performs duplicate validation of
profiling.listen_addr: validateProfiling(&cfg.Profiling) already enforces that
Profiling.ListenAddr is set when Profiling.Enabled is true, so remove the
redundant inline check that follows the call (the if block referencing
cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" and its fmt.Errorf
return). Keep the validateProfiling function as the single validation point for
Profiling configuration (refer to validateProfiling and cfg.Profiling fields)
and ensure no other duplicate checks remain in load.go.
In `@docs/running.md`:
- Around line 70-81: Docs missing three new config fields that exist in
defaultConfigYAML; update the "Full Config Reference" in docs/running.md to
include rpc.read_timeout and rpc.write_timeout (defaults from config/config.go
set to 30s) and subscription.max_subscribers (default 1024), matching the
entries in config/load.go's defaultConfigYAML and using the same names and
default values so the user-facing docs align with the actual defaults.
In `@pkg/fetch/celestia_node.go`:
- Around line 183-198: The Included method currently cannot use callRawWithRetry
because callRawWithRetry only wraps functions returning (json.RawMessage, error)
while ProofForwarder.Included must return (bool, error); update the code by
either (A) adding a thin retry helper (e.g., callBoolWithRetry) that mirrors
callRawWithRetry but handles (bool, error) and use it from
CelestiaNodeFetcher.Included, or (B) if proofs intentionally skip retries, add a
short clarifying comment above CelestiaNodeFetcher.Included explaining why retry
semantics are omitted; additionally consider applying callRawWithRetry to
GetProof (which returns json.RawMessage) to reuse retry logic used by
GetHeader/GetBlobs/GetNetworkHead. Ensure you reference and update the
functions/methods: callRawWithRetry, CelestiaNodeFetcher.Included,
CelestiaNodeFetcher.GetProof, and ProofForwarder.Included.
---
Nitpick comments:
In `@config/load_test.go`:
- Around line 10-26: The test TestDefaultConfigProfilingDefaults now checks sync
defaults too and its name is misleading; rename the test (e.g.,
TestDefaultConfigDefaults) and extend its assertions to cover the newly added
defaults from DefaultConfig: verify cfg.RPC.ReadTimeout and cfg.RPC.WriteTimeout
equal the expected default durations and verify cfg.Subscription.MaxSubscribers
equals its expected default integer; update any error messages in
t.Fatal/t.Fatalf to reference the new test name and the exact expected values to
make failures clear.
In `@pkg/api/grpc/server_test.go`:
- Around line 21-33: The mockStore and mockFetcher implementations (types
mockStore, mockFetcher and constructor newMockStore / newMockFetcher) are
duplicated across service_test.go, jsonrpc/server_test.go and
grpc/server_test.go; extract these shared test mocks into a single test helper
package (e.g. internal/testutil or pkg/api/apitest) containing the mock types,
constructors and any helper methods, update each test file to import and use
those mock types (replace local mockStore/mockFetcher declarations and
newMockStore/newMockFetcher calls with the shared versions), and run tests to
ensure imports and interfaces (Store, DataFetcher) are satisfied so future
interface changes are fixed in one place.
In `@pkg/api/notifier_test.go`:
- Around line 209-225: In TestNotifierMaxSubscribers, ensure early failures
abort the test and subscribers are cleaned up: after calling NewNotifier and
n.Subscribe (the first two calls), use t.Fatalf (or t.Fatal) instead of t.Errorf
when err1/err2 != nil so the test fails fast, and immediately defer
n.Unsubscribe(...) for each successful subscription returned by Subscribe to
exercise the cleanup path and match the pattern in other tests.
In `@pkg/fetch/celestia_node.go`:
- Around line 301-312: The jitter calculation in retryDelay uses time.Now().UnixNano(), which can produce identical values across concurrent calls; replace that logic with math/rand/v2's random integer generator: compute jitter := rand.Int64N(int64(jitterCap)) and return base + time.Duration(jitter). math/rand/v2's global functions need no explicit seeding. Keep the existing base and jitterCap calculations (defaultRPCRetryDelay referenced) and handle jitterCap <= 0 as before.
Overview
Summary by CodeRabbit
New Features
Bug Fixes
Chores
Documentation
Tests