fix tpu tvu bank race #2707

Merged 2 commits on Feb 11, 2019
src/fullnode.rs: 90 additions & 30 deletions
@@ -26,7 +26,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender};
use std::sync::{Arc, RwLock};
use std::thread::{spawn, Result};
use std::thread::{sleep, spawn, Result};
use std::time::{Duration, Instant};

struct NodeServices {
@@ -66,6 +66,7 @@ pub struct FullnodeConfig {
pub storage_rotate_count: u64,
pub leader_scheduler_config: LeaderSchedulerConfig,
pub ledger_config: BlocktreeConfig,
pub tick_config: PohServiceConfig,
}
impl Default for FullnodeConfig {
fn default() -> Self {
@@ -80,6 +81,7 @@ impl Default for FullnodeConfig {
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
leader_scheduler_config: LeaderSchedulerConfig::default(),
ledger_config: BlocktreeConfig::default(),
tick_config: PohServiceConfig::default(),
}
}
}
@@ -315,11 +317,16 @@ impl Fullnode {

fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!(
"leader_to_validator({:?}): tick_height={}",
"leader_to_validator({:?}): tick_height={}, bt: {}",
self.id,
tick_height,
self.bank.tick_height()
);

while self.bank.tick_height() < tick_height {
sleep(Duration::from_millis(10));
Review comment (Member): There's gotta be a smarter way to do this rather than a semi-random sleep too.

Review comment (Contributor): Sure, but that only further justifies a test. A smarter way can be postponed if we add a good test now.
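One possible shape for that smarter way, sketched purely as an illustration (the `TickWaiter` type and its methods are invented here and are not part of this PR or the codebase): a `Condvar` keyed on the bank's tick height would let `leader_to_validator()` block until the TVU catches up instead of polling on a 10 ms sleep.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical helper (not in this PR): tracks the bank's tick height and
/// wakes waiters when it advances, instead of polling with a fixed sleep.
pub struct TickWaiter {
    height: Mutex<u64>,
    cvar: Condvar,
}

impl TickWaiter {
    pub fn new() -> Self {
        TickWaiter {
            height: Mutex::new(0),
            cvar: Condvar::new(),
        }
    }

    /// Called wherever the bank registers a new tick.
    pub fn notify_tick(&self, tick_height: u64) {
        let mut height = self.height.lock().unwrap();
        *height = tick_height;
        self.cvar.notify_all();
    }

    /// Blocks until the observed tick height reaches `target`, with a timeout
    /// so a stalled TVU cannot hang the caller forever. Returns whether the
    /// target was reached.
    pub fn wait_for_tick(&self, target: u64, timeout: Duration) -> bool {
        let mut height = self.height.lock().unwrap();
        while *height < target {
            let (guard, result) = self.cvar.wait_timeout(height, timeout).unwrap();
            height = guard;
            if result.timed_out() {
                return *height >= target;
            }
        }
        true
    }
}
```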

}

let scheduled_leader = {
let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap();

@@ -338,7 +345,7 @@

if scheduled_leader == self.id {
debug!("node is still the leader");
let last_entry_id = self.node_services.tvu.get_state();
let last_entry_id = self.bank.last_id();
self.validator_to_leader(tick_height, last_entry_id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
@@ -554,11 +561,17 @@ impl Service for Fullnode {
#[cfg(test)]
mod tests {
use super::*;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::{create_tmp_sample_ledger, tmp_copy_ledger};
use crate::entry::make_consecutive_blobs;
use crate::entry::EntrySlice;
use crate::gossip_service::{converge, make_listening_node};
use crate::leader_scheduler::make_active_set_entries;
use crate::streamer::responder;
use std::cmp::min;
use std::fs::remove_dir_all;
use std::sync::atomic::Ordering;
use std::thread::sleep;

#[test]
fn validator_exit() {
@@ -854,13 +867,12 @@
}

#[test]
#[ignore] // TODO: Make this test less hacky
fn test_tvu_behind() {
solana_logger::setup();

// Make leader node
let ticks_per_slot = 5;
let slots_per_epoch = 2;
let slots_per_epoch = 1;
let leader_keypair = Arc::new(Keypair::new());
let validator_keypair = Arc::new(Keypair::new());

@@ -885,6 +897,18 @@
slots_per_epoch,
ticks_per_slot * slots_per_epoch,
));
let config = PohServiceConfig::Sleep(Duration::from_millis(200));
fullnode_config.tick_config = config;

info!("Start up a listener");
let blob_receiver_exit = Arc::new(AtomicBool::new(false));
let (_, _, mut listening_node, _) = make_listening_node(&leader_node.info);
let (blob_fetch_sender, blob_fetch_receiver) = channel();
let blob_fetch_stage = BlobFetchStage::new(
Arc::new(listening_node.sockets.tvu.pop().unwrap()),
&blob_fetch_sender,
blob_receiver_exit.clone(),
);

let voting_keypair = VotingKeypair::new_local(&leader_keypair);
info!("Start the bootstrap leader");
@@ -897,38 +921,50 @@
&fullnode_config,
);

info!("Hold Tvu bank lock to prevent tvu from making progress");
let (rotation_sender, rotation_receiver) = channel();

info!("Pause the Tvu");
let pause_tvu = leader.node_services.tvu.get_pause();
pause_tvu.store(true, Ordering::Relaxed);

// Wait for convergence
converge(&leader_node_info, 2);

info!("Wait for leader -> validator transition");
let signal = leader
.to_validator_receiver
.recv()
.expect("signal for leader -> validator transition");
let (rn_sender, rn_receiver) = channel();
rn_sender.send(signal).expect("send");
leader.to_validator_receiver = rn_receiver;

info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
{
let w_last_ids = leader.bank.last_ids().write().unwrap();

info!("Wait for leader -> validator transition");
let signal = leader
.to_validator_receiver
.recv()
.expect("signal for leader -> validator transition");
let (rn_sender, rn_receiver) = channel();
rn_sender.send(signal).expect("send");
leader.to_validator_receiver = rn_receiver;

info!("Make sure the tvu bank is behind");
assert_eq!(w_last_ids.tick_height, 2);
assert!(w_last_ids.tick_height < ticks_per_slot - 1);
}

// Release tvu bank lock, tvu should start making progress again and should signal a
// rotate. After rotation it will still be the slot leader as a new leader schedule has
// not been computed yet (still in epoch 0)
info!("Release tvu bank lock");
let (rotation_sender, rotation_receiver) = channel();
// Clear the blobs we've received so far. After this rotation, we should
// no longer receive blobs from slot 0
while let Ok(_) = blob_fetch_receiver.try_recv() {}

let leader_exit = leader.run(Some(rotation_sender));

// Wait for leader_to_validator() function execution to trigger a leader to leader rotation
sleep(Duration::from_millis(1000));

// Tvu bank lock is released here, so tvu should start making progress again and should signal a
// rotation. After rotation it will still be the slot leader as a new leader schedule has
// not been computed yet (still in epoch 0). In the next epoch (epoch 1), the node will
// transition to a validator.
info!("Unpause the Tvu");
pause_tvu.store(false, Ordering::Relaxed);
let expected_rotations = vec![
(FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot),
(
FullnodeReturnType::LeaderToLeaderRotation,
2 * ticks_per_slot,
),
(
FullnodeReturnType::LeaderToValidatorRotation,
3 * ticks_per_slot,
2 * ticks_per_slot,
),
];

@@ -943,6 +979,28 @@

info!("Shut down");
leader_exit();

// Make sure that after rotation we don't receive any blobs from slot 0 (make sure
// broadcast started again at the correct place)
while let Ok(new_blobs) = blob_fetch_receiver.try_recv() {
for blob in new_blobs {
assert_ne!(blob.read().unwrap().slot(), 0);
}
}

// Check the ledger to make sure the PoH chains
{
let blocktree = Blocktree::open(&leader_ledger_path).unwrap();
let entries: Vec<_> = (0..3)
.flat_map(|slot_height| blocktree.get_slot_entries(slot_height, 0, None).unwrap())
.collect();

assert!(entries[1..].verify(&entries[0].id))
}

blob_receiver_exit.store(true, Ordering::Relaxed);
blob_fetch_stage.join().unwrap();

Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&leader_ledger_path).unwrap();
}
@@ -989,8 +1047,10 @@ mod tests {

let non_tick_active_entries_len = active_set_entries.len() - num_ending_ticks as usize;
let remaining_ticks_in_zeroth_slot = ticks_per_block - num_genesis_ticks;
let entries_for_zeroth_slot =
non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize;
let entries_for_zeroth_slot = min(
active_set_entries.len(),
non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize,
);
let entry_chunks: Vec<_> = active_set_entries[entries_for_zeroth_slot..]
.chunks(ticks_per_block as usize)
.collect();
src/gossip_service.rs: 81 additions & 1 deletion
@@ -1,14 +1,18 @@
//! The `gossip_service` module implements the network control plane.

use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
use crate::service::Service;
use crate::streamer;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::{self, JoinHandle};
use std::time::Duration;

pub struct GossipService {
exit: Arc<AtomicBool>,
@@ -51,6 +55,82 @@ impl GossipService {
}
}

pub fn make_listening_node(
leader: &NodeInfo,
) -> (GossipService, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
let keypair = Keypair::new();
let exit = Arc::new(AtomicBool::new(false));
let new_node = Node::new_localhost_with_pubkey(keypair.pubkey());
let new_node_info = new_node.info.clone();
let id = new_node.info.id;
let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair));
new_node_cluster_info.insert_info(leader.clone());
new_node_cluster_info.set_leader(leader.id);
let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
let gossip_service = GossipService::new(
&new_node_cluster_info_ref,
None,
new_node
.sockets
.gossip
.try_clone()
.expect("Failed to clone gossip"),
exit.clone(),
);

(gossip_service, new_node_cluster_info_ref, new_node, id)
}

pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
info!("Wait for convergence with {} nodes", num_nodes);
// Let's spy on the network
let (gossip_service, spy_ref, id) = make_spy_node(node);
trace!(
"converge spy_node {} looking for at least {} nodes",
id,
num_nodes
);

// Wait for the cluster to converge
for _ in 0..15 {
let rpc_peers = spy_ref.read().unwrap().rpc_peers();
if rpc_peers.len() == num_nodes {
debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers);
gossip_service.close().unwrap();
return rpc_peers;
}
debug!(
"converge found {} nodes, need {} more",
rpc_peers.len(),
num_nodes - rpc_peers.len()
);
sleep(Duration::new(1, 0));
}
panic!("Failed to converge");
}

pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInfo>>, Pubkey) {
let keypair = Keypair::new();
let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey());
let id = spy.info.id;
let daddr = "0.0.0.0:0".parse().unwrap();
spy.info.tvu = daddr;
spy.info.rpc = daddr;
let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair));
spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id);
let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
let gossip_service = GossipService::new(
&spy_cluster_info_ref,
None,
spy.sockets.gossip,
exit.clone(),
);

(gossip_service, spy_cluster_info_ref, id)
}
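A minimal usage sketch for `converge` as added above, assuming a `leader_node` value from test setup like the one in the fullnode test earlier in this PR:

```rust
// Hypothetical test snippet: block until gossip reports two RPC peers
// (converge panics after ~15 one-second retries if it never does).
let rpc_peers = converge(&leader_node.info, 2);
assert_eq!(rpc_peers.len(), 2);
```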

impl Service for GossipService {
type JoinReturnType = ();
