From f3cd1aad10617f532ecf038e9cf55057b0f0f614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 10 Feb 2026 13:20:40 -0300 Subject: [PATCH 1/7] feat: implement devnet 3 changes --- Makefile | 2 +- bin/ethlambda/src/main.rs | 9 +- crates/blockchain/src/lib.rs | 66 ++- crates/blockchain/src/store.rs | 461 +++++++++++------- .../blockchain/tests/forkchoice_spectests.rs | 38 +- .../blockchain/tests/signature_spectests.rs | 2 +- crates/common/types/src/attestation.rs | 8 + crates/common/types/src/block.rs | 7 + crates/net/p2p/src/gossipsub/handler.rs | 70 ++- crates/net/p2p/src/gossipsub/messages.rs | 10 +- crates/net/p2p/src/gossipsub/mod.rs | 6 +- crates/net/p2p/src/lib.rs | 64 ++- crates/net/p2p/src/metrics.rs | 3 +- crates/storage/src/api/tables.rs | 21 +- crates/storage/src/backend/rocksdb.rs | 5 +- crates/storage/src/store.rs | 252 +++++++--- 16 files changed, 708 insertions(+), 316 deletions(-) diff --git a/Makefile b/Makefile index e28ce6d..0167ca3 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ docker-build: ## 🐳 Build the Docker image --build-arg GIT_BRANCH=$(GIT_BRANCH) \ -t ghcr.io/lambdaclass/ethlambda:$(DOCKER_TAG) . 
-LEAN_SPEC_COMMIT_HASH:=4edcf7bc9271e6a70ded8aff17710d68beac4266 +LEAN_SPEC_COMMIT_HASH:=b39472e73f8a7d603cc13d14426eed14c6eff6f1 leanSpec: git clone https://github.com/leanEthereum/leanSpec.git --single-branch diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index a906f71..4673185 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -46,6 +46,9 @@ struct CliOptions { /// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0") #[arg(long)] node_id: String, + /// Whether this node acts as a committee aggregator + #[arg(long, default_value = "false")] + is_aggregator: bool, } #[tokio::main] @@ -104,7 +107,10 @@ async fn main() { let store = Store::from_anchor_state(backend, genesis_state); let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); - let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys); + // Use first validator ID for subnet subscription + let first_validator_id = validator_keys.keys().min().copied(); + let blockchain = + BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator); let p2p_handle = tokio::spawn(start_p2p( node_p2p_key, @@ -113,6 +119,7 @@ async fn main() { blockchain, p2p_rx, store.clone(), + first_validator_id, )); ethlambda_rpc::start_rpc_server(metrics_socket, store) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 510de5a..e1bc888 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -5,7 +5,7 @@ use ethlambda_state_transition::is_proposer; use ethlambda_storage::Store; use ethlambda_types::{ ShortRoot, - attestation::{Attestation, AttestationData, SignedAttestation}, + attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::{H256, ssz::TreeHash}, signature::ValidatorSecretKey, @@ -30,6 +30,8 @@ pub enum P2PMessage { 
PublishAttestation(SignedAttestation), /// Publish a block to the gossip network. PublishBlock(SignedBlockWithAttestation), + /// Publish an aggregated attestation to the gossip network. + PublishAggregatedAttestation(SignedAggregatedAttestation), /// Fetch a block by its root hash. FetchBlock(H256), } @@ -38,14 +40,23 @@ pub struct BlockChain { handle: GenServerHandle, } -/// Seconds in a slot. Each slot has 4 intervals of 1 second each. +/// Seconds in a slot. pub const SECONDS_PER_SLOT: u64 = 4; +/// Milliseconds in a slot. +pub const MILLISECONDS_PER_SLOT: u64 = 4_000; +/// Milliseconds per interval (800ms ticks). +pub const MILLISECONDS_PER_INTERVAL: u64 = 800; +/// Number of intervals per slot (5 intervals of 800ms = 4 seconds). +pub const INTERVALS_PER_SLOT: u64 = 5; +/// Number of attestation committees per slot. +pub const ATTESTATION_COMMITTEE_COUNT: u64 = 1; impl BlockChain { pub fn spawn( store: Store, p2p_tx: mpsc::UnboundedSender, validator_keys: HashMap, + is_aggregator: bool, ) -> BlockChain { let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); @@ -54,6 +65,7 @@ impl BlockChain { p2p_tx, key_manager, pending_blocks: HashMap::new(), + is_aggregator, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -84,6 +96,20 @@ impl BlockChain { .await .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation")); } + + /// Sends an aggregated attestation to the BlockChain for processing. 
+ pub async fn notify_new_aggregated_attestation( + &mut self, + attestation: SignedAggregatedAttestation, + ) { + let _ = self + .handle + .cast(CastMessage::NewAggregatedAttestation(attestation)) + .await + .inspect_err( + |err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"), + ); + } } struct BlockChainServer { @@ -93,16 +119,19 @@ struct BlockChainServer { // Pending blocks waiting for their parent pending_blocks: HashMap>, + + /// Whether this node acts as a committee aggregator. + is_aggregator: bool, } impl BlockChainServer { fn on_tick(&mut self, timestamp: u64) { let genesis_time = self.store.config().genesis_time; - // Calculate current slot and interval + // Calculate current slot and interval from seconds let time_since_genesis = timestamp.saturating_sub(genesis_time); let slot = time_since_genesis / SECONDS_PER_SLOT; - let interval = time_since_genesis % SECONDS_PER_SLOT; + let interval = (time_since_genesis % SECONDS_PER_SLOT) * 1000 / MILLISECONDS_PER_INTERVAL; // Update current slot metric metrics::update_current_slot(slot); @@ -115,7 +144,12 @@ impl BlockChainServer { .flatten(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some()); + store::on_tick( + &mut self.store, + timestamp, + proposer_validator_id.is_some(), + self.is_aggregator, + ); // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { @@ -127,7 +161,7 @@ impl BlockChainServer { self.produce_attestations(slot); } - // Update safe target slot metric (updated by store.on_tick at interval 2) + // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); } @@ -349,15 +383,21 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { - let _ = 
store::on_gossip_attestation(&mut self.store, attestation) + let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } + + fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) { + let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) + .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); + } } #[derive(Clone, Debug)] enum CastMessage { NewBlock(SignedBlockWithAttestation), NewAttestation(SignedAttestation), + NewAggregatedAttestation(SignedAggregatedAttestation), Tick, } @@ -389,11 +429,12 @@ impl GenServer for BlockChainServer { .elapsed() .expect("already past the unix epoch"); self.on_tick(timestamp.as_secs()); - // Schedule the next tick at the start of the next second - let millis_to_next_sec = - ((timestamp.as_secs() as u128 + 1) * 1000 - timestamp.as_millis()) as u64; + // Schedule the next tick at the next 800ms interval boundary + let ms_since_epoch = timestamp.as_millis() as u64; + let ms_to_next_interval = + MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL); send_after( - Duration::from_millis(millis_to_next_sec), + Duration::from_millis(ms_to_next_interval), handle.clone(), message, ); @@ -402,6 +443,9 @@ impl GenServer for BlockChainServer { self.on_block(signed_block); } CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation), + CastMessage::NewAggregatedAttestation(attestation) => { + self.on_gossip_aggregated_attestation(attestation); + } } CastResponse::NoReply } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 04138a9..71fa594 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,34 +4,40 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, 
}; -use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store}; +use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; use ethlambda_types::{ ShortRoot, - attestation::{AggregatedAttestation, Attestation, AttestationData, SignedAttestation}, + attestation::{ + AggregatedAttestation, Attestation, AttestationData, SignedAggregatedAttestation, + SignedAttestation, + }, block::{ AggregatedAttestations, AggregatedSignatureProof, AggregationBits, Block, BlockBody, SignedBlockWithAttestation, }, primitives::{H256, ssz::TreeHash}, signature::ValidatorSignature, - state::{Checkpoint, State, Validator}, + state::{Checkpoint, State}, }; use tracing::{info, trace, warn}; -use crate::{SECONDS_PER_SLOT, metrics}; +use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, SECONDS_PER_SLOT, metrics}; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; -/// Accept new attestations, moving them from pending to known. +/// Accept new aggregated payloads, promoting them to known for fork choice. fn accept_new_attestations(store: &mut Store) { - store.promote_new_attestations(); + store.promote_new_aggregated_payloads(); update_head(store); } /// Update the head based on the fork choice rule. 
fn update_head(store: &mut Store) { let blocks = store.get_live_chain(); - let attestations: HashMap = store.iter_known_attestations().collect(); + let attestations = extract_attestations_from_aggregated_payloads( + store, + store.iter_known_aggregated_payloads(), + ); let old_head = store.head(); let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, @@ -76,7 +82,8 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); - let attestations: HashMap = store.iter_new_attestations().collect(); + let attestations = + extract_attestations_from_aggregated_payloads(store, store.iter_new_aggregated_payloads()); let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -86,15 +93,117 @@ fn update_safe_target(store: &mut Store) { store.set_safe_target(safe_target); } +/// Reconstruct per-validator attestation data from aggregated payloads. +/// +/// For each (validator_id, data_root) key in the payloads, looks up the +/// attestation data by root. Returns the latest attestation per validator +/// (by slot). +fn extract_attestations_from_aggregated_payloads( + store: &Store, + payloads: impl Iterator)>, +) -> HashMap { + let mut result: HashMap = HashMap::new(); + + for ((validator_id, data_root), _payload_list) in payloads { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + + let should_update = result + .get(&validator_id) + .is_none_or(|existing| existing.slot < data.slot); + + if should_update { + result.insert(validator_id, data); + } + } + + result +} + +/// Aggregate committee signatures at interval 2. +/// +/// Collects individual gossip signatures, aggregates them by attestation data, +/// and stores the resulting proofs in `LatestNewAggregatedPayloads`. 
+fn aggregate_committee_signatures(store: &mut Store) { + let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); + if gossip_sigs.is_empty() { + return; + } + + let head_state = store.head_state(); + let validators = &head_state.validators; + + // Group gossip signatures by data_root for batch aggregation + let mut groups: HashMap> = HashMap::new(); + let mut keys_to_delete: Vec = Vec::new(); + + for ((validator_id, data_root), stored_sig) in &gossip_sigs { + if let Ok(sig) = stored_sig.to_validator_signature() { + groups + .entry(*data_root) + .or_default() + .push((*validator_id, sig)); + keys_to_delete.push((*validator_id, *data_root)); + } + } + + for (data_root, validators_and_sigs) in groups { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + + let slot = data.slot; + let message = data.tree_hash_root(); + + let mut sigs = vec![]; + let mut pubkeys = vec![]; + let mut ids = vec![]; + + for (vid, sig) in &validators_and_sigs { + let Ok(pubkey) = validators[*vid as usize].get_pubkey() else { + continue; + }; + sigs.push(sig.clone()); + pubkeys.push(pubkey); + ids.push(*vid); + } + + if ids.is_empty() { + continue; + } + + let Ok(proof_data) = aggregate_signatures(pubkeys, sigs, &message, slot as u32) + .inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures")) + else { + continue; + }; + + let participants = aggregation_bits_from_validator_indices(&ids); + let proof = AggregatedSignatureProof::new(participants, proof_data); + let payload = StoredAggregatedPayload { slot, proof }; + + // Store in new aggregated payloads for each covered validator + for vid in &ids { + store.insert_new_aggregated_payload((*vid, data_root), payload.clone()); + } + + metrics::inc_pq_sig_aggregated_signatures(); + metrics::inc_pq_sig_attestations_in_aggregated_signatures(ids.len() as u64); + } + + // Delete aggregated entries from gossip_signatures + 
store.delete_gossip_signatures(&keys_to_delete); +} + /// Validate incoming attestation before processing. /// /// Ensures the vote respects the basic laws of time and topology: /// 1. The blocks voted for must exist in our store. /// 2. A vote cannot span backwards in time (source > target). /// 3. A vote cannot be for a future slot. -fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), StoreError> { +fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<(), StoreError> { let _timing = metrics::time_attestation_validation(); - let data = &attestation.data; // Availability Check - We cannot count a vote if we haven't seen the blocks involved. let source_header = store @@ -129,7 +238,7 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), // Time Check - Validate attestation is not too far in the future. // We allow a small margin for clock disparity (1 slot), but no further. - let current_slot = store.time() / SECONDS_PER_SLOT; + let current_slot = store.time() / INTERVALS_PER_SLOT; if data.slot > current_slot + 1 { return Err(StoreError::AttestationTooFarInFuture { attestation_slot: data.slot, @@ -141,20 +250,27 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), } /// Process a tick event. -pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { - let time = timestamp - store.config().genesis_time; +/// +/// `store.time()` represents interval-count-since-genesis: each increment is one +/// 800ms interval. 
Slot and interval-within-slot are derived as: +/// slot = store.time() / INTERVALS_PER_SLOT +/// interval = store.time() % INTERVALS_PER_SLOT +pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool, is_aggregator: bool) { + // Convert UNIX timestamp to interval count since genesis + let time_delta_ms = (timestamp - store.config().genesis_time) * 1000; + let time = time_delta_ms / MILLISECONDS_PER_INTERVAL; // If we're more than a slot behind, fast-forward to a slot before. // Operations are idempotent, so this should be fine. - if time.saturating_sub(store.time()) > SECONDS_PER_SLOT { - store.set_time(time - SECONDS_PER_SLOT); + if time.saturating_sub(store.time()) > INTERVALS_PER_SLOT { + store.set_time(time - INTERVALS_PER_SLOT); } while store.time() < time { store.set_time(store.time() + 1); - let slot = store.time() / SECONDS_PER_SLOT; - let interval = store.time() % SECONDS_PER_SLOT; + let slot = store.time() / INTERVALS_PER_SLOT; + let interval = store.time() % INTERVALS_PER_SLOT; trace!(%slot, %interval, "processing tick"); @@ -171,33 +287,53 @@ pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { } } 1 => { - // Second interval - no action + // Vote propagation — no action } 2 => { - // Mid-slot - update safe target for validators - update_safe_target(store); + // Aggregation interval + if is_aggregator { + aggregate_committee_signatures(store); + } } 3 => { + // Update safe target for validators + update_safe_target(store); + } + 4 => { // End of slot - accept accumulated attestations accept_new_attestations(store); } - _ => unreachable!("slots only have 4 intervals"), + _ => unreachable!("slots only have 5 intervals"), } } } /// Process a gossiped attestation. +/// +/// Verifies the signature, stores attestation data by root, and (if this node +/// is an aggregator) stores the gossip signature for later aggregation. 
pub fn on_gossip_attestation( store: &mut Store, signed_attestation: SignedAttestation, + is_aggregator: bool, ) -> Result<(), StoreError> { let validator_id = signed_attestation.validator_id; let attestation = Attestation { validator_id, data: signed_attestation.message, }; - validate_attestation(store, &attestation) + validate_attestation_data(store, &attestation.data) .inspect_err(|_| metrics::inc_attestations_invalid("gossip"))?; + + // Reject attestations from future slots + let current_slot = store.time() / INTERVALS_PER_SLOT; + if attestation.data.slot > current_slot { + return Err(StoreError::AttestationTooFarInFuture { + attestation_slot: attestation.data.slot, + current_slot, + }); + } + let target = attestation.data.target; let target_state = store .get_state(&target.root) @@ -211,7 +347,6 @@ pub fn on_gossip_attestation( let message = attestation.data.tree_hash_root(); if cfg!(not(feature = "skip-signature-verification")) { use ethlambda_types::signature::ValidatorSignature; - // Use attestation.data.slot as epoch (matching what Zeam and ethlambda use for signing) let epoch: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; @@ -219,14 +354,28 @@ pub fn on_gossip_attestation( return Err(StoreError::SignatureVerificationFailed); } } - on_attestation(store, attestation.clone(), false)?; - if cfg!(not(feature = "skip-signature-verification")) { - // Store signature for later lookup during block building + // Store attestation data by root (content-addressed, idempotent) + let data_root = attestation.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, attestation.data.clone()); + + if cfg!(feature = "skip-signature-verification") { + // Without signature verification, insert directly into new aggregated payloads + // with a dummy proof so the fork choice pipeline still sees attestations. 
+ let participants = aggregation_bits_from_validator_indices(&[validator_id]); + let payload = StoredAggregatedPayload { + slot: attestation.data.slot, + proof: AggregatedSignatureProof::empty(participants), + }; + store.insert_new_aggregated_payload((validator_id, data_root), payload); + } else if is_aggregator { + // With verification, store gossip signature for later aggregation at interval 2. + // With ATTESTATION_COMMITTEE_COUNT=1, all validators are in the same subnet. let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; store.insert_gossip_signature(&attestation.data, validator_id, signature); } + metrics::inc_attestations_valid("gossip"); let slot = attestation.data.slot; @@ -245,67 +394,52 @@ pub fn on_gossip_attestation( Ok(()) } -/// Process a new attestation and place it into the correct attestation stage. -/// -/// Attestations can come from: -/// - a block body (on-chain, `is_from_block=true`), or -/// - the gossip network (off-chain, `is_from_block=false`). +/// Process a gossiped aggregated attestation from the aggregation subnet. /// -/// The Attestation Pipeline: -/// - Stage 1 (latest_new_attestations): Pending attestations not yet counted in fork choice. -/// - Stage 2 (latest_known_attestations): Active attestations used by LMD-GHOST. -fn on_attestation( +/// Aggregated attestations arrive from committee aggregators and contain a proof +/// covering multiple validators. We store one aggregated payload entry per +/// participating validator so the fork choice extraction works uniformly. +pub fn on_gossip_aggregated_attestation( store: &mut Store, - attestation: Attestation, - is_from_block: bool, + aggregated: SignedAggregatedAttestation, ) -> Result<(), StoreError> { - // First, ensure the attestation is structurally and temporally valid. 
- validate_attestation(store, &attestation)?; + validate_attestation_data(store, &aggregated.data) + .inspect_err(|_| metrics::inc_attestations_invalid("aggregated"))?; - let validator_id = attestation.validator_id; - let attestation_data = attestation.data; - let attestation_slot = attestation_data.slot; - - if is_from_block { - // On-chain attestation processing - // These are historical attestations from other validators included by the proposer. - // They are processed immediately as "known" attestations. - - let should_update = store - .get_known_attestation(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); + // Reject attestations from future slots + let current_slot = store.time() / INTERVALS_PER_SLOT; + if aggregated.data.slot > current_slot { + return Err(StoreError::AttestationTooFarInFuture { + attestation_slot: aggregated.data.slot, + current_slot, + }); + } - if should_update { - store.insert_known_attestation(validator_id, attestation_data.clone()); - } + // Store attestation data by root (content-addressed, idempotent) + let data_root = aggregated.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, aggregated.data.clone()); - // Remove pending attestation if superseded by on-chain attestation - if let Some(existing_new) = store.get_new_attestation(&validator_id) - && existing_new.slot <= attestation_slot - { - store.remove_new_attestation(&validator_id); - } - } else { - // Network gossip attestation processing - // These enter the "new" stage and must wait for interval tick acceptance. 
- - // Reject attestations from future slots - let current_slot = store.time() / SECONDS_PER_SLOT; - if attestation_slot > current_slot { - return Err(StoreError::AttestationTooFarInFuture { - attestation_slot, - current_slot, - }); - } + // Store one aggregated payload per participating validator + for validator_id in aggregated.proof.participant_indices() { + let payload = StoredAggregatedPayload { + slot: aggregated.data.slot, + proof: aggregated.proof.clone(), + }; + store.insert_new_aggregated_payload((validator_id, data_root), payload); + } - let should_update = store - .get_new_attestation(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); + let slot = aggregated.data.slot; + let num_participants = aggregated.proof.participants.num_set_bits(); + info!( + slot, + num_participants, + target_slot = aggregated.data.target.slot, + target_root = %ShortRoot(&aggregated.data.target.root.0), + source_slot = aggregated.data.source.slot, + "Aggregated attestation processed" + ); - if should_update { - store.insert_new_attestation(validator_id, attestation_data); - } - } + metrics::inc_attestations_valid("aggregated"); Ok(()) } @@ -380,30 +514,28 @@ pub fn on_block( let attestation_signatures = &signed_block.signature.attestation_signatures; // Process block body attestations. - // TODO: fail the block if an attestation is invalid. Right now we - // just log a warning. + // Store attestation data by root and proofs in known aggregated payloads. 
for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { + let data_root = att.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, att.data.clone()); + let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); + let payload = StoredAggregatedPayload { + slot: att.data.slot, + proof: proof.clone(), + }; - for validator_id in validator_ids { - // Update Proof Map - Store the proof so future block builders can reuse this aggregation - store.insert_aggregated_payload(&att.data, validator_id, proof.clone()); + for validator_id in &validator_ids { + // Store proof in known aggregated payloads (active in fork choice) + store.insert_known_aggregated_payload((*validator_id, data_root), payload.clone()); - // Update Fork Choice - Register the vote immediately (historical/on-chain) - let attestation = Attestation { - validator_id, - data: att.data.clone(), - }; - // TODO: validate attestations before processing - let _ = on_attestation(store, attestation, true) - .inspect(|_| metrics::inc_attestations_valid("block")) - .inspect_err(|err| { - warn!(%slot, %validator_id, %err, "Invalid attestation in block"); - metrics::inc_attestations_invalid("block"); - }); + // Also store in existing AggregatedPayloads table for block building + store.insert_aggregated_payload(&att.data, *validator_id, proof.clone()); + + metrics::inc_attestations_valid("block"); } } @@ -412,31 +544,28 @@ pub fn on_block( // to prevent the proposer from gaining circular weight advantage. update_head(store); - // Process proposer attestation as if received via gossip + // Process proposer attestation as pending (enters "new" stage via gossip path) // The proposer's attestation should NOT affect this block's fork choice position. - // It is treated as pending until interval 3 (end of slot). 
- - if cfg!(not(feature = "skip-signature-verification")) { + let proposer_vid = proposer_attestation.validator_id; + let proposer_data_root = proposer_attestation.data.tree_hash_root(); + store.insert_attestation_data_by_root(proposer_data_root, proposer_attestation.data.clone()); + + if cfg!(feature = "skip-signature-verification") { + // Without sig verification, insert directly with a dummy proof + let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); + let payload = StoredAggregatedPayload { + slot: proposer_attestation.data.slot, + proof: AggregatedSignatureProof::empty(participants), + }; + store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); + } else { // Store the proposer's signature for potential future block building let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - &proposer_attestation.data, - proposer_attestation.validator_id, - proposer_sig, - ); + store.insert_gossip_signature(&proposer_attestation.data, proposer_vid, proposer_sig); } - // Process proposer attestation (enters "new" stage, not "known") - // TODO: validate attestations before processing - let _ = on_attestation(store, proposer_attestation, false) - .inspect(|_| metrics::inc_attestations_valid("gossip")) - .inspect_err(|err| { - warn!(%slot, %err, "Invalid proposer attestation in block"); - metrics::inc_attestations_invalid("block"); - }); - info!(%slot, %block_root, %state_root, "Processed new block"); Ok(()) } @@ -539,7 +668,7 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { let slot_time = store.config().genesis_time + slot * SECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time, true); + on_tick(store, slot_time, true, false); // Process any pending attestations before proposal accept_new_attestations(store); @@ -575,20 +704,20 @@ pub fn 
produce_block_with_signatures( }); } - // Convert AttestationData to Attestation objects for build_block - let available_attestations: Vec = store - .iter_known_attestations() + // Convert known aggregated payloads to Attestation objects for build_block + let known_attestations = extract_attestations_from_aggregated_payloads( + store, + store.iter_known_aggregated_payloads(), + ); + let available_attestations: Vec = known_attestations + .into_iter() .map(|(validator_id, data)| Attestation { validator_id, data }) .collect(); // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect signature data for block building - let gossip_signatures: HashMap = store - .iter_gossip_signatures() - .filter_map(|(key, stored)| stored.to_validator_signature().ok().map(|sig| (key, sig))) - .collect(); + // Collect existing proofs for block building (from previously received blocks) let aggregated_payloads: HashMap> = store .iter_aggregated_payloads() .map(|(key, stored_payloads)| { @@ -605,7 +734,6 @@ pub fn produce_block_with_signatures( head_root, &available_attestations, &known_block_roots, - &gossip_signatures, &aggregated_payloads, )?; @@ -768,7 +896,6 @@ fn aggregate_attestations_by_data(attestations: &[Attestation]) -> Vec, - gossip_signatures: &HashMap, aggregated_payloads: &HashMap>, ) -> Result<(Block, State, Vec), StoreError> { // Start with empty attestation set @@ -786,7 +912,7 @@ fn build_block( let mut included_keys: HashSet = HashSet::new(); // Fixed-point loop: collect attestations until no new ones can be added - let post_state = loop { + let _post_state = loop { // Aggregate attestations by data for the candidate block let aggregated = aggregate_attestations_by_data(&included_attestations); let attestations: AggregatedAttestations = aggregated @@ -835,11 +961,8 @@ fn build_block( continue; } - // Only include if we have a signature for this attestation - let has_gossip_sig = 
gossip_signatures.contains_key(&sig_key); - let has_block_proof = aggregated_payloads.contains_key(&sig_key); - - if has_gossip_sig || has_block_proof { + // Only include if we have a proof for this attestation + if aggregated_payloads.contains_key(&sig_key) { new_attestations.push(attestation.clone()); included_keys.insert(sig_key); } @@ -854,13 +977,9 @@ fn build_block( included_attestations.extend(new_attestations); }; - // Compute the aggregated signatures for the attestations. - let (aggregated_attestations, aggregated_signatures) = compute_aggregated_signatures( - post_state.validators.iter().as_slice(), - &included_attestations, - gossip_signatures, - aggregated_payloads, - )?; + // Select existing proofs for the attestations to include in the block. + let (aggregated_attestations, aggregated_signatures) = + select_aggregated_proofs(&included_attestations, aggregated_payloads)?; let attestations: AggregatedAttestations = aggregated_attestations .try_into() @@ -884,15 +1003,15 @@ fn build_block( Ok((final_block, post_state, aggregated_signatures)) } -/// Compute aggregated signatures for a set of attestations. +/// Select existing aggregated proofs for attestations to include in a block. +/// +/// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). +/// This function only selects from existing proofs in the `AggregatedPayloads` table +/// (proofs from previously received blocks). /// -/// The result is a list of (attestation, proof) pairs ready for block inclusion. -/// This list might contain attestations with the same data but different signatures. -/// Once we support recursive aggregation, we can aggregate these further. -fn compute_aggregated_signatures( - validators: &[Validator], +/// Returns a list of (attestation, proof) pairs ready for block inclusion. 
+fn select_aggregated_proofs( attestations: &[Attestation], - gossip_signatures: &HashMap, aggregated_payloads: &HashMap>, ) -> Result<(Vec, Vec), StoreError> { let mut results = vec![]; @@ -902,51 +1021,14 @@ fn compute_aggregated_signatures( let message = data.tree_hash_root(); let validator_ids = aggregation_bits_to_validator_indices(&aggregated.aggregation_bits); + let mut remaining: HashSet = validator_ids.into_iter().collect(); - // Phase 1: Gossip Collection - // Try to aggregate fresh signatures from the network. - let mut gossip_sigs = vec![]; - let mut gossip_keys = vec![]; - let mut gossip_ids = vec![]; - - let mut remaining = HashSet::new(); - - for vid in &validator_ids { - let key = (*vid, message); - if let Some(sig) = gossip_signatures.get(&key) { - let pubkey = validators[*vid as usize] - .get_pubkey() - .expect("valid pubkey"); - - gossip_sigs.push(sig.clone()); - gossip_keys.push(pubkey); - gossip_ids.push(*vid); - } else { - remaining.insert(*vid); - } - } - - if !gossip_ids.is_empty() { - let participants = aggregation_bits_from_validator_indices(&gossip_ids); - let proof_data = - aggregate_signatures(gossip_keys, gossip_sigs, &message, data.slot as u32) - .map_err(StoreError::SignatureAggregationFailed)?; - let aggregated_attestation = AggregatedAttestation { - aggregation_bits: participants.clone(), - data: data.clone(), + // Select existing proofs that cover the most remaining validators + while !remaining.is_empty() { + let Some(&target_id) = remaining.iter().next() else { + break; }; - let aggregate_proof = AggregatedSignatureProof::new(participants, proof_data); - results.push((aggregated_attestation, aggregate_proof)); - - metrics::inc_pq_sig_aggregated_signatures(); - metrics::inc_pq_sig_attestations_in_aggregated_signatures(gossip_ids.len() as u64); - } - // Phase 2: Fallback to existing proofs - // We might have seen proofs for missing signatures in previously-received blocks. 
- while !aggregated_payloads.is_empty() - && let Some(&target_id) = remaining.iter().next() - { let Some(candidates) = aggregated_payloads .get(&(target_id, message)) .filter(|v| !v.is_empty()) @@ -966,10 +1048,11 @@ fn compute_aggregated_signatures( .max_by_key(|(_, covered)| covered.len()) .expect("candidates is not empty"); - // Sanity check: ensure proof covers at least one remaining validator + // No proof covers any remaining validator if covered.is_empty() { break; } + let aggregate = AggregatedAttestation { aggregation_bits: proof.participants.clone(), data: data.clone(), diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e1c69e1..ac4c5f2 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -5,14 +5,35 @@ use std::{ }; use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; -use ethlambda_storage::{Store, backend::InMemoryBackend}; +use ethlambda_storage::{SignatureKey, Store, StoredAggregatedPayload, backend::InMemoryBackend}; use ethlambda_types::{ - attestation::Attestation, + attestation::{Attestation, AttestationData}, block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::{H256, VariableList, ssz::TreeHash}, state::State, }; +/// Extract per-validator attestation data from aggregated payloads. +/// Test helper that mirrors the private function in blockchain::store. 
+fn extract_attestations( + store: &Store, + payloads: impl Iterator)>, +) -> HashMap { + let mut result: HashMap = HashMap::new(); + for ((validator_id, data_root), _) in payloads { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + let should_update = result + .get(&validator_id) + .is_none_or(|existing| existing.slot < data.slot); + if should_update { + result.insert(validator_id, data); + } + } + result +} + use crate::types::{ForkChoiceTestVector, StoreChecks}; const SUPPORTED_FIXTURE_FORMAT: &str = "fork_choice_test"; @@ -61,7 +82,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time, true); + store::on_tick(&mut store, block_time, true, false); let result = store::on_block(&mut store, signed_block); match (result.is_ok(), step.valid) { @@ -86,7 +107,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { "tick" => { let timestamp = step.time.expect("tick step missing time"); // NOTE: the has_proposal argument is set to false, following the spec - store::on_tick(&mut store, timestamp, false); + store::on_tick(&mut store, timestamp, false, false); } other => { // Fail for unsupported step types for now @@ -279,14 +300,12 @@ fn validate_attestation_check( check: &types::AttestationCheck, step_idx: usize, ) -> datatest_stable::Result<()> { - use ethlambda_types::attestation::AttestationData; - let validator_id = check.validator; let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => st.iter_new_attestations().collect(), - "known" => st.iter_known_attestations().collect(), + "new" => extract_attestations(st, st.iter_new_aggregated_payloads()), + "known" => extract_attestations(st, st.iter_known_aggregated_payloads()), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, 
other).into(), @@ -366,7 +385,8 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap = st.iter_known_attestations().collect(); + let known_attestations: HashMap = + extract_attestations(st, st.iter_known_aggregated_payloads()); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 40ec7aa..2d5a75b 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -50,7 +50,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time = signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; - store::on_tick(&mut st, block_time, true); + store::on_tick(&mut st, block_time, true, false); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 309901a..6390a2b 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,4 +1,5 @@ use crate::{ + block::AggregatedSignatureProof, primitives::ssz::{Decode, Encode, TreeHash}, signature::SignatureSize, state::{Checkpoint, ValidatorRegistryLimit}, @@ -61,3 +62,10 @@ pub struct AggregatedAttestation { /// A general-purpose bitfield for tracking which validators have participated /// in some collective action (attestation, signature aggregation, etc.). pub type AggregationBits = ssz_types::BitList; + +/// Aggregated attestation with its signature proof, used for gossip on the aggregation topic. 
+#[derive(Debug, Clone, Encode, Decode)] +pub struct SignedAggregatedAttestation { + pub data: AttestationData, + pub proof: AggregatedSignatureProof, +} diff --git a/crates/common/types/src/block.rs b/crates/common/types/src/block.rs index 776dbb1..facdc45 100644 --- a/crates/common/types/src/block.rs +++ b/crates/common/types/src/block.rs @@ -112,6 +112,13 @@ impl AggregatedSignatureProof { pub fn proof_data(&self) -> &ByteListMiB { &self.proof_data } + + /// Returns the validator indices that are set in the participants bitfield. + pub fn participant_indices(&self) -> impl Iterator + '_ { + (0..self.participants.len()) + .filter(|&i| self.participants.get(i).unwrap_or(false)) + .map(|i| i as u64) + } } /// Bitlist representing validator participation in an attestation or signature. diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 9335270..a52b65f 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -1,6 +1,6 @@ use ethlambda_types::{ ShortRoot, - attestation::SignedAttestation, + attestation::{SignedAggregatedAttestation, SignedAttestation}, block::SignedBlockWithAttestation, primitives::ssz::{Decode, Encode, TreeHash}, }; @@ -9,7 +9,7 @@ use tracing::{error, info, trace}; use super::{ encoding::{compress_message, decompress_message}, - messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}, + messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}, }; use crate::P2PServer; @@ -22,7 +22,8 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { else { unreachable!("we already matched on event_loop"); }; - match message.topic.as_str().split("/").nth(3) { + let topic_kind = message.topic.as_str().split("/").nth(3); + match topic_kind { Some(BLOCK_TOPIC_KIND) => { let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) @@ -50,7 +51,33 @@ 
pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { ); server.blockchain.notify_new_block(signed_block).await; } - Some(ATTESTATION_TOPIC_KIND) => { + Some(AGGREGATION_TOPIC_KIND) => { + let Ok(uncompressed_data) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) + else { + return; + }; + + let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) + .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) + else { + return; + }; + let slot = aggregation.data.slot; + info!( + %slot, + target_slot = aggregation.data.target.slot, + target_root = %ShortRoot(&aggregation.data.target.root.0), + source_slot = aggregation.data.source.slot, + source_root = %ShortRoot(&aggregation.data.source.root.0), + "Received aggregated attestation from gossip" + ); + server + .blockchain + .notify_new_aggregated_attestation(aggregation) + .await; + } + Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { @@ -95,7 +122,7 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); - // Publish to gossipsub + // Publish to the attestation subnet topic let _ = server .swarm .behaviour_mut() @@ -148,3 +175,36 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWith |err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"), ); } + +pub async fn publish_aggregated_attestation( + server: &mut P2PServer, + attestation: SignedAggregatedAttestation, +) { + let slot = attestation.data.slot; + + // Encode to SSZ + let ssz_bytes = attestation.as_ssz_bytes(); + + // Compress with raw snappy + let compressed = compress_message(&ssz_bytes); + + // Publish 
to the aggregation topic + let _ = server + .swarm + .behaviour_mut() + .gossipsub + .publish(server.aggregation_topic.clone(), compressed) + .inspect(|_| { + info!( + %slot, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published aggregated attestation to gossipsub" + ) + }) + .inspect_err(|err| { + tracing::warn!(%slot, %err, "Failed to publish aggregated attestation to gossipsub") + }); +} diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 7df4db1..2b31eb6 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -1,4 +1,10 @@ /// Topic kind for block gossip pub const BLOCK_TOPIC_KIND: &str = "block"; -/// Topic kind for attestation gossip -pub const ATTESTATION_TOPIC_KIND: &str = "attestation"; +/// Topic kind prefix for per-committee attestation subnets. +/// +/// Full topic format: `/leanconsensus/{network}/attestation_{subnet_id}/ssz_snappy` +pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation"; +/// Topic kind for aggregated attestation gossip. 
+/// +/// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy` +pub const AGGREGATION_TOPIC_KIND: &str = "aggregation"; diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index a66855e..a692428 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -2,5 +2,7 @@ mod encoding; mod handler; mod messages; -pub use handler::{handle_gossipsub_message, publish_attestation, publish_block}; -pub use messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}; +pub use handler::{ + handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, +}; +pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 9951e6a..e4fdd8c 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use ethlambda_blockchain::{BlockChain, P2PMessage}; +use ethlambda_blockchain::{ATTESTATION_COMMITTEE_COUNT, BlockChain, P2PMessage}; use ethlambda_storage::Store; use ethlambda_types::primitives::H256; use ethrex_common::H264; @@ -24,7 +24,10 @@ use tokio::sync::mpsc; use tracing::{info, trace, warn}; use crate::{ - gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND, publish_attestation, publish_block}, + gossipsub::{ + AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + publish_aggregated_attestation, publish_attestation, publish_block, + }, req_resp::{ BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, @@ -60,6 +63,7 @@ pub async fn start_p2p( blockchain: BlockChain, p2p_rx: mpsc::UnboundedReceiver, store: Store, + validator_id: Option, ) { let config = libp2p::gossipsub::ConfigBuilder::default() // d @@ -151,21 +155,44 @@ pub async fn start_p2p( .listen_on(addr) .expect("failed to bind gossipsub listening 
address"); - let network = "devnet0"; - let topic_kinds = [BLOCK_TOPIC_KIND, ATTESTATION_TOPIC_KIND]; - for topic_kind in topic_kinds { - let topic_str = format!("/leanconsensus/{network}/{topic_kind}/ssz_snappy"); - let topic = libp2p::gossipsub::IdentTopic::new(topic_str); - swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); - } + let network = "devnet3"; - // Create topics for outbound messages - let attestation_topic = libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{ATTESTATION_TOPIC_KIND}/ssz_snappy" - )); - let block_topic = libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy" - )); + // Subscribe to block topic (all nodes) + let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"); + let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&block_topic) + .unwrap(); + + // Subscribe to aggregation topic (all validators) + let aggregation_topic_str = + format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); + let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&aggregation_topic) + .unwrap(); + + // Subscribe to attestation subnet topic (validators subscribe to their committee's subnet) + // ATTESTATION_COMMITTEE_COUNT is 1 for devnet-3 but will increase in future devnets. 
+ #[allow(clippy::modulo_one)] + let subnet_id = validator_id.map(|vid| vid % ATTESTATION_COMMITTEE_COUNT); + let attestation_topic_kind = match subnet_id { + Some(id) => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{id}"), + // Non-validators subscribe to subnet 0 to receive attestations + None => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_0"), + }; + let attestation_topic_str = + format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); + let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&attestation_topic) + .unwrap(); info!(socket=%listening_socket, "P2P node started"); @@ -178,6 +205,7 @@ pub async fn start_p2p( p2p_rx, attestation_topic, block_topic, + aggregation_topic, connected_peers: HashSet::new(), pending_requests: HashMap::new(), request_id_map: HashMap::new(), @@ -203,6 +231,7 @@ pub(crate) struct P2PServer { pub(crate) p2p_rx: mpsc::UnboundedReceiver, pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, @@ -371,6 +400,9 @@ async fn handle_p2p_message(server: &mut P2PServer, message: P2PMessage) { P2PMessage::PublishBlock(signed_block) => { publish_block(server, signed_block).await; } + P2PMessage::PublishAggregatedAttestation(attestation) => { + publish_aggregated_attestation(server, attestation).await; + } P2PMessage::FetchBlock(root) => { // Deduplicate - if already pending, ignore if server.pending_requests.contains_key(&root) { diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 9f24f99..00224d7 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -17,7 +17,7 @@ static NODE_NAME_REGISTRY: LazyLock>> = pub fn populate_name_registry(names_and_privkeys: HashMap) 
{ let mut registry = NODE_NAME_REGISTRY.write().unwrap(); - let name_registry = names_and_privkeys + *registry = names_and_privkeys .into_iter() .filter_map(|(name, mut privkey)| { let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey) else { @@ -31,7 +31,6 @@ pub fn populate_name_registry(names_and_privkeys: HashMap) { Some((peer_id, &*name.leak())) }) .collect(); - *registry = name_registry; } fn resolve(peer_id: &Option) -> &'static str { diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index deecdad..1845470 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -12,14 +12,18 @@ pub enum Table { BlockSignatures, /// State storage: H256 -> State States, - /// Known attestations: u64 -> AttestationData - LatestKnownAttestations, - /// Pending attestations: u64 -> AttestationData - LatestNewAttestations, /// Gossip signatures: SignatureKey -> ValidatorSignature GossipSignatures, - /// Aggregated proofs: SignatureKey -> Vec + /// Aggregated proofs from received blocks: SignatureKey -> Vec AggregatedPayloads, + /// Attestation data indexed by tree hash root: H256 -> AttestationData + AttestationDataByRoot, + /// Pending aggregated payloads (not yet active in fork choice): + /// SignatureKey -> Vec + LatestNewAggregatedPayloads, + /// Active aggregated payloads (counted in fork choice): + /// SignatureKey -> Vec + LatestKnownAggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -31,15 +35,16 @@ pub enum Table { } /// All table variants. 
-pub const ALL_TABLES: [Table; 10] = [ +pub const ALL_TABLES: [Table; 11] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, - Table::LatestKnownAttestations, - Table::LatestNewAttestations, Table::GossipSignatures, Table::AggregatedPayloads, + Table::AttestationDataByRoot, + Table::LatestNewAggregatedPayloads, + Table::LatestKnownAggregatedPayloads, Table::Metadata, Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index c25b128..a1c2ebc 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -16,10 +16,11 @@ fn cf_name(table: Table) -> &'static str { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::LatestKnownAttestations => "latest_known_attestations", - Table::LatestNewAttestations => "latest_new_attestations", Table::GossipSignatures => "gossip_signatures", Table::AggregatedPayloads => "aggregated_payloads", + Table::AttestationDataByRoot => "attestation_data_by_root", + Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", + Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b6ef9ca..0441cd4 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -288,7 +288,11 @@ impl Store { // ============ Time ============ - /// Returns the current store time in seconds since genesis. + /// Returns the current store time in interval counts since genesis. + /// + /// Each increment represents one 800ms interval. 
Derive slot/interval as: + /// slot = time() / INTERVALS_PER_SLOT + /// interval = time() % INTERVALS_PER_SLOT pub fn time(&self) -> u64 { self.get_metadata(KEY_TIME) } @@ -369,13 +373,19 @@ impl Store { { self.prune_live_chain(finalized.slot); - // Prune signatures and payloads for finalized slots + // Prune signatures, payloads, and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); - if pruned_sigs > 0 || pruned_payloads > 0 { + let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); + self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); + self.prune_aggregated_payload_table( + Table::LatestKnownAggregatedPayloads, + finalized.slot, + ); + if pruned_sigs > 0 || pruned_payloads > 0 || pruned_att_data > 0 { info!( finalized_slot = finalized.slot, - pruned_sigs, pruned_payloads, "Pruned finalized signatures" + pruned_sigs, pruned_payloads, pruned_att_data, "Pruned finalized signatures" ); } } @@ -523,6 +533,73 @@ impl Store { removed_count } + /// Prune attestation data by root for slots <= finalized_slot. + /// + /// Returns the number of entries pruned. 
+ pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::AttestationDataByRoot, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Ok(data) = AttestationData::from_ssz_bytes(&value_bytes) + && data.slot <= finalized_slot + { + to_delete.push(key_bytes.to_vec()); + } + } + drop(view); + + let count = to_delete.len(); + if !to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::AttestationDataByRoot, to_delete) + .expect("delete"); + batch.commit().expect("commit"); + } + count + } + + /// Prune an aggregated payload table (new or known) for slots <= finalized_slot. + fn prune_aggregated_payload_table(&mut self, table: Table, finalized_slot: u64) { + let view = self.backend.begin_read().expect("read view"); + let mut updates = vec![]; + let mut deletes = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(table, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { + let original_len = payloads.len(); + payloads.retain(|p| p.slot > finalized_slot); + + if payloads.is_empty() { + deletes.push(key_bytes.to_vec()); + } else if payloads.len() < original_len { + updates.push((key_bytes.to_vec(), payloads.as_ssz_bytes())); + } + } + } + drop(view); + + if !updates.is_empty() || !deletes.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + if !updates.is_empty() { + batch.put_batch(table, updates).expect("put"); + } + if !deletes.is_empty() { + batch.delete_batch(table, deletes).expect("delete"); + } + batch.commit().expect("commit"); + } + } + /// Get the block header by root. 
pub fn get_block_header(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); @@ -642,124 +719,165 @@ impl Store { batch.commit().expect("commit"); } - // ============ Known Attestations ============ + // ============ Attestation Data By Root ============ + // + // Content-addressed attestation data storage. Used to reconstruct + // per-validator attestation maps from aggregated payloads. + + /// Stores attestation data indexed by its tree hash root. + pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { + let mut batch = self.backend.begin_write().expect("write batch"); + let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; + batch + .put_batch(Table::AttestationDataByRoot, entries) + .expect("put attestation data"); + batch.commit().expect("commit"); + } + + /// Returns attestation data for the given root hash. + pub fn get_attestation_data_by_root(&self, root: &H256) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::AttestationDataByRoot, &root.as_ssz_bytes()) + .expect("get") + .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + } + + // ============ Known Aggregated Payloads ============ // - // "Known" attestations are included in fork choice weight calculations. - // They're promoted from "new" attestations at specific intervals. + // "Known" aggregated payloads are active in fork choice weight calculations. + // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Iterates over all known attestations (validator_id, attestation_data). - pub fn iter_known_attestations(&self) -> impl Iterator + '_ { + /// Iterates over all known aggregated payloads. 
+ pub fn iter_known_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view - .prefix_iterator(Table::LatestKnownAttestations, &[]) + .prefix_iterator(Table::LatestKnownAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); - let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); - (validator_id, data) + let key = decode_signature_key(&k); + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) }) .collect(); entries.into_iter() } - /// Returns a validator's latest known attestation. - pub fn get_known_attestation(&self, validator_id: &u64) -> Option { + /// Insert an aggregated payload into the known (fork-choice-active) table. + pub fn insert_known_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + let encoded_key = encode_signature_key(&key); let view = self.backend.begin_read().expect("read view"); - view.get(Table::LatestKnownAttestations, &validator_id.as_ssz_bytes()) + let mut payloads: Vec = view + .get(Table::LatestKnownAggregatedPayloads, &encoded_key) .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) - } + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + drop(view); + + payloads.push(payload); - /// Stores a validator's latest known attestation. 
- pub fn insert_known_attestation(&mut self, validator_id: u64, data: AttestationData) { let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())]; + let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; batch - .put_batch(Table::LatestKnownAttestations, entries) - .expect("put attestation"); + .put_batch(Table::LatestKnownAggregatedPayloads, entries) + .expect("put known aggregated payload"); batch.commit().expect("commit"); } - // ============ New Attestations ============ + // ============ New Aggregated Payloads ============ // - // "New" attestations are pending attestations not yet included in fork choice. - // They're promoted to "known" via `promote_new_attestations`. + // "New" aggregated payloads are pending — not yet counted in fork choice. + // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) attestations. - pub fn iter_new_attestations(&self) -> impl Iterator + '_ { + /// Iterates over all new (pending) aggregated payloads. + pub fn iter_new_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view - .prefix_iterator(Table::LatestNewAttestations, &[]) + .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); - let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); - (validator_id, data) + let key = decode_signature_key(&k); + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) }) .collect(); entries.into_iter() } - /// Returns a validator's latest new (pending) attestation. - pub fn get_new_attestation(&self, validator_id: &u64) -> Option { + /// Insert an aggregated payload into the new (pending) table. 
+ pub fn insert_new_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + let encoded_key = encode_signature_key(&key); let view = self.backend.begin_read().expect("read view"); - view.get(Table::LatestNewAttestations, &validator_id.as_ssz_bytes()) + let mut payloads: Vec = view + .get(Table::LatestNewAggregatedPayloads, &encoded_key) .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) - } + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + drop(view); - /// Stores a validator's new (pending) attestation. - pub fn insert_new_attestation(&mut self, validator_id: u64, data: AttestationData) { - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())]; - batch - .put_batch(Table::LatestNewAttestations, entries) - .expect("put attestation"); - batch.commit().expect("commit"); - } + payloads.push(payload); - /// Removes a validator's new (pending) attestation. - pub fn remove_new_attestation(&mut self, validator_id: &u64) { let mut batch = self.backend.begin_write().expect("write batch"); + let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; batch - .delete_batch( - Table::LatestNewAttestations, - vec![validator_id.as_ssz_bytes()], - ) - .expect("delete attestation"); + .put_batch(Table::LatestNewAggregatedPayloads, entries) + .expect("put new aggregated payload"); batch.commit().expect("commit"); } - /// Promotes all new attestations to known attestations. + /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Takes all attestations from `latest_new_attestations` and moves them - /// to `latest_known_attestations`, making them count for fork choice. 
- pub fn promote_new_attestations(&mut self) { - // Read all new attestations + /// Moves entries from `LatestNewAggregatedPayloads` to `LatestKnownAggregatedPayloads`. + pub fn promote_new_aggregated_payloads(&mut self) { let view = self.backend.begin_read().expect("read view"); - let new_attestations: Vec<(Vec, Vec)> = view - .prefix_iterator(Table::LatestNewAttestations, &[]) + let new_payloads: Vec<(Vec, Vec)> = view + .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| (k.to_vec(), v.to_vec())) .collect(); drop(view); - if new_attestations.is_empty() { + if new_payloads.is_empty() { return; } - // Delete from new and insert to known in a single batch let mut batch = self.backend.begin_write().expect("write batch"); - let keys_to_delete: Vec<_> = new_attestations.iter().map(|(k, _)| k.clone()).collect(); + let keys_to_delete: Vec<_> = new_payloads.iter().map(|(k, _)| k.clone()).collect(); + batch + .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete) + .expect("delete new aggregated payloads"); batch - .delete_batch(Table::LatestNewAttestations, keys_to_delete) - .expect("delete new attestations"); + .put_batch(Table::LatestKnownAggregatedPayloads, new_payloads) + .expect("put known aggregated payloads"); + batch.commit().expect("commit"); + } + + /// Delete specific gossip signatures by key. 
+ pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { + if keys.is_empty() { + return; + } + let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); + let mut batch = self.backend.begin_write().expect("write batch"); batch - .put_batch(Table::LatestKnownAttestations, new_attestations) - .expect("put known attestations"); + .delete_batch(Table::GossipSignatures, encoded_keys) + .expect("delete gossip signatures"); batch.commit().expect("commit"); } From aacd731d944e0d21593e8a2b49fd6015e2a9d4f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 10 Feb 2026 17:33:24 -0300 Subject: [PATCH 2/7] chore: address review comments --- crates/blockchain/src/store.rs | 59 +++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 71fa594..56a549d 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -144,7 +144,6 @@ fn aggregate_committee_signatures(store: &mut Store) { .entry(*data_root) .or_default() .push((*validator_id, sig)); - keys_to_delete.push((*validator_id, *data_root)); } } @@ -161,7 +160,10 @@ fn aggregate_committee_signatures(store: &mut Store) { let mut ids = vec![]; for (vid, sig) in &validators_and_sigs { - let Ok(pubkey) = validators[*vid as usize].get_pubkey() else { + let Some(validator) = validators.get(*vid as usize) else { + continue; + }; + let Ok(pubkey) = validator.get_pubkey() else { continue; }; sigs.push(sig.clone()); @@ -188,6 +190,9 @@ fn aggregate_committee_signatures(store: &mut Store) { store.insert_new_aggregated_payload((*vid, data_root), payload.clone()); } + // Only delete successfully aggregated signatures + keys_to_delete.extend(ids.iter().map(|vid| (*vid, data_root))); + metrics::inc_pq_sig_aggregated_signatures(); metrics::inc_pq_sig_attestations_in_aggregated_signatures(ids.len() as u64); 
}
@@ -257,7 +262,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
 /// interval = store.time() % INTERVALS_PER_SLOT
 pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool, is_aggregator: bool) {
     // Convert UNIX timestamp to interval count since genesis
-    let time_delta_ms = (timestamp - store.config().genesis_time) * 1000;
+    let time_delta_ms = timestamp.saturating_sub(store.config().genesis_time) * 1000;
     let time = time_delta_ms / MILLISECONDS_PER_INTERVAL;
 
     // If we're more than a slot behind, fast-forward to a slot before.
@@ -325,15 +330,6 @@ pub fn on_gossip_attestation(
     validate_attestation_data(store, &attestation.data)
         .inspect_err(|_| metrics::inc_attestations_invalid("gossip"))?;
 
-    // Reject attestations from future slots
-    let current_slot = store.time() / INTERVALS_PER_SLOT;
-    if attestation.data.slot > current_slot {
-        return Err(StoreError::AttestationTooFarInFuture {
-            attestation_slot: attestation.data.slot,
-            current_slot,
-        });
-    }
-
     let target = attestation.data.target;
     let target_state = store
         .get_state(&target.root)
@@ -406,13 +402,38 @@ pub fn on_gossip_aggregated_attestation(
     validate_attestation_data(store, &aggregated.data)
         .inspect_err(|_| metrics::inc_attestations_invalid("aggregated"))?;
 
-    // Reject attestations from future slots
-    let current_slot = store.time() / INTERVALS_PER_SLOT;
-    if aggregated.data.slot > current_slot {
-        return Err(StoreError::AttestationTooFarInFuture {
-            attestation_slot: aggregated.data.slot,
-            current_slot,
-        });
+    // Verify aggregated proof signature
+    if cfg!(not(feature = "skip-signature-verification")) {
+        let target_state = store
+            .get_state(&aggregated.data.target.root)
+            .ok_or(StoreError::MissingTargetState(aggregated.data.target.root))?;
+        let validators = &target_state.validators;
+        let num_validators = validators.len() as u64;
+
+        let participant_indices: Vec<u64> = aggregated.proof.participant_indices().collect();
+        if participant_indices.iter().any(|&vid| vid >= num_validators) {
+            return Err(StoreError::InvalidValidatorIndex);
+        }
+
+        let pubkeys: Vec<_> = participant_indices
+            .iter()
+            .map(|&vid| {
+                validators[vid as usize]
+                    .get_pubkey()
+                    .map_err(|_| StoreError::PubkeyDecodingFailed(vid))
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let message = aggregated.data.tree_hash_root();
+        let epoch: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32");
+
+        ethlambda_crypto::verify_aggregated_signature(
+            &aggregated.proof.proof_data,
+            pubkeys,
+            &message,
+            epoch,
+        )
+        .map_err(StoreError::AggregateVerificationFailed)?;
     }
 
     // Store attestation data by root (content-addressed, idempotent)

From ef5e8e81c8ade918e267ceec3422ab8cc9588289 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com>
Date: Tue, 10 Feb 2026 18:52:33 -0300
Subject: [PATCH 3/7] fix: merge proof lists on promote and use known payloads
 for block building

promote_new_aggregated_payloads was overwriting existing known entries
instead of appending, causing loss of previously accumulated proofs.

Block building was reading from the legacy AggregatedPayloads table
which was never populated by the new gossip pipeline, so proposers
couldn't use freshly aggregated proofs.

Fix both by merging payload lists during promotion and switching block
building to read from LatestKnownAggregatedPayloads. Remove the now-
unused AggregatedPayloads table and all associated methods.
--- crates/blockchain/src/store.rs | 11 +- crates/storage/src/api/tables.rs | 5 +- crates/storage/src/backend/rocksdb.rs | 1 - crates/storage/src/store.rs | 154 +++++--------------------- 4 files changed, 34 insertions(+), 137 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 56a549d..9de9b75 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -553,9 +553,6 @@ pub fn on_block( // Store proof in known aggregated payloads (active in fork choice) store.insert_known_aggregated_payload((*validator_id, data_root), payload.clone()); - // Also store in existing AggregatedPayloads table for block building - store.insert_aggregated_payload(&att.data, *validator_id, proof.clone()); - metrics::inc_attestations_valid("block"); } } @@ -738,9 +735,9 @@ pub fn produce_block_with_signatures( // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect existing proofs for block building (from previously received blocks) + // Collect existing proofs for block building from known aggregated payloads let aggregated_payloads: HashMap> = store - .iter_aggregated_payloads() + .iter_known_aggregated_payloads() .map(|(key, stored_payloads)| { let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); (key, proofs) @@ -1027,8 +1024,8 @@ fn build_block( /// Select existing aggregated proofs for attestations to include in a block. /// /// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). -/// This function only selects from existing proofs in the `AggregatedPayloads` table -/// (proofs from previously received blocks). +/// This function only selects from existing proofs in the `LatestKnownAggregatedPayloads` table +/// (proofs from previously received blocks and promoted gossip aggregations). /// /// Returns a list of (attestation, proof) pairs ready for block inclusion. 
fn select_aggregated_proofs( diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 1845470..7b184d5 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -14,8 +14,6 @@ pub enum Table { States, /// Gossip signatures: SignatureKey -> ValidatorSignature GossipSignatures, - /// Aggregated proofs from received blocks: SignatureKey -> Vec - AggregatedPayloads, /// Attestation data indexed by tree hash root: H256 -> AttestationData AttestationDataByRoot, /// Pending aggregated payloads (not yet active in fork choice): @@ -35,13 +33,12 @@ pub enum Table { } /// All table variants. -pub const ALL_TABLES: [Table; 11] = [ +pub const ALL_TABLES: [Table; 10] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, Table::GossipSignatures, - Table::AggregatedPayloads, Table::AttestationDataByRoot, Table::LatestNewAggregatedPayloads, Table::LatestKnownAggregatedPayloads, diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index a1c2ebc..45565f1 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -17,7 +17,6 @@ fn cf_name(table: Table) -> &'static str { Table::BlockSignatures => "block_signatures", Table::States => "states", Table::GossipSignatures => "gossip_signatures", - Table::AggregatedPayloads => "aggregated_payloads", Table::AttestationDataByRoot => "attestation_data_by_root", Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 0441cd4..a88b788 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -13,7 +13,7 @@ use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ attestation::AttestationData, block::{ - AggregatedSignatureProof, Block, BlockBody, BlockHeader, 
BlockSignaturesWithAttestation, + Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, SignedBlockWithAttestation, }, primitives::{ @@ -375,17 +375,16 @@ impl Store { // Prune signatures, payloads, and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); - let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); self.prune_aggregated_payload_table( Table::LatestKnownAggregatedPayloads, finalized.slot, ); - if pruned_sigs > 0 || pruned_payloads > 0 || pruned_att_data > 0 { + if pruned_sigs > 0 || pruned_att_data > 0 { info!( finalized_slot = finalized.slot, - pruned_sigs, pruned_payloads, pruned_att_data, "Pruned finalized signatures" + pruned_sigs, pruned_att_data, "Pruned finalized signatures" ); } } @@ -488,51 +487,6 @@ impl Store { count } - /// Prune aggregated payloads for slots <= finalized_slot. - /// - /// Returns the number of payloads pruned. 
- pub fn prune_aggregated_payloads(&mut self, finalized_slot: u64) -> usize { - let view = self.backend.begin_read().expect("read view"); - let mut updates = vec![]; - let mut deletes = vec![]; - let mut removed_count = 0; - - for (key_bytes, value_bytes) in view - .prefix_iterator(Table::AggregatedPayloads, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { - let original_len = payloads.len(); - payloads.retain(|p| p.slot > finalized_slot); - removed_count += original_len - payloads.len(); - - if payloads.is_empty() { - deletes.push(key_bytes.to_vec()); - } else if payloads.len() < original_len { - updates.push((key_bytes.to_vec(), payloads.as_ssz_bytes())); - } - } - } - drop(view); - - if !updates.is_empty() || !deletes.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - if !updates.is_empty() { - batch - .put_batch(Table::AggregatedPayloads, updates) - .expect("put"); - } - if !deletes.is_empty() { - batch - .delete_batch(Table::AggregatedPayloads, deletes) - .expect("delete"); - } - batch.commit().expect("commit"); - } - removed_count - } - /// Prune attestation data by root for slots <= finalized_slot. /// /// Returns the number of entries pruned. @@ -842,28 +796,48 @@ impl Store { /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Moves entries from `LatestNewAggregatedPayloads` to `LatestKnownAggregatedPayloads`. + /// Merges entries from `LatestNewAggregatedPayloads` into `LatestKnownAggregatedPayloads`, + /// appending to existing payload lists rather than overwriting them. 
pub fn promote_new_aggregated_payloads(&mut self) {
         let view = self.backend.begin_read().expect("read view");
-        let new_payloads: Vec<(Vec<u8>, Vec<u8>)> = view
+        let new_entries: Vec<(Vec<u8>, Vec<u8>)> = view
             .prefix_iterator(Table::LatestNewAggregatedPayloads, &[])
             .expect("iterator")
             .filter_map(|res| res.ok())
             .map(|(k, v)| (k.to_vec(), v.to_vec()))
             .collect();
-        drop(view);
 
-        if new_payloads.is_empty() {
+        if new_entries.is_empty() {
+            drop(view);
             return;
         }
 
+        // Merge new payloads with existing known payloads
+        let merged: Vec<(Vec<u8>, Vec<u8>)> = new_entries
+            .iter()
+            .map(|(key, new_bytes)| {
+                let new_payloads =
+                    Vec::<StoredAggregatedPayload>::from_ssz_bytes(new_bytes).expect("valid");
+                let mut known_payloads: Vec<StoredAggregatedPayload> = view
+                    .get(Table::LatestKnownAggregatedPayloads, key)
+                    .expect("get")
+                    .map(|bytes| {
+                        Vec::<StoredAggregatedPayload>::from_ssz_bytes(&bytes).expect("valid")
+                    })
+                    .unwrap_or_default();
+                known_payloads.extend(new_payloads);
+                (key.clone(), known_payloads.as_ssz_bytes())
+            })
+            .collect();
+        drop(view);
+
+        let keys_to_delete: Vec<_> = new_entries.into_iter().map(|(k, _)| k).collect();
         let mut batch = self.backend.begin_write().expect("write batch");
-        let keys_to_delete: Vec<_> = new_payloads.iter().map(|(k, _)| k.clone()).collect();
         batch
             .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete)
             .expect("delete new aggregated payloads");
         batch
-            .put_batch(Table::LatestKnownAggregatedPayloads, new_payloads)
+            .put_batch(Table::LatestKnownAggregatedPayloads, merged)
             .expect("put known aggregated payloads");
         batch.commit().expect("commit");
     }
@@ -925,76 +899,6 @@ impl Store {
         batch.commit().expect("commit");
     }
 
-    // ============ Aggregated Payloads ============
-    //
-    // Aggregated payloads are leanVM proofs combining multiple signatures.
-    // Used to verify block signatures efficiently.
-
-    /// Iterates over all aggregated signature payloads.
- pub fn iter_aggregated_payloads( - &self, - ) -> impl Iterator)> + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::AggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() - } - - /// Returns aggregated payloads for a signature key. - fn get_aggregated_payloads(&self, key: &SignatureKey) -> Option> { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::AggregatedPayloads, &encode_signature_key(key)) - .expect("get") - .map(|bytes| { - Vec::::from_ssz_bytes(&bytes).expect("valid payloads") - }) - } - - /// Insert an aggregated signature proof for a validator's attestation. - /// - /// Multiple proofs can be stored for the same (validator, attestation_data) pair, - /// each with its own slot metadata for pruning. - /// - /// # Thread Safety - /// - /// This method uses a read-modify-write pattern that is NOT atomic: - /// 1. Read existing payloads - /// 2. Append new payload - /// 3. Write back - /// - /// Concurrent calls could result in lost updates. This method MUST be called - /// from a single thread. 
In our case, that thread is the `BlockChain` `GenServer` - pub fn insert_aggregated_payload( - &mut self, - attestation_data: &AttestationData, - validator_id: u64, - proof: AggregatedSignatureProof, - ) { - let slot = attestation_data.slot; - let data_root = attestation_data.tree_hash_root(); - let key = (validator_id, data_root); - - // Read existing, add new, write back (NOT atomic - requires single-threaded access) - let mut payloads = self.get_aggregated_payloads(&key).unwrap_or_default(); - payloads.push(StoredAggregatedPayload { slot, proof }); - - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encode_signature_key(&key), payloads.as_ssz_bytes())]; - batch - .put_batch(Table::AggregatedPayloads, entries) - .expect("put proofs"); - batch.commit().expect("commit"); - } - // ============ Derived Accessors ============ /// Returns the slot of the current safe target block. From 09b85e783e6a73a9edeb4c7cc3236667ccdb9cce Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Thu, 12 Feb 2026 18:01:03 -0300 Subject: [PATCH 4/7] chore: remove from_ssz_bytes_compat for devnet-3 serialization compatibility (#127) Change BlocksByRootRequest::from_ssz_bytes_compat to use SszDecode::from_ssz_bytes with the payload. We don't need to maintain compatibility with devnet-2 format, so we remove the previous serialization. 
--- crates/net/p2p/src/req_resp/codec.rs | 12 +++--- crates/net/p2p/src/req_resp/messages.rs | 54 +------------------------ 2 files changed, 7 insertions(+), 59 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index a1b0021..ead5586 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -7,12 +7,13 @@ use tracing::{debug, trace}; use super::{ encoding::{decode_payload, write_payload}, messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, ErrorMessage, Request, Response, - ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, + BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload, + STATUS_PROTOCOL_V1, Status, }, }; use ethlambda_types::block::SignedBlockWithAttestation; +use ethlambda_types::primitives::ssz::Decode as SszDecode; #[derive(Debug, Clone, Default)] pub struct Codec; @@ -41,10 +42,9 @@ impl libp2p::request_response::Codec for Codec { Ok(Request::Status(status)) } BLOCKS_BY_ROOT_PROTOCOL_V1 => { - let request = - BlocksByRootRequest::from_ssz_bytes_compat(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; + let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; Ok(Request::BlocksByRoot(request)) } _ => Err(io::Error::new( diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index 3826f5b..607c918 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -2,7 +2,7 @@ use ethlambda_types::{ block::SignedBlockWithAttestation, primitives::{ H256, - ssz::{Decode, Decode as SszDecode, Encode}, + ssz::{Decode, Encode}, }, state::Checkpoint, }; @@ -139,55 +139,3 @@ pub fn error_message(msg: impl AsRef) -> ErrorMessage { pub struct BlocksByRootRequest { pub roots: RequestedBlockRoots, } - -impl 
BlocksByRootRequest { - /// Decode from SSZ bytes with backward compatibility. - /// - /// Tries to decode as new format (container with `roots` field) first. - /// Falls back to old format (transparent - direct list of roots) if that fails. - pub fn from_ssz_bytes_compat(bytes: &[u8]) -> Result { - // Try new format (container) first - SszDecode::from_ssz_bytes(bytes).or_else(|_| { - // Fall back to old format (transparent/direct list) - SszDecode::from_ssz_bytes(bytes).map(|roots| Self { roots }) - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use ssz::Encode as SszEncode; - - #[test] - fn test_blocks_by_root_backward_compatibility() { - // Create some test roots - let root1 = H256::from_slice(&[1u8; 32]); - let root2 = H256::from_slice(&[2u8; 32]); - let roots_list = - RequestedBlockRoots::new(vec![root1, root2]).expect("Failed to create roots list"); - - // Encode as old format (direct list, similar to transparent) - let old_format_bytes = roots_list.as_ssz_bytes(); - - // Encode as new format (container) - let new_request = BlocksByRootRequest { - roots: roots_list.clone(), - }; - let new_format_bytes = new_request.as_ssz_bytes(); - - // Both formats should decode successfully - let decoded_from_old = BlocksByRootRequest::from_ssz_bytes_compat(&old_format_bytes) - .expect("Failed to decode old format"); - let decoded_from_new = BlocksByRootRequest::from_ssz_bytes_compat(&new_format_bytes) - .expect("Failed to decode new format"); - - // Both should have the same roots - assert_eq!(decoded_from_old.roots.len(), 2); - assert_eq!(decoded_from_new.roots.len(), 2); - assert_eq!(decoded_from_old.roots[0], root1); - assert_eq!(decoded_from_old.roots[1], root2); - assert_eq!(decoded_from_new.roots[0], root1); - assert_eq!(decoded_from_new.roots[1], root2); - } -} From 5108734b69b115af1aadd8f0cf58f903645d6a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 13 Feb 2026 13:39:46 
-0300 Subject: [PATCH 5/7] refactor: pass milliseconds to on_tick instead of seconds Move the seconds-to-milliseconds conversion from inside on_tick to its callers, so both BlockChainServer::on_tick and store::on_tick work natively with millisecond timestamps. --- crates/blockchain/src/lib.rs | 16 ++++++++-------- crates/blockchain/src/store.rs | 13 +++++++------ crates/blockchain/tests/forkchoice_spectests.rs | 10 +++++----- crates/blockchain/tests/signature_spectests.rs | 4 ++-- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 65c07af..2c7540b 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -136,13 +136,13 @@ struct BlockChainServer { } impl BlockChainServer { - fn on_tick(&mut self, timestamp: u64) { - let genesis_time = self.store.config().genesis_time; + fn on_tick(&mut self, timestamp_ms: u64) { + let genesis_time_ms = self.store.config().genesis_time * 1000; - // Calculate current slot and interval from seconds - let time_since_genesis = timestamp.saturating_sub(genesis_time); - let slot = time_since_genesis / SECONDS_PER_SLOT; - let interval = (time_since_genesis % SECONDS_PER_SLOT) * 1000 / MILLISECONDS_PER_INTERVAL; + // Calculate current slot and interval from milliseconds + let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms); + let slot = time_since_genesis_ms / MILLISECONDS_PER_SLOT; + let interval = (time_since_genesis_ms % MILLISECONDS_PER_SLOT) / MILLISECONDS_PER_INTERVAL; // Update current slot metric metrics::update_current_slot(slot); @@ -157,7 +157,7 @@ impl BlockChainServer { // Tick the store first - this accepts attestations at interval 0 if we have a proposal store::on_tick( &mut self.store, - timestamp, + timestamp_ms, proposer_validator_id.is_some(), self.is_aggregator, ); @@ -453,7 +453,7 @@ impl GenServer for BlockChainServer { let timestamp = SystemTime::UNIX_EPOCH .elapsed() .expect("already past the 
unix epoch"); - self.on_tick(timestamp.as_secs()); + self.on_tick(timestamp.as_millis() as u64); // Schedule the next tick at the next 800ms interval boundary let ms_since_epoch = timestamp.as_millis() as u64; let ms_to_next_interval = diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 9de9b75..13ad8c4 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -21,7 +21,7 @@ use ethlambda_types::{ }; use tracing::{info, trace, warn}; -use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, SECONDS_PER_SLOT, metrics}; +use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, metrics}; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; @@ -260,9 +260,10 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool, is_aggregator: bool) { - // Convert UNIX timestamp to interval count since genesis - let time_delta_ms = timestamp.saturating_sub(store.config().genesis_time) * 1000; +pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggregator: bool) { + // Convert UNIX timestamp (ms) to interval count since genesis + let genesis_time_ms = store.config().genesis_time * 1000; + let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); let time = time_delta_ms / MILLISECONDS_PER_INTERVAL; // If we're more than a slot behind, fast-forward to a slot before. @@ -683,10 +684,10 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// before returning the canonical head. 
fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot - let slot_time = store.config().genesis_time + slot * SECONDS_PER_SLOT; + let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time, true, false); + on_tick(store, slot_time_ms, true, false); // Process any pending attestations before proposal accept_new_attestations(store); diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index ac4c5f2..69909f6 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -78,11 +78,11 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block = build_signed_block(block_data); - let block_time = - signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; + let block_time_ms = + (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time, true, false); + store::on_tick(&mut store, block_time_ms, true, false); let result = store::on_block(&mut store, signed_block); match (result.is_ok(), step.valid) { @@ -105,9 +105,9 @@ fn run(path: &Path) -> datatest_stable::Result<()> { } } "tick" => { - let timestamp = step.time.expect("tick step missing time"); + let timestamp_ms = step.time.expect("tick step missing time") * 1000; // NOTE: the has_proposal argument is set to false, following the spec - store::on_tick(&mut store, timestamp, false, false); + store::on_tick(&mut store, timestamp_ms, false, false); } other => { // Fail for unsupported step types for now diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 2d5a75b..6d0caca 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ 
b/crates/blockchain/tests/signature_spectests.rs @@ -49,8 +49,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block: SignedBlockWithAttestation = test.signed_block_with_attestation.into(); // Advance time to the block's slot - let block_time = signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; - store::on_tick(&mut st, block_time, true, false); + let block_time_ms = (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; + store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); From 1702d6a603f083ece0ede61b9db491084c24de50 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 13 Feb 2026 17:15:53 -0300 Subject: [PATCH 6/7] docs: document `--is-aggregator` flag requirement for finalization (#132) ## Motivation During devnet-3 debugging we found that the network was producing blocks but never finalizing (`justified_slot=0`, `finalized_slot=0` after 450+ slots). The root cause: no node was started with `--is-aggregator`, so attestation signatures were silently dropped after verification (`store.rs:368`), resulting in all blocks having `attestation_count=0`. ## Description - **CLAUDE.md**: Add "Aggregator Flag Required for Finalization" to the Common Gotchas section, documenting the full attestation pipeline, root cause, and symptom - **README.md**: Add a note in the devnet section warning that at least one node must use `--is-aggregator` when running manually ## How to Test - Read the docs changes for accuracy - Verify by running a devnet without `--is-aggregator` (blocks produce, no finalization) vs. 
with it on at least one node (finalization advances) --- CLAUDE.md | 7 +++++++ README.md | 2 ++ 2 files changed, 9 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 19864d7..e612769 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -308,6 +308,13 @@ cargo test -p ethlambda-blockchain --features skip-signature-verification --test ## Common Gotchas +### Aggregator Flag Required for Finalization +- At least one node **must** be started with `--is-aggregator` to finalize blocks in production (without `skip-signature-verification`) +- Without this flag, attestations pass signature verification and are logged as "Attestation processed", but the signature is never stored for aggregation (`store.rs:368`), so blocks are always built with `attestation_count=0` +- The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks +- With `skip-signature-verification` (tests only), attestations bypass aggregation and go directly to `new_aggregated_payloads`, so the flag is not needed +- **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip + ### Signature Verification - Tests require `skip-signature-verification` feature for performance - Crypto tests marked `#[ignore]` (slow leanVM operations) diff --git a/README.md b/README.md index 0fa249d..8f870a9 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ make run-devnet This generates fresh genesis files and starts all configured clients with metrics enabled. Press `Ctrl+C` to stop all nodes. +> **Important:** When running nodes manually (outside `make run-devnet`), at least one node must be started with `--is-aggregator` for attestations to be aggregated and included in blocks. Without this flag, the network will produce blocks but never finalize. 
+ For custom devnet configurations, go to `lean-quickstart/local-devnet/genesis/validator-config.yaml` and edit the file before running the command above. See `lean-quickstart`'s documentation for more details on how to configure the devnet. ## Philosophy From c92d1758e5b897ab76dadefdff9f9bb89f2484b2 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 13 Feb 2026 17:17:30 -0300 Subject: [PATCH 7/7] Run cargo fmt --- crates/blockchain/tests/signature_spectests.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 6d0caca..0c72bfe 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -49,7 +49,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block: SignedBlockWithAttestation = test.signed_block_with_attestation.into(); // Advance time to the block's slot - let block_time_ms = (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; + let block_time_ms = + (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification)