Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

ICMP message-routing gossip #304

Merged
merged 39 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
207e5b6
core logic for ICMP gossip
rphmeier Jun 27, 2019
360c729
refactor gossip to make more extension friendly
rphmeier Jun 27, 2019
9922f02
move files around
rphmeier Jun 27, 2019
0125f18
extract attestation-gossip logic to its own module
rphmeier Jun 27, 2019
5e25763
Merge branch 'master' into rh-icmp-gossip-primitives
rphmeier Jul 24, 2019
3a4bd99
message validation and broadcast logic
rphmeier Jul 26, 2019
9e76e36
fix upstream crates' compilation
rphmeier Jul 28, 2019
65bbe33
add a test
rphmeier Jul 28, 2019
e9e6032
another test for overlapping
rphmeier Jul 30, 2019
a192da1
Some grammar and phrasing tweaks
rphmeier Aug 2, 2019
8d4e67b
Merge branch 'master' into rh-icmp-gossip-primitives
rphmeier Aug 8, 2019
63f26b0
add since parameter to ingress runtime API
rphmeier Aug 9, 2019
ec7d9de
broadcast out known unrouted message queues
rphmeier Aug 9, 2019
bdc213e
fix compilation of service and collator
rphmeier Aug 9, 2019
cfa47b8
Merge branch 'rh-icmp-gossip-primitives' of github.com:paritytech/pol…
rphmeier Aug 9, 2019
b38b712
remove useless index_mapping
rphmeier Aug 9, 2019
727b92c
Merge branch 'master' into rh-icmp-gossip-primitives
rphmeier Aug 12, 2019
6a9269d
some tests for icmp propagation
rphmeier Aug 12, 2019
ced6186
fix decoding bug and test icmp queue validation
rphmeier Aug 12, 2019
a2ea950
simplify engine-id definition
rphmeier Aug 14, 2019
158942e
address some grumbles
rphmeier Aug 14, 2019
a5b5591
some cleanup of old circulation code
rphmeier Aug 14, 2019
251cf2f
give network a handle to extrinsic store on startup
rphmeier Aug 14, 2019
14c58fd
an honest collator ensures data available as well
rphmeier Aug 14, 2019
0cec9da
address some grumbles
rphmeier Aug 14, 2019
e427fd9
add docs; rename the attestation session to "leaf work"
rphmeier Aug 14, 2019
3d8df1f
module docs
rphmeier Aug 14, 2019
8e6ef2c
move gossip back to gossip.rs
rphmeier Aug 15, 2019
02ecf76
clean up and document attestation-gossip a bit
rphmeier Aug 15, 2019
5ccf536
some more docs on the availability store
rphmeier Aug 15, 2019
acc36af
store all outgoing message queues in the availability store
rphmeier Aug 15, 2019
d736ea3
filter `Extrinsic` out of validation crate
rphmeier Aug 15, 2019
bded179
expunge Extrinsic from network
rphmeier Aug 15, 2019
ad31039
expunge Extrinsic from erasure-coding
rphmeier Aug 15, 2019
550c4fd
expunge Extrinsic from collator
rphmeier Aug 15, 2019
7a82f33
expunge from adder-collator
rphmeier Aug 15, 2019
e900307
rename ExtrinsicStore to AvailabilityStore everywhere
rphmeier Aug 15, 2019
093a912
annotate and clean up message-routing tests
rphmeier Aug 15, 2019
bba4c5a
Merge branch 'master' into rh-icmp-gossip-primitives
rphmeier Aug 29, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 74 additions & 28 deletions availability-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Persistent database for parachain data.
//! Persistent database for parachain data: PoV block data and outgoing messages.
//!
//! This will be written into during the block validation pipeline, and queried
//! by networking code in order to circulate required data and maintain availability
//! of it.

use codec::{Encode, Decode};
use kvdb::{KeyValueDB, DBTransaction};
use kvdb_rocksdb::{Database, DatabaseConfig};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Message};
use log::warn;

use std::collections::HashSet;
Expand All @@ -42,7 +46,7 @@ pub struct Config {
pub path: PathBuf,
}

/// Some data to keep available.
/// Some data to keep available about a parachain block candidate.
pub struct Data {
/// The relay chain parent hash this should be localized to.
pub relay_parent: Hash,
Expand All @@ -52,18 +56,16 @@ pub struct Data {
pub candidate_hash: Hash,
/// Block data.
pub block_data: BlockData,
/// Extrinsic data.
pub extrinsic: Option<Extrinsic>,
/// Outgoing message queues from execution of the block, if any.
///
/// The tuple pairs the message queue root and the queue data.
pub outgoing_queues: Option<Vec<(Hash, Vec<Message>)>>,
}

/// Compute the database key under which a candidate's block data is stored.
///
/// The key is the SCALE encoding of `(relay_parent, candidate_hash, 0i8)`;
/// the trailing `0i8` discriminant keeps block-data entries distinct from
/// other per-candidate records sharing the same column.
fn block_data_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
	let key_components = (relay_parent, candidate_hash, 0i8);
	key_components.encode()
}

/// Compute the database key under which a candidate's extrinsic data is stored.
///
/// The key is the SCALE encoding of `(relay_parent, candidate_hash, 1i8)`;
/// the trailing `1i8` discriminant distinguishes extrinsic entries from the
/// block-data keys (which use `0i8`).
fn extrinsic_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
	let key_components = (relay_parent, candidate_hash, 1i8);
	key_components.encode()
}

/// Handle to the availability store.
#[derive(Clone)]
pub struct Store {
Expand Down Expand Up @@ -96,6 +98,16 @@ impl Store {
}

/// Make some data available provisionally.
///
/// Validators with the responsibility of maintaining availability
/// for a block or collators collating a block will call this function
/// in order to persist that data to disk and so it can be queried and provided
/// to other nodes in the network.
///
/// The outgoing message data in `Data` is optional: it is expected to be
/// present except when the block is invalid and therefore produced no
/// messages. Determining the block's invalidity is beyond the
/// scope of this function.
pub fn make_available(&self, data: Data) -> io::Result<()> {
let mut tx = DBTransaction::new();

Expand All @@ -118,12 +130,16 @@ impl Store {
data.block_data.encode()
);

if let Some(extrinsic) = data.extrinsic {
tx.put_vec(
columns::DATA,
extrinsic_key(&data.relay_parent, &data.candidate_hash).as_slice(),
extrinsic.encode(),
);
if let Some(outgoing_queues) = data.outgoing_queues {
// This is kept forever and not pruned.
for (root, messages) in outgoing_queues {
tx.put_vec(
columns::DATA,
root.as_ref(),
messages.encode(),
);
}

}

self.inner.write(tx)
Expand All @@ -146,7 +162,6 @@ impl Store {
for candidate_hash in v {
if !finalized_candidates.contains(&candidate_hash) {
tx.delete(columns::DATA, block_data_key(&parent, &candidate_hash).as_slice());
tx.delete(columns::DATA, extrinsic_key(&parent, &candidate_hash).as_slice());
}
}

Expand All @@ -168,12 +183,11 @@ impl Store {
}
}

/// Query extrinsic data.
pub fn extrinsic(&self, relay_parent: Hash, candidate_hash: Hash) -> Option<Extrinsic> {
let encoded_key = extrinsic_key(&relay_parent, &candidate_hash);
match self.inner.get(columns::DATA, &encoded_key[..]) {
/// Query message queue data by message queue root hash.
pub fn queue_by_root(&self, queue_root: &Hash) -> Option<Vec<Message>> {
match self.inner.get(columns::DATA, queue_root.as_ref()) {
Ok(Some(raw)) => Some(
Extrinsic::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
<_>::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
),
Ok(None) => None,
Err(e) => {
Expand Down Expand Up @@ -207,29 +221,61 @@ mod tests {
parachain_id: para_id_1,
candidate_hash: candidate_1,
block_data: block_data_1.clone(),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
outgoing_queues: None,
}).unwrap();

store.make_available(Data {
relay_parent,
parachain_id: para_id_2,
candidate_hash: candidate_2,
block_data: block_data_2.clone(),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
outgoing_queues: None,
}).unwrap();

assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
assert_eq!(store.block_data(relay_parent, candidate_2).unwrap(), block_data_2);

assert!(store.extrinsic(relay_parent, candidate_1).is_some());
assert!(store.extrinsic(relay_parent, candidate_2).is_some());

store.candidates_finalized(relay_parent, [candidate_1].iter().cloned().collect()).unwrap();

assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
assert!(store.block_data(relay_parent, candidate_2).is_none());
}

#[test]
fn queues_available_by_queue_root() {
let relay_parent = [1; 32].into();
let para_id = 5.into();
let candidate = [2; 32].into();
let block_data = BlockData(vec![1, 2, 3]);

let message_queue_root_1 = [0x42; 32].into();
let message_queue_root_2 = [0x43; 32].into();

assert!(store.extrinsic(relay_parent, candidate_1).is_some());
assert!(store.extrinsic(relay_parent, candidate_2).is_none());
let message_a = Message(vec![1, 2, 3, 4]);
let message_b = Message(vec![4, 5, 6, 7]);

let outgoing_queues = vec![
(message_queue_root_1, vec![message_a.clone()]),
(message_queue_root_2, vec![message_b.clone()]),
];

let store = Store::new_in_memory();
store.make_available(Data {
relay_parent,
parachain_id: para_id,
candidate_hash: candidate,
block_data: block_data.clone(),
outgoing_queues: Some(outgoing_queues),
}).unwrap();

assert_eq!(
store.queue_by_root(&message_queue_root_1),
Some(vec![message_a]),
);

assert_eq!(
store.queue_by_root(&message_queue_root_2),
Some(vec![message_b]),
);
}
}
97 changes: 54 additions & 43 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ use primitives::Pair;
use polkadot_primitives::{
BlockId, Hash, Block,
parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, OutgoingMessages,
PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair,
}
};
use polkadot_cli::{
Worker, IntoExit, ProvideRuntimeApi, TaskExecutor, PolkadotService, CustomConfiguration,
ParachainHost,
};
use polkadot_network::validation::{SessionParams, ValidationNetwork};
use polkadot_network::NetworkService;
use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::PolkadotNetworkService;
use tokio::timer::Timeout;
use consensus_common::SelectChain;

Expand All @@ -92,7 +92,7 @@ pub trait Network: Send + Sync {
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement, Error=()>>;
}

impl<P, E> Network for ValidationNetwork<P, E, NetworkService, TaskExecutor> where
impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor> where
P: 'static + Send + Sync,
E: 'static + Send + Sync,
{
Expand Down Expand Up @@ -143,7 +143,7 @@ pub trait BuildParachainContext {
/// This can be implemented through an externally attached service or a stub.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, Extrinsic), Error=InvalidHead>;
type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, OutgoingMessages), Error=InvalidHead>;

/// Produce a candidate, given the relay parent hash, the latest ingress queue information
/// and the last parachain head.
Expand Down Expand Up @@ -178,7 +178,7 @@ pub fn collate<'a, R, P>(
para_context: P,
key: Arc<CollatorPair>,
)
-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
-> impl Future<Item=(parachain::Collation, OutgoingMessages), Error=Error<R::Error>> + 'a
where
R: RelayChainContext,
R::Error: 'a,
Expand All @@ -198,11 +198,11 @@ pub fn collate<'a, R, P>(
.map(move |x| (ingress, x))
.map_err(Error::Collator)
})
.and_then(move |(ingress, (block_data, head_data, mut extrinsic))| {
.and_then(move |(ingress, (block_data, head_data, mut outgoing))| {
let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref()).into();
let egress_queue_roots =
polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);

let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
Expand All @@ -215,19 +215,21 @@ pub fn collate<'a, R, P>(
upward_messages: Vec::new(),
};

Ok(parachain::Collation {
let collation = parachain::Collation {
receipt,
pov: PoVBlock {
block_data,
ingress,
},
})
};

Ok((collation, outgoing))
})
}

/// Polkadot-api context.
struct ApiContext<P, E> {
network: Arc<ValidationNetwork<P, E, NetworkService, TaskExecutor>>,
network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor>>,
parent_hash: Hash,
validators: Vec<ValidatorId>,
}
Expand All @@ -244,7 +246,7 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrouted egress
let _session = self.network.instantiate_session(SessionParams {
let _session = self.network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
parent_hash: self.parent_hash,
authorities: self.validators.clone(),
Expand Down Expand Up @@ -301,26 +303,28 @@ impl<P, E> Worker for CollationNode<P, E> where
return Box::new(future::err(()));
};

let is_known = move |block_hash: &Hash| {
use client::BlockStatus;
use polkadot_network::gossip::Known;

match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
match select_chain.leaves() {
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
}
};

let message_validator = polkadot_network::gossip::register_validator(
network.clone(),
move |block_hash: &Hash| {
use client::BlockStatus;
use polkadot_network::gossip::Known;

match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
match select_chain.leaves() {
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
}
},
(is_known, client.clone()),
);

let validation_network = Arc::new(ValidationNetwork::new(
Expand Down Expand Up @@ -384,13 +388,20 @@ impl<P, E> Worker for CollationNode<P, E> where
context,
parachain_context,
key,
).map(move |collation| {
network.with_spec(move |spec, ctx| spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
));
).map(move |(collation, outgoing)| {
network.with_spec(move |spec, ctx| {
let res = spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
outgoing,
);

if let Err(e) = res {
warn!("Unable to broadcast local collation: {:?}", e);
}
})
});

future::Either::B(collation_work)
Expand Down Expand Up @@ -448,7 +459,7 @@ pub fn run_collator<P, E>(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use polkadot_primitives::parachain::{OutgoingMessage, FeeSchedule};
use polkadot_primitives::parachain::{TargetedMessage, FeeSchedule};
use keyring::Sr25519Keyring;
use super::*;

Expand All @@ -473,20 +484,20 @@ mod tests {
struct DummyParachainContext;

impl ParachainContext for DummyParachainContext {
type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>;
type ProduceCandidate = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>;

fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
_relay_parent: Hash,
_status: ParachainStatus,
ingress: I,
) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> {
) -> Result<(BlockData, HeadData, OutgoingMessages), InvalidHead> {
// send messages right back.
Ok((
BlockData(vec![1, 2, 3, 4, 5,]),
HeadData(vec![9, 9, 9]),
Extrinsic {
outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
OutgoingMessages {
outgoing_messages: ingress.into_iter().map(|(id, msg)| TargetedMessage {
target: id,
data: msg.0,
}).collect(),
Expand Down Expand Up @@ -540,7 +551,7 @@ mod tests {
context.clone(),
DummyParachainContext,
Arc::new(Sr25519Keyring::Alice.pair().into()),
).wait().unwrap();
).wait().unwrap().0;

// ascending order by root.
assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
Expand Down
Loading