diff --git a/src/packet.rs b/src/packet.rs index 0e684357209a00..c0083dba4020e7 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -214,7 +214,7 @@ impl Packets { } const BLOB_INDEX_END: usize = size_of::(); -const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::(); +const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::() + size_of::(); impl Blob { pub fn get_index(&self) -> Result { diff --git a/src/streamer.rs b/src/streamer.rs index a21d6a59a698cf..bdc6882a9180a7 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -516,7 +516,10 @@ mod test { read.local_addr().unwrap(), send.local_addr().unwrap(), serve.local_addr().unwrap()); - let subs = Arc::new(RwLock::new(Crdt::new(rep_data))); + let mut crdt_me = Crdt::new(rep_data); + let me_id = crdt_me.my_data().id; + crdt_me.set_leader(me_id); + let subs = Arc::new(RwLock::new(crdt_me)); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); @@ -541,6 +544,7 @@ mod test { let b_ = b.clone(); let mut w = b.write().unwrap(); w.set_index(i).unwrap(); + w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); @@ -573,7 +577,10 @@ mod test { read.local_addr().unwrap(), send.local_addr().unwrap(), serve.local_addr().unwrap()); - let subs = Arc::new(RwLock::new(Crdt::new(rep_data))); + let mut crdt_me = Crdt::new(rep_data); + let me_id = crdt_me.my_data().id; + crdt_me.set_leader(me_id); + let subs = Arc::new(RwLock::new(crdt_me)); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); @@ -593,7 +600,9 @@ mod test { let (s_blob_receiver, r_blob_receiver) = channel(); let t_receiver = blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap(); - let mut oq = r_blob_receiver.recv().unwrap(); + let recv = r_blob_receiver.recv(); + println!("recv: {:?}", recv); + let mut oq = recv.unwrap(); assert_eq!(oq.len(), 1); let o = oq.pop_front().unwrap(); let ro = o.read().unwrap();