Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
defer dataFetcher.Close() //nolint:errcheck

// Set up API layer.
notifier := api.NewNotifier(cfg.Subscription.BufferSize, log.Logger)
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)
svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger)

Expand Down Expand Up @@ -266,6 +266,8 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
Addr: cfg.RPC.ListenAddr,
Handler: mux,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: time.Duration(cfg.RPC.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(cfg.RPC.WriteTimeout) * time.Second,
}

go func() {
Expand Down
14 changes: 10 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type S3Config struct {
type RPCConfig struct {
ListenAddr string `yaml:"listen_addr"`
GRPCListenAddr string `yaml:"grpc_listen_addr"`
ReadTimeout int `yaml:"read_timeout"` // in seconds
WriteTimeout int `yaml:"write_timeout"` // in seconds
}

// SyncConfig configures the sync coordinator.
Expand All @@ -64,7 +66,8 @@ type SyncConfig struct {

// SubscriptionConfig configures API event subscriptions.
type SubscriptionConfig struct {
BufferSize int `yaml:"buffer_size"`
BufferSize int `yaml:"buffer_size"`
MaxSubscribers int `yaml:"max_subscribers"`
}

// MetricsConfig configures Prometheus metrics.
Expand Down Expand Up @@ -99,13 +102,16 @@ func DefaultConfig() Config {
RPC: RPCConfig{
ListenAddr: ":8080",
GRPCListenAddr: ":9090",
ReadTimeout: 30,
WriteTimeout: 30,
},
Sync: SyncConfig{
BatchSize: 64,
Concurrency: 4,
BatchSize: 256,
Concurrency: 12,
},
Subscription: SubscriptionConfig{
BufferSize: 64,
BufferSize: 64,
MaxSubscribers: 1024,
},
Metrics: MetricsConfig{
Enabled: true,
Expand Down
81 changes: 65 additions & 16 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,23 @@ rpc:
listen_addr: ":8080"
# Address for the gRPC API server
grpc_listen_addr: ":9090"
# HTTP read/write timeouts in seconds
read_timeout: 30
write_timeout: 30

sync:
# Height to start syncing from (0 = genesis)
start_height: 0
# Number of headers per backfill batch
batch_size: 64
batch_size: 256
# Number of concurrent fetch workers
concurrency: 4
concurrency: 12

subscription:
# Event buffer size per subscriber (for API subscriptions)
buffer_size: 64
# Maximum number of concurrent subscribers
max_subscribers: 1024

metrics:
# Enable Prometheus metrics endpoint
Expand Down Expand Up @@ -185,33 +190,77 @@ func validate(cfg *Config) error {
if err := validateStorage(&cfg.Storage); err != nil {
return err
}
if cfg.RPC.ListenAddr == "" {
if err := validateRPC(&cfg.RPC); err != nil {
return err
}
if err := validateSync(&cfg.Sync); err != nil {
return err
}
if err := validateSubscription(&cfg.Subscription); err != nil {
return err
}
if err := validateMetrics(&cfg.Metrics); err != nil {
return err
}
if err := validateProfiling(&cfg.Profiling); err != nil {
return err
}
Comment on lines 205 to 207

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The call to validateProfiling introduces redundant validation because the same check is performed manually immediately following this block (lines 208-210). The manual check should be removed in favor of the new validation function.

if !validLogLevels[cfg.Log.Level] {
return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level)
}
if cfg.Log.Format != "json" && cfg.Log.Format != "console" {
return fmt.Errorf("log.format %q is invalid; must be json or console", cfg.Log.Format)
}

return nil
}

func validateRPC(rpc *RPCConfig) error {
if rpc.ListenAddr == "" {
return fmt.Errorf("rpc.listen_addr is required")
}
if cfg.RPC.GRPCListenAddr == "" {
if rpc.GRPCListenAddr == "" {
return fmt.Errorf("rpc.grpc_listen_addr is required")
}
if cfg.Sync.BatchSize <= 0 {
if rpc.ReadTimeout < 0 {
return fmt.Errorf("rpc.read_timeout must be non-negative")
}
if rpc.WriteTimeout < 0 {
return fmt.Errorf("rpc.write_timeout must be non-negative")
}
return nil
}

func validateSync(sync *SyncConfig) error {
if sync.BatchSize <= 0 {
return fmt.Errorf("sync.batch_size must be positive")
}
if cfg.Sync.Concurrency <= 0 {
if sync.Concurrency <= 0 {
return fmt.Errorf("sync.concurrency must be positive")
}
if cfg.Subscription.BufferSize <= 0 {
return nil
}

func validateSubscription(sub *SubscriptionConfig) error {
if sub.BufferSize <= 0 {
return fmt.Errorf("subscription.buffer_size must be positive")
}
if cfg.Metrics.Enabled && cfg.Metrics.ListenAddr == "" {
if sub.MaxSubscribers <= 0 {
return fmt.Errorf("subscription.max_subscribers must be positive")
}
return nil
}

func validateMetrics(m *MetricsConfig) error {
if m.Enabled && m.ListenAddr == "" {
return fmt.Errorf("metrics.listen_addr is required when metrics are enabled")
}
if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
return nil
}

func validateProfiling(p *ProfilingConfig) error {
if p.Enabled && p.ListenAddr == "" {
return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
}
if !validLogLevels[cfg.Log.Level] {
return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level)
}
if cfg.Log.Format != "json" && cfg.Log.Format != "console" {
return fmt.Errorf("log.format %q is invalid; must be json or console", cfg.Log.Format)
}

return nil
}
17 changes: 16 additions & 1 deletion config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
)

func TestDefaultConfigProfilingDefaults(t *testing.T) {
func TestDefaultConfigDefaults(t *testing.T) {
t.Parallel()

cfg := DefaultConfig()
Expand All @@ -17,6 +17,21 @@ func TestDefaultConfigProfilingDefaults(t *testing.T) {
if cfg.Profiling.ListenAddr != "127.0.0.1:6061" {
t.Fatalf("profiling.listen_addr default = %q, want %q", cfg.Profiling.ListenAddr, "127.0.0.1:6061")
}
if cfg.Sync.BatchSize != 256 {
t.Fatalf("sync.batch_size default = %d, want %d", cfg.Sync.BatchSize, 256)
}
if cfg.Sync.Concurrency != 12 {
t.Fatalf("sync.concurrency default = %d, want %d", cfg.Sync.Concurrency, 12)
}
if cfg.RPC.ReadTimeout != 30 {
t.Fatalf("rpc.read_timeout default = %d, want 30", cfg.RPC.ReadTimeout)
}
if cfg.RPC.WriteTimeout != 30 {
t.Fatalf("rpc.write_timeout default = %d, want 30", cfg.RPC.WriteTimeout)
}
if cfg.Subscription.MaxSubscribers != 1024 {
t.Fatalf("subscription.max_subscribers default = %d, want 1024", cfg.Subscription.MaxSubscribers)
}
}

func TestLoadProfilingEnabledRequiresListenAddr(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions docs/running.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,17 @@ storage:
rpc:
listen_addr: ":8080" # JSON-RPC + health
grpc_listen_addr: ":9090" # gRPC
read_timeout: 30 # HTTP read timeout in seconds
write_timeout: 30 # HTTP write timeout in seconds

sync:
start_height: 0 # 0 = genesis
batch_size: 64 # headers per backfill batch
concurrency: 4 # concurrent fetch workers
batch_size: 256 # headers per backfill batch
concurrency: 12 # concurrent fetch workers

subscription:
buffer_size: 64 # event buffer per subscriber
max_subscribers: 1024 # maximum concurrent subscribers

metrics:
enabled: true
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/grpc/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func (s *BlobServiceServer) Subscribe(req *pb.BlobServiceSubscribeRequest, strea
return status.Errorf(codes.InvalidArgument, "invalid namespace: %v", err)
}

sub := s.svc.Notifier().Subscribe([]types.Namespace{ns})
sub, err := s.svc.Notifier().Subscribe([]types.Namespace{ns})
if err != nil {
return status.Errorf(codes.ResourceExhausted, "subscribe: %v", err)
}
defer s.svc.Notifier().Unsubscribe(sub)

ctx := stream.Context()
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/grpc/header_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func (s *HeaderServiceServer) NetworkHead(ctx context.Context, _ *pb.NetworkHead
}

func (s *HeaderServiceServer) Subscribe(_ *pb.HeaderServiceSubscribeRequest, stream grpc.ServerStreamingServer[pb.HeaderServiceSubscribeResponse]) error {
sub := s.svc.HeaderSubscribe()
sub, err := s.svc.HeaderSubscribe()
if err != nil {
return status.Errorf(codes.ResourceExhausted, "subscribe: %v", err)
}
defer s.svc.Notifier().Unsubscribe(sub)

ctx := stream.Context()
Expand Down
14 changes: 7 additions & 7 deletions pkg/api/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestGRPCBlobGet(t *testing.T) {
{Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: commitment, Index: 0},
}

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop())
client := startTestServer(t, svc)

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestGRPCBlobGetAll(t *testing.T) {
{Height: 10, Namespace: ns, Data: []byte("d2"), Commitment: []byte("c2"), Index: 1},
}

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop())
client := startTestServer(t, svc)

Expand All @@ -237,7 +237,7 @@ func TestGRPCBlobGetByCommitment(t *testing.T) {
{Height: 10, Namespace: ns, Data: []byte("d1"), Commitment: commitment, Index: 0},
}

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop())
client := startTestServer(t, svc)

Expand Down Expand Up @@ -266,7 +266,7 @@ func TestGRPCHeaderGetByHeight(t *testing.T) {
RawHeader: []byte("raw"),
}

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop())
client := startTestHeaderServer(t, svc)

Expand All @@ -291,7 +291,7 @@ func TestGRPCHeaderLocalHead(t *testing.T) {
RawHeader: []byte("raw"),
}

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop())
client := startTestHeaderServer(t, svc)

Expand All @@ -313,7 +313,7 @@ func TestGRPCHeaderNetworkHead(t *testing.T) {
},
}

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(newMockStore(), ft, nil, notifier, zerolog.Nop())
client := startTestHeaderServer(t, svc)

Expand All @@ -330,7 +330,7 @@ func TestGRPCBlobSubscribe(t *testing.T) {
st := newMockStore()
ns := testNamespace(1)

notifier := api.NewNotifier(16, zerolog.Nop())
notifier := api.NewNotifier(16, 1024, zerolog.Nop())
svc := api.NewService(st, &mockFetcher{}, nil, notifier, zerolog.Nop())
client := startTestServer(t, svc)

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestHealthEndpoint(t *testing.T) {
LatestHeight: 100,
NetworkHeight: 105,
}}
notifier := NewNotifier(64, zerolog.Nop())
notifier := NewNotifier(64, 1024, zerolog.Nop())
h := NewHealthHandler(sp, newMockStore(), notifier, "test")

mux := http.NewServeMux()
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestReadyEndpoint(t *testing.T) {
LatestHeight: 100,
NetworkHeight: 100,
}}
notifier := NewNotifier(64, zerolog.Nop())
notifier := NewNotifier(64, 1024, zerolog.Nop())
h := NewHealthHandler(sp, newMockStore(), notifier, "test")

mux := http.NewServeMux()
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/jsonrpc/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ func (h *BlobHandler) Subscribe(ctx context.Context, namespace []byte) (<-chan j
return nil, err
}

sub := h.svc.BlobSubscribe(ns)
sub, err := h.svc.BlobSubscribe(ns)
if err != nil {
return nil, err
}
out := make(chan json.RawMessage, cap(sub.Events()))

go func() {
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/jsonrpc/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (h *HeaderHandler) NetworkHead(ctx context.Context) (json.RawMessage, error
// Subscribe returns a channel of header events.
// Only available over WebSocket.
func (h *HeaderHandler) Subscribe(ctx context.Context) (<-chan json.RawMessage, error) {
sub := h.svc.HeaderSubscribe()
sub, err := h.svc.HeaderSubscribe()
if err != nil {
return nil, err
}
out := make(chan json.RawMessage, cap(sub.Events()))

go func() {
Expand Down
Loading