diff --git a/core/src/validator.rs b/core/src/validator.rs index fa354f3f758af5..c23f5786caf000 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -737,6 +737,7 @@ impl Validator { block_commitment_cache.clone(), optimistically_confirmed_bank.clone(), &config.pubsub_config, + None, )); let max_slots = Arc::new(MaxSlots::default()); diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 9ee8696d7b5cb7..718fc4d2b78e97 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -559,6 +559,7 @@ impl RpcSubscriptions { block_commitment_cache, optimistically_confirmed_bank, &PubSubConfig::default(), + None, ) } @@ -573,14 +574,13 @@ impl RpcSubscriptions { let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); - Self::new_with_config( + Self::new_for_tests_with_blockstore( exit, max_complete_transaction_status_slot, blockstore, bank_forks, block_commitment_cache, optimistically_confirmed_bank, - &PubSubConfig::default_for_tests(), ) } @@ -592,7 +592,9 @@ impl RpcSubscriptions { block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, ) -> Self { - Self::new_with_config( + let rpc_notifier_ready = Arc::new(AtomicBool::new(false)); + + let rpc_subscriptions = Self::new_with_config( exit, max_complete_transaction_status_slot, blockstore, @@ -600,7 +602,20 @@ impl RpcSubscriptions { block_commitment_cache, optimistically_confirmed_bank, &PubSubConfig::default_for_tests(), - ) + Some(rpc_notifier_ready.clone()), + ); + + // Ensure RPC notifier is ready to receive notifications before proceeding + let start_time = Instant::now(); + loop { + if rpc_notifier_ready.load(Ordering::Relaxed) { + break; + } else if (Instant::now() - start_time).as_millis() > 5000 { + panic!("RPC notifier thread setup took too long"); + } + } + + rpc_subscriptions } pub fn new_with_config( @@ -611,6 +626,7 @@ impl RpcSubscriptions { block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, config: &PubSubConfig, + rpc_notifier_ready: Option>, ) -> Self { let (notification_sender, notification_receiver) = crossbeam_channel::unbounded(); @@ -640,6 +656,9 @@ impl RpcSubscriptions { .build() .unwrap(); pool.install(|| { + if let Some(rpc_notifier_ready) = rpc_notifier_ready { + rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); + } Self::process_notifications( exit_clone, max_complete_transaction_status_slot,