diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 7b71233ebce456..b4928fdab2b012 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -76,11 +76,10 @@ pub enum Response { impl AccountantSkel { /// Create a new AccountantSkel that wraps the given Accountant. - pub fn new(acc: Accountant, last_id: Hash, historian: Historian) -> Self { + pub fn new(acc: Accountant, last_id: Hash) -> Self { AccountantSkel { acc, last_id, - historian, entry_info_subscribers: vec![], } } @@ -105,24 +104,24 @@ impl AccountantSkel { fn run_sync( obj: Arc>, broadcast: &streamer::BlobSender, - blob_recycler: &streamer::BlobRecycler - writer: W + blob_recycler: &packet::BlobRecycler, + writer: W, historian: Receiver, ) -> Result<()> { //TODO clean this mess up let entry = historian.recv(Duration::new(1, 0))?; let mut b = blob_recycler.allocate(); let mut out = Cursor::new(b.data_mut()); - let mut ser = bincode::Serializer::new(out); - let mut seq = ser.serialize_seq(None); + let mut ser = Serializer::new(out); + let mut seq = ser.serialize_seq(None).expect("serialize end"); seq.serialize(entry).expect("serialize failed on first entry!"); obj.write().unwrap().notify_entry_info_subscribers(&entry); while let Ok(entry) = historian.try_recv() { //UNLOCK skel in this scope - let mut robj = obj.write().unwrap(); + let mut wobj = obj.write().unwrap(); if let Err(e) = seq.serialize(entry) { - seq.end(); + seq.end().expect("serialize end"); b.set_size(out.len()); broadcast.send(b)?; @@ -132,10 +131,10 @@ impl AccountantSkel { seq = ser.serialize_seq(None); seq.serialize(entry).expect("serialize failed on first entry!"); } - self.last_id = entry.id; - self.acc.register_entry_id(&self.last_id); + wobj.last_id = entry.id; + wobj.acc.register_entry_id(&wobj.last_id); writeln!(writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); - self.notify_entry_info_subscribers(&entry); + wobj.notify_entry_info_subscribers(&entry); } seq.end(); b.set_size(out.len()); @@ -146,14 +145,14 @@ impl AccountantSkel { pub fn sync_service( obj: Arc>, exit: AtomicBool, - broadcast: &streamer::BlobSender, - blob_recycler: &streamer::BlobRecycler + broadcast: streamer::BlobSender, + blob_recycler: packet::BlobRecycler, writer: W, historian: Receiver, ) -> JoinHandle<()> { spawn(move|| loop { let e = Self::run_sync(&obj, &broadcast, &blob_recycler, writer, &historian); - if e.is_err() && exit_.load(Ordering::Relaxed) { + if e.is_err() && exit.load(Ordering::Relaxed) { break; } }) @@ -367,10 +366,12 @@ impl AccountantSkel { /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader /// Set `exit` to shutdown its threads. - pub fn serve( + pub fn serve( obj: &Arc>, - me: ReplicatedData + me: ReplicatedData, exit: Arc, + writer: W, + historian: Historian, ) -> Result>> { let gossip = UdpSocket::bind(me.gossip_addr)?; let read = UdpSocket::bind(me.serve_addr)?; @@ -412,11 +413,20 @@ impl AccountantSkel { blob_recycler.clone(), broadcast_receiver, ); + + let t_sync = Self::sync_service( + obj.clone(), + exit.clone(), + broadcast_sender, + blob_recycler.clone(), + writer, + historian + ); let skel = obj.clone(); let t_server = spawn(move || loop { let e = Self::process( - &skel, + &obj.clone(), &verified_receiver, &broadcast_sender, &responder_sender, @@ -424,15 +434,12 @@ impl AccountantSkel { &blob_recycler, ); if e.is_err() { - // Assume this was a timeout, so sync any empty entries. - skel.lock().unwrap().sync(); - if exit.load(Ordering::Relaxed) { break; } } }); - Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + Ok(vec![t_receiver, t_responder, t_server, t_verifier, t_sync]) } /// This service receives messages from a leader in the network and processes the transactions @@ -465,7 +472,7 @@ impl AccountantSkel { let crdt = Arc::new(RwLock::new(Crdt::new(me))); crdt.write().unwrap().insert(&leader); let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let t_listen = Crdt::listen(crdt.clone(), exit.clone()); + let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); // make sure we are on the same interface let mut local = read.local_addr()?; diff --git a/src/crdt.rs b/src/crdt.rs index 66e08f51915ee3..6d753379b24a4e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -37,9 +37,9 @@ pub struct ReplicatedData { /// should always be increasing version: u64, /// address to connect to for gossip - gossip_addr: SocketAddr, + pub gossip_addr: SocketAddr, /// address to connect to for replication - replicate_addr: SocketAddr, + pub replicate_addr: SocketAddr, /// address to connect to when this node is leader serve_addr: SocketAddr, /// current leader identity