diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 19fa0a8e05a0c1..25f4f28ef7bc36 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -1535,7 +1535,7 @@ pub mod tests { fn test_read_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = 0; - index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 73c10d657d2001..17959fdfc31f92 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -53,13 +53,6 @@ impl Broadcast { inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; - // TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of - // `max_tick_height` - let mut blob_index = blocktree - .meta(bank.slot()) - .expect("Database error") - .map(|meta| meta.consumed) - .unwrap_or(0); let now = Instant::now(); let mut num_entries = entries.len(); @@ -90,19 +83,20 @@ impl Broadcast { }) .collect(); - index_blobs(&blobs, &self.id, &mut blob_index, bank.slot()); - let parent = bank.parents().first().map(|bank| bank.slot()).unwrap_or(0); - for b in blobs.iter() { - b.write().unwrap().set_parent(parent); - } - - let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); - - let broadcast_start = Instant::now(); + let blob_index = blocktree + .meta(bank.slot()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0); - inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + index_blobs( + &blobs, + &self.id, + blob_index, + bank.slot(), + bank.parent().map_or(0, |parent| parent.slot()), + ); - assert!(last_tick <= max_tick_height); let contains_last_tick = last_tick == max_tick_height; if contains_last_tick { @@ -111,9 +105,15 @@ impl Broadcast { blocktree.write_shared_blobs(&blobs)?; + let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); + + let broadcast_start = Instant::now(); + // Send out data ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?; + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] { diff --git a/core/src/db_window.rs b/core/src/db_window.rs index f8786d6f616581..ed769d36b06bad 100644 --- a/core/src/db_window.rs +++ b/core/src/db_window.rs @@ -415,7 +415,7 @@ mod test { let num_entries = 10; let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); - index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, slot, 0); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); @@ -505,7 +505,7 @@ mod test { let original_entries = make_tiny_test_entries(num_entries); let shared_blobs = original_entries.clone().to_shared_blobs(); - index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, 0); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, 0, 0); for blob in shared_blobs.iter().rev() { process_blob(&blocktree, blob).expect("Expect successful processing of blob"); diff --git a/core/src/erasure.rs b/core/src/erasure.rs index cf049d7c96316b..afaa8d9578a8f0 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -890,7 +890,7 @@ pub mod test { } // Make some dummy slots - index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), slot); + index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot, 0); for b in blobs { let idx = b.read().unwrap().index() as usize % WINDOW_SIZE; @@ -903,7 +903,7 @@ pub mod test { fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs(); - index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), 0); + index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 0, 0); blobs } diff --git a/core/src/packet.rs b/core/src/packet.rs index b8807be4f9b95b..f7b79a75e61a86 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -463,16 +463,17 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slot: u64) { +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) { // enumerate all the blobs, those are the indices for blob in blobs.iter() { let mut blob = blob.write().unwrap(); - blob.set_index(*blob_index); + blob.set_index(blob_index); blob.set_slot(slot); + blob.set_parent(parent); blob.set_id(id); blob.forward(true); - *blob_index += 1; + blob_index += 1; } } diff --git a/tests/tvu.rs b/tests/tvu.rs index 1b5cd30121d7bb..48e13bb8acd3e9 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -160,7 +160,8 @@ fn test_replay() { let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; let blobs = entries.to_shared_blobs(); - index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0); + index_blobs(&blobs, &leader.info.id, blob_idx, 0, 0); + blob_idx += blobs.len() as u64; blobs .iter() .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));