Skip to content

Commit

Permalink
Remove necessity to pass ConsensusEngineId when registering notificat…
Browse files Browse the repository at this point in the history
…ions protocol (paritytech#7549)

* Remove necessity to pass ConsensusEngineId when registering notifications protocol

* Line width

* Fix tests protocol name

* Other renames

* Doc update

* Change issue in TODO
  • Loading branch information
tomaka authored and shamb0 committed Nov 21, 2020
1 parent 272077c commit b552b03
Show file tree
Hide file tree
Showing 18 changed files with 228 additions and 284 deletions.
2 changes: 0 additions & 2 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ mod periodic;
#[cfg(test)]
pub(crate) mod tests;

pub use sp_finality_grandpa::GRANDPA_ENGINE_ID;
pub const GRANDPA_PROTOCOL_NAME: &'static str = "/paritytech/grandpa/1";

// cost scalars for reporting peers.
Expand Down Expand Up @@ -215,7 +214,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let validator = Arc::new(validator);
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(),
GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME,
validator.clone()
)));
Expand Down
23 changes: 12 additions & 11 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use sc_network_gossip::Validator;
use std::sync::Arc;
use sp_keyring::Ed25519Keyring;
use parity_scale_codec::Encode;
use sp_runtime::{ConsensusEngineId, traits::NumberFor};
use sp_runtime::traits::NumberFor;
use std::{borrow::Cow, pin::Pin, task::{Context, Poll}};
use crate::communication::GRANDPA_PROTOCOL_NAME;
use crate::environment::SharedVoterSetState;
use sp_finality_grandpa::{AuthorityList, GRANDPA_ENGINE_ID};
use sp_finality_grandpa::AuthorityList;
use super::gossip::{self, GossipValidator};
use super::{VoterSet, Round, SetId};

Expand Down Expand Up @@ -57,11 +58,11 @@ impl sc_network_gossip::Network<Block> for TestNetwork {

fn disconnect_peer(&self, _: PeerId) {}

fn write_notification(&self, who: PeerId, _: ConsensusEngineId, message: Vec<u8>) {
fn write_notification(&self, who: PeerId, _: Cow<'static, str>, message: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::WriteNotification(who, message));
}

fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, str>) {}
fn register_notifications_protocol(&self, _: Cow<'static, str>) {}

fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
Expand All @@ -86,7 +87,7 @@ impl sc_network_gossip::ValidatorContext<Block> for TestNetwork {
<Self as sc_network_gossip::Network<Block>>::write_notification(
self,
who.clone(),
GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME.into(),
data,
);
}
Expand Down Expand Up @@ -287,20 +288,20 @@ fn good_commit_leads_to_relay() {
// Add the sending peer and send the commit
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
protocol: GRANDPA_PROTOCOL_NAME.into(),
role: ObservedRole::Full,
});

let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: sender_id.clone(),
messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())],
messages: vec![(GRANDPA_PROTOCOL_NAME.into(), commit_to_send.clone().into())],
});

// Add a random peer which will be the recipient of this message
let receiver_id = sc_network::PeerId::random();
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: receiver_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
protocol: GRANDPA_PROTOCOL_NAME.into(),
role: ObservedRole::Full,
});

Expand All @@ -319,7 +320,7 @@ fn good_commit_leads_to_relay() {

sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: receiver_id,
messages: vec![(GRANDPA_ENGINE_ID, msg.encode().into())],
messages: vec![(GRANDPA_PROTOCOL_NAME.into(), msg.encode().into())],
})
};

Expand Down Expand Up @@ -434,12 +435,12 @@ fn bad_commit_leads_to_report() {
Event::EventStream(sender) => {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
protocol: GRANDPA_PROTOCOL_NAME.into(),
role: ObservedRole::Full,
});
let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: sender_id.clone(),
messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())],
messages: vec![(GRANDPA_PROTOCOL_NAME.into(), commit_to_send.clone().into())],
});

true
Expand Down
1 change: 0 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ where
// to receive GRANDPA messages on the network. We don't process the
// messages.
network.register_notifications_protocol(
communication::GRANDPA_ENGINE_ID,
From::from(communication::GRANDPA_PROTOCOL_NAME),
);

Expand Down
4 changes: 1 addition & 3 deletions client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ impl TestNetFactory for GrandpaTestNet {

fn add_full_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
(communication::GRANDPA_ENGINE_ID, communication::GRANDPA_PROTOCOL_NAME.into())
],
notifications_protocols: vec![communication::GRANDPA_PROTOCOL_NAME.into()],
..Default::default()
})
}
Expand Down
51 changes: 25 additions & 26 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender, Receiver};
use libp2p::PeerId;
use log::trace;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use sp_runtime::traits::Block as BlockT;
use std::{
borrow::Cow,
collections::{HashMap, VecDeque},
Expand All @@ -38,7 +38,7 @@ pub struct GossipEngine<B: BlockT> {
state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay,
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,

/// Incoming events from the network.
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
Expand Down Expand Up @@ -68,20 +68,21 @@ impl<B: BlockT> GossipEngine<B> {
/// Create a new instance.
pub fn new<N: Network<B> + Send + Clone + 'static>(
network: N,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, str>>,
protocol: impl Into<Cow<'static, str>>,
validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static {
let protocol = protocol.into();

// We grab the event stream before registering the notifications protocol, otherwise we
// might miss events.
let network_event_stream = network.event_stream();
network.register_notifications_protocol(engine_id, protocol_name.into());
network.register_notifications_protocol(protocol.clone());

GossipEngine {
state_machine: ConsensusGossip::new(validator, engine_id),
state_machine: ConsensusGossip::new(validator, protocol.clone()),
network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
engine_id,
protocol,

network_event_stream,
message_sinks: HashMap::new(),
Expand Down Expand Up @@ -181,21 +182,21 @@ impl<B: BlockT> Future for GossipEngine<B> {
ForwardingState::Idle => {
match this.network_event_stream.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
Event::NotificationStreamOpened { remote, engine_id, role } => {
if engine_id != this.engine_id {
Event::NotificationStreamOpened { remote, protocol, role } => {
if protocol != this.protocol {
continue;
}
this.state_machine.new_peer(&mut *this.network, remote, role);
}
Event::NotificationStreamClosed { remote, engine_id } => {
if engine_id != this.engine_id {
Event::NotificationStreamClosed { remote, protocol } => {
if protocol != this.protocol {
continue;
}
this.state_machine.peer_disconnected(&mut *this.network, remote);
},
Event::NotificationsReceived { remote, messages } => {
let messages = messages.into_iter().filter_map(|(engine, data)| {
if engine == this.engine_id {
if engine == this.protocol {
Some(data.to_vec())
} else {
None
Expand Down Expand Up @@ -299,6 +300,7 @@ mod tests {
use rand::Rng;
use sc_network::ObservedRole;
use sp_runtime::{testing::H256, traits::{Block as BlockT}};
use std::borrow::Cow;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use substrate_test_runtime_client::runtime::Block;
Expand Down Expand Up @@ -329,11 +331,11 @@ mod tests {
unimplemented!();
}

fn write_notification(&self, _: PeerId, _: ConsensusEngineId, _: Vec<u8>) {
fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec<u8>) {
unimplemented!();
}

fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, str>) {}
fn register_notifications_protocol(&self, _: Cow<'static, str>) {}

fn announce(&self, _: B::Hash, _: Vec<u8>) {
unimplemented!();
Expand Down Expand Up @@ -361,8 +363,7 @@ mod tests {
let network = TestNetwork::default();
let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
[1, 2, 3, 4],
"my_protocol",
"/my_protocol",
Arc::new(AllowAll{}),
);

Expand All @@ -383,14 +384,13 @@ mod tests {
#[test]
fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
let topic = H256::default();
let engine_id = [1, 2, 3, 4];
let protocol = Cow::Borrowed("/my_protocol");
let remote_peer = PeerId::random();
let network = TestNetwork::default();

let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
engine_id.clone(),
"my_protocol",
protocol.clone(),
Arc::new(AllowAll{}),
);

Expand All @@ -404,7 +404,7 @@ mod tests {
event_sender.start_send(
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
engine_id: engine_id.clone(),
protocol: protocol.clone(),
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
Expand All @@ -413,7 +413,7 @@ mod tests {
let events = messages.iter().cloned().map(|m| {
Event::NotificationsReceived {
remote: remote_peer.clone(),
messages: vec![(engine_id, m.into())]
messages: vec![(protocol.clone(), m.into())]
}
}).collect::<Vec<_>>();

Expand Down Expand Up @@ -498,7 +498,7 @@ mod tests {
}

fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
let engine_id = [1, 2, 3, 4];
let protocol = Cow::Borrowed("/my_protocol");
let remote_peer = PeerId::random();
let network = TestNetwork::default();

Expand All @@ -524,8 +524,7 @@ mod tests {

let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
engine_id.clone(),
"my_protocol",
protocol.clone(),
Arc::new(TestValidator{}),
);

Expand Down Expand Up @@ -558,7 +557,7 @@ mod tests {
event_sender.start_send(
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
engine_id: engine_id.clone(),
protocol: protocol.clone(),
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
Expand All @@ -576,7 +575,7 @@ mod tests {
message.push(i_notification.try_into().unwrap());
message.push(i_message.try_into().unwrap());

(engine_id, message.into())
(protocol.clone(), message.into())
}).collect();

event_sender.start_send(Event::NotificationsReceived {
Expand Down
18 changes: 8 additions & 10 deletions client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//! - Implement the `Network` trait, representing the low-level networking primitives. It is
//! already implemented on `sc_network::NetworkService`.
//! - Implement the `Validator` trait. See the section below.
//! - Decide on a `ConsensusEngineId`. Each gossiping protocol should have a different one.
//! - Decide on a protocol name. Each gossiping protocol should have a different one.
//! - Build a `GossipEngine` using these three elements.
//! - Use the methods of the `GossipEngine` in order to send out messages and receive incoming
//! messages.
Expand All @@ -60,7 +60,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext

use futures::prelude::*;
use sc_network::{Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use sp_runtime::{traits::Block as BlockT};
use std::{borrow::Cow, pin::Pin, sync::Arc};

mod bridge;
Expand All @@ -79,15 +79,14 @@ pub trait Network<B: BlockT> {
fn disconnect_peer(&self, who: PeerId);

/// Send a notification to a peer.
fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>);
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>);

/// Registers a notifications protocol.
///
/// See the documentation of [`NetworkService:register_notifications_protocol`] for more information.
fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, str>,
protocol: Cow<'static, str>,
);

/// Notify everyone we're connected to that we have the given block.
Expand All @@ -110,16 +109,15 @@ impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
NetworkService::disconnect_peer(self, who)
}

fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
NetworkService::write_notification(self, who, engine_id, message)
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>) {
NetworkService::write_notification(self, who, protocol, message)
}

fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, str>,
protocol: Cow<'static, str>,
) {
NetworkService::register_notifications_protocol(self, engine_id, protocol_name)
NetworkService::register_notifications_protocol(self, protocol)
}

fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
Expand Down
Loading

0 comments on commit b552b03

Please sign in to comment.