Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Optimize pending transactions filter #9026

Merged
merged 11 commits into from
Jul 4, 2018
50 changes: 44 additions & 6 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::time::{Instant, Duration};
use std::collections::{BTreeMap, HashSet, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashSet, HashMap};
use std::sync::Arc;

use ansi_term::Colour;
Expand Down Expand Up @@ -851,6 +851,37 @@ impl miner::MinerService for Miner {
self.transaction_queue.all_transactions()
}

fn pending_transactions_hashes<C>(&self, chain: &C) -> BTreeSet<H256> where
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be pending_transaction_hashes (no double plural s), see e.g. this.

(I am not a native speaker so take it with a grain of salt)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and we're having both usages. This probably should be unified to a single one in the future:

  • In eth_filter we have pending_transactions_hashes.
  • In ethcore's Block's views and encoded, we have transaction_hashes.

C: ChainInfo + Sync,
{
let chain_info = chain.chain_info();

let from_queue = || self.transaction_queue.pending_hashes(
|sender| self.nonce_cache.read().get(sender).cloned(),
);

let from_pending = || {
self.map_existing_pending_block(|sealing| {
sealing.transactions()
.iter()
.map(|signed| signed.hash())
.collect()
}, chain_info.best_block_number)
};

match self.options.pending_set {
PendingSet::AlwaysQueue => {
from_queue()
},
PendingSet::AlwaysSealing => {
from_pending().unwrap_or_default()
},
PendingSet::SealingOrElseQueue => {
from_pending().unwrap_or_else(from_queue)
},
}
}

fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
-> Vec<Arc<VerifiedTransaction>>
where
Expand Down Expand Up @@ -1065,13 +1096,18 @@ impl miner::MinerService for Miner {
// 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that
// are in those blocks

// Clear nonce cache
self.nonce_cache.write().clear();
let has_new_best_block = enacted.len() > 0;

if has_new_best_block {
// Clear nonce cache
self.nonce_cache.write().clear();
}

// First update gas limit in transaction queue and minimal gas price.
let gas_limit = *chain.best_block_header().gas_limit();
self.update_transaction_queue_limits(gas_limit);


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray newline?

// Then import all transactions...
let client = self.pool_client(chain);
{
Expand All @@ -1091,10 +1127,12 @@ impl miner::MinerService for Miner {
});
}

// ...and at the end remove the old ones
self.transaction_queue.cull(client);
if has_new_best_block {
// ...and at the end remove the old ones
self.transaction_queue.cull(client);
}

if enacted.len() > 0 || (imported.len() > 0 && self.options.reseal_on_uncle) {
if has_new_best_block || (imported.len() > 0 && self.options.reseal_on_uncle) {
// Reset `next_allowed_reseal` in case a block is imported.
// Even if min_period is high, we will always attempt to create
// new pending block.
Expand Down
8 changes: 7 additions & 1 deletion ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringPa
pub use ethcore_miner::pool::PendingOrdering;

use std::sync::Arc;
use std::collections::BTreeMap;
use std::collections::{BTreeSet, BTreeMap};

use bytes::Bytes;
use ethereum_types::{H256, U256, Address};
Expand Down Expand Up @@ -164,6 +164,12 @@ pub trait MinerService : Send + Sync {
fn next_nonce<C>(&self, chain: &C, address: &Address) -> U256
where C: Nonce + Sync;

/// Get a set of all pending transactions hashes.
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn pending_transactions_hashes<C>(&self, chain: &C) -> BTreeSet<H256> where
C: ChainInfo + Sync;

/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
///
/// Depending on the settings may look in transaction pool or only in pending block.
Expand Down
15 changes: 14 additions & 1 deletion miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::{cmp, fmt};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};

use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock;
Expand Down Expand Up @@ -233,6 +233,19 @@ impl TransactionQueue {
self.pool.read().unordered_pending(ready).collect()
}

/// Computes unordered set of pending hashes.
///
/// Since strict nonce-checking is not required, you may get some false positive future transactions as well.
pub fn pending_hashes<N>(
&self,
nonce: N,
) -> BTreeSet<H256> where
N: Fn(&Address) -> Option<U256>,
{
let ready = ready::OptionalState::new(nonce);
self.pool.read().unordered_pending(ready).map(|tx| tx.hash).collect()
}

/// Returns current pending transactions ordered by priority.
///
/// NOTE: This may return a cached version of pending transaction set.
Expand Down
32 changes: 32 additions & 0 deletions miner/src/pool/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,38 @@ impl txpool::Ready<VerifiedTransaction> for Condition {
}
}

pub struct OptionalState<C> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some docs here?

"Checks readiness of transactions by comparing the nonce to state nonce. If nonce isn't found in provided state nonce store, defaults to the tx nonce and updates the nonce store. Useful for using with a state nonce cache when false positives are allowed." ?

nonces: HashMap<Address, U256>,
state: C,
}

impl<C> OptionalState<C> {
pub fn new(state: C) -> Self {
OptionalState {
nonces: Default::default(),
state,
}
}
}

impl<C: Fn(&Address) -> Option<U256>> txpool::Ready<VerifiedTransaction> for OptionalState<C> {
fn is_ready(&mut self, tx: &VerifiedTransaction) -> txpool::Readiness {
let sender = tx.sender();
let state = &self.state;
let nonce = self.nonces.entry(*sender).or_insert_with(|| {
state(sender).unwrap_or_else(|| tx.transaction.nonce)
});
match tx.transaction.nonce.cmp(nonce) {
cmp::Ordering::Greater => txpool::Readiness::Future,
cmp::Ordering::Less => txpool::Readiness::Stale,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
},
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod subscription_manager;
pub use self::dispatch::{Dispatcher, FullDispatcher};
pub use self::network_settings::NetworkSettings;
pub use self::poll_manager::PollManager;
pub use self::poll_filter::{PollFilter, limit_logs};
pub use self::poll_filter::{PollFilter, SyncPollFilter, limit_logs};
pub use self::requests::{
TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest,
};
Expand Down
26 changes: 24 additions & 2 deletions rpc/src/v1/helpers/poll_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,41 @@

//! Helper type with all filter state data.

use std::collections::HashSet;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
};
use ethereum_types::H256;
use parking_lot::Mutex;
use v1::types::{Filter, Log};

pub type BlockNumber = u64;

/// Thread-safe filter state.
#[derive(Clone)]
pub struct SyncPollFilter(Arc<Mutex<PollFilter>>);

impl SyncPollFilter {
/// New `SyncPollFilter`
pub fn new(f: PollFilter) -> Self {
SyncPollFilter(Arc::new(Mutex::new(f)))
}

/// Modify underlying filter
pub fn modify<F, R>(&self, f: F) -> R where
F: FnOnce(&mut PollFilter) -> R,
{
f(&mut self.0.lock())
}
}

/// Filter state.
#[derive(Clone)]
pub enum PollFilter {
/// Number of last block which client was notified about.
Block(BlockNumber),
/// Hashes of all transactions which client was notified about.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe /// Hashes of all pending transaction the client knows about?

PendingTransaction(Vec<H256>),
PendingTransaction(BTreeSet<H256>),
/// Number of From block number, last seen block hash, pending logs and log filter itself.
Logs(BlockNumber, Option<H256>, HashSet<Log>, Filter)
}
Expand Down
4 changes: 1 addition & 3 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,9 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
}

fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>> {
let block_number = self.client.chain_info().best_block_number;

Box::new(future::ok(match num {
BlockNumber::Pending =>
self.miner.pending_transactions(block_number).map(|x| x.len().into()),
Some(self.miner.pending_transactions_hashes(&*self.client).len().into()),
_ =>
self.client.block(block_number_to_id(num)).map(|block| block.transactions_count().into())
}))
Expand Down
Loading