Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:paritytech/polkadot into ark-fix-…
Browse files Browse the repository at this point in the history
…import
  • Loading branch information
arkpar committed Jul 18, 2018
2 parents 791c73c + 2eb32a3 commit fbe9d6d
Show file tree
Hide file tree
Showing 27 changed files with 586 additions and 244 deletions.
232 changes: 117 additions & 115 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion demo/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, BlockId, Hash> for D
Err("unimplemented".into())
}

fn submit_and_watch(&self, _block: BlockId, _: UncheckedExtrinsic)
-> Result<extrinsic_pool::watcher::Watcher<Hash>, Self::Error>
{
Err("unimplemented".into())
}

fn light_status(&self) -> extrinsic_pool::txpool::LightStatus {
unreachable!()
}
Expand Down Expand Up @@ -169,7 +175,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let _rpc_servers = {
let handler = || {
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool));
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
info!("Starting collator");
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
// maybe link to libpolkadot to get a light-client.
service::Roles::LIGHT
} else if matches.is_present("light") {
info!("Starting (light)");
Expand Down Expand Up @@ -478,9 +478,9 @@ fn run_until_exit<C, W>(
let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?;

let handler = || {
let client = (&service as &substrate_service::Service<C>).client();
let client = substrate_service::Service::client(&service);
let chain = rpc::apis::chain::Chain::new(client.clone(), executor.clone());
let author = rpc::apis::author::Author::new(client.clone(), service.extrinsic_pool());
let author = rpc::apis::author::Author::new(client.clone(), service.extrinsic_pool(), executor.clone());
rpc::rpc_handler::<service::ComponentBlock<C>, _, _, _, _>(
client,
chain,
Expand Down
25 changes: 21 additions & 4 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ use std::{
};

use codec::{Decode, Encode};
use extrinsic_pool::{Pool, Listener, txpool::{self, Readiness, scoring::{Change, Choice}}};
use extrinsic_pool::api::{ExtrinsicPool, EventStream};
use extrinsic_pool::{
api::{ExtrinsicPool, EventStream},
txpool::{self, Readiness, scoring::{Change, Choice}},
watcher::Watcher,
Pool,
Listener,
};
use polkadot_api::PolkadotApi;
use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
use runtime::{Address, UncheckedExtrinsic};
Expand Down Expand Up @@ -385,15 +390,15 @@ impl<A> Deref for TransactionPool<A> {
}
}

// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions,
// even when runtime is out of date.
impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for TransactionPool<A> where
A: Send + Sync + 'static,
A: PolkadotApi,
{
type Error = Error;

fn submit(&self, block: BlockId, xts: Vec<FutureProofUncheckedExtrinsic>) -> Result<Vec<Hash>> {
// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions,
// even when runtime is out of date.
xts.into_iter()
.map(|xt| xt.encode())
.map(|encoded| {
Expand All @@ -404,6 +409,18 @@ impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for Transact
.collect()
}

fn submit_and_watch(&self, block: BlockId, xt: FutureProofUncheckedExtrinsic) -> Result<Watcher<Hash>> {
let encoded = xt.encode();
let decoded = UncheckedExtrinsic::decode(&mut &encoded[..]).ok_or(ErrorKind::InvalidExtrinsicFormat)?;

let verifier = Verifier {
api: &*self.api,
at_block: block,
};

self.inner.submit_and_watch(verifier, decoded)
}

fn light_status(&self) -> LightStatus {
self.inner.light_status()
}
Expand Down
2 changes: 2 additions & 0 deletions substrate/extrinsic-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]

[dependencies]
serde = "1.0"
serde_derive = "1.0"
error-chain = "0.12"
futures = "0.1"
log = "0.3"
Expand Down
5 changes: 5 additions & 0 deletions substrate/extrinsic-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
use txpool;
use futures::sync::mpsc;

use watcher::Watcher;

/// Extrinsic pool error.
pub trait Error: ::std::error::Error + Send + Sized {
/// Try to extract original `txpool::Error`
Expand All @@ -44,6 +46,9 @@ pub trait ExtrinsicPool<Ex, BlockId, Hash>: Send + Sync + 'static {
/// Submit a collection of extrinsics to the pool.
fn submit(&self, block: BlockId, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error>;

/// Submit an extrinsic to the pool and start watching it's progress.
fn submit_and_watch(&self, block: BlockId, xt: Ex) -> Result<Watcher<Hash>, Self::Error>;

/// Returns light status of the pool.
fn light_status(&self) -> txpool::LightStatus;

Expand Down
6 changes: 4 additions & 2 deletions substrate/extrinsic-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@
extern crate futures;
extern crate parking_lot;
extern crate serde;

#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;

pub extern crate transaction_pool as txpool;

pub mod api;
pub mod watcher;

mod listener;
mod pool;
mod watcher;

pub use self::listener::Listener;
pub use self::pool::Pool;
pub use self::watcher::Watcher;
24 changes: 21 additions & 3 deletions substrate/extrinsic-pool/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::sync::mpsc;
//! Extrinsics status updates.
use futures::{
Stream,
sync::mpsc,
};

/// Possible extrinsic status events
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H> {
/// Extrinsic has been finalised in block with given hash.
Finalised(H),
Expand All @@ -37,8 +43,19 @@ pub struct Watcher<H> {
receiver: mpsc::UnboundedReceiver<Status<H>>,
}

impl<H> Watcher<H> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H>, Error=()> {
// we can safely ignore the error here, `UnboundedReceiver` never fails.
self.receiver.map_err(|_| ())
}
}

/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug, Default)]
pub(crate) struct Sender<H> {
pub struct Sender<H> {
receivers: Vec<mpsc::UnboundedSender<Status<H>>>,
finalised: bool,
}
Expand Down Expand Up @@ -74,6 +91,7 @@ impl<H: Clone> Sender<H> {
self.send(Status::Broadcast(peers))
}


/// Returns true if the are no more listeners for this extrinsic or it was finalised.
pub fn is_done(&self) -> bool {
self.finalised || self.receivers.is_empty()
Expand Down
2 changes: 1 addition & 1 deletion substrate/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "2fb5ef1d40f2565e592248abbd21b7ca2da992e0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "727e0e099b53a4032a7e2330994c819fe866add7", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
Expand Down
58 changes: 50 additions & 8 deletions substrate/network-libp2p/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::{future, sync::mpsc};
use futures::sync::mpsc;
use libp2p::core::{Multiaddr, AddrComponent, Endpoint, UniqueConnec};
use libp2p::core::{PeerId as PeerstorePeerId, PublicKey};
use libp2p::core::{UniqueConnecState, PeerId as PeerstorePeerId, PublicKey};
use libp2p::kad::KadConnecController;
use libp2p::peerstore::{Peerstore, PeerAccess};
use libp2p::peerstore::json_peerstore::JsonPeerstore;
Expand Down Expand Up @@ -192,7 +192,28 @@ impl NetworkState {
&self.local_public_key
}

/// Returns all the IDs of the peer we have knowledge of.
/// Returns the ID of a random peer of the network.
///
/// Returns `None` if we don't know any peer.
pub fn random_peer(&self) -> Option<PeerstorePeerId> {
// TODO: optimize by putting the operation directly in the peerstore
// https://github.com/libp2p/rust-libp2p/issues/316
let peers = match self.peerstore {
PeersStorage::Memory(ref mem) =>
mem.peers().collect::<Vec<_>>(),
PeersStorage::Json(ref json) =>
json.peers().collect::<Vec<_>>(),
};

if peers.is_empty() {
return None
}

let nth = rand::random::<usize>() % peers.len();
Some(peers[nth].clone())
}

/// Returns all the IDs of the peers on the network we have knowledge of.
///
/// This includes peers we are not connected to.
pub fn known_peers(&self) -> impl Iterator<Item = PeerstorePeerId> {
Expand Down Expand Up @@ -402,11 +423,32 @@ impl NetworkState {
}
}

/// Returns true if we should open a new outgoing connection to a peer.
/// This takes into account the number of active peers.
pub fn should_open_outgoing_connections(&self) -> bool {
!self.reserved_only.load(atomic::Ordering::Relaxed) &&
self.connections.read().peer_by_nodeid.len() < self.min_peers as usize
/// Returns the number of open and pending connections with
/// custom protocols.
pub fn num_open_custom_connections(&self) -> u32 {
self.connections
.read()
.info_by_peer
.values()
.filter(|info|
info.protocols.iter().any(|&(_, ref connec)|
match connec.state() {
UniqueConnecState::Pending | UniqueConnecState::Full => true,
_ => false
}
)
)
.count() as u32
}

/// Returns the number of new outgoing custom connections to peers to
/// open. This takes into account the number of active peers.
pub fn should_open_outgoing_custom_connections(&self) -> u32 {
if self.reserved_only.load(atomic::Ordering::Relaxed) {
0
} else {
self.min_peers.saturating_sub(self.num_open_custom_connections())
}
}

/// Returns true if we are connected to the given node.
Expand Down
Loading

0 comments on commit fbe9d6d

Please sign in to comment.