Skip to content

Commit

Permalink
[mempool] improve parking lot eviction (#14586)
Browse files Browse the repository at this point in the history
To better handle large transactions
  • Loading branch information
bchocho authored Sep 10, 2024
1 parent 0bff36a commit 847eed2
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 88 deletions.
21 changes: 19 additions & 2 deletions mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,11 @@ impl TransactionStore {
curr_sequence_number: u64,
) -> bool {
if self.is_full() && self.check_txn_ready(txn, curr_sequence_number) {
// try to free some space in Mempool from ParkingLot by evicting a non-ready txn
if let Some(txn_pointer) = self.parking_lot_index.get_poppable() {
let now = Instant::now();
// try to free some space in Mempool from ParkingLot by evicting non-ready txns
let mut evicted_txns = 0;
let mut evicted_bytes = 0;
while let Some(txn_pointer) = self.parking_lot_index.get_poppable() {
if let Some(txn) = self
.transactions
.get_mut(&txn_pointer.sender)
Expand All @@ -370,9 +373,23 @@ impl TransactionStore {
txn.sequence_info.transaction_sequence_number
))
);
evicted_bytes += txn.get_estimated_bytes() as u64;
evicted_txns += 1;
self.index_remove(&txn);
if !self.is_full() {
break;
}
} else {
error!("Transaction not found in mempool while evicting from parking lot");
break;
}
}
if evicted_txns > 0 {
counters::CORE_MEMPOOL_PARKING_LOT_EVICTED_COUNT.observe(evicted_txns as f64);
counters::CORE_MEMPOOL_PARKING_LOT_EVICTED_BYTES.observe(evicted_bytes as f64);
counters::CORE_MEMPOOL_PARKING_LOT_EVICTED_LATENCY
.observe(now.elapsed().as_secs_f64());
}
}
self.is_full()
}
Expand Down
31 changes: 29 additions & 2 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const RANKING_SCORE_BUCKETS: &[f64] = &[

const TXN_CONSENSUS_PULLED_BUCKETS: &[f64] = &[1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 25.0, 50.0, 100.0];

static TRANSACTION_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
static TXN_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
exponential_buckets(
/*start=*/ 1.5, /*factor=*/ 1.5, /*count=*/ 20,
)
Expand Down Expand Up @@ -316,6 +316,33 @@ pub static CORE_MEMPOOL_TXN_CONSENSUS_PULLED_BY_BUCKET: Lazy<HistogramVec> = Laz
.unwrap()
});

pub static CORE_MEMPOOL_PARKING_LOT_EVICTED_COUNT: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_parking_lot_evicted_count",
"Number of txns evicted from parking lot",
TXN_COUNT_BUCKETS.clone()
)
.unwrap()
});

pub static CORE_MEMPOOL_PARKING_LOT_EVICTED_BYTES: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_parking_lot_evicted_bytes",
"Bytes of txns evicted from parking lot",
exponential_buckets(/*start=*/ 500.0, /*factor=*/ 1.4, /*count=*/ 32).unwrap()
)
.unwrap()
});

pub static CORE_MEMPOOL_PARKING_LOT_EVICTED_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_parking_lot_evicted_latency",
"Latency of evicting for each transaction from parking lot",
MEMPOOL_LATENCY_BUCKETS.to_vec()
)
.unwrap()
});

/// Counter of pending network events to Mempool
pub static PENDING_MEMPOOL_NETWORK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand All @@ -333,7 +360,7 @@ static MEMPOOL_SERVICE_TXNS: Lazy<HistogramVec> = Lazy::new(|| {
"aptos_mempool_service_transactions",
"Number of transactions handled in one request/response between mempool and consensus/state sync",
&["type"],
TRANSACTION_COUNT_BUCKETS.clone()
TXN_COUNT_BUCKETS.clone()
)
.unwrap()
});
Expand Down
10 changes: 9 additions & 1 deletion mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,15 @@ fn validate_and_add_transactions<NetworkClient, TransactionValidator>(
.start_timer();
let validation_results = transactions
.par_iter()
.map(|t| smp.validator.read().validate_transaction(t.0.clone()))
.map(|t| {
let result = smp.validator.read().validate_transaction(t.0.clone());
// Pre-compute the hash and length if the transaction is valid, before locking mempool
if result.is_ok() {
t.0.committed_hash();
t.0.txn_bytes_len();
}
result
})
.collect::<Vec<_>>();
vm_validation_timer.stop_and_record();
{
Expand Down
72 changes: 67 additions & 5 deletions mempool/src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aptos_types::{
account_address::AccountAddress,
chain_id::ChainId,
mempool_status::MempoolStatusCode,
transaction::{RawTransaction, Script, SignedTransaction},
transaction::{RawTransaction, Script, SignedTransaction, TransactionArgument},
};
use once_cell::sync::Lazy;
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -45,21 +45,83 @@ static ACCOUNTS: Lazy<Vec<AccountAddress>> = Lazy::new(|| {
]
});

static SMALL_SCRIPT: Lazy<Script> = Lazy::new(|| Script::new(vec![], vec![], vec![]));

static LARGE_SCRIPT: Lazy<Script> = Lazy::new(|| {
let mut args = vec![];
for _ in 0..200 {
args.push(TransactionArgument::Address(AccountAddress::random()));
}
Script::new(vec![], vec![], args)
});

static HUGE_SCRIPT: Lazy<Script> = Lazy::new(|| {
let mut args = vec![];
for _ in 0..200_000 {
args.push(TransactionArgument::Address(AccountAddress::random()));
}
Script::new(vec![], vec![], args)
});

#[derive(Clone, Serialize, Deserialize)]
pub struct TestTransaction {
pub(crate) address: usize,
pub(crate) address: AccountAddress,
pub(crate) sequence_number: u64,
pub(crate) gas_price: u64,
pub(crate) account_seqno: u64,
pub(crate) script: Option<Script>,
}

impl TestTransaction {
pub(crate) const fn new(address: usize, sequence_number: u64, gas_price: u64) -> Self {
pub(crate) fn new(address: usize, sequence_number: u64, gas_price: u64) -> Self {
Self {
address: TestTransaction::get_address(address),
sequence_number,
gas_price,
account_seqno: 0,
script: None,
}
}

pub(crate) fn new_with_large_script(
address: usize,
sequence_number: u64,
gas_price: u64,
) -> Self {
Self {
address: TestTransaction::get_address(address),
sequence_number,
gas_price,
account_seqno: 0,
script: Some(LARGE_SCRIPT.clone()),
}
}

pub(crate) fn new_with_huge_script(
address: usize,
sequence_number: u64,
gas_price: u64,
) -> Self {
Self {
address: TestTransaction::get_address(address),
sequence_number,
gas_price,
account_seqno: 0,
script: Some(HUGE_SCRIPT.clone()),
}
}

pub(crate) fn new_with_address(
address: AccountAddress,
sequence_number: u64,
gas_price: u64,
) -> Self {
Self {
address,
sequence_number,
gas_price,
account_seqno: 0,
script: None,
}
}

Expand Down Expand Up @@ -87,9 +149,9 @@ impl TestTransaction {
exp_timestamp_secs: u64,
) -> SignedTransaction {
let raw_txn = RawTransaction::new_script(
TestTransaction::get_address(self.address),
self.address,
self.sequence_number,
Script::new(vec![], vec![], vec![]),
self.script.clone().unwrap_or(SMALL_SCRIPT.clone()),
max_gas_amount,
self.gas_price,
exp_timestamp_secs,
Expand Down
104 changes: 99 additions & 5 deletions mempool/src/tests/core_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use aptos_config::config::{MempoolConfig, NodeConfig};
use aptos_consensus_types::common::{TransactionInProgress, TransactionSummary};
use aptos_crypto::HashValue;
use aptos_types::{
mempool_status::MempoolStatusCode, transaction::SignedTransaction, vm_status::DiscardedVMStatus,
account_address::AccountAddress, mempool_status::MempoolStatusCode,
transaction::SignedTransaction, vm_status::DiscardedVMStatus,
};
use itertools::Itertools;
use maplit::btreemap;
Expand Down Expand Up @@ -756,7 +757,9 @@ fn test_capacity_bytes() {
let mut txns = vec![];
let last_txn;
loop {
let txn = new_test_mempool_transaction(address, seq_no);
let txn = signed_txn_to_mempool_transaction(
TestTransaction::new(address, seq_no, 1).make_signed_transaction(),
);
let txn_bytes = txn.get_estimated_bytes();

if size_bytes <= capacity_bytes {
Expand Down Expand Up @@ -811,10 +814,9 @@ fn test_capacity_bytes() {
}
}

fn new_test_mempool_transaction(address: usize, sequence_number: u64) -> MempoolTransaction {
let signed_txn = TestTransaction::new(address, sequence_number, 1).make_signed_transaction();
fn signed_txn_to_mempool_transaction(txn: SignedTransaction) -> MempoolTransaction {
MempoolTransaction::new(
signed_txn,
txn,
Duration::from_secs(1),
1,
TimelineState::NotReady,
Expand Down Expand Up @@ -851,6 +853,98 @@ fn test_parking_lot_eviction() {
assert!(add_txn(&mut pool, TestTransaction::new(0, 2, 1)).is_err());
}

#[test]
fn test_parking_lot_eviction_bytes() {
// Get the small transaction size
let small_txn_size =
signed_txn_to_mempool_transaction(TestTransaction::new(1, 1, 1).make_signed_transaction())
.get_estimated_bytes();

let mut config = NodeConfig::generate_random_config();
config.mempool.capacity = 100;
// Fit 2 small transactions + one additional transaction (by overflowing the capacity bytes)
config.mempool.capacity_bytes = 3 * small_txn_size + 1;
let mut pool = CoreMempool::new(&config);
// Add 2 small transactions to parking lot
for address in 0..2 {
add_txn(&mut pool, TestTransaction::new(address, 1, 1)).unwrap();
}
// Add one large transaction that will top off the capacity bytes
add_txn(&mut pool, TestTransaction::new_with_large_script(2, 1, 1)).unwrap();
// Mempool is full. Insert a small txn for other account.
add_txn(&mut pool, TestTransaction::new(3, 0, 1)).unwrap();
}

#[test]
fn test_parking_lot_eviction_benchmark() {
// Get the small transaction size
let small_txn_size =
signed_txn_to_mempool_transaction(TestTransaction::new(1, 1, 1).make_signed_transaction())
.get_estimated_bytes();
let huge_txn_size = signed_txn_to_mempool_transaction(
TestTransaction::new_with_huge_script(1, 1, 1).make_signed_transaction(),
)
.get_estimated_bytes();
let num_small_txns = (huge_txn_size / small_txn_size) * 2;

let mut config = NodeConfig::generate_random_config();
config.mempool.capacity_per_user = 200;
config.mempool.capacity = 4_000_000;
// ~5 MB
config.mempool.capacity_bytes = num_small_txns * small_txn_size + 1;
let mut pool = CoreMempool::new(&config);

// // Add one huge transaction that will evict all transactions from parking lot
// let huge_signed_txn = TestTransaction::new_with_huge_script(0, 1, 1).make_signed_transaction();
// // Pre-compute these values, as shared mempool would do
// huge_signed_txn.committed_hash();
// huge_signed_txn.txn_bytes_len();
//
// let now = Instant::now();
// add_signed_txn(&mut pool, huge_signed_txn.clone()).unwrap();
// // Flush the huge transaction
// add_txn(&mut pool, TestTransaction::new(1, 0, 1)).unwrap();

let accounts: Vec<_> = (0..num_small_txns)
.map(|_| AccountAddress::random())
.collect();
// Fill up parking lot to capacity
for account in accounts {
for seq_num in 1..2 {
add_txn(
&mut pool,
TestTransaction::new_with_address(account, seq_num, 1),
)
.unwrap();
}
}
// Add one huge transaction that will cause mempool to be (beyond) full
let huge_signed_txn = TestTransaction::new_with_huge_script(0, 0, 1).make_signed_transaction();
add_signed_txn(&mut pool, huge_signed_txn).unwrap();
assert_eq!(pool.get_parking_lot_size(), num_small_txns);

// Add one huge transaction that will evict many transactions from parking lot
let huge_signed_txn = TestTransaction::new_with_huge_script(1, 0, 1).make_signed_transaction();
// Pre-compute these values, as shared mempool would do
huge_signed_txn.committed_hash();
huge_signed_txn.txn_bytes_len();
let now = Instant::now();
add_signed_txn(&mut pool, huge_signed_txn).unwrap();
let time_to_evict_ms = now.elapsed().as_millis();

let has_remainder = huge_txn_size % small_txn_size != 0;
let num_expected_evicted = num_small_txns / 2 + has_remainder as usize;
assert_eq!(
pool.get_parking_lot_size(),
num_small_txns - num_expected_evicted
);
assert!(
time_to_evict_ms < 300,
"Parking lot eviction of {} should take less than 300 ms on a reasonable machine",
num_expected_evicted
);
}

#[test]
fn test_parking_lot_evict_only_for_ready_txn_insertion() {
let mut config = NodeConfig::generate_random_config();
Expand Down
Loading

0 comments on commit 847eed2

Please sign in to comment.