Skip to content

Commit

Permalink
Update epoch slots to include all missing slots (#8276)
Browse files Browse the repository at this point in the history
* Update epoch slots to include all missing slots

* new test for compress/decompress

* address review comments

* limit cache based on size, instead of comparing roots
  • Loading branch information
pgarg66 committed Feb 20, 2020
1 parent c3ac858 commit cb9d183
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 116 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bincode = "1.2.1"
bs58 = "0.3.0"
byteorder = "1.3.2"
chrono = { version = "0.4.10", features = ["serde"] }
compression = "0.1.5"
core_affinity = "0.5.10"
crc = { version = "1.8.1", optional = true }
crossbeam-channel = "0.3"
Expand Down
106 changes: 104 additions & 2 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
weighted_shuffle::{weighted_best, weighted_shuffle},
};
use bincode::{serialize, serialized_size};
use compression::prelude::*;
use core::cmp;
use itertools::Itertools;
use rayon::iter::IntoParallelIterator;
Expand Down Expand Up @@ -315,10 +316,75 @@ impl ClusterInfo {
)
}

pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet<Slot>) {
pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet<Slot>) -> (Slot, Vec<u8>) {
if !incomplete_slots.is_empty() {
let first_slot = incomplete_slots
.iter()
.next()
.expect("expected to find at least one slot");
let last_slot = incomplete_slots
.iter()
.next_back()
.expect("expected to find last slot");
let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize];
incomplete_slots.iter().for_each(|slot| {
uncompressed[slot.saturating_sub(*first_slot) as usize] = 1;
});
if let Ok(compressed) = uncompressed
.iter()
.cloned()
.encode(&mut GZipEncoder::new(), Action::Finish)
.collect::<std::result::Result<Vec<u8>, _>>()
{
(*first_slot, compressed)
} else {
(0, vec![])
}
} else {
(0, vec![])
}
}

pub fn decompress_incomplete_slots(first_slot: u64, compressed: &[u8]) -> BTreeSet<Slot> {
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();

if let Ok(decompressed) = compressed
.iter()
.cloned()
.decode(&mut GZipDecoder::new())
.collect::<std::result::Result<Vec<u8>, _>>()
{
decompressed.iter().enumerate().for_each(|(i, val)| {
if *val == 1 {
old_incomplete_slots.insert(first_slot + i as u64);
}
})
}

old_incomplete_slots
}

pub fn push_epoch_slots(
&mut self,
id: Pubkey,
root: Slot,
min: Slot,
slots: BTreeSet<Slot>,
incomplete_slots: &BTreeSet<Slot>,
) {
let (first_missing_slot, compressed_map) =
Self::compress_incomplete_slots(incomplete_slots);
let now = timestamp();
let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)),
CrdsData::EpochSlots(EpochSlots::new(
id,
root,
min,
slots,
first_missing_slot,
compressed_map,
now,
)),
&self.keypair,
);
self.gossip
Expand Down Expand Up @@ -2157,6 +2223,8 @@ mod tests {
root: 0,
lowest: 0,
slots: btree_slots,
first_missing: 0,
stash: vec![],
wallclock: 0,
}));
test_split_messages(value);
Expand All @@ -2174,6 +2242,8 @@ mod tests {
root: 0,
lowest: 0,
slots: BTreeSet::new(),
first_missing: 0,
stash: vec![],
wallclock: 0,
}));

Expand All @@ -2192,6 +2262,8 @@ mod tests {
root: 0,
lowest: 0,
slots,
first_missing: 0,
stash: vec![],
wallclock: 0,
});
i += 1;
Expand Down Expand Up @@ -2338,6 +2410,8 @@ mod tests {
peer_root,
peer_lowest,
BTreeSet::new(),
0,
vec![],
timestamp(),
)));
let _ = cluster_info.gossip.crds.insert(value, timestamp());
Expand Down Expand Up @@ -2399,4 +2473,32 @@ mod tests {
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
PACKET_DATA_SIZE - (protocol_size - filter_size)
}

#[test]
fn test_compress_incomplete_slots() {
let mut incomplete_slots: BTreeSet<Slot> = BTreeSet::new();

assert_eq!(
(0, vec![]),
ClusterInfo::compress_incomplete_slots(&incomplete_slots)
);

incomplete_slots.insert(100);
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, first);
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
assert_eq!(incomplete_slots, decompressed);

incomplete_slots.insert(104);
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, first);
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
assert_eq!(incomplete_slots, decompressed);

incomplete_slots.insert(80);
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, first);
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
assert_eq!(incomplete_slots, decompressed);
}
}
10 changes: 10 additions & 0 deletions core/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct EpochSlots {
pub root: Slot,
pub lowest: Slot,
pub slots: BTreeSet<Slot>,
pub first_missing: Slot,
pub stash: Vec<u8>,
pub wallclock: u64,
}

Expand All @@ -76,13 +78,17 @@ impl EpochSlots {
root: Slot,
lowest: Slot,
slots: BTreeSet<Slot>,
first_missing: Slot,
stash: Vec<u8>,
wallclock: u64,
) -> Self {
Self {
from,
root,
lowest,
slots,
first_missing,
stash,
wallclock,
}
}
Expand Down Expand Up @@ -283,6 +289,8 @@ mod test {
0,
BTreeSet::new(),
0,
vec![],
0,
)));
assert_eq!(v.wallclock(), 0);
let key = v.clone().epoch_slots().unwrap().from;
Expand All @@ -309,6 +317,8 @@ mod test {
0,
0,
btreeset,
0,
vec![],
timestamp(),
)));
verify_signatures(&mut v, &keypair, &wrong_keypair);
Expand Down
Loading

0 comments on commit cb9d183

Please sign in to comment.