Skip to content

Commit

Permalink
chore: da_dispatcher refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dimazhornyk committed Dec 20, 2024
1 parent ea18999 commit a5cf30f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 30 deletions.
15 changes: 12 additions & 3 deletions core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,16 @@ impl MainNodeBuilder {
}

fn add_da_client_layer(mut self) -> anyhow::Result<Self> {
let eth_sender_config = try_load_config!(self.configs.eth);
if let Some(sender_config) = eth_sender_config.sender {
if sender_config.pubdata_sending_mode != PubdataSendingMode::Custom {
tracing::warn!("DA dispatcher is enabled, but the pubdata sending mode is not `Custom`. DA client will not be started.");
return Ok(self);
}
}

let Some(da_client_config) = self.configs.da_client_config.clone() else {
tracing::warn!("No config for DA client, using the NoDA client");
self.node.add_layer(NoDAClientWiringLayer);
return Ok(self);
bail!("No config for DA client");
};

let secrets = try_load_config!(self.secrets.data_availability);
Expand All @@ -555,6 +561,9 @@ impl MainNodeBuilder {
self.node
.add_layer(ObjectStorageClientWiringLayer::new(config));
}
(DAClientConfig::NoDA, _) => {
self.node.add_layer(NoDAClientWiringLayer);
}
_ => bail!("invalid pair of da_client and da_secrets"),
}

Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/da_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ pub const AVAIL_CLIENT_CONFIG_NAME: &str = "Avail";
pub const CELESTIA_CLIENT_CONFIG_NAME: &str = "Celestia";
pub const EIGEN_CLIENT_CONFIG_NAME: &str = "Eigen";
pub const OBJECT_STORE_CLIENT_CONFIG_NAME: &str = "ObjectStore";
pub const NO_DA_CLIENT_CONFIG_NAME: &str = "NoDA";

#[derive(Debug, Clone, PartialEq)]
pub enum DAClientConfig {
Avail(AvailConfig),
Celestia(CelestiaConfig),
Eigen(EigenConfig),
ObjectStore(ObjectStoreConfig),
NoDA,
}

impl From<AvailConfig> for DAClientConfig {
Expand Down
2 changes: 1 addition & 1 deletion core/lib/config/src/configs/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use serde::Deserialize;

pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 3;
pub const DEFAULT_MAX_RETRIES: u16 = 5;
pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;

Expand Down
4 changes: 3 additions & 1 deletion core/lib/protobuf_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use zksync_config::configs::{
avail::{AvailClientConfig, AvailConfig, AvailDefaultConfig, AvailGasRelayConfig},
celestia::CelestiaConfig,
eigen::EigenConfig,
DAClientConfig::{Avail, Celestia, Eigen, ObjectStore},
DAClientConfig::{Avail, Celestia, Eigen, NoDA, ObjectStore},
},
};
use zksync_protobuf::{required, ProtoRepr};
Expand Down Expand Up @@ -62,6 +62,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
proto::data_availability_client::Config::ObjectStore(conf) => {
ObjectStore(object_store_proto::ObjectStore::read(conf)?)
}
proto::data_availability_client::Config::NoDa(_) => NoDA,
};

Ok(client)
Expand Down Expand Up @@ -102,6 +103,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
ObjectStore(config) => proto::data_availability_client::Config::ObjectStore(
object_store_proto::ObjectStore::build(config),
),
NoDA => proto::data_availability_client::Config::NoDa(proto::NoDaConfig {}),
};

Self {
Expand Down
3 changes: 3 additions & 0 deletions core/lib/protobuf_config/src/proto/config/da_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ message EigenConfig {
optional uint64 inclusion_polling_interval_ms = 2;
}

message NoDAConfig {}

message DataAvailabilityClient {
// oneof in protobuf allows for None
oneof config {
AvailConfig avail = 1;
object_store.ObjectStore object_store = 2;
CelestiaConfig celestia = 3;
EigenConfig eigen = 4;
NoDAConfig no_da = 5;
}
}
68 changes: 43 additions & 25 deletions core/node/da_dispatcher/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};

use anyhow::Context;
use chrono::Utc;
Expand All @@ -14,7 +14,7 @@ use zksync_types::L1BatchNumber;

use crate::metrics::METRICS;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataAvailabilityDispatcher {
client: Box<dyn DataAvailabilityClient>,
pool: ConnectionPool<Core>,
Expand All @@ -35,37 +35,55 @@ impl DataAvailabilityDispatcher {
}

pub async fn run(self, mut stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
loop {
if *stop_receiver.borrow() {
break;
}
let self_arc = Arc::new(self.clone());

let subtasks = futures::future::join(
async {
if let Err(err) = self.dispatch().await {
tracing::error!("dispatch error {err:?}");
}
},
async {
if let Err(err) = self.poll_for_inclusion().await {
tracing::error!("poll_for_inclusion error {err:?}");
}
},
);
let mut stop_receiver_dispatch = stop_receiver.clone();

tokio::select! {
_ = subtasks => {},
_ = stop_receiver.changed() => {
let dispatch_task = tokio::spawn(async move {
loop {
if *stop_receiver_dispatch.borrow() {
break;
}
}

if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed())
if let Err(err) = self_arc.dispatch().await {
tracing::error!("dispatch error {err:?}");
}

if tokio::time::timeout(
self_arc.config.polling_interval(),
stop_receiver_dispatch.changed(),
)
.await
.is_ok()
{
break;
{
break;
}
}
});

let inclusion_task = tokio::spawn(async move {
loop {
if *stop_receiver.borrow() {
break;
}

if let Err(err) = self.poll_for_inclusion().await {
tracing::error!("poll_for_inclusion error {err:?}");
}

if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed())
.await
.is_ok()
{
break;
}
}
});

tokio::select! {
_ = dispatch_task => {},
_ = inclusion_task => {},
_ = stop_receiver.changed() => {},
}

tracing::info!("Stop signal received, da_dispatcher is shutting down");
Expand Down
2 changes: 2 additions & 0 deletions etc/env/file_based/overrides/validium.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ eth:
state_keeper:
pubdata_overhead_part: 0
compute_overhead_part: 1
da_client:
no_da:

0 comments on commit a5cf30f

Please sign in to comment.