feat/refactor: add db backfill and migrate jsonrpc comet to cosmos sdk grpc #34

Merged
tac0turtle merged 11 commits into main from marko/grpc_backfill
Feb 20, 2026

Conversation

@tac0turtle (Contributor) commented Feb 20, 2026

Overview

Summary by CodeRabbit

  • New Features

    • Historical backfill: DB- and RPC-backed sources, a concurrent backfill runner with progress reporting, and public blob-extraction API.
    • Fetching updated to use gRPC for app data and subscriptions.
  • Configuration

    • Replaced celestia_app_url with celestia_app_grpc_addr; added backfill_source and DB backend/layout/path fields with defaults and validation (a sketch of these fields follows after this summary).
  • Documentation

    • Updated running/config docs to reflect gRPC endpoint and new backfill options.
  • Tests

    • Added unit tests covering backfill sources, fetcher, and config validation.
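
A rough sketch of the config fields this summary describes, expressed as a Go struct with yaml tags (field names, tags, and comments are assumptions inferred from the summary and review, not the actual config/config.go definition):

```go
package config

// DataSourceConfig sketch: only the fields touched by this PR are shown.
type DataSourceConfig struct {
	CelestiaAppGRPCAddr  string `yaml:"celestia_app_grpc_addr"`  // replaces celestia_app_url
	BackfillSource       string `yaml:"backfill_source"`         // "rpc" (default) or "db"
	CelestiaAppDBPath    string `yaml:"celestia_app_db_path"`    // required when backfill_source is "db"
	CelestiaAppDBBackend string `yaml:"celestia_app_db_backend"` // "auto", "pebble", or "leveldb"
	CelestiaAppDBLayout  string `yaml:"celestia_app_db_layout"`  // "auto" by default
}
```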

tac0turtle and others added 8 commits February 19, 2026 20:55
Add an S3Store implementation that batches multiple heights into single
S3 objects (chunks) to reduce object count and request costs. Writes are
buffered in memory and flushed at checkpoint boundaries (SetSyncState).

- New S3Config with bucket, prefix, region, endpoint, chunk_size
- Store factory in main.go (sqlite/s3 switch)
- 13 mock-based unit tests covering round-trips, chunk boundaries,
  buffer read-through, idempotent writes, commitment index lookups
- Config validation for S3-specific required fields

Also includes profiling support, backfill progress tracking, and
metrics formatting fixes from the working tree.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Document SetMetrics must-call-before-use contract
- Use dedicated nsMu for PutNamespace to avoid holding buffer mutex
  during S3 network I/O
- Add flushMu to serialize flush() calls, preventing concurrent
  read-modify-write races on S3 chunk objects
- Pre-allocate merged slice in flushHeaderChunk (lint fix)
- Guard division by zero in backfill progress when done==0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
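
A minimal sketch of the height-to-chunk grouping described in the first commit above (the key format, prefix, and chunk size are illustrative assumptions, not the S3Store's actual layout):

```go
package main

import "fmt"

// chunkKey maps a block height to the S3 object key of the chunk that holds it,
// grouping chunkSize consecutive heights into one object to cut object counts.
func chunkKey(prefix string, height, chunkSize uint64) string {
	start := (height / chunkSize) * chunkSize
	end := start + chunkSize - 1
	return fmt.Sprintf("%s/headers/%020d-%020d", prefix, start, end)
}

func main() {
	// With a chunk size of 100, height 1234 lands in the 1200-1299 object.
	fmt.Println(chunkKey("apex", 1234, 100))
}
```
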
@coderabbitai bot commented Feb 20, 2026

Warning: Rate limit exceeded. @tac0turtle has exceeded the number of commits that can be reviewed per hour. Once the wait time has elapsed, a review can be triggered with the @coderabbitai review command as a PR comment, or by pushing new commits to this PR.

📝 Walkthrough

The PR migrates the Celestia app data fetcher from HTTP/CometBFT RPC to gRPC/Cosmos SDK endpoints, updates configuration fields and docs, introduces a pluggable backfill package (RPC and DB sources plus a concurrent Runner), adds protobuf definitions for block access, and wires backfill into the coordinator and CLI, with accompanying tests.

Changes

  • Configuration & CLI Wiring (cmd/apex/main.go, config/config.go, config/load.go, docs/running.md, config/load_test.go): Replaces celestia_app_url with celestia_app_grpc_addr, adds backfill DB options (backfill_source, celestia_app_db_path, celestia_app_db_backend, celestia_app_db_layout) with validation and defaults; CLI wiring updated to use the gRPC address; adds a test ensuring the DB path is required when backfill_source=db.
  • Celestia App Fetcher (HTTP→gRPC) (pkg/fetch/celestia_app.go, pkg/fetch/celestia_app_test.go): Replaces the HTTP/WS JSON-RPC fetcher with a gRPC Cosmos SDK client; adds GetHeightData mapping, bearer-token credentials, a polling-based header subscription, and corresponding bufconn-based tests.
  • Backfill API & Runner (pkg/backfill/source.go, pkg/backfill/runner.go): Adds the backfill.Source interface and HeightObserver type; implements Runner for a concurrent, batched historical fetch/store/observe flow with progress reporting and metrics (a Source interface sketch follows after this list).
  • Backfill Sources: RPC & DB (pkg/backfill/rpc/source.go, pkg/backfill/rpc/source_test.go, pkg/backfill/db/source.go, pkg/backfill/db/source_test.go): Implements an RPC-backed Source (wrapping the existing fetch.DataFetcher with unified/fallback fetch paths) and a DB-backed Source that reads the Celestia blockstore (Pebble/LevelDB) with layout auto-detection; adds unit tests covering both.
  • Sync & Coordinator Wiring (pkg/sync/backfill.go, pkg/sync/coordinator.go): Replaces the inline backfill loop with delegation to backfill.Runner; the coordinator gains a WithBackfillSource option and field to accept a backfill.Source and wires it into backfiller construction.
  • Blob extraction API (pkg/fetch/blobtx.go): Exports the ExtractBlobsFromBlock wrapper to expose blob extraction by namespaces and height.
  • Protobuf & Dependencies (proto/cosmos/base/tendermint/v1beta1/query.proto, go.mod): Adds proto definitions for the GetBlockByHeight/GetLatestBlock gRPC APIs and updates module dependencies to support the new DB backfill and protobuf/gRPC usage; the websocket dependency becomes indirect-only.
  • Tests & Misc (various *_test.go files): Adds and migrates tests for the gRPC fetcher, backfill sources, and config validation.
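
A rough sketch of the backfill.Source and HeightObserver shapes described in the Backfill API & Runner row above (signatures are assumptions inferred from this walkthrough and the tests quoted later in the review; Header, Blob, and Namespace stand in for the project's types package):

```go
package backfill

import "context"

// Stand-ins for the project's types package.
type (
	Header    struct{ Height uint64 }
	Blob      struct{ Height uint64 }
	Namespace []byte
)

// Source abstracts where historical data comes from (an RPC endpoint or a
// local celestia-app database), returning the header and matching blobs for
// a single height.
type Source interface {
	FetchHeight(ctx context.Context, height uint64, namespaces []Namespace) (*Header, []Blob, error)
}

// HeightObserver is invoked after a height has been fetched and stored,
// for example to publish events to API subscribers.
type HeightObserver func(height uint64, hdr *Header, blobs []Blob)
```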

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Caller
    participant Runner as backfill.Runner
    participant Progress as Progress Reporter
    participant Pool as Worker Pool
    participant Source as backfill.Source
    participant Store as Store
    participant Observer as Observer
    participant Metrics as Metrics

    Caller->>Runner: Run(ctx, fromHeight, toHeight)
    Runner->>Metrics: Initialize metrics
    Runner->>Progress: Start reporter
    Progress->>Progress: Periodic updates (every 10s)

    Runner->>Runner: Iterate height ranges (batches)
    rect rgba(100,150,255,0.5)
    Note over Runner,Pool: Process Batch (concurrent workers)
    Runner->>Pool: Distribute heights
    par Concurrent height work
        Pool->>Source: FetchHeight(height, namespaces)
        Source-->>Pool: Header + Blobs
        Pool->>Store: StoreHeader(header)
        Store-->>Pool: OK
        Pool->>Store: StoreBlobs(blobs)
        Store-->>Pool: OK
        Pool->>Observer: NotifyHeight(height, header, blobs)
        Observer-->>Pool: OK
        Pool->>Metrics: Record timings/errors
    end
    end

    Runner->>Store: UpdateSyncStatus(latestHeight)
    alt error
        Runner->>Progress: Stop reporter
        Runner-->>Caller: error
    else success
        Progress->>Progress: Stop reporter
        Runner-->>Caller: done
    end
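
A compact, self-contained sketch of the batch/worker flow shown in the diagram (names and error handling are simplified; the real Runner also records metrics, reports progress, and updates sync state). It assumes batchSize and workers are positive; the zero cases are exactly what the review comments below flag:

```go
package backfill

import (
	"context"
	"sync"
)

// processRange splits [from, to] into batches and fans each batch out to a
// bounded pool of workers, mirroring the sequence diagram above.
func processRange(ctx context.Context, from, to, batchSize uint64, workers int,
	handle func(ctx context.Context, height uint64) error) error {
	for start := from; start <= to; start += batchSize {
		end := start + batchSize - 1
		if end > to {
			end = to
		}

		heights := make(chan uint64, end-start+1)
		for h := start; h <= end; h++ {
			heights <- h
		}
		close(heights)

		var wg sync.WaitGroup
		errCh := make(chan error, workers)
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for h := range heights {
					if err := handle(ctx, h); err != nil {
						errCh <- err // buffered so a failing worker never blocks
						return
					}
				}
			}()
		}
		wg.Wait()
		close(errCh)
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}
```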

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I nibble bytes and hop through logs,

gRPC tunnels in my burrowed bogs,
Batches hum and workers play,
Backfill bridges night to day,
Hooray — a carrot for each saved block!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 20.69%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check: ✅ Passed. Check skipped - CodeRabbit’s high-level summary is enabled.
  • Title check: ✅ Passed. The pull request title clearly and specifically summarizes the main changes: adding database backfill functionality and migrating from JSON-RPC CometBFT to Cosmos SDK gRPC endpoints, which are the primary themes across the changeset.


@gemini-code-assist

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the data ingestion and API layers by migrating celestia-app data fetching to Cosmos SDK gRPC and introducing a modular backfill system. These changes enhance reliability, performance, and configurability, particularly for handling historical data and network interactions. The update also includes improved error handling for API subscriptions and robust retry mechanisms for external RPC calls.

Highlights

  • Celestia-App Data Fetching Migration: The data fetching mechanism for celestia-app has been migrated from CometBFT JSONRPC and WebSocket to Cosmos SDK gRPC. This involves updating configuration, fetcher implementation, and removing legacy CometBFT client code.
  • New Backfill Mechanism: A new, more robust backfill system has been introduced, featuring backfill.Source and backfill.Runner abstractions. This allows for flexible data sources (e.g., RPC or local DB) and improved batch processing with concurrency.
  • Configuration Enhancements: New configuration options have been added for RPC read/write timeouts, and existing sync parameters (batch size, concurrency) and subscription limits (max subscribers) have been updated to more optimized defaults.
  • Subscription Notifier Improvements: The API event Notifier now includes a maximum subscriber limit and returns errors when this limit is reached or other subscription failures occur, improving resource management and error handling.
  • RPC Call Retry Logic: Retry logic with exponential backoff and jitter has been implemented for CelestiaNodeFetcher RPC calls to handle transient network errors more gracefully.
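
A minimal sketch of the retry-with-exponential-backoff-and-jitter pattern the last highlight describes (constant values, the helper names, and the transient-error heuristic are assumptions, not the fetcher's actual implementation):

```go
package retry

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"time"
)

const (
	maxRetries = 3
	baseDelay  = 200 * time.Millisecond
)

// isTransient is a stand-in for a transient-error check such as a network timeout.
func isTransient(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

// Do retries fn on transient errors with exponential backoff plus random jitter.
func Do(ctx context.Context, fn func(ctx context.Context) error) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = fn(ctx); err == nil || !isTransient(err) {
			return err
		}
		if attempt == maxRetries {
			break
		}
		delay := baseDelay << attempt                       // exponential: 200ms, 400ms, 800ms, ...
		delay += time.Duration(rand.Int63n(int64(delay)/2)) // add up to 50% jitter
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
}
```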

Changelog
  • cmd/apex/main.go
    • Updated logging fields for celestia-app data source to reflect gRPC address.
    • Modified CelestiaAppFetcher instantiation to use the new gRPC address.
    • Updated api.NewNotifier call to include MaxSubscribers parameter.
    • Added configurable ReadTimeout and WriteTimeout to the HTTP server configuration.
  • config/config.go
    • Renamed CelestiaAppURL to CelestiaAppGRPCAddr in DataSourceConfig.
    • Added ReadTimeout and WriteTimeout fields to RPCConfig.
    • Added MaxSubscribers field to SubscriptionConfig.
    • Updated default values for RPC, Sync, and Subscription configurations.
  • config/load.go
    • Updated comments and validation logic to reflect the celestia_app_grpc_addr change.
    • Added comments and validation for new read_timeout, write_timeout, and max_subscribers configuration fields.
  • config/load_test.go
    • Renamed TestDefaultConfigProfilingDefaults to TestDefaultConfigDefaults.
    • Added assertions for new default configuration values for sync batch size, concurrency, RPC timeouts, and max subscribers.
  • docs/running.md
    • Updated documentation to reflect the use of Cosmos SDK gRPC for celestia-app instead of CometBFT RPC.
    • Modified example configuration to use celestia_app_grpc_addr.
    • Added documentation for new RPC and subscription configuration parameters.
  • go.mod
    • Removed github.com/gorilla/websocket from direct dependencies, moving it to indirect.
  • pkg/api/grpc/blob_service.go
    • Modified Subscribe method to handle potential errors returned by the Notifier.
  • pkg/api/grpc/gen/cosmos/base/tendermint/v1beta1/query.pb.go
    • Added new protobuf definitions for Cosmos SDK Tendermint queries, including messages for blocks, headers, and data.
  • pkg/api/grpc/gen/cosmos/base/tendermint/v1beta1/query_grpc.pb.go
    • Added new gRPC service definitions for Cosmos SDK Tendermint queries, including client and server interfaces for GetBlockByHeight and GetLatestBlock.
  • pkg/api/grpc/header_service.go
    • Modified Subscribe method to handle potential errors returned by the Notifier.
  • pkg/api/grpc/server_test.go
    • Updated api.NewNotifier calls in various tests to include the maxSubscribers parameter.
  • pkg/api/health_test.go
    • Updated NewNotifier calls in tests to include the maxSubscribers parameter.
  • pkg/api/jsonrpc/blob.go
    • Modified BlobSubscribe method to handle potential errors returned by the Notifier.
  • pkg/api/jsonrpc/header.go
    • Modified Subscribe method to handle potential errors returned by the Notifier.
  • pkg/api/jsonrpc/server_test.go
    • Updated api.NewNotifier calls in various tests to include the maxSubscribers parameter.
  • pkg/api/notifier.go
    • Added maxSubscribers field to the Notifier struct.
    • Updated NewNotifier function signature to accept maxSubscribers and enforce minimum values.
    • Modified Subscribe method to check against maxSubscribers and return an error if the limit is reached.
    • Changed Subscribe return type to include an error.
  • pkg/api/notifier_test.go
    • Updated NewNotifier calls in tests to include maxSubscribers.
    • Added error checks for Subscribe calls.
    • Added TestNotifierMaxSubscribers to verify the new subscriber limit functionality.
  • pkg/api/service.go
    • Modified BlobSubscribe and HeaderSubscribe methods to return an error in addition to the subscription.
  • pkg/api/service_test.go
    • Updated NewNotifier calls in tests to include the maxSubscribers parameter.
  • pkg/backfill/db/source.go
    • Added new file defining a Source for reading historical block/blob data directly from celestia-app's local DB, with a placeholder for backend adapters.
  • pkg/backfill/rpc/source.go
    • Added new file defining an RPC-backed Source for backfill data, wrapping a DataFetcher.
  • pkg/backfill/rpc/source_test.go
    • Added new test file for the RPC-backed backfill source, including tests for combined fetcher usage and error propagation.
  • pkg/backfill/runner.go
    • Added new file defining the Runner struct and its Run method for fetching and persisting historical blocks in batches with concurrency and progress reporting.
  • pkg/backfill/source.go
    • Added new file defining the Source interface for fetching historical data and the HeightObserver type.
  • pkg/fetch/celestia_app.go
    • Rewrote CelestiaAppFetcher to use Cosmos SDK gRPC for block and status queries instead of CometBFT JSONRPC.
    • Removed HTTP client and WebSocket-related logic.
    • Implemented GetHeightData for combined header and blob fetching.
    • Updated SubscribeHeaders to poll GetLatestBlock at intervals instead of using WebSockets (a polling sketch follows after this changelog).
    • Added bearerCreds for gRPC authentication and withAuth helper for metadata.
  • pkg/fetch/celestia_app_test.go
    • Rewrote tests for CelestiaAppFetcher to use a gRPC mock server (mockCometService) and bufconn for in-memory testing.
    • Updated test cases to reflect gRPC interactions for GetHeader, GetBlobs, GetNetworkHead, and SubscribeHeaders.
  • pkg/fetch/celestia_node.go
    • Added callRawWithRetry function to implement retry logic for transient RPC errors in CelestiaNodeFetcher.
    • Defined defaultRPCTimeout, defaultRPCMaxRetries, and defaultRPCRetryDelay constants.
    • Implemented retryDelay with jitter for exponential backoff.
    • Added isTransientRPCError helper to identify retryable network errors.
    • Applied retry logic to GetHeader, GetBlobs, and GetProof methods.
  • pkg/fetch/celestia_node_test.go
    • Added TestIsTransientRPCError to verify transient error detection.
    • Added TestGetBlobsRetriesTransient to test the RPC retry mechanism for CelestiaNodeFetcher.
  • pkg/sync/backfill.go
    • Refactored Backfiller to delegate backfill execution to the new backfill.Runner.
    • Removed internal batch processing and progress reporting logic, relying on backfill.Runner.
    • Introduced backfill.Source and backfillrpc.NewSource for data fetching during backfill.
  • proto/cosmos/base/tendermint/v1beta1/query.proto
    • Added new protobuf schema for Cosmos SDK Tendermint queries, defining messages and services for block data retrieval.
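
A simplified sketch of the polling-based subscription change described above for pkg/fetch/celestia_app.go; the interval, channel buffering, and the getLatest callback are stand-ins, not the fetcher's actual SubscribeHeaders signature:

```go
package fetchsketch

import (
	"context"
	"time"
)

// pollHeads emits each newly observed chain height on the returned channel by
// polling getLatest at a fixed interval, replacing a WebSocket subscription.
func pollHeads(ctx context.Context, interval time.Duration,
	getLatest func(ctx context.Context) (uint64, error)) <-chan uint64 {
	out := make(chan uint64, 1)
	go func() {
		defer close(out)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		var last uint64
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				h, err := getLatest(ctx)
				if err != nil || h <= last {
					continue // ignore transient errors and unchanged heights
				}
				last = h
				select {
				case out <- h:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}
```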

@gemini-code-assist bot left a comment

Code Review

This pull request refactors the Celestia-app data source to use Cosmos SDK gRPC instead of CometBFT RPC, introducing new generated protobuf files for Tendermint queries. It updates configuration fields and documentation to reflect this change, including renaming CelestiaAppURL to CelestiaAppGRPCAddr. The CelestiaAppFetcher is rewritten to use gRPC for fetching blocks and headers, and its SubscribeHeaders method now polls for new blocks instead of using WebSockets. Additionally, the backfill process is refactored to use a new backfill.Runner component, which introduces retry logic for transient RPC errors when fetching data from Celestia nodes. The API subscription mechanism is enhanced with a MaxSubscribers limit, and HTTP RPC endpoints now support configurable ReadTimeout and WriteTimeout values. Review comments highlighted a potential security risk with bearerCreds explicitly disabling transport security, a uint64 underflow vulnerability in totalHeights and heights calculations, and a concern about out-of-order event delivery during backfill due to concurrent observer calls.

return map[string]string{"authorization": "Bearer " + b.token}, nil
}

func (b bearerCreds) RequireTransportSecurity() bool { return false }

security-medium

The bearerCreds implementation of grpc.PerRPCCredentials explicitly disables transport security requirements by returning false from RequireTransportSecurity(). Combined with the use of insecure.NewCredentials() on line 47, this allows authentication tokens to be transmitted over plaintext connections if the indexer is configured with a remote gRPC address that does not use TLS. This exposes sensitive authentication tokens to network interception.

Consider requiring transport security by default and making it configurable only if absolutely necessary for local development.

Suggested change
-func (b bearerCreds) RequireTransportSecurity() bool { return false }
+func (b bearerCreds) RequireTransportSecurity() bool { return true }
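
If plaintext still has to be supported for local development, a hedged sketch of defaulting to TLS with an explicit opt-out might look like this (the allowInsecure flag is hypothetical, not an existing config field):

```go
package dialsketch

import (
	"crypto/tls"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
)

// dialOptions defaults to TLS transport credentials; plaintext is only used
// when the caller explicitly opts in, e.g. for a local devnet.
func dialOptions(allowInsecure bool) []grpc.DialOption {
	if allowInsecure {
		return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
	}
	return []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})),
	}
}
```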

return fmt.Errorf("get namespaces: %w", err)
}

totalHeights := toHeight - fromHeight + 1

medium

The calculation of totalHeights does not account for the case where fromHeight > toHeight, which will result in a uint64 underflow. This will cause incorrect progress reporting and potential division by zero in the progress goroutine. It is better to validate the range at the start of the method.

Suggested change
-totalHeights := toHeight - fromHeight + 1
+if fromHeight > toHeight {
+	return nil
+}
+totalHeights := toHeight - fromHeight + 1

}

func (r *Runner) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
heights := make(chan uint64, to-from+1)

medium

The channel allocation make(chan uint64, to-from+1) will panic if from > to due to uint64 underflow. While the current caller logic in Run prevents this, the processBatch method should be defensive against invalid ranges to avoid runtime panics if used independently.

Suggested change
-heights := make(chan uint64, to-from+1)
+if from > to {
+	return nil
+}
+heights := make(chan uint64, to-from+1)


if r.Observer != nil {
stageStart = time.Now()
r.Observer(height, hdr, blobs)

medium

The Observer is called concurrently from multiple worker goroutines. When this observer is wired to the notifier (as seen in cmd/apex/main.go), it will cause events to be published to API clients out-of-order during backfill. This triggers frequent 'non-contiguous event delivery' warnings in the notifier and may break clients that expect sequential block updates. Consider collecting batch results and publishing them sequentially, or disabling the observer during historical backfills.
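
A small sketch of the "collect then publish sequentially" option under assumed types (Header and Blob stand in for the project's types; the Runner's real fields may differ):

```go
package backfillsketch

import "sort"

// Stand-ins for the project's types.
type (
	Header struct{ Height uint64 }
	Blob   struct{ Height uint64 }
)

// result is what one worker produced for one height.
type result struct {
	height uint64
	hdr    *Header
	blobs  []Blob
}

// notifyInOrder sorts a finished batch by height before invoking the observer,
// so subscribers see contiguous ascending events despite concurrent fetching.
func notifyInOrder(results []result, observe func(height uint64, hdr *Header, blobs []Blob)) {
	sort.Slice(results, func(i, j int) bool { return results[i].height < results[j].height })
	for _, r := range results {
		observe(r.height, r.hdr, r.blobs)
	}
}
```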

@coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (10)
pkg/fetch/celestia_app_test.go (3)

237-249: TestCloseIdempotent dials a real address — works due to lazy connect but is fragile.

NewCelestiaAppFetcher("localhost:26657", ...) relies on gRPC lazy connect semantics. This is fine today, but if the constructor or dial options change to use grpc.WithBlock() in the future, this test will hang or fail. Consider using the bufconn-based newTestFetcher for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app_test.go` around lines 237 - 249, TestCloseIdempotent
currently constructs a real dial to "localhost:26657" via NewCelestiaAppFetcher
which relies on lazy gRPC connects and is fragile; change the test to use the
bufconn-based helper newTestFetcher to create the fetcher so it doesn't perform
a real network dial: replace the NewCelestiaAppFetcher call in
TestCloseIdempotent with newTestFetcher (or the project's equivalent test
helper) and keep the existing Close() idempotency assertions against the
returned fetcher instance.

54-58: t.Logf inside a goroutine can race with test completion.

Calling t.Logf from a goroutine that outlives the test function body causes a panic. Here srv.Stop() in the cleanup (line 59) should cause Serve to return before cleanup completes, so this is likely safe in practice. However, if Stop were replaced with GracefulStop or the server takes time shutting down, this could become a problem.

A safer pattern is to log to a standalone logger or simply discard the error:

Suggested change
 	go func() {
-		if err := srv.Serve(lis); err != nil {
-			t.Logf("grpc serve: %v", err)
-		}
+		_ = srv.Serve(lis)
 	}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app_test.go` around lines 54 - 58, The test currently
calls t.Logf from inside the goroutine running srv.Serve, which can race with
test teardown; instead avoid calling testing.T methods from background
goroutines—either replace t.Logf with a standalone logger (e.g., log.Printf) or
send the error back to the main test goroutine via a channel and handle/log it
there (capture the error from srv.Serve in the goroutine, send to an errCh, then
read and call t.Logf in the test function). Update the goroutine around
srv.Serve and any cleanup logic so only the main test goroutine uses t.Logf.

251-252: Remove the unused base64 import and dummy variable assignment.

The base64 import is unused in celestia_app.go and only appears in the test file via the dummy var _ = base64.StdEncoding assignment. The accompanying comment references "blobtx helpers" that do not exist in the codebase. Remove both the import and the dummy assignment to eliminate this maintenance burden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app_test.go` around lines 251 - 252, Remove the unused
base64 import and the dummy assignment var _ = base64.StdEncoding from the test
file; locate the reference to base64 (the import and the var _ =
base64.StdEncoding line) in celestia_app_test.go or its imports, delete both the
import entry and the dummy variable, and run `go test`/`go vet` to ensure
nothing else depends on base64 before committing.
docs/running.md (1)

60-65: Full Config Reference also shows both ports as 9090.

Same concern here — in the full reference, both the upstream celestia_app_grpc_addr and the local grpc_listen_addr are 9090. A comment clarifying the distinction (remote source vs. local server) would help avoid confusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/running.md` around lines 60 - 65, The docs show both
celestia_app_grpc_addr and grpc_listen_addr using port 9090 which can confuse
readers; update the YAML example and/or inline comments to clearly distinguish
that celestia_app_grpc_addr is the remote/upstream Celestia app gRPC endpoint
(the external host:port your node queries) while grpc_listen_addr is the local
gRPC server bind address for this service, and ensure the full config reference
mirrors that clarification (e.g., add a comment next to celestia_app_grpc_addr
stating "remote/upstream app gRPC endpoint" and a comment next to
grpc_listen_addr stating "local bind address").
pkg/backfill/runner.go (2)

121-174: Batch processing: consider errgroup for cleaner worker orchestration.

The manual sync.WaitGroup + sync.Mutex + firstErr pattern works correctly, but golang.org/x/sync/errgroup (with SetLimit) would eliminate the boilerplate and provide the same semantics with less surface area for bugs.

Sketch using errgroup
func (r *Runner) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(r.Concurrency)
	for h := from; h <= to; h++ {
		h := h
		g.Go(func() error {
			return r.processHeight(gctx, h, namespaces)
		})
	}
	return g.Wait()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/runner.go` around lines 121 - 174, Replace the manual worker
orchestration in processBatch with an errgroup: create an errgroup with
errgroup.WithContext(ctx), call g.SetLimit(r.Concurrency), loop h from from to
to and capture h in the loop, then use g.Go to call r.processHeight(gctx, h,
namespaces) for each height, and finally return g.Wait(); this removes the
sync.WaitGroup/sync.Mutex/firstErr boilerplate while preserving cancellation and
error propagation.

45-74: Progress reporter goroutine: minor log inaccuracy and edge case.

Line 64: fromHeight+done reports a height that is one past the last completed batch end (since done = batchEnd - fromHeight + 1). Consider fromHeight + done - 1 for the actual last completed height.

Line 62: rate can be +Inf if elapsed.Seconds() is near zero on the first tick. Not a crash, but remaining would be 0s (or NaN), producing a confusing log line. A guard like if elapsed < time.Second { continue } at the top would avoid this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/runner.go` around lines 45 - 74, The progress reporter goroutine
logs an off-by-one last-completed height and can compute an infinite/NaN rate
when elapsed is near zero; inside the anonymous goroutine that reads
processedHeights, startTime, totalHeights, fromHeight and toHeight, change the
logged height from fromHeight+done to fromHeight+done-1 (but only when done>0)
and add a short elapsed guard before computing rate/remaining (e.g., if elapsed
< time.Second { continue } or skip rate/ETA calculation until elapsed>0) so you
don't divide by a near-zero elapsed and produce +Inf/NaN in the r.Log.Info call;
keep existing done==0 handling.
pkg/backfill/rpc/source_test.go (2)

12-56: Mock fetchers don't validate input arguments (height, namespaces).

Both mockFetcher and fallbackFetcher ignore the height and namespaces parameters. This means the tests can't detect if Source.FetchHeight passes the wrong height to the underlying fetcher. Consider capturing and asserting the received arguments in at least one test case.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/rpc/source_test.go` around lines 12 - 56, The mock fetchers
(mockFetcher and fallbackFetcher) ignore the height and namespaces parameters so
tests can't assert the arguments; update the mock implementations of GetHeader,
GetBlobs, and GetHeightData to record the received height and namespaces (e.g.,
add fields like lastHeight and lastNamespaces or slices of calls) and return
them so tests can assert that Source.FetchHeight passed the expected
height/namespaces; update at least one test to check those recorded fields after
calling Source.FetchHeight to validate the correct arguments were forwarded.

58-139: Tests should use table-driven pattern per coding guidelines.

The three separate test functions cover the main scenarios well, but the coding guidelines require table-driven tests. These cases (combined path, fallback, error propagation) are good candidates for consolidation into a single table-driven test with subtests.

Also, there's no test for the combined fetcher returning an error — only the fallback error path is exercised via fallbackFetcher.

Sketch of table-driven refactor
func TestSourceFetchHeight(t *testing.T) {
	tests := []struct {
		name      string
		fetcher   fetch.DataFetcher
		height    uint64
		ns        []types.Namespace
		wantHdr   uint64
		wantBlobs int
		wantErr   error
	}{
		{
			name: "combined fetcher",
			fetcher: &mockFetcher{
				hdr:   &types.Header{Height: 10},
				blobs: []types.Blob{{Height: 10}},
			},
			height:    10,
			ns:        []types.Namespace{{}},
			wantHdr:   10,
			wantBlobs: 1,
		},
		{
			name:    "fallback mode",
			fetcher: &fallbackFetcher{hdr: &types.Header{Height: 5}},
			height:  5,
			wantHdr: 5,
		},
		{
			name:    "error propagation",
			fetcher: &fallbackFetcher{err: store.ErrNotFound},
			height:  99,
			ns:      []types.Namespace{{}},
			wantErr: store.ErrNotFound,
		},
		// Add: combined fetcher error case
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s := NewSource(tt.fetcher)
			hdr, blobs, err := s.FetchHeight(context.Background(), tt.height, tt.ns)
			if tt.wantErr != nil {
				if !errors.Is(err, tt.wantErr) {
					t.Fatalf("expected %v, got %v", tt.wantErr, err)
				}
				return
			}
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if hdr.Height != tt.wantHdr {
				t.Fatalf("header height = %d, want %d", hdr.Height, tt.wantHdr)
			}
			if len(blobs) != tt.wantBlobs {
				t.Fatalf("blob count = %d, want %d", len(blobs), tt.wantBlobs)
			}
		})
	}
}

As per coding guidelines: "Use table-driven tests pattern for test organization".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/rpc/source_test.go` around lines 58 - 139, Consolidate the three
tests TestSourceFetchHeight_UsesCombinedFetcherWhenAvailable,
TestSourceFetchHeight_FallbackMode and TestSourceFetchHeight_PropagatesErrors
into a single table-driven test (e.g., TestSourceFetchHeight) that iterates
cases with different fetcher implementations (mockFetcher, fallbackFetcher) and
inputs passed to NewSource(...). For each table entry run a subtest that calls
s.FetchHeight(ctx, height, ns) and asserts hdr height, blob count and error
expectations using errors.Is; add an extra table case where the combined fetcher
(mockFetcher) returns an error to verify combined-path error propagation. Ensure
you keep/ reuse the existing helper types mockFetcher and fallbackFetcher and
check combinedCalls/hdrCalls/blobCalls where applicable.
pkg/fetch/celestia_app.go (2)

32-41: bearerCreds type may be dead code if PerRPCCredentials dial option is removed.

If you adopt the fix for the duplicate-auth issue above, bearerCreds becomes unused. If you instead keep PerRPCCredentials and drop withAuth, then bearerCreds stays. Either way, ensure only one credential mechanism remains, and remove the other.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app.go` around lines 32 - 41, The review notes that
bearerCreds (type bearerCreds and its methods GetRequestMetadata and
RequireTransportSecurity) may be dead if you remove the PerRPCCredentials dial
option; resolve the duplication by choosing one credential mechanism and
removing the other: either keep PerRPCCredentials (retain bearerCreds and
remove/stop using withAuth) or keep withAuth (remove bearerCreds and any
PerRPCCredentials usage), and update any dial/connection code that references
bearerCreds, PerRPCCredentials, or withAuth to match the chosen approach so only
one auth path remains.

96-98: uint64 → int64 cast may silently overflow for heights > math.MaxInt64.

Height: int64(height) wraps to a negative value when height exceeds math.MaxInt64. While unlikely in practice today, a defensive guard prevents a confusing negative-height request reaching the server.

Proposed fix
+	if height > uint64(math.MaxInt64) {
+		return nil, nil, fmt.Errorf("height %d overflows int64", height)
+	}
 	resp, err := f.client.GetBlockByHeight(f.withAuth(ctx), &cometpb.GetBlockByHeightRequest{
 		Height: int64(height),
 	})

(Also add "math" to imports.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app.go` around lines 96 - 98, The code casts a uint64
height to int64 in the GetBlockByHeightRequest (f.client.GetBlockByHeight with
GetBlockByHeightRequest{Height: int64(height)}) which will overflow for heights
> math.MaxInt64; modify the call site to guard against overflow by checking if
height > math.MaxInt64 and return an error (or handle appropriately) before
casting, then perform the safe cast to int64 only when within range; also add
"math" to the imports so you can compare against math.MaxInt64.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@proto/cosmos/base/tendermint/v1beta1/query.proto`:
- Around line 37-43: The Header protobuf message currently defines chain_id as
bytes which produces []byte in generated Go; change the field type in the
message Header by replacing "bytes chain_id = 3;" with "string chain_id = 3;" so
it matches CometBFT's types.proto and yields a Go string; update any generated
artifacts (regenerate protobuf bindings) and adjust any call sites that assumed
[]byte to use string (references: message Header, field chain_id).

Comment on lines +51 to 54
celestia_app_grpc_addr: "localhost:9090"
namespaces:
- "0000000000000000000000000000000000000000000000000000deadbeef"
```

⚠️ Potential issue | 🟡 Minor

Port conflict in the "app" example: both Apex gRPC and celestia-app gRPC default to 9090.

In the Minimal Config (Consensus Node) example, celestia_app_grpc_addr defaults to localhost:9090, which is the same as grpc_listen_addr: ":9090" (line 72). When running Apex and celestia-app on the same machine, the Apex gRPC server will fail to bind. Consider noting this in the example or using a different default for one of the two.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/running.md` around lines 51 - 54, The example config uses the same port
for Apex gRPC and celestia-app gRPC which causes a bind conflict; update the
Minimal Config (Consensus Node) example so the celestia_app_grpc_addr value uses
a different port (e.g., change celestia_app_grpc_addr from "localhost:9090" to
"localhost:9091") or alternatively change grpc_listen_addr, and add a short
comment noting that grpc_listen_addr and celestia_app_grpc_addr must not share
the same port when running Apex and celestia-app on the same host; ensure you
update the example entry for celestia_app_grpc_addr and the accompanying comment
text.

Comment on lines +76 to +78
for batchStart := fromHeight; batchStart <= toHeight; batchStart += uint64(r.BatchSize) {
batchStartTime := time.Now()
batchEnd := batchStart + uint64(r.BatchSize) - 1

⚠️ Potential issue | 🔴 Critical

Infinite loop when BatchSize is 0.

If r.BatchSize is zero (e.g., from a zero-value Runner or misconfiguration), uint64(r.BatchSize) is 0, so batchStart never advances and this loop runs forever. A negative BatchSize converts to a huge uint64, causing the loop to exit after one oversized batch.

Proposed fix: validate at the top of Run
 func (r *Runner) Run(ctx context.Context, fromHeight, toHeight uint64) error {
+	if r.BatchSize <= 0 {
+		return fmt.Errorf("batch size must be positive, got %d", r.BatchSize)
+	}
+	if r.Concurrency <= 0 {
+		return fmt.Errorf("concurrency must be positive, got %d", r.Concurrency)
+	}
+	if fromHeight > toHeight {
+		return nil // nothing to do
+	}
 	if r.Metrics == nil {
 		r.Metrics = metrics.Nop()
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/runner.go` around lines 76 - 78, The loop in Runner.Run uses
uint64(r.BatchSize) and will never advance if r.BatchSize == 0; validate
r.BatchSize at the start of Run (e.g., in func (r *Runner) Run(...)) and return
an error (or set a sane default) if BatchSize <= 0 to prevent an infinite loop;
reference r.BatchSize and the for loop that uses uint64(r.BatchSize) and ensure
the validation occurs before any use of fromHeight/toHeight batching.

Comment on lines +128 to +131
workers := r.Concurrency
if int(to-from+1) < workers {
workers = int(to - from + 1)
}

⚠️ Potential issue | 🔴 Critical

Zero workers if Concurrency ≤ 0 silently skips the entire batch.

When workers is 0, for range workers (line 139) spawns no goroutines, wg.Wait() returns immediately, firstErr remains nil, and the batch appears successful despite processing zero heights.

This is addressed by the input validation proposed above.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/runner.go` around lines 128 - 131, The code computes workers
from r.Concurrency which can be ≤ 0, causing no goroutines to run and the batch
to be silently skipped; update the code that computes workers (the r.Concurrency
→ workers logic used before the goroutine loop and wg.Wait()) to validate
r.Concurrency and either return an error when r.Concurrency <= 0 or default
workers to 1 (choose project convention), then recompute the min with the range
size (to - from + 1) so the subsequent for-range worker spawn actually runs;
ensure the check references r.Concurrency, the local workers variable, and the
goroutine/WaitGroup section (where wg.Wait() and firstErr are used).

Comment on lines +46 to 51
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
if authToken != "" {
opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken}))
}

⚠️ Potential issue | 🟠 Major

Duplicate bearer token: PerRPCCredentials and manual metadata both send authorization.

When authToken is non-empty, grpc.WithPerRPCCredentials(bearerCreds{...}) (line 50) already attaches "authorization": "Bearer <token>" to every outgoing RPC. Then withAuth (line 214) appends the same key again via grpcmd.AppendToOutgoingContext. This results in the authorization metadata key carrying two identical values per call, which some servers may reject or misinterpret.

Pick one mechanism — either PerRPCCredentials or manual metadata — not both.

Proposed fix: remove PerRPCCredentials and rely on withAuth only
 	opts := []grpc.DialOption{
 		grpc.WithTransportCredentials(insecure.NewCredentials()),
 	}
-	if authToken != "" {
-		opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken}))
-	}

Also applies to: 210-215

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app.go` around lines 46 - 51, The code currently attaches
the same Authorization header twice: via
grpc.WithPerRPCCredentials(bearerCreds{token: authToken}) and again in withAuth
(which calls grpcmd.AppendToOutgoingContext); remove the Per-RPC credentials
usage and only set the token in withAuth. Concretely, delete the branch that
appends grpc.WithPerRPCCredentials(...) to opts (the bearerCreds usage) so opts
only contains transport credentials, ensure withAuth continues to append the
"authorization: Bearer <token>" metadata, and remove the bearerCreds
type/implementation if it becomes unused.

Comment on lines +226 to 228
hdr := block.Header
t := hdr.Time.AsTime()


⚠️ Potential issue | 🔴 Critical

Nil-pointer dereference if hdr.Time is nil.

hdr.Time.AsTime() will panic if block.Header.Time is nil (e.g., a pruned/corrupt response). The nil checks on lines 219-224 guard block, block.Header, and blockID, but not hdr.Time.

Proposed fix
 	hdr := block.Header
+	if hdr.Time == nil {
+		return nil, fmt.Errorf("nil timestamp in header at height %d", hdr.Height)
+	}
 	t := hdr.Time.AsTime()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app.go` around lines 226 - 228, The code calls
hdr.Time.AsTime() without checking hdr.Time for nil which can panic; update the
handling around hdr := block.Header to check if hdr.Time == nil before calling
AsTime(), and handle that case (e.g., skip/return an error or use time.Time{}),
modifying the block-processing logic where hdr and hdr.Time.AsTime() are used so
you don't dereference a nil pointer (look for the hdr variable and the AsTime()
call to locate the change).

@coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
pkg/backfill/db/source_test.go (1)

29-29: Unnecessary loop variable capture in Go 1.25.

Since this project uses Go 1.25 (per go.mod), the loop variable tc is already scoped per iteration. The tc := tc re-capture is harmless but redundant.

 	for _, tc := range cases {
-		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/db/source_test.go` at line 29, Remove the redundant re-capture
statement "tc := tc" inside the table-driven test loop in source_test.go: locate
the loop that iterates over test cases using the variable named tc and delete
the "tc := tc" line so the per-iteration tc binding provided by Go 1.25 is used
directly; then run the tests (and go vet/static checks) to confirm nothing else
relies on that extra capture.
pkg/backfill/db/source.go (2)

556-589: Test-only helpers are compiled into the production binary.

writableKV, writablePebble, writableLevel, openWritable, and all the encode* / writeTestBlock helpers (lines 556–708) are in the non-test file source.go. They ship with the production binary and increase its size without adding runtime value.

Move these to a source_testhelpers_test.go file (or similar _test.go file) so they're excluded from production builds.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/db/source.go` around lines 556 - 589, The helpers used only in
tests (writableKV, writablePebble, writableLevel, openWritable, and the encode*
/ writeTestBlock helpers referenced in the diff) are defined in a non-test file
and are being compiled into production; move all of these symbols into a new
_test.go file (e.g., source_testhelpers_test.go) so they are only built for
tests, update any imports or package-level visibility as needed, and remove
their definitions from source.go; ensure function/type names (writableKV,
writablePebble.put/close, writableLevel.put/close, openWritable, and all encode*
/ writeTestBlock helpers) remain unchanged so tests continue to compile.

529-553: Auto-detection silently swallows open errors — consider logging the pebble error before falling through to leveldb.

In "auto" mode, if pebble fails to open a directory, the error is discarded. If the directory is actually a pebble DB with a corrupt manifest (rather than a leveldb), auto-detection would fall through to leveldb and either fail with an unhelpful error or (worse) open it silently.

Consider logging the pebble error at debug level before trying leveldb, to aid troubleshooting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/backfill/db/source.go` around lines 529 - 553, The openKV function's
"auto" case swallows the pebble.Open error; capture the first error (e.g.,
errPebble) when calling pebble.Open(path, ...) inside the "auto" branch and emit
a debug-level log with the path and errPebble before attempting
leveldb.OpenFile; keep the subsequent leveldb attempt and final error behavior
unchanged so you only add a debug log for pebble.Open failures (referencing
openKV, the "auto" case, and pebble.Open).
config/load.go (1)

154-185: Dual defaulting of BackfillSource, CelestiaAppDBBackend, CelestiaAppDBLayout.

Lines 158–166 re-apply the same defaults already established in DefaultConfig() (config.go lines 101–103). Since Load() decodes YAML on top of DefaultConfig(), these fields are already "rpc" / "auto" / "auto" unless explicitly overridden. The validator defaults are only reachable if someone constructs a Config bypassing DefaultConfig() or explicitly writes an empty string in YAML.

This is harmless as a safety net, but consider consolidating defaults in one place (either DefaultConfig() or validate()) to avoid future drift.
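
A sketch of the consolidated shape, with defaults set only in DefaultConfig() and validation limited to rejection; the flattened struct and the accepted values ("rpc"/"db") are assumptions taken from the comment, not the actual config layout:

```go
// Defaults live in exactly one place.
func DefaultConfig() Config {
	return Config{
		BackfillSource:       "rpc",
		CelestiaAppDBBackend: "auto",
		CelestiaAppDBLayout:  "auto",
	}
}

// validate only rejects values it does not understand; it never re-defaults.
func validateBackfillSource(v string) error {
	switch v {
	case "rpc", "db":
		return nil
	default:
		return fmt.Errorf("backfill_source: unsupported value %q", v)
	}
}
```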

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@config/load.go` around lines 154 - 185, The validator in Load()/validate()
re-applies defaults for BackfillSource, CelestiaAppDBBackend, and
CelestiaAppDBLayout that DefaultConfig() already sets, creating duplicate
defaulting logic; consolidate defaults into a single place by removing the
backup default assignments in the "app" case (the blocks that set
ds.BackfillSource = "rpc", ds.CelestiaAppDBBackend = "auto",
ds.CelestiaAppDBLayout = "auto") and rely on DefaultConfig() to supply them, or
alternatively move those defaults into validate()/Load() and remove them from
DefaultConfig(); update and run tests to ensure behavior is unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/fetch/celestia_app.go`:
- Around line 215-219: The code assigns json.Marshal(block) to the RawHeader
field and logs "marshal raw header", but that serializes the entire
cometpb.Block (including Data.Txs) instead of only the header; update the
serialization to json.Marshal(block.Header) so RawHeader truly contains only
header metadata (or, if intended to store the full block, rename the field from
RawHeader and change the error message to reflect full-block marshalling), and
update the error text and any usages referencing RawHeader, json.Marshal(block)
and cometpb.Block accordingly.

In `@proto/cosmos/base/tendermint/v1beta1/query.proto`:
- Around line 37-43: The Header protobuf message field numbers are misaligned
with CometBFT's types.proto causing corrupted runtime deserialization; update
the message Header to use the exact tag numbers from CometBFT (chain_id=2,
height=3, time=4, data_hash=7, and include consensus_hash=10 if present), then
regenerate Go bindings (buf) so CelestiaAppFetcher and mapBlockResponse()
consume correct fields; also update/restore celestia_app_test.go mocks to
construct cometpb.Header via the regenerated types (not the old tags) and re-run
tests.

---

Duplicate comments:
In `@pkg/fetch/celestia_app.go`:
- Around line 213-214: The code calls hdr.Time.AsTime() without checking
hdr.Time for nil (see hdr := block.Header and t := hdr.Time.AsTime()), which can
panic for pruned/malformed responses; update the block handling in the function
to check if hdr.Time == nil before calling AsTime() and handle that case (e.g.,
log/warn and return an error or use a safe zero/default time) so you avoid a
nil-pointer dereference when processing block.Header.Time.

---

Nitpick comments:
In `@config/load.go`:
- Around line 154-185: The validator in Load()/validate() re-applies defaults
for BackfillSource, CelestiaAppDBBackend, and CelestiaAppDBLayout that
DefaultConfig() already sets, creating duplicate defaulting logic; consolidate
defaults into a single place by removing the backup default assignments in the
"app" case (the blocks that set ds.BackfillSource = "rpc",
ds.CelestiaAppDBBackend = "auto", ds.CelestiaAppDBLayout = "auto") and rely on
DefaultConfig() to supply them, or alternatively move those defaults into
validate()/Load() and remove them from DefaultConfig(); update and run tests to
ensure behavior is unchanged.

In `@pkg/backfill/db/source_test.go`:
- Line 29: Remove the redundant re-capture statement "tc := tc" inside the
table-driven test loop in source_test.go: locate the loop that iterates over
test cases using the variable named tc and delete the "tc := tc" line so the
per-iteration tc binding provided by Go 1.25 is used directly; then run the
tests (and go vet/static checks) to confirm nothing else relies on that extra
capture.

In `@pkg/backfill/db/source.go`:
- Around line 556-589: The helpers used only in tests (writableKV,
writablePebble, writableLevel, openWritable, and the encode* / writeTestBlock
helpers referenced in the diff) are defined in a non-test file and are being
compiled into production; move all of these symbols into a new _test.go file
(e.g., source_testhelpers_test.go) so they are only built for tests, update any
imports or package-level visibility as needed, and remove their definitions from
source.go; ensure function/type names (writableKV, writablePebble.put/close,
writableLevel.put/close, openWritable, and all encode* / writeTestBlock helpers)
remain unchanged so tests continue to compile.
- Around line 529-553: The openKV function's "auto" case swallows the
pebble.Open error; capture the first error (e.g., errPebble) when calling
pebble.Open(path, ...) inside the "auto" branch and emit a debug-level log with
the path and errPebble before attempting leveldb.OpenFile; keep the subsequent
leveldb attempt and final error behavior unchanged so you only add a debug log
for pebble.Open failures (referencing openKV, the "auto" case, and pebble.Open).

Comment on lines 215 to 219

func (f *CelestiaAppFetcher) parseWSBlockEvent(msg []byte) (*types.Header, error) {
	var event cometWSEvent
	if err := json.Unmarshal(msg, &event); err != nil {
		return nil, fmt.Errorf("unmarshal ws event: %w", err)
	}

	// Skip non-result messages (e.g. subscription confirmation has empty result).
	if len(event.Result.Data.Value) == 0 {
		return nil, nil //nolint:nilnil
	}

	var blockValue cometBlockEventValue
	if err := json.Unmarshal(event.Result.Data.Value, &blockValue); err != nil {
		return nil, fmt.Errorf("unmarshal block event value: %w", err)
	}

	hdr := blockValue.Block.Header
	t, err := time.Parse(time.RFC3339Nano, hdr.Time)
	if err != nil {
		return nil, fmt.Errorf("parse header time: %w", err)
	}

-	raw, err := json.Marshal(blockValue)
+	raw, err := json.Marshal(block)
	if err != nil {
		return nil, fmt.Errorf("marshal raw header: %w", err)
	}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check RawHeader definition and usage
rg -n 'RawHeader' --type=go -B2 -A2

Repository: evstack/apex

Length of output: 8652


🏁 Script executed:

#!/bin/bash
# Also examine the file context to understand the struct definition
cat -n pkg/fetch/celestia_app.go | head -250 | tail -100

Repository: evstack/apex

Length of output: 2922


🏁 Script executed:

#!/bin/bash
# Check for Block struct definition to understand what's being marshaled
rg -n 'type Block struct' --type=go -A10

Repository: evstack/apex

Length of output: 1231


RawHeader field name and error message suggest header-only serialization, but json.Marshal(block) actually serializes the entire block including transactions.

The field is named RawHeader and the error message says "marshal raw header", both indicating the intent to store only header metadata. However, the implementation serializes the full cometpb.Block (Header + Data.Txs). Since RawHeader is exposed via APIs, stored in the database, and sent to clients, this unnecessarily includes all transaction data. Either marshal only the header (block.Header instead of block), or rename the field to accurately reflect that it contains the entire block.
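
If header-only storage is the intent, the fix is a one-line change at the marshal site (a sketch against the snippet above):

```go
// Serialize only the header so RawHeader matches its name; block.Data.Txs
// stays out of the stored and client-facing payload.
raw, err := json.Marshal(block.Header)
if err != nil {
	return nil, fmt.Errorf("marshal raw header: %w", err)
}
```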

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app.go` around lines 215 - 219, The code assigns
json.Marshal(block) to the RawHeader field and logs "marshal raw header", but
that serializes the entire cometpb.Block (including Data.Txs) instead of only
the header; update the serialization to json.Marshal(block.Header) so RawHeader
truly contains only header metadata (or, if intended to store the full block,
rename the field from RawHeader and change the error message to reflect
full-block marshalling), and update the error text and any usages referencing
RawHeader, json.Marshal(block) and cometpb.Block accordingly.

@tac0turtle tac0turtle merged commit 9b5d077 into main Feb 20, 2026
4 checks passed
@tac0turtle tac0turtle deleted the marko/grpc_backfill branch February 20, 2026 15:02