diff --git a/Cargo.lock b/Cargo.lock index 2a28c0b9..d752c4d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5519,21 +5519,28 @@ name = "pluto-core" version = "1.7.1" dependencies = [ "alloy", + "anyhow", "async-trait", "base64 0.22.1", "built", "cancellation", "chrono", + "clap", "crossbeam", "dyn-clone", "dyn-eq", "futures", + "futures-timer", "hex", + "k256", "libp2p", "pluto-build-proto", + "pluto-cluster", "pluto-eth2api", "pluto-eth2util", + "pluto-p2p", "pluto-testutil", + "pluto-tracing", "prost 0.14.3", "prost-types 0.14.3", "rand 0.8.5", @@ -5547,6 +5554,7 @@ dependencies = [ "tokio-util", "tracing", "tree_hash", + "unsigned-varint 0.8.0", "vise", ] @@ -5680,6 +5688,7 @@ dependencies = [ "pluto-k1util", "pluto-testutil", "pluto-tracing", + "prost 0.14.3", "rand 0.8.5", "reqwest 0.13.2", "serde", @@ -5689,11 +5698,36 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "unsigned-varint 0.8.0", "url", "vise", "vise-exporter", ] +[[package]] +name = "pluto-parsigex" +version = "1.7.1" +dependencies = [ + "anyhow", + "clap", + "either", + "futures", + "futures-timer", + "hex", + "libp2p", + "pluto-cluster", + "pluto-core", + "pluto-p2p", + "pluto-tracing", + "prost 0.14.3", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "unsigned-varint 0.8.0", +] + [[package]] name = "pluto-peerinfo" version = "1.7.1" diff --git a/Cargo.toml b/Cargo.toml index 3a7a2e19..c0c3f4f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/app", + "crates/parsigex", "crates/build-proto", "crates/cli", "crates/cluster", @@ -98,6 +99,7 @@ wiremock = "0.6" # Crates in the workspace pluto-app = { path = "crates/app" } +pluto-parsigex = { path = "crates/parasigex" } pluto-build-proto = { path = "crates/build-proto" } pluto-cli = { path = "crates/cli" } pluto-cluster = { path = "crates/cluster" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ccbf2235..3559cafd 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,6 +12,7 @@ cancellation.workspace = true chrono.workspace = true crossbeam.workspace = true futures.workspace = true +futures-timer.workspace = true dyn-clone.workspace = true dyn-eq.workspace = true hex.workspace = true @@ -31,18 +32,25 @@ tokio-util.workspace = true tracing.workspace = true pluto-eth2util.workspace = true tree_hash.workspace = true +unsigned-varint.workspace = true [dev-dependencies] +anyhow.workspace = true alloy.workspace = true +clap.workspace = true rand.workspace = true libp2p.workspace = true +k256.workspace = true prost.workspace = true prost-types.workspace = true hex.workspace = true chrono.workspace = true test-case.workspace = true pluto-eth2util.workspace = true +pluto-cluster.workspace = true +pluto-p2p.workspace = true pluto-testutil.workspace = true +pluto-tracing.workspace = true [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index ac709968..2e356963 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -26,6 +26,10 @@ pub mod deadline; /// parsigdb pub mod parsigdb; +mod parsigex_codec; + +pub use parsigex_codec::ParSigExCodecError; + /// Test utilities. #[cfg(test)] pub mod testutils; diff --git a/crates/core/src/parsigex_codec.rs b/crates/core/src/parsigex_codec.rs new file mode 100644 index 00000000..1859be72 --- /dev/null +++ b/crates/core/src/parsigex_codec.rs @@ -0,0 +1,116 @@ +//! Partial signature exchange codec helpers used by core types. + +use std::any::Any; + +use crate::{ + signeddata::{ + Attestation, BeaconCommitteeSelection, SignedAggregateAndProof, SignedRandao, + SignedSyncContributionAndProof, SignedSyncMessage, SignedVoluntaryExit, + SyncCommitteeSelection, VersionedAttestation, VersionedSignedAggregateAndProof, + VersionedSignedProposal, VersionedSignedValidatorRegistration, + }, + types::{DutyType, Signature, SignedData}, +}; + +/// Error type for partial signature exchange codec operations. +#[derive(Debug, thiserror::Error)] +pub enum ParSigExCodecError { + /// Missing duty or data set fields. + #[error("invalid parsigex msg fields")] + InvalidMessageFields, + + /// Invalid partial signed data set proto. + #[error("invalid partial signed data set proto fields")] + InvalidParSignedDataSetFields, + + /// Invalid partial signed proto. + #[error("invalid partial signed proto")] + InvalidParSignedProto, + + /// Invalid duty type. + #[error("invalid duty")] + InvalidDuty, + + /// Unsupported duty type. + #[error("unsupported duty type")] + UnsupportedDutyType, + + /// Deprecated builder proposer duty. + #[error("deprecated duty builder proposer")] + DeprecatedBuilderProposer, + + /// Failed to parse a public key. + #[error("invalid public key: {0}")] + InvalidPubKey(String), + + /// Invalid share index. + #[error("invalid share index")] + InvalidShareIndex, + + /// Serialization failed. + #[error("marshal signed data: {0}")] + Serialize(#[from] serde_json::Error), +} + +pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, ParSigExCodecError> { + let any = data as &dyn Any; + + macro_rules! serialize_as { + ($ty:ty) => { + if let Some(value) = any.downcast_ref::<$ty>() { + return Ok(serde_json::to_vec(value)?); + } + }; + } + + serialize_as!(Attestation); + serialize_as!(VersionedAttestation); + serialize_as!(VersionedSignedProposal); + serialize_as!(VersionedSignedValidatorRegistration); + serialize_as!(SignedVoluntaryExit); + serialize_as!(SignedRandao); + serialize_as!(Signature); + serialize_as!(BeaconCommitteeSelection); + serialize_as!(SignedAggregateAndProof); + serialize_as!(VersionedSignedAggregateAndProof); + serialize_as!(SignedSyncMessage); + serialize_as!(SyncCommitteeSelection); + serialize_as!(SignedSyncContributionAndProof); + + Err(ParSigExCodecError::UnsupportedDutyType) +} + +pub(crate) fn deserialize_signed_data( + duty_type: &DutyType, + bytes: &[u8], +) -> Result, ParSigExCodecError> { + macro_rules! deserialize_json { + ($ty:ty) => { + serde_json::from_slice::<$ty>(bytes) + .map(|value| Box::new(value) as Box) + .map_err(ParSigExCodecError::from) + }; + } + + match duty_type { + DutyType::Attester => deserialize_json!(VersionedAttestation) + .or_else(|_| deserialize_json!(Attestation)) + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), + DutyType::Proposer => deserialize_json!(VersionedSignedProposal), + DutyType::BuilderProposer => Err(ParSigExCodecError::DeprecatedBuilderProposer), + DutyType::BuilderRegistration => deserialize_json!(VersionedSignedValidatorRegistration), + DutyType::Exit => deserialize_json!(SignedVoluntaryExit), + DutyType::Randao => deserialize_json!(SignedRandao), + DutyType::Signature => deserialize_json!(Signature), + DutyType::PrepareAggregator => deserialize_json!(BeaconCommitteeSelection), + DutyType::Aggregator => deserialize_json!(VersionedSignedAggregateAndProof) + .or_else(|_| deserialize_json!(SignedAggregateAndProof)) + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), + DutyType::SyncMessage => deserialize_json!(SignedSyncMessage), + DutyType::PrepareSyncContribution => deserialize_json!(SyncCommitteeSelection), + DutyType::SyncContribution => deserialize_json!(SignedSyncContributionAndProof), + DutyType::Unknown | DutyType::InfoSync | DutyType::DutySentinel(_) => { + Err(ParSigExCodecError::UnsupportedDutyType) + } + } +} diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 78e2bc62..05a73f03 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1,6 +1,6 @@ //! Types for the Charon core. -use std::{collections::HashMap, fmt::Display, iter}; +use std::{any::Any, collections::HashMap, fmt::Display, iter}; use chrono::{DateTime, Duration, Utc}; use dyn_clone::DynClone; @@ -8,7 +8,12 @@ use dyn_eq::DynEq; use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; -use crate::signeddata::SignedDataError; +use crate::{ + ParSigExCodecError, + corepb::v1::core as pbcore, + parsigex_codec::{deserialize_signed_data, serialize_signed_data}, + signeddata::SignedDataError, +}; /// The type of duty. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -66,6 +71,52 @@ impl DutyType { } } +impl From<&DutyType> for i32 { + fn from(duty_type: &DutyType) -> Self { + match duty_type { + DutyType::Unknown => 0, + DutyType::Proposer => 1, + DutyType::Attester => 2, + DutyType::Signature => 3, + DutyType::Exit => 4, + DutyType::BuilderProposer => 5, + DutyType::BuilderRegistration => 6, + DutyType::Randao => 7, + DutyType::PrepareAggregator => 8, + DutyType::Aggregator => 9, + DutyType::SyncMessage => 10, + DutyType::PrepareSyncContribution => 11, + DutyType::SyncContribution => 12, + DutyType::InfoSync => 13, + DutyType::DutySentinel(_) => 14, + } + } +} + +impl TryFrom for DutyType { + type Error = ParSigExCodecError; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(DutyType::Unknown), + 1 => Ok(DutyType::Proposer), + 2 => Ok(DutyType::Attester), + 3 => Ok(DutyType::Signature), + 4 => Ok(DutyType::Exit), + 5 => Ok(DutyType::BuilderProposer), + 6 => Ok(DutyType::BuilderRegistration), + 7 => Ok(DutyType::Randao), + 8 => Ok(DutyType::PrepareAggregator), + 9 => Ok(DutyType::Aggregator), + 10 => Ok(DutyType::SyncMessage), + 11 => Ok(DutyType::PrepareSyncContribution), + 12 => Ok(DutyType::SyncContribution), + 13 => Ok(DutyType::InfoSync), + _ => Err(ParSigExCodecError::InvalidDuty), + } + } +} + /// SlotNumber struct #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct SlotNumber(u64); @@ -192,6 +243,28 @@ impl Duty { } } +impl From<&Duty> for pbcore::Duty { + fn from(duty: &Duty) -> Self { + Self { + slot: duty.slot.inner(), + r#type: i32::from(&duty.duty_type), + } + } +} + +impl TryFrom<&pbcore::Duty> for Duty { + type Error = ParSigExCodecError; + + fn try_from(duty: &pbcore::Duty) -> Result { + let duty_type = DutyType::try_from(duty.r#type)?; + if !duty_type.is_valid() { + return Err(ParSigExCodecError::InvalidDuty); + } + + Ok(Self::new(duty.slot.into(), duty_type)) + } +} + /// The type of proposal. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -452,7 +525,7 @@ impl AsRef<[u8; SIG_LEN]> for Signature { } /// Signed data type -pub trait SignedData: DynClone + DynEq + StdDebug + Send + Sync { +pub trait SignedData: Any + DynClone + DynEq + StdDebug + Send + Sync { /// signature returns the signed duty data's signature. fn signature(&self) -> Result; @@ -517,6 +590,39 @@ impl ParSignedData { } } +impl TryFrom<&ParSignedData> for pbcore::ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(data: &ParSignedData) -> Result { + let encoded = serialize_signed_data(data.signed_data.as_ref())?; + let share_idx = + i32::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signature = data.signed_data.signature().map_err(|err| { + ParSigExCodecError::Serialize(serde_json::Error::io(std::io::Error::other( + err.to_string(), + ))) + })?; + + Ok(Self { + data: encoded.into(), + signature: signature.as_ref().to_vec().into(), + share_idx, + }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedData)> for ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedData)) -> Result { + let (duty_type, data) = value; + let share_idx = + u64::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signed_data = deserialize_signed_data(duty_type, &data.data)?; + Ok(Self::new_boxed(signed_data, share_idx)) + } +} + /// ParSignedDataSet is a set of partially signed duty data only signed by a /// single threshold BLS share. #[derive(Debug, Clone, PartialEq, Eq, Default)] @@ -554,6 +660,39 @@ impl ParSignedDataSet { } } +impl TryFrom<&ParSignedDataSet> for pbcore::ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(set: &ParSignedDataSet) -> Result { + let mut out = std::collections::BTreeMap::new(); + for (pub_key, value) in set.inner() { + out.insert(pub_key.to_string(), pbcore::ParSignedData::try_from(value)?); + } + + Ok(Self { set: out }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedDataSet)> for ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedDataSet)) -> Result { + let (duty_type, set) = value; + if set.set.is_empty() { + return Err(ParSigExCodecError::InvalidParSignedDataSetFields); + } + + let mut out = Self::new(); + for (pub_key, value) in &set.set { + let pub_key = PubKey::try_from(pub_key.as_str()) + .map_err(|_| ParSigExCodecError::InvalidPubKey(pub_key.clone()))?; + out.insert(pub_key, ParSignedData::try_from((duty_type, value))?); + } + + Ok(out) + } +} + /// SignedDataSet is a set of signed duty data. #[derive(Debug, Clone, PartialEq, Eq)] pub struct SignedDataSet(HashMap); diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 750609b1..935dbaf7 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -16,6 +16,7 @@ thiserror.workspace = true k256.workspace = true pluto-eth2util.workspace = true pluto-k1util.workspace = true +prost.workspace = true vise.workspace = true tokio.workspace = true tokio-util.workspace = true @@ -29,6 +30,7 @@ pluto-core.workspace = true backon.workspace = true reqwest.workspace = true url.workspace = true +unsigned-varint.workspace = true [dev-dependencies] pluto-testutil.workspace = true diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 0c06608b..5e85afc4 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -51,3 +51,6 @@ pub mod relay; /// Force direct connection behaviour. pub mod force_direct; + +/// Protobuf utilities. +pub mod proto; diff --git a/crates/p2p/src/p2p.rs b/crates/p2p/src/p2p.rs index a35a2775..33ba5ab1 100644 --- a/crates/p2p/src/p2p.rs +++ b/crates/p2p/src/p2p.rs @@ -110,6 +110,14 @@ use crate::{ utils, }; +const YAMUX_MAX_NUM_STREAMS: usize = 2_048; + +fn yamux_config() -> yamux::Config { + let mut config = yamux::Config::default(); + config.set_max_num_streams(YAMUX_MAX_NUM_STREAMS); + config +} + /// P2P error. #[derive(Debug, thiserror::Error)] pub enum P2PError { @@ -323,20 +331,17 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { - let builder = - PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone()); + let builder = PlutoBehaviourBuilder::default() + .with_p2p_context(p2p_context.clone()) + .with_quic_enabled(true); behaviour_fn(builder, key, relay_client).build(key) }) .map_err(P2PError::failed_to_build_swarm)? @@ -364,15 +369,11 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { let builder = @@ -400,11 +401,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() @@ -435,11 +432,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() diff --git a/crates/p2p/src/proto.rs b/crates/p2p/src/proto.rs new file mode 100644 index 00000000..57825b4a --- /dev/null +++ b/crates/p2p/src/proto.rs @@ -0,0 +1,86 @@ +use std::io; + +use futures::prelude::*; +use prost::Message; +use unsigned_varint::aio::read_usize; + +/// Default maximum message size (64KB should be plenty for peer info). +pub const MAX_MESSAGE_SIZE: usize = 64 * 1024; + +/// Writes a length-delimited payload with an unsigned varint length prefix. +/// +/// Wire format: `[uvarint length][payload bytes]` +pub async fn write_length_delimited( + stream: &mut S, + payload: &[u8], +) -> io::Result<()> { + let mut len_buf = unsigned_varint::encode::usize_buffer(); + let encoded_len = unsigned_varint::encode::usize(payload.len(), &mut len_buf); + stream.write_all(encoded_len).await?; + stream.write_all(payload).await?; + stream.flush().await +} + +/// Reads a length-delimited payload with an unsigned varint length prefix. +/// +/// Wire format: `[uvarint length][payload bytes]` +/// +/// Returns an error if the payload exceeds `max_message_size`. +pub async fn read_length_delimited( + stream: &mut S, + max_message_size: usize, +) -> io::Result> { + let msg_len = read_usize(&mut *stream).await.map_err(|e| match e { + unsigned_varint::io::ReadError::Io(io_err) => io_err, + other => io::Error::new(io::ErrorKind::InvalidData, other), + })?; + + if msg_len > max_message_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("message too large: {msg_len} bytes (max: {max_message_size})"), + )); + } + + let mut buf = vec![0u8; msg_len]; + stream.read_exact(&mut buf).await?; + Ok(buf) +} + +/// Writes a protobuf message with unsigned varint length prefix to the stream. +/// +/// Wire format: `[uvarint length][protobuf bytes]` +pub async fn write_protobuf( + stream: &mut S, + msg: &M, +) -> io::Result<()> { + let mut buf = Vec::with_capacity(msg.encoded_len()); + msg.encode(&mut buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + write_length_delimited(stream, &buf).await +} + +/// Reads a protobuf message with unsigned varint length prefix from the stream. +/// +/// Wire format: `[uvarint length][protobuf bytes]` +/// +/// Returns an error if the message exceeds `MAX_MESSAGE_SIZE`. +pub async fn read_protobuf( + stream: &mut S, +) -> io::Result { + read_protobuf_with_max_size(stream, MAX_MESSAGE_SIZE).await +} + +/// Reads a protobuf message with unsigned varint length prefix from the stream. +/// +/// Wire format: `[uvarint length][protobuf bytes]` +/// +/// Returns an error if the message exceeds `max_message_size`. +pub async fn read_protobuf_with_max_size( + stream: &mut S, + max_message_size: usize, +) -> io::Result { + let buf = read_length_delimited(stream, max_message_size).await?; + + M::decode(&buf[..]).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} diff --git a/crates/p2p/src/relay.rs b/crates/p2p/src/relay.rs index c334670f..946785e1 100644 --- a/crates/p2p/src/relay.rs +++ b/crates/p2p/src/relay.rs @@ -31,9 +31,10 @@ use libp2p::{ ToSwarm, dial_opts::DialOpts, dummy, }, }; -use tokio::time::Interval; +use tokio::time::{Instant, Interval}; const RELAY_ROUTER_INTERVAL: Duration = Duration::from_secs(60); +const RELAY_ROUTER_INITIAL_DELAY: Duration = Duration::from_secs(10); /// Mutable relay reservation behaviour. /// @@ -246,7 +247,12 @@ pub struct RelayRouter { impl RelayRouter { /// Creates a new relay router. pub fn new(relays: Vec, p2p_context: P2PContext, local_peer_id: PeerId) -> Self { - let mut interval = tokio::time::interval(RELAY_ROUTER_INTERVAL); + let mut interval = tokio::time::interval_at( + Instant::now() + .checked_add(RELAY_ROUTER_INITIAL_DELAY) + .expect("should not overflow"), + RELAY_ROUTER_INTERVAL, + ); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); Self { diff --git a/crates/parsigex/Cargo.toml b/crates/parsigex/Cargo.toml new file mode 100644 index 00000000..c89d7fbe --- /dev/null +++ b/crates/parsigex/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "pluto-parsigex" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +either.workspace = true +futures.workspace = true +futures-timer.workspace = true +libp2p.workspace = true +prost.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +unsigned-varint.workspace = true +pluto-core.workspace = true +pluto-p2p.workspace = true + +[dev-dependencies] +anyhow.workspace = true +clap.workspace = true +hex.workspace = true +pluto-cluster.workspace = true +pluto-tracing.workspace = true +tokio-util.workspace = true +serde_json.workspace = true + +[lints] +workspace = true diff --git a/crates/parsigex/examples/parsigex.rs b/crates/parsigex/examples/parsigex.rs new file mode 100644 index 00000000..af7d4287 --- /dev/null +++ b/crates/parsigex/examples/parsigex.rs @@ -0,0 +1,512 @@ +#![allow(missing_docs)] + +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + time::Duration, +}; + +use anyhow::{Context, Result, anyhow}; +use clap::Parser; +use futures::StreamExt; +use libp2p::{ + identify, ping, + relay::{self}, + swarm::{NetworkBehaviour, SwarmEvent}, +}; +use pluto_cluster::lock::Lock; +use pluto_core::{ + signeddata::SignedRandao, + types::{Duty, DutyType, ParSignedDataSet, PubKey, SlotNumber}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + bootnode, + config::P2PConfig, + gater, k1, + p2p::{Node, NodeType}, + peer::peer_id_from_key, + relay::{MutableRelayReservation, RelayRouter}, +}; +use pluto_parsigex::{self as parsigex, DutyGater, Event, Handle, Verifier}; +use pluto_tracing::TracingConfig; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "CombinedBehaviourEvent")] +struct CombinedBehaviour { + relay: relay::client::Behaviour, + relay_reservation: MutableRelayReservation, + relay_router: RelayRouter, + parsigex: parsigex::Behaviour, +} + +#[derive(Debug)] +enum CombinedBehaviourEvent { + ParSigEx(Event), + Relay(relay::client::Event), +} + +impl From for CombinedBehaviourEvent { + fn from(event: Event) -> Self { + Self::ParSigEx(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(event: relay::client::Event) -> Self { + Self::Relay(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +#[derive(Debug, Parser)] +#[command(name = "parsigex-example")] +#[command(about = "Demonstrates partial signature exchange over the bootnode/relay P2P path")] +struct Args { + /// Relay URLs or multiaddrs. + #[arg(long, value_delimiter = ',')] + relays: Vec, + + /// Directory holding the p2p private key and cluster lock. + #[arg(long)] + data_dir: PathBuf, + + /// TCP listen addresses. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + tcp_addrs: Vec, + + /// UDP listen addresses used for QUIC. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + udp_addrs: Vec, + + /// Whether to filter private addresses from advertisements. + #[arg(long, default_value_t = false)] + filter_private_addrs: bool, + + /// External IP address to advertise. + #[arg(long)] + external_ip: Option, + + /// External hostname to advertise. + #[arg(long)] + external_host: Option, + + /// Whether to disable socket reuse-port. + #[arg(long, default_value_t = false)] + disable_reuse_port: bool, + + /// Emit a sample partial signature every N seconds. + #[arg(long, default_value_t = 10)] + broadcast_every: u64, + + /// Share index to use in the sample partial signature. + #[arg(long, default_value_t = 1)] + share_idx: u64, + + /// Log level. + #[arg(long, default_value = "info")] + log_level: String, +} + +fn make_sample_set(slot: u64, share_idx: u64) -> ParSignedDataSet { + let share_byte = u8::try_from(share_idx % 255).unwrap_or(1); + let pub_key = PubKey::new([share_byte; 48]); + + let mut set = ParSignedDataSet::new(); + set.insert( + pub_key, + SignedRandao::new_partial(slot / 32, [share_byte; 96], share_idx), + ); + set +} + +fn log_received(duty: &Duty, set: &ParSignedDataSet, peer: &libp2p::PeerId) { + let entries = set + .inner() + .iter() + .map(|(pub_key, data)| format!("{pub_key}:share_idx={}", data.share_idx)) + .collect::>() + .join(", "); + + info!(peer = %peer, duty = %duty, entries = %entries, "received partial signature set"); +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + pluto_tracing::init( + &TracingConfig::builder() + .with_default_console() + .override_env_filter(&args.log_level) + .build(), + )?; + + let key = k1::load_priv_key(&args.data_dir).with_context(|| { + format!( + "failed to load private key from {}", + args.data_dir.display() + ) + })?; + let local_peer_id = peer_id_from_key(key.public_key()) + .context("failed to derive local peer ID from private key")?; + + let lock_path = args.data_dir.join("cluster-lock.json"); + let lock_str = fs::read_to_string(&lock_path) + .await + .with_context(|| format!("failed to read {}", lock_path.display()))?; + let lock: Lock = serde_json::from_str(&lock_str) + .with_context(|| format!("failed to parse {}", lock_path.display()))?; + + let cancel = CancellationToken::new(); + let lock_hash_hex = hex::encode(&lock.lock_hash); + let relays = bootnode::new_relays(cancel.child_token(), &args.relays, &lock_hash_hex) + .await + .context("failed to resolve relays")?; + + let known_peers = lock + .peer_ids() + .context("failed to derive peer IDs from lock")?; + if !known_peers.contains(&local_peer_id) { + return Err(anyhow!( + "local peer ID {local_peer_id} not found in cluster lock" + )); + } + let conn_gater = gater::ConnGater::new( + gater::Config::closed() + .with_relays(relays.clone()) + .with_peer_ids(known_peers.clone()), + ); + + let verifier: Verifier = + std::sync::Arc::new(|_duty, _pubkey, _data| Box::pin(async { Ok(()) })); + let duty_gater: DutyGater = std::sync::Arc::new(|duty| duty.duty_type != DutyType::Unknown); + let handle_slot = std::sync::Arc::new(tokio::sync::Mutex::new(1_u64)); + + let p2p_config = P2PConfig { + relays: vec![], + external_ip: args.external_ip.clone(), + external_host: args.external_host.clone(), + tcp_addrs: args.tcp_addrs.clone(), + udp_addrs: args.udp_addrs.clone(), + disable_reuse_port: args.disable_reuse_port, + }; + + let relay_peer_ids: HashSet<_> = relays + .iter() + .filter_map(|relay| relay.peer().ok().flatten().map(|peer| peer.id)) + .collect(); + + let mut parsigex_handle: Option = None; + let mut node: Node = Node::new( + p2p_config, + key, + NodeType::QUIC, + args.filter_private_addrs, + known_peers.clone(), + |builder, keypair, relay_client| { + let p2p_context = builder.p2p_context(); + let local_peer_id = keypair.public().to_peer_id(); + let config = parsigex::Config::new( + local_peer_id, + p2p_context.clone(), + verifier.clone(), + duty_gater.clone(), + ) + .with_timeout(Duration::from_secs(10)); + let (parsigex, handle) = parsigex::Behaviour::new(config, local_peer_id); + parsigex_handle = Some(handle); + + builder + .with_gater(conn_gater) + .with_inner(CombinedBehaviour { + parsigex, + relay: relay_client, + relay_reservation: MutableRelayReservation::new(relays.clone()), + relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id), + }) + }, + )?; + + let parsigex_handle = + parsigex_handle.ok_or_else(|| anyhow!("parsigex handle should be created"))?; + + info!( + peer_id = %node.local_peer_id(), + data_dir = %args.data_dir.display(), + known_peers = ?known_peers, + relays = ?args.relays, + "parsigex example started" + ); + + let mut ticker = tokio::time::interval(Duration::from_secs(args.broadcast_every)); + let mut pending_broadcasts: HashMap = HashMap::new(); + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("ctrl+c received, shutting down"); + break; + } + _ = ticker.tick() => { + info!("broadcasting sample partial signature set"); + let mut slot = handle_slot.lock().await; + let duty = Duty::new(SlotNumber::new(*slot), DutyType::Randao); + let data_set = make_sample_set(*slot, args.share_idx); + + match parsigex_handle.broadcast(duty.clone(), data_set.clone()).await { + Ok(request_id) => { + pending_broadcasts.insert(request_id, (duty.clone(), args.share_idx)); + info!( + request_id, + duty = %duty, + share_idx = args.share_idx, + "queued sample partial signature set for broadcast" + ); + *slot = slot.saturating_add(1); + } + Err(error) => { + warn!(%error, "broadcast failed"); + } + } + } + event = node.select_next_some() => { + let peer_type = |peer_id: &libp2p::PeerId| { + if relay_peer_ids.contains(peer_id) { + "RELAY" + } else if known_peers.contains(peer_id) { + "PEER" + } else { + "UNKNOWN" + } + }; + + match event { + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + renewal, + limit = ?limit, + "relay reservation accepted" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::OutboundCircuitEstablished { + relay_peer_id, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + limit = ?limit, + "outbound relay circuit established" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::InboundCircuitEstablished { + src_peer_id, + limit, + }), + )) => { + info!( + src_peer_id = %src_peer_id, + peer_type = peer_type(&src_peer_id), + limit = ?limit, + "inbound relay circuit established" + ); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + "connection established" + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + cause = ?cause, + "connection closed" + ); + } + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { + warn!( + peer_id = ?peer_id, + connection_id = ?connection_id, + error = %error, + "outgoing connection failed" + ); + } + SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error, + .. + } => { + warn!( + connection_id = ?connection_id, + local_addr = %local_addr, + send_back_addr = %send_back_addr, + error = %error, + "incoming connection failed" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Identify( + identify::Event::Received { peer_id, info, .. }, + )) => { + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + agent_version = %info.agent_version, + protocol_version = %info.protocol_version, + listen_addrs = ?info.listen_addrs, + "identify received" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Ping(ping::Event { + peer, + result, + .. + })) => match result { + Ok(rtt) => { + info!(peer_id = %peer, peer_type = peer_type(&peer), rtt = ?rtt, "ping succeeded"); + } + Err(error) => { + warn!(peer_id = %peer, peer_type = peer_type(&peer), error = %error, "ping failed"); + } + }, + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Received { + peer, + duty, + data_set, + .. + }), + )) => { + log_received(&duty, &data_set, &peer); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Error { peer, error, .. }), + )) => { + warn!(peer = %peer, error = %error, "parsigex protocol error"); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastError { + request_id, + peer, + error, + }), + )) => { + match pending_broadcasts.get(&request_id) { + Some((duty, share_idx)) => { + warn!( + request_id, + duty = %duty, + share_idx, + peer = ?peer, + error = %error, + "sample partial signature broadcast failed" + ); + } + None => { + warn!( + request_id, + peer = ?peer, + error = %error, + "partial signature broadcast failed" + ); + } + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastComplete { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + info!( + request_id, + duty = %duty, + share_idx, + "broadcasted sample partial signature set" + ); + } else { + info!(request_id, "partial signature broadcast completed"); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastFinished { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + warn!( + request_id, + duty = %duty, + share_idx, + "sample partial signature broadcast finished with failures" + ); + } else { + warn!(request_id, "partial signature broadcast finished with failures"); + } + } + SwarmEvent::NewListenAddr { address, .. } => { + info!(address = %address, "listening"); + } + _ => {} + } + } + } + } + + Ok(()) +} diff --git a/crates/parsigex/src/behaviour.rs b/crates/parsigex/src/behaviour.rs new file mode 100644 index 00000000..4f9d6c5e --- /dev/null +++ b/crates/parsigex/src/behaviour.rs @@ -0,0 +1,444 @@ +//! Network behaviour and control handle for partial signature exchange. + +use std::{ + collections::{HashMap, VecDeque}, + future::Future, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + task::{Context, Poll}, + time::Duration, +}; + +use either::Either; +use libp2p::{ + Multiaddr, PeerId, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, dummy, + }, +}; +use tokio::sync::mpsc; + +use pluto_core::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; +use pluto_p2p::p2p_context::P2PContext; + +use super::{Error as CodecError, Handler, encode_message}; +use crate::handler::{Failure as HandlerFailure, FromHandler, ToHandler}; + +/// Future returned by verifier callbacks. +pub type VerifyFuture = + Pin> + Send + 'static>>; + +/// Verifier callback type. +pub type Verifier = + Arc VerifyFuture + Send + Sync + 'static>; + +/// Duty gate callback type. +pub type DutyGater = Arc bool + Send + Sync + 'static>; + +/// Error type for signature verification callbacks. +#[derive(Debug, thiserror::Error)] +pub enum VerifyError { + /// Unknown validator public key. + #[error("unknown pubkey, not part of cluster lock")] + UnknownPubKey, + + /// Invalid share index for the validator. + #[error("invalid shareIdx")] + InvalidShareIndex, + + /// Invalid signed-data family for the duty. + #[error("invalid eth2 signed data")] + InvalidSignedDataFamily, + + /// Generic verification error. + #[error("{0}")] + Other(String), +} + +/// Error type for behaviour operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Message conversion failed. + #[error(transparent)] + Codec(#[from] CodecError), + + /// Channel closed. + #[error("parsigex handle closed")] + Closed, + + /// Broadcast failed for a peer. + #[error("broadcast to peer {peer} failed: {source}")] + BroadcastPeer { + /// Peer for which the broadcast failed. + peer: PeerId, + /// Source error. + #[source] + source: HandlerFailure, + }, + + /// Peer is not currently connected. + #[error("peer {0} is not connected")] + PeerNotConnected(PeerId), +} + +/// Result type for partial signature exchange behaviour operations. +pub type Result = std::result::Result; + +/// Event emitted by the partial signature exchange behaviour. +#[derive(Debug)] +pub enum Event { + /// A verified partial signature set was received from a peer. + Received { + /// The remote peer. + peer: PeerId, + /// Connection on which it was received. + connection: ConnectionId, + /// Duty associated with the data set. + duty: Duty, + /// Partial signature set. + data_set: ParSignedDataSet, + }, + /// A peer sent invalid data or verification failed. + Error { + /// The remote peer. + peer: PeerId, + /// Connection on which the error occurred. + connection: ConnectionId, + /// Failure reason. + error: HandlerFailure, + }, + /// Broadcast failed. + BroadcastError { + /// Request identifier. + request_id: u64, + /// Peer for which the broadcast failed. + peer: Option, + /// Failure reason. + error: HandlerFailure, + }, + /// Broadcast completed successfully for all targeted peers. + BroadcastComplete { + /// Request identifier. + request_id: u64, + }, + /// Broadcast finished after one or more peer failures. + BroadcastFinished { + /// Request identifier. + request_id: u64, + }, +} + +#[derive(Debug)] +struct PendingBroadcast { + remaining: usize, + failed: bool, +} + +#[derive(Debug)] +enum Command { + Broadcast { + request_id: u64, + duty: Duty, + data_set: ParSignedDataSet, + }, +} + +/// Async handle for outbound partial signature broadcasts. +#[derive(Debug, Clone)] +pub struct Handle { + tx: mpsc::UnboundedSender, + next_request_id: Arc, +} + +impl Handle { + /// Broadcasts a partial signature set to all peers except self. + pub async fn broadcast(&self, duty: Duty, data_set: ParSignedDataSet) -> Result { + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); + self.tx + .send(Command::Broadcast { + request_id, + duty, + data_set, + }) + .map_err(|_| Error::Closed)?; + + Ok(request_id) + } +} + +/// Configuration for the partial signature exchange behaviour. +#[derive(Clone)] +pub struct Config { + peer_id: PeerId, + p2p_context: P2PContext, + verifier: Verifier, + duty_gater: DutyGater, + timeout: Duration, +} + +impl Config { + /// Creates a new configuration. + pub fn new( + peer_id: PeerId, + p2p_context: P2PContext, + verifier: Verifier, + duty_gater: DutyGater, + ) -> Self { + Self { + peer_id, + p2p_context, + verifier, + duty_gater, + timeout: Duration::from_secs(20), + } + } + + /// Sets the send/receive timeout. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +/// Behaviour for partial signature exchange. +pub struct Behaviour { + config: Config, + rx: mpsc::UnboundedReceiver, + pending_actions: VecDeque>>, + events: VecDeque, + pending_broadcasts: HashMap, +} + +impl Behaviour { + /// Creates a behaviour and a clonable broadcast handle. + pub fn new(config: Config, peer_id: PeerId) -> (Self, Handle) { + debug_assert_eq!(config.peer_id, peer_id); + let (tx, rx) = mpsc::unbounded_channel(); + let handle = Handle { + tx, + next_request_id: Arc::new(AtomicU64::new(0)), + }; + + ( + Self { + config, + rx, + pending_actions: VecDeque::new(), + events: VecDeque::new(), + pending_broadcasts: HashMap::new(), + }, + handle, + ) + } + + fn handle_command(&mut self, command: Command) { + match command { + Command::Broadcast { + request_id, + duty, + data_set, + } => { + let message = match encode_message(&duty, &data_set) { + Ok(message) => message, + Err(err) => { + self.broadcast_error(request_id, None, HandlerFailure::Codec(err)); + return; + } + }; + + let peers: Vec<_> = self + .config + .p2p_context + .known_peers() + .iter() + .copied() + .collect(); + let mut targeted = 0usize; + for peer in peers { + if peer == self.config.peer_id { + continue; + } + + if self + .config + .p2p_context + .peer_store_lock() + .connections_to_peer(&peer) + .is_empty() + { + self.broadcast_error( + request_id, + Some(peer), + HandlerFailure::Io(format!("peer {peer} is not connected")), + ); + continue; + } + + self.pending_actions.push_back(ToSwarm::NotifyHandler { + peer_id: peer, + handler: NotifyHandler::Any, + event: Either::Left(ToHandler::Send { + request_id, + payload: message.clone(), + }), + }); + targeted = targeted.saturating_add(1); + } + + if targeted == 0 { + return; + } + + self.pending_broadcasts.insert( + request_id, + PendingBroadcast { + remaining: targeted, + failed: false, + }, + ); + } + } + } + + fn finish_broadcast_result(&mut self, request_id: u64, failed: bool) { + let Some(entry) = self.pending_broadcasts.get_mut(&request_id) else { + return; + }; + + entry.failed |= failed; + entry.remaining = entry.remaining.saturating_sub(1); + if entry.remaining == 0 { + let failed = self + .pending_broadcasts + .remove(&request_id) + .map(|entry| entry.failed) + .unwrap_or(failed); + if failed { + self.events + .push_back(Event::BroadcastFinished { request_id }); + } else { + self.events + .push_back(Event::BroadcastComplete { request_id }); + } + } + } + + fn broadcast_error(&mut self, request_id: u64, peer: Option, error: HandlerFailure) { + self.events.push_back(Event::BroadcastError { + request_id, + peer, + error, + }); + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Either; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> std::result::Result, ConnectionDenied> { + if !self.config.p2p_context.is_known_peer(&peer) { + return Ok(Either::Right(dummy::ConnectionHandler)); + } + + tracing::trace!("establishing inbound connection to peer: {:?}", peer); + Ok(Either::Left(Handler::new( + self.config.timeout, + self.config.verifier.clone(), + self.config.duty_gater.clone(), + peer, + ))) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p::core::Endpoint, + _port_use: libp2p::core::transport::PortUse, + ) -> std::result::Result, ConnectionDenied> { + if !self.config.p2p_context.is_known_peer(&peer) { + return Ok(Either::Right(dummy::ConnectionHandler)); + } + + tracing::trace!("establishing outbound connection to peer: {:?}", peer); + Ok(Either::Left(Handler::new( + self.config.timeout, + self.config.verifier.clone(), + self.config.duty_gater.clone(), + peer, + ))) + } + + fn on_swarm_event(&mut self, _event: FromSwarm) {} + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + let event = match event { + Either::Left(event) => event, + Either::Right(value) => match value {}, + }; + + tracing::trace!("received connection handler event: {:?}", event); + match event { + FromHandler::Received { duty, data_set } => { + self.events.push_back(Event::Received { + peer: peer_id, + connection: connection_id, + duty, + data_set, + }); + } + FromHandler::InboundError(error) => { + self.events.push_back(Event::Error { + peer: peer_id, + connection: connection_id, + error, + }); + } + FromHandler::OutboundSuccess { request_id } => { + self.finish_broadcast_result(request_id, false); + } + FromHandler::OutboundError { request_id, error } => { + self.finish_broadcast_result(request_id, true); + self.broadcast_error(request_id, Some(peer_id), error); + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + tracing::trace!("polling parsigex behaviour"); + + if let Some(event) = self.events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + if let Poll::Ready(Some(command)) = self.rx.poll_recv(cx) { + self.handle_command(command); + } + + if let Some(action) = self.pending_actions.pop_front() { + return Poll::Ready(action); + } + + Poll::Pending + } +} diff --git a/crates/parsigex/src/handler.rs b/crates/parsigex/src/handler.rs new file mode 100644 index 00000000..e1925aad --- /dev/null +++ b/crates/parsigex/src/handler.rs @@ -0,0 +1,403 @@ +//! Connection handler for the partial signature exchange protocol. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{future::BoxFuture, prelude::*}; +use futures_timer::Delay; +use libp2p::{ + PeerId, + core::upgrade::ReadyUpgrade, + swarm::{ + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + }, + }, +}; + +use pluto_core::types::{Duty, ParSignedDataSet}; + +use super::{DutyGater, PROTOCOL_NAME, Verifier, protocol}; +use crate::Error as CodecError; + +/// Failure type for the partial signature exchange handler. +#[derive(Debug, thiserror::Error)] +pub enum Failure { + /// Stream negotiation timed out. + #[error("parsigex protocol negotiation timed out")] + Timeout, + /// Invalid payload. + #[error("invalid parsigex payload")] + InvalidPayload, + /// Duty not accepted by the gater. + #[error("invalid duty")] + InvalidDuty, + /// Signature verification failed. + #[error("invalid partial signature")] + InvalidPartialSignature, + /// I/O error. + #[error("{0}")] + Io(String), + /// Codec error. + #[error("codec error: {0}")] + Codec(CodecError), +} + +impl Failure { + fn io(error: impl std::fmt::Display) -> Self { + Self::Io(error.to_string()) + } +} + +/// Command sent from the behaviour to a handler. +#[derive(Debug)] +pub enum ToHandler { + /// Send the encoded payload to the remote peer. + Send { + /// Request identifier used to correlate broadcast completions. + request_id: u64, + /// Encoded protobuf payload. + payload: Vec, + }, +} + +/// Event sent from the handler back to the behaviour. +#[derive(Debug)] +pub enum FromHandler { + /// A verified message was received. + Received { + /// Duty from the message. + duty: Duty, + /// Verified partial signature set. + data_set: ParSignedDataSet, + }, + /// An inbound message failed decoding, gating, or verification. + InboundError(Failure), + /// Outbound send completed successfully. + OutboundSuccess { + /// Request identifier. + request_id: u64, + }, + /// Outbound send failed. + OutboundError { + /// Request identifier. + request_id: u64, + /// Failure reason. + error: Failure, + }, +} + +type SendFuture = BoxFuture<'static, Result<(), Failure>>; +type RecvFuture = BoxFuture<'static, Result<(Duty, ParSignedDataSet), Failure>>; + +enum OutboundState { + IdleStream { stream: libp2p::swarm::Stream }, + RequestOpenStream { request_id: u64, payload: Vec }, + Sending { request_id: u64, future: SendFuture }, +} + +impl std::fmt::Debug for OutboundState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutboundState::IdleStream { .. } => { + write!(f, "IdleStream {{ stream: }}") + } + OutboundState::RequestOpenStream { + request_id, + payload, + } => write!( + f, + "RequestOpenStream {{ request_id: {}, payload: {:?} }}", + request_id, payload + ), + OutboundState::Sending { request_id, .. } => write!( + f, + "Sending {{ request_id: {}, future: }}", + request_id + ), + } + } +} + +fn recv_message( + mut stream: libp2p::swarm::Stream, + verifier: Verifier, + duty_gater: DutyGater, + timeout: Duration, +) -> RecvFuture { + async move { + let recv = async { + let bytes = protocol::recv_message(&mut stream) + .await + .map_err(Failure::io)?; + let (duty, data_set) = + protocol::decode_message(&bytes).map_err(|_| Failure::InvalidPayload)?; + if !(duty_gater)(&duty) { + return Err(Failure::InvalidDuty); + } + + for (pub_key, par_sig) in data_set.inner() { + verifier(duty.clone(), *pub_key, par_sig.clone()) + .await + .map_err(|_| Failure::InvalidPartialSignature)?; + } + + Ok((duty, data_set)) + }; + + futures::pin_mut!(recv); + match futures::future::select(recv, Delay::new(timeout)).await { + futures::future::Either::Left((result, _)) => result, + futures::future::Either::Right(((), _)) => Err(Failure::Timeout), + } + } + .boxed() +} + +fn send_message( + mut stream: libp2p::swarm::Stream, + payload: Vec, + timeout: Duration, +) -> SendFuture { + async move { + let send = + protocol::send_message(&mut stream, &payload).map(|result| result.map_err(Failure::io)); + futures::pin_mut!(send); + match futures::future::select(send, Delay::new(timeout)).await { + futures::future::Either::Left((result, _)) => result, + futures::future::Either::Right(((), _)) => Err(Failure::Timeout), + } + } + .boxed() +} + +/// Connection handler for parsigex. +pub struct Handler { + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + outbound_queue: VecDeque<(u64, Vec)>, + outbound: Option, + inbound: Option, + pending_events: VecDeque, +} + +impl Handler { + /// Creates a new handler for one connection. + pub fn new( + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + _peer: PeerId, + ) -> Self { + Self { + timeout, + verifier, + duty_gater, + outbound_queue: VecDeque::new(), + outbound: None, + inbound: None, + pending_events: VecDeque::new(), + } + } + + fn on_dial_upgrade_error( + &mut self, + error: DialUpgradeError<(), ::OutboundProtocol>, + ) { + let Some(OutboundState::RequestOpenStream { request_id, .. }) = self.outbound.take() else { + return; + }; + + let failure = match error.error { + StreamUpgradeError::Timeout => Failure::Timeout, + StreamUpgradeError::NegotiationFailed => Failure::io("protocol negotiation failed"), + StreamUpgradeError::Apply(e) => libp2p::core::util::unreachable(e), + StreamUpgradeError::Io(e) => Failure::io(e), + }; + + self.pending_events.push_back(FromHandler::OutboundError { + request_id, + error: failure, + }); + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = ToHandler; + type InboundOpenInfo = (); + type InboundProtocol = ReadyUpgrade; + type OutboundOpenInfo = (); + type OutboundProtocol = ReadyUpgrade; + type ToBehaviour = FromHandler; + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + ToHandler::Send { + request_id, + payload, + } => self.outbound_queue.push_back((request_id, payload)), + } + } + + #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))] + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + + if let Some(fut) = self.inbound.as_mut() { + match fut.poll_unpin(cx) { + Poll::Pending => {} + Poll::Ready(Ok((duty, data_set))) => { + self.inbound = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::Received { duty, data_set }, + )); + } + Poll::Ready(Err(error)) => { + self.inbound = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::InboundError(error), + )); + } + } + } + + if let Some(outbound) = self.outbound.take() { + match outbound { + OutboundState::IdleStream { stream } => { + if let Some((request_id, payload)) = self.outbound_queue.pop_front() { + self.outbound = Some(OutboundState::Sending { + request_id, + future: send_message(stream, payload, self.timeout), + }); + } else { + self.outbound = Some(OutboundState::IdleStream { stream }); + } + } + OutboundState::RequestOpenStream { + request_id, + payload, + } => { + // Waiting for stream negotiation - put state back and return pending. + // The OutboundSubstreamRequest was already emitted when first entering this + // state. Returning it again would cause libp2p to panic + // with "cannot extract twice". + self.outbound = Some(OutboundState::RequestOpenStream { + request_id, + payload, + }); + } + OutboundState::Sending { + request_id, + mut future, + } => match future.poll_unpin(cx) { + Poll::Pending => { + self.outbound = Some(OutboundState::Sending { request_id, future }); + } + Poll::Ready(Ok(())) => { + self.outbound = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::OutboundSuccess { request_id }, + )); + } + Poll::Ready(Err(error)) => { + self.outbound = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::OutboundError { request_id, error }, + )); + } + }, + } + } + + // Only start a new outbound operation if none is in progress. + // This prevents overwriting RequestOpenStream or Sending states. + if self.outbound.is_none() + && let Some((request_id, payload)) = self.outbound_queue.pop_front() + { + self.outbound = Some(OutboundState::RequestOpenStream { + request_id, + payload, + }); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), + }); + } + + Poll::Pending + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: mut stream, + .. + }) => { + stream.ignore_for_keep_alive(); + self.inbound = Some(recv_message( + stream, + self.verifier.clone(), + self.duty_gater.clone(), + self.timeout, + )); + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: mut stream, + .. + }) => { + stream.ignore_for_keep_alive(); + match self.outbound.take() { + Some(OutboundState::RequestOpenStream { + request_id, + payload, + }) => { + self.outbound = Some(OutboundState::Sending { + request_id, + future: send_message(stream, payload, self.timeout), + }); + } + Some(OutboundState::Sending { request_id, future }) => { + self.outbound = Some(OutboundState::Sending { request_id, future }); + tracing::debug!( + "dropping unexpected outbound parsigex stream while a send is already in progress" + ); + } + Some(OutboundState::IdleStream { + stream: idle_stream, + }) => { + self.outbound = Some(OutboundState::IdleStream { + stream: idle_stream, + }); + tracing::debug!( + "dropping unexpected outbound parsigex stream while an idle stream is already cached" + ); + } + None => { + self.outbound = Some(OutboundState::IdleStream { stream }); + } + } + } + ConnectionEvent::DialUpgradeError(error) => self.on_dial_upgrade_error(error), + _ => {} + } + } +} diff --git a/crates/parsigex/src/lib.rs b/crates/parsigex/src/lib.rs new file mode 100644 index 00000000..ca967afc --- /dev/null +++ b/crates/parsigex/src/lib.rs @@ -0,0 +1,41 @@ +//! Partial signature exchange protocol. + +pub mod behaviour; +mod handler; +mod protocol; + +pub use behaviour::{ + Behaviour, Config, DutyGater, Error as BehaviourError, Event, Handle, Verifier, VerifyError, +}; +pub use handler::Handler; +pub use protocol::{decode_message, encode_message}; + +use libp2p::PeerId; +use pluto_core::ParSigExCodecError; + +/// The protocol name for partial signature exchange (version 2.0.0). +pub const PROTOCOL_NAME: libp2p::swarm::StreamProtocol = + libp2p::swarm::StreamProtocol::new("/charon/parsigex/2.0.0"); + +/// Returns the supported protocols in precedence order. +pub fn protocols() -> Vec { + vec![PROTOCOL_NAME] +} + +/// Error type for proto and conversion operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Core codec error. + #[error(transparent)] + Codec(#[from] ParSigExCodecError), + + /// Broadcast failed for a peer. + #[error("broadcast to peer {peer} failed")] + BroadcastPeer { + /// Peer for which the broadcast failed. + peer: PeerId, + }, +} + +/// Result type for partial signature exchange operations. +pub type Result = std::result::Result; diff --git a/crates/parsigex/src/protocol.rs b/crates/parsigex/src/protocol.rs new file mode 100644 index 00000000..bfaccaf9 --- /dev/null +++ b/crates/parsigex/src/protocol.rs @@ -0,0 +1,68 @@ +//! Wire protocol helpers for partial signature exchange. + +use std::io; + +use libp2p::swarm::Stream; +use prost::Message; + +use pluto_core::{ + corepb::v1::{core as pbcore, parsigex as pbparsigex}, + types::{Duty, ParSignedDataSet}, +}; +use pluto_p2p::proto; + +use super::{Error, Result as ParasigexResult}; + +/// Maximum accepted message size. +const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; + +/// Encodes a protobuf message to bytes. +pub fn encode_protobuf(message: &M) -> Vec { + let mut buf = Vec::with_capacity(message.encoded_len()); + message + .encode(&mut buf) + .expect("vec-backed protobuf encoding cannot fail"); + buf +} + +/// Decodes a protobuf message from bytes. +pub fn decode_protobuf( + bytes: &[u8], +) -> std::result::Result { + M::decode(bytes) +} + +/// Encodes a partial signature exchange message. +pub fn encode_message(duty: &Duty, data_set: &ParSignedDataSet) -> ParasigexResult> { + let pb = pbparsigex::ParSigExMsg { + duty: Some(pbcore::Duty::from(duty)), + data_set: Some(pbcore::ParSignedDataSet::try_from(data_set)?), + }; + + Ok(encode_protobuf(&pb)) +} + +/// Decodes a partial signature exchange message. +pub fn decode_message(bytes: &[u8]) -> ParasigexResult<(Duty, ParSignedDataSet)> { + let pb: pbparsigex::ParSigExMsg = decode_protobuf(bytes) + .map_err(|_| Error::from(pluto_core::ParSigExCodecError::InvalidMessageFields))?; + let duty_pb = pb + .duty + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let data_set_pb = pb + .data_set + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let duty = Duty::try_from(&duty_pb)?; + let data_set = ParSignedDataSet::try_from((&duty.duty_type, &data_set_pb))?; + Ok((duty, data_set)) +} + +/// Sends one protobuf message on the stream. +pub async fn send_message(stream: &mut Stream, payload: &[u8]) -> io::Result<()> { + proto::write_length_delimited(stream, payload).await +} + +/// Receives one protobuf payload from the stream. +pub async fn recv_message(stream: &mut Stream) -> io::Result> { + proto::read_length_delimited(stream, MAX_MESSAGE_SIZE).await +} diff --git a/crates/peerinfo/src/protocol.rs b/crates/peerinfo/src/protocol.rs index 2fe6ad70..5b284521 100644 --- a/crates/peerinfo/src/protocol.rs +++ b/crates/peerinfo/src/protocol.rs @@ -17,14 +17,12 @@ use std::{ }; use chrono::{DateTime, Utc}; -use futures::prelude::*; use libp2p::{PeerId, swarm::Stream}; use pluto_core::version::{self, SemVer, SemVerError}; -use prost::Message; +use pluto_p2p::proto; use regex::Regex; use tokio::sync::Mutex; use tracing::{info, warn}; -use unsigned_varint::aio::read_usize; use crate::{ LocalPeerInfo, @@ -32,9 +30,6 @@ use crate::{ peerinfopb::v1::peerinfo::PeerInfo, }; -/// Maximum message size (64KB should be plenty for peer info). -const MAX_MESSAGE_SIZE: usize = 64 * 1024; - static GIT_HASH_RE: LazyLock = LazyLock::new(|| Regex::new(r"^[0-9a-f]{7}$").expect("invalid regex")); @@ -51,57 +46,6 @@ pub struct ProtocolState { local_info: LocalPeerInfo, } -/// Writes a protobuf message with unsigned varint length prefix to the stream. -/// -/// Wire format: `[uvarint length][protobuf bytes]` -async fn write_protobuf( - stream: &mut S, - msg: &M, -) -> io::Result<()> { - // Encode message to protobuf bytes - let mut buf = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - // Write unsigned varint length prefix - let mut len_buf = unsigned_varint::encode::usize_buffer(); - let encoded_len = unsigned_varint::encode::usize(buf.len(), &mut len_buf); - stream.write_all(encoded_len).await?; - - // Write protobuf bytes - stream.write_all(&buf).await?; - stream.flush().await -} - -/// Reads a protobuf message with unsigned varint length prefix from the stream. -/// -/// Wire format: `[uvarint length][protobuf bytes]` -/// -/// Returns an error if the message exceeds `MAX_MESSAGE_SIZE`. -async fn read_protobuf( - stream: &mut S, -) -> io::Result { - // Read unsigned varint length prefix - let msg_len = read_usize(&mut *stream).await.map_err(|e| match e { - unsigned_varint::io::ReadError::Io(io_err) => io_err, - other => io::Error::new(io::ErrorKind::InvalidData, other), - })?; - - if msg_len > MAX_MESSAGE_SIZE { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("message too large: {msg_len} bytes (max: {MAX_MESSAGE_SIZE})"), - )); - } - - // Read exactly `msg_len` protobuf bytes - let mut buf = vec![0u8; msg_len]; - stream.read_exact(&mut buf).await?; - - // Unmarshal protobuf - M::decode(&buf[..]).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) -} - /// Errors that can occur during the protocol. #[derive(Debug, thiserror::Error)] pub enum ProtocolError { @@ -317,8 +261,8 @@ impl ProtocolState { request: &PeerInfo, ) -> io::Result<(Stream, PeerInfo)> { let start = Instant::now(); - write_protobuf(&mut stream, request).await?; - let response = read_protobuf(&mut stream).await?; + proto::write_protobuf(&mut stream, request).await?; + let response = proto::read_protobuf(&mut stream).await?; let rtt = start.elapsed(); self.validate_peer_info(&response, rtt).await; @@ -334,8 +278,8 @@ impl ProtocolState { mut stream: Stream, local_info: &PeerInfo, ) -> io::Result<(Stream, PeerInfo)> { - let request = read_protobuf(&mut stream).await?; - write_protobuf(&mut stream, local_info).await?; + let request = proto::read_protobuf(&mut stream).await?; + proto::write_protobuf(&mut stream, local_info).await?; Ok((stream, request)) } } @@ -344,6 +288,7 @@ impl ProtocolState { mod tests { use super::*; use hex_literal::hex; + use prost::Message; // Test case: minimal // CharonVersion: "v1.0.0" @@ -571,7 +516,7 @@ mod tests { // Write to a cursor let mut buf = Vec::new(); - write_protobuf(&mut buf, &original).await.unwrap(); + proto::write_protobuf(&mut buf, &original).await.unwrap(); // The wire format should be: [varint length][protobuf bytes] // Minimal message is 14 bytes, so length prefix is just 1 byte (14 < 128) @@ -580,7 +525,7 @@ mod tests { // Read it back let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(original, decoded); } @@ -589,11 +534,11 @@ mod tests { let original = make_full_peerinfo(); let mut buf = Vec::new(); - write_protobuf(&mut buf, &original).await.unwrap(); + proto::write_protobuf(&mut buf, &original).await.unwrap(); // Read it back let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(original, decoded); } @@ -609,10 +554,10 @@ mod tests { for original in variants { let mut buf = Vec::new(); - write_protobuf(&mut buf, &original).await.unwrap(); + proto::write_protobuf(&mut buf, &original).await.unwrap(); let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(original, decoded); } } @@ -621,13 +566,13 @@ mod tests { async fn test_read_protobuf_message_too_large() { // Create a buffer with a length prefix that exceeds MAX_MESSAGE_SIZE let mut buf = Vec::new(); - let large_len = MAX_MESSAGE_SIZE + 1; + let large_len = proto::MAX_MESSAGE_SIZE + 1; let mut len_buf = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(large_len, &mut len_buf); buf.extend_from_slice(encoded_len); let mut cursor = futures::io::Cursor::new(&buf[..]); - let result: io::Result = read_protobuf(&mut cursor).await; + let result: io::Result = proto::read_protobuf(&mut cursor).await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -641,7 +586,7 @@ mod tests { let invalid_data = [0x05, 0xff, 0xff, 0xff, 0xff, 0xff]; // length 5, then garbage let mut cursor = futures::io::Cursor::new(&invalid_data[..]); - let result: io::Result = read_protobuf(&mut cursor).await; + let result: io::Result = proto::read_protobuf(&mut cursor).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData); @@ -653,7 +598,7 @@ mod tests { let truncated = [0x10]; // claims 16 bytes but has none let mut cursor = futures::io::Cursor::new(&truncated[..]); - let result: io::Result = read_protobuf(&mut cursor).await; + let result: io::Result = proto::read_protobuf(&mut cursor).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); @@ -667,15 +612,15 @@ mod tests { // Write multiple messages to the same buffer let mut buf = Vec::new(); - write_protobuf(&mut buf, &msg1).await.unwrap(); - write_protobuf(&mut buf, &msg2).await.unwrap(); - write_protobuf(&mut buf, &msg3).await.unwrap(); + proto::write_protobuf(&mut buf, &msg1).await.unwrap(); + proto::write_protobuf(&mut buf, &msg2).await.unwrap(); + proto::write_protobuf(&mut buf, &msg3).await.unwrap(); // Read them back in order let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded1: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); - let decoded2: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); - let decoded3: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded1: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); + let decoded2: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); + let decoded3: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(msg1, decoded1); assert_eq!(msg2, decoded2);