Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RestartLastVotedForkSlots for wen_restart. #33239

Merged
merged 20 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ce81bbc
Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart.
wen-coding Sep 13, 2023
0edf40e
Fix linter errors.
wen-coding Sep 13, 2023
1ec981b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
a3749b2
Revert RestartHeaviestFork, it will be added in another PR.
wen-coding Sep 13, 2023
79bedf8
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
719534d
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 14, 2023
25e0117
Update frozen abi message.
wen-coding Sep 15, 2023
1fcc81d
Fix wrong number in test generation, change to pub(crate) to limit sc…
wen-coding Sep 15, 2023
b7077a7
Separate push_epoch_slots and push_restart_last_voted_fork_slots.
wen-coding Sep 15, 2023
354673a
Add RestartLastVotedForkSlots data structure.
wen-coding Sep 17, 2023
0028f64
Remove unused parts to make PR smaller.
wen-coding Sep 18, 2023
46d2054
Remove unused clone.
wen-coding Sep 18, 2023
cc11d42
Use CompressedSlotsVec to share code between EpochSlots and RestartLa…
wen-coding Sep 18, 2023
b0cead0
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 18, 2023
5b7a724
Add total_messages to show how many messages are there.
wen-coding Oct 2, 2023
4821d8a
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 2, 2023
ae8d01d
Reduce RestartLastVotedForkSlots to one packet (16k slots).
wen-coding Oct 2, 2023
1ee2699
Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
wen-coding Oct 6, 2023
78a9ded
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
866228b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -398,7 +398,8 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::AccountsHashes(_) => true,
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _) => {
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartLastVotedForkSlots(_) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
}
Expand Down Expand Up @@ -4078,7 +4079,7 @@ mod tests {
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone())
.collect();
let self_pubkey = solana_sdk::pubkey::new_rand();
assert!(splits.len() * 3 < NUM_CRDS_VALUES);
assert!(splits.len() * 2 < NUM_CRDS_VALUES);
// Assert that all messages are included in the splits.
assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::<usize>());
splits
Expand Down
20 changes: 20 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,16 @@ pub(crate) fn submit_gossip_stats(
("SnapshotHashes-pull", crds_stats.pull.counts[10], i64),
("ContactInfo-push", crds_stats.push.counts[11], i64),
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
(
"RestartLastVotedForkSlots-push",
crds_stats.push.counts[12],
i64
),
(
"RestartLastVotedForkSlots-pull",
crds_stats.pull.counts[12],
i64
),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -664,6 +674,16 @@ pub(crate) fn submit_gossip_stats(
("SnapshotHashes-pull", crds_stats.pull.fails[10], i64),
("ContactInfo-push", crds_stats.push.fails[11], i64),
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
(
"RestartLastVotedForkSlots-push",
crds_stats.push.fails[12],
i64
),
(
"RestartLastVotedForkSlots-pull",
crds_stats.pull.fails[12],
i64
),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 12];
type CrdsCountsArray = [usize; 13];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -721,6 +721,7 @@ impl CrdsDataStats {
CrdsData::DuplicateShred(_, _) => 9,
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_) => 12,
// Update CrdsCountsArray if new items are added here.
}
}
Expand Down
156 changes: 153 additions & 3 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use {
crate::{
cluster_info::MAX_LEGACY_SNAPSHOT_HASHES,
cluster_info::{MAX_CRDS_OBJECT_SIZE, MAX_LEGACY_SNAPSHOT_HASHES},
contact_info::ContactInfo,
deprecated,
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY},
legacy_contact_info::LegacyContactInfo,
},
bincode::{serialize, serialized_size},
Expand Down Expand Up @@ -94,6 +94,7 @@ pub enum CrdsData {
DuplicateShred(DuplicateShredIndex, DuplicateShred),
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
}

impl Sanitize for CrdsData {
Expand Down Expand Up @@ -132,6 +133,7 @@ impl Sanitize for CrdsData {
}
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
}
}
}
Expand All @@ -145,7 +147,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..7);
let kind = rng.gen_range(0..8);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -157,6 +159,9 @@ impl CrdsData {
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -485,6 +490,87 @@ impl Sanitize for NodeInstance {
}
}

#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartLastVotedForkSlots {
pub from: Pubkey,
pub wallclock: u64,
pub slots: Vec<CompressedSlots>,
pub last_voted_hash: Hash,
pub shred_version: u16,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add shred_version here too.
It is only 2 bytes, and we need to filter on matching shred_version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
if self.slots.is_empty() {
return Err(SanitizeError::InvalidValue);
}
self.slots.sanitize()?;
self.last_voted_hash.sanitize()
}
}

impl RestartLastVotedForkSlots {
pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self {
Self {
from,
wallclock: now,
slots: Vec::new(),
last_voted_hash,
shred_version,
}
}

/// New random Version for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let mut result =
RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1);
let num_slots = rng.gen_range(2..20);
let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
.take(num_slots)
.collect::<Vec<Slot>>();
slots.sort();
result.fill(&slots);
result
}

pub fn fill(&mut self, slots: &[Slot]) -> usize {
let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..];
let mut num = 0;
let space = self.max_compressed_slot_size();
if space == 0 {
return 0;
}
while num < slots.len() {
let mut cslot = CompressedSlots::new(space as usize);
num += cslot.add(&slots[num..]);
self.slots.push(cslot);
}
num
}

pub fn deflate(&mut self) {
for s in self.slots.iter_mut() {
let _ = s.deflate();
}
}

pub fn max_compressed_slot_size(&self) -> isize {
let len_header = serialized_size(self).unwrap();
let len_slot = serialized_size(&CompressedSlots::default()).unwrap();
MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize
}

pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
self.slots
.iter()
.filter(|s| min_slot < s.first_slot() + s.num_slots() as u64)
.filter_map(|s| s.to_slots(min_slot).ok())
.flatten()
.collect()
}
}

/// Type of the replicated value
/// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
Expand All @@ -501,6 +587,7 @@ pub enum CrdsValueLabel {
DuplicateShred(DuplicateShredIndex, Pubkey),
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(Pubkey),
}

impl fmt::Display for CrdsValueLabel {
Expand All @@ -524,6 +611,9 @@ impl fmt::Display for CrdsValueLabel {
write!(f, "SnapshotHashes({})", self.pubkey())
}
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
}
}
}
}
Expand All @@ -543,6 +633,7 @@ impl CrdsValueLabel {
CrdsValueLabel::DuplicateShred(_, p) => *p,
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
}
}
}
Expand Down Expand Up @@ -593,6 +684,7 @@ impl CrdsValue {
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
Expand All @@ -609,6 +701,7 @@ impl CrdsValue {
CrdsData::DuplicateShred(_, shred) => shred.from,
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
Expand All @@ -627,6 +720,9 @@ impl CrdsValue {
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()),
CrdsData::RestartLastVotedForkSlots(_) => {
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
}
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down Expand Up @@ -1073,4 +1169,58 @@ mod test {
assert!(node.should_force_push(&pubkey));
assert!(!node.should_force_push(&Pubkey::new_unique()));
}

#[test]
fn test_restart_last_voted_fork_slots() {
let keypair = Keypair::new();
let slot = 53;
let slot_parent = slot - 5;
let shred_version = 21;
let mut slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
Hash::default(),
shred_version,
);
let original_slots_vec = [slot_parent, slot];
slots.fill(&original_slots_vec);
let value =
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
assert_eq!(value.sanitize(), Ok(()));
let label = value.label();
assert_eq!(
label,
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
);
assert_eq!(label.pubkey(), keypair.pubkey());
assert_eq!(value.wallclock(), slots.wallclock);
let retrived_slots = slots.to_slots(0);
assert_eq!(retrived_slots.len(), 2);
assert_eq!(retrived_slots[0], slot_parent);
assert_eq!(retrived_slots[1], slot);

let empty_slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
Hash::default(),
shred_version,
);
let bad_value =
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair);
assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue));

let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap();
let mut large_slots = RestartLastVotedForkSlots::new(
keypair.pubkey(),
timestamp(),
Hash::default(),
shred_version,
);
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
large_slots.fill(&large_slots_vec);
let retrived_slots = large_slots.to_slots(0);
assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY);
assert_eq!(retrived_slots.first(), Some(&11));
assert_eq!(retrived_slots.last(), Some(&last_slot));
}
}
4 changes: 2 additions & 2 deletions gossip/src/epoch_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
},
};

const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8;
pub(crate) const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)]
pub struct Uncompressed {
pub first_slot: Slot,
Expand Down Expand Up @@ -178,7 +178,7 @@ impl Default for CompressedSlots {
}

impl CompressedSlots {
fn new(max_size: usize) -> Self {
pub(crate) fn new(max_size: usize) -> Self {
CompressedSlots::Uncompressed(Uncompressed::new(max_size))
}

Expand Down