Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Remove necessity to pass ConsensusEngineId when registering notifications protocol #7549

Merged
6 commits merged into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ mod periodic;
#[cfg(test)]
pub(crate) mod tests;

pub use sp_finality_grandpa::GRANDPA_ENGINE_ID;
pub const GRANDPA_PROTOCOL_NAME: &'static str = "/paritytech/grandpa/1";

// cost scalars for reporting peers.
Expand Down Expand Up @@ -215,7 +214,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let validator = Arc::new(validator);
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(),
GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME,
validator.clone()
)));
Expand Down
23 changes: 12 additions & 11 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use sc_network_gossip::Validator;
use std::sync::Arc;
use sp_keyring::Ed25519Keyring;
use parity_scale_codec::Encode;
use sp_runtime::{ConsensusEngineId, traits::NumberFor};
use sp_runtime::traits::NumberFor;
use std::{borrow::Cow, pin::Pin, task::{Context, Poll}};
use crate::communication::GRANDPA_PROTOCOL_NAME;
use crate::environment::SharedVoterSetState;
use sp_finality_grandpa::{AuthorityList, GRANDPA_ENGINE_ID};
use sp_finality_grandpa::AuthorityList;
use super::gossip::{self, GossipValidator};
use super::{VoterSet, Round, SetId};

Expand Down Expand Up @@ -57,11 +58,11 @@ impl sc_network_gossip::Network<Block> for TestNetwork {

fn disconnect_peer(&self, _: PeerId) {}

fn write_notification(&self, who: PeerId, _: ConsensusEngineId, message: Vec<u8>) {
fn write_notification(&self, who: PeerId, _: Cow<'static, str>, message: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::WriteNotification(who, message));
}

fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, str>) {}
fn register_notifications_protocol(&self, _: Cow<'static, str>) {}

fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
Expand All @@ -86,7 +87,7 @@ impl sc_network_gossip::ValidatorContext<Block> for TestNetwork {
<Self as sc_network_gossip::Network<Block>>::write_notification(
self,
who.clone(),
GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME.into(),
data,
);
}
Expand Down Expand Up @@ -287,20 +288,20 @@ fn good_commit_leads_to_relay() {
// Add the sending peer and send the commit
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
protocol: GRANDPA_PROTOCOL_NAME.into(),
role: ObservedRole::Full,
});

let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: sender_id.clone(),
messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())],
messages: vec![(GRANDPA_PROTOCOL_NAME.into(), commit_to_send.clone().into())],
});

// Add a random peer which will be the recipient of this message
let receiver_id = sc_network::PeerId::random();
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: receiver_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
protocol: GRANDPA_PROTOCOL_NAME.into(),
role: ObservedRole::Full,
});

Expand All @@ -319,7 +320,7 @@ fn good_commit_leads_to_relay() {

sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: receiver_id,
messages: vec![(GRANDPA_ENGINE_ID, msg.encode().into())],
messages: vec![(GRANDPA_PROTOCOL_NAME.into(), msg.encode().into())],
})
};

Expand Down Expand Up @@ -434,12 +435,12 @@ fn bad_commit_leads_to_report() {
Event::EventStream(sender) => {
let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened {
remote: sender_id.clone(),
engine_id: GRANDPA_ENGINE_ID,
protocol: GRANDPA_PROTOCOL_NAME.into(),
role: ObservedRole::Full,
});
let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived {
remote: sender_id.clone(),
messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())],
messages: vec![(GRANDPA_PROTOCOL_NAME.into(), commit_to_send.clone().into())],
});

true
Expand Down
1 change: 0 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ where
// to receive GRANDPA messages on the network. We don't process the
// messages.
network.register_notifications_protocol(
communication::GRANDPA_ENGINE_ID,
From::from(communication::GRANDPA_PROTOCOL_NAME),
);

Expand Down
4 changes: 1 addition & 3 deletions client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ impl TestNetFactory for GrandpaTestNet {

fn add_full_peer(&mut self) {
self.add_full_peer_with_config(FullPeerConfig {
notifications_protocols: vec![
(communication::GRANDPA_ENGINE_ID, communication::GRANDPA_PROTOCOL_NAME.into())
],
notifications_protocols: vec![communication::GRANDPA_PROTOCOL_NAME.into()],
..Default::default()
})
}
Expand Down
51 changes: 25 additions & 26 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender, Receiver};
use libp2p::PeerId;
use log::trace;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use sp_runtime::traits::Block as BlockT;
use std::{
borrow::Cow,
collections::{HashMap, VecDeque},
Expand All @@ -38,7 +38,7 @@ pub struct GossipEngine<B: BlockT> {
state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay,
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to Cow would be something like smol_str which is immutable inlined (up to 22 bytes) string with O(1) clone. The downside is the dependency on a third-party lib in a public API.


/// Incoming events from the network.
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
Expand Down Expand Up @@ -68,20 +68,21 @@ impl<B: BlockT> GossipEngine<B> {
/// Create a new instance.
pub fn new<N: Network<B> + Send + Clone + 'static>(
network: N,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, str>>,
protocol: impl Into<Cow<'static, str>>,
validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static {
let protocol = protocol.into();

// We grab the event stream before registering the notifications protocol, otherwise we
// might miss events.
let network_event_stream = network.event_stream();
network.register_notifications_protocol(engine_id, protocol_name.into());
network.register_notifications_protocol(protocol.clone());

GossipEngine {
state_machine: ConsensusGossip::new(validator, engine_id),
state_machine: ConsensusGossip::new(validator, protocol.clone()),
network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
engine_id,
protocol,

network_event_stream,
message_sinks: HashMap::new(),
Expand Down Expand Up @@ -181,21 +182,21 @@ impl<B: BlockT> Future for GossipEngine<B> {
ForwardingState::Idle => {
match this.network_event_stream.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
Event::NotificationStreamOpened { remote, engine_id, role } => {
if engine_id != this.engine_id {
Event::NotificationStreamOpened { remote, protocol, role } => {
if protocol != this.protocol {
continue;
}
this.state_machine.new_peer(&mut *this.network, remote, role);
}
Event::NotificationStreamClosed { remote, engine_id } => {
if engine_id != this.engine_id {
Event::NotificationStreamClosed { remote, protocol } => {
if protocol != this.protocol {
continue;
}
this.state_machine.peer_disconnected(&mut *this.network, remote);
},
Event::NotificationsReceived { remote, messages } => {
let messages = messages.into_iter().filter_map(|(engine, data)| {
if engine == this.engine_id {
if engine == this.protocol {
Some(data.to_vec())
} else {
None
Expand Down Expand Up @@ -299,6 +300,7 @@ mod tests {
use rand::Rng;
use sc_network::ObservedRole;
use sp_runtime::{testing::H256, traits::{Block as BlockT}};
use std::borrow::Cow;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use substrate_test_runtime_client::runtime::Block;
Expand Down Expand Up @@ -329,11 +331,11 @@ mod tests {
unimplemented!();
}

fn write_notification(&self, _: PeerId, _: ConsensusEngineId, _: Vec<u8>) {
fn write_notification(&self, _: PeerId, _: Cow<'static, str>, _: Vec<u8>) {
unimplemented!();
}

fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, str>) {}
fn register_notifications_protocol(&self, _: Cow<'static, str>) {}

fn announce(&self, _: B::Hash, _: Vec<u8>) {
unimplemented!();
Expand Down Expand Up @@ -361,8 +363,7 @@ mod tests {
let network = TestNetwork::default();
let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
[1, 2, 3, 4],
"my_protocol",
"/my_protocol",
Arc::new(AllowAll{}),
);

Expand All @@ -383,14 +384,13 @@ mod tests {
#[test]
fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
let topic = H256::default();
let engine_id = [1, 2, 3, 4];
let protocol = Cow::Borrowed("/my_protocol");
let remote_peer = PeerId::random();
let network = TestNetwork::default();

let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
engine_id.clone(),
"my_protocol",
protocol.clone(),
Arc::new(AllowAll{}),
);

Expand All @@ -404,7 +404,7 @@ mod tests {
event_sender.start_send(
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
engine_id: engine_id.clone(),
protocol: protocol.clone(),
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
Expand All @@ -413,7 +413,7 @@ mod tests {
let events = messages.iter().cloned().map(|m| {
Event::NotificationsReceived {
remote: remote_peer.clone(),
messages: vec![(engine_id, m.into())]
messages: vec![(protocol.clone(), m.into())]
}
}).collect::<Vec<_>>();

Expand Down Expand Up @@ -498,7 +498,7 @@ mod tests {
}

fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
let engine_id = [1, 2, 3, 4];
let protocol = Cow::Borrowed("/my_protocol");
let remote_peer = PeerId::random();
let network = TestNetwork::default();

Expand All @@ -524,8 +524,7 @@ mod tests {

let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(),
engine_id.clone(),
"my_protocol",
protocol.clone(),
Arc::new(TestValidator{}),
);

Expand Down Expand Up @@ -558,7 +557,7 @@ mod tests {
event_sender.start_send(
Event::NotificationStreamOpened {
remote: remote_peer.clone(),
engine_id: engine_id.clone(),
protocol: protocol.clone(),
role: ObservedRole::Authority,
}
).expect("Event stream is unbounded; qed.");
Expand All @@ -576,7 +575,7 @@ mod tests {
message.push(i_notification.try_into().unwrap());
message.push(i_message.try_into().unwrap());

(engine_id, message.into())
(protocol.clone(), message.into())
}).collect();

event_sender.start_send(Event::NotificationsReceived {
Expand Down
18 changes: 8 additions & 10 deletions client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//! - Implement the `Network` trait, representing the low-level networking primitives. It is
//! already implemented on `sc_network::NetworkService`.
//! - Implement the `Validator` trait. See the section below.
//! - Decide on a `ConsensusEngineId`. Each gossiping protocol should have a different one.
//! - Decide on a protocol name. Each gossiping protocol should have a different one.
//! - Build a `GossipEngine` using these three elements.
//! - Use the methods of the `GossipEngine` in order to send out messages and receive incoming
//! messages.
Expand All @@ -60,7 +60,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext

use futures::prelude::*;
use sc_network::{Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use sp_runtime::{traits::Block as BlockT};
ordian marked this conversation as resolved.
Show resolved Hide resolved
use std::{borrow::Cow, pin::Pin, sync::Arc};

mod bridge;
Expand All @@ -79,15 +79,14 @@ pub trait Network<B: BlockT> {
fn disconnect_peer(&self, who: PeerId);

/// Send a notification to a peer.
fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>);
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>);

/// Registers a notifications protocol.
///
/// See the documentation of [`NetworkService:register_notifications_protocol`] for more information.
fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, str>,
protocol: Cow<'static, str>,
);

/// Notify everyone we're connected to that we have the given block.
Expand All @@ -110,16 +109,15 @@ impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
NetworkService::disconnect_peer(self, who)
}

fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
NetworkService::write_notification(self, who, engine_id, message)
fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec<u8>) {
NetworkService::write_notification(self, who, protocol, message)
}

fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, str>,
protocol: Cow<'static, str>,
) {
NetworkService::register_notifications_protocol(self, engine_id, protocol_name)
NetworkService::register_notifications_protocol(self, protocol)
}

fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
Expand Down
Loading