Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mempool] eager expiration based on queued transaction age #6965

Merged
merged 3 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct MempoolConfig {
pub system_transaction_timeout_secs: u64,
pub system_transaction_gc_interval_ms: u64,
pub broadcast_buckets: Vec<u64>,
pub eager_expire_threshold_ms: Option<u64>,
pub eager_expire_time_ms: u64,
}

impl Default for MempoolConfig {
Expand All @@ -47,6 +49,8 @@ impl Default for MempoolConfig {
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BROADCAST_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
}
}
}
4 changes: 4 additions & 0 deletions mempool/src/core_mempool/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ impl TTLIndex {
}
}

pub(crate) fn iter(&self) -> Iter<TTLOrderingKey> {
self.data.iter()
}

pub(crate) fn size(&self) -> usize {
self.data.len()
}
Expand Down
2 changes: 2 additions & 0 deletions mempool/src/core_mempool/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct MempoolTransaction {
pub timeline_state: TimelineState,
pub sequence_info: SequenceInfo,
pub insertion_time: SystemTime,
pub was_parked: bool,
}

impl MempoolTransaction {
Expand All @@ -44,6 +45,7 @@ impl MempoolTransaction {
ranking_score,
timeline_state,
insertion_time,
was_parked: false,
}
}

Expand Down
44 changes: 43 additions & 1 deletion mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct TransactionStore {
// we keep it separate from `expiration_time_index` so Mempool can't be clogged
// by old transactions even if it hasn't received commit callbacks for a while
system_ttl_index: TTLIndex,
// Broadcast-ready transactions, with a timeline per bucket.
timeline_index: MultiBucketTimelineIndex,
// keeps track of "non-ready" txns (transactions that can't be included in next block)
parking_lot_index: ParkingLotIndex,
Expand All @@ -75,6 +76,10 @@ pub struct TransactionStore {
capacity_bytes: usize,
capacity_per_user: usize,
max_batch_bytes: u64,

// eager expiration
eager_expire_threshold: Option<Duration>,
eager_expire_time: Duration,
}

impl TransactionStore {
Expand Down Expand Up @@ -103,6 +108,10 @@ impl TransactionStore {
capacity_bytes: config.capacity_bytes,
capacity_per_user: config.capacity_per_user,
max_batch_bytes: config.shared_mempool_max_batch_bytes,

// eager expiration
eager_expire_threshold: config.eager_expire_threshold_ms.map(Duration::from_millis),
eager_expire_time: Duration::from_millis(config.eager_expire_time_ms),
}
}

Expand Down Expand Up @@ -443,6 +452,7 @@ impl TransactionStore {
TimelineState::Ready(_) => {},
_ => {
self.parking_lot_index.insert(txn);
txn.was_parked = true;
parking_lot_txns += 1;
},
}
Expand Down Expand Up @@ -623,14 +633,45 @@ impl TransactionStore {
.collect()
}

/// If the oldest transaction (that never entered parking lot) is larger than
/// eager_expire_threshold, there is significant backlog so add eager_expire_time
fn eager_expire_time(&self, gc_time: Duration) -> Duration {
let eager_expire_threshold = match self.eager_expire_threshold {
None => {
return gc_time;
},
Some(v) => v,
};

let mut oldest_insertion_time = None;
// Limit the worst-case linear search to 20.
for key in self.system_ttl_index.iter().take(20) {
if let Some(txn) = self.get_mempool_txn(&key.address, key.sequence_number) {
if !txn.was_parked {
oldest_insertion_time = Some(txn.insertion_time);
break;
}
}
}
if let Some(insertion_time) = oldest_insertion_time {
if let Ok(age) = SystemTime::now().duration_since(insertion_time) {
if age > eager_expire_threshold {
counters::CORE_MEMPOOL_GC_EAGER_EXPIRE_EVENT_COUNT.inc();
return gc_time.saturating_add(self.eager_expire_time);
}
}
}
gc_time
}

/// Garbage collect old transactions.
pub(crate) fn gc_by_system_ttl(&mut self, gc_time: Duration) {
self.gc(gc_time, true);
}

/// Garbage collect old transactions based on client-specified expiration time.
pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) {
self.gc(block_time, false);
self.gc(self.eager_expire_time(block_time), false);
}

fn gc(&mut self, now: Duration, by_system_ttl: bool) {
Expand Down Expand Up @@ -672,6 +713,7 @@ impl TransactionStore {
// mark all following txns as non-ready, i.e. park them
for (_, t) in txns.range_mut((park_range_start, park_range_end)) {
self.parking_lot_index.insert(t);
t.was_parked = true;
self.priority_index.remove(t);
self.timeline_index.remove(t);
if let TimelineState::Ready(_) = t.timeline_state {
Expand Down
9 changes: 9 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ pub static CORE_MEMPOOL_GC_EVENT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counter for number of periodic client garbage-collection (=GC) events that happen with eager
/// expiration, regardless of how many txns were actually cleaned up in this GC event
pub static CORE_MEMPOOL_GC_EAGER_EXPIRE_EVENT_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"aptos_core_mempool_gc_eager_expire_event_count",
"Number of times the periodic garbage-collection event triggers eager expiration, regardless of how many txns were actually removed")
.unwrap()
});

/// Counter tracking time for how long a transaction stayed in core-mempool before being garbage-collected
pub static CORE_MEMPOOL_GC_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
Expand Down