Skip to content

Commit

Permalink
add transaction cost histogram metrics (solana-labs#20350)
Browse files Browse the repository at this point in the history
  • Loading branch information
tao-stones committed Sep 30, 2021
1 parent bace66b commit e0b9b8b
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 53 deletions.
3 changes: 2 additions & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::banking_stage::{BankingStage, BankingStageStats};
use solana_core::cost_model::CostModel;
use solana_core::cost_tracker::CostTracker;
use solana_core::cost_tracker::{CostTracker, CostTrackerStats};
use solana_gossip::cluster_info::ClusterInfo;
use solana_gossip::cluster_info::Node;
use solana_ledger::blockstore_processor::process_entries;
Expand Down Expand Up @@ -97,6 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::new(std::u64::MAX, std::u64::MAX),
))))),
&mut CostTrackerStats::default(),
);
});

Expand Down
99 changes: 83 additions & 16 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher};
use crate::{
cost_tracker::CostTracker, cost_tracker::CostTrackerStats, packet_hasher::PacketHasher,
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -425,12 +427,16 @@ impl BankingStage {
cost_tracker: &Arc<RwLock<CostTracker>>,
bank: Arc<Bank>,
banking_stage_stats: &BankingStageStats,
cost_tracker_stats: &mut CostTrackerStats,
) {
if !bank.feature_set.is_active(&feature_set::cost_model::id()) {
return;
}

cost_tracker.write().unwrap().reset_if_new_bank(bank.slot());
cost_tracker
.write()
.unwrap()
.reset_if_new_bank(bank.slot(), cost_tracker_stats);
banking_stage_stats
.reset_cost_tracker_count
.fetch_add(1, Ordering::Relaxed);
Expand All @@ -448,6 +454,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_tracker_stats: &mut CostTrackerStats,
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
Expand All @@ -467,6 +474,7 @@ impl BankingStage {
*next_leader,
cost_tracker,
banking_stage_stats,
cost_tracker_stats,
);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
Expand All @@ -479,6 +487,7 @@ impl BankingStage {
cost_tracker,
bank.clone(),
banking_stage_stats,
cost_tracker_stats,
);
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions(
Expand All @@ -491,6 +500,7 @@ impl BankingStage {
gossip_vote_sender,
banking_stage_stats,
cost_tracker,
cost_tracker_stats,
);
if processed < verified_txs_len
|| !Bank::should_bank_still_be_processing_txs(
Expand Down Expand Up @@ -595,6 +605,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
data_budget: &DataBudget,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> BufferedPacketsDecision {
let bank_start;
let (
Expand All @@ -610,6 +621,7 @@ impl BankingStage {
cost_tracker,
bank.clone(),
banking_stage_stats,
cost_tracker_stats,
);
};
(
Expand Down Expand Up @@ -643,6 +655,7 @@ impl BankingStage {
banking_stage_stats,
recorder,
cost_tracker,
cost_tracker_stats,
);
}
BufferedPacketsDecision::Forward => {
Expand Down Expand Up @@ -728,6 +741,7 @@ impl BankingStage {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id);
let mut cost_tracker_stats = CostTrackerStats::default();
loop {
while !buffered_packets.is_empty() {
let decision = Self::process_buffered_packets(
Expand All @@ -743,6 +757,7 @@ impl BankingStage {
&recorder,
data_budget,
cost_tracker,
&mut cost_tracker_stats,
);
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
Expand Down Expand Up @@ -778,6 +793,7 @@ impl BankingStage {
duplicates,
&recorder,
cost_tracker,
&mut cost_tracker_stats,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
Expand Down Expand Up @@ -1121,6 +1137,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
cost_model_enabled: bool,
demote_program_write_locks: bool,
cost_tracker_stats: &mut CostTrackerStats,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];

Expand Down Expand Up @@ -1154,7 +1171,11 @@ impl BankingStage {
// into current bank
if cost_model_enabled
&& cost_tracker_readonly
.would_transaction_fit(&tx, demote_program_write_locks)
.would_transaction_fit(
&tx,
demote_program_write_locks,
cost_tracker_stats,
)
.is_err()
{
debug!("transaction {:?} would exceed limit", tx);
Expand Down Expand Up @@ -1242,6 +1263,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> (usize, usize, Vec<usize>) {
let cost_model_enabled = bank.feature_set.is_active(&feature_set::cost_model::id());

Expand All @@ -1256,6 +1278,7 @@ impl BankingStage {
banking_stage_stats,
cost_model_enabled,
bank.demote_program_write_locks(),
cost_tracker_stats,
);
packet_conversion_time.stop();
inc_new_counter_info!("banking_stage-packet_conversion", 1);
Expand Down Expand Up @@ -1293,10 +1316,11 @@ impl BankingStage {
if cost_model_enabled {
transactions.iter().enumerate().for_each(|(index, tx)| {
if unprocessed_tx_indexes.iter().all(|&i| i != index) {
cost_tracker
.write()
.unwrap()
.add_transaction_cost(tx.transaction(), bank.demote_program_write_locks());
cost_tracker.write().unwrap().add_transaction_cost(
tx.transaction(),
bank.demote_program_write_locks(),
cost_tracker_stats,
);
}
});
}
Expand Down Expand Up @@ -1344,6 +1368,7 @@ impl BankingStage {
next_leader: Option<Pubkey>,
cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
cost_tracker_stats: &mut CostTrackerStats,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets.
Expand All @@ -1367,6 +1392,7 @@ impl BankingStage {
banking_stage_stats,
cost_model_enabled,
bank.demote_program_write_locks(),
cost_tracker_stats,
);
unprocessed_packet_conversion_time.stop();

Expand Down Expand Up @@ -1428,6 +1454,7 @@ impl BankingStage {
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
Expand Down Expand Up @@ -1468,7 +1495,12 @@ impl BankingStage {
continue;
}
let (bank, bank_creation_time) = bank_start.unwrap();
Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.clone(), banking_stage_stats);
Self::reset_cost_tracker_if_new_bank(
cost_tracker,
bank.clone(),
banking_stage_stats,
cost_tracker_stats,
);

let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
Expand All @@ -1481,6 +1513,7 @@ impl BankingStage {
gossip_vote_sender,
banking_stage_stats,
cost_tracker,
cost_tracker_stats,
);

new_tx_count += processed;
Expand Down Expand Up @@ -1514,6 +1547,7 @@ impl BankingStage {
next_leader,
cost_tracker,
banking_stage_stats,
cost_tracker_stats,
);
Self::push_unprocessed(
buffered_packets,
Expand Down Expand Up @@ -1740,7 +1774,7 @@ mod tests {
tpu_vote_receiver,
gossip_verified_vote_receiver,
None,
gossip_vote_sender,
vote_forward_sender,
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
Expand Down Expand Up @@ -1791,7 +1825,7 @@ mod tests {
tpu_vote_receiver,
verified_gossip_vote_receiver,
None,
gossip_vote_sender,
vote_forward_sender,
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
Expand Down Expand Up @@ -2847,6 +2881,7 @@ mod tests {
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&mut CostTrackerStats::default(),
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
Expand All @@ -2866,6 +2901,7 @@ mod tests {
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&mut CostTrackerStats::default(),
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
Expand Down Expand Up @@ -2934,6 +2970,7 @@ mod tests {
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
&mut CostTrackerStats::default(),
);

// Check everything is correct. All indexes after `interrupted_iteration`
Expand Down Expand Up @@ -3182,21 +3219,31 @@ mod tests {
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);

let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
let (txs, tx_packet_index, _) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(CostModel::default()))))),
&BankingStageStats::default(),
false,
false,
&mut CostTrackerStats::default(),
);
assert_eq!(2, txs.len());
assert_eq!(vec![0, 1], tx_packet_index);

votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
let (txs, tx_packet_index, _) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(CostModel::default()))))),
&BankingStageStats::default(),
false,
false,
&mut CostTrackerStats::default(),
);
assert_eq!(0, txs.len());
assert_eq!(0, tx_packet_index.len());
Expand All @@ -3211,21 +3258,31 @@ mod tests {
);

let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
let (txs, tx_packet_index, _) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(CostModel::default()))))),
&BankingStageStats::default(),
false,
false,
&mut CostTrackerStats::default(),
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);

votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
let (txs, tx_packet_index, _) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(CostModel::default()))))),
&BankingStageStats::default(),
false,
false,
&mut CostTrackerStats::default(),
);
assert_eq!(2, txs.len());
assert_eq!(vec![0, 2], tx_packet_index);
Expand All @@ -3240,21 +3297,31 @@ mod tests {
);

let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
let (txs, tx_packet_index, _) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(CostModel::default()))))),
&BankingStageStats::default(),
false,
false,
&mut CostTrackerStats::default(),
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);

votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
let (txs, tx_packet_index, _) = BankingStage::transactions_from_packets(
&packets,
&packet_indexes,
false,
votes_only,
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(CostModel::default()))))),
&BankingStageStats::default(),
false,
false,
&mut CostTrackerStats::default(),
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
Expand Down
Loading

0 comments on commit e0b9b8b

Please sign in to comment.