feat: add CelestiaAppFetcher and GetBlobByCommitment #31
Conversation
Implements two Phase 4 features:

- GetBlobByCommitment: query blobs by commitment alone (no height/namespace required), with a dedicated DB index, exposed via JSON-RPC, gRPC, and CLI.
- CelestiaAppFetcher: an alternative DataFetcher that ingests from celestia-app consensus nodes via CometBFT RPC, enabling indexing without a DA node.

Includes a BlobTx wire format parser using protowire (no new deps beyond promoting gorilla/websocket to direct).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

Adds blob-by-commitment lookup across API, store, CLI, and tests; introduces CelestiaAppFetcher as an alternative DataFetcher with HTTP/WS support; implements BlobTx parsing utilities; updates configuration to select the datasource type and adds a DB index and migrations.
Sequence Diagram(s)

sequenceDiagram
autonumber
participant CLI as CLI / RPC Client
participant Service as Service
participant Store as SQLiteStore
participant DB as Database
CLI->>Service: BlobGetByCommitment(commitment)
Service->>Store: GetBlobByCommitment(commitment)
Store->>DB: SELECT * FROM blobs WHERE commitment = ?
DB-->>Store: row (blob)
Store-->>Service: *types.Blob
Service->>Service: MarshalBlob(blob)
Service-->>CLI: JSON-encoded Blob
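The lookup step in this diagram is a single indexed query against the new commitment column. The sketch below is illustrative only: the real SQLiteStore method, column names, and scanned fields are assumptions, but the mapping of sql.ErrNoRows to ErrNotFound matches the behaviour the tests in this PR check with errors.Is.

```go
package store

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// ErrNotFound is the sentinel error the API layer matches with errors.Is.
var ErrNotFound = errors.New("blob not found")

// blobRow is a stand-in for types.Blob with only the fields this sketch needs.
type blobRow struct {
	Height       uint64
	Namespace    []byte
	Commitment   []byte
	Data         []byte
	ShareVersion uint32
	Index        int
}

// getBlobByCommitment sketches the commitment-only lookup: one SELECT against
// the commitment index, translating sql.ErrNoRows into ErrNotFound.
// Column names (blob_index, share_version, ...) are assumptions.
func getBlobByCommitment(ctx context.Context, db *sql.DB, commitment []byte) (*blobRow, error) {
	var b blobRow
	err := db.QueryRowContext(ctx,
		`SELECT height, namespace, commitment, data, share_version, blob_index
		   FROM blobs WHERE commitment = ? LIMIT 1`, commitment,
	).Scan(&b.Height, &b.Namespace, &b.Commitment, &b.Data, &b.ShareVersion, &b.Index)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("get blob by commitment: %w", err)
	}
	return &b, nil
}
```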
sequenceDiagram
autonumber
participant Main as main.go
participant Config as config.Load()
participant FetcherApp as CelestiaAppFetcher
participant FetcherNode as CelestiaNodeFetcher
participant Service as Service
Main->>Config: Load configuration
Config-->>Main: DataSourceConfig{Type, URLs}
Main->>Main: switch on Type
alt Type == "app"
Main->>FetcherApp: NewCelestiaAppFetcher(appURL)
FetcherApp-->>Main: appFetcher (no proofFwd)
Main->>Service: NewService(dataFetcher=appFetcher, proofFwd=nil)
else Type == "node"
Main->>FetcherNode: NewCelestiaNodeFetcher(nodeURL)
FetcherNode-->>Main: nodeFetcher
Main->>Service: NewService(dataFetcher=nodeFetcher, proofFwd=nodeFetcher)
end
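The branch that matters in the second diagram is proof forwarding: the app fetcher has no DA node behind it, so the service is wired with a nil proof forwarder, while the node fetcher serves both roles. A hypothetical sketch with stand-in types (the real interfaces and constructors live in pkg/fetch and may differ):

```go
package wiring

import "fmt"

// Stand-in types; everything here is illustrative, not the repository's code.
type DataFetcher interface{ Close() error }
type ProofForwarder interface{}

type appFetcher struct{ url string }

func (appFetcher) Close() error { return nil }

type nodeFetcher struct{ url string }

func (nodeFetcher) Close() error { return nil }

type DataSourceConfig struct {
	Type, CelestiaAppURL, CelestiaNodeURL string
}

// buildFetcher mirrors the branch in the diagram above: "app" yields a fetcher
// with no proof forwarder (nil), while "node" returns one value serving both roles.
func buildFetcher(cfg DataSourceConfig) (DataFetcher, ProofForwarder, error) {
	switch cfg.Type {
	case "app":
		return appFetcher{url: cfg.CelestiaAppURL}, nil, nil
	case "node", "":
		f := nodeFetcher{url: cfg.CelestiaNodeURL}
		return f, f, nil
	default:
		return nil, nil, fmt.Errorf("unsupported datasource type %q", cfg.Type)
	}
}
```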
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the data indexing capabilities by introducing a new method to retrieve blobs using only their cryptographic commitment, streamlining data access. Additionally, it expands the system's flexibility by enabling direct integration with celestia-app consensus nodes, offering an alternative data source for indexing and reducing dependency on dedicated DA nodes.
Actionable comments posted: 7
🧹 Nitpick comments (9)
pkg/fetch/blobtx.go (1)
108-110: Strict rejection of unknown wire types may break forward compatibility.

If a future celestia-app version adds a field with a `fixed32`/`fixed64` wire type to the blob proto, this parser will fail on all blobs. Consider skipping unknown wire types instead of erroring (protowire provides `ConsumeFixed32`/`ConsumeFixed64` and `ConsumeGroup` for this).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/blobtx.go` around lines 108 - 110, The parser currently errors on unknown wire types (default branch returning fmt.Errorf) which breaks forward compatibility; update the default handling in pkg/fetch/blobtx.go (the switch that currently returns rawBlob{} error) to consume and skip unknown/unsupported wire types instead of failing: use protowire.ConsumeFixed32, protowire.ConsumeFixed64 and protowire.ConsumeGroup (or the appropriate protowire.Consume* helpers) based on the wire type value so the parser advances past the unknown field, then continue parsing and return rawBlob as before; keep existing behavior for known fields but replace the error-return in the default branch with calls to protowire consume functions and ignore the consumed data.
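For context, the generic skip-unknown-fields loop with protowire typically looks like the sketch below. It is not the repository's parser and assumes nothing about celestia-app's field numbers; it only illustrates the ConsumeFieldValue fallback this comment recommends.

```go
package fetch

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

// skipUnknownFields walks a protobuf message and skips every field, showing
// how a parser can stay forward-compatible with wire types it does not
// recognise. A real parser would handle known field numbers in a switch and
// fall back to ConsumeFieldValue only in the default branch.
func skipUnknownFields(buf []byte) error {
	for len(buf) > 0 {
		num, typ, n := protowire.ConsumeTag(buf)
		if n < 0 {
			return fmt.Errorf("invalid tag: %w", protowire.ParseError(n))
		}
		buf = buf[n:]

		// ConsumeFieldValue handles varint, fixed32, fixed64, bytes and the
		// deprecated group wire types, so unknown fields never abort parsing.
		n = protowire.ConsumeFieldValue(num, typ, buf)
		if n < 0 {
			return fmt.Errorf("field %d: %w", num, protowire.ParseError(n))
		}
		buf = buf[n:]
	}
	return nil
}
```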
pkg/api/service_test.go (2)

57-66: Use `bytes.Equal` instead of `string()` cast comparison.

Same nit as the other mock implementations: `bytes.Equal(blobs[i].Commitment, commitment)` is more idiomatic and avoids string allocation. Requires adding `"bytes"` to the import block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/service_test.go` around lines 57 - 66, Replace the string-cast comparison in mockStore.GetBlobByCommitment with bytes.Equal to avoid allocations: change the conditional to use bytes.Equal(blobs[i].Commitment, commitment) and add "bytes" to the import block; keep the rest of the method and the ErrNotFound return unchanged.
153-186: New tests don't follow the table-driven pattern required by coding guidelines.

`TestServiceBlobGetByCommitment` and `TestServiceBlobGetByCommitmentNotFound` are natural candidates for a single table-driven test. As per coding guidelines, `**/*_test.go` requires the table-driven tests pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/service_test.go` around lines 153 - 186, Combine TestServiceBlobGetByCommitment and TestServiceBlobGetByCommitmentNotFound into a single table-driven test that iterates over cases (e.g., "found" and "not found"); for each case create/prepare the store using newMockStore() (and testNamespace/commitment for the found case), construct the service via NewService(..., &mockFetcher{}, ..., NewNotifier(...), zerolog.Nop()), call svc.BlobGetByCommitment(ctx, commitment) and assert behavior per case (for "found" unmarshal json.RawMessage and check presence of "commitment" key, for "not found" assert errors.Is(err, store.ErrNotFound)); use t.Run with descriptive names and keep setup/expectation logic inside each case to match the project's table-driven test pattern for *_test.go files.

pkg/sync/mock_test.go (1)
83-92: Prefer `bytes.Equal` for byte-slice commitment comparison.

`string()` conversion works but allocates; `bytes.Equal(m.blobs[i].Commitment, commitment)` is idiomatic and zero-allocation. Requires adding `"bytes"` to the import block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sync/mock_test.go` around lines 83 - 92, Replace the string-based byte-slice comparison in mockStore.GetBlobByCommitment with bytes.Equal to avoid allocations: inside GetBlobByCommitment compare m.blobs[i].Commitment and the incoming commitment using bytes.Equal(m.blobs[i].Commitment, commitment) and add the "bytes" package to the file imports; keep locking and return behavior unchanged.

pkg/api/jsonrpc/server_test.go (1)
65-65: Use `bytes.Equal` instead of `string()` cast comparison.

`bytes` is already imported; `bytes.Equal(blobs[i].Commitment, commitment)` is more idiomatic and avoids allocations from the string conversion.

♻️ Proposed refactor
- if string(blobs[i].Commitment) == string(commitment) {
+ if bytes.Equal(blobs[i].Commitment, commitment) {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/jsonrpc/server_test.go` at line 65, Replace the string-cast comparison used in the test with a bytes equality check: locate the comparison using string(blobs[i].Commitment) == string(commitment) (within pkg/api/jsonrpc/server_test.go, in the loop over blobs) and change it to use bytes.Equal(blobs[i].Commitment, commitment) so it uses the already-imported bytes package and avoids allocations.

pkg/store/sqlite_test.go (1)
134-170: New tests don't follow the table-driven pattern required by coding guidelines.

`TestGetBlobByCommitment` and `TestGetBlobByCommitmentNotFound` can be merged into a single table-driven test covering happy-path and not-found cases.

♻️ Proposed refactor
-func TestGetBlobByCommitment(t *testing.T) {
-	s := openTestDB(t)
-	ctx := context.Background()
-	ns := testNamespace(1)
-
-	blobs := []types.Blob{
-		{Height: 10, Namespace: ns, Commitment: []byte("c1"), Data: []byte("d1"), ShareVersion: 0, Index: 0},
-		{Height: 10, Namespace: ns, Commitment: []byte("c2"), Data: []byte("d2"), ShareVersion: 0, Index: 1},
-	}
-	if err := s.PutBlobs(ctx, blobs); err != nil {
-		t.Fatalf("PutBlobs: %v", err)
-	}
-
-	got, err := s.GetBlobByCommitment(ctx, []byte("c2"))
-	if err != nil {
-		t.Fatalf("GetBlobByCommitment: %v", err)
-	}
-	if string(got.Data) != "d2" {
-		t.Errorf("Data = %q, want %q", got.Data, "d2")
-	}
-	if got.Height != 10 {
-		t.Errorf("Height = %d, want 10", got.Height)
-	}
-	if got.Index != 1 {
-		t.Errorf("Index = %d, want 1", got.Index)
-	}
-}
-
-func TestGetBlobByCommitmentNotFound(t *testing.T) {
-	s := openTestDB(t)
-	ctx := context.Background()
-
-	_, err := s.GetBlobByCommitment(ctx, []byte("nonexistent"))
-	if !errors.Is(err, ErrNotFound) {
-		t.Fatalf("expected ErrNotFound, got %v", err)
-	}
-}
+func TestGetBlobByCommitment(t *testing.T) {
+	ns := testNamespace(1)
+	seed := []types.Blob{
+		{Height: 10, Namespace: ns, Commitment: []byte("c1"), Data: []byte("d1"), Index: 0},
+		{Height: 10, Namespace: ns, Commitment: []byte("c2"), Data: []byte("d2"), Index: 1},
+	}
+	tests := []struct {
+		name       string
+		commitment []byte
+		wantData   string
+		wantHeight uint64
+		wantIndex  int
+		wantErr    error
+	}{
+		{name: "found", commitment: []byte("c2"), wantData: "d2", wantHeight: 10, wantIndex: 1},
+		{name: "not found", commitment: []byte("nonexistent"), wantErr: ErrNotFound},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := openTestDB(t)
+			ctx := context.Background()
+			if err := s.PutBlobs(ctx, seed); err != nil {
+				t.Fatalf("PutBlobs: %v", err)
+			}
+			got, err := s.GetBlobByCommitment(ctx, tt.commitment)
+			if tt.wantErr != nil {
+				if !errors.Is(err, tt.wantErr) {
+					t.Fatalf("got err %v, want %v", err, tt.wantErr)
+				}
+				return
+			}
+			if err != nil {
+				t.Fatalf("GetBlobByCommitment: %v", err)
+			}
+			if string(got.Data) != tt.wantData {
+				t.Errorf("Data = %q, want %q", got.Data, tt.wantData)
+			}
+			if got.Height != tt.wantHeight {
+				t.Errorf("Height = %d, want %d", got.Height, tt.wantHeight)
+			}
+			if got.Index != tt.wantIndex {
+				t.Errorf("Index = %d, want %d", got.Index, tt.wantIndex)
+			}
+		})
+	}
+}

As per coding guidelines, `**/*_test.go` requires the table-driven tests pattern.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/sqlite_test.go` around lines 134 - 170, Merge TestGetBlobByCommitment and TestGetBlobByCommitmentNotFound into a single table-driven test (e.g. TestGetBlobByCommitment) that defines a slice of cases with fields like name, commitment ([]byte), wantErr (error or nil), and expected (Data, Height, Index). In the test body call openTestDB and PutBlobs once, then iterate cases running t.Run(case.name, func(t *testing.T) { ctx := context.Background(); got, err := s.GetBlobByCommitment(ctx, case.commitment); if case.wantErr != nil { if !errors.Is(err, case.wantErr) { t.Fatalf(...) } return } if err != nil { t.Fatalf(...) } compare got.Data/Height/Index to case.expected and fail with t.Errorf on mismatch }). Ensure you reference GetBlobByCommitment, openTestDB, PutBlobs, and ErrNotFound in the cases and assertions.

cmd/apex/main.go (2)
109-112: Redundant `dsType` normalization — dead code after `Load()` validates the type.

`config.Load()` calls `validateDataSource()` which rejects any unknown type (including `""`), and `DefaultConfig()` initialises `Type` to `"node"` before YAML decoding. By the time `startCmd.RunE` runs, `cfg.DataSource.Type` is guaranteed to be `"node"` or `"app"`. The `if dsType == ""` branch is never taken.

♻️ Simplification
- dsType := cfg.DataSource.Type
- if dsType == "" {
-     dsType = "node"
- }
- startLog := log.Info().
+ startLog := log.Info().
      Str("version", version).
-     Str("datasource_type", dsType).
+     Str("datasource_type", cfg.DataSource.Type).
      Int("namespaces", len(cfg.DataSource.Namespaces))
- if dsType == "app" {
+ if cfg.DataSource.Type == "app" {
      startLog = startLog.Str("app_url", cfg.DataSource.CelestiaAppURL)
  } else {
      startLog = startLog.Str("node_url", cfg.DataSource.CelestiaNodeURL)
  }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/apex/main.go` around lines 109 - 112, The dsType nil/empty check in startCmd.RunE is redundant because cfg.DataSource.Type is already normalized by DefaultConfig() and validated by config.Load()/validateDataSource(), so remove the dead branch and use cfg.DataSource.Type directly; update startCmd.RunE to reference cfg.DataSource.Type without the if dsType == "" fallback (keep any existing uses of dsType but assign it directly from cfg.DataSource.Type) and run tests to ensure nothing else depends on the removed fallback.
186-206: Missing `default` case leaves `dataFetcher` nil — add a defensive guard.

If `cfg.DataSource.Type` doesn't match either case (blocked by `Load()` validation today, but not enforced here), `dataFetcher` stays nil (the interface zero value). `defer dataFetcher.Close()` on line 206 would then panic when `runIndexer` returns. A `default` branch makes the unreachability explicit and protects against future callers bypassing `Load()`.

🛡️ Proposed fix
  switch cfg.DataSource.Type {
  case "app":
      ...
  case "node", "":
      ...
+ default:
+     return fmt.Errorf("unsupported datasource type %q", cfg.DataSource.Type)
  }
  defer dataFetcher.Close() //nolint:errcheck

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/apex/main.go` around lines 186 - 206, The switch on cfg.DataSource.Type can leave dataFetcher nil; add a default branch in the switch that returns an explicit error (e.g., fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type)) so callers can't fall through to defer dataFetcher.Close() with a nil interface. Update the switch containing NewCelestiaAppFetcher and NewCelestiaNodeFetcher (where dataFetcher and proofFwd are set) to include this default and ensure any early returns occur before the defer dataFetcher.Close() call.

pkg/store/sqlite.go (1)
113-143: `defer tx.Rollback()` inside a loop accumulates defers until `migrate()` returns.

All deferred rollbacks pile up and only fire when `migrate()` exits. For committed transactions the rollback is a no-op, so correctness is preserved, but it is non-idiomatic and doesn't scale as migrations grow. Extract the per-migration work into a helper so the `defer` is scoped to each invocation.

♻️ Proposed refactor: extract `applyMigration` helper
+func (s *SQLiteStore) applyMigration(m migrationStep) error {
+	ddl, err := migrations.ReadFile(m.file)
+	if err != nil {
+		return fmt.Errorf("read migration %d: %w", m.version, err)
+	}
+	tx, err := s.writer.Begin()
+	if err != nil {
+		return fmt.Errorf("begin migration %d tx: %w", m.version, err)
+	}
+	defer tx.Rollback() //nolint:errcheck
+
+	if _, err := tx.Exec(string(ddl)); err != nil {
+		return fmt.Errorf("exec migration %d: %w", m.version, err)
+	}
+	if _, err := tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", m.version)); err != nil {
+		return fmt.Errorf("set user_version to %d: %w", m.version, err)
+	}
+	return tx.Commit()
+}
 func (s *SQLiteStore) migrate() error {
 	var version int
 	if err := s.writer.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
 		return fmt.Errorf("read user_version: %w", err)
 	}
 	for _, m := range allMigrations {
 		if version >= m.version {
 			continue
 		}
-
-		ddl, err := migrations.ReadFile(m.file)
-		if err != nil {
-			return fmt.Errorf("read migration %d: %w", m.version, err)
-		}
-
-		tx, err := s.writer.Begin()
-		if err != nil {
-			return fmt.Errorf("begin migration %d tx: %w", m.version, err)
-		}
-		defer tx.Rollback() //nolint:errcheck
-
-		if _, err := tx.Exec(string(ddl)); err != nil {
-			return fmt.Errorf("exec migration %d: %w", m.version, err)
-		}
-		if _, err := tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", m.version)); err != nil {
-			return fmt.Errorf("set user_version to %d: %w", m.version, err)
-		}
-
-		if err := tx.Commit(); err != nil {
-			return fmt.Errorf("commit migration %d: %w", m.version, err)
+		if err := s.applyMigration(m); err != nil {
+			return err
 		}
 		version = m.version
 	}
 	return nil
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/sqlite.go` around lines 113 - 143, The loop in migrate() currently defers tx.Rollback() inside each iteration which accumulates defers until migrate() returns; extract the per-migration logic into a helper (e.g., applyMigration) that takes the store (s), the migration m (with m.version and m.file), and returns the new version/error so the defer tx.Rollback() is declared inside that helper and is released at the end of each migration; move reading the ddl, beginning the transaction (s.writer.Begin), executing ddl and PRAGMA, committing, and setting version into applyMigration and call it from the loop so rollbacks are scoped per-migration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/api/grpc/server_test.go`:
- Around line 62-71: Add a new integration test named
TestGRPCBlobGetByCommitment in pkg/api/grpc/server_test.go modeled after
TestGRPCBlobGet and TestGRPCBlobGetAll: create a mockStore containing blobs (the
mock method GetBlobByCommitment already present), start the gRPC server used by
other tests, create a gRPC client, call the blob retrieval RPC that corresponds
to the server's GetBlobByCommitment endpoint with a known commitment, and assert
the returned Blob matches the expected blob and no error occurred; ensure proper
context, teardown of the server and client, and use the same helper/setup
functions used by TestGRPCBlobGet/TestGRPCBlobGetAll for consistency.
In `@pkg/api/jsonrpc/blob.go`:
- Around line 26-29: The JSON-RPC path lacks an empty-commitment guard and
delegates to svc.BlobGetByCommitment, causing an ErrNotFound instead of the
gRPC-like InvalidArgument; add a guard inside the service implementation of
BlobGetByCommitment to validate commitment is non-empty and return a clear
"invalid argument" error (the same sentinel or mapped error used by the gRPC
layer) when len(commitment) == 0 so both BlobHandler.GetByCommitment and the
gRPC blob_service.go behavior are consistent.
In `@pkg/api/jsonrpc/server_test.go`:
- Around line 62-71: Add JSON-RPC integration tests that exercise the new
mockStore.GetBlobByCommitment via the RPC endpoint "blob.GetByCommitment":
follow the pattern in TestJSONRPCBlobGetAll to add two subtests (happy-path and
not-found) that call doRPC(t, srv, "blob.GetByCommitment", params) and assert
the returned blob matches the stored blob for the success case and that a
not-found error is returned for the missing-commitment case; locate usage points
around TestJSONRPCBlobGetAll and the mockStore.GetBlobByCommitment
implementation to mirror request/response encoding and error assertions.
In `@pkg/fetch/celestia_app_test.go`:
- Around line 215-243: The handler passed to httptest.NewServer currently calls
t.Errorf from its goroutine (see the anonymous http.HandlerFunc used to create
ts), which can race with test teardown; instead create an error/report channel
(e.g., errCh) or a sync primitive in the test scope and send any handler errors
into it rather than calling t.Errorf directly, then after exercising the server
and closing ts read the channel and call t.Errorf/t.Fatalf from the main test
goroutine (or use t.Logf plus a boolean flag checked after ts.Close()); update
the upgrader.Upgrade error handling and any other in-handler reports to use that
channel/flag and perform the actual t.* assertion in the test goroutine.
- Around line 152-185: The test TestCelestiaAppFetcherGetNetworkHead increments
requestCount from the httptest handler without synchronization causing a
potential data race; change requestCount to an atomic counter (e.g.,
atomic.Int32/atomic.AddInt32/atomic.LoadInt32) or protect it with a mutex so
increments inside the http.HandlerFunc are safe and the final assertion reads
the counter via atomic.Load or under the same mutex; update the handler (where
requestCount++ occurs) and the final check (where requestCount is compared) to
use the chosen synchronized access.
In `@pkg/fetch/celestia_app.go`:
- Around line 123-133: SubscribeHeaders currently overwrites f.cancelSub and
leaks the previous subscription; modify SubscribeHeaders so that while holding
f.mu you check if f.cancelSub is non-nil and call it to cancel the old
subscription before assigning the new cancel function, or alternatively return
an error if a subscription must be exclusive; specifically update the logic in
the SubscribeHeaders function to invoke the existing f.cancelSub() (and then set
f.cancelSub = cancel) under the mutex to avoid leaving the prior
goroutine/WebSocket active.
- Around line 173-208: The readHeaderLoop goroutine can block on
conn.ReadMessage() and not observe ctx cancellation; modify readHeaderLoop so
that when ctx is cancelled it forces ReadMessage to unblock by closing the
websocket or setting a read deadline: spawn a small watcher goroutine inside
readHeaderLoop that does <-ctx.Done() then calls conn.Close() (or
conn.SetReadDeadline(time.Now())) to unblock ReadMessage, ensuring you still
defer conn.Close() and preserve existing behavior around sending parsed headers
to out and parsing via parseWSBlockEvent.
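The readHeaderLoop finding reflects a standard gorilla/websocket constraint: ReadMessage takes no context, so cancellation has to be forced by closing the connection from a watcher goroutine. A minimal sketch of that pattern, with the repository's actual message handling (parseWSBlockEvent) omitted:

```go
package fetch

import (
	"context"

	"github.com/gorilla/websocket"
)

// readLoop shows how a blocking ReadMessage can observe ctx cancellation:
// a watcher goroutine closes the connection when ctx is done, which unblocks
// ReadMessage with an error and lets the loop exit.
func readLoop(ctx context.Context, conn *websocket.Conn, out chan<- []byte) {
	defer conn.Close() //nolint:errcheck

	// Watcher: closing the connection is the only reliable way to interrupt
	// a pending ReadMessage in gorilla/websocket.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			conn.Close() //nolint:errcheck
		case <-done:
		}
	}()

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			return // connection closed or ctx cancelled
		}
		select {
		case out <- msg:
		case <-ctx.Done():
			return
		}
	}
}
```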
// GetByCommitment returns a blob matching the given commitment.
func (h *BlobHandler) GetByCommitment(ctx context.Context, commitment []byte) (json.RawMessage, error) {
	return h.svc.BlobGetByCommitment(ctx, commitment)
}
Missing empty-commitment guard — inconsistent with the gRPC counterpart.
The gRPC handler (blob_service.go line 46) rejects an empty commitment with codes.InvalidArgument. The JSON-RPC path passes it straight to the service, which queries the store and returns ErrNotFound — a misleading signal. Adding the check at the service layer would fix both transports at once.
🛡️ Proposed fix (service layer)
 func (s *Service) BlobGetByCommitment(ctx context.Context, commitment []byte) (json.RawMessage, error) {
+	if len(commitment) == 0 {
+		return nil, fmt.Errorf("commitment is required")
+	}
 	b, err := s.store.GetBlobByCommitment(ctx, commitment)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/api/jsonrpc/blob.go` around lines 26 - 29, The JSON-RPC path lacks an
empty-commitment guard and delegates to svc.BlobGetByCommitment, causing an
ErrNotFound instead of the gRPC-like InvalidArgument; add a guard inside the
service implementation of BlobGetByCommitment to validate commitment is
non-empty and return a clear "invalid argument" error (the same sentinel or
mapped error used by the gRPC layer) when len(commitment) == 0 so both
BlobHandler.GetByCommitment and the gRPC blob_service.go behavior are
consistent.
Code Review
This pull request introduces two significant features: the ability to fetch blobs by commitment (GetBlobByCommitment) and a new CelestiaAppFetcher for indexing from a consensus node. The changes are well-structured and include corresponding updates to the CLI, JSON-RPC, gRPC APIs, configuration, and database migrations. The new CelestiaAppFetcher is a great addition, providing more flexibility for deployment. I've included a couple of suggestions to improve configuration validation and API consistency.
Commitments live in MsgPayForBlobs.share_commitments inside the inner Cosmos SDK tx, not in the blob proto. Rewrites the BlobTx parser to traverse Tx -> TxBody -> Any -> MsgPayForBlobs and extract signer and share_commitments. Signer prefers PFB-level, falls back to blob-level (celestia-app v2+). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
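For orientation, the traversal this commit describes walks three nested protobuf layers inside the inner Cosmos SDK transaction. The sketch below is a simplified illustration with protowire, not the actual parser: it assumes the BlobTx envelope has already been unwrapped, the Tx/TxBody/Any field numbers are the standard Cosmos ones, and the MsgPayForBlobs field number used for share_commitments (4 here) is an assumption to verify against celestia-app's proto definition.

```go
package fetch

import (
	"strings"

	"google.golang.org/protobuf/encoding/protowire"
)

// bytesField collects every occurrence of the length-delimited field `want`
// in a protobuf message, skipping all other fields.
func bytesField(buf []byte, want protowire.Number) [][]byte {
	var out [][]byte
	for len(buf) > 0 {
		num, typ, n := protowire.ConsumeTag(buf)
		if n < 0 {
			return out
		}
		buf = buf[n:]
		if num == want && typ == protowire.BytesType {
			v, m := protowire.ConsumeBytes(buf)
			if m < 0 {
				return out
			}
			out = append(out, v)
			buf = buf[m:]
			continue
		}
		m := protowire.ConsumeFieldValue(num, typ, buf)
		if m < 0 {
			return out
		}
		buf = buf[m:]
	}
	return out
}

// shareCommitments sketches the Tx -> TxBody -> Any -> MsgPayForBlobs walk:
// Tx.body (field 1) -> TxBody.messages (field 1, repeated Any) ->
// Any.type_url/value (fields 1 and 2) -> MsgPayForBlobs.share_commitments
// (field 4 here, an assumption). txBytes is the inner SDK tx, already
// extracted from the BlobTx envelope.
func shareCommitments(txBytes []byte) [][]byte {
	var commitments [][]byte
	for _, body := range bytesField(txBytes, 1) { // Tx.body
		for _, anyMsg := range bytesField(body, 1) { // TxBody.messages
			urls := bytesField(anyMsg, 1) // Any.type_url
			if len(urls) == 0 || !strings.Contains(string(urls[0]), "MsgPayForBlobs") {
				continue
			}
			for _, value := range bytesField(anyMsg, 2) { // Any.value
				commitments = append(commitments, bytesField(value, 4)...) // share_commitments
			}
		}
	}
	return commitments
}
```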
Actionable comments posted: 1
🧹 Nitpick comments (6)
pkg/fetch/celestia_app_test.go (1)
85-153: Consider consolidating GetBlobs tests into a table-driven pattern.
`TestCelestiaAppFetcherGetBlobs` and `TestCelestiaAppFetcherGetBlobsNoMatch` share identical server setup and could be combined into a single table-driven test with cases like `{name: "match", queryNS: ns, wantCount: 1}` and `{name: "no match", queryNS: ns99, wantCount: 0}`. As per coding guidelines: "Use table-driven tests pattern for test implementation."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_app_test.go` around lines 85 - 153, Combine the two tests into a single table-driven test (e.g., rename to TestCelestiaAppFetcherGetBlobs) that defines cases like {name: "match", queryNS: ns, wantCount:1, wantData:"blob-data", wantCommitment:"c1", wantSigner:"signer"} and {name: "no match", queryNS: ns99, wantCount:0}; keep the shared httptest server setup and NewCelestiaAppFetcher(...) once and loop cases with t.Run, calling f.GetBlobs(ctx, 10, []types.Namespace{case.queryNS}) and asserting length/contents (for wantCount==0 assert nil or zero length, for wantCount==1 assert Data/Commitment/Signer match). Reference functions/types: TestCelestiaAppFetcherGetBlobs, GetBlobs, NewCelestiaAppFetcher, ns and ns99, and reuse buildBlobTx/rawBlob as in the original tests.

pkg/fetch/blobtx.go (2)
127-161: `parseMsgPayForBlobs` rejects Fixed32/Fixed64 wire types; use `ConsumeFieldValue` in the default branch for forward-compatibility.

`parseAny` already handles unknown wire types gracefully via `ConsumeFieldValue`, but `parseMsgPayForBlobs` (and identically `parseRawBlob` at line 241) returns an error for any wire type other than Bytes and Varint. If a future proto revision adds a fixed-width field, or if unknown fields appear (e.g., from newer celestia-app versions), parsing will break unnecessarily.

♻️ Proposed fix — skip unknown wire types instead of erroring
For `parseMsgPayForBlobs`:

 	case protowire.VarintType:
 		_, n := protowire.ConsumeVarint(buf)
 		if n < 0 {
 			return pfbData{}, fmt.Errorf("field %d: invalid varint", num)
 		}
 		buf = buf[n:]
 	default:
-		return pfbData{}, fmt.Errorf("field %d: unsupported wire type %d", num, typ)
+		n := protowire.ConsumeFieldValue(num, typ, buf)
+		if n < 0 {
+			return pfbData{}, fmt.Errorf("field %d: cannot skip wire type %d", num, typ)
+		}
+		buf = buf[n:]
 	}

Apply the same pattern to `parseRawBlob` at lines 241-242.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/blobtx.go` around lines 127 - 161, parseMsgPayForBlobs currently errors for any wire type other than Bytes and Varint; change its default branch to skip unknown/ future wire types using protowire.ConsumeFieldValue so parsing is forward-compatible (mirror the pattern used in parseAny). Specifically, in parseMsgPayForBlobs replace the default error with a call to protowire.ConsumeFieldValue(buf, typ), check n < 0 for an error, then advance buf by n; apply the identical change to parseRawBlob so both functions skip unknown fixed32/fixed64/other wire types instead of returning an error.
312-371: `extractBytesField` and `extractRepeatedBytesField` return aliased slices — safe today but fragile.

Both helpers return sub-slices of the input buffer without copying. This is fine given the current call chain (all relevant data is eventually copied in `parseMsgPayForBlobs`), but a future caller that retains these slices across buffer reuse could hit data corruption. A brief doc comment noting the aliasing would help prevent misuse.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/blobtx.go` around lines 312 - 371, extractBytesField and extractRepeatedBytesField currently return subslices that alias the input buffer; update their documentation to clearly state that the returned []byte/[][]byte reference the original data and may be invalidated if the input buffer is reused, and instruct callers to make a copy (e.g., copy(dst, val)) when they need to retain the data beyond the lifetime of the input buffer; mention the function names extractBytesField and extractRepeatedBytesField in the comment so future maintainers see the warning when inspecting these helpers.

pkg/fetch/blobtx_test.go (3)
111-166: Coding guidelines require table-driven tests.
`TestParseBlobTxSingleBlob` and `TestParseBlobTxMultiBlob` test the same function with different inputs and could be consolidated into a single table-driven test. Similarly, the error-case tests (`TestParseBlobTxNotBlobTx`, `TestParseBlobTxEmpty`) and the `extractBlobsFromBlock` variants are candidates for the same pattern.

As per coding guidelines, "Use table-driven tests pattern for test implementation".
♻️ Sketch of table-driven approach for parseBlobTx tests
func TestParseBlobTx(t *testing.T) {
	tests := []struct {
		name            string
		input           []byte
		wantErr         bool
		wantBlobCount   int
		wantSigner      string
		wantCommitments []string
		wantData        []string
	}{
		{
			name: "single blob",
			input: buildBlobTx("celestia1abc",
				[][]byte{[]byte("commit1")},
				rawBlob{Namespace: testNS(1), Data: []byte("hello")},
			),
			wantBlobCount:   1,
			wantSigner:      "celestia1abc",
			wantCommitments: []string{"commit1"},
			wantData:        []string{"hello"},
		},
		{
			name: "multi blob",
			input: buildBlobTx("signer",
				[][]byte{[]byte("c1"), []byte("c2")},
				rawBlob{Namespace: testNS(1), Data: []byte("a")},
				rawBlob{Namespace: testNS(2), Data: []byte("b")},
			),
			wantBlobCount:   2,
			wantSigner:      "signer",
			wantCommitments: []string{"c1", "c2"},
			wantData:        []string{"a", "b"},
		},
		{
			name:    "not a BlobTx",
			input:   []byte{0x01, 0x02, 0x03},
			wantErr: true,
		},
		{
			name:    "empty input",
			input:   nil,
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			parsed, err := parseBlobTx(tt.input)
			if tt.wantErr {
				if err == nil {
					t.Fatal("expected error")
				}
				return
			}
			if err != nil {
				t.Fatalf("parseBlobTx: %v", err)
			}
			if len(parsed.Blobs) != tt.wantBlobCount {
				t.Fatalf("got %d blobs, want %d", len(parsed.Blobs), tt.wantBlobCount)
			}
			// ... assert data, signer, commitments
		})
	}
}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/blobtx_test.go` around lines 111 - 166, Consolidate TestParseBlobTxSingleBlob and TestParseBlobTxMultiBlob into a single table-driven test (e.g., TestParseBlobTx) that iterates cases calling parseBlobTx with different inputs; include cases for single-blob, multi-blob, and error inputs (not-a-BlobTx, empty) and assert expected blob count, signer (parsed.PFB.Signer), blob data (parsed.Blobs[].Data) and share commitments (parsed.PFB.ShareCommitments[]) per case; apply the same table-driven pattern to other related tests such as the parse errors and extractBlobsFromBlock variants so each scenario is a table entry rather than separate test functions.
12-35: `buildBlobProto` skips zero-valued fields — consider adding a test case for zero `ShareVersion`/`NamespaceVersion`.

Since `ShareVersion == 0` and `NamespaceVersion == 0` are omitted from the wire format (correct proto3 behavior), the parser's handling of blobs with default-valued fields is never tested. A test case with explicit zero values would confirm `parseRawBlob` returns the correct defaults.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/blobtx_test.go` around lines 12 - 35, Add a unit test that verifies parseRawBlob correctly handles default (zero) numeric fields: use buildBlobProto to create a blob with ShareVersion and NamespaceVersion set to 0 (they will be omitted from the wire), and non-empty Namespace/Data/Signer as needed, call parseRawBlob on the bytes, and assert the returned rawBlob has ShareVersion == 0 and NamespaceVersion == 0 while other fields match; reference buildBlobProto and parseRawBlob (and the rawBlob struct) to locate where to add the test.
182-238: Missing assertion on `Blob.Index` field.

`extractBlobsFromBlock` sets a cross-tx `blobIndex` on each returned blob, but the test never verifies it. Adding assertions for `blobs[0].Index` and `blobs[1].Index` would catch regressions in the index-tracking logic (e.g., the `blobIndex` drift noted in `blobtx.go`).

♻️ Add Index assertions
 	if string(blobs[0].Signer) != "signer1" {
 		t.Errorf("blobs[0].Signer = %q, want %q", blobs[0].Signer, "signer1")
 	}
+	if blobs[0].Index != 0 {
+		t.Errorf("blobs[0].Index = %d, want 0", blobs[0].Index)
+	}
 	// Second matching blob: ns2 from tx2
 	if string(blobs[1].Data) != "d2" {

 	if string(blobs[1].Signer) != "signer2" {
 		t.Errorf("blobs[1].Signer = %q, want %q", blobs[1].Signer, "signer2")
 	}
+	// blobIndex: tx1 had 2 blobs (indices 0,1), tx2 blob gets index 2
+	if blobs[1].Index != 2 {
+		t.Errorf("blobs[1].Index = %d, want 2", blobs[1].Index)
+	}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/blobtx_test.go` around lines 182 - 238, Add assertions in TestExtractBlobsFromBlock to verify the cross-tx blob index: check blobs[0].Index == 0 and blobs[1].Index == 2 (since tx1 has two blobs (indices 0 and 1) and tx2's blob is the third encountered (index 2)); update the test to t.Errorf/fatal if these expectations on blobs[0].Index and blobs[1].Index fail so regressions in blobIndex tracking (as used by extractBlobsFromBlock / blobIndex logic) are detected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/fetch/celestia_app_test.go`:
- Around line 55-83: The test handler in TestCelestiaAppFetcherGetHeader calls
t.Errorf from the httptest goroutine which can race with the main test; replace
the in-handler t.Errorf by storing the unexpected path in a shared variable
(guarded by a sync.Mutex or atomic.Value) and return the 404 response, then
after ts.Close() / before test end assert in the main test goroutine that the
storedPath is empty (or fail the test with t.Fatalf if it was set). Update the
httptest.NewServer handler closure and the test assertions accordingly
(referencing TestCelestiaAppFetcherGetHeader and the handler used to serve
/block).
---
Duplicate comments:
In `@pkg/fetch/celestia_app_test.go`:
- Around line 218-246: The handler goroutine calls t.Errorf directly (in the
httptest.NewServer HTTP handler around the upgrader.Upgrade call), which races
with the test runner; instead create an error channel in the test, send the
error into that channel from the handler (e.g., on upgrader.Upgrade failure),
and perform the t.Errorf/t.Fatalf check from the main test goroutine after the
server interaction completes (read from the channel and fail if non-nil). Update
the code that uses upgrader.Upgrade in the httptest handler to send errors to
that channel and remove direct calls to t.Errorf from inside the handler.
- Around line 155-188: The test uses a plain int requestCount that is
incremented in the httptest handler goroutine and read in the test goroutine
causing a data race; replace it with sync/atomic's atomic.Int32 (declare var
requestCount atomic.Int32 in TestCelestiaAppFetcherGetNetworkHead), change the
increment in the handler to requestCount.Add(1) and change the assertion to
compare int(requestCount.Load()) (or compare to 2 using requestCount.Load()),
and ensure you import sync/atomic; update references to requestCount in this
test only.
Bug fixes:
- Cancel previous subscription before overwriting cancelSub in SubscribeHeaders
- Close WS connection on ctx cancel to unblock ReadMessage in readHeaderLoop
- Add empty commitment guard in service layer (consistent with gRPC handler)
- Use atomic.Int32 for requestCount in test handler goroutine
- Replace t.Errorf in handler goroutine with atomic.Value checked post-test

Missing test coverage:
- Add TestGRPCBlobGetByCommitment integration test
- Add TestJSONRPCBlobGetByCommitment integration test (found + not found)

Code quality:
- Remove dead dsType normalization in main.go
- Add default case in fetcher type switch to prevent nil dataFetcher
- Extract applyMigration helper to scope defer tx.Rollback per migration
- Use bytes.Equal instead of string cast in all mock GetBlobByCommitment
- Merge commitment store/service tests into table-driven pattern
- Skip unknown wire types in blob/PFB parsers for forward compatibility
- Fix misleading error message for empty datasource type

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>