Skip to content

Commit

Permalink
Fix BankingStage packet starvation (1.9) (#25236)
Browse files Browse the repository at this point in the history
* do try_recv and one iter of process_buffered_packets
  • Loading branch information
buffalu authored May 16, 2022
1 parent da1a718 commit 3c651eb
Showing 1 changed file with 32 additions and 31 deletions.
63 changes: 32 additions & 31 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 64;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;

const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);

pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model
cost_model_throttled_transactions_count: usize,
Expand Down Expand Up @@ -776,7 +778,7 @@ impl BankingStage {
data_budget: &DataBudget,
qos_service: &Arc<QosService>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision {
) {
let (decision, make_decision_time) = Measure::this(
|_| {
let bank_start;
Expand Down Expand Up @@ -876,8 +878,6 @@ impl BankingStage {
}
_ => (),
}

decision
}

fn handle_forwarding(
Expand Down Expand Up @@ -957,11 +957,14 @@ impl BankingStage {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();

loop {
let my_pubkey = cluster_info.id();
while !buffered_packet_batches.is_empty() {
let (decision, process_buffered_packets_time) = Measure::this(
if !buffered_packet_batches.is_empty() {
let (_, process_buffered_packets_time) = Measure::this(
|_| {
Self::process_buffered_packets(
&my_pubkey,
Expand All @@ -984,36 +987,34 @@ impl BankingStage {
);
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());

if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
{
// If we are waiting on a new bank,
// check the receiver for more transactions/for exiting
break;
}
}

let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this(
|_| {
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
},
(),
"slot_metrics_checker_check_slot_boundary",
);
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
slot_metrics_checker_check_slot_boundary_time.as_us(),
);
// avoid excessively locking poh_recorder
if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD {
let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this(
|_| {
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
},
(),
"slot_metrics_checker_check_slot_boundary",
);
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
slot_metrics_checker_check_slot_boundary_time.as_us(),
);

last_metrics_update = Instant::now();
}

let recv_timeout = if !buffered_packet_batches.is_empty() {
// If packets are buffered, let's wait for less time on recv from the channel.
// This helps detect the next leader faster, and processing the buffered
// packets quickly
Duration::from_millis(10)
// If there are buffered packets, run the equivalent of try_recv to try reading more
// packets. This prevents starving BankingStage::consume_buffered_packets due to
// buffered_packet_batches containing transactions that exceed the cost model for
// the current bank.
Duration::from_millis(0)
} else {
// Default wait time
Duration::from_millis(100)
Expand Down

0 comments on commit 3c651eb

Please sign in to comment.