diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index e34f61502..9e3715bae 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -40,3 +40,19 @@ jobs: alert-threshold: '150%' fail-on-alert: true comment-on-alert: true + + - name: Run Block Executor benchmarks + run: | + go test -bench=BenchmarkProduceBlock -benchmem -run='^$' \ + ./block/internal/executing/... > block_executor_output.txt + - name: Store Block Executor benchmark result + uses: benchmark-action/github-action-benchmark@4bdcce38c94cec68da58d012ac24b7b1155efe8b # v1.20.7 + with: + name: Block Executor Benchmark + tool: 'go' + output-file-path: block_executor_output.txt + auto-push: true + github-token: ${{ secrets.GITHUB_TOKEN }} + alert-threshold: '150%' + fail-on-alert: true + comment-on-alert: true diff --git a/block/internal/executing/block_producer.go b/block/internal/executing/block_producer.go index f98c15c6a..92733cc8e 100644 --- a/block/internal/executing/block_producer.go +++ b/block/internal/executing/block_producer.go @@ -21,7 +21,4 @@ type BlockProducer interface { // ApplyBlock executes the block transactions and returns the new state. ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) - - // ValidateBlock validates block structure and state transitions. - ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error } diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index ac68f2cd8..ee9c9a9f5 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -10,25 +10,31 @@ import ( "sync/atomic" "time" - "github.com/evstack/ev-node/pkg/raft" - "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" - "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/rs/zerolog" ) var _ BlockProducer = (*Executor)(nil) +// lastBlockInfo contains cached per-block data to avoid store reads + protobuf +// deserialization in CreateBlock. +type lastBlockInfo struct { + headerHash types.Hash + dataHash types.Hash + signature types.Signature +} + // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -56,6 +62,16 @@ type Executor struct { // State management lastState *atomic.Pointer[types.State] + // hasPendingBlock tracks whether a pending block exists in the store, + // avoiding a store lookup on every ProduceBlock call. 
+ hasPendingBlock atomic.Bool + + // Cached per-block data + lastBlockInfo atomic.Pointer[lastBlockInfo] + + // pendingCheckCounter amortizes the expensive NumPendingHeaders/NumPendingData calls + pendingCheckCounter uint64 + + // Channels for coordination txNotifyCh chan struct{} errorCh chan<- error // Channel to report critical execution client failures @@ -168,6 +184,9 @@ func (e *Executor) Stop() error { e.wg.Wait() e.logger.Info().Msg("executor stopped") + if !e.hasPendingBlock.Load() { + _ = e.deletePendingBlock(context.Background()) //nolint:gocritic // not critical + } return nil } @@ -277,6 +296,26 @@ func (e *Executor) initializeState() error { return fmt.Errorf("failed to migrate legacy pending block: %w", err) } + if _, err := e.store.GetMetadata(e.ctx, headerKey); err == nil { + e.hasPendingBlock.Store(true) + } + + // Warm the last-block cache; only store an entry when header, data, and signature all load, so CreateBlock's fast path never sees a partial entry + if state.LastBlockHeight > 0 { + h, d, err := e.store.GetBlockData(e.ctx, state.LastBlockHeight) + if err == nil { + info := &lastBlockInfo{ + headerHash: h.Hash(), + dataHash: d.Hash(), + } + sig, err := e.store.GetSignature(e.ctx, state.LastBlockHeight) + if err == nil { + info.signature = *sig + e.lastBlockInfo.Store(info) + } + } + } + + // Determine sync target: use Raft height if node is behind Raft consensus syncTargetHeight := state.LastBlockHeight if e.raftNode != nil { @@ -413,18 +452,26 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { e.logger.Debug().Uint64("height", newHeight).Msg("producing block") - // check pending limits + // Amortized pending limit check — NumPendingHeaders/NumPendingData call + // advancePastEmptyData, which scans the store. Amortize only when the limit + // exceeds the interval, bounding overshoot to pendingCheckInterval-1 blocks. + const pendingCheckInterval uint64 = 64 // worst-case overshoot: 63 blocks if e.config.Node.MaxPendingHeadersAndData > 0 { - pendingHeaders := e.cache.NumPendingHeaders() - pendingData := e.cache.NumPendingData() - if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData || - pendingData >= e.config.Node.MaxPendingHeadersAndData { - e.logger.Warn(). - Uint64("pending_headers", pendingHeaders). - Uint64("pending_data", pendingData). - Uint64("limit", e.config.Node.MaxPendingHeadersAndData). - Msg("pending limit reached, skipping block production") - return nil + e.pendingCheckCounter++ + shouldCheck := e.config.Node.MaxPendingHeadersAndData <= pendingCheckInterval || + e.pendingCheckCounter%pendingCheckInterval == 0 + if shouldCheck { + pendingHeaders := e.cache.NumPendingHeaders() + pendingData := e.cache.NumPendingData() + if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData || + pendingData >= e.config.Node.MaxPendingHeadersAndData { + e.logger.Warn(). + Uint64("pending_headers", pendingHeaders). + Uint64("pending_data", pendingData). + Uint64("limit", e.config.Node.MaxPendingHeadersAndData).
+ Msg("pending limit reached, skipping block production") + return nil + } } } @@ -434,17 +481,22 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { batchData *BatchData ) - // Check if there's an already stored block at the newHeight - // If there is use that instead of creating a new block - pendingHeader, pendingData, err := e.getPendingBlock(ctx) - if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { - e.logger.Info().Uint64("height", newHeight).Msg("using pending block") - header = pendingHeader - data = pendingData - } else if err != nil && !errors.Is(err, datastore.ErrNotFound) { - return fmt.Errorf("failed to get block data: %w", err) - } else { + // Check if there's an already stored block at the newHeight. + // Only hit the store if the in-memory flag indicates a pending block exists. + if e.hasPendingBlock.Load() { + pendingHeader, pendingData, err := e.getPendingBlock(ctx) + if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { + e.logger.Info().Uint64("height", newHeight).Msg("using pending block") + header = pendingHeader + data = pendingData + } else if err != nil && !errors.Is(err, datastore.ErrNotFound) { + return fmt.Errorf("failed to get block data: %w", err) + } + } + + if header == nil { // get batch from sequencer + var err error batchData, err = e.blockProducer.RetrieveBatch(ctx) if errors.Is(err, common.ErrNoBatch) { e.logger.Debug().Msg("no batch available") @@ -478,14 +530,16 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // signing the header is done after applying the block // as for signing, the state of the block may be required by the signature payload provider. - // For based sequencer, this will return an empty signature - signature, err := e.signHeader(header.Header) + // For based sequencer, this will return an empty signature. + signature, _, err := e.signHeader(&header.Header) if err != nil { return fmt.Errorf("failed to sign header: %w", err) } header.Signature = signature - if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil { + // Structural validation only — skip the expensive Validate() / DACommitment() + // re-computation since we just produced this block ourselves. + if err := currentState.AssertValidSequence(header); err != nil { e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err)) e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production") return fmt.Errorf("failed to validate block: %w", err) @@ -533,28 +587,36 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { } e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft") } - if err := e.deletePendingBlock(batch); err != nil { - e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata") - } if err := batch.Commit(); err != nil { return fmt.Errorf("failed to commit batch: %w", err) } + e.hasPendingBlock.Store(false) + // Update in-memory state after successful commit e.setLastState(newState) - // broadcast header and data to P2P network - g, broadcastCtx := errgroup.WithContext(e.ctx) - g.Go(func() error { - return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header}) + // Update last-block cache so the next CreateBlock avoids a store read. 
+ e.lastBlockInfo.Store(&lastBlockInfo{ + headerHash: newState.LastHeaderHash, + dataHash: data.Hash(), + signature: signature, }) - g.Go(func() error { - return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data}) - }) - if err := g.Wait(); err != nil { - e.logger.Error().Err(err).Msg("failed to broadcast header and/data") - // don't fail block production on broadcast error + + // Broadcast header and data to P2P network sequentially. + // IMPORTANT: Header MUST be broadcast before data — the P2P layer validates + // incoming data against the current and previous header, so out-of-order + // delivery would cause validation failures on peers. + if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{ + SignedHeader: header, + }); err != nil { + e.logger.Error().Err(err).Msg("failed to broadcast header") + } + if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{ + Data: data, + }); err != nil { + e.logger.Error().Err(err).Msg("failed to broadcast data") } e.recordBlockMetrics(newState, data) @@ -604,7 +666,6 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba currentState := e.getLastState() headerTime := uint64(e.genesis.StartTime.UnixNano()) - // Get last block info var lastHeaderHash types.Hash var lastDataHash types.Hash var lastSignature types.Signature @@ -612,18 +673,26 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba if height > e.genesis.InitialHeight { headerTime = uint64(batchData.UnixNano()) - lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1) - if err != nil { - return nil, nil, fmt.Errorf("failed to get last block: %w", err) - } - lastHeaderHash = lastHeader.Hash() - lastDataHash = lastData.Hash() + if info := e.lastBlockInfo.Load(); info != nil { + // Fast path: use in-memory cache + lastHeaderHash = info.headerHash + lastDataHash = info.dataHash + lastSignature = info.signature + } else { + // Cold start fallback: read from store + lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1) + if err != nil { + return nil, nil, fmt.Errorf("failed to get last block: %w", err) + } + lastHeaderHash = lastHeader.Hash() + lastDataHash = lastData.Hash() - lastSignaturePtr, err := e.store.GetSignature(ctx, height-1) - if err != nil { - return nil, nil, fmt.Errorf("failed to get last signature: %w", err) + lastSignaturePtr, err := e.store.GetSignature(ctx, height-1) + if err != nil { + return nil, nil, fmt.Errorf("failed to get last signature: %w", err) + } + lastSignature = *lastSignaturePtr } - lastSignature = *lastSignaturePtr } // Get signer info and validator hash @@ -642,7 +711,6 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) } } else { - // For based sequencer without signer, use nil pubkey and compute validator hash var err error validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) if err != nil { @@ -703,16 +771,20 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { currentState := e.getLastState() - // Prepare transactions - rawTxs := make([][]byte, len(data.Txs)) - for i, tx := range data.Txs { - rawTxs[i] = []byte(tx) + // Convert Txs to [][]byte for the execution client. 
+ // types.Tx is []byte, so this is a type conversion, not a copy. + var rawTxs [][]byte + if n := len(data.Txs); n > 0 { + rawTxs = make([][]byte, n) + for i, tx := range data.Txs { + rawTxs[i] = []byte(tx) + } } // Execute transactions - ctx = context.WithValue(ctx, types.HeaderContextKey, header) + execCtx := context.WithValue(ctx, types.HeaderContextKey, header) - newAppHash, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState) + newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState) if err != nil { e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err)) return types.State{}, fmt.Errorf("failed to execute transactions: %w", err) @@ -727,19 +799,25 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty return newState, nil } -// signHeader signs the block header -func (e *Executor) signHeader(header types.Header) (types.Signature, error) { +// signHeader signs the block header and returns both the signature and the +// serialized header bytes (signing payload). The caller can reuse headerBytes +// in SaveBlockDataFromBytes to avoid a redundant MarshalBinary call. +func (e *Executor) signHeader(header *types.Header) (types.Signature, []byte, error) { // For based sequencer, return empty signature as there is no signer if e.signer == nil { - return types.Signature{}, nil + return types.Signature{}, nil, nil } - bz, err := e.options.AggregatorNodeSignatureBytesProvider(&header) + bz, err := e.options.AggregatorNodeSignatureBytesProvider(header) if err != nil { - return nil, fmt.Errorf("failed to get signature payload: %w", err) + return nil, nil, fmt.Errorf("failed to get signature payload: %w", err) } - return e.signer.Sign(bz) + sig, err := e.signer.Sign(bz) + if err != nil { + return nil, nil, err + } + return sig, bz, nil } // executeTxsWithRetry executes transactions with retry logic. @@ -772,19 +850,6 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea return nil, nil } -// ValidateBlock validates the created block. 
-func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - // Set custom verifier for aggregator node signature - header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) - - // Basic header validation - if err := header.ValidateBasic(); err != nil { - return fmt.Errorf("invalid header: %w", err) - } - - return lastState.AssertValidForNextState(header, data) -} - // sendCriticalError sends a critical error to the error channel without blocking func (e *Executor) sendCriticalError(err error) { if e.errorCh != nil { @@ -804,10 +869,11 @@ func (e *Executor) recordBlockMetrics(newState types.State, data *types.Data) { return } - e.metrics.NumTxs.Set(float64(len(data.Txs))) - e.metrics.TotalTxs.Add(float64(len(data.Txs))) - e.metrics.TxsPerBlock.Observe(float64(len(data.Txs))) - e.metrics.BlockSizeBytes.Set(float64(data.Size())) + nTxs := float64(len(data.Txs)) + e.metrics.NumTxs.Set(nTxs) + e.metrics.TotalTxs.Add(nTxs) + e.metrics.TxsPerBlock.Observe(nTxs) + e.metrics.BlockSizeBytes.Set(float64(data.TxsByteSize())) e.metrics.CommittedHeight.Set(float64(data.Metadata.Height)) } diff --git a/block/internal/executing/executor_benchmark_test.go b/block/internal/executing/executor_benchmark_test.go new file mode 100644 index 000000000..4c925a013 --- /dev/null +++ b/block/internal/executing/executor_benchmark_test.go @@ -0,0 +1,170 @@ +package executing + +import ( + "context" + "crypto/rand" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/celestiaorg/go-header" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/block/internal/common" + coreexec "github.com/evstack/ev-node/core/execution" + coreseq "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer/noop" + "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" +) + +func BenchmarkProduceBlock(b *testing.B) { + specs := map[string]struct { + txs [][]byte + }{ + "empty batch": { + txs: nil, + }, + "single tx": { + txs: [][]byte{[]byte("tx1")}, + }, + "100 txs": { + txs: createTxs(100), + }, + } + for name, spec := range specs { + b.Run(name, func(b *testing.B) { + exec := newBenchExecutorWithStubs(b, spec.txs) + ctx := b.Context() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := exec.ProduceBlock(ctx); err != nil { + b.Fatalf("ProduceBlock: %v", err) + } + } + }) + } +} + +// newBenchExecutorWithStubs creates an Executor using zero-overhead stubs. 
+func newBenchExecutorWithStubs(b *testing.B, txs [][]byte) *Executor { + b.Helper() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + memStore := store.New(ds) + + cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) + require.NoError(b, err) + + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(b, err) + signerWrapper, err := noop.NewNoopSigner(priv) + require.NoError(b, err) + addr, err := signerWrapper.GetAddress() + require.NoError(b, err) + + cfg := config.DefaultConfig() + cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} + cfg.Node.MaxPendingHeadersAndData = 0 // disabled — avoids advancePastEmptyData store scans + + gen := genesis.Genesis{ + ChainID: "bench-chain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Hour), + ProposerAddress: addr, + } + + stubExec := &stubExecClient{stateRoot: []byte("init_root")} + stubSeq := &stubSequencer{txs: txs} + hb := &stubBroadcaster[*types.P2PSignedHeader]{} + db := &stubBroadcaster[*types.P2PData]{} + + exec, err := NewExecutor( + memStore, stubExec, stubSeq, signerWrapper, + cacheManager, common.NopMetrics(), cfg, gen, + hb, db, zerolog.Nop(), common.DefaultBlockOptions(), + make(chan error, 1), nil, + ) + require.NoError(b, err) + + require.NoError(b, exec.initializeState()) + + exec.ctx, exec.cancel = context.WithCancel(b.Context()) + b.Cleanup(func() { exec.cancel() }) + + return exec +} + +// stubSequencer implements coreseq.Sequencer. +// GetNextBatch returns a monotonically-increasing timestamp on every call so +// that successive ProduceBlock iterations pass AssertValidSequence. +type stubSequencer struct { + txs [][]byte + counter atomic.Int64 // incremented each call; used to advance the timestamp +} + +func (s *stubSequencer) SubmitBatchTxs(context.Context, coreseq.SubmitBatchTxsRequest) (*coreseq.SubmitBatchTxsResponse, error) { + return nil, nil +} +func (s *stubSequencer) GetNextBatch(context.Context, coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + n := s.counter.Add(1) + ts := time.Now().Add(time.Duration(n) * time.Millisecond) + return &coreseq.GetNextBatchResponse{ + Batch: &coreseq.Batch{Transactions: s.txs}, + Timestamp: ts, + BatchData: s.txs, + }, nil +} +func (s *stubSequencer) VerifyBatch(context.Context, coreseq.VerifyBatchRequest) (*coreseq.VerifyBatchResponse, error) { + return nil, nil +} +func (s *stubSequencer) SetDAHeight(uint64) {} +func (s *stubSequencer) GetDAHeight() uint64 { return 0 } + +func createTxs(n int) [][]byte { + txs := make([][]byte, n) + for i := 0; i < n; i++ { + txs[i] = []byte(fmt.Sprintf("tx%d", i)) + } + return txs +} + +// stubExecClient implements coreexec.Executor with fixed return values. 
+type stubExecClient struct { + stateRoot []byte +} + +func (s *stubExecClient) InitChain(context.Context, time.Time, uint64, string) ([]byte, error) { + return s.stateRoot, nil +} +func (s *stubExecClient) GetTxs(context.Context) ([][]byte, error) { return nil, nil } +func (s *stubExecClient) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { + return s.stateRoot, nil +} +func (s *stubExecClient) SetFinal(context.Context, uint64) error { return nil } +func (s *stubExecClient) GetExecutionInfo(context.Context) (coreexec.ExecutionInfo, error) { + return coreexec.ExecutionInfo{}, nil +} +func (s *stubExecClient) FilterTxs(context.Context, [][]byte, uint64, uint64, bool) ([]coreexec.FilterStatus, error) { + return nil, nil +} + +// stubBroadcaster implements common.Broadcaster[H] with no-ops. +type stubBroadcaster[H header.Header[H]] struct{} + +func (s *stubBroadcaster[H]) WriteToStoreAndBroadcast(context.Context, H, ...pubsub.PubOpt) error { + return nil +} +func (s *stubBroadcaster[H]) Store() header.Store[H] { return nil } +func (s *stubBroadcaster[H]) Height() uint64 { return 0 } diff --git a/block/internal/executing/executor_logic_test.go b/block/internal/executing/executor_logic_test.go index 31a237315..7b90de0ed 100644 --- a/block/internal/executing/executor_logic_test.go +++ b/block/internal/executing/executor_logic_test.go @@ -43,177 +43,80 @@ func buildTestSigner(t *testing.T) (signerAddr []byte, tSigner types.Signer, s p } func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) { - ds := sync.MutexWrap(datastore.NewMapDatastore()) - memStore := store.New(ds) - - cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) - require.NoError(t, err) - - metrics := common.NopMetrics() - - // signer and genesis with correct proposer - addr, _, signerWrapper := buildTestSigner(t) - - cfg := config.DefaultConfig() - cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} - cfg.Node.MaxPendingHeadersAndData = 1000 - - gen := genesis.Genesis{ - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now().Add(-time.Second), - ProposerAddress: addr, - } - - // Use mocks for executor and sequencer - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - - // Broadcasters are required by produceBlock; use generated mocks - hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t) - hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() - db := common.NewMockBroadcaster[*types.P2PData](t) - db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() - - exec, err := NewExecutor( - memStore, - mockExec, - mockSeq, - signerWrapper, - cacheManager, - metrics, - cfg, - gen, - hb, - db, - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - nil, - ) - require.NoError(t, err) - - // Expect InitChain to be called - initStateRoot := []byte("init_root") - mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). 
- Return(initStateRoot, nil).Once() - mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() + fx := setupTestExecutor(t, 1000) + defer fx.Cancel() - // initialize state (creates genesis block in store and sets state) - require.NoError(t, exec.initializeState()) - - // Set up context for the executor (normally done in Start method) - exec.ctx, exec.cancel = context.WithCancel(context.Background()) - defer exec.cancel() - - // sequencer returns empty batch - mockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil }).Once() - // executor ExecuteTxs called with empty txs and previous state root - mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), initStateRoot). + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). Return([]byte("new_root"), nil).Once() - mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - // produce one block - err = exec.ProduceBlock(exec.ctx) + err := fx.Exec.ProduceBlock(fx.Exec.ctx) require.NoError(t, err) - // Verify height and stored block - h, err := memStore.Height(context.Background()) + h, err := fx.MemStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h) - sh, data, err := memStore.GetBlockData(context.Background(), 1) + sh, data, err := fx.MemStore.GetBlockData(context.Background(), 1) require.NoError(t, err) - // Expect empty txs and special empty data hash marker assert.Equal(t, 0, len(data.Txs)) assert.EqualValues(t, common.DataHashForEmptyTxs, sh.DataHash) - - // Broadcasters should have been called with the produced header and data - // The testify mock framework tracks calls automatically } -func TestPendingLimit_SkipsProduction(t *testing.T) { - ds := sync.MutexWrap(datastore.NewMapDatastore()) - memStore := store.New(ds) - - cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) - require.NoError(t, err) - - metrics := common.NopMetrics() - - addr, _, signerWrapper := buildTestSigner(t) - - cfg := config.DefaultConfig() - cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} - cfg.Node.MaxPendingHeadersAndData = 1 // low limit to trigger skip quickly - - gen := genesis.Genesis{ - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now().Add(-time.Second), - ProposerAddress: addr, +func TestProduceBlock_OutputPassesValidation(t *testing.T) { + specs := map[string]struct { + txs [][]byte + }{ + "empty batch": {txs: nil}, + "single tx": {txs: [][]byte{[]byte("tx1")}}, + "multi txs": {txs: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}}, + "large tx": {txs: [][]byte{make([]byte, 10000)}}, } - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t) - hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() - db := common.NewMockBroadcaster[*types.P2PData](t) - db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() + for name, spec := range specs { + 
t.Run(name, func(t *testing.T) { + assertProduceBlockInvariantWithTxs(t, spec.txs) + }) + } +} - exec, err := NewExecutor( - memStore, - mockExec, - mockSeq, - signerWrapper, - cacheManager, - metrics, - cfg, - gen, - hb, - db, - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - nil, - ) - require.NoError(t, err) +func FuzzProduceBlock_OutputPassesValidation(f *testing.F) { + f.Add([]byte("tx1"), []byte("tx2")) + f.Add([]byte{}, []byte{}) + f.Add(make([]byte, 1000), make([]byte, 2000)) - mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). - Return([]byte("i0"), nil).Once() - mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() - require.NoError(t, exec.initializeState()) + f.Fuzz(func(t *testing.T, tx1, tx2 []byte) { + txs := [][]byte{tx1, tx2} + assertProduceBlockInvariantWithTxs(t, txs) + }) +} - // Set up context for the executor (normally done in Start method) - exec.ctx, exec.cancel = context.WithCancel(context.Background()) - defer exec.cancel() +func TestPendingLimit_SkipsProduction(t *testing.T) { + fx := setupTestExecutor(t, 1) + defer fx.Cancel() - // First production should succeed - // Return empty batch again - mockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil }).Once() - // ExecuteTxs with empty - mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), []byte("i0")). + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). 
Return([]byte("i1"), nil).Once() - mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - require.NoError(t, exec.ProduceBlock(exec.ctx)) - h1, err := memStore.Height(context.Background()) + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + h1, err := fx.MemStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h1) - // With limit=1 and lastSubmitted default 0, pending >= 1 so next production should be skipped - // No new expectations; ProduceBlock should return early before hitting sequencer - require.NoError(t, exec.ProduceBlock(exec.ctx)) - h2, err := memStore.Height(context.Background()) + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + h2, err := fx.MemStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, h1, h2, "height should not change when production is skipped") } @@ -329,3 +232,113 @@ func TestExecutor_executeTxsWithRetry(t *testing.T) { }) } } + +type executorTestFixture struct { + MemStore store.Store + MockExec *testmocks.MockExecutor + MockSeq *testmocks.MockSequencer + Exec *Executor + Cancel context.CancelFunc + InitStateRoot []byte +} + +func setupTestExecutor(t *testing.T, pendingLimit uint64) executorTestFixture { + t.Helper() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + memStore := store.New(ds) + + cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) + require.NoError(t, err) + + metrics := common.NopMetrics() + + addr, _, signerWrapper := buildTestSigner(t) + + cfg := config.DefaultConfig() + cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} + cfg.Node.MaxPendingHeadersAndData = pendingLimit + + gen := genesis.Genesis{ + ChainID: "test-chain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockSeq := testmocks.NewMockSequencer(t) + + hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t) + hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() + db := common.NewMockBroadcaster[*types.P2PData](t) + db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() + + exec, err := NewExecutor( + memStore, mockExec, mockSeq, signerWrapper, cacheManager, metrics, cfg, gen, hb, db, + zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), nil, + ) + require.NoError(t, err) + + initStateRoot := []byte("init_root") + mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). + Return(initStateRoot, nil).Once() + mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() + + require.NoError(t, exec.initializeState()) + + ctx, cancel := context.WithCancel(context.Background()) + exec.ctx = ctx + + return executorTestFixture{ + MemStore: memStore, + MockExec: mockExec, + MockSeq: mockSeq, + Exec: exec, + Cancel: cancel, + InitStateRoot: initStateRoot, + } +} + +func assertProduceBlockInvariantWithTxs(t *testing.T, txs [][]byte) { + t.Helper() + fx := setupTestExecutor(t, 1000) + defer fx.Cancel() + + timestamp := time.Now().Add(time.Duration(1+len(txs)*2) * time.Millisecond) + + newRoot := make([]byte, 32) + for i, tx := range txs { + if len(tx) > 0 { + newRoot[(i+int(tx[0]))%32] ^= byte(len(tx)) + } + } + newRoot[31] ^= byte(len(txs)) + + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). 
+ RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: txs}, Timestamp: timestamp}, nil + }).Once() + + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, txs, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). + Return(newRoot, nil).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + + prevState := fx.Exec.getLastState() + + err := fx.Exec.ProduceBlock(fx.Exec.ctx) + require.NoError(t, err) + + h, err := fx.MemStore.Height(context.Background()) + require.NoError(t, err) + assert.Equal(t, uint64(1), h) + + header, data, err := fx.MemStore.GetBlockData(context.Background(), h) + require.NoError(t, err) + + err = header.ValidateBasicWithData(data) + require.NoError(t, err, "Produced header failed ValidateBasicWithData") + + err = prevState.AssertValidForNextState(header, data) + require.NoError(t, err, "Produced block failed AssertValidForNextState") +} diff --git a/block/internal/executing/pending.go b/block/internal/executing/pending.go index dc758e558..c43601bd3 100644 --- a/block/internal/executing/pending.go +++ b/block/internal/executing/pending.go @@ -78,12 +78,12 @@ func (e *Executor) savePendingBlock(ctx context.Context, header *types.SignedHea } // deletePendingBlock removes pending block metadata -func (e *Executor) deletePendingBlock(batch store.Batch) error { - if err := batch.Delete(ds.NewKey(store.GetMetaKey(headerKey))); err != nil { +func (e *Executor) deletePendingBlock(ctx context.Context) error { + if err := e.store.DeleteMetadata(ctx, headerKey); err != nil { return fmt.Errorf("delete pending header: %w", err) } - if err := batch.Delete(ds.NewKey(store.GetMetaKey(dataKey))); err != nil { + if err := e.store.DeleteMetadata(ctx, dataKey); err != nil { return fmt.Errorf("delete pending data: %w", err) } return nil diff --git a/block/internal/executing/tracing.go b/block/internal/executing/tracing.go index 137d9170b..afc1d495a 100644 --- a/block/internal/executing/tracing.go +++ b/block/internal/executing/tracing.go @@ -103,19 +103,3 @@ func (t *tracedBlockProducer) ApplyBlock(ctx context.Context, header types.Heade ) return state, nil } - -func (t *tracedBlockProducer) ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - ctx, span := t.tracer.Start(ctx, "BlockExecutor.ValidateBlock", - trace.WithAttributes( - attribute.Int64("block.height", int64(header.Height())), - ), - ) - defer span.End() - - err := t.inner.ValidateBlock(ctx, lastState, header, data) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - return err -} diff --git a/block/internal/executing/tracing_test.go b/block/internal/executing/tracing_test.go index 396e1d8a4..a4865e456 100644 --- a/block/internal/executing/tracing_test.go +++ b/block/internal/executing/tracing_test.go @@ -22,7 +22,6 @@ type mockBlockProducer struct { retrieveBatchFn func(ctx context.Context) (*BatchData, error) createBlockFn func(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) applyBlockFn func(ctx context.Context, header types.Header, data *types.Data) (types.State, error) - validateBlockFn func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error } func (m *mockBlockProducer) ProduceBlock(ctx context.Context) error { @@ -53,13 +52,6 @@ func (m *mockBlockProducer) ApplyBlock(ctx 
context.Context, header types.Header, return types.State{}, nil } -func (m *mockBlockProducer) ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - if m.validateBlockFn != nil { - return m.validateBlockFn(ctx, lastState, header, data) - } - return nil -} - func setupBlockProducerTrace(t *testing.T, inner BlockProducer) (BlockProducer, *tracetest.SpanRecorder) { t.Helper() sr := tracetest.NewSpanRecorder() @@ -264,63 +256,6 @@ func TestTracedBlockProducer_ApplyBlock_Error(t *testing.T) { require.Equal(t, "execution failed", span.Status().Description) } -func TestTracedBlockProducer_ValidateBlock_Success(t *testing.T) { - mock := &mockBlockProducer{ - validateBlockFn: func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - return nil - }, - } - producer, sr := setupBlockProducerTrace(t, mock) - ctx := context.Background() - - header := &types.SignedHeader{ - Header: types.Header{ - BaseHeader: types.BaseHeader{ - Height: 75, - }, - }, - } - - err := producer.ValidateBlock(ctx, types.State{}, header, &types.Data{}) - require.NoError(t, err) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, "BlockExecutor.ValidateBlock", span.Name()) - require.Equal(t, codes.Unset, span.Status().Code) - - attrs := span.Attributes() - testutil.RequireAttribute(t, attrs, "block.height", int64(75)) -} - -func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { - mock := &mockBlockProducer{ - validateBlockFn: func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - return errors.New("validation failed") - }, - } - producer, sr := setupBlockProducerTrace(t, mock) - ctx := context.Background() - - header := &types.SignedHeader{ - Header: types.Header{ - BaseHeader: types.BaseHeader{ - Height: 75, - }, - }, - } - - err := producer.ValidateBlock(ctx, types.State{}, header, &types.Data{}) - require.Error(t, err) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, codes.Error, span.Status().Code) - require.Equal(t, "validation failed", span.Status().Description) -} - // TestTracedBlockProducer_RetrieveBatch_ErrorWithValue verifies that when the inner // function returns both a value and an error, the value is passed through (not nil). // this is important for cases like ErrNoTransactionsInBatch where valid data accompanies the error. 
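Note on the executor changes above: ProduceBlock now amortizes the pending-limit check, consulting the expensive NumPendingHeaders/NumPendingData scans only on every pendingCheckInterval-th call, unless the configured limit is small enough that skipping checks could overshoot it. A minimal standalone sketch of that gating pattern follows; the gate type, its allow method, and the pending callback are illustrative names, not executor APIs.

package main

import "fmt"

// checkInterval mirrors pendingCheckInterval in the diff above.
const checkInterval = 64

type gate struct {
	calls   uint64        // total allow() invocations
	limit   uint64        // maximum tolerated pending items
	pending func() uint64 // expensive count, e.g. a store scan
}

// allow reports whether production may proceed. The expensive pending()
// call runs only every checkInterval-th invocation when limit > checkInterval,
// so pending can exceed limit by at most checkInterval-1 between checks.
func (g *gate) allow() bool {
	g.calls++
	if g.limit > checkInterval && g.calls%checkInterval != 0 {
		return true // amortized: skip the expensive check this round
	}
	return g.pending() < g.limit
}

func main() {
	var produced uint64
	g := &gate{limit: 1000, pending: func() uint64 { return produced }}
	for i := 0; i < 200; i++ {
		if g.allow() {
			produced++ // stand-in for producing a block
		}
	}
	fmt.Println("produced:", produced) // 200: limit never reached
}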
diff --git a/pkg/store/batch.go b/pkg/store/batch.go index 405119c61..6222b85ba 100644 --- a/pkg/store/batch.go +++ b/pkg/store/batch.go @@ -43,14 +43,11 @@ func (b *DefaultBatch) SetHeight(height uint64) error { } heightBytes := encodeHeight(height) - return b.batch.Put(b.ctx, ds.NewKey(getHeightKey()), heightBytes) + return b.batch.Put(b.ctx, ds.RawKey(getHeightKey()), heightBytes) } // SaveBlockData saves block data to the batch func (b *DefaultBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - height := header.Height() - signatureHash := *signature - headerBlob, err := header.MarshalBinary() if err != nil { return fmt.Errorf("failed to marshal Header to binary: %w", err) @@ -59,20 +56,28 @@ func (b *DefaultBatch) SaveBlockData(header *types.SignedHeader, data *types.Dat if err != nil { return fmt.Errorf("failed to marshal Data to binary: %w", err) } + return b.SaveBlockDataFromBytes(header, headerBlob, dataBlob, signature) +} + +// SaveBlockDataFromBytes saves pre-serialized block data to the batch. +// This avoids re-marshalling header and data when the caller already has the binary blobs. +func (b *DefaultBatch) SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob, dataBlob []byte, signature *types.Signature) error { + height := header.Height() + signatureHash := *signature - if err := b.batch.Put(b.ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getHeaderKey(height)), headerBlob); err != nil { return fmt.Errorf("failed to put header blob in batch: %w", err) } - if err := b.batch.Put(b.ctx, ds.NewKey(getDataKey(height)), dataBlob); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getDataKey(height)), dataBlob); err != nil { return fmt.Errorf("failed to put data blob in batch: %w", err) } - if err := b.batch.Put(b.ctx, ds.NewKey(getSignatureKey(height)), signatureHash[:]); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getSignatureKey(height)), signatureHash[:]); err != nil { return fmt.Errorf("failed to put signature blob in batch: %w", err) } headerHash := sha256.Sum256(headerBlob) heightBytes := encodeHeight(height) - if err := b.batch.Put(b.ctx, ds.NewKey(getIndexKey(headerHash[:])), heightBytes); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getIndexKey(headerHash[:])), heightBytes); err != nil { return fmt.Errorf("failed to put index key in batch: %w", err) } @@ -93,7 +98,7 @@ func (b *DefaultBatch) UpdateState(state types.State) error { return fmt.Errorf("failed to marshal state to protobuf: %w", err) } - return b.batch.Put(b.ctx, ds.NewKey(getStateAtHeightKey(height)), data) + return b.batch.Put(b.ctx, ds.RawKey(getStateAtHeightKey(height)), data) } // Commit commits all batched operations atomically diff --git a/pkg/store/keys.go b/pkg/store/keys.go index f2aa45d8a..02053bb84 100644 --- a/pkg/store/keys.go +++ b/pkg/store/keys.go @@ -39,29 +39,40 @@ const ( heightPrefix = "t" ) +// heightKey builds a key like "/h/123" with minimal allocation using strconv.AppendUint. +func heightKey(prefix string, height uint64) string { + // Pre-allocate: "/" + prefix + "/" + max uint64 digits (20) + buf := make([]byte, 0, 2+len(prefix)+20) + buf = append(buf, '/') + buf = append(buf, prefix...) + buf = append(buf, '/') + buf = strconv.AppendUint(buf, height, 10) + return string(buf) +} + // GetHeaderKey returns the store key for a block header at the given height. 
func GetHeaderKey(height uint64) string { - return GenerateKey([]string{headerPrefix, strconv.FormatUint(height, 10)}) + return heightKey(headerPrefix, height) } -func getHeaderKey(height uint64) string { return GetHeaderKey(height) } +func getHeaderKey(height uint64) string { return heightKey(headerPrefix, height) } // GetDataKey returns the store key for block data at the given height. func GetDataKey(height uint64) string { - return GenerateKey([]string{dataPrefix, strconv.FormatUint(height, 10)}) + return heightKey(dataPrefix, height) } -func getDataKey(height uint64) string { return GetDataKey(height) } +func getDataKey(height uint64) string { return heightKey(dataPrefix, height) } // GetSignatureKey returns the store key for a block signature at the given height. func GetSignatureKey(height uint64) string { - return GenerateKey([]string{signaturePrefix, strconv.FormatUint(height, 10)}) + return heightKey(signaturePrefix, height) } -func getSignatureKey(height uint64) string { return GetSignatureKey(height) } +func getSignatureKey(height uint64) string { return heightKey(signaturePrefix, height) } func getStateAtHeightKey(height uint64) string { - return GenerateKey([]string{statePrefix, strconv.FormatUint(height, 10)}) + return heightKey(statePrefix, height) } // GetMetaKey returns the store key for a metadata entry. @@ -77,7 +88,7 @@ func GetIndexKey(hash types.Hash) string { func getIndexKey(hash types.Hash) string { return GetIndexKey(hash) } func getHeightKey() string { - return GenerateKey([]string{heightPrefix}) + return "/" + heightPrefix } // GetHeightToDAHeightHeaderKey returns the metadata key for storing the DA height diff --git a/pkg/store/kv.go b/pkg/store/kv.go index 95cad9770..3ee23bc2c 100644 --- a/pkg/store/kv.go +++ b/pkg/store/kv.go @@ -2,7 +2,6 @@ package store import ( "context" - "path" "path/filepath" "strings" @@ -42,8 +41,18 @@ func GetPrefixEntries(ctx context.Context, store ds.Datastore, prefix string) (d // GenerateKey creates a key from a slice of string fields, joining them with slashes. func GenerateKey(fields []string) string { - key := "/" + strings.Join(fields, "/") - return path.Clean(key) + // Pre-calculate total size to avoid re-allocation. 
+ n := 0 + for _, f := range fields { + n += 1 + len(f) // '/' + field + } + var b strings.Builder + b.Grow(n) + for _, f := range fields { + b.WriteByte('/') + b.WriteString(f) + } + return b.String() } // rootify works just like in cosmos-sdk diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 796f30289..037507b4e 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -324,6 +324,22 @@ func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data return nil } +func (b *tracedBatch) SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob, dataBlob []byte, signature *types.Signature) error { + _, span := b.tracer.Start(b.ctx, "Batch.SaveBlockDataFromBytes", + trace.WithAttributes(attribute.Int64("height", int64(header.Height()))), + ) + defer span.End() + + err := b.inner.SaveBlockDataFromBytes(header, headerBlob, dataBlob, signature) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + func (b *tracedBatch) SetHeight(height uint64) error { _, span := b.tracer.Start(b.ctx, "Batch.SetHeight", trace.WithAttributes(attribute.Int64("height", int64(height))), diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index 1109791c7..020a31e23 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -167,6 +167,11 @@ func (b *tracingMockBatch) SaveBlockData(header *types.SignedHeader, data *types return nil } +func (b *tracingMockBatch) SaveBlockDataFromBytes(header *types.SignedHeader, _, _ []byte, signature *types.Signature) error { + // Delegate to SaveBlockData for test mocking purposes. + return b.SaveBlockData(header, nil, signature) +} + func (b *tracingMockBatch) SetHeight(height uint64) error { if b.setHeightFn != nil { return b.setHeightFn(height) diff --git a/pkg/store/types.go b/pkg/store/types.go index 106c644e7..a461623ba 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -13,6 +13,10 @@ type Batch interface { // SaveBlockData atomically saves the block header, data, and signature SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error + // SaveBlockDataFromBytes is like SaveBlockData but accepts pre-serialized header and data bytes, + // avoiding re-marshalling when the caller already has the binary blobs. 
+ SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob, dataBlob []byte, signature *types.Signature) error + // SetHeight sets the height in the batch SetHeight(height uint64) error diff --git a/test/mocks/batch.go b/test/mocks/batch.go index 90025855b..2151c3c5f 100644 --- a/test/mocks/batch.go +++ b/test/mocks/batch.go @@ -252,6 +252,75 @@ func (_c *MockBatch_SaveBlockData_Call) RunAndReturn(run func(header *types.Sign return _c } +// SaveBlockDataFromBytes provides a mock function for the type MockBatch +func (_mock *MockBatch) SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob []byte, dataBlob []byte, signature *types.Signature) error { + ret := _mock.Called(header, headerBlob, dataBlob, signature) + + if len(ret) == 0 { + panic("no return value specified for SaveBlockDataFromBytes") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(*types.SignedHeader, []byte, []byte, *types.Signature) error); ok { + r0 = returnFunc(header, headerBlob, dataBlob, signature) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_SaveBlockDataFromBytes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveBlockDataFromBytes' +type MockBatch_SaveBlockDataFromBytes_Call struct { + *mock.Call +} + +// SaveBlockDataFromBytes is a helper method to define mock.On call +// - header *types.SignedHeader +// - headerBlob []byte +// - dataBlob []byte +// - signature *types.Signature +func (_e *MockBatch_Expecter) SaveBlockDataFromBytes(header interface{}, headerBlob interface{}, dataBlob interface{}, signature interface{}) *MockBatch_SaveBlockDataFromBytes_Call { + return &MockBatch_SaveBlockDataFromBytes_Call{Call: _e.mock.On("SaveBlockDataFromBytes", header, headerBlob, dataBlob, signature)} +} + +func (_c *MockBatch_SaveBlockDataFromBytes_Call) Run(run func(header *types.SignedHeader, headerBlob []byte, dataBlob []byte, signature *types.Signature)) *MockBatch_SaveBlockDataFromBytes_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *types.SignedHeader + if args[0] != nil { + arg0 = args[0].(*types.SignedHeader) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + var arg2 []byte + if args[2] != nil { + arg2 = args[2].([]byte) + } + var arg3 *types.Signature + if args[3] != nil { + arg3 = args[3].(*types.Signature) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *MockBatch_SaveBlockDataFromBytes_Call) Return(err error) *MockBatch_SaveBlockDataFromBytes_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_SaveBlockDataFromBytes_Call) RunAndReturn(run func(header *types.SignedHeader, headerBlob []byte, dataBlob []byte, signature *types.Signature) error) *MockBatch_SaveBlockDataFromBytes_Call { + _c.Call.Return(run) + return _c +} + // SetHeight provides a mock function for the type MockBatch func (_mock *MockBatch) SetHeight(height uint64) error { ret := _mock.Called(height) diff --git a/types/data.go b/types/data.go index 57e31046c..1345e4e82 100644 --- a/types/data.go +++ b/types/data.go @@ -152,3 +152,13 @@ func (d *Data) Validate() error { func (d *Data) Size() int { return proto.Size(d.ToProto()) } + +// TxsByteSize returns total byte size of all transactions without proto marshalling. +// This is much cheaper than Size() and useful for metrics/reporting. 
+func (d *Data) TxsByteSize() int { + n := 0 + for _, tx := range d.Txs { + n += len(tx) + } + return n +} diff --git a/types/serialization.go b/types/serialization.go index 0ee1ac87f..b304b61fb 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -76,7 +76,7 @@ func (sh *SignedHeader) ToProto() (*pb.SignedHeader, error) { }, nil } - pubKey, err := crypto.MarshalPublicKey(sh.Signer.PubKey) + pubKey, err := sh.Signer.MarshalledPubKey() if err != nil { return nil, err } @@ -114,8 +114,9 @@ func (sh *SignedHeader) FromProto(other *pb.SignedHeader) error { return err } sh.Signer = Signer{ - Address: append([]byte(nil), other.Signer.Address...), - PubKey: pubKey, + Address: append([]byte(nil), other.Signer.Address...), + PubKey: pubKey, + marshalledPubKey: append([]byte(nil), other.Signer.PubKey...), } } else { sh.Signer = Signer{} @@ -157,7 +158,7 @@ func (sh *SignedHeader) ToDAEnvelopeProto(envelopeSignature []byte) (*pb.DAHeade }, nil } - pubKey, err := crypto.MarshalPublicKey(sh.Signer.PubKey) + pubKey, err := sh.Signer.MarshalledPubKey() if err != nil { return nil, err } @@ -196,8 +197,9 @@ func (sh *SignedHeader) FromDAEnvelopeProto(envelope *pb.DAHeaderEnvelope) error return err } sh.Signer = Signer{ - Address: append([]byte(nil), envelope.Signer.Address...), - PubKey: pubKey, + Address: append([]byte(nil), envelope.Signer.Address...), + PubKey: pubKey, + marshalledPubKey: append([]byte(nil), envelope.Signer.PubKey...), } } else { sh.Signer = Signer{} @@ -361,6 +363,9 @@ func (d *Data) FromProto(other *pb.Data) error { // ToProto converts State into protobuf representation and returns it. func (s *State) ToProto() (*pb.State, error) { + // Construct the protobuf timestamp inline from the time components. + secs := s.LastBlockTime.Unix() + nanos := int32(s.LastBlockTime.Nanosecond()) return &pb.State{ Version: &pb.Version{ @@ -370,7 +375,7 @@ ChainId: s.ChainID, InitialHeight: s.InitialHeight, LastBlockHeight: s.LastBlockHeight, - LastBlockTime: timestamppb.New(s.LastBlockTime), + LastBlockTime: &timestamppb.Timestamp{Seconds: secs, Nanos: nanos}, DaHeight: s.DAHeight, AppHash: s.AppHash[:], LastHeaderHash: s.LastHeaderHash[:], @@ -440,7 +445,7 @@ func (sd *SignedData) ToProto() (*pb.SignedData, error) { var signerProto *pb.Signer if sd.Signer.PubKey != nil { - pubKey, err := crypto.MarshalPublicKey(sd.Signer.PubKey) + pubKey, err := sd.Signer.MarshalledPubKey() if err != nil { return nil, err } @@ -481,8 +486,9 @@ func (sd *SignedData) FromProto(other *pb.SignedData) error { return err } sd.Signer = Signer{ - Address: append([]byte(nil), other.Signer.Address...), - PubKey: pubKey, + Address: append([]byte(nil), other.Signer.Address...), + PubKey: pubKey, + marshalledPubKey: append([]byte(nil), other.Signer.PubKey...), } } else { sd.Signer = Signer{} diff --git a/types/signer.go b/types/signer.go index af8498a75..fa9391209 100644 --- a/types/signer.go +++ b/types/signer.go @@ -10,6 +10,25 @@ import ( type Signer struct { PubKey crypto.PubKey Address []byte + // marshalledPubKey caches the result of crypto.MarshalPublicKey to avoid + // repeated serialization in SignedHeader.ToProto(). + marshalledPubKey []byte +} + +// MarshalledPubKey returns the marshalled public key bytes, caching the result.
+func (s *Signer) MarshalledPubKey() ([]byte, error) { + if len(s.marshalledPubKey) > 0 { + return s.marshalledPubKey, nil + } + if s.PubKey == nil { + return nil, nil + } + bz, err := crypto.MarshalPublicKey(s.PubKey) + if err != nil { + return nil, err + } + s.marshalledPubKey = bz + return bz, nil } // NewSigner creates a new signer from a public key. @@ -19,10 +38,17 @@ func NewSigner(pubKey crypto.PubKey) (Signer, error) { return Signer{}, err } + // Pre-cache marshalled pub key for later use in ToProto. + marshalledPubKey, err := crypto.MarshalPublicKey(pubKey) + if err != nil { + return Signer{}, err + } + address := sha256.Sum256(bz) return Signer{ - PubKey: pubKey, - Address: address[:], + PubKey: pubKey, + Address: address[:], + marshalledPubKey: marshalledPubKey, }, nil } diff --git a/types/state.go b/types/state.go index bf535de70..714549a53 100644 --- a/types/state.go +++ b/types/state.go @@ -56,13 +56,21 @@ func (s *State) NextState(header Header, stateRoot []byte) (State, error) { // AssertValidForNextState performs common validation of a header and data against the current state. // It assumes any context-specific basic header checks and verifier setup have already been performed func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error { - if header.ChainID() != s.ChainID { - return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) + if err := s.AssertValidSequence(header); err != nil { + return err } if err := Validate(header, data); err != nil { return fmt.Errorf("header-data validation failed: %w", err) } + return nil +} + +// AssertValidSequence performs lightweight state-sequence validation (chain ID and hash linkage); it is shared by AssertValidForNextState and used directly for self-produced blocks. +func (s State) AssertValidSequence(header *SignedHeader) error { + if header.ChainID() != s.ChainID { + return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) + } if len(s.LastHeaderHash) == 0 { // initial state return nil
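Note on the key-building changes in pkg/store/keys.go: replacing GenerateKey's join-then-path.Clean pipeline with strconv.AppendUint into a pre-sized buffer yields keys like "/h/123" with a single string allocation, under the assumption that prefixes contain no slashes and need no cleaning. A runnable sketch of the same technique; heightKey mirrors the diff, while the prefix and main are illustrative.

package main

import (
	"fmt"
	"strconv"
)

// heightKey builds "/<prefix>/<height>" without intermediate strings.
func heightKey(prefix string, height uint64) string {
	// Pre-size for '/' + prefix + '/' + up to 20 decimal digits of a uint64.
	buf := make([]byte, 0, 2+len(prefix)+20)
	buf = append(buf, '/')
	buf = append(buf, prefix...)
	buf = append(buf, '/')
	buf = strconv.AppendUint(buf, height, 10) // appends digits in place
	return string(buf)                        // single copy into the final string
}

func main() {
	fmt.Println(heightKey("h", 123)) // /h/123
	// Matches the naive formatting it replaces, minus the extra allocations.
	fmt.Println(heightKey("h", 123) == fmt.Sprintf("/%s/%d", "h", 123)) // true
}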