Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix BankingStage packet starvation (1.9) #25236

Merged
merged 5 commits into from
May 16, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;

// Minimum time, in milliseconds, between leader-slot-boundary metric checks in the
// packet-processing loop. Each check locks `poh_recorder`, so the loop skips the check
// until at least this much time has elapsed since the previous one.
const SLOT_BOUNDARY_CHECK_MS: u64 = 10;
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved

pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model
cost_model_throttled_transactions_count: usize,
Expand Down Expand Up @@ -771,7 +773,7 @@ impl BankingStage {
data_budget: &DataBudget,
qos_service: &Arc<QosService>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision {
) {
let (decision, make_decision_time) = Measure::this(
|_| {
let bank_start;
Expand Down Expand Up @@ -871,8 +873,6 @@ impl BankingStage {
}
_ => (),
}

decision
}

fn handle_forwarding(
Expand Down Expand Up @@ -952,11 +952,14 @@ impl BankingStage {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();

loop {
let my_pubkey = cluster_info.id();
while !buffered_packet_batches.is_empty() {
let (decision, process_buffered_packets_time) = Measure::this(
if !buffered_packet_batches.is_empty() {
let (_, process_buffered_packets_time) = Measure::this(
|_| {
Self::process_buffered_packets(
&my_pubkey,
Expand All @@ -979,36 +982,34 @@ impl BankingStage {
);
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());

if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
{
// If we are waiting on a new bank,
// check the receiver for more transactions/for exiting
break;
}
}

let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this(
|_| {
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
},
(),
"slot_metrics_checker_check_slot_boundary",
);
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
slot_metrics_checker_check_slot_boundary_time.as_us(),
);
// avoid excessively locking poh_recorder
if last_metrics_update.elapsed() >= Duration::from_millis(SLOT_BOUNDARY_CHECK_MS) {
let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this(
|_| {
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
},
(),
"slot_metrics_checker_check_slot_boundary",
);
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
slot_metrics_checker_check_slot_boundary_time.as_us(),
);

last_metrics_update = Instant::now();
}

let recv_timeout = if !buffered_packet_batches.is_empty() {
// If packets are buffered, let's wait for less time on recv from the channel.
// This helps detect the next leader faster, and processing the buffered
// packets quickly
Duration::from_millis(10)
// If there are buffered packets, run the equivalent of try_recv to try reading more
// packets. This prevents starving BankingStage::consume_buffered_packets due to
// buffered_packet_batches containing transactions that exceed the cost model for
// the current bank.
Duration::from_millis(0)
} else {
// Default wait time
Duration::from_millis(100)
Expand Down