From 13b5eb845f929f3a3225d0ca0daa086dcae85887 Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 2 Aug 2024 18:03:31 -0400 Subject: [PATCH 01/13] authenticate libp2p connections --- Cargo.lock | 1 + .../src/traits/networking/libp2p_network.rs | 26 +- crates/libp2p-networking/Cargo.toml | 1 + crates/libp2p-networking/src/network/mod.rs | 39 +- crates/libp2p-networking/src/network/node.rs | 28 +- .../src/network/node/config.rs | 13 +- .../src/network/node/handle.rs | 18 +- .../src/network/transport.rs | 541 ++++++++++++++++++ crates/libp2p-networking/tests/common/mod.rs | 36 +- crates/libp2p-networking/tests/counter.rs | 93 +-- 10 files changed, 705 insertions(+), 91 deletions(-) create mode 100644 crates/libp2p-networking/src/network/transport.rs diff --git a/Cargo.lock b/Cargo.lock index cae5c74589..1114335239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4486,6 +4486,7 @@ dependencies = [ "libp2p", "libp2p-identity", "libp2p-swarm-derive", + "pin-project", "rand 0.8.5", "serde", "serde_bytes", diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 6af0d9ba81..4bf14fada8 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -58,7 +58,9 @@ use libp2p_identity::{ use libp2p_networking::{ network::{ behaviours::request_response::{Request, Response}, - spawn_network_node, MeshParams, + spawn_network_node, + transport::construct_auth_message, + MeshParams, NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg}, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver, NetworkNodeType, DEFAULT_REPLICATION_FACTOR, @@ -144,7 +146,7 @@ struct Libp2pNetworkInner { /// this node's public key pk: K, /// handle to control the network - handle: Arc, + handle: Arc>, /// Message Receiver receiver: UnboundedReceiver>, /// Receiver for Requests for Data, includes the request and the response chan @@ -393,7 +395,23 @@ impl Libp2pNetwork { .parse()?; // Build our libp2p configuration from our global, network configuration - let mut config_builder: NetworkNodeConfigBuilder = NetworkNodeConfigBuilder::default(); + let mut config_builder = NetworkNodeConfigBuilder::default(); + + // Extrapolate the stake table from the known nodes + let stake_table: HashSet = config + .config + .known_nodes_with_stake + .iter() + .map(|node| K::public_key(&node.stake_table_entry)) + .collect(); + + let auth_message = construct_auth_message(pub_key, priv_key) + .with_context(|| "Failed to construct auth message")?; + + // Set the auth message and stake table + config_builder + .stake_table(Some(stake_table)) + .auth_message(Some(auth_message)); // The replication factor is the minimum of [the default and 2/3 the number of nodes] let Some(default_replication_factor) = DEFAULT_REPLICATION_FACTOR else { @@ -486,7 +504,7 @@ impl Libp2pNetwork { #[allow(clippy::too_many_arguments)] pub async fn new( metrics: Libp2pMetricsValue, - config: NetworkNodeConfig, + config: NetworkNodeConfig, pk: K, bootstrap_addrs: BootstrapAddrs, id: usize, diff --git a/crates/libp2p-networking/Cargo.toml b/crates/libp2p-networking/Cargo.toml index f7e5083823..5e2d53f5f9 100644 --- a/crates/libp2p-networking/Cargo.toml +++ b/crates/libp2p-networking/Cargo.toml @@ -36,6 +36,7 @@ tide = { version = "0.16", optional = true, default-features = false, features = tracing = { workspace = true } void = "1" lazy_static = { workspace = true } +pin-project = "1" [target.'cfg(all(async_executor_impl = "tokio"))'.dependencies] libp2p = { workspace = true, features = ["tokio"] } diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 94796117a9..e0707e9f09 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -6,10 +6,13 @@ mod def; pub mod error; /// functionality of a libp2p network node mod node; +/// Alternative Libp2p transport implementations +pub mod transport; use std::{collections::HashSet, fmt::Debug, str::FromStr}; use futures::channel::oneshot::{self, Sender}; +use hotshot_types::traits::signature_key::SignatureKey; #[cfg(async_executor_impl = "async-std")] use libp2p::dns::async_std::Transport as DnsTransport; #[cfg(async_executor_impl = "tokio")] @@ -31,6 +34,7 @@ use quic::async_std::Transport as QuicTransport; use quic::tokio::Transport as QuicTransport; use serde::{Deserialize, Serialize}; use tracing::instrument; +use transport::StakeTableAuthentication; use self::behaviours::request_response::{Request, Response}; pub use self::{ @@ -205,31 +209,34 @@ pub fn gen_multiaddr(port: u16) -> Multiaddr { /// This type is used to represent a transport in the libp2p network framework. The `PeerId` is a unique identifier for each peer in the network, and the `StreamMuxerBox` is a type of multiplexer that can handle multiple substreams over a single connection. type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; -/// Generate authenticated transport +/// Generates an authenticated transport checked against the stake table. +/// If the stake table or authentication message is not provided, the transport will +/// not participate in stake table authentication. +/// /// # Errors -/// could not sign the quic key with `identity` +/// If we could not create a DNS transport #[instrument(skip(identity))] -pub async fn gen_transport(identity: Keypair) -> Result { - let quic_transport = { +pub async fn gen_transport( + identity: Keypair, + stake_table: Option>, + auth_message: Option>, +) -> Result { + // Create the initial `Quic` transport + let transport = { let mut config = quic::Config::new(&identity); config.handshake_timeout = std::time::Duration::from_secs(20); QuicTransport::new(config) }; - let dns_quic = { - #[cfg(async_executor_impl = "async-std")] - { - DnsTransport::system(quic_transport).await - } + // Require authentication against the stake table + let transport = StakeTableAuthentication::new(transport, stake_table, auth_message); - #[cfg(async_executor_impl = "tokio")] - { - DnsTransport::system(quic_transport) - } - } - .map_err(|e| NetworkError::TransportLaunch { source: e })?; + // Support DNS resolution + let transport = DnsTransport::system(transport) + .await + .map_err(|e| NetworkError::TransportLaunch { source: e })?; - Ok(dns_quic + Ok(transport .map(|(peer_id, connection), _| (peer_id, StreamMuxerBox::new(connection))) .boxed()) } diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index f80f322835..46d3d13816 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -17,7 +17,9 @@ use async_compatibility_layer::{ channel::{unbounded, UnboundedReceiver, UnboundedRecvError, UnboundedSender}, }; use futures::{channel::mpsc, select, FutureExt, SinkExt, StreamExt}; -use hotshot_types::constants::KAD_DEFAULT_REPUB_INTERVAL_SEC; +use hotshot_types::{ + constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::signature_key::SignatureKey, +}; use libp2p::{ autonat, core::transport::ListenerId, @@ -76,7 +78,7 @@ pub const ESTABLISHED_LIMIT_UNWR: u32 = 10; /// Network definition #[derive(custom_debug::Debug)] -pub struct NetworkNode { +pub struct NetworkNode { /// pub/private key from with peer_id is derived identity: Keypair, /// peer id of network node @@ -85,7 +87,7 @@ pub struct NetworkNode { #[debug(skip)] swarm: Swarm, /// the configuration parameters of the netework - config: NetworkNodeConfig, + config: NetworkNodeConfig, /// the listener id we are listening on, if it exists listener_id: Option, /// Handler for requests and response behavior events. @@ -100,7 +102,7 @@ pub struct NetworkNode { bootstrap_tx: Option>, } -impl NetworkNode { +impl NetworkNode { /// Returns number of peers this node is connected to pub fn num_connected(&self) -> usize { self.swarm.connected_peers().count() @@ -158,17 +160,25 @@ impl NetworkNode { /// * Generates a connection to the "broadcast" topic /// * Creates a swarm to manage peers and events #[instrument] - pub async fn new(config: NetworkNodeConfig) -> Result { - // Generate a random PeerId + pub async fn new(config: NetworkNodeConfig) -> Result { + // Generate a random `KeyPair` if one is not specified let identity = if let Some(ref kp) = config.identity { kp.clone() } else { Keypair::generate_ed25519() }; + + // Get the `PeerId` from the `KeyPair` let peer_id = PeerId::from(identity.public()); - debug!(?peer_id); - let transport: BoxedTransport = gen_transport(identity.clone()).await?; - debug!("Launched network transport"); + + // Generate the transport from the identity, stake table, and auth message + let transport: BoxedTransport = gen_transport::( + identity.clone(), + config.stake_table.clone(), + config.auth_message.clone(), + ) + .await?; + // Generate the swarm let mut swarm: Swarm = { // Use the hash of the message's contents as the ID diff --git a/crates/libp2p-networking/src/network/node/config.rs b/crates/libp2p-networking/src/network/node/config.rs index aa831f0ce6..e51106cdfd 100644 --- a/crates/libp2p-networking/src/network/node/config.rs +++ b/crates/libp2p-networking/src/network/node/config.rs @@ -1,5 +1,6 @@ use std::{collections::HashSet, num::NonZeroUsize, time::Duration}; +use hotshot_types::traits::signature_key::SignatureKey; use libp2p::{identity::Keypair, Multiaddr}; use libp2p_identity::PeerId; @@ -10,7 +11,7 @@ pub const DEFAULT_REPLICATION_FACTOR: Option = NonZeroUsize::new(1 /// describe the configuration of the network #[derive(Clone, Default, derive_builder::Builder, custom_debug::Debug)] -pub struct NetworkNodeConfig { +pub struct NetworkNodeConfig { #[builder(default)] /// The type of node (bootstrap etc) pub node_type: NetworkNodeType, @@ -40,6 +41,16 @@ pub struct NetworkNodeConfig { /// whether to start in libp2p::kad::Mode::Server mode #[builder(default = "false")] pub server_mode: bool, + + /// The stake table. Used for authenticating other nodes. If not supplied + /// we will not check other nodes against the stake table + #[builder(default)] + pub stake_table: Option>, + + /// The signed authentication message sent to the remote peer + /// If not supplied we will not send an authentication message during the handshake + #[builder(default)] + pub auth_message: Option>, } /// NOTE: `mesh_outbound_min <= mesh_n_low <= mesh_n <= mesh_n_high` diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 981ec56938..024b03f2c5 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -5,7 +5,9 @@ use async_compatibility_layer::{ channel::{Receiver, SendError, UnboundedReceiver, UnboundedRecvError, UnboundedSender}, }; use futures::channel::oneshot; -use hotshot_types::traits::network::NetworkError as HotshotNetworkError; +use hotshot_types::traits::{ + network::NetworkError as HotshotNetworkError, signature_key::SignatureKey, +}; use libp2p::{request_response::ResponseChannel, Multiaddr}; use libp2p_identity::PeerId; use snafu::{ResultExt, Snafu}; @@ -22,9 +24,9 @@ use crate::network::{ /// - A reference to the state /// - Controls for the swarm #[derive(Debug, Clone)] -pub struct NetworkNodeHandle { +pub struct NetworkNodeHandle { /// network configuration - network_config: NetworkNodeConfig, + network_config: NetworkNodeConfig, /// send an action to the networkbehaviour send_network: UnboundedSender, @@ -70,10 +72,10 @@ impl NetworkNodeReceiver { /// Spawn a network node task task and return the handle and the receiver for it /// # Errors /// Errors if spawning the task fails -pub async fn spawn_network_node( - config: NetworkNodeConfig, +pub async fn spawn_network_node( + config: NetworkNodeConfig, id: usize, -) -> Result<(NetworkNodeReceiver, NetworkNodeHandle), NetworkNodeHandleError> { +) -> Result<(NetworkNodeReceiver, NetworkNodeHandle), NetworkNodeHandleError> { let mut network = NetworkNode::new(config.clone()) .await .context(NetworkSnafu)?; @@ -107,7 +109,7 @@ pub async fn spawn_network_node( Ok((receiver, handle)) } -impl NetworkNodeHandle { +impl NetworkNodeHandle { /// Cleanly shuts down a swarm node /// This is done by sending a message to /// the swarm itself to spin down @@ -487,7 +489,7 @@ impl NetworkNodeHandle { /// Return a reference to the network config #[must_use] - pub fn config(&self) -> &NetworkNodeConfig { + pub fn config(&self) -> &NetworkNodeConfig { &self.network_config } } diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs new file mode 100644 index 0000000000..ea4b783047 --- /dev/null +++ b/crates/libp2p-networking/src/network/transport.rs @@ -0,0 +1,541 @@ +use anyhow::Context; +use anyhow::Result as AnyhowResult; +use async_compatibility_layer::art::async_timeout; +use futures::AsyncRead; +use futures::AsyncWrite; +use serde::Deserialize; +use serde::Serialize; +use std::collections::HashSet; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use tracing::warn; + +use async_std::io::ReadExt; +use async_std::io::WriteExt; +use futures::future::poll_fn; +use hotshot_types::traits::signature_key::SignatureKey; +use libp2p::core::muxing::StreamMuxerExt; +use libp2p::core::transport::TransportEvent; +use libp2p::core::StreamMuxer; +use libp2p::identity::PeerId; +use libp2p::Transport; +use pin_project::pin_project; + +/// The maximum size of an authentication message. This is used to prevent +/// DoS attacks by sending large messages. +const MAX_AUTH_MESSAGE_SIZE: usize = 1024; + +/// The timeout for the authentication handshake. This is used to prevent +/// attacks that keep connections open indefinitely by half-finishing the +/// handshake. +const AUTH_HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + +/// A wrapper for a `Transport` that bidirectionally authenticates connections +/// by performing a handshake that checks if the remote peer is present in the +/// stake table. +#[pin_project] +pub struct StakeTableAuthentication { + #[pin] + /// The underlying transport we are wrapping + pub inner: T, + + /// The stake table we check against to authenticate connections + pub stake_table: Arc>>, + + /// A pre-signed message that we send to the remote peer for authentication + pub auth_message: Arc>>, + + /// Phantom data for the connection type + pd: std::marker::PhantomData, +} + +impl StakeTableAuthentication { + /// Create a new `StakeTableAuthentication` transport that wraps the given transport + /// and authenticates connections against the stake table. + pub fn new(inner: T, stake_table: Option>, auth_message: Option>) -> Self { + Self { + inner, + stake_table: Arc::from(stake_table), + auth_message: Arc::from(auth_message), + pd: std::marker::PhantomData, + } + } + + /// Prove to the remote peer that we are in the stake table by sending + /// them our authentication message. + pub async fn authenticate_with_remote_peer( + stream: &mut C::Substream, + auth_message: Arc>>, + ) -> AnyhowResult<()> + where + C::Substream: Unpin, + { + // If we have an auth message, send it to the remote peer, prefixed with + // the message length + if let Some(auth_message) = auth_message.as_ref() { + // Write the length-delimited message + write_length_delimited(stream, auth_message).await?; + } + + Ok(()) + } + + /// Verify that the remote peer is in the stake table by checking their + /// authentication message. + pub async fn verify_peer_authentication( + stream: &mut C::Substream, + stake_table: Arc>>, + ) -> AnyhowResult<()> + where + C::Substream: Unpin, + { + // Read the length-delimited message from the remote peer + let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; + + // If we have a stake table, check if the remote peer is in it + if let Some(stake_table) = stake_table.as_ref() { + // Deserialize the authentication message + let auth_message: AuthMessage = bincode::deserialize(&message) + .with_context(|| "Failed to deserialize auth message")?; + + // Verify the signature on the public key + let public_key = auth_message + .validate() + .with_context(|| "Failed to verify authentication message")?; + + // Check if the public key is in the stake table + if !stake_table.contains(&public_key) { + return Err(anyhow::anyhow!("Peer not in stake table")); + } + } + + Ok(()) + } +} + +/// The deserialized form of an authentication message that is sent to the remote peer +#[derive(Clone, Serialize, Deserialize)] +struct AuthMessage { + pub_key_bytes: Vec, + signature: S::PureAssembledSignatureType, +} + +impl AuthMessage { + /// Validate the signature on the public key and return it if valid + pub fn validate(&self) -> AnyhowResult { + // Deserialize the public key + let public_key = S::from_bytes(&self.pub_key_bytes) + .with_context(|| "Failed to deserialize public key")?; + + // Check if the signature is valid + if !public_key.validate(&self.signature, &self.pub_key_bytes) { + return Err(anyhow::anyhow!("Invalid signature")); + } + + Ok(public_key) + } +} + +/// Create an sign an authentication message to be sent to the remote peer +pub fn construct_auth_message( + public_key: &S, + private_key: &S::PrivateKey, +) -> AnyhowResult> { + // Serialize the public key + let pub_key_bytes = public_key.to_bytes(); + + // Sign our public key + let signature = + S::sign(private_key, &pub_key_bytes).with_context(|| "Failed to sign public key")?; + + // Create the auth message + let auth_message = AuthMessage:: { + pub_key_bytes, + signature, + }; + + // Serialize the auth message + Ok(bincode::serialize(&auth_message).with_context(|| "Failed to serialize auth message")?) +} + +/// A helper function to read a length-delimited message from a stream. Takes into +/// account the maximum message size. +pub async fn read_length_delimited( + stream: &mut S, + max_size: usize, +) -> AnyhowResult> { + // Receive the first 8 bytes of the message, which is the length + let mut len_bytes = [0u8; 8]; + stream + .read_exact(&mut len_bytes) + .await + .with_context(|| "Failed to read message length")?; + + // Parse the length of the message + let len = u64::from_be_bytes(len_bytes) as usize; + + // Quit if the message is too large + if len > max_size { + return Err(anyhow::anyhow!("Message too large")); + } + + // Read the actual message + let mut message = vec![0u8; len]; + stream + .read_exact(&mut message) + .await + .with_context(|| "Failed to read message")?; + + Ok(message) +} + +/// A helper function to write a length-delimited message to a stream. +pub async fn write_length_delimited( + stream: &mut S, + message: &[u8], +) -> AnyhowResult<()> { + // Write the length of the message + stream + .write_all(&(message.len() as u64).to_be_bytes()) + .await + .with_context(|| "Failed to write message length")?; + + // Write the actual message + stream + .write_all(message) + .await + .with_context(|| "Failed to write message")?; + + Ok(()) +} + +impl Transport + for StakeTableAuthentication +where + T::Dial: Future> + Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::Output: AsConnection + Send, + T::Error: From<::Error> + From, + + C::Substream: Unpin + Send, +{ + // `Dial` is for connecting out, `ListenerUpgrade` is for accepting incoming connections + type Dial = Pin> + Send>>; + type ListenerUpgrade = Pin> + Send>>; + + // These are just passed through + type Output = T::Output; + type Error = T::Error; + + /// Dial a remote peer. This function is changed to perform an authentication handshake + /// on top. + fn dial( + &mut self, + addr: libp2p::Multiaddr, + ) -> Result> { + // Perform the inner dial + let res = self.inner.dial(addr); + + // Clone the necessary fields + let auth_message = self.auth_message.clone(); + let stake_table = self.stake_table.clone(); + + // If the dial was successful, perform the authentication handshake on top + match res { + Ok(dial) => Ok(Box::pin(async move { + // Perform the inner dial + let mut stream = dial.await?; + + // Time out the authentication block + async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { + // Open a substream for the handshake + let mut substream = + poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await?; + + // (outbound) Authenticate with the remote peer + Self::authenticate_with_remote_peer(&mut substream, auth_message) + .await + .map_err(|e| { + warn!("Failed to authenticate with remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + + // (inbound) Verify the remote peer's authentication + Self::verify_peer_authentication(&mut substream, stake_table) + .await + .map_err(|e| { + warn!("Failed to verify remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + + Ok::<(), T::Error>(()) + }) + .await + .map_err(|e| { + warn!("Timed out during authentication handshake: {:?}", e); + std::io::Error::new(std::io::ErrorKind::TimedOut, e) + })??; + + Ok(stream) + })), + Err(err) => Err(err), + } + } + + /// Dial a remote peer as a listener. This function is changed to perform an authentication + /// handshake on top. The flow should be the same as the `dial` function. + fn dial_as_listener( + &mut self, + addr: libp2p::Multiaddr, + ) -> Result> { + // Perform the inner dial + let res = self.inner.dial(addr); + + // Clone the necessary fields + let auth_message = self.auth_message.clone(); + let stake_table = self.stake_table.clone(); + + // If the dial was successful, perform the authentication handshake on top + match res { + Ok(dial) => Ok(Box::pin(async move { + // Perform the inner dial + let mut stream = dial.await?; + + // Time out the authentication block + async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { + // Open a substream for the handshake + let mut substream = + poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await?; + + // (inbound) Verify the remote peer's authentication + Self::verify_peer_authentication(&mut substream, stake_table) + .await + .map_err(|e| { + warn!("Failed to verify remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + + // (outbound) Authenticate with the remote peer + Self::authenticate_with_remote_peer(&mut substream, auth_message) + .await + .map_err(|e| { + warn!("Failed to authenticate with remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + + Ok::<(), T::Error>(()) + }) + .await + .map_err(|e| { + warn!("Timed out performing authentication handshake: {:?}", e); + std::io::Error::new(std::io::ErrorKind::TimedOut, e) + })??; + + Ok(stream) + })), + Err(err) => Err(err), + } + } + + /// This function is where we perform the authentication handshake for _incoming_ connections. + /// The flow in this case is the reverse of the `dial` function: we first verify the remote peer's + /// authentication, and then authenticate with them. + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> + { + match self.as_mut().project().inner.poll(cx) { + Poll::Ready(event) => Poll::Ready(match event { + // If we have an incoming connection, we need to perform the authentication handshake + TransportEvent::Incoming { + listener_id, + upgrade, + local_addr, + send_back_addr, + } => { + // Clone the necessary fields + let auth_message = self.auth_message.clone(); + let stake_table = self.stake_table.clone(); + + // Create a new upgrade that performs the authentication handshake on top + let auth_upgrade = Box::pin(async move { + // Perform the inner upgrade + let mut stream = upgrade.await?; + + // Time out the authentication block + async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { + // Open a substream for the handshake + let mut substream = + poll_fn(|cx| stream.as_connection().poll_inbound_unpin(cx)).await?; + + // (inbound) Verify the remote peer's authentication + Self::verify_peer_authentication(&mut substream, stake_table) + .await + .map_err(|e| { + warn!("Failed to verify remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + + // (outbound) Authenticate with the remote peer + Self::authenticate_with_remote_peer(&mut substream, auth_message) + .await + .map_err(|e| { + warn!("Failed to authenticate with remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + + Ok::<(), T::Error>(()) + }) + .await + .map_err(|e| { + warn!("Timed out performing authentication handshake: {:?}", e); + std::io::Error::new(std::io::ErrorKind::TimedOut, e) + })??; + + Ok(stream) + }); + + // Return the new event + TransportEvent::Incoming { + listener_id, + upgrade: auth_upgrade, + local_addr, + send_back_addr, + } + } + + // We need to re-map the other events because we changed the type of the upgrade + TransportEvent::AddressExpired { + listener_id, + listen_addr, + } => TransportEvent::AddressExpired { + listener_id, + listen_addr, + }, + TransportEvent::ListenerClosed { + listener_id, + reason, + } => TransportEvent::ListenerClosed { + listener_id, + reason, + }, + TransportEvent::ListenerError { listener_id, error } => { + TransportEvent::ListenerError { listener_id, error } + } + TransportEvent::NewAddress { + listener_id, + listen_addr, + } => TransportEvent::NewAddress { + listener_id, + listen_addr, + }, + }), + + Poll::Pending => Poll::Pending, + } + } + + /// The below functions just pass through to the inner transport, but we had + /// to define them + fn remove_listener(&mut self, id: libp2p::core::transport::ListenerId) -> bool { + self.inner.remove_listener(id) + } + fn address_translation( + &self, + listen: &libp2p::Multiaddr, + observed: &libp2p::Multiaddr, + ) -> Option { + self.inner.address_translation(listen, observed) + } + fn listen_on( + &mut self, + id: libp2p::core::transport::ListenerId, + addr: libp2p::Multiaddr, + ) -> Result<(), libp2p::TransportError> { + self.inner.listen_on(id, addr) + } +} + +/// A helper trait that allows us to access the underlying connection +trait AsConnection { + fn as_connection(&mut self) -> &mut C; +} + +/// The implementation of the `AsConnection` trait for a tuple of a `PeerId` +/// and a connection. +impl AsConnection for (PeerId, C) { + fn as_connection(&mut self) -> &mut C { + &mut self.1 + } +} + +#[cfg(test)] +mod test { + use hotshot_types::{signature_key::BLSPubKey, traits::signature_key::SignatureKey}; + + /// Test valid construction and verification of an authentication message + #[test] + fn construct_and_verify_auth_message() { + // Create a new keypair + let keypair = BLSPubKey::generated_from_seed_indexed([1u8; 32], 1337); + + // Construct an authentication message + let auth_message = super::construct_auth_message(&keypair.0, &keypair.1).unwrap(); + + // Verify the authentication message + let public_key = super::AuthMessage::::validate( + &bincode::deserialize(&auth_message).unwrap(), + ); + assert!(public_key.is_ok()); + } + + /// Test invalid construction and verification of an authentication message + #[test] + fn construct_and_verify_invalid_auth_message() { + // Create a new keypair + let keypair = BLSPubKey::generated_from_seed_indexed([1u8; 32], 1337); + + // Construct an authentication message + let auth_message = super::construct_auth_message(&keypair.0, &keypair.1).unwrap(); + + // Change the public key in the message + let mut auth_message: super::AuthMessage = + bincode::deserialize(&auth_message).unwrap(); + + // Change the public key + auth_message.pub_key_bytes[0] ^= 0x01; + + // Serialize the message again + let auth_message = bincode::serialize(&auth_message).unwrap(); + + // Verify the authentication message + let public_key = super::AuthMessage::::validate( + &bincode::deserialize(&auth_message).unwrap(), + ); + assert!(public_key.is_err()); + } + + #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + async fn read_and_write_length_delimited() { + // Create a message + let message = b"Hello, world!"; + + // Write the message to a buffer + let mut buffer = Vec::new(); + super::write_length_delimited(&mut buffer, message) + .await + .unwrap(); + + // Read the message from the buffer + let read_message = super::read_length_delimited(&mut buffer.as_slice(), 1024) + .await + .unwrap(); + + // Check if the messages are the same + assert_eq!(message, read_message.as_slice()); + } +} diff --git a/crates/libp2p-networking/tests/common/mod.rs b/crates/libp2p-networking/tests/common/mod.rs index bdb1aa09bd..e90599481d 100644 --- a/crates/libp2p-networking/tests/common/mod.rs +++ b/crates/libp2p-networking/tests/common/mod.rs @@ -14,6 +14,7 @@ use async_compatibility_layer::{ logging::{setup_backtrace, setup_logging}, }; use futures::{future::join_all, Future, FutureExt}; +use hotshot_types::traits::signature_key::SignatureKey; use libp2p::{identity::Keypair, Multiaddr}; use libp2p_identity::PeerId; use libp2p_networking::network::{ @@ -25,8 +26,8 @@ use snafu::{ResultExt, Snafu}; use tracing::{info, instrument, warn}; #[derive(Clone, Debug)] -pub(crate) struct HandleWithState { - pub(crate) handle: Arc, +pub(crate) struct HandleWithState { + pub(crate) handle: Arc>, pub(crate) state: Arc>, } @@ -35,13 +36,13 @@ pub(crate) struct HandleWithState { /// # Panics /// /// Will panic if a handler is already spawned -pub fn spawn_handler( - handle_and_state: HandleWithState, +pub fn spawn_handler( + handle_and_state: HandleWithState, mut receiver: NetworkNodeReceiver, cb: F, ) -> impl Future where - F: Fn(NetworkEvent, HandleWithState) -> RET + Sync + Send + 'static, + F: Fn(NetworkEvent, HandleWithState) -> RET + Sync + Send + 'static, RET: Future> + Send + 'static, S: Debug + Default + Send + Clone + 'static, { @@ -91,7 +92,14 @@ where /// - Initialize network nodes /// - Kill network nodes /// - A test assertion fails -pub async fn test_bed( +pub async fn test_bed< + S: 'static + Send + Default + Debug + Clone, + F, + FutF, + G, + FutG, + K: SignatureKey + 'static, +>( run_test: F, client_handler: G, num_nodes: usize, @@ -100,8 +108,8 @@ pub async fn test_bed, FutG: Future> + 'static + Send + Sync, - F: FnOnce(Vec>, Duration) -> FutF, - G: Fn(NetworkEvent, HandleWithState) -> FutG + 'static + Send + Sync + Clone, + F: FnOnce(Vec>, Duration) -> FutF, + G: Fn(NetworkEvent, HandleWithState) -> FutG + 'static + Send + Sync + Clone, { setup_logging(); setup_backtrace(); @@ -109,7 +117,7 @@ pub async fn test_bed(num_nodes, timeout, num_of_bootstrap) + let handles_and_receivers = spin_up_swarms::(num_nodes, timeout, num_of_bootstrap) .await .unwrap(); @@ -139,7 +147,9 @@ pub async fn test_bed]) -> HashMap { +fn gen_peerid_map( + handles: &[Arc>], +) -> HashMap { let mut r_val = HashMap::new(); for handle in handles { r_val.insert(handle.peer_id(), handle.id()); @@ -149,7 +159,7 @@ fn gen_peerid_map(handles: &[Arc]) -> HashMap /// print the connections for each handle in `handles` /// useful for debugging -pub async fn print_connections(handles: &[Arc]) { +pub async fn print_connections(handles: &[Arc>]) { let m = gen_peerid_map(handles); warn!("PRINTING CONNECTION STATES"); for handle in handles { @@ -171,11 +181,11 @@ pub async fn print_connections(handles: &[Arc]) { /// and waits for connections to propagate to all nodes. #[allow(clippy::type_complexity)] #[instrument] -pub async fn spin_up_swarms( +pub async fn spin_up_swarms( num_of_nodes: usize, timeout_len: Duration, num_bootstrap: usize, -) -> Result, NetworkNodeReceiver)>, TestError> { +) -> Result, NetworkNodeReceiver)>, TestError> { let mut handles = Vec::new(); let mut bootstrap_addrs = Vec::<(PeerId, Multiaddr)>::new(); let mut connecting_futs = Vec::new(); diff --git a/crates/libp2p-networking/tests/counter.rs b/crates/libp2p-networking/tests/counter.rs index 4f3cdf0fa4..6031a36be0 100644 --- a/crates/libp2p-networking/tests/counter.rs +++ b/crates/libp2p-networking/tests/counter.rs @@ -8,6 +8,7 @@ use async_lock::RwLock; #[cfg(async_executor_impl = "async-std")] use async_std::prelude::StreamExt; use common::{test_bed, HandleSnafu, HandleWithState, TestError}; +use hotshot_types::{signature_key::BLSPubKey, traits::signature_key::SignatureKey}; use libp2p_networking::network::{NetworkEvent, NetworkNodeHandleError}; use rand::seq::IteratorRandom; use serde::{Deserialize, Serialize}; @@ -53,10 +54,10 @@ pub enum CounterMessage { /// chooses one /// # Panics /// panics if handles is of length 0 -fn random_handle( - handles: &[HandleWithState], +fn random_handle( + handles: &[HandleWithState], rng: &mut dyn rand::RngCore, -) -> HandleWithState { +) -> HandleWithState { handles.iter().choose(rng).unwrap().clone() } @@ -64,9 +65,9 @@ fn random_handle( /// - updates state based on events received /// - replies to direct messages #[instrument] -pub async fn counter_handle_network_event( +pub async fn counter_handle_network_event( event: NetworkEvent, - handle: HandleWithState, + handle: HandleWithState, ) -> Result<(), NetworkNodeHandleError> { use CounterMessage::*; use NetworkEvent::*; @@ -158,9 +159,9 @@ pub async fn counter_handle_network_event( /// # Panics /// on error #[allow(clippy::similar_names)] -async fn run_request_response_increment<'a>( - requester_handle: HandleWithState, - requestee_handle: HandleWithState, +async fn run_request_response_increment<'a, K: SignatureKey + 'static>( + requester_handle: HandleWithState, + requestee_handle: HandleWithState, timeout: Duration, ) -> Result<(), TestError> { async move { @@ -210,8 +211,8 @@ async fn run_request_response_increment<'a>( /// broadcasts `msg` from a randomly chosen handle /// then asserts that all nodes match `new_state` -async fn run_gossip_round( - handles: &[HandleWithState], +async fn run_gossip_round( + handles: &[HandleWithState], msg: CounterMessage, new_state: CounterState, timeout_duration: Duration, @@ -285,8 +286,8 @@ async fn run_gossip_round( Ok(()) } -async fn run_intersperse_many_rounds( - handles: Vec>, +async fn run_intersperse_many_rounds( + handles: Vec>, timeout: Duration, ) { for i in 0..u32::try_from(NUM_ROUNDS).unwrap() { @@ -301,16 +302,22 @@ async fn run_intersperse_many_rounds( } } -async fn run_dht_many_rounds(handles: Vec>, timeout: Duration) { +async fn run_dht_many_rounds( + handles: Vec>, + timeout: Duration, +) { run_dht_rounds(&handles, timeout, 0, NUM_ROUNDS).await; } -async fn run_dht_one_round(handles: Vec>, timeout: Duration) { +async fn run_dht_one_round( + handles: Vec>, + timeout: Duration, +) { run_dht_rounds(&handles, timeout, 0, 1).await; } -async fn run_request_response_many_rounds( - handles: Vec>, +async fn run_request_response_many_rounds( + handles: Vec>, timeout: Duration, ) { for _i in 0..NUM_ROUNDS { @@ -324,8 +331,8 @@ async fn run_request_response_many_rounds( /// runs one round of request response /// # Panics /// on error -async fn run_request_response_one_round( - handles: Vec>, +async fn run_request_response_one_round( + handles: Vec>, timeout: Duration, ) { run_request_response_increment_all(&handles, timeout).await; @@ -337,22 +344,28 @@ async fn run_request_response_one_round( /// runs multiple rounds of gossip /// # Panics /// on error -async fn run_gossip_many_rounds(handles: Vec>, timeout: Duration) { +async fn run_gossip_many_rounds( + handles: Vec>, + timeout: Duration, +) { run_gossip_rounds(&handles, NUM_ROUNDS, 0, timeout).await; } /// runs one round of gossip /// # Panics /// on error -async fn run_gossip_one_round(handles: Vec>, timeout: Duration) { +async fn run_gossip_one_round( + handles: Vec>, + timeout: Duration, +) { run_gossip_rounds(&handles, 1, 0, timeout).await; } /// runs many rounds of dht /// # Panics /// on error -async fn run_dht_rounds( - handles: &[HandleWithState], +async fn run_dht_rounds( + handles: &[HandleWithState], timeout: Duration, starting_val: usize, num_rounds: usize, @@ -388,8 +401,8 @@ async fn run_dht_rounds( } /// runs `num_rounds` of message broadcast, incrementing the state of all nodes each broadcast -async fn run_gossip_rounds( - handles: &[HandleWithState], +async fn run_gossip_rounds( + handles: &[HandleWithState], num_rounds: usize, starting_state: CounterState, timeout: Duration, @@ -414,8 +427,8 @@ async fn run_gossip_rounds( /// then has all other peers request its state /// and update their state to the recv'ed state #[allow(clippy::similar_names)] -async fn run_request_response_increment_all( - handles: &[HandleWithState], +async fn run_request_response_increment_all( + handles: &[HandleWithState], timeout: Duration, ) { let mut rng = rand::thread_rng(); @@ -490,7 +503,7 @@ async fn run_request_response_increment_all( #[instrument] async fn test_coverage_request_response_one_round() { Box::pin(test_bed( - run_request_response_one_round, + run_request_response_one_round::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, @@ -505,7 +518,7 @@ async fn test_coverage_request_response_one_round() { #[instrument] async fn test_coverage_request_response_many_rounds() { Box::pin(test_bed( - run_request_response_many_rounds, + run_request_response_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, @@ -520,7 +533,7 @@ async fn test_coverage_request_response_many_rounds() { #[instrument] async fn test_coverage_intersperse_many_rounds() { Box::pin(test_bed( - run_intersperse_many_rounds, + run_intersperse_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, @@ -535,7 +548,7 @@ async fn test_coverage_intersperse_many_rounds() { #[instrument] async fn test_coverage_gossip_many_rounds() { Box::pin(test_bed( - run_gossip_many_rounds, + run_gossip_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, @@ -550,7 +563,7 @@ async fn test_coverage_gossip_many_rounds() { #[instrument] async fn test_coverage_gossip_one_round() { Box::pin(test_bed( - run_gossip_one_round, + run_gossip_one_round::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, @@ -566,7 +579,7 @@ async fn test_coverage_gossip_one_round() { #[ignore] async fn test_stress_request_response_one_round() { Box::pin(test_bed( - run_request_response_one_round, + run_request_response_one_round::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -582,7 +595,7 @@ async fn test_stress_request_response_one_round() { #[ignore] async fn test_stress_request_response_many_rounds() { Box::pin(test_bed( - run_request_response_many_rounds, + run_request_response_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -598,7 +611,7 @@ async fn test_stress_request_response_many_rounds() { #[ignore] async fn test_stress_intersperse_many_rounds() { Box::pin(test_bed( - run_intersperse_many_rounds, + run_intersperse_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -614,7 +627,7 @@ async fn test_stress_intersperse_many_rounds() { #[ignore] async fn test_stress_gossip_many_rounds() { Box::pin(test_bed( - run_gossip_many_rounds, + run_gossip_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -630,7 +643,7 @@ async fn test_stress_gossip_many_rounds() { #[ignore] async fn test_stress_gossip_one_round() { Box::pin(test_bed( - run_gossip_one_round, + run_gossip_one_round::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -646,7 +659,7 @@ async fn test_stress_gossip_one_round() { #[ignore] async fn test_stress_dht_one_round() { Box::pin(test_bed( - run_dht_one_round, + run_dht_one_round::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -662,7 +675,7 @@ async fn test_stress_dht_one_round() { #[ignore] async fn test_stress_dht_many_rounds() { Box::pin(test_bed( - run_dht_many_rounds, + run_dht_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_STRESS, NUM_OF_BOOTSTRAP_STRESS, @@ -677,7 +690,7 @@ async fn test_stress_dht_many_rounds() { #[instrument] async fn test_coverage_dht_one_round() { Box::pin(test_bed( - run_dht_one_round, + run_dht_one_round::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, @@ -692,7 +705,7 @@ async fn test_coverage_dht_one_round() { #[instrument] async fn test_coverage_dht_many_rounds() { Box::pin(test_bed( - run_dht_many_rounds, + run_dht_many_rounds::, counter_handle_network_event, TOTAL_NUM_PEERS_COVERAGE, NUM_OF_BOOTSTRAP_COVERAGE, From de09c5ee99f0bd1b0d33e507e7798749e7feab2a Mon Sep 17 00:00:00 2001 From: Rob Date: Fri, 2 Aug 2024 18:21:47 -0400 Subject: [PATCH 02/13] clippy --- .../src/network/transport.rs | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index ea4b783047..3346884436 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -65,6 +65,9 @@ impl StakeTableAuthentica /// Prove to the remote peer that we are in the stake table by sending /// them our authentication message. + /// + /// # Errors + /// - If we fail to write the message to the stream pub async fn authenticate_with_remote_peer( stream: &mut C::Substream, auth_message: Arc>>, @@ -84,6 +87,14 @@ impl StakeTableAuthentica /// Verify that the remote peer is in the stake table by checking their /// authentication message. + /// + /// # Errors + /// If the peer fails verification. This can happen if: + /// - We fail to read the message from the stream + /// - The message is too large + /// - The message is invalid + /// - The peer is not in the stake table + /// - The signature is invalid pub async fn verify_peer_authentication( stream: &mut C::Substream, stake_table: Arc>>, @@ -118,7 +129,11 @@ impl StakeTableAuthentica /// The deserialized form of an authentication message that is sent to the remote peer #[derive(Clone, Serialize, Deserialize)] struct AuthMessage { + /// The encoded public key of the sender. This is what gets signed, but + /// it is still encoded here so we can verify the signature. pub_key_bytes: Vec, + + /// The signature on the public key signature: S::PureAssembledSignatureType, } @@ -139,6 +154,10 @@ impl AuthMessage { } /// Create an sign an authentication message to be sent to the remote peer +/// +/// # Errors +/// - If we fail to sign the public key +/// - If we fail to serialize the authentication message pub fn construct_auth_message( public_key: &S, private_key: &S::PrivateKey, @@ -157,24 +176,28 @@ pub fn construct_auth_message( }; // Serialize the auth message - Ok(bincode::serialize(&auth_message).with_context(|| "Failed to serialize auth message")?) + bincode::serialize(&auth_message).with_context(|| "Failed to serialize auth message") } /// A helper function to read a length-delimited message from a stream. Takes into /// account the maximum message size. +/// +/// # Errors +/// - If the message is too big +/// - If we fail to read from the stream pub async fn read_length_delimited( stream: &mut S, max_size: usize, ) -> AnyhowResult> { // Receive the first 8 bytes of the message, which is the length - let mut len_bytes = [0u8; 8]; + let mut len_bytes = [0u8; 4]; stream .read_exact(&mut len_bytes) .await .with_context(|| "Failed to read message length")?; - // Parse the length of the message - let len = u64::from_be_bytes(len_bytes) as usize; + // Parse the length of the message as a `u32` + let len = usize::try_from(u32::from_be_bytes(len_bytes))?; // Quit if the message is too large if len > max_size { @@ -192,13 +215,16 @@ pub async fn read_length_delimited( } /// A helper function to write a length-delimited message to a stream. +/// +/// # Errors +/// - If we fail to write to the stream pub async fn write_length_delimited( stream: &mut S, message: &[u8], ) -> AnyhowResult<()> { // Write the length of the message stream - .write_all(&(message.len() as u64).to_be_bytes()) + .write_all(&u32::try_from(message.len())?.to_be_bytes()) .await .with_context(|| "Failed to write message length")?; @@ -239,8 +265,8 @@ where let res = self.inner.dial(addr); // Clone the necessary fields - let auth_message = self.auth_message.clone(); - let stake_table = self.stake_table.clone(); + let auth_message = Arc::clone(&self.auth_message); + let stake_table = Arc::clone(&self.stake_table); // If the dial was successful, perform the authentication handshake on top match res { @@ -294,8 +320,8 @@ where let res = self.inner.dial(addr); // Clone the necessary fields - let auth_message = self.auth_message.clone(); - let stake_table = self.stake_table.clone(); + let auth_message = Arc::clone(&self.auth_message); + let stake_table = Arc::clone(&self.stake_table); // If the dial was successful, perform the authentication handshake on top match res { @@ -357,8 +383,8 @@ where send_back_addr, } => { // Clone the necessary fields - let auth_message = self.auth_message.clone(); - let stake_table = self.stake_table.clone(); + let auth_message = Arc::clone(&self.auth_message); + let stake_table = Arc::clone(&self.stake_table); // Create a new upgrade that performs the authentication handshake on top let auth_upgrade = Box::pin(async move { @@ -461,6 +487,7 @@ where /// A helper trait that allows us to access the underlying connection trait AsConnection { + /// Get a mutable reference to the underlying connection fn as_connection(&mut self) -> &mut C; } From b13a79fa0cb605152c2b0b93fef95cce99e54b97 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 10:07:50 -0400 Subject: [PATCH 03/13] fix lint and build --- crates/libp2p-networking/src/network/mod.rs | 15 ++++++++++++--- crates/libp2p-networking/src/network/transport.rs | 3 +-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index e0707e9f09..a48a7985b0 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -232,9 +232,18 @@ pub async fn gen_transport( let transport = StakeTableAuthentication::new(transport, stake_table, auth_message); // Support DNS resolution - let transport = DnsTransport::system(transport) - .await - .map_err(|e| NetworkError::TransportLaunch { source: e })?; + let transport = { + #[cfg(async_executor_impl = "async-std")] + { + DnsTransport::system(transport).await + } + + #[cfg(async_executor_impl = "tokio")] + { + DnsTransport::system(transport) + } + } + .map_err(|e| NetworkError::TransportLaunch { source: e })?; Ok(transport .map(|(peer_id, connection), _| (peer_id, StreamMuxerBox::new(connection))) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index 3346884436..a6146ab770 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -12,8 +12,7 @@ use std::sync::Arc; use std::task::Poll; use tracing::warn; -use async_std::io::ReadExt; -use async_std::io::WriteExt; +use futures::{AsyncReadExt, AsyncWriteExt}; use futures::future::poll_fn; use hotshot_types::traits::signature_key::SignatureKey; use libp2p::core::muxing::StreamMuxerExt; From 8bea78881342ebceab1f87e4973a03ee2361da5c Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 10:08:03 -0400 Subject: [PATCH 04/13] fmt --- crates/libp2p-networking/src/network/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index a6146ab770..65c76df413 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -12,8 +12,8 @@ use std::sync::Arc; use std::task::Poll; use tracing::warn; -use futures::{AsyncReadExt, AsyncWriteExt}; use futures::future::poll_fn; +use futures::{AsyncReadExt, AsyncWriteExt}; use hotshot_types::traits::signature_key::SignatureKey; use libp2p::core::muxing::StreamMuxerExt; use libp2p::core::transport::TransportEvent; From 539dcbc605fb668ef9e41744174728fa8ea9343d Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 10:43:29 -0400 Subject: [PATCH 05/13] fix incompatibility in tests --- crates/libp2p-networking/src/network/transport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index 65c76df413..bea38f0e5c 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -101,11 +101,11 @@ impl StakeTableAuthentica where C::Substream: Unpin, { - // Read the length-delimited message from the remote peer - let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; - // If we have a stake table, check if the remote peer is in it if let Some(stake_table) = stake_table.as_ref() { + // Read the length-delimited message from the remote peer + let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; + // Deserialize the authentication message let auth_message: AuthMessage = bincode::deserialize(&message) .with_context(|| "Failed to deserialize auth message")?; From 9c8097158a58f05a4c7a2c2100268513587a4691 Mon Sep 17 00:00:00 2001 From: rob-maron <132852777+rob-maron@users.noreply.github.com> Date: Mon, 5 Aug 2024 11:06:47 -0400 Subject: [PATCH 06/13] `anyhow!` to `ensure!` Co-authored-by: Jarred Parr --- crates/libp2p-networking/src/network/transport.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index bea38f0e5c..d4fae8a56e 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -199,9 +199,7 @@ pub async fn read_length_delimited( let len = usize::try_from(u32::from_be_bytes(len_bytes))?; // Quit if the message is too large - if len > max_size { - return Err(anyhow::anyhow!("Message too large")); - } + ensure!(len <= max_size, "Message too large"); // Read the actual message let mut message = vec![0u8; len]; From 609dfe131466e03d1298e0445c6199c37f8e793b Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 11:15:05 -0400 Subject: [PATCH 07/13] ensure import --- crates/libp2p-networking/src/network/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index d4fae8a56e..8b5b028841 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -1,5 +1,5 @@ -use anyhow::Context; use anyhow::Result as AnyhowResult; +use anyhow::{ensure, Context}; use async_compatibility_layer::art::async_timeout; use futures::AsyncRead; use futures::AsyncWrite; From 2d8621f5dacb486da2b718f527533a35d116a73f Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 14:48:32 -0400 Subject: [PATCH 08/13] match to `PeerId` --- .../src/traits/networking/libp2p_network.rs | 5 +- .../src/network/transport.rs | 384 +++++++++++++----- 2 files changed, 287 insertions(+), 102 deletions(-) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 8050a4075d..8eeff5084e 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -413,8 +413,9 @@ impl Libp2pNetwork { .map(|node| K::public_key(&node.stake_table_entry)) .collect(); - let auth_message = construct_auth_message(pub_key, priv_key) - .with_context(|| "Failed to construct auth message")?; + let auth_message = + construct_auth_message(pub_key, &keypair.public().to_peer_id(), priv_key) + .with_context(|| "Failed to construct auth message")?; // Set the auth message and stake table config_builder diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index 8b5b028841..bc939205f8 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -50,6 +50,75 @@ pub struct StakeTableAuthentication, } +/// Prove to the remote peer that we are in the stake table by sending +/// them our authentication message. +/// +/// # Errors +/// - If we fail to write the message to the stream +pub async fn authenticate_with_remote_peer( + stream: &mut W, + auth_message: Arc>>, +) -> AnyhowResult<()> { + // If we have an auth message, send it to the remote peer, prefixed with + // the message length + if let Some(auth_message) = auth_message.as_ref() { + // Write the length-delimited message + write_length_delimited(stream, auth_message).await?; + } + + Ok(()) +} + +/// Verify that the remote peer is: +/// - In the stake table +/// - Sending us a valid authentication message +/// - Sending us a valid signature +/// - Matching the peer ID we expect +/// +/// # Errors +/// If the peer fails verification. This can happen if: +/// - We fail to read the message from the stream +/// - The message is too large +/// - The message is invalid +/// - The peer is not in the stake table +/// - The signature is invalid +pub async fn verify_peer_authentication( + stream: &mut R, + stake_table: Arc>>, + required_peer_id: &PeerId, +) -> AnyhowResult<()> { + // If we have a stake table, check if the remote peer is in it + if let Some(stake_table) = stake_table.as_ref() { + // Read the length-delimited message from the remote peer + let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; + + // Deserialize the authentication message + let auth_message: AuthMessage = + bincode::deserialize(&message).with_context(|| "Failed to deserialize auth message")?; + + // Verify the signature on the public keys + let public_key = auth_message + .validate() + .with_context(|| "Failed to verify authentication message")?; + + // Deserialize the `PeerId` + let peer_id = PeerId::from_bytes(&auth_message.peer_id_bytes) + .with_context(|| "Failed to deserialize peer ID")?; + + // Verify that the peer ID is the same as the remote peer + if peer_id != *required_peer_id { + return Err(anyhow::anyhow!("Peer ID mismatch")); + } + + // Check if the public key is in the stake table + if !stake_table.contains(&public_key) { + return Err(anyhow::anyhow!("Peer not in stake table")); + } + } + + Ok(()) +} + impl StakeTableAuthentication { /// Create a new `StakeTableAuthentication` transport that wraps the given transport /// and authenticates connections against the stake table. @@ -61,76 +130,18 @@ impl StakeTableAuthentica pd: std::marker::PhantomData, } } - - /// Prove to the remote peer that we are in the stake table by sending - /// them our authentication message. - /// - /// # Errors - /// - If we fail to write the message to the stream - pub async fn authenticate_with_remote_peer( - stream: &mut C::Substream, - auth_message: Arc>>, - ) -> AnyhowResult<()> - where - C::Substream: Unpin, - { - // If we have an auth message, send it to the remote peer, prefixed with - // the message length - if let Some(auth_message) = auth_message.as_ref() { - // Write the length-delimited message - write_length_delimited(stream, auth_message).await?; - } - - Ok(()) - } - - /// Verify that the remote peer is in the stake table by checking their - /// authentication message. - /// - /// # Errors - /// If the peer fails verification. This can happen if: - /// - We fail to read the message from the stream - /// - The message is too large - /// - The message is invalid - /// - The peer is not in the stake table - /// - The signature is invalid - pub async fn verify_peer_authentication( - stream: &mut C::Substream, - stake_table: Arc>>, - ) -> AnyhowResult<()> - where - C::Substream: Unpin, - { - // If we have a stake table, check if the remote peer is in it - if let Some(stake_table) = stake_table.as_ref() { - // Read the length-delimited message from the remote peer - let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; - - // Deserialize the authentication message - let auth_message: AuthMessage = bincode::deserialize(&message) - .with_context(|| "Failed to deserialize auth message")?; - - // Verify the signature on the public key - let public_key = auth_message - .validate() - .with_context(|| "Failed to verify authentication message")?; - - // Check if the public key is in the stake table - if !stake_table.contains(&public_key) { - return Err(anyhow::anyhow!("Peer not in stake table")); - } - } - - Ok(()) - } } /// The deserialized form of an authentication message that is sent to the remote peer #[derive(Clone, Serialize, Deserialize)] struct AuthMessage { - /// The encoded public key of the sender. This is what gets signed, but - /// it is still encoded here so we can verify the signature. - pub_key_bytes: Vec, + /// The encoded (stake table) public key of the sender. This, along with the peer ID, is + /// signed. It is still encoded here to enable easy verification. + public_key_bytes: Vec, + + /// The encoded peer ID of the sender. This is appended to the public key before signing. + /// It is still encoded here to enable easy verification. + peer_id_bytes: Vec, /// The signature on the public key signature: S::PureAssembledSignatureType, @@ -139,12 +150,16 @@ struct AuthMessage { impl AuthMessage { /// Validate the signature on the public key and return it if valid pub fn validate(&self) -> AnyhowResult { - // Deserialize the public key - let public_key = S::from_bytes(&self.pub_key_bytes) + // Deserialize the stake table public key + let public_key = S::from_bytes(&self.public_key_bytes) .with_context(|| "Failed to deserialize public key")?; - // Check if the signature is valid - if !public_key.validate(&self.signature, &self.pub_key_bytes) { + // Reconstruct the signed message from the public key and peer ID + let mut signed_message = public_key.to_bytes(); + signed_message.extend(self.peer_id_bytes.clone()); + + // Check if the signature is valid across both + if !public_key.validate(&self.signature, &signed_message) { return Err(anyhow::anyhow!("Invalid signature")); } @@ -159,18 +174,24 @@ impl AuthMessage { /// - If we fail to serialize the authentication message pub fn construct_auth_message( public_key: &S, + peer_id: &PeerId, private_key: &S::PrivateKey, ) -> AnyhowResult> { - // Serialize the public key - let pub_key_bytes = public_key.to_bytes(); + // Serialize the stake table public key + let mut public_key_bytes = public_key.to_bytes(); + + // Serialize the peer ID and append it + let peer_id_bytes = peer_id.to_bytes(); + public_key_bytes.extend_from_slice(&peer_id_bytes); // Sign our public key let signature = - S::sign(private_key, &pub_key_bytes).with_context(|| "Failed to sign public key")?; + S::sign(private_key, &public_key_bytes).with_context(|| "Failed to sign public key")?; // Create the auth message let auth_message = AuthMessage:: { - pub_key_bytes, + public_key_bytes, + peer_id_bytes, signature, }; @@ -239,7 +260,7 @@ impl Transport where T::Dial: Future> + Send + 'static, T::ListenerUpgrade: Send + 'static, - T::Output: AsConnection + Send, + T::Output: AsConnection + AsPeerId + Send, T::Error: From<::Error> + From, C::Substream: Unpin + Send, @@ -278,7 +299,7 @@ where poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await?; // (outbound) Authenticate with the remote peer - Self::authenticate_with_remote_peer(&mut substream, auth_message) + authenticate_with_remote_peer(&mut substream, auth_message) .await .map_err(|e| { warn!("Failed to authenticate with remote peer: {:?}", e); @@ -286,7 +307,7 @@ where })?; // (inbound) Verify the remote peer's authentication - Self::verify_peer_authentication(&mut substream, stake_table) + verify_peer_authentication(&mut substream, stake_table, stream.as_peer_id()) .await .map_err(|e| { warn!("Failed to verify remote peer: {:?}", e); @@ -333,7 +354,7 @@ where poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await?; // (inbound) Verify the remote peer's authentication - Self::verify_peer_authentication(&mut substream, stake_table) + verify_peer_authentication(&mut substream, stake_table, stream.as_peer_id()) .await .map_err(|e| { warn!("Failed to verify remote peer: {:?}", e); @@ -341,7 +362,7 @@ where })?; // (outbound) Authenticate with the remote peer - Self::authenticate_with_remote_peer(&mut substream, auth_message) + authenticate_with_remote_peer(&mut substream, auth_message) .await .map_err(|e| { warn!("Failed to authenticate with remote peer: {:?}", e); @@ -395,15 +416,19 @@ where poll_fn(|cx| stream.as_connection().poll_inbound_unpin(cx)).await?; // (inbound) Verify the remote peer's authentication - Self::verify_peer_authentication(&mut substream, stake_table) - .await - .map_err(|e| { - warn!("Failed to verify remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; + verify_peer_authentication( + &mut substream, + stake_table, + stream.as_peer_id(), + ) + .await + .map_err(|e| { + warn!("Failed to verify remote peer: {:?}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; // (outbound) Authenticate with the remote peer - Self::authenticate_with_remote_peer(&mut substream, auth_message) + authenticate_with_remote_peer(&mut substream, auth_message) .await .map_err(|e| { warn!("Failed to authenticate with remote peer: {:?}", e); @@ -496,18 +521,70 @@ impl AsConnection for (PeerId, C) { } } +/// A helper trait that allows us to access the underlying `PeerId` +trait AsPeerId { + /// Get a mutable reference to the underlying `PeerId` + fn as_peer_id(&mut self) -> &mut PeerId; +} + +/// The implementation of the `AsPeerId` trait for a tuple of a `PeerId` +/// and a connection. +impl AsPeerId for (PeerId, C) { + fn as_peer_id(&mut self) -> &mut PeerId { + &mut self.0 + } +} + #[cfg(test)] mod test { + use rand::Rng; + + use std::{collections::HashSet, sync::Arc}; + + use super::write_length_delimited; use hotshot_types::{signature_key::BLSPubKey, traits::signature_key::SignatureKey}; + use super::verify_peer_authentication; + + // Helper macro for generating a new identity and authentication message + macro_rules! new_identity { + () => {{ + // Gen a new seed + let seed = rand::rngs::OsRng.gen::<[u8; 32]>(); + + // Create a new keypair + let keypair = BLSPubKey::generated_from_seed_indexed(seed, 1337); + + // Create a peer ID + let peer_id = libp2p::identity::Keypair::generate_ed25519() + .public() + .to_peer_id(); + + // Construct an authentication message + let auth_message = + super::construct_auth_message(&keypair.0, &peer_id, &keypair.1).unwrap(); + + (keypair, peer_id, auth_message) + }}; + } + + // Helper macro to generator a cursor from a length-delimited message + macro_rules! cursor_from { + ($auth_message:expr) => {{ + let mut stream = futures::io::Cursor::new(vec![]); + write_length_delimited(&mut stream, &$auth_message) + .await + .expect("Failed to write message"); + stream.set_position(0); + stream + }}; + } + /// Test valid construction and verification of an authentication message #[test] - fn construct_and_verify_auth_message() { - // Create a new keypair - let keypair = BLSPubKey::generated_from_seed_indexed([1u8; 32], 1337); - - // Construct an authentication message - let auth_message = super::construct_auth_message(&keypair.0, &keypair.1).unwrap(); + fn signature_verify() { + // Create a new identity + let (_, _, auth_message) = new_identity!(); // Verify the authentication message let public_key = super::AuthMessage::::validate( @@ -516,21 +593,43 @@ mod test { assert!(public_key.is_ok()); } - /// Test invalid construction and verification of an authentication message + /// Test invalid construction and verification of an authentication message with + /// an invalid public key. This ensures we are signing over it correctly. #[test] - fn construct_and_verify_invalid_auth_message() { - // Create a new keypair - let keypair = BLSPubKey::generated_from_seed_indexed([1u8; 32], 1337); - - // Construct an authentication message - let auth_message = super::construct_auth_message(&keypair.0, &keypair.1).unwrap(); + fn signature_verify_invalid_public_key() { + // Create a new identity + let (_, _, auth_message) = new_identity!(); - // Change the public key in the message + // Deserialize the authentication message let mut auth_message: super::AuthMessage = bincode::deserialize(&auth_message).unwrap(); // Change the public key - auth_message.pub_key_bytes[0] ^= 0x01; + auth_message.public_key_bytes[0] ^= 0x01; + + // Serialize the message again + let auth_message = bincode::serialize(&auth_message).unwrap(); + + // Verify the authentication message + let public_key = super::AuthMessage::::validate( + &bincode::deserialize(&auth_message).unwrap(), + ); + assert!(public_key.is_err()); + } + + /// Test invalid construction and verification of an authentication message with + /// an invalid peer ID. This ensures we are signing over it correctly. + #[test] + fn signature_verify_invalid_peer_id() { + // Create a new identity + let (_, _, auth_message) = new_identity!(); + + // Deserialize the authentication message + let mut auth_message: super::AuthMessage = + bincode::deserialize(&auth_message).unwrap(); + + // Change the peer ID + auth_message.peer_id_bytes[0] ^= 0x01; // Serialize the message again let auth_message = bincode::serialize(&auth_message).unwrap(); @@ -542,6 +641,91 @@ mod test { assert!(public_key.is_err()); } + #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + async fn valid_authentication() { + // Create a new identity + let (keypair, peer_id, auth_message) = new_identity!(); + + // Create a stream and write the message to it + let mut stream = cursor_from!(auth_message); + + // Create a stake table with the key + let mut stake_table = std::collections::HashSet::new(); + stake_table.insert(keypair.0); + + // Verify the authentication message + let result = + verify_peer_authentication(&mut stream, Arc::new(Some(stake_table)), &peer_id).await; + + assert!( + result.is_ok(), + "Should have passed authentication but did not" + ); + } + + #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + async fn key_not_in_stake_table() { + // Create a new identity + let (_, peer_id, auth_message) = new_identity!(); + + // Create a stream and write the message to it + let mut stream = cursor_from!(auth_message); + + // Create an empty stake table + let stake_table: HashSet = std::collections::HashSet::new(); + + // Verify the authentication message + let result = + verify_peer_authentication(&mut stream, Arc::new(Some(stake_table)), &peer_id).await; + + // Make sure it errored for the right reason + assert!( + result + .err() + .expect("Should have failed authentication but did not") + .to_string() + .contains("Peer not in stake table"), + "Did not fail with the correct error" + ); + } + + #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] + #[cfg_attr(async_executor_impl = "async-std", async_std::test)] + async fn peer_id_mismatch() { + // Create a new identity and authentication message + let (keypair, _, auth_message) = new_identity!(); + + // Create a second (malicious) identity + let (_, malicious_peer_id, _) = new_identity!(); + + // Create a stream and write the message to it + let mut stream = cursor_from!(auth_message); + + // Create a stake table with the key + let mut stake_table: HashSet = std::collections::HashSet::new(); + stake_table.insert(keypair.0); + + // Check against the malicious peer ID + let result = verify_peer_authentication( + &mut stream, + Arc::new(Some(stake_table)), + &malicious_peer_id, + ) + .await; + + // Make sure it errored for the right reason + assert!( + result + .err() + .expect("Should have failed authentication but did not") + .to_string() + .contains("Peer ID mismatch"), + "Did not fail with the correct error" + ); + } + #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn read_and_write_length_delimited() { From 51b2a3702898fa3fd221c7788c286c02f00fde4a Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 14:54:14 -0400 Subject: [PATCH 09/13] format and lint --- crates/libp2p-networking/src/network/transport.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/transport.rs index bc939205f8..db89f089a8 100644 --- a/crates/libp2p-networking/src/network/transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -7,6 +7,7 @@ use serde::Deserialize; use serde::Serialize; use std::collections::HashSet; use std::future::Future; +use std::hash::BuildHasher; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; @@ -82,9 +83,13 @@ pub async fn authenticate_with_remote_peer( /// - The message is invalid /// - The peer is not in the stake table /// - The signature is invalid -pub async fn verify_peer_authentication( +pub async fn verify_peer_authentication< + R: AsyncReadExt + Unpin, + S: SignatureKey, + H: BuildHasher, +>( stream: &mut R, - stake_table: Arc>>, + stake_table: Arc>>, required_peer_id: &PeerId, ) -> AnyhowResult<()> { // If we have a stake table, check if the remote peer is in it @@ -683,8 +688,7 @@ mod test { // Make sure it errored for the right reason assert!( result - .err() - .expect("Should have failed authentication but did not") + .expect_err("Should have failed authentication but did not") .to_string() .contains("Peer not in stake table"), "Did not fail with the correct error" @@ -718,8 +722,7 @@ mod test { // Make sure it errored for the right reason assert!( result - .err() - .expect("Should have failed authentication but did not") + .expect_err("Should have failed authentication but did not") .to_string() .contains("Peer ID mismatch"), "Did not fail with the correct error" From 443784fa9d6ac815d9ec53329a583ed83fe0de31 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 14:57:39 -0400 Subject: [PATCH 10/13] namespace everything to `st transport` --- crates/hotshot/src/traits/networking/libp2p_network.rs | 2 +- crates/libp2p-networking/src/network/mod.rs | 4 ++-- .../src/network/{transport.rs => stake_table_transport.rs} | 0 3 files changed, 3 insertions(+), 3 deletions(-) rename crates/libp2p-networking/src/network/{transport.rs => stake_table_transport.rs} (100%) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 8eeff5084e..d6ba27fff4 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -59,7 +59,7 @@ use libp2p_networking::{ network::{ behaviours::request_response::{Request, Response}, spawn_network_node, - transport::construct_auth_message, + stake_table_transport::construct_auth_message, MeshParams, NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg}, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver, diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index a48a7985b0..3d53f0dcf7 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -7,7 +7,7 @@ pub mod error; /// functionality of a libp2p network node mod node; /// Alternative Libp2p transport implementations -pub mod transport; +pub mod stake_table_transport; use std::{collections::HashSet, fmt::Debug, str::FromStr}; @@ -33,8 +33,8 @@ use quic::async_std::Transport as QuicTransport; #[cfg(async_executor_impl = "tokio")] use quic::tokio::Transport as QuicTransport; use serde::{Deserialize, Serialize}; +use stake_table_transport::StakeTableAuthentication; use tracing::instrument; -use transport::StakeTableAuthentication; use self::behaviours::request_response::{Request, Response}; pub use self::{ diff --git a/crates/libp2p-networking/src/network/transport.rs b/crates/libp2p-networking/src/network/stake_table_transport.rs similarity index 100% rename from crates/libp2p-networking/src/network/transport.rs rename to crates/libp2p-networking/src/network/stake_table_transport.rs From a8defe32821df331f064eedf533d6991c8af64b8 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 5 Aug 2024 16:07:17 -0400 Subject: [PATCH 11/13] move some items --- .../src/network/stake_table_transport.rs | 255 +++++++++--------- 1 file changed, 126 insertions(+), 129 deletions(-) diff --git a/crates/libp2p-networking/src/network/stake_table_transport.rs b/crates/libp2p-networking/src/network/stake_table_transport.rs index db89f089a8..b6cad13cb4 100644 --- a/crates/libp2p-networking/src/network/stake_table_transport.rs +++ b/crates/libp2p-networking/src/network/stake_table_transport.rs @@ -51,79 +51,6 @@ pub struct StakeTableAuthentication, } -/// Prove to the remote peer that we are in the stake table by sending -/// them our authentication message. -/// -/// # Errors -/// - If we fail to write the message to the stream -pub async fn authenticate_with_remote_peer( - stream: &mut W, - auth_message: Arc>>, -) -> AnyhowResult<()> { - // If we have an auth message, send it to the remote peer, prefixed with - // the message length - if let Some(auth_message) = auth_message.as_ref() { - // Write the length-delimited message - write_length_delimited(stream, auth_message).await?; - } - - Ok(()) -} - -/// Verify that the remote peer is: -/// - In the stake table -/// - Sending us a valid authentication message -/// - Sending us a valid signature -/// - Matching the peer ID we expect -/// -/// # Errors -/// If the peer fails verification. This can happen if: -/// - We fail to read the message from the stream -/// - The message is too large -/// - The message is invalid -/// - The peer is not in the stake table -/// - The signature is invalid -pub async fn verify_peer_authentication< - R: AsyncReadExt + Unpin, - S: SignatureKey, - H: BuildHasher, ->( - stream: &mut R, - stake_table: Arc>>, - required_peer_id: &PeerId, -) -> AnyhowResult<()> { - // If we have a stake table, check if the remote peer is in it - if let Some(stake_table) = stake_table.as_ref() { - // Read the length-delimited message from the remote peer - let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; - - // Deserialize the authentication message - let auth_message: AuthMessage = - bincode::deserialize(&message).with_context(|| "Failed to deserialize auth message")?; - - // Verify the signature on the public keys - let public_key = auth_message - .validate() - .with_context(|| "Failed to verify authentication message")?; - - // Deserialize the `PeerId` - let peer_id = PeerId::from_bytes(&auth_message.peer_id_bytes) - .with_context(|| "Failed to deserialize peer ID")?; - - // Verify that the peer ID is the same as the remote peer - if peer_id != *required_peer_id { - return Err(anyhow::anyhow!("Peer ID mismatch")); - } - - // Check if the public key is in the stake table - if !stake_table.contains(&public_key) { - return Err(anyhow::anyhow!("Peer not in stake table")); - } - } - - Ok(()) -} - impl StakeTableAuthentication { /// Create a new `StakeTableAuthentication` transport that wraps the given transport /// and authenticates connections against the stake table. @@ -204,58 +131,75 @@ pub fn construct_auth_message( bincode::serialize(&auth_message).with_context(|| "Failed to serialize auth message") } -/// A helper function to read a length-delimited message from a stream. Takes into -/// account the maximum message size. +/// Prove to the remote peer that we are in the stake table by sending +/// them our authentication message. /// /// # Errors -/// - If the message is too big -/// - If we fail to read from the stream -pub async fn read_length_delimited( - stream: &mut S, - max_size: usize, -) -> AnyhowResult> { - // Receive the first 8 bytes of the message, which is the length - let mut len_bytes = [0u8; 4]; - stream - .read_exact(&mut len_bytes) - .await - .with_context(|| "Failed to read message length")?; - - // Parse the length of the message as a `u32` - let len = usize::try_from(u32::from_be_bytes(len_bytes))?; - - // Quit if the message is too large - ensure!(len <= max_size, "Message too large"); - - // Read the actual message - let mut message = vec![0u8; len]; - stream - .read_exact(&mut message) - .await - .with_context(|| "Failed to read message")?; +/// - If we fail to write the message to the stream +pub async fn authenticate_with_remote_peer( + stream: &mut W, + auth_message: Arc>>, +) -> AnyhowResult<()> { + // If we have an auth message, send it to the remote peer, prefixed with + // the message length + if let Some(auth_message) = auth_message.as_ref() { + // Write the length-delimited message + write_length_delimited(stream, auth_message).await?; + } - Ok(message) + Ok(()) } -/// A helper function to write a length-delimited message to a stream. +/// Verify that the remote peer is: +/// - In the stake table +/// - Sending us a valid authentication message +/// - Sending us a valid signature +/// - Matching the peer ID we expect /// /// # Errors -/// - If we fail to write to the stream -pub async fn write_length_delimited( - stream: &mut S, - message: &[u8], +/// If the peer fails verification. This can happen if: +/// - We fail to read the message from the stream +/// - The message is too large +/// - The message is invalid +/// - The peer is not in the stake table +/// - The signature is invalid +pub async fn verify_peer_authentication< + R: AsyncReadExt + Unpin, + S: SignatureKey, + H: BuildHasher, +>( + stream: &mut R, + stake_table: Arc>>, + required_peer_id: &PeerId, ) -> AnyhowResult<()> { - // Write the length of the message - stream - .write_all(&u32::try_from(message.len())?.to_be_bytes()) - .await - .with_context(|| "Failed to write message length")?; + // If we have a stake table, check if the remote peer is in it + if let Some(stake_table) = stake_table.as_ref() { + // Read the length-delimited message from the remote peer + let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; - // Write the actual message - stream - .write_all(message) - .await - .with_context(|| "Failed to write message")?; + // Deserialize the authentication message + let auth_message: AuthMessage = + bincode::deserialize(&message).with_context(|| "Failed to deserialize auth message")?; + + // Verify the signature on the public keys + let public_key = auth_message + .validate() + .with_context(|| "Failed to verify authentication message")?; + + // Deserialize the `PeerId` + let peer_id = PeerId::from_bytes(&auth_message.peer_id_bytes) + .with_context(|| "Failed to deserialize peer ID")?; + + // Verify that the peer ID is the same as the remote peer + if peer_id != *required_peer_id { + return Err(anyhow::anyhow!("Peer ID mismatch")); + } + + // Check if the public key is in the stake table + if !stake_table.contains(&public_key) { + return Err(anyhow::anyhow!("Peer not in stake table")); + } + } Ok(()) } @@ -265,7 +209,7 @@ impl Transport where T::Dial: Future> + Send + 'static, T::ListenerUpgrade: Send + 'static, - T::Output: AsConnection + AsPeerId + Send, + T::Output: AsOutput + Send, T::Error: From<::Error> + From, C::Substream: Unpin + Send, @@ -334,7 +278,8 @@ where } /// Dial a remote peer as a listener. This function is changed to perform an authentication - /// handshake on top. The flow should be the same as the `dial` function. + /// handshake on top. The flow should be the reverse of the `dial` function and the + /// same as the `poll` function. fn dial_as_listener( &mut self, addr: libp2p::Multiaddr, @@ -513,33 +458,85 @@ where } /// A helper trait that allows us to access the underlying connection -trait AsConnection { +/// and `PeerId` from a transport output +trait AsOutput { /// Get a mutable reference to the underlying connection fn as_connection(&mut self) -> &mut C; + + /// Get a mutable reference to the underlying `PeerId` + fn as_peer_id(&mut self) -> &mut PeerId; } /// The implementation of the `AsConnection` trait for a tuple of a `PeerId` /// and a connection. -impl AsConnection for (PeerId, C) { +impl AsOutput for (PeerId, C) { + /// Get a mutable reference to the underlying connection fn as_connection(&mut self) -> &mut C { &mut self.1 } -} -/// A helper trait that allows us to access the underlying `PeerId` -trait AsPeerId { /// Get a mutable reference to the underlying `PeerId` - fn as_peer_id(&mut self) -> &mut PeerId; -} - -/// The implementation of the `AsPeerId` trait for a tuple of a `PeerId` -/// and a connection. -impl AsPeerId for (PeerId, C) { fn as_peer_id(&mut self) -> &mut PeerId { &mut self.0 } } +/// A helper function to read a length-delimited message from a stream. Takes into +/// account the maximum message size. +/// +/// # Errors +/// - If the message is too big +/// - If we fail to read from the stream +pub async fn read_length_delimited( + stream: &mut S, + max_size: usize, +) -> AnyhowResult> { + // Receive the first 8 bytes of the message, which is the length + let mut len_bytes = [0u8; 4]; + stream + .read_exact(&mut len_bytes) + .await + .with_context(|| "Failed to read message length")?; + + // Parse the length of the message as a `u32` + let len = usize::try_from(u32::from_be_bytes(len_bytes))?; + + // Quit if the message is too large + ensure!(len <= max_size, "Message too large"); + + // Read the actual message + let mut message = vec![0u8; len]; + stream + .read_exact(&mut message) + .await + .with_context(|| "Failed to read message")?; + + Ok(message) +} + +/// A helper function to write a length-delimited message to a stream. +/// +/// # Errors +/// - If we fail to write to the stream +pub async fn write_length_delimited( + stream: &mut S, + message: &[u8], +) -> AnyhowResult<()> { + // Write the length of the message + stream + .write_all(&u32::try_from(message.len())?.to_be_bytes()) + .await + .with_context(|| "Failed to write message length")?; + + // Write the actual message + stream + .write_all(message) + .await + .with_context(|| "Failed to write message")?; + + Ok(()) +} + #[cfg(test)] mod test { use rand::Rng; From 4244114f7a15ae7155c2f5a3cf60aec26b997446 Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 6 Aug 2024 16:54:59 -0400 Subject: [PATCH 12/13] deduplicate some code --- .../src/network/stake_table_transport.rs | 377 +++++++++--------- 1 file changed, 183 insertions(+), 194 deletions(-) diff --git a/crates/libp2p-networking/src/network/stake_table_transport.rs b/crates/libp2p-networking/src/network/stake_table_transport.rs index b6cad13cb4..1e2089f877 100644 --- a/crates/libp2p-networking/src/network/stake_table_transport.rs +++ b/crates/libp2p-networking/src/network/stake_table_transport.rs @@ -12,6 +12,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use tracing::warn; +use {std::io::Error as IoError, std::io::ErrorKind as IoErrorKind}; use futures::future::poll_fn; use futures::{AsyncReadExt, AsyncWriteExt}; @@ -36,7 +37,8 @@ const AUTH_HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_se /// by performing a handshake that checks if the remote peer is present in the /// stake table. #[pin_project] -pub struct StakeTableAuthentication { +pub struct StakeTableAuthentication +{ #[pin] /// The underlying transport we are wrapping pub inner: T, @@ -51,6 +53,10 @@ pub struct StakeTableAuthentication, } +/// A type alias for the future that upgrades a connection to perform the authentication handshake +type UpgradeFuture = + Pin::Output, ::Error>> + Send>>; + impl StakeTableAuthentication { /// Create a new `StakeTableAuthentication` transport that wraps the given transport /// and authenticates connections against the stake table. @@ -62,6 +68,158 @@ impl StakeTableAuthentica pd: std::marker::PhantomData, } } + + /// Prove to the remote peer that we are in the stake table by sending + /// them our authentication message. + /// + /// # Errors + /// - If we fail to write the message to the stream + pub async fn authenticate_with_remote_peer( + stream: &mut W, + auth_message: Arc>>, + ) -> AnyhowResult<()> { + // If we have an auth message, send it to the remote peer, prefixed with + // the message length + if let Some(auth_message) = auth_message.as_ref() { + // Write the length-delimited message + write_length_delimited(stream, auth_message).await?; + } + + Ok(()) + } + + /// Verify that the remote peer is: + /// - In the stake table + /// - Sending us a valid authentication message + /// - Sending us a valid signature + /// - Matching the peer ID we expect + /// + /// # Errors + /// If the peer fails verification. This can happen if: + /// - We fail to read the message from the stream + /// - The message is too large + /// - The message is invalid + /// - The peer is not in the stake table + /// - The signature is invalid + pub async fn verify_peer_authentication( + stream: &mut R, + stake_table: Arc>>, + required_peer_id: &PeerId, + ) -> AnyhowResult<()> { + // If we have a stake table, check if the remote peer is in it + if let Some(stake_table) = stake_table.as_ref() { + // Read the length-delimited message from the remote peer + let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; + + // Deserialize the authentication message + let auth_message: AuthMessage = bincode::deserialize(&message) + .with_context(|| "Failed to deserialize auth message")?; + + // Verify the signature on the public keys + let public_key = auth_message + .validate() + .with_context(|| "Failed to verify authentication message")?; + + // Deserialize the `PeerId` + let peer_id = PeerId::from_bytes(&auth_message.peer_id_bytes) + .with_context(|| "Failed to deserialize peer ID")?; + + // Verify that the peer ID is the same as the remote peer + if peer_id != *required_peer_id { + return Err(anyhow::anyhow!("Peer ID mismatch")); + } + + // Check if the public key is in the stake table + if !stake_table.contains(&public_key) { + return Err(anyhow::anyhow!("Peer not in stake table")); + } + } + + Ok(()) + } + + /// Wrap the supplied future in an upgrade that performs the authentication handshake. + /// + /// `outgoing` is a boolean that indicates if the connection is incoming or outgoing. + /// This is needed because the flow of the handshake is different for each. + fn gen_handshake> + Send + 'static>( + original_future: F, + outgoing: bool, + stake_table: Arc>>, + auth_message: Arc>>, + ) -> UpgradeFuture + where + T::Error: From<::Error> + From, + T::Output: AsOutput + Send, + + C::Substream: Unpin + Send, + { + // Create a new upgrade that performs the authentication handshake on top + Box::pin(async move { + // Wait for the original future to resolve + let mut stream = original_future.await?; + + // Time out the authentication block + async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { + // Open a substream for the handshake. + // The handshake order depends on whether the connection is incoming or outgoing. + let mut substream = if outgoing { + poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await? + } else { + poll_fn(|cx| stream.as_connection().poll_inbound_unpin(cx)).await? + }; + + if outgoing { + // If the connection is outgoing, authenticate with the remote peer first + Self::authenticate_with_remote_peer(&mut substream, auth_message) + .await + .map_err(|e| { + warn!("Failed to authenticate with remote peer: {:?}", e); + IoError::new(IoErrorKind::Other, e) + })?; + + // Verify the remote peer's authentication + Self::verify_peer_authentication( + &mut substream, + stake_table, + stream.as_peer_id(), + ) + .await + .map_err(|e| { + warn!("Failed to verify remote peer: {:?}", e); + IoError::new(IoErrorKind::Other, e) + })?; + } else { + // If it is incoming, verify the remote peer's authentication first + Self::verify_peer_authentication( + &mut substream, + stake_table, + stream.as_peer_id(), + ) + .await + .map_err(|e| { + warn!("Failed to verify remote peer: {:?}", e); + IoError::new(IoErrorKind::Other, e) + })?; + + // Authenticate with the remote peer + Self::authenticate_with_remote_peer(&mut substream, auth_message) + .await + .map_err(|e| { + warn!("Failed to authenticate with remote peer: {:?}", e); + IoError::new(IoErrorKind::Other, e) + })?; + } + + Ok(stream) + }) + .await + .map_err(|e| { + warn!("Timed out performing authentication handshake: {:?}", e); + IoError::new(IoErrorKind::TimedOut, e) + })? + }) + } } /// The deserialized form of an authentication message that is sent to the remote peer @@ -131,86 +289,13 @@ pub fn construct_auth_message( bincode::serialize(&auth_message).with_context(|| "Failed to serialize auth message") } -/// Prove to the remote peer that we are in the stake table by sending -/// them our authentication message. -/// -/// # Errors -/// - If we fail to write the message to the stream -pub async fn authenticate_with_remote_peer( - stream: &mut W, - auth_message: Arc>>, -) -> AnyhowResult<()> { - // If we have an auth message, send it to the remote peer, prefixed with - // the message length - if let Some(auth_message) = auth_message.as_ref() { - // Write the length-delimited message - write_length_delimited(stream, auth_message).await?; - } - - Ok(()) -} - -/// Verify that the remote peer is: -/// - In the stake table -/// - Sending us a valid authentication message -/// - Sending us a valid signature -/// - Matching the peer ID we expect -/// -/// # Errors -/// If the peer fails verification. This can happen if: -/// - We fail to read the message from the stream -/// - The message is too large -/// - The message is invalid -/// - The peer is not in the stake table -/// - The signature is invalid -pub async fn verify_peer_authentication< - R: AsyncReadExt + Unpin, - S: SignatureKey, - H: BuildHasher, ->( - stream: &mut R, - stake_table: Arc>>, - required_peer_id: &PeerId, -) -> AnyhowResult<()> { - // If we have a stake table, check if the remote peer is in it - if let Some(stake_table) = stake_table.as_ref() { - // Read the length-delimited message from the remote peer - let message = read_length_delimited(stream, MAX_AUTH_MESSAGE_SIZE).await?; - - // Deserialize the authentication message - let auth_message: AuthMessage = - bincode::deserialize(&message).with_context(|| "Failed to deserialize auth message")?; - - // Verify the signature on the public keys - let public_key = auth_message - .validate() - .with_context(|| "Failed to verify authentication message")?; - - // Deserialize the `PeerId` - let peer_id = PeerId::from_bytes(&auth_message.peer_id_bytes) - .with_context(|| "Failed to deserialize peer ID")?; - - // Verify that the peer ID is the same as the remote peer - if peer_id != *required_peer_id { - return Err(anyhow::anyhow!("Peer ID mismatch")); - } - - // Check if the public key is in the stake table - if !stake_table.contains(&public_key) { - return Err(anyhow::anyhow!("Peer not in stake table")); - } - } - - Ok(()) -} - impl Transport for StakeTableAuthentication where T::Dial: Future> + Send + 'static, T::ListenerUpgrade: Send + 'static, T::Output: AsOutput + Send, - T::Error: From<::Error> + From, + T::Error: From<::Error> + From, C::Substream: Unpin + Send, { @@ -237,42 +322,7 @@ where // If the dial was successful, perform the authentication handshake on top match res { - Ok(dial) => Ok(Box::pin(async move { - // Perform the inner dial - let mut stream = dial.await?; - - // Time out the authentication block - async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { - // Open a substream for the handshake - let mut substream = - poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await?; - - // (outbound) Authenticate with the remote peer - authenticate_with_remote_peer(&mut substream, auth_message) - .await - .map_err(|e| { - warn!("Failed to authenticate with remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - - // (inbound) Verify the remote peer's authentication - verify_peer_authentication(&mut substream, stake_table, stream.as_peer_id()) - .await - .map_err(|e| { - warn!("Failed to verify remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - - Ok::<(), T::Error>(()) - }) - .await - .map_err(|e| { - warn!("Timed out during authentication handshake: {:?}", e); - std::io::Error::new(std::io::ErrorKind::TimedOut, e) - })??; - - Ok(stream) - })), + Ok(dial) => Ok(Self::gen_handshake(dial, true, stake_table, auth_message)), Err(err) => Err(err), } } @@ -293,42 +343,7 @@ where // If the dial was successful, perform the authentication handshake on top match res { - Ok(dial) => Ok(Box::pin(async move { - // Perform the inner dial - let mut stream = dial.await?; - - // Time out the authentication block - async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { - // Open a substream for the handshake - let mut substream = - poll_fn(|cx| stream.as_connection().poll_outbound_unpin(cx)).await?; - - // (inbound) Verify the remote peer's authentication - verify_peer_authentication(&mut substream, stake_table, stream.as_peer_id()) - .await - .map_err(|e| { - warn!("Failed to verify remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - - // (outbound) Authenticate with the remote peer - authenticate_with_remote_peer(&mut substream, auth_message) - .await - .map_err(|e| { - warn!("Failed to authenticate with remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - - Ok::<(), T::Error>(()) - }) - .await - .map_err(|e| { - warn!("Timed out performing authentication handshake: {:?}", e); - std::io::Error::new(std::io::ErrorKind::TimedOut, e) - })??; - - Ok(stream) - })), + Ok(dial) => Ok(Self::gen_handshake(dial, false, stake_table, auth_message)), Err(err) => Err(err), } } @@ -354,47 +369,9 @@ where let auth_message = Arc::clone(&self.auth_message); let stake_table = Arc::clone(&self.stake_table); - // Create a new upgrade that performs the authentication handshake on top - let auth_upgrade = Box::pin(async move { - // Perform the inner upgrade - let mut stream = upgrade.await?; - - // Time out the authentication block - async_timeout(AUTH_HANDSHAKE_TIMEOUT, async { - // Open a substream for the handshake - let mut substream = - poll_fn(|cx| stream.as_connection().poll_inbound_unpin(cx)).await?; - - // (inbound) Verify the remote peer's authentication - verify_peer_authentication( - &mut substream, - stake_table, - stream.as_peer_id(), - ) - .await - .map_err(|e| { - warn!("Failed to verify remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - - // (outbound) Authenticate with the remote peer - authenticate_with_remote_peer(&mut substream, auth_message) - .await - .map_err(|e| { - warn!("Failed to authenticate with remote peer: {:?}", e); - std::io::Error::new(std::io::ErrorKind::Other, e) - })?; - - Ok::<(), T::Error>(()) - }) - .await - .map_err(|e| { - warn!("Timed out performing authentication handshake: {:?}", e); - std::io::Error::new(std::io::ErrorKind::TimedOut, e) - })??; - - Ok(stream) - }); + // Generate the handshake upgrade future (inbound) + let auth_upgrade = + Self::gen_handshake(upgrade, false, stake_table, auth_message); // Return the new event TransportEvent::Incoming { @@ -539,6 +516,7 @@ pub async fn write_length_delimited( #[cfg(test)] mod test { + use libp2p::{core::transport::dummy::DummyTransport, quic::Connection}; use rand::Rng; use std::{collections::HashSet, sync::Arc}; @@ -546,7 +524,10 @@ mod test { use super::write_length_delimited; use hotshot_types::{signature_key::BLSPubKey, traits::signature_key::SignatureKey}; - use super::verify_peer_authentication; + use super::StakeTableAuthentication; + + /// A mock type to help with readability + type MockStakeTableAuth = StakeTableAuthentication; // Helper macro for generating a new identity and authentication message macro_rules! new_identity { @@ -657,8 +638,12 @@ mod test { stake_table.insert(keypair.0); // Verify the authentication message - let result = - verify_peer_authentication(&mut stream, Arc::new(Some(stake_table)), &peer_id).await; + let result = MockStakeTableAuth::verify_peer_authentication( + &mut stream, + Arc::new(Some(stake_table)), + &peer_id, + ) + .await; assert!( result.is_ok(), @@ -679,8 +664,12 @@ mod test { let stake_table: HashSet = std::collections::HashSet::new(); // Verify the authentication message - let result = - verify_peer_authentication(&mut stream, Arc::new(Some(stake_table)), &peer_id).await; + let result = MockStakeTableAuth::verify_peer_authentication( + &mut stream, + Arc::new(Some(stake_table)), + &peer_id, + ) + .await; // Make sure it errored for the right reason assert!( @@ -709,7 +698,7 @@ mod test { stake_table.insert(keypair.0); // Check against the malicious peer ID - let result = verify_peer_authentication( + let result = MockStakeTableAuth::verify_peer_authentication( &mut stream, Arc::new(Some(stake_table)), &malicious_peer_id, From b23bb30044b18635b98aad1984ea6fdc877732dc Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 6 Aug 2024 16:57:33 -0400 Subject: [PATCH 13/13] rename file --- crates/hotshot/src/traits/networking/libp2p_network.rs | 2 +- crates/libp2p-networking/src/network/mod.rs | 4 ++-- .../network/{stake_table_transport.rs => transport.rs} | 10 +++------- 3 files changed, 6 insertions(+), 10 deletions(-) rename crates/libp2p-networking/src/network/{stake_table_transport.rs => transport.rs} (99%) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index d6ba27fff4..8eeff5084e 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -59,7 +59,7 @@ use libp2p_networking::{ network::{ behaviours::request_response::{Request, Response}, spawn_network_node, - stake_table_transport::construct_auth_message, + transport::construct_auth_message, MeshParams, NetworkEvent::{self, DirectRequest, DirectResponse, GossipMsg}, NetworkNodeConfig, NetworkNodeConfigBuilder, NetworkNodeHandle, NetworkNodeReceiver, diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 3d53f0dcf7..a48a7985b0 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -7,7 +7,7 @@ pub mod error; /// functionality of a libp2p network node mod node; /// Alternative Libp2p transport implementations -pub mod stake_table_transport; +pub mod transport; use std::{collections::HashSet, fmt::Debug, str::FromStr}; @@ -33,8 +33,8 @@ use quic::async_std::Transport as QuicTransport; #[cfg(async_executor_impl = "tokio")] use quic::tokio::Transport as QuicTransport; use serde::{Deserialize, Serialize}; -use stake_table_transport::StakeTableAuthentication; use tracing::instrument; +use transport::StakeTableAuthentication; use self::behaviours::request_response::{Request, Response}; pub use self::{ diff --git a/crates/libp2p-networking/src/network/stake_table_transport.rs b/crates/libp2p-networking/src/network/transport.rs similarity index 99% rename from crates/libp2p-networking/src/network/stake_table_transport.rs rename to crates/libp2p-networking/src/network/transport.rs index 1e2089f877..7c754235c1 100644 --- a/crates/libp2p-networking/src/network/stake_table_transport.rs +++ b/crates/libp2p-networking/src/network/transport.rs @@ -521,11 +521,9 @@ mod test { use std::{collections::HashSet, sync::Arc}; - use super::write_length_delimited; + use super::*; use hotshot_types::{signature_key::BLSPubKey, traits::signature_key::SignatureKey}; - use super::StakeTableAuthentication; - /// A mock type to help with readability type MockStakeTableAuth = StakeTableAuthentication; @@ -723,12 +721,10 @@ mod test { // Write the message to a buffer let mut buffer = Vec::new(); - super::write_length_delimited(&mut buffer, message) - .await - .unwrap(); + write_length_delimited(&mut buffer, message).await.unwrap(); // Read the message from the buffer - let read_message = super::read_length_delimited(&mut buffer.as_slice(), 1024) + let read_message = read_length_delimited(&mut buffer.as_slice(), 1024) .await .unwrap();