diff --git a/src/bank_forks.rs b/src/bank_forks.rs index 848fff1eca109c..12ab8579bd6adc 100644 --- a/src/bank_forks.rs +++ b/src/bank_forks.rs @@ -18,23 +18,32 @@ impl BankForks { banks, } } - + pub fn frozen_banks(&self) -> Vec { + self.banks + .iter() + .filter(|(_k, v)| v.is_frozen()) + .map(|(k, _v)| *k) + .collect() + } + pub fn active_banks(&self) -> Vec { + self.banks + .iter() + .filter(|(_k, v)| !v.is_frozen()) + .map(|(k, _v)| *k) + .collect() + } pub fn working_bank(&self) -> Arc { self.banks[&self.working_bank_id].clone() } + pub fn get(&self, bank_id: u64) -> Option<&Arc> { + self.banks.get(&bank_id) + } + // TODO: use the bank's own ID instead of receiving a parameter pub fn insert(&mut self, bank_id: u64, bank: Bank) { - let mut bank = Arc::new(bank); + let bank = Arc::new(bank); self.banks.insert(bank_id, bank.clone()); - - // TODO: this really only needs to look at the first - // parent if we're always calling insert() - // when we construct a child bank - while let Some(parent) = bank.parent() { - self.banks.remove(&parent.id()); - bank = parent; - } } pub fn set_working_bank_id(&mut self, bank_id: u64) { diff --git a/src/blocktree.rs b/src/blocktree.rs index 11a63c163e5932..54d53c3e15e366 100644 --- a/src/blocktree.rs +++ b/src/blocktree.rs @@ -817,6 +817,7 @@ impl Blocktree { max_missing, ) } + /// Returns the entry vector for the slot starting with `blob_start_index` pub fn get_slot_entries( &self, @@ -824,6 +825,16 @@ impl Blocktree { blob_start_index: u64, max_entries: Option, ) -> Result> { + self.get_slot_entries_with_blob_count(slot_height, blob_start_index, max_entries) + .map(|x| x.0) + } + + pub fn get_slot_entries_with_blob_count( + &self, + slot_height: u64, + blob_start_index: u64, + max_entries: Option, + ) -> Result<(Vec, usize)> { // Find the next consecutive block of blobs. let consecutive_blobs = self.get_slot_consecutive_blobs( slot_height, @@ -831,7 +842,8 @@ impl Blocktree { blob_start_index, max_entries, )?; - Ok(Self::deserialize_blobs(&consecutive_blobs)) + let num = consecutive_blobs.len(); + Ok((Self::deserialize_blobs(&consecutive_blobs), num)) } // Returns slots connecting to any element of the list `slot_heights`. diff --git a/src/fullnode.rs b/src/fullnode.rs index e5d2e418739869..4bb65852f67f72 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -132,6 +132,7 @@ impl Fullnode { let bank_info = &bank_forks_info[0]; bank_forks.set_working_bank_id(bank_info.bank_id); let bank = bank_forks.working_bank(); + assert!(bank_forks.get(bank_info.bank_id).is_some()); info!( "starting PoH... {} {}", @@ -296,8 +297,16 @@ impl Fullnode { debug!("{:?} rotating to leader role", self.id); FullnodeReturnType::ValidatorToLeaderRotation }; + let tpu_bank = self + .bank_forks + .read() + .unwrap() + .get(rotation_info.slot) + .unwrap() + .clone(); + trace!("switch to leader"); self.node_services.tpu.switch_to_leader( - &self.bank_forks.read().unwrap().working_bank(), + tpu_bank, &self.poh_recorder, self.tpu_sockets .iter() @@ -352,10 +361,10 @@ impl Fullnode { trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot); //TODO: this will be called by the TVU every time it votes //instead of here - self.poh_recorder.lock().unwrap().reset( - rotation_info.bank.tick_height(), - rotation_info.last_entry_id, - ); + self.poh_recorder + .lock() + .unwrap() + .reset(rotation_info.tick_height, rotation_info.last_entry_id); let slot = rotation_info.slot; let transition = self.rotate(rotation_info); debug!("role transition complete: {:?}", transition); diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 36615c8e11b2a9..0606e9ef3dc1f9 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -63,6 +63,7 @@ impl PohRecorder { } pub fn set_working_bank(&mut self, working_bank: WorkingBank) { + trace!("new working bank"); self.working_bank = Some(working_bank); } @@ -94,8 +95,9 @@ impl PohRecorder { .take_while(|x| x.1 <= working_bank.max_tick_height) .count(); let e = if cnt > 0 { - trace!( - "flush_cache: {} {} sending: {}", + debug!( + "flush_cache: bank_id: {} tick_height: {} max: {} sending: {}", + working_bank.bank.id(), working_bank.bank.tick_height(), working_bank.max_tick_height, cnt, diff --git a/src/replay_stage.rs b/src/replay_stage.rs index b48b97ba5b285c..a0e79a1332b27b 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -2,26 +2,32 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; -use crate::blocktree_processor::{self, BankForksInfo}; +use crate::blocktree_processor; +use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; +<<<<<<< HEAD use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_schedule_utils; +======= +use crate::entry::{Entry, EntrySlice}; +use crate::leader_scheduler::LeaderScheduler; +>>>>>>> dd66d8c5... very wip, simple replay stage use crate::packet::BlobError; -use crate::result::{Error, Result}; +use crate::result; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::tvu::{TvuRotationInfo, TvuRotationSender}; use solana_metrics::counter::Counter; -use solana_metrics::{influxdb, submit}; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::KeypairUtil; use solana_sdk::timing::duration_as_ms; use solana_sdk::vote_transaction::VoteTransaction; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::Receiver; use std::sync::mpsc::RecvTimeoutError; -use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -53,116 +59,6 @@ pub struct ReplayStage { } impl ReplayStage { - /// Process entry blobs, already in order - #[allow(clippy::too_many_arguments)] - fn process_entries( - mut entries: Vec, - bank: &Arc, - cluster_info: &Arc>, - voting_keypair: &Option>, - forward_entry_sender: &EntrySender, - current_blob_index: &mut u64, - last_entry_id: &mut Hash, - subscriptions: &Arc, - ) -> Result<()> { - // Coalesce all the available entries into a single vote - submit( - influxdb::Point::new("replicate-stage") - .add_field("count", influxdb::Value::Integer(entries.len() as i64)) - .to_owned(), - ); - - let mut res = Ok(()); - let mut num_entries_to_write = entries.len(); - let now = Instant::now(); - - if !entries.as_slice().verify(last_entry_id) { - inc_new_counter_info!("replicate_stage-verify-fail", entries.len()); - return Err(Error::BlobError(BlobError::VerificationFailed)); - } - inc_new_counter_info!( - "replicate_stage-verify-duration", - duration_as_ms(&now.elapsed()) as usize - ); - - let num_ticks = bank.tick_height(); - let slot_height = bank.slot_height(); - - let mut num_ticks_to_next_vote = - leader_schedule_utils::num_ticks_left_in_slot(bank, num_ticks); - - for (i, entry) in entries.iter().enumerate() { - inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize); - if entry.is_tick() { - if num_ticks_to_next_vote == 0 { - num_ticks_to_next_vote = bank.ticks_per_slot(); - } - num_ticks_to_next_vote -= 1; - } - inc_new_counter_info!( - "replicate-stage_tick-to-vote", - num_ticks_to_next_vote as usize - ); - // If it's the last entry in the vector, i will be vec len - 1. - // If we don't process the entry now, the for loop will exit and the entry - // will be dropped. - if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() { - res = blocktree_processor::process_entries(bank, &entries[0..=i]); - - if res.is_err() { - // TODO: This will return early from the first entry that has an erroneous - // transaction, instead of processing the rest of the entries in the vector - // of received entries. This is in line with previous behavior when - // bank.process_entries() was used to process the entries, but doesn't solve the - // issue that the bank state was still changed, leading to inconsistencies with the - // leader as the leader currently should not be publishing erroneous transactions - inc_new_counter_info!("replicate-stage_failed_process_entries", i); - break; - } - - if 0 == num_ticks_to_next_vote { - subscriptions.notify_subscribers(&bank); - if let Some(voting_keypair) = voting_keypair { - let keypair = voting_keypair.as_ref(); - let vote = - VoteTransaction::new_vote(keypair, slot_height, bank.last_id(), 0); - cluster_info.write().unwrap().push_vote(vote); - } - } - num_entries_to_write = i + 1; - break; - } - } - - // If leader rotation happened, only write the entries up to leader rotation. - entries.truncate(num_entries_to_write); - *last_entry_id = entries - .last() - .expect("Entries cannot be empty at this point") - .id; - - inc_new_counter_info!( - "replicate-transactions", - entries.iter().map(|x| x.transactions.len()).sum() - ); - - let entries_len = entries.len() as u64; - // TODO: In line with previous behavior, this will write all the entries even if - // an error occurred processing one of the entries (causing the rest of the entries to - // not be processed). - if entries_len != 0 { - forward_entry_sender.send(entries)?; - } - - *current_blob_index += entries_len; - res?; - inc_new_counter_info!( - "replicate_stage-duration", - duration_as_ms(&now.elapsed()) as usize - ); - Ok(()) - } - #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] pub fn new( my_id: Pubkey, @@ -175,77 +71,53 @@ impl ReplayStage { to_leader_sender: &TvuRotationSender, ledger_signal_receiver: Receiver, subscriptions: &Arc, - ) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver) + ) -> Self where T: 'static + KeypairUtil + Send + Sync, { - let (forward_entry_sender, forward_entry_receiver) = channel(); - let (slot_full_sender, slot_full_receiver) = channel(); + trace!("replay stage"); let exit_ = exit.clone(); let to_leader_sender = to_leader_sender.clone(); - let subscriptions_ = subscriptions.clone(); - - // Gather up all the metadata about the current state of the ledger - let (mut bank, tick_height, mut last_entry_id, mut current_blob_index) = { - let mut bank_forks = bank_forks.write().unwrap(); - bank_forks.set_working_bank_id(bank_forks_info[0].bank_id); - let bank = bank_forks.working_bank(); - let tick_height = bank.tick_height(); - ( - bank, - tick_height, - bank_forks_info[0].last_entry_id, - bank_forks_info[0].next_blob_index, - ) - }; - - // Update Tpu and other fullnode components with the current bank - let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = { - let slot = (tick_height + 1) / bank.ticks_per_slot(); - let first_tick_in_slot = slot * bank.ticks_per_slot(); - - let leader_id = leader_schedule_utils::slot_leader_at(slot, &bank); - trace!("node {:?} scheduled as leader for slot {}", leader_id, slot,); + let subscriptions = subscriptions.clone(); + let bank_forks = bank_forks.clone(); - let old_bank = bank.clone(); - // If the next slot is going to be a new slot and we're the leader for that slot, - // make a new working bank, set it as the working bank. - if tick_height + 1 == first_tick_in_slot { - if leader_id == my_id { - bank = Self::create_and_set_working_bank(slot, &bank_forks, &old_bank); - } - current_blob_index = 0; + // bootstrap first leader + let active_banks = bank_forks.read().unwrap().active_banks(); + for bank_id in active_banks { + trace!("replay started with active bank {}", bank_id); + let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone(); + if Self::is_tpu(&bank, my_id) { + trace!( + "sending rotate signal to bootstrap bank {} {} {} forks_info_id: {}", + bank.id(), + bank.tick_height(), + bank.last_id(), + bank_forks_info[0].last_entry_id, + ); + // RPC can be made aware of last slot's bank + to_leader_sender + .send(TvuRotationInfo { + tick_height: bank.tick_height(), + last_entry_id: bank_forks_info[0].last_entry_id, + slot: bank_id, + leader_id: my_id, + }) + .unwrap(); + break; } - - // Send a rotation notification back to Fullnode to initialize the TPU to the right - // state. After this point, the bank.tick_height() is live, which it means it can - // be updated by the TPU - to_leader_sender - .send(TvuRotationInfo { - bank: old_bank, - last_entry_id, - slot, - leader_id, - }) - .unwrap(); - - let max_tick_height_for_slot = first_tick_in_slot - + leader_schedule_utils::num_ticks_left_in_slot(&bank, first_tick_in_slot); - - (Some(slot), leader_id, max_tick_height_for_slot) - }; + } + let mut progress: HashMap = bank_forks_info + .iter() + .map(|x| (x.bank_id, (x.last_entry_id, x.next_blob_index as usize))) + .collect(); // Start the replay stage loop - let bank_forks = bank_forks.clone(); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_.clone()); - let mut prev_slot = None; - - // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each - // relevant slot to see if there are any available updates loop { + let now = Instant::now(); // Stop getting entries if we get exit signal if exit_.load(Ordering::Relaxed) { break; @@ -258,151 +130,112 @@ impl ReplayStage { Ok(_) => (), }; - if current_slot.is_none() { - let new_slot = Self::get_next_slot( - &blocktree, - prev_slot.expect("prev_slot must exist"), - ); - if new_slot.is_some() { - trace!("{} replay_stage: new_slot found: {:?}", my_id, new_slot); - // Reset the state - bank = Self::create_and_set_working_bank( - new_slot.unwrap(), - &bank_forks, - &bank, - ); - current_slot = new_slot; - Self::reset_state( - bank.ticks_per_slot(), - current_slot.unwrap(), - &mut max_tick_height_for_slot, - &mut current_blob_index, - ); - } else { - continue; - } - } - - // current_slot must be Some(x) by this point - let slot = current_slot.unwrap(); - - // Fetch the next entries from the database - let entries = { - if current_leader_id != my_id { - info!( - "{} replay_stage: asking for entries from slot: {}, bi: {}", - my_id, slot, current_blob_index - ); - if let Ok(entries) = blocktree.get_slot_entries( - slot, - current_blob_index, - Some(MAX_ENTRY_RECV_PER_ITER as u64), - ) { - entries - } else { - vec![] - } - } else { - vec![] + Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap()); + let live_bank_ids = bank_forks.read().unwrap().active_banks(); + trace!("live banks {:?}", live_bank_ids); + let mut votable: Vec = vec![]; + for bank_id in live_bank_ids { + let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone(); + if !Self::is_tpu(&bank, my_id) { + bank_forks.write().unwrap().set_working_bank_id(bank_id); + Self::replay_blocktree_into_bank(&bank, &blocktree, &mut progress); } - }; - - if !entries.is_empty() { - if let Err(e) = Self::process_entries( - entries, - &bank, - &cluster_info, - &voting_keypair, - &forward_entry_sender, - &mut current_blob_index, - &mut last_entry_id, - &subscriptions_, - ) { - error!("{} process_entries failed: {:?}", my_id, e); + let max_tick_height = (bank_id + 1) * bank.ticks_per_slot() - 1; + if bank.tick_height() == max_tick_height { + bank.freeze(); + votable.push(bank_id); + progress.remove(&bank_id); } } - - let current_tick_height = bank.tick_height(); - - // We've reached the end of a slot, reset our state and check - // for leader rotation - if max_tick_height_for_slot == current_tick_height { - if let Err(e) = slot_full_sender.send((slot, current_leader_id)) { - error!("{} slot_full alert failed: {:?}", my_id, e); - } - - // Check for leader rotation - let (leader_id, next_slot) = { - let slot = (current_tick_height + 1) / bank.ticks_per_slot(); - - (leader_schedule_utils::slot_leader_at(slot, &bank), slot) - }; - - // If we were the leader for the last slot update the last id b/c we - // haven't processed any of the entries for the slot for which we were - // the leader - if current_leader_id == my_id { - let meta = blocktree.meta(slot).unwrap().expect("meta has to exist"); - if meta.last_index == std::u64::MAX { - // Ledger hasn't gotten last blob yet, break and wait - // for a signal - continue; - } - let last_entry = blocktree - .get_slot_entries(slot, meta.last_index, Some(1)) - .unwrap(); - last_entry_id = last_entry[0].id; - } - - let old_bank = bank.clone(); - prev_slot = current_slot; - if my_id == leader_id { - // Create new bank for next slot if we are the leader for that slot - bank = Self::create_and_set_working_bank( - next_slot, - &bank_forks, - &old_bank, + // TODO: fork selection + // vote on the latest one for now + votable.sort(); + + if let Some(latest_slot_vote) = votable.last() { + let bank = bank_forks + .read() + .unwrap() + .get(*latest_slot_vote) + .unwrap() + .clone(); + let next_slot = *latest_slot_vote + 1; + let next_leader = + LeaderScheduler::default().slot_leader_at(next_slot, &bank); + cluster_info.write().unwrap().set_leader(next_leader); + + subscriptions.notify_subscribers(&bank); + + if let Some(ref voting_keypair) = voting_keypair { + let keypair = voting_keypair.as_ref(); + let vote = VoteTransaction::new_vote( + keypair, + *latest_slot_vote, + bank.last_id(), + 0, ); - current_slot = Some(next_slot); - Self::reset_state( - bank.ticks_per_slot(), - next_slot, - &mut max_tick_height_for_slot, - &mut current_blob_index, - ); - } else { - current_slot = None; + cluster_info.write().unwrap().push_vote(vote); } - - if leader_id != current_leader_id { - // TODO: Remove this soon once we boot the leader from ClusterInfo - cluster_info.write().unwrap().set_leader(leader_id); + if next_leader == my_id { + let mut wforks = bank_forks.write().unwrap(); + let has_bank = wforks.get(next_slot).is_some(); + if !has_bank { + let tpu_bank = + Bank::new_from_parent_and_id(&bank, my_id, next_slot); + wforks.insert(next_slot, tpu_bank); + wforks.set_working_bank_id(next_slot); + } } - // Always send rotation signal so that other services like // RPC can be made aware of last slot's bank + trace!( + "{} request to rotate for {} {}", + my_id, + next_slot, + next_leader + ); to_leader_sender .send(TvuRotationInfo { - bank: old_bank, - last_entry_id, + tick_height: bank.tick_height(), + last_entry_id: bank.last_id(), slot: next_slot, - leader_id, + leader_id: next_leader, }) .unwrap(); - - // Check for any slots that chain to this one - current_leader_id = leader_id; - continue; } + inc_new_counter_info!( + "replicate_stage-duration", + duration_as_ms(&now.elapsed()) as usize + ); } }) .unwrap(); + (Self { t_replay, exit }) + } + + pub fn replay_blocktree_into_bank( + bank: &Bank, + blocktree: &Blocktree, + progress: &mut HashMap, + ) { + let bank_id = bank.id(); + let bank_progress = &mut progress.entry(bank_id).or_insert((bank.last_id(), 0)); + let result = + blocktree.get_slot_entries_with_blob_count(bank_id, bank_progress.1 as u64, None); + if let Ok((entries, num)) = result { + let err = Self::verify_and_process_entries(&bank, &entries, &bank_progress.0); + if err.is_err() { + //TODO: mark this fork as failed + inc_new_counter_info!("replicate-stage_failed_process_entries", entries.len()); + } + bank_progress.1 += num; + if let Some(last_entry) = entries.last() { + bank_progress.0 = last_entry.id; + } + } + } - ( - Self { t_replay, exit }, - slot_full_receiver, - forward_entry_receiver, - ) + pub fn is_tpu(bank: &Bank, my_id: Pubkey) -> bool { + my_id == LeaderScheduler::default().slot_leader_at(bank.id(), &bank) } pub fn close(self) -> thread::Result<()> { @@ -414,33 +247,34 @@ impl ReplayStage { self.exit.store(true, Ordering::Relaxed); } - fn create_and_set_working_bank( - slot: u64, - bank_forks: &Arc>, - parent: &Arc, - ) -> Arc { - let new_bank = Bank::new_from_parent(&parent); - new_bank.squash(); - let mut bank_forks = bank_forks.write().unwrap(); - bank_forks.insert(slot, new_bank); - bank_forks.set_working_bank_id(slot); - bank_forks.working_bank() - } - - fn reset_state( - ticks_per_slot: u64, - slot: u64, - max_tick_height_for_slot: &mut u64, - current_blob_index: &mut u64, - ) { - *current_blob_index = 0; - *max_tick_height_for_slot = (slot + 1) * ticks_per_slot - 1; + pub fn verify_and_process_entries( + bank: &Bank, + entries: &[Entry], + last_entry: &Hash, + ) -> result::Result<()> { + if !entries.verify(last_entry) { + return Err(result::Error::BlobError(BlobError::VerificationFailed)); + } + blocktree_processor::process_entries(bank, entries)?; + Ok(()) } - fn get_next_slot(blocktree: &Blocktree, slot_index: u64) -> Option { + fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) { // Find the next slot that chains to the old slot - let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error"); - next_slots.first().cloned() + let frozen_banks = forks.frozen_banks(); + trace!("generate new forks {:?}", frozen_banks); + for frozen in frozen_banks { + let next_slots = blocktree.get_slots_since(&[frozen]).expect("Db error"); + let parent = forks.get(frozen).unwrap().clone(); + for next in next_slots { + let new_fork = forks.get(next).is_none(); + if new_fork { + let leader = LeaderScheduler::default().slot_leader_at(next, &parent); + trace!("new fork:{} parent:{}", next, frozen); + forks.insert(next, Bank::new_from_parent_and_id(&parent, leader, next)); + } + } + } } } @@ -452,148 +286,148 @@ impl Service for ReplayStage { } } -#[cfg(test)] -mod test { - use super::*; - use crate::blocktree::create_new_tmp_ledger; - use crate::cluster_info::{ClusterInfo, Node}; - use crate::entry::create_ticks; - use crate::entry::{next_entry_mut, Entry}; - use crate::fullnode::new_banks_from_blocktree; - use crate::replay_stage::ReplayStage; - use solana_sdk::genesis_block::GenesisBlock; - use solana_sdk::hash::Hash; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::fs::remove_dir_all; - use std::sync::atomic::AtomicBool; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - - #[test] - fn test_vote_error_replay_stage_correctness() { - // Set up dummy node to host a ReplayStage - let my_keypair = Keypair::new(); - let my_id = my_keypair.pubkey(); - let my_node = Node::new_localhost_with_pubkey(my_id); - - // Create keypair for the leader - let leader_id = Keypair::new().pubkey(); - - let (genesis_block, _mint_keypair) = GenesisBlock::new_with_leader(10_000, leader_id, 500); - - let (my_ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block); - - // Set up the cluster info - let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); - - // Set up the replay stage - let exit = Arc::new(AtomicBool::new(false)); - let voting_keypair = Arc::new(Keypair::new()); - let (to_leader_sender, _to_leader_receiver) = channel(); - { - let (bank_forks, bank_forks_info, blocktree, l_receiver) = - new_banks_from_blocktree(&my_ledger_path, None); - let bank = bank_forks.working_bank(); - let last_entry_id = bank_forks_info[0].last_entry_id; - - let blocktree = Arc::new(blocktree); - let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( - my_keypair.pubkey(), - Some(voting_keypair.clone()), - blocktree.clone(), - &Arc::new(RwLock::new(bank_forks)), - &bank_forks_info, - cluster_info_me.clone(), - exit.clone(), - &to_leader_sender, - l_receiver, - &Arc::new(RpcSubscriptions::default()), - ); - - let keypair = voting_keypair.as_ref(); - let vote = VoteTransaction::new_vote(keypair, 0, bank.last_id(), 0); - cluster_info_me.write().unwrap().push_vote(vote); - - info!("Send ReplayStage an entry, should see it on the ledger writer receiver"); - let next_tick = create_ticks(1, last_entry_id); - blocktree.write_entries(1, 0, 0, next_tick.clone()).unwrap(); - - let received_tick = ledger_writer_recv - .recv() - .expect("Expected to receive an entry on the ledger writer receiver"); - - assert_eq!(next_tick[0], received_tick[0]); - - replay_stage - .close() - .expect("Expect successful ReplayStage exit"); - } - let _ignored = remove_dir_all(&my_ledger_path); - } - - #[test] - fn test_replay_stage_poh_error_entry_receiver() { - // Set up dummy node to host a ReplayStage - let my_keypair = Keypair::new(); - let my_id = my_keypair.pubkey(); - let my_node = Node::new_localhost_with_pubkey(my_id); - // Set up the cluster info - let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); - let (forward_entry_sender, _forward_entry_receiver) = channel(); - let mut last_entry_id = Hash::default(); - let mut current_blob_index = 0; - let mut last_id = Hash::default(); - let mut entries = Vec::new(); - for _ in 0..5 { - let entry = next_entry_mut(&mut last_id, 1, vec![]); //just ticks - entries.push(entry); - } - - let genesis_block = GenesisBlock::new(10_000).0; - let bank = Arc::new(Bank::new(&genesis_block)); - let voting_keypair = Some(Arc::new(Keypair::new())); - let res = ReplayStage::process_entries( - entries.clone(), - &bank, - &cluster_info_me, - &voting_keypair, - &forward_entry_sender, - &mut current_blob_index, - &mut last_entry_id, - &Arc::new(RpcSubscriptions::default()), - ); - - match res { - Ok(_) => (), - Err(e) => assert!(false, "Entries were not sent correctly {:?}", e), - } - - entries.clear(); - for _ in 0..5 { - let entry = Entry::new(&mut Hash::default(), 1, vec![]); //just broken entries - entries.push(entry); - } - - let bank = Arc::new(Bank::new(&genesis_block)); - let res = ReplayStage::process_entries( - entries.clone(), - &bank, - &cluster_info_me, - &voting_keypair, - &forward_entry_sender, - &mut current_blob_index, - &mut last_entry_id, - &Arc::new(RpcSubscriptions::default()), - ); - - match res { - Ok(_) => assert!(false, "Should have failed because entries are broken"), - Err(Error::BlobError(BlobError::VerificationFailed)) => (), - Err(e) => assert!( - false, - "Should have failed because with blob error, instead, got {:?}", - e - ), - } - } -} +// #[cfg(test)] +// mod test { +// use super::*; +// use crate::blocktree::create_new_tmp_ledger; +// use crate::cluster_info::{ClusterInfo, Node}; +// use crate::entry::create_ticks; +// use crate::entry::{next_entry_mut, Entry}; +// use crate::fullnode::new_banks_from_blocktree; +// use crate::replay_stage::ReplayStage; +// use solana_sdk::genesis_block::GenesisBlock; +// use solana_sdk::hash::Hash; +// use solana_sdk::signature::{Keypair, KeypairUtil}; +// use std::fs::remove_dir_all; +// use std::sync::atomic::AtomicBool; +// use std::sync::mpsc::channel; +// use std::sync::{Arc, RwLock}; +// +// #[test] +// fn test_vote_error_replay_stage_correctness() { +// // Set up dummy node to host a ReplayStage +// let my_keypair = Keypair::new(); +// let my_id = my_keypair.pubkey(); +// let my_node = Node::new_localhost_with_pubkey(my_id); +// +// // Create keypair for the leader +// let leader_id = Keypair::new().pubkey(); +// +// let (genesis_block, _mint_keypair) = GenesisBlock::new_with_leader(10_000, leader_id, 500); +// +// let (my_ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block); +// +// // Set up the cluster info +// let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); +// +// // Set up the replay stage +// let exit = Arc::new(AtomicBool::new(false)); +// let voting_keypair = Arc::new(Keypair::new()); +// let (to_leader_sender, _to_leader_receiver) = channel(); +// { +// let (bank_forks, bank_forks_info, blocktree, l_receiver) = +// new_banks_from_blocktree(&my_ledger_path, None); +// let bank = bank_forks.working_bank(); +// let last_entry_id = bank_forks_info[0].last_entry_id; +// +// let blocktree = Arc::new(blocktree); +// let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( +// my_keypair.pubkey(), +// Some(voting_keypair.clone()), +// blocktree.clone(), +// &Arc::new(RwLock::new(bank_forks)), +// &bank_forks_info, +// cluster_info_me.clone(), +// exit.clone(), +// &to_leader_sender, +// l_receiver, +// &Arc::new(RpcSubscriptions::default()), +// ); +// +// let keypair = voting_keypair.as_ref(); +// let vote = VoteTransaction::new_vote(keypair, 0, bank.last_id(), 0); +// cluster_info_me.write().unwrap().push_vote(vote); +// +// info!("Send ReplayStage an entry, should see it on the ledger writer receiver"); +// let next_tick = create_ticks(1, last_entry_id); +// blocktree.write_entries(1, 0, 0, next_tick.clone()).unwrap(); +// +// let received_tick = ledger_writer_recv +// .recv() +// .expect("Expected to receive an entry on the ledger writer receiver"); +// +// assert_eq!(next_tick[0], received_tick[0]); +// +// replay_stage +// .close() +// .expect("Expect successful ReplayStage exit"); +// } +// let _ignored = remove_dir_all(&my_ledger_path); +// } +// +// #[test] +// fn test_replay_stage_poh_error_entry_receiver() { +// // Set up dummy node to host a ReplayStage +// let my_keypair = Keypair::new(); +// let my_id = my_keypair.pubkey(); +// let my_node = Node::new_localhost_with_pubkey(my_id); +// // Set up the cluster info +// let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); +// let (forward_entry_sender, _forward_entry_receiver) = channel(); +// let mut last_entry_id = Hash::default(); +// let mut current_blob_index = 0; +// let mut last_id = Hash::default(); +// let mut entries = Vec::new(); +// for _ in 0..5 { +// let entry = next_entry_mut(&mut last_id, 1, vec![]); //just ticks +// entries.push(entry); +// } +// +// let genesis_block = GenesisBlock::new(10_000).0; +// let bank = Arc::new(Bank::new(&genesis_block)); +// let voting_keypair = Some(Arc::new(Keypair::new())); +// let res = ReplayStage::process_entries( +// entries.clone(), +// &bank, +// &cluster_info_me, +// &voting_keypair, +// &forward_entry_sender, +// &mut current_blob_index, +// &mut last_entry_id, +// &Arc::new(RpcSubscriptions::default()), +// ); +// +// match res { +// Ok(_) => (), +// Err(e) => assert!(false, "Entries were not sent correctly {:?}", e), +// } +// +// entries.clear(); +// for _ in 0..5 { +// let entry = Entry::new(&mut Hash::default(), 1, vec![]); //just broken entries +// entries.push(entry); +// } +// +// let bank = Arc::new(Bank::new(&genesis_block)); +// let res = ReplayStage::process_entries( +// entries.clone(), +// &bank, +// &cluster_info_me, +// &voting_keypair, +// &forward_entry_sender, +// &mut current_blob_index, +// &mut last_entry_id, +// &Arc::new(RpcSubscriptions::default()), +// ); +// +// match res { +// Ok(_) => assert!(false, "Should have failed because entries are broken"), +// Err(Error::BlobError(BlobError::VerificationFailed)) => (), +// Err(e) => assert!( +// false, +// "Should have failed because with blob error, instead, got {:?}", +// e +// ), +// } +// } +// } diff --git a/src/tvu.rs b/src/tvu.rs index 37066ee153fe08..da82e97b52cc93 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -23,7 +23,6 @@ use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; -use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -34,7 +33,7 @@ use std::sync::{Arc, RwLock}; use std::thread; pub struct TvuRotationInfo { - pub bank: Arc, // Bank to use + pub tick_height: u64, // Bank to use pub last_entry_id: Hash, // last_entry_id of that bank pub slot: u64, // slot height to initiate a rotation pub leader_id: Pubkey, // leader upon rotation @@ -117,7 +116,7 @@ impl Tvu { exit.clone(), ); - let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new( + let replay_stage = ReplayStage::new( keypair.pubkey(), voting_keypair, blocktree.clone(), @@ -129,6 +128,8 @@ impl Tvu { ledger_signal_receiver, subscriptions, ); + // stub out dummy channel + let (_dummy_sender, mut previous_receiver) = channel(); let blockstream_service = if blockstream.is_some() { let blockstream_service = BlockstreamService::new(