diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 3a77efe859d..c375e91dce5 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::time::{Instant, Duration}; -use std::collections::{BTreeMap, HashSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashSet, HashMap}; use std::sync::Arc; use ansi_term::Colour; @@ -838,7 +838,40 @@ impl miner::MinerService for Miner { self.transaction_queue.all_transactions() } - fn ready_transactions(&self, chain: &C) -> Vec> where + fn pending_transaction_hashes(&self, chain: &C) -> BTreeSet where + C: ChainInfo + Sync, + { + let chain_info = chain.chain_info(); + + let from_queue = || self.transaction_queue.pending_hashes( + |sender| self.nonce_cache.read().get(sender).cloned(), + ); + + let from_pending = || { + self.map_existing_pending_block(|sealing| { + sealing.transactions() + .iter() + .map(|signed| signed.hash()) + .collect() + }, chain_info.best_block_number) + }; + + match self.options.pending_set { + PendingSet::AlwaysQueue => { + from_queue() + }, + PendingSet::AlwaysSealing => { + from_pending().unwrap_or_default() + }, + PendingSet::SealingOrElseQueue => { + from_pending().unwrap_or_else(from_queue) + }, + } + } + + fn ready_transactions(&self, chain: &C) + -> Vec> + where C: ChainInfo + Nonce + Sync, { let chain_info = chain.chain_info(); @@ -1043,8 +1076,12 @@ impl miner::MinerService for Miner { // 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that // are in those blocks - // Clear nonce cache - self.nonce_cache.write().clear(); + let has_new_best_block = enacted.len() > 0; + + if has_new_best_block { + // Clear nonce cache + self.nonce_cache.write().clear(); + } // First update gas limit in transaction queue and minimal gas price. let gas_limit = *chain.best_block_header().gas_limit(); @@ -1069,10 +1106,12 @@ impl miner::MinerService for Miner { }); } - // ...and at the end remove the old ones - self.transaction_queue.cull(client); + if has_new_best_block { + // ...and at the end remove the old ones + self.transaction_queue.cull(client); + } - if enacted.len() > 0 || (imported.len() > 0 && self.options.reseal_on_uncle) { + if has_new_best_block || (imported.len() > 0 && self.options.reseal_on_uncle) { // Reset `next_allowed_reseal` in case a block is imported. // Even if min_period is high, we will always attempt to create // new pending block. diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index fc18e5d47d1..340285b9b3f 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -28,7 +28,7 @@ pub mod stratum; pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams}; use std::sync::Arc; -use std::collections::BTreeMap; +use std::collections::{BTreeSet, BTreeMap}; use bytes::Bytes; use ethereum_types::{H256, U256, Address}; @@ -164,7 +164,13 @@ pub trait MinerService : Send + Sync { fn next_nonce(&self, chain: &C, address: &Address) -> U256 where C: Nonce + Sync; - /// Get a list of all ready transactions. + /// Get a set of all pending transaction hashes. + /// + /// Depending on the settings may look in transaction pool or only in pending block. + fn pending_transaction_hashes(&self, chain: &C) -> BTreeSet where + C: ChainInfo + Sync; + + /// Get a list of all ready transactions either ordered by priority or unordered (cheaper). /// /// Depending on the settings may look in transaction pool or only in pending block. fn ready_transactions(&self, chain: &C) -> Vec> diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 96f3e4ef763..bc8abbf2253 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -19,7 +19,7 @@ use std::{cmp, fmt}; use std::sync::Arc; use std::sync::atomic::{self, AtomicUsize}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use ethereum_types::{H256, U256, Address}; use parking_lot::RwLock; @@ -285,7 +285,20 @@ impl TransactionQueue { self.pool.read().pending(ready).collect() } - /// Returns current pneding transactions. + /// Computes unordered set of pending hashes. + /// + /// Since strict nonce-checking is not required, you may get some false positive future transactions as well. + pub fn pending_hashes( + &self, + nonce: N, + ) -> BTreeSet where + N: Fn(&Address) -> Option, + { + let ready = ready::OptionalState::new(nonce); + self.pool.read().pending(ready).map(|tx| tx.hash).collect() + } + + /// Returns current pending transactions ordered by priority. /// /// NOTE: This may return a cached version of pending transaction set. /// Re-computing the pending set is possible with `#collect_pending` method, diff --git a/miner/src/pool/ready.rs b/miner/src/pool/ready.rs index c2829b34a9a..a8e4dd6f8ca 100644 --- a/miner/src/pool/ready.rs +++ b/miner/src/pool/ready.rs @@ -130,6 +130,43 @@ impl txpool::Ready for Condition { } } +/// Readiness checker that only relies on nonce cache (does actually go to state). +/// +/// Checks readiness of transactions by comparing the nonce to state nonce. If nonce +/// isn't found in provided state nonce store, defaults to the tx nonce and updates +/// the nonce store. Useful for using with a state nonce cache when false positives are allowed. +pub struct OptionalState { + nonces: HashMap, + state: C, +} + +impl OptionalState { + pub fn new(state: C) -> Self { + OptionalState { + nonces: Default::default(), + state, + } + } +} + +impl Option> txpool::Ready for OptionalState { + fn is_ready(&mut self, tx: &VerifiedTransaction) -> txpool::Readiness { + let sender = tx.sender(); + let state = &self.state; + let nonce = self.nonces.entry(*sender).or_insert_with(|| { + state(sender).unwrap_or_else(|| tx.transaction.nonce) + }); + match tx.transaction.nonce.cmp(nonce) { + cmp::Ordering::Greater => txpool::Readiness::Future, + cmp::Ordering::Less => txpool::Readiness::Stale, + cmp::Ordering::Equal => { + *nonce = *nonce + 1.into(); + txpool::Readiness::Ready + }, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index c8d9e414b69..8672151739e 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -40,7 +40,7 @@ mod subscription_manager; pub use self::dispatch::{Dispatcher, FullDispatcher}; pub use self::network_settings::NetworkSettings; pub use self::poll_manager::PollManager; -pub use self::poll_filter::{PollFilter, limit_logs}; +pub use self::poll_filter::{PollFilter, SyncPollFilter, limit_logs}; pub use self::requests::{ TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest, }; diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs index 61437078399..4fdd6100e73 100644 --- a/rpc/src/v1/helpers/poll_filter.rs +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -1,18 +1,40 @@ //! Helper type with all filter state data. -use std::collections::HashSet; +use std::{ + collections::{BTreeSet, HashSet}, + sync::Arc, +}; use ethereum_types::H256; +use parking_lot::Mutex; use v1::types::{Filter, Log}; pub type BlockNumber = u64; +/// Thread-safe filter state. +#[derive(Clone)] +pub struct SyncPollFilter(Arc>); + +impl SyncPollFilter { + /// New `SyncPollFilter` + pub fn new(f: PollFilter) -> Self { + SyncPollFilter(Arc::new(Mutex::new(f))) + } + + /// Modify underlying filter + pub fn modify(&self, f: F) -> R where + F: FnOnce(&mut PollFilter) -> R, + { + f(&mut self.0.lock()) + } +} + /// Filter state. #[derive(Clone)] pub enum PollFilter { /// Number of last block which client was notified about. Block(BlockNumber), - /// Hashes of all transactions which client was notified about. - PendingTransaction(Vec), + /// Hashes of all pending transactions the client knows about. + PendingTransaction(BTreeSet), /// Number of From block number, last seen block hash, pending logs and log filter itself. Logs(BlockNumber, Option, HashSet, Filter) } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index d6434bba84f..975c944c7dd 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -608,11 +608,9 @@ impl Eth for EthClient< } fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture> { - let block_number = self.client.chain_info().best_block_number; - Box::new(future::ok(match num { BlockNumber::Pending => - self.miner.pending_transactions(block_number).map(|x| x.len().into()), + Some(self.miner.pending_transaction_hashes(&*self.client).len().into()), _ => self.client.block(block_number_to_id(num)).map(|block| block.transactions_count().into()) })) diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index 1e0f294f036..984914d8b38 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -17,7 +17,7 @@ //! Eth Filter RPC implementation use std::sync::Arc; -use std::collections::HashSet; +use std::collections::BTreeSet; use ethcore::miner::{self, MinerService}; use ethcore::filter::Filter as EthcoreFilter; @@ -30,7 +30,7 @@ use jsonrpc_core::futures::{future, Future}; use jsonrpc_core::futures::future::Either; use v1::traits::EthFilter; use v1::types::{BlockNumber, Index, Filter, FilterChanges, Log, H256 as RpcH256, U256 as RpcU256}; -use v1::helpers::{errors, PollFilter, PollManager, limit_logs}; +use v1::helpers::{errors, SyncPollFilter, PollFilter, PollManager, limit_logs}; use v1::impls::eth::pending_logs; /// Something which provides data that can be filtered over. @@ -41,8 +41,8 @@ pub trait Filterable { /// Get a block hash by block id. fn block_hash(&self, id: BlockId) -> Option; - /// pending transaction hashes at the given block. - fn pending_transactions_hashes(&self) -> Vec; + /// pending transaction hashes at the given block (unordered). + fn pending_transaction_hashes(&self) -> BTreeSet; /// Get logs that match the given filter. fn logs(&self, filter: EthcoreFilter) -> BoxFuture>; @@ -51,7 +51,7 @@ pub trait Filterable { fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec; /// Get a reference to the poll manager. - fn polls(&self) -> &Mutex>; + fn polls(&self) -> &Mutex>; /// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed. fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec, u64); @@ -61,7 +61,7 @@ pub trait Filterable { pub struct EthFilterClient { client: Arc, miner: Arc, - polls: Mutex>, + polls: Mutex>, } impl EthFilterClient { @@ -87,11 +87,8 @@ impl Filterable for EthFilterClient where self.client.block_hash(id) } - fn pending_transactions_hashes(&self) -> Vec { - self.miner.ready_transactions(&*self.client) - .into_iter() - .map(|tx| tx.signed().hash()) - .collect() + fn pending_transaction_hashes(&self) -> BTreeSet { + self.miner.pending_transaction_hashes(&*self.client) } fn logs(&self, filter: EthcoreFilter) -> BoxFuture> { @@ -102,7 +99,7 @@ impl Filterable for EthFilterClient where pending_logs(&*self.miner, block_number, filter) } - fn polls(&self) -> &Mutex> { &self.polls } + fn polls(&self) -> &Mutex> { &self.polls } fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec, u64) { let inner = || -> Option> { @@ -145,127 +142,124 @@ impl EthFilter for T { fn new_filter(&self, filter: Filter) -> Result { let mut polls = self.polls().lock(); let block_number = self.best_block_number(); - let id = polls.create_poll(PollFilter::Logs(block_number, None, Default::default(), filter)); + let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs(block_number, None, Default::default(), filter))); Ok(id.into()) } fn new_block_filter(&self) -> Result { let mut polls = self.polls().lock(); // +1, since we don't want to include the current block - let id = polls.create_poll(PollFilter::Block(self.best_block_number() + 1)); + let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block(self.best_block_number() + 1))); Ok(id.into()) } fn new_pending_transaction_filter(&self) -> Result { let mut polls = self.polls().lock(); - let pending_transactions = self.pending_transactions_hashes(); - let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); + let pending_transactions = self.pending_transaction_hashes(); + let id = polls.create_poll(SyncPollFilter::new(PollFilter::PendingTransaction(pending_transactions))); Ok(id.into()) } fn filter_changes(&self, index: Index) -> BoxFuture { - let mut polls = self.polls().lock(); - Box::new(match polls.poll_mut(&index.value()) { - None => Either::A(future::err(errors::filter_not_found())), - Some(filter) => match *filter { - PollFilter::Block(ref mut block_number) => { - // +1, cause we want to return hashes including current block hash. - let current_number = self.best_block_number() + 1; - let hashes = (*block_number..current_number).into_iter() - .map(BlockId::Number) - .filter_map(|id| self.block_hash(id).map(Into::into)) - .collect::>(); - - *block_number = current_number; - - Either::A(future::ok(FilterChanges::Hashes(hashes))) - }, - PollFilter::PendingTransaction(ref mut previous_hashes) => { - // get hashes of pending transactions - let current_hashes = self.pending_transactions_hashes(); - - let new_hashes = - { - let previous_hashes_set = previous_hashes.iter().collect::>(); - - // find all new hashes - current_hashes - .iter() - .filter(|hash| !previous_hashes_set.contains(hash)) - .cloned() - .map(Into::into) - .collect::>() - }; - - // save all hashes of pending transactions - *previous_hashes = current_hashes; - - // return new hashes - Either::A(future::ok(FilterChanges::Hashes(new_hashes))) - }, - PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => { - // retrive the current block number - let current_number = self.best_block_number(); - - // check if we need to check pending hashes - let include_pending = filter.to_block == Some(BlockNumber::Pending); - - // build appropriate filter - let mut filter: EthcoreFilter = filter.clone().into(); - - // retrieve reorg logs - let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter)); - *block_number -= reorg_len as u64; - - filter.from_block = BlockId::Number(*block_number); - filter.to_block = BlockId::Latest; - - // retrieve pending logs - let pending = if include_pending { - let pending_logs = self.pending_logs(current_number, &filter); - - // remove logs about which client was already notified about - let new_pending_logs: Vec<_> = pending_logs.iter() - .filter(|p| !previous_logs.contains(p)) - .cloned() - .collect(); - - // save all logs retrieved by client - *previous_logs = pending_logs.into_iter().collect(); - - new_pending_logs - } else { - Vec::new() - }; - - // save the number of the next block as a first block from which - // we want to get logs - *block_number = current_number + 1; - - // save the current block hash, which we used to get back to the - // canon chain in case of reorg. - *last_block_hash = self.block_hash(BlockId::Number(current_number)); - - // retrieve logs in range from_block..min(BlockId::Latest..to_block) - let limit = filter.limit; - Either::B(self.logs(filter) - .map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front - .map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs - .map(move |logs| limit_logs(logs, limit)) // limit the logs - .map(FilterChanges::Logs)) - } + let filter = match self.polls().lock().poll_mut(&index.value()) { + Some(filter) => filter.clone(), + None => return Box::new(future::err(errors::filter_not_found())), + }; + + Box::new(filter.modify(|filter| match *filter { + PollFilter::Block(ref mut block_number) => { + // +1, cause we want to return hashes including current block hash. + let current_number = self.best_block_number() + 1; + let hashes = (*block_number..current_number).into_iter() + .map(BlockId::Number) + .filter_map(|id| self.block_hash(id).map(Into::into)) + .collect::>(); + + *block_number = current_number; + + Either::A(future::ok(FilterChanges::Hashes(hashes))) + }, + PollFilter::PendingTransaction(ref mut previous_hashes) => { + // get hashes of pending transactions + let current_hashes = self.pending_transaction_hashes(); + + let new_hashes = { + // find all new hashes + current_hashes.difference(previous_hashes) + .cloned() + .map(Into::into) + .collect() + }; + + // save all hashes of pending transactions + *previous_hashes = current_hashes; + + // return new hashes + Either::A(future::ok(FilterChanges::Hashes(new_hashes))) + }, + PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => { + // retrive the current block number + let current_number = self.best_block_number(); + + // check if we need to check pending hashes + let include_pending = filter.to_block == Some(BlockNumber::Pending); + + // build appropriate filter + let mut filter: EthcoreFilter = filter.clone().into(); + + // retrieve reorg logs + let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter)); + *block_number -= reorg_len as u64; + + filter.from_block = BlockId::Number(*block_number); + filter.to_block = BlockId::Latest; + + // retrieve pending logs + let pending = if include_pending { + let pending_logs = self.pending_logs(current_number, &filter); + + // remove logs about which client was already notified about + let new_pending_logs: Vec<_> = pending_logs.iter() + .filter(|p| !previous_logs.contains(p)) + .cloned() + .collect(); + + // save all logs retrieved by client + *previous_logs = pending_logs.into_iter().collect(); + + new_pending_logs + } else { + Vec::new() + }; + + // save the number of the next block as a first block from which + // we want to get logs + *block_number = current_number + 1; + + // save the current block hash, which we used to get back to the + // canon chain in case of reorg. + *last_block_hash = self.block_hash(BlockId::Number(current_number)); + + // retrieve logs in range from_block..min(BlockId::Latest..to_block) + let limit = filter.limit; + Either::B(self.logs(filter) + .map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front + .map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs + .map(move |logs| limit_logs(logs, limit)) // limit the logs + .map(FilterChanges::Logs)) } - }) + })) } fn filter_logs(&self, index: Index) -> BoxFuture> { let filter = { let mut polls = self.polls().lock(); - match polls.poll(&index.value()) { - Some(&PollFilter::Logs(ref _block_number, ref _last_block_hash, ref _previous_log, ref filter)) => filter.clone(), - // just empty array - Some(_) => return Box::new(future::ok(Vec::new())), + match polls.poll(&index.value()).and_then(|f| f.modify(|filter| match *filter { + PollFilter::Logs(.., ref filter) => Some(filter.clone()), + _ => None, + })) { + Some(filter) => filter, None => return Box::new(future::err(errors::filter_not_found())), } }; diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index ba03c37d9c6..e1a558fc09b 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -16,6 +16,7 @@ //! Eth RPC interface for the light client. +use std::collections::BTreeSet; use std::sync::Arc; use jsonrpc_core::{Result, BoxFuture}; @@ -41,7 +42,7 @@ use transaction::SignedTransaction; use v1::impls::eth_filter::Filterable; use v1::helpers::{errors, limit_logs}; -use v1::helpers::{PollFilter, PollManager}; +use v1::helpers::{SyncPollFilter, PollManager}; use v1::helpers::light_fetch::{self, LightFetch}; use v1::traits::Eth; use v1::types::{ @@ -61,7 +62,7 @@ pub struct EthClient { transaction_queue: Arc>, accounts: Arc, cache: Arc>, - polls: Mutex>, + polls: Mutex>, gas_price_percentile: usize, } @@ -533,8 +534,8 @@ impl Filterable for EthClient { self.client.block_hash(id) } - fn pending_transactions_hashes(&self) -> Vec<::ethereum_types::H256> { - Vec::new() + fn pending_transaction_hashes(&self) -> BTreeSet<::ethereum_types::H256> { + BTreeSet::new() } fn logs(&self, filter: EthcoreFilter) -> BoxFuture> { @@ -545,7 +546,7 @@ impl Filterable for EthClient { Vec::new() // light clients don't mine. } - fn polls(&self) -> &Mutex> { + fn polls(&self) -> &Mutex> { &self.polls } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 3e60a56041e..c53bf808541 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -17,7 +17,7 @@ //! Test implementation of miner service. use std::sync::Arc; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use bytes::Bytes; use ethcore::account_provider::SignError as AccountError; @@ -219,6 +219,10 @@ impl MinerService for TestMinerService { self.queued_transactions() } + fn pending_transaction_hashes(&self, _chain: &C) -> BTreeSet { + self.queued_transactions().into_iter().map(|tx| tx.signed().hash()).collect() + } + fn queued_transactions(&self) -> Vec> { self.pending_transactions.lock().values().cloned().map(|tx| { Arc::new(VerifiedTransaction::from_pending_block_transaction(tx))