diff --git a/Cargo.lock b/Cargo.lock index 2275275a..261b8039 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,9 +104,9 @@ dependencies = [ [[package]] name = "alloy-chains" -version = "0.2.31" +version = "0.2.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d9d22005bf31b018f31ef9ecadb5d2c39cf4f6acc8db0456f72c815f3d7f757" +checksum = "9247f0a399ef71aeb68f497b2b8fb348014f742b50d3b83b1e00dfe1b7d64b3d" dependencies = [ "alloy-primitives", "num_enum", @@ -202,7 +202,7 @@ dependencies = [ "itoa", "serde", "serde_json", - "winnow", + "winnow 0.7.15", ] [[package]] @@ -615,7 +615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6df77fea9d6a2a75c0ef8d2acbdfd92286cc599983d3175ccdc170d3433d249" dependencies = [ "serde", - "winnow", + "winnow 0.7.15", ] [[package]] @@ -5513,11 +5513,13 @@ name = "pluto-core" version = "1.7.1" dependencies = [ "alloy", + "async-trait", "base64 0.22.1", "built", "cancellation", "chrono", "crossbeam", + "futures", "hex", "libp2p", "pluto-build-proto", @@ -5533,6 +5535,8 @@ dependencies = [ "test-case", "thiserror 2.0.18", "tokio", + "tokio-util", + "tracing", "tree_hash", ] @@ -5862,7 +5866,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ - "toml_edit 0.25.4+spec-1.1.0", + "toml_edit 0.25.5+spec-1.1.0", ] [[package]] @@ -7669,9 +7673,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.0.0+spec-1.1.0" +version = "1.0.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32c2555c699578a4f59f0cc68e5116c8d7cabbd45e1409b989d4be085b53f13e" +checksum = "9b320e741db58cac564e26c607d3cc1fdc4a88fd36c879568c07856ed83ff3e9" dependencies = [ "serde_core", ] @@ -7687,28 +7691,28 @@ dependencies = [ "serde_spanned", "toml_datetime 0.6.11", "toml_write", - "winnow", + "winnow 0.7.15", ] [[package]] name = "toml_edit" -version = "0.25.4+spec-1.1.0" +version = "0.25.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7193cbd0ce53dc966037f54351dbbcf0d5a642c7f0038c382ef9e677ce8c13f2" +checksum = "8ca1a40644a28bce036923f6a431df0b34236949d111cc07cb6dca830c9ef2e1" dependencies = [ "indexmap 2.13.0", - "toml_datetime 1.0.0+spec-1.1.0", + "toml_datetime 1.0.1+spec-1.1.0", "toml_parser", - "winnow", + "winnow 1.0.0", ] [[package]] name = "toml_parser" -version = "1.0.9+spec-1.1.0" +version = "1.0.10+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702d4415e08923e7e1ef96cd5727c0dfed80b4d2fa25db9647fe5eb6f7c5a4c4" +checksum = "7df25b4befd31c4816df190124375d5a20c6b6921e2cad937316de3fccd63420" dependencies = [ - "winnow", + "winnow 1.0.0", ] [[package]] @@ -8931,6 +8935,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a90e88e4667264a994d34e6d1ab2d26d398dcdca8b7f52bec8668957517fc7d8" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index 93eddca0..6c453495 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ license = "BUSL-1.1" publish = false [workspace.dependencies] +async-trait = "0.1.89" alloy = { version = "1.3", features = ["essentials"] } built = { version = "0.8.0", features = ["git2", "chrono", "cargo-lock"] } blst = "0.3" diff --git a/crates/app/src/deadline/mod.rs b/crates/app/src/deadline/mod.rs deleted file mode 100644 index a8c39809..00000000 --- a/crates/app/src/deadline/mod.rs +++ /dev/null @@ -1,53 +0,0 @@ -use pluto_core::types::{Duty, DutyType}; -use pluto_eth2api::{EthBeaconNodeApiClient, EthBeaconNodeApiClientError}; - -/// Defines the fraction of the slot duration to use as a margin. -/// This is to consider network delays and other factors that may affect the -/// timing. -pub const MARGIN_FACTOR: u32 = 12; - -/// A function that returns the deadline for a duty. -pub type DeadlineFunc = Box Option> + Send + Sync>; - -/// Error type for deadline-related operations. -#[derive(Debug, thiserror::Error)] -pub enum DeadlineError { - /// Beacon client API error. - #[error("Beacon client error: {0}")] - BeaconClientError(#[from] EthBeaconNodeApiClientError), -} - -type Result = std::result::Result; - -/// Create a function that provides duty deadline or [`None`] if the duty never -/// deadlines. -pub async fn new_duty_deadline_func(eth2_cl: &EthBeaconNodeApiClient) -> Result { - let genesis_time = eth2_cl.fetch_genesis_time().await?; - let (slot_duration, _) = eth2_cl.fetch_slots_config().await?; - - #[allow( - clippy::arithmetic_side_effects, - reason = "Matches original implementation" - )] - Ok(Box::new(move |duty: Duty| match duty.duty_type { - DutyType::Exit | DutyType::BuilderRegistration => None, - _ => { - #[allow( - clippy::cast_possible_truncation, - reason = "TODO: unsupported operation in u64" - )] - let start = genesis_time + (slot_duration * (u64::from(duty.slot)) as u32); - let margin = slot_duration / MARGIN_FACTOR; - - let duration = match duty.duty_type { - DutyType::Proposer | DutyType::Randao => slot_duration / 3, - DutyType::SyncMessage => 2 * slot_duration / 3, - DutyType::Attester | DutyType::Aggregator | DutyType::PrepareAggregator => { - 2 * slot_duration - } - _ => slot_duration, - }; - Some(start + duration + margin) - } - })) -} diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index e0634ce2..d6cf9287 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -13,9 +13,6 @@ pub mod log; /// until the deadline has elapsed. pub mod retry; -/// Deadline -pub mod deadline; - /// Featureset defines a set of global features and their rollout status. pub mod featureset; diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ea060e56..cdb9c9aa 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -7,21 +7,25 @@ license.workspace = true publish.workspace = true [dependencies] +async-trait.workspace = true cancellation.workspace = true chrono.workspace = true crossbeam.workspace = true +futures.workspace = true hex.workspace = true +libp2p.workspace = true +pluto-eth2api.workspace = true +prost.workspace = true +prost-types.workspace = true +regex.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true base64.workspace = true thiserror.workspace = true tokio.workspace = true -libp2p.workspace = true -regex.workspace = true -prost.workspace = true -prost-types.workspace = true -pluto-eth2api.workspace = true +tokio-util.workspace = true +tracing.workspace = true pluto-eth2util.workspace = true tree_hash.workspace = true diff --git a/crates/core/src/deadline.rs b/crates/core/src/deadline.rs new file mode 100644 index 00000000..47cd7fd4 --- /dev/null +++ b/crates/core/src/deadline.rs @@ -0,0 +1,925 @@ +//! Duty deadline tracking and notification functionality. +//! +//! This module provides the [`Deadliner`] trait for tracking duty deadlines +//! and notifying when duties expire. It implements a background task that +//! manages timers for multiple duties and sends expired duties to a channel. +//! +//! # Example +//! +//! ```no_run +//! use chrono::{DateTime, Utc}; +//! use pluto_core::{ +//! deadline::{DeadlineFunc, new_deadliner}, +//! types::{Duty, DutyType, SlotNumber}, +//! }; +//! use std::sync::Arc; +//! use tokio_util::sync::CancellationToken; +//! +//! # async fn example() { +//! let cancel_token = CancellationToken::new(); +//! +//! // Define a deadline function +//! let deadline_func: DeadlineFunc = Arc::new(|_duty| { +//! let deadline = DateTime::from_timestamp(1000, 0).unwrap(); +//! Ok(Some(deadline)) +//! }); +//! +//! let deadliner = new_deadliner(cancel_token, "example", deadline_func); +//! +//! // Add a duty +//! let duty = Duty::new_attester_duty(SlotNumber::new(1)); +//! let added = deadliner.add(duty).await; +//! +//! // Receive expired duties +//! if let Some(mut rx) = deadliner.c() { +//! while let Some(expired_duty) = rx.recv().await { +//! println!("Duty expired: {}", expired_duty); +//! } +//! } +//! # } +//! ``` +use crate::types::{Duty, DutyType, SlotNumber}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use futures::future::{BoxFuture, FutureExt}; +use pluto_eth2api::{EthBeaconNodeApiClient, EthBeaconNodeApiClientError}; +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; +use tokio_util::sync::CancellationToken; + +/// Fraction of slot duration to use as a margin for network delays. +const MARGIN_FACTOR: i32 = 12; + +/// Type alias for the deadline function. +/// +/// Takes a duty and returns an optional deadline. +/// Returns `Ok(Some(deadline))` if the duty expires at the given time. +/// Returns `Ok(None)` if the duty never expires. +pub type DeadlineFunc = Arc Result>> + Send + Sync>; + +/// Error types for deadline operations. +#[derive(Debug, thiserror::Error)] +pub enum DeadlineError { + /// Failed to fetch genesis time from beacon node. + #[error("Failed to fetch genesis time: {0}")] + FetchGenesisTime(#[from] EthBeaconNodeApiClientError), + + /// Deadliner has been shut down. + #[error("Deadliner has been shut down")] + Shutdown, + + /// Arithmetic overflow in deadline calculation. + #[error("Arithmetic overflow in deadline calculation")] + ArithmeticOverflow, + + /// Duration conversion failed. + #[error("Duration conversion failed")] + DurationConversion, + + /// DateTime calculation failed. + #[error("DateTime calculation failed")] + DateTimeCalculation, +} + +/// Result type for deadline operations. +pub type Result = std::result::Result; + +/// Converts a `std::time::Duration` to `chrono::Duration`. +fn to_chrono_duration(duration: std::time::Duration) -> Result { + chrono::Duration::from_std(duration).map_err(|_| DeadlineError::DurationConversion) +} + +/// Converts seconds (u64) to `chrono::Duration`. +fn secs_to_chrono(secs: u64) -> Result { + let secs_i64 = i64::try_from(secs).map_err(|_| DeadlineError::ArithmeticOverflow)?; + chrono::Duration::try_seconds(secs_i64).ok_or(DeadlineError::DurationConversion) +} + +/// Deadliner provides duty deadline functionality. +/// +/// The `c()` method returns a channel for receiving expired duties. +/// It may only be called once and the returned channel should be used +/// by a single task. Multiple instances are required for different +/// components and use cases. +pub trait Deadliner: Send + Sync { + /// Adds a duty for deadline scheduling. + /// + /// Returns `true` if the duty was added for future deadline scheduling. + /// This method is idempotent and returns `true` if the duty was previously + /// added and still awaits deadline scheduling. + /// + /// Returns `false` if: + /// - The duty has already expired and cannot be scheduled + /// - The duty never expires (e.g., Exit, BuilderRegistration) + fn add(&self, duty: Duty) -> BoxFuture<'_, bool>; + + /// Returns the channel for receiving deadlined duties. + /// + /// This method may only be called once and returns `None` on subsequent + /// calls. The returned channel should only be used by a single task. + fn c(&self) -> Option>; +} + +/// Trait for beacon clients that can provide genesis time and slot +/// configuration. +/// +/// This trait abstracts the necessary beacon node API calls for deadline +/// calculation. +#[async_trait] +pub trait BeaconClientForDeadline { + /// Fetches the genesis time from the beacon node. + async fn fetch_genesis_time(&self) -> Result>; + + /// Fetches the slot duration and slots per epoch from the beacon node. + async fn fetch_slots_config(&self) -> Result<(std::time::Duration, u64)>; +} + +#[async_trait] +impl BeaconClientForDeadline for EthBeaconNodeApiClient { + async fn fetch_genesis_time(&self) -> Result> { + self.fetch_genesis_time() + .await + .map_err(DeadlineError::FetchGenesisTime) + } + + async fn fetch_slots_config(&self) -> Result<(std::time::Duration, u64)> { + self.fetch_slots_config() + .await + .map_err(DeadlineError::FetchGenesisTime) + } +} + +/// Creates a deadline function from the Ethereum 2.0 beacon node configuration. +/// +/// Fetches genesis time and slot duration from the beacon node and returns +/// a function that calculates deadlines for each duty type. +/// +/// # Errors +/// +/// Returns an error if fetching genesis time or slots config fails. +pub async fn new_duty_deadline_func( + client: &C, +) -> Result { + let genesis_time = client.fetch_genesis_time().await?; + let (slot_duration, _slots_per_epoch) = client.fetch_slots_config().await?; + + // Convert std::time::Duration to chrono::Duration for slot_duration + let slot_duration = to_chrono_duration(slot_duration)?; + + Ok(Arc::new(move |duty: Duty| { + // Exit and BuilderRegistration duties never expire + match duty.duty_type { + DutyType::Exit | DutyType::BuilderRegistration => { + return Ok(None); + } + _ => {} + } + + // Calculate slot start time + // start = genesis_time + (slot * slot_duration) + let slot_secs = duty + .slot + .inner() + .checked_mul( + u64::try_from(slot_duration.num_seconds()) + .map_err(|_| DeadlineError::ArithmeticOverflow)?, + ) + .ok_or(DeadlineError::ArithmeticOverflow)?; + let slot_offset = secs_to_chrono(slot_secs)?; + + let start: DateTime = genesis_time + .checked_add_signed(slot_offset) + .ok_or(DeadlineError::DateTimeCalculation)?; + + // Calculate margin: slot_duration / MARGIN_FACTOR + let margin = slot_duration + .checked_div(MARGIN_FACTOR) + .ok_or(DeadlineError::ArithmeticOverflow)?; + + // Calculate duty-specific duration + let duration = match duty.duty_type { + DutyType::Proposer | DutyType::Randao => { + // duration = slot_duration / 3 + slot_duration + .checked_div(3) + .ok_or(DeadlineError::ArithmeticOverflow)? + } + DutyType::SyncMessage => { + // duration = 2 * slot_duration / 3 + slot_duration + .checked_mul(2) + .and_then(|s| s.checked_div(3)) + .ok_or(DeadlineError::ArithmeticOverflow)? + } + DutyType::Attester | DutyType::Aggregator | DutyType::PrepareAggregator => { + // duration = 2 * slot_duration + // Even though attestations and aggregations are acceptable after 2 slots, + // the rewards are heavily diminished. + slot_duration + .checked_mul(2) + .ok_or(DeadlineError::ArithmeticOverflow)? + } + _ => { + // Default: duration = slot_duration + slot_duration + } + }; + + // Calculate final deadline: start + duration + margin + let deadline = start + .checked_add_signed(duration) + .and_then(|t| t.checked_add_signed(margin)) + .ok_or(DeadlineError::DateTimeCalculation)?; + + Ok(Some(deadline)) + })) +} + +/// Gets the duty with the earliest deadline from the duties map. +/// +/// Returns a tuple of (duty, deadline). If no duties are available, +/// returns a sentinel far-future date (9999-01-01). +fn get_curr_duty(duties: &HashSet, deadline_func: &DeadlineFunc) -> (Duty, DateTime) { + let mut curr_duty = Duty::new(SlotNumber::new(0), DutyType::Unknown); + + // Use far-future sentinel date (9999-01-01) matching Go implementation + // This timestamp is a known constant and will never fail + let mut curr_deadline = + DateTime::from_timestamp(253402300799, 0).unwrap_or(DateTime::::MAX_UTC); + + for duty in duties.iter() { + let Ok(deadline_opt) = deadline_func(duty.clone()) else { + continue; + }; + + // Ignore duties that never expire + let Some(duty_deadline) = deadline_opt else { + continue; + }; + + // Update if this duty has an earlier deadline + if duty_deadline < curr_deadline { + curr_duty = duty.clone(); + curr_deadline = duty_deadline; + } + } + + (curr_duty, curr_deadline) +} + +/// Internal message type for adding duties to the deadliner. +struct DeadlineInput { + duty: Duty, + response_tx: tokio::sync::oneshot::Sender, +} + +/// Implementation of the Deadliner trait. +struct DeadlinerImpl { + cancel_token: CancellationToken, + input_tx: tokio::sync::mpsc::UnboundedSender, + output_rx: Arc>>>, +} + +impl Deadliner for DeadlinerImpl { + fn add(&self, duty: Duty) -> BoxFuture<'_, bool> { + Box::pin(async move { + // Check if shut down + if self.cancel_token.is_cancelled() { + return false; + } + + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + let input = DeadlineInput { duty, response_tx }; + + // Send the duty to the background task + if self.input_tx.send(input).is_err() { + return false; + } + + // Wait for response + response_rx.await.unwrap_or(false) + }) + } + + fn c(&self) -> Option> { + self.output_rx + .lock() + .ok() + .and_then(|mut guard| guard.take()) + } +} + +/// Clock trait for abstracting time operations. +trait Clock: Send + Sync { + /// Returns the current time. + fn now(&self) -> DateTime; + + /// Creates a sleep future that completes after the given duration. + fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()>; +} + +/// Real clock implementation using tokio::time. +struct RealClock; + +impl Clock for RealClock { + fn now(&self) -> DateTime { + Utc::now() + } + + fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()> { + tokio::time::sleep(duration).boxed() + } +} + +impl DeadlinerImpl { + /// Background task that manages duty deadlines. + /// + /// This is an associated function (not a method) because the DeadlinerImpl + /// is immediately wrapped in Arc, preventing mutable access. + async fn run_task( + cancel_token: CancellationToken, + label: String, + deadline_func: DeadlineFunc, + clock: Arc, + mut input_rx: tokio::sync::mpsc::UnboundedReceiver, + output_tx: tokio::sync::mpsc::Sender, + ) { + let mut duties: HashSet = HashSet::new(); + let (mut curr_duty, mut curr_deadline) = get_curr_duty(&duties, &deadline_func); + + // Create initial timer + let now = clock.now(); + let initial_duration = curr_deadline + .signed_duration_since(now) + .to_std() + .unwrap_or(std::time::Duration::ZERO); + let mut timer = clock.sleep(initial_duration); + + loop { + tokio::select! { + biased; + + _ = cancel_token.cancelled() => { + return; + } + + Some(input) = input_rx.recv() => { + let duty = input.duty; + let Ok(deadline_opt) = deadline_func(duty.clone()) else { + let _ = input.response_tx.send(false); + continue; + }; + + // Drop duties that never expire + let Some(deadline) = deadline_opt else { + let _ = input.response_tx.send(false); + continue; + }; + + let now = clock.now(); + let expired = deadline < now; + + let _ = input.response_tx.send(!expired); + + // Ignore expired duties + if expired { + continue; + } + + // Add duty to the map (idempotent) + duties.insert(duty); + + // Update timer if this deadline is earlier + if deadline < curr_deadline { + let (new_duty, new_deadline) = get_curr_duty(&duties, &deadline_func); + curr_duty = new_duty; + curr_deadline = new_deadline; + + let duration = curr_deadline + .signed_duration_since(clock.now()) + .to_std() + .unwrap_or(std::time::Duration::ZERO); + timer = clock.sleep(duration); + } + } + + _ = &mut timer => { + // Deadline expired - send duty to output channel + match output_tx.try_send(curr_duty.clone()) { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + label = %label, + duty = %curr_duty, + "Deadliner output channel full" + ); + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + return; + } + } + + // Remove duty from map + duties.remove(&curr_duty); + + // Update to next duty + let (new_duty, new_deadline) = get_curr_duty(&duties, &deadline_func); + curr_duty = new_duty; + curr_deadline = new_deadline; + + let duration = curr_deadline + .signed_duration_since(clock.now()) + .to_std() + .unwrap_or(std::time::Duration::ZERO); + timer = clock.sleep(duration); + } + } + } + } + + /// Internal constructor for creating a deadliner with a specific clock. + fn new_internal( + cancel_token: CancellationToken, + label: impl Into, + deadline_func: DeadlineFunc, + clock: Arc, + ) -> Arc { + const OUTPUT_BUFFER: usize = 10; + + let label = label.into(); + let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel(); + let (output_tx, output_rx) = tokio::sync::mpsc::channel(OUTPUT_BUFFER); + + let impl_instance: Arc = Arc::new(DeadlinerImpl { + cancel_token: cancel_token.clone(), + input_tx, + output_rx: Arc::new(Mutex::new(Some(output_rx))), + }); + + // Spawn background task + tokio::spawn(Self::run_task( + cancel_token, + label, + deadline_func, + clock, + input_rx, + output_tx, + )); + + impl_instance + } +} + +/// Creates a new Deadliner instance. +/// +/// Starts a background task that manages duty deadlines and sends expired +/// duties to a channel. The background task runs until the cancellation token +/// is cancelled. +/// +/// # Arguments +/// +/// * `cancel_token` - Token to cancel the background task +/// * `label` - Label for logging purposes +/// * `deadline_func` - Function that calculates deadlines for duties +/// +/// # Returns +/// +/// An Arc-wrapped Deadliner trait object +pub fn new_deadliner( + cancel_token: CancellationToken, + label: impl Into, + deadline_func: DeadlineFunc, +) -> Arc { + DeadlinerImpl::new_internal(cancel_token, label, deadline_func, Arc::new(RealClock)) +} + +/// Creates a new Deadliner instance for testing with a fake clock. +/// +/// This constructor is intended for use in tests where you need to control +/// time progression. +/// +/// # Arguments +/// +/// * `cancel_token` - Token to cancel the background task +/// * `label` - Label for logging purposes +/// * `deadline_func` - Function that calculates deadlines for duties +/// * `clock` - Test clock for controlling time in tests +/// +/// # Returns +/// +/// An Arc-wrapped Deadliner trait object +#[cfg(test)] +fn new_deadliner_for_test( + cancel_token: CancellationToken, + label: impl Into, + deadline_func: DeadlineFunc, + clock: Arc, +) -> Arc { + DeadlinerImpl::new_internal(cancel_token, label, deadline_func, clock) +} + +/// Fake clock implementation for testing. +#[cfg(test)] +type WakerList = Vec<(DateTime, std::task::Waker)>; + +#[cfg(test)] +struct TestClock { + start: std::sync::Arc>>, + wakers: std::sync::Arc>, +} + +#[cfg(test)] +impl TestClock { + fn new(start: DateTime) -> Self { + Self { + start: std::sync::Arc::new(std::sync::Mutex::new(start)), + wakers: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())), + } + } + + fn advance(&self, duration: std::time::Duration) { + let new_time = { + let mut start = self.start.lock().unwrap(); + let chrono_duration = chrono::Duration::from_std(duration).unwrap(); + *start = start.checked_add_signed(chrono_duration).unwrap(); + *start + }; + + // Wake all timers that have expired + let mut wakers = self.wakers.lock().unwrap(); + let (expired, pending): (Vec<_>, Vec<_>) = wakers + .drain(..) + .partition(|(deadline, _)| *deadline <= new_time); + *wakers = pending; + + // Wake expired futures + for (_, waker) in expired { + waker.wake(); + } + } +} + +#[cfg(test)] +impl Clock for TestClock { + fn now(&self) -> DateTime { + *self.start.lock().unwrap() + } + + fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()> { + let deadline = self + .now() + .checked_add_signed(chrono::Duration::from_std(duration).unwrap()) + .unwrap(); + let wakers = Arc::clone(&self.wakers); + let start = Arc::clone(&self.start); + + Box::pin(std::future::poll_fn(move |cx| { + let now = *start.lock().unwrap(); + if now >= deadline { + std::task::Poll::Ready(()) + } else { + // Register waker + let mut wakers = wakers.lock().unwrap(); + // Check if this waker is already registered for this deadline + if !wakers.iter().any(|(d, _)| *d == deadline) { + wakers.push((deadline, cx.waker().clone())); + } + std::task::Poll::Pending + } + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::SlotNumber; + use test_case::test_case; + + /// Helper function to create expired duties, non-expired duties, and + /// voluntary exits. + fn setup_data() -> (Vec, Vec, Vec) { + let expired_duties = vec![ + Duty::new_attester_duty(SlotNumber::new(1)), + Duty::new_proposer_duty(SlotNumber::new(2)), + Duty::new_randao_duty(SlotNumber::new(3)), + ]; + + let non_expired_duties = vec![ + Duty::new_proposer_duty(SlotNumber::new(1)), + Duty::new_attester_duty(SlotNumber::new(2)), + ]; + + let voluntary_exits = vec![ + Duty::new_voluntary_exit_duty(SlotNumber::new(2)), + Duty::new_voluntary_exit_duty(SlotNumber::new(4)), + ]; + + (expired_duties, non_expired_duties, voluntary_exits) + } + + /// Helper function to add duties to the deadliner and send results to a + /// channel. + async fn add_duties( + duties: Vec, + deadliner: Arc, + result_tx: tokio::sync::mpsc::Sender, + ) { + for duty in duties { + let added = deadliner.add(duty).await; + let _ = result_tx.send(added).await; + } + } + + #[tokio::test] + async fn test_deadliner() { + let (expired_duties, non_expired_duties, voluntary_exits) = setup_data(); + + let start_time = DateTime::from_timestamp(1000, 0).unwrap(); + let clock = Arc::new(TestClock::new(start_time)); + + // Create a deadline function provider + let expired_set: std::collections::HashSet<_> = expired_duties.iter().cloned().collect(); + let deadline_func: DeadlineFunc = { + Arc::new(move |duty: Duty| { + if duty.duty_type == DutyType::Exit { + // Voluntary exits expire after 1 hour + let deadline = start_time + .checked_add_signed(chrono::Duration::try_hours(1).unwrap()) + .ok_or(DeadlineError::DateTimeCalculation)?; + return Ok(Some(deadline)); + } + + if expired_set.contains(&duty) { + // Expired duties have deadline 1 hour in the past + let deadline = start_time + .checked_sub_signed(chrono::Duration::try_hours(1).unwrap()) + .ok_or(DeadlineError::DateTimeCalculation)?; + return Ok(Some(deadline)); + } + + // Non-expired duties expire after duty.slot seconds from start + let deadline = start_time + .checked_add_signed( + chrono::Duration::try_seconds(i64::try_from(duty.slot.inner()).unwrap()) + .unwrap(), + ) + .ok_or(DeadlineError::DateTimeCalculation)?; + Ok(Some(deadline)) + }) + }; + + let cancel_token = CancellationToken::new(); + let deadliner = new_deadliner_for_test( + cancel_token.clone(), + "test", + deadline_func, + Arc::clone(&clock), + ); + + // Get the output receiver + let mut output_rx = deadliner.c().expect("should get receiver"); + + // Separate channels for expired and non-expired results + let (expired_tx, mut expired_rx) = tokio::sync::mpsc::channel(100); + let (non_expired_tx, mut non_expired_rx) = tokio::sync::mpsc::channel(100); + + // Add all duties + let expired_len = expired_duties.len(); + let non_expired_len = non_expired_duties.len(); + let voluntary_exits_len = voluntary_exits.len(); + + let handler_expired = tokio::spawn(add_duties( + expired_duties, + Arc::clone(&deadliner), + expired_tx, + )); + let handler_non_expired = tokio::spawn(add_duties( + non_expired_duties.clone(), + Arc::clone(&deadliner), + non_expired_tx.clone(), + )); + let handler_voluntary_exits = tokio::spawn(add_duties( + voluntary_exits, + Arc::clone(&deadliner), + non_expired_tx, + )); + + // Wait for all handlers to complete + let (result_expired, result_non_expired, result_voluntary_exits) = tokio::join!( + handler_expired, + handler_non_expired, + handler_voluntary_exits + ); + result_expired.unwrap(); + result_non_expired.unwrap(); + result_voluntary_exits.unwrap(); + + for _ in 0..expired_len { + let result = expired_rx.recv().await.expect("should receive result"); + assert!(!result, "expired duties should return false"); + } + + for _ in 0..(non_expired_len.checked_add(voluntary_exits_len).unwrap()) { + let result = non_expired_rx.recv().await.expect("should receive result"); + assert!(result, "non-expired duties should return true"); + } + + // Find max slot from non-expired duties + let max_slot = non_expired_duties + .iter() + .map(|d| d.slot.inner()) + .max() + .unwrap(); + + // Advance clock to trigger deadline of all non-expired duties + clock.advance(std::time::Duration::from_secs(max_slot)); + + // Give the deadliner task time to wake up and process + // We need to yield multiple times to ensure the background task runs + for _ in 0..10 { + tokio::task::yield_now().await; + } + + // Collect expired duties from output channel + let mut actual_duties = Vec::new(); + for _ in 0..non_expired_len { + let duty = tokio::time::timeout(std::time::Duration::from_secs(1), output_rx.recv()) + .await + .expect("should receive within timeout") + .expect("should receive duty"); + actual_duties.push(duty); + } + + // Sort both for comparison + actual_duties.sort_by_key(|d| d.slot.inner()); + let mut expected_duties = non_expired_duties; + expected_duties.sort_by_key(|d| d.slot.inner()); + + assert_eq!(expected_duties, actual_duties); + + cancel_token.cancel(); + } + + #[test_case(DutyType::Exit ; "exit")] + #[test_case(DutyType::BuilderRegistration ; "builder_registration")] + #[tokio::test] + async fn test_never_expire_duties(duty_type: DutyType) { + let mock_client = create_mock_client(); + + let deadline_func = new_duty_deadline_func(&mock_client) + .await + .expect("should create deadline func"); + + let duty = Duty::new(SlotNumber::new(100), duty_type); + let result = deadline_func(duty).expect("should compute deadline"); + + assert_eq!(result, None, "duty should never expire"); + } + + // todo: uses hardcode beacon client for testing, should be refactored to use a + // real beacon client (testutils/beaconmock) + #[test_case(DutyType::Proposer ; "proposer")] + #[test_case(DutyType::Attester ; "attester")] + #[test_case(DutyType::Aggregator ; "aggregator")] + #[test_case(DutyType::PrepareAggregator ; "prepare_aggregator")] + #[test_case(DutyType::SyncMessage ; "sync_message")] + #[test_case(DutyType::SyncContribution ; "sync_contribution")] + #[test_case(DutyType::Randao ; "randao")] + #[test_case(DutyType::InfoSync ; "info_sync")] + #[test_case(DutyType::PrepareSyncContribution ; "prepare_sync_contribution")] + #[tokio::test] + async fn test_duty_deadline_durations(duty_type: DutyType) { + let mock_client = create_mock_client(); + + let genesis_time = mock_client.fetch_genesis_time().await.unwrap(); + let (slot_duration, _) = mock_client.fetch_slots_config().await.unwrap(); + + let margin = slot_duration + .checked_div(12) + .expect("margin calculation should not fail"); + + let time_since_genesis = Utc::now().signed_duration_since(genesis_time); + let slot_duration_chrono = to_chrono_duration(slot_duration).unwrap(); + let current_slot = u64::try_from( + time_since_genesis + .num_seconds() + .checked_div(slot_duration_chrono.num_seconds()) + .expect("slot duration should not be zero"), + ) + .expect("current slot should be positive"); + + let slot_start = { + let offset_secs = current_slot + .checked_mul(slot_duration.as_secs()) + .expect("slot offset should not overflow"); + let offset = chrono::Duration::try_seconds( + i64::try_from(offset_secs).expect("offset should fit in i64"), + ) + .expect("offset should be valid duration"); + genesis_time + .checked_add_signed(offset) + .expect("slot start should not overflow") + }; + + let deadline_func = new_duty_deadline_func(&mock_client) + .await + .expect("should create deadline func"); + + let expected_duration = match duty_type { + DutyType::Proposer | DutyType::Randao => { + // slotDuration/3 + margin + slot_duration + .checked_div(3) + .and_then(|d| d.checked_add(margin)) + .expect("duration calculation should not fail") + } + DutyType::Attester | DutyType::Aggregator | DutyType::PrepareAggregator => { + // 2*slotDuration + margin + slot_duration + .checked_mul(2) + .and_then(|d| d.checked_add(margin)) + .expect("duration calculation should not fail") + } + DutyType::SyncMessage => { + // 2*slotDuration/3 + margin + slot_duration + .checked_mul(2) + .and_then(|d| d.checked_div(3)) + .and_then(|d| d.checked_add(margin)) + .expect("duration calculation should not fail") + } + DutyType::SyncContribution | DutyType::InfoSync | DutyType::PrepareSyncContribution => { + // slotDuration + margin + slot_duration + .checked_add(margin) + .expect("duration calculation should not fail") + } + _ => panic!("unexpected duty type: {:?}", duty_type), + }; + + let duty = Duty::new(SlotNumber::new(current_slot), duty_type.clone()); + + let now_before_deadline = slot_start + .checked_add_signed(to_chrono_duration(expected_duration).unwrap()) + .and_then(|t| t.checked_sub_signed(chrono::Duration::try_milliseconds(1).unwrap())) + .expect("time calculation should not fail"); + + let deadline_opt = deadline_func(duty.clone()).expect("should compute deadline"); + + assert!( + deadline_opt.is_some(), + "duty {:?} should have a deadline", + duty_type + ); + + let deadline = deadline_opt.unwrap(); + + assert!( + now_before_deadline < deadline, + "duty {:?}: now ({}) should be before deadline ({})", + duty_type, + now_before_deadline, + deadline + ); + + let time_until_deadline = deadline.signed_duration_since(now_before_deadline); + assert_eq!( + time_until_deadline, + chrono::Duration::try_milliseconds(1).unwrap(), + "duty {:?}: deadline should be exactly 1ms after now (actual: {}ms)", + duty_type, + time_until_deadline.num_milliseconds() + ); + } + + /// Creates a mock EthBeaconNodeApiClient for testing. + fn create_mock_client() -> MockBeaconClient { + MockBeaconClient { + genesis_time: DateTime::from_timestamp(1646092800, 0).unwrap(), /* 2022-03-01 + * 00:00:00 UTC */ + slot_duration: std::time::Duration::from_secs(12), + slots_per_epoch: 16, + } + } + + /// Mock beacon client for testing. + struct MockBeaconClient { + genesis_time: DateTime, + slot_duration: std::time::Duration, + slots_per_epoch: u64, + } + + #[async_trait] + impl BeaconClientForDeadline for MockBeaconClient { + async fn fetch_genesis_time(&self) -> Result> { + Ok(self.genesis_time) + } + + async fn fetch_slots_config(&self) -> Result<(std::time::Duration, u64)> { + Ok((self.slot_duration, self.slots_per_epoch)) + } + } +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d696f161..d38cbdca 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -19,3 +19,6 @@ pub mod corepb; /// Semver version parsing utilities. pub mod version; + +/// Duty deadline tracking and notification. +pub mod deadline;