Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cp][aptos-release-v1.19] [mempool] improve parking lot eviction #14589

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,11 @@ impl TransactionStore {
curr_sequence_number: u64,
) -> bool {
if self.is_full() && self.check_txn_ready(txn, curr_sequence_number) {
// try to free some space in Mempool from ParkingLot by evicting a non-ready txn
if let Some(txn_pointer) = self.parking_lot_index.get_poppable() {
let now = Instant::now();
// try to free some space in Mempool from ParkingLot by evicting non-ready txns
let mut evicted_txns = 0;
let mut evicted_bytes = 0;
while let Some(txn_pointer) = self.parking_lot_index.get_poppable() {
if let Some(txn) = self
.transactions
.get_mut(&txn_pointer.sender)
Expand All @@ -370,9 +373,23 @@ impl TransactionStore {
txn.sequence_info.transaction_sequence_number
))
);
evicted_bytes += txn.get_estimated_bytes() as u64;
evicted_txns += 1;
self.index_remove(&txn);
if !self.is_full() {
break;
}
} else {
error!("Transaction not found in mempool while evicting from parking lot");
break;
}
}
if evicted_txns > 0 {
counters::CORE_MEMPOOL_PARKING_LOT_EVICTED_COUNT.observe(evicted_txns as f64);
counters::CORE_MEMPOOL_PARKING_LOT_EVICTED_BYTES.observe(evicted_bytes as f64);
counters::CORE_MEMPOOL_PARKING_LOT_EVICTED_LATENCY
.observe(now.elapsed().as_secs_f64());
}
}
self.is_full()
}
Expand Down
31 changes: 29 additions & 2 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const RANKING_SCORE_BUCKETS: &[f64] = &[

const TXN_CONSENSUS_PULLED_BUCKETS: &[f64] = &[1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 25.0, 50.0, 100.0];

static TRANSACTION_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
static TXN_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
exponential_buckets(
/*start=*/ 1.5, /*factor=*/ 1.5, /*count=*/ 20,
)
Expand Down Expand Up @@ -316,6 +316,33 @@ pub static CORE_MEMPOOL_TXN_CONSENSUS_PULLED_BY_BUCKET: Lazy<HistogramVec> = Laz
.unwrap()
});

pub static CORE_MEMPOOL_PARKING_LOT_EVICTED_COUNT: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_parking_lot_evicted_count",
"Number of txns evicted from parking lot",
TXN_COUNT_BUCKETS.clone()
)
.unwrap()
});

pub static CORE_MEMPOOL_PARKING_LOT_EVICTED_BYTES: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_parking_lot_evicted_bytes",
"Bytes of txns evicted from parking lot",
exponential_buckets(/*start=*/ 500.0, /*factor=*/ 1.4, /*count=*/ 32).unwrap()
)
.unwrap()
});

pub static CORE_MEMPOOL_PARKING_LOT_EVICTED_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_parking_lot_evicted_latency",
"Latency of evicting for each transaction from parking lot",
MEMPOOL_LATENCY_BUCKETS.to_vec()
)
.unwrap()
});

/// Counter of pending network events to Mempool
pub static PENDING_MEMPOOL_NETWORK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand All @@ -333,7 +360,7 @@ static MEMPOOL_SERVICE_TXNS: Lazy<HistogramVec> = Lazy::new(|| {
"aptos_mempool_service_transactions",
"Number of transactions handled in one request/response between mempool and consensus/state sync",
&["type"],
TRANSACTION_COUNT_BUCKETS.clone()
TXN_COUNT_BUCKETS.clone()
)
.unwrap()
});
Expand Down
10 changes: 9 additions & 1 deletion mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,15 @@ fn validate_and_add_transactions<NetworkClient, TransactionValidator>(
.start_timer();
let validation_results = transactions
.par_iter()
.map(|t| smp.validator.read().validate_transaction(t.0.clone()))
.map(|t| {
let result = smp.validator.read().validate_transaction(t.0.clone());
// Pre-compute the hash and length if the transaction is valid, before locking mempool
if result.is_ok() {
t.0.committed_hash();
t.0.txn_bytes_len();
}
result
})
.collect::<Vec<_>>();
vm_validation_timer.stop_and_record();
{
Expand Down
72 changes: 67 additions & 5 deletions mempool/src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aptos_types::{
account_address::AccountAddress,
chain_id::ChainId,
mempool_status::MempoolStatusCode,
transaction::{RawTransaction, Script, SignedTransaction},
transaction::{RawTransaction, Script, SignedTransaction, TransactionArgument},
};
use once_cell::sync::Lazy;
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -45,21 +45,83 @@ static ACCOUNTS: Lazy<Vec<AccountAddress>> = Lazy::new(|| {
]
});

static SMALL_SCRIPT: Lazy<Script> = Lazy::new(|| Script::new(vec![], vec![], vec![]));

static LARGE_SCRIPT: Lazy<Script> = Lazy::new(|| {
let mut args = vec![];
for _ in 0..200 {
args.push(TransactionArgument::Address(AccountAddress::random()));
}
Script::new(vec![], vec![], args)
});

static HUGE_SCRIPT: Lazy<Script> = Lazy::new(|| {
let mut args = vec![];
for _ in 0..200_000 {
args.push(TransactionArgument::Address(AccountAddress::random()));
}
Script::new(vec![], vec![], args)
});

#[derive(Clone, Serialize, Deserialize)]
pub struct TestTransaction {
pub(crate) address: usize,
pub(crate) address: AccountAddress,
pub(crate) sequence_number: u64,
pub(crate) gas_price: u64,
pub(crate) account_seqno: u64,
pub(crate) script: Option<Script>,
}

impl TestTransaction {
pub(crate) const fn new(address: usize, sequence_number: u64, gas_price: u64) -> Self {
pub(crate) fn new(address: usize, sequence_number: u64, gas_price: u64) -> Self {
Self {
address: TestTransaction::get_address(address),
sequence_number,
gas_price,
account_seqno: 0,
script: None,
}
}

pub(crate) fn new_with_large_script(
address: usize,
sequence_number: u64,
gas_price: u64,
) -> Self {
Self {
address: TestTransaction::get_address(address),
sequence_number,
gas_price,
account_seqno: 0,
script: Some(LARGE_SCRIPT.clone()),
}
}

pub(crate) fn new_with_huge_script(
address: usize,
sequence_number: u64,
gas_price: u64,
) -> Self {
Self {
address: TestTransaction::get_address(address),
sequence_number,
gas_price,
account_seqno: 0,
script: Some(HUGE_SCRIPT.clone()),
}
}

pub(crate) fn new_with_address(
address: AccountAddress,
sequence_number: u64,
gas_price: u64,
) -> Self {
Self {
address,
sequence_number,
gas_price,
account_seqno: 0,
script: None,
}
}

Expand Down Expand Up @@ -87,9 +149,9 @@ impl TestTransaction {
exp_timestamp_secs: u64,
) -> SignedTransaction {
let raw_txn = RawTransaction::new_script(
TestTransaction::get_address(self.address),
self.address,
self.sequence_number,
Script::new(vec![], vec![], vec![]),
self.script.clone().unwrap_or(SMALL_SCRIPT.clone()),
max_gas_amount,
self.gas_price,
exp_timestamp_secs,
Expand Down
104 changes: 99 additions & 5 deletions mempool/src/tests/core_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use aptos_config::config::{MempoolConfig, NodeConfig};
use aptos_consensus_types::common::{TransactionInProgress, TransactionSummary};
use aptos_crypto::HashValue;
use aptos_types::{
mempool_status::MempoolStatusCode, transaction::SignedTransaction, vm_status::DiscardedVMStatus,
account_address::AccountAddress, mempool_status::MempoolStatusCode,
transaction::SignedTransaction, vm_status::DiscardedVMStatus,
};
use itertools::Itertools;
use maplit::btreemap;
Expand Down Expand Up @@ -756,7 +757,9 @@ fn test_capacity_bytes() {
let mut txns = vec![];
let last_txn;
loop {
let txn = new_test_mempool_transaction(address, seq_no);
let txn = signed_txn_to_mempool_transaction(
TestTransaction::new(address, seq_no, 1).make_signed_transaction(),
);
let txn_bytes = txn.get_estimated_bytes();

if size_bytes <= capacity_bytes {
Expand Down Expand Up @@ -811,10 +814,9 @@ fn test_capacity_bytes() {
}
}

fn new_test_mempool_transaction(address: usize, sequence_number: u64) -> MempoolTransaction {
let signed_txn = TestTransaction::new(address, sequence_number, 1).make_signed_transaction();
fn signed_txn_to_mempool_transaction(txn: SignedTransaction) -> MempoolTransaction {
MempoolTransaction::new(
signed_txn,
txn,
Duration::from_secs(1),
1,
TimelineState::NotReady,
Expand Down Expand Up @@ -851,6 +853,98 @@ fn test_parking_lot_eviction() {
assert!(add_txn(&mut pool, TestTransaction::new(0, 2, 1)).is_err());
}

#[test]
fn test_parking_lot_eviction_bytes() {
// Get the small transaction size
let small_txn_size =
signed_txn_to_mempool_transaction(TestTransaction::new(1, 1, 1).make_signed_transaction())
.get_estimated_bytes();

let mut config = NodeConfig::generate_random_config();
config.mempool.capacity = 100;
// Fit 2 small transactions + one additional transaction (by overflowing the capacity bytes)
config.mempool.capacity_bytes = 3 * small_txn_size + 1;
let mut pool = CoreMempool::new(&config);
// Add 2 small transactions to parking lot
for address in 0..2 {
add_txn(&mut pool, TestTransaction::new(address, 1, 1)).unwrap();
}
// Add one large transaction that will top off the capacity bytes
add_txn(&mut pool, TestTransaction::new_with_large_script(2, 1, 1)).unwrap();
// Mempool is full. Insert a small txn for other account.
add_txn(&mut pool, TestTransaction::new(3, 0, 1)).unwrap();
}

#[test]
fn test_parking_lot_eviction_benchmark() {
// Get the small transaction size
let small_txn_size =
signed_txn_to_mempool_transaction(TestTransaction::new(1, 1, 1).make_signed_transaction())
.get_estimated_bytes();
let huge_txn_size = signed_txn_to_mempool_transaction(
TestTransaction::new_with_huge_script(1, 1, 1).make_signed_transaction(),
)
.get_estimated_bytes();
let num_small_txns = (huge_txn_size / small_txn_size) * 2;

let mut config = NodeConfig::generate_random_config();
config.mempool.capacity_per_user = 200;
config.mempool.capacity = 4_000_000;
// ~5 MB
config.mempool.capacity_bytes = num_small_txns * small_txn_size + 1;
let mut pool = CoreMempool::new(&config);

// // Add one huge transaction that will evict all transactions from parking lot
// let huge_signed_txn = TestTransaction::new_with_huge_script(0, 1, 1).make_signed_transaction();
// // Pre-compute these values, as shared mempool would do
// huge_signed_txn.committed_hash();
// huge_signed_txn.txn_bytes_len();
//
// let now = Instant::now();
// add_signed_txn(&mut pool, huge_signed_txn.clone()).unwrap();
// // Flush the huge transaction
// add_txn(&mut pool, TestTransaction::new(1, 0, 1)).unwrap();

let accounts: Vec<_> = (0..num_small_txns)
.map(|_| AccountAddress::random())
.collect();
// Fill up parking lot to capacity
for account in accounts {
for seq_num in 1..2 {
add_txn(
&mut pool,
TestTransaction::new_with_address(account, seq_num, 1),
)
.unwrap();
}
}
// Add one huge transaction that will cause mempool to be (beyond) full
let huge_signed_txn = TestTransaction::new_with_huge_script(0, 0, 1).make_signed_transaction();
add_signed_txn(&mut pool, huge_signed_txn).unwrap();
assert_eq!(pool.get_parking_lot_size(), num_small_txns);

// Add one huge transaction that will evict many transactions from parking lot
let huge_signed_txn = TestTransaction::new_with_huge_script(1, 0, 1).make_signed_transaction();
// Pre-compute these values, as shared mempool would do
huge_signed_txn.committed_hash();
huge_signed_txn.txn_bytes_len();
let now = Instant::now();
add_signed_txn(&mut pool, huge_signed_txn).unwrap();
let time_to_evict_ms = now.elapsed().as_millis();

let has_remainder = huge_txn_size % small_txn_size != 0;
let num_expected_evicted = num_small_txns / 2 + has_remainder as usize;
assert_eq!(
pool.get_parking_lot_size(),
num_small_txns - num_expected_evicted
);
assert!(
time_to_evict_ms < 300,
"Parking lot eviction of {} should take less than 300 ms on a reasonable machine",
num_expected_evicted
);
}

#[test]
fn test_parking_lot_evict_only_for_ready_txn_insertion() {
let mut config = NodeConfig::generate_random_config();
Expand Down
Loading
Loading