From b48a8c0555a66e35714a01dc027a23f258155c8e Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Mon, 25 Jun 2018 13:29:53 -0700 Subject: [PATCH] Chunk blobs into window size to avoid window overrun Fixes #447 --- src/streamer.rs | 99 +++++++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 45 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 9c4e6cb0eb962a..abd25ff4d6a920 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -446,63 +446,72 @@ fn broadcast( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } - let mut blobs: Vec<_> = dq.into_iter().collect(); + + // flatten deque to vec + let blobs_vec: Vec<_> = dq.into_iter().collect(); + + // We could receive more blobs than window slots so + // break them up into window-sized chunks to process + let blobs_chunked: Vec<_> = blobs_vec + .chunks(WINDOW_SIZE) + .map(|x| x.to_vec()) + .collect(); print_window(window, *receive_index as usize); - // Insert the coding blobs into the blob stream - #[cfg(feature = "erasure")] - erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); + for mut blobs in blobs_chunked { + // Insert the coding blobs into the blob stream + #[cfg(feature = "erasure")] + erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); - let blobs_len = blobs.len(); - info!("broadcast blobs.len: {}", blobs_len); + let blobs_len = blobs.len(); + debug!("broadcast blobs.len: {}", blobs_len); - // Index the blobs - Crdt::index_blobs(crdt, &blobs, receive_index)?; - // keep the cache of blobs that are broadcast - { - let mut win = window.write().unwrap(); - for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; - if let Some(x) = &win[pos] { - trace!( - "popped {} at {}", - x.read().unwrap().get_index().unwrap(), - pos - ); - recycler.recycle(x.clone()); + // Index the blobs + Crdt::index_blobs(crdt, &blobs, receive_index)?; + // keep the cache of blobs that are broadcast + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + for b in &blobs { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix as usize) % WINDOW_SIZE; + if let Some(x) = &win[pos] { + trace!( + "popped {} at {}", + x.read().unwrap().get_index().unwrap(), + pos + ); + recycler.recycle(x.clone()); + } + trace!("null {}", pos); + win[pos] = None; + assert!(win[pos].is_none()); + } + while let Some(b) = blobs.pop() { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix as usize) % WINDOW_SIZE; + trace!("caching {} at {}", ix, pos); + assert!(win[pos].is_none()); + win[pos] = Some(b); } - trace!("null {}", pos); - win[pos] = None; - assert!(win[pos].is_none()); - } - while let Some(b) = blobs.pop() { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; - trace!("caching {} at {}", ix, pos); - assert!(win[pos].is_none()); - win[pos] = Some(b); } - } - // Fill in the coding blob data from the window data blobs - #[cfg(feature = "erasure")] - { - if erasure::generate_coding( - &mut window.write().unwrap(), - *receive_index as usize, - blobs_len, - ).is_err() + // Fill in the coding blob data from the window data blobs + #[cfg(feature = "erasure")] { - return Err(Error::GenericError); + erasure::generate_coding( + &mut window.write().unwrap(), + *receive_index as usize, + blobs_len, + ).map_err(|_| Error::GenericError)?; } - } - *receive_index += blobs_len as u64; + *receive_index += blobs_len as u64; - // Send blobs out from the window - Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + // Send blobs out from the window + Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + } Ok(()) }