From c6be81a2c9bec3436621b3a42041c84310ed15f2 Mon Sep 17 00:00:00 2001 From: "OEM Configuration (temporary user)" Date: Sun, 17 Jun 2018 04:33:24 -0700 Subject: [PATCH 1/3] added retransmission of repair messages --- src/crdt.rs | 74 +++++++++++++++++++++++++++++++++++++++++++-------- src/packet.rs | 1 + 2 files changed, 64 insertions(+), 11 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 9015fc85b33f43..98180847a9357e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -590,33 +590,45 @@ impl Crdt { } fn run_window_request( window: &Window, + me: &ReplicatedData, from: &ReplicatedData, ix: u64, blob_recycler: &BlobRecycler, ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); if let Some(blob) = &window.read().unwrap()[pos] { - let rblob = blob.read().unwrap(); - let blob_ix = rblob.get_index().expect("run_window_request get_index"); + let mut wblob = blob.write().unwrap(); + let blob_ix = wblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { + let num_retransmits = wblob.meta.num_retransmits; + wblob.meta.num_retransmits += 1; + + if me.current_leader_id == me.id && + num_retransmits != 0 && + !num_retransmits.is_power_of_two() + { + return None; + } + let out = blob_recycler.allocate(); + // copy to avoid doing IO inside the lock { let mut outblob = out.write().unwrap(); - let sz = rblob.meta.size; + let sz = wblob.meta.size; outblob.meta.size = sz; - outblob.data[..sz].copy_from_slice(&rblob.data[..sz]); + outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); outblob.meta.set_addr(&from.repair_addr); - //TODO, set the sender id to the requester so we dont retransmit - //come up with a cleaner solution for this when sender signatures are checked - outblob.set_id(from.id).expect("blob set_id"); + outblob.set_id(me.id).expect("blob set_id"); } + return Some(out); } } else { assert!(window.read().unwrap()[pos].is_none()); info!("failed RequestWindowIndex {} {}", ix, from.repair_addr); } + None } @@ -683,7 +695,7 @@ impl Crdt { me.repair_addr ); assert_ne!(from.repair_addr, me.repair_addr); - Self::run_window_request(&window, &from, ix, blob_recycler) + Self::run_window_request(&window, &me, &from, ix, blob_recycler) } Err(_) => { warn!("deserialize crdt packet failed"); @@ -807,6 +819,7 @@ mod tests { use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; + use std; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1092,18 +1105,57 @@ mod tests { "127.0.0.1:1238".parse().unwrap(), ); let recycler = BlobRecycler::default(); - let rv = Crdt::run_window_request(&window, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; window.write().unwrap()[0] = Some(out); - let rv = Crdt::run_window_request(&window, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; - let rv = Crdt::run_window_request(&window, &me, len, &recycler); + let rv = Crdt::run_window_request(&window, &me, &me, len, &recycler); + assert!(rv.is_none()); + } + + /// test window requests respond with the right blob, and do not overrun + #[test] + fn run_window_request_with_backoff() { + let window = default_window(); + let mut me = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + + me.current_leader_id = me.id; + let recycler = BlobRecycler::default(); + let num_requests: u32 = 64; + + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); assert!(rv.is_none()); + let out = recycler.allocate(); + out.write().unwrap().meta.size = 200; + window.write().unwrap()[0] = Some(out); + let range: std::ops::Range = 0..num_requests; + + for i in range { + let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + + if i != 0 && !(i.is_power_of_two()) { + assert!(rv.is_none()); + continue; + } + + assert!(rv.is_some()); + let v = rv.unwrap(); + //test we copied the blob + assert_eq!(v.read().unwrap().meta.size, 200); + } } } diff --git a/src/packet.rs b/src/packet.rs index b98d6e0da3f579..5dfde8c807c6c7 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -29,6 +29,7 @@ pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; #[repr(C)] pub struct Meta { pub size: usize, + pub num_retransmits: u64, pub addr: [u16; 8], pub port: u16, pub v6: bool, From 84336deb1df0e40b7a4f6bd4fd0321d7506559db Mon Sep 17 00:00:00 2001 From: "OEM Configuration (temporary user)" Date: Tue, 19 Jun 2018 00:08:27 -0700 Subject: [PATCH 2/3] fix linting errors, add retransmission fix to repair requests --- src/crdt.rs | 48 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 98180847a9357e..d6dbee9f29a7f8 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -602,12 +602,18 @@ impl Crdt { if blob_ix == ix { let num_retransmits = wblob.meta.num_retransmits; wblob.meta.num_retransmits += 1; - - if me.current_leader_id == me.id && - num_retransmits != 0 && - !num_retransmits.is_power_of_two() + // Setting the sender id to the requester id + // prevents the requester from retransmitting this response + // to other peers + let mut sender_id = from.id; + + // Allow retransmission of this response if the node + // is the leader and the number of repair requests equals + // a power of two + if me.current_leader_id == me.id + && (num_retransmits == 0 || num_retransmits.is_power_of_two()) { - return None; + sender_id = me.id } let out = blob_recycler.allocate(); @@ -619,7 +625,7 @@ impl Crdt { outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); outblob.meta.set_addr(&from.repair_addr); - outblob.set_id(me.id).expect("blob set_id"); + outblob.set_id(sender_id).expect("blob set_id"); } return Some(out); @@ -1124,6 +1130,7 @@ mod tests { #[test] fn run_window_request_with_backoff() { let window = default_window(); + let mut me = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), @@ -1133,11 +1140,21 @@ mod tests { "127.0.0.1:1238".parse().unwrap(), ); + let mock_peer = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + me.current_leader_id = me.id; let recycler = BlobRecycler::default(); let num_requests: u32 = 64; - let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + // Simulate handling a repair request from mock_peer + let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler); assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; @@ -1145,17 +1162,18 @@ mod tests { let range: std::ops::Range = 0..num_requests; for i in range { - let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler); + assert!(rv.is_some()); + let v = rv.unwrap(); + let blob = v.read().unwrap(); + // Test we copied the blob + assert_eq!(blob.meta.size, 200); if i != 0 && !(i.is_power_of_two()) { - assert!(rv.is_none()); - continue; + assert_eq!(blob.get_id().unwrap(), mock_peer.id); + } else { + assert_eq!(blob.get_id().unwrap(), me.id); } - - assert!(rv.is_some()); - let v = rv.unwrap(); - //test we copied the blob - assert_eq!(v.read().unwrap().meta.size, 200); } } } From 7f8e44f51850d8e533d4ded05a48c741e73de413 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 19 Jun 2018 11:40:29 -0700 Subject: [PATCH 3/3] Cleanup test --- src/crdt.rs | 51 ++++++++++++++++++--------------------------------- 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index d6dbee9f29a7f8..5d713e55d83645 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -825,7 +825,6 @@ mod tests { use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; - use std; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1131,49 +1130,35 @@ mod tests { fn run_window_request_with_backoff() { let window = default_window(); - let mut me = ReplicatedData::new( - KeyPair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), - ); + let mut me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + me.current_leader_id = me.id; - let mock_peer = ReplicatedData::new( - KeyPair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - "127.0.0.1:1238".parse().unwrap(), - ); + let mock_peer = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - me.current_leader_id = me.id; let recycler = BlobRecycler::default(); - let num_requests: u32 = 64; // Simulate handling a repair request from mock_peer let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler); assert!(rv.is_none()); - let out = recycler.allocate(); - out.write().unwrap().meta.size = 200; - window.write().unwrap()[0] = Some(out); - let range: std::ops::Range = 0..num_requests; + let blob = recycler.allocate(); + let blob_size = 200; + blob.write().unwrap().meta.size = blob_size; + window.write().unwrap()[0] = Some(blob); - for i in range { - let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler); - assert!(rv.is_some()); - let v = rv.unwrap(); - let blob = v.read().unwrap(); + let num_requests: u32 = 64; + for i in 0..num_requests { + let shared_blob = + Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler).unwrap(); + let blob = shared_blob.read().unwrap(); // Test we copied the blob - assert_eq!(blob.meta.size, 200); + assert_eq!(blob.meta.size, blob_size); - if i != 0 && !(i.is_power_of_two()) { - assert_eq!(blob.get_id().unwrap(), mock_peer.id); + let id = if i == 0 || i.is_power_of_two() { + me.id } else { - assert_eq!(blob.get_id().unwrap(), me.id); - } + mock_peer.id + }; + assert_eq!(blob.get_id().unwrap(), id); } } }