From 8a3780e790b88b1cd2fd2556af573a754bf5e12b Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:33:36 +0100 Subject: [PATCH 1/2] feat: add parsigex [wip] --- Cargo.lock | 8 + crates/core/Cargo.toml | 8 + crates/core/examples/parasigex.rs | 448 ++++++++++++++++++++++++ crates/core/src/lib.rs | 3 + crates/core/src/parsigex/behaviour.rs | 395 +++++++++++++++++++++ crates/core/src/parsigex/handler.rs | 328 +++++++++++++++++ crates/core/src/parsigex/mod.rs | 81 +++++ crates/core/src/parsigex/protocol.rs | 83 +++++ crates/core/src/parsigex/signed_data.rs | 78 +++++ crates/core/src/types.rs | 147 +++++++- crates/p2p/src/p2p.rs | 2 +- crates/p2p/src/relay.rs | 8 +- 12 files changed, 1583 insertions(+), 6 deletions(-) create mode 100644 crates/core/examples/parasigex.rs create mode 100644 crates/core/src/parsigex/behaviour.rs create mode 100644 crates/core/src/parsigex/handler.rs create mode 100644 crates/core/src/parsigex/mod.rs create mode 100644 crates/core/src/parsigex/protocol.rs create mode 100644 crates/core/src/parsigex/signed_data.rs diff --git a/Cargo.lock b/Cargo.lock index 2a28c0b..fa25ee6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5519,21 +5519,28 @@ name = "pluto-core" version = "1.7.1" dependencies = [ "alloy", + "anyhow", "async-trait", "base64 0.22.1", "built", "cancellation", "chrono", + "clap", "crossbeam", "dyn-clone", "dyn-eq", "futures", + "futures-timer", "hex", + "k256", "libp2p", "pluto-build-proto", + "pluto-cluster", "pluto-eth2api", "pluto-eth2util", + "pluto-p2p", "pluto-testutil", + "pluto-tracing", "prost 0.14.3", "prost-types 0.14.3", "rand 0.8.5", @@ -5547,6 +5554,7 @@ dependencies = [ "tokio-util", "tracing", "tree_hash", + "unsigned-varint 0.8.0", "vise", ] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ccbf223..3559caf 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,6 +12,7 @@ cancellation.workspace = true chrono.workspace = 
true crossbeam.workspace = true futures.workspace = true +futures-timer.workspace = true dyn-clone.workspace = true dyn-eq.workspace = true hex.workspace = true @@ -31,18 +32,25 @@ tokio-util.workspace = true tracing.workspace = true pluto-eth2util.workspace = true tree_hash.workspace = true +unsigned-varint.workspace = true [dev-dependencies] +anyhow.workspace = true alloy.workspace = true +clap.workspace = true rand.workspace = true libp2p.workspace = true +k256.workspace = true prost.workspace = true prost-types.workspace = true hex.workspace = true chrono.workspace = true test-case.workspace = true pluto-eth2util.workspace = true +pluto-cluster.workspace = true +pluto-p2p.workspace = true pluto-testutil.workspace = true +pluto-tracing.workspace = true [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/core/examples/parasigex.rs b/crates/core/examples/parasigex.rs new file mode 100644 index 0000000..5341bf4 --- /dev/null +++ b/crates/core/examples/parasigex.rs @@ -0,0 +1,448 @@ +#![allow(missing_docs)] + +use std::{collections::HashSet, path::PathBuf, time::Duration}; + +use anyhow::{Context, Result, anyhow}; +use clap::Parser; +use futures::StreamExt; +use libp2p::{ + identify, ping, + relay::{self}, + swarm::{NetworkBehaviour, SwarmEvent}, +}; +use pluto_cluster::lock::Lock; +use pluto_core::{ + parsigex::{self, DutyGater, Event, Handle, Verifier}, + signeddata::SignedRandao, + types::{Duty, DutyType, ParSignedDataSet, PubKey, SlotNumber}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + bootnode, + config::P2PConfig, + gater, k1, + p2p::{Node, NodeType}, + peer::peer_id_from_key, + relay::{MutableRelayReservation, RelayRouter}, +}; +use pluto_tracing::TracingConfig; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "CombinedBehaviourEvent")] +struct CombinedBehaviour { + relay: relay::client::Behaviour, + relay_reservation: 
MutableRelayReservation, + relay_router: RelayRouter, + parasigex: parsigex::Behaviour, +} + +#[derive(Debug)] +enum CombinedBehaviourEvent { + ParSigEx(Event), + Relay(relay::client::Event), +} + +impl From for CombinedBehaviourEvent { + fn from(event: Event) -> Self { + Self::ParSigEx(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(event: relay::client::Event) -> Self { + Self::Relay(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +#[derive(Debug, Parser)] +#[command(name = "parasigex-example")] +#[command(about = "Demonstrates partial signature exchange over the bootnode/relay P2P path")] +struct Args { + /// Relay URLs or multiaddrs. + #[arg(long, value_delimiter = ',')] + relays: Vec, + + /// Directory holding the p2p private key and cluster lock. + #[arg(long)] + data_dir: PathBuf, + + /// TCP listen addresses. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + tcp_addrs: Vec, + + /// UDP listen addresses used for QUIC. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + udp_addrs: Vec, + + /// Whether to filter private addresses from advertisements. + #[arg(long, default_value_t = false)] + filter_private_addrs: bool, + + /// External IP address to advertise. + #[arg(long)] + external_ip: Option, + + /// External hostname to advertise. + #[arg(long)] + external_host: Option, + + /// Whether to disable socket reuse-port. + #[arg(long, default_value_t = false)] + disable_reuse_port: bool, + + /// Emit a sample partial signature every N seconds. + #[arg(long, default_value_t = 10)] + broadcast_every: u64, + + /// Share index to use in the sample partial signature. + #[arg(long, default_value_t = 1)] + share_idx: u64, + + /// Log level. 
+ #[arg(long, default_value = "info")] + log_level: String, +} + +fn make_sample_set(slot: u64, share_idx: u64) -> ParSignedDataSet { + let share_byte = u8::try_from(share_idx % 255).unwrap_or(1); + let pub_key = PubKey::new([share_byte; 48]); + + let mut set = ParSignedDataSet::new(); + set.insert( + pub_key, + SignedRandao::new_partial(slot / 32, [share_byte; 96], share_idx), + ); + set +} + +fn log_received(duty: &Duty, set: &ParSignedDataSet, peer: &libp2p::PeerId) { + let entries = set + .inner() + .iter() + .map(|(pub_key, data)| format!("{pub_key}:share_idx={}", data.share_idx)) + .collect::>() + .join(", "); + + info!(peer = %peer, duty = %duty, entries = %entries, "received partial signature set"); +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + pluto_tracing::init( + &TracingConfig::builder() + .with_default_console() + .override_env_filter(&args.log_level) + .build(), + )?; + + let key = k1::load_priv_key(&args.data_dir).with_context(|| { + format!( + "failed to load private key from {}", + args.data_dir.display() + ) + })?; + let local_peer_id = peer_id_from_key(key.public_key()) + .context("failed to derive local peer ID from private key")?; + + let lock_path = args.data_dir.join("cluster-lock.json"); + let lock_str = fs::read_to_string(&lock_path) + .await + .with_context(|| format!("failed to read {}", lock_path.display()))?; + let lock: Lock = serde_json::from_str(&lock_str) + .with_context(|| format!("failed to parse {}", lock_path.display()))?; + + let cancel = CancellationToken::new(); + let lock_hash_hex = hex::encode(&lock.lock_hash); + let relays = bootnode::new_relays(cancel.child_token(), &args.relays, &lock_hash_hex) + .await + .context("failed to resolve relays")?; + + let known_peers = lock + .peer_ids() + .context("failed to derive peer IDs from lock")?; + let self_index = known_peers + .iter() + .position(|peer_id| *peer_id == local_peer_id) + .ok_or_else(|| anyhow!("local peer ID {local_peer_id} 
not found in cluster lock"))?; + let conn_gater = gater::ConnGater::new( + gater::Config::closed() + .with_relays(relays.clone()) + .with_peer_ids(known_peers.clone()), + ); + + let verifier: Verifier = + std::sync::Arc::new(|_duty, _pubkey, _data| Box::pin(async { Ok(()) })); + let duty_gater: DutyGater = std::sync::Arc::new(|duty| duty.duty_type != DutyType::Unknown); + let handle_slot = std::sync::Arc::new(tokio::sync::Mutex::new(1_u64)); + + let p2p_config = P2PConfig { + relays: vec![], + external_ip: args.external_ip.clone(), + external_host: args.external_host.clone(), + tcp_addrs: args.tcp_addrs.clone(), + udp_addrs: args.udp_addrs.clone(), + disable_reuse_port: args.disable_reuse_port, + }; + + let relay_peer_ids: HashSet<_> = relays + .iter() + .filter_map(|relay| relay.peer().ok().flatten().map(|peer| peer.id)) + .collect(); + + let mut parasigex_handle: Option = None; + let mut node: Node = Node::new( + p2p_config, + key, + NodeType::QUIC, + args.filter_private_addrs, + known_peers.clone(), + |builder, keypair, relay_client| { + let p2p_context = builder.p2p_context(); + let broadcast_context = p2p_context.clone(); + let local_peer_id = keypair.public().to_peer_id(); + let config = parsigex::Config::new( + known_peers.clone(), + self_index, + verifier.clone(), + duty_gater.clone(), + std::sync::Arc::new(move |peer| { + !broadcast_context + .peer_store_lock() + .connections_to_peer(peer) + .is_empty() + }), + ) + .with_timeout(Duration::from_secs(10)); + let (parasigex, handle) = parsigex::Behaviour::new(config); + parasigex_handle = Some(handle); + + builder + .with_gater(conn_gater) + .with_inner(CombinedBehaviour { + parasigex, + relay: relay_client, + relay_reservation: MutableRelayReservation::new(relays.clone()), + relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id), + }) + }, + )?; + + let parasigex_handle = + parasigex_handle.ok_or_else(|| anyhow!("parasigex handle should be created"))?; + + info!( + peer_id = 
%node.local_peer_id(), + data_dir = %args.data_dir.display(), + known_peers = ?known_peers, + relays = ?args.relays, + "parasigex example started" + ); + + let mut ticker = tokio::time::interval(Duration::from_secs(args.broadcast_every)); + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("ctrl+c received, shutting down"); + break; + } + _ = ticker.tick() => { + info!("broadcasting sample partial signature set"); + let mut slot = handle_slot.lock().await; + let duty = Duty::new(SlotNumber::new(*slot), DutyType::Randao); + let data_set = make_sample_set(*slot, args.share_idx); + + match parasigex_handle.broadcast(duty.clone(), data_set.clone()).await { + Ok(()) => { + info!(duty = %duty, share_idx = args.share_idx, "broadcasted sample partial signature set"); + *slot = slot.saturating_add(1); + } + Err(error) => { + warn!(%error, "broadcast failed"); + } + } + } + event = node.select_next_some() => { + info!("received swarm event"); + let peer_type = |peer_id: &libp2p::PeerId| { + if relay_peer_ids.contains(peer_id) { + "RELAY" + } else if known_peers.contains(peer_id) { + "PEER" + } else { + "UNKNOWN" + } + }; + + match event { + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + renewal, + limit = ?limit, + "relay reservation accepted" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::OutboundCircuitEstablished { + relay_peer_id, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + limit = ?limit, + "outbound relay circuit established" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::InboundCircuitEstablished { + src_peer_id, + limit, + }), + 
)) => { + info!( + src_peer_id = %src_peer_id, + peer_type = peer_type(&src_peer_id), + limit = ?limit, + "inbound relay circuit established" + ); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + "connection established" + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + cause = ?cause, + "connection closed" + ); + } + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { + warn!( + peer_id = ?peer_id, + connection_id = ?connection_id, + error = %error, + "outgoing connection failed" + ); + } + SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error, + .. + } => { + warn!( + connection_id = ?connection_id, + local_addr = %local_addr, + send_back_addr = %send_back_addr, + error = %error, + "incoming connection failed" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Identify( + identify::Event::Received { peer_id, info, .. }, + )) => { + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + agent_version = %info.agent_version, + protocol_version = %info.protocol_version, + listen_addrs = ?info.listen_addrs, + "identify received" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Ping(ping::Event { + peer, + result, + .. 
+ })) => match result { + Ok(rtt) => { + info!(peer_id = %peer, peer_type = peer_type(&peer), rtt = ?rtt, "ping succeeded"); + } + Err(error) => { + warn!(peer_id = %peer, peer_type = peer_type(&peer), error = %error, "ping failed"); + } + }, + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Received { + peer, + duty, + data_set, + .. + }), + )) => { + log_received(&duty, &data_set, &peer); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Error { peer, error, .. }), + )) => { + warn!(peer = %peer, error = %error, "parasigex protocol error"); + } + SwarmEvent::NewListenAddr { address, .. } => { + info!(address = %address, "listening"); + } + _ => {} + } + } + } + } + + Ok(()) +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 62460d0..e83d6b7 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -26,6 +26,9 @@ pub mod deadline; /// parasigdb pub mod parasigdb; +/// Partial signature exchange. +pub mod parsigex; + /// Test utilities. #[cfg(test)] pub mod testutils; diff --git a/crates/core/src/parsigex/behaviour.rs b/crates/core/src/parsigex/behaviour.rs new file mode 100644 index 0000000..3a8f7d1 --- /dev/null +++ b/crates/core/src/parsigex/behaviour.rs @@ -0,0 +1,395 @@ +//! Network behaviour and control handle for partial signature exchange. 
+ +use std::{ + collections::{HashMap, VecDeque}, + future::Future, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + task::{Context, Poll}, + time::Duration, +}; + +use libp2p::{ + Multiaddr, PeerId, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, + }, +}; +use tokio::sync::{mpsc, oneshot}; + +use crate::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; + +use super::{ + Error as CodecError, Handler, encode_message, + handler::{Failure as HandlerFailure, FromHandler, ToHandler}, +}; + +/// Future returned by verifier callbacks. +pub type VerifyFuture = + Pin> + Send + 'static>>; + +/// Verifier callback type. +pub type Verifier = + Arc VerifyFuture + Send + Sync + 'static>; + +/// Duty gate callback type. +pub type DutyGater = Arc bool + Send + Sync + 'static>; + +/// Peer connection callback type. +pub type PeerConnectionChecker = Arc bool + Send + Sync + 'static>; + +/// Error type for signature verification callbacks. +#[derive(Debug, thiserror::Error)] +pub enum VerifyError { + /// Unknown validator public key. + #[error("unknown pubkey, not part of cluster lock")] + UnknownPubKey, + + /// Invalid share index for the validator. + #[error("invalid shareIdx")] + InvalidShareIndex, + + /// Invalid signed-data family for the duty. + #[error("invalid eth2 signed data")] + InvalidSignedDataFamily, + + /// Generic verification error. + #[error("{0}")] + Other(String), +} + +/// Error type for behaviour operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Message conversion failed. + #[error(transparent)] + Codec(#[from] CodecError), + + /// Channel closed. + #[error("parsigex handle closed")] + Closed, + + /// Broadcast failed for a peer. + #[error("broadcast to peer {peer} failed: {source}")] + BroadcastPeer { + /// Peer for which the broadcast failed. + peer: PeerId, + /// Source error. 
+ #[source] + source: HandlerFailure, + }, + + /// Peer is not currently connected. + #[error("peer {0} is not connected")] + PeerNotConnected(PeerId), +} + +/// Result type for partial signature exchange behaviour operations. +pub type Result = std::result::Result; + +/// Event emitted by the partial signature exchange behaviour. +#[derive(Debug, Clone)] +pub enum Event { + /// A verified partial signature set was received from a peer. + Received { + /// The remote peer. + peer: PeerId, + /// Connection on which it was received. + connection: ConnectionId, + /// Duty associated with the data set. + duty: Duty, + /// Partial signature set. + data_set: ParSignedDataSet, + }, + /// A peer sent invalid data or verification failed. + Error { + /// The remote peer. + peer: PeerId, + /// Connection on which the error occurred. + connection: ConnectionId, + /// Failure reason. + error: HandlerFailure, + }, +} + +#[derive(Debug)] +struct PendingBroadcast { + remaining: usize, + responder: oneshot::Sender>, +} + +#[derive(Debug)] +enum Command { + Broadcast { + request_id: u64, + duty: Duty, + data_set: ParSignedDataSet, + responder: oneshot::Sender>, + }, +} + +/// Async handle for outbound partial signature broadcasts. +#[derive(Debug, Clone)] +pub struct Handle { + tx: mpsc::UnboundedSender, + next_request_id: Arc, +} + +impl Handle { + /// Broadcasts a partial signature set to all peers except self. + pub async fn broadcast(&self, duty: Duty, data_set: ParSignedDataSet) -> Result<()> { + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); + let (tx, rx) = oneshot::channel(); + self.tx + .send(Command::Broadcast { + request_id, + duty, + data_set, + responder: tx, + }) + .map_err(|_| Error::Closed)?; + + Ok(()) + } +} + +/// Configuration for the partial signature exchange behaviour. 
+#[derive(Clone)] +pub struct Config { + peers: Vec, + self_index: usize, + verifier: Verifier, + duty_gater: DutyGater, + is_peer_connected: PeerConnectionChecker, + timeout: Duration, +} + +impl Config { + /// Creates a new configuration. + pub fn new( + peers: Vec, + self_index: usize, + verifier: Verifier, + duty_gater: DutyGater, + is_peer_connected: PeerConnectionChecker, + ) -> Self { + Self { + peers, + self_index, + verifier, + duty_gater, + is_peer_connected, + timeout: Duration::from_secs(20), + } + } + + /// Sets the send/receive timeout. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +/// Behaviour for partial signature exchange. +pub struct Behaviour { + config: Config, + rx: mpsc::UnboundedReceiver, + pending_actions: VecDeque>>, + events: VecDeque, + pending_broadcasts: HashMap, +} + +impl Behaviour { + /// Creates a behaviour and a clonable broadcast handle. + pub fn new(config: Config) -> (Self, Handle) { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = Handle { + tx, + next_request_id: Arc::new(AtomicU64::new(0)), + }; + + ( + Self { + config, + rx, + pending_actions: VecDeque::new(), + events: VecDeque::new(), + pending_broadcasts: HashMap::new(), + }, + handle, + ) + } + + fn handle_command(&mut self, command: Command) { + match command { + Command::Broadcast { + request_id, + duty, + data_set, + responder, + } => { + let message = match encode_message(&duty, &data_set) { + Ok(message) => message, + Err(err) => { + let _ = responder.send(Err(Error::from(err))); + return; + } + }; + + let mut targeted = 0usize; + for (idx, peer) in self.config.peers.iter().enumerate() { + if idx == self.config.self_index { + continue; + } + + if !(self.config.is_peer_connected)(peer) { + let _ = responder.send(Err(Error::PeerNotConnected(*peer))); + return; + } + + self.pending_actions.push_back(ToSwarm::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: ToHandler::Send { + 
request_id, + payload: message.clone(), + }, + }); + targeted = targeted.saturating_add(1); + } + + if targeted == 0 { + let _ = responder.send(Ok(())); + return; + } + + self.pending_broadcasts.insert( + request_id, + PendingBroadcast { + remaining: targeted, + responder, + }, + ); + } + } + } + + fn finish_broadcast_success(&mut self, request_id: u64) { + let Some(entry) = self.pending_broadcasts.get_mut(&request_id) else { + return; + }; + + entry.remaining = entry.remaining.saturating_sub(1); + if entry.remaining == 0 { + if let Some(entry) = self.pending_broadcasts.remove(&request_id) { + let _ = entry.responder.send(Ok(())); + } + } + } + + fn finish_broadcast_error(&mut self, request_id: u64, peer: PeerId, error: HandlerFailure) { + if let Some(entry) = self.pending_broadcasts.remove(&request_id) { + let _ = entry.responder.send(Err(Error::BroadcastPeer { + peer, + source: error, + })); + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> std::result::Result, ConnectionDenied> { + tracing::trace!("establishing inbound connection to peer: {:?}", peer); + Ok(Handler::new( + self.config.timeout, + self.config.verifier.clone(), + self.config.duty_gater.clone(), + peer, + )) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p::core::Endpoint, + _port_use: libp2p::core::transport::PortUse, + ) -> std::result::Result, ConnectionDenied> { + tracing::trace!("establishing outbound connection to peer: {:?}", peer); + Ok(Handler::new( + self.config.timeout, + self.config.verifier.clone(), + self.config.duty_gater.clone(), + peer, + )) + } + + fn on_swarm_event(&mut self, _event: FromSwarm) {} + + fn on_connection_handler_event( + &mut self, + 
peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + tracing::trace!("received connection handler event: {:?}", event); + match event { + FromHandler::Received { duty, data_set } => { + self.events.push_back(Event::Received { + peer: peer_id, + connection: connection_id, + duty, + data_set, + }); + } + FromHandler::InboundError(error) => { + self.events.push_back(Event::Error { + peer: peer_id, + connection: connection_id, + error, + }); + } + FromHandler::OutboundSuccess { request_id } => { + self.finish_broadcast_success(request_id); + } + FromHandler::OutboundError { request_id, error } => { + self.finish_broadcast_error(request_id, peer_id, error); + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + tracing::trace!("polling parsigex behaviour"); + + if let Some(event) = self.events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + if let Poll::Ready(Some(command)) = self.rx.poll_recv(cx) { + self.handle_command(command); + } + + if let Some(action) = self.pending_actions.pop_front() { + return Poll::Ready(action); + } + + Poll::Pending + } +} diff --git a/crates/core/src/parsigex/handler.rs b/crates/core/src/parsigex/handler.rs new file mode 100644 index 0000000..b1cf0bb --- /dev/null +++ b/crates/core/src/parsigex/handler.rs @@ -0,0 +1,328 @@ +//! Connection handler for the partial signature exchange protocol. 
+ +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{future::BoxFuture, prelude::*}; +use futures_timer::Delay; +use libp2p::{ + PeerId, + core::upgrade::ReadyUpgrade, + swarm::{ + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + }, + }, +}; + +use crate::types::{Duty, ParSignedDataSet}; + +use super::{DutyGater, PROTOCOL_NAME, Verifier, protocol}; + +/// Failure type for the partial signature exchange handler. +#[derive(Debug, Clone, thiserror::Error)] +pub enum Failure { + /// Stream negotiation timed out. + #[error("parsigex protocol negotiation timed out")] + Timeout, + /// Invalid payload. + #[error("invalid parsigex payload")] + InvalidPayload, + /// Duty not accepted by the gater. + #[error("invalid duty")] + InvalidDuty, + /// Signature verification failed. + #[error("invalid partial signature")] + InvalidPartialSignature, + /// I/O error. + #[error("{0}")] + Io(String), +} + +impl Failure { + fn io(error: impl std::fmt::Display) -> Self { + Self::Io(error.to_string()) + } +} + +/// Command sent from the behaviour to a handler. +#[derive(Debug, Clone)] +pub enum ToHandler { + /// Send the encoded payload to the remote peer. + Send { + /// Request identifier used to correlate broadcast completions. + request_id: u64, + /// Encoded protobuf payload. + payload: Vec, + }, +} + +/// Event sent from the handler back to the behaviour. +#[derive(Debug, Clone)] +pub enum FromHandler { + /// A verified message was received. + Received { + /// Duty from the message. + duty: Duty, + /// Verified partial signature set. + data_set: ParSignedDataSet, + }, + /// An inbound message failed decoding, gating, or verification. + InboundError(Failure), + /// Outbound send completed successfully. + OutboundSuccess { + /// Request identifier. 
+ request_id: u64, + }, + /// Outbound send failed. + OutboundError { + /// Request identifier. + request_id: u64, + /// Failure reason. + error: Failure, + }, +} + +type SendFuture = BoxFuture<'static, Result<(), Failure>>; +type RecvFuture = BoxFuture<'static, Result<(Duty, ParSignedDataSet), Failure>>; + +enum OutboundState { + OpenStream { request_id: u64, payload: Vec }, + Sending { request_id: u64, future: SendFuture }, +} + +/// Connection handler for parsigex. +pub struct Handler { + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + peer: PeerId, + outbound_queue: VecDeque<(u64, Vec)>, + outbound: Option, + inbound: Option, + pending_events: VecDeque, +} + +impl Handler { + /// Creates a new handler for one connection. + pub fn new(timeout: Duration, verifier: Verifier, duty_gater: DutyGater, peer: PeerId) -> Self { + Self { + timeout, + verifier, + duty_gater, + peer, + outbound_queue: VecDeque::new(), + outbound: None, + inbound: None, + pending_events: VecDeque::new(), + } + } + + fn on_dial_upgrade_error( + &mut self, + error: DialUpgradeError<(), ::OutboundProtocol>, + ) { + let Some(OutboundState::OpenStream { request_id, .. 
}) = self.outbound.take() else { + return; + }; + + let failure = match error.error { + StreamUpgradeError::Timeout => Failure::Timeout, + StreamUpgradeError::NegotiationFailed => Failure::io("protocol negotiation failed"), + StreamUpgradeError::Apply(e) => libp2p::core::util::unreachable(e), + StreamUpgradeError::Io(e) => Failure::io(e), + }; + + self.pending_events.push_back(FromHandler::OutboundError { + request_id, + error: failure, + }); + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = ToHandler; + type InboundOpenInfo = (); + type InboundProtocol = ReadyUpgrade; + type OutboundOpenInfo = (); + type OutboundProtocol = ReadyUpgrade; + type ToBehaviour = FromHandler; + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + ToHandler::Send { + request_id, + payload, + } => self.outbound_queue.push_back((request_id, payload)), + } + } + + #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))] + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + + if let Some(fut) = self.inbound.as_mut() { + match fut.poll_unpin(cx) { + Poll::Pending => {} + Poll::Ready(Ok((duty, data_set))) => { + self.inbound = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::Received { duty, data_set }, + )); + } + Poll::Ready(Err(error)) => { + self.inbound = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::InboundError(error), + )); + } + } + } + + if let Some(outbound) = self.outbound.take() { + match outbound { + OutboundState::OpenStream { + request_id, + payload, + } => { + self.outbound = Some(OutboundState::OpenStream { + request_id, + payload, + }); 
+ return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), + }); + } + OutboundState::Sending { + request_id, + mut future, + } => match future.poll_unpin(cx) { + Poll::Pending => { + self.outbound = Some(OutboundState::Sending { request_id, future }); + } + Poll::Ready(Ok(())) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::OutboundSuccess { request_id }, + )); + } + Poll::Ready(Err(error)) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + FromHandler::OutboundError { request_id, error }, + )); + } + }, + } + } + + if let Some((request_id, payload)) = self.outbound_queue.pop_front() { + self.outbound = Some(OutboundState::OpenStream { + request_id, + payload, + }); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), + }); + } + + Poll::Pending + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: mut stream, + .. 
+ }) => { + stream.ignore_for_keep_alive(); + let verifier = self.verifier.clone(); + let duty_gater = self.duty_gater.clone(); + let timeout = self.timeout; + self.inbound = Some( + async move { + let recv = async { + let bytes = protocol::recv_message(&mut stream) + .await + .map_err(Failure::io)?; + let (duty, data_set) = protocol::decode_message(&bytes) + .map_err(|_| Failure::InvalidPayload)?; + if !(duty_gater)(&duty) { + return Err(Failure::InvalidDuty); + } + + for (pub_key, par_sig) in data_set.inner() { + verifier(duty.clone(), *pub_key, par_sig.clone()) + .await + .map_err(|_| Failure::InvalidPartialSignature)?; + } + + Ok((duty, data_set)) + }; + + futures::pin_mut!(recv); + match futures::future::select(recv, Delay::new(timeout)).await { + futures::future::Either::Left((result, _)) => result, + futures::future::Either::Right(((), _)) => Err(Failure::Timeout), + } + } + .boxed(), + ); + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: mut stream, + .. 
+ }) => { + stream.ignore_for_keep_alive(); + let Some(OutboundState::OpenStream { + request_id, + payload, + }) = self.outbound.take() + else { + self.pending_events.push_back(FromHandler::OutboundError { + request_id: 0, + error: Failure::io(format!( + "unexpected outbound stream state for peer {}", + self.peer + )), + }); + return; + }; + + let timeout = self.timeout; + self.outbound = Some(OutboundState::Sending { + request_id, + future: async move { + let send = protocol::send_message(&mut stream, &payload) + .map(|result| result.map_err(Failure::io)); + futures::pin_mut!(send); + match futures::future::select(send, Delay::new(timeout)).await { + futures::future::Either::Left((result, _)) => result, + futures::future::Either::Right(((), _)) => Err(Failure::Timeout), + } + } + .boxed(), + }); + } + ConnectionEvent::DialUpgradeError(error) => self.on_dial_upgrade_error(error), + _ => {} + } + } +} diff --git a/crates/core/src/parsigex/mod.rs b/crates/core/src/parsigex/mod.rs new file mode 100644 index 0000000..312cadc --- /dev/null +++ b/crates/core/src/parsigex/mod.rs @@ -0,0 +1,81 @@ +//! Partial signature exchange protocol. +//! +//! In-memory exchange test helpers are intentionally not part of this module. +//! We should revisit that only when wiring higher-level integration coverage in +//! `testutil/integration`. +//! +//! The reason is dependency direction: `core` sits above `testutil` in the +//! dependency tree, so test scaffolding for integration-style exchange should +//! not live in `core`. + +pub mod behaviour; +mod handler; +mod protocol; +pub(crate) mod signed_data; + +use libp2p::PeerId; + +pub use behaviour::{ + Behaviour, Config, DutyGater, Error as BehaviourError, Event, Handle, Verifier, VerifyError, +}; +pub use handler::Handler; +pub use protocol::{decode_message, encode_message}; + +/// The protocol name for partial signature exchange (version 2.0.0). 
+pub const PROTOCOL_NAME: libp2p::swarm::StreamProtocol = + libp2p::swarm::StreamProtocol::new("/charon/parsigex/2.0.0"); + +/// Returns the supported protocols in precedence order. +pub fn protocols() -> Vec { + vec![PROTOCOL_NAME] +} + +/// Error type for proto and conversion operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Missing duty or data set fields. + #[error("invalid parsigex msg fields")] + InvalidMessageFields, + + /// Invalid partial signed data set proto. + #[error("invalid partial signed data set proto fields")] + InvalidParSignedDataSetFields, + + /// Invalid partial signed proto. + #[error("invalid partial signed proto")] + InvalidParSignedProto, + + /// Invalid duty type. + #[error("invalid duty")] + InvalidDuty, + + /// Unsupported duty type. + #[error("unsupported duty type")] + UnsupportedDutyType, + + /// Deprecated builder proposer duty. + #[error("deprecated duty builder proposer")] + DeprecatedBuilderProposer, + + /// Failed to parse a public key. + #[error("invalid public key: {0}")] + InvalidPubKey(String), + + /// Invalid share index. + #[error("invalid share index")] + InvalidShareIndex, + + /// Serialization failed. + #[error("marshal signed data: {0}")] + Serialize(#[from] serde_json::Error), + + /// Broadcast failed for a peer. + #[error("broadcast to peer {peer} failed")] + BroadcastPeer { + /// Peer for which the broadcast failed. + peer: PeerId, + }, +} + +/// Result type for partial signature exchange operations. +pub type Result = std::result::Result; diff --git a/crates/core/src/parsigex/protocol.rs b/crates/core/src/parsigex/protocol.rs new file mode 100644 index 0000000..d3bb812 --- /dev/null +++ b/crates/core/src/parsigex/protocol.rs @@ -0,0 +1,83 @@ +//! Wire protocol helpers for partial signature exchange. 
+ +use std::io; + +use futures::prelude::*; +use libp2p::swarm::Stream; +use prost::Message; +use unsigned_varint::aio::read_usize; + +use crate::{ + corepb::v1::{core as pbcore, parsigex as pbparsigex}, + types::{Duty, ParSignedDataSet}, +}; + +use super::{Error, Result as ParasigexResult}; + +/// Maximum accepted message size. +const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; + +/// Encodes a protobuf message to bytes. +pub fn encode_protobuf(message: &M) -> Vec { + let mut buf = Vec::with_capacity(message.encoded_len()); + message + .encode(&mut buf) + .expect("vec-backed protobuf encoding cannot fail"); + buf +} + +/// Decodes a protobuf message from bytes. +pub fn decode_protobuf( + bytes: &[u8], +) -> std::result::Result { + M::decode(bytes) +} + +/// Encodes a partial signature exchange message. +pub fn encode_message(duty: &Duty, data_set: &ParSignedDataSet) -> ParasigexResult> { + let pb = pbparsigex::ParSigExMsg { + duty: Some(pbcore::Duty::from(duty)), + data_set: Some(pbcore::ParSignedDataSet::try_from(data_set)?), + }; + + Ok(encode_protobuf(&pb)) +} + +/// Decodes a partial signature exchange message. +pub fn decode_message(bytes: &[u8]) -> ParasigexResult<(Duty, ParSignedDataSet)> { + let pb: pbparsigex::ParSigExMsg = + decode_protobuf(bytes).map_err(|_| Error::InvalidMessageFields)?; + let duty_pb = pb.duty.ok_or(Error::InvalidMessageFields)?; + let data_set_pb = pb.data_set.ok_or(Error::InvalidMessageFields)?; + let duty = Duty::try_from(&duty_pb)?; + let data_set = ParSignedDataSet::try_from((&duty.duty_type, &data_set_pb))?; + Ok((duty, data_set)) +} + +/// Sends one protobuf message on the stream. 
+pub async fn send_message(stream: &mut Stream, payload: &[u8]) -> io::Result<()> { + let mut len_buf = unsigned_varint::encode::usize_buffer(); + let encoded_len = unsigned_varint::encode::usize(payload.len(), &mut len_buf); + stream.write_all(encoded_len).await?; + stream.write_all(payload).await?; + stream.flush().await +} + +/// Receives one protobuf payload from the stream. +pub async fn recv_message(stream: &mut Stream) -> io::Result> { + let length = read_usize(&mut *stream).await.map_err(|err| match err { + unsigned_varint::io::ReadError::Io(err) => err, + other => io::Error::new(io::ErrorKind::InvalidData, other), + })?; + + if length > MAX_MESSAGE_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("message too large: {length} bytes"), + )); + } + + let mut buf = vec![0_u8; length]; + stream.read_exact(&mut buf).await?; + Ok(buf) +} diff --git a/crates/core/src/parsigex/signed_data.rs b/crates/core/src/parsigex/signed_data.rs new file mode 100644 index 0000000..b6e9bed --- /dev/null +++ b/crates/core/src/parsigex/signed_data.rs @@ -0,0 +1,78 @@ +//! Message and type conversion helpers for partial signature exchange. + +use std::any::Any; + +use crate::{ + signeddata::{ + Attestation, BeaconCommitteeSelection, SignedAggregateAndProof, SignedRandao, + SignedSyncContributionAndProof, SignedSyncMessage, SignedVoluntaryExit, + SyncCommitteeSelection, VersionedAttestation, VersionedSignedAggregateAndProof, + VersionedSignedProposal, VersionedSignedValidatorRegistration, + }, + types::{DutyType, Signature, SignedData}, +}; + +use super::Error; + +pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, Error> { + let any = data as &dyn Any; + + macro_rules! 
serialize_as { + ($ty:ty) => { + if let Some(value) = any.downcast_ref::<$ty>() { + return Ok(serde_json::to_vec(value)?); + } + }; + } + + serialize_as!(Attestation); + serialize_as!(VersionedAttestation); + serialize_as!(VersionedSignedProposal); + serialize_as!(VersionedSignedValidatorRegistration); + serialize_as!(SignedVoluntaryExit); + serialize_as!(SignedRandao); + serialize_as!(Signature); + serialize_as!(BeaconCommitteeSelection); + serialize_as!(SignedAggregateAndProof); + serialize_as!(VersionedSignedAggregateAndProof); + serialize_as!(SignedSyncMessage); + serialize_as!(SyncCommitteeSelection); + serialize_as!(SignedSyncContributionAndProof); + + Err(Error::UnsupportedDutyType) +} + +pub(crate) fn deserialize_signed_data( + duty_type: &DutyType, + bytes: &[u8], +) -> Result, Error> { + macro_rules! deserialize_json { + ($ty:ty) => { + serde_json::from_slice::<$ty>(bytes) + .map(|value| Box::new(value) as Box) + .map_err(Error::from) + }; + } + + match duty_type { + DutyType::Attester => deserialize_json!(VersionedAttestation) + .or_else(|_| deserialize_json!(Attestation)) + .map_err(|_| Error::UnsupportedDutyType), + DutyType::Proposer => deserialize_json!(VersionedSignedProposal), + DutyType::BuilderProposer => Err(Error::DeprecatedBuilderProposer), + DutyType::BuilderRegistration => deserialize_json!(VersionedSignedValidatorRegistration), + DutyType::Exit => deserialize_json!(SignedVoluntaryExit), + DutyType::Randao => deserialize_json!(SignedRandao), + DutyType::Signature => deserialize_json!(Signature), + DutyType::PrepareAggregator => deserialize_json!(BeaconCommitteeSelection), + DutyType::Aggregator => deserialize_json!(VersionedSignedAggregateAndProof) + .or_else(|_| deserialize_json!(SignedAggregateAndProof)) + .map_err(|_| Error::UnsupportedDutyType), + DutyType::SyncMessage => deserialize_json!(SignedSyncMessage), + DutyType::PrepareSyncContribution => deserialize_json!(SyncCommitteeSelection), + DutyType::SyncContribution => 
deserialize_json!(SignedSyncContributionAndProof), + DutyType::Unknown | DutyType::InfoSync | DutyType::DutySentinel(_) => { + Err(Error::UnsupportedDutyType) + } + } +} diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 78e2bc6..bae0d93 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1,6 +1,6 @@ //! Types for the Charon core. -use std::{collections::HashMap, fmt::Display, iter}; +use std::{any::Any, collections::HashMap, fmt::Display, iter}; use chrono::{DateTime, Duration, Utc}; use dyn_clone::DynClone; @@ -8,7 +8,14 @@ use dyn_eq::DynEq; use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; -use crate::signeddata::SignedDataError; +use crate::{ + corepb::v1::core as pbcore, + parsigex::{ + Error as ParSigExCodecError, + signed_data::{deserialize_signed_data, serialize_signed_data}, + }, + signeddata::SignedDataError, +}; /// The type of duty. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -66,6 +73,52 @@ impl DutyType { } } +impl From<&DutyType> for i32 { + fn from(duty_type: &DutyType) -> Self { + match duty_type { + DutyType::Unknown => 0, + DutyType::Proposer => 1, + DutyType::Attester => 2, + DutyType::Signature => 3, + DutyType::Exit => 4, + DutyType::BuilderProposer => 5, + DutyType::BuilderRegistration => 6, + DutyType::Randao => 7, + DutyType::PrepareAggregator => 8, + DutyType::Aggregator => 9, + DutyType::SyncMessage => 10, + DutyType::PrepareSyncContribution => 11, + DutyType::SyncContribution => 12, + DutyType::InfoSync => 13, + DutyType::DutySentinel(_) => 14, + } + } +} + +impl TryFrom for DutyType { + type Error = ParSigExCodecError; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(DutyType::Unknown), + 1 => Ok(DutyType::Proposer), + 2 => Ok(DutyType::Attester), + 3 => Ok(DutyType::Signature), + 4 => Ok(DutyType::Exit), + 5 => Ok(DutyType::BuilderProposer), + 6 => Ok(DutyType::BuilderRegistration), + 7 => Ok(DutyType::Randao), + 8 => 
Ok(DutyType::PrepareAggregator), + 9 => Ok(DutyType::Aggregator), + 10 => Ok(DutyType::SyncMessage), + 11 => Ok(DutyType::PrepareSyncContribution), + 12 => Ok(DutyType::SyncContribution), + 13 => Ok(DutyType::InfoSync), + _ => Err(ParSigExCodecError::InvalidDuty), + } + } +} + /// SlotNumber struct #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct SlotNumber(u64); @@ -192,6 +245,28 @@ impl Duty { } } +impl From<&Duty> for pbcore::Duty { + fn from(duty: &Duty) -> Self { + Self { + slot: duty.slot.inner(), + r#type: i32::from(&duty.duty_type), + } + } +} + +impl TryFrom<&pbcore::Duty> for Duty { + type Error = ParSigExCodecError; + + fn try_from(duty: &pbcore::Duty) -> Result { + let duty_type = DutyType::try_from(duty.r#type)?; + if !duty_type.is_valid() { + return Err(ParSigExCodecError::InvalidDuty); + } + + Ok(Self::new(duty.slot.into(), duty_type)) + } +} + /// The type of proposal. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -452,7 +527,7 @@ impl AsRef<[u8; SIG_LEN]> for Signature { } /// Signed data type -pub trait SignedData: DynClone + DynEq + StdDebug + Send + Sync { +pub trait SignedData: Any + DynClone + DynEq + StdDebug + Send + Sync { /// signature returns the signed duty data's signature. 
fn signature(&self) -> Result; @@ -517,6 +592,39 @@ impl ParSignedData { } } +impl TryFrom<&ParSignedData> for pbcore::ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(data: &ParSignedData) -> Result { + let encoded = serialize_signed_data(data.signed_data.as_ref())?; + let share_idx = + i32::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signature = data.signed_data.signature().map_err(|err| { + ParSigExCodecError::Serialize(serde_json::Error::io(std::io::Error::other( + err.to_string(), + ))) + })?; + + Ok(Self { + data: encoded.into(), + signature: signature.as_ref().to_vec().into(), + share_idx, + }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedData)> for ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedData)) -> Result { + let (duty_type, data) = value; + let share_idx = + u64::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signed_data = deserialize_signed_data(duty_type, &data.data)?; + Ok(Self::new_boxed(signed_data, share_idx)) + } +} + /// ParSignedDataSet is a set of partially signed duty data only signed by a /// single threshold BLS share. 
#[derive(Debug, Clone, PartialEq, Eq, Default)] @@ -554,6 +662,39 @@ impl ParSignedDataSet { } } +impl TryFrom<&ParSignedDataSet> for pbcore::ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(set: &ParSignedDataSet) -> Result { + let mut out = std::collections::BTreeMap::new(); + for (pub_key, value) in set.inner() { + out.insert(pub_key.to_string(), pbcore::ParSignedData::try_from(value)?); + } + + Ok(Self { set: out }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedDataSet)> for ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedDataSet)) -> Result { + let (duty_type, set) = value; + if set.set.is_empty() { + return Err(ParSigExCodecError::InvalidParSignedDataSetFields); + } + + let mut out = Self::new(); + for (pub_key, value) in &set.set { + let pub_key = PubKey::try_from(pub_key.as_str()) + .map_err(|_| ParSigExCodecError::InvalidPubKey(pub_key.clone()))?; + out.insert(pub_key, ParSignedData::try_from((duty_type, value))?); + } + + Ok(out) + } +} + /// SignedDataSet is a set of signed duty data. #[derive(Debug, Clone, PartialEq, Eq)] pub struct SignedDataSet(HashMap); diff --git a/crates/p2p/src/p2p.rs b/crates/p2p/src/p2p.rs index a35a277..7a52840 100644 --- a/crates/p2p/src/p2p.rs +++ b/crates/p2p/src/p2p.rs @@ -336,7 +336,7 @@ impl Node { .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { let builder = - PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone()); + PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone()).with_quic_enabled(true); behaviour_fn(builder, key, relay_client).build(key) }) .map_err(P2PError::failed_to_build_swarm)? 
diff --git a/crates/p2p/src/relay.rs b/crates/p2p/src/relay.rs index c334670..e74c536 100644 --- a/crates/p2p/src/relay.rs +++ b/crates/p2p/src/relay.rs @@ -31,9 +31,10 @@ use libp2p::{ ToSwarm, dial_opts::DialOpts, dummy, }, }; -use tokio::time::Interval; +use tokio::time::{Instant, Interval}; const RELAY_ROUTER_INTERVAL: Duration = Duration::from_secs(60); +const RELAY_ROUTER_INITIAL_DELAY: Duration = Duration::from_secs(10); /// Mutable relay reservation behaviour. /// @@ -246,7 +247,10 @@ pub struct RelayRouter { impl RelayRouter { /// Creates a new relay router. pub fn new(relays: Vec, p2p_context: P2PContext, local_peer_id: PeerId) -> Self { - let mut interval = tokio::time::interval(RELAY_ROUTER_INTERVAL); + let mut interval = tokio::time::interval_at( + Instant::now() + RELAY_ROUTER_INITIAL_DELAY, + RELAY_ROUTER_INTERVAL, + ); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); Self { From 6e9d22499bb6d3457ae398aa17daea81289ca99a Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Fri, 20 Mar 2026 14:42:03 +0100 Subject: [PATCH 2/2] fix: parsigex --- Cargo.lock | 26 ++ Cargo.toml | 2 + crates/core/src/lib.rs | 5 +- crates/core/src/parsigex/mod.rs | 81 ------- .../signed_data.rs => parsigex_codec.rs} | 58 ++++- crates/core/src/types.rs | 6 +- crates/p2p/Cargo.toml | 2 + crates/p2p/src/lib.rs | 3 + crates/p2p/src/p2p.rs | 41 ++-- crates/p2p/src/proto.rs | 86 +++++++ crates/p2p/src/relay.rs | 4 +- crates/parsigex/Cargo.toml | 32 +++ .../examples/parsigex.rs} | 122 +++++++--- .../parsigex => parsigex/src}/behaviour.rs | 161 ++++++++----- .../src/parsigex => parsigex/src}/handler.rs | 227 ++++++++++++------ crates/parsigex/src/lib.rs | 41 ++++ .../src/parsigex => parsigex/src}/protocol.rs | 39 +-- crates/peerinfo/src/protocol.rs | 99 ++------ 18 files changed, 648 insertions(+), 387 deletions(-) delete mode 100644 crates/core/src/parsigex/mod.rs rename crates/core/src/{parsigex/signed_data.rs 
=> parsigex_codec.rs} (61%) create mode 100644 crates/p2p/src/proto.rs create mode 100644 crates/parsigex/Cargo.toml rename crates/{core/examples/parasigex.rs => parsigex/examples/parsigex.rs} (77%) rename crates/{core/src/parsigex => parsigex/src}/behaviour.rs (70%) rename crates/{core/src/parsigex => parsigex/src}/handler.rs (56%) create mode 100644 crates/parsigex/src/lib.rs rename crates/{core/src/parsigex => parsigex/src}/protocol.rs (61%) diff --git a/Cargo.lock b/Cargo.lock index fa25ee6..d752c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5688,6 +5688,7 @@ dependencies = [ "pluto-k1util", "pluto-testutil", "pluto-tracing", + "prost 0.14.3", "rand 0.8.5", "reqwest 0.13.2", "serde", @@ -5697,11 +5698,36 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "unsigned-varint 0.8.0", "url", "vise", "vise-exporter", ] +[[package]] +name = "pluto-parsigex" +version = "1.7.1" +dependencies = [ + "anyhow", + "clap", + "either", + "futures", + "futures-timer", + "hex", + "libp2p", + "pluto-cluster", + "pluto-core", + "pluto-p2p", + "pluto-tracing", + "prost 0.14.3", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "unsigned-varint 0.8.0", +] + [[package]] name = "pluto-peerinfo" version = "1.7.1" diff --git a/Cargo.toml b/Cargo.toml index 3a7a2e1..c0c3f4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/app", + "crates/parsigex", "crates/build-proto", "crates/cli", "crates/cluster", @@ -98,6 +99,7 @@ wiremock = "0.6" # Crates in the workspace pluto-app = { path = "crates/app" } +pluto-parsigex = { path = "crates/parsigex" } pluto-build-proto = { path = "crates/build-proto" } pluto-cli = { path = "crates/cli" } pluto-cluster = { path = "crates/cluster" } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 2fad345..2e35696 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -26,8 +26,9 @@ pub mod deadline; /// parsigdb pub mod parsigdb; -/// Partial signature
exchange. -pub mod parsigex; +mod parsigex_codec; + +pub use parsigex_codec::ParSigExCodecError; /// Test utilities. #[cfg(test)] diff --git a/crates/core/src/parsigex/mod.rs b/crates/core/src/parsigex/mod.rs deleted file mode 100644 index 312cadc..0000000 --- a/crates/core/src/parsigex/mod.rs +++ /dev/null @@ -1,81 +0,0 @@ -//! Partial signature exchange protocol. -//! -//! In-memory exchange test helpers are intentionally not part of this module. -//! We should revisit that only when wiring higher-level integration coverage in -//! `testutil/integration`. -//! -//! The reason is dependency direction: `core` sits above `testutil` in the -//! dependency tree, so test scaffolding for integration-style exchange should -//! not live in `core`. - -pub mod behaviour; -mod handler; -mod protocol; -pub(crate) mod signed_data; - -use libp2p::PeerId; - -pub use behaviour::{ - Behaviour, Config, DutyGater, Error as BehaviourError, Event, Handle, Verifier, VerifyError, -}; -pub use handler::Handler; -pub use protocol::{decode_message, encode_message}; - -/// The protocol name for partial signature exchange (version 2.0.0). -pub const PROTOCOL_NAME: libp2p::swarm::StreamProtocol = - libp2p::swarm::StreamProtocol::new("/charon/parsigex/2.0.0"); - -/// Returns the supported protocols in precedence order. -pub fn protocols() -> Vec { - vec![PROTOCOL_NAME] -} - -/// Error type for proto and conversion operations. -#[derive(Debug, thiserror::Error)] -pub enum Error { - /// Missing duty or data set fields. - #[error("invalid parsigex msg fields")] - InvalidMessageFields, - - /// Invalid partial signed data set proto. - #[error("invalid partial signed data set proto fields")] - InvalidParSignedDataSetFields, - - /// Invalid partial signed proto. - #[error("invalid partial signed proto")] - InvalidParSignedProto, - - /// Invalid duty type. - #[error("invalid duty")] - InvalidDuty, - - /// Unsupported duty type. 
- #[error("unsupported duty type")] - UnsupportedDutyType, - - /// Deprecated builder proposer duty. - #[error("deprecated duty builder proposer")] - DeprecatedBuilderProposer, - - /// Failed to parse a public key. - #[error("invalid public key: {0}")] - InvalidPubKey(String), - - /// Invalid share index. - #[error("invalid share index")] - InvalidShareIndex, - - /// Serialization failed. - #[error("marshal signed data: {0}")] - Serialize(#[from] serde_json::Error), - - /// Broadcast failed for a peer. - #[error("broadcast to peer {peer} failed")] - BroadcastPeer { - /// Peer for which the broadcast failed. - peer: PeerId, - }, -} - -/// Result type for partial signature exchange operations. -pub type Result = std::result::Result; diff --git a/crates/core/src/parsigex/signed_data.rs b/crates/core/src/parsigex_codec.rs similarity index 61% rename from crates/core/src/parsigex/signed_data.rs rename to crates/core/src/parsigex_codec.rs index b6e9bed..1859be7 100644 --- a/crates/core/src/parsigex/signed_data.rs +++ b/crates/core/src/parsigex_codec.rs @@ -1,4 +1,4 @@ -//! Message and type conversion helpers for partial signature exchange. +//! Partial signature exchange codec helpers used by core types. use std::any::Any; @@ -12,9 +12,47 @@ use crate::{ types::{DutyType, Signature, SignedData}, }; -use super::Error; +/// Error type for partial signature exchange codec operations. +#[derive(Debug, thiserror::Error)] +pub enum ParSigExCodecError { + /// Missing duty or data set fields. + #[error("invalid parsigex msg fields")] + InvalidMessageFields, -pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, Error> { + /// Invalid partial signed data set proto. + #[error("invalid partial signed data set proto fields")] + InvalidParSignedDataSetFields, + + /// Invalid partial signed proto. + #[error("invalid partial signed proto")] + InvalidParSignedProto, + + /// Invalid duty type. + #[error("invalid duty")] + InvalidDuty, + + /// Unsupported duty type. 
+ #[error("unsupported duty type")] + UnsupportedDutyType, + + /// Deprecated builder proposer duty. + #[error("deprecated duty builder proposer")] + DeprecatedBuilderProposer, + + /// Failed to parse a public key. + #[error("invalid public key: {0}")] + InvalidPubKey(String), + + /// Invalid share index. + #[error("invalid share index")] + InvalidShareIndex, + + /// Serialization failed. + #[error("marshal signed data: {0}")] + Serialize(#[from] serde_json::Error), +} + +pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, ParSigExCodecError> { let any = data as &dyn Any; macro_rules! serialize_as { @@ -39,27 +77,27 @@ pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, Er serialize_as!(SyncCommitteeSelection); serialize_as!(SignedSyncContributionAndProof); - Err(Error::UnsupportedDutyType) + Err(ParSigExCodecError::UnsupportedDutyType) } pub(crate) fn deserialize_signed_data( duty_type: &DutyType, bytes: &[u8], -) -> Result, Error> { +) -> Result, ParSigExCodecError> { macro_rules! 
deserialize_json { ($ty:ty) => { serde_json::from_slice::<$ty>(bytes) .map(|value| Box::new(value) as Box) - .map_err(Error::from) + .map_err(ParSigExCodecError::from) }; } match duty_type { DutyType::Attester => deserialize_json!(VersionedAttestation) .or_else(|_| deserialize_json!(Attestation)) - .map_err(|_| Error::UnsupportedDutyType), + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), DutyType::Proposer => deserialize_json!(VersionedSignedProposal), - DutyType::BuilderProposer => Err(Error::DeprecatedBuilderProposer), + DutyType::BuilderProposer => Err(ParSigExCodecError::DeprecatedBuilderProposer), DutyType::BuilderRegistration => deserialize_json!(VersionedSignedValidatorRegistration), DutyType::Exit => deserialize_json!(SignedVoluntaryExit), DutyType::Randao => deserialize_json!(SignedRandao), @@ -67,12 +105,12 @@ pub(crate) fn deserialize_signed_data( DutyType::PrepareAggregator => deserialize_json!(BeaconCommitteeSelection), DutyType::Aggregator => deserialize_json!(VersionedSignedAggregateAndProof) .or_else(|_| deserialize_json!(SignedAggregateAndProof)) - .map_err(|_| Error::UnsupportedDutyType), + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), DutyType::SyncMessage => deserialize_json!(SignedSyncMessage), DutyType::PrepareSyncContribution => deserialize_json!(SyncCommitteeSelection), DutyType::SyncContribution => deserialize_json!(SignedSyncContributionAndProof), DutyType::Unknown | DutyType::InfoSync | DutyType::DutySentinel(_) => { - Err(Error::UnsupportedDutyType) + Err(ParSigExCodecError::UnsupportedDutyType) } } } diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index bae0d93..05a73f0 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -9,11 +9,9 @@ use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; use crate::{ + ParSigExCodecError, corepb::v1::core as pbcore, - parsigex::{ - Error as ParSigExCodecError, - signed_data::{deserialize_signed_data, serialize_signed_data}, - }, + 
parsigex_codec::{deserialize_signed_data, serialize_signed_data}, signeddata::SignedDataError, }; diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 750609b..935dbaf 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -16,6 +16,7 @@ thiserror.workspace = true k256.workspace = true pluto-eth2util.workspace = true pluto-k1util.workspace = true +prost.workspace = true vise.workspace = true tokio.workspace = true tokio-util.workspace = true @@ -29,6 +30,7 @@ pluto-core.workspace = true backon.workspace = true reqwest.workspace = true url.workspace = true +unsigned-varint.workspace = true [dev-dependencies] pluto-testutil.workspace = true diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 0c06608..5e85afc 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -51,3 +51,6 @@ pub mod relay; /// Force direct connection behaviour. pub mod force_direct; + +/// Protobuf utilities. +pub mod proto; diff --git a/crates/p2p/src/p2p.rs b/crates/p2p/src/p2p.rs index 7a52840..33ba5ab 100644 --- a/crates/p2p/src/p2p.rs +++ b/crates/p2p/src/p2p.rs @@ -110,6 +110,14 @@ use crate::{ utils, }; +const YAMUX_MAX_NUM_STREAMS: usize = 2_048; + +fn yamux_config() -> yamux::Config { + let mut config = yamux::Config::default(); + config.set_max_num_streams(YAMUX_MAX_NUM_STREAMS); + config +} + /// P2P error. #[derive(Debug, thiserror::Error)] pub enum P2PError { @@ -323,20 +331,17 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? 
.with_behaviour(|key, relay_client| { - let builder = - PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone()).with_quic_enabled(true); + let builder = PlutoBehaviourBuilder::default() + .with_p2p_context(p2p_context.clone()) + .with_quic_enabled(true); behaviour_fn(builder, key, relay_client).build(key) }) .map_err(P2PError::failed_to_build_swarm)? @@ -364,15 +369,11 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { let builder = @@ -400,11 +401,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() @@ -435,11 +432,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() diff --git a/crates/p2p/src/proto.rs b/crates/p2p/src/proto.rs new file mode 100644 index 0000000..57825b4 --- /dev/null +++ b/crates/p2p/src/proto.rs @@ -0,0 +1,86 @@ +use std::io; + +use futures::prelude::*; +use prost::Message; +use unsigned_varint::aio::read_usize; + +/// Default maximum message size (64KB should be plenty for peer info). 
+pub const MAX_MESSAGE_SIZE: usize = 64 * 1024; + +/// Writes a length-delimited payload with an unsigned varint length prefix. +/// +/// Wire format: `[uvarint length][payload bytes]` +pub async fn write_length_delimited( + stream: &mut S, + payload: &[u8], +) -> io::Result<()> { + let mut len_buf = unsigned_varint::encode::usize_buffer(); + let encoded_len = unsigned_varint::encode::usize(payload.len(), &mut len_buf); + stream.write_all(encoded_len).await?; + stream.write_all(payload).await?; + stream.flush().await +} + +/// Reads a length-delimited payload with an unsigned varint length prefix. +/// +/// Wire format: `[uvarint length][payload bytes]` +/// +/// Returns an error if the payload exceeds `max_message_size`. +pub async fn read_length_delimited( + stream: &mut S, + max_message_size: usize, +) -> io::Result> { + let msg_len = read_usize(&mut *stream).await.map_err(|e| match e { + unsigned_varint::io::ReadError::Io(io_err) => io_err, + other => io::Error::new(io::ErrorKind::InvalidData, other), + })?; + + if msg_len > max_message_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("message too large: {msg_len} bytes (max: {max_message_size})"), + )); + } + + let mut buf = vec![0u8; msg_len]; + stream.read_exact(&mut buf).await?; + Ok(buf) +} + +/// Writes a protobuf message with unsigned varint length prefix to the stream. +/// +/// Wire format: `[uvarint length][protobuf bytes]` +pub async fn write_protobuf( + stream: &mut S, + msg: &M, +) -> io::Result<()> { + let mut buf = Vec::with_capacity(msg.encoded_len()); + msg.encode(&mut buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + write_length_delimited(stream, &buf).await +} + +/// Reads a protobuf message with unsigned varint length prefix from the stream. +/// +/// Wire format: `[uvarint length][protobuf bytes]` +/// +/// Returns an error if the message exceeds `MAX_MESSAGE_SIZE`. 
+pub async fn read_protobuf( + stream: &mut S, +) -> io::Result { + read_protobuf_with_max_size(stream, MAX_MESSAGE_SIZE).await +} + +/// Reads a protobuf message with unsigned varint length prefix from the stream. +/// +/// Wire format: `[uvarint length][protobuf bytes]` +/// +/// Returns an error if the message exceeds `max_message_size`. +pub async fn read_protobuf_with_max_size( + stream: &mut S, + max_message_size: usize, +) -> io::Result { + let buf = read_length_delimited(stream, max_message_size).await?; + + M::decode(&buf[..]).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} diff --git a/crates/p2p/src/relay.rs b/crates/p2p/src/relay.rs index e74c536..946785e 100644 --- a/crates/p2p/src/relay.rs +++ b/crates/p2p/src/relay.rs @@ -248,7 +248,9 @@ impl RelayRouter { /// Creates a new relay router. pub fn new(relays: Vec, p2p_context: P2PContext, local_peer_id: PeerId) -> Self { let mut interval = tokio::time::interval_at( - Instant::now() + RELAY_ROUTER_INITIAL_DELAY, + Instant::now() + .checked_add(RELAY_ROUTER_INITIAL_DELAY) + .expect("should not overflow"), RELAY_ROUTER_INTERVAL, ); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); diff --git a/crates/parsigex/Cargo.toml b/crates/parsigex/Cargo.toml new file mode 100644 index 0000000..c89d7fb --- /dev/null +++ b/crates/parsigex/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "pluto-parsigex" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +either.workspace = true +futures.workspace = true +futures-timer.workspace = true +libp2p.workspace = true +prost.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +unsigned-varint.workspace = true +pluto-core.workspace = true +pluto-p2p.workspace = true + +[dev-dependencies] +anyhow.workspace = true +clap.workspace = true +hex.workspace = true +pluto-cluster.workspace = true 
+pluto-tracing.workspace = true +tokio-util.workspace = true +serde_json.workspace = true + +[lints] +workspace = true diff --git a/crates/core/examples/parasigex.rs b/crates/parsigex/examples/parsigex.rs similarity index 77% rename from crates/core/examples/parasigex.rs rename to crates/parsigex/examples/parsigex.rs index 5341bf4..af7d428 100644 --- a/crates/core/examples/parasigex.rs +++ b/crates/parsigex/examples/parsigex.rs @@ -1,6 +1,10 @@ #![allow(missing_docs)] -use std::{collections::HashSet, path::PathBuf, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + time::Duration, +}; use anyhow::{Context, Result, anyhow}; use clap::Parser; @@ -12,7 +16,6 @@ use libp2p::{ }; use pluto_cluster::lock::Lock; use pluto_core::{ - parsigex::{self, DutyGater, Event, Handle, Verifier}, signeddata::SignedRandao, types::{Duty, DutyType, ParSignedDataSet, PubKey, SlotNumber}, }; @@ -25,6 +28,7 @@ use pluto_p2p::{ peer::peer_id_from_key, relay::{MutableRelayReservation, RelayRouter}, }; +use pluto_parsigex::{self as parsigex, DutyGater, Event, Handle, Verifier}; use pluto_tracing::TracingConfig; use tokio::fs; use tokio_util::sync::CancellationToken; @@ -36,7 +40,7 @@ struct CombinedBehaviour { relay: relay::client::Behaviour, relay_reservation: MutableRelayReservation, relay_router: RelayRouter, - parasigex: parsigex::Behaviour, + parsigex: parsigex::Behaviour, } #[derive(Debug)] @@ -64,7 +68,7 @@ impl From for CombinedBehaviourEvent { } #[derive(Debug, Parser)] -#[command(name = "parasigex-example")] +#[command(name = "parsigex-example")] #[command(about = "Demonstrates partial signature exchange over the bootnode/relay P2P path")] struct Args { /// Relay URLs or multiaddrs. 
@@ -171,10 +175,11 @@ async fn main() -> Result<()> { let known_peers = lock .peer_ids() .context("failed to derive peer IDs from lock")?; - let self_index = known_peers - .iter() - .position(|peer_id| *peer_id == local_peer_id) - .ok_or_else(|| anyhow!("local peer ID {local_peer_id} not found in cluster lock"))?; + if !known_peers.contains(&local_peer_id) { + return Err(anyhow!( + "local peer ID {local_peer_id} not found in cluster lock" + )); + } let conn_gater = gater::ConnGater::new( gater::Config::closed() .with_relays(relays.clone()) @@ -200,7 +205,7 @@ async fn main() -> Result<()> { .filter_map(|relay| relay.peer().ok().flatten().map(|peer| peer.id)) .collect(); - let mut parasigex_handle: Option = None; + let mut parsigex_handle: Option = None; let mut node: Node = Node::new( p2p_config, key, @@ -209,28 +214,21 @@ async fn main() -> Result<()> { known_peers.clone(), |builder, keypair, relay_client| { let p2p_context = builder.p2p_context(); - let broadcast_context = p2p_context.clone(); let local_peer_id = keypair.public().to_peer_id(); let config = parsigex::Config::new( - known_peers.clone(), - self_index, + local_peer_id, + p2p_context.clone(), verifier.clone(), duty_gater.clone(), - std::sync::Arc::new(move |peer| { - !broadcast_context - .peer_store_lock() - .connections_to_peer(peer) - .is_empty() - }), ) .with_timeout(Duration::from_secs(10)); - let (parasigex, handle) = parsigex::Behaviour::new(config); - parasigex_handle = Some(handle); + let (parsigex, handle) = parsigex::Behaviour::new(config, local_peer_id); + parsigex_handle = Some(handle); builder .with_gater(conn_gater) .with_inner(CombinedBehaviour { - parasigex, + parsigex, relay: relay_client, relay_reservation: MutableRelayReservation::new(relays.clone()), relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id), @@ -238,18 +236,19 @@ async fn main() -> Result<()> { }, )?; - let parasigex_handle = - parasigex_handle.ok_or_else(|| anyhow!("parasigex handle should be 
created"))?; + let parsigex_handle = + parsigex_handle.ok_or_else(|| anyhow!("parsigex handle should be created"))?; info!( peer_id = %node.local_peer_id(), data_dir = %args.data_dir.display(), known_peers = ?known_peers, relays = ?args.relays, - "parasigex example started" + "parsigex example started" ); let mut ticker = tokio::time::interval(Duration::from_secs(args.broadcast_every)); + let mut pending_broadcasts: HashMap = HashMap::new(); loop { tokio::select! { @@ -263,9 +262,15 @@ async fn main() -> Result<()> { let duty = Duty::new(SlotNumber::new(*slot), DutyType::Randao); let data_set = make_sample_set(*slot, args.share_idx); - match parasigex_handle.broadcast(duty.clone(), data_set.clone()).await { - Ok(()) => { - info!(duty = %duty, share_idx = args.share_idx, "broadcasted sample partial signature set"); + match parsigex_handle.broadcast(duty.clone(), data_set.clone()).await { + Ok(request_id) => { + pending_broadcasts.insert(request_id, (duty.clone(), args.share_idx)); + info!( + request_id, + duty = %duty, + share_idx = args.share_idx, + "queued sample partial signature set for broadcast" + ); *slot = slot.saturating_add(1); } Err(error) => { @@ -274,7 +279,6 @@ async fn main() -> Result<()> { } } event = node.select_next_some() => { - info!("received swarm event"); let peer_type = |peer_id: &libp2p::PeerId| { if relay_peer_ids.contains(peer_id) { "RELAY" @@ -433,7 +437,67 @@ async fn main() -> Result<()> { SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( CombinedBehaviourEvent::ParSigEx(Event::Error { peer, error, .. 
}), )) => { - warn!(peer = %peer, error = %error, "parasigex protocol error"); + warn!(peer = %peer, error = %error, "parsigex protocol error"); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastError { + request_id, + peer, + error, + }), + )) => { + match pending_broadcasts.get(&request_id) { + Some((duty, share_idx)) => { + warn!( + request_id, + duty = %duty, + share_idx, + peer = ?peer, + error = %error, + "sample partial signature broadcast failed" + ); + } + None => { + warn!( + request_id, + peer = ?peer, + error = %error, + "partial signature broadcast failed" + ); + } + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastComplete { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + info!( + request_id, + duty = %duty, + share_idx, + "broadcasted sample partial signature set" + ); + } else { + info!(request_id, "partial signature broadcast completed"); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastFinished { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + warn!( + request_id, + duty = %duty, + share_idx, + "sample partial signature broadcast finished with failures" + ); + } else { + warn!(request_id, "partial signature broadcast finished with failures"); + } } SwarmEvent::NewListenAddr { address, .. 
} => { info!(address = %address, "listening"); diff --git a/crates/core/src/parsigex/behaviour.rs b/crates/parsigex/src/behaviour.rs similarity index 70% rename from crates/core/src/parsigex/behaviour.rs rename to crates/parsigex/src/behaviour.rs index 3a8f7d1..4f9d6c5 100644 --- a/crates/core/src/parsigex/behaviour.rs +++ b/crates/parsigex/src/behaviour.rs @@ -12,21 +12,21 @@ use std::{ time::Duration, }; +use either::Either; use libp2p::{ Multiaddr, PeerId, swarm::{ ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + THandlerInEvent, THandlerOutEvent, ToSwarm, dummy, }, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; -use crate::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; +use pluto_core::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; +use pluto_p2p::p2p_context::P2PContext; -use super::{ - Error as CodecError, Handler, encode_message, - handler::{Failure as HandlerFailure, FromHandler, ToHandler}, -}; +use super::{Error as CodecError, Handler, encode_message}; +use crate::handler::{Failure as HandlerFailure, FromHandler, ToHandler}; /// Future returned by verifier callbacks. pub type VerifyFuture = @@ -39,9 +39,6 @@ pub type Verifier = /// Duty gate callback type. pub type DutyGater = Arc bool + Send + Sync + 'static>; -/// Peer connection callback type. -pub type PeerConnectionChecker = Arc bool + Send + Sync + 'static>; - /// Error type for signature verification callbacks. #[derive(Debug, thiserror::Error)] pub enum VerifyError { @@ -92,7 +89,7 @@ pub enum Error { pub type Result = std::result::Result; /// Event emitted by the partial signature exchange behaviour. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum Event { /// A verified partial signature set was received from a peer. Received { @@ -114,12 +111,31 @@ pub enum Event { /// Failure reason. error: HandlerFailure, }, + /// Broadcast failed. 
+ BroadcastError { + /// Request identifier. + request_id: u64, + /// Peer for which the broadcast failed. + peer: Option, + /// Failure reason. + error: HandlerFailure, + }, + /// Broadcast completed successfully for all targeted peers. + BroadcastComplete { + /// Request identifier. + request_id: u64, + }, + /// Broadcast finished after one or more peer failures. + BroadcastFinished { + /// Request identifier. + request_id: u64, + }, } #[derive(Debug)] struct PendingBroadcast { remaining: usize, - responder: oneshot::Sender>, + failed: bool, } #[derive(Debug)] @@ -128,7 +144,6 @@ enum Command { request_id: u64, duty: Duty, data_set: ParSignedDataSet, - responder: oneshot::Sender>, }, } @@ -141,48 +156,43 @@ pub struct Handle { impl Handle { /// Broadcasts a partial signature set to all peers except self. - pub async fn broadcast(&self, duty: Duty, data_set: ParSignedDataSet) -> Result<()> { + pub async fn broadcast(&self, duty: Duty, data_set: ParSignedDataSet) -> Result { let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); - let (tx, rx) = oneshot::channel(); self.tx .send(Command::Broadcast { request_id, duty, data_set, - responder: tx, }) .map_err(|_| Error::Closed)?; - Ok(()) + Ok(request_id) } } /// Configuration for the partial signature exchange behaviour. #[derive(Clone)] pub struct Config { - peers: Vec, - self_index: usize, + peer_id: PeerId, + p2p_context: P2PContext, verifier: Verifier, duty_gater: DutyGater, - is_peer_connected: PeerConnectionChecker, timeout: Duration, } impl Config { /// Creates a new configuration. 
pub fn new( - peers: Vec, - self_index: usize, + peer_id: PeerId, + p2p_context: P2PContext, verifier: Verifier, duty_gater: DutyGater, - is_peer_connected: PeerConnectionChecker, ) -> Self { Self { - peers, - self_index, + peer_id, + p2p_context, verifier, duty_gater, - is_peer_connected, timeout: Duration::from_secs(20), } } @@ -205,7 +215,8 @@ pub struct Behaviour { impl Behaviour { /// Creates a behaviour and a clonable broadcast handle. - pub fn new(config: Config) -> (Self, Handle) { + pub fn new(config: Config, peer_id: PeerId) -> (Self, Handle) { + debug_assert_eq!(config.peer_id, peer_id); let (tx, rx) = mpsc::unbounded_channel(); let handle = Handle { tx, @@ -230,40 +241,55 @@ impl Behaviour { request_id, duty, data_set, - responder, } => { let message = match encode_message(&duty, &data_set) { Ok(message) => message, Err(err) => { - let _ = responder.send(Err(Error::from(err))); + self.broadcast_error(request_id, None, HandlerFailure::Codec(err)); return; } }; + let peers: Vec<_> = self + .config + .p2p_context + .known_peers() + .iter() + .copied() + .collect(); let mut targeted = 0usize; - for (idx, peer) in self.config.peers.iter().enumerate() { - if idx == self.config.self_index { + for peer in peers { + if peer == self.config.peer_id { continue; } - if !(self.config.is_peer_connected)(peer) { - let _ = responder.send(Err(Error::PeerNotConnected(*peer))); - return; + if self + .config + .p2p_context + .peer_store_lock() + .connections_to_peer(&peer) + .is_empty() + { + self.broadcast_error( + request_id, + Some(peer), + HandlerFailure::Io(format!("peer {peer} is not connected")), + ); + continue; } self.pending_actions.push_back(ToSwarm::NotifyHandler { - peer_id: *peer, + peer_id: peer, handler: NotifyHandler::Any, - event: ToHandler::Send { + event: Either::Left(ToHandler::Send { request_id, payload: message.clone(), - }, + }), }); targeted = targeted.saturating_add(1); } if targeted == 0 { - let _ = responder.send(Ok(())); return; } @@ -271,38 
+297,47 @@ impl Behaviour { request_id, PendingBroadcast { remaining: targeted, - responder, + failed: false, }, ); } } } - fn finish_broadcast_success(&mut self, request_id: u64) { + fn finish_broadcast_result(&mut self, request_id: u64, failed: bool) { let Some(entry) = self.pending_broadcasts.get_mut(&request_id) else { return; }; + entry.failed |= failed; entry.remaining = entry.remaining.saturating_sub(1); if entry.remaining == 0 { - if let Some(entry) = self.pending_broadcasts.remove(&request_id) { - let _ = entry.responder.send(Ok(())); + let failed = self + .pending_broadcasts + .remove(&request_id) + .map(|entry| entry.failed) + .unwrap_or(failed); + if failed { + self.events + .push_back(Event::BroadcastFinished { request_id }); + } else { + self.events + .push_back(Event::BroadcastComplete { request_id }); } } } - fn finish_broadcast_error(&mut self, request_id: u64, peer: PeerId, error: HandlerFailure) { - if let Some(entry) = self.pending_broadcasts.remove(&request_id) { - let _ = entry.responder.send(Err(Error::BroadcastPeer { - peer, - source: error, - })); - } + fn broadcast_error(&mut self, request_id: u64, peer: Option, error: HandlerFailure) { + self.events.push_back(Event::BroadcastError { + request_id, + peer, + error, + }); } } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = Handler; + type ConnectionHandler = Either; type ToSwarm = Event; fn handle_established_inbound_connection( @@ -312,13 +347,17 @@ impl NetworkBehaviour for Behaviour { _local_addr: &Multiaddr, _remote_addr: &Multiaddr, ) -> std::result::Result, ConnectionDenied> { + if !self.config.p2p_context.is_known_peer(&peer) { + return Ok(Either::Right(dummy::ConnectionHandler)); + } + tracing::trace!("establishing inbound connection to peer: {:?}", peer); - Ok(Handler::new( + Ok(Either::Left(Handler::new( self.config.timeout, self.config.verifier.clone(), self.config.duty_gater.clone(), peer, - )) + ))) } fn handle_established_outbound_connection( @@ -329,13 +368,17 
@@ impl NetworkBehaviour for Behaviour { _role_override: libp2p::core::Endpoint, _port_use: libp2p::core::transport::PortUse, ) -> std::result::Result, ConnectionDenied> { + if !self.config.p2p_context.is_known_peer(&peer) { + return Ok(Either::Right(dummy::ConnectionHandler)); + } + tracing::trace!("establishing outbound connection to peer: {:?}", peer); - Ok(Handler::new( + Ok(Either::Left(Handler::new( self.config.timeout, self.config.verifier.clone(), self.config.duty_gater.clone(), peer, - )) + ))) } fn on_swarm_event(&mut self, _event: FromSwarm) {} @@ -346,6 +389,11 @@ impl NetworkBehaviour for Behaviour { connection_id: ConnectionId, event: THandlerOutEvent, ) { + let event = match event { + Either::Left(event) => event, + Either::Right(value) => match value {}, + }; + tracing::trace!("received connection handler event: {:?}", event); match event { FromHandler::Received { duty, data_set } => { @@ -364,10 +412,11 @@ impl NetworkBehaviour for Behaviour { }); } FromHandler::OutboundSuccess { request_id } => { - self.finish_broadcast_success(request_id); + self.finish_broadcast_result(request_id, false); } FromHandler::OutboundError { request_id, error } => { - self.finish_broadcast_error(request_id, peer_id, error); + self.finish_broadcast_result(request_id, true); + self.broadcast_error(request_id, Some(peer_id), error); } } } diff --git a/crates/core/src/parsigex/handler.rs b/crates/parsigex/src/handler.rs similarity index 56% rename from crates/core/src/parsigex/handler.rs rename to crates/parsigex/src/handler.rs index b1cf0bb..e1925aa 100644 --- a/crates/core/src/parsigex/handler.rs +++ b/crates/parsigex/src/handler.rs @@ -20,12 +20,13 @@ use libp2p::{ }, }; -use crate::types::{Duty, ParSignedDataSet}; +use pluto_core::types::{Duty, ParSignedDataSet}; use super::{DutyGater, PROTOCOL_NAME, Verifier, protocol}; +use crate::Error as CodecError; /// Failure type for the partial signature exchange handler. 
-#[derive(Debug, Clone, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum Failure { /// Stream negotiation timed out. #[error("parsigex protocol negotiation timed out")] @@ -42,6 +43,9 @@ pub enum Failure { /// I/O error. #[error("{0}")] Io(String), + /// Codec error. + #[error("codec error: {0}")] + Codec(CodecError), } impl Failure { @@ -51,7 +55,7 @@ impl Failure { } /// Command sent from the behaviour to a handler. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum ToHandler { /// Send the encoded payload to the remote peer. Send { @@ -63,7 +67,7 @@ pub enum ToHandler { } /// Event sent from the handler back to the behaviour. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum FromHandler { /// A verified message was received. Received { @@ -92,16 +96,91 @@ type SendFuture = BoxFuture<'static, Result<(), Failure>>; type RecvFuture = BoxFuture<'static, Result<(Duty, ParSignedDataSet), Failure>>; enum OutboundState { - OpenStream { request_id: u64, payload: Vec }, + IdleStream { stream: libp2p::swarm::Stream }, + RequestOpenStream { request_id: u64, payload: Vec }, Sending { request_id: u64, future: SendFuture }, } +impl std::fmt::Debug for OutboundState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutboundState::IdleStream { .. } => { + write!(f, "IdleStream {{ stream: }}") + } + OutboundState::RequestOpenStream { + request_id, + payload, + } => write!( + f, + "RequestOpenStream {{ request_id: {}, payload: {:?} }}", + request_id, payload + ), + OutboundState::Sending { request_id, .. 
} => write!( + f, + "Sending {{ request_id: {}, future: }}", + request_id + ), + } + } +} + +fn recv_message( + mut stream: libp2p::swarm::Stream, + verifier: Verifier, + duty_gater: DutyGater, + timeout: Duration, +) -> RecvFuture { + async move { + let recv = async { + let bytes = protocol::recv_message(&mut stream) + .await + .map_err(Failure::io)?; + let (duty, data_set) = + protocol::decode_message(&bytes).map_err(|_| Failure::InvalidPayload)?; + if !(duty_gater)(&duty) { + return Err(Failure::InvalidDuty); + } + + for (pub_key, par_sig) in data_set.inner() { + verifier(duty.clone(), *pub_key, par_sig.clone()) + .await + .map_err(|_| Failure::InvalidPartialSignature)?; + } + + Ok((duty, data_set)) + }; + + futures::pin_mut!(recv); + match futures::future::select(recv, Delay::new(timeout)).await { + futures::future::Either::Left((result, _)) => result, + futures::future::Either::Right(((), _)) => Err(Failure::Timeout), + } + } + .boxed() +} + +fn send_message( + mut stream: libp2p::swarm::Stream, + payload: Vec, + timeout: Duration, +) -> SendFuture { + async move { + let send = + protocol::send_message(&mut stream, &payload).map(|result| result.map_err(Failure::io)); + futures::pin_mut!(send); + match futures::future::select(send, Delay::new(timeout)).await { + futures::future::Either::Left((result, _)) => result, + futures::future::Either::Right(((), _)) => Err(Failure::Timeout), + } + } + .boxed() +} + /// Connection handler for parsigex. pub struct Handler { timeout: Duration, verifier: Verifier, duty_gater: DutyGater, - peer: PeerId, outbound_queue: VecDeque<(u64, Vec)>, outbound: Option, inbound: Option, @@ -110,12 +189,16 @@ pub struct Handler { impl Handler { /// Creates a new handler for one connection. 
- pub fn new(timeout: Duration, verifier: Verifier, duty_gater: DutyGater, peer: PeerId) -> Self { + pub fn new( + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + _peer: PeerId, + ) -> Self { Self { timeout, verifier, duty_gater, - peer, outbound_queue: VecDeque::new(), outbound: None, inbound: None, @@ -127,7 +210,7 @@ impl Handler { &mut self, error: DialUpgradeError<(), ::OutboundProtocol>, ) { - let Some(OutboundState::OpenStream { request_id, .. }) = self.outbound.take() else { + let Some(OutboundState::RequestOpenStream { request_id, .. }) = self.outbound.take() else { return; }; @@ -197,17 +280,28 @@ impl ConnectionHandler for Handler { if let Some(outbound) = self.outbound.take() { match outbound { - OutboundState::OpenStream { + OutboundState::IdleStream { stream } => { + if let Some((request_id, payload)) = self.outbound_queue.pop_front() { + self.outbound = Some(OutboundState::Sending { + request_id, + future: send_message(stream, payload, self.timeout), + }); + } else { + self.outbound = Some(OutboundState::IdleStream { stream }); + } + } + OutboundState::RequestOpenStream { request_id, payload, } => { - self.outbound = Some(OutboundState::OpenStream { + // Waiting for stream negotiation - put state back and return pending. + // The OutboundSubstreamRequest was already emitted when first entering this + // state. Returning it again would cause libp2p to panic + // with "cannot extract twice". 
+ self.outbound = Some(OutboundState::RequestOpenStream { request_id, payload, }); - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), - }); } OutboundState::Sending { request_id, @@ -217,11 +311,13 @@ impl ConnectionHandler for Handler { self.outbound = Some(OutboundState::Sending { request_id, future }); } Poll::Ready(Ok(())) => { + self.outbound = None; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( FromHandler::OutboundSuccess { request_id }, )); } Poll::Ready(Err(error)) => { + self.outbound = None; return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( FromHandler::OutboundError { request_id, error }, )); @@ -230,8 +326,12 @@ impl ConnectionHandler for Handler { } } - if let Some((request_id, payload)) = self.outbound_queue.pop_front() { - self.outbound = Some(OutboundState::OpenStream { + // Only start a new outbound operation if none is in progress. + // This prevents overwriting RequestOpenStream or Sending states. + if self.outbound.is_none() + && let Some((request_id, payload)) = self.outbound_queue.pop_front() + { + self.outbound = Some(OutboundState::RequestOpenStream { request_id, payload, }); @@ -253,73 +353,48 @@ impl ConnectionHandler for Handler { .. 
}) => { stream.ignore_for_keep_alive(); - let verifier = self.verifier.clone(); - let duty_gater = self.duty_gater.clone(); - let timeout = self.timeout; - self.inbound = Some( - async move { - let recv = async { - let bytes = protocol::recv_message(&mut stream) - .await - .map_err(Failure::io)?; - let (duty, data_set) = protocol::decode_message(&bytes) - .map_err(|_| Failure::InvalidPayload)?; - if !(duty_gater)(&duty) { - return Err(Failure::InvalidDuty); - } - - for (pub_key, par_sig) in data_set.inner() { - verifier(duty.clone(), *pub_key, par_sig.clone()) - .await - .map_err(|_| Failure::InvalidPartialSignature)?; - } - - Ok((duty, data_set)) - }; - - futures::pin_mut!(recv); - match futures::future::select(recv, Delay::new(timeout)).await { - futures::future::Either::Left((result, _)) => result, - futures::future::Either::Right(((), _)) => Err(Failure::Timeout), - } - } - .boxed(), - ); + self.inbound = Some(recv_message( + stream, + self.verifier.clone(), + self.duty_gater.clone(), + self.timeout, + )); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol: mut stream, .. 
}) => { stream.ignore_for_keep_alive(); - let Some(OutboundState::OpenStream { - request_id, - payload, - }) = self.outbound.take() - else { - self.pending_events.push_back(FromHandler::OutboundError { - request_id: 0, - error: Failure::io(format!( - "unexpected outbound stream state for peer {}", - self.peer - )), - }); - return; - }; - - let timeout = self.timeout; - self.outbound = Some(OutboundState::Sending { - request_id, - future: async move { - let send = protocol::send_message(&mut stream, &payload) - .map(|result| result.map_err(Failure::io)); - futures::pin_mut!(send); - match futures::future::select(send, Delay::new(timeout)).await { - futures::future::Either::Left((result, _)) => result, - futures::future::Either::Right(((), _)) => Err(Failure::Timeout), - } + match self.outbound.take() { + Some(OutboundState::RequestOpenStream { + request_id, + payload, + }) => { + self.outbound = Some(OutboundState::Sending { + request_id, + future: send_message(stream, payload, self.timeout), + }); } - .boxed(), - }); + Some(OutboundState::Sending { request_id, future }) => { + self.outbound = Some(OutboundState::Sending { request_id, future }); + tracing::debug!( + "dropping unexpected outbound parsigex stream while a send is already in progress" + ); + } + Some(OutboundState::IdleStream { + stream: idle_stream, + }) => { + self.outbound = Some(OutboundState::IdleStream { + stream: idle_stream, + }); + tracing::debug!( + "dropping unexpected outbound parsigex stream while an idle stream is already cached" + ); + } + None => { + self.outbound = Some(OutboundState::IdleStream { stream }); + } + } } ConnectionEvent::DialUpgradeError(error) => self.on_dial_upgrade_error(error), _ => {} diff --git a/crates/parsigex/src/lib.rs b/crates/parsigex/src/lib.rs new file mode 100644 index 0000000..ca967af --- /dev/null +++ b/crates/parsigex/src/lib.rs @@ -0,0 +1,41 @@ +//! Partial signature exchange protocol. 
+ +pub mod behaviour; +mod handler; +mod protocol; + +pub use behaviour::{ + Behaviour, Config, DutyGater, Error as BehaviourError, Event, Handle, Verifier, VerifyError, +}; +pub use handler::Handler; +pub use protocol::{decode_message, encode_message}; + +use libp2p::PeerId; +use pluto_core::ParSigExCodecError; + +/// The protocol name for partial signature exchange (version 2.0.0). +pub const PROTOCOL_NAME: libp2p::swarm::StreamProtocol = + libp2p::swarm::StreamProtocol::new("/charon/parsigex/2.0.0"); + +/// Returns the supported protocols in precedence order. +pub fn protocols() -> Vec { + vec![PROTOCOL_NAME] +} + +/// Error type for proto and conversion operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Core codec error. + #[error(transparent)] + Codec(#[from] ParSigExCodecError), + + /// Broadcast failed for a peer. + #[error("broadcast to peer {peer} failed")] + BroadcastPeer { + /// Peer for which the broadcast failed. + peer: PeerId, + }, +} + +/// Result type for partial signature exchange operations. +pub type Result = std::result::Result; diff --git a/crates/core/src/parsigex/protocol.rs b/crates/parsigex/src/protocol.rs similarity index 61% rename from crates/core/src/parsigex/protocol.rs rename to crates/parsigex/src/protocol.rs index d3bb812..bfaccaf 100644 --- a/crates/core/src/parsigex/protocol.rs +++ b/crates/parsigex/src/protocol.rs @@ -2,15 +2,14 @@ use std::io; -use futures::prelude::*; use libp2p::swarm::Stream; use prost::Message; -use unsigned_varint::aio::read_usize; -use crate::{ +use pluto_core::{ corepb::v1::{core as pbcore, parsigex as pbparsigex}, types::{Duty, ParSignedDataSet}, }; +use pluto_p2p::proto; use super::{Error, Result as ParasigexResult}; @@ -45,10 +44,14 @@ pub fn encode_message(duty: &Duty, data_set: &ParSignedDataSet) -> ParasigexResu /// Decodes a partial signature exchange message. 
pub fn decode_message(bytes: &[u8]) -> ParasigexResult<(Duty, ParSignedDataSet)> { - let pb: pbparsigex::ParSigExMsg = - decode_protobuf(bytes).map_err(|_| Error::InvalidMessageFields)?; - let duty_pb = pb.duty.ok_or(Error::InvalidMessageFields)?; - let data_set_pb = pb.data_set.ok_or(Error::InvalidMessageFields)?; + let pb: pbparsigex::ParSigExMsg = decode_protobuf(bytes) + .map_err(|_| Error::from(pluto_core::ParSigExCodecError::InvalidMessageFields))?; + let duty_pb = pb + .duty + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let data_set_pb = pb + .data_set + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; let duty = Duty::try_from(&duty_pb)?; let data_set = ParSignedDataSet::try_from((&duty.duty_type, &data_set_pb))?; Ok((duty, data_set)) @@ -56,28 +59,10 @@ pub fn decode_message(bytes: &[u8]) -> ParasigexResult<(Duty, ParSignedDataSet)> /// Sends one protobuf message on the stream. pub async fn send_message(stream: &mut Stream, payload: &[u8]) -> io::Result<()> { - let mut len_buf = unsigned_varint::encode::usize_buffer(); - let encoded_len = unsigned_varint::encode::usize(payload.len(), &mut len_buf); - stream.write_all(encoded_len).await?; - stream.write_all(payload).await?; - stream.flush().await + proto::write_length_delimited(stream, payload).await } /// Receives one protobuf payload from the stream. 
pub async fn recv_message(stream: &mut Stream) -> io::Result> { - let length = read_usize(&mut *stream).await.map_err(|err| match err { - unsigned_varint::io::ReadError::Io(err) => err, - other => io::Error::new(io::ErrorKind::InvalidData, other), - })?; - - if length > MAX_MESSAGE_SIZE { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("message too large: {length} bytes"), - )); - } - - let mut buf = vec![0_u8; length]; - stream.read_exact(&mut buf).await?; - Ok(buf) + proto::read_length_delimited(stream, MAX_MESSAGE_SIZE).await } diff --git a/crates/peerinfo/src/protocol.rs b/crates/peerinfo/src/protocol.rs index 2fe6ad7..5b28452 100644 --- a/crates/peerinfo/src/protocol.rs +++ b/crates/peerinfo/src/protocol.rs @@ -17,14 +17,12 @@ use std::{ }; use chrono::{DateTime, Utc}; -use futures::prelude::*; use libp2p::{PeerId, swarm::Stream}; use pluto_core::version::{self, SemVer, SemVerError}; -use prost::Message; +use pluto_p2p::proto; use regex::Regex; use tokio::sync::Mutex; use tracing::{info, warn}; -use unsigned_varint::aio::read_usize; use crate::{ LocalPeerInfo, @@ -32,9 +30,6 @@ use crate::{ peerinfopb::v1::peerinfo::PeerInfo, }; -/// Maximum message size (64KB should be plenty for peer info). -const MAX_MESSAGE_SIZE: usize = 64 * 1024; - static GIT_HASH_RE: LazyLock = LazyLock::new(|| Regex::new(r"^[0-9a-f]{7}$").expect("invalid regex")); @@ -51,57 +46,6 @@ pub struct ProtocolState { local_info: LocalPeerInfo, } -/// Writes a protobuf message with unsigned varint length prefix to the stream. 
-/// -/// Wire format: `[uvarint length][protobuf bytes]` -async fn write_protobuf( - stream: &mut S, - msg: &M, -) -> io::Result<()> { - // Encode message to protobuf bytes - let mut buf = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - // Write unsigned varint length prefix - let mut len_buf = unsigned_varint::encode::usize_buffer(); - let encoded_len = unsigned_varint::encode::usize(buf.len(), &mut len_buf); - stream.write_all(encoded_len).await?; - - // Write protobuf bytes - stream.write_all(&buf).await?; - stream.flush().await -} - -/// Reads a protobuf message with unsigned varint length prefix from the stream. -/// -/// Wire format: `[uvarint length][protobuf bytes]` -/// -/// Returns an error if the message exceeds `MAX_MESSAGE_SIZE`. -async fn read_protobuf( - stream: &mut S, -) -> io::Result { - // Read unsigned varint length prefix - let msg_len = read_usize(&mut *stream).await.map_err(|e| match e { - unsigned_varint::io::ReadError::Io(io_err) => io_err, - other => io::Error::new(io::ErrorKind::InvalidData, other), - })?; - - if msg_len > MAX_MESSAGE_SIZE { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("message too large: {msg_len} bytes (max: {MAX_MESSAGE_SIZE})"), - )); - } - - // Read exactly `msg_len` protobuf bytes - let mut buf = vec![0u8; msg_len]; - stream.read_exact(&mut buf).await?; - - // Unmarshal protobuf - M::decode(&buf[..]).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) -} - /// Errors that can occur during the protocol. 
#[derive(Debug, thiserror::Error)] pub enum ProtocolError { @@ -317,8 +261,8 @@ impl ProtocolState { request: &PeerInfo, ) -> io::Result<(Stream, PeerInfo)> { let start = Instant::now(); - write_protobuf(&mut stream, request).await?; - let response = read_protobuf(&mut stream).await?; + proto::write_protobuf(&mut stream, request).await?; + let response = proto::read_protobuf(&mut stream).await?; let rtt = start.elapsed(); self.validate_peer_info(&response, rtt).await; @@ -334,8 +278,8 @@ impl ProtocolState { mut stream: Stream, local_info: &PeerInfo, ) -> io::Result<(Stream, PeerInfo)> { - let request = read_protobuf(&mut stream).await?; - write_protobuf(&mut stream, local_info).await?; + let request = proto::read_protobuf(&mut stream).await?; + proto::write_protobuf(&mut stream, local_info).await?; Ok((stream, request)) } } @@ -344,6 +288,7 @@ impl ProtocolState { mod tests { use super::*; use hex_literal::hex; + use prost::Message; // Test case: minimal // CharonVersion: "v1.0.0" @@ -571,7 +516,7 @@ mod tests { // Write to a cursor let mut buf = Vec::new(); - write_protobuf(&mut buf, &original).await.unwrap(); + proto::write_protobuf(&mut buf, &original).await.unwrap(); // The wire format should be: [varint length][protobuf bytes] // Minimal message is 14 bytes, so length prefix is just 1 byte (14 < 128) @@ -580,7 +525,7 @@ mod tests { // Read it back let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(original, decoded); } @@ -589,11 +534,11 @@ mod tests { let original = make_full_peerinfo(); let mut buf = Vec::new(); - write_protobuf(&mut buf, &original).await.unwrap(); + proto::write_protobuf(&mut buf, &original).await.unwrap(); // Read it back let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded: PeerInfo = 
proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(original, decoded); } @@ -609,10 +554,10 @@ mod tests { for original in variants { let mut buf = Vec::new(); - write_protobuf(&mut buf, &original).await.unwrap(); + proto::write_protobuf(&mut buf, &original).await.unwrap(); let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(original, decoded); } } @@ -621,13 +566,13 @@ mod tests { async fn test_read_protobuf_message_too_large() { // Create a buffer with a length prefix that exceeds MAX_MESSAGE_SIZE let mut buf = Vec::new(); - let large_len = MAX_MESSAGE_SIZE + 1; + let large_len = proto::MAX_MESSAGE_SIZE + 1; let mut len_buf = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(large_len, &mut len_buf); buf.extend_from_slice(encoded_len); let mut cursor = futures::io::Cursor::new(&buf[..]); - let result: io::Result = read_protobuf(&mut cursor).await; + let result: io::Result = proto::read_protobuf(&mut cursor).await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -641,7 +586,7 @@ mod tests { let invalid_data = [0x05, 0xff, 0xff, 0xff, 0xff, 0xff]; // length 5, then garbage let mut cursor = futures::io::Cursor::new(&invalid_data[..]); - let result: io::Result = read_protobuf(&mut cursor).await; + let result: io::Result = proto::read_protobuf(&mut cursor).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData); @@ -653,7 +598,7 @@ mod tests { let truncated = [0x10]; // claims 16 bytes but has none let mut cursor = futures::io::Cursor::new(&truncated[..]); - let result: io::Result = read_protobuf(&mut cursor).await; + let result: io::Result = proto::read_protobuf(&mut cursor).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); @@ -667,15 +612,15 @@ mod tests { 
// Write multiple messages to the same buffer let mut buf = Vec::new(); - write_protobuf(&mut buf, &msg1).await.unwrap(); - write_protobuf(&mut buf, &msg2).await.unwrap(); - write_protobuf(&mut buf, &msg3).await.unwrap(); + proto::write_protobuf(&mut buf, &msg1).await.unwrap(); + proto::write_protobuf(&mut buf, &msg2).await.unwrap(); + proto::write_protobuf(&mut buf, &msg3).await.unwrap(); // Read them back in order let mut cursor = futures::io::Cursor::new(&buf[..]); - let decoded1: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); - let decoded2: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); - let decoded3: PeerInfo = read_protobuf(&mut cursor).await.unwrap(); + let decoded1: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); + let decoded2: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); + let decoded3: PeerInfo = proto::read_protobuf(&mut cursor).await.unwrap(); assert_eq!(msg1, decoded1); assert_eq!(msg2, decoded2);