From 45bd9aee0c3e38310d58a335af0bd1e6cc1d9f3b Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Fri, 8 Oct 2021 14:28:10 -0500 Subject: [PATCH] invoke cost tracker from its bank --- core/src/banking_stage.rs | 122 ++++++-------------------------------- core/src/tpu.rs | 4 +- 2 files changed, 20 insertions(+), 106 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5d242f7f780d07..2dc71c899ce2ec 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -20,6 +20,7 @@ use solana_perf::{ use solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}; use solana_runtime::{ accounts_db::ErrorCounters, + cost_model::CostModel, bank::{ Bank, ExecuteTimings, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult, @@ -55,7 +56,7 @@ use std::{ net::{SocketAddr, UdpSocket}, ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize, Ordering}, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLockReadGuard}, thread::{self, Builder, JoinHandle}, time::Duration, time::Instant, @@ -96,7 +97,6 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, - reset_cost_tracker_count: AtomicUsize, cost_tracker_check_count: AtomicUsize, cost_forced_retry_transactions_count: AtomicUsize, @@ -175,11 +175,6 @@ impl BankingStageStats { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "reset_cost_tracker_count", - self.reset_cost_tracker_count.swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "cost_tracker_check_count", self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64, @@ -288,7 +283,7 @@ impl BankingStage { verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - cost_tracker: Arc>, + cost_model: &Arc>, ) -> Self { Self::new_num_threads( cluster_info, @@ -299,7 +294,6 @@ impl BankingStage { Self::num_threads(), transaction_status_sender, gossip_vote_sender, - cost_tracker, ) } @@ -312,7 +306,6 @@ impl BankingStage { num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - cost_tracker: Arc>, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. @@ -346,7 +339,6 @@ impl BankingStage { let transaction_status_sender = transaction_status_sender.clone(); let gossip_vote_sender = gossip_vote_sender.clone(); let duplicates = duplicates.clone(); - let cost_tracker = cost_tracker.clone(); let data_budget = data_budget.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) @@ -362,7 +354,6 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, &duplicates, - &cost_tracker, &data_budget, ); }) @@ -426,24 +417,6 @@ impl BankingStage { has_more_unprocessed_transactions } - fn reset_cost_tracker_if_new_bank( - cost_tracker: &Arc>, - bank_slot: Slot, - banking_stage_stats: &BankingStageStats, - cost_tracker_stats: &mut CostTrackerStats, - ) { - if cost_tracker - .write() - .unwrap() - .reset_if_new_bank(bank_slot, cost_tracker_stats) - { - // only increase counter when bank changed - banking_stage_stats - .reset_cost_tracker_count - .fetch_add(1, Ordering::Relaxed); - } - } - #[allow(clippy::too_many_arguments)] pub fn consume_buffered_packets( my_pubkey: &Pubkey, @@ -455,7 +428,6 @@ impl BankingStage { test_fn: Option, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - cost_tracker: &Arc>, cost_tracker_stats: &mut CostTrackerStats, ) { let mut rebuffered_packets_len = 0; @@ -474,7 +446,6 @@ impl BankingStage { original_unprocessed_indexes, my_pubkey, *next_leader, - cost_tracker, banking_stage_stats, cost_tracker_stats, ); @@ -489,12 +460,6 @@ impl BankingStage { bank_creation_time, }) = bank_start { - Self::reset_cost_tracker_if_new_bank( - cost_tracker, - working_bank.slot(), - banking_stage_stats, - cost_tracker_stats, - ); let (processed, verified_txs_len, new_unprocessed_indexes) = Self::process_packets_transactions( &working_bank, @@ -505,7 +470,6 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, - cost_tracker, cost_tracker_stats, ); if processed < verified_txs_len @@ -611,7 +575,6 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - cost_tracker: &Arc>, data_budget: &DataBudget, cost_tracker_stats: &mut CostTrackerStats, ) -> BufferedPacketsDecision { @@ -624,15 +587,6 @@ impl BankingStage { ) = { let poh = poh_recorder.lock().unwrap(); bank_start = poh.bank_start(); - if let Some(ref bank_start) = bank_start { - Self::reset_cost_tracker_if_new_bank( - cost_tracker, - bank_start.working_bank.slot(), - banking_stage_stats, - cost_tracker_stats, - ); - }; - ( poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()), @@ -663,7 +617,6 @@ impl BankingStage { None::>, banking_stage_stats, recorder, - cost_tracker, cost_tracker_stats, ); } @@ -742,7 +695,6 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, - cost_tracker: &Arc>, data_budget: &DataBudget, ) { let recorder = poh_recorder.lock().unwrap().recorder(); @@ -764,7 +716,6 @@ impl BankingStage { &gossip_vote_sender, &banking_stage_stats, &recorder, - cost_tracker, data_budget, &mut cost_tracker_stats, ); @@ -801,7 +752,6 @@ impl BankingStage { &banking_stage_stats, duplicates, &recorder, - cost_tracker, &mut cost_tracker_stats, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), @@ -1137,7 +1087,7 @@ impl BankingStage { msgs: &Packets, transaction_indexes: &[usize], feature_set: &Arc, - cost_tracker: &Arc>, + read_cost_tracker: &RwLockReadGuard, banking_stage_stats: &BankingStageStats, demote_program_write_locks: bool, votes_only: bool, @@ -1171,14 +1121,13 @@ impl BankingStage { let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time"); let (filtered_transactions, filter_transaction_packet_indexes) = { - let cost_tracker_readonly = cost_tracker.read().unwrap(); verified_transactions_with_packet_indexes .into_iter() .filter_map(|(tx, tx_index)| { // excluding vote TX from cost_model, for now let is_vote = &msgs.packets[tx_index].meta.is_simple_vote_tx; if !is_vote - && cost_tracker_readonly + && read_cost_tracker .would_transaction_fit( &tx, demote_program_write_locks, @@ -1258,7 +1207,6 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, - cost_tracker: &Arc>, cost_tracker_stats: &mut CostTrackerStats, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); @@ -1267,7 +1215,7 @@ impl BankingStage { msgs, &packet_indexes, &bank.feature_set, - cost_tracker, + &bank.read_cost_tracker().unwrap(), banking_stage_stats, bank.demote_program_write_locks(), bank.vote_only_bank(), @@ -1308,7 +1256,7 @@ impl BankingStage { let mut cost_tracking_time = Measure::start("cost_tracking_time"); transactions.iter().enumerate().for_each(|(index, tx)| { if unprocessed_tx_indexes.iter().all(|&i| i != index) { - cost_tracker.write().unwrap().add_transaction_cost( + bank.write_cost_tracker().unwrap().add_transaction_cost( tx, bank.demote_program_write_locks(), cost_tracker_stats, @@ -1357,7 +1305,6 @@ impl BankingStage { transaction_indexes: &[usize], my_pubkey: &Pubkey, next_leader: Option, - cost_tracker: &Arc>, banking_stage_stats: &BankingStageStats, cost_tracker_stats: &mut CostTrackerStats, ) -> Vec { @@ -1377,7 +1324,7 @@ impl BankingStage { msgs, transaction_indexes, &bank.feature_set, - cost_tracker, + &bank.read_cost_tracker().unwrap(), banking_stage_stats, bank.demote_program_write_locks(), bank.vote_only_bank(), @@ -1442,7 +1389,6 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, - cost_tracker: &Arc>, cost_tracker_stats: &mut CostTrackerStats, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); @@ -1490,12 +1436,6 @@ impl BankingStage { working_bank, bank_creation_time, } = &*working_bank_start.unwrap(); - Self::reset_cost_tracker_if_new_bank( - cost_tracker, - working_bank.slot(), - banking_stage_stats, - cost_tracker_stats, - ); let (processed, verified_txs_len, unprocessed_indexes) = Self::process_packets_transactions( @@ -1507,7 +1447,6 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, - cost_tracker, cost_tracker_stats, ); @@ -1540,7 +1479,6 @@ impl BankingStage { &packet_indexes, my_pubkey, next_leader, - cost_tracker, banking_stage_stats, cost_tracker_stats, ); @@ -1772,9 +1710,7 @@ mod tests { gossip_verified_vote_receiver, None, gossip_vote_sender, - Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &Arc::new(RwLock::new(CostModel::default())), ); drop(verified_sender); drop(gossip_verified_vote_sender); @@ -1823,9 +1759,7 @@ mod tests { verified_gossip_vote_receiver, None, gossip_vote_sender, - Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &Arc::new(RwLock::new(CostModel::default())), ); trace!("sending bank"); drop(verified_sender); @@ -1898,9 +1832,7 @@ mod tests { gossip_verified_vote_receiver, None, gossip_vote_sender, - Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &Arc::new(RwLock::new(CostModel::default())), ); // fund another account so we can send 2 good transactions in a single batch. @@ -2051,9 +1983,7 @@ mod tests { 3, None, gossip_vote_sender, - Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &Arc::new(RwLock::new(CostModel::default())), ); // wait for banking_stage to eat the packets @@ -2872,9 +2802,7 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &Arc::new(RwLock::new(CostModel::default())), &mut CostTrackerStats::default(), ); if num_expected_unprocessed == 0 { @@ -3199,9 +3127,7 @@ mod tests { &packets, &packet_indexes, &Arc::new(FeatureSet::default()), - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &RwLock::new(CostTracker::default()).read().unwrap(), &BankingStageStats::default(), false, votes_only, @@ -3216,9 +3142,7 @@ mod tests { &packets, &packet_indexes, &Arc::new(FeatureSet::default()), - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &RwLock::new(CostTracker::default()).read().unwrap(), &BankingStageStats::default(), false, votes_only, @@ -3242,9 +3166,7 @@ mod tests { &packets, &packet_indexes, &Arc::new(FeatureSet::default()), - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &RwLock::new(CostTracker::default()).read().unwrap(), &BankingStageStats::default(), false, votes_only, @@ -3259,9 +3181,7 @@ mod tests { &packets, &packet_indexes, &Arc::new(FeatureSet::default()), - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &RwLock::new(CostTracker::default()).read().unwrap(), &BankingStageStats::default(), false, votes_only, @@ -3285,9 +3205,7 @@ mod tests { &packets, &packet_indexes, &Arc::new(FeatureSet::default()), - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &RwLock::new(CostTracker::default()).read().unwrap(), &BankingStageStats::default(), false, votes_only, @@ -3302,9 +3220,7 @@ mod tests { &packets, &packet_indexes, &Arc::new(FeatureSet::default()), - &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( - CostModel::default(), - ))))), + &RwLock::new(CostTracker::default()).read().unwrap(), &BankingStageStats::default(), false, votes_only, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0c45edbde536b9..4865579632d1da 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -23,7 +23,6 @@ use solana_rpc::{ use solana_runtime::{ bank_forks::BankForks, cost_model::CostModel, - cost_tracker::CostTracker, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }; use std::{ @@ -123,7 +122,6 @@ impl Tpu { cluster_confirmed_slot_sender, ); - let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model.clone()))); let banking_stage = BankingStage::new( cluster_info, poh_recorder, @@ -132,7 +130,7 @@ impl Tpu { verified_gossip_vote_packets_receiver, transaction_status_sender, replay_vote_sender, - cost_tracker, + cost_model, ); let broadcast_stage = broadcast_type.new_broadcast_stage(