Skip to content

Commit

Permalink
add alessandrod custom runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffhelius committed Sep 6, 2024
1 parent 91cbe59 commit c98b041
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 14 deletions.
64 changes: 59 additions & 5 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
crate::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
parsed_token_accounts::*, rpc_cache::LargestAccountsCache, rpc_health::*,
rpc_service::service_runtime,
},
base64::{prelude::BASE64_STANDARD, Engine},
bincode::{config::Options, serialize},
Expand Down Expand Up @@ -111,6 +112,7 @@ use {
},
time::Duration,
},
tokio::runtime::Runtime,
};

pub mod account_resolver;
Expand All @@ -137,7 +139,7 @@ fn is_finalized(
&& (blockstore.is_root(slot) || bank.status_cache_ancestors().contains(&slot))
}

#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub struct JsonRpcConfig {
pub enable_rpc_transaction_history: bool,
pub enable_extended_tx_metadata_storage: bool,
Expand All @@ -148,6 +150,7 @@ pub struct JsonRpcConfig {
pub max_multiple_accounts: Option<usize>,
pub account_indexes: AccountSecondaryIndexes,
pub rpc_threads: usize,
pub rpc_blocking_threads: usize,
pub rpc_niceness_adj: i8,
pub full_api: bool,
pub obsolete_v1_7_api: bool,
Expand All @@ -157,6 +160,28 @@ pub struct JsonRpcConfig {
pub disable_health_check: bool,
}

impl Default for JsonRpcConfig {
fn default() -> Self {
Self {
enable_rpc_transaction_history: Default::default(),
enable_extended_tx_metadata_storage: Default::default(),
faucet_addr: Option::default(),
health_check_slot_distance: Default::default(),
rpc_bigtable_config: Option::default(),
max_multiple_accounts: Option::default(),
account_indexes: AccountSecondaryIndexes::default(),
rpc_threads: 1,
rpc_blocking_threads: 1,
rpc_niceness_adj: Default::default(),
full_api: Default::default(),
obsolete_v1_7_api: Default::default(),
rpc_scan_and_fix_roots: Default::default(),
max_request_body_size: Option::default(),
disable_health_check: Default::default(),
}
}
}

impl JsonRpcConfig {
pub fn default_for_test() -> Self {
Self {
Expand Down Expand Up @@ -211,6 +236,7 @@ pub struct JsonRpcRequestProcessor {
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
runtime: Arc<Runtime>,
}
impl Metadata for JsonRpcRequestProcessor {}

Expand Down Expand Up @@ -318,6 +344,7 @@ impl JsonRpcRequestProcessor {
max_complete_transaction_status_slot: Arc<AtomicU64>,
max_complete_rewards_slot: Arc<AtomicU64>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
runtime: Arc<Runtime>,
) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = unbounded();
(
Expand All @@ -340,6 +367,7 @@ impl JsonRpcRequestProcessor {
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache,
runtime,
},
receiver,
)
Expand Down Expand Up @@ -385,8 +413,17 @@ impl JsonRpcRequestProcessor {
let slot = bank.slot();
let optimistically_confirmed_bank =
Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank }));
let config = JsonRpcConfig::default();

let JsonRpcConfig {
rpc_threads,
rpc_blocking_threads,
rpc_niceness_adj,
..
} = config;

Self {
config: JsonRpcConfig::default(),
config,
snapshot_config: None,
bank_forks,
block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new(
Expand Down Expand Up @@ -414,6 +451,7 @@ impl JsonRpcRequestProcessor {
max_complete_transaction_status_slot: Arc::new(AtomicU64::default()),
max_complete_rewards_slot: Arc::new(AtomicU64::default()),
prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()),
runtime: service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
}
}

Expand Down Expand Up @@ -6834,8 +6872,15 @@ pub mod tests {
.my_contact_info()
.tpu(connection_cache.protocol())
.unwrap();
let config = JsonRpcConfig::default();
let JsonRpcConfig {
rpc_threads,
rpc_blocking_threads,
rpc_niceness_adj,
..
} = config;
let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
config,
None,
bank_forks.clone(),
block_commitment_cache,
Expand Down Expand Up @@ -7108,8 +7153,15 @@ pub mod tests {
.unwrap();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let config = JsonRpcConfig::default();
let JsonRpcConfig {
rpc_threads,
rpc_blocking_threads,
rpc_niceness_adj,
..
} = config;
let (request_processor, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
config,
None,
bank_forks.clone(),
block_commitment_cache,
Expand All @@ -7126,6 +7178,7 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
);
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
Expand Down Expand Up @@ -8741,7 +8794,7 @@ pub mod tests {
));

let (meta, _receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
config,
None,
bank_forks.clone(),
block_commitment_cache,
Expand All @@ -8758,6 +8811,7 @@ pub mod tests {
max_complete_transaction_status_slot,
max_complete_rewards_slot,
Arc::new(PrioritizationFeeCache::default()),
service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
);

let mut io = MetaIoHandler::default();
Expand Down
47 changes: 38 additions & 9 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl JsonRpcService {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
let rpc_threads = 1.max(config.rpc_threads);
let rpc_blocking_threads = 1.max(config.rpc_blocking_threads);
let rpc_niceness_adj = config.rpc_niceness_adj;

let health = Arc::new(RpcHealth::new(
Expand All @@ -389,15 +390,8 @@ impl JsonRpcService {
// So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1,
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(rpc_threads)
.on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap())
.thread_name("solRpcEl")
.enable_all()
.build()
.expect("Runtime"),
);

let runtime = service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj);

let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -476,6 +470,7 @@ impl JsonRpcService {
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache,
Arc::clone(&runtime),
);

let leader_info =
Expand Down Expand Up @@ -583,6 +578,40 @@ impl JsonRpcService {
}
}

pub fn service_runtime(
rpc_threads: usize,
rpc_blocking_threads: usize,
rpc_niceness_adj: i8,
) -> Arc<tokio::runtime::Runtime> {
// The jsonrpc_http_server crate supports two execution models:
//
// - By default, it spawns a number of threads - configured with .threads(N) - and runs a
// single-threaded futures executor in each thread.
// - Alternatively when configured with .event_loop_executor(executor) and .threads(1),
// it executes all the tasks on the given executor, not spawning any extra internal threads.
//
// We use the latter configuration, using a multi threaded tokio runtime as the executor. We
// do this so we can configure the number of worker threads, the number of blocking threads
// and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU
// bound operations like getMultipleAccounts. This results in reduced latency, since fast
// rpc calls (the majority) are not blocked by slow CPU bound ones.
//
// NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 2). Too many
// (busy) blocking threads could compete with CPU time with other validator threads and
// negatively impact performance.
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(rpc_threads)
.max_blocking_threads(rpc_blocking_threads)
.on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap())
.thread_name("solRpcEl")
.enable_all()
.build()
.expect("Runtime"),
);
runtime
}

#[cfg(test)]
mod tests {
use {
Expand Down
11 changes: 11 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_threads)
.help("Number of threads to use for servicing RPC requests"),
)
.arg(
Arg::with_name("rpc_blocking_threads")
.long("rpc-blocking-threads")
.value_name("NUMBER")
.validator(is_parsable::<usize>)
.takes_value(true)
.default_value(&default_args.rpc_blocking_threads)
.help("Number of blocking threads to use for servicing CPU bound RPC requests (eg getMultipleAccounts)"),
)
.arg(
Arg::with_name("rpc_niceness_adj")
.long("rpc-niceness-adjustment")
Expand Down Expand Up @@ -1990,6 +1999,7 @@ pub struct DefaultArgs {
pub rpc_send_transaction_service_max_retries: String,
pub rpc_send_transaction_batch_size: String,
pub rpc_threads: String,
pub rpc_blocking_threads: String,
pub rpc_niceness_adjustment: String,
pub rpc_bigtable_timeout: String,
pub rpc_bigtable_instance_name: String,
Expand Down Expand Up @@ -2075,6 +2085,7 @@ impl DefaultArgs {
.batch_size
.to_string(),
rpc_threads: num_cpus::get().to_string(),
rpc_blocking_threads: (num_cpus::get() / 2).to_string(),
rpc_niceness_adjustment: "0".to_string(),
rpc_bigtable_timeout: "30".to_string(),
rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(),
Expand Down
1 change: 1 addition & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ pub fn main() {
),
disable_health_check: false,
rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize),
rpc_blocking_threads: value_t_or_exit!(matches, "rpc_blocking_threads", usize),
rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8),
account_indexes: account_indexes.clone(),
rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),
Expand Down

0 comments on commit c98b041

Please sign in to comment.