diff --git a/CHANGELOG.md b/CHANGELOG.md index 98961e6228..227d9ff6b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Add disaster recovery for sequencer + - Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057) + - Verify DA and P2P state on restart (prevent double-signing). [#3061](https://github.com/evstack/ev-node/pull/3061) - Node pruning support. [#2984](https://github.com/evstack/ev-node/pull/2984) - Two different sort of pruning implemented: _Classic pruning_ (`all`): prunes given `HEAD-n` blocks from the databases, including store metadatas. diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index a1ad3059ef..21e06bc5cb 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -73,6 +73,10 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return 0, nil +} + func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) { testHeight := uint64(100) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d2e1d626e1..41617ed49e 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -299,6 +299,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } +// GetLatestDAHeight returns the latest height available on the DA layer by +// querying the network head. +func (c *client) GetLatestDAHeight(ctx context.Context) (uint64, error) { + headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + header, err := c.headerAPI.NetworkHead(headCtx) + if err != nil { + return 0, fmt.Errorf("failed to get DA network head: %w", err) + } + if header == nil { + return 0, fmt.Errorf("DA network head returned nil header") + } + + return header.Height, nil +} + // RetrieveForcedInclusion retrieves blobs from the forced inclusion namespace at the specified height. func (c *client) RetrieveForcedInclusion(ctx context.Context, height uint64) datypes.ResultRetrieve { if !c.hasForcedNamespace { diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 9b0ad5529e..a6a2253084 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -163,6 +163,9 @@ func (r *forcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context if result.Code == datypes.StatusNotFound { r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height") + syncFetchedBlocks[h] = &BlockData{ + Timestamp: result.Timestamp, + } continue } diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7e..1e9f6cedee 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -17,6 +17,9 @@ type Client interface { // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + // GetLatestDAHeight returns the latest height available on the DA layer.. + GetLatestDAHeight(ctx context.Context) (uint64, error) + // Namespace accessors. 
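As an illustration of the new accessor introduced above, here is a minimal sketch of how a caller might use `GetLatestDAHeight` to measure how far a stored checkpoint trails the DA head. The `daHeadReader` interface, the `daLag` helper, and the 5-second timeout are hypothetical; only the `GetLatestDAHeight(ctx context.Context) (uint64, error)` signature comes from this diff.

```go
package dautil

import (
	"context"
	"fmt"
	"time"
)

// daHeadReader mirrors only the GetLatestDAHeight method added to the Client
// interface in this change; the interface name is illustrative.
type daHeadReader interface {
	GetLatestDAHeight(ctx context.Context) (uint64, error)
}

// daLag reports how many DA heights a local checkpoint trails the DA head.
// A bounded timeout keeps the head query from blocking the caller for long.
func daLag(ctx context.Context, da daHeadReader, checkpointHeight uint64) (uint64, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	head, err := da.GetLatestDAHeight(ctx)
	if err != nil {
		return 0, fmt.Errorf("query DA head: %w", err)
	}
	if head <= checkpointHeight {
		return 0, nil // already at or ahead of the DA head
	}
	return head - checkpointHeight, nil
}
```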
GetHeaderNamespace() []byte GetDataNamespace() []byte diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 45fae2e863..4d946a8b74 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -123,6 +123,20 @@ func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs [] return res, nil } +func (t *tracedClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "DA.GetLatestDAHeight") + defer span.End() + + height, err := t.inner.GetLatestDAHeight(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return 0, err + } + span.SetAttributes(attribute.Int64("da.latest_height", int64(height))) + return height, nil +} + func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } func (t *tracedClient) GetForcedInclusionNamespace() []byte { diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ea01c9e425..de32532a31 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -54,10 +54,11 @@ func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs } return nil, nil } -func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } -func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } -func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } -func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil } +func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } +func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } +func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } +func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index ac68f2cd85..49e8ad7e00 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -774,12 +774,15 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea // ValidateBlock validates the created block. 
func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error { - // Set custom verifier for aggregator node signature - header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) - - // Basic header validation - if err := header.ValidateBasic(); err != nil { - return fmt.Errorf("invalid header: %w", err) + if e.config.Node.BasedSequencer { + if err := header.Header.ValidateBasic(); err != nil { + return fmt.Errorf("invalid header: %w", err) + } + } else { + header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider) + if err := header.ValidateBasic(); err != nil { + return fmt.Errorf("invalid header: %w", err) + } } return lastState.AssertValidForNextState(header, data) diff --git a/block/internal/syncing/block_syncer.go b/block/internal/syncing/block_syncer.go index e48dd46771..e65279a9df 100644 --- a/block/internal/syncing/block_syncer.go +++ b/block/internal/syncing/block_syncer.go @@ -21,5 +21,5 @@ type BlockSyncer interface { ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error // VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled. - VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error + VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 626eae688d..e19e763ef4 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -748,9 +748,18 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve return err } - // Verify forced inclusion transactions if configured - if event.Source == common.SourceDA { - if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil { + // Verify forced inclusion transactions if configured. + // The check is only performed on DA-only nodes and on P2P nodes catching up to HEAD. + // P2P nodes at HEAD cannot verify forced inclusion txs because DA inclusion happens later (so DA hints are not yet available). This is a known limitation described in the ADR. + if event.Source == common.SourceDA || event.DaHeightHints != [2]uint64{0, 0} { + currentDAHeight := currentState.DAHeight + if event.DaHeightHints[0] > currentDAHeight { + currentDAHeight = event.DaHeightHints[0] + } else if event.DaHeightHints[1] > currentDAHeight { + currentDAHeight = event.DaHeightHints[1] + } + + if err := s.VerifyForcedInclusionTxs(ctx, currentDAHeight, data); err != nil { s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") if errors.Is(err, errMaliciousProposer) { // remove header as da included from cache @@ -770,9 +779,49 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve // Update DA height if needed // This height is only updated when a height is processed from DA as P2P - // events do not contain DA height information + // events do not contain DA height information. + // + // When a sequencer restarts after extended downtime, it produces "catch-up" + // blocks containing forced inclusion transactions from missed DA epochs and + // submits them to DA at the current (much higher) DA height.
This creates a + // gap between the state's DAHeight (tracking forced inclusion epoch progress) + // and event.DaHeight (the DA submission height). + // + // If we jump state.DAHeight directly to event.DaHeight, subsequent calls to + // VerifyForcedInclusionTxs would check the wrong epoch (the submission epoch + // instead of the next forced-inclusion epoch), causing valid catch-up blocks + // to be incorrectly flagged as malicious. + // + // To handle this, when the gap exceeds one DA epoch, we advance DAHeight by + // exactly one epoch per block. This lets the forced inclusion verifier check + // the correct epoch for each catch-up block. Once the sequencer finishes + // catching up and the gap closes, DAHeight converges to event.DaHeight. if event.DaHeight > newState.DAHeight { - newState.DAHeight = event.DaHeight + epochSize := s.genesis.DAEpochForcedInclusion + gap := event.DaHeight - newState.DAHeight + + if epochSize > 0 && gap > epochSize { + // Large gap detected — likely catch-up blocks from a restarted sequencer. + // Advance DAHeight by one epoch to keep forced inclusion verification + // aligned with the epoch the sequencer is replaying. + _, epochEnd, _ := types.CalculateEpochBoundaries( + newState.DAHeight, s.genesis.DAStartHeight, epochSize, + ) + nextEpochStart := epochEnd + 1 + if nextEpochStart > event.DaHeight { + // Shouldn't happen, but clamp to event.DaHeight as a safety net. + nextEpochStart = event.DaHeight + } + s.logger.Debug(). + Uint64("current_da_height", newState.DAHeight). + Uint64("event_da_height", event.DaHeight). + Uint64("advancing_to", nextEpochStart). + Uint64("gap", gap). + Msg("large DA height gap detected (sequencer catch-up), advancing DA height by one epoch") + newState.DAHeight = nextEpochStart + } else { + newState.DAHeight = event.DaHeight + } } batch, err := s.store.NewBatch(ctx) @@ -971,7 +1020,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 { // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions // to future blocks (smoothing). This is legitimate behavior within an epoch. // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later). -func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { +func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error { if s.fiRetriever == nil { return nil } @@ -981,7 +1030,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type s.updateDynamicGracePeriod(blockFullness) // Retrieve forced inclusion transactions from DA for current epoch - forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentState.DAHeight) + forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, daHeight) if err != nil { if errors.Is(err, da.ErrForceInclusionNotConfigured) { s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification") @@ -1068,10 +1117,10 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type effectiveGracePeriod := s.getEffectiveGracePeriod() graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion) - if currentState.DAHeight > graceBoundary { + if daHeight > graceBoundary { maliciousTxs = append(maliciousTxs, pending) s.logger.Warn(). - Uint64("current_da_height", currentState.DAHeight). 
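To make the one-epoch-per-block advance rule above concrete, here is a small self-contained sketch with hypothetical numbers; `epochBoundaries` is a local stand-in whose semantics (fixed-size windows of DA heights starting at `DAStartHeight`) are assumed to match `types.CalculateEpochBoundaries`.

```go
package main

import "fmt"

// epochBoundaries is a local stand-in for types.CalculateEpochBoundaries; the
// assumed semantics are fixed-size epochs of DA heights starting at daStartHeight.
func epochBoundaries(daHeight, daStartHeight, epochSize uint64) (start, end uint64) {
	if daHeight < daStartHeight {
		daHeight = daStartHeight
	}
	epoch := (daHeight - daStartHeight) / epochSize
	start = daStartHeight + epoch*epochSize
	end = start + epochSize - 1
	return start, end
}

func main() {
	const (
		daStartHeight = 100
		epochSize     = 50
	)
	stateDAHeight := uint64(120) // forced-inclusion progress tracked in state
	eventDAHeight := uint64(400) // DA height the catch-up block was submitted at

	gap := eventDAHeight - stateDAHeight
	if gap > epochSize {
		// Advance by exactly one epoch instead of jumping to eventDAHeight,
		// mirroring the gap-handling branch above.
		start, epochEnd := epochBoundaries(stateDAHeight, daStartHeight, epochSize)
		next := epochEnd + 1
		fmt.Printf("epoch [%d..%d] -> advance DAHeight to %d (not %d)\n",
			start, epochEnd, next, eventDAHeight)
		// Output: epoch [100..149] -> advance DAHeight to 150 (not 400)
	}
}
```

With these hypothetical values, a state DAHeight of 120 advances to 150 for this catch-up block rather than jumping straight to the submission height 400, so the next forced-inclusion check still targets the epoch the sequencer is replaying.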
+ Uint64("current_da_height", daHeight). Uint64("epoch_end", pending.EpochEnd). Uint64("grace_boundary", graceBoundary). Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). @@ -1081,7 +1130,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type Msg("forced inclusion transaction past grace boundary - marking as malicious") } else { remainingPending = append(remainingPending, pending) - if currentState.DAHeight > pending.EpochEnd { + if daHeight > pending.EpochEnd { txsInGracePeriod++ } } @@ -1105,7 +1154,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type effectiveGracePeriod := s.getEffectiveGracePeriod() s.logger.Error(). Uint64("height", data.Height()). - Uint64("current_da_height", currentState.DAHeight). + Uint64("current_da_height", daHeight). Int("malicious_count", len(maliciousTxs)). Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). Uint64("effective_grace_periods", effectiveGracePeriod). @@ -1125,7 +1174,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type s.logger.Info(). Uint64("height", data.Height()). - Uint64("da_height", currentState.DAHeight). + Uint64("da_height", daHeight). Uint64("epoch_start", forcedIncludedTxsEvent.StartDaHeight). Uint64("epoch_end", forcedIncludedTxsEvent.EndDaHeight). Int("included_count", includedCount). diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 3c03992fce..6cb34901c2 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -426,11 +426,8 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) { data := makeData(gen.ChainID, 1, 1) data.Txs[0] = types.Tx(dataBin) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since all forced txs are included - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) } @@ -504,11 +501,8 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { data.Txs[0] = types.Tx([]byte("regular_tx_1")) data.Txs[1] = types.Tx([]byte("regular_tx_2")) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since forced tx blob may be legitimately deferred within the epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) // Mock DA for next epoch to return no forced inclusion transactions @@ -517,11 +511,10 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { }).Once() // Move to next epoch but still within grace period - currentState.DAHeight = 1 // Move to epoch end (epoch was [0, 0]) data2 := makeData(gen.ChainID, 2, 1) data2.Txs[0] = []byte("regular_tx_3") - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2) + err = s.VerifyForcedInclusionTxs(t.Context(), 1, data2) require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it // Mock DA for height 2 to return no forced inclusion transactions @@ -530,11 +523,10 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { }).Once() // Now move past grace boundary - should fail if tx still not included - currentState.DAHeight = 2 // Move past grace boundary (graceBoundary = 0 + 1*1 = 1) data3 := makeData(gen.ChainID, 3, 1) data3.Txs[0] 
= types.Tx([]byte("regular_tx_4")) - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data3) + err = s.VerifyForcedInclusionTxs(t.Context(), 2, data3) require.Error(t, err) require.Contains(t, err.Error(), "sequencer is malicious") require.Contains(t, err.Error(), "past grace boundary") @@ -611,11 +603,8 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { data.Txs[1] = types.Tx([]byte("regular_tx")) // dataBin2 is missing - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since dataBin2 may be legitimately deferred within the epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) // Mock DA for next epoch to return no forced inclusion transactions @@ -624,12 +613,11 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { }).Once() // Move to DAHeight=1 (still within grace period since graceBoundary = 0 + 1*1 = 1) - currentState.DAHeight = 1 data2 := makeData(gen.ChainID, 2, 1) data2.Txs[0] = types.Tx([]byte("regular_tx_3")) // Verify - should pass since we're at the grace boundary, not past it - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2) + err = s.VerifyForcedInclusionTxs(t.Context(), 1, data2) require.NoError(t, err) // Mock DA for height 2 (when we move to DAHeight 2) @@ -640,11 +628,10 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { // Now simulate moving past grace boundary - should fail if dataBin2 still not included // With basePeriod=1 and DAEpochForcedInclusion=1, graceBoundary = 0 + (1*1) = 1 // So we need DAHeight > 1 to trigger the error - currentState.DAHeight = 2 // Move past grace boundary data3 := makeData(gen.ChainID, 3, 1) data3.Txs[0] = types.Tx([]byte("regular_tx_4")) - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data3) + err = s.VerifyForcedInclusionTxs(t.Context(), 2, data3) require.Error(t, err) require.Contains(t, err.Error(), "sequencer is malicious") require.Contains(t, err.Error(), "past grace boundary") @@ -713,11 +700,8 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) { // Create block data data := makeData(gen.ChainID, 1, 2) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since no forced txs to verify - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) } @@ -778,11 +762,8 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) { // Create block data data := makeData(gen.ChainID, 1, 2) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since namespace not configured - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) } @@ -867,11 +848,10 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { data1.Txs[0] = types.Tx(dataBin1) data1.Txs[1] = types.Tx([]byte("regular_tx_1")) - currentState := s.getLastState() - currentState.DAHeight = 104 + daHeight := uint64(104) // Verify - should pass since dataBin2 can be deferred within epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1) + err = s.VerifyForcedInclusionTxs(t.Context(), daHeight, data1) require.NoError(t, err) // Verify that dataBin2 is now tracked as pending @@ -900,7 +880,7 @@ func 
TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for // Verify - should pass since dataBin2 is now included and clears pending - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2) + err = s.VerifyForcedInclusionTxs(t.Context(), daHeight, data2) require.NoError(t, err) // Verify that pending queue is now empty (dataBin2 was included) @@ -993,11 +973,8 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) { data1 := makeData(gen.ChainID, 1, 1) data1.Txs[0] = types.Tx([]byte("regular_tx_1")) - currentState := s.getLastState() - currentState.DAHeight = 102 - // Verify - should pass, tx can be deferred within epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1) + err = s.VerifyForcedInclusionTxs(t.Context(), 102, data1) require.NoError(t, err) } @@ -1090,9 +1067,6 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { data1.Txs[0] = types.Tx(dataBin1) data1.Txs[1] = types.Tx(dataBin2) - currentState := s.getLastState() - currentState.DAHeight = 102 // At epoch end - - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1) + err = s.VerifyForcedInclusionTxs(t.Context(), 102 /* epoch end */, data1) require.NoError(t, err, "smoothing within epoch should be allowed") } diff --git a/block/internal/syncing/tracing.go b/block/internal/syncing/tracing.go index bc43263664..1877886d33 100644 --- a/block/internal/syncing/tracing.go +++ b/block/internal/syncing/tracing.go @@ -85,16 +85,16 @@ func (t *tracedBlockSyncer) ValidateBlock(ctx context.Context, currState types.S return err } -func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { +func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error { ctx, span := t.tracer.Start(ctx, "BlockSyncer.VerifyForcedInclusion", trace.WithAttributes( attribute.Int64("block.height", int64(data.Height())), - attribute.Int64("da.height", int64(currentState.DAHeight)), + attribute.Int64("da.height", int64(daHeight)), ), ) defer span.End() - err := t.inner.VerifyForcedInclusionTxs(ctx, currentState, data) + err := t.inner.VerifyForcedInclusionTxs(ctx, daHeight, data) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/block/internal/syncing/tracing_test.go b/block/internal/syncing/tracing_test.go index 679f3f7a33..b49235871d 100644 --- a/block/internal/syncing/tracing_test.go +++ b/block/internal/syncing/tracing_test.go @@ -21,7 +21,7 @@ type mockBlockSyncer struct { trySyncNextBlockFn func(ctx context.Context, event *common.DAHeightEvent) error applyBlockFn func(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) validateBlockFn func(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error - verifyForcedInclusionFn func(ctx context.Context, currentState types.State, data *types.Data) error + verifyForcedInclusionFn func(ctx context.Context, daHeight uint64, data *types.Data) error } func (m *mockBlockSyncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error { @@ -45,9 +45,9 @@ func (m *mockBlockSyncer) ValidateBlock(ctx context.Context, currState types.Sta return nil } -func (m *mockBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { +func (m *mockBlockSyncer) 
VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error { if m.verifyForcedInclusionFn != nil { - return m.verifyForcedInclusionFn(ctx, currentState, data) + return m.verifyForcedInclusionFn(ctx, daHeight, data) } return nil } @@ -248,7 +248,7 @@ func TestTracedBlockSyncer_ValidateBlock_Error(t *testing.T) { func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { mock := &mockBlockSyncer{ - verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error { + verifyForcedInclusionFn: func(ctx context.Context, daHeight uint64, data *types.Data) error { return nil }, } @@ -260,11 +260,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { Height: 100, }, } - state := types.State{ - DAHeight: 50, - } - err := syncer.VerifyForcedInclusionTxs(ctx, state, data) + err := syncer.VerifyForcedInclusionTxs(ctx, 50, data) require.NoError(t, err) spans := sr.Ended() @@ -280,7 +277,7 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { mock := &mockBlockSyncer{ - verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error { + verifyForcedInclusionFn: func(ctx context.Context, daHeight uint64, data *types.Data) error { return errors.New("forced inclusion verification failed") }, } @@ -292,11 +289,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { Height: 100, }, } - state := types.State{ - DAHeight: 50, - } - err := syncer.VerifyForcedInclusionTxs(ctx, state, data) + err := syncer.VerifyForcedInclusionTxs(ctx, 50, data) require.Error(t, err) spans := sr.Ended() diff --git a/docs/adr/adr-019-forced-inclusion-mechanism.md b/docs/adr/adr-019-forced-inclusion-mechanism.md index ec025222e6..cee5e72bf7 100644 --- a/docs/adr/adr-019-forced-inclusion-mechanism.md +++ b/docs/adr/adr-019-forced-inclusion-mechanism.md @@ -686,7 +686,7 @@ based_sequencer = true # Use based sequencer ### Full Node Verification Flow -1. Receive block from DA or P2P +1. Receive block from DA 2. Before applying block: a. Fetch forced inclusion txs from DA at block's DA height (epoch-based) b. Build map of transactions in block @@ -699,6 +699,8 @@ based_sequencer = true # Use based sequencer h. If txs within grace period: keep in pending queue, allow block 3. Apply block if verification passes +NOTE: P2P-only nodes do not perform this verification. This is because DA inclusion happens after block production, so DA hints are only attached to broadcast blocks later. + **Grace Period Example** (with base grace period = 1 epoch, `DAEpochForcedInclusion = 50`): - Forced tx appears in epoch ending at DA height 100 @@ -722,18 +724,6 @@ based_sequencer = true # Use based sequencer Every `DAEpochForcedInclusion` DA blocks -### Security Considerations - -1. **Malicious Proposer Detection**: Full nodes reject blocks missing forced transactions -2. **No Timing Attacks**: Epoch boundaries are deterministic, no time-based logic -3. **Blob Size Limits**: Two-tier size validation prevents DoS - - Absolute limit (1.5MB): Blobs exceeding this are permanently rejected - - Batch limit (`MaxBytes`): Ensures no batch exceeds DA submission limits -4. **Graceful Degradation**: Continues operation if forced inclusion not configured -5. **Height Validation**: Handles "height from future" errors without state corruption -6.
**Transaction Preservation**: No valid transactions are lost due to size constraints -7. **Strict MaxBytes Enforcement**: Batches NEVER exceed `req.MaxBytes`, preventing DA layer rejections - **Attack Vectors**: ### Security Considerations @@ -774,11 +764,9 @@ Accepted and Implemented ### Negative 1. **Increased Latency**: Forced transactions subject to epoch boundaries -2. **DA Dependency**: Requires DA layer to support multiple namespaces +2. **DA Dependency**: Requires DA layer to be enabled on nodes for verification 3. **Higher DA Costs**: Users pay DA posting fees for forced inclusion -4. **Additional Complexity**: New component (DA Retriever) and verification logic with grace period tracking -5. **Epoch Configuration**: Requires setting `DAEpochForcedInclusion` in genesis (consensus parameter) -6. **Grace Period Adjustment**: Grace period is dynamically adjusted based on block fullness to balance censorship detection with operational reliability +4. **Epoch Configuration**: Requires setting `DAEpochForcedInclusion` in genesis (consensus parameter) ### Neutral diff --git a/pkg/config/config.go b/pkg/config/config.go index 4eab8426ac..e971b21ea4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -257,9 +257,9 @@ func (d *DAConfig) GetForcedInclusionNamespace() string { // NodeConfig contains all Rollkit specific configuration parameters type NodeConfig struct { // Node mode configuration - Aggregator bool `yaml:"aggregator" comment:"Run node in aggregator mode"` - BasedSequencer bool `yaml:"based_sequencer" comment:"Run node with based sequencer (fetches transactions only from DA forced inclusion namespace). Requires aggregator mode to be enabled."` - Light bool `yaml:"light" comment:"Run node in light mode"` + Aggregator bool `mapstructure:"aggregator" yaml:"aggregator" comment:"Run node in aggregator mode"` + BasedSequencer bool `mapstructure:"based_sequencer" yaml:"based_sequencer" comment:"Run node with based sequencer (fetches transactions only from DA forced inclusion namespace). Requires aggregator mode to be enabled."` + Light bool `mapstructure:"light" yaml:"light" comment:"Run node in light mode"` // Block management configuration BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 228bde2791..ada324e127 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -21,11 +21,20 @@ import ( "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/pkg/sequencers/common" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) // ErrInvalidId is returned when the chain id is invalid var ErrInvalidId = errors.New("invalid chain id") +// Catch-up state machine. The sequencer transitions through these once per +// lifecycle: unchecked → (inProgress | done). +const ( + catchUpUnchecked int32 = iota // haven't checked DA height yet + catchUpInProgress // actively replaying missed DA epochs + catchUpDone // caught up or was never behind +) + var _ coresequencer.Sequencer = (*Sequencer)(nil) // Sequencer implements core sequencing interface @@ -51,6 +60,15 @@ type Sequencer struct { // Cached forced inclusion transactions from the current epoch cachedForcedInclusionTxs [][]byte + + // catchUpState tracks the catch-up lifecycle (see constants above). 
+ // Checked once on the first epoch fetch via updateCatchUpState. If catch-up + // is needed, transitions to catchUpInProgress until ErrHeightFromFuture + // moves it to catchUpDone. + catchUpState atomic.Int32 + // currentDAEndTime is the DA epoch end timestamp from the last fetched epoch. + // Used as the block timestamp during catch-up to match based sequencing behavior. + currentDAEndTime time.Time } // NewSequencer creates a new Single Sequencer @@ -168,6 +186,13 @@ func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Submit // GetNextBatch implements sequencing.Sequencer. // It gets the next batch of transactions and fetch for forced included transactions. +// +// During catch-up mode (after sequencer downtime spanning one or more DA epochs), +// only forced inclusion transactions are returned — no mempool transactions. This +// ensures the sequencer produces blocks identical to what nodes running in base +// sequencing mode would have produced during the downtime. Once the sequencer has +// processed all missed DA epochs and reaches the DA head, it exits catch-up mode +// and resumes normal operation with both forced inclusion and mempool transactions. func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { if !c.isValid(req.Id) { return nil, ErrInvalidId @@ -208,10 +233,22 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB forcedTxs = c.cachedForcedInclusionTxs[c.checkpoint.TxIndex:] } - // Get mempool transactions from queue - mempoolBatch, err := c.queue.Next(ctx) - if err != nil { - return nil, err + // Get mempool transactions from queue, but ONLY if we're not catching up. + // During catch-up, the sequencer must produce blocks identical to what base + // sequencing would produce (forced inclusion txs only, no mempool). + var mempoolBatch *coresequencer.Batch + if c.catchUpState.Load() != catchUpInProgress { + var err error + mempoolBatch, err = c.queue.Next(ctx) + if err != nil { + return nil, err + } + } else { + mempoolBatch = &coresequencer.Batch{} + c.logger.Debug(). + Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Int("forced_txs", len(forcedTxs)). + Msg("catch-up mode: skipping mempool transactions") } // Build combined tx list for filtering @@ -318,6 +355,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). Msg("updated checkpoint after processing forced inclusion transactions") } @@ -326,11 +364,24 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validForcedTxs...) batchTxs = append(batchTxs, validMempoolTxs...) + // During catch-up, use the DA epoch end timestamp to match based sequencing behavior. 
+ timestamp := time.Now() + if c.catchUpState.Load() == catchUpInProgress { + daEndTime := c.currentDAEndTime + if !daEndTime.IsZero() { + var remainingForcedTxs uint64 + if len(c.cachedForcedInclusionTxs) > 0 { + remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + } + timestamp = daEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) + } + } + return &coresequencer.GetNextBatchResponse{ Batch: &coresequencer.Batch{ Transactions: batchTxs, }, - Timestamp: time.Now(), + Timestamp: timestamp, BatchData: req.LastBatchData, }, nil } @@ -374,13 +425,30 @@ func (c *Sequencer) GetDAHeight() uint64 { return c.daHeight.Load() } -// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint +// isCatchingUp returns whether the sequencer is in catch-up mode. +func (c *Sequencer) isCatchingUp() bool { + return c.catchUpState.Load() == catchUpInProgress +} + +// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint. +// It also updates the catch-up state based on DA heights: +// - Before the first fetch, it queries GetLatestDAHeight to determine if the +// sequencer has missed more than one DA epoch. If so, catch-up mode is +// entered and only forced-inclusion blocks (no mempool) are produced. +// - If the DA height is from the future (not yet produced), the sequencer +// exits catch-up mode as it has reached the DA head. func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { currentDAHeight := c.checkpoint.DAHeight + // Determine catch-up state before the (potentially expensive) epoch fetch. + // This is done once per sequencer lifecycle — subsequent catch-up exits are + // handled by ErrHeightFromFuture below. + c.updateCatchUpState(ctx) + c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -389,16 +457,31 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Msg("DA height from future, waiting for DA to produce block") + + // We've reached the DA head — exit catch-up mode + if c.catchUpState.Load() == catchUpInProgress { + c.logger.Info(). + Uint64("da_height", currentDAHeight). + Msg("catch-up complete: reached DA head, resuming normal sequencing") + c.catchUpState.Store(catchUpDone) + } + return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} + c.catchUpState.Store(catchUpDone) return 0, nil } return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err) } + // Store the DA epoch end time for timestamp usage during catch-up + if !forcedTxsEvent.Timestamp.IsZero() { + c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC() + } + // Validate and filter transactions validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 @@ -420,6 +503,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). 
Msg("fetched forced inclusion transactions from DA") // Cache the transactions @@ -427,3 +511,79 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint return forcedTxsEvent.EndDaHeight, nil } + +// updateCatchUpState determines whether the sequencer needs to catch up to the +// DA head by comparing the sequencer's checkpoint DA height against the latest +// DA height. +// +// The detection is purely height-based: we query GetLatestDAHeight once (on the +// first epoch fetch) and calculate how many epochs the sequencer has missed. If +// the gap exceeds one epoch, the sequencer enters catch-up mode and replays +// missed epochs with forced-inclusion transactions only (no mempool). It remains +// in catch-up until fetchNextDAEpoch hits ErrHeightFromFuture, meaning we've +// reached the DA head. +// +// This check runs exactly once per sequencer lifecycle, enforced by the +// catchUpState field: any state other than catchUpUnchecked causes an immediate +// return. If the downtime was short enough that the sequencer is still within +// the current or next epoch, no catch-up is needed and the single +// GetLatestDAHeight call is the only overhead. +func (c *Sequencer) updateCatchUpState(ctx context.Context) { + if c.catchUpState.Load() != catchUpUnchecked { + return + } + // Optimistically mark as done; overridden to catchUpInProgress below if + // catch-up is actually needed. + c.catchUpState.Store(catchUpDone) + + epochSize := c.genesis.DAEpochForcedInclusion + if epochSize == 0 { + // No epoch-based forced inclusion configured — catch-up is irrelevant. + return + } + + currentDAHeight := c.checkpoint.DAHeight + daStartHeight := c.genesis.DAStartHeight + + latestDAHeight, err := c.daClient.GetLatestDAHeight(ctx) + if err != nil { + c.logger.Warn().Err(err). + Msg("failed to get latest DA height for catch-up detection, skipping check") + return + } + + if latestDAHeight <= currentDAHeight { + // DA hasn't moved beyond our position — nothing to catch up. + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Msg("sequencer is at or ahead of DA head, no catch-up needed") + return + } + + // Calculate epoch numbers for current position and DA head. + currentEpoch := types.CalculateEpochNumber(currentDAHeight, daStartHeight, epochSize) + latestEpoch := types.CalculateEpochNumber(latestDAHeight, daStartHeight, epochSize) + missedEpochs := latestEpoch - currentEpoch + + if missedEpochs <= 1 { + // Within the current or next epoch — normal operation, no catch-up. + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Msg("sequencer within one epoch of DA head, no catch-up needed") + return + } + + // The DA layer is more than one epoch ahead. Enter catch-up mode. + c.catchUpState.Store(catchUpInProgress) + c.logger.Warn(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Uint64("missed_epochs", missedEpochs). 
+ Msg("entering catch-up mode: DA layer is multiple epochs ahead, replaying missed epochs with forced inclusion txs only") +} diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index dd32edfc07..61844495d4 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -365,7 +365,7 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) // Create in-memory datastore db := ds.NewMapDatastore() @@ -381,6 +381,9 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs that are 50 and 60 bytes forcedTx1 := make([]byte, 50) forcedTx2 := make([]byte, 60) @@ -455,7 +458,7 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -469,6 +472,9 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs where combined they exceed maxBytes forcedTx1 := make([]byte, 100) forcedTx2 := make([]byte, 80) // This would be deferred @@ -535,7 +541,7 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -549,6 +555,9 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First call returns large forced txs largeForcedTx1, largeForcedTx2 := make([]byte, 75), make([]byte, 75) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ @@ -873,7 +882,7 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -887,6 +896,10 @@ func 
TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 101 — close to sequencer start (100), no catch-up needed. + // Use Maybe() since two sequencer instances share this mock. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(101), nil).Maybe() + // Create forced inclusion txs at DA height 100 // Use sizes that all fit in one batch to test checkpoint advancing forcedTx1 := make([]byte, 50) @@ -986,6 +999,9 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First DA epoch returns empty transactions mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, @@ -1224,6 +1240,881 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { // preserves any transactions that weren't even processed yet due to maxBytes limits. // // This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario. +func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at height 105 — sequencer starts at 100 with epoch size 1, + // so it has missed 5 epochs (>1), triggering catch-up. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // DA epoch at height 100 + oldTimestamp := time.Now().Add(-10 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-tx-1")}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool transaction + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx-1")}}, + }) + require.NoError(t, err) + + assert.False(t, seq.isCatchingUp(), "should not be catching up initially") + + // First GetNextBatch — DA head is far ahead, should enter catch-up + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp.Batch) + + assert.True(t, seq.isCatchingUp(), "should be catching up after detecting epoch gap") + + // During catch-up, batch should contain only forced inclusion tx, no mempool tx + assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") + assert.Equal(t, []byte("forced-tx-1"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — sequencer starts at 100 with epoch size 1, + // so it has missed multiple epochs, triggering catch-up. + // Called once on first fetchNextDAEpoch; subsequent fetches skip the check. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch at height 100: two forced txs + oldTimestamp := time.Now().Add(-5 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-1"), []byte("forced-2")}, + }).Once() + + // Epoch at height 101: one forced tx + oldTimestamp2 := time.Now().Add(-4 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, + Data: [][]byte{[]byte("forced-3")}, + }).Once() + + // Epoch at height 102: from the future (head reached during replay) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit several mempool transactions + for i := 0; i < 5; i++ { + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + } + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch (epoch 100): only forced txs + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + + for _, tx := range resp1.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 2, len(resp1.Batch.Transactions), "should have 2 forced txs from epoch 100") + + // Second batch (epoch 101): only forced txs + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + + for _, tx := range resp2.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 1, len(resp2.Batch.Transactions), "should have 1 forced tx from epoch 101") +} + +func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch at height 100: timestamp 5 minutes ago + epochTimestamp := time.Now().Add(-5 * time.Minute).UTC() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: 
datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.True(t, seq.isCatchingUp(), "should be in catch-up mode") + + // During catch-up, the timestamp should be the DA epoch end time, not time.Now() + assert.Equal(t, epochTimestamp, resp.Timestamp, + "catch-up batch timestamp should match DA epoch timestamp") +} + +func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: old (catch-up) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-old")}, + }).Once() + + // Epoch 101: fetched during catch-up, but returns HeightFromFuture to exit catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch: catch-up (old epoch 100) + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should be catching up during old epoch") + assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") + assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) + + // Second batch: epoch 101 returns HeightFromFuture — should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should have exited catch-up after reaching DA head") + + // Should include mempool tx now (no forced txs available) + hasMempoolTx := false + 
for _, tx := range resp2.Batch.Transactions { + if bytes.Equal(tx, []byte("mempool-tx")) { + hasMempoolTx = true + } + } + assert.True(t, hasMempoolTx, "should contain mempool tx after exiting catch-up") +} + +func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: success, fetched during catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Epoch 101: from the future — DA head reached, exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call: fetches epoch 100, enters catch-up via epoch gap detection + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + assert.Equal(t, 1, len(resp1.Batch.Transactions)) + + // Second call: epoch 101 is from the future, should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should exit catch-up when DA returns HeightFromFuture") + // No forced txs available, batch is empty + assert.Equal(t, 0, len(resp2.Batch.Transactions)) +} + +func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 100 — sequencer starts at 100 with epoch size 1, + // so it is within the same epoch (0 missed). No catch-up. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Once() + + // Epoch at height 100: current epoch + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should NOT be catching up when within one epoch of DA head") + + // Should have both forced and mempool txs (normal operation) + assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") +} + +func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { + // Simulates a sequencer that missed 3 DA epochs and must replay them all + // before resuming normal operation. + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 106 — sequencer starts at 100 with epoch size 1, + // so it has missed 6 epochs (>1), triggering catch-up. + // Called once on first fetchNextDAEpoch. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(106), nil).Once() + + // 3 old epochs (100, 101, 102) — all with timestamps far in the past + for h := uint64(100); h <= 102; h++ { + ts := time.Now().Add(-time.Duration(103-h) * time.Minute) // older epochs further in the past + txData := []byte("forced-from-epoch-" + string(rune('0'+h-100))) + mockDA.MockClient.On("Retrieve", mock.Anything, h, forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: ts}, + Data: [][]byte{txData}, + }).Once() + } + + // Epoch 103: returns HeightFromFuture — DA head reached, exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(103), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool txs + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-1"), []byte("mempool-2")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process the 3 old epochs — all should be catch-up (no mempool) + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should be catching up during epoch %d", 100+i) + assert.Equal(t, 1, len(resp.Batch.Transactions), + "epoch %d: should have exactly 1 forced tx", 100+i) + + for _, tx := range resp.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-1"), tx, "no mempool during catch-up epoch %d", 100+i) + assert.NotEqual(t, []byte("mempool-2"), tx, "no mempool during catch-up epoch %d", 100+i) + } + } + + // DA height should have advanced through the 3 old epochs + assert.Equal(t, uint64(103), seq.GetDAHeight(), "DA height should be at 103 after replaying 3 epochs") + + // Next batch: epoch 103 returns HeightFromFuture — should exit catch-up and include mempool + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should have exited catch-up at DA head") + + hasMempoolTx := false + for _, tx := range resp4.Batch.Transactions { + if bytes.Equal(tx, []byte("mempool-1")) || bytes.Equal(tx, []byte("mempool-2")) { + hasMempoolTx = true + } + } + assert.True(t, hasMempoolTx, "should include mempool txs after exiting catch-up") +} + +func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { + // When forced inclusion is not configured, catch-up should never activate. + // GetLatestDAHeight should NOT be called because DAEpochForcedInclusion == 0 + // causes updateCatchUpState to bail out early. 
+ ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + // No forced inclusion namespace configured + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(false).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 0, // no epoch-based forced inclusion + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should never catch up when forced inclusion not configured") + assert.Equal(t, 1, len(resp.Batch.Transactions)) + assert.Equal(t, []byte("mempool-tx"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { + // Verify that the checkpoint (DA epoch tracking) advances correctly during catch-up. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("tx-a"), []byte("tx-b")}, + }).Once() + + // Epoch 101: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-4 * time.Minute)}, + Data: [][]byte{[]byte("tx-c")}, + }).Once() + + // Epoch 102: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Initial checkpoint + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + req := 
coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process epoch 100 + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 2, len(resp1.Batch.Transactions)) + + // Checkpoint should advance to epoch 101 + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(101), seq.GetDAHeight()) + + // Process epoch 101 + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + + // Checkpoint should advance to epoch 102 + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(102), seq.GetDAHeight()) +} + +func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { + // When a single DA epoch has more forced txs than fit in one block, + // catch-up must produce strictly monotonic timestamps across the + // resulting blocks. This uses the same jitter scheme as the based + // sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms). + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is far ahead — triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch at height 100: 3 forced txs, each 100 bytes + epochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + tx1 := make([]byte, 100) + tx2 := make([]byte, 100) + tx3 := make([]byte, 100) + copy(tx1, "forced-tx-1") + copy(tx2, "forced-tx-2") + copy(tx3, "forced-tx-3") + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{tx1, tx2, tx3}, + }).Once() + + // Epoch at height 101: single tx (to verify cross-epoch monotonicity) + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 10, 0, time.UTC) // 10 seconds later + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-4")}, + }).Once() + + // Epoch 102: future — exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + // Custom executor: only 1 tx fits per block (gas-limited) + mockExec := mocks.NewMockExecutor(t) + mockExec.On("GetExecutionInfo", mock.Anything).Return(execution.ExecutionInfo{MaxGas: 1000000}, nil).Maybe() + mockExec.On("FilterTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) 
[]execution.FilterStatus { + result := make([]execution.FilterStatus, len(txs)) + // Only first tx fits, rest are postponed + for i := range result { + if i == 0 { + result[i] = execution.FilterOK + } else { + result[i] = execution.FilterPostpone + } + } + return result + }, + nil, + ).Maybe() + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + mockExec, + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Produce 3 blocks from epoch 100 (1 tx each due to gas filter) + var timestamps []time.Time + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should be catching up during block %d", i) + assert.Equal(t, 1, len(resp.Batch.Transactions), "block %d: exactly 1 forced tx", i) + timestamps = append(timestamps, resp.Timestamp) + } + + // All 3 timestamps must be strictly monotonically increasing + for i := 1; i < len(timestamps); i++ { + assert.True(t, timestamps[i].After(timestamps[i-1]), + "timestamp[%d] (%v) must be strictly after timestamp[%d] (%v)", + i, timestamps[i], i-1, timestamps[i-1]) + } + + // Verify exact jitter values: + // Block 0: 3 txs total, 1 consumed → 2 remaining → T - 2ms + // Block 1: 1 consumed → 1 remaining → T - 1ms + // Block 2: 1 consumed → 0 remaining → T + assert.Equal(t, epochTimestamp.Add(-2*time.Millisecond), timestamps[0], "block 0: T - 2ms") + assert.Equal(t, epochTimestamp.Add(-1*time.Millisecond), timestamps[1], "block 1: T - 1ms") + assert.Equal(t, epochTimestamp, timestamps[2], "block 2: T (exact epoch end time)") + + // Block from epoch 101 should also be monotonically after epoch 100's last block + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should still be catching up") + assert.Equal(t, 1, len(resp4.Batch.Transactions)) + assert.True(t, resp4.Timestamp.After(timestamps[2]), + "epoch 101 timestamp (%v) must be after epoch 100 last timestamp (%v)", + resp4.Timestamp, timestamps[2]) + assert.Equal(t, epoch2Timestamp, resp4.Timestamp, "single-tx epoch gets exact DA end time") +} + +func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { + // Verify that an empty DA epoch (no forced txs) still advances the + // checkpoint and updates currentDAEndTime so subsequent epochs get + // correct timestamps. 
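+ // Note (added for clarity): with the jitter scheme described above (timestamp = DAEndTime - remainingForcedTxs*1ms), an empty epoch leaves 0 remaining forced txs, so its single batch is stamped with the epoch's exact DA end time; the assertions below rely on that.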
+ ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch 100: empty (no forced txs) but valid timestamp + emptyEpochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: emptyEpochTimestamp}, + Data: [][]byte{}, + }).Once() + + // Epoch 101: has a forced tx with a later timestamp + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 15, 0, time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-after-empty")}, + }).Once() + + // Epoch 102: future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call processes the empty epoch 100 — empty batch, but checkpoint advances + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + assert.Equal(t, 0, len(resp1.Batch.Transactions), "empty epoch should produce empty batch") + assert.Equal(t, emptyEpochTimestamp, resp1.Timestamp, + "empty epoch batch should use epoch DA end time (0 remaining)") + + // Second call processes epoch 101 — should have later timestamp + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + assert.True(t, resp2.Timestamp.After(resp1.Timestamp), + "epoch 101 timestamp (%v) must be after empty epoch 100 timestamp (%v)", + resp2.Timestamp, resp1.Timestamp) +} + func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T) { db := ds.NewMapDatastore() logger := zerolog.New(zerolog.NewTestWriter(t)) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 86f81129da..8a5cca5b38 100644 --- a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -169,12 +169,6 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { return nil } -// DeleteStateAtHeight removes the state entry at the given height from the underlying store. -func (cs *CachedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { - // This value is not cached, so nothing to invalidate. 
- return cs.Store.DeleteStateAtHeight(ctx, height) -} - // Close closes the underlying store. func (cs *CachedStore) Close() error { cs.ClearCache() diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go index 261944a9ed..efec279fa1 100644 --- a/test/e2e/evm_force_inclusion_e2e_test.go +++ b/test/e2e/evm_force_inclusion_e2e_test.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path/filepath" + "syscall" "testing" "time" @@ -400,8 +401,6 @@ func setupFullNodeWithForceInclusionCheck(t *testing.T, sut *SystemUnderTest, fu // Note: This test simulates the scenario by having the sequencer configured to // listen to the wrong namespace, while we submit directly to the correct namespace. func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { - t.Skip() // Unskip once https://github.com/evstack/ev-node/pull/2963 is merged - sut := NewSystemUnderTest(t) workDir := t.TempDir() sequencerHome := filepath.Join(workDir, "sequencer") @@ -412,6 +411,7 @@ func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { t.Log("Malicious sequencer started listening to WRONG forced inclusion namespace") t.Log("NOTE: Sequencer listens to 'wrong-namespace', won't see txs on 'forced-inc'") + // TODO: disable P2P based on the limitations described in the ADR. sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, endpoints.RollkitRPCPort) t.Logf("Sequencer P2P address: %s", sequencerP2PAddress) @@ -560,3 +560,392 @@ func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { require.False(t, evm.CheckTxIncluded(seqClient, txForce.Hash()), "Malicious sequencer should NOT have included the forced inclusion transaction") } + +// setDAStartHeightInGenesis modifies the genesis file to set da_start_height. +// This is needed because the based sequencer requires non-zero DAStartHeight, +// and catch-up detection via CalculateEpochNumber also depends on it. +func setDAStartHeightInGenesis(t *testing.T, homeDir string, height uint64) { + t.Helper() + genesisPath := filepath.Join(homeDir, "config", "genesis.json") + data, err := os.ReadFile(genesisPath) + require.NoError(t, err) + + var genesis map[string]interface{} + err = json.Unmarshal(data, &genesis) + require.NoError(t, err) + + genesis["da_start_height"] = height + + newData, err := json.MarshalIndent(genesis, "", " ") + require.NoError(t, err) + + err = os.WriteFile(genesisPath, newData, 0644) + require.NoError(t, err) +} + +// TestEvmSequencerCatchUpBasedSequencerE2E tests that when a sequencer restarts after +// extended downtime (multiple DA epochs), it correctly enters catch-up mode, replays +// missed forced inclusion transactions from DA (matching what a based sequencer would +// produce), and then resumes normal operation. +// +// Test Flow: +// 1. a) Start sequencer +// 1. b) Start sync node (full node) +// 2. Wait for sync node to sync and send txs +// 3. a) Stop sequencer +// 3. b) Stop sync node +// 4. Restart sync node as based sequencer (reuse home directory, add --based_sequencer flag) +// 5. Send txs to force inclusion namespace +// 6. Wait for node (based sequencer) to produce one block (must contain those transactions) +// 7. Start sequencer +// 8. Verify blocks are produced and equal to based sequencer blocks +// 9. Stop based sequencer and restart as normal sync node +// 10. Verify they are in sync.
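As a reading aid (not part of the patch): the catch-up rule that the unit tests above and this E2E flow exercise can be summarised in a short sketch. The function name, parameters, and the greater-than-one-epoch threshold are assumptions inferred from the test comments (updateCatchUpState and CalculateEpochNumber are only referenced, not shown here), so this is illustrative rather than the actual ev-node implementation.

package main

import "fmt"

// shouldCatchUp sketches the decision the sequencer makes on its first DA
// fetch after a restart: if the DA head is more than one forced-inclusion
// epoch ahead of the height it left off at, it replays the missed epochs
// (forced txs only, no mempool txs) before resuming normal operation.
// Assumes fixed-size epochs anchored at daStartHeight.
func shouldCatchUp(daHead, daStartHeight, currentDAHeight, epochSize uint64) bool {
	// No epoch-based forced inclusion configured (DAEpochForcedInclusion == 0):
	// catch-up never activates and the DA head is not queried.
	if epochSize == 0 {
		return false
	}
	if daHead <= currentDAHeight {
		return false // already at (or past) the DA head
	}
	missedEpochs := (daHead-daStartHeight)/epochSize - (currentDAHeight-daStartHeight)/epochSize
	// Threshold inferred from the test comments ("missed 6 epochs (>1), triggering catch-up").
	return missedEpochs > 1
}

func main() {
	fmt.Println(shouldCatchUp(105, 100, 100, 1)) // true: DA head several epochs ahead
	fmt.Println(shouldCatchUp(100, 100, 100, 1)) // false: still within the current epoch
}

Under this reading, the tests above that set the DA head to 105, 106, or 110 against a start height of 100 with an epoch size of 1 enter catch-up, while the one that leaves the head at 100 does not.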
+func TestEvmSequencerCatchUpBasedSequencerE2E(t *testing.T) { + sut := NewSystemUnderTest(t) + workDir := t.TempDir() + sequencerHome := filepath.Join(workDir, "sequencer") + fullNodeHome := filepath.Join(workDir, "fullnode") + + // ===== PHASE 1: Setup - Start Sequencer and Sync Node ===== + t.Log("Phase 1: Setup - Start Sequencer and Sync Node") + + jwtSecret, fullNodeJwtSecret, genesisHash, endpoints, _ := setupCommonEVMTest(t, sut, true) + + // Create passphrase and JWT secret files for sequencer + seqPassphraseFile := createPassphraseFile(t, sequencerHome) + seqJwtSecretFile := createJWTSecretFile(t, sequencerHome, jwtSecret) + + // Initialize sequencer node + output, err := sut.RunCmd(evmSingleBinaryPath, + "init", + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", seqPassphraseFile, + "--home", sequencerHome, + ) + require.NoError(t, err, "failed to init sequencer", output) + + // Modify genesis: enable force inclusion with epoch=2, set da_start_height=1 + enableForceInclusionInGenesis(t, sequencerHome, 2) + setDAStartHeightInGenesis(t, sequencerHome, 1) + + // Copy genesis to full node (will be used when restarting as based sequencer) + output, err = sut.RunCmd(evmSingleBinaryPath, + "init", + "--home", fullNodeHome, + ) + require.NoError(t, err, "failed to init full node", output) + MustCopyFile(t, + filepath.Join(sequencerHome, "config", "genesis.json"), + filepath.Join(fullNodeHome, "config", "genesis.json"), + ) + + // Start sequencer with forced inclusion namespace + seqProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", seqJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", seqPassphraseFile, + "--home", sequencerHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetRollkitRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetRollkitP2PAddress(), + "--evm.engine-url", endpoints.GetSequencerEngineURL(), + "--evm.eth-url", endpoints.GetSequencerEthURL(), + ) + sut.AwaitNodeUp(t, endpoints.GetRollkitRPCAddress(), NodeStartupTimeout) + t.Log("Sequencer is up with force inclusion enabled") + + // Get sequencer P2P address for sync node to connect to + sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, endpoints.RollkitRPCPort) + t.Logf("Sequencer P2P address: %s", sequencerP2PAddress) + + // Create JWT secret file for full node + fnJwtSecretFile := createJWTSecretFile(t, fullNodeHome, fullNodeJwtSecret) + + // Start sync node (full node) - connects to sequencer via P2P + fnProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", fnJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--home", fullNodeHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evnode.p2p.peers", sequencerP2PAddress, + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Sync 
node (full node) is up and syncing from sequencer") + + // ===== PHASE 2: Send Transactions and Wait for Sync ===== + t.Log("Phase 2: Send Transactions and Wait for Sync") + + seqClient, err := ethclient.Dial(endpoints.GetSequencerEthURL()) + require.NoError(t, err) + defer seqClient.Close() + + fnClient, err := ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + defer fnClient.Close() + + ctx := context.Background() + var nonce uint64 = 0 + + // Submit 2 normal transactions to sequencer + var normalTxHashes []common.Hash + for i := 0; i < 2; i++ { + tx := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + err = seqClient.SendTransaction(ctx, tx) + require.NoError(t, err) + normalTxHashes = append(normalTxHashes, tx.Hash()) + t.Logf("Submitted normal tx %d: %s (nonce=%d)", i+1, tx.Hash().Hex(), tx.Nonce()) + } + + // Wait for sync node to sync the transactions + for i, txHash := range normalTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(fnClient, txHash) + }, 20*time.Second, 500*time.Millisecond, "Normal tx %d not synced to full node", i+1) + t.Logf("Normal tx %d synced to full node", i+1) + } + + // Record heights before stopping + seqHeader, err := seqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + fnHeader, err := fnClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + t.Logf("Sequencer height: %d, Full node height: %d", seqHeader.Number.Uint64(), fnHeader.Number.Uint64()) + + // ===== PHASE 3: Stop Sequencer and Sync Node ===== + t.Log("Phase 3: Stop Sequencer and Sync Node") + + // Stop sequencer process + err = seqProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop sequencer process") + time.Sleep(1 * time.Second) + + // Stop sync node process + err = fnProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop full node process") + time.Sleep(1 * time.Second) + t.Log("Both sequencer and sync node stopped") + + // ===== PHASE 4: Restart Sync Node as Based Sequencer ===== + t.Log("Phase 4: Restart Sync Node as Based Sequencer") + + // Restart the same full node as a based sequencer + // Reuse the same home directory and data, just add the --based_sequencer flag + basedSeqProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evnode.node.aggregator=true", + "--evnode.node.based_sequencer=true", + "--evm.jwt-secret-file", fnJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--home", fullNodeHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Sync node restarted as based sequencer") + + // Reconnect to based sequencer + basedSeqClient, err := ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + defer basedSeqClient.Close() + + // ===== PHASE 5: Submit Forced Inclusion Transactions to DA ===== + t.Log("Phase 5: Submit Forced Inclusion Transactions to DA") + + blobClient, err := blobrpc.NewClient(ctx, endpoints.GetDAAddress(), "", "") + require.NoError(t, err, "Failed to create blob RPC client") + defer blobClient.Close() + + 
daClient := block.NewDAClient( + blobClient, + config.Config{ + DA: config.DAConfig{ + Namespace: DefaultDANamespace, + ForcedInclusionNamespace: "forced-inc", + }, + }, + zerolog.Nop(), + ) + + // Create and submit 3 forced inclusion txs to DA + var forcedTxHashes []common.Hash + for i := 0; i < 3; i++ { + txForce := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + txBytes, err := txForce.MarshalBinary() + require.NoError(t, err) + + result := daClient.Submit(ctx, [][]byte{txBytes}, -1, daClient.GetForcedInclusionNamespace(), nil) + require.Equal(t, da.StatusSuccess, result.Code, "Failed to submit forced tx %d to DA: %s", i+1, result.Message) + + forcedTxHashes = append(forcedTxHashes, txForce.Hash()) + t.Logf("Submitted forced inclusion tx %d to DA: %s (nonce=%d)", i+1, txForce.Hash().Hex(), txForce.Nonce()) + } + + // Wait for DA to advance past multiple epochs + t.Log("Waiting for DA to advance past multiple epochs...") + time.Sleep(6 * time.Second) + + // ===== PHASE 6: Verify Based Sequencer Includes Forced Txs ===== + t.Log("Phase 6: Verify Based Sequencer Includes Forced Txs") + + // Wait for based sequencer to include forced inclusion txs + for i, txHash := range forcedTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(basedSeqClient, txHash) + }, 60*time.Second, 1*time.Second, + "Forced inclusion tx %d (%s) not included in based sequencer", i+1, txHash.Hex()) + t.Logf("Based sequencer included forced tx %d: %s", i+1, txHash.Hex()) + } + t.Log("All forced inclusion txs verified on based sequencer") + + // Get the based sequencer's block height after including forced txs + basedSeqHeader, err := basedSeqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + basedSeqFinalHeight := basedSeqHeader.Number.Uint64() + t.Logf("Based sequencer final height: %d", basedSeqFinalHeight) + + // ===== PHASE 7: Restart Original Sequencer ===== + t.Log("Phase 7: Restart Original Sequencer") + + // Restart the original sequencer + seqProcess = sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", seqJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", seqPassphraseFile, + "--home", sequencerHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetRollkitRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetRollkitP2PAddress(), + "--evm.engine-url", endpoints.GetSequencerEngineURL(), + "--evm.eth-url", endpoints.GetSequencerEthURL(), + ) + sut.AwaitNodeUp(t, endpoints.GetRollkitRPCAddress(), NodeStartupTimeout) + t.Log("Sequencer restarted successfully") + + // Reconnect to sequencer + seqClient, err = ethclient.Dial(endpoints.GetSequencerEthURL()) + require.NoError(t, err) + + // ===== PHASE 8: Verify Sequencer Catches Up ===== + t.Log("Phase 8: Verify Sequencer Catches Up") + + // Wait for sequencer to catch up and include forced txs + for i, txHash := range forcedTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txHash) + }, 30*time.Second, 1*time.Second, + "Forced inclusion tx %d (%s) should be included after catch-up", i+1, txHash.Hex()) + t.Logf("Sequencer caught up with forced tx %d: %s", i+1, txHash.Hex()) + } + t.Log("All forced inclusion txs 
verified on sequencer after catch-up") + + // Verify sequencer produces blocks and reaches same height as based sequencer + require.Eventually(t, func() bool { + seqHeader, err := seqClient.HeaderByNumber(ctx, nil) + if err != nil { + return false + } + return seqHeader.Number.Uint64() >= basedSeqFinalHeight + }, 30*time.Second, 1*time.Second, "Sequencer should catch up to based sequencer height") + + seqHeader, err = seqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + t.Logf("Sequencer caught up to height: %d", seqHeader.Number.Uint64()) + + // ===== PHASE 9: Stop Based Sequencer and Restart as Normal Sync Node ===== + t.Log("Phase 9: Stop Based Sequencer and Restart as Normal Sync Node") + + // Stop based sequencer + err = basedSeqProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop based sequencer process") + time.Sleep(1 * time.Second) + + // Restart as normal sync node (without --based_sequencer flag, with --p2p.peers to connect to sequencer) + fnProcess = sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", fnJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--home", fullNodeHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evnode.p2p.peers", sequencerP2PAddress, + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Sync node restarted as normal full node") + + // Reconnect to sync node + fnClient, err = ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + + // ===== PHASE 10: Verify Nodes Are in Sync ===== + t.Log("Phase 10: Verify Nodes Are in Sync") + + // Wait for sync node to catch up to sequencer + require.Eventually(t, func() bool { + seqHeader, err1 := seqClient.HeaderByNumber(ctx, nil) + fnHeader, err2 := fnClient.HeaderByNumber(ctx, nil) + if err1 != nil || err2 != nil { + return false + } + return fnHeader.Number.Uint64() >= seqHeader.Number.Uint64() + }, 30*time.Second, 1*time.Second, "Sync node should catch up to sequencer") + + // Verify both nodes have all forced inclusion txs + for i, txHash := range forcedTxHashes { + seqIncluded := evm.CheckTxIncluded(seqClient, txHash) + fnIncluded := evm.CheckTxIncluded(fnClient, txHash) + require.True(t, seqIncluded, "Forced tx %d should be on sequencer", i+1) + require.True(t, fnIncluded, "Forced tx %d should be on sync node", i+1) + t.Logf("Forced tx %d verified on both nodes: %s", i+1, txHash.Hex()) + } + + // Send a new transaction and verify both nodes get it + txFinal := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + err = seqClient.SendTransaction(ctx, txFinal) + require.NoError(t, err) + t.Logf("Submitted final tx: %s (nonce=%d)", txFinal.Hash().Hex(), txFinal.Nonce()) + + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txFinal.Hash()) && evm.CheckTxIncluded(fnClient, txFinal.Hash()) + }, 20*time.Second, 500*time.Millisecond, "Final tx should be included on both nodes") + t.Log("Final tx included on both nodes - nodes are in sync") + + t.Log("Test PASSED: Sequencer catch-up with based sequencer verified successfully") + t.Logf(" - 
Sequencer processed %d normal txs before downtime", len(normalTxHashes)) + t.Logf(" - %d forced inclusion txs submitted to DA during downtime", len(forcedTxHashes)) + t.Logf(" - Based sequencer included all forced txs from DA") + t.Logf(" - Sequencer caught up and replayed all forced txs after restart") + t.Logf(" - Both nodes are in sync") +} diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49c..f5293d907a 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -250,6 +250,66 @@ func (_c *MockClient_GetHeaderNamespace_Call) RunAndReturn(run func() []byte) *M return _c } +// GetLatestDAHeight provides a mock function for the type MockClient +func (_mock *MockClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetLatestDAHeight") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_GetLatestDAHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestDAHeight' +type MockClient_GetLatestDAHeight_Call struct { + *mock.Call +} + +// GetLatestDAHeight is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) GetLatestDAHeight(ctx interface{}) *MockClient_GetLatestDAHeight_Call { + return &MockClient_GetLatestDAHeight_Call{Call: _e.mock.On("GetLatestDAHeight", ctx)} +} + +func (_c *MockClient_GetLatestDAHeight_Call) Run(run func(ctx context.Context)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) Return(v uint64, err error) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(v, err) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) RunAndReturn(run func(ctx context.Context) (uint64, error)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(run) + return _c +} + // HasForcedInclusionNamespace provides a mock function for the type MockClient func (_mock *MockClient) HasForcedInclusionNamespace() bool { ret := _mock.Called() diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 684d3fcee5..648021b76a 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -184,6 +184,11 @@ func (d *DummyDA) GetForcedInclusionNamespace() []byte { return nil } // HasForcedInclusionNamespace reports whether forced inclusion is configured. func (d *DummyDA) HasForcedInclusionNamespace() bool { return false } +// GetLatestDAHeight returns the current DA height (the latest height available). +func (d *DummyDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return d.height.Load(), nil +} + // Get retrieves blobs by ID (stub implementation). func (d *DummyDA) Get(_ context.Context, _ []datypes.ID, _ []byte) ([]datypes.Blob, error) { return nil, nil