Skip to content

Commit

Permalink
ff cleanup: allow_votes_to_directly_update_vote_state and compact_vot… (
Browse files Browse the repository at this point in the history
#32967)

ff cleanup: allow_votes_to_directly_update_vote_state and compact_vote_state_updates
  • Loading branch information
AshwinSekar authored Aug 31, 2023
1 parent 1b9c9a3 commit 025651e
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 311 deletions.
75 changes: 23 additions & 52 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use {
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval},
solana_sdk::timing::AtomicInterval,
std::{
cmp, env,
sync::{
Expand Down Expand Up @@ -359,55 +359,32 @@ impl BankingStage {
TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
let should_split_voting_threads = bank_forks
.read()
.map(|bank_forks| {
let bank = bank_forks.root_bank();
bank.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id())
})
.unwrap_or(false);
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|id| {
let (packet_receiver, unprocessed_transaction_storage) =
match (id, should_split_voting_threads) {
(0, false) => (
gossip_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Voting(VoteSource::Gossip),
),
),
(0, true) => (
gossip_vote_receiver.clone(),
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
VoteSource::Gossip,
),
),
(1, false) => (
tpu_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Voting(VoteSource::Tpu),
),
let (packet_receiver, unprocessed_transaction_storage) = match id {
0 => (
gossip_vote_receiver.clone(),
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
VoteSource::Gossip,
),
(1, true) => (
tpu_vote_receiver.clone(),
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
VoteSource::Tpu,
),
),
1 => (
tpu_vote_receiver.clone(),
UnprocessedTransactionStorage::new_vote_storage(
latest_unprocessed_votes.clone(),
VoteSource::Tpu,
),
_ => (
non_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Transactions,
),
),
_ => (
non_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::with_capacity(batch_limit),
ThreadType::Transactions,
),
};
),
};

let mut packet_receiver =
PacketReceiver::new(id, packet_receiver, bank_forks.clone());
Expand Down Expand Up @@ -609,9 +586,7 @@ mod tests {
poh_service::PohService,
},
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{activate_feature, bootstrap_validator_stake_lamports},
bank::Bank, bank_forks::BankForks, genesis_utils::bootstrap_validator_stake_lamports,
},
solana_sdk::{
hash::Hash,
Expand Down Expand Up @@ -1126,14 +1101,10 @@ mod tests {
fn test_unprocessed_transaction_storage_full_send() {
solana_logger::setup();
let GenesisConfigInfo {
mut genesis_config,
genesis_config,
mint_keypair,
..
} = create_slow_genesis_config(10000);
activate_feature(
&mut genesis_config,
allow_votes_to_directly_update_vote_state::id(),
);
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = bank_forks.read().unwrap().get(0).unwrap();
Expand Down
20 changes: 0 additions & 20 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use {
},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT},
feature_set::allow_votes_to_directly_update_vote_state,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
Expand Down Expand Up @@ -265,7 +264,6 @@ impl ClusterInfoVoteListener {
})
.unwrap()
};
let bank_forks_clone = bank_forks.clone();
let bank_send_thread = {
let exit = exit.clone();
Builder::new()
Expand All @@ -276,7 +274,6 @@ impl ClusterInfoVoteListener {
verified_vote_label_packets_receiver,
poh_recorder,
&verified_packets_sender,
bank_forks_clone,
);
})
.unwrap()
Expand Down Expand Up @@ -382,17 +379,10 @@ impl ClusterInfoVoteListener {
verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
poh_recorder: Arc<RwLock<PohRecorder>>,
verified_packets_sender: &BankingPacketSender,
bank_forks: Arc<RwLock<BankForks>>,
) -> Result<()> {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None;
let mut is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());

loop {
if exit.load(Ordering::Relaxed) {
Expand All @@ -407,7 +397,6 @@ impl ClusterInfoVoteListener {
if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
&verified_vote_label_packets_receiver,
would_be_leader,
is_tower_full_vote_enabled,
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected)
Expand All @@ -428,15 +417,6 @@ impl ClusterInfoVoteListener {
verified_packets_sender,
&verified_vote_packets,
)?;
// Check if we've crossed the feature boundary
if !is_tower_full_vote_enabled {
is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());
}
}
}
}
Expand Down
129 changes: 12 additions & 117 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use {
},
solana_sdk::{
clock::{Slot, UnixTimestamp},
feature_set,
hash::Hash,
instruction::Instruction,
pubkey::Pubkey,
Expand Down Expand Up @@ -508,50 +507,16 @@ impl Tower {
self.last_vote_tx_blockhash = Some(new_vote_tx_blockhash);
}

// Returns true if we have switched the new vote instruction that directly sets vote state
pub(crate) fn is_direct_vote_state_update_enabled(bank: &Bank) -> bool {
bank.feature_set
.is_active(&feature_set::allow_votes_to_directly_update_vote_state::id())
}

fn apply_vote_and_generate_vote_diff(
local_vote_state: &mut VoteState,
slot: Slot,
hash: Hash,
last_voted_slot_in_bank: Option<Slot>,
) -> VoteTransaction {
let vote = Vote::new(vec![slot], hash);
let _ignored = process_vote_unchecked(local_vote_state, vote);
let slots = if let Some(last_voted_slot) = last_voted_slot_in_bank {
local_vote_state
.votes
.iter()
.map(|v| v.slot())
.skip_while(|s| *s <= last_voted_slot)
.collect()
} else {
local_vote_state.votes.iter().map(|v| v.slot()).collect()
};
VoteTransaction::from(Vote::new(slots, hash))
}

pub fn last_voted_slot_in_bank(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option<Slot> {
let vote_account = bank.get_vote_account(vote_account_pubkey)?;
let vote_state = vote_account.vote_state();
vote_state.as_ref().ok()?.last_voted_slot()
}

pub fn record_bank_vote(&mut self, bank: &Bank, vote_account_pubkey: &Pubkey) -> Option<Slot> {
let last_voted_slot_in_bank = Self::last_voted_slot_in_bank(bank, vote_account_pubkey);

pub fn record_bank_vote(&mut self, bank: &Bank) -> Option<Slot> {
// Returns the new root if one is made after applying a vote for the given bank to
// `self.vote_state`
self.record_bank_vote_and_update_lockouts(
bank.slot(),
bank.hash(),
last_voted_slot_in_bank,
Self::is_direct_vote_state_update_enabled(bank),
)
self.record_bank_vote_and_update_lockouts(bank.slot(), bank.hash())
}

/// If we've recently updated the vote state by applying a new vote
Expand All @@ -575,34 +540,19 @@ impl Tower {
&mut self,
vote_slot: Slot,
vote_hash: Hash,
last_voted_slot_in_bank: Option<Slot>,
is_direct_vote_state_update_enabled: bool,
) -> Option<Slot> {
trace!("{} record_vote for {}", self.node_pubkey, vote_slot);
let old_root = self.root();

if is_direct_vote_state_update_enabled {
let vote = Vote::new(vec![vote_slot], vote_hash);
let result = process_vote_unchecked(&mut self.vote_state, vote);
if result.is_err() {
error!(
"Error while recording vote {} {} in local tower {:?}",
vote_slot, vote_hash, result
);
}
self.update_last_vote_from_vote_state(vote_hash);
} else {
let mut new_vote = Self::apply_vote_and_generate_vote_diff(
&mut self.vote_state,
vote_slot,
vote_hash,
last_voted_slot_in_bank,
let vote = Vote::new(vec![vote_slot], vote_hash);
let result = process_vote_unchecked(&mut self.vote_state, vote);
if result.is_err() {
error!(
"Error while recording vote {} {} in local tower {:?}",
vote_slot, vote_hash, result
);

new_vote
.set_timestamp(self.maybe_timestamp(self.last_voted_slot().unwrap_or_default()));
self.last_vote = new_vote;
};
}
self.update_last_vote_from_vote_state(vote_hash);

let new_root = self.root();

Expand All @@ -620,7 +570,7 @@ impl Tower {

#[cfg(test)]
pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option<Slot> {
self.record_bank_vote_and_update_lockouts(slot, hash, self.last_voted_slot(), true)
self.record_bank_vote_and_update_lockouts(slot, hash)
}

/// Used for tests
Expand Down Expand Up @@ -1579,7 +1529,7 @@ pub mod test {
signature::Signer,
slot_history::SlotHistory,
},
solana_vote_program::vote_state::{self, Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY},
solana_vote_program::vote_state::{Vote, VoteStateVersions, MAX_LOCKOUT_HISTORY},
std::{
collections::{HashMap, VecDeque},
fs::{remove_file, OpenOptions},
Expand Down Expand Up @@ -2530,61 +2480,6 @@ pub mod test {
assert_eq!(voted_stakes[&2], 1);
}

#[test]
fn test_apply_vote_and_generate_vote_diff() {
let mut local = VoteState::default();
let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), None);
assert_eq!(local.votes.len(), 1);
assert_eq!(vote.slots(), vec![0]);
assert_eq!(local.tower(), vec![0]);
}

#[test]
fn test_apply_vote_and_generate_vote_diff_dup_vote() {
let mut local = VoteState::default();
// If `latest_voted_slot_in_bank == Some(0)`, then we already have a vote for 0. Adding
// another vote for slot 0 should return an empty vote as the diff.
let vote =
Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), Some(0));
assert!(vote.is_empty());
}

#[test]
fn test_apply_vote_and_generate_vote_diff_next_vote() {
let mut local = VoteState::default();
let vote = Vote {
slots: vec![0],
hash: Hash::default(),
timestamp: None,
};
let _ = vote_state::process_vote_unchecked(&mut local, vote);
assert_eq!(local.votes.len(), 1);
let vote =
Tower::apply_vote_and_generate_vote_diff(&mut local, 1, Hash::default(), Some(0));
assert_eq!(vote.slots(), vec![1]);
assert_eq!(local.tower(), vec![0, 1]);
}

#[test]
fn test_apply_vote_and_generate_vote_diff_next_after_expired_vote() {
let mut local = VoteState::default();
let vote = Vote {
slots: vec![0],
hash: Hash::default(),
timestamp: None,
};
let _ = vote_state::process_vote_unchecked(&mut local, vote);
assert_eq!(local.votes.len(), 1);

// First vote expired, so should be evicted from tower. Thus even with
// `latest_voted_slot_in_bank == Some(0)`, the first vote slot won't be
// observable in any of the results.
let vote =
Tower::apply_vote_and_generate_vote_diff(&mut local, 3, Hash::default(), Some(0));
assert_eq!(vote.slots(), vec![3]);
assert_eq!(local.tower(), vec![3]);
}

#[test]
fn test_check_vote_threshold_forks() {
// Create the ancestor relationships
Expand Down
Loading

0 comments on commit 025651e

Please sign in to comment.