From 0c492d7227e724c948be26f93f0fcfd5f48767eb Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 4 May 2018 11:11:39 -0700 Subject: [PATCH 1/5] Multinode fixes and test * Replace magic numbers for 64k event size * Fix gossip, dont ping yourself * Retransmit only to listening nodes * Multinode test in stub marked unstable --- src/accountant_skel.rs | 143 +++++++++++++++++++++++++-------- src/accountant_stub.rs | 176 ++++++++++++++++++++++++++++++++++++++++- src/bin/client-demo.rs | 28 ++++--- src/bin/testnode.rs | 9 +++ src/crdt.rs | 69 +++++++++++----- src/packet.rs | 16 +++- src/recorder.rs | 5 +- src/result.rs | 1 + 8 files changed, 375 insertions(+), 72 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 7982ae4fefe019..a2a8d21d42a0e1 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -19,6 +19,7 @@ use serde_json; use signature::PublicKey; use std::cmp::max; use std::collections::VecDeque; +use std::io::sink; use std::io::{Cursor, Write}; use std::mem::size_of; use std::net::{SocketAddr, UdpSocket}; @@ -100,6 +101,8 @@ impl AccountantSkel { num_events: entry.events.len() as u64, }; let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); + trace!("sending {} to {}", data.len(), addr); + //TODO dont do IO here, this needs to be on a separate channel let res = socket.send_to(&data, addr); if res.is_err() { eprintln!("couldn't send response: {:?}", res); @@ -182,16 +185,11 @@ impl AccountantSkel { broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, writer: &Arc>, - exit: Arc, ) -> Result<()> { let mut q = VecDeque::new(); - while let Ok(list) = Self::receive_all(&obj, writer) { - trace!("New blobs? {}", list.len()); - Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q); - if exit.load(Ordering::Relaxed) { - break; - } - } + let list = Self::receive_all(&obj, writer)?; + trace!("New blobs? {}", list.len()); + Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q); if !q.is_empty() { broadcast.send(q)?; } @@ -206,14 +204,26 @@ impl AccountantSkel { writer: Arc>, ) -> JoinHandle<()> { spawn(move || loop { - let e = Self::run_sync( - obj.clone(), - &broadcast, - &blob_recycler, - &writer, - exit.clone(), - ); - if e.is_err() && exit.load(Ordering::Relaxed) { + let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer); + if exit.load(Ordering::Relaxed) { + info!("sync_service exiting"); + break; + } + }) + } + + /// Process any Entry items that have been published by the Historian. + /// continuosly broadcast blobs of entries out + fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> { + Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?; + Ok(()) + } + + pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc) -> JoinHandle<()> { + spawn(move || loop { + let _ = Self::run_sync_no_broadcast(obj.clone()); + if exit.load(Ordering::Relaxed) { + info!("sync_no_broadcast_service exiting"); break; } }) @@ -228,7 +238,9 @@ impl AccountantSkel { match msg { Request::GetBalance { key } => { let val = self.acc.lock().unwrap().get_balance(&key); - Some((Response::Balance { key, val }, rsp_addr)) + let rsp = (Response::Balance { key, val }, rsp_addr); + info!("Response::Balance {:?}", rsp); + Some(rsp) } Request::Transaction(_) => unreachable!(), Request::Subscribe { subscriptions } => { @@ -247,10 +259,10 @@ impl AccountantSkel { fn recv_batch(recvr: &streamer::PacketReceiver) -> Result> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; - trace!("got msgs"); + debug!("got msgs"); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { - trace!("got more msgs"); + debug!("got more msgs"); batch.push(more); } info!("batch len {}", batch.len()); @@ -275,6 +287,7 @@ impl AccountantSkel { ) -> Result<()> { let batch = Self::recv_batch(recvr)?; let verified_batches = Self::verify_batch(batch); + debug!("verified batches: {}", verified_batches.len()); for xs in verified_batches { sendr.send(xs)?; } @@ -315,8 +328,9 @@ impl AccountantSkel { &self, req_vers: Vec<(Request, SocketAddr, u8)>, ) -> Result> { - trace!("partitioning"); + debug!("partitioning"); let (trs, reqs) = Self::partition_requests(req_vers); + debug!("trs: {} reqs: {}", trs.len(), reqs.len()); // Process the transactions in parallel and then log the successful ones. for result in self.acc.lock().unwrap().process_verified_transactions(trs) { @@ -328,15 +342,21 @@ impl AccountantSkel { } } + debug!("processing verified"); + // Let validators know they should not attempt to process additional // transactions in parallel. self.historian_input.lock().unwrap().send(Signal::Tick)?; + debug!("after historian_input"); + // Process the remaining requests serially. let rsps = reqs.into_iter() .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) .collect(); + debug!("returning rsps"); + Ok(rsps) } @@ -377,7 +397,7 @@ impl AccountantSkel { ) -> Result<()> { let timer = Duration::new(1, 0); let mms = verified_receiver.recv_timeout(timer)?; - trace!("got some messages: {}", mms.len()); + debug!("got some messages: {}", mms.len()); for (msgs, vers) in mms { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); let req_vers = reqs.into_iter() @@ -389,18 +409,18 @@ impl AccountantSkel { v }) .collect(); - trace!("process_packets"); + debug!("process_packets"); let rsps = obj.process_packets(req_vers)?; - trace!("done process_packets"); + debug!("done process_packets"); let blobs = Self::serialize_responses(rsps, blob_recycler)?; - trace!("sending blobs: {}", blobs.len()); if !blobs.is_empty() { + info!("process: sending blobs: {}", blobs.len()); //don't wake up the other side if there is nothing responder_sender.send(blobs)?; } packet_recycler.recycle(msgs); } - trace!("done responding"); + debug!("done responding"); Ok(()) } /// Process verified blobs, already in order @@ -412,6 +432,7 @@ impl AccountantSkel { ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; + trace!("replicating blobs {}", blobs.len()); for msgs in &blobs { let blob = msgs.read().unwrap(); let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); @@ -541,10 +562,12 @@ impl AccountantSkel { obj: &SharedSkel, me: ReplicatedData, gossip: UdpSocket, + serve: UdpSocket, replicate: UdpSocket, leader: ReplicatedData, exit: Arc, ) -> Result>> { + //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); crdt.write().unwrap().set_leader(leader.id); crdt.write().unwrap().insert(leader); @@ -580,7 +603,7 @@ impl AccountantSkel { //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), - crdt, + crdt.clone(), blob_recycler.clone(), blob_receiver, window_sender, @@ -588,19 +611,76 @@ impl AccountantSkel { ); let skel = obj.clone(); - let t_server = spawn(move || loop { + let s_exit = exit.clone(); + let t_replicator = spawn(move || loop { let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); - if e.is_err() && exit.load(Ordering::Relaxed) { + if e.is_err() && s_exit.load(Ordering::Relaxed) { + break; + } + }); + + //serve pipeline + // make sure we are on the same interface + let mut local = serve.local_addr()?; + local.set_port(0); + let respond_socket = UdpSocket::bind(local.clone())?; + + let packet_recycler = packet::PacketRecycler::default(); + let blob_recycler = packet::BlobRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_packet_receiver = + streamer::receiver(serve, exit.clone(), packet_recycler.clone(), packet_sender)?; + let (responder_sender, responder_receiver) = channel(); + let t_responder = streamer::responder( + respond_socket, + exit.clone(), + blob_recycler.clone(), + responder_receiver, + ); + let (verified_sender, verified_receiver) = channel(); + + let exit_ = exit.clone(); + let t_verifier = spawn(move || loop { + let e = Self::verifier(&packet_receiver, &verified_sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + trace!("verifier exiting"); break; } }); + + let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); + + let skel = obj.clone(); + let s_exit = exit.clone(); + let t_server = spawn(move || loop { + let e = Self::process( + &mut skel.clone(), + &verified_receiver, + &responder_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() { + if s_exit.load(Ordering::Relaxed) { + break; + } + } + }); + Ok(vec![ + //replicate threads t_blob_receiver, t_retransmit, t_window, - t_server, + t_replicator, t_gossip, t_listen, + //serve threads + t_packet_receiver, + t_responder, + t_server, + t_verifier, + t_sync, ]) } } @@ -769,7 +849,7 @@ mod tests { tr2.data.plan = Plan::new_payment(502, bob_pubkey); let _sig = acc_stub.transfer_signed(tr2).unwrap(); - assert_eq!(acc_stub.get_balance(&bob_pubkey).wait().unwrap(), 500); + assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500); trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); @@ -797,7 +877,7 @@ mod tests { fn test_replicate() { logger::setup(); let (leader_data, leader_gossip, _, leader_serve) = test_node(); - let (target1_data, target1_gossip, target1_replicate, _) = test_node(); + let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node(); let (target2_data, target2_gossip, target2_replicate, _) = test_node(); let exit = Arc::new(AtomicBool::new(false)); @@ -851,6 +931,7 @@ mod tests { &acc, target1_data, target1_gossip, + target1_serve, target1_replicate, leader_data, exit.clone(), diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 1797a53e1c6ca1..6bedd388611c89 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -11,6 +11,7 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; +use std::time::Duration; use transaction::Transaction; pub struct AccountantStub { @@ -47,7 +48,10 @@ impl AccountantStub { pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; + self.socket.set_read_timeout(Some(Duration::new(1, 0)))?; + info!("start recv_from"); self.socket.recv_from(&mut buf)?; + info!("end recv_from"); let resp = deserialize(&buf).expect("deserialize balance"); Ok(resp) } @@ -55,9 +59,11 @@ impl AccountantStub { pub fn process_response(&mut self, resp: Response) { match resp { Response::Balance { key, val } => { + info!("Response balance {:?} {:?}", key, val); self.balances.insert(key, val); } Response::EntryInfo(entry_info) => { + trace!("Response entry_info {:?}", entry_info.id); self.last_id = Some(entry_info.id); self.num_events += entry_info.num_events; } @@ -88,7 +94,8 @@ impl AccountantStub { /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. - pub fn get_balance(&mut self, pubkey: &PublicKey) -> FutureResult { + pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result { + info!("get_balance"); let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance"); self.socket @@ -96,13 +103,14 @@ impl AccountantStub { .expect("buffer error"); let mut done = false; while !done { - let resp = self.recv_response().expect("recv response"); + let resp = self.recv_response()?; + info!("recv_response {:?}", resp); if let &Response::Balance { ref key, .. } = &resp { done = key == pubkey; } self.process_response(resp); } - ok(self.balances[pubkey].unwrap()) + self.balances[pubkey].ok_or(io::Error::new(io::ErrorKind::Other, "nokey")) } /// Request the last Entry ID from the server. This method blocks @@ -146,6 +154,7 @@ mod tests { use crdt::ReplicatedData; use futures::Future; use historian::Historian; + use logger; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; @@ -158,6 +167,7 @@ mod tests { // TODO: Figure out why this test sometimes hangs on TravisCI. #[test] fn test_accountant_stub() { + logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); let addr = serve.local_addr().unwrap(); @@ -186,10 +196,168 @@ mod tests { let last_id = acc.get_last_id().wait().unwrap(); let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); - assert_eq!(acc.get_balance(&bob_pubkey).wait().unwrap(), 500); + assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); exit.store(true, Ordering::Relaxed); for t in threads { t.join().unwrap(); } } } + +#[cfg(all(feature = "unstable", test))] +mod unstsable_tests { + use super::*; + use accountant::Accountant; + use accountant_skel::AccountantSkel; + use crdt::{Crdt, ReplicatedData}; + use futures::Future; + use historian::Historian; + use logger; + use mint::Mint; + use signature::{KeyPair, KeyPairUtil}; + use std::io::sink; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::sync_channel; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + + fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let leader = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + serve.local_addr().unwrap(), + ); + (leader, gossip, serve, replicate) + } + + #[test] + fn test_multi_accountant_stub() { + logger::setup(); + info!("test_multi_accountant_stub"); + let leader = test_node(); + let replicant = test_node(); + let alice = Mint::new(10_000); + let bob_pubkey = KeyPair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + + let leader_acc = { + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); + let acc = Accountant::new(&alice); + Arc::new(AccountantSkel::new(acc, input, historian)) + }; + + let replicant_acc = { + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); + let acc = Accountant::new(&alice); + Arc::new(AccountantSkel::new(acc, input, historian)) + }; + + let leader_threads = AccountantSkel::serve( + &leader_acc, + leader.0.clone(), + leader.2, + leader.1, + exit.clone(), + sink(), + ).unwrap(); + let replicant_threads = AccountantSkel::replicate( + &replicant_acc, + replicant.0.clone(), + replicant.1, + replicant.2, + replicant.3, + leader.0.clone(), + exit.clone(), + ).unwrap(); + + //lets spy on the network + let (mut spy, spy_gossip, _, _) = test_node(); + let daddr = "0.0.0.0:0".parse().unwrap(); + spy.replicate_addr = daddr; + spy.serve_addr = daddr; + let mut spy_crdt = Crdt::new(spy); + spy_crdt.insert(leader.0.clone()); + spy_crdt.set_leader(leader.0.id); + + let spy_ref = Arc::new(RwLock::new(spy_crdt)); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_gossip, exit.clone()); + let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + //wait for the network to converge + for _ in 0..20 { + let ix = spy_ref.read().unwrap().update_index; + info!("my update index is {}", ix); + let len = spy_ref.read().unwrap().remote.values().len(); + let mut done = false; + info!("remote len {}", len); + if len > 1 && ix > 2 { + done = true; + //check if everyones remote index is greater or equal to ours + let vs: Vec = spy_ref.read().unwrap().remote.values().cloned().collect(); + for t in vs.into_iter() { + info!("remote update index is {} vs {}", t, ix); + if t < 3 { + done = false; + } + } + } + if done == true { + info!("converged!"); + break; + } + sleep(Duration::new(1, 0)); + } + + //verify leader can do transfer + let leader_balance = { + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + + let mut acc = AccountantStub::new(leader.0.serve_addr, socket); + info!("getting leader last_id"); + let last_id = acc.get_last_id().wait().unwrap(); + info!("executing leader transer"); + let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) + .unwrap(); + info!("getting leader balance"); + acc.get_balance(&bob_pubkey).unwrap() + }; + assert_eq!(leader_balance, 500); + //verify replicant has the same balance + let mut replicant_balance = 0; + for _ in 0..10 { + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + + let mut acc = AccountantStub::new(replicant.0.serve_addr, socket); + info!("getting replicant balance"); + if let Ok(bal) = acc.get_balance(&bob_pubkey) { + replicant_balance = bal; + } + info!("replicant balance {}", replicant_balance); + if replicant_balance == leader_balance { + break; + } + sleep(Duration::new(1, 0)); + } + assert_eq!(replicant_balance, leader_balance); + + exit.store(true, Ordering::Relaxed); + for t in leader_threads { + t.join().unwrap(); + } + for t in replicant_threads { + t.join().unwrap(); + } + for t in vec![t_spy_listen, t_spy_gossip] { + t.join().unwrap(); + } + } +} diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 739b6cea4aadf6..4c868dfcf0c614 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -18,6 +18,8 @@ use std::env; use std::io::{stdin, Read}; use std::net::{SocketAddr, UdpSocket}; use std::process::exit; +use std::thread::sleep; +use std::time::Duration; use std::time::Instant; use untrusted::Input; @@ -38,7 +40,7 @@ fn main() { let mut opts = Options::new(); opts.optopt("s", "", "server address", "host:port"); opts.optopt("c", "", "client address", "host:port"); - opts.optopt("t", "", "number of threads", "4"); + opts.optopt("t", "", "number of threads", &format!("{}", threads)); opts.optflag("h", "help", "print help"); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { @@ -84,6 +86,7 @@ fn main() { println!("Binding to {}", client_addr); let socket = UdpSocket::bind(&client_addr).unwrap(); + socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); let mut acc = AccountantStub::new(addr.parse().unwrap(), socket); println!("Get last ID..."); @@ -104,7 +107,7 @@ fn main() { .into_par_iter() .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) .collect(); - let duration = now.elapsed(); + let mut duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let bsps = txs as f64 / ns as f64; let nsps = ns as f64 / txs as f64; @@ -115,6 +118,7 @@ fn main() { ); let initial_tx_count = acc.transaction_count(); + println!("initial count {}", initial_tx_count); println!("Transfering {} transactions in {} batches", txs, threads); let now = Instant::now(); @@ -131,16 +135,16 @@ fn main() { } }); - println!("Waiting for half the transactions to complete...",); - let mut tx_count = acc.transaction_count(); - while tx_count < transactions.len() as u64 / 2 { + println!("Waiting for transactions to complete...",); + let mut tx_count; + for _ in 0..5 { tx_count = acc.transaction_count(); + duration = now.elapsed(); + let txs = tx_count - initial_tx_count; + println!("Transactions processed {}", txs); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let tps = (txs * 1_000_000_000) as f64 / ns as f64; + println!("{} tps", tps); + sleep(Duration::new(1, 0)); } - let txs = tx_count - initial_tx_count; - println!("Transactions processed {}", txs); - - let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (txs * 1_000_000_000) as f64 / ns as f64; - println!("Done. {} tps", tps); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index c12840e843808f..e9318bbb8fff92 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -76,6 +76,8 @@ fn main() { }) }); + eprintln!("done parsing..."); + // The first item in the ledger is required to be an entry with zero num_hashes, // which implies its id can be used as the ledger's seed. let entry0 = entries.next().unwrap(); @@ -90,10 +92,14 @@ fn main() { None }; + eprintln!("creating accountant..."); + let acc = Accountant::new_from_deposit(&deposit.unwrap()); acc.register_entry_id(&entry0.id); acc.register_entry_id(&entry1.id); + eprintln!("processing entries..."); + let mut last_id = entry1.id; for entry in entries { last_id = entry.id; @@ -101,6 +107,8 @@ fn main() { acc.register_entry_id(&last_id); } + eprintln!("creating networking stack..."); + let (input, event_receiver) = sync_channel(10_000); let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); @@ -115,6 +123,7 @@ fn main() { replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), ); + eprintln!("starting server..."); let threads = AccountantSkel::serve(&skel, d, serve_sock, gossip_sock, exit.clone(), stdout()).unwrap(); eprintln!("Ready. Listening on {}", serve_addr); diff --git a/src/crdt.rs b/src/crdt.rs index a0ece4319b4715..df01d89ec10694 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -91,7 +91,7 @@ pub struct Crdt { local: HashMap, /// The value of the remote update index that i have last seen /// This Node will ask external nodes for updates since the value in this list - remote: HashMap, + pub remote: HashMap, pub update_index: u64, me: PublicKey, timeout: Duration, @@ -172,20 +172,33 @@ impl Crdt { let cloned_table: Vec = robj.table.values().cloned().collect(); (robj.table[&robj.me].clone(), cloned_table) }; - let errs: Vec<_> = table + let daddr = "0.0.0.0:0".parse().unwrap(); + let items: Vec<(usize, &ReplicatedData)> = table .iter() - .enumerate() - .cycle() - .zip(blobs.iter()) - .map(|((i, v), b)| { + .filter(|v| { if me.id == v.id { - return Ok(0); + //filter myself + false + } else if v.replicate_addr == daddr { + //filter nodes that are not listening + false + } else { + true } + }) + .enumerate() + .collect(); + let orders: Vec<_> = items.into_iter().cycle().zip(blobs.iter()).collect(); + let errs: Vec<_> = orders + .into_par_iter() + .map(|((i, v), b)| { // only leader should be broadcasting assert!(me.current_leader_id != v.id); let mut blob = b.write().unwrap(); + blob.set_id(me.id).expect("set_id"); blob.set_index(*transmit_index + i as u64) .expect("set_index"); + //TODO profile this, may need multiple sockets for par_iter s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) }) .collect(); @@ -210,17 +223,28 @@ impl Crdt { (s.table[&s.me].clone(), s.table.values().cloned().collect()) }; let rblob = blob.read().unwrap(); - let errs: Vec<_> = table - .par_iter() - .map(|v| { + let daddr = "0.0.0.0:0".parse().unwrap(); + let orders: Vec<_> = table + .iter() + .filter(|v| { if me.id == v.id { - return Ok(0); - } - if me.current_leader_id == v.id { - trace!("skip retransmit to leader{:?}", v.id); - return Ok(0); + false + } else if me.current_leader_id == v.id { + trace!("skip retransmit to leader {:?}", v.id); + false + } else if v.replicate_addr == daddr { + trace!("skip nodes that are not listening {:?}", v.id); + false + } else { + true } + }) + .collect(); + let errs: Vec<_> = orders + .par_iter() + .map(|v| { trace!("retransmit blob to {}", v.replicate_addr); + //TODO profile this, may need multiple sockets for par_iter s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr) }) .collect(); @@ -258,13 +282,18 @@ impl Crdt { /// (A,B) /// * A - Address to send to /// * B - RequestUpdates protocol message - fn gossip_request(&self) -> (SocketAddr, Protocol) { - let n = (Self::random() as usize) % self.table.len(); - trace!("random {:?} {}", &self.me[0..1], n); + fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> { + if self.table.len() <= 1 { + return Err(Error::GeneralError); + } + let mut n = (Self::random() as usize) % self.table.len(); + while self.table.values().nth(n).unwrap().id == self.me { + n = (Self::random() as usize) % self.table.len(); + } let v = self.table.values().nth(n).unwrap().clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); - (v.gossip_addr, req) + Ok((v.gossip_addr, req)) } /// At random pick a node and try to get updated changes from them @@ -274,7 +303,7 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request(); + let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?; let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have diff --git a/src/packet.rs b/src/packet.rs index a2d0db4ff8ae75..471fe67fa5c614 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -176,18 +176,26 @@ impl Packets { // * read until it fails // * set it back to blocking before returning socket.set_nonblocking(false)?; + let mut error_msgs = 0; for p in &mut self.packets { p.meta.size = 0; + trace!("receiving"); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { - trace!("got {:?} messages", i); - break; + debug!("got {:?} messages", i); + error_msgs += 1; + if error_msgs > 30 { + break; + } else { + continue; + } } Err(e) => { - info!("recv_from err {:?}", e); + trace!("recv_from err {:?}", e); return Err(Error::IO(e)); } Ok((nrecv, from)) => { + error_msgs = 0; p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { @@ -202,6 +210,7 @@ impl Packets { pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> { let sz = self.run_read_from(socket)?; self.packets.resize(sz, Packet::default()); + debug!("recv_from: {}", sz); Ok(()) } pub fn send_to(&self, socket: &UdpSocket) -> Result<()> { @@ -233,6 +242,7 @@ impl Blob { let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?; Ok(e) } + pub fn set_id(&mut self, id: PublicKey) -> Result<()> { let wtr = serialize(&id)?; self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr); diff --git a/src/recorder.rs b/src/recorder.rs index 38cc6d87a103b4..674b99c3705c7e 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -8,6 +8,7 @@ use entry::{create_entry_mut, Entry}; use event::Event; use hash::{hash, Hash}; +use packet::BLOB_SIZE; use std::mem; use std::sync::mpsc::{Receiver, SyncSender, TryRecvError}; use std::time::{Duration, Instant}; @@ -83,7 +84,7 @@ impl Recorder { // Record an entry early if we anticipate its serialized size will // be larger than 64kb. At the time of this writing, we assume each // event will be well under 256 bytes. - if self.events.len() >= 65_536 / 256 { + if self.events.len() >= BLOB_SIZE / (2 * mem::size_of::()) { self.record_entry()?; } } @@ -100,8 +101,8 @@ mod tests { use super::*; use bincode::serialize; use signature::{KeyPair, KeyPairUtil}; - use transaction::Transaction; use std::sync::mpsc::sync_channel; + use transaction::Transaction; #[test] fn test_sub64k_entry_size() { diff --git a/src/result.rs b/src/result.rs index 532a64c3b2b9ef..d2cb485add3aa1 100644 --- a/src/result.rs +++ b/src/result.rs @@ -18,6 +18,7 @@ pub enum Error { AccountingError(accountant::AccountingError), SendError, Services, + GeneralError, } pub type Result = std::result::Result; From 3f16f8bcb0520d0282ee17b74b8f7e43738dd55b Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 6 May 2018 21:48:46 -0700 Subject: [PATCH 2/5] useless timeouts i think --- src/accountant_stub.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 6bedd388611c89..dd853ca649df4b 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -11,7 +11,6 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; -use std::time::Duration; use transaction::Transaction; pub struct AccountantStub { @@ -48,7 +47,6 @@ impl AccountantStub { pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; - self.socket.set_read_timeout(Some(Duration::new(1, 0)))?; info!("start recv_from"); self.socket.recv_from(&mut buf)?; info!("end recv_from"); @@ -190,7 +188,6 @@ mod tests { sleep(Duration::from_millis(300)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); let mut acc = AccountantStub::new(addr, socket); let last_id = acc.get_last_id().wait().unwrap(); From 19443246a6602de4cdef03b8aa04990520b9e4bc Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 6 May 2018 22:06:19 -0700 Subject: [PATCH 3/5] fixed constant --- src/recorder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/recorder.rs b/src/recorder.rs index 674b99c3705c7e..cb6f81f09c3503 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -84,7 +84,7 @@ impl Recorder { // Record an entry early if we anticipate its serialized size will // be larger than 64kb. At the time of this writing, we assume each // event will be well under 256 bytes. - if self.events.len() >= BLOB_SIZE / (2 * mem::size_of::()) { + if self.events.len() >= BLOB_SIZE / 256 { self.record_entry()?; } } From 3c71391ad00f4aa89f7caf5b5b8e66a8a2b6c51d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 6 May 2018 22:25:05 -0700 Subject: [PATCH 4/5] fixed! --- src/accountant_stub.rs | 24 +++--------------------- src/packet.rs | 10 ++-------- src/recorder.rs | 4 ++-- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index dd853ca649df4b..a3715cf83746cb 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -149,7 +149,7 @@ mod tests { use super::*; use accountant::Accountant; use accountant_skel::AccountantSkel; - use crdt::ReplicatedData; + use crdt::{Crdt, ReplicatedData}; use futures::Future; use historian::Historian; use logger; @@ -158,7 +158,7 @@ mod tests { use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::sync_channel; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -199,25 +199,6 @@ mod tests { t.join().unwrap(); } } -} - -#[cfg(all(feature = "unstable", test))] -mod unstsable_tests { - use super::*; - use accountant::Accountant; - use accountant_skel::AccountantSkel; - use crdt::{Crdt, ReplicatedData}; - use futures::Future; - use historian::Historian; - use logger; - use mint::Mint; - use signature::{KeyPair, KeyPairUtil}; - use std::io::sink; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::sync_channel; - use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -234,6 +215,7 @@ mod unstsable_tests { } #[test] + // #[ignore] fn test_multi_accountant_stub() { logger::setup(); info!("test_multi_accountant_stub"); diff --git a/src/packet.rs b/src/packet.rs index 471fe67fa5c614..713a166f686479 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -17,6 +17,7 @@ pub type BlobRecycler = Recycler; pub const NUM_PACKETS: usize = 1024 * 8; pub const BLOB_SIZE: usize = 64 * 1024; +pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_ID_END; pub const PACKET_DATA_SIZE: usize = 256; pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; @@ -176,26 +177,19 @@ impl Packets { // * read until it fails // * set it back to blocking before returning socket.set_nonblocking(false)?; - let mut error_msgs = 0; for p in &mut self.packets { p.meta.size = 0; trace!("receiving"); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { debug!("got {:?} messages", i); - error_msgs += 1; - if error_msgs > 30 { - break; - } else { - continue; - } + break; } Err(e) => { trace!("recv_from err {:?}", e); return Err(Error::IO(e)); } Ok((nrecv, from)) => { - error_msgs = 0; p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { diff --git a/src/recorder.rs b/src/recorder.rs index cb6f81f09c3503..68a8cf8dae1022 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -8,7 +8,7 @@ use entry::{create_entry_mut, Entry}; use event::Event; use hash::{hash, Hash}; -use packet::BLOB_SIZE; +use packet::BLOB_DATA_SIZE; use std::mem; use std::sync::mpsc::{Receiver, SyncSender, TryRecvError}; use std::time::{Duration, Instant}; @@ -84,7 +84,7 @@ impl Recorder { // Record an entry early if we anticipate its serialized size will // be larger than 64kb. At the time of this writing, we assume each // event will be well under 256 bytes. - if self.events.len() >= BLOB_SIZE / 256 { + if self.events.len() >= BLOB_DATA_SIZE / 256 { self.record_entry()?; } } From 72b40e5b46f2351fab6a873423e14d02c29ee937 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 6 May 2018 22:29:33 -0700 Subject: [PATCH 5/5] fmt --- src/accountant_stub.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index a3715cf83746cb..d5356083be5536 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -215,7 +215,6 @@ mod tests { } #[test] - // #[ignore] fn test_multi_accountant_stub() { logger::setup(); info!("test_multi_accountant_stub");