diff --git a/cmd/apex/blob_cmd.go b/cmd/apex/blob_cmd.go index 943c1c4..0a9cdc3 100644 --- a/cmd/apex/blob_cmd.go +++ b/cmd/apex/blob_cmd.go @@ -16,6 +16,7 @@ func blobCmd() *cobra.Command { Short: "Query blobs from the indexer", } cmd.AddCommand(blobGetCmd()) + cmd.AddCommand(blobGetByCommitmentCmd()) cmd.AddCommand(blobListCmd()) return cmd } @@ -54,6 +55,30 @@ func blobGetCmd() *cobra.Command { } } +func blobGetByCommitmentCmd() *cobra.Command { + return &cobra.Command{ + Use: "get-by-commitment ", + Short: "Get a blob by commitment alone (no height required)", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + addr, _ := cmd.Flags().GetString("rpc-addr") + + commitment, err := hex.DecodeString(args[0]) + if err != nil { + return fmt.Errorf("invalid commitment hex: %w", err) + } + + client := newRPCClient(addr) + result, err := client.call(cmd.Context(), "blob.GetByCommitment", commitment) + if err != nil { + return err + } + + return printJSON(cmd, result) + }, + } +} + func blobListCmd() *cobra.Command { cmd := &cobra.Command{ Use: "list ", diff --git a/cmd/apex/main.go b/cmd/apex/main.go index bf6c217..497d5da 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -106,11 +106,16 @@ func startCmd() *cobra.Command { setupLogger(cfg.Log) - log.Info(). + startLog := log.Info(). Str("version", version). - Str("node_url", cfg.DataSource.CelestiaNodeURL). - Int("namespaces", len(cfg.DataSource.Namespaces)). - Msg("starting apex indexer") + Str("datasource_type", cfg.DataSource.Type). + Int("namespaces", len(cfg.DataSource.Namespaces)) + if cfg.DataSource.Type == "app" { + startLog = startLog.Str("app_url", cfg.DataSource.CelestiaAppURL) + } else { + startLog = startLog.Str("node_url", cfg.DataSource.CelestiaNodeURL) + } + startLog.Msg("starting apex indexer") return runIndexer(cmd.Context(), cfg) }, @@ -173,20 +178,38 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { } } - // Connect to Celestia node. 
- fetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) - if err != nil { - return fmt.Errorf("connect to celestia node: %w", err) + // Connect to data source. + var ( + dataFetcher fetch.DataFetcher + proofFwd fetch.ProofForwarder + ) + switch cfg.DataSource.Type { + case "app": + appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger) + if err != nil { + return fmt.Errorf("create celestia-app fetcher: %w", err) + } + dataFetcher = appFetcher + // celestia-app does not serve blob proofs; proofFwd stays nil. + case "node", "": + nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) + if err != nil { + return fmt.Errorf("connect to celestia node: %w", err) + } + dataFetcher = nodeFetcher + proofFwd = nodeFetcher + default: + return fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type) } - defer fetcher.Close() //nolint:errcheck + defer dataFetcher.Close() //nolint:errcheck // Set up API layer. notifier := api.NewNotifier(cfg.Subscription.BufferSize, log.Logger) notifier.SetMetrics(rec) - svc := api.NewService(db, fetcher, fetcher, notifier, log.Logger) + svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger) // Build and run the sync coordinator with observer hook. - coord := syncer.New(db, fetcher, + coord := syncer.New(db, dataFetcher, syncer.WithStartHeight(cfg.Sync.StartHeight), syncer.WithBatchSize(cfg.Sync.BatchSize), syncer.WithConcurrency(cfg.Sync.Concurrency), diff --git a/config/config.go b/config/config.go index 22a9f20..cd2fc6f 100644 --- a/config/config.go +++ b/config/config.go @@ -17,9 +17,13 @@ type Config struct { Log LogConfig `yaml:"log"` } -// DataSourceConfig configures the Celestia node connection. +// DataSourceConfig configures the Celestia data source. 
+// Type selects the backend: "node" (default) uses a Celestia DA node, +// "app" uses a celestia-app consensus node via CometBFT RPC. type DataSourceConfig struct { + Type string `yaml:"type"` // "node" (default) or "app" CelestiaNodeURL string `yaml:"celestia_node_url"` + CelestiaAppURL string `yaml:"celestia_app_url"` AuthToken string `yaml:"-"` // populated only via APEX_AUTH_TOKEN env var Namespaces []string `yaml:"namespaces"` } @@ -63,6 +67,7 @@ type LogConfig struct { func DefaultConfig() Config { return Config{ DataSource: DataSourceConfig{ + Type: "node", CelestiaNodeURL: "http://localhost:26658", }, Storage: StorageConfig{ diff --git a/config/load.go b/config/load.go index 9f9d827..71b4b18 100644 --- a/config/load.go +++ b/config/load.go @@ -30,9 +30,15 @@ const defaultConfigYAML = `# Apex configuration # Generated by: apex init data_source: - # Celestia node RPC endpoint + # Data source type: "node" (Celestia DA node) or "app" (celestia-app CometBFT RPC) + type: "node" + + # Celestia DA node RPC endpoint (required when type: "node") celestia_node_url: "http://localhost:26658" + # Celestia-app CometBFT RPC endpoint (required when type: "app") + # celestia_app_url: "http://localhost:26657" + # Auth token: set via APEX_AUTH_TOKEN env var (not read from this file). # Namespaces to index (hex-encoded, 29 bytes = 58 hex chars each). 
@@ -106,9 +112,30 @@ func Load(path string) (*Config, error) { return &cfg, nil } +func validateDataSource(ds *DataSourceConfig) error { + switch ds.Type { + case "node", "": + if ds.CelestiaNodeURL == "" { + return fmt.Errorf("data_source.celestia_node_url is required for type \"node\"") + } + case "app": + if ds.CelestiaAppURL == "" { + return fmt.Errorf("data_source.celestia_app_url is required for type \"app\"") + } + default: + return fmt.Errorf("data_source.type %q is invalid; must be \"node\" or \"app\"", ds.Type) + } + for _, ns := range ds.Namespaces { + if _, err := types.NamespaceFromHex(ns); err != nil { + return fmt.Errorf("invalid namespace %q: %w", ns, err) + } + } + return nil +} + func validate(cfg *Config) error { - if cfg.DataSource.CelestiaNodeURL == "" { - return fmt.Errorf("data_source.celestia_node_url is required") + if err := validateDataSource(&cfg.DataSource); err != nil { + return err } if cfg.Storage.DBPath == "" { return fmt.Errorf("storage.db_path is required") @@ -138,12 +165,5 @@ func validate(cfg *Config) error { return fmt.Errorf("log.format %q is invalid; must be json or console", cfg.Log.Format) } - // Validate namespace hex strings. 
- for _, ns := range cfg.DataSource.Namespaces { - if _, err := types.NamespaceFromHex(ns); err != nil { - return fmt.Errorf("invalid namespace %q: %w", ns, err) - } - } - return nil } diff --git a/go.mod b/go.mod index 554401a..5e3915c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.0 require ( github.com/filecoin-project/go-jsonrpc v0.10.1 + github.com/gorilla/websocket v1.4.2 github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.2 @@ -19,7 +20,6 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-log/v2 v2.0.8 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/pkg/api/grpc/blob_service.go b/pkg/api/grpc/blob_service.go index 3a60bcd..b12edef 100644 --- a/pkg/api/grpc/blob_service.go +++ b/pkg/api/grpc/blob_service.go @@ -3,6 +3,7 @@ package grpcapi import ( "bytes" "context" + "errors" "fmt" "google.golang.org/grpc" @@ -41,6 +42,22 @@ func (s *BlobServiceServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.Ge return nil, status.Error(codes.NotFound, store.ErrNotFound.Error()) } +func (s *BlobServiceServer) GetByCommitment(ctx context.Context, req *pb.GetByCommitmentRequest) (*pb.GetByCommitmentResponse, error) { + if len(req.Commitment) == 0 { + return nil, status.Error(codes.InvalidArgument, "commitment is required") + } + + b, err := s.svc.Store().GetBlobByCommitment(ctx, req.Commitment) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return nil, status.Error(codes.NotFound, store.ErrNotFound.Error()) + } + return nil, status.Errorf(codes.Internal, "get blob by commitment: %v", err) + } + + return &pb.GetByCommitmentResponse{Blob: blobToProto(b)}, nil +} + func (s *BlobServiceServer) GetAll(ctx context.Context, req 
*pb.GetAllRequest) (*pb.GetAllResponse, error) { const maxNamespaces = 16 if len(req.Namespaces) > maxNamespaces { diff --git a/pkg/api/grpc/gen/apex/v1/blob.pb.go b/pkg/api/grpc/gen/apex/v1/blob.pb.go index 9ecd440..1cc7a77 100644 --- a/pkg/api/grpc/gen/apex/v1/blob.pb.go +++ b/pkg/api/grpc/gen/apex/v1/blob.pb.go @@ -239,6 +239,94 @@ func (x *GetAllResponse) GetBlobs() []*Blob { return nil } +type GetByCommitmentRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Commitment []byte `protobuf:"bytes,1,opt,name=commitment,proto3" json:"commitment,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetByCommitmentRequest) Reset() { + *x = GetByCommitmentRequest{} + mi := &file_apex_v1_blob_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetByCommitmentRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetByCommitmentRequest) ProtoMessage() {} + +func (x *GetByCommitmentRequest) ProtoReflect() protoreflect.Message { + mi := &file_apex_v1_blob_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetByCommitmentRequest.ProtoReflect.Descriptor instead. 
+func (*GetByCommitmentRequest) Descriptor() ([]byte, []int) { + return file_apex_v1_blob_proto_rawDescGZIP(), []int{4} +} + +func (x *GetByCommitmentRequest) GetCommitment() []byte { + if x != nil { + return x.Commitment + } + return nil +} + +type GetByCommitmentResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Blob *Blob `protobuf:"bytes,1,opt,name=blob,proto3" json:"blob,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetByCommitmentResponse) Reset() { + *x = GetByCommitmentResponse{} + mi := &file_apex_v1_blob_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetByCommitmentResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetByCommitmentResponse) ProtoMessage() {} + +func (x *GetByCommitmentResponse) ProtoReflect() protoreflect.Message { + mi := &file_apex_v1_blob_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetByCommitmentResponse.ProtoReflect.Descriptor instead. 
+func (*GetByCommitmentResponse) Descriptor() ([]byte, []int) { + return file_apex_v1_blob_proto_rawDescGZIP(), []int{5} +} + +func (x *GetByCommitmentResponse) GetBlob() *Blob { + if x != nil { + return x.Blob + } + return nil +} + type BlobServiceSubscribeRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Namespace []byte `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` @@ -248,7 +336,7 @@ type BlobServiceSubscribeRequest struct { func (x *BlobServiceSubscribeRequest) Reset() { *x = BlobServiceSubscribeRequest{} - mi := &file_apex_v1_blob_proto_msgTypes[4] + mi := &file_apex_v1_blob_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -260,7 +348,7 @@ func (x *BlobServiceSubscribeRequest) String() string { func (*BlobServiceSubscribeRequest) ProtoMessage() {} func (x *BlobServiceSubscribeRequest) ProtoReflect() protoreflect.Message { - mi := &file_apex_v1_blob_proto_msgTypes[4] + mi := &file_apex_v1_blob_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -273,7 +361,7 @@ func (x *BlobServiceSubscribeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use BlobServiceSubscribeRequest.ProtoReflect.Descriptor instead. 
func (*BlobServiceSubscribeRequest) Descriptor() ([]byte, []int) { - return file_apex_v1_blob_proto_rawDescGZIP(), []int{4} + return file_apex_v1_blob_proto_rawDescGZIP(), []int{6} } func (x *BlobServiceSubscribeRequest) GetNamespace() []byte { @@ -293,7 +381,7 @@ type BlobServiceSubscribeResponse struct { func (x *BlobServiceSubscribeResponse) Reset() { *x = BlobServiceSubscribeResponse{} - mi := &file_apex_v1_blob_proto_msgTypes[5] + mi := &file_apex_v1_blob_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -305,7 +393,7 @@ func (x *BlobServiceSubscribeResponse) String() string { func (*BlobServiceSubscribeResponse) ProtoMessage() {} func (x *BlobServiceSubscribeResponse) ProtoReflect() protoreflect.Message { - mi := &file_apex_v1_blob_proto_msgTypes[5] + mi := &file_apex_v1_blob_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -318,7 +406,7 @@ func (x *BlobServiceSubscribeResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use BlobServiceSubscribeResponse.ProtoReflect.Descriptor instead. 
func (*BlobServiceSubscribeResponse) Descriptor() ([]byte, []int) { - return file_apex_v1_blob_proto_rawDescGZIP(), []int{5} + return file_apex_v1_blob_proto_rawDescGZIP(), []int{7} } func (x *BlobServiceSubscribeResponse) GetHeight() uint64 { @@ -357,15 +445,22 @@ const file_apex_v1_blob_proto_rawDesc = "" + "\x05limit\x18\x03 \x01(\x05R\x05limit\x12\x16\n" + "\x06offset\x18\x04 \x01(\x05R\x06offset\"5\n" + "\x0eGetAllResponse\x12#\n" + - "\x05blobs\x18\x01 \x03(\v2\r.apex.v1.BlobR\x05blobs\";\n" + + "\x05blobs\x18\x01 \x03(\v2\r.apex.v1.BlobR\x05blobs\"8\n" + + "\x16GetByCommitmentRequest\x12\x1e\n" + + "\n" + + "commitment\x18\x01 \x01(\fR\n" + + "commitment\"<\n" + + "\x17GetByCommitmentResponse\x12!\n" + + "\x04blob\x18\x01 \x01(\v2\r.apex.v1.BlobR\x04blob\";\n" + "\x1bBlobServiceSubscribeRequest\x12\x1c\n" + "\tnamespace\x18\x01 \x01(\fR\tnamespace\"[\n" + "\x1cBlobServiceSubscribeResponse\x12\x16\n" + "\x06height\x18\x01 \x01(\x04R\x06height\x12#\n" + - "\x05blobs\x18\x02 \x03(\v2\r.apex.v1.BlobR\x05blobs2\xd6\x01\n" + + "\x05blobs\x18\x02 \x03(\v2\r.apex.v1.BlobR\x05blobs2\xac\x02\n" + "\vBlobService\x120\n" + "\x03Get\x12\x13.apex.v1.GetRequest\x1a\x14.apex.v1.GetResponse\x129\n" + - "\x06GetAll\x12\x16.apex.v1.GetAllRequest\x1a\x17.apex.v1.GetAllResponse\x12Z\n" + + "\x06GetAll\x12\x16.apex.v1.GetAllRequest\x1a\x17.apex.v1.GetAllResponse\x12T\n" + + "\x0fGetByCommitment\x12\x1f.apex.v1.GetByCommitmentRequest\x1a .apex.v1.GetByCommitmentResponse\x12Z\n" + "\tSubscribe\x12$.apex.v1.BlobServiceSubscribeRequest\x1a%.apex.v1.BlobServiceSubscribeResponse0\x01B.Z,github.com/evstack/apex/pkg/api/grpc/gen;genb\x06proto3" var ( @@ -380,31 +475,36 @@ func file_apex_v1_blob_proto_rawDescGZIP() []byte { return file_apex_v1_blob_proto_rawDescData } -var file_apex_v1_blob_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_apex_v1_blob_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_apex_v1_blob_proto_goTypes = []any{ (*GetRequest)(nil), // 0: 
apex.v1.GetRequest (*GetResponse)(nil), // 1: apex.v1.GetResponse (*GetAllRequest)(nil), // 2: apex.v1.GetAllRequest (*GetAllResponse)(nil), // 3: apex.v1.GetAllResponse - (*BlobServiceSubscribeRequest)(nil), // 4: apex.v1.BlobServiceSubscribeRequest - (*BlobServiceSubscribeResponse)(nil), // 5: apex.v1.BlobServiceSubscribeResponse - (*Blob)(nil), // 6: apex.v1.Blob + (*GetByCommitmentRequest)(nil), // 4: apex.v1.GetByCommitmentRequest + (*GetByCommitmentResponse)(nil), // 5: apex.v1.GetByCommitmentResponse + (*BlobServiceSubscribeRequest)(nil), // 6: apex.v1.BlobServiceSubscribeRequest + (*BlobServiceSubscribeResponse)(nil), // 7: apex.v1.BlobServiceSubscribeResponse + (*Blob)(nil), // 8: apex.v1.Blob } var file_apex_v1_blob_proto_depIdxs = []int32{ - 6, // 0: apex.v1.GetResponse.blob:type_name -> apex.v1.Blob - 6, // 1: apex.v1.GetAllResponse.blobs:type_name -> apex.v1.Blob - 6, // 2: apex.v1.BlobServiceSubscribeResponse.blobs:type_name -> apex.v1.Blob - 0, // 3: apex.v1.BlobService.Get:input_type -> apex.v1.GetRequest - 2, // 4: apex.v1.BlobService.GetAll:input_type -> apex.v1.GetAllRequest - 4, // 5: apex.v1.BlobService.Subscribe:input_type -> apex.v1.BlobServiceSubscribeRequest - 1, // 6: apex.v1.BlobService.Get:output_type -> apex.v1.GetResponse - 3, // 7: apex.v1.BlobService.GetAll:output_type -> apex.v1.GetAllResponse - 5, // 8: apex.v1.BlobService.Subscribe:output_type -> apex.v1.BlobServiceSubscribeResponse - 6, // [6:9] is the sub-list for method output_type - 3, // [3:6] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 8, // 0: apex.v1.GetResponse.blob:type_name -> apex.v1.Blob + 8, // 1: apex.v1.GetAllResponse.blobs:type_name -> apex.v1.Blob + 8, // 2: apex.v1.GetByCommitmentResponse.blob:type_name -> apex.v1.Blob + 8, // 3: apex.v1.BlobServiceSubscribeResponse.blobs:type_name -> apex.v1.Blob + 0, // 4: 
apex.v1.BlobService.Get:input_type -> apex.v1.GetRequest + 2, // 5: apex.v1.BlobService.GetAll:input_type -> apex.v1.GetAllRequest + 4, // 6: apex.v1.BlobService.GetByCommitment:input_type -> apex.v1.GetByCommitmentRequest + 6, // 7: apex.v1.BlobService.Subscribe:input_type -> apex.v1.BlobServiceSubscribeRequest + 1, // 8: apex.v1.BlobService.Get:output_type -> apex.v1.GetResponse + 3, // 9: apex.v1.BlobService.GetAll:output_type -> apex.v1.GetAllResponse + 5, // 10: apex.v1.BlobService.GetByCommitment:output_type -> apex.v1.GetByCommitmentResponse + 7, // 11: apex.v1.BlobService.Subscribe:output_type -> apex.v1.BlobServiceSubscribeResponse + 8, // [8:12] is the sub-list for method output_type + 4, // [4:8] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_apex_v1_blob_proto_init() } @@ -419,7 +519,7 @@ func file_apex_v1_blob_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_apex_v1_blob_proto_rawDesc), len(file_apex_v1_blob_proto_rawDesc)), NumEnums: 0, - NumMessages: 6, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/api/grpc/gen/apex/v1/blob_grpc.pb.go b/pkg/api/grpc/gen/apex/v1/blob_grpc.pb.go index 8134f42..ab03764 100644 --- a/pkg/api/grpc/gen/apex/v1/blob_grpc.pb.go +++ b/pkg/api/grpc/gen/apex/v1/blob_grpc.pb.go @@ -19,9 +19,10 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - BlobService_Get_FullMethodName = "/apex.v1.BlobService/Get" - BlobService_GetAll_FullMethodName = "/apex.v1.BlobService/GetAll" - BlobService_Subscribe_FullMethodName = "/apex.v1.BlobService/Subscribe" + BlobService_Get_FullMethodName = "/apex.v1.BlobService/Get" + BlobService_GetAll_FullMethodName = "/apex.v1.BlobService/GetAll" + BlobService_GetByCommitment_FullMethodName = "/apex.v1.BlobService/GetByCommitment" + 
BlobService_Subscribe_FullMethodName = "/apex.v1.BlobService/Subscribe" ) // BlobServiceClient is the client API for BlobService service. @@ -34,6 +35,8 @@ type BlobServiceClient interface { Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) // GetAll returns all blobs for the given namespaces at the given height. GetAll(ctx context.Context, in *GetAllRequest, opts ...grpc.CallOption) (*GetAllResponse, error) + // GetByCommitment returns a blob matching the given commitment. + GetByCommitment(ctx context.Context, in *GetByCommitmentRequest, opts ...grpc.CallOption) (*GetByCommitmentResponse, error) // Subscribe streams blob events for the given namespace. Subscribe(ctx context.Context, in *BlobServiceSubscribeRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BlobServiceSubscribeResponse], error) } @@ -66,6 +69,16 @@ func (c *blobServiceClient) GetAll(ctx context.Context, in *GetAllRequest, opts return out, nil } +func (c *blobServiceClient) GetByCommitment(ctx context.Context, in *GetByCommitmentRequest, opts ...grpc.CallOption) (*GetByCommitmentResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetByCommitmentResponse) + err := c.cc.Invoke(ctx, BlobService_GetByCommitment_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *blobServiceClient) Subscribe(ctx context.Context, in *BlobServiceSubscribeRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BlobServiceSubscribeResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &BlobService_ServiceDesc.Streams[0], BlobService_Subscribe_FullMethodName, cOpts...) @@ -95,6 +108,8 @@ type BlobServiceServer interface { Get(context.Context, *GetRequest) (*GetResponse, error) // GetAll returns all blobs for the given namespaces at the given height. 
GetAll(context.Context, *GetAllRequest) (*GetAllResponse, error) + // GetByCommitment returns a blob matching the given commitment. + GetByCommitment(context.Context, *GetByCommitmentRequest) (*GetByCommitmentResponse, error) // Subscribe streams blob events for the given namespace. Subscribe(*BlobServiceSubscribeRequest, grpc.ServerStreamingServer[BlobServiceSubscribeResponse]) error mustEmbedUnimplementedBlobServiceServer() @@ -113,6 +128,9 @@ func (UnimplementedBlobServiceServer) Get(context.Context, *GetRequest) (*GetRes func (UnimplementedBlobServiceServer) GetAll(context.Context, *GetAllRequest) (*GetAllResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetAll not implemented") } +func (UnimplementedBlobServiceServer) GetByCommitment(context.Context, *GetByCommitmentRequest) (*GetByCommitmentResponse, error) { + return nil, status.Error(codes.Unimplemented, "method GetByCommitment not implemented") +} func (UnimplementedBlobServiceServer) Subscribe(*BlobServiceSubscribeRequest, grpc.ServerStreamingServer[BlobServiceSubscribeResponse]) error { return status.Error(codes.Unimplemented, "method Subscribe not implemented") } @@ -173,6 +191,24 @@ func _BlobService_GetAll_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _BlobService_GetByCommitment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetByCommitmentRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlobServiceServer).GetByCommitment(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: BlobService_GetByCommitment_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlobServiceServer).GetByCommitment(ctx, req.(*GetByCommitmentRequest)) + } + return interceptor(ctx, in, info, handler) +} + func 
_BlobService_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(BlobServiceSubscribeRequest) if err := stream.RecvMsg(m); err != nil { @@ -199,6 +235,10 @@ var BlobService_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetAll", Handler: _BlobService_GetAll_Handler, }, + { + MethodName: "GetByCommitment", + Handler: _BlobService_GetByCommitment_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/pkg/api/grpc/server_test.go b/pkg/api/grpc/server_test.go index a370528..a32e722 100644 --- a/pkg/api/grpc/server_test.go +++ b/pkg/api/grpc/server_test.go @@ -1,6 +1,7 @@ package grpcapi import ( + "bytes" "context" "errors" "net" @@ -59,6 +60,17 @@ func (m *mockStore) GetBlobs(_ context.Context, ns types.Namespace, startHeight, return result, nil } +func (m *mockStore) GetBlobByCommitment(_ context.Context, commitment []byte) (*types.Blob, error) { + for _, blobs := range m.blobs { + for i := range blobs { + if bytes.Equal(blobs[i].Commitment, commitment) { + return &blobs[i], nil + } + } + } + return nil, store.ErrNotFound +} + func (m *mockStore) PutHeader(_ context.Context, h *types.Header) error { m.headers[h.Height] = h return nil @@ -216,6 +228,33 @@ func TestGRPCBlobGetAll(t *testing.T) { } } +func TestGRPCBlobGetByCommitment(t *testing.T) { + st := newMockStore() + ns := testNamespace(1) + commitment := []byte("c1") + + st.blobs[10] = []types.Blob{ + {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: commitment, Index: 0}, + } + + notifier := api.NewNotifier(16, zerolog.Nop()) + svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) + client := startTestServer(t, svc) + + resp, err := client.GetByCommitment(context.Background(), &pb.GetByCommitmentRequest{ + Commitment: commitment, + }) + if err != nil { + t.Fatalf("GetByCommitment: %v", err) + } + if resp.Blob.Height != 10 { + t.Errorf("Height = %d, want 10", resp.Blob.Height) + } + if string(resp.Blob.Data) != "d1" { + t.Errorf("Data = %q, want %q", 
resp.Blob.Data, "d1") + } +} + func TestGRPCHeaderGetByHeight(t *testing.T) { st := newMockStore() now := time.Now().UTC().Truncate(time.Second) diff --git a/pkg/api/jsonrpc/blob.go b/pkg/api/jsonrpc/blob.go index 87b2e50..ba1bf00 100644 --- a/pkg/api/jsonrpc/blob.go +++ b/pkg/api/jsonrpc/blob.go @@ -23,6 +23,11 @@ func (h *BlobHandler) Get(ctx context.Context, height uint64, namespace []byte, return h.svc.BlobGet(ctx, height, ns, commitment) } +// GetByCommitment returns a blob matching the given commitment. +func (h *BlobHandler) GetByCommitment(ctx context.Context, commitment []byte) (json.RawMessage, error) { + return h.svc.BlobGetByCommitment(ctx, commitment) +} + // GetAll returns all blobs for the given namespaces at the given height. func (h *BlobHandler) GetAll(ctx context.Context, height uint64, namespaces [][]byte) (json.RawMessage, error) { nsList := make([]types.Namespace, len(namespaces)) diff --git a/pkg/api/jsonrpc/server_test.go b/pkg/api/jsonrpc/server_test.go index 21fbe3c..a5f644e 100644 --- a/pkg/api/jsonrpc/server_test.go +++ b/pkg/api/jsonrpc/server_test.go @@ -59,6 +59,17 @@ func (m *mockStore) GetBlobs(_ context.Context, ns types.Namespace, startHeight, return result, nil } +func (m *mockStore) GetBlobByCommitment(_ context.Context, commitment []byte) (*types.Blob, error) { + for _, blobs := range m.blobs { + for i := range blobs { + if bytes.Equal(blobs[i].Commitment, commitment) { + return &blobs[i], nil + } + } + } + return nil, store.ErrNotFound +} + func (m *mockStore) PutHeader(_ context.Context, h *types.Header) error { m.headers[h.Height] = h return nil @@ -265,6 +276,40 @@ func TestJSONRPCBlobGetAll(t *testing.T) { } } +func TestJSONRPCBlobGetByCommitment(t *testing.T) { + st := newMockStore() + ns := testNamespace(1) + st.blobs[10] = []types.Blob{ + {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: []byte("c1"), Index: 0}, + } + + notifier := api.NewNotifier(16, zerolog.Nop()) + svc := api.NewService(st, &mockFetcher{}, 
nil, notifier, zerolog.Nop()) + srv := NewServer(svc, zerolog.Nop()) + + t.Run("found", func(t *testing.T) { + resp := doRPC(t, srv, "blob.GetByCommitment", []byte("c1")) + if resp.Error != nil { + t.Fatalf("RPC error: %s", resp.Error.Message) + } + + var blob map[string]json.RawMessage + if err := json.Unmarshal(resp.Result, &blob); err != nil { + t.Fatalf("unmarshal blob: %v", err) + } + if _, ok := blob["commitment"]; !ok { + t.Error("blob JSON missing 'commitment' field") + } + }) + + t.Run("not found", func(t *testing.T) { + resp := doRPC(t, srv, "blob.GetByCommitment", []byte("missing")) + if resp.Error == nil { + t.Fatal("expected error for missing commitment") + } + }) +} + func TestJSONRPCStubMethods(t *testing.T) { notifier := api.NewNotifier(16, zerolog.Nop()) svc := api.NewService(newMockStore(), &mockFetcher{}, nil, notifier, zerolog.Nop()) diff --git a/pkg/api/service.go b/pkg/api/service.go index e8fa191..76b2dbd 100644 --- a/pkg/api/service.go +++ b/pkg/api/service.go @@ -53,6 +53,19 @@ func (s *Service) BlobGet(ctx context.Context, height uint64, namespace types.Na return nil, store.ErrNotFound } +// BlobGetByCommitment returns a blob matching the given commitment as JSON. +// No height or namespace required — commitment is cryptographically unique. +func (s *Service) BlobGetByCommitment(ctx context.Context, commitment []byte) (json.RawMessage, error) { + if len(commitment) == 0 { + return nil, fmt.Errorf("commitment is required") + } + b, err := s.store.GetBlobByCommitment(ctx, commitment) + if err != nil { + return nil, fmt.Errorf("get blob by commitment: %w", err) + } + return MarshalBlob(b), nil +} + // BlobGetAll returns all blobs for the given namespaces at the given height. // limit=0 means no limit; offset=0 means no offset. // Pagination is applied to the aggregate result across all namespaces. 
diff --git a/pkg/api/service_test.go b/pkg/api/service_test.go index 04ff6d0..60297e6 100644 --- a/pkg/api/service_test.go +++ b/pkg/api/service_test.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "context" "encoding/json" "errors" @@ -54,6 +55,17 @@ func (m *mockStore) GetBlobs(_ context.Context, ns types.Namespace, startHeight, return result, nil } +func (m *mockStore) GetBlobByCommitment(_ context.Context, commitment []byte) (*types.Blob, error) { + for _, blobs := range m.blobs { + for i := range blobs { + if bytes.Equal(blobs[i].Commitment, commitment) { + return &blobs[i], nil + } + } + } + return nil, store.ErrNotFound +} + func (m *mockStore) PutHeader(_ context.Context, h *types.Header) error { m.headers[h.Height] = h return nil @@ -139,6 +151,50 @@ func TestServiceBlobGet(t *testing.T) { } } +func TestServiceBlobGetByCommitment(t *testing.T) { + tests := []struct { + name string + seed bool + commit []byte + wantErr bool + }{ + {name: "found", seed: true, commit: []byte("c1")}, + {name: "not found", seed: false, commit: []byte("missing"), wantErr: true}, + {name: "empty commitment", seed: false, commit: nil, wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + st := newMockStore() + if tt.seed { + ns := testNamespace(1) + st.blobs[10] = []types.Blob{ + {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: []byte("c1"), Index: 0}, + } + } + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + + raw, err := svc.BlobGetByCommitment(context.Background(), tt.commit) + if tt.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("BlobGetByCommitment: %v", err) + } + + var m map[string]json.RawMessage + if err := json.Unmarshal(raw, &m); err != nil { + t.Fatalf("unmarshal blob: %v", err) + } + if _, ok := m["commitment"]; !ok { + t.Error("blob JSON missing 'commitment' field") + } + }) + } +} + func 
TestServiceBlobGetNotFound(t *testing.T) { st := newMockStore() ns := testNamespace(1) diff --git a/pkg/fetch/blobtx.go b/pkg/fetch/blobtx.go new file mode 100644 index 0000000..d37341f --- /dev/null +++ b/pkg/fetch/blobtx.go @@ -0,0 +1,381 @@ +package fetch + +import ( + "fmt" + + "google.golang.org/protobuf/encoding/protowire" + + "github.com/evstack/apex/pkg/types" +) + +// blobTxTypeID is the trailing byte that identifies a BlobTx. +// Defined in celestia-app as 0x62 ('b'). +const blobTxTypeID = 0x62 + +// msgPayForBlobsTypeURL is the protobuf Any type URL for MsgPayForBlobs. +const msgPayForBlobsTypeURL = "/celestia.blob.v1.MsgPayForBlobs" + +// rawBlob holds the fields parsed from a BlobTx blob proto message. +type rawBlob struct { + Namespace []byte + Data []byte + ShareVersion uint32 + NamespaceVersion uint32 + Signer []byte +} + +// pfbData holds the fields extracted from MsgPayForBlobs. +type pfbData struct { + Signer []byte + ShareCommitments [][]byte +} + +// parsedBlobTx is the result of fully parsing a BlobTx: blobs + their +// commitments and signer from MsgPayForBlobs in the inner SDK tx. +type parsedBlobTx struct { + Blobs []rawBlob + PFB pfbData +} + +// parseBlobTx decodes a Celestia BlobTx envelope and extracts both the +// blobs and the MsgPayForBlobs data (commitments, signer) from the inner tx. +// +// BlobTx wire format: +// +// inner_tx (length-prefixed) || blob1 (length-prefixed) || blob2 ... || 0x62 +func parseBlobTx(raw []byte) (*parsedBlobTx, error) { + if len(raw) == 0 { + return nil, fmt.Errorf("empty BlobTx") + } + if raw[len(raw)-1] != blobTxTypeID { + return nil, fmt.Errorf("not a BlobTx: trailing byte 0x%02x, want 0x%02x", raw[len(raw)-1], blobTxTypeID) + } + + // Strip trailing type byte. + data := raw[:len(raw)-1] + + // Read inner SDK tx (length-prefixed). 
+ innerTx, n := protowire.ConsumeBytes(data) + if n < 0 { + return nil, fmt.Errorf("decode inner tx: invalid length prefix") + } + data = data[n:] + + // Parse MsgPayForBlobs from the inner Cosmos SDK tx. + pfb, err := parsePFBFromTx(innerTx) + if err != nil { + return nil, fmt.Errorf("parse inner tx: %w", err) + } + + // Remaining bytes are length-prefixed blob proto messages. + var blobs []rawBlob + for len(data) > 0 { + blobBytes, bn := protowire.ConsumeBytes(data) + if bn < 0 { + return nil, fmt.Errorf("decode blob %d: invalid length prefix", len(blobs)) + } + data = data[bn:] + + b, err := parseRawBlob(blobBytes) + if err != nil { + return nil, fmt.Errorf("parse blob %d: %w", len(blobs), err) + } + blobs = append(blobs, b) + } + + return &parsedBlobTx{Blobs: blobs, PFB: pfb}, nil +} + +// parsePFBFromTx extracts MsgPayForBlobs data from a Cosmos SDK tx. +// +// Proto chain: +// +// cosmos.tx.v1beta1.Tx → field 1: body (bytes) +// cosmos.tx.v1beta1.TxBody → field 1: messages (repeated bytes/Any) +// google.protobuf.Any → field 1: type_url (string), field 2: value (bytes) +// celestia.blob.v1.MsgPayForBlobs → field 1: signer (string), field 4: share_commitments (repeated bytes) +func parsePFBFromTx(txBytes []byte) (pfbData, error) { + // Extract TxBody (field 1) from Tx. + bodyBytes, err := extractBytesField(txBytes, 1) + if err != nil { + return pfbData{}, fmt.Errorf("extract tx body: %w", err) + } + if bodyBytes == nil { + return pfbData{}, fmt.Errorf("tx has no body") + } + + // Iterate messages (field 1, repeated) in TxBody to find MsgPayForBlobs. + messages := extractRepeatedBytesField(bodyBytes, 1) + for _, msgAny := range messages { + typeURL, value, err := parseAny(msgAny) + if err != nil { + continue + } + if typeURL != msgPayForBlobsTypeURL { + continue + } + return parseMsgPayForBlobs(value) + } + + return pfbData{}, fmt.Errorf("no MsgPayForBlobs found in tx") +} + +// parseMsgPayForBlobs extracts signer and share_commitments from MsgPayForBlobs. 
+// +// field 1: signer (string) +// field 4: share_commitments (repeated bytes) +func parseMsgPayForBlobs(data []byte) (pfbData, error) { + var result pfbData + buf := data + for len(buf) > 0 { + num, typ, n := protowire.ConsumeTag(buf) + if n < 0 { + return pfbData{}, fmt.Errorf("invalid tag in MsgPayForBlobs") + } + buf = buf[n:] + + switch typ { + case protowire.BytesType: + val, n := protowire.ConsumeBytes(buf) + if n < 0 { + return pfbData{}, fmt.Errorf("field %d: invalid bytes", num) + } + buf = buf[n:] + switch num { + case 1: // signer (string, but wire type is bytes) + result.Signer = append([]byte(nil), val...) + case 4: // share_commitments (repeated bytes) + result.ShareCommitments = append(result.ShareCommitments, append([]byte(nil), val...)) + } + case protowire.VarintType: + _, n := protowire.ConsumeVarint(buf) + if n < 0 { + return pfbData{}, fmt.Errorf("field %d: invalid varint", num) + } + buf = buf[n:] + default: + // Skip unknown wire types for forward compatibility. + n = protowire.ConsumeFieldValue(num, typ, buf) + if n < 0 { + return pfbData{}, fmt.Errorf("field %d: invalid value for wire type %d", num, typ) + } + buf = buf[n:] + } + } + return result, nil +} + +// parseAny decodes a google.protobuf.Any message. +// +// field 1: type_url (string), field 2: value (bytes) +func parseAny(data []byte) (typeURL string, value []byte, err error) { + buf := data + for len(buf) > 0 { + num, typ, n := protowire.ConsumeTag(buf) + if n < 0 { + return "", nil, fmt.Errorf("invalid tag") + } + buf = buf[n:] + + if typ != protowire.BytesType { + // Skip non-bytes fields. 
+ n = protowire.ConsumeFieldValue(num, typ, buf) + if n < 0 { + return "", nil, fmt.Errorf("field %d: invalid value", num) + } + buf = buf[n:] + continue + } + + val, n := protowire.ConsumeBytes(buf) + if n < 0 { + return "", nil, fmt.Errorf("field %d: invalid bytes", num) + } + buf = buf[n:] + + switch num { + case 1: + typeURL = string(val) + case 2: + value = val + } + } + return typeURL, value, nil +} + +// parseRawBlob decodes a single blob protobuf message from BlobTx. +// +// Blob fields: 1: namespace (bytes), 2: data (bytes), 3: share_version (uint32), +// 4: namespace_version (uint32), 5: signer (bytes, celestia-app v2+) +func parseRawBlob(data []byte) (rawBlob, error) { + var b rawBlob + for len(data) > 0 { + num, typ, n := protowire.ConsumeTag(data) + if n < 0 { + return rawBlob{}, fmt.Errorf("invalid proto tag") + } + data = data[n:] + + switch typ { + case protowire.BytesType: + val, n := protowire.ConsumeBytes(data) + if n < 0 { + return rawBlob{}, fmt.Errorf("field %d: invalid bytes", num) + } + data = data[n:] + switch num { + case 1: // namespace + b.Namespace = append([]byte(nil), val...) + case 2: // data + b.Data = append([]byte(nil), val...) + case 5: // signer (celestia-app v2+) + b.Signer = append([]byte(nil), val...) + } + case protowire.VarintType: + val, n := protowire.ConsumeVarint(data) + if n < 0 { + return rawBlob{}, fmt.Errorf("field %d: invalid varint", num) + } + data = data[n:] + switch num { + case 3: // share_version + b.ShareVersion = uint32(val) + case 4: // namespace_version + b.NamespaceVersion = uint32(val) + } + default: + // Skip unknown wire types for forward compatibility. + n = protowire.ConsumeFieldValue(num, typ, data) + if n < 0 { + return rawBlob{}, fmt.Errorf("field %d: invalid value for wire type %d", num, typ) + } + data = data[n:] + } + } + return b, nil +} + +// extractBlobsFromBlock iterates over block transactions, parses BlobTxs, +// and returns blobs matching any of the given namespaces. 
Commitments and +// signer are sourced from MsgPayForBlobs in the inner SDK tx. +func extractBlobsFromBlock(txs [][]byte, namespaces []types.Namespace, height uint64) ([]types.Blob, error) { + nsSet := make(map[types.Namespace]struct{}, len(namespaces)) + for _, ns := range namespaces { + nsSet[ns] = struct{}{} + } + + var result []types.Blob + blobIndex := 0 + + for _, tx := range txs { + if len(tx) == 0 || tx[len(tx)-1] != blobTxTypeID { + continue + } + + parsed, err := parseBlobTx(tx) + if err != nil { + // Skip malformed BlobTxs rather than failing the whole block. + continue + } + + for i, rb := range parsed.Blobs { + if len(rb.Namespace) != types.NamespaceSize { + blobIndex++ + continue + } + var ns types.Namespace + copy(ns[:], rb.Namespace) + + if _, ok := nsSet[ns]; !ok { + blobIndex++ + continue + } + + // Commitment comes from MsgPayForBlobs, matched by index. + var commitment []byte + if i < len(parsed.PFB.ShareCommitments) { + commitment = parsed.PFB.ShareCommitments[i] + } + + // Signer: prefer MsgPayForBlobs.signer, fall back to blob-level signer (v2+). + signer := parsed.PFB.Signer + if len(signer) == 0 { + signer = rb.Signer + } + + result = append(result, types.Blob{ + Height: height, + Namespace: ns, + Data: rb.Data, + Commitment: commitment, + ShareVersion: rb.ShareVersion, + Signer: signer, + Index: blobIndex, + }) + blobIndex++ + } + } + + return result, nil +} + +// extractBytesField returns the first occurrence of a bytes-typed field. 
+func extractBytesField(data []byte, target protowire.Number) ([]byte, error) { + for len(data) > 0 { + num, typ, n := protowire.ConsumeTag(data) + if n < 0 { + return nil, fmt.Errorf("invalid tag") + } + data = data[n:] + + if typ == protowire.BytesType { + val, n := protowire.ConsumeBytes(data) + if n < 0 { + return nil, fmt.Errorf("field %d: invalid bytes", num) + } + data = data[n:] + if num == target { + return val, nil + } + continue + } + + n = protowire.ConsumeFieldValue(num, typ, data) + if n < 0 { + return nil, fmt.Errorf("field %d: invalid value", num) + } + data = data[n:] + } + return nil, nil +} + +// extractRepeatedBytesField returns all occurrences of a bytes-typed field. +func extractRepeatedBytesField(data []byte, target protowire.Number) [][]byte { + var result [][]byte + for len(data) > 0 { + num, typ, n := protowire.ConsumeTag(data) + if n < 0 { + return result + } + data = data[n:] + + if typ == protowire.BytesType { + val, n := protowire.ConsumeBytes(data) + if n < 0 { + return result + } + data = data[n:] + if num == target { + result = append(result, val) + } + continue + } + + n = protowire.ConsumeFieldValue(num, typ, data) + if n < 0 { + return result + } + data = data[n:] + } + return result +} diff --git a/pkg/fetch/blobtx_test.go b/pkg/fetch/blobtx_test.go new file mode 100644 index 0000000..2bc7d93 --- /dev/null +++ b/pkg/fetch/blobtx_test.go @@ -0,0 +1,281 @@ +package fetch + +import ( + "testing" + + "google.golang.org/protobuf/encoding/protowire" + + "github.com/evstack/apex/pkg/types" +) + +// buildBlobProto encodes a rawBlob as a protobuf message. 
+func buildBlobProto(b rawBlob) []byte { + var out []byte + if len(b.Namespace) > 0 { + out = protowire.AppendTag(out, 1, protowire.BytesType) + out = protowire.AppendBytes(out, b.Namespace) + } + if len(b.Data) > 0 { + out = protowire.AppendTag(out, 2, protowire.BytesType) + out = protowire.AppendBytes(out, b.Data) + } + if b.ShareVersion > 0 { + out = protowire.AppendTag(out, 3, protowire.VarintType) + out = protowire.AppendVarint(out, uint64(b.ShareVersion)) + } + if b.NamespaceVersion > 0 { + out = protowire.AppendTag(out, 4, protowire.VarintType) + out = protowire.AppendVarint(out, uint64(b.NamespaceVersion)) + } + if len(b.Signer) > 0 { + out = protowire.AppendTag(out, 5, protowire.BytesType) + out = protowire.AppendBytes(out, b.Signer) + } + return out +} + +// buildMsgPayForBlobs encodes a MsgPayForBlobs proto. +// field 1: signer (string/bytes), field 4: share_commitments (repeated bytes) +func buildMsgPayForBlobs(signer string, commitments [][]byte) []byte { + var out []byte + if signer != "" { + out = protowire.AppendTag(out, 1, protowire.BytesType) + out = protowire.AppendBytes(out, []byte(signer)) + } + for _, c := range commitments { + out = protowire.AppendTag(out, 4, protowire.BytesType) + out = protowire.AppendBytes(out, c) + } + return out +} + +// buildAny encodes a google.protobuf.Any proto. +func buildAny(typeURL string, value []byte) []byte { + var out []byte + out = protowire.AppendTag(out, 1, protowire.BytesType) + out = protowire.AppendBytes(out, []byte(typeURL)) + out = protowire.AppendTag(out, 2, protowire.BytesType) + out = protowire.AppendBytes(out, value) + return out +} + +// buildTxBody encodes a TxBody with messages. +func buildTxBody(messages ...[]byte) []byte { + var out []byte + for _, msg := range messages { + out = protowire.AppendTag(out, 1, protowire.BytesType) + out = protowire.AppendBytes(out, msg) + } + return out +} + +// buildTx encodes a Cosmos SDK Tx with a body. 
+func buildTx(body []byte) []byte { + var out []byte + out = protowire.AppendTag(out, 1, protowire.BytesType) + out = protowire.AppendBytes(out, body) + return out +} + +// buildInnerSDKTx constructs a valid inner SDK tx containing MsgPayForBlobs. +func buildInnerSDKTx(signer string, commitments [][]byte) []byte { + pfb := buildMsgPayForBlobs(signer, commitments) + any := buildAny(msgPayForBlobsTypeURL, pfb) + body := buildTxBody(any) + return buildTx(body) +} + +// buildBlobTx constructs a valid BlobTx wire-format message with proper +// MsgPayForBlobs containing commitments and signer. +func buildBlobTx(signer string, commitments [][]byte, blobs ...rawBlob) []byte { + innerTx := buildInnerSDKTx(signer, commitments) + var out []byte + // Length-prefixed inner tx. + out = protowire.AppendBytes(out, innerTx) + // Length-prefixed blob protos. + for _, b := range blobs { + blobProto := buildBlobProto(b) + out = protowire.AppendBytes(out, blobProto) + } + // Trailing BlobTx type byte. + out = append(out, blobTxTypeID) + return out +} + +func testNS(b byte) []byte { + ns := make([]byte, types.NamespaceSize) + ns[types.NamespaceSize-1] = b + return ns +} + +func TestParseBlobTxSingleBlob(t *testing.T) { + ns := testNS(1) + tx := buildBlobTx("celestia1abc", [][]byte{[]byte("commit1")}, + rawBlob{Namespace: ns, Data: []byte("hello")}, + ) + + parsed, err := parseBlobTx(tx) + if err != nil { + t.Fatalf("parseBlobTx: %v", err) + } + if len(parsed.Blobs) != 1 { + t.Fatalf("got %d blobs, want 1", len(parsed.Blobs)) + } + if string(parsed.Blobs[0].Data) != "hello" { + t.Errorf("Data = %q, want %q", parsed.Blobs[0].Data, "hello") + } + if string(parsed.PFB.Signer) != "celestia1abc" { + t.Errorf("Signer = %q, want %q", parsed.PFB.Signer, "celestia1abc") + } + if len(parsed.PFB.ShareCommitments) != 1 { + t.Fatalf("got %d commitments, want 1", len(parsed.PFB.ShareCommitments)) + } + if string(parsed.PFB.ShareCommitments[0]) != "commit1" { + t.Errorf("Commitment = %q, want %q", 
parsed.PFB.ShareCommitments[0], "commit1") + } +} + +func TestParseBlobTxMultiBlob(t *testing.T) { + tx := buildBlobTx("signer", [][]byte{[]byte("c1"), []byte("c2")}, + rawBlob{Namespace: testNS(1), Data: []byte("a")}, + rawBlob{Namespace: testNS(2), Data: []byte("b")}, + ) + + parsed, err := parseBlobTx(tx) + if err != nil { + t.Fatalf("parseBlobTx: %v", err) + } + if len(parsed.Blobs) != 2 { + t.Fatalf("got %d blobs, want 2", len(parsed.Blobs)) + } + if string(parsed.Blobs[0].Data) != "a" { + t.Errorf("blob[0].Data = %q", parsed.Blobs[0].Data) + } + if string(parsed.Blobs[1].Data) != "b" { + t.Errorf("blob[1].Data = %q", parsed.Blobs[1].Data) + } + if len(parsed.PFB.ShareCommitments) != 2 { + t.Fatalf("got %d commitments, want 2", len(parsed.PFB.ShareCommitments)) + } + if string(parsed.PFB.ShareCommitments[0]) != "c1" { + t.Errorf("commitment[0] = %q", parsed.PFB.ShareCommitments[0]) + } + if string(parsed.PFB.ShareCommitments[1]) != "c2" { + t.Errorf("commitment[1] = %q", parsed.PFB.ShareCommitments[1]) + } +} + +func TestParseBlobTxNotBlobTx(t *testing.T) { + _, err := parseBlobTx([]byte{0x01, 0x02, 0x03}) + if err == nil { + t.Fatal("expected error for non-BlobTx") + } +} + +func TestParseBlobTxEmpty(t *testing.T) { + _, err := parseBlobTx(nil) + if err == nil { + t.Fatal("expected error for empty input") + } +} + +func TestExtractBlobsFromBlock(t *testing.T) { + ns1 := testNS(1) + ns2 := testNS(2) + ns3 := testNS(3) + + var nsType1 types.Namespace + copy(nsType1[:], ns1) + var nsType2 types.Namespace + copy(nsType2[:], ns2) + + tx1 := buildBlobTx("signer1", [][]byte{[]byte("c1"), []byte("c3")}, + rawBlob{Namespace: ns1, Data: []byte("d1")}, + rawBlob{Namespace: ns3, Data: []byte("d3")}, + ) + tx2 := buildBlobTx("signer2", [][]byte{[]byte("c2")}, + rawBlob{Namespace: ns2, Data: []byte("d2")}, + ) + // Non-BlobTx should be skipped. 
+ regularTx := []byte("regular-cosmos-tx") + + blobs, err := extractBlobsFromBlock( + [][]byte{tx1, regularTx, tx2}, + []types.Namespace{nsType1, nsType2}, + 100, + ) + if err != nil { + t.Fatalf("extractBlobsFromBlock: %v", err) + } + if len(blobs) != 2 { + t.Fatalf("got %d blobs, want 2", len(blobs)) + } + + // First matching blob: ns1 from tx1 + if string(blobs[0].Data) != "d1" { + t.Errorf("blobs[0].Data = %q, want %q", blobs[0].Data, "d1") + } + if blobs[0].Height != 100 { + t.Errorf("blobs[0].Height = %d, want 100", blobs[0].Height) + } + if string(blobs[0].Commitment) != "c1" { + t.Errorf("blobs[0].Commitment = %q, want %q", blobs[0].Commitment, "c1") + } + if string(blobs[0].Signer) != "signer1" { + t.Errorf("blobs[0].Signer = %q, want %q", blobs[0].Signer, "signer1") + } + + // Second matching blob: ns2 from tx2 + if string(blobs[1].Data) != "d2" { + t.Errorf("blobs[1].Data = %q, want %q", blobs[1].Data, "d2") + } + if string(blobs[1].Commitment) != "c2" { + t.Errorf("blobs[1].Commitment = %q, want %q", blobs[1].Commitment, "c2") + } + if string(blobs[1].Signer) != "signer2" { + t.Errorf("blobs[1].Signer = %q, want %q", blobs[1].Signer, "signer2") + } +} + +func TestExtractBlobsFromBlockNoMatch(t *testing.T) { + ns1 := testNS(1) + ns99 := testNS(99) + var nsType99 types.Namespace + copy(nsType99[:], ns99) + + tx := buildBlobTx("s", [][]byte{[]byte("c")}, + rawBlob{Namespace: ns1, Data: []byte("d1")}, + ) + + blobs, err := extractBlobsFromBlock([][]byte{tx}, []types.Namespace{nsType99}, 50) + if err != nil { + t.Fatalf("extractBlobsFromBlock: %v", err) + } + if len(blobs) != 0 { + t.Fatalf("got %d blobs, want 0", len(blobs)) + } +} + +func TestParsePFBFromTx(t *testing.T) { + innerTx := buildInnerSDKTx("celestia1xyz", [][]byte{ + []byte("commit_a"), + []byte("commit_b"), + }) + + pfb, err := parsePFBFromTx(innerTx) + if err != nil { + t.Fatalf("parsePFBFromTx: %v", err) + } + if string(pfb.Signer) != "celestia1xyz" { + t.Errorf("Signer = %q, want %q", 
pfb.Signer, "celestia1xyz") + } + if len(pfb.ShareCommitments) != 2 { + t.Fatalf("got %d commitments, want 2", len(pfb.ShareCommitments)) + } + if string(pfb.ShareCommitments[0]) != "commit_a" { + t.Errorf("commitment[0] = %q", pfb.ShareCommitments[0]) + } + if string(pfb.ShareCommitments[1]) != "commit_b" { + t.Errorf("commitment[1] = %q", pfb.ShareCommitments[1]) + } +} diff --git a/pkg/fetch/celestia_app.go b/pkg/fetch/celestia_app.go new file mode 100644 index 0000000..d4fbf33 --- /dev/null +++ b/pkg/fetch/celestia_app.go @@ -0,0 +1,379 @@ +package fetch + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog" + + "github.com/evstack/apex/pkg/types" +) + +// CelestiaAppFetcher implements DataFetcher using CometBFT RPC endpoints +// exposed by celestia-app (consensus node). This enables indexing without +// a Celestia DA node. +type CelestiaAppFetcher struct { + baseURL string + wsURL string + httpClient *http.Client + authToken string + log zerolog.Logger + mu sync.Mutex + closed bool + cancelSub context.CancelFunc +} + +// NewCelestiaAppFetcher creates a fetcher that reads from celestia-app's +// CometBFT RPC. No connection is established at construction time. +func NewCelestiaAppFetcher(baseURL, authToken string, log zerolog.Logger) (*CelestiaAppFetcher, error) { + u, err := url.Parse(baseURL) + if err != nil { + return nil, fmt.Errorf("invalid base URL: %w", err) + } + + // Derive WebSocket URL. 
+ wsScheme := "ws" + if u.Scheme == "https" { + wsScheme = "wss" + } + wsURL := fmt.Sprintf("%s://%s/websocket", wsScheme, u.Host) + + return &CelestiaAppFetcher{ + baseURL: strings.TrimRight(baseURL, "/"), + wsURL: wsURL, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + authToken: authToken, + log: log.With().Str("component", "celestia-app-fetcher").Logger(), + }, nil +} + +// GetHeader fetches a block at the given height and returns a Header. +func (f *CelestiaAppFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { + endpoint := fmt.Sprintf("%s/block?height=%d", f.baseURL, height) + result, err := f.doGet(ctx, endpoint) + if err != nil { + return nil, fmt.Errorf("get block at height %d: %w", height, err) + } + return f.parseBlockHeader(result, height) +} + +// GetBlobs fetches a block and extracts blobs matching the given namespaces. +func (f *CelestiaAppFetcher) GetBlobs(ctx context.Context, height uint64, namespaces []types.Namespace) ([]types.Blob, error) { + endpoint := fmt.Sprintf("%s/block?height=%d", f.baseURL, height) + result, err := f.doGet(ctx, endpoint) + if err != nil { + return nil, fmt.Errorf("get block at height %d: %w", height, err) + } + + var block cometBlockResult + if err := json.Unmarshal(result, &block); err != nil { + return nil, fmt.Errorf("unmarshal block result: %w", err) + } + + txs := make([][]byte, len(block.Block.Data.Txs)) + for i, txB64 := range block.Block.Data.Txs { + decoded, err := base64.StdEncoding.DecodeString(txB64) + if err != nil { + return nil, fmt.Errorf("decode tx %d: %w", i, err) + } + txs[i] = decoded + } + + blobs, err := extractBlobsFromBlock(txs, namespaces, height) + if err != nil { + return nil, fmt.Errorf("extract blobs at height %d: %w", height, err) + } + if len(blobs) == 0 { + return nil, nil + } + return blobs, nil +} + +// GetNetworkHead returns the header at the latest block height. 
+func (f *CelestiaAppFetcher) GetNetworkHead(ctx context.Context) (*types.Header, error) { + endpoint := fmt.Sprintf("%s/status", f.baseURL) + result, err := f.doGet(ctx, endpoint) + if err != nil { + return nil, fmt.Errorf("get status: %w", err) + } + + var status cometStatusResult + if err := json.Unmarshal(result, &status); err != nil { + return nil, fmt.Errorf("unmarshal status: %w", err) + } + + headHeight := uint64(status.SyncInfo.LatestBlockHeight) + return f.GetHeader(ctx, headHeight) +} + +// SubscribeHeaders connects to the CometBFT WebSocket and subscribes to +// new block events. The returned channel is closed on context cancellation +// or connection error. No reconnection -- the coordinator handles gaps. +func (f *CelestiaAppFetcher) SubscribeHeaders(ctx context.Context) (<-chan *types.Header, error) { + subCtx, cancel := context.WithCancel(ctx) + + f.mu.Lock() + if f.closed { + f.mu.Unlock() + cancel() + return nil, fmt.Errorf("fetcher is closed") + } + if f.cancelSub != nil { + f.cancelSub() // cancel previous subscription + } + f.cancelSub = cancel + f.mu.Unlock() + + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + header := http.Header{} + if f.authToken != "" { + header.Set("Authorization", "Bearer "+f.authToken) + } + + conn, resp, err := dialer.DialContext(subCtx, f.wsURL, header) + if resp != nil && resp.Body != nil { + resp.Body.Close() //nolint:errcheck + } + if err != nil { + cancel() + return nil, fmt.Errorf("websocket dial: %w", err) + } + + // Subscribe to new block events. 
+ subscribeReq := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "subscribe", + "params": map[string]string{ + "query": "tm.event='NewBlock'", + }, + } + if err := conn.WriteJSON(subscribeReq); err != nil { + _ = conn.Close() + cancel() + return nil, fmt.Errorf("send subscribe: %w", err) + } + + out := make(chan *types.Header, 64) + go f.readHeaderLoop(subCtx, conn, out) + + return out, nil +} + +func (f *CelestiaAppFetcher) readHeaderLoop(ctx context.Context, conn *websocket.Conn, out chan<- *types.Header) { + defer close(out) + defer conn.Close() //nolint:errcheck + + // Close the connection when context is cancelled to unblock ReadMessage. + go func() { + <-ctx.Done() + _ = conn.Close() + }() + + for { + _, msg, err := conn.ReadMessage() + if err != nil { + if ctx.Err() == nil { + f.log.Error().Err(err).Msg("websocket read error") + } + return + } + + hdr, err := f.parseWSBlockEvent(msg) + if err != nil { + f.log.Warn().Err(err).Msg("skip unparseable block event") + continue + } + if hdr == nil { + // Non-block message (e.g. subscription confirmation). + continue + } + + select { + case out <- hdr: + case <-ctx.Done(): + return + } + } +} + +// Close cancels any active subscription and marks the fetcher as closed. +func (f *CelestiaAppFetcher) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return nil + } + f.closed = true + if f.cancelSub != nil { + f.cancelSub() + } + return nil +} + +// doGet performs an authenticated HTTP GET and extracts the "result" field. 
+func (f *CelestiaAppFetcher) doGet(ctx context.Context, url string) (json.RawMessage, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + if f.authToken != "" { + req.Header.Set("Authorization", "Bearer "+f.authToken) + } + + resp, err := f.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP GET %s: %w", url, err) + } + defer resp.Body.Close() //nolint:errcheck + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, body) + } + + var rpcResp cometRPCResponse + if err := json.NewDecoder(resp.Body).Decode(&rpcResp); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + if rpcResp.Error != nil { + return nil, fmt.Errorf("RPC error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + return rpcResp.Result, nil +} + +func (f *CelestiaAppFetcher) parseBlockHeader(result json.RawMessage, height uint64) (*types.Header, error) { + var block cometBlockResult + if err := json.Unmarshal(result, &block); err != nil { + return nil, fmt.Errorf("unmarshal block: %w", err) + } + + t, err := time.Parse(time.RFC3339Nano, block.Block.Header.Time) + if err != nil { + return nil, fmt.Errorf("parse block time: %w", err) + } + + return &types.Header{ + Height: uint64(block.Block.Header.Height), + Hash: []byte(block.BlockID.Hash), + DataHash: []byte(block.Block.Header.DataHash), + Time: t, + RawHeader: result, + }, nil +} + +func (f *CelestiaAppFetcher) parseWSBlockEvent(msg []byte) (*types.Header, error) { + var event cometWSEvent + if err := json.Unmarshal(msg, &event); err != nil { + return nil, fmt.Errorf("unmarshal ws event: %w", err) + } + + // Skip non-result messages (e.g. subscription confirmation has empty result). 
+ if len(event.Result.Data.Value) == 0 { + return nil, nil //nolint:nilnil + } + + var blockValue cometBlockEventValue + if err := json.Unmarshal(event.Result.Data.Value, &blockValue); err != nil { + return nil, fmt.Errorf("unmarshal block event value: %w", err) + } + + hdr := blockValue.Block.Header + t, err := time.Parse(time.RFC3339Nano, hdr.Time) + if err != nil { + return nil, fmt.Errorf("parse header time: %w", err) + } + + raw, err := json.Marshal(blockValue) + if err != nil { + return nil, fmt.Errorf("marshal raw header: %w", err) + } + + return &types.Header{ + Height: uint64(hdr.Height), + Hash: []byte(blockValue.BlockID.Hash), + DataHash: []byte(hdr.DataHash), + Time: t, + RawHeader: raw, + }, nil +} + +// CometBFT JSON-RPC response types. + +type cometRPCResponse struct { + Result json.RawMessage `json:"result"` + Error *cometRPCError `json:"error,omitempty"` +} + +type cometRPCError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type cometBlockResult struct { + BlockID cometBlockID `json:"block_id"` + Block cometBlock `json:"block"` +} + +type cometBlockID struct { + Hash hexBytes `json:"hash"` +} + +type cometBlock struct { + Header cometHeader `json:"header"` + Data cometTxData `json:"data"` +} + +type cometHeader struct { + Height jsonInt64 `json:"height"` + Time string `json:"time"` + DataHash hexBytes `json:"data_hash"` +} + +type cometTxData struct { + Txs []string `json:"txs"` // base64-encoded +} + +type cometStatusResult struct { + SyncInfo cometSyncInfo `json:"sync_info"` +} + +type cometSyncInfo struct { + LatestBlockHeight jsonInt64 `json:"latest_block_height"` +} + +// WebSocket event types. 
+ +type cometWSEvent struct { + Result cometWSResult `json:"result"` +} + +type cometWSResult struct { + Data cometWSData `json:"data"` +} + +type cometWSData struct { + Value json.RawMessage `json:"value"` +} + +type cometBlockEventValue struct { + Block cometEventBlock `json:"block"` + BlockID cometBlockID `json:"block_id"` +} + +type cometEventBlock struct { + Header cometHeader `json:"header"` +} diff --git a/pkg/fetch/celestia_app_test.go b/pkg/fetch/celestia_app_test.go new file mode 100644 index 0000000..7673420 --- /dev/null +++ b/pkg/fetch/celestia_app_test.go @@ -0,0 +1,291 @@ +package fetch + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog" + + "github.com/evstack/apex/pkg/types" +) + +func cometBlockResponse(height int, txsB64 []string, timeStr string) string { + txsJSON, _ := json.Marshal(txsB64) + return fmt.Sprintf(`{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "block_id": { + "hash": "ABCD1234" + }, + "block": { + "header": { + "height": "%d", + "time": "%s", + "data_hash": "DDDD" + }, + "data": { + "txs": %s + } + } + } + }`, height, timeStr, string(txsJSON)) +} + +func cometStatusResponse(height int) string { + return fmt.Sprintf(`{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "sync_info": { + "latest_block_height": "%d" + } + } + }`, height) +} + +func TestCelestiaAppFetcherGetHeader(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/block") { + t.Errorf("unexpected path: %s", r.URL.Path) + http.Error(w, "not found", http.StatusNotFound) + return + } + blockTime := time.Date(2025, 6, 15, 12, 0, 0, 0, time.UTC).Format(time.RFC3339Nano) + _, _ = fmt.Fprint(w, cometBlockResponse(42, nil, blockTime)) + })) + defer ts.Close() + + f, err := NewCelestiaAppFetcher(ts.URL, "", zerolog.Nop()) + if 
err != nil { + t.Fatalf("NewCelestiaAppFetcher: %v", err) + } + defer f.Close() //nolint:errcheck + + hdr, err := f.GetHeader(context.Background(), 42) + if err != nil { + t.Fatalf("GetHeader: %v", err) + } + if hdr.Height != 42 { + t.Errorf("Height = %d, want 42", hdr.Height) + } + if len(hdr.Hash) == 0 { + t.Error("Hash is empty") + } +} + +func TestCelestiaAppFetcherGetBlobs(t *testing.T) { + ns := testNS(1) + + blobTx := buildBlobTx("signer", [][]byte{[]byte("c1")}, + rawBlob{Namespace: ns, Data: []byte("blob-data")}, + ) + blobTxB64 := base64.StdEncoding.EncodeToString(blobTx) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + blockTime := time.Now().UTC().Format(time.RFC3339Nano) + _, _ = fmt.Fprint(w, cometBlockResponse(10, []string{blobTxB64}, blockTime)) + })) + defer ts.Close() + + f, err := NewCelestiaAppFetcher(ts.URL, "", zerolog.Nop()) + if err != nil { + t.Fatalf("NewCelestiaAppFetcher: %v", err) + } + defer f.Close() //nolint:errcheck + + var nsType types.Namespace + copy(nsType[:], ns) + + blobs, err := f.GetBlobs(context.Background(), 10, []types.Namespace{nsType}) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if len(blobs) != 1 { + t.Fatalf("got %d blobs, want 1", len(blobs)) + } + if string(blobs[0].Data) != "blob-data" { + t.Errorf("Data = %q, want %q", blobs[0].Data, "blob-data") + } + if string(blobs[0].Commitment) != "c1" { + t.Errorf("Commitment = %q, want %q", blobs[0].Commitment, "c1") + } + if string(blobs[0].Signer) != "signer" { + t.Errorf("Signer = %q, want %q", blobs[0].Signer, "signer") + } +} + +func TestCelestiaAppFetcherGetBlobsNoMatch(t *testing.T) { + ns := testNS(1) + blobTx := buildBlobTx("s", [][]byte{[]byte("c")}, rawBlob{Namespace: ns, Data: []byte("d")}) + blobTxB64 := base64.StdEncoding.EncodeToString(blobTx) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + blockTime := time.Now().UTC().Format(time.RFC3339Nano) + _, _ = 
fmt.Fprint(w, cometBlockResponse(10, []string{blobTxB64}, blockTime)) + })) + defer ts.Close() + + f, err := NewCelestiaAppFetcher(ts.URL, "", zerolog.Nop()) + if err != nil { + t.Fatalf("NewCelestiaAppFetcher: %v", err) + } + defer f.Close() //nolint:errcheck + + var ns99 types.Namespace + ns99[types.NamespaceSize-1] = 99 + + blobs, err := f.GetBlobs(context.Background(), 10, []types.Namespace{ns99}) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if blobs != nil { + t.Errorf("expected nil for no matching blobs, got %d blobs", len(blobs)) + } +} + +func TestCelestiaAppFetcherGetNetworkHead(t *testing.T) { + var requestCount atomic.Int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount.Add(1) + if strings.HasPrefix(r.URL.Path, "/status") { + _, _ = fmt.Fprint(w, cometStatusResponse(50)) + return + } + if strings.HasPrefix(r.URL.Path, "/block") { + blockTime := time.Now().UTC().Format(time.RFC3339Nano) + _, _ = fmt.Fprint(w, cometBlockResponse(50, nil, blockTime)) + return + } + http.Error(w, "not found", http.StatusNotFound) + })) + defer ts.Close() + + f, err := NewCelestiaAppFetcher(ts.URL, "", zerolog.Nop()) + if err != nil { + t.Fatalf("NewCelestiaAppFetcher: %v", err) + } + defer f.Close() //nolint:errcheck + + hdr, err := f.GetNetworkHead(context.Background()) + if err != nil { + t.Fatalf("GetNetworkHead: %v", err) + } + if hdr.Height != 50 { + t.Errorf("Height = %d, want 50", hdr.Height) + } + if requestCount.Load() != 2 { + t.Errorf("expected 2 HTTP requests (status + block), got %d", requestCount.Load()) + } +} + +func TestCelestiaAppFetcherSubscribeHeaders(t *testing.T) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(_ *http.Request) bool { return true }, + } + + blockTime := time.Now().UTC().Format(time.RFC3339Nano) + eventJSON := fmt.Sprintf(`{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "data": { + "type": "tendermint/event/NewBlock", + "value": { + "block": { + "header": { + 
"height": "100", + "time": "%s", + "data_hash": "AAAA" + } + }, + "block_id": { + "hash": "BBBB" + } + } + } + } + }`, blockTime) + + var upgradeErr atomic.Value + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/websocket" { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + upgradeErr.Store(err) + return + } + defer conn.Close() //nolint:errcheck + + // Read the subscribe request. + _, _, err = conn.ReadMessage() + if err != nil { + return + } + + // Send subscription confirmation (empty result). + _ = conn.WriteMessage(websocket.TextMessage, []byte(`{"jsonrpc":"2.0","id":1,"result":{}}`)) + + // Send a block event. + _ = conn.WriteMessage(websocket.TextMessage, []byte(eventJSON)) + + // Wait for client to close. + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + } + })) + defer ts.Close() + + f, err := NewCelestiaAppFetcher(ts.URL, "", zerolog.Nop()) + if err != nil { + t.Fatalf("NewCelestiaAppFetcher: %v", err) + } + defer f.Close() //nolint:errcheck + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ch, err := f.SubscribeHeaders(ctx) + if err != nil { + t.Fatalf("SubscribeHeaders: %v", err) + } + + select { + case hdr := <-ch: + if hdr.Height != 100 { + t.Errorf("Height = %d, want 100", hdr.Height) + } + case <-ctx.Done(): + t.Fatal("timed out waiting for header") + } + + if v := upgradeErr.Load(); v != nil { + t.Errorf("websocket upgrade: %v", v) + } +} + +func TestCelestiaAppFetcherCloseIdempotent(t *testing.T) { + f, err := NewCelestiaAppFetcher("http://localhost:26657", "", zerolog.Nop()) + if err != nil { + t.Fatalf("NewCelestiaAppFetcher: %v", err) + } + + if err := f.Close(); err != nil { + t.Fatalf("Close (first): %v", err) + } + if err := f.Close(); err != nil { + t.Fatalf("Close (second): %v", err) + } +} diff --git a/pkg/store/migrations/002_commitment_index.sql 
b/pkg/store/migrations/002_commitment_index.sql new file mode 100644 index 0000000..b66123a --- /dev/null +++ b/pkg/store/migrations/002_commitment_index.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS idx_blobs_commitment ON blobs(commitment); diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index 4d8fbcd..9669f4f 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -92,32 +92,54 @@ func configureSQLite(db *sql.DB) error { return nil } +// migrationStep defines a single schema migration. +type migrationStep struct { + version int + file string +} + +// allMigrations lists every migration in order. Add new entries here. +var allMigrations = []migrationStep{ + {version: 1, file: "migrations/001_init.sql"}, + {version: 2, file: "migrations/002_commitment_index.sql"}, +} + func (s *SQLiteStore) migrate() error { var version int if err := s.writer.QueryRow("PRAGMA user_version").Scan(&version); err != nil { return fmt.Errorf("read user_version: %w", err) } - if version >= 1 { - return nil + for _, m := range allMigrations { + if version >= m.version { + continue + } + if err := s.applyMigration(m); err != nil { + return err + } + version = m.version } - ddl, err := migrations.ReadFile("migrations/001_init.sql") + return nil +} + +func (s *SQLiteStore) applyMigration(m migrationStep) error { + ddl, err := migrations.ReadFile(m.file) if err != nil { - return fmt.Errorf("read migration: %w", err) + return fmt.Errorf("read migration %d: %w", m.version, err) } tx, err := s.writer.Begin() if err != nil { - return fmt.Errorf("begin migration tx: %w", err) + return fmt.Errorf("begin migration %d tx: %w", m.version, err) } defer tx.Rollback() //nolint:errcheck if _, err := tx.Exec(string(ddl)); err != nil { - return fmt.Errorf("exec migration: %w", err) + return fmt.Errorf("exec migration %d: %w", m.version, err) } - if _, err := tx.Exec("PRAGMA user_version = 1"); err != nil { - return fmt.Errorf("set user_version: %w", err) + if _, err := 
tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", m.version)); err != nil { + return fmt.Errorf("set user_version to %d: %w", m.version, err) } return tx.Commit() @@ -196,6 +218,16 @@ func (s *SQLiteStore) GetBlobs(ctx context.Context, ns types.Namespace, startHei return blobs, rows.Err() } +func (s *SQLiteStore) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }() + + row := s.reader.QueryRowContext(ctx, + `SELECT height, namespace, commitment, data, share_version, signer, blob_index + FROM blobs WHERE commitment = ? LIMIT 1`, commitment) + return scanBlob(row) +} + func (s *SQLiteStore) PutHeader(ctx context.Context, header *types.Header) error { start := time.Now() defer func() { s.metrics.ObserveStoreQueryDuration("PutHeader", time.Since(start)) }() diff --git a/pkg/store/sqlite_test.go b/pkg/store/sqlite_test.go index 3c3aa1f..d57ebc6 100644 --- a/pkg/store/sqlite_test.go +++ b/pkg/store/sqlite_test.go @@ -131,6 +131,53 @@ func TestGetBlobNotFound(t *testing.T) { } } +func TestGetBlobByCommitment(t *testing.T) { + ns := testNamespace(1) + seed := []types.Blob{ + {Height: 10, Namespace: ns, Commitment: []byte("c1"), Data: []byte("d1"), Index: 0}, + {Height: 10, Namespace: ns, Commitment: []byte("c2"), Data: []byte("d2"), Index: 1}, + } + tests := []struct { + name string + commitment []byte + wantData string + wantHeight uint64 + wantIndex int + wantErr error + }{ + {name: "found", commitment: []byte("c2"), wantData: "d2", wantHeight: 10, wantIndex: 1}, + {name: "not found", commitment: []byte("nonexistent"), wantErr: ErrNotFound}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + if err := s.PutBlobs(ctx, seed); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + got, err := s.GetBlobByCommitment(ctx, tt.commitment) + if tt.wantErr != nil { + 
if !errors.Is(err, tt.wantErr) { + t.Fatalf("got err %v, want %v", err, tt.wantErr) + } + return + } + if err != nil { + t.Fatalf("GetBlobByCommitment: %v", err) + } + if string(got.Data) != tt.wantData { + t.Errorf("Data = %q, want %q", got.Data, tt.wantData) + } + if got.Height != tt.wantHeight { + t.Errorf("Height = %d, want %d", got.Height, tt.wantHeight) + } + if got.Index != tt.wantIndex { + t.Errorf("Index = %d, want %d", got.Index, tt.wantIndex) + } + }) + } +} + func TestPutBlobsIdempotent(t *testing.T) { s := openTestDB(t) ctx := context.Background() diff --git a/pkg/store/store.go b/pkg/store/store.go index 14e9d1b..feb30bd 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -20,6 +20,11 @@ type Store interface { // limit=0 means no limit; offset=0 means no offset. GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, limit, offset int) ([]types.Blob, error) + // GetBlobByCommitment returns a blob matching the given commitment. + // Commitments are derived from namespace and data, so the same blob posted + // at multiple heights shares one; an arbitrary single match is returned.
+ GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) + PutHeader(ctx context.Context, header *types.Header) error GetHeader(ctx context.Context, height uint64) (*types.Header, error) diff --git a/pkg/sync/mock_test.go b/pkg/sync/mock_test.go index b857561..e25967f 100644 --- a/pkg/sync/mock_test.go +++ b/pkg/sync/mock_test.go @@ -1,6 +1,7 @@ package syncer import ( + "bytes" "context" "sync" @@ -80,6 +81,17 @@ func (m *mockStore) GetBlobs(_ context.Context, ns types.Namespace, startHeight, return result, nil } +func (m *mockStore) GetBlobByCommitment(_ context.Context, commitment []byte) (*types.Blob, error) { + m.mu.Lock() + defer m.mu.Unlock() + for i := range m.blobs { + if bytes.Equal(m.blobs[i].Commitment, commitment) { + return &m.blobs[i], nil + } + } + return nil, store.ErrNotFound +} + func (m *mockStore) PutNamespace(_ context.Context, ns types.Namespace) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/proto/apex/v1/blob.proto b/proto/apex/v1/blob.proto index 8af81d3..b4fe7e1 100644 --- a/proto/apex/v1/blob.proto +++ b/proto/apex/v1/blob.proto @@ -14,6 +14,9 @@ service BlobService { // GetAll returns all blobs for the given namespaces at the given height. rpc GetAll(GetAllRequest) returns (GetAllResponse); + // GetByCommitment returns a blob matching the given commitment. + rpc GetByCommitment(GetByCommitmentRequest) returns (GetByCommitmentResponse); + // Subscribe streams blob events for the given namespace. rpc Subscribe(BlobServiceSubscribeRequest) returns (stream BlobServiceSubscribeResponse); } @@ -41,6 +44,14 @@ message GetAllResponse { repeated Blob blobs = 1; } +message GetByCommitmentRequest { + bytes commitment = 1; +} + +message GetByCommitmentResponse { + Blob blob = 1; +} + message BlobServiceSubscribeRequest { bytes namespace = 1; }