Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug broadcast #2208

Merged
merged 12 commits on Dec 20, 2018
38 changes: 18 additions & 20 deletions benches/db_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,17 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
blobs.shuffle(&mut thread_rng());
let slot = 0;
let mut shared_blobs = entries.to_blobs();
shared_blobs.shuffle(&mut thread_rng());

bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let key = DataCf::key(slot, index);
db_ledger.insert_data_blob(&key, blob).unwrap();
blob.set_index(index + num_entries as u64).unwrap();
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
}
});

Expand All @@ -181,18 +180,17 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
blobs.shuffle(&mut thread_rng());
let slot = 0;
let mut shared_blobs = entries.to_blobs();
shared_blobs.shuffle(&mut thread_rng());

bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let key = DataCf::key(slot, index);
db_ledger.insert_data_blob(&key, blob).unwrap();
blob.set_index(index + num_entries as u64).unwrap();
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
}
});

Expand Down
10 changes: 7 additions & 3 deletions src/broadcast_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ fn broadcast(
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for (b, _) in &blobs {
let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
for b in &blobs {
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
if let Some(x) = win[pos].data.take() {
Expand All @@ -122,16 +123,19 @@ fn broadcast(

trace!("{} null {}", id, pos);
}
for (b, _) in &blobs {
for b in &blobs {
{
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone());
}
db_ledger.write_shared_blobs(vec![b])?;
}

db_ledger
.write_consecutive_blobs(&blobs)
.expect("Unrecoverable failure to write to database");
}

// Fill in the coding blob data from the window data blobs
Expand Down
Loading