From 2e49ae87878c73c6195ccd9c30253c71a06e8335 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 29 Apr 2018 23:17:05 -0700 Subject: [PATCH] progress --- src/crdt.rs | 2 +- src/lib.rs | 2 +- src/streamer.rs | 7 +++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index dc009b28896c7c..66e08f51915ee3 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -32,7 +32,7 @@ use packet::SharedBlob; /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone)] pub struct ReplicatedData { - id: PublicKey, + pub id: PublicKey, sig: Signature, /// should always be increasing version: u64, diff --git a/src/lib.rs b/src/lib.rs index 5a5878205582a6..2116638a22e416 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ pub mod plan; pub mod recorder; pub mod result; pub mod signature; -//pub mod streamer; +pub mod streamer; pub mod transaction; extern crate bincode; extern crate byteorder; diff --git a/src/streamer.rs b/src/streamer.rs index 4035948bc38ebf..2974e2753ea69f 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -111,7 +111,7 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + crdt: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, @@ -127,7 +127,6 @@ fn recv_window( { //retransmit all leader blocks let mut retransmitq = VecDeque::new(); - let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); //TODO this check isn't safe against adverserial packets @@ -137,9 +136,9 @@ fn recv_window( p.get_index().unwrap(), p.get_id().unwrap(), p.meta.addr(), - rsubs.leader.addr + leader_id ); - if p.get_id() == Ok(leader_id) { + if p.get_id().unwrap() == leader_id { //TODO //need to copy the retransmited blob //otherwise we get into races with which thread