diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 8fe3938e37a7cb..6bfc8d6a67c74e 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -11,7 +11,7 @@ use {
     solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
     solana_ledger::blockstore_processor::TransactionStatusSender,
     solana_measure::measure::Measure,
-    solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
+    solana_metrics::inc_new_counter_info,
     solana_perf::{
         cuda_runtime::PinnedVec,
         data_budget::DataBudget,
@@ -433,9 +433,6 @@ impl BankingStage {
                         original_unprocessed_indexes,
                         new_unprocessed_indexes,
                     )
-                    // TODO TAO - does ^^^ call provides # of old txs that are dropped? Prolly needs
-                    // this number
-                    //
                 } else {
                     let bank_start = poh_recorder.lock().unwrap().bank_start();
                     if let Some(BankStart {
@@ -443,11 +440,6 @@ impl BankingStage {
                         bank_creation_time,
                     }) = bank_start
                     {
-                        // TODO TAO - msgs.packets.len() is number of packets in msgs.
-                        // qos_service.accumulate_try_to_process_buffered_txs(msgs.packets.len(), id, working_bank);
-                        // this counts the buffered txs (or packets) during the bank-life-time. If the
-                        // recording bank is expired, the packets are to be re-queued.
-                        //
                         let (processed, verified_txs_len, new_unprocessed_indexes) =
                             Self::process_packets_transactions(
                                 &working_bank,
@@ -460,11 +452,6 @@ impl BankingStage {
                                 banking_stage_stats,
                                 qos_service,
                             );
-                        // TODO TAO - here vvvv again, "processed < verified_txs_len" is used to
-                        // indicate end-of-slot, which is exactly process_packets() does - it stops
-                        // processingf when PoHMaxHeight reached or Bank halt, in that case, processed
-                        // will < txs_len, but all unprocessed (retryable and those turly unprocessed)
-                        // will be pushed into unprocessed_indexes[] for queueing
                         if processed < verified_txs_len
                             || !Bank::should_bank_still_be_processing_txs(
                                 &bank_creation_time,
@@ -478,14 +465,6 @@ impl BankingStage {
                         }
                         new_tx_count += processed;

-                        // TODO TAO - qos_service.accumulated_processed_buffered_txs(processed, id, bank.slot())
-                        //
-                        // TODO TAO - qos_service.accumulated_rebuffered_txs(unprocessed_indexes.len(), id,
-                        // bank.slot());
-                        // this includes block by cost_limit, account_in_use etc; maybe want to
-                        // report the causes as well.
-                        // TODO TAO - can added refined drop count, and newly buffered count
-
                         // Out of the buffered packets just retried, collect any still unprocessed
                         // transactions in this batch for forwarding
                         rebuffered_packets_len += new_unprocessed_indexes.len();
@@ -968,9 +947,6 @@ impl BankingStage {
         let batch =
             bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.into_iter());
         lock_time.stop();
-        // TODO TAO - above ^^^ call does not provide metrics about how many accounts were locked
-        // successfully, which is an important number to indicate if there are account contention.
-        // Should add it.

         // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit and
         // WouldExceedMaxAccountCostLimit
@@ -1337,7 +1313,6 @@ impl BankingStage {
             count,
             id,
         );
-        inc_new_counter_debug!("banking_stage-transactions_received", count);
         let mut proc_start = Measure::start("process_packets_transactions_process");
         let mut new_tx_count = 0;

@@ -1346,13 +1321,11 @@ impl BankingStage {
         let mut dropped_packet_batches_count = 0;
         let mut newly_buffered_packets_count = 0;
         while let Some(msgs) = mms_iter.next() {
-            // TODO TAO - received `mms` has many `msgs`, a `msgs` is a packets has many `packet`,
-            // a `packet` is one `tx` atm.
-            //
             let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
             let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank();
             let working_bank_start = poh_recorder_bank.working_bank_start();
             if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() {
+                qos_service.accumulate_tpu_buffered_packets_count(msgs.packets.len() as u64);
                 Self::push_unprocessed(
                     buffered_packets,
                     msgs,
@@ -1364,11 +1337,6 @@ impl BankingStage {
                     duplicates,
                     banking_stage_stats,
                 );
-                // TODO TAO - count the received txs that being pushed due to working_bank expired
-                // those txs will be picked up either by next working bank, or being forwarded
-                // qos_service.accumulate_queued_txs_due_to_working_bank_expire(msgs.packets.len(),
-                // id, None_bank);
-                // TODO TAO - can added refined drop count, and newly buffered count
                 continue;
             }

@@ -1378,11 +1346,7 @@ impl BankingStage {
                 bank_creation_time,
             } = &*working_bank_start.unwrap();

-            // TODO TAO - msgs.packets.len() is number of packets in msgs.
-            // qos_service.accumulate_ingress_txs(msgs.packets.len(), id, working_bank);
-            // this counts the received txs (or packets) during the bank-life-time. If the
-            // recording bank is expired, the packets are to be queued.
-            //
+            qos_service.accumulate_tpu_ingested_packets_count(msgs.packets.len() as u64);
             let (processed, verified_txs_len, unprocessed_indexes) =
                 Self::process_packets_transactions(
                     working_bank,
@@ -1397,15 +1361,9 @@ impl BankingStage {
                 );

             new_tx_count += processed;
-
-            // TODO TAO - so here are the numbers:
-            // 1. msgs.packets.len() -- # txs received
-            // 2. verified_txs_len -- # txs verified, diff from above are txs dropped due
-            //    to in vote-only mode, or unsanitized txs
-            // 3. processed -- # txs being processed
-            // 4. unprocessed_indexes.len() -- # txs to be buffered,
-            // So: 3 + 4 == 2, 2 <= 1
-            // TODO TAO - qos_service.accumulated_processed_txs(processed, id, bank.slot())
+            qos_service.accumulated_verified_txs_count(verified_txs_len as u64);
+            qos_service.accumulated_processed_txs_count(processed as u64);
+            qos_service.accumulated_retryable_txs_count(unprocessed_indexes.len() as u64);

             // Collect any unprocessed transactions in this batch for forwarding
             Self::push_unprocessed(
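Taken together, the counters recorded above encode the flow invariant that the removed notes spelled out: every verified transaction in a batch ends up either processed or retryable, and verification can only shrink the ingested set (unsanitized packets, and non-vote packets in vote-only mode, are dropped). A minimal sketch of that relationship, using hypothetical standalone counts rather than real BankingStage state:

    /// Hypothetical per-batch counts mirroring the values fed into the
    /// qos_service accumulators above (illustrative only).
    struct BatchCounts {
        ingested: usize,  // msgs.packets.len()
        verified: usize,  // verified_txs_len
        processed: usize, // processed
        retryable: usize, // unprocessed_indexes.len()
    }

    /// processed + retryable == verified, and verified <= ingested.
    fn check_flow_invariant(c: &BatchCounts) {
        assert_eq!(c.processed + c.retryable, c.verified);
        assert!(c.verified <= c.ingested);
    }

    fn main() {
        check_flow_invariant(&BatchCounts {
            ingested: 10,
            verified: 8,
            processed: 5,
            retryable: 3,
        });
    }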
@@ -1419,43 +1377,9 @@ impl BankingStage {
                 duplicates,
                 banking_stage_stats,
             );
-            // TODO TAO - numbers on push_unprocessed, keep in mind this function called on both
-            // live stream and buffered queue, hence there are re-counts
-            // 1. unprocessed_indexes.len() - is unprocessed from above
-            // 2. (hidden inside function) dropped_duplicated_packets_count - # txs
-            //    silently dropped as they already exists in buffer when this function
-            //    is called again (reason?)
-            // 3. dropped_packets_count - # of packets pushed out of FIFO buffer to make
-            //    room for newly buffered pacckets
-            // 4. newly_buffered_packets_count - # of packets being added to buffer in
-            //    this call.
-            // since push_unprocessed() is called at multiple spots, at metrics level it is hard
-            // to be exact 1 = 2 + 4, but should be close.
-            // If we really want to see how live stream part works, might be better to aggregate
-            // 1, 2, 4 in same bucket as before-call stats (case of dup these data in qos_stats)
-            //
-
-            // TODO TAO - qos_service.accumulated_unprocessed_txs(unprocessed_indexes.len(), id,
-            // bank.slot());
-            // this includes block by cost_limit, account_in_use etc; maybe want to
-            // report the causes as well.
-            // TODO TAO - can added refined drop count, and newly buffered count

             // If there were retryable transactions, add the unexpired ones to the buffered queue
             if processed < verified_txs_len {
-                // TODO TAO - this ^^^ logic here, is it assuming that normally calling
-                // process_packets_transactions() will process ALL txs passed in, therefore processed
-                // == verified_txs_len, meaning unprocessed_indexes.len() == 0. If so, it means the
-                // only reason there are unprocessed message is due to slot advancing, therefore
-                // following code will push all not-too-old txs in the remaining recved queue into
-                // buffer.
-                // If that is the case, then could following happen?
-                // - received many packets (eg mms has many msgs)
-                // - packets in first msgs were not fully processed by process_packets_transactions()
-                // - all packets in 2nd and beyound packets will be pushed into buffer
-                // - this call exits to upper-loop, where consume_buffer() is called to process buffer
-                // this isn't necessary bad, prolly for a good reason, but it'd badly mess up with
-                // reported metrics, perhaps give s more reason to add qos_stats
                 let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets");
                 let next_leader = poh.lock().unwrap().next_slot_leader();
                 // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
@@ -1481,10 +1405,6 @@ impl BankingStage {
                     duplicates,
                     banking_stage_stats,
                 );
-                // TODO TAO - not sure if we want to collect this:
-                // qos_service.accumulate_buffered_retryable_txs_due_to_bank_expires(unprocessed_indexes.len(),
-                // id, bank.slot())
-                // TODO TAO - same as above, can log the drop and newly buffered details.
             }
             handle_retryable_packets_time.stop();
             banking_stage_stats
@@ -2319,10 +2239,7 @@ mod tests {
                 0,
                 None,
                 &gossip_vote_sender,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             )
             .0
             .unwrap();
@@ -2364,10 +2281,7 @@ mod tests {
                     0,
                     None,
                     &gossip_vote_sender,
-                    &QosService::new(
-                        Arc::new(RwLock::new(CostModel::default())),
-                        1
-                    ),
+                    &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
                 )
                 .0,
                 Err(PohRecorderError::MaxHeightReached)
@@ -2455,10 +2369,7 @@ mod tests {
                 0,
                 None,
                 &gossip_vote_sender,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             );

             poh_recorder
@@ -2567,10 +2478,7 @@ mod tests {
                 &recorder,
                 None,
                 &gossip_vote_sender,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             );

             assert_eq!(processed_transactions_count, 0,);
@@ -2665,10 +2573,7 @@ mod tests {
                     enable_cpi_and_log_storage: false,
                 }),
                 &gossip_vote_sender,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             );

             transaction_status_service.join().unwrap();
@@ -2796,10 +2701,7 @@ mod tests {
                 None::<Box<dyn Fn()>>,
                 &BankingStageStats::default(),
                 &recorder,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             );
             assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
             // When the poh recorder has a bank, should process all non conflicting buffered packets.
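The test hunks above and below repeat the same single-line QosService construction; if the constructor signature changes again, a small test-only helper (hypothetical, not part of this change) would keep the call sites in sync:

    // Hypothetical test-only helper; QosService and CostModel as used in this diff.
    fn test_qos_service() -> QosService {
        QosService::new(Arc::new(RwLock::new(CostModel::default())), 1)
    }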
@@ -2816,10 +2718,7 @@ mod tests {
                 None::<Box<dyn Fn()>>,
                 &BankingStageStats::default(),
                 &recorder,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             );
             if num_expected_unprocessed == 0 {
                 assert!(buffered_packets.is_empty())
@@ -2885,10 +2784,7 @@ mod tests {
                 test_fn,
                 &BankingStageStats::default(),
                 &recorder,
-                &QosService::new(
-                    Arc::new(RwLock::new(CostModel::default())),
-                    1,
-                ),
+                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
             );

             // Check everything is correct. All indexes after `interrupted_iteration`
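The qos_service.rs changes below follow a standard lock-free interval-counter pattern: hot-path callers bump an AtomicU64 with fetch_add under Ordering::Relaxed, and the reporting side drains it with swap(0), so every increment lands in exactly one reporting interval. A self-contained sketch of the pattern (illustrative names, not the real QosServiceMetrics):

    use std::sync::atomic::{AtomicU64, Ordering};

    // Interval counter: accumulate on the hot path, drain atomically on report.
    struct IntervalCounter(AtomicU64);

    impl IntervalCounter {
        fn accumulate(&self, count: u64) {
            // Relaxed suffices: only the counter's own total matters, not its
            // ordering relative to other memory operations.
            self.0.fetch_add(count, Ordering::Relaxed);
        }

        fn drain(&self) -> u64 {
            // swap(0) reads and resets in one atomic step, so increments are
            // neither lost nor double-counted across intervals.
            self.0.swap(0, Ordering::Relaxed)
        }
    }

    fn main() {
        let c = IntervalCounter(AtomicU64::new(0));
        c.accumulate(3);
        c.accumulate(4);
        assert_eq!(c.drain(), 7); // first interval
        assert_eq!(c.drain(), 0); // counter restarts for the next interval
    }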
diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs
index 2de1bb7d71ffdb..02938d7852947c 100644
--- a/core/src/qos_service.rs
+++ b/core/src/qos_service.rs
@@ -144,13 +144,44 @@ impl QosService {
         select_results
     }

-    // metrics are reported with bank slot
+    // metrics are reported by bank slot
     pub fn report_metrics(&self, bank: Arc<Bank>) {
         self.report_sender
             .send(bank)
             .unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
     }

+    // metrics accumulating APIs
+    pub fn accumulate_tpu_ingested_packets_count(&self, count: u64) {
+        self.metrics
+            .tpu_ingested_packets_count
+            .fetch_add(count, Ordering::Relaxed);
+    }
+
+    pub fn accumulate_tpu_buffered_packets_count(&self, count: u64) {
+        self.metrics
+            .tpu_buffered_packets_count
+            .fetch_add(count, Ordering::Relaxed);
+    }
+
+    pub fn accumulated_verified_txs_count(&self, count: u64) {
+        self.metrics
+            .verified_txs_count
+            .fetch_add(count, Ordering::Relaxed);
+    }
+
+    pub fn accumulated_processed_txs_count(&self, count: u64) {
+        self.metrics
+            .processed_txs_count
+            .fetch_add(count, Ordering::Relaxed);
+    }
+
+    pub fn accumulated_retryable_txs_count(&self, count: u64) {
+        self.metrics
+            .retryable_txs_count
+            .fetch_add(count, Ordering::Relaxed);
+    }
+
     fn reporting_loop(
         running_flag: Arc<AtomicBool>,
         metrics: Arc<QosServiceMetrics>,
@@ -172,6 +203,27 @@ struct QosServiceMetrics {
     // and other transactions.
     id: u32,

+    // aggregate metrics per slot
+    slot: AtomicU64,
+
+    // accumulated number of live packets the TPU received from the verified receiver for processing.
+    tpu_ingested_packets_count: AtomicU64,
+
+    // accumulated number of live packets the TPU buffered because there was no active bank.
+    tpu_buffered_packets_count: AtomicU64,
+
+    // accumulated number of txs out of the ingested packets that passed verification; excludes
+    // unsanitized transactions and, in vote-only mode, non-vote transactions
+    verified_txs_count: AtomicU64,
+
+    // accumulated number of transactions that have been processed, including those that landed
+    // and those to be retried (due to AccountInUse and other QoS-related reasons)
+    processed_txs_count: AtomicU64,
+
+    // accumulated number of transactions buffered for retry, often due to AccountInUse and other
+    // QoS reasons; includes retried_txs_per_block_limit_count and retried_txs_per_account_limit_count
+    retryable_txs_count: AtomicU64,
+
     // accumulated time in micro-sec spent in computing transaction cost. It is the main performance
     // overhead introduced by cost_model
     compute_cost_time: AtomicU64,
@@ -204,43 +256,71 @@ impl QosServiceMetrics {
     }

     pub fn report(&self, bank_slot: Slot) {
-        datapoint_info!(
-            "qos-service-stats",
-            ("id", self.id as i64, i64),
-            ("bank_slot", bank_slot as i64, i64),
-            (
-                "compute_cost_time",
-                self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "compute_cost_count",
-                self.compute_cost_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "cost_tracking_time",
-                self.cost_tracking_time.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "selected_txs_count",
-                self.selected_txs_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "retried_txs_per_block_limit_count",
-                self.retried_txs_per_block_limit_count
-                    .swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "retried_txs_per_account_limit_count",
-                self.retried_txs_per_account_limit_count
-                    .swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-        );
+        if bank_slot != self.slot.load(Ordering::Relaxed) {
+            datapoint_info!(
+                "qos-service-stats",
+                ("id", self.id as i64, i64),
+                ("bank_slot", bank_slot as i64, i64),
+                (
+                    "tpu_ingested_packets_count",
+                    self.tpu_ingested_packets_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "tpu_buffered_packets_count",
+                    self.tpu_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "verified_txs_count",
+                    self.verified_txs_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "processed_txs_count",
+                    self.processed_txs_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "retryable_txs_count",
+                    self.retryable_txs_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "compute_cost_time",
+                    self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "compute_cost_count",
+                    self.compute_cost_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "cost_tracking_time",
+                    self.cost_tracking_time.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "selected_txs_count",
+                    self.selected_txs_count.swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "retried_txs_per_block_limit_count",
+                    self.retried_txs_per_block_limit_count
+                        .swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+                (
+                    "retried_txs_per_account_limit_count",
+                    self.retried_txs_per_account_limit_count
+                        .swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
+            );
+            self.slot.store(bank_slot, Ordering::Relaxed);
+        }
     }
 }
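One behavioral change in the rewritten report(): a datapoint is emitted only when bank_slot differs from the last reported slot, so repeated report_metrics() calls for the same bank collapse into a single per-slot datapoint. A standalone replica of that guard (hypothetical types; it returns the drained value instead of writing a datapoint):

    use std::sync::atomic::{AtomicU64, Ordering};

    // Miniature of the slot-gated reporting added to QosServiceMetrics::report().
    struct SlotGatedMetric {
        slot: AtomicU64,
        count: AtomicU64,
    }

    impl SlotGatedMetric {
        // Some(drained count) the first time a new slot is seen, None otherwise.
        fn report(&self, bank_slot: u64) -> Option<u64> {
            if bank_slot != self.slot.load(Ordering::Relaxed) {
                let drained = self.count.swap(0, Ordering::Relaxed);
                self.slot.store(bank_slot, Ordering::Relaxed);
                Some(drained)
            } else {
                None
            }
        }
    }

    fn main() {
        let m = SlotGatedMetric {
            slot: AtomicU64::new(0),
            count: AtomicU64::new(0),
        };
        m.count.fetch_add(5, Ordering::Relaxed);
        assert_eq!(m.report(1), Some(5)); // first report for slot 1 emits
        m.count.fetch_add(2, Ordering::Relaxed);
        assert_eq!(m.report(1), None); // same slot: suppressed, counts keep accumulating
        assert_eq!(m.report(2), Some(2)); // next slot drains what accumulated meanwhile
    }

The load/store pair is not a single atomic compare-and-swap; that is unproblematic as long as report() runs only on the single reporting_loop thread, which appears to be the case in this diff.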