Skip to content

Commit

Permalink
generate_pull_response optimization (bp #11597) (#11954)
Browse files Browse the repository at this point in the history
* generate_pull_response optimization (#11597)

(cherry picked from commit f519fde)

# Conflicts:
#	core/src/crds_gossip_pull.rs

* Fix merge conflicts

Co-authored-by: sakridge <[email protected]>
Co-authored-by: Carl <[email protected]>
  • Loading branch information
3 people authored Sep 1, 2020
1 parent daba428 commit 1afd1db
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 13 deletions.
9 changes: 9 additions & 0 deletions core/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! A value is updated to a new version if the labels match, and the value
//! wallclock is later, or the value hash is greater.
use crate::crds_gossip_pull::CrdsFilter;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use bincode::serialize;
use indexmap::map::IndexMap;
Expand All @@ -37,6 +38,8 @@ pub struct Crds {
/// Stores the map of labels and values
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize,

pub masks: IndexMap<CrdsValueLabel, u64>,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -86,6 +89,7 @@ impl Default for Crds {
Crds {
table: IndexMap::new(),
num_inserts: 0,
masks: IndexMap::new(),
}
}
}
Expand Down Expand Up @@ -126,6 +130,10 @@ impl Crds {
.map(|current| new_value > *current)
.unwrap_or(true);
if do_insert {
self.masks.insert(
label.clone(),
CrdsFilter::hash_as_u64(&new_value.value_hash),
);
let old = self.table.insert(label, new_value);
self.num_inserts += 1;
Ok(old)
Expand Down Expand Up @@ -193,6 +201,7 @@ impl Crds {

pub fn remove(&mut self, key: &CrdsValueLabel) {
self.table.swap_remove(key);
self.masks.swap_remove(key);
}
}

Expand Down
46 changes: 33 additions & 13 deletions core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ impl CrdsFilter {
// for small ratios this can result in a negative number, ensure it returns 0 instead
((num_items / max_items).log2().ceil()).max(0.0) as u32
}
fn hash_as_u64(item: &Hash) -> u64 {
pub fn hash_as_u64(item: &Hash) -> u64 {
let arr = item.as_ref();
let mut accum = 0;
for (i, val) in arr.iter().enumerate().take(8) {
accum |= (u64::from(*val)) << (i * 8) as u64;
}
accum
}
pub fn test_mask_u64(&self, item: u64, ones: u64) -> bool {
let bits = item | ones;
bits == self.mask
}
pub fn test_mask(&self, item: &Hash) -> bool {
// only consider the highest mask_bits bits from the hash and set the rest to 1.
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
Expand All @@ -116,6 +120,9 @@ impl CrdsFilter {
}
self.filter.contains(item)
}
pub fn filter_contains(&self, item: &Hash) -> bool {
self.filter.contains(item)
}
}

#[derive(Default)]
Expand Down Expand Up @@ -396,18 +403,31 @@ impl CrdsGossipPull {
return ret;
}
let mut total_skipped = 0;
for v in crds.table.values() {
recent.iter().for_each(|(i, (caller, filter))| {
//skip values that are too new
if v.value.wallclock() > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
{
total_skipped += 1;
return;
}
if !filter.contains(&v.value_hash) {
ret[*i].push(v.value.clone());
}
});
let mask_ones: Vec<_> = recent
.iter()
.map(|(_i, (_caller, filter))| (!0u64).checked_shr(filter.mask_bits).unwrap_or(!0u64))
.collect();
for (label, mask) in crds.masks.iter() {
recent
.iter()
.zip(mask_ones.iter())
.for_each(|((i, (caller, filter)), mask_ones)| {
if filter.test_mask_u64(*mask, *mask_ones) {
let item = crds.table.get(label).unwrap();

//skip values that are too new
if item.value.wallclock()
> caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
{
total_skipped += 1;
return;
}

if !filter.filter_contains(&item.value_hash) {
ret[*i].push(item.value.clone());
}
}
});
}
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
ret
Expand Down

0 comments on commit 1afd1db

Please sign in to comment.