diff --git a/CLAUDE.md b/CLAUDE.md index 99214eb..7764eb3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,5 +1,7 @@ # Apex — Celestia Namespace Indexer +Lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via JSON-RPC, gRPC, and REST health endpoints. Includes Prometheus observability, a CLI client, and multi-stage Docker build. + ## Build Commands ```bash @@ -15,26 +17,97 @@ just tidy # go mod tidy ## Architecture -Apex is a lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via an HTTP API. +### Data Flow ``` -cmd/apex/ CLI entrypoint (cobra) -config/ YAML config loading and validation -pkg/types/ Domain types (Namespace, Blob, Header, SyncState) -pkg/store/ Storage interface (SQLite impl in Phase 1) -pkg/fetch/ Data fetcher interface (Celestia node client in Phase 1) -pkg/sync/ Sync coordinator (backfill + streaming) -pkg/api/ HTTP API server (Phase 2) +Celestia Node → Fetcher → Sync Coordinator → Store (SQLite) + → Notifier → Subscribers + ↕ + API (JSON-RPC + gRPC + Health) +``` + +The sync coordinator runs in two phases: **backfill** (historical blocks in batches) then **streaming** (live via header subscription). Height observers publish events to the notifier which fans out to API subscribers. 
+ +### File Structure + ``` +cmd/apex/ + main.go CLI entrypoint, server wiring, graceful shutdown + client.go Thin HTTP JSON-RPC client for CLI commands + status.go `apex status` command (health endpoint) + blob_cmd.go `apex blob get|list` commands + config_cmd.go `apex config validate|show` commands + +config/ + config.go Config structs (DataSource, Storage, RPC, Sync, Metrics, Log) + load.go YAML loading, validation, env var override, template generation + +pkg/types/ + types.go Domain types: Namespace, Blob, Header, SyncState, SyncStatus + +pkg/store/ + store.go Store interface (PutBlobs, GetBlobs, PutHeader, GetHeader, sync state) + sqlite.go SQLite implementation with metrics instrumentation + migrations/ SQL migration files + +pkg/fetch/ + fetcher.go DataFetcher + ProofForwarder interfaces + celestia_node.go Celestia node-api client (headers, blobs, subscriptions, proofs) + +pkg/sync/ + coordinator.go Sync lifecycle: initialize → backfill → stream, tracks heights + backfill.go Concurrent batch backfill with configurable batch size/concurrency + subscription.go Header subscription manager for live streaming + +pkg/api/ + service.go API service layer (blob/header queries, proof forwarding, subscriptions) + notifier.go Event fan-out to subscribers with bounded buffers + health.go /health and /health/ready HTTP endpoints, HealthStatus JSON + jsonrpc/ JSON-RPC server (go-jsonrpc), blob/header/subscription handlers + grpc/ gRPC server, protobuf service implementations + gen/apex/v1/ Generated protobuf Go code + +pkg/metrics/ + metrics.go Recorder interface (nil-safe), nopRecorder, PromRecorder (Prometheus) + server.go HTTP server for /metrics endpoint + +proto/apex/v1/ Protobuf definitions (blob, header, types) + +Dockerfile Multi-stage build (golang builder + distroless runtime) +``` + +### Key Interfaces + +- **`store.Store`** — persistence (SQLite impl, instrumented with metrics) +- **`fetch.DataFetcher`** — block data retrieval (Celestia node client) +- 
**`fetch.ProofForwarder`** — proof/inclusion forwarding to upstream node +- **`metrics.Recorder`** — nil-safe metrics abstraction (Prometheus or no-op) +- **`api.StatusProvider`** — sync status for health endpoints (implemented by coordinator) + +### Ports (defaults) + +| Port | Protocol | Purpose | +|-------|----------|------------------| +| :8080 | HTTP | JSON-RPC + health| +| :9090 | TCP | gRPC | +| :9091 | HTTP | Prometheus /metrics | + +### Config + +YAML with strict unknown-field rejection. Auth token via `APEX_AUTH_TOKEN` env var only (not in config file). See `config/config.go` for all fields and `DefaultConfig()` for defaults. ## Conventions -- Go 1.23 minimum (slog, range-over-func available) +- Go 1.25+ (`go.mod` specifies 1.25.0) - SQLite via `modernc.org/sqlite` (CGo-free) - Config: YAML (`gopkg.in/yaml.v3`), strict unknown-field rejection - Logging: `rs/zerolog` - CLI: `spf13/cobra` -- Linter: golangci-lint v2 (.golangci.yml v2 format) +- Metrics: `prometheus/client_golang` behind nil-safe `Recorder` interface +- JSON-RPC: `filecoin-project/go-jsonrpc` +- gRPC: `google.golang.org/grpc` + `google.golang.org/protobuf` +- Protobuf codegen: `buf` (`buf.yaml` + `buf.gen.yaml`) +- Linter: golangci-lint v2 (.golangci.yml v2 format), gocyclo max 15 - Formatter: gofumpt - Build runner: just (justfile) diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..984d93b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM golang:1.25-bookworm AS builder + +WORKDIR /src + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . 
+ +RUN CGO_ENABLED=0 go build -trimpath \ + -ldflags="-s -w -X main.version=$(git describe --tags --always --dirty 2>/dev/null || echo docker)" \ + -o /apex ./cmd/apex + +FROM gcr.io/distroless/static-debian12:nonroot + +COPY --from=builder /apex /apex + +USER 65532:65532 + +EXPOSE 8080 9090 9091 + +ENTRYPOINT ["/apex", "start"] diff --git a/cmd/apex/blob_cmd.go b/cmd/apex/blob_cmd.go new file mode 100644 index 0000000..943c1c4 --- /dev/null +++ b/cmd/apex/blob_cmd.go @@ -0,0 +1,106 @@ +package main + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "os" + "strconv" + + "github.com/spf13/cobra" +) + +func blobCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "blob", + Short: "Query blobs from the indexer", + } + cmd.AddCommand(blobGetCmd()) + cmd.AddCommand(blobListCmd()) + return cmd +} + +func blobGetCmd() *cobra.Command { + return &cobra.Command{ + Use: "get ", + Short: "Get a single blob by height, namespace, and commitment", + Args: cobra.ExactArgs(3), + RunE: func(cmd *cobra.Command, args []string) error { + addr, _ := cmd.Flags().GetString("rpc-addr") + + height, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return fmt.Errorf("invalid height: %w", err) + } + + ns, err := hex.DecodeString(args[1]) + if err != nil { + return fmt.Errorf("invalid namespace hex: %w", err) + } + + commitment, err := hex.DecodeString(args[2]) + if err != nil { + return fmt.Errorf("invalid commitment hex: %w", err) + } + + client := newRPCClient(addr) + result, err := client.call(cmd.Context(), "blob.Get", height, ns, commitment) + if err != nil { + return err + } + + return printJSON(cmd, result) + }, + } +} + +func blobListCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "list ", + Short: "List all blobs at a given height", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + addr, _ := cmd.Flags().GetString("rpc-addr") + nsHex, _ := cmd.Flags().GetString("namespace") + + height, err := strconv.ParseUint(args[0], 
10, 64) + if err != nil { + return fmt.Errorf("invalid height: %w", err) + } + + var namespaces [][]byte + if nsHex != "" { + ns, err := hex.DecodeString(nsHex) + if err != nil { + return fmt.Errorf("invalid namespace hex: %w", err) + } + namespaces = [][]byte{ns} + } + + client := newRPCClient(addr) + result, err := client.call(cmd.Context(), "blob.GetAll", height, namespaces) + if err != nil { + return err + } + + return printJSON(cmd, result) + }, + } + + cmd.Flags().String("namespace", "", "filter by namespace (hex-encoded)") + return cmd +} + +func printJSON(_ *cobra.Command, raw json.RawMessage) error { + return prettyPrintJSON(raw) +} + +func prettyPrintJSON(raw json.RawMessage) error { + var out any + if err := json.Unmarshal(raw, &out); err != nil { + return err + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(out) +} diff --git a/cmd/apex/client.go b/cmd/apex/client.go new file mode 100644 index 0000000..e35b7b0 --- /dev/null +++ b/cmd/apex/client.go @@ -0,0 +1,108 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// rpcClient is a thin JSON-RPC client over HTTP for CLI commands. 
type rpcClient struct {
	// url is the base URL of the indexer's HTTP endpoint ("http://" + addr).
	url string
	// client is reused for every request; its Timeout bounds each call.
	client *http.Client
}

// newRPCClient returns a client that talks plain HTTP to the host:port given
// in addr. Every request made through it is limited to 10 seconds overall.
func newRPCClient(addr string) *rpcClient {
	return &rpcClient{
		url: "http://" + addr,
		client: &http.Client{
			Timeout: 10 * time.Second,
		},
	}
}

// jsonRPCRequest is the JSON-RPC 2.0 request envelope sent by call.
type jsonRPCRequest struct {
	Jsonrpc string `json:"jsonrpc"`
	Method  string `json:"method"`
	Params  []any  `json:"params"`
	ID      int    `json:"id"`
}

// jsonRPCResponse is the part of the JSON-RPC 2.0 response envelope that call
// inspects. Result is kept raw so callers decide how to decode or print it.
type jsonRPCResponse struct {
	Result json.RawMessage `json:"result"`
	Error  *jsonRPCError   `json:"error,omitempty"`
}

// jsonRPCError is the error object carried by a failed JSON-RPC response.
type jsonRPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// maxErrorBodySnippet caps how many bytes of an unexpected HTTP response body
// are quoted in an error message.
const maxErrorBodySnippet = 256

// call performs a single JSON-RPC 2.0 request with positional params and
// returns the raw "result" member of the response.
//
// Errors are reported in order of specificity:
//   - a JSON-RPC error object in the reply ("rpc error <code>: <message>"),
//     regardless of the HTTP status it arrived with;
//   - a non-2xx HTTP status, quoting a short snippet of the body, so that
//     proxy error pages and plain-text 404/5xx replies are not misreported
//     as JSON decoding failures or as empty successful results;
//   - a 2xx reply whose body is not valid JSON ("unmarshal response: ...").
//
// Transport, marshalling and body-read failures are wrapped with the step
// that failed.
func (c *rpcClient) call(ctx context.Context, method string, params ...any) (json.RawMessage, error) {
	// A nil slice would marshal as "params": null; send an empty array instead.
	if params == nil {
		params = []any{}
	}

	body, err := json.Marshal(jsonRPCRequest{
		Jsonrpc: "2.0",
		Method:  method,
		Params:  params,
		ID:      1,
	})
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("send request: %w", err)
	}
	defer resp.Body.Close() //nolint:errcheck

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read response: %w", err)
	}

	var rpcResp jsonRPCResponse
	decodeErr := json.Unmarshal(respBody, &rpcResp)

	// A well-formed JSON-RPC error is the most precise diagnosis available,
	// so it wins over the HTTP status it was delivered with.
	if decodeErr == nil && rpcResp.Error != nil {
		return nil, fmt.Errorf("rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message)
	}

	// Anything outside 2xx without a JSON-RPC error is not a usable reply.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, httpStatusError(resp.Status, respBody)
	}

	if decodeErr != nil {
		return nil, fmt.Errorf("unmarshal response: %w", decodeErr)
	}

	return rpcResp.Result, nil
}

// httpStatusError builds the error for a reply with an unexpected HTTP
// status, quoting at most maxErrorBodySnippet bytes of the trimmed body.
func httpStatusError(status string, body []byte) error {
	snippet := bytes.TrimSpace(body)
	if len(snippet) == 0 {
		return fmt.Errorf("unexpected HTTP status %s", status)
	}
	if len(snippet) > maxErrorBodySnippet {
		snippet = snippet[:maxErrorBodySnippet]
	}
	return fmt.Errorf("unexpected HTTP status %s: %q", status, snippet)
}

// fetchHealth fetches the health endpoint directly over HTTP.
+func (c *rpcClient) fetchHealth(ctx context.Context) (json.RawMessage, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url+"/health", nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() //nolint:errcheck + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + return body, nil +} diff --git a/cmd/apex/config_cmd.go b/cmd/apex/config_cmd.go new file mode 100644 index 0000000..8980fb9 --- /dev/null +++ b/cmd/apex/config_cmd.go @@ -0,0 +1,64 @@ +package main + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + "github.com/evstack/apex/config" +) + +func configCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "config", + Short: "Config management commands", + } + cmd.AddCommand(configValidateCmd()) + cmd.AddCommand(configShowCmd()) + return cmd +} + +func configValidateCmd() *cobra.Command { + return &cobra.Command{ + Use: "validate", + Short: "Validate the config file", + RunE: func(cmd *cobra.Command, _ []string) error { + cfgPath, err := configPath(cmd) + if err != nil { + return err + } + _, err = config.Load(cfgPath) + if err != nil { + return fmt.Errorf("config invalid: %w", err) + } + fmt.Println("config is valid") + return nil + }, + } +} + +func configShowCmd() *cobra.Command { + return &cobra.Command{ + Use: "show", + Short: "Show the effective config", + RunE: func(cmd *cobra.Command, _ []string) error { + cfgPath, err := configPath(cmd) + if err != nil { + return err + } + cfg, err := config.Load(cfgPath) + if err != nil { + return fmt.Errorf("loading config: %w", err) + } + + enc := yaml.NewEncoder(os.Stdout) + enc.SetIndent(2) + if err := enc.Encode(cfg); err != nil { + return fmt.Errorf("encoding config: %w", err) + } + return enc.Close() + }, + } +} diff --git a/cmd/apex/main.go 
b/cmd/apex/main.go index 7f0397e..bf6c217 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -14,12 +14,14 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + "google.golang.org/grpc" "github.com/evstack/apex/config" "github.com/evstack/apex/pkg/api" grpcapi "github.com/evstack/apex/pkg/api/grpc" jsonrpcapi "github.com/evstack/apex/pkg/api/jsonrpc" "github.com/evstack/apex/pkg/fetch" + "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/store" syncer "github.com/evstack/apex/pkg/sync" "github.com/evstack/apex/pkg/types" @@ -43,9 +45,15 @@ func rootCmd() *cobra.Command { root.PersistentFlags().String("config", "config.yaml", "path to config file") + root.PersistentFlags().String("rpc-addr", "localhost:8080", "JSON-RPC server address") + root.PersistentFlags().String("format", "json", "output format (json, table)") + root.AddCommand(versionCmd()) root.AddCommand(initCmd()) root.AddCommand(startCmd()) + root.AddCommand(statusCmd()) + root.AddCommand(blobCmd()) + root.AddCommand(configCmd()) return root } @@ -124,6 +132,20 @@ func setupLogger(cfg config.LogConfig) { } } +func setupMetrics(cfg *config.Config) (metrics.Recorder, *metrics.Server) { + if !cfg.Metrics.Enabled { + return metrics.Nop(), nil + } + rec := metrics.NewPromRecorder(nil, version) + srv := metrics.NewServer(cfg.Metrics.ListenAddr, log.Logger) + go func() { + if err := srv.Start(); err != nil { + log.Error().Err(err).Msg("metrics server error") + } + }() + return rec, srv +} + func runIndexer(ctx context.Context, cfg *config.Config) error { // Parse namespaces from config. namespaces, err := cfg.ParsedNamespaces() @@ -131,12 +153,15 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { return fmt.Errorf("parse namespaces: %w", err) } + rec, metricsSrv := setupMetrics(cfg) + // Open store. 
db, err := store.Open(cfg.Storage.DBPath) if err != nil { return fmt.Errorf("open store: %w", err) } defer db.Close() //nolint:errcheck + db.SetMetrics(rec) // Persist configured namespaces. ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) @@ -157,6 +182,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { // Set up API layer. notifier := api.NewNotifier(cfg.Subscription.BufferSize, log.Logger) + notifier.SetMetrics(rec) svc := api.NewService(db, fetcher, fetcher, notifier, log.Logger) // Build and run the sync coordinator with observer hook. @@ -165,16 +191,23 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { syncer.WithBatchSize(cfg.Sync.BatchSize), syncer.WithConcurrency(cfg.Sync.Concurrency), syncer.WithLogger(log.Logger), + syncer.WithMetrics(rec), syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) { notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs}) }), ) - // Start JSON-RPC server. + // Build HTTP mux: mount health endpoints alongside JSON-RPC. rpcServer := jsonrpcapi.NewServer(svc, log.Logger) + healthHandler := api.NewHealthHandler(coord, db, notifier, version) + + mux := http.NewServeMux() + healthHandler.Register(mux) + mux.Handle("/", rpcServer) + httpSrv := &http.Server{ Addr: cfg.RPC.ListenAddr, - Handler: rpcServer, + Handler: mux, ReadHeaderTimeout: 10 * time.Second, } @@ -207,7 +240,17 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { err = coord.Run(ctx) - // Graceful shutdown. 
+ gracefulShutdown(httpSrv, grpcSrv, metricsSrv) + + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("coordinator: %w", err) + } + + log.Info().Msg("apex indexer stopped") + return nil +} + +func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server) { stopped := make(chan struct{}) go func() { grpcSrv.GracefulStop() @@ -224,14 +267,13 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() - if shutdownErr := httpSrv.Shutdown(shutdownCtx); shutdownErr != nil { - log.Error().Err(shutdownErr).Msg("JSON-RPC server shutdown error") + if err := httpSrv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("JSON-RPC server shutdown error") } - if err != nil && !errors.Is(err, context.Canceled) { - return fmt.Errorf("coordinator: %w", err) + if metricsSrv != nil { + if err := metricsSrv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("metrics server shutdown error") + } } - - log.Info().Msg("apex indexer stopped") - return nil } diff --git a/cmd/apex/status.go b/cmd/apex/status.go new file mode 100644 index 0000000..6d93faf --- /dev/null +++ b/cmd/apex/status.go @@ -0,0 +1,69 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "text/tabwriter" + + "github.com/spf13/cobra" + + "github.com/evstack/apex/pkg/api" +) + +func statusCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "status", + Short: "Show indexer sync status", + RunE: func(cmd *cobra.Command, _ []string) error { + addr, _ := cmd.Flags().GetString("rpc-addr") + format, _ := cmd.Flags().GetString("format") + + client := newRPCClient(addr) + raw, err := client.fetchHealth(cmd.Context()) + if err != nil { + return fmt.Errorf("fetch status: %w", err) + } + + if format == "table" { + var hs api.HealthStatus + if err := json.Unmarshal(raw, &hs); err != nil { + return fmt.Errorf("decode status: %w", 
err) + } + return printStatusTable(&hs) + } + + // JSON output (pretty-printed). + var out any + if err := json.Unmarshal(raw, &out); err != nil { + return fmt.Errorf("decode status: %w", err) + } + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(out) + }, + } + return cmd +} + +func printStatusTable(hs *api.HealthStatus) error { + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + for _, row := range []struct { + label string + value any + }{ + {"Healthy", hs.Healthy}, + {"Sync State", hs.SyncState}, + {"Latest Height", hs.LatestHeight}, + {"Network Height", hs.NetworkHeight}, + {"Sync Lag", hs.SyncLag}, + {"Uptime", hs.Uptime}, + {"Version", hs.Version}, + {"Subscribers", hs.Subscribers}, + } { + if _, err := fmt.Fprintf(w, "%s:\t%v\n", row.label, row.value); err != nil { + return err + } + } + return w.Flush() +} diff --git a/config/config.go b/config/config.go index 60fe95b..22a9f20 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type Config struct { RPC RPCConfig `yaml:"rpc"` Sync SyncConfig `yaml:"sync"` Subscription SubscriptionConfig `yaml:"subscription"` + Metrics MetricsConfig `yaml:"metrics"` Log LogConfig `yaml:"log"` } @@ -46,6 +47,12 @@ type SubscriptionConfig struct { BufferSize int `yaml:"buffer_size"` } +// MetricsConfig configures Prometheus metrics. +type MetricsConfig struct { + Enabled bool `yaml:"enabled"` + ListenAddr string `yaml:"listen_addr"` +} + // LogConfig configures logging. 
type LogConfig struct { Level string `yaml:"level"` @@ -72,6 +79,10 @@ func DefaultConfig() Config { Subscription: SubscriptionConfig{ BufferSize: 64, }, + Metrics: MetricsConfig{ + Enabled: true, + ListenAddr: ":9091", + }, Log: LogConfig{ Level: "info", Format: "json", diff --git a/config/load.go b/config/load.go index efe8958..9f9d827 100644 --- a/config/load.go +++ b/config/load.go @@ -60,6 +60,12 @@ subscription: # Event buffer size per subscriber (for API subscriptions) buffer_size: 64 +metrics: + # Enable Prometheus metrics endpoint + enabled: true + # Address for the metrics server + listen_addr: ":9091" + log: # Log level: trace, debug, info, warn, error, fatal, panic level: "info" @@ -122,6 +128,9 @@ func validate(cfg *Config) error { if cfg.Subscription.BufferSize <= 0 { return fmt.Errorf("subscription.buffer_size must be positive") } + if cfg.Metrics.Enabled && cfg.Metrics.ListenAddr == "" { + return fmt.Errorf("metrics.listen_addr is required when metrics are enabled") + } if !validLogLevels[cfg.Log.Level] { return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) } diff --git a/go.mod b/go.mod index 7ddbecc..554401a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.0 require ( github.com/filecoin-project/go-jsonrpc v0.10.1 + github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.2 google.golang.org/grpc v1.79.1 @@ -13,29 +14,35 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-log/v2 v2.0.8 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect 
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/spf13/pflag v1.0.9 // indirect go.opencensus.io v0.22.3 // indirect go.uber.org/atomic v1.6.0 // indirect go.uber.org/multierr v1.5.0 // indirect go.uber.org/zap v1.14.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.32.0 // indirect golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect modernc.org/libc v1.67.6 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/go.sum b/go.sum index 09dfec8..f30bace 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= 
@@ -44,33 +46,45 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf github.com/ipfs/go-log/v2 v2.0.8 h1:3b3YNopMHlj4AvyhWAx0pDxqSQWYi4/WuWO7yRV6/Qg= github.com/ipfs/go-log/v2 v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.9.0 
h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= @@ -82,8 +96,8 @@ github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= @@ -100,12 +114,16 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod 
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -179,7 +197,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/api/health.go b/pkg/api/health.go new file mode 100644 index 0000000..0f4e61e --- /dev/null +++ b/pkg/api/health.go @@ -0,0 +1,106 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/evstack/apex/pkg/store" + 
"github.com/evstack/apex/pkg/types" +) + +// StatusProvider returns the current sync status. +type StatusProvider interface { + Status() types.SyncStatus +} + +// HealthStatus is the JSON response for health endpoints. +type HealthStatus struct { + Healthy bool `json:"healthy"` + SyncState string `json:"sync_state"` + LatestHeight uint64 `json:"latest_height"` + NetworkHeight uint64 `json:"network_height"` + SyncLag uint64 `json:"sync_lag"` + Uptime string `json:"uptime"` + Version string `json:"version"` + Subscribers int `json:"subscribers"` +} + +// HealthHandler serves /health and /health/ready endpoints. +type HealthHandler struct { + status StatusProvider + store store.Store + notifier *Notifier + version string + startTime time.Time +} + +// NewHealthHandler creates a HealthHandler. +func NewHealthHandler(sp StatusProvider, s store.Store, n *Notifier, version string) *HealthHandler { + return &HealthHandler{ + status: sp, + store: s, + notifier: n, + version: version, + startTime: time.Now(), + } +} + +// Register mounts the health endpoints on the given mux. 
+func (h *HealthHandler) Register(mux *http.ServeMux) { + mux.HandleFunc("/health", h.handleHealth) + mux.HandleFunc("/health/ready", h.handleReady) +} + +func (h *HealthHandler) buildStatus() HealthStatus { + ss := h.status.Status() + + var lag uint64 + if ss.NetworkHeight > ss.LatestHeight { + lag = ss.NetworkHeight - ss.LatestHeight + } + + healthy := ss.State == types.Streaming || ss.State == types.Backfilling + + return HealthStatus{ + Healthy: healthy, + SyncState: ss.State.String(), + LatestHeight: ss.LatestHeight, + NetworkHeight: ss.NetworkHeight, + SyncLag: lag, + Uptime: time.Since(h.startTime).Truncate(time.Second).String(), + Version: h.version, + Subscribers: h.notifier.SubscriberCount(), + } +} + +func (h *HealthHandler) handleHealth(w http.ResponseWriter, _ *http.Request) { + hs := h.buildStatus() + + w.Header().Set("Content-Type", "application/json") + if !hs.Healthy { + w.WriteHeader(http.StatusServiceUnavailable) + } + json.NewEncoder(w).Encode(hs) //nolint:errcheck +} + +func (h *HealthHandler) handleReady(w http.ResponseWriter, r *http.Request) { + hs := h.buildStatus() + + // Additional readiness check: store must be accessible. 
+ ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second) + defer cancel() + _, err := h.store.GetSyncState(ctx) + storeOK := err == nil || errors.Is(err, store.ErrNotFound) + + ready := hs.Healthy && storeOK + + w.Header().Set("Content-Type", "application/json") + if !ready { + hs.Healthy = false + w.WriteHeader(http.StatusServiceUnavailable) + } + json.NewEncoder(w).Encode(hs) //nolint:errcheck +} diff --git a/pkg/api/health_test.go b/pkg/api/health_test.go new file mode 100644 index 0000000..db52a19 --- /dev/null +++ b/pkg/api/health_test.go @@ -0,0 +1,103 @@ +package api + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/rs/zerolog" + + "github.com/evstack/apex/pkg/types" +) + +type mockStatusProvider struct { + status types.SyncStatus +} + +func (m *mockStatusProvider) Status() types.SyncStatus { + return m.status +} + +func TestHealthEndpoint(t *testing.T) { + tests := []struct { + name string + state types.SyncState + wantCode int + wantHealth bool + }{ + { + name: "streaming is healthy", + state: types.Streaming, + wantCode: http.StatusOK, + wantHealth: true, + }, + { + name: "backfilling is healthy", + state: types.Backfilling, + wantCode: http.StatusOK, + wantHealth: true, + }, + { + name: "initializing is unhealthy", + state: types.Initializing, + wantCode: http.StatusServiceUnavailable, + wantHealth: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sp := &mockStatusProvider{status: types.SyncStatus{ + State: tt.state, + LatestHeight: 100, + NetworkHeight: 105, + }} + notifier := NewNotifier(64, zerolog.Nop()) + h := NewHealthHandler(sp, newMockStore(), notifier, "test") + + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/health", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != tt.wantCode { + t.Errorf("status code = %d, want %d", rec.Code, tt.wantCode) + } + + var hs HealthStatus + if err 
:= json.NewDecoder(rec.Body).Decode(&hs); err != nil { + t.Fatalf("decode response: %v", err) + } + if hs.Healthy != tt.wantHealth { + t.Errorf("healthy = %v, want %v", hs.Healthy, tt.wantHealth) + } + if hs.SyncLag != 5 { + t.Errorf("sync_lag = %d, want 5", hs.SyncLag) + } + }) + } +} + +func TestReadyEndpoint(t *testing.T) { + sp := &mockStatusProvider{status: types.SyncStatus{ + State: types.Streaming, + LatestHeight: 100, + NetworkHeight: 100, + }} + notifier := NewNotifier(64, zerolog.Nop()) + h := NewHealthHandler(sp, newMockStore(), notifier, "test") + + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/health/ready", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status code = %d, want 200", rec.Code) + } +} diff --git a/pkg/api/notifier.go b/pkg/api/notifier.go index a8b5e23..e0ec924 100644 --- a/pkg/api/notifier.go +++ b/pkg/api/notifier.go @@ -6,6 +6,7 @@ import ( "github.com/rs/zerolog" + "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/types" ) @@ -35,6 +36,7 @@ type Notifier struct { subscribers map[uint64]*Subscription nextID atomic.Uint64 bufferSize int + metrics metrics.Recorder log zerolog.Logger } @@ -46,10 +48,16 @@ func NewNotifier(bufferSize int, log zerolog.Logger) *Notifier { return &Notifier{ subscribers: make(map[uint64]*Subscription), bufferSize: bufferSize, + metrics: metrics.Nop(), log: log.With().Str("component", "notifier").Logger(), } } +// SetMetrics sets the metrics recorder for the notifier. +func (n *Notifier) SetMetrics(m metrics.Recorder) { + n.metrics = m +} + // Subscribe creates a new subscription. If namespaces is empty, all blobs are // delivered. The returned Subscription must be cleaned up via Unsubscribe. 
func (n *Notifier) Subscribe(namespaces []types.Namespace) *Subscription { @@ -70,6 +78,7 @@ func (n *Notifier) Subscribe(namespaces []types.Namespace) *Subscription { n.mu.Unlock() n.log.Debug().Uint64("sub_id", id).Int("namespaces", len(namespaces)).Msg("new subscription") + n.metrics.SetActiveSubscriptions(len(n.subscribers)) return sub } @@ -80,6 +89,7 @@ func (n *Notifier) Unsubscribe(sub *Subscription) { delete(n.subscribers, sub.id) close(sub.ch) } + n.metrics.SetActiveSubscriptions(len(n.subscribers)) n.mu.Unlock() } @@ -129,6 +139,7 @@ func (n *Notifier) Publish(event HeightEvent) { Uint64("sub_id", sub.id). Uint64("height", event.Height). Msg("subscriber buffer full, event dropped") + n.metrics.IncEventsDropped() // Reset lastHeight so next delivery triggers a gap warning. sub.lastHeight = 0 } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..eb682e4 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,222 @@ +package metrics + +import ( + "runtime" + "sync/atomic" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Recorder defines the metrics interface for all Apex components. +// Implementations must be safe for concurrent use. A nil Recorder is +// valid — callers should use the package-level Nop() helper. +type Recorder interface { + // Sync metrics + SetSyncState(state string) + SetLatestHeight(h uint64) + SetNetworkHeight(h uint64) + IncBlobsProcessed(n int) + IncHeadersProcessed(n int) + ObserveBatchDuration(d time.Duration) + + // API metrics + IncAPIRequest(method, status string) + ObserveAPIRequestDuration(method string, d time.Duration) + + // Store metrics + ObserveStoreQueryDuration(op string, d time.Duration) + + // Subscription metrics + SetActiveSubscriptions(n int) + IncEventsDropped() +} + +// nopRecorder is a no-op implementation of Recorder. 
+type nopRecorder struct{} + +// Nop returns a Recorder that discards all metrics. +func Nop() Recorder { return nopRecorder{} } + +func (nopRecorder) SetSyncState(string) {} +func (nopRecorder) SetLatestHeight(uint64) {} +func (nopRecorder) SetNetworkHeight(uint64) {} +func (nopRecorder) IncBlobsProcessed(int) {} +func (nopRecorder) IncHeadersProcessed(int) {} +func (nopRecorder) ObserveBatchDuration(time.Duration) {} +func (nopRecorder) IncAPIRequest(string, string) {} +func (nopRecorder) ObserveAPIRequestDuration(string, time.Duration) {} +func (nopRecorder) ObserveStoreQueryDuration(string, time.Duration) {} +func (nopRecorder) SetActiveSubscriptions(int) {} +func (nopRecorder) IncEventsDropped() {} + +// PromRecorder implements Recorder using Prometheus metrics. +type PromRecorder struct { + syncState *prometheus.GaugeVec + latestHeight prometheus.Gauge + networkHeight prometheus.Gauge + syncLag prometheus.Gauge + blobsProcessed prometheus.Counter + headersProcessed prometheus.Counter + batchDuration prometheus.Histogram + apiRequests *prometheus.CounterVec + apiDuration *prometheus.HistogramVec + storeQueryDur *prometheus.HistogramVec + activeSubs prometheus.Gauge + eventsDropped prometheus.Counter + info *prometheus.GaugeVec + + // cached for lag calculation + lastLatest atomic.Uint64 + lastNetwork atomic.Uint64 +} + +// NewPromRecorder creates a PromRecorder and registers metrics with the +// provided Prometheus registerer. Pass nil to use the default registerer. 
+func NewPromRecorder(reg prometheus.Registerer, version string) *PromRecorder { + if reg == nil { + reg = prometheus.DefaultRegisterer + } + factory := promauto.With(reg) + + r := &PromRecorder{ + syncState: factory.NewGaugeVec(prometheus.GaugeOpts{ + Name: "apex_sync_state", + Help: "Current sync state (1 = active for the labeled state).", + }, []string{"state"}), + + latestHeight: factory.NewGauge(prometheus.GaugeOpts{ + Name: "apex_sync_latest_height", + Help: "Latest locally synced block height.", + }), + + networkHeight: factory.NewGauge(prometheus.GaugeOpts{ + Name: "apex_sync_network_height", + Help: "Latest known network block height.", + }), + + syncLag: factory.NewGauge(prometheus.GaugeOpts{ + Name: "apex_sync_lag", + Help: "Difference between network height and latest synced height.", + }), + + blobsProcessed: factory.NewCounter(prometheus.CounterOpts{ + Name: "apex_sync_blobs_processed_total", + Help: "Total number of blobs processed.", + }), + + headersProcessed: factory.NewCounter(prometheus.CounterOpts{ + Name: "apex_sync_headers_processed_total", + Help: "Total number of headers processed.", + }), + + batchDuration: factory.NewHistogram(prometheus.HistogramOpts{ + Name: "apex_sync_batch_duration_seconds", + Help: "Duration of backfill batch processing.", + Buckets: prometheus.DefBuckets, + }), + + apiRequests: factory.NewCounterVec(prometheus.CounterOpts{ + Name: "apex_api_requests_total", + Help: "Total API requests by method and status.", + }, []string{"method", "status"}), + + apiDuration: factory.NewHistogramVec(prometheus.HistogramOpts{ + Name: "apex_api_request_duration_seconds", + Help: "API request duration by method.", + Buckets: prometheus.DefBuckets, + }, []string{"method"}), + + storeQueryDur: factory.NewHistogramVec(prometheus.HistogramOpts{ + Name: "apex_store_query_duration_seconds", + Help: "Store query duration by operation.", + Buckets: prometheus.DefBuckets, + }, []string{"operation"}), + + activeSubs: 
factory.NewGauge(prometheus.GaugeOpts{ + Name: "apex_subscriptions_active", + Help: "Number of active event subscriptions.", + }), + + eventsDropped: factory.NewCounter(prometheus.CounterOpts{ + Name: "apex_subscriptions_events_dropped_total", + Help: "Total number of subscription events dropped.", + }), + + info: factory.NewGaugeVec(prometheus.GaugeOpts{ + Name: "apex_info", + Help: "Build information.", + }, []string{"version", "go_version"}), + } + + r.info.WithLabelValues(version, runtime.Version()).Set(1) + + return r +} + +func (r *PromRecorder) SetSyncState(state string) { + for _, s := range []string{"initializing", "backfilling", "streaming"} { + if s == state { + r.syncState.WithLabelValues(s).Set(1) + } else { + r.syncState.WithLabelValues(s).Set(0) + } + } +} + +func (r *PromRecorder) SetLatestHeight(h uint64) { + r.latestHeight.Set(float64(h)) + r.lastLatest.Store(h) + if lag := float64(r.lastNetwork.Load()) - float64(h); lag > 0 { + r.syncLag.Set(lag) + } else { + r.syncLag.Set(0) + } +} + +func (r *PromRecorder) SetNetworkHeight(h uint64) { + r.networkHeight.Set(float64(h)) + r.lastNetwork.Store(h) + if lag := float64(h) - float64(r.lastLatest.Load()); lag > 0 { + r.syncLag.Set(lag) + } else { + r.syncLag.Set(0) + } +} + +func (r *PromRecorder) IncBlobsProcessed(n int) { + if n > 0 { + r.blobsProcessed.Add(float64(n)) + } +} + +func (r *PromRecorder) IncHeadersProcessed(n int) { + if n > 0 { + r.headersProcessed.Add(float64(n)) + } +} + +func (r *PromRecorder) ObserveBatchDuration(d time.Duration) { + r.batchDuration.Observe(d.Seconds()) +} + +func (r *PromRecorder) IncAPIRequest(method, status string) { + r.apiRequests.WithLabelValues(method, status).Inc() +} + +func (r *PromRecorder) ObserveAPIRequestDuration(method string, d time.Duration) { + r.apiDuration.WithLabelValues(method).Observe(d.Seconds()) +} + +func (r *PromRecorder) ObserveStoreQueryDuration(op string, d time.Duration) { + r.storeQueryDur.WithLabelValues(op).Observe(d.Seconds()) +} + 
+func (r *PromRecorder) SetActiveSubscriptions(n int) { + r.activeSubs.Set(float64(n)) +} + +func (r *PromRecorder) IncEventsDropped() { + r.eventsDropped.Inc() +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 0000000..8023bf4 --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,61 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +func TestPromRecorderRegisters(t *testing.T) { + reg := prometheus.NewRegistry() + r := NewPromRecorder(reg, "test-version") + + r.SetSyncState("streaming") + r.SetLatestHeight(100) + r.SetNetworkHeight(105) + r.IncBlobsProcessed(10) + r.IncHeadersProcessed(5) + r.ObserveBatchDuration(500 * time.Millisecond) + r.IncAPIRequest("BlobGet", "ok") + r.ObserveAPIRequestDuration("BlobGet", 10*time.Millisecond) + r.ObserveStoreQueryDuration("GetBlobs", 2*time.Millisecond) + r.SetActiveSubscriptions(3) + r.IncEventsDropped() + + // Verify metrics were gathered without error. + families, err := reg.Gather() + if err != nil { + t.Fatalf("Gather: %v", err) + } + if len(families) == 0 { + t.Fatal("expected at least one metric family") + } + + // Verify apex_info is present. 
+ found := false + for _, fam := range families { + if fam.GetName() == "apex_info" { + found = true + break + } + } + if !found { + t.Error("apex_info metric not found") + } +} + +func TestNopRecorderDoesNotPanic(t *testing.T) { + r := Nop() + r.SetSyncState("streaming") + r.SetLatestHeight(100) + r.SetNetworkHeight(105) + r.IncBlobsProcessed(10) + r.IncHeadersProcessed(5) + r.ObserveBatchDuration(500 * time.Millisecond) + r.IncAPIRequest("BlobGet", "ok") + r.ObserveAPIRequestDuration("BlobGet", 10*time.Millisecond) + r.ObserveStoreQueryDuration("GetBlobs", 2*time.Millisecond) + r.SetActiveSubscriptions(3) + r.IncEventsDropped() +} diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go new file mode 100644 index 0000000..9d2d6b0 --- /dev/null +++ b/pkg/metrics/server.go @@ -0,0 +1,46 @@ +package metrics + +import ( + "context" + "errors" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/zerolog" +) + +// Server serves Prometheus metrics over HTTP. +type Server struct { + httpSrv *http.Server + log zerolog.Logger +} + +// NewServer creates a metrics HTTP server listening on addr. +func NewServer(addr string, log zerolog.Logger) *Server { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + return &Server{ + httpSrv: &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + }, + log: log.With().Str("component", "metrics-server").Logger(), + } +} + +// Start begins serving metrics. It blocks until the server is shut down. +func (s *Server) Start() error { + s.log.Info().Str("addr", s.httpSrv.Addr).Msg("metrics server listening") + if err := s.httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +// Shutdown gracefully stops the metrics server. 
+func (s *Server) Shutdown(ctx context.Context) error { + return s.httpSrv.Shutdown(ctx) +} diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index b262ae7..4d8fbcd 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -9,6 +9,7 @@ import ( "runtime" "time" + "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/types" _ "modernc.org/sqlite" @@ -22,8 +23,9 @@ var migrations embed.FS // The writer is limited to a single connection (WAL single-writer constraint), // while the reader pool allows concurrent API reads. type SQLiteStore struct { - writer *sql.DB - reader *sql.DB + writer *sql.DB + reader *sql.DB + metrics metrics.Recorder } // maxReadConns is the upper bound for the read connection pool. @@ -63,7 +65,7 @@ func Open(path string) (*SQLiteStore, error) { return nil, fmt.Errorf("configure reader: %w", err) } - s := &SQLiteStore{writer: writer, reader: reader} + s := &SQLiteStore{writer: writer, reader: reader, metrics: metrics.Nop()} if err := s.migrate(); err != nil { _ = writer.Close() _ = reader.Close() @@ -72,6 +74,11 @@ func Open(path string) (*SQLiteStore, error) { return s, nil } +// SetMetrics sets the metrics recorder for the store. 
+func (s *SQLiteStore) SetMetrics(m metrics.Recorder) { + s.metrics = m +} + func configureSQLite(db *sql.DB) error { if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { return fmt.Errorf("set WAL mode: %w", err) @@ -120,6 +127,8 @@ func (s *SQLiteStore) PutBlobs(ctx context.Context, blobs []types.Blob) error { if len(blobs) == 0 { return nil } + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("PutBlobs", time.Since(start)) }() tx, err := s.writer.BeginTx(ctx, nil) if err != nil { @@ -157,6 +166,9 @@ func (s *SQLiteStore) GetBlob(ctx context.Context, ns types.Namespace, height ui } func (s *SQLiteStore) GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64, limit, offset int) ([]types.Blob, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetBlobs", time.Since(start)) }() + query := `SELECT height, namespace, commitment, data, share_version, signer, blob_index FROM blobs WHERE namespace = ? AND height >= ? AND height <= ? 
ORDER BY height, blob_index` @@ -185,6 +197,9 @@ func (s *SQLiteStore) GetBlobs(ctx context.Context, ns types.Namespace, startHei } func (s *SQLiteStore) PutHeader(ctx context.Context, header *types.Header) error { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("PutHeader", time.Since(start)) }() + _, err := s.writer.ExecContext(ctx, `INSERT OR IGNORE INTO headers (height, hash, data_hash, time_ns, raw_header) VALUES (?, ?, ?, ?, ?)`, @@ -196,6 +211,9 @@ func (s *SQLiteStore) PutHeader(ctx context.Context, header *types.Header) error } func (s *SQLiteStore) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetHeader", time.Since(start)) }() + var h types.Header var timeNs int64 err := s.reader.QueryRowContext(ctx, @@ -244,6 +262,9 @@ func (s *SQLiteStore) GetNamespaces(ctx context.Context) ([]types.Namespace, err } func (s *SQLiteStore) GetSyncState(ctx context.Context) (*types.SyncStatus, error) { + start := time.Now() + defer func() { s.metrics.ObserveStoreQueryDuration("GetSyncState", time.Since(start)) }() + var state int var latestHeight, networkHeight uint64 err := s.reader.QueryRowContext(ctx, diff --git a/pkg/sync/coordinator.go b/pkg/sync/coordinator.go index 76bdae6..6d0c9c1 100644 --- a/pkg/sync/coordinator.go +++ b/pkg/sync/coordinator.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" "github.com/evstack/apex/pkg/fetch" + "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/store" "github.com/evstack/apex/pkg/types" ) @@ -21,15 +22,18 @@ type HeightObserver func(height uint64, header *types.Header, blobs []types.Blob // Coordinator manages the sync lifecycle between a data fetcher and a store. 
type Coordinator struct { - store store.Store - fetcher fetch.DataFetcher - state types.SyncState - stateMu sync.RWMutex - batchSize int - concurrency int - startHeight uint64 - observer HeightObserver - log zerolog.Logger + store store.Store + fetcher fetch.DataFetcher + state types.SyncState + latestHeight uint64 + networkHeight uint64 + stateMu sync.RWMutex + batchSize int + concurrency int + startHeight uint64 + observer HeightObserver + metrics metrics.Recorder + log zerolog.Logger } // Option configures a Coordinator. @@ -70,6 +74,11 @@ func WithObserver(obs HeightObserver) Option { return func(c *Coordinator) { c.observer = obs } } +// WithMetrics sets the metrics recorder for the coordinator. +func WithMetrics(m metrics.Recorder) Option { + return func(c *Coordinator) { c.metrics = m } +} + // New creates a Coordinator with the given store, fetcher, and options. func New(s store.Store, f fetch.DataFetcher, opts ...Option) *Coordinator { coord := &Coordinator{ @@ -78,6 +87,7 @@ func New(s store.Store, f fetch.DataFetcher, opts ...Option) *Coordinator { state: types.Initializing, batchSize: 64, concurrency: 4, + metrics: metrics.Nop(), log: zerolog.Nop(), } for _, opt := range opts { @@ -91,7 +101,9 @@ func (c *Coordinator) Status() types.SyncStatus { c.stateMu.RLock() defer c.stateMu.RUnlock() return types.SyncStatus{ - State: c.state, + State: c.state, + LatestHeight: c.latestHeight, + NetworkHeight: c.networkHeight, } } @@ -99,6 +111,21 @@ func (c *Coordinator) setState(s types.SyncState) { c.stateMu.Lock() c.state = s c.stateMu.Unlock() + c.metrics.SetSyncState(s.String()) +} + +func (c *Coordinator) setLatestHeight(h uint64) { + c.stateMu.Lock() + c.latestHeight = h + c.stateMu.Unlock() + c.metrics.SetLatestHeight(h) +} + +func (c *Coordinator) setNetworkHeight(h uint64) { + c.stateMu.Lock() + c.networkHeight = h + c.stateMu.Unlock() + c.metrics.SetNetworkHeight(h) } // Run executes the sync lifecycle: initialize -> backfill -> stream. 
@@ -110,6 +137,10 @@ func (c *Coordinator) Run(ctx context.Context) error { if err != nil { return fmt.Errorf("initialize: %w", err) } + c.setNetworkHeight(networkHeight) + if fromHeight > 1 { + c.setLatestHeight(fromHeight - 1) + } if fromHeight <= networkHeight { c.setState(types.Backfilling) @@ -118,17 +149,28 @@ func (c *Coordinator) Run(ctx context.Context) error { Uint64("to", networkHeight). Msg("starting backfill") + // Wrap the user observer to also track latest height and metrics. + wrappedObserver := func(height uint64, header *types.Header, blobs []types.Blob) { + c.setLatestHeight(height) + c.metrics.IncHeadersProcessed(1) + c.metrics.IncBlobsProcessed(len(blobs)) + if c.observer != nil { + c.observer(height, header, blobs) + } + } + bf := &Backfiller{ store: c.store, fetcher: c.fetcher, batchSize: c.batchSize, concurrency: c.concurrency, - observer: c.observer, + observer: wrappedObserver, log: c.log.With().Str("component", "backfiller").Logger(), } if err := bf.Run(ctx, fromHeight, networkHeight); err != nil { return fmt.Errorf("backfill: %w", err) } + c.setLatestHeight(networkHeight) c.log.Info().Uint64("height", networkHeight).Msg("backfill complete") } @@ -136,10 +178,18 @@ func (c *Coordinator) Run(ctx context.Context) error { c.log.Info().Msg("entering streaming mode") sm := &SubscriptionManager{ - store: c.store, - fetcher: c.fetcher, - observer: c.observer, - log: c.log.With().Str("component", "subscription").Logger(), + store: c.store, + fetcher: c.fetcher, + observer: func(height uint64, header *types.Header, blobs []types.Blob) { + c.setLatestHeight(height) + c.setNetworkHeight(height) + c.metrics.IncHeadersProcessed(1) + c.metrics.IncBlobsProcessed(len(blobs)) + if c.observer != nil { + c.observer(height, header, blobs) + } + }, + log: c.log.With().Str("component", "subscription").Logger(), } err = sm.Run(ctx) if errors.Is(err, ErrGapDetected) {