Skip to content
7 changes: 7 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ cargo test -p ethlambda-blockchain --features skip-signature-verification --test

## Common Gotchas

### Aggregator Flag Required for Finalization
- At least one node **must** be started with `--is-aggregator` to finalize blocks in production (without `skip-signature-verification`)
- Without this flag, attestations pass signature verification and are logged as "Attestation processed", but the signature is never stored for aggregation (`store.rs:368`), so blocks are always built with `attestation_count=0`
- The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks
- With `skip-signature-verification` (tests only), attestations bypass aggregation and go directly to `new_aggregated_payloads`, so the flag is not needed
- **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip

### Signature Verification
- Tests require `skip-signature-verification` feature for performance
- Crypto tests marked `#[ignore]` (slow leanVM operations)
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ docker-build: ## 🐳 Build the Docker image
-t ghcr.io/lambdaclass/ethlambda:$(DOCKER_TAG) .
@echo

LEAN_SPEC_COMMIT_HASH:=4edcf7bc9271e6a70ded8aff17710d68beac4266
LEAN_SPEC_COMMIT_HASH:=b39472e73f8a7d603cc13d14426eed14c6eff6f1

leanSpec:
git clone https://github.com/leanEthereum/leanSpec.git --single-branch
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ make run-devnet
This generates fresh genesis files and starts all configured clients with metrics enabled.
Press `Ctrl+C` to stop all nodes.

> **Important:** When running nodes manually (outside `make run-devnet`), at least one node must be started with `--is-aggregator` for attestations to be aggregated and included in blocks. Without this flag, the network will produce blocks but never finalize.

For custom devnet configurations, go to `lean-quickstart/local-devnet/genesis/validator-config.yaml` and edit the file before running the command above. See `lean-quickstart`'s documentation for more details on how to configure the devnet.

## Philosophy
Expand Down
9 changes: 8 additions & 1 deletion bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ struct CliOptions {
/// When set, skips genesis initialization and syncs from checkpoint.
#[arg(long)]
checkpoint_sync_url: Option<String>,
/// Whether this node acts as a committee aggregator
#[arg(long, default_value = "false")]
is_aggregator: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -114,7 +117,10 @@ async fn main() -> eyre::Result<()> {
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;

let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel();
let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys);
// Use first validator ID for subnet subscription
let first_validator_id = validator_keys.keys().min().copied();
let blockchain =
BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator);

let p2p_handle = tokio::spawn(start_p2p(
node_p2p_key,
Expand All @@ -123,6 +129,7 @@ async fn main() -> eyre::Result<()> {
blockchain,
p2p_rx,
store.clone(),
first_validator_id,
));

ethlambda_rpc::start_rpc_server(metrics_socket, store)
Expand Down
76 changes: 60 additions & 16 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethlambda_state_transition::is_proposer;
use ethlambda_storage::Store;
use ethlambda_types::{
ShortRoot,
attestation::{Attestation, AttestationData, SignedAttestation},
attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation},
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
primitives::{H256, ssz::TreeHash},
signature::ValidatorSecretKey,
Expand All @@ -30,6 +30,8 @@ pub enum P2PMessage {
PublishAttestation(SignedAttestation),
/// Publish a block to the gossip network.
PublishBlock(SignedBlockWithAttestation),
/// Publish an aggregated attestation to the gossip network.
PublishAggregatedAttestation(SignedAggregatedAttestation),
/// Fetch a block by its root hash.
FetchBlock(H256),
}
Expand All @@ -38,14 +40,23 @@ pub struct BlockChain {
handle: GenServerHandle<BlockChainServer>,
}

/// Seconds in a slot. Each slot has 4 intervals of 1 second each.
/// Seconds in a slot.
pub const SECONDS_PER_SLOT: u64 = 4;
/// Milliseconds in a slot.
pub const MILLISECONDS_PER_SLOT: u64 = 4_000;
/// Milliseconds per interval (800ms ticks).
pub const MILLISECONDS_PER_INTERVAL: u64 = 800;
/// Number of intervals per slot (5 intervals of 800ms = 4 seconds).
pub const INTERVALS_PER_SLOT: u64 = 5;
/// Number of attestation committees per slot.
pub const ATTESTATION_COMMITTEE_COUNT: u64 = 1;

impl BlockChain {
pub fn spawn(
store: Store,
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
validator_keys: HashMap<u64, ValidatorSecretKey>,
is_aggregator: bool,
) -> BlockChain {
let genesis_time = store.config().genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
Expand All @@ -54,6 +65,7 @@ impl BlockChain {
p2p_tx,
key_manager,
pending_blocks: HashMap::new(),
is_aggregator,
pending_block_parents: HashMap::new(),
}
.start();
Expand Down Expand Up @@ -85,6 +97,20 @@ impl BlockChain {
.await
.inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation"));
}

/// Sends an aggregated attestation to the BlockChain for processing.
pub async fn notify_new_aggregated_attestation(
&mut self,
attestation: SignedAggregatedAttestation,
) {
let _ = self
.handle
.cast(CastMessage::NewAggregatedAttestation(attestation))
.await
.inspect_err(
|err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"),
);
}
}

/// GenServer that sequences all blockchain updates.
Expand All @@ -104,16 +130,19 @@ struct BlockChainServer {
// chain at lookup time, since a cached ancestor may itself have become pending with
// a deeper missing parent after the entry was created.
pending_block_parents: HashMap<H256, H256>,

/// Whether this node acts as a committee aggregator.
is_aggregator: bool,
}

impl BlockChainServer {
fn on_tick(&mut self, timestamp: u64) {
let genesis_time = self.store.config().genesis_time;
fn on_tick(&mut self, timestamp_ms: u64) {
let genesis_time_ms = self.store.config().genesis_time * 1000;

// Calculate current slot and interval
let time_since_genesis = timestamp.saturating_sub(genesis_time);
let slot = time_since_genesis / SECONDS_PER_SLOT;
let interval = time_since_genesis % SECONDS_PER_SLOT;
// Calculate current slot and interval from milliseconds
let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms);
let slot = time_since_genesis_ms / MILLISECONDS_PER_SLOT;
let interval = (time_since_genesis_ms % MILLISECONDS_PER_SLOT) / MILLISECONDS_PER_INTERVAL;

// Update current slot metric
metrics::update_current_slot(slot);
Expand All @@ -126,7 +155,12 @@ impl BlockChainServer {
.flatten();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(&mut self.store, timestamp, proposer_validator_id.is_some());
store::on_tick(
&mut self.store,
timestamp_ms,
proposer_validator_id.is_some(),
self.is_aggregator,
);

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
Expand All @@ -138,7 +172,7 @@ impl BlockChainServer {
self.produce_attestations(slot);
}

// Update safe target slot metric (updated by store.on_tick at interval 2)
// Update safe target slot metric (updated by store.on_tick at interval 3)
metrics::update_safe_target_slot(self.store.safe_target_slot());
}

Expand Down Expand Up @@ -374,15 +408,21 @@ impl BlockChainServer {
}

fn on_gossip_attestation(&mut self, attestation: SignedAttestation) {
let _ = store::on_gossip_attestation(&mut self.store, attestation)
let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}
}

#[derive(Clone, Debug)]
enum CastMessage {
NewBlock(SignedBlockWithAttestation),
NewAttestation(SignedAttestation),
NewAggregatedAttestation(SignedAggregatedAttestation),
Tick,
}

Expand Down Expand Up @@ -413,12 +453,13 @@ impl GenServer for BlockChainServer {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_secs());
// Schedule the next tick at the start of the next second
let millis_to_next_sec =
((timestamp.as_secs() as u128 + 1) * 1000 - timestamp.as_millis()) as u64;
self.on_tick(timestamp.as_millis() as u64);
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL);
send_after(
Duration::from_millis(millis_to_next_sec),
Duration::from_millis(ms_to_next_interval),
handle.clone(),
message,
);
Expand All @@ -427,6 +468,9 @@ impl GenServer for BlockChainServer {
self.on_block(signed_block);
}
CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation),
CastMessage::NewAggregatedAttestation(attestation) => {
self.on_gossip_aggregated_attestation(attestation);
}
}
CastResponse::NoReply
}
Expand Down
Loading