Skip to content

Commit

Permalink
e2e counters split out by top usecases
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Jul 18, 2024
1 parent 2591aae commit fee3974
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 75 deletions.
5 changes: 5 additions & 0 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub struct MempoolConfig {
pub broadcast_buckets: Vec<u64>,
pub eager_expire_threshold_ms: Option<u64>,
pub eager_expire_time_ms: u64,

pub usecase_stats_num_blocks_to_track: usize,
pub usecase_stats_num_top_to_track: usize,
}

impl Default for MempoolConfig {
Expand Down Expand Up @@ -87,6 +90,8 @@ impl Default for MempoolConfig {
broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(15_000),
eager_expire_time_ms: 6_000,
usecase_stats_num_blocks_to_track: 40,
usecase_stats_num_top_to_track: 5,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use crate::transaction_shuffler::use_case_aware::{
types::{InputIdx, OutputIdx, UseCaseAwareTransaction, UseCaseKey},
types::{InputIdx, OutputIdx},
utils::StrictMap,
Config,
};
use aptos_types::transaction::use_case::{UseCaseAwareTransaction, UseCaseKey};
use move_core_types::account_address::AccountAddress;
use std::{
collections::{hash_map, BTreeMap, HashMap, VecDeque},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use crate::transaction_shuffler::use_case_aware::{
delayed_queue::DelayedQueue,
types::{InputIdx, OutputIdx, UseCaseAwareTransaction},
types::{InputIdx, OutputIdx},
Config,
};
use aptos_types::transaction::use_case::UseCaseAwareTransaction;
use std::{collections::VecDeque, fmt::Debug};

#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/transaction_shuffler/use_case_aware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::transaction_shuffler::{use_case_aware::types::UseCaseKey, TransactionShuffler};
use aptos_types::transaction::SignedTransaction;
use crate::transaction_shuffler::TransactionShuffler;
use aptos_types::transaction::{use_case::UseCaseKey, SignedTransaction};
use iterator::ShuffledTransactionIterator;

pub(crate) mod iterator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::transaction_shuffler::use_case_aware::types::{UseCaseAwareTransaction, UseCaseKey};
use aptos_types::transaction::use_case::{UseCaseAwareTransaction, UseCaseKey};
use move_core_types::account_address::AccountAddress;
use proptest_derive::Arbitrary;
use std::fmt::Debug;
Expand Down
53 changes: 0 additions & 53 deletions consensus/src/transaction_shuffler/use_case_aware/types.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,5 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_types::transaction::SignedTransaction;
use move_core_types::account_address::AccountAddress;

pub(crate) type InputIdx = usize;
pub(crate) type OutputIdx = usize;

#[derive(Clone, Eq, Hash, PartialEq)]
pub(crate) enum UseCaseKey {
Platform,
ContractAddress(AccountAddress),
// ModuleBundle (deprecated anyway), scripts, Multisig.
Others,
}

impl std::fmt::Debug for UseCaseKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use UseCaseKey::*;

match self {
Platform => write!(f, "PP"),
ContractAddress(addr) => write!(f, "c{}", hex::encode_upper(&addr[31..])),
Others => write!(f, "OO"),
}
}
}

pub(crate) trait UseCaseAwareTransaction {
fn parse_sender(&self) -> AccountAddress;

fn parse_use_case(&self) -> UseCaseKey;
}

impl UseCaseAwareTransaction for SignedTransaction {
fn parse_sender(&self) -> AccountAddress {
self.sender()
}

fn parse_use_case(&self) -> UseCaseKey {
use aptos_types::transaction::TransactionPayload::*;
use UseCaseKey::*;

match self.payload() {
Script(_) | ModuleBundle(_) | Multisig(_) => Others,
EntryFunction(entry_fun) => {
let module_id = entry_fun.module();
if module_id.address().is_special() {
Platform
} else {
// n.b. Generics ignored.
ContractAddress(*module_id.address())
}
},
}
}
}
21 changes: 19 additions & 2 deletions mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aptos_logger::prelude::*;
use aptos_types::{
account_address::AccountAddress,
mempool_status::{MempoolStatus, MempoolStatusCode},
transaction::SignedTransaction,
transaction::{use_case::UseCaseKey, SignedTransaction},
vm_status::DiscardedVMStatus,
};
use std::{
Expand Down Expand Up @@ -57,13 +57,14 @@ impl Mempool {
&self,
sender: &AccountAddress,
sequence_number: u64,
tracked_use_case: Option<UseCaseKey>,
block_timestamp: Duration,
) {
trace!(
LogSchema::new(LogEntry::RemoveTxn).txns(TxnsLog::new_txn(*sender, sequence_number)),
is_rejected = false
);
self.log_commit_latency(*sender, sequence_number, block_timestamp);
self.log_commit_latency(*sender, sequence_number, tracked_use_case, block_timestamp);
if let Some(ranking_score) = self.transactions.get_ranking_score(sender, sequence_number) {
counters::core_mempool_txn_ranking_score(
counters::REMOVE_LABEL,
Expand Down Expand Up @@ -196,6 +197,7 @@ impl Mempool {
&self,
account: AccountAddress,
sequence_number: u64,
tracked_use_case: Option<UseCaseKey>,
block_timestamp: Duration,
) {
if let Some((insertion_info, bucket)) = self
Expand All @@ -214,6 +216,21 @@ impl Mempool {
bucket,
insertion_to_block,
);

let use_case_label = match tracked_use_case {
Some(UseCaseKey::Platform) => "platform".to_string(),
Some(UseCaseKey::ContractAddress(addr)) => format!("contract_{:?}", addr),
Some(UseCaseKey::Others) => "other".to_string(),
None => "contract_other".to_string(),
};

counters::TXN_E2E_USE_CASE_COMMIT_LATENCY
.with_label_values(&[
&use_case_label,
insertion_info.submitted_by_label(),
bucket,
])
.observe(insertion_to_block.as_secs_f64());
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ static CORE_MEMPOOL_TXN_COMMIT_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});

pub static TXN_E2E_USE_CASE_COMMIT_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_txn_e2e_use_case_commit_latency",
"Latency of txn commit_accept, by use_case",
&["use_case", "submitted_by", "bucket"],
MEMPOOL_LATENCY_BUCKETS.to_vec()
)
.unwrap()
});

pub fn core_mempool_txn_ranking_score(
stage: &'static str,
status: &str,
Expand Down
17 changes: 15 additions & 2 deletions mempool/src/shared_mempool/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
tasks,
tasks::process_committed_transactions,
types::{notify_subscribers, ScheduledBroadcast, SharedMempool, SharedMempoolNotification},
use_case_history::UseCaseHistory,
},
MempoolEventsReceiver, QuorumStoreRequest,
};
Expand Down Expand Up @@ -134,10 +135,16 @@ fn spawn_commit_notification_handler<NetworkClient, TransactionValidator>(
{
let mempool = smp.mempool.clone();
let mempool_validator = smp.validator.clone();
let use_case_history = smp.use_case_history.clone();

tokio::spawn(async move {
while let Some(commit_notification) = mempool_listener.next().await {
handle_commit_notification(&mempool, &mempool_validator, commit_notification);
handle_commit_notification(
&mempool,
&mempool_validator,
&use_case_history,
commit_notification,
);
}
});
}
Expand Down Expand Up @@ -202,6 +209,7 @@ async fn handle_client_request<NetworkClient, TransactionValidator>(
fn handle_commit_notification<TransactionValidator>(
mempool: &Arc<Mutex<CoreMempool>>,
mempool_validator: &Arc<RwLock<TransactionValidator>>,
use_case_history: &Arc<Mutex<UseCaseHistory>>,
msg: MempoolCommitNotification,
) where
TransactionValidator: TransactionValidation,
Expand All @@ -218,7 +226,12 @@ fn handle_commit_notification<TransactionValidator>(
counters::COMMIT_STATE_SYNC_LABEL,
msg.transactions.len(),
);
process_committed_transactions(mempool, msg.transactions, msg.block_timestamp_usecs);
process_committed_transactions(
mempool,
use_case_history,
msg.transactions,
msg.block_timestamp_usecs,
);
mempool_validator.write().notify_commit();
let latency = start_time.elapsed();
counters::mempool_service_latency(
Expand Down
1 change: 1 addition & 0 deletions mempool/src/shared_mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ pub use runtime::bootstrap;
pub(crate) use runtime::start_shared_mempool;
mod coordinator;
pub(crate) mod tasks;
pub(crate) mod use_case_history;
19 changes: 16 additions & 3 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use crate::{
counters,
logging::{LogEntry, LogEvent, LogSchema},
network::{BroadcastError, MempoolSyncMsg},
shared_mempool::types::{
notify_subscribers, MultiBatchId, ScheduledBroadcast, SharedMempool,
SharedMempoolNotification, SubmissionStatusBundle,
shared_mempool::{
types::{
notify_subscribers, MultiBatchId, ScheduledBroadcast, SharedMempool,
SharedMempoolNotification, SubmissionStatusBundle,
},
use_case_history::UseCaseHistory,
},
thread_pool::IO_POOL,
QuorumStoreRequest, QuorumStoreResponse, SubmissionStatus,
Expand Down Expand Up @@ -545,16 +548,26 @@ pub(crate) fn process_quorum_store_request<NetworkClient, TransactionValidator>(
/// Remove transactions that are committed (or rejected) so that we can stop broadcasting them.
pub(crate) fn process_committed_transactions(
mempool: &Mutex<CoreMempool>,
use_case_history: &Mutex<UseCaseHistory>,
transactions: Vec<CommittedTransaction>,
block_timestamp_usecs: u64,
) {
let mut pool = mempool.lock();
let block_timestamp = Duration::from_micros(block_timestamp_usecs);

let tracking_usecases = use_case_history
.lock()
.update_usecases_and_get_tracking_set(&transactions);

for transaction in transactions {
pool.log_commit_transaction(
&transaction.sender,
transaction.sequence_number,
if tracking_usecases.contains(&transaction.use_case) {
Some(transaction.use_case.clone())
} else {
None
},
block_timestamp,
);
pool.commit_transaction(&transaction.sender, transaction.sequence_number);
Expand Down
7 changes: 7 additions & 0 deletions mempool/src/shared_mempool/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use crate::{
core_mempool::CoreMempool,
network::{MempoolNetworkInterface, MempoolSyncMsg},
shared_mempool::use_case_history::UseCaseHistory,
};
use anyhow::Result;
use aptos_config::{
Expand Down Expand Up @@ -50,6 +51,7 @@ pub(crate) struct SharedMempool<NetworkClient, TransactionValidator> {
pub validator: Arc<RwLock<TransactionValidator>>,
pub subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
pub broadcast_within_validator_network: Arc<RwLock<bool>>,
pub use_case_history: Arc<Mutex<UseCaseHistory>>,
}

impl<
Expand All @@ -67,6 +69,10 @@ impl<
role: RoleType,
) -> Self {
let network_interface = MempoolNetworkInterface::new(network_client, role, config.clone());
let use_case_history = UseCaseHistory::new(
config.usecase_stats_num_blocks_to_track,
config.usecase_stats_num_top_to_track,
);
SharedMempool {
mempool,
config,
Expand All @@ -75,6 +81,7 @@ impl<
validator,
subscribers,
broadcast_within_validator_network: Arc::new(RwLock::new(true)),
use_case_history: Arc::new(Mutex::new(use_case_history)),
}
}

Expand Down
63 changes: 63 additions & 0 deletions mempool/src/shared_mempool/use_case_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_mempool_notifications::CommittedTransaction;
use aptos_types::transaction::use_case::UseCaseKey;
use std::collections::{HashMap, HashSet, VecDeque};

pub(crate) struct UseCaseHistory {
window_size: usize,
num_top_to_track: usize,
recent: VecDeque<HashMap<UseCaseKey, usize>>,
}

impl UseCaseHistory {
pub(crate) fn new(window_size: usize, num_top_to_track: usize) -> Self {
Self {
window_size,
num_top_to_track,
recent: VecDeque::with_capacity(window_size + 1),
}
}

pub(crate) fn update_usecases_and_get_tracking_set(
&mut self,
transactions: &[CommittedTransaction],
) -> HashSet<UseCaseKey> {
let mut count_by_usecase = HashMap::new();
for transaction in transactions {
*count_by_usecase
.entry(transaction.use_case.clone())
.or_insert(0) += 1;
}

self.recent.push_back(count_by_usecase);
while self.recent.len() > self.window_size {
self.recent.pop_front();
}

let mut total = HashMap::new();
for group in &self.recent {
for (use_case, count) in group {
if use_case != &UseCaseKey::Platform && use_case != &UseCaseKey::Others {
*total.entry(use_case.clone()).or_insert(0) += count;
}
}
}

let mut result = HashSet::new();
result.insert(UseCaseKey::Platform);
result.insert(UseCaseKey::Others);

let mut sorted = total.into_iter().collect::<Vec<_>>();
sorted.sort_by_key(|(_, count)| *count);

for _ in 0..self.num_top_to_track {
if let Some((use_case, _)) = sorted.pop() {
result.insert(use_case);
}
}

result
}
}
Loading

0 comments on commit fee3974

Please sign in to comment.