Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add confirmed slots to EpochSlots #10246

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 143 additions & 19 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ impl ClusterInfo {
.crds
.lookup(&entry)
.and_then(CrdsValue::epoch_slots)
.cloned()
.map(|(_, e)| e.clone())
.unwrap_or_else(|| EpochSlots::new(self.id(), timestamp()))
}

Expand Down Expand Up @@ -595,9 +595,16 @@ impl ClusterInfo {
}
}

pub fn push_epoch_slots(&self, update: &[Slot]) {
// [start_index, start_index + num_epoch_slots) is the range of EpochSlots indexes
// that can be modified
pub fn push_epoch_slots(&self, update: &[Slot], start_index: u8, num_epoch_slots: u8) {
assert!(
num_epoch_slots > 0
&& start_index as u16 + num_epoch_slots as u16 - 1
<= crds_value::MAX_EPOCH_SLOTS as u16
);
let mut num = 0;
let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS)
let mut current_slots: Vec<_> = (start_index..start_index + (num_epoch_slots - 1))
.filter_map(|ix| {
Some((
self.time_gossip_read_lock(
Expand All @@ -607,7 +614,7 @@ impl ClusterInfo {
.crds
.lookup(&CrdsValueLabel::EpochSlots(ix, self.id()))
.and_then(CrdsValue::epoch_slots)
.and_then(|x| Some((x.wallclock, x.first_slot()?)))?,
.and_then(|(_, x)| Some((x.wallclock, x.first_slot()?)))?,
ix,
))
})
Expand All @@ -620,10 +627,10 @@ impl ClusterInfo {
.unwrap_or(0);
let max_slot: Slot = update.iter().max().cloned().unwrap_or(0);
let total_slots = max_slot as isize - min_slot as isize;
let ratio = num_epoch_slots as f32 / crds_value::MAX_EPOCH_SLOTS as f32;
let num_expected_slots = (ratio * DEFAULT_SLOTS_PER_EPOCH as f32) as isize;
// WARN if CRDS is not storing at least a full epoch worth of slots
if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
&& crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
{
if num_expected_slots > total_slots && num_epoch_slots as usize <= current_slots.len() {
inc_new_counter_warn!("cluster_info-epoch_slots-filled", 1);
warn!(
"EPOCH_SLOTS are filling up FAST {}/{}",
Expand All @@ -632,9 +639,12 @@ impl ClusterInfo {
);
}
let mut reset = false;
let mut epoch_slot_index = current_slots.last().map(|(_, x)| *x).unwrap_or(0);
let mut epoch_slot_index = current_slots
.last()
.map(|(_, x)| *x - start_index)
.unwrap_or(0);
while num < update.len() {
let ix = (epoch_slot_index % crds_value::MAX_EPOCH_SLOTS) as u8;
let ix = (epoch_slot_index % num_epoch_slots) as u8 + start_index;
let now = timestamp();
let mut slots = if !reset {
self.lookup_epoch_slots(ix)
Expand Down Expand Up @@ -810,7 +820,10 @@ impl ClusterInfo {
.map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp))
}

pub fn get_epoch_slots_since(&self, since: Option<u64>) -> (Vec<EpochSlots>, Option<u64>) {
pub fn get_epoch_slots_since(
&self,
since: Option<u64>,
) -> (Vec<(EpochSlotsIndex, EpochSlots)>, Option<u64>) {
let vals: Vec<_> = self
.gossip
.read()
Expand All @@ -823,7 +836,12 @@ impl ClusterInfo {
.map(|since| x.insert_timestamp > since)
.unwrap_or(true)
})
.filter_map(|x| Some((x.value.epoch_slots()?.clone(), x.insert_timestamp)))
.filter_map(|x| {
Some((
x.value.epoch_slots().map(|(i, e)| (i, e.clone()))?,
x.insert_timestamp,
))
})
.collect();
let max = vals.iter().map(|x| x.1).max().or(since);
let vec = vals.into_iter().map(|x| x.0).collect();
Expand Down Expand Up @@ -2271,6 +2289,7 @@ pub fn stake_weight_peers<S: std::hash::BuildHasher>(
mod tests {
use super::*;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
use rand::Rng;
use rayon::prelude::*;
use solana_perf::test_tx::test_tx;
use solana_sdk::signature::{Keypair, Signer};
Expand Down Expand Up @@ -2650,7 +2669,7 @@ mod tests {
let (slots, since) = cluster_info.get_epoch_slots_since(None);
assert!(slots.is_empty());
assert!(since.is_none());
cluster_info.push_epoch_slots(&[0]);
cluster_info.push_epoch_slots(&[0], 0, crds_value::MAX_COMPLETED_EPOCH_SLOTS);

let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX));
assert!(slots.is_empty());
Expand Down Expand Up @@ -2963,7 +2982,6 @@ mod tests {

#[test]
fn test_push_epoch_slots_large() {
use rand::Rng;
let node_keypair = Arc::new(Keypair::new());
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
Expand All @@ -2975,12 +2993,84 @@ mod tests {
let last = *range.last().unwrap_or(&0);
range.push(last + rand::thread_rng().gen_range(1, 32));
}
cluster_info.push_epoch_slots(&range[..16000]);
cluster_info.push_epoch_slots(&range[16000..]);
let (slots, since) = cluster_info.get_epoch_slots_since(None);
let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
assert_eq!(slots, range);
assert!(since.is_some());
let (_, update_slots, _) =
get_modified_epoch_slots(&cluster_info, &range, 0, crds_value::MAX_EPOCH_SLOTS, None);
assert_eq!(update_slots, range);
}

#[test]
fn test_push_epoch_slots_range() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair.clone(),
);
let mut range: Vec<Slot> = vec![];
//random should be hard to compress
for _ in 0..32000 {
let last = *range.last().unwrap_or(&0);
range.push(last + rand::thread_rng().gen_range(1, 32));
}
let (epoch_slots, update_slots, since) =
get_modified_epoch_slots(&cluster_info, &range, 0, crds_value::MAX_EPOCH_SLOTS, None);

// Integrity checks
assert_eq!(update_slots, range);
let num_modified_epoch_slots = epoch_slots.len();
assert!(num_modified_epoch_slots <= crds_value::MAX_EPOCH_SLOTS as usize);
assert!(num_modified_epoch_slots > 20);

// Test with exactly as many EpochSlots as needed
let (_, update_slots, since) = get_modified_epoch_slots(
&cluster_info,
&range,
crds_value::MAX_COMPLETED_EPOCH_SLOTS,
num_modified_epoch_slots as u8,
since,
);
assert_eq!(update_slots, range);

// Test with fewer epoch slots than needed, modified EpochSlots should wrap around
let _ = get_modified_epoch_slots(
&cluster_info,
&range,
crds_value::MAX_COMPLETED_EPOCH_SLOTS,
num_modified_epoch_slots as u8 - 10,
since,
);

// Test that with multiple writes that cause modified EpochSlots to wrap around,
// correctness is still upheld
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
);
let mut since = None;
let num_iterations = 2
* ((crds_value::MAX_EPOCH_SLOTS - crds_value::MAX_COMPLETED_EPOCH_SLOTS + 1) as usize)
/ num_modified_epoch_slots;
for _ in 0..=num_iterations {
let (_, mut update_slots, new_since) = get_modified_epoch_slots(
&cluster_info,
&range,
crds_value::MAX_COMPLETED_EPOCH_SLOTS,
crds_value::MAX_EPOCH_SLOTS - crds_value::MAX_COMPLETED_EPOCH_SLOTS + 1,
since,
);
since = new_since;
let first = range[0];
// Find last instance of `first`, that must be where the write started
let begin_index = update_slots.len()
- update_slots
.iter()
.rev()
.position(|slot| *slot == first)
.unwrap()
- 1;
update_slots.rotate_left(begin_index);
update_slots.truncate(range.len());
assert_eq!(update_slots, range);
}
}

#[test]
Expand Down Expand Up @@ -3009,4 +3099,38 @@ mod tests {
let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new());
assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE);
}

fn get_modified_epoch_slots(
cluster_info: &ClusterInfo,
slots: &[Slot],
start_epoch_slot_index: u8,
num_epoch_slots: u8,
since: Option<u64>,
) -> (Vec<(EpochSlotsIndex, EpochSlots)>, Vec<Slot>, Option<u64>) {
cluster_info.push_epoch_slots(
&slots[0..(slots.len() / 2)],
start_epoch_slot_index,
num_epoch_slots,
);
cluster_info.push_epoch_slots(
&slots[(slots.len() / 2)..],
start_epoch_slot_index,
num_epoch_slots,
);
let (epoch_slots, new_since) = cluster_info.get_epoch_slots_since(since);
let update_slots: Vec<_> = epoch_slots
.iter()
.flat_map(|(i, x)| {
assert!(
*i >= start_epoch_slot_index
&& *i <= start_epoch_slot_index + (num_epoch_slots - 1)
);
let res = x.to_slots(0);
res
})
.collect();
assert!(new_since.is_some());
assert!(epoch_slots.len() <= num_epoch_slots as usize);
(epoch_slots, update_slots, new_since)
}
}
Loading