From 0398050920f818d4b7c2b47cf751b37bb498560f Mon Sep 17 00:00:00 2001 From: asynchronous rob Date: Wed, 12 Oct 2022 18:30:12 -0500 Subject: [PATCH] refactor grid topology to expose more info to subsystems (#6140) * refactor grid topology to expose more info to subsystems * fix grid_topology test * fix overseer test * Update node/network/protocol/src/grid_topology.rs Co-authored-by: Vsevolod Stakhov * Update node/network/protocol/src/grid_topology.rs Co-authored-by: Andronik * Update node/network/protocol/src/grid_topology.rs Co-authored-by: Andronik * fix bug in populating topology * fmt Co-authored-by: Vsevolod Stakhov Co-authored-by: Andronik --- Cargo.lock | 1 + node/network/approval-distribution/src/lib.rs | 43 ++- .../approval-distribution/src/tests.rs | 95 +++++-- node/network/bitfield-distribution/Cargo.toml | 1 + node/network/bitfield-distribution/src/lib.rs | 33 ++- .../bitfield-distribution/src/tests.rs | 75 +++--- node/network/bridge/src/rx/mod.rs | 71 +++-- node/network/gossip-support/src/lib.rs | 78 ++---- node/network/gossip-support/src/tests.rs | 63 +++-- node/network/protocol/src/grid_topology.rs | 248 ++++++++++++++++-- .../network/statement-distribution/src/lib.rs | 34 ++- .../statement-distribution/src/tests.rs | 106 +++++--- node/overseer/src/tests.rs | 5 +- node/subsystem-types/src/messages.rs | 19 +- .../src/messages/network_bridge_event.rs | 39 +-- .../implementers-guide/src/types/network.md | 20 +- .../src/types/overseer-protocol.md | 17 +- 17 files changed, 617 insertions(+), 331 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26a5fc452729..278310281f50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6134,6 +6134,7 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.1", "sp-application-crypto", + "sp-authority-discovery", "sp-core", "sp-keyring", "sp-keystore", diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index f0cb4fc24ff8..5afae66ae818 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -343,9 +343,13 @@ impl State { }) }, NetworkBridgeEvent::NewGossipTopology(topology) => { - let session = topology.session; - self.handle_new_session_topology(ctx, session, SessionGridTopology::from(topology)) - .await; + self.handle_new_session_topology( + ctx, + topology.session, + topology.topology, + topology.local_index, + ) + .await; }, NetworkBridgeEvent::PeerViewChange(peer_id, view) => { self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await; @@ -500,8 +504,14 @@ impl State { ctx: &mut Context, session: SessionIndex, topology: SessionGridTopology, + local_index: Option, ) { - self.topologies.insert_topology(session, topology); + if local_index.is_none() { + // this subsystem only matters to validators. + return + } + + self.topologies.insert_topology(session, topology, local_index); let topology = self.topologies.get_topology(session).expect("just inserted above; qed"); adjust_required_routing_and_propagate( @@ -511,7 +521,9 @@ impl State { |block_entry| block_entry.session == session, |required_routing, local, validator_index| { if *required_routing == RequiredRouting::PendingTopology { - *required_routing = topology.required_routing_by_index(*validator_index, local); + *required_routing = topology + .local_grid_neighbors() + .required_routing_by_index(*validator_index, local); } }, ) @@ -861,7 +873,7 @@ impl State { let local = source == MessageSource::Local; let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| { - t.required_routing_by_index(validator_index, local) + t.local_grid_neighbors().required_routing_by_index(validator_index, local) }); let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) { @@ -902,7 +914,10 @@ impl State { return false } - if let Some(true) = topology.as_ref().map(|t| t.route_to_peer(required_routing, peer)) { + if let Some(true) = topology + .as_ref() + .map(|t| t.local_grid_neighbors().route_to_peer(required_routing, peer)) + { return true } @@ -1169,7 +1184,8 @@ impl State { // the assignment to all aware peers in the required routing _except_ the original // source of the assignment. Hence the `in_topology_check`. // 3. Any randomly selected peers have been sent the assignment already. - let in_topology = topology.map_or(false, |t| t.route_to_peer(required_routing, peer)); + let in_topology = topology + .map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer)); in_topology || knowledge.sent.contains(message_subject, MessageKind::Assignment) }; @@ -1301,9 +1317,9 @@ impl State { let required_routing = message_state.required_routing; let rng = &mut *rng; let mut peer_filter = move |peer_id| { - let in_topology = topology - .as_ref() - .map_or(false, |t| t.route_to_peer(required_routing, peer_id)); + let in_topology = topology.as_ref().map_or(false, |t| { + t.local_grid_neighbors().route_to_peer(required_routing, peer_id) + }); in_topology || { let route_random = random_routing.sample(total_peers, rng); if route_random { @@ -1564,7 +1580,10 @@ async fn adjust_required_routing_and_propagate network_bridge_event::NewGossipTopology { - let mut t = network_bridge_event::NewGossipTopology { - session, - our_neighbors_x: HashMap::new(), - our_neighbors_y: HashMap::new(), + // This builds a grid topology which is a square matrix. + // The local validator occupies the top left-hand corner. + // The X peers occupy the same row and the Y peers occupy + // the same column. + + let local_index = 1; + + assert_eq!( + neighbors_x.len(), + neighbors_y.len(), + "mocking grid topology only implemented for squares", + ); + + let d = neighbors_x.len() + 1; + + let grid_size = d * d; + assert!(grid_size > 0); + assert!(all_peers.len() >= grid_size); + + let peer_info = |i: usize| TopologyPeerInfo { + peer_ids: vec![all_peers[i].0.clone()], + validator_index: ValidatorIndex::from(i as u32), + discovery_id: all_peers[i].1.clone(), }; - for &i in neighbors_x { - t.our_neighbors_x.insert( - all_peers[i].1.clone(), - network_bridge_event::TopologyPeerInfo { - peer_ids: vec![all_peers[i].0.clone()], - validator_index: ValidatorIndex::from(i as u32), - }, - ); + let mut canonical_shuffling: Vec<_> = (0..) + .filter(|i| local_index != *i) + .filter(|i| !neighbors_x.contains(i)) + .filter(|i| !neighbors_y.contains(i)) + .take(grid_size) + .map(peer_info) + .collect(); + + // filled with junk except for own. + let mut shuffled_indices = vec![d + 1; grid_size]; + shuffled_indices[local_index] = 0; + canonical_shuffling[0] = peer_info(local_index); + + for (x_pos, v) in neighbors_x.iter().enumerate() { + let pos = 1 + x_pos; + canonical_shuffling[pos] = peer_info(*v); } - for &i in neighbors_y { - t.our_neighbors_y.insert( - all_peers[i].1.clone(), - network_bridge_event::TopologyPeerInfo { - peer_ids: vec![all_peers[i].0.clone()], - validator_index: ValidatorIndex::from(i as u32), - }, - ); + for (y_pos, v) in neighbors_y.iter().enumerate() { + let pos = d * (1 + y_pos); + canonical_shuffling[pos] = peer_info(*v); + } + + let topology = SessionGridTopology::new(shuffled_indices, canonical_shuffling); + + // sanity check. + { + let g_n = topology + .compute_grid_neighbors_for(ValidatorIndex(local_index as _)) + .expect("topology just constructed with this validator index"); + + assert_eq!(g_n.validator_indices_x.len(), neighbors_x.len()); + assert_eq!(g_n.validator_indices_y.len(), neighbors_y.len()); + + for i in neighbors_x { + assert!(g_n.validator_indices_x.contains(&ValidatorIndex(*i as _))); + } + + for i in neighbors_y { + assert!(g_n.validator_indices_y.contains(&ValidatorIndex(*i as _))); + } } - t + network_bridge_event::NewGossipTopology { + session, + topology, + local_index: Some(ValidatorIndex(local_index as _)), + } } async fn setup_gossip_topology( diff --git a/node/network/bitfield-distribution/Cargo.toml b/node/network/bitfield-distribution/Cargo.toml index e3a4fd3d2095..45df93f00e29 100644 --- a/node/network/bitfield-distribution/Cargo.toml +++ b/node/network/bitfield-distribution/Cargo.toml @@ -18,6 +18,7 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } maplit = "1.0.2" diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index a0f82dc5ed1d..1b2167484b49 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -27,7 +27,7 @@ use futures::{channel::oneshot, FutureExt}; use polkadot_node_network_protocol::{ self as net_protocol, grid_topology::{ - RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology, + GridNeighbors, RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, }, v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View, }; @@ -327,7 +327,7 @@ async fn handle_bitfield_distribution( }; let msg = BitfieldGossipMessage { relay_parent, signed_availability }; - let topology = state.topologies.get_topology_or_fallback(session_idx); + let topology = state.topologies.get_topology_or_fallback(session_idx).local_grid_neighbors(); let required_routing = topology.required_routing_by_index(validator_index, true); relay_message( @@ -352,7 +352,7 @@ async fn handle_bitfield_distribution( async fn relay_message( ctx: &mut Context, job_data: &mut PerRelayParentData, - topology: &SessionGridTopology, + topology_neighbors: &GridNeighbors, peer_views: &mut HashMap, validator: ValidatorId, message: BitfieldGossipMessage, @@ -384,7 +384,7 @@ async fn relay_message( let message_needed = job_data.message_from_validator_needed_by_peer(&peer, &validator); if message_needed { - let in_topology = topology.route_to_peer(required_routing, &peer); + let in_topology = topology_neighbors.route_to_peer(required_routing, &peer); let need_routing = in_topology || { let route_random = random_routing.sample(total_peers, rng); if route_random { @@ -533,7 +533,8 @@ async fn process_incoming_peer_message( let topology = state .topologies - .get_topology_or_fallback(job_data.signing_context.session_index); + .get_topology_or_fallback(job_data.signing_context.session_index) + .local_grid_neighbors(); let required_routing = topology.required_routing_by_index(validator_index, false); metrics.on_bitfield_received(); @@ -579,14 +580,24 @@ async fn handle_network_msg( }, NetworkBridgeEvent::NewGossipTopology(gossip_topology) => { let session_index = gossip_topology.session; - let new_topology = SessionGridTopology::from(gossip_topology); - let newly_added = new_topology.peers_diff(&new_topology); - state.topologies.update_topology(session_index, new_topology); + let new_topology = gossip_topology.topology; + let prev_neighbors = + state.topologies.get_current_topology().local_grid_neighbors().clone(); + + state.topologies.update_topology( + session_index, + new_topology, + gossip_topology.local_index, + ); + let current_topology = state.topologies.get_current_topology(); + + let newly_added = current_topology.local_grid_neighbors().peers_diff(&prev_neighbors); + gum::debug!( target: LOG_TARGET, ?session_index, - "New gossip topology received {} unseen peers", - newly_added.len() + newly_added_peers = ?newly_added.len(), + "New gossip topology received", ); for new_peer in newly_added { @@ -651,7 +662,7 @@ async fn handle_peer_view_change( .cloned() .collect::>(); - let topology = state.topologies.get_current_topology(); + let topology = state.topologies.get_current_topology().local_grid_neighbors(); let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin); let lucky = is_gossip_peer || util::gen_ratio_rng( diff --git a/node/network/bitfield-distribution/src/tests.rs b/node/network/bitfield-distribution/src/tests.rs index f3894d61c5f9..5eb610fe8508 100644 --- a/node/network/bitfield-distribution/src/tests.rs +++ b/node/network/bitfield-distribution/src/tests.rs @@ -20,8 +20,10 @@ use bitvec::bitvec; use futures::executor; use maplit::hashmap; use polkadot_node_network_protocol::{ - grid_topology::SessionBoundGridTopologyStorage, our_view, peer_set::ValidationVersion, view, - ObservedRole, + grid_topology::{SessionBoundGridTopologyStorage, SessionGridTopology, TopologyPeerInfo}, + our_view, + peer_set::ValidationVersion, + view, ObservedRole, }; use polkadot_node_subsystem::{ jaeger, @@ -32,6 +34,7 @@ use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::v2::{AvailabilityBitfield, Signed, ValidatorIndex}; use rand_chacha::ChaCha12Rng; use sp_application_crypto::AppKey; +use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; use sp_core::Pair as PairT; use sp_keyring::Sr25519Keyring; use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr}; @@ -61,10 +64,11 @@ fn prewarmed_state( peers: Vec, ) -> ProtocolState { let relay_parent = known_message.relay_parent.clone(); - let mut topology: SessionGridTopology = Default::default(); - topology.peers_x = peers.iter().cloned().collect(); let mut topologies = SessionBoundGridTopologyStorage::default(); - topologies.update_topology(0_u32, topology); + topologies.update_topology(0_u32, SessionGridTopology::new(Vec::new(), Vec::new()), None); + topologies.get_current_topology_mut().local_grid_neighbors_mut().peers_x = + peers.iter().cloned().collect(); + ProtocolState { per_relay_parent: hashmap! { relay_parent.clone() => @@ -456,10 +460,9 @@ fn do_not_relay_message_twice() { let mut rng = dummy_rng(); executor::block_on(async move { - let gossip_peers = SessionGridTopology { - peers_x: HashSet::from_iter(vec![peer_a.clone(), peer_b.clone()].into_iter()), - ..Default::default() - }; + let mut gossip_peers = GridNeighbors::empty(); + gossip_peers.peers_x = HashSet::from_iter(vec![peer_a.clone(), peer_b.clone()].into_iter()); + relay_message( &mut ctx, state.per_relay_parent.get_mut(&hash).unwrap(), @@ -780,33 +783,43 @@ fn topology_test() { .try_init(); let hash: Hash = [0; 32].into(); - let peers_x = (0..25).map(|_| PeerId::random()).collect::>(); - let peers_y = (0..25).map(|_| PeerId::random()).collect::>(); - - // ensure all unique - assert_eq!( - peers_x.iter().chain(peers_y.iter()).collect::>().len(), - peers_x.len() + peers_y.len() - ); // validator 0 key pair let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash); - // Create a simple grid - let mut topology: SessionGridTopology = Default::default(); - topology.peers_x = peers_x.iter().cloned().collect::>(); - topology.validator_indices_x = peers_x + // Create a simple grid without any shuffling. We occupy position 1. + let topology_peer_info: Vec<_> = (0..49) + .map(|i| TopologyPeerInfo { + peer_ids: vec![PeerId::random()], + validator_index: ValidatorIndex(i as _), + discovery_id: AuthorityDiscoveryPair::generate().0.public(), + }) + .collect(); + + let topology = SessionGridTopology::new((0usize..49).collect(), topology_peer_info.clone()); + state.topologies.update_topology(0_u32, topology, Some(ValidatorIndex(1))); + + let peers_x: Vec<_> = [0, 2, 3, 4, 5, 6] .iter() - .enumerate() - .map(|(idx, _)| ValidatorIndex(idx as u32)) - .collect::>(); - topology.peers_y = peers_y.iter().cloned().collect::>(); - topology.validator_indices_y = peers_y + .cloned() + .map(|i| topology_peer_info[i].peer_ids[0].clone()) + .collect(); + + let peers_y: Vec<_> = [8, 15, 22, 29, 36, 43] .iter() - .enumerate() - .map(|(idx, _)| ValidatorIndex((idx + peers_x.len()) as u32)) - .collect::>(); - state.topologies.update_topology(0_u32, topology); + .cloned() + .map(|i| topology_peer_info[i].peer_ids[0].clone()) + .collect(); + + { + let t = state.topologies.get_current_topology().local_grid_neighbors(); + for p_x in &peers_x { + assert!(t.peers_x.contains(p_x)); + } + for p_y in &peers_y { + assert!(t.peers_y.contains(p_y)); + } + } // create a signed message by validator 0 let payload = AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]); @@ -860,7 +873,7 @@ fn topology_test() { AllMessages::NetworkBridgeTx( NetworkBridgeTxMessage::SendValidationMessage(peers, send_msg), ) => { - let topology = state.topologies.get_current_topology(); + let topology = state.topologies.get_current_topology().local_grid_neighbors(); // It should send message to all peers in y direction and to 4 random peers in x direction assert_eq!(peers_y.len() + 4, peers.len()); assert!(topology.peers_y.iter().all(|peer| peers.contains(&peer))); diff --git a/node/network/bridge/src/rx/mod.rs b/node/network/bridge/src/rx/mod.rs index b93024b43dfb..a08596cd15ac 100644 --- a/node/network/bridge/src/rx/mod.rs +++ b/node/network/bridge/src/rx/mod.rs @@ -27,6 +27,7 @@ use sp_consensus::SyncOracle; use polkadot_node_network_protocol::{ self as net_protocol, + grid_topology::{SessionGridTopology, TopologyPeerInfo}, peer_set::{ CollationVersion, PeerSet, PeerSetProtocolNames, PerPeerSet, ProtocolVersion, ValidationVersion, @@ -37,10 +38,9 @@ use polkadot_node_network_protocol::{ use polkadot_node_subsystem::{ errors::SubsystemError, messages::{ - network_bridge_event::{NewGossipTopology, TopologyPeerInfo}, - ApprovalDistributionMessage, BitfieldDistributionMessage, CollatorProtocolMessage, - GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, - StatementDistributionMessage, + network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, + BitfieldDistributionMessage, CollatorProtocolMessage, GossipSupportMessage, + NetworkBridgeEvent, NetworkBridgeRxMessage, StatementDistributionMessage, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, }; @@ -129,28 +129,6 @@ where } } -async fn update_gossip_peers_1d( - ads: &mut AD, - neighbors: N, -) -> HashMap -where - AD: validator_discovery::AuthorityDiscovery, - N: IntoIterator, - N::IntoIter: std::iter::ExactSizeIterator, -{ - let neighbors = neighbors.into_iter(); - let mut peers = HashMap::with_capacity(neighbors.len()); - for (authority, validator_index) in neighbors { - let addr = get_peer_id_by_authority_id(ads, authority.clone()).await; - - if let Some(peer_id) = addr { - peers.insert(authority, TopologyPeerInfo { peer_ids: vec![peer_id], validator_index }); - } - } - - peers -} - async fn handle_network_messages( mut sender: impl overseer::NetworkBridgeRxSenderTrait, mut network_service: impl Network, @@ -507,6 +485,26 @@ where } } +async fn flesh_out_topology_peers(ads: &mut AD, neighbors: N) -> Vec +where + AD: validator_discovery::AuthorityDiscovery, + N: IntoIterator, + N::IntoIter: std::iter::ExactSizeIterator, +{ + let neighbors = neighbors.into_iter(); + let mut peers = Vec::with_capacity(neighbors.len()); + for (discovery_id, validator_index) in neighbors { + let addr = get_peer_id_by_authority_id(ads, discovery_id.clone()).await; + peers.push(TopologyPeerInfo { + peer_ids: addr.into_iter().collect(), + validator_index, + discovery_id, + }); + } + + peers +} + #[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] async fn run_incoming_orchestra_signals( mut ctx: Context, @@ -532,29 +530,28 @@ where msg: NetworkBridgeRxMessage::NewGossipTopology { session, - our_neighbors_x, - our_neighbors_y, + local_index, + canonical_shuffling, + shuffled_indices, }, } => { gum::debug!( target: LOG_TARGET, action = "NewGossipTopology", - neighbors_x = our_neighbors_x.len(), - neighbors_y = our_neighbors_y.len(), + ?session, + ?local_index, "Gossip topology has changed", ); - let gossip_peers_x = - update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_x).await; - - let gossip_peers_y = - update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_y).await; + let topology_peers = + flesh_out_topology_peers(&mut authority_discovery_service, canonical_shuffling) + .await; dispatch_validation_event_to_all_unbounded( NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { session, - our_neighbors_x: gossip_peers_x, - our_neighbors_y: gossip_peers_y, + topology: SessionGridTopology::new(shuffled_indices, topology_peers), + local_index, }), ctx.sender(), ); diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index 823835aa7638..36459f9c8dab 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -525,73 +525,37 @@ async fn update_gossip_topology( sp_core::blake2_256(&subject) }; - // shuffle the indices - let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed); - let len = authorities.len(); - let mut indices: Vec = (0..len).collect(); - indices.shuffle(&mut rng); - let our_shuffled_position = indices - .iter() - .position(|i| *i == our_index) - .expect("our_index < len; indices contains it; qed"); - - let neighbors = matrix_neighbors(our_shuffled_position, len); - let row_neighbors = neighbors - .row_neighbors - .map(|i| indices[i]) - .map(|i| (authorities[i].clone(), ValidatorIndex::from(i as u32))) - .collect(); - - let column_neighbors = neighbors - .column_neighbors - .map(|i| indices[i]) - .map(|i| (authorities[i].clone(), ValidatorIndex::from(i as u32))) - .collect(); + // shuffle the validators and create the index mapping + let (shuffled_indices, canonical_shuffling) = { + let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed); + let len = authorities.len(); + let mut shuffled_indices = vec![0; len]; + let mut canonical_shuffling: Vec<_> = authorities + .iter() + .enumerate() + .map(|(i, a)| (a.clone(), ValidatorIndex(i as _))) + .collect(); + + canonical_shuffling.shuffle(&mut rng); + for (i, (_, validator_index)) in canonical_shuffling.iter().enumerate() { + shuffled_indices[validator_index.0 as usize] = i; + } + + (shuffled_indices, canonical_shuffling) + }; sender .send_message(NetworkBridgeRxMessage::NewGossipTopology { session: session_index, - our_neighbors_x: row_neighbors, - our_neighbors_y: column_neighbors, + local_index: Some(ValidatorIndex(our_index as _)), + canonical_shuffling, + shuffled_indices, }) .await; Ok(()) } -struct MatrixNeighbors { - row_neighbors: R, - column_neighbors: C, -} - -/// Compute our row and column neighbors in a matrix -fn matrix_neighbors( - our_index: usize, - len: usize, -) -> MatrixNeighbors, impl Iterator> { - assert!(our_index < len, "our_index is computed using `enumerate`; qed"); - - // e.g. for size 11 the matrix would be - // - // 0 1 2 - // 3 4 5 - // 6 7 8 - // 9 10 - // - // and for index 10, the neighbors would be 1, 4, 7, 9 - - let sqrt = (len as f64).sqrt() as usize; - let our_row = our_index / sqrt; - let our_column = our_index % sqrt; - let row_neighbors = our_row * sqrt..std::cmp::min(our_row * sqrt + sqrt, len); - let column_neighbors = (our_column..len).step_by(sqrt); - - MatrixNeighbors { - row_neighbors: row_neighbors.filter(move |i| *i != our_index), - column_neighbors: column_neighbors.filter(move |i| *i != our_index), - } -} - #[overseer::subsystem(GossipSupport, error = SubsystemError, prefix = self::overseer)] impl GossipSupport where diff --git a/node/network/gossip-support/src/tests.rs b/node/network/gossip-support/src/tests.rs index cde47e2ba977..79f2a9a6db42 100644 --- a/node/network/gossip-support/src/tests.rs +++ b/node/network/gossip-support/src/tests.rs @@ -29,6 +29,7 @@ use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch use sp_core::crypto::Pair as PairT; use sp_keyring::Sr25519Keyring; +use polkadot_node_network_protocol::grid_topology::{SessionGridTopology, TopologyPeerInfo}; use polkadot_node_subsystem::{ jaeger, messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, @@ -73,13 +74,15 @@ lazy_static! { // [1 3] // [0 ] - static ref ROW_NEIGHBORS: Vec<(AuthorityDiscoveryId, ValidatorIndex)> = vec![ - (Sr25519Keyring::Charlie.public().into(), ValidatorIndex::from(2)), + static ref EXPECTED_SHUFFLING: Vec = vec![6, 4, 0, 5, 2, 3, 1]; + + static ref ROW_NEIGHBORS: Vec = vec![ + ValidatorIndex::from(2), ]; - static ref COLUMN_NEIGHBORS: Vec<(AuthorityDiscoveryId, ValidatorIndex)> = vec![ - (Sr25519Keyring::Two.public().into(), ValidatorIndex::from(5)), - (Sr25519Keyring::Eve.public().into(), ValidatorIndex::from(3)), + static ref COLUMN_NEIGHBORS: Vec = vec![ + ValidatorIndex::from(3), + ValidatorIndex::from(5), ]; } @@ -257,12 +260,31 @@ async fn test_neighbors(overseer: &mut VirtualOverseer, expected_session: Sessio overseer_recv(overseer).await, AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology { session: got_session, - our_neighbors_x, - our_neighbors_y, + local_index, + canonical_shuffling, + shuffled_indices, }) => { assert_eq!(expected_session, got_session); - let mut got_row: Vec<_> = our_neighbors_x.into_iter().collect(); - let mut got_column: Vec<_> = our_neighbors_y.into_iter().collect(); + assert_eq!(local_index, Some(ValidatorIndex(6))); + assert_eq!(shuffled_indices, EXPECTED_SHUFFLING.clone()); + + let grid_topology = SessionGridTopology::new( + shuffled_indices, + canonical_shuffling.into_iter() + .map(|(a, v)| TopologyPeerInfo { + validator_index: v, + discovery_id: a, + peer_ids: Vec::new(), + }) + .collect(), + ); + + let grid_neighbors = grid_topology + .compute_grid_neighbors_for(local_index.unwrap()) + .unwrap(); + + let mut got_row: Vec<_> = grid_neighbors.validator_indices_x.into_iter().collect(); + let mut got_column: Vec<_> = grid_neighbors.validator_indices_y.into_iter().collect(); got_row.sort(); got_column.sort(); assert_eq!(got_row, ROW_NEIGHBORS.clone()); @@ -694,26 +716,3 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert_eq!(state.last_session_index, Some(1)); assert!(state.last_failure.is_none()); } - -#[test] -fn test_matrix_neighbors() { - for (our_index, len, expected_row, expected_column) in vec![ - (0usize, 1usize, vec![], vec![]), - (1, 2, vec![], vec![0usize]), - (0, 9, vec![1, 2], vec![3, 6]), - (9, 10, vec![], vec![0, 3, 6]), - (10, 11, vec![9], vec![1, 4, 7]), - (7, 11, vec![6, 8], vec![1, 4, 10]), - ] - .into_iter() - { - let matrix = matrix_neighbors(our_index, len); - let mut row_result: Vec<_> = matrix.row_neighbors.collect(); - let mut column_result: Vec<_> = matrix.column_neighbors.collect(); - row_result.sort(); - column_result.sort(); - - assert_eq!(row_result, expected_row); - assert_eq!(column_result, expected_column); - } -} diff --git a/node/network/protocol/src/grid_topology.rs b/node/network/protocol/src/grid_topology.rs index 73de9cfc25b1..100ef66957bd 100644 --- a/node/network/protocol/src/grid_topology.rs +++ b/node/network/protocol/src/grid_topology.rs @@ -30,7 +30,7 @@ //! use crate::PeerId; -use polkadot_primitives::v2::{SessionIndex, ValidatorIndex}; +use polkadot_primitives::v2::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex}; use rand::{CryptoRng, Rng}; use std::{ collections::{hash_map, HashMap, HashSet}, @@ -48,9 +48,106 @@ pub const DEFAULT_RANDOM_SAMPLE_RATE: usize = crate::MIN_GOSSIP_PEERS; /// The number of peers to randomly propagate messages to. pub const DEFAULT_RANDOM_CIRCULATION: usize = 4; -/// Topology representation -#[derive(Default, Clone, Debug)] +/// Information about a peer in the gossip topology for a session. +#[derive(Debug, Clone, PartialEq)] +pub struct TopologyPeerInfo { + /// The validator's known peer IDs. + pub peer_ids: Vec, + /// The index of the validator in the discovery keys of the corresponding + /// `SessionInfo`. This can extend _beyond_ the set of active parachain validators. + pub validator_index: ValidatorIndex, + /// The authority discovery public key of the validator in the corresponding + /// `SessionInfo`. + pub discovery_id: AuthorityDiscoveryId, +} + +/// Topology representation for a session. +#[derive(Default, Clone, Debug, PartialEq)] pub struct SessionGridTopology { + /// An array mapping validator indices to their indices in the + /// shuffling itself. This has the same size as the number of validators + /// in the session. + shuffled_indices: Vec, + /// The canonical shuffling of validators for the session. + canonical_shuffling: Vec, +} + +impl SessionGridTopology { + /// Create a new session grid topology. + pub fn new(shuffled_indices: Vec, canonical_shuffling: Vec) -> Self { + SessionGridTopology { shuffled_indices, canonical_shuffling } + } + + /// Produces the outgoing routing logic for a particular peer. + /// + /// Returns `None` if the validator index is out of bounds. + pub fn compute_grid_neighbors_for(&self, v: ValidatorIndex) -> Option { + if self.shuffled_indices.len() != self.canonical_shuffling.len() { + return None + } + let shuffled_val_index = *self.shuffled_indices.get(v.0 as usize)?; + + let neighbors = matrix_neighbors(shuffled_val_index, self.shuffled_indices.len())?; + + let mut grid_subset = GridNeighbors::empty(); + for r_n in neighbors.row_neighbors { + let n = &self.canonical_shuffling[r_n]; + grid_subset.validator_indices_x.insert(n.validator_index); + for p in &n.peer_ids { + grid_subset.peers_x.insert(p.clone()); + } + } + + for c_n in neighbors.column_neighbors { + let n = &self.canonical_shuffling[c_n]; + grid_subset.validator_indices_y.insert(n.validator_index); + for p in &n.peer_ids { + grid_subset.peers_y.insert(p.clone()); + } + } + + Some(grid_subset) + } +} + +struct MatrixNeighbors { + row_neighbors: R, + column_neighbors: C, +} + +/// Compute the row and column neighbors of `val_index` in a matrix +fn matrix_neighbors( + val_index: usize, + len: usize, +) -> Option, impl Iterator>> { + if val_index >= len { + return None + } + + // e.g. for size 11 the matrix would be + // + // 0 1 2 + // 3 4 5 + // 6 7 8 + // 9 10 + // + // and for index 10, the neighbors would be 1, 4, 7, 9 + + let sqrt = (len as f64).sqrt() as usize; + let our_row = val_index / sqrt; + let our_column = val_index % sqrt; + let row_neighbors = our_row * sqrt..std::cmp::min(our_row * sqrt + sqrt, len); + let column_neighbors = (our_column..len).step_by(sqrt); + + Some(MatrixNeighbors { + row_neighbors: row_neighbors.filter(move |i| *i != val_index), + column_neighbors: column_neighbors.filter(move |i| *i != val_index), + }) +} + +/// Information about the grid neighbors for a particular node in the topology. +#[derive(Debug, Clone, PartialEq)] +pub struct GridNeighbors { /// Represent peers in the X axis pub peers_x: HashSet, /// Represent validators in the X axis @@ -61,7 +158,18 @@ pub struct SessionGridTopology { pub validator_indices_y: HashSet, } -impl SessionGridTopology { +impl GridNeighbors { + /// Utility function for creating an empty set of grid neighbors. + /// Useful for testing. + pub fn empty() -> Self { + GridNeighbors { + peers_x: HashSet::new(), + validator_indices_x: HashSet::new(), + peers_y: HashSet::new(), + validator_indices_y: HashSet::new(), + } + } + /// Given the originator of a message as a validator index, indicates the part of the topology /// we're meant to send the message to. pub fn required_routing_by_index( @@ -123,7 +231,7 @@ impl SessionGridTopology { } /// Returns the difference between this and the `other` topology as a vector of peers - pub fn peers_diff(&self, other: &SessionGridTopology) -> Vec { + pub fn peers_diff(&self, other: &Self) -> Vec { self.peers_x .iter() .chain(self.peers_y.iter()) @@ -138,15 +246,39 @@ impl SessionGridTopology { } } +/// An entry tracking a session grid topology and some cached local neighbors. +#[derive(Debug)] +pub struct SessionGridTopologyEntry { + topology: SessionGridTopology, + local_neighbors: GridNeighbors, +} + +impl SessionGridTopologyEntry { + /// Access the local grid neighbors. + pub fn local_grid_neighbors(&self) -> &GridNeighbors { + &self.local_neighbors + } + + /// Access the local grid neighbors mutably. + pub fn local_grid_neighbors_mut(&mut self) -> &mut GridNeighbors { + &mut self.local_neighbors + } + + /// Access the underlying topology. + pub fn get(&self) -> &SessionGridTopology { + &self.topology + } +} + /// A set of topologies indexed by session #[derive(Default)] pub struct SessionGridTopologies { - inner: HashMap, usize)>, + inner: HashMap, usize)>, } impl SessionGridTopologies { /// Returns a topology for the specific session index - pub fn get_topology(&self, session: SessionIndex) -> Option<&SessionGridTopology> { + pub fn get_topology(&self, session: SessionIndex) -> Option<&SessionGridTopologyEntry> { self.inner.get(&session).and_then(|val| val.0.as_ref()) } @@ -166,63 +298,112 @@ impl SessionGridTopologies { } /// Insert a new topology, no-op if already present. - pub fn insert_topology(&mut self, session: SessionIndex, topology: SessionGridTopology) { + pub fn insert_topology( + &mut self, + session: SessionIndex, + topology: SessionGridTopology, + local_index: Option, + ) { let entry = self.inner.entry(session).or_insert((None, 0)); if entry.0.is_none() { - entry.0 = Some(topology); + let local_neighbors = local_index + .and_then(|l| topology.compute_grid_neighbors_for(l)) + .unwrap_or_else(GridNeighbors::empty); + + entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors }); } } } /// A simple storage for a topology and the corresponding session index -#[derive(Default, Debug)] -pub struct GridTopologySessionBound { - topology: SessionGridTopology, +#[derive(Debug)] +struct GridTopologySessionBound { + entry: SessionGridTopologyEntry, session_index: SessionIndex, } /// A storage for the current and maybe previous topology -#[derive(Default, Debug)] +#[derive(Debug)] pub struct SessionBoundGridTopologyStorage { current_topology: GridTopologySessionBound, prev_topology: Option, } +impl Default for SessionBoundGridTopologyStorage { + fn default() -> Self { + // having this struct be `Default` is objectively stupid + // but used in a few places + SessionBoundGridTopologyStorage { + current_topology: GridTopologySessionBound { + // session 0 is valid so we should use the upper bound + // as the default instead of the lower bound. + session_index: SessionIndex::max_value(), + entry: SessionGridTopologyEntry { + topology: SessionGridTopology { + shuffled_indices: Vec::new(), + canonical_shuffling: Vec::new(), + }, + local_neighbors: GridNeighbors::empty(), + }, + }, + prev_topology: None, + } + } +} + impl SessionBoundGridTopologyStorage { /// Return a grid topology based on the session index: /// If we need a previous session and it is registered in the storage, then return that session. /// Otherwise, return a current session to have some grid topology in any case - pub fn get_topology_or_fallback(&self, idx: SessionIndex) -> &SessionGridTopology { - self.get_topology(idx).unwrap_or(&self.current_topology.topology) + pub fn get_topology_or_fallback(&self, idx: SessionIndex) -> &SessionGridTopologyEntry { + self.get_topology(idx).unwrap_or(&self.current_topology.entry) } /// Return the grid topology for the specific session index, if no such a session is stored /// returns `None`. - pub fn get_topology(&self, idx: SessionIndex) -> Option<&SessionGridTopology> { + pub fn get_topology(&self, idx: SessionIndex) -> Option<&SessionGridTopologyEntry> { if let Some(prev_topology) = &self.prev_topology { if idx == prev_topology.session_index { - return Some(&prev_topology.topology) + return Some(&prev_topology.entry) } } if self.current_topology.session_index == idx { - return Some(&self.current_topology.topology) + return Some(&self.current_topology.entry) } None } /// Update the current topology preserving the previous one - pub fn update_topology(&mut self, session_index: SessionIndex, topology: SessionGridTopology) { + pub fn update_topology( + &mut self, + session_index: SessionIndex, + topology: SessionGridTopology, + local_index: Option, + ) { + let local_neighbors = local_index + .and_then(|l| topology.compute_grid_neighbors_for(l)) + .unwrap_or_else(GridNeighbors::empty); + let old_current = std::mem::replace( &mut self.current_topology, - GridTopologySessionBound { topology, session_index }, + GridTopologySessionBound { + entry: SessionGridTopologyEntry { topology, local_neighbors }, + session_index, + }, ); self.prev_topology.replace(old_current); } /// Returns a current grid topology - pub fn get_current_topology(&self) -> &SessionGridTopology { - &self.current_topology.topology + pub fn get_current_topology(&self) -> &SessionGridTopologyEntry { + &self.current_topology.entry + } + + /// Access the current grid topology mutably. Dangerous and intended + /// to be used in tests. + pub fn get_current_topology_mut(&mut self) -> &mut SessionGridTopologyEntry { + &mut self.current_topology.entry } } @@ -365,4 +546,27 @@ mod tests { let mut random_routing = RandomRouting { target: 10, sent: 0, sample_rate: 10 }; assert_eq!(run_random_routing(&mut random_routing, &mut rng, 10, 100), 10); } + + #[test] + fn test_matrix_neighbors() { + for (our_index, len, expected_row, expected_column) in vec![ + (0usize, 1usize, vec![], vec![]), + (1, 2, vec![], vec![0usize]), + (0, 9, vec![1, 2], vec![3, 6]), + (9, 10, vec![], vec![0, 3, 6]), + (10, 11, vec![9], vec![1, 4, 7]), + (7, 11, vec![6, 8], vec![1, 4, 10]), + ] + .into_iter() + { + let matrix = matrix_neighbors(our_index, len).unwrap(); + let mut row_result: Vec<_> = matrix.row_neighbors.collect(); + let mut column_result: Vec<_> = matrix.column_neighbors.collect(); + row_result.sort(); + column_result.sort(); + + assert_eq!(row_result, expected_row); + assert_eq!(column_result, expected_column); + } + } } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 274582420f5d..17ca5d8ea4ac 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -27,7 +27,7 @@ use parity_scale_codec::Encode; use polkadot_node_network_protocol::{ self as net_protocol, - grid_topology::{RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology}, + grid_topology::{GridNeighbors, RequiredRouting, SessionBoundGridTopologyStorage}, peer_set::{IsAuthority, PeerSet}, request_response::{v1 as request_v1, IncomingRequestReceiver}, v1::{self as protocol_v1, StatementMetadata}, @@ -910,7 +910,10 @@ async fn circulate_statement_and_dependents( .with_candidate(statement.payload().candidate_hash()) .with_stage(jaeger::Stage::StatementDistribution); - let topology = topology_store.get_topology_or_fallback(active_head.session_index); + let topology = topology_store + .get_topology_or_fallback(active_head.session_index) + .local_grid_neighbors(); + // First circulate the statement directly to all peers needing it. // The borrow of `active_head` needs to encompass only this (Rust) statement. let outputs: Option<(CandidateHash, Vec)> = { @@ -1009,7 +1012,7 @@ fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option) #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn circulate_statement<'a, Context>( required_routing: RequiredRouting, - topology: &SessionGridTopology, + topology: &GridNeighbors, peers: &mut HashMap, ctx: &mut Context, relay_parent: Hash, @@ -1352,7 +1355,8 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>( let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await; let topology = match session_index { - Ok(session_index) => topology_storage.get_topology_or_fallback(session_index), + Ok(session_index) => + topology_storage.get_topology_or_fallback(session_index).local_grid_neighbors(), Err(e) => { gum::debug!( target: LOG_TARGET, @@ -1361,7 +1365,7 @@ async fn handle_incoming_message_and_circulate<'a, Context, R>( e ); - topology_storage.get_current_topology() + topology_storage.get_current_topology().local_grid_neighbors() }, }; let required_routing = @@ -1588,7 +1592,7 @@ async fn handle_incoming_message<'a, Context>( #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] async fn update_peer_view_and_maybe_send_unlocked( peer: PeerId, - topology: &SessionGridTopology, + topology: &GridNeighbors, peer_data: &mut PeerData, ctx: &mut Context, active_heads: &HashMap, @@ -1673,16 +1677,22 @@ async fn handle_network_update( let _ = metrics.time_network_bridge_update_v1("new_gossip_topology"); let new_session_index = topology.session; - let new_topology: SessionGridTopology = topology.into(); - let old_topology = topology_storage.get_current_topology(); - let newly_added = new_topology.peers_diff(old_topology); - topology_storage.update_topology(new_session_index, new_topology); + let new_topology = topology.topology; + let old_topology = + topology_storage.get_current_topology().local_grid_neighbors().clone(); + topology_storage.update_topology(new_session_index, new_topology, topology.local_index); + + let newly_added = topology_storage + .get_current_topology() + .local_grid_neighbors() + .peers_diff(&old_topology); + for peer in newly_added { if let Some(data) = peers.get_mut(&peer) { let view = std::mem::take(&mut data.view); update_peer_view_and_maybe_send_unlocked( peer, - topology_storage.get_current_topology(), + topology_storage.get_current_topology().local_grid_neighbors(), data, ctx, &*active_heads, @@ -1717,7 +1727,7 @@ async fn handle_network_update( Some(data) => update_peer_view_and_maybe_send_unlocked( peer, - topology_storage.get_current_topology(), + topology_storage.get_current_topology().local_grid_neighbors(), data, ctx, &*active_heads, diff --git a/node/network/statement-distribution/src/tests.rs b/node/network/statement-distribution/src/tests.rs index 3304ad86fcd5..f3b9db00aef4 100644 --- a/node/network/statement-distribution/src/tests.rs +++ b/node/network/statement-distribution/src/tests.rs @@ -20,6 +20,7 @@ use futures::executor::{self, block_on}; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; use polkadot_node_network_protocol::{ + grid_topology::{SessionGridTopology, TopologyPeerInfo}, peer_set::ValidationVersion, request_response::{ v1::{StatementFetchingRequest, StatementFetchingResponse}, @@ -509,7 +510,7 @@ fn peer_view_update_sends_messages() { let peer = PeerId::random(); executor::block_on(async move { - let mut topology: SessionGridTopology = Default::default(); + let mut topology = GridNeighbors::empty(); topology.peers_x = HashSet::from_iter(vec![peer.clone()].into_iter()); update_peer_view_and_maybe_send_unlocked( peer.clone(), @@ -639,7 +640,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { }; let statement = StoredStatement { comparator: &comparator, statement: &statement }; - let mut topology: SessionGridTopology = Default::default(); + let mut topology = GridNeighbors::empty(); topology.peers_x = HashSet::from_iter(vec![peer_a.clone(), peer_b.clone(), peer_c.clone()].into_iter()); let needs_dependents = circulate_statement( @@ -2019,42 +2020,77 @@ fn handle_multiple_seconded_statements() { .await; } - // Explicitly add all `lucky` peers to the gossip peers to ensure that neither `peerA` not `peerB` - // receive statements + // Set up a topology which puts peers a & b in a column together. let gossip_topology = { - let mut t = network_bridge_event::NewGossipTopology { - session: 1, - our_neighbors_x: HashMap::new(), - our_neighbors_y: HashMap::new(), - }; - - // Create a topology to ensure that we send messages not to `peer_a`/`peer_b` - for (i, peer) in lucky_peers.iter().enumerate() { - let authority_id = AuthorityPair::generate().0.public(); - t.our_neighbors_y.insert( - authority_id, - network_bridge_event::TopologyPeerInfo { - peer_ids: vec![peer.clone()], - validator_index: (i as u32 + 2_u32).into(), - }, - ); + // create a lucky_peers+1 * lucky_peers+1 grid topology where we are at index 2, sharing + // a row with peer_a (0) and peer_b (1) and a column with all the lucky peers. + // the rest is filled with junk. + // This is an absolute garbage hack depending on quirks of the implementation + // and not on sound architecture. + + let n_lucky = lucky_peers.len(); + let dim = n_lucky + 1; + let grid_size = dim * dim; + let topology_peer_info: Vec<_> = (0..grid_size) + .map(|i| { + if i == 0 { + TopologyPeerInfo { + peer_ids: vec![peer_a.clone()], + validator_index: ValidatorIndex(0), + discovery_id: AuthorityPair::generate().0.public(), + } + } else if i == 1 { + TopologyPeerInfo { + peer_ids: vec![peer_b.clone()], + validator_index: ValidatorIndex(1), + discovery_id: AuthorityPair::generate().0.public(), + } + } else if i == 2 { + TopologyPeerInfo { + peer_ids: vec![], + validator_index: ValidatorIndex(2), + discovery_id: AuthorityPair::generate().0.public(), + } + } else if (i - 2) % dim == 0 { + let lucky_index = ((i - 2) / dim) - 1; + TopologyPeerInfo { + peer_ids: vec![lucky_peers[lucky_index].clone()], + validator_index: ValidatorIndex(i as _), + discovery_id: AuthorityPair::generate().0.public(), + } + } else { + TopologyPeerInfo { + peer_ids: vec![PeerId::random()], + validator_index: ValidatorIndex(i as _), + discovery_id: AuthorityPair::generate().0.public(), + } + } + }) + .collect(); + + // also a hack: this is only required to be accurate for + // the validator indices we compute grid neighbors for. + let mut shuffled_indices = vec![0; grid_size]; + shuffled_indices[2] = 2; + + // Some sanity checking to make sure this hack is set up correctly. + let topology = SessionGridTopology::new(shuffled_indices, topology_peer_info); + let grid_neighbors = topology.compute_grid_neighbors_for(ValidatorIndex(2)).unwrap(); + assert_eq!(grid_neighbors.peers_x.len(), 25); + assert!(grid_neighbors.peers_x.contains(&peer_a)); + assert!(grid_neighbors.peers_x.contains(&peer_b)); + assert!(!grid_neighbors.peers_y.contains(&peer_b)); + assert!(!grid_neighbors.route_to_peer(RequiredRouting::GridY, &peer_b)); + assert_eq!(grid_neighbors.peers_y.len(), lucky_peers.len()); + for lucky in &lucky_peers { + assert!(grid_neighbors.peers_y.contains(lucky)); } - t.our_neighbors_x.insert( - AuthorityPair::generate().0.public(), - network_bridge_event::TopologyPeerInfo { - peer_ids: vec![peer_a.clone()], - validator_index: 0_u32.into(), - }, - ); - t.our_neighbors_x.insert( - AuthorityPair::generate().0.public(), - network_bridge_event::TopologyPeerInfo { - peer_ids: vec![peer_b.clone()], - validator_index: 1_u32.into(), - }, - ); - t + network_bridge_event::NewGossipTopology { + session: 1, + topology, + local_index: Some(ValidatorIndex(2)), + } }; handle diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 121c707c2541..dee4c7cbbba9 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -873,8 +873,9 @@ fn test_network_bridge_tx_msg() -> NetworkBridgeTxMessage { fn test_network_bridge_rx_msg() -> NetworkBridgeRxMessage { NetworkBridgeRxMessage::NewGossipTopology { session: SessionIndex::from(0_u32), - our_neighbors_x: HashMap::new(), - our_neighbors_y: HashMap::new(), + local_index: None, + canonical_shuffling: Vec::new(), + shuffled_indices: Vec::new(), } } diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index a1520a9aeba8..6e4983813984 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -328,18 +328,13 @@ pub enum NetworkBridgeRxMessage { NewGossipTopology { /// The session info this gossip topology is concerned with. session: SessionIndex, - /// Ids of our neighbors in the X dimensions of the new gossip topology, - /// along with their validator indices within the session. - /// - /// We're not necessarily connected to all of them, but we should - /// try to be. - our_neighbors_x: HashMap, - /// Ids of our neighbors in the X dimensions of the new gossip topology, - /// along with their validator indices within the session. - /// - /// We're not necessarily connected to all of them, but we should - /// try to be. - our_neighbors_y: HashMap, + /// Our validator index in the session, if any. + local_index: Option, + /// The canonical shuffling of validators for the session. + canonical_shuffling: Vec<(AuthorityDiscoveryId, ValidatorIndex)>, + /// The reverse mapping of `canonical_shuffling`: from validator index + /// to the index in `canonical_shuffling` + shuffled_indices: Vec, }, } diff --git a/node/subsystem-types/src/messages/network_bridge_event.rs b/node/subsystem-types/src/messages/network_bridge_event.rs index cd0bb9894b6b..5abad8a3c22c 100644 --- a/node/subsystem-types/src/messages/network_bridge_event.rs +++ b/node/subsystem-types/src/messages/network_bridge_event.rs @@ -14,10 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{ - collections::{HashMap, HashSet}, - convert::TryFrom, -}; +use std::{collections::HashSet, convert::TryFrom}; pub use sc_network::{PeerId, ReputationChange}; @@ -27,25 +24,15 @@ use polkadot_node_network_protocol::{ }; use polkadot_primitives::v2::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex}; -/// Information about a peer in the gossip topology for a session. -#[derive(Debug, Clone, PartialEq)] -pub struct TopologyPeerInfo { - /// The validator's known peer IDs. - pub peer_ids: Vec, - /// The index of the validator in the discovery keys of the corresponding - /// `SessionInfo`. This can extend _beyond_ the set of active parachain validators. - pub validator_index: ValidatorIndex, -} - /// A struct indicating new gossip topology. #[derive(Debug, Clone, PartialEq)] pub struct NewGossipTopology { /// The session index this topology corresponds to. pub session: SessionIndex, - /// Neighbors in the 'X' dimension of the grid. - pub our_neighbors_x: HashMap, - /// Neighbors in the 'Y' dimension of the grid. - pub our_neighbors_y: HashMap, + /// The topology itself. + pub topology: SessionGridTopology, + /// The local validator index, if any. + pub local_index: Option, } /// Events from network. @@ -122,19 +109,3 @@ impl NetworkBridgeEvent { }) } } - -impl From for SessionGridTopology { - fn from(topology: NewGossipTopology) -> Self { - let peers_x = - topology.our_neighbors_x.values().flat_map(|p| &p.peer_ids).cloned().collect(); - let peers_y = - topology.our_neighbors_y.values().flat_map(|p| &p.peer_ids).cloned().collect(); - - let validator_indices_x = - topology.our_neighbors_x.values().map(|p| p.validator_index.clone()).collect(); - let validator_indices_y = - topology.our_neighbors_y.values().map(|p| p.validator_index.clone()).collect(); - - SessionGridTopology { peers_x, peers_y, validator_indices_x, validator_indices_y } - } -} diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index 0d09a682cff2..b698ca2075bf 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -145,10 +145,19 @@ These updates are posted from the [Network Bridge Subsystem](../node/utility/net struct NewGossipTopology { /// The session index this topology corresponds to. session: SessionIndex, - /// Neighbors in the 'X' dimension of the grid. - our_neighbors_x: HashMap, - /// Neighbors in the 'Y' dimension of the grid. - our_neighbors_y: HashMap, + /// The topology itself. + topology: SessionGridTopology, + /// The local validator index, if any. + local_index: Option, +} + +struct SessionGridTopology { + /// An array mapping validator indices to their indices in the + /// shuffling itself. This has the same size as the number of validators + /// in the session. + shuffled_indices: Vec, + /// The canonical shuffling of validators for the session. + canonical_shuffling: Vec, } struct TopologyPeerInfo { @@ -157,6 +166,9 @@ struct TopologyPeerInfo { /// The index of the validator in the discovery keys of the corresponding /// `SessionInfo`. This can extend _beyond_ the set of active parachain validators. validator_index: ValidatorIndex, + /// The authority discovery public key of the validator in the corresponding + /// `SessionInfo`. + discovery_id: AuthorityDiscoveryId, } enum NetworkBridgeEvent { diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index b2559c4cfda7..4b9dc97c27e2 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -555,14 +555,15 @@ enum NetworkBridgeMessage { /// Inform the distribution subsystems about the new /// gossip network topology formed. NewGossipTopology { - /// The session this topology corresponds to. - session: SessionIndex, - /// Ids of our neighbors in the X dimension of the new gossip topology. - /// We're not necessarily connected to all of them, but we should try to be. - our_neighbors_x: HashSet, - /// Ids of our neighbors in the Y dimension of the new gossip topology. - /// We're not necessarily connected to all of them, but we should try to be. - our_neighbors_y: HashSet, + /// The session info this gossip topology is concerned with. + session: SessionIndex, + /// Our validator index in the session, if any. + local_index: Option, + /// The canonical shuffling of validators for the session. + canonical_shuffling: Vec<(AuthorityDiscoveryId, ValidatorIndex)>, + /// The reverse mapping of `canonical_shuffling`: from validator index + /// to the index in `canonical_shuffling` + shuffled_indices: Vec, } } ```