perf(db): Improve storage switching for state keeper cache #2234

Merged
228 changes: 209 additions & 19 deletions core/lib/state/src/catchup.rs
@@ -1,7 +1,6 @@
use std::{sync::Arc, time::Instant};
use std::{error, fmt, time::Instant};

use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
@@ -10,6 +9,85 @@ use zksync_types::L1BatchNumber;

use crate::{RocksdbStorage, RocksdbStorageOptions, StateKeeperColumnFamily};

/// Initial RocksDB cache state returned by [`RocksdbCell::ensure_initialized()`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InitialRocksdbState {
/// Last processed L1 batch number in the RocksDB cache + 1 (i.e., the batch that the cache is ready to process).
/// `None` if the cache is empty (i.e., needs recovery).
pub l1_batch_number: Option<L1BatchNumber>,
}

/// Error returned from [`RocksdbCell`] methods if the corresponding [`AsyncCatchupTask`] has failed
/// or was canceled.
#[derive(Debug)]
pub struct AsyncCatchupFailed(());

impl fmt::Display for AsyncCatchupFailed {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("Async RocksDB cache catchup failed or was canceled")
}
}

impl error::Error for AsyncCatchupFailed {}

/// `OnceCell` equivalent that can be `.await`ed. Correspondingly, it has the following invariants:
///
/// - The cell is only set once
/// - The cell is always set to `Some(_)`.
///
/// `OnceCell` (either from `once_cell` or `tokio`) is not used because it lacks a way to wait for the cell
/// to be initialized. `once_cell::sync::OnceCell` has a blocking `wait()` method, but since it's blocking,
/// it risks spawning non-cancellable threads if misused.
type AsyncOnceCell<T> = watch::Receiver<Option<T>>;

/// A lazily initialized handle to RocksDB cache returned from [`AsyncCatchupTask::new()`].
#[derive(Debug)]
pub struct RocksdbCell {
initial_state: AsyncOnceCell<InitialRocksdbState>,
db: AsyncOnceCell<RocksDB<StateKeeperColumnFamily>>,
}

impl RocksdbCell {
/// Waits until RocksDB is initialized and returns it.
///
/// # Errors
///
/// Returns an error if the async catch-up task failed or was canceled before initialization.
#[allow(clippy::missing_panics_doc)] // false positive
pub async fn wait(&self) -> Result<RocksDB<StateKeeperColumnFamily>, AsyncCatchupFailed> {
self.db
.clone()
.wait_for(Option::is_some)
.await
// `unwrap` below is safe by construction
.map(|db| db.clone().unwrap())
.map_err(|_| AsyncCatchupFailed(()))
}

/// Gets a RocksDB instance if it has been initialized.
pub fn get(&self) -> Option<RocksDB<StateKeeperColumnFamily>> {
self.db.borrow().clone()
}

/// Ensures that the RocksDB cache has started catching up, and returns the **initial** RocksDB state
/// at the start of the catch-up.
///
/// # Errors
///
/// Returns an error if the async catch-up task failed or was canceled.
#[allow(clippy::missing_panics_doc)] // false positive
pub async fn ensure_initialized(&self) -> Result<InitialRocksdbState, AsyncCatchupFailed> {
self.initial_state
.clone()
.wait_for(Option::is_some)
.await
// `unwrap` below is safe by construction
.map(|state| state.clone().unwrap())
.map_err(|_| AsyncCatchupFailed(()))
}
}

/// A runnable task that blocks until the provided RocksDB cache instance is caught up with
/// Postgres.
///
@@ -19,27 +97,41 @@ pub struct AsyncCatchupTask {
pool: ConnectionPool<Core>,
state_keeper_db_path: String,
state_keeper_db_options: RocksdbStorageOptions,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
initial_state_sender: watch::Sender<Option<InitialRocksdbState>>,
db_sender: watch::Sender<Option<RocksDB<StateKeeperColumnFamily>>>,
to_l1_batch_number: Option<L1BatchNumber>,
}

impl AsyncCatchupTask {
/// Creates a new catch-up task with the provided Postgres connection pool and state keeper
/// RocksDB path, returning the task together with a [`RocksdbCell`] handle to the eventually
/// initialized cache. Options can be customized via the `with_*` methods.
pub fn new(
pool: ConnectionPool<Core>,
state_keeper_db_path: String,
state_keeper_db_options: RocksdbStorageOptions,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
to_l1_batch_number: Option<L1BatchNumber>,
) -> Self {
Self {
pub fn new(pool: ConnectionPool<Core>, state_keeper_db_path: String) -> (Self, RocksdbCell) {
let (initial_state_sender, initial_state) = watch::channel(None);
let (db_sender, db) = watch::channel(None);
let this = Self {
pool,
state_keeper_db_path,
state_keeper_db_options,
rocksdb_cell,
to_l1_batch_number,
}
state_keeper_db_options: RocksdbStorageOptions::default(),
initial_state_sender,
db_sender,
to_l1_batch_number: None,
};
(this, RocksdbCell { initial_state, db })
}

/// Sets RocksDB options.
#[must_use]
pub fn with_db_options(mut self, options: RocksdbStorageOptions) -> Self {
self.state_keeper_db_options = options;
self
}

/// Sets the L1 batch number to catch up to. By default, the task catches up to the latest L1 batch
/// (as of the start of catch-up).
#[must_use]
pub fn with_target_l1_batch_number(mut self, number: L1BatchNumber) -> Self {
self.to_l1_batch_number = Some(number);
self
}

/// Blocks until the RocksDB cache instance is caught up with Postgres.
@@ -49,7 +141,10 @@ impl AsyncCatchupTask {
/// Propagates RocksDB and Postgres errors.
pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let started_at = Instant::now();
tracing::debug!("Catching up RocksDB asynchronously");
tracing::debug!(
"Catching up RocksDB asynchronously to L1 batch #{:?}",
self.to_l1_batch_number
);

let mut rocksdb_builder = RocksdbStorage::builder_with_options(
self.state_keeper_db_path.as_ref(),
@@ -58,6 +153,12 @@
.await
.context("Failed creating RocksDB storage builder")?;

let initial_state = InitialRocksdbState {
l1_batch_number: rocksdb_builder.l1_batch_number().await,
};
tracing::info!("Initialized RocksDB catchup from state: {initial_state:?}");
self.initial_state_sender.send_replace(Some(initial_state));

let mut connection = self.pool.connection_tagged("state_keeper").await?;
let was_recovered_from_snapshot = rocksdb_builder
.ensure_ready(&mut connection, &stop_receiver)
@@ -76,12 +177,101 @@
.context("Failed to catch up RocksDB to Postgres")?;
drop(connection);
if let Some(rocksdb) = rocksdb {
self.rocksdb_cell
.set(rocksdb.into_rocksdb())
.map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?;
self.db_sender.send_replace(Some(rocksdb.into_rocksdb()));
} else {
tracing::info!("Synchronizing RocksDB interrupted");
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use tempfile::TempDir;
use test_casing::test_casing;
use zksync_types::L2BlockNumber;

use super::*;
use crate::{
test_utils::{create_l1_batch, create_l2_block, gen_storage_logs, prepare_postgres},
RocksdbStorageBuilder,
};

#[tokio::test]
async fn catching_up_basics() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
prepare_postgres(&mut conn).await;
let storage_logs = gen_storage_logs(20..40);
create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
drop(conn);

let temp_dir = TempDir::new().unwrap();
let (task, rocksdb_cell) =
AsyncCatchupTask::new(pool.clone(), temp_dir.path().to_str().unwrap().to_owned());
let (_stop_sender, stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(stop_receiver));

let initial_state = rocksdb_cell.ensure_initialized().await.unwrap();
assert_eq!(initial_state.l1_batch_number, None);

let db = rocksdb_cell.wait().await.unwrap();
assert_eq!(
RocksdbStorageBuilder::from_rocksdb(db)
.l1_batch_number()
.await,
Some(L1BatchNumber(2))
);
task_handle.await.unwrap().unwrap();
drop(rocksdb_cell); // should be enough to release RocksDB lock

let (task, rocksdb_cell) =
AsyncCatchupTask::new(pool, temp_dir.path().to_str().unwrap().to_owned());
let (_stop_sender, stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(stop_receiver));

let initial_state = rocksdb_cell.ensure_initialized().await.unwrap();
assert_eq!(initial_state.l1_batch_number, Some(L1BatchNumber(2)));

task_handle.await.unwrap().unwrap();
rocksdb_cell.get().unwrap(); // RocksDB must be caught up at this point
}

#[derive(Debug)]
enum CancellationScenario {
DropTask,
CancelTask,
}

impl CancellationScenario {
const ALL: [Self; 2] = [Self::DropTask, Self::CancelTask];
}

#[test_casing(2, CancellationScenario::ALL)]
#[tokio::test]
async fn catching_up_cancellation(scenario: CancellationScenario) {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
prepare_postgres(&mut conn).await;
let storage_logs = gen_storage_logs(20..40);
create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
drop(conn);

let temp_dir = TempDir::new().unwrap();
let (task, rocksdb_cell) =
AsyncCatchupTask::new(pool.clone(), temp_dir.path().to_str().unwrap().to_owned());
let (stop_sender, stop_receiver) = watch::channel(false);
match scenario {
CancellationScenario::DropTask => drop(task),
CancellationScenario::CancelTask => {
stop_sender.send_replace(true);
task.run(stop_receiver).await.unwrap();
}
}

assert!(rocksdb_cell.get().is_none());
rocksdb_cell.wait().await.unwrap_err();
}
}
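
The `AsyncOnceCell` alias introduced above is the core trick of this PR: a `tokio::sync::watch` channel over an `Option<T>` behaves as an awaitable once-cell, provided the sender publishes `Some(_)` exactly once. A minimal self-contained sketch of the pattern, assuming only the `tokio` crate (the `Config` type and value are illustrative, not part of this PR):

use tokio::sync::watch;

#[derive(Debug, Clone)]
struct Config {
    value: u64,
}

#[tokio::main]
async fn main() {
    // The receiver acts as the awaitable cell; `None` means "not set yet".
    let (sender, mut receiver) = watch::channel(None::<Config>);

    let reader = tokio::spawn(async move {
        // `wait_for` resolves once the value satisfies the predicate, or errors
        // if the sender is dropped first -- the case `AsyncCatchupFailed` maps to.
        let config = receiver
            .wait_for(Option::is_some)
            .await
            .expect("sender dropped before initialization")
            .clone() // clones the `Option<Config>` out of the watch guard
            .unwrap(); // safe: `wait_for` guaranteed `Some(_)`
        println!("initialized with {config:?}");
    });

    // The producer sets the cell exactly once; `send_replace` succeeds
    // even if all receivers are gone.
    sender.send_replace(Some(Config { value: 42 }));
    reader.await.unwrap();
}
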
2 changes: 1 addition & 1 deletion core/lib/state/src/lib.rs
@@ -30,7 +30,7 @@ mod test_utils;

pub use self::{
cache::sequential_cache::SequentialCache,
catchup::AsyncCatchupTask,
catchup::{AsyncCatchupTask, RocksdbCell},
in_memory::InMemoryStorage,
// Note, that `test_infra` of the bootloader tests relies on this value to be exposed
in_memory::IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID,
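
With `RocksdbCell` now re-exported alongside `AsyncCatchupTask`, callers can drive the catch-up task and observe its progress without sharing an `Arc<OnceCell>`. A sketch of the intended wiring, based only on the signatures in this diff; the DB path is a placeholder:

use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_state::{AsyncCatchupTask, RocksdbStorageOptions};

async fn run_with_cache(pool: ConnectionPool<Core>) -> anyhow::Result<()> {
    // `new` now returns the task plus a lazily initialized handle to the DB.
    let (task, rocksdb_cell) =
        AsyncCatchupTask::new(pool, "/db/state_keeper_cache".to_owned());
    // Options moved from `new` arguments to builder-style setters.
    let task = task.with_db_options(RocksdbStorageOptions::default());

    let (_stop_sender, stop_receiver) = watch::channel(false);
    let task_handle = tokio::spawn(task.run(stop_receiver));

    // Returns as soon as catch-up has *started*, exposing the initial state.
    let initial_state = rocksdb_cell.ensure_initialized().await?;
    tracing::info!("cache starts at {:?}", initial_state.l1_batch_number);

    // Blocks until the cache is fully caught up (or errors if the task died).
    let _db = rocksdb_cell.wait().await?;
    task_handle.await??;
    Ok(())
}
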
2 changes: 1 addition & 1 deletion core/lib/state/src/storage_factory.rs
@@ -186,7 +186,7 @@ impl ReadStorage for RocksdbWithMemory {
}

impl ReadStorage for PgOrRocksdbStorage<'_> {
fn read_value(&mut self, key: &StorageKey) -> zksync_types::StorageValue {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
match self {
Self::Postgres(postgres) => postgres.read_value(key),
Self::Rocksdb(rocksdb) => rocksdb.read_value(key),
44 changes: 26 additions & 18 deletions core/node/state_keeper/src/state_keeper_storage.rs
@@ -1,26 +1,23 @@
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;

use anyhow::Context;
use async_trait::async_trait;
use once_cell::sync::OnceCell;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_state::{
AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorageOptions,
StateKeeperColumnFamily,
AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell, RocksdbStorageOptions,
};
use zksync_storage::RocksDB;
use zksync_types::L1BatchNumber;

/// A [`ReadStorageFactory`] implementation that can produce short-lived [`ReadStorage`] handles
/// backed by either Postgres or RocksDB (if it's caught up). Always starts as a `Postgres`
/// variant and is then mutated into `Rocksdb` once the RocksDB cache is caught up, after which
/// it never reverts to `Postgres`, since we assume RocksDB cannot fall behind under normal
/// state keeper operation.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct AsyncRocksdbCache {
pool: ConnectionPool<Core>,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
rocksdb_cell: RocksdbCell,
}

impl AsyncRocksdbCache {
@@ -29,15 +26,30 @@ impl AsyncRocksdbCache {
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<PgOrRocksdbStorage<'_>>> {
if let Some(rocksdb) = self.rocksdb_cell.get() {
let initial_state = self.rocksdb_cell.ensure_initialized().await?;
let rocksdb = if initial_state.l1_batch_number >= Some(l1_batch_number) {
tracing::info!(
"RocksDB cache (initial state: {initial_state:?}) doesn't need to catch up to L1 batch #{l1_batch_number}, \
waiting for it to become available"
);
// Opening the cache RocksDB can take a couple of seconds, so if we don't wait here, we unnecessarily miss an opportunity
// to use the cache for an entire batch.
Some(self.rocksdb_cell.wait().await?)
} else {
// This clause includes several cases: if the cache needs catching up or recovery, or if `l1_batch_number`
// is not the first processed L1 batch.
self.rocksdb_cell.get()
};

if let Some(rocksdb) = rocksdb {
let mut connection = self
.pool
.connection_tagged("state_keeper")
.await
.context("Failed getting a Postgres connection")?;
PgOrRocksdbStorage::access_storage_rocksdb(
&mut connection,
rocksdb.clone(),
rocksdb,
stop_receiver,
l1_batch_number,
)
@@ -57,15 +69,11 @@ impl AsyncRocksdbCache {
state_keeper_db_path: String,
state_keeper_db_options: RocksdbStorageOptions,
) -> (Self, AsyncCatchupTask) {
let rocksdb_cell = Arc::new(OnceCell::new());
let task = AsyncCatchupTask::new(
pool.clone(),
state_keeper_db_path,
state_keeper_db_options,
rocksdb_cell.clone(),
None,
);
(Self { pool, rocksdb_cell }, task)
let (task, rocksdb_cell) = AsyncCatchupTask::new(pool.clone(), state_keeper_db_path);
(
Self { pool, rocksdb_cell },
task.with_db_options(state_keeper_db_options),
)
}
}

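
The behavioral core of this change is the branch at the top of `access_storage`: waiting for RocksDB is worthwhile only if the cache already covered the requested batch when catch-up started, so the remaining cost is opening the DB rather than replaying batches. A reduced sketch of that decision, with `u32` standing in for `L1BatchNumber`:

/// Stand-in for `InitialRocksdbState`: the batch the cache was ready to
/// process when catch-up started (`None` if the cache was empty).
#[derive(Debug, Clone, Copy)]
struct InitialState {
    l1_batch_number: Option<u32>,
}

#[derive(Debug, PartialEq)]
enum Storage {
    /// Worth blocking until the cache DB handle is published.
    WaitForRocksdb,
    /// Use RocksDB only if it happens to be ready; otherwise fall back to Postgres.
    RocksdbIfReady,
}

fn choose_storage(initial_state: InitialState, requested_batch: u32) -> Storage {
    // `None < Some(_)` per the derived ordering on `Option`, so an empty cache
    // never triggers a wait.
    if initial_state.l1_batch_number >= Some(requested_batch) {
        // The cache already covered this batch when catch-up started; the only
        // cost is opening RocksDB (seconds), not replaying batches.
        Storage::WaitForRocksdb
    } else {
        // The cache still needs to catch up (or recover); don't stall the
        // state keeper on it.
        Storage::RocksdbIfReady
    }
}

fn main() {
    // Cache was ready to process batch #5 when catch-up started.
    let state = InitialState { l1_batch_number: Some(5) };
    assert_eq!(choose_storage(state, 5), Storage::WaitForRocksdb);
    assert_eq!(choose_storage(state, 6), Storage::RocksdbIfReady);

    // Empty cache (snapshot recovery needed): never wait.
    let empty = InitialState { l1_batch_number: None };
    assert_eq!(choose_storage(empty, 0), Storage::RocksdbIfReady);
    println!("all assertions passed");
}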