From 07013957bc11849006218d3d59b84d8205e67771 Mon Sep 17 00:00:00 2001 From: brooks Date: Mon, 10 Apr 2023 18:00:19 -0400 Subject: [PATCH] Only push latest snapshot hashes in SnapshotGossipManager --- .../snapshot_gossip_manager.rs | 175 +++++++++++------- 1 file changed, 103 insertions(+), 72 deletions(-) diff --git a/core/src/snapshot_packager_service/snapshot_gossip_manager.rs b/core/src/snapshot_packager_service/snapshot_gossip_manager.rs index a5b173f96ea5d1..5e1335ca1adb2e 100644 --- a/core/src/snapshot_packager_service/snapshot_gossip_manager.rs +++ b/core/src/snapshot_packager_service/snapshot_gossip_manager.rs @@ -2,8 +2,7 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_runtime::{ snapshot_hash::{ - FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash, - IncrementalSnapshotHashes, SnapshotHash, StartingSnapshotHashes, + FullSnapshotHash, FullSnapshotHashes, SnapshotHash, StartingSnapshotHashes, }, snapshot_package::{retain_max_n_elements, SnapshotType}, }, @@ -14,10 +13,9 @@ use { /// Manage pushing snapshot hash information to gossip pub struct SnapshotGossipManager { cluster_info: Arc, + latest_snapshot_hashes: Option, max_full_snapshot_hashes: usize, - max_incremental_snapshot_hashes: usize, - full_snapshot_hashes: FullSnapshotHashes, - incremental_snapshot_hashes: IncrementalSnapshotHashes, + legacy_full_snapshot_hashes: FullSnapshotHashes, } impl SnapshotGossipManager { @@ -26,40 +24,42 @@ impl SnapshotGossipManager { pub fn new( cluster_info: Arc, max_full_snapshot_hashes: usize, - max_incremental_snapshot_hashes: usize, + _max_incremental_snapshot_hashes: usize, ) -> Self { SnapshotGossipManager { cluster_info, + latest_snapshot_hashes: None, max_full_snapshot_hashes, - max_incremental_snapshot_hashes, - full_snapshot_hashes: FullSnapshotHashes { - hashes: Vec::default(), - }, - incremental_snapshot_hashes: IncrementalSnapshotHashes { - base: (Slot::default(), SnapshotHash(Hash::default())), + legacy_full_snapshot_hashes: FullSnapshotHashes { hashes: Vec::default(), }, } } - /// If there were starting snapshot hashes, add those to their respective vectors, then push - /// those vectors to the cluster via CRDS. + /// Push any starting snapshot hashes to the cluster via CRDS pub fn push_starting_snapshot_hashes( &mut self, starting_snapshot_hashes: Option, ) { - if let Some(starting_snapshot_hashes) = starting_snapshot_hashes { - let starting_full_snapshot_hash = starting_snapshot_hashes.full; - self.push_full_snapshot_hash(starting_full_snapshot_hash); + let Some(starting_snapshot_hashes) = starting_snapshot_hashes else { + return; + }; - if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental { - self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash); - }; + self.update_latest_full_snapshot_hash(starting_snapshot_hashes.full.hash); + if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental { + self.update_latest_incremental_snapshot_hash( + starting_incremental_snapshot_hash.hash, + starting_incremental_snapshot_hash.base.0, + ); } + self.push_latest_snapshot_hashes_to_cluster(); + + // Handle legacy snapshot hashes here too + // Once LegacySnapshotHashes are removed from CRDS, also remove them here + self.push_legacy_full_snapshot_hash(starting_snapshot_hashes.full); } - /// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the - /// cluster via CRDS. + /// Push new snapshot hash to the cluster via CRDS pub fn push_snapshot_hash( &mut self, snapshot_type: SnapshotType, @@ -67,79 +67,110 @@ impl SnapshotGossipManager { ) { match snapshot_type { SnapshotType::FullSnapshot => { - self.push_full_snapshot_hash(FullSnapshotHash { - hash: snapshot_hash, - }); + self.push_full_snapshot_hash(snapshot_hash); } SnapshotType::IncrementalSnapshot(base_slot) => { - let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap(); - assert_eq!( - base_slot, latest_full_snapshot_hash.0, - "the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})", - base_slot, latest_full_snapshot_hash.0, - ); - self.push_incremental_snapshot_hash(IncrementalSnapshotHash { - base: latest_full_snapshot_hash, - hash: snapshot_hash, - }); + self.push_incremental_snapshot_hash(snapshot_hash, base_slot); } } } - /// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to - /// the cluster via CRDS. - fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) { - self.full_snapshot_hashes - .hashes - .push(full_snapshot_hash.hash); - - retain_max_n_elements( - &mut self.full_snapshot_hashes.hashes, - self.max_full_snapshot_hashes, - ); + /// Push new full snapshot hash to the cluster via CRDS + fn push_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) { + self.update_latest_full_snapshot_hash(full_snapshot_hash); + self.push_latest_snapshot_hashes_to_cluster(); - self.cluster_info - .push_legacy_snapshot_hashes(clone_hashes_for_crds(&self.full_snapshot_hashes.hashes)); + // Handle legacy snapshot hashes here too + // Once LegacySnapshotHashes are removed from CRDS, also remove them here + self.push_legacy_full_snapshot_hash(FullSnapshotHash { + hash: full_snapshot_hash, + }); } - /// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push - /// that vector to the cluster via CRDS. + /// Push new incremental snapshot hash to the cluster via CRDS fn push_incremental_snapshot_hash( &mut self, - incremental_snapshot_hash: IncrementalSnapshotHash, + incremental_snapshot_hash: (Slot, SnapshotHash), + base_slot: Slot, ) { - // If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then - // that means the old incremental snapshot hashes are no longer valid, so clear them all - // out. - if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base { - self.incremental_snapshot_hashes.hashes.clear(); - self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base; - } + self.update_latest_incremental_snapshot_hash(incremental_snapshot_hash, base_slot); + self.push_latest_snapshot_hashes_to_cluster(); + } - self.incremental_snapshot_hashes - .hashes - .push(incremental_snapshot_hash.hash); + /// Update the latest snapshot hashes with a new full snapshot + fn update_latest_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) { + self.latest_snapshot_hashes = Some(LatestSnapshotHashes { + full: full_snapshot_hash, + incremental: None, + }); + } - retain_max_n_elements( - &mut self.incremental_snapshot_hashes.hashes, - self.max_incremental_snapshot_hashes, + /// Update the latest snapshot hashes with a new incremental snapshot + fn update_latest_incremental_snapshot_hash( + &mut self, + incremental_snapshot_hash: (Slot, SnapshotHash), + base_slot: Slot, + ) { + let latest_snapshot_hashes = self + .latest_snapshot_hashes + .as_mut() + .expect("there must already be a full snapshot hash"); + assert_eq!( + base_slot, latest_snapshot_hashes.full.0, + "the incremental snapshot's base slot ({}) must match the latest full snapshot's slot ({})", + base_slot, latest_snapshot_hashes.full.0, ); + latest_snapshot_hashes.incremental = Some(incremental_snapshot_hash); + } - // Pushing incremental snapshot hashes to the cluster should never fail. The only error - // case is when the length of the hashes is too big, but we account for that with - // `max_incremental_snapshot_hashes`. If this call ever does error, it's a programmer bug! - // Check to see what changed in `push_snapshot_hashes()` and handle the new - // error condition here. + /// Push the latest snapshot hashes to the cluster via CRDS + fn push_latest_snapshot_hashes_to_cluster(&self) { + let Some(latest_snapshot_hashes) = self.latest_snapshot_hashes.as_ref() else { + return; + }; + + // Pushing snapshot hashes to the cluster should never fail. The only error case is when + // the length of the incremental hashes is too big, (and we send a maximum of one here). + // If this call ever does error, it's a programmer bug! Check to see what changed in + // `push_snapshot_hashes()` and handle the new error condition here. self.cluster_info .push_snapshot_hashes( - clone_hash_for_crds(&self.incremental_snapshot_hashes.base), - clone_hashes_for_crds(&self.incremental_snapshot_hashes.hashes), + clone_hash_for_crds(&latest_snapshot_hashes.full), + latest_snapshot_hashes + .incremental + .iter() + .map(clone_hash_for_crds) + .collect(), ) .expect( "Bug! The programmer contract has changed for push_snapshot_hashes() \ - and a new error case has been added, which has not been handled here.", + and a new error case has been added that has not been handled here.", ); } + + /// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to + /// the cluster via CRDS. + fn push_legacy_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) { + self.legacy_full_snapshot_hashes + .hashes + .push(full_snapshot_hash.hash); + + retain_max_n_elements( + &mut self.legacy_full_snapshot_hashes.hashes, + self.max_full_snapshot_hashes, + ); + + self.cluster_info + .push_legacy_snapshot_hashes(clone_hashes_for_crds( + &self.legacy_full_snapshot_hashes.hashes, + )); + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +struct LatestSnapshotHashes { + full: (Slot, SnapshotHash), + incremental: Option<(Slot, SnapshotHash)>, } /// Clones and maps snapshot hashes into what CRDS expects