Skip to content

Commit

Permalink
Separate metrics with added priority
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Jul 18, 2024
1 parent 4ac3171 commit d460f6a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 18 deletions.
42 changes: 31 additions & 11 deletions mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,33 @@ impl Mempool {
insertion_info: &InsertionInfo,
bucket: &str,
stage: &'static str,
priority: &str,
) {
if let Ok(time_delta) = SystemTime::now().duration_since(insertion_info.insertion_time) {
counters::core_mempool_txn_commit_latency(
stage,
insertion_info.submitted_by_label(),
bucket,
time_delta,
priority,
);
}
}

fn log_consensus_pulled_latency(&self, account: AccountAddress, sequence_number: u64) {
if let Some((insertion_info, bucket)) = self
if let Some((insertion_info, bucket, priority)) = self
.transactions
.get_insertion_info_and_bucket(&account, sequence_number)
{
let prev_count = insertion_info
.consensus_pulled_counter
.fetch_add(1, Ordering::Relaxed);
Self::log_txn_latency(insertion_info, bucket, counters::CONSENSUS_PULLED_LABEL);
Self::log_txn_latency(
insertion_info,
bucket,
counters::CONSENSUS_PULLED_LABEL,
priority.to_string().as_str(),
);
counters::CORE_MEMPOOL_TXN_CONSENSUS_PULLED.observe((prev_count + 1) as f64);
}
}
Expand All @@ -154,15 +161,15 @@ impl Mempool {
sequence_number: u64,
stage: &'static str,
) {
if let Some((insertion_info, bucket)) = self
if let Some((insertion_info, bucket, priority)) = self
.transactions
.get_insertion_info_and_bucket(&account, sequence_number)
{
Self::log_txn_latency(insertion_info, bucket, stage);
Self::log_txn_latency(insertion_info, bucket, stage, priority.to_string().as_str());
}
}

fn log_commit_and_parked_latency(insertion_info: &InsertionInfo, bucket: &str) {
fn log_commit_and_parked_latency(insertion_info: &InsertionInfo, bucket: &str, priority: &str) {
let parked_duration = if let Some(park_time) = insertion_info.park_time {
let parked_duration = insertion_info
.ready_time
Expand All @@ -173,6 +180,7 @@ impl Mempool {
insertion_info.submitted_by_label(),
bucket,
parked_duration,
priority,
);
parked_duration
} else {
Expand All @@ -189,6 +197,7 @@ impl Mempool {
insertion_info.submitted_by_label(),
bucket,
commit_minus_parked,
priority,
);
}
}
Expand All @@ -199,12 +208,21 @@ impl Mempool {
sequence_number: u64,
block_timestamp: Duration,
) {
if let Some((insertion_info, bucket)) = self
if let Some((insertion_info, bucket, priority)) = self
.transactions
.get_insertion_info_and_bucket(&account, sequence_number)
{
Self::log_txn_latency(insertion_info, bucket, counters::COMMIT_ACCEPTED_LABEL);
Self::log_commit_and_parked_latency(insertion_info, bucket);
Self::log_txn_latency(
insertion_info,
bucket,
counters::COMMIT_ACCEPTED_LABEL,
priority.to_string().as_str(),
);
Self::log_commit_and_parked_latency(
insertion_info,
bucket,
priority.to_string().as_str(),
);

let insertion_timestamp =
aptos_infallible::duration_since_epoch_at(&insertion_info.insertion_time);
Expand All @@ -214,6 +232,7 @@ impl Mempool {
insertion_info.submitted_by_label(),
bucket,
insertion_to_block,
priority.to_string().as_str(),
);
}
}
Expand Down Expand Up @@ -258,7 +277,7 @@ impl Mempool {
aptos_infallible::duration_since_epoch_at(&now) + self.system_transaction_timeout;

let txn_info = MempoolTransaction::new(
txn,
txn.clone(),
expiration_time,
ranking_score,
timeline_state,
Expand All @@ -271,15 +290,16 @@ impl Mempool {
let submitted_by_label = txn_info.insertion_info.submitted_by_label();
let status = self.transactions.insert(txn_info);

if priority == BroadcastPeerPriority::Primary && status.code == MempoolStatusCode::Accepted
{
info!("txn added to mempool: {} {} status {}, priority {:?}, client_submitted {}, now: {:?}, inserted_at_sender {:?}, time_since: {:?}", txn.sender(), txn.sequence_number(), status, priority.clone(), client_submitted, now, insertion_time_at_sender, SystemTime::now().duration_since(insertion_time_at_sender.unwrap_or(SystemTime::now())));
if status.code == MempoolStatusCode::Accepted {
if let Some(insertion_time_at_sender) = insertion_time_at_sender {
if let Ok(time_delta) = now.duration_since(insertion_time_at_sender) {
counters::core_mempool_txn_commit_latency(
counters::TRANSACTION_INSERTED_LABEL,
submitted_by_label,
self.transactions.get_bucket(ranking_score),
time_delta,
priority.to_string().as_str(),
);
}
}
Expand Down
18 changes: 14 additions & 4 deletions mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use crate::{
mempool::Mempool,
transaction::{InsertionInfo, MempoolTransaction, TimelineState},
},
counters,
counters::{BROADCAST_BATCHED_LABEL, BROADCAST_READY_LABEL, CONSENSUS_READY_LABEL},
counters::{self, BROADCAST_BATCHED_LABEL, BROADCAST_READY_LABEL, CONSENSUS_READY_LABEL},
logging::{LogEntry, LogEvent, LogSchema, TxnsLog},
network::BroadcastPeerPriority,
shared_mempool::types::MultiBucketTimelineIndexIds,
};
use aptos_config::config::MempoolConfig;
Expand Down Expand Up @@ -156,9 +156,13 @@ impl TransactionStore {
&self,
address: &AccountAddress,
sequence_number: u64,
) -> Option<(&InsertionInfo, &str)> {
) -> Option<(&InsertionInfo, &str, &BroadcastPeerPriority)> {
if let Some(txn) = self.get_mempool_txn(address, sequence_number) {
return Some((&txn.insertion_info, self.get_bucket(txn.ranking_score)));
return Some((
&txn.insertion_info,
self.get_bucket(txn.ranking_score),
&txn.priority_of_sender,
));
}
None
}
Expand Down Expand Up @@ -362,6 +366,7 @@ impl TransactionStore {
bucket: &str,
insertion_info: &mut InsertionInfo,
broadcast_ready: bool,
priority: &str,
) {
insertion_info.ready_time = SystemTime::now();
if let Ok(time_delta) = SystemTime::now().duration_since(insertion_info.insertion_time) {
Expand All @@ -372,19 +377,22 @@ impl TransactionStore {
submitted_by,
bucket,
time_delta,
priority,
);
counters::core_mempool_txn_commit_latency(
BROADCAST_READY_LABEL,
submitted_by,
bucket,
time_delta,
priority,
);
} else {
counters::core_mempool_txn_commit_latency(
CONSENSUS_READY_LABEL,
submitted_by,
bucket,
time_delta,
priority,
);
}
}
Expand Down Expand Up @@ -429,6 +437,7 @@ impl TransactionStore {
self.timeline_index.get_bucket(txn.ranking_score),
&mut txn.insertion_info,
process_broadcast_ready,
txn.priority_of_sender.to_string().as_str(),
);
}

Expand Down Expand Up @@ -594,6 +603,7 @@ impl TransactionStore {
&txn.insertion_info,
bucket,
BROADCAST_BATCHED_LABEL,
txn.priority_of_sender.to_string().as_str(),
);
counters::core_mempool_txn_ranking_score(
BROADCAST_BATCHED_LABEL,
Expand Down
18 changes: 18 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,16 @@ pub fn core_mempool_txn_commit_latency(
submitted_by: &'static str,
bucket: &str,
latency: Duration,
priority: &str,
) {
// Will be deprecated soon
CORE_MEMPOOL_TXN_COMMIT_LATENCY
.with_label_values(&[stage, submitted_by, bucket])
.observe(latency.as_secs_f64());

CORE_MEMPOOL_TXN_LATENCIES
.with_label_values(&[stage, submitted_by, bucket, priority])
.observe(latency.as_secs_f64());
}

/// Counter tracking latency of txns reaching various stages in committing
Expand All @@ -209,6 +215,18 @@ static CORE_MEMPOOL_TXN_COMMIT_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});

/// Counter tracking latency of txns reaching various stages
/// (e.g. time from txn entering core mempool to being pulled in consensus block)
static CORE_MEMPOOL_TXN_LATENCIES: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_core_mempool_txn_latencies",
"Latency of txn reaching various stages in core mempool after insertion",
&["stage", "submitted_by", "bucket", "priority"],
MEMPOOL_LATENCY_BUCKETS.to_vec()
)
.unwrap()
});

pub fn core_mempool_txn_ranking_score(
stage: &'static str,
status: &str,
Expand Down
10 changes: 10 additions & 0 deletions mempool/src/shared_mempool/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use fail::fail_point;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
fmt::Display,
ops::Add,
sync::Arc,
time::{Duration, Instant, SystemTime},
Expand Down Expand Up @@ -93,6 +94,15 @@ pub enum BroadcastPeerPriority {
Failover,
}

impl Display for BroadcastPeerPriority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BroadcastPeerPriority::Primary => write!(f, "Primary"),
BroadcastPeerPriority::Failover => write!(f, "Failover"),
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct MempoolNetworkInterface<NetworkClient> {
network_client: NetworkClient,
Expand Down
6 changes: 3 additions & 3 deletions mempool/src/tests/core_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,20 @@ fn test_transaction_metrics() {
);

// Check timestamp returned as end-to-end for broadcast-able transaction
let (insertion_info, _bucket) = mempool
let (insertion_info, _bucket, _priority) = mempool
.get_transaction_store()
.get_insertion_info_and_bucket(&TestTransaction::get_address(0), 0)
.unwrap();
assert_eq!(insertion_info.submitted_by, SubmittedBy::Downstream);

// Check timestamp returned as not end-to-end for non-broadcast-able transaction
let (insertion_info, _bucket) = mempool
let (insertion_info, _bucket, _priority) = mempool
.get_transaction_store()
.get_insertion_info_and_bucket(&TestTransaction::get_address(1), 0)
.unwrap();
assert_eq!(insertion_info.submitted_by, SubmittedBy::PeerValidator);

let (insertion_info, _bucket) = mempool
let (insertion_info, _bucket, _priority) = mempool
.get_transaction_store()
.get_insertion_info_and_bucket(&TestTransaction::get_address(2), 0)
.unwrap();
Expand Down

0 comments on commit d460f6a

Please sign in to comment.