From 6b84431e6c1dcda4b989ec0ca6bb88ab97750cd8 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 13 Nov 2024 19:37:03 +0100 Subject: [PATCH] Add litep2p network protocol benches (#6455) # Description Add support to run networking protocol benchmarks with litep2p backend. Now we can compare the work of both libp2p and litep2p backends for notifications and request-response protocols. Next step: extract worker initialization from the benchmark loop. ### Example run on local machine image ## Integration Does not affect downstream projects. ## Review Notes https://github.com/paritytech/polkadot-sdk/blob/d4d9502538e8a940b809ecc77843af3cea101e19/substrate/client/network/src/litep2p/service.rs#L510-L520 This method should be implemented to run request benchmarks. --------- Co-authored-by: GitHub Action --- prdoc/pr_6455.prdoc | 8 ++ .../network/benches/notifications_protocol.rs | 114 ++++++++++++----- .../benches/request_response_protocol.rs | 115 +++++++++++------- 3 files changed, 164 insertions(+), 73 deletions(-) create mode 100644 prdoc/pr_6455.prdoc diff --git a/prdoc/pr_6455.prdoc b/prdoc/pr_6455.prdoc new file mode 100644 index 000000000000..9a83048e2fd2 --- /dev/null +++ b/prdoc/pr_6455.prdoc @@ -0,0 +1,8 @@ +title: Add litep2p network protocol benches +doc: +- audience: Node Dev + description: |- + Adds networking protocol benchmarks with litep2p backend +crates: +- name: sc-network + validate: false diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs index 7d32c9faeba1..c1e18c7b7f47 100644 --- a/substrate/client/network/benches/notifications_protocol.rs +++ b/substrate/client/network/benches/notifications_protocol.rs @@ -22,15 +22,17 @@ use criterion::{ }; use sc_network::{ config::{ - FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, - NonReservedPeerMode, NotificationHandshake, Params, ProtocolId, Role, SetConfig, + FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonReservedPeerMode, + NotificationHandshake, Params, ProtocolId, Role, SetConfig, }, service::traits::NotificationEvent, - NetworkWorker, NotificationMetrics, NotificationService, Roles, + Litep2pNetworkBackend, NetworkBackend, NetworkWorker, NotificationMetrics, NotificationService, + Roles, }; -use sc_network_common::sync::message::BlockAnnouncesHandshake; +use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT}; use sc_network_types::build_multiaddr; -use sp_runtime::traits::Zero; +use sp_core::H256; +use sp_runtime::traits::{Block as BlockT, Zero}; use std::{ net::{IpAddr, Ipv4Addr, TcpListener}, str::FromStr, @@ -61,12 +63,20 @@ fn get_listen_address() -> sc_network::Multiaddr { build_multiaddr!(Ip4(ip), Tcp(port)) } -pub fn create_network_worker( +fn create_network_worker( listen_addr: sc_network::Multiaddr, -) -> (NetworkWorker, Box) { +) -> (N, Box) +where + B: BlockT + 'static, + H: ExHashT, + N: NetworkBackend, +{ let role = Role::Full; + let mut net_conf = NetworkConfiguration::new_local(); + net_conf.listen_addresses = vec![listen_addr]; + let network_config = FullNetworkConfiguration::::new(&net_conf, None); let genesis_hash = runtime::Hash::zero(); - let (block_announce_config, notification_service) = NonDefaultSetConfig::new( + let (block_announce_config, notification_service) = N::notification_config( "/block-announces/1".into(), vec!["/bench-notifications-protocol/block-announces/1".into()], MAX_SIZE, @@ -82,21 +92,17 @@ pub fn create_network_worker( reserved_nodes: vec![], non_reserved_mode: NonReservedPeerMode::Accept, }, + NotificationMetrics::new(None), + network_config.peer_store_handle(), ); - let mut net_conf = NetworkConfiguration::new_local(); - net_conf.listen_addresses = vec![listen_addr]; - let worker = NetworkWorker::::new(Params::< - runtime::Block, - runtime::Hash, - NetworkWorker<_, _>, - > { + let worker = N::new(Params:: { block_announce_config, role, executor: Box::new(|f| { tokio::spawn(f); }), genesis_hash, - network_config: FullNetworkConfiguration::new(&net_conf, None), + network_config, protocol_id: ProtocolId::from("bench-protocol-name"), fork_id: None, metrics_registry: None, @@ -108,14 +114,21 @@ pub fn create_network_worker( (worker, notification_service) } -async fn run_serially(size: usize, limit: usize) { +async fn run_serially(size: usize, limit: usize) +where + B: BlockT + 'static, + H: ExHashT, + N: NetworkBackend, +{ let listen_address1 = get_listen_address(); let listen_address2 = get_listen_address(); - let (worker1, mut notification_service1) = create_network_worker(listen_address1); - let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone()); - let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into(); + let (worker1, mut notification_service1) = create_network_worker::(listen_address1); + let (worker2, mut notification_service2) = + create_network_worker::(listen_address2.clone()); + let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into(); worker1 + .network_service() .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 }) .unwrap(); @@ -124,6 +137,7 @@ async fn run_serially(size: usize, limit: usize) { let (tx, rx) = async_channel::bounded(10); let network1 = tokio::spawn(async move { + let mut sent_counter = 0; tokio::pin!(network1_run); loop { tokio::select! { @@ -131,17 +145,25 @@ async fn run_serially(size: usize, limit: usize) { event = notification_service1.next_event() => { match event { Some(NotificationEvent::NotificationStreamOpened { .. }) => { + sent_counter += 1; notification_service1 .send_async_notification(&peer_id2, vec![0; size]) .await .unwrap(); }, + Some(NotificationEvent::NotificationStreamClosed { .. }) => { + if sent_counter >= limit { + break; + } + panic!("Unexpected stream closure {:?}", event); + } event => panic!("Unexpected event {:?}", event), }; }, message = rx.recv() => { match message { Ok(Some(_)) => { + sent_counter += 1; notification_service1 .send_async_notification(&peer_id2, vec![0; size]) .await @@ -185,14 +207,21 @@ async fn run_serially(size: usize, limit: usize) { let _ = tokio::join!(network1, network2); } -async fn run_with_backpressure(size: usize, limit: usize) { +async fn run_with_backpressure(size: usize, limit: usize) +where + B: BlockT + 'static, + H: ExHashT, + N: NetworkBackend, +{ let listen_address1 = get_listen_address(); let listen_address2 = get_listen_address(); - let (worker1, mut notification_service1) = create_network_worker(listen_address1); - let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone()); - let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into(); + let (worker1, mut notification_service1) = create_network_worker::(listen_address1); + let (worker2, mut notification_service2) = + create_network_worker::(listen_address2.clone()); + let peer_id2: sc_network::PeerId = worker2.network_service().local_peer_id().into(); worker1 + .network_service() .add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 }) .unwrap(); @@ -265,18 +294,47 @@ fn run_benchmark(c: &mut Criterion) { for &(exponent, label) in EXPONENTS.iter() { let size = 2usize.pow(exponent); group.throughput(Throughput::Bytes(NOTIFICATIONS as u64 * size as u64)); + + group.bench_with_input( + BenchmarkId::new("libp2p/serially", label), + &(size, NOTIFICATIONS), + |b, &(size, limit)| { + b.to_async(&rt).iter(|| { + run_serially::>(size, limit) + }); + }, + ); + group.bench_with_input( + BenchmarkId::new("litep2p/serially", label), + &(size, NOTIFICATIONS), + |b, &(size, limit)| { + b.to_async(&rt).iter(|| { + run_serially::( + size, limit, + ) + }); + }, + ); group.bench_with_input( - BenchmarkId::new("consistently", label), + BenchmarkId::new("libp2p/with_backpressure", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(size, limit)); + b.to_async(&rt).iter(|| { + run_with_backpressure::>( + size, limit, + ) + }); }, ); group.bench_with_input( - BenchmarkId::new("with_backpressure", label), + BenchmarkId::new("litep2p/with_backpressure", label), &(size, NOTIFICATIONS), |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_with_backpressure(size, limit)); + b.to_async(&rt).iter(|| { + run_with_backpressure::( + size, limit, + ) + }); }, ); } diff --git a/substrate/client/network/benches/request_response_protocol.rs b/substrate/client/network/benches/request_response_protocol.rs index 09bf829f5a7e..b428d0d75ac5 100644 --- a/substrate/client/network/benches/request_response_protocol.rs +++ b/substrate/client/network/benches/request_response_protocol.rs @@ -22,16 +22,16 @@ use criterion::{ }; use sc_network::{ config::{ - FullNetworkConfiguration, IncomingRequest, NetworkConfiguration, NonDefaultSetConfig, - NonReservedPeerMode, NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role, - SetConfig, + FullNetworkConfiguration, IncomingRequest, NetworkConfiguration, NonReservedPeerMode, + NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role, SetConfig, }, - IfDisconnected, NetworkBackend, NetworkRequest, NetworkWorker, NotificationMetrics, - NotificationService, Roles, + IfDisconnected, Litep2pNetworkBackend, NetworkBackend, NetworkRequest, NetworkWorker, + NotificationMetrics, NotificationService, Roles, }; -use sc_network_common::sync::message::BlockAnnouncesHandshake; +use sc_network_common::{sync::message::BlockAnnouncesHandshake, ExHashT}; use sc_network_types::build_multiaddr; -use sp_runtime::traits::Zero; +use sp_core::H256; +use sp_runtime::traits::{Block as BlockT, Zero}; use std::{ net::{IpAddr, Ipv4Addr, TcpListener}, str::FromStr, @@ -62,36 +62,38 @@ fn get_listen_address() -> sc_network::Multiaddr { build_multiaddr!(Ip4(ip), Tcp(port)) } -pub fn create_network_worker( +pub fn create_network_worker( listen_addr: sc_network::Multiaddr, -) -> ( - NetworkWorker, - async_channel::Receiver, - Box, -) { +) -> (N, async_channel::Receiver, Box) +where + B: BlockT + 'static, + H: ExHashT, + N: NetworkBackend, +{ let (tx, rx) = async_channel::bounded(10); - let request_response_config = - NetworkWorker::::request_response_config( - "/request-response/1".into(), - vec![], - MAX_SIZE, - MAX_SIZE, - Duration::from_secs(2), - Some(tx), - ); + let request_response_config = N::request_response_config( + "/request-response/1".into(), + vec![], + MAX_SIZE, + MAX_SIZE, + Duration::from_secs(2), + Some(tx), + ); + let role = Role::Full; let mut net_conf = NetworkConfiguration::new_local(); net_conf.listen_addresses = vec![listen_addr]; let mut network_config = FullNetworkConfiguration::new(&net_conf, None); network_config.add_request_response_protocol(request_response_config); - let (block_announce_config, notification_service) = NonDefaultSetConfig::new( + let genesis_hash = runtime::Hash::zero(); + let (block_announce_config, notification_service) = N::notification_config( "/block-announces/1".into(), vec![], 1024, Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( Roles::from(&Role::Full), Zero::zero(), - runtime::Hash::zero(), - runtime::Hash::zero(), + genesis_hash, + genesis_hash, ))), SetConfig { in_peers: 1, @@ -99,14 +101,12 @@ pub fn create_network_worker( reserved_nodes: vec![], non_reserved_mode: NonReservedPeerMode::Accept, }, + NotificationMetrics::new(None), + network_config.peer_store_handle(), ); - let worker = NetworkWorker::::new(Params::< - runtime::Block, - runtime::Hash, - NetworkWorker<_, _>, - > { + let worker = N::new(Params:: { block_announce_config, - role: Role::Full, + role, executor: Box::new(|f| { tokio::spawn(f); }), @@ -123,15 +123,21 @@ pub fn create_network_worker( (worker, rx, notification_service) } -async fn run_serially(size: usize, limit: usize) { +async fn run_serially(size: usize, limit: usize) +where + B: BlockT + 'static, + H: ExHashT, + N: NetworkBackend, +{ let listen_address1 = get_listen_address(); let listen_address2 = get_listen_address(); - let (mut worker1, _rx1, _notification_service1) = create_network_worker(listen_address1); - let service1 = worker1.service().clone(); - let (worker2, rx2, _notification_service2) = create_network_worker(listen_address2.clone()); - let peer_id2 = *worker2.local_peer_id(); + let (worker1, _rx1, _notification_service1) = create_network_worker::(listen_address1); + let service1 = worker1.network_service().clone(); + let (worker2, rx2, _notification_service2) = + create_network_worker::(listen_address2.clone()); + let peer_id2 = worker2.network_service().local_peer_id(); - worker1.add_known_address(peer_id2, listen_address2.into()); + worker1.network_service().add_known_address(peer_id2, listen_address2.into()); let network1_run = worker1.run(); let network2_run = worker2.run(); @@ -188,15 +194,21 @@ async fn run_serially(size: usize, limit: usize) { // The libp2p request-response implementation does not provide any backpressure feedback. // So this benchmark is useless until we implement it for litep2p. #[allow(dead_code)] -async fn run_with_backpressure(size: usize, limit: usize) { +async fn run_with_backpressure(size: usize, limit: usize) +where + B: BlockT + 'static, + H: ExHashT, + N: NetworkBackend, +{ let listen_address1 = get_listen_address(); let listen_address2 = get_listen_address(); - let (mut worker1, _rx1, _notification_service1) = create_network_worker(listen_address1); - let service1 = worker1.service().clone(); - let (worker2, rx2, _notification_service2) = create_network_worker(listen_address2.clone()); - let peer_id2 = *worker2.local_peer_id(); + let (worker1, _rx1, _notification_service1) = create_network_worker::(listen_address1); + let service1 = worker1.network_service().clone(); + let (worker2, rx2, _notification_service2) = + create_network_worker::(listen_address2.clone()); + let peer_id2 = worker2.network_service().local_peer_id(); - worker1.add_known_address(peer_id2, listen_address2.into()); + worker1.network_service().add_known_address(peer_id2, listen_address2.into()); let network1_run = worker1.run(); let network2_run = worker2.run(); @@ -261,10 +273,23 @@ fn run_benchmark(c: &mut Criterion) { let size = 2usize.pow(exponent); group.throughput(Throughput::Bytes(REQUESTS as u64 * size as u64)); group.bench_with_input( - BenchmarkId::new("consistently", label), + BenchmarkId::new("libp2p/serially", label), + &(size, REQUESTS), + |b, &(size, limit)| { + b.to_async(&rt).iter(|| { + run_serially::>(size, limit) + }); + }, + ); + group.bench_with_input( + BenchmarkId::new("litep2p/serially", label), &(size, REQUESTS), |b, &(size, limit)| { - b.to_async(&rt).iter(|| run_serially(size, limit)); + b.to_async(&rt).iter(|| { + run_serially::( + size, limit, + ) + }); }, ); }