diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 017538cae5f3..bf8a60fcefba 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -24,6 +24,7 @@ use futures::{channel::oneshot, FutureExt as _}; use polkadot_node_network_protocol::{ self as net_protocol, grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology}, + peer_set::MAX_NOTIFICATION_SIZE, v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned, View, }; use polkadot_node_primitives::approval::{ @@ -1381,14 +1382,7 @@ impl State { "Sending assignments to unified peer", ); - sender - .send_message(NetworkBridgeTxMessage::SendValidationMessage( - vec![peer_id], - Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( - protocol_v1::ApprovalDistributionMessage::Assignments(assignments_to_send), - )), - )) - .await; + send_assignments_batched(sender, assignments_to_send, peer_id).await; } if !approvals_to_send.is_empty() { @@ -1399,14 +1393,7 @@ impl State { "Sending approvals to unified peer", ); - sender - .send_message(NetworkBridgeTxMessage::SendValidationMessage( - vec![peer_id], - Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( - protocol_v1::ApprovalDistributionMessage::Approvals(approvals_to_send), - )), - )) - .await; + send_approvals_batched(sender, approvals_to_send, peer_id).await; } } @@ -1605,23 +1592,11 @@ async fn adjust_required_routing_and_propagate ApprovalDistribution { SpawnedSubsystem { name: "approval-distribution-subsystem", future } } } + +/// Ensures the batch size is always at least 1 element. +const fn ensure_size_not_zero(size: usize) -> usize { + if 0 == size { + panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)",); + } + + size +} + +/// The maximum amount of assignments per batch is 33% of maximum allowed by protocol. +/// This is an arbitrary value. Bumping this up increases the maximum amount of approvals or assignments +/// we send in a single message to peers. Exceeding `MAX_NOTIFICATION_SIZE` will violate the protocol +/// configuration. +pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero( + MAX_NOTIFICATION_SIZE as usize / + std::mem::size_of::<(IndirectAssignmentCert, CandidateIndex)>() / + 3, +); + +/// The maximum amount of approvals per batch is 33% of maximum allowed by protocol. +pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero( + MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::() / 3, +); + +/// Send assignments while honoring the `max_notification_size` of the protocol. +/// +/// Splitting the messages into multiple notifications allows more granular processing at the +/// destination, such that the subsystem doesn't get stuck for long processing a batch +/// of assignments and can `select!` other tasks. +pub(crate) async fn send_assignments_batched( + sender: &mut impl overseer::ApprovalDistributionSenderTrait, + assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>, + peer: PeerId, +) { + let mut batches = assignments.into_iter().peekable(); + + while batches.peek().is_some() { + let batch: Vec<_> = batches.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect(); + + sender + .send_message(NetworkBridgeTxMessage::SendValidationMessage( + vec![peer], + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(batch), + )), + )) + .await; + } +} + +/// Send approvals while honoring the `max_notification_size` of the protocol. +pub(crate) async fn send_approvals_batched( + sender: &mut impl overseer::ApprovalDistributionSenderTrait, + approvals: Vec, + peer: PeerId, +) { + let mut batches = approvals.into_iter().peekable(); + + while batches.peek().is_some() { + let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect(); + + sender + .send_message(NetworkBridgeTxMessage::SendValidationMessage( + vec![peer], + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(batch), + )), + )) + .await; + } +} diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index 73aa461752ba..567cf22f23f1 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -2276,3 +2276,147 @@ fn resends_messages_periodically() { virtual_overseer }); } + +fn batch_test_round(message_count: usize) { + use polkadot_node_subsystem::SubsystemContext; + let pool = sp_core::testing::TaskExecutor::new(); + let mut state = State::default(); + + let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + let subsystem = ApprovalDistribution::new(Default::default()); + let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345); + let mut sender = context.sender().clone(); + let subsystem = subsystem.run_inner(context, &mut state, &mut rng); + + let test_fut = async move { + let overseer = &mut virtual_overseer; + let validators = 0..message_count; + let assignments: Vec<_> = validators + .clone() + .map(|index| (fake_assignment_cert(Hash::zero(), ValidatorIndex(index as u32)), 0)) + .collect(); + + let approvals: Vec<_> = validators + .map(|index| IndirectSignedApprovalVote { + block_hash: Hash::zero(), + candidate_index: 0, + validator: ValidatorIndex(index as u32), + signature: dummy_signature(), + }) + .collect(); + + let peer = PeerId::random(); + send_assignments_batched(&mut sender, assignments.clone(), peer).await; + send_approvals_batched(&mut sender, approvals.clone(), peer).await; + + // Check expected assignments batches. + for assignment_index in (0..assignments.len()).step_by(super::MAX_ASSIGNMENT_BATCH_SIZE) { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + )) + )) => { + // Last batch should cover all remaining messages. + if sent_assignments.len() < super::MAX_ASSIGNMENT_BATCH_SIZE { + assert_eq!(sent_assignments.len() + assignment_index, assignments.len()); + } else { + assert_eq!(sent_assignments.len(), super::MAX_ASSIGNMENT_BATCH_SIZE); + } + + assert_eq!(peers.len(), 1); + + for (message_index, assignment) in sent_assignments.iter().enumerate() { + assert_eq!(assignment.0, assignments[assignment_index + message_index].0); + assert_eq!(assignment.1, 0); + } + } + ); + } + + // Check approval vote batching. + for approval_index in (0..approvals.len()).step_by(super::MAX_APPROVAL_BATCH_SIZE) { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + )) + )) => { + // Last batch should cover all remaining messages. + if sent_approvals.len() < super::MAX_APPROVAL_BATCH_SIZE { + assert_eq!(sent_approvals.len() + approval_index, approvals.len()); + } else { + assert_eq!(sent_approvals.len(), super::MAX_APPROVAL_BATCH_SIZE); + } + + assert_eq!(peers.len(), 1); + + for (message_index, approval) in sent_approvals.iter().enumerate() { + assert_eq!(approval, &approvals[approval_index + message_index]); + } + } + ); + } + virtual_overseer + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::join( + async move { + let mut overseer = test_fut.await; + overseer + .send(FromOrchestra::Signal(OverseerSignal::Conclude)) + .timeout(TIMEOUT) + .await + .expect("Conclude send timeout"); + }, + subsystem, + )); +} + +#[test] +fn batch_sending_1_msg() { + batch_test_round(1); +} + +#[test] +fn batch_sending_exactly_one_batch() { + batch_test_round(super::MAX_APPROVAL_BATCH_SIZE); + batch_test_round(super::MAX_ASSIGNMENT_BATCH_SIZE); +} + +#[test] +fn batch_sending_partial_batch() { + batch_test_round(super::MAX_APPROVAL_BATCH_SIZE * 2 + 4); + batch_test_round(super::MAX_ASSIGNMENT_BATCH_SIZE * 2 + 4); +} + +#[test] +fn batch_sending_multiple_same_len() { + batch_test_round(super::MAX_APPROVAL_BATCH_SIZE * 10); + batch_test_round(super::MAX_ASSIGNMENT_BATCH_SIZE * 10); +} + +#[test] +fn batch_sending_half_batch() { + batch_test_round(super::MAX_APPROVAL_BATCH_SIZE / 2); + batch_test_round(super::MAX_ASSIGNMENT_BATCH_SIZE / 2); +} + +#[test] +#[should_panic] +fn const_batch_size_panics_if_zero() { + crate::ensure_size_not_zero(0); +} + +#[test] +fn const_ensure_size_not_zero() { + crate::ensure_size_not_zero(super::MAX_ASSIGNMENT_BATCH_SIZE); + crate::ensure_size_not_zero(super::MAX_APPROVAL_BATCH_SIZE); +} diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs index d9d2925e594d..22eddc44c42f 100644 --- a/node/network/protocol/src/peer_set.rs +++ b/node/network/protocol/src/peer_set.rs @@ -36,7 +36,7 @@ const LEGACY_COLLATION_PROTOCOL_V1: &str = "/polkadot/collation/1"; const LEGACY_PROTOCOL_VERSION_V1: u32 = 1; /// Max notification size is currently constant. -const MAX_NOTIFICATION_SIZE: u64 = 100 * 1024; +pub const MAX_NOTIFICATION_SIZE: u64 = 100 * 1024; /// The peer-sets and thus the protocols which are used for the network. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)]