Skip to content

Commit

Permalink
Rewrite some Future structs as async functions (paritytech#679)
Browse files Browse the repository at this point in the history
* Squashed commit of the following:

commit e97a17157ae0887320994661e2f816275fc75b76
Author: Ashley <[email protected]>
Date:   Tue Dec 10 15:06:28 2019 +0100

    Rewrite some functions as async

commit 970e485179f1e087cf0a51c6a4e71f923e87df45
Merge: f98966ac 4782840
Author: Ashley <[email protected]>
Date:   Tue Dec 10 11:19:37 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-futures-update

commit f98966ac188067158071d1e3e243c34ea5738f56
Author: Ashley <[email protected]>
Date:   Mon Dec 9 23:40:20 2019 +0100

    Add async blocks back in

commit 7fa88af0271db659de9274c94cb8e7eead0e4289
Author: Ashley <[email protected]>
Date:   Mon Dec 9 23:17:02 2019 +0100

    Revert "Asyncify network functions"

    This reverts commit f20ae6548dc482cb1e75bc80641cfe55c6131a53.

commit 82413550cdac40bd14a09f62df12de49dd7e55af
Author: Ashley <[email protected]>
Date:   Mon Dec 9 19:09:55 2019 +0100

    Fix validation test again

commit 47e002b08369c9c775b92aea9b6f6ed81b30241b
Author: Ashley <[email protected]>
Date:   Mon Dec 9 19:07:43 2019 +0100

    Switch favicon

commit 0c5c1409078fc57120a39e40ec5cb1763d67d593
Author: Ashley <[email protected]>
Date:   Mon Dec 9 18:54:10 2019 +0100

    Fix validation test

commit 8bb6a0189fe824da09054cbf5b06f11a0f87072d
Author: Ashley <[email protected]>
Date:   Mon Dec 9 18:53:54 2019 +0100

    Nits

commit 33410f3a4910d3e688956cecfcca02cc2dfa6a7a
Author: Ashley <[email protected]>
Date:   Mon Dec 9 18:43:09 2019 +0100

    Fix av store test

commit f0c517eb240c42848cdb3305e0b554ef407bdfaa
Merge: 938f411a 60e72111
Author: Ashley <[email protected]>
Date:   Mon Dec 9 18:21:39 2019 +0100

    Merge branch 'ashley-futures-updates' into ashley-futures-update

commit 60e72111651f2b366592c1e56756c6bf5d8ce2f1
Author: Ashley <[email protected]>
Date:   Mon Dec 9 18:19:40 2019 +0100

    Clean up browser validation worker error

commit f20ae6548dc482cb1e75bc80641cfe55c6131a53
Author: Ashley <[email protected]>
Date:   Mon Dec 9 18:16:40 2019 +0100

    Asyncify network functions

commit b22758d0a3852d701923bd238484e1c9eabec5e2
Merge: 2e8b05ed ca8d5c5
Author: Ashley <[email protected]>
Date:   Mon Dec 9 17:47:26 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-futures-updates

commit 2e8b05edf1a1fadd6943f967c27b6d34675ba06a
Author: Ashley <[email protected]>
Date:   Mon Dec 9 17:45:52 2019 +0100

    Box pin changes

commit 08bfdf7f2d27721abffee49221213304ebc4fd47
Author: Ashley <[email protected]>
Date:   Mon Dec 9 17:15:38 2019 +0100

    Update network/src/lib.rs

    Co-Authored-By: Pierre Krieger <[email protected]>

commit d8be456c508d5e5a03178db45d9f272b302a8a65
Author: Ashley <[email protected]>
Date:   Mon Dec 9 17:15:32 2019 +0100

    Update network/src/lib.rs

    Co-Authored-By: Pierre Krieger <[email protected]>

commit ec7367276fdd374b19f41555fd5985454c559600
Author: Ashley <[email protected]>
Date:   Mon Dec 9 17:14:36 2019 +0100

    Update availability-store/src/worker.rs

    Co-Authored-By: Pierre Krieger <[email protected]>

commit 938f411a9365e9c5fb16bfedb62aacac4403d063
Author: Ashley <[email protected]>
Date:   Mon Dec 9 17:05:05 2019 +0100

    Revert "Revert removal of tokio_executor that causes tokio version mismatch panic"

    This reverts commit cfeb50c01d8df5e209483406a711e64761b44ae9.

commit f92f58044b4fe04bde73a60820d154080dd64b16
Author: Ashley <[email protected]>
Date:   Mon Dec 9 15:47:35 2019 +0100

    Fix adder test parachain

commit cfeb50c01d8df5e209483406a711e64761b44ae9
Author: Ashley <[email protected]>
Date:   Mon Dec 9 15:31:36 2019 +0100

    Revert removal of tokio_executor that causes tokio version mismatch panic

commit 5bcb83a122b9a30f240a238ca670c6b658f4ddf1
Author: Ashley <[email protected]>
Date:   Mon Dec 9 15:17:55 2019 +0100

    Fix typo

commit fc02b1dc16e277649677396833a8d70e8588a56c
Author: Ashley <[email protected]>
Date:   Mon Dec 9 15:02:50 2019 +0100

    Fix collator

commit 6c4ff5b3bf1084a618ffec2d864090c9c8077f0f
Author: Ashley <[email protected]>
Date:   Mon Dec 9 14:35:37 2019 +0100

    Small changes

commit e1338cb4450df5377d8c911da56445914d667472
Author: Ashley <[email protected]>
Date:   Mon Dec 9 14:24:42 2019 +0100

    Fix network tests

commit 4e458f7a91c1ed5c986795f40ed55e596d176c4b
Author: Ashley <[email protected]>
Date:   Mon Dec 9 12:25:26 2019 +0100

    Remove futures01 from availability-store

commit 5729f6cd6b53f061ff155320c815509feb02309e
Author: Ashley <[email protected]>
Date:   Mon Dec 9 12:22:33 2019 +0100

    Fix validation tests

commit a820612565b42780f8b6c09c9c1c30f06a9985ba
Author: Ashley <[email protected]>
Date:   Mon Dec 9 12:01:48 2019 +0100

    Fix availability store tests

commit 112344faeee5f8f03b3b87c6baf7036a7fcbe415
Author: Ashley <[email protected]>
Date:   Mon Dec 9 11:36:03 2019 +0100

    Update tokio version

commit d2de6d8b3f0c3682679fe437d5459ac50a3c3895
Author: Ashley <[email protected]>
Date:   Mon Dec 9 11:33:25 2019 +0100

    Revert cli tokio version to avoid libp2p panic

commit 0c5f24e0c1131ac58a947448456e7fb62c869702
Author: Ashley <[email protected]>
Date:   Mon Dec 9 11:27:13 2019 +0100

    Switch to polkadot-master

commit 2e2311e33a4af87c2c545094ea8cb595cd6cfe2d
Author: Ashley <[email protected]>
Date:   Fri Dec 6 15:07:21 2019 +0100

    Re-add release flag

commit 6adc1b6114e154a590acf82acfaf0c1265409518
Merge: 9767f832 5e9542c
Author: Ashley <[email protected]>
Date:   Fri Dec 6 13:36:35 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 9767f8325c33211065ef6830becdac0e3cf852de
Merge: c528dc6d 84ece42
Author: Ashley <[email protected]>
Date:   Wed Dec 4 17:11:39 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit c528dc6df8fc31cdcbc10889636355241398debd
Author: Ashley <[email protected]>
Date:   Wed Dec 4 17:07:00 2019 +0100

    Fix wasm build

commit da233a122c678dc7767dac7cc6e2564575b15cc8
Author: Ashley <[email protected]>
Date:   Wed Dec 4 16:25:49 2019 +0100

    tidy

commit 832f8054df78afbcef1903e0f9e7e246b348c10d
Merge: 4e1da888 78e828d
Author: Ashley <[email protected]>
Date:   Wed Dec 4 15:56:56 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 4e1da8888dd2160064dd453782fb05513c65ade4
Author: Ashley <[email protected]>
Date:   Tue Dec 3 16:47:02 2019 +0100

    Temp switch back to substrate/master

commit af88a87338688797bbc52315fdd0fc22cf23c6cf
Merge: a03a980c abb5111
Author: Ashley <[email protected]>
Date:   Mon Dec 2 19:33:14 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit a03a980ce417ec7b446bfcbe7a66ec0ed6458135
Merge: 31a88a93 f7d4826
Author: Ashley <[email protected]>
Date:   Mon Dec 2 13:52:37 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 31a88a930ffdf5da72b3e587ec8c0e6b00922e3e
Author: Ashley <[email protected]>
Date:   Mon Dec 2 13:52:35 2019 +0100

    Tidy

commit 5b33b7a7af08d7a3aa3853b8e4995484fb640d52
Author: Ashley <[email protected]>
Date:   Mon Dec 2 11:55:51 2019 +0100

    Add browser-demo

commit 868f6e51dfdc0a64252acd9adabe7b9ba436b1f4
Author: Ashley <[email protected]>
Date:   Mon Dec 2 10:51:57 2019 +0100

    Add initial browser file

commit e5e399c20f1dc4e1023ee57773dcdd9ab2a0a14b
Author: Ashley <[email protected]>
Date:   Mon Dec 2 10:45:02 2019 +0100

    Add browser-demo

commit 408288b05292d952944a6b8e1f2bcf9cf259a040
Author: Ashley <[email protected]>
Date:   Sun Dec 1 19:28:33 2019 +0100

    Get polkadot to compile via wasm!

commit 04ffe72e868be57841d31f01eec1b90423a595d6
Author: Ashley <[email protected]>
Date:   Sun Dec 1 19:28:16 2019 +0100

    Migrate service

commit 119f0829a53b825a3ebc9efdefa76ae7eabb04aa
Merge: 93fb6428 5422684
Author: Ashley <[email protected]>
Date:   Sun Dec 1 17:43:49 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 93fb6428501bac612a1675cf3b6e3d26f5bbc7c2
Author: Ashley <[email protected]>
Date:   Sun Dec 1 12:21:25 2019 +0100

    Switch branch

commit 0c4fe8331bdc9665ac2427eb8c795112ac728d70
Author: Ashley <[email protected]>
Date:   Sat Nov 30 11:45:59 2019 +0100

    Tidy up validation

commit 73563253d95962657108820ae130a8d3f3093ee8
Author: Ashley <[email protected]>
Date:   Sat Nov 30 11:39:09 2019 +0100

    Tidy up network

commit 1c9cf0427c0e2d15c4b6d52b91d67d4a3963e30d
Author: Ashley <[email protected]>
Date:   Sat Nov 30 01:16:35 2019 +0100

    Final changes to validation

commit 322cca5224fdca0a29d88ff91700ef704a9d0c2a
Author: Ashley <[email protected]>
Date:   Sat Nov 30 00:31:55 2019 +0100

    Migrate network to std futures

commit 96f1a99491f5ae2957effa58cc1e385014575a32
Author: Ashley <[email protected]>
Date:   Fri Nov 29 23:31:04 2019 +0100

    Migrate validation to std futures

commit aaf5e55fffd1367c05687eb34f4365a24e3a34c0
Author: Ashley <[email protected]>
Date:   Fri Nov 29 17:10:11 2019 +0100

    Switch to Spawn trait

commit 2ab282f57e8b9a55cf8d285b283cf009216511d2
Merge: cceb6b72 5598ed9
Author: Ashley <[email protected]>
Date:   Fri Nov 29 16:31:24 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit cceb6b72f5677a1c43d2cd61bd525539054f0c01
Author: Ashley <[email protected]>
Date:   Fri Nov 29 15:47:14 2019 +0100

    Make validation work on wasm!

commit b45a95cf7d829a916bf2ad6936d1e7f4b6f3ef77
Merge: 3773d5c db7eaa6b
Author: Ashley <[email protected]>
Date:   Fri Nov 29 13:57:23 2019 +0100

    Merge remote-tracking branch 'tomaka/wasm-start' into HEAD

commit db7eaa6bd5d3bbcea829570fb47ab4d06f3558ce
Merge: 6f97dbb7 2ab32da
Author: Pierre Krieger <[email protected]>
Date:   Thu Nov 28 13:58:15 2019 +0100

    Merge branch 'master' into wasm-start

commit 6f97dbb786750d854cf8f7a56c6a336ea5979228
Author: Pierre Krieger <[email protected]>
Date:   Thu Nov 28 12:47:45 2019 +0100

    Use --manifest-path instead

commit 20104e98ff1713b6c81b0251b43d060d4e672d55
Author: Pierre Krieger <[email protected]>
Date:   Thu Nov 28 10:44:51 2019 +0100

    Make availability-store compile for WASM

* Fix build

* Fix futures blocking panic in validators (again)

* Deindent
  • Loading branch information
expenses authored Dec 13, 2019
1 parent 3a457c5 commit b0535e6
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 455 deletions.
33 changes: 20 additions & 13 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;

use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::{warn, error};
Expand Down Expand Up @@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
SP: Spawn + Clone + Send + Sync
{
type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
type FutureEgress = Pin<Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Send>>;

fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = self.network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
parent_hash: self.parent_hash,
authorities: self.validators.clone(),
})
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));

Box::new(future::ok(ConsolidatedIngress(Vec::new())))
let network = self.network.clone();
let parent_hash = self.parent_hash;
let authorities = self.validators.clone();

async move {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
parent_hash,
authorities,
})
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));

Ok(ConsolidatedIngress(Vec::new()))
}.boxed()
}
}

Expand Down Expand Up @@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where
);

let exit = inner_exit_2.clone();
tokio::spawn(future::select(res, exit).map(drop));
tokio::spawn(future::select(res.boxed(), exit).map(drop));
})
});

Expand Down
34 changes: 13 additions & 21 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub mod gossip;
use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
use futures::future::Either;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
Expand Down Expand Up @@ -837,25 +836,6 @@ impl PolkadotProtocol {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.info.parachain_index);

let res = match self.availability_store {
Some(ref availability_store) => {
let availability_store_cloned = availability_store.clone();
let collation_cloned = collation.clone();
Either::Left((async move {
let _ = availability_store_cloned.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
)
.boxed()
)
}
None => Either::Right(futures::future::ready(())),
};

for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) {
Some(who) => {
Expand All @@ -871,7 +851,19 @@ impl PolkadotProtocol {
}
}

res
let availability_store = self.availability_store.clone();
let collation_cloned = collation.clone();

async move {
if let Some(availability_store) = availability_store {
let _ = availability_store.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
}
}

/// Give the network protocol a handle to an availability store, used for
Expand Down
5 changes: 3 additions & 2 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ use log::{debug, trace};
use std::collections::{HashMap, HashSet};
use std::io;
use std::sync::Arc;
use std::pin::Pin;

use crate::validation::{self, LeafWorkDataFetcher, Executor};
use crate::validation::{LeafWorkDataFetcher, Executor};
use crate::NetworkService;

/// Compute the gossip topic for attestations on the given parent hash.
Expand Down Expand Up @@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
E: Future<Output=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
type FetchValidationProof = Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>>;

// We have fetched from a collator and here the receipt should have been already formed.
fn local_collation(
Expand Down
91 changes: 31 additions & 60 deletions network/src/tests/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::{prelude::*, channel::mpsc};
use futures::{prelude::*, channel::mpsc, future::{select, Either}};
use codec::Encode;

use super::{TestContext, TestChainContext};
Expand All @@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification {
}
}

struct GossipRouter {
incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
messages: Vec<(Hash, TopicNotification)>,
}

impl GossipRouter {
fn add_message(&mut self, topic: Hash, message: TopicNotification) {
self.outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
});
self.messages.push((topic, message));
}

fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<TopicNotification>) {
for message in self.messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| clone_gossip(msg))
{
if let Err(_) = sender.unbounded_send(message) { return }
}

self.outgoing.push((topic, sender));
}
}

impl Future for GossipRouter {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);

loop {
match Pin::new(&mut this.incoming_messages).poll_next(cx) {
Poll::Ready(Some((topic, message))) => this.add_message(topic, message),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}

loop {
match Pin::new(&mut this.incoming_streams).poll_next(cx) {
Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
async fn gossip_router(
mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>
) {
let mut outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)> = Vec::new();
let mut messages = Vec::new();

loop {
match select(incoming_messages.next(), incoming_streams.next()).await {
Either::Left((Some((topic, message)), _)) => {
outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
});
messages.push((topic, message));
},
Either::Right((Some((topic, sender)), _)) => {
for message in messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| clone_gossip(msg))
{
if let Err(_) = sender.unbounded_send(message) { return }
}

outgoing.push((topic, sender));
},
Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.")
}

Poll::Pending
}
}


#[derive(Clone)]
struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
}

fn make_gossip() -> (GossipRouter, GossipHandle) {
fn make_gossip() -> (impl Future<Output = ()>, GossipHandle) {
let (message_tx, message_rx) = mpsc::unbounded();
let (listener_tx, listener_rx) = mpsc::unbounded();

(
GossipRouter {
incoming_messages: message_rx,
incoming_streams: listener_rx,
outgoing: Vec::new(),
messages: Vec::new(),
},
gossip_router(message_rx, listener_rx),
GossipHandle { send_message: message_tx, send_listener: listener_tx },
)
}
Expand Down Expand Up @@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork<
>;

struct Built {
gossip: GossipRouter,
gossip: Pin<Box<dyn Future<Output = ()>>>,
api_handle: Arc<Mutex<ApiData>>,
networks: Vec<TestValidationNetwork>,
}
Expand Down Expand Up @@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
let networks: Vec<_> = networks.collect();

Built {
gossip: gossip_router,
gossip: gossip_router.boxed(),
api_handle,
networks,
}
Expand Down
Loading

0 comments on commit b0535e6

Please sign in to comment.