Skip to content

Commit

Permalink
Refactor validator windowing
Browse files Browse the repository at this point in the history
- a unit test for windowing functions
- issue #857
  • Loading branch information
pgarg66 committed Aug 6, 2018
1 parent 91741e2 commit a9ecf9d
Showing 1 changed file with 58 additions and 15 deletions.
73 changes: 58 additions & 15 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,39 @@ fn process_blob(
}
}

fn validate_blob_against_window(
debug_id: u64,
pix: u64,
consumed: u64,
received: &mut u64,
) -> bool {
// Prevent receive window from running over
if pix >= consumed + WINDOW_SIZE {
debug!(
"{:x}: received: {} will overrun window: {} skipping..",
debug_id,
pix,
consumed + WINDOW_SIZE
);
return false;
}

// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
debug!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, consumed
);
return false;
}

if pix > *received {
*received = pix;
}
true
}

fn recv_window(
debug_id: u64,
window: &SharedWindow,
Expand Down Expand Up @@ -489,21 +522,8 @@ fn recv_window(
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()?, p.meta.size)
};
// Prevent receive window from running over
if pix >= *consumed + WINDOW_SIZE {
recycler.recycle(b);
continue;
}
if pix > *received {
*received = pix;
}
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < *consumed {
debug!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, *consumed
);

if validate_blob_against_window(debug_id, pix, *consumed, received) == false {
recycler.recycle(b);
continue;
}
Expand Down Expand Up @@ -930,6 +950,7 @@ mod test {
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::calculate_highest_lost_blob_index;
use streamer::validate_blob_against_window;
use streamer::{blob_receiver, receiver, responder, window};
use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};

Expand Down Expand Up @@ -1107,4 +1128,26 @@ mod test {
WINDOW_SIZE + 9
);
}

#[test]
pub fn validate_blob_against_window_test() {
let mut recvd: u64 = 100;
assert!(!validate_blob_against_window(
0,
90 + WINDOW_SIZE,
90,
&mut recvd
));
assert!(!validate_blob_against_window(
0,
91 + WINDOW_SIZE,
90,
&mut recvd
));
assert!(!validate_blob_against_window(0, 89, 90, &mut recvd));
assert!(validate_blob_against_window(0, 91, 90, &mut recvd));
assert_eq!(recvd, 100);
assert!(validate_blob_against_window(0, 101, 90, &mut recvd));
assert_eq!(recvd, 101);
}
}

0 comments on commit a9ecf9d

Please sign in to comment.