
Add RestartLastVotedForkSlots for wen_restart. #33239

Merged: 20 commits, Oct 9, 2023
Changes from 14 commits

Commits
ce81bbc
Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart.
wen-coding Sep 13, 2023
0edf40e
Fix linter errors.
wen-coding Sep 13, 2023
1ec981b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
a3749b2
Revert RestartHeaviestFork, it will be added in another PR.
wen-coding Sep 13, 2023
79bedf8
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 13, 2023
719534d
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 14, 2023
25e0117
Update frozen abi message.
wen-coding Sep 15, 2023
1fcc81d
Fix wrong number in test generation, change to pub(crate) to limit sc…
wen-coding Sep 15, 2023
b7077a7
Separate push_epoch_slots and push_restart_last_voted_fork_slots.
wen-coding Sep 15, 2023
354673a
Add RestartLastVotedForkSlots data structure.
wen-coding Sep 17, 2023
0028f64
Remove unused parts to make PR smaller.
wen-coding Sep 18, 2023
46d2054
Remove unused clone.
wen-coding Sep 18, 2023
cc11d42
Use CompressedSlotsVec to share code between EpochSlots and RestartLa…
wen-coding Sep 18, 2023
b0cead0
Merge branch 'master' into wen_restart_gossip_change
wen-coding Sep 18, 2023
5b7a724
Add total_messages to show how many messages are there.
wen-coding Oct 2, 2023
4821d8a
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 2, 2023
ae8d01d
Reduce RestartLastVotedForkSlots to one packet (16k slots).
wen-coding Oct 2, 2023
1ee2699
Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
wen-coding Oct 6, 2023
78a9ded
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
866228b
Merge branch 'master' into wen_restart_gossip_change
wen-coding Oct 9, 2023
5 changes: 3 additions & 2 deletions gossip/src/cluster_info.rs
@@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
#[frozen_abi(digest = "DxPH4Cj8QHbGb8daVf8uX3HckPkB9XqKE2AHtQzYYZLr")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
@@ -397,7 +397,8 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::AccountsHashes(_) => true,
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _) => {
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartLastVotedForkSlots(_, _) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
}
20 changes: 20 additions & 0 deletions gossip/src/cluster_info_metrics.rs
@@ -627,6 +627,16 @@ pub(crate) fn submit_gossip_stats(
("SnapshotHashes-pull", crds_stats.pull.counts[10], i64),
("ContactInfo-push", crds_stats.push.counts[11], i64),
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
(
"RestartLastVotedForkSlots-push",
crds_stats.push.counts[12],
i64
),
(
"RestartLastVotedForkSlots-pull",
crds_stats.pull.counts[12],
i64
),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
@@ -664,6 +674,16 @@ pub(crate) fn submit_gossip_stats(
("SnapshotHashes-pull", crds_stats.pull.fails[10], i64),
("ContactInfo-push", crds_stats.push.fails[11], i64),
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
(
"RestartLastVotedForkSlots-push",
crds_stats.push.fails[12],
i64
),
(
"RestartLastVotedForkSlots-pull",
crds_stats.pull.fails[12],
i64
),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
3 changes: 2 additions & 1 deletion gossip/src/crds.rs
@@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 12];
type CrdsCountsArray = [usize; 13];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
@@ -721,6 +721,7 @@ impl CrdsDataStats {
CrdsData::DuplicateShred(_, _) => 9,
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_, _) => 12,
// Update CrdsCountsArray if new items are added here.
}
}
105 changes: 103 additions & 2 deletions gossip/src/crds_value.rs
@@ -4,7 +4,7 @@ use {
contact_info::ContactInfo,
deprecated,
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
epoch_slots::{CompressedSlotsVec, EpochSlots},
legacy_contact_info::LegacyContactInfo,
},
bincode::{serialize, serialized_size},
@@ -38,6 +38,8 @@ pub const MAX_VOTES: VoteIndex = 32;

pub type EpochSlotsIndex = u8;
pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255;
// We now keep 81000 slots, 81000/MAX_SLOTS_PER_ENTRY = 5.
Contributor:

Where is this 81000 coming from?
These CRDS values aren't really cheap. Do we really need this many?

Contributor Author:

The number comes from solana-foundation/solana-improvement-documents#46.

During an outage we don't know each other's status, so there is no way to tell how far behind others are. So we send out 9 hours' worth of slots, on the assumption that an outage will be noticed and acted on within 9 hours.

The values are big, but we thought that was okay because these messages are only transmitted during an outage. Later, after I add the --wen_restart command-line flag, I can come back and filter out these CRDS values in non-restart mode if that makes you feel safer.
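
For reference, a back-of-the-envelope check (not from the PR) of where the 81,000 figure comes from, assuming the ~400 ms target slot time:

// Illustrative only: 9 hours of slots at a ~400 ms target slot time.
const SLOT_TIME_MS: u64 = 400;
const OUTAGE_WINDOW_HOURS: u64 = 9;

fn slots_in_window() -> u64 {
    OUTAGE_WINDOW_HOURS * 60 * 60 * 1_000 / SLOT_TIME_MS
}

#[test]
fn window_covers_81k_slots() {
    assert_eq!(slots_in_window(), 81_000); // matches the number discussed above
}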

Contributor:

81_000 slots is ~5MB of data per node in the cluster!!!

Also, I am not sure that a node which is 81_000 slots behind the other nodes can actually repair all those slots, so what is the point?
I think we really need to revise this number lower.

Contributor Author:

As I understand it, 9 hours was chosen because past outage resolutions have normally taken 4-7 hours just for the validator restart part, and we add a few hours to be on the cautious side. Not every validator operator has a pager, but most of them will tend to things within 9 hours.

Sending 81k slots in the Gossip messages doesn't mean you need to repair all those slots. Most of them are probably older than your local root, so you will instantly dump them on the floor.

The tricky part here is that you don't know what the others' status is, so you really want to send more than they probably need in order to make the restart successful.

Contributor (@behzadnouri, Sep 19, 2023):

Sending 81k slots in the Gossip messages doesn't mean you need to repair all those slots. Most of them are probably older than your local root, so you will instantly dump them on the floor.

There are 2 different scenarios:

  • If a node has some of the intermediate slots, then it can already repair missing slots and complete the chain. No need to send 81k slots to it.
  • If a node does not have any of the intermediate slots, do we really expect it to be able to repair 81k slots?

Even before all of that, why do we need to send the parent slots to begin with?
We are already able to repair a fork when we are missing some of its parent slots by sending a repair orphan request:
https://github.com/solana-labs/solana/blob/bc2b37276/core/src/repair/serve_repair.rs#L240-L243
Why do we need to duplicate that protocol here? The node can already recover the whole fork by repairing off the last slot.

Contributor Author:

By the way, I think 81k slots is well under 5MB, because the slots are encoded as a bitmap. I assume the current MAX_SLOTS_PER_ENTRY is chosen so that each EpochSlots piece fits within a 1460-byte UDP packet, and MAX_RESTART_LAST_VOTED_FORK_SLOTS is 5, which makes it roughly 7KB per validator. Even if each piece were a 64KB UDP packet, that would still be smaller than 5MB per validator.

How much memory do the current EpochSlots messages occupy? Considering MAX_EPOCH_SLOTS is 255, this should be much smaller than that.

Is my calculation off?

Also, if the whole cluster is stuck, I think operators will be okay with consuming some bandwidth to quickly bring the cluster back up. If sending 81k slot information means they actually don't need to repair all the slots, maybe we would even save more on the repair bandwidth.

I think the "repair a batch at a time backwards" method you mentioned sounds interesting, but you never know what random validators might throw at you, so a few validators throwing slots far in the future at you could keep repair busy for quite some time.
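
As a sketch of the size argument above (assuming, as the comment does, that each CRDS entry is capped at roughly a 1460-byte packet; the constants below are illustrative, not the crate's):

// Illustrative estimate: 5 packet-sized CRDS entries per validator is on the
// order of kilobytes, not megabytes.
const ASSUMED_PACKET_BYTES: usize = 1_460; // assumed MTU-sized gossip packet
const ENTRIES_PER_VALIDATOR: usize = 5; // MAX_RESTART_LAST_VOTED_FORK_SLOTS

fn bytes_per_validator() -> usize {
    ASSUMED_PACKET_BYTES * ENTRIES_PER_VALIDATOR
}

#[test]
fn per_validator_payload_is_kilobytes() {
    assert_eq!(bytes_per_validator(), 7_300); // ~7 KB, far below 5 MB
}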

Contributor:

@behzad assume the worst: turbine / repair is congested (similar to the last outage). This protocol will always find the OC restart slot, whereas picking it manually is prone to human error.

Contributor:

@AshwinSekar gossip has a lot more load than turbine and repair combined. I don't think we can solve congestion on one protocol by adding more load to the one already more overloaded.

@wen-coding When this was initially discussed we were talking about only 2 new crds values. This commit itself is adding 5 new, pretty big values, each of which fills up a packet.
It also has some overlap with the already existing repair orphan request, and I am not convinced this duplication will buy us anything in practice.
The design where you cannot tell whether slots.last() is the parent of last_voted_slot, and where you rely on receiving all the crds values, is also not ideal, because gossip does not provide such guarantees.
Can you revisit the design to address these issues?

Contributor Author:

Having thought about it more, I think you have a point that we probably do not need to send 81k slots in most cases. Reading previous outage reports, validators around the world normally entered the outage at about the same time, so the difference between their roots is usually much smaller than the duration of the outage.

That said, I still want to play it safe and send enough slots to bring the occasional outliers back, so I propose lowering the original 81k number to MAX_SLOTS_PER_ENTRY, which is 16384. This lowers the number of packets per validator from 5 to 1, and the code handling the packet doesn't need to deal with holes in between any more.

@carllin @mvines @t-nelson Are you okay with lowering 81k slots to 16k slots given the reasoning above?

@behzadnouri are you okay with it given we are only adding 1 RestartLastVotedForkSlots per validator now?

Regarding "some overlap with already existing repair orphan request", I think:

  1. If we are in an outage, that's apparently because the repair orphan request didn't save us.
  2. The repair orphan request can figure out the parent relationship between blocks and repair missing blocks on the fork, but it is not a global view, so it can't tell us whether some blocks can be ignored. To quickly reach consensus between validators you need a global view, which the current repair service can't give you; you need some sort of global consensus to make validators agree on something again.

I'd be happy to organize a quick meeting if we can't reach agreement over github reviews.
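
A small sketch of the packet arithmetic behind this proposal (MAX_SLOTS_PER_ENTRY = 16384 is the existing per-entry capacity cited above; the helper itself is illustrative):

// Illustrative: CRDS entries needed per validator for a given slot window.
const MAX_SLOTS_PER_ENTRY: u64 = 16_384;

fn entries_needed(num_slots: u64) -> u64 {
    (num_slots + MAX_SLOTS_PER_ENTRY - 1) / MAX_SLOTS_PER_ENTRY
}

#[test]
fn lowering_to_16k_slots_means_one_entry() {
    assert_eq!(entries_needed(81_000), 5); // original proposal: 5 entries per validator
    assert_eq!(entries_needed(16_384), 1); // revised proposal: a single packet
}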

Contributor:

I think that's OK; with only a single such crds value the parent is also inferred, which should address @behzadnouri's concern.
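
To illustrate the inference (a sketch assuming the decoded list is the sorted set of ancestor slots of the last voted slot on that fork; this helper is hypothetical and not part of the PR):

// Illustrative: with a single RestartLastVotedForkSlots entry the receiver
// gets the whole announced ancestor list at once, so the parent of the last
// voted slot is simply the slot right before it in the decoded list.
fn inferred_parent(sorted_fork_slots: &[u64]) -> Option<u64> {
    let n = sorted_fork_slots.len();
    if n >= 2 {
        Some(sorted_fork_slots[n - 2])
    } else {
        None
    }
}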

pub(crate) const MAX_RESTART_LAST_VOTED_FORK_SLOTS: EpochSlotsIndex = 5;

/// CrdsValue that is replicated across the cluster
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)]
@@ -94,6 +96,7 @@ pub enum CrdsData {
DuplicateShred(DuplicateShredIndex, DuplicateShred),
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(EpochSlotsIndex, RestartLastVotedForkSlots),
}

impl Sanitize for CrdsData {
@@ -132,6 +135,12 @@
}
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(ix, slots) => {
if *ix >= MAX_RESTART_LAST_VOTED_FORK_SLOTS {
return Err(SanitizeError::ValueOutOfBounds);
}
slots.sanitize()
}
}
}
}
@@ -145,7 +154,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..7);
let kind = rng.gen_range(0..8);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
@@ -157,6 +166,10 @@ impl CrdsData {
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
6 => CrdsData::RestartLastVotedForkSlots(
rng.gen_range(0..MAX_RESTART_LAST_VOTED_FORK_SLOTS),
RestartLastVotedForkSlots::new_rand(rng, pubkey),
),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
@@ -485,6 +498,50 @@ impl Sanitize for NodeInstance {
}
}

#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartLastVotedForkSlots {
pub from: Pubkey,
pub wallclock: u64,
pub slots: CompressedSlotsVec,
pub last_voted_slot: Slot,
Contributor:

With this new struct, do we still need last_voted_slot?
Can it be just slots.last()?

Contributor Author:

Removed.

pub last_voted_hash: Hash,
}
Contributor:

I would add shred_version here too.
It is only 2 bytes, and we need to filter on matching shred_version.

Contributor Author:

Done.


impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
self.slots.sanitize()?;
self.last_voted_hash.sanitize()
}
}

impl RestartLastVotedForkSlots {
pub fn new(from: Pubkey, now: u64, last_voted_slot: Slot, last_voted_hash: Hash) -> Self {
Self {
from,
wallclock: now,
slots: CompressedSlotsVec::new(),
last_voted_slot,
last_voted_hash,
}
}

/// New random RestartLastVotedForkSlots for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
Self {
from: pubkey,
wallclock: new_rand_timestamp(rng),
slots: CompressedSlotsVec::new_rand(rng),
last_voted_slot: rng.gen_range(0..512),
last_voted_hash: Hash::new_unique(),
}
}

pub fn fill(&mut self, update: &[Slot]) -> usize {
self.slots.fill(update)
}
}

/// Type of the replicated value
/// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
@@ -501,6 +558,7 @@ pub enum CrdsValueLabel {
DuplicateShred(DuplicateShredIndex, Pubkey),
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(EpochSlotsIndex, Pubkey),
}

impl fmt::Display for CrdsValueLabel {
@@ -524,6 +582,9 @@
write!(f, "SnapshotHashes({})", self.pubkey())
}
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::RestartLastVotedForkSlots(ix, _) => {
write!(f, "RestartLastVotedForkSlots({}, {})", ix, self.pubkey())
}
}
}
}
@@ -543,6 +604,7 @@ impl CrdsValueLabel {
CrdsValueLabel::DuplicateShred(_, p) => *p,
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(_, p) => *p,
}
}
}
@@ -593,6 +655,7 @@ impl CrdsValue {
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(_, slots) => slots.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
@@ -609,6 +672,7 @@ impl CrdsValue {
CrdsData::DuplicateShred(_, shred) => shred.from,
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(_, slots) => slots.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
@@ -627,6 +691,9 @@ impl CrdsValue {
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()),
CrdsData::RestartLastVotedForkSlots(ix, _) => {
CrdsValueLabel::RestartLastVotedForkSlots(*ix, self.pubkey())
}
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down Expand Up @@ -1073,4 +1140,38 @@ mod test {
assert!(node.should_force_push(&pubkey));
assert!(!node.should_force_push(&Pubkey::new_unique()));
}

#[test]
fn test_restart_last_voted_fork_slots() {
let keypair = Keypair::new();
let slot = 53;
let slot_parent = slot - 5;
let mut slots =
RestartLastVotedForkSlots::new(keypair.pubkey(), timestamp(), slot, Hash::default());
let original_slots_vec = [slot_parent, slot];
slots.fill(&original_slots_vec);
let ix = 1;
let value = CrdsValue::new_signed(
CrdsData::RestartLastVotedForkSlots(ix, slots.clone()),
&keypair,
);
assert_eq!(value.sanitize(), Ok(()));
let label = value.label();
assert_eq!(
label,
CrdsValueLabel::RestartLastVotedForkSlots(ix, keypair.pubkey())
);
assert_eq!(label.pubkey(), keypair.pubkey());
assert_eq!(value.wallclock(), slots.wallclock);
let retrived_slots = slots.slots.to_slots(0);
assert_eq!(retrived_slots.len(), 2);
assert_eq!(retrived_slots[0], slot_parent);
assert_eq!(retrived_slots[1], slot);

let bad_value = CrdsValue::new_signed(
CrdsData::RestartLastVotedForkSlots(MAX_RESTART_LAST_VOTED_FORK_SLOTS, slots),
&keypair,
);
assert_eq!(bad_value.sanitize(), Err(SanitizeError::ValueOutOfBounds))
}
}