diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index d1889e2f8ec941..2c6b4cfcbbcfbf 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -897,6 +897,8 @@ impl ClusterInfo { } } + // TODO: If two threads call into this function then epoch_slot_index has a + // race condition and the threads will overwrite each other in crds table. pub fn push_epoch_slots(&self, mut update: &[Slot]) { let current_slots: Vec<_> = { let gossip = @@ -933,26 +935,28 @@ impl ClusterInfo { Some((_wallclock, _slot, index)) => *index, None => 0, }; + let self_pubkey = self.id(); + let mut entries = Vec::default(); while !update.is_empty() { let ix = (epoch_slot_index % crds_value::MAX_EPOCH_SLOTS) as u8; let now = timestamp(); let mut slots = if !reset { self.lookup_epoch_slots(ix) } else { - EpochSlots::new(self.id(), now) + EpochSlots::new(self_pubkey, now) }; let n = slots.fill(update, now); update = &update[n..]; if n > 0 { - let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair); - self.local_message_pending_push_queue - .lock() - .unwrap() - .push(entry); + let epoch_slots = CrdsData::EpochSlots(ix, slots); + let entry = CrdsValue::new_signed(epoch_slots, &self.keypair); + entries.push(entry); } epoch_slot_index += 1; reset = true; } + let mut gossip = self.gossip.write().unwrap(); + gossip.process_push_message(&self_pubkey, entries, timestamp()); } fn time_gossip_read_lock<'a>( @@ -3777,7 +3781,6 @@ mod tests { let slots = cluster_info.get_epoch_slots(&mut Cursor::default()); assert!(slots.is_empty()); cluster_info.push_epoch_slots(&[0]); - cluster_info.flush_push_queue(); let mut cursor = Cursor::default(); let slots = cluster_info.get_epoch_slots(&mut cursor); @@ -4134,9 +4137,7 @@ mod tests { range.push(last + rand::thread_rng().gen_range(1, 32)); } cluster_info.push_epoch_slots(&range[..16000]); - cluster_info.flush_push_queue(); cluster_info.push_epoch_slots(&range[16000..]); - cluster_info.flush_push_queue(); let slots = cluster_info.get_epoch_slots(&mut Cursor::default()); let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect(); assert_eq!(slots, range);