fullnode: Merge leader_to_validator/validator_to_leader #2720

Merged · 1 commit · Feb 11, 2019
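The change in a nutshell: instead of one channel per transition direction (`to_leader` / `to_validator`), both the Tpu and Tvu now report rotation on a single channel that carries a bare tick height, and one loop in `Fullnode::run` drains it. Below is a minimal, self-contained sketch of that pattern using `std::sync::mpsc`; the channel names and the `u64` payload mirror the diff, but the services, banks, and sockets are all elided, so this is an illustration rather than the crate's actual API:

```rust
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

fn main() {
    // One channel for all rotation signals. Both halves are kept around,
    // which also allows a consumed signal to be re-queued (see the test).
    let (rotation_sender, rotation_receiver) = channel::<u64>();

    // Either service (TPU or TVU) reports the tick height it stopped at.
    let tpu_sender: Sender<u64> = rotation_sender.clone();
    thread::spawn(move || {
        tpu_sender.send(42).unwrap(); // e.g. max_tick_height was reached
    });

    // One role-transition loop replaces the old per-role receiver pair.
    loop {
        match rotation_receiver.recv_timeout(Duration::from_secs(1)) {
            Ok(tick_height) => {
                println!("rotate at tick_height={}", tick_height);
                break; // a real node would call self.rotate(tick_height)
            }
            Err(RecvTimeoutError::Timeout) => continue, // re-check exit flag
            Err(_) => break,                            // sender dropped
        }
    }
}
```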
src/fullnode.rs (71 additions, 109 deletions)
@@ -13,8 +13,8 @@ use crate::rpc_pubsub::PubSubService;
 use crate::service::Service;
 use crate::storage_stage::StorageState;
 use crate::streamer::BlobSender;
-use crate::tpu::{Tpu, TpuReturnType, TpuRotationReceiver};
-use crate::tvu::{Sockets, Tvu, TvuReturnType, TvuRotationReceiver};
+use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender};
+use crate::tvu::{Sockets, Tvu};
 use crate::voting_keypair::VotingKeypair;
 use log::Level;
 use solana_sdk::hash::Hash;
@@ -105,8 +105,8 @@ pub struct Fullnode {
     tpu_sockets: Vec<UdpSocket>,
     broadcast_socket: UdpSocket,
     node_services: NodeServices,
-    to_leader_receiver: TvuRotationReceiver,
-    to_validator_receiver: TpuRotationReceiver,
+    rotation_sender: TpuRotationSender,
+    rotation_receiver: TpuRotationReceiver,
     blob_sender: BlobSender,
 }

@@ -250,9 +250,8 @@ impl Fullnode {
             Some(Arc::new(voting_keypair))
         };

-        // Setup channels for rotation indications
-        let (to_leader_sender, to_leader_receiver) = channel();
-        let (to_validator_sender, to_validator_receiver) = channel();
+        // Setup channel for rotation indications
+        let (rotation_sender, rotation_receiver) = channel();

         let blob_index = Self::get_consumed_for_slot(&blocktree, slot_height);

@@ -266,7 +265,7 @@
             sockets,
             blocktree.clone(),
             config.storage_rotate_count,
-            to_leader_sender,
+            &rotation_sender,
             &storage_state,
             config.entry_stream.as_ref(),
             ledger_signal_sender,
@@ -290,7 +289,7 @@
             blob_index,
             &last_entry_id,
             id,
-            &to_validator_sender,
+            &rotation_sender,
             &blob_sender,
             scheduled_leader == id,
         );
@@ -309,8 +308,8 @@
             exit,
             tpu_sockets: node.sockets.tpu,
             broadcast_socket: node.sockets.broadcast,
-            to_leader_receiver,
-            to_validator_receiver,
+            rotation_sender,
+            rotation_receiver,
             blob_sender,
         }
     }
@@ -360,21 +359,45 @@ impl Fullnode {
         (scheduled_leader, max_tick_height)
     }

-    fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
-        trace!(
-            "leader_to_validator({:?}): tick_height={}",
-            self.id,
-            tick_height,
-        );
-
-        let (scheduled_leader, _max_tick_height) = self.get_next_leader(tick_height);
+    fn rotate(&mut self, tick_height: u64) -> FullnodeReturnType {
+        trace!("{:?}: rotate at tick_height={}", self.id, tick_height,);
+        let was_leader = self.node_services.tpu.is_leader();
+
+        let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
         if scheduled_leader == self.id {
-            debug!("node is still the leader");
+            let transition = if was_leader {
+                debug!("{:?} remaining in leader role", self.id);
+                FullnodeReturnType::LeaderToLeaderRotation
+            } else {
+                debug!("{:?} rotating to leader role", self.id);
+                FullnodeReturnType::ValidatorToLeaderRotation
+            };
+
             let last_entry_id = self.bank.last_id();
-            self.validator_to_leader(tick_height, last_entry_id);
-            FullnodeReturnType::LeaderToLeaderRotation
+
+            self.node_services.tpu.switch_to_leader(
+                &Arc::new(self.bank.copy_for_tpu()),
+                PohServiceConfig::default(),
+                self.tpu_sockets
+                    .iter()
+                    .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
+                    .collect(),
+                self.broadcast_socket
+                    .try_clone()
+                    .expect("Failed to clone broadcast socket"),
+                self.cluster_info.clone(),
+                self.sigverify_disabled,
+                max_tick_height,
+                0,
+                &last_entry_id,
+                self.id,
+                &self.rotation_sender,
+                &self.blob_sender,
+            );
+
+            transition
         } else {
+            debug!("{:?} rotating to validator role", self.id);
             self.node_services.tpu.switch_to_forwarder(
                 self.tpu_sockets
                     .iter()
@@ -386,73 +409,6 @@
         }
     }

-    pub fn validator_to_leader(&mut self, tick_height: u64, last_entry_id: Hash) {
-        trace!(
-            "validator_to_leader({:?}): tick_height={} last_entry_id={}",
-            self.id,
-            tick_height,
-            last_entry_id,
-        );
-
-        let (scheduled_leader, max_tick_height) = self.get_next_leader(tick_height);
-        assert_eq!(scheduled_leader, self.id, "node is not the leader");
-
-        let (to_validator_sender, to_validator_receiver) = channel();
-        self.to_validator_receiver = to_validator_receiver;
-        self.node_services.tpu.switch_to_leader(
-            &Arc::new(self.bank.copy_for_tpu()),
-            PohServiceConfig::default(),
-            self.tpu_sockets
-                .iter()
-                .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
-                .collect(),
-            self.broadcast_socket
-                .try_clone()
-                .expect("Failed to clone broadcast socket"),
-            self.cluster_info.clone(),
-            self.sigverify_disabled,
-            max_tick_height,
-            0,
-            &last_entry_id,
-            self.id,
-            &to_validator_sender,
-            &self.blob_sender,
-        )
-    }
-
-    fn handle_role_transition(&mut self) -> Option<(FullnodeReturnType, u64)> {
-        let timeout = Duration::from_secs(1);
-        loop {
-            if self.exit.load(Ordering::Relaxed) {
-                return None;
-            }
-
-            if self.node_services.tpu.is_leader() {
-                let should_be_forwarder = self.to_validator_receiver.recv_timeout(timeout);
-                match should_be_forwarder {
-                    Ok(TpuReturnType::LeaderRotation(tick_height)) => {
-                        return Some((self.leader_to_validator(tick_height), tick_height + 1));
-                    }
-                    Err(RecvTimeoutError::Timeout) => continue,
-                    _ => return None,
-                }
-            } else {
-                let should_be_leader = self.to_leader_receiver.recv_timeout(timeout);
-                match should_be_leader {
-                    Ok(TvuReturnType::LeaderRotation(tick_height, last_entry_id)) => {
-                        self.validator_to_leader(tick_height, last_entry_id);
-                        return Some((
-                            FullnodeReturnType::ValidatorToLeaderRotation,
-                            tick_height + 1,
-                        ));
-                    }
-                    Err(RecvTimeoutError::Timeout) => continue,
-                    _ => return None,
-                }
-            }
-        }
-    }
-
     // Runs a thread to manage node role transitions. The returned closure can be used to signal the
     // node to exit.
     pub fn run(
@@ -461,22 +417,28 @@
     ) -> impl FnOnce() {
         let (sender, receiver) = channel();
         let exit = self.exit.clone();
+        let timeout = Duration::from_secs(1);
         spawn(move || loop {
-            let status = self.handle_role_transition();
-            match status {
-                None => {
-                    debug!("node shutdown requested");
-                    self.close().expect("Unable to close node");
-                    sender.send(true).expect("Unable to signal exit");
-                    break;
-                }
-                Some(transition) => {
-                    debug!("role_transition complete: {:?}", transition);
-                    if let Some(ref rotation_notifier) = rotation_notifier {
-                        rotation_notifier.send(transition).unwrap();
+            if self.exit.load(Ordering::Relaxed) {
+                debug!("node shutdown requested");
+                self.close().expect("Unable to close node");
+                sender.send(true).expect("Unable to signal exit");
+                break;
+            }
+
+            match self.rotation_receiver.recv_timeout(timeout) {
+                Ok(tick_height) => {
+                    let transition = self.rotate(tick_height);
+                    debug!("role transition complete: {:?}", transition);
+                    if let Some(ref rotation_notifier) = rotation_notifier {
+                        rotation_notifier
+                            .send((transition, tick_height + 1))
+                            .unwrap();
                     }
                 }
-            };
+                Err(RecvTimeoutError::Timeout) => continue,
+                _ => (),
+            }
         });
         move || {
             exit.store(true, Ordering::Relaxed);
@@ -927,7 +889,7 @@ mod tests {

         let voting_keypair = VotingKeypair::new_local(&leader_keypair);
         info!("Start the bootstrap leader");
-        let mut leader = Fullnode::new(
+        let leader = Fullnode::new(
             leader_node,
             &leader_keypair,
             &leader_ledger_path,
@@ -946,27 +908,27 @@
         converge(&leader_node_info, 2);

         info!("Wait for leader -> validator transition");
-        let signal = leader
-            .to_validator_receiver
+        let rotation_signal = leader
+            .rotation_receiver
             .recv()
             .expect("signal for leader -> validator transition");
-        let (rn_sender, rn_receiver) = channel();
-        rn_sender.send(signal).expect("send");
-        leader.to_validator_receiver = rn_receiver;
+        debug!("received rotation signal: {:?}", rotation_signal);
+        // Re-send the rotation signal, it'll be received again once the tvu is unpaused
+        leader.rotation_sender.send(rotation_signal).expect("send");

         info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
         {
             let w_last_ids = leader.bank.last_ids().write().unwrap();
             assert!(w_last_ids.tick_height < ticks_per_slot - 1);
         }

-        // Clear the blobs we've recieved so far. After this rotation, we should
+        // Clear the blobs we've received so far. After this rotation, we should
         // no longer receive blobs from slot 0
         while let Ok(_) = blob_fetch_receiver.try_recv() {}

         let leader_exit = leader.run(Some(rotation_sender));

-        // Wait for leader_to_validator() function execution to trigger a leader to leader rotation
+        // Wait for Tpu bank to progress while the Tvu bank is stuck
         sleep(Duration::from_millis(1000));

         // Tvu bank lock is released here, so tvu should start making progress again and should signal a
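A side effect worth noting: because `Fullnode` now retains both halves of the rotation channel (`rotation_sender` and `rotation_receiver`), the test above can consume the first rotation signal, inspect it, and push it back for the run loop to pick up later. A tiny standalone sketch of that receive-and-requeue trick (illustration only, not the test code):

```rust
use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel::<u64>();
    sender.send(7).unwrap();

    // Peek at the signal without losing it: receive, inspect, re-send.
    let signal = receiver.recv().expect("rotation signal");
    assert_eq!(signal, 7);
    sender.send(signal).expect("re-send");

    // The run loop will observe the same signal later.
    assert_eq!(receiver.recv().unwrap(), 7);
}
```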
src/poh_service.rs (3 additions, 5 deletions)
@@ -5,7 +5,7 @@ use crate::poh_recorder::{PohRecorder, PohRecorderError};
 use crate::result::Error;
 use crate::result::Result;
 use crate::service::Service;
-use crate::tpu::{TpuReturnType, TpuRotationSender};
+use crate::tpu::TpuRotationSender;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread::sleep;
@@ -92,8 +92,7 @@ impl PohService {
                 let res = poh.hash();
                 if let Err(e) = res {
                     if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
-                        to_validator_sender
-                            .send(TpuReturnType::LeaderRotation(max_tick_height))?;
+                        to_validator_sender.send(max_tick_height)?;
                     }
                     return Err(e);
                 }
@@ -106,8 +105,7 @@
             let res = poh.tick();
             if let Err(e) = res {
                 if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
-                    // Leader rotation should only happen if a max_tick_height was specified
-                    to_validator_sender.send(TpuReturnType::LeaderRotation(max_tick_height))?;
+                    to_validator_sender.send(max_tick_height)?;
                 }
                 return Err(e);
             }
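With the `TpuReturnType` wrapper gone, the PoH service's rotation signal reduces to "on `MaxHeightReached`, send the height". The sketch below shows that shape with a stand-in error enum; `PohRecorder`, the real error plumbing, and the crate's `Result` alias are all replaced by assumptions for the sake of a runnable example:

```rust
use std::sync::mpsc::{channel, Sender};

// Stand-in for the real PohRecorderError (an assumption for this sketch).
#[derive(Debug, PartialEq)]
enum PohError {
    MaxHeightReached,
}

// On MaxHeightReached, forward the bare tick height on the rotation
// channel instead of wrapping it in a TpuReturnType variant.
fn on_tick_result(
    res: Result<(), PohError>,
    max_tick_height: u64,
    rotation_sender: &Sender<u64>,
) -> Result<(), PohError> {
    if let Err(e) = res {
        if e == PohError::MaxHeightReached {
            rotation_sender
                .send(max_tick_height)
                .expect("rotation signal send");
        }
        return Err(e);
    }
    Ok(())
}

fn main() {
    let (sender, receiver) = channel();
    let _ = on_tick_result(Err(PohError::MaxHeightReached), 64, &sender);
    assert_eq!(receiver.recv().unwrap(), 64);
}
```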