Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Transaction Pool improvements #8470

Merged
merged 5 commits into from
May 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions miner/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ impl VerifiedTransaction {
self.priority
}

/// Gets transaction insertion id.
pub(crate) fn insertion_id(&self) -> usize {
self.insertion_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this insertion_id used? It seems we wrap this type in txpool::Transaction and use the outer insertion_id instead.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So pool manages it's internal insertion_id (it's required for uniqueness).

This insertion_id here is used for detectingFuture transactions that occupy the pool for a long time, and never get included: https://github.com/paritytech/parity/blob/4900e4da6b7540ba2f172874e723e926adbc8c77/miner/src/pool/ready.rs#L93
We wait for half of the pool to be replaced, but never less than 100 transactions:
https://github.com/paritytech/parity/blob/4900e4da6b7540ba2f172874e723e926adbc8c77/miner/src/pool/queue.rs#L289

We can have any other retention mechanism that is not tied to ids, but I didn't want to change this in that particular PR (time based or block number based for instance).

}

/// Gets wrapped `SignedTransaction`
pub fn signed(&self) -> &transaction::SignedTransaction {
&self.transaction
Expand All @@ -114,9 +119,13 @@ impl VerifiedTransaction {
pub fn pending(&self) -> &transaction::PendingTransaction {
&self.transaction
}

}

impl txpool::VerifiedTransaction for VerifiedTransaction {
type Hash = H256;
type Sender = Address;

fn hash(&self) -> &H256 {
&self.hash
}
Expand All @@ -128,8 +137,4 @@ impl txpool::VerifiedTransaction for VerifiedTransaction {
fn sender(&self) -> &Address {
&self.sender
}

fn insertion_id(&self) -> u64 {
self.insertion_id as u64
}
}
4 changes: 2 additions & 2 deletions miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ impl TransactionQueue {
// We want to clear stale transactions from the queue as well.
// (Transactions that are occuping the queue for a long time without being included)
let stale_id = {
let current_id = self.insertion_id.load(atomic::Ordering::Relaxed) as u64;
let current_id = self.insertion_id.load(atomic::Ordering::Relaxed);
// wait at least for half of the queue to be replaced
let gap = self.pool.read().options().max_count / 2;
// but never less than 100 transactions
let gap = cmp::max(100, gap) as u64;
let gap = cmp::max(100, gap);

current_id.checked_sub(gap)
};
Expand Down
12 changes: 6 additions & 6 deletions miner/src/pool/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ pub struct State<C> {
nonces: HashMap<Address, U256>,
state: C,
max_nonce: Option<U256>,
stale_id: Option<u64>,
stale_id: Option<usize>,
}

impl<C> State<C> {
/// Create new State checker, given client interface.
pub fn new(
state: C,
stale_id: Option<u64>,
stale_id: Option<usize>,
max_nonce: Option<U256>,
) -> Self {
State {
Expand Down Expand Up @@ -91,10 +91,10 @@ impl<C: NonceClient> txpool::Ready<VerifiedTransaction> for State<C> {
match tx.transaction.nonce.cmp(nonce) {
// Before marking as future check for stale ids
cmp::Ordering::Greater => match self.stale_id {
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stalled,
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stale,
_ => txpool::Readiness::Future,
},
cmp::Ordering::Less => txpool::Readiness::Stalled,
cmp::Ordering::Less => txpool::Readiness::Stale,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
Expand Down Expand Up @@ -178,7 +178,7 @@ mod tests {
let res = State::new(TestClient::new().with_nonce(125), None, None).is_ready(&tx);

// then
assert_eq!(res, txpool::Readiness::Stalled);
assert_eq!(res, txpool::Readiness::Stale);
}

#[test]
Expand All @@ -190,7 +190,7 @@ mod tests {
let res = State::new(TestClient::new(), Some(1), None).is_ready(&tx);

// then
assert_eq!(res, txpool::Readiness::Stalled);
assert_eq!(res, txpool::Readiness::Stale);
}

#[test]
Expand Down
11 changes: 7 additions & 4 deletions miner/src/pool/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
//! from our local node (own transactions).

use std::cmp;
use std::sync::Arc;

use ethereum_types::U256;
use txpool;
Expand Down Expand Up @@ -69,7 +68,7 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
}
}

fn update_scores(&self, txs: &[Arc<VerifiedTransaction>], scores: &mut [U256], change: txpool::scoring::Change) {
fn update_scores(&self, txs: &[txpool::Transaction<VerifiedTransaction>], scores: &mut [U256], change: txpool::scoring::Change) {
use self::txpool::scoring::Change;

match change {
Expand All @@ -79,7 +78,7 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
assert!(i < txs.len());
assert!(i < scores.len());

scores[i] = txs[i].transaction.gas_price;
scores[i] = txs[i].transaction.transaction.gas_price;
let boost = match txs[i].priority() {
super::Priority::Local => 15,
super::Priority::Retracted => 10,
Expand Down Expand Up @@ -116,6 +115,7 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
mod tests {
use super::*;

use std::sync::Arc;
use pool::tests::tx::{Tx, TxExt};
use txpool::Scoring;

Expand All @@ -131,7 +131,10 @@ mod tests {
1 => ::pool::Priority::Retracted,
_ => ::pool::Priority::Regular,
};
Arc::new(verified)
txpool::Transaction {
insertion_id: 0,
transaction: Arc::new(verified),
}
}).collect::<Vec<_>>();
let initial_scores = vec![U256::from(0), 0.into(), 0.into()];

Expand Down
2 changes: 2 additions & 0 deletions transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ error-chain = "0.11"
log = "0.3"
smallvec = "0.4"
trace-time = { path = "../util/trace-time" }

[dev-dependencies]
ethereum-types = "0.3"
16 changes: 9 additions & 7 deletions transaction-pool/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use ethereum_types::H256;
/// Error chain doesn't let us have generic types.
/// So the hashes are converted to debug strings for easy display.
type Hash = String;

error_chain! {
errors {
/// Transaction is already imported
AlreadyImported(hash: H256) {
AlreadyImported(hash: Hash) {
description("transaction is already in the pool"),
display("[{:?}] already imported", hash)
display("[{}] already imported", hash)
}
/// Transaction is too cheap to enter the queue
TooCheapToEnter(hash: H256, min_score: String) {
TooCheapToEnter(hash: Hash, min_score: String) {
description("the pool is full and transaction is too cheap to replace any transaction"),
display("[{:?}] too cheap to enter the pool. Min score: {}", hash, min_score)
display("[{}] too cheap to enter the pool. Min score: {}", hash, min_score)
}
/// Transaction is too cheap to replace existing transaction that occupies the same slot.
TooCheapToReplace(old_hash: H256, hash: H256) {
TooCheapToReplace(old_hash: Hash, hash: Hash) {
description("transaction is too cheap to replace existing transaction in the pool"),
display("[{:?}] too cheap to replace: {:?}", hash, old_hash)
display("[{}] too cheap to replace: {}", hash, old_hash)
}
}
}
Expand Down
23 changes: 13 additions & 10 deletions transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@
#![warn(missing_docs)]

extern crate smallvec;
extern crate ethereum_types;
extern crate trace_time;

#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;

extern crate trace_time;
#[cfg(test)]
extern crate ethereum_types;

#[cfg(test)]
mod tests;
Expand All @@ -95,27 +96,29 @@ pub mod scoring;
pub use self::error::{Error, ErrorKind};
pub use self::listener::{Listener, NoopListener};
pub use self::options::Options;
pub use self::pool::{Pool, PendingIterator};
pub use self::pool::{Pool, PendingIterator, Transaction};
pub use self::ready::{Ready, Readiness};
pub use self::scoring::Scoring;
pub use self::status::{LightStatus, Status};
pub use self::verifier::Verifier;

use std::fmt;

use ethereum_types::{H256, Address};
use std::hash::Hash;

/// Already verified transaction that can be safely queued.
pub trait VerifiedTransaction: fmt::Debug {
/// Transaction hash type.
type Hash: fmt::Debug + fmt::LowerHex + Eq + Clone + Hash;

/// Transaction sender type.
type Sender: fmt::Debug + Eq + Clone + Hash;

/// Transaction hash
fn hash(&self) -> &H256;
fn hash(&self) -> &Self::Hash;

/// Memory usage
fn mem_usage(&self) -> usize;

/// Transaction sender
fn sender(&self) -> &Address;

/// Unique index of insertion (lower = older).
fn insertion_id(&self) -> u64;
fn sender(&self) -> &Self::Sender;
}
Loading