Skip to content

Commit

Permalink
[mempool] eager expiration based on queued transaction age (#6965) (#…
Browse files Browse the repository at this point in the history
…6991)

### Description

Introduces eager expiration, which expires transactions earlier (default: 3s) than their true client-provided expiration. This prevents transactions that are pulled from mempool from expiring upon execution but, more importantly, it prevents these transactions from blocking transactions that would have succeeded at execution.

Eager expiration is triggered if sufficiently old transactions (default: 10s) are observed. This internally signals to mempool that a backlog is building.

### Test Plan

Run the overload test `three_region_simulation_graceful_overload` with quorum store and observe that expirations drop from ~3K/s to < 100/s, which pushes TPS up by about 15%, from 3.8K to 4.4K.
  • Loading branch information
bchocho authored Mar 8, 2023
1 parent db0a542 commit 7a0002f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 1 deletion.
4 changes: 4 additions & 0 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct MempoolConfig {
pub system_transaction_timeout_secs: u64,
pub system_transaction_gc_interval_ms: u64,
pub broadcast_buckets: Vec<u64>,
pub eager_expire_threshold_ms: Option<u64>,
pub eager_expire_time_ms: u64,
}

impl Default for MempoolConfig {
Expand All @@ -47,6 +49,8 @@ impl Default for MempoolConfig {
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BROADCAST_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
}
}
}
4 changes: 4 additions & 0 deletions mempool/src/core_mempool/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ impl TTLIndex {
}
}

/// Returns an iterator over the `TTLOrderingKey`s held in this index, delegating to
/// `self.data.iter()`.
// NOTE(review): callers that only look at the first few keys presumably rely on `data`
// yielding the earliest-expiring entries first — confirm against the type of `data`.
pub(crate) fn iter(&self) -> Iter<TTLOrderingKey> {
self.data.iter()
}

/// Returns the number of entries currently held in this index (`self.data.len()`).
pub(crate) fn size(&self) -> usize {
self.data.len()
}
Expand Down
2 changes: 2 additions & 0 deletions mempool/src/core_mempool/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct MempoolTransaction {
pub timeline_state: TimelineState,
pub sequence_info: SequenceInfo,
pub insertion_time: SystemTime,
pub was_parked: bool,
}

impl MempoolTransaction {
Expand All @@ -44,6 +45,7 @@ impl MempoolTransaction {
ranking_score,
timeline_state,
insertion_time,
was_parked: false,
}
}

Expand Down
44 changes: 43 additions & 1 deletion mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct TransactionStore {
// we keep it separate from `expiration_time_index` so Mempool can't be clogged
// by old transactions even if it hasn't received commit callbacks for a while
system_ttl_index: TTLIndex,
// Broadcast-ready transactions, with a timeline per bucket.
timeline_index: MultiBucketTimelineIndex,
// keeps track of "non-ready" txns (transactions that can't be included in next block)
parking_lot_index: ParkingLotIndex,
Expand All @@ -75,6 +76,10 @@ pub struct TransactionStore {
capacity_bytes: usize,
capacity_per_user: usize,
max_batch_bytes: u64,

// eager expiration
eager_expire_threshold: Option<Duration>,
eager_expire_time: Duration,
}

impl TransactionStore {
Expand Down Expand Up @@ -103,6 +108,10 @@ impl TransactionStore {
capacity_bytes: config.capacity_bytes,
capacity_per_user: config.capacity_per_user,
max_batch_bytes: config.shared_mempool_max_batch_bytes,

// eager expiration
eager_expire_threshold: config.eager_expire_threshold_ms.map(Duration::from_millis),
eager_expire_time: Duration::from_millis(config.eager_expire_time_ms),
}
}

Expand Down Expand Up @@ -443,6 +452,7 @@ impl TransactionStore {
TimelineState::Ready(_) => {},
_ => {
self.parking_lot_index.insert(txn);
txn.was_parked = true;
parking_lot_txns += 1;
},
}
Expand Down Expand Up @@ -623,14 +633,45 @@ impl TransactionStore {
.collect()
}

/// Picks the timestamp that client-expiration GC should treat as "now".
///
/// Returns `gc_time` unchanged when eager expiration is disabled
/// (`eager_expire_threshold` is `None`) or when no backlog is detected. A backlog is
/// detected when the first transaction found in the system TTL index that never entered
/// the parking lot has been in mempool for longer than `eager_expire_threshold`; in that
/// case the eager-expire counter is bumped and `gc_time + eager_expire_time` (saturating)
/// is returned, so transactions expire slightly ahead of their client-provided deadline.
fn eager_expire_time(&self, gc_time: Duration) -> Duration {
    let threshold = match self.eager_expire_threshold {
        Some(threshold) => threshold,
        None => return gc_time,
    };

    // Only the first 20 index entries are inspected, bounding the worst-case scan.
    let first_unparked_insertion = self.system_ttl_index.iter().take(20).find_map(|key| {
        self.get_mempool_txn(&key.address, key.sequence_number)
            .filter(|txn| !txn.was_parked)
            .map(|txn| txn.insertion_time)
    });

    // A clock error from `duration_since` (insertion time in the future) counts as
    // "no backlog", as does finding no eligible transaction at all.
    let backlog_detected = first_unparked_insertion
        .and_then(|inserted_at| SystemTime::now().duration_since(inserted_at).ok())
        .map_or(false, |age| age > threshold);

    if !backlog_detected {
        return gc_time;
    }

    counters::CORE_MEMPOOL_GC_EAGER_EXPIRE_EVENT_COUNT.inc();
    gc_time.saturating_add(self.eager_expire_time)
}

/// Garbage collects old transactions by invoking `gc` at `gc_time` with its
/// `by_system_ttl` flag set to `true`.
pub(crate) fn gc_by_system_ttl(&mut self, gc_time: Duration) {
    let by_system_ttl = true;
    self.gc(gc_time, by_system_ttl);
}

/// Garbage collect old transactions based on client-specified expiration time.
///
/// `block_time` is routed through `eager_expire_time`, which may move it forward when a
/// backlog is detected, and the result is handed to `gc` with `by_system_ttl` set to `false`.
pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) {
// NOTE(review): this text comes from a rendered diff that reports a single deletion for
// this file; the next line looks like the call the commit removed and the one after it
// the replacement — confirm that only the eager-expire call is meant to remain.
self.gc(block_time, false);
self.gc(self.eager_expire_time(block_time), false);
}

fn gc(&mut self, now: Duration, by_system_ttl: bool) {
Expand Down Expand Up @@ -672,6 +713,7 @@ impl TransactionStore {
// mark all following txns as non-ready, i.e. park them
for (_, t) in txns.range_mut((park_range_start, park_range_end)) {
self.parking_lot_index.insert(t);
t.was_parked = true;
self.priority_index.remove(t);
self.timeline_index.remove(t);
if let TimelineState::Ready(_) = t.timeline_state {
Expand Down
9 changes: 9 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ pub static CORE_MEMPOOL_GC_EVENT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counter for the number of client garbage-collection (=GC) events that ran with eager
/// expiration applied, regardless of how many txns were actually cleaned up in that GC event.
/// It is incremented once each time the oldest never-parked transaction is found to be older
/// than the eager-expire threshold.
pub static CORE_MEMPOOL_GC_EAGER_EXPIRE_EVENT_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"aptos_core_mempool_gc_eager_expire_event_count",
"Number of times the periodic garbage-collection event triggers eager expiration, regardless of how many txns were actually removed")
.unwrap()
});

/// Counter tracking time for how long a transaction stayed in core-mempool before being garbage-collected
pub static CORE_MEMPOOL_GC_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
Expand Down

0 comments on commit 7a0002f

Please sign in to comment.