diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 0e0dd4418ae4d6..636da2440630fb 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -1023,7 +1023,7 @@ impl ClusterInfo { let len = data.len(); let now = Instant::now(); let self_id = me.read().unwrap().gossip.id; - trace!("PullResponse me: {} len={}", self_id, len); + trace!("PullResponse me: {} from: {} len={}", self_id, from, len); me.write() .unwrap() .gossip diff --git a/src/cluster_tests.rs b/src/cluster_tests.rs new file mode 100644 index 00000000000000..d7eb18aa2d52cb --- /dev/null +++ b/src/cluster_tests.rs @@ -0,0 +1,41 @@ +/// Cluster independant integration tests +/// +/// All tests must start from an entry point and a funding keypair and +/// discover the rest of the network. +use crate::client::mk_client; +use crate::contact_info::ContactInfo; +use crate::gossip_service::discover; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::system_transaction::SystemTransaction; + +/// Spend and verify from every node in the network +pub fn spend_and_verify_all_nodes( + entry_point_info: &ContactInfo, + funding_keypair: &Keypair, + nodes: usize, +) { + let cluster_nodes = discover(&entry_point_info, nodes); + assert!(cluster_nodes.len() >= nodes); + for ingress_node in &cluster_nodes { + let random_keypair = Keypair::new(); + let mut client = mk_client(&ingress_node); + let bal = client + .poll_get_balance(&funding_keypair.pubkey()) + .expect("balance in source"); + assert!(bal > 0); + let mut transaction = SystemTransaction::new_move( + &funding_keypair, + random_keypair.pubkey(), + 1, + client.get_last_id(), + 0, + ); + let sig = client + .retry_transfer(&funding_keypair, &mut transaction, 5) + .unwrap(); + for validator in &cluster_nodes { + let mut client = mk_client(&validator); + client.poll_for_signature(&sig).unwrap(); + } + } +} diff --git a/src/fullnode.rs b/src/fullnode.rs index e4ec336fbe95b2..eb3ca02beb0e52 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -32,6 +32,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; +use std::thread::JoinHandle; use std::thread::{spawn, Result}; use std::time::Duration; @@ -324,18 +325,18 @@ impl Fullnode { // Runs a thread to manage node role transitions. The returned closure can be used to signal the // node to exit. - pub fn run( + pub fn start( mut self, rotation_notifier: Option>, - ) -> impl FnOnce() { + ) -> (JoinHandle<()>, Arc, Receiver) { let (sender, receiver) = channel(); let exit = self.exit.clone(); let timeout = Duration::from_secs(1); - spawn(move || loop { + let handle = spawn(move || loop { if self.exit.load(Ordering::Relaxed) { debug!("node shutdown requested"); self.close().expect("Unable to close node"); - sender.send(true).expect("Unable to signal exit"); + let _ = sender.send(true); break; } @@ -359,6 +360,14 @@ impl Fullnode { _ => (), } }); + (handle, exit, receiver) + } + + pub fn run( + self, + rotation_notifier: Option>, + ) -> impl FnOnce() { + let (_, exit, receiver) = self.start(rotation_notifier); move || { exit.store(true, Ordering::Relaxed); receiver.recv().unwrap(); diff --git a/src/gossip_service.rs b/src/gossip_service.rs index 698f73053ad92a..fb78a3bc093f7f 100644 --- a/src/gossip_service.rs +++ b/src/gossip_service.rs @@ -89,6 +89,11 @@ pub fn make_listening_node( (gossip_service, new_node_cluster_info_ref, new_node, id) } +pub fn discover(entry_point_info: &NodeInfo, num_nodes: usize) -> Vec { + converge(entry_point_info, num_nodes) +} + +//TODO: deprecate this in favor of discover pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { info!("Wait for convergence with {} nodes", num_nodes); // Let's spy on the network @@ -102,14 +107,21 @@ pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { // Wait for the cluster to converge for _ in 0..15 { let rpc_peers = spy_ref.read().unwrap().rpc_peers(); - if rpc_peers.len() == num_nodes { - debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers); + if rpc_peers.len() >= num_nodes { + debug!( + "converge found {}/{} nodes: {:?}", + rpc_peers.len(), + num_nodes, + rpc_peers + ); gossip_service.close().unwrap(); return rpc_peers; } debug!( - "converge found {} nodes, need {} more", + "spy_node: {} converge found {}/{} nodes, need {} more", + id, rpc_peers.len(), + num_nodes, num_nodes - rpc_peers.len() ); sleep(Duration::new(1, 0)); diff --git a/src/lib.rs b/src/lib.rs index bb6c9e831315ac..de6ed84403076d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,7 @@ pub mod blockstream; pub mod blockstream_service; pub mod blocktree_processor; pub mod cluster_info; +pub mod cluster_tests; pub mod db_window; pub mod entry; #[cfg(feature = "erasure")] @@ -42,6 +43,7 @@ pub mod gossip_service; pub mod leader_confirmation_service; pub mod leader_schedule; pub mod leader_schedule_utils; +pub mod local_cluster; pub mod local_vote_signer_service; pub mod packet; pub mod poh; diff --git a/src/local_cluster.rs b/src/local_cluster.rs new file mode 100644 index 00000000000000..7e052ef1fd4d68 --- /dev/null +++ b/src/local_cluster.rs @@ -0,0 +1,146 @@ +use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree}; +use crate::client::mk_client; +use crate::cluster_info::{Node, NodeInfo}; +use crate::fullnode::{Fullnode, FullnodeConfig}; +use crate::gossip_service::discover; +use crate::thin_client::retry_get_balance; +use crate::thin_client::ThinClient; +use crate::voting_keypair::VotingKeypair; +use solana_sdk::genesis_block::GenesisBlock; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::system_transaction::SystemTransaction; +use std::fs::remove_dir_all; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread::JoinHandle; + +pub struct LocalCluster { + /// Keypair with funding to particpiate in the network + pub funding_keypair: Keypair, + /// Entry point from which the rest of the network can be discovered + pub entry_point_info: NodeInfo, + fullnode_hdls: Vec<(JoinHandle<()>, Arc)>, + ledger_paths: Vec, +} + +impl LocalCluster { + pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self { + let leader_keypair = Arc::new(Keypair::new()); + let leader_pubkey = leader_keypair.pubkey(); + let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + let (genesis_block, mint_keypair) = + GenesisBlock::new_with_leader(cluster_lamports, leader_pubkey, lamports_per_node); + let (genesis_ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block); + let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path); + let mut ledger_paths = vec![]; + ledger_paths.push(genesis_ledger_path.clone()); + ledger_paths.push(leader_ledger_path.clone()); + let voting_keypair = VotingKeypair::new_local(&leader_keypair); + let fullnode_config = FullnodeConfig::default(); + let leader_node_info = leader_node.info.clone(); + let leader_server = Fullnode::new( + leader_node, + &leader_keypair, + &leader_ledger_path, + voting_keypair, + None, + &fullnode_config, + ); + let (thread, exit, _) = leader_server.start(None); + let mut fullnode_hdls = vec![(thread, exit)]; + let mut client = mk_client(&leader_node_info); + for _ in 0..(num_nodes - 1) { + let validator_keypair = Arc::new(Keypair::new()); + let voting_keypair = VotingKeypair::new_local(&validator_keypair); + let validator_pubkey = validator_keypair.pubkey(); + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + let ledger_path = tmp_copy_blocktree!(&genesis_ledger_path); + ledger_paths.push(ledger_path.clone()); + + // Send each validator some tokens to vote + let validator_balance = Self::transfer( + &mut client, + &mint_keypair, + &validator_pubkey, + lamports_per_node, + ); + info!( + "validator {} balance {}", + validator_pubkey, validator_balance + ); + let validator_server = Fullnode::new( + validator_node, + &validator_keypair, + &ledger_path, + voting_keypair, + Some(&leader_node_info), + &FullnodeConfig::default(), + ); + let (thread, exit, _) = validator_server.start(None); + fullnode_hdls.push((thread, exit)); + } + discover(&leader_node_info, num_nodes); + Self { + funding_keypair: mint_keypair, + entry_point_info: leader_node_info, + fullnode_hdls, + ledger_paths, + } + } + + pub fn exit(&self) { + for node in &self.fullnode_hdls { + node.1.store(true, Ordering::Relaxed); + } + } + pub fn close(&mut self) { + self.exit(); + while let Some(node) = self.fullnode_hdls.pop() { + node.0.join().expect("join"); + } + for path in &self.ledger_paths { + remove_dir_all(path).unwrap(); + } + } + + fn transfer( + client: &mut ThinClient, + source_keypair: &Keypair, + dest_pubkey: &Pubkey, + lamports: u64, + ) -> u64 { + trace!("getting leader last_id"); + let last_id = client.get_last_id(); + let mut tx = + SystemTransaction::new_account(&source_keypair, *dest_pubkey, lamports, last_id, 0); + info!( + "executing transfer of {} from {} to {}", + lamports, + source_keypair.pubkey(), + *dest_pubkey + ); + client + .retry_transfer(&source_keypair, &mut tx, 5) + .expect("client transfer"); + retry_get_balance(client, dest_pubkey, Some(lamports)).expect("get balance") + } +} + +impl Drop for LocalCluster { + fn drop(&mut self) { + self.close() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_local_cluster_start_and_exit() { + solana_logger::setup(); + let network = LocalCluster::new(1, 100, 2); + drop(network) + } +} diff --git a/tests/local_cluster.rs b/tests/local_cluster.rs new file mode 100644 index 00000000000000..3ce99e7ad7e9f9 --- /dev/null +++ b/tests/local_cluster.rs @@ -0,0 +1,40 @@ +extern crate solana; + +use solana::cluster_tests; +use solana::local_cluster::LocalCluster; + +#[test] +fn test_spend_and_verify_all_nodes_1() -> () { + solana_logger::setup(); + let num_nodes = 1; + let local = LocalCluster::new(num_nodes, 10_000, 100); + cluster_tests::spend_and_verify_all_nodes( + &local.entry_point_info, + &local.funding_keypair, + num_nodes, + ); +} + +#[test] +fn test_spend_and_verify_all_nodes_2() -> () { + solana_logger::setup(); + let num_nodes = 2; + let local = LocalCluster::new(num_nodes, 10_000, 100); + cluster_tests::spend_and_verify_all_nodes( + &local.entry_point_info, + &local.funding_keypair, + num_nodes, + ); +} + +#[test] +fn test_spend_and_verify_all_nodes_3() -> () { + solana_logger::setup(); + let num_nodes = 3; + let local = LocalCluster::new(num_nodes, 10_000, 100); + cluster_tests::spend_and_verify_all_nodes( + &local.entry_point_info, + &local.funding_keypair, + num_nodes, + ); +}