Skip to content

Commit

Permalink
[Perf] Rocksdb performance tuning
Browse files Browse the repository at this point in the history
This also introduces a change to how memory budgeting is configured and the default number of partitions is now 24
  • Loading branch information
AhmedSoliman committed May 31, 2024
1 parent 854947e commit faab766
Show file tree
Hide file tree
Showing 20 changed files with 422 additions and 244 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ prost-build = "0.12.1"
prost-types = "0.12.1"
rand = "0.8.5"
rayon = { version = "1.10" }
rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev="c2181f2b5da6d7bc201dc858433ed9e1c4bba4b7" }
rocksdb = { version = "0.22.0", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev="64a3c698910380e4fcbd8e56ce459779932cf1ff" }
rustls = "0.21.6"
schemars = { version = "0.8", features = ["bytes", "enumset"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ pub fn flamegraph_options<'a>() -> Options<'a> {
pub fn restate_configuration() -> Configuration {
let common_options = CommonOptionsBuilder::default()
.base_dir(tempfile::tempdir().expect("tempdir failed").into_path())
.bootstrap_num_partitions(NonZeroU64::new(10).unwrap())
.build()
.expect("building common options should work");

let worker_options = WorkerOptionsBuilder::default()
.bootstrap_num_partitions(NonZeroU64::new(10).unwrap())
.build()
.expect("building worker options should work");

Expand Down
157 changes: 100 additions & 57 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use restate_types::arc_util::Updateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};
use static_assertions::const_assert;

use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH};
use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState};
Expand All @@ -28,6 +29,10 @@ pub(crate) const DB_NAME: &str = "local-loglet";
pub(crate) const DATA_CF: &str = "logstore_data";
pub(crate) const METADATA_CF: &str = "logstore_metadata";

const DATA_CF_BUDGET_RATIO: f64 = 0.85;

const_assert!(DATA_CF_BUDGET_RATIO < 1.0);

#[derive(Debug, Clone, thiserror::Error)]
pub enum LogStoreError {
#[error(transparent)]
Expand Down Expand Up @@ -59,8 +64,14 @@ impl RocksDbLogStore {
let data_dir = options.data_dir();

let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), data_dir, db_options(options))
.add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
.add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
.add_cf_pattern(
CfExactPattern::new(DATA_CF),
cf_data_options(options.rocksdb_memory_budget()),
)
.add_cf_pattern(
CfExactPattern::new(METADATA_CF),
cf_metadata_options(options.rocksdb_memory_budget()),
)
// not very important but it's to reduce the number of merges by flushing.
// it's also a small cf so it should be quick.
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
Expand Down Expand Up @@ -101,8 +112,8 @@ impl RocksDbLogStore {
}
}

pub fn create_writer(&self, manual_wal_flush: bool) -> LogStoreWriter {
LogStoreWriter::new(self.rocksdb.clone(), manual_wal_flush)
pub fn create_writer(&self) -> LogStoreWriter {
LogStoreWriter::new(self.rocksdb.clone())
}

pub fn db(&self) -> &DB {
Expand All @@ -112,65 +123,97 @@ impl RocksDbLogStore {

fn db_options(options: &LocalLogletOptions) -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
//
// no need to retain 1000 log files by default.
//
opts.set_keep_log_file_num(10);

if !options.rocksdb.rocksdb_disable_wal() {
opts.set_manual_wal_flush(options.batch_wal_flushes);
}

// unconditionally enable atomic flushes to not persist inconsistent data in case WAL
// enable atomic flushes to not persist inconsistent data in case WAL
// is disabled
opts.set_atomic_flush(true);
if options.rocksdb.rocksdb_disable_wal() {
opts.set_atomic_flush(true);
}

opts
}

// todo: optimize
fn cf_data_options(mut opts: rocksdb::Options) -> rocksdb::Options {
//
// Set compactions per level
//
opts.set_max_write_buffer_number(10);
opts.set_num_levels(7);
opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Snappy,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
DBCompressionType::Zstd,
]);

opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(DATA_KEY_PREFIX_LENGTH));
opts.set_memtable_prefix_bloom_ratio(0.2);
// most reads are sequential
opts.set_advise_random_on_open(false);
//
opts
/// Builds the options customizer for the data column family.
///
/// `memory_budget` is the memory budget in bytes that is shared between the data and the
/// metadata column families. The data column family takes `DATA_CF_BUDGET_RATIO` of it.
///
/// # Panics
///
/// The returned closure panics if the data column family's share of `memory_budget`
/// rounds down to zero bytes.
fn cf_data_options(
    memory_budget: usize,
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
    move |mut opts| {
        // The memory budget is in bytes and is divided between the two column families:
        // DATA_CF_BUDGET_RATIO (currently 0.85) of it goes to data, the remainder goes
        // to metadata.
        let memtables_budget = (memory_budget as f64 * DATA_CF_BUDGET_RATIO).floor() as usize;
        assert!(
            memtables_budget > 0,
            "data cf memtables budget should be greater than 0 (memory budget: {} bytes)",
            memory_budget
        );

        set_memory_related_opts(&mut opts, memtables_budget);
        opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        opts.set_num_levels(7);

        // One entry per level: the two top levels are left uncompressed, the middle
        // levels use LZ4 and the bottom level uses Zstd.
        opts.set_compression_per_level(&[
            DBCompressionType::None,
            DBCompressionType::None,
            DBCompressionType::Lz4,
            DBCompressionType::Lz4,
            DBCompressionType::Lz4,
            DBCompressionType::Lz4,
            DBCompressionType::Zstd,
        ]);

        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(DATA_KEY_PREFIX_LENGTH));
        opts.set_memtable_prefix_bloom_ratio(0.2);
        // most reads are sequential
        opts.set_advise_random_on_open(false);
        opts
    }
}

// todo: optimize
fn cf_metadata_options(mut opts: rocksdb::Options) -> rocksdb::Options {
//
// Set compactions per level
//
opts.set_num_levels(3);
opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Snappy,
DBCompressionType::Zstd,
]);
opts.set_max_write_buffer_number(10);
opts.set_max_successive_merges(10);
// Merge operator for log state updates
opts.set_merge_operator(
"LogStateMerge",
log_state_full_merge,
log_state_partial_merge,
);
opts
/// Derives the memtable sizing and the size-dependent compaction thresholds of a column
/// family from `memtables_budget` (in bytes) and applies them to `opts`.
fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) {
    // The budget is spread over 4 memtables: 1 mutable + 3 immutable.
    let per_memtable_bytes = memtables_budget / 4;
    let budget_bytes = memtables_budget as u64;

    opts.set_max_write_buffer_number(4);
    opts.set_write_buffer_size(per_memtable_bytes);

    // Two memtables are merged together whenever we flush to L0.
    opts.set_min_write_buffer_number_to_merge(2);

    // Kick off L0->L1 compaction as early as possible. With two memtables merged per
    // flush, each L0 file is about (memtables_budget / 2), so two files means L0 has
    // reached the size of the whole memtables budget.
    opts.set_level_zero_file_num_compaction_trigger(2);

    // Keep L1 the same size as L0 so that L0->L1 compactions stay fast.
    opts.set_max_bytes_for_level_base(budget_bytes);

    // Not critical; this only keeps the number of files from growing too large.
    opts.set_target_file_size_base(budget_bytes / 8);
}

/// Builds the options customizer for the metadata column family.
///
/// `memory_budget` is the memory budget in bytes that is shared between the data and the
/// metadata column families. The metadata column family takes whatever is left after the
/// data column family's `DATA_CF_BUDGET_RATIO` share, i.e. `1.0 - DATA_CF_BUDGET_RATIO`.
///
/// # Panics
///
/// The returned closure panics if the metadata column family's share of `memory_budget`
/// rounds down to zero bytes.
fn cf_metadata_options(
    memory_budget: usize,
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
    move |mut opts| {
        let memtables_budget =
            (memory_budget as f64 * (1.0 - DATA_CF_BUDGET_RATIO)).floor() as usize;
        assert!(
            memtables_budget > 0,
            "metadata cf memtables budget should be greater than 0 (memory budget: {} bytes)",
            memory_budget
        );
        // Sizes the memtables (including the maximum number of write buffers) and the
        // size-dependent compaction thresholds from this column family's share.
        set_memory_related_opts(&mut opts, memtables_budget);

        // Set compression per level: only the last of the three levels is compressed.
        opts.set_num_levels(3);
        opts.set_compression_per_level(&[
            DBCompressionType::None,
            DBCompressionType::None,
            DBCompressionType::Lz4,
        ]);
        opts.set_memtable_whole_key_filtering(true);
        opts.set_max_successive_merges(10);
        // Merge operator for log state updates
        opts.set_merge_operator(
            "LogStateMerge",
            log_state_full_merge,
            log_state_partial_merge,
        );
        opts
    }
}
27 changes: 4 additions & 23 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,14 @@ enum DataUpdate {
pub(crate) struct LogStoreWriter {
rocksdb: Arc<RocksDb>,
batch_acks_buf: Vec<Ack>,
manual_wal_flush: bool,
buffer: BytesMut,
}

impl LogStoreWriter {
pub(crate) fn new(rocksdb: Arc<RocksDb>, manual_wal_flush: bool) -> Self {
pub(crate) fn new(rocksdb: Arc<RocksDb>) -> Self {
Self {
rocksdb,
batch_acks_buf: Vec::default(),
manual_wal_flush,
buffer: BytesMut::default(),
}
}
Expand Down Expand Up @@ -245,20 +243,16 @@ impl LogStoreWriter {
async fn commit(&mut self, opts: &LocalLogletOptions, write_batch: WriteBatch) {
let mut write_opts = rocksdb::WriteOptions::new();
write_opts.disable_wal(opts.rocksdb.rocksdb_disable_wal());

if !self.manual_wal_flush && !opts.rocksdb.rocksdb_disable_wal() {
// if we are not manually flushing the wal, we need to configure the sync behaviour
// for the write operation explicitly
write_opts.set_sync(opts.sync_wal_before_ack);
}
// if WAL is enabled, we sync after every write.
write_opts.set_sync(!opts.rocksdb.rocksdb_disable_wal());

trace!(
"Committing local loglet current write batch: {} items",
write_batch.len(),
);
let result = self
.rocksdb
.write_batch(Priority::High, IoMode::Default, write_opts, write_batch)
.write_batch(Priority::High, IoMode::default(), write_opts, write_batch)
.await;

if let Err(e) = result {
Expand All @@ -267,19 +261,6 @@ impl LogStoreWriter {
return;
}

if self.manual_wal_flush {
// WAL flush is done in the foreground, but sync will happen in the background to avoid
// blocking IO.
if let Err(e) = self.rocksdb.flush_wal(opts.sync_wal_before_ack).await {
warn!("Failed to flush rocksdb WAL in local loglet : {}", e);
self.send_acks(Err(Error::LogStoreError(e.into())));
return;
}
if !opts.sync_wal_before_ack {
self.rocksdb.run_bg_wal_sync();
}
}

self.send_acks(Ok(()));
}

Expand Down
10 changes: 2 additions & 8 deletions crates/bifrost/src/loglets/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::loglet::{Loglet, LogletOffset, LogletProvider};
use crate::Error;
use crate::ProviderError;

//#[derive(Debug)]
pub struct LocalLogletProvider {
log_store: RocksDbLogStore,
active_loglets: AsyncMutex<HashMap<String, Arc<LocalLoglet>>>,
Expand Down Expand Up @@ -90,13 +89,8 @@ impl LogletProvider for LocalLogletProvider {
}

fn start(&self) -> Result<(), ProviderError> {
let mut updateable = Configuration::mapped_updateable(|c| &c.bifrost.local);
let opts = updateable.load();
let manual_wal_flush = opts.batch_wal_flushes && !opts.rocksdb.rocksdb_disable_wal();
let log_writer = self
.log_store
.create_writer(manual_wal_flush)
.start(updateable)?;
let updateable = Configuration::mapped_updateable(|c| &c.bifrost.local);
let log_writer = self.log_store.create_writer().start(updateable)?;
self.log_writer
.set(log_writer)
.expect("local loglet started once");
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl Node {
) -> Result<FixedPartitionTable, Error> {
Self::retry_on_network_error(|| {
metadata_store_client.get_or_insert(PARTITION_TABLE_KEY.clone(), || {
FixedPartitionTable::new(Version::MIN, config.worker.bootstrap_num_partitions())
FixedPartitionTable::new(Version::MIN, config.common.bootstrap_num_partitions())
})
})
.await
Expand Down
4 changes: 4 additions & 0 deletions crates/node/src/network_server/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const ROCKSDB_DB_PROPERTIES: &[(&str, MetricUnit)] = &[
const ROCKSDB_CF_PROPERTIES: &[(&str, MetricUnit)] = &[
("rocksdb.num-immutable-mem-table", MetricUnit::Count),
("rocksdb.mem-table-flush-pending", MetricUnit::Count),
("rocksdb.is-write-stopped", MetricUnit::Count),
("rocksdb.compaction-pending", MetricUnit::Count),
("rocksdb.background-errors", MetricUnit::Count),
("rocksdb.cur-size-active-mem-table", MetricUnit::Bytes),
Expand All @@ -171,6 +172,9 @@ const ROCKSDB_CF_PROPERTIES: &[(&str, MetricUnit)] = &[
// Add more as needed.
("rocksdb.num-files-at-level2", MetricUnit::Count),
("rocksdb.num-files-at-level3", MetricUnit::Count),
("rocksdb.num-files-at-level4", MetricUnit::Count),
("rocksdb.num-files-at-level5", MetricUnit::Count),
("rocksdb.num-files-at-level6", MetricUnit::Count),
];

// -- Direct HTTP Handlers --
Expand Down
Loading

0 comments on commit faab766

Please sign in to comment.