Skip to content

Commit

Permalink
Add feature to send to tpu vote port
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Sep 20, 2021
1 parent 109ff01 commit fc68637
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 23 deletions.
35 changes: 23 additions & 12 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cluster_info::ClusterInfo,
contact_info::ContactInfo,
data_budget::DataBudget,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
Expand Down Expand Up @@ -56,7 +57,7 @@ use std::{
collections::{HashMap, VecDeque},
env,
mem::size_of,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
ops::DerefMut,
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::mpsc::Receiver,
Expand Down Expand Up @@ -1430,27 +1431,37 @@ pub(crate) fn next_leader_tpu(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
if let Some(leader_pubkey) = poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
{
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
} else {
None
}
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
}

fn next_leader_tpu_forwards(
cluster_info: &ClusterInfo,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
}

pub(crate) fn next_leader_tpu_vote(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
) -> Option<std::net::SocketAddr> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
}

fn next_leader_x<F>(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
port_selector: F,
) -> Option<std::net::SocketAddr>
where
F: FnOnce(&ContactInfo) -> SocketAddr,
{
if let Some(leader_pubkey) = poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET)
{
cluster_info.lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
cluster_info.lookup_contact_info(&leader_pubkey, port_selector)
} else {
None
}
Expand Down
21 changes: 18 additions & 3 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4882,7 +4882,12 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
vote_info,
false,
);

let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
Expand Down Expand Up @@ -4939,7 +4944,12 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
vote_info,
false,
);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
Expand Down Expand Up @@ -5001,7 +5011,12 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
vote_info,
false,
);

assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
let (_, votes) = cluster_info.get_votes(&mut cursor);
Expand Down
8 changes: 6 additions & 2 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,12 @@ impl Tvu {
};

let (voting_sender, voting_receiver) = channel();
let voting_service =
VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone());
let voting_service = VotingService::new(
voting_receiver,
cluster_info.clone(),
poh_recorder.clone(),
bank_forks.clone(),
);

let replay_stage = ReplayStage::new(
replay_stage_config,
Expand Down
19 changes: 13 additions & 6 deletions core/src/voting_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::cluster_info::ClusterInfo;
use crate::poh_recorder::PohRecorder;
use solana_runtime::bank_forks::BankForks;
use solana_sdk::{clock::Slot, transaction::Transaction};
use std::{
sync::{mpsc::Receiver, Arc, Mutex},
sync::{mpsc::Receiver, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
};

Expand Down Expand Up @@ -38,12 +39,15 @@ impl VotingService {
vote_receiver: Receiver<VoteOp>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-vote-service".to_string())
.spawn(move || {
for vote_op in vote_receiver.iter() {
Self::handle_vote(&cluster_info, &poh_recorder, vote_op);
let rooted_bank = bank_forks.read().unwrap().root_bank().clone();
let send_to_tpu_vote_port = rooted_bank.send_to_tpu_vote_port_enabled();
Self::handle_vote(&cluster_info, &poh_recorder, vote_op, send_to_tpu_vote_port);
}
})
.unwrap();
Expand All @@ -54,11 +58,14 @@ impl VotingService {
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
vote_op: VoteOp,
send_to_tpu_vote_port: bool,
) {
let _ = cluster_info.send_vote(
vote_op.tx(),
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
);
let target_address = if send_to_tpu_vote_port {
crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder)
} else {
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder)
};
let _ = cluster_info.send_vote(vote_op.tx(), target_address);

match vote_op {
VoteOp::PushVote { tx, tower_slots } => {
Expand Down
5 changes: 5 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4881,6 +4881,11 @@ impl Bank {
.is_active(&feature_set::merge_nonce_error_into_system_error::id())
}

pub fn send_to_tpu_vote_port_enabled(&self) -> bool {
self.feature_set
.is_active(&feature_set::send_to_tpu_vote_port::id())
}

// Check if the wallclock time from bank creation to now has exceeded the allotted
// time for transaction processing
pub fn should_bank_still_be_processing_txs(
Expand Down
6 changes: 6 additions & 0 deletions sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ pub mod demote_program_write_locks {
solana_sdk::declare_id!("3E3jV7v9VcdJL8iYZUMax9DiDno8j7EWUVbhm9RtShj2");
}

pub mod send_to_tpu_vote_port {
// todo: update
solana_sdk::declare_id!("3E3jV7v9VcdJL8iYZUMax9DiDno8j7EWUVbhm9RtShj3");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
Expand Down Expand Up @@ -211,6 +216,7 @@ lazy_static! {
(merge_nonce_error_into_system_error::id(), "merge NonceError into SystemError"),
(spl_token_v2_set_authority_fix::id(), "spl-token set_authority fix"),
(demote_program_write_locks::id(), "demote program write locks to readonly #19593"),
(send_to_tpu_vote_port::id(), "Send votes to the tpu vote port"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down

0 comments on commit fc68637

Please sign in to comment.