Skip to content

Commit

Permalink
Merge leader_to_validator/validator_to_leader
Browse files Browse the repository at this point in the history
  • Loading branch information
mvines committed Feb 11, 2019
1 parent 4ae1783 commit 095afdf
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 165 deletions.
180 changes: 71 additions & 109 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use crate::rpc_pubsub::PubSubService;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::streamer::BlobSender;
use crate::tpu::{Tpu, TpuReturnType, TpuRotationReceiver};
use crate::tvu::{Sockets, Tvu, TvuReturnType, TvuRotationReceiver};
use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender};
use crate::tvu::{Sockets, Tvu};
use crate::voting_keypair::VotingKeypair;
use log::Level;
use solana_sdk::hash::Hash;
Expand Down Expand Up @@ -105,8 +105,8 @@ pub struct Fullnode {
tpu_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
node_services: NodeServices,
to_leader_receiver: TvuRotationReceiver,
to_validator_receiver: TpuRotationReceiver,
rotation_sender: TpuRotationSender,
rotation_receiver: TpuRotationReceiver,
blob_sender: BlobSender,
}

Expand Down Expand Up @@ -250,9 +250,8 @@ impl Fullnode {
Some(Arc::new(voting_keypair))
};

// Setup channels for rotation indications
let (to_leader_sender, to_leader_receiver) = channel();
let (to_validator_sender, to_validator_receiver) = channel();
// Setup channel for rotation indications
let (rotation_sender, rotation_receiver) = channel();

let blob_index = Self::get_consumed_for_slot(&blocktree, slot_height);

Expand All @@ -266,7 +265,7 @@ impl Fullnode {
sockets,
blocktree.clone(),
config.storage_rotate_count,
to_leader_sender,
&rotation_sender,
&storage_state,
config.entry_stream.as_ref(),
ledger_signal_sender,
Expand All @@ -290,7 +289,7 @@ impl Fullnode {
blob_index,
&last_entry_id,
id,
&to_validator_sender,
&rotation_sender,
&blob_sender,
scheduled_leader == id,
);
Expand All @@ -309,8 +308,8 @@ impl Fullnode {
exit,
tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast,
to_leader_receiver,
to_validator_receiver,
rotation_sender,
rotation_receiver,
blob_sender,
}
}
Expand Down Expand Up @@ -360,21 +359,45 @@ impl Fullnode {
(scheduled_leader, max_tick_height)
}

fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!(
"leader_to_validator({:?}): tick_height={}",
self.id,
tick_height,
);

let (scheduled_leader, _max_tick_height) = self.get_next_leader(tick_height);
fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
let was_leader = self.node_services.tpu.is_leader();

let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
if scheduled_leader == self.id {
debug!("node is still the leader");
let transition = if was_leader {
debug!("{:?} remaining in leader role", self.id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
debug!("{:?} rotating to leader role", self.id);
FullnodeReturnType::ValidatorToLeaderRotation
};

let last_entry_id = self.bank.last_id();
self.validator_to_leader(tick_height, last_entry_id);
FullnodeReturnType::LeaderToLeaderRotation

self.node_services.tpu.switch_to_leader(
&Arc::new(self.bank.copy_for_tpu()),
PohServiceConfig::default(),
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),
self.cluster_info.clone(),
self.sigverify_disabled,
max_tick_height,
0,
&last_entry_id,
self.id,
&self.rotation_sender,
&self.blob_sender,
);

transition
} else {
debug!("{:?} rotating to validator role", self.id);
self.node_services.tpu.switch_to_forwarder(
self.tpu_sockets
.iter()
Expand All @@ -386,73 +409,6 @@ impl Fullnode {
}
}

pub fn validator_to_leader(&mut self, tick_height: u64, last_entry_id: Hash) {
trace!(
"validator_to_leader({:?}): tick_height={} last_entry_id={}",
self.id,
tick_height,
last_entry_id,
);

let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
assert_eq!(scheduled_leader, self.id, "node is not the leader");

let (to_validator_sender, to_validator_receiver) = channel();
self.to_validator_receiver = to_validator_receiver;
self.node_services.tpu.switch_to_leader(
&Arc::new(self.bank.copy_for_tpu()),
PohServiceConfig::default(),
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),
self.cluster_info.clone(),
self.sigverify_disabled,
max_tick_height,
0,
&last_entry_id,
self.id,
&to_validator_sender,
&self.blob_sender,
)
}

fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> {
let timeout = Duration::from_secs(1);
loop {
if self.exit.load(Ordering::Relaxed) {
return None;
}

if self.node_services.tpu.is_leader() {
let should_be_forwarder = self.to_validator_receiver.recv_timeout(timeout);
match should_be_forwarder {
Ok(TpuReturnType::LeaderRotation(tick_height)) => {
return Some((self.leader_to_validator(tick_height), tick_height + 1));
}
Err(RecvTimeoutError::Timeout) => continue,
_ => return None,
}
} else {
let should_be_leader = self.to_leader_receiver.recv_timeout(timeout);
match should_be_leader {
Ok(TvuReturnType::LeaderRotation(tick_height, last_entry_id)) => {
self.validator_to_leader(tick_height, last_entry_id);
return Some((
FullnodeReturnType::ValidatorToLeaderRotation,
tick_height + 1,
));
}
Err(RecvTimeoutError::Timeout) => continue,
_ => return None,
}
}
}
}

// Runs a thread to manage node role transitions. The returned closure can be used to signal the
// node to exit.
pub fn run(
Expand All @@ -461,22 +417,28 @@ impl Fullnode {
) -> impl FnOnce() {
let (sender, receiver) = channel();
let exit = self.exit.clone();
let timeout = Duration::from_secs(1);
spawn(move || loop {
let status = self.handle_role_transition();
match status {
None => {
debug!("node shutdown requested");
self.close().expect("Unable to close node");
sender.send(true).expect("Unable to signal exit");
break;
}
Some(transition) => {
debug!("role_transition complete: {:?}", transition);
if self.exit.load(Ordering::Relaxed) {
debug!("node shutdown requested");
self.close().expect("Unable to close node");
sender.send(true).expect("Unable to signal exit");
break;
}

match self.rotation_receiver.recv_timeout(timeout) {
Ok(tick_height) => {
let transition = self.rotate(tick_height);
debug!("role transition complete: {:?}", transition);
if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send(transition).unwrap();
rotation_notifier
.send((transition, tick_height + 1))
.unwrap();
}
}
};
Err(RecvTimeoutError::Timeout) => continue,
_ => (),
}
});
move || {
exit.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -927,7 +889,7 @@ mod tests {

let voting_keypair = VotingKeypair::new_local(&leader_keypair);
info!("Start the bootstrap leader");
let mut leader = Fullnode::new(
let leader = Fullnode::new(
leader_node,
&leader_keypair,
&leader_ledger_path,
Expand All @@ -946,27 +908,27 @@ mod tests {
converge(&leader_node_info, 2);

info!("Wait for leader -> validator transition");
let signal = leader
.to_validator_receiver
let rotation_signal = leader
.rotation_receiver
.recv()
.expect("signal for leader -> validator transition");
let (rn_sender, rn_receiver) = channel();
rn_sender.send(signal).expect("send");
leader.to_validator_receiver = rn_receiver;
debug!("received rotation signal: {:?}", rotation_signal);
// Re-send the rotation signal, it'll be received again once the tvu is unpaused
leader.rotation_sender.send(rotation_signal).expect("send");

info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
{
let w_last_ids = leader.bank.last_ids().write().unwrap();
assert!(w_last_ids.tick_height < ticks_per_slot - 1);
}

// Clear the blobs we've recieved so far. After this rotation, we should
// Clear the blobs we've received so far. After this rotation, we should
// no longer receive blobs from slot 0
while let Ok(_) = blob_fetch_receiver.try_recv() {}

let leader_exit = leader.run(Some(rotation_sender));

// Wait for leader_to_validator() function execution to trigger a leader to leader rotation
// Wait for Tpu bank to progress while the Tvu bank is stuck
sleep(Duration::from_millis(1000));

// Tvu bank lock is released here, so tvu should start making progress again and should signal a
Expand Down
8 changes: 3 additions & 5 deletions src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error;
use crate::result::Result;
use crate::service::Service;
use crate::tpu::{TpuReturnType, TpuRotationSender};
use crate::tpu::TpuRotationSender;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
Expand Down Expand Up @@ -92,8 +92,7 @@ impl PohService {
let res = poh.hash();
if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
to_validator_sender
.send(TpuReturnType::LeaderRotation(max_tick_height))?;
to_validator_sender.send(max_tick_height)?;
}
return Err(e);
}
Expand All @@ -106,8 +105,7 @@ impl PohService {
let res = poh.tick();
if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
// Leader rotation should only happen if a max_tick_height was specified
to_validator_sender.send(TpuReturnType::LeaderRotation(max_tick_height))?;
to_validator_sender.send(max_tick_height)?;
}
return Err(e);
}
Expand Down
Loading

0 comments on commit 095afdf

Please sign in to comment.