This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Erasure fixes and add erasure to ci #279

Merged: 28 commits on Jun 5, 2018
Commits (28), showing changes from all commits
806676c
Fix erasure compilation
sakridge May 25, 2018
90df367
Enable logging for client demo
sakridge May 25, 2018
dcb66de
Add erasure ci script
sakridge May 25, 2018
198b566
indexing blobs then coding
sakridge May 25, 2018
287f067
generate coding after indexing
sakridge May 26, 2018
d04fe54
Erasure refinements, fix generating orders table
sakridge May 28, 2018
84e1ad1
Add erasure build to ci
sakridge May 29, 2018
fe0338c
Debug erasure ci script
sakridge May 29, 2018
41b369e
Review comments
sakridge May 30, 2018
2a737ab
Fix gf-complete url and symlinks
sakridge May 30, 2018
d596fc1
Add window recovery
sakridge May 30, 2018
140613c
Revert log levels
sakridge May 30, 2018
36b920d
Fixes for erasure coding
sakridge May 31, 2018
5a6e3f1
Handle set_flags error
sakridge May 31, 2018
24a46f7
Fix deadlock and only push to contq if it's not a coding blob
sakridge May 31, 2018
fd2f077
Generate coding for the current blob set not just the first coding set
sakridge Jun 1, 2018
2a4c32e
Restore more of the blob window and add is_coding helper
sakridge Jun 1, 2018
d17a70d
Store another size in the data block so it is coded as well
sakridge Jun 4, 2018
70fdf37
Add receive_index for broadcast blobs and fix blobs_len position
sakridge Jun 4, 2018
76709fd
Move receive_index to correct place
sakridge Jun 4, 2018
3f90dba
Rework to fix coding blob insertion
sakridge Jun 5, 2018
138e52b
Receive fixes
sakridge Jun 5, 2018
90be7e0
Fixes for receiving old blobs and nulling the window with coding
sakridge Jun 5, 2018
efbb387
cargo fmt
sakridge Jun 5, 2018
3b45428
Fix non-erasure blob nulling
sakridge Jun 5, 2018
947a60d
Fix shellcheck
sakridge Jun 5, 2018
f1a3042
More shellcheck
sakridge Jun 5, 2018
57c1171
Shellcheck again
sakridge Jun 5, 2018
2 changes: 2 additions & 0 deletions ci/buildkite.yml
@@ -11,6 +11,8 @@ steps:
     name: "cuda"
   - command: "ci/shellcheck.sh"
     name: "shellcheck [public]"
+  - command: "ci/test-erasure.sh"
+    name: "erasure"
   - wait
   - command: "ci/publish.sh"
     name: "publish release artifacts"
29 changes: 29 additions & 0 deletions ci/test-erasure.sh
@@ -0,0 +1,29 @@
#!/bin/bash -e

set -o xtrace

cd "$(dirname "$0")/.."

if [[ -z "${libgf_complete_URL:-}" ]]; then
echo libgf_complete_URL undefined
exit 1
fi

if [[ -z "${libJerasure_URL:-}" ]]; then
echo libJerasure_URL undefined
exit 1
fi

curl -X GET -o libJerasure.so "$libJerasure_URL"
curl -X GET -o libgf_complete.so "$libgf_complete_URL"

ln -s libJerasure.so libJerasure.so.2
ln -s libJerasure.so libJerasure.so.2.0.0
ln -s libgf_complete.so libgf_complete.so.1.0.0
export LD_LIBRARY_PATH=$PWD:$LD_LIBRARY_PATH

# shellcheck disable=SC1090 # <-- shellcheck can't follow ~
source ~/.cargo/env
cargo test --features="erasure"

exit 0
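
The script's final step, cargo test --features="erasure", only compiles and runs code behind the erasure feature gate. A minimal sketch of the kind of feature-gated test module such a run picks up follows; the module and test names are made up, and the repo's real erasure tests differ.

// Only compiled and run under `cargo test --features="erasure"`.
#[cfg(all(test, feature = "erasure"))]
mod erasure_feature_sketch {
    #[test]
    fn coding_roundtrip_placeholder() {
        // stand-in for an encode/decode round trip over a blob set
        let data = vec![1u8, 2, 3, 4];
        let parity: u8 = data.iter().fold(0, |acc, b| acc ^ *b);
        assert_eq!(parity, 4);
    }
}
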
2 changes: 2 additions & 0 deletions src/bin/client-demo.rs
@@ -1,3 +1,4 @@
+extern crate env_logger;
 extern crate getopts;
 extern crate isatty;
 extern crate pnet;
@@ -49,6 +50,7 @@ fn get_ip_addr() -> Option<IpAddr> {
 }
 
 fn main() {
+    env_logger::init().unwrap();
     let mut threads = 4usize;
     let mut num_nodes = 1usize;
 
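The two added lines wire env_logger into the demo binary so that RUST_LOG controls its output; the 2018-era env_logger::init() returned a Result, hence the unwrap(). A standalone sketch of the same pattern, assuming that era's env_logger and log crates:

extern crate env_logger;
#[macro_use]
extern crate log;

fn main() {
    // env_logger 0.4-era init() returns a Result; newer versions expose try_init() instead.
    env_logger::init().unwrap();
    // Run with e.g. RUST_LOG=debug to see this output.
    debug!("logging enabled for the client demo");
}
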
66 changes: 47 additions & 19 deletions src/crdt.rs
@@ -229,14 +229,38 @@ impl Crdt {
         }
     }
 
+    pub fn index_blobs(
+        obj: &Arc<RwLock<Self>>,
+        blobs: &Vec<SharedBlob>,
+        receive_index: &mut u64,
+    ) -> Result<()> {
+        let me: ReplicatedData = {
+            let robj = obj.read().expect("'obj' read lock in crdt::index_blobs");
+            debug!("broadcast table {}", robj.table.len());
+            robj.table[&robj.me].clone()
+        };
+
+        // enumerate all the blobs, those are the indices
+        for (i, b) in blobs.iter().enumerate() {
+            // only leader should be broadcasting
+            let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
+            blob.set_id(me.id).expect("set_id in pub fn broadcast");
+            blob.set_index(*receive_index + i as u64)
+                .expect("set_index in pub fn broadcast");
+        }
+
+        Ok(())
+    }
+
     /// broadcast messages from the leader to layer 1 nodes
     /// # Remarks
     /// We need to avoid having obj locked while doing any io, such as the `send_to`
     pub fn broadcast(
         obj: &Arc<RwLock<Self>>,
-        blobs: &Vec<SharedBlob>,
+        window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
         s: &UdpSocket,
         transmit_index: &mut u64,
+        received_index: u64,
     ) -> Result<()> {
         let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
             // copy to avoid locking during IO
@@ -266,31 +290,35 @@
             return Err(Error::CrdtTooSmall);
         }
         trace!("nodes table {}", nodes.len());
-        trace!("blobs table {}", blobs.len());
-        // enumerate all the blobs, those are the indices
+
+        // enumerate all the blobs in the window, those are the indices
         // transmit them to nodes, starting from a different node
-        let orders: Vec<_> = blobs
-            .iter()
-            .enumerate()
-            .zip(
-                nodes
-                    .iter()
-                    .cycle()
-                    .skip((*transmit_index as usize) % nodes.len()),
-            )
-            .collect();
+        let mut orders = Vec::new();
+        let window_l = window.write().unwrap();
+        for i in *transmit_index..received_index {
+            let is = i as usize;
+            let k = is % window_l.len();
+            assert!(window_l[k].is_some());
+
+            orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
+        }
 
         trace!("orders table {}", orders.len());
         let errs: Vec<_> = orders
             .into_iter()
-            .map(|((i, b), v)| {
+            .map(|(b, v)| {
                 // only leader should be broadcasting
                 assert!(me.current_leader_id != v.id);
-                let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
-                blob.set_id(me.id).expect("set_id in pub fn broadcast");
-                blob.set_index(*transmit_index + i as u64)
-                    .expect("set_index in pub fn broadcast");
+                let bl = b.unwrap();
+                let blob = bl.read().expect("blob read lock in streamer::broadcast");
                 //TODO profile this, may need multiple sockets for par_iter
-                trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
+                trace!(
+                    "broadcast idx: {} sz: {} to {} coding: {}",
+                    blob.get_index().unwrap(),
+                    blob.meta.size,
+                    v.replicate_addr,
+                    blob.is_coding()
+                );
+                assert!(blob.meta.size < BLOB_SIZE);
                 let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
                 trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
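
The rewritten loop above is the core of the broadcast change: instead of zipping the incoming blob batch against a rotating node iterator, it walks window slots from transmit_index up to received_index and pairs each filled slot with a node round-robin, which is also how coding blobs stored in the window get broadcast (hence the is_coding() field in the new trace). Below is a standalone sketch of that ordering logic using placeholder types (String for a blob, usize for a node id) rather than the repo's SharedBlob and ReplicatedData.

fn build_orders(
    window: &[Option<String>],
    nodes: &[usize],
    transmit_index: u64,
    received_index: u64,
) -> Vec<(String, usize)> {
    let mut orders = Vec::new();
    for i in transmit_index..received_index {
        let is = i as usize;
        // window slots are reused modulo the window length
        let k = is % window.len();
        let blob = window[k]
            .clone()
            .expect("window slot must be filled before broadcast");
        // nodes are assigned round-robin, offset by the blob index
        orders.push((blob, nodes[is % nodes.len()]));
    }
    orders
}

fn main() {
    let window: Vec<Option<String>> = (0..4).map(|i| Some(format!("blob{}", i))).collect();
    let nodes = vec![10, 20, 30];
    // blobs at window slots 1, 2, 3 go to nodes 20, 30, 10 in turn
    for (blob, node) in build_orders(&window, &nodes, 1, 4) {
        println!("{} -> node {}", blob, node);
    }
}
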