diff --git a/Cargo.lock b/Cargo.lock index df8f1e1586134c..54d1cbc4e0b4b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,7 @@ dependencies = [ "solana-perf", "solana-poh", "solana-program-runtime", + "solana-rayon-threadlimit", "solana-rpc", "solana-rpc-client", "solana-rpc-client-api", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 4d606fc4e9ed51..9c78461b1b0a81 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -118,6 +118,7 @@ dependencies = [ "solana-perf", "solana-poh", "solana-program-runtime", + "solana-rayon-threadlimit", "solana-rpc", "solana-rpc-client", "solana-rpc-client-api", diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 3e32503691d78e..99155e678675f5 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -12,10 +12,12 @@ use { jsonrpc_core::IoHandler, soketto::handshake::{server, Server}, solana_metrics::TokenCounter, + solana_rayon_threadlimit::get_thread_count, solana_sdk::timing::AtomicInterval, std::{ io, net::SocketAddr, + num::NonZeroUsize, str, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -43,7 +45,7 @@ pub struct PubSubConfig { pub queue_capacity_items: usize, pub queue_capacity_bytes: usize, pub worker_threads: usize, - pub notification_threads: Option, + pub notification_threads: Option, } impl Default for PubSubConfig { @@ -55,7 +57,7 @@ impl Default for PubSubConfig { queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: None, + notification_threads: NonZeroUsize::new(get_thread_count()), } } } @@ -69,7 +71,7 @@ impl PubSubConfig { queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES, worker_threads: DEFAULT_WORKER_THREADS, - notification_threads: Some(2), + notification_threads: NonZeroUsize::new(2), } } } diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 7ecfd6a31a42cc..39d746c48049de 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -19,7 +19,6 @@ use { solana_account_decoder::{parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding}, solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_measure::measure::Measure, - solana_rayon_threadlimit::get_thread_count, solana_rpc_client_api::response::{ ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate, RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext, @@ -631,41 +630,37 @@ impl RpcSubscriptions { config.queue_capacity_bytes, )), }; - let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count); - let t_cleanup = if notification_threads == 0 { - None - } else { + + let t_cleanup = config.notification_threads.map(|notification_threads| { let exit = exit.clone(); - Some( - Builder::new() - .name("solRpcNotifier".to_string()) - .spawn(move || { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(notification_threads) - .thread_name(|i| format!("solRpcNotify{i:02}")) - .build() - .unwrap(); - pool.install(|| { - if let Some(rpc_notifier_ready) = rpc_notifier_ready { - rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); - } - Self::process_notifications( - exit, - max_complete_transaction_status_slot, - max_complete_rewards_slot, - blockstore, - notifier, - notification_receiver, - subscriptions, - bank_forks, - block_commitment_cache, - optimistically_confirmed_bank, - ) - }); - }) - .unwrap(), - ) - }; + Builder::new() + .name("solRpcNotifier".to_string()) + .spawn(move || { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(notification_threads.get()) + .thread_name(|i| format!("solRpcNotify{i:02}")) + .build() + .unwrap(); + pool.install(|| { + if let Some(rpc_notifier_ready) = rpc_notifier_ready { + rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); + } + Self::process_notifications( + exit, + max_complete_transaction_status_slot, + max_complete_rewards_slot, + blockstore, + notifier, + notification_receiver, + subscriptions, + bank_forks, + block_commitment_cache, + optimistically_confirmed_bank, + ) + }); + }) + .unwrap() + }); let control = SubscriptionControl::new( config.max_active_subscriptions, @@ -674,11 +669,7 @@ impl RpcSubscriptions { ); Self { - notification_sender: if notification_threads == 0 { - None - } else { - Some(notification_sender) - }, + notification_sender: config.notification_threads.map(|_| notification_sender), t_cleanup, exit, control, diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 74742c90faa29d..0a6324f454e2b2 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -50,6 +50,7 @@ solana-net-utils = { workspace = true } solana-perf = { workspace = true } solana-poh = { workspace = true } solana-program-runtime = { workspace = true } +solana-rayon-threadlimit = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index d1ad63b760f031..e9298d9c02928e 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -26,6 +26,7 @@ use { solana_faucet::faucet::{self, FAUCET_PORT}, solana_ledger::use_snapshot_archives_at_startup, solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}, + solana_rayon_threadlimit::get_thread_count, solana_rpc::{rpc::MAX_REQUEST_BODY_SIZE, rpc_pubsub_service::PubSubConfig}, solana_rpc_client_api::request::MAX_MULTIPLE_ACCOUNTS, solana_runtime::{ @@ -1079,6 +1080,11 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .takes_value(true) .value_name("NUM_THREADS") .validator(is_parsable::) + .default_value_if( + "full_rpc_api", + None, + &default_args.rpc_pubsub_notification_threads, + ) .help( "The maximum number of threads that RPC PubSub will use for generating \ notifications. 0 will disable RPC PubSub notifications", @@ -2138,6 +2144,7 @@ pub struct DefaultArgs { pub rpc_bigtable_max_message_size: String, pub rpc_max_request_body_size: String, pub rpc_pubsub_worker_threads: String, + pub rpc_pubsub_notification_threads: String, pub maximum_local_snapshot_age: String, pub maximum_full_snapshot_archives_to_retain: String, @@ -2225,6 +2232,7 @@ impl DefaultArgs { rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE .to_string(), rpc_pubsub_worker_threads: "4".to_string(), + rpc_pubsub_notification_threads: get_thread_count().to_string(), maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN .to_string(), maximum_incremental_snapshot_archives_to_retain: diff --git a/validator/src/main.rs b/validator/src/main.rs index b00eabfef9a7b0..7f3de66b457c74 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1382,11 +1382,9 @@ pub fn main() { usize ), worker_threads: value_t_or_exit!(matches, "rpc_pubsub_worker_threads", usize), - notification_threads: if full_api { - value_of(&matches, "rpc_pubsub_notification_threads") - } else { - Some(0) - }, + notification_threads: value_t!(matches, "rpc_pubsub_notification_threads", usize) + .ok() + .and_then(NonZeroUsize::new), }, voting_disabled: matches.is_present("no_voting") || restricted_repair_only_mode, wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),