
Feat/phase2 api layer #29

Merged
tac0turtle merged 5 commits into main from feat/phase2-api-layer
Feb 19, 2026

Conversation


@tac0turtle (Contributor) commented Feb 19, 2026

Overview

Summary by CodeRabbit

  • New Features

    • gRPC and JSON‑RPC APIs for blobs and headers with live subscriptions, server startup/shutdown and read‑only stub methods
    • Protobuf service/types and a build target to generate gRPC code
    • API service & notifier delivering real‑time events with configurable subscription buffer and gRPC listen address
  • Refactor

    • Storage I/O split into separate read/write pools
    • Store API extended for pagination (limit/offset)
  • Tests

    • Extensive unit/integration tests for servers, API service, notifier, and storage
  • Chores

    • Tooling and dependency updates; config defaults added (including gRPC listen addr)

tac0turtle and others added 2 commits February 19, 2026 14:38
…ucture (Phase 2)

Add the complete API layer so rollup clients can connect to Apex as a
drop-in replacement for celestia-node. This implements issues #14, #2,
and #3.

Subscription infrastructure (#14):
- Event fan-out notifier with configurable buffer, namespace filtering,
  contiguity tracking, and non-blocking publish with capacity warnings
- SQLite read/write pool split for concurrent API reads
- Observer hook on sync coordinator for publishing height events

JSON-RPC compatibility layer (#2):
- Shared service layer with blob/header operations
- celestia-node compatible JSON-RPC handlers (blob, header modules)
- Proof forwarding to upstream node (GetProof, Included)
- Stub modules for share, fraud, blobstream
- WebSocket subscription support for blob and header streams

gRPC API (#3):
- Protobuf service definitions (BlobService, HeaderService)
- Server-streaming Subscribe RPCs for real-time events
- Buf configuration for proto generation
- Separate configurable port (default :9090)

Both API servers wire through the shared Service layer with graceful
shutdown on SIGINT/SIGTERM.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai bot commented Feb 19, 2026

📝 Walkthrough

Adds protobuf definitions and Buf config, implements JSON‑RPC and gRPC servers with tests, introduces an API Service and Notifier, adds proof‑forwarding interfaces, splits the SQLite reader/writer pools with pagination, and wires a HeightObserver into the sync coordinator; main starts the servers and handles graceful shutdown.

Changes

Cohort / File(s) / Summary

  • Protobuf Definitions: proto/apex/v1/types.proto, proto/apex/v1/blob.proto, proto/apex/v1/header.proto
    New proto3 messages (Blob, Header) and services (BlobService, HeaderService) with go_package options.
  • Buf & Build: buf.yaml, buf.gen.yaml, justfile
    Adds Buf v2 configuration and generator rules for Go gRPC/protobuf output; adds a just proto target to run buf generate.
  • Config: config/config.go, config/load.go
    Adds Subscription config and gRPC listen address; sets defaults (gRPC :9090, subscription.buffer_size 64) and validates the new fields.
  • CLI / Entrypoint: cmd/apex/main.go
    Wires NewNotifier and NewService, registers WithObserver, starts the JSON‑RPC and gRPC servers concurrently, and implements graceful shutdown sequencing.
  • API Facade & Notifier: pkg/api/service.go, pkg/api/notifier.go, pkg/api/notifier_test.go
    New Service facade exposing blob/header operations, MarshalBlob, and subscription APIs; Notifier implements buffered, namespace‑filtered, non‑blocking fan‑out, with tests.
  • JSON‑RPC API: pkg/api/jsonrpc/server.go, pkg/api/jsonrpc/blob.go, pkg/api/jsonrpc/header.go, pkg/api/jsonrpc/stubs.go, pkg/api/jsonrpc/server_test.go
    Adds the JSON‑RPC server and handlers for the blob/header namespaces, subscription streaming, stubbed unsupported methods, and comprehensive HTTP tests.
  • gRPC API: pkg/api/grpc/server.go, pkg/api/grpc/blob_service.go, pkg/api/grpc/header_service.go, pkg/api/grpc/server_test.go
    Adds the gRPC server factory and BlobService/HeaderService implementations (including streaming Subscribe) with in‑process tests.
  • Fetch / Proof Forwarding: pkg/fetch/fetcher.go, pkg/fetch/celestia_node.go
    Introduces the ProofForwarder interface and implements GetProof/Included forwarding in CelestiaNodeFetcher.
  • Storage: SQLite: pkg/store/sqlite.go, pkg/store/store.go, pkg/store/sqlite_test.go
    Splits the reader/writer DB pools, adds a configure helper, routes reads/writes appropriately, and extends the GetBlobs signature to include limit/offset (tests updated).
  • Sync: Observer Wiring: pkg/sync/coordinator.go, pkg/sync/subscription.go, pkg/sync/backfill.go, pkg/sync/mock_test.go
    Adds the HeightObserver type and WithObserver option; passes the observer into the backfiller/subscription manager and invokes it after each processed height.
  • Deps / Tooling: go.mod
    Bumps the Go toolchain and adds grpc/protobuf dependencies required for generated code and gRPC.
  • Tests & Mocks: pkg/api/*_test.go, pkg/api/grpc/server_test.go, pkg/api/jsonrpc/server_test.go, pkg/store/*_test.go, pkg/sync/*_test.go
    Extensive new and updated unit/integration tests and mocks covering the service layer, notifier, JSON‑RPC/gRPC endpoints, storage changes, and proof forwarding.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client (HTTP/WS)
    participant JSONRPC as JSON‑RPC Server
    participant GRPC as gRPC Server
    participant Service as API Service
    participant Store as SQLite Store
    participant Notifier as Notifier
    participant Sync as Sync Coordinator

    rect rgba(100,200,150,0.5)
    Note over Client,Service: Request/Fetch Flow
    Client->>JSONRPC: blob.Get(height, namespace)
    JSONRPC->>Service: BlobGet(height, namespace)
    Service->>Store: GetBlob(height, namespace, commitment)
    Store-->>Service: Blob data
    Service-->>JSONRPC: JSON blob
    JSONRPC-->>Client: response
    end

    rect rgba(150,200,100,0.5)
    Note over Client,Notifier: Subscribe Flow
    Client->>GRPC: Subscribe(namespace)
    GRPC->>Service: BlobSubscribe(namespace)
    Service->>Notifier: Subscribe([namespace])
    Notifier-->>Service: subscription channel
    Service-->>GRPC: streamer
    GRPC->>Client: stream BlobEvents
    end

    rect rgba(200,150,100,0.5)
    Note over Sync,Notifier: Publish Flow
    Sync->>Store: store header & blobs
    Sync->>Notifier: Publish(HeightEvent)
    Notifier->>Notifier: filter by namespace, non-blocking send
    Notifier-->>Client: events to subscribers
    end
sequenceDiagram
    participant Main as cmd/apex
    participant Config as Config Loader
    participant Store as SQLite Store
    participant Notifier as Notifier
    participant Service as API Service
    participant JSONRPC as JSON‑RPC Server
    participant GRPC as gRPC Server
    participant Sync as Coordinator

    rect rgba(100,150,200,0.5)
    Note over Main,Sync: Startup
    Main->>Config: Load config
    Config-->>Main: config (subscription, grpc_listen_addr, readPoolSize)
    Main->>Store: Open(dbPath, readPoolSize)
    Store-->>Main: reader/writer DBs
    Main->>Notifier: NewNotifier(bufferSize)
    Notifier-->>Main: notifier
    Main->>Service: NewService(store, fetcher, proofForwarder, notifier)
    Service-->>Main: service
    Main->>JSONRPC: NewServer(service)
    Main->>GRPC: NewServer(service)
    Main->>Sync: NewCoordinator(WithObserver(notifier.Publish))
    Main->>JSONRPC: Start (goroutine)
    Main->>GRPC: Start (goroutine)
    Main->>Sync: Run()
    end

    rect rgba(200,100,150,0.5)
    Note over Main,Sync: Shutdown
    Sync-->>Main: Run() returns
    Main->>GRPC: GracefulStop()
    Main->>JSONRPC: Shutdown(ctx)
    Main->>Store: Close()
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I nibbled proto fields tonight,

Servers rose and streams took flight,
Events hop from store to peer,
Subscribers cheer, the logs appear,
Hooray — new APIs delight!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 17.86%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title "Feat/phase2 api layer" is concise and clearly indicates the main addition: a Phase 2 API layer implementation.



Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the apex indexer by introducing a robust API layer. It provides both gRPC and an improved JSON-RPC interface for interacting with indexed data. The changes include new configuration parameters for these services, a refactored database layer for better read concurrency, and a new event notification system that allows clients to subscribe to real-time updates. This foundational work enables more efficient and versatile access to the indexer's capabilities.

Highlights

  • API Layer Introduction: Introduced a comprehensive API layer, including both gRPC and an enhanced JSON-RPC interface, to provide flexible data access to the indexer.
  • Configuration Updates: Extended configuration options to support the new API servers (listen addresses) and a configurable buffer size for event subscriptions.
  • SQLite Performance Improvement: Refactored the SQLite store to utilize separate read and write connection pools, optimizing for concurrent API read operations while maintaining write consistency.
  • Event Notification System: Implemented a new Notifier component and integrated it with the sync coordinator, allowing API clients to subscribe to real-time height and blob events.
  • Protobuf Definitions and Generation: Added Protobuf definitions for API types and services, along with buf tooling to automate the generation of Go gRPC and protobuf code.


Changelog
  • buf.gen.yaml
    • Added configuration for buf to generate Go gRPC and protobuf code for API services.
  • buf.yaml
    • Added buf configuration for module path, linting, and breaking change detection.
  • cmd/apex/main.go
    • Imported new net, net/http, time, api, grpcapi, jsonrpcapi, and types packages.
    • Modified store.Open call to include ReadPoolSize.
    • Integrated api.Notifier and api.Service.
    • Updated syncer.New to include an observer hook for publishing HeightEvents.
    • Started JSON-RPC and gRPC servers with graceful shutdown mechanisms.
  • config/config.go
    • Added SubscriptionConfig to the main Config struct.
    • Added ReadPoolSize to StorageConfig.
    • Added GRPCListenAddr to RPCConfig.
    • Defined SubscriptionConfig struct with BufferSize.
    • Updated DefaultConfig to include default values for new configuration fields.
  • config/load.go
    • Added comments and default values for read_pool_size, grpc_listen_addr, and subscription.buffer_size in the example YAML.
    • Added validation for storage.read_pool_size, rpc.grpc_listen_addr, and subscription.buffer_size.
  • go.mod
    • Updated Go version to 1.25.0.
    • Added google.golang.org/grpc and google.golang.org/protobuf dependencies.
  • go.sum
    • Updated module checksums for new and updated dependencies.
  • justfile
    • Added a proto recipe to generate protobuf code using buf generate.
  • pkg/api/grpc/blob_service.go
    • Added a new gRPC service for blob operations, including Get, GetAll, and Subscribe methods.
  • pkg/api/grpc/gen/apex/v1/blob.pb.go
    • Added generated Go code for blob protobuf messages and service definitions.
  • pkg/api/grpc/gen/apex/v1/blob_grpc.pb.go
    • Added generated Go gRPC client and server interfaces for blob service.
  • pkg/api/grpc/gen/apex/v1/header.pb.go
    • Added generated Go code for header protobuf messages and service definitions.
  • pkg/api/grpc/gen/apex/v1/header_grpc.pb.go
    • Added generated Go gRPC client and server interfaces for header service.
  • pkg/api/grpc/gen/apex/v1/types.pb.go
    • Added generated Go code for common API types (Blob, Header) protobuf messages.
  • pkg/api/grpc/header_service.go
    • Added a new gRPC service for header operations, including GetByHeight, LocalHead, NetworkHead, and Subscribe methods.
  • pkg/api/grpc/server.go
    • Added a new gRPC server implementation that registers blob and header services.
  • pkg/api/grpc/server_test.go
    • Added unit tests for the gRPC server and its blob/header services.
  • pkg/api/jsonrpc/blob.go
    • Added JSON-RPC handler for blob operations, including Get, GetAll, Subscribe, GetProof, and Included methods.
  • pkg/api/jsonrpc/header.go
    • Added JSON-RPC handler for header operations, including GetByHeight, LocalHead, NetworkHead, and Subscribe methods.
  • pkg/api/jsonrpc/server.go
    • Added a new JSON-RPC server implementation that registers blob, header, and stub services.
  • pkg/api/jsonrpc/server_test.go
    • Added unit tests for the JSON-RPC server and its blob/header services.
  • pkg/api/jsonrpc/stubs.go
    • Added stub implementations for unsupported JSON-RPC methods (e.g., share, fraud, blobstream).
  • pkg/api/notifier.go
    • Added a new Notifier component for fanning out HeightEvents to subscribers, with namespace filtering and buffer management.
  • pkg/api/notifier_test.go
    • Added unit tests for the Notifier component.
  • pkg/api/service.go
    • Added a new Service layer encapsulating API business logic, interacting with store, fetcher, and notifier.
    • Added MarshalBlob helper for converting stored blobs to Celestia-node compatible JSON.
  • pkg/api/service_test.go
    • Added unit tests for the api.Service layer.
  • pkg/fetch/celestia_node.go
    • Extended blobAPI struct to include GetProof and Included methods.
    • Implemented GetProof and Included methods in CelestiaNodeFetcher to forward requests to the upstream Celestia node.
  • pkg/fetch/fetcher.go
    • Added ProofForwarder interface for proof-related operations.
  • pkg/store/sqlite.go
    • Refactored Open function to create separate writer (single connection) and reader (pooled connections) for SQLite.
    • Updated database interaction methods to use writer or reader as appropriate.
    • Added configureSQLite helper function for database setup.
  • pkg/sync/backfill.go
    • Added observer field to Backfiller struct.
    • Modified processHeight to call the observer after processing a height.
  • pkg/sync/coordinator.go
    • Added HeightObserver type definition.
    • Added observer field to Coordinator struct.
    • Added WithObserver option for the coordinator.
    • Passed the observer to Backfiller and SubscriptionManager.
  • pkg/sync/subscription.go
    • Added observer field to SubscriptionManager struct.
    • Modified processHeader to call the observer after processing a header.
  • proto/apex/v1/blob.proto
    • Added new Protobuf definition for BlobService and related messages (GetBlobRequest, GetAllBlobsRequest, GetAllBlobsResponse, SubscribeBlobsRequest, BlobEvent).
  • proto/apex/v1/header.proto
    • Added new Protobuf definition for HeaderService and related messages (GetHeaderRequest, SubscribeHeadersRequest).
  • proto/apex/v1/types.proto
    • Added new Protobuf definitions for common Blob and Header types.
Activity
  • The pull request introduces a new API layer, including gRPC and enhanced JSON-RPC, which required significant code generation and integration.
  • Configuration for buf was added to manage protobuf generation.
  • Core application logic in cmd/apex/main.go was updated to initialize and run the new API servers and integrate the event notification system.
  • The config package was extended to support new API-related settings.
  • Dependency management files (go.mod, go.sum) were updated to include gRPC and protobuf libraries.
  • The justfile was updated with a new command to facilitate protobuf code generation.
  • New packages (pkg/api/grpc, pkg/api/jsonrpc, pkg/api) were created to house the API services, handlers, and a central event Notifier.
  • The pkg/store/sqlite.go was refactored to improve database concurrency by separating read and write connection pools.
  • The pkg/sync components (Backfiller, Coordinator, SubscriptionManager) were modified to incorporate a HeightObserver for publishing events to the new API layer.
  • Extensive unit tests were added for the new API services and the Notifier.

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a comprehensive API layer for the Apex indexer, including both gRPC and JSON-RPC interfaces, a pub/sub notifier for real-time events, protobuf definitions, and extensive test coverage. It also improves database access to support concurrent reads. However, several Denial of Service (DoS) vulnerabilities were identified in the API handlers due to a lack of input validation and resource limits on requests that accept lists of namespaces. These could be exploited to cause excessive database load or memory exhaustion, and implementing sensible limits on input sizes is recommended before deploying to production. Additionally, there are suggestions for performance and error handling improvements.

@coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (11)
justfile (1)

39-41: check target skips buf lint — proto violations won't be caught in CI.

The check target (described as "CI equivalent") runs golangci-lint for Go but never runs buf lint. Since buf.yaml enables STANDARD lint, buf lint would currently fail on header.proto (see that file's review). Additionally, there is no check to ensure generated files in pkg/api/grpc/gen are up to date.

🔧 Suggested additions to `check`
+# Lint proto files
+proto-lint:
+    buf lint
+
+# Verify generated proto files are up-to-date (for CI)
+proto-check: proto
+    git diff --exit-code pkg/api/grpc/gen
+
 # Run all checks (CI equivalent)
-check: tidy-check lint test build
+check: tidy-check proto-check proto-lint lint test build

Also applies to: 43-44

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@justfile` around lines 39 - 41, The CI-equivalent check target currently only
runs golangci-lint and misses protobuf linting and generated-code checks; update
the check target to run buf lint (to catch STANDARD rules like the violation in
header.proto) and add a verification step that runs the proto target (or buf
generate) and fails if generated files in pkg/api/grpc/gen are out of date
(e.g., run just proto/buf generate then ensure no diffs). Modify the existing
check target to invoke "buf lint" and a generation-and-compare step so proto
lint errors and stale generated code cause the check to fail.
pkg/api/jsonrpc/stubs.go (2)

14-15: Minor naming inconsistency: BlobStubs (plural) vs ShareStub, FraudStub, BlobstreamStub (singular).

Pick one convention — singular is more idiomatic for a Go type name.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/jsonrpc/stubs.go` around lines 14 - 15, The type name BlobStubs is
inconsistent with the other singular stub types (ShareStub, FraudStub,
BlobstreamStub); rename BlobStubs to BlobStub and update all references/usages
accordingly (constructors, variable declarations, tests, and any method
receivers tied to BlobStubs) so the type name follows the singular convention
like ShareStub, FraudStub, and BlobstreamStub.

9-12: Prefer errors.New for static sentinel errors.

fmt.Errorf is unnecessary when there are no format verbs. errors.New is more idiomatic and avoids the implicit dependency on fmt.

Suggested fix
+	"errors"
-	"fmt"
 
 var (
-	errNotSupported = fmt.Errorf("method not supported by apex indexer")
-	errReadOnly     = fmt.Errorf("apex is a read-only indexer, blob submission not supported")
+	errNotSupported = errors.New("method not supported by apex indexer")
+	errReadOnly     = errors.New("apex is a read-only indexer, blob submission not supported")
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/jsonrpc/stubs.go` around lines 9 - 12, Replace the sentinel errors
errNotSupported and errReadOnly in pkg/api/jsonrpc/stubs.go to use errors.New
instead of fmt.Errorf (i.e., errNotSupported = errors.New("method not supported
by apex indexer") and errReadOnly = errors.New("apex is a read-only indexer,
blob submission not supported")), and update imports to remove "fmt" and add
"errors" (or keep "fmt" only if used elsewhere).
pkg/sync/coordinator.go (1)

19-21: height parameter is redundant with header.Height.

HeightObserver receives both height uint64 and header *types.Header, even though header.Height already carries the same value. This isn't a bug, but it widens the API surface unnecessarily and could lead to inconsistency if callers pass mismatched values.

Consider simplifying to func(header *types.Header, blobs []types.Blob).
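Sketch of the narrower signature, reusing the types referenced above (the call site is illustrative, not copied from the PR):

type HeightObserver func(header *types.Header, blobs []types.Blob)

// invocation after a height is fully processed; the height comes from header.Height
if observer != nil {
	observer(header, blobs)
}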

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/coordinator.go` around lines 19 - 21, HeightObserver currently takes
a redundant height uint64 plus header *types.Header and blobs []types.Blob,
which can cause inconsistency; change the type signature of HeightObserver to
accept only (header *types.Header, blobs []types.Blob), update all
callers/registerers to stop passing the separate height value and instead use
header.Height where needed, and adjust any implementations of HeightObserver to
the new two-argument form (search for the HeightObserver type definition and all
references to it).
pkg/api/grpc/server.go (1)

12-19: Consider adding gRPC server options for production readiness.

The server is created with grpc.NewServer() using no options. A few things to consider for follow-up:

  • Max message size: Default 4 MB receive limit may be too small for large blobs. Use grpc.MaxRecvMsgSize(...).
  • Recovery interceptor: An unhandled panic in a handler will crash the process. A recovery interceptor (e.g., grpc-ecosystem/go-grpc-middleware) gracefully returns an Internal error instead.
  • Reflection: Registering reflection.Register(srv) enables debugging with grpcurl.

Not blocking, but worth tracking.
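Sketch of what those options could look like. The recovery interceptor import path and helpers are assumptions based on go-grpc-middleware v2, not code in this PR:

import (
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

func newServerSketch() *grpc.Server {
	srv := grpc.NewServer(
		grpc.MaxRecvMsgSize(16<<20), // raise the 4 MB default for large blobs
		grpc.ChainUnaryInterceptor(recovery.UnaryServerInterceptor()),
		grpc.ChainStreamInterceptor(recovery.StreamServerInterceptor()),
	)
	reflection.Register(srv) // enables grpcurl inspection
	return srv               // register BlobService/HeaderService before Serve
}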

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/server.go` around lines 12 - 19, The gRPC server is created
without production-ready options in NewServer; update NewServer to configure
sensible server options: set MaxRecvMsgSize and MaxSendMsgSize via
grpc.MaxRecvMsgSize(...) / grpc.MaxSendMsgSize(...) to allow larger blobs, add
recovery middleware as a UnaryInterceptor/StreamInterceptor (e.g., using
grpc-ecosystem/go-grpc-middleware's recovery) to convert panics into Internal
errors, and register reflection (reflection.Register(srv)) so tools like grpcurl
can inspect services (ensure these options are applied before registering
BlobServiceServer and HeaderServiceServer).
proto/apex/v1/blob.proto (1)

10-19: Buf lint violations: RPC request/response naming and missing response wrapper for Get.

Buf reports RPC_REQUEST_STANDARD_NAME and RPC_RESPONSE_STANDARD_NAME errors on all three RPCs. Beyond naming, Get returns Blob directly — this prevents adding response-level metadata (e.g., proof status, pagination) later without a breaking change.

Consider wrapping the response and adopting Buf's naming convention:

Proposed fix
 service BlobService {
-  rpc Get(GetBlobRequest) returns (Blob);
-  rpc GetAll(GetAllBlobsRequest) returns (GetAllBlobsResponse);
-  rpc Subscribe(SubscribeBlobsRequest) returns (stream BlobEvent);
+  rpc Get(BlobServiceGetRequest) returns (BlobServiceGetResponse);
+  rpc GetAll(BlobServiceGetAllRequest) returns (BlobServiceGetAllResponse);
+  rpc Subscribe(BlobServiceSubscribeRequest) returns (stream BlobServiceSubscribeResponse);
 }
 
-message GetBlobRequest {
+message BlobServiceGetRequest {
   uint64 height = 1;
   bytes namespace = 2;
   bytes commitment = 3;
 }
 
-message GetAllBlobsRequest {
+message BlobServiceGetResponse {
+  Blob blob = 1;
+}
+
+message BlobServiceGetAllRequest {
   uint64 height = 1;
   repeated bytes namespaces = 2;
 }
 
-message GetAllBlobsResponse {
+message BlobServiceGetAllResponse {
   repeated Blob blobs = 1;
 }
 
-message SubscribeBlobsRequest {
+message BlobServiceSubscribeRequest {
   repeated bytes namespaces = 1;
 }
 
-message BlobEvent {
+message BlobServiceSubscribeResponse {
   uint64 height = 1;
   repeated Blob blobs = 2;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@proto/apex/v1/blob.proto` around lines 10 - 19, Buf lint requires RPC
request/response names to follow the RPCNameRequest/RPCNameResponse pattern and
the Get RPC must return a response wrapper instead of a raw Blob; rename and
adjust messages accordingly: rename GetBlobRequest -> GetRequest and replace
Get's response Blob with a wrapper message GetResponse that contains a Blob
field; rename GetAllBlobsRequest -> GetAllRequest and GetAllBlobsResponse ->
GetAllResponse (preserve fields), and rename SubscribeBlobsRequest ->
SubscribeRequest and the streamed response type to SubscribeResponse (or
SubscribeStreamingResponse) to follow RPCNameResponse naming; update the service
RPC signatures (Get, GetAll, Subscribe) to reference the new message names and
update all code/refs that use the old message names.
pkg/api/grpc/server_test.go (1)

124-164: startTestServer and startTestHeaderServer are nearly identical; consider unifying.

Both helpers create the same gRPC server (with both services registered) but return different client types. A single helper returning both clients (or accepting a callback) would reduce duplication.
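One possible consolidation, with hypothetical names; the listener and dial details are assumed rather than copied from the tests:

func startTestClients(t *testing.T, svc *api.Service) (pb.BlobServiceClient, pb.HeaderServiceClient) {
	t.Helper()
	srv := NewServer(svc) // assumed constructor signature
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen: %v", err)
	}
	go srv.Serve(lis)

	conn, err := grpc.NewClient(lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("dial: %v", err)
	}
	t.Cleanup(func() {
		_ = conn.Close()
		srv.GracefulStop()
	})
	return pb.NewBlobServiceClient(conn), pb.NewHeaderServiceClient(conn)
}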

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/server_test.go` around lines 124 - 164, startTestServer and
startTestHeaderServer duplicate setup code for creating the gRPC server and
connection; consolidate them into a single helper (e.g., startTestClients or
startTestServerWithClients) that calls NewServer and reuses the same
listener/connection setup, then either returns both clients
(pb.NewBlobServiceClient and pb.NewHeaderServiceClient) or accepts a callback to
receive the connection and construct clients there; ensure t.Cleanup still calls
srv.GracefulStop and conn.Close and replace usages of
startTestServer/startTestHeaderServer with the unified helper.
pkg/api/grpc/blob_service.go (2)

23-42: Get error from store mapped uniformly to Internal; consider distinguishing NotFound.

Line 32 maps all GetBlobs errors to codes.Internal. If the store returns a "not found"-style error for the height (unlikely for GetBlobs which returns empty slices, but possible for future store implementations), it would be more appropriate to map it to codes.NotFound. Current behavior is acceptable given the store contract, but worth a note for robustness.
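If the store grows such an error, a small hypothetical mapping helper would keep the handlers uniform (store.ErrNotFound is referenced elsewhere in this review):

func mapStoreErr(op string, err error) error {
	if errors.Is(err, store.ErrNotFound) {
		return status.Error(codes.NotFound, err.Error())
	}
	return status.Errorf(codes.Internal, "%s: %v", op, err)
}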

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/blob_service.go` around lines 23 - 42, The Get handler currently
maps every error from s.svc.Store().GetBlobs to codes.Internal; update Get to
detect a not-found-style error (compare err to store.ErrNotFound or use
errors.Is(err, store.ErrNotFound)) and return status.Error(codes.NotFound,
err.Error()) in that case, otherwise keep returning
status.Errorf(codes.Internal, "get blobs: %v", err); touch the Get function and
its s.svc.Store().GetBlobs error handling and use errors.Is/store.ErrNotFound
when available.

107-117: Narrowing conversion int → int32 on Index (Line 115).

types.Blob.Index is int (platform-dependent, 64-bit on most targets) while the proto field is int32. If an index ever exceeds math.MaxInt32, the cast silently truncates. Blob indices are practically small, but a bounds check, or using int64 in the proto, would be more defensive.
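A checked narrowing, as a sketch:

func indexToInt32(idx int) (int32, error) {
	if idx > math.MaxInt32 || idx < math.MinInt32 {
		return 0, fmt.Errorf("blob index %d overflows int32", idx)
	}
	return int32(idx), nil
}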

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/blob_service.go` around lines 107 - 117, The conversion in
blobToProto silently narrows types by casting types.Blob.Index (int) to int32
for pb.Blob.Index; add an explicit bounds check against math.MaxInt32 and
math.MinInt32 and handle overflow instead of truncating: update blobToProto to
return (*pb.Blob, error), import math, check if b.Index is outside int32 range
and return a clear error when it is, otherwise safely cast to int32 and return
the proto; alternatively, if changing the proto is preferred, change
pb.Blob.Index to int64 and remove the cast—reference the blobToProto function
and the Index field when making the change.
pkg/api/service_test.go (1)

16-114: Duplicated mock implementations across three test packages.

mockStore, mockFetcher, and testNamespace are nearly identical in pkg/api/service_test.go, pkg/api/grpc/server_test.go, and pkg/api/jsonrpc/server_test.go. Consider extracting them into a shared internal test helper package (e.g., pkg/api/apitest or pkg/internal/testutil) to reduce maintenance burden and keep the mocks in sync as the Store / DataFetcher interfaces evolve.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/service_test.go` around lines 16 - 114, The three duplicated mocks
(mockStore, mockFetcher, and testNamespace) should be moved into a single
internal test helper package (e.g., apitest or testutil) and imported by the
three test packages: create a new package that exposes the constructors and
types (e.g., NewMockStore, NewMockFetcher, TestNamespace) and move the
implementations of mockStore, mockFetcher, Get/Put methods, and testNamespace
there, update existing tests to import and use those constructors instead of
local duplicates, and ensure exported names match usage (or use package-level
helpers) so the Store/DataFetcher interface implementations remain identical and
compile without changing production code.
cmd/apex/main.go (1)

173-200: JSON-RPC and gRPC server startup failures are silently swallowed.

If httpSrv.ListenAndServe() (line 183) or grpcSrv.Serve(lis) (line 197) fails immediately (e.g., address already in use), the error is only logged and the indexer continues running without that API. This creates an operational risk: since Apex is positioned as a "drop-in replacement for celestia-node," a silently unavailable API layer could break client expectations without clear indication to operators.

Note: net.Listen() at line 190 is synchronous and correctly returns early on bind failure, but the subsequent Serve() call lacks equivalent protection.

Consider using error channels to propagate early startup failures back to the main goroutine so runIndexer can abort if critical services fail to start.

Sketch: propagate early startup errors
+	rpcErrCh := make(chan error, 1)
 	go func() {
 		log.Info().Str("addr", cfg.RPC.ListenAddr).Msg("JSON-RPC server listening")
 		if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
-			log.Error().Err(err).Msg("JSON-RPC server error")
+			rpcErrCh <- err
 		}
 	}()

+	grpcErrCh := make(chan error, 1)
 	go func() {
 		log.Info().Str("addr", cfg.RPC.GRPCListenAddr).Msg("gRPC server listening")
 		if err := grpcSrv.Serve(lis); err != nil {
-			log.Error().Err(err).Msg("gRPC server error")
+			grpcErrCh <- err
 		}
 	}()

Then add early error detection (e.g., select on channels with short timeout before coordinator starts).
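Continuing the sketch, the early detection might look like this (the 250 ms grace period is an arbitrary choice):

select {
case err := <-rpcErrCh:
	return fmt.Errorf("json-rpc server failed to start: %w", err)
case err := <-grpcErrCh:
	return fmt.Errorf("grpc server failed to start: %w", err)
case <-time.After(250 * time.Millisecond):
	// no immediate failure; proceed to start the coordinator
}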

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/apex/main.go` around lines 173 - 200, The HTTP and gRPC servers
(httpSrv.ListenAndServe and grpcSrv.Serve) currently only log errors in their
goroutines so immediate startup failures are swallowed; update runIndexer to
create an error channel (e.g., startupErrCh) and have the goroutines send any
returned error to that channel if ListenAndServe or Serve returns non-nil
immediately; in the main runIndexer flow, select on startupErrCh (with optional
short timeout) before proceeding to the rest of initialization and return the
error if one is received so the process aborts on early bind/serve failures;
ensure you still treat http.ErrServerClosed as non-fatal when deciding what to
send on the channel.
🤖 Prompt for inline review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/apex/main.go`:
- Around line 208-216: GracefulStop() on grpcSrv can hang forever for active
streaming RPCs; wrap the graceful shutdown in a goroutine and use a timeout
fallback that calls grpcSrv.Stop() if GracefulStop doesn't finish within the
same 5s window used by the HTTP shutdown. Specifically, start GracefulStop() in
a goroutine, wait on a done channel or select with time.After(5*time.Second) (or
reuse shutdownCtx) and call grpcSrv.Stop() on timeout to force termination;
reference grpcSrv.GracefulStop and grpcSrv.Stop in your changes.
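Sketch of that fallback, assuming the grpcSrv variable from main.go:

stopped := make(chan struct{})
go func() {
	grpcSrv.GracefulStop() // blocks until active streams drain
	close(stopped)
}()
select {
case <-stopped:
case <-time.After(5 * time.Second):
	grpcSrv.Stop() // force-terminate lingering streaming RPCs
}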

In `@config/load.go`:
- Around line 115-117: Add the same presence validation for the JSON-RPC listen
address as for gRPC: check cfg.RPC.ListenAddr and return an error if empty
(e.g., fmt.Errorf("rpc.listen_addr is required")). Update the load/config
validation near the existing check for cfg.RPC.GRPCListenAddr so both
cfg.RPC.ListenAddr and cfg.RPC.GRPCListenAddr are validated before proceeding
(referencing symbols cfg.RPC.ListenAddr and cfg.RPC.GRPCListenAddr).

In `@pkg/api/grpc/header_service.go`:
- Around line 23-29: GetByHeight (and similarly LocalHead) currently maps
store.ErrNotFound to codes.Internal; update the error handling in GetByHeight
and LocalHead to check for store.ErrNotFound and return a gRPC NotFound status
(e.g., status.Errorf(codes.NotFound, "...: %v", err) or
status.Error(codes.NotFound, err)) instead of codes.Internal, preserving the
existing message format for other errors and leaving successful returns
unchanged; refer to the GetByHeight and LocalHead methods and the
store.ErrNotFound sentinel when making the change.

In `@pkg/api/grpc/server_test.go`:
- Around line 309-312: The spin-wait loop using notifier.SubscriberCount() lacks
an upper-bound and can hang tests; modify the wait in the test (around
notifier.SubscriberCount()) to use a bounded wait with a timeout—e.g., create a
deadline using time.After or a context with timeout and poll SubscriberCount()
in a loop that breaks on either SubscriberCount() > 0 or the timeout firing, and
fail the test (t.Fatalf or t.Fatalf-like assertion) if the timeout elapses
without a subscriber; ensure you update the test's waiting logic only in the
code that references notifier.SubscriberCount() and the surrounding stream
setup.
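A bounded version of the wait, sketched with an assumed 5 s deadline:

deadline := time.After(5 * time.Second)
for notifier.SubscriberCount() == 0 {
	select {
	case <-deadline:
		t.Fatal("timed out waiting for subscriber registration")
	default:
		time.Sleep(10 * time.Millisecond)
	}
}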

In `@pkg/api/jsonrpc/server.go`:
- Around line 16-20: The blob namespace currently registers only BlobHandler, so
BlobStubs' methods (GetCommitmentProof, Submit) that return user-friendly errors
are never exposed; either register BlobStubs alongside BlobHandler
(srv.Register("blob", &BlobStubs{}) or similar) or move/forward BlobStubs'
methods into BlobHandler (implement GetCommitmentProof and Submit on BlobHandler
to call the BlobStubs logic) so calls to blob.GetCommitmentProof and blob.Submit
return the intended errNotSupported/errReadOnly messages instead of "method not
found".

In `@pkg/api/notifier.go`:
- Around line 95-135: Publish currently takes a read lock (n.mu.RLock()) but
mutates sub.lastHeight, creating a latent data race; fix by either (A) making
lastHeight an atomic.Uint64 on the subscriber type and replace all reads/writes
of sub.lastHeight in Publish (and any other places) with atomic Load/Store
operations, or (B) change Publish to take a full write lock (replace
n.mu.RLock()/RUnlock() with n.mu.Lock()/Unlock()) so mutations to sub.lastHeight
are protected; refer to Publish, sub.lastHeight, n.mu.RLock/RUnlock and
n.mu.Lock/Unlock (or atomic.Uint64) to locate the changes.

In `@pkg/store/sqlite.go`:
- Around line 270-277: The Close method on SQLiteStore currently returns only
rErr and drops wErr when both fail; update SQLiteStore.Close to combine both
errors using errors.Join: call s.reader.Close() and s.writer.Close(), then if
both errors are non-nil return errors.Join(rErr, wErr), otherwise return
whichever is non-nil (or nil). Ensure the errors package is imported so
errors.Join can be used and keep function name SQLiteStore.Close and calls to
s.reader.Close and s.writer.Close unchanged.
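The combined form is short; errors.Join returns nil when both inputs are nil:

func (s *SQLiteStore) Close() error {
	rErr := s.reader.Close()
	wErr := s.writer.Close()
	return errors.Join(rErr, wErr)
}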

In `@proto/apex/v1/header.proto`:
- Around line 10-29: The proto currently reuses Header and google.protobuf.Empty
across all RPCs which violates STANDARD lint rules (RPC_REQUEST_RESPONSE_UNIQUE
and RPC_*_STANDARD_NAME); update HeaderService by creating unique
request/response wrapper messages named after each RPC (e.g.,
GetByHeightRequest/GetByHeightResponse, LocalHeadRequest/LocalHeadResponse,
NetworkHeadRequest/NetworkHeadResponse, SubscribeRequest/SubscribeResponse) and
have each rpc use its corresponding wrapper (e.g., rpc
GetByHeight(GetByHeightRequest) returns (GetByHeightResponse); where
GetByHeightResponse contains the Header), adjust Subscribe to return stream
SubscribeResponse, stop using google.protobuf.Empty (remove its import) and
delete or rename existing GetHeaderRequest/SubscribeHeadersRequest to the new
RPC-specific types so each RPC has its own request and response type.


tac0turtle and others added 2 commits February 19, 2026 14:46
SQLite WAL readers don't benefit beyond ~8 connections. Hardcode the
pool ceiling and derive the size from runtime.NumCPU() instead of
exposing a config knob that nobody needs to tune.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
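A minimal sketch of the derivation described in that commit message (variable names assumed):

readPoolSize := runtime.NumCPU()
if readPoolSize > 8 { // SQLite WAL readers see little benefit past ~8 connections
	readPoolSize = 8
}
reader.SetMaxOpenConns(readPoolSize)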
…pping

- Fix proto lint: unique request/response types per RPC (buf STANDARD)
- Add limit/offset pagination to gRPC GetAll (store + proto)
- Restrict gRPC Subscribe to single namespace
- Fix Notifier data race: Publish uses write lock instead of read lock
- Map store.ErrNotFound to gRPC codes.NotFound in header service
- Move BlobStubs methods onto BlobHandler for proper registration
- Use errors.Join in SQLiteStore.Close
- Add rpc.listen_addr validation
- Add timeout to gRPC GracefulStop (5s fallback to Stop)
- Add deadline to spin-wait in subscribe test
- Cap gRPC GetAll namespaces at 16 (JSON-RPC uncapped for compat)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (9)
config/config.go (1)

32-35: Consider making the gRPC namespace cap configurable.

The PR objectives note that GetAll caps gRPC requests at 16 namespaces (JSON-RPC is uncapped). Hardcoding this limit in the API layer rather than exposing it through RPCConfig (e.g., MaxNamespaces int) makes the value invisible to operators and requires a code change to tune. Given the asymmetry with JSON-RPC, surfacing it here as a configurable field with a documented default would improve operational flexibility.
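Sketch of the configurable field; the tag style, existing field names, and enforcement site are assumptions:

type RPCConfig struct {
	ListenAddr     string `yaml:"listen_addr"`
	GRPCListenAddr string `yaml:"grpc_listen_addr"`
	MaxNamespaces  int    `yaml:"max_namespaces"` // cap for gRPC GetAll; default 16, 0 = unlimited
}

// enforcement in the gRPC handler:
if cfg.RPC.MaxNamespaces > 0 && len(req.Namespaces) > cfg.RPC.MaxNamespaces {
	return nil, status.Errorf(codes.InvalidArgument,
		"too many namespaces: %d > %d", len(req.Namespaces), cfg.RPC.MaxNamespaces)
}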

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@config/config.go` around lines 32 - 35, The RPC gRPC namespace cap is
hardcoded in the API layer (affecting GetAll) and should be exposed in RPCConfig
so operators can tune it; add a MaxNamespaces int field to RPCConfig with a
sensible default (documented in comments/README), wire that config through where
GetAll enforces the 16-namespace limit (e.g., pass or read
RPCConfig.MaxNamespaces in the code path that implements GetAll) and ensure any
existing hardcoded constant (16) is replaced by using RPCConfig.MaxNamespaces.
pkg/api/grpc/server_test.go (2)

21-121: mockStore, mockFetcher, and testNamespace are verbatim copies of the equivalents in pkg/api/jsonrpc/server_test.go.

Extracting these into a shared internal test package (e.g., pkg/api/internal/testutil) would eliminate the duplication. Additionally, startTestServer and startTestHeaderServer (Lines 123–163) are nearly identical — a single generic helper or a combined return struct could consolidate them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/server_test.go` around lines 21 - 121, Duplicate test helpers
(mockStore, mockFetcher, testNamespace) and nearly identical
startTestServer/startTestHeaderServer should be extracted to a shared internal
test package; create pkg/api/internal/testutil with exported helpers
NewMockStore, NewMockFetcher, TestNamespace and a unified StartTestServers (or
StartTestServerPair) that returns the servers/clients needed, then replace the
local definitions in both pkg/api/grpc/server_test.go and
pkg/api/jsonrpc/server_test.go to import and use those helpers (reference
symbols: mockStore, mockFetcher, testNamespace, startTestServer,
startTestHeaderServer).

165-288: TestGRPCBlobGet, TestGRPCBlobGetAll, and the three header tests can be consolidated into table-driven tests.

All five tests follow the same seed-service-call-assert pattern. A table-driven approach would reduce boilerplate and is required by the project's test guidelines.

As per coding guidelines, "Use table-driven tests pattern for test implementation".
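A condensed skeleton of the table-driven shape; seedHeader, newTestService, and the call helpers are hypothetical:

func TestGRPCHeaderMethods(t *testing.T) {
	cases := []struct {
		name string
		call func(t *testing.T, c pb.HeaderServiceClient) uint64 // returns the reported height
		want uint64
	}{
		{"GetByHeight", callGetByHeight, 42},
		{"LocalHead", callLocalHead, 42},
		{"NetworkHead", callNetworkHead, 42},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			st := newMockStore()
			seedHeader(st, 42) // hypothetical helper: store a header at height 42
			client := startTestHeaderServer(t, newTestService(t, st))
			if got := tc.call(t, client); got != tc.want {
				t.Fatalf("got height %d, want %d", got, tc.want)
			}
		})
	}
}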

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/server_test.go` around lines 165 - 288, Several tests
(TestGRPCBlobGet, TestGRPCBlobGetAll, TestGRPCHeaderGetByHeight,
TestGRPCHeaderLocalHead, TestGRPCHeaderNetworkHead) repeat the same seed →
service → client call → assert pattern; convert them to table-driven subtests.
Create a table (slice of test cases) where each case has a name, seed data
(e.g., blobs or headers), the request to send, which server starter to use
(startTestServer or startTestHeaderServer), and expected assertions (expected
height, data, count, etc.), then loop with t.Run(c.name, func(t *testing.T){ set
up newMockStore(), seed st using c.seed (refer to st.blobs and st.headers),
create notifier and svc via api.NewService, start client via the appropriate
helper, call the RPC (client.Get, GetAll, GetByHeight, LocalHead, NetworkHead)
and run the existing checks for that case }); factor repeated setup into small
helpers (e.g., makeServiceAndClient or seedStore) referenced by function names
in the diff to keep tests concise and readable.
pkg/api/jsonrpc/server_test.go (2)

21-121: mockStore, mockFetcher, and testNamespace are duplicated verbatim in pkg/api/grpc/server_test.go.

Both test packages maintain identical copies of these three helpers. Consider extracting them into a shared internal test package (e.g., pkg/api/internal/testutil) so future changes (e.g., adding a new Store method) only need to be made once.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/jsonrpc/server_test.go` around lines 21 - 121, Extract the duplicated
test helpers (mockStore type and its methods, mockFetcher type and its methods,
and testNamespace) into a shared internal test package (e.g.,
pkg/api/internal/testutil) and replace the copies in
pkg/api/jsonrpc/server_test.go and pkg/api/grpc/server_test.go with imports from
that package; move all related helper functions and types into that package,
keep signatures unchanged (mockStore, mockFetcher, testNamespace), update the
test files to import testutil and reference
testutil.mockStore/testutil.mockFetcher/testutil.testNamespace (or exported
names if you choose to export), and run tests to ensure no missing methods
(export any symbols needed by both packages).

179-241: TestJSONRPCHeaderGetByHeight/LocalHead/NetworkHead can be consolidated into a table-driven test.

The three header tests follow the same structure: seed store, build service, call RPC, check result. Grouping them under a TestJSONRPCHeaderMethods table test would reduce boilerplate and align with the project's table-driven tests guideline (already followed by TestJSONRPCStubMethods).

As per coding guidelines, "Use table-driven tests pattern for test implementation".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/jsonrpc/server_test.go` around lines 179 - 241, Consolidate the three
similar tests (TestJSONRPCHeaderGetByHeight, TestJSONRPCHeaderLocalHead,
TestJSONRPCHeaderNetworkHead) into a single table-driven test (e.g.,
TestJSONRPCHeaderMethods) that iterates cases with fields: name, setup (store
headers/syncState or mockFetcher.networkHead), rpcMethod (e.g.,
"header.GetByHeight", "header.LocalHead", "header.NetworkHead"), rpcParams
(uint64(42) only for GetByHeight), and expectedRawJSON. For each case create the
store via newMockStore or mockFetcher as needed, build service with
api.NewService(st, ft, ..., notifier, zerolog.Nop()) and server with NewServer,
call doRPC(srv, rpcMethod, params...), and assert resp.Error is nil and
string(resp.Result) equals expectedRawJSON; this removes duplicated boilerplate
while keeping the same assertions and setups.
pkg/store/sqlite_test.go (1)

105-105: Consider adding pagination test coverage for non-zero limit/offset.

All three updated call sites pass 0, 0 (no-limit, no-offset), leaving the new pagination feature entirely untested at the SQLite level. A table-driven test verifying limit > 0 and offset > 0 semantics would confirm correct SQL construction and results.

Also applies to: 152-152, 275-275
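A table-driven sketch, assuming the existing fixture seeds four blobs across heights 10-11 and that s, ctx, and ns come from the surrounding test setup:

func TestGetBlobsPagination(t *testing.T) {
	cases := []struct {
		name          string
		limit, offset int
		want          int // expected number of blobs returned
	}{
		{"no pagination", 0, 0, 4},
		{"limit only", 2, 0, 2},
		{"limit with offset", 2, 1, 2},
		{"offset past end", 2, 4, 0},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := s.GetBlobs(ctx, ns, 10, 11, tc.limit, tc.offset)
			if err != nil {
				t.Fatalf("GetBlobs: %v", err)
			}
			if len(got) != tc.want {
				t.Fatalf("got %d blobs, want %d", len(got), tc.want)
			}
		})
	}
}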

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite_test.go` at line 105, The tests in sqlite_test.go call
s.GetBlobs(ctx, ns, 10, 11, 0, 0) only with limit=0/offset=0, so pagination SQL
is untested; add a table-driven subtest in the same file that calls s.GetBlobs
with a variety of limit>0 and offset>0 combinations (e.g. limit=2 offset=0,
limit=2 offset=1, limit larger than remaining, offset at end) and assert
err==nil, the number of returned blobs matches expected, and the returned blob
ordering/IDs match the expected slice for each case; reference the s.GetBlobs
method and reuse existing fixture data inserted earlier in the test to compute
expected results so the assertions validate that limit/offset are applied
correctly.
proto/apex/v1/blob.proto (1)

31-38: Consider using uint32 for pagination fields to prevent negative values at the wire level.

int32 allows negative limit and offset values from clients. While the server should validate, using uint32 would make the contract self-documenting and prevent a class of invalid requests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@proto/apex/v1/blob.proto` around lines 31 - 38, Change the pagination fields
in the GetAllBlobsRequest message so they use unsigned types to prevent negative
values at the wire level: update the fields limit and offset from int32 to
uint32 in message GetAllBlobsRequest, regenerate protobuf artifacts, and then
update any call sites, validators, and handlers that assume signed ints (e.g.,
request parsing in RPC handlers that reference GetAllBlobsRequest.limit or
.offset) to use the new unsigned type or cast/validate as needed.
pkg/api/grpc/blob_service.go (1)

120-127: Duplicate bytesToNamespace helper across packages.

This exact function also exists in pkg/api/jsonrpc/blob.go (Line 96). Consider extracting it to a shared location (e.g., pkg/types) to avoid drift.
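
A sketch of the shared helper, assuming Namespace is a fixed-size byte array as the copy implies (the error text is illustrative):

package types

import "fmt"

// BytesToNamespace converts raw bytes into a Namespace, rejecting input
// of the wrong length so both API layers share one validation path.
func BytesToNamespace(b []byte) (Namespace, error) {
	var ns Namespace
	if len(b) != NamespaceSize {
		return ns, fmt.Errorf("invalid namespace length: got %d, want %d", len(b), NamespaceSize)
	}
	copy(ns[:], b)
	return ns, nil
}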

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/blob_service.go` around lines 120 - 127, The bytesToNamespace
helper is duplicated; extract the logic from the bytesToNamespace function into
a single shared utility in a common package (e.g., add a new exported function
BytesToNamespace in pkg/types or an existing pkg/types file) and replace both
bytesToNamespace implementations (the one in blob_service.go and the one in
pkg/api/jsonrpc/blob.go) to call that shared BytesToNamespace helper; ensure the
new function preserves the same signature/behavior (checking len ==
types.NamespaceSize, copying into types.Namespace, returning error) and update
imports where needed.
pkg/store/sqlite.go (1)

75-86: Configure PRAGMAs via DSN to ensure all connections inherit the settings.

database/sql manages a connection pool, and Exec("PRAGMA ...") applies settings only to the connection that executes it. New connections from the pool won't inherit busy_timeout or foreign_keys (which are per-connection settings). journal_mode=WAL is database-level and persists, so it's unaffected; however, the reader pool will only have busy_timeout and foreign_keys on the first connection used.

While WAL mode mitigates practical risk (readers rarely encounter SQLITE_BUSY and foreign_keys only affects writes), use modernc.org/sqlite's DSN _pragma parameter for correctness:

file:path?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(1)

This ensures all connections are properly configured without needing configureSQLite() for per-connection settings.
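
In Go this amounts to building the DSN once at open time; a sketch, with path standing in for the configured database file:

dsn := "file:" + path +
	"?_pragma=journal_mode(WAL)" +
	"&_pragma=busy_timeout(5000)" +
	"&_pragma=foreign_keys(1)"
db, err := sql.Open("sqlite", dsn) // modernc.org/sqlite registers as "sqlite"
if err != nil {
	return nil, fmt.Errorf("open sqlite: %w", err)
}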

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite.go` around lines 75 - 86, The configureSQLite function
currently sets PRAGMAs with db.Exec which only affects the single connection
used; instead add the per-connection PRAGMA settings to the SQLite DSN using
modernc.org/sqlite's _pragma parameters so every pooled connection inherits them
(e.g. _pragma=busy_timeout(5000)&_pragma=foreign_keys(1) and
_pragma=journal_mode(WAL) if desired), and simplify or remove the db.Exec calls
in configureSQLite; update the codepath that opens the DB (the caller that
constructs the DSN before sql.Open) to include these _pragma entries so
configureSQLite no longer relies on per-connection Exec to set busy_timeout and
foreign_keys.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/apex/main.go`:
- Around line 188-200: If net.Listen for gRPC (cfg.RPC.GRPCListenAddr) fails,
the previously started HTTP server goroutine must be shut down to avoid leaking
sockets/goroutines; modify the error path after net.Listen to call the HTTP
server's shutdown routine (e.g., httpSrv.Shutdown with a context timeout) and
handle/log any shutdown error before returning the net.Listen error. Locate the
HTTP server instance started earlier (httpSrv or similar) and use
context.WithTimeout to perform a graceful shutdown, optionally using a
sync.WaitGroup or channel to wait for the HTTP goroutine to exit before
returning.

In `@pkg/api/grpc/blob_service.go`:
- Around line 59-66: The current loop calls s.svc.Store().GetBlobs per namespace
applying req.Limit and req.Offset per-namespace; change this to collect all
results first (append to allBlobs inside the loop without passing per-namespace
limit/offset), then apply global pagination using req.Offset and req.Limit on
the aggregated allBlobs slice (use safe bounds checks when slicing). Make the
same change in BlobGetAll (service.go) so both gRPC and JSON-RPC return a
globally paginated result and ensure error handling and any total-count
reporting are updated accordingly.

In `@pkg/api/service.go`:
- Around line 58-82: BlobGetAll currently passes the per-request limit and
offset into each s.store.GetBlobs call, causing pagination to be applied
per-namespace; instead fetch all matching blobs per namespace (or fetch with
no/very large paging) into allBlobs, then apply the requested offset and limit
to the aggregated allBlobs slice before marshalling and returning. Update
BlobGetAll to call s.store.GetBlobs without passing the incoming limit/offset
(or use a sentinel to request all), concatenate into allBlobs, compute start :=
min(offset, len(allBlobs)) and end := min(start+limit, len(allBlobs)) (handling
zero/unbounded limit), slice allBlobs[start:end], then marshal that sliced
result; keep the existing error handling and use MarshalBlob for each item.
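
For reference, the aggregate slicing step described above looks like this in Go (allBlobs, limit, and offset come from the surrounding handler; a limit of 0 means unbounded):

start := offset
if start > len(allBlobs) {
	start = len(allBlobs)
}
end := len(allBlobs)
if limit > 0 && start+limit < end {
	end = start + limit
}
page := allBlobs[start:end] // safe even when start == end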

In `@proto/apex/v1/header.proto`:
- Line 21: The RPC Subscribe (rpc Subscribe) currently uses wrapper message
types named SubscribeHeadersRequest and SubscribeHeadersResponse which violates
Buf's RPC_*_STANDARD_NAME rule; rename those messages to follow the RPC naming
convention (e.g., SubscribeRequest and SubscribeResponse or prefixed with the
service name like HeaderServiceSubscribeRequest/HeaderServiceSubscribeResponse),
update the rpc signature (rpc Subscribe(SubscribeRequest) returns (stream
SubscribeResponse);) and update all references/usages of SubscribeHeadersRequest
and SubscribeHeadersResponse throughout the proto so the names match the new
message identifiers.

---

Duplicate comments:
In `@pkg/store/sqlite.go`:
- Around line 281-283: Update SQLiteStore.Close to combine both pool Close
errors using errors.Join by calling errors.Join(s.reader.Close(),
s.writer.Close()) in the SQLiteStore.Close method; ensure the "errors" package
is imported and add nil checks for s.reader and s.writer (call Close only if
non-nil) to avoid nil dereferences before joining their returned errors.
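
A sketch of the combined Close, with pool field names following the comment above:

func (s *SQLiteStore) Close() error {
	var errs []error
	if s.reader != nil {
		errs = append(errs, s.reader.Close())
	}
	if s.writer != nil {
		errs = append(errs, s.writer.Close())
	}
	return errors.Join(errs...) // nil when both closes succeed
}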

---

Nitpick comments:
In `@config/config.go`:
- Around line 32-35: The RPC gRPC namespace cap is hardcoded in the API layer
(affecting GetAll) and should be exposed in RPCConfig so operators can tune it;
add a MaxNamespaces int field to RPCConfig with a sensible default (documented
in comments/README), wire that config through where GetAll enforces the
16-namespace limit (e.g., pass or read RPCConfig.MaxNamespaces in the code path
that implements GetAll) and ensure any existing hardcoded constant (16) is
replaced by using RPCConfig.MaxNamespaces.

In `@pkg/api/grpc/blob_service.go`:
- Around line 120-127: The bytesToNamespace helper is duplicated; extract the
logic from the bytesToNamespace function into a single shared utility in a
common package (e.g., add a new exported function BytesToNamespace in pkg/types
or an existing pkg/types file) and replace both bytesToNamespace implementations
(the one in blob_service.go and the one in pkg/api/jsonrpc/blob.go) to call that
shared BytesToNamespace helper; ensure the new function preserves the same
signature/behavior (checking len == types.NamespaceSize, copying into
types.Namespace, returning error) and update imports where needed.

In `@pkg/api/grpc/server_test.go`:
- Around line 21-121: Duplicate test helpers (mockStore, mockFetcher,
testNamespace) and nearly identical startTestServer/startTestHeaderServer should
be extracted to a shared internal test package; create pkg/api/internal/testutil
with exported helpers NewMockStore, NewMockFetcher, TestNamespace and a unified
StartTestServers (or StartTestServerPair) that returns the servers/clients
needed, then replace the local definitions in both pkg/api/grpc/server_test.go
and pkg/api/jsonrpc/server_test.go to import and use those helpers (reference
symbols: mockStore, mockFetcher, testNamespace, startTestServer,
startTestHeaderServer).
- Around line 165-288: Several tests (TestGRPCBlobGet, TestGRPCBlobGetAll,
TestGRPCHeaderGetByHeight, TestGRPCHeaderLocalHead, TestGRPCHeaderNetworkHead)
repeat the same seed → service → client call → assert pattern; convert them to
table-driven subtests. Create a table (slice of test cases) where each case has
a name, seed data (e.g., blobs or headers), the request to send, which server
starter to use (startTestServer or startTestHeaderServer), and expected
assertions (expected height, data, count, etc.), then loop with t.Run(c.name,
func(t *testing.T){ set up newMockStore(), seed st using c.seed (refer to
st.blobs and st.headers), create notifier and svc via api.NewService, start
client via the appropriate helper, call the RPC (client.Get, GetAll,
GetByHeight, LocalHead, NetworkHead) and run the existing checks for that case
}); factor repeated setup into small helpers (e.g., makeServiceAndClient or
seedStore) referenced by function names in the diff to keep tests concise and
readable.

In `@pkg/api/jsonrpc/server_test.go`:
- Around line 21-121: Extract the duplicated test helpers (mockStore type and
its methods, mockFetcher type and its methods, and testNamespace) into a shared
internal test package (e.g., pkg/api/internal/testutil) and replace the copies
in pkg/api/jsonrpc/server_test.go and pkg/api/grpc/server_test.go with imports
from that package; move all related helper functions and types into that
package, keep signatures unchanged (mockStore, mockFetcher, testNamespace),
update the test files to import testutil and reference
testutil.mockStore/testutil.mockFetcher/testutil.testNamespace (or exported
names if you choose to export), and run tests to ensure no missing methods
(export any symbols needed by both packages).
- Around line 179-241: Consolidate the three similar tests
(TestJSONRPCHeaderGetByHeight, TestJSONRPCHeaderLocalHead,
TestJSONRPCHeaderNetworkHead) into a single table-driven test (e.g.,
TestJSONRPCHeaderMethods) that iterates cases with fields: name, setup (store
headers/syncState or mockFetcher.networkHead), rpcMethod (e.g.,
"header.GetByHeight", "header.LocalHead", "header.NetworkHead"), rpcParams
(uint64(42) only for GetByHeight), and expectedRawJSON. For each case create the
store via newMockStore or mockFetcher as needed, build service with
api.NewService(st, ft, ..., notifier, zerolog.Nop()) and server with NewServer,
call doRPC(srv, rpcMethod, params...), and assert resp.Error is nil and
string(resp.Result) equals expectedRawJSON; this removes duplicated boilerplate
while keeping the same assertions and setups.

In `@pkg/store/sqlite_test.go`:
- Line 105: The tests in sqlite_test.go call s.GetBlobs(ctx, ns, 10, 11, 0, 0)
only with limit=0/offset=0, so pagination SQL is untested; add a table-driven
subtest in the same file that calls s.GetBlobs with a variety of limit>0 and
offset>0 combinations (e.g. limit=2 offset=0, limit=2 offset=1, limit larger
than remaining, offset at end) and assert err==nil, the number of returned blobs
matches expected, and the returned blob ordering/IDs match the expected slice
for each case; reference the s.GetBlobs method and reuse existing fixture data
inserted earlier in the test to compute expected results so the assertions
validate that limit/offset are applied correctly.

In `@pkg/store/sqlite.go`:
- Around line 75-86: The configureSQLite function currently sets PRAGMAs with
db.Exec which only affects the single connection used; instead add the
per-connection PRAGMA settings to the SQLite DSN using modernc.org/sqlite's
_pragma parameters so every pooled connection inherits them (e.g.
_pragma=busy_timeout(5000)&_pragma=foreign_keys(1) and _pragma=journal_mode(WAL)
if desired), and simplify or remove the db.Exec calls in configureSQLite; update
the codepath that opens the DB (the caller that constructs the DSN before
sql.Open) to include these _pragma entries so configureSQLite no longer relies
on per-connection Exec to set busy_timeout and foreign_keys.

In `@proto/apex/v1/blob.proto`:
- Around line 31-38: Change the pagination fields in the GetAllBlobsRequest
message so they use unsigned types to prevent negative values at the wire level:
update the fields limit and offset from int32 to uint32 in message
GetAllBlobsRequest, regenerate protobuf artifacts, and then update any call
sites, validators, and handlers that assume signed ints (e.g., request parsing
in RPC handlers that reference GetAllBlobsRequest.limit or .offset) to use the
new unsigned type or cast/validate as needed.

Comment on lines 188 to 200
// Start gRPC server.
grpcSrv := grpcapi.NewServer(svc, log.Logger)
lis, err := net.Listen("tcp", cfg.RPC.GRPCListenAddr)
if err != nil {
return fmt.Errorf("listen gRPC: %w", err)
}

go func() {
log.Info().Str("addr", cfg.RPC.GRPCListenAddr).Msg("gRPC server listening")
if err := grpcSrv.Serve(lis); err != nil {
log.Error().Err(err).Msg("gRPC server error")
}
}()

⚠️ Potential issue | 🟡 Minor

HTTP server left running if gRPC listener setup fails.

If net.Listen on Line 190 fails, the function returns an error immediately, but the HTTP server started at Line 181 is never shut down. This leaks the goroutine and the listening socket.

Proposed fix
 	lis, err := net.Listen("tcp", cfg.RPC.GRPCListenAddr)
 	if err != nil {
+		// Shut down the already-started HTTP server before returning.
+		_ = httpSrv.Close()
 		return fmt.Errorf("listen gRPC: %w", err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/apex/main.go` around lines 188 - 200, If net.Listen for gRPC
(cfg.RPC.GRPCListenAddr) fails, the previously started HTTP server goroutine
must be shut down to avoid leaking sockets/goroutines; modify the error path
after net.Listen to call the HTTP server's shutdown routine (e.g.,
httpSrv.Shutdown with a context timeout) and handle/log any shutdown error
before returning the net.Listen error. Locate the HTTP server instance started
earlier (httpSrv or similar) and use context.WithTimeout to perform a graceful
shutdown, optionally using a sync.WaitGroup or channel to wait for the HTTP
goroutine to exit before returning.

- Close HTTP server if gRPC listener setup fails (prevents goroutine leak)
- Apply limit/offset pagination to aggregate result across all namespaces
  instead of per-namespace (fixes incorrect multi-namespace pagination)
- Rename proto request/response types to pass buf DEFAULT lint:
  GetBlobRequest→GetRequest, GetAllBlobsRequest→GetAllRequest,
  SubscribeBlobsRequest→BlobServiceSubscribeRequest,
  SubscribeHeadersRequest→HeaderServiceSubscribeRequest (and responses)
- Fix buf.yaml lint category from STANDARD (invalid) to DEFAULT

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
pkg/api/grpc/server_test.go (2)

123-163: Consider consolidating the two nearly identical test helpers.

startTestServer and startTestHeaderServer share all server-setup/teardown logic and differ only in the returned client type. A single generic helper (or one that returns *grpc.ClientConn) would eliminate the duplication.

♻️ Example: single helper returning the connection
+func startTestGRPCServer(t *testing.T, svc *api.Service) *grpc.ClientConn {
+	t.Helper()
+	srv := NewServer(svc, zerolog.Nop())
+	lis, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	go func() { _ = srv.Serve(lis) }()
+	t.Cleanup(func() { srv.GracefulStop() })
+	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("dial: %v", err)
+	}
+	t.Cleanup(func() { _ = conn.Close() })
+	return conn
+}

Then in each test:

conn := startTestGRPCServer(t, svc)
client := pb.NewBlobServiceClient(conn)   // or pb.NewHeaderServiceClient(conn)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/server_test.go` around lines 123 - 163, Consolidate duplicated
test setup in startTestServer and startTestHeaderServer by creating a single
helper (e.g., startTestGRPCServer) that performs NewServer(svc,...), listens,
serves, registers t.Cleanup for srv.GracefulStop and conn.Close, and returns the
grpc.ClientConn (or *grpc.ClientConn) instead of a specific typed client; then
update callers to call pb.NewBlobServiceClient(conn) or
pb.NewHeaderServiceClient(conn) as needed (replace references to startTestServer
and startTestHeaderServer with the new startTestGRPCServer).

165-217: Tests cover only the happy path — consider adding error-path scenarios.

TestGRPCBlobGet and TestGRPCBlobGetAll exercise only the success case. Table-driven sub-tests for "not found", "invalid namespace", and "empty namespaces" would improve confidence and align with the project's table-driven test guideline.

As per coding guidelines: **/*_test.go: "Use table-driven tests pattern for test implementation".
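
One way the table could look — the expected status codes are assumptions about how the handlers map errors, not verified behavior:

cases := []struct {
	name     string
	wantCode codes.Code
}{
	{"happy path", codes.OK},
	{"not found", codes.NotFound},                // store seeded without the blob
	{"invalid namespace", codes.InvalidArgument}, // wrong-length namespace bytes
}
for _, tc := range cases {
	t.Run(tc.name, func(t *testing.T) {
		// Seed newMockStore per case, dial via startTestServer, call
		// client.Get, then assert status.Code(err) == tc.wantCode
		// (codes and status from google.golang.org/grpc).
	})
}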

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/server_test.go` around lines 165 - 217, Add table-driven
subtests to TestGRPCBlobGet and TestGRPCBlobGetAll to cover error paths: for
TestGRPCBlobGet add cases for "not found" (no blob at height/commitment),
"invalid namespace" (bad length), and "bad commitment" and assert returned
error; for TestGRPCBlobGetAll add cases for "empty namespaces" (expect error or
empty result per API), "no matching blobs" and "invalid namespace" and assert
proper error/empty response. Use newMockStore to set up st.blobs per case, reuse
testNamespace() for valid namespaces, and call client.Get / client.GetAll inside
subtests to verify responses and errors instead of only the happy path.
proto/apex/v1/blob.proto (1)

31-38: Consider using uint32 (or int32 with explicit validation) for pagination fields.

int32 allows negative values for limit and offset. The server treats negatives the same as zero (no-op), which is safe but potentially confusing. If negative values are never meaningful, uint32 would express the intent more clearly at the schema level. Low priority — current server-side handling is safe.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@proto/apex/v1/blob.proto` around lines 31 - 38, The pagination fields in
message GetAllRequest (fields limit and offset) are currently int32 and thus
allow negative values; change their types to uint32 to express non-negativity at
the schema level (or keep int32 but add explicit validation in the server-side
handlers that reject negative values) — update the proto definition for
GetAllRequest by replacing "int32 limit" and "int32 offset" with "uint32 limit"
and "uint32 offset" (and regenerate stubs), or alternatively add validation
logic in the RPC handling code that checks GetAllRequest.limit and
GetAllRequest.offset and treats negatives as errors/normalizes them to 0.
pkg/api/service.go (1)

159-167: Exposing Store() and Fetcher() accessors enables the logic duplication flagged in blob_service.go.

The gRPC handler bypasses Service methods and calls Store().GetBlobs(...) directly because the Service methods return json.RawMessage. Consider adding typed (non-JSON) methods on Service for blob/header retrieval, so gRPC handlers can share the same business logic without accessing the store directly. This would let you remove or narrow the Store()/Fetcher() accessors.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/service.go` around lines 159 - 167, The public Store() and Fetcher()
accessors let callers bypass Service business logic (see blob_service.go) — add
typed, non-JSON methods on Service such as GetBlobs(ctx context.Context, ids
[]string) ([]Blob, error) and GetHeaders(ctx context.Context, filter
HeaderFilter) ([]Header, error) that encapsulate the existing
Store().GetBlobs/... and Fetcher usage and return concrete types; update the
gRPC handlers to call these new Service methods instead of Store().GetBlobs(...)
and Fetcher(), then remove or make Service.Store() and Service.Fetcher() private
to prevent direct store/fetcher access.
pkg/api/grpc/blob_service.go (1)

24-42: Extract shared blob-matching logic to reduce duplication.

The gRPC handler re-implements business logic that also exists in the service layer:

  • BlobGet duplication (lines 30-39): The commitment-matching loop (fetch blobs → iterate → match commitment) is identical to Service.BlobGet in service.go, differing only in return type.
  • BlobGetAll pagination duplication (lines 69-78): The pagination logic (apply offset/limit to aggregate results) is identical to Service.BlobGetAll in service.go.

If the matching algorithm or pagination logic evolves, both code paths require updates. Consider extracting a shared method on Service that accepts a callback or returns the typed blob directly, allowing both the JSON-RPC and gRPC handlers to delegate to a single source of truth.
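
A sketch of such a single source of truth on Service — the store signature follows earlier comments in this review, and the method name, return type, and error text are illustrative:

func (s *Service) findBlobByCommitment(ctx context.Context, height uint64, ns types.Namespace, commitment []byte) (*types.Blob, error) {
	blobs, err := s.store.GetBlobs(ctx, ns, height, height, 0, 0)
	if err != nil {
		return nil, err
	}
	for _, b := range blobs {
		if bytes.Equal(b.Commitment, commitment) {
			return b, nil
		}
	}
	return nil, fmt.Errorf("blob not found at height %d", height)
}

Both the JSON-RPC and gRPC handlers could then delegate here and differ only in how they serialize the returned blob.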

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/grpc/blob_service.go` around lines 24 - 42, The Get handler
duplicates matching logic present in Service.BlobGet; instead of re-implementing
the fetch/iterate/match loop in BlobServiceServer.Get, delegate to the service
layer by adding or using a shared Service method (e.g., Service.BlobGet /
Service.FindBlobByCommitment) that accepts context, namespace, commitment and
height and returns the matching typed blob (or nil/error); then have
BlobServiceServer.Get call that method and convert the returned blob with
blobToProto. Do the same for pagination: replace the inline offset/limit logic
with a call to Service.BlobGetAll (or a new Service.PaginateBlobs helper) so
both JSON-RPC and gRPC use the single implementation. Ensure the new/used
Service method signatures match existing symbols (Service.BlobGet,
Service.BlobGetAll) so handlers can delegate without duplicating logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/api/service.go`:
- Around line 170-179: MarshalBlob currently omits the Blob.Height and
Blob.Signer fields, causing a mismatch with blobToProto and types.Blob; update
the map in MarshalBlob (function MarshalBlob) to include "height": b.Height and
"signer": b.Signer so the JSON mirrors blobToProto/types.Blob (or, if omission
is intentional, add a clear comment in MarshalBlob explaining why those fields
are excluded). Ensure the keys match naming style ("height", "signer") and use
the raw b.Height and b.Signer values as in the types.Blob definition.

---

Nitpick comments:
In `@pkg/api/grpc/blob_service.go`:
- Around line 24-42: The Get handler duplicates matching logic present in
Service.BlobGet; instead of re-implementing the fetch/iterate/match loop in
BlobServiceServer.Get, delegate to the service layer by adding or using a shared
Service method (e.g., Service.BlobGet / Service.FindBlobByCommitment) that
accepts context, namespace, commitment and height and returns the matching typed
blob (or nil/error); then have BlobServiceServer.Get call that method and
convert the returned blob with blobToProto. Do the same for pagination: replace
the inline offset/limit logic with a call to Service.BlobGetAll (or a new
Service.PaginateBlobs helper) so both JSON-RPC and gRPC use the single
implementation. Ensure the new/used Service method signatures match existing
symbols (Service.BlobGet, Service.BlobGetAll) so handlers can delegate without
duplicating logic.

In `@pkg/api/grpc/server_test.go`:
- Around line 123-163: Consolidate duplicated test setup in startTestServer and
startTestHeaderServer by creating a single helper (e.g., startTestGRPCServer)
that performs NewServer(svc,...), listens, serves, registers t.Cleanup for
srv.GracefulStop and conn.Close, and returns the grpc.ClientConn (or
*grpc.ClientConn) instead of a specific typed client; then update callers to
call pb.NewBlobServiceClient(conn) or pb.NewHeaderServiceClient(conn) as needed
(replace references to startTestServer and startTestHeaderServer with the new
startTestGRPCServer).
- Around line 165-217: Add table-driven subtests to TestGRPCBlobGet and
TestGRPCBlobGetAll to cover error paths: for TestGRPCBlobGet add cases for "not
found" (no blob at height/commitment), "invalid namespace" (bad length), and
"bad commitment" and assert returned error; for TestGRPCBlobGetAll add cases for
"empty namespaces" (expect error or empty result per API), "no matching blobs"
and "invalid namespace" and assert proper error/empty response. Use newMockStore
to set up st.blobs per case, reuse testNamespace() for valid namespaces, and
call client.Get / client.GetAll inside subtests to verify responses and errors
instead of only the happy path.

In `@pkg/api/service.go`:
- Around line 159-167: The public Store() and Fetcher() accessors let callers
bypass Service business logic (see blob_service.go) — add typed, non-JSON
methods on Service such as GetBlobs(ctx context.Context, ids []string) ([]Blob,
error) and GetHeaders(ctx context.Context, filter HeaderFilter) ([]Header,
error) that encapsulate the existing Store().GetBlobs/... and Fetcher usage and
return concrete types; update the gRPC handlers to call these new Service
methods instead of Store().GetBlobs(...) and Fetcher(), then remove or make
Service.Store() and Service.Fetcher() private to prevent direct store/fetcher
access.

In `@proto/apex/v1/blob.proto`:
- Around line 31-38: The pagination fields in message GetAllRequest (fields
limit and offset) are currently int32 and thus allow negative values; change
their types to uint32 to express non-negativity at the schema level (or keep
int32 but add explicit validation in the server-side handlers that reject
negative values) — update the proto definition for GetAllRequest by replacing
"int32 limit" and "int32 offset" with "uint32 limit" and "uint32 offset" (and
regenerate stubs), or alternatively add validation logic in the RPC handling
code that checks GetAllRequest.limit and GetAllRequest.offset and treats
negatives as errors/normalizes them to 0.

Comment on lines +170 to +179
func MarshalBlob(b *types.Blob) json.RawMessage {
m := map[string]any{
"namespace": b.Namespace[:],
"data": b.Data,
"share_version": b.ShareVersion,
"commitment": b.Commitment,
"index": b.Index,
}
raw, _ := json.Marshal(m) //nolint:errcheck
return raw

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check types.Blob struct fields to understand what's available
ast-grep --pattern $'type Blob struct {
  $$$
}'

# Also search for Blob definition in a more flexible way
fd -e go . | xargs grep -l "type Blob struct" | head -5

Repository: evstack/apex

Length of output: 1896


🏁 Script executed:

#!/bin/bash
# Check if Signer is used/referenced elsewhere in JSON context
rg -n 'signer' --type=go -g '!**/gen/**' -i | head -20

# Check the blobToProto function mentioned in the comment
rg -n 'blobToProto' --type=go -A 20

Repository: evstack/apex

Length of output: 5305


🏁 Script executed:

#!/bin/bash
# Let's look at the actual service.go file around the MarshalBlob function
cat -n pkg/api/service.go | head -200 | tail -80

Repository: evstack/apex

Length of output: 3216


Include height and signer fields in MarshalBlob to match blobToProto and types.Blob definition.

The types.Blob struct has Height and Signer fields (visible in pkg/types/types.go), and blobToProto includes both when converting to the proto message. However, MarshalBlob omits these fields. This inconsistency may cause issues for upstream clients expecting a complete blob representation. Either add these fields to the marshaled JSON or add a comment explaining why they're intentionally excluded.
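
If the fields are meant to be included, the fix is two extra map entries mirroring blobToProto:

m := map[string]any{
	"namespace":     b.Namespace[:],
	"data":          b.Data,
	"share_version": b.ShareVersion,
	"commitment":    b.Commitment,
	"index":         b.Index,
	"height":        b.Height,
	"signer":        b.Signer,
}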

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/api/service.go` around lines 170 - 179, MarshalBlob currently omits the
Blob.Height and Blob.Signer fields, causing a mismatch with blobToProto and
types.Blob; update the map in MarshalBlob (function MarshalBlob) to include
"height": b.Height and "signer": b.Signer so the JSON mirrors
blobToProto/types.Blob (or, if omission is intentional, add a clear comment in
MarshalBlob explaining why those fields are excluded). Ensure the keys match
naming style ("height", "signer") and use the raw b.Height and b.Signer values
as in the types.Blob definition.

@tac0turtle tac0turtle merged commit c879330 into main Feb 19, 2026
4 checks passed
@tac0turtle tac0turtle deleted the feat/phase2-api-layer branch February 19, 2026 14:34