Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use substrate codec for network messages #333

Merged
merged 3 commits into from
Jul 16, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,19 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
service::Role::LIGHT
service::Roles::LIGHT
} else if matches.is_present("light") {
info!("Starting (light)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Role::LIGHT
service::Roles::LIGHT
} else if matches.is_present("validator") || matches.is_present("dev") {
info!("Starting validator");
config.execution_strategy = service::ExecutionStrategy::Both;
service::Role::AUTHORITY
service::Roles::AUTHORITY
} else {
info!("Starting (heavy)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Role::FULL
service::Roles::FULL
};

if let Some(s) = matches.value_of("execution") {
Expand Down Expand Up @@ -303,7 +303,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
None
};

match role == service::Role::LIGHT {
match role == service::Roles::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}
Expand Down
3 changes: 0 additions & 3 deletions polkadot/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ authors = ["Parity Technologies <[email protected]>"]
description = "Polkadot-specific networking protocol"

[dependencies]
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
parking_lot = "0.4"
polkadot-api = { path = "../api" }
polkadot-consensus = { path = "../consensus" }
Expand Down
23 changes: 20 additions & 3 deletions polkadot/network/src/collator_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use polkadot_primitives::{AccountId, Hash};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use codec;

use futures::sync::oneshot;

Expand All @@ -27,12 +28,28 @@ use std::time::{Duration, Instant};
const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);

/// The role of the collator. Whether they're the primary or backup for this parachain.
#[derive(PartialEq, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Role {
/// Primary collators should send collations whenever it's time.
Primary,
Primary = 0,
/// Backup collators should not.
Backup,
Backup = 1,
}

impl codec::Encode for Role {
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
dest.push_byte(*self as u8);
}
}

impl codec::Decode for Role {
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
x if x == Role::Primary as u8 => Some(Role::Primary),
x if x == Role::Backup as u8 => Some(Role::Backup),
_ => None,
}
}
}

/// A maintenance action for the collator set.
Expand Down
3 changes: 2 additions & 1 deletion polkadot/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use polkadot_api::{PolkadotApi, LocalPolkadotApi};
use polkadot_consensus::{Network, SharedTable, Collators};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use codec::Decode;

use futures::prelude::*;
use futures::sync::mpsc;
Expand Down Expand Up @@ -175,7 +176,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> MessageProcessTask<P> {
}
}
ConsensusMessage::ChainSpecific(msg, _) => {
if let Ok(Message::Statement(parent_hash, statement)) = ::serde_json::from_slice(&msg) {
if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) {
self.table_router.import_statement(statement);
}
Expand Down
78 changes: 60 additions & 18 deletions polkadot/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
//! parachain block and extrinsic data fetching, communication between collators and validators,
//! and more.

extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;

extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_network;
Expand All @@ -47,7 +42,7 @@ mod collator_pool;
mod router;
pub mod consensus;

use codec::{Decode, Encode};
use codec::{Decode, Encode, Input, Output};
use futures::sync::oneshot;
use parking_lot::Mutex;
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
Expand Down Expand Up @@ -188,7 +183,6 @@ impl CurrentConsensus {
}

/// Polkadot-specific messages.
#[derive(Serialize, Deserialize)]
pub enum Message {
/// signed statement and localized parent hash.
Statement(Hash, SignedStatement),
Expand All @@ -205,8 +199,58 @@ pub enum Message {
Collation(Hash, Collation),
}

impl Encode for Message {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
Message::Statement(ref h, ref s) => {
dest.push_byte(0);
dest.push(h);
dest.push(s);
}
Message::SessionKey(ref h, ref k) => {
dest.push_byte(1);
dest.push(h);
dest.push(k);
}
Message::RequestBlockData(ref id, ref d) => {
dest.push_byte(2);
dest.push(id);
dest.push(d);
}
Message::BlockData(ref id, ref d) => {
dest.push_byte(3);
dest.push(id);
dest.push(d);
}
Message::CollatorRole(ref r) => {
dest.push_byte(4);
dest.push(r);
}
Message::Collation(ref h, ref c) => {
dest.push_byte(5);
dest.push(h);
dest.push(c);
}
}
}
}

impl Decode for Message {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)),
1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)),
2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)),
3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)),
4 => Some(Message::CollatorRole(Decode::decode(input)?)),
5 => Some(Message::Collation(Decode::decode(input)?, Decode::decode(input)?)),
_ => None,
}
}
}

fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
let encoded = ::serde_json::to_vec(&message).expect("serialization of messages infallible; qed");
let encoded = message.encode();
ctx.send_message(to, generic_message::Message::ChainSpecific(encoded))
}

Expand Down Expand Up @@ -244,9 +288,7 @@ impl PolkadotProtocol {
/// Send a statement to a validator.
fn send_statement(&mut self, ctx: &mut Context<Block>, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) {
// TODO: something more targeted than gossip.
let raw = ::serde_json::to_vec(&Message::Statement(parent_hash, statement))
.expect("message serialization infallible; qed");

let raw = Message::Statement(parent_hash, statement).encode();
self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash);
}

Expand Down Expand Up @@ -427,7 +469,7 @@ impl Specialization<Block> for PolkadotProtocol {
);
}

let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
let send_key = validator || local_status.collating_for.is_some();

self.peers.insert(peer_id, PeerInfo {
Expand All @@ -436,7 +478,7 @@ impl Specialization<Block> for PolkadotProtocol {
validator,
});

self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
self.consensus_gossip.new_peer(ctx, peer_id, status.roles);
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
send_polkadot_message(
ctx,
Expand Down Expand Up @@ -497,11 +539,11 @@ impl Specialization<Block> for PolkadotProtocol {
self.consensus_gossip.on_bft_message(ctx, peer_id, msg)
}
generic_message::Message::ChainSpecific(raw) => {
match serde_json::from_slice(&raw) {
Ok(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
Err(e) => {
trace!(target: "p_net", "Bad message from {}: {}", peer_id, e);
ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason");
match Message::decode(&mut raw.as_slice()) {
Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
None => {
trace!(target: "p_net", "Bad message from {}", peer_id);
ctx.disable_peer(peer_id, "Invalid polkadot protocol message format");
}
}
}
Expand Down
21 changes: 9 additions & 12 deletions polkadot/network/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
use substrate_primitives::H512;
use codec::Encode;
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage};
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};

use std::sync::Arc;
use futures::Future;
Expand Down Expand Up @@ -62,25 +62,22 @@ impl TestContext {
fn has_message(&self, to: PeerId, message: Message) -> bool {
use substrate_network::generic_message::Message as GenericMessage;

let encoded = ::serde_json::to_vec(&message).unwrap();
let encoded = message.encode();
self.messages.iter().any(|&(ref peer, ref msg)| match msg {
GenericMessage::ChainSpecific(ref data) => peer == &to && data == &encoded,
_ => false,
})
}
}

fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
roles,
best_number: 0,
best_hash: Default::default(),
genesis_hash: Default::default(),
chain_status: status.encode(),
parachain_id: None,
validator_id: None,
validator_signature: None,
}
}

Expand All @@ -97,7 +94,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus
}

fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) {
let encoded = ::serde_json::to_vec(&message).unwrap();
let encoded = message.encode();
protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded));
}

Expand All @@ -115,7 +112,7 @@ fn sends_session_key() {

{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, Roles::AUTHORITY));
assert!(ctx.messages.is_empty());
}

Expand All @@ -129,7 +126,7 @@ fn sends_session_key() {

{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![]));
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, Roles::NONE));
assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
}
}
Expand Down Expand Up @@ -171,7 +168,7 @@ fn fetches_from_those_with_knowledge() {
// connect peer A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY));
assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key)));
}

Expand All @@ -187,7 +184,7 @@ fn fetches_from_those_with_knowledge() {
// peer B connects and sends session key. request already assigned to A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY));
on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key));
assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash)));

Expand Down Expand Up @@ -220,7 +217,7 @@ fn remove_bad_collator() {

{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![]));
protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE));
}

{
Expand Down
Loading