diff --git a/src/streamer.rs b/src/streamer.rs index 072fc39d903a0d..ff77b8954792c3 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -445,63 +445,70 @@ fn broadcast( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq); } - let mut blobs: Vec<_> = dq.into_iter().collect(); + let blobs_vec: Vec<_> = dq.into_iter().collect(); + let blobs_chunked: Vec<_> = blobs_vec + .chunks(window.read().unwrap().len()) + .map(|x| x.to_vec()) + .collect(); print_window(window, *receive_index as usize); - // Insert the coding blobs into the blob stream - #[cfg(feature = "erasure")] - erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); + for mut blobs in blobs_chunked { + // Insert the coding blobs into the blob stream + #[cfg(feature = "erasure")] + erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); - let blobs_len = blobs.len(); - info!("broadcast blobs.len: {}", blobs_len); + let blobs_len = blobs.len(); + debug!("broadcast blobs.len: {}", blobs_len); - // Index the blobs - Crdt::index_blobs(crdt, &blobs, receive_index)?; - // keep the cache of blobs that are broadcast - { - let mut win = window.write().unwrap(); - for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; - if let Some(x) = &win[pos] { - trace!( - "popped {} at {}", - x.read().unwrap().get_index().unwrap(), - pos - ); - recycler.recycle(x.clone()); + // Index the blobs + Crdt::index_blobs(crdt, &blobs, receive_index)?; + // keep the cache of blobs that are broadcast + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + for b in &blobs { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix as usize) % WINDOW_SIZE; + if let Some(x) = &win[pos] { + trace!( + "popped {} at {}", + x.read().unwrap().get_index().unwrap(), + pos + ); + recycler.recycle(x.clone()); + } + trace!("null {}", pos); + win[pos] = None; + assert!(win[pos].is_none()); + } + while let Some(b) = blobs.pop() { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix as usize) % WINDOW_SIZE; + trace!("caching {} at {}", ix, pos); + assert!(win[pos].is_none()); + win[pos] = Some(b); } - trace!("null {}", pos); - win[pos] = None; - assert!(win[pos].is_none()); - } - while let Some(b) = blobs.pop() { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; - trace!("caching {} at {}", ix, pos); - assert!(win[pos].is_none()); - win[pos] = Some(b); } - } - // Fill in the coding blob data from the window data blobs - #[cfg(feature = "erasure")] - { - if erasure::generate_coding( - &mut window.write().unwrap(), - *receive_index as usize, - blobs_len, - ).is_err() + // Fill in the coding blob data from the window data blobs + #[cfg(feature = "erasure")] { - return Err(Error::GenericError); + if erasure::generate_coding( + &mut window.write().unwrap(), + *receive_index as usize, + blobs_len, + ).is_err() + { + return Err(Error::GenericError); + } } - } - *receive_index += blobs_len as u64; + *receive_index += blobs_len as u64; - // Send blobs out from the window - Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + // Send blobs out from the window + Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; + } Ok(()) }