This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rework consensus instance communication with the network worker #958

Merged: 1 commit, Apr 1, 2020
2 changes: 2 additions & 0 deletions network/src/lib.rs
@@ -19,6 +19,8 @@
//! This manages routing for parachain statements, parachain block and outgoing message
//! data fetching, communication between collators and validators, and more.

#![recursion_limit="256"]

use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT};

pub mod legacy;
68 changes: 39 additions & 29 deletions network/src/protocol/mod.rs
@@ -26,7 +26,8 @@ use codec::{Decode, Encode};
use futures::channel::{mpsc, oneshot};
use futures::future::Either;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt};
use futures::task::{Spawn, SpawnExt, Context, Poll};
use futures::stream::{FuturesUnordered, StreamFuture};
use log::{debug, trace};

use polkadot_primitives::{
@@ -76,8 +77,7 @@ enum ServiceToWorkerMsg {
PeerDisconnected(PeerId),

// service messages.
BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>),
DropConsensusNetworking(Hash),
BuildConsensusNetworking(mpsc::Receiver<ServiceToWorkerMsg>, Arc<SharedTable>, Vec<ValidatorId>),
SubmitValidatedCollation(
AbridgedCandidateReceipt,
PoVBlock,
@@ -782,6 +782,21 @@ fn send_peer_collations(
}
}

/// Receives messages associated to a certain consensus networking instance.
struct ConsensusNetworkingReceiver {
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
/// The relay parent of this consensus network.
relay_parent: Hash,
}

impl Stream for ConsensusNetworkingReceiver {
type Item = ServiceToWorkerMsg;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_next(cx)
}
}

struct Worker<Api, Sp, Gossip> {
protocol_handler: ProtocolHandler,
api: Arc<Api>,
@@ -790,6 +805,7 @@ struct Worker<Api, Sp, Gossip> {
background_to_main_sender: mpsc::Sender<BackgroundToWorkerMsg>,
background_receiver: mpsc::Receiver<BackgroundToWorkerMsg>,
service_receiver: mpsc::Receiver<ServiceToWorkerMsg>,
consensus_networking_receivers: FuturesUnordered<StreamFuture<ConsensusNetworkingReceiver>>,
}

impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
@@ -801,6 +817,7 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
// spawns a background task to spawn consensus networking.
fn build_consensus_networking(
&mut self,
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
table: Arc<SharedTable>,
authorities: Vec<ValidatorId>,
) {
@@ -832,6 +849,9 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
},
);

let relay_parent = table.signing_context().parent_hash;
self.consensus_networking_receivers.push(ConsensusNetworkingReceiver { receiver, relay_parent }.into_future());

// glue the incoming messages, shared table, and validation
// work together.
let _ = self.executor.spawn(statement_import_loop(
@@ -855,12 +875,8 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
ServiceToWorkerMsg::PeerMessage(remote, messages) => {
self.protocol_handler.on_raw_messages(remote, messages)
}

ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => {
self.build_consensus_networking(table, authorities);
}
ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
self.protocol_handler.drop_consensus_networking(&relay_parent);
ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) => {
self.build_consensus_networking(receiver, table, authorities);
}
ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
let relay_parent = receipt.relay_parent;
@@ -985,6 +1001,16 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
Some(msg) => self.handle_service_message(msg),
None => return,
},
consensus_service_msg = self.consensus_networking_receivers.next() => match consensus_service_msg {
Some((Some(msg), receiver)) => {
self.handle_service_message(msg);
self.consensus_networking_receivers.push(receiver.into_future());
},
Some((None, receiver)) => {
self.protocol_handler.drop_consensus_networking(&receiver.relay_parent);
},
None => {},
},
background_msg = self.background_receiver.next() => match background_msg {
Some(msg) => self.handle_background_message(msg),
None => return,
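For readers unfamiliar with the pattern used in the arm above, here is a minimal standalone sketch (not part of this diff; names and values are illustrative) of how `FuturesUnordered<StreamFuture<_>>` multiplexes several per-instance receivers: each yielded item is handled and the stream is pushed back, while an end-of-stream `None` signals that the corresponding sender, and with it that instance's router, has been dropped.

```rust
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};

fn main() {
    block_on(async {
        let mut receivers = FuturesUnordered::new();

        // One channel per "consensus instance"; the worker keeps the receivers.
        let (mut tx, rx) = mpsc::channel::<u32>(1);
        receivers.push(rx.into_future());

        tx.try_send(1).unwrap();
        drop(tx); // dropping the sender closes the channel

        while let Some((item, rest)) = receivers.next().await {
            match item {
                // A message from a live instance: handle it and keep polling its stream.
                Some(msg) => {
                    println!("handling message {}", msg);
                    receivers.push(rest.into_future());
                }
                // End of stream: the sender side is gone, clean the instance up.
                None => println!("instance dropped, cleaning up"),
            }
        }
    });
}
```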
@@ -1017,6 +1043,7 @@ async fn worker_loop<Api, Sp>(
background_to_main_sender: background_tx,
background_receiver: background_rx,
service_receiver: receiver,
consensus_networking_receivers: Default::default(),
};

worker.main_loop().await
@@ -1296,24 +1323,6 @@ struct RouterInner {
sender: mpsc::Sender<ServiceToWorkerMsg>,
}

impl Drop for RouterInner {
fn drop(&mut self) {
let res = self.sender.try_send(
ServiceToWorkerMsg::DropConsensusNetworking(self.relay_parent)
);

if let Err(e) = res {
assert!(
!e.is_full(),
"futures 0.3 guarantees at least one free slot in the capacity \
per sender; this is the first message sent via this sender; \
therefore we will not have to wait for capacity; qed"
);
// other error variants (disconnection) are fine here.
}
}
}

impl Service {
/// Register an availability-store that the network can query.
pub fn register_availability_store(&self, store: av_store::Store) {
@@ -1379,14 +1388,15 @@ impl ParachainNetwork for Service {
let relay_parent = table.signing_context().parent_hash.clone();

Box::pin(async move {
let (router_sender, receiver) = mpsc::channel(0);
sender.send(
ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities)
ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities)
).await?;

Ok(Router {
inner: Arc::new(RouterInner {
relay_parent,
sender,
sender: router_sender,
})
})
})
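To make the new lifecycle explicit: the `Router` now owns the only sender of a dedicated channel, so there is no longer an explicit `DropConsensusNetworking` message and no `Drop` impl for `RouterInner`. A hedged, self-contained sketch of the idea, with illustrative names only:

```rust
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::stream::StreamExt;

// Stand-in for RouterInner: holds the instance-specific sender.
#[allow(dead_code)]
struct Router {
    sender: mpsc::Sender<&'static str>,
}

fn main() {
    let (sender, mut receiver) = mpsc::channel(0);
    let router = Router { sender };

    // Dropping the router closes the channel without sending anything ...
    drop(router);

    // ... and the worker side observes end-of-stream, which replaces the
    // old explicit "drop consensus networking" message.
    assert_eq!(block_on(receiver.next()), None);
}
```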
97 changes: 78 additions & 19 deletions network/src/protocol/tests.rs
@@ -54,6 +54,7 @@ type GossipStreamEntry = (mpsc::UnboundedReceiver<TopicNotification>, oneshot::S
#[derive(Default, Clone)]
struct MockGossip {
inner: Arc<Mutex<HashMap<Hash, GossipStreamEntry>>>,
gossip_messages: Arc<Mutex<HashMap<Hash, GossipMessage>>>,
}

impl MockGossip {
@@ -102,8 +103,8 @@ impl crate::legacy::GossipService for MockGossip {
})
}

fn gossip_message(&self, _topic: Hash, _message: GossipMessage) {

fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_messages.lock().insert(topic, message);
}

fn send_message(&self, _who: PeerId, _message: GossipMessage) {
@@ -250,22 +251,6 @@ fn test_setup(config: Config) -> (
(service, mock_gossip, pool, worker_task)
}

#[test]
fn router_inner_drop_sends_worker_message() {
let parent = [1; 32].into();

let (sender, mut receiver) = mpsc::channel(0);
drop(RouterInner {
relay_parent: parent,
sender,
});

match receiver.try_next() {
Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
_ => panic!("message not sent"),
}
}

#[test]
fn worker_task_shuts_down_when_sender_dropped() {
let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
@@ -274,6 +259,30 @@ fn worker_task_shuts_down_when_sender_dropped() {
let _ = pool.run_until(worker_task);
}

/// Given the async nature of the `select!` used in the worker's main loop, and the fact that
/// consensus instances use their own channels, we don't know when the synchronize message is
/// handled. This helper function checks multiple times whether the given instance has been
/// dropped. Even if the first round fails, the second one should succeed, as the drop of the
/// consensus instance should have been handled by then.
fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) {
let mut try_counter = 0;
let max_tries = 3;

while try_counter < max_tries {
let dropped = pool.run_until(service.synchronize(move |proto| {
!proto.consensus_instances.contains_key(&instance)
}));

if dropped {
return;
}

try_counter += 1;
}

panic!("Consensus instance `{}` wasn't dropped!", instance);
}

#[test]
fn consensus_instances_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
@@ -300,11 +309,61 @@

drop(router);

wait_for_instance_drop(&mut service, &mut pool, relay_parent);
}

#[test]
fn collation_is_received_with_dropped_router() {
let (mut service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let relay_parent = [0; 32].into();
let topic = crate::legacy::gossip::attestation_topic(relay_parent);

let signing_context = SigningContext {
session_index: Default::default(),
parent_hash: relay_parent,
};
let table = Arc::new(SharedTable::new(
vec![Sr25519Keyring::Alice.public().into()],
HashMap::new(),
Some(Arc::new(Sr25519Keyring::Alice.pair().into())),
signing_context,
AvailabilityStore::new_in_memory(service.clone()),
None,
));

pool.spawner().spawn_local(worker_task).unwrap();

let router = pool.run_until(
service.build_table_router(table, &[])
).unwrap();

let receipt = AbridgedCandidateReceipt { relay_parent, ..Default::default() };
let local_collation_future = router.local_collation(
receipt,
PoVBlock { block_data: BlockData(Vec::new()) },
(0, &[]),
);

// Drop the router and make sure that the consensus instance is still alive
drop(router);

assert!(pool.run_until(service.synchronize(move |proto| {
!proto.consensus_instances.contains_key(&relay_parent)
proto.consensus_instances.contains_key(&relay_parent)
})));

// The gossip message should still be unknown
assert!(!gossip.gossip_messages.lock().contains_key(&topic));

pool.run_until(local_collation_future).unwrap();

// Make sure the instance is now dropped and the message was gossiped
wait_for_instance_drop(&mut service, &mut pool, relay_parent);
assert!(pool.run_until(service.synchronize(move |_| {
gossip.gossip_messages.lock().contains_key(&topic)
})));
}


#[test]
fn validator_peer_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
7 changes: 4 additions & 3 deletions primitives/src/parachain.rs
@@ -75,9 +75,10 @@ pub type ValidatorId = validator_app::Public;
/// Index of the validator is used as a lightweight replacement of the `ValidatorId` when appropriate.
pub type ValidatorIndex = u32;

/// A Parachain validator keypair.
#[cfg(feature = "std")]
pub type ValidatorPair = validator_app::Pair;
application_crypto::with_pair! {
/// A Parachain validator keypair.
pub type ValidatorPair = validator_app::Pair;
}

/// Signature with which parachain validators sign blocks.
///