-
Notifications
You must be signed in to change notification settings - Fork 872
persist: replace file-per-entry with WAL and refactor into generic indexedWAL #3044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
97ef36d
b3dd4a8
953cc9e
62c71b8
5ce67a7
ed71b43
fd5ed2c
4276d38
a305f9f
d936cb8
3a0719f
81c53ff
1dd2020
4b89979
21c995d
6c1d172
453906d
73e3633
0441000
412bc56
69be223
c126392
d3b515f
eca2ad6
7d7baf2
14d802f
fe19830
7056256
f9f4af3
017da34
9df7f9f
0c2c2bd
0365abb
05788cf
273ab80
42595a5
8788096
35bb0d8
1fa7ae3
4adc9a8
bef0d5b
83a43a9
de6b380
21664a4
810b7ba
4281811
3b0adaf
03dc013
c1abcde
e9c9b7a
f5b3e0f
6b8dc6e
3aa615f
270fdc3
09664f2
c22fbd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -157,20 +157,18 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin | |
| return nil, err | ||
| } | ||
|
|
||
| // Delete files below the prune anchor that were filtered out by | ||
| // loadPersistedState. Also reset the CommitQC persister's cursor to | ||
| // match the post-prune range. | ||
| laneFirsts := make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) | ||
| for lane, q := range inner.blocks { | ||
| laneFirsts[lane] = q.first | ||
| } | ||
| if err := pers.blocks.DeleteBefore(laneFirsts); err != nil { | ||
| return nil, fmt.Errorf("prune stale block files: %w", err) | ||
| } | ||
| if err := pers.commitQCs.DeleteBefore(inner.commitQCs.first); err != nil { | ||
| return nil, fmt.Errorf("prune stale commitQC files: %w", err) | ||
| // Truncate WAL entries below the prune anchor that were filtered out by | ||
| // loadPersistedState. | ||
| if ls, ok := loaded.Get(); ok { | ||
| if anchor, ok := ls.pruneAnchor.Get(); ok { | ||
| if err := pers.blocks.DeleteBefore(anchor.CommitQC); err != nil { | ||
| return nil, fmt.Errorf("prune stale block WAL entries: %w", err) | ||
| } | ||
| if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index(), utils.Some(anchor.CommitQC)); err != nil { | ||
| return nil, fmt.Errorf("prune stale commitQC WAL entries: %w", err) | ||
| } | ||
| } | ||
| } | ||
| pers.commitQCs.ResetNext(inner.commitQCs.next) | ||
|
|
||
| return &State{ | ||
| key: key, | ||
|
|
@@ -631,18 +629,20 @@ func (s *State) Run(ctx context.Context) error { | |
|
|
||
| // runPersist is the main loop for the persist goroutine. | ||
| // Write order: | ||
| // 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark. | ||
| // 2. CommitQCs in order, then publish LastCommitQC immediately | ||
| // so consensus can advance without waiting for block writes. | ||
| // 3. Blocks per lane in order, markBlockPersisted after each. | ||
| // 4. Prune old blocks and CommitQCs. | ||
| // 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark — | ||
| // followed by WAL truncation of blocks and CommitQCs. Truncation is | ||
| // co-located with the anchor write so the pruning point is derived | ||
| // directly from the anchor's CommitQC. Truncation must happen before | ||
| // writes because the WAL requires contiguous indices — if the anchor | ||
| // advanced past all persisted entries, DeleteBefore resets the WAL so | ||
| // new writes start clean. | ||
| // 2. CommitQCs and blocks concurrently via scope.Parallel: one goroutine for | ||
| // CommitQCs, one goroutine per lane for blocks (sequential within each | ||
| // lane). Each goroutine publishes its result (markCommitQCsPersisted / | ||
| // markBlockPersisted) per entry so voting unblocks ASAP. | ||
| // | ||
| // The prune anchor is a pruning watermark: on restart we resume from it. | ||
| // | ||
| // Blocks are persisted one at a time with inner.nextBlockToPersist | ||
| // updated after each write, so vote latency equals single-block write | ||
| // time regardless of batch size. | ||
| // | ||
| // TODO: use a single WAL for anchor and CommitQCs to make | ||
| // this atomic rather than relying on write order. | ||
| func (s *State) runPersist(ctx context.Context, pers persisters) error { | ||
|
|
@@ -654,51 +654,74 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error { | |
| } | ||
|
|
||
| // 1. Persist prune anchor first — establishes the crash-recovery watermark. | ||
| // All WAL pruning is co-located here so the truncation point is | ||
| // derived directly from the anchor, making the safety invariant | ||
| // explicit: we only truncate entries the on-disk anchor covers. | ||
| // Block WAL pruning must happen before writes because the WAL | ||
| // requires contiguous indices — if the anchor advanced past all | ||
| // persisted entries, DeleteBefore resets the WAL so new writes | ||
| // start clean. | ||
| if anchor, ok := batch.pruneAnchor.Get(); ok { | ||
| if err := pers.pruneAnchor.Persist(PruneAnchorConv.Encode(anchor)); err != nil { | ||
| return fmt.Errorf("persist prune anchor: %w", err) | ||
| } | ||
| s.advancePersistedBlockStart(anchor.CommitQC) | ||
| lastPersistedAppQCNext = anchor.CommitQC.Proposal().Index() + 1 | ||
| } | ||
|
|
||
| // 2. Persist new CommitQCs, then publish immediately so consensus | ||
| // can advance without waiting for block writes or pruning. | ||
| for _, qc := range batch.commitQCs { | ||
| if err := pers.commitQCs.PersistCommitQC(qc); err != nil { | ||
| return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err) | ||
| if err := pers.blocks.DeleteBefore(anchor.CommitQC); err != nil { | ||
| return fmt.Errorf("block deleteBefore: %w", err) | ||
| } | ||
| if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index(), utils.Some(anchor.CommitQC)); err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: I know we are kind of optimizing in the void without benchmarks, but there are still some disk writes here which are unnecessarily sequential: commitQCs.DeleteBefore can run in parallel with blocks.DeleteBefore, and CommitQCs.persist can run in parallel with blocks.DeleteBefore. In fact, each DeleteBefore/persist for each WAL is independent, and only the anchor write needs to happen sequentially relative to everything else. Also, we might want to persist first and then do DeleteBefore. Also, DeleteBefore doesn't require fsync (I'm not sure how the WAL behaves there).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In blocks, we are mainly doing DeleteBefore before persisting for the "after the anchor, the commitQCs must be contiguous" check, as defense in depth. If you think that is unnecessary and guaranteed by the caller, I can remove it — but then one screw-up and our index calculation would be wrong. (Of course, I could also maintain a map from WAL index to commitQC RoadIndex and discard a late commitQC that tried to fill a hole.) |
||
| return fmt.Errorf("commitqc deleteBefore: %w", err) | ||
| } | ||
| } | ||
| if len(batch.commitQCs) > 0 { | ||
| s.markCommitQCsPersisted(batch.commitQCs[len(batch.commitQCs)-1]) | ||
| } | ||
|
|
||
| // 3. Persist blocks (mark each individually for vote latency). | ||
| // 2. Persist CommitQCs and blocks concurrently. CommitQCs go in | ||
| // one goroutine; blocks fan out one goroutine per lane (sequential | ||
| // within each lane to preserve block-number ordering). Each | ||
| // goroutine publishes its result (markCommitQCsPersisted / | ||
| // markBlockPersisted) as soon as it finishes. | ||
| blocksByLane := make(map[types.LaneID][]*types.Signed[*types.LaneProposal]) | ||
| for _, proposal := range batch.blocks { | ||
| h := proposal.Msg().Block().Header() | ||
| if err := pers.blocks.PersistBlock(proposal); err != nil { | ||
| return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err) | ||
| lane := proposal.Msg().Block().Header().Lane() | ||
| blocksByLane[lane] = append(blocksByLane[lane], proposal) | ||
| } | ||
| if err := scope.Parallel(func(ps scope.ParallelScope) error { | ||
| if len(batch.commitQCs) > 0 { | ||
| ps.Spawn(func() error { | ||
| for _, qc := range batch.commitQCs { | ||
| if err := pers.commitQCs.PersistCommitQC(qc); err != nil { | ||
| return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err) | ||
| } | ||
| s.markCommitQCsPersisted(qc) | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
| s.markBlockPersisted(h.Lane(), h.BlockNumber()+1) | ||
| } | ||
|
|
||
| // 4. Prune old data. | ||
| if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil { | ||
| return fmt.Errorf("block deleteBefore: %w", err) | ||
| } | ||
| if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil { | ||
| return fmt.Errorf("commitqc deleteBefore: %w", err) | ||
| for lane, proposals := range blocksByLane { | ||
| ps.Spawn(func() error { | ||
| for _, p := range proposals { | ||
| if err := pers.blocks.PersistBlock(p); err != nil { | ||
| h := p.Msg().Block().Header() | ||
| return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err) | ||
| } | ||
| s.markBlockPersisted(lane, p.Msg().Block().Header().BlockNumber()+1) | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
| return nil | ||
| }); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // persistBatch holds the data collected under lock for one persist iteration. | ||
| type persistBatch struct { | ||
| blocks []*types.Signed[*types.LaneProposal] | ||
| commitQCs []*types.CommitQC | ||
| pruneAnchor utils.Option[*PruneAnchor] | ||
| laneFirsts map[types.LaneID]types.BlockNumber | ||
| commitQCFirst types.RoadIndex | ||
| blocks []*types.Signed[*types.LaneProposal] | ||
| commitQCs []*types.CommitQC | ||
| pruneAnchor utils.Option[*PruneAnchor] | ||
| } | ||
|
|
||
| // advancePersistedBlockStart updates the per-lane block admission watermark | ||
|
|
@@ -717,8 +740,9 @@ func (s *State) advancePersistedBlockStart(commitQC *types.CommitQC) { | |
| } | ||
|
|
||
| // markBlockPersisted advances the per-lane block persistence cursor. | ||
| // Called after each individual block write so that RecvBatch (and therefore | ||
| // voting) unblocks with single-block latency regardless of batch size. | ||
| // Called after each block is persisted so that RecvBatch (and therefore | ||
| // voting) can unblock as soon as the block is durable. Safe for concurrent | ||
| // callers (acquires s.inner lock internally). | ||
| func (s *State) markBlockPersisted(lane types.LaneID, next types.BlockNumber) { | ||
| for inner, ctrl := range s.inner.Lock() { | ||
| inner.nextBlockToPersist[lane] = next | ||
|
|
@@ -759,16 +783,13 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext | |
| }); err != nil { | ||
| return b, err | ||
| } | ||
| b.laneFirsts = make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) | ||
| for lane, q := range inner.blocks { | ||
| start := max(inner.nextBlockToPersist[lane], q.first) | ||
| for n := start; n < q.next; n++ { | ||
| b.blocks = append(b.blocks, q.q[n]) | ||
| } | ||
| b.laneFirsts[lane] = q.first | ||
| } | ||
| commitQCNext = max(commitQCNext, inner.commitQCs.first) | ||
| b.commitQCFirst = inner.commitQCs.first | ||
| for n := commitQCNext; n < inner.commitQCs.next; n++ { | ||
| b.commitQCs = append(b.commitQCs, inner.commitQCs.q[n]) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
afaiu pruneAnchor will still use the 2-file persister. Is this supposed to stay like this? If so, then we perhaps need to productionize the 2-file persister more - like adding a checksum in the file would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually did that on the first draft, but felt a WAL is not the best fit here. We only need the single most recent snapshot at all times; there is no history at all.
If you agree with that conclusion, I could of course productionize the 2-file persister more, but I feel that should be its own PR, because this PR is mostly about WAL replacing local files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#3105