Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Add entry notification service for geyser (#31290)
Browse files Browse the repository at this point in the history
* Move entry_notifier_interface

* Add EntryNotifierService

* Use descriptive struct in sender/receiver

* Optionally initialize EntryNotifierService in validator

* Plumb EntryNotfierSender into Tvu, blockstore_processor

* Plumb EntryNotfierSender into Tpu

* Only return one option when constructing EntryNotifierService
  • Loading branch information
Tyera authored May 10, 2023
1 parent c900ef8 commit 3f70ddb
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use {
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
},
entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
leader_schedule_utils::first_of_consecutive_leader_slots,
},
Expand Down Expand Up @@ -236,6 +237,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub entry_notification_sender: Option<EntryNotifierSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
Expand Down Expand Up @@ -501,6 +503,7 @@ impl ReplayStage {
transaction_status_sender,
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender,
bank_notification_sender,
wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
Expand Down Expand Up @@ -596,6 +599,7 @@ impl ReplayStage {
&mut progress,
transaction_status_sender.as_ref(),
cache_block_meta_sender.as_ref(),
entry_notification_sender.as_ref(),
&verify_recyclers,
&mut heaviest_subtree_fork_choice,
&replay_vote_sender,
Expand Down Expand Up @@ -1869,12 +1873,14 @@ impl ReplayStage {
}
}

#[allow(clippy::too_many_arguments)]
fn replay_blockstore_into_bank(
bank: &Arc<Bank>,
blockstore: &Blockstore,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
transaction_status_sender: Option<&TransactionStatusSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
Expand All @@ -1893,6 +1899,7 @@ impl ReplayStage {
&mut w_replay_progress,
false,
transaction_status_sender,
entry_notification_sender,
Some(replay_vote_sender),
verify_recyclers,
false,
Expand Down Expand Up @@ -2412,6 +2419,7 @@ impl ReplayStage {
vote_account: &Pubkey,
progress: &mut ProgressMap,
transaction_status_sender: Option<&TransactionStatusSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
verify_recyclers: &VerifyRecyclers,
replay_vote_sender: &ReplayVoteSender,
replay_timing: &mut ReplayTiming,
Expand Down Expand Up @@ -2490,6 +2498,7 @@ impl ReplayStage {
&replay_stats,
&replay_progress,
transaction_status_sender,
entry_notification_sender,
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
Expand Down Expand Up @@ -2519,6 +2528,7 @@ impl ReplayStage {
vote_account: &Pubkey,
progress: &mut ProgressMap,
transaction_status_sender: Option<&TransactionStatusSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
verify_recyclers: &VerifyRecyclers,
replay_vote_sender: &ReplayVoteSender,
replay_timing: &mut ReplayTiming,
Expand Down Expand Up @@ -2571,6 +2581,7 @@ impl ReplayStage {
&bank_progress.replay_stats,
&bank_progress.replay_progress,
transaction_status_sender,
entry_notification_sender,
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
Expand Down Expand Up @@ -2781,6 +2792,7 @@ impl ReplayStage {
progress: &mut ProgressMap,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
Expand Down Expand Up @@ -2819,6 +2831,7 @@ impl ReplayStage {
vote_account,
progress,
transaction_status_sender,
entry_notification_sender,
verify_recyclers,
replay_vote_sender,
replay_timing,
Expand All @@ -2837,6 +2850,7 @@ impl ReplayStage {
vote_account,
progress,
transaction_status_sender,
entry_notification_sender,
verify_recyclers,
replay_vote_sender,
replay_timing,
Expand Down Expand Up @@ -4437,6 +4451,7 @@ pub(crate) mod tests {
&bank1_progress.replay_stats,
&bank1_progress.replay_progress,
None,
None,
&replay_vote_sender,
&VerifyRecyclers::default(),
None,
Expand Down
6 changes: 5 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use {
crossbeam_channel::{unbounded, Receiver},
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender},
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
entry_notifier_service::EntryNotifierSender,
},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotificationSender,
Expand Down Expand Up @@ -80,6 +83,7 @@ impl Tpu {
sockets: TpuSockets,
subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>,
_entry_notification_sender: Option<EntryNotifierSender>,
blockstore: &Arc<Blockstore>,
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
Expand Down
5 changes: 4 additions & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use {
},
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache,
entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
Expand Down Expand Up @@ -120,6 +120,7 @@ impl Tvu {
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
entry_notification_sender: Option<EntryNotifierSender>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
Expand Down Expand Up @@ -243,6 +244,7 @@ impl Tvu {
transaction_status_sender,
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender,
bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
Expand Down Expand Up @@ -460,6 +462,7 @@ pub mod tests {
None,
None,
None,
None,
Arc::<VoteTracker>::default(),
retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
Expand Down
45 changes: 43 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use {
},
blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions},
blockstore_processor::{self, TransactionStatusSender},
entry_notifier_interface::EntryNotifierLock,
entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
leader_schedule::FixedSchedule,
leader_schedule_cache::LeaderScheduleCache,
},
Expand Down Expand Up @@ -422,6 +424,7 @@ pub struct Validator {
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_meta_service: Option<CacheBlockMetaService>,
entry_notifier_service: Option<EntryNotifierService>,
system_monitor_service: Option<SystemMonitorService>,
sample_performance_service: Option<SamplePerformanceService>,
poh_timing_report_service: PohTimingReportService,
Expand Down Expand Up @@ -584,14 +587,21 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());

let entry_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());

let block_metadata_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

info!(
"Geyser plugin: accounts_update_notifier: {} transaction_notifier: {}",
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
entry_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some()
transaction_notifier.is_some(),
entry_notifier.is_some()
);

let system_monitor_service = Some(SystemMonitorService::new(
Expand Down Expand Up @@ -630,13 +640,15 @@ impl Validator {
blockstore_process_options,
blockstore_root_scan,
pruned_banks_receiver,
entry_notifier_service,
) = load_blockstore(
config,
ledger_path,
&exit,
&start_progress,
accounts_update_notifier,
transaction_notifier,
entry_notifier,
Some(poh_timing_point_sender.clone()),
)?;

Expand Down Expand Up @@ -751,6 +763,9 @@ impl Validator {
);

let leader_schedule_cache = Arc::new(leader_schedule_cache);
let entry_notification_sender = entry_notifier_service
.as_ref()
.map(|service| service.sender());
let mut process_blockstore = ProcessBlockStore::new(
&id,
vote_account,
Expand All @@ -762,6 +777,7 @@ impl Validator {
&blockstore_process_options,
transaction_status_sender.as_ref(),
cache_block_meta_sender.clone(),
entry_notification_sender,
blockstore_root_scan,
accounts_background_request_sender.clone(),
config,
Expand Down Expand Up @@ -1074,6 +1090,9 @@ impl Validator {
info!("Disabled banking tracer");
}

let entry_notification_sender = entry_notifier_service
.as_ref()
.map(|service| service.sender_cloned());
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
Expand All @@ -1100,6 +1119,7 @@ impl Validator {
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender.clone(),
vote_tracker.clone(),
retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
Expand Down Expand Up @@ -1141,6 +1161,7 @@ impl Validator {
},
&rpc_subscriptions,
transaction_status_sender,
entry_notification_sender,
&blockstore,
&config.broadcast_stage_type,
&exit,
Expand Down Expand Up @@ -1183,6 +1204,7 @@ impl Validator {
transaction_status_service,
rewards_recorder_service,
cache_block_meta_service,
entry_notifier_service,
system_monitor_service,
sample_performance_service,
poh_timing_report_service,
Expand Down Expand Up @@ -1299,6 +1321,12 @@ impl Validator {
.expect("sample_performance_service");
}

if let Some(entry_notifier_service) = self.entry_notifier_service {
entry_notifier_service
.join()
.expect("entry_notifier_service");
}

if let Some(s) = self.snapshot_packager_service {
s.join().expect("snapshot_packager_service");
}
Expand Down Expand Up @@ -1488,6 +1516,7 @@ fn load_blockstore(
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
entry_notifier: Option<EntryNotifierLock>,
poh_timing_point_sender: Option<PohTimingSender>,
) -> Result<
(
Expand All @@ -1503,6 +1532,7 @@ fn load_blockstore(
blockstore_processor::ProcessOptions,
BlockstoreRootScan,
DroppedSlotsReceiver,
Option<EntryNotifierService>,
),
String,
> {
Expand Down Expand Up @@ -1580,6 +1610,9 @@ fn load_blockstore(
TransactionHistoryServices::default()
};

let entry_notifier_service =
entry_notifier.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit));

let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
Expand All @@ -1591,6 +1624,9 @@ fn load_blockstore(
transaction_history_services
.cache_block_meta_sender
.as_ref(),
entry_notifier_service
.as_ref()
.map(|service| service.sender()),
accounts_update_notifier,
exit,
);
Expand Down Expand Up @@ -1643,6 +1679,7 @@ fn load_blockstore(
process_options,
blockstore_root_scan,
pruned_banks_receiver,
entry_notifier_service,
))
}

Expand All @@ -1657,6 +1694,7 @@ pub struct ProcessBlockStore<'a> {
process_options: &'a blockstore_processor::ProcessOptions,
transaction_status_sender: Option<&'a TransactionStatusSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
entry_notification_sender: Option<&'a EntryNotifierSender>,
blockstore_root_scan: Option<BlockstoreRootScan>,
accounts_background_request_sender: AbsRequestSender,
config: &'a ValidatorConfig,
Expand All @@ -1676,6 +1714,7 @@ impl<'a> ProcessBlockStore<'a> {
process_options: &'a blockstore_processor::ProcessOptions,
transaction_status_sender: Option<&'a TransactionStatusSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
entry_notification_sender: Option<&'a EntryNotifierSender>,
blockstore_root_scan: BlockstoreRootScan,
accounts_background_request_sender: AbsRequestSender,
config: &'a ValidatorConfig,
Expand All @@ -1691,6 +1730,7 @@ impl<'a> ProcessBlockStore<'a> {
process_options,
transaction_status_sender,
cache_block_meta_sender,
entry_notification_sender,
blockstore_root_scan: Some(blockstore_root_scan),
accounts_background_request_sender,
config,
Expand Down Expand Up @@ -1728,6 +1768,7 @@ impl<'a> ProcessBlockStore<'a> {
self.process_options,
self.transaction_status_sender,
self.cache_block_meta_sender.as_ref(),
self.entry_notification_sender,
&self.accounts_background_request_sender,
) {
exit.store(true, Ordering::Relaxed);
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ log = { workspace = true }
serde_json = { workspace = true }
solana-entry = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-rpc = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion geyser-plugin-manager/src/entry_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use {
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaEntryInfo, ReplicaEntryInfoVersions,
},
solana_ledger::entry_notifier_interface::EntryNotifier,
solana_measure::measure::Measure,
solana_metrics::*,
solana_rpc::entry_notifier_interface::EntryNotifier,
solana_sdk::clock::Slot,
std::sync::{Arc, RwLock},
};
Expand Down
Loading

0 comments on commit 3f70ddb

Please sign in to comment.