Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Bitwise compress incomplete epoch slots (bp #8341) #8344

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::crds_value::EpochIncompleteSlots;
use crate::packet::limited_deserialize;
use crate::streamer::{PacketReceiver, PacketSender};
use crate::{
Expand Down Expand Up @@ -68,6 +69,8 @@ const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HE
/// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;

const NUM_BITS_PER_BYTE: u64 = 8;

#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
NoPeers,
Expand Down Expand Up @@ -307,10 +310,91 @@ impl ClusterInfo {
)
}

<<<<<<< HEAD
pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet<Slot>) {
let now = timestamp();
let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)),
=======
pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet<Slot>) -> EpochIncompleteSlots {
if !incomplete_slots.is_empty() {
let first_slot = incomplete_slots
.iter()
.next()
.expect("expected to find at least one slot");
let last_slot = incomplete_slots
.iter()
.next_back()
.expect("expected to find last slot");
let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1;
let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 {
1
} else {
0
} + num_uncompressed_bits / NUM_BITS_PER_BYTE;
let mut uncompressed = vec![0u8; num_uncompressed_bytes as usize];
incomplete_slots.iter().for_each(|slot| {
let offset_from_first_slot = slot.saturating_sub(*first_slot);
let index = offset_from_first_slot / NUM_BITS_PER_BYTE;
let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE;
uncompressed[index as usize] |= 1 << bit_index;
});
if let Ok(compressed) = uncompressed
.iter()
.cloned()
.encode(&mut GZipEncoder::new(), Action::Finish)
.collect::<std::result::Result<Vec<u8>, _>>()
{
return EpochIncompleteSlots {
first: *first_slot,
compressed_list: compressed,
};
}
}
EpochIncompleteSlots::default()
}

pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet<Slot> {
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();

if let Ok(decompressed) = slots
.compressed_list
.iter()
.cloned()
.decode(&mut GZipDecoder::new())
.collect::<std::result::Result<Vec<u8>, _>>()
{
decompressed.iter().enumerate().for_each(|(i, val)| {
if *val != 0 {
(0..8).for_each(|bit_index| {
if (1 << bit_index & *val) != 0 {
let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index;
old_incomplete_slots.insert(slot as u64);
}
})
}
})
}

old_incomplete_slots
}

pub fn push_epoch_slots(
&mut self,
id: Pubkey,
root: Slot,
min: Slot,
slots: BTreeSet<Slot>,
incomplete_slots: &BTreeSet<Slot>,
) {
let compressed = Self::compress_incomplete_slots(incomplete_slots);
let now = timestamp();
let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(
0,
EpochSlots::new(id, root, min, slots, vec![compressed], now),
),
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
&self.keypair,
);
self.gossip
Expand Down Expand Up @@ -2120,13 +2204,27 @@ mod tests {
for i in 0..128 {
btree_slots.insert(i);
}
<<<<<<< HEAD
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: btree_slots,
wallclock: 0,
}));
=======
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: btree_slots,
stash: vec![],
wallclock: 0,
},
));
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
test_split_messages(value);
}

Expand All @@ -2137,13 +2235,27 @@ mod tests {
let payload: Vec<CrdsValue> = vec![];
let vec_size = serialized_size(&payload).unwrap();
let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size;
<<<<<<< HEAD
let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: BTreeSet::new(),
wallclock: 0,
}));
=======
let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots: BTreeSet::new(),
stash: vec![],
wallclock: 0,
},
));
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)

let mut i = 0;
while value.size() <= desired_size {
Expand All @@ -2155,13 +2267,27 @@ mod tests {
desired_size
);
}
<<<<<<< HEAD
value.data = CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots,
wallclock: 0,
});
=======
value.data = CrdsData::EpochSlots(
0,
EpochSlots {
from: Pubkey::default(),
root: 0,
lowest: 0,
slots,
stash: vec![],
wallclock: 0,
},
);
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
i += 1;
}
let split = ClusterInfo::split_gossip_messages(vec![value.clone()]);
Expand Down Expand Up @@ -2301,13 +2427,27 @@ mod tests {
let other_node_pubkey = Pubkey::new_rand();
let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
cluster_info.insert_info(other_node.clone());
<<<<<<< HEAD
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
other_node_pubkey,
peer_root,
peer_lowest,
BTreeSet::new(),
timestamp(),
)));
=======
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots::new(
other_node_pubkey,
peer_root,
peer_lowest,
BTreeSet::new(),
vec![],
timestamp(),
),
));
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
let _ = cluster_info.gossip.crds.insert(value, timestamp());
}
// only half the visible peers should be eligible to serve this repair
Expand Down Expand Up @@ -2367,4 +2507,35 @@ mod tests {
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
PACKET_DATA_SIZE - (protocol_size - filter_size)
}
<<<<<<< HEAD
=======

#[test]
fn test_compress_incomplete_slots() {
let mut incomplete_slots: BTreeSet<Slot> = BTreeSet::new();

assert_eq!(
EpochIncompleteSlots::default(),
ClusterInfo::compress_incomplete_slots(&incomplete_slots)
);

incomplete_slots.insert(100);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);

incomplete_slots.insert(104);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(100, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);

incomplete_slots.insert(80);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
}
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
}
46 changes: 38 additions & 8 deletions core/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::{
pub type VoteIndex = u8;
pub const MAX_VOTES: VoteIndex = 32;

pub type EpochSlotIndex = u8;

/// CrdsValue that is replicated across the cluster
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct CrdsValue {
Expand Down Expand Up @@ -58,7 +60,13 @@ impl Signable for CrdsValue {
pub enum CrdsData {
ContactInfo(ContactInfo),
Vote(VoteIndex, Vote),
EpochSlots(EpochSlots),
EpochSlots(EpochSlotIndex, EpochSlots),
}

#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
pub struct EpochIncompleteSlots {
pub first: Slot,
pub compressed_list: Vec<u8>,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
Expand All @@ -67,6 +75,10 @@ pub struct EpochSlots {
pub root: Slot,
pub lowest: Slot,
pub slots: BTreeSet<Slot>,
<<<<<<< HEAD
=======
pub stash: Vec<EpochIncompleteSlots>,
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
pub wallclock: u64,
}

Expand All @@ -76,13 +88,21 @@ impl EpochSlots {
root: Slot,
lowest: Slot,
slots: BTreeSet<Slot>,
<<<<<<< HEAD
=======
stash: Vec<EpochIncompleteSlots>,
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
wallclock: u64,
) -> Self {
Self {
from,
root,
lowest,
slots,
<<<<<<< HEAD
=======
stash,
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
wallclock,
}
}
Expand Down Expand Up @@ -154,21 +174,21 @@ impl CrdsValue {
match &self.data {
CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
CrdsData::Vote(_, vote) => vote.wallclock,
CrdsData::EpochSlots(vote) => vote.wallclock,
CrdsData::EpochSlots(_, vote) => vote.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
match &self.data {
CrdsData::ContactInfo(contact_info) => contact_info.id,
CrdsData::Vote(_, vote) => vote.from,
CrdsData::EpochSlots(slots) => slots.from,
CrdsData::EpochSlots(_, slots) => slots.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
match &self.data {
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()),
CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()),
CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&ContactInfo> {
Expand All @@ -193,7 +213,7 @@ impl CrdsValue {

pub fn epoch_slots(&self) -> Option<&EpochSlots> {
match &self.data {
CrdsData::EpochSlots(slots) => Some(slots),
CrdsData::EpochSlots(_, slots) => Some(slots),
_ => None,
}
}
Expand Down Expand Up @@ -277,13 +297,16 @@ mod test {
let key = v.clone().vote().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::Vote(0, key));

let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
Pubkey::default(),
0,
let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
<<<<<<< HEAD
BTreeSet::new(),
0,
)));
=======
EpochSlots::new(Pubkey::default(), 0, 0, BTreeSet::new(), vec![], 0),
));
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
assert_eq!(v.wallclock(), 0);
let key = v.clone().epoch_slots().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key));
Expand All @@ -304,13 +327,20 @@ mod test {
));
verify_signatures(&mut v, &keypair, &wrong_keypair);
let btreeset: BTreeSet<Slot> = vec![1, 2, 3, 6, 8].into_iter().collect();
<<<<<<< HEAD
v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
keypair.pubkey(),
0,
0,
btreeset,
timestamp(),
)));
=======
v = CrdsValue::new_unsigned(CrdsData::EpochSlots(
0,
EpochSlots::new(keypair.pubkey(), 0, 0, btreeset, vec![], timestamp()),
));
>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341)
verify_signatures(&mut v, &keypair, &wrong_keypair);
}

Expand Down