diff --git a/src/erasure.rs b/src/erasure.rs index 7c74c72c4a7a48..6837df14fa7561 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,6 +1,6 @@ // Support erasure coding -use packet::{BlobRecycler, SharedBlob, BLOB_FLAG_IS_CODING}; +use packet::{BlobRecycler, SharedBlob}; use std::result; //TODO(sakridge) pick these values @@ -235,8 +235,7 @@ pub fn generate_coding(window: &mut Vec>, consumed: usize, nu } let w_l = window[n].clone().unwrap(); w_l.write().unwrap().meta.size = max_data_size; - let flags = w_l.write().unwrap().get_flags().unwrap(); - if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() { + if w_l.write().unwrap().set_coding().is_err() { return Err(ErasureError::EncodeError); } coding_blobs.push( @@ -271,74 +270,90 @@ pub fn recover( re: &BlobRecycler, window: &mut Vec>, consumed: usize, + received: usize, ) -> Result<()> { //recover with erasure coding - let mut data_missing = 0; - let mut coded_missing = 0; - let block_start = consumed - (consumed % NUM_CODED); - let coding_start = block_start + NUM_DATA; - let coding_end = block_start + NUM_CODED; - /*info!( - "recover: block_start: {} coding_start: {} coding_end: {}", - block_start, - coding_start, - coding_end - );*/ - for i in block_start..coding_end { - let n = i % window.len(); - if window[n].is_none() { - if i >= coding_start { - coded_missing += 1; - } else { - data_missing += 1; + if received <= consumed { + return Ok(()); + } + let num_blocks = (received - consumed) / NUM_CODED; + let mut block_start = consumed - (consumed % NUM_CODED); + + if num_blocks > 0 { + debug!("num_blocks: {} received: {} consumed: {}", num_blocks, received, consumed); + } + + for i in 0..num_blocks { + if i > 100 { + break; + } + let mut data_missing = 0; + let mut coded_missing = 0; + let coding_start = block_start + NUM_DATA; + let coding_end = block_start + NUM_CODED; + trace!( + "recover: block_start: {} coding_start: {} coding_end: {}", + block_start, + coding_start, + coding_end + ); + for i in block_start..coding_end { + let n = i % window.len(); + if window[n].is_none() { + if i >= coding_start { + coded_missing += 1; + } else { + data_missing += 1; + } } } - } - if data_missing > 0 { - if (data_missing + coded_missing) <= MAX_MISSING { + if (data_missing + coded_missing) != NUM_CODED { trace!("recovering: data: {} coding: {}", data_missing, coded_missing); - let mut blobs: Vec = Vec::new(); - let mut locks = Vec::new(); - let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); - let mut coding_ptrs: Vec<&[u8]> = Vec::new(); - let mut erasures: Vec = Vec::new(); - for i in block_start..coding_end { - let j = i % window.len(); - let mut b = &mut window[j]; - if b.is_some() { - blobs.push(b.clone().expect("'blobs' arr in pb fn recover")); - continue; + } + if data_missing > 0 { + if (data_missing + coded_missing) <= MAX_MISSING { + let mut blobs: Vec = Vec::new(); + let mut locks = Vec::new(); + let mut data_ptrs: Vec<&mut [u8]> = Vec::new(); + let mut coding_ptrs: Vec<&[u8]> = Vec::new(); + let mut erasures: Vec = Vec::new(); + for i in block_start..coding_end { + let j = i % window.len(); + let mut b = &mut window[j]; + if b.is_some() { + blobs.push(b.clone().expect("'blobs' arr in pb fn recover")); + continue; + } + let n = re.allocate(); + *b = Some(n.clone()); + //mark the missing memory + blobs.push(n); + erasures.push((i - block_start) as i32); } - let n = re.allocate(); - *b = Some(n.clone()); - //mark the missing memory - blobs.push(n); - erasures.push(i as i32); - } - erasures.push(-1); - trace!("erasures: {:?}", erasures); - //lock everything - for b in &blobs { - locks.push(b.write().expect("'locks' arr in pb fn recover")); - } - for (i, l) in locks.iter_mut().enumerate() { - if i >= NUM_DATA { - trace!("pushing coding: {}", i); - coding_ptrs.push(&l.data); - } else { - trace!("pushing data: {}", i); - data_ptrs.push(&mut l.data); + erasures.push(-1); + trace!("erasures: {:?}", erasures); + //lock everything + for b in &blobs { + locks.push(b.write().expect("'locks' arr in pb fn recover")); } + for (i, l) in locks.iter_mut().enumerate() { + if i >= NUM_DATA { + trace!("pushing coding: {}", i); + coding_ptrs.push(&l.data); + } else { + trace!("pushing data: {}", i); + data_ptrs.push(&mut l.data); + } + } + trace!( + "coding_ptrs.len: {} data_ptrs.len {}", + coding_ptrs.len(), + data_ptrs.len() + ); + decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?; } - trace!( - "coding_ptrs.len: {} data_ptrs.len {}", - coding_ptrs.len(), - data_ptrs.len() - ); - decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?; - } else { - return Err(ErasureError::NotEnoughBlocksToDecode); } + block_start += NUM_CODED; } Ok(()) } @@ -446,6 +461,7 @@ mod test { "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), ); let crdt = Arc::new(RwLock::new(crdt::Crdt::new(d.clone()))); @@ -481,7 +497,7 @@ mod test { window[erase_offset] = None; // Recover it from coding - assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); println!("** after-recover:"); print_window(&window); @@ -522,7 +538,7 @@ mod test { window_l0.write().unwrap().data[0] = 55; println!("** after-nulling:"); print_window(&window); - assert!(erasure::recover(&blob_recycler, &mut window, offset).is_ok()); + assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + num_blobs).is_ok()); println!("** after-restore:"); print_window(&window); let window_l = window[offset + 1].clone().unwrap(); diff --git a/src/packet.rs b/src/packet.rs index d7d6b0020fcb4b..3df9f3c352b16e 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -313,6 +313,15 @@ impl Blob { Ok(()) } + pub fn is_coding(&self) -> bool { + return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0; + } + + pub fn set_coding(&mut self) -> Result<()> { + let flags = self.get_flags().unwrap(); + self.set_flags(flags | BLOB_FLAG_IS_CODING) + } + pub fn data(&self) -> &[u8] { &self.data[BLOB_FLAGS_END..] } diff --git a/src/streamer.rs b/src/streamer.rs index 6fa1b74c31138d..6cb6abbcb74809 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -3,7 +3,7 @@ use crdt::Crdt; #[cfg(feature = "erasure")] use erasure; -use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE, BLOB_FLAG_IS_CODING}; +use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; use result::{Error, Result}; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; @@ -178,11 +178,10 @@ fn repair_window( ) -> Result<()> { #[cfg(feature = "erasure")] { - if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed).is_err() { + if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed, *received).is_err() { trace!("erasure::recover failed"); } } - let reqs = find_next_missing(locked_window, crdt, consumed, received)?; //exponential backoff if *last != *consumed { *times = 0; @@ -194,6 +193,7 @@ fn repair_window( trace!("repair_window counter {} {}", *times, *consumed); return Ok(()); } + let reqs = find_next_missing(locked_window, crdt, consumed, received)?; let sock = UdpSocket::bind("0.0.0.0:0")?; for (to, req) in reqs { //todo cache socket @@ -293,7 +293,7 @@ fn recv_window( } let mut is_coding = false; if let &Some(ref cblob) = &window[k] { - if (cblob.read().expect("blob read lock for flags streamer::window").get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 { + if cblob.read().expect("blob read lock for flags streamer::window").is_coding() { is_coding = true; } } @@ -330,7 +330,15 @@ fn print_window( } else if v.is_none() { "0" } else { - "1" + if let &Some(ref cblob) = &v { + if cblob.read().unwrap().is_coding() { + "C" + } else { + "1" + } + } else { + "0" + } } }) .collect();