From f4bc1be708298c0896978c1ce286dcdbf1ecc221 Mon Sep 17 00:00:00 2001 From: musitdev Date: Fri, 22 Dec 2023 18:11:46 +0100 Subject: [PATCH] Add cluster node remove geyzer notification --- core/src/validator.rs | 2 + .../src/geyser_plugin_interface.rs | 6 +++ .../src/cluster_info_notifier.rs | 41 ++++++++++++++++++- gossip/src/crds.rs | 2 + gossip/src/crds/geyser_plugin_utils.rs | 7 +++- 5 files changed, 56 insertions(+), 2 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index d68319c4a7e7e3..5e8d48e3341d63 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -734,6 +734,8 @@ impl Validator { identity_keypair.clone(), socket_addr_space, ); + + //register Geyzer notifier. cluster_info.set_clusterinfo_notifier(cluster_info_notifier); cluster_info.set_contact_debug_interval(config.contact_debug_interval); cluster_info.set_entrypoints(cluster_entrypoints); diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index e378522563fca2..1688aab9490c9f 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -355,6 +355,12 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { Ok(()) } + /// Called when a cluster info is removed on gossip network. + #[allow(unused_variables)] + fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) -> Result<()> { + Ok(()) + } + /// Called when all accounts are notified of during startup. fn notify_end_of_startup(&self) -> Result<()> { Ok(()) diff --git a/geyser-plugin-manager/src/cluster_info_notifier.rs b/geyser-plugin-manager/src/cluster_info_notifier.rs index f7a922a2e74c6e..c200c70c07ed0d 100644 --- a/geyser-plugin-manager/src/cluster_info_notifier.rs +++ b/geyser-plugin-manager/src/cluster_info_notifier.rs @@ -109,6 +109,45 @@ impl ClusterInfoNotifierInterface for ClusterInfoNotifierImpl { } fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) { - todo!() + let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update"); + let plugin_manager = self.plugin_manager.read().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + for plugin in plugin_manager.plugins.iter() { + let mut measure = Measure::start("geyser-plugin-remove-cluster_info"); + match plugin.notify_clusterinfo_remove(pubkey) { + Err(err) => { + error!( + "Failed to remove cluster_info {}, error: {} to plugin {}", + bs58::encode(pubkey).into_string(), + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully remove cluster_info {} to plugin {}", + bs58::encode(pubkey).into_string(), + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "geyser-plugin-remove-cluster_info-us", + measure.as_us() as usize, + 100000, + 100000 + ); + } + measure2.stop(); + inc_new_counter_debug!( + "geyser-plugin-notify_plugins_of_cluster_info_remove-us", + measure2.as_us() as usize, + 100000, + 100000 + ); } } diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index a0aad657eafd18..99e8e181d1285e 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -550,6 +550,8 @@ impl Crds { self.shards.remove(index, &value); match value.value.data { CrdsData::LegacyContactInfo(_) => { + //notify geyzer interface + self.notify_clusterinfo_remove(&value.value.pubkey()); self.nodes.swap_remove(&index); } CrdsData::Vote(_, _) => { diff --git a/gossip/src/crds/geyser_plugin_utils.rs b/gossip/src/crds/geyser_plugin_utils.rs index b62e88b141d686..4aff2e6cc5f1ec 100644 --- a/gossip/src/crds/geyser_plugin_utils.rs +++ b/gossip/src/crds/geyser_plugin_utils.rs @@ -18,5 +18,10 @@ impl Crds { /// Notified when the AccountsDb is initialized at start when restored /// from a snapshot. - pub fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) {} + pub fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) { + if let Some(clusterinfo_update_notifier) = &self.clusterinfo_update_notifier { + let notifier = &clusterinfo_update_notifier.read().unwrap(); + notifier.notify_clusterinfo_remove(pubkey); + } + } }