diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index 167c02e486..b42da34068 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -9,6 +9,11 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) +// TODO: when dynamic committee changes are supported, newly joined members +// must be added to blocks, votes, nextBlockToPersist, and persistedBlockStart. +// Currently all four are initialized once in newInner from c.Lanes().All(). +// BlockPersister creates lane WALs lazily inside PersistBlock, but the new +// member must also appear in inner.blocks before the next persist cycle. type inner struct { latestAppQC utils.Option[*types.AppQC] latestCommitQC utils.AtomicSend[utils.Option[*types.CommitQC]] @@ -23,8 +28,10 @@ type inner struct { // reconstructed from the blocks already on disk (see newInner). // // TODO: consider giving this its own AtomicSend to avoid waking unrelated - // inner waiters (PushVote, PushCommitQC, etc.) on every markBlockPersisted - // call. Only RecvBatch needs to be notified of cursor changes; + // inner waiters (PushVote, PushCommitQC, etc.) on markBlockPersisted calls. + // Now that blocks are persisted concurrently by lane (one notification per + // lane per batch, not per block), the frequency is lower, but still not + // ideal. Only RecvBatch needs to be notified of cursor changes; // collectPersistBatch is in the same goroutine and reads it directly. 
nextBlockToPersist map[types.LaneID]types.BlockNumber diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index aa070096d8..6172b845f8 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -157,20 +157,18 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin return nil, err } - // Delete files below the prune anchor that were filtered out by - // loadPersistedState. Also reset the CommitQC persister's cursor to - // match the post-prune range. - laneFirsts := make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) - for lane, q := range inner.blocks { - laneFirsts[lane] = q.first - } - if err := pers.blocks.DeleteBefore(laneFirsts); err != nil { - return nil, fmt.Errorf("prune stale block files: %w", err) - } - if err := pers.commitQCs.DeleteBefore(inner.commitQCs.first); err != nil { - return nil, fmt.Errorf("prune stale commitQC files: %w", err) + // Truncate WAL entries below the prune anchor that were filtered out by + // loadPersistedState. + if ls, ok := loaded.Get(); ok { + if anchor, ok := ls.pruneAnchor.Get(); ok { + if err := pers.blocks.DeleteBefore(anchor.CommitQC); err != nil { + return nil, fmt.Errorf("prune stale block WAL entries: %w", err) + } + if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index(), utils.Some(anchor.CommitQC)); err != nil { + return nil, fmt.Errorf("prune stale commitQC WAL entries: %w", err) + } + } } - pers.commitQCs.ResetNext(inner.commitQCs.next) return &State{ key: key, @@ -631,18 +629,20 @@ func (s *State) Run(ctx context.Context) error { // runPersist is the main loop for the persist goroutine. // Write order: -// 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark. -// 2. CommitQCs in order, then publish LastCommitQC immediately -// so consensus can advance without waiting for block writes. -// 3. 
Blocks per lane in order, markBlockPersisted after each. -// 4. Prune old blocks and CommitQCs. +// 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark — +// followed by WAL truncation of blocks and CommitQCs. Truncation is +// co-located with the anchor write so the pruning point is derived +// directly from the anchor's CommitQC. Truncation must happen before +// writes because the WAL requires contiguous indices — if the anchor +// advanced past all persisted entries, DeleteBefore resets the WAL so +// new writes start clean. +// 2. CommitQCs and blocks concurrently via scope.Parallel: one goroutine for +// CommitQCs, one goroutine per lane for blocks (sequential within each +// lane). Each goroutine publishes its result (markCommitQCsPersisted / +// markBlockPersisted) per entry so voting unblocks ASAP. // // The prune anchor is a pruning watermark: on restart we resume from it. // -// Blocks are persisted one at a time with inner.nextBlockToPersist -// updated after each write, so vote latency equals single-block write -// time regardless of batch size. -// // TODO: use a single WAL for anchor and CommitQCs to make // this atomic rather than relying on write order. func (s *State) runPersist(ctx context.Context, pers persisters) error { @@ -654,51 +654,74 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error { } // 1. Persist prune anchor first — establishes the crash-recovery watermark. + // All WAL pruning is co-located here so the truncation point is + // derived directly from the anchor, making the safety invariant + // explicit: we only truncate entries the on-disk anchor covers. + // Block WAL pruning must happen before writes because the WAL + // requires contiguous indices — if the anchor advanced past all + // persisted entries, DeleteBefore resets the WAL so new writes + // start clean. 
if anchor, ok := batch.pruneAnchor.Get(); ok { if err := pers.pruneAnchor.Persist(PruneAnchorConv.Encode(anchor)); err != nil { return fmt.Errorf("persist prune anchor: %w", err) } s.advancePersistedBlockStart(anchor.CommitQC) lastPersistedAppQCNext = anchor.CommitQC.Proposal().Index() + 1 - } - // 2. Persist new CommitQCs, then publish immediately so consensus - // can advance without waiting for block writes or pruning. - for _, qc := range batch.commitQCs { - if err := pers.commitQCs.PersistCommitQC(qc); err != nil { - return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err) + if err := pers.blocks.DeleteBefore(anchor.CommitQC); err != nil { + return fmt.Errorf("block deleteBefore: %w", err) + } + if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index(), utils.Some(anchor.CommitQC)); err != nil { + return fmt.Errorf("commitqc deleteBefore: %w", err) } - } - if len(batch.commitQCs) > 0 { - s.markCommitQCsPersisted(batch.commitQCs[len(batch.commitQCs)-1]) } - // 3. Persist blocks (mark each individually for vote latency). + // 2. Persist CommitQCs and blocks concurrently. CommitQCs go in + // one goroutine; blocks fan out one goroutine per lane (sequential + // within each lane to preserve block-number ordering). Each + // goroutine publishes its result (markCommitQCsPersisted / + // markBlockPersisted) as soon as it finishes. 
+ blocksByLane := make(map[types.LaneID][]*types.Signed[*types.LaneProposal]) for _, proposal := range batch.blocks { - h := proposal.Msg().Block().Header() - if err := pers.blocks.PersistBlock(proposal); err != nil { - return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err) + lane := proposal.Msg().Block().Header().Lane() + blocksByLane[lane] = append(blocksByLane[lane], proposal) + } + if err := scope.Parallel(func(ps scope.ParallelScope) error { + if len(batch.commitQCs) > 0 { + ps.Spawn(func() error { + for _, qc := range batch.commitQCs { + if err := pers.commitQCs.PersistCommitQC(qc); err != nil { + return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err) + } + s.markCommitQCsPersisted(qc) + } + return nil + }) } - s.markBlockPersisted(h.Lane(), h.BlockNumber()+1) - } - - // 4. Prune old data. - if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil { - return fmt.Errorf("block deleteBefore: %w", err) - } - if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil { - return fmt.Errorf("commitqc deleteBefore: %w", err) + for lane, proposals := range blocksByLane { + ps.Spawn(func() error { + for _, p := range proposals { + if err := pers.blocks.PersistBlock(p); err != nil { + h := p.Msg().Block().Header() + return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err) + } + s.markBlockPersisted(lane, p.Msg().Block().Header().BlockNumber()+1) + } + return nil + }) + } + return nil + }); err != nil { + return err } } } // persistBatch holds the data collected under lock for one persist iteration. 
type persistBatch struct { - blocks []*types.Signed[*types.LaneProposal] - commitQCs []*types.CommitQC - pruneAnchor utils.Option[*PruneAnchor] - laneFirsts map[types.LaneID]types.BlockNumber - commitQCFirst types.RoadIndex + blocks []*types.Signed[*types.LaneProposal] + commitQCs []*types.CommitQC + pruneAnchor utils.Option[*PruneAnchor] } // advancePersistedBlockStart updates the per-lane block admission watermark @@ -717,8 +740,9 @@ func (s *State) advancePersistedBlockStart(commitQC *types.CommitQC) { } // markBlockPersisted advances the per-lane block persistence cursor. -// Called after each individual block write so that RecvBatch (and therefore -// voting) unblocks with single-block latency regardless of batch size. +// Called after each block is persisted so that RecvBatch (and therefore +// voting) can unblock as soon as the block is durable. Safe for concurrent +// callers (acquires s.inner lock internally). func (s *State) markBlockPersisted(lane types.LaneID, next types.BlockNumber) { for inner, ctrl := range s.inner.Lock() { inner.nextBlockToPersist[lane] = next @@ -759,16 +783,13 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext }); err != nil { return b, err } - b.laneFirsts = make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) for lane, q := range inner.blocks { start := max(inner.nextBlockToPersist[lane], q.first) for n := start; n < q.next; n++ { b.blocks = append(b.blocks, q.q[n]) } - b.laneFirsts[lane] = q.first } commitQCNext = max(commitQCNext, inner.commitQCs.first) - b.commitQCFirst = inner.commitQCs.first for n := commitQCNext; n < inner.commitQCs.next; n++ { b.commitQCs = append(b.commitQCs, inner.commitQCs.q[n]) } diff --git a/sei-tendermint/internal/autobahn/avail/state_test.go b/sei-tendermint/internal/autobahn/avail/state_test.go index d0e95a5bc0..65d45452e3 100644 --- a/sei-tendermint/internal/autobahn/avail/state_test.go +++ b/sei-tendermint/internal/autobahn/avail/state_test.go @@ -291,9 
+291,10 @@ func TestStateRestartFromPersisted(t *testing.T) { wantAppQCIdx = appProposal.RoadIndex() } - // Wait for persistence to complete. markCommitQCsPersisted fires - // after all blocks, commitQCs, the prune anchor, and cleanup in the - // batch are on disk, so this confirms all data is durable. + // Wait for commitQC persistence. markCommitQCsPersisted fires after + // all commitQCs in the batch are on disk. Block goroutines may still + // be in flight, but scope.Parallel in runPersist ensures they complete + // before the next batch, so the data is durable by scope exit. if err := state.waitForCommitQC(ctx, wantAppQCIdx); err != nil { return fmt.Errorf("waitForCommitQC: %w", err) } @@ -582,9 +583,7 @@ func TestNewStateWithPersistence(t *testing.T) { // All 3 commitQCs should be loaded (no AppQC to skip past). require.Equal(t, types.RoadIndex(0), state.FirstCommitQC()) // LastCommitQC should be set to the last loaded one. - latest, ok := state.LastCommitQC().Load().Get() - require.True(t, ok) - require.NoError(t, utils.TestDiff(qcs[2], latest)) + require.NoError(t, utils.TestDiff(utils.Some(qcs[2]), state.LastCommitQC().Load())) }) t.Run("loads persisted commitQCs with AppQC", func(t *testing.T) { @@ -622,14 +621,11 @@ func TestNewStateWithPersistence(t *testing.T) { // inner.prune(appQC@1, commitQC@1) sets commitQCs.first = 1. require.Equal(t, types.RoadIndex(1), state.FirstCommitQC()) - latest, ok := state.LastCommitQC().Load().Get() - require.True(t, ok) - require.NoError(t, utils.TestDiff(qcs[4], latest)) + require.NoError(t, utils.TestDiff(utils.Some(qcs[4]), state.LastCommitQC().Load())) }) t.Run("non-contiguous commitQC files return error", func(t *testing.T) { dir := t.TempDir() - ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) // Build 6 sequential CommitQCs (indices 0-5). 
allQCs := make([]*types.CommitQC, 6) @@ -649,19 +645,109 @@ func TestNewStateWithPersistence(t *testing.T) { CommitQc: types.CommitQCConv.Encode(allQCs[0]), })) - // Persist QCs 0, 1, 2 contiguously, then skip to 5 (simulating - // corruption or manual tampering). Since the anchor is persisted - // first, a gap should never occur normally — treat it as an error. + // Persist QCs 0, 1, 2 contiguously, then try to skip to 5. + // PersistCommitQC enforces strict sequential order, so the gap + // is caught at write time rather than at load time. cp, _, err := persist.NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - for i := 0; i < 3; i++ { + for i := range 3 { require.NoError(t, cp.PersistCommitQC(allQCs[i])) } - require.NoError(t, cp.PersistCommitQC(allQCs[5])) - - _, err = NewState(keys[0], ds, utils.Some(dir)) + err = cp.PersistCommitQC(allQCs[5]) require.Error(t, err) - require.Contains(t, err.Error(), "non-contiguous") + require.Contains(t, err.Error(), "out of sequence") + require.NoError(t, cp.Close()) + }) + + t.Run("anchor past all persisted commitQCs truncates WAL", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + // Build a chain of 10 CommitQCs (indices 0-9). + qcs := make([]*types.CommitQC, 10) + prev := utils.None[*types.CommitQC]() + for i := range qcs { + qcs[i] = makeCommitQC(rng, committee, keys, prev, nil, utils.None[*types.AppQC]()) + prev = utils.Some(qcs[i]) + } + + // Persist only indices 0-4 to the CommitQC WAL. + cp, _, err := persist.NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + for i := range 5 { + require.NoError(t, cp.PersistCommitQC(qcs[i])) + } + require.NoError(t, cp.Close()) + + // Persist a prune anchor at index 9 — well past the persisted range. 
+ appProposal := types.NewAppProposal(50, 9, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + prunePers, _, err := persist.NewPersister[*pb.PersistedAvailPruneAnchor](utils.Some(dir), innerFile) + require.NoError(t, err) + require.NoError(t, prunePers.Persist(&pb.PersistedAvailPruneAnchor{ + AppQc: types.AppQCConv.Encode(appQC), + CommitQc: types.CommitQCConv.Encode(qcs[9]), + })) + + // NewState should succeed: DeleteBefore truncates the stale WAL, + // then the re-persist loop writes the anchor's CommitQC back. + state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + require.Equal(t, types.RoadIndex(9), state.FirstCommitQC()) + require.NoError(t, utils.TestDiff(utils.Some(qcs[9]), state.LastCommitQC().Load())) + + got, ok := state.LastAppQC().Get() + require.True(t, ok) + require.Equal(t, types.RoadIndex(9), got.Proposal().RoadIndex()) + }) + + t.Run("anchor past all persisted blocks truncates lane WAL", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + lane := keys[0].Public() + + // Persist commitQCs 0-9 and blocks 0-2 for one lane. 
+ qcs := make([]*types.CommitQC, 10) + prev := utils.None[*types.CommitQC]() + cp, _, err := persist.NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + for i := range qcs { + qcs[i] = makeCommitQC(rng, committee, keys, prev, nil, utils.None[*types.AppQC]()) + prev = utils.Some(qcs[i]) + require.NoError(t, cp.PersistCommitQC(qcs[i])) + } + require.NoError(t, cp.Close()) + + bp, _, err := persist.NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + var parent types.BlockHeaderHash + for n := types.BlockNumber(0); n < 3; n++ { + block := types.NewBlock(lane, n, parent, types.GenPayload(rng)) + signed := types.Sign(keys[0], types.NewLaneProposal(block)) + parent = block.Header().Hash() + require.NoError(t, bp.PersistBlock(signed)) + } + + // Persist a prune anchor at index 9 with a laneRange that starts past + // all persisted blocks — DeleteBefore will TruncateAll the block WAL. + appProposal := types.NewAppProposal(50, 9, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + prunePers, _, err := persist.NewPersister[*pb.PersistedAvailPruneAnchor](utils.Some(dir), innerFile) + require.NoError(t, err) + require.NoError(t, prunePers.Persist(&pb.PersistedAvailPruneAnchor{ + AppQc: types.AppQCConv.Encode(appQC), + CommitQc: types.CommitQCConv.Encode(qcs[9]), + })) + + // NewState should succeed: block WAL gets truncated, lane starts clean. 
+ state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + require.Equal(t, types.RoadIndex(9), state.FirstCommitQC()) + got, ok := state.LastAppQC().Get() + require.True(t, ok) + require.Equal(t, types.RoadIndex(9), got.Proposal().RoadIndex()) }) t.Run("corrupt AppQC data returns error", func(t *testing.T) { diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go index 93f233ee7d..23c2bd4937 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go @@ -1,218 +1,299 @@ -// TODO: Block file persistence is a temporary solution that will be replaced by -// a WAL (Write-Ahead Log) library before launch. CommitQC file persistence -// (commitqcs.go) shares the same migration plan. With a WAL, atomic appends -// eliminate several complexities in both files: -// - Corrupt file handling (WAL handles its own integrity). -// - Per-file naming, parsing, and directory scanning. -// - Orphaned file cleanup (WAL truncation replaces DeleteBefore). -// - Gap handling in newInner (WAL replay is always contiguous). -// -// What survives: the BlockPersister abstraction (PersistBlock/DeleteBefore). - +// TODO: add Prometheus metrics for blocks written and truncated. package persist import ( "encoding/hex" + "errors" "fmt" - "maps" "os" "path/filepath" - "slices" - "strconv" - "strings" - - "log/slog" - - "google.golang.org/protobuf/proto" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" "github.com/sei-protocol/seilog" ) var logger = seilog.NewLogger("tendermint", "internal", "autobahn", "consensus", "persist") +const blocksDir = "blocks" + // LoadedBlock is a block loaded from disk during state restoration. 
type LoadedBlock struct { Number types.BlockNumber Proposal *types.Signed[*types.LaneProposal] } -// BlockPersister manages individual block files in a blocks/ subdirectory. -// Each block is stored as _.pb. -// The caller is responsible for driving persistence (typically a goroutine that -// watches in-memory block state and calls PersistBlock / DeleteBefore). -// When noop is true, all disk I/O is skipped. -type BlockPersister struct { - dir string // full path to the blocks/ subdirectory; empty when noop - noop bool +// laneWALState is the mutable state of a lane WAL, protected by laneWAL's +// mutex. Block numbers within a lane are contiguous, so the first block +// number is derived: nextBlockNum - Count(). +type laneWALState struct { + *indexedWAL[*types.Signed[*types.LaneProposal]] + nextBlockNum types.BlockNumber } -// newNoOpBlockPersister returns a BlockPersister that skips all disk I/O. -// Used when persistence is disabled. -func newNoOpBlockPersister() *BlockPersister { - return &BlockPersister{noop: true} +func (s *laneWALState) firstBlockNum() utils.Option[types.BlockNumber] { + if s.Count() == 0 { + return utils.None[types.BlockNumber]() + } + return utils.Some(s.nextBlockNum - types.BlockNumber(s.Count())) } -// NewBlockPersister creates the blocks/ subdirectory if it doesn't exist and -// returns a block persister. Loads all persisted blocks from disk as sorted -// slices per lane. Corrupt files are skipped; the caller (newInner) returns -// an error if the resulting slices are non-contiguous. -// When stateDir is None, returns a no-op persister that skips all disk I/O. 
-func NewBlockPersister(stateDir utils.Option[string]) (*BlockPersister, map[types.LaneID][]LoadedBlock, error) { - sd, ok := stateDir.Get() - if !ok { - return newNoOpBlockPersister(), nil, nil - } - dir := filepath.Join(sd, "blocks") - if err := os.MkdirAll(dir, 0700); err != nil { - return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) - } +// laneWAL wraps a laneWALState with a mutex that serializes all writes and +// truncations on a single lane. +type laneWAL struct { + state utils.Mutex[*laneWALState] +} - bp := &BlockPersister{ - dir: dir, - } - blocks, err := bp.loadAll() - if err != nil { - return nil, nil, err - } - return bp, blocks, nil +// BlockPersister manages block persistence using one WAL per lane. +// Each lane gets its own WAL in a subdirectory named by hex-encoded lane ID, +// so truncation is independent per lane. A single shared WAL would be simpler +// but a lane whose blocks are never included in a committed block (e.g. the +// validator is removed from the committee) would prevent truncation of all +// other lanes' entries that follow it. +// When dir is None, all disk I/O is skipped (no-op mode). +// +// All public methods are safe for concurrent use. The lanes map is protected +// by an RWMutex; each laneWAL has its own Mutex for write serialization. +// The per-lane mutex prevents data races, but the caller must not interleave +// DeleteBefore and PersistBlock on the same lane — DeleteBefore may +// re-anchor nextBlockNum, causing a subsequent PersistBlock to fail the +// contiguity check. In practice, state.go runs DeleteBefore to completion +// before spawning PersistBlock goroutines. +// +// NOTE: PersistBlock releases the map RLock before writing to the lane WAL +// (so different lanes are fully parallel). This is safe because lanes are +// only added, never removed. 
If lane deletion is added in the future, the +// caller must ensure DeleteBefore (which removes lanes) does not overlap +// with PersistBlock calls — or PersistBlock must hold RLock through the +// WAL write. +type BlockPersister struct { + dir utils.Option[string] // immutable after construction + lanes utils.RWMutex[map[types.LaneID]*laneWAL] } -func blockFilename(lane types.LaneID, n types.BlockNumber) string { - return hex.EncodeToString(lane.Bytes()) + "_" + strconv.FormatUint(uint64(n), 10) + ".pb" +func laneDir(lane types.LaneID) string { + return hex.EncodeToString(lane.Bytes()) } -func parseBlockFilename(name string) (types.LaneID, types.BlockNumber, error) { - name = strings.TrimSuffix(name, ".pb") - parts := strings.SplitN(name, "_", 2) - if len(parts) != 2 { - return types.PublicKey{}, 0, fmt.Errorf("bad block filename %q", name) - } - keyBytes, err := hex.DecodeString(parts[0]) - if err != nil { - return types.PublicKey{}, 0, fmt.Errorf("bad lane hex in %q: %w", name, err) - } - lane, err := types.PublicKeyFromBytes(keyBytes) - if err != nil { - return types.PublicKey{}, 0, fmt.Errorf("bad lane key in %q: %w", name, err) - } - n, err := strconv.ParseUint(parts[1], 10, 64) +func newLaneWALState(dir string) (*laneWALState, error) { + iw, err := openIndexedWAL(dir, types.SignedMsgConv[*types.LaneProposal]()) if err != nil { - return types.PublicKey{}, 0, fmt.Errorf("bad block number in %q: %w", name, err) + return nil, err } - return lane, types.BlockNumber(n), nil + return &laneWALState{indexedWAL: iw}, nil } -// PersistBlock writes a signed lane proposal to its own file. -func (bp *BlockPersister) PersistBlock(proposal *types.Signed[*types.LaneProposal]) error { - if bp.noop { - return nil +// NewBlockPersister opens (or creates) per-lane WALs in subdirectories of +// blocks/ and replays all persisted entries. Returns the persister and loaded +// blocks grouped by lane (sorted by block number). 
Corrupt tail entries are +// auto-truncated by the WAL library. +// When stateDir is None, returns a no-op persister. +func NewBlockPersister(stateDir utils.Option[string]) (*BlockPersister, map[types.LaneID][]LoadedBlock, error) { + sd, ok := stateDir.Get() + if !ok { + return &BlockPersister{lanes: utils.NewRWMutex(map[types.LaneID]*laneWAL{})}, nil, nil } - h := proposal.Msg().Block().Header() - pb := types.SignedMsgConv[*types.LaneProposal]().Encode(proposal) - data, err := proto.Marshal(pb) - if err != nil { - return fmt.Errorf("marshal block %s/%d: %w", h.Lane(), h.BlockNumber(), err) + dir := filepath.Join(sd, blocksDir) + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) } - path := filepath.Join(bp.dir, blockFilename(h.Lane(), h.BlockNumber())) - return writeAndSync(path, data) -} -// DeleteBefore removes persisted block files that are no longer needed. -// For lanes in laneFirsts, deletes files with block number below the map value. -// For lanes NOT in laneFirsts (orphaned from a previous committee/epoch), -// deletes all files — old blocks are not reusable after a committee change. -// An empty/nil laneFirsts is a no-op (no committee info available to judge orphans). -// Returns an error if the directory cannot be read; individual file removal -// failures are logged but do not cause an error. 
-func (bp *BlockPersister) DeleteBefore(laneFirsts map[types.LaneID]types.BlockNumber) error { - if bp.noop || len(laneFirsts) == 0 { - return nil - } - entries, err := os.ReadDir(bp.dir) + lanes := map[types.LaneID]*laneWAL{} + bp := &BlockPersister{dir: utils.Some(dir), lanes: utils.NewRWMutex(lanes)} + + entries, err := os.ReadDir(dir) if err != nil { - return fmt.Errorf("list blocks dir for cleanup: %w", err) + return nil, nil, fmt.Errorf("read blocks dir %s: %w", dir, err) } - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { + + allBlocks := map[types.LaneID][]LoadedBlock{} + for _, e := range entries { + if !e.IsDir() { continue } - lane, fileN, err := parseBlockFilename(entry.Name()) + laneBytes, err := hex.DecodeString(e.Name()) if err != nil { + logger.Warn("skipping unexpected entry in blocks dir", "name", e.Name()) continue } - first, ok := laneFirsts[lane] - if ok && fileN >= first { + lane, err := types.PublicKeyFromBytes(laneBytes) + if err != nil { + logger.Warn("skipping lane dir with invalid key", "name", e.Name(), "err", err) continue } - path := filepath.Join(bp.dir, entry.Name()) - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - logger.Warn("failed to delete block file", "path", path, "err", err) + lanePath := filepath.Join(dir, e.Name()) + s, err := newLaneWALState(lanePath) + if err != nil { + _ = bp.close() + return nil, nil, fmt.Errorf("open lane WAL in %s: %w", lanePath, err) + } + loaded, err := s.loadAll() + if err != nil { + _ = s.Close() + _ = bp.close() + return nil, nil, fmt.Errorf("load lane WAL in %s: %w", lanePath, err) + } + lanes[lane] = &laneWAL{state: utils.NewMutex(s)} + if len(loaded) > 0 { + allBlocks[lane] = loaded } } - return nil + + return bp, allBlocks, nil } -// loadAll loads all persisted blocks from the blocks/ directory. -// Returns sorted slices per lane. 
Corrupt files are skipped; the caller -// (newInner) returns an error on gaps or parent-hash mismatches. -func (bp *BlockPersister) loadAll() (map[types.LaneID][]LoadedBlock, error) { - entries, err := os.ReadDir(bp.dir) - if err != nil { - return nil, fmt.Errorf("read blocks dir %s: %w", bp.dir, err) +// getOrCreateLane returns the laneWAL for the given lane, creating it if +// necessary. Uses double-checked locking: fast path reads under RLock; +// slow path (lane creation) promotes to a write Lock. +// The returned pointer is safe to use after the lock is released because +// lanes are only ever added, never removed (see BlockPersister doc). +// Returns an error if called on a no-op persister (caller should check first). +func (bp *BlockPersister) getOrCreateLane(lane types.LaneID) (*laneWAL, error) { + dir, ok := bp.dir.Get() + if !ok { + return nil, fmt.Errorf("getOrCreateLane called on no-op persister") } - - raw := map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue + // Fast path: read-only check. + for lanes := range bp.lanes.RLock() { + if lw, ok := lanes[lane]; ok { + return lw, nil } - lane, n, err := parseBlockFilename(entry.Name()) - if err != nil { - logger.Warn("skipping unrecognized block file", "file", entry.Name(), "err", err) - continue + } + // Slow path: create under write lock (double-checked). 
+ for lanes := range bp.lanes.Lock() { + if lw, ok := lanes[lane]; ok { + return lw, nil } - proposal, err := loadBlockFile(filepath.Join(bp.dir, entry.Name())) + s, err := newLaneWALState(filepath.Join(dir, laneDir(lane))) if err != nil { - logger.Warn("skipping corrupt block file", "file", entry.Name(), "err", err) - continue + return nil, fmt.Errorf("create lane WAL for %s: %w", lane, err) } - h := proposal.Msg().Block().Header() - if h.Lane() != lane || h.BlockNumber() != n { - logger.Warn("skipping block file with mismatched header", - "file", entry.Name(), - "headerLane", h.Lane(), - slog.Uint64("headerNum", uint64(h.BlockNumber())), - "filenameLane", lane, - slog.Uint64("filenameNum", uint64(n)), - ) - continue + lw := &laneWAL{state: utils.NewMutex(s)} + lanes[lane] = lw + return lw, nil + } + panic("unreachable") +} + +// PersistBlock writes a signed lane proposal to the per-lane WAL, creating +// the lane WAL on first use. Safe for concurrent use — concurrent writes to +// distinct lanes are fully parallel; writes to the same lane are serialized +// by the per-lane mutex. 
+func (bp *BlockPersister) PersistBlock(proposal *types.Signed[*types.LaneProposal]) error { + if _, ok := bp.dir.Get(); !ok { + return nil // no-op persister (persistence disabled) + } + h := proposal.Msg().Block().Header() + lane := h.Lane() + lw, err := bp.getOrCreateLane(lane) + if err != nil { + return err + } + for s := range lw.state.Lock() { + if (s.Count() > 0 || s.nextBlockNum > 0) && h.BlockNumber() != s.nextBlockNum { + return fmt.Errorf("block %s/%d out of sequence (next=%d)", lane, h.BlockNumber(), s.nextBlockNum) } - if raw[lane] == nil { - raw[lane] = map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} + if err := s.Write(proposal); err != nil { + return fmt.Errorf("persist block %s/%d: %w", lane, h.BlockNumber(), err) } - raw[lane][n] = proposal - logger.Info("loaded persisted block", "lane", lane.String(), slog.Uint64("block", uint64(n))) + s.nextBlockNum = h.BlockNumber() + 1 + return nil } + panic("unreachable") +} - result := map[types.LaneID][]LoadedBlock{} - for lane, bs := range raw { - sorted := slices.Sorted(maps.Keys(bs)) - blocks := make([]LoadedBlock, 0, len(sorted)) - for _, n := range sorted { - blocks = append(blocks, LoadedBlock{Number: n, Proposal: bs[n]}) +// DeleteBefore truncates each lane's WAL below the block range specified +// by the given CommitQC. Each lane is truncated independently and in +// parallel. Safe for concurrent use. +// +// TODO: once dynamic committee changes are implemented, add stale lane +// removal here — close and delete WAL directories for lanes no longer in +// the committee after a retention period. 
+func (bp *BlockPersister) DeleteBefore(qc *types.CommitQC) error { + if _, ok := bp.dir.Get(); !ok { + return nil // no-op persister (persistence disabled) + } + for lanes := range bp.lanes.RLock() { + return scope.Parallel(func(ps scope.ParallelScope) error { + for lane, lw := range lanes { + first := qc.LaneRange(lane).First() + ps.Spawn(func() error { + for s := range lw.state.Lock() { + firstBN, ok := s.firstBlockNum().Get() + if !ok || first <= firstBN { + return nil + } + if first >= s.nextBlockNum { + // Anchor advanced past all persisted blocks for this lane. + if err := s.TruncateAll(); err != nil { + return fmt.Errorf("truncate all lane %s WAL: %w", lane, err) + } + s.nextBlockNum = first + return nil + } + walIdx := s.FirstIdx() + uint64(first-firstBN) + if err := s.TruncateBefore(walIdx, func(entry *types.Signed[*types.LaneProposal]) error { + if got := entry.Msg().Block().Header().BlockNumber(); got != first { + return fmt.Errorf("block at WAL index %d has number %d, expected %d (index mapping broken)", walIdx, got, first) + } + return nil + }); err != nil { + return fmt.Errorf("truncate lane %s WAL before block %d: %w", lane, first, err) + } + return nil + } + panic("unreachable") + }) + } + return nil + }) + } + panic("unreachable") +} + +// close shuts down all per-lane WALs. Internal: only used by tests and +// NewBlockPersister (error cleanup). Production code does not close WALs +// at shutdown — the OS reclaims resources on process exit. +// Safe for concurrent use. +func (bp *BlockPersister) close() error { + if _, ok := bp.dir.Get(); !ok { + return nil // no-op persister (persistence disabled) + } + for lanes := range bp.lanes.Lock() { + var errs []error + for _, lw := range lanes { + for s := range lw.state.Lock() { + if err := s.Close(); err != nil { + errs = append(errs, err) + } + } } - result[lane] = blocks + return errors.Join(errs...) 
} - return result, nil + panic("unreachable") } -func loadBlockFile(path string) (*types.Signed[*types.LaneProposal], error) { - data, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename; not user-controlled +// loadAll reads all entries from the lane WAL and returns the loaded blocks. +// Also restores nextBlockNum from the last entry. +func (s *laneWALState) loadAll() ([]LoadedBlock, error) { + entries, err := s.ReadAll() if err != nil { return nil, err } - conv := types.SignedMsgConv[*types.LaneProposal]() - return conv.Unmarshal(data) + loaded := make([]LoadedBlock, 0, len(entries)) + for i, proposal := range entries { + h := proposal.Msg().Block().Header() + if i > 0 && h.BlockNumber() != s.nextBlockNum { + return nil, fmt.Errorf("gap in lane %s: block %d follows %d", h.Lane(), h.BlockNumber(), s.nextBlockNum-1) + } + s.nextBlockNum = h.BlockNumber() + 1 + loaded = append(loaded, LoadedBlock{Number: h.BlockNumber(), Proposal: proposal}) + } + if len(loaded) > 0 { + first, last := loaded[0].Number, loaded[len(loaded)-1].Number + logger.Debug("loaded persisted blocks", "lane", entries[0].Msg().Block().Header().Lane().String(), + "first", first, "last", last, "count", len(loaded)) + } + return loaded, nil } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go index e248ca2bdf..52811b6e82 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go @@ -1,7 +1,7 @@ package persist import ( - "encoding/hex" + "fmt" "os" "path/filepath" "testing" @@ -9,6 +9,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" + 
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope" ) func testSignedProposal(rng utils.Rng, key types.SecretKey, n types.BlockNumber) *types.Signed[*types.LaneProposal] { @@ -17,16 +18,56 @@ func testSignedProposal(rng utils.Rng, key types.SecretKey, n types.BlockNumber) return types.Sign(key, types.NewLaneProposal(block)) } +// testDeleteBefore is a test helper that truncates lane WALs using a plain +// map, avoiding the need to construct a full CommitQC. +func testDeleteBefore(bp *BlockPersister, laneFirsts map[types.LaneID]types.BlockNumber) error { + for lanes := range bp.lanes.RLock() { + return scope.Parallel(func(ps scope.ParallelScope) error { + for lane, first := range laneFirsts { + lw, ok := lanes[lane] + if !ok { + continue + } + ps.Spawn(func() error { + for s := range lw.state.Lock() { + firstBN, ok := s.firstBlockNum().Get() + if !ok || first <= firstBN { + return nil + } + if first >= s.nextBlockNum { + if err := s.TruncateAll(); err != nil { + return err + } + s.nextBlockNum = first + return nil + } + walIdx := s.FirstIdx() + uint64(first-firstBN) + return s.TruncateBefore(walIdx, func(entry *types.Signed[*types.LaneProposal]) error { + if got := entry.Msg().Block().Header().BlockNumber(); got != first { + return fmt.Errorf("block at WAL index %d has number %d, expected %d", walIdx, got, first) + } + return nil + }) + } + panic("unreachable") + }) + } + return nil + }) + } + panic("unreachable") +} + func TestNewBlockPersisterEmptyDir(t *testing.T) { dir := t.TempDir() bp, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) require.NotNil(t, bp) require.Equal(t, 0, len(blocks)) - // blocks/ subdirectory should exist - fi, err := os.Stat(filepath.Join(dir, "blocks")) + fi, err := os.Stat(filepath.Join(dir, blocksDir)) require.NoError(t, err) require.True(t, fi.IsDir()) + require.NoError(t, bp.close()) } func TestPersistBlockAndLoad(t *testing.T) { @@ -42,6 +83,7 @@ func TestPersistBlockAndLoad(t *testing.T) { 
b1 := testSignedProposal(rng, key, 1) require.NoError(t, bp.PersistBlock(b0)) require.NoError(t, bp.PersistBlock(b1)) + require.NoError(t, bp.close()) bp2, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) @@ -52,6 +94,7 @@ func TestPersistBlockAndLoad(t *testing.T) { require.Equal(t, types.BlockNumber(1), blocks[lane][1].Number) require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) require.NoError(t, utils.TestDiff(b1, blocks[lane][1].Proposal)) + require.NoError(t, bp2.close()) } func TestPersistBlockMultipleLanes(t *testing.T) { @@ -69,6 +112,7 @@ func TestPersistBlockMultipleLanes(t *testing.T) { b2 := testSignedProposal(rng, key2, 0) require.NoError(t, bp.PersistBlock(b1)) require.NoError(t, bp.PersistBlock(b2)) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) @@ -79,7 +123,7 @@ func TestPersistBlockMultipleLanes(t *testing.T) { require.NoError(t, utils.TestDiff(b2, blocks[lane2][0].Proposal)) } -func TestLoadSkipsCorruptBlockFile(t *testing.T) { +func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() @@ -88,46 +132,77 @@ func TestLoadSkipsCorruptBlockFile(t *testing.T) { bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Write a good block - b0 := testSignedProposal(rng, key, 0) - require.NoError(t, bp.PersistBlock(b0)) + for i := types.BlockNumber(0); i < 5; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) + } - // Write a corrupt file with a valid filename - corruptName := blockFilename(lane, 1) - require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + require.NoError(t, testDeleteBefore(bp, map[types.LaneID]types.BlockNumber{lane: 3})) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 1, len(blocks[lane]), "should only load the valid block") 
- require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) + require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") + require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) } -func TestLoadCorruptMidSequenceCreatesGap(t *testing.T) { +func TestDeleteBeforeAndRestart(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() - key := types.GenSecretKey(rng) - lane := key.Public() + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + key3 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + lane3 := key3.Public() // never persisted — exercises the "no WAL yet" path bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks 0, 2 (valid) and corrupt block 1. - // After skipping corrupt-1, raw has {0, 2} → returned with gap. - b0 := testSignedProposal(rng, key, 0) - b2 := testSignedProposal(rng, key, 2) - require.NoError(t, bp.PersistBlock(b0)) - require.NoError(t, bp.PersistBlock(b2)) - corruptName := blockFilename(lane, 1) - require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, i))) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, i))) + } - _, blocks, err := NewBlockPersister(utils.Some(dir)) + // lane1: truncate old blocks, lane2: delete nothing (first=0), lane3: empty (no WAL). + require.NoError(t, testDeleteBefore(bp, map[types.LaneID]types.BlockNumber{lane1: 2, lane2: 0, lane3: 0})) + require.NoError(t, bp.close()) + + // Restart — verify varied lane states load correctly. 
+ bp2, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(blocks[lane]), "corrupt skipped; both valid blocks returned") - require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) - require.Equal(t, types.BlockNumber(2), blocks[lane][1].Number) + require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") + require.Equal(t, types.BlockNumber(2), blocks[lane1][0].Number) + require.Equal(t, 3, len(blocks[lane2]), "lane2 should have all 3 blocks") + require.Equal(t, 0, len(blocks[lane3]), "lane3 never had blocks") + + // Persist more after restart, then restart again to verify continuity. + require.NoError(t, bp2.PersistBlock(testSignedProposal(rng, key1, 3))) + require.NoError(t, bp2.PersistBlock(testSignedProposal(rng, key2, 3))) + require.NoError(t, bp2.close()) + + _, blocks2, err := NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 2, len(blocks2[lane1]), "lane1 should have blocks 2,3") + require.Equal(t, types.BlockNumber(3), blocks2[lane1][1].Number) + require.Equal(t, 4, len(blocks2[lane2]), "lane2 should have blocks 0..3") + require.Equal(t, types.BlockNumber(3), blocks2[lane2][3].Number) } -func TestLoadReturnsAllWithGap(t *testing.T) { +func TestNoOpBlockPersister(t *testing.T) { + bp, blocks, err := NewBlockPersister(utils.None[string]()) + require.NoError(t, err) + require.NotNil(t, bp) + require.Equal(t, 0, len(blocks)) + + rng := utils.TestRng() + key := types.GenSecretKey(rng) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) + require.NoError(t, bp.DeleteBefore(nil)) // no-op persister returns early + require.NoError(t, bp.close()) +} + +func TestDeleteBeforeThenPersistMore(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() @@ -136,176 +211,250 @@ func TestLoadReturnsAllWithGap(t *testing.T) { bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks 3, 4, 6, 7 (gap at 5). All four returned sorted. 
- for _, n := range []types.BlockNumber{3, 4, 6, 7} { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, n))) + // Persist 0..4, delete before 3, then persist 5. + for i := types.BlockNumber(0); i < 5; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) } + require.NoError(t, testDeleteBefore(bp, map[types.LaneID]types.BlockNumber{lane: 3})) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 5))) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 4, len(blocks[lane]), "should return all valid files including after gap") + require.Equal(t, 3, len(blocks[lane]), "should have blocks 3, 4, 5") require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) - require.Equal(t, types.BlockNumber(6), blocks[lane][2].Number) - require.Equal(t, types.BlockNumber(7), blocks[lane][3].Number) + require.Equal(t, types.BlockNumber(5), blocks[lane][2].Number) } -func TestLoadSkipsMismatchedHeader(t *testing.T) { +func TestDeleteBeforePastAllBlocks(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() - key1 := types.GenSecretKey(rng) - key2 := types.GenSecretKey(rng) - lane1 := key1.Public() - lane2 := key2.Public() + key := types.GenSecretKey(rng) + lane := key.Public() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Write block for lane1 but save it under lane2's filename - b := testSignedProposal(rng, key1, 5) - require.NoError(t, bp.PersistBlock(b)) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) + } + + // Anchor advanced past everything (nextBlockNum is 3, first=10). 
+ require.NoError(t, testDeleteBefore(bp, map[types.LaneID]types.BlockNumber{lane: 10})) - // Rename the file to use lane2 in the filename - oldPath := filepath.Join(dir, "blocks", blockFilename(lane1, 5)) - newPath := filepath.Join(dir, "blocks", blockFilename(lane2, 5)) - require.NoError(t, os.Rename(oldPath, newPath)) + // Lane WAL is now empty; new writes starting from 10 should work. + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 10))) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 11))) + require.NoError(t, bp.close()) - // Reload — should skip the mismatched file + // Reopen — should see only the post-TruncateAll blocks. _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 0, len(blocks), "mismatched header should be skipped") + require.Equal(t, 2, len(blocks[lane])) + require.Equal(t, types.BlockNumber(10), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(11), blocks[lane][1].Number) } -func TestLoadSkipsUnrecognizedFilename(t *testing.T) { +func TestDeleteBeforePastAllRejectsStaleBlock(t *testing.T) { + rng := utils.TestRng() dir := t.TempDir() + key := types.GenSecretKey(rng) + lane := key.Public() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - _ = bp - // Write files with bad names - blocksDir := filepath.Join(dir, "blocks") - require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "notablock.pb"), []byte("data"), 0600)) - require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "readme.txt"), []byte("hi"), 0600)) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) + } - // Reload — should skip both - _, blocks, err := NewBlockPersister(utils.Some(dir)) - require.NoError(t, err) - require.Equal(t, 0, len(blocks)) + // Anchor advanced past everything; nextBlockNum re-anchored to 10. 
+ require.NoError(t, testDeleteBefore(bp, map[types.LaneID]types.BlockNumber{lane: 10})) + + // Writing a stale block number (0) should be rejected. + err = bp.PersistBlock(testSignedProposal(rng, key, 0)) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") + + // Writing at the correct anchor (10) should succeed. + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 10))) + require.NoError(t, bp.close()) } -func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { +func TestEmptyLaneWALSurvivesReopen(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() key := types.GenSecretKey(rng) lane := key.Public() - bp, _, err := NewBlockPersister(utils.Some(dir)) - require.NoError(t, err) - // Persist blocks 0..4 - for i := types.BlockNumber(0); i < 5; i++ { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) - } + // Simulate a crash after lazy lane directory creation but before any write: + // create the lane subdirectory so NewBlockPersister discovers it on open. + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(filepath.Join(bd, laneDir(lane)), 0700)) - // Delete blocks before 3 - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3})) + // Reopen — empty lane WAL should be loaded and usable. + bp, blocks, err := NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 0, len(blocks[lane]), "no blocks loaded") - _, blocks, err := NewBlockPersister(utils.Some(dir)) + // Persist a new block into the lane without needing lazy creation. + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) + require.NoError(t, bp.close()) + + // Reopen — should see the new block. 
+ _, blocks2, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") - require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) - require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) + require.Equal(t, 1, len(blocks2[lane])) + require.Equal(t, types.BlockNumber(0), blocks2[lane][0].Number) } -func TestDeleteBeforeMultipleLanes(t *testing.T) { - rng := utils.TestRng() +func TestNewBlockPersisterSkipsNonHexDir(t *testing.T) { dir := t.TempDir() + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(bd, 0700)) - key1 := types.GenSecretKey(rng) - key2 := types.GenSecretKey(rng) - lane1 := key1.Public() - lane2 := key2.Public() - bp, _, err := NewBlockPersister(utils.Some(dir)) + // Create a non-hex directory and a regular file — both should be skipped. + require.NoError(t, os.Mkdir(filepath.Join(bd, "not-valid-hex"), 0700)) + require.NoError(t, os.WriteFile(filepath.Join(bd, "stray-file.txt"), []byte("hi"), 0600)) + + bp, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) + require.Equal(t, 0, len(blocks)) + require.NoError(t, bp.close()) +} - // Lane1: blocks 0,1,2; Lane2: blocks 0,1,2 - for i := types.BlockNumber(0); i < 3; i++ { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, i))) - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, i))) - } +func TestNewBlockPersisterSkipsInvalidKeyDir(t *testing.T) { + dir := t.TempDir() + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(bd, 0700)) - // Delete lane1 < 2, lane2 < 1 - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 2, lane2: 1})) + // Valid hex but too short to be a valid ed25519 public key. 
+ require.NoError(t, os.Mkdir(filepath.Join(bd, "abcd"), 0700)) - _, blocks, err := NewBlockPersister(utils.Some(dir)) + bp, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") - require.Equal(t, types.BlockNumber(2), blocks[lane1][0].Number) - require.Equal(t, 2, len(blocks[lane2]), "lane2 should have blocks 1,2") - require.Equal(t, types.BlockNumber(1), blocks[lane2][0].Number) - require.Equal(t, types.BlockNumber(2), blocks[lane2][1].Number) + require.Equal(t, 0, len(blocks)) + require.NoError(t, bp.close()) } -func TestDeleteBeforeEmptyMap(t *testing.T) { +func TestPersistBlockOutOfSequence(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() key := types.GenSecretKey(rng) - lane := key.Public() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) - // Empty map — should not delete anything - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{})) + // Gap: skip block 1, try block 2. + err = bp.PersistBlock(testSignedProposal(rng, key, 2)) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") - _, blocks, err := NewBlockPersister(utils.Some(dir)) - require.NoError(t, err) - require.Equal(t, 1, len(blocks[lane])) + // Duplicate: try block 0 again. 
+ err = bp.PersistBlock(testSignedProposal(rng, key, 0)) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") + + require.NoError(t, bp.close()) } -func TestDeleteBeforeRemovesOrphanedLanes(t *testing.T) { +func TestLoadAllDetectsBlockGap(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() + key := types.GenSecretKey(rng) + lane := key.Public() - key1 := types.GenSecretKey(rng) - lane1 := key1.Public() - key2 := types.GenSecretKey(rng) - lane2 := key2.Public() + // Write directly to a lane WAL, bypassing PersistBlock's contiguity check + // to simulate on-disk corruption (block 0 then block 2, skipping 1). + ld := filepath.Join(dir, blocksDir, laneDir(lane)) + require.NoError(t, os.MkdirAll(ld, 0700)) + s, err := newLaneWALState(ld) + require.NoError(t, err) + require.NoError(t, s.Write(testSignedProposal(rng, key, 0))) + require.NoError(t, s.Write(testSignedProposal(rng, key, 2))) + require.NoError(t, s.Close()) + + _, _, err = NewBlockPersister(utils.Some(dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "gap") +} + +func TestPersistBlockAutoCreatesLane(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks on both lanes. - for n := types.BlockNumber(0); n < 3; n++ { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, n))) - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, n))) - } + entries, _ := os.ReadDir(filepath.Join(dir, blocksDir)) + require.Equal(t, 0, len(entries)) + + key := types.GenSecretKey(rng) + lane := key.Public() + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) + + entries, _ = os.ReadDir(filepath.Join(dir, blocksDir)) + require.Equal(t, 1, len(entries), "should have 1 lane directory") - // Only lane1 is in the current committee; lane2 is orphaned. 
- require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 1})) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - - // lane1: block 0 deleted, blocks 1-2 kept. - require.Equal(t, 2, len(blocks[lane1])) - require.Equal(t, types.BlockNumber(1), blocks[lane1][0].Number) - - // lane2: all blocks deleted (orphaned lane). - require.Equal(t, 0, len(blocks[lane2])) + require.Equal(t, 1, len(blocks[lane])) + require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) } -func TestBlockFilenameRoundTrip(t *testing.T) { +func TestPersistBlockConcurrentDistinctLanes(t *testing.T) { rng := utils.TestRng() - lane := types.GenSecretKey(rng).Public() - n := types.BlockNumber(42) + dir := t.TempDir() + const numLanes = 8 + const blocksPerLane = 20 - name := blockFilename(lane, n) - parsedLane, parsedN, err := parseBlockFilename(name) + bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, hex.EncodeToString(lane.Bytes()), hex.EncodeToString(parsedLane.Bytes())) - require.Equal(t, n, parsedN) + + keys := make([]types.SecretKey, numLanes) + for i := range numLanes { + keys[i] = types.GenSecretKey(rng) + } + + // Each lane prepares its proposals up front (rng is not thread-safe). 
+ proposals := make([][]*types.Signed[*types.LaneProposal], numLanes) + for i := range numLanes { + proposals[i] = make([]*types.Signed[*types.LaneProposal], blocksPerLane) + for j := range blocksPerLane { + proposals[i][j] = testSignedProposal(rng, keys[i], types.BlockNumber(j)) + } + } + + require.NoError(t, scope.Parallel(func(ps scope.ParallelScope) error { + for i := range numLanes { + ps.Spawn(func() error { + for _, p := range proposals[i] { + if err := bp.PersistBlock(p); err != nil { + return err + } + } + return nil + }) + } + return nil + })) + + require.NoError(t, bp.close()) + + _, blocks, err := NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, numLanes, len(blocks)) + for i := range numLanes { + lane := keys[i].Public() + require.Equal(t, blocksPerLane, len(blocks[lane])) + for j := range blocksPerLane { + require.Equal(t, types.BlockNumber(j), blocks[lane][j].Number) + } + } } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go index 4ade5e712c..3e4e652b81 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go @@ -1,202 +1,185 @@ -// TODO: CommitQC file persistence is a temporary solution that will be replaced -// by the same WAL (Write-Ahead Log) library as block persistence (see blocks.go). -// With a WAL, atomic appends eliminate corrupt file handling, per-file -// naming/parsing, directory scanning, and DeleteBefore cleanup -// (WAL replay is always contiguous). - +// TODO: add Prometheus metrics for commitQCs written and truncated. 
package persist import ( "fmt" - "maps" - "os" "path/filepath" - "slices" - "strconv" - "strings" - - "log/slog" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) +const commitqcsDir = "commitqcs" + // LoadedCommitQC is a CommitQC loaded from disk during state restoration. type LoadedCommitQC struct { Index types.RoadIndex QC *types.CommitQC } -// CommitQCPersister manages individual CommitQC files in a commitqcs/ subdirectory. -// Each CommitQC is stored as .pb. -// The caller is responsible for driving persistence (typically a goroutine that -// watches in-memory state and calls PersistCommitQC / DeleteBefore). -// When noop is true, all disk I/O is skipped but cursor tracking still works. -type CommitQCPersister struct { - dir string // full path to the commitqcs/ subdirectory; empty when noop - noop bool +// commitQCState is the mutable state protected by CommitQCPersister's mutex. +type commitQCState struct { + iw utils.Option[*indexedWAL[*types.CommitQC]] next types.RoadIndex } -// newNoOpCommitQCPersister returns a CommitQCPersister that skips all disk I/O -// but still tracks the next index. Used when persistence is disabled. -func newNoOpCommitQCPersister() *CommitQCPersister { - return &CommitQCPersister{noop: true} +// CommitQCPersister manages CommitQC persistence using a WAL. +// Entries are appended in order; each entry is self-describing (the serialized +// CommitQC contains its RoadIndex). The WAL index is append order, not +// RoadIndex — the indexedWAL tracks first/next indices to enable truncation. +// When iw is None, all disk I/O is skipped but cursor tracking still works. +// +// All public methods are safe for concurrent use. +type CommitQCPersister struct { + state utils.Mutex[*commitQCState] } -// NewCommitQCPersister creates the commitqcs/ subdirectory if it doesn't exist -// and returns a persister. 
Loads all persisted CommitQCs from disk as a sorted -// slice. Corrupt files are skipped; the caller (newInner) returns an error if -// the resulting slice is non-contiguous. -// When stateDir is None, returns a no-op persister that skips all disk I/O. +// NewCommitQCPersister opens (or creates) a WAL in the commitqcs/ subdirectory +// and replays all persisted entries. Returns the persister and a sorted slice of +// loaded CommitQCs. Corrupt tail entries are auto-truncated by the WAL library. +// When stateDir is None, returns a no-op persister. +// +// After crash recovery with an empty WAL (e.g. TruncateAll completed but no +// new write followed), LoadNext() returns 0. The caller MUST call +// DeleteBefore(anchor, anchorQC) to re-establish the cursor and re-persist +// the anchor's CommitQC before calling PersistCommitQC. func NewCommitQCPersister(stateDir utils.Option[string]) (*CommitQCPersister, []LoadedCommitQC, error) { sd, ok := stateDir.Get() if !ok { - return newNoOpCommitQCPersister(), nil, nil + return &CommitQCPersister{state: utils.NewMutex(&commitQCState{})}, nil, nil } - dir := filepath.Join(sd, "commitqcs") - if err := os.MkdirAll(dir, 0700); err != nil { - return nil, nil, fmt.Errorf("create commitqcs dir %s: %w", dir, err) + dir := filepath.Join(sd, commitqcsDir) + iw, err := openIndexedWAL(dir, types.CommitQCConv) + if err != nil { + return nil, nil, fmt.Errorf("open commitqc WAL in %s: %w", dir, err) } - cp := &CommitQCPersister{dir: dir} - loaded, err := cp.loadAll() + s := &commitQCState{iw: utils.Some(iw)} + loaded, err := loadAllCommitQCs(s) if err != nil { + _ = iw.Close() return nil, nil, err } if len(loaded) > 0 { - cp.next = loaded[len(loaded)-1].Index + 1 + s.next = loaded[len(loaded)-1].Index + 1 } - return cp, loaded, nil + return &CommitQCPersister{state: utils.NewMutex(s)}, loaded, nil } // LoadNext returns the road index of the first CommitQC that has not been // persisted (exclusive upper bound of what's on disk). 
func (cp *CommitQCPersister) LoadNext() types.RoadIndex { - return cp.next -} - -// ResetNext overrides the next-to-persist cursor. Called after newInner -// applies prune(), which may advance commitQCs.next beyond the raw loader's -// cursor. Without this, PersistCommitQC would reject valid new QCs as -// "already persisted". -func (cp *CommitQCPersister) ResetNext(idx types.RoadIndex) { - cp.next = idx -} - -func commitQCFilename(idx types.RoadIndex) string { - return strconv.FormatUint(uint64(idx), 10) + ".pb" -} - -func parseCommitQCFilename(name string) (types.RoadIndex, error) { - name = strings.TrimSuffix(name, ".pb") - n, err := strconv.ParseUint(name, 10, 64) - if err != nil { - return 0, fmt.Errorf("bad commitqc filename %q: %w", name, err) + for s := range cp.state.Lock() { + return s.next } - return types.RoadIndex(n), nil + panic("unreachable") } -// PersistCommitQC writes a CommitQC to its own file. -// The caller must persist CommitQCs in order; idx < cp.next is a bug. +// PersistCommitQC writes a CommitQC to the WAL. +// Entries must be persisted in sequential order. Duplicates (idx < next) are +// silently ignored — this makes startup idempotent after DeleteBefore truncates +// the WAL. Gaps (idx > next) return an error because they break the linear +// RoadIndex-to-WAL-index mapping that DeleteBefore relies on. func (cp *CommitQCPersister) PersistCommitQC(qc *types.CommitQC) error { - idx := qc.Index() - if idx < cp.next { - return fmt.Errorf("commitqc %d already persisted (next=%d)", idx, cp.next) + for s := range cp.state.Lock() { + return persistCommitQC(s, qc) } - if !cp.noop { - data := types.CommitQCConv.Marshal(qc) - path := filepath.Join(cp.dir, commitQCFilename(idx)) - if err := writeAndSync(path, data); err != nil { - return fmt.Errorf("persist commitqc %d: %w", idx, err) - } - } - cp.next = idx + 1 - return nil + panic("unreachable") } -// DeleteBefore removes persisted CommitQC files with road index below idx. 
-// Returns an error if the directory cannot be read; individual file removal -// failures are logged but do not cause an error. -func (cp *CommitQCPersister) DeleteBefore(idx types.RoadIndex) error { - if cp.noop || idx == 0 { +// persistCommitQC is the core persist logic. Caller must hold the lock. +func persistCommitQC(s *commitQCState, qc *types.CommitQC) error { + idx := qc.Index() + if idx < s.next { return nil } - entries, err := os.ReadDir(cp.dir) - if err != nil { - return fmt.Errorf("list commitqcs dir for cleanup: %w", err) + if idx > s.next { + return fmt.Errorf("commitqc %d out of sequence (next=%d)", idx, s.next) } - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue - } - fileIdx, err := parseCommitQCFilename(entry.Name()) - if err != nil { - continue - } - if fileIdx >= idx { - continue - } - path := filepath.Join(cp.dir, entry.Name()) - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - logger.Warn("failed to delete commitqc file", "path", path, "err", err) + if iw, ok := s.iw.Get(); ok { + if err := iw.Write(qc); err != nil { + return fmt.Errorf("persist commitqc %d: %w", idx, err) } - } + } // else: no-op persister (persistence disabled); cursor still advances. + s.next = idx + 1 return nil } -// loadAll loads all persisted CommitQCs from the commitqcs/ directory. -// Returns a sorted slice of all valid files. Corrupt or mismatched files -// are skipped; the caller (newInner) returns an error on gaps. 
-func (cp *CommitQCPersister) loadAll() ([]LoadedCommitQC, error) { - entries, err := os.ReadDir(cp.dir) - if err != nil { - return nil, fmt.Errorf("read commitqcs dir %s: %w", cp.dir, err) - } - - raw := map[types.RoadIndex]*types.CommitQC{} - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue - } - idx, err := parseCommitQCFilename(entry.Name()) - if err != nil { - logger.Warn("skipping unrecognized commitqc file", "file", entry.Name(), "err", err) - continue +// DeleteBefore removes persisted CommitQCs with road index below idx +// by truncating the front of the WAL. When idx is at or past every +// persisted entry, the WAL is truncated and the write cursor is advanced +// to idx so subsequent PersistCommitQC calls start from the right place. +// The mapping from RoadIndex to WAL index is linear: entries are written +// sequentially, so WAL index = firstIdx + (roadIndex - firstRoadIndex). +// +// If anchor is present it is re-persisted after truncation. This handles +// crash recovery: if a previous TruncateAll cleared the WAL but the +// process crashed before writing new entries, the anchor's CommitQC would +// otherwise be lost. In the normal case the re-persist is a no-op +// (idx < next, already on disk). 
+func (cp *CommitQCPersister) DeleteBefore(idx types.RoadIndex, anchor utils.Option[*types.CommitQC]) error { + for s := range cp.state.Lock() { + iw, ok := s.iw.Get() + if !ok || idx == 0 { + return nil // no-op persister (persistence disabled), or nothing to prune } - qc, err := loadCommitQCFile(filepath.Join(cp.dir, entry.Name())) - if err != nil { - logger.Warn("skipping corrupt commitqc file", "file", entry.Name(), "err", err) - continue + if idx >= s.next { + s.next = idx + if iw.Count() > 0 { + if err := iw.TruncateAll(); err != nil { + return err + } + } + } else if iw.Count() > 0 { + firstRoadIndex := s.next - types.RoadIndex(iw.Count()) + if idx > firstRoadIndex { + walIdx := iw.FirstIdx() + uint64(idx-firstRoadIndex) + if err := iw.TruncateBefore(walIdx, func(entry *types.CommitQC) error { + if entry.Index() != idx { + return fmt.Errorf("commitqc at WAL index %d has road index %d, expected %d (index mapping broken)", walIdx, entry.Index(), idx) + } + return nil + }); err != nil { + return fmt.Errorf("truncate commitqc WAL before %d: %w", walIdx, err) + } + } } - if qc.Index() != idx { - logger.Warn("skipping commitqc file with mismatched index", - "file", entry.Name(), - slog.Uint64("headerIdx", uint64(qc.Index())), - slog.Uint64("filenameIdx", uint64(idx)), - ) - continue + if qc, ok := anchor.Get(); ok { + return persistCommitQC(s, qc) } - raw[idx] = qc - logger.Info("loaded persisted commitqc", slog.Uint64("roadIndex", uint64(idx))) - } - - if len(raw) == 0 { - return nil, nil + return nil } + panic("unreachable") +} - sorted := slices.Sorted(maps.Keys(raw)) - result := make([]LoadedCommitQC, 0, len(sorted)) - for _, idx := range sorted { - result = append(result, LoadedCommitQC{Index: idx, QC: raw[idx]}) +// Close shuts down the WAL. Safe to call multiple times (idempotent). 
+func (cp *CommitQCPersister) Close() error { + for s := range cp.state.Lock() { + iw, ok := s.iw.Get() + if !ok { + return nil // no-op persister or already closed + } + s.iw = utils.None[*indexedWAL[*types.CommitQC]]() + return iw.Close() } - return result, nil + panic("unreachable") } -func loadCommitQCFile(path string) (*types.CommitQC, error) { - data, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename; not user-controlled +func loadAllCommitQCs(s *commitQCState) ([]LoadedCommitQC, error) { + iw, ok := s.iw.Get() + if !ok { + return nil, nil // no-op persister (persistence disabled) + } + entries, err := iw.ReadAll() if err != nil { return nil, err } - return types.CommitQCConv.Unmarshal(data) + loaded := make([]LoadedCommitQC, 0, len(entries)) + for i, qc := range entries { + if i > 0 && qc.Index() != loaded[i-1].Index+1 { + return nil, fmt.Errorf("gap in commitqcs: index %d follows %d", qc.Index(), loaded[i-1].Index) + } + loaded = append(loaded, LoadedCommitQC{Index: qc.Index(), QC: qc}) + } + return loaded, nil } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go index dccaa341fc..f8a716ac5e 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go @@ -11,8 +11,9 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" ) +var noQC = utils.None[*types.CommitQC]() + func testCommitQC( - rng utils.Rng, committee *types.Committee, keys []types.SecretKey, prev utils.Option[*types.CommitQC], @@ -45,7 +46,6 @@ func testCommitQC( } func makeSequentialCommitQCs( - rng utils.Rng, committee *types.Committee, keys []types.SecretKey, count int, @@ -53,7 +53,7 @@ func makeSequentialCommitQCs( var qcs []*types.CommitQC prev := utils.None[*types.CommitQC]() for range count { - qc := 
testCommitQC(rng, committee, keys, prev, nil, utils.None[*types.AppQC]()) + qc := testCommitQC(committee, keys, prev, nil, utils.None[*types.AppQC]()) qcs = append(qcs, qc) prev = utils.Some(qc) } @@ -68,9 +68,10 @@ func TestNewCommitQCPersisterEmptyDir(t *testing.T) { require.Equal(t, 0, len(loaded)) require.Equal(t, types.RoadIndex(0), cp.LoadNext()) - fi, err := os.Stat(filepath.Join(dir, "commitqcs")) + fi, err := os.Stat(filepath.Join(dir, commitqcsDir)) require.NoError(t, err) require.True(t, fi.IsDir()) + require.NoError(t, cp.Close()) } func TestPersistCommitQCAndLoad(t *testing.T) { @@ -78,7 +79,7 @@ func TestPersistCommitQCAndLoad(t *testing.T) { committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 3) + qcs := makeSequentialCommitQCs(committee, keys, 3) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) @@ -87,6 +88,7 @@ func TestPersistCommitQCAndLoad(t *testing.T) { require.NoError(t, cp.PersistCommitQC(qc)) } require.Equal(t, types.RoadIndex(3), cp.LoadNext()) + require.NoError(t, cp.Close()) cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) @@ -97,153 +99,315 @@ func TestPersistCommitQCAndLoad(t *testing.T) { require.NoError(t, utils.TestDiff(qcs[i], lqc.QC)) } require.Equal(t, types.RoadIndex(3), cp2.LoadNext()) + require.NoError(t, cp2.Close()) } -func TestLoadSkipsCorruptCommitQCFile(t *testing.T) { +func TestCommitQCDeleteBeforeRemovesOldKeepsNew(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 1) + qcs := makeSequentialCommitQCs(committee, keys, 5) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.NoError(t, cp.PersistCommitQC(qcs[0])) + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } - // Write a corrupt file for index 1 - corruptPath := 
filepath.Join(dir, "commitqcs", commitQCFilename(1)) - require.NoError(t, os.WriteFile(corruptPath, []byte("corrupt"), 0600)) + require.NoError(t, cp.DeleteBefore(3, noQC)) + require.NoError(t, cp.Close()) _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 1, len(loaded), "should only load valid commitqc") - require.NoError(t, utils.TestDiff(qcs[0], loaded[0].QC)) + require.Equal(t, 2, len(loaded), "should have indices 3 and 4") + require.Equal(t, types.RoadIndex(3), loaded[0].Index) + require.Equal(t, types.RoadIndex(4), loaded[1].Index) } -func TestLoadCommitQCReturnsAllWithGap(t *testing.T) { +func TestCommitQCDeleteBeforeZero(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 4) + qcs := makeSequentialCommitQCs(committee, keys, 2) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } - // Persist 0, 1, skip 2, persist 3 → gap at 2, all three returned sorted. 
- require.NoError(t, cp.PersistCommitQC(qcs[0])) - require.NoError(t, cp.PersistCommitQC(qcs[1])) - require.NoError(t, cp.PersistCommitQC(qcs[3])) + require.NoError(t, cp.DeleteBefore(0, noQC)) + require.NoError(t, cp.Close()) - cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) + _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 3, len(loaded), "should return all valid files including after gap") - require.Equal(t, types.RoadIndex(0), loaded[0].Index) - require.Equal(t, types.RoadIndex(1), loaded[1].Index) - require.Equal(t, types.RoadIndex(3), loaded[2].Index) - require.Equal(t, types.RoadIndex(4), cp2.LoadNext(), "next should be max index + 1") + require.Equal(t, 2, len(loaded)) } -func TestLoadCommitQCCorruptMidSequenceCreatesGap(t *testing.T) { +func TestCommitQCPersistDuplicateIsNoOp(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 3) + qcs := makeSequentialCommitQCs(committee, keys, 3) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - // Persist 0, corrupt 1, persist 2 → corrupt skipped, returns [0, 2] with gap. require.NoError(t, cp.PersistCommitQC(qcs[0])) - require.NoError(t, cp.PersistCommitQC(qcs[2])) - corruptPath := filepath.Join(dir, "commitqcs", commitQCFilename(1)) - require.NoError(t, os.WriteFile(corruptPath, []byte("corrupt"), 0600)) + require.NoError(t, cp.PersistCommitQC(qcs[1])) + // Persisting qcs[0] again is a no-op (idx < next). 
+ require.NoError(t, cp.PersistCommitQC(qcs[0])) + require.Equal(t, types.RoadIndex(2), cp.LoadNext()) + require.NoError(t, cp.Close()) +} - _, loaded, err := NewCommitQCPersister(utils.Some(dir)) +func TestCommitQCPersistGapRejected(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + dir := t.TempDir() + + qcs := makeSequentialCommitQCs(committee, keys, 5) + cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(loaded), "corrupt skipped; both valid files returned") - require.Equal(t, types.RoadIndex(0), loaded[0].Index) - require.Equal(t, types.RoadIndex(2), loaded[1].Index) + + require.NoError(t, cp.PersistCommitQC(qcs[0])) + require.NoError(t, cp.PersistCommitQC(qcs[1])) + // Skip qcs[2], try to persist qcs[3] — should fail because idx(3) != next(2). + err = cp.PersistCommitQC(qcs[3]) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") + require.NoError(t, cp.Close()) } -func TestLoadCommitQCSkipsMismatchedIndex(t *testing.T) { +func TestLoadAllDetectsCommitQCGap(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 2) - cp, _, err := NewCommitQCPersister(utils.Some(dir)) + // Build 3 sequential CommitQCs (indices 0, 1, 2). + qcs := makeSequentialCommitQCs(committee, keys, 3) + + // Write directly to the WAL, bypassing PersistCommitQC's contiguity + // check to simulate on-disk corruption (index 0 then index 2, skipping 1). 
+ walDir := filepath.Join(dir, commitqcsDir) + require.NoError(t, os.MkdirAll(walDir, 0700)) + iw, err := openIndexedWAL(walDir, types.CommitQCConv) require.NoError(t, err) + require.NoError(t, iw.Write(qcs[0])) + require.NoError(t, iw.Write(qcs[2])) + require.NoError(t, iw.Close()) - // Persist qc[0] (index 0) but save it under filename for index 5 + _, _, err = NewCommitQCPersister(utils.Some(dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "gap") +} + +func TestNoOpCommitQCPersister(t *testing.T) { + cp, loaded, err := NewCommitQCPersister(utils.None[string]()) + require.NoError(t, err) + require.NotNil(t, cp) + require.Equal(t, 0, len(loaded)) + + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + qcs := makeSequentialCommitQCs(committee, keys, 1) require.NoError(t, cp.PersistCommitQC(qcs[0])) - oldPath := filepath.Join(dir, "commitqcs", commitQCFilename(0)) - newPath := filepath.Join(dir, "commitqcs", commitQCFilename(5)) - require.NoError(t, os.Rename(oldPath, newPath)) + require.Equal(t, types.RoadIndex(1), cp.LoadNext()) + require.NoError(t, cp.DeleteBefore(0, noQC)) + require.NoError(t, cp.Close()) +} +func TestCommitQCDeleteBeforePastAll(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + dir := t.TempDir() + + qcs := makeSequentialCommitQCs(committee, keys, 3) + cp, _, err := NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } + // next is 3; prune past everything. DeleteBefore advances the cursor + // to 10 and truncates the WAL. + require.NoError(t, cp.DeleteBefore(10, noQC)) + require.Equal(t, types.RoadIndex(10), cp.LoadNext()) + + // New writes starting from 10 should work. 
+ moreQCs := makeSequentialCommitQCs(committee, keys, 12) + require.NoError(t, cp.PersistCommitQC(moreQCs[10])) + require.NoError(t, cp.PersistCommitQC(moreQCs[11])) + require.NoError(t, cp.Close()) + + // Reopen — should see only the post-TruncateAll entries. _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 0, len(loaded), "mismatched index should be skipped") + require.Equal(t, 2, len(loaded)) + require.Equal(t, types.RoadIndex(10), loaded[0].Index) + require.Equal(t, types.RoadIndex(11), loaded[1].Index) } -func TestLoadCommitQCSkipsUnrecognizedFilename(t *testing.T) { +// TestCommitQCDeleteBeforePastAllCrashRecovery simulates a crash between WAL +// TruncateAll and new write: on restart the WAL is empty but the anchor is far ahead. +// DeleteBefore must still advance the cursor so PersistCommitQC succeeds. +func TestCommitQCDeleteBeforePastAllCrashRecovery(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() + + qcs := makeSequentialCommitQCs(committee, keys, 3) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - _ = cp + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } - qcDir := filepath.Join(dir, "commitqcs") - require.NoError(t, os.WriteFile(filepath.Join(qcDir, "notaqc.pb"), []byte("data"), 0600)) - require.NoError(t, os.WriteFile(filepath.Join(qcDir, "readme.txt"), []byte("hi"), 0600)) + // DeleteBefore truncates the WAL (past all), then "crash" before writing. + require.NoError(t, cp.DeleteBefore(10, noQC)) + require.NoError(t, cp.Close()) // simulate crash — no new QCs written - _, loaded, err := NewCommitQCPersister(utils.Some(dir)) + // Restart: WAL is empty, next will be 0. 
+ cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 0, len(loaded)) + require.Empty(t, loaded) + require.Equal(t, types.RoadIndex(0), cp2.LoadNext()) + + // Second DeleteBefore on the empty WAL must advance the cursor. + require.NoError(t, cp2.DeleteBefore(10, noQC)) + require.Equal(t, types.RoadIndex(10), cp2.LoadNext()) + + // Writing from index 10 should now succeed. + moreQCs := makeSequentialCommitQCs(committee, keys, 12) + require.NoError(t, cp2.PersistCommitQC(moreQCs[10])) + require.NoError(t, cp2.PersistCommitQC(moreQCs[11])) + require.NoError(t, cp2.Close()) + + _, loaded, err = NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 2, len(loaded)) + require.Equal(t, types.RoadIndex(10), loaded[0].Index) + require.Equal(t, types.RoadIndex(11), loaded[1].Index) } -func TestCommitQCDeleteBeforeRemovesOldKeepsNew(t *testing.T) { +// TestCommitQCDeleteBeforeWithAnchorRecovers verifies that passing an anchor +// QC to DeleteBefore re-persists it after a WAL reset, so the caller doesn't +// need to handle crash recovery separately. +func TestCommitQCDeleteBeforeWithAnchorRecovers(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 5) + qcs := makeSequentialCommitQCs(committee, keys, 5) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) for _, qc := range qcs { require.NoError(t, cp.PersistCommitQC(qc)) } - require.NoError(t, cp.DeleteBefore(3)) + // Truncate past all, then "crash". + require.NoError(t, cp.DeleteBefore(10, noQC)) + require.NoError(t, cp.Close()) + + // Restart: WAL is empty. Pass the anchor QC (index 4) through DeleteBefore. + cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + require.Empty(t, loaded) + + // DeleteBefore advances cursor to 4, then re-persists qcs[4] via anchor. 
+ require.NoError(t, cp2.DeleteBefore(4, utils.Some(qcs[4]))) + require.Equal(t, types.RoadIndex(5), cp2.LoadNext()) + + // Continue writing from 5. + require.NoError(t, cp2.PersistCommitQC(qcs[4])) // duplicate — no-op + require.Equal(t, types.RoadIndex(5), cp2.LoadNext()) + require.NoError(t, cp2.Close()) + + // Reopen — anchor QC should be on disk. + _, loaded, err = NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 1, len(loaded)) + require.Equal(t, types.RoadIndex(4), loaded[0].Index) + require.NoError(t, utils.TestDiff(qcs[4], loaded[0].QC)) +} + +func TestCommitQCDeleteBeforeThenPersistMore(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + dir := t.TempDir() + + qcs := makeSequentialCommitQCs(committee, keys, 6) + cp, _, err := NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + + // Persist 0..4, delete before 3, then persist 5. + for i := range 5 { + require.NoError(t, cp.PersistCommitQC(qcs[i])) + } + require.NoError(t, cp.DeleteBefore(3, noQC)) + require.NoError(t, cp.PersistCommitQC(qcs[5])) + require.NoError(t, cp.Close()) _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(loaded), "should have indices 3 and 4") + require.Equal(t, 3, len(loaded), "should have indices 3, 4, 5") require.Equal(t, types.RoadIndex(3), loaded[0].Index) require.Equal(t, types.RoadIndex(4), loaded[1].Index) + require.Equal(t, types.RoadIndex(5), loaded[2].Index) } -func TestCommitQCDeleteBeforeZero(t *testing.T) { +func TestCommitQCDeleteBeforeAlreadyPruned(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 2) + qcs := makeSequentialCommitQCs(committee, keys, 5) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) for _, qc := range qcs { require.NoError(t, cp.PersistCommitQC(qc)) } - // idx=0 should be a 
no-op - require.NoError(t, cp.DeleteBefore(0)) + // Prune up to index 3. + require.NoError(t, cp.DeleteBefore(3, noQC)) + // Pruning at or below the current first should be a no-op. + require.NoError(t, cp.DeleteBefore(2, noQC)) + require.NoError(t, cp.DeleteBefore(3, noQC)) + require.NoError(t, cp.Close()) + + // Verify nothing extra was pruned. _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(loaded)) + require.Equal(t, 2, len(loaded), "should still have indices 3 and 4") + require.Equal(t, types.RoadIndex(3), loaded[0].Index) + require.Equal(t, types.RoadIndex(4), loaded[1].Index) } -func TestCommitQCFilenameRoundTrip(t *testing.T) { - idx := types.RoadIndex(42) - name := commitQCFilename(idx) - parsed, err := parseCommitQCFilename(name) +func TestCommitQCProgressiveDeleteBefore(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + dir := t.TempDir() + + qcs := makeSequentialCommitQCs(committee, keys, 8) + cp, _, err := NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } + + // First prune: remove 0, 1. + require.NoError(t, cp.DeleteBefore(2, noQC)) + require.Equal(t, types.RoadIndex(8), cp.LoadNext()) + + // Second prune: remove 2, 3, 4. + require.NoError(t, cp.DeleteBefore(5, noQC)) + require.NoError(t, cp.Close()) + + // Verify indices 5, 6, 7 survive. 
+ _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, idx, parsed) + require.Equal(t, 3, len(loaded)) + require.Equal(t, types.RoadIndex(5), loaded[0].Index) + require.Equal(t, types.RoadIndex(6), loaded[1].Index) + require.Equal(t, types.RoadIndex(7), loaded[2].Index) } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/wal.go b/sei-tendermint/internal/autobahn/consensus/persist/wal.go new file mode 100644 index 0000000000..be1a808ceb --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/wal.go @@ -0,0 +1,157 @@ +package persist + +import ( + "context" + "fmt" + "os" + + dbwal "github.com/sei-protocol/sei-chain/sei-db/wal" +) + +// codec is the marshal/unmarshal pair needed to store T in a WAL. +// protoutils.Conv[T, P] satisfies this interface automatically. +type codec[T any] interface { + Marshal(T) []byte + Unmarshal([]byte) (T, error) +} + +// indexedWAL wraps a WAL with monotonic index tracking and typed entries. +// Callers map domain-specific indices (BlockNumber, RoadIndex) to WAL +// indices via Count() and firstIdx. Not safe for concurrent use. +// +// INVARIANT: firstIdx <= nextIdx (enforced by openIndexedWAL, Write, +// TruncateBefore, and TruncateAll). Count() relies on this for unsigned +// subtraction safety. +type indexedWAL[T any] struct { + wal *dbwal.WAL[T] + firstIdx uint64 // WAL index of the oldest entry; == nextIdx when empty + nextIdx uint64 // WAL index that the next Write will be assigned +} + +// openIndexedWAL creates (or opens) a WAL in dir with synchronous, unbatched +// writes and fsync. The prune anchor (persisted via A/B files) is the +// crash-recovery watermark, but fsync on the WAL provides additional +// durability for entries not yet covered by the anchor. +// Initializes index tracking from the WAL's stored offsets so the caller can +// immediately Write, ReadAll, or TruncateBefore. 
+func openIndexedWAL[T any](dir string, codec codec[T]) (*indexedWAL[T], error) { + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, fmt.Errorf("create dir %s: %w", dir, err) + } + w, err := dbwal.NewWAL( + context.Background(), + func(entry T) ([]byte, error) { return codec.Marshal(entry), nil }, + codec.Unmarshal, + dir, + dbwal.Config{ + WriteBufferSize: 0, // synchronous writes + WriteBatchSize: 1, // no batching + FsyncEnabled: true, + AllowEmpty: true, + }, + ) + if err != nil { + return nil, err + } + iw := &indexedWAL[T]{wal: w} + first, err := w.FirstOffset() + if err != nil { + _ = w.Close() + return nil, fmt.Errorf("first offset: %w", err) + } + last, err := w.LastOffset() + if err != nil { + _ = w.Close() + return nil, fmt.Errorf("last offset: %w", err) + } + // CRITICAL: AllowEmpty must be true in the config above. With AllowEmpty, + // an empty log reports first > last (e.g. first=1, last=0 for a brand-new + // log, or first=N+1, last=N after TruncateAll). Without AllowEmpty, an + // empty log returns (0, 0), and the formula below would give Count() == 1. + // A non-empty log always has first <= last with first >= 1. In both cases, + // setting firstIdx = first and nextIdx = last + 1 yields Count() == 0 + // when empty. + iw.firstIdx = first + iw.nextIdx = last + 1 + return iw, nil +} + +// Write appends entry to the WAL, advancing nextIdx. +func (w *indexedWAL[T]) Write(entry T) error { + if err := w.wal.Write(entry); err != nil { + return err + } + w.nextIdx++ + return nil +} + +// TruncateBefore reads the entry at walIdx, passes it to verify, and — if +// verify returns nil — removes all entries before walIdx. The verify callback +// lets callers assert that the WAL index maps to the expected domain object +// before a destructive operation. 
+func (w *indexedWAL[T]) TruncateBefore(walIdx uint64, verify func(T) error) error { + if walIdx < w.firstIdx || walIdx >= w.nextIdx { + return fmt.Errorf("WAL index %d out of range [%d, %d)", walIdx, w.firstIdx, w.nextIdx) + } + entry, err := w.wal.ReadAt(walIdx) + if err != nil { + return fmt.Errorf("read at WAL index %d: %w", walIdx, err) + } + if err := verify(entry); err != nil { + return err + } + if err := w.wal.TruncateBefore(walIdx); err != nil { + return fmt.Errorf("truncate before WAL index %d: %w", walIdx, err) + } + w.firstIdx = walIdx + return nil +} + +// ReadAll returns all entries in the WAL. Returns nil if empty. +func (w *indexedWAL[T]) ReadAll() ([]T, error) { + if w.Count() == 0 { + return nil, nil + } + entries := make([]T, 0, w.Count()) + err := w.wal.Replay(w.firstIdx, w.nextIdx-1, func(_ uint64, entry T) error { + entries = append(entries, entry) + return nil + }) + if err != nil { + return nil, err + } + if uint64(len(entries)) != w.Count() { + return nil, fmt.Errorf("WAL replay returned %d entries, expected %d (possible silent data loss)", len(entries), w.Count()) + } + return entries, nil +} + +// FirstIdx returns the WAL index of the oldest entry. +// Equal to nextIdx when the WAL is empty. +func (w *indexedWAL[T]) FirstIdx() uint64 { + return w.firstIdx +} + +// Count returns the number of entries in the WAL. +// Empty when firstIdx == nextIdx (both after fresh open and after TruncateAll). +func (w *indexedWAL[T]) Count() uint64 { + return w.nextIdx - w.firstIdx +} + +// TruncateAll removes all entries from the WAL, leaving it empty for new writes. +// The underlying index counter is preserved (next Write continues from where +// it left off); firstIdx is advanced to nextIdx so Count() == 0. +// Used when all entries are stale (e.g. the prune anchor advanced past +// everything persisted). 
+func (w *indexedWAL[T]) TruncateAll() error { + if err := w.wal.TruncateAll(); err != nil { + return fmt.Errorf("truncate all WAL entries: %w", err) + } + w.firstIdx = w.nextIdx + return nil +} + +// Close shuts down the underlying WAL. +func (w *indexedWAL[T]) Close() error { + return w.wal.Close() +} diff --git a/sei-tendermint/internal/autobahn/consensus/persist/wal_test.go b/sei-tendermint/internal/autobahn/consensus/persist/wal_test.go new file mode 100644 index 0000000000..019afcf1db --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/wal_test.go @@ -0,0 +1,344 @@ +package persist + +import ( + "fmt" + "testing" + + "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require" +) + +func acceptAny(_ string) error { return nil } + +// stringCodec is a trivial codec for testing indexedWAL with strings. +type stringCodec struct{} + +func (stringCodec) Marshal(s string) []byte { return []byte(s) } +func (stringCodec) Unmarshal(b []byte) (string, error) { return string(b), nil } + +func TestIndexedWAL_EmptyStart(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + require.Equal(t, uint64(0), iw.Count()) + require.Equal(t, iw.FirstIdx(), iw.nextIdx) // empty: firstIdx == nextIdx + + entries, err := iw.ReadAll() + require.NoError(t, err) + require.Equal(t, 0, len(entries)) + + require.NoError(t, iw.Close()) +} + +func TestIndexedWAL_WriteAndReadAll(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.NoError(t, iw.Write("c")) + + require.Equal(t, uint64(3), iw.Count()) + require.Equal(t, uint64(1), iw.FirstIdx()) + require.Equal(t, uint64(4), iw.nextIdx) + + entries, err := iw.ReadAll() + require.NoError(t, err) + require.Equal(t, 3, len(entries)) + require.Equal(t, "a", entries[0]) + require.Equal(t, "b", entries[1]) + require.Equal(t, 
"c", entries[2]) + + require.NoError(t, iw.Close()) +} + +func TestIndexedWAL_ReopenWithData(t *testing.T) { + dir := t.TempDir() + + // Write some entries and close. + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.NoError(t, iw.Write("x")) + require.NoError(t, iw.Write("y")) + require.NoError(t, iw.Close()) + + // Reopen — should recover firstIdx, nextIdx, and entries. + iw2, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + require.Equal(t, uint64(2), iw2.Count()) + require.Equal(t, uint64(1), iw2.FirstIdx()) + require.Equal(t, uint64(3), iw2.nextIdx) + + entries, err := iw2.ReadAll() + require.NoError(t, err) + require.Equal(t, 2, len(entries)) + require.Equal(t, "x", entries[0]) + require.Equal(t, "y", entries[1]) + + require.NoError(t, iw2.Close()) +} + +func TestIndexedWAL_ReopenAfterTruncate(t *testing.T) { + dir := t.TempDir() + + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + for _, s := range []string{"a", "b", "c", "d", "e"} { + require.NoError(t, iw.Write(s)) + } + // Truncate first 3 entries (indices 1,2,3); keep 4,5. + require.NoError(t, iw.TruncateBefore(4, acceptAny)) + require.Equal(t, uint64(2), iw.Count()) + require.NoError(t, iw.Close()) + + // Reopen — should see only the surviving entries. 
+ iw2, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.Equal(t, uint64(2), iw2.Count()) + require.Equal(t, uint64(4), iw2.FirstIdx()) + require.Equal(t, uint64(6), iw2.nextIdx) + + entries, err := iw2.ReadAll() + require.NoError(t, err) + require.Equal(t, 2, len(entries)) + require.Equal(t, "d", entries[0]) + require.Equal(t, "e", entries[1]) + + require.NoError(t, iw2.Close()) +} + +func TestIndexedWAL_TruncateAllButLast(t *testing.T) { + dir := t.TempDir() + + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.NoError(t, iw.Write("c")) + + // TruncateBefore keeps the entry at the given index; remove all but last. + require.NoError(t, iw.TruncateBefore(3, acceptAny)) + require.Equal(t, uint64(1), iw.Count()) + require.Equal(t, uint64(3), iw.FirstIdx()) + require.NoError(t, iw.Close()) + + // Reopen — should see one entry. + iw2, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.Equal(t, uint64(1), iw2.Count()) + + entries, err := iw2.ReadAll() + require.NoError(t, err) + require.Equal(t, 1, len(entries)) + require.Equal(t, "c", entries[0]) + + require.NoError(t, iw2.Close()) +} + +func TestIndexedWAL_TruncateBeforeVerifiesEntry(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.NoError(t, iw.Write("c")) + + // Verify callback receives the correct entry. + var got string + require.NoError(t, iw.TruncateBefore(2, func(s string) error { + got = s + return nil + })) + require.Equal(t, "b", got) + require.Equal(t, uint64(2), iw.FirstIdx()) + + // Verify callback can reject the truncation. 
+ err = iw.TruncateBefore(3, func(s string) error { + return fmt.Errorf("rejected: %s", s) + }) + require.Error(t, err) + require.Contains(t, err.Error(), "rejected: c") + // firstIdx should NOT have advanced since verify rejected. + require.Equal(t, uint64(2), iw.FirstIdx()) + + require.NoError(t, iw.Close()) +} + +func TestIndexedWAL_TruncateAll(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.NoError(t, iw.Write("c")) + require.Equal(t, uint64(3), iw.Count()) + require.Equal(t, uint64(4), iw.nextIdx) + + require.NoError(t, iw.TruncateAll()) + require.Equal(t, uint64(0), iw.Count()) + require.Equal(t, uint64(4), iw.FirstIdx()) // advanced to nextIdx + require.Equal(t, uint64(4), iw.nextIdx) // index counter preserved + + // Can write fresh entries after TruncateAll; indices continue. + require.NoError(t, iw.Write("x")) + require.Equal(t, uint64(1), iw.Count()) + require.Equal(t, uint64(4), iw.FirstIdx()) + require.Equal(t, uint64(5), iw.nextIdx) + + entries, err := iw.ReadAll() + require.NoError(t, err) + require.Equal(t, 1, len(entries)) + require.Equal(t, "x", entries[0]) + + require.NoError(t, iw.Close()) + + // Reopen — should see only the post-TruncateAll entry. 
+ iw2, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.Equal(t, uint64(1), iw2.Count()) + require.Equal(t, uint64(4), iw2.FirstIdx()) + require.Equal(t, uint64(5), iw2.nextIdx) + entries, err = iw2.ReadAll() + require.NoError(t, err) + require.Equal(t, 1, len(entries)) + require.Equal(t, "x", entries[0]) + require.NoError(t, iw2.Close()) +} + +func TestIndexedWAL_ReadAllDetectsStaleNextIdx(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.Equal(t, uint64(2), iw.Count()) + + // Simulate stale internal state: advance nextIdx so Count() reports more + // entries than the WAL actually contains. ReadAll must return an error + // (either from Replay failing to read the missing entry, or from the + // post-replay count check). + iw.nextIdx++ + + _, err = iw.ReadAll() + require.Error(t, err) + + iw.nextIdx-- + require.NoError(t, iw.Close()) +} + +func TestIndexedWAL_WriteAfterTruncate(t *testing.T) { + dir := t.TempDir() + + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.NoError(t, iw.Write("c")) + + // Truncate "a" and "b". + require.NoError(t, iw.TruncateBefore(3, acceptAny)) + require.Equal(t, uint64(1), iw.Count()) + + // Write more after truncation. 
+ require.NoError(t, iw.Write("d")) + require.NoError(t, iw.Write("e")) + require.Equal(t, uint64(3), iw.Count()) + require.Equal(t, uint64(3), iw.FirstIdx()) + require.Equal(t, uint64(6), iw.nextIdx) + + entries, err := iw.ReadAll() + require.NoError(t, err) + require.Equal(t, 3, len(entries)) + require.Equal(t, "c", entries[0]) + require.Equal(t, "d", entries[1]) + require.Equal(t, "e", entries[2]) + + require.NoError(t, iw.Close()) +} + +func TestIndexedWAL_TruncateAllOnEmpty(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + // TruncateAll on a brand-new (empty) WAL should be a no-op. + require.NoError(t, iw.TruncateAll()) + require.Equal(t, uint64(0), iw.Count()) + + // Can still write after TruncateAll on empty. + require.NoError(t, iw.Write("a")) + require.Equal(t, uint64(1), iw.Count()) + + entries, err := iw.ReadAll() + require.NoError(t, err) + require.Equal(t, 1, len(entries)) + require.Equal(t, "a", entries[0]) + require.NoError(t, iw.Close()) +} + +func TestIndexedWAL_ReopenAfterTruncateAllNoWrites(t *testing.T) { + dir := t.TempDir() + + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.NoError(t, iw.Write("a")) + require.NoError(t, iw.Write("b")) + require.NoError(t, iw.TruncateAll()) + require.Equal(t, uint64(0), iw.Count()) + // Close immediately — no writes after TruncateAll. + require.NoError(t, iw.Close()) + + // Reopen — should be empty with correct index tracking. + iw2, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + require.Equal(t, uint64(0), iw2.Count()) + + entries, err := iw2.ReadAll() + require.NoError(t, err) + require.Equal(t, 0, len(entries)) + + // Writing after reopen should work. 
+ require.NoError(t, iw2.Write("c")) + require.Equal(t, uint64(1), iw2.Count()) + + entries, err = iw2.ReadAll() + require.NoError(t, err) + require.Equal(t, 1, len(entries)) + require.Equal(t, "c", entries[0]) + require.NoError(t, iw2.Close()) +} + +func TestIndexedWAL_SuccessiveTruncateBefore(t *testing.T) { + dir := t.TempDir() + iw, err := openIndexedWAL(dir, stringCodec{}) + require.NoError(t, err) + + for _, s := range []string{"a", "b", "c", "d", "e"} { + require.NoError(t, iw.Write(s)) + } + // First truncate: remove "a" (index 1). + require.NoError(t, iw.TruncateBefore(2, acceptAny)) + require.Equal(t, uint64(4), iw.Count()) + require.Equal(t, uint64(2), iw.FirstIdx()) + + // Write more. + require.NoError(t, iw.Write("f")) + require.Equal(t, uint64(5), iw.Count()) + + // Second truncate: remove "b", "c", "d" (indices 2,3,4). + require.NoError(t, iw.TruncateBefore(5, acceptAny)) + require.Equal(t, uint64(2), iw.Count()) + require.Equal(t, uint64(5), iw.FirstIdx()) + + entries, err := iw.ReadAll() + require.NoError(t, err) + require.Equal(t, 2, len(entries)) + require.Equal(t, "e", entries[0]) + require.Equal(t, "f", entries[1]) + require.NoError(t, iw.Close()) +}