Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gossip notifications / cluster node notifications to geyser #34618

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,19 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

let cluster_info_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_cluster_info_notifier());

info!(
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
entry_notifier: {}",
entry_notifier: {} \
cluster_info_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some(),
entry_notifier.is_some()
entry_notifier.is_some(),
cluster_info_notifier.is_some(),
);

let system_monitor_service = Some(SystemMonitorService::new(
Expand Down Expand Up @@ -728,6 +734,9 @@ impl Validator {
identity_keypair.clone(),
socket_addr_space,
);

//register Geyzer notifier.
cluster_info.set_clusterinfo_notifier(cluster_info_notifier);
cluster_info.set_contact_debug_interval(config.contact_debug_interval);
cluster_info.set_entrypoints(cluster_entrypoints);
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = { workspace = true }

[dependencies]
log = { workspace = true }
solana-gossip = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
thiserror = { workspace = true }
Expand Down
51 changes: 51 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
/// the GeyserPlugin trait to work with the runtime.
/// In addition, the dynamic library must export a "C" function _create_plugin which
/// creates the implementation of the plugin.
use solana_sdk::pubkey::Pubkey;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow our coding style, put them in "use {...}";

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected

use std::net::SocketAddr;
use {
solana_sdk::{
clock::{Slot, UnixTimestamp},
Expand All @@ -13,6 +15,36 @@ use {
thiserror::Error,
};

#[derive(Debug, Clone, PartialEq, Eq)]
/// Information about a node in the cluster.
pub struct ReplicaClusterInfoNode {
pub id: Pubkey,
/// gossip address
pub gossip: Option<SocketAddr>,
/// address to connect to for replication
pub tvu: Option<SocketAddr>,
/// TVU over QUIC protocol.
pub tvu_quic: Option<SocketAddr>,
/// repair service over QUIC protocol.
pub serve_repair_quic: Option<SocketAddr>,
/// transactions address
pub tpu: Option<SocketAddr>,
/// address to forward unprocessed transactions to
pub tpu_forwards: Option<SocketAddr>,
/// address to which to send bank state requests
pub tpu_vote: Option<SocketAddr>,
/// address to which to send JSON-RPC requests
pub rpc: Option<SocketAddr>,
/// websocket for JSON-RPC push notifications
pub rpc_pubsub: Option<SocketAddr>,
/// address to send repair requests to
pub serve_repair: Option<SocketAddr>,
/// latest wallclock picked
pub wallclock: u64,
/// node shred version
pub shred_version: u16,
}

#[derive(Debug, Clone, PartialEq, Eq)]
/// Information about an account being updated
pub struct ReplicaAccountInfo<'a> {
Expand Down Expand Up @@ -317,6 +349,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}

/// Called when a cluster info is updated on gossip network.
#[allow(unused_variables)]
fn update_cluster_info(&self, cluster_info: &ReplicaClusterInfoNode) -> Result<()> {
Ok(())
}

/// Called when a cluster info is removed on gossip network.
#[allow(unused_variables)]
fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this called? notify_clusterinfo_remove?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can find the call here:

self.notify_clusterinfo_remove(&value.value.pubkey());

Ok(())
}

/// Called when all accounts are notified of during startup.
fn notify_end_of_startup(&self) -> Result<()> {
Ok(())
Expand Down Expand Up @@ -375,4 +419,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}

/// Check if the plugin is interested in cluster info data
/// Default is false -- if the plugin is interested in
/// cluster info data, return true.
fn clusterinfo_notifications_enabled(&self) -> bool {
false
}
}
2 changes: 2 additions & 0 deletions geyser-plugin-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ libloading = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
solana-accounts-db = { workspace = true }
solana-client = { workspace = true }
solana-entry = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
Expand Down
153 changes: 153 additions & 0 deletions geyser-plugin-manager/src/cluster_info_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/// Module responsible for notifying plugins of transactions
use solana_gossip::legacy_contact_info::LegacyContactInfo;
use solana_sdk::pubkey::Pubkey;
use {
crate::geyser_plugin_manager::GeyserPluginManager,
log::*,
solana_client::connection_cache::Protocol,
solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaClusterInfoNode,
solana_gossip::cluster_info_notifier_interface::ClusterInfoNotifierInterface,
solana_measure::measure::Measure,
solana_metrics::*,
solana_rpc::transaction_notifier_interface::TransactionNotifier,
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::TransactionStatusMeta,
std::sync::{Arc, RwLock},
};

/// This implementation of ClusterInfoNotifierImpl is passed to the rpc's TransactionStatusService
/// at the validator startup. TransactionStatusService invokes the notify_transaction method
/// for new transactions. The implementation in turn invokes the notify_transaction of each
/// plugin enabled with transaction notification managed by the GeyserPluginManager.
#[derive(Debug)]
pub(crate) struct ClusterInfoNotifierImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}

impl ClusterInfoNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
ClusterInfoNotifierImpl { plugin_manager }
}

fn clusterinfo_from_legacy_contact_info(
legacy_info: &LegacyContactInfo,
) -> ReplicaClusterInfoNode {
ReplicaClusterInfoNode {
id: *legacy_info.pubkey(),
/// gossip address
gossip: legacy_info.gossip().ok(),
/// address to connect to for replication
tvu: legacy_info.tvu(Protocol::UDP).ok(),
/// TVU over QUIC protocol.
tvu_quic: legacy_info.tvu(Protocol::QUIC).ok(),
/// repair service over QUIC protocol.
serve_repair_quic: legacy_info.serve_repair(Protocol::QUIC).ok(),
/// transactions address
tpu: legacy_info.tpu(Protocol::UDP).ok(),
/// address to forward unprocessed transactions to
tpu_forwards: legacy_info.tpu_forwards(Protocol::UDP).ok(),
/// address to which to send bank state requests
tpu_vote: legacy_info.tpu_vote().ok(),
/// address to which to send JSON-RPC requests
rpc: legacy_info.rpc().ok(),
/// websocket for JSON-RPC push notifications
rpc_pubsub: legacy_info.rpc_pubsub().ok(),
/// address to send repair requests to
serve_repair: legacy_info.serve_repair(Protocol::UDP).ok(),
/// latest wallclock picked
wallclock: legacy_info.wallclock(),
/// node shred version
shred_version: legacy_info.shred_version(),
}
}
}

impl ClusterInfoNotifierInterface for ClusterInfoNotifierImpl {
fn notify_clusterinfo_update(&self, contact_info: &LegacyContactInfo) {
let cluster_info =
ClusterInfoNotifierImpl::clusterinfo_from_legacy_contact_info(contact_info);
let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update");
let plugin_manager = self.plugin_manager.read().unwrap();

if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-cluster_info");
match plugin.update_cluster_info(&cluster_info) {
Err(err) => {
error!(
"Failed to update cluster_info {}, error: {} to plugin {}",
bs58::encode(cluster_info.id).into_string(),
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully updated cluster_info {} to plugin {}",
bs58::encode(cluster_info.id).into_string(),
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-update-cluster_info-us",
measure.as_us() as usize,
100000,
100000
);
}
measure2.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_cluster_info_update-us",
measure2.as_us() as usize,
100000,
100000
);
}

fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) {
let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update");
let plugin_manager = self.plugin_manager.read().unwrap();

if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-remove-cluster_info");
match plugin.notify_clusterinfo_remove(pubkey) {
Err(err) => {
error!(
"Failed to remove cluster_info {}, error: {} to plugin {}",
bs58::encode(pubkey).into_string(),
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully remove cluster_info {} to plugin {}",
bs58::encode(pubkey).into_string(),
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-remove-cluster_info-us",
measure.as_us() as usize,
100000,
100000
);
}
measure2.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_cluster_info_remove-us",
measure2.as_us() as usize,
100000,
100000
);
}
}
9 changes: 9 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ impl GeyserPluginManager {
false
}

/// Check if the plugin is interested in cluster info data
pub fn clusterinfo_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.entry_notifications_enabled() {
return true;
}
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
Expand Down
17 changes: 17 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::cluster_info_notifier::ClusterInfoNotifierImpl;
use solana_gossip::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
use {
crate::{
accounts_update_notifier::AccountsUpdateNotifierImpl,
Expand Down Expand Up @@ -37,6 +39,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierLock>,
entry_notifier: Option<EntryNotifierLock>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
}

impl GeyserPluginService {
Expand Down Expand Up @@ -81,8 +84,17 @@ impl GeyserPluginService {
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
let cluster_info_notifications_enabled = plugin_manager.clusterinfo_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));

let cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock> =
if cluster_info_notifications_enabled {
let cluster_info_notifier = ClusterInfoNotifierImpl::new(plugin_manager.clone());
Some(Arc::new(RwLock::new(cluster_info_notifier)))
} else {
None
};

let accounts_update_notifier: Option<AccountsUpdateNotifier> =
if account_data_notifications_enabled {
let accounts_update_notifier =
Expand Down Expand Up @@ -143,6 +155,7 @@ impl GeyserPluginService {
transaction_notifier,
entry_notifier,
block_metadata_notifier,
cluster_info_notifier,
})
}

Expand Down Expand Up @@ -172,6 +185,10 @@ impl GeyserPluginService {
self.block_metadata_notifier.clone()
}

pub fn get_cluster_info_notifier(&self) -> Option<ClusterInfoUpdateNotifierLock> {
self.cluster_info_notifier.clone()
}

pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod accounts_update_notifier;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod cluster_info_notifier;
pub mod entry_notifier;
pub mod geyser_plugin_manager;
pub mod geyser_plugin_service;
Expand Down
8 changes: 8 additions & 0 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
note = "Please use `solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}` instead"
)]
#[allow(deprecated)]
use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
pub use solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE};
use {
crate::{
Expand Down Expand Up @@ -439,6 +440,13 @@ impl ClusterInfo {
me
}

pub fn set_clusterinfo_notifier(
&self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
self.gossip.set_clusterinfo_notifier(cluster_info_notifier);
}

pub fn set_contact_debug_interval(&mut self, new: u64) {
self.contact_debug_interval = new;
}
Expand Down
Loading
Loading