From 7a0002f1bdda3edcdb3011088ab42853d0fc180c Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Tue, 7 Mar 2023 17:26:31 -0800 Subject: [PATCH] [mempool] eager expiration based on queued transaction age (#6965) (#6991) ### Description Introduces eager expiration, which expires transactions earlier (default: 3s) than its true client-provided expiration. This prevents transactions that are pulled from mempool expiring upon execution, but more importantly it prevents these transactions from blocking transactions that would have succeeded at execution. Eager expiration is triggered if sufficiently old transactions (default: 10s) are observed. This internally signals to mempool that a backlog is building. ### Test Plan Run an overload test `three_region_simulation_graceful_overload` with quorum store and observe that expirations drop from ~3K/s to < 100/s and this pushes TPS up +15% from 3.8K -> 4.4K --- config/src/config/mempool_config.rs | 4 ++ mempool/src/core_mempool/index.rs | 4 ++ mempool/src/core_mempool/transaction.rs | 2 + mempool/src/core_mempool/transaction_store.rs | 44 ++++++++++++++++++- mempool/src/counters.rs | 9 ++++ 5 files changed, 62 insertions(+), 1 deletion(-) diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs index f4e8e145c2b54..99479f905539a 100644 --- a/config/src/config/mempool_config.rs +++ b/config/src/config/mempool_config.rs @@ -27,6 +27,8 @@ pub struct MempoolConfig { pub system_transaction_timeout_secs: u64, pub system_transaction_gc_interval_ms: u64, pub broadcast_buckets: Vec, + pub eager_expire_threshold_ms: Option, + pub eager_expire_time_ms: u64, } impl Default for MempoolConfig { @@ -47,6 +49,8 @@ impl Default for MempoolConfig { system_transaction_timeout_secs: 600, system_transaction_gc_interval_ms: 60_000, broadcast_buckets: DEFAULT_BROADCAST_BUCKETS.to_vec(), + eager_expire_threshold_ms: Some(10_000), + eager_expire_time_ms: 3_000, } } } diff --git a/mempool/src/core_mempool/index.rs b/mempool/src/core_mempool/index.rs index 7f338fc569328..043cf30df42d3 100644 --- a/mempool/src/core_mempool/index.rs +++ b/mempool/src/core_mempool/index.rs @@ -158,6 +158,10 @@ impl TTLIndex { } } + pub(crate) fn iter(&self) -> Iter { + self.data.iter() + } + pub(crate) fn size(&self) -> usize { self.data.len() } diff --git a/mempool/src/core_mempool/transaction.rs b/mempool/src/core_mempool/transaction.rs index 97f04040b7908..da222baf9a3cf 100644 --- a/mempool/src/core_mempool/transaction.rs +++ b/mempool/src/core_mempool/transaction.rs @@ -23,6 +23,7 @@ pub struct MempoolTransaction { pub timeline_state: TimelineState, pub sequence_info: SequenceInfo, pub insertion_time: SystemTime, + pub was_parked: bool, } impl MempoolTransaction { @@ -44,6 +45,7 @@ impl MempoolTransaction { ranking_score, timeline_state, insertion_time, + was_parked: false, } } diff --git a/mempool/src/core_mempool/transaction_store.rs b/mempool/src/core_mempool/transaction_store.rs index 5b57bea7ae684..7d334c343c91a 100644 --- a/mempool/src/core_mempool/transaction_store.rs +++ b/mempool/src/core_mempool/transaction_store.rs @@ -56,6 +56,7 @@ pub struct TransactionStore { // we keep it separate from `expiration_time_index` so Mempool can't be clogged // by old transactions even if it hasn't received commit callbacks for a while system_ttl_index: TTLIndex, + // Broadcast-ready transactions, with a timeline per bucket. timeline_index: MultiBucketTimelineIndex, // keeps track of "non-ready" txns (transactions that can't be included in next block) parking_lot_index: ParkingLotIndex, @@ -75,6 +76,10 @@ pub struct TransactionStore { capacity_bytes: usize, capacity_per_user: usize, max_batch_bytes: u64, + + // eager expiration + eager_expire_threshold: Option, + eager_expire_time: Duration, } impl TransactionStore { @@ -103,6 +108,10 @@ impl TransactionStore { capacity_bytes: config.capacity_bytes, capacity_per_user: config.capacity_per_user, max_batch_bytes: config.shared_mempool_max_batch_bytes, + + // eager expiration + eager_expire_threshold: config.eager_expire_threshold_ms.map(Duration::from_millis), + eager_expire_time: Duration::from_millis(config.eager_expire_time_ms), } } @@ -443,6 +452,7 @@ impl TransactionStore { TimelineState::Ready(_) => {}, _ => { self.parking_lot_index.insert(txn); + txn.was_parked = true; parking_lot_txns += 1; }, } @@ -623,6 +633,37 @@ impl TransactionStore { .collect() } + /// If the oldest transaction (that never entered parking lot) is larger than + /// eager_expire_threshold, there is significant backlog so add eager_expire_time + fn eager_expire_time(&self, gc_time: Duration) -> Duration { + let eager_expire_threshold = match self.eager_expire_threshold { + None => { + return gc_time; + }, + Some(v) => v, + }; + + let mut oldest_insertion_time = None; + // Limit the worst-case linear search to 20. + for key in self.system_ttl_index.iter().take(20) { + if let Some(txn) = self.get_mempool_txn(&key.address, key.sequence_number) { + if !txn.was_parked { + oldest_insertion_time = Some(txn.insertion_time); + break; + } + } + } + if let Some(insertion_time) = oldest_insertion_time { + if let Ok(age) = SystemTime::now().duration_since(insertion_time) { + if age > eager_expire_threshold { + counters::CORE_MEMPOOL_GC_EAGER_EXPIRE_EVENT_COUNT.inc(); + return gc_time.saturating_add(self.eager_expire_time); + } + } + } + gc_time + } + /// Garbage collect old transactions. pub(crate) fn gc_by_system_ttl(&mut self, gc_time: Duration) { self.gc(gc_time, true); @@ -630,7 +671,7 @@ impl TransactionStore { /// Garbage collect old transactions based on client-specified expiration time. pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) { - self.gc(block_time, false); + self.gc(self.eager_expire_time(block_time), false); } fn gc(&mut self, now: Duration, by_system_ttl: bool) { @@ -672,6 +713,7 @@ impl TransactionStore { // mark all following txns as non-ready, i.e. park them for (_, t) in txns.range_mut((park_range_start, park_range_end)) { self.parking_lot_index.insert(t); + t.was_parked = true; self.priority_index.remove(t); self.timeline_index.remove(t); if let TimelineState::Ready(_) = t.timeline_state { diff --git a/mempool/src/counters.rs b/mempool/src/counters.rs index 1ff64a8fc7c13..97d14758bcfe6 100644 --- a/mempool/src/counters.rs +++ b/mempool/src/counters.rs @@ -239,6 +239,15 @@ pub static CORE_MEMPOOL_GC_EVENT_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +/// Counter for number of periodic client garbage-collection (=GC) events that happen with eager +/// expiration, regardless of how many txns were actually cleaned up in this GC event +pub static CORE_MEMPOOL_GC_EAGER_EXPIRE_EVENT_COUNT: Lazy = Lazy::new(|| { + register_int_counter!( + "aptos_core_mempool_gc_eager_expire_event_count", + "Number of times the periodic garbage-collection event triggers eager expiration, regardless of how many txns were actually removed") + .unwrap() +}); + /// Counter tracking time for how long a transaction stayed in core-mempool before being garbage-collected pub static CORE_MEMPOOL_GC_LATENCY: Lazy = Lazy::new(|| { register_histogram_vec!(