Skip to content

Commit

Permalink
Debug broadcast (#2208)
Browse files Browse the repository at this point in the history
* Add per cf rocksdb options, increase compaction and flush threads

* Change broadcast stage to bulk write blobs

* Add db_ledger function specifically for broadcast

* Fix broken tests

* Fix benches
  • Loading branch information
carllin authored Dec 20, 2018
1 parent 2fe3402 commit 666af1e
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 108 deletions.
38 changes: 18 additions & 20 deletions benches/db_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,17 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
blobs.shuffle(&mut thread_rng());
let slot = 0;
let mut shared_blobs = entries.to_blobs();
shared_blobs.shuffle(&mut thread_rng());

bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let key = DataCf::key(slot, index);
db_ledger.insert_data_blob(&key, blob).unwrap();
blob.set_index(index + num_entries as u64).unwrap();
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
}
});

Expand All @@ -181,18 +180,17 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
blobs.shuffle(&mut thread_rng());
let slot = 0;
let mut shared_blobs = entries.to_blobs();
shared_blobs.shuffle(&mut thread_rng());

bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index().unwrap();
let key = DataCf::key(slot, index);
db_ledger.insert_data_blob(&key, blob).unwrap();
blob.set_index(index + num_entries as u64).unwrap();
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index().unwrap();
db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write()
.unwrap()
.set_index(index + num_entries as u64)
.unwrap();
}
});

Expand Down
10 changes: 7 additions & 3 deletions src/broadcast_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ fn broadcast(
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for (b, _) in &blobs {
let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
for b in &blobs {
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
if let Some(x) = win[pos].data.take() {
Expand All @@ -122,16 +123,19 @@ fn broadcast(

trace!("{} null {}", id, pos);
}
for (b, _) in &blobs {
for b in &blobs {
{
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone());
}
db_ledger.write_shared_blobs(vec![b])?;
}

db_ledger
.write_consecutive_blobs(&blobs)
.expect("Unrecoverable failure to write to database");
}

// Fill in the coding blob data from the window data blobs
Expand Down
Loading

0 comments on commit 666af1e

Please sign in to comment.