Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Chain head subscription #126

Merged
merged 16 commits into from
Apr 17, 2018
Merged
Prev Previous commit
Next Next commit
Proper implementation with event loop.
tomusdrw committed Apr 13, 2018

Verified

This commit was signed with the committer’s verified signature.
tomusdrw Tomek Drwięga
commit 88164439649f63bec1added6d1bc92468ca28ef2
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -141,6 +141,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches);
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches);

let chain = rpc::apis::chain::Chain::new()
let handler = || rpc::rpc_handler(service.client(), service.client(), service.transaction_pool());
(
rpc::start_http(&http_address, handler())?,
@@ -161,7 +162,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
fn parse_address(default: &str, port_param: &str, matches: &clap::ArgMatches) -> SocketAddr {
let mut address: SocketAddr = default.parse().unwrap();
if let Some(port) = matches.value_of(port_param) {
let port: u16 = port.parse().expect(&format!("Invalid RPC port for --{} specified.", port_param));
let port: u16 = port.parse().expect(&format!("Invalid port for --{} specified.", port_param));
address.set_port(port);
}

1 change: 0 additions & 1 deletion polkadot/service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -53,7 +53,6 @@ use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use runtime_io::with_externalities;
use primitives::block::{Id as BlockId, TransactionHash};
use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor;
2 changes: 1 addition & 1 deletion substrate/client/src/client.rs
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ use {error, in_mem, block_builder, runtime_io, bft};
pub type BlockchainEventStream = mpsc::UnboundedReceiver<BlockImportNotification>;

/// Polkadot Client
pub struct Client<B, E> where B: backend::Backend {
pub struct Client<B, E> {
backend: B,
executor: E,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
7 changes: 4 additions & 3 deletions substrate/rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -4,16 +4,17 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]

[dependencies]
parking_lot = "0.4"
log = "0.3"
error-chain = "0.11"
jsonrpc-core = { git="https://github.com/paritytech/jsonrpc.git" }
jsonrpc-macros = { git="https://github.com/paritytech/jsonrpc.git" }
jsonrpc-pubsub = { git="https://github.com/paritytech/jsonrpc.git" }
log = "0.3"
parking_lot = "0.4"
substrate-client = { path = "../client" }
substrate-executor = { path = "../executor" }
substrate-primitives = { path = "../primitives" }
substrate-state-machine = { path = "../state-machine" }
substrate-executor = { path = "../executor" }
tokio-core = "0.1.12"

[dev-dependencies]
assert_matches = "1.1"
53 changes: 27 additions & 26 deletions substrate/rpc/src/chain/mod.rs
Original file line number Diff line number Diff line change
@@ -19,20 +19,21 @@
use std::sync::Arc;

use primitives::block;
use client::{self, Client};
use client::{self, Client, BlockchainEvents};
use state_machine;

use jsonrpc_pubsub::SubscriptionId;
use jsonrpc_macros::pubsub;
use rpc::futures::Future;
use jsonrpc_pubsub::SubscriptionId;
use rpc::Result as RpcResult;
use rpc::futures::{Future, Sink, Stream};

mod error;
use subscriptions::Subscriptions;

mod error;
#[cfg(test)]
mod tests;

use self::error::{Result, ResultExt};
use rpc::Result as RpcResult;

build_rpc_trait! {
/// Polkadot blockchain API
@@ -59,43 +60,43 @@ build_rpc_trait! {
}
}

impl<B, E> ChainApi for Arc<Client<B, E>> where
/// Chain API with subscriptions support.
pub struct Chain<B, E> {
/// Substrate client.
pub client: Arc<Client<B, E>>,
/// Current subscriptions.
pub subscriptions: Subscriptions,
}

impl<B, E> ChainApi for Chain<B, E> where
B: client::backend::Backend + Send + Sync + 'static,
E: state_machine::CodeExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>,
{
type Metadata = ::metadata::Metadata;

fn header(&self, hash: block::HeaderHash) -> Result<Option<block::Header>> {
client::Client::header(self, &block::Id::Hash(hash)).chain_err(|| "Blockchain error")
self.client.header(&block::Id::Hash(hash)).chain_err(|| "Blockchain error")
}

fn head(&self) -> Result<block::HeaderHash> {
Ok(self.info().chain_err(|| "Blockchain error")?.chain.best_hash)
Ok(self.client.info().chain_err(|| "Blockchain error")?.chain.best_hash)
}

fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<block::Header>) {
let (tx, rx) = ::std::sync::mpsc::sync_channel(1);
let client = self.clone();
let handle = ::std::thread::spawn(move || {
let sink = subscriber.assign_id(1.into()).unwrap();
let mut last_value = None;
loop {
if let Ok(()) = rx.recv_timeout(::std::time::Duration::from_millis(100)) {
return;
}
let head = client.head().and_then(|h| client.header(h)).ok();
if last_value != head {
last_value = head.clone();
if let Some(Some(head)) = head {
sink.notify(Ok(head)).wait().unwrap();
}
}
}
self.subscriptions.add(subscriber, |sink| {
let stream = self.client.import_notification_stream()
.map(|notification| Ok(notification.header))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should filter by notification.is_best

.map_err(|e| warn!("Block notification stream error: {:?}", e));
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(stream)
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}

fn unsubscribe_new_head(&self, id: SubscriptionId) -> RpcResult<bool> {
unimplemented!()
Ok(self.subscriptions.cancel(id))
}
}
8 changes: 7 additions & 1 deletion substrate/rpc/src/chain/tests.rs
Original file line number Diff line number Diff line change
@@ -29,7 +29,13 @@ fn should_return_header() {
digest: Default::default(),
};

let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap());
let core = ::tokio_core::reactor::Core::new().unwrap();
let remote = core.remote();

let client = Chain {
client: Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()),
subscriptions: Subscriptions::new(remote),
};

assert_matches!(
ChainApi::header(&client, test_genesis_block.blake2_256().into()),
5 changes: 4 additions & 1 deletion substrate/rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -18,12 +18,13 @@

#![warn(missing_docs)]

extern crate parking_lot;
extern crate jsonrpc_core as rpc;
extern crate jsonrpc_pubsub;
extern crate parking_lot;
extern crate substrate_client as client;
extern crate substrate_primitives as primitives;
extern crate substrate_state_machine as state_machine;
extern crate tokio_core;

#[macro_use]
extern crate error_chain;
@@ -40,6 +41,8 @@ extern crate assert_matches;
#[cfg(test)]
extern crate substrate_runtime_support as runtime_support;

mod subscriptions;

pub mod author;
pub mod chain;
pub mod metadata;
7 changes: 7 additions & 0 deletions substrate/rpc/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -14,10 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! RPC Metadata
use std::sync::Arc;

use jsonrpc_pubsub::{Session, PubSubMetadata};

/// RPC Metadata.
///
/// Manages peristent session for transports that support it
/// and may contain some additional info extracted from specific transports
/// (like remote client IP address, request headers, etc)
#[derive(Default, Clone)]
pub struct Metadata {
session: Option<Arc<Session>>,
@@ -31,6 +37,7 @@ impl PubSubMetadata for Metadata {
}

impl Metadata {
/// Create new `Metadata` with session (Pub/Sub) support.
pub fn new(transport: ::rpc::futures::sync::mpsc::Sender<String>) -> Self {
Metadata {
session: Some(Arc::new(Session::new(transport))),
86 changes: 86 additions & 0 deletions substrate/rpc/src/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::atomic::{self, AtomicUsize};

use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId;
use parking_lot::Mutex;
use rpc::futures::sync::oneshot;
use rpc::futures::{Future, future};
use tokio_core::reactor::Remote;

type Id = u64;

/// Subscriptions manager.
///
/// Takes care of assigning unique subscription ids and
/// driving the sinks into completion.
#[derive(Debug)]
pub struct Subscriptions {
next_id: AtomicUsize,
active_subscriptions: Mutex<HashMap<Id, oneshot::Sender<()>>>,
event_loop: Remote,
}

impl Subscriptions {
/// Creates new `Subscriptions` object.
pub fn new(event_loop: Remote) -> Self {
Subscriptions {
next_id: Default::default(),
active_subscriptions: Default::default(),
event_loop,
}
}

/// Creates new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber sink into a future.
/// This future will be driven to completion bu underlying event loop
/// or will be cancelled in case #cancel is invoked.
pub fn add<T, E, G, R, F>(&self, subscriber: pubsub::Subscriber<T, E>, into_future: G) where
G: FnOnce(pubsub::Sink<T, E>) -> R,
R: future::IntoFuture<Future=F, Item=(), Error=()>,
F: future::Future<Item=(), Error=()> + Send + 'static,
{
let id = self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64;
if let Ok(sink) = subscriber.assign_id(id.into()) {
let (tx, rx) = oneshot::channel();
let future = into_future(sink)
.into_future()
.select(rx.map_err(|e| warn!("Error timeing out: {:?}", e)))
.map(|_| ())
.map_err(|_| ());

self.active_subscriptions.lock().insert(id, tx);
self.event_loop.spawn(|_| future);
}
}

/// Cancel subscription.
///
/// Returns true if subscription existed or false otherwise.
pub fn cancel(&self, id: SubscriptionId) -> bool {
if let SubscriptionId::Number(id) = id {
if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
let _ = tx.send(());
return true;
}
}
false
}
}