perf(en): Parallelize persistence and chunk processing during tree recovery #2050

Merged: 17 commits, Jun 6, 2024
7 changes: 7 additions & 0 deletions core/bin/external_node/src/config/mod.rs
@@ -755,6 +755,12 @@ pub(crate) struct ExperimentalENConfig {
/// of recovery and then restarted with a different config).
#[serde(default = "ExperimentalENConfig::default_snapshots_recovery_tree_chunk_size")]
pub snapshots_recovery_tree_chunk_size: u64,
/// Buffer capacity for parallel persistence operations. Should be reasonably small, since a larger buffer means higher RAM usage;
/// buffer elements are persisted tree chunks. On the other hand, a buffer that is too small can make persistence parallelization inefficient.
///
/// If not set, parallel persistence will be disabled.
#[serde(default)] // Temporarily use a conservative option (sequential recovery) as default
pub snapshots_recovery_tree_parallel_persistence_buffer: Option<NonZeroUsize>,

// Commitment generator
/// Maximum degree of parallelism during commitment generation, i.e., the maximum number of L1 batches being processed in parallel.
@@ -778,6 +784,7 @@ impl ExperimentalENConfig {
Self::default_state_keeper_db_block_cache_capacity_mb(),
state_keeper_db_max_open_files: None,
snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
snapshots_recovery_tree_parallel_persistence_buffer: None,
commitment_generator_max_parallelism: None,
}
}
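Since the field is an `Option<NonZeroUsize>` with `#[serde(default)]`, leaving it unset keeps the previous, fully sequential recovery behavior. Below is a minimal sketch of how such a value can gate parallel persistence; the re-export paths and the helper function are assumptions for illustration, while the actual wiring goes through `MetadataCalculatorRecoveryConfig` in the `main.rs` change that follows.

use std::num::NonZeroUsize;

// Assumed re-export paths; the diff only shows these items inside the
// `recovery`, `hasher`, and `storage` modules of `zksync_merkle_tree`.
use zksync_merkle_tree::{recovery::MerkleTreeRecovery, HashTree, PruneDatabase};

// `Some(n)` enables background persistence with an n-chunk buffer;
// `None` (the serde default) keeps recovery fully sequential.
fn apply_persistence_config<DB, H>(
    recovery: &mut MerkleTreeRecovery<DB, H>,
    buffer: Option<NonZeroUsize>,
) -> anyhow::Result<()>
where
    DB: 'static + Clone + PruneDatabase,
    H: HashTree,
{
    if let Some(buffer) = buffer {
        // The returned `PersistenceThreadHandle` is only needed to emulate
        // persistence crashes, so it is fine to drop it here.
        recovery.parallelize_persistence(buffer.get())?;
    }
    Ok(())
}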
3 changes: 3 additions & 0 deletions core/bin/external_node/src/main.rs
@@ -141,6 +141,9 @@ async fn run_tree(
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
recovery: MetadataCalculatorRecoveryConfig {
desired_chunk_size: config.experimental.snapshots_recovery_tree_chunk_size,
parallel_persistence_buffer: config
.experimental
.snapshots_recovery_tree_parallel_persistence_buffer,
},
};

36 changes: 23 additions & 13 deletions core/lib/merkle_tree/examples/recovery.rs
@@ -32,6 +32,9 @@ struct Cli {
/// Perform testing on in-memory DB rather than RocksDB (i.e., with focus on hashing logic).
#[arg(long = "in-memory", short = 'M')]
in_memory: bool,
/// Parallelize DB persistence with chunk processing.
#[arg(long = "parallelize", conflicts_with = "in_memory")]
parallelize: bool,
/// Block cache capacity for RocksDB in bytes.
#[arg(long = "block-cache", conflicts_with = "in_memory")]
block_cache: Option<usize>,
@@ -52,11 +55,13 @@ impl Cli {
Self::init_logging();
tracing::info!("Launched with options: {self:?}");

let (mut mock_db, mut rocksdb);
let mut _temp_dir = None;
let db: &mut dyn PruneDatabase = if self.in_memory {
mock_db = PatchSet::default();
&mut mock_db
let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
let recovered_version = 123;

if self.in_memory {
let recovery =
MerkleTreeRecovery::with_hasher(PatchSet::default(), recovered_version, hasher)?;
self.recover_tree(recovery, recovered_version)
} else {
let dir = TempDir::new().context("failed creating temp dir for RocksDB")?;
tracing::info!(
@@ -69,24 +74,29 @@
};
let db =
RocksDB::with_options(dir.path(), db_options).context("failed creating RocksDB")?;
rocksdb = RocksDBWrapper::from(db);
_temp_dir = Some(dir);
&mut rocksdb
};
let db = RocksDBWrapper::from(db);
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)?;
if self.parallelize {
recovery.parallelize_persistence(4)?;
}
self.recover_tree(recovery, recovered_version)
}
}

let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
fn recover_tree<DB: PruneDatabase>(
self,
mut recovery: MerkleTreeRecovery<DB, &dyn HashTree>,
recovered_version: u64,
) -> anyhow::Result<()> {
let mut rng = StdRng::seed_from_u64(self.rng_seed);

let recovered_version = 123;
let key_step =
Key::MAX / (Key::from(self.update_count) * Key::from(self.writes_per_update));
assert!(key_step > Key::from(u64::MAX));
// ^ Total number of generated keys is <2^128.

let mut last_key = Key::zero();
let mut last_leaf_index = 0;
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)
.context("cannot create tree")?;
let recovery_started_at = Instant::now();
for updated_idx in 0..self.update_count {
let started_at = Instant::now();
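The refactoring above makes `recover_tree` generic over `PruneDatabase`, so the in-memory and RocksDB backends share the same benchmarking loop. Condensed (setup, logging, and the deleted lines stripped out), the new control flow of `run()` is roughly:

let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
let recovered_version = 123;
if self.in_memory {
    // In-memory backend: nothing to persist, so nothing to parallelize.
    let recovery = MerkleTreeRecovery::with_hasher(PatchSet::default(), recovered_version, hasher)?;
    self.recover_tree(recovery, recovered_version)
} else {
    // `raw_db`: the RocksDB instance opened with the CLI options shown above.
    let db = RocksDBWrapper::from(raw_db);
    let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)?;
    if self.parallelize {
        // Buffer capacity is hard-coded to 4 chunks in this example.
        recovery.parallelize_persistence(4)?;
    }
    self.recover_tree(recovery, recovered_version)
}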
3 changes: 3 additions & 0 deletions core/lib/merkle_tree/src/metrics.rs
@@ -365,6 +365,7 @@ pub(crate) static PRUNING_TIMINGS: Global<PruningTimings> = Global::new();
pub(crate) enum RecoveryStage {
Extend,
ApplyPatch,
ParallelPersistence,
}

const CHUNK_SIZE_BUCKETS: Buckets = Buckets::values(&[
@@ -391,6 +392,8 @@ pub(crate) struct RecoveryMetrics {
/// Latency of a specific stage of recovery for a single chunk.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub stage_latency: Family<RecoveryStage, Histogram<Duration>>,
/// Number of buffered commands if parallel persistence is used.
pub parallel_persistence_buffer_size: Gauge<usize>,
}

#[vise::register]
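`parallel_persistence_buffer_size` is a plain gauge, so the persistence thread can report how many chunk patches are currently queued. A hypothetical reporting call is sketched below; the real call site lives in the new `storage/parallel.rs` module, which is not expanded in this diff, and `queued_patches` is a placeholder name.

// Hypothetical sketch; the actual reporting code is not shown in this diff.
RECOVERY_METRICS
    .parallel_persistence_buffer_size
    .set(queued_patches);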
core/lib/merkle_tree/src/recovery/mod.rs
@@ -40,17 +40,21 @@ use std::{collections::HashMap, time::Instant};
use anyhow::Context as _;
use zksync_crypto::hasher::blake2::Blake2Hasher;

pub use crate::storage::PersistenceThreadHandle;
use crate::{
hasher::{HashTree, HasherWithStats},
metrics::{RecoveryStage, RECOVERY_METRICS},
storage::{PatchSet, PruneDatabase, PrunePatchSet, Storage},
storage::{Database, MaybeParallel, PatchSet, PruneDatabase, PrunePatchSet, Storage},
types::{Key, Manifest, Root, TreeEntry, TreeTags, ValueHash},
};

#[cfg(test)]
mod tests;

/// Handle to a Merkle tree during its recovery.
#[derive(Debug)]
pub struct MerkleTreeRecovery<DB, H = Blake2Hasher> {
pub(crate) db: DB,
pub(crate) db: MaybeParallel<DB>,
hasher: H,
recovered_version: u64,
}
@@ -105,7 +109,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
db.apply_patch(PatchSet::from_manifest(manifest))?;

Ok(Self {
db,
db: MaybeParallel::Sequential(db),
hasher,
recovered_version,
})
@@ -257,7 +261,54 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
self.db.apply_patch(PatchSet::from_manifest(manifest))?;
tracing::debug!("Updated tree manifest to mark recovery as complete");

Ok(self.db)
self.db.join()
}
}

impl<DB: 'static + Clone + PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
/// Offloads database persistence to a background thread so that it can run concurrently with the processing of subsequent chunks.
/// Chunks are still guaranteed to be persisted atomically and in order.
///
/// # Arguments
///
/// - `buffer_capacity` determines how many chunks can be buffered before chunk processing blocks waiting for persistence (i.e., back-pressure).
/// It also controls memory usage, since each chunk translates into a non-trivial database patch (on the order of 1 kB per entry;
/// e.g., a chunk with 200,000 entries would translate to a ~200 MB patch).
///
/// # Return value
///
/// On success, returns a handle that allows controlling the background persistence thread. For now, it can only be used to emulate persistence crashes;
/// otherwise, the handle can simply be dropped.
///
/// # Safety
///
/// If recovery is interrupted (e.g., its process crashes), then some of the latest chunks may not be persisted,
/// and will need to be processed again. It is **unsound** to restart recovery while a persistence thread may be active;
/// this may lead to a corrupted database state.
///
/// # Errors
///
/// Returns an error if `buffer_capacity` is 0, or if persistence was already parallelized.
pub fn parallelize_persistence(
&mut self,
buffer_capacity: usize,
) -> anyhow::Result<PersistenceThreadHandle> {
anyhow::ensure!(buffer_capacity > 0, "Buffer capacity must be positive");
self.db
.parallelize(self.recovered_version, buffer_capacity)
.context("persistence is already parallelized")
}

/// Waits until all changes in the underlying database are persisted, i.e., all buffered chunks are flushed into it.
/// This is only relevant if [persistence was parallelized](Self::parallelize_persistence()) earlier;
/// otherwise, this method will return immediately.
///
/// # Errors
///
/// Propagates database I/O errors, should they occur during persistence.
pub fn wait_for_persistence(self) -> anyhow::Result<()> {
self.db.join()?;
Ok(())
}
}
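Putting the new API together, a minimal usage sketch follows. Assumptions for illustration: `db` is a `RocksDBWrapper`, `recovered_version` is the tree version being recovered, and `chunks` yields key-sorted `Vec<TreeEntry>` values; this is not the external node's actual recovery code.

// Minimal usage sketch; error handling and chunking are heavily simplified.
let mut recovery = MerkleTreeRecovery::new(db, recovered_version)?;
// Allow up to 4 chunk patches to be buffered; the returned handle is only
// needed for crash testing and can be dropped.
recovery.parallelize_persistence(4)?;
for chunk in chunks {
    // Hashing/processing of this chunk overlaps with persistence of earlier ones.
    recovery.extend_linear(chunk)?;
}
// `finalize()` joins the persistence thread (flushing any buffered chunks)
// and marks recovery as complete; `wait_for_persistence()` flushes without finalizing.
let db = recovery.finalize()?;

Using the doc comment's estimate of roughly 1 kB per entry, a buffer of 4 chunks of 200,000 entries each can hold on the order of 800 MB of pending patches, which is why the buffer capacity should stay small.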

@@ -267,63 +318,3 @@ fn entries_key_range(entries: &[TreeEntry]) -> String {
};
format!("{:0>64x}..={:0>64x}", first.key, last.key)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{hasher::HasherWithStats, types::LeafNode, MerkleTree};

#[test]
fn recovery_for_initialized_tree() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123)
.unwrap()
.finalize()
.unwrap();
let err = MerkleTreeRecovery::new(db, 123).unwrap_err().to_string();
assert!(
err.contains("Tree is expected to be in the process of recovery"),
"{err}"
);
}

#[test]
fn recovery_for_different_version() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123).unwrap();
let err = MerkleTreeRecovery::new(&mut db, 42)
.unwrap_err()
.to_string();
assert!(
err.contains("Requested to recover tree version 42"),
"{err}"
);
}

#[test]
fn recovering_empty_tree() {
let db = MerkleTreeRecovery::new(PatchSet::default(), 42)
.unwrap()
.finalize()
.unwrap();
let tree = MerkleTree::new(db).unwrap();
assert_eq!(tree.latest_version(), Some(42));
assert_eq!(tree.root(42), Some(Root::Empty));
}

#[test]
fn recovering_tree_with_single_node() {
let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42).unwrap();
let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
recovery.extend_linear(vec![recovery_entry]).unwrap();
let tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();

assert_eq!(tree.latest_version(), Some(42));
let mut hasher = HasherWithStats::new(&Blake2Hasher);
assert_eq!(
tree.latest_root_hash(),
LeafNode::new(recovery_entry).hash(&mut hasher, 0)
);
tree.verify_consistency(42, true).unwrap();
}
}
56 changes: 56 additions & 0 deletions core/lib/merkle_tree/src/recovery/tests.rs
@@ -0,0 +1,56 @@
use super::*;
use crate::{hasher::HasherWithStats, types::LeafNode, MerkleTree};

#[test]
fn recovery_for_initialized_tree() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123)
.unwrap()
.finalize()
.unwrap();
let err = MerkleTreeRecovery::new(db, 123).unwrap_err().to_string();
assert!(
err.contains("Tree is expected to be in the process of recovery"),
"{err}"
);
}

#[test]
fn recovery_for_different_version() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123).unwrap();
let err = MerkleTreeRecovery::new(&mut db, 42)
.unwrap_err()
.to_string();
assert!(
err.contains("Requested to recover tree version 42"),
"{err}"
);
}

#[test]
fn recovering_empty_tree() {
let db = MerkleTreeRecovery::new(PatchSet::default(), 42)
.unwrap()
.finalize()
.unwrap();
let tree = MerkleTree::new(db).unwrap();
assert_eq!(tree.latest_version(), Some(42));
assert_eq!(tree.root(42), Some(Root::Empty));
}

#[test]
fn recovering_tree_with_single_node() {
let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42).unwrap();
let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
recovery.extend_linear(vec![recovery_entry]).unwrap();
let tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();

assert_eq!(tree.latest_version(), Some(42));
let mut hasher = HasherWithStats::new(&Blake2Hasher);
assert_eq!(
tree.latest_root_hash(),
LeafNode::new(recovery_entry).hash(&mut hasher, 0)
);
tree.verify_consistency(42, true).unwrap();
}
7 changes: 6 additions & 1 deletion core/lib/merkle_tree/src/storage/mod.rs
@@ -1,11 +1,15 @@
//! Storage-related logic.

pub(crate) use self::patch::{LoadAncestorsResult, WorkingPatchSet};
pub use self::{
database::{Database, NodeKeys, Patched, PruneDatabase, PrunePatchSet},
parallel::PersistenceThreadHandle,
patch::PatchSet,
rocksdb::{MerkleTreeColumnFamily, RocksDBWrapper},
};
pub(crate) use self::{
parallel::MaybeParallel,
patch::{LoadAncestorsResult, WorkingPatchSet},
};
use crate::{
hasher::HashTree,
metrics::{TreeUpdaterStats, BLOCK_TIMINGS, GENERAL_METRICS},
@@ -16,6 +20,7 @@ use crate::{
};

mod database;
mod parallel;
mod patch;
mod proofs;
mod rocksdb;