Skip to content

Commit

Permalink
Allow for custom [libp2p::Transport] implementations for NetworkWor…
Browse files Browse the repository at this point in the history
…ker. Every such implementation should provide authentication and muxing mechanisms. (paritytech#9)
  • Loading branch information
fixxxedpoint authored Jul 5, 2024
1 parent 7fcabea commit 8522cd0
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 43 deletions.
43 changes: 33 additions & 10 deletions substrate/client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
NotificationSenderReady as NotificationSenderReadyT,
},
},
transport,
transport::{self, build_transport, NetworkConfig},
types::ProtocolName,
ReputationChange,
};
Expand All @@ -69,7 +69,11 @@ use libp2p::{
AddressScore, ConnectionError, ConnectionId, ConnectionLimits, DialError, Executor,
ListenError, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, THandlerErr,
},
Multiaddr, PeerId,
Multiaddr, PeerId, TransportExt,
};
use libp2p::{
core::{muxing::StreamMuxerBox, StreamMuxer},
Transport,
};
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, MetricSources, Metrics};
Expand Down Expand Up @@ -144,12 +148,28 @@ where
B: BlockT + 'static,
H: ExHashT,
{
/// Creates the network service.
/// Creates the network service. It allows to provide a custom implementation
/// of the [`libp2p::Transport`] for its underlying network transport,
/// i.e. a transport that should at minimum provide authentication and multiplexing.
/// Default implementation can be aquired by calling [`crate::transport::build_transport`].
///
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(params: Params<B>) -> Result<Self, Error> {
pub fn new<SM, T>(
params: Params<B>,
transport_builder: impl FnOnce(NetworkConfig) -> T,
) -> Result<Self, Error>
where
T: Transport<Output = (PeerId, SM)> + Send + Unpin + 'static,
T::Dial: Send,
T::ListenerUpgrade: Send,
T::Error: Send + Sync,

SM: StreamMuxer + Send + 'static,
SM::Substream: Send,
SM::Error: Send + Sync,
{
let FullNetworkConfiguration {
notification_protocols,
request_response_protocols,
Expand Down Expand Up @@ -260,12 +280,15 @@ where
.saturating_add(10)
};

transport::build_transport(
local_identity.clone(),
config_mem,
network_config.yamux_window_size,
yamux_maximum_buffer_size,
)
transport_builder(NetworkConfig {
keypair: local_identity.clone(),
memory_only: config_mem,
muxer_window_size: network_config.yamux_window_size,
muxer_maximum_buffer_size: yamux_maximum_buffer_size,
})
.map(|(peer_id, stream_muxer), _| (peer_id, StreamMuxerBox::new(stream_muxer)))
.boxed()
.with_bandwidth_logging()
};

let (to_notifications, from_protocol_controllers) =
Expand Down
116 changes: 84 additions & 32 deletions substrate/client/network/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,43 @@
//! Transport that serves as a common ground for all connections.

use either::Either;
use futures::{AsyncRead, AsyncWrite};
use libp2p::{
core::{
muxing::StreamMuxerBox,
transport::{Boxed, OptionalTransport},
upgrade,
},
dns, identity, noise, tcp, websocket, PeerId, Transport, TransportExt,
core::{transport::OptionalTransport, upgrade, StreamMuxer},
dns, identity,
identity::Keypair,
noise, tcp, websocket, PeerId, Transport,
};
use std::{sync::Arc, time::Duration};
use std::time::Duration;

pub use libp2p::bandwidth::BandwidthSinks;

/// Builds the transport that serves as a common ground for all connections.
///
/// If `memory_only` is true, then only communication within the same process are allowed. Only
/// addresses with the format `/memory/...` are allowed.
///
/// `yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the
/// default (256kiB).
///
/// `yamux_maximum_buffer_size` is the maximum allowed size of the Yamux buffer. This should be
/// set either to the maximum of all the maximum allowed sizes of messages frames of all
/// high-level protocols combined, or to some generously high value if you are sure that a maximum
/// size is enforced on all high-level protocols.
///
/// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all
/// the connections spawned with this transport.
pub fn build_transport(
keypair: identity::Keypair,
/// Describes network configuration used for building instances of [`libp2p::Transport`].
pub struct NetworkConfig {
/// Our network identity.
pub keypair: Keypair,
/// Indicates whether created [`Transport`] should be only memory-based.
pub memory_only: bool,
/// Window size of the muxer.
pub muxer_window_size: Option<u32>,
/// Buffer size of the muxer.
pub muxer_maximum_buffer_size: usize,
}

/// Creates default base layer of network transport, i.e. a transport that allows connectivity for
/// `WS + WSS` (with `DNS`) or `TCP + WS` (when `DNS` is not available). It can be used as basis for
/// building a custom implementation of authenticated and mutliplexed [`libp2p::Transport`] that is
/// required by the [`NetworkWorker`].
pub fn build_basic_transport(
memory_only: bool,
yamux_window_size: Option<u32>,
yamux_maximum_buffer_size: usize,
) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>) {
) -> impl Transport<
Output = impl AsyncRead + AsyncWrite,
Dial = impl Send,
ListenerUpgrade = impl Send,
Error = impl Send,
> + Send {
// Build the base layer of the transport.
let transport = if !memory_only {
if !memory_only {
// Main transport: DNS(TCP)
let tcp_config = tcp::Config::new().nodelay(true);
let tcp_trans = tcp::tokio::Transport::new(tcp_config.clone());
Expand All @@ -78,8 +80,27 @@ pub fn build_transport(
})
} else {
Either::Right(OptionalTransport::some(libp2p::core::transport::MemoryTransport::default()))
};
}
}

/// Adds authentication and multiplexing to a given implementation of [`libp2p::Transport`].
/// It uses the `noise` protocol for authentication and the `yamux` library for connection multiplexing.
pub fn add_authentication_and_muxing(
keypair: identity::Keypair,
yamux_window_size: Option<u32>,
yamux_maximum_buffer_size: usize,
transport: impl Transport<
Output = impl AsyncRead + AsyncWrite + Send + Unpin + 'static,
Dial = impl Send,
ListenerUpgrade = impl Send,
Error = impl Send + 'static,
> + Send,
) -> impl Transport<
Output = (PeerId, impl StreamMuxer<Substream = impl Send, Error = impl Send> + Send),
Dial = impl Send,
ListenerUpgrade = impl Send,
Error = impl Send,
> + Send {
let authentication_config = noise::Config::new(&keypair).expect("Can create noise config. qed");
let multiplexing_config = {
let mut yamux_config = libp2p::yamux::Config::default();
Expand All @@ -95,12 +116,43 @@ pub fn build_transport(
yamux_config
};

let transport = transport
transport
.upgrade(upgrade::Version::V1Lazy)
.authenticate(authentication_config)
.multiplex(multiplexing_config)
.timeout(Duration::from_secs(20))
.boxed();
}

transport.with_bandwidth_logging()
/// Builds the transport that serves as a common ground for all connections.
///
/// If `memory_only` is true, then only communication within the same process are allowed. Only
/// addresses with the format `/memory/...` are allowed.
///
/// `yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the
/// default (256kiB).
///
/// `yamux_maximum_buffer_size` is the maximum allowed size of the Yamux buffer. This should be
/// set either to the maximum of all the maximum allowed sizes of messages frames of all
/// high-level protocols combined, or to some generously high value if you are sure that a maximum
/// size is enforced on all high-level protocols.
///
/// Returns a multiplexed and authenticated implementation of [`libp2p::Transport``].
pub fn build_transport(
keypair: identity::Keypair,
memory_only: bool,
yamux_window_size: Option<u32>,
yamux_maximum_buffer_size: usize,
) -> impl Transport<
Output = (PeerId, impl StreamMuxer<Substream = impl Send, Error = impl Send> + Send),
Dial = impl Send,
ListenerUpgrade = impl Send,
Error = impl Send,
> + Send {
let basic_transport = build_basic_transport(memory_only);
add_authentication_and_muxing(
keypair,
yamux_window_size,
yamux_maximum_buffer_size,
basic_transport,
)
}
9 changes: 8 additions & 1 deletion substrate/client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,14 @@ where
};

let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network_mut = sc_network::NetworkWorker::new(network_params, |config| {
sc_network::transport::build_transport(
config.keypair,
config.memory_only,
config.muxer_window_size,
config.muxer_maximum_buffer_size,
)
})?;
let network = network_mut.service().clone();

let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
Expand Down

0 comments on commit 8522cd0

Please sign in to comment.