From bfef07c0d22ead3ab3c4e0e90ddf9b0e3537566e Mon Sep 17 00:00:00 2001 From: Ashley Date: Tue, 1 Jun 2021 16:28:03 +0200 Subject: [PATCH] Use `SpawnTaskHandle`s for spawning tasks in the tx pool (#8958) * Remove futures-diagnose * Use `SpawnTaskHandle`s for spawning tasks in the tx pool * Box the spawner * Fix tests * Use the testing task executor --- Cargo.lock | 17 ----------------- client/transaction-pool/Cargo.toml | 1 - client/transaction-pool/src/api.rs | 16 +++++++--------- client/transaction-pool/src/lib.rs | 4 ++-- client/transaction-pool/src/testing/pool.rs | 5 +++-- 5 files changed, 12 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fca6465198aa6..fc107b0a53bc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2059,22 +2059,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "futures-diagnose" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdcef58a173af8148b182684c9f2d5250875adbcaff7b5794073894f9d8634a9" -dependencies = [ - "futures 0.1.31", - "futures 0.3.13", - "lazy_static", - "log", - "parking_lot 0.9.0", - "pin-project 0.4.27", - "serde", - "serde_json", -] - [[package]] name = "futures-executor" version = "0.3.13" @@ -8116,7 +8100,6 @@ version = "3.0.0" dependencies = [ "assert_matches", "futures 0.3.13", - "futures-diagnose", "hex", "intervalier", "log", diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index d457d709d1222..6b105520baec5 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "2.0.0" } thiserror = "1.0.21" futures = { version = "0.3.1", features = ["compat"] } -futures-diagnose = "1.0" intervalier = "0.4.0" log = "0.4.8" parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] } diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 2ebf038844fab..fe1f99e0a3c2e 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -21,7 +21,7 @@ use std::{marker::PhantomData, pin::Pin, sync::Arc}; use codec::{Decode, Encode}; use futures::{ - channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready}, + channel::oneshot, future::{Future, FutureExt, ready, Ready}, }; use sc_client_api::{ @@ -31,6 +31,7 @@ use sp_runtime::{ generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT}, transaction_validity::{TransactionValidity, TransactionSource}, }; +use sp_core::traits::SpawnNamed; use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use sp_api::{ProvideRuntimeApi, ApiExt}; use prometheus_endpoint::Registry as PrometheusRegistry; @@ -40,7 +41,7 @@ use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}}; /// The transaction pool logic for full client. pub struct FullChainApi { client: Arc, - pool: ThreadPool, + spawner: Box, _marker: PhantomData, metrics: Option>, } @@ -50,6 +51,7 @@ impl FullChainApi { pub fn new( client: Arc, prometheus: Option<&PrometheusRegistry>, + spawner: impl SpawnNamed + 'static, ) -> Self { let metrics = prometheus.map(ApiMetrics::register).and_then(|r| { match r { @@ -67,13 +69,9 @@ impl FullChainApi { FullChainApi { client, - pool: ThreadPoolBuilder::new() - .pool_size(2) - .name_prefix("txpool-verifier") - .create() - .expect("Failed to spawn verifier threads, that are critical for node operation."), _marker: Default::default(), metrics, + spawner: Box::new(spawner) , } } } @@ -109,9 +107,9 @@ where let metrics = self.metrics.clone(); metrics.report(|m| m.validations_scheduled.inc()); - self.pool.spawn_ok(futures_diagnose::diagnose( + self.spawner.spawn_blocking( "validate-transaction", - async move { + Box::pin(async move { let res = validate_transaction_blocking(&*client, &at, source, uxt); if let Err(e) = tx.send(res) { log::warn!("Unable to send a validate transaction result: {:?}", e); diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index bc5f6e367ff86..bcabc5b873997 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -366,10 +366,10 @@ where options: sc_transaction_graph::Options, is_validator: txpool::IsValidator, prometheus: Option<&PrometheusRegistry>, - spawner: impl SpawnNamed, + spawner: impl SpawnNamed + Clone + 'static, client: Arc, ) -> Arc { - let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus)); + let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, spawner.clone())); let pool = Arc::new(Self::with_revalidation_type( options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner )); diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index 904870ae0ece9..1a76c28a0e0d4 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -35,6 +35,7 @@ use std::collections::BTreeSet; use sc_client_api::client::BlockchainEvents; use sc_block_builder::BlockBuilderProvider; use sp_consensus::BlockOrigin; +use sp_core::testing::TaskExecutor; fn pool() -> Pool { Pool::new(Default::default(), true.into(), TestApi::with_alice_nonce(209).into()) @@ -935,7 +936,7 @@ fn should_not_accept_old_signatures() { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client, None, TaskExecutor::new()))).0 ); let transfer = Transfer { @@ -971,7 +972,7 @@ fn import_notification_to_pool_maintain_works() { let mut client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None, TaskExecutor::new()))).0 ); // Prepare the extrisic, push it to the pool and check that it was added.