From f3637f2614d31be9b0e1d0e7dd54b5b6a8544406 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 07:11:11 -0700 Subject: [PATCH 1/8] cleanup --- src/bin/testnode.rs | 1 + src/crdt.rs | 15 ++++++-- src/streamer.rs | 12 +++--- src/thin_client.rs | 8 ++-- src/tvu.rs | 94 +++++++++++++++------------------------------ 5 files changed, 52 insertions(+), 78 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 1907589cbd2065..659b84ea89da43 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -131,6 +131,7 @@ fn main() { gossip_sock.local_addr().unwrap(), replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), + events_sock.local_addr().unwrap(), ); let mut local = serve_sock.local_addr().unwrap(); diff --git a/src/crdt.rs b/src/crdt.rs index 7029e79b21be9e..eef6981f4720dd 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -31,7 +31,7 @@ use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; /// Structure to be replicated by the network -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ReplicatedData { pub id: PublicKey, sig: Signature, @@ -42,7 +42,9 @@ pub struct ReplicatedData { /// address to connect to for replication pub replicate_addr: SocketAddr, /// address to connect to when this node is leader - pub serve_addr: SocketAddr, + pub requests_addr: SocketAddr, + /// events address + pub events_addr: SocketAddr, /// current leader identity current_leader_id: PublicKey, /// last verified hash that was submitted to the leader @@ -56,7 +58,8 @@ impl ReplicatedData { id: PublicKey, gossip_addr: SocketAddr, replicate_addr: SocketAddr, - serve_addr: SocketAddr, + requests_addr: SocketAddr, + events_addr: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, @@ -64,7 +67,8 @@ impl ReplicatedData { version: 0, gossip_addr, replicate_addr, - serve_addr, + requests_addr, + events_addr, current_leader_id: PublicKey::default(), last_verified_hash: Hash::default(), last_verified_count: 0, @@ -515,12 +519,14 @@ mod test { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + events.local_addr().unwrap(), ); let crdt = Crdt::new(d); trace!( @@ -632,6 +638,7 @@ mod test { "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); diff --git a/src/streamer.rs b/src/streamer.rs index ee83cd25d027c6..9af0552ce1f9ea 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -650,12 +650,14 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let event = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let rep_data = ReplicatedData::new( pubkey_me, read.local_addr().unwrap(), send.local_addr().unwrap(), serve.local_addr().unwrap(), + event.local_addr().unwrap(), ); let mut crdt_me = Crdt::new(rep_data); let me_id = crdt_me.my_data().id; @@ -712,21 +714,17 @@ mod test { let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); + let event = UdpSocket::bind("127.0.0.1:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + event.local_addr().unwrap(), ); let crdt = Crdt::new(d); - trace!( - "id: {} gossip: {} replicate: {} serve: {}", - crdt.my_data().id[0], - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - ); + trace!("data: {:?}", d); (Arc::new(RwLock::new(crdt)), gossip, replicate, serve) } diff --git a/src/thin_client.rs b/src/thin_client.rs index b5632c9ae3e3dc..142bb46ac94b14 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -211,6 +211,7 @@ mod tests { gossip.local_addr().unwrap(), "0.0.0.0:0".parse().unwrap(), requests_socket.local_addr().unwrap(), + events_addr, ); let alice = Mint::new(10_000); @@ -326,6 +327,7 @@ mod tests { gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), + events_socket.local_addr().unwrap(), ); (leader, gossip, serve, replicate, events_socket) } @@ -364,7 +366,7 @@ mod tests { let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.id.clone(); spy.replicate_addr = daddr; - spy.serve_addr = daddr; + spy.requests_addr = daddr; let mut spy_crdt = Crdt::new(spy); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); @@ -393,7 +395,7 @@ mod tests { .values() .into_iter() .filter(|x| x.id != me) - .map(|x| x.serve_addr) + .map(|x| x.requests_addr) .collect(); v.clone() } @@ -446,7 +448,7 @@ mod tests { let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.0.serve_addr, + leader.0.requests_addr, requests_socket, events_addr, events_socket, diff --git a/src/tvu.rs b/src/tvu.rs index 18d09d08b1f322..92e35ffd85051c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,5 +1,23 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. +//! 1. streamer +//! - Incoming blobs are picked up from the replicate socket. +//! 2. verifier +//! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified +//! along with the ecdsa signature for the blob and each signature in all the transactions. +//! 3.a retransmit +//! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the +//! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate +//! address. +//! 3.b window +//! - Verified blobs are placed into a window, indexed by the counter set by the leader. This could +//! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to +//! recover any missing packets, and requests are made at random to peers and parents to retransmit +//! a missing packet. +//! 4. accountant +//! - Contigous blobs are sent to the accountant for processing transactions +//! 5. validator +//! - TODO Validation messages are sent back to the leader use bank::Bank; use banking_stage::BankingStage; @@ -20,48 +38,27 @@ use streamer; use write_stage::WriteStage; pub struct Tvu { - bank: Arc, - start_hash: Hash, - tick_duration: Option, + pub thread_hdls: Vec>, } impl Tvu { - /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { - Tvu { - bank: Arc::new(bank), - start_hash, - tick_duration, - } - } - /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. /// # Arguments - /// * `obj` - The bank state. + /// * `bank` - The bank state. /// * `me` - my configuration + /// * `gossip` - my gosisp socket + /// * `replicte` - my replicte socket /// * `leader` - leader configuration /// * `exit` - The exit signal. - /// # Remarks - /// The pipeline is constructed as follows: - /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures (TODO) - /// 3. reconstruct contiguous window - /// a. order the blobs - /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs, if erasure coding is insufficient - /// d. make sure that the blobs PoH sequences connect (TODO) - /// 4. process the transaction state machine - /// 5. respond with the hash of the state back to the leader - pub fn serve( - obj: &Arc, + pub fn new( + bank: Arc, me: ReplicatedData, gossip: UdpSocket, - requests_socket: UdpSocket, replicate: UdpSocket, leader: ReplicatedData, exit: Arc, - ) -> Result>> { + ) -> Self { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); crdt.write() @@ -118,39 +115,7 @@ impl Tvu { blob_recycler.clone(), ); - //serve pipeline - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - - let packet_recycler = packet::PacketRecycler::default(); - let (packet_sender, packet_receiver) = channel(); - let t_packet_receiver = streamer::receiver( - requests_socket, - exit.clone(), - packet_recycler.clone(), - packet_sender, - ); - - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - - let banking_stage = BankingStage::new( - obj.bank.clone(), - exit.clone(), - sig_verify_stage.verified_receiver, - packet_recycler.clone(), - ); - - let record_stage = RecordStage::new( - banking_stage.signal_receiver, - &obj.start_hash, - obj.tick_duration, - ); - - let write_stage = - WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); - - let mut threads = vec![ + let threads = vec![ //replicate threads t_blob_receiver, t_retransmit, @@ -164,7 +129,7 @@ impl Tvu { write_stage.thread_hdl, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + Tvu{threads} } } @@ -185,6 +150,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests_socket.local_addr().unwrap(), + events_socket.local_addr().unwrap(), ); (d, gossip, replicate, requests_socket, events_socket) } @@ -215,7 +181,7 @@ mod tests { fn test_replicate() { logger::setup(); let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); - let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node(); + let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node(); let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); let exit = Arc::new(AtomicBool::new(false)); @@ -273,7 +239,7 @@ mod tests { &tvu, target1_data, target1_gossip, - target1_serve, + target1_events, target1_replicate, leader_data, exit.clone(), From abfae92495e5857bec21fc51387201f5aee8e5fc Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 07:44:01 -0700 Subject: [PATCH 2/8] cleanup --- src/server.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/server.rs b/src/server.rs index b250e86d84bf0a..016d5ab57eb2cb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,7 +17,7 @@ pub struct Server { } impl Server { - pub fn new( + pub fn leader( bank: Bank, start_hash: Hash, tick_duration: Option, @@ -26,7 +26,7 @@ impl Server { events_socket: UdpSocket, broadcast_socket: UdpSocket, respond_socket: UdpSocket, - gossip: UdpSocket, + gossip_socket: UdpSocket, exit: Arc, writer: W, ) -> Self { @@ -34,7 +34,6 @@ impl Server { let mut thread_hdls = vec![]; let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); thread_hdls.extend(rpu.thread_hdls); - let tpu = Tpu::new( bank.clone(), start_hash, @@ -42,12 +41,36 @@ impl Server { me, events_socket, broadcast_socket, - gossip, + gossip_socket, exit.clone(), writer, ); thread_hdls.extend(tpu.thread_hdls); - + Server { thread_hdls } + } + pub fn validator( + bank: Bank, + me: ReplicatedData, + requests_socket: UdpSocket, + respond_socket: UdpSocket, + replicate_socket: UdpSocket, + gossip_socket: UdpSocket, + leader_repl_data: ReplicatedData, + exit: Arc, + ) -> Self { + let bank = Arc::new(bank); + let mut thread_hdls = vec![]; + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + let tvu = Tvu::new( + bank.clone(), + me, + gossip_socket, + replicate_socket, + leader_repl_data, + exit.clone(), + ); + thread_hdls.extend(tpu.thread_hdls); Server { thread_hdls } } } From 83036cbb48d5592cf321acf217eb016f82002f38 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 08:29:24 -0700 Subject: [PATCH 3/8] refactor wip --- src/bin/testnode.rs | 2 +- src/server.rs | 3 +- src/thin_client.rs | 84 ++++++++++++++++++++++++--------------------- src/tvu.rs | 47 +++++++++---------------- 4 files changed, 64 insertions(+), 72 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 659b84ea89da43..4b82899a95c805 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -140,7 +140,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let server = Server::new( + let server = Server::leader( bank, last_id, Some(Duration::from_millis(1000)), diff --git a/src/server.rs b/src/server.rs index 016d5ab57eb2cb..94e3ffbc112a9a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,6 +11,7 @@ use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; use std::time::Duration; use tpu::Tpu; +use tvu::Tvu; pub struct Server { pub thread_hdls: Vec>, @@ -70,7 +71,7 @@ impl Server { leader_repl_data, exit.clone(), ); - thread_hdls.extend(tpu.thread_hdls); + thread_hdls.extend(tvu.thread_hdls); Server { thread_hdls } } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 142bb46ac94b14..a2cc3f2bb808e4 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -192,7 +192,6 @@ mod tests { use std::thread::sleep; use std::time::Duration; use streamer::default_window; - use tvu::{self, Tvu}; #[test] fn test_thin_client() { @@ -224,7 +223,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let server = Server::new( + let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -258,21 +257,20 @@ mod tests { #[test] fn test_bad_sig() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let serve_addr = leader_serve.local_addr().unwrap(); - - let mut local = leader_serve.local_addr().unwrap(); + let serve_addr = leader.data.requests_addr; + let mut local = serve_addr.local_addr().unwrap(); local.set_port(0); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let events_addr = events_socket.local_addr().unwrap(); - let server = Server::new( + let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -313,23 +311,33 @@ mod tests { t.join().unwrap(); } } - - fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let leader = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - events_socket.local_addr().unwrap(), - ); - (leader, gossip, serve, replicate, events_socket) + struct TestNode { + data: ReplicatedData, + gossip: UdpSocket, + requests: UdpSocket, + replicate: UdpSocket, + event: UdpSocket, + respond: UdpSocket, + broadcast: UdpSocket, + } + impl TestNode { + fn new() -> TestNode { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let event = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let data = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests.local_addr().unwrap(), + event.local_addr().unwrap(), + ); + TestNode {data, gossip, requests, replicate, event, respond, broadcast } + } } fn replicant( @@ -338,21 +346,19 @@ mod tests { alice: &Mint, threads: &mut Vec>, ) { - let replicant = test_node(); - let replicant_bank = { - let bank = Bank::new(&alice); - Arc::new(Tvu::new(bank, alice.last_id(), None)) - }; - let mut ts = Tvu::serve( - &replicant_bank, - replicant.0.clone(), - replicant.1, - replicant.2, - replicant.3, + let replicant = TestNode::new(); + let replicant_bank = Bank::new(&alice); + let mut ts = Server::validator( + replicant_bank, + replicant.data.clone(), + replicant.requests, + replicant.respond, + replicant.replicate, + replicant.gossip, leader.clone(), exit.clone(), - ).unwrap(); - threads.append(&mut ts); + ); + threads.append(&mut ts.thread_hdls); } fn converge( @@ -362,7 +368,7 @@ mod tests { threads: &mut Vec>, ) -> Vec { //lets spy on the network - let (mut spy, spy_gossip, _, _, _) = test_node(); + let mut spy = test_node(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.id.clone(); spy.replicate_addr = daddr; @@ -418,7 +424,7 @@ mod tests { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let events_addr = leader.4.local_addr().unwrap(); - let server = Server::new( + let server = Server::leader( leader_bank, alice.last_id(), None, diff --git a/src/tvu.rs b/src/tvu.rs index 92e35ffd85051c..f640428839eea2 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -4,7 +4,8 @@ //! - Incoming blobs are picked up from the replicate socket. //! 2. verifier //! - TODO Blobs are sent to the GPU, and while the memory is there the PoH stream is verified -//! along with the ecdsa signature for the blob and each signature in all the transactions. +//! along with the ecdsa signature for the blob and each signature in all the transactions. Blobs +//! with errors are dropped, or marked for slashing. //! 3.a retransmit //! - Blobs originating from the parent (leader atm is the only parent), are retransmit to all the //! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate @@ -20,22 +21,15 @@ //! - TODO Validation messages are sent back to the leader use bank::Bank; -use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use packet; -use record_stage::RecordStage; use replicate_stage::ReplicateStage; -use result::Result; -use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct Tvu { pub thread_hdls: Vec>, @@ -71,10 +65,11 @@ impl Tvu { let window = streamer::default_window(); let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + // TODO pull this socket out through the public interface // make sure we are on the same interface - let mut local = replicate.local_addr()?; + let mut local = replicate.local_addr().expect("tvu: get local address"); local.set_port(0); - let write = UdpSocket::bind(local)?; + let write = UdpSocket::bind(local).expect("tvu: bind to local socket"); let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); @@ -83,7 +78,7 @@ impl Tvu { blob_recycler.clone(), replicate, blob_sender.clone(), - )?; + ).expect("tvu: blob receiver creation"); let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -109,7 +104,7 @@ impl Tvu { ); let replicate_stage = ReplicateStage::new( - obj.bank.clone(), + bank.clone(), exit.clone(), window_receiver, blob_recycler.clone(), @@ -123,16 +118,14 @@ impl Tvu { replicate_stage.thread_hdl, t_gossip, t_listen, - //serve threads - t_packet_receiver, - banking_stage.thread_hdl, - write_stage.thread_hdl, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Tvu{threads} + Tvu{thread_hdls: threads} } } +#[cfg(test)] +use std::time::Duration; + #[cfg(test)] pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; @@ -228,22 +221,17 @@ mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let bank = Bank::new(&mint); - let tvu = Arc::new(Tvu::new( - bank, - mint.last_id(), - Some(Duration::from_millis(30)), - )); let replicate_addr = target1_data.replicate_addr; - let threads = Tvu::serve( - &tvu, + let bank = Arc::new(Bank::new(&mint)); + let tvu = Tvu::new( + bank, target1_data, target1_gossip, target1_events, target1_replicate, leader_data, exit.clone(), - ).unwrap(); + ); let mut alice_ref_balance = starting_balance; let mut msgs = VecDeque::new(); @@ -258,8 +246,6 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let bank = &tvu.bank; - let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = Entry::new(&cur_hash, i, vec![tr0]); bank.register_entry_id(&cur_hash); @@ -299,7 +285,6 @@ mod tests { msgs.push(msg); } - let bank = &tvu.bank; let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); @@ -307,7 +292,7 @@ mod tests { assert_eq!(bob_balance, starting_balance - alice_ref_balance); exit.store(true, Ordering::Relaxed); - for t in threads { + for t in tvu.thread_hdls { t.join().expect("join"); } t2_gossip.join().expect("join"); From 6f4e2c184b59e3de10f747c68d63115867d50e43 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 10:49:48 -0700 Subject: [PATCH 4/8] wip --- src/thin_client.rs | 130 ++++++++++++--------------------------------- src/tvu.rs | 71 +++++++++++++++++-------- 2 files changed, 85 insertions(+), 116 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index a2cc3f2bb808e4..8959e0a944af8b 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -192,47 +192,28 @@ mod tests { use std::thread::sleep; use std::time::Duration; use streamer::default_window; + use tvu::tests::TestNode; #[test] fn test_thin_client() { logger::setup(); - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let events_addr = events_socket.local_addr().unwrap(); - let addr = requests_socket.local_addr().unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - "0.0.0.0:0".parse().unwrap(), - requests_socket.local_addr().unwrap(), - events_addr, - ); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let mut local = requests_socket.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - d, - requests_socket, - events_socket, - broadcast_socket, - respond_socket, - gossip, + leader.data, + leader.requests, + leader.event, + leader.broadcast, + leader.respond, + leader.gossip, exit.clone(), sink(), ); @@ -241,7 +222,7 @@ mod tests { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) @@ -262,24 +243,18 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let serve_addr = leader.data.requests_addr; - let mut local = serve_addr.local_addr().unwrap(); - local.set_port(0); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let events_addr = events_socket.local_addr().unwrap(); + let events_addr = leader.data.events_addr; let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - leader_data, - leader_serve, - events_socket, - broadcast_socket, - respond_socket, - leader_gossip, + leader.data, + leader.requests, + leader.event, + leader.broadcast, + leader.respond, + leader.gossip, exit.clone(), sink(), ); @@ -290,7 +265,7 @@ mod tests { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(serve_addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); @@ -311,35 +286,6 @@ mod tests { t.join().unwrap(); } } - struct TestNode { - data: ReplicatedData, - gossip: UdpSocket, - requests: UdpSocket, - replicate: UdpSocket, - event: UdpSocket, - respond: UdpSocket, - broadcast: UdpSocket, - } - impl TestNode { - fn new() -> TestNode { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let event = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let data = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - requests.local_addr().unwrap(), - event.local_addr().unwrap(), - ); - TestNode {data, gossip, requests, replicate, event, respond, broadcast } - } - } - fn replicant( leader: &ReplicatedData, exit: Arc, @@ -368,18 +314,18 @@ mod tests { threads: &mut Vec>, ) -> Vec { //lets spy on the network - let mut spy = test_node(); + let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); - let me = spy.id.clone(); - spy.replicate_addr = daddr; - spy.requests_addr = daddr; - let mut spy_crdt = Crdt::new(spy); + let me = spy.data.id.clone(); + spy.data.replicate_addr = daddr; + spy.data.requests_addr = daddr; + let mut spy_crdt = Crdt::new(spy.data); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge let mut converged = false; @@ -411,38 +357,32 @@ mod tests { logger::setup(); const N: usize = 5; trace!("test_multi_accountant_stub"); - let leader = test_node(); + let leader = TestNode::new(); let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_bank = Bank::new(&alice); - - let mut local = leader.2.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let events_addr = leader.4.local_addr().unwrap(); - + let events_addr = leader.data.events_addr; let server = Server::leader( leader_bank, alice.last_id(), None, - leader.0.clone(), - leader.2, - leader.4, - broadcast_socket, - respond_socket, - leader.1, + leader.data, + leader.requests, + leader.event, + leader.broadcast, + leader.respond, + leader.gossip, exit.clone(), sink(), ); let mut threads = server.thread_hdls; for _ in 0..N { - replicant(&leader.0, exit.clone(), &alice, &mut threads); + replicant(&leader.data, exit.clone(), &alice, &mut threads); } - let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads); + let addrs = converge(&leader.data, exit.clone(), N + 2, &mut threads); //contains the leader addr as well assert_eq!(addrs.len(), N + 1); //verify leader can do transfer @@ -454,9 +394,9 @@ mod tests { let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.0.requests_addr, + leader.data.requests_addr, requests_socket, - events_addr, + leader.data.events_addr, events_socket, ); trace!("getting leader last_id"); diff --git a/src/tvu.rs b/src/tvu.rs index f640428839eea2..f8b6d4d89b8a4c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -149,7 +149,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke } #[cfg(test)] -mod tests { +pub mod tests { use bank::Bank; use bincode::serialize; use chrono::prelude::*; @@ -166,36 +166,38 @@ mod tests { use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; + use std::net::UdpSocket; use streamer; - use tvu::{test_node, Tvu}; + use crdt::ReplicatedData; + use tvu::Tvu; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] fn test_replicate() { logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); - let (target1_data, target1_gossip, target1_replicate, _, target1_events) = test_node(); - let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); + let leader = TestNode::new(); + let target1 = TestNode::new(); + let target2 = TestNode::new(); let exit = Arc::new(AtomicBool::new(false)); //start crdt_leader - let mut crdt_l = Crdt::new(leader_data.clone()); - crdt_l.set_leader(leader_data.id); + let mut crdt_l = Crdt::new(leader.data.clone()); + crdt_l.set_leader(leader.data.id); let cref_l = Arc::new(RwLock::new(crdt_l)); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let window1 = streamer::default_window(); - let t_l_listen = Crdt::listen(cref_l, window1, leader_gossip, exit.clone()); + let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone()); //start crdt2 - let mut crdt2 = Crdt::new(target2_data.clone()); - crdt2.insert(&leader_data); - crdt2.set_leader(leader_data.id); - let leader_id = leader_data.id; + let mut crdt2 = Crdt::new(target2.data.clone()); + crdt2.insert(&leader.data); + crdt2.set_leader(leader.data.id); + let leader_id = leader.data.id; let cref2 = Arc::new(RwLock::new(crdt2)); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let window2 = streamer::default_window(); - let t2_listen = Crdt::listen(cref2, window2, target2_gossip, exit.clone()); + let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone()); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -206,14 +208,14 @@ mod tests { let t_receiver = streamer::blob_receiver( exit.clone(), recv_recycler.clone(), - target2_replicate, + target2.replicate, s_reader, ).unwrap(); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( - leader_serve, + leader.requests, exit.clone(), resp_recycler.clone(), r_responder, @@ -221,15 +223,14 @@ mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let replicate_addr = target1_data.replicate_addr; + let replicate_addr = target1.data.replicate_addr; let bank = Arc::new(Bank::new(&mint)); let tvu = Tvu::new( bank, - target1_data, - target1_gossip, - target1_events, - target1_replicate, - leader_data, + target1.data, + target1.gossip, + target1.replicate, + leader.data, exit.clone(), ); @@ -302,4 +303,32 @@ mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } + pub struct TestNode { + pub data: ReplicatedData, + pub gossip: UdpSocket, + pub requests: UdpSocket, + pub replicate: UdpSocket, + pub event: UdpSocket, + pub respond: UdpSocket, + pub broadcast: UdpSocket, + } + impl TestNode { + pub fn new() -> TestNode { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let event = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let data = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests.local_addr().unwrap(), + event.local_addr().unwrap(), + ); + TestNode {data, gossip, requests, replicate, event, respond, broadcast } + } + } } From e1f1ca29f335e060c945a2f3f5e5a4c4a4aaafb2 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 10:52:47 -0700 Subject: [PATCH 5/8] woop --- src/streamer.rs | 2 +- src/thin_client.rs | 9 ++++----- src/tvu.rs | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 9af0552ce1f9ea..719aa46aa4b6fc 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -723,8 +723,8 @@ mod test { serve.local_addr().unwrap(), event.local_addr().unwrap(), ); - let crdt = Crdt::new(d); trace!("data: {:?}", d); + let crdt = Crdt::new(d); (Arc::new(RwLock::new(crdt)), gossip, replicate, serve) } diff --git a/src/thin_client.rs b/src/thin_client.rs index 8959e0a944af8b..6e5e2e3b559ac4 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -208,7 +208,7 @@ mod tests { bank, alice.last_id(), Some(Duration::from_millis(30)), - leader.data, + leader.data.clone(), leader.requests, leader.event, leader.broadcast, @@ -243,13 +243,12 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let events_addr = leader.data.events_addr; let server = Server::leader( bank, alice.last_id(), Some(Duration::from_millis(30)), - leader.data, + leader.data.clone(), leader.requests, leader.event, leader.broadcast, @@ -265,7 +264,7 @@ mod tests { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket); let last_id = client.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); @@ -368,7 +367,7 @@ mod tests { leader_bank, alice.last_id(), None, - leader.data, + leader.data.clone(), leader.requests, leader.event, leader.broadcast, diff --git a/src/tvu.rs b/src/tvu.rs index f8b6d4d89b8a4c..157f09e0c6301b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -226,7 +226,7 @@ pub mod tests { let replicate_addr = target1.data.replicate_addr; let bank = Arc::new(Bank::new(&mint)); let tvu = Tvu::new( - bank, + bank.clone(), target1.data, target1.gossip, target1.replicate, From dc865295a462a814ac700057f9dba71651f8f7f7 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 10:54:48 -0700 Subject: [PATCH 6/8] fmt --- src/thin_client.rs | 14 ++++++++++++-- src/tvu.rs | 18 ++++++++++++++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index 6e5e2e3b559ac4..b564afdc24c056 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -222,7 +222,12 @@ mod tests { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket); + let mut client = ThinClient::new( + leader.data.requests_addr, + requests_socket, + leader.data.events_addr, + events_socket, + ); let last_id = client.get_last_id().wait().unwrap(); let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) @@ -264,7 +269,12 @@ mod tests { .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(leader.data.requests_addr, requests_socket, leader.data.events_addr, events_socket); + let mut client = ThinClient::new( + leader.data.requests_addr, + requests_socket, + leader.data.events_addr, + events_socket, + ); let last_id = client.get_last_id().wait().unwrap(); let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id); diff --git a/src/tvu.rs b/src/tvu.rs index 157f09e0c6301b..a75bbe59d8fc83 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -119,7 +119,9 @@ impl Tvu { t_gossip, t_listen, ]; - Tvu{thread_hdls: threads} + Tvu { + thread_hdls: threads, + } } } @@ -154,6 +156,7 @@ pub mod tests { use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; + use crdt::ReplicatedData; use entry::Entry; use event::Event; use hash::{hash, Hash}; @@ -162,13 +165,12 @@ pub mod tests { use packet::BlobRecycler; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; - use std::net::UdpSocket; use streamer; - use crdt::ReplicatedData; use tvu::Tvu; /// Test that mesasge sent from leader to target1 and repliated to target2 @@ -328,7 +330,15 @@ pub mod tests { requests.local_addr().unwrap(), event.local_addr().unwrap(), ); - TestNode {data, gossip, requests, replicate, event, respond, broadcast } + TestNode { + data, + gossip, + requests, + replicate, + event, + respond, + broadcast, + } } } } From 835e1156ccdb14c7ebc5294267fec6eea8d2fc28 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 11:06:18 -0700 Subject: [PATCH 7/8] @garious review --- src/bin/testnode.rs | 2 +- src/server.rs | 4 ++-- src/thin_client.rs | 48 ++++++++++++++++++++++----------------------- src/tvu.rs | 37 +++++++++++++++++++--------------- 4 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4b82899a95c805..1663e1e60da53e 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -140,7 +140,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let server = Server::leader( + let server = Server::new_leader( bank, last_id, Some(Duration::from_millis(1000)), diff --git a/src/server.rs b/src/server.rs index 94e3ffbc112a9a..12828128ebeb5c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,7 +18,7 @@ pub struct Server { } impl Server { - pub fn leader( + pub fn new_leader( bank: Bank, start_hash: Hash, tick_duration: Option, @@ -49,7 +49,7 @@ impl Server { thread_hdls.extend(tpu.thread_hdls); Server { thread_hdls } } - pub fn validator( + pub fn new_validator( bank: Bank, me: ReplicatedData, requests_socket: UdpSocket, diff --git a/src/thin_client.rs b/src/thin_client.rs index b564afdc24c056..7cfacf82cb5d4c 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -204,16 +204,16 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::leader( + let server = Server::new_leader( bank, alice.last_id(), Some(Duration::from_millis(30)), leader.data.clone(), - leader.requests, - leader.event, - leader.broadcast, - leader.respond, - leader.gossip, + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); @@ -249,16 +249,16 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::leader( + let server = Server::new_leader( bank, alice.last_id(), Some(Duration::from_millis(30)), leader.data.clone(), - leader.requests, - leader.event, - leader.broadcast, - leader.respond, - leader.gossip, + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); @@ -303,13 +303,13 @@ mod tests { ) { let replicant = TestNode::new(); let replicant_bank = Bank::new(&alice); - let mut ts = Server::validator( + let mut ts = Server::new_validator( replicant_bank, replicant.data.clone(), - replicant.requests, - replicant.respond, - replicant.replicate, - replicant.gossip, + replicant.sockets.requests, + replicant.sockets.respond, + replicant.sockets.replicate, + replicant.sockets.gossip, leader.clone(), exit.clone(), ); @@ -334,7 +334,7 @@ mod tests { let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone()); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.sockets.gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge let mut converged = false; @@ -373,16 +373,16 @@ mod tests { let leader_bank = Bank::new(&alice); let events_addr = leader.data.events_addr; - let server = Server::leader( + let server = Server::new_leader( leader_bank, alice.last_id(), None, leader.data.clone(), - leader.requests, - leader.event, - leader.broadcast, - leader.respond, - leader.gossip, + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); diff --git a/src/tvu.rs b/src/tvu.rs index a75bbe59d8fc83..c4fd85229db89f 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -11,7 +11,7 @@ //! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate //! address. //! 3.b window -//! - Verified blobs are placed into a window, indexed by the counter set by the leader. This could +//! - Verified blobs are placed into a window, indexed by the counter set by the leader.sockets. This could //! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to //! recover any missing packets, and requests are made at random to peers and parents to retransmit //! a missing packet. @@ -189,7 +189,7 @@ pub mod tests { let cref_l = Arc::new(RwLock::new(crdt_l)); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let window1 = streamer::default_window(); - let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone()); + let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone()); //start crdt2 let mut crdt2 = Crdt::new(target2.data.clone()); @@ -199,7 +199,7 @@ pub mod tests { let cref2 = Arc::new(RwLock::new(crdt2)); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let window2 = streamer::default_window(); - let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone()); + let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone()); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -210,14 +210,14 @@ pub mod tests { let t_receiver = streamer::blob_receiver( exit.clone(), recv_recycler.clone(), - target2.replicate, + target2.sockets.replicate, s_reader, ).unwrap(); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( - leader.requests, + leader.sockets.requests, exit.clone(), resp_recycler.clone(), r_responder, @@ -230,8 +230,8 @@ pub mod tests { let tvu = Tvu::new( bank.clone(), target1.data, - target1.gossip, - target1.replicate, + target1.sockets.gossip, + target1.sockets.replicate, leader.data, exit.clone(), ); @@ -305,8 +305,7 @@ pub mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } - pub struct TestNode { - pub data: ReplicatedData, + pub struct Sockets { pub gossip: UdpSocket, pub requests: UdpSocket, pub replicate: UdpSocket, @@ -314,6 +313,10 @@ pub mod tests { pub respond: UdpSocket, pub broadcast: UdpSocket, } + pub struct TestNode { + pub data: ReplicatedData, + pub sockets: Sockets, + } impl TestNode { pub fn new() -> TestNode { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -331,13 +334,15 @@ pub mod tests { event.local_addr().unwrap(), ); TestNode { - data, - gossip, - requests, - replicate, - event, - respond, - broadcast, + data: data, + sockets: Sockets { + gossip, + requests, + replicate, + event, + respond, + broadcast, + } } } } From b4af52580e406d9136cbb8a910add5952bb69113 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 11:06:34 -0700 Subject: [PATCH 8/8] fmt --- src/thin_client.rs | 7 ++++++- src/tvu.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index 7cfacf82cb5d4c..249119e0af6aa1 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -334,7 +334,12 @@ mod tests { let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.sockets.gossip, exit.clone()); + let t_spy_listen = Crdt::listen( + spy_ref.clone(), + spy_window, + spy.sockets.gossip, + exit.clone(), + ); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge let mut converged = false; diff --git a/src/tvu.rs b/src/tvu.rs index c4fd85229db89f..d8faf3a315dc4b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -342,7 +342,7 @@ pub mod tests { event, respond, broadcast, - } + }, } } }