diff --git a/Cargo.lock b/Cargo.lock index 9f29feece6d86..89422a0bb1502 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3746,8 +3746,6 @@ dependencies = [ "frame-system", "futures 0.3.5", "hex-literal", - "jsonrpc-core", - "jsonrpc-pubsub", "log", "nix", "node-executor", @@ -3881,7 +3879,6 @@ name = "node-rpc" version = "2.0.0-rc6" dependencies = [ "jsonrpc-core", - "jsonrpc-pubsub", "node-primitives", "node-runtime", "pallet-contracts-rpc", diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index 92f223427a710..fdc63f09555bb 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -38,8 +38,6 @@ codec = { package = "parity-scale-codec", version = "1.3.4" } serde = { version = "1.0.102", features = ["derive"] } futures = { version = "0.3.1", features = ["compat"] } hex-literal = "0.3.1" -jsonrpc-core = "14.2.0" -jsonrpc-pubsub = "14.2.0" log = "0.4.8" rand = "0.7.2" structopt = { version = "0.3.8", optional = true } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index d91696ab7d6bc..b972efa813d2f 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -51,7 +51,7 @@ pub fn new_partial(config: &Configuration) -> Result node_rpc::IoHandler, ( sc_consensus_babe::BabeBlockImport, @@ -119,7 +119,7 @@ pub fn new_partial(config: &Configuration) -> Result Result, /// Receives notifications about justification events from Grandpa. pub justification_stream: GrandpaJustificationStream, - /// Subscription manager to keep track of pubsub subscribers. - pub subscriptions: SubscriptionManager, + /// Executor to drive the subscription manager in the Grandpa RPC handler. + pub subscription_executor: SubscriptionTaskExecutor, } /// Full client dependencies. @@ -139,7 +139,7 @@ pub fn create_full( shared_voter_state, shared_authority_set, justification_stream, - subscriptions, + subscription_executor, } = grandpa; io.extend_with( @@ -172,7 +172,7 @@ pub fn create_full( shared_authority_set, shared_voter_state, justification_stream, - subscriptions, + subscription_executor, ) ) ); diff --git a/client/finality-grandpa/rpc/src/lib.rs b/client/finality-grandpa/rpc/src/lib.rs index 5606da42d5947..fedd7220d3115 100644 --- a/client/finality-grandpa/rpc/src/lib.rs +++ b/client/finality-grandpa/rpc/src/lib.rs @@ -19,6 +19,7 @@ //! RPC API for GRANDPA. #![warn(missing_docs)] +use std::sync::Arc; use futures::{FutureExt, TryFutureExt, TryStreamExt, StreamExt}; use log::warn; use jsonrpc_derive::rpc; @@ -27,6 +28,7 @@ use jsonrpc_core::futures::{ sink::Sink as Sink01, stream::Stream as Stream01, future::Future as Future01, + future::Executor as Executor01, }; mod error; @@ -92,12 +94,16 @@ pub struct GrandpaRpcHandler { impl GrandpaRpcHandler { /// Creates a new GrandpaRpcHandler instance. - pub fn new( + pub fn new( authority_set: AuthoritySet, voter_state: VoterState, justification_stream: GrandpaJustificationStream, - manager: SubscriptionManager, - ) -> Self { + executor: E, + ) -> Self + where + E: Executor01 + Send>> + Send + Sync + 'static, + { + let manager = SubscriptionManager::new(Arc::new(executor)); Self { authority_set, voter_state, @@ -232,13 +238,12 @@ mod tests { VoterState: ReportVoterState + Send + Sync + 'static, { let (justification_sender, justification_stream) = GrandpaJustificationStream::channel(); - let manager = SubscriptionManager::new(Arc::new(sc_rpc::testing::TaskExecutor)); let handler = GrandpaRpcHandler::new( TestAuthoritySet, voter_state, justification_stream, - manager, + sc_rpc::testing::TaskExecutor, ); let mut io = jsonrpc_core::MetaIoHandler::default(); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index f4046ab722ba7..5a1737af3724a 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -46,7 +46,7 @@ use sp_runtime::traits::{ }; use sp_api::{ProvideRuntimeApi, CallApiAt}; use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo}; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use wasm_timer::SystemTime; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; use sp_transaction_pool::MaintainedTransactionPool; @@ -73,17 +73,25 @@ pub trait RpcExtensionBuilder { /// Returns an instance of the RPC extension for a particular `DenyUnsafe` /// value, e.g. the RPC extension might not expose some unsafe methods. - fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output; + fn build( + &self, + deny: sc_rpc::DenyUnsafe, + subscription_executor: sc_rpc::SubscriptionTaskExecutor, + ) -> Self::Output; } impl RpcExtensionBuilder for F where - F: Fn(sc_rpc::DenyUnsafe, SubscriptionManager) -> R, + F: Fn(sc_rpc::DenyUnsafe, sc_rpc::SubscriptionTaskExecutor) -> R, R: sc_rpc::RpcExtension, { type Output = R; - fn build(&self, deny: sc_rpc::DenyUnsafe, subscriptions: SubscriptionManager) -> Self::Output { - (*self)(deny, subscriptions) + fn build( + &self, + deny: sc_rpc::DenyUnsafe, + subscription_executor: sc_rpc::SubscriptionTaskExecutor, + ) -> Self::Output { + (*self)(deny, subscription_executor) } } @@ -97,7 +105,11 @@ impl RpcExtensionBuilder for NoopRpcExtensionBuilder where { type Output = R; - fn build(&self, _deny: sc_rpc::DenyUnsafe, _subscriptions: SubscriptionManager) -> Self::Output { + fn build( + &self, + _deny: sc_rpc::DenyUnsafe, + _subscription_executor: sc_rpc::SubscriptionTaskExecutor, + ) -> Self::Output { self.0.clone() } } @@ -694,7 +706,7 @@ fn gen_handler( }; let task_executor = sc_rpc::SubscriptionTaskExecutor::new(spawn_handle); - let subscriptions = SubscriptionManager::new(Arc::new(task_executor)); + let subscriptions = SubscriptionManager::new(Arc::new(task_executor.clone())); let (chain, state, child_state) = if let (Some(remote_blockchain), Some(on_demand)) = (remote_blockchain, on_demand) { @@ -723,20 +735,16 @@ fn gen_handler( let author = sc_rpc::author::Author::new( client, transaction_pool, - subscriptions.clone(), + subscriptions, keystore, deny_unsafe, ); let system = system::System::new(system_info, system_rpc_tx, deny_unsafe); - let maybe_offchain_rpc = offchain_storage - .map(|storage| { + let maybe_offchain_rpc = offchain_storage.map(|storage| { let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe); - // FIXME: Use plain Option (don't collect into HashMap) when we upgrade to jsonrpc 14.1 - // https://github.com/paritytech/jsonrpc/commit/20485387ed06a48f1a70bf4d609a7cde6cf0accf - let delegate = offchain::OffchainApi::to_delegate(offchain); - delegate.into_iter().collect::>() - }).unwrap_or_default(); + offchain::OffchainApi::to_delegate(offchain) + }); sc_rpc_server::rpc_handler(( state::StateApi::to_delegate(state), @@ -745,7 +753,7 @@ fn gen_handler( maybe_offchain_rpc, author::AuthorApi::to_delegate(author), system::SystemApi::to_delegate(system), - rpc_extensions_builder.build(deny_unsafe, subscriptions), + rpc_extensions_builder.build(deny_unsafe, task_executor), )) }