Skip to content

Commit

Permalink
groom broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-solana committed Mar 7, 2019
1 parent 01fb76f commit b820d4f
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 27 deletions.
2 changes: 1 addition & 1 deletion core/src/blocktree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ pub mod tests {
fn test_read_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = 0;
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);

let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
Expand Down
36 changes: 18 additions & 18 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@ impl Broadcast {
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);

let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
// TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of
// `max_tick_height`
let mut blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);

let now = Instant::now();
let mut num_entries = entries.len();
Expand Down Expand Up @@ -90,19 +83,20 @@ impl Broadcast {
})
.collect();

index_blobs(&blobs, &self.id, &mut blob_index, bank.slot());
let parent = bank.parents().first().map(|bank| bank.slot()).unwrap_or(0);
for b in blobs.iter() {
b.write().unwrap().set_parent(parent);
}

let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

let broadcast_start = Instant::now();
let blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);

inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
index_blobs(
&blobs,
&self.id,
blob_index,
bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()),
);

assert!(last_tick <= max_tick_height);
let contains_last_tick = last_tick == max_tick_height;

if contains_last_tick {
Expand All @@ -111,9 +105,15 @@ impl Broadcast {

blocktree.write_shared_blobs(&blobs)?;

let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

let broadcast_start = Instant::now();

// Send out data
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;

inc_new_counter_info!("streamer-broadcast-sent", blobs.len());

// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
Expand Down
4 changes: 2 additions & 2 deletions core/src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ mod test {
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();

index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);

let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
Expand Down Expand Up @@ -505,7 +505,7 @@ mod test {
let original_entries = make_tiny_test_entries(num_entries);
let shared_blobs = original_entries.clone().to_shared_blobs();

index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, 0);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, 0, 0);

for blob in shared_blobs.iter().rev() {
process_blob(&blocktree, blob).expect("Expect successful processing of blob");
Expand Down
4 changes: 2 additions & 2 deletions core/src/erasure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ pub mod test {
}

// Make some dummy slots
index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), slot);
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot, 0, 0);

for b in blobs {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
Expand All @@ -903,7 +903,7 @@ pub mod test {
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();

index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), 0);
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0, 0);
blobs
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,17 @@ impl Blob {
}
}

pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slot: u64) {
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) {
// enumerate all the blobs, those are the indices
for blob in blobs.iter() {
let mut blob = blob.write().unwrap();

blob.set_index(*blob_index);
blob.set_index(blob_index);
blob.set_slot(slot);
blob.set_parent(parent);
blob.set_id(id);
blob.forward(true);
*blob_index += 1;
blob_index += 1;
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn test_replay() {

let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0);
index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0, 0);
blobs
.iter()
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));
Expand Down

0 comments on commit b820d4f

Please sign in to comment.