Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ cancellation = "0.1.0"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.5.53", features = ["derive", "env", "cargo"] }
crossbeam = "0.8.4"
dyn-clone = "1.0"
dyn-eq = "0.1.3"
either = "1.13"
futures = "0.3"
futures-timer = "3.0"
Expand Down
5 changes: 5 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ cancellation.workspace = true
chrono.workspace = true
crossbeam.workspace = true
futures.workspace = true
dyn-clone.workspace = true
dyn-eq.workspace = true
hex.workspace = true
libp2p.workspace = true
vise.workspace = true
pluto-eth2api.workspace = true
prost.workspace = true
prost-types.workspace = true
Expand All @@ -38,6 +41,8 @@ prost-types.workspace = true
hex.workspace = true
chrono.workspace = true
test-case.workspace = true
pluto-eth2util.workspace = true
pluto-testutil.workspace = true

[build-dependencies]
pluto-build-proto.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ pub mod version;

/// Duty deadline tracking and notification.
pub mod deadline;

/// parsigdb
pub mod parsigdb;

/// Test utilities.
#[cfg(test)]
pub mod testutils;
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use tracing::{debug, warn};

use crate::{
deadline::Deadliner,
parasigdb::metrics::PARASIG_DB_METRICS,
parsigdb::metrics::PARSIG_DB_METRICS,
signeddata::SignedDataError,
types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey},
};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -55,9 +56,9 @@ pub type ThreshSub = Arc<

/// Helper to create an internal subscriber from a closure.
///
/// The closure receives owned copies of the duty and data set. Since the closure
/// is `Fn` (can be called multiple times), you need to clone any captured Arc values
/// before the `async move` block.
/// The closure receives owned copies of the duty and data set. Since the
/// closure is `Fn` (can be called multiple times), you need to clone any
/// captured Arc values before the `async move` block.
///
/// # Example
/// ```ignore
Expand Down Expand Up @@ -88,8 +89,8 @@ where
/// Helper to create a threshold subscriber from a closure.
///
/// The closure receives owned copies of the duty and data. Since the closure
/// is `Fn` (can be called multiple times), you need to clone any captured Arc values
/// before the `async move` block.
/// is `Fn` (can be called multiple times), you need to clone any captured Arc
/// values before the `async move` block.
///
/// # Example
/// ```ignore
Expand Down Expand Up @@ -128,6 +129,10 @@ pub enum MemDBError {
/// Share index of the mismatched signature
share_idx: u64,
},

/// Signed data error.
#[error("signed data error: {0}")]
SignedDataError(#[from] SignedDataError),
}

type Result<T> = std::result::Result<T, MemDBError>;
Expand Down Expand Up @@ -186,8 +191,8 @@ impl MemDB {
impl MemDB {
/// Registers a subscriber for internally generated partial signed data.
///
/// The subscriber will be called when the node generates partial signed data
/// that needs to be exchanged with peers.
/// The subscriber will be called when the node generates partial signed
/// data that needs to be exchanged with peers.
pub async fn subscribe_internal(&self, sub: InternalSub) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.internal_subs.push(sub);
Expand All @@ -204,11 +209,13 @@ impl MemDB {
Ok(())
}

/// Stores internally generated partial signed data and notifies subscribers.
/// Stores internally generated partial signed data and notifies
/// subscribers.
///
/// This is called when the node generates partial signed data that needs to be
/// stored and exchanged with peers. It first stores the data (via `store_external`),
/// then calls all internal subscribers to trigger peer exchange.
/// This is called when the node generates partial signed data that needs to
/// be stored and exchanged with peers. It first stores the data (via
/// `store_external`), then calls all internal subscribers to trigger
/// peer exchange.
pub async fn store_internal(&self, duty: &Duty, signed_set: &ParSignedDataSet) -> Result<()> {
self.store_external(duty, signed_set).await?;

Expand All @@ -226,9 +233,10 @@ impl MemDB {

/// Stores externally received partial signed data and checks for threshold.
///
/// This is called when the node receives partial signed data from peers. It stores
/// the data, checks if enough matching signatures have been collected to meet the
/// threshold, and calls threshold subscribers when the threshold is reached.
/// This is called when the node receives partial signed data from peers. It
/// stores the data, checks if enough matching signatures have been
/// collected to meet the threshold, and calls threshold subscribers
/// when the threshold is reached.
pub async fn store_external(&self, duty: &Duty, signed_data: &ParSignedDataSet) -> Result<()> {
let _ = self.deadliner.add(duty.clone()).await;

Expand All @@ -239,7 +247,7 @@ impl MemDB {
.store(
Key {
duty: duty.clone(),
pub_key: pub_key.clone(),
pub_key: *pub_key,
},
par_signed.clone(),
)
Expand All @@ -257,7 +265,7 @@ impl MemDB {
continue;
};

output.insert(pub_key.clone(), psigs);
output.insert(*pub_key, psigs);
}

if output.is_empty() {
Expand All @@ -278,17 +286,15 @@ impl MemDB {

/// Trims expired duties from the database.
///
/// This method runs in a loop, listening for expired duties from the deadliner
/// and removing their associated data from the database. It should be spawned
/// as a background task and will run until the cancellation token is triggered.
/// This method runs in a loop, listening for expired duties from the
/// deadliner and removing their associated data from the database. It
/// should be spawned as a background task and will run until the
/// cancellation token is triggered.
pub async fn trim(&self) {
let deadliner_rx = self.deadliner.c();
if deadliner_rx.is_none() {
let Some(mut deadliner_rx) = self.deadliner.c() else {
warn!("Deadliner channel is not available");
return;
}

let mut deadliner_rx = deadliner_rx.unwrap();
};

loop {
tokio::select! {
Expand Down Expand Up @@ -345,14 +351,10 @@ impl MemDB {
.push(k.clone());

if k.duty.duty_type == DutyType::Exit {
PARASIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc();
PARSIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc();
}

let result = inner
.entries
.get(&k)
.map(|entries| entries.clone())
.unwrap_or_default();
let result = inner.entries.get(&k).cloned().unwrap_or_default();

Ok(Some(result))
}
Expand Down Expand Up @@ -381,11 +383,11 @@ async fn get_threshold_matching(
let mut sigs_by_msg_root: HashMap<[u8; 32], Vec<ParSignedData>> = HashMap::new();

for sig in sigs {
let root = sig.signed_data.message_root();
sigs_by_msg_root
.entry(root)
.or_insert_with(Vec::new)
.push(sig.clone());
let root = sig
.signed_data
.message_root()
.map_err(MemDBError::SignedDataError)?;
sigs_by_msg_root.entry(root).or_default().push(sig.clone());
}

// Return the first set that has exactly threshold number of signatures
Expand Down
Loading
Loading