
feat: add CelestiaAppFetcher and GetBlobByCommitment (#31)

Merged
tac0turtle merged 3 commits into main from feat/phase4-extended-features
Feb 19, 2026

Conversation


@tac0turtle tac0turtle commented Feb 19, 2026

Implements two Phase 4 features:

  • GetBlobByCommitment: query blobs by commitment alone (no height/namespace required), with dedicated DB index, exposed via JSON-RPC, gRPC, and CLI.
  • CelestiaAppFetcher: alternative DataFetcher that ingests from celestia-app consensus nodes via CometBFT RPC, enabling indexing without a DA node. Includes BlobTx wire format parser using protowire (no new deps beyond promoting gorilla/websocket to direct).

Overview

Summary by CodeRabbit

  • New Features

    • Retrieve blobs by cryptographic commitment via CLI, gRPC, and JSON-RPC.
    • Option to index via Celestia App (CometBFT RPC) instead of a local node.
    • Real-time header subscriptions over WebSocket.
    • Faster commitment lookups via a new DB index.
  • Tests

    • Extensive tests covering commitment lookup, BlobTx parsing, app fetcher, and streaming behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai bot commented Feb 19, 2026


📝 Walkthrough

Adds blob-by-commitment lookup across API, store, CLI, and tests; introduces CelestiaAppFetcher as an alternative DataFetcher with HTTP/WS support; implements BlobTx parsing utilities; updates configuration to select datasource type and adds a DB index and migrations.

Changes

  • GetByCommitment API (proto/apex/v1/blob.proto, pkg/api/grpc/blob_service.go, pkg/api/jsonrpc/blob.go, pkg/api/service.go): Adds the GetByCommitment proto, gRPC handler, JSON-RPC handler, and a service method delegating to store.GetBlobByCommitment that returns the marshaled blob JSON.
  • Store, SQLite & Migrations (pkg/store/store.go, pkg/store/sqlite.go, pkg/store/migrations/002_commitment_index.sql, pkg/store/sqlite_test.go): Adds Store.GetBlobByCommitment to the interface, the SQLite implementation and query, a migration that indexes blobs.commitment, and unit tests for the success and not-found cases.
  • Fetch: Celestia App Fetcher (pkg/fetch/celestia_app.go, pkg/fetch/celestia_app_test.go): New CelestiaAppFetcher implementing DataFetcher: HTTP RPC plus WebSocket header subscription, GetBlobs/GetHeader/GetNetworkHead, Close, with comprehensive tests and parsing helpers.
  • Fetch: BlobTx parsing (pkg/fetch/blobtx.go, pkg/fetch/blobtx_test.go): Parses BlobTx envelopes, extracts blobs and PayForBlobs metadata, filters by namespace, and includes extensive unit tests for parsing and extraction.
  • CLI wiring (cmd/apex/blob_cmd.go, cmd/apex/main.go): Adds blobGetByCommitmentCmd; main.go selects the datasource type (node/app), constructs the appropriate fetcher(s), and wires DataFetcher/ProofForwarder into services.
  • Configuration & Validation (config/config.go, config/load.go): Adds DataSourceConfig.Type and CelestiaAppURL, a default type of "node", and centralized validation of the datasource type and required URLs.
  • API Tests & Mocks (pkg/api/grpc/server_test.go, pkg/api/jsonrpc/server_test.go, pkg/api/service_test.go, pkg/sync/mock_test.go, pkg/store/*_test.go): Adds mock GetBlobByCommitment implementations and tests across the gRPC/JSON-RPC/service/sync/store suites.
  • Dependency (go.mod): Promotes github.com/gorilla/websocket to a direct dependency (v1.4.2).

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant CLI as CLI / RPC Client
    participant Service as Service
    participant Store as SQLiteStore
    participant DB as Database

    CLI->>Service: BlobGetByCommitment(commitment)
    Service->>Store: GetBlobByCommitment(commitment)
    Store->>DB: SELECT * FROM blobs WHERE commitment = ?
    DB-->>Store: row (blob)
    Store-->>Service: *types.Blob
    Service->>Service: MarshalBlob(blob)
    Service-->>CLI: JSON-encoded Blob
sequenceDiagram
    autonumber
    participant Main as main.go
    participant Config as config.Load()
    participant FetcherApp as CelestiaAppFetcher
    participant FetcherNode as CelestiaNodeFetcher
    participant Service as Service

    Main->>Config: Load configuration
    Config-->>Main: DataSourceConfig{Type, URLs}
    Main->>Main: switch on Type
    alt Type == "app"
        Main->>FetcherApp: NewCelestiaAppFetcher(appURL)
        FetcherApp-->>Main: appFetcher (no proofFwd)
        Main->>Service: NewService(dataFetcher=appFetcher, proofFwd=nil)
    else Type == "node"
        Main->>FetcherNode: NewCelestiaNodeFetcher(nodeURL)
        FetcherNode-->>Main: nodeFetcher
        Main->>Service: NewService(dataFetcher=nodeFetcher, proofFwd=nodeFetcher)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Poem

🐰 A tiny commit hops from hash to light,
I fetch by promise through day and night.
WebSockets hum and SQL flowers sing,
Blobs unfurl bright on a data string.
Hooray — indexed hops make queries right!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 35.29%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The title clearly and concisely summarizes the two main features being added: CelestiaAppFetcher for app-based data fetching and GetBlobByCommitment for commitment-based blob retrieval.


@gemini-code-assist

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data indexing capabilities by introducing a new method to retrieve blobs using only their cryptographic commitment, streamlining data access. Additionally, it expands the system's flexibility by enabling direct integration with celestia-app consensus nodes, offering an alternative data source for indexing and reducing dependency on dedicated DA nodes.

Highlights

  • Blob Query by Commitment: Implemented the ability to query blobs solely by their commitment, removing the need for height or namespace. This feature is supported with a dedicated database index and exposed through JSON-RPC, gRPC, and CLI interfaces.
  • CelestiaAppFetcher Integration: Introduced an alternative data fetching mechanism, CelestiaAppFetcher, which allows indexing from celestia-app consensus nodes via CometBFT RPC. This enables indexing without relying on a Celestia DA node and includes a BlobTx wire format parser.


Changelog
  • cmd/apex/blob_cmd.go
    • Added a new CLI command blobGetByCommitmentCmd to query blobs by commitment.
  • cmd/apex/main.go
    • Modified the runIndexer function to dynamically select between CelestiaNodeFetcher and CelestiaAppFetcher based on configuration.
    • Updated logging to include the selected data source type.
  • config/config.go
    • Introduced Type and CelestiaAppURL fields to DataSourceConfig to specify the data source backend.
    • Updated DefaultConfig to set the default data source type to 'node'.
  • config/load.go
    • Updated the default configuration YAML to include new data_source options for type and celestia_app_url.
    • Implemented validateDataSource to ensure correct URL configuration based on the chosen data source type.
    • Refactored the main validate function to use validateDataSource and removed redundant namespace validation.
  • go.mod
    • Promoted github.com/gorilla/websocket to a direct dependency.
  • pkg/api/grpc/blob_service.go
    • Added the GetByCommitment method to the BlobServiceServer for gRPC queries.
  • pkg/api/grpc/gen/apex/v1/blob.pb.go
    • Generated new protobuf messages GetByCommitmentRequest and GetByCommitmentResponse.
    • Updated message type and dependency indices to reflect the new messages.
  • pkg/api/grpc/gen/apex/v1/blob_grpc.pb.go
    • Generated new gRPC service definitions to include the GetByCommitment method.
  • pkg/api/grpc/server_test.go
    • Added a mock implementation for GetBlobByCommitment in mockStore for testing purposes.
  • pkg/api/jsonrpc/blob.go
    • Added the GetByCommitment method to the BlobHandler for JSON-RPC queries.
  • pkg/api/jsonrpc/server_test.go
    • Added a mock implementation for GetBlobByCommitment in mockStore for testing purposes.
  • pkg/api/service.go
    • Implemented the BlobGetByCommitment method to retrieve blobs from the store by commitment.
  • pkg/api/service_test.go
    • Added unit tests for BlobGetByCommitment and BlobGetByCommitmentNotFound.
  • pkg/fetch/blobtx.go
    • Added a new file containing functions to parse BlobTx envelopes and extract blobs from CometBFT transactions.
  • pkg/fetch/blobtx_test.go
    • Added a new file with comprehensive unit tests for BlobTx parsing logic.
  • pkg/fetch/celestia_app.go
    • Added a new file implementing CelestiaAppFetcher, which fetches data from celestia-app CometBFT RPC endpoints.
  • pkg/fetch/celestia_app_test.go
    • Added a new file with unit tests for CelestiaAppFetcher functionalities, including header and blob fetching, and header subscriptions.
  • pkg/store/migrations/002_commitment_index.sql
    • Added a new SQL migration to create an index on the commitment column in the blobs table for efficient lookups.
  • pkg/store/sqlite.go
    • Updated the migrate function to support multiple schema migration steps.
    • Implemented the GetBlobByCommitment method to query blobs using the new commitment index.
  • pkg/store/sqlite_test.go
    • Added unit tests for GetBlobByCommitment and GetBlobByCommitmentNotFound in the SQLite store.
  • pkg/store/store.go
    • Added GetBlobByCommitment to the Store interface definition.
  • pkg/sync/mock_test.go
    • Added a mock implementation for GetBlobByCommitment in mockStore for testing purposes.
  • proto/apex/v1/blob.proto
    • Added new protobuf messages GetByCommitmentRequest and GetByCommitmentResponse.
    • Defined the new GetByCommitment RPC method in the BlobService.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (9)
pkg/fetch/blobtx.go (1)

108-110: Strict rejection of unknown wire types may break forward compatibility.

If a future celestia-app version adds a field with a fixed32/fixed64 wire type to the blob proto, this parser will fail on all blobs. Consider skipping unknown wire types instead of erroring (protowire provides ConsumeFixed32/ConsumeFixed64 and ConsumeGroup for this).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/blobtx.go` around lines 108 - 110, The parser currently errors on
unknown wire types (default branch returning fmt.Errorf) which breaks forward
compatibility; update the default handling in pkg/fetch/blobtx.go (the switch
that currently returns rawBlob{} error) to consume and skip unknown/unsupported
wire types instead of failing: use protowire.ConsumeFixed32,
protowire.ConsumeFixed64 and protowire.ConsumeGroup (or the appropriate
protowire.Consume* helpers) based on the wire type value so the parser advances
past the unknown field, then continue parsing and return rawBlob as before; keep
existing behavior for known fields but replace the error-return in the default
branch with calls to protowire consume functions and ignore the consumed data.
pkg/api/service_test.go (2)

57-66: Use bytes.Equal instead of string() cast comparison.

Same nit as the other mock implementations: bytes.Equal(blobs[i].Commitment, commitment) is more idiomatic and avoids string allocation. Requires adding "bytes" to the import block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/service_test.go` around lines 57 - 66, Replace the string-cast
comparison in mockStore.GetBlobByCommitment with bytes.Equal to avoid
allocations: change the conditional to use bytes.Equal(blobs[i].Commitment,
commitment) and add "bytes" to the import block; keep the rest of the method and
the ErrNotFound return unchanged.

153-186: New tests don't follow the table-driven pattern required by coding guidelines.

TestServiceBlobGetByCommitment and TestServiceBlobGetByCommitmentNotFound are natural candidates for a single table-driven test. As per coding guidelines, **/*_test.go requires the table-driven tests pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/service_test.go` around lines 153 - 186, Combine
TestServiceBlobGetByCommitment and TestServiceBlobGetByCommitmentNotFound into a
single table-driven test that iterates over cases (e.g., "found" and "not
found"); for each case create/prepare the store using newMockStore() (and
testNamespace/commitment for the found case), construct the service via
NewService(..., &mockFetcher{}, ..., NewNotifier(...), zerolog.Nop()), call
svc.BlobGetByCommitment(ctx, commitment) and assert behavior per case (for
"found" unmarshal json.RawMessage and check presence of "commitment" key, for
"not found" assert errors.Is(err, store.ErrNotFound)); use t.Run with
descriptive names and keep setup/expectation logic inside each case to match the
project's table-driven test pattern for *_test.go files.
pkg/sync/mock_test.go (1)

83-92: Prefer bytes.Equal for byte-slice commitment comparison.

string() conversion works but allocates; bytes.Equal(m.blobs[i].Commitment, commitment) is idiomatic and zero-allocation. Requires adding "bytes" to the import block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/mock_test.go` around lines 83 - 92, Replace the string-based
byte-slice comparison in mockStore.GetBlobByCommitment with bytes.Equal to avoid
allocations: inside GetBlobByCommitment compare m.blobs[i].Commitment and the
incoming commitment using bytes.Equal(m.blobs[i].Commitment, commitment) and add
the "bytes" package to the file imports; keep locking and return behavior
unchanged.
pkg/api/jsonrpc/server_test.go (1)

65-65: Use bytes.Equal instead of string() cast comparison.

bytes is already imported; bytes.Equal(blobs[i].Commitment, commitment) is more idiomatic and avoids allocations from the string conversion.

♻️ Proposed refactor
-			if string(blobs[i].Commitment) == string(commitment) {
+			if bytes.Equal(blobs[i].Commitment, commitment) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/jsonrpc/server_test.go` at line 65, Replace the string-cast
comparison used in the test with a bytes equality check: locate the comparison
using string(blobs[i].Commitment) == string(commitment) (within
pkg/api/jsonrpc/server_test.go, in the loop over blobs) and change it to use
bytes.Equal(blobs[i].Commitment, commitment) so it uses the already-imported
bytes package and avoids allocations.
pkg/store/sqlite_test.go (1)

134-170: New tests don't follow the table-driven pattern required by coding guidelines.

TestGetBlobByCommitment and TestGetBlobByCommitmentNotFound can be merged into a single table-driven test covering happy-path and not-found cases.

♻️ Proposed refactor
-func TestGetBlobByCommitment(t *testing.T) {
-	s := openTestDB(t)
-	ctx := context.Background()
-	ns := testNamespace(1)
-
-	blobs := []types.Blob{
-		{Height: 10, Namespace: ns, Commitment: []byte("c1"), Data: []byte("d1"), ShareVersion: 0, Index: 0},
-		{Height: 10, Namespace: ns, Commitment: []byte("c2"), Data: []byte("d2"), ShareVersion: 0, Index: 1},
-	}
-	if err := s.PutBlobs(ctx, blobs); err != nil {
-		t.Fatalf("PutBlobs: %v", err)
-	}
-
-	got, err := s.GetBlobByCommitment(ctx, []byte("c2"))
-	if err != nil {
-		t.Fatalf("GetBlobByCommitment: %v", err)
-	}
-	if string(got.Data) != "d2" {
-		t.Errorf("Data = %q, want %q", got.Data, "d2")
-	}
-	if got.Height != 10 {
-		t.Errorf("Height = %d, want 10", got.Height)
-	}
-	if got.Index != 1 {
-		t.Errorf("Index = %d, want 1", got.Index)
-	}
-}
-
-func TestGetBlobByCommitmentNotFound(t *testing.T) {
-	s := openTestDB(t)
-	ctx := context.Background()
-
-	_, err := s.GetBlobByCommitment(ctx, []byte("nonexistent"))
-	if !errors.Is(err, ErrNotFound) {
-		t.Fatalf("expected ErrNotFound, got %v", err)
-	}
-}
+func TestGetBlobByCommitment(t *testing.T) {
+	ns := testNamespace(1)
+	seed := []types.Blob{
+		{Height: 10, Namespace: ns, Commitment: []byte("c1"), Data: []byte("d1"), Index: 0},
+		{Height: 10, Namespace: ns, Commitment: []byte("c2"), Data: []byte("d2"), Index: 1},
+	}
+	tests := []struct {
+		name       string
+		commitment []byte
+		wantData   string
+		wantHeight uint64
+		wantIndex  int
+		wantErr    error
+	}{
+		{name: "found", commitment: []byte("c2"), wantData: "d2", wantHeight: 10, wantIndex: 1},
+		{name: "not found", commitment: []byte("nonexistent"), wantErr: ErrNotFound},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := openTestDB(t)
+			ctx := context.Background()
+			if err := s.PutBlobs(ctx, seed); err != nil {
+				t.Fatalf("PutBlobs: %v", err)
+			}
+			got, err := s.GetBlobByCommitment(ctx, tt.commitment)
+			if tt.wantErr != nil {
+				if !errors.Is(err, tt.wantErr) {
+					t.Fatalf("got err %v, want %v", err, tt.wantErr)
+				}
+				return
+			}
+			if err != nil {
+				t.Fatalf("GetBlobByCommitment: %v", err)
+			}
+			if string(got.Data) != tt.wantData {
+				t.Errorf("Data = %q, want %q", got.Data, tt.wantData)
+			}
+			if got.Height != tt.wantHeight {
+				t.Errorf("Height = %d, want %d", got.Height, tt.wantHeight)
+			}
+			if got.Index != tt.wantIndex {
+				t.Errorf("Index = %d, want %d", got.Index, tt.wantIndex)
+			}
+		})
+	}
+}

As per coding guidelines, **/*_test.go requires the table-driven tests pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite_test.go` around lines 134 - 170, Merge
TestGetBlobByCommitment and TestGetBlobByCommitmentNotFound into a single
table-driven test (e.g. TestGetBlobByCommitment) that defines a slice of cases
with fields like name, commitment ([]byte), wantErr (error or nil), and expected
(Data, Height, Index). In the test body call openTestDB and PutBlobs once, then
iterate cases running t.Run(case.name, func(t *testing.T) { ctx :=
context.Background(); got, err := s.GetBlobByCommitment(ctx, case.commitment);
if case.wantErr != nil { if !errors.Is(err, case.wantErr) { t.Fatalf(...) }
return } if err != nil { t.Fatalf(...) } compare got.Data/Height/Index to
case.expected and fail with t.Errorf on mismatch }). Ensure you reference
GetBlobByCommitment, openTestDB, PutBlobs, and ErrNotFound in the cases and
assertions.
cmd/apex/main.go (2)

109-112: Redundant dsType normalization — dead code after Load() validates the type.

config.Load() calls validateDataSource() which rejects any unknown type (including ""), and DefaultConfig() initialises Type to "node" before YAML decoding. By the time startCmd.RunE runs, cfg.DataSource.Type is guaranteed to be "node" or "app". The if dsType == "" branch is never taken.

♻️ Simplification
-		dsType := cfg.DataSource.Type
-		if dsType == "" {
-			dsType = "node"
-		}
-		startLog := log.Info().
+		startLog := log.Info().
 			Str("version", version).
-			Str("datasource_type", dsType).
+			Str("datasource_type", cfg.DataSource.Type).
 			Int("namespaces", len(cfg.DataSource.Namespaces))
-		if dsType == "app" {
+		if cfg.DataSource.Type == "app" {
 			startLog = startLog.Str("app_url", cfg.DataSource.CelestiaAppURL)
 		} else {
 			startLog = startLog.Str("node_url", cfg.DataSource.CelestiaNodeURL)
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/apex/main.go` around lines 109 - 112, The dsType nil/empty check in
startCmd.RunE is redundant because cfg.DataSource.Type is already normalized by
DefaultConfig() and validated by config.Load()/validateDataSource(), so remove
the dead branch and use cfg.DataSource.Type directly; update startCmd.RunE to
reference cfg.DataSource.Type without the if dsType == "" fallback (keep any
existing uses of dsType but assign it directly from cfg.DataSource.Type) and run
tests to ensure nothing else depends on the removed fallback.

186-206: Missing default case leaves dataFetcher nil — add a defensive guard.

If cfg.DataSource.Type doesn't match either case (blocked by Load() validation today, but not enforced here), dataFetcher stays nil (the interface zero value). defer dataFetcher.Close() on line 206 would then panic when runIndexer returns. A default branch makes the unreachability explicit and protects against future callers bypassing Load().

🛡️ Proposed fix
 	switch cfg.DataSource.Type {
 	case "app":
 		...
 	case "node", "":
 		...
+	default:
+		return fmt.Errorf("unsupported datasource type %q", cfg.DataSource.Type)
 	}
 	defer dataFetcher.Close() //nolint:errcheck
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/apex/main.go` around lines 186 - 206, The switch on cfg.DataSource.Type
can leave dataFetcher nil; add a default branch in the switch that returns an
explicit error (e.g., fmt.Errorf("unsupported data source type: %q",
cfg.DataSource.Type)) so callers can't fall through to defer dataFetcher.Close()
with a nil interface. Update the switch containing NewCelestiaAppFetcher and
NewCelestiaNodeFetcher (where dataFetcher and proofFwd are set) to include this
default and ensure any early returns occur before the defer dataFetcher.Close()
call.
pkg/store/sqlite.go (1)

113-143: defer tx.Rollback() inside a loop accumulates defers until migrate() returns.

All deferred rollbacks pile up and only fire when migrate() exits. For committed transactions the rollback is a no-op, so correctness is preserved, but it is non-idiomatic and doesn't scale as migrations grow. Extract the per-migration work into a helper so the defer is scoped to each invocation.

♻️ Proposed refactor: extract `applyMigration` helper
+func (s *SQLiteStore) applyMigration(m migrationStep) error {
+	ddl, err := migrations.ReadFile(m.file)
+	if err != nil {
+		return fmt.Errorf("read migration %d: %w", m.version, err)
+	}
+	tx, err := s.writer.Begin()
+	if err != nil {
+		return fmt.Errorf("begin migration %d tx: %w", m.version, err)
+	}
+	defer tx.Rollback() //nolint:errcheck
+
+	if _, err := tx.Exec(string(ddl)); err != nil {
+		return fmt.Errorf("exec migration %d: %w", m.version, err)
+	}
+	if _, err := tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", m.version)); err != nil {
+		return fmt.Errorf("set user_version to %d: %w", m.version, err)
+	}
+	return tx.Commit()
+}

 func (s *SQLiteStore) migrate() error {
 	var version int
 	if err := s.writer.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
 		return fmt.Errorf("read user_version: %w", err)
 	}
 
 	for _, m := range allMigrations {
 		if version >= m.version {
 			continue
 		}
-
-		ddl, err := migrations.ReadFile(m.file)
-		if err != nil {
-			return fmt.Errorf("read migration %d: %w", m.version, err)
-		}
-
-		tx, err := s.writer.Begin()
-		if err != nil {
-			return fmt.Errorf("begin migration %d tx: %w", m.version, err)
-		}
-		defer tx.Rollback() //nolint:errcheck
-
-		if _, err := tx.Exec(string(ddl)); err != nil {
-			return fmt.Errorf("exec migration %d: %w", m.version, err)
-		}
-		if _, err := tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", m.version)); err != nil {
-			return fmt.Errorf("set user_version to %d: %w", m.version, err)
-		}
-
-		if err := tx.Commit(); err != nil {
-			return fmt.Errorf("commit migration %d: %w", m.version, err)
+		if err := s.applyMigration(m); err != nil {
+			return err
 		}
 		version = m.version
 	}
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite.go` around lines 113 - 143, The loop in migrate() currently
defers tx.Rollback() inside each iteration which accumulates defers until
migrate() returns; extract the per-migration logic into a helper (e.g.,
applyMigration) that takes the store (s), the migration m (with m.version and
m.file), and returns the new version/error so the defer tx.Rollback() is
declared inside that helper and is released at the end of each migration; move
reading the ddl, beginning the transaction (s.writer.Begin), executing ddl and
PRAGMA, committing, and setting version into applyMigration and call it from the
loop so rollbacks are scoped per-migration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/api/grpc/server_test.go`:
- Around line 62-71: Add a new integration test named
TestGRPCBlobGetByCommitment in pkg/api/grpc/server_test.go modeled after
TestGRPCBlobGet and TestGRPCBlobGetAll: create a mockStore containing blobs (the
mock method GetBlobByCommitment already present), start the gRPC server used by
other tests, create a gRPC client, call the blob retrieval RPC that corresponds
to the server's GetBlobByCommitment endpoint with a known commitment, and assert
the returned Blob matches the expected blob and no error occurred; ensure proper
context, teardown of the server and client, and use the same helper/setup
functions used by TestGRPCBlobGet/TestGRPCBlobGetAll for consistency.

In `@pkg/api/jsonrpc/blob.go`:
- Around line 26-29: The JSON-RPC path lacks an empty-commitment guard and
delegates to svc.BlobGetByCommitment, causing an ErrNotFound instead of the
gRPC-like InvalidArgument; add a guard inside the service implementation of
BlobGetByCommitment to validate commitment is non-empty and return a clear
"invalid argument" error (the same sentinel or mapped error used by the gRPC
layer) when len(commitment) == 0 so both BlobHandler.GetByCommitment and the
gRPC blob_service.go behavior are consistent.

In `@pkg/api/jsonrpc/server_test.go`:
- Around line 62-71: Add JSON-RPC integration tests that exercise the new
mockStore.GetBlobByCommitment via the RPC endpoint "blob.GetByCommitment":
follow the pattern in TestJSONRPCBlobGetAll to add two subtests (happy-path and
not-found) that call doRPC(t, srv, "blob.GetByCommitment", params) and assert
the returned blob matches the stored blob for the success case and that a
not-found error is returned for the missing-commitment case; locate usage points
around TestJSONRPCBlobGetAll and the mockStore.GetBlobByCommitment
implementation to mirror request/response encoding and error assertions.

In `@pkg/fetch/celestia_app_test.go`:
- Around line 215-243: The handler passed to httptest.NewServer currently calls
t.Errorf from its goroutine (see the anonymous http.HandlerFunc used to create
ts), which can race with test teardown; instead create an error/report channel
(e.g., errCh) or a sync primitive in the test scope and send any handler errors
into it rather than calling t.Errorf directly, then after exercising the server
and closing ts read the channel and call t.Errorf/t.Fatalf from the main test
goroutine (or use t.Logf plus a boolean flag checked after ts.Close()); update
the upgrader.Upgrade error handling and any other in-handler reports to use that
channel/flag and perform the actual t.* assertion in the test goroutine.
- Around line 152-185: The test TestCelestiaAppFetcherGetNetworkHead increments
requestCount from the httptest handler without synchronization causing a
potential data race; change requestCount to an atomic counter (e.g.,
atomic.Int32/atomic.AddInt32/atomic.LoadInt32) or protect it with a mutex so
increments inside the http.HandlerFunc are safe and the final assertion reads
the counter via atomic.Load or under the same mutex; update the handler (where
requestCount++ occurs) and the final check (where requestCount is compared) to
use the chosen synchronized access.
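The two test-hygiene fixes above (report handler errors via a channel instead of calling t.Errorf from the handler goroutine, and make the request counter atomic) can be sketched together. This is a minimal illustration, not the actual test: the `countRequests` helper, the `/block` path check, and the JSON body are stand-ins for the real handler logic.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// countRequests serves requests the way the fixed test would: the counter is
// an atomic.Int32 (safe across the handler goroutine and the test goroutine),
// and unexpected-path errors go into a buffered channel that the caller drains
// after ts.Close(), where t.Errorf would be safe to call.
func countRequests(paths []string) (int32, []error) {
	var requestCount atomic.Int32
	errCh := make(chan error, len(paths)) // buffered so the handler never blocks

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		requestCount.Add(1)
		if r.URL.Path != "/block" {
			// Report instead of calling t.Errorf from this goroutine.
			errCh <- fmt.Errorf("unexpected path %q", r.URL.Path)
			http.NotFound(w, r)
			return
		}
		io.WriteString(w, `{"result":{}}`)
	}))

	for _, p := range paths {
		resp, err := http.Get(ts.URL + p)
		if err == nil {
			resp.Body.Close()
		}
	}
	ts.Close()

	// Back on the main goroutine: drain errCh; this is where t.Errorf belongs.
	close(errCh)
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return requestCount.Load(), errs
}

func main() {
	n, errs := countRequests([]string{"/block", "/block"})
	fmt.Println("requests:", n, "handler errors:", len(errs))
}
```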

In `@pkg/fetch/celestia_app.go`:
- Around line 123-133: SubscribeHeaders currently overwrites f.cancelSub and
leaks the previous subscription; modify SubscribeHeaders so that while holding
f.mu you check if f.cancelSub is non-nil and call it to cancel the old
subscription before assigning the new cancel function, or alternatively return
an error if a subscription must be exclusive; specifically update the logic in
the SubscribeHeaders function to invoke the existing f.cancelSub() (and then set
f.cancelSub = cancel) under the mutex to avoid leaving the prior
goroutine/WebSocket active.
- Around line 173-208: The readHeaderLoop goroutine can block on
conn.ReadMessage() and not observe ctx cancellation; modify readHeaderLoop so
that when ctx is cancelled it forces ReadMessage to unblock by closing the
websocket or setting a read deadline: spawn a small watcher goroutine inside
readHeaderLoop that does <-ctx.Done() then calls conn.Close() (or
conn.SetReadDeadline(time.Now())) to unblock ReadMessage, ensuring you still
defer conn.Close() and preserve existing behavior around sending parsed headers
to out and parsing via parseWSBlockEvent.

---

Nitpick comments:
In `@cmd/apex/main.go`:
- Around line 109-112: The dsType nil/empty check in startCmd.RunE is redundant
because cfg.DataSource.Type is already normalized by DefaultConfig() and
validated by config.Load()/validateDataSource(), so remove the dead branch and
use cfg.DataSource.Type directly; update startCmd.RunE to reference
cfg.DataSource.Type without the if dsType == "" fallback (keep any existing uses
of dsType but assign it directly from cfg.DataSource.Type) and run tests to
ensure nothing else depends on the removed fallback.
- Around line 186-206: The switch on cfg.DataSource.Type can leave dataFetcher
nil; add a default branch in the switch that returns an explicit error (e.g.,
fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type)) so callers
can't fall through to defer dataFetcher.Close() with a nil interface. Update the
switch containing NewCelestiaAppFetcher and NewCelestiaNodeFetcher (where
dataFetcher and proofFwd are set) to include this default and ensure any early
returns occur before the defer dataFetcher.Close() call.
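The default-branch fix can be sketched as follows. The interface and constructor names here are illustrative stubs, not the repository's actual types; the point is that the switch can no longer fall through and leave a nil interface for a later `defer dataFetcher.Close()`.

```go
package main

import "fmt"

// DataFetcher is a stub for the project's fetcher interface.
type DataFetcher interface{ Close() error }

type stubFetcher struct{}

func (stubFetcher) Close() error { return nil }

// newFetcher mirrors the suggested fix: every path either returns a non-nil
// fetcher or an explicit error, so callers never defer Close() on nil.
func newFetcher(dsType string) (DataFetcher, error) {
	switch dsType {
	case "celestia-app":
		return stubFetcher{}, nil // stand-in for NewCelestiaAppFetcher
	case "celestia-node":
		return stubFetcher{}, nil // stand-in for NewCelestiaNodeFetcher
	default:
		return nil, fmt.Errorf("unsupported data source type: %q", dsType)
	}
}

func main() {
	if _, err := newFetcher("bogus"); err != nil {
		fmt.Println(err)
	}
}
```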

In `@pkg/api/jsonrpc/server_test.go`:
- Line 65: Replace the string-cast comparison used in the test with a bytes
equality check: locate the comparison using string(blobs[i].Commitment) ==
string(commitment) (within pkg/api/jsonrpc/server_test.go, in the loop over
blobs) and change it to use bytes.Equal(blobs[i].Commitment, commitment) so it
uses the already-imported bytes package and avoids allocations.

In `@pkg/api/service_test.go`:
- Around line 57-66: Replace the string-cast comparison in
mockStore.GetBlobByCommitment with bytes.Equal to avoid allocations: change the
conditional to use bytes.Equal(blobs[i].Commitment, commitment) and add "bytes"
to the import block; keep the rest of the method and the ErrNotFound return
unchanged.
- Around line 153-186: Combine TestServiceBlobGetByCommitment and
TestServiceBlobGetByCommitmentNotFound into a single table-driven test that
iterates over cases (e.g., "found" and "not found"); for each case
create/prepare the store using newMockStore() (and testNamespace/commitment for
the found case), construct the service via NewService(..., &mockFetcher{}, ...,
NewNotifier(...), zerolog.Nop()), call svc.BlobGetByCommitment(ctx, commitment)
and assert behavior per case (for "found" unmarshal json.RawMessage and check
presence of "commitment" key, for "not found" assert errors.Is(err,
store.ErrNotFound)); use t.Run with descriptive names and keep setup/expectation
logic inside each case to match the project's table-driven test pattern for
*_test.go files.

In `@pkg/fetch/blobtx.go`:
- Around line 108-110: The parser currently errors on unknown wire types
(default branch returning fmt.Errorf) which breaks forward compatibility; update
the default handling in pkg/fetch/blobtx.go (the switch that currently returns
rawBlob{} error) to consume and skip unknown/unsupported wire types instead of
failing: use protowire.ConsumeFixed32, protowire.ConsumeFixed64 and
protowire.ConsumeGroup (or the appropriate protowire.Consume* helpers) based on
the wire type value so the parser advances past the unknown field, then continue
parsing and return rawBlob as before; keep existing behavior for known fields
but replace the error-return in the default branch with calls to protowire
consume functions and ignore the consumed data.
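To show what "consume and skip" means without pulling in protowire, here is a dependency-free sketch of the skipping logic for the default branch. It is a simplification (the length-delimited case assumes a single-byte length prefix); the real fix should use protowire.ConsumeFieldValue, which handles all of this correctly.

```go
package main

import "fmt"

// skipField advances past one field of an unknown wire type, approximating
// what protowire.ConsumeFieldValue does in the parser's default branch.
// Wire types: 0=varint, 1=fixed64, 2=length-delimited, 5=fixed32.
func skipField(typ int, buf []byte) (int, error) {
	switch typ {
	case 0: // varint: continuation bytes have the high bit set
		for i, b := range buf {
			if b < 0x80 {
				return i + 1, nil
			}
		}
		return 0, fmt.Errorf("truncated varint")
	case 1:
		if len(buf) < 8 {
			return 0, fmt.Errorf("truncated fixed64")
		}
		return 8, nil
	case 2:
		// Simplification: assume a single-byte length prefix.
		if len(buf) == 0 || 1+int(buf[0]) > len(buf) {
			return 0, fmt.Errorf("truncated bytes field")
		}
		return 1 + int(buf[0]), nil
	case 5:
		if len(buf) < 4 {
			return 0, fmt.Errorf("truncated fixed32")
		}
		return 4, nil
	default:
		return 0, fmt.Errorf("cannot skip wire type %d", typ)
	}
}

func main() {
	// A fixed32 payload followed by a trailing byte: skip exactly 4 bytes.
	n, err := skipField(5, []byte{1, 2, 3, 4, 0xFF})
	fmt.Println(n, err)
}
```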

In `@pkg/store/sqlite_test.go`:
- Around line 134-170: Merge TestGetBlobByCommitment and
TestGetBlobByCommitmentNotFound into a single table-driven test (e.g.
TestGetBlobByCommitment) that defines a slice of cases with fields like name,
commitment ([]byte), wantErr (error or nil), and expected (Data, Height, Index).
In the test body call openTestDB and PutBlobs once, then iterate cases running
t.Run(case.name, func(t *testing.T) { ctx := context.Background(); got, err :=
s.GetBlobByCommitment(ctx, case.commitment); if case.wantErr != nil { if
!errors.Is(err, case.wantErr) { t.Fatalf(...) } return } if err != nil {
t.Fatalf(...) } compare got.Data/Height/Index to case.expected and fail with
t.Errorf on mismatch }). Ensure you reference GetBlobByCommitment, openTestDB,
PutBlobs, and ErrNotFound in the cases and assertions.

In `@pkg/store/sqlite.go`:
- Around line 113-143: The loop in migrate() currently defers tx.Rollback()
inside each iteration which accumulates defers until migrate() returns; extract
the per-migration logic into a helper (e.g., applyMigration) that takes the
store (s), the migration m (with m.version and m.file), and returns the new
version/error so the defer tx.Rollback() is declared inside that helper and is
released at the end of each migration; move reading the ddl, beginning the
transaction (s.writer.Begin), executing ddl and PRAGMA, committing, and setting
version into applyMigration and call it from the loop so rollbacks are scoped
per-migration.
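The defer-scoping refactor above can be sketched with a stub transaction type (standing in for *sql.Tx — the DDL execution and PRAGMA details are elided). Because the deferred Rollback lives inside the helper, it resolves at the end of each migration instead of accumulating until migrate() returns.

```go
package main

import "fmt"

// tx is a stub for *sql.Tx with just enough state to show the pattern.
type tx struct{ committed bool }

func (t *tx) Rollback() {
	if !t.committed {
		fmt.Println("rolled back")
	}
}

func (t *tx) Commit() { t.committed = true }

// applyMigration scopes the deferred rollback to a single migration: the
// defer fires (as a no-op after Commit) when this helper returns, before the
// next loop iteration in migrate starts.
func applyMigration(version int) error {
	t := &tx{}
	defer t.Rollback()
	// ... read ddl, exec ddl, set PRAGMA user_version ...
	t.Commit()
	fmt.Println("applied migration", version)
	return nil
}

func migrate() error {
	for v := 1; v <= 3; v++ {
		if err := applyMigration(v); err != nil {
			return err
		}
	}
	return nil
}

func main() { _ = migrate() }
```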

In `@pkg/sync/mock_test.go`:
- Around line 83-92: Replace the string-based byte-slice comparison in
mockStore.GetBlobByCommitment with bytes.Equal to avoid allocations: inside
GetBlobByCommitment compare m.blobs[i].Commitment and the incoming commitment
using bytes.Equal(m.blobs[i].Commitment, commitment) and add the "bytes" package
to the file imports; keep locking and return behavior unchanged.

Comment on lines +26 to +29
// GetByCommitment returns a blob matching the given commitment.
func (h *BlobHandler) GetByCommitment(ctx context.Context, commitment []byte) (json.RawMessage, error) {
return h.svc.BlobGetByCommitment(ctx, commitment)
}

⚠️ Potential issue | 🟡 Minor

Missing empty-commitment guard — inconsistent with the gRPC counterpart.

The gRPC handler (blob_service.go line 46) rejects an empty commitment with codes.InvalidArgument. The JSON-RPC path passes it straight to the service, which queries the store and returns ErrNotFound — a misleading signal. Adding the check at the service layer would fix both transports at once.

🛡️ Proposed fix (service layer)
 func (s *Service) BlobGetByCommitment(ctx context.Context, commitment []byte) (json.RawMessage, error) {
+	if len(commitment) == 0 {
+		return nil, fmt.Errorf("commitment is required")
+	}
 	b, err := s.store.GetBlobByCommitment(ctx, commitment)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/jsonrpc/blob.go` around lines 26 - 29, The JSON-RPC path lacks an
empty-commitment guard and delegates to svc.BlobGetByCommitment, causing an
ErrNotFound instead of the gRPC-like InvalidArgument; add a guard inside the
service implementation of BlobGetByCommitment to validate commitment is
non-empty and return a clear "invalid argument" error (the same sentinel or
mapped error used by the gRPC layer) when len(commitment) == 0 so both
BlobHandler.GetByCommitment and the gRPC blob_service.go behavior are
consistent.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces two significant features: the ability to fetch blobs by commitment (GetBlobByCommitment) and a new CelestiaAppFetcher for indexing from a consensus node. The changes are well-structured and include corresponding updates to the CLI, JSON-RPC, gRPC APIs, configuration, and database migrations. The new CelestiaAppFetcher is a great addition, providing more flexibility for deployment. I've included a couple of suggestions to improve configuration validation and API consistency.

Commitments live in MsgPayForBlobs.share_commitments inside the inner
Cosmos SDK tx, not in the blob proto. Rewrites the BlobTx parser to
traverse Tx -> TxBody -> Any -> MsgPayForBlobs and extract signer and
share_commitments. Signer prefers PFB-level, falls back to blob-level
(celestia-app v2+).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (6)
pkg/fetch/celestia_app_test.go (1)

85-153: Consider consolidating GetBlobs tests into a table-driven pattern.

TestCelestiaAppFetcherGetBlobs and TestCelestiaAppFetcherGetBlobsNoMatch share identical server setup and could be combined into a single table-driven test with cases like {name: "match", queryNS: ns, wantCount: 1} and {name: "no match", queryNS: ns99, wantCount: 0}. As per coding guidelines: "Use table-driven tests pattern for test implementation."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_app_test.go` around lines 85 - 153, Combine the two tests
into a single table-driven test (e.g., rename to TestCelestiaAppFetcherGetBlobs)
that defines cases like {name: "match", queryNS: ns, wantCount:1,
wantData:"blob-data", wantCommitment:"c1", wantSigner:"signer"} and {name: "no
match", queryNS: ns99, wantCount:0}; keep the shared httptest server setup and
NewCelestiaAppFetcher(...) once and loop cases with t.Run, calling
f.GetBlobs(ctx, 10, []types.Namespace{case.queryNS}) and asserting
length/contents (for wantCount==0 assert nil or zero length, for wantCount==1
assert Data/Commitment/Signer match). Reference functions/types:
TestCelestiaAppFetcherGetBlobs, GetBlobs, NewCelestiaAppFetcher, ns and ns99,
and reuse buildBlobTx/rawBlob as in the original tests.
pkg/fetch/blobtx.go (2)

127-161: parseMsgPayForBlobs rejects Fixed32/Fixed64 wire types; use ConsumeFieldValue in the default branch for forward-compatibility.

parseAny already handles unknown wire types gracefully via ConsumeFieldValue, but parseMsgPayForBlobs (and identically parseRawBlob at line 241) returns an error for any wire type other than Bytes and Varint. If a future proto revision adds a fixed-width field, or if unknown fields appear (e.g., from newer celestia-app versions), parsing will break unnecessarily.

♻️ Proposed fix — skip unknown wire types instead of erroring

For parseMsgPayForBlobs:

 		case protowire.VarintType:
 			_, n := protowire.ConsumeVarint(buf)
 			if n < 0 {
 				return pfbData{}, fmt.Errorf("field %d: invalid varint", num)
 			}
 			buf = buf[n:]
 		default:
-			return pfbData{}, fmt.Errorf("field %d: unsupported wire type %d", num, typ)
+			n := protowire.ConsumeFieldValue(num, typ, buf)
+			if n < 0 {
+				return pfbData{}, fmt.Errorf("field %d: cannot skip wire type %d", num, typ)
+			}
+			buf = buf[n:]
 		}

Apply the same pattern to parseRawBlob at lines 241-242.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/blobtx.go` around lines 127 - 161, parseMsgPayForBlobs currently
errors for any wire type other than Bytes and Varint; change its default branch
to skip unknown/future wire types using protowire.ConsumeFieldValue so parsing
is forward-compatible (mirror the pattern used in parseAny). Specifically, in
parseMsgPayForBlobs replace the default error with a call to
protowire.ConsumeFieldValue(num, typ, buf), check n < 0 for an error, then
advance buf by n; apply the identical change to parseRawBlob so both functions
skip unknown fixed32/fixed64/other wire types instead of returning an error.

312-371: extractBytesField and extractRepeatedBytesField return aliased slices — safe today but fragile.

Both helpers return sub-slices of the input buffer without copying. This is fine given the current call chain (all relevant data is eventually copied in parseMsgPayForBlobs), but a future caller that retains these slices across buffer reuse could hit data corruption. A brief doc comment noting the aliasing would help prevent misuse.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/blobtx.go` around lines 312 - 371, extractBytesField and
extractRepeatedBytesField currently return subslices that alias the input
buffer; update their documentation to clearly state that the returned
[]byte/[][]byte reference the original data and may be invalidated if the input
buffer is reused, and instruct callers to make a copy (e.g., copy(dst, val))
when they need to retain the data beyond the lifetime of the input buffer;
mention the function names extractBytesField and extractRepeatedBytesField in
the comment so future maintainers see the warning when inspecting these helpers.
pkg/fetch/blobtx_test.go (3)

111-166: Coding guidelines require table-driven tests.

TestParseBlobTxSingleBlob and TestParseBlobTxMultiBlob test the same function with different inputs and could be consolidated into a single table-driven test. Similarly, the error-case tests (TestParseBlobTxNotBlobTx, TestParseBlobTxEmpty) and the extractBlobsFromBlock variants are candidates for the same pattern.

As per coding guidelines, "Use table-driven tests pattern for test implementation".

♻️ Sketch of table-driven approach for parseBlobTx tests
func TestParseBlobTx(t *testing.T) {
	tests := []struct {
		name            string
		input           []byte
		wantErr         bool
		wantBlobCount   int
		wantSigner      string
		wantCommitments []string
		wantData        []string
	}{
		{
			name: "single blob",
			input: buildBlobTx("celestia1abc", [][]byte{[]byte("commit1")},
				rawBlob{Namespace: testNS(1), Data: []byte("hello")},
			),
			wantBlobCount:   1,
			wantSigner:      "celestia1abc",
			wantCommitments: []string{"commit1"},
			wantData:        []string{"hello"},
		},
		{
			name: "multi blob",
			input: buildBlobTx("signer", [][]byte{[]byte("c1"), []byte("c2")},
				rawBlob{Namespace: testNS(1), Data: []byte("a")},
				rawBlob{Namespace: testNS(2), Data: []byte("b")},
			),
			wantBlobCount:   2,
			wantSigner:      "signer",
			wantCommitments: []string{"c1", "c2"},
			wantData:        []string{"a", "b"},
		},
		{
			name:    "not a BlobTx",
			input:   []byte{0x01, 0x02, 0x03},
			wantErr: true,
		},
		{
			name:    "empty input",
			input:   nil,
			wantErr: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			parsed, err := parseBlobTx(tt.input)
			if tt.wantErr {
				if err == nil {
					t.Fatal("expected error")
				}
				return
			}
			if err != nil {
				t.Fatalf("parseBlobTx: %v", err)
			}
			if len(parsed.Blobs) != tt.wantBlobCount {
				t.Fatalf("got %d blobs, want %d", len(parsed.Blobs), tt.wantBlobCount)
			}
			// ... assert data, signer, commitments
		})
	}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/blobtx_test.go` around lines 111 - 166, Consolidate
TestParseBlobTxSingleBlob and TestParseBlobTxMultiBlob into a single
table-driven test (e.g., TestParseBlobTx) that iterates cases calling
parseBlobTx with different inputs; include cases for single-blob, multi-blob,
and error inputs (not-a-BlobTx, empty) and assert expected blob count, signer
(parsed.PFB.Signer), blob data (parsed.Blobs[].Data) and share commitments
(parsed.PFB.ShareCommitments[]) per case; apply the same table-driven pattern to
other related tests such as the parse errors and extractBlobsFromBlock variants
so each scenario is a table entry rather than separate test functions.

12-35: buildBlobProto skips zero-valued fields — consider adding a test case for zero ShareVersion/NamespaceVersion.

Since ShareVersion == 0 and NamespaceVersion == 0 are omitted from the wire format (correct proto3 behavior), the parser's handling of blobs with default-valued fields is never tested. A test case with explicit zero values would confirm parseRawBlob returns the correct defaults.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/blobtx_test.go` around lines 12 - 35, Add a unit test that verifies
parseRawBlob correctly handles default (zero) numeric fields: use buildBlobProto
to create a blob with ShareVersion and NamespaceVersion set to 0 (they will be
omitted from the wire), and non-empty Namespace/Data/Signer as needed, call
parseRawBlob on the bytes, and assert the returned rawBlob has ShareVersion == 0
and NamespaceVersion == 0 while other fields match; reference buildBlobProto and
parseRawBlob (and the rawBlob struct) to locate where to add the test.

182-238: Missing assertion on Blob.Index field.

extractBlobsFromBlock sets a cross-tx blobIndex on each returned blob, but the test never verifies it. Adding assertions for blobs[0].Index and blobs[1].Index would catch regressions in the index-tracking logic (e.g., the blobIndex drift noted in blobtx.go).

♻️ Add Index assertions
 	if string(blobs[0].Signer) != "signer1" {
 		t.Errorf("blobs[0].Signer = %q, want %q", blobs[0].Signer, "signer1")
 	}
+	if blobs[0].Index != 0 {
+		t.Errorf("blobs[0].Index = %d, want 0", blobs[0].Index)
+	}
 
 	// Second matching blob: ns2 from tx2
 	if string(blobs[1].Data) != "d2" {
 		t.Errorf("blobs[1].Data = %q, want %q", blobs[1].Data, "d2")
 	}
 	if string(blobs[1].Signer) != "signer2" {
 		t.Errorf("blobs[1].Signer = %q, want %q", blobs[1].Signer, "signer2")
 	}
+	// blobIndex: tx1 had 2 blobs (indices 0,1), tx2 blob gets index 2
+	if blobs[1].Index != 2 {
+		t.Errorf("blobs[1].Index = %d, want 2", blobs[1].Index)
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/blobtx_test.go` around lines 182 - 238, Add assertions in
TestExtractBlobsFromBlock to verify the cross-tx blob index: check
blobs[0].Index == 0 and blobs[1].Index == 2 (since tx1 has two blobs (indices 0
and 1) and tx2's blob is the third encountered (index 2)); update the test to
t.Errorf/fatal if these expectations on blobs[0].Index and blobs[1].Index fail
so regressions in blobIndex tracking (as used by extractBlobsFromBlock /
blobIndex logic) are detected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/fetch/celestia_app_test.go`:
- Around line 55-83: The test handler in TestCelestiaAppFetcherGetHeader calls
t.Errorf from the httptest goroutine which can race with the main test; replace
the in-handler t.Errorf by storing the unexpected path in a shared variable
(guarded by a sync.Mutex or atomic.Value) and return the 404 response, then
after ts.Close() / before test end assert in the main test goroutine that the
storedPath is empty (or fail the test with t.Fatalf if it was set). Update the
httptest.NewServer handler closure and the test assertions accordingly
(referencing TestCelestiaAppFetcherGetHeader and the handler used to serve
/block).

---

Duplicate comments:
In `@pkg/fetch/celestia_app_test.go`:
- Around line 218-246: The handler goroutine calls t.Errorf directly (in the
httptest.NewServer HTTP handler around the upgrader.Upgrade call), which races
with the test runner; instead create an error channel in the test, send the
error into that channel from the handler (e.g., on upgrader.Upgrade failure),
and perform the t.Errorf/t.Fatalf check from the main test goroutine after the
server interaction completes (read from the channel and fail if non-nil). Update
the code that uses upgrader.Upgrade in the httptest handler to send errors to
that channel and remove direct calls to t.Errorf from inside the handler.
- Around line 155-188: The test uses a plain int requestCount that is
incremented in the httptest handler goroutine and read in the test goroutine
causing a data race; replace it with sync/atomic's atomic.Int32 (declare var
requestCount atomic.Int32 in TestCelestiaAppFetcherGetNetworkHead), change the
increment in the handler to requestCount.Add(1) and change the assertion to
compare int(requestCount.Load()) (or compare to 2 using requestCount.Load()),
and ensure you import sync/atomic; update references to requestCount in this
test only.

---

Nitpick comments:
In `@pkg/fetch/blobtx_test.go`:
- Around line 111-166: Consolidate TestParseBlobTxSingleBlob and
TestParseBlobTxMultiBlob into a single table-driven test (e.g., TestParseBlobTx)
that iterates cases calling parseBlobTx with different inputs; include cases for
single-blob, multi-blob, and error inputs (not-a-BlobTx, empty) and assert
expected blob count, signer (parsed.PFB.Signer), blob data (parsed.Blobs[].Data)
and share commitments (parsed.PFB.ShareCommitments[]) per case; apply the same
table-driven pattern to other related tests such as the parse errors and
extractBlobsFromBlock variants so each scenario is a table entry rather than
separate test functions.
- Around line 12-35: Add a unit test that verifies parseRawBlob correctly
handles default (zero) numeric fields: use buildBlobProto to create a blob with
ShareVersion and NamespaceVersion set to 0 (they will be omitted from the wire),
and non-empty Namespace/Data/Signer as needed, call parseRawBlob on the bytes,
and assert the returned rawBlob has ShareVersion == 0 and NamespaceVersion == 0
while other fields match; reference buildBlobProto and parseRawBlob (and the
rawBlob struct) to locate where to add the test.
- Around line 182-238: Add assertions in TestExtractBlobsFromBlock to verify the
cross-tx blob index: check blobs[0].Index == 0 and blobs[1].Index == 2 (since
tx1 has two blobs (indices 0 and 1) and tx2's blob is the third encountered
(index 2)); update the test to t.Errorf/fatal if these expectations on
blobs[0].Index and blobs[1].Index fail so regressions in blobIndex tracking (as
used by extractBlobsFromBlock / blobIndex logic) are detected.

In `@pkg/fetch/blobtx.go`:
- Around line 127-161: parseMsgPayForBlobs currently errors for any wire type
other than Bytes and Varint; change its default branch to skip unknown/future
wire types using protowire.ConsumeFieldValue so parsing is forward-compatible
(mirror the pattern used in parseAny). Specifically, in parseMsgPayForBlobs
replace the default error with a call to protowire.ConsumeFieldValue(num, typ,
buf), check n < 0 for an error, then advance buf by n; apply the identical
change to parseRawBlob so both functions skip unknown fixed32/fixed64/other
wire types instead of returning an error.
- Around line 312-371: extractBytesField and extractRepeatedBytesField currently
return subslices that alias the input buffer; update their documentation to
clearly state that the returned []byte/[][]byte reference the original data and
may be invalidated if the input buffer is reused, and instruct callers to make a
copy (e.g., copy(dst, val)) when they need to retain the data beyond the
lifetime of the input buffer; mention the function names extractBytesField and
extractRepeatedBytesField in the comment so future maintainers see the warning
when inspecting these helpers.

In `@pkg/fetch/celestia_app_test.go`:
- Around line 85-153: Combine the two tests into a single table-driven test
(e.g., rename to TestCelestiaAppFetcherGetBlobs) that defines cases like {name:
"match", queryNS: ns, wantCount:1, wantData:"blob-data", wantCommitment:"c1",
wantSigner:"signer"} and {name: "no match", queryNS: ns99, wantCount:0}; keep
the shared httptest server setup and NewCelestiaAppFetcher(...) once and loop
cases with t.Run, calling f.GetBlobs(ctx, 10, []types.Namespace{case.queryNS})
and asserting length/contents (for wantCount==0 assert nil or zero length, for
wantCount==1 assert Data/Commitment/Signer match). Reference functions/types:
TestCelestiaAppFetcherGetBlobs, GetBlobs, NewCelestiaAppFetcher, ns and ns99,
and reuse buildBlobTx/rawBlob as in the original tests.

Bug fixes:
- Cancel previous subscription before overwriting cancelSub in SubscribeHeaders
- Close WS connection on ctx cancel to unblock ReadMessage in readHeaderLoop
- Add empty commitment guard in service layer (consistent with gRPC handler)
- Use atomic.Int32 for requestCount in test handler goroutine
- Replace t.Errorf in handler goroutine with atomic.Value checked post-test

Missing test coverage:
- Add TestGRPCBlobGetByCommitment integration test
- Add TestJSONRPCBlobGetByCommitment integration test (found + not found)

Code quality:
- Remove dead dsType normalization in main.go
- Add default case in fetcher type switch to prevent nil dataFetcher
- Extract applyMigration helper to scope defer tx.Rollback per migration
- Use bytes.Equal instead of string cast in all mock GetBlobByCommitment
- Merge commitment store/service tests into table-driven pattern
- Skip unknown wire types in blob/PFB parsers for forward compatibility
- Fix misleading error message for empty datasource type

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@tac0turtle tac0turtle merged commit 41889d6 into main Feb 19, 2026
4 checks passed
@tac0turtle tac0turtle deleted the feat/phase4-extended-features branch February 19, 2026 16:25