diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 16a1a3cc566492..bf30910f9b39bc 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -12,7 +12,7 @@ use { blockstore_meta::*, blockstore_metrics::BlockstoreRpcApiMetrics, blockstore_options::{ - AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, + BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, }, blockstore_processor::BlockstoreProcessorError, leader_schedule_cache::LeaderScheduleCache, @@ -90,7 +90,9 @@ pub mod blockstore_purge; use static_assertions::const_assert_eq; pub use { crate::{ - blockstore_db::BlockstoreError, + blockstore_db::{ + default_num_compaction_threads, default_num_flush_threads, BlockstoreError, + }, blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta}, blockstore_metrics::BlockstoreInsertionMetrics, }, @@ -4961,10 +4963,9 @@ pub fn create_new_ledger( let blockstore = Blockstore::open_with_options( ledger_path, BlockstoreOptions { - access_type: AccessType::Primary, - recovery_mode: None, enforce_ulimit_nofile: false, column_options: column_options.clone(), + ..BlockstoreOptions::default() }, )?; let ticks_per_slot = genesis_config.ticks_per_slot; diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 8eac6622fa3468..39a1742d7b8767 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -37,6 +37,7 @@ use { fs, marker::PhantomData, mem, + num::NonZeroUsize, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -2032,6 +2033,18 @@ fn get_db_options(access_type: &AccessType) -> Options { options } +/// The default number of threads to use for rocksdb compaction in the rocksdb +/// low priority threadpool +pub fn default_num_compaction_threads() -> NonZeroUsize { + NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero") +} + +/// The default number of threads to use for rocksdb memtable flushes in the +/// rocksdb high priority threadpool +pub fn default_num_flush_threads() -> NonZeroUsize { + NonZeroUsize::new((num_cpus::get() / 4).max(1)).expect("thread count is non-zero") +} + // Returns whether automatic compactions should be disabled for the entire // database based upon the given access type. fn should_disable_auto_compactions(access_type: &AccessType) -> bool { diff --git a/ledger/src/blockstore_options.rs b/ledger/src/blockstore_options.rs index 977323fe9c3ffc..14b7c88317a078 100644 --- a/ledger/src/blockstore_options.rs +++ b/ledger/src/blockstore_options.rs @@ -1,4 +1,8 @@ -use rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode}; +use { + crate::blockstore_db::{default_num_compaction_threads, default_num_flush_threads}, + rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode}, + std::num::NonZeroUsize, +}; /// The subdirectory under ledger directory where the Blockstore lives pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb"; @@ -13,6 +17,8 @@ pub struct BlockstoreOptions { // desired open file descriptor limit cannot be configured. Default: true. pub enforce_ulimit_nofile: bool, pub column_options: LedgerColumnOptions, + pub num_rocksdb_compaction_threads: NonZeroUsize, + pub num_rocksdb_flush_threads: NonZeroUsize, } impl Default for BlockstoreOptions { @@ -25,6 +31,8 @@ impl Default for BlockstoreOptions { recovery_mode: None, enforce_ulimit_nofile: true, column_options: LedgerColumnOptions::default(), + num_rocksdb_compaction_threads: default_num_compaction_threads(), + num_rocksdb_flush_threads: default_num_flush_threads(), } } } @@ -36,6 +44,8 @@ impl BlockstoreOptions { recovery_mode: None, enforce_ulimit_nofile: false, column_options: LedgerColumnOptions::default(), + num_rocksdb_compaction_threads: default_num_compaction_threads(), + num_rocksdb_flush_threads: default_num_flush_threads(), } } } diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 589fa7edf598ae..ddf819347c47aa 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -18,6 +18,8 @@ pub struct DefaultThreadArgs { pub rayon_global_threads: String, pub replay_forks_threads: String, pub replay_transactions_threads: String, + pub rocksdb_compaction_threads: String, + pub rocksdb_flush_threads: String, pub tvu_receive_threads: String, pub tvu_sigverify_threads: String, } @@ -36,6 +38,8 @@ impl Default for DefaultThreadArgs { replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(), replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default() .to_string(), + rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(), + rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(), tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(), tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(), } @@ -52,6 +56,8 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { new_thread_arg::(&defaults.rayon_global_threads), new_thread_arg::(&defaults.replay_forks_threads), new_thread_arg::(&defaults.replay_transactions_threads), + new_thread_arg::(&defaults.rocksdb_compaction_threads), + new_thread_arg::(&defaults.rocksdb_flush_threads), new_thread_arg::(&defaults.tvu_receive_threads), new_thread_arg::(&defaults.tvu_sigverify_threads), ] @@ -77,6 +83,8 @@ pub struct NumThreadConfig { pub rayon_global_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, + pub rocksdb_compaction_threads: NonZeroUsize, + pub rocksdb_flush_threads: NonZeroUsize, pub tvu_receive_threads: NonZeroUsize, pub tvu_sigverify_threads: NonZeroUsize, } @@ -119,6 +127,16 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { ReplayTransactionsThreadsArg::NAME, NonZeroUsize ), + rocksdb_compaction_threads: value_t_or_exit!( + matches, + RocksdbCompactionThreadsArg::NAME, + NonZeroUsize + ), + rocksdb_flush_threads: value_t_or_exit!( + matches, + RocksdbFlushThreadsArg::NAME, + NonZeroUsize + ), tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize), tvu_sigverify_threads: value_t_or_exit!( matches, @@ -257,6 +275,28 @@ impl ThreadArg for ReplayTransactionsThreadsArg { } } +struct RocksdbCompactionThreadsArg; +impl ThreadArg for RocksdbCompactionThreadsArg { + const NAME: &'static str = "rocksdb_compaction_threads"; + const LONG_NAME: &'static str = "rocksdb-compaction-threads"; + const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions"; + + fn default() -> usize { + solana_ledger::blockstore::default_num_compaction_threads().get() + } +} + +struct RocksdbFlushThreadsArg; +impl ThreadArg for RocksdbFlushThreadsArg { + const NAME: &'static str = "rocksdb_flush_threads"; + const LONG_NAME: &'static str = "rocksdb-flush-threads"; + const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes"; + + fn default() -> usize { + solana_ledger::blockstore::default_num_flush_threads().get() + } +} + struct TvuReceiveThreadsArg; impl ThreadArg for TvuReceiveThreadsArg { const NAME: &'static str = "tvu_receive_threads"; diff --git a/validator/src/main.rs b/validator/src/main.rs index cecf9f873d3019..fc300a83021f87 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -907,6 +907,8 @@ pub fn main() { rayon_global_threads, replay_forks_threads, replay_transactions_threads, + rocksdb_compaction_threads, + rocksdb_flush_threads, tvu_receive_threads, tvu_sigverify_threads, } = cli::thread_args::parse_num_threads_args(&matches); @@ -1055,6 +1057,8 @@ pub fn main() { enforce_ulimit_nofile: true, // The validator needs primary (read/write) access_type: AccessType::Primary, + num_rocksdb_compaction_threads: rocksdb_compaction_threads, + num_rocksdb_flush_threads: rocksdb_flush_threads, }; let accounts_hash_cache_path = matches