Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

groom broadcast #3170

Merged
merged 1 commit into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/blocktree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ pub mod tests {
fn test_read_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = 0;
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);

let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
Expand Down
36 changes: 18 additions & 18 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@ impl Broadcast {
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);

let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
// TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of
// `max_tick_height`
let mut blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);

let now = Instant::now();
let mut num_entries = entries.len();
Expand Down Expand Up @@ -90,19 +83,20 @@ impl Broadcast {
})
.collect();

index_blobs(&blobs, &self.id, &mut blob_index, bank.slot());
let parent = bank.parents().first().map(|bank| bank.slot()).unwrap_or(0);
for b in blobs.iter() {
b.write().unwrap().set_parent(parent);
}

let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

let broadcast_start = Instant::now();
let blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);

inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
index_blobs(
&blobs,
&self.id,
blob_index,
bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()),
);

assert!(last_tick <= max_tick_height);
let contains_last_tick = last_tick == max_tick_height;

if contains_last_tick {
Expand All @@ -111,9 +105,15 @@ impl Broadcast {

blocktree.write_shared_blobs(&blobs)?;

let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

let broadcast_start = Instant::now();

// Send out data
ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;

inc_new_counter_info!("streamer-broadcast-sent", blobs.len());

// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
Expand Down
4 changes: 2 additions & 2 deletions core/src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ mod test {
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();

index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0);

let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
Expand Down Expand Up @@ -505,7 +505,7 @@ mod test {
let original_entries = make_tiny_test_entries(num_entries);
let shared_blobs = original_entries.clone().to_shared_blobs();

index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, 0);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, 0, 0);

for blob in shared_blobs.iter().rev() {
process_blob(&blocktree, blob).expect("Expect successful processing of blob");
Expand Down
4 changes: 2 additions & 2 deletions core/src/erasure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ pub mod test {
}

// Make some dummy slots
index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), slot);
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot, 0);

for b in blobs {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
Expand All @@ -903,7 +903,7 @@ pub mod test {
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();

index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), 0);
index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0, 0);
blobs
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,17 @@ impl Blob {
}
}

pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slot: u64) {
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) {
// enumerate all the blobs, those are the indices
for blob in blobs.iter() {
let mut blob = blob.write().unwrap();

blob.set_index(*blob_index);
blob.set_index(blob_index);
blob.set_slot(slot);
blob.set_parent(parent);
blob.set_id(id);
blob.forward(true);
*blob_index += 1;
blob_index += 1;
}
}

Expand Down
3 changes: 2 additions & 1 deletion tests/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ fn test_replay() {

let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0);
index_blobs(&blobs, &leader.info.id, blob_idx, 0, 0);
blob_idx += blobs.len() as u64;
blobs
.iter()
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));
Expand Down