diff --git a/src/fullnode.rs b/src/fullnode.rs index 3d32ab4371097e..e28ee799593230 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -13,8 +13,8 @@ use crate::rpc_pubsub::PubSubService; use crate::service::Service; use crate::storage_stage::StorageState; use crate::streamer::BlobSender; -use crate::tpu::{Tpu, TpuReturnType, TpuRotationReceiver}; -use crate::tvu::{Sockets, Tvu, TvuReturnType, TvuRotationReceiver}; +use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender}; +use crate::tvu::{Sockets, Tvu}; use crate::voting_keypair::VotingKeypair; use log::Level; use solana_sdk::hash::Hash; @@ -105,8 +105,8 @@ pub struct Fullnode { tpu_sockets: Vec, broadcast_socket: UdpSocket, node_services: NodeServices, - to_leader_receiver: TvuRotationReceiver, - to_validator_receiver: TpuRotationReceiver, + rotation_sender: TpuRotationSender, + rotation_receiver: TpuRotationReceiver, blob_sender: BlobSender, } @@ -250,9 +250,8 @@ impl Fullnode { Some(Arc::new(voting_keypair)) }; - // Setup channels for rotation indications - let (to_leader_sender, to_leader_receiver) = channel(); - let (to_validator_sender, to_validator_receiver) = channel(); + // Setup channel for rotation indications + let (rotation_sender, rotation_receiver) = channel(); let blob_index = Self::get_consumed_for_slot(&blocktree, slot_height); @@ -266,7 +265,7 @@ impl Fullnode { sockets, blocktree.clone(), config.storage_rotate_count, - to_leader_sender, + &rotation_sender, &storage_state, config.entry_stream.as_ref(), ledger_signal_sender, @@ -290,7 +289,7 @@ impl Fullnode { blob_index, &last_entry_id, id, - &to_validator_sender, + &rotation_sender, &blob_sender, scheduled_leader == id, ); @@ -309,8 +308,8 @@ impl Fullnode { exit, tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, - to_leader_receiver, - to_validator_receiver, + rotation_sender, + rotation_receiver, blob_sender, } } @@ -360,21 +359,45 @@ impl Fullnode { (scheduled_leader, max_tick_height) } - fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType { - trace!( - "leader_to_validator({:?}): tick_height={}", - self.id, - tick_height, - ); - - let (scheduled_leader, _max_tick_height) = self.get_next_leader(tick_height); + fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType { + trace!("{:?}: rotate at tick_height={}", self.id, tick_height,); + let was_leader = self.node_services.tpu.is_leader(); + let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height); if scheduled_leader == self.id { - debug!("node is still the leader"); + let transition = if was_leader { + debug!("{:?} remaining in leader role", self.id); + FullnodeReturnType::LeaderToLeaderRotation + } else { + debug!("{:?} rotating to leader role", self.id); + FullnodeReturnType::ValidatorToLeaderRotation + }; + let last_entry_id = self.bank.last_id(); - self.validator_to_leader(tick_height, last_entry_id); - FullnodeReturnType::LeaderToLeaderRotation + + self.node_services.tpu.switch_to_leader( + &Arc::new(self.bank.copy_for_tpu()), + PohServiceConfig::default(), + self.tpu_sockets + .iter() + .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) + .collect(), + self.broadcast_socket + .try_clone() + .expect("Failed to clone broadcast socket"), + self.cluster_info.clone(), + self.sigverify_disabled, + max_tick_height, + 0, + &last_entry_id, + self.id, + &self.rotation_sender, + &self.blob_sender, + ); + + transition } else { + debug!("{:?} rotating to validator role", self.id); self.node_services.tpu.switch_to_forwarder( self.tpu_sockets .iter() @@ -386,73 +409,6 @@ impl Fullnode { } } - pub fn validator_to_leader(&mut self, tick_height: u64, last_entry_id: Hash) { - trace!( - "validator_to_leader({:?}): tick_height={} last_entry_id={}", - self.id, - tick_height, - last_entry_id, - ); - - let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height); - assert_eq!(scheduled_leader, self.id, "node is not the leader"); - - let (to_validator_sender, to_validator_receiver) = channel(); - self.to_validator_receiver = to_validator_receiver; - self.node_services.tpu.switch_to_leader( - &Arc::new(self.bank.copy_for_tpu()), - PohServiceConfig::default(), - self.tpu_sockets - .iter() - .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) - .collect(), - self.broadcast_socket - .try_clone() - .expect("Failed to clone broadcast socket"), - self.cluster_info.clone(), - self.sigverify_disabled, - max_tick_height, - 0, - &last_entry_id, - self.id, - &to_validator_sender, - &self.blob_sender, - ) - } - - fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> { - let timeout = Duration::from_secs(1); - loop { - if self.exit.load(Ordering::Relaxed) { - return None; - } - - if self.node_services.tpu.is_leader() { - let should_be_forwarder = self.to_validator_receiver.recv_timeout(timeout); - match should_be_forwarder { - Ok(TpuReturnType::LeaderRotation(tick_height)) => { - return Some((self.leader_to_validator(tick_height), tick_height + 1)); - } - Err(RecvTimeoutError::Timeout) => continue, - _ => return None, - } - } else { - let should_be_leader = self.to_leader_receiver.recv_timeout(timeout); - match should_be_leader { - Ok(TvuReturnType::LeaderRotation(tick_height, last_entry_id)) => { - self.validator_to_leader(tick_height, last_entry_id); - return Some(( - FullnodeReturnType::ValidatorToLeaderRotation, - tick_height + 1, - )); - } - Err(RecvTimeoutError::Timeout) => continue, - _ => return None, - } - } - } - } - // Runs a thread to manage node role transitions. The returned closure can be used to signal the // node to exit. pub fn run( @@ -461,22 +417,28 @@ impl Fullnode { ) -> impl FnOnce() { let (sender, receiver) = channel(); let exit = self.exit.clone(); + let timeout = Duration::from_secs(1); spawn(move || loop { - let status = self.handle_role_transition(); - match status { - None => { - debug!("node shutdown requested"); - self.close().expect("Unable to close node"); - sender.send(true).expect("Unable to signal exit"); - break; - } - Some(transition) => { - debug!("role_transition complete: {:?}", transition); + if self.exit.load(Ordering::Relaxed) { + debug!("node shutdown requested"); + self.close().expect("Unable to close node"); + sender.send(true).expect("Unable to signal exit"); + break; + } + + match self.rotation_receiver.recv_timeout(timeout) { + Ok(tick_height) => { + let transition = self.rotate(tick_height); + debug!("role transition complete: {:?}", transition); if let Some(ref rotation_notifier) = rotation_notifier { - rotation_notifier.send(transition).unwrap(); + rotation_notifier + .send((transition, tick_height + 1)) + .unwrap(); } } - }; + Err(RecvTimeoutError::Timeout) => continue, + _ => (), + } }); move || { exit.store(true, Ordering::Relaxed); @@ -927,7 +889,7 @@ mod tests { let voting_keypair = VotingKeypair::new_local(&leader_keypair); info!("Start the bootstrap leader"); - let mut leader = Fullnode::new( + let leader = Fullnode::new( leader_node, &leader_keypair, &leader_ledger_path, @@ -946,13 +908,13 @@ mod tests { converge(&leader_node_info, 2); info!("Wait for leader -> validator transition"); - let signal = leader - .to_validator_receiver + let rotation_signal = leader + .rotation_receiver .recv() .expect("signal for leader -> validator transition"); - let (rn_sender, rn_receiver) = channel(); - rn_sender.send(signal).expect("send"); - leader.to_validator_receiver = rn_receiver; + debug!("received rotation signal: {:?}", rotation_signal); + // Re-send the rotation signal, it'll be received again once the tvu is unpaused + leader.rotation_sender.send(rotation_signal).expect("send"); info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)"); { @@ -960,13 +922,13 @@ mod tests { assert!(w_last_ids.tick_height < ticks_per_slot - 1); } - // Clear the blobs we've recieved so far. After this rotation, we should + // Clear the blobs we've received so far. After this rotation, we should // no longer receive blobs from slot 0 while let Ok(_) = blob_fetch_receiver.try_recv() {} let leader_exit = leader.run(Some(rotation_sender)); - // Wait for leader_to_validator() function execution to trigger a leader to leader rotation + // Wait for Tpu bank to progress while the Tvu bank is stuck sleep(Duration::from_millis(1000)); // Tvu bank lock is released here, so tvu should start making progress again and should signal a diff --git a/src/poh_service.rs b/src/poh_service.rs index 82f597ef87d09f..9716476a636c15 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -5,7 +5,7 @@ use crate::poh_recorder::{PohRecorder, PohRecorderError}; use crate::result::Error; use crate::result::Result; use crate::service::Service; -use crate::tpu::{TpuReturnType, TpuRotationSender}; +use crate::tpu::TpuRotationSender; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::sleep; @@ -92,8 +92,7 @@ impl PohService { let res = poh.hash(); if let Err(e) = res { if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { - to_validator_sender - .send(TpuReturnType::LeaderRotation(max_tick_height))?; + to_validator_sender.send(max_tick_height)?; } return Err(e); } @@ -106,8 +105,7 @@ impl PohService { let res = poh.tick(); if let Err(e) = res { if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e { - // Leader rotation should only happen if a max_tick_height was specified - to_validator_sender.send(TpuReturnType::LeaderRotation(max_tick_height))?; + to_validator_sender.send(max_tick_height)?; } return Err(e); } diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 97fdc57f680d93..2019f45407d1d4 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -13,7 +13,7 @@ use crate::entry_stream::MockEntryStream as EntryStream; use crate::packet::BlobError; use crate::result::{Error, Result}; use crate::service::Service; -use crate::tvu::{TvuReturnType, TvuRotationSender}; +use crate::tvu::TvuRotationSender; use crate::voting_keypair::VotingKeypair; use log::Level; use solana_metrics::{influxdb, submit}; @@ -189,7 +189,7 @@ impl ReplayStage { exit: Arc, mut current_blob_index: u64, last_entry_id: Arc>, - to_leader_sender: TvuRotationSender, + to_leader_sender: &TvuRotationSender, entry_stream: Option<&String>, ledger_signal_sender: SyncSender, ledger_signal_receiver: Receiver, @@ -203,6 +203,7 @@ impl ReplayStage { (pause, pause_) }; let exit_ = exit.clone(); + let to_leader_sender = to_leader_sender.clone(); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { @@ -295,12 +296,7 @@ impl ReplayStage { cluster_info.write().unwrap().set_leader(leader_id); if leader_id != last_leader_id && my_id == leader_id { - to_leader_sender - .send(TvuReturnType::LeaderRotation( - current_tick_height, - *last_entry_id.read().unwrap(), - )) - .unwrap(); + to_leader_sender.send(current_tick_height).unwrap(); } // Check for any slots that chain to this one @@ -386,7 +382,6 @@ mod test { use crate::genesis_block::GenesisBlock; use crate::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig}; use crate::replay_stage::ReplayStage; - use crate::tvu::TvuReturnType; use crate::voting_keypair::VotingKeypair; use chrono::{DateTime, FixedOffset}; use serde_json::Value; @@ -472,7 +467,7 @@ mod test { exit.clone(), meta.consumed, Arc::new(RwLock::new(last_entry_id)), - rotation_sender, + &rotation_sender, None, l_sender, l_receiver, @@ -486,8 +481,6 @@ mod test { entries_to_send.push(entry); } - let expected_last_id = entries_to_send.last().unwrap().id; - // Write the entries to the ledger, replay_stage should get notified of changes blocktree .write_entries(DEFAULT_SLOT_HEIGHT, meta.consumed, &entries_to_send) @@ -495,17 +488,10 @@ mod test { info!("Wait for replay_stage to exit and check return value is correct"); assert_eq!( - Some(TvuReturnType::LeaderRotation( - 2 * ticks_per_slot - 1, - expected_last_id, - )), - { - Some( - rotation_receiver - .recv() - .expect("should have signaled leader rotation"), - ) - } + 2 * ticks_per_slot - 1, + rotation_receiver + .recv() + .expect("should have signaled leader rotation"), ); info!("Check that the entries on the ledger writer channel are correct"); @@ -575,7 +561,7 @@ mod test { exit.clone(), entry_height, Arc::new(RwLock::new(last_entry_id)), - to_leader_sender, + &to_leader_sender, None, l_sender, l_receiver, @@ -689,7 +675,7 @@ mod test { exit.clone(), meta.consumed, Arc::new(RwLock::new(last_entry_id)), - rotation_tx, + &rotation_tx, None, l_sender, l_receiver, @@ -729,18 +715,12 @@ mod test { // Wait for replay_stage to exit and check return value is correct assert_eq!( - Some(TvuReturnType::LeaderRotation( - active_window_tick_length, - expected_last_id, - )), - { - Some( - rotation_rx - .recv() - .expect("should have signaled leader rotation"), - ) - } + active_window_tick_length, + rotation_rx + .recv() + .expect("should have signaled leader rotation") ); + assert_ne!(expected_last_id, Hash::default()); //replay stage should continue running even after rotation has happened (tvu never goes down) replay_stage diff --git a/src/tpu.rs b/src/tpu.rs index 6991192aedf58b..dc35d1a8419563 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -20,10 +20,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -pub enum TpuReturnType { - LeaderRotation(u64), -} - +pub type TpuReturnType = u64; // tick_height to initiate a rotation pub type TpuRotationSender = Sender; pub type TpuRotationReceiver = Receiver; diff --git a/src/tvu.rs b/src/tvu.rs index 9d1cb42141f44b..6c2f0658261c0c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -21,22 +21,19 @@ use crate::retransmit_stage::RetransmitStage; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; use crate::streamer::BlobSender; +use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender}; use crate::voting_keypair::VotingKeypair; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; +use std::sync::mpsc::{channel, Receiver, SyncSender}; use std::sync::{Arc, RwLock}; use std::thread; -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum TvuReturnType { - LeaderRotation(u64, Hash), -} - -pub type TvuRotationSender = Sender; -pub type TvuRotationReceiver = Receiver; +pub type TvuReturnType = TpuReturnType; +pub type TvuRotationSender = TpuRotationSender; +pub type TvuRotationReceiver = TpuRotationReceiver; pub struct Tvu { fetch_stage: BlobFetchStage, @@ -75,7 +72,7 @@ impl Tvu { sockets: Sockets, blocktree: Arc, storage_rotate_count: u64, - to_leader_sender: TvuRotationSender, + to_leader_sender: &TvuRotationSender, storage_state: &StorageState, entry_stream: Option<&String>, ledger_signal_sender: SyncSender, @@ -261,7 +258,7 @@ pub mod tests { }, Arc::new(blocktree), STORAGE_ROTATE_TEST_COUNT, - sender, + &sender, &StorageState::default(), None, l_sender, @@ -348,7 +345,7 @@ pub mod tests { }, Arc::new(blocktree), STORAGE_ROTATE_TEST_COUNT, - sender, + &sender, &StorageState::default(), None, l_sender,