diff --git a/availability-store/src/lib.rs b/availability-store/src/lib.rs
index 831a095be4d7..5e7960d141a6 100644
--- a/availability-store/src/lib.rs
+++ b/availability-store/src/lib.rs
@@ -14,13 +14,17 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
-//! Persistent database for parachain data.
+//! Persistent database for parachain data: PoV block data and outgoing messages.
+//!
+//! This will be written into during the block validation pipeline, and queried
+//! by networking code in order to circulate required data and maintain availability
+//! of it.
 
 use codec::{Encode, Decode};
 use kvdb::{KeyValueDB, DBTransaction};
 use kvdb_rocksdb::{Database, DatabaseConfig};
 use polkadot_primitives::Hash;
-use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic};
+use polkadot_primitives::parachain::{Id as ParaId, BlockData, Message};
 use log::warn;
 
 use std::collections::HashSet;
@@ -42,7 +46,7 @@ pub struct Config {
 	pub path: PathBuf,
 }
 
-/// Some data to keep available.
+/// Some data to keep available about a parachain block candidate.
 pub struct Data {
 	/// The relay chain parent hash this should be localized to.
 	pub relay_parent: Hash,
@@ -52,18 +56,16 @@ pub struct Data {
 	pub candidate_hash: Hash,
 	/// Block data.
 	pub block_data: BlockData,
-	/// Extrinsic data.
-	pub extrinsic: Option<Extrinsic>,
+	/// Outgoing message queues from execution of the block, if any.
+	///
+	/// The tuple pairs the message queue root and the queue data.
+	pub outgoing_queues: Option<Vec<(Hash, Vec<Message>)>>,
 }
 
 fn block_data_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
 	(relay_parent, candidate_hash, 0i8).encode()
 }
 
-fn extrinsic_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
-	(relay_parent, candidate_hash, 1i8).encode()
-}
-
 /// Handle to the availability store.
 #[derive(Clone)]
 pub struct Store {
@@ -96,6 +98,16 @@ impl Store {
 	}
 
 	/// Make some data available provisionally.
+	///
+	/// Validators with the responsibility of maintaining availability
+	/// for a block or collators collating a block will call this function
+	/// in order to persist that data to disk and so it can be queried and provided
+	/// to other nodes in the network.
+	///
+	/// The message data of `Data` is optional but is expected
+	/// to be present with the exception of the case where there is no message data
+	/// due to the block's invalidity. Determination of invalidity is beyond the
+	/// scope of this function.
 	pub fn make_available(&self, data: Data) -> io::Result<()> {
 		let mut tx = DBTransaction::new();
 
@@ -118,12 +130,16 @@ impl Store {
 			data.block_data.encode()
 		);
 
-		if let Some(extrinsic) = data.extrinsic {
-			tx.put_vec(
-				columns::DATA,
-				extrinsic_key(&data.relay_parent, &data.candidate_hash).as_slice(),
-				extrinsic.encode(),
-			);
+		if let Some(outgoing_queues) = data.outgoing_queues {
+			// This is kept forever and not pruned.
+			for (root, messages) in outgoing_queues {
+				tx.put_vec(
+					columns::DATA,
+					root.as_ref(),
+					messages.encode(),
+				);
+			}
 		}
 
 		self.inner.write(tx)
@@ -146,7 +162,6 @@ impl Store {
 		for candidate_hash in v {
 			if !finalized_candidates.contains(&candidate_hash) {
 				tx.delete(columns::DATA, block_data_key(&parent, &candidate_hash).as_slice());
-				tx.delete(columns::DATA, extrinsic_key(&parent, &candidate_hash).as_slice());
 			}
 		}
 
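// A sketch of the resulting key scheme (assuming only the `Encode` impls already
// used in the hunks above): block data stays keyed by the tuple-encoding of
// (relay_parent, candidate_hash, 0i8), with the `1i8` discriminant formerly used
// for extrinsic data retired, while each outgoing queue is keyed by its raw
// 32-byte queue root, making it addressable from every fork containing the queue.
use codec::Encode;
use polkadot_primitives::Hash;

fn availability_keys(relay_parent: &Hash, candidate_hash: &Hash, queue_root: &Hash) -> (Vec<u8>, Vec<u8>) {
	// 65 bytes: two 32-byte hashes plus a one-byte discriminant.
	let block_data_key = (relay_parent, candidate_hash, 0i8).encode();
	// 32 bytes: the queue root itself.
	let queue_key = queue_root.as_ref().to_vec();
	(block_data_key, queue_key)
}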
@@ -168,12 +183,11 @@ impl Store {
 		}
 	}
 
-	/// Query extrinsic data.
-	pub fn extrinsic(&self, relay_parent: Hash, candidate_hash: Hash) -> Option<Extrinsic> {
-		let encoded_key = extrinsic_key(&relay_parent, &candidate_hash);
-		match self.inner.get(columns::DATA, &encoded_key[..]) {
+	/// Query message queue data by message queue root hash.
+	pub fn queue_by_root(&self, queue_root: &Hash) -> Option<Vec<Message>> {
+		match self.inner.get(columns::DATA, queue_root.as_ref()) {
 			Ok(Some(raw)) => Some(
-				Extrinsic::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
+				<_>::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
 			),
 			Ok(None) => None,
 			Err(e) => {
@@ -207,7 +221,7 @@ mod tests {
 			parachain_id: para_id_1,
 			candidate_hash: candidate_1,
 			block_data: block_data_1.clone(),
-			extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
+			outgoing_queues: None,
 		}).unwrap();
 
 		store.make_available(Data {
@@ -215,21 +229,53 @@ mod tests {
 			parachain_id: para_id_2,
 			candidate_hash: candidate_2,
 			block_data: block_data_2.clone(),
-			extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
+			outgoing_queues: None,
 		}).unwrap();
 
 		assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
 		assert_eq!(store.block_data(relay_parent, candidate_2).unwrap(), block_data_2);
 
-		assert!(store.extrinsic(relay_parent, candidate_1).is_some());
-		assert!(store.extrinsic(relay_parent, candidate_2).is_some());
-
 		store.candidates_finalized(relay_parent, [candidate_1].iter().cloned().collect()).unwrap();
 
 		assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
 		assert!(store.block_data(relay_parent, candidate_2).is_none());
+	}
+
+	#[test]
+	fn queues_available_by_queue_root() {
+		let relay_parent = [1; 32].into();
+		let para_id = 5.into();
+		let candidate = [2; 32].into();
+		let block_data = BlockData(vec![1, 2, 3]);
+
+		let message_queue_root_1 = [0x42; 32].into();
+		let message_queue_root_2 = [0x43; 32].into();
 
-		assert!(store.extrinsic(relay_parent, candidate_1).is_some());
-		assert!(store.extrinsic(relay_parent, candidate_2).is_none());
+		let message_a = Message(vec![1, 2, 3, 4]);
+		let message_b = Message(vec![4, 5, 6, 7]);
+
+		let outgoing_queues = vec![
+			(message_queue_root_1, vec![message_a.clone()]),
+			(message_queue_root_2, vec![message_b.clone()]),
+		];
+
+		let store = Store::new_in_memory();
+		store.make_available(Data {
+			relay_parent,
+			parachain_id: para_id,
+			candidate_hash: candidate,
+			block_data: block_data.clone(),
+			outgoing_queues: Some(outgoing_queues),
+		}).unwrap();
+
+		assert_eq!(
+			store.queue_by_root(&message_queue_root_1),
+			Some(vec![message_a]),
+		);
+
+		assert_eq!(
+			store.queue_by_root(&message_queue_root_2),
+			Some(vec![message_b]),
+		);
 	}
 }
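// A usage sketch for the new queue storage (assuming the crate is linked as
// `availability_store`; `Store::new_in_memory` is the constructor the tests above
// use, while nodes persist to disk via `Store::new(config)`):
use availability_store::{Data, Store};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{BlockData, Message};

fn make_available_and_query(store: &Store) -> std::io::Result<()> {
	let queue_root: Hash = [0x42; 32].into();
	let messages = vec![Message(vec![1, 2, 3])];

	store.make_available(Data {
		relay_parent: [1; 32].into(),
		parachain_id: 100.into(),
		candidate_hash: [2; 32].into(),
		block_data: BlockData(vec![9, 9, 9]),
		outgoing_queues: Some(vec![(queue_root, messages.clone())]),
	})?;

	// Queues are addressed by root alone, with no relay parent required, and
	// unlike block data they survive `candidates_finalized` pruning.
	assert_eq!(store.queue_by_root(&queue_root), Some(messages));
	Ok(())
}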
diff --git a/collator/src/lib.rs b/collator/src/lib.rs
index f739f3060e0e..995f68f7557f 100644
--- a/collator/src/lib.rs
+++ b/collator/src/lib.rs
@@ -57,7 +57,7 @@ use primitives::Pair;
 use polkadot_primitives::{
 	BlockId, Hash, Block,
 	parachain::{
-		self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
+		self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, OutgoingMessages,
 		PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair,
 	}
 };
@@ -65,8 +65,8 @@ use polkadot_cli::{
 	Worker, IntoExit, ProvideRuntimeApi, TaskExecutor,
 	AbstractService, CustomConfiguration, ParachainHost,
 };
-use polkadot_network::validation::{SessionParams, ValidationNetwork};
-use polkadot_network::{NetworkService, PolkadotProtocol};
+use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
+use polkadot_network::{PolkadotNetworkService, PolkadotProtocol};
 use tokio::timer::Timeout;
 
 pub use polkadot_cli::VersionInfo;
@@ -91,7 +91,7 @@ pub trait Network: Send + Sync {
 	fn checked_statements(&self, relay_parent: Hash) -> Box<Stream<Item=SignedStatement, Error=()>>;
 }
 
-impl<P, E> Network for ValidationNetwork<P, E, NetworkService, TaskExecutor> where
+impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor> where
 	P: 'static + Send + Sync,
 	E: 'static + Send + Sync,
 {
@@ -142,7 +142,7 @@ pub trait BuildParachainContext {
 /// This can be implemented through an externally attached service or a stub.
 /// This is expected to be a lightweight, shared type like an Arc.
 pub trait ParachainContext: Clone {
-	type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, Extrinsic), Error=InvalidHead>;
+	type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, OutgoingMessages), Error=InvalidHead>;
 
 	/// Produce a candidate, given the relay parent hash, the latest ingress queue information
 	/// and the last parachain head.
@@ -177,7 +177,7 @@ pub fn collate<'a, R, P>(
 	para_context: P,
 	key: Arc<CollatorPair>,
 )
-	-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
+	-> impl Future<Item=(parachain::Collation, OutgoingMessages), Error=Error<R::Error>> + 'a
 	where
 		R: RelayChainContext,
 		R::Error: 'a,
@@ -197,11 +197,11 @@ pub fn collate<'a, R, P>(
 			.map(move |x| (ingress, x))
 			.map_err(Error::Collator)
 	})
-		.and_then(move |(ingress, (block_data, head_data, mut extrinsic))| {
+		.and_then(move |(ingress, (block_data, head_data, mut outgoing))| {
 			let block_data_hash = block_data.hash();
 			let signature = key.sign(block_data_hash.as_ref()).into();
 			let egress_queue_roots =
-				polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
+				polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);
 
 			let receipt = parachain::CandidateReceipt {
 				parachain_index: local_id,
@@ -214,19 +214,21 @@ pub fn collate<'a, R, P>(
 				upward_messages: Vec::new(),
 			};
 
-			Ok(parachain::Collation {
+			let collation = parachain::Collation {
 				receipt,
 				pov: PoVBlock {
 					block_data,
 					ingress,
 				},
-			})
+			};
+
+			Ok((collation, outgoing))
 		})
 }
 
 /// Polkadot-api context.
struct ApiContext { - network: Arc>, + network: Arc>, parent_hash: Hash, validators: Vec, } @@ -243,7 +245,7 @@ impl RelayChainContext for ApiContext where // TODO: https://github.com/paritytech/polkadot/issues/253 // // Fetch ingress and accumulate all unrounted egress - let _session = self.network.instantiate_session(SessionParams { + let _session = self.network.instantiate_leaf_work(LeafWorkParams { local_session_key: None, parent_hash: self.parent_hash, authorities: self.validators.clone(), @@ -303,26 +305,28 @@ impl Worker for CollationNode where return Box::new(future::err(())); }; + let is_known = move |block_hash: &Hash| { + use client::BlockStatus; + use polkadot_network::gossip::Known; + + match known_oracle.block_status(&BlockId::hash(*block_hash)) { + Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, + Ok(BlockStatus::KnownBad) => Some(Known::Bad), + Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => + match select_chain.leaves() { + Err(_) => None, + Ok(leaves) => if leaves.contains(block_hash) { + Some(Known::Leaf) + } else { + Some(Known::Old) + }, + } + } + }; + let message_validator = polkadot_network::gossip::register_validator( network.clone(), - move |block_hash: &Hash| { - use client::BlockStatus; - use polkadot_network::gossip::Known; - - match known_oracle.block_status(&BlockId::hash(*block_hash)) { - Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, - Ok(BlockStatus::KnownBad) => Some(Known::Bad), - Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => - match select_chain.leaves() { - Err(_) => None, - Ok(leaves) => if leaves.contains(block_hash) { - Some(Known::Leaf) - } else { - Some(Known::Old) - }, - } - } - }, + (is_known, client.clone()), ); let validation_network = Arc::new(ValidationNetwork::new( @@ -386,13 +390,20 @@ impl Worker for CollationNode where context, parachain_context, key, - ).map(move |collation| { - network.with_spec(move |spec, ctx| spec.add_local_collation( - ctx, - relay_parent, - targets, - collation, - )); + ).map(move |(collation, outgoing)| { + network.with_spec(move |spec, ctx| { + let res = spec.add_local_collation( + ctx, + relay_parent, + targets, + collation, + outgoing, + ); + + if let Err(e) = res { + warn!("Unable to broadcast local collation: {:?}", e); + } + }) }); future::Either::B(collation_work) @@ -450,7 +461,7 @@ pub fn run_collator( #[cfg(test)] mod tests { use std::collections::HashMap; - use polkadot_primitives::parachain::{OutgoingMessage, FeeSchedule}; + use polkadot_primitives::parachain::{TargetedMessage, FeeSchedule}; use keyring::Sr25519Keyring; use super::*; @@ -475,20 +486,20 @@ mod tests { struct DummyParachainContext; impl ParachainContext for DummyParachainContext { - type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>; + type ProduceCandidate = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>; fn produce_candidate>( &self, _relay_parent: Hash, _status: ParachainStatus, ingress: I, - ) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> { + ) -> Result<(BlockData, HeadData, OutgoingMessages), InvalidHead> { // send messages right back. 
 			Ok((
 				BlockData(vec![1, 2, 3, 4, 5,]),
 				HeadData(vec![9, 9, 9]),
-				Extrinsic {
-					outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
+				OutgoingMessages {
+					outgoing_messages: ingress.into_iter().map(|(id, msg)| TargetedMessage {
 						target: id,
 						data: msg.0,
 					}).collect(),
@@ -542,7 +553,7 @@ mod tests {
 			context.clone(),
 			DummyParachainContext,
 			Arc::new(Sr25519Keyring::Alice.pair().into()),
-		).wait().unwrap();
+		).wait().unwrap().0;
 
 		// ascending order by root.
 		assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
diff --git a/erasure-coding/src/lib.rs b/erasure-coding/src/lib.rs
index 6460c45794e9..cd89bdfc7491 100644
--- a/erasure-coding/src/lib.rs
+++ b/erasure-coding/src/lib.rs
@@ -27,7 +27,7 @@ use codec::{Encode, Decode};
 use reed_solomon::galois_16::{self, ReedSolomon};
 use primitives::{Hash as H256, BlakeTwo256, HashT};
-use primitives::parachain::{BlockData, Extrinsic};
+use primitives::parachain::{BlockData, OutgoingMessages};
 use substrate_primitives::Blake2Hasher;
 use trie::{EMPTY_PREFIX, MemoryDB, Trie, TrieMut, trie_types::{TrieDBMut, TrieDB}};
@@ -124,11 +124,11 @@ fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
 /// Obtain erasure-coded chunks, one for each validator.
 ///
 /// Works only up to 65536 validators, and `n_validators` must be non-zero.
-pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, extrinsic: &Extrinsic)
+pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, outgoing: &OutgoingMessages)
 	-> Result<Vec<Vec<u8>>, Error>
 {
 	let params = code_params(n_validators)?;
-	let encoded = (block_data, extrinsic).encode();
+	let encoded = (block_data, outgoing).encode();
 
 	if encoded.is_empty() {
 		return Err(Error::BadPayload);
@@ -150,7 +150,7 @@ pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, extrinsic: &Ex
 ///
 /// Works only up to 65536 validators, and `n_validators` must be non-zero.
 pub fn reconstruct<'a, I: 'a>(n_validators: usize, chunks: I)
-	-> Result<(BlockData, Extrinsic), Error>
+	-> Result<(BlockData, OutgoingMessages), Error>
 	where I: IntoIterator<Item=(&'a [u8], usize)>
 {
@@ -399,7 +399,7 @@ mod tests {
 	#[test]
 	fn round_trip_block_data() {
 		let block_data = BlockData((0..255).collect());
-		let ex = Extrinsic { outgoing_messages: Vec::new() };
+		let ex = OutgoingMessages { outgoing_messages: Vec::new() };
 		let chunks = obtain_chunks(
 			10,
 			&block_data,
@@ -428,7 +428,7 @@
 		let chunks = obtain_chunks(
 			10,
 			&block_data,
-			&Extrinsic { outgoing_messages: Vec::new() },
+			&OutgoingMessages { outgoing_messages: Vec::new() },
 		).unwrap();
 
 		let chunks: Vec<_> = chunks.iter().map(|c| &c[..]).collect();
diff --git a/network/Cargo.toml b/network/Cargo.toml
index 9791a97c23f7..2675f73842c9 100644
--- a/network/Cargo.toml
+++ b/network/Cargo.toml
@@ -18,7 +18,7 @@ sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "pol
 futures = "0.1"
 log = "0.4"
 exit-future = "0.1.4"
+substrate-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
 
 [dev-dependencies]
-substrate-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
 substrate-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
diff --git a/network/src/gossip.rs b/network/src/gossip.rs
index 84caa6eaeff5..1f6c5382c55e 100644
--- a/network/src/gossip.rs
+++ b/network/src/gossip.rs
@@ -14,37 +14,85 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
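// A round-trip sketch for the erasure-coding API changed above (assuming the crate
// is linked as `polkadot_erasure_coding`; chunk indices are validator indices):
use polkadot_erasure_coding::{obtain_chunks, reconstruct};
use polkadot_primitives::parachain::{BlockData, OutgoingMessages};

fn erasure_round_trip() {
	let block_data = BlockData((0..255).collect());
	let outgoing = OutgoingMessages { outgoing_messages: Vec::new() };

	// One chunk per validator. Feeding all of them back is the trivial case;
	// reconstruction also succeeds with roughly a third of the chunks missing.
	let chunks = obtain_chunks(10, &block_data, &outgoing).unwrap();
	let (data, messages) = reconstruct(
		10,
		chunks.iter().enumerate().map(|(i, c)| (&c[..], i)),
	).unwrap();

	assert_eq!(data, block_data);
	assert!(messages.outgoing_messages.is_empty());
}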
-//! Gossip messages and the message validator
-
+//! Gossip messages and the message validator.
+//!
+//! At the moment, this module houses two gossip protocols central to Polkadot.
+//!
+//! The first is the attestation-gossip system, which aims to circulate parachain
+//! candidate attestations by validators at leaves of the block-DAG.
+//!
+//! The second is the inter-chain message queue routing gossip, which aims to
+//! circulate message queues between parachains, which remain unrouted as of
+//! recent leaves.
+//!
+//! These gossip systems do not have any form of sybil-resistance in terms
+//! of the nodes which can participate. It could be imposed, e.g., by limiting
+//! participation to validators, but this would prevent message queues from getting
+//! into the hands of collators and attestations from getting into the hands of
+//! fishermen. As such, we take certain precautions which allow arbitrary full nodes
+//! to join the gossip graph, as well as validators (who are likely to be well-connected
+//! amongst themselves).
+//!
+//! The first is the notion of a neighbor packet. This is a packet sent between
+//! neighbors of the gossip graph to inform each other of their current protocol
+//! state. As of this writing, for both attestation and message-routing gossip,
+//! the only necessary information here is a (length-limited) set of perceived
+//! leaves of the block-DAG.
+//!
+//! These leaves can be used to derive what information a node is willing to accept.
+//! There is typically an unbounded amount of possible "future" information relative to
+//! any protocol state. For example, attestations or unrouted message queues from millions
+//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
+//! spammed by illegitimate future information, while informing neighbors of when
+//! previously-future and now-current gossip messages would be accepted.
+//!
+//! Peers who send information which was not allowed under a recent neighbor packet
+//! will be noted as non-beneficial to Substrate's peer-set management utility.
+
+use sr_primitives::{generic::BlockId, traits::ProvideRuntimeApi};
+use substrate_client::error::Error as ClientError;
 use substrate_network::{config::Roles, PeerId};
 use substrate_network::consensus_gossip::{
 	self as network_gossip, ValidationResult as GossipValidationResult,
 	ValidatorContext, MessageIntent, ConsensusMessage,
 };
-use polkadot_validation::{GenericStatement, SignedStatement};
-use polkadot_primitives::{Block, Hash, parachain::{ValidatorIndex, ValidatorId}};
+use polkadot_validation::SignedStatement;
+use polkadot_primitives::{Block, Hash};
+use polkadot_primitives::parachain::{ParachainHost, ValidatorId, Message as ParachainMessage};
 use codec::{Decode, Encode};
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::sync::Arc;
 
+use arrayvec::ArrayVec;
 use parking_lot::RwLock;
 use log::warn;
 
-use super::NetworkService;
+use super::PolkadotNetworkService;
 use crate::router::attestation_topic;
 
+use attestation::{View as AttestationView, PeerData as AttestationPeerData};
+use message_routing::{View as MessageRoutingView};
+
+mod attestation;
+mod message_routing;
+
 /// The engine ID of the polkadot attestation system.
-pub const POLKADOT_ENGINE_ID: sr_primitives::ConsensusEngineId = [b'd', b'o', b't', b'1'];
+pub const POLKADOT_ENGINE_ID: sr_primitives::ConsensusEngineId = *b"dot1";
 
 // arbitrary; in practice this should not be more than 2.
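+// A packet advertising more than MAX_CHAIN_HEADS heads is judged malicious and
+// punished with `cost::BAD_NEIGHBOR_PACKET` when validated below.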
-const MAX_CHAIN_HEADS: usize = 5; +pub(crate) const MAX_CHAIN_HEADS: usize = 5; + +/// Type alias for a bounded vector of leaves. +pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>; mod benefit { /// When a peer sends us a previously-unknown candidate statement. pub const NEW_CANDIDATE: i32 = 100; /// When a peer sends us a previously-unknown attestation. pub const NEW_ATTESTATION: i32 = 50; + /// When a peer sends us a previously-unknown message packet. + pub const NEW_ICMP_MESSAGES: i32 = 50; } mod cost { @@ -60,6 +108,15 @@ mod cost { pub const BAD_SIGNATURE: i32 = -500; /// A peer sent us a bad neighbor packet. pub const BAD_NEIGHBOR_PACKET: i32 = -300; + /// A peer sent us an ICMP queue we haven't advertised a need for. + pub const UNNEEDED_ICMP_MESSAGES: i32 = -100; + + /// A peer sent us an ICMP queue with a bad root. + pub fn icmp_messages_root_mismatch(n_messages: usize) -> i32 { + const PER_MESSAGE: i32 = -150; + + (0..n_messages).map(|_| PER_MESSAGE).sum() + } } /// A gossip message. @@ -72,35 +129,84 @@ pub enum GossipMessage { /// Non-candidate statements should only be sent to peers who are aware of the candidate. #[codec(index = "2")] Statement(GossipStatement), + /// A packet of messages from one parachain to another. + #[codec(index = "3")] + ParachainMessages(GossipParachainMessages), // TODO: https://github.com/paritytech/polkadot/issues/253 // erasure-coded chunks. } +impl GossipMessage { + fn to_consensus_message(&self) -> ConsensusMessage { + ConsensusMessage { + data: self.encode(), + engine_id: POLKADOT_ENGINE_ID, + } + } +} + +impl From for GossipMessage { + fn from(packet: NeighborPacket) -> Self { + GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet)) + } +} + impl From for GossipMessage { fn from(stmt: GossipStatement) -> Self { GossipMessage::Statement(stmt) } } +impl From for GossipMessage { + fn from(messages: GossipParachainMessages) -> Self { + GossipMessage::ParachainMessages(messages) + } +} + /// A gossip message containing a statement. #[derive(Encode, Decode, Clone)] pub struct GossipStatement { - /// The relay chain parent hash. - pub relay_parent: Hash, + /// The block hash of the relay chain being referred to. In context, this should + /// be a leaf. + pub relay_chain_leaf: Hash, /// The signed statement being gossipped. pub signed_statement: SignedStatement, } impl GossipStatement { /// Create a new instance. - pub fn new(relay_parent: Hash, signed_statement: SignedStatement) -> Self { + pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self { Self { - relay_parent, + relay_chain_leaf, signed_statement, } } } +/// A packet of messages from one parachain to another. +/// +/// These are all the messages posted from one parachain to another during the +/// execution of a single parachain block. Since this parachain block may have been +/// included in many forks of the relay chain, there is no relay-chain leaf parameter. +#[derive(Encode, Decode, Clone)] +pub struct GossipParachainMessages { + /// The root of the message queue. + pub queue_root: Hash, + /// The messages themselves. + pub messages: Vec, +} + +impl GossipParachainMessages { + // confirms that the queue-root in the struct correctly matches + // the messages. + fn queue_root_is_correct(&self) -> bool { + let root = polkadot_validation::message_queue_root( + self.messages.iter().map(|m| &m.0) + ); + root == self.queue_root + } +} + /// A versioned neighbor message. 
#[derive(Encode, Decode, Clone)] pub enum VersionedNeighborPacket { @@ -126,28 +232,60 @@ pub enum Known { Bad, } -/// An oracle for known blocks. -pub trait KnownOracle: Send + Sync { +/// Context to the underlying polkadot chain. +pub trait ChainContext: Send + Sync { + /// Provide a closure which is invoked for every unrouted queue hash at a given leaf. + fn leaf_unrouted_roots( + &self, + leaf: &Hash, + with_queue_root: &mut dyn FnMut(&Hash), + ) -> Result<(), ClientError>; + /// whether a block is known. If it's not, returns `None`. fn is_known(&self, block_hash: &Hash) -> Option; } -impl KnownOracle for F where F: Fn(&Hash) -> Option + Send + Sync { +impl ChainContext for (F, P) where + F: Fn(&Hash) -> Option + Send + Sync, + P: Send + Sync + std::ops::Deref, + P::Target: ProvideRuntimeApi, + ::Api: ParachainHost, +{ fn is_known(&self, block_hash: &Hash) -> Option { - (self)(block_hash) + (self.0)(block_hash) + } + + fn leaf_unrouted_roots( + &self, + &leaf: &Hash, + with_queue_root: &mut dyn FnMut(&Hash), + ) -> Result<(), ClientError> { + let api = self.1.runtime_api(); + + let leaf_id = BlockId::Hash(leaf); + let active_parachains = api.active_parachains(&leaf_id)?; + + for para_id in active_parachains { + if let Some(ingress) = api.ingress(&leaf_id, para_id, None)? { + for (_height, _from, queue_root) in ingress.iter() { + with_queue_root(queue_root); + } + } + } + + Ok(()) } } /// Register a gossip validator on the network service. -/// -/// This returns a `RegisteredMessageValidator` // NOTE: since RegisteredMessageValidator is meant to be a type-safe proof // that we've actually done the registration, this should be the only way // to construct it outside of tests. -pub fn register_validator( - service: Arc, - oracle: O, -) -> RegisteredMessageValidator { +pub fn register_validator( + service: Arc, + chain: C, +) -> RegisteredMessageValidator +{ let s = service.clone(); let report_handle = Box::new(move |peer: &PeerId, cost_benefit| { s.report_peer(peer.clone(), cost_benefit); @@ -156,8 +294,9 @@ pub fn register_validator( report_handle, inner: RwLock::new(Inner { peers: HashMap::new(), - our_view: Default::default(), - oracle, + attestation_view: Default::default(), + message_routing_view: Default::default(), + chain, }) }); @@ -169,8 +308,42 @@ pub fn register_validator( RegisteredMessageValidator { inner: validator as _ } } +#[derive(PartialEq)] +enum NewLeafAction { + // (who, message) + TargetedMessage(PeerId, ConsensusMessage), + // (topic, message) + Multicast(Hash, ConsensusMessage), +} + +/// Actions to take after noting a new block-DAG leaf. +/// +/// This should be consumed by passing a consensus-gossip handle to `perform`. +#[must_use = "New chain-head gossip actions must be performed"] +pub struct NewLeafActions { + actions: Vec, +} + +impl NewLeafActions { + /// Perform the queued actions, feeding into gossip. + pub fn perform( + self, + gossip: &mut dyn crate::GossipService, + ctx: &mut dyn substrate_network::Context, + ) { + for action in self.actions { + match action { + NewLeafAction::TargetedMessage(who, message) + => gossip.send_message(ctx, &who, message), + NewLeafAction::Multicast(topic, message) + => gossip.multicast(ctx, &topic, message), + } + } + } +} + /// Register a gossip validator for a non-authority node. 
-pub fn register_non_authority_validator(service: Arc) { +pub fn register_non_authority_validator(service: Arc) { service.with_gossip(|gossip, ctx| gossip.register_validator( ctx, @@ -184,58 +357,92 @@ pub fn register_non_authority_validator(service: Arc) { /// Create this using `register_validator`. #[derive(Clone)] pub struct RegisteredMessageValidator { - inner: Arc>, + inner: Arc>, } impl RegisteredMessageValidator { #[cfg(test)] - pub(crate) fn new_test( - oracle: O, + pub(crate) fn new_test( + chain: C, report_handle: Box, ) -> Self { - let validator = Arc::new(MessageValidator::new_test(oracle, report_handle)); + let validator = Arc::new(MessageValidator::new_test(chain, report_handle)); RegisteredMessageValidator { inner: validator as _ } } - /// Note a live attestation session. This must be removed later with - /// `remove_session`. - pub(crate) fn note_session( + /// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that + /// we now accept parachain candidate attestations and incoming message queues + /// relevant to this leaf. + pub(crate) fn new_local_leaf( &self, - relay_parent: Hash, + relay_chain_leaf: Hash, validation: MessageValidationData, - send_neighbor_packet: F, - ) { - // add an entry in our_view - // prune any entries from our_view which are no longer leaves + lookup_queue_by_root: impl Fn(&Hash) -> Option>, + ) -> NewLeafActions { + // add an entry in attestation_view + // prune any entries from attestation_view which are no longer leaves let mut inner = self.inner.inner.write(); - inner.our_view.add_session(relay_parent, validation); - { + inner.attestation_view.new_local_leaf(relay_chain_leaf, validation); - let &mut Inner { ref oracle, ref mut our_view, .. } = &mut *inner; - our_view.prune_old_sessions(|parent| match oracle.is_known(parent) { + let mut actions = Vec::new(); + + { + let &mut Inner { + ref chain, + ref mut attestation_view, + ref mut message_routing_view, + .. + } = &mut *inner; + + attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) { Some(Known::Leaf) => true, _ => false, }); + + if let Err(e) = message_routing_view.update_leaves(chain, attestation_view.neighbor_info()) { + warn!("Unable to fully update leaf-state: {:?}", e); + } } + // send neighbor packets to peers - inner.multicast_neighbor_packet(send_neighbor_packet); + inner.multicast_neighbor_packet( + |who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message)) + ); + + // feed any new unrouted queues into the propagation pool. + inner.message_routing_view.sweep_unknown_queues(|topic, queue_root| + match lookup_queue_by_root(queue_root) { + Some(messages) => { + let message = GossipMessage::from(GossipParachainMessages { + queue_root: *queue_root, + messages, + }).to_consensus_message(); + + actions.push(NewLeafAction::Multicast(*topic, message)); + + true + } + None => false, + } + ); + + NewLeafActions { actions } } } -/// The data needed for validating gossip. +/// The data needed for validating gossip messages. #[derive(Default)] pub(crate) struct MessageValidationData { - /// The authorities at a block. + /// The authorities' parachain validation keys at a block. pub(crate) authorities: Vec, - /// Mapping from validator index to `ValidatorId`. - pub(crate) index_mapping: HashMap, } impl MessageValidationData { - fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> Result<(), ()> { - let sender = match self.index_mapping.get(&statement.sender) { + // check a statement's signature. 
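+	// the sender is identified by index into `authorities` (replacing the old
+	// `index_mapping` lookup); an out-of-range index fails the check.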
+ fn check_statement(&self, relay_chain_leaf: &Hash, statement: &SignedStatement) -> Result<(), ()> { + let sender = match self.authorities.get(statement.sender as usize) { Some(val) => val, None => return Err(()), }; @@ -245,7 +452,7 @@ impl MessageValidationData { &statement.statement, &statement.signature, sender.clone(), - relay_parent, + relay_chain_leaf, ); if good { @@ -256,157 +463,25 @@ impl MessageValidationData { } } -// knowledge about attestations on a single parent-hash. #[derive(Default)] -struct Knowledge { - candidates: HashSet, -} - -impl Knowledge { - // whether the peer is aware of a candidate with given hash. - fn is_aware_of(&self, candidate_hash: &Hash) -> bool { - self.candidates.contains(candidate_hash) - } - - // note that the peer is aware of a candidate with given hash. - fn note_aware(&mut self, candidate_hash: Hash) { - self.candidates.insert(candidate_hash); - } -} - struct PeerData { - live: HashMap, + attestation: AttestationPeerData, } impl PeerData { - fn knowledge_at_mut(&mut self, parent_hash: &Hash) -> Option<&mut Knowledge> { - self.live.get_mut(parent_hash) + fn leaves(&self) -> impl Iterator { + self.attestation.leaves() } } -struct OurView { - live_sessions: Vec<(Hash, SessionView)>, - topics: HashMap, // maps topic hashes to block hashes. -} - -impl Default for OurView { - fn default() -> Self { - OurView { - live_sessions: Vec::with_capacity(MAX_CHAIN_HEADS), - topics: Default::default(), - } - } -} - -impl OurView { - fn session_view(&self, relay_parent: &Hash) -> Option<&SessionView> { - self.live_sessions.iter() - .find_map(|&(ref h, ref sesh)| if h == relay_parent { Some(sesh) } else { None } ) - } - - fn session_view_mut(&mut self, relay_parent: &Hash) -> Option<&mut SessionView> { - self.live_sessions.iter_mut() - .find_map(|&mut (ref h, ref mut sesh)| if h == relay_parent { Some(sesh) } else { None } ) - } - - fn add_session(&mut self, relay_parent: Hash, validation_data: MessageValidationData) { - self.live_sessions.push(( - relay_parent, - SessionView { - validation_data, - knowledge: Default::default(), - }, - )); - self.topics.insert(attestation_topic(relay_parent), relay_parent); - } - - fn prune_old_sessions bool>(&mut self, is_leaf: F) { - let live_sessions = &mut self.live_sessions; - live_sessions.retain(|&(ref relay_parent, _)| is_leaf(relay_parent)); - self.topics.retain(|_, v| live_sessions.iter().find(|(p, _)| p == v).is_some()); - } - - fn knows_topic(&self, topic: &Hash) -> bool { - self.topics.contains_key(topic) - } - - fn topic_block(&self, topic: &Hash) -> Option<&Hash> { - self.topics.get(topic) - } - - fn neighbor_info(&self) -> Vec { - self.live_sessions.iter().take(MAX_CHAIN_HEADS).map(|(p, _)| p.clone()).collect() - } -} - -struct SessionView { - validation_data: MessageValidationData, - knowledge: Knowledge, -} - -struct Inner { +struct Inner { peers: HashMap, - our_view: OurView, - oracle: O, + attestation_view: AttestationView, + message_routing_view: MessageRoutingView, + chain: C, } -impl Inner { - fn validate_statement(&mut self, message: GossipStatement) - -> (GossipValidationResult, i32) - { - // message must reference one of our chain heads and one - // if message is not a `Candidate` we should have the candidate available - // in `our_view`. 
- match self.our_view.session_view(&message.relay_parent) { - None => { - let cost = match self.oracle.is_known(&message.relay_parent) { - Some(Known::Leaf) => { - warn!( - target: "network", - "Leaf block {} not considered live for attestation", - message.relay_parent, - ); - - 0 - } - Some(Known::Old) => cost::PAST_MESSAGE, - _ => cost::FUTURE_MESSAGE, - }; - - (GossipValidationResult::Discard, cost) - } - Some(view) => { - // first check that we are capable of receiving this message - // in a DoS-proof manner. - let benefit = match message.signed_statement.statement { - GenericStatement::Candidate(_) => benefit::NEW_CANDIDATE, - GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => { - if !view.knowledge.is_aware_of(h) { - let cost = cost::ATTESTATION_NO_CANDIDATE; - return (GossipValidationResult::Discard, cost); - } - - benefit::NEW_ATTESTATION - } - }; - - // validate signature. - let res = view.validation_data.check_statement( - &message.relay_parent, - &message.signed_statement, - ); - - match res { - Ok(()) => { - let topic = attestation_topic(message.relay_parent); - (GossipValidationResult::ProcessAndKeep(topic), benefit) - } - Err(()) => (GossipValidationResult::Discard, cost::BAD_SIGNATURE), - } - } - } - } - +impl Inner { fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket) -> (GossipValidationResult, i32, Vec) { @@ -414,16 +489,20 @@ impl Inner { if chain_heads.len() > MAX_CHAIN_HEADS { (GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new()) } else { - let mut new_topics = Vec::new(); - if let Some(ref mut peer) = self.peers.get_mut(sender) { - peer.live.retain(|k, _| chain_heads.contains(k)); - for head in chain_heads { - peer.live.entry(head).or_insert_with(|| { - new_topics.push(attestation_topic(head)); - Default::default() - }); - } - } + let chain_heads: LeavesVec = chain_heads.into_iter().collect(); + let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) { + let new_leaves = peer.attestation.update_leaves(&chain_heads); + let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic); + + // find all topics which are from the intersection of our leaves with the peer's + // new leaves. + let new_message_routing_topics = self.message_routing_view.intersection_topics(&new_leaves); + + new_attestation_topics.chain(new_message_routing_topics).collect() + } else { + Vec::new() + }; + (GossipValidationResult::Discard, 0, new_topics) } } @@ -432,40 +511,36 @@ impl Inner { &self, mut send_neighbor_packet: F, ) { - let neighbor_packet = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket { - chain_heads: self.our_view.neighbor_info() - })); - - let message = ConsensusMessage { - data: neighbor_packet.encode(), - engine_id: POLKADOT_ENGINE_ID, - }; + let neighbor_packet = GossipMessage::from(NeighborPacket { + chain_heads: self.attestation_view.neighbor_info().collect(), + }).to_consensus_message(); for peer in self.peers.keys() { - send_neighbor_packet(peer, message.clone()) + send_neighbor_packet(peer, neighbor_packet.clone()) } } } /// An unregistered message validator. Register this with `register_validator`. 
-pub struct MessageValidator<O: ?Sized> {
+pub struct MessageValidator<C: ?Sized> {
 	report_handle: Box<dyn FnMut(&PeerId, i32) + Send>,
-	inner: RwLock<Inner<O>>,
+	inner: RwLock<Inner<C>>,
 }
 
-impl<O: ?Sized> MessageValidator<O> {
+impl<C: ?Sized> MessageValidator<C> {
 	#[cfg(test)]
 	fn new_test(
-		oracle: O,
+		chain: C,
 		report_handle: Box<dyn FnMut(&PeerId, i32) + Send>,
-	) -> Self where O: Sized {
+	) -> Self where C: Sized {
 		MessageValidator {
 			report_handle,
 			inner: RwLock::new(Inner {
 				peers: HashMap::new(),
-				our_view: Default::default(),
-				oracle,
-			})
+				attestation_view: Default::default(),
+				message_routing_view: Default::default(),
+				chain,
+			}),
 		}
 	}
 
@@ -474,12 +549,10 @@ impl<O: ?Sized> MessageValidator<O> {
 	}
 }
 
-impl<O: KnownOracle + ?Sized> network_gossip::Validator<Block> for MessageValidator<O> {
+impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValidator<C> {
 	fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
 		let mut inner = self.inner.write();
-		inner.peers.insert(who.clone(), PeerData {
-			live: HashMap::new(),
-		});
+		inner.peers.insert(who.clone(), PeerData::default());
 	}
 
 	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
@@ -487,10 +560,11 @@ impl<O: KnownOracle + ?Sized> network_gossip::Validator<Block> for MessageValida
 		let mut inner = self.inner.write();
 		inner.peers.remove(who);
 	}
 
-	fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, mut data: &[u8])
+	fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, data: &[u8])
 		-> GossipValidationResult<Hash>
 	{
-		let (res, cost_benefit) = match GossipMessage::decode(&mut data) {
+		let mut decode_data = data;
+		let (res, cost_benefit) = match GossipMessage::decode(&mut decode_data) {
 			Err(_) => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
 			Ok(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
 				let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
@@ -500,7 +574,24 @@ impl<O: KnownOracle + ?Sized> network_gossip::Validator<Block> for MessageValida
 				(res, cb)
 			}
 			Ok(GossipMessage::Statement(statement)) => {
-				let (res, cb) = self.inner.write().validate_statement(statement);
+				let (res, cb) = {
+					let mut inner = self.inner.write();
+					let inner = &mut *inner;
+					inner.attestation_view.validate_statement_signature(statement, &inner.chain)
+				};
+
+				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
+					context.broadcast_message(topic.clone(), data.to_vec(), false);
+				}
+				(res, cb)
+			}
+			Ok(GossipMessage::ParachainMessages(messages)) => {
+				let (res, cb) = {
+					let mut inner = self.inner.write();
+					let inner = &mut *inner;
+					inner.message_routing_view.validate_queue_and_note_known(&messages)
+				};
+
 				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
 					context.broadcast_message(topic.clone(), data.to_vec(), false);
 				}
 				(res, cb)
@@ -516,61 +607,56 @@ impl<O: KnownOracle + ?Sized> network_gossip::Validator<Block> for MessageValida
 		let inner = self.inner.read();
 
 		Box::new(move |topic, _data| {
-			// check that topic is one of our live sessions. everything else is expired
-			!inner.our_view.knows_topic(&topic)
+			// check that messages from this topic are considered live by one of our protocols.
+			// everything else is expired
+			let live = inner.attestation_view.is_topic_live(&topic)
+				|| inner.message_routing_view.is_topic_live(&topic);
+
+			!live // = expired
 		})
 	}
 
 	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
 		let mut inner = self.inner.write();
 		Box::new(move |who, intent, topic, data| {
-			let &mut Inner { ref mut peers, ref mut our_view, .. } = &mut *inner;
+			let &mut Inner {
+				ref mut peers,
+				ref mut attestation_view,
+				ref mut message_routing_view,
+				..
+ } = &mut *inner; match intent { MessageIntent::PeriodicRebroadcast => return false, _ => {}, } - let relay_parent = match our_view.topic_block(topic) { - None => return false, - Some(hash) => hash.clone(), - }; - - // check that topic is one of our peers' live sessions. - let peer_knowledge = match peers.get_mut(who) - .and_then(|p| p.knowledge_at_mut(&relay_parent)) - { - Some(p) => p, - None => return false, - }; + let attestation_head = attestation_view.topic_block(topic).map(|x| x.clone()); + let peer = peers.get_mut(who); match GossipMessage::decode(&mut &data[..]) { - Ok(GossipMessage::Statement(statement)) => { - let signed = statement.signed_statement; - - match signed.statement { - GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => { - // `valid` and `invalid` statements can only be propagated after - // a candidate message is known by that peer. - if !peer_knowledge.is_aware_of(h) { - return false; - } - } - GenericStatement::Candidate(ref c) => { - // if we are sending a `Candidate` message we should make sure that - // our_view and their_view reflects that we know about the candidate. - let hash = c.hash(); - peer_knowledge.note_aware(hash); - if let Some(our_view) = our_view.session_view_mut(&relay_parent) { - our_view.knowledge.note_aware(hash); - } - } + Ok(GossipMessage::Statement(ref statement)) => { + // to allow statements, we need peer knowledge. + let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r))) + .and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r))); + + peer_knowledge.map_or(false, |(knowledge, attestation_head)| { + attestation_view.statement_allowed( + statement, + &attestation_head, + knowledge, + ) + }) + } + Ok(GossipMessage::ParachainMessages(_)) => match peer { + None => false, + Some(peer) => { + let their_leaves: LeavesVec = peer.leaves().cloned().collect(); + message_routing_view.allowed_intersecting(&their_leaves, topic) } } - _ => return false, + _ => false, } - - true }) } } @@ -583,7 +669,11 @@ mod tests { use parking_lot::Mutex; use polkadot_primitives::parachain::{CandidateReceipt, HeadData}; use substrate_primitives::crypto::UncheckedInto; - use substrate_primitives::sr25519::Signature as Sr25519Signature; + use substrate_primitives::sr25519::{Public as Sr25519Public, Signature as Sr25519Signature}; + use polkadot_validation::GenericStatement; + use super::message_routing::queue_topic; + + use crate::tests::TestChainContext; #[derive(PartialEq, Clone, Debug)] enum ContextEvent { @@ -619,14 +709,29 @@ mod tests { } } + impl NewLeafActions { + fn has_message(&self, who: PeerId, message: ConsensusMessage) -> bool { + let x = NewLeafAction::TargetedMessage(who, message); + self.actions.iter().find(|&m| m == &x).is_some() + } + + fn has_multicast(&self, topic: Hash, message: ConsensusMessage) -> bool { + let x = NewLeafAction::Multicast(topic, message); + self.actions.iter().find(|&m| m == &x).is_some() + } + } + + fn validator_id(raw: [u8; 32]) -> ValidatorId { + Sr25519Public::from_raw(raw).into() + } + #[test] fn message_allowed() { let (tx, _rx) = mpsc::channel(); let tx = Mutex::new(tx); - let known_map = HashMap::::new(); let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap()); let validator = MessageValidator::new_test( - move |hash: &Hash| known_map.get(hash).map(|x| x.clone()), + TestChainContext::default(), report_handle, ); @@ -641,9 +746,9 @@ mod tests { let hash_b = [2u8; 32].into(); let hash_c = [3u8; 32].into(); - let message = 
GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket { - chain_heads: vec![hash_a, hash_b], - })).encode(); + let message = GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a, hash_b], + }).encode(); let res = validator.validate( &mut validator_context, &peer_a, @@ -676,7 +781,7 @@ mod tests { }; let statement = GossipMessage::Statement(GossipStatement { - relay_parent: hash_a, + relay_chain_leaf: hash_a, signed_statement: SignedStatement { statement: GenericStatement::Candidate(candidate_receipt), signature: Sr25519Signature([255u8; 64]).into(), @@ -690,7 +795,7 @@ mod tests { let topic_c = attestation_topic(hash_c); // topic_a is in all 3 views -> succeed - validator.inner.write().our_view.add_session(hash_a, MessageValidationData::default()); + validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default()); // topic_b is in the neighbor's view but not ours -> fail // topic_c is not in either -> fail @@ -706,10 +811,9 @@ mod tests { fn too_many_chain_heads_is_report() { let (tx, rx) = mpsc::channel(); let tx = Mutex::new(tx); - let known_map = HashMap::::new(); let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap()); let validator = MessageValidator::new_test( - move |hash: &Hash| known_map.get(hash).map(|x| x.clone()), + TestChainContext::default(), report_handle, ); @@ -722,9 +826,9 @@ mod tests { let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect(); - let message = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket { - chain_heads, - })).encode(); + let message = GossipMessage::from(NeighborPacket { + chain_heads, + }).encode(); let res = validator.validate( &mut validator_context, &peer_a, @@ -749,10 +853,9 @@ mod tests { fn statement_only_sent_when_candidate_known() { let (tx, _rx) = mpsc::channel(); let tx = Mutex::new(tx); - let known_map = HashMap::::new(); let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap()); let validator = MessageValidator::new_test( - move |hash: &Hash| known_map.get(hash).map(|x| x.clone()), + TestChainContext::default(), report_handle, ); @@ -766,9 +869,10 @@ mod tests { let hash_a = [1u8; 32].into(); let hash_b = [2u8; 32].into(); - let message = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket { - chain_heads: vec![hash_a, hash_b], - })).encode(); + let message = GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a, hash_b], + }).encode(); + let res = validator.validate( &mut validator_context, &peer_a, @@ -793,7 +897,7 @@ mod tests { let c_hash = [99u8; 32].into(); let statement = GossipMessage::Statement(GossipStatement { - relay_parent: hash_a, + relay_chain_leaf: hash_a, signed_statement: SignedStatement { statement: GenericStatement::Valid(c_hash), signature: Sr25519Signature([255u8; 64]).into(), @@ -801,7 +905,7 @@ mod tests { } }); let encoded = statement.encode(); - validator.inner.write().our_view.add_session(hash_a, MessageValidationData::default()); + validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default()); { let mut message_allowed = validator.message_allowed(); @@ -814,14 +918,336 @@ mod tests { .peers .get_mut(&peer_a) .unwrap() - .live - .get_mut(&hash_a) - .unwrap() - .note_aware(c_hash); - + .attestation + .note_aware_under_leaf(&hash_a, c_hash); { let mut message_allowed = validator.message_allowed(); assert!(message_allowed(&peer_a, MessageIntent::Broadcast, 
&topic_a, &encoded[..])); } } + + #[test] + fn multicasts_icmp_queues_when_building_on_new_leaf() { + let (tx, _rx) = mpsc::channel(); + let tx = Mutex::new(tx); + let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap()); + + let hash_a = [1u8; 32].into(); + let root_a = [11u8; 32].into(); + let root_a_topic = queue_topic(root_a); + + let root_a_messages = vec![ + ParachainMessage(vec![1, 2, 3]), + ParachainMessage(vec![4, 5, 6]), + ]; + + let chain = { + let mut chain = TestChainContext::default(); + chain.known_map.insert(hash_a, Known::Leaf); + chain.ingress_roots.insert(hash_a, vec![root_a]); + chain + }; + + let validator = RegisteredMessageValidator::new_test(chain, report_handle); + + let authorities: Vec = vec![validator_id([0; 32]), validator_id([10; 32])]; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let mut validator_context = MockValidatorContext::default(); + validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL); + validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL); + assert!(validator_context.events.is_empty()); + validator_context.clear(); + + + { + let message = GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a], + }).encode(); + let res = validator.inner.validate( + &mut validator_context, + &peer_a, + &message[..], + ); + + match res { + GossipValidationResult::Discard => {}, + _ => panic!("wrong result"), + } + assert_eq!( + validator_context.events, + vec![ + ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false), + ], + ); + } + + // ensure that we attempt to multicast all relevant queues after noting a leaf. + { + let actions = validator.new_local_leaf( + hash_a, + MessageValidationData { authorities }, + |root| if root == &root_a { + Some(root_a_messages.clone()) + } else { + None + }, + ); + + assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a], + }).to_consensus_message())); + + assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: root_a_messages.clone(), + }).to_consensus_message())); + } + + // ensure that we are allowed to multicast to a peer with same chain head, + // but not to one without. 
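+		// (`message_allowed` consults the leaves each peer advertised in its most
+		// recent neighbor packet.)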
+ { + let message = GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: root_a_messages.clone(), + }).encode(); + + let mut allowed = validator.inner.message_allowed(); + assert!(allowed(&peer_a, MessageIntent::Broadcast, &root_a_topic, &message[..])); + assert!(!allowed(&peer_b, MessageIntent::Broadcast, &root_a_topic, &message[..])); + } + } + + #[test] + fn multicasts_icmp_queues_on_neighbor_update() { + let (tx, _rx) = mpsc::channel(); + let tx = Mutex::new(tx); + let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap()); + + let hash_a = [1u8; 32].into(); + let root_a = [11u8; 32].into(); + let root_a_topic = queue_topic(root_a); + + let root_a_messages = vec![ + ParachainMessage(vec![1, 2, 3]), + ParachainMessage(vec![4, 5, 6]), + ]; + + let chain = { + let mut chain = TestChainContext::default(); + chain.known_map.insert(hash_a, Known::Leaf); + chain.ingress_roots.insert(hash_a, vec![root_a]); + chain + }; + + let validator = RegisteredMessageValidator::new_test(chain, report_handle); + + let authorities: Vec = vec![validator_id([0; 32]), validator_id([10; 32])]; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let mut validator_context = MockValidatorContext::default(); + validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL); + validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL); + assert!(validator_context.events.is_empty()); + validator_context.clear(); + + // ensure that we attempt to multicast all relevant queues after noting a leaf. + { + let actions = validator.new_local_leaf( + hash_a, + MessageValidationData { authorities }, + |root| if root == &root_a { + Some(root_a_messages.clone()) + } else { + None + }, + ); + + assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a], + }).to_consensus_message())); + + assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: root_a_messages.clone(), + }).to_consensus_message())); + } + + // ensure that we are not allowed to multicast to either peer, as they + // don't have the chain head. + { + let message = GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: root_a_messages.clone(), + }).encode(); + + let mut allowed = validator.inner.message_allowed(); + assert!(!allowed(&peer_a, MessageIntent::Broadcast, &root_a_topic, &message[..])); + assert!(!allowed(&peer_b, MessageIntent::Broadcast, &root_a_topic, &message[..])); + } + + // peer A gets updated to the chain head. now we'll attempt to broadcast + // all queues to it. + { + let message = GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a], + }).encode(); + let res = validator.inner.validate( + &mut validator_context, + &peer_a, + &message[..], + ); + + match res { + GossipValidationResult::Discard => {}, + _ => panic!("wrong result"), + } + assert_eq!( + validator_context.events, + vec![ + ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false), + ContextEvent::SendTopic(peer_a.clone(), root_a_topic, false), + ], + ); + } + + // ensure that we are allowed to multicast to a peer with same chain head, + // but not to one without. 
+ { + let message = GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: root_a_messages.clone(), + }).encode(); + + let mut allowed = validator.inner.message_allowed(); + assert!(allowed(&peer_a, MessageIntent::Broadcast, &root_a_topic, &message[..])); + assert!(!allowed(&peer_b, MessageIntent::Broadcast, &root_a_topic, &message[..])); + } + } + + #[test] + fn accepts_needed_unknown_icmp_message_queue() { + let (tx, _rx) = mpsc::channel(); + let tx = Mutex::new(tx); + let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap()); + + let hash_a = [1u8; 32].into(); + let root_a_messages = vec![ + ParachainMessage(vec![1, 2, 3]), + ParachainMessage(vec![4, 5, 6]), + ]; + let not_root_a_messages = vec![ + ParachainMessage(vec![1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), + ParachainMessage(vec![4, 5, 6]), + ]; + + let root_a = polkadot_validation::message_queue_root( + root_a_messages.iter().map(|m| &m.0) + ); + let not_root_a = [69u8; 32].into(); + let root_a_topic = queue_topic(root_a); + + let chain = { + let mut chain = TestChainContext::default(); + chain.known_map.insert(hash_a, Known::Leaf); + chain.ingress_roots.insert(hash_a, vec![root_a]); + chain + }; + + let validator = RegisteredMessageValidator::new_test(chain, report_handle); + + let authorities: Vec = vec![validator_id([0; 32]), validator_id([10; 32])]; + + let peer_a = PeerId::random(); + let mut validator_context = MockValidatorContext::default(); + + validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL); + assert!(validator_context.events.is_empty()); + validator_context.clear(); + + let queue_messages = GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: root_a_messages.clone(), + }).to_consensus_message(); + + let not_queue_messages = GossipMessage::from(GossipParachainMessages { + queue_root: root_a, + messages: not_root_a_messages.clone(), + }).encode(); + + let queue_messages_wrong_root = GossipMessage::from(GossipParachainMessages { + queue_root: not_root_a, + messages: root_a_messages.clone(), + }).encode(); + + // ensure that we attempt to multicast all relevant queues after noting a leaf. + { + let actions = validator.new_local_leaf( + hash_a, + MessageValidationData { authorities }, + |_root| None, + ); + + assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket { + chain_heads: vec![hash_a], + }).to_consensus_message())); + + // we don't know this queue! no broadcast :( + assert!(!actions.has_multicast(root_a_topic, queue_messages.clone())); + } + + // rejects right queue with unknown root. + { + let res = validator.inner.validate( + &mut validator_context, + &peer_a, + &queue_messages_wrong_root[..], + ); + + match res { + GossipValidationResult::Discard => {}, + _ => panic!("wrong result"), + } + + assert_eq!(validator_context.events, Vec::new()); + } + + // rejects bad queue. + { + let res = validator.inner.validate( + &mut validator_context, + &peer_a, + ¬_queue_messages[..], + ); + + match res { + GossipValidationResult::Discard => {}, + _ => panic!("wrong result"), + } + + assert_eq!(validator_context.events, Vec::new()); + } + + // accepts the right queue. 
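+		// the recomputed root matches an advertised unrouted root, so the messages
+		// are kept and rebroadcast on the queue's topic.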
+		{
+			let res = validator.inner.validate(
+				&mut validator_context,
+				&peer_a,
+				&queue_messages.data[..],
+			);
+
+			match res {
+				GossipValidationResult::ProcessAndKeep(topic) if topic == root_a_topic => {},
+				_ => panic!("wrong result"),
+			}
+
+			assert_eq!(validator_context.events, vec![
+				ContextEvent::BroadcastMessage(root_a_topic, queue_messages.data.clone(), false),
+			]);
+		}
+	}
 }
diff --git a/network/src/gossip/attestation.rs b/network/src/gossip/attestation.rs
new file mode 100644
index 000000000000..07bfefe71b37
--- /dev/null
+++ b/network/src/gossip/attestation.rs
@@ -0,0 +1,264 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Gossip messages and structures for dealing with attestations (statements of
+//! validity or invalidity on parachain candidates).
+//!
+//! This follows the same principles as other gossip modules (see parent
+//! documentation for more details) by being aware of our current chain
+//! heads and accepting only information relative to them. Attestations are localized to
+//! a relay chain head, so this is easily doable.
+//!
+//! This module also provides a filter, so we can only broadcast messages to
+//! peers that are relevant to chain heads they have advertised.
+//!
+//! Furthermore, since attestations are bottlenecked by the `Candidate` statement,
+//! we only accept attestations which are themselves `Candidate` messages, or reference
+//! a `Candidate` we are aware of. Otherwise, it is possible we could be forced to
+//! consider an infinite amount of attestations produced by a misbehaving validator.
+
+use substrate_network::consensus_gossip::{ValidationResult as GossipValidationResult};
+use polkadot_validation::GenericStatement;
+use polkadot_primitives::Hash;
+
+use std::collections::{HashMap, HashSet};
+
+use log::warn;
+use crate::router::attestation_topic;
+
+use super::{cost, benefit, MAX_CHAIN_HEADS, LeavesVec, ChainContext, Known, MessageValidationData, GossipStatement};
+
+// knowledge about attestations on a single parent-hash.
+#[derive(Default)]
+pub(super) struct Knowledge {
+	candidates: HashSet<Hash>,
+}
+
+impl Knowledge {
+	// whether the peer is aware of a candidate with given hash.
+	fn is_aware_of(&self, candidate_hash: &Hash) -> bool {
+		self.candidates.contains(candidate_hash)
+	}
+
+	// note that the peer is aware of a candidate with given hash. this should
+	// be done after observing an incoming candidate message via gossip.
+	fn note_aware(&mut self, candidate_hash: Hash) {
+		self.candidates.insert(candidate_hash);
+	}
+}
+
+#[derive(Default)]
+pub(super) struct PeerData {
+	live: HashMap<Hash, Knowledge>,
+}
+
+impl PeerData {
+	/// Update leaves, returning a list of which leaves are new.
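+	/// Knowledge attached to leaves the peer no longer advertises is dropped
+	/// in the same pass.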
+#[derive(Default)]
+pub(super) struct PeerData {
+	live: HashMap<Hash, Knowledge>,
+}
+
+impl PeerData {
+	/// Update leaves, returning a list of which leaves are new.
+	pub(super) fn update_leaves(&mut self, leaves: &LeavesVec) -> LeavesVec {
+		let mut new = LeavesVec::new();
+		self.live.retain(|k, _| leaves.contains(k));
+		for &leaf in leaves {
+			self.live.entry(leaf).or_insert_with(|| {
+				new.push(leaf);
+				Default::default()
+			});
+		}
+
+		new
+	}
+
+	#[cfg(test)]
+	pub(super) fn note_aware_under_leaf(&mut self, relay_chain_leaf: &Hash, candidate_hash: Hash) {
+		if let Some(knowledge) = self.live.get_mut(relay_chain_leaf) {
+			knowledge.note_aware(candidate_hash);
+		}
+	}
+
+	pub(super) fn knowledge_at_mut(&mut self, parent_hash: &Hash) -> Option<&mut Knowledge> {
+		self.live.get_mut(parent_hash)
+	}
+
+	/// Get an iterator over all live leaves of this peer.
+	pub(super) fn leaves(&self) -> impl Iterator<Item = &Hash> {
+		self.live.keys()
+	}
+}
+
+/// An impartial view of what topics and data are valid based on attestation session data.
+pub(super) struct View {
+	leaf_work: Vec<(Hash, LeafView)>, // hashes of the best DAG-leaves paired with validation data.
+	topics: HashMap<Hash, Hash>, // maps topic hashes to block hashes.
+}
+
+impl Default for View {
+	fn default() -> Self {
+		View {
+			leaf_work: Vec::with_capacity(MAX_CHAIN_HEADS),
+			topics: Default::default(),
+		}
+	}
+}
+
+impl View {
+	fn leaf_view(&self, relay_chain_leaf: &Hash) -> Option<&LeafView> {
+		self.leaf_work.iter()
+			.find_map(|&(ref h, ref leaf)| if h == relay_chain_leaf { Some(leaf) } else { None })
+	}
+
+	fn leaf_view_mut(&mut self, relay_chain_leaf: &Hash) -> Option<&mut LeafView> {
+		self.leaf_work.iter_mut()
+			.find_map(|&mut (ref h, ref mut leaf)| if h == relay_chain_leaf { Some(leaf) } else { None })
+	}
+
+	/// Get our leaves-set. Guaranteed to have length <= MAX_CHAIN_HEADS.
+	pub(super) fn neighbor_info<'a>(&'a self) -> impl Iterator<Item = Hash> + 'a + Clone {
+		self.leaf_work.iter().take(MAX_CHAIN_HEADS).map(|(p, _)| p.clone())
+	}
+
+	/// Note new leaf in our local view and validation data necessary to check signatures
+	/// of statements issued under this leaf.
+	///
+	/// This will be pruned later on a call to `prune_old_leaves`, when this leaf
+	/// is not a leaf anymore.
+	pub(super) fn new_local_leaf(&mut self, relay_chain_leaf: Hash, validation_data: MessageValidationData) {
+		self.leaf_work.push((
+			relay_chain_leaf,
+			LeafView {
+				validation_data,
+				knowledge: Default::default(),
+			},
+		));
+		self.topics.insert(attestation_topic(relay_chain_leaf), relay_chain_leaf);
+	}
+
+	/// Prune old leaf-work that fails the leaf predicate.
+	pub(super) fn prune_old_leaves<F: Fn(&Hash) -> bool>(&mut self, is_leaf: F) {
+		let leaf_work = &mut self.leaf_work;
+		leaf_work.retain(|&(ref relay_chain_leaf, _)| is_leaf(relay_chain_leaf));
+		self.topics.retain(|_, v| leaf_work.iter().find(|(p, _)| p == v).is_some());
+	}
+
+	/// Whether a message topic is considered live relative to our view. Non-live
+	/// topics do not pertain to our perceived leaves, and are uninteresting to us.
+	pub(super) fn is_topic_live(&self, topic: &Hash) -> bool {
+		self.topics.contains_key(topic)
+	}
+
+	/// The relay-chain block hash corresponding to a topic.
+	pub(super) fn topic_block(&self, topic: &Hash) -> Option<&Hash> {
+		self.topics.get(topic)
+	}
+
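For intuition on the leaf lifecycle, here is a short sketch (illustrative values, not part of the diff) of how `new_local_leaf`, `is_topic_live`, and `prune_old_leaves` interact; it assumes `MessageValidationData` can be built with an empty authority set, as in the tests earlier in this patch:

```rust
// Sketch: a registered leaf makes its attestation topic live; pruning retires it.
let mut view = View::default();
let leaf: Hash = [1u8; 32].into();

view.new_local_leaf(leaf, MessageValidationData { authorities: Vec::new() });
assert!(view.is_topic_live(&attestation_topic(leaf)));

// When `leaf` stops being a chain head, prune it; the topic is no longer live.
view.prune_old_leaves(|h| h != &leaf);
assert!(!view.is_topic_live(&attestation_topic(leaf)));
```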
+	/// Validate the signature on an attestation statement of some kind. Should be done before
+	/// any repropagation of that statement.
+	pub(super) fn validate_statement_signature<C: ChainContext + ?Sized>(
+		&mut self,
+		message: GossipStatement,
+		chain: &C,
+	)
+		-> (GossipValidationResult<Hash>, i32)
+	{
+		// message must reference one of our chain heads and
+		// if message is not a `Candidate` we should have the candidate available
+		// in `attestation_view`.
+		match self.leaf_view(&message.relay_chain_leaf) {
+			None => {
+				let cost = match chain.is_known(&message.relay_chain_leaf) {
+					Some(Known::Leaf) => {
+						warn!(
+							target: "network",
+							"Leaf block {} not considered live for attestation",
+							message.relay_chain_leaf,
+						);
+
+						0
+					}
+					Some(Known::Old) => cost::PAST_MESSAGE,
+					_ => cost::FUTURE_MESSAGE,
+				};
+
+				(GossipValidationResult::Discard, cost)
+			}
+			Some(view) => {
+				// first check that we are capable of receiving this message
+				// in a DoS-proof manner.
+				let benefit = match message.signed_statement.statement {
+					GenericStatement::Candidate(_) => benefit::NEW_CANDIDATE,
+					GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
+						if !view.knowledge.is_aware_of(h) {
+							let cost = cost::ATTESTATION_NO_CANDIDATE;
+							return (GossipValidationResult::Discard, cost);
+						}
+
+						benefit::NEW_ATTESTATION
+					}
+				};
+
+				// validate signature.
+				let res = view.validation_data.check_statement(
+					&message.relay_chain_leaf,
+					&message.signed_statement,
+				);
+
+				match res {
+					Ok(()) => {
+						let topic = attestation_topic(message.relay_chain_leaf);
+						(GossipValidationResult::ProcessAndKeep(topic), benefit)
+					}
+					Err(()) => (GossipValidationResult::Discard, cost::BAD_SIGNATURE),
+				}
+			}
+		}
+	}
+
+	/// Whether it's allowed to send a statement to a peer with given knowledge
+	/// about the relay parent the statement refers to.
+	pub(super) fn statement_allowed(
+		&mut self,
+		statement: &GossipStatement,
+		relay_chain_leaf: &Hash,
+		peer_knowledge: &mut Knowledge,
+	) -> bool {
+		let signed = &statement.signed_statement;
+
+		match signed.statement {
+			GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
+				// `valid` and `invalid` statements can only be propagated after
+				// a candidate message is known by that peer.
+				peer_knowledge.is_aware_of(h)
+			}
+			GenericStatement::Candidate(ref c) => {
+				// if we are sending a `Candidate` message we should make sure that
+				// attestation_view and their_view reflect that we know about the candidate.
+				let hash = c.hash();
+				peer_knowledge.note_aware(hash);
+				if let Some(attestation_view) = self.leaf_view_mut(&relay_chain_leaf) {
+					attestation_view.knowledge.note_aware(hash);
+				}
+
+				// at this point, the peer hasn't seen the message or the candidate
+				// and has knowledge of the relevant relay-chain parent.
+				true
+			}
+		}
+	}
+}
+
+struct LeafView {
+	validation_data: MessageValidationData,
+	knowledge: Knowledge,
+}
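Putting the pieces above together, the per-peer propagation filter can be sketched as follows (illustrative only; `should_send_statement` is a hypothetical helper, not part of the diff):

```rust
// Sketch: relay a statement only if the peer advertised the relevant leaf, and,
// for `Valid`/`Invalid` statements, only if it already knows the candidate.
fn should_send_statement(
	view: &mut View,
	peer: &mut PeerData,
	statement: &GossipStatement,
) -> bool {
	let leaf = statement.relay_chain_leaf;
	match peer.knowledge_at_mut(&leaf) {
		None => false, // peer is not interested in this chain head.
		Some(knowledge) => view.statement_allowed(statement, &leaf, knowledge),
	}
}
```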
diff --git a/network/src/gossip/message_routing.rs b/network/src/gossip/message_routing.rs
new file mode 100644
index 000000000000..01482e4671f8
--- /dev/null
+++ b/network/src/gossip/message_routing.rs
@@ -0,0 +1,339 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Data structures and synchronous logic for ICMP message gossip.
+//!
+//! The parent-module documentation describes some rationale of the general
+//! gossip protocol design.
+//!
+//! The ICMP message-routing gossip follows that rationale.
+//!
+//! In this protocol, we perform work under 4 conditions:
+//! ### 1. Upon observation of a new leaf in the block-DAG.
+//!
+//! We first communicate the best leaves to our neighbors in the gossip graph
+//! by means of a neighbor packet. Then, we query to discover the trie roots
+//! of all un-routed message queues from the perspective of each of those leaves.
+//!
+//! For any trie root in the unrouted set for the new leaf, if we have the corresponding
+//! queue, we send it to any peers with the new leaf in their latest advertised set.
+//!
+//! Which parachain those messages go to and from is unimportant, because this is
+//! an everybody-sees-everything style protocol. The only important property is "liveness":
+//! that the queue root is un-routed at one of the leaves we perceive to be at the head
+//! of the block-DAG.
+//!
+//! In Substrate gossip, every message is associated with a topic. Typically,
+//! many messages are grouped under a single topic. In this gossip system, each queue
+//! gets its own topic, which is based on the root hash of the queue. This is because
+//! many different chain leaves may have the same queue as un-routed, so grouping by
+//! queue root is better than grouping message packets by the leaf they appear unrouted at.
+//!
+//! ### 2. Upon a neighbor packet from a peer.
+//!
+//! The neighbor packet from a peer should contain the perceived chain heads of that peer.
+//! If there is any overlap between our perceived chain heads and theirs, we send
+//! them any known, un-routed message queue from either set.
+//!
+//! ### 3. Upon receiving a message queue from a peer.
+//!
+//! If the message queue is in the un-routed set of one of the latest leaves we've updated to,
+//! we accept it and relay it to any peers who need that queue as well.
+//!
+//! If not, we report the peer to the peer-set manager for sending us bad data.
+//!
+//! ### 4. Periodic pruning
+//!
+//! We prune messages that are no longer un-routed from the view of any leaf and cease
+//! attempting to send them to any peer.
+
+use sr_primitives::traits::{BlakeTwo256, Hash as HashT};
+use polkadot_primitives::Hash;
+use std::collections::{HashMap, HashSet};
+use substrate_client::error::Error as ClientError;
+use super::{MAX_CHAIN_HEADS, GossipValidationResult, LeavesVec, ChainContext};
+
+/// Construct a topic for a message queue root deterministically.
+pub fn queue_topic(queue_root: Hash) -> Hash {
+	let mut v = queue_root.as_ref().to_vec();
+	v.extend(b"message_queue");
+
+	BlakeTwo256::hash(&v[..])
+}
+
+/// A view of which queue roots are current for a given set of leaves.
+#[derive(Default)]
+pub struct View {
+	leaves: LeavesVec,
+	leaf_topics: HashMap<Hash, HashSet<Hash>>, // leaf_hash -> { topics }
+	expected_queues: HashMap<Hash, (Hash, bool)>, // topic -> (queue-root, known)
+}
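Since topics are derived purely from queue roots, any two nodes agree on the topic for a queue without coordination. A tiny sketch with illustrative values (not part of the diff; `is_topic_live` is defined just below):

```rust
// Sketch: the topic is a deterministic function of the queue root alone.
let queue_root: Hash = [0x42u8; 32].into();
let topic = queue_topic(queue_root);
assert_eq!(topic, queue_topic(queue_root));

// A fresh view follows no leaves, so no queue topic is live yet.
let view = View::default();
assert!(!view.is_topic_live(&topic));
```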
+
+impl View {
+	/// Update the set of current leaves. This is called when we perceive a new best leaf-set.
+	pub fn update_leaves<T: ChainContext + ?Sized, I>(&mut self, context: &T, new_leaves: I)
+		-> Result<(), ClientError>
+		where I: Iterator<Item = Hash>
+	{
+		let new_leaves = new_leaves.take(MAX_CHAIN_HEADS);
+		let old_leaves = std::mem::replace(&mut self.leaves, new_leaves.collect());
+
+		let expected_queues = &mut self.expected_queues;
+		let leaves = &self.leaves;
+		self.leaf_topics.retain(|l, topics| {
+			if leaves.contains(l) { return true }
+
+			// prune out all data about old leaves we don't follow anymore.
+			for topic in topics.iter() {
+				expected_queues.remove(topic);
+			}
+			false
+		});
+
+		let mut res = Ok(());
+
+		// add in new data about fresh leaves.
+		for new_leaf in &self.leaves {
+			if old_leaves.contains(new_leaf) { continue }
+
+			let mut this_leaf_topics = HashSet::new();
+
+			let r = context.leaf_unrouted_roots(new_leaf, &mut |&queue_root| {
+				let topic = queue_topic(queue_root);
+				this_leaf_topics.insert(topic);
+				expected_queues.entry(topic).or_insert((queue_root, false));
+			});
+
+			if r.is_err() {
+				if let Err(e) = res {
+					log::debug!(target: "message_routing", "Ignored duplicate error {}", e)
+				};
+				res = r;
+			}
+
+			self.leaf_topics.insert(*new_leaf, this_leaf_topics);
+		}
+
+		res
+	}
+
+	/// Validate an incoming message queue against this view. If it is accepted
+	/// by our view of un-routed message queues, we will keep and re-propagate it.
+	pub fn validate_queue_and_note_known(&mut self, messages: &super::GossipParachainMessages)
+		-> (GossipValidationResult<Hash>, i32)
+	{
+		let ostensible_topic = queue_topic(messages.queue_root);
+		match self.expected_queues.get_mut(&ostensible_topic) {
+			None => (GossipValidationResult::Discard, super::cost::UNNEEDED_ICMP_MESSAGES),
+			Some(&mut (_, ref mut known)) => {
+				if !messages.queue_root_is_correct() {
+					(
+						GossipValidationResult::Discard,
+						super::cost::icmp_messages_root_mismatch(messages.messages.len()),
+					)
+				} else {
+					*known = true;
+					(
+						GossipValidationResult::ProcessAndKeep(ostensible_topic),
+						super::benefit::NEW_ICMP_MESSAGES,
+					)
+				}
+			}
+		}
+	}
+
+	/// Whether a message with given topic is live.
+	pub fn is_topic_live(&self, topic: &Hash) -> bool {
+		self.expected_queues.get(topic).is_some()
+	}
+
+	/// Whether a message is allowed under the intersection of the given leaf-set
+	/// and our own.
+	pub fn allowed_intersecting(&self, other_leaves: &LeavesVec, topic: &Hash) -> bool {
+		for i in other_leaves {
+			for j in &self.leaves {
+				if i == j {
+					let leaf_topics = self.leaf_topics.get(i)
+						.expect("leaf_topics are mutated only in update_leaves; \
+							we have an entry for each item in self.leaves; \
+							i is in self.leaves; qed");
+
+					if leaf_topics.contains(topic) {
+						return true;
+					}
+				}
+			}
+		}
+
+		false
+	}
+
+	/// Get topics of all message queues a peer is interested in - this is useful
+	/// when a peer has informed us of their new best leaves.
+	pub fn intersection_topics(&self, other_leaves: &LeavesVec) -> impl Iterator<Item = Hash> {
+		let deduplicated = other_leaves.iter()
+			.filter_map(|l| self.leaf_topics.get(l))
+			.flat_map(|topics| topics.iter().cloned())
+			.collect::<HashSet<_>>();
+
+		deduplicated.into_iter()
+	}
+
+	/// Iterate over all live message queues for which the data is marked as not locally known,
+	/// calling a closure with `(topic, root)`. The closure will return whether the queue data is
+	/// locally known, and the marking is updated accordingly.
+	///
+	/// This is called when we should send un-routed message queues that we are
+	/// newly aware of to peers - as in when we update our leaves.
+ pub fn sweep_unknown_queues(&mut self, mut check_known: impl FnMut(&Hash, &Hash) -> bool) { + for (topic, &mut (ref queue_root, ref mut known)) in self.expected_queues.iter_mut() { + if !*known { + *known = check_known(topic, queue_root) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::TestChainContext; + use crate::gossip::{Known, GossipParachainMessages}; + use polkadot_primitives::parachain::Message as ParachainMessage; + + fn hash(x: u8) -> Hash { + [x; 32].into() + } + + fn message_queue(from: u8, to: u8) -> Option<[[u8; 2]; 1]> { + if from == to { + None + } else { + Some([[from, to]]) + } + } + + fn message_queue_root(from: u8, to: u8) -> Option { + message_queue(from, to).map( + |q| polkadot_validation::message_queue_root(q.iter()) + ) + } + + // check that our view has all of the roots of the message queues + // emitted in the heads identified in `our_heads`, and none of the others. + fn check_roots(view: &mut View, our_heads: &[u8], n_heads: u8) -> bool { + for i in 0..n_heads { + for j in 0..n_heads { + if let Some(messages) = message_queue(i, j) { + let queue_root = message_queue_root(i, j).unwrap(); + let messages = GossipParachainMessages { + queue_root, + messages: messages.iter().map(|m| ParachainMessage(m.to_vec())).collect(), + }; + + let had_queue = match view.validate_queue_and_note_known(&messages).0 { + GossipValidationResult::ProcessAndKeep(topic) => topic == queue_topic(queue_root), + _ => false, + }; + + if our_heads.contains(&i) != had_queue { + return false + } + } + } + } + + true + } + + #[test] + fn update_leaves_none_in_common() { + let mut ctx = TestChainContext::default(); + let n_heads = 5; + + for i in 0..n_heads { + ctx.known_map.insert(hash(i as u8), Known::Leaf); + + let messages_out: Vec<_> = (0..n_heads).filter_map(|j| message_queue_root(i, j)).collect(); + + if !messages_out.is_empty() { + ctx.ingress_roots.insert(hash(i as u8), messages_out); + } + } + + // initialize the view with 2 leaves. + + let mut view = View::default(); + view.update_leaves( + &ctx, + [hash(0), hash(1)].iter().cloned(), + ).unwrap(); + + // we should have all queue roots that were + // un-routed from the perspective of those 2 + // leaves and no others. + + assert!(check_roots(&mut view, &[0, 1], n_heads)); + + // after updating to a disjoint set, + // the property that we are aware of all un-routed + // from the perspective of our known leaves should + // remain the same. + + view.update_leaves( + &ctx, + [hash(2), hash(3), hash(4)].iter().cloned(), + ).unwrap(); + + assert!(check_roots(&mut view, &[2, 3, 4], n_heads)); + } + + #[test] + fn update_leaves_overlapping() { + let mut ctx = TestChainContext::default(); + let n_heads = 5; + + for i in 0..n_heads { + ctx.known_map.insert(hash(i as u8), Known::Leaf); + + let messages_out: Vec<_> = (0..n_heads).filter_map(|j| message_queue_root(i, j)).collect(); + + if !messages_out.is_empty() { + ctx.ingress_roots.insert(hash(i as u8), messages_out); + } + } + + let mut view = View::default(); + view.update_leaves( + &ctx, + [hash(0), hash(1), hash(2)].iter().cloned(), + ).unwrap(); + + assert!(check_roots(&mut view, &[0, 1, 2], n_heads)); + + view.update_leaves( + &ctx, + [hash(2), hash(3), hash(4)].iter().cloned(), + ).unwrap(); + + // after updating to a leaf-set overlapping with the prior, + // the property that we are aware of all un-routed + // from the perspective of our known leaves should + // remain the same. 
+ + assert!(check_roots(&mut view, &[2, 3, 4], n_heads)); + } +} diff --git a/network/src/lib.rs b/network/src/lib.rs index 40337a8170eb..4eaca9af6678 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -16,8 +16,8 @@ //! Polkadot-specific network implementation. //! -//! This manages routing for parachain statements, parachain block and extrinsic data fetching, -//! communication between collators and validators, and more. +//! This manages routing for parachain statements, parachain block and outgoing message +//! data fetching, communication between collators and validators, and more. mod collator_pool; mod local_collations; @@ -26,23 +26,29 @@ pub mod validation; pub mod gossip; use codec::{Decode, Encode}; -use futures::sync::oneshot; +use futures::sync::{oneshot, mpsc}; +use futures::prelude::*; use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::parachain::{ Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock, - StructuredUnroutedIngress, ValidatorId + StructuredUnroutedIngress, ValidatorId, OutgoingMessages, }; use substrate_network::{ PeerId, RequestId, Context, StatusMessage as GenericFullStatus, specialization::{Event, NetworkSpecialization as Specialization}, }; -use self::validation::{LiveValidationSessions, RecentValidatorIds, InsertedRecentKey}; +use substrate_network::consensus_gossip::{ + self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage, +}; +use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey}; use self::collator_pool::{CollatorPool, Role, Action}; use self::local_collations::LocalCollations; use log::{trace, debug, warn}; use std::collections::{HashMap, HashSet}; +use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage}; + #[cfg(test)] mod tests; @@ -69,7 +75,112 @@ mod benefit { type FullStatus = GenericFullStatus; /// Specialization of the network service for the polkadot protocol. -pub type NetworkService = substrate_network::NetworkService; +pub type PolkadotNetworkService = substrate_network::NetworkService; + +/// Basic functionality that a network has to fulfill. +pub trait NetworkService: Send + Sync + 'static { + /// Get a stream of gossip messages for a given hash. + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream; + + /// Gossip a message on given topic. + fn gossip_message(&self, topic: Hash, message: GossipMessage); + + /// Execute a closure with the gossip service. + fn with_gossip(&self, with: F) + where F: FnOnce(&mut dyn GossipService, &mut dyn Context); + + /// Execute a closure with the polkadot protocol. + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context); +} + +impl NetworkService for PolkadotNetworkService { + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { + let (tx, rx) = std::sync::mpsc::channel(); + + PolkadotNetworkService::with_gossip(self, move |gossip, _| { + let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic); + let _ = tx.send(inner_rx); + }); + + let topic_stream = match rx.recv() { + Ok(rx) => rx, + Err(_) => mpsc::unbounded().1, // return empty channel. 
+ }; + + GossipMessageStream::new(topic_stream) + } + + fn gossip_message(&self, topic: Hash, message: GossipMessage) { + self.gossip_consensus_message( + topic, + POLKADOT_ENGINE_ID, + message.encode(), + GossipMessageRecipient::BroadcastToAll, + ); + } + + fn with_gossip(&self, with: F) + where F: FnOnce(&mut dyn GossipService, &mut dyn Context) + { + PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx)) + } + + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context) + { + PolkadotNetworkService::with_spec(self, with) + } +} + +/// A gossip network subservice. +pub trait GossipService { + fn send_message(&mut self, ctx: &mut dyn Context, who: &PeerId, message: ConsensusMessage); + fn multicast(&mut self, ctx: &mut dyn Context, topic: &Hash, message: ConsensusMessage); +} + +impl GossipService for consensus_gossip::ConsensusGossip { + fn send_message(&mut self, ctx: &mut dyn Context, who: &PeerId, message: ConsensusMessage) { + consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message) + } + + fn multicast(&mut self, ctx: &mut dyn Context, topic: &Hash, message: ConsensusMessage) { + consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false) + } +} + +/// A stream of gossip messages and an optional sender for a topic. +pub struct GossipMessageStream { + topic_stream: mpsc::UnboundedReceiver, +} + +impl GossipMessageStream { + /// Create a new instance with the given topic stream. + pub fn new(topic_stream: mpsc::UnboundedReceiver) -> Self { + Self { + topic_stream + } + } +} + +impl Stream for GossipMessageStream { + type Item = (GossipMessage, Option); + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + let msg = match futures::try_ready!(self.topic_stream.poll()) { + Some(msg) => msg, + None => return Ok(Async::Ready(None)), + }; + + debug!(target: "validation", "Processing statement for live validation leaf-work"); + if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) { + return Ok(Async::Ready(Some((gmsg, msg.sender)))) + } + } + } +} /// Status of a Polkadot node. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] @@ -79,7 +190,7 @@ pub struct Status { struct PoVBlockRequest { attempted_peers: HashSet, - validation_session_parent: Hash, + validation_leaf: Hash, candidate_hash: Hash, block_data_hash: Hash, sender: oneshot::Sender, @@ -188,10 +299,10 @@ pub struct PolkadotProtocol { collators: CollatorPool, validators: HashMap, local_collations: LocalCollations, - live_validation_sessions: LiveValidationSessions, + live_validation_leaves: LiveValidationLeaves, in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>, pending: Vec, - extrinsic_store: Option<::av_store::Store>, + availability_store: Option, next_req_id: u64, } @@ -204,10 +315,10 @@ impl PolkadotProtocol { collating_for, validators: HashMap::new(), local_collations: LocalCollations::new(), - live_validation_sessions: LiveValidationSessions::new(), + live_validation_leaves: LiveValidationLeaves::new(), in_flight: HashMap::new(), pending: Vec::new(), - extrinsic_store: None, + availability_store: None, next_req_id: 1, } } @@ -224,7 +335,7 @@ impl PolkadotProtocol { self.pending.push(PoVBlockRequest { attempted_peers: Default::default(), - validation_session_parent: relay_parent, + validation_leaf: relay_parent, candidate_hash: candidate.hash(), block_data_hash: candidate.block_data_hash, sender: tx, @@ -235,15 +346,15 @@ impl PolkadotProtocol { rx } - /// Note new validation session. 
- fn new_validation_session( + /// Note new leaf to do validation work at + fn new_validation_leaf_work( &mut self, ctx: &mut dyn Context, - params: validation::SessionParams, - ) -> validation::ValidationSession { + params: validation::LeafWorkParams, + ) -> validation::LiveValidationLeaf { - let (session, new_local) = self.live_validation_sessions - .new_validation_session(params); + let (work, new_local) = self.live_validation_leaves + .new_validation_leaf(params); if let Some(new_local) = new_local { for (id, peer_data) in self.peers.iter_mut() @@ -257,12 +368,12 @@ impl PolkadotProtocol { } } - session + work } // true indicates that it was removed actually. fn remove_validation_session(&mut self, parent_hash: Hash) -> bool { - self.live_validation_sessions.remove(parent_hash) + self.live_validation_leaves.remove(parent_hash) } fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context) { @@ -272,10 +383,10 @@ impl PolkadotProtocol { let in_flight = &mut self.in_flight; for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { - let parent = pending.validation_session_parent; + let parent = pending.validation_leaf; let c_hash = pending.candidate_hash; - let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x { + let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x { Ok(data @ &_) => { // answer locally. let _ = pending.sender.send(data.clone()); @@ -305,7 +416,7 @@ impl PolkadotProtocol { Some(pending) } } - Err(None) => None, // no such known validation session. prune out. + Err(None) => None, // no such known validation leaf-work. prune out. }); if let Some(pending) = still_pending { @@ -321,7 +432,7 @@ impl PolkadotProtocol { match msg { Message::ValidatorId(key) => self.on_session_key(ctx, who, key), Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => { - let pov_block = self.live_validation_sessions.with_pov_block( + let pov_block = self.live_validation_leaves.with_pov_block( &relay_parent, &candidate_hash, |res| res.ok().map(|b| b.clone()), @@ -330,13 +441,13 @@ impl PolkadotProtocol { send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block)); } Message::RequestBlockData(req_id, relay_parent, candidate_hash) => { - let block_data = self.live_validation_sessions + let block_data = self.live_validation_leaves .with_pov_block( &relay_parent, &candidate_hash, |res| res.ok().map(|b| b.block_data.clone()), ) - .or_else(|| self.extrinsic_store.as_ref() + .or_else(|| self.availability_store.as_ref() .and_then(|s| s.block_data(relay_parent, candidate_hash)) ); @@ -507,7 +618,7 @@ impl Specialization for PolkadotProtocol { // send session keys. if peer_info.should_send_key() { - for local_session_key in self.live_validation_sessions.recent_keys() { + for local_session_key in self.live_validation_leaves.recent_keys() { peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message( ctx, who.clone(), @@ -549,7 +660,7 @@ impl Specialization for PolkadotProtocol { let (sender, _) = oneshot::channel(); pending.push(::std::mem::replace(val, PoVBlockRequest { attempted_peers: Default::default(), - validation_session_parent: Default::default(), + validation_leaf: Default::default(), candidate_hash: Default::default(), block_data_hash: Default::default(), canon_roots: StructuredUnroutedIngress(Vec::new()), @@ -676,16 +787,35 @@ impl PolkadotProtocol { impl PolkadotProtocol { /// Add a local collation and broadcast it to the necessary peers. 
+ /// + /// This should be called by a collator intending to get the locally-collated + /// block into the hands of validators. + /// It also places the outgoing message and block data in the local availability store. pub fn add_local_collation( &mut self, ctx: &mut dyn Context, relay_parent: Hash, targets: HashSet, collation: Collation, - ) { + outgoing_targeted: OutgoingMessages, + ) -> std::io::Result<()> { debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", relay_parent, collation.receipt.parachain_index); + let outgoing_queues = polkadot_validation::outgoing_queues(&outgoing_targeted) + .map(|(_target, root, data)| (root, data)) + .collect(); + + if let Some(ref availability_store) = self.availability_store { + availability_store.make_available(av_store::Data { + relay_parent, + parachain_id: collation.receipt.parachain_index, + candidate_hash: collation.receipt.hash(), + block_data: collation.pov.block_data.clone(), + outgoing_queues: Some(outgoing_queues), + })?; + } + for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { match self.validators.get(&primary) { Some(who) => { @@ -700,10 +830,13 @@ impl PolkadotProtocol { warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), } } + + Ok(()) } - /// register availability store. - pub fn register_availability_store(&mut self, extrinsic_store: ::av_store::Store) { - self.extrinsic_store = Some(extrinsic_store); + /// Give the network protocol a handle to an availability store, used for + /// circulation of parachain data required for validation. + pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) { + self.availability_store = Some(availability_store); } } diff --git a/network/src/router.rs b/network/src/router.rs index 9dffa2546d56..baad3ad63418 100644 --- a/network/src/router.rs +++ b/network/src/router.rs @@ -29,7 +29,7 @@ use polkadot_validation::{ }; use polkadot_primitives::{Block, Hash}; use polkadot_primitives::parachain::{ - Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, + OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, }; use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement}; @@ -41,7 +41,8 @@ use std::collections::{HashMap, HashSet}; use std::io; use std::sync::Arc; -use crate::validation::{self, SessionDataFetcher, NetworkService, Executor}; +use crate::validation::{self, LeafWorkDataFetcher, Executor}; +use crate::NetworkService; /// Compute the gossip topic for attestations on the given parent hash. pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { @@ -72,7 +73,7 @@ pub(crate) fn checked_statements(network: &N, topic: Hash) -> pub struct Router { table: Arc, attestation_topic: Hash, - fetcher: SessionDataFetcher, + fetcher: LeafWorkDataFetcher, deferred_statements: Arc>, message_validator: RegisteredMessageValidator, } @@ -80,7 +81,7 @@ pub struct Router { impl Router { pub(crate) fn new( table: Arc, - fetcher: SessionDataFetcher, + fetcher: LeafWorkDataFetcher, message_validator: RegisteredMessageValidator, ) -> Self { let parent_hash = fetcher.parent_hash(); @@ -196,7 +197,7 @@ impl Router w knowledge.lock().note_candidate( candidate_hash, Some(validated.pov_block().clone()), - validated.extrinsic().cloned(), + validated.outgoing_messages().cloned(), ); // propagate the statement. 
@@ -224,13 +225,13 @@ impl TableRouter for Router wh type Error = io::Error; type FetchValidationProof = validation::PoVReceiver; - fn local_collation(&self, collation: Collation, extrinsic: Extrinsic) { + fn local_collation(&self, collation: Collation, outgoing: OutgoingMessages) { // produce a signed statement let hash = collation.receipt.hash(); let validated = Validated::collated_local( collation.receipt, collation.pov.clone(), - extrinsic.clone(), + outgoing.clone(), ); let statement = GossipStatement::new( @@ -242,7 +243,7 @@ impl TableRouter for Router wh ); // give to network to make available. - self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic)); + self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(outgoing)); self.network().gossip_message(self.attestation_topic, statement.into()); } diff --git a/network/src/tests/mod.rs b/network/src/tests/mod.rs index be20e0ad1c2e..08f807e11a69 100644 --- a/network/src/tests/mod.rs +++ b/network/src/tests/mod.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use super::{PolkadotProtocol, Status, Message, FullStatus}; -use crate::validation::SessionParams; +use crate::validation::LeafWorkParams; use polkadot_validation::GenericStatement; use polkadot_primitives::{Block, Hash}; @@ -77,6 +77,28 @@ impl TestContext { } } +#[derive(Default)] +pub struct TestChainContext { + pub known_map: HashMap, + pub ingress_roots: HashMap>, +} + +impl crate::gossip::ChainContext for TestChainContext { + fn is_known(&self, block_hash: &Hash) -> Option { + self.known_map.get(block_hash).map(|x| x.clone()) + } + + fn leaf_unrouted_roots(&self, leaf: &Hash, with_queue_root: &mut dyn FnMut(&Hash)) + -> Result<(), substrate_client::error::Error> + { + for root in self.ingress_roots.get(leaf).into_iter().flat_map(|roots| roots) { + with_queue_root(root) + } + + Ok(()) + } +} + fn make_pov(block_data: Vec) -> PoVBlock { PoVBlock { block_data: BlockData(block_data), @@ -96,8 +118,8 @@ fn make_status(status: &Status, roles: Roles) -> FullStatus { } } -fn make_validation_session(parent_hash: Hash, local_key: ValidatorId) -> SessionParams { - SessionParams { +fn make_validation_leaf_work(parent_hash: Hash, local_key: ValidatorId) -> LeafWorkParams { + LeafWorkParams { local_session_key: Some(local_key), parent_hash, authorities: Vec::new(), @@ -129,8 +151,8 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - let params = make_validation_session(parent_hash, local_key.clone()); - protocol.new_validation_session(&mut ctx, params); + let params = make_validation_leaf_work(parent_hash, local_key.clone()); + protocol.new_validation_leaf_work(&mut ctx, params); assert!(ctx.has_message(peer_a, Message::ValidatorId(local_key.clone()))); } @@ -169,8 +191,8 @@ fn fetches_from_those_with_knowledge() { let status = Status { collating_for: None }; - let params = make_validation_session(parent_hash, local_key.clone()); - let session = protocol.new_validation_session(&mut TestContext::default(), params); + let params = make_validation_leaf_work(parent_hash, local_key.clone()); + let session = protocol.new_validation_leaf_work(&mut TestContext::default(), params); let knowledge = session.knowledge(); knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash)); @@ -259,7 +281,7 @@ fn fetches_available_block_data() { parachain_id: para_id, candidate_hash, block_data: block_data.clone(), - extrinsic: None, + outgoing_queues: None, }).unwrap(); // connect peer A @@ -323,13 
+345,13 @@ fn many_session_keys() { let local_key_a: ValidatorId = [3; 32].unchecked_into(); let local_key_b: ValidatorId = [4; 32].unchecked_into(); - let params_a = make_validation_session(parent_a, local_key_a.clone()); - let params_b = make_validation_session(parent_b, local_key_b.clone()); + let params_a = make_validation_leaf_work(parent_a, local_key_a.clone()); + let params_b = make_validation_leaf_work(parent_b, local_key_b.clone()); - protocol.new_validation_session(&mut TestContext::default(), params_a); - protocol.new_validation_session(&mut TestContext::default(), params_b); + protocol.new_validation_leaf_work(&mut TestContext::default(), params_a); + protocol.new_validation_leaf_work(&mut TestContext::default(), params_b); - assert_eq!(protocol.live_validation_sessions.recent_keys(), &[local_key_a.clone(), local_key_b.clone()]); + assert_eq!(protocol.live_validation_leaves.recent_keys(), &[local_key_a.clone(), local_key_b.clone()]); let peer_a = PeerId::random(); diff --git a/network/src/tests/validation.rs b/network/src/tests/validation.rs index 61130a2639ab..cc08d5eda8db 100644 --- a/network/src/tests/validation.rs +++ b/network/src/tests/validation.rs @@ -18,18 +18,17 @@ #![allow(unused)] -use crate::validation::{NetworkService, GossipService, GossipMessageStream}; use crate::gossip::GossipMessage; use substrate_network::Context as NetContext; use substrate_network::consensus_gossip::TopicNotification; use substrate_primitives::{NativeOrEncoded, ExecutionContext}; use substrate_keyring::Sr25519Keyring; -use crate::PolkadotProtocol; +use crate::{GossipService, PolkadotProtocol, NetworkService, GossipMessageStream}; -use polkadot_validation::{SharedTable, MessagesFrom, Network}; -use polkadot_primitives::{Block, Hash, Header, BlockId}; +use polkadot_validation::{SharedTable, Network}; +use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId}; use polkadot_primitives::parachain::{ - Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage, + Id as ParaId, Chain, DutyRoster, ParachainHost, TargetedMessage, ValidatorId, StructuredUnroutedIngress, BlockIngressRoots, Status, FeeSchedule, HeadData, }; @@ -43,7 +42,7 @@ use std::sync::Arc; use futures::{prelude::*, sync::mpsc}; use codec::Encode; -use super::TestContext; +use super::{TestContext, TestChainContext}; type TaskExecutor = Arc + Send>> + Send + Sync>; @@ -315,10 +314,10 @@ impl ParachainHost for RuntimeApi { &self, _at: &BlockId, _: ExecutionContext, - id: Option, + id: Option<(ParaId, Option)>, _: Vec, ) -> ClientResult>> { - let id = id.unwrap(); + let (id, _) = id.unwrap(); Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned())) } } @@ -348,7 +347,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built { }); let message_val = crate::gossip::RegisteredMessageValidator::new_test( - |_hash: &_| Some(crate::gossip::Known::Leaf), + TestChainContext::default(), Box::new(|_, _| {}), ); @@ -376,7 +375,7 @@ struct IngressBuilder { } impl IngressBuilder { - fn add_messages(&mut self, source: ParaId, messages: &[OutgoingMessage]) { + fn add_messages(&mut self, source: ParaId, messages: &[TargetedMessage]) { for message in messages { let target = message.target; self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone()); diff --git a/network/src/validation.rs b/network/src/validation.rs index a6abe4c25028..a6b75e33bc6d 100644 --- a/network/src/validation.rs +++ b/network/src/validation.rs @@ -14,29 +14,24 @@ // You should have received a copy of the GNU 
General Public License // along with Polkadot. If not, see . -//! The "validation session" networking code built on top of the base network service. +//! The "validation leaf work" networking code built on top of the base network service. //! //! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called -//! each time a validation session begins on a new chain head. +//! each time validation leaf work begins on a new chain head. -use crate::gossip::GossipMessage; use sr_primitives::traits::ProvideRuntimeApi; -use substrate_network::{PeerId, Context as NetContext}; -use substrate_network::consensus_gossip::{ - self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage, -}; +use substrate_network::PeerId; use polkadot_validation::{ Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement, }; use polkadot_primitives::{Block, BlockId, Hash}; use polkadot_primitives::parachain::{ - Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, - ValidatorId, PoVBlock, ValidatorIndex + Id as ParaId, Collation, OutgoingMessages, ParachainHost, CandidateReceipt, CollatorId, + ValidatorId, PoVBlock }; use futures::prelude::*; use futures::future::{self, Executor as FutureExecutor}; -use futures::sync::mpsc; use futures::sync::oneshot::{self, Receiver}; use std::collections::hash_map::{HashMap, Entry}; @@ -45,17 +40,15 @@ use std::sync::Arc; use arrayvec::ArrayVec; use parking_lot::Mutex; -use log::{debug, warn}; +use log::warn; use crate::router::Router; -use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData}; +use crate::gossip::{RegisteredMessageValidator, MessageValidationData}; -use super::PolkadotProtocol; +use super::NetworkService; pub use polkadot_validation::Incoming; -use codec::{Encode, Decode}; - /// An executor suitable for dispatching async consensus tasks. pub trait Executor { fn spawn + Send + 'static>(&self, f: F); @@ -83,108 +76,8 @@ impl Executor for Arc< } } -/// A gossip network subservice. -pub trait GossipService { - fn send_message(&mut self, ctx: &mut dyn NetContext, who: &PeerId, message: ConsensusMessage); -} - -impl GossipService for consensus_gossip::ConsensusGossip { - fn send_message(&mut self, ctx: &mut dyn NetContext, who: &PeerId, message: ConsensusMessage) { - consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message) - } -} - -/// A stream of gossip messages and an optional sender for a topic. -pub struct GossipMessageStream { - topic_stream: mpsc::UnboundedReceiver, -} - -impl GossipMessageStream { - /// Create a new instance with the given topic stream. - pub fn new(topic_stream: mpsc::UnboundedReceiver) -> Self { - Self { - topic_stream - } - } -} - -impl Stream for GossipMessageStream { - type Item = (GossipMessage, Option); - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - loop { - let msg = match futures::try_ready!(self.topic_stream.poll()) { - Some(msg) => msg, - None => return Ok(Async::Ready(None)), - }; - - debug!(target: "validation", "Processing statement for live validation session"); - if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) { - return Ok(Async::Ready(Some((gmsg, msg.sender)))) - } - } - } -} - -/// Basic functionality that a network has to fulfill. -pub trait NetworkService: Send + Sync + 'static { - /// Get a stream of gossip messages for a given hash. 
-	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;
-
-	/// Gossip a message on given topic.
-	fn gossip_message(&self, topic: Hash, message: GossipMessage);
-
-	/// Execute a closure with the gossip service.
-	fn with_gossip<F>(&self, with: F)
-		where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext);
-
-	/// Execute a closure with the polkadot protocol.
-	fn with_spec<F>(&self, with: F)
-		where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext);
-}
-
-impl NetworkService for super::NetworkService {
-	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
-		let (tx, rx) = std::sync::mpsc::channel();
-
-		super::NetworkService::with_gossip(self, move |gossip, _| {
-			let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
-			let _ = tx.send(inner_rx);
-		});
-
-		let topic_stream = match rx.recv() {
-			Ok(rx) => rx,
-			Err(_) => mpsc::unbounded().1, // return empty channel.
-		};
-
-		GossipMessageStream::new(topic_stream)
-	}
-
-	fn gossip_message(&self, topic: Hash, message: GossipMessage) {
-		self.gossip_consensus_message(
-			topic,
-			POLKADOT_ENGINE_ID,
-			message.encode(),
-			GossipMessageRecipient::BroadcastToAll,
-		);
-	}
-
-	fn with_gossip<F>(&self, with: F)
-		where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext)
-	{
-		super::NetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
-	}
-
-	fn with_spec<F>(&self, with: F)
-		where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext)
-	{
-		super::NetworkService::with_spec(self, with)
-	}
-}
-
-/// Params to a current validation session.
-pub struct SessionParams {
+/// Params to instantiate validation work on a block-DAG leaf.
+pub struct LeafWorkParams {
 	/// The local session key.
 	pub local_session_key: Option<ValidatorId>,
 	/// The parent hash.
@@ -234,20 +127,22 @@ impl ValidationNetwork where
 	N: NetworkService,
 	T: Clone + Executor + Send + Sync + 'static,
 {
-	/// Instantiate session data fetcher at a parent hash.
+	/// Instantiate block-DAG leaf work
+	/// (i.e. the work we want validators to do at some chain-head)
+	/// at a parent hash.
 	///
 	/// If the used session key is new, it will be broadcast to peers.
-	/// If a validation session was already instantiated at this parent hash,
+	/// If any validation leaf-work was already instantiated at this parent hash,
 	/// the underlying instance will be shared.
 	///
-	/// If there was already a validation session instantiated and a different
+	/// If there was already validation leaf-work instantiated and a different
 	/// session key was set, then the new key will be ignored.
 	///
 	/// This implies that there can be multiple services instantiating validation
-	/// session instances safely, but they should all be coordinated on which session keys
+	/// leaf-work instances safely, but they should all be coordinated on which session keys
 	/// are being used.
- pub fn instantiate_session(&self, params: SessionParams) - -> oneshot::Receiver> + pub fn instantiate_leaf_work(&self, params: LeafWorkParams) + -> oneshot::Receiver> { let parent_hash = params.parent_hash; let network = self.network.clone(); @@ -255,34 +150,27 @@ impl ValidationNetwork where let task_executor = self.executor.clone(); let exit = self.exit.clone(); let message_validator = self.message_validator.clone(); - let index_mapping = params.authorities - .iter() - .enumerate() - .map(|(i, k)| (i as ValidatorIndex, k.clone())) - .collect(); + let authorities = params.authorities.clone(); let (tx, rx) = oneshot::channel(); - { - let message_validator = self.message_validator.clone(); - let authorities = params.authorities.clone(); - self.network.with_gossip(move |gossip, ctx| { - message_validator.note_session( - parent_hash, - MessageValidationData { authorities, index_mapping }, - |peer_id, message| gossip.send_message(ctx, peer_id, message), - ); - }); - } - self.network.with_spec(move |spec, ctx| { - let session = spec.new_validation_session(ctx, params); - let _ = tx.send(SessionDataFetcher { + let actions = message_validator.new_local_leaf( + parent_hash, + MessageValidationData { authorities }, + |queue_root| spec.availability_store.as_ref() + .and_then(|store| store.queue_by_root(queue_root)) + ); + + network.with_gossip(move |gossip, ctx| actions.perform(gossip, ctx)); + + let work = spec.new_validation_leaf_work(ctx, params); + let _ = tx.send(LeafWorkDataFetcher { network, api, task_executor, parent_hash, - knowledge: session.knowledge().clone(), + knowledge: work.knowledge().clone(), exit, message_validator, }); @@ -335,7 +223,7 @@ impl ParachainNetwork for ValidationNetwork where let parent_hash = *table.consensus_parent_hash(); let local_session_key = table.session_key(); - let build_fetcher = self.instantiate_session(SessionParams { + let build_fetcher = self.instantiate_leaf_work(LeafWorkParams { local_session_key, parent_hash, authorities: authorities.to_vec(), @@ -421,9 +309,9 @@ impl Collators for ValidationNetwork where #[derive(Default)] struct KnowledgeEntry { knows_block_data: Vec, - knows_extrinsic: Vec, + knows_outgoing: Vec, pov: Option, - extrinsic: Option, + outgoing_messages: Option, } /// Tracks knowledge of peers. @@ -442,18 +330,18 @@ impl Knowledge { /// Note a statement seen from another validator. pub(crate) fn note_statement(&mut self, from: ValidatorId, statement: &Statement) { // those proposing the candidate or declaring it valid know everything. - // those claiming it invalid do not have the extrinsic data as it is + // those claiming it invalid do not have the outgoing messages data as it is // generated by valid execution. match *statement { GenericStatement::Candidate(ref c) => { let entry = self.candidates.entry(c.hash()).or_insert_with(Default::default); entry.knows_block_data.push(from.clone()); - entry.knows_extrinsic.push(from); + entry.knows_outgoing.push(from); } GenericStatement::Valid(ref hash) => { let entry = self.candidates.entry(*hash).or_insert_with(Default::default); entry.knows_block_data.push(from.clone()); - entry.knows_extrinsic.push(from); + entry.knows_outgoing.push(from); } GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash) .or_insert_with(Default::default) @@ -463,10 +351,15 @@ impl Knowledge { } /// Note a candidate collated or seen locally. 
- pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option, extrinsic: Option) { + pub(crate) fn note_candidate( + &mut self, + hash: Hash, + pov: Option, + outgoing_messages: Option, + ) { let entry = self.candidates.entry(hash).or_insert_with(Default::default); entry.pov = entry.pov.take().or(pov); - entry.extrinsic = entry.extrinsic.take().or(extrinsic); + entry.outgoing_messages = entry.outgoing_messages.take().or(outgoing_messages); } } @@ -492,19 +385,19 @@ impl Future for IncomingReceiver { } } -/// A current validation session instance. +/// A current validation leaf-work instance #[derive(Clone)] -pub(crate) struct ValidationSession { +pub(crate) struct LiveValidationLeaf { parent_hash: Hash, knowledge: Arc>, local_session_key: Option, } -impl ValidationSession { - /// Create a new validation session instance. Needs to be attached to the +impl LiveValidationLeaf { + /// Create a new validation leaf-work instance. Needs to be attached to the /// network. - pub(crate) fn new(params: SessionParams) -> Self { - ValidationSession { + pub(crate) fn new(params: LeafWorkParams) -> Self { + LiveValidationLeaf { parent_hash: params.parent_hash, knowledge: Arc::new(Mutex::new(Knowledge::new())), local_session_key: params.local_session_key, @@ -577,32 +470,32 @@ impl RecentValidatorIds { } } -/// Manages requests and keys for live validation session instances. -pub(crate) struct LiveValidationSessions { +/// Manages requests and keys for live validation leaf-work instances. +pub(crate) struct LiveValidationLeaves { // recent local session keys. recent: RecentValidatorIds, - // live validation session instances, on `parent_hash`. refcount retained alongside. - live_instances: HashMap, + // live validation leaf-work instances, on `parent_hash`. refcount retained alongside. + live_instances: HashMap, } -impl LiveValidationSessions { - /// Create a new `LiveValidationSessions` +impl LiveValidationLeaves { + /// Create a new `LiveValidationLeaves` pub(crate) fn new() -> Self { - LiveValidationSessions { + LiveValidationLeaves { recent: Default::default(), live_instances: HashMap::new(), } } - /// Note new validation session. If the used session key is new, + /// Note new leaf for validation work. If the used session key is new, /// it returns it to be broadcasted to peers. /// - /// If there was already a validation session instantiated and a different + /// If there was already work instantiated at this leaf and a different /// session key was set, then the new key will be ignored. - pub(crate) fn new_validation_session( + pub(crate) fn new_validation_leaf( &mut self, - params: SessionParams, - ) -> (ValidationSession, Option) { + params: LeafWorkParams, + ) -> (LiveValidationLeaf, Option) { let parent_hash = params.parent_hash; let key = params.local_session_key.clone(); @@ -629,19 +522,19 @@ impl LiveValidationSessions { return (prev.clone(), maybe_new) } - let session = ValidationSession::new(params); - self.live_instances.insert(parent_hash, (1, session.clone())); + let leaf_work = LiveValidationLeaf::new(params); + self.live_instances.insert(parent_hash, (1, leaf_work.clone())); - (session, check_new_key()) + (leaf_work, check_new_key()) } - /// Remove validation session. true indicates that it was actually removed. + /// Remove validation leaf-work. true indicates that it was actually removed. 
pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool { let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) { entry.get_mut().0 -= 1; if entry.get().0 == 0 { - let (_, session) = entry.remove(); - Some(session) + let (_, leaf_work) = entry.remove(); + Some(leaf_work) } else { None } @@ -649,12 +542,12 @@ impl LiveValidationSessions { None }; - let session = match maybe_removed { + let leaf_work = match maybe_removed { None => return false, Some(s) => s, }; - if let Some(ref key) = session.local_session_key { + if let Some(ref key) = leaf_work.local_session_key { let key_still_used = self.live_instances.values() .any(|c| c.1.local_session_key.as_ref() == Some(key)); @@ -671,12 +564,12 @@ impl LiveValidationSessions { self.recent.as_slice() } - /// Call a closure with pov-data from validation session at parent hash for a given + /// Call a closure with pov-data from validation leaf-work at parent hash for a given /// candidate-receipt hash. /// - /// This calls the closure with `Some(data)` where the session and data are live, - /// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys - /// who have the data, and `Err(None)` where the session is unknown. + /// This calls the closure with `Some(data)` where the leaf-work and data are live, + /// `Err(Some(keys))` when the leaf-work is live but the data unknown, with a list of keys + /// who have the data, and `Err(None)` where the leaf-work is unknown. pub(crate) fn with_pov_block(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U { @@ -716,8 +609,8 @@ impl Future for PoVReceiver { } } -/// Can fetch data for a given validation session -pub struct SessionDataFetcher { +/// Can fetch data for a given validation leaf-work instance. +pub struct LeafWorkDataFetcher { network: Arc, api: Arc

, exit: E, @@ -727,7 +620,7 @@ pub struct SessionDataFetcher { message_validator: RegisteredMessageValidator, } -impl SessionDataFetcher { +impl LeafWorkDataFetcher { /// Get the parent hash. pub(crate) fn parent_hash(&self) -> Hash { self.parent_hash @@ -759,9 +652,9 @@ impl SessionDataFetcher { } } -impl Clone for SessionDataFetcher { +impl Clone for LeafWorkDataFetcher { fn clone(&self) -> Self { - SessionDataFetcher { + LeafWorkDataFetcher { network: self.network.clone(), api: self.api.clone(), task_executor: self.task_executor.clone(), @@ -773,7 +666,7 @@ impl Clone for SessionDataFetcher SessionDataFetcher where +impl LeafWorkDataFetcher where P::Api: ParachainHost, N: NetworkService, T: Clone + Executor + Send + 'static, @@ -784,7 +677,11 @@ impl SessionDataFetcher where let parachain = candidate.parachain_index; let parent_hash = self.parent_hash; - let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) + let canon_roots = self.api.runtime_api().ingress( + &BlockId::hash(parent_hash), + parachain, + None, + ) .map_err(|e| format!( "Cannot fetch ingress for parachain {:?} at {:?}: {:?}", @@ -862,39 +759,39 @@ mod tests { } #[test] - fn add_new_sessions_works() { - let mut live_sessions = LiveValidationSessions::new(); + fn add_new_leaf_work_works() { + let mut live_leaves = LiveValidationLeaves::new(); let key_a: ValidatorId = [0; 32].unchecked_into(); let key_b: ValidatorId = [1; 32].unchecked_into(); let parent_hash = [0xff; 32].into(); - let (session, new_key) = live_sessions.new_validation_session(SessionParams { + let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams { parent_hash, local_session_key: None, authorities: Vec::new(), }); - let knowledge = session.knowledge().clone(); + let knowledge = leaf_work.knowledge().clone(); assert!(new_key.is_none()); - let (session, new_key) = live_sessions.new_validation_session(SessionParams { + let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams { parent_hash, local_session_key: Some(key_a.clone()), authorities: Vec::new(), }); // check that knowledge points to the same place. - assert_eq!(&**session.knowledge() as *const _, &*knowledge as *const _); + assert_eq!(&**leaf_work.knowledge() as *const _, &*knowledge as *const _); assert_eq!(new_key, Some(key_a.clone())); - let (session, new_key) = live_sessions.new_validation_session(SessionParams { + let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams { parent_hash, local_session_key: Some(key_b.clone()), authorities: Vec::new(), }); - assert_eq!(&**session.knowledge() as *const _, &*knowledge as *const _); + assert_eq!(&**leaf_work.knowledge() as *const _, &*knowledge as *const _); assert!(new_key.is_none()); } } diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index b60b1581ed39..24bcf9908165 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -96,31 +96,37 @@ pub struct DutyRoster { pub validator_duty: Vec, } -/// An outgoing message +/// A message targeted to a specific parachain. #[derive(Clone, PartialEq, Eq, Encode, Decode)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] #[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] #[cfg_attr(feature = "std", serde(deny_unknown_fields))] -pub struct OutgoingMessage { +pub struct TargetedMessage { /// The target parachain. pub target: Id, /// The message data. 
pub data: Vec, } -impl PartialOrd for OutgoingMessage { +impl AsRef<[u8]> for TargetedMessage { + fn as_ref(&self) -> &[u8] { + &self.data[..] + } +} + +impl PartialOrd for TargetedMessage { fn partial_cmp(&self, other: &Self) -> Option { Some(self.target.cmp(&other.target)) } } -impl Ord for OutgoingMessage { +impl Ord for TargetedMessage { fn cmp(&self, other: &Self) -> Ordering { self.target.cmp(&other.target) } } -/// Extrinsic data for a parachain candidate. +/// Outgoing message data for a parachain candidate. /// /// This is data produced by evaluating the candidate. It contains /// full records of all outgoing messages to other parachains. @@ -128,11 +134,37 @@ impl Ord for OutgoingMessage { #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] #[cfg_attr(feature = "std", serde(rename_all = "camelCase"))] #[cfg_attr(feature = "std", serde(deny_unknown_fields))] -pub struct Extrinsic { +pub struct OutgoingMessages { /// The outgoing messages from the execution of the parachain. /// /// This must be sorted in ascending order by parachain ID. - pub outgoing_messages: Vec + pub outgoing_messages: Vec +} + +impl OutgoingMessages { + /// Returns an iterator of slices of all outgoing message queues. + /// + /// All messages in a given slice are guaranteed to have the same target. + pub fn message_queues(&'_ self) -> impl Iterator + '_ { + let mut outgoing = &self.outgoing_messages[..]; + + rstd::iter::from_fn(move || { + if outgoing.is_empty() { return None } + let target = outgoing[0].target; + let mut end = 1; // the index of the last matching item + 1. + loop { + match outgoing.get(end) { + None => break, + Some(x) => if x.target != target { break }, + } + end += 1; + } + + let item = &outgoing[..end]; + outgoing = &outgoing[end..]; + Some(item) + }) + } } /// Candidate receipt type. @@ -217,6 +249,18 @@ pub struct PoVBlock { #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))] pub struct Message(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); +impl AsRef<[u8]> for Message { + fn as_ref(&self) -> &[u8] { + &self.0[..] + } +} + +impl From for Message { + fn from(targeted: TargetedMessage) -> Self { + Message(targeted.data) + } +} + /// All ingress roots at one block. /// /// This is an ordered vector of other parachain's egress queue roots from a specific block. @@ -395,7 +439,10 @@ substrate_client::decl_runtime_apis! { fn parachain_code(id: Id) -> Option>; /// Get all the unrouted ingress roots at the given block that /// are targeting the given parachain. - fn ingress(to: Id) -> Option; + /// + /// If `since` is provided, only messages since (including those in) that block + /// will be included. + fn ingress(to: Id, since: Option) -> Option; } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 57c984bf23f5..dd99246f69e0 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -630,8 +630,10 @@ impl_runtime_apis! 
{ fn parachain_code(id: parachain::Id) -> Option> { Parachains::parachain_code(&id) } - fn ingress(to: parachain::Id) -> Option { - Parachains::ingress(to).map(parachain::StructuredUnroutedIngress) + fn ingress(to: parachain::Id, since: Option) + -> Option + { + Parachains::ingress(to, since).map(parachain::StructuredUnroutedIngress) } } diff --git a/runtime/src/parachains.rs b/runtime/src/parachains.rs index 939132cd4d5d..b507e6d1b50d 100644 --- a/runtime/src/parachains.rs +++ b/runtime/src/parachains.rs @@ -21,7 +21,9 @@ use rstd::collections::btree_map::BTreeMap; use codec::{Encode, Decode, HasCompact}; use srml_support::{decl_storage, decl_module, fail, ensure}; -use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One}; +use sr_primitives::traits::{ + Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero, +}; use sr_primitives::weights::SimpleDispatchInfo; use primitives::{Hash, Balance, parachain::{ self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion, @@ -588,15 +590,23 @@ impl Module { } /// Calculate the ingress to a specific parachain. - /// Complexity: O(n) in the number of blocks since the parachain's watermark. + /// If `since` is provided, only messages since (including those in) that block + /// will be included. + /// Complexity: O(n) in the number of blocks since the supplied block. /// invoked off-chain. /// /// Yields a structure containing all unrouted ingress to the parachain. - pub fn ingress(to: ParaId) -> Option> { + pub fn ingress(to: ParaId, since: Option) -> Option> { let watermark = >::get(to)?; let now = >::block_number(); - Some(number_range(watermark.saturating_add(One::one()),now) + let watermark_since = watermark.saturating_add(One::one()); + let since = rstd::cmp::max(since.unwrap_or(Zero::zero()), watermark_since); + if since > now { + return Some(Vec::new()); + } + + Some(number_range(since, now) .filter_map(|unrouted_height| { >::get(&(unrouted_height, to)).map(|roots| { (unrouted_height, BlockIngressRoots(roots)) @@ -1697,8 +1707,8 @@ mod tests { ]; with_externalities(&mut new_test_ext(parachains), || { - assert_eq!(Parachains::ingress(ParaId::from(1)), Some(Vec::new())); - assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new())); + assert_eq!(Parachains::ingress(ParaId::from(1), None), Some(Vec::new())); + assert_eq!(Parachains::ingress(ParaId::from(99), None), Some(Vec::new())); for i in 1..10 { System::set_block_number(i); @@ -1755,7 +1765,7 @@ mod tests { // parachain 1 has had a bunch of parachain candidates included, // which raises the watermark. assert_eq!( - Parachains::ingress(ParaId::from(1)), + Parachains::ingress(ParaId::from(1), None), Some(vec![ (9, BlockIngressRoots(vec![ (0.into(), [9; 32].into()) @@ -1766,7 +1776,7 @@ mod tests { // parachain 99 hasn't had any candidates included, so the // ingress is piling up. assert_eq!( - Parachains::ingress(ParaId::from(99)), + Parachains::ingress(ParaId::from(99), None), Some((1..10).map(|i| (i, BlockIngressRoots( vec![(1.into(), [i as u8; 32].into())] ))).collect::>()), @@ -1776,8 +1786,8 @@ mod tests { // after deregistering, there is no ingress to 1, but unrouted messages // from 1 stick around. 
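As a worked sketch of the `since` clamp in `Parachains::ingress` above (simplified to bare `u64` block numbers; `ingress_range` is a hypothetical helper, and the half-open range assumes `number_range(from, to)` iterates `from..to`):

/// The block heights that `ingress(to, since)` effectively scans.
/// `None` models the early return, where the runtime yields an empty set.
fn ingress_range(watermark: u64, since: Option<u64>, now: u64) -> Option<std::ops::Range<u64>> {
	let watermark_since = watermark.saturating_add(1); // first height not yet routed
	let since = since.unwrap_or(0).max(watermark_since); // never reach below the watermark
	if since > now {
		None
	} else {
		Some(since..now)
	}
}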
- assert_eq!(Parachains::ingress(ParaId::from(1)), None); - assert_eq!(Parachains::ingress(ParaId::from(99)), Some((1..10).map(|i| (i, BlockIngressRoots( + assert_eq!(Parachains::ingress(ParaId::from(1), None), None); + assert_eq!(Parachains::ingress(ParaId::from(99), None), Some((1..10).map(|i| (i, BlockIngressRoots( vec![(1.into(), [i as u8; 32].into())] ))).collect::>())); @@ -1809,7 +1819,7 @@ mod tests { System::set_block_number(12); // at the next block, ingress to 99 should be empty. - assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new())); + assert_eq!(Parachains::ingress(ParaId::from(99), None), Some(Vec::new())); }); } diff --git a/service/src/lib.rs b/service/src/lib.rs index ca95f6164a2c..61e28d123059 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -36,7 +36,7 @@ pub use service::{ServiceBuilderExport, ServiceBuilderImport, ServiceBuilderReve pub use service::config::full_version_from_strs; pub use client::{backend::Backend, runtime_api::{Core as CoreApi, ConstructRuntimeApi}, ExecutionStrategy, CallExecutor}; pub use consensus_common::SelectChain; -pub use polkadot_network::{PolkadotProtocol, NetworkService}; +pub use polkadot_network::{PolkadotProtocol}; pub use polkadot_primitives::parachain::{CollatorId, ParachainHost}; pub use polkadot_primitives::Block; pub use polkadot_runtime::RuntimeApi; @@ -116,7 +116,7 @@ macro_rules! new_full_start { Ok(import_queue) })?; - + (builder, import_setup, inherent_data_providers, tasks_to_spawn) }} } @@ -175,32 +175,34 @@ pub fn new_full(config: Configuration) info!("The node cannot start as an authority because it can't select chain."); return Ok(service); }; - let gossip_validator_select_chain = select_chain.clone(); - let gossip_validator = network_gossip::register_validator( - service.network(), - move |block_hash: &Hash| { - use client::BlockStatus; - - match known_oracle.block_status(&BlockId::hash(*block_hash)) { - Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, - Ok(BlockStatus::KnownBad) => Some(Known::Bad), - Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { - match gossip_validator_select_chain.leaves() { - Err(_) => None, - Ok(leaves) => if leaves.contains(block_hash) { - Some(Known::Leaf) - } else { - Some(Known::Old) - }, - } + + let is_known = move |block_hash: &Hash| { + use client::BlockStatus; + + match known_oracle.block_status(&BlockId::hash(*block_hash)) { + Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, + Ok(BlockStatus::KnownBad) => Some(Known::Bad), + Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { + match gossip_validator_select_chain.leaves() { + Err(_) => None, + Ok(leaves) => if leaves.contains(block_hash) { + Some(Known::Leaf) + } else { + Some(Known::Old) + }, } } - }, + } + }; + + let gossip_validator = network_gossip::register_validator( + service.network(), + (is_known, client.clone()), ); if service.config().roles.is_authority() { - let extrinsic_store = { + let availability_store = { use std::path::PathBuf; let mut path = PathBuf::from(service.config().database_path.clone()); @@ -212,6 +214,13 @@ pub fn new_full(config: Configuration) })? 
}; + { + let availability_store = availability_store.clone(); + service.network().with_spec( + |spec, _ctx| spec.register_availability_store(availability_store) + ); + } + // collator connections and validation network both fulfilled by this let validation_network = ValidationNetwork::new( service.network(), @@ -228,7 +237,7 @@ pub fn new_full(config: Configuration) service.transaction_pool(), Arc::new(service.spawn_task_handle()), service.keystore(), - extrinsic_store, + availability_store, polkadot_runtime::constants::time::SLOT_DURATION, service.config().custom.max_block_data_size, ); diff --git a/test-parachains/adder/collator/src/main.rs b/test-parachains/adder/collator/src/main.rs index d677c3b74473..25352b161002 100644 --- a/test-parachains/adder/collator/src/main.rs +++ b/test-parachains/adder/collator/src/main.rs @@ -25,7 +25,7 @@ use substrate_primitives::Pair; use parachain::codec::{Encode, Decode}; use primitives::{ Hash, - parachain::{HeadData, BlockData, Id as ParaId, Message, Extrinsic, Status as ParachainStatus}, + parachain::{HeadData, BlockData, Id as ParaId, Message, OutgoingMessages, Status as ParachainStatus}, }; use collator::{InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext}; use parking_lot::Mutex; @@ -53,14 +53,14 @@ struct AdderContext { /// The parachain context. impl ParachainContext for AdderContext { - type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>; + type ProduceCandidate = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>; fn produce_candidate>( &self, _relay_parent: Hash, status: ParachainStatus, ingress: I, - ) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> + ) -> Result<(BlockData, HeadData, OutgoingMessages), InvalidHead> { let adder_head = AdderHead::decode(&mut &status.head_data.0[..]) .map_err(|_| InvalidHead)?; @@ -94,7 +94,7 @@ impl ParachainContext for AdderContext { next_head.number, next_body.state.overflowing_add(next_body.add).0); db.insert(next_head.clone(), next_body); - Ok((encoded_body, encoded_head, Extrinsic { outgoing_messages: Vec::new() })) + Ok((encoded_body, encoded_head, OutgoingMessages { outgoing_messages: Vec::new() })) } } diff --git a/validation/Cargo.toml b/validation/Cargo.toml index 6e889ffafa3b..ad44c578deb4 100644 --- a/validation/Cargo.toml +++ b/validation/Cargo.toml @@ -14,7 +14,7 @@ derive_more = "0.14.0" log = "0.4.6" exit-future = "0.1" codec = { package = "parity-scale-codec", version = "~1.0.0", default-features = false, features = ["derive"] } -extrinsic_store = { package = "polkadot-availability-store", path = "../availability-store" } +availability_store = { package = "polkadot-availability-store", path = "../availability-store" } parachain = { package = "polkadot-parachain", path = "../parachain" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } diff --git a/validation/src/attestation_service.rs b/validation/src/attestation_service.rs index cc3a0c453b05..93f9a212c1ea 100644 --- a/validation/src/attestation_service.rs +++ b/validation/src/attestation_service.rs @@ -29,13 +29,13 @@ use client::{error::Result as ClientResult, BlockchainEvents, BlockBody}; use client::block_builder::api::BlockBuilder; use client::blockchain::HeaderBackend; use consensus::SelectChain; -use extrinsic_store::Store as ExtrinsicStore; +use availability_store::Store as AvailabilityStore; use futures::prelude::*; use futures03::{TryStreamExt as _, StreamExt as _}; use log::error; use polkadot_primitives::{Block, 
BlockId};
 use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
-use runtime_primitives::traits::{ProvideRuntimeApi, Header as HeaderT};
+use runtime_primitives::traits::{ProvideRuntimeApi};
 use babe_primitives::BabeApi;
 use keystore::KeyStorePtr;
@@ -73,7 +73,7 @@ pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId)
 //
 // NOTE: this will need to be changed to finality notification rather than
 // block import notifications when the consensus switches to non-instant finality.
-fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStore)
+fn prune_unneeded_availability<P>(client: Arc<P>, availability_store: AvailabilityStore)
 	-> impl Future<Item=(), Error=()> + Send
 	where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
 {
@@ -94,7 +94,7 @@ fn prune_unneeded_availability<P>(client: Arc<P>
, extrinsic_store: ExtrinsicStor } }; - if let Err(e) = extrinsic_store.candidates_finalized(parent_hash, candidate_hashes) { + if let Err(e) = availability_store.candidates_finalized(parent_hash, candidate_hashes) { warn!(target: "validation", "Failed to prune unneeded available data: {:?}", e); } @@ -115,7 +115,7 @@ pub(crate) fn start( parachain_validation: Arc>, thread_pool: TaskExecutor, keystore: KeyStorePtr, - extrinsic_store: ExtrinsicStore, + availability_store: AvailabilityStore, max_block_data_size: Option, ) -> ServiceHandle where @@ -148,7 +148,6 @@ pub(crate) fn start( if notification.is_new_best { let res = validation.get_or_instantiate( parent_hash, - notification.header.parent_hash().clone(), &keystore, max_block_data_size, ); @@ -194,7 +193,7 @@ pub(crate) fn start( error!("Failed to spawn old sessions pruning task"); } - let prune_available = prune_unneeded_availability(client, extrinsic_store) + let prune_available = prune_unneeded_availability(client, availability_store) .select(exit.clone()) .then(|_| Ok(())); diff --git a/validation/src/collation.rs b/validation/src/collation.rs index f9a143b915eb..daccf4e7bfe2 100644 --- a/validation/src/collation.rs +++ b/validation/src/collation.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use polkadot_primitives::{Block, Hash, BlockId, Balance, parachain::{ CollatorId, ConsolidatedIngress, StructuredUnroutedIngress, CandidateReceipt, ParachainHost, - Id as ParaId, Collation, Extrinsic, OutgoingMessage, UpwardMessage, FeeSchedule, + Id as ParaId, Collation, TargetedMessage, OutgoingMessages, UpwardMessage, FeeSchedule, }}; use runtime_primitives::traits::ProvideRuntimeApi; use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, MessageRef, UpwardMessageRef}; @@ -100,10 +100,10 @@ impl CollationFetch { impl Future for CollationFetch where P::Api: ParachainHost, { - type Item = (Collation, Extrinsic); + type Item = (Collation, OutgoingMessages); type Error = C::Error; - fn poll(&mut self) -> Poll<(Collation, Extrinsic), C::Error> { + fn poll(&mut self) -> Poll<(Collation, OutgoingMessages), C::Error> { loop { let collation = { let parachain = self.parachain.clone(); @@ -182,15 +182,15 @@ impl std::error::Error for Error { } } -/// Compute a trie root for a set of messages. +/// Compute a trie root for a set of messages, given the raw message data. pub fn message_queue_root>(messages: I) -> Hash where A: AsRef<[u8]> { - ::trie::trie_types::Layout::::ordered_trie_root(messages) + trie::trie_types::Layout::::ordered_trie_root(messages) } /// Compute the set of egress roots for all given outgoing messages. -pub fn egress_roots(outgoing: &mut [OutgoingMessage]) -> Vec<(ParaId, Hash)> { +pub fn egress_roots(outgoing: &mut [TargetedMessage]) -> Vec<(ParaId, Hash)> { // stable sort messages by parachain ID. outgoing.sort_by_key(|msg| ParaId::from(msg.target)); @@ -214,10 +214,10 @@ pub fn egress_roots(outgoing: &mut [OutgoingMessage]) -> Vec<(ParaId, Hash)> { egress_roots } -fn check_extrinsic( - mut outgoing: Vec, +fn check_egress( + mut outgoing: Vec, expected_egress_roots: &[(ParaId, Hash)], -) -> Result { +) -> Result { // stable sort messages by parachain ID. 
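 	// Stability matters here: messages sharing a target keep their submission
 	// order, and each per-target queue root is computed over that order.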
outgoing.sort_by_key(|msg| ParaId::from(msg.target)); @@ -264,12 +264,12 @@ fn check_extrinsic( } } - Ok(Extrinsic { outgoing_messages: outgoing }) + Ok(OutgoingMessages { outgoing_messages: outgoing }) } struct Externalities { parachain_index: ParaId, - outgoing: Vec, + outgoing: Vec, upward: Vec, fees_charged: Balance, free_balance: Balance, @@ -284,7 +284,7 @@ impl wasm_executor::Externalities for Externalities { } self.apply_message_fee(message.data.len())?; - self.outgoing.push(OutgoingMessage { + self.outgoing.push(TargetedMessage { target, data: message.data.to_vec(), }); @@ -317,11 +317,11 @@ impl Externalities { } } - // Performs final checks of validity, producing the extrinsic data. + // Performs final checks of validity, producing the outgoing message data. fn final_checks( self, candidate: &CandidateReceipt, - ) -> Result { + ) -> Result { if &self.upward != &candidate.upward_messages { return Err(Error::UpwardMessagesInvalid { expected: candidate.upward_messages.clone(), @@ -336,7 +336,7 @@ impl Externalities { }); } - check_extrinsic( + check_egress( self.outgoing, &candidate.egress_queue_roots[..], ) @@ -386,7 +386,7 @@ pub fn validate_collation
<P>(
 	relay_parent: &BlockId,
 	collation: &Collation,
 	max_block_data_size: Option<u64>,
-) -> Result<Extrinsic, Error> where
+) -> Result<OutgoingMessages, Error> where
 	P: ProvideRuntimeApi,
 	P::Api: ParachainHost<Block>,
 {
@@ -407,7 +407,7 @@ pub fn validate_collation<P>
( let chain_status = api.parachain_status(relay_parent, para_id)? .ok_or_else(|| Error::InactiveParachain(para_id))?; - let roots = api.ingress(relay_parent, para_id)? + let roots = api.ingress(relay_parent, para_id, None)? .ok_or_else(|| Error::InactiveParachain(para_id))?; validate_incoming(&roots, &collation.pov.ingress)?; @@ -459,42 +459,42 @@ mod tests { #[test] fn compute_and_check_egress() { let messages = vec![ - OutgoingMessage { target: 3.into(), data: vec![1, 1, 1] }, - OutgoingMessage { target: 1.into(), data: vec![1, 2, 3] }, - OutgoingMessage { target: 2.into(), data: vec![4, 5, 6] }, - OutgoingMessage { target: 1.into(), data: vec![7, 8, 9] }, + TargetedMessage { target: 3.into(), data: vec![1, 1, 1] }, + TargetedMessage { target: 1.into(), data: vec![1, 2, 3] }, + TargetedMessage { target: 2.into(), data: vec![4, 5, 6] }, + TargetedMessage { target: 1.into(), data: vec![7, 8, 9] }, ]; let root_1 = message_queue_root(&[vec![1, 2, 3], vec![7, 8, 9]]); let root_2 = message_queue_root(&[vec![4, 5, 6]]); let root_3 = message_queue_root(&[vec![1, 1, 1]]); - assert!(check_extrinsic( + assert!(check_egress( messages.clone(), &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3)], ).is_ok()); let egress_roots = egress_roots(&mut messages.clone()[..]); - assert!(check_extrinsic( + assert!(check_egress( messages.clone(), &egress_roots[..], ).is_ok()); // missing root. - assert!(check_extrinsic( + assert!(check_egress( messages.clone(), &[(1.into(), root_1), (3.into(), root_3)], ).is_err()); // extra root. - assert!(check_extrinsic( + assert!(check_egress( messages.clone(), &[(1.into(), root_1), (2.into(), root_2), (3.into(), root_3), (4.into(), Default::default())], ).is_err()); // root mismatch. - assert!(check_extrinsic( + assert!(check_egress( messages.clone(), &[(1.into(), root_2), (2.into(), root_1), (3.into(), root_3)], ).is_err()); diff --git a/validation/src/lib.rs b/validation/src/lib.rs index bcb7b45e2c64..c0aa7b0b7644 100644 --- a/validation/src/lib.rs +++ b/validation/src/lib.rs @@ -37,17 +37,17 @@ use client::blockchain::HeaderBackend; use client::block_builder::api::BlockBuilder as BlockBuilderApi; use codec::Encode; use consensus::SelectChain; -use extrinsic_store::Store as ExtrinsicStore; +use availability_store::Store as AvailabilityStore; use parking_lot::Mutex; use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header}; use polkadot_primitives::parachain::{ - Id as ParaId, Chain, DutyRoster, Extrinsic as ParachainExtrinsic, CandidateReceipt, - ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessage, + Id as ParaId, Chain, DutyRoster, OutgoingMessages, CandidateReceipt, + ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, Collation, PoVBlock, ValidatorSignature, ValidatorPair, ValidatorId }; use primitives::Pair; use runtime_primitives::{ - traits::{ProvideRuntimeApi, Header as HeaderT, DigestFor}, ApplyError + traits::{ProvideRuntimeApi, DigestFor}, ApplyError }; use futures_timer::{Delay, Interval}; use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi}; @@ -88,27 +88,6 @@ const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; /// Incoming messages; a series of sorted (ParaId, Message) pairs. pub type Incoming = Vec<(ParaId, Vec)>; -/// Outgoing messages from various candidates. -pub type Outgoing = Vec; - -/// Some messages from a parachain. -pub struct MessagesFrom { - /// The parachain originating the messages. - pub from: ParaId, - /// The messages themselves. 
- pub messages: ParachainExtrinsic, -} - -impl MessagesFrom { - /// Construct from the raw messages. - pub fn from_messages(from: ParaId, messages: Vec) -> Self { - MessagesFrom { - from, - messages: ParachainExtrinsic { outgoing_messages: messages }, - } - } -} - /// A handle to a statement table router. /// /// This is expected to be a lightweight, shared type like an `Arc`. @@ -120,7 +99,7 @@ pub trait TableRouter: Clone { /// Call with local candidate data. This will make the data available on the network, /// and sign, import, and broadcast a statement about the candidate. - fn local_collation(&self, collation: Collation, extrinsic: ParachainExtrinsic); + fn local_collation(&self, collation: Collation, outgoing: OutgoingMessages); /// Fetch validation proof for a specific candidate. fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof; @@ -227,6 +206,18 @@ pub fn make_group_info( } +/// Compute the (target, root, messages) of all outgoing queues. +pub fn outgoing_queues(outgoing_targeted: &'_ OutgoingMessages) + -> impl Iterator)> + '_ +{ + outgoing_targeted.message_queues().filter_map(|queue| { + let target = queue.get(0)?.target; + let queue_root = message_queue_root(queue); + let queue_data = queue.iter().map(|msg| msg.clone().into()).collect(); + Some((target, queue_root, queue_data)) + }) +} + // finds the first key we are capable of signing with out of the given set of validators, // if any. fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option> { @@ -249,7 +240,7 @@ struct ParachainValidation { /// handle to remote task executor handle: TaskExecutor, /// Store for extrinsic data. - extrinsic_store: ExtrinsicStore, + availability_store: AvailabilityStore, /// Live agreements. Maps relay chain parent hashes to attestation /// instances. live_instances: Mutex>>, @@ -274,7 +265,6 @@ impl ParachainValidation where fn get_or_instantiate( &self, parent_hash: Hash, - grandparent_hash: Hash, keystore: &KeyStorePtr, max_block_data_size: Option, ) @@ -290,32 +280,6 @@ impl ParachainValidation where let validators = self.client.runtime_api().validators(&id)?; let sign_with = signing_key(&validators[..], keystore); - // compute the parent candidates, if we know of them. - // this will allow us to circulate outgoing messages to other peers as necessary. - let parent_candidates: Vec<_> = crate::attestation_service::fetch_candidates(&*self.client, &id) - .ok() - .and_then(|x| x) - .map(|x| x.collect()) - .unwrap_or_default(); - - // TODO: https://github.com/paritytech/polkadot/issues/253 - // - // We probably don't only want active validators to do this, or messages - // will disappear when validators exit the set. - let _outgoing: Vec<_> = { - // extract all extrinsic data that we have and propagate to peers. 
- live_instances.get(&grandparent_hash).map(|parent_validation| { - parent_candidates.iter().filter_map(|c| { - let para_id = c.parachain_index; - let hash = c.hash(); - parent_validation.table.extrinsic_data(&hash).map(|ex| MessagesFrom { - from: para_id, - messages: ex, - }) - }).collect() - }).unwrap_or_default() - }; - let duty_roster = self.client.runtime_api().duty_roster(&id)?; let (group_info, local_duty) = make_group_info( @@ -339,7 +303,7 @@ impl ParachainValidation where group_info, sign_with, parent_hash, - self.extrinsic_store.clone(), + self.availability_store.clone(), max_block_data_size, )); @@ -380,10 +344,10 @@ impl ParachainValidation where max_block_data_size: Option, exit: exit_future::Exit, ) { - use extrinsic_store::Data; + use availability_store::Data; let (collators, client) = (self.collators.clone(), self.client.clone()); - let extrinsic_store = self.extrinsic_store.clone(); + let availability_store = self.availability_store.clone(); let with_router = move |router: N::TableRouter| { // fetch a local collation from connected collators. @@ -396,20 +360,24 @@ impl ParachainValidation where ); collation_work.then(move |result| match result { - Ok((collation, extrinsic)) => { - let res = extrinsic_store.make_available(Data { + Ok((collation, outgoing_targeted)) => { + let outgoing_queues = crate::outgoing_queues(&outgoing_targeted) + .map(|(_target, root, data)| (root, data)) + .collect(); + + let res = availability_store.make_available(Data { relay_parent, parachain_id: collation.receipt.parachain_index, candidate_hash: collation.receipt.hash(), block_data: collation.pov.block_data.clone(), - extrinsic: Some(extrinsic.clone()), + outgoing_queues: Some(outgoing_queues), }); match res { Ok(()) => { // TODO: https://github.com/paritytech/polkadot/issues/51 // Erasure-code and provide merkle branches. - router.local_collation(collation, extrinsic); + router.local_collation(collation, outgoing_targeted); } Err(e) => warn!( target: "validation", @@ -482,7 +450,7 @@ impl ProposerFactory where transaction_pool: Arc>, thread_pool: TaskExecutor, keystore: KeyStorePtr, - extrinsic_store: ExtrinsicStore, + availability_store: AvailabilityStore, babe_slot_duration: u64, max_block_data_size: Option, ) -> Self { @@ -491,7 +459,7 @@ impl ProposerFactory where network, collators, handle: thread_pool.clone(), - extrinsic_store: extrinsic_store.clone(), + availability_store: availability_store.clone(), live_instances: Mutex::new(HashMap::new()), }); @@ -501,7 +469,7 @@ impl ProposerFactory where parachain_validation.clone(), thread_pool, keystore.clone(), - extrinsic_store, + availability_store, max_block_data_size, ); @@ -540,7 +508,6 @@ impl consensus::Environment for ProposerFactory, trackers: Vec, - extrinsic_store: ExtrinsicStore, + availability_store: AvailabilityStore, validated: HashMap, } @@ -186,7 +186,7 @@ impl SharedTableInner { }; work.map(|work| ParachainWork { - extrinsic_store: self.extrinsic_store.clone(), + availability_store: self.availability_store.clone(), relay_parent: context.parent_hash.clone(), work, max_block_data_size, @@ -221,24 +221,24 @@ impl Validated { } /// Note that we've validated a candidate with given hash and it is good. - /// Extrinsic data required. - pub fn known_good(hash: Hash, collation: PoVBlock, extrinsic: Extrinsic) -> Self { + /// outgoing message required. 
+ pub fn known_good(hash: Hash, collation: PoVBlock, outgoing: OutgoingMessages) -> Self { Validated { statement: GenericStatement::Valid(hash), - result: Validation::Valid(collation, extrinsic), + result: Validation::Valid(collation, outgoing), } } /// Note that we've collated a candidate. - /// Extrinsic data required. + /// outgoing message required. pub fn collated_local( receipt: CandidateReceipt, collation: PoVBlock, - extrinsic: Extrinsic, + outgoing: OutgoingMessages, ) -> Self { Validated { statement: GenericStatement::Candidate(receipt), - result: Validation::Valid(collation, extrinsic), + result: Validation::Valid(collation, outgoing), } } @@ -249,8 +249,8 @@ impl Validated { } } - /// Get a reference to the extrinsic data, if any. - pub fn extrinsic(&self) -> Option<&Extrinsic> { + /// Get a reference to the outgoing messages data, if any. + pub fn outgoing_messages(&self) -> Option<&OutgoingMessages> { match self.result { Validation::Valid(_, ref ex) => Some(ex), Validation::Invalid(_) => None, @@ -262,7 +262,7 @@ impl Validated { pub struct ParachainWork { work: Work, relay_parent: Hash, - extrinsic_store: ExtrinsicStore, + availability_store: AvailabilityStore, max_block_data_size: Option, } @@ -272,7 +272,7 @@ impl ParachainWork { pub fn prime(self, api: Arc
<P>
) -> PrimedParachainWork< Fetch, - impl Send + FnMut(&BlockId, &Collation) -> Result, + impl Send + FnMut(&BlockId, &Collation) -> Result, > where P: Send + Sync + 'static, @@ -301,7 +301,7 @@ impl ParachainWork { /// Prime the parachain work with a custom validation function. pub fn prime_with(self, validate: F) -> PrimedParachainWork - where F: FnMut(&BlockId, &Collation) -> Result + where F: FnMut(&BlockId, &Collation) -> Result { PrimedParachainWork { inner: self, validate } } @@ -321,7 +321,7 @@ pub struct PrimedParachainWork { impl Future for PrimedParachainWork where Fetch: Future, - F: FnMut(&BlockId, &Collation) -> Result, + F: FnMut(&BlockId, &Collation) -> Result, Err: From<::std::io::Error>, { type Item = Validated; @@ -347,18 +347,22 @@ impl Future for PrimedParachainWork GenericStatement::Invalid(candidate_hash), Validation::Invalid(pov_block), ), - Ok(extrinsic) => { - self.inner.extrinsic_store.make_available(Data { + Ok(outgoing_targeted) => { + let outgoing_queues = crate::outgoing_queues(&outgoing_targeted) + .map(|(_target, root, data)| (root, data)) + .collect(); + + self.inner.availability_store.make_available(Data { relay_parent: self.inner.relay_parent, parachain_id: work.candidate_receipt.parachain_index, candidate_hash, block_data: pov_block.block_data.clone(), - extrinsic: Some(extrinsic.clone()), + outgoing_queues: Some(outgoing_queues), })?; ( GenericStatement::Valid(candidate_hash), - Validation::Valid(pov_block, extrinsic) + Validation::Valid(pov_block, outgoing_targeted) ) } }; @@ -397,7 +401,7 @@ impl SharedTable { groups: HashMap, key: Option>, parent_hash: Hash, - extrinsic_store: ExtrinsicStore, + availability_store: AvailabilityStore, max_block_data_size: Option, ) -> Self { SharedTable { @@ -407,7 +411,7 @@ impl SharedTable { table: Table::default(), validated: HashMap::new(), trackers: Vec::new(), - extrinsic_store, + availability_store, })) } } @@ -427,19 +431,6 @@ impl SharedTable { &self.context.groups } - /// Get extrinsic data for candidate with given hash, if any. - /// - /// This will return `Some` for any candidates that have been validated - /// locally. - pub(crate) fn extrinsic_data(&self, hash: &Hash) -> Option { - self.inner.lock().validated.get(hash).and_then(|x| match *x { - ValidationWork::Error(_) => None, - ValidationWork::InProgress => None, - ValidationWork::Done(Validation::Invalid(_)) => None, - ValidationWork::Done(Validation::Valid(_, ref ex)) => Some(ex.clone()), - }) - } - /// Import a single statement with remote source, whose signature has already been checked. 
/// /// The statement producer, if any, will produce only statements concerning the same candidate @@ -598,7 +589,7 @@ mod tests { type Error = ::std::io::Error; type FetchValidationProof = future::FutureResult; - fn local_collation(&self, _collation: Collation, _extrinsic: Extrinsic) { + fn local_collation(&self, _collation: Collation, _outgoing: OutgoingMessages) { } fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof { @@ -631,7 +622,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - ExtrinsicStore::new_in_memory(), + AvailabilityStore::new_in_memory(), None, ); @@ -686,7 +677,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - ExtrinsicStore::new_in_memory(), + AvailabilityStore::new_in_memory(), None, ); @@ -718,7 +709,7 @@ mod tests { #[test] fn evaluate_makes_block_data_available() { - let store = ExtrinsicStore::new_in_memory(); + let store = AvailabilityStore::new_in_memory(); let relay_parent = [0; 32].into(); let para_id = 5.into(); let pov_block = pov_block_with_data(vec![1, 2, 3]); @@ -742,11 +733,11 @@ mod tests { fetch: future::ok(pov_block.clone()), }, relay_parent, - extrinsic_store: store.clone(), + availability_store: store.clone(), max_block_data_size: None, }; - let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) + let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() })) .wait() .unwrap(); @@ -754,12 +745,12 @@ mod tests { assert_eq!(validated.statement, GenericStatement::Valid(hash)); assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data); - assert!(store.extrinsic(relay_parent, hash).is_some()); + // TODO: check that a message queue is included by root. } #[test] fn full_availability() { - let store = ExtrinsicStore::new_in_memory(); + let store = AvailabilityStore::new_in_memory(); let relay_parent = [0; 32].into(); let para_id = 5.into(); let pov_block = pov_block_with_data(vec![1, 2, 3]); @@ -783,18 +774,18 @@ mod tests { fetch: future::ok::<_, ::std::io::Error>(pov_block.clone()), }, relay_parent, - extrinsic_store: store.clone(), + availability_store: store.clone(), max_block_data_size: None, }; - let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) + let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() })) .wait() .unwrap(); assert_eq!(validated.pov_block(), &pov_block); assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data); - assert!(store.extrinsic(relay_parent, hash).is_some()); + // TODO: check that a message queue is included by root. 
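+		// A sketch of the TODO'd assertion (hypothetical `expected_root`; the
+		// candidate would first need a non-empty outgoing queue, with the root
+		// computed by `message_queue_root` over its message data):
+		// assert!(store.queue_by_root(&expected_root).is_some());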
} #[test] @@ -822,7 +813,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - ExtrinsicStore::new_in_memory(), + AvailabilityStore::new_in_memory(), None, ); @@ -868,7 +859,7 @@ mod tests { let para_id = ParaId::from(1); let pov_block = pov_block_with_data(vec![1, 2, 3]); - let extrinsic = Extrinsic { outgoing_messages: Vec::new() }; + let outgoing_messages = OutgoingMessages { outgoing_messages: Vec::new() }; let parent_hash = Default::default(); let local_key = Sr25519Keyring::Alice.pair(); @@ -888,7 +879,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - ExtrinsicStore::new_in_memory(), + AvailabilityStore::new_in_memory(), None, ); @@ -907,7 +898,7 @@ mod tests { let signed_statement = shared_table.import_validated(Validated::collated_local( candidate, pov_block, - extrinsic, + outgoing_messages, )).unwrap(); assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_done());
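To close the loop, a self-contained sketch of how the new `outgoing_queues` helper feeds `Store::make_available` (simplified types; `queue_root` is only a stand-in for the real ordered-trie `message_queue_root`):

type Hash = [u8; 32];

#[derive(Clone)]
struct Message(Vec<u8>);

/// Stand-in hash; the real code computes an ordered trie root of the queue.
fn queue_root(queue: &[Message]) -> Hash {
	let mut root = [0u8; 32];
	for (i, msg) in queue.iter().enumerate() {
		for (j, byte) in msg.0.iter().enumerate() {
			root[(i + j) % 32] ^= *byte;
		}
	}
	root
}

/// Mirror of the call sites above: the routing target is dropped and the store
/// receives plain `(root, data)` pairs, since peers fetch queues by their root.
fn queues_for_store(queues: Vec<(u32, Vec<Message>)>) -> Vec<(Hash, Vec<Message>)> {
	queues.into_iter()
		.map(|(_target, data)| (queue_root(&data), data))
		.collect()
}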