Skip to content

Commit

Permalink
Refactor validator windowing
Browse files Browse the repository at this point in the history
- a unit test for windowing functions
- issue #857
  • Loading branch information
pgarg66 authored and solana-grimes committed Aug 7, 2018
1 parent db2392a commit ceb5a76
Showing 1 changed file with 71 additions and 17 deletions.
88 changes: 71 additions & 17 deletions src/streamer.rs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::cmp;
use std::collections::VecDeque;
use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::result;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -442,6 +443,42 @@ fn process_blob(
}
}

#[derive(Debug, PartialEq, Eq)]
enum RecvWindowError {
WindowOverrun,
AlreadyReceived,
}

fn validate_blob_against_window(
debug_id: u64,
pix: u64,
consumed: u64,
received: u64,
) -> result::Result<u64, RecvWindowError> {
// Prevent receive window from running over
if pix >= consumed + WINDOW_SIZE {
debug!(
"{:x}: received: {} will overrun window: {} skipping..",
debug_id,
pix,
consumed + WINDOW_SIZE
);
return Err(RecvWindowError::WindowOverrun);
}

// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < consumed {
debug!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, consumed
);
return Err(RecvWindowError::AlreadyReceived);
}

Ok(cmp::max(pix, received))
}

fn recv_window(
debug_id: u64,
window: &SharedWindow,
Expand Down Expand Up @@ -489,23 +526,14 @@ fn recv_window(
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()?, p.meta.size)
};
// Prevent receive window from running over
if pix >= *consumed + WINDOW_SIZE {
recycler.recycle(b);
continue;
}
if pix > *received {
*received = pix;
}
// Got a blob which has already been consumed, skip it
// probably from a repair window request
if pix < *consumed {
debug!(
"{:x}: received: {} but older than consumed: {} skipping..",
debug_id, pix, *consumed
);
recycler.recycle(b);
continue;

let result = validate_blob_against_window(debug_id, pix, *consumed, *received);
match result {
Ok(v) => *received = v,
Err(_e) => {
recycler.recycle(b);
continue;
}
}

trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
Expand Down Expand Up @@ -930,6 +958,8 @@ mod test {
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::calculate_highest_lost_blob_index;
use streamer::validate_blob_against_window;
use streamer::RecvWindowError;
use streamer::{blob_receiver, receiver, responder, window};
use streamer::{default_window, BlobReceiver, PacketReceiver, WINDOW_SIZE};

Expand Down Expand Up @@ -1107,4 +1137,28 @@ mod test {
WINDOW_SIZE + 9
);
}

#[test]
pub fn validate_blob_against_window_test() {
assert_eq!(
validate_blob_against_window(0, 90 + WINDOW_SIZE, 90, 100).unwrap_err(),
RecvWindowError::WindowOverrun
);
assert_eq!(
validate_blob_against_window(0, 91 + WINDOW_SIZE, 90, 100).unwrap_err(),
RecvWindowError::WindowOverrun
);
assert_eq!(
validate_blob_against_window(0, 89, 90, 100).unwrap_err(),
RecvWindowError::AlreadyReceived
);
assert_eq!(
validate_blob_against_window(0, 91, 90, 100).ok().unwrap(),
100
);
assert_eq!(
validate_blob_against_window(0, 101, 90, 100).ok().unwrap(),
101
);
}
}

0 comments on commit ceb5a76

Please sign in to comment.