diff --git a/CLAUDE.md b/CLAUDE.md index 19864d7..e612769 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -308,6 +308,13 @@ cargo test -p ethlambda-blockchain --features skip-signature-verification --test ## Common Gotchas +### Aggregator Flag Required for Finalization +- At least one node **must** be started with `--is-aggregator` to finalize blocks in production (without `skip-signature-verification`) +- Without this flag, attestations pass signature verification and are logged as "Attestation processed", but the signature is never stored for aggregation (`store.rs:368`), so blocks are always built with `attestation_count=0` +- The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks +- With `skip-signature-verification` (tests only), attestations bypass aggregation and go directly to `new_aggregated_payloads`, so the flag is not needed +- **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip + ### Signature Verification - Tests require `skip-signature-verification` feature for performance - Crypto tests marked `#[ignore]` (slow leanVM operations) diff --git a/Makefile b/Makefile index 36bdc13..b9d74f5 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ docker-build: ## 🐳 Build the Docker image -t ghcr.io/lambdaclass/ethlambda:$(DOCKER_TAG) . @echo -LEAN_SPEC_COMMIT_HASH:=4edcf7bc9271e6a70ded8aff17710d68beac4266 +LEAN_SPEC_COMMIT_HASH:=b39472e73f8a7d603cc13d14426eed14c6eff6f1 leanSpec: git clone https://github.com/leanEthereum/leanSpec.git --single-branch diff --git a/README.md b/README.md index cfce2f9..06bd343 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ make run-devnet This generates fresh genesis files and starts all configured clients with metrics enabled. Press `Ctrl+C` to stop all nodes. +> **Important:** When running nodes manually (outside `make run-devnet`), at least one node must be started with `--is-aggregator` for attestations to be aggregated and included in blocks. Without this flag, the network will produce blocks but never finalize. + For custom devnet configurations, go to `lean-quickstart/local-devnet/genesis/validator-config.yaml` and edit the file before running the command above. See `lean-quickstart`'s documentation for more details on how to configure the devnet. ## Philosophy diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index c7b54b9..bc5c9a1 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -51,6 +51,9 @@ struct CliOptions { /// When set, skips genesis initialization and syncs from checkpoint. #[arg(long)] checkpoint_sync_url: Option, + /// Whether this node acts as a committee aggregator + #[arg(long, default_value = "false")] + is_aggregator: bool, } #[tokio::main] @@ -114,7 +117,10 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Failed to initialize state"))?; let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); - let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys); + // Use first validator ID for subnet subscription + let first_validator_id = validator_keys.keys().min().copied(); + let blockchain = + BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator); let p2p_handle = tokio::spawn(start_p2p( node_p2p_key, @@ -123,6 +129,7 @@ async fn main() -> eyre::Result<()> { blockchain, p2p_rx, store.clone(), + first_validator_id, )); ethlambda_rpc::start_rpc_server(metrics_socket, store) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 998159c..2c7540b 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -5,7 +5,7 @@ use ethlambda_state_transition::is_proposer; use ethlambda_storage::Store; use ethlambda_types::{ ShortRoot, - attestation::{Attestation, AttestationData, SignedAttestation}, + attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::{H256, ssz::TreeHash}, signature::ValidatorSecretKey, @@ -30,6 +30,8 @@ pub enum P2PMessage { PublishAttestation(SignedAttestation), /// Publish a block to the gossip network. PublishBlock(SignedBlockWithAttestation), + /// Publish an aggregated attestation to the gossip network. + PublishAggregatedAttestation(SignedAggregatedAttestation), /// Fetch a block by its root hash. FetchBlock(H256), } @@ -38,14 +40,23 @@ pub struct BlockChain { handle: GenServerHandle, } -/// Seconds in a slot. Each slot has 4 intervals of 1 second each. +/// Seconds in a slot. pub const SECONDS_PER_SLOT: u64 = 4; +/// Milliseconds in a slot. +pub const MILLISECONDS_PER_SLOT: u64 = 4_000; +/// Milliseconds per interval (800ms ticks). +pub const MILLISECONDS_PER_INTERVAL: u64 = 800; +/// Number of intervals per slot (5 intervals of 800ms = 4 seconds). +pub const INTERVALS_PER_SLOT: u64 = 5; +/// Number of attestation committees per slot. +pub const ATTESTATION_COMMITTEE_COUNT: u64 = 1; impl BlockChain { pub fn spawn( store: Store, p2p_tx: mpsc::UnboundedSender, validator_keys: HashMap, + is_aggregator: bool, ) -> BlockChain { let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); @@ -54,6 +65,7 @@ impl BlockChain { p2p_tx, key_manager, pending_blocks: HashMap::new(), + is_aggregator, pending_block_parents: HashMap::new(), } .start(); @@ -85,6 +97,20 @@ impl BlockChain { .await .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation")); } + + /// Sends an aggregated attestation to the BlockChain for processing. + pub async fn notify_new_aggregated_attestation( + &mut self, + attestation: SignedAggregatedAttestation, + ) { + let _ = self + .handle + .cast(CastMessage::NewAggregatedAttestation(attestation)) + .await + .inspect_err( + |err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"), + ); + } } /// GenServer that sequences all blockchain updates. @@ -104,16 +130,19 @@ struct BlockChainServer { // chain at lookup time, since a cached ancestor may itself have become pending with // a deeper missing parent after the entry was created. pending_block_parents: HashMap, + + /// Whether this node acts as a committee aggregator. + is_aggregator: bool, } impl BlockChainServer { - fn on_tick(&mut self, timestamp: u64) { - let genesis_time = self.store.config().genesis_time; + fn on_tick(&mut self, timestamp_ms: u64) { + let genesis_time_ms = self.store.config().genesis_time * 1000; - // Calculate current slot and interval - let time_since_genesis = timestamp.saturating_sub(genesis_time); - let slot = time_since_genesis / SECONDS_PER_SLOT; - let interval = time_since_genesis % SECONDS_PER_SLOT; + // Calculate current slot and interval from milliseconds + let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms); + let slot = time_since_genesis_ms / MILLISECONDS_PER_SLOT; + let interval = (time_since_genesis_ms % MILLISECONDS_PER_SLOT) / MILLISECONDS_PER_INTERVAL; // Update current slot metric metrics::update_current_slot(slot); @@ -126,7 +155,12 @@ impl BlockChainServer { .flatten(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some()); + store::on_tick( + &mut self.store, + timestamp_ms, + proposer_validator_id.is_some(), + self.is_aggregator, + ); // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { @@ -138,7 +172,7 @@ impl BlockChainServer { self.produce_attestations(slot); } - // Update safe target slot metric (updated by store.on_tick at interval 2) + // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); } @@ -374,15 +408,21 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { - let _ = store::on_gossip_attestation(&mut self.store, attestation) + let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } + + fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) { + let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) + .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); + } } #[derive(Clone, Debug)] enum CastMessage { NewBlock(SignedBlockWithAttestation), NewAttestation(SignedAttestation), + NewAggregatedAttestation(SignedAggregatedAttestation), Tick, } @@ -413,12 +453,13 @@ impl GenServer for BlockChainServer { let timestamp = SystemTime::UNIX_EPOCH .elapsed() .expect("already past the unix epoch"); - self.on_tick(timestamp.as_secs()); - // Schedule the next tick at the start of the next second - let millis_to_next_sec = - ((timestamp.as_secs() as u128 + 1) * 1000 - timestamp.as_millis()) as u64; + self.on_tick(timestamp.as_millis() as u64); + // Schedule the next tick at the next 800ms interval boundary + let ms_since_epoch = timestamp.as_millis() as u64; + let ms_to_next_interval = + MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL); send_after( - Duration::from_millis(millis_to_next_sec), + Duration::from_millis(ms_to_next_interval), handle.clone(), message, ); @@ -427,6 +468,9 @@ impl GenServer for BlockChainServer { self.on_block(signed_block); } CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation), + CastMessage::NewAggregatedAttestation(attestation) => { + self.on_gossip_aggregated_attestation(attestation); + } } CastResponse::NoReply } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 04138a9..13ad8c4 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,34 +4,40 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store}; +use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; use ethlambda_types::{ ShortRoot, - attestation::{AggregatedAttestation, Attestation, AttestationData, SignedAttestation}, + attestation::{ + AggregatedAttestation, Attestation, AttestationData, SignedAggregatedAttestation, + SignedAttestation, + }, block::{ AggregatedAttestations, AggregatedSignatureProof, AggregationBits, Block, BlockBody, SignedBlockWithAttestation, }, primitives::{H256, ssz::TreeHash}, signature::ValidatorSignature, - state::{Checkpoint, State, Validator}, + state::{Checkpoint, State}, }; use tracing::{info, trace, warn}; -use crate::{SECONDS_PER_SLOT, metrics}; +use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT, metrics}; const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; -/// Accept new attestations, moving them from pending to known. +/// Accept new aggregated payloads, promoting them to known for fork choice. fn accept_new_attestations(store: &mut Store) { - store.promote_new_attestations(); + store.promote_new_aggregated_payloads(); update_head(store); } /// Update the head based on the fork choice rule. fn update_head(store: &mut Store) { let blocks = store.get_live_chain(); - let attestations: HashMap = store.iter_known_attestations().collect(); + let attestations = extract_attestations_from_aggregated_payloads( + store, + store.iter_known_aggregated_payloads(), + ); let old_head = store.head(); let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, @@ -76,7 +82,8 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); - let attestations: HashMap = store.iter_new_attestations().collect(); + let attestations = + extract_attestations_from_aggregated_payloads(store, store.iter_new_aggregated_payloads()); let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -86,15 +93,122 @@ fn update_safe_target(store: &mut Store) { store.set_safe_target(safe_target); } +/// Reconstruct per-validator attestation data from aggregated payloads. +/// +/// For each (validator_id, data_root) key in the payloads, looks up the +/// attestation data by root. Returns the latest attestation per validator +/// (by slot). +fn extract_attestations_from_aggregated_payloads( + store: &Store, + payloads: impl Iterator)>, +) -> HashMap { + let mut result: HashMap = HashMap::new(); + + for ((validator_id, data_root), _payload_list) in payloads { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + + let should_update = result + .get(&validator_id) + .is_none_or(|existing| existing.slot < data.slot); + + if should_update { + result.insert(validator_id, data); + } + } + + result +} + +/// Aggregate committee signatures at interval 2. +/// +/// Collects individual gossip signatures, aggregates them by attestation data, +/// and stores the resulting proofs in `LatestNewAggregatedPayloads`. +fn aggregate_committee_signatures(store: &mut Store) { + let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); + if gossip_sigs.is_empty() { + return; + } + + let head_state = store.head_state(); + let validators = &head_state.validators; + + // Group gossip signatures by data_root for batch aggregation + let mut groups: HashMap> = HashMap::new(); + let mut keys_to_delete: Vec = Vec::new(); + + for ((validator_id, data_root), stored_sig) in &gossip_sigs { + if let Ok(sig) = stored_sig.to_validator_signature() { + groups + .entry(*data_root) + .or_default() + .push((*validator_id, sig)); + } + } + + for (data_root, validators_and_sigs) in groups { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + + let slot = data.slot; + let message = data.tree_hash_root(); + + let mut sigs = vec![]; + let mut pubkeys = vec![]; + let mut ids = vec![]; + + for (vid, sig) in &validators_and_sigs { + let Some(validator) = validators.get(*vid as usize) else { + continue; + }; + let Ok(pubkey) = validator.get_pubkey() else { + continue; + }; + sigs.push(sig.clone()); + pubkeys.push(pubkey); + ids.push(*vid); + } + + if ids.is_empty() { + continue; + } + + let Ok(proof_data) = aggregate_signatures(pubkeys, sigs, &message, slot as u32) + .inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures")) + else { + continue; + }; + + let participants = aggregation_bits_from_validator_indices(&ids); + let proof = AggregatedSignatureProof::new(participants, proof_data); + let payload = StoredAggregatedPayload { slot, proof }; + + // Store in new aggregated payloads for each covered validator + for vid in &ids { + store.insert_new_aggregated_payload((*vid, data_root), payload.clone()); + } + + // Only delete successfully aggregated signatures + keys_to_delete.extend(ids.iter().map(|vid| (*vid, data_root))); + + metrics::inc_pq_sig_aggregated_signatures(); + metrics::inc_pq_sig_attestations_in_aggregated_signatures(ids.len() as u64); + } + + // Delete aggregated entries from gossip_signatures + store.delete_gossip_signatures(&keys_to_delete); +} + /// Validate incoming attestation before processing. /// /// Ensures the vote respects the basic laws of time and topology: /// 1. The blocks voted for must exist in our store. /// 2. A vote cannot span backwards in time (source > target). /// 3. A vote cannot be for a future slot. -fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), StoreError> { +fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<(), StoreError> { let _timing = metrics::time_attestation_validation(); - let data = &attestation.data; // Availability Check - We cannot count a vote if we haven't seen the blocks involved. let source_header = store @@ -129,7 +243,7 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), // Time Check - Validate attestation is not too far in the future. // We allow a small margin for clock disparity (1 slot), but no further. - let current_slot = store.time() / SECONDS_PER_SLOT; + let current_slot = store.time() / INTERVALS_PER_SLOT; if data.slot > current_slot + 1 { return Err(StoreError::AttestationTooFarInFuture { attestation_slot: data.slot, @@ -141,20 +255,28 @@ fn validate_attestation(store: &Store, attestation: &Attestation) -> Result<(), } /// Process a tick event. -pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { - let time = timestamp - store.config().genesis_time; +/// +/// `store.time()` represents interval-count-since-genesis: each increment is one +/// 800ms interval. Slot and interval-within-slot are derived as: +/// slot = store.time() / INTERVALS_PER_SLOT +/// interval = store.time() % INTERVALS_PER_SLOT +pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggregator: bool) { + // Convert UNIX timestamp (ms) to interval count since genesis + let genesis_time_ms = store.config().genesis_time * 1000; + let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); + let time = time_delta_ms / MILLISECONDS_PER_INTERVAL; // If we're more than a slot behind, fast-forward to a slot before. // Operations are idempotent, so this should be fine. - if time.saturating_sub(store.time()) > SECONDS_PER_SLOT { - store.set_time(time - SECONDS_PER_SLOT); + if time.saturating_sub(store.time()) > INTERVALS_PER_SLOT { + store.set_time(time - INTERVALS_PER_SLOT); } while store.time() < time { store.set_time(store.time() + 1); - let slot = store.time() / SECONDS_PER_SLOT; - let interval = store.time() % SECONDS_PER_SLOT; + let slot = store.time() / INTERVALS_PER_SLOT; + let interval = store.time() % INTERVALS_PER_SLOT; trace!(%slot, %interval, "processing tick"); @@ -171,33 +293,44 @@ pub fn on_tick(store: &mut Store, timestamp: u64, has_proposal: bool) { } } 1 => { - // Second interval - no action + // Vote propagation — no action } 2 => { - // Mid-slot - update safe target for validators - update_safe_target(store); + // Aggregation interval + if is_aggregator { + aggregate_committee_signatures(store); + } } 3 => { + // Update safe target for validators + update_safe_target(store); + } + 4 => { // End of slot - accept accumulated attestations accept_new_attestations(store); } - _ => unreachable!("slots only have 4 intervals"), + _ => unreachable!("slots only have 5 intervals"), } } } /// Process a gossiped attestation. +/// +/// Verifies the signature, stores attestation data by root, and (if this node +/// is an aggregator) stores the gossip signature for later aggregation. pub fn on_gossip_attestation( store: &mut Store, signed_attestation: SignedAttestation, + is_aggregator: bool, ) -> Result<(), StoreError> { let validator_id = signed_attestation.validator_id; let attestation = Attestation { validator_id, data: signed_attestation.message, }; - validate_attestation(store, &attestation) + validate_attestation_data(store, &attestation.data) .inspect_err(|_| metrics::inc_attestations_invalid("gossip"))?; + let target = attestation.data.target; let target_state = store .get_state(&target.root) @@ -211,7 +344,6 @@ pub fn on_gossip_attestation( let message = attestation.data.tree_hash_root(); if cfg!(not(feature = "skip-signature-verification")) { use ethlambda_types::signature::ValidatorSignature; - // Use attestation.data.slot as epoch (matching what Zeam and ethlambda use for signing) let epoch: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; @@ -219,14 +351,28 @@ pub fn on_gossip_attestation( return Err(StoreError::SignatureVerificationFailed); } } - on_attestation(store, attestation.clone(), false)?; - if cfg!(not(feature = "skip-signature-verification")) { - // Store signature for later lookup during block building + // Store attestation data by root (content-addressed, idempotent) + let data_root = attestation.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, attestation.data.clone()); + + if cfg!(feature = "skip-signature-verification") { + // Without signature verification, insert directly into new aggregated payloads + // with a dummy proof so the fork choice pipeline still sees attestations. + let participants = aggregation_bits_from_validator_indices(&[validator_id]); + let payload = StoredAggregatedPayload { + slot: attestation.data.slot, + proof: AggregatedSignatureProof::empty(participants), + }; + store.insert_new_aggregated_payload((validator_id, data_root), payload); + } else if is_aggregator { + // With verification, store gossip signature for later aggregation at interval 2. + // With ATTESTATION_COMMITTEE_COUNT=1, all validators are in the same subnet. let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; store.insert_gossip_signature(&attestation.data, validator_id, signature); } + metrics::inc_attestations_valid("gossip"); let slot = attestation.data.slot; @@ -245,68 +391,78 @@ pub fn on_gossip_attestation( Ok(()) } -/// Process a new attestation and place it into the correct attestation stage. -/// -/// Attestations can come from: -/// - a block body (on-chain, `is_from_block=true`), or -/// - the gossip network (off-chain, `is_from_block=false`). +/// Process a gossiped aggregated attestation from the aggregation subnet. /// -/// The Attestation Pipeline: -/// - Stage 1 (latest_new_attestations): Pending attestations not yet counted in fork choice. -/// - Stage 2 (latest_known_attestations): Active attestations used by LMD-GHOST. -fn on_attestation( +/// Aggregated attestations arrive from committee aggregators and contain a proof +/// covering multiple validators. We store one aggregated payload entry per +/// participating validator so the fork choice extraction works uniformly. +pub fn on_gossip_aggregated_attestation( store: &mut Store, - attestation: Attestation, - is_from_block: bool, + aggregated: SignedAggregatedAttestation, ) -> Result<(), StoreError> { - // First, ensure the attestation is structurally and temporally valid. - validate_attestation(store, &attestation)?; - - let validator_id = attestation.validator_id; - let attestation_data = attestation.data; - let attestation_slot = attestation_data.slot; + validate_attestation_data(store, &aggregated.data) + .inspect_err(|_| metrics::inc_attestations_invalid("aggregated"))?; - if is_from_block { - // On-chain attestation processing - // These are historical attestations from other validators included by the proposer. - // They are processed immediately as "known" attestations. + // Verify aggregated proof signature + if cfg!(not(feature = "skip-signature-verification")) { + let target_state = store + .get_state(&aggregated.data.target.root) + .ok_or(StoreError::MissingTargetState(aggregated.data.target.root))?; + let validators = &target_state.validators; + let num_validators = validators.len() as u64; + + let participant_indices: Vec = aggregated.proof.participant_indices().collect(); + if participant_indices.iter().any(|&vid| vid >= num_validators) { + return Err(StoreError::InvalidValidatorIndex); + } - let should_update = store - .get_known_attestation(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); + let pubkeys: Vec<_> = participant_indices + .iter() + .map(|&vid| { + validators[vid as usize] + .get_pubkey() + .map_err(|_| StoreError::PubkeyDecodingFailed(vid)) + }) + .collect::>()?; - if should_update { - store.insert_known_attestation(validator_id, attestation_data.clone()); - } + let message = aggregated.data.tree_hash_root(); + let epoch: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32"); - // Remove pending attestation if superseded by on-chain attestation - if let Some(existing_new) = store.get_new_attestation(&validator_id) - && existing_new.slot <= attestation_slot - { - store.remove_new_attestation(&validator_id); - } - } else { - // Network gossip attestation processing - // These enter the "new" stage and must wait for interval tick acceptance. - - // Reject attestations from future slots - let current_slot = store.time() / SECONDS_PER_SLOT; - if attestation_slot > current_slot { - return Err(StoreError::AttestationTooFarInFuture { - attestation_slot, - current_slot, - }); - } + ethlambda_crypto::verify_aggregated_signature( + &aggregated.proof.proof_data, + pubkeys, + &message, + epoch, + ) + .map_err(StoreError::AggregateVerificationFailed)?; + } - let should_update = store - .get_new_attestation(&validator_id) - .is_none_or(|latest| latest.slot < attestation_slot); + // Store attestation data by root (content-addressed, idempotent) + let data_root = aggregated.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, aggregated.data.clone()); - if should_update { - store.insert_new_attestation(validator_id, attestation_data); - } + // Store one aggregated payload per participating validator + for validator_id in aggregated.proof.participant_indices() { + let payload = StoredAggregatedPayload { + slot: aggregated.data.slot, + proof: aggregated.proof.clone(), + }; + store.insert_new_aggregated_payload((validator_id, data_root), payload); } + let slot = aggregated.data.slot; + let num_participants = aggregated.proof.participants.num_set_bits(); + info!( + slot, + num_participants, + target_slot = aggregated.data.target.slot, + target_root = %ShortRoot(&aggregated.data.target.root.0), + source_slot = aggregated.data.source.slot, + "Aggregated attestation processed" + ); + + metrics::inc_attestations_valid("aggregated"); + Ok(()) } @@ -380,30 +536,25 @@ pub fn on_block( let attestation_signatures = &signed_block.signature.attestation_signatures; // Process block body attestations. - // TODO: fail the block if an attestation is invalid. Right now we - // just log a warning. + // Store attestation data by root and proofs in known aggregated payloads. for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { + let data_root = att.data.tree_hash_root(); + store.insert_attestation_data_by_root(data_root, att.data.clone()); + let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); + let payload = StoredAggregatedPayload { + slot: att.data.slot, + proof: proof.clone(), + }; - for validator_id in validator_ids { - // Update Proof Map - Store the proof so future block builders can reuse this aggregation - store.insert_aggregated_payload(&att.data, validator_id, proof.clone()); + for validator_id in &validator_ids { + // Store proof in known aggregated payloads (active in fork choice) + store.insert_known_aggregated_payload((*validator_id, data_root), payload.clone()); - // Update Fork Choice - Register the vote immediately (historical/on-chain) - let attestation = Attestation { - validator_id, - data: att.data.clone(), - }; - // TODO: validate attestations before processing - let _ = on_attestation(store, attestation, true) - .inspect(|_| metrics::inc_attestations_valid("block")) - .inspect_err(|err| { - warn!(%slot, %validator_id, %err, "Invalid attestation in block"); - metrics::inc_attestations_invalid("block"); - }); + metrics::inc_attestations_valid("block"); } } @@ -412,31 +563,28 @@ pub fn on_block( // to prevent the proposer from gaining circular weight advantage. update_head(store); - // Process proposer attestation as if received via gossip + // Process proposer attestation as pending (enters "new" stage via gossip path) // The proposer's attestation should NOT affect this block's fork choice position. - // It is treated as pending until interval 3 (end of slot). - - if cfg!(not(feature = "skip-signature-verification")) { + let proposer_vid = proposer_attestation.validator_id; + let proposer_data_root = proposer_attestation.data.tree_hash_root(); + store.insert_attestation_data_by_root(proposer_data_root, proposer_attestation.data.clone()); + + if cfg!(feature = "skip-signature-verification") { + // Without sig verification, insert directly with a dummy proof + let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); + let payload = StoredAggregatedPayload { + slot: proposer_attestation.data.slot, + proof: AggregatedSignatureProof::empty(participants), + }; + store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); + } else { // Store the proposer's signature for potential future block building let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - &proposer_attestation.data, - proposer_attestation.validator_id, - proposer_sig, - ); + store.insert_gossip_signature(&proposer_attestation.data, proposer_vid, proposer_sig); } - // Process proposer attestation (enters "new" stage, not "known") - // TODO: validate attestations before processing - let _ = on_attestation(store, proposer_attestation, false) - .inspect(|_| metrics::inc_attestations_valid("gossip")) - .inspect_err(|err| { - warn!(%slot, %err, "Invalid proposer attestation in block"); - metrics::inc_attestations_invalid("block"); - }); - info!(%slot, %block_root, %state_root, "Processed new block"); Ok(()) } @@ -536,10 +684,10 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// before returning the canonical head. fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot - let slot_time = store.config().genesis_time + slot * SECONDS_PER_SLOT; + let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time, true); + on_tick(store, slot_time_ms, true, false); // Process any pending attestations before proposal accept_new_attestations(store); @@ -575,22 +723,22 @@ pub fn produce_block_with_signatures( }); } - // Convert AttestationData to Attestation objects for build_block - let available_attestations: Vec = store - .iter_known_attestations() + // Convert known aggregated payloads to Attestation objects for build_block + let known_attestations = extract_attestations_from_aggregated_payloads( + store, + store.iter_known_aggregated_payloads(), + ); + let available_attestations: Vec = known_attestations + .into_iter() .map(|(validator_id, data)| Attestation { validator_id, data }) .collect(); // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect signature data for block building - let gossip_signatures: HashMap = store - .iter_gossip_signatures() - .filter_map(|(key, stored)| stored.to_validator_signature().ok().map(|sig| (key, sig))) - .collect(); + // Collect existing proofs for block building from known aggregated payloads let aggregated_payloads: HashMap> = store - .iter_aggregated_payloads() + .iter_known_aggregated_payloads() .map(|(key, stored_payloads)| { let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); (key, proofs) @@ -605,7 +753,6 @@ pub fn produce_block_with_signatures( head_root, &available_attestations, &known_block_roots, - &gossip_signatures, &aggregated_payloads, )?; @@ -768,7 +915,6 @@ fn aggregate_attestations_by_data(attestations: &[Attestation]) -> Vec, - gossip_signatures: &HashMap, aggregated_payloads: &HashMap>, ) -> Result<(Block, State, Vec), StoreError> { // Start with empty attestation set @@ -786,7 +931,7 @@ fn build_block( let mut included_keys: HashSet = HashSet::new(); // Fixed-point loop: collect attestations until no new ones can be added - let post_state = loop { + let _post_state = loop { // Aggregate attestations by data for the candidate block let aggregated = aggregate_attestations_by_data(&included_attestations); let attestations: AggregatedAttestations = aggregated @@ -835,11 +980,8 @@ fn build_block( continue; } - // Only include if we have a signature for this attestation - let has_gossip_sig = gossip_signatures.contains_key(&sig_key); - let has_block_proof = aggregated_payloads.contains_key(&sig_key); - - if has_gossip_sig || has_block_proof { + // Only include if we have a proof for this attestation + if aggregated_payloads.contains_key(&sig_key) { new_attestations.push(attestation.clone()); included_keys.insert(sig_key); } @@ -854,13 +996,9 @@ fn build_block( included_attestations.extend(new_attestations); }; - // Compute the aggregated signatures for the attestations. - let (aggregated_attestations, aggregated_signatures) = compute_aggregated_signatures( - post_state.validators.iter().as_slice(), - &included_attestations, - gossip_signatures, - aggregated_payloads, - )?; + // Select existing proofs for the attestations to include in the block. + let (aggregated_attestations, aggregated_signatures) = + select_aggregated_proofs(&included_attestations, aggregated_payloads)?; let attestations: AggregatedAttestations = aggregated_attestations .try_into() @@ -884,15 +1022,15 @@ fn build_block( Ok((final_block, post_state, aggregated_signatures)) } -/// Compute aggregated signatures for a set of attestations. +/// Select existing aggregated proofs for attestations to include in a block. +/// +/// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). +/// This function only selects from existing proofs in the `LatestKnownAggregatedPayloads` table +/// (proofs from previously received blocks and promoted gossip aggregations). /// -/// The result is a list of (attestation, proof) pairs ready for block inclusion. -/// This list might contain attestations with the same data but different signatures. -/// Once we support recursive aggregation, we can aggregate these further. -fn compute_aggregated_signatures( - validators: &[Validator], +/// Returns a list of (attestation, proof) pairs ready for block inclusion. +fn select_aggregated_proofs( attestations: &[Attestation], - gossip_signatures: &HashMap, aggregated_payloads: &HashMap>, ) -> Result<(Vec, Vec), StoreError> { let mut results = vec![]; @@ -902,51 +1040,14 @@ fn compute_aggregated_signatures( let message = data.tree_hash_root(); let validator_ids = aggregation_bits_to_validator_indices(&aggregated.aggregation_bits); + let mut remaining: HashSet = validator_ids.into_iter().collect(); - // Phase 1: Gossip Collection - // Try to aggregate fresh signatures from the network. - let mut gossip_sigs = vec![]; - let mut gossip_keys = vec![]; - let mut gossip_ids = vec![]; - - let mut remaining = HashSet::new(); - - for vid in &validator_ids { - let key = (*vid, message); - if let Some(sig) = gossip_signatures.get(&key) { - let pubkey = validators[*vid as usize] - .get_pubkey() - .expect("valid pubkey"); - - gossip_sigs.push(sig.clone()); - gossip_keys.push(pubkey); - gossip_ids.push(*vid); - } else { - remaining.insert(*vid); - } - } - - if !gossip_ids.is_empty() { - let participants = aggregation_bits_from_validator_indices(&gossip_ids); - let proof_data = - aggregate_signatures(gossip_keys, gossip_sigs, &message, data.slot as u32) - .map_err(StoreError::SignatureAggregationFailed)?; - let aggregated_attestation = AggregatedAttestation { - aggregation_bits: participants.clone(), - data: data.clone(), + // Select existing proofs that cover the most remaining validators + while !remaining.is_empty() { + let Some(&target_id) = remaining.iter().next() else { + break; }; - let aggregate_proof = AggregatedSignatureProof::new(participants, proof_data); - results.push((aggregated_attestation, aggregate_proof)); - - metrics::inc_pq_sig_aggregated_signatures(); - metrics::inc_pq_sig_attestations_in_aggregated_signatures(gossip_ids.len() as u64); - } - // Phase 2: Fallback to existing proofs - // We might have seen proofs for missing signatures in previously-received blocks. - while !aggregated_payloads.is_empty() - && let Some(&target_id) = remaining.iter().next() - { let Some(candidates) = aggregated_payloads .get(&(target_id, message)) .filter(|v| !v.is_empty()) @@ -966,10 +1067,11 @@ fn compute_aggregated_signatures( .max_by_key(|(_, covered)| covered.len()) .expect("candidates is not empty"); - // Sanity check: ensure proof covers at least one remaining validator + // No proof covers any remaining validator if covered.is_empty() { break; } + let aggregate = AggregatedAttestation { aggregation_bits: proof.participants.clone(), data: data.clone(), diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e1c69e1..69909f6 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -5,14 +5,35 @@ use std::{ }; use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; -use ethlambda_storage::{Store, backend::InMemoryBackend}; +use ethlambda_storage::{SignatureKey, Store, StoredAggregatedPayload, backend::InMemoryBackend}; use ethlambda_types::{ - attestation::Attestation, + attestation::{Attestation, AttestationData}, block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, primitives::{H256, VariableList, ssz::TreeHash}, state::State, }; +/// Extract per-validator attestation data from aggregated payloads. +/// Test helper that mirrors the private function in blockchain::store. +fn extract_attestations( + store: &Store, + payloads: impl Iterator)>, +) -> HashMap { + let mut result: HashMap = HashMap::new(); + for ((validator_id, data_root), _) in payloads { + let Some(data) = store.get_attestation_data_by_root(&data_root) else { + continue; + }; + let should_update = result + .get(&validator_id) + .is_none_or(|existing| existing.slot < data.slot); + if should_update { + result.insert(validator_id, data); + } + } + result +} + use crate::types::{ForkChoiceTestVector, StoreChecks}; const SUPPORTED_FIXTURE_FORMAT: &str = "fork_choice_test"; @@ -57,11 +78,11 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block = build_signed_block(block_data); - let block_time = - signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; + let block_time_ms = + (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time, true); + store::on_tick(&mut store, block_time_ms, true, false); let result = store::on_block(&mut store, signed_block); match (result.is_ok(), step.valid) { @@ -84,9 +105,9 @@ fn run(path: &Path) -> datatest_stable::Result<()> { } } "tick" => { - let timestamp = step.time.expect("tick step missing time"); + let timestamp_ms = step.time.expect("tick step missing time") * 1000; // NOTE: the has_proposal argument is set to false, following the spec - store::on_tick(&mut store, timestamp, false); + store::on_tick(&mut store, timestamp_ms, false, false); } other => { // Fail for unsupported step types for now @@ -279,14 +300,12 @@ fn validate_attestation_check( check: &types::AttestationCheck, step_idx: usize, ) -> datatest_stable::Result<()> { - use ethlambda_types::attestation::AttestationData; - let validator_id = check.validator; let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => st.iter_new_attestations().collect(), - "known" => st.iter_known_attestations().collect(), + "new" => extract_attestations(st, st.iter_new_aggregated_payloads()), + "known" => extract_attestations(st, st.iter_known_aggregated_payloads()), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -366,7 +385,8 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap = st.iter_known_attestations().collect(); + let known_attestations: HashMap = + extract_attestations(st, st.iter_known_aggregated_payloads()); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index 40ec7aa..0c72bfe 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -49,8 +49,9 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let signed_block: SignedBlockWithAttestation = test.signed_block_with_attestation.into(); // Advance time to the block's slot - let block_time = signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time; - store::on_tick(&mut st, block_time, true); + let block_time_ms = + (signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000; + store::on_tick(&mut st, block_time_ms, true, false); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 309901a..6390a2b 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,4 +1,5 @@ use crate::{ + block::AggregatedSignatureProof, primitives::ssz::{Decode, Encode, TreeHash}, signature::SignatureSize, state::{Checkpoint, ValidatorRegistryLimit}, @@ -61,3 +62,10 @@ pub struct AggregatedAttestation { /// A general-purpose bitfield for tracking which validators have participated /// in some collective action (attestation, signature aggregation, etc.). pub type AggregationBits = ssz_types::BitList; + +/// Aggregated attestation with its signature proof, used for gossip on the aggregation topic. +#[derive(Debug, Clone, Encode, Decode)] +pub struct SignedAggregatedAttestation { + pub data: AttestationData, + pub proof: AggregatedSignatureProof, +} diff --git a/crates/common/types/src/block.rs b/crates/common/types/src/block.rs index 776dbb1..facdc45 100644 --- a/crates/common/types/src/block.rs +++ b/crates/common/types/src/block.rs @@ -112,6 +112,13 @@ impl AggregatedSignatureProof { pub fn proof_data(&self) -> &ByteListMiB { &self.proof_data } + + /// Returns the validator indices that are set in the participants bitfield. + pub fn participant_indices(&self) -> impl Iterator + '_ { + (0..self.participants.len()) + .filter(|&i| self.participants.get(i).unwrap_or(false)) + .map(|i| i as u64) + } } /// Bitlist representing validator participation in an attestation or signature. diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 9335270..a52b65f 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -1,6 +1,6 @@ use ethlambda_types::{ ShortRoot, - attestation::SignedAttestation, + attestation::{SignedAggregatedAttestation, SignedAttestation}, block::SignedBlockWithAttestation, primitives::ssz::{Decode, Encode, TreeHash}, }; @@ -9,7 +9,7 @@ use tracing::{error, info, trace}; use super::{ encoding::{compress_message, decompress_message}, - messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}, + messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}, }; use crate::P2PServer; @@ -22,7 +22,8 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { else { unreachable!("we already matched on event_loop"); }; - match message.topic.as_str().split("/").nth(3) { + let topic_kind = message.topic.as_str().split("/").nth(3); + match topic_kind { Some(BLOCK_TOPIC_KIND) => { let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) @@ -50,7 +51,33 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { ); server.blockchain.notify_new_block(signed_block).await; } - Some(ATTESTATION_TOPIC_KIND) => { + Some(AGGREGATION_TOPIC_KIND) => { + let Ok(uncompressed_data) = decompress_message(&message.data) + .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) + else { + return; + }; + + let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) + .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) + else { + return; + }; + let slot = aggregation.data.slot; + info!( + %slot, + target_slot = aggregation.data.target.slot, + target_root = %ShortRoot(&aggregation.data.target.root.0), + source_slot = aggregation.data.source.slot, + source_root = %ShortRoot(&aggregation.data.source.root.0), + "Received aggregated attestation from gossip" + ); + server + .blockchain + .notify_new_aggregated_attestation(aggregation) + .await; + } + Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { @@ -95,7 +122,7 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); - // Publish to gossipsub + // Publish to the attestation subnet topic let _ = server .swarm .behaviour_mut() @@ -148,3 +175,36 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlockWith |err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"), ); } + +pub async fn publish_aggregated_attestation( + server: &mut P2PServer, + attestation: SignedAggregatedAttestation, +) { + let slot = attestation.data.slot; + + // Encode to SSZ + let ssz_bytes = attestation.as_ssz_bytes(); + + // Compress with raw snappy + let compressed = compress_message(&ssz_bytes); + + // Publish to the aggregation topic + let _ = server + .swarm + .behaviour_mut() + .gossipsub + .publish(server.aggregation_topic.clone(), compressed) + .inspect(|_| { + info!( + %slot, + target_slot = attestation.data.target.slot, + target_root = %ShortRoot(&attestation.data.target.root.0), + source_slot = attestation.data.source.slot, + source_root = %ShortRoot(&attestation.data.source.root.0), + "Published aggregated attestation to gossipsub" + ) + }) + .inspect_err(|err| { + tracing::warn!(%slot, %err, "Failed to publish aggregated attestation to gossipsub") + }); +} diff --git a/crates/net/p2p/src/gossipsub/messages.rs b/crates/net/p2p/src/gossipsub/messages.rs index 7df4db1..2b31eb6 100644 --- a/crates/net/p2p/src/gossipsub/messages.rs +++ b/crates/net/p2p/src/gossipsub/messages.rs @@ -1,4 +1,10 @@ /// Topic kind for block gossip pub const BLOCK_TOPIC_KIND: &str = "block"; -/// Topic kind for attestation gossip -pub const ATTESTATION_TOPIC_KIND: &str = "attestation"; +/// Topic kind prefix for per-committee attestation subnets. +/// +/// Full topic format: `/leanconsensus/{network}/attestation_{subnet_id}/ssz_snappy` +pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation"; +/// Topic kind for aggregated attestation gossip. +/// +/// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy` +pub const AGGREGATION_TOPIC_KIND: &str = "aggregation"; diff --git a/crates/net/p2p/src/gossipsub/mod.rs b/crates/net/p2p/src/gossipsub/mod.rs index a66855e..a692428 100644 --- a/crates/net/p2p/src/gossipsub/mod.rs +++ b/crates/net/p2p/src/gossipsub/mod.rs @@ -2,5 +2,7 @@ mod encoding; mod handler; mod messages; -pub use handler::{handle_gossipsub_message, publish_attestation, publish_block}; -pub use messages::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}; +pub use handler::{ + handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block, +}; +pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND}; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index b83196f..130fdfa 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use ethlambda_blockchain::{BlockChain, P2PMessage}; +use ethlambda_blockchain::{ATTESTATION_COMMITTEE_COUNT, BlockChain, P2PMessage}; use ethlambda_storage::Store; use ethlambda_types::primitives::H256; use ethrex_common::H264; @@ -24,7 +24,10 @@ use tokio::sync::mpsc; use tracing::{info, trace, warn}; use crate::{ - gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND, publish_attestation, publish_block}, + gossipsub::{ + AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND, + publish_aggregated_attestation, publish_attestation, publish_block, + }, req_resp::{ BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer, @@ -60,6 +63,7 @@ pub async fn start_p2p( blockchain: BlockChain, p2p_rx: mpsc::UnboundedReceiver, store: Store, + validator_id: Option, ) { let config = libp2p::gossipsub::ConfigBuilder::default() // d @@ -151,21 +155,44 @@ pub async fn start_p2p( .listen_on(addr) .expect("failed to bind gossipsub listening address"); - let network = "devnet0"; - let topic_kinds = [BLOCK_TOPIC_KIND, ATTESTATION_TOPIC_KIND]; - for topic_kind in topic_kinds { - let topic_str = format!("/leanconsensus/{network}/{topic_kind}/ssz_snappy"); - let topic = libp2p::gossipsub::IdentTopic::new(topic_str); - swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); - } + let network = "devnet3"; - // Create topics for outbound messages - let attestation_topic = libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{ATTESTATION_TOPIC_KIND}/ssz_snappy" - )); - let block_topic = libp2p::gossipsub::IdentTopic::new(format!( - "/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy" - )); + // Subscribe to block topic (all nodes) + let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"); + let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&block_topic) + .unwrap(); + + // Subscribe to aggregation topic (all validators) + let aggregation_topic_str = + format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy"); + let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&aggregation_topic) + .unwrap(); + + // Subscribe to attestation subnet topic (validators subscribe to their committee's subnet) + // ATTESTATION_COMMITTEE_COUNT is 1 for devnet-3 but will increase in future devnets. + #[allow(clippy::modulo_one)] + let subnet_id = validator_id.map(|vid| vid % ATTESTATION_COMMITTEE_COUNT); + let attestation_topic_kind = match subnet_id { + Some(id) => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{id}"), + // Non-validators subscribe to subnet 0 to receive attestations + None => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_0"), + }; + let attestation_topic_str = + format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); + let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); + swarm + .behaviour_mut() + .gossipsub + .subscribe(&attestation_topic) + .unwrap(); info!(socket=%listening_socket, "P2P node started"); @@ -178,6 +205,7 @@ pub async fn start_p2p( p2p_rx, attestation_topic, block_topic, + aggregation_topic, connected_peers: HashSet::new(), pending_requests: HashMap::new(), request_id_map: HashMap::new(), @@ -203,6 +231,7 @@ pub(crate) struct P2PServer { pub(crate) p2p_rx: mpsc::UnboundedReceiver, pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic, pub(crate) block_topic: libp2p::gossipsub::IdentTopic, + pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, @@ -371,6 +400,9 @@ async fn handle_p2p_message(server: &mut P2PServer, message: P2PMessage) { P2PMessage::PublishBlock(signed_block) => { publish_block(server, signed_block).await; } + P2PMessage::PublishAggregatedAttestation(attestation) => { + publish_aggregated_attestation(server, attestation).await; + } P2PMessage::FetchBlock(root) => { // Deduplicate - if already pending, ignore if server.pending_requests.contains_key(&root) { diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 9f24f99..00224d7 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -17,7 +17,7 @@ static NODE_NAME_REGISTRY: LazyLock>> = pub fn populate_name_registry(names_and_privkeys: HashMap) { let mut registry = NODE_NAME_REGISTRY.write().unwrap(); - let name_registry = names_and_privkeys + *registry = names_and_privkeys .into_iter() .filter_map(|(name, mut privkey)| { let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey) else { @@ -31,7 +31,6 @@ pub fn populate_name_registry(names_and_privkeys: HashMap) { Some((peer_id, &*name.leak())) }) .collect(); - *registry = name_registry; } fn resolve(peer_id: &Option) -> &'static str { diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index a1b0021..ead5586 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -7,12 +7,13 @@ use tracing::{debug, trace}; use super::{ encoding::{decode_payload, write_payload}, messages::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, ErrorMessage, Request, Response, - ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, + BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload, + STATUS_PROTOCOL_V1, Status, }, }; use ethlambda_types::block::SignedBlockWithAttestation; +use ethlambda_types::primitives::ssz::Decode as SszDecode; #[derive(Debug, Clone, Default)] pub struct Codec; @@ -41,10 +42,9 @@ impl libp2p::request_response::Codec for Codec { Ok(Request::Status(status)) } BLOCKS_BY_ROOT_PROTOCOL_V1 => { - let request = - BlocksByRootRequest::from_ssz_bytes_compat(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; + let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; Ok(Request::BlocksByRoot(request)) } _ => Err(io::Error::new( diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index 3826f5b..607c918 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -2,7 +2,7 @@ use ethlambda_types::{ block::SignedBlockWithAttestation, primitives::{ H256, - ssz::{Decode, Decode as SszDecode, Encode}, + ssz::{Decode, Encode}, }, state::Checkpoint, }; @@ -139,55 +139,3 @@ pub fn error_message(msg: impl AsRef) -> ErrorMessage { pub struct BlocksByRootRequest { pub roots: RequestedBlockRoots, } - -impl BlocksByRootRequest { - /// Decode from SSZ bytes with backward compatibility. - /// - /// Tries to decode as new format (container with `roots` field) first. - /// Falls back to old format (transparent - direct list of roots) if that fails. - pub fn from_ssz_bytes_compat(bytes: &[u8]) -> Result { - // Try new format (container) first - SszDecode::from_ssz_bytes(bytes).or_else(|_| { - // Fall back to old format (transparent/direct list) - SszDecode::from_ssz_bytes(bytes).map(|roots| Self { roots }) - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use ssz::Encode as SszEncode; - - #[test] - fn test_blocks_by_root_backward_compatibility() { - // Create some test roots - let root1 = H256::from_slice(&[1u8; 32]); - let root2 = H256::from_slice(&[2u8; 32]); - let roots_list = - RequestedBlockRoots::new(vec![root1, root2]).expect("Failed to create roots list"); - - // Encode as old format (direct list, similar to transparent) - let old_format_bytes = roots_list.as_ssz_bytes(); - - // Encode as new format (container) - let new_request = BlocksByRootRequest { - roots: roots_list.clone(), - }; - let new_format_bytes = new_request.as_ssz_bytes(); - - // Both formats should decode successfully - let decoded_from_old = BlocksByRootRequest::from_ssz_bytes_compat(&old_format_bytes) - .expect("Failed to decode old format"); - let decoded_from_new = BlocksByRootRequest::from_ssz_bytes_compat(&new_format_bytes) - .expect("Failed to decode new format"); - - // Both should have the same roots - assert_eq!(decoded_from_old.roots.len(), 2); - assert_eq!(decoded_from_new.roots.len(), 2); - assert_eq!(decoded_from_old.roots[0], root1); - assert_eq!(decoded_from_old.roots[1], root2); - assert_eq!(decoded_from_new.roots[0], root1); - assert_eq!(decoded_from_new.roots[1], root2); - } -} diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index deecdad..7b184d5 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -12,14 +12,16 @@ pub enum Table { BlockSignatures, /// State storage: H256 -> State States, - /// Known attestations: u64 -> AttestationData - LatestKnownAttestations, - /// Pending attestations: u64 -> AttestationData - LatestNewAttestations, /// Gossip signatures: SignatureKey -> ValidatorSignature GossipSignatures, - /// Aggregated proofs: SignatureKey -> Vec - AggregatedPayloads, + /// Attestation data indexed by tree hash root: H256 -> AttestationData + AttestationDataByRoot, + /// Pending aggregated payloads (not yet active in fork choice): + /// SignatureKey -> Vec + LatestNewAggregatedPayloads, + /// Active aggregated payloads (counted in fork choice): + /// SignatureKey -> Vec + LatestKnownAggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -36,10 +38,10 @@ pub const ALL_TABLES: [Table; 10] = [ Table::BlockBodies, Table::BlockSignatures, Table::States, - Table::LatestKnownAttestations, - Table::LatestNewAttestations, Table::GossipSignatures, - Table::AggregatedPayloads, + Table::AttestationDataByRoot, + Table::LatestNewAggregatedPayloads, + Table::LatestKnownAggregatedPayloads, Table::Metadata, Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index c25b128..45565f1 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -16,10 +16,10 @@ fn cf_name(table: Table) -> &'static str { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::LatestKnownAttestations => "latest_known_attestations", - Table::LatestNewAttestations => "latest_new_attestations", Table::GossipSignatures => "gossip_signatures", - Table::AggregatedPayloads => "aggregated_payloads", + Table::AttestationDataByRoot => "attestation_data_by_root", + Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", + Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b6ef9ca..06d6d4f 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -13,8 +13,8 @@ use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ attestation::AttestationData, block::{ - AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, - BlockWithAttestation, SignedBlockWithAttestation, + Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, + SignedBlockWithAttestation, }, primitives::{ H256, @@ -288,7 +288,11 @@ impl Store { // ============ Time ============ - /// Returns the current store time in seconds since genesis. + /// Returns the current store time in interval counts since genesis. + /// + /// Each increment represents one 800ms interval. Derive slot/interval as: + /// slot = time() / INTERVALS_PER_SLOT + /// interval = time() % INTERVALS_PER_SLOT pub fn time(&self) -> u64 { self.get_metadata(KEY_TIME) } @@ -369,13 +373,18 @@ impl Store { { self.prune_live_chain(finalized.slot); - // Prune signatures and payloads for finalized slots + // Prune signatures, payloads, and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); - let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); - if pruned_sigs > 0 || pruned_payloads > 0 { + let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); + self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); + self.prune_aggregated_payload_table( + Table::LatestKnownAggregatedPayloads, + finalized.slot, + ); + if pruned_sigs > 0 || pruned_att_data > 0 { info!( finalized_slot = finalized.slot, - pruned_sigs, pruned_payloads, "Pruned finalized signatures" + pruned_sigs, pruned_att_data, "Pruned finalized signatures" ); } } @@ -478,24 +487,51 @@ impl Store { count } - /// Prune aggregated payloads for slots <= finalized_slot. + /// Prune attestation data by root for slots <= finalized_slot. /// - /// Returns the number of payloads pruned. - pub fn prune_aggregated_payloads(&mut self, finalized_slot: u64) -> usize { + /// Returns the number of entries pruned. + pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::AttestationDataByRoot, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Ok(data) = AttestationData::from_ssz_bytes(&value_bytes) + && data.slot <= finalized_slot + { + to_delete.push(key_bytes.to_vec()); + } + } + drop(view); + + let count = to_delete.len(); + if !to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::AttestationDataByRoot, to_delete) + .expect("delete"); + batch.commit().expect("commit"); + } + count + } + + /// Prune an aggregated payload table (new or known) for slots <= finalized_slot. + fn prune_aggregated_payload_table(&mut self, table: Table, finalized_slot: u64) { let view = self.backend.begin_read().expect("read view"); let mut updates = vec![]; let mut deletes = vec![]; - let mut removed_count = 0; for (key_bytes, value_bytes) in view - .prefix_iterator(Table::AggregatedPayloads, &[]) + .prefix_iterator(table, &[]) .expect("iter") .filter_map(|r| r.ok()) { if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { let original_len = payloads.len(); payloads.retain(|p| p.slot > finalized_slot); - removed_count += original_len - payloads.len(); if payloads.is_empty() { deletes.push(key_bytes.to_vec()); @@ -509,18 +545,13 @@ impl Store { if !updates.is_empty() || !deletes.is_empty() { let mut batch = self.backend.begin_write().expect("write batch"); if !updates.is_empty() { - batch - .put_batch(Table::AggregatedPayloads, updates) - .expect("put"); + batch.put_batch(table, updates).expect("put"); } if !deletes.is_empty() { - batch - .delete_batch(Table::AggregatedPayloads, deletes) - .expect("delete"); + batch.delete_batch(table, deletes).expect("delete"); } batch.commit().expect("commit"); } - removed_count } /// Get the block header by root. @@ -642,124 +673,185 @@ impl Store { batch.commit().expect("commit"); } - // ============ Known Attestations ============ + // ============ Attestation Data By Root ============ // - // "Known" attestations are included in fork choice weight calculations. - // They're promoted from "new" attestations at specific intervals. + // Content-addressed attestation data storage. Used to reconstruct + // per-validator attestation maps from aggregated payloads. - /// Iterates over all known attestations (validator_id, attestation_data). - pub fn iter_known_attestations(&self) -> impl Iterator + '_ { + /// Stores attestation data indexed by its tree hash root. + pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { + let mut batch = self.backend.begin_write().expect("write batch"); + let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; + batch + .put_batch(Table::AttestationDataByRoot, entries) + .expect("put attestation data"); + batch.commit().expect("commit"); + } + + /// Returns attestation data for the given root hash. + pub fn get_attestation_data_by_root(&self, root: &H256) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.get(Table::AttestationDataByRoot, &root.as_ssz_bytes()) + .expect("get") + .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + } + + // ============ Known Aggregated Payloads ============ + // + // "Known" aggregated payloads are active in fork choice weight calculations. + // Promoted from "new" payloads at specific intervals (0 with proposal, 4). + + /// Iterates over all known aggregated payloads. + pub fn iter_known_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view - .prefix_iterator(Table::LatestKnownAttestations, &[]) + .prefix_iterator(Table::LatestKnownAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); - let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); - (validator_id, data) + let key = decode_signature_key(&k); + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) }) .collect(); entries.into_iter() } - /// Returns a validator's latest known attestation. - pub fn get_known_attestation(&self, validator_id: &u64) -> Option { + /// Insert an aggregated payload into the known (fork-choice-active) table. + pub fn insert_known_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + let encoded_key = encode_signature_key(&key); let view = self.backend.begin_read().expect("read view"); - view.get(Table::LatestKnownAttestations, &validator_id.as_ssz_bytes()) + let mut payloads: Vec = view + .get(Table::LatestKnownAggregatedPayloads, &encoded_key) .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) - } + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + drop(view); + + payloads.push(payload); - /// Stores a validator's latest known attestation. - pub fn insert_known_attestation(&mut self, validator_id: u64, data: AttestationData) { let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())]; + let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; batch - .put_batch(Table::LatestKnownAttestations, entries) - .expect("put attestation"); + .put_batch(Table::LatestKnownAggregatedPayloads, entries) + .expect("put known aggregated payload"); batch.commit().expect("commit"); } - // ============ New Attestations ============ + // ============ New Aggregated Payloads ============ // - // "New" attestations are pending attestations not yet included in fork choice. - // They're promoted to "known" via `promote_new_attestations`. + // "New" aggregated payloads are pending — not yet counted in fork choice. + // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) attestations. - pub fn iter_new_attestations(&self) -> impl Iterator + '_ { + /// Iterates over all new (pending) aggregated payloads. + pub fn iter_new_aggregated_payloads( + &self, + ) -> impl Iterator)> + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view - .prefix_iterator(Table::LatestNewAttestations, &[]) + .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let validator_id = u64::from_ssz_bytes(&k).expect("valid validator_id"); - let data = AttestationData::from_ssz_bytes(&v).expect("valid attestation data"); - (validator_id, data) + let key = decode_signature_key(&k); + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) }) .collect(); entries.into_iter() } - /// Returns a validator's latest new (pending) attestation. - pub fn get_new_attestation(&self, validator_id: &u64) -> Option { + /// Insert an aggregated payload into the new (pending) table. + pub fn insert_new_aggregated_payload( + &mut self, + key: SignatureKey, + payload: StoredAggregatedPayload, + ) { + let encoded_key = encode_signature_key(&key); let view = self.backend.begin_read().expect("read view"); - view.get(Table::LatestNewAttestations, &validator_id.as_ssz_bytes()) + let mut payloads: Vec = view + .get(Table::LatestNewAggregatedPayloads, &encoded_key) .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) - } + .map(|bytes| Vec::::from_ssz_bytes(&bytes).expect("valid")) + .unwrap_or_default(); + drop(view); - /// Stores a validator's new (pending) attestation. - pub fn insert_new_attestation(&mut self, validator_id: u64, data: AttestationData) { - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(validator_id.as_ssz_bytes(), data.as_ssz_bytes())]; - batch - .put_batch(Table::LatestNewAttestations, entries) - .expect("put attestation"); - batch.commit().expect("commit"); - } + payloads.push(payload); - /// Removes a validator's new (pending) attestation. - pub fn remove_new_attestation(&mut self, validator_id: &u64) { let mut batch = self.backend.begin_write().expect("write batch"); + let entries = vec![(encoded_key, payloads.as_ssz_bytes())]; batch - .delete_batch( - Table::LatestNewAttestations, - vec![validator_id.as_ssz_bytes()], - ) - .expect("delete attestation"); + .put_batch(Table::LatestNewAggregatedPayloads, entries) + .expect("put new aggregated payload"); batch.commit().expect("commit"); } - /// Promotes all new attestations to known attestations. + /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Takes all attestations from `latest_new_attestations` and moves them - /// to `latest_known_attestations`, making them count for fork choice. - pub fn promote_new_attestations(&mut self) { - // Read all new attestations + /// Merges entries from `LatestNewAggregatedPayloads` into `LatestKnownAggregatedPayloads`, + /// appending to existing payload lists rather than overwriting them. + pub fn promote_new_aggregated_payloads(&mut self) { let view = self.backend.begin_read().expect("read view"); - let new_attestations: Vec<(Vec, Vec)> = view - .prefix_iterator(Table::LatestNewAttestations, &[]) + let new_entries: Vec<(Vec, Vec)> = view + .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| (k.to_vec(), v.to_vec())) .collect(); - drop(view); - if new_attestations.is_empty() { + if new_entries.is_empty() { + drop(view); return; } - // Delete from new and insert to known in a single batch + // Merge new payloads with existing known payloads + let merged: Vec<(Vec, Vec)> = new_entries + .iter() + .map(|(key, new_bytes)| { + let new_payloads = + Vec::::from_ssz_bytes(new_bytes).expect("valid"); + let mut known_payloads: Vec = view + .get(Table::LatestKnownAggregatedPayloads, key) + .expect("get") + .map(|bytes| { + Vec::::from_ssz_bytes(&bytes).expect("valid") + }) + .unwrap_or_default(); + known_payloads.extend(new_payloads); + (key.clone(), known_payloads.as_ssz_bytes()) + }) + .collect(); + drop(view); + + let keys_to_delete: Vec<_> = new_entries.into_iter().map(|(k, _)| k).collect(); let mut batch = self.backend.begin_write().expect("write batch"); - let keys_to_delete: Vec<_> = new_attestations.iter().map(|(k, _)| k.clone()).collect(); batch - .delete_batch(Table::LatestNewAttestations, keys_to_delete) - .expect("delete new attestations"); + .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete) + .expect("delete new aggregated payloads"); + batch + .put_batch(Table::LatestKnownAggregatedPayloads, merged) + .expect("put known aggregated payloads"); + batch.commit().expect("commit"); + } + + /// Delete specific gossip signatures by key. + pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { + if keys.is_empty() { + return; + } + let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); + let mut batch = self.backend.begin_write().expect("write batch"); batch - .put_batch(Table::LatestKnownAttestations, new_attestations) - .expect("put known attestations"); + .delete_batch(Table::GossipSignatures, encoded_keys) + .expect("delete gossip signatures"); batch.commit().expect("commit"); } @@ -807,76 +899,6 @@ impl Store { batch.commit().expect("commit"); } - // ============ Aggregated Payloads ============ - // - // Aggregated payloads are leanVM proofs combining multiple signatures. - // Used to verify block signatures efficiently. - - /// Iterates over all aggregated signature payloads. - pub fn iter_aggregated_payloads( - &self, - ) -> impl Iterator)> + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::AggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() - } - - /// Returns aggregated payloads for a signature key. - fn get_aggregated_payloads(&self, key: &SignatureKey) -> Option> { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::AggregatedPayloads, &encode_signature_key(key)) - .expect("get") - .map(|bytes| { - Vec::::from_ssz_bytes(&bytes).expect("valid payloads") - }) - } - - /// Insert an aggregated signature proof for a validator's attestation. - /// - /// Multiple proofs can be stored for the same (validator, attestation_data) pair, - /// each with its own slot metadata for pruning. - /// - /// # Thread Safety - /// - /// This method uses a read-modify-write pattern that is NOT atomic: - /// 1. Read existing payloads - /// 2. Append new payload - /// 3. Write back - /// - /// Concurrent calls could result in lost updates. This method MUST be called - /// from a single thread. In our case, that thread is the `BlockChain` `GenServer` - pub fn insert_aggregated_payload( - &mut self, - attestation_data: &AttestationData, - validator_id: u64, - proof: AggregatedSignatureProof, - ) { - let slot = attestation_data.slot; - let data_root = attestation_data.tree_hash_root(); - let key = (validator_id, data_root); - - // Read existing, add new, write back (NOT atomic - requires single-threaded access) - let mut payloads = self.get_aggregated_payloads(&key).unwrap_or_default(); - payloads.push(StoredAggregatedPayload { slot, proof }); - - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encode_signature_key(&key), payloads.as_ssz_bytes())]; - batch - .put_batch(Table::AggregatedPayloads, entries) - .expect("put proofs"); - batch.commit().expect("commit"); - } - // ============ Derived Accessors ============ /// Returns the slot of the current safe target block.