Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only push latest snapshot hashes in SnapshotGossipManager #31154

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 103 additions & 72 deletions core/src/snapshot_packager_service/snapshot_gossip_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::{
snapshot_hash::{
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
IncrementalSnapshotHashes, SnapshotHash, StartingSnapshotHashes,
FullSnapshotHash, FullSnapshotHashes, SnapshotHash, StartingSnapshotHashes,
},
snapshot_package::{retain_max_n_elements, SnapshotType},
},
Expand All @@ -14,10 +13,9 @@ use {
/// Manage pushing snapshot hash information to gossip
pub struct SnapshotGossipManager {
cluster_info: Arc<ClusterInfo>,
latest_snapshot_hashes: Option<LatestSnapshotHashes>,
max_full_snapshot_hashes: usize,
max_incremental_snapshot_hashes: usize,
full_snapshot_hashes: FullSnapshotHashes,
incremental_snapshot_hashes: IncrementalSnapshotHashes,
legacy_full_snapshot_hashes: FullSnapshotHashes,
Comment on lines 17 to +18
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually both of these fields will go away. The exist currently to support LegacySnapshotHashes.

}

impl SnapshotGossipManager {
Expand All @@ -26,120 +24,153 @@ impl SnapshotGossipManager {
pub fn new(
cluster_info: Arc<ClusterInfo>,
max_full_snapshot_hashes: usize,
max_incremental_snapshot_hashes: usize,
_max_incremental_snapshot_hashes: usize,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left this parameter here to make the diff smaller. I have a subsequent PR that removes it.

) -> Self {
SnapshotGossipManager {
cluster_info,
latest_snapshot_hashes: None,
max_full_snapshot_hashes,
max_incremental_snapshot_hashes,
full_snapshot_hashes: FullSnapshotHashes {
hashes: Vec::default(),
},
incremental_snapshot_hashes: IncrementalSnapshotHashes {
base: (Slot::default(), SnapshotHash(Hash::default())),
legacy_full_snapshot_hashes: FullSnapshotHashes {
hashes: Vec::default(),
},
}
}

/// If there were starting snapshot hashes, add those to their respective vectors, then push
/// those vectors to the cluster via CRDS.
/// Push any starting snapshot hashes to the cluster via CRDS
pub fn push_starting_snapshot_hashes(
&mut self,
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
) {
if let Some(starting_snapshot_hashes) = starting_snapshot_hashes {
let starting_full_snapshot_hash = starting_snapshot_hashes.full;
self.push_full_snapshot_hash(starting_full_snapshot_hash);
let Some(starting_snapshot_hashes) = starting_snapshot_hashes else {
return;
};
Comment on lines +44 to +46
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was originally coded before the let-else chains were added to Rust. I find this impl to be more readable. Also, it'll make the next PR where I move this fn into new() a bit more obvious.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything that reduces nesting gets a thumbs up from me


if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash);
};
self.update_latest_full_snapshot_hash(starting_snapshot_hashes.full.hash);
if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
self.update_latest_incremental_snapshot_hash(
starting_incremental_snapshot_hash.hash,
starting_incremental_snapshot_hash.base.0,
);
}
self.push_latest_snapshot_hashes_to_cluster();

// Handle legacy snapshot hashes here too
// Once LegacySnapshotHashes are removed from CRDS, also remove them here
self.push_legacy_full_snapshot_hash(starting_snapshot_hashes.full);
}

/// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
/// cluster via CRDS.
/// Push new snapshot hash to the cluster via CRDS
pub fn push_snapshot_hash(
bw-solana marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
snapshot_type: SnapshotType,
snapshot_hash: (Slot, SnapshotHash),
) {
match snapshot_type {
SnapshotType::FullSnapshot => {
self.push_full_snapshot_hash(FullSnapshotHash {
hash: snapshot_hash,
});
self.push_full_snapshot_hash(snapshot_hash);
}
SnapshotType::IncrementalSnapshot(base_slot) => {
let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap();
assert_eq!(
base_slot, latest_full_snapshot_hash.0,
"the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})",
base_slot, latest_full_snapshot_hash.0,
);
Comment on lines -76 to -80
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is moved, not deleted.

(Now is in update_incremental_snapshot_hash(), which is called by push_incremental_snapshot_hash.)

self.push_incremental_snapshot_hash(IncrementalSnapshotHash {
base: latest_full_snapshot_hash,
hash: snapshot_hash,
});
self.push_incremental_snapshot_hash(snapshot_hash, base_slot);
}
}
}

/// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
/// the cluster via CRDS.
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
self.full_snapshot_hashes
.hashes
.push(full_snapshot_hash.hash);

retain_max_n_elements(
&mut self.full_snapshot_hashes.hashes,
self.max_full_snapshot_hashes,
);
/// Push new full snapshot hash to the cluster via CRDS
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) {
self.update_latest_full_snapshot_hash(full_snapshot_hash);
self.push_latest_snapshot_hashes_to_cluster();

self.cluster_info
.push_legacy_snapshot_hashes(clone_hashes_for_crds(&self.full_snapshot_hashes.hashes));
// Handle legacy snapshot hashes here too
// Once LegacySnapshotHashes are removed from CRDS, also remove them here
self.push_legacy_full_snapshot_hash(FullSnapshotHash {
hash: full_snapshot_hash,
});
}

/// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
/// that vector to the cluster via CRDS.
/// Push new incremental snapshot hash to the cluster via CRDS
fn push_incremental_snapshot_hash(
&mut self,
incremental_snapshot_hash: IncrementalSnapshotHash,
incremental_snapshot_hash: (Slot, SnapshotHash),
base_slot: Slot,
) {
// If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then
// that means the old incremental snapshot hashes are no longer valid, so clear them all
// out.
if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base {
self.incremental_snapshot_hashes.hashes.clear();
self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base;
}
self.update_latest_incremental_snapshot_hash(incremental_snapshot_hash, base_slot);
self.push_latest_snapshot_hashes_to_cluster();
}

self.incremental_snapshot_hashes
.hashes
.push(incremental_snapshot_hash.hash);
/// Update the latest snapshot hashes with a new full snapshot
fn update_latest_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) {
self.latest_snapshot_hashes = Some(LatestSnapshotHashes {
full: full_snapshot_hash,
incremental: None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we've gotten a new full snapshot, we know the cannot be any incremental snapshots yet (based on this full snapshot).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be nice as a comment in the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #31171 to handle this request.

});
}

retain_max_n_elements(
&mut self.incremental_snapshot_hashes.hashes,
self.max_incremental_snapshot_hashes,
/// Update the latest snapshot hashes with a new incremental snapshot
fn update_latest_incremental_snapshot_hash(
&mut self,
incremental_snapshot_hash: (Slot, SnapshotHash),
base_slot: Slot,
) {
let latest_snapshot_hashes = self
.latest_snapshot_hashes
.as_mut()
.expect("there must already be a full snapshot hash");
Comment on lines +114 to +117
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the other side of the same coin is here. In order to get a new incremental snapshot, there must already be a full snapshot to base the incremental on.

assert_eq!(
base_slot, latest_snapshot_hashes.full.0,
"the incremental snapshot's base slot ({}) must match the latest full snapshot's slot ({})",
base_slot, latest_snapshot_hashes.full.0,
);
latest_snapshot_hashes.incremental = Some(incremental_snapshot_hash);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we only care about the latest snapshots, we don't need to push to a vector and then trim the size. Just directly set the new value.

}

// Pushing incremental snapshot hashes to the cluster should never fail. The only error
// case is when the length of the hashes is too big, but we account for that with
// `max_incremental_snapshot_hashes`. If this call ever does error, it's a programmer bug!
// Check to see what changed in `push_snapshot_hashes()` and handle the new
// error condition here.
/// Push the latest snapshot hashes to the cluster via CRDS
fn push_latest_snapshot_hashes_to_cluster(&self) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The impl of this function looks similar to the bottom half of the old push_incremental_snapshot_hash(). (It was mostly copied from it 😸 )

let Some(latest_snapshot_hashes) = self.latest_snapshot_hashes.as_ref() else {
return;
};

// Pushing snapshot hashes to the cluster should never fail. The only error case is when
// the length of the incremental hashes is too big, (and we send a maximum of one here).
// If this call ever does error, it's a programmer bug! Check to see what changed in
// `push_snapshot_hashes()` and handle the new error condition here.
self.cluster_info
.push_snapshot_hashes(
clone_hash_for_crds(&self.incremental_snapshot_hashes.base),
clone_hashes_for_crds(&self.incremental_snapshot_hashes.hashes),
clone_hash_for_crds(&latest_snapshot_hashes.full),
latest_snapshot_hashes
.incremental
bw-solana marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.map(clone_hash_for_crds)
.collect(),
)
.expect(
"Bug! The programmer contract has changed for push_snapshot_hashes() \
and a new error case has been added, which has not been handled here.",
and a new error case has been added that has not been handled here.",
);
}

/// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
/// the cluster via CRDS.
fn push_legacy_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fn is unchanged from the old push_full_snapshot_hash(). Just renamed and moved here.

self.legacy_full_snapshot_hashes
.hashes
.push(full_snapshot_hash.hash);

retain_max_n_elements(
&mut self.legacy_full_snapshot_hashes.hashes,
self.max_full_snapshot_hashes,
);

self.cluster_info
.push_legacy_snapshot_hashes(clone_hashes_for_crds(
&self.legacy_full_snapshot_hashes.hashes,
));
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct LatestSnapshotHashes {
full: (Slot, SnapshotHash),
incremental: Option<(Slot, SnapshotHash)>,
}

/// Clones and maps snapshot hashes into what CRDS expects
Expand Down