Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Introduce thread pool for transaction validation. (#4051)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomusdrw authored and gavofyork committed Nov 8, 2019
1 parent 3fea329 commit e676a10
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 6 deletions.
100 changes: 99 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
derive_more = "0.15.0"
log = "0.4.8"
futures-preview = "0.3.0-alpha.19"
futures = { version = "0.3.0", features = ["thread-pool"] }
codec = { package = "parity-scale-codec", version = "1.0.0" }
parking_lot = "0.9.0"
sr-primitives = { path = "../sr-primitives" }
Expand Down
36 changes: 32 additions & 4 deletions core/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
//! Chain api required for the transaction pool.
use std::{
sync::Arc,
marker::PhantomData,
pin::Pin,
sync::Arc,
};
use client::{runtime_api::TaggedTransactionQueue, blockchain::HeaderBackend};
use codec::Encode;
use futures::{
channel::oneshot,
executor::{ThreadPool, ThreadPoolBuilder},
future::Future,
};
use txpool;
use primitives::{
H256,
Expand All @@ -39,6 +45,7 @@ use crate::error;
/// The transaction pool logic
pub struct FullChainApi<T, Block> {
client: Arc<T>,
pool: ThreadPool,
_marker: PhantomData<Block>,
}

Expand All @@ -49,27 +56,48 @@ impl<T, Block> FullChainApi<T, Block> where
pub fn new(client: Arc<T>) -> Self {
FullChainApi {
client,
pool: ThreadPoolBuilder::new()
.pool_size(2)
.name_prefix("txpool-verifier")
.create()
.expect("Failed to spawn verifier threads, that are critical for node operation."),
_marker: Default::default()
}
}
}

impl<T, Block> txpool::ChainApi for FullChainApi<T, Block> where
Block: traits::Block<Hash=H256>,
T: traits::ProvideRuntimeApi + HeaderBackend<Block>,
T: traits::ProvideRuntimeApi + HeaderBackend<Block> + 'static,
T::Api: TaggedTransactionQueue<Block>
{
type Block = Block;
type Hash = H256;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type ValidationFuture = Pin<Box<dyn Future<Output=error::Result<TransactionValidity>> + Send>>;

fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
uxt: txpool::ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
futures::future::ready(self.client.runtime_api().validate_transaction(at, uxt).map_err(Into::into))
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let at = at.clone();

self.pool.spawn_ok(async move {
let res = client.runtime_api().validate_transaction(&at, uxt).map_err(Into::into);
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
});

Box::pin(async move {
match rx.await {
Ok(r) => r,
Err(e) => Err(client::error::Error::Msg(format!("{}", e)))?,
}
})
}

fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> error::Result<Option<txpool::NumberFor<Self>>> {
Expand Down

0 comments on commit e676a10

Please sign in to comment.