diff --git a/src/fullnode.rs b/src/fullnode.rs index 61fda40a8ddd7f..232cd4dab1752d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -8,7 +8,6 @@ use crate::entry::create_ticks; use crate::entry::next_entry_mut; use crate::entry::Entry; use crate::gossip_service::GossipService; -use crate::leader_schedule_utils; use crate::poh_recorder::PohRecorder; use crate::poh_service::{PohService, PohServiceConfig}; use crate::rpc_pubsub_service::PubSubService; @@ -58,14 +57,6 @@ impl NodeServices { } } -#[derive(Debug, PartialEq, Eq)] -pub enum FullnodeReturnType { - LeaderToValidatorRotation, - ValidatorToLeaderRotation, - LeaderToLeaderRotation, - ValidatorToValidatorRotation, -} - pub struct FullnodeConfig { pub sigverify_disabled: bool, pub voting_disabled: bool, @@ -272,7 +263,7 @@ impl Fullnode { } } - fn rotate(&mut self, rotation_info: TvuRotationInfo) -> FullnodeReturnType { + fn rotate(&mut self, rotation_info: TvuRotationInfo) { trace!( "{:?}: rotate for slot={} to leader={:?} using last_entry_id={:?}", self.id, @@ -280,7 +271,6 @@ impl Fullnode { rotation_info.leader_id, rotation_info.last_entry_id, ); - let was_leader = leader_schedule_utils::slot_leader(&rotation_info.bank) == self.id; if let Some(ref mut rpc_service) = self.rpc_service { // TODO: This is not the correct bank. Instead TVU should pass along the @@ -290,13 +280,7 @@ impl Fullnode { } if rotation_info.leader_id == self.id { - let transition = if was_leader { - debug!("{:?} remaining in leader role", self.id); - FullnodeReturnType::LeaderToLeaderRotation - } else { - debug!("{:?} rotating to leader role", self.id); - FullnodeReturnType::ValidatorToLeaderRotation - }; + debug!("{:?} rotating to leader role", self.id); let tpu_bank = self .bank_forks .read() @@ -304,11 +288,6 @@ impl Fullnode { .get(rotation_info.slot) .unwrap() .clone(); - assert_eq!( - tpu_bank.id(), - rotation_info.bank.id(), - "inconsistent switch to leader" - ); self.node_services.tpu.switch_to_leader( &tpu_bank, &self.poh_recorder, @@ -323,15 +302,7 @@ impl Fullnode { rotation_info.slot, &self.blocktree, ); - transition } else { - let transition = if was_leader { - debug!("{:?} rotating to validator role", self.id); - FullnodeReturnType::LeaderToValidatorRotation - } else { - debug!("{:?} remaining in validator role", self.id); - FullnodeReturnType::ValidatorToValidatorRotation - }; self.node_services.tpu.switch_to_forwarder( rotation_info.leader_id, self.tpu_sockets @@ -339,16 +310,12 @@ impl Fullnode { .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), ); - transition } } // Runs a thread to manage node role transitions. The returned closure can be used to signal the // node to exit. - pub fn run( - mut self, - rotation_notifier: Option>, - ) -> impl FnOnce() { + pub fn run(mut self, rotation_notifier: Option>) -> impl FnOnce() { let (sender, receiver) = channel(); let exit = self.exit.clone(); let timeout = Duration::from_secs(1); @@ -365,15 +332,15 @@ impl Fullnode { trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot); //TODO: this will be called by the TVU every time it votes //instead of here - self.poh_recorder.lock().unwrap().reset( - rotation_info.bank.tick_height(), - rotation_info.last_entry_id, - ); + self.poh_recorder + .lock() + .unwrap() + .reset(rotation_info.tick_height, rotation_info.last_entry_id); let slot = rotation_info.slot; let transition = self.rotate(rotation_info); debug!("role transition complete: {:?}", transition); if let Some(ref rotation_notifier) = rotation_notifier { - rotation_notifier.send((transition, slot)).unwrap(); + rotation_notifier.send(slot).unwrap(); } } Err(RecvTimeoutError::Timeout) => continue, @@ -604,10 +571,7 @@ mod tests { // Wait for the bootstrap leader to transition. Since there are no other nodes in the // cluster it will continue to be the leader - assert_eq!( - rotation_receiver.recv().unwrap(), - (FullnodeReturnType::LeaderToLeaderRotation, 1) - ); + assert_eq!(rotation_receiver.recv().unwrap(), 1); bootstrap_leader_exit(); } @@ -650,13 +614,7 @@ mod tests { ); let (rotation_sender, rotation_receiver) = channel(); let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender)); - assert_eq!( - rotation_receiver.recv().unwrap(), - ( - FullnodeReturnType::LeaderToValidatorRotation, - DEFAULT_SLOTS_PER_EPOCH - ) - ); + assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH)); // Test that a node knows to transition to a leader based on parsing the ledger let validator = Fullnode::new( @@ -670,13 +628,7 @@ mod tests { let (rotation_sender, rotation_receiver) = channel(); let validator_exit = validator.run(Some(rotation_sender)); - assert_eq!( - rotation_receiver.recv().unwrap(), - ( - FullnodeReturnType::ValidatorToLeaderRotation, - DEFAULT_SLOTS_PER_EPOCH - ) - ); + assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH)); validator_exit(); bootstrap_leader_exit(); @@ -753,10 +705,7 @@ mod tests { let (rotation_sender, rotation_receiver) = channel(); let validator_exit = validator.run(Some(rotation_sender)); let rotation = rotation_receiver.recv().unwrap(); - assert_eq!( - rotation, - (FullnodeReturnType::ValidatorToLeaderRotation, blobs_to_send) - ); + assert_eq!(rotation, blobs_to_send); // Close the validator so that rocksdb has locks available validator_exit(); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index b27a8b10a3ab78..39f85189f3ddb6 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -83,7 +83,7 @@ impl ReplayStage { let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone(); if Self::is_tpu(&bank, my_id) { trace!( - "sending rotate signal to bootstrap bank {} {} {} forks_info_id: {}", + "to_leader_sender: sending rotate signal to bootstrap bank {} {} {} forks_info_id: {}", bank.id(), bank.tick_height(), bank.last_id(), @@ -92,7 +92,7 @@ impl ReplayStage { // RPC can be made aware of last slot's bank to_leader_sender .send(TvuRotationInfo { - bank, + tick_height: bank.tick_height(), last_entry_id: bank_forks_info[0].last_entry_id, slot: bank_id, leader_id: my_id, @@ -154,8 +154,7 @@ impl ReplayStage { .unwrap() .clone(); let next_slot = *latest_slot_vote + 1; - let next_leader = - leader_schedule_utils::slot_leader_at(next_slot, &parent); + let next_leader = leader_schedule_utils::slot_leader_at(next_slot, &parent); cluster_info.write().unwrap().set_leader(next_leader); subscriptions.notify_subscribers(&parent); @@ -171,29 +170,25 @@ impl ReplayStage { cluster_info.write().unwrap().push_vote(vote); } if next_leader == my_id { - let tpu_bank = { - let mut wforks = bank_forks.write().unwrap(); - let tpu_bank = - Bank::new_from_parent_and_id(&parent, my_id, next_slot); - wforks.insert(next_slot, tpu_bank); - wforks.set_working_bank_id(next_slot); - wforks.get(next_slot).unwrap().clone() - }; - trace!( - "{} request to rotate for {} {}", - my_id, - next_slot, - next_leader - ); - to_leader_sender - .send(TvuRotationInfo { - bank: tpu_bank, - last_entry_id: parent.last_id(), - slot: next_slot, - leader_id: next_leader, - }) - .unwrap(); + let mut wforks = bank_forks.write().unwrap(); + let tpu_bank = Bank::new_from_parent_and_id(&parent, my_id, next_slot); + wforks.insert(next_slot, tpu_bank); + wforks.set_working_bank_id(next_slot); } + trace!( + "to_leader_sender: me: {} next_slot: {} next_leader: {}", + my_id, + next_slot, + next_leader + ); + to_leader_sender + .send(TvuRotationInfo { + tick_height: parent.tick_height(), + last_entry_id: parent.last_id(), + slot: next_slot, + leader_id: next_leader, + }) + .unwrap(); } inc_new_counter_info!( "replicate_stage-duration", diff --git a/src/tvu.rs b/src/tvu.rs index b2c75eadf4dddc..3445c1c62a3622 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -23,7 +23,6 @@ use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; -use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -34,7 +33,7 @@ use std::sync::{Arc, RwLock}; use std::thread; pub struct TvuRotationInfo { - pub bank: Arc, // Bank to use + pub tick_height: u64, // tick height, bank might not exist yet pub last_entry_id: Hash, // last_entry_id of that bank pub slot: u64, // slot height to initiate a rotation pub leader_id: Pubkey, // leader upon rotation