fix tpu tvu bank race (#2707)
* Fix tpu tvu bank race

* Test highlighting race between tvu and tpu banks during leader to leader transitions
carllin authored Feb 11, 2019
1 parent 02c0098 commit 4b38ecd
Showing 5 changed files with 203 additions and 111 deletions.
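
The race being fixed: during a leader-to-leader transition the TPU bank can reach the rotation tick height while the TVU bank is still replaying entries, so leader_to_validator() could read stale state (such as a stale last entry id) and restart broadcast at the wrong point. The fix in src/fullnode.rs below simply waits for the bank to catch up before computing the next leader. A minimal sketch of that pattern, matching the hunk below:

while self.bank.tick_height() < tick_height {
    // The TVU is still replaying entries for this slot; poll until the bank
    // has reached the rotation tick height before reading any leader state.
    sleep(Duration::from_millis(10));
}
// Safe now: the bank reflects the full slot, so last_id() is consistent.
let last_entry_id = self.bank.last_id();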
120 changes: 90 additions & 30 deletions src/fullnode.rs
@@ -26,7 +26,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender};
use std::sync::{Arc, RwLock};
use std::thread::{spawn, Result};
use std::thread::{sleep, spawn, Result};
use std::time::{Duration, Instant};

struct NodeServices {
@@ -66,6 +66,7 @@ pub struct FullnodeConfig {
pub storage_rotate_count: u64,
pub leader_scheduler_config: LeaderSchedulerConfig,
pub ledger_config: BlocktreeConfig,
pub tick_config: PohServiceConfig,
}
impl Default for FullnodeConfig {
fn default() -> Self {
@@ -80,6 +81,7 @@ impl Default for FullnodeConfig {
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
leader_scheduler_config: LeaderSchedulerConfig::default(),
ledger_config: BlocktreeConfig::default(),
tick_config: PohServiceConfig::default(),
}
}
}
@@ -315,11 +317,16 @@ impl Fullnode {

fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
trace!(
"leader_to_validator({:?}): tick_height={}",
"leader_to_validator({:?}): tick_height={}, bt: {}",
self.id,
tick_height,
self.bank.tick_height()
);

while self.bank.tick_height() < tick_height {
sleep(Duration::from_millis(10));
}

let scheduled_leader = {
let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap();

@@ -338,7 +345,7 @@

if scheduled_leader == self.id {
debug!("node is still the leader");
let last_entry_id = self.node_services.tvu.get_state();
let last_entry_id = self.bank.last_id();
self.validator_to_leader(tick_height, last_entry_id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
@@ -554,11 +561,17 @@ impl Service for Fullnode {
#[cfg(test)]
mod tests {
use super::*;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::{create_tmp_sample_ledger, tmp_copy_ledger};
use crate::entry::make_consecutive_blobs;
use crate::entry::EntrySlice;
use crate::gossip_service::{converge, make_listening_node};
use crate::leader_scheduler::make_active_set_entries;
use crate::streamer::responder;
use std::cmp::min;
use std::fs::remove_dir_all;
use std::sync::atomic::Ordering;
use std::thread::sleep;

#[test]
fn validator_exit() {
@@ -854,13 +867,12 @@ mod tests {
}

#[test]
#[ignore] // TODO: Make this test less hacky
fn test_tvu_behind() {
solana_logger::setup();

// Make leader node
let ticks_per_slot = 5;
let slots_per_epoch = 2;
let slots_per_epoch = 1;
let leader_keypair = Arc::new(Keypair::new());
let validator_keypair = Arc::new(Keypair::new());

@@ -885,6 +897,18 @@
slots_per_epoch,
ticks_per_slot * slots_per_epoch,
));
let config = PohServiceConfig::Sleep(Duration::from_millis(200));
fullnode_config.tick_config = config;

info!("Start up a listener");
let blob_receiver_exit = Arc::new(AtomicBool::new(false));
let (_, _, mut listening_node, _) = make_listening_node(&leader_node.info);
let (blob_fetch_sender, blob_fetch_receiver) = channel();
let blob_fetch_stage = BlobFetchStage::new(
Arc::new(listening_node.sockets.tvu.pop().unwrap()),
&blob_fetch_sender,
blob_receiver_exit.clone(),
);

let voting_keypair = VotingKeypair::new_local(&leader_keypair);
info!("Start the bootstrap leader");
@@ -897,38 +921,50 @@
&fullnode_config,
);

info!("Hold Tvu bank lock to prevent tvu from making progress");
let (rotation_sender, rotation_receiver) = channel();

info!("Pause the Tvu");
let pause_tvu = leader.node_services.tvu.get_pause();
pause_tvu.store(true, Ordering::Relaxed);

// Wait for convergence
converge(&leader_node_info, 2);

info!("Wait for leader -> validator transition");
let signal = leader
.to_validator_receiver
.recv()
.expect("signal for leader -> validator transition");
let (rn_sender, rn_receiver) = channel();
rn_sender.send(signal).expect("send");
leader.to_validator_receiver = rn_receiver;

info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
{
let w_last_ids = leader.bank.last_ids().write().unwrap();

info!("Wait for leader -> validator transition");
let signal = leader
.to_validator_receiver
.recv()
.expect("signal for leader -> validator transition");
let (rn_sender, rn_receiver) = channel();
rn_sender.send(signal).expect("send");
leader.to_validator_receiver = rn_receiver;

info!("Make sure the tvu bank is behind");
assert_eq!(w_last_ids.tick_height, 2);
assert!(w_last_ids.tick_height < ticks_per_slot - 1);
}

// Release tvu bank lock, tvu should start making progress again and should signal a
// rotate. After rotation it will still be the slot leader as a new leader schedule has
// not been computed yet (still in epoch 0)
info!("Release tvu bank lock");
let (rotation_sender, rotation_receiver) = channel();
// Clear the blobs we've received so far. After this rotation, we should
// no longer receive blobs from slot 0
while let Ok(_) = blob_fetch_receiver.try_recv() {}

let leader_exit = leader.run(Some(rotation_sender));

// Wait for leader_to_validator() function execution to trigger a leader to leader rotation
sleep(Duration::from_millis(1000));

// The Tvu is unpaused below, so it should start making progress again and should signal a
// rotation. After rotation it will still be the slot leader as a new leader schedule has
// not been computed yet (still in epoch 0). In the next epoch (epoch 1), the node will
// transition to a validator.
info!("Unpause the Tvu");
pause_tvu.store(false, Ordering::Relaxed);
let expected_rotations = vec![
(FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot),
(
FullnodeReturnType::LeaderToLeaderRotation,
2 * ticks_per_slot,
),
(
FullnodeReturnType::LeaderToValidatorRotation,
3 * ticks_per_slot,
2 * ticks_per_slot,
),
];

@@ -943,6 +979,28 @@

info!("Shut down");
leader_exit();

// Make sure that after rotation we don't receive any blobs from slot 0 (make sure
// broadcast started again at the correct place)
while let Ok(new_blobs) = blob_fetch_receiver.try_recv() {
for blob in new_blobs {
assert_ne!(blob.read().unwrap().slot(), 0);
}
}

// Check the ledger to make sure the PoH chains
{
let blocktree = Blocktree::open(&leader_ledger_path).unwrap();
let entries: Vec<_> = (0..3)
.flat_map(|slot_height| blocktree.get_slot_entries(slot_height, 0, None).unwrap())
.collect();

assert!(entries[1..].verify(&entries[0].id))
}

blob_receiver_exit.store(true, Ordering::Relaxed);
blob_fetch_stage.join().unwrap();

Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&leader_ledger_path).unwrap();
}
@@ -989,8 +1047,10 @@ mod tests {

let non_tick_active_entries_len = active_set_entries.len() - num_ending_ticks as usize;
let remaining_ticks_in_zeroth_slot = ticks_per_block - num_genesis_ticks;
let entries_for_zeroth_slot =
non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize;
let entries_for_zeroth_slot = min(
active_set_entries.len(),
non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize,
);
let entry_chunks: Vec<_> = active_set_entries[entries_for_zeroth_slot..]
.chunks(ticks_per_block as usize)
.collect();
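
The test drives the race with a pause flag on the Tvu (tvu.get_pause()) instead of the old bank-lock hack. The Tvu-side change itself is presumably in one of the changed files not reproduced on this page; a plausible sketch of how such a flag would be consumed, assuming get_pause() hands back an Arc<AtomicBool> that the replay loop checks (hypothetical replay_loop, not the actual Tvu code):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

fn replay_loop(pause: Arc<AtomicBool>) {
    loop {
        // While paused, consume nothing so the TVU bank's tick_height stays
        // behind the TPU bank's, reproducing the race the fix guards against.
        if pause.load(Ordering::Relaxed) {
            sleep(Duration::from_millis(10));
            continue;
        }
        // ... replay the next batch of entries and advance the bank ...
    }
}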
82 changes: 81 additions & 1 deletion src/gossip_service.rs
@@ -1,14 +1,18 @@
//! The `gossip_service` module implements the network control plane.
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
use crate::service::Service;
use crate::streamer;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::{self, JoinHandle};
use std::time::Duration;

pub struct GossipService {
exit: Arc<AtomicBool>,
@@ -51,6 +55,82 @@ impl GossipService {
}
}

pub fn make_listening_node(
leader: &NodeInfo,
) -> (GossipService, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
let keypair = Keypair::new();
let exit = Arc::new(AtomicBool::new(false));
let new_node = Node::new_localhost_with_pubkey(keypair.pubkey());
let new_node_info = new_node.info.clone();
let id = new_node.info.id;
let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair));
new_node_cluster_info.insert_info(leader.clone());
new_node_cluster_info.set_leader(leader.id);
let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
let gossip_service = GossipService::new(
&new_node_cluster_info_ref,
None,
new_node
.sockets
.gossip
.try_clone()
.expect("Failed to clone gossip"),
exit.clone(),
);

(gossip_service, new_node_cluster_info_ref, new_node, id)
}

pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
info!("Wait for convergence with {} nodes", num_nodes);
// Let's spy on the network
let (gossip_service, spy_ref, id) = make_spy_node(node);
trace!(
"converge spy_node {} looking for at least {} nodes",
id,
num_nodes
);

// Wait for the cluster to converge
for _ in 0..15 {
let rpc_peers = spy_ref.read().unwrap().rpc_peers();
if rpc_peers.len() == num_nodes {
debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers);
gossip_service.close().unwrap();
return rpc_peers;
}
debug!(
"converge found {} nodes, need {} more",
rpc_peers.len(),
num_nodes - rpc_peers.len()
);
sleep(Duration::new(1, 0));
}
panic!("Failed to converge");
}

pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInfo>>, Pubkey) {
let keypair = Keypair::new();
let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey());
let id = spy.info.id;
let daddr = "0.0.0.0:0".parse().unwrap();
spy.info.tvu = daddr;
spy.info.rpc = daddr;
let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair));
spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id);
let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
let gossip_service = GossipService::new(
&spy_cluster_info_ref,
None,
spy.sockets.gossip,
exit.clone(),
);

(gossip_service, spy_cluster_info_ref, id)
}
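
These helpers back the fullnode test above: make_spy_node/converge wait for gossip to settle, while make_listening_node pairs with a BlobFetchStage to observe what the leader broadcasts. Roughly, as used in test_tvu_behind (a sketch reusing the names from that test):

let exit = Arc::new(AtomicBool::new(false));
let (_gossip_service, _cluster_info, mut listener, _id) = make_listening_node(&leader_node.info);
let (blob_sender, blob_receiver) = channel();
let _fetch_stage = BlobFetchStage::new(
    Arc::new(listener.sockets.tvu.pop().unwrap()),
    &blob_sender,
    exit.clone(),
);
// Blobs the leader broadcasts now arrive on blob_receiver; converge() blocks
// until gossip reports the expected number of nodes (leader plus listener).
let _peers = converge(&leader_node.info, 2);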

impl Service for GossipService {
type JoinReturnType = ();

