diff --git a/src/bank_fork.rs b/src/bank_fork.rs
index b8dab76a0d9226..c98518ab5da176 100644
--- a/src/bank_fork.rs
+++ b/src/bank_fork.rs
@@ -294,9 +294,10 @@ impl BankFork {
         let hash = Transaction::hash(&processed_transactions);
         // record and unlock will unlock all the successful transactions
         poh.record(hash, processed_transactions).map_err(|e| {
-            warn!("record failure: {:?}", e);
+            trace!("record failure: {:?}", e);
             match e {
                 Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
+                    trace!("max_height reached");
                     BankError::MaxHeightReached
                 }
                 _ => BankError::RecordFailure,
diff --git a/src/banking_stage.rs b/src/banking_stage.rs
index 02949f0535eb41..07dc46357fcae7 100644
--- a/src/banking_stage.rs
+++ b/src/banking_stage.rs
@@ -65,8 +65,7 @@ impl BankingStage {
         // Single thread to generate entries from many banks.
         // This thread talks to poh_service and broadcasts the entries once they have been recorded.
         // Once an entry has been recorded, its last_id is registered with the bank.
-        let poh_service =
-            PohService::new(poh_recorder.clone(), config, to_validator_sender.clone());
+        let poh_service = PohService::new(poh_recorder.clone(), config);
 
         // Single thread to compute confirmation
         let compute_confirmation_service = ComputeLeaderConfirmationService::new(
@@ -83,6 +82,8 @@ impl BankingStage {
         let thread_verified_receiver = shared_verified_receiver.clone();
         let thread_poh_recorder = poh_recorder.clone();
         let thread_banking_exit = poh_service.poh_exit.clone();
+        let thread_to_validator_sender = to_validator_sender.clone();
+
         Builder::new()
             .name("solana-banking-stage-tx".to_string())
             .spawn(move || {
@@ -105,7 +106,6 @@ impl BankingStage {
                             break Some(BankingStageReturnType::ChannelDisconnected);
                         }
                         Error::BankError(BankError::RecordFailure) => {
-                            warn!("Bank failed to record");
                             break Some(BankingStageReturnType::ChannelDisconnected);
                         }
                         Error::BankError(BankError::MaxHeightReached) => {
@@ -126,6 +126,19 @@ impl BankingStage {
                 // Signal exit only on "Some" error
                 if return_result.is_some() {
                     thread_banking_exit.store(true, Ordering::Relaxed);
+                } else {
+                    // TODO: pass current_slot to TPU construction...
+                    let current_slot = thread_bank.active_fork().head().fork_id();
+
+                    trace!(
+                        "leader for slot {} done at {}",
+                        current_slot,
+                        max_tick_height
+                    );
+                    thread_bank.active_fork().head().freeze();
+                    thread_bank.merge_into_root(current_slot);
+
+                    thread_to_validator_sender.send(max_tick_height).unwrap();
                 }
                 return_result
             })
@@ -257,6 +270,9 @@ impl Service for BankingStage {
         self.compute_confirmation_service.join()?;
 
         let poh_return_value = self.poh_service.join()?;
+
+        trace!("banking_stage join {:?}", poh_return_value);
+
         match poh_return_value {
             Ok(_) => (),
             Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 5b49529967e423..fd6a33f3bb6cfb 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -333,13 +333,13 @@ impl Fullnode {
         }
 
         let (scheduled_leader, max_tick_height) = {
-            let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap();
+            let leader_scheduler = self.bank.leader_scheduler.read().unwrap();
 
             // A transition is only permitted on the final tick of a slot
             assert_eq!(leader_scheduler.num_ticks_left_in_slot(tick_height), 0);
             let first_tick_of_next_slot = tick_height + 1;
 
-            leader_scheduler.update_tick_height(first_tick_of_next_slot, &self.bank);
+            //leader_scheduler.update_tick_height(first_tick_of_next_slot, &self.bank);
             let slot = leader_scheduler.tick_height_to_slot(first_tick_of_next_slot);
             (
                 leader_scheduler.get_leader_for_slot(slot).unwrap(),
@@ -364,16 +364,22 @@ impl Fullnode {
     }
 
     fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
-        trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
         let was_leader = self.node_services.tpu.is_leader();
+        trace!(
+            "{:?}: rotate at tick_height: {}, {} leader",
+            self.id,
+            tick_height,
+            if was_leader { "was" } else { "wasn't" }
+        );
+
         let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
         if scheduled_leader == self.id {
             let transition = if was_leader {
-                debug!("{:?} remaining in leader role", self.id);
+                debug!("{:?} remaining in leader role at {}", self.id, tick_height);
                 FullnodeReturnType::LeaderToLeaderRotation
             } else {
-                debug!("{:?} rotating to leader role", self.id);
+                debug!("{:?} rotating to leader role at {}", self.id, tick_height);
                 FullnodeReturnType::ValidatorToLeaderRotation
             };
 
@@ -432,6 +438,7 @@ impl Fullnode {
             match self.rotation_receiver.recv_timeout(timeout) {
                 Ok(tick_height) => {
+                    debug!("received rotation at {}", tick_height);
                     let transition = self.rotate(tick_height);
                     debug!("role transition complete: {:?}", transition);
                     if let Some(ref rotation_notifier) = rotation_notifier {
@@ -929,15 +936,20 @@ mod tests {
             .recv()
             .expect("signal for leader -> validator transition");
         debug!("received rotation signal: {:?}", rotation_signal);
+        // Re-send the rotation signal, it'll be received again once the tvu is unpaused
         leader.rotation_sender.send(rotation_signal).expect("send");
 
-        info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
-        {
-            let bank_state = leader.bank.active_fork();
-            let w_last_ids = bank_state.head().last_ids().write().unwrap();
-            assert!(w_last_ids.tick_height < ticks_per_slot - 1);
-        }
+        // info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
+        // {
+        //     let bank_state = leader.bank.fork(0).expect("validator should be at slot 1");
+        //     let w_last_ids = bank_state.head().last_ids().write().unwrap();
+        //     info!(
+        //         "w_last_ids.tick_height: {} ticks_per_slot: {}",
+        //         w_last_ids.tick_height, ticks_per_slot
+        //     );
+        //     assert!(w_last_ids.tick_height < ticks_per_slot - 1);
+        // }
 
         // Clear the blobs we've received so far. After this rotation, we should
         // no longer receive blobs from slot 0
diff --git a/src/poh_service.rs b/src/poh_service.rs
index 1a6e9813432ad5..0213779c8ebe7b 100644
--- a/src/poh_service.rs
+++ b/src/poh_service.rs
@@ -1,11 +1,9 @@
 //! The `poh_service` module implements a service that records the passing of
 //! "ticks", a measure of time in the PoH stream
 
-use crate::poh_recorder::{PohRecorder, PohRecorderError};
-use crate::result::Error;
+use crate::poh_recorder::PohRecorder;
 use crate::result::Result;
 use crate::service::Service;
-use crate::tpu::TpuRotationSender;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread::sleep;
@@ -46,11 +44,7 @@ impl PohService {
         self.join()
     }
 
-    pub fn new(
-        poh_recorder: PohRecorder,
-        config: PohServiceConfig,
-        to_validator_sender: TpuRotationSender,
-    ) -> Self {
+    pub fn new(poh_recorder: PohRecorder, config: PohServiceConfig) -> Self {
         // PohService is a headless producer, so when it exits it should notify the banking stage.
         // Since channels are not used to talk between these threads, an AtomicBool is used as a
         // signal.
@@ -61,12 +55,7 @@ impl PohService {
             .name("solana-poh-service-tick_producer".to_string())
             .spawn(move || {
                 let mut poh_recorder_ = poh_recorder;
-                let return_value = Self::tick_producer(
-                    &mut poh_recorder_,
-                    config,
-                    &poh_exit_,
-                    &to_validator_sender,
-                );
+                let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_);
                 poh_exit_.store(true, Ordering::Relaxed);
                 return_value
             })
@@ -82,33 +71,19 @@ impl PohService {
         poh: &mut PohRecorder,
         config: PohServiceConfig,
         poh_exit: &AtomicBool,
-        to_validator_sender: &TpuRotationSender,
     ) -> Result<()> {
-        let max_tick_height = poh.max_tick_height();
         loop {
             match config {
                 PohServiceConfig::Tick(num) => {
                     for _ in 1..num {
-                        let res = poh.hash();
-                        if let Err(e) = res {
-                            if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
-                                to_validator_sender.send(max_tick_height)?;
-                            }
-                            return Err(e);
-                        }
+                        poh.hash()?;
                     }
                 }
                 PohServiceConfig::Sleep(duration) => {
                     sleep(duration);
                 }
             }
-            let res = poh.tick();
-            if let Err(e) = res {
-                if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
-                    to_validator_sender.send(max_tick_height)?;
-                }
-                return Err(e);
-            }
+            poh.tick()?;
             if poh_exit.load(Ordering::Relaxed) {
                 return Ok(());
             }
@@ -164,11 +139,9 @@ mod tests {
         };
 
         const HASHES_PER_TICK: u64 = 2;
-        let (sender, _) = channel();
         let poh_service = PohService::new(
             poh_recorder,
             PohServiceConfig::Tick(HASHES_PER_TICK as usize),
-            sender,
         );
 
         // get some events
diff --git a/src/replay_stage.rs b/src/replay_stage.rs
index aa2478e25501fb..e64f344bf8b2a8 100644
--- a/src/replay_stage.rs
+++ b/src/replay_stage.rs
@@ -193,7 +193,7 @@ impl ReplayStage {
         exit: Arc<AtomicBool>,
         mut current_blob_index: u64,
         last_entry_id: Arc<RwLock<Hash>>,
-        to_leader_sender: &TvuRotationSender,
+        rotation_sender: &TvuRotationSender,
         ledger_signal_sender: SyncSender<bool>,
         ledger_signal_receiver: Receiver<bool>,
     ) -> (Self, EntryReceiver) {
@@ -205,7 +205,7 @@ impl ReplayStage {
             (pause, pause_)
         };
         let exit_ = exit.clone();
-        let to_leader_sender = to_leader_sender.clone();
+        let rotation_sender = rotation_sender.clone();
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
@@ -238,6 +238,8 @@ impl ReplayStage {
 
                     if current_slot.is_none() {
                         let new_slot = Self::get_next_slot(
+                            my_id,
+                            &bank,
                             &blocktree,
                             prev_slot.expect("prev_slot must exist"),
                         );
@@ -273,15 +275,17 @@ impl ReplayStage {
                     let entry_len = entries.len();
                     // Fetch the next entries from the database
                     if !entries.is_empty() {
+                        let slot = current_slot.expect("current_slot must exist");
+
+                        // TODO: ledger provides from get_slot_entries()
-                        let base_slot = match current_slot.expect("current_slot must exist") {
+                        let base_slot = match slot {
                             0 => 0,
                             x => x - 1,
                         };
                         if let Err(e) = Self::process_entries(
                             entries,
-                            current_slot.expect("current_slot must exist"),
+                            slot,
                             base_slot,
                             &bank,
                             &cluster_info,
@@ -293,7 +297,10 @@ impl ReplayStage {
                             error!("process_entries failed: {:?}", e);
                         }
 
-                        let current_tick_height = bank.active_fork().tick_height();
+                        let current_tick_height = bank
+                            .fork(slot)
+                            .expect("fork for current slot must exist")
+                            .tick_height();
 
                         // we've reached the end of a slot, reset our state and check
                         // for leader rotation
@@ -305,20 +312,29 @@ impl ReplayStage {
                             // Check for leader rotation
                             let leader_id = Self::get_leader_for_next_tick(&bank);
 
-                            info!(
+                            trace!(
                                 "leader_id: {} last_leader_id: {} my_id: {}",
-                                leader_id, last_leader_id, my_id
+                                leader_id,
+                                last_leader_id,
+                                my_id
                             );
 
                             // TODO: Remove this soon once we boot the leader from ClusterInfo
                             cluster_info.write().unwrap().set_leader(leader_id);
 
                             if leader_id != last_leader_id && my_id == leader_id {
-                                to_leader_sender.send(current_tick_height).unwrap();
-                            }
+                                // construct the leader's bank_state
+                                bank.init_fork(slot + 1, &last_entry_id.read().unwrap(), slot)
+                                    .expect("init fork");
 
-                            // Check for any slots that chain to this one
-                            prev_slot = current_slot;
+                                rotation_sender.send(current_tick_height).unwrap();
+
+                                // causes prev_slot to advance past my leader slot
+                                prev_slot = Some(slot + 1);
+                            } else {
+                                // Check for any slots that chain to this one
+                                prev_slot = current_slot;
+                            }
                             current_slot = None;
                             last_leader_id = leader_id;
                             continue;
@@ -331,6 +347,10 @@ impl ReplayStage {
                         // Update disconnected, exit
                         break;
                     }
+                    info!(
+                        "{} replay_stage trying on current_slot {:?}",
+                        my_id, current_slot
+                    );
                 }
             })
             .unwrap();
@@ -371,9 +391,16 @@ impl ReplayStage {
             .expect("Scheduled leader should be calculated by this point")
     }
 
-    fn get_next_slot(blocktree: &Blocktree, slot_index: u64) -> Option<u64> {
+    fn get_next_slot(
+        my_id: Pubkey,
+        bank: &Arc<Bank>,
+        blocktree: &Blocktree,
+        slot_index: u64,
+    ) -> Option<u64> {
         // Find the next slot that chains to the old slot
-        let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
+        let mut next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
+        let leader_scheduler = bank.leader_scheduler.read().unwrap();
+        next_slots.retain(|slot| leader_scheduler.get_leader_for_slot(*slot) != Some(my_id));
        next_slots.first().cloned()
     }
 }
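
A note on the get_next_slot() change above: the replay stage now skips any candidate next slot that this node itself is scheduled to lead, since the leader's fork is created directly via bank.init_fork() at rotation time rather than replayed from the ledger (prev_slot is likewise advanced past the leader's own slot). Below is a minimal, self-contained sketch of just that filtering step; the LeaderSchedule struct and the u64 Pubkey alias are simplified stand-ins for the crate's LeaderScheduler and Pubkey types, not the real API.

use std::collections::HashMap;

// Hypothetical stand-in for the crate's Pubkey type.
type Pubkey = u64;

// Hypothetical stand-in for LeaderScheduler: maps a slot to its scheduled leader.
struct LeaderSchedule {
    leaders: HashMap<u64, Pubkey>,
}

impl LeaderSchedule {
    fn get_leader_for_slot(&self, slot: u64) -> Option<Pubkey> {
        self.leaders.get(&slot).copied()
    }
}

// Mirrors the new get_next_slot() behavior: of the slots that chain to the
// previous slot, drop any that this node is scheduled to lead, then take the
// first remaining one.
fn next_replay_slot(
    my_id: Pubkey,
    schedule: &LeaderSchedule,
    mut next_slots: Vec<u64>,
) -> Option<u64> {
    next_slots.retain(|slot| schedule.get_leader_for_slot(*slot) != Some(my_id));
    next_slots.first().cloned()
}

fn main() {
    let schedule = LeaderSchedule {
        leaders: [(1, 100), (2, 200)].into_iter().collect(),
    };
    // Node 100 leads slot 1, so replay skips it and picks slot 2.
    assert_eq!(next_replay_slot(100, &schedule, vec![1, 2]), Some(2));
    // Node 300 leads neither slot, so it replays the first chaining slot.
    assert_eq!(next_replay_slot(300, &schedule, vec![1, 2]), Some(1));
}

The retain-then-first pattern is the same shape as the retain() call added in the diff; without the filter, a leader would attempt to replay entries for the very slot it is producing.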