diff --git a/.claude/skills/test-pr-devnet/scripts/test-branch.sh b/.claude/skills/test-pr-devnet/scripts/test-branch.sh index b151b22..4d94ac5 100755 --- a/.claude/skills/test-pr-devnet/scripts/test-branch.sh +++ b/.claude/skills/test-pr-devnet/scripts/test-branch.sh @@ -62,7 +62,9 @@ if ! docker info &>/dev/null; then exit 1 fi -if [[ ! -d "$ETHLAMBDA_ROOT/.git" ]]; then +# Use `git rev-parse` instead of `-d .git` to support git worktrees, +# where .git is a file (not a directory) pointing to the main repo. +if ! git -C "$ETHLAMBDA_ROOT" rev-parse --git-dir &>/dev/null; then echo -e "${RED}✗ Error: Not in a git repository${NC}" echo " Run this script from ethlambda repository root" exit 1 diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 998159c..8088ac5 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, SystemTime}; use ethlambda_state_transition::is_proposer; @@ -98,8 +98,8 @@ struct BlockChainServer { p2p_tx: mpsc::UnboundedSender, key_manager: key_manager::KeyManager, - // Pending blocks waiting for their parent - pending_blocks: HashMap>, + // Pending block roots waiting for their parent (block data stored in DB) + pending_blocks: HashMap>, // Maps pending block_root → its cached missing ancestor. Resolved by walking the // chain at lookup time, since a cached ancestor may itself have become pending with // a deeper missing parent after the entry was created. @@ -284,7 +284,27 @@ impl BlockChainServer { Ok(()) } + /// Process a newly received block. fn on_block(&mut self, signed_block: SignedBlockWithAttestation) { + let mut queue = VecDeque::new(); + queue.push_back(signed_block); + + // A new block can trigger a cascade of pending blocks becoming processable. + // Here we process blocks iteratively, to avoid recursive calls that could + // cause a stack overflow. + while let Some(block) = queue.pop_front() { + self.process_or_pend_block(block, &mut queue); + } + } + + /// Try to process a single block. If its parent state is missing, store it + /// as pending. On success, collect any unblocked children into `queue` for + /// the caller to process next (iteratively, avoiding deep recursion). + fn process_or_pend_block( + &mut self, + signed_block: SignedBlockWithAttestation, + queue: &mut VecDeque, + ) { let slot = signed_block.message.block.slot; let block_root = signed_block.message.block.tree_hash_root(); let parent_root = signed_block.message.block.parent_root; @@ -304,11 +324,40 @@ impl BlockChainServer { self.pending_block_parents.insert(block_root, missing_root); - // Store block for later processing + // Persist block data to DB (no LiveChain entry — invisible to fork choice) + self.store.insert_pending_block(block_root, signed_block); + + // Store only the H256 reference in memory self.pending_blocks .entry(parent_root) .or_default() - .push(signed_block); + .insert(block_root); + + // Walk up through DB: if missing_root is already stored from a previous + // session, the actual missing block is further up the chain. + // Note: this loop always terminates — blocks reference parents by hash, + // so a cycle would require a hash collision. + while let Some(header) = self.store.get_block_header(&missing_root) { + if self.store.has_state(&header.parent_root) { + // Parent state available — enqueue for processing, cascade + // handles the rest via the outer loop. + let block = self + .store + .get_signed_block(&missing_root) + .expect("header and parent state exist, so the full signed block must too"); + queue.push_back(block); + return; + } + // Block exists but parent doesn't have state — register as pending + // so the cascade works when the true ancestor arrives + self.pending_blocks + .entry(header.parent_root) + .or_default() + .insert(missing_root); + self.pending_block_parents + .insert(missing_root, header.parent_root); + missing_root = header.parent_root; + } // Request the actual missing block from network self.request_missing_block(missing_root); @@ -326,8 +375,8 @@ impl BlockChainServer { "Block imported successfully" ); - // Check if any pending blocks can now be processed - self.process_pending_children(block_root); + // Enqueue any pending blocks that were waiting for this parent + self.collect_pending_children(block_root, queue); } Err(err) => { warn!( @@ -353,23 +402,37 @@ impl BlockChainServer { ); } - fn process_pending_children(&mut self, parent_root: H256) { - // Remove and process all blocks that were waiting for this parent - if let Some(children) = self.pending_blocks.remove(&parent_root) { - info!(%parent_root, num_children=%children.len(), - "Processing pending blocks after parent arrival"); + /// Move pending children of `parent_root` into the work queue for iterative + /// processing. This replaces the old recursive `process_pending_children`. + fn collect_pending_children( + &mut self, + parent_root: H256, + queue: &mut VecDeque, + ) { + let Some(child_roots) = self.pending_blocks.remove(&parent_root) else { + return; + }; + + info!(%parent_root, num_children=%child_roots.len(), + "Processing pending blocks after parent arrival"); - for child_block in children { - let block_root = child_block.message.block.tree_hash_root(); - let slot = child_block.message.block.slot; - trace!(%parent_root, %slot, "Processing pending child block"); + for block_root in child_roots { + // Clean up lineage tracking + self.pending_block_parents.remove(&block_root); - // Clean up lineage tracking - self.pending_block_parents.remove(&block_root); + // Load block data from DB + let Some(child_block) = self.store.get_signed_block(&block_root) else { + warn!( + block_root = %ShortRoot(&block_root.0), + "Pending block missing from DB, skipping" + ); + continue; + }; - // Process recursively - might unblock more descendants - self.on_block(child_block); - } + let slot = child_block.message.block.slot; + trace!(%parent_root, %slot, "Processing pending child block"); + + queue.push_back(child_block); } } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b6ef9ca..d43a6a0 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, LazyLock}; /// allowing us to skip storing empty bodies and reconstruct them on read. static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().tree_hash_root()); -use crate::api::{StorageBackend, Table}; +use crate::api::{StorageBackend, StorageWriteBatch, Table}; use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ @@ -533,6 +533,21 @@ impl Store { // ============ Signed Blocks ============ + /// Insert a block as pending (parent state not yet available). + /// + /// Stores block data in `BlockHeaders`/`BlockBodies`/`BlockSignatures` + /// **without** writing to `LiveChain`. This persists the heavy signature + /// data (~3KB+ per block) to disk while keeping the block invisible to + /// fork choice. + /// + /// When the block is later processed via [`insert_signed_block`](Self::insert_signed_block), + /// the same keys are overwritten (idempotent) and a `LiveChain` entry is added. + pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { + let mut batch = self.backend.begin_write().expect("write batch"); + write_signed_block(batch.as_mut(), &root, signed_block); + batch.commit().expect("commit"); + } + /// Insert a signed block, storing the block and signatures separately. /// /// Blocks and signatures are stored in separate tables because the genesis @@ -541,41 +556,8 @@ impl Store { /// /// Takes ownership to avoid cloning large signature data. pub fn insert_signed_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { - // Destructure to extract all components without cloning - let SignedBlockWithAttestation { - message: - BlockWithAttestation { - block, - proposer_attestation, - }, - signature, - } = signed_block; - - let signatures = BlockSignaturesWithAttestation { - proposer_attestation, - signatures: signature, - }; - let mut batch = self.backend.begin_write().expect("write batch"); - - let header = block.header(); - let header_entries = vec![(root.as_ssz_bytes(), header.as_ssz_bytes())]; - batch - .put_batch(Table::BlockHeaders, header_entries) - .expect("put block header"); - - // Skip storing empty bodies - they can be reconstructed from the header's body_root - if header.body_root != *EMPTY_BODY_ROOT { - let body_entries = vec![(root.as_ssz_bytes(), block.body.as_ssz_bytes())]; - batch - .put_batch(Table::BlockBodies, body_entries) - .expect("put block body"); - } - - let sig_entries = vec![(root.as_ssz_bytes(), signatures.as_ssz_bytes())]; - batch - .put_batch(Table::BlockSignatures, sig_entries) - .expect("put block signatures"); + let block = write_signed_block(batch.as_mut(), &root, signed_block); let index_entries = vec![( encode_live_chain_key(block.slot, &root), @@ -892,3 +874,50 @@ impl Store { .expect("head state is always available") } } + +/// Write block header, body, and signatures onto an existing batch. +/// +/// Returns the deserialized [`Block`] so callers can access fields like +/// `slot` and `parent_root` without re-deserializing. +fn write_signed_block( + batch: &mut dyn StorageWriteBatch, + root: &H256, + signed_block: SignedBlockWithAttestation, +) -> Block { + let SignedBlockWithAttestation { + message: + BlockWithAttestation { + block, + proposer_attestation, + }, + signature, + } = signed_block; + + let signatures = BlockSignaturesWithAttestation { + proposer_attestation, + signatures: signature, + }; + + let header = block.header(); + let root_bytes = root.as_ssz_bytes(); + + let header_entries = vec![(root_bytes.clone(), header.as_ssz_bytes())]; + batch + .put_batch(Table::BlockHeaders, header_entries) + .expect("put block header"); + + // Skip storing empty bodies - they can be reconstructed from the header's body_root + if header.body_root != *EMPTY_BODY_ROOT { + let body_entries = vec![(root_bytes.clone(), block.body.as_ssz_bytes())]; + batch + .put_batch(Table::BlockBodies, body_entries) + .expect("put block body"); + } + + let sig_entries = vec![(root_bytes, signatures.as_ssz_bytes())]; + batch + .put_batch(Table::BlockSignatures, sig_entries) + .expect("put block signatures"); + + block +}