Skip to content

Commit

Permalink
indexes crds values by their insert order (backport solana-labs#16809) (
Browse files Browse the repository at this point in the history
solana-labs#17132)

* indexes crds values by their insert order

(cherry picked from commit dfa3e7a)

* reads gossip push messages off crds ordinal index

Having an ordinal index on crds values based on insert order allows to
efficiently filter values using a cursor. In particular
CrdsGossipPush::push_messages hash-map can be replaced with a cursor,
saving on the bookkeepings, purging, etc

(cherry picked from commit 22c02b9)

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored May 10, 2021
1 parent efc3c0d commit 094271b
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 98 deletions.
51 changes: 46 additions & 5 deletions core/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct Crds {
epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order.
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -117,6 +119,7 @@ impl Default for Crds {
votes: IndexSet::default(),
epoch_slots: BTreeMap::default(),
records: HashMap::default(),
entries: BTreeMap::default(),
}
}
}
Expand Down Expand Up @@ -155,6 +158,7 @@ impl Crds {
local_timestamp: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let label = value.label();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp);
match self.table.entry(label) {
Entry::Vacant(entry) => {
Expand All @@ -172,10 +176,8 @@ impl Crds {
}
_ => (),
};
self.records
.entry(value.value.pubkey())
.or_default()
.insert(entry_index);
self.entries.insert(value.ordinal, entry_index);
self.records.entry(pubkey).or_default().insert(entry_index);
self.cursor.consume(value.ordinal);
entry.insert(value);
Ok(None)
Expand All @@ -188,9 +190,11 @@ impl Crds {
self.epoch_slots.remove(&entry.get().ordinal);
self.epoch_slots.insert(value.ordinal, entry_index);
}
self.entries.remove(&entry.get().ordinal);
self.entries.insert(value.ordinal, entry_index);
// As long as the pubkey does not change, self.records
// does not need to be updated.
debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey());
debug_assert_eq!(entry.get().value.pubkey(), pubkey);
self.cursor.consume(value.ordinal);
Ok(Some(entry.insert(value)))
}
Expand Down Expand Up @@ -271,6 +275,18 @@ impl Crds {
})
}

/// Returns all entries inserted since the given cursor.
pub(crate) fn get_entries<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.entries.range(range).map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}

/// Returns all records associated with a pubkey.
pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator<Item = &VersionedCrdsValue> {
self.records
Expand Down Expand Up @@ -417,6 +433,7 @@ impl Crds {
}
_ => (),
}
self.entries.remove(&value.ordinal);
// Remove the index from records associated with the value's pubkey.
let pubkey = value.value.pubkey();
let mut records_entry = match self.records.entry(pubkey) {
Expand Down Expand Up @@ -451,6 +468,7 @@ impl Crds {
}
_ => (),
};
self.entries.insert(value.ordinal, index);
let pubkey = value.value.pubkey();
let records = self.records.get_mut(&pubkey).unwrap();
records.swap_remove(&size);
Expand Down Expand Up @@ -833,6 +851,25 @@ mod test {
_ => panic!("not a vote!"),
}
}
let num_entries = crds
.table
.values()
.filter(|value| value.ordinal >= since)
.count();
let mut cursor = Cursor(since);
assert_eq!(num_entries, crds.get_entries(&mut cursor).count());
assert_eq!(
cursor.0,
crds.entries
.iter()
.last()
.map(|(k, _)| k + 1)
.unwrap_or_default()
.max(since)
);
for value in crds.get_entries(&mut Cursor(since)) {
assert!(value.ordinal >= since);
}
let num_nodes = crds
.table
.values()
Expand All @@ -848,6 +885,10 @@ mod test {
.values()
.filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _)))
.count();
assert_eq!(
crds.table.len(),
crds.get_entries(&mut Cursor::default()).count()
);
assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
assert_eq!(num_votes, crds.get_votes(&mut Cursor::default()).count());
assert_eq!(
Expand Down
7 changes: 1 addition & 6 deletions core/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl CrdsGossip {
now: u64,
process_pull_stats: &mut ProcessPullStats,
) {
let success = self.pull.process_pull_responses(
self.pull.process_pull_responses(
&mut self.crds,
from,
responses,
Expand All @@ -294,7 +294,6 @@ impl CrdsGossip {
now,
process_pull_stats,
);
self.push.push_pull_responses(success, now);
}

pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
Expand All @@ -316,10 +315,6 @@ impl CrdsGossip {
timeouts: &HashMap<Pubkey, u64>,
) -> usize {
let mut rv = 0;
if now > self.push.msg_timeout {
let min = now - self.push.msg_timeout;
self.push.purge_old_pending_push_messages(&self.crds, min);
}
if now > 5 * self.push.msg_timeout {
let min = now - 5 * self.push.msg_timeout;
self.push.purge_old_received_cache(min);
Expand Down
13 changes: 4 additions & 9 deletions core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
crds::{Crds, CrdsError},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip_error::CrdsGossipError,
crds_value::{CrdsValue, CrdsValueLabel},
crds_value::CrdsValue,
ping_pong::PingCache,
};
use itertools::Itertools;
Expand Down Expand Up @@ -412,8 +412,7 @@ impl CrdsGossipPull {
mut failed_inserts: Vec<Hash>,
now: u64,
stats: &mut ProcessPullStats,
) -> Vec<(CrdsValueLabel, Hash, u64)> {
let mut success = vec![];
) {
let mut owners = HashSet::new();
for response in responses_expired_timeout {
match crds.insert(response, now) {
Expand All @@ -424,17 +423,14 @@ impl CrdsGossipPull {
}
}
for response in responses {
let label = response.label();
let wallclock = response.wallclock();
let owner = response.pubkey();
match crds.insert(response, now) {
Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash),
Err(CrdsError::UnknownStakes) => (),
Ok(old) => {
stats.success += 1;
self.num_pulls += 1;
owners.insert(label.pubkey());
let value_hash = crds.get(&label).unwrap().value_hash;
success.push((label, value_hash, wallclock));
owners.insert(owner);
if let Some(val) = old {
self.purged_values.push_back((val.value_hash, now))
}
Expand All @@ -449,7 +445,6 @@ impl CrdsGossipPull {
self.purge_failed_inserts(now);
self.failed_inserts
.extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
success
}

pub fn purge_failed_inserts(&mut self, now: u64) {
Expand Down
Loading

0 comments on commit 094271b

Please sign in to comment.