From de6dba45dfe90f2f77325eebc4b3725197d25e10 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:37:22 +0100 Subject: [PATCH 1/5] wip: add parasig db and some tests --- Cargo.lock | 4 + crates/core/Cargo.toml | 3 + crates/core/src/lib.rs | 7 + crates/core/src/parasigdb/memory.rs | 282 ++++++++++++++++++ .../src/parasigdb/memory_internal_test.rs | 282 ++++++++++++++++++ crates/core/src/parasigdb/metrics.rs | 12 + crates/core/src/parasigdb/mod.rs | 5 + crates/core/src/testutils.rs | 141 +++++++++ crates/core/src/types.rs | 105 ++++--- crates/testutil/Cargo.toml | 1 + crates/testutil/src/random.rs | 192 ++++++++++++ 11 files changed, 997 insertions(+), 37 deletions(-) create mode 100644 crates/core/src/parasigdb/memory.rs create mode 100644 crates/core/src/parasigdb/memory_internal_test.rs create mode 100644 crates/core/src/parasigdb/metrics.rs create mode 100644 crates/core/src/parasigdb/mod.rs create mode 100644 crates/core/src/testutils.rs diff --git a/Cargo.lock b/Cargo.lock index ae2b7b21..03dbdbf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5480,6 +5480,8 @@ dependencies = [ "libp2p", "pluto-build-proto", "pluto-eth2api", + "pluto-eth2util", + "pluto-testutil", "prost 0.14.3", "prost-types 0.14.3", "rand 0.8.5", @@ -5491,6 +5493,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "vise", ] [[package]] @@ -5691,6 +5694,7 @@ dependencies = [ "hex", "k256", "pluto-crypto", + "pluto-eth2api", "rand 0.8.5", "rand_core 0.6.4", "thiserror 2.0.18", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index c1f74cbf..1566510e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -14,6 +14,7 @@ crossbeam.workspace = true futures.workspace = true hex.workspace = true libp2p.workspace = true +vise.workspace = true pluto-eth2api.workspace = true prost.workspace = true prost-types.workspace = true @@ -33,6 +34,8 @@ prost-types.workspace = true hex.workspace = true chrono.workspace = true test-case.workspace = true +pluto-eth2util.workspace = true +pluto-testutil.workspace = true [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d34e1782..ee3a0360 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -19,3 +19,10 @@ pub mod version; /// Duty deadline tracking and notification. pub mod deadline; + +/// parasigdb +pub mod parasigdb; + +/// Test utilities. +#[cfg(test)] +pub mod testutils; diff --git a/crates/core/src/parasigdb/memory.rs b/crates/core/src/parasigdb/memory.rs new file mode 100644 index 00000000..dd1e0f68 --- /dev/null +++ b/crates/core/src/parasigdb/memory.rs @@ -0,0 +1,282 @@ +#![allow(missing_docs)] +use std::{collections::HashMap, pin::Pin, sync::Arc}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; + +use crate::{ + deadline::Deadliner, + parasigdb::metrics::PARASIG_DB_METRICS, + types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey}, +}; +use chrono::{DateTime, Utc}; + +/// Metadata for the memory ParSigDB. +pub struct MemDBMetadata { + /// Slot duration in seconds + pub slot_duration: u64, + /// Genesis time + pub genesis_time: DateTime, +} + +impl MemDBMetadata { + /// Creates new memory ParSigDB metadata. + pub fn new(slot_duration: u64, genesis_time: DateTime) -> Self { + Self { + slot_duration, + genesis_time, + } + } +} + +pub type InternalSub = Box< + dyn Fn(&Duty, &ParSignedDataSet) -> Pin> + Send + Sync>> + + Send + + Sync + + 'static, +>; + +pub type ThreshSub = Box< + dyn Fn( + &Duty, + &HashMap>, + ) -> Pin> + Send + Sync>> + + Send + + Sync + + 'static, +>; + +#[derive(Debug, thiserror::Error)] +pub enum MemDBError { + #[error("mismatching partial signed data: pubkey {pubkey}, share_idx {share_idx}")] + ParsigDataMismatch { pubkey: PubKey, share_idx: u64 }, +} + +type Result = std::result::Result; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Key { + pub duty: Duty, + pub pub_key: PubKey, +} + +pub struct MemDBInner { + internal_subs: Vec, + thresh_subs: Vec, + + entries: HashMap>, + keys_by_duty: HashMap>, +} + +pub struct MemDB { + ct: CancellationToken, + inner: Arc>, + deadliner: Arc, + threshold: u64, +} + +impl MemDB { + pub fn new(ct: CancellationToken, threshold: u64, deadliner: Arc) -> Self { + Self { + ct, + inner: Arc::new(Mutex::new(MemDBInner { + internal_subs: Vec::new(), + thresh_subs: Vec::new(), + entries: HashMap::new(), + keys_by_duty: HashMap::new(), + })), + deadliner, + threshold, + } + } +} + +impl MemDB { + pub async fn subscribe_internal(&self, sub: InternalSub) -> Result<()> { + let mut inner = self.inner.lock().await; + inner.internal_subs.push(sub); + Ok(()) + } + + pub async fn subscribe_threshold(&self, sub: ThreshSub) -> Result<()> { + let mut inner = self.inner.lock().await; + inner.thresh_subs.push(sub); + Ok(()) + } + + pub async fn store_internal(&self, duty: &Duty, signed_set: &ParSignedDataSet) -> Result<()> { + let _ = self.store_external(duty, signed_set).await?; + + let inner = self.inner.lock().await; + for sub in &inner.internal_subs { + sub(&duty, &signed_set).await?; + } + drop(inner); + + Ok(()) + } + + pub async fn store_external(&self, duty: &Duty, signed_data: &ParSignedDataSet) -> Result<()> { + let _ = self.deadliner.add(duty.clone()).await; + + let mut output: HashMap> = HashMap::new(); + + for (pub_key, par_signed) in signed_data.inner().iter() { + let sigs = self + .store( + Key { + duty: duty.clone(), + pub_key: pub_key.clone(), + }, + par_signed.clone(), + ) + .await?; + + let Some(sigs) = sigs else { + debug!("Ignoring duplicate partial signature"); + + continue; + }; + + let psigs = get_threshold_matching(&duty.duty_type, &sigs, self.threshold).await?; + + let Some(psigs) = psigs else { + continue; + }; + + output.insert(pub_key.clone(), psigs); + } + + if output.is_empty() { + return Ok(()); + } + + let inner = self.inner.lock().await; + for sub in inner.thresh_subs.iter() { + sub(&duty, &output).await?; + } + drop(inner); + + Ok(()) + } + + pub async fn trim(&self) { + let deadliner_rx = self.deadliner.c(); + if deadliner_rx.is_none() { + warn!("Deadliner channel is not available"); + return; + } + + let mut deadliner_rx = deadliner_rx.unwrap(); + + loop { + tokio::select! { + biased; + + _ = self.ct.cancelled() => { + return; + } + + Some(duty) = deadliner_rx.recv() => { + let mut inner = self.inner.lock().await; + + for key in inner.keys_by_duty.get(&duty).cloned().unwrap_or_default() { + inner.entries.remove(&key); + } + + inner.keys_by_duty.remove(&duty); + + drop(inner); + } + } + } + } + + async fn store(&self, k: Key, value: ParSignedData) -> Result>> { + let mut inner = self.inner.lock().await; + + // Check if we already have an entry with this ShareIdx + if let Some(existing_entries) = inner.entries.get(&k) { + for s in existing_entries { + if s.share_idx == value.share_idx { + if s == &value { + // Duplicate, return None to indicate no new data + return Ok(None); + } else { + return Err(MemDBError::ParsigDataMismatch { + pubkey: k.pub_key, + share_idx: value.share_idx, + }); + } + } + } + } + + inner + .entries + .entry(k.clone()) + .or_insert_with(Vec::new) + .push(value.clone()); + inner + .keys_by_duty + .entry(k.duty.clone()) + .or_insert_with(Vec::new) + .push(k.clone()); + + if k.duty.duty_type == DutyType::Exit { + PARASIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc(); + } + + let result = inner + .entries + .get(&k) + .map(|entries| entries.clone()) + .unwrap_or_default(); + + Ok(Some(result)) + } +} + +async fn get_threshold_matching( + typ: &DutyType, + sigs: &[ParSignedData], + threshold: u64, +) -> Result>> { + // Not enough signatures to meet threshold + if (sigs.len() as u64) < threshold { + return Ok(None); + } + + if *typ == DutyType::Signature { + // Signatures do not support message roots. + if sigs.len() as u64 == threshold { + return Ok(Some(sigs.to_vec())); + } else { + return Ok(None); + } + } + + // Group signatures by their message root + let mut sigs_by_msg_root: HashMap<[u8; 32], Vec> = HashMap::new(); + + for sig in sigs { + let root = sig.signed_data.message_root(); + sigs_by_msg_root + .entry(root) + .or_insert_with(Vec::new) + .push(sig.clone()); + } + + // Return the first set that has exactly threshold number of signatures + for set in sigs_by_msg_root.values() { + if set.len() as u64 == threshold { + return Ok(Some(set.clone())); + } + } + + Ok(None) +} + +#[cfg(test)] +#[path = "memory_internal_test.rs"] +mod memory_internal_test; diff --git a/crates/core/src/parasigdb/memory_internal_test.rs b/crates/core/src/parasigdb/memory_internal_test.rs new file mode 100644 index 00000000..4761f223 --- /dev/null +++ b/crates/core/src/parasigdb/memory_internal_test.rs @@ -0,0 +1,282 @@ +//! Internal tests for memory ParSigDB. +//! Mirrors the structure of charon/core/parsigdb/memory_internal_test.go + +use std::sync::Arc; + +use test_case::test_case; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; + +use super::get_threshold_matching; +use crate::{ + parasigdb::memory::{MemDB, MemDBMetadata}, testutils, types::{Duty, DutyType, ParSignedData, Signature, SignedData, SlotNumber} +}; + +/// Test wrapper for SyncCommitteeMessage (mimics altair.SyncCommitteeMessage). +/// The message root is the BeaconBlockRoot field. +#[derive(Debug, Clone)] +#[allow(dead_code)] +struct TestSyncCommitteeMessage { + slot: SlotNumber, + beacon_block_root: [u8; 32], + validator_index: u64, + signature: Signature, +} + +impl SignedData for TestSyncCommitteeMessage { + fn signature(&self) -> Signature { + self.signature.clone() + } + + fn set_signature( + &mut self, + signature: Signature, + ) -> std::result::Result<(), Box> { + self.signature = signature; + Ok(()) + } + + fn message_root(&self) -> [u8; 32] { + // For SyncCommitteeMessage, the message root is the BeaconBlockRoot + self.beacon_block_root + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn equals(&self, other: &dyn SignedData) -> bool { + self.message_root() == other.message_root() && self.signature() == other.signature() + } +} + +/// Test wrapper for BeaconCommitteeSelection (mimics +/// eth2v1.BeaconCommitteeSelection). The message root is computed from the Slot +/// field. +#[derive(Debug, Clone)] +#[allow(dead_code)] +struct TestBeaconCommitteeSelection { + validator_index: u64, + slot: SlotNumber, + selection_proof: Signature, +} + +impl SignedData for TestBeaconCommitteeSelection { + fn signature(&self) -> Signature { + self.selection_proof.clone() + } + + fn set_signature( + &mut self, + signature: Signature, + ) -> std::result::Result<(), Box> { + self.selection_proof = signature; + Ok(()) + } + + fn message_root(&self) -> [u8; 32] { + // For BeaconCommitteeSelection, the message root is derived from the slot. + // We'll use a simple hash: slot number in the first 8 bytes. + let mut root = [0u8; 32]; + root[0..8].copy_from_slice(&self.slot.inner().to_le_bytes()); + root + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn equals(&self, other: &dyn SignedData) -> bool { + self.message_root() == other.message_root() && self.signature() == other.signature() + } +} + +/// Helper to create random roots for testing +fn random_root(seed: u8) -> [u8; 32] { + let mut root = [0u8; 32]; + root[0] = seed; + root +} + +/// Helper to create random signature for testing +fn random_signature(seed: u8) -> Signature { + let mut sig = [0u8; 96]; + sig[0] = seed; + Signature::new(sig) +} + +/// Copying function here, not using the pluto_cluster::helpers::threshold (not +/// implemented yet) because it would be huge unnecessary dependency for core. +#[allow(clippy::arithmetic_side_effects)] +fn threshold(n: u64) -> u64 { + (2 * n + 2) / 3 +} + +// Test cases for get_threshold_matching +// Matches Go test structure from +// memory_internal_test.go:TestGetThresholdMatching +#[test_case(vec![], None ; "empty")] +#[test_case(vec![0, 0, 0], Some(vec![0, 1, 2]) ; "all identical exact threshold")] +#[test_case(vec![0, 0, 0, 0], None ; "all identical above threshold")] +#[test_case(vec![0, 0, 1, 0], Some(vec![0, 1, 3]) ; "one odd")] +#[test_case(vec![0, 0, 1, 1], None ; "two odd")] +#[tokio::test] +async fn test_get_threshold_matching(input: Vec, output: Option>) { + const N: u64 = 4; + + let slot = SlotNumber::new(123456); + let val_idx = 42u64; + + // Two different roots to vary message roots + let roots = [random_root(1), random_root(2)]; + + // Test different message types using providers (matches Go approach) + let providers: Vec<(&str, Box Box>)> = vec![ + ( + "SyncCommitteeMessage", + Box::new(|i: usize| { + Box::new(TestSyncCommitteeMessage { + slot, + beacon_block_root: roots[input[i]], // Vary root based on input + validator_index: val_idx, + signature: random_signature(i as u8), + }) + }), + ), + ( + "Selection", + Box::new(|i: usize| { + Box::new(TestBeaconCommitteeSelection { + validator_index: val_idx, + slot: SlotNumber::new(input[i] as u64), // Vary slot based on input + selection_proof: random_signature(i as u8), + }) + }), + ), + ]; + + for (_, provider) in providers { + let mut par_sigs: Vec = Vec::new(); + for i in 0..input.len() { + let signed_data = provider(i); + let par_signed = ParSignedData::new(signed_data, i as u64); + par_sigs.push(par_signed); + } + + let th = threshold(N); + + let result = get_threshold_matching(&DutyType::Attester, &par_sigs, th) + .await + .expect("get_threshold_matching should not error"); + + // Check that if we got a result, it has the correct length (matches Go's ok + // check) + if let Some(ref vec) = result { + assert_eq!( + vec.len(), + th as usize, + "result length should match threshold" + ); + } + + let out = result.unwrap_or_default(); + + let mut expect = Vec::new(); + if let Some(output) = &output { + for &idx in output { + expect.push(par_sigs[idx].clone()); + } + } + + assert_eq!(out, expect, "result should match expected"); + } +} + +use pluto_testutil::random as tu_random; + +#[tokio::test] +async fn test_mem_db_threshold() { + const THRESHOLD: u64 = 7; + const N: u64 = 10; + + let deadliner = TestDeadliner::new(); + let ct = CancellationToken::new(); + + let db = Arc::new(MemDB::new(ct.child_token(), THRESHOLD, deadliner.clone())); + + let db_clone = db.clone(); + tokio::spawn(async move { + db_clone.trim().await; + }); + + let times_called = Arc::new(Mutex::new(0)); + + db.subscribe_threshold(Box::new({ + let times_called = times_called.clone(); + move |_duty, _output| { + let times_called = times_called.clone(); + Box::pin(async move { + *times_called.lock().await += 1; + Ok(()) + }) + } + })) + .await + .unwrap(); + + let pubkey = testutils::random_core_pub_key(); + let att = tu_random::random_deneb_versioned_attestation(); + +} + +/// Test deadliner for unit tests. +pub struct TestDeadliner { + added: Arc>>, + ch_tx: tokio::sync::mpsc::Sender, + ch_rx: Arc>>>, +} + +impl TestDeadliner { + /// Creates a new test deadliner. + #[allow(dead_code)] + pub fn new() -> Arc { + const CHANNEL_BUFFER: usize = 100; + let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER); + Arc::new(Self { + added: Arc::new(tokio::sync::Mutex::new(Vec::new())), + ch_tx: tx, + ch_rx: Arc::new(tokio::sync::Mutex::new(Some(rx))), + }) + } + + /// Expires all added duties. + #[allow(dead_code)] + pub async fn expire(&self) -> bool { + let mut added = self.added.lock().await; + for duty in added.drain(..) { + if self.ch_tx.send(duty).await.is_err() { + return false; + } + } + // Send dummy duty to ensure all piped duties above were processed + self.ch_tx + .send(Duty::new(SlotNumber::new(0), DutyType::Unknown)) + .await + .is_ok() + } +} + +impl crate::deadline::Deadliner for TestDeadliner { + fn add(&self, duty: Duty) -> futures::future::BoxFuture<'_, bool> { + Box::pin(async move { + let mut added = self.added.lock().await; + added.push(duty); + true + }) + } + + fn c(&self) -> Option> { + let mut guard = self.ch_rx.blocking_lock(); + guard.take() + } +} diff --git a/crates/core/src/parasigdb/metrics.rs b/crates/core/src/parasigdb/metrics.rs new file mode 100644 index 00000000..c828725d --- /dev/null +++ b/crates/core/src/parasigdb/metrics.rs @@ -0,0 +1,12 @@ +use vise::*; + +/// Metrics for the ParSigDB. +#[derive(Debug, Clone, Metrics)] +pub struct ParasigDBMetrics { + /// Total number of partially signed voluntary exits per public key + #[metrics(labels = ["pubkey"])] + pub exit_total: LabeledFamily, +} + +/// Global metrics for the ParSigDB. +pub static PARASIG_DB_METRICS: Global = Global::new(); diff --git a/crates/core/src/parasigdb/mod.rs b/crates/core/src/parasigdb/mod.rs new file mode 100644 index 00000000..fd01b279 --- /dev/null +++ b/crates/core/src/parasigdb/mod.rs @@ -0,0 +1,5 @@ +/// Memory implementation of the ParSigDB. +pub mod memory; + +/// Metrics for the ParSigDB. +pub mod metrics; diff --git a/crates/core/src/testutils.rs b/crates/core/src/testutils.rs new file mode 100644 index 00000000..3183b146 --- /dev/null +++ b/crates/core/src/testutils.rs @@ -0,0 +1,141 @@ +//! Test utilities for the Charon core. + +use rand::{Rng, SeedableRng}; + +use crate::types::PubKey; + +/// The size of a BLS public key in bytes. +const PK_LEN: usize = 48; + +/// Creates a new seeded random number generator. +/// +/// Returns a new random number generator seeded with a random value. +/// This matches the Go implementation: `rand.New(rand.NewSource(rand.Int63()))`. +pub fn new_seed_rand() -> impl Rng { + let seed = rand::random::(); + rand::rngs::StdRng::seed_from_u64(seed) +} + +/// Returns a random core workflow pubkey. +/// +/// This is a convenience wrapper around `random_core_pub_key_seed` that creates +/// a new random seed for each call. +pub fn random_core_pub_key() -> PubKey { + random_core_pub_key_seed(new_seed_rand()) +} + +/// Returns a random core workflow pubkey using a provided random source. +/// +/// # Arguments +/// +/// * `rng` - A random number generator to use for generating the pubkey. +/// +/// # Panics +/// +/// Panics if the generated bytes cannot be converted to a valid PubKey. +/// This should never happen in practice as we generate exactly 48 bytes. +pub fn random_core_pub_key_seed(mut rng: R) -> PubKey { + let pubkey = deterministic_pub_key_seed(&mut rng); + PubKey::try_from(&pubkey[..]).expect("valid pubkey length") +} + +/// Generates a deterministic pubkey from a seeded RNG. +/// +/// This function creates a new RNG seeded from the input RNG, then fills +/// a 48-byte array with random data. This matches the Go implementation: +/// +/// ```go +/// random := rand.New(rand.NewSource(r.Int63())) +/// var key tbls.PublicKey +/// _, err := random.Read(key[:]) +/// ``` +/// +/// # Arguments +/// +/// * `rng` - A mutable reference to a random number generator. +/// +/// # Returns +/// +/// A 48-byte array containing random data suitable for use as a public key. +fn deterministic_pub_key_seed(rng: &mut R) -> [u8; PK_LEN] { + // Create a new RNG seeded from the input RNG (matching Go's rand.New(rand.NewSource(r.Int63()))) + let seed: u64 = rng.r#gen(); + let mut seeded_rng = rand::rngs::StdRng::seed_from_u64(seed); + + let mut key = [0u8; PK_LEN]; + // Fill the key with random bytes + for byte in &mut key { + *byte = seeded_rng.r#gen(); + } + + key +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new_seed_rand_produces_different_values() { + let mut rng1 = new_seed_rand(); + let mut rng2 = new_seed_rand(); + + let val1: u64 = rng1.r#gen(); + let val2: u64 = rng2.r#gen(); + + // These should be different with very high probability + assert_ne!(val1, val2); + } + + #[test] + fn test_random_core_pub_key_generates_valid_keys() { + let pk1 = random_core_pub_key(); + let pk2 = random_core_pub_key(); + + // Keys should be different + assert_ne!(pk1, pk2); + + // Keys should have the correct length when serialized + assert_eq!(pk1.to_string().len(), 98); // 0x + 96 hex chars + assert_eq!(pk2.to_string().len(), 98); + } + + #[test] + fn test_random_core_pub_key_seed_is_deterministic() { + let seed = 12345u64; + let mut rng1 = rand::rngs::StdRng::seed_from_u64(seed); + let mut rng2 = rand::rngs::StdRng::seed_from_u64(seed); + + let pk1 = random_core_pub_key_seed(&mut rng1); + let pk2 = random_core_pub_key_seed(&mut rng2); + + // Same seed should produce same key + assert_eq!(pk1, pk2); + } + + #[test] + fn test_deterministic_pub_key_seed() { + let seed = 42u64; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + + let key = deterministic_pub_key_seed(&mut rng); + + // Check that we got 48 bytes + assert_eq!(key.len(), PK_LEN); + + // Check that the key is not all zeros (very unlikely with a proper RNG) + assert!(key.iter().any(|&b| b != 0)); + } + + #[test] + fn test_random_core_pub_key_seed_different_rngs() { + let mut rng1 = rand::rngs::StdRng::seed_from_u64(1); + let mut rng2 = rand::rngs::StdRng::seed_from_u64(2); + + let pk1 = random_core_pub_key_seed(&mut rng1); + let pk2 = random_core_pub_key_seed(&mut rng2); + + // Different seeds should produce different keys + assert_ne!(pk1, pk2); + } +} diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 9cc95d85..7fa29b07 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -448,19 +448,22 @@ impl AsRef<[u8; SIG_LEN]> for Signature { } /// Signed data type -pub trait SignedData: Clone + Serialize + StdDebug { - /// The error type - type Error: std::error::Error; - +pub trait SignedData: StdDebug + Send + Sync { /// signature returns the signed duty data's signature. fn signature(&self) -> Signature; /// set_signature returns a copy of signed duty data with the signature /// replaced. - fn set_signature(&mut self, signature: Signature) -> Result<(), Self::Error>; + fn set_signature(&mut self, signature: Signature) -> Result<(), Box>; /// message_root returns the message root for the unsigned data. fn message_root(&self) -> [u8; 32]; + + /// clone_box returns a boxed clone of the signed data. + fn clone_box(&self) -> Box; + + /// equals checks if two signed data are equal. + fn equals(&self, other: &dyn SignedData) -> bool; } // todo: add Eth2SignedData type @@ -468,21 +471,35 @@ pub trait SignedData: Clone + Serialize + StdDebug { /// ParSignedData is a partially signed duty data only signed by a single /// threshold BLS share. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ParSignedData { +#[derive(Debug)] +pub struct ParSignedData { /// Partially signed duty data. - pub signed_data: T, + pub signed_data: Box, /// Threshold BLS share index. pub share_idx: u64, } -impl ParSignedData -where - T: SignedData, -{ +impl Clone for ParSignedData { + fn clone(&self) -> Self { + Self { + signed_data: self.signed_data.clone_box(), + share_idx: self.share_idx, + } + } +} + +impl PartialEq for ParSignedData { + fn eq(&self, other: &Self) -> bool { + self.share_idx == other.share_idx && self.signed_data.equals(other.signed_data.as_ref()) + } +} + +impl Eq for ParSignedData {} + +impl ParSignedData { /// Create a new partially signed data. - pub fn new(partially_signed_data: T, share_idx: u64) -> Self { + pub fn new(partially_signed_data: Box, share_idx: u64) -> Self { Self { signed_data: partially_signed_data, share_idx, @@ -492,49 +509,51 @@ where /// ParSignedDataSet is a set of partially signed duty data only signed by a /// single threshold BLS share. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ParSignedDataSet(HashMap>); +#[derive(Debug, Clone)] +pub struct ParSignedDataSet(HashMap); -impl Default for ParSignedDataSet -where - T: SignedData, -{ +impl PartialEq for ParSignedDataSet { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl Eq for ParSignedDataSet {} + +impl Default for ParSignedDataSet { fn default() -> Self { Self(HashMap::default()) } } -impl ParSignedDataSet -where - T: SignedData, -{ +impl ParSignedDataSet { /// Create a new partially signed data set. pub fn new() -> Self { Self::default() } /// Get a partially signed data by public key. - pub fn get(&self, pub_key: &PubKey) -> Option<&ParSignedData> { + pub fn get(&self, pub_key: &PubKey) -> Option<&ParSignedData> { self.inner().get(pub_key) } /// Insert a partially signed data. - pub fn insert(&mut self, pub_key: PubKey, partially_signed_data: ParSignedData) { + pub fn insert(&mut self, pub_key: PubKey, partially_signed_data: ParSignedData) { self.inner_mut().insert(pub_key, partially_signed_data); } /// Remove a partially signed data by public key. - pub fn remove(&mut self, pub_key: &PubKey) -> Option> { + pub fn remove(&mut self, pub_key: &PubKey) -> Option { self.inner_mut().remove(pub_key) } /// Inner partially signed data set. - pub fn inner(&self) -> &HashMap> { + pub fn inner(&self) -> &HashMap { &self.0 } /// Inner partially signed data set. - pub fn inner_mut(&mut self) -> &mut HashMap> { + pub fn inner_mut(&mut self) -> &mut HashMap { &mut self.0 } } @@ -855,31 +874,43 @@ mod tests { struct MockSignedData; impl SignedData for MockSignedData { - type Error = std::io::Error; - fn signature(&self) -> Signature { Signature::new([42u8; SIG_LEN]) } - fn set_signature(&mut self, _signature: Signature) -> Result<(), std::io::Error> { + fn set_signature( + &mut self, + _signature: Signature, + ) -> Result<(), Box> { Ok(()) } fn message_root(&self) -> [u8; 32] { [42u8; 32] } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn equals(&self, _other: &dyn SignedData) -> bool { + // For testing purposes, we consider all MockSignedData instances equal + true + } } #[test] fn test_partially_signed_data_set() { let mut partially_signed_data_set = ParSignedDataSet::new(); - partially_signed_data_set.insert( - PubKey::new([42u8; PK_LEN]), - ParSignedData::new(MockSignedData, 0), - ); + let par_signed = ParSignedData::new(Box::new(MockSignedData), 0); + partially_signed_data_set.insert(PubKey::new([42u8; PK_LEN]), par_signed.clone()); + let retrieved = partially_signed_data_set.get(&PubKey::new([42u8; PK_LEN])); + assert!(retrieved.is_some()); + let retrieved = retrieved.unwrap(); + assert_eq!(retrieved.share_idx, 0); assert_eq!( - partially_signed_data_set.get(&PubKey::new([42u8; PK_LEN])), - Some(&ParSignedData::new(MockSignedData, 0)) + retrieved.signed_data.signature(), + Signature::new([42u8; SIG_LEN]) ); } diff --git a/crates/testutil/Cargo.toml b/crates/testutil/Cargo.toml index 0a973d1f..720bcc86 100644 --- a/crates/testutil/Cargo.toml +++ b/crates/testutil/Cargo.toml @@ -9,6 +9,7 @@ publish.workspace = true [dependencies] k256.workspace = true pluto-crypto.workspace = true +pluto-eth2api.workspace = true rand.workspace = true rand_core.workspace = true thiserror.workspace = true diff --git a/crates/testutil/src/random.rs b/crates/testutil/src/random.rs index 718f59bc..1461aeb1 100644 --- a/crates/testutil/src/random.rs +++ b/crates/testutil/src/random.rs @@ -7,6 +7,11 @@ use k256::{ elliptic_curve::rand_core::{CryptoRng, Error, RngCore}, }; use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls, types::PrivateKey}; +use pluto_eth2api::types::{ + AltairBeaconStateCurrentJustifiedCheckpoint, ConsensusVersion, Data, + GetAggregatedAttestationV2ResponseResponse, GetAggregatedAttestationV2ResponseResponseData, + GetBlockAttestationsV2ResponseResponseDataArray2, +}; use rand::{Rng, SeedableRng, rngs::StdRng}; /// A deterministic RNG that always returns the same byte value. @@ -67,6 +72,107 @@ pub fn generate_test_bls_key(seed: u64) -> PrivateKey { .expect("deterministic key generation should not fail") } +/// Generates a random BLS signature as a hex string for testing. +/// +/// Returns a 96-byte (192 hex characters) BLS signature encoded as a hex string +/// with "0x" prefix. +pub fn random_eth2_signature() -> String { + let mut bytes = [0u8; 96]; + let mut rng = rand::thread_rng(); + for byte in &mut bytes { + *byte = rng.r#gen(); + } + format!("0x{}", hex::encode(bytes)) +} + +/// Generates a random 32-byte root as a hex string for testing. +/// +/// Returns a 32-byte (64 hex characters) root encoded as a hex string with "0x" prefix. +pub fn random_root() -> String { + let mut bytes = [0u8; 32]; + let mut rng = rand::thread_rng(); + for byte in &mut bytes { + *byte = rng.r#gen(); + } + format!("0x{}", hex::encode(bytes)) +} + +/// Generates a random bitlist as a hex string for testing. +/// +/// # Arguments +/// +/// * `length` - The number of bits to set in the bitlist +/// +/// Returns a hex-encoded bitlist string with "0x" prefix. +pub fn random_bit_list(length: usize) -> String { + // Create a byte array large enough to hold the bits + // For simplicity, use 32 bytes (256 bits) + let mut bytes = [0u8; 32]; + let mut rng = rand::thread_rng(); + + // Set 'length' random bits + for _ in 0..length { + let bit_idx = rng.r#gen::() % 256; + let byte_idx = bit_idx / 8; + let bit_offset = bit_idx % 8; + bytes[byte_idx] |= 1 << bit_offset; + } + + format!("0x{}", hex::encode(bytes)) +} + +/// Generates a random checkpoint for testing. +fn random_checkpoint() -> AltairBeaconStateCurrentJustifiedCheckpoint { + let mut rng = rand::thread_rng(); + AltairBeaconStateCurrentJustifiedCheckpoint { + epoch: rng.r#gen::().to_string(), + root: random_root(), + } +} + +/// Generates random attestation data for Phase 0. +fn random_attestation_data_phase0() -> Data { + let mut rng = rand::thread_rng(); + Data { + slot: rng.r#gen::().to_string(), + index: rng.r#gen::().to_string(), + beacon_block_root: random_root(), + source: random_checkpoint(), + target: random_checkpoint(), + } +} + +/// Generates a random Phase 0 attestation. +/// +/// Returns an attestation with random aggregation bits, attestation data, and signature. +pub fn random_phase0_attestation() -> GetBlockAttestationsV2ResponseResponseDataArray2 { + GetBlockAttestationsV2ResponseResponseDataArray2 { + aggregation_bits: random_bit_list(1), + data: random_attestation_data_phase0(), + signature: random_eth2_signature(), + } +} + +/// Generates a random Deneb versioned attestation. +/// +/// Returns a versioned attestation containing a Phase 0 attestation with the Deneb version tag. +/// This matches the Go implementation: +/// +/// ```go +/// func RandomDenebVersionedAttestation() *eth2spec.VersionedAttestation { +/// return ð2spec.VersionedAttestation{ +/// Version: eth2spec.DataVersionDeneb, +/// Deneb: RandomPhase0Attestation(), +/// } +/// } +/// ``` +pub fn random_deneb_versioned_attestation() -> GetAggregatedAttestationV2ResponseResponse { + GetAggregatedAttestationV2ResponseResponse { + version: ConsensusVersion::Deneb, + data: GetAggregatedAttestationV2ResponseResponseData::Object2(random_phase0_attestation()), + } +} + #[cfg(test)] mod tests { use super::*; @@ -143,4 +249,90 @@ mod tests { "Different seeds should produce different BLS keys" ); } + + #[test] + fn test_random_eth2_signature() { + let sig1 = random_eth2_signature(); + let sig2 = random_eth2_signature(); + + // Check format + assert!(sig1.starts_with("0x")); + // 96 bytes = 192 hex chars + "0x" prefix = 194 total + assert_eq!(sig1.len(), 194); + + // Different calls should produce different signatures + assert_ne!(sig1, sig2); + } + + #[test] + fn test_random_root() { + let root1 = random_root(); + let root2 = random_root(); + + // Check format + assert!(root1.starts_with("0x")); + // 32 bytes = 64 hex chars + "0x" prefix = 66 total + assert_eq!(root1.len(), 66); + + // Different calls should produce different roots + assert_ne!(root1, root2); + } + + #[test] + fn test_random_bit_list() { + let bitlist = random_bit_list(5); + + // Check format + assert!(bitlist.starts_with("0x")); + // 32 bytes = 64 hex chars + "0x" prefix = 66 total + assert_eq!(bitlist.len(), 66); + } + + #[test] + fn test_random_phase0_attestation() { + let att = random_phase0_attestation(); + + // Check that all fields are populated + assert!(att.aggregation_bits.starts_with("0x")); + assert!(att.signature.starts_with("0x")); + assert!(att.data.beacon_block_root.starts_with("0x")); + assert!(!att.data.slot.is_empty()); + assert!(!att.data.index.is_empty()); + } + + #[test] + fn test_random_deneb_versioned_attestation() { + let versioned_att = random_deneb_versioned_attestation(); + + // Check version is Deneb + assert!(matches!(versioned_att.version, ConsensusVersion::Deneb)); + + // Check that data is populated + match versioned_att.data { + GetAggregatedAttestationV2ResponseResponseData::Object2(att) => { + assert!(att.aggregation_bits.starts_with("0x")); + assert!(att.signature.starts_with("0x")); + } + _ => panic!("Expected Object2 variant"), + } + } + + #[test] + fn test_random_deneb_versioned_attestation_different() { + let att1 = random_deneb_versioned_attestation(); + let att2 = random_deneb_versioned_attestation(); + + // Different calls should produce different attestations + // Check signatures are different + let sig1 = match &att1.data { + GetAggregatedAttestationV2ResponseResponseData::Object2(a) => &a.signature, + _ => panic!("Expected Object2"), + }; + let sig2 = match &att2.data { + GetAggregatedAttestationV2ResponseResponseData::Object2(a) => &a.signature, + _ => panic!("Expected Object2"), + }; + + assert_ne!(sig1, sig2); + } } From 13de57a98dd18d60a9edec7d10e7e9184bba4138 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Tue, 17 Mar 2026 12:50:13 +0100 Subject: [PATCH 2/5] feat: add rust docs --- crates/core/src/parasigdb/memory.rs | 164 +++++++++++++++--- .../src/parasigdb/memory_internal_test.rs | 54 +++++- 2 files changed, 189 insertions(+), 29 deletions(-) diff --git a/crates/core/src/parasigdb/memory.rs b/crates/core/src/parasigdb/memory.rs index dd1e0f68..9de14c97 100644 --- a/crates/core/src/parasigdb/memory.rs +++ b/crates/core/src/parasigdb/memory.rs @@ -1,5 +1,4 @@ -#![allow(missing_docs)] -use std::{collections::HashMap, pin::Pin, sync::Arc}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::{debug, warn}; @@ -29,14 +28,22 @@ impl MemDBMetadata { } } -pub type InternalSub = Box< +/// Subscriber callback for internally generated partial signed data. +/// +/// Called when the node generates partial signed data that needs to be +/// exchanged with peers. +pub type InternalSub = Arc< dyn Fn(&Duty, &ParSignedDataSet) -> Pin> + Send + Sync>> + Send + Sync + 'static, >; -pub type ThreshSub = Box< +/// Subscriber callback for threshold-reached partial signed data. +/// +/// Called when enough matching partial signatures have been collected +/// to meet the threshold requirement. +pub type ThreshSub = Arc< dyn Fn( &Duty, &HashMap>, @@ -46,20 +53,95 @@ pub type ThreshSub = Box< + 'static, >; +/// Helper to create an internal subscriber from a closure. +/// +/// The closure receives owned copies of the duty and data set. Since the closure +/// is `Fn` (can be called multiple times), you need to clone any captured Arc values +/// before the `async move` block. +/// +/// # Example +/// ```ignore +/// let counter = Arc::new(Mutex::new(0)); +/// let sub = internal_subscriber({ +/// let counter = counter.clone(); +/// move |_duty, _set| { +/// let counter = counter.clone(); +/// async move { +/// *counter.lock().await += 1; +/// Ok(()) +/// } +/// } +/// }); +/// db.subscribe_internal(sub).await?; +/// ``` +pub fn internal_subscriber(f: F) -> InternalSub +where + F: Fn(Duty, ParSignedDataSet) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + Arc::new(move |duty, set| { + let fut = f(duty.clone(), set.clone()); + Box::pin(fut) + }) +} + +/// Helper to create a threshold subscriber from a closure. +/// +/// The closure receives owned copies of the duty and data. Since the closure +/// is `Fn` (can be called multiple times), you need to clone any captured Arc values +/// before the `async move` block. +/// +/// # Example +/// ```ignore +/// let counter = Arc::new(Mutex::new(0)); +/// let sub = threshold_subscriber({ +/// let counter = counter.clone(); +/// move |_duty, _data| { +/// let counter = counter.clone(); +/// async move { +/// *counter.lock().await += 1; +/// Ok(()) +/// } +/// } +/// }); +/// db.subscribe_threshold(sub).await?; +/// ``` +pub fn threshold_subscriber(f: F) -> ThreshSub +where + F: Fn(Duty, HashMap>) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + Arc::new(move |duty, data| { + let fut = f(duty.clone(), data.clone()); + Box::pin(fut) + }) +} + +/// Error type for the memory ParSigDB. #[derive(Debug, thiserror::Error)] pub enum MemDBError { + /// Mismatching partial signed data. #[error("mismatching partial signed data: pubkey {pubkey}, share_idx {share_idx}")] - ParsigDataMismatch { pubkey: PubKey, share_idx: u64 }, + ParsigDataMismatch { + /// Public key of the validator + pubkey: PubKey, + /// Share index of the mismatched signature + share_idx: u64, + }, } type Result = std::result::Result; +/// Key for indexing partial signed data in the database. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Key { + /// The duty this partial signature is for pub duty: Duty, + /// The public key of the validator pub pub_key: PubKey, } +/// Internal state of the in-memory partial signature database. pub struct MemDBInner { internal_subs: Vec, thresh_subs: Vec, @@ -68,6 +150,10 @@ pub struct MemDBInner { keys_by_duty: HashMap>, } +/// In-memory partial signature database. +/// +/// Stores partial signed data from cluster nodes and triggers callbacks +/// when threshold is reached or when internal data is generated. pub struct MemDB { ct: CancellationToken, inner: Arc>, @@ -76,6 +162,12 @@ pub struct MemDB { } impl MemDB { + /// Creates a new in-memory partial signature database. + /// + /// # Arguments + /// * `ct` - Cancellation token for graceful shutdown + /// * `threshold` - Number of matching partial signatures required + /// * `deadliner` - Deadliner for managing duty expiration pub fn new(ct: CancellationToken, threshold: u64, deadliner: Arc) -> Self { Self { ct, @@ -92,30 +184,53 @@ impl MemDB { } impl MemDB { + /// Registers a subscriber for internally generated partial signed data. + /// + /// The subscriber will be called when the node generates partial signed data + /// that needs to be exchanged with peers. pub async fn subscribe_internal(&self, sub: InternalSub) -> Result<()> { let mut inner = self.inner.lock().await; inner.internal_subs.push(sub); Ok(()) } + /// Registers a subscriber for threshold-reached partial signed data. + /// + /// The subscriber will be called when enough matching partial signatures + /// have been collected to meet the threshold requirement. pub async fn subscribe_threshold(&self, sub: ThreshSub) -> Result<()> { let mut inner = self.inner.lock().await; inner.thresh_subs.push(sub); Ok(()) } + /// Stores internally generated partial signed data and notifies subscribers. + /// + /// This is called when the node generates partial signed data that needs to be + /// stored and exchanged with peers. It first stores the data (via `store_external`), + /// then calls all internal subscribers to trigger peer exchange. pub async fn store_internal(&self, duty: &Duty, signed_set: &ParSignedDataSet) -> Result<()> { - let _ = self.store_external(duty, signed_set).await?; + self.store_external(duty, signed_set).await?; - let inner = self.inner.lock().await; - for sub in &inner.internal_subs { - sub(&duty, &signed_set).await?; + // Collect subscribers first, then release lock before calling them + let subs = { + let inner = self.inner.lock().await; + inner.internal_subs.clone() + }; + + // Call subscribers without holding lock + for sub in &subs { + sub(duty, signed_set).await?; } - drop(inner); Ok(()) } + /// Stores externally received partial signed data and checks for threshold. + /// + /// This is called when the node receives partial signed data from peers. It stores + /// the data, checks if enough matching signatures have been collected to meet the + /// threshold, and calls threshold subscribers when the threshold is reached. pub async fn store_external(&self, duty: &Duty, signed_data: &ParSignedDataSet) -> Result<()> { let _ = self.deadliner.add(duty.clone()).await; @@ -126,7 +241,7 @@ impl MemDB { .store( Key { duty: duty.clone(), - pub_key: pub_key.clone(), + pub_key: *pub_key, }, par_signed.clone(), ) @@ -144,30 +259,37 @@ impl MemDB { continue; }; - output.insert(pub_key.clone(), psigs); + output.insert(*pub_key, psigs); } if output.is_empty() { return Ok(()); } - let inner = self.inner.lock().await; - for sub in inner.thresh_subs.iter() { - sub(&duty, &output).await?; + // Collect subscribers first, then release lock before calling them + let subs = { + let inner = self.inner.lock().await; + inner.thresh_subs.clone() + }; + + // Call subscribers without holding lock + for sub in &subs { + sub(duty, &output).await?; } - drop(inner); Ok(()) } + /// Trims expired duties from the database. + /// + /// This method runs in a loop, listening for expired duties from the deadliner + /// and removing their associated data from the database. It should be spawned + /// as a background task and will run until the cancellation token is triggered. pub async fn trim(&self) { - let deadliner_rx = self.deadliner.c(); - if deadliner_rx.is_none() { + let Some(mut deadliner_rx) = self.deadliner.c() else { warn!("Deadliner channel is not available"); return; - } - - let mut deadliner_rx = deadliner_rx.unwrap(); + }; loop { tokio::select! { diff --git a/crates/core/src/parasigdb/memory_internal_test.rs b/crates/core/src/parasigdb/memory_internal_test.rs index 4761f223..d4727719 100644 --- a/crates/core/src/parasigdb/memory_internal_test.rs +++ b/crates/core/src/parasigdb/memory_internal_test.rs @@ -9,7 +9,9 @@ use tokio_util::sync::CancellationToken; use super::get_threshold_matching; use crate::{ - parasigdb::memory::{MemDB, MemDBMetadata}, testutils, types::{Duty, DutyType, ParSignedData, Signature, SignedData, SlotNumber} + parasigdb::memory::MemDB, + testutils, + types::{Duty, DutyType, ParSignedData, Signature, SignedData, SlotNumber}, }; /// Test wrapper for SyncCommitteeMessage (mimics altair.SyncCommitteeMessage). @@ -197,7 +199,6 @@ use pluto_testutil::random as tu_random; #[tokio::test] async fn test_mem_db_threshold() { const THRESHOLD: u64 = 7; - const N: u64 = 10; let deadliner = TestDeadliner::new(); let ct = CancellationToken::new(); @@ -211,22 +212,59 @@ async fn test_mem_db_threshold() { let times_called = Arc::new(Mutex::new(0)); - db.subscribe_threshold(Box::new({ + // Using the helper function + // Note: We need to clone inside because the outer closure is Fn (not FnOnce), + // so it can be called multiple times + db.subscribe_threshold(super::threshold_subscriber({ let times_called = times_called.clone(); - move |_duty, _output| { + move |_duty, _data| { let times_called = times_called.clone(); - Box::pin(async move { + async move { *times_called.lock().await += 1; Ok(()) - }) + } } })) .await .unwrap(); - let pubkey = testutils::random_core_pub_key(); - let att = tu_random::random_deneb_versioned_attestation(); + let _pubkey = testutils::random_core_pub_key(); + let _att = tu_random::random_deneb_versioned_attestation(); +} + +/// Test using the helper function for internal subscriber. +#[tokio::test] +async fn test_mem_db_with_internal_helper() { + const THRESHOLD: u64 = 7; + + let deadliner = TestDeadliner::new(); + let ct = CancellationToken::new(); + + let db = Arc::new(MemDB::new(ct.child_token(), THRESHOLD, deadliner.clone())); + + let db_clone = db.clone(); + tokio::spawn(async move { + db_clone.trim().await; + }); + + let counter = Arc::new(Mutex::new(0u64)); + + // Using the helper function + // Note: We need to clone inside because the outer closure is Fn (not FnOnce) + db.subscribe_internal(super::internal_subscriber({ + let counter = counter.clone(); + move |_duty, _set| { + let counter = counter.clone(); + async move { + *counter.lock().await += 1; + Ok(()) + } + } + })) + .await + .unwrap(); + assert_eq!(*counter.lock().await, 0); } /// Test deadliner for unit tests. From fc85e63637ad0de75f77eb1ce2e4e0eacd067415 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Tue, 17 Mar 2026 22:57:58 +0100 Subject: [PATCH 3/5] feat: add clone box and clone eq --- Cargo.lock | 8 ++++++++ Cargo.toml | 2 ++ crates/core/Cargo.toml | 2 ++ crates/core/src/signeddata.rs | 3 +++ crates/core/src/types.rs | 7 ++++++- 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 2275275a..7b2cda4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2375,6 +2375,12 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "dyn-eq" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c2d035d21af5cde1a6f5c7b444a5bf963520a9f142e5d06931178433d7d5388" + [[package]] name = "ecdsa" version = "0.16.9" @@ -5518,6 +5524,8 @@ dependencies = [ "cancellation", "chrono", "crossbeam", + "dyn-clone", + "dyn-eq", "hex", "libp2p", "pluto-build-proto", diff --git a/Cargo.toml b/Cargo.toml index 93eddca0..c263399f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,8 @@ cancellation = "0.1.0" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.5.53", features = ["derive", "env", "cargo"] } crossbeam = "0.8.4" +dyn-clone = "1.0" +dyn-eq = "0.1.3" either = "1.13" futures = "0.3" futures-timer = "3.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ea060e56..72609a4c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -10,6 +10,8 @@ publish.workspace = true cancellation.workspace = true chrono.workspace = true crossbeam.workspace = true +dyn-clone.workspace = true +dyn-eq.workspace = true hex.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/core/src/signeddata.rs b/crates/core/src/signeddata.rs index 09865daf..27cd676a 100644 --- a/crates/core/src/signeddata.rs +++ b/crates/core/src/signeddata.rs @@ -48,6 +48,9 @@ pub enum SignedDataError { /// Invalid attestation wrapper JSON. #[error("unmarshal attestation")] AttestationJson, + /// Custom error. + #[error("{0}")] + Custom(Box), } fn hash_root(value: &T) -> [u8; 32] { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 2d0f3b37..5f66e7f4 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -3,6 +3,8 @@ use std::{collections::HashMap, fmt::Display, iter}; use chrono::{DateTime, Duration, Utc}; +use dyn_clone::DynClone; +use dyn_eq::DynEq; use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; @@ -448,7 +450,7 @@ impl AsRef<[u8; SIG_LEN]> for Signature { } /// Signed data type -pub trait SignedData: Clone + Serialize + StdDebug { +pub trait SignedData: DynClone + DynEq + StdDebug { /// The error type type Error: std::error::Error; @@ -464,6 +466,9 @@ pub trait SignedData: Clone + Serialize + StdDebug { fn message_root(&self) -> Result<[u8; 32], Self::Error>; } +dyn_eq::eq_trait_object!(SignedData); +dyn_clone::clone_trait_object!(SignedData); + // todo: add Eth2SignedData type // https://github.com/ObolNetwork/charon/blob/b3008103c5429b031b63518195f4c49db4e9a68d/core/types.go#L396 From 288f63cabc3c36921924edbe97172a3cd6d3a15e Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Wed, 18 Mar 2026 17:22:26 +0100 Subject: [PATCH 4/5] feat: finish tests --- .../src/parasigdb/memory_internal_test.rs | 383 +++++++----------- crates/testutil/src/lib.rs | 5 + crates/testutil/src/random.rs | 90 +++- 3 files changed, 223 insertions(+), 255 deletions(-) diff --git a/crates/core/src/parasigdb/memory_internal_test.rs b/crates/core/src/parasigdb/memory_internal_test.rs index c36c7f58..58f4c474 100644 --- a/crates/core/src/parasigdb/memory_internal_test.rs +++ b/crates/core/src/parasigdb/memory_internal_test.rs @@ -1,203 +1,122 @@ -//! Internal tests for memory ParSigDB. -//! Mirrors the structure of charon/core/parsigdb/memory_internal_test.go - -use std::sync::Arc; +use std::{ + sync::{Arc, Mutex as StdMutex}, + time::Duration, +}; +use futures::future::{BoxFuture, FutureExt}; +use pluto_eth2api::{spec::altair, v1}; +use pluto_testutil as testutil; use test_case::test_case; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, mpsc}; use tokio_util::sync::CancellationToken; -use super::get_threshold_matching; +use super::{MemDB, get_threshold_matching, threshold_subscriber}; use crate::{ - parasigdb::memory::MemDB, - signeddata, testutils, - types::{self, Duty, DutyType, ParSignedData, Signature, SignedData, SlotNumber}, + deadline::Deadliner, + signeddata::{BeaconCommitteeSelection, SignedSyncMessage, VersionedAttestation}, + testutils::random_core_pub_key, + types::{Duty, DutyType, ParSignedData, ParSignedDataSet, SlotNumber}, }; -/// Test wrapper for SyncCommitteeMessage (mimics altair.SyncCommitteeMessage). -/// The message root is the BeaconBlockRoot field. -#[derive(Debug, Clone, PartialEq, Eq)] -#[allow(dead_code)] -struct TestSyncCommitteeMessage { - slot: SlotNumber, - beacon_block_root: [u8; 32], - validator_index: u64, - signature: Signature, +fn threshold(nodes: usize) -> u64 { + (2_u64 + .checked_mul(u64::try_from(nodes).expect("nodes overflow")) + .expect("nodes overflow")) + .div_ceil(3) } -impl SignedData for TestSyncCommitteeMessage { - fn signature(&self) -> Result { - Ok(self.signature.clone()) - } - - fn set_signature(&self, signature: Signature) -> Result { - let mut out = self.clone(); - out.signature = signature; - Ok(out) - } - - fn message_root(&self) -> Result<[u8; 32], signeddata::SignedDataError> { - // For SyncCommitteeMessage, the message root is the BeaconBlockRoot - Ok(self.beacon_block_root) - } -} - -/// Test wrapper for BeaconCommitteeSelection (mimics -/// eth2v1.BeaconCommitteeSelection). The message root is computed from the Slot -/// field. -#[derive(Debug, Clone, PartialEq, Eq)] -#[allow(dead_code)] -struct TestBeaconCommitteeSelection { - validator_index: u64, - slot: SlotNumber, - selection_proof: Signature, -} - -impl SignedData for TestBeaconCommitteeSelection { - fn signature(&self) -> Result { - Ok(self.selection_proof.clone()) - } - - fn set_signature(&self, signature: Signature) -> Result { - let mut out = self.clone(); - out.selection_proof = signature; - Ok(out) - } - - fn message_root(&self) -> Result<[u8; 32], signeddata::SignedDataError> { - // For BeaconCommitteeSelection, the message root is derived from the slot. - // We'll use a simple hash: slot number in the first 8 bytes. - let mut root = [0u8; 32]; - root[0..8].copy_from_slice(&self.slot.inner().to_le_bytes()); - Ok(root) - } -} - -/// Helper to create random roots for testing -fn random_root(seed: u8) -> [u8; 32] { - let mut root = [0u8; 32]; - root[0] = seed; - root -} - -/// Helper to create random signature for testing -fn random_signature(seed: u8) -> Signature { - let mut sig = [0u8; 96]; - sig[0] = seed; - Signature::new(sig) -} - -/// Copying function here, not using the pluto_cluster::helpers::threshold (not -/// implemented yet) because it would be huge unnecessary dependency for core. -#[allow(clippy::arithmetic_side_effects, clippy::manual_div_ceil)] -fn threshold(n: u64) -> u64 { - (2 * n + 2) / 3 -} - -// Test cases for get_threshold_matching -// Matches Go test structure from -// memory_internal_test.go:TestGetThresholdMatching -#[test_case(vec![], None ; "empty")] -#[test_case(vec![0, 0, 0], Some(vec![0, 1, 2]) ; "all identical exact threshold")] -#[test_case(vec![0, 0, 0, 0], None ; "all identical above threshold")] -#[test_case(vec![0, 0, 1, 0], Some(vec![0, 1, 3]) ; "one odd")] -#[test_case(vec![0, 0, 1, 1], None ; "two odd")] +#[test_case(Vec::new(), Vec::new() ; "empty")] +#[test_case(vec![0, 0, 0], vec![0, 1, 2] ; "all identical exact threshold")] +#[test_case(vec![0, 0, 0, 0], Vec::new() ; "all identical above threshold")] +#[test_case(vec![0, 0, 1, 0], vec![0, 1, 3] ; "one odd")] +#[test_case(vec![0, 0, 1, 1], Vec::new() ; "two odd")] #[tokio::test] -async fn test_get_threshold_matching(input: Vec, output: Option>) { - const N: u64 = 4; +async fn test_get_threshold_matching(input: Vec, output: Vec) { + const N: usize = 4; - let slot = SlotNumber::new(123456); - let val_idx = 42u64; + let slot = testutil::random_slot(); + let validator_index = testutil::random_v_idx(); + let roots = [testutil::random_root_bytes(), testutil::random_root_bytes()]; + let threshold = threshold(N); - // Two different roots to vary message roots - let roots = [random_root(1), random_root(2)]; + type Providers<'a> = [(&'a str, Box ParSignedData + 'a>); 2]; - // Test different message types using providers (matches Go approach) - #[allow(clippy::type_complexity)] - let providers: Vec<(&str, Box Box>)> = vec![ + let providers: Providers<'_> = [ ( - "SyncCommitteeMessage", - Box::new(|i: usize| { - Box::new(TestSyncCommitteeMessage { + "sync_committee_message", + Box::new(|i| { + let message = altair::SyncCommitteeMessage { slot, - beacon_block_root: roots[input[i]], // Vary root based on input - validator_index: val_idx, - signature: random_signature(u8::try_from(i).unwrap()), - }) + beacon_block_root: roots[input[i]], + validator_index, + signature: testutil::random_eth2_signature_bytes(), + }; + + SignedSyncMessage::new_partial(message, u64::try_from(i.wrapping_add(1)).unwrap()) }), ), ( - "Selection", - Box::new(|i: usize| { - Box::new(TestBeaconCommitteeSelection { - validator_index: val_idx, - slot: SlotNumber::new(u64::try_from(input[i]).unwrap()), /* Vary slot based - * on input */ - selection_proof: random_signature(u8::try_from(i).unwrap()), - }) + "selection", + Box::new(|i| { + let selection = v1::BeaconCommitteeSelection { + validator_index, + slot: u64::try_from(input[i]).unwrap(), + selection_proof: testutil::random_eth2_signature_bytes(), + }; + + BeaconCommitteeSelection::new_partial( + selection, + u64::try_from(i.wrapping_add(1)).unwrap(), + ) }), ), ]; - for (_, provider) in providers { - let mut par_sigs: Vec = Vec::new(); + for (name, provider) in providers { + let mut data = Vec::new(); for i in 0..input.len() { - let signed_data = provider(i); - let par_signed = ParSignedData::new_boxed(signed_data, i as u64); - par_sigs.push(par_signed); + data.push(provider(i)); } - let th = threshold(N); - - let result = get_threshold_matching(&DutyType::Attester, &par_sigs, th) + let out = get_threshold_matching(&DutyType::SyncMessage, &data, threshold) .await - .expect("get_threshold_matching should not error"); - - // Check that if we got a result, it has the correct length (matches Go's ok - // check) - if let Some(ref vec) = result { - assert_eq!( - vec.len(), - usize::try_from(th).unwrap(), - "result length should match threshold" - ); - } - - let out = result.unwrap_or_default(); - - let mut expect = Vec::new(); - if let Some(output) = &output { - for &idx in output { - expect.push(par_sigs[idx].clone()); - } - } - - assert_eq!(out, expect, "result should match expected"); + .expect("threshold matching should succeed"); + let expect: Vec<_> = output.iter().map(|idx| data[*idx].clone()).collect(); + let expected_out = if expect.is_empty() { + None + } else { + Some(expect.clone()) + }; + + assert_eq!(expected_out, out, "{name}/output mismatch"); + assert_eq!( + out.as_ref() + .map(|matches| u64::try_from(matches.len()).unwrap() == threshold) + .unwrap_or(false), + expect.len() as u64 == threshold, + "{name}/ok mismatch" + ); } } -use pluto_testutil::random as tu_random; - #[tokio::test] -async fn test_mem_db_threshold() { +async fn test_memdb_threshold() { const THRESHOLD: u64 = 7; + const N: usize = 10; - let deadliner = TestDeadliner::new(); - let ct = CancellationToken::new(); - - let db = Arc::new(MemDB::new(ct.child_token(), THRESHOLD, deadliner.clone())); + let deadliner = Arc::new(TestDeadliner::new()); + let cancel = CancellationToken::new(); + let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner.clone())); - let db_clone = db.clone(); - tokio::spawn(async move { - db_clone.trim().await; + let trim_handle = tokio::spawn({ + let db = db.clone(); + async move { + db.trim().await; + } }); - let times_called = Arc::new(Mutex::new(0)); - - // Using the helper function - // Note: We need to clone inside because the outer closure is Fn (not FnOnce), - // so it can be called multiple times - db.subscribe_threshold(super::threshold_subscriber({ + let times_called = Arc::new(Mutex::new(0usize)); + db.subscribe_threshold(threshold_subscriber({ let times_called = times_called.clone(); move |_duty, _data| { let times_called = times_called.clone(); @@ -208,95 +127,89 @@ async fn test_mem_db_threshold() { } })) .await - .unwrap(); - - let _pubkey = testutils::random_core_pub_key(); - let _att = tu_random::random_deneb_versioned_attestation(); -} - -/// Test using the helper function for internal subscriber. -#[tokio::test] -async fn test_mem_db_with_internal_helper() { - const THRESHOLD: u64 = 7; - - let deadliner = TestDeadliner::new(); - let ct = CancellationToken::new(); - - let db = Arc::new(MemDB::new(ct.child_token(), THRESHOLD, deadliner.clone())); + .expect("subscription should succeed"); + + let pubkey = random_core_pub_key(); + let attestation = testutil::random_deneb_versioned_attestation(); + let duty = Duty::new_attester_duty(SlotNumber::new(123)); + + let enqueue_n = || async { + for i in 0..N { + let partial = VersionedAttestation::new_partial( + attestation.clone(), + u64::try_from(i + 1).unwrap(), + ) + .expect("versioned attestation should be valid"); + + let mut set = ParSignedDataSet::new(); + set.insert(pubkey, partial); + + db.store_external(&duty, &set) + .await + .expect("store_external should succeed"); + } + }; - let db_clone = db.clone(); - tokio::spawn(async move { - db_clone.trim().await; - }); + enqueue_n().await; + assert_eq!(1, *times_called.lock().await); - let counter = Arc::new(Mutex::new(0u64)); + deadliner.expire().await; + tokio::time::sleep(Duration::from_millis(20)).await; - // Using the helper function - // Note: We need to clone inside because the outer closure is Fn (not FnOnce) - db.subscribe_internal(super::internal_subscriber({ - let counter = counter.clone(); - move |_duty, _set| { - let counter = counter.clone(); - async move { - *counter.lock().await += 1; - Ok(()) - } - } - })) - .await - .unwrap(); + enqueue_n().await; + assert_eq!(2, *times_called.lock().await); - assert_eq!(*counter.lock().await, 0); + cancel.cancel(); + trim_handle + .await + .expect("trim task should shut down cleanly"); } -/// Test deadliner for unit tests. -pub struct TestDeadliner { - added: Arc>>, - ch_tx: tokio::sync::mpsc::Sender, - ch_rx: Arc>>>, +struct TestDeadliner { + added: StdMutex>, + tx: mpsc::Sender, + rx: StdMutex>>, } impl TestDeadliner { - /// Creates a new test deadliner. - #[allow(dead_code)] - pub fn new() -> Arc { - const CHANNEL_BUFFER: usize = 100; - let (tx, rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER); - Arc::new(Self { - added: Arc::new(tokio::sync::Mutex::new(Vec::new())), - ch_tx: tx, - ch_rx: Arc::new(tokio::sync::Mutex::new(Some(rx))), - }) + fn new() -> Self { + let (tx, rx) = mpsc::channel(32); + Self { + added: StdMutex::new(Vec::new()), + tx, + rx: StdMutex::new(Some(rx)), + } } - /// Expires all added duties. - #[allow(dead_code)] - pub async fn expire(&self) -> bool { - let mut added = self.added.lock().await; - for duty in added.drain(..) { - if self.ch_tx.send(duty).await.is_err() { + async fn expire(&self) -> bool { + let duties = { + let mut added = self.added.lock().expect("test deadliner lock poisoned"); + std::mem::take(&mut *added) + }; + + for duty in duties { + if self.tx.send(duty).await.is_err() { return false; } } - // Send dummy duty to ensure all piped duties above were processed - self.ch_tx - .send(Duty::new(SlotNumber::new(0), DutyType::Unknown)) - .await - .is_ok() + + true } } -impl crate::deadline::Deadliner for TestDeadliner { - fn add(&self, duty: Duty) -> futures::future::BoxFuture<'_, bool> { - Box::pin(async move { - let mut added = self.added.lock().await; - added.push(duty); +impl Deadliner for TestDeadliner { + fn add(&self, duty: Duty) -> BoxFuture<'_, bool> { + async move { + self.added + .lock() + .expect("test deadliner lock poisoned") + .push(duty); true - }) + } + .boxed() } - fn c(&self) -> Option> { - let mut guard = self.ch_rx.blocking_lock(); - guard.take() + fn c(&self) -> Option> { + self.rx.lock().expect("test deadliner lock poisoned").take() } } diff --git a/crates/testutil/src/lib.rs b/crates/testutil/src/lib.rs index abc00e7a..686c8c7a 100644 --- a/crates/testutil/src/lib.rs +++ b/crates/testutil/src/lib.rs @@ -6,3 +6,8 @@ /// Random utilities. pub mod random; + +pub use random::{ + random_deneb_versioned_attestation, random_eth2_signature, random_eth2_signature_bytes, + random_root, random_root_bytes, random_slot, random_v_idx, +}; diff --git a/crates/testutil/src/random.rs b/crates/testutil/src/random.rs index a1cd8e44..8e4a8eeb 100644 --- a/crates/testutil/src/random.rs +++ b/crates/testutil/src/random.rs @@ -7,10 +7,13 @@ use k256::{ elliptic_curve::rand_core::{CryptoRng, Error, RngCore}, }; use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls, types::PrivateKey}; -use pluto_eth2api::types::{ - AltairBeaconStateCurrentJustifiedCheckpoint, ConsensusVersion, Data, - GetAggregatedAttestationV2ResponseResponse, GetAggregatedAttestationV2ResponseResponseData, - GetBlockAttestationsV2ResponseResponseDataArray2, +use pluto_eth2api::{ + spec::phase0, + types::{ + AltairBeaconStateCurrentJustifiedCheckpoint, Data, + GetBlockAttestationsV2ResponseResponseDataArray2, + }, + versioned::{self, AttestationPayload}, }; use rand::{Rng, SeedableRng, rngs::StdRng}; @@ -85,6 +88,13 @@ pub fn random_eth2_signature() -> String { format!("0x{}", hex::encode(bytes)) } +/// Generates a random Ethereum consensus signature for testing. +pub fn random_eth2_signature_bytes() -> phase0::BLSSignature { + let mut signature = [0u8; 96]; + rand::thread_rng().fill(&mut signature[..]); + signature +} + /// Generate random Ethereum address for testing. pub fn random_eth_address(rand: &mut impl Rng) -> [u8; 20] { let mut bytes = [0u8; 20]; @@ -105,6 +115,23 @@ pub fn random_root() -> String { format!("0x{}", hex::encode(bytes)) } +/// Generates a random Ethereum consensus root for testing. +pub fn random_root_bytes() -> phase0::Root { + let mut root = [0u8; 32]; + rand::thread_rng().fill(&mut root); + root +} + +/// Generates a random slot for testing. +pub fn random_slot() -> phase0::Slot { + rand::thread_rng().r#gen() +} + +/// Generates a random validator index for testing. +pub fn random_v_idx() -> phase0::ValidatorIndex { + rand::thread_rng().r#gen() +} + /// Generates a random bitlist as a hex string for testing. /// /// # Arguments @@ -175,10 +202,31 @@ pub fn random_phase0_attestation() -> GetBlockAttestationsV2ResponseResponseData /// } /// } /// ``` -pub fn random_deneb_versioned_attestation() -> GetAggregatedAttestationV2ResponseResponse { - GetAggregatedAttestationV2ResponseResponse { - version: ConsensusVersion::Deneb, - data: GetAggregatedAttestationV2ResponseResponseData::Object2(random_phase0_attestation()), +pub fn random_deneb_versioned_attestation() -> versioned::VersionedAttestation { + let mut rng = rand::thread_rng(); + + let attestation = phase0::Attestation { + aggregation_bits: phase0::BitList::default(), + data: phase0::AttestationData { + slot: rng.r#gen(), + index: rng.r#gen(), + beacon_block_root: random_root_bytes(), + source: phase0::Checkpoint { + epoch: rng.r#gen(), + root: random_root_bytes(), + }, + target: phase0::Checkpoint { + epoch: rng.r#gen(), + root: random_root_bytes(), + }, + }, + signature: random_eth2_signature_bytes(), + }; + + versioned::VersionedAttestation { + version: versioned::DataVersion::Deneb, + validator_index: Some(rng.r#gen()), + attestation: Some(AttestationPayload::Deneb(attestation)), } } @@ -314,15 +362,17 @@ mod tests { let versioned_att = random_deneb_versioned_attestation(); // Check version is Deneb - assert!(matches!(versioned_att.version, ConsensusVersion::Deneb)); + assert!(matches!( + versioned_att.version, + versioned::DataVersion::Deneb + )); // Check that data is populated - match versioned_att.data { - GetAggregatedAttestationV2ResponseResponseData::Object2(att) => { - assert!(att.aggregation_bits.starts_with("0x")); - assert!(att.signature.starts_with("0x")); + match versioned_att.attestation { + Some(AttestationPayload::Deneb(att)) => { + assert_eq!(att.signature.len(), 96); } - _ => panic!("Expected Object2 variant"), + _ => panic!("Expected Deneb attestation"), } } @@ -333,13 +383,13 @@ mod tests { // Different calls should produce different attestations // Check signatures are different - let sig1 = match &att1.data { - GetAggregatedAttestationV2ResponseResponseData::Object2(a) => &a.signature, - _ => panic!("Expected Object2"), + let sig1 = match &att1.attestation { + Some(AttestationPayload::Deneb(a)) => &a.signature, + _ => panic!("Expected Deneb attestation"), }; - let sig2 = match &att2.data { - GetAggregatedAttestationV2ResponseResponseData::Object2(a) => &a.signature, - _ => panic!("Expected Object2"), + let sig2 = match &att2.attestation { + Some(AttestationPayload::Deneb(a)) => &a.signature, + _ => panic!("Expected Deneb attestation"), }; assert_ne!(sig1, sig2); From a4c4bb21af3ba6266f907e3ee73b6f953059a969 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:37:43 +0100 Subject: [PATCH 5/5] fix: typo --- crates/core/src/lib.rs | 4 ++-- crates/core/src/{parasigdb => parsigdb}/memory.rs | 4 ++-- .../core/src/{parasigdb => parsigdb}/memory_internal_test.rs | 0 crates/core/src/{parasigdb => parsigdb}/metrics.rs | 4 ++-- crates/core/src/{parasigdb => parsigdb}/mod.rs | 0 5 files changed, 6 insertions(+), 6 deletions(-) rename crates/core/src/{parasigdb => parsigdb}/memory.rs (99%) rename crates/core/src/{parasigdb => parsigdb}/memory_internal_test.rs (100%) rename crates/core/src/{parasigdb => parsigdb}/metrics.rs (72%) rename crates/core/src/{parasigdb => parsigdb}/mod.rs (100%) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 62460d0d..ac709968 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -23,8 +23,8 @@ pub mod version; /// Duty deadline tracking and notification. pub mod deadline; -/// parasigdb -pub mod parasigdb; +/// parsigdb +pub mod parsigdb; /// Test utilities. #[cfg(test)] diff --git a/crates/core/src/parasigdb/memory.rs b/crates/core/src/parsigdb/memory.rs similarity index 99% rename from crates/core/src/parasigdb/memory.rs rename to crates/core/src/parsigdb/memory.rs index f693a80a..e4d025e5 100644 --- a/crates/core/src/parasigdb/memory.rs +++ b/crates/core/src/parsigdb/memory.rs @@ -5,7 +5,7 @@ use tracing::{debug, warn}; use crate::{ deadline::Deadliner, - parasigdb::metrics::PARASIG_DB_METRICS, + parsigdb::metrics::PARSIG_DB_METRICS, signeddata::SignedDataError, types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey}, }; @@ -351,7 +351,7 @@ impl MemDB { .push(k.clone()); if k.duty.duty_type == DutyType::Exit { - PARASIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc(); + PARSIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc(); } let result = inner.entries.get(&k).cloned().unwrap_or_default(); diff --git a/crates/core/src/parasigdb/memory_internal_test.rs b/crates/core/src/parsigdb/memory_internal_test.rs similarity index 100% rename from crates/core/src/parasigdb/memory_internal_test.rs rename to crates/core/src/parsigdb/memory_internal_test.rs diff --git a/crates/core/src/parasigdb/metrics.rs b/crates/core/src/parsigdb/metrics.rs similarity index 72% rename from crates/core/src/parasigdb/metrics.rs rename to crates/core/src/parsigdb/metrics.rs index c828725d..24a05fd8 100644 --- a/crates/core/src/parasigdb/metrics.rs +++ b/crates/core/src/parsigdb/metrics.rs @@ -2,11 +2,11 @@ use vise::*; /// Metrics for the ParSigDB. #[derive(Debug, Clone, Metrics)] -pub struct ParasigDBMetrics { +pub struct ParsigDBMetrics { /// Total number of partially signed voluntary exits per public key #[metrics(labels = ["pubkey"])] pub exit_total: LabeledFamily, } /// Global metrics for the ParSigDB. -pub static PARASIG_DB_METRICS: Global = Global::new(); +pub static PARSIG_DB_METRICS: Global = Global::new(); diff --git a/crates/core/src/parasigdb/mod.rs b/crates/core/src/parsigdb/mod.rs similarity index 100% rename from crates/core/src/parasigdb/mod.rs rename to crates/core/src/parsigdb/mod.rs