feat/refactor: add db backfill and migrate jsonrpc comet to cosmos sdk grpc #34
tac0turtle merged 11 commits into main
Conversation
Add an S3Store implementation that batches multiple heights into single S3 objects (chunks) to reduce object count and request costs. Writes are buffered in memory and flushed at checkpoint boundaries (SetSyncState).
- New S3Config with bucket, prefix, region, endpoint, chunk_size
- Store factory in main.go (sqlite/s3 switch)
- 13 mock-based unit tests covering round-trips, chunk boundaries, buffer read-through, idempotent writes, commitment index lookups
- Config validation for S3-specific required fields

Also includes profiling support, backfill progress tracking, and metrics formatting fixes from the working tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Document SetMetrics must-call-before-use contract
- Use dedicated nsMu for PutNamespace to avoid holding buffer mutex during S3 network I/O
- Add flushMu to serialize flush() calls, preventing concurrent read-modify-write races on S3 chunk objects
- Pre-allocate merged slice in flushHeaderChunk (lint fix)
- Guard division by zero in backfill progress when done==0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
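A rough sketch of the chunk-and-flush flow these commits describe is below; every name in it (S3Store, objectPutter, chunkKey, the SetSyncState signature) is an illustrative assumption rather than the code actually added in this PR, and the real store also handles blobs, reads, and the commitment index.

```go
package s3sketch

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
)

// S3Config mirrors the fields listed in the commit message.
type S3Config struct {
	Bucket    string
	Prefix    string
	Region    string
	Endpoint  string
	ChunkSize uint64 // heights per S3 object; assumed > 0
}

// objectPutter abstracts the S3 client so the sketch stays SDK-agnostic.
type objectPutter interface {
	PutObject(ctx context.Context, key string, body []byte) error
}

// S3Store buffers encoded headers in memory and writes whole chunks at
// checkpoint boundaries, so many heights share one S3 object.
type S3Store struct {
	cfg    S3Config
	client objectPutter

	mu  sync.Mutex        // guards buf
	buf map[uint64][]byte // height -> encoded header, not yet flushed
}

func (s *S3Store) chunkKey(height uint64) string {
	start := (height / s.cfg.ChunkSize) * s.cfg.ChunkSize
	return fmt.Sprintf("%s/headers/%d-%d.json", s.cfg.Prefix, start, start+s.cfg.ChunkSize-1)
}

// SetSyncState is the checkpoint: group buffered heights by chunk, write each
// chunk as a single object, and only then drop the flushed entries.
func (s *S3Store) SetSyncState(ctx context.Context, _ uint64) error {
	s.mu.Lock()
	pending := make(map[uint64][]byte, len(s.buf))
	for h, raw := range s.buf {
		pending[h] = raw
	}
	s.mu.Unlock()

	byChunk := map[string]map[uint64]json.RawMessage{}
	for h, raw := range pending {
		key := s.chunkKey(h)
		if byChunk[key] == nil {
			byChunk[key] = map[uint64]json.RawMessage{}
		}
		byChunk[key][h] = raw
	}
	for key, heights := range byChunk {
		body, err := json.Marshal(heights)
		if err != nil {
			return fmt.Errorf("encode chunk %s: %w", key, err)
		}
		if err := s.client.PutObject(ctx, key, body); err != nil {
			return fmt.Errorf("put chunk %s: %w", key, err)
		}
	}

	s.mu.Lock()
	for h := range pending {
		delete(s.buf, h)
	}
	s.mu.Unlock()
	return nil
}
```

Reads would consult buf first (the buffer read-through the tests mention) before fetching and decoding the containing chunk object.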
📝 Walkthrough

The PR migrates the Celestia app data fetcher from HTTP/CometBFT RPC to gRPC/Cosmos SDK endpoints, updates configuration fields and docs, introduces a pluggable backfill package (RPC and DB sources plus a concurrent Runner), adds protobuf definitions for block access, and wires backfill into coordinator/CLI wiring and tests.
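To make the walkthrough concrete, here is a deliberately simplified, self-contained sketch of the Source/Runner split it describes; the type names and signatures are assumptions, and the real Runner in pkg/backfill adds a worker pool, store/observer wiring, metrics, and progress reporting.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the real pkg/types definitions.
type Header struct{ Height uint64 }
type Blob struct{ Height uint64 }
type Namespace [29]byte

// Source is what both the RPC-backed and DB-backed backfill sources provide.
type Source interface {
	FetchHeight(ctx context.Context, height uint64, ns []Namespace) (*Header, []Blob, error)
}

// Runner walks a height range in batches; the real implementation fans each
// batch out to Concurrency workers, stores results, and reports progress.
type Runner struct {
	Source      Source
	BatchSize   int
	Concurrency int
	Observer    func(height uint64, hdr *Header, blobs []Blob)
}

func (r *Runner) Run(ctx context.Context, from, to uint64) error {
	for start := from; start <= to; start += uint64(r.BatchSize) {
		end := start + uint64(r.BatchSize) - 1
		if end > to {
			end = to
		}
		for h := start; h <= end; h++ { // sequential here; a worker pool in the real Runner
			hdr, blobs, err := r.Source.FetchHeight(ctx, h, nil)
			if err != nil {
				return fmt.Errorf("height %d: %w", h, err)
			}
			if r.Observer != nil {
				r.Observer(h, hdr, blobs)
			}
		}
	}
	return nil
}

// fixedSource plays the role of the RPC or DB source.
type fixedSource struct{}

func (fixedSource) FetchHeight(_ context.Context, h uint64, _ []Namespace) (*Header, []Blob, error) {
	return &Header{Height: h}, nil, nil
}

func main() {
	r := &Runner{
		Source:      fixedSource{},
		BatchSize:   100,
		Concurrency: 4,
		Observer:    func(h uint64, _ *Header, _ []Blob) { fmt.Println("backfilled", h) },
	}
	_ = r.Run(context.Background(), 1, 5)
}
```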
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Caller as Caller
participant Runner as backfill.Runner
participant Progress as Progress Reporter
participant Pool as Worker Pool
participant Source as backfill.Source
participant Store as Store
participant Observer as Observer
participant Metrics as Metrics
Caller->>Runner: Run(ctx, fromHeight, toHeight)
Runner->>Metrics: Initialize metrics
Runner->>Progress: Start reporter
Progress->>Progress: Periodic updates (every 10s)
Runner->>Runner: Iterate height ranges (batches)
rect rgba(100,150,255,0.5)
Note over Runner,Pool: Process Batch (concurrent workers)
Runner->>Pool: Distribute heights
par Concurrent height work
Pool->>Source: FetchHeight(height, namespaces)
Source-->>Pool: Header + Blobs
Pool->>Store: StoreHeader(header)
Store-->>Pool: OK
Pool->>Store: StoreBlobs(blobs)
Store-->>Pool: OK
Pool->>Observer: NotifyHeight(height, header, blobs)
Observer-->>Pool: OK
Pool->>Metrics: Record timings/errors
end
end
Runner->>Store: UpdateSyncStatus(latestHeight)
alt error
Runner->>Progress: Stop reporter
Runner-->>Caller: error
else success
Progress->>Progress: Stop reporter
Runner-->>Caller: done
end
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors the data ingestion and API layers by migrating the Celestia app data source from CometBFT JSON-RPC to Cosmos SDK gRPC.
Code Review
This pull request refactors the Celestia-app data source to use Cosmos SDK gRPC instead of CometBFT RPC, introducing new generated protobuf files for Tendermint queries. It updates configuration fields and documentation to reflect this change, including renaming CelestiaAppURL to CelestiaAppGRPCAddr. The CelestiaAppFetcher is rewritten to use gRPC for fetching blocks and headers, and its SubscribeHeaders method now polls for new blocks instead of using WebSockets. Additionally, the backfill process is refactored to use a new backfill.Runner component, which introduces retry logic for transient RPC errors when fetching data from Celestia nodes. The API subscription mechanism is enhanced with a MaxSubscribers limit, and HTTP RPC endpoints now support configurable ReadTimeout and WriteTimeout values. Review comments highlighted a potential security risk with bearerCreds explicitly disabling transport security, a uint64 underflow vulnerability in totalHeights and heights calculations, and a concern about out-of-order event delivery during backfill due to concurrent observer calls.
```go
	return map[string]string{"authorization": "Bearer " + b.token}, nil
}

func (b bearerCreds) RequireTransportSecurity() bool { return false }
```
The bearerCreds implementation of grpc.PerRPCCredentials explicitly disables transport security requirements by returning false from RequireTransportSecurity(). Combined with the use of insecure.NewCredentials() on line 47, this allows authentication tokens to be transmitted over plaintext connections if the indexer is configured with a remote gRPC address that does not use TLS. This exposes sensitive authentication tokens to network interception.
Consider requiring transport security by default and making it configurable only if absolutely necessary for local development.
```diff
-func (b bearerCreds) RequireTransportSecurity() bool { return false }
+func (b bearerCreds) RequireTransportSecurity() bool { return true }
```
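One possible shape for that suggestion, sketched under the assumption of a new allowInsecure switch and a requireTLS field on bearerCreds (neither exists in the PR as written):

```go
package fetchsketch

import (
	"context"
	"crypto/tls"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
)

type bearerCreds struct {
	token      string
	requireTLS bool
}

func (b bearerCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
	return map[string]string{"authorization": "Bearer " + b.token}, nil
}

// RequireTransportSecurity now reflects how the connection was actually built.
func (b bearerCreds) RequireTransportSecurity() bool { return b.requireTLS }

// dialOptions requires TLS unless the caller explicitly opts into plaintext
// (e.g. for local development against a node on localhost).
func dialOptions(authToken string, allowInsecure bool) []grpc.DialOption {
	creds := credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})
	if allowInsecure {
		creds = insecure.NewCredentials()
	}
	opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
	if authToken != "" {
		opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{
			token:      authToken,
			requireTLS: !allowInsecure,
		}))
	}
	return opts
}
```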
```go
	return fmt.Errorf("get namespaces: %w", err)
}

totalHeights := toHeight - fromHeight + 1
```
The calculation of totalHeights does not account for the case where fromHeight > toHeight, which will result in a uint64 underflow. This will cause incorrect progress reporting and potential division by zero in the progress goroutine. It is better to validate the range at the start of the method.
```diff
-totalHeights := toHeight - fromHeight + 1
+if fromHeight > toHeight {
+	return nil
+}
+totalHeights := toHeight - fromHeight + 1
```
```go
}

func (r *Runner) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
	heights := make(chan uint64, to-from+1)
```
The channel allocation make(chan uint64, to-from+1) will panic if from > to due to uint64 underflow. While the current caller logic in Run prevents this, the processBatch method should be defensive against invalid ranges to avoid runtime panics if used independently.
```diff
-	heights := make(chan uint64, to-from+1)
+	if from > to {
+		return nil
+	}
+	heights := make(chan uint64, to-from+1)
```
```go
	if r.Observer != nil {
		stageStart = time.Now()
		r.Observer(height, hdr, blobs)
```
The Observer is called concurrently from multiple worker goroutines. When this observer is wired to the notifier (as seen in cmd/apex/main.go), it will cause events to be published to API clients out-of-order during backfill. This triggers frequent 'non-contiguous event delivery' warnings in the notifier and may break clients that expect sequential block updates. Consider collecting batch results and publishing them sequentially, or disabling the observer during historical backfills.
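A standalone toy illustration of the "collect results, then publish in height order" option (not the PR's processBatch; the notifier call is represented by a print):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type result struct {
	height uint64
	note   string
}

func main() {
	heights := []uint64{7, 5, 6, 8}

	var (
		mu      sync.Mutex
		results []result
		wg      sync.WaitGroup
	)
	// Workers fetch and store concurrently, but only record what they did.
	for _, h := range heights {
		wg.Add(1)
		go func(h uint64) {
			defer wg.Done()
			r := result{height: h, note: fmt.Sprintf("stored block %d", h)}
			mu.Lock()
			results = append(results, r)
			mu.Unlock()
		}(h)
	}
	wg.Wait()

	// Publication happens once per batch, sorted by height, so subscribers
	// never observe out-of-order events.
	sort.Slice(results, func(i, j int) bool { return results[i].height < results[j].height })
	for _, r := range results {
		fmt.Println("notify:", r.height, r.note) // the Observer/notifier call would go here
	}
}
```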
Actionable comments posted: 6
🧹 Nitpick comments (10)
pkg/fetch/celestia_app_test.go (3)
237-249: TestCloseIdempotent dials a real address — works due to lazy connect but is fragile.
NewCelestiaAppFetcher("localhost:26657", ...) relies on gRPC lazy connect semantics. This is fine today, but if the constructor or dial options change to use grpc.WithBlock() in the future, this test will hang or fail. Consider using the bufconn-based newTestFetcher for consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_app_test.go` around lines 237 - 249, TestCloseIdempotent currently constructs a real dial to "localhost:26657" via NewCelestiaAppFetcher which relies on lazy gRPC connects and is fragile; change the test to use the bufconn-based helper newTestFetcher to create the fetcher so it doesn't perform a real network dial: replace the NewCelestiaAppFetcher call in TestCloseIdempotent with newTestFetcher (or the project's equivalent test helper) and keep the existing Close() idempotency assertions against the returned fetcher instance.
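For reference, a bufconn-based helper along those lines could look roughly like this; the helper name, the register callback, and returning a bare *grpc.ClientConn are assumptions, since the existing newTestFetcher may wire the fetcher differently (grpc.NewClient also requires a recent grpc-go; older versions would use grpc.DialContext):

```go
package fetch_test

import (
	"context"
	"net"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/test/bufconn"
)

// newBufconnConn serves the mock gRPC services over an in-memory listener
// instead of a real socket, so no network dial ever happens.
func newBufconnConn(t *testing.T, register func(*grpc.Server)) *grpc.ClientConn {
	t.Helper()
	lis := bufconn.Listen(1 << 20)
	srv := grpc.NewServer()
	register(srv)
	go func() { _ = srv.Serve(lis) }()
	t.Cleanup(srv.Stop)

	conn, err := grpc.NewClient("passthrough:///bufnet",
		grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
			return lis.Dial()
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatalf("dial bufconn: %v", err)
	}
	t.Cleanup(func() { _ = conn.Close() })
	return conn
}
```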
54-58: t.Logf inside a goroutine can race with test completion.
Calling t.Logf from a goroutine that outlives the test function body causes a panic. Here srv.Stop() in the cleanup (line 59) should cause Serve to return before cleanup completes, so this is likely safe in practice. However, if Stop were replaced with GracefulStop or the server takes time shutting down, this could become a problem.
A safer pattern is to log to a standalone logger or simply discard the error:
Suggested change
```diff
 go func() {
-	if err := srv.Serve(lis); err != nil {
-		t.Logf("grpc serve: %v", err)
-	}
+	_ = srv.Serve(lis)
 }()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_app_test.go` around lines 54 - 58, The test currently calls t.Logf from inside the goroutine running srv.Serve, which can race with test teardown; instead avoid calling testing.T methods from background goroutines—either replace t.Logf with a standalone logger (e.g., log.Printf) or send the error back to the main test goroutine via a channel and handle/log it there (capture the error from srv.Serve in the goroutine, send to an errCh, then read and call t.Logf in the test function). Update the goroutine around srv.Serve and any cleanup logic so only the main test goroutine uses t.Logf.
251-252: Remove the unused base64 import and dummy variable assignment.
The base64 import is unused in celestia_app.go and only appears in the test file via the dummy var _ = base64.StdEncoding assignment. The accompanying comment references "blobtx helpers" that do not exist in the codebase. Remove both the import and the dummy assignment to eliminate this maintenance burden.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_app_test.go` around lines 251 - 252, Remove the unused base64 import and the dummy assignment var _ = base64.StdEncoding from the test file; locate the reference to base64 (the import and the var _ = base64.StdEncoding line) in celestia_app_test.go or its imports, delete both the import entry and the dummy variable, and run `go test`/`go vet` to ensure nothing else depends on base64 before committing.

docs/running.md (1)
60-65: Full Config Reference also shows both ports as 9090.
Same concern here — in the full reference, both the upstream celestia_app_grpc_addr and the local grpc_listen_addr are 9090. A comment clarifying the distinction (remote source vs. local server) would help avoid confusion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/running.md` around lines 60 - 65, The docs show both celestia_app_grpc_addr and grpc_listen_addr using port 9090 which can confuse readers; update the YAML example and/or inline comments to clearly distinguish that celestia_app_grpc_addr is the remote/upstream Celestia app gRPC endpoint (the external host:port your node queries) while grpc_listen_addr is the local gRPC server bind address for this service, and ensure the full config reference mirrors that clarification (e.g., add a comment next to celestia_app_grpc_addr stating "remote/upstream app gRPC endpoint" and a comment next to grpc_listen_addr stating "local bind address").

pkg/backfill/runner.go (2)
121-174: Batch processing: consider errgroup for cleaner worker orchestration.
The manual sync.WaitGroup + sync.Mutex + firstErr pattern works correctly, but golang.org/x/sync/errgroup (with SetLimit) would eliminate the boilerplate and provide the same semantics with less surface area for bugs.
Sketch using errgroup
```go
func (r *Runner) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(r.Concurrency)
	for h := from; h <= to; h++ {
		h := h
		g.Go(func() error {
			return r.processHeight(gctx, h, namespaces)
		})
	}
	return g.Wait()
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/runner.go` around lines 121 - 174, Replace the manual worker orchestration in processBatch with an errgroup: create an errgroup with errgroup.WithContext(ctx), call g.SetLimit(r.Concurrency), loop h from from to to and capture h in the loop, then use g.Go to call r.processHeight(gctx, h, namespaces) for each height, and finally return g.Wait(); this removes the sync.WaitGroup/sync.Mutex/firstErr boilerplate while preserving cancellation and error propagation.
45-74: Progress reporter goroutine: minor log inaccuracy and edge case.
Line 64: fromHeight+done reports a height that is one past the last completed batch end (since done = batchEnd - fromHeight + 1). Consider fromHeight + done - 1 for the actual last completed height.
Line 62: rate can be +Inf if elapsed.Seconds() is near zero on the first tick. Not a crash, but remaining would be 0s (or NaN), producing a confusing log line. A guard like if elapsed < time.Second { continue } at the top would avoid this.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/runner.go` around lines 45 - 74, The progress reporter goroutine logs an off-by-one last-completed height and can compute an infinite/NaN rate when elapsed is near zero; inside the anonymous goroutine that reads processedHeights, startTime, totalHeights, fromHeight and toHeight, change the logged height from fromHeight+done to fromHeight+done-1 (but only when done>0) and add a short elapsed guard before computing rate/remaining (e.g., if elapsed < time.Second { continue } or skip rate/ETA calculation until elapsed>0) so you don't divide by a near-zero elapsed and produce +Inf/NaN in the r.Log.Info call; keep existing done==0 handling.

pkg/backfill/rpc/source_test.go (2)
12-56: Mock fetchers don't validate input arguments (height, namespaces).
Both mockFetcher and fallbackFetcher ignore the height and namespaces parameters. This means the tests can't detect if Source.FetchHeight passes the wrong height to the underlying fetcher. Consider capturing and asserting the received arguments in at least one test case.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/rpc/source_test.go` around lines 12 - 56, The mock fetchers (mockFetcher and fallbackFetcher) ignore the height and namespaces parameters so tests can't assert the arguments; update the mock implementations of GetHeader, GetBlobs, and GetHeightData to record the received height and namespaces (e.g., add fields like lastHeight and lastNamespaces or slices of calls) and return them so tests can assert that Source.FetchHeight passed the expected height/namespaces; update at least one test to check those recorded fields after calling Source.FetchHeight to validate the correct arguments were forwarded.
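A sketch of the recording-mock idea; the placeholder types stand in for pkg/types, and the GetHeightData method name follows the combined-fetcher shape discussed above but should be treated as an assumption:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Placeholder types standing in for the real pkg/types definitions.
type Header struct{ Height uint64 }
type Blob struct{ Height uint64 }
type Namespace [29]byte

// recordingFetcher remembers the arguments it was called with so a test can
// assert that Source.FetchHeight forwarded the expected height/namespaces.
type recordingFetcher struct {
	mu             sync.Mutex
	lastHeight     uint64
	lastNamespaces []Namespace
	hdr            *Header
	blobs          []Blob
	err            error
}

func (m *recordingFetcher) GetHeightData(_ context.Context, height uint64, ns []Namespace) (*Header, []Blob, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.lastHeight = height
	m.lastNamespaces = ns
	return m.hdr, m.blobs, m.err
}

func main() {
	m := &recordingFetcher{hdr: &Header{Height: 42}}
	_, _, _ = m.GetHeightData(context.Background(), 42, []Namespace{{}})
	fmt.Println("saw height", m.lastHeight, "namespaces", len(m.lastNamespaces))
}
```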
58-139: Tests should use table-driven pattern per coding guidelines.
The three separate test functions cover the main scenarios well, but the coding guidelines require table-driven tests. These cases (combined path, fallback, error propagation) are good candidates for consolidation into a single table-driven test with subtests.
Also, there's no test for the combined fetcher returning an error — only the fallback error path is exercised via fallbackFetcher.
Sketch of table-driven refactor
```go
func TestSourceFetchHeight(t *testing.T) {
	tests := []struct {
		name      string
		fetcher   fetch.DataFetcher
		height    uint64
		ns        []types.Namespace
		wantHdr   uint64
		wantBlobs int
		wantErr   error
	}{
		{
			name: "combined fetcher",
			fetcher: &mockFetcher{
				hdr:   &types.Header{Height: 10},
				blobs: []types.Blob{{Height: 10}},
			},
			height:    10,
			ns:        []types.Namespace{{}},
			wantHdr:   10,
			wantBlobs: 1,
		},
		{
			name:    "fallback mode",
			fetcher: &fallbackFetcher{hdr: &types.Header{Height: 5}},
			height:  5,
			wantHdr: 5,
		},
		{
			name:    "error propagation",
			fetcher: &fallbackFetcher{err: store.ErrNotFound},
			height:  99,
			ns:      []types.Namespace{{}},
			wantErr: store.ErrNotFound,
		},
		// Add: combined fetcher error case
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s := NewSource(tt.fetcher)
			hdr, blobs, err := s.FetchHeight(context.Background(), tt.height, tt.ns)
			if tt.wantErr != nil {
				if !errors.Is(err, tt.wantErr) {
					t.Fatalf("expected %v, got %v", tt.wantErr, err)
				}
				return
			}
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if hdr.Height != tt.wantHdr {
				t.Fatalf("header height = %d, want %d", hdr.Height, tt.wantHdr)
			}
			if len(blobs) != tt.wantBlobs {
				t.Fatalf("blob count = %d, want %d", len(blobs), tt.wantBlobs)
			}
		})
	}
}
```
As per coding guidelines: "Use table-driven tests pattern for test organization".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/rpc/source_test.go` around lines 58 - 139, Consolidate the three tests TestSourceFetchHeight_UsesCombinedFetcherWhenAvailable, TestSourceFetchHeight_FallbackMode and TestSourceFetchHeight_PropagatesErrors into a single table-driven test (e.g., TestSourceFetchHeight) that iterates cases with different fetcher implementations (mockFetcher, fallbackFetcher) and inputs passed to NewSource(...). For each table entry run a subtest that calls s.FetchHeight(ctx, height, ns) and asserts hdr height, blob count and error expectations using errors.Is; add an extra table case where the combined fetcher (mockFetcher) returns an error to verify combined-path error propagation. Ensure you keep/ reuse the existing helper types mockFetcher and fallbackFetcher and check combinedCalls/hdrCalls/blobCalls where applicable.

pkg/fetch/celestia_app.go (2)
32-41: bearerCreds type may be dead code if PerRPCCredentials dial option is removed.
If you adopt the fix for the duplicate-auth issue above, bearerCreds becomes unused. If you instead keep PerRPCCredentials and drop withAuth, then bearerCreds stays. Either way, ensure only one credential mechanism remains, and remove the other.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_app.go` around lines 32 - 41, The review notes that bearerCreds (type bearerCreds and its methods GetRequestMetadata and RequireTransportSecurity) may be dead if you remove the PerRPCCredentials dial option; resolve the duplication by choosing one credential mechanism and removing the other: either keep PerRPCCredentials (retain bearerCreds and remove/stop using withAuth) or keep withAuth (remove bearerCreds and any PerRPCCredentials usage), and update any dial/connection code that references bearerCreds, PerRPCCredentials, or withAuth to match the chosen approach so only one auth path remains.
96-98: uint64 → int64 cast may silently overflow for heights > math.MaxInt64.
Height: int64(height) wraps to a negative value when height exceeds math.MaxInt64. While unlikely in practice today, a defensive guard prevents a confusing negative-height request reaching the server.
Proposed fix
```diff
+	if height > uint64(math.MaxInt64) {
+		return nil, nil, fmt.Errorf("height %d overflows int64", height)
+	}
 	resp, err := f.client.GetBlockByHeight(f.withAuth(ctx), &cometpb.GetBlockByHeightRequest{
 		Height: int64(height),
 	})
```
(Also add "math" to imports.)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_app.go` around lines 96 - 98, The code casts a uint64 height to int64 in the GetBlockByHeightRequest (f.client.GetBlockByHeight with GetBlockByHeightRequest{Height: int64(height)}) which will overflow for heights > math.MaxInt64; modify the call site to guard against overflow by checking if height > math.MaxInt64 and return an error (or handle appropriately) before casting, then perform the safe cast to int64 only when within range; also add "math" to the imports so you can compare against math.MaxInt64.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@proto/cosmos/base/tendermint/v1beta1/query.proto`:
- Around line 37-43: The Header protobuf message currently defines chain_id as bytes which produces []byte in generated Go; change the field type in the message Header by replacing "bytes chain_id = 3;" with "string chain_id = 3;" so it matches CometBFT's types.proto and yields a Go string; update any generated artifacts (regenerate protobuf bindings) and adjust any call sites that assumed []byte to use string (references: message Header, field chain_id).
```yaml
celestia_app_grpc_addr: "localhost:9090"
namespaces:
  - "0000000000000000000000000000000000000000000000000000deadbeef"
```
Port conflict in the "app" example: both Apex gRPC and celestia-app gRPC default to 9090.
In the Minimal Config (Consensus Node) example, celestia_app_grpc_addr defaults to localhost:9090, which is the same as grpc_listen_addr: ":9090" (line 72). When running Apex and celestia-app on the same machine, the Apex gRPC server will fail to bind. Consider noting this in the example or using a different default for one of the two.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/running.md` around lines 51 - 54, The example config uses the same port
for Apex gRPC and celestia-app gRPC which causes a bind conflict; update the
Minimal Config (Consensus Node) example so the celestia_app_grpc_addr value uses
a different port (e.g., change celestia_app_grpc_addr from "localhost:9090" to
"localhost:9091") or alternatively change grpc_listen_addr, and add a short
comment noting that grpc_listen_addr and celestia_app_grpc_addr must not share
the same port when running Apex and celestia-app on the same host; ensure you
update the example entry for celestia_app_grpc_addr and the accompanying comment
text.
```go
	for batchStart := fromHeight; batchStart <= toHeight; batchStart += uint64(r.BatchSize) {
		batchStartTime := time.Now()
		batchEnd := batchStart + uint64(r.BatchSize) - 1
```
Infinite loop when BatchSize is 0.
If r.BatchSize is zero (e.g., from a zero-value Runner or misconfiguration), uint64(r.BatchSize) is 0, so batchStart never advances and this loop runs forever. A negative BatchSize converts to a huge uint64, causing the loop to exit after one oversized batch.
Proposed fix: validate at the top of Run
```diff
 func (r *Runner) Run(ctx context.Context, fromHeight, toHeight uint64) error {
+	if r.BatchSize <= 0 {
+		return fmt.Errorf("batch size must be positive, got %d", r.BatchSize)
+	}
+	if r.Concurrency <= 0 {
+		return fmt.Errorf("concurrency must be positive, got %d", r.Concurrency)
+	}
+	if fromHeight > toHeight {
+		return nil // nothing to do
+	}
 	if r.Metrics == nil {
 		r.Metrics = metrics.Nop()
 	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/backfill/runner.go` around lines 76 - 78, The loop in Runner.Run uses
uint64(r.BatchSize) and will never advance if r.BatchSize == 0; validate
r.BatchSize at the start of Run (e.g., in func (r *Runner) Run(...)) and return
an error (or set a sane default) if BatchSize <= 0 to prevent an infinite loop;
reference r.BatchSize and the for loop that uses uint64(r.BatchSize) and ensure
the validation occurs before any use of fromHeight/toHeight batching.
```go
	workers := r.Concurrency
	if int(to-from+1) < workers {
		workers = int(to - from + 1)
	}
```
Zero workers if Concurrency ≤ 0 silently skips the entire batch.
When workers is 0, for range workers (line 139) spawns no goroutines, wg.Wait() returns immediately, firstErr remains nil, and the batch appears successful despite processing zero heights.
This is addressed by the input validation proposed above.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/backfill/runner.go` around lines 128 - 131, The code computes workers
from r.Concurrency which can be ≤ 0, causing no goroutines to run and the batch
to be silently skipped; update the code that computes workers (the r.Concurrency
→ workers logic used before the goroutine loop and wg.Wait()) to validate
r.Concurrency and either return an error when r.Concurrency <= 0 or default
workers to 1 (choose project convention), then recompute the min with the range
size (to - from + 1) so the subsequent for-range worker spawn actually runs;
ensure the check references r.Concurrency, the local workers variable, and the
goroutine/WaitGroup section (where wg.Wait() and firstErr are used).
```go
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	if authToken != "" {
		opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken}))
	}
```
Duplicate bearer token: PerRPCCredentials and manual metadata both send authorization.
When authToken is non-empty, grpc.WithPerRPCCredentials(bearerCreds{...}) (line 50) already attaches "authorization": "Bearer <token>" to every outgoing RPC. Then withAuth (line 214) appends the same key again via grpcmd.AppendToOutgoingContext. This results in the authorization metadata key carrying two identical values per call, which some servers may reject or misinterpret.
Pick one mechanism — either PerRPCCredentials or manual metadata — not both.
Proposed fix: remove PerRPCCredentials and rely on withAuth only
```diff
 opts := []grpc.DialOption{
 	grpc.WithTransportCredentials(insecure.NewCredentials()),
 }
-if authToken != "" {
-	opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken}))
-}
```
Also applies to: 210-215
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/fetch/celestia_app.go` around lines 46 - 51, The code currently attaches
the same Authorization header twice: via
grpc.WithPerRPCCredentials(bearerCreds{token: authToken}) and again in withAuth
(which calls grpcmd.AppendToOutgoingContext); remove the Per-RPC credentials
usage and only set the token in withAuth. Concretely, delete the branch that
appends grpc.WithPerRPCCredentials(...) to opts (the bearerCreds usage) so opts
only contains transport credentials, ensure withAuth continues to append the
"authorization: Bearer <token>" metadata, and remove the bearerCreds
type/implementation if it becomes unused.
```go
	hdr := block.Header
	t := hdr.Time.AsTime()
```
Nil-pointer dereference if hdr.Time is nil.
hdr.Time.AsTime() will panic if block.Header.Time is nil (e.g., a pruned/corrupt response). The nil checks on lines 219-224 guard block, block.Header, and blockID, but not hdr.Time.
Proposed fix
```diff
 hdr := block.Header
+if hdr.Time == nil {
+	return nil, fmt.Errorf("nil timestamp in header at height %d", hdr.Height)
+}
 t := hdr.Time.AsTime()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/fetch/celestia_app.go` around lines 226 - 228, The code calls
hdr.Time.AsTime() without checking hdr.Time for nil which can panic; update the
handling around hdr := block.Header to check if hdr.Time == nil before calling
AsTime(), and handle that case (e.g., skip/return an error or use time.Time{}),
modifying the block-processing logic where hdr and hdr.Time.AsTime() are used so
you don't dereference a nil pointer (look for the hdr variable and the AsTime()
call to locate the change).
Actionable comments posted: 2
🧹 Nitpick comments (4)
pkg/backfill/db/source_test.go (1)
29-29: Unnecessary loop variable capture in Go 1.25.
Since this project uses Go 1.25 (per go.mod), the loop variable tc is already scoped per iteration. The tc := tc re-capture is harmless but redundant.
```diff
 for _, tc := range cases {
-	tc := tc
 	t.Run(tc.name, func(t *testing.T) {
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/db/source_test.go` at line 29, Remove the redundant re-capture statement "tc := tc" inside the table-driven test loop in source_test.go: locate the loop that iterates over test cases using the variable named tc and delete the "tc := tc" line so the per-iteration tc binding provided by Go 1.25 is used directly; then run the tests (and go vet/static checks) to confirm nothing else relies on that extra capture.

pkg/backfill/db/source.go (2)
556-589: Test-only helpers are compiled into the production binary.
writableKV, writablePebble, writableLevel, openWritable, and all the encode*/writeTestBlock helpers (lines 556–708) are in the non-test file source.go. They ship with the production binary and increase its size without adding runtime value.
Move these to a source_testhelpers_test.go file (or similar _test.go file) so they're excluded from production builds.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/db/source.go` around lines 556 - 589, The helpers used only in tests (writableKV, writablePebble, writableLevel, openWritable, and the encode* / writeTestBlock helpers referenced in the diff) are defined in a non-test file and are being compiled into production; move all of these symbols into a new _test.go file (e.g., source_testhelpers_test.go) so they are only built for tests, update any imports or package-level visibility as needed, and remove their definitions from source.go; ensure function/type names (writableKV, writablePebble.put/close, writableLevel.put/close, openWritable, and all encode* / writeTestBlock helpers) remain unchanged so tests continue to compile.
529-553: Auto-detection silently swallows open errors — consider logging the pebble error before falling through to leveldb.
In "auto" mode, if pebble fails to open a directory, the error is discarded. If the directory is actually a pebble DB with a corrupt manifest (rather than a leveldb), auto-detection would fall through to leveldb and either fail with an unhelpful error or (worse) open it silently.
Consider logging the pebble error at debug level before trying leveldb, to aid troubleshooting.
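The try-log-fall-back pattern being suggested, shown in isolation with hypothetical openPebble/openLevelDB stand-ins for pebble.Open and leveldb.OpenFile:

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// openAuto tries the first backend, logs its error at debug level, and only
// then falls back, so a corrupt pebble manifest is not silently misread as
// "this must be a leveldb directory".
func openAuto(path string, openPebble, openLevelDB func(string) (string, error)) (string, error) {
	db, errPebble := openPebble(path)
	if errPebble == nil {
		return db, nil
	}
	slog.Debug("pebble open failed, trying leveldb", "path", path, "err", errPebble)
	db, errLevel := openLevelDB(path)
	if errLevel == nil {
		return db, nil
	}
	return "", fmt.Errorf("open %s: pebble: %v; leveldb: %w", path, errPebble, errLevel)
}

func main() {
	fail := func(string) (string, error) { return "", errors.New("corrupt manifest") }
	ok := func(string) (string, error) { return "leveldb-handle", nil }
	db, err := openAuto("/tmp/data", fail, ok)
	fmt.Println(db, err)
}
```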
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/backfill/db/source.go` around lines 529 - 553, The openKV function's "auto" case swallows the pebble.Open error; capture the first error (e.g., errPebble) when calling pebble.Open(path, ...) inside the "auto" branch and emit a debug-level log with the path and errPebble before attempting leveldb.OpenFile; keep the subsequent leveldb attempt and final error behavior unchanged so you only add a debug log for pebble.Open failures (referencing openKV, the "auto" case, and pebble.Open).

config/load.go (1)
154-185: Dual defaulting of BackfillSource, CelestiaAppDBBackend, CelestiaAppDBLayout.
Lines 158–166 re-apply the same defaults already established in DefaultConfig() (config.go lines 101–103). Since Load() decodes YAML on top of DefaultConfig(), these fields are already "rpc"/"auto"/"auto" unless explicitly overridden. The validator defaults are only reachable if someone constructs a Config bypassing DefaultConfig() or explicitly writes an empty string in YAML.
This is harmless as a safety net, but consider consolidating defaults in one place (either DefaultConfig() or validate()) to avoid future drift.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/load.go` around lines 154 - 185, The validator in Load()/validate() re-applies defaults for BackfillSource, CelestiaAppDBBackend, and CelestiaAppDBLayout that DefaultConfig() already sets, creating duplicate defaulting logic; consolidate defaults into a single place by removing the backup default assignments in the "app" case (the blocks that set ds.BackfillSource = "rpc", ds.CelestiaAppDBBackend = "auto", ds.CelestiaAppDBLayout = "auto") and rely on DefaultConfig() to supply them, or alternatively move those defaults into validate()/Load() and remove them from DefaultConfig(); update and run tests to ensure behavior is unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@proto/cosmos/base/tendermint/v1beta1/query.proto`:
- Around line 37-43: The Header protobuf message field numbers are misaligned with CometBFT's types.proto causing corrupted runtime deserialization; update the message Header to use the exact tag numbers from CometBFT (chain_id=2, height=3, time=4, data_hash=7, and include consensus_hash=10 if present), then regenerate Go bindings (buf) so CelestiaAppFetcher and mapBlockResponse() consume correct fields; also update/restore celestia_app_test.go mocks to construct cometpb.Header via the regenerated types (not the old tags) and re-run tests.
```diff
-func (f *CelestiaAppFetcher) parseWSBlockEvent(msg []byte) (*types.Header, error) {
-	var event cometWSEvent
-	if err := json.Unmarshal(msg, &event); err != nil {
-		return nil, fmt.Errorf("unmarshal ws event: %w", err)
-	}
-
-	// Skip non-result messages (e.g. subscription confirmation has empty result).
-	if len(event.Result.Data.Value) == 0 {
-		return nil, nil //nolint:nilnil
-	}
-
-	var blockValue cometBlockEventValue
-	if err := json.Unmarshal(event.Result.Data.Value, &blockValue); err != nil {
-		return nil, fmt.Errorf("unmarshal block event value: %w", err)
-	}
-
-	hdr := blockValue.Block.Header
-	t, err := time.Parse(time.RFC3339Nano, hdr.Time)
-	if err != nil {
-		return nil, fmt.Errorf("parse header time: %w", err)
-	}
-
-	raw, err := json.Marshal(blockValue)
+	raw, err := json.Marshal(block)
 	if err != nil {
 		return nil, fmt.Errorf("marshal raw header: %w", err)
 	}
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check RawHeader definition and usage
rg -n 'RawHeader' --type=go -B2 -A2

Repository: evstack/apex
Length of output: 8652
🏁 Script executed:
#!/bin/bash
# Also examine the file context to understand the struct definition
cat -n pkg/fetch/celestia_app.go | head -250 | tail -100

Repository: evstack/apex
Length of output: 2922
🏁 Script executed:
#!/bin/bash
# Check for Block struct definition to understand what's being marshaled
rg -n 'type Block struct' --type=go -A10

Repository: evstack/apex
Length of output: 1231
RawHeader field name and error message suggest header-only serialization, but json.Marshal(block) actually serializes the entire block including transactions.
The field is named RawHeader and the error message says "marshal raw header", both indicating the intent to store only header metadata. However, the implementation serializes the full cometpb.Block (Header + Data.Txs). Since RawHeader is exposed via APIs, stored in the database, and sent to clients, this unnecessarily includes all transaction data. Either marshal only the header (block.Header instead of block), or rename the field to accurately reflect that it contains the entire block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/fetch/celestia_app.go` around lines 215 - 219, The code assigns
json.Marshal(block) to the RawHeader field and logs "marshal raw header", but
that serializes the entire cometpb.Block (including Data.Txs) instead of only
the header; update the serialization to json.Marshal(block.Header) so RawHeader
truly contains only header metadata (or, if intended to store the full block,
rename the field from RawHeader and change the error message to reflect
full-block marshalling), and update the error text and any usages referencing
RawHeader, json.Marshal(block) and cometpb.Block accordingly.
Overview
Summary by CodeRabbit
New Features
Configuration
Documentation
Tests