simplify PohService, initialize and freeze TPU bank, skip TPU slot in TVU
rob-solana committed Feb 13, 2019
1 parent 1e9a725 commit 709cc0a
Showing 5 changed files with 89 additions and 60 deletions.
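Taken together, the poh_service.rs and banking_stage.rs hunks below move leader-rotation signalling out of the PoH tick loop: tick_producer no longer carries a TpuRotationSender and simply propagates PohRecorderError::MaxHeightReached, leaving the banking stage to react when the service exits. A minimal, self-contained sketch of that control flow, using stand-in Recorder and PohError types rather than the real PohRecorder API:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

#[derive(Debug)]
enum PohError {
    MaxHeightReached,
}

// Stand-in for PohRecorder: counts ticks up to a maximum height.
struct Recorder {
    tick_height: u64,
    max_tick_height: u64,
}

impl Recorder {
    fn tick(&mut self) -> Result<(), PohError> {
        if self.tick_height >= self.max_tick_height {
            return Err(PohError::MaxHeightReached);
        }
        self.tick_height += 1;
        Ok(())
    }
}

// Simplified tick producer: errors bubble up with `?` instead of being routed
// through a rotation sender, mirroring the shape of the new tick_producer.
fn tick_producer(recorder: &mut Recorder, exit: &AtomicBool) -> Result<(), PohError> {
    loop {
        recorder.tick()?;
        if exit.load(Ordering::Relaxed) {
            return Ok(());
        }
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let exit_ = exit.clone();
    let handle = thread::spawn(move || {
        let mut recorder = Recorder {
            tick_height: 0,
            max_tick_height: 10,
        };
        let result = tick_producer(&mut recorder, &exit_);
        // Whatever happened, signal the consumer side on the way out, as the
        // real service does with poh_exit_.store(true, ...).
        exit_.store(true, Ordering::Relaxed);
        result
    });
    println!("tick producer finished: {:?}", handle.join().unwrap());
}

Run as-is, the producer stops with Err(MaxHeightReached) once the stub recorder hits its cap; that returned error is the signal the banking stage now interprets instead of a channel message.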
src/bank_fork.rs: 3 changes (2 additions, 1 deletion)
@@ -294,9 +294,10 @@ impl BankFork {
let hash = Transaction::hash(&processed_transactions);
// record and unlock will unlock all the successful transactions
poh.record(hash, processed_transactions).map_err(|e| {
warn!("record failure: {:?}", e);
trace!("record failure: {:?}", e);
match e {
Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
trace!("max_height reached");
BankError::MaxHeightReached
}
_ => BankError::RecordFailure,
src/banking_stage.rs: 22 changes (19 additions, 3 deletions)
@@ -65,8 +65,7 @@ impl BankingStage {
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank.
let poh_service =
PohService::new(poh_recorder.clone(), config, to_validator_sender.clone());
let poh_service = PohService::new(poh_recorder.clone(), config);

// Single thread to compute confirmation
let compute_confirmation_service = ComputeLeaderConfirmationService::new(
@@ -83,6 +82,8 @@
let thread_verified_receiver = shared_verified_receiver.clone();
let thread_poh_recorder = poh_recorder.clone();
let thread_banking_exit = poh_service.poh_exit.clone();
let thread_to_validator_sender = to_validator_sender.clone();

Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
@@ -105,7 +106,6 @@
break Some(BankingStageReturnType::ChannelDisconnected);
}
Error::BankError(BankError::RecordFailure) => {
warn!("Bank failed to record");
break Some(BankingStageReturnType::ChannelDisconnected);
}
Error::BankError(BankError::MaxHeightReached) => {
@@ -126,6 +126,19 @@
// Signal exit only on "Some" error
if return_result.is_some() {
thread_banking_exit.store(true, Ordering::Relaxed);
} else {
// TODO: pass current_slot to TPU construction...
let current_slot = thread_bank.active_fork().head().fork_id();

trace!(
"leader for slot {} done at {}",
current_slot,
max_tick_height
);
thread_bank.active_fork().head().freeze();
thread_bank.merge_into_root(current_slot);

thread_to_validator_sender.send(max_tick_height).unwrap();
}
return_result
})
@@ -257,6 +270,9 @@ impl Service for BankingStage {
self.compute_confirmation_service.join()?;

let poh_return_value = self.poh_service.join()?;

trace!("banking_stage join {:?}", poh_return_value);

match poh_return_value {
Ok(_) => (),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
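With PohService no longer sending rotation messages itself, the banking stage picks up the end-of-slot work shown above: when its thread winds down without an error, it freezes the head of the active fork, merges the slot into the root, and sends max_tick_height to the validator path. A rough standalone sketch of that handoff, using stub Bank and Fork types rather than the real bank API:

use std::sync::mpsc::channel;

struct Fork {
    slot: u64,
    frozen: bool,
}

struct Bank {
    active: Fork,
    root: Vec<u64>,
}

impl Bank {
    fn freeze_active(&mut self) {
        self.active.frozen = true;
    }
    fn merge_into_root(&mut self, slot: u64) {
        self.root.push(slot);
    }
}

fn main() {
    let (to_validator_sender, to_validator_receiver) = channel();
    let max_tick_height = 63u64;

    let mut bank = Bank {
        active: Fork { slot: 0, frozen: false },
        root: vec![],
    };
    let current_slot = bank.active.slot;

    // Mirror of the new `else` branch: the leader slot finished without an
    // error, so freeze the head, merge it into the root, and hand the final
    // tick height to the validator path.
    bank.freeze_active();
    bank.merge_into_root(current_slot);
    to_validator_sender.send(max_tick_height).unwrap();

    assert_eq!(to_validator_receiver.recv().unwrap(), 63);
    assert!(bank.active.frozen);
    println!("slot {} frozen and merged", current_slot);
}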
src/fullnode.rs: 34 changes (23 additions, 11 deletions)
@@ -333,13 +333,13 @@ impl Fullnode {
}

let (scheduled_leader, max_tick_height) = {
let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap();
let leader_scheduler = self.bank.leader_scheduler.read().unwrap();

// A transition is only permitted on the final tick of a slot
assert_eq!(leader_scheduler.num_ticks_left_in_slot(tick_height), 0);
let first_tick_of_next_slot = tick_height + 1;

leader_scheduler.update_tick_height(first_tick_of_next_slot, &self.bank);
//leader_scheduler.update_tick_height(first_tick_of_next_slot, &self.bank);
let slot = leader_scheduler.tick_height_to_slot(first_tick_of_next_slot);
(
leader_scheduler.get_leader_for_slot(slot).unwrap(),
@@ -364,16 +364,22 @@
}

fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
let was_leader = self.node_services.tpu.is_leader();

trace!(
"{:?}: rotate at tick_height: {}, {} leader",
self.id,
tick_height,
if was_leader { "was" } else { "wasn't" }
);

let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
if scheduled_leader == self.id {
let transition = if was_leader {
debug!("{:?} remaining in leader role", self.id);
debug!("{:?} remaining in leader role at {}", self.id, tick_height);
FullnodeReturnType::LeaderToLeaderRotation
} else {
debug!("{:?} rotating to leader role", self.id);
debug!("{:?} rotating to leader role at {}", self.id, tick_height);
FullnodeReturnType::ValidatorToLeaderRotation
};

@@ -432,6 +438,7 @@ impl Fullnode {

match self.rotation_receiver.recv_timeout(timeout) {
Ok(tick_height) => {
debug!("received rotation at {}", tick_height);
let transition = self.rotate(tick_height);
debug!("role transition complete: {:?}", transition);
if let Some(ref rotation_notifier) = rotation_notifier {
@@ -929,15 +936,20 @@ mod tests {
.recv()
.expect("signal for leader -> validator transition");
debug!("received rotation signal: {:?}", rotation_signal);

// Re-send the rotation signal, it'll be received again once the tvu is unpaused
leader.rotation_sender.send(rotation_signal).expect("send");

info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
{
let bank_state = leader.bank.active_fork();
let w_last_ids = bank_state.head().last_ids().write().unwrap();
assert!(w_last_ids.tick_height < ticks_per_slot - 1);
}
// info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
// {
// let bank_state = leader.bank.fork(0).expect("validator should be at slot 1");
// let w_last_ids = bank_state.head().last_ids().write().unwrap();
// info!(
// "w_last_ids.tick_height: {} ticks_per_slot: {}",
// w_last_ids.tick_height, ticks_per_slot
// );
// assert!(w_last_ids.tick_height < ticks_per_slot - 1);
// }

// Clear the blobs we've received so far. After this rotation, we should
// no longer receive blobs from slot 0
src/poh_service.rs: 37 changes (5 additions, 32 deletions)
@@ -1,11 +1,9 @@
//! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error;
use crate::poh_recorder::PohRecorder;
use crate::result::Result;
use crate::service::Service;
use crate::tpu::TpuRotationSender;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
@@ -46,11 +44,7 @@ impl PohService {
self.join()
}

pub fn new(
poh_recorder: PohRecorder,
config: PohServiceConfig,
to_validator_sender: TpuRotationSender,
) -> Self {
pub fn new(poh_recorder: PohRecorder, config: PohServiceConfig) -> Self {
// PohService is a headless producer, so when it exits it should notify the banking stage.
// Since channels are not used to talk between these threads, an AtomicBool is used as a
// signal.
@@ -61,12 +55,7 @@
.name("solana-poh-service-tick_producer".to_string())
.spawn(move || {
let mut poh_recorder_ = poh_recorder;
let return_value = Self::tick_producer(
&mut poh_recorder_,
config,
&poh_exit_,
&to_validator_sender,
);
let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_);
poh_exit_.store(true, Ordering::Relaxed);
return_value
})
@@ -82,33 +71,19 @@
poh: &mut PohRecorder,
config: PohServiceConfig,
poh_exit: &AtomicBool,
to_validator_sender: &TpuRotationSender,
) -> Result<()> {
let max_tick_height = poh.max_tick_height();
loop {
match config {
PohServiceConfig::Tick(num) => {
for _ in 1..num {
let res = poh.hash();
if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
to_validator_sender.send(max_tick_height)?;
}
return Err(e);
}
poh.hash()?;
}
}
PohServiceConfig::Sleep(duration) => {
sleep(duration);
}
}
let res = poh.tick();
if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
to_validator_sender.send(max_tick_height)?;
}
return Err(e);
}
poh.tick()?;
if poh_exit.load(Ordering::Relaxed) {
return Ok(());
}
@@ -164,11 +139,9 @@ mod tests {
};

const HASHES_PER_TICK: u64 = 2;
let (sender, _) = channel();
let poh_service = PohService::new(
poh_recorder,
PohServiceConfig::Tick(HASHES_PER_TICK as usize),
sender,
);

// get some events
src/replay_stage.rs: 53 changes (40 additions, 13 deletions)
@@ -193,7 +193,7 @@ impl ReplayStage {
exit: Arc<AtomicBool>,
mut current_blob_index: u64,
last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: &TvuRotationSender,
rotation_sender: &TvuRotationSender,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>,
) -> (Self, EntryReceiver) {
@@ -205,7 +205,7 @@
(pause, pause_)
};
let exit_ = exit.clone();
let to_leader_sender = to_leader_sender.clone();
let rotation_sender = rotation_sender.clone();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
@@ -238,6 +238,8 @@

if current_slot.is_none() {
let new_slot = Self::get_next_slot(
my_id,
&bank,
&blocktree,
prev_slot.expect("prev_slot must exist"),
);
@@ -273,15 +275,17 @@
let entry_len = entries.len();
// Fetch the next entries from the database
if !entries.is_empty() {
let slot = current_slot.expect("current_slot must exist");

// TODO: ledger provides from get_slot_entries()
let base_slot = match current_slot.expect("current_slot must exist") {
let base_slot = match slot {
0 => 0,
x => x - 1,
};

if let Err(e) = Self::process_entries(
entries,
current_slot.expect("current_slot must exist"),
slot,
base_slot,
&bank,
&cluster_info,
Expand All @@ -293,7 +297,10 @@ impl ReplayStage {
error!("process_entries failed: {:?}", e);
}

let current_tick_height = bank.active_fork().tick_height();
let current_tick_height = bank
.fork(slot)
.expect("fork for current slot must exist")
.tick_height();

// we've reached the end of a slot, reset our state and check
// for leader rotation
@@ -305,20 +312,29 @@
// Check for leader rotation
let leader_id = Self::get_leader_for_next_tick(&bank);

info!(
trace!(
"leader_id: {} last_leader_id: {} my_id: {}",
leader_id, last_leader_id, my_id
leader_id,
last_leader_id,
my_id
);

// TODO: Remove this soon once we boot the leader from ClusterInfo
cluster_info.write().unwrap().set_leader(leader_id);

if leader_id != last_leader_id && my_id == leader_id {
to_leader_sender.send(current_tick_height).unwrap();
}
// construct the leader's bank_state
bank.init_fork(slot + 1, &last_entry_id.read().unwrap(), slot)
.expect("init fork");

// Check for any slots that chain to this one
prev_slot = current_slot;
rotation_sender.send(current_tick_height).unwrap();

// causes prev_slot to advance past my leader slot
prev_slot = Some(slot + 1);
} else {
// Check for any slots that chain to this one
prev_slot = current_slot;
}
current_slot = None;
last_leader_id = leader_id;
continue;
@@ -331,6 +347,10 @@
// Update disconnected, exit
break;
}
info!(
"{} replay_stage trying on current_slot {:?}",
my_id, current_slot
);
}
})
.unwrap();
@@ -371,9 +391,16 @@
.expect("Scheduled leader should be calculated by this point")
}

fn get_next_slot(blocktree: &Blocktree, slot_index: u64) -> Option<u64> {
fn get_next_slot(
my_id: Pubkey,
bank: &Arc<Bank>,
blocktree: &Blocktree,
slot_index: u64,
) -> Option<u64> {
// Find the next slot that chains to the old slot
let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
let mut next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
let leader_scheduler = bank.leader_scheduler.read().unwrap();
next_slots.retain(|slot| leader_scheduler.get_leader_for_slot(*slot) != Some(my_id));
next_slots.first().cloned()
}
}
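This get_next_slot change is where the commit's "skip TPU slot in TVU" lands: when the replay stage looks for the next slot to replay, it now drops any slot this node itself leads, since the TPU produced that slot locally and the leader branch above already initialized the corresponding fork. A simplified, self-contained sketch of the filter, with a toy LeaderScheduler standing in for the real one:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Pubkey(u8);

struct LeaderScheduler;

impl LeaderScheduler {
    // Toy schedule: even slots are led by node 1, odd slots by node 2.
    fn get_leader_for_slot(&self, slot: u64) -> Option<Pubkey> {
        Some(if slot % 2 == 0 { Pubkey(1) } else { Pubkey(2) })
    }
}

fn get_next_slot(my_id: Pubkey, scheduler: &LeaderScheduler, mut next_slots: Vec<u64>) -> Option<u64> {
    // Keep only slots that someone else leads, mirroring the retain() added above.
    next_slots.retain(|slot| scheduler.get_leader_for_slot(*slot) != Some(my_id));
    next_slots.first().cloned()
}

fn main() {
    let scheduler = LeaderScheduler;
    // Node 1 leads slot 2, so it skips its own slot and replays slot 3 next.
    assert_eq!(get_next_slot(Pubkey(1), &scheduler, vec![2, 3]), Some(3));
    // Node 2 does not lead slot 2, so it replays it.
    assert_eq!(get_next_slot(Pubkey(2), &scheduler, vec![2, 3]), Some(2));
    println!("ok");
}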
