Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Switch EN to use node framework #2427

Merged
merged 15 commits into from
Jul 12, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/bin/external_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ zksync_concurrency.workspace = true
zksync_consensus_roles.workspace = true
vise.workspace = true

async-trait.workspace = true
anyhow.workspace = true
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
Expand Down
25 changes: 12 additions & 13 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode}
use zksync_consistency_checker::ConsistencyChecker;
use zksync_core_leftovers::setup_sigint_handler;
use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core};
use zksync_db_connection::{
connection_pool::ConnectionPoolBuilder, healthcheck::ConnectionPoolHealthCheck,
};
use zksync_db_connection::connection_pool::ConnectionPoolBuilder;
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_metadata_calculator::{
api_server::{TreeApiClient, TreeApiHttpClient},
Expand Down Expand Up @@ -105,7 +103,6 @@ async fn build_state_keeper(
Box::new(main_node_client.for_component("external_io")),
chain_id,
)
.await
.context("Failed initializing I/O for external node state keeper")?;

Ok(ZkSyncStateKeeper::new(
Expand Down Expand Up @@ -725,10 +722,6 @@ struct Cli {
external_node_config_path: Option<std::path::PathBuf>,
/// Path to the yaml with consensus.
consensus_path: Option<std::path::PathBuf>,

/// Run the node using the node framework.
#[arg(long)]
use_node_framework: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's incompatible, have you checked, that we don't use it in gitops ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course. EN wasn't fully ported to the framework until this PR, it wouldn't have worked with it

}

#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
Expand Down Expand Up @@ -825,8 +818,11 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed fetching remote part of node config from main node")?;

// Can be used to force the old approach to the external node.
let force_old_approach = std::env::var("EXTERNAL_NODE_OLD_APPROACH").is_ok();

// If the node framework is used, run the node.
if opt.use_node_framework {
if !force_old_approach {
popzxc marked this conversation as resolved.
Show resolved Hide resolved
// We run the node from a different thread, since the current thread is in tokio context.
std::thread::spawn(move || {
let node =
Expand All @@ -840,6 +836,8 @@ async fn main() -> anyhow::Result<()> {
return Ok(());
}

tracing::info!("Running the external node in the old approach");

if let Some(threshold) = config.optional.slow_query_threshold() {
ConnectionPool::<Core>::global_config().set_slow_query_threshold(threshold)?;
}
Expand All @@ -848,7 +846,11 @@ async fn main() -> anyhow::Result<()> {
}

RUST_METRICS.initialize();
EN_METRICS.observe_config(&config);
EN_METRICS.observe_config(
config.required.l1_chain_id,
config.required.l2_chain_id,
config.postgres.max_connections,
);

let singleton_pool_builder = ConnectionPool::singleton(config.postgres.database_url());
let connection_pool = ConnectionPool::<Core>::builder(
Expand Down Expand Up @@ -911,9 +913,6 @@ async fn run_node(
app_health.insert_custom_component(Arc::new(MainNodeHealthCheck::from(
main_node_client.clone(),
)))?;
app_health.insert_custom_component(Arc::new(ConnectionPoolHealthCheck::new(
connection_pool.clone(),
)))?;

// Start the health check server early into the node lifecycle so that its health can be monitored from the very start.
let healthcheck_handle = HealthCheckHandle::spawn_server(
Expand Down
82 changes: 82 additions & 0 deletions core/bin/external_node/src/metrics/framework.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::time::Duration;

use zksync_dal::{ConnectionPool, Core, CoreDal as _};
use zksync_node_framework::{
implementations::resources::pools::{MasterPool, PoolResource},
FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer,
};
use zksync_shared_metrics::rustc::RUST_METRICS;
use zksync_types::{L1ChainId, L2ChainId};

use super::EN_METRICS;

#[derive(Debug)]
pub struct ExternalNodeMetricsLayer {
popzxc marked this conversation as resolved.
Show resolved Hide resolved
pub l1_chain_id: L1ChainId,
pub l2_chain_id: L2ChainId,
pub postgres_pool_size: u32,
}

/// Resources requested from the node context by [`ExternalNodeMetricsLayer`].
#[derive(Debug, FromContext)]
pub struct Input {
    /// Master DB pool; a singleton connection pool derived from it is handed
    /// to the protocol version metrics task.
    pub master_pool: PoolResource<MasterPool>,
}

/// Values produced by [`ExternalNodeMetricsLayer`] and inserted back into the node context.
#[derive(Debug, IntoContext)]
pub struct Output {
    /// Long-running task that periodically reports the last used protocol version.
    #[context(task)]
    pub task: ProtocolVersionMetricsTask,
}

#[async_trait::async_trait]
impl WiringLayer for ExternalNodeMetricsLayer {
    type Input = Input;
    type Output = Output;

    fn layer_name(&self) -> &'static str {
        "external_node_metrics"
    }

    async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
        // One-shot metrics: rustc info and the static node configuration.
        RUST_METRICS.initialize();
        EN_METRICS.observe_config(self.l1_chain_id, self.l2_chain_id, self.postgres_pool_size);

        // The protocol version poller gets a dedicated singleton pool so it
        // doesn't compete with other components for master pool connections.
        let protocol_version_pool = input.master_pool.get_singleton().await?;
        Ok(Output {
            task: ProtocolVersionMetricsTask {
                pool: protocol_version_pool,
            },
        })
    }
}

/// Task that periodically queries Postgres for the last used protocol version
/// and publishes it via the `EN_METRICS.protocol_version` gauge.
#[derive(Debug)]
pub struct ProtocolVersionMetricsTask {
    /// Connection pool used for the periodic protocol version queries.
    pool: ConnectionPool<Core>,
}

#[async_trait::async_trait]
impl Task for ProtocolVersionMetricsTask {
    fn id(&self) -> TaskId {
        "en_protocol_version_metrics".into()
    }

    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
        // How long to wait between consecutive protocol version queries.
        const QUERY_INTERVAL: Duration = Duration::from_secs(10);

        loop {
            // `borrow_and_update` marks the current value as seen, so the
            // `changed()` wait below only wakes on a *new* stop signal.
            if *stop_receiver.0.borrow_and_update() {
                break;
            }

            let last_used_version = self
                .pool
                .connection()
                .await?
                .protocol_versions_dal()
                .last_used_version_id()
                .await;
            if let Some(version) = last_used_version {
                EN_METRICS.protocol_version.set(version as u64);
            }

            // Sleep until the next poll, waking early if a stop signal arrives.
            // An elapsed timeout (no signal within the interval) is the normal case,
            // so the result is deliberately discarded.
            let _ = tokio::time::timeout(QUERY_INTERVAL, stop_receiver.0.changed()).await;
        }
        Ok(())
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::time::Duration;
use tokio::sync::watch;
use vise::{EncodeLabelSet, Gauge, Info, Metrics};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::{L1ChainId, L2ChainId};

use crate::{config::ExternalNodeConfig, metadata::SERVER_VERSION};
use crate::metadata::SERVER_VERSION;

pub(crate) mod framework;

/// Immutable EN parameters that affect multiple components.
#[derive(Debug, Clone, Copy, EncodeLabelSet)]
Expand All @@ -26,12 +29,17 @@ pub(crate) struct ExternalNodeMetrics {
}

impl ExternalNodeMetrics {
pub(crate) fn observe_config(&self, config: &ExternalNodeConfig) {
pub(crate) fn observe_config(
&self,
l1_chain_id: L1ChainId,
l2_chain_id: L2ChainId,
postgres_pool_size: u32,
) {
let info = ExternalNodeInfo {
server_version: SERVER_VERSION,
l1_chain_id: config.required.l1_chain_id.0,
l2_chain_id: config.required.l2_chain_id.as_u64(),
postgres_pool_size: config.postgres.max_connections,
l1_chain_id: l1_chain_id.0,
l2_chain_id: l2_chain_id.as_u64(),
postgres_pool_size,
};
tracing::info!("Setting general node information: {info:?}");

Expand Down
50 changes: 40 additions & 10 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! as well as an interface to run the node with the specified components.

use anyhow::Context as _;
use zksync_block_reverter::NodeRole;
use zksync_config::{
configs::{
api::{HealthCheckConfig, MerkleTreeApiConfig},
Expand All @@ -15,6 +16,7 @@ use zksync_node_api_server::{tx_sender::ApiContracts, web3::Namespace};
use zksync_node_framework::{
implementations::layers::{
batch_status_updater::BatchStatusUpdaterLayer,
block_reverter::BlockReverterLayer,
commitment_generator::CommitmentGeneratorLayer,
consensus::ExternalNodeConsensusLayer,
consistency_checker::ConsistencyCheckerLayer,
Expand Down Expand Up @@ -55,13 +57,14 @@ use zksync_state::RocksdbStorageOptions;

use crate::{
config::{self, ExternalNodeConfig},
metrics::framework::ExternalNodeMetricsLayer,
Component,
};

/// Builder for the external node.
#[derive(Debug)]
pub(crate) struct ExternalNodeBuilder {
node: ZkStackServiceBuilder,
pub(crate) node: ZkStackServiceBuilder,
config: ExternalNodeConfig,
}

Expand Down Expand Up @@ -115,6 +118,15 @@ impl ExternalNodeBuilder {
Ok(self)
}

/// Registers [`ExternalNodeMetricsLayer`], which reports static node info
/// (chain IDs, Postgres pool size) and spawns the protocol version metrics task.
fn add_external_node_metrics_layer(mut self) -> anyhow::Result<Self> {
    let metrics_layer = ExternalNodeMetricsLayer {
        l1_chain_id: self.config.required.l1_chain_id,
        l2_chain_id: self.config.required.l2_chain_id,
        postgres_pool_size: self.config.postgres.max_connections,
    };
    self.node.add_layer(metrics_layer);
    Ok(self)
}

fn add_main_node_client_layer(mut self) -> anyhow::Result<Self> {
let layer = MainNodeClientLayer::new(
self.config.required.main_node_url.clone(),
Expand Down Expand Up @@ -431,6 +443,18 @@ impl ExternalNodeBuilder {
Ok(self)
}

/// Registers the block reverter layer for the external node role.
///
/// Enables rollback of Postgres, the Merkle tree, and the state keeper cache.
fn add_block_reverter_layer(mut self) -> anyhow::Result<Self> {
    let merkle_tree_path = self.config.required.merkle_tree_path.clone();
    let state_cache_path = self.config.required.state_cache_path.clone();

    let mut layer = BlockReverterLayer::new(NodeRole::External);
    // Reverting executed batches is more-or-less safe for external nodes.
    layer.allow_rolling_back_executed_batches();
    layer.enable_rolling_back_postgres();
    layer.enable_rolling_back_merkle_tree(merkle_tree_path);
    layer.enable_rolling_back_state_keeper_cache(state_cache_path);

    self.node.add_layer(layer);
    Ok(self)
}

/// This layer will make sure that the database is initialized correctly,
/// e.g.:
/// - genesis or snapshot recovery will be performed if it's required.
Expand Down Expand Up @@ -480,6 +504,21 @@ impl ExternalNodeBuilder {
.add_query_eth_client_layer()?
.add_reorg_detector_layer()?;

// Add layers that must run only on a single component.
if components.contains(&Component::Core) {
// Core is a singleton & mandatory component,
// so until we have a dedicated component for "auxiliary" tasks,
// it's responsible for things like metrics.
self = self
.add_postgres_metrics_layer()?
.add_external_node_metrics_layer()?;
// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self
.add_block_reverter_layer()?
.add_storage_initialization_layer(LayerKind::Task)?;
}

// Add preconditions for all the components.
self = self
.add_l1_batch_commitment_mode_validation_layer()?
Expand Down Expand Up @@ -536,11 +575,6 @@ impl ExternalNodeBuilder {
self = self.add_tree_data_fetcher_layer()?;
}
Component::Core => {
// Core is a singleton & mandatory component,
// so until we have a dedicated component for "auxiliary" tasks,
// it's responsible for things like metrics.
self = self.add_postgres_metrics_layer()?;

// Main tasks
self = self
.add_state_keeper_layer()?
Expand All @@ -549,10 +583,6 @@ impl ExternalNodeBuilder {
.add_consistency_checker_layer()?
.add_commitment_generator_layer()?
.add_batch_status_updater_layer()?;

// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self.add_storage_initialization_layer(LayerKind::Task)?;
}
}
}
Expand Down
Loading
Loading