From 26e2129a92083efdb5874d5d81f836f132f164f5 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 19 Feb 2026 20:55:06 +0100 Subject: [PATCH 1/6] feat: add S3-compatible storage backend Add an S3Store implementation that batches multiple heights into single S3 objects (chunks) to reduce object count and request costs. Writes are buffered in memory and flushed at checkpoint boundaries (SetSyncState). - New S3Config with bucket, prefix, region, endpoint, chunk_size - Store factory in main.go (sqlite/s3 switch) - 13 mock-based unit tests covering round-trips, chunk boundaries, buffer read-through, idempotent writes, commitment index lookups - Config validation for S3-specific required fields Also includes profiling support, backfill progress tracking, and metrics formatting fixes from the working tree. Co-Authored-By: Claude Opus 4.6 --- cmd/apex/blob_cmd.go | 9 +- cmd/apex/main.go | 89 ++-- config/config.go | 31 +- config/load.go | 54 ++- config/load_test.go | 71 +++ go.mod | 19 + go.sum | 38 ++ pkg/metrics/metrics.go | 47 +- pkg/metrics/metrics_test.go | 28 +- pkg/profile/server.go | 50 +++ pkg/store/s3.go | 861 ++++++++++++++++++++++++++++++++++++ pkg/store/s3_test.go | 555 +++++++++++++++++++++++ pkg/sync/backfill.go | 76 ++++ pkg/sync/coordinator.go | 1 + pkg/types/types.go | 9 +- 15 files changed, 1886 insertions(+), 52 deletions(-) create mode 100644 config/load_test.go create mode 100644 pkg/profile/server.go create mode 100644 pkg/store/s3.go create mode 100644 pkg/store/s3_test.go diff --git a/cmd/apex/blob_cmd.go b/cmd/apex/blob_cmd.go index 0a9cdc3..8c878bb 100644 --- a/cmd/apex/blob_cmd.go +++ b/cmd/apex/blob_cmd.go @@ -7,6 +7,7 @@ import ( "os" "strconv" + "github.com/evstack/apex/pkg/types" "github.com/spf13/cobra" ) @@ -34,12 +35,12 @@ func blobGetCmd() *cobra.Command { return fmt.Errorf("invalid height: %w", err) } - ns, err := hex.DecodeString(args[1]) + ns, err := hex.DecodeString(types.StripHexPrefix(args[1])) if err != nil { return fmt.Errorf("invalid namespace hex: %w", err) } - commitment, err := hex.DecodeString(args[2]) + commitment, err := hex.DecodeString(types.StripHexPrefix(args[2])) if err != nil { return fmt.Errorf("invalid commitment hex: %w", err) } @@ -63,7 +64,7 @@ func blobGetByCommitmentCmd() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { addr, _ := cmd.Flags().GetString("rpc-addr") - commitment, err := hex.DecodeString(args[0]) + commitment, err := hex.DecodeString(types.StripHexPrefix(args[0])) if err != nil { return fmt.Errorf("invalid commitment hex: %w", err) } @@ -95,7 +96,7 @@ func blobListCmd() *cobra.Command { var namespaces [][]byte if nsHex != "" { - ns, err := hex.DecodeString(nsHex) + ns, err := hex.DecodeString(types.StripHexPrefix(nsHex)) if err != nil { return fmt.Errorf("invalid namespace hex: %w", err) } diff --git a/cmd/apex/main.go b/cmd/apex/main.go index 497d5da..eea6c44 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -22,6 +22,7 @@ import ( jsonrpcapi "github.com/evstack/apex/pkg/api/jsonrpc" "github.com/evstack/apex/pkg/fetch" "github.com/evstack/apex/pkg/metrics" + "github.com/evstack/apex/pkg/profile" "github.com/evstack/apex/pkg/store" syncer "github.com/evstack/apex/pkg/sync" "github.com/evstack/apex/pkg/types" @@ -151,6 +152,49 @@ func setupMetrics(cfg *config.Config) (metrics.Recorder, *metrics.Server) { return rec, srv } +func setupProfiling(cfg *config.Config) *profile.Server { + if !cfg.Profiling.Enabled { + return nil + } + srv := profile.NewServer(cfg.Profiling.ListenAddr, log.Logger) + go 
func() { + if err := srv.Start(); err != nil { + log.Error().Err(err).Msg("profiling server error") + } + }() + return srv +} + +func openDataSource(ctx context.Context, cfg *config.Config) (fetch.DataFetcher, fetch.ProofForwarder, error) { + switch cfg.DataSource.Type { + case "app": + appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger) + if err != nil { + return nil, nil, fmt.Errorf("create celestia-app fetcher: %w", err) + } + return appFetcher, nil, nil + case "node", "": + nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) + if err != nil { + return nil, nil, fmt.Errorf("connect to celestia node: %w", err) + } + return nodeFetcher, nodeFetcher, nil + default: + return nil, nil, fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type) + } +} + +func openStore(ctx context.Context, cfg *config.Config) (store.Store, error) { + switch cfg.Storage.Type { + case "s3": + return store.NewS3Store(ctx, cfg.Storage.S3) + case "sqlite", "": + return store.Open(cfg.Storage.DBPath) + default: + return nil, fmt.Errorf("unsupported storage type: %q", cfg.Storage.Type) + } +} + func runIndexer(ctx context.Context, cfg *config.Config) error { // Parse namespaces from config. namespaces, err := cfg.ParsedNamespaces() @@ -159,14 +203,22 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { } rec, metricsSrv := setupMetrics(cfg) + profileSrv := setupProfiling(cfg) // Open store. - db, err := store.Open(cfg.Storage.DBPath) + db, err := openStore(ctx, cfg) if err != nil { return fmt.Errorf("open store: %w", err) } defer db.Close() //nolint:errcheck - db.SetMetrics(rec) + + // Wire metrics into the store. + switch s := db.(type) { + case *store.SQLiteStore: + s.SetMetrics(rec) + case *store.S3Store: + s.SetMetrics(rec) + } // Persist configured namespaces. ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) @@ -179,27 +231,9 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { } // Connect to data source. - var ( - dataFetcher fetch.DataFetcher - proofFwd fetch.ProofForwarder - ) - switch cfg.DataSource.Type { - case "app": - appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger) - if err != nil { - return fmt.Errorf("create celestia-app fetcher: %w", err) - } - dataFetcher = appFetcher - // celestia-app does not serve blob proofs; proofFwd stays nil. 
- case "node", "": - nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) - if err != nil { - return fmt.Errorf("connect to celestia node: %w", err) - } - dataFetcher = nodeFetcher - proofFwd = nodeFetcher - default: - return fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type) + dataFetcher, proofFwd, err := openDataSource(ctx, cfg) + if err != nil { + return err } defer dataFetcher.Close() //nolint:errcheck @@ -263,7 +297,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { err = coord.Run(ctx) - gracefulShutdown(httpSrv, grpcSrv, metricsSrv) + gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv) if err != nil && !errors.Is(err, context.Canceled) { return fmt.Errorf("coordinator: %w", err) @@ -273,7 +307,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { return nil } -func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server) { +func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) { stopped := make(chan struct{}) go func() { grpcSrv.GracefulStop() @@ -299,4 +333,9 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me log.Error().Err(err).Msg("metrics server shutdown error") } } + if profileSrv != nil { + if err := profileSrv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("profiling server shutdown error") + } + } } diff --git a/config/config.go b/config/config.go index cd2fc6f..5d5ec14 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ type Config struct { Sync SyncConfig `yaml:"sync"` Subscription SubscriptionConfig `yaml:"subscription"` Metrics MetricsConfig `yaml:"metrics"` + Profiling ProfilingConfig `yaml:"profiling"` Log LogConfig `yaml:"log"` } @@ -28,9 +29,24 @@ type DataSourceConfig struct { Namespaces []string `yaml:"namespaces"` } -// StorageConfig configures the SQLite database. +// StorageConfig configures the persistence backend. +// Type selects the backend: "sqlite" (default) uses a local SQLite file, +// "s3" uses an S3-compatible object store. type StorageConfig struct { - DBPath string `yaml:"db_path"` + Type string `yaml:"type"` // "sqlite" (default) or "s3" + DBPath string `yaml:"db_path"` // SQLite path (used when type=sqlite) + S3 *S3Config `yaml:"s3"` // S3 settings (used when type=s3) +} + +// S3Config configures an S3-compatible object store backend. +// Credentials are resolved via standard AWS SDK mechanisms +// (env vars, IAM role, shared credentials file). +type S3Config struct { + Bucket string `yaml:"bucket"` + Prefix string `yaml:"prefix"` + Region string `yaml:"region"` + Endpoint string `yaml:"endpoint"` // custom endpoint for MinIO, R2, Spaces + ChunkSize int `yaml:"chunk_size"` // heights per S3 object, default 64 } // RPCConfig configures the API servers. @@ -57,6 +73,12 @@ type MetricsConfig struct { ListenAddr string `yaml:"listen_addr"` } +// ProfilingConfig configures pprof profiling endpoints. +type ProfilingConfig struct { + Enabled bool `yaml:"enabled"` + ListenAddr string `yaml:"listen_addr"` +} + // LogConfig configures logging. 
type LogConfig struct { Level string `yaml:"level"` @@ -71,6 +93,7 @@ func DefaultConfig() Config { CelestiaNodeURL: "http://localhost:26658", }, Storage: StorageConfig{ + Type: "sqlite", DBPath: "apex.db", }, RPC: RPCConfig{ @@ -88,6 +111,10 @@ func DefaultConfig() Config { Enabled: true, ListenAddr: ":9091", }, + Profiling: ProfilingConfig{ + Enabled: false, + ListenAddr: "127.0.0.1:6061", + }, Log: LogConfig{ Level: "info", Format: "json", diff --git a/config/load.go b/config/load.go index 71b4b18..5211f8c 100644 --- a/config/load.go +++ b/config/load.go @@ -45,9 +45,20 @@ data_source: namespaces: [] storage: - # Path to the SQLite database file + # Storage backend: "sqlite" (default) or "s3" + type: "sqlite" + + # Path to the SQLite database file (used when type: "sqlite") db_path: "apex.db" + # S3-compatible object store settings (used when type: "s3") + # s3: + # bucket: "my-apex-bucket" + # prefix: "indexer" + # region: "us-east-1" + # endpoint: "" # custom endpoint for MinIO, R2, etc. + # chunk_size: 64 # heights per S3 object + rpc: # Address for the JSON-RPC API server (HTTP/WebSocket) listen_addr: ":8080" @@ -72,6 +83,12 @@ metrics: # Address for the metrics server listen_addr: ":9091" +profiling: + # Enable pprof endpoints (/debug/pprof/*) + enabled: false + # Bind address for profiling HTTP server (prefer loopback) + listen_addr: "127.0.0.1:6061" + log: # Log level: trace, debug, info, warn, error, fatal, panic level: "info" @@ -133,12 +150,40 @@ func validateDataSource(ds *DataSourceConfig) error { return nil } +func validateStorage(s *StorageConfig) error { + switch s.Type { + case "s3": + if s.S3 == nil { + return fmt.Errorf("storage.s3 is required when storage.type is \"s3\"") + } + if s.S3.Bucket == "" { + return fmt.Errorf("storage.s3.bucket is required") + } + if s.S3.Region == "" && s.S3.Endpoint == "" { + return fmt.Errorf("storage.s3.region is required (unless endpoint is set)") + } + if s.S3.ChunkSize == 0 { + s.S3.ChunkSize = 64 + } + if s.S3.ChunkSize < 0 { + return fmt.Errorf("storage.s3.chunk_size must be positive") + } + case "sqlite", "": + if s.DBPath == "" { + return fmt.Errorf("storage.db_path is required") + } + default: + return fmt.Errorf("storage.type %q is invalid; must be \"sqlite\" or \"s3\"", s.Type) + } + return nil +} + func validate(cfg *Config) error { if err := validateDataSource(&cfg.DataSource); err != nil { return err } - if cfg.Storage.DBPath == "" { - return fmt.Errorf("storage.db_path is required") + if err := validateStorage(&cfg.Storage); err != nil { + return err } if cfg.RPC.ListenAddr == "" { return fmt.Errorf("rpc.listen_addr is required") @@ -158,6 +203,9 @@ func validate(cfg *Config) error { if cfg.Metrics.Enabled && cfg.Metrics.ListenAddr == "" { return fmt.Errorf("metrics.listen_addr is required when metrics are enabled") } + if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" { + return fmt.Errorf("profiling.listen_addr is required when profiling is enabled") + } if !validLogLevels[cfg.Log.Level] { return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) } diff --git a/config/load_test.go b/config/load_test.go new file mode 100644 index 0000000..97ac9ee --- /dev/null +++ b/config/load_test.go @@ -0,0 +1,71 @@ +package config + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestDefaultConfigProfilingDefaults(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + if cfg.Profiling.Enabled { + t.Fatal("profiling.enabled default = true, 
want false") + } + if cfg.Profiling.ListenAddr != "127.0.0.1:6061" { + t.Fatalf("profiling.listen_addr default = %q, want %q", cfg.Profiling.ListenAddr, "127.0.0.1:6061") + } +} + +func TestLoadProfilingEnabledRequiresListenAddr(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + content := ` +data_source: + type: "node" + celestia_node_url: "http://localhost:26658" + +storage: + type: "sqlite" + db_path: "apex.db" + +rpc: + listen_addr: ":8080" + grpc_listen_addr: ":9090" + +sync: + start_height: 1 + batch_size: 64 + concurrency: 4 + +subscription: + buffer_size: 64 + +metrics: + enabled: true + listen_addr: ":9091" + +profiling: + enabled: true + listen_addr: "" + +log: + level: "info" + format: "json" +` + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + _, err := Load(path) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !strings.Contains(err.Error(), "profiling.listen_addr is required when profiling is enabled") { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/go.mod b/go.mod index 5e3915c..2466d4e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,10 @@ module github.com/evstack/apex go 1.25.0 require ( + github.com/aws/aws-sdk-go-v2 v1.41.1 + github.com/aws/aws-sdk-go-v2/config v1.32.9 + github.com/aws/aws-sdk-go-v2/credentials v1.19.9 + github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0 github.com/filecoin-project/go-jsonrpc v0.10.1 github.com/gorilla/websocket v1.4.2 github.com/prometheus/client_golang v1.23.2 @@ -15,6 +19,21 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect + github.com/aws/smithy-go v1.24.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/go.sum b/go.sum index f30bace..fa3f4c9 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,44 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU= +github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4/go.mod 
h1:IOAPF6oT9KCsceNTvvYMNHy0+kMF8akOjeDvPENWxp4= +github.com/aws/aws-sdk-go-v2/config v1.32.9 h1:ktda/mtAydeObvJXlHzyGpK1xcsLaP16zfUPDGoW90A= +github.com/aws/aws-sdk-go-v2/config v1.32.9/go.mod h1:U+fCQ+9QKsLW786BCfEjYRj34VVTbPdsLP3CHSYXMOI= +github.com/aws/aws-sdk-go-v2/credentials v1.19.9 h1:sWvTKsyrMlJGEuj/WgrwilpoJ6Xa1+KhIpGdzw7mMU8= +github.com/aws/aws-sdk-go-v2/credentials v1.19.9/go.mod h1:+J44MBhmfVY/lETFiKI+klz0Vym2aCmIjqgClMmW82w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 h1:I0GyV8wiYrP8XpA70g1HBcQO1JlQxCMTW9npl5UbDHY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17/go.mod h1:tyw7BOl5bBe/oqvoIeECFJjMdzXoa/dfVz3QQ5lgHGA= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17/go.mod h1:EhG22vHRrvF8oXSTYStZhJc1aUgKtnJe+aOiFEV90cM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 h1:JqcdRG//czea7Ppjb+g/n4o8i/R50aTBHkA7vu0lK+k= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17/go.mod h1:CO+WeGmIdj/MlPel2KwID9Gt7CNq4M65HUfBW97liM0= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 h1:Z5EiPIzXKewUQK0QTMkutjiaPVeVYXX7KIqhXu/0fXs= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8/go.mod h1:FsTpJtvC4U1fyDXk7c71XoDv3HlRm8V3NiYLeYLh5YE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 h1:RuNSMoozM8oXlgLG/n6WLaFGoea7/CddrCfIiSA+xdY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17/go.mod h1:F2xxQ9TZz5gDWsclCtPQscGpP0VUOc8RqgFM3vDENmU= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 h1:bGeHBsGZx0Dvu/eJC0Lh9adJa3M1xREcndxLNZlve2U= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17/go.mod h1:dcW24lbU0CzHusTE8LLHhRLI42ejmINN8Lcr22bwh/g= +github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0 h1:oeu8VPlOre74lBA/PMhxa5vewaMIMmILM+RraSyB8KA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0/go.mod h1:5jggDlZ2CLQhwJBiZJb4vfk4f0GxWdEDruWKEJ1xOdo= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 h1:VrhDvQib/i0lxvr3zqlUwLwJP4fpmpyD9wYG1vfSu+Y= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.5/go.mod h1:k029+U8SY30/3/ras4G/Fnv/b88N4mAfliNn08Dem4M= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 h1:+VTRawC4iVY58pS/lzpo0lnoa/SYNGF4/B/3/U5ro8Y= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.10/go.mod h1:yifAsgBxgJWn3ggx70A3urX2AN49Y5sJTD1UQFlfqBw= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 h1:0jbJeuEHlwKJ9PfXtpSFc4MF+WIWORdhN1n30ITZGFM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14/go.mod h1:sTGThjphYE4Ohw8vJiRStAcu3rbjtXRsdNB0TvZ5wwo= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 h1:5fFjR/ToSOzB2OQ/XqWpZBmNvmP/pJ1jOWYlFDJTjRQ= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.6/go.mod h1:qgFDZQSD/Kys7nJnVqYlWKnh0SSdMjAi0uSwON4wgYQ= +github.com/aws/smithy-go v1.24.0 
h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= +github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index eb682e4..23ba57b 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -20,6 +20,8 @@ type Recorder interface { IncBlobsProcessed(n int) IncHeadersProcessed(n int) ObserveBatchDuration(d time.Duration) + ObserveBackfillStageDuration(stage string, d time.Duration) + IncBackfillStageErrors(stage string) // API metrics IncAPIRequest(method, status string) @@ -39,17 +41,19 @@ type nopRecorder struct{} // Nop returns a Recorder that discards all metrics. func Nop() Recorder { return nopRecorder{} } -func (nopRecorder) SetSyncState(string) {} -func (nopRecorder) SetLatestHeight(uint64) {} -func (nopRecorder) SetNetworkHeight(uint64) {} -func (nopRecorder) IncBlobsProcessed(int) {} -func (nopRecorder) IncHeadersProcessed(int) {} -func (nopRecorder) ObserveBatchDuration(time.Duration) {} -func (nopRecorder) IncAPIRequest(string, string) {} -func (nopRecorder) ObserveAPIRequestDuration(string, time.Duration) {} -func (nopRecorder) ObserveStoreQueryDuration(string, time.Duration) {} -func (nopRecorder) SetActiveSubscriptions(int) {} -func (nopRecorder) IncEventsDropped() {} +func (nopRecorder) SetSyncState(string) {} +func (nopRecorder) SetLatestHeight(uint64) {} +func (nopRecorder) SetNetworkHeight(uint64) {} +func (nopRecorder) IncBlobsProcessed(int) {} +func (nopRecorder) IncHeadersProcessed(int) {} +func (nopRecorder) ObserveBatchDuration(time.Duration) {} +func (nopRecorder) ObserveBackfillStageDuration(string, time.Duration) {} +func (nopRecorder) IncBackfillStageErrors(string) {} +func (nopRecorder) IncAPIRequest(string, string) {} +func (nopRecorder) ObserveAPIRequestDuration(string, time.Duration) {} +func (nopRecorder) ObserveStoreQueryDuration(string, time.Duration) {} +func (nopRecorder) SetActiveSubscriptions(int) {} +func (nopRecorder) IncEventsDropped() {} // PromRecorder implements Recorder using Prometheus metrics. 
type PromRecorder struct { @@ -60,6 +64,8 @@ type PromRecorder struct { blobsProcessed prometheus.Counter headersProcessed prometheus.Counter batchDuration prometheus.Histogram + backfillStageDur *prometheus.HistogramVec + backfillStageErr *prometheus.CounterVec apiRequests *prometheus.CounterVec apiDuration *prometheus.HistogramVec storeQueryDur *prometheus.HistogramVec @@ -117,6 +123,17 @@ func NewPromRecorder(reg prometheus.Registerer, version string) *PromRecorder { Buckets: prometheus.DefBuckets, }), + backfillStageDur: factory.NewHistogramVec(prometheus.HistogramOpts{ + Name: "apex_backfill_stage_duration_seconds", + Help: "Per-stage duration during backfill height processing.", + Buckets: prometheus.DefBuckets, + }, []string{"stage"}), + + backfillStageErr: factory.NewCounterVec(prometheus.CounterOpts{ + Name: "apex_backfill_stage_errors_total", + Help: "Total backfill stage errors by stage.", + }, []string{"stage"}), + apiRequests: factory.NewCounterVec(prometheus.CounterOpts{ Name: "apex_api_requests_total", Help: "Total API requests by method and status.", @@ -201,6 +218,14 @@ func (r *PromRecorder) ObserveBatchDuration(d time.Duration) { r.batchDuration.Observe(d.Seconds()) } +func (r *PromRecorder) ObserveBackfillStageDuration(stage string, d time.Duration) { + r.backfillStageDur.WithLabelValues(stage).Observe(d.Seconds()) +} + +func (r *PromRecorder) IncBackfillStageErrors(stage string) { + r.backfillStageErr.WithLabelValues(stage).Inc() +} + func (r *PromRecorder) IncAPIRequest(method, status string) { r.apiRequests.WithLabelValues(method, status).Inc() } diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 8023bf4..5496b0b 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -17,6 +17,8 @@ func TestPromRecorderRegisters(t *testing.T) { r.IncBlobsProcessed(10) r.IncHeadersProcessed(5) r.ObserveBatchDuration(500 * time.Millisecond) + r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond) + r.IncBackfillStageErrors("store_header") r.IncAPIRequest("BlobGet", "ok") r.ObserveAPIRequestDuration("BlobGet", 10*time.Millisecond) r.ObserveStoreQueryDuration("GetBlobs", 2*time.Millisecond) @@ -32,17 +34,29 @@ func TestPromRecorderRegisters(t *testing.T) { t.Fatal("expected at least one metric family") } - // Verify apex_info is present. - found := false + // Verify new and existing key metrics are present. 
+ foundInfo := false + foundBackfillDur := false + foundBackfillErr := false for _, fam := range families { - if fam.GetName() == "apex_info" { - found = true - break + switch fam.GetName() { + case "apex_info": + foundInfo = true + case "apex_backfill_stage_duration_seconds": + foundBackfillDur = true + case "apex_backfill_stage_errors_total": + foundBackfillErr = true } } - if !found { + if !foundInfo { t.Error("apex_info metric not found") } + if !foundBackfillDur { + t.Error("apex_backfill_stage_duration_seconds metric not found") + } + if !foundBackfillErr { + t.Error("apex_backfill_stage_errors_total metric not found") + } } func TestNopRecorderDoesNotPanic(t *testing.T) { @@ -53,6 +67,8 @@ func TestNopRecorderDoesNotPanic(t *testing.T) { r.IncBlobsProcessed(10) r.IncHeadersProcessed(5) r.ObserveBatchDuration(500 * time.Millisecond) + r.ObserveBackfillStageDuration("fetch_header", 3*time.Millisecond) + r.IncBackfillStageErrors("store_header") r.IncAPIRequest("BlobGet", "ok") r.ObserveAPIRequestDuration("BlobGet", 10*time.Millisecond) r.ObserveStoreQueryDuration("GetBlobs", 2*time.Millisecond) diff --git a/pkg/profile/server.go b/pkg/profile/server.go new file mode 100644 index 0000000..7710da8 --- /dev/null +++ b/pkg/profile/server.go @@ -0,0 +1,50 @@ +package profile + +import ( + "context" + "errors" + "net/http" + "net/http/pprof" + "time" + + "github.com/rs/zerolog" +) + +// Server serves pprof endpoints over HTTP. +type Server struct { + httpSrv *http.Server + log zerolog.Logger +} + +// NewServer creates a profiling HTTP server listening on addr. +func NewServer(addr string, log zerolog.Logger) *Server { + mux := http.NewServeMux() + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + return &Server{ + httpSrv: &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + }, + log: log.With().Str("component", "profile-server").Logger(), + } +} + +// Start begins serving profiles. It blocks until the server is shut down. +func (s *Server) Start() error { + s.log.Info().Str("addr", s.httpSrv.Addr).Msg("profiling server listening") + if err := s.httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +// Shutdown gracefully stops the profiling server. +func (s *Server) Shutdown(ctx context.Context) error { + return s.httpSrv.Shutdown(ctx) +} diff --git a/pkg/store/s3.go b/pkg/store/s3.go new file mode 100644 index 0000000..4f33660 --- /dev/null +++ b/pkg/store/s3.go @@ -0,0 +1,861 @@ +package store + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "sort" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + + "github.com/evstack/apex/config" + "github.com/evstack/apex/pkg/metrics" + "github.com/evstack/apex/pkg/types" +) + +// s3Client abstracts the S3 operations used by S3Store, enabling mock-based testing. 
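+// The AWS SDK v2 *s3.Client satisfies this interface; tests inject an in-memory mock.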
+type s3Client interface { + GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error) + PutObject(ctx context.Context, input *s3.PutObjectInput, opts ...func(*s3.Options)) (*s3.PutObjectOutput, error) +} + +// blobChunkKey identifies a blob chunk in S3 by namespace and chunk start height. +type blobChunkKey struct { + namespace types.Namespace + chunkStart uint64 +} + +// commitEntry is a buffered commitment index write. +type commitEntry struct { + commitmentHex string + pointer commitPointer +} + +// commitPointer is the JSON content of a commitment index object. +type commitPointer struct { + Namespace string `json:"namespace"` + Height uint64 `json:"height"` + Index int `json:"index"` +} + +// s3Blob is the JSON representation of a blob in an S3 chunk object. +type s3Blob struct { + Height uint64 `json:"height"` + Namespace string `json:"namespace"` + Data []byte `json:"data"` + Commitment []byte `json:"commitment"` + ShareVersion uint32 `json:"share_version"` + Signer []byte `json:"signer"` + Index int `json:"index"` +} + +// s3Header is the JSON representation of a header in an S3 chunk object. +type s3Header struct { + Height uint64 `json:"height"` + Hash []byte `json:"hash"` + DataHash []byte `json:"data_hash"` + Time time.Time `json:"time"` + RawHeader []byte `json:"raw_header"` +} + +// maxFlushConcurrency bounds parallel S3 I/O during flush. +const maxFlushConcurrency = 8 + +// S3Store implements Store using an S3-compatible object store. +// Writes are buffered in memory and flushed to S3 at checkpoint boundaries +// (SetSyncState calls and Close). +type S3Store struct { + client s3Client + bucket string + prefix string + chunkSize uint64 + metrics metrics.Recorder + + mu sync.Mutex + blobBuf map[blobChunkKey][]types.Blob + headerBuf map[uint64][]*types.Header + commitBuf []commitEntry +} + +// NewS3Store creates a new S3Store from the given config. +func NewS3Store(ctx context.Context, cfg *config.S3Config) (*S3Store, error) { + opts := []func(*awsconfig.LoadOptions) error{} + if cfg.Region != "" { + opts = append(opts, awsconfig.WithRegion(cfg.Region)) + } + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("load AWS config: %w", err) + } + + var s3Opts []func(*s3.Options) + if cfg.Endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = aws.String(cfg.Endpoint) + o.UsePathStyle = true + }) + } + + client := s3.NewFromConfig(awsCfg, s3Opts...) + + chunkSize := uint64(cfg.ChunkSize) + if chunkSize == 0 { + chunkSize = 64 + } + + return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil +} + +// NewS3StoreWithStaticCredentials creates an S3Store with explicit credentials. +// Useful for testing against MinIO or other S3-compatible services. +func NewS3StoreWithStaticCredentials(ctx context.Context, cfg *config.S3Config, accessKey, secretKey, token string) (*S3Store, error) { + opts := []func(*awsconfig.LoadOptions) error{ + awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, token)), + } + if cfg.Region != "" { + opts = append(opts, awsconfig.WithRegion(cfg.Region)) + } + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...) 
+ if err != nil { + return nil, fmt.Errorf("load AWS config: %w", err) + } + + var s3Opts []func(*s3.Options) + if cfg.Endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = aws.String(cfg.Endpoint) + o.UsePathStyle = true + }) + } + + client := s3.NewFromConfig(awsCfg, s3Opts...) + + chunkSize := uint64(cfg.ChunkSize) + if chunkSize == 0 { + chunkSize = 64 + } + + return newS3Store(client, cfg.Bucket, cfg.Prefix, chunkSize), nil +} + +// newS3Store creates an S3Store with an injected client (for testing). +func newS3Store(client s3Client, bucket, prefix string, chunkSize uint64) *S3Store { + return &S3Store{ + client: client, + bucket: bucket, + prefix: strings.TrimSuffix(prefix, "/"), + chunkSize: chunkSize, + metrics: metrics.Nop(), + blobBuf: make(map[blobChunkKey][]types.Blob), + headerBuf: make(map[uint64][]*types.Header), + } +} + +// SetMetrics sets the metrics recorder. +func (s *S3Store) SetMetrics(m metrics.Recorder) { + s.metrics = m +} + +// chunkStart returns the chunk start height for a given height. +func (s *S3Store) chunkStart(height uint64) uint64 { + return (height / s.chunkSize) * s.chunkSize +} + +// key builds an S3 object key with the configured prefix. +func (s *S3Store) key(parts ...string) string { + if s.prefix == "" { + return strings.Join(parts, "/") + } + return s.prefix + "/" + strings.Join(parts, "/") +} + +func chunkFileName(start uint64) string { + return fmt.Sprintf("chunk_%016d.json", start) +} + +// --- Write methods (buffer) --- + +func (s *S3Store) PutBlobs(ctx context.Context, blobs []types.Blob) error { + if len(blobs) == 0 { + return nil + } + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("PutBlobs", time.Since(start)) }() + + s.mu.Lock() + defer s.mu.Unlock() + + for i := range blobs { + b := &blobs[i] + key := blobChunkKey{namespace: b.Namespace, chunkStart: s.chunkStart(b.Height)} + s.blobBuf[key] = append(s.blobBuf[key], *b) + + s.commitBuf = append(s.commitBuf, commitEntry{ + commitmentHex: hex.EncodeToString(b.Commitment), + pointer: commitPointer{ + Namespace: b.Namespace.String(), + Height: b.Height, + Index: b.Index, + }, + }) + } + return nil +} + +func (s *S3Store) PutHeader(ctx context.Context, header *types.Header) error { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("PutHeader", time.Since(start)) }() + + s.mu.Lock() + defer s.mu.Unlock() + + cs := s.chunkStart(header.Height) + s.headerBuf[cs] = append(s.headerBuf[cs], header) + return nil +} + +func (s *S3Store) PutNamespace(ctx context.Context, ns types.Namespace) error { + s.mu.Lock() + defer s.mu.Unlock() + + key := s.key("meta", "namespaces.json") + + existing, err := s.getObject(ctx, key) + if err != nil && !isNotFound(err) { + return fmt.Errorf("get namespaces: %w", err) + } + + namespaces := make([]string, 0, 1) + if existing != nil { + if err := json.Unmarshal(existing, &namespaces); err != nil { + return fmt.Errorf("decode namespaces: %w", err) + } + } + + nsHex := ns.String() + for _, n := range namespaces { + if n == nsHex { + return nil + } + } + namespaces = append(namespaces, nsHex) + + data, err := json.Marshal(namespaces) + if err != nil { + return fmt.Errorf("encode namespaces: %w", err) + } + return s.putObject(ctx, key, data) +} + +// --- Read methods --- + +func (s *S3Store) GetBlob(ctx context.Context, ns types.Namespace, height uint64, index int) (*types.Blob, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetBlob", time.Since(start)) }() 
+ + // Check buffer first. + if b := s.findBlobInBuffer(ns, height, index); b != nil { + return b, nil + } + + cs := s.chunkStart(height) + key := s.key("blobs", ns.String(), chunkFileName(cs)) + + data, err := s.getObject(ctx, key) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("get blob chunk: %w", err) + } + + blobs, err := decodeS3Blobs(data) + if err != nil { + return nil, err + } + + for i := range blobs { + if blobs[i].Height == height && blobs[i].Index == index && blobs[i].Namespace == ns { + return &blobs[i], nil + } + } + return nil, ErrNotFound +} + +func (s *S3Store) GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, limit, offset int) ([]types.Blob, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobs", time.Since(start)) }() + + buffered := s.collectBufferedBlobs(ns, startHeight, endHeight) + + s3Blobs, err := s.fetchBlobChunks(ctx, ns, startHeight, endHeight) + if err != nil { + return nil, err + } + + allBlobs := deduplicateBlobs(append(buffered, s3Blobs...)) + + sort.Slice(allBlobs, func(i, j int) bool { + if allBlobs[i].Height != allBlobs[j].Height { + return allBlobs[i].Height < allBlobs[j].Height + } + return allBlobs[i].Index < allBlobs[j].Index + }) + + return applyOffsetLimit(allBlobs, offset, limit), nil +} + +// collectBufferedBlobs returns in-buffer blobs matching the namespace and height range. +func (s *S3Store) collectBufferedBlobs(ns types.Namespace, startHeight, endHeight uint64) []types.Blob { + s.mu.Lock() + defer s.mu.Unlock() + + var result []types.Blob + for key, bufs := range s.blobBuf { + if key.namespace != ns { + continue + } + for i := range bufs { + if bufs[i].Height >= startHeight && bufs[i].Height <= endHeight { + result = append(result, bufs[i]) + } + } + } + return result +} + +// fetchBlobChunks reads all S3 blob chunks overlapping the height range for a namespace. +func (s *S3Store) fetchBlobChunks(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64) ([]types.Blob, error) { + firstChunk := s.chunkStart(startHeight) + lastChunk := s.chunkStart(endHeight) + + var allBlobs []types.Blob + for cs := firstChunk; cs <= lastChunk; cs += s.chunkSize { + key := s.key("blobs", ns.String(), chunkFileName(cs)) + data, err := s.getObject(ctx, key) + if err != nil { + if isNotFound(err) { + continue + } + return nil, fmt.Errorf("get blob chunk at %d: %w", cs, err) + } + + blobs, err := decodeS3Blobs(data) + if err != nil { + return nil, err + } + for i := range blobs { + if blobs[i].Height >= startHeight && blobs[i].Height <= endHeight && blobs[i].Namespace == ns { + allBlobs = append(allBlobs, blobs[i]) + } + } + } + return allBlobs, nil +} + +func applyOffsetLimit(items []types.Blob, offset, limit int) []types.Blob { + if offset > 0 { + if offset >= len(items) { + return nil + } + items = items[offset:] + } + if limit > 0 && limit < len(items) { + items = items[:limit] + } + return items +} + +func (s *S3Store) GetBlobByCommitment(ctx context.Context, commitment []byte) (*types.Blob, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobByCommitment", time.Since(start)) }() + + // Check buffer first. 
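+	// Commitment index objects are only written at flush time, so unflushed blobs are resolved via the in-memory commit buffer.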
+ commitHex := hex.EncodeToString(commitment) + + s.mu.Lock() + for _, entry := range s.commitBuf { + if entry.commitmentHex == commitHex { + ns, err := types.NamespaceFromHex(entry.pointer.Namespace) + if err == nil { + if b := s.findBlobInBufferLocked(ns, entry.pointer.Height, entry.pointer.Index); b != nil { + s.mu.Unlock() + return b, nil + } + } + } + } + s.mu.Unlock() + + // Look up commitment index in S3. + key := s.key("index", "commitments", commitHex+".json") + data, err := s.getObject(ctx, key) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("get commitment index: %w", err) + } + + var ptr commitPointer + if err := json.Unmarshal(data, &ptr); err != nil { + return nil, fmt.Errorf("decode commitment pointer: %w", err) + } + + ns, err := types.NamespaceFromHex(ptr.Namespace) + if err != nil { + return nil, fmt.Errorf("parse namespace from commitment index: %w", err) + } + + return s.GetBlob(ctx, ns, ptr.Height, ptr.Index) +} + +func (s *S3Store) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetHeader", time.Since(start)) }() + + // Check buffer first. + if h := s.findHeaderInBuffer(height); h != nil { + return h, nil + } + + cs := s.chunkStart(height) + key := s.key("headers", chunkFileName(cs)) + + data, err := s.getObject(ctx, key) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("get header chunk: %w", err) + } + + headers, err := decodeS3Headers(data) + if err != nil { + return nil, err + } + for i := range headers { + if headers[i].Height == height { + return &headers[i], nil + } + } + return nil, ErrNotFound +} + +func (s *S3Store) GetNamespaces(ctx context.Context) ([]types.Namespace, error) { + key := s.key("meta", "namespaces.json") + data, err := s.getObject(ctx, key) + if err != nil { + if isNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("get namespaces: %w", err) + } + + var hexList []string + if err := json.Unmarshal(data, &hexList); err != nil { + return nil, fmt.Errorf("decode namespaces: %w", err) + } + + namespaces := make([]types.Namespace, 0, len(hexList)) + for _, h := range hexList { + ns, err := types.NamespaceFromHex(h) + if err != nil { + return nil, fmt.Errorf("parse namespace %q: %w", h, err) + } + namespaces = append(namespaces, ns) + } + return namespaces, nil +} + +func (s *S3Store) GetSyncState(ctx context.Context) (*types.SyncStatus, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetSyncState", time.Since(start)) }() + + key := s.key("meta", "sync_state.json") + data, err := s.getObject(ctx, key) + if err != nil { + if isNotFound(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("get sync state: %w", err) + } + + var state struct { + State int `json:"state"` + LatestHeight uint64 `json:"latest_height"` + NetworkHeight uint64 `json:"network_height"` + } + if err := json.Unmarshal(data, &state); err != nil { + return nil, fmt.Errorf("decode sync state: %w", err) + } + return &types.SyncStatus{ + State: types.SyncState(state.State), + LatestHeight: state.LatestHeight, + NetworkHeight: state.NetworkHeight, + }, nil +} + +func (s *S3Store) SetSyncState(ctx context.Context, status types.SyncStatus) error { + // Flush buffered data first — this is the checkpoint boundary. 
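+	// If the flush fails, the sync state below is not written, so the checkpoint never advances past unpersisted data.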
+ if err := s.flush(ctx); err != nil { + return fmt.Errorf("flush before sync state: %w", err) + } + + data, err := json.Marshal(struct { + State int `json:"state"` + LatestHeight uint64 `json:"latest_height"` + NetworkHeight uint64 `json:"network_height"` + }{ + State: int(status.State), + LatestHeight: status.LatestHeight, + NetworkHeight: status.NetworkHeight, + }) + if err != nil { + return fmt.Errorf("encode sync state: %w", err) + } + + key := s.key("meta", "sync_state.json") + return s.putObject(ctx, key, data) +} + +func (s *S3Store) Close() error { + return s.flush(context.Background()) +} + +// --- Flush --- + +// flush drains the write buffer to S3. Called at checkpoint boundaries. +func (s *S3Store) flush(ctx context.Context) error { + s.mu.Lock() + blobBuf := s.blobBuf + headerBuf := s.headerBuf + commitBuf := s.commitBuf + s.blobBuf = make(map[blobChunkKey][]types.Blob) + s.headerBuf = make(map[uint64][]*types.Header) + s.commitBuf = nil + s.mu.Unlock() + + if len(blobBuf) == 0 && len(headerBuf) == 0 && len(commitBuf) == 0 { + return nil + } + + // Use a semaphore to bound concurrency. + sem := make(chan struct{}, maxFlushConcurrency) + var ( + mu sync.Mutex + errs []error + ) + addErr := func(err error) { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + + var wg sync.WaitGroup + + // Flush blob chunks. + for key, blobs := range blobBuf { + wg.Add(1) + go func(key blobChunkKey, blobs []types.Blob) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + if err := s.flushBlobChunk(ctx, key, blobs); err != nil { + addErr(err) + } + }(key, blobs) + } + + // Flush header chunks. + for cs, headers := range headerBuf { + wg.Add(1) + go func(cs uint64, headers []*types.Header) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + if err := s.flushHeaderChunk(ctx, cs, headers); err != nil { + addErr(err) + } + }(cs, headers) + } + + // Flush commitment indices. + for _, entry := range commitBuf { + wg.Add(1) + go func(e commitEntry) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + if err := s.flushCommitIndex(ctx, e); err != nil { + addErr(err) + } + }(entry) + } + + wg.Wait() + return errors.Join(errs...) +} + +func (s *S3Store) flushBlobChunk(ctx context.Context, key blobChunkKey, newBlobs []types.Blob) error { + objKey := s.key("blobs", key.namespace.String(), chunkFileName(key.chunkStart)) + + // Read existing chunk (may 404). + existing, err := s.getObject(ctx, objKey) + if err != nil && !isNotFound(err) { + return fmt.Errorf("read blob chunk for merge: %w", err) + } + + var merged []types.Blob + if existing != nil { + merged, err = decodeS3Blobs(existing) + if err != nil { + return fmt.Errorf("decode existing blob chunk: %w", err) + } + } + + merged = append(merged, newBlobs...) 
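+	// Deduplication keys on (height, namespace, index), so re-flushing the same heights stays idempotent.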
+ merged = deduplicateBlobs(merged) + sort.Slice(merged, func(i, j int) bool { + if merged[i].Height != merged[j].Height { + return merged[i].Height < merged[j].Height + } + return merged[i].Index < merged[j].Index + }) + + data, err := encodeS3Blobs(merged) + if err != nil { + return fmt.Errorf("encode blob chunk: %w", err) + } + return s.putObject(ctx, objKey, data) +} + +func (s *S3Store) flushHeaderChunk(ctx context.Context, cs uint64, newHeaders []*types.Header) error { + objKey := s.key("headers", chunkFileName(cs)) + + existing, err := s.getObject(ctx, objKey) + if err != nil && !isNotFound(err) { + return fmt.Errorf("read header chunk for merge: %w", err) + } + + var merged []types.Header + if existing != nil { + merged, err = decodeS3Headers(existing) + if err != nil { + return fmt.Errorf("decode existing header chunk: %w", err) + } + } + + for _, h := range newHeaders { + merged = append(merged, *h) + } + + merged = deduplicateHeaders(merged) + sort.Slice(merged, func(i, j int) bool { + return merged[i].Height < merged[j].Height + }) + + data, err := encodeS3Headers(merged) + if err != nil { + return fmt.Errorf("encode header chunk: %w", err) + } + return s.putObject(ctx, objKey, data) +} + +func (s *S3Store) flushCommitIndex(ctx context.Context, e commitEntry) error { + key := s.key("index", "commitments", e.commitmentHex+".json") + data, err := json.Marshal(e.pointer) + if err != nil { + return fmt.Errorf("encode commitment index: %w", err) + } + return s.putObject(ctx, key, data) +} + +// --- Buffer lookup helpers --- + +func (s *S3Store) findBlobInBuffer(ns types.Namespace, height uint64, index int) *types.Blob { + s.mu.Lock() + defer s.mu.Unlock() + return s.findBlobInBufferLocked(ns, height, index) +} + +func (s *S3Store) findBlobInBufferLocked(ns types.Namespace, height uint64, index int) *types.Blob { + key := blobChunkKey{namespace: ns, chunkStart: s.chunkStart(height)} + for i := range s.blobBuf[key] { + b := &s.blobBuf[key][i] + if b.Height == height && b.Index == index { + cp := *b + return &cp + } + } + return nil +} + +func (s *S3Store) findHeaderInBuffer(height uint64) *types.Header { + s.mu.Lock() + defer s.mu.Unlock() + + cs := s.chunkStart(height) + for _, h := range s.headerBuf[cs] { + if h.Height == height { + cp := *h + return &cp + } + } + return nil +} + +// --- S3 helpers --- + +func (s *S3Store) getObject(ctx context.Context, key string) ([]byte, error) { + out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, err + } + defer out.Body.Close() //nolint:errcheck + return io.ReadAll(out.Body) +} + +func (s *S3Store) putObject(ctx context.Context, key string, data []byte) error { + _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ContentType: aws.String("application/json"), + }) + return err +} + +// isNotFound returns true if the error indicates an S3 NoSuchKey. +func isNotFound(err error) bool { + var nsk *s3types.NoSuchKey + if errors.As(err, &nsk) { + return true + } + // Some S3-compatible services return NotFound via the HTTP status code + // wrapped in a smithy-go OperationError rather than a typed NoSuchKey error. 
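+	// errors.As walks the wrapped chain, so a modeled NotFound inside the operation error is matched here too.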
+ var nsb *s3types.NotFound + return errors.As(err, &nsb) +} + +// --- Encoding/decoding --- + +func encodeS3Blobs(blobs []types.Blob) ([]byte, error) { + out := make([]s3Blob, len(blobs)) + for i, b := range blobs { + out[i] = s3Blob{ + Height: b.Height, + Namespace: b.Namespace.String(), + Data: b.Data, + Commitment: b.Commitment, + ShareVersion: b.ShareVersion, + Signer: b.Signer, + Index: b.Index, + } + } + return json.Marshal(out) +} + +func decodeS3Blobs(data []byte) ([]types.Blob, error) { + var raw []s3Blob + if err := json.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("decode blobs JSON: %w", err) + } + blobs := make([]types.Blob, len(raw)) + for i, r := range raw { + ns, err := types.NamespaceFromHex(r.Namespace) + if err != nil { + return nil, fmt.Errorf("decode blob namespace: %w", err) + } + blobs[i] = types.Blob{ + Height: r.Height, + Namespace: ns, + Data: r.Data, + Commitment: r.Commitment, + ShareVersion: r.ShareVersion, + Signer: r.Signer, + Index: r.Index, + } + } + return blobs, nil +} + +func encodeS3Headers(headers []types.Header) ([]byte, error) { + out := make([]s3Header, len(headers)) + for i, h := range headers { + out[i] = s3Header{ + Height: h.Height, + Hash: h.Hash, + DataHash: h.DataHash, + Time: h.Time, + RawHeader: h.RawHeader, + } + } + return json.Marshal(out) +} + +func decodeS3Headers(data []byte) ([]types.Header, error) { + var raw []s3Header + if err := json.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("decode headers JSON: %w", err) + } + headers := make([]types.Header, len(raw)) + for i, r := range raw { + headers[i] = types.Header{ + Height: r.Height, + Hash: r.Hash, + DataHash: r.DataHash, + Time: r.Time, + RawHeader: r.RawHeader, + } + } + return headers, nil +} + +// --- Deduplication --- + +func deduplicateBlobs(blobs []types.Blob) []types.Blob { + type blobKey struct { + height uint64 + namespace types.Namespace + index int + } + seen := make(map[blobKey]struct{}, len(blobs)) + out := make([]types.Blob, 0, len(blobs)) + for _, b := range blobs { + k := blobKey{height: b.Height, namespace: b.Namespace, index: b.Index} + if _, ok := seen[k]; ok { + continue + } + seen[k] = struct{}{} + out = append(out, b) + } + return out +} + +func deduplicateHeaders(headers []types.Header) []types.Header { + seen := make(map[uint64]struct{}, len(headers)) + out := make([]types.Header, 0, len(headers)) + for _, h := range headers { + if _, ok := seen[h.Height]; ok { + continue + } + seen[h.Height] = struct{}{} + out = append(out, h) + } + return out +} diff --git a/pkg/store/s3_test.go b/pkg/store/s3_test.go new file mode 100644 index 0000000..318cd9e --- /dev/null +++ b/pkg/store/s3_test.go @@ -0,0 +1,555 @@ +package store + +import ( + "bytes" + "context" + "encoding/hex" + "io" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + + "github.com/evstack/apex/pkg/types" +) + +// mockS3Client is an in-memory S3 client for testing. 
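+// It satisfies the s3Client interface, storing object bodies in a map keyed by object key.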
+type mockS3Client struct { + mu sync.RWMutex + objects map[string][]byte +} + +func newMockS3Client() *mockS3Client { + return &mockS3Client{objects: make(map[string][]byte)} +} + +func (m *mockS3Client) GetObject(_ context.Context, input *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + data, ok := m.objects[*input.Key] + if !ok { + return nil, &s3types.NoSuchKey{} + } + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + }, nil +} + +func (m *mockS3Client) PutObject(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + + data, err := io.ReadAll(input.Body) + if err != nil { + return nil, err + } + m.objects[*input.Key] = data + return &s3.PutObjectOutput{}, nil +} + +func (m *mockS3Client) objectCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.objects) +} + +func newTestS3Store(t *testing.T) (*S3Store, *mockS3Client) { + t.Helper() + mock := newMockS3Client() + store := newS3Store(mock, "test-bucket", "test", 4) // small chunk size for easier testing + return store, mock +} + +func TestS3Store_PutBlobsAndGetBlob(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + ns := testNamespace(1) + + blobs := []types.Blob{ + {Height: 1, Namespace: ns, Data: []byte("data1"), Commitment: []byte("c1"), Index: 0}, + {Height: 1, Namespace: ns, Data: []byte("data2"), Commitment: []byte("c2"), Index: 1}, + {Height: 2, Namespace: ns, Data: []byte("data3"), Commitment: []byte("c3"), Index: 0}, + } + + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + + // Before flush: read from buffer. + b, err := s.GetBlob(ctx, ns, 1, 0) + if err != nil { + t.Fatalf("GetBlob from buffer: %v", err) + } + if !bytes.Equal(b.Data, []byte("data1")) { + t.Errorf("got data %q, want %q", b.Data, "data1") + } + + // Flush via SetSyncState. + if err := s.SetSyncState(ctx, types.SyncStatus{State: types.Backfilling, LatestHeight: 2}); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + // After flush: read from S3. + b, err = s.GetBlob(ctx, ns, 2, 0) + if err != nil { + t.Fatalf("GetBlob from S3: %v", err) + } + if !bytes.Equal(b.Data, []byte("data3")) { + t.Errorf("got data %q, want %q", b.Data, "data3") + } + + // Not found. + _, err = s.GetBlob(ctx, ns, 99, 0) + if err != ErrNotFound { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +func TestS3Store_GetBlobs(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + ns := testNamespace(1) + + // Heights 0-7 span two chunks (0-3, 4-7) with chunkSize=4. + var blobs []types.Blob + for h := uint64(0); h < 8; h++ { + blobs = append(blobs, types.Blob{ + Height: h, + Namespace: ns, + Data: []byte{byte(h)}, + Commitment: []byte{byte(h)}, + Index: 0, + }) + } + + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 7}); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + got, err := s.GetBlobs(ctx, ns, 2, 6, 0, 0) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if len(got) != 5 { + t.Fatalf("got %d blobs, want 5", len(got)) + } + for i, b := range got { + wantH := uint64(2 + i) + if b.Height != wantH { + t.Errorf("blob[%d].Height = %d, want %d", i, b.Height, wantH) + } + } + + // With limit and offset. 
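+	// limit=2, offset=3: the offset skips heights 0-2, then the limit keeps heights 3 and 4.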
+ got, err = s.GetBlobs(ctx, ns, 0, 7, 2, 3) + if err != nil { + t.Fatalf("GetBlobs with limit/offset: %v", err) + } + if len(got) != 2 { + t.Fatalf("got %d blobs, want 2", len(got)) + } + if got[0].Height != 3 || got[1].Height != 4 { + t.Errorf("unexpected heights: %d, %d", got[0].Height, got[1].Height) + } +} + +func TestS3Store_GetBlobByCommitment(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + ns := testNamespace(1) + + commit := []byte("unique-commitment") + blobs := []types.Blob{ + {Height: 5, Namespace: ns, Data: []byte("target"), Commitment: commit, Index: 0}, + } + + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + + // From buffer (before flush). + b, err := s.GetBlobByCommitment(ctx, commit) + if err != nil { + t.Fatalf("GetBlobByCommitment from buffer: %v", err) + } + if !bytes.Equal(b.Data, []byte("target")) { + t.Errorf("got data %q, want %q", b.Data, "target") + } + + // Flush and re-read from S3. + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 5}); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + b, err = s.GetBlobByCommitment(ctx, commit) + if err != nil { + t.Fatalf("GetBlobByCommitment from S3: %v", err) + } + if !bytes.Equal(b.Data, []byte("target")) { + t.Errorf("got data %q, want %q", b.Data, "target") + } + + // Not found. + _, err = s.GetBlobByCommitment(ctx, []byte("nonexistent")) + if err != ErrNotFound { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +func TestS3Store_PutHeaderAndGetHeader(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + + now := time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC) + hdr := &types.Header{ + Height: 10, + Hash: []byte("hash10"), + DataHash: []byte("datahash10"), + Time: now, + RawHeader: []byte("raw10"), + } + + if err := s.PutHeader(ctx, hdr); err != nil { + t.Fatalf("PutHeader: %v", err) + } + + // Read from buffer. + got, err := s.GetHeader(ctx, 10) + if err != nil { + t.Fatalf("GetHeader from buffer: %v", err) + } + if !bytes.Equal(got.Hash, []byte("hash10")) { + t.Errorf("got hash %q, want %q", got.Hash, "hash10") + } + + // Flush and re-read. + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 10}); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + got, err = s.GetHeader(ctx, 10) + if err != nil { + t.Fatalf("GetHeader from S3: %v", err) + } + if !bytes.Equal(got.Hash, []byte("hash10")) { + t.Errorf("got hash %q, want %q", got.Hash, "hash10") + } + if !got.Time.Equal(now) { + t.Errorf("got time %v, want %v", got.Time, now) + } + + // Not found. + _, err = s.GetHeader(ctx, 999) + if err != ErrNotFound { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +func TestS3Store_SyncState(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + + // Initially not found. + _, err := s.GetSyncState(ctx) + if err != ErrNotFound { + t.Fatalf("expected ErrNotFound, got %v", err) + } + + status := types.SyncStatus{ + State: types.Backfilling, + LatestHeight: 100, + NetworkHeight: 200, + } + if err := s.SetSyncState(ctx, status); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + got, err := s.GetSyncState(ctx) + if err != nil { + t.Fatalf("GetSyncState: %v", err) + } + if got.State != types.Backfilling || got.LatestHeight != 100 || got.NetworkHeight != 200 { + t.Errorf("got %+v, want %+v", got, status) + } + + // Update. 
+ status.State = types.Streaming + status.LatestHeight = 200 + if err := s.SetSyncState(ctx, status); err != nil { + t.Fatalf("SetSyncState update: %v", err) + } + got, err = s.GetSyncState(ctx) + if err != nil { + t.Fatalf("GetSyncState after update: %v", err) + } + if got.State != types.Streaming || got.LatestHeight != 200 { + t.Errorf("got %+v, want state=streaming, latest=200", got) + } +} + +func TestS3Store_Namespaces(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + + ns1 := testNamespace(1) + ns2 := testNamespace(2) + + // Empty initially. + got, err := s.GetNamespaces(ctx) + if err != nil { + t.Fatalf("GetNamespaces empty: %v", err) + } + if len(got) != 0 { + t.Errorf("expected 0 namespaces, got %d", len(got)) + } + + // Add two namespaces. + if err := s.PutNamespace(ctx, ns1); err != nil { + t.Fatalf("PutNamespace ns1: %v", err) + } + if err := s.PutNamespace(ctx, ns2); err != nil { + t.Fatalf("PutNamespace ns2: %v", err) + } + + got, err = s.GetNamespaces(ctx) + if err != nil { + t.Fatalf("GetNamespaces: %v", err) + } + if len(got) != 2 { + t.Fatalf("expected 2 namespaces, got %d", len(got)) + } + + // Idempotent add. + if err := s.PutNamespace(ctx, ns1); err != nil { + t.Fatalf("PutNamespace idempotent: %v", err) + } + got, err = s.GetNamespaces(ctx) + if err != nil { + t.Fatalf("GetNamespaces after idempotent: %v", err) + } + if len(got) != 2 { + t.Errorf("expected 2 namespaces after idempotent add, got %d", len(got)) + } +} + +func TestS3Store_FlushOnClose(t *testing.T) { + ctx := context.Background() + s, mock := newTestS3Store(t) + ns := testNamespace(1) + + blobs := []types.Blob{ + {Height: 0, Namespace: ns, Data: []byte("close-data"), Commitment: []byte("cc"), Index: 0}, + } + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + hdr := &types.Header{Height: 0, Hash: []byte("h0")} + if err := s.PutHeader(ctx, hdr); err != nil { + t.Fatalf("PutHeader: %v", err) + } + + // No S3 objects yet (only in buffer). + if mock.objectCount() != 0 { + t.Fatalf("expected 0 S3 objects before close, got %d", mock.objectCount()) + } + + if err := s.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + // After close, objects should be flushed. + if mock.objectCount() == 0 { + t.Fatal("expected S3 objects after close, got 0") + } + + // Re-create store pointing to same mock to verify data persisted. + s2 := newS3Store(mock, "test-bucket", "test", 4) + b, err := s2.GetBlob(ctx, ns, 0, 0) + if err != nil { + t.Fatalf("GetBlob after close: %v", err) + } + if !bytes.Equal(b.Data, []byte("close-data")) { + t.Errorf("got data %q, want %q", b.Data, "close-data") + } +} + +func TestS3Store_IdempotentPut(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + ns := testNamespace(1) + + blob := types.Blob{Height: 3, Namespace: ns, Data: []byte("d"), Commitment: []byte("c"), Index: 0} + + // Put same blob twice. + if err := s.PutBlobs(ctx, []types.Blob{blob}); err != nil { + t.Fatalf("PutBlobs 1: %v", err) + } + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 3}); err != nil { + t.Fatalf("SetSyncState 1: %v", err) + } + + if err := s.PutBlobs(ctx, []types.Blob{blob}); err != nil { + t.Fatalf("PutBlobs 2: %v", err) + } + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 3}); err != nil { + t.Fatalf("SetSyncState 2: %v", err) + } + + // Should still return exactly one blob. 
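+	// Writing and flushing the same (height, namespace, commitment) twice
+	// must not duplicate the entry, so a range query covering only height 3
+	// is expected to return a single blob.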
+ got, err := s.GetBlobs(ctx, ns, 3, 3, 0, 0) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if len(got) != 1 { + t.Errorf("expected 1 blob after idempotent put, got %d", len(got)) + } +} + +func TestS3Store_ChunkBoundary(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) // chunkSize=4 + ns := testNamespace(1) + + // Height 3 -> chunk 0, Height 4 -> chunk 4. + blobs := []types.Blob{ + {Height: 3, Namespace: ns, Data: []byte("in-chunk-0"), Commitment: []byte("c3"), Index: 0}, + {Height: 4, Namespace: ns, Data: []byte("in-chunk-4"), Commitment: []byte("c4"), Index: 0}, + } + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 4}); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + b3, err := s.GetBlob(ctx, ns, 3, 0) + if err != nil { + t.Fatalf("GetBlob height=3: %v", err) + } + if !bytes.Equal(b3.Data, []byte("in-chunk-0")) { + t.Errorf("height=3 data %q, want %q", b3.Data, "in-chunk-0") + } + + b4, err := s.GetBlob(ctx, ns, 4, 0) + if err != nil { + t.Fatalf("GetBlob height=4: %v", err) + } + if !bytes.Equal(b4.Data, []byte("in-chunk-4")) { + t.Errorf("height=4 data %q, want %q", b4.Data, "in-chunk-4") + } +} + +func TestS3Store_MultipleNamespaces(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + ns1 := testNamespace(1) + ns2 := testNamespace(2) + + blobs := []types.Blob{ + {Height: 1, Namespace: ns1, Data: []byte("ns1"), Commitment: []byte("c1"), Index: 0}, + {Height: 1, Namespace: ns2, Data: []byte("ns2"), Commitment: []byte("c2"), Index: 0}, + } + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + if err := s.SetSyncState(ctx, types.SyncStatus{LatestHeight: 1}); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + got1, err := s.GetBlobs(ctx, ns1, 1, 1, 0, 0) + if err != nil { + t.Fatalf("GetBlobs ns1: %v", err) + } + if len(got1) != 1 || !bytes.Equal(got1[0].Data, []byte("ns1")) { + t.Errorf("ns1 blobs: got %d, want 1 with data 'ns1'", len(got1)) + } + + got2, err := s.GetBlobs(ctx, ns2, 1, 1, 0, 0) + if err != nil { + t.Fatalf("GetBlobs ns2: %v", err) + } + if len(got2) != 1 || !bytes.Equal(got2[0].Data, []byte("ns2")) { + t.Errorf("ns2 blobs: got %d, want 1 with data 'ns2'", len(got2)) + } +} + +func TestS3Store_BufferReadThrough(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + ns := testNamespace(1) + + // Write header, don't flush. + hdr := &types.Header{Height: 7, Hash: []byte("h7"), Time: time.Now().UTC().Truncate(time.Second)} + if err := s.PutHeader(ctx, hdr); err != nil { + t.Fatalf("PutHeader: %v", err) + } + + // Write blob, don't flush. + blob := types.Blob{Height: 7, Namespace: ns, Data: []byte("buf"), Commitment: []byte("cb"), Index: 0} + if err := s.PutBlobs(ctx, []types.Blob{blob}); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + + // Read should hit the buffer. + gotH, err := s.GetHeader(ctx, 7) + if err != nil { + t.Fatalf("GetHeader from buffer: %v", err) + } + if !bytes.Equal(gotH.Hash, []byte("h7")) { + t.Errorf("header hash %q, want %q", gotH.Hash, "h7") + } + + gotB, err := s.GetBlob(ctx, ns, 7, 0) + if err != nil { + t.Fatalf("GetBlob from buffer: %v", err) + } + if !bytes.Equal(gotB.Data, []byte("buf")) { + t.Errorf("blob data %q, want %q", gotB.Data, "buf") + } +} + +func TestS3Store_PrefixAndKeyFormat(t *testing.T) { + mock := newMockS3Client() + + // With prefix. 
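+	// With a non-empty prefix, keys nest under it, e.g.
+	//   myprefix/headers/chunk_0000000000000000.json
+	//   myprefix/index/commitments/<commitment-hex>.json
+	// chunkFileName zero-pads the chunk's starting height to 16 digits.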
+ s := newS3Store(mock, "b", "myprefix", 64) + key := s.key("headers", chunkFileName(0)) + want := "myprefix/headers/chunk_0000000000000000.json" + if key != want { + t.Errorf("key with prefix = %q, want %q", key, want) + } + + // Without prefix. + s2 := newS3Store(mock, "b", "", 64) + key2 := s2.key("headers", chunkFileName(64)) + want2 := "headers/chunk_0000000000000064.json" + if key2 != want2 { + t.Errorf("key without prefix = %q, want %q", key2, want2) + } + + // Commitment index key. + commitHex := hex.EncodeToString([]byte("test")) + key3 := s.key("index", "commitments", commitHex+".json") + want3 := "myprefix/index/commitments/" + commitHex + ".json" + if key3 != want3 { + t.Errorf("commitment key = %q, want %q", key3, want3) + } +} + +func TestS3Store_EmptyPutBlobs(t *testing.T) { + ctx := context.Background() + s, _ := newTestS3Store(t) + + if err := s.PutBlobs(ctx, nil); err != nil { + t.Fatalf("PutBlobs nil: %v", err) + } + if err := s.PutBlobs(ctx, []types.Blob{}); err != nil { + t.Fatalf("PutBlobs empty: %v", err) + } +} diff --git a/pkg/sync/backfill.go b/pkg/sync/backfill.go index ae445fc..bb8b6d6 100644 --- a/pkg/sync/backfill.go +++ b/pkg/sync/backfill.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "sync" + "sync/atomic" + "time" "github.com/rs/zerolog" "github.com/evstack/apex/pkg/fetch" + "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/store" "github.com/evstack/apex/pkg/types" ) @@ -19,32 +22,76 @@ type Backfiller struct { batchSize int concurrency int observer HeightObserver + metrics metrics.Recorder log zerolog.Logger } // Run backfills from fromHeight to toHeight (inclusive). func (b *Backfiller) Run(ctx context.Context, fromHeight, toHeight uint64) error { + if b.metrics == nil { + b.metrics = metrics.Nop() + } + namespaces, err := b.store.GetNamespaces(ctx) if err != nil { return fmt.Errorf("get namespaces: %w", err) } + totalHeights := toHeight - fromHeight + 1 + var processedHeights atomic.Uint64 + startTime := time.Now() + + // Periodic progress reporting. + stopProgress := make(chan struct{}) + progressStopped := make(chan struct{}) + go func() { + defer close(progressStopped) + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + done := processedHeights.Load() + elapsed := time.Since(startTime) + pct := float64(done) / float64(totalHeights) * 100 + rate := float64(done) / elapsed.Seconds() + remaining := time.Duration(float64(totalHeights-done)/rate) * time.Second + b.log.Info(). + Uint64("height", fromHeight+done). + Uint64("target", toHeight). + Str("progress", fmt.Sprintf("%.1f%%", pct)). + Str("rate", fmt.Sprintf("%.0f heights/s", rate)). + Str("eta", remaining.Truncate(time.Second).String()). + Msg("backfill progress") + case <-stopProgress: + return + } + } + }() + for batchStart := fromHeight; batchStart <= toHeight; batchStart += uint64(b.batchSize) { + batchStartTime := time.Now() batchEnd := batchStart + uint64(b.batchSize) - 1 if batchEnd > toHeight { batchEnd = toHeight } if err := b.processBatch(ctx, batchStart, batchEnd, namespaces); err != nil { + close(stopProgress) + <-progressStopped return err } + processedHeights.Store(batchEnd - fromHeight + 1) + // Checkpoint after each batch. 
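+		// SetSyncState records LatestHeight=batchEnd so a restart can resume
+		// from the last completed batch; for write-buffering backends such as
+		// the S3 store this checkpoint doubles as the flush boundary for
+		// pending blobs and headers.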
if err := b.store.SetSyncState(ctx, types.SyncStatus{ State: types.Backfilling, LatestHeight: batchEnd, NetworkHeight: toHeight, }); err != nil { + close(stopProgress) + <-progressStopped return fmt.Errorf("checkpoint at height %d: %w", batchEnd, err) } @@ -52,8 +99,20 @@ func (b *Backfiller) Run(ctx context.Context, fromHeight, toHeight uint64) error Uint64("batch_start", batchStart). Uint64("batch_end", batchEnd). Msg("batch complete") + b.metrics.ObserveBatchDuration(time.Since(batchStartTime)) } + // Stop progress goroutine and log final summary. + close(stopProgress) + <-progressStopped + elapsed := time.Since(startTime) + rate := float64(totalHeights) / elapsed.Seconds() + b.log.Info(). + Uint64("heights", totalHeights). + Str("elapsed", elapsed.Truncate(time.Second).String()). + Str("rate", fmt.Sprintf("%.0f heights/s", rate)). + Msg("backfill finished") + return nil } @@ -114,29 +173,46 @@ func (b *Backfiller) processBatch(ctx context.Context, from, to uint64, namespac } func (b *Backfiller) processHeight(ctx context.Context, height uint64, namespaces []types.Namespace) error { + stageStart := time.Now() hdr, err := b.fetcher.GetHeader(ctx, height) + b.metrics.ObserveBackfillStageDuration("fetch_header", time.Since(stageStart)) if err != nil { + b.metrics.IncBackfillStageErrors("fetch_header") return fmt.Errorf("get header: %w", err) } + + stageStart = time.Now() if err := b.store.PutHeader(ctx, hdr); err != nil { + b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) + b.metrics.IncBackfillStageErrors("store_header") return fmt.Errorf("put header: %w", err) } + b.metrics.ObserveBackfillStageDuration("store_header", time.Since(stageStart)) var blobs []types.Blob if len(namespaces) > 0 { + stageStart = time.Now() blobs, err = b.fetcher.GetBlobs(ctx, height, namespaces) + b.metrics.ObserveBackfillStageDuration("fetch_blobs", time.Since(stageStart)) if err != nil { + b.metrics.IncBackfillStageErrors("fetch_blobs") return fmt.Errorf("get blobs: %w", err) } if len(blobs) > 0 { + stageStart = time.Now() if err := b.store.PutBlobs(ctx, blobs); err != nil { + b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart)) + b.metrics.IncBackfillStageErrors("store_blobs") return fmt.Errorf("put blobs: %w", err) } + b.metrics.ObserveBackfillStageDuration("store_blobs", time.Since(stageStart)) } } if b.observer != nil { + stageStart = time.Now() b.observer(height, hdr, blobs) + b.metrics.ObserveBackfillStageDuration("observer", time.Since(stageStart)) } return nil diff --git a/pkg/sync/coordinator.go b/pkg/sync/coordinator.go index 6d0c9c1..254379b 100644 --- a/pkg/sync/coordinator.go +++ b/pkg/sync/coordinator.go @@ -165,6 +165,7 @@ func (c *Coordinator) Run(ctx context.Context) error { batchSize: c.batchSize, concurrency: c.concurrency, observer: wrappedObserver, + metrics: c.metrics, log: c.log.With().Str("component", "backfiller").Logger(), } if err := bf.Run(ctx, fromHeight, networkHeight); err != nil { diff --git a/pkg/types/types.go b/pkg/types/types.go index b901f91..62f67ae 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -3,6 +3,7 @@ package types import ( "encoding/hex" "fmt" + "strings" "time" ) @@ -12,9 +13,15 @@ const NamespaceSize = 29 // Namespace is a 29-byte Celestia namespace identifier. type Namespace [NamespaceSize]byte +// StripHexPrefix removes an optional "0x" or "0X" prefix from a hex string. 
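+// Hex-encoded namespaces and commitments are often supplied with a leading
+// 0x; callers strip it here before handing the string to hex.DecodeString.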
+func StripHexPrefix(s string) string { + return strings.TrimPrefix(strings.TrimPrefix(s, "0x"), "0X") +} + // NamespaceFromHex parses a hex-encoded namespace string. +// It accepts an optional "0x" or "0X" prefix. func NamespaceFromHex(s string) (Namespace, error) { - b, err := hex.DecodeString(s) + b, err := hex.DecodeString(StripHexPrefix(s)) if err != nil { return Namespace{}, fmt.Errorf("invalid hex: %w", err) } From 98bc280ed52911873bfe26c1bdc5a6b243c2cecf Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 19 Feb 2026 21:07:06 +0100 Subject: [PATCH 2/6] fix: address PR review comments - Document SetMetrics must-call-before-use contract - Use dedicated nsMu for PutNamespace to avoid holding buffer mutex during S3 network I/O - Add flushMu to serialize flush() calls, preventing concurrent read-modify-write races on S3 chunk objects - Pre-allocate merged slice in flushHeaderChunk (lint fix) - Guard division by zero in backfill progress when done==0 Co-Authored-By: Claude Opus 4.6 --- pkg/store/s3.go | 14 +++++++++++--- pkg/sync/backfill.go | 6 ++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/store/s3.go b/pkg/store/s3.go index 4f33660..951633c 100644 --- a/pkg/store/s3.go +++ b/pkg/store/s3.go @@ -86,6 +86,9 @@ type S3Store struct { blobBuf map[blobChunkKey][]types.Blob headerBuf map[uint64][]*types.Header commitBuf []commitEntry + + nsMu sync.Mutex // guards PutNamespace read-modify-write (separate from buffer mu) + flushMu sync.Mutex // serializes flush operations } // NewS3Store creates a new S3Store from the given config. @@ -163,6 +166,7 @@ func newS3Store(client s3Client, bucket, prefix string, chunkSize uint64) *S3Sto } // SetMetrics sets the metrics recorder. +// Must be called before any other method — not safe for concurrent use. func (s *S3Store) SetMetrics(m metrics.Recorder) { s.metrics = m } @@ -226,8 +230,8 @@ func (s *S3Store) PutHeader(ctx context.Context, header *types.Header) error { } func (s *S3Store) PutNamespace(ctx context.Context, ns types.Namespace) error { - s.mu.Lock() - defer s.mu.Unlock() + s.nsMu.Lock() + defer s.nsMu.Unlock() key := s.key("meta", "namespaces.json") @@ -537,7 +541,11 @@ func (s *S3Store) Close() error { // --- Flush --- // flush drains the write buffer to S3. Called at checkpoint boundaries. +// Serialized by flushMu to prevent concurrent read-modify-write races on S3 chunk objects. func (s *S3Store) flush(ctx context.Context) error { + s.flushMu.Lock() + defer s.flushMu.Unlock() + s.mu.Lock() blobBuf := s.blobBuf headerBuf := s.headerBuf @@ -649,7 +657,7 @@ func (s *S3Store) flushHeaderChunk(ctx context.Context, cs uint64, newHeaders [] return fmt.Errorf("read header chunk for merge: %w", err) } - var merged []types.Header + merged := make([]types.Header, 0, len(newHeaders)) if existing != nil { merged, err = decodeS3Headers(existing) if err != nil { diff --git a/pkg/sync/backfill.go b/pkg/sync/backfill.go index bb8b6d6..3fe1748 100644 --- a/pkg/sync/backfill.go +++ b/pkg/sync/backfill.go @@ -52,6 +52,12 @@ func (b *Backfiller) Run(ctx context.Context, fromHeight, toHeight uint64) error select { case <-ticker.C: done := processedHeights.Load() + if done == 0 { + b.log.Info(). + Uint64("target", toHeight). 
+ Msg("backfill progress: waiting for first batch") + continue + } elapsed := time.Since(startTime) pct := float64(done) / float64(totalHeights) * 100 rate := float64(done) / elapsed.Seconds() From 392b8b2ed6692b68e8638cf6beec911ea408750a Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 20 Feb 2026 11:10:06 +0100 Subject: [PATCH 3/6] Harden Celestia fetch retries and raise backfill defaults --- cmd/apex/main.go | 4 +- config/config.go | 14 +++-- config/load.go | 81 +++++++++++++++++++----- config/load_test.go | 6 ++ docs/running.md | 4 +- pkg/fetch/celestia_node.go | 106 +++++++++++++++++++++++++++++--- pkg/fetch/celestia_node_test.go | 50 +++++++++++++++ 7 files changed, 235 insertions(+), 30 deletions(-) diff --git a/cmd/apex/main.go b/cmd/apex/main.go index eea6c44..1460c25 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -238,7 +238,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { defer dataFetcher.Close() //nolint:errcheck // Set up API layer. - notifier := api.NewNotifier(cfg.Subscription.BufferSize, log.Logger) + notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger) notifier.SetMetrics(rec) svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger) @@ -266,6 +266,8 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { Addr: cfg.RPC.ListenAddr, Handler: mux, ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: time.Duration(cfg.RPC.ReadTimeout) * time.Second, + WriteTimeout: time.Duration(cfg.RPC.WriteTimeout) * time.Second, } go func() { diff --git a/config/config.go b/config/config.go index 5d5ec14..2864506 100644 --- a/config/config.go +++ b/config/config.go @@ -53,6 +53,8 @@ type S3Config struct { type RPCConfig struct { ListenAddr string `yaml:"listen_addr"` GRPCListenAddr string `yaml:"grpc_listen_addr"` + ReadTimeout int `yaml:"read_timeout"` // in seconds + WriteTimeout int `yaml:"write_timeout"` // in seconds } // SyncConfig configures the sync coordinator. @@ -64,7 +66,8 @@ type SyncConfig struct { // SubscriptionConfig configures API event subscriptions. type SubscriptionConfig struct { - BufferSize int `yaml:"buffer_size"` + BufferSize int `yaml:"buffer_size"` + MaxSubscribers int `yaml:"max_subscribers"` } // MetricsConfig configures Prometheus metrics. 
@@ -99,13 +102,16 @@ func DefaultConfig() Config { RPC: RPCConfig{ ListenAddr: ":8080", GRPCListenAddr: ":9090", + ReadTimeout: 30, + WriteTimeout: 30, }, Sync: SyncConfig{ - BatchSize: 64, - Concurrency: 4, + BatchSize: 256, + Concurrency: 12, }, Subscription: SubscriptionConfig{ - BufferSize: 64, + BufferSize: 64, + MaxSubscribers: 1024, }, Metrics: MetricsConfig{ Enabled: true, diff --git a/config/load.go b/config/load.go index 5211f8c..7f69124 100644 --- a/config/load.go +++ b/config/load.go @@ -64,18 +64,23 @@ rpc: listen_addr: ":8080" # Address for the gRPC API server grpc_listen_addr: ":9090" + # HTTP read/write timeouts in seconds + read_timeout: 30 + # write_timeout: 30 sync: # Height to start syncing from (0 = genesis) start_height: 0 # Number of headers per backfill batch - batch_size: 64 + batch_size: 256 # Number of concurrent fetch workers - concurrency: 4 + concurrency: 12 subscription: # Event buffer size per subscriber (for API subscriptions) buffer_size: 64 + # Maximum number of concurrent subscribers + max_subscribers: 1024 metrics: # Enable Prometheus metrics endpoint @@ -185,33 +190,77 @@ func validate(cfg *Config) error { if err := validateStorage(&cfg.Storage); err != nil { return err } - if cfg.RPC.ListenAddr == "" { + if err := validateRPC(&cfg.RPC); err != nil { + return err + } + if err := validateSync(&cfg.Sync); err != nil { + return err + } + if err := validateSubscription(&cfg.Subscription); err != nil { + return err + } + if err := validateMetrics(&cfg.Metrics); err != nil { + return err + } + if err := validateProfiling(&cfg.Profiling); err != nil { + return err + } + if !validLogLevels[cfg.Log.Level] { + return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) + } + if cfg.Log.Format != "json" && cfg.Log.Format != "console" { + return fmt.Errorf("log.format %q is invalid; must be json or console", cfg.Log.Format) + } + + return nil +} + +func validateRPC(rpc *RPCConfig) error { + if rpc.ListenAddr == "" { return fmt.Errorf("rpc.listen_addr is required") } - if cfg.RPC.GRPCListenAddr == "" { + if rpc.GRPCListenAddr == "" { return fmt.Errorf("rpc.grpc_listen_addr is required") } - if cfg.Sync.BatchSize <= 0 { + if rpc.ReadTimeout < 0 { + return fmt.Errorf("rpc.read_timeout must be non-negative") + } + if rpc.WriteTimeout < 0 { + return fmt.Errorf("rpc.write_timeout must be non-negative") + } + return nil +} + +func validateSync(sync *SyncConfig) error { + if sync.BatchSize <= 0 { return fmt.Errorf("sync.batch_size must be positive") } - if cfg.Sync.Concurrency <= 0 { + if sync.Concurrency <= 0 { return fmt.Errorf("sync.concurrency must be positive") } - if cfg.Subscription.BufferSize <= 0 { + return nil +} + +func validateSubscription(sub *SubscriptionConfig) error { + if sub.BufferSize <= 0 { return fmt.Errorf("subscription.buffer_size must be positive") } - if cfg.Metrics.Enabled && cfg.Metrics.ListenAddr == "" { + if sub.MaxSubscribers <= 0 { + return fmt.Errorf("subscription.max_subscribers must be positive") + } + return nil +} + +func validateMetrics(m *MetricsConfig) error { + if m.Enabled && m.ListenAddr == "" { return fmt.Errorf("metrics.listen_addr is required when metrics are enabled") } - if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" { + return nil +} + +func validateProfiling(p *ProfilingConfig) error { + if p.Enabled && p.ListenAddr == "" { return fmt.Errorf("profiling.listen_addr is required when profiling is enabled") } - if !validLogLevels[cfg.Log.Level] { - return 
fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) - } - if cfg.Log.Format != "json" && cfg.Log.Format != "console" { - return fmt.Errorf("log.format %q is invalid; must be json or console", cfg.Log.Format) - } - return nil } diff --git a/config/load_test.go b/config/load_test.go index 97ac9ee..e305275 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -17,6 +17,12 @@ func TestDefaultConfigProfilingDefaults(t *testing.T) { if cfg.Profiling.ListenAddr != "127.0.0.1:6061" { t.Fatalf("profiling.listen_addr default = %q, want %q", cfg.Profiling.ListenAddr, "127.0.0.1:6061") } + if cfg.Sync.BatchSize != 256 { + t.Fatalf("sync.batch_size default = %d, want %d", cfg.Sync.BatchSize, 256) + } + if cfg.Sync.Concurrency != 12 { + t.Fatalf("sync.concurrency default = %d, want %d", cfg.Sync.Concurrency, 12) + } } func TestLoadProfilingEnabledRequiresListenAddr(t *testing.T) { diff --git a/docs/running.md b/docs/running.md index c422aa0..1976f23 100644 --- a/docs/running.md +++ b/docs/running.md @@ -73,8 +73,8 @@ rpc: sync: start_height: 0 # 0 = genesis - batch_size: 64 # headers per backfill batch - concurrency: 4 # concurrent fetch workers + batch_size: 256 # headers per backfill batch + concurrency: 12 # concurrent fetch workers subscription: buffer_size: 64 # event buffer per subscriber diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index f7241f9..249eb69 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -4,7 +4,10 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "fmt" + "io" + "net" "net/http" "strconv" "strings" @@ -42,6 +45,12 @@ type CelestiaNodeFetcher struct { closed bool } +const ( + defaultRPCTimeout = 8 * time.Second + defaultRPCMaxRetries = 2 + defaultRPCRetryDelay = 100 * time.Millisecond +) + // NewCelestiaNodeFetcher connects to a Celestia node at the given WebSocket address. 
func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog.Logger) (*CelestiaNodeFetcher, error) { headers := http.Header{} @@ -67,7 +76,9 @@ func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog } func (f *CelestiaNodeFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { - raw, err := f.header.GetByHeight(ctx, height) + raw, err := f.callRawWithRetry(ctx, "header.GetByHeight", func(callCtx context.Context) (json.RawMessage, error) { + return f.header.GetByHeight(callCtx, height) + }) if err != nil { return nil, fmt.Errorf("header.GetByHeight(%d): %w", height, err) } @@ -76,7 +87,9 @@ func (f *CelestiaNodeFetcher) GetHeader(ctx context.Context, height uint64) (*ty func (f *CelestiaNodeFetcher) GetBlobs(ctx context.Context, height uint64, namespaces []types.Namespace) ([]types.Blob, error) { nsBytes := namespacesToBytes(namespaces) - raw, err := f.blob.GetAll(ctx, height, nsBytes) + raw, err := f.callRawWithRetry(ctx, "blob.GetAll", func(callCtx context.Context) (json.RawMessage, error) { + return f.blob.GetAll(callCtx, height, nsBytes) + }) if err != nil { if isNotFoundErr(err) { return nil, nil @@ -88,13 +101,50 @@ func (f *CelestiaNodeFetcher) GetBlobs(ctx context.Context, height uint64, names } func (f *CelestiaNodeFetcher) GetNetworkHead(ctx context.Context) (*types.Header, error) { - raw, err := f.header.NetworkHead(ctx) + raw, err := f.callRawWithRetry(ctx, "header.NetworkHead", func(callCtx context.Context) (json.RawMessage, error) { + return f.header.NetworkHead(callCtx) + }) if err != nil { return nil, fmt.Errorf("header.NetworkHead: %w", err) } return mapHeader(raw) } +func (f *CelestiaNodeFetcher) callRawWithRetry(ctx context.Context, op string, fn func(context.Context) (json.RawMessage, error)) (json.RawMessage, error) { + var err error + for attempt := range defaultRPCMaxRetries + 1 { + if ctxErr := ctx.Err(); ctxErr != nil { + return nil, ctxErr + } + + callCtx, cancel := context.WithTimeout(ctx, defaultRPCTimeout) + raw, callErr := fn(callCtx) + cancel() + if callErr == nil { + return raw, nil + } + err = callErr + + if isNotFoundErr(err) || !isTransientRPCError(err) || attempt == defaultRPCMaxRetries { + break + } + + delay := retryDelay(attempt) + f.log.Warn(). + Str("op", op). + Int("attempt", attempt+1). + Dur("retry_in", delay). + Err(err). 
+ Msg("transient rpc error; retrying") + select { + case <-time.After(delay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return nil, err +} + func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *types.Header, error) { rawCh, err := f.header.Subscribe(ctx) if err != nil { @@ -242,14 +292,25 @@ func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) { func namespacesToBytes(nss []types.Namespace) [][]byte { out := make([][]byte, len(nss)) - for i, ns := range nss { - b := make([]byte, types.NamespaceSize) - copy(b, ns[:]) - out[i] = b + for i := range nss { + out[i] = nss[i][:] } return out } +func retryDelay(attempt int) time.Duration { + base := defaultRPCRetryDelay * time.Duration(1< Date: Fri, 20 Feb 2026 11:10:49 +0100 Subject: [PATCH 4/6] fixes --- config.yaml | 51 +++++++++++++++++++++++++ pkg/api/grpc/blob_service.go | 5 ++- pkg/api/grpc/header_service.go | 5 ++- pkg/api/grpc/server_test.go | 14 +++---- pkg/api/health_test.go | 4 +- pkg/api/jsonrpc/blob.go | 5 ++- pkg/api/jsonrpc/header.go | 5 ++- pkg/api/jsonrpc/server_test.go | 12 +++--- pkg/api/notifier.go | 45 ++++++++++++++-------- pkg/api/notifier_test.go | 69 ++++++++++++++++++++++++++-------- pkg/api/service.go | 4 +- pkg/api/service_test.go | 18 ++++----- 12 files changed, 176 insertions(+), 61 deletions(-) create mode 100644 config.yaml diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..c5e00d7 --- /dev/null +++ b/config.yaml @@ -0,0 +1,51 @@ +# Apex configuration +# Generated by: apex init + +data_source: + # Data source type: "node" (Celestia DA node) or "app" (celestia-app CometBFT RPC) + type: "node" + + # Celestia DA node RPC endpoint (required when type: "node") + celestia_node_url: "http://100.97.172.78:36668" + + # Celestia-app CometBFT RPC endpoint (required when type: "app") + # celestia_app_url: "http://localhost:26657" + + # Auth token: set via APEX_AUTH_TOKEN env var (not read from this file). + + # Namespaces to index (hex-encoded, 29 bytes = 58 hex chars each). 
+ namespaces: [0x000000000000000000000000000000000000005d2e074163aa3b4d9818,0x0000000000000000000000000000000000000037586d889dd7a6a44ca8] + +storage: + # Path to the SQLite database file + db_path: "apex.db" + +rpc: + # Address for the JSON-RPC API server (HTTP/WebSocket) + listen_addr: ":8080" + # Address for the gRPC API server + grpc_listen_addr: ":9090" + +sync: + # Height to start syncing from (0 = genesis) + start_height: 7970386 + # Number of headers per backfill batch + batch_size: 256 + # Number of concurrent fetch workers + concurrency: 12 + +subscription: + # Event buffer size per subscriber (for API subscriptions) + buffer_size: 64 + +metrics: + # Enable Prometheus metrics endpoint + enabled: true + # Address for the metrics server + listen_addr: ":9091" + +log: + # Log level: trace, debug, info, warn, error, fatal, panic + level: "info" + # Log format: json or console + format: "json" diff --git a/pkg/api/grpc/blob_service.go b/pkg/api/grpc/blob_service.go index b12edef..0988c17 100644 --- a/pkg/api/grpc/blob_service.go +++ b/pkg/api/grpc/blob_service.go @@ -108,7 +108,10 @@ func (s *BlobServiceServer) Subscribe(req *pb.BlobServiceSubscribeRequest, strea return status.Errorf(codes.InvalidArgument, "invalid namespace: %v", err) } - sub := s.svc.Notifier().Subscribe([]types.Namespace{ns}) + sub, err := s.svc.Notifier().Subscribe([]types.Namespace{ns}) + if err != nil { + return status.Errorf(codes.ResourceExhausted, "subscribe: %v", err) + } defer s.svc.Notifier().Unsubscribe(sub) ctx := stream.Context() diff --git a/pkg/api/grpc/header_service.go b/pkg/api/grpc/header_service.go index f741aad..e1ef40d 100644 --- a/pkg/api/grpc/header_service.go +++ b/pkg/api/grpc/header_service.go @@ -59,7 +59,10 @@ func (s *HeaderServiceServer) NetworkHead(ctx context.Context, _ *pb.NetworkHead } func (s *HeaderServiceServer) Subscribe(_ *pb.HeaderServiceSubscribeRequest, stream grpc.ServerStreamingServer[pb.HeaderServiceSubscribeResponse]) error { - sub := s.svc.HeaderSubscribe() + sub, err := s.svc.HeaderSubscribe() + if err != nil { + return status.Errorf(codes.ResourceExhausted, "subscribe: %v", err) + } defer s.svc.Notifier().Unsubscribe(sub) ctx := stream.Context() diff --git a/pkg/api/grpc/server_test.go b/pkg/api/grpc/server_test.go index a32e722..52c09e6 100644 --- a/pkg/api/grpc/server_test.go +++ b/pkg/api/grpc/server_test.go @@ -183,7 +183,7 @@ func TestGRPCBlobGet(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: commitment, Index: 0}, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) client := startTestServer(t, svc) @@ -212,7 +212,7 @@ func TestGRPCBlobGetAll(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d2"), Commitment: []byte("c2"), Index: 1}, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) client := startTestServer(t, svc) @@ -237,7 +237,7 @@ func TestGRPCBlobGetByCommitment(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: commitment, Index: 0}, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) client := startTestServer(t, svc) @@ -266,7 +266,7 @@ func TestGRPCHeaderGetByHeight(t *testing.T) { RawHeader: []byte("raw"), } - notifier 
:= api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) client := startTestHeaderServer(t, svc) @@ -291,7 +291,7 @@ func TestGRPCHeaderLocalHead(t *testing.T) { RawHeader: []byte("raw"), } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) client := startTestHeaderServer(t, svc) @@ -313,7 +313,7 @@ func TestGRPCHeaderNetworkHead(t *testing.T) { }, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(newMockStore(), ft, nil, notifier, zerolog.Nop()) client := startTestHeaderServer(t, svc) @@ -330,7 +330,7 @@ func TestGRPCBlobSubscribe(t *testing.T) { st := newMockStore() ns := testNamespace(1) - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) client := startTestServer(t, svc) diff --git a/pkg/api/health_test.go b/pkg/api/health_test.go index db52a19..4678bdb 100644 --- a/pkg/api/health_test.go +++ b/pkg/api/health_test.go @@ -53,7 +53,7 @@ func TestHealthEndpoint(t *testing.T) { LatestHeight: 100, NetworkHeight: 105, }} - notifier := NewNotifier(64, zerolog.Nop()) + notifier := NewNotifier(64, 1024, zerolog.Nop()) h := NewHealthHandler(sp, newMockStore(), notifier, "test") mux := http.NewServeMux() @@ -87,7 +87,7 @@ func TestReadyEndpoint(t *testing.T) { LatestHeight: 100, NetworkHeight: 100, }} - notifier := NewNotifier(64, zerolog.Nop()) + notifier := NewNotifier(64, 1024, zerolog.Nop()) h := NewHealthHandler(sp, newMockStore(), notifier, "test") mux := http.NewServeMux() diff --git a/pkg/api/jsonrpc/blob.go b/pkg/api/jsonrpc/blob.go index ba1bf00..79357c1 100644 --- a/pkg/api/jsonrpc/blob.go +++ b/pkg/api/jsonrpc/blob.go @@ -49,7 +49,10 @@ func (h *BlobHandler) Subscribe(ctx context.Context, namespace []byte) (<-chan j return nil, err } - sub := h.svc.BlobSubscribe(ns) + sub, err := h.svc.BlobSubscribe(ns) + if err != nil { + return nil, err + } out := make(chan json.RawMessage, cap(sub.Events())) go func() { diff --git a/pkg/api/jsonrpc/header.go b/pkg/api/jsonrpc/header.go index 34eef96..42423e0 100644 --- a/pkg/api/jsonrpc/header.go +++ b/pkg/api/jsonrpc/header.go @@ -30,7 +30,10 @@ func (h *HeaderHandler) NetworkHead(ctx context.Context) (json.RawMessage, error // Subscribe returns a channel of header events. // Only available over WebSocket. 
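+// An error is returned if the notifier's maximum subscriber limit has been reached.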
func (h *HeaderHandler) Subscribe(ctx context.Context) (<-chan json.RawMessage, error) { - sub := h.svc.HeaderSubscribe() + sub, err := h.svc.HeaderSubscribe() + if err != nil { + return nil, err + } out := make(chan json.RawMessage, cap(sub.Events())) go func() { diff --git a/pkg/api/jsonrpc/server_test.go b/pkg/api/jsonrpc/server_test.go index a5f644e..388b02e 100644 --- a/pkg/api/jsonrpc/server_test.go +++ b/pkg/api/jsonrpc/server_test.go @@ -194,7 +194,7 @@ func TestJSONRPCHeaderGetByHeight(t *testing.T) { RawHeader: []byte(`{"height":"42"}`), } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) srv := NewServer(svc, zerolog.Nop()) @@ -217,7 +217,7 @@ func TestJSONRPCHeaderLocalHead(t *testing.T) { RawHeader: []byte(`{"height":"100"}`), } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) srv := NewServer(svc, zerolog.Nop()) @@ -238,7 +238,7 @@ func TestJSONRPCHeaderNetworkHead(t *testing.T) { }, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(newMockStore(), ft, nil, notifier, zerolog.Nop()) srv := NewServer(svc, zerolog.Nop()) @@ -258,7 +258,7 @@ func TestJSONRPCBlobGetAll(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: []byte("c1"), Index: 0}, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) srv := NewServer(svc, zerolog.Nop()) @@ -283,7 +283,7 @@ func TestJSONRPCBlobGetByCommitment(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: []byte("c1"), Index: 0}, } - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop()) srv := NewServer(svc, zerolog.Nop()) @@ -311,7 +311,7 @@ func TestJSONRPCBlobGetByCommitment(t *testing.T) { } func TestJSONRPCStubMethods(t *testing.T) { - notifier := api.NewNotifier(16, zerolog.Nop()) + notifier := api.NewNotifier(16, 1024, zerolog.Nop()) svc := api.NewService(newMockStore(), &mockFetcher{}, nil, notifier, zerolog.Nop()) srv := NewServer(svc, zerolog.Nop()) diff --git a/pkg/api/notifier.go b/pkg/api/notifier.go index e0ec924..e9f4f91 100644 --- a/pkg/api/notifier.go +++ b/pkg/api/notifier.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "sync" "sync/atomic" @@ -32,24 +33,30 @@ func (s *Subscription) Events() <-chan HeightEvent { // Notifier fans out height events to subscribed API clients. type Notifier struct { - mu sync.RWMutex - subscribers map[uint64]*Subscription - nextID atomic.Uint64 - bufferSize int - metrics metrics.Recorder - log zerolog.Logger + mu sync.RWMutex + subscribers map[uint64]*Subscription + nextID atomic.Uint64 + bufferSize int + maxSubscribers int + metrics metrics.Recorder + log zerolog.Logger } -// NewNotifier creates a Notifier with the given per-subscriber buffer size. -func NewNotifier(bufferSize int, log zerolog.Logger) *Notifier { +// NewNotifier creates a Notifier with the given per-subscriber buffer size +// and maximum number of concurrent subscribers. 
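+// Non-positive values fall back to defaults of 64 and 1024 respectively.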
+func NewNotifier(bufferSize, maxSubscribers int, log zerolog.Logger) *Notifier { if bufferSize <= 0 { bufferSize = 64 } + if maxSubscribers <= 0 { + maxSubscribers = 1024 + } return &Notifier{ - subscribers: make(map[uint64]*Subscription), - bufferSize: bufferSize, - metrics: metrics.Nop(), - log: log.With().Str("component", "notifier").Logger(), + subscribers: make(map[uint64]*Subscription), + bufferSize: bufferSize, + maxSubscribers: maxSubscribers, + metrics: metrics.Nop(), + log: log.With().Str("component", "notifier").Logger(), } } @@ -60,7 +67,15 @@ func (n *Notifier) SetMetrics(m metrics.Recorder) { // Subscribe creates a new subscription. If namespaces is empty, all blobs are // delivered. The returned Subscription must be cleaned up via Unsubscribe. -func (n *Notifier) Subscribe(namespaces []types.Namespace) *Subscription { +// Returns an error if the maximum number of subscribers has been reached. +func (n *Notifier) Subscribe(namespaces []types.Namespace) (*Subscription, error) { + n.mu.Lock() + defer n.mu.Unlock() + + if len(n.subscribers) >= n.maxSubscribers { + return nil, fmt.Errorf("maximum subscribers reached (%d)", n.maxSubscribers) + } + id := n.nextID.Add(1) nsSet := make(map[types.Namespace]struct{}, len(namespaces)) for _, ns := range namespaces { @@ -73,13 +88,11 @@ func (n *Notifier) Subscribe(namespaces []types.Namespace) *Subscription { namespaces: nsSet, } - n.mu.Lock() n.subscribers[id] = sub - n.mu.Unlock() n.log.Debug().Uint64("sub_id", id).Int("namespaces", len(namespaces)).Msg("new subscription") n.metrics.SetActiveSubscriptions(len(n.subscribers)) - return sub + return sub, nil } // Unsubscribe removes a subscription and closes its channel. diff --git a/pkg/api/notifier_test.go b/pkg/api/notifier_test.go index 2574139..78a1f3a 100644 --- a/pkg/api/notifier_test.go +++ b/pkg/api/notifier_test.go @@ -33,8 +33,11 @@ func makeEvent(height uint64, namespaces ...types.Namespace) HeightEvent { } func TestNotifierSubscribePublish(t *testing.T) { - n := NewNotifier(16, zerolog.Nop()) - sub := n.Subscribe(nil) // all namespaces + n := NewNotifier(16, 1024, zerolog.Nop()) + sub, err := n.Subscribe(nil) // all namespaces + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } defer n.Unsubscribe(sub) event := makeEvent(1, testNamespace(1), testNamespace(2)) @@ -54,11 +57,14 @@ func TestNotifierSubscribePublish(t *testing.T) { } func TestNotifierNamespaceFilter(t *testing.T) { - n := NewNotifier(16, zerolog.Nop()) + n := NewNotifier(16, 1024, zerolog.Nop()) ns1 := testNamespace(1) ns2 := testNamespace(2) - sub := n.Subscribe([]types.Namespace{ns1}) + sub, err := n.Subscribe([]types.Namespace{ns1}) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } defer n.Unsubscribe(sub) event := makeEvent(1, ns1, ns2) @@ -78,9 +84,12 @@ func TestNotifierNamespaceFilter(t *testing.T) { } func TestNotifierMultipleSubscribers(t *testing.T) { - n := NewNotifier(16, zerolog.Nop()) - sub1 := n.Subscribe(nil) - sub2 := n.Subscribe(nil) + n := NewNotifier(16, 1024, zerolog.Nop()) + sub1, err1 := n.Subscribe(nil) + sub2, err2 := n.Subscribe(nil) + if err1 != nil || err2 != nil { + t.Fatalf("Subscribe failed: %v, %v", err1, err2) + } defer n.Unsubscribe(sub1) defer n.Unsubscribe(sub2) @@ -99,8 +108,11 @@ func TestNotifierMultipleSubscribers(t *testing.T) { } func TestNotifierBufferOverflow(t *testing.T) { - n := NewNotifier(2, zerolog.Nop()) - sub := n.Subscribe(nil) + n := NewNotifier(2, 1024, zerolog.Nop()) + sub, err := n.Subscribe(nil) + if err != nil { + 
t.Fatalf("Subscribe failed: %v", err) + } defer n.Unsubscribe(sub) // Fill buffer. @@ -126,8 +138,11 @@ func TestNotifierBufferOverflow(t *testing.T) { } func TestNotifierUnsubscribe(t *testing.T) { - n := NewNotifier(16, zerolog.Nop()) - sub := n.Subscribe(nil) + n := NewNotifier(16, 1024, zerolog.Nop()) + sub, err := n.Subscribe(nil) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } n.Unsubscribe(sub) @@ -144,8 +159,11 @@ func TestNotifierUnsubscribe(t *testing.T) { func TestNotifierContiguityTracking(t *testing.T) { // Verify that after a buffer overflow, lastHeight is reset. // Next delivery should succeed without panic. - n := NewNotifier(1, zerolog.Nop()) - sub := n.Subscribe(nil) + n := NewNotifier(1, 1024, zerolog.Nop()) + sub, err := n.Subscribe(nil) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } defer n.Unsubscribe(sub) n.Publish(makeEvent(1)) @@ -167,8 +185,11 @@ func TestNotifierContiguityTracking(t *testing.T) { } func TestNotifierEmptyNamespaceSetDeliversAll(t *testing.T) { - n := NewNotifier(16, zerolog.Nop()) - sub := n.Subscribe([]types.Namespace{}) // explicit empty slice + n := NewNotifier(16, 1024, zerolog.Nop()) + sub, err := n.Subscribe([]types.Namespace{}) // explicit empty slice + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } defer n.Unsubscribe(sub) ns1 := testNamespace(1) @@ -184,3 +205,21 @@ func TestNotifierEmptyNamespaceSetDeliversAll(t *testing.T) { t.Fatal("timed out") } } + +func TestNotifierMaxSubscribers(t *testing.T) { + n := NewNotifier(1, 2, zerolog.Nop()) + + _, err1 := n.Subscribe(nil) + _, err2 := n.Subscribe(nil) + _, err3 := n.Subscribe(nil) + + if err1 != nil { + t.Errorf("first subscribe failed: %v", err1) + } + if err2 != nil { + t.Errorf("second subscribe failed: %v", err2) + } + if err3 == nil { + t.Error("third subscribe should have failed") + } +} diff --git a/pkg/api/service.go b/pkg/api/service.go index 76b2dbd..5cc4324 100644 --- a/pkg/api/service.go +++ b/pkg/api/service.go @@ -124,7 +124,7 @@ func (s *Service) BlobIncluded(ctx context.Context, height uint64, namespace []b } // BlobSubscribe creates a subscription for blobs in the given namespace. -func (s *Service) BlobSubscribe(namespace types.Namespace) *Subscription { +func (s *Service) BlobSubscribe(namespace types.Namespace) (*Subscription, error) { return s.notifier.Subscribe([]types.Namespace{namespace}) } @@ -160,7 +160,7 @@ func (s *Service) HeaderNetworkHead(ctx context.Context) (json.RawMessage, error } // HeaderSubscribe creates a subscription for all new headers. 
-func (s *Service) HeaderSubscribe() *Subscription { +func (s *Service) HeaderSubscribe() (*Subscription, error) { return s.notifier.Subscribe(nil) } diff --git a/pkg/api/service_test.go b/pkg/api/service_test.go index 60297e6..9ae9d08 100644 --- a/pkg/api/service_test.go +++ b/pkg/api/service_test.go @@ -135,7 +135,7 @@ func TestServiceBlobGet(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d2"), Commitment: []byte("c2"), Index: 1}, } - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := svc.BlobGet(context.Background(), 10, ns, commitment) if err != nil { @@ -171,7 +171,7 @@ func TestServiceBlobGetByCommitment(t *testing.T) { {Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: []byte("c1"), Index: 0}, } } - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := svc.BlobGetByCommitment(context.Background(), tt.commit) if tt.wantErr { @@ -198,7 +198,7 @@ func TestServiceBlobGetByCommitment(t *testing.T) { func TestServiceBlobGetNotFound(t *testing.T) { st := newMockStore() ns := testNamespace(1) - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) _, err := svc.BlobGet(context.Background(), 10, ns, []byte("missing")) if !errors.Is(err, store.ErrNotFound) { @@ -216,7 +216,7 @@ func TestServiceBlobGetAll(t *testing.T) { {Height: 10, Namespace: ns2, Data: []byte("d2"), Commitment: []byte("c2"), Index: 0}, } - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := svc.BlobGetAll(context.Background(), 10, []types.Namespace{ns1, ns2}, 0, 0) if err != nil { @@ -234,7 +234,7 @@ func TestServiceBlobGetAll(t *testing.T) { func TestServiceBlobGetAllEmpty(t *testing.T) { st := newMockStore() - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := svc.BlobGetAll(context.Background(), 10, []types.Namespace{testNamespace(1)}, 0, 0) if err != nil { @@ -252,7 +252,7 @@ func TestServiceHeaderGetByHeight(t *testing.T) { RawHeader: []byte(`{"height":"42"}`), } - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := svc.HeaderGetByHeight(context.Background(), 42) if err != nil { @@ -271,7 +271,7 @@ func TestServiceHeaderLocalHead(t *testing.T) { RawHeader: []byte(`{"height":"100"}`), } - svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(st, &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := svc.HeaderLocalHead(context.Background()) if err != nil { @@ -290,7 +290,7 @@ func TestServiceHeaderNetworkHead(t *testing.T) { }, } - svc := NewService(newMockStore(), ft, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(newMockStore(), ft, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) raw, err := 
svc.HeaderNetworkHead(context.Background()) if err != nil { @@ -302,7 +302,7 @@ func TestServiceHeaderNetworkHead(t *testing.T) { } func TestServiceProofForwardingUnavailable(t *testing.T) { - svc := NewService(newMockStore(), &mockFetcher{}, nil, NewNotifier(16, zerolog.Nop()), zerolog.Nop()) + svc := NewService(newMockStore(), &mockFetcher{}, nil, NewNotifier(16, 1024, zerolog.Nop()), zerolog.Nop()) _, err := svc.BlobGetProof(context.Background(), 1, nil, nil) if err == nil { From a09f24b5b8edf4a70916b131a6534191ff4972ec Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 20 Feb 2026 11:13:32 +0100 Subject: [PATCH 5/6] revert config push --- config.yaml | 51 --------------------------------------------------- 1 file changed, 51 deletions(-) delete mode 100644 config.yaml diff --git a/config.yaml b/config.yaml deleted file mode 100644 index c5e00d7..0000000 --- a/config.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# Apex configuration -# Generated by: apex init - -data_source: - # Data source type: "node" (Celestia DA node) or "app" (celestia-app CometBFT RPC) - type: "node" - - # Celestia DA node RPC endpoint (required when type: "node") - celestia_node_url: "http://100.97.172.78:36668" - - # Celestia-app CometBFT RPC endpoint (required when type: "app") - # celestia_app_url: "http://localhost:26657" - - # Auth token: set via APEX_AUTH_TOKEN env var (not read from this file). - - # Namespaces to index (hex-encoded, 29 bytes = 58 hex chars each). - namespaces: [0x000000000000000000000000000000000000005d2e074163aa3b4d9818,0x0000000000000000000000000000000000000037586d889dd7a6a44ca8] - -storage: - # Path to the SQLite database file - db_path: "apex.db" - -rpc: - # Address for the JSON-RPC API server (HTTP/WebSocket) - listen_addr: ":8080" - # Address for the gRPC API server - grpc_listen_addr: ":9090" - -sync: - # Height to start syncing from (0 = genesis) - start_height: 7970386 - # Number of headers per backfill batch - batch_size: 256 - # Number of concurrent fetch workers - concurrency: 12 - -subscription: - # Event buffer size per subscriber (for API subscriptions) - buffer_size: 64 - -metrics: - # Enable Prometheus metrics endpoint - enabled: true - # Address for the metrics server - listen_addr: ":9091" - -log: - # Log level: trace, debug, info, warn, error, fatal, panic - level: "info" - # Log format: json or console - format: "json" From ea4b03ebd2d2df768e65a5615a69a3ec575db02a Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 20 Feb 2026 13:20:52 +0100 Subject: [PATCH 6/6] amendments --- config/load.go | 5 +---- config/load_test.go | 11 ++++++++++- docs/running.md | 3 +++ pkg/api/notifier_test.go | 10 ++++++---- pkg/fetch/celestia_node.go | 14 ++++++++------ 5 files changed, 28 insertions(+), 15 deletions(-) diff --git a/config/load.go b/config/load.go index 0a44edc..626fbab 100644 --- a/config/load.go +++ b/config/load.go @@ -66,7 +66,7 @@ rpc: grpc_listen_addr: ":9090" # HTTP read/write timeouts in seconds read_timeout: 30 - # write_timeout: 30 + write_timeout: 30 sync: # Height to start syncing from (0 = genesis) @@ -205,9 +205,6 @@ func validate(cfg *Config) error { if err := validateProfiling(&cfg.Profiling); err != nil { return err } - if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" { - return fmt.Errorf("profiling.listen_addr is required when profiling is enabled") - } if !validLogLevels[cfg.Log.Level] { return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) } diff --git a/config/load_test.go 
b/config/load_test.go index e305275..68a858e 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -7,7 +7,7 @@ import ( "testing" ) -func TestDefaultConfigProfilingDefaults(t *testing.T) { +func TestDefaultConfigDefaults(t *testing.T) { t.Parallel() cfg := DefaultConfig() @@ -23,6 +23,15 @@ func TestDefaultConfigProfilingDefaults(t *testing.T) { if cfg.Sync.Concurrency != 12 { t.Fatalf("sync.concurrency default = %d, want %d", cfg.Sync.Concurrency, 12) } + if cfg.RPC.ReadTimeout != 30 { + t.Fatalf("rpc.read_timeout default = %d, want 30", cfg.RPC.ReadTimeout) + } + if cfg.RPC.WriteTimeout != 30 { + t.Fatalf("rpc.write_timeout default = %d, want 30", cfg.RPC.WriteTimeout) + } + if cfg.Subscription.MaxSubscribers != 1024 { + t.Fatalf("subscription.max_subscribers default = %d, want 1024", cfg.Subscription.MaxSubscribers) + } } func TestLoadProfilingEnabledRequiresListenAddr(t *testing.T) { diff --git a/docs/running.md b/docs/running.md index 1976f23..4b53620 100644 --- a/docs/running.md +++ b/docs/running.md @@ -70,6 +70,8 @@ storage: rpc: listen_addr: ":8080" # JSON-RPC + health grpc_listen_addr: ":9090" # gRPC + read_timeout: 30 # HTTP read timeout in seconds + write_timeout: 30 # HTTP write timeout in seconds sync: start_height: 0 # 0 = genesis @@ -78,6 +80,7 @@ sync: subscription: buffer_size: 64 # event buffer per subscriber + max_subscribers: 1024 # maximum concurrent subscribers metrics: enabled: true diff --git a/pkg/api/notifier_test.go b/pkg/api/notifier_test.go index 78a1f3a..fddece8 100644 --- a/pkg/api/notifier_test.go +++ b/pkg/api/notifier_test.go @@ -209,16 +209,18 @@ func TestNotifierEmptyNamespaceSetDeliversAll(t *testing.T) { func TestNotifierMaxSubscribers(t *testing.T) { n := NewNotifier(1, 2, zerolog.Nop()) - _, err1 := n.Subscribe(nil) - _, err2 := n.Subscribe(nil) + sub1, err1 := n.Subscribe(nil) + sub2, err2 := n.Subscribe(nil) _, err3 := n.Subscribe(nil) if err1 != nil { - t.Errorf("first subscribe failed: %v", err1) + t.Fatalf("first subscribe failed: %v", err1) } + defer n.Unsubscribe(sub1) if err2 != nil { - t.Errorf("second subscribe failed: %v", err2) + t.Fatalf("second subscribe failed: %v", err2) } + defer n.Unsubscribe(sub2) if err3 == nil { t.Error("third subscribe should have failed") } diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index 249eb69..8275f67 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math/rand/v2" "net" "net/http" "strconv" @@ -181,7 +182,9 @@ func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *typ // GetProof forwards a blob proof request to the upstream Celestia node. func (f *CelestiaNodeFetcher) GetProof(ctx context.Context, height uint64, namespace, commitment []byte) (json.RawMessage, error) { - raw, err := f.blob.GetProof(ctx, height, namespace, commitment) + raw, err := f.callRawWithRetry(ctx, "blob.GetProof", func(callCtx context.Context) (json.RawMessage, error) { + return f.blob.GetProof(callCtx, height, namespace, commitment) + }) if err != nil { return nil, fmt.Errorf("blob.GetProof(%d): %w", height, err) } @@ -189,6 +192,9 @@ func (f *CelestiaNodeFetcher) GetProof(ctx context.Context, height uint64, names } // Included forwards a blob inclusion check to the upstream Celestia node. +// Does not use callRawWithRetry because the ProofForwarder interface returns +// (bool, error) rather than (json.RawMessage, error). 
Proof inclusion checks +// are user-initiated point queries, so retries are left to the caller. func (f *CelestiaNodeFetcher) Included(ctx context.Context, height uint64, namespace []byte, proof json.RawMessage, commitment []byte) (bool, error) { ok, err := f.blob.Included(ctx, height, namespace, proof, commitment) if err != nil { @@ -304,11 +310,7 @@ func retryDelay(attempt int) time.Duration { if jitterCap <= 0 { return base } - n := time.Now().UnixNano() - if n < 0 { - n = -n - } - return base + time.Duration(n%int64(jitterCap)) + return base + time.Duration(rand.Int64N(int64(jitterCap))) } func isNotFoundErr(err error) bool {