From 8b39eb5e4e37fc71cd16298f3ad0551dc002d07e Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Tue, 12 Feb 2019 10:56:48 -0800
Subject: [PATCH] Replace Blob Ids with Forward property (#2734)

* Replace Blob Id with Blob forwarding

* Update simulation to properly propagate blobs
---
 src/blocktree.rs         | 10 ++---
 src/broadcast_service.rs |  2 +-
 src/chacha.rs            |  4 +-
 src/cluster_info.rs      |  1 -
 src/db_window.rs         | 67 +++++++++--------------
 src/entry.rs             |  3 +-
 src/erasure.rs           | 25 ++++-------
 src/fullnode.rs          | 15 +++----
 src/packet.rs            | 29 ++++++++-----
 src/retransmit_stage.rs  | 92 ++++++++++++++++++++++++++++++----------
 src/tvu.rs               |  3 +-
 src/window_service.rs    | 20 +++------
 12 files changed, 139 insertions(+), 132 deletions(-)

diff --git a/src/blocktree.rs b/src/blocktree.rs
index 178f102f59b6ab..de0b7175ec2224 100644
--- a/src/blocktree.rs
+++ b/src/blocktree.rs
@@ -1261,7 +1261,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res
     Ok((1, entries[0].id))
 }
 
-pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()>
+pub fn genesis<'a, I>(ledger_path: &str, entries: I) -> Result<()>
 where
     I: IntoIterator<Item = &'a Entry>,
 {
@@ -1274,7 +1274,7 @@ where
         .map(|(idx, entry)| {
             let mut b = entry.borrow().to_blob();
             b.set_index(idx as u64);
-            b.set_id(&keypair.pubkey());
+            b.forward(true);
             b.set_slot(DEFAULT_SLOT_HEIGHT);
             b
         })
@@ -1408,7 +1408,7 @@ mod tests {
     fn test_read_blobs_bytes() {
         let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
         let slot = DEFAULT_SLOT_HEIGHT;
-        index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, &[slot; 10]);
+        index_blobs(&shared_blobs, &mut 0, &[slot; 10]);
 
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -1779,7 +1779,7 @@ mod tests {
         let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
         {
-            genesis(&ledger_path, &Keypair::new(), &entries).unwrap();
+            genesis(&ledger_path, &entries).unwrap();
 
             let ledger = Blocktree::open(&ledger_path).expect("open failed");
 
@@ -1797,7 +1797,7 @@ mod tests {
         let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
         {
             // put entries except last 2 into ledger
-            genesis(&ledger_path, &Keypair::new(), &entries[..entries.len() - 2]).unwrap();
+            genesis(&ledger_path, &entries[..entries.len() - 2]).unwrap();
 
             let ledger = Blocktree::open(&ledger_path).expect("open failed");
 
diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs
index 40523aeddb871c..2c85c05638a090 100644
--- a/src/broadcast_service.rs
+++ b/src/broadcast_service.rs
@@ -82,7 +82,7 @@ impl Broadcast {
             .collect();
 
         // TODO: blob_index should be slot-relative...
-        index_blobs(&blobs, &self.id, &mut self.blob_index, &slots);
+        index_blobs(&blobs, &mut self.blob_index, &slots);
 
         let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
 
diff --git a/src/chacha.rs b/src/chacha.rs
index dc965fe0b15862..05f9965ea29e8a 100644
--- a/src/chacha.rs
+++ b/src/chacha.rs
@@ -166,11 +166,11 @@ mod tests {
         use bs58;
         // golden needs to be updated if blob stuff changes....
        let golden = Hash::new(
-            &bs58::decode("nzxMWDQVsftBZbMGA1ika8X6bAKy7vya1jfXnVZSErt")
+            &bs58::decode("8NMJBwpXoBoA7YrA5CemRtGtfAqoY15bvnCqVjh4LYpS")
                 .into_vec()
                 .unwrap(),
         );
-        assert_eq!(hasher.result(), golden,);
+        assert_eq!(hasher.result(), golden);
         remove_file(out_path).unwrap();
     }
 }
diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index 59048013e82655..98f23a96e9c23a 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -583,7 +583,6 @@ impl ClusterInfo {
             let s = obj.read().unwrap();
             (s.my_data().clone(), peers)
         };
-        blob.write().unwrap().set_id(&me.id);
         let rblob = blob.read().unwrap();
         trace!("retransmit orders {}", orders.len());
         let errs: Vec<_> = orders
diff --git a/src/db_window.rs b/src/db_window.rs
index b936a9f8c223ee..6a0b351ee6c8ae 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
 
 pub const MAX_REPAIR_LENGTH: usize = 128;
 
-pub fn retransmit_all_leader_blocks(
+pub fn retransmit_blobs(
     dq: &[SharedBlob],
     leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
     retransmit: &BlobSender,
@@ -24,16 +24,16 @@
 ) -> Result<()> {
     let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
     for b in dq {
-        // Check if the blob is from the scheduled leader for its slot. If so,
-        // add to the retransmit_queue
+        // Don't add blobs generated by this node to the retransmit queue
         let slot = b.read().unwrap().slot();
         if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
             if leader_id != *id {
-                add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+                retransmit_queue.push(b.clone());
             }
         }
     }
 
+    //todo maybe move this to where retransmit is actually happening
     submit(
         influxdb::Point::new("retransmit-queue")
             .add_field(
@@ -50,24 +50,6 @@
     Ok(())
 }
 
-pub fn add_blob_to_retransmit_queue(
-    b: &SharedBlob,
-    leader_id: Pubkey,
-    retransmit_queue: &mut Vec<SharedBlob>,
-) {
-    let p = b.read().unwrap();
-    if p.id() == leader_id {
-        let nv = SharedBlob::default();
-        {
-            let mut mnv = nv.write().unwrap();
-            let sz = p.meta.size;
-            mnv.meta.size = sz;
-            mnv.data[..sz].copy_from_slice(&p.data[..sz]);
-        }
-        retransmit_queue.push(nv);
-    }
-}
-
 /// Process a blob: Add blob to the ledger window.
 pub fn process_blob(
     leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -216,7 +198,7 @@ mod test {
     }
 
     #[test]
-    pub fn test_retransmit() {
+    pub fn test_send_to_retransmit_stage() {
         let leader = Keypair::new().pubkey();
         let nonleader = Keypair::new().pubkey();
         let mut leader_scheduler = LeaderScheduler::default();
@@ -226,39 +208,38 @@
         let (blob_sender, blob_receiver) = channel();
 
-        // Expect blob from leader to be retransmitted
-        blob.write().unwrap().set_id(&leader);
-        retransmit_all_leader_blocks(
+        // Expect all blobs to be sent to retransmit_stage
+        blob.write().unwrap().forward(false);
+        retransmit_blobs(
             &vec![blob.clone()],
             &leader_scheduler,
             &blob_sender,
             &nonleader,
         )
         .expect("Expect successful retransmit");
-        let output_blob = blob_receiver
+        let _ = blob_receiver
             .try_recv()
             .expect("Expect input blob to be retransmitted");
 
-        // Retransmitted blob should be missing the leader id
-        assert_ne!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
-        // Set the leader in the retransmitted blob, should now match the original
-        output_blob[0].write().unwrap().set_id(&leader);
-        assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
-
-        // Expect blob from nonleader to not be retransmitted
-        blob.write().unwrap().set_id(&nonleader);
-        retransmit_all_leader_blocks(
+        blob.write().unwrap().forward(true);
+        retransmit_blobs(
             &vec![blob.clone()],
             &leader_scheduler,
             &blob_sender,
             &nonleader,
         )
         .expect("Expect successful retransmit");
-        assert!(blob_receiver.try_recv().is_err());
+        let output_blob = blob_receiver
+            .try_recv()
+            .expect("Expect input blob to be retransmitted");
+
+        // retransmit_blobs shouldn't be modifying the blob. That is retransmit stage's job now
+        assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
 
         // Expect blob from leader while currently leader to not be retransmitted
-        blob.write().unwrap().set_id(&leader);
-        retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender, &leader)
+        // Even when forward is set
+        blob.write().unwrap().forward(true);
+        retransmit_blobs(&vec![blob], &leader_scheduler, &blob_sender, &leader)
             .expect("Expect successful retransmit");
         assert!(blob_receiver.try_recv().is_err());
     }
@@ -470,12 +451,7 @@
         let num_entries = 10;
         let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
 
-        index_blobs(
-            &shared_blobs,
-            &Keypair::new().pubkey(),
-            &mut 0,
-            &vec![slot; num_entries],
-        );
+        index_blobs(&shared_blobs, &mut 0, &vec![slot; num_entries]);
 
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -572,7 +548,7 @@ mod test {
 
         index_blobs(
             &shared_blobs,
-            &Keypair::new().pubkey(),
             &mut 0,
             &vec![DEFAULT_SLOT_HEIGHT; num_entries],
         );
diff --git a/src/entry.rs b/src/entry.rs
index d111753c63142c..1437fb13e3bd37 100644
--- a/src/entry.rs
+++ b/src/entry.rs
@@ -447,7 +447,6 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec<Entry> {
 
 #[cfg(test)]
 pub fn make_consecutive_blobs(
-    id: &Pubkey,
     num_blobs_to_make: u64,
     start_height: u64,
     start_hash: Hash,
@@ -460,7 +459,7 @@
     for blob in &blobs {
         let mut blob = blob.write().unwrap();
         blob.set_index(index);
-        blob.set_id(id);
+        blob.forward(true);
         blob.meta.set_addr(addr);
         index += 1;
     }
diff --git a/src/erasure.rs b/src/erasure.rs
index f52e06adb259e9..0fcddaaa5cd127 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -329,14 +329,14 @@ impl CodingGenerator {
         for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
            let index = data_blob.index();
             let slot = data_blob.slot();
-            let id = data_blob.id();
+            let should_forward = data_blob.should_forward();
 
             let coding_blob = SharedBlob::default();
             {
                 let mut coding_blob = coding_blob.write().unwrap();
                 coding_blob.set_index(index);
                 coding_blob.set_slot(slot);
-                coding_blob.set_id(&id);
+                coding_blob.forward(should_forward);
                 coding_blob.set_size(max_data_size);
                 coding_blob.set_coding();
             }
@@ -509,7 +509,6 @@ pub mod test {
     use crate::window::WindowSlot;
     use rand::{thread_rng, Rng};
     use solana_sdk::pubkey::Pubkey;
-    use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::sync::Arc;
 
     #[test]
@@ -756,23 +755,23 @@
                 for i in 0..max_data_size {
                     coding_wl.data[i] = 0;
                 }
-                // copy index and id from the data blob
+                // copy index and forward flag from the data blob
                 if let Some(data) = &window[n].data {
                     let data_rl = data.read().unwrap();
 
                     let index = data_rl.index();
                     let slot = data_rl.slot();
-                    let id = data_rl.id();
+                    let should_forward = data_rl.should_forward();
 
                     trace!(
-                        "{} copying index {} id {:?} from data to coding",
-                        id,
+                        "{} copying index {} should_forward {:?} from data to coding",
+                        should_forward,
                         index,
-                        id
+                        should_forward
                     );
                     coding_wl.set_index(index);
                     coding_wl.set_slot(slot);
-                    coding_wl.set_id(&id);
+                    coding_wl.forward(should_forward);
                 }
                 coding_wl.set_size(max_data_size);
                 coding_wl.set_coding();
@@ -890,12 +889,7 @@
         }
 
         // Make some dummy slots
-        index_blobs(
-            &blobs,
-            &Keypair::new().pubkey(),
-            &mut (offset as u64),
-            &vec![slot; blobs.len()],
-        );
+        index_blobs(&blobs, &mut (offset as u64), &vec![slot; blobs.len()]);
 
         for b in blobs {
             let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
@@ -910,7 +904,6 @@
 
         index_blobs(
             &blobs,
-            &Keypair::new().pubkey(),
             &mut (offset as u64),
             &vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
         );
diff --git a/src/fullnode.rs b/src/fullnode.rs
index cc93b9cfa45cdb..b97c0ee790e91d 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -794,16 +794,11 @@ mod tests {
 
         let tvu_address = &validator_info.tvu;
 
-        let msgs = make_consecutive_blobs(
-            &leader_id,
-            blobs_to_send,
-            ledger_initial_len,
-            last_id,
-            &tvu_address,
-        )
-        .into_iter()
-        .rev()
-        .collect();
+        let msgs =
+            make_consecutive_blobs(blobs_to_send, ledger_initial_len, last_id, &tvu_address)
+                .into_iter()
+                .rev()
+                .collect();
         s_responder.send(msgs).expect("send");
         t_responder
     };
diff --git a/src/packet.rs b/src/packet.rs
index 682d98d87c24fe..10790dddf6cfea 100644
--- a/src/packet.rs
+++ b/src/packet.rs
@@ -7,7 +7,6 @@ use byteorder::{ByteOrder, LittleEndian};
 use log::Level;
 use serde::Serialize;
 pub use solana_sdk::packet::PACKET_DATA_SIZE;
-use solana_sdk::pubkey::Pubkey;
 use std::cmp;
 use std::fmt;
 use std::io;
@@ -281,8 +280,8 @@ macro_rules! range {
 const PARENT_RANGE: std::ops::Range<usize> = range!(0, u64);
 const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
 const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
-const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
-const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
+const FORWARD_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, bool);
+const FLAGS_RANGE: std::ops::Range<usize> = range!(FORWARD_RANGE.end, u32);
 const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
 
 macro_rules! align {
align { @@ -324,14 +323,15 @@ impl Blob { LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); } - /// sender id, we use this for identifying if its a blob from the leader that we should - /// retransmit. eventually blobs should have a signature that we can use for spam filtering - pub fn id(&self) -> Pubkey { - Pubkey::new(&self.data[ID_RANGE]) + /// Used to determine whether or not this blob should be forwarded in retransmit + /// A bool is used here instead of a flag because this item is not intended to be signed when + /// blob signatures are introduced + pub fn should_forward(&self) -> bool { + self.data[FORWARD_RANGE][0] & 0x1 == 1 } - pub fn set_id(&mut self, id: &Pubkey) { - self.data[ID_RANGE].copy_from_slice(id.as_ref()) + pub fn forward(&mut self, forward: bool) { + self.data[FORWARD_RANGE][0] = u8::from(forward) } pub fn flags(&self) -> u32 { @@ -442,14 +442,14 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slots: &[u64]) { +pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slots: &[u64]) { // enumerate all the blobs, those are the indices for (blob, slot) in blobs.iter().zip(slots) { let mut blob = blob.write().unwrap(); blob.set_index(*blob_index); blob.set_slot(*slot); - blob.set_id(id); + blob.forward(true); *blob_index += 1; } } @@ -567,5 +567,12 @@ mod tests { assert_eq!(b.index(), ::max_value()); assert_eq!(b.meta, Meta::default()); } + #[test] + fn test_blob_forward() { + let mut b = Blob::default(); + assert!(!b.should_forward()); + b.forward(true); + assert!(b.should_forward()); + } } diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 4ce1a8ad3faf14..aa8030114c5b11 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -7,6 +7,7 @@ use crate::cluster_info::{ }; use crate::counter::Counter; use crate::leader_scheduler::LeaderScheduler; +use crate::packet::SharedBlob; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::BlobReceiver; @@ -28,13 +29,15 @@ use std::time::Duration; /// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size /// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them /// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic + +/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance) fn compute_retransmit_peers( bank: &Arc, cluster_info: &Arc>, fanout: usize, hood_size: usize, grow: bool, -) -> Vec { +) -> (Vec, Vec) { let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank); let my_id = cluster_info.read().unwrap().id(); //calc num_layers and num_neighborhoods using the total number of nodes @@ -43,7 +46,7 @@ fn compute_retransmit_peers( if num_layers <= 1 { /* single layer data plane */ - peers + (peers, vec![]) } else { //find my index (my ix is the same as the first node with smaller stake) let my_index = peers @@ -56,15 +59,16 @@ fn compute_retransmit_peers( my_index.unwrap_or(peers.len() - 1), ); let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len()); - let mut retransmit_peers = peers[locality.neighbor_bounds.0..upper_bound].to_vec(); + let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec(); + let mut children = Vec::new(); for ix in locality.child_layer_peers { if let Some(peer) = peers.get(ix) { - retransmit_peers.push(peer.clone()); + children.push(peer.clone()); 
                continue;
             }
             break;
         }
-        retransmit_peers
+        (neighbors, children)
     }
 }
 
@@ -85,19 +89,32 @@
             .add_field("count", influxdb::Value::Integer(dq.len() as i64))
             .to_owned(),
     );
-    let retransmit_peers = compute_retransmit_peers(
+    let (neighbors, children) = compute_retransmit_peers(
         &bank,
         cluster_info,
         DATA_PLANE_FANOUT,
         NEIGHBORHOOD_SIZE,
         GROW_LAYER_CAPACITY,
     );
-    for b in &mut dq {
-        ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
+    for b in &dq {
+        if b.read().unwrap().should_forward() {
+            ClusterInfo::retransmit_to(&cluster_info, &neighbors, &copy_for_neighbors(b), sock)?;
+        }
+        // Always send blobs to children
+        ClusterInfo::retransmit_to(&cluster_info, &children, b, sock)?;
     }
     Ok(())
 }
 
+/// Modifies a blob for neighbors nodes
+#[inline]
+fn copy_for_neighbors(b: &SharedBlob) -> SharedBlob {
+    let mut blob = b.read().unwrap().clone();
+    // Disable blob forwarding for neighbors
+    blob.forward(false);
+    Arc::new(RwLock::new(blob))
+}
+
 /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
 /// See `cluster_info` for network layer definitions.
 /// # Arguments
@@ -204,7 +221,7 @@ mod tests {
     use std::sync::Mutex;
     use std::time::Instant;
 
-    type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<i32>)>;
+    type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<(i32, bool)>)>;
 
     fn num_threads() -> usize {
         sys_info::cpu_num().unwrap_or(10) as usize
@@ -240,7 +257,7 @@
 
         // setup accounts for all nodes (leader has 0 bal)
         let (s, r) = channel();
-        let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> =
+        let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
             Arc::new(Mutex::new(HashMap::new()));
         senders.lock().unwrap().insert(leader_info.id, s);
         let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
@@ -273,7 +290,7 @@
         assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
 
         // create some "blobs".
-        let blobs: Vec<_> = (0..100).into_par_iter().map(|i| i as i32).collect();
+        let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
 
         // pretend to broadcast from leader - cluster_info::create_broadcast_orders
         let mut broadcast_table = cluster_info.sorted_tvu_peers(&bank);
@@ -283,7 +300,7 @@
         // send blobs to layer 1 nodes
         orders.iter().for_each(|(b, vc)| {
             vc.iter().for_each(|c| {
-                find_insert_blob(&c.id, *b, &mut batches);
+                find_insert_blob(&c.id, b.0, &mut batches);
             })
         });
         assert!(!batches.is_empty());
@@ -295,43 +312,62 @@
             let batch_size = batch.len();
             let mut remaining = batch_size;
             let senders: HashMap<_, _> = senders.lock().unwrap().clone();
-            let mut mapped_peers: HashMap<Pubkey, Vec<Sender<i32>>> = HashMap::new();
+            // A map that holds neighbors and children senders for a given node
+            let mut mapped_peers: HashMap<
+                Pubkey,
+                (Vec<Sender<(i32, bool)>>, Vec<Sender<(i32, bool)>>),
+            > = HashMap::new();
             while remaining > 0 {
                 for (id, (recv, r)) in batch.iter_mut() {
                     assert!(now.elapsed().as_secs() < timeout, "Timed out");
                     cluster.gossip.set_self(*id);
                     if !mapped_peers.contains_key(id) {
-                        let peers = compute_retransmit_peers(
+                        let (neighbors, children) = compute_retransmit_peers(
                             &bank,
                             &Arc::new(RwLock::new(cluster.clone())),
                             fanout,
                             hood_size,
                             GROW_LAYER_CAPACITY,
                         );
+                        let vec_children: Vec<_> = children
+                            .iter()
+                            .map(|p| {
+                                let s = senders.get(&p.id).unwrap();
+                                recv.iter().for_each(|i| {
+                                    let _ = s.send((*i, true));
+                                });
+                                s.clone()
+                            })
+                            .collect();
 
-                        let vec_peers: Vec<_> = peers
+                        let vec_neighbors: Vec<_> = neighbors
                             .iter()
                             .map(|p| {
                                 let s = senders.get(&p.id).unwrap();
                                 recv.iter().for_each(|i| {
-                                    let _ = s.send(*i);
+                                    let _ = s.send((*i, false));
                                 });
                                 s.clone()
                             })
                             .collect();
-                        mapped_peers.insert(*id, vec_peers);
+                        mapped_peers.insert(*id, (vec_neighbors, vec_children));
                     }
-                    let vec_peers = mapped_peers.get(id).unwrap().to_vec();
+                    let (vec_neighbors, vec_children) = mapped_peers.get(id).unwrap();
 
                     //send and recv
                     if recv.len() < blobs.len() {
                         loop {
                             match r.try_recv() {
-                                Ok(i) => {
-                                    if recv.insert(i) {
-                                        vec_peers.iter().for_each(|s| {
-                                            let _ = s.send(i);
+                                Ok((data, retransmit)) => {
+                                    if recv.insert(data) {
+                                        vec_children.iter().for_each(|s| {
+                                            let _ = s.send((data, retransmit));
                                         });
+                                        if retransmit {
+                                            vec_neighbors.iter().for_each(|s| {
+                                                let _ = s.send((data, false));
+                                            })
+                                        }
                                         if recv.len() == blobs.len() {
                                             remaining -= 1;
                                             break;
@@ -348,6 +384,8 @@
         });
     }
 
+    //todo add tests with network failures
+
     // Run with a single layer
     #[test]
     fn test_retransmit_small() {
@@ -372,5 +410,13 @@
         run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
     }
 
-    //todo add tests with network failures
+    // Test that blobs always come out with forward unset for neighbors
+    #[test]
+    fn test_blob_for_neighbors() {
+        let blob = SharedBlob::default();
+        blob.write().unwrap().forward(true);
+        let for_hoodies = copy_for_neighbors(&blob);
+        assert!(!for_hoodies.read().unwrap().should_forward());
+    }
+
 }
diff --git a/src/tvu.rs b/src/tvu.rs
index f3398455264bb3..57c2cb3ea9a0d3 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -303,7 +303,6 @@ pub mod tests {
         let mut cluster_info2 = ClusterInfo::new(target2.info.clone());
         cluster_info2.insert_info(leader.info.clone());
         cluster_info2.set_leader(leader.info.id);
-        let leader_id = leader.info.id;
         let cref2 = Arc::new(RwLock::new(cluster_info2));
         let dr_2 = new_gossip(cref2, target2.sockets.gossip, exit.clone());
 
@@ -402,7 +401,7 @@
             let mut w = b.write().unwrap();
             w.set_index(blob_idx);
             blob_idx += 1;
-            w.set_id(&leader_id);
+            w.forward(true);
 
             let serialized_entry = serialize(&entry).unwrap();
diff --git a/src/window_service.rs b/src/window_service.rs
index b301192b9c3407..ed0894a8e67b53 100644
--- a/src/window_service.rs
+++ b/src/window_service.rs
@@ -50,7 +50,7 @@ fn recv_window(
             .to_owned(),
     );
 
-    retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit, id)?;
+    retransmit_blobs(&dq, leader_scheduler, retransmit, id)?;
 
     //send a contiguous set of blocks
     trace!("{} num blobs received: {}", id, dq.len());
@@ -215,16 +215,11 @@ mod test {
         let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
         let num_blobs_to_make = 10;
         let gossip_address = &leader_node.info.gossip;
-        let msgs = make_consecutive_blobs(
-            &me_id,
-            num_blobs_to_make,
-            0,
-            Hash::default(),
-            &gossip_address,
-        )
-        .into_iter()
-        .rev()
-        .collect();;
+        let msgs =
+            make_consecutive_blobs(num_blobs_to_make, 0, Hash::default(), &gossip_address)
+                .into_iter()
+                .rev()
+                .collect();;
        s_responder.send(msgs).expect("send");
         t_responder
     };
@@ -290,8 +285,7 @@ mod test {
             leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
         let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
         let mut msgs = Vec::new();
-        let blobs =
-            make_consecutive_blobs(&me_id, 14u64, 0, Hash::default(), &leader_node.info.gossip);
+        let blobs = make_consecutive_blobs(14u64, 0, Hash::default(), &leader_node.info.gossip);
 
         for v in 0..10 {
             let i = 9 - v;
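
Reviewer note: the mechanics of the new forward bit are easy to restate outside
the tree. The sketch below is a minimal standalone model, not the crate's actual
Blob type: MiniBlob, its buffer size, and main are hypothetical, and only the
header layout (parent, slot, index, then a one-byte forward flag) and the
one-hop clearing rule from copy_for_neighbors mirror this diff.

    // Standalone model of the blob "forward" bit (MiniBlob is hypothetical).
    use std::ops::Range;

    // Header layout follows the patch: three u64 fields, then one bool byte.
    const PARENT_RANGE: Range<usize> = 0..8;
    const SLOT_RANGE: Range<usize> = PARENT_RANGE.end..PARENT_RANGE.end + 8;
    const INDEX_RANGE: Range<usize> = SLOT_RANGE.end..SLOT_RANGE.end + 8;
    const FORWARD_RANGE: Range<usize> = INDEX_RANGE.end..INDEX_RANGE.end + 1;

    #[derive(Clone)]
    struct MiniBlob {
        data: Vec<u8>,
    }

    impl MiniBlob {
        fn new() -> Self {
            // Large enough for the header fields modeled here.
            MiniBlob { data: vec![0u8; 64] }
        }
        /// Set the forward bit; the patch's index_blobs/genesis paths set it to true.
        fn forward(&mut self, forward: bool) {
            self.data[FORWARD_RANGE][0] = u8::from(forward);
        }
        /// Read the low bit back, mirroring should_forward in the patch.
        fn should_forward(&self) -> bool {
            self.data[FORWARD_RANGE][0] & 0x1 == 1
        }
    }

    /// One-hop rule from retransmit: neighbors receive a copy with the bit
    /// cleared, so they deliver the blob but do not forward it again.
    fn copy_for_neighbors(b: &MiniBlob) -> MiniBlob {
        let mut copy = b.clone();
        copy.forward(false);
        copy
    }

    fn main() {
        let mut blob = MiniBlob::new();
        blob.forward(true);
        assert!(blob.should_forward());

        let neighbor_copy = copy_for_neighbors(&blob);
        assert!(!neighbor_copy.should_forward()); // neighbors stop the cascade
        assert!(blob.should_forward()); // blobs sent to children still forward
        println!("forward bit propagates to children only");
    }

This is why the patch can drop the per-blob sender id: a node decides whether to
retransmit from the flag alone, and clearing it on neighbor copies bounds
propagation to a single hop within a neighborhood.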