diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a0a83dee88a6b8..941c079fd692c3 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -93,6 +93,8 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; +const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); + pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model cost_model_throttled_transactions_count: usize, @@ -771,7 +773,7 @@ impl BankingStage { data_budget: &DataBudget, qos_service: &Arc, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - ) -> BufferedPacketsDecision { + ) { let (decision, make_decision_time) = Measure::this( |_| { let bank_start; @@ -871,8 +873,6 @@ impl BankingStage { } _ => (), } - - decision } fn handle_forwarding( @@ -952,11 +952,14 @@ impl BankingStage { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); + let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); + let mut last_metrics_update = Instant::now(); + loop { let my_pubkey = cluster_info.id(); - while !buffered_packet_batches.is_empty() { - let (decision, process_buffered_packets_time) = Measure::this( + if !buffered_packet_batches.is_empty() { + let (_, process_buffered_packets_time) = Measure::this( |_| { Self::process_buffered_packets( &my_pubkey, @@ -979,36 +982,34 @@ impl BankingStage { ); slot_metrics_tracker .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); - - if matches!(decision, BufferedPacketsDecision::Hold) - || matches!(decision, BufferedPacketsDecision::ForwardAndHold) - { - // If we are waiting on a new bank, - // check the receiver for more transactions/for exiting - break; - } } - let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( - |_| { - let current_poh_bank = { - let poh = poh_recorder.lock().unwrap(); - poh.bank_start() - }; - slot_metrics_tracker.update_on_leader_slot_boundary(¤t_poh_bank); - }, - (), - "slot_metrics_checker_check_slot_boundary", - ); - slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( - slot_metrics_checker_check_slot_boundary_time.as_us(), - ); + // avoid excessively locking poh_recorder + if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { + let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( + |_| { + let current_poh_bank = { + let poh = poh_recorder.lock().unwrap(); + poh.bank_start() + }; + slot_metrics_tracker.update_on_leader_slot_boundary(¤t_poh_bank); + }, + (), + "slot_metrics_checker_check_slot_boundary", + ); + slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( + slot_metrics_checker_check_slot_boundary_time.as_us(), + ); + + last_metrics_update = Instant::now(); + } let recv_timeout = if !buffered_packet_batches.is_empty() { - // If packets are buffered, let's wait for less time on recv from the channel. - // This helps detect the next leader faster, and processing the buffered - // packets quickly - Duration::from_millis(10) + // If there are buffered packets, run the equivalent of try_recv to try reading more + // packets. This prevents starving BankingStage::consume_buffered_packets due to + // buffered_packet_batches containing transactions that exceed the cost model for + // the current bank. + Duration::from_millis(0) } else { // Default wait time Duration::from_millis(100)