Skip to content

Commit

Permalink
banking forward tpu vote
Browse files Browse the repository at this point in the history
  • Loading branch information
tao-stones committed Sep 24, 2021
1 parent 892d35b commit 1055778
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ pub enum BufferedPacketsDecision {
Hold,
}

pub enum ForwardOption {
NotForward,
ForwardTpuVote,
ForwardTransaction,
}

impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
Expand Down Expand Up @@ -292,13 +298,13 @@ impl BankingStage {
assert!(num_threads >= MIN_THREADS_VOTES + MIN_THREADS_BANKING);
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| {
let (verified_receiver, enable_forwarding) = match i.cmp(&(num_threads - 2)) {
std::cmp::Ordering::Less => (verified_receiver.clone(), true),
std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), false),
let (verified_receiver, forward_option) = match i.cmp(&(num_threads - 2)) {
std::cmp::Ordering::Less => (verified_receiver.clone(), ForwardOption::ForwardTransaction),
std::cmp::Ordering::Equal => (vote_verified_receiver.clone(), ForwardOption::ForwardTpuVote),
std::cmp::Ordering::Greater => {
// Disable forwarding of vote transactions
// from gossip. Note - votes can also arrive from tpu
(verified_vote_receiver.clone(), false)
(verified_vote_receiver.clone(), ForwardOption::NotForward)
}
};

Expand All @@ -319,7 +325,7 @@ impl BankingStage {
&poh_recorder,
&cluster_info,
&mut recv_start,
enable_forwarding,
forward_option,
i,
batch_limit,
transaction_status_sender,
Expand Down Expand Up @@ -525,7 +531,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packets: &mut UnprocessedPackets,
enable_forwarding: bool,
forward_option: &ForwardOption,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
Expand Down Expand Up @@ -575,7 +581,7 @@ impl BankingStage {
}
BufferedPacketsDecision::Forward => {
Self::handle_forwarding(
enable_forwarding,
forward_option,
cluster_info,
buffered_packets,
poh_recorder,
Expand All @@ -586,7 +592,7 @@ impl BankingStage {
}
BufferedPacketsDecision::ForwardAndHold => {
Self::handle_forwarding(
enable_forwarding,
forward_option,
cluster_info,
buffered_packets,
poh_recorder,
Expand All @@ -601,22 +607,20 @@ impl BankingStage {
}

fn handle_forwarding(
enable_forwarding: bool,
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
buffered_packets: &mut UnprocessedPackets,
poh_recorder: &Arc<Mutex<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
data_budget: &DataBudget,
) {
if !enable_forwarding {
if !hold {
buffered_packets.clear();
}
return;
}

let addr = match next_leader_tpu_forwards(cluster_info, poh_recorder) {
let addr = match forward_option {
ForwardOption::NotForward => {if !hold { buffered_packets.clear(); } return},
ForwardOption::ForwardTransaction => next_leader_tpu_forwards(cluster_info, poh_recorder),
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
};
let addr = match addr {
Some(addr) => addr,
None => return,
};
Expand All @@ -638,7 +642,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
enable_forwarding: bool,
forward_option: ForwardOption,
id: u32,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
Expand All @@ -658,7 +662,7 @@ impl BankingStage {
&poh_recorder,
cluster_info,
&mut buffered_packets,
enable_forwarding,
&forward_option,
transaction_status_sender.clone(),
&gossip_vote_sender,
&banking_stage_stats,
Expand Down

0 comments on commit 1055778

Please sign in to comment.