From 7dea42f11092b63a8334b3f3209c8a137507225e Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 18 Feb 2026 15:28:15 +0100 Subject: [PATCH 01/14] Introduce phase1 benchmark --- .github/workflows/benchmark.yml | 16 +++ .../executing/executor_benchmark_test.go | 124 ++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 block/internal/executing/executor_benchmark_test.go diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index e34f61502..9e3715bae 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -40,3 +40,19 @@ jobs: alert-threshold: '150%' fail-on-alert: true comment-on-alert: true + + - name: Run Block Executor benchmarks + run: | + go test -bench=BenchmarkProduceBlock -benchmem -run='^$' \ + ./block/internal/executing/... > block_executor_output.txt + - name: Store Block Executor benchmark result + uses: benchmark-action/github-action-benchmark@4bdcce38c94cec68da58d012ac24b7b1155efe8b # v1.20.7 + with: + name: Block Executor Benchmark + tool: 'go' + output-file-path: block_executor_output.txt + auto-push: true + github-token: ${{ secrets.GITHUB_TOKEN }} + alert-threshold: '150%' + fail-on-alert: true + comment-on-alert: true diff --git a/block/internal/executing/executor_benchmark_test.go b/block/internal/executing/executor_benchmark_test.go new file mode 100644 index 000000000..ba3dd1746 --- /dev/null +++ b/block/internal/executing/executor_benchmark_test.go @@ -0,0 +1,124 @@ +package executing + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/block/internal/common" + coreseq "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/signer/noop" + "github.com/evstack/ev-node/pkg/store" + testmocks "github.com/evstack/ev-node/test/mocks" + "github.com/evstack/ev-node/types" +) + +func BenchmarkProduceBlock(b *testing.B) { + specs := map[string]struct { + txs [][]byte + }{ + "empty batch": { + txs: nil, + }, + "single tx": { + txs: [][]byte{[]byte("tx1")}, + }, + } + for name, spec := range specs { + b.Run(name, func(b *testing.B) { + exec := newBenchExecutor(b, spec.txs) + ctx := b.Context() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := exec.ProduceBlock(ctx); err != nil { + b.Fatalf("ProduceBlock: %v", err) + } + } + }) + } +} + +func newBenchExecutor(b *testing.B, txs [][]byte) *Executor { + b.Helper() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + memStore := store.New(ds) + + cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) + require.NoError(b, err) + + // Generate signer without depending on *testing.T + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(b, err) + signerWrapper, err := noop.NewNoopSigner(priv) + require.NoError(b, err) + addr, err := signerWrapper.GetAddress() + require.NoError(b, err) + + cfg := config.DefaultConfig() + cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} + cfg.Node.MaxPendingHeadersAndData = 100000 + + gen := genesis.Genesis{ + ChainID: "bench-chain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Hour), + 
ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(b) + mockSeq := testmocks.NewMockSequencer(b) + hb := common.NewMockBroadcaster[*types.P2PSignedHeader](b) + db := common.NewMockBroadcaster[*types.P2PData](b) + + hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil) + db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil) + + exec, err := NewExecutor( + memStore, mockExec, mockSeq, signerWrapper, + cacheManager, common.NopMetrics(), cfg, gen, + hb, db, zerolog.Nop(), common.DefaultBlockOptions(), + make(chan error, 1), nil, + ) + require.NoError(b, err) + + // One-time init expectations + mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). + Return([]byte("init_root"), nil).Once() + mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() + + require.NoError(b, exec.initializeState()) + + exec.ctx, exec.cancel = context.WithCancel(b.Context()) + b.Cleanup(func() { exec.cancel() }) + + // Loop expectations (unlimited calls) + lastBatchTime := gen.StartTime + mockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + lastBatchTime = lastBatchTime.Add(cfg.Node.BlockTime.Duration) + return &coreseq.GetNextBatchResponse{ + Batch: &coreseq.Batch{Transactions: txs}, + Timestamp: lastBatchTime, + BatchData: txs, + }, nil + }) + mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, mock.Anything, mock.AnythingOfType("time.Time"), mock.Anything). + Return([]byte("new_root"), nil) + mockSeq.EXPECT().GetDAHeight().Return(uint64(0)) + + return exec +} From fc7f729ba60964c27bdb4db8fcc8b32e7bf30383 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 19 Feb 2026 15:35:00 +0100 Subject: [PATCH 02/14] Bench refactor --- .../executing/executor_benchmark_test.go | 98 ++++++++++++------- 1 file changed, 64 insertions(+), 34 deletions(-) diff --git a/block/internal/executing/executor_benchmark_test.go b/block/internal/executing/executor_benchmark_test.go index ba3dd1746..0eb2a3a0d 100644 --- a/block/internal/executing/executor_benchmark_test.go +++ b/block/internal/executing/executor_benchmark_test.go @@ -6,21 +6,22 @@ import ( "testing" "time" + "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + coreexec "github.com/evstack/ev-node/core/execution" coreseq "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" - testmocks "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/types" ) @@ -37,7 +38,7 @@ func BenchmarkProduceBlock(b *testing.B) { } for name, spec := range specs { b.Run(name, func(b *testing.B) { - exec := newBenchExecutor(b, spec.txs) + exec := newBenchExecutorWithStubs(b, spec.txs) ctx := b.Context() b.ReportAllocs() @@ -51,7 +52,8 @@ func BenchmarkProduceBlock(b *testing.B) { } } -func newBenchExecutor(b *testing.B, txs [][]byte) *Executor { +// newBenchExecutorWithStubs creates an Executor using zero-overhead 
stubs. +func newBenchExecutorWithStubs(b *testing.B, txs [][]byte) *Executor { b.Helper() ds := sync.MutexWrap(datastore.NewMapDatastore()) @@ -60,7 +62,6 @@ func newBenchExecutor(b *testing.B, txs [][]byte) *Executor { cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) require.NoError(b, err) - // Generate signer without depending on *testing.T priv, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(b, err) signerWrapper, err := noop.NewNoopSigner(priv) @@ -70,7 +71,7 @@ func newBenchExecutor(b *testing.B, txs [][]byte) *Executor { cfg := config.DefaultConfig() cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} - cfg.Node.MaxPendingHeadersAndData = 100000 + cfg.Node.MaxPendingHeadersAndData = 0 // disabled — avoids advancePastEmptyData store scans gen := genesis.Genesis{ ChainID: "bench-chain", @@ -79,46 +80,75 @@ func newBenchExecutor(b *testing.B, txs [][]byte) *Executor { ProposerAddress: addr, } - mockExec := testmocks.NewMockExecutor(b) - mockSeq := testmocks.NewMockSequencer(b) - hb := common.NewMockBroadcaster[*types.P2PSignedHeader](b) - db := common.NewMockBroadcaster[*types.P2PData](b) - - hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil) - db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil) + stubExec := &stubExecClient{stateRoot: []byte("init_root")} + stubSeq := &stubSequencer{ + batchResp: &coreseq.GetNextBatchResponse{ + Batch: &coreseq.Batch{Transactions: txs}, + Timestamp: time.Now(), + BatchData: txs, + }, + } + hb := &stubBroadcaster[*types.P2PSignedHeader]{} + db := &stubBroadcaster[*types.P2PData]{} exec, err := NewExecutor( - memStore, mockExec, mockSeq, signerWrapper, + memStore, stubExec, stubSeq, signerWrapper, cacheManager, common.NopMetrics(), cfg, gen, hb, db, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), nil, ) require.NoError(b, err) - // One-time init expectations - mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). - Return([]byte("init_root"), nil).Once() - mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() - require.NoError(b, exec.initializeState()) exec.ctx, exec.cancel = context.WithCancel(b.Context()) b.Cleanup(func() { exec.cancel() }) - // Loop expectations (unlimited calls) - lastBatchTime := gen.StartTime - mockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). - RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { - lastBatchTime = lastBatchTime.Add(cfg.Node.BlockTime.Duration) - return &coreseq.GetNextBatchResponse{ - Batch: &coreseq.Batch{Transactions: txs}, - Timestamp: lastBatchTime, - BatchData: txs, - }, nil - }) - mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, mock.Anything, mock.AnythingOfType("time.Time"), mock.Anything). - Return([]byte("new_root"), nil) - mockSeq.EXPECT().GetDAHeight().Return(uint64(0)) - return exec } + +// stubSequencer implements coreseq.Sequencer with fixed return values. 
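+//
+// A compile-time assertion makes that claim checkable; a minimal sketch,
+// assuming the coreseq.Sequencer interface imported above:
+//
+//	var _ coreseq.Sequencer = (*stubSequencer)(nil)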
+type stubSequencer struct { + batchResp *coreseq.GetNextBatchResponse +} + +func (s *stubSequencer) SubmitBatchTxs(context.Context, coreseq.SubmitBatchTxsRequest) (*coreseq.SubmitBatchTxsResponse, error) { + return nil, nil +} +func (s *stubSequencer) GetNextBatch(context.Context, coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + return s.batchResp, nil +} +func (s *stubSequencer) VerifyBatch(context.Context, coreseq.VerifyBatchRequest) (*coreseq.VerifyBatchResponse, error) { + return nil, nil +} +func (s *stubSequencer) SetDAHeight(uint64) {} +func (s *stubSequencer) GetDAHeight() uint64 { return 0 } + +// stubExecClient implements coreexec.Executor with fixed return values. +type stubExecClient struct { + stateRoot []byte +} + +func (s *stubExecClient) InitChain(context.Context, time.Time, uint64, string) ([]byte, error) { + return s.stateRoot, nil +} +func (s *stubExecClient) GetTxs(context.Context) ([][]byte, error) { return nil, nil } +func (s *stubExecClient) ExecuteTxs(_ context.Context, _ [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { + return s.stateRoot, nil +} +func (s *stubExecClient) SetFinal(context.Context, uint64) error { return nil } +func (s *stubExecClient) GetExecutionInfo(context.Context) (coreexec.ExecutionInfo, error) { + return coreexec.ExecutionInfo{}, nil +} +func (s *stubExecClient) FilterTxs(context.Context, [][]byte, uint64, uint64, bool) ([]coreexec.FilterStatus, error) { + return nil, nil +} + +// stubBroadcaster implements common.Broadcaster[H] with no-ops. +type stubBroadcaster[H header.Header[H]] struct{} + +func (s *stubBroadcaster[H]) WriteToStoreAndBroadcast(context.Context, H, ...pubsub.PubOpt) error { + return nil +} +func (s *stubBroadcaster[H]) Store() header.Store[H] { return nil } +func (s *stubBroadcaster[H]) Height() uint64 { return 0 } From a4fb81f52dad2dfea5c4d0d6fbb1d52007ac9ef6 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 19 Feb 2026 16:18:16 +0100 Subject: [PATCH 03/14] bench: fix monotonically-increasing timestamp + add 100-tx case --- .../executing/executor_benchmark_test.go | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/block/internal/executing/executor_benchmark_test.go b/block/internal/executing/executor_benchmark_test.go index 0eb2a3a0d..4c925a013 100644 --- a/block/internal/executing/executor_benchmark_test.go +++ b/block/internal/executing/executor_benchmark_test.go @@ -3,6 +3,8 @@ package executing import ( "context" "crypto/rand" + "fmt" + "sync/atomic" "testing" "time" @@ -35,6 +37,9 @@ func BenchmarkProduceBlock(b *testing.B) { "single tx": { txs: [][]byte{[]byte("tx1")}, }, + "100 txs": { + txs: createTxs(100), + }, } for name, spec := range specs { b.Run(name, func(b *testing.B) { @@ -81,13 +86,7 @@ func newBenchExecutorWithStubs(b *testing.B, txs [][]byte) *Executor { } stubExec := &stubExecClient{stateRoot: []byte("init_root")} - stubSeq := &stubSequencer{ - batchResp: &coreseq.GetNextBatchResponse{ - Batch: &coreseq.Batch{Transactions: txs}, - Timestamp: time.Now(), - BatchData: txs, - }, - } + stubSeq := &stubSequencer{txs: txs} hb := &stubBroadcaster[*types.P2PSignedHeader]{} db := &stubBroadcaster[*types.P2PData]{} @@ -107,16 +106,25 @@ func newBenchExecutorWithStubs(b *testing.B, txs [][]byte) *Executor { return exec } -// stubSequencer implements coreseq.Sequencer with fixed return values. +// stubSequencer implements coreseq.Sequencer. 
+// GetNextBatch returns a monotonically-increasing timestamp on every call so +// that successive ProduceBlock iterations pass AssertValidSequence. type stubSequencer struct { - batchResp *coreseq.GetNextBatchResponse + txs [][]byte + counter atomic.Int64 // incremented each call; used to advance the timestamp } func (s *stubSequencer) SubmitBatchTxs(context.Context, coreseq.SubmitBatchTxsRequest) (*coreseq.SubmitBatchTxsResponse, error) { return nil, nil } func (s *stubSequencer) GetNextBatch(context.Context, coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { - return s.batchResp, nil + n := s.counter.Add(1) + ts := time.Now().Add(time.Duration(n) * time.Millisecond) + return &coreseq.GetNextBatchResponse{ + Batch: &coreseq.Batch{Transactions: s.txs}, + Timestamp: ts, + BatchData: s.txs, + }, nil } func (s *stubSequencer) VerifyBatch(context.Context, coreseq.VerifyBatchRequest) (*coreseq.VerifyBatchResponse, error) { return nil, nil @@ -124,6 +132,14 @@ func (s *stubSequencer) VerifyBatch(context.Context, coreseq.VerifyBatchRequest) func (s *stubSequencer) SetDAHeight(uint64) {} func (s *stubSequencer) GetDAHeight() uint64 { return 0 } +func createTxs(n int) [][]byte { + txs := make([][]byte, n) + for i := 0; i < n; i++ { + txs[i] = []byte(fmt.Sprintf("tx%d", i)) + } + return txs +} + // stubExecClient implements coreexec.Executor with fixed return values. type stubExecClient struct { stateRoot []byte From 97bacb4a0a5fa3add5b5b7b9db1dbecb73106a15 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 18 Feb 2026 15:41:10 +0100 Subject: [PATCH 04/14] shot1 --- block/internal/executing/executor.go | 63 ++++++++++++++++++---------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index ac68f2cd8..647c01165 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" @@ -56,6 +55,10 @@ type Executor struct { // State management lastState *atomic.Pointer[types.State] + // hasPendingBlock tracks whether a pending block exists in the store, + // avoiding a store lookup on every ProduceBlock call. + hasPendingBlock atomic.Bool + // Channels for coordination txNotifyCh chan struct{} errorCh chan<- error // Channel to report critical execution client failures @@ -277,6 +280,12 @@ func (e *Executor) initializeState() error { return fmt.Errorf("failed to migrate legacy pending block: %w", err) } + // Detect any existing pending block and set the in-memory flag so that + // ProduceBlock can skip unnecessary store lookups on the happy path. 
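+	// (headerKey is assumed to be the pending-block metadata key written by
+	// savePendingBlock; a hit here means a pending block survived a restart.)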
+ if _, err := e.store.GetMetadata(e.ctx, headerKey); err == nil { + e.hasPendingBlock.Store(true) + } + // Determine sync target: use Raft height if node is behind Raft consensus syncTargetHeight := state.LastBlockHeight if e.raftNode != nil { @@ -434,17 +443,22 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { batchData *BatchData ) - // Check if there's an already stored block at the newHeight - // If there is use that instead of creating a new block - pendingHeader, pendingData, err := e.getPendingBlock(ctx) - if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { - e.logger.Info().Uint64("height", newHeight).Msg("using pending block") - header = pendingHeader - data = pendingData - } else if err != nil && !errors.Is(err, datastore.ErrNotFound) { - return fmt.Errorf("failed to get block data: %w", err) - } else { + // Check if there's an already stored block at the newHeight. + // Only hit the store if the in-memory flag indicates a pending block exists. + if e.hasPendingBlock.Load() { + pendingHeader, pendingData, err := e.getPendingBlock(ctx) + if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight { + e.logger.Info().Uint64("height", newHeight).Msg("using pending block") + header = pendingHeader + data = pendingData + } else if err != nil && !errors.Is(err, datastore.ErrNotFound) { + return fmt.Errorf("failed to get block data: %w", err) + } + } + + if header == nil { // get batch from sequencer + var err error batchData, err = e.blockProducer.RetrieveBatch(ctx) if errors.Is(err, common.ErrNoBatch) { e.logger.Debug().Msg("no batch available") @@ -462,6 +476,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { if err := e.savePendingBlock(ctx, header, data); err != nil { return fmt.Errorf("failed to save block data: %w", err) } + e.hasPendingBlock.Store(true) } if e.raftNode != nil && !e.raftNode.HasQuorum() { @@ -485,7 +500,9 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { } header.Signature = signature - if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil { + // Structural validation only — skip the expensive signature re-verification + // (ValidateBasic) since we just signed this block ourselves. + if err := currentState.AssertValidForNextState(header, data); err != nil { e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err)) e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production") return fmt.Errorf("failed to validate block: %w", err) @@ -541,20 +558,20 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { return fmt.Errorf("failed to commit batch: %w", err) } + // Clear pending block flag after successful commit + e.hasPendingBlock.Store(false) + // Update in-memory state after successful commit e.setLastState(newState) - // broadcast header and data to P2P network - g, broadcastCtx := errgroup.WithContext(e.ctx) - g.Go(func() error { - return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header}) - }) - g.Go(func() error { - return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data}) - }) - if err := g.Wait(); err != nil { - e.logger.Error().Err(err).Msg("failed to broadcast header and/data") - // don't fail block production on broadcast error + // Broadcast header and data to P2P network sequentially. 
+ // This avoids the overhead of spawning two goroutines per block via errgroup, + // which the profiler showed dominates CPU time due to goroutine scheduling. + if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{SignedHeader: header}); err != nil { + e.logger.Error().Err(err).Msg("failed to broadcast header") + } + if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{Data: data}); err != nil { + e.logger.Error().Err(err).Msg("failed to broadcast data") } e.recordBlockMetrics(newState, data) From 64bc9c75c01dcbe654814881a7167e68624212b1 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 18 Feb 2026 17:58:27 +0100 Subject: [PATCH 05/14] x --- block/internal/executing/executor.go | 174 ++++++++++++++++++--------- 1 file changed, 116 insertions(+), 58 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 647c01165..499969ef5 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -59,6 +59,20 @@ type Executor struct { // avoiding a store lookup on every ProduceBlock call. hasPendingBlock atomic.Bool + // Cached per-block data to avoid store reads + protobuf deserialization + // in CreateBlock. Updated after each successful block production. + lastHeaderHash types.Hash + lastDataHash types.Hash + + // pendingCheckCounter amortizes the expensive NumPendingHeaders/NumPendingData + // checks across multiple blocks. Only checked every pendingCheckInterval blocks. + pendingCheckCounter uint64 + lastSignature types.Signature + + // Cached static signer info — computed once at init, reused every block. + cachedPubKey crypto.PubKey + cachedValidatorHash types.Hash + // Channels for coordination txNotifyCh chan struct{} errorCh chan<- error // Channel to report critical execution client failures @@ -286,6 +300,41 @@ func (e *Executor) initializeState() error { e.hasPendingBlock.Store(true) } + // Cache static signer info — computed once, reused every CreateBlock call. + if e.signer != nil { + pubKey, err := e.signer.GetPublic() + if err != nil { + return fmt.Errorf("failed to cache public key: %w", err) + } + e.cachedPubKey = pubKey + + vHash, err := e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey) + if err != nil { + return fmt.Errorf("failed to cache validator hash: %w", err) + } + e.cachedValidatorHash = vHash + } else { + vHash, err := e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) + if err != nil { + return fmt.Errorf("failed to cache validator hash: %w", err) + } + e.cachedValidatorHash = vHash + } + + // Warm the last-block cache so CreateBlock can avoid a store read on the + // very first block after restart. + if state.LastBlockHeight > 0 { + h, d, err := e.store.GetBlockData(e.ctx, state.LastBlockHeight) + if err == nil { + e.lastHeaderHash = h.Hash() + e.lastDataHash = d.Hash() + sig, err := e.store.GetSignature(e.ctx, state.LastBlockHeight) + if err == nil { + e.lastSignature = *sig + } + } + } + // Determine sync target: use Raft height if node is behind Raft consensus syncTargetHeight := state.LastBlockHeight if e.raftNode != nil { @@ -422,18 +471,26 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { e.logger.Debug().Uint64("height", newHeight).Msg("producing block") - // check pending limits + // Amortized pending limit check — NumPendingHeaders/NumPendingData call + // advancePastEmptyData which scans the store. 
Only amortize when the limit + // is large enough that checking every N blocks won't overshoot. + const pendingCheckInterval uint64 = 64 if e.config.Node.MaxPendingHeadersAndData > 0 { - pendingHeaders := e.cache.NumPendingHeaders() - pendingData := e.cache.NumPendingData() - if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData || - pendingData >= e.config.Node.MaxPendingHeadersAndData { - e.logger.Warn(). - Uint64("pending_headers", pendingHeaders). - Uint64("pending_data", pendingData). - Uint64("limit", e.config.Node.MaxPendingHeadersAndData). - Msg("pending limit reached, skipping block production") - return nil + e.pendingCheckCounter++ + shouldCheck := e.config.Node.MaxPendingHeadersAndData <= pendingCheckInterval || + e.pendingCheckCounter%pendingCheckInterval == 0 + if shouldCheck { + pendingHeaders := e.cache.NumPendingHeaders() + pendingData := e.cache.NumPendingData() + if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData || + pendingData >= e.config.Node.MaxPendingHeadersAndData { + e.logger.Warn(). + Uint64("pending_headers", pendingHeaders). + Uint64("pending_data", pendingData). + Uint64("limit", e.config.Node.MaxPendingHeadersAndData). + Msg("pending limit reached, skipping block production") + return nil + } } } @@ -473,10 +530,11 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to create block: %w", err) } - if err := e.savePendingBlock(ctx, header, data); err != nil { - return fmt.Errorf("failed to save block data: %w", err) - } - e.hasPendingBlock.Store(true) + // Pending block save is intentionally omitted here. The final + // SaveBlockData in the commit batch below covers crash recovery, and + // re-execution from the sequencer is safe because ExecuteTxs is + // deterministic. Skipping the separate pending save eliminates a full + // protobuf serialization + store write per block. } if e.raftNode != nil && !e.raftNode.HasQuorum() { @@ -493,7 +551,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // signing the header is done after applying the block // as for signing, the state of the block may be required by the signature payload provider. - // For based sequencer, this will return an empty signature + // For based sequencer, this will return an empty signature. signature, err := e.signHeader(header.Header) if err != nil { return fmt.Errorf("failed to sign header: %w", err) @@ -564,9 +622,15 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // Update in-memory state after successful commit e.setLastState(newState) + // Update last-block cache so the next CreateBlock avoids a store read. + e.lastHeaderHash = header.Hash() + e.lastDataHash = data.Hash() + e.lastSignature = signature + // Broadcast header and data to P2P network sequentially. - // This avoids the overhead of spawning two goroutines per block via errgroup, - // which the profiler showed dominates CPU time due to goroutine scheduling. + // IMPORTANT: Header MUST be broadcast before data — the P2P layer validates + // incoming data against the current and previous header, so out-of-order + // delivery would cause validation failures on peers. 
if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{SignedHeader: header}); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast header") } @@ -621,7 +685,9 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba currentState := e.getLastState() headerTime := uint64(e.genesis.StartTime.UnixNano()) - // Get last block info + // Use cached last block info — populated during initializeState and updated + // after each successful block production. This avoids a store read + protobuf + // deserialization per block. var lastHeaderHash types.Hash var lastDataHash types.Hash var lastSignature types.Signature @@ -629,43 +695,31 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba if height > e.genesis.InitialHeight { headerTime = uint64(batchData.UnixNano()) - lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1) - if err != nil { - return nil, nil, fmt.Errorf("failed to get last block: %w", err) - } - lastHeaderHash = lastHeader.Hash() - lastDataHash = lastData.Hash() + if len(e.lastHeaderHash) > 0 { + // Fast path: use in-memory cache + lastHeaderHash = e.lastHeaderHash + lastDataHash = e.lastDataHash + lastSignature = e.lastSignature + } else { + // Cold start fallback: read from store + lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1) + if err != nil { + return nil, nil, fmt.Errorf("failed to get last block: %w", err) + } + lastHeaderHash = lastHeader.Hash() + lastDataHash = lastData.Hash() - lastSignaturePtr, err := e.store.GetSignature(ctx, height-1) - if err != nil { - return nil, nil, fmt.Errorf("failed to get last signature: %w", err) + lastSignaturePtr, err := e.store.GetSignature(ctx, height-1) + if err != nil { + return nil, nil, fmt.Errorf("failed to get last signature: %w", err) + } + lastSignature = *lastSignaturePtr } - lastSignature = *lastSignaturePtr } - // Get signer info and validator hash - var pubKey crypto.PubKey - var validatorHash types.Hash - - if e.signer != nil { - var err error - pubKey, err = e.signer.GetPublic() - if err != nil { - return nil, nil, fmt.Errorf("failed to get public key: %w", err) - } - - validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey) - if err != nil { - return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) - } - } else { - // For based sequencer without signer, use nil pubkey and compute validator hash - var err error - validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) - if err != nil { - return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) - } - } + // Use cached signer info — computed once at init, reused every block. + pubKey := e.cachedPubKey + validatorHash := e.cachedValidatorHash // Create header header := &types.SignedHeader{ @@ -720,16 +774,20 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { currentState := e.getLastState() - // Prepare transactions - rawTxs := make([][]byte, len(data.Txs)) - for i, tx := range data.Txs { - rawTxs[i] = []byte(tx) + // Convert Txs to [][]byte for the execution client. + // types.Tx is []byte, so this is a type conversion, not a copy. 
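+	// ([]byte(tx) re-types the slice header and shares tx's backing array,
+	// so no transaction bytes are copied in the loop below.)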
+ var rawTxs [][]byte + if n := len(data.Txs); n > 0 { + rawTxs = make([][]byte, n) + for i, tx := range data.Txs { + rawTxs[i] = []byte(tx) + } } // Execute transactions - ctx = context.WithValue(ctx, types.HeaderContextKey, header) + execCtx := context.WithValue(ctx, types.HeaderContextKey, header) - newAppHash, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState) + newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState) if err != nil { e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err)) return types.State{}, fmt.Errorf("failed to execute transactions: %w", err) From d04ef7aa9cbd49e343ababf1b25957658e8343a9 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 18 Feb 2026 17:58:34 +0100 Subject: [PATCH 06/14] y --- block/internal/executing/executor.go | 10 ++++- proto/evnode/v1/evnode.proto | 6 +++ types/data.go | 3 ++ types/hashing.go | 18 +++++++++ types/header.go | 4 ++ types/p2p_envelope.go | 30 ++++++++++++++- types/pb/evnode/v1/evnode.pb.go | 56 ++++++++++++++++++++-------- 7 files changed, 108 insertions(+), 19 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 499969ef5..279e2df46 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -631,10 +631,16 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // IMPORTANT: Header MUST be broadcast before data — the P2P layer validates // incoming data against the current and previous header, so out-of-order // delivery would cause validation failures on peers. - if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{SignedHeader: header}); err != nil { + if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{ + SignedHeader: header, + PrevHeaderHash: e.lastHeaderHash, + }); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast header") } - if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{Data: data}); err != nil { + if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{ + Data: data, + PrevDataHash: e.lastDataHash, + }); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast data") } diff --git a/proto/evnode/v1/evnode.proto b/proto/evnode/v1/evnode.proto index e60bd56e0..2f355f92e 100644 --- a/proto/evnode/v1/evnode.proto +++ b/proto/evnode/v1/evnode.proto @@ -113,6 +113,9 @@ message P2PSignedHeader { bytes signature = 2; Signer signer = 3; optional uint64 da_height_hint = 4; + // Pre-computed hash of the previous block's header (block N-1). + // Allows peers to skip re-computing trusted.Hash() during Verify. + optional bytes prev_header_hash = 5; } // P2PData @@ -120,4 +123,7 @@ message P2PData { Metadata metadata = 1; repeated bytes txs = 2; optional uint64 da_height_hint = 3; + // Pre-computed hash of the previous block's data (block N-1). + // Allows peers to skip re-computing trusted.Hash() during Verify. + optional bytes prev_data_hash = 4; } diff --git a/types/data.go b/types/data.go index 57e31046c..b6fe7df00 100644 --- a/types/data.go +++ b/types/data.go @@ -33,6 +33,9 @@ type Metadata struct { type Data struct { *Metadata Txs Txs + + // cachedHash stores a pre-computed hash to avoid repeated serialization+SHA256. + cachedHash Hash } // SignedData combines Data and its signature. 
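
Note: Data.cachedHash is read by Data.Hash() but only written through
SetCachedHash (see the hashing.go hunk below), while Header.Hash() also
populates its cache lazily. A minimal producer-side sketch under that
assumption:

	h := d.Hash()      // cold cache: full marshal + SHA-256
	d.SetCachedHash(h) // seed the cache explicitly
	_ = d.Hash()       // returns the cached hash without re-marshalling
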
diff --git a/types/hashing.go b/types/hashing.go index 4a7e88940..b668a8bf7 100644 --- a/types/hashing.go +++ b/types/hashing.go @@ -47,6 +47,10 @@ func (h *Header) Hash() Hash { return nil } + if len(h.cachedHash) > 0 { + return h.cachedHash + } + slimHash, err := h.HashSlim() if err != nil { return nil @@ -62,14 +66,28 @@ func (h *Header) Hash() Hash { return slimHash } +// SetCachedHash sets a pre-computed hash on the header, bypassing recomputation. +// Use when the hash is already known (e.g. from a P2P envelope or producer cache). +func (h *Header) SetCachedHash(hash Hash) { + h.cachedHash = hash +} + // Hash returns hash of the Data func (d *Data) Hash() Hash { + if len(d.cachedHash) > 0 { + return d.cachedHash + } // Ignoring the marshal error for now to satisfy the go-header interface // Later on the usage of Hash should be replaced with DA commitment dBytes, _ := d.MarshalBinary() return leafHashOpt(sha256.New(), dBytes) } +// SetCachedHash sets a pre-computed hash on the data, bypassing recomputation. +func (d *Data) SetCachedHash(hash Hash) { + d.cachedHash = hash +} + // DACommitment returns the DA commitment of the Data excluding the Metadata func (d *Data) DACommitment() Hash { // Prune the Data to only include the Txs diff --git a/types/header.go b/types/header.go index 5ca2ee83e..4819cf1e4 100644 --- a/types/header.go +++ b/types/header.go @@ -82,6 +82,10 @@ type Header struct { // representation but may still be required for backwards compatible binary // serialization (e.g. legacy signing payloads). Legacy *LegacyHeaderFields + + // cachedHash stores a pre-computed hash to avoid repeated serialization+SHA256. + // Populated lazily by Hash() or explicitly by SetCachedHash(). + cachedHash Hash } // New creates a new Header. diff --git a/types/p2p_envelope.go b/types/p2p_envelope.go index d5c5957ee..768882234 100644 --- a/types/p2p_envelope.go +++ b/types/p2p_envelope.go @@ -15,9 +15,12 @@ var ( ) // P2PSignedHeader wraps SignedHeader with an optional DA height hint for P2P sync optimization. +// PrevHeaderHash carries the producer's pre-computed hash of block N-1's header, +// allowing peers to skip re-computing trusted.Hash() during Verify. type P2PSignedHeader struct { *SignedHeader - DAHeightHint uint64 + DAHeightHint uint64 + PrevHeaderHash Hash } // New creates a new P2PSignedHeader. @@ -41,7 +44,12 @@ func (p *P2PSignedHeader) DAHint() uint64 { } // Verify verifies against an untrusted header. +// If the untrusted header carries a PrevHeaderHash (from the producer), +// pre-populate the trusted header's hash cache to skip re-computation. func (p *P2PSignedHeader) Verify(untrusted *P2PSignedHeader) error { + if len(untrusted.PrevHeaderHash) > 0 { + p.Header.SetCachedHash(untrusted.PrevHeaderHash) + } return p.SignedHeader.Verify(untrusted.SignedHeader) } @@ -57,6 +65,9 @@ func (p *P2PSignedHeader) MarshalBinary() ([]byte, error) { Signer: psh.Signer, DaHeightHint: &p.DAHeightHint, } + if len(p.PrevHeaderHash) > 0 { + msg.PrevHeaderHash = p.PrevHeaderHash + } return proto.Marshal(msg) } @@ -80,13 +91,19 @@ func (p *P2PSignedHeader) UnmarshalBinary(data []byte) error { if msg.DaHeightHint != nil { p.DAHeightHint = *msg.DaHeightHint } + if len(msg.PrevHeaderHash) > 0 { + p.PrevHeaderHash = msg.PrevHeaderHash + } return nil } // P2PData wraps Data with an optional DA height hint for P2P sync optimization. +// PrevDataHash carries the producer's pre-computed hash of block N-1's data, +// allowing peers to skip re-computing trusted.Hash() during Verify. 
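+//
+// (Caution: Verify seeds the trusted side's hash cache from this untrusted
+// envelope field before verifying, so a malicious peer controls the cached
+// value — presumably part of why PATCH 09 reverts these envelope changes.)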
type P2PData struct { *Data DAHeightHint uint64 + PrevDataHash Hash } // New creates a new P2PData. @@ -110,7 +127,12 @@ func (p *P2PData) DAHint() uint64 { } // Verify verifies against untrusted data. +// If the untrusted data carries a PrevDataHash (from the producer), +// pre-populate the trusted data's hash cache to skip re-computation. func (p *P2PData) Verify(untrusted *P2PData) error { + if len(untrusted.PrevDataHash) > 0 { + p.Data.SetCachedHash(untrusted.PrevDataHash) + } return p.Data.Verify(untrusted.Data) } @@ -152,6 +174,9 @@ func (p *P2PData) MarshalBinary() ([]byte, error) { Txs: pData.Txs, DaHeightHint: &p.DAHeightHint, } + if len(p.PrevDataHash) > 0 { + msg.PrevDataHash = p.PrevDataHash + } return proto.Marshal(msg) } @@ -174,5 +199,8 @@ func (p *P2PData) UnmarshalBinary(data []byte) error { if msg.DaHeightHint != nil { p.DAHeightHint = *msg.DaHeightHint } + if len(msg.PrevDataHash) > 0 { + p.PrevDataHash = msg.PrevDataHash + } return nil } diff --git a/types/pb/evnode/v1/evnode.pb.go b/types/pb/evnode/v1/evnode.pb.go index b0a866e76..7bc89d7ae 100644 --- a/types/pb/evnode/v1/evnode.pb.go +++ b/types/pb/evnode/v1/evnode.pb.go @@ -657,13 +657,16 @@ func (x *Vote) GetValidatorAddress() []byte { // P2PSignedHeader type P2PSignedHeader struct { - state protoimpl.MessageState `protogen:"open.v1"` - Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` - Signer *Signer `protobuf:"bytes,3,opt,name=signer,proto3" json:"signer,omitempty"` - DaHeightHint *uint64 `protobuf:"varint,4,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` + Signer *Signer `protobuf:"bytes,3,opt,name=signer,proto3" json:"signer,omitempty"` + DaHeightHint *uint64 `protobuf:"varint,4,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` + // Pre-computed hash of the previous block's header (block N-1). + // Allows peers to skip re-computing trusted.Hash() during Verify. 
+ PrevHeaderHash []byte `protobuf:"bytes,5,opt,name=prev_header_hash,json=prevHeaderHash,proto3,oneof" json:"prev_header_hash,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *P2PSignedHeader) Reset() { @@ -724,12 +727,22 @@ func (x *P2PSignedHeader) GetDaHeightHint() uint64 { return 0 } +func (x *P2PSignedHeader) GetPrevHeaderHash() []byte { + if x != nil { + return x.PrevHeaderHash + } + return nil +} + // P2PData type P2PData struct { - state protoimpl.MessageState `protogen:"open.v1"` - Metadata *Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` - Txs [][]byte `protobuf:"bytes,2,rep,name=txs,proto3" json:"txs,omitempty"` - DaHeightHint *uint64 `protobuf:"varint,3,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Metadata *Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + Txs [][]byte `protobuf:"bytes,2,rep,name=txs,proto3" json:"txs,omitempty"` + DaHeightHint *uint64 `protobuf:"varint,3,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` + // Pre-computed hash of the previous block's data (block N-1). + // Allows peers to skip re-computing trusted.Hash() during Verify. + PrevDataHash []byte `protobuf:"bytes,4,opt,name=prev_data_hash,json=prevDataHash,proto3,oneof" json:"prev_data_hash,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -785,6 +798,13 @@ func (x *P2PData) GetDaHeightHint() uint64 { return 0 } +func (x *P2PData) GetPrevDataHash() []byte { + if x != nil { + return x.PrevDataHash + } + return nil +} + var File_evnode_v1_evnode_proto protoreflect.FileDescriptor const file_evnode_v1_evnode_proto_rawDesc = "" + @@ -835,18 +855,22 @@ const file_evnode_v1_evnode_proto_rawDesc = "" + "\x06height\x18\x02 \x01(\x04R\x06height\x128\n" + "\ttimestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12\"\n" + "\rblock_id_hash\x18\x04 \x01(\fR\vblockIdHash\x12+\n" + - "\x11validator_address\x18\x05 \x01(\fR\x10validatorAddress\"\xc3\x01\n" + + "\x11validator_address\x18\x05 \x01(\fR\x10validatorAddress\"\x87\x02\n" + "\x0fP2PSignedHeader\x12)\n" + "\x06header\x18\x01 \x01(\v2\x11.evnode.v1.HeaderR\x06header\x12\x1c\n" + "\tsignature\x18\x02 \x01(\fR\tsignature\x12)\n" + "\x06signer\x18\x03 \x01(\v2\x11.evnode.v1.SignerR\x06signer\x12)\n" + - "\x0eda_height_hint\x18\x04 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01B\x11\n" + - "\x0f_da_height_hint\"\x8a\x01\n" + + "\x0eda_height_hint\x18\x04 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01\x12-\n" + + "\x10prev_header_hash\x18\x05 \x01(\fH\x01R\x0eprevHeaderHash\x88\x01\x01B\x11\n" + + "\x0f_da_height_hintB\x13\n" + + "\x11_prev_header_hash\"\xc8\x01\n" + "\aP2PData\x12/\n" + "\bmetadata\x18\x01 \x01(\v2\x13.evnode.v1.MetadataR\bmetadata\x12\x10\n" + "\x03txs\x18\x02 \x03(\fR\x03txs\x12)\n" + - "\x0eda_height_hint\x18\x03 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01B\x11\n" + - "\x0f_da_height_hintB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + "\x0eda_height_hint\x18\x03 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01\x12)\n" + + "\x0eprev_data_hash\x18\x04 \x01(\fH\x01R\fprevDataHash\x88\x01\x01B\x11\n" + + "\x0f_da_height_hintB\x11\n" + + "\x0f_prev_data_hashB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_evnode_proto_rawDescOnce sync.Once From eb1145ca9750be09425b71a6aa718b72644f5c3c Mon Sep 17 00:00:00 2001 From: Alex 
Peters Date: Wed, 18 Feb 2026 18:25:29 +0100 Subject: [PATCH 07/14] cgo --- block/internal/executing/executor.go | 12 +++++------ pkg/store/batch.go | 11 +++++++--- pkg/store/kv.go | 15 +++++++++++--- pkg/store/tracing.go | 16 +++++++++++++++ pkg/store/tracing_test.go | 5 +++++ pkg/store/types.go | 4 ++++ types/data.go | 10 ++++++++++ types/serialization.go | 21 ++++++++++--------- types/signer.go | 30 ++++++++++++++++++++++++++-- 9 files changed, 100 insertions(+), 24 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 279e2df46..421da9eb1 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -608,9 +608,6 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { } e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft") } - if err := e.deletePendingBlock(batch); err != nil { - e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata") - } if err := batch.Commit(); err != nil { return fmt.Errorf("failed to commit batch: %w", err) @@ -885,10 +882,11 @@ func (e *Executor) recordBlockMetrics(newState types.State, data *types.Data) { return } - e.metrics.NumTxs.Set(float64(len(data.Txs))) - e.metrics.TotalTxs.Add(float64(len(data.Txs))) - e.metrics.TxsPerBlock.Observe(float64(len(data.Txs))) - e.metrics.BlockSizeBytes.Set(float64(data.Size())) + nTxs := float64(len(data.Txs)) + e.metrics.NumTxs.Set(nTxs) + e.metrics.TotalTxs.Add(nTxs) + e.metrics.TxsPerBlock.Observe(nTxs) + e.metrics.BlockSizeBytes.Set(float64(data.TxsByteSize())) e.metrics.CommittedHeight.Set(float64(data.Metadata.Height)) } diff --git a/pkg/store/batch.go b/pkg/store/batch.go index 405119c61..e71983c64 100644 --- a/pkg/store/batch.go +++ b/pkg/store/batch.go @@ -48,9 +48,6 @@ func (b *DefaultBatch) SetHeight(height uint64) error { // SaveBlockData saves block data to the batch func (b *DefaultBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - height := header.Height() - signatureHash := *signature - headerBlob, err := header.MarshalBinary() if err != nil { return fmt.Errorf("failed to marshal Header to binary: %w", err) @@ -59,6 +56,14 @@ func (b *DefaultBatch) SaveBlockData(header *types.SignedHeader, data *types.Dat if err != nil { return fmt.Errorf("failed to marshal Data to binary: %w", err) } + return b.SaveBlockDataFromBytes(header, headerBlob, dataBlob, signature) +} + +// SaveBlockDataFromBytes saves pre-serialized block data to the batch. +// This avoids re-marshalling header and data when the caller already has the binary blobs. +func (b *DefaultBatch) SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob, dataBlob []byte, signature *types.Signature) error { + height := header.Height() + signatureHash := *signature if err := b.batch.Put(b.ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { return fmt.Errorf("failed to put header blob in batch: %w", err) diff --git a/pkg/store/kv.go b/pkg/store/kv.go index 95cad9770..3ee23bc2c 100644 --- a/pkg/store/kv.go +++ b/pkg/store/kv.go @@ -2,7 +2,6 @@ package store import ( "context" - "path" "path/filepath" "strings" @@ -42,8 +41,18 @@ func GetPrefixEntries(ctx context.Context, store ds.Datastore, prefix string) (d // GenerateKey creates a key from a slice of string fields, joining them with slashes. 
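+// For example, GenerateKey([]string{"h", "42"}) returns "/h/42". Unlike the
+// previous path.Clean-based version, empty fields or embedded slashes are no
+// longer normalized; callers are assumed to pass clean fields.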
func GenerateKey(fields []string) string { - key := "/" + strings.Join(fields, "/") - return path.Clean(key) + // Pre-calculate total size to avoid re-allocation. + n := 0 + for _, f := range fields { + n += 1 + len(f) // '/' + field + } + var b strings.Builder + b.Grow(n) + for _, f := range fields { + b.WriteByte('/') + b.WriteString(f) + } + return b.String() } // rootify works just like in cosmos-sdk diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 796f30289..037507b4e 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -324,6 +324,22 @@ func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data return nil } +func (b *tracedBatch) SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob, dataBlob []byte, signature *types.Signature) error { + _, span := b.tracer.Start(b.ctx, "Batch.SaveBlockDataFromBytes", + trace.WithAttributes(attribute.Int64("height", int64(header.Height()))), + ) + defer span.End() + + err := b.inner.SaveBlockDataFromBytes(header, headerBlob, dataBlob, signature) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + func (b *tracedBatch) SetHeight(height uint64) error { _, span := b.tracer.Start(b.ctx, "Batch.SetHeight", trace.WithAttributes(attribute.Int64("height", int64(height))), diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index 1109791c7..020a31e23 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -167,6 +167,11 @@ func (b *tracingMockBatch) SaveBlockData(header *types.SignedHeader, data *types return nil } +func (b *tracingMockBatch) SaveBlockDataFromBytes(header *types.SignedHeader, _, _ []byte, signature *types.Signature) error { + // Delegate to SaveBlockData for test mocking purposes. + return b.SaveBlockData(header, nil, signature) +} + func (b *tracingMockBatch) SetHeight(height uint64) error { if b.setHeightFn != nil { return b.setHeightFn(height) diff --git a/pkg/store/types.go b/pkg/store/types.go index 106c644e7..a461623ba 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -13,6 +13,10 @@ type Batch interface { // SaveBlockData atomically saves the block header, data, and signature SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error + // SaveBlockDataFromBytes is like SaveBlockData but accepts pre-serialized header and data bytes, + // avoiding re-marshalling when the caller already has the binary blobs. + SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob, dataBlob []byte, signature *types.Signature) error + // SetHeight sets the height in the batch SetHeight(height uint64) error diff --git a/types/data.go b/types/data.go index b6fe7df00..564d9a3e2 100644 --- a/types/data.go +++ b/types/data.go @@ -155,3 +155,13 @@ func (d *Data) Validate() error { func (d *Data) Size() int { return proto.Size(d.ToProto()) } + +// TxsByteSize returns total byte size of all transactions without proto marshalling. +// This is much cheaper than Size() and useful for metrics/reporting. 
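+//
+// For example, Txs{[]byte("ab"), []byte("c")} yields 3, where Size() would
+// also count the proto framing overhead.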
+func (d *Data) TxsByteSize() int { + n := 0 + for _, tx := range d.Txs { + n += len(tx) + } + return n +} diff --git a/types/serialization.go b/types/serialization.go index 0ee1ac87f..518df77f8 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -76,7 +76,7 @@ func (sh *SignedHeader) ToProto() (*pb.SignedHeader, error) { }, nil } - pubKey, err := crypto.MarshalPublicKey(sh.Signer.PubKey) + pubKey, err := sh.Signer.MarshalledPubKey() if err != nil { return nil, err } @@ -114,8 +114,9 @@ func (sh *SignedHeader) FromProto(other *pb.SignedHeader) error { return err } sh.Signer = Signer{ - Address: append([]byte(nil), other.Signer.Address...), - PubKey: pubKey, + Address: append([]byte(nil), other.Signer.Address...), + PubKey: pubKey, + marshalledPubKey: append([]byte(nil), other.Signer.PubKey...), } } else { sh.Signer = Signer{} @@ -157,7 +158,7 @@ func (sh *SignedHeader) ToDAEnvelopeProto(envelopeSignature []byte) (*pb.DAHeade }, nil } - pubKey, err := crypto.MarshalPublicKey(sh.Signer.PubKey) + pubKey, err := sh.Signer.MarshalledPubKey() if err != nil { return nil, err } @@ -196,8 +197,9 @@ func (sh *SignedHeader) FromDAEnvelopeProto(envelope *pb.DAHeaderEnvelope) error return err } sh.Signer = Signer{ - Address: append([]byte(nil), envelope.Signer.Address...), - PubKey: pubKey, + Address: append([]byte(nil), envelope.Signer.Address...), + PubKey: pubKey, + marshalledPubKey: append([]byte(nil), envelope.Signer.PubKey...), } } else { sh.Signer = Signer{} @@ -440,7 +442,7 @@ func (sd *SignedData) ToProto() (*pb.SignedData, error) { var signerProto *pb.Signer if sd.Signer.PubKey != nil { - pubKey, err := crypto.MarshalPublicKey(sd.Signer.PubKey) + pubKey, err := sd.Signer.MarshalledPubKey() if err != nil { return nil, err } @@ -481,8 +483,9 @@ func (sd *SignedData) FromProto(other *pb.SignedData) error { return err } sd.Signer = Signer{ - Address: append([]byte(nil), other.Signer.Address...), - PubKey: pubKey, + Address: append([]byte(nil), other.Signer.Address...), + PubKey: pubKey, + marshalledPubKey: append([]byte(nil), other.Signer.PubKey...), } } else { sd.Signer = Signer{} diff --git a/types/signer.go b/types/signer.go index af8498a75..fa9391209 100644 --- a/types/signer.go +++ b/types/signer.go @@ -10,6 +10,25 @@ import ( type Signer struct { PubKey crypto.PubKey Address []byte + // marshalledPubKey caches the result of crypto.MarshalPublicKey to avoid + // repeated serialization in SignedHeader.ToProto(). + marshalledPubKey []byte +} + +// MarshalledPubKey returns the marshalled public key bytes, caching the result. +func (s *Signer) MarshalledPubKey() ([]byte, error) { + if len(s.marshalledPubKey) > 0 { + return s.marshalledPubKey, nil + } + if s.PubKey == nil { + return nil, nil + } + bz, err := crypto.MarshalPublicKey(s.PubKey) + if err != nil { + return nil, err + } + s.marshalledPubKey = bz + return bz, nil } // NewSigner creates a new signer from a public key. @@ -19,10 +38,17 @@ func NewSigner(pubKey crypto.PubKey) (Signer, error) { return Signer{}, err } + // Pre-cache marshalled pub key for later use in ToProto. 
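+	// (This also keeps the unsynchronized lazy cache write in
+	// MarshalledPubKey off the hot path: signers built here, or decoded via
+	// FromProto, never take the lazy branch.)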
+ marshalledPubKey, err := crypto.MarshalPublicKey(pubKey) + if err != nil { + return Signer{}, err + } + address := sha256.Sum256(bz) return Signer{ - PubKey: pubKey, - Address: address[:], + PubKey: pubKey, + Address: address[:], + marshalledPubKey: marshalledPubKey, }, nil } From 2ca152fdfbbdeb64af005436de623b7ee7b4a496 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 19 Feb 2026 09:05:53 +0100 Subject: [PATCH 08/14] z --- block/internal/executing/executor.go | 30 ++++++++++++++++++---------- pkg/store/batch.go | 12 +++++------ pkg/store/keys.go | 27 +++++++++++++++++-------- types/serialization.go | 5 ++++- types/state.go | 30 ++++++++++++++++++++++++++++ 5 files changed, 78 insertions(+), 26 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 421da9eb1..399f54318 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -552,15 +552,15 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // signing the header is done after applying the block // as for signing, the state of the block may be required by the signature payload provider. // For based sequencer, this will return an empty signature. - signature, err := e.signHeader(header.Header) + signature, _, err := e.signHeader(&header.Header) if err != nil { return fmt.Errorf("failed to sign header: %w", err) } header.Signature = signature - // Structural validation only — skip the expensive signature re-verification - // (ValidateBasic) since we just signed this block ourselves. - if err := currentState.AssertValidForNextState(header, data); err != nil { + // Structural validation only — skip the expensive Validate() / DACommitment() + // re-computation since we just produced this block ourselves. + if err := currentState.AssertValidSequence(header); err != nil { e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err)) e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production") return fmt.Errorf("failed to validate block: %w", err) @@ -620,7 +620,9 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { e.setLastState(newState) // Update last-block cache so the next CreateBlock avoids a store read. - e.lastHeaderHash = header.Hash() + // Reuse newState.LastHeaderHash (already computed by NextState) instead of + // calling header.Hash() again, which would re-marshal + re-hash. + e.lastHeaderHash = newState.LastHeaderHash e.lastDataHash = data.Hash() e.lastSignature = signature @@ -805,19 +807,25 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty return newState, nil } -// signHeader signs the block header -func (e *Executor) signHeader(header types.Header) (types.Signature, error) { +// signHeader signs the block header and returns both the signature and the +// serialized header bytes (signing payload). The caller can reuse headerBytes +// in SaveBlockDataFromBytes to avoid a redundant MarshalBinary call. 
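+// (Note: the returned bytes come from AggregatorNodeSignatureBytesProvider,
+// so reusing them as the stored header blob is only sound if the configured
+// provider returns the header's binary encoding — an assumption callers of
+// SaveBlockDataFromBytes must check.)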
+func (e *Executor) signHeader(header *types.Header) (types.Signature, []byte, error) { // For based sequencer, return empty signature as there is no signer if e.signer == nil { - return types.Signature{}, nil + return types.Signature{}, nil, nil } - bz, err := e.options.AggregatorNodeSignatureBytesProvider(&header) + bz, err := e.options.AggregatorNodeSignatureBytesProvider(header) if err != nil { - return nil, fmt.Errorf("failed to get signature payload: %w", err) + return nil, nil, fmt.Errorf("failed to get signature payload: %w", err) } - return e.signer.Sign(bz) + sig, err := e.signer.Sign(bz) + if err != nil { + return nil, nil, err + } + return sig, bz, nil } // executeTxsWithRetry executes transactions with retry logic. diff --git a/pkg/store/batch.go b/pkg/store/batch.go index e71983c64..6222b85ba 100644 --- a/pkg/store/batch.go +++ b/pkg/store/batch.go @@ -43,7 +43,7 @@ func (b *DefaultBatch) SetHeight(height uint64) error { } heightBytes := encodeHeight(height) - return b.batch.Put(b.ctx, ds.NewKey(getHeightKey()), heightBytes) + return b.batch.Put(b.ctx, ds.RawKey(getHeightKey()), heightBytes) } // SaveBlockData saves block data to the batch @@ -65,19 +65,19 @@ func (b *DefaultBatch) SaveBlockDataFromBytes(header *types.SignedHeader, header height := header.Height() signatureHash := *signature - if err := b.batch.Put(b.ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getHeaderKey(height)), headerBlob); err != nil { return fmt.Errorf("failed to put header blob in batch: %w", err) } - if err := b.batch.Put(b.ctx, ds.NewKey(getDataKey(height)), dataBlob); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getDataKey(height)), dataBlob); err != nil { return fmt.Errorf("failed to put data blob in batch: %w", err) } - if err := b.batch.Put(b.ctx, ds.NewKey(getSignatureKey(height)), signatureHash[:]); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getSignatureKey(height)), signatureHash[:]); err != nil { return fmt.Errorf("failed to put signature blob in batch: %w", err) } headerHash := sha256.Sum256(headerBlob) heightBytes := encodeHeight(height) - if err := b.batch.Put(b.ctx, ds.NewKey(getIndexKey(headerHash[:])), heightBytes); err != nil { + if err := b.batch.Put(b.ctx, ds.RawKey(getIndexKey(headerHash[:])), heightBytes); err != nil { return fmt.Errorf("failed to put index key in batch: %w", err) } @@ -98,7 +98,7 @@ func (b *DefaultBatch) UpdateState(state types.State) error { return fmt.Errorf("failed to marshal state to protobuf: %w", err) } - return b.batch.Put(b.ctx, ds.NewKey(getStateAtHeightKey(height)), data) + return b.batch.Put(b.ctx, ds.RawKey(getStateAtHeightKey(height)), data) } // Commit commits all batched operations atomically diff --git a/pkg/store/keys.go b/pkg/store/keys.go index f2aa45d8a..02053bb84 100644 --- a/pkg/store/keys.go +++ b/pkg/store/keys.go @@ -39,29 +39,40 @@ const ( heightPrefix = "t" ) +// heightKey builds a key like "/h/123" with minimal allocation using strconv.AppendUint. +func heightKey(prefix string, height uint64) string { + // Pre-allocate: "/" + prefix + "/" + max uint64 digits (20) + buf := make([]byte, 0, 2+len(prefix)+20) + buf = append(buf, '/') + buf = append(buf, prefix...) + buf = append(buf, '/') + buf = strconv.AppendUint(buf, height, 10) + return string(buf) +} + // GetHeaderKey returns the store key for a block header at the given height. 
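+//
+// For example, GetHeaderKey(42) yields a key of the form "/h/42" (the "h"
+// prefix value is assumed from the package constants above).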
func GetHeaderKey(height uint64) string {
-	return GenerateKey([]string{headerPrefix, strconv.FormatUint(height, 10)})
+	return heightKey(headerPrefix, height)
 }
 
-func getHeaderKey(height uint64) string { return GetHeaderKey(height) }
+func getHeaderKey(height uint64) string { return heightKey(headerPrefix, height) }
 
 // GetDataKey returns the store key for block data at the given height.
 func GetDataKey(height uint64) string {
-	return GenerateKey([]string{dataPrefix, strconv.FormatUint(height, 10)})
+	return heightKey(dataPrefix, height)
 }
 
-func getDataKey(height uint64) string { return GetDataKey(height) }
+func getDataKey(height uint64) string { return heightKey(dataPrefix, height) }
 
 // GetSignatureKey returns the store key for a block signature at the given height.
 func GetSignatureKey(height uint64) string {
-	return GenerateKey([]string{signaturePrefix, strconv.FormatUint(height, 10)})
+	return heightKey(signaturePrefix, height)
 }
 
-func getSignatureKey(height uint64) string { return GetSignatureKey(height) }
+func getSignatureKey(height uint64) string { return heightKey(signaturePrefix, height) }
 
 func getStateAtHeightKey(height uint64) string {
-	return GenerateKey([]string{statePrefix, strconv.FormatUint(height, 10)})
+	return heightKey(statePrefix, height)
 }
 
 // GetMetaKey returns the store key for a metadata entry.
@@ -77,7 +88,7 @@ func GetIndexKey(hash types.Hash) string {
 
 func getIndexKey(hash types.Hash) string { return GetIndexKey(hash) }
 
 func getHeightKey() string {
-	return GenerateKey([]string{heightPrefix})
+	return "/" + heightPrefix
 }
 
 // GetHeightToDAHeightHeaderKey returns the metadata key for storing the DA height
diff --git a/types/serialization.go b/types/serialization.go
index 518df77f8..b304b61fb 100644
--- a/types/serialization.go
+++ b/types/serialization.go
@@ -363,6 +363,9 @@ func (d *Data) FromProto(other *pb.Data) error {
 
 // ToProto converts State into protobuf representation and returns it.
 func (s *State) ToProto() (*pb.State, error) {
+	// Avoid timestamppb.New allocation by constructing inline.
+	secs := s.LastBlockTime.Unix()
+	nanos := int32(s.LastBlockTime.Nanosecond())
 	return &pb.State{
 		Version: &pb.Version{
@@ -372,7 +375,7 @@ func (s *State) ToProto() (*pb.State, error) {
 		ChainId:         s.ChainID,
 		InitialHeight:   s.InitialHeight,
 		LastBlockHeight: s.LastBlockHeight,
-		LastBlockTime:   timestamppb.New(s.LastBlockTime),
+		LastBlockTime:   &timestamppb.Timestamp{Seconds: secs, Nanos: nanos},
 		DaHeight:        s.DAHeight,
 		AppHash:         s.AppHash[:],
 		LastHeaderHash:  s.LastHeaderHash[:],
diff --git a/types/state.go b/types/state.go
index bf535de70..5c2935c44 100644
--- a/types/state.go
+++ b/types/state.go
@@ -84,3 +84,33 @@ func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error {
 
 	return nil
 }
+
+// AssertValidSequence performs lightweight state-sequence validation for self-produced blocks.
+// It skips the expensive Validate() call (which re-computes DACommitment by re-marshaling
+// and re-hashing all transaction data) since the producer already computed the data hash
+// in CreateBlock. Only cheap structural checks are performed.
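+//
+// Typical call site (mirrors ProduceBlock in this series): sign the header,
+// run AssertValidSequence, then commit. Non-producer paths keep using the
+// full AssertValidForNextState, which also runs Validate(header, data).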
+func (s State) AssertValidSequence(header *SignedHeader) error { + if header.ChainID() != s.ChainID { + return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) + } + + if len(s.LastHeaderHash) == 0 { // initial state + return nil + } + + if expdHeight := s.LastBlockHeight + 1; header.Height() != expdHeight { + return fmt.Errorf("invalid block height - got: %d, want: %d", header.Height(), expdHeight) + } + + if headerTime := header.Time(); s.LastBlockTime.After(headerTime) { + return fmt.Errorf("invalid block time - got: %v, last: %v", headerTime, s.LastBlockTime) + } + if !bytes.Equal(header.LastHeaderHash, s.LastHeaderHash) { + return fmt.Errorf("invalid last header hash - got: %x, want: %x", header.LastHeaderHash, s.LastHeaderHash) + } + if !bytes.Equal(header.AppHash, s.AppHash) { + return fmt.Errorf("invalid last app hash - got: %x, want: %x", header.AppHash, s.AppHash) + } + + return nil +} From 71f458947741f2900585aa90ea54951a884399f2 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 19 Feb 2026 09:42:17 +0100 Subject: [PATCH 09/14] Revert proto changes and interface --- block/internal/executing/executor.go | 6 +-- proto/evnode/v1/evnode.proto | 6 --- types/data.go | 3 -- types/hashing.go | 18 --------- types/header.go | 4 -- types/p2p_envelope.go | 30 +-------------- types/pb/evnode/v1/evnode.pb.go | 56 ++++++++-------------------- 7 files changed, 19 insertions(+), 104 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 399f54318..643ad3a74 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -631,14 +631,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { // incoming data against the current and previous header, so out-of-order // delivery would cause validation failures on peers. if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{ - SignedHeader: header, - PrevHeaderHash: e.lastHeaderHash, + SignedHeader: header, }); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast header") } if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{ - Data: data, - PrevDataHash: e.lastDataHash, + Data: data, }); err != nil { e.logger.Error().Err(err).Msg("failed to broadcast data") } diff --git a/proto/evnode/v1/evnode.proto b/proto/evnode/v1/evnode.proto index 2f355f92e..e60bd56e0 100644 --- a/proto/evnode/v1/evnode.proto +++ b/proto/evnode/v1/evnode.proto @@ -113,9 +113,6 @@ message P2PSignedHeader { bytes signature = 2; Signer signer = 3; optional uint64 da_height_hint = 4; - // Pre-computed hash of the previous block's header (block N-1). - // Allows peers to skip re-computing trusted.Hash() during Verify. - optional bytes prev_header_hash = 5; } // P2PData @@ -123,7 +120,4 @@ message P2PData { Metadata metadata = 1; repeated bytes txs = 2; optional uint64 da_height_hint = 3; - // Pre-computed hash of the previous block's data (block N-1). - // Allows peers to skip re-computing trusted.Hash() during Verify. - optional bytes prev_data_hash = 4; } diff --git a/types/data.go b/types/data.go index 564d9a3e2..1345e4e82 100644 --- a/types/data.go +++ b/types/data.go @@ -33,9 +33,6 @@ type Metadata struct { type Data struct { *Metadata Txs Txs - - // cachedHash stores a pre-computed hash to avoid repeated serialization+SHA256. - cachedHash Hash } // SignedData combines Data and its signature. 
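Note: with the cached hash fields gone, Header.Hash() and Data.Hash() below go
back to a full re-marshal plus SHA-256 on every call. Hot paths are expected to
compute the hash once and reuse it, as other patches in this series do; a
minimal sketch (illustrative names only):

	// Compute once per produced block, then reuse the results.
	headerHash := newState.LastHeaderHash // already derived by NextState
	dataHash := data.Hash()               // one marshal + hash
	publish(headerHash, dataHash)         // hypothetical consumer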
diff --git a/types/hashing.go b/types/hashing.go index b668a8bf7..4a7e88940 100644 --- a/types/hashing.go +++ b/types/hashing.go @@ -47,10 +47,6 @@ func (h *Header) Hash() Hash { return nil } - if len(h.cachedHash) > 0 { - return h.cachedHash - } - slimHash, err := h.HashSlim() if err != nil { return nil @@ -66,28 +62,14 @@ func (h *Header) Hash() Hash { return slimHash } -// SetCachedHash sets a pre-computed hash on the header, bypassing recomputation. -// Use when the hash is already known (e.g. from a P2P envelope or producer cache). -func (h *Header) SetCachedHash(hash Hash) { - h.cachedHash = hash -} - // Hash returns hash of the Data func (d *Data) Hash() Hash { - if len(d.cachedHash) > 0 { - return d.cachedHash - } // Ignoring the marshal error for now to satisfy the go-header interface // Later on the usage of Hash should be replaced with DA commitment dBytes, _ := d.MarshalBinary() return leafHashOpt(sha256.New(), dBytes) } -// SetCachedHash sets a pre-computed hash on the data, bypassing recomputation. -func (d *Data) SetCachedHash(hash Hash) { - d.cachedHash = hash -} - // DACommitment returns the DA commitment of the Data excluding the Metadata func (d *Data) DACommitment() Hash { // Prune the Data to only include the Txs diff --git a/types/header.go b/types/header.go index 4819cf1e4..5ca2ee83e 100644 --- a/types/header.go +++ b/types/header.go @@ -82,10 +82,6 @@ type Header struct { // representation but may still be required for backwards compatible binary // serialization (e.g. legacy signing payloads). Legacy *LegacyHeaderFields - - // cachedHash stores a pre-computed hash to avoid repeated serialization+SHA256. - // Populated lazily by Hash() or explicitly by SetCachedHash(). - cachedHash Hash } // New creates a new Header. diff --git a/types/p2p_envelope.go b/types/p2p_envelope.go index 768882234..d5c5957ee 100644 --- a/types/p2p_envelope.go +++ b/types/p2p_envelope.go @@ -15,12 +15,9 @@ var ( ) // P2PSignedHeader wraps SignedHeader with an optional DA height hint for P2P sync optimization. -// PrevHeaderHash carries the producer's pre-computed hash of block N-1's header, -// allowing peers to skip re-computing trusted.Hash() during Verify. type P2PSignedHeader struct { *SignedHeader - DAHeightHint uint64 - PrevHeaderHash Hash + DAHeightHint uint64 } // New creates a new P2PSignedHeader. @@ -44,12 +41,7 @@ func (p *P2PSignedHeader) DAHint() uint64 { } // Verify verifies against an untrusted header. -// If the untrusted header carries a PrevHeaderHash (from the producer), -// pre-populate the trusted header's hash cache to skip re-computation. func (p *P2PSignedHeader) Verify(untrusted *P2PSignedHeader) error { - if len(untrusted.PrevHeaderHash) > 0 { - p.Header.SetCachedHash(untrusted.PrevHeaderHash) - } return p.SignedHeader.Verify(untrusted.SignedHeader) } @@ -65,9 +57,6 @@ func (p *P2PSignedHeader) MarshalBinary() ([]byte, error) { Signer: psh.Signer, DaHeightHint: &p.DAHeightHint, } - if len(p.PrevHeaderHash) > 0 { - msg.PrevHeaderHash = p.PrevHeaderHash - } return proto.Marshal(msg) } @@ -91,19 +80,13 @@ func (p *P2PSignedHeader) UnmarshalBinary(data []byte) error { if msg.DaHeightHint != nil { p.DAHeightHint = *msg.DaHeightHint } - if len(msg.PrevHeaderHash) > 0 { - p.PrevHeaderHash = msg.PrevHeaderHash - } return nil } // P2PData wraps Data with an optional DA height hint for P2P sync optimization. -// PrevDataHash carries the producer's pre-computed hash of block N-1's data, -// allowing peers to skip re-computing trusted.Hash() during Verify. 
type P2PData struct { *Data DAHeightHint uint64 - PrevDataHash Hash } // New creates a new P2PData. @@ -127,12 +110,7 @@ func (p *P2PData) DAHint() uint64 { } // Verify verifies against untrusted data. -// If the untrusted data carries a PrevDataHash (from the producer), -// pre-populate the trusted data's hash cache to skip re-computation. func (p *P2PData) Verify(untrusted *P2PData) error { - if len(untrusted.PrevDataHash) > 0 { - p.Data.SetCachedHash(untrusted.PrevDataHash) - } return p.Data.Verify(untrusted.Data) } @@ -174,9 +152,6 @@ func (p *P2PData) MarshalBinary() ([]byte, error) { Txs: pData.Txs, DaHeightHint: &p.DAHeightHint, } - if len(p.PrevDataHash) > 0 { - msg.PrevDataHash = p.PrevDataHash - } return proto.Marshal(msg) } @@ -199,8 +174,5 @@ func (p *P2PData) UnmarshalBinary(data []byte) error { if msg.DaHeightHint != nil { p.DAHeightHint = *msg.DaHeightHint } - if len(msg.PrevDataHash) > 0 { - p.PrevDataHash = msg.PrevDataHash - } return nil } diff --git a/types/pb/evnode/v1/evnode.pb.go b/types/pb/evnode/v1/evnode.pb.go index 7bc89d7ae..b0a866e76 100644 --- a/types/pb/evnode/v1/evnode.pb.go +++ b/types/pb/evnode/v1/evnode.pb.go @@ -657,16 +657,13 @@ func (x *Vote) GetValidatorAddress() []byte { // P2PSignedHeader type P2PSignedHeader struct { - state protoimpl.MessageState `protogen:"open.v1"` - Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` - Signer *Signer `protobuf:"bytes,3,opt,name=signer,proto3" json:"signer,omitempty"` - DaHeightHint *uint64 `protobuf:"varint,4,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` - // Pre-computed hash of the previous block's header (block N-1). - // Allows peers to skip re-computing trusted.Hash() during Verify. - PrevHeaderHash []byte `protobuf:"bytes,5,opt,name=prev_header_hash,json=prevHeaderHash,proto3,oneof" json:"prev_header_hash,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` + Signer *Signer `protobuf:"bytes,3,opt,name=signer,proto3" json:"signer,omitempty"` + DaHeightHint *uint64 `protobuf:"varint,4,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *P2PSignedHeader) Reset() { @@ -727,22 +724,12 @@ func (x *P2PSignedHeader) GetDaHeightHint() uint64 { return 0 } -func (x *P2PSignedHeader) GetPrevHeaderHash() []byte { - if x != nil { - return x.PrevHeaderHash - } - return nil -} - // P2PData type P2PData struct { - state protoimpl.MessageState `protogen:"open.v1"` - Metadata *Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` - Txs [][]byte `protobuf:"bytes,2,rep,name=txs,proto3" json:"txs,omitempty"` - DaHeightHint *uint64 `protobuf:"varint,3,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"` - // Pre-computed hash of the previous block's data (block N-1). - // Allows peers to skip re-computing trusted.Hash() during Verify. 
-	PrevDataHash  []byte                 `protobuf:"bytes,4,opt,name=prev_data_hash,json=prevDataHash,proto3,oneof" json:"prev_data_hash,omitempty"`
+	state         protoimpl.MessageState `protogen:"open.v1"`
+	Metadata      *Metadata              `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+	Txs           [][]byte               `protobuf:"bytes,2,rep,name=txs,proto3" json:"txs,omitempty"`
+	DaHeightHint  *uint64                `protobuf:"varint,3,opt,name=da_height_hint,json=daHeightHint,proto3,oneof" json:"da_height_hint,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache     protoimpl.SizeCache
 }
@@ -798,13 +785,6 @@ func (x *P2PData) GetDaHeightHint() uint64 {
 	return 0
 }
 
-func (x *P2PData) GetPrevDataHash() []byte {
-	if x != nil {
-		return x.PrevDataHash
-	}
-	return nil
-}
-
 var File_evnode_v1_evnode_proto protoreflect.FileDescriptor
 
 const file_evnode_v1_evnode_proto_rawDesc = "" +
@@ -855,22 +835,18 @@ const file_evnode_v1_evnode_proto_rawDesc = "" +
 	"\x06height\x18\x02 \x01(\x04R\x06height\x128\n" +
 	"\ttimestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12\"\n" +
 	"\rblock_id_hash\x18\x04 \x01(\fR\vblockIdHash\x12+\n" +
-	"\x11validator_address\x18\x05 \x01(\fR\x10validatorAddress\"\x87\x02\n" +
+	"\x11validator_address\x18\x05 \x01(\fR\x10validatorAddress\"\xc3\x01\n" +
 	"\x0fP2PSignedHeader\x12)\n" +
 	"\x06header\x18\x01 \x01(\v2\x11.evnode.v1.HeaderR\x06header\x12\x1c\n" +
 	"\tsignature\x18\x02 \x01(\fR\tsignature\x12)\n" +
 	"\x06signer\x18\x03 \x01(\v2\x11.evnode.v1.SignerR\x06signer\x12)\n" +
-	"\x0eda_height_hint\x18\x04 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01\x12-\n" +
-	"\x10prev_header_hash\x18\x05 \x01(\fH\x01R\x0eprevHeaderHash\x88\x01\x01B\x11\n" +
-	"\x0f_da_height_hintB\x13\n" +
-	"\x11_prev_header_hash\"\xc8\x01\n" +
+	"\x0eda_height_hint\x18\x04 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01B\x11\n" +
+	"\x0f_da_height_hint\"\x8a\x01\n" +
 	"\aP2PData\x12/\n" +
 	"\bmetadata\x18\x01 \x01(\v2\x13.evnode.v1.MetadataR\bmetadata\x12\x10\n" +
 	"\x03txs\x18\x02 \x03(\fR\x03txs\x12)\n" +
-	"\x0eda_height_hint\x18\x03 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01\x12)\n" +
-	"\x0eprev_data_hash\x18\x04 \x01(\fH\x01R\fprevDataHash\x88\x01\x01B\x11\n" +
-	"\x0f_da_height_hintB\x11\n" +
-	"\x0f_prev_data_hashB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3"
+	"\x0eda_height_hint\x18\x03 \x01(\x04H\x00R\fdaHeightHint\x88\x01\x01B\x11\n" +
+	"\x0f_da_height_hintB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3"
 
 var (
 	file_evnode_v1_evnode_proto_rawDescOnce sync.Once

From 1f17854dadf28ac08362cf1fffd76a65523771af Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Thu, 19 Feb 2026 10:53:53 +0100
Subject: [PATCH 10/14] Remove unnecessary caches

---
 block/internal/executing/executor.go | 52 ++++++++++++----------------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index 643ad3a74..14bde9a03 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -63,15 +63,11 @@ type Executor struct {
 	// in CreateBlock. Updated after each successful block production.
 	lastHeaderHash types.Hash
 	lastDataHash   types.Hash
+	lastSignature  types.Signature
 
 	// pendingCheckCounter amortizes the expensive NumPendingHeaders/NumPendingData
 	// checks across multiple blocks. Only checked every pendingCheckInterval blocks.
 	pendingCheckCounter uint64
-	lastSignature       types.Signature
-
-	// Cached static signer info — computed once at init, reused every block.
- cachedPubKey crypto.PubKey - cachedValidatorHash types.Hash // Channels for coordination txNotifyCh chan struct{} @@ -300,27 +296,6 @@ func (e *Executor) initializeState() error { e.hasPendingBlock.Store(true) } - // Cache static signer info — computed once, reused every CreateBlock call. - if e.signer != nil { - pubKey, err := e.signer.GetPublic() - if err != nil { - return fmt.Errorf("failed to cache public key: %w", err) - } - e.cachedPubKey = pubKey - - vHash, err := e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey) - if err != nil { - return fmt.Errorf("failed to cache validator hash: %w", err) - } - e.cachedValidatorHash = vHash - } else { - vHash, err := e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) - if err != nil { - return fmt.Errorf("failed to cache validator hash: %w", err) - } - e.cachedValidatorHash = vHash - } - // Warm the last-block cache so CreateBlock can avoid a store read on the // very first block after restart. if state.LastBlockHeight > 0 { @@ -720,9 +695,28 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba } } - // Use cached signer info — computed once at init, reused every block. - pubKey := e.cachedPubKey - validatorHash := e.cachedValidatorHash + // Get signer info and validator hash + var pubKey crypto.PubKey + var validatorHash types.Hash + + if e.signer != nil { + var err error + pubKey, err = e.signer.GetPublic() + if err != nil { + return nil, nil, fmt.Errorf("failed to get public key: %w", err) + } + + validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, pubKey) + if err != nil { + return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) + } + } else { + var err error + validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil) + if err != nil { + return nil, nil, fmt.Errorf("failed to get validator hash: %w", err) + } + } // Create header header := &types.SignedHeader{ From 1bfac035ef585649b8d532cbfc9046ab7fa8b530 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 19 Feb 2026 14:11:17 +0100 Subject: [PATCH 11/14] Save pending --- block/internal/executing/executor.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 14bde9a03..f4b624249 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -505,11 +505,9 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to create block: %w", err) } - // Pending block save is intentionally omitted here. The final - // SaveBlockData in the commit batch below covers crash recovery, and - // re-execution from the sequencer is safe because ExecuteTxs is - // deterministic. Skipping the separate pending save eliminates a full - // protobuf serialization + store write per block. 
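+		// Persist the pending block before the commit batch below so that a
+		// crash between block creation and commit can be recovered on restart.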
+		if err := e.savePendingBlock(ctx, header, data); err != nil {
+			return fmt.Errorf("failed to save block data: %w", err)
+		}
 	}
 
 	if e.raftNode != nil && !e.raftNode.HasQuorum() {
@@ -588,7 +586,6 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
 		return fmt.Errorf("failed to commit batch: %w", err)
 	}
 
-	// Clear pending block flag after successful commit
 	e.hasPendingBlock.Store(false)
 
 	// Update in-memory state after successful commit

From 08163ada0055c49dff26fa9f164fcec26b819cf8 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Thu, 19 Feb 2026 16:02:25 +0100
Subject: [PATCH 12/14] chore: simplify executor comments

---
 block/internal/executing/executor.go | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index f4b624249..f89a477b4 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -290,14 +290,11 @@ func (e *Executor) initializeState() error {
 		return fmt.Errorf("failed to migrate legacy pending block: %w", err)
 	}
 
-	// Detect any existing pending block and set the in-memory flag so that
-	// ProduceBlock can skip unnecessary store lookups on the happy path.
 	if _, err := e.store.GetMetadata(e.ctx, headerKey); err == nil {
 		e.hasPendingBlock.Store(true)
 	}
 
-	// Warm the last-block cache so CreateBlock can avoid a store read on the
-	// very first block after restart.
+	// Warm the last-block cache
 	if state.LastBlockHeight > 0 {
 		h, d, err := e.store.GetBlockData(e.ctx, state.LastBlockHeight)
 		if err == nil {
@@ -660,9 +657,7 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
 	currentState := e.getLastState()
 	headerTime := uint64(e.genesis.StartTime.UnixNano())
 
-	// Use cached last block info — populated during initializeState and updated
-	// after each successful block production. This avoids a store read + protobuf
-	// deserialization per block.
+	// Get last block info
 	var lastHeaderHash types.Hash
 	var lastDataHash types.Hash
 	var lastSignature types.Signature

From 6cf8b0d3d4c2091202618f6270c73b6e9e7138f3 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Fri, 20 Feb 2026 11:50:08 +0100
Subject: [PATCH 13/14] refactor: remove `ValidateBlock` method and simplify
 state validation logic.

---
 block/internal/executing/block_producer.go      |   3 -
 block/internal/executing/executor.go            |  27 +-
 .../internal/executing/executor_logic_test.go   | 289 +++++++++---------
 block/internal/executing/pending.go             |   6 +-
 block/internal/executing/tracing.go             |  16 -
 block/internal/executing/tracing_test.go        |  65 ----
 test/mocks/batch.go                             |  69 +++++
 types/state.go                                  |  26 +-
 8 files changed, 233 insertions(+), 268 deletions(-)

diff --git a/block/internal/executing/block_producer.go b/block/internal/executing/block_producer.go
index f98c15c6a..92733cc8e 100644
--- a/block/internal/executing/block_producer.go
+++ b/block/internal/executing/block_producer.go
@@ -21,7 +21,4 @@ type BlockProducer interface {
 
 	// ApplyBlock executes the block transactions and returns the new state.
 	ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error)
-
-	// ValidateBlock validates block structure and state transitions.
-	ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error
 }
diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index f89a477b4..00d7fb532 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -10,20 +10,19 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/evstack/ev-node/pkg/raft"
-	"github.com/ipfs/go-datastore"
-	"github.com/libp2p/go-libp2p/core/crypto"
-	"github.com/rs/zerolog"
-
 	"github.com/evstack/ev-node/block/internal/cache"
 	"github.com/evstack/ev-node/block/internal/common"
 	coreexecutor "github.com/evstack/ev-node/core/execution"
 	coresequencer "github.com/evstack/ev-node/core/sequencer"
 	"github.com/evstack/ev-node/pkg/config"
 	"github.com/evstack/ev-node/pkg/genesis"
+	"github.com/evstack/ev-node/pkg/raft"
 	"github.com/evstack/ev-node/pkg/signer"
 	"github.com/evstack/ev-node/pkg/store"
 	"github.com/evstack/ev-node/types"
+	"github.com/ipfs/go-datastore"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/rs/zerolog"
 )
 
 var _ BlockProducer = (*Executor)(nil)
@@ -181,6 +180,9 @@ func (e *Executor) Stop() error {
 	e.wg.Wait()
 	e.logger.Info().Msg("executor stopped")
 
+	if !e.hasPendingBlock.Load() {
+		_ = e.deletePendingBlock(context.Background()) // nolint: gocritic // not critical
+	}
 	return nil
 }
 
@@ -446,7 +448,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
 	// Amortized pending limit check — NumPendingHeaders/NumPendingData call
 	// advancePastEmptyData which scans the store. Only amortize when the limit
 	// is large enough that checking every N blocks won't overshoot.
-	const pendingCheckInterval uint64 = 64
+	const pendingCheckInterval uint64 = 64 // amortization stride; limits <= 64 are still checked every block
 	if e.config.Node.MaxPendingHeadersAndData > 0 {
 		e.pendingCheckCounter++
 		shouldCheck := e.config.Node.MaxPendingHeadersAndData <= pendingCheckInterval ||
@@ -842,19 +844,6 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea
 	return nil, nil
 }
 
-// ValidateBlock validates the created block.
-func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - // Set custom verifier for aggregator node signature - header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) - - // Basic header validation - if err := header.ValidateBasic(); err != nil { - return fmt.Errorf("invalid header: %w", err) - } - - return lastState.AssertValidForNextState(header, data) -} - // sendCriticalError sends a critical error to the error channel without blocking func (e *Executor) sendCriticalError(err error) { if e.errorCh != nil { diff --git a/block/internal/executing/executor_logic_test.go b/block/internal/executing/executor_logic_test.go index 31a237315..7b90de0ed 100644 --- a/block/internal/executing/executor_logic_test.go +++ b/block/internal/executing/executor_logic_test.go @@ -43,177 +43,80 @@ func buildTestSigner(t *testing.T) (signerAddr []byte, tSigner types.Signer, s p } func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) { - ds := sync.MutexWrap(datastore.NewMapDatastore()) - memStore := store.New(ds) - - cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) - require.NoError(t, err) - - metrics := common.NopMetrics() - - // signer and genesis with correct proposer - addr, _, signerWrapper := buildTestSigner(t) - - cfg := config.DefaultConfig() - cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} - cfg.Node.MaxPendingHeadersAndData = 1000 - - gen := genesis.Genesis{ - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now().Add(-time.Second), - ProposerAddress: addr, - } - - // Use mocks for executor and sequencer - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - - // Broadcasters are required by produceBlock; use generated mocks - hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t) - hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() - db := common.NewMockBroadcaster[*types.P2PData](t) - db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() - - exec, err := NewExecutor( - memStore, - mockExec, - mockSeq, - signerWrapper, - cacheManager, - metrics, - cfg, - gen, - hb, - db, - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - nil, - ) - require.NoError(t, err) - - // Expect InitChain to be called - initStateRoot := []byte("init_root") - mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). - Return(initStateRoot, nil).Once() - mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() + fx := setupTestExecutor(t, 1000) + defer fx.Cancel() - // initialize state (creates genesis block in store and sets state) - require.NoError(t, exec.initializeState()) - - // Set up context for the executor (normally done in Start method) - exec.ctx, exec.cancel = context.WithCancel(context.Background()) - defer exec.cancel() - - // sequencer returns empty batch - mockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). 
RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil }).Once() - // executor ExecuteTxs called with empty txs and previous state root - mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), initStateRoot). + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). Return([]byte("new_root"), nil).Once() - mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - // produce one block - err = exec.ProduceBlock(exec.ctx) + err := fx.Exec.ProduceBlock(fx.Exec.ctx) require.NoError(t, err) - // Verify height and stored block - h, err := memStore.Height(context.Background()) + h, err := fx.MemStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h) - sh, data, err := memStore.GetBlockData(context.Background(), 1) + sh, data, err := fx.MemStore.GetBlockData(context.Background(), 1) require.NoError(t, err) - // Expect empty txs and special empty data hash marker assert.Equal(t, 0, len(data.Txs)) assert.EqualValues(t, common.DataHashForEmptyTxs, sh.DataHash) - - // Broadcasters should have been called with the produced header and data - // The testify mock framework tracks calls automatically } -func TestPendingLimit_SkipsProduction(t *testing.T) { - ds := sync.MutexWrap(datastore.NewMapDatastore()) - memStore := store.New(ds) - - cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) - require.NoError(t, err) - - metrics := common.NopMetrics() - - addr, _, signerWrapper := buildTestSigner(t) - - cfg := config.DefaultConfig() - cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} - cfg.Node.MaxPendingHeadersAndData = 1 // low limit to trigger skip quickly - - gen := genesis.Genesis{ - ChainID: "test-chain", - InitialHeight: 1, - StartTime: time.Now().Add(-time.Second), - ProposerAddress: addr, +func TestProduceBlock_OutputPassesValidation(t *testing.T) { + specs := map[string]struct { + txs [][]byte + }{ + "empty batch": {txs: nil}, + "single tx": {txs: [][]byte{[]byte("tx1")}}, + "multi txs": {txs: [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}}, + "large tx": {txs: [][]byte{make([]byte, 10000)}}, } - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t) - hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() - db := common.NewMockBroadcaster[*types.P2PData](t) - db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + assertProduceBlockInvariantWithTxs(t, spec.txs) + }) + } +} - exec, err := NewExecutor( - memStore, - mockExec, - mockSeq, - signerWrapper, - cacheManager, - metrics, - cfg, - gen, - hb, - db, - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - nil, - ) - require.NoError(t, err) +func FuzzProduceBlock_OutputPassesValidation(f *testing.F) { + f.Add([]byte("tx1"), []byte("tx2")) + f.Add([]byte{}, []byte{}) + f.Add(make([]byte, 1000), make([]byte, 2000)) - mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). 
- Return([]byte("i0"), nil).Once() - mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() - require.NoError(t, exec.initializeState()) + f.Fuzz(func(t *testing.T, tx1, tx2 []byte) { + txs := [][]byte{tx1, tx2} + assertProduceBlockInvariantWithTxs(t, txs) + }) +} - // Set up context for the executor (normally done in Start method) - exec.ctx, exec.cancel = context.WithCancel(context.Background()) - defer exec.cancel() +func TestPendingLimit_SkipsProduction(t *testing.T) { + fx := setupTestExecutor(t, 1) + defer fx.Cancel() - // First production should succeed - // Return empty batch again - mockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: nil}, Timestamp: time.Now()}, nil }).Once() - // ExecuteTxs with empty - mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), []byte("i0")). + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). Return([]byte("i1"), nil).Once() - mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() - require.NoError(t, exec.ProduceBlock(exec.ctx)) - h1, err := memStore.Height(context.Background()) + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + h1, err := fx.MemStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h1) - // With limit=1 and lastSubmitted default 0, pending >= 1 so next production should be skipped - // No new expectations; ProduceBlock should return early before hitting sequencer - require.NoError(t, exec.ProduceBlock(exec.ctx)) - h2, err := memStore.Height(context.Background()) + require.NoError(t, fx.Exec.ProduceBlock(fx.Exec.ctx)) + h2, err := fx.MemStore.Height(context.Background()) require.NoError(t, err) assert.Equal(t, h1, h2, "height should not change when production is skipped") } @@ -329,3 +232,113 @@ func TestExecutor_executeTxsWithRetry(t *testing.T) { }) } } + +type executorTestFixture struct { + MemStore store.Store + MockExec *testmocks.MockExecutor + MockSeq *testmocks.MockSequencer + Exec *Executor + Cancel context.CancelFunc + InitStateRoot []byte +} + +func setupTestExecutor(t *testing.T, pendingLimit uint64) executorTestFixture { + t.Helper() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + memStore := store.New(ds) + + cacheManager, err := cache.NewManager(config.DefaultConfig(), memStore, zerolog.Nop()) + require.NoError(t, err) + + metrics := common.NopMetrics() + + addr, _, signerWrapper := buildTestSigner(t) + + cfg := config.DefaultConfig() + cfg.Node.BlockTime = config.DurationWrapper{Duration: 10 * time.Millisecond} + cfg.Node.MaxPendingHeadersAndData = pendingLimit + + gen := genesis.Genesis{ + ChainID: "test-chain", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockSeq := testmocks.NewMockSequencer(t) + + hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t) + hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe() + db := common.NewMockBroadcaster[*types.P2PData](t) + db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, 
mock.Anything).Return(nil).Maybe() + + exec, err := NewExecutor( + memStore, mockExec, mockSeq, signerWrapper, cacheManager, metrics, cfg, gen, hb, db, + zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), nil, + ) + require.NoError(t, err) + + initStateRoot := []byte("init_root") + mockExec.EXPECT().InitChain(mock.Anything, mock.AnythingOfType("time.Time"), gen.InitialHeight, gen.ChainID). + Return(initStateRoot, nil).Once() + mockSeq.EXPECT().SetDAHeight(uint64(0)).Return().Once() + + require.NoError(t, exec.initializeState()) + + ctx, cancel := context.WithCancel(context.Background()) + exec.ctx = ctx + + return executorTestFixture{ + MemStore: memStore, + MockExec: mockExec, + MockSeq: mockSeq, + Exec: exec, + Cancel: cancel, + InitStateRoot: initStateRoot, + } +} + +func assertProduceBlockInvariantWithTxs(t *testing.T, txs [][]byte) { + t.Helper() + fx := setupTestExecutor(t, 1000) + defer fx.Cancel() + + timestamp := time.Now().Add(time.Duration(1+len(txs)*2) * time.Millisecond) + + newRoot := make([]byte, 32) + for i, tx := range txs { + if len(tx) > 0 { + newRoot[(i+int(tx[0]))%32] ^= byte(len(tx)) + } + } + newRoot[31] ^= byte(len(txs)) + + fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")). + RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) { + return &coreseq.GetNextBatchResponse{Batch: &coreseq.Batch{Transactions: txs}, Timestamp: timestamp}, nil + }).Once() + + fx.MockExec.EXPECT().ExecuteTxs(mock.Anything, txs, uint64(1), mock.AnythingOfType("time.Time"), fx.InitStateRoot). + Return(newRoot, nil).Once() + fx.MockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once() + + prevState := fx.Exec.getLastState() + + err := fx.Exec.ProduceBlock(fx.Exec.ctx) + require.NoError(t, err) + + h, err := fx.MemStore.Height(context.Background()) + require.NoError(t, err) + assert.Equal(t, uint64(1), h) + + header, data, err := fx.MemStore.GetBlockData(context.Background(), h) + require.NoError(t, err) + + err = header.ValidateBasicWithData(data) + require.NoError(t, err, "Produced header failed ValidateBasicWithData") + + err = prevState.AssertValidForNextState(header, data) + require.NoError(t, err, "Produced block failed AssertValidForNextState") +} diff --git a/block/internal/executing/pending.go b/block/internal/executing/pending.go index dc758e558..c43601bd3 100644 --- a/block/internal/executing/pending.go +++ b/block/internal/executing/pending.go @@ -78,12 +78,12 @@ func (e *Executor) savePendingBlock(ctx context.Context, header *types.SignedHea } // deletePendingBlock removes pending block metadata -func (e *Executor) deletePendingBlock(batch store.Batch) error { - if err := batch.Delete(ds.NewKey(store.GetMetaKey(headerKey))); err != nil { +func (e *Executor) deletePendingBlock(ctx context.Context) error { + if err := e.store.DeleteMetadata(ctx, headerKey); err != nil { return fmt.Errorf("delete pending header: %w", err) } - if err := batch.Delete(ds.NewKey(store.GetMetaKey(dataKey))); err != nil { + if err := e.store.DeleteMetadata(ctx, dataKey); err != nil { return fmt.Errorf("delete pending data: %w", err) } return nil diff --git a/block/internal/executing/tracing.go b/block/internal/executing/tracing.go index 137d9170b..afc1d495a 100644 --- a/block/internal/executing/tracing.go +++ b/block/internal/executing/tracing.go @@ -103,19 +103,3 @@ func (t *tracedBlockProducer) ApplyBlock(ctx context.Context, header types.Heade ) return state, nil } - -func (t 
*tracedBlockProducer) ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - ctx, span := t.tracer.Start(ctx, "BlockExecutor.ValidateBlock", - trace.WithAttributes( - attribute.Int64("block.height", int64(header.Height())), - ), - ) - defer span.End() - - err := t.inner.ValidateBlock(ctx, lastState, header, data) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - return err -} diff --git a/block/internal/executing/tracing_test.go b/block/internal/executing/tracing_test.go index 396e1d8a4..a4865e456 100644 --- a/block/internal/executing/tracing_test.go +++ b/block/internal/executing/tracing_test.go @@ -22,7 +22,6 @@ type mockBlockProducer struct { retrieveBatchFn func(ctx context.Context) (*BatchData, error) createBlockFn func(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) applyBlockFn func(ctx context.Context, header types.Header, data *types.Data) (types.State, error) - validateBlockFn func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error } func (m *mockBlockProducer) ProduceBlock(ctx context.Context) error { @@ -53,13 +52,6 @@ func (m *mockBlockProducer) ApplyBlock(ctx context.Context, header types.Header, return types.State{}, nil } -func (m *mockBlockProducer) ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - if m.validateBlockFn != nil { - return m.validateBlockFn(ctx, lastState, header, data) - } - return nil -} - func setupBlockProducerTrace(t *testing.T, inner BlockProducer) (BlockProducer, *tracetest.SpanRecorder) { t.Helper() sr := tracetest.NewSpanRecorder() @@ -264,63 +256,6 @@ func TestTracedBlockProducer_ApplyBlock_Error(t *testing.T) { require.Equal(t, "execution failed", span.Status().Description) } -func TestTracedBlockProducer_ValidateBlock_Success(t *testing.T) { - mock := &mockBlockProducer{ - validateBlockFn: func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - return nil - }, - } - producer, sr := setupBlockProducerTrace(t, mock) - ctx := context.Background() - - header := &types.SignedHeader{ - Header: types.Header{ - BaseHeader: types.BaseHeader{ - Height: 75, - }, - }, - } - - err := producer.ValidateBlock(ctx, types.State{}, header, &types.Data{}) - require.NoError(t, err) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, "BlockExecutor.ValidateBlock", span.Name()) - require.Equal(t, codes.Unset, span.Status().Code) - - attrs := span.Attributes() - testutil.RequireAttribute(t, attrs, "block.height", int64(75)) -} - -func TestTracedBlockProducer_ValidateBlock_Error(t *testing.T) { - mock := &mockBlockProducer{ - validateBlockFn: func(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - return errors.New("validation failed") - }, - } - producer, sr := setupBlockProducerTrace(t, mock) - ctx := context.Background() - - header := &types.SignedHeader{ - Header: types.Header{ - BaseHeader: types.BaseHeader{ - Height: 75, - }, - }, - } - - err := producer.ValidateBlock(ctx, types.State{}, header, &types.Data{}) - require.Error(t, err) - - spans := sr.Ended() - require.Len(t, spans, 1) - span := spans[0] - require.Equal(t, codes.Error, span.Status().Code) - require.Equal(t, "validation failed", span.Status().Description) -} - // 
TestTracedBlockProducer_RetrieveBatch_ErrorWithValue verifies that when the inner // function returns both a value and an error, the value is passed through (not nil). // this is important for cases like ErrNoTransactionsInBatch where valid data accompanies the error. diff --git a/test/mocks/batch.go b/test/mocks/batch.go index 90025855b..2151c3c5f 100644 --- a/test/mocks/batch.go +++ b/test/mocks/batch.go @@ -252,6 +252,75 @@ func (_c *MockBatch_SaveBlockData_Call) RunAndReturn(run func(header *types.Sign return _c } +// SaveBlockDataFromBytes provides a mock function for the type MockBatch +func (_mock *MockBatch) SaveBlockDataFromBytes(header *types.SignedHeader, headerBlob []byte, dataBlob []byte, signature *types.Signature) error { + ret := _mock.Called(header, headerBlob, dataBlob, signature) + + if len(ret) == 0 { + panic("no return value specified for SaveBlockDataFromBytes") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(*types.SignedHeader, []byte, []byte, *types.Signature) error); ok { + r0 = returnFunc(header, headerBlob, dataBlob, signature) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockBatch_SaveBlockDataFromBytes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveBlockDataFromBytes' +type MockBatch_SaveBlockDataFromBytes_Call struct { + *mock.Call +} + +// SaveBlockDataFromBytes is a helper method to define mock.On call +// - header *types.SignedHeader +// - headerBlob []byte +// - dataBlob []byte +// - signature *types.Signature +func (_e *MockBatch_Expecter) SaveBlockDataFromBytes(header interface{}, headerBlob interface{}, dataBlob interface{}, signature interface{}) *MockBatch_SaveBlockDataFromBytes_Call { + return &MockBatch_SaveBlockDataFromBytes_Call{Call: _e.mock.On("SaveBlockDataFromBytes", header, headerBlob, dataBlob, signature)} +} + +func (_c *MockBatch_SaveBlockDataFromBytes_Call) Run(run func(header *types.SignedHeader, headerBlob []byte, dataBlob []byte, signature *types.Signature)) *MockBatch_SaveBlockDataFromBytes_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *types.SignedHeader + if args[0] != nil { + arg0 = args[0].(*types.SignedHeader) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + var arg2 []byte + if args[2] != nil { + arg2 = args[2].([]byte) + } + var arg3 *types.Signature + if args[3] != nil { + arg3 = args[3].(*types.Signature) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *MockBatch_SaveBlockDataFromBytes_Call) Return(err error) *MockBatch_SaveBlockDataFromBytes_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockBatch_SaveBlockDataFromBytes_Call) RunAndReturn(run func(header *types.SignedHeader, headerBlob []byte, dataBlob []byte, signature *types.Signature) error) *MockBatch_SaveBlockDataFromBytes_Call { + _c.Call.Return(run) + return _c +} + // SetHeight provides a mock function for the type MockBatch func (_mock *MockBatch) SetHeight(height uint64) error { ret := _mock.Called(height) diff --git a/types/state.go b/types/state.go index 5c2935c44..714549a53 100644 --- a/types/state.go +++ b/types/state.go @@ -56,39 +56,17 @@ func (s *State) NextState(header Header, stateRoot []byte) (State, error) { // AssertValidForNextState performs common validation of a header and data against the current state. 
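+// It delegates the cheap sequence checks to AssertValidSequence and then runs the full header/data Validate.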
// It assumes any context-specific basic header checks and verifier setup have already been performed func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error { - if header.ChainID() != s.ChainID { - return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) + if err := s.AssertValidSequence(header); err != nil { + return err } if err := Validate(header, data); err != nil { return fmt.Errorf("header-data validation failed: %w", err) } - - if len(s.LastHeaderHash) == 0 { // initial state - return nil - } - - if expdHeight := s.LastBlockHeight + 1; header.Height() != expdHeight { - return fmt.Errorf("invalid block height - got: %d, want: %d", header.Height(), expdHeight) - } - - if headerTime := header.Time(); s.LastBlockTime.After(headerTime) { - return fmt.Errorf("invalid block time - got: %v, last: %v", headerTime, s.LastBlockTime) - } - if !bytes.Equal(header.LastHeaderHash, s.LastHeaderHash) { - return fmt.Errorf("invalid last header hash - got: %x, want: %x", header.LastHeaderHash, s.LastHeaderHash) - } - if !bytes.Equal(header.AppHash, s.AppHash) { - return fmt.Errorf("invalid last app hash - got: %x, want: %x", header.AppHash, s.AppHash) - } - return nil } // AssertValidSequence performs lightweight state-sequence validation for self-produced blocks. -// It skips the expensive Validate() call (which re-computes DACommitment by re-marshaling -// and re-hashing all transaction data) since the producer already computed the data hash -// in CreateBlock. Only cheap structural checks are performed. func (s State) AssertValidSequence(header *SignedHeader) error { if header.ChainID() != s.ChainID { return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) From 985e9f9f940e7008264f682beccdc052df219bec Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Fri, 20 Feb 2026 13:34:16 +0100 Subject: [PATCH 14/14] Extract last block info --- block/internal/executing/executor.go | 44 ++++++++++++++++------------ 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 00d7fb532..ee9c9a9f5 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -27,6 +27,14 @@ import ( var _ BlockProducer = (*Executor)(nil) +// lastBlockInfo contains cached per-block data to avoid store reads + protobuf +// deserialization in CreateBlock. +type lastBlockInfo struct { + headerHash types.Hash + dataHash types.Hash + signature types.Signature +} + // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -58,14 +66,10 @@ type Executor struct { // avoiding a store lookup on every ProduceBlock call. hasPendingBlock atomic.Bool - // Cached per-block data to avoid store reads + protobuf deserialization - // in CreateBlock. Updated after each successful block production. - lastHeaderHash types.Hash - lastDataHash types.Hash - lastSignature types.Signature + // Cached per-block data + lastBlockInfo atomic.Pointer[lastBlockInfo] // pendingCheckCounter amortizes the expensive NumPendingHeaders/NumPendingData - // checks across multiple blocks. Only checked every pendingCheckInterval blocks. 
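+	// checks across multiple blocks; the stride is pendingCheckInterval in ProduceBlock.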
pendingCheckCounter uint64 // Channels for coordination @@ -300,12 +304,15 @@ func (e *Executor) initializeState() error { if state.LastBlockHeight > 0 { h, d, err := e.store.GetBlockData(e.ctx, state.LastBlockHeight) if err == nil { - e.lastHeaderHash = h.Hash() - e.lastDataHash = d.Hash() + info := &lastBlockInfo{ + headerHash: h.Hash(), + dataHash: d.Hash(), + } sig, err := e.store.GetSignature(e.ctx, state.LastBlockHeight) if err == nil { - e.lastSignature = *sig + info.signature = *sig } + e.lastBlockInfo.Store(info) } } @@ -591,11 +598,11 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { e.setLastState(newState) // Update last-block cache so the next CreateBlock avoids a store read. - // Reuse newState.LastHeaderHash (already computed by NextState) instead of - // calling header.Hash() again, which would re-marshal + re-hash. - e.lastHeaderHash = newState.LastHeaderHash - e.lastDataHash = data.Hash() - e.lastSignature = signature + e.lastBlockInfo.Store(&lastBlockInfo{ + headerHash: newState.LastHeaderHash, + dataHash: data.Hash(), + signature: signature, + }) // Broadcast header and data to P2P network sequentially. // IMPORTANT: Header MUST be broadcast before data — the P2P layer validates @@ -659,7 +666,6 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba currentState := e.getLastState() headerTime := uint64(e.genesis.StartTime.UnixNano()) - // Get last block info var lastHeaderHash types.Hash var lastDataHash types.Hash var lastSignature types.Signature @@ -667,11 +673,11 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba if height > e.genesis.InitialHeight { headerTime = uint64(batchData.UnixNano()) - if len(e.lastHeaderHash) > 0 { + if info := e.lastBlockInfo.Load(); info != nil { // Fast path: use in-memory cache - lastHeaderHash = e.lastHeaderHash - lastDataHash = e.lastDataHash - lastSignature = e.lastSignature + lastHeaderHash = info.headerHash + lastDataHash = info.dataHash + lastSignature = info.signature } else { // Cold start fallback: read from store lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1)