This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use SpawnTaskHandles for spawning tasks in the tx pool #8958

Merged · 5 commits · Jun 1, 2021
17 changes: 0 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion client/transaction-pool/Cargo.toml
@@ -16,7 +16,6 @@ targets = ["x86_64-unknown-linux-gnu"]
 codec = { package = "parity-scale-codec", version = "2.0.0" }
 thiserror = "1.0.21"
 futures = { version = "0.3.1", features = ["compat"] }
-futures-diagnose = "1.0"
 intervalier = "0.4.0"
 log = "0.4.8"
 parity-util-mem = { version = "0.9.0", default-features = false, features = ["primitive-types"] }
16 changes: 7 additions & 9 deletions client/transaction-pool/src/api.rs
@@ -21,7 +21,7 @@
 use std::{marker::PhantomData, pin::Pin, sync::Arc};
 use codec::{Decode, Encode};
 use futures::{
-	channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
+	channel::oneshot, future::{Future, FutureExt, ready, Ready},
 };
 
 use sc_client_api::{
@@ -31,6 +31,7 @@ use sp_runtime::{
 	generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT},
 	transaction_validity::{TransactionValidity, TransactionSource},
 };
+use sp_core::traits::SpawnNamed;
 use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
 use sp_api::{ProvideRuntimeApi, ApiExt};
 use prometheus_endpoint::Registry as PrometheusRegistry;
@@ -40,7 +41,7 @@ use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}};
 /// The transaction pool logic for full client.
 pub struct FullChainApi<Client, Block> {
 	client: Arc<Client>,
-	pool: ThreadPool,
+	spawner: Box<dyn SpawnNamed>,
 	_marker: PhantomData<Block>,
 	metrics: Option<Arc<ApiMetrics>>,
 }
@@ -50,6 +51,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
 	pub fn new(
 		client: Arc<Client>,
 		prometheus: Option<&PrometheusRegistry>,
+		spawner: impl SpawnNamed + 'static,
 	) -> Self {
 		let metrics = prometheus.map(ApiMetrics::register).and_then(|r| {
 			match r {
@@ -67,13 +69,9 @@ impl<Client, Block> FullChainApi<Client, Block> {
 
 		FullChainApi {
 			client,
-			pool: ThreadPoolBuilder::new()
-				.pool_size(2)
-				.name_prefix("txpool-verifier")
-				.create()
-				.expect("Failed to spawn verifier threads, that are critical for node operation."),
Comment on lines -70 to -74

Member: Before we spawned 2 threads and now you spawn unlimited of them...

Contributor: It is still going to be limited by the number of blocking threads, no?
If it's not limited then we might be generating quite huge load on the entire machine due to revalidation (which is not bounded currently) and that might interfere with other tasks.

Member: The limit is quite high, I don't remember it, but it is way more than 2
 			_marker: Default::default(),
 			metrics,
+			spawner: Box::new(spawner),
 		}
 	}
 }
@@ -109,9 +107,9 @@ where
 		let metrics = self.metrics.clone();
 		metrics.report(|m| m.validations_scheduled.inc());
 
-		self.pool.spawn_ok(futures_diagnose::diagnose(
+		self.spawner.spawn_blocking(
 			"validate-transaction",
-			async move {
+			Box::pin(async move {
 				let res = validate_transaction_blocking(&*client, &at, source, uxt);
 				if let Err(e) = tx.send(res) {
 					log::warn!("Unable to send a validate transaction result: {:?}", e);
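For readers skimming the diff, here is a minimal, self-contained sketch (only the `futures` crate, assuming its `thread-pool` feature) of the hand-off pattern the pool now uses: instead of owning a private two-thread verifier pool, the chain API is handed a spawn handle and submits blocking validation work by name, returning the result over a oneshot channel. `SpawnNamedLike` and `validate_blocking` are simplified stand-ins for `sp_core::traits::SpawnNamed` and `validate_transaction_blocking`, and the plain thread pool stands in for the node's task manager.

```rust
use futures::{channel::oneshot, executor::ThreadPool, future::BoxFuture};

/// Simplified stand-in for `sp_core::traits::SpawnNamed` as used in this PR:
/// a handle that can run a named, blocking task to completion.
trait SpawnNamedLike: Send + Sync {
	fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>);
}

/// The node's task manager normally provides the handle; a plain `futures`
/// thread pool (requires the crate's `thread-pool` feature) stands in here.
impl SpawnNamedLike for ThreadPool {
	fn spawn_blocking(&self, _name: &'static str, future: BoxFuture<'static, ()>) {
		self.spawn_ok(future);
	}
}

/// Placeholder for the real, CPU-heavy `validate_transaction_blocking`.
fn validate_blocking(tx_bytes: &[u8]) -> Result<(), String> {
	if tx_bytes.is_empty() { Err("empty transaction".into()) } else { Ok(()) }
}

fn main() {
	let spawner: Box<dyn SpawnNamedLike> = Box::new(ThreadPool::new().expect("pool"));
	let (tx, rx) = oneshot::channel();
	let raw_tx = vec![1u8, 2, 3];

	// Mirrors the new `FullChainApi::validate_transaction` body: off-load the
	// blocking validation and send the result back over a oneshot channel.
	spawner.spawn_blocking(
		"validate-transaction",
		Box::pin(async move {
			let res = validate_blocking(&raw_tx);
			if tx.send(res).is_err() {
				eprintln!("Unable to send a validate transaction result");
			}
		}),
	);

	let result = futures::executor::block_on(rx).expect("validation task was dropped");
	println!("validation result: {:?}", result);
}
```

The trade-off raised in the review thread above is that the cap on concurrent validations now comes from the spawner's shared blocking-thread limit rather than from a dedicated pool of two verifier threads.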
4 changes: 2 additions & 2 deletions client/transaction-pool/src/lib.rs
@@ -366,10 +366,10 @@ where
 		options: sc_transaction_graph::Options,
 		is_validator: txpool::IsValidator,
 		prometheus: Option<&PrometheusRegistry>,
-		spawner: impl SpawnNamed,
+		spawner: impl SpawnNamed + Clone + 'static,
 		client: Arc<Client>,
 	) -> Arc<Self> {
-		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus));
+		let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, spawner.clone()));
 		let pool = Arc::new(Self::with_revalidation_type(
 			options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner
 		));
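A brief note on the tightened bound, with a hypothetical sketch: `new_full` now clones the spawner, boxing one copy inside the chain API (the `Box<dyn SpawnNamed>` field added above, which is what forces `'static`) while the original copy is still forwarded to `with_revalidation_type`, hence `Clone + 'static`. `SpawnHandle`, `ChainApi`, `PoolBackground`, and `new_full` below are illustrative names, not Substrate types.

```rust
#![allow(dead_code)]

#[derive(Clone)]
struct SpawnHandle; // in a real node this would wrap the task manager's spawn handle

struct ChainApi {
	// analogous to the `Box<dyn SpawnNamed>` field added to `FullChainApi`;
	// boxing the handle as a trait object is what requires `'static`
	spawner: Box<SpawnHandle>,
}

struct PoolBackground {
	// analogous to the spawner consumed by `with_revalidation_type`
	spawner: SpawnHandle,
}

fn new_full(spawner: SpawnHandle) -> (ChainApi, PoolBackground) {
	// Mirrors `FullChainApi::new(client, prometheus, spawner.clone())`
	// followed by `Self::with_revalidation_type(.., spawner)`.
	let api = ChainApi { spawner: Box::new(spawner.clone()) };
	let background = PoolBackground { spawner };
	(api, background)
}

fn main() {
	let (_api, _background) = new_full(SpawnHandle);
}
```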
5 changes: 3 additions & 2 deletions client/transaction-pool/src/testing/pool.rs
@@ -35,6 +35,7 @@ use std::collections::BTreeSet;
 use sc_client_api::client::BlockchainEvents;
 use sc_block_builder::BlockBuilderProvider;
 use sp_consensus::BlockOrigin;
+use sp_core::testing::TaskExecutor;
 
 fn pool() -> Pool<TestApi> {
 	Pool::new(Default::default(), true.into(), TestApi::with_alice_nonce(209).into())
@@ -935,7 +936,7 @@ fn should_not_accept_old_signatures() {
 	let client = Arc::new(substrate_test_runtime_client::new());
 
 	let pool = Arc::new(
-		BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0
+		BasicPool::new_test(Arc::new(FullChainApi::new(client, None, TaskExecutor::new()))).0
 	);
 
 	let transfer = Transfer {
@@ -971,7 +972,7 @@ fn import_notification_to_pool_maintain_works() {
 	let mut client = Arc::new(substrate_test_runtime_client::new());
 
 	let pool = Arc::new(
-		BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0
+		BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None, TaskExecutor::new()))).0
 	);
 
 	// Prepare the extrinsic, push it to the pool and check that it was added.
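On the test side, `FullChainApi::new` now takes the executor explicitly (`TaskExecutor::new()` above) because the API no longer creates its own verifier threads. Below is a hypothetical, self-contained sketch of what such a test executor needs to provide; `TestSpawner` is an illustrative analogue, not the real `sp_core::testing::TaskExecutor`.

```rust
use futures::{channel::oneshot, executor::ThreadPool, future::BoxFuture};

/// Illustrative analogue of a test executor: just enough of a spawn handle
/// to drive the pool's named validation tasks from a test.
struct TestSpawner(ThreadPool); // `futures` with the `thread-pool` feature

impl TestSpawner {
	fn new() -> Self {
		TestSpawner(ThreadPool::new().expect("test thread pool"))
	}

	fn spawn_blocking(&self, _name: &'static str, fut: BoxFuture<'static, ()>) {
		self.0.spawn_ok(fut);
	}
}

#[test]
fn spawner_runs_validation_task() {
	let spawner = TestSpawner::new();
	let (tx, rx) = oneshot::channel();

	// Stand-in for the blocking validation work the pool would submit.
	spawner.spawn_blocking(
		"validate-transaction",
		Box::pin(async move {
			let _ = tx.send(2 + 2);
		}),
	);

	assert_eq!(futures::executor::block_on(rx).unwrap(), 4);
}
```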