From 8cbfad287c02881d17531e21d6648236ae6224d5 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 18 Dec 2024 14:15:57 -0300 Subject: [PATCH 1/3] use trait object --- core/node/da_clients/src/eigen/client.rs | 16 ++++++++------ .../node/da_clients/src/eigen/client_tests.rs | 10 ++++++--- core/node/da_clients/src/eigen/sdk.rs | 22 ++++++++++++++----- core/node/da_clients/src/eigen/verifier.rs | 5 ++++- .../layers/da_clients/eigen.rs | 4 ++++ 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index ef4baa1c2ad..5baee9475e9 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -13,21 +13,23 @@ use super::sdk::RawEigenClient; use crate::utils::to_retriable_da_error; #[async_trait] -pub trait GetBlobData: Clone + std::fmt::Debug + Send + Sync { +pub trait GetBlobData: std::fmt::Debug + Send + Sync { async fn get_blob_data(&self, input: &str) -> anyhow::Result>>; + + fn clone_boxed(&self) -> Box; } /// EigenClient is a client for the Eigen DA service. #[derive(Debug, Clone)] -pub struct EigenClient { - pub(crate) client: Arc>, +pub struct EigenClient { + pub(crate) client: Arc, } -impl EigenClient { +impl EigenClient { pub async fn new( config: EigenConfig, secrets: EigenSecrets, - get_blob_data: Box, + get_blob_data: Box, ) -> anyhow::Result { let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str()) .map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?; @@ -40,7 +42,7 @@ impl EigenClient { } #[async_trait] -impl DataAvailabilityClient for EigenClient { +impl DataAvailabilityClient for EigenClient { async fn dispatch_blob( &self, _: u32, // batch number @@ -75,6 +77,6 @@ impl DataAvailabilityClient for EigenClient { } fn blob_size_limit(&self) -> Option { - Some(RawEigenClient::::blob_size_limit()) + Some(RawEigenClient::blob_size_limit()) } } diff --git a/core/node/da_clients/src/eigen/client_tests.rs b/core/node/da_clients/src/eigen/client_tests.rs index 99cd81cf279..4fbbd5c3676 100644 --- a/core/node/da_clients/src/eigen/client_tests.rs +++ b/core/node/da_clients/src/eigen/client_tests.rs @@ -17,7 +17,7 @@ mod tests { use crate::eigen::{blob_info::BlobInfo, EigenClient, GetBlobData}; - impl EigenClient { + impl EigenClient { pub async fn get_blob_data( &self, blob_id: BlobInfo, @@ -32,8 +32,8 @@ mod tests { const STATUS_QUERY_TIMEOUT: u64 = 1800000; // 30 minutes const STATUS_QUERY_INTERVAL: u64 = 5; // 5 ms - async fn get_blob_info( - client: &EigenClient, + async fn get_blob_info( + client: &EigenClient, result: &DispatchResponse, ) -> anyhow::Result { let blob_info = (|| async { @@ -62,6 +62,10 @@ mod tests { async fn get_blob_data(&self, _input: &'_ str) -> anyhow::Result>> { Ok(None) } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } } #[ignore = "depends on external RPC"] diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index c1e52e42ffc..63a679c4f7d 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -30,24 +30,36 @@ use crate::eigen::{ verifier::VerificationError, }; -#[derive(Debug, Clone)] -pub(crate) struct RawEigenClient { +#[derive(Debug)] +pub(crate) struct RawEigenClient { client: Arc>>, private_key: SecretKey, pub config: EigenConfig, verifier: Verifier, - get_blob_data: Box, + get_blob_data: Box, +} + +impl Clone for RawEigenClient { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + private_key: self.private_key.clone(), + config: self.config.clone(), + verifier: self.verifier.clone(), + get_blob_data: self.get_blob_data.clone_boxed(), + } + } } pub(crate) const DATA_CHUNK_SIZE: usize = 32; -impl RawEigenClient { +impl RawEigenClient { const BLOB_SIZE_LIMIT: usize = 1024 * 1024 * 2; // 2 MB pub async fn new( private_key: SecretKey, config: EigenConfig, - get_blob_data: Box, + get_blob_data: Box, ) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; diff --git a/core/node/da_clients/src/eigen/verifier.rs b/core/node/da_clients/src/eigen/verifier.rs index 7750b99ba54..3d56df88f11 100644 --- a/core/node/da_clients/src/eigen/verifier.rs +++ b/core/node/da_clients/src/eigen/verifier.rs @@ -295,7 +295,10 @@ impl Verifier { .map_err(|_| VerificationError::ServiceManagerError)? .as_u64(); - let depth = self.cfg.settlement_layer_confirmation_depth.saturating_sub(1); + let depth = self + .cfg + .settlement_layer_confirmation_depth + .saturating_sub(1); let block_to_return = latest - depth as u64; Ok(block_to_return) } diff --git a/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs b/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs index ee5755d85c1..3eb9675c03b 100644 --- a/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs +++ b/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs @@ -74,4 +74,8 @@ impl GetBlobData for GetBlobFromDB { .await?; Ok(batch.map(|b| b.pubdata)) } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } } From 83288f4107b5026c20e5a35b6a3f3f5a133396fc Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 18 Dec 2024 14:51:51 -0300 Subject: [PATCH 2/3] prevent blocking non async code --- core/node/da_clients/src/eigen/verifier.rs | 32 ++++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/core/node/da_clients/src/eigen/verifier.rs b/core/node/da_clients/src/eigen/verifier.rs index 3d56df88f11..332384e1443 100644 --- a/core/node/da_clients/src/eigen/verifier.rs +++ b/core/node/da_clients/src/eigen/verifier.rs @@ -1,8 +1,9 @@ -use std::{collections::HashMap, fs::File, io::copy, path::Path}; +use std::{collections::HashMap, path::Path}; use ark_bn254::{Fq, G1Affine}; use ethabi::{encode, ParamType, Token}; use rust_kzg_bn254::{blob::Blob, kzg::Kzg, polynomial::PolynomialFormat}; +use tokio::{fs::File, io::AsyncWriteExt}; use url::Url; use zksync_basic_types::web3::CallRequest; use zksync_eth_client::{clients::PKSigningClient, EnrichedClientResult}; @@ -113,12 +114,14 @@ impl Verifier { } let path = format!("./{}", point); let path = Path::new(&path); - let mut file = File::create(path).map_err(|_| VerificationError::LinkError)?; + let mut file = File::create(path).await.map_err(|_| VerificationError::LinkError)?; let content = response .bytes() .await .map_err(|_| VerificationError::LinkError)?; - copy(&mut content.as_ref(), &mut file).map_err(|_| VerificationError::LinkError)?; + file.write_all(&content) + .await + .map_err(|_| VerificationError::LinkError)?; Ok(()) } async fn save_points(url_g1: String, url_g2: String) -> Result { @@ -133,15 +136,20 @@ impl Verifier { ) -> Result { let srs_points_to_load = cfg.max_blob_size / Self::POINT_SIZE; let path = Self::save_points(cfg.clone().g1_url, cfg.clone().g2_url).await?; - let kzg = Kzg::setup( - &format!("{}/{}", path, Self::G1POINT), - "", - &format!("{}/{}", path, Self::G2POINT), - Self::SRSORDER, - srs_points_to_load, - "".to_string(), - ); - let kzg = kzg.map_err(|e| { + let kzg_handle = tokio::task::spawn_blocking(move || { + Kzg::setup( + &format!("{}/{}", path, Self::G1POINT), + "", + &format!("{}/{}", path, Self::G2POINT), + Self::SRSORDER, + srs_points_to_load, + "".to_string(), + ) + }); + let kzg = kzg_handle.await.map_err(|e| { + tracing::error!("Failed to setup KZG: {:?}", e); + VerificationError::KzgError + })?.map_err(|e| { tracing::error!("Failed to setup KZG: {:?}", e); VerificationError::KzgError })?; From 7a393d05a2666c3c2a923fdd3d2c9cefce31a937 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 18 Dec 2024 15:21:21 -0300 Subject: [PATCH 3/3] clippy suggestion --- core/node/da_clients/src/eigen/sdk.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 63a679c4f7d..18c98ef2ce7 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -43,7 +43,7 @@ impl Clone for RawEigenClient { fn clone(&self) -> Self { Self { client: self.client.clone(), - private_key: self.private_key.clone(), + private_key: self.private_key, config: self.config.clone(), verifier: self.verifier.clone(), get_blob_data: self.get_blob_data.clone_boxed(),