diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs
index 883f3f8a5fae..b7f6f8039025 100644
--- a/core/bin/external_node/src/node_builder.rs
+++ b/core/bin/external_node/src/node_builder.rs
@@ -11,7 +11,9 @@ use zksync_config::{
     },
     PostgresConfig,
 };
-use zksync_metadata_calculator::{MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig};
+use zksync_metadata_calculator::{
+    MerkleTreeReaderConfig, MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig,
+};
 use zksync_node_api_server::web3::Namespace;
 use zksync_node_framework::{
     implementations::layers::{
@@ -25,7 +27,7 @@ use zksync_node_framework::{
         logs_bloom_backfill::LogsBloomBackfillLayer,
         main_node_client::MainNodeClientLayer,
         main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer,
-        metadata_calculator::MetadataCalculatorLayer,
+        metadata_calculator::{MetadataCalculatorLayer, TreeApiServerLayer},
        node_storage_init::{
             external_node_strategy::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig},
             NodeStorageInitializerLayer,
@@ -385,6 +387,29 @@ impl ExternalNodeBuilder {
         Ok(self)
     }
 
+    fn add_isolated_tree_api_layer(mut self) -> anyhow::Result<Self> {
+        let reader_config = MerkleTreeReaderConfig {
+            db_path: self.config.required.merkle_tree_path.clone(),
+            max_open_files: self.config.optional.merkle_tree_max_open_files,
+            multi_get_chunk_size: self.config.optional.merkle_tree_multi_get_chunk_size,
+            block_cache_capacity: self.config.optional.merkle_tree_block_cache_size(),
+            include_indices_and_filters_in_block_cache: self
+                .config
+                .optional
+                .merkle_tree_include_indices_and_filters_in_block_cache,
+        };
+        let api_config = MerkleTreeApiConfig {
+            port: self
+                .config
+                .tree_component
+                .api_port
+                .context("should contain tree api port")?,
+        };
+        self.node
+            .add_layer(TreeApiServerLayer::new(reader_config, api_config));
+        Ok(self)
+    }
+
     fn add_tx_sender_layer(mut self) -> anyhow::Result<Self> {
         let postgres_storage_config = PostgresStorageCachesConfig {
             factory_deps_cache_size: self.config.optional.factory_deps_cache_size() as u64,
@@ -607,11 +632,11 @@ impl ExternalNodeBuilder {
                     self = self.add_metadata_calculator_layer(with_tree_api)?;
                 }
                 Component::TreeApi => {
-                    anyhow::ensure!(
-                        components.contains(&Component::Tree),
-                        "Merkle tree API cannot be started without a tree component"
-                    );
-                    // Do nothing, will be handled by the `Tree` component.
+                    if components.contains(&Component::Tree) {
+                        // Do nothing, will be handled by the `Tree` component.
+                    } else {
+                        self = self.add_isolated_tree_api_layer()?;
+                    }
                 }
                 Component::TreeFetcher => {
                     self = self.add_tree_data_fetcher_layer()?;
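With the `Component::TreeApi` arm above, `tree_api` no longer hard-requires `tree` in the same process. A hedged sketch of the resulting builder usage (the `build` signature and surrounding setup are assumptions, not part of this diff):

```rust
// Sketch only: an EN started with `core,tree_api` now wires the isolated
// tree API server instead of bailing out during `build`.
let node = ExternalNodeBuilder::new(config)?;
let service = node.build(vec![Component::Core, Component::TreeApi])?;
// The API server reads the Merkle tree RocksDB at `merkle_tree_path`; some other
// process (e.g., an EN running the `tree` component) is expected to maintain it.
```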
diff --git a/core/bin/external_node/src/tests/mod.rs b/core/bin/external_node/src/tests/mod.rs
index c5dd88748e52..59aceea819f1 100644
--- a/core/bin/external_node/src/tests/mod.rs
+++ b/core/bin/external_node/src/tests/mod.rs
@@ -17,7 +17,7 @@ mod utils;
 const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
 const POLL_INTERVAL: Duration = Duration::from_millis(100);
 
-#[test_casing(3, ["all", "core", "api"])]
+#[test_casing(4, ["all", "core", "api", "core,tree_api"])]
 #[tokio::test]
 #[tracing::instrument] // Add args to the test logs
 async fn external_node_basics(components_str: &'static str) {
@@ -170,40 +170,3 @@ async fn running_tree_without_core_is_not_allowed() {
         err
     );
 }
-
-#[tokio::test]
-async fn running_tree_api_without_tree_is_not_allowed() {
-    let _guard = zksync_vlog::ObservabilityBuilder::new().try_build().ok(); // Enable logging to simplify debugging
-    let (env, _env_handles) = utils::TestEnvironment::with_genesis_block("core,tree_api").await;
-
-    let l2_client = utils::mock_l2_client(&env);
-    let eth_client = utils::mock_eth_client(env.config.diamond_proxy_address());
-
-    let node_handle = tokio::task::spawn_blocking(move || {
-        std::thread::spawn(move || {
-            let mut node = ExternalNodeBuilder::new(env.config)?;
-            inject_test_layers(
-                &mut node,
-                env.sigint_receiver,
-                env.app_health_sender,
-                eth_client,
-                l2_client,
-            );
-
-            // We're only interested in the error, so we drop the result.
-            node.build(env.components.0.into_iter().collect()).map(drop)
-        })
-        .join()
-        .unwrap()
-    });
-
-    // Check that we cannot build the node without the core component.
-    let result = node_handle.await.expect("Building the node panicked");
-    let err = result.expect_err("Building the node with tree api but without tree should fail");
-    assert!(
-        err.to_string()
-            .contains("Merkle tree API cannot be started without a tree component"),
-        "Unexpected errror: {}",
-        err
-    );
-}
diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs
index a4d577fc3ba5..bb69bda209cc 100644
--- a/core/lib/merkle_tree/src/domain.rs
+++ b/core/lib/merkle_tree/src/domain.rs
@@ -9,10 +9,11 @@ use crate::{
     consistency::ConsistencyError,
     storage::{PatchSet, Patched, RocksDBWrapper},
     types::{
-        Key, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry, ValueHash,
-        TREE_DEPTH,
+        Key, NodeKey, RawNode, Root, TreeEntry, TreeEntryWithProof, TreeInstruction, TreeLogEntry,
+        ValueHash, TREE_DEPTH,
     },
     BlockOutput, HashTree, MerkleTree, MerkleTreePruner, MerkleTreePrunerHandle, NoVersionError,
+    PruneDatabase,
 };
 
 impl TreeInstruction {
@@ -444,6 +445,28 @@ impl ZkSyncTreeReader {
         self.0.entries_with_proofs(version, keys)
     }
 
+    /// Returns raw nodes for the specified `keys`.
+    pub fn raw_nodes(&self, keys: &[NodeKey]) -> Vec<Option<RawNode>> {
+        let raw_nodes = self.0.db.raw_nodes(keys).into_iter();
+        raw_nodes
+            .zip(keys)
+            .map(|(slice, key)| {
+                let slice = slice?;
+                Some(if key.is_empty() {
+                    RawNode::deserialize_root(&slice)
+                } else {
+                    RawNode::deserialize(&slice)
+                })
+            })
+            .collect()
+    }
+
+    /// Returns raw stale keys obsoleted in the specified version of the tree.
+    pub fn raw_stale_keys(&self, l1_batch_number: L1BatchNumber) -> Vec<NodeKey> {
+        let version = u64::from(l1_batch_number.0);
+        self.0.db.stale_keys(version)
+    }
+
     /// Verifies consistency of the tree at the specified L1 batch number.
     ///
     /// # Errors
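The two new reader methods pair with the `NodeKey` parsing added further down in this diff; a hedged sketch of calling them directly (assumes an initialized `ZkSyncTreeReader` named `reader`):

```rust
use zksync_merkle_tree::unstable::NodeKey;
use zksync_types::L1BatchNumber;

// A root key has an empty nibble path ("0:" = version 0), so it is routed
// through `RawNode::deserialize_root` rather than `RawNode::deserialize`.
let root_key: NodeKey = "0:".parse().expect("valid node key");
let nodes = reader.raw_nodes(&[root_key]);
assert_eq!(nodes.len(), 1); // one (possibly `None`) entry per requested key

// Keys obsoleted by L1 batch #1, printed via the `Display` impl added below.
for stale_key in reader.raw_stale_keys(L1BatchNumber(1)) {
    println!("{stale_key}"); // e.g. `0:deadb`
}
```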
diff --git a/core/lib/merkle_tree/src/errors.rs b/core/lib/merkle_tree/src/errors.rs
index b8130717f93b..c187ce4977bf 100644
--- a/core/lib/merkle_tree/src/errors.rs
+++ b/core/lib/merkle_tree/src/errors.rs
@@ -22,6 +22,9 @@ pub enum DeserializeErrorKind {
     /// Bit mask specifying a child kind in an internal tree node is invalid.
     #[error("invalid bit mask specifying a child kind in an internal tree node")]
     InvalidChildKind,
+    /// Data is left after deserialization.
+    #[error("data left after deserialization")]
+    Leftovers,
 
     /// Missing required tag in the tree manifest.
     #[error("missing required tag `{0}` in tree manifest")]
diff --git a/core/lib/merkle_tree/src/lib.rs b/core/lib/merkle_tree/src/lib.rs
index 6f9da59cf0ed..824f23eaf526 100644
--- a/core/lib/merkle_tree/src/lib.rs
+++ b/core/lib/merkle_tree/src/lib.rs
@@ -82,7 +82,7 @@ mod utils;
 pub mod unstable {
     pub use crate::{
         errors::DeserializeError,
-        types::{Manifest, Node, NodeKey, ProfiledTreeOperation, Root},
+        types::{Manifest, Node, NodeKey, ProfiledTreeOperation, RawNode, Root},
     };
 }
diff --git a/core/lib/merkle_tree/src/storage/rocksdb.rs b/core/lib/merkle_tree/src/storage/rocksdb.rs
index 711ccaa6137e..22335c829404 100644
--- a/core/lib/merkle_tree/src/storage/rocksdb.rs
+++ b/core/lib/merkle_tree/src/storage/rocksdb.rs
@@ -53,6 +53,23 @@ impl NamedColumnFamily for MerkleTreeColumnFamily {
 
 type LocalProfiledOperation = RefCell<Option<Arc<ProfiledOperation>>>;
 
+/// Unifies keys that can be used to load raw data from RocksDB.
+pub(crate) trait ToDbKey: Sync {
+    fn to_db_key(&self) -> Vec<u8>;
+}
+
+impl ToDbKey for NodeKey {
+    fn to_db_key(&self) -> Vec<u8> {
+        NodeKey::to_db_key(*self)
+    }
+}
+
+impl ToDbKey for (NodeKey, bool) {
+    fn to_db_key(&self) -> Vec<u8> {
+        NodeKey::to_db_key(self.0)
+    }
+}
+
 /// Main [`Database`] implementation wrapping a [`RocksDB`] reference.
 ///
 /// # Cloning
@@ -112,7 +129,7 @@ impl RocksDBWrapper {
             .expect("Failed reading from RocksDB")
     }
 
-    fn raw_nodes(&self, keys: &NodeKeys) -> Vec<Option<DBPinnableSlice<'_>>> {
+    pub(crate) fn raw_nodes<T: ToDbKey>(&self, keys: &[T]) -> Vec<Option<DBPinnableSlice<'_>>> {
         // Propagate the currently profiled operation to rayon threads used in the parallel iterator below.
         let profiled_operation = self
             .profiled_operation
@@ -126,7 +143,7 @@ impl RocksDBWrapper {
                 let _guard = profiled_operation
                     .as_ref()
                     .and_then(ProfiledOperation::start_profiling);
-                let keys = chunk.iter().map(|(key, _)| key.to_db_key());
+                let keys = chunk.iter().map(ToDbKey::to_db_key);
                 let results = self.db.multi_get_cf(MerkleTreeColumnFamily::Tree, keys);
                 results
                     .into_iter()
@@ -144,9 +161,9 @@ impl RocksDBWrapper {
         // If we didn't succeed with the patch set, or the key version is old,
         // access the underlying storage.
         let node = if is_leaf {
-            LeafNode::deserialize(raw_node).map(Node::Leaf)
+            LeafNode::deserialize(raw_node, false).map(Node::Leaf)
         } else {
-            InternalNode::deserialize(raw_node).map(Node::Internal)
+            InternalNode::deserialize(raw_node, false).map(Node::Internal)
         };
         node.map_err(|err| {
             err.with_context(if is_leaf {
@@ -187,7 +204,7 @@ impl Database for RocksDBWrapper {
         let Some(raw_root) = self.raw_node(&NodeKey::empty(version).to_db_key()) else {
             return Ok(None);
        };
-        Root::deserialize(&raw_root)
+        Root::deserialize(&raw_root, false)
             .map(Some)
             .map_err(|err| err.with_context(ErrorContext::Root(version)))
     }
diff --git a/core/lib/merkle_tree/src/storage/serialization.rs b/core/lib/merkle_tree/src/storage/serialization.rs
index f21fece94e09..d0c573fd8170 100644
--- a/core/lib/merkle_tree/src/storage/serialization.rs
+++ b/core/lib/merkle_tree/src/storage/serialization.rs
@@ -5,7 +5,7 @@ use std::{collections::HashMap, str};
 use crate::{
     errors::{DeserializeError, DeserializeErrorKind, ErrorContext},
     types::{
-        ChildRef, InternalNode, Key, LeafNode, Manifest, Node, Root, TreeTags, ValueHash,
+        ChildRef, InternalNode, Key, LeafNode, Manifest, Node, RawNode, Root, TreeTags, ValueHash,
         HASH_SIZE, KEY_SIZE,
     },
 };
@@ -15,7 +15,7 @@ use crate::{
 const LEB128_SIZE_ESTIMATE: usize = 3;
 
 impl LeafNode {
-    pub(super) fn deserialize(bytes: &[u8]) -> Result<Self, DeserializeError> {
+    pub(super) fn deserialize(bytes: &[u8], strict: bool) -> Result<Self, DeserializeError> {
         if bytes.len() < KEY_SIZE + HASH_SIZE {
             return Err(DeserializeErrorKind::UnexpectedEof.into());
         }
@@ -26,6 +26,10 @@ impl LeafNode {
         let leaf_index = leb128::read::unsigned(&mut bytes).map_err(|err| {
             DeserializeErrorKind::Leb128(err).with_context(ErrorContext::LeafIndex)
         })?;
+        if strict && !bytes.is_empty() {
+            return Err(DeserializeErrorKind::Leftovers.into());
+        }
+
         Ok(Self {
             full_key,
             value_hash,
@@ -105,7 +109,7 @@ impl ChildRef {
 }
 
 impl InternalNode {
-    pub(super) fn deserialize(bytes: &[u8]) -> Result<Self, DeserializeError> {
+    pub(super) fn deserialize(bytes: &[u8], strict: bool) -> Result<Self, DeserializeError> {
         if bytes.len() < 4 {
             let err = DeserializeErrorKind::UnexpectedEof;
             return Err(err.with_context(ErrorContext::ChildrenMask));
@@ -134,6 +138,9 @@ impl InternalNode {
             }
             bitmap >>= 2;
         }
+        if strict && !bytes.is_empty() {
+            return Err(DeserializeErrorKind::Leftovers.into());
+        }
         Ok(this)
     }
 
@@ -161,8 +168,36 @@ impl InternalNode {
     }
 }
 
+impl RawNode {
+    pub(crate) fn deserialize(bytes: &[u8]) -> Self {
+        Self {
+            raw: bytes.to_vec(),
+            leaf: LeafNode::deserialize(bytes, true).ok(),
+            internal: InternalNode::deserialize(bytes, true).ok(),
+        }
+    }
+
+    pub(crate) fn deserialize_root(bytes: &[u8]) -> Self {
+        let root = Root::deserialize(bytes, true).ok();
+        let node = root.and_then(|root| match root {
+            Root::Empty => None,
+            Root::Filled { node, .. } => Some(node),
+        });
+        let (leaf, internal) = match node {
+            None => (None, None),
+            Some(Node::Leaf(leaf)) => (Some(leaf), None),
+            Some(Node::Internal(node)) => (None, Some(node)),
+        };
+        Self {
+            raw: bytes.to_vec(),
+            leaf,
+            internal,
+        }
+    }
+}
+
 impl Root {
-    pub(super) fn deserialize(mut bytes: &[u8]) -> Result<Self, DeserializeError> {
+    pub(super) fn deserialize(mut bytes: &[u8], strict: bool) -> Result<Self, DeserializeError> {
         let leaf_count = leb128::read::unsigned(&mut bytes).map_err(|err| {
             DeserializeErrorKind::Leb128(err).with_context(ErrorContext::LeafCount)
         })?;
@@ -172,11 +207,11 @@ impl Root {
                 // Try both the leaf and internal node serialization; in some cases, a single leaf
                 // may still be persisted as an internal node. Since serialization of an internal node with a single child
                 // is always shorter than that of a leaf, the order (first leaf, then internal node) is chosen intentionally.
-                LeafNode::deserialize(bytes)
+                LeafNode::deserialize(bytes, strict)
                     .map(Node::Leaf)
-                    .or_else(|_| InternalNode::deserialize(bytes).map(Node::Internal))?
+                    .or_else(|_| InternalNode::deserialize(bytes, strict).map(Node::Internal))?
             }
-            _ => Node::Internal(InternalNode::deserialize(bytes)?),
+            _ => Node::Internal(InternalNode::deserialize(bytes, strict)?),
         };
         Ok(Self::new(leaf_count, node))
     }
@@ -440,7 +475,7 @@ mod tests {
         assert_eq!(buffer[64], 42); // leaf index
         assert_eq!(buffer.len(), 65);
 
-        let leaf_copy = LeafNode::deserialize(&buffer).unwrap();
+        let leaf_copy = LeafNode::deserialize(&buffer, true).unwrap();
         assert_eq!(leaf_copy, leaf);
     }
 
@@ -471,7 +506,7 @@ mod tests {
         let child_count = bitmap.count_ones();
         assert_eq!(child_count, 2);
 
-        let node_copy = InternalNode::deserialize(&buffer).unwrap();
+        let node_copy = InternalNode::deserialize(&buffer, true).unwrap();
         assert_eq!(node_copy, node);
     }
 
@@ -482,7 +517,7 @@ mod tests {
         root.serialize(&mut buffer);
         assert_eq!(buffer, [0]);
 
-        let root_copy = Root::deserialize(&buffer).unwrap();
+        let root_copy = Root::deserialize(&buffer, true).unwrap();
         assert_eq!(root_copy, root);
     }
 
@@ -494,7 +529,7 @@ mod tests {
         root.serialize(&mut buffer);
         assert_eq!(buffer[0], 1);
 
-        let root_copy = Root::deserialize(&buffer).unwrap();
+        let root_copy = Root::deserialize(&buffer, true).unwrap();
         assert_eq!(root_copy, root);
     }
 
@@ -506,7 +541,7 @@ mod tests {
         root.serialize(&mut buffer);
         assert_eq!(buffer[0], 2);
 
-        let root_copy = Root::deserialize(&buffer).unwrap();
+        let root_copy = Root::deserialize(&buffer, true).unwrap();
         assert_eq!(root_copy, root);
     }
 }
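The `strict` flag only adds the trailing-bytes check after an otherwise successful parse. A sketch of an in-module test pinning this down (the methods are `pub(super)`; the 64-byte prefix layout follows the existing `leaf_serialization` test):

```rust
#[test]
fn strict_deserialization_rejects_leftovers() {
    // 32-byte full key + 32-byte value hash (zeros here), then leaf index 42
    // as a single-byte LEB128 value.
    let mut buffer = vec![0_u8; 64];
    buffer.push(42);
    assert!(LeafNode::deserialize(&buffer, true).is_ok());

    buffer.push(0xff); // trailing byte after the LEB128-encoded index
    assert!(LeafNode::deserialize(&buffer, false).is_ok()); // lenient: ignored
    assert!(LeafNode::deserialize(&buffer, true).is_err()); // strict: `Leftovers`
}
```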
diff --git a/core/lib/merkle_tree/src/types/internal.rs b/core/lib/merkle_tree/src/types/internal.rs
index 399f6c840a3c..2db075d92212 100644
--- a/core/lib/merkle_tree/src/types/internal.rs
+++ b/core/lib/merkle_tree/src/types/internal.rs
@@ -2,7 +2,9 @@
 //! some of these types are declared as public and can be even exported using the `unstable` module.
 //! Still, logically these types are private, so adding them to new public APIs etc. is a logical error.
 
-use std::{collections::HashMap, fmt, num::NonZeroU64};
+use std::{collections::HashMap, fmt, num::NonZeroU64, str::FromStr};
+
+use anyhow::Context;
 
 use crate::{
     hasher::{HashTree, InternalNodeCache},
@@ -276,6 +278,34 @@ impl fmt::Debug for Nibbles {
     }
 }
 
+impl FromStr for Nibbles {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        anyhow::ensure!(s.len() <= KEY_SIZE * 2, "too many nibbles");
+        let mut bytes = NibblesBytes::default();
+        for (i, byte) in s.bytes().enumerate() {
+            let nibble = match byte {
+                b'0'..=b'9' => byte - b'0',
+                b'A'..=b'F' => byte - b'A' + 10,
+                b'a'..=b'f' => byte - b'a' + 10,
+                _ => anyhow::bail!("unexpected nibble: {byte:?}"),
+            };
+
+            assert!(nibble < 16);
+            if i % 2 == 0 {
+                bytes[i / 2] = nibble * 16;
+            } else {
+                bytes[i / 2] += nibble;
+            }
+        }
+        Ok(Self {
+            nibble_count: s.len(),
+            bytes,
+        })
+    }
+}
+
 /// Versioned key in a radix-16 Merkle tree.
 #[derive(Clone, Copy, PartialEq, Eq, Hash)]
 pub struct NodeKey {
@@ -283,12 +313,31 @@ pub struct NodeKey {
     pub(crate) nibbles: Nibbles,
 }
 
-impl fmt::Debug for NodeKey {
+impl fmt::Display for NodeKey {
     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(formatter, "{}:{}", self.version, self.nibbles)
     }
 }
 
+impl fmt::Debug for NodeKey {
+    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Display::fmt(self, formatter)
+    }
+}
+
+impl FromStr for NodeKey {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let (version, nibbles) = s
+            .split_once(':')
+            .context("node key does not contain `:` delimiter")?;
+        let version = version.parse().context("invalid key version")?;
+        let nibbles = nibbles.parse().context("invalid nibbles")?;
+        Ok(Self { version, nibbles })
+    }
+}
+
 impl NodeKey {
     pub(crate) const fn empty(version: u64) -> Self {
         Self {
@@ -331,19 +380,13 @@ impl NodeKey {
     }
 }
 
-impl fmt::Display for NodeKey {
-    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(formatter, "{}:{}", self.version, self.nibbles)
-    }
-}
-
 /// Leaf node of the tree.
 #[derive(Debug, Clone, Copy)]
 #[cfg_attr(test, derive(PartialEq, Eq))]
 pub struct LeafNode {
-    pub(crate) full_key: Key,
-    pub(crate) value_hash: ValueHash,
-    pub(crate) leaf_index: u64,
+    pub full_key: Key,
+    pub value_hash: ValueHash,
+    pub leaf_index: u64,
 }
 
 impl LeafNode {
@@ -364,7 +407,7 @@ impl LeafNode {
 /// Reference to a child in an [`InternalNode`].
 #[derive(Debug, Clone, Copy)]
 #[cfg_attr(test, derive(PartialEq, Eq))]
-pub(crate) struct ChildRef {
+pub struct ChildRef {
     pub hash: ValueHash,
     pub version: u64,
     pub is_leaf: bool,
@@ -449,7 +492,7 @@ impl InternalNode {
         self.cache.get_or_insert(cache)
     }
 
-    pub(crate) fn children(&self) -> impl Iterator<Item = (u8, &ChildRef)> + '_ {
+    pub fn children(&self) -> impl Iterator<Item = (u8, &ChildRef)> + '_ {
         self.children.iter()
     }
 
@@ -510,6 +553,17 @@ impl From<InternalNode> for Node {
     }
 }
 
+/// Raw node fetched from a database.
+#[derive(Debug)]
+pub struct RawNode {
+    /// Bytes for a serialized node.
+    pub raw: Vec<u8>,
+    /// Leaf, if the node can be deserialized into one.
+    pub leaf: Option<LeafNode>,
+    /// Internal node, if the node can be deserialized into one.
+    pub internal: Option<InternalNode>,
+}
+
 /// Root node of the tree. Besides a [`Node`], contains the general information about the tree
 /// (e.g., the number of leaves).
 #[derive(Debug, Clone)]
@@ -614,15 +668,23 @@ mod tests {
     fn nibbles_and_node_key_display() {
         let nibbles = Nibbles::new(&TEST_KEY, 5);
         assert_eq!(nibbles.to_string(), "deadb");
+        let restored: Nibbles = nibbles.to_string().parse().unwrap();
+        assert_eq!(restored, nibbles);
 
         let nibbles = Nibbles::new(&TEST_KEY, 6);
         assert_eq!(nibbles.to_string(), "deadbe");
+        let restored: Nibbles = nibbles.to_string().parse().unwrap();
+        assert_eq!(restored, nibbles);
 
         let nibbles = Nibbles::new(&TEST_KEY, 9);
         assert_eq!(nibbles.to_string(), "deadbeef0");
+        let restored: Nibbles = nibbles.to_string().parse().unwrap();
+        assert_eq!(restored, nibbles);
 
         let node_key = nibbles.with_version(3);
         assert_eq!(node_key.to_string(), "3:deadbeef0");
+        let restored: NodeKey = node_key.to_string().parse().unwrap();
+        assert_eq!(restored, node_key);
     }
 
     #[test]
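The parser is the exact inverse of the `Display` impls (`{version}:{nibbles}`), with nibbles packed big-endian within each byte, so an odd trailing nibble occupies the high half of the last byte. A usage sketch via the `unstable` re-export:

```rust
use zksync_merkle_tree::unstable::NodeKey;

let key: NodeKey = "3:deadbeef0".parse().expect("valid node key");
assert_eq!(key.to_string(), "3:deadbeef0"); // `Display` round-trips `FromStr`
// Internally (crate-private), "deadb" packs into [0xde, 0xad, 0xb0] with nibble_count = 5.
assert!("3:deadbeefxyz".parse::<NodeKey>().is_err()); // non-hex nibbles are rejected
```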
diff --git a/core/lib/merkle_tree/src/types/mod.rs b/core/lib/merkle_tree/src/types/mod.rs
index 807ae0238769..63db4b318b27 100644
--- a/core/lib/merkle_tree/src/types/mod.rs
+++ b/core/lib/merkle_tree/src/types/mod.rs
@@ -6,7 +6,7 @@ pub(crate) use self::internal::{
     ChildRef, Nibbles, NibblesBytes, StaleNodeKey, TreeTags, HASH_SIZE, KEY_SIZE, TREE_DEPTH,
 };
 pub use self::internal::{
-    InternalNode, LeafNode, Manifest, Node, NodeKey, ProfiledTreeOperation, Root,
+    InternalNode, LeafNode, Manifest, Node, NodeKey, ProfiledTreeOperation, RawNode, Root,
 };
 
 mod internal;
diff --git a/core/lib/merkle_tree/tests/integration/domain.rs b/core/lib/merkle_tree/tests/integration/domain.rs
index abd3dbbcd3f3..fa7ec4cfde30 100644
--- a/core/lib/merkle_tree/tests/integration/domain.rs
+++ b/core/lib/merkle_tree/tests/integration/domain.rs
@@ -68,6 +68,31 @@ fn basic_workflow() {
     tree.verify_consistency(L1BatchNumber(0)).unwrap();
     assert_eq!(tree.root_hash(), expected_root_hash);
     assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1));
+
+    let keys = ["0:", "0:0"].map(|key| key.parse().unwrap());
+    let raw_nodes = tree.reader().raw_nodes(&keys);
+    assert_eq!(raw_nodes.len(), 2);
+    let raw_root = raw_nodes[0].as_ref().unwrap();
+    assert!(!raw_root.raw.is_empty());
+    assert!(raw_root.internal.is_some());
+    assert!(raw_root.leaf.is_none());
+
+    let raw_node = raw_nodes[1].as_ref().unwrap();
+    assert!(!raw_node.raw.is_empty());
+    assert!(raw_node.leaf.is_none());
+    let raw_node = raw_node.internal.as_ref().unwrap();
+
+    let (nibble, _) = raw_node
+        .children()
+        .find(|(_, child_ref)| child_ref.is_leaf)
+        .unwrap();
+    let leaf_key = format!("0:0{nibble:x}").parse().unwrap();
+    let raw_nodes = tree.reader().raw_nodes(&[leaf_key]);
+    assert_eq!(raw_nodes.len(), 1);
+    let raw_leaf = raw_nodes.into_iter().next().unwrap().expect("no leaf");
+    assert!(!raw_leaf.raw.is_empty());
+    assert!(raw_leaf.leaf.is_some());
+    assert!(raw_leaf.internal.is_none());
 }
 
 #[test]
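The test above descends one level manually; the same pattern generalizes to a root-to-leaf walk, since a child's key is the parent's nibble path extended by one nibble, versioned by `ChildRef::version`. A hedged helper sketch (the function is hypothetical, built only from APIs in this diff):

```rust
use zksync_merkle_tree::{domain::ZkSyncTreeReader, unstable::NodeKey};

/// Follows first children from the version-0 root until some leaf is reached.
fn find_some_leaf_key(reader: &ZkSyncTreeReader) -> Option<NodeKey> {
    let mut key: NodeKey = "0:".parse().ok()?;
    loop {
        let node = reader.raw_nodes(&[key]).pop()??;
        if node.leaf.is_some() {
            return Some(key); // the node itself is a leaf
        }
        let internal = node.internal?;
        let (nibble, child_ref) = internal.children().next()?;
        // Child key = parent nibble path + one nibble, at the child's version.
        let key_str = key.to_string();
        let (_, nibbles) = key_str.split_once(':')?;
        key = format!("{}:{}{:x}", child_ref.version, nibbles, nibble)
            .parse()
            .ok()?;
    }
}
```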
diff --git a/core/node/metadata_calculator/src/api_server/metrics.rs b/core/node/metadata_calculator/src/api_server/metrics.rs
index d185861d07c6..92f948e09702 100644
--- a/core/node/metadata_calculator/src/api_server/metrics.rs
+++ b/core/node/metadata_calculator/src/api_server/metrics.rs
@@ -9,6 +9,8 @@ use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics
 pub(super) enum MerkleTreeApiMethod {
     Info,
     GetProofs,
+    GetNodes,
+    GetStaleKeys,
 }
 
 /// Metrics for Merkle tree API.
diff --git a/core/node/metadata_calculator/src/api_server/mod.rs b/core/node/metadata_calculator/src/api_server/mod.rs
index 6f46e8aeea81..4612d859a3dd 100644
--- a/core/node/metadata_calculator/src/api_server/mod.rs
+++ b/core/node/metadata_calculator/src/api_server/mod.rs
@@ -1,6 +1,6 @@
 //! Primitive Merkle tree API used internally to fetch proofs.
 
-use std::{fmt, future::Future, net::SocketAddr, pin::Pin};
+use std::{collections::HashMap, fmt, future::Future, net::SocketAddr, pin::Pin};
 
 use anyhow::Context as _;
 use async_trait::async_trait;
@@ -10,12 +10,16 @@ use axum::{
     response::{IntoResponse, Response},
     routing, Json, Router,
 };
-use serde::{Deserialize, Serialize};
+use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 use tokio::sync::watch;
 use zksync_crypto_primitives::hasher::blake2::Blake2Hasher;
 use zksync_health_check::{CheckHealth, Health, HealthStatus};
-use zksync_merkle_tree::NoVersionError;
-use zksync_types::{L1BatchNumber, H256, U256};
+use zksync_merkle_tree::{
+    unstable::{NodeKey, RawNode},
+    NoVersionError, ValueHash,
+};
+use zksync_types::{web3, L1BatchNumber, H256, U256};
+use zksync_utils::u256_to_h256;
 
 use self::metrics::{MerkleTreeApiMethod, API_METRICS};
 use crate::{AsyncTreeReader, LazyAsyncTreeReader, MerkleTreeInfo};
@@ -77,6 +81,117 @@ impl TreeEntryWithProof {
     }
 }
 
+#[derive(Debug, PartialEq, Eq, Hash)]
+struct HexNodeKey(NodeKey);
+
+impl Serialize for HexNodeKey {
+    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
+        serializer.serialize_str(&self.0.to_string())
+    }
+}
+
+impl<'de> Deserialize<'de> for HexNodeKey {
+    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
+        struct HexNodeKeyVisitor;
+
+        impl de::Visitor<'_> for HexNodeKeyVisitor {
+            type Value = HexNodeKey;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+                formatter.write_str("hex-encoded versioned key like `123:c0ffee`")
+            }
+
+            fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
+                v.parse().map(HexNodeKey).map_err(de::Error::custom)
+            }
+        }
+
+        deserializer.deserialize_str(HexNodeKeyVisitor)
+    }
+}
+
+#[derive(Debug, Serialize)]
+struct ApiLeafNode {
+    full_key: H256,
+    value_hash: H256,
+    leaf_index: u64,
+}
+
+#[derive(Debug, Serialize)]
+struct ApiChildRef {
+    hash: ValueHash,
+    version: u64,
+    is_leaf: bool,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(transparent)]
+struct ApiInternalNode(HashMap<char, ApiChildRef>);
+
+#[derive(Debug, Serialize)]
+struct ApiRawNode {
+    raw: web3::Bytes,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    leaf: Option<ApiLeafNode>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    internal: Option<ApiInternalNode>,
+}
+
+impl From<RawNode> for ApiRawNode {
+    fn from(node: RawNode) -> Self {
+        Self {
+            raw: web3::Bytes(node.raw),
+            leaf: node.leaf.map(|leaf| ApiLeafNode {
+                full_key: u256_to_h256(leaf.full_key),
+                value_hash: leaf.value_hash,
+                leaf_index: leaf.leaf_index,
+            }),
+            internal: node.internal.map(|internal| {
+                ApiInternalNode(
+                    internal
+                        .children()
+                        .map(|(nibble, child_ref)| {
+                            let nibble = if nibble < 10 {
+                                b'0' + nibble
+                            } else {
+                                b'a' + nibble - 10
+                            };
+                            (
+                                char::from(nibble),
+                                ApiChildRef {
+                                    hash: child_ref.hash,
+                                    version: child_ref.version,
+                                    is_leaf: child_ref.is_leaf,
+                                },
+                            )
+                        })
+                        .collect(),
+                )
+            }),
+        }
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct TreeNodesRequest {
+    keys: Vec<HexNodeKey>,
+}
+
+#[derive(Debug, Serialize)]
+struct TreeNodesResponse {
+    nodes: HashMap<HexNodeKey, ApiRawNode>,
+}
+
+#[derive(Debug, Deserialize)]
+struct StaleKeysRequest {
+    l1_batch_number: L1BatchNumber,
+}
+
+#[derive(Debug, Serialize)]
+struct StaleKeysResponse {
+    stale_keys: Vec<HexNodeKey>,
+}
+
 /// Server-side tree API error.
 #[derive(Debug)]
 enum TreeApiServerError {
@@ -343,6 +458,35 @@ impl AsyncTreeReader {
         Ok(Json(response))
     }
 
+    async fn get_nodes_handler(
+        State(this): State<Self>,
+        Json(request): Json<TreeNodesRequest>,
+    ) -> Json<TreeNodesResponse> {
+        let latency = API_METRICS.latency[&MerkleTreeApiMethod::GetNodes].start();
+        let keys: Vec<_> = request.keys.iter().map(|key| key.0).collect();
+        let nodes = this.clone().raw_nodes(keys).await;
+        let nodes = request
+            .keys
+            .into_iter()
+            .zip(nodes)
+            .filter_map(|(key, node)| Some((key, node?.into())))
+            .collect();
+        let response = TreeNodesResponse { nodes };
+        latency.observe();
+        Json(response)
+    }
+
+    async fn get_stale_keys_handler(
+        State(this): State<Self>,
+        Json(request): Json<StaleKeysRequest>,
+    ) -> Json<StaleKeysResponse> {
+        let latency = API_METRICS.latency[&MerkleTreeApiMethod::GetStaleKeys].start();
+        let stale_keys = this.clone().raw_stale_keys(request.l1_batch_number).await;
+        let stale_keys = stale_keys.into_iter().map(HexNodeKey).collect();
+        latency.observe();
+        Json(StaleKeysResponse { stale_keys })
+    }
+
     async fn create_api_server(
         self,
         bind_address: &SocketAddr,
@@ -353,6 +497,11 @@ impl AsyncTreeReader {
         let app = Router::new()
             .route("/", routing::get(Self::info_handler))
             .route("/proofs", routing::post(Self::get_proofs_handler))
+            .route("/debug/nodes", routing::post(Self::get_nodes_handler))
+            .route(
+                "/debug/stale-keys",
+                routing::post(Self::get_stale_keys_handler),
+            )
             .with_state(self);
 
         let listener = tokio::net::TcpListener::bind(bind_address)
@@ -369,8 +518,8 @@ impl AsyncTreeReader {
                 }
                 tracing::info!("Stop signal received, Merkle tree API server is shutting down");
             })
-        .await
-        .context("Merkle tree API server failed")?;
+            .await
+            .context("Merkle tree API server failed")?;
 
         tracing::info!("Merkle tree API server shut down");
         Ok(())
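Putting the serde attributes together: `#[serde(transparent)]` flattens `ApiInternalNode` into an object keyed by nibble characters, `web3::Bytes` hex-encodes `raw`, and `None` fields are omitted. A hypothetical `/debug/nodes` response value (hashes and bytes are made up):

```rust
let example: serde_json::Value = serde_json::json!({
    "nodes": {
        "0:": {
            "raw": "0x0104aa", // hex-encoded node bytes (fake)
            "internal": {
                "3": { "hash": "0x11", "version": 0, "is_leaf": true },
                "c": { "hash": "0x22", "version": 0, "is_leaf": false }
            }
        }
    }
});
```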
diff --git a/core/node/metadata_calculator/src/api_server/tests.rs b/core/node/metadata_calculator/src/api_server/tests.rs
index 42a3152e6b53..815522a4cd8e 100644
--- a/core/node/metadata_calculator/src/api_server/tests.rs
+++ b/core/node/metadata_calculator/src/api_server/tests.rs
@@ -72,11 +72,67 @@ async fn merkle_tree_api() {
     assert_eq!(err.version_count, 6);
     assert_eq!(err.missing_version, 10);
 
+    let raw_nodes_response = api_client
+        .inner
+        .post(format!("http://{local_addr}/debug/nodes"))
+        .json(&serde_json::json!({ "keys": ["0:", "0:0"] }))
+        .send()
+        .await
+        .unwrap()
+        .error_for_status()
+        .unwrap();
+    let raw_nodes_response: serde_json::Value = raw_nodes_response.json().await.unwrap();
+    assert_raw_nodes_response(&raw_nodes_response);
+
+    let raw_stale_keys_response = api_client
+        .inner
+        .post(format!("http://{local_addr}/debug/stale-keys"))
+        .json(&serde_json::json!({ "l1_batch_number": 1 }))
+        .send()
+        .await
+        .unwrap()
+        .error_for_status()
+        .unwrap();
+    let raw_stale_keys_response: serde_json::Value = raw_stale_keys_response.json().await.unwrap();
+    assert_raw_stale_keys_response(&raw_stale_keys_response);
+
     // Stop the calculator and the tree API server.
     stop_sender.send_replace(true);
     api_server_task.await.unwrap().unwrap();
 }
 
+fn assert_raw_nodes_response(response: &serde_json::Value) {
+    let response = response.as_object().expect("not an object");
+    let response = response["nodes"].as_object().expect("not an object");
+    let root = response["0:"].as_object().expect("not an object");
+    assert!(
+        root.len() == 2 && root.contains_key("internal") && root.contains_key("raw"),
+        "{root:#?}"
+    );
+    let root = root["internal"].as_object().expect("not an object");
+    for key in root.keys() {
+        assert_eq!(key.len(), 1, "{key}");
+        let key = key.as_bytes()[0];
+        assert_matches!(key, b'0'..=b'9' | b'a'..=b'f');
+    }
+
+    let node = response["0:0"].as_object().expect("not an object");
+    assert!(
+        node.len() == 2 && node.contains_key("internal") && node.contains_key("raw"),
+        "{node:#?}"
+    );
+}
+
+fn assert_raw_stale_keys_response(response: &serde_json::Value) {
+    let response = response.as_object().expect("not an object");
+    let stale_keys = response["stale_keys"].as_array().expect("not an array");
+    assert!(!stale_keys.is_empty()); // At least the root is always obsoleted
+    for stale_key in stale_keys {
+        let stale_key = stale_key.as_str().expect("not a string");
+        stale_key.parse::<NodeKey>().unwrap();
+    }
+}
+
 #[tokio::test]
 async fn api_client_connection_error() {
     // Use an address that will definitely fail on a timeout.
diff --git a/core/node/metadata_calculator/src/helpers.rs b/core/node/metadata_calculator/src/helpers.rs
index b6989afb179f..3f370afaf77e 100644
--- a/core/node/metadata_calculator/src/helpers.rs
+++ b/core/node/metadata_calculator/src/helpers.rs
@@ -22,6 +22,7 @@ use zksync_health_check::{CheckHealth, Health, HealthStatus, ReactiveHealthCheck
 use zksync_merkle_tree::{
     domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader},
     recovery::{MerkleTreeRecovery, PersistenceThreadHandle},
+    unstable::{NodeKey, RawNode},
     Database, Key, MerkleTreeColumnFamily, NoVersionError, RocksDBWrapper, TreeEntry,
     TreeEntryWithProof, TreeInstruction,
 };
@@ -35,7 +36,7 @@ use zksync_types::{
 use super::{
     metrics::{LoadChangesStage, TreeUpdateStage, METRICS},
     pruning::PruningHandles,
-    MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig,
+    MerkleTreeReaderConfig, MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig,
 };
 
 /// General information about the Merkle tree.
@@ -176,6 +177,40 @@ fn create_db_sync(config: &MetadataCalculatorConfig) -> anyhow::Result<RocksDBWrapper> {
     Ok(db)
 }
 
+async fn create_readonly_db(config: MerkleTreeReaderConfig) -> anyhow::Result<RocksDBWrapper> {
+    tokio::task::spawn_blocking(move || {
+        let MerkleTreeReaderConfig {
+            db_path,
+            max_open_files,
+            multi_get_chunk_size,
+            block_cache_capacity,
+            include_indices_and_filters_in_block_cache,
+        } = config;
+
+        tracing::info!(
+            "Initializing Merkle tree database at `{db_path}` (max open files: {max_open_files:?}) with \
+             {multi_get_chunk_size} multi-get chunk size, {block_cache_capacity}B block cache \
+             (indices & filters included: {include_indices_and_filters_in_block_cache:?})"
+        );
+
+        let mut db = RocksDB::with_options(
+            db_path.as_ref(),
+            RocksDBOptions {
+                block_cache_capacity: Some(block_cache_capacity),
+                include_indices_and_filters_in_block_cache,
+                max_open_files,
+                ..RocksDBOptions::default()
+            },
+        )?;
+        if cfg!(test) {
+            db = db.with_sync_writes();
+        }
+        Ok(RocksDBWrapper::from(db))
+    })
+    .await
+    .context("panicked creating Merkle tree RocksDB")?
+}
+
 /// Wrapper around the "main" tree implementation used by [`MetadataCalculator`].
 ///
 /// Async methods provided by this wrapper are not cancel-safe! This is probably not an issue;
@@ -307,6 +342,13 @@ pub struct AsyncTreeReader {
 }
 
 impl AsyncTreeReader {
+    pub(super) fn new(db: RocksDBWrapper, mode: MerkleTreeMode) -> anyhow::Result<Self> {
+        Ok(Self {
+            inner: ZkSyncTreeReader::new(db)?,
+            mode,
+        })
+    }
+
     fn downgrade(&self) -> WeakAsyncTreeReader {
         WeakAsyncTreeReader {
             db: self.inner.db().clone().into_inner().downgrade(),
@@ -366,6 +408,18 @@ impl AsyncTreeReader {
             .await
             .unwrap()
     }
+
+    pub(crate) async fn raw_nodes(self, keys: Vec<NodeKey>) -> Vec<Option<RawNode>> {
+        tokio::task::spawn_blocking(move || self.inner.raw_nodes(&keys))
+            .await
+            .unwrap()
+    }
+
+    pub(crate) async fn raw_stale_keys(self, l1_batch_number: L1BatchNumber) -> Vec<NodeKey> {
+        tokio::task::spawn_blocking(move || self.inner.raw_stale_keys(l1_batch_number))
+            .await
+            .unwrap()
+    }
 }
 
 /// Version of async tree reader that holds a weak reference to RocksDB. Used in [`MerkleTreeHealthCheck`].
diff --git a/core/node/metadata_calculator/src/lib.rs b/core/node/metadata_calculator/src/lib.rs
index 451090694b2c..5c64330a0e7d 100644
--- a/core/node/metadata_calculator/src/lib.rs
+++ b/core/node/metadata_calculator/src/lib.rs
@@ -27,6 +27,7 @@ pub use self::{
     helpers::{AsyncTreeReader, LazyAsyncTreeReader, MerkleTreeInfo},
     pruning::MerkleTreePruningTask,
 };
+use crate::helpers::create_readonly_db;
 
 pub mod api_server;
 mod helpers;
@@ -264,3 +265,55 @@ impl MetadataCalculator {
             .await
     }
 }
+
+/// Configuration of [`TreeReaderTask`].
+#[derive(Debug, Clone)]
+pub struct MerkleTreeReaderConfig {
+    /// Filesystem path to the RocksDB instance that stores the tree.
+    pub db_path: String,
+    /// Maximum number of files concurrently opened by RocksDB. Useful to fit into OS limits; can be used
+    /// as a rudimentary way to control RAM usage of the tree.
+    pub max_open_files: Option<NonZeroU32>,
+    /// Chunk size for multi-get operations. Can speed up loading data for the Merkle tree on some environments,
+    /// but the effects vary wildly depending on the setup (e.g., the filesystem used).
+    pub multi_get_chunk_size: usize,
+    /// Capacity of RocksDB block cache in bytes. Reasonable values range from ~100 MiB to several GB.
+    pub block_cache_capacity: usize,
+    /// If specified, RocksDB indices and Bloom filters will be managed by the block cache, rather than
+    /// being loaded entirely into RAM on the RocksDB initialization. The block cache capacity should be increased
+    /// correspondingly; otherwise, RocksDB performance can significantly degrade.
+    pub include_indices_and_filters_in_block_cache: bool,
+}
+
+/// Alternative to [`MetadataCalculator`] that provides read-only access to the Merkle tree.
+#[derive(Debug)]
+pub struct TreeReaderTask {
+    config: MerkleTreeReaderConfig,
+    tree_reader: watch::Sender<Option<AsyncTreeReader>>,
+}
+
+impl TreeReaderTask {
+    /// Creates a new task with the provided configuration.
+    pub fn new(config: MerkleTreeReaderConfig) -> Self {
+        Self {
+            config,
+            tree_reader: watch::channel(None).0,
+        }
+    }
+
+    /// Returns a reference to the tree reader.
+    pub fn tree_reader(&self) -> LazyAsyncTreeReader {
+        LazyAsyncTreeReader(self.tree_reader.subscribe())
+    }
+
+    /// Runs this task. The task exits on error, or when the tree reader is successfully initialized.
+    pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        let db = tokio::select! {
+            db_result = create_readonly_db(self.config) => db_result?,
+            _ = stop_receiver.changed() => return Ok(()),
+        };
+        let reader = AsyncTreeReader::new(db, MerkleTreeMode::Lightweight)?;
+        self.tree_reader.send_replace(Some(reader));
+        Ok(())
+    }
+}
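A hedged sketch of driving the new task outside the node framework (the `TreeApiServerLayer` below does the equivalent wiring); `LazyAsyncTreeReader::wait` is assumed to resolve once `send_replace` above fires, and all config values here are illustrative:

```rust
use tokio::sync::watch;
use zksync_metadata_calculator::{MerkleTreeReaderConfig, TreeReaderTask};

let config = MerkleTreeReaderConfig {
    db_path: "/db/merkle-tree".to_owned(), // hypothetical path
    max_open_files: None,
    multi_get_chunk_size: 500,
    block_cache_capacity: 128 << 20, // 128 MiB
    include_indices_and_filters_in_block_cache: false,
};
let task = TreeReaderTask::new(config);
let lazy_reader = task.tree_reader();
let (_stop_sender, stop_receiver) = watch::channel(false);
tokio::spawn(task.run(stop_receiver));
if let Some(_reader) = lazy_reader.wait().await {
    // `_reader` is an `AsyncTreeReader`; hand it to the API server or health checks.
}
```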
diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
index 827ec69d9427..4092ee6dcd56 100644
--- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
+++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs
@@ -7,7 +7,8 @@ use std::{
 use anyhow::Context as _;
 use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode};
 use zksync_metadata_calculator::{
-    LazyAsyncTreeReader, MerkleTreePruningTask, MetadataCalculator, MetadataCalculatorConfig,
+    LazyAsyncTreeReader, MerkleTreePruningTask, MerkleTreeReaderConfig, MetadataCalculator,
+    MetadataCalculatorConfig, TreeReaderTask,
 };
 use zksync_storage::RocksDB;
 
@@ -19,7 +20,7 @@ use crate::{
         web3_api::TreeApiClientResource,
     },
     service::{ShutdownHook, StopReceiver},
-    task::{Task, TaskId},
+    task::{Task, TaskId, TaskKind},
     wiring_layer::{WiringError, WiringLayer},
     FromContext, IntoContext,
 };
@@ -205,3 +206,65 @@ impl Task for MerkleTreePruningTask {
         (*self).run(stop_receiver.0).await
     }
 }
+
+/// Wiring layer for an isolated Merkle tree API server. Mutually exclusive with [`MetadataCalculatorLayer`].
+#[derive(Debug)]
+pub struct TreeApiServerLayer {
+    config: MerkleTreeReaderConfig,
+    api_config: MerkleTreeApiConfig,
+}
+
+impl TreeApiServerLayer {
+    pub fn new(config: MerkleTreeReaderConfig, api_config: MerkleTreeApiConfig) -> Self {
+        Self { config, api_config }
+    }
+}
+
+#[derive(Debug, IntoContext)]
+#[context(crate = crate)]
+pub struct TreeApiServerOutput {
+    tree_api_client: TreeApiClientResource,
+    #[context(task)]
+    tree_reader_task: TreeReaderTask,
+    #[context(task)]
+    tree_api_task: TreeApiTask,
+}
+
+#[async_trait::async_trait]
+impl WiringLayer for TreeApiServerLayer {
+    type Input = ();
+    type Output = TreeApiServerOutput;
+
+    fn layer_name(&self) -> &'static str {
+        "tree_api_server"
+    }
+
+    async fn wire(self, (): Self::Input) -> Result<Self::Output, WiringError> {
+        let tree_reader_task = TreeReaderTask::new(self.config);
+        let bind_addr = (Ipv4Addr::UNSPECIFIED, self.api_config.port).into();
+        let tree_api_task = TreeApiTask {
+            bind_addr,
+            tree_reader: tree_reader_task.tree_reader(),
+        };
+        Ok(TreeApiServerOutput {
+            tree_api_client: TreeApiClientResource(Arc::new(tree_reader_task.tree_reader())),
+            tree_api_task,
+            tree_reader_task,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl Task for TreeReaderTask {
+    fn kind(&self) -> TaskKind {
+        TaskKind::OneshotTask
+    }
+
+    fn id(&self) -> TaskId {
+        "merkle_tree_reader_task".into()
+    }
+
+    async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
+        (*self).run(stop_receiver.0).await
+    }
+}
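Both this layer and `MetadataCalculatorLayer` insert `TreeApiClientResource` and open the same RocksDB, hence the mutual exclusivity. A sketch of the selection a builder performs, mirroring the `Component::TreeApi` arm at the top of this diff (`with_tree_api_config` and the config variables are assumptions here):

```rust
// Exactly one of the two layers should be added per process.
if components.contains(&Component::Tree) {
    // The metadata calculator owns the tree and can serve the API itself.
    node.add_layer(
        MetadataCalculatorLayer::new(calculator_config)
            .with_tree_api_config(api_config),
    );
} else {
    // No tree in this process: serve the API from a read-only tree reader.
    node.add_layer(TreeApiServerLayer::new(reader_config, api_config));
}
```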