diff --git a/src/streamer.rs b/src/streamer.rs index 3cb8026abc6473..842b91943811c2 100755 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -442,6 +442,39 @@ fn process_blob( } } +fn validate_blob_against_window( + debug_id: u64, + pix: u64, + consumed: u64, + received: &mut u64, +) -> bool { + // Prevent receive window from running over + if pix >= consumed + WINDOW_SIZE { + debug!( + "{:x}: received: {} will overrun window: {} skipping..", + debug_id, + pix, + consumed + WINDOW_SIZE + ); + return false; + } + + // Got a blob which has already been consumed, skip it + // probably from a repair window request + if pix < consumed { + debug!( + "{:x}: received: {} but older than consumed: {} skipping..", + debug_id, pix, consumed + ); + return false; + } + + if pix > *received { + *received = pix; + } + true +} + fn recv_window( debug_id: u64, window: &SharedWindow, @@ -489,21 +522,8 @@ fn recv_window( let p = b.write().expect("'b' write lock in fn recv_window"); (p.get_index()?, p.meta.size) }; - // Prevent receive window from running over - if pix >= *consumed + WINDOW_SIZE { - recycler.recycle(b); - continue; - } - if pix > *received { - *received = pix; - } - // Got a blob which has already been consumed, skip it - // probably from a repair window request - if pix < *consumed { - debug!( - "{:x}: received: {} but older than consumed: {} skipping..", - debug_id, pix, *consumed - ); + + if validate_blob_against_window(debug_id, pix, *consumed, received) == false { recycler.recycle(b); continue; } @@ -930,6 +950,7 @@ mod test { use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer::calculate_highest_lost_blob_index; + use streamer::validate_blob_against_window; use streamer::{blob_receiver, receiver, responder, window}; use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE}; @@ -1107,4 +1128,26 @@ mod test { WINDOW_SIZE + 9 ); } + + #[test] + pub fn validate_blob_against_window_test() { + let mut recvd: u64 = 100; + assert!(!validate_blob_against_window( + 0, + 90 + WINDOW_SIZE, + 90, + &mut recvd + )); + assert!(!validate_blob_against_window( + 0, + 91 + WINDOW_SIZE, + 90, + &mut recvd + )); + assert!(!validate_blob_against_window(0, 89, 90, &mut recvd)); + assert!(validate_blob_against_window(0, 91, 90, &mut recvd)); + assert_eq!(recvd, 100); + assert!(validate_blob_against_window(0, 101, 90, &mut recvd)); + assert_eq!(recvd, 101); + } }