Skip to content

Commit

Permalink
indexes duplicate-shreds in gossip crds table (#29317)
Browse files Browse the repository at this point in the history
Also adding Crds::get_duplicate_shreds which retrieves all upserted
duplicate-shreds since a given cursor using the index.
  • Loading branch information
behzadnouri authored Dec 20, 2022
1 parent 9a82368 commit 2d849a2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
14 changes: 14 additions & 0 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use {
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, IncrementalSnapshotHashes,
LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
gossip_error::GossipError,
ping_pong::{self, PingCache, Pong},
Expand Down Expand Up @@ -1225,6 +1226,19 @@ impl ClusterInfo {
.collect()
}

/// Returns duplicate-shreds inserted since the given cursor.
#[allow(dead_code)]
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_duplicate_shreds(cursor)
.map(|entry| match &entry.value.data {
CrdsData::DuplicateShred(_, dup) => dup.clone(),
_ => panic!("this should not happen!"),
})
.collect()
}

pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
let gossip_crds = self.gossip.crds.read().unwrap();
if let Some(version) = gossip_crds.get::<&Version>(*pubkey) {
Expand Down
32 changes: 32 additions & 0 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub struct Crds {
votes: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of EpochSlots keyed by insert order.
epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of DuplicateShred keyed by insert order.
duplicate_shreds: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order.
Expand Down Expand Up @@ -157,6 +159,7 @@ impl Default for Crds {
nodes: IndexSet::default(),
votes: BTreeMap::default(),
epoch_slots: BTreeMap::default(),
duplicate_shreds: BTreeMap::default(),
records: HashMap::default(),
entries: BTreeMap::default(),
purged: VecDeque::default(),
Expand Down Expand Up @@ -227,6 +230,9 @@ impl Crds {
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.insert(value.ordinal, entry_index);
}
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.insert(value.ordinal, entry_index);
}
_ => (),
};
self.entries.insert(value.ordinal, entry_index);
Expand Down Expand Up @@ -255,6 +261,10 @@ impl Crds {
self.epoch_slots.remove(&entry.get().ordinal);
self.epoch_slots.insert(value.ordinal, entry_index);
}
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.remove(&entry.get().ordinal);
self.duplicate_shreds.insert(value.ordinal, entry_index);
}
_ => (),
}
self.entries.remove(&entry.get().ordinal);
Expand Down Expand Up @@ -340,6 +350,21 @@ impl Crds {
})
}

/// Returns duplicate-shreds inserted since the given cursor.
/// Updates the cursor as the values are consumed.
pub(crate) fn get_duplicate_shreds<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.duplicate_shreds
.range(range)
.map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}

/// Returns all entries inserted since the given cursor.
pub(crate) fn get_entries<'a>(
&'a self,
Expand Down Expand Up @@ -499,6 +524,9 @@ impl Crds {
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.remove(&value.ordinal);
}
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.remove(&value.ordinal);
}
_ => (),
}
self.entries.remove(&value.ordinal);
Expand Down Expand Up @@ -534,6 +562,9 @@ impl Crds {
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.insert(value.ordinal, index);
}
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.insert(value.ordinal, index);
}
_ => (),
};
self.entries.insert(value.ordinal, index);
Expand Down Expand Up @@ -618,6 +649,7 @@ impl Crds {
nodes: self.nodes.clone(),
votes: self.votes.clone(),
epoch_slots: self.epoch_slots.clone(),
duplicate_shreds: self.duplicate_shreds.clone(),
records: self.records.clone(),
entries: self.entries.clone(),
purged: self.purged.clone(),
Expand Down

0 comments on commit 2d849a2

Please sign in to comment.