From 18682ac98f30b4033e3a07866ffabc1ad462704c Mon Sep 17 00:00:00 2001 From: Lucas B Date: Sun, 15 May 2022 15:13:11 -0500 Subject: [PATCH 1/5] do try_recv and one iter of process_buffered_packets --- core/src/banking_stage.rs | 69 ++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 40 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a0a83dee88a6b8..fba96813bc3283 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -771,7 +771,7 @@ impl BankingStage { data_budget: &DataBudget, qos_service: &Arc, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - ) -> BufferedPacketsDecision { + ) { let (decision, make_decision_time) = Measure::this( |_| { let bank_start; @@ -871,8 +871,6 @@ impl BankingStage { } _ => (), } - - decision } fn handle_forwarding( @@ -955,39 +953,29 @@ impl BankingStage { let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); loop { let my_pubkey = cluster_info.id(); - while !buffered_packet_batches.is_empty() { - let (decision, process_buffered_packets_time) = Measure::this( - |_| { - Self::process_buffered_packets( - &my_pubkey, - &socket, - poh_recorder, - cluster_info, - &mut buffered_packet_batches, - &forward_option, - transaction_status_sender.clone(), - &gossip_vote_sender, - &banking_stage_stats, - &recorder, - data_budget, - &qos_service, - &mut slot_metrics_tracker, - ) - }, - (), - "process_buffered_packets", - ); - slot_metrics_tracker - .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); - - if matches!(decision, BufferedPacketsDecision::Hold) - || matches!(decision, BufferedPacketsDecision::ForwardAndHold) - { - // If we are waiting on a new bank, - // check the receiver for more transactions/for exiting - break; - } - } + let (_, process_buffered_packets_time) = Measure::this( + |_| { + Self::process_buffered_packets( + &my_pubkey, + &socket, + poh_recorder, + cluster_info, + &mut buffered_packet_batches, + 
&forward_option, + transaction_status_sender.clone(), + &gossip_vote_sender, + &banking_stage_stats, + &recorder, + data_budget, + &qos_service, + &mut slot_metrics_tracker, + ) + }, + (), + "process_buffered_packets", + ); + slot_metrics_tracker + .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( |_| { @@ -1005,10 +993,11 @@ impl BankingStage { ); let recv_timeout = if !buffered_packet_batches.is_empty() { - // If packets are buffered, let's wait for less time on recv from the channel. - // This helps detect the next leader faster, and processing the buffered - // packets quickly - Duration::from_millis(10) + // If there are buffered packets, run the equivalent of try_recv to try reading more + // packets. This prevents starving BankingStage::consume_buffered_packets due to + // buffered_packet_batches containing transactions that exceed the cost model for + // the current bank. + Duration::from_millis(0) } else { // Default wait time Duration::from_millis(100) From c19f3ce557e8a35b7dd2e03fd0d331495ec9e625 Mon Sep 17 00:00:00 2001 From: Lucas B Date: Sun, 15 May 2022 15:32:07 -0500 Subject: [PATCH 2/5] todo --- core/src/banking_stage.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index fba96813bc3283..b87bbd13409d1b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -980,6 +980,7 @@ impl BankingStage { let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( |_| { let current_poh_bank = { + // TODO (B): constant spinning here may have an impact on poh, discuss in PR let poh = poh_recorder.lock().unwrap(); poh.bank_start() }; From 344b66b4b2adaf2dec9782b5d54aaae6c4a086bb Mon Sep 17 00:00:00 2001 From: Lucas B Date: Sun, 15 May 2022 17:01:09 -0500 Subject: [PATCH 3/5] dont lock poh too much --- core/src/banking_stage.rs | 89 ++++++++++++++++++++++----------------- 1 file 
changed, 51 insertions(+), 38 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b87bbd13409d1b..f9f0162c4b05a8 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -93,6 +93,8 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; +const SLOT_BOUNDARY_CHECK_MS: u64 = 10; + pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model cost_model_throttled_transactions_count: usize, @@ -951,47 +953,58 @@ impl BankingStage { let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); + + let mut last_slot_metrics_track = Instant::now(); + loop { let my_pubkey = cluster_info.id(); - let (_, process_buffered_packets_time) = Measure::this( - |_| { - Self::process_buffered_packets( - &my_pubkey, - &socket, - poh_recorder, - cluster_info, - &mut buffered_packet_batches, - &forward_option, - transaction_status_sender.clone(), - &gossip_vote_sender, - &banking_stage_stats, - &recorder, - data_budget, - &qos_service, - &mut slot_metrics_tracker, - ) - }, - (), - "process_buffered_packets", - ); - slot_metrics_tracker - .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); + if !buffered_packet_batches.is_empty() { + let (_, process_buffered_packets_time) = Measure::this( + |_| { + Self::process_buffered_packets( + &my_pubkey, + &socket, + poh_recorder, + cluster_info, + &mut buffered_packet_batches, + &forward_option, + transaction_status_sender.clone(), + &gossip_vote_sender, + &banking_stage_stats, + &recorder, + data_budget, + &qos_service, + &mut slot_metrics_tracker, + ) + }, + (), + "process_buffered_packets", + ); + slot_metrics_tracker + .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); + } - let (_, 
slot_metrics_checker_check_slot_boundary_time) = Measure::this( - |_| { - let current_poh_bank = { - // TODO (B): constant spinning here may have an impact on poh, discuss in PR - let poh = poh_recorder.lock().unwrap(); - poh.bank_start() - }; - slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank); - }, - (), - "slot_metrics_checker_check_slot_boundary", - ); - slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( - slot_metrics_checker_check_slot_boundary_time.as_us(), - ); + // avoid excessively locking poh_recorder + if Instant::now().duration_since(last_slot_metrics_track) + >= Duration::from_millis(SLOT_BOUNDARY_CHECK_MS) + { + let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( + |_| { + let current_poh_bank = { + let poh = poh_recorder.lock().unwrap(); + poh.bank_start() + }; + slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank); + }, + (), + "slot_metrics_checker_check_slot_boundary", + ); + slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( + slot_metrics_checker_check_slot_boundary_time.as_us(), + ); + + last_slot_metrics_track = Instant::now(); + } let recv_timeout = if !buffered_packet_batches.is_empty() { // If there are buffered packets, run the equivalent of try_recv to try reading more From 7cc19c21ea43aafb4a157823ba822592c1dacd3b Mon Sep 17 00:00:00 2001 From: Lucas B Date: Sun, 15 May 2022 19:51:14 -0500 Subject: [PATCH 4/5] dummy --- core/src/banking_stage.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f9f0162c4b05a8..bedb4e8b42c376 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -952,9 +952,9 @@ impl BankingStage { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); - let mut slot_metrics_tracker = 
LeaderSlotMetricsTracker::new(id); - let mut last_slot_metrics_track = Instant::now(); + let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); + let mut last_metrics_update = Instant::now(); loop { let my_pubkey = cluster_info.id(); @@ -985,9 +985,7 @@ impl BankingStage { } // avoid excessively locking poh_recorder - if Instant::now().duration_since(last_slot_metrics_track) - >= Duration::from_millis(SLOT_BOUNDARY_CHECK_MS) - { + if last_metrics_update.elapsed() >= Duration::from_millis(SLOT_BOUNDARY_CHECK_MS) { let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( |_| { let current_poh_bank = { @@ -1003,7 +1001,7 @@ impl BankingStage { slot_metrics_checker_check_slot_boundary_time.as_us(), ); - last_slot_metrics_track = Instant::now(); + last_metrics_update = Instant::now(); } let recv_timeout = if !buffered_packet_batches.is_empty() { From 32f219761c21ba3ec57ad47c2cf2c4ce7e0aafef Mon Sep 17 00:00:00 2001 From: Lucas B Date: Sun, 15 May 2022 21:12:29 -0500 Subject: [PATCH 5/5] make check period constant Duration --- core/src/banking_stage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index bedb4e8b42c376..941c079fd692c3 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -93,7 +93,7 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; -const SLOT_BOUNDARY_CHECK_MS: u64 = 10; +const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model @@ -985,7 +985,7 @@ impl BankingStage { } // avoid excessively locking poh_recorder - if last_metrics_update.elapsed() >= Duration::from_millis(SLOT_BOUNDARY_CHECK_MS) { + if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( 
|_| { let current_poh_bank = {