Skip to content

Commit

Permalink
local cluster tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Mar 1, 2019
1 parent fec8675 commit 8c27199
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ impl ClusterInfo {
let len = data.len();
let now = Instant::now();
let self_id = me.read().unwrap().gossip.id;
trace!("PullResponse me: {} len={}", self_id, len);
trace!("PullResponse me: {} from: {} len={}", self_id, from, len);
me.write()
.unwrap()
.gossip
Expand Down
17 changes: 13 additions & 4 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;
use std::thread::{spawn, Result};
use std::time::Duration;

Expand Down Expand Up @@ -329,18 +330,18 @@ impl Fullnode {

// Runs a thread to manage node role transitions. The returned closure can be used to signal the
// node to exit.
pub fn run(
pub fn start(
mut self,
rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
) -> impl FnOnce() {
) -> (JoinHandle<()>, Arc<AtomicBool>, Receiver<bool>) {
let (sender, receiver) = channel();
let exit = self.exit.clone();
let timeout = Duration::from_secs(1);
spawn(move || loop {
let handle = spawn(move || loop {
if self.exit.load(Ordering::Relaxed) {
debug!("node shutdown requested");
self.close().expect("Unable to close node");
sender.send(true).expect("Unable to signal exit");
let _ = sender.send(true);
break;
}

Expand All @@ -364,6 +365,14 @@ impl Fullnode {
_ => (),
}
});
(handle, exit, receiver)
}

pub fn run(
self,
rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
) -> impl FnOnce() {
let (_, exit, receiver) = self.start(rotation_notifier);
move || {
exit.store(true, Ordering::Relaxed);
receiver.recv().unwrap();
Expand Down
13 changes: 10 additions & 3 deletions src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,21 @@ pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
// Wait for the cluster to converge
for _ in 0..15 {
let rpc_peers = spy_ref.read().unwrap().rpc_peers();
if rpc_peers.len() == num_nodes {
debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers);
if rpc_peers.len() >= num_nodes {
debug!(
"converge found {}/{} nodes: {:?}",
rpc_peers.len(),
num_nodes,
rpc_peers
);
gossip_service.close().unwrap();
return rpc_peers;
}
debug!(
"converge found {} nodes, need {} more",
"spy_node: {} converge found {}/{} nodes, need {} more",
id,
rpc_peers.len(),
num_nodes,
num_nodes - rpc_peers.len()
);
sleep(Duration::new(1, 0));
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod blockstream;
pub mod blockstream_service;
pub mod blocktree_processor;
pub mod cluster_info;
pub mod cluster_tests;
pub mod db_window;
pub mod entry;
#[cfg(feature = "erasure")]
Expand All @@ -42,6 +43,7 @@ pub mod gossip_service;
pub mod leader_confirmation_service;
pub mod leader_schedule;
pub mod leader_schedule_utils;
pub mod local_cluster;
pub mod local_vote_signer_service;
pub mod packet;
pub mod poh;
Expand Down

0 comments on commit 8c27199

Please sign in to comment.