Skip to content

Commit

Permalink
Chunk blobs into window size to avoid window overrun
Browse files Browse the repository at this point in the history
Fixes #447
  • Loading branch information
sakridge committed Jun 25, 2018
1 parent 3966eb5 commit 476e6a9
Showing 1 changed file with 54 additions and 45 deletions.
99 changes: 54 additions & 45 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,63 +445,72 @@ fn broadcast(
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
let mut blobs: Vec<_> = dq.into_iter().collect();

// flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect();

// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked: Vec<_> = blobs_vec
.chunks(WINDOW_SIZE)
.map(|x| x.to_vec())
.collect();

print_window(window, *receive_index as usize);

// Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")]
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);
for mut blobs in blobs_chunked {
// Insert the coding blobs into the blob stream
#[cfg(feature = "erasure")]
erasure::add_coding_blobs(recycler, &mut blobs, *receive_index);

let blobs_len = blobs.len();
info!("broadcast blobs.len: {}", blobs_len);
let blobs_len = blobs.len();
debug!("broadcast blobs.len: {}", blobs_len);

// Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast
{
let mut win = window.write().unwrap();
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
if let Some(x) = &win[pos] {
trace!(
"popped {} at {}",
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x.clone());
// Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
if let Some(x) = &win[pos] {
trace!(
"popped {} at {}",
x.read().unwrap().get_index().unwrap(),
pos
);
recycler.recycle(x.clone());
}
trace!("null {}", pos);
win[pos] = None;
assert!(win[pos].is_none());
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
trace!("null {}", pos);
win[pos] = None;
assert!(win[pos].is_none());
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}

// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
if erasure::generate_coding(
&mut window.write().unwrap(),
*receive_index as usize,
blobs_len,
).is_err()
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
return Err(Error::GenericError);
erasure::generate_coding(
&mut window.write().unwrap(),
*receive_index as usize,
blobs_len,
).map_err(|_| Error::GenericError)?;
}
}

*receive_index += blobs_len as u64;
*receive_index += blobs_len as u64;

// Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?;
// Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?;
}
Ok(())
}

Expand Down

0 comments on commit 476e6a9

Please sign in to comment.