feat(merkle-tree): Rework tree rollback #2207

Merged 5 commits on Jun 12, 2024
Changes from all commits
6 changes: 6 additions & 0 deletions core/lib/merkle_tree/src/domain.rs
@@ -166,6 +166,12 @@ impl ZkSyncTree {
self.tree.latest_root_hash()
}

/// Returns the root hash and leaf count at the specified L1 batch.
pub fn root_info(&self, l1_batch_number: L1BatchNumber) -> Option<(ValueHash, u64)> {
let root = self.tree.root(l1_batch_number.0.into())?;
Some((root.hash(&Blake2Hasher), root.leaf_count()))
}

/// Checks whether this tree is empty.
pub fn is_empty(&self) -> bool {
let Some(version) = self.tree.latest_version() else {
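
The new `root_info` accessor bundles the root hash and leaf count for a given tree version; `None` means the requested batch is not present in the tree (e.g. not processed yet, or already pruned). A minimal usage sketch follows — the helper function is illustrative and not part of the PR, and import paths are assumed:

```rust
use zksync_merkle_tree::domain::ZkSyncTree;
use zksync_types::L1BatchNumber;

/// Hypothetical helper demonstrating `ZkSyncTree::root_info`.
fn print_root_info(tree: &ZkSyncTree, l1_batch_number: L1BatchNumber) {
    match tree.root_info(l1_batch_number) {
        Some((hash, leaf_count)) => {
            println!("L1 batch {l1_batch_number:?}: root hash {hash:?}, {leaf_count} leaves");
        }
        None => println!("L1 batch {l1_batch_number:?} is not present in the tree"),
    }
}
```
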
4 changes: 2 additions & 2 deletions core/lib/snapshots_applier/src/tests/utils.rs
@@ -332,12 +332,12 @@ impl ObjectStore for HangingObjectStore {
let mut should_proceed = true;
self.count_sender.send_modify(|count| {
*count += 1;
if dbg!(*count) > self.stop_after_count {
if *count > self.stop_after_count {
should_proceed = false;
}
});

if dbg!(should_proceed) {
if should_proceed {
self.inner.get_raw(bucket, key).await
} else {
future::pending().await // Hang up the snapshot applier task
18 changes: 16 additions & 2 deletions core/node/metadata_calculator/src/helpers.rs
@@ -27,7 +27,9 @@ use zksync_merkle_tree::{
};
use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB};
use zksync_types::{
block::L1BatchHeader, writes::TreeWrite, AccountTreeId, L1BatchNumber, StorageKey, H256,
block::{L1BatchHeader, L1BatchTreeData},
writes::TreeWrite,
AccountTreeId, L1BatchNumber, StorageKey, H256,
};

use super::{
@@ -233,11 +235,23 @@ impl AsyncTree {
self.as_ref().next_l1_batch_number()
}

pub fn min_l1_batch_number(&self) -> Option<L1BatchNumber> {
self.as_ref().reader().min_l1_batch_number()
}

#[cfg(test)]
pub fn root_hash(&self) -> H256 {
self.as_ref().root_hash()
}

pub fn data_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> Option<L1BatchTreeData> {
let (hash, leaf_count) = self.as_ref().root_info(l1_batch_number)?;
Some(L1BatchTreeData {
hash,
rollup_last_leaf_index: leaf_count + 1,
})
}

/// Returned errors are unrecoverable; the tree must not be used after an error is returned.
pub async fn process_l1_batch(
&mut self,
@@ -279,7 +293,7 @@ impl AsyncTree {
Ok(())
}

pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
self.as_mut().roll_back_logs(last_l1_batch_to_keep)
}
}
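
Taken together, `root_info` and the new `data_for_l1_batch` let the metadata calculator derive `L1BatchTreeData` directly from the tree, with `rollup_last_leaf_index` defined as the leaf count plus one. Below is a hedged sketch of reading such data and persisting it, loosely mirroring what the new tests do with mock data; `persist_tree_data` is hypothetical, and the DAL imports are assumed to match those used elsewhere in the workspace:

```rust
use anyhow::Context as _;
use zksync_dal::{Connection, Core, CoreDal};
use zksync_types::L1BatchNumber;

use crate::helpers::AsyncTree;

/// Hypothetical helper: reads per-batch tree data and stores it in Postgres.
async fn persist_tree_data(
    tree: &AsyncTree,
    storage: &mut Connection<'_, Core>,
    l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
    let data = tree
        .data_for_l1_batch(l1_batch_number)
        .with_context(|| format!("L1 batch {l1_batch_number:?} is not in the tree"))?;
    storage
        .blocks_dal()
        .save_l1_batch_tree_data(l1_batch_number, &data)
        .await
        .context("failed saving L1 batch tree data")?;
    Ok(())
}
```
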
14 changes: 10 additions & 4 deletions core/node/metadata_calculator/src/lib.rs
@@ -217,7 +217,7 @@ impl MetadataCalculator {
GenericAsyncTree::new(db, &self.config).await
}

pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let tree = self.create_tree().await?;
let tree = tree
.ensure_ready(
@@ -231,13 +231,19 @@
let Some(mut tree) = tree else {
return Ok(()); // recovery was aborted because a stop signal was received
};

// Set a tree reader before the tree is fully initialized to not wait for the first L1 batch to appear in Postgres.
let tree_reader = tree.reader();
let tree_info = tree_reader.clone().info().await;
self.tree_reader.send_replace(Some(tree_reader));

tree.ensure_consistency(&self.delayer, &self.pool, &mut stop_receiver)
.await?;
if !self.pruning_handles_sender.is_closed() {
// Unlike tree reader, we shouldn't initialize pruning (as a task modifying the tree) before the tree is guaranteed
// to be consistent with Postgres.
self.pruning_handles_sender.send(tree.pruner()).ok();
}
self.tree_reader.send_replace(Some(tree_reader));

let tree_info = tree.reader().info().await;
tracing::info!("Merkle tree is initialized and ready to process L1 batches: {tree_info:?}");
self.health_updater
.update(MerkleTreeHealth::MainLoop(tree_info).into());
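
The reworked `run` publishes the tree reader right after recovery — before the Postgres consistency check — so read-only consumers don't have to wait for the first L1 batch, while pruning handles are only handed out once the tree is known to be consistent with Postgres (pruning mutates the tree). A hedged sketch of how a consumer could wait for that reader on the `watch` channel; the function is hypothetical and kept generic over the reader type so it doesn't rely on names outside this diff:

```rust
use tokio::sync::watch;

/// Hypothetical helper: resolves once the calculator publishes its tree reader
/// (the `None` -> `Some(_)` transition performed by `send_replace` in `run`).
async fn wait_for_reader<R: Clone>(
    mut receiver: watch::Receiver<Option<R>>,
) -> anyhow::Result<R> {
    loop {
        if let Some(reader) = receiver.borrow().clone() {
            return Ok(reader);
        }
        // Wait for the sender side to publish a new value.
        receiver.changed().await?;
    }
}
```
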
200 changes: 195 additions & 5 deletions core/node/metadata_calculator/src/tests.rs
@@ -5,6 +5,7 @@ use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use itertools::Itertools;
use tempfile::TempDir;
use test_casing::{test_casing, Product};
use tokio::sync::{mpsc, watch};
use zksync_config::configs::{
chain::OperationsManagerConfig,
@@ -19,16 +20,18 @@ use zksync_object_store::{MockObjectStore, ObjectStore};
use zksync_prover_interface::inputs::PrepareBasicCircuitsJob;
use zksync_storage::RocksDB;
use zksync_types::{
block::L1BatchHeader, AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey,
StorageLog, H256,
block::{L1BatchHeader, L1BatchTreeData},
AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, StorageLog, H256,
};
use zksync_utils::u32_to_h256;

use super::{
helpers::L1BatchWithLogs, GenericAsyncTree, MetadataCalculator, MetadataCalculatorConfig,
MetadataCalculatorRecoveryConfig,
};
use crate::helpers::{AsyncTree, Delayer};

const POLL_INTERVAL: Duration = Duration::from_millis(50);
const RUN_TIMEOUT: Duration = Duration::from_secs(30);

async fn run_with_timeout<T, F>(timeout: Duration, action: F) -> T
@@ -47,7 +50,7 @@ pub(super) fn mock_config(db_path: &Path) -> MetadataCalculatorConfig {
db_path: db_path.to_str().unwrap().to_owned(),
max_open_files: None,
mode: MerkleTreeMode::Full,
delay_interval: Duration::from_millis(100),
delay_interval: POLL_INTERVAL,
max_l1_batches_per_iter: 10,
multi_get_chunk_size: 500,
block_cache_capacity: 0,
@@ -74,6 +77,150 @@ async fn genesis_creation() {
assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1));
}

#[tokio::test]
async fn low_level_genesis_creation() {
let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
insert_genesis_batch(
&mut pool.connection().await.unwrap(),
&GenesisParams::mock(),
)
.await
.unwrap();
reset_db_state(&pool, 1).await;

let db = RocksDB::new(temp_dir.path()).unwrap();
let mut tree = AsyncTree::new(db.into(), MerkleTreeMode::Lightweight).unwrap();
let (_stop_sender, mut stop_receiver) = watch::channel(false);
tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver)
.await
.unwrap();

assert!(!tree.is_empty());
assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1));
}

#[test_casing(8, Product(([1, 4, 7, 9], [false, true])))]
#[tokio::test]
async fn tree_truncation_on_l1_batch_divergence(
last_common_l1_batch: u32,
overwrite_tree_data: bool,
) {
const INITIAL_BATCH_COUNT: usize = 10;

assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT);
let last_common_l1_batch = L1BatchNumber(last_common_l1_batch);

let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
reset_db_state(&pool, INITIAL_BATCH_COUNT).await;
run_calculator(calculator).await;

let mut storage = pool.connection().await.unwrap();
remove_l1_batches(&mut storage, last_common_l1_batch).await;
// Extend the state with new L1 batches.
let logs = gen_storage_logs(100..200, 5);
extend_db_state(&mut storage, logs).await;

if overwrite_tree_data {
for number in (last_common_l1_batch.0 + 1)..(last_common_l1_batch.0 + 6) {
let new_tree_data = L1BatchTreeData {
hash: H256::from_low_u64_be(number.into()),
rollup_last_leaf_index: 200, // doesn't matter
};
storage
.blocks_dal()
.save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data)
.await
.unwrap();
}
}

let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
let tree = calculator.create_tree().await.unwrap();
let GenericAsyncTree::Ready(mut tree) = tree else {
panic!("Unexpected tree state: {tree:?}");
};
assert_eq!(
tree.next_l1_batch_number(),
L1BatchNumber(INITIAL_BATCH_COUNT as u32 + 1)
);

let (_stop_sender, mut stop_receiver) = watch::channel(false);
tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver)
.await
.unwrap();
assert_eq!(tree.next_l1_batch_number(), last_common_l1_batch + 1);
}
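
For readers unfamiliar with `test_casing`: `Product` runs the test over the Cartesian product of the argument lists, which is where the case count of 8 in the attribute above comes from. The combinations covered are listed below as an illustrative constant (a sketch, not generated code):

```rust
// (last_common_l1_batch, overwrite_tree_data) pairs exercised by
// `Product(([1, 4, 7, 9], [false, true]))`.
const DIVERGENCE_CASES: [(u32, bool); 8] = [
    (1, false), (1, true),
    (4, false), (4, true),
    (7, false), (7, true),
    (9, false), (9, true),
];
```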

#[test_casing(4, [1, 4, 6, 7])]
#[tokio::test]
async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch: u32) {
const INITIAL_BATCH_COUNT: usize = 10;
const LAST_COMMON_L1_BATCH: L1BatchNumber = L1BatchNumber(6);

let retained_l1_batch = L1BatchNumber(retained_l1_batch);

let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
reset_db_state(&pool, INITIAL_BATCH_COUNT).await;
run_calculator(calculator).await;

let mut storage = pool.connection().await.unwrap();
remove_l1_batches(&mut storage, LAST_COMMON_L1_BATCH).await;
// Extend the state with new L1 batches.
let logs = gen_storage_logs(100..200, 5);
extend_db_state(&mut storage, logs).await;

for number in (LAST_COMMON_L1_BATCH.0 + 1)..(LAST_COMMON_L1_BATCH.0 + 6) {
let new_tree_data = L1BatchTreeData {
hash: H256::from_low_u64_be(number.into()),
rollup_last_leaf_index: 200, // doesn't matter
};
storage
.blocks_dal()
.save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data)
.await
.unwrap();
}

let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
let tree = calculator.create_tree().await.unwrap();
let GenericAsyncTree::Ready(mut tree) = tree else {
panic!("Unexpected tree state: {tree:?}");
};

let reader = tree.reader();
let (mut pruner, pruner_handle) = tree.pruner();
pruner.set_poll_interval(POLL_INTERVAL);
tokio::task::spawn_blocking(|| pruner.run());
pruner_handle
.set_target_retained_version(retained_l1_batch.0.into())
.unwrap();
// Wait until the tree is pruned
while reader.clone().info().await.min_l1_batch_number < Some(retained_l1_batch) {
tokio::time::sleep(POLL_INTERVAL).await;
}

let (_stop_sender, mut stop_receiver) = watch::channel(false);
let consistency_result = tree
.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver)
.await;

if retained_l1_batch <= LAST_COMMON_L1_BATCH {
consistency_result.unwrap();
assert_eq!(tree.next_l1_batch_number(), LAST_COMMON_L1_BATCH + 1);
} else {
let err = consistency_result.unwrap_err();
assert!(
format!("{err:#}").contains("diverging min L1 batch"),
"{err:#}"
);
}
}

#[tokio::test]
async fn basic_workflow() {
let pool = ConnectionPool::<Core>::test_pool().await;
@@ -279,7 +426,7 @@ async fn shutting_down_calculator() {

let (stop_sx, stop_rx) = watch::channel(false);
let calculator_task = tokio::spawn(calculator.run(stop_rx));
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(POLL_INTERVAL).await;
stop_sx.send_replace(true);
run_with_timeout(RUN_TIMEOUT, calculator_task)
.await
@@ -342,7 +489,7 @@ async fn test_postgres_backup_recovery(
insert_initial_writes_for_batch(&mut txn, batch_header.number).await;
txn.commit().await.unwrap();
if sleep_between_batches {
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(POLL_INTERVAL).await;
}
}
drop(storage);
@@ -640,6 +787,23 @@ async fn remove_l1_batches(
batch_headers.push(header.unwrap());
}

let (_, last_l2_block_to_keep) = storage
.blocks_dal()
.get_l2_block_range_of_l1_batch(last_l1_batch_to_keep)
.await
.unwrap()
.expect("L1 batch has no blocks");

storage
.storage_logs_dal()
.roll_back_storage_logs(last_l2_block_to_keep)
.await
.unwrap();
storage
.blocks_dal()
.delete_l2_blocks(last_l2_block_to_keep)
.await
.unwrap();
storage
.blocks_dal()
.delete_l1_batches(last_l1_batch_to_keep)
@@ -740,3 +904,29 @@ async fn deduplication_works_as_expected() {
assert_eq!(initial_writes[key].0, L1BatchNumber(4));
}
}

#[test_casing(3, [3, 5, 8])]
#[tokio::test]
async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) {
const INITIAL_BATCH_COUNT: usize = 10;

assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT);
let last_common_l1_batch = L1BatchNumber(last_common_l1_batch);

let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
reset_db_state(&pool, INITIAL_BATCH_COUNT).await;
run_calculator(calculator).await;

let mut storage = pool.connection().await.unwrap();
remove_l1_batches(&mut storage, last_common_l1_batch).await;
// Extend the state with new L1 batches.
let logs = gen_storage_logs(100..200, 5);
extend_db_state(&mut storage, logs).await;
let expected_root_hash = expected_tree_hash(&pool).await;

let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
let final_root_hash = run_calculator(calculator).await;
assert_eq!(final_root_hash, expected_root_hash);
}