Skip to content

Commit

Permalink
Remove RocksDb writer thread and disable WAL
Browse files Browse the repository at this point in the history
Writes are temporarily not "sync" until we adopt the new RocksDb async abstraction in follow-up PRs.
WAL is disabled as well; this will be made configurable in follow-up PRs.
  • Loading branch information
AhmedSoliman committed Apr 24, 2024
1 parent 060203a commit 1985e78
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 150 deletions.
4 changes: 1 addition & 3 deletions crates/storage-query-datafusion/src/idempotency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn get_idempotency_key() {
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
let mut engine = tc
.run_in_scope("mock-query-engine", None, MockQueryEngine::create())
.await;

Expand Down Expand Up @@ -96,6 +96,4 @@ async fn get_idempotency_key() {
)
)
);

shutdown.await;
}
4 changes: 1 addition & 3 deletions crates/storage-query-datafusion/src/journal/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn get_entries() {
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
let mut engine = tc
.run_in_scope("mock-query-engine", None, MockQueryEngine::create())
.await;

Expand Down Expand Up @@ -127,6 +127,4 @@ async fn get_entries() {
)
)
);

shutdown.await;
}
19 changes: 5 additions & 14 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions};
use restate_types::identifiers::{DeploymentId, ServiceRevision};
use restate_types::invocation::ServiceType;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;

#[derive(Default, Clone, Debug)]
Expand Down Expand Up @@ -87,35 +86,27 @@ impl MockQueryEngine {
+ Debug
+ Clone
+ 'static,
) -> (Self, impl Future<Output = ()>) {
) -> Self {
// Prepare Rocksdb
task_center().run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let worker_options = WorkerOptions::default();
let (rocksdb, writer) = RocksDBStorage::open(
let rocksdb = RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");
let (signal, watch) = drain::channel();
let writer_join_handle = writer.run(watch);

let query_engine = Self(
Self(
rocksdb.clone(),
QueryContext::from_options(&QueryEngineOptions::default(), rocksdb, status, schemas)
.unwrap(),
);

// Return shutdown future
(query_engine, async {
signal.drain().await;
writer_join_handle.await.unwrap().unwrap();
})
)
}

pub async fn create() -> (Self, impl Future<Output = ()>) {
pub async fn create() -> Self {
Self::create_with(MockStatusHandle::default(), MockSchemas::default()).await
}

Expand Down
4 changes: 1 addition & 3 deletions crates/storage-query-datafusion/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn query_sys_invocation() {
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
let mut engine = tc
.run_in_scope(
"mock-query-engine",
None,
Expand Down Expand Up @@ -117,6 +117,4 @@ async fn query_sys_invocation() {
}
))
);

shutdown.await;
}
8 changes: 1 addition & 7 deletions crates/storage-rocksdb/benches/basic_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ async fn writing_to_rocksdb(worker_options: WorkerOptions) {
//
// setup
//
let (mut rocksdb, writer) = RocksDBStorage::open(
let mut rocksdb = RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");

let (signal, watch) = drain::channel();
let writer_join_handler = writer.run(watch);

//
// write
//
Expand All @@ -45,9 +42,6 @@ async fn writing_to_rocksdb(worker_options: WorkerOptions) {
}
txn.commit().await.unwrap();
}

signal.drain().await;
writer_join_handler.await.unwrap().unwrap();
}

fn basic_writing_reading_benchmark(c: &mut Criterion) {
Expand Down
43 changes: 16 additions & 27 deletions crates/storage-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ pub mod scan;
pub mod service_status_table;
pub mod state_table;
pub mod timer_table;
mod writer;

use crate::keys::TableKey;
use crate::scan::{PhysicalScan, TableScan};
use crate::writer::{Writer, WriterHandle};
use crate::TableKind::{
Deduplication, Idempotency, Inbox, InvocationStatus, Journal, Outbox, PartitionStateMachine,
ServiceStatus, State, Timers,
Expand All @@ -49,17 +47,12 @@ use rocksdb::PrefixRange;
use rocksdb::ReadOptions;
use std::sync::Arc;

pub use writer::JoinHandle as RocksDBWriterJoinHandle;
pub use writer::Writer as RocksDBWriter;

pub type DB = rocksdb::OptimisticTransactionDB<MultiThreaded>;
type TransactionDB<'a> = rocksdb::Transaction<'a, DB>;

pub type DBIterator<'b> = DBRawIteratorWithThreadMode<'b, DB>;
pub type DBIteratorTransaction<'b> = DBRawIteratorWithThreadMode<'b, rocksdb::Transaction<'b, DB>>;

type WriteBatch = rocksdb::WriteBatchWithTransaction<true>;

// matches the default directory name
const DB_NAME: &str = "db";

Expand Down Expand Up @@ -156,7 +149,6 @@ pub enum BuildError {

pub struct RocksDBStorage {
db: Arc<DB>,
writer_handle: WriterHandle,
key_buffer: BytesMut,
value_buffer: BytesMut,
}
Expand All @@ -165,7 +157,6 @@ impl std::fmt::Debug for RocksDBStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RocksDBStorage")
.field("db", &self.db)
.field("writer_handle", &self.writer_handle)
.field("key_buffer", &self.key_buffer)
.field("value_buffer", &self.value_buffer)
.finish()
Expand All @@ -176,7 +167,6 @@ impl Clone for RocksDBStorage {
fn clone(&self) -> Self {
RocksDBStorage {
db: self.db.clone(),
writer_handle: self.writer_handle.clone(),
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
}
Expand Down Expand Up @@ -231,7 +221,7 @@ impl RocksDBStorage {
pub async fn open(
mut storage_opts: impl Updateable<StorageOptions> + Send + 'static,
updateable_opts: impl Updateable<RocksDbOptions> + Send + 'static,
) -> std::result::Result<(Self, Writer), BuildError> {
) -> std::result::Result<Self, BuildError> {
let cfs = vec![
//
// keyed by partition key + user key
Expand Down Expand Up @@ -272,18 +262,11 @@ impl RocksDBStorage {
.await
.map_err(|_| ShutdownError)??;

let writer = Writer::new(rdb.clone(), storage_opts);
let writer_handle = writer.create_writer_handle();

Ok((
Self {
db: rdb,
writer_handle,
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
},
writer,
))
Ok(Self {
db: rdb,
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
})
}

fn table_handle(&self, table_kind: TableKind) -> Arc<BoundColumnFamily> {
Expand Down Expand Up @@ -341,7 +324,6 @@ impl RocksDBStorage {
db,
key_buffer: &mut self.key_buffer,
value_buffer: &mut self.value_buffer,
writer_handle: &self.writer_handle,
}
}
}
Expand Down Expand Up @@ -406,7 +388,6 @@ pub struct RocksDBTransaction<'a> {
db: Arc<DB>,
key_buffer: &'a mut BytesMut,
value_buffer: &'a mut BytesMut,
writer_handle: &'a WriterHandle,
}

impl<'a> RocksDBTransaction<'a> {
Expand Down Expand Up @@ -446,8 +427,16 @@ impl<'a> Transaction for RocksDBTransaction<'a> {
// writes to RocksDB. However, it is safe to write the WriteBatch for a given partition,
// because there can only be a single writer (the leading PartitionProcessor).
let write_batch = self.txn.get_writebatch();

self.writer_handle.write(write_batch).await
// todo: make async and use configuration to control use of WAL
// Nothing was staged in this transaction, so skip the RocksDB write entirely.
if write_batch.is_empty() {
    return Ok(());
}
let mut opts = rocksdb::WriteOptions::default();
// We disable WAL since bifrost is our durable distributed log.
opts.disable_wal(true);
// Pass the configured `opts`: handing `write_opt` a fresh
// `WriteOptions::default()` would leave the WAL enabled and turn the
// `disable_wal` call above into a no-op.
self.db
    .write_opt(&write_batch, &opts)
    .map_err(|error| StorageError::Generic(error.into()))
}
}

Expand Down
4 changes: 1 addition & 3 deletions crates/storage-rocksdb/tests/idempotency_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const IDEMPOTENCY_ID_3: IdempotencyId =

#[tokio::test]
async fn test_idempotency_key() {
let (mut rocksdb, close) = storage_test_environment().await;
let mut rocksdb = storage_test_environment().await;

// Fill in some data
let mut txn = rocksdb.transaction();
Expand Down Expand Up @@ -114,6 +114,4 @@ async fn test_idempotency_key() {
.unwrap(),
None
);

close.await;
}
19 changes: 4 additions & 15 deletions crates/storage-rocksdb/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use restate_types::invocation::{InvocationTarget, ServiceInvocation, Source, Spa
use restate_types::state_mut::ExternalStateMutation;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::pin::pin;
use tokio_stream::StreamExt;

Expand All @@ -34,7 +33,7 @@ mod state_table_test;
mod timer_table_test;
mod virtual_object_status_table_test;

async fn storage_test_environment() -> (RocksDBStorage, impl Future<Output = ()>) {
async fn storage_test_environment() -> RocksDBStorage {
//
// create a rocksdb storage from options
//
Expand All @@ -46,25 +45,17 @@ async fn storage_test_environment() -> (RocksDBStorage, impl Future<Output = ()>
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let worker_options = WorkerOptions::default();
let (rocksdb, writer) = RocksDBStorage::open(
RocksDBStorage::open(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");

let (signal, watch) = drain::channel();
let writer_join_handle = writer.run(watch);

(rocksdb, async {
signal.drain().await;
writer_join_handle.await.unwrap().unwrap();
})
.expect("RocksDB storage creation should succeed")
}

#[tokio::test]
async fn test_read_write() {
let (rocksdb, close) = storage_test_environment().await;
let rocksdb = storage_test_environment().await;

//
// run the tests
Expand All @@ -76,8 +67,6 @@ async fn test_read_write() {
invocation_status_table_test::run_tests(rocksdb.clone()).await;
virtual_object_status_table_test::run_tests(rocksdb.clone()).await;
timer_table_test::run_tests(rocksdb).await;

close.await;
}

pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocation {
Expand Down
4 changes: 1 addition & 3 deletions crates/storage-rocksdb/tests/state_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub(crate) async fn run_tests(mut rocksdb: RocksDBStorage) {

#[tokio::test]
async fn test_delete_all() {
let (mut rocksdb, close) = storage_test_environment().await;
let mut rocksdb = storage_test_environment().await;

let mut txn = rocksdb.transaction();

Expand Down Expand Up @@ -143,6 +143,4 @@ async fn test_delete_all() {
.await
.expect("should not fail")
.is_some());

close.await;
}
Loading

0 comments on commit 1985e78

Please sign in to comment.