From 8d30fb0c1955fd5f6c53c005730a002e9a16e355 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 3 Feb 2026 16:41:07 -0300 Subject: [PATCH] feat: fetch unknown blocks referenced by attestations --- crates/blockchain/src/lib.rs | 379 ++++++++++++++++++++++++++++++++++- 1 file changed, 377 insertions(+), 2 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index a438a46..e5e5986 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::{Duration, SystemTime}; use ethlambda_state_transition::is_proposer; @@ -42,6 +42,44 @@ pub struct BlockChain { /// Seconds in a slot. Each slot has 4 intervals of 1 second each. pub const SECONDS_PER_SLOT: u64 = 4; +// Pending attestation limits (tighter to reduce DoS surface) +const MAX_PENDING_ATTESTATIONS_PER_BLOCK: usize = 32; // Per unknown block +const MAX_TOTAL_PENDING_ATTESTATIONS: usize = 512; // Global cap +const PENDING_ATTESTATION_TTL_SLOTS: u64 = 4; // 16 seconds + +/// Attestation pending on one or more unknown blocks. +struct PendingAttestation { + /// Full signed attestation (need signature for re-verification) + signed_attestation: SignedAttestation, + /// Block roots we're waiting for + waiting_for: HashSet, + /// Slot when attestation was received (for TTL tracking) + received_slot: u64, +} + +/// Storage for pending attestations. +struct PendingAttestations { + /// Index from block root to attestation indices waiting for that block + by_block: HashMap>, + /// All pending attestations (indices used by by_block) + attestations: Vec>, + /// Free list for slot reuse + free_slots: Vec, + /// Total active count + active_count: usize, +} + +impl PendingAttestations { + fn new() -> Self { + Self { + by_block: HashMap::new(), + attestations: Vec::new(), + free_slots: Vec::new(), + active_count: 0, + } + } +} + impl BlockChain { pub fn spawn( store: Store, @@ -55,6 +93,7 @@ impl BlockChain { p2p_tx, key_manager, pending_blocks: HashMap::new(), + pending_attestations: PendingAttestations::new(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -94,6 +133,8 @@ struct BlockChainServer { // Pending blocks waiting for their parent pending_blocks: HashMap>, + // Pending attestations waiting for missing blocks + pending_attestations: PendingAttestations, } impl BlockChainServer { @@ -130,6 +171,12 @@ impl BlockChainServer { // Update safe target slot metric (updated by store.on_tick at interval 2) metrics::update_safe_target_slot(self.store.safe_target_slot()); + + // Clean up expired pending attestations at interval 2 + // (avoids conflict with attestation promotion at intervals 0 and 3) + if interval == 2 { + self.cleanup_expired_pending_attestations(slot); + } } /// Returns the validator ID if any of our validators is the proposer for this slot. @@ -266,11 +313,22 @@ impl BlockChainServer { signed_block: SignedBlockWithAttestation, ) -> Result<(), StoreError> { let slot = signed_block.message.block.slot; + let old_finalized_slot = self.store.latest_finalized().slot; + store::on_block(&mut self.store, signed_block)?; + + let new_finalized_slot = self.store.latest_finalized().slot; + metrics::update_head_slot(slot); metrics::update_latest_justified_slot(self.store.latest_justified().slot); - metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); + metrics::update_latest_finalized_slot(new_finalized_slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); + + // Clean up pending attestations if finalization advanced + if new_finalized_slot > old_finalized_slot { + self.cleanup_finalized_pending_attestations(new_finalized_slot); + } + Ok(()) } @@ -308,6 +366,9 @@ impl BlockChainServer { // Check if any pending blocks can now be processed self.process_pending_children(block_root); + + // Process pending attestations waiting for this block + self.process_pending_attestations(block_root); } Err(err) => { warn!( @@ -348,10 +409,324 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { + // First, check which blocks are unknown + let data = &attestation.message; + let mut unknown_blocks = Vec::new(); + + // Source check happens first in validate_attestation + if !self.store.contains_block(&data.source.root) { + // Source unknown = reject (not recoverable) + warn!( + source_root = %ShortRoot(&data.source.root.0), + validator = attestation.validator_id, + "Attestation has unknown source block" + ); + return; + } + if !self.store.contains_block(&data.target.root) { + unknown_blocks.push(data.target.root); + } + if !self.store.contains_block(&data.head.root) { + unknown_blocks.push(data.head.root); + } + + if !unknown_blocks.is_empty() { + // Pre-validate before queuing + if !self.can_queue_pending_attestation(&attestation) { + trace!( + validator = attestation.validator_id, + "Rejecting pending attestation: failed pre-validation" + ); + return; + } + self.store_pending_attestation(attestation, unknown_blocks); + return; + } + + // All blocks known, process normally if let Err(err) = store::on_gossip_attestation(&mut self.store, attestation) { warn!(%err, "Failed to process gossiped attestation"); } } + + /// Get the current slot based on system time + fn current_slot(&self) -> u64 { + let genesis_time = self.store.config().genesis_time; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + now.saturating_sub(genesis_time) / SECONDS_PER_SLOT + } + + /// Pre-validate attestation before queuing (cheap checks to filter garbage) + fn can_queue_pending_attestation(&self, attestation: &SignedAttestation) -> bool { + let current_slot = self.current_slot(); + let data = &attestation.message; + + // 1. Slot sanity: not too old, not too far in future + if data.slot + PENDING_ATTESTATION_TTL_SLOTS < current_slot { + return false; // Too old + } + if data.slot > current_slot + 1 { + return false; // Future attestation + } + + // 2. Source must be known (required by spec, not recoverable) + if !self.store.contains_block(&data.source.root) { + return false; + } + + // 3. Target not already finalized (would be useless) + if data.target.slot <= self.store.latest_finalized().slot { + return false; + } + + // 4. Validator ID must be plausible + let head_state = self.store.head_state(); + if attestation.validator_id >= head_state.validators.len() as u64 { + return false; + } + + true + } + + /// Store a pending attestation waiting for one or more unknown blocks + fn store_pending_attestation( + &mut self, + attestation: SignedAttestation, + unknown_blocks: Vec, + ) { + let current_slot = self.current_slot(); + let validator_id = attestation.validator_id; + + // Enforce global limit + if self.pending_attestations.active_count >= MAX_TOTAL_PENDING_ATTESTATIONS { + self.evict_oldest_pending_attestation(); + } + + // Enforce per-block limits (check all unknown blocks) + for root in &unknown_blocks { + if let Some(indices) = self.pending_attestations.by_block.get(root) + && indices.len() >= MAX_PENDING_ATTESTATIONS_PER_BLOCK + { + trace!( + root = %ShortRoot(&root.0), + %validator_id, + "Per-block pending limit reached" + ); + return; + } + } + + // Create the pending attestation + let waiting_for: HashSet = unknown_blocks.iter().copied().collect(); + let pending = PendingAttestation { + signed_attestation: attestation, + waiting_for: waiting_for.clone(), + received_slot: current_slot, + }; + + // Allocate slot + let idx = if let Some(free_idx) = self.pending_attestations.free_slots.pop() { + self.pending_attestations.attestations[free_idx] = Some(pending); + free_idx + } else { + let idx = self.pending_attestations.attestations.len(); + self.pending_attestations.attestations.push(Some(pending)); + idx + }; + + // Add to indices for each unknown block + for root in &unknown_blocks { + self.pending_attestations + .by_block + .entry(*root) + .or_default() + .push(idx); + } + + self.pending_attestations.active_count += 1; + + info!( + %validator_id, + num_unknown = unknown_blocks.len(), + pending_count = self.pending_attestations.active_count, + "Stored attestation pending on missing blocks" + ); + + // Request ALL unknown blocks + for root in unknown_blocks { + self.request_missing_block(root); + } + } + + /// Evict the oldest pending attestation to make room for new ones + fn evict_oldest_pending_attestation(&mut self) { + // Find the oldest attestation by received_slot + let mut oldest_idx = None; + let mut oldest_slot = u64::MAX; + + for (idx, pending_opt) in self.pending_attestations.attestations.iter().enumerate() { + if let Some(pending) = pending_opt + && pending.received_slot < oldest_slot + { + oldest_slot = pending.received_slot; + oldest_idx = Some(idx); + } + } + + if let Some(idx) = oldest_idx + && let Some(pending) = self.pending_attestations.attestations[idx].take() + { + // Remove from all by_block indices + for root in &pending.waiting_for { + if let Some(indices) = self.pending_attestations.by_block.get_mut(root) { + indices.retain(|&i| i != idx); + } + } + // Clear the slot and add to free list + self.pending_attestations.free_slots.push(idx); + self.pending_attestations.active_count -= 1; + } + } + + /// Process pending attestations after a block arrives + fn process_pending_attestations(&mut self, arrived_block: H256) { + // Get indices waiting for this block + let Some(indices) = self.pending_attestations.by_block.remove(&arrived_block) else { + return; + }; + + let mut processed = 0; + let mut still_waiting = 0; + let mut failed = 0; + + for idx in indices { + let Some(pending) = self.pending_attestations.attestations[idx].as_mut() else { + continue; // Already processed + }; + + // Remove this block from waiting set + pending.waiting_for.remove(&arrived_block); + + // If still waiting for other blocks, keep pending + if !pending.waiting_for.is_empty() { + still_waiting += 1; + continue; + } + + // All blocks now available - process + let attestation = self.pending_attestations.attestations[idx] + .take() + .unwrap() + .signed_attestation; + + // Return slot to free list + self.pending_attestations.free_slots.push(idx); + self.pending_attestations.active_count -= 1; + + // Process through FULL on_gossip_attestation flow (includes signature verification) + match store::on_gossip_attestation(&mut self.store, attestation) { + Ok(()) => { + processed += 1; + } + Err(err) => { + failed += 1; + trace!( + arrived_block = %ShortRoot(&arrived_block.0), + %err, + "Pending attestation still invalid after block arrival" + ); + } + } + } + + if processed > 0 || failed > 0 { + info!( + arrived_block = %ShortRoot(&arrived_block.0), + processed, + still_waiting, + failed, + "Processed pending attestations after block arrival" + ); + } + } + + /// Clean up expired pending attestations (TTL-based) + fn cleanup_expired_pending_attestations(&mut self, current_slot: u64) { + let cutoff_slot = current_slot.saturating_sub(PENDING_ATTESTATION_TTL_SLOTS); + let mut removed_count = 0; + + for idx in 0..self.pending_attestations.attestations.len() { + if let Some(pending) = &self.pending_attestations.attestations[idx] + && pending.received_slot < cutoff_slot + { + // Remove from all by_block indices + for root in &pending.waiting_for { + if let Some(indices) = self.pending_attestations.by_block.get_mut(root) { + indices.retain(|&i| i != idx); + } + } + // Clear the slot + self.pending_attestations.attestations[idx] = None; + self.pending_attestations.free_slots.push(idx); + self.pending_attestations.active_count -= 1; + removed_count += 1; + } + } + + // Clean up empty by_block entries + self.pending_attestations + .by_block + .retain(|_, indices| !indices.is_empty()); + + if removed_count > 0 { + info!( + current_slot, + removed_count, + remaining = self.pending_attestations.active_count, + "Cleaned up expired pending attestations" + ); + } + } + + /// Clean up pending attestations whose target is now finalized + fn cleanup_finalized_pending_attestations(&mut self, finalized_slot: u64) { + let mut removed_count = 0; + + for idx in 0..self.pending_attestations.attestations.len() { + if let Some(pending) = &self.pending_attestations.attestations[idx] + && pending.signed_attestation.message.target.slot <= finalized_slot + { + // Remove from all by_block indices + for root in &pending.waiting_for { + if let Some(indices) = self.pending_attestations.by_block.get_mut(root) { + indices.retain(|&i| i != idx); + } + } + // Clear the slot + self.pending_attestations.attestations[idx] = None; + self.pending_attestations.free_slots.push(idx); + self.pending_attestations.active_count -= 1; + removed_count += 1; + } + } + + // Clean up empty by_block entries + self.pending_attestations + .by_block + .retain(|_, indices| !indices.is_empty()); + + if removed_count > 0 { + info!( + finalized_slot, + removed_count, + remaining = self.pending_attestations.active_count, + "Cleaned up finalized pending attestations" + ); + } + } } #[derive(Clone, Debug)]