Skip to content

Commit

Permalink
feat(node-framework): New wiring interface (#2384)
Browse files Browse the repository at this point in the history
## What ❔

⚠️ No nitpick territory! This PR touches _a lot of code_, and many
places there may be improved for sure.
Let's focus on fundamentals only.
You are free to leave nitpick comments, but please don't block the
review on them only. I may or may not fix nitpicks, though will try
depending on complexity and capacity, most likely in follow-up PRs.

This PR introduces a new interface for `WiringLayer`. Instead of giving
direct access to the `ServiceContext`, it now has to define `Input` and
`Output` types, which will be fetched from/inserted to the context
correspondingly.
`WiringLayer::Input` has to implement `FromContext` trait. This trait
has implementations for `()`, `T: Resource`, `Option<T: Resource>` , and
can be derived.
`WiringLayer::Output` has to implement `IntoContext`, which has the same
basic implementations, and also has a derive macro.

With this approach, all the inputs and outputs can be easily seen for
the layer, so that we don't need to worry about docs getting outdated,
and also it saves quite some boilerplate when using the framework.

Besides, small changes were made where necessary, e.g.:
- Consensus layer was split into two, for main and external node.
- TxSink layer was split into two, for DB and proxy sinks.
- A lot of "wrapper" tasks were removed.
- Some convenience impls (e.g. impl `From <WrappedType> to
<ResourceType>`).
- Shutdown hook was made into a separate entity that implements
`IntoContext`.

## Why ❔

Finalization of the framework design.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored Jul 8, 2024
1 parent 52a4680 commit f2f4056
Show file tree
Hide file tree
Showing 82 changed files with 2,266 additions and 1,827 deletions.
12 changes: 4 additions & 8 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zksync_node_framework::{
implementations::layers::{
batch_status_updater::BatchStatusUpdaterLayer,
commitment_generator::CommitmentGeneratorLayer,
consensus::{ConsensusLayer, Mode},
consensus::ExternalNodeConsensusLayer,
consistency_checker::ConsistencyCheckerLayer,
healtcheck_server::HealthCheckLayer,
l1_batch_commitment_mode_validation::L1BatchCommitmentModeValidationLayer,
Expand All @@ -41,7 +41,7 @@ use zksync_node_framework::{
server::{Web3ServerLayer, Web3ServerOptionalConfig},
tree_api_client::TreeApiClientLayer,
tx_sender::{PostgresStorageCachesConfig, TxSenderLayer},
tx_sink::TxSinkLayer,
tx_sink::ProxySinkLayer,
},
},
service::{ZkStackService, ZkStackServiceBuilder},
Expand Down Expand Up @@ -209,11 +209,7 @@ impl ExternalNodeBuilder {
let config = self.config.consensus.clone();
let secrets =
config::read_consensus_secrets().context("config::read_consensus_secrets()")?;
let layer = ConsensusLayer {
mode: Mode::External,
config,
secrets,
};
let layer = ExternalNodeConsensusLayer { config, secrets };
self.node.add_layer(layer);
Ok(self)
}
Expand Down Expand Up @@ -359,7 +355,7 @@ impl ExternalNodeBuilder {
)
.with_whitelisted_tokens_for_aa_cache(true);

self.node.add_layer(TxSinkLayer::ProxySink);
self.node.add_layer(ProxySinkLayer);
self.node.add_layer(tx_sender_layer);
Ok(self)
}
Expand Down
20 changes: 13 additions & 7 deletions core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use zksync_node_framework::{
base_token_ratio_provider::BaseTokenRatioProviderLayer,
circuit_breaker_checker::CircuitBreakerCheckerLayer,
commitment_generator::CommitmentGeneratorLayer,
consensus::{ConsensusLayer, Mode as ConsensusMode},
consensus::MainNodeConsensusLayer,
contract_verification_api::ContractVerificationApiLayer,
da_dispatcher::DataAvailabilityDispatcherLayer,
eth_sender::{EthTxAggregatorLayer, EthTxManagerLayer},
Expand Down Expand Up @@ -56,7 +56,7 @@ use zksync_node_framework::{
server::{Web3ServerLayer, Web3ServerOptionalConfig},
tree_api_client::TreeApiClientLayer,
tx_sender::{PostgresStorageCachesConfig, TxSenderLayer},
tx_sink::TxSinkLayer,
tx_sink::MasterPoolSinkLayer,
},
},
service::{ZkStackService, ZkStackServiceBuilder},
Expand Down Expand Up @@ -280,7 +280,7 @@ impl MainNodeBuilder {
};

// On main node we always use master pool sink.
self.node.add_layer(TxSinkLayer::MasterPoolSink);
self.node.add_layer(MasterPoolSinkLayer);
self.node.add_layer(TxSenderLayer::new(
TxSenderConfig::new(
&sk_config,
Expand Down Expand Up @@ -445,10 +445,16 @@ impl MainNodeBuilder {
}

fn add_consensus_layer(mut self) -> anyhow::Result<Self> {
self.node.add_layer(ConsensusLayer {
mode: ConsensusMode::Main,
config: self.consensus_config.clone(),
secrets: self.secrets.consensus.clone(),
self.node.add_layer(MainNodeConsensusLayer {
config: self
.consensus_config
.clone()
.context("Consensus config has to be provided")?,
secrets: self
.secrets
.consensus
.clone()
.context("Consensus secrets have to be provided")?,
});

Ok(self)
Expand Down
44 changes: 28 additions & 16 deletions core/bin/zksync_tee_prover/src/tee_prover.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::time::Duration;
use std::{fmt, time::Duration};

use secp256k1::{ecdsa::Signature, Message, PublicKey, Secp256k1, SecretKey};
use url::Url;
use zksync_basic_types::H256;
use zksync_node_framework::{
service::{ServiceContext, StopReceiver},
service::StopReceiver,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
IntoContext,
};
use zksync_prover_interface::inputs::TeeVerifierInput;
use zksync_tee_verifier::Verify;
Expand All @@ -15,16 +16,8 @@ use zksync_types::{tee_types::TeeType, L1BatchNumber};
use crate::{api_client::TeeApiClient, error::TeeProverError, metrics::METRICS};

/// Wiring layer for `TeeProver`
///
/// ## Requests resources
///
/// no resources requested
///
/// ## Adds tasks
///
/// - `TeeProver`
#[derive(Debug)]
pub struct TeeProverLayer {
pub(crate) struct TeeProverLayer {
api_url: Url,
signing_key: SecretKey,
attestation_quote_bytes: Vec<u8>,
Expand All @@ -47,27 +40,35 @@ impl TeeProverLayer {
}
}

#[derive(Debug, IntoContext)]
pub(crate) struct LayerOutput {
#[context(task)]
pub tee_prover: TeeProver,
}

#[async_trait::async_trait]
impl WiringLayer for TeeProverLayer {
type Input = ();
type Output = LayerOutput;

fn layer_name(&self) -> &'static str {
"tee_prover_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let tee_prover_task = TeeProver {
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
let tee_prover = TeeProver {
config: Default::default(),
signing_key: self.signing_key,
public_key: self.signing_key.public_key(&Secp256k1::new()),
attestation_quote_bytes: self.attestation_quote_bytes,
tee_type: self.tee_type,
api_client: TeeApiClient::new(self.api_url),
};
context.add_task(tee_prover_task);
Ok(())
Ok(LayerOutput { tee_prover })
}
}

struct TeeProver {
pub(crate) struct TeeProver {
config: TeeProverConfig,
signing_key: SecretKey,
public_key: PublicKey,
Expand All @@ -76,6 +77,17 @@ struct TeeProver {
api_client: TeeApiClient,
}

impl fmt::Debug for TeeProver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TeeProver")
.field("config", &self.config)
.field("public_key", &self.public_key)
.field("attestation_quote_bytes", &self.attestation_quote_bytes)
.field("tee_type", &self.tee_type)
.finish()
}
}

impl TeeProver {
fn verify(
&self,
Expand Down
18 changes: 13 additions & 5 deletions core/lib/default_da_clients/src/no_da/wiring_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,34 @@ use std::fmt::Debug;
use zksync_da_client::DataAvailabilityClient;
use zksync_node_framework::{
implementations::resources::da_client::DAClientResource,
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
IntoContext,
};

use crate::no_da::client::NoDAClient;

#[derive(Debug, Default)]
pub struct NoDAClientWiringLayer;

#[derive(Debug, IntoContext)]
pub struct Output {
pub client: DAClientResource,
}

#[async_trait::async_trait]
impl WiringLayer for NoDAClientWiringLayer {
type Input = ();
type Output = Output;

fn layer_name(&self) -> &'static str {
"no_da_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
let client: Box<dyn DataAvailabilityClient> = Box::new(NoDAClient);

context.insert_resource(DAClientResource(client))?;

Ok(())
Ok(Output {
client: DAClientResource(client),
})
}
}
18 changes: 13 additions & 5 deletions core/lib/default_da_clients/src/object_store/wiring_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use zksync_config::ObjectStoreConfig;
use zksync_da_client::DataAvailabilityClient;
use zksync_node_framework::{
implementations::resources::da_client::DAClientResource,
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
IntoContext,
};

use crate::object_store::client::ObjectStoreDAClient;
Expand All @@ -19,18 +19,26 @@ impl ObjectStorageClientWiringLayer {
}
}

#[derive(Debug, IntoContext)]
pub struct Output {
pub client: DAClientResource,
}

#[async_trait::async_trait]
impl WiringLayer for ObjectStorageClientWiringLayer {
type Input = ();
type Output = Output;

fn layer_name(&self) -> &'static str {
"object_store_da_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
let client: Box<dyn DataAvailabilityClient> =
Box::new(ObjectStoreDAClient::new(self.config).await?);

context.insert_resource(DAClientResource(client))?;

Ok(())
Ok(Output {
client: DAClientResource(client),
})
}
}
47 changes: 27 additions & 20 deletions core/node/base_token_adjuster/src/base_token_ratio_provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{fmt::Debug, num::NonZeroU64, time::Duration};
use std::{
fmt::Debug,
num::NonZeroU64,
sync::{Arc, RwLock},
time::Duration,
};

use anyhow::Context;
use async_trait::async_trait;
Expand All @@ -9,23 +14,23 @@ use zksync_types::fee_model::BaseTokenConversionRatio;
const CACHE_UPDATE_INTERVAL: Duration = Duration::from_millis(500);

#[async_trait]
pub trait BaseTokenRatioProvider: Debug + Send + Sync {
pub trait BaseTokenRatioProvider: Debug + Send + Sync + 'static {
fn get_conversion_ratio(&self) -> BaseTokenConversionRatio;
}

#[derive(Debug, Clone)]
pub struct DBBaseTokenRatioProvider {
pub pool: ConnectionPool<Core>,
pub latest_ratio: BaseTokenConversionRatio,
pub latest_ratio: Arc<RwLock<BaseTokenConversionRatio>>,
}

impl DBBaseTokenRatioProvider {
pub async fn new(pool: ConnectionPool<Core>) -> anyhow::Result<Self> {
let mut fetcher = Self {
let fetcher = Self {
pool,
latest_ratio: BaseTokenConversionRatio::default(),
latest_ratio: Arc::default(),
};
fetcher.latest_ratio = fetcher.get_latest_price().await?;
fetcher.update_latest_price().await?;

// TODO(PE-129): Implement latest ratio usability logic.

Expand All @@ -36,7 +41,11 @@ impl DBBaseTokenRatioProvider {
Ok(fetcher)
}

pub async fn run(&mut self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
fn get_latest_ratio(&self) -> BaseTokenConversionRatio {
*self.latest_ratio.read().unwrap()
}

pub async fn run(&self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let mut timer = tokio::time::interval(CACHE_UPDATE_INTERVAL);

while !*stop_receiver.borrow_and_update() {
Expand All @@ -45,20 +54,15 @@ impl DBBaseTokenRatioProvider {
_ = stop_receiver.changed() => break,
}

let latest_storage_ratio = self.get_latest_price().await?;

// TODO(PE-129): Implement latest ratio usability logic.
self.latest_ratio = BaseTokenConversionRatio {
numerator: latest_storage_ratio.numerator,
denominator: latest_storage_ratio.denominator,
};
self.update_latest_price().await?;
}

tracing::info!("Stop signal received, base_token_ratio_provider is shutting down");
Ok(())
}

async fn get_latest_price(&self) -> anyhow::Result<BaseTokenConversionRatio> {
async fn update_latest_price(&self) -> anyhow::Result<()> {
let latest_storage_ratio = self
.pool
.connection_tagged("db_base_token_ratio_provider")
Expand All @@ -68,28 +72,31 @@ impl DBBaseTokenRatioProvider {
.get_latest_ratio()
.await;

match latest_storage_ratio {
Ok(Some(latest_storage_price)) => Ok(BaseTokenConversionRatio {
let ratio = match latest_storage_ratio {
Ok(Some(latest_storage_price)) => BaseTokenConversionRatio {
numerator: latest_storage_price.numerator,
denominator: latest_storage_price.denominator,
}),
},
Ok(None) => {
// TODO(PE-136): Insert initial ratio from genesis.
// Though the DB should be populated very soon after the server starts, it is possible
// to have no ratios in the DB right after genesis. Having initial ratios in the DB
// from the genesis stage will eliminate this possibility.
tracing::error!("No latest price found in the database. Using default ratio.");
Ok(BaseTokenConversionRatio::default())
BaseTokenConversionRatio::default()
}
Err(err) => anyhow::bail!("Failed to get latest base token ratio: {:?}", err),
}
};

*self.latest_ratio.write().unwrap() = ratio;
Ok(())
}
}

#[async_trait]
impl BaseTokenRatioProvider for DBBaseTokenRatioProvider {
fn get_conversion_ratio(&self) -> BaseTokenConversionRatio {
self.latest_ratio
self.get_latest_ratio()
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use zksync_node_framework::{
server::{Web3ServerLayer, Web3ServerOptionalConfig},
tree_api_client::TreeApiClientLayer,
tx_sender::{PostgresStorageCachesConfig, TxSenderLayer},
tx_sink::TxSinkLayer,
tx_sink::MasterPoolSinkLayer,
},
},
service::{ZkStackService, ZkStackServiceBuilder, ZkStackServiceError},
Expand Down Expand Up @@ -215,7 +215,7 @@ impl MainNodeBuilder {
let wallets = Wallets::from_env()?;

// On main node we always use master pool sink.
self.node.add_layer(TxSinkLayer::MasterPoolSink);
self.node.add_layer(MasterPoolSinkLayer);
self.node.add_layer(TxSenderLayer::new(
TxSenderConfig::new(
&state_keeper_config,
Expand Down
Loading

0 comments on commit f2f4056

Please sign in to comment.