Skip to content

Commit

Permalink
Restore more of the blob window and add is_coding helper
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Jun 2, 2018
1 parent 5564008 commit 25ec2f3
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 67 deletions.
139 changes: 77 additions & 62 deletions src/erasure.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Support erasure coding

use packet::{BlobRecycler, SharedBlob, BLOB_FLAG_IS_CODING};
use packet::{BlobRecycler, SharedBlob};
use std::result;

//TODO(sakridge) pick these values
Expand Down Expand Up @@ -235,8 +235,7 @@ pub fn generate_coding(window: &mut Vec<Option<SharedBlob>>, consumed: usize, nu
}
let w_l = window[n].clone().unwrap();
w_l.write().unwrap().meta.size = max_data_size;
let flags = w_l.write().unwrap().get_flags().unwrap();
if w_l.write().unwrap().set_flags(flags | BLOB_FLAG_IS_CODING).is_err() {
if w_l.write().unwrap().set_coding().is_err() {
return Err(ErasureError::EncodeError);
}
coding_blobs.push(
Expand Down Expand Up @@ -271,74 +270,90 @@ pub fn recover(
re: &BlobRecycler,
window: &mut Vec<Option<SharedBlob>>,
consumed: usize,
received: usize,
) -> Result<()> {
//recover with erasure coding
let mut data_missing = 0;
let mut coded_missing = 0;
let block_start = consumed - (consumed % NUM_CODED);
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
/*info!(
"recover: block_start: {} coding_start: {} coding_end: {}",
block_start,
coding_start,
coding_end
);*/
for i in block_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
if i >= coding_start {
coded_missing += 1;
} else {
data_missing += 1;
if received <= consumed {
return Ok(());
}
let num_blocks = (received - consumed) / NUM_CODED;
let mut block_start = consumed - (consumed % NUM_CODED);

if num_blocks > 0 {
debug!("num_blocks: {} received: {} consumed: {}", num_blocks, received, consumed);
}

for i in 0..num_blocks {
if i > 100 {
break;
}
let mut data_missing = 0;
let mut coded_missing = 0;
let coding_start = block_start + NUM_DATA;
let coding_end = block_start + NUM_CODED;
trace!(
"recover: block_start: {} coding_start: {} coding_end: {}",
block_start,
coding_start,
coding_end
);
for i in block_start..coding_end {
let n = i % window.len();
if window[n].is_none() {
if i >= coding_start {
coded_missing += 1;
} else {
data_missing += 1;
}
}
}
}
if data_missing > 0 {
if (data_missing + coded_missing) <= MAX_MISSING {
if (data_missing + coded_missing) != NUM_CODED {
trace!("recovering: data: {} coding: {}", data_missing, coded_missing);
let mut blobs: Vec<SharedBlob> = Vec::new();
let mut locks = Vec::new();
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
let mut erasures: Vec<i32> = Vec::new();
for i in block_start..coding_end {
let j = i % window.len();
let mut b = &mut window[j];
if b.is_some() {
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
continue;
}
if data_missing > 0 {
if (data_missing + coded_missing) <= MAX_MISSING {
let mut blobs: Vec<SharedBlob> = Vec::new();
let mut locks = Vec::new();
let mut data_ptrs: Vec<&mut [u8]> = Vec::new();
let mut coding_ptrs: Vec<&[u8]> = Vec::new();
let mut erasures: Vec<i32> = Vec::new();
for i in block_start..coding_end {
let j = i % window.len();
let mut b = &mut window[j];
if b.is_some() {
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
continue;
}
let n = re.allocate();
*b = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push((i - block_start) as i32);
}
let n = re.allocate();
*b = Some(n.clone());
//mark the missing memory
blobs.push(n);
erasures.push(i as i32);
}
erasures.push(-1);
trace!("erasures: {:?}", erasures);
//lock everything
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pb fn recover"));
}
for (i, l) in locks.iter_mut().enumerate() {
if i >= NUM_DATA {
trace!("pushing coding: {}", i);
coding_ptrs.push(&l.data);
} else {
trace!("pushing data: {}", i);
data_ptrs.push(&mut l.data);
erasures.push(-1);
trace!("erasures: {:?}", erasures);
//lock everything
for b in &blobs {
locks.push(b.write().expect("'locks' arr in pb fn recover"));
}
for (i, l) in locks.iter_mut().enumerate() {
if i >= NUM_DATA {
trace!("pushing coding: {}", i);
coding_ptrs.push(&l.data);
} else {
trace!("pushing data: {}", i);
data_ptrs.push(&mut l.data);
}
}
trace!(
"coding_ptrs.len: {} data_ptrs.len {}",
coding_ptrs.len(),
data_ptrs.len()
);
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
}
trace!(
"coding_ptrs.len: {} data_ptrs.len {}",
coding_ptrs.len(),
data_ptrs.len()
);
decode_blocks(data_ptrs.as_mut_slice(), &coding_ptrs, &erasures)?;
} else {
return Err(ErasureError::NotEnoughBlocksToDecode);
}
block_start += NUM_CODED;
}
Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,15 @@ impl Blob {
Ok(())
}

pub fn is_coding(&self) -> bool {
return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0;
}

pub fn set_coding(&mut self) -> Result<()> {
let flags = self.get_flags().unwrap();
self.set_flags(flags | BLOB_FLAG_IS_CODING)
}

pub fn data(&self) -> &[u8] {
&self.data[BLOB_FLAGS_END..]
}
Expand Down
18 changes: 13 additions & 5 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crdt::Crdt;
#[cfg(feature = "erasure")]
use erasure;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE, BLOB_FLAG_IS_CODING};
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE};
use result::{Error, Result};
use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket};
Expand Down Expand Up @@ -178,11 +178,10 @@ fn repair_window(
) -> Result<()> {
#[cfg(feature = "erasure")]
{
if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed).is_err() {
if erasure::recover(_recycler, &mut locked_window.write().unwrap(), *consumed, *received).is_err() {
trace!("erasure::recover failed");
}
}
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
//exponential backoff
if *last != *consumed {
*times = 0;
Expand All @@ -194,6 +193,7 @@ fn repair_window(
trace!("repair_window counter {} {}", *times, *consumed);
return Ok(());
}
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs {
//todo cache socket
Expand Down Expand Up @@ -293,7 +293,7 @@ fn recv_window(
}
let mut is_coding = false;
if let &Some(ref cblob) = &window[k] {
if (cblob.read().expect("blob read lock for flags streamer::window").get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0 {
if cblob.read().expect("blob read lock for flags streamer::window").is_coding() {
is_coding = true;
}
}
Expand Down Expand Up @@ -330,7 +330,15 @@ fn print_window(
} else if v.is_none() {
"0"
} else {
"1"
if let &Some(ref cblob) = &v {
if cblob.read().unwrap().is_coding() {
"C"
} else {
"1"
}
} else {
"0"
}
}
})
.collect();
Expand Down

0 comments on commit 25ec2f3

Please sign in to comment.