Skip to content

Commit

Permalink
Introduce a Proof-of-Validation block type and use that in place of B…
Browse files Browse the repository at this point in the history
…lockData (paritytech#227)

* validators expect collators to give them parachain messages

* mostly port network to use pov_block

* network tests pass

* verify ingress when fetching pov block

* fix runtime compilation

* all tests build

* fix some grumbles

* Update validation/src/collation.rs

Co-Authored-By: rphmeier <[email protected]>

* Update primitives/src/parachain.rs

Co-Authored-By: rphmeier <[email protected]>

* Update network/src/lib.rs

Co-Authored-By: rphmeier <[email protected]>
  • Loading branch information
rphmeier authored Apr 24, 2019
1 parent e7fbcfe commit 75e827c
Show file tree
Hide file tree
Showing 17 changed files with 380 additions and 234 deletions.
15 changes: 10 additions & 5 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ use futures::{future, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use primitives::{ed25519, Pair};
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic};
use polkadot_primitives::parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
PoVBlock,
};
use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost};
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
Expand Down Expand Up @@ -148,10 +151,10 @@ pub fn collate<'a, R, P>(
P: ParachainContext + 'a,
{
let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
ingress.and_then(move |ConsolidatedIngress(ingress)| {
ingress.and_then(move |ingress| {
let (block_data, head_data, mut extrinsic) = para_context.produce_candidate(
last_head,
ingress.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
).map_err(Error::Collator)?;

let block_data_hash = block_data.hash();
Expand All @@ -170,10 +173,12 @@ pub fn collate<'a, R, P>(
block_data_hash,
};

// not necessary to send extrinsic because it is recomputed from execution.
Ok(parachain::Collation {
receipt,
block_data,
pov: PoVBlock {
block_data,
ingress,
},
})
})
}
Expand Down
15 changes: 12 additions & 3 deletions network/src/collator_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,18 @@ impl CollatorPool {
mod tests {
use super::*;
use substrate_primitives::crypto::UncheckedInto;
use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData};
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use futures::Future;

fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(block_data),
ingress: ConsolidatedIngress(Vec::new()),
}
}

#[test]
fn disconnect_primary_gives_new_primary() {
let mut pool = CollatorPool::new();
Expand Down Expand Up @@ -272,7 +281,7 @@ mod tests {
fees: 0,
block_data_hash: [3; 32].into(),
},
block_data: BlockData(vec![4, 5, 6]),
pov: make_pov(vec![4, 5, 6]),
});

rx1.wait().unwrap();
Expand All @@ -299,7 +308,7 @@ mod tests {
fees: 0,
block_data_hash: [3; 32].into(),
},
block_data: BlockData(vec![4, 5, 6]),
pov: make_pov(vec![4, 5, 6]),
});

let (tx, rx) = oneshot::channel();
Expand Down
98 changes: 77 additions & 21 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ pub mod gossip;
use codec::{Decode, Encode};
use futures::sync::oneshot;
use polkadot_primitives::{Block, SessionKey, Hash, Header};
use polkadot_primitives::parachain::{Id as ParaId, CollatorId, BlockData, CandidateReceipt, Collation};
use polkadot_primitives::parachain::{
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
ConsolidatedIngressRoots,
};
use substrate_network::{PeerId, RequestId, Context, Severity};
use substrate_network::{message, generic_message};
use substrate_network::specialization::NetworkSpecialization as Specialization;
Expand Down Expand Up @@ -84,12 +87,33 @@ pub struct Status {
collating_for: Option<(CollatorId, ParaId)>,
}

struct BlockDataRequest {
struct PoVBlockRequest {
attempted_peers: HashSet<SessionKey>,
validation_session_parent: Hash,
candidate_hash: Hash,
block_data_hash: Hash,
sender: oneshot::Sender<BlockData>,
sender: oneshot::Sender<PoVBlock>,
canon_roots: ConsolidatedIngressRoots,
}

impl PoVBlockRequest {
// Attempt to process a response. If the provided block is invalid,
// this returns an error result containing the unmodified request.
//
// If `Ok(())` is returned, that indicates that the request has been processed.
fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> {
if pov_block.block_data.hash() != self.block_data_hash {
return Err(self);
}

match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
Ok(()) => {
let _ = self.sender.send(pov_block);
Ok(())
}
Err(_) => Err(self)
}
}
}

// ensures collator-protocol messages are sent in correct order.
Expand Down Expand Up @@ -147,9 +171,13 @@ pub enum Message {
// TODO: do this with a cryptographic proof of some kind
// https://github.com/paritytech/polkadot/issues/47
SessionKey(SessionKey),
/// Requesting parachain block data by (relay_parent, candidate_hash).
/// Requesting parachain proof-of-validation block (relay_parent, candidate_hash).
RequestPovBlock(RequestId, Hash, Hash),
/// Provide requested proof-of-validation block data by candidate hash or nothing if unknown.
PovBlock(RequestId, Option<PoVBlock>),
/// Request block data (relay_parent, candidate_hash)
RequestBlockData(RequestId, Hash, Hash),
/// Provide block data by candidate hash or nothing if unknown.
/// Provide requested block data by candidate hash or nothing.
BlockData(RequestId, Option<BlockData>),
/// Tell a collator their role.
CollatorRole(Role),
Expand All @@ -171,8 +199,8 @@ pub struct PolkadotProtocol {
validators: HashMap<SessionKey, PeerId>,
local_collations: LocalCollations<Collation>,
live_validation_sessions: LiveValidationSessions,
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
pending: Vec<PoVBlockRequest>,
extrinsic_store: Option<::av_store::Store>,
next_req_id: u64,
}
Expand All @@ -195,15 +223,22 @@ impl PolkadotProtocol {
}

/// Fetch block data by candidate receipt.
fn fetch_block_data(&mut self, ctx: &mut Context<Block>, candidate: &CandidateReceipt, relay_parent: Hash) -> oneshot::Receiver<BlockData> {
fn fetch_pov_block(
&mut self,
ctx: &mut Context<Block>,
candidate: &CandidateReceipt,
relay_parent: Hash,
canon_roots: ConsolidatedIngressRoots,
) -> oneshot::Receiver<PoVBlock> {
let (tx, rx) = oneshot::channel();

self.pending.push(BlockDataRequest {
self.pending.push(PoVBlockRequest {
attempted_peers: Default::default(),
validation_session_parent: relay_parent,
candidate_hash: candidate.hash(),
block_data_hash: candidate.block_data_hash,
sender: tx,
canon_roots,
});

self.dispatch_pending_requests(ctx);
Expand Down Expand Up @@ -250,7 +285,7 @@ impl PolkadotProtocol {
let parent = pending.validation_session_parent;
let c_hash = pending.candidate_hash;

let still_pending = self.live_validation_sessions.with_block_data(&parent, &c_hash, |x| match x {
let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x {
Ok(data @ &_) => {
// answer locally.
let _ = pending.sender.send(data.clone());
Expand All @@ -270,7 +305,7 @@ impl PolkadotProtocol {
send_polkadot_message(
ctx,
who.clone(),
Message::RequestBlockData(req_id, parent, c_hash),
Message::RequestPovBlock(req_id, parent, c_hash),
);

in_flight.insert((req_id, who), pending);
Expand All @@ -295,20 +330,33 @@ impl PolkadotProtocol {
trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
match msg {
Message::SessionKey(key) => self.on_session_key(ctx, who, key),
Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
let pov_block = self.live_validation_sessions.with_pov_block(
&relay_parent,
&candidate_hash,
|res| res.ok().map(|b| b.clone()),
);

send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
}
Message::RequestBlockData(req_id, relay_parent, candidate_hash) => {
let block_data = self.live_validation_sessions
.with_block_data(
.with_pov_block(
&relay_parent,
&candidate_hash,
|res| res.ok().map(|b| b.clone()),
|res| res.ok().map(|b| b.block_data.clone()),
)
.or_else(|| self.extrinsic_store.as_ref()
.and_then(|s| s.block_data(relay_parent, candidate_hash))
);

send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data));
}
Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data),
Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data),
Message::BlockData(_req_id, _data) => {
// current block data is never requested bare by the node.
ctx.report_peer(who, Severity::Bad("Peer sent un-requested block data".to_string()));
}
Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
}
Expand Down Expand Up @@ -355,13 +403,19 @@ impl PolkadotProtocol {
self.dispatch_pending_requests(ctx);
}

fn on_block_data(&mut self, ctx: &mut Context<Block>, who: PeerId, req_id: RequestId, data: Option<BlockData>) {
fn on_pov_block(
&mut self,
ctx: &mut Context<Block>,
who: PeerId,
req_id: RequestId,
pov_block: Option<PoVBlock>,
) {
match self.in_flight.remove(&(req_id, who.clone())) {
Some(req) => {
if let Some(data) = data {
if data.hash() == req.block_data_hash {
let _ = req.sender.send(data);
return
Some(mut req) => {
if let Some(pov_block) = pov_block {
match req.process_response(pov_block) {
Ok(()) => return,
Err(r) => { req = r; }
}
}

Expand Down Expand Up @@ -486,12 +540,14 @@ impl Specialization<Block> for PolkadotProtocol {
self.in_flight.retain(|&(_, ref peer), val| {
let retain = peer != &who;
if !retain {
// swap with a dummy value which will be dropped immediately.
let (sender, _) = oneshot::channel();
pending.push(::std::mem::replace(val, BlockDataRequest {
pending.push(::std::mem::replace(val, PoVBlockRequest {
attempted_peers: Default::default(),
validation_session_parent: Default::default(),
candidate_hash: Default::default(),
block_data_hash: Default::default(),
canon_roots: ConsolidatedIngressRoots(Vec::new()),
sender,
}));
}
Expand Down
34 changes: 18 additions & 16 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use polkadot_validation::{
};
use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{
BlockData, Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message
Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message,
Collation, PoVBlock,
};
use gossip::RegisteredMessageValidator;

Expand All @@ -41,7 +42,7 @@ use std::collections::{HashMap, HashSet};
use std::io;
use std::sync::Arc;

use validation::{self, SessionDataFetcher, NetworkService, Executor, Incoming};
use validation::{self, SessionDataFetcher, NetworkService, Executor};

type IngressPairRef<'a> = (ParaId, &'a [Message]);

Expand Down Expand Up @@ -92,6 +93,12 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
.map(|msg| msg.statement)
}

/// Get access to the session data fetcher.
#[cfg(test)]
pub(crate) fn fetcher(&self) -> &SessionDataFetcher<P, E, N, T> {
&self.fetcher
}

fn parent_hash(&self) -> Hash {
self.fetcher.parent_hash()
}
Expand Down Expand Up @@ -201,7 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl Future<Item=(),Error=()> + Send + 'static
where
D: Future<Item=(BlockData, Incoming),Error=io::Error> + Send + 'static,
D: Future<Item=PoVBlock,Error=io::Error> + Send + 'static,
{
let table = self.table.clone();
let network = self.network().clone();
Expand All @@ -213,7 +220,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Some(validated.block_data().clone()),
Some(validated.pov_block().clone()),
validated.extrinsic().cloned(),
);

Expand All @@ -234,26 +241,21 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
E: Future<Item=(),Error=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchCandidate = validation::BlockDataReceiver;
type FetchIncoming = validation::IncomingReceiver;
type FetchValidationProof = validation::PoVReceiver;

fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
fn local_collation(&self, collation: Collation, extrinsic: Extrinsic) {
// produce a signed statement
let hash = receipt.hash();
let validated = Validated::collated_local(receipt, block_data.clone(), extrinsic.clone());
let hash = collation.receipt.hash();
let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone());
let statement = self.table.import_validated(validated);

// give to network to make available.
self.fetcher.knowledge().lock().note_candidate(hash, Some(block_data), Some(extrinsic));
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic));
self.network().gossip_message(self.attestation_topic, statement.encode());
}

fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate {
self.fetcher.fetch_block_data(candidate)
}

fn fetch_incoming(&self, parachain: ParaId) -> Self::FetchIncoming {
self.fetcher.fetch_incoming(parachain)
fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof {
self.fetcher.fetch_pov_block(candidate)
}
}

Expand Down
Loading

0 comments on commit 75e827c

Please sign in to comment.