Add RestartLastVotedForkSlots for wen_restart. #33239
Changes from 6 commits
@@ -397,7 +397,8 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
         CrdsData::AccountsHashes(_) => true,
         CrdsData::LowestSlot(_, _)
         | CrdsData::LegacyVersion(_)
-        | CrdsData::DuplicateShred(_, _) => {
+        | CrdsData::DuplicateShred(_, _)
+        | CrdsData::RestartLastVotedForkSlots(_, _, _, _) => {
             let stake = stakes.get(&value.pubkey()).copied();
             stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
         }

@@ -722,9 +723,8 @@ impl ClusterInfo {
         self.my_contact_info.read().unwrap().shred_version()
     }

-    fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots {
+    fn lookup_epoch_slots(&self, label: CrdsValueLabel) -> EpochSlots {
         let self_pubkey = self.id();
-        let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
         let gossip_crds = self.gossip.crds.read().unwrap();
         gossip_crds
             .get::<&CrdsValue>(&label)

@@ -899,16 +899,42 @@ impl ClusterInfo {
         }
     }

+    pub fn push_epoch_slots(&self, update: &[Slot]) {
+        self.push_epoch_slots_or_restart_last_voted_fork_slots(update, None)
+    }
+
+    pub fn push_restart_last_voted_fork_slots(&self, update: &[Slot], last_vote_bankhash: Hash) {
+        self.push_epoch_slots_or_restart_last_voted_fork_slots(update, Some(last_vote_bankhash))
+    }
+
     // TODO: If two threads call into this function then epoch_slot_index has a
     // race condition and the threads will overwrite each other in crds table.
-    pub fn push_epoch_slots(&self, mut update: &[Slot]) {
+    fn push_epoch_slots_or_restart_last_voted_fork_slots(
+        &self,
+        mut update: &[Slot],
+        last_vote_bankhash: Option<Hash>,
+    ) {
+        let is_epoch_slot = last_vote_bankhash.is_none();
         let self_pubkey = self.id();
+        let create_label = |ix| {
+            if is_epoch_slot {
+                CrdsValueLabel::EpochSlots(ix, self_pubkey)
+            } else {
+                CrdsValueLabel::RestartLastVotedForkSlots(ix, self_pubkey)
+            }
+        };
+        let max_entries = if is_epoch_slot {
+            crds_value::MAX_EPOCH_SLOTS
+        } else {
+            crds_value::MAX_RESTART_LAST_VOTED_FORK_SLOTS
+        };
+        let last_vote_slot = last_vote_bankhash.map(|_| *update.last().unwrap());
         let current_slots: Vec<_> = {
             let gossip_crds =
                 self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup);
-            (0..crds_value::MAX_EPOCH_SLOTS)
+            (0..max_entries)
                 .filter_map(|ix| {
-                    let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
+                    let label = create_label(ix);
                     let crds_value = gossip_crds.get::<&CrdsValue>(&label)?;
                     let epoch_slots = crds_value.epoch_slots()?;
                     let first_slot = epoch_slots.first_slot()?;

@@ -922,17 +948,19 @@ impl ClusterInfo {
             .min()
             .unwrap_or_default();
         let max_slot: Slot = update.iter().max().cloned().unwrap_or(0);
-        let total_slots = max_slot as isize - min_slot as isize;
-        // WARN if CRDS is not storing at least a full epoch worth of slots
-        if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
-            && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
-        {
-            self.stats.epoch_slots_filled.add_relaxed(1);
-            warn!(
-                "EPOCH_SLOTS are filling up FAST {}/{}",
-                total_slots,
-                current_slots.len()
-            );
-        }
+        if is_epoch_slot {
+            let total_slots = max_slot as isize - min_slot as isize;
+            // WARN if CRDS is not storing at least a full epoch worth of slots
+            if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
+                && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
+            {
+                self.stats.epoch_slots_filled.add_relaxed(1);
+                warn!(
+                    "EPOCH_SLOTS are filling up FAST {}/{}",
+                    total_slots,
+                    current_slots.len()
+                );
+            }
+        }
         let mut reset = false;
         let mut epoch_slot_index = match current_slots.iter().max() {

@@ -942,18 +970,27 @@ impl ClusterInfo {
         let mut entries = Vec::default();
         let keypair = self.keypair();
         while !update.is_empty() {
-            let ix = epoch_slot_index % crds_value::MAX_EPOCH_SLOTS;
+            let ix = epoch_slot_index % max_entries;
             let now = timestamp();
             let mut slots = if !reset {
-                self.lookup_epoch_slots(ix)
+                self.lookup_epoch_slots(create_label(ix))
             } else {
                 EpochSlots::new(self_pubkey, now)
             };
             let n = slots.fill(update, now);
             update = &update[n..];
             if n > 0 {
-                let epoch_slots = CrdsData::EpochSlots(ix, slots);
-                let entry = CrdsValue::new_signed(epoch_slots, &keypair);
+                let data = if is_epoch_slot {
+                    CrdsData::EpochSlots(ix, slots)
+                } else {
+                    CrdsData::RestartLastVotedForkSlots(
+                        ix,
+                        slots,
+                        last_vote_slot.unwrap(),
+                        last_vote_bankhash.unwrap(),
+                    )
+                };
+                let entry = CrdsValue::new_signed(data, &keypair);
                 entries.push(entry);
             }
             epoch_slot_index += 1;

@@ -963,7 +1000,10 @@ impl ClusterInfo {
         let now = timestamp();
         for entry in entries {
             if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) {
-                error!("push_epoch_slots failed: {:?}", err);
+                error!(
+                    "push_epoch_slots_or_restart_last_voted_fork_slots failed: {:?}",
+                    err
+                );
             }
         }
     }

@@ -1243,6 +1283,29 @@ impl ClusterInfo {
             .collect()
     }

+    /// Returns last-voted-fork-slots inserted since the given cursor.
+    /// Excludes entries from nodes with unknown or different shred version.
+    pub fn get_restart_last_voted_fork_slots(
+        &self,
+        cursor: &mut Cursor,
+    ) -> Vec<(EpochSlotsIndex, EpochSlots, Slot, Hash)> {
+        let self_shred_version = Some(self.my_shred_version());
+        let gossip_crds = self.gossip.crds.read().unwrap();
+        gossip_crds
+            .get_restart_last_voted_fork_slots(cursor)
+            .filter(|entry| {
+                let origin = entry.value.pubkey();
+                gossip_crds.get_shred_version(&origin) == self_shred_version

Review discussion on the shred-version check:

Reviewer: If we need this check, shouldn't shred version be included in the RestartLastVotedForkSlots data itself?

Reviewer: @wen-coding do we need to include shred_version here?

wen-coding: This code was copied from get_epoch_slots, so I think all other Gossip messages which check whether shred_version matches would have the same problem, since we could always have a lag on ContactInfo shred-version. So shred_version is just a sanity check, because it is so easy to circumvent in a real attack; but it is nice to have for naive users pointing mainnet validators at testnet or vice versa. I could go either way here, but would prefer not to include shred_version in RestartLastVotedForkSlots, for brevity. Using a contact-info shred_version which may occasionally go out of sync is fine; we will drop some messages but can pick them up later, I believe? For RestartLastVotedForkSlots, since we draw a line between Gossip messages in restart and those not in restart, this check is less important.

Reviewer: Most of those which check shred-version are nodes' sockets, which are obtained from the same contact-info message that carries the shred-version.

> For RestartLastVotedForkSlots, since we draw a line between Gossip messages in restart and those not in restart, this check is less important.

I don't understand this, why?

wen-coding: Okay, maybe it is an overstatement to say this check is less important. The shred_version check was put in mainly to prevent RestartLastVotedForkSlots messages from occupying CRDS table space in non-restarting validators, because they are big; but later we decided to add the extra rule that normal validators just don't accept RestartLastVotedForkSlots at all, no matter what shred_version it has, so the shred_version check becomes an extra protection. Say one validator has its contact info propagating slower than its RestartLastVotedForkSlots, so some neighbors drop its RestartLastVotedForkSlots because of a shred_version mismatch. I was hoping newly restarted validators would do a Gossip pull for all messages and help spread the lost RestartLastVotedForkSlots messages to others? If that is not the case, then it might be better to add shred_version to the messages themselves.

+            })
+            .map(|entry| match &entry.value.data {
+                CrdsData::RestartLastVotedForkSlots(index, slots, last_vote, last_vote_hash) => {
+                    (*index, slots.clone(), *last_vote, *last_vote_hash)

Review discussion on the return type:

Reviewer: A tuple with 4 fields is not ideal.

wen-coding: Changed.

(For illustration, a hypothetical sketch of a named replacement type follows this function.)

+                }
+                _ => panic!("this should not happen!"),
+            })
+            .collect()
+    }
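
Referring to the tuple comment above, here is a minimal sketch of what a named return type could look like. The struct name and field names are illustrative assumptions, not taken from this PR; the field types mirror the tuple elements in the diff, using the same types already in scope in this file.

```rust
// Hypothetical sketch only: a named type in place of the
// (EpochSlotsIndex, EpochSlots, Slot, Hash) tuple returned by
// get_restart_last_voted_fork_slots(). Names are illustrative.
pub struct RestartLastVotedForkSlotsEntry {
    pub index: EpochSlotsIndex,  // CRDS chunk index of this entry
    pub slots: EpochSlots,       // compressed slots on the last-voted fork
    pub last_voted_slot: Slot,   // slot of the sender's last vote
    pub last_voted_hash: Hash,   // bank hash of that last-voted slot
}
```
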
+
     /// Returns duplicate-shreds inserted since the given cursor.
     pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
         let gossip_crds = self.gossip.crds.read().unwrap();

@@ -4003,6 +4066,75 @@ mod tests {
         assert_eq!(slots[1].from, node_pubkey);
     }

+    #[test]
+    fn test_push_restart_last_voted_fork_slots() {
+        solana_logger::setup();
+        let keypair = Arc::new(Keypair::new());
+        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
+        let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);
+        let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
+        assert!(slots.is_empty());
+        let mut update: Vec<Slot> = vec![0];
+        for i in 0..81 {
+            for j in 0..1000 {
+                update.push(i * 1050 + j);
+            }
+        }
+        cluster_info.push_restart_last_voted_fork_slots(&update, Hash::default());
+
+        let mut cursor = Cursor::default();
+        let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
+        assert_eq!(slots.len(), 2);
+        assert_eq!(slots[0].1.to_slots(0).len(), 42468);
+        assert_eq!(slots[1].1.to_slots(0).len(), 38532);
+
+        let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
+        assert!(slots.is_empty());
+
+        // Test with different shred versions.
+        let mut rng = rand::thread_rng();
+        let node_pubkey = Pubkey::new_unique();
+        let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
+        node.set_shred_version(42);
+        let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey));
+        let entries = vec![
+            CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)),
+            CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(
+                0,
+                epoch_slots,
+                0,
+                Hash::default(),
+            )),
+        ];
+        {
+            let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
+            for entry in entries {
+                assert!(gossip_crds
+                    .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
+                    .is_ok());
+            }
+        }
+        // Should exclude other node's last-voted-fork-slot because of different
+        // shred-version.
+        let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
+        assert_eq!(slots.len(), 2);
+        assert_eq!(slots[0].1.from, cluster_info.id());
+        assert_eq!(slots[1].1.from, cluster_info.id());
+        // Match shred versions.
+        {
+            let mut node = cluster_info.my_contact_info.write().unwrap();
+            node.set_shred_version(42);
+        }
+        cluster_info.push_self();
+        cluster_info.flush_push_queue();
+        // Should now include both epoch slots.
+        let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
+        assert_eq!(slots.len(), 3);
+        assert_eq!(slots[0].1.from, cluster_info.id());
+        assert_eq!(slots[1].1.from, cluster_info.id());
+        assert_eq!(slots[2].1.from, node_pubkey);
+    }
+
     #[test]
     fn test_append_entrypoint_to_pulls() {
         let thread_pool = ThreadPoolBuilder::new().build().unwrap();

Review discussion on push_epoch_slots_or_restart_last_voted_fork_slots:

Reviewer: Why are you mixing this value with EpochSlots? This function has become more convoluted with several if is_epoch_slot branches.

wen-coding: Split into two functions now.
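
For context on that resolution, here is a rough sketch of what a standalone restart-only push path could look like once it no longer shares code with push_epoch_slots. This is an illustrative guess based only on the diff above, not the actual follow-up commit: it reuses the same helpers and constants, always starts fresh EpochSlots chunks instead of looking up existing CRDS entries, and drops the is_epoch_slot branches and the EPOCH_SLOTS fill warning.

```rust
// Sketch only: assumes the same ClusterInfo context, imports, and constants
// used in the diff above; not a drop-in replacement.
impl ClusterInfo {
    pub fn push_restart_last_voted_fork_slots(&self, mut update: &[Slot], last_vote_bankhash: Hash) {
        let self_pubkey = self.id();
        // The last slot in `update` is the last-voted slot, as in the shared helper.
        let last_vote_slot = *update.last().unwrap();
        let keypair = self.keypair();
        let mut entries = Vec::new();
        let mut index = 0;
        while !update.is_empty() && index < crds_value::MAX_RESTART_LAST_VOTED_FORK_SLOTS {
            let now = timestamp();
            // Always start a fresh chunk; no lookup of existing CRDS entries here.
            let mut slots = EpochSlots::new(self_pubkey, now);
            let n = slots.fill(update, now);
            if n == 0 {
                break; // nothing more fits into this chunk
            }
            update = &update[n..];
            let data = CrdsData::RestartLastVotedForkSlots(
                index,
                slots,
                last_vote_slot,
                last_vote_bankhash,
            );
            entries.push(CrdsValue::new_signed(data, &keypair));
            index += 1;
        }
        let mut gossip_crds = self.gossip.crds.write().unwrap();
        let now = timestamp();
        for entry in entries {
            if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) {
                error!("push_restart_last_voted_fork_slots failed: {:?}", err);
            }
        }
    }
}
```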