diff --git a/Cargo.lock b/Cargo.lock index 0554982e157a..9f94faea781c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9106,7 +9106,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.6", + "rustls-pemfile 2.2.0", + "rustls-pki-types", "tokio", + "tokio-rustls 0.25.0", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -10939,8 +10942,10 @@ dependencies = [ "subxt-metadata", "subxt-signer", "tokio", + "tokio-stream", "tonic 0.11.0", "tracing", + "zksync_basic_types", "zksync_config", "zksync_da_client", "zksync_env_config", diff --git a/Cargo.toml b/Cargo.toml index 5da7612171f9..e7cce4c4c421 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -215,9 +215,12 @@ subxt-signer = { version = "0.34", default-features = false } celestia-types = "0.6.1" bech32 = "0.11.0" ripemd = "0.1.3" -tonic = "0.11.0" +tonic = { version = "0.11.0", default-features = false } pbjson-types = "0.6.0" +# Eigen +tokio-stream = "0.1.16" + # Here and below: # We *always* pin the latest version of protocol to disallow accidental changes in the execution logic. # However, for the historical version of protocol crates, we have lax requirements. Otherwise, diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs index c9d99cc0783f..e7a3dca77f15 100644 --- a/core/bin/zksync_server/src/node_builder.rs +++ b/core/bin/zksync_server/src/node_builder.rs @@ -26,8 +26,8 @@ use zksync_node_framework::{ consensus::MainNodeConsensusLayer, contract_verification_api::ContractVerificationApiLayer, da_clients::{ - avail::AvailWiringLayer, celestia::CelestiaWiringLayer, no_da::NoDAClientWiringLayer, - object_store::ObjectStorageClientWiringLayer, + avail::AvailWiringLayer, celestia::CelestiaWiringLayer, eigen::EigenWiringLayer, + no_da::NoDAClientWiringLayer, object_store::ObjectStorageClientWiringLayer, }, da_dispatcher::DataAvailabilityDispatcherLayer, eth_sender::{EthTxAggregatorLayer, EthTxManagerLayer}, @@ -517,6 +517,10 @@ impl MainNodeBuilder { .add_layer(CelestiaWiringLayer::new(config, secret)); } + (DAClientConfig::Eigen(config), DataAvailabilitySecrets::Eigen(secret)) => { + self.node.add_layer(EigenWiringLayer::new(config, secret)); + } + (DAClientConfig::ObjectStore(config), _) => { self.node .add_layer(ObjectStorageClientWiringLayer::new(config)); diff --git a/core/lib/config/src/configs/da_client/eigen.rs b/core/lib/config/src/configs/da_client/eigen.rs new file mode 100644 index 000000000000..f2c05a0f61ef --- /dev/null +++ b/core/lib/config/src/configs/da_client/eigen.rs @@ -0,0 +1,13 @@ +use serde::Deserialize; +use zksync_basic_types::secrets::PrivateKey; + +#[derive(Clone, Debug, Default, PartialEq, Deserialize)] +pub struct EigenConfig { + pub rpc_node_url: String, + pub inclusion_polling_interval_ms: u64, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct EigenSecrets { + pub private_key: PrivateKey, +} diff --git a/core/lib/config/src/configs/da_client/mod.rs b/core/lib/config/src/configs/da_client/mod.rs index 4806d7ed0996..322c4a20aac8 100644 --- a/core/lib/config/src/configs/da_client/mod.rs +++ b/core/lib/config/src/configs/da_client/mod.rs @@ -1,15 +1,18 @@ -use crate::{AvailConfig, CelestiaConfig, ObjectStoreConfig}; +use crate::{AvailConfig, CelestiaConfig, EigenConfig, ObjectStoreConfig}; pub mod avail; pub mod celestia; +pub mod eigen; pub const AVAIL_CLIENT_CONFIG_NAME: &str = "Avail"; pub const CELESTIA_CLIENT_CONFIG_NAME: &str = "Celestia"; +pub const EIGEN_CLIENT_CONFIG_NAME: &str = "Eigen"; pub const OBJECT_STORE_CLIENT_CONFIG_NAME: &str = 
"ObjectStore"; #[derive(Debug, Clone, PartialEq)] pub enum DAClientConfig { Avail(AvailConfig), Celestia(CelestiaConfig), + Eigen(EigenConfig), ObjectStore(ObjectStoreConfig), } diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 0c756ad95647..2b848030d719 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -5,7 +5,7 @@ pub use self::{ commitment_generator::CommitmentGeneratorConfig, contract_verifier::ContractVerifierConfig, contracts::{ContractsConfig, EcosystemContracts}, - da_client::{avail::AvailConfig, celestia::CelestiaConfig, DAClientConfig}, + da_client::{avail::AvailConfig, celestia::CelestiaConfig, eigen::EigenConfig, DAClientConfig}, da_dispatcher::DADispatcherConfig, database::{DBConfig, PostgresConfig}, eth_sender::{EthConfig, GasAdjusterConfig}, diff --git a/core/lib/config/src/configs/secrets.rs b/core/lib/config/src/configs/secrets.rs index 4d95ae4d1ede..75ff067c2473 100644 --- a/core/lib/config/src/configs/secrets.rs +++ b/core/lib/config/src/configs/secrets.rs @@ -3,7 +3,7 @@ use zksync_basic_types::url::SensitiveUrl; use crate::configs::{ consensus::ConsensusSecrets, - da_client::{avail::AvailSecrets, celestia::CelestiaSecrets}, + da_client::{avail::AvailSecrets, celestia::CelestiaSecrets, eigen::EigenSecrets}, }; #[derive(Debug, Clone, PartialEq)] @@ -22,6 +22,7 @@ pub struct L1Secrets { pub enum DataAvailabilitySecrets { Avail(AvailSecrets), Celestia(CelestiaSecrets), + Eigen(EigenSecrets), } #[derive(Debug, Clone, PartialEq)] diff --git a/core/lib/config/src/lib.rs b/core/lib/config/src/lib.rs index c02f3e531b34..f77a8ceb39ad 100644 --- a/core/lib/config/src/lib.rs +++ b/core/lib/config/src/lib.rs @@ -2,9 +2,9 @@ pub use crate::configs::{ ApiConfig, AvailConfig, BaseTokenAdjusterConfig, CelestiaConfig, ContractVerifierConfig, - ContractsConfig, DAClientConfig, DADispatcherConfig, DBConfig, EthConfig, EthWatchConfig, - ExternalProofIntegrationApiConfig, GasAdjusterConfig, GenesisConfig, ObjectStoreConfig, - PostgresConfig, SnapshotsCreatorConfig, + ContractsConfig, DAClientConfig, DADispatcherConfig, DBConfig, EigenConfig, EthConfig, + EthWatchConfig, ExternalProofIntegrationApiConfig, GasAdjusterConfig, GenesisConfig, + ObjectStoreConfig, PostgresConfig, SnapshotsCreatorConfig, }; pub mod configs; diff --git a/core/lib/env_config/src/da_client.rs b/core/lib/env_config/src/da_client.rs index 70819a706427..8ceeb215faf4 100644 --- a/core/lib/env_config/src/da_client.rs +++ b/core/lib/env_config/src/da_client.rs @@ -6,8 +6,9 @@ use zksync_config::configs::{ AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME, AVAIL_GAS_RELAY_CLIENT_NAME, }, celestia::CelestiaSecrets, + eigen::EigenSecrets, DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME, - OBJECT_STORE_CLIENT_CONFIG_NAME, + EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME, }, secrets::DataAvailabilitySecrets, AvailConfig, @@ -33,6 +34,7 @@ impl FromEnv for DAClientConfig { }, }), CELESTIA_CLIENT_CONFIG_NAME => Self::Celestia(envy_load("da_celestia_config", "DA_")?), + EIGEN_CLIENT_CONFIG_NAME => Self::Eigen(envy_load("da_eigen_config", "DA_")?), OBJECT_STORE_CLIENT_CONFIG_NAME => { Self::ObjectStore(envy_load("da_object_store", "DA_")?) } @@ -66,11 +68,18 @@ impl FromEnv for DataAvailabilitySecrets { } CELESTIA_CLIENT_CONFIG_NAME => { let private_key = env::var("DA_SECRETS_PRIVATE_KEY") - .map_err(|e| anyhow::format_err!("private key not found: {}", e))? 
+ .map_err(|e| anyhow::format_err!("Celestia private key not found: {}", e))? .parse() - .map_err(|e| anyhow::format_err!("failed to parse the auth token: {}", e))?; + .map_err(|e| anyhow::format_err!("failed to parse the private key: {}", e))?; Self::Celestia(CelestiaSecrets { private_key }) } + EIGEN_CLIENT_CONFIG_NAME => { + let private_key = env::var("DA_SECRETS_PRIVATE_KEY") + .map_err(|e| anyhow::format_err!("Eigen private key not found: {}", e))? + .parse() + .map_err(|e| anyhow::format_err!("failed to parse the private key: {}", e))?; + Self::Eigen(EigenSecrets { private_key }) + } _ => anyhow::bail!("Unknown DA client name: {}", client_tag), }; @@ -89,7 +98,7 @@ mod tests { }, object_store::ObjectStoreMode::GCS, }, - AvailConfig, CelestiaConfig, ObjectStoreConfig, + AvailConfig, CelestiaConfig, EigenConfig, ObjectStoreConfig, }; use super::*; @@ -234,6 +243,26 @@ mod tests { ); } + #[test] + fn from_env_eigen_client() { + let mut lock = MUTEX.lock(); + let config = r#" + DA_CLIENT="Eigen" + DA_RPC_NODE_URL="localhost:12345" + DA_INCLUSION_POLLING_INTERVAL_MS="1000" + "#; + lock.set_env(config); + + let actual = DAClientConfig::from_env().unwrap(); + assert_eq!( + actual, + DAClientConfig::Eigen(EigenConfig { + rpc_node_url: "localhost:12345".to_string(), + inclusion_polling_interval_ms: 1000, + }) + ); + } + #[test] fn from_env_celestia_secrets() { let mut lock = MUTEX.lock(); diff --git a/core/lib/protobuf_config/src/da_client.rs b/core/lib/protobuf_config/src/da_client.rs index e175a671c3ce..341a6a9e4f43 100644 --- a/core/lib/protobuf_config/src/da_client.rs +++ b/core/lib/protobuf_config/src/da_client.rs @@ -4,7 +4,8 @@ use zksync_config::configs::{ da_client::{ avail::{AvailClientConfig, AvailConfig, AvailDefaultConfig, AvailGasRelayConfig}, celestia::CelestiaConfig, - DAClientConfig::{Avail, Celestia, ObjectStore}, + eigen::EigenConfig, + DAClientConfig::{Avail, Celestia, Eigen, ObjectStore}, }, }; use zksync_protobuf::{required, ProtoRepr}; @@ -51,6 +52,13 @@ impl ProtoRepr for proto::DataAvailabilityClient { chain_id: required(&conf.chain_id).context("chain_id")?.clone(), timeout_ms: *required(&conf.timeout_ms).context("timeout_ms")?, }), + proto::data_availability_client::Config::Eigen(conf) => Eigen(EigenConfig { + rpc_node_url: required(&conf.rpc_node_url) + .context("rpc_node_url")? + .clone(), + inclusion_polling_interval_ms: *required(&conf.inclusion_polling_interval_ms) + .context("inclusion_polling_interval_ms")?, + }), proto::data_availability_client::Config::ObjectStore(conf) => { ObjectStore(object_store_proto::ObjectStore::read(conf)?) 
} @@ -79,7 +87,6 @@ impl ProtoRepr for proto::DataAvailabilityClient { ), }, }), - Celestia(config) => { proto::data_availability_client::Config::Celestia(proto::CelestiaConfig { api_node_url: Some(config.api_node_url.clone()), @@ -88,6 +95,10 @@ impl ProtoRepr for proto::DataAvailabilityClient { timeout_ms: Some(config.timeout_ms), }) } + Eigen(config) => proto::data_availability_client::Config::Eigen(proto::EigenConfig { + rpc_node_url: Some(config.rpc_node_url.clone()), + inclusion_polling_interval_ms: Some(config.inclusion_polling_interval_ms), + }), ObjectStore(config) => proto::data_availability_client::Config::ObjectStore( object_store_proto::ObjectStore::build(config), ), diff --git a/core/lib/protobuf_config/src/proto/config/da_client.proto b/core/lib/protobuf_config/src/proto/config/da_client.proto index 206b1d05c04e..0a302120d775 100644 --- a/core/lib/protobuf_config/src/proto/config/da_client.proto +++ b/core/lib/protobuf_config/src/proto/config/da_client.proto @@ -36,11 +36,17 @@ message CelestiaConfig { optional uint64 timeout_ms = 4; } +message EigenConfig { + optional string rpc_node_url = 1; + optional uint64 inclusion_polling_interval_ms = 2; +} + message DataAvailabilityClient { // oneof in protobuf allows for None oneof config { AvailConfig avail = 1; object_store.ObjectStore object_store = 2; CelestiaConfig celestia = 3; + EigenConfig eigen = 4; } } diff --git a/core/lib/protobuf_config/src/proto/config/secrets.proto b/core/lib/protobuf_config/src/proto/config/secrets.proto index 145a8cf0c45f..7c9d0f928237 100644 --- a/core/lib/protobuf_config/src/proto/config/secrets.proto +++ b/core/lib/protobuf_config/src/proto/config/secrets.proto @@ -28,10 +28,15 @@ message CelestiaSecret { optional string private_key = 1; } +message EigenSecret { + optional string private_key = 1; +} + message DataAvailabilitySecrets { oneof da_secrets { AvailSecret avail = 1; CelestiaSecret celestia = 2; + EigenSecret eigen = 3; } } diff --git a/core/lib/protobuf_config/src/secrets.rs b/core/lib/protobuf_config/src/secrets.rs index d9cdf3384899..f5bc10a3e340 100644 --- a/core/lib/protobuf_config/src/secrets.rs +++ b/core/lib/protobuf_config/src/secrets.rs @@ -8,7 +8,7 @@ use zksync_basic_types::{ }; use zksync_config::configs::{ consensus::{AttesterSecretKey, ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}, - da_client::{avail::AvailSecrets, celestia::CelestiaSecrets}, + da_client::{avail::AvailSecrets, celestia::CelestiaSecrets, eigen::EigenSecrets}, secrets::{DataAvailabilitySecrets, Secrets}, DatabaseSecrets, L1Secrets, }; @@ -133,6 +133,11 @@ impl ProtoRepr for proto::DataAvailabilitySecrets { required(&celestia.private_key).context("private_key")?, )?, }), + DaSecrets::Eigen(eigen) => DataAvailabilitySecrets::Eigen(EigenSecrets { + private_key: PrivateKey::from_str( + required(&eigen.private_key).context("private_key")?, + )?, + }), }; Ok(client) @@ -179,6 +184,9 @@ impl ProtoRepr for proto::DataAvailabilitySecrets { private_key: Some(config.private_key.0.expose_secret().to_string()), })) } + DataAvailabilitySecrets::Eigen(config) => Some(DaSecrets::Eigen(proto::EigenSecret { + private_key: Some(config.private_key.0.expose_secret().to_string()), + })), }; Self { diff --git a/core/node/da_clients/Cargo.toml b/core/node/da_clients/Cargo.toml index da5cd4effa68..bde71ce3ec5a 100644 --- a/core/node/da_clients/Cargo.toml +++ b/core/node/da_clients/Cargo.toml @@ -23,6 +23,7 @@ zksync_types.workspace = true zksync_object_store.workspace = true zksync_da_client.workspace = true 
zksync_env_config.workspace = true +zksync_basic_types.workspace = true futures.workspace = true # Avail dependencies @@ -49,5 +50,8 @@ sha2.workspace = true prost.workspace = true bech32.workspace = true ripemd.workspace = true -tonic.workspace = true +tonic = { workspace = true, features = ["tls", "default"] } pbjson-types.workspace = true + +# Eigen dependencies +tokio-stream.workspace = true diff --git a/core/node/da_clients/README.md b/core/node/da_clients/README.md index df06cef24197..1b22e5198a68 100644 --- a/core/node/da_clients/README.md +++ b/core/node/da_clients/README.md @@ -8,3 +8,5 @@ Currently, the following DataAvailability clients are implemented: utilizing the DA framework. - `Object Store client` that stores the pubdata in the Object Store(GCS). - `Avail` that sends the pubdata to the Avail DA layer. +- `Celestia` that sends the pubdata to the Celestia DA layer. +- `Eigen` that sends the pubdata to the Eigen DA layer. diff --git a/core/node/da_clients/src/eigen/README.md b/core/node/da_clients/src/eigen/README.md new file mode 100644 index 000000000000..634b4eb58780 --- /dev/null +++ b/core/node/da_clients/src/eigen/README.md @@ -0,0 +1,35 @@ +# EigenDA client + +--- + +This is an implementation of the EigenDA client capable of sending blobs to the DA layer. It uses authenticated +requests, though the auth headers are effectively mocked in the current API implementation. + +The generated files are produced by compiling the `.proto` files from the EigenDA repo using the following function: + +```rust +pub fn compile_protos() { + let fds = protox::compile( + [ + "proto/common.proto", + "proto/disperser.proto", + ], + ["."], + ) + .expect("protox failed to build"); + + tonic_build::configure() + .build_client(true) + .build_server(false) + .skip_protoc_run() + .out_dir("generated") + .compile_fds(fds) + .unwrap(); +} +``` + +The proto files are not included here to avoid confusion if they fall out of date; the EigenDA +[repo](https://github.com/Layr-Labs/eigenda/tree/master/api/proto) is the source of truth for the proto files. + +The generated folder here is a temporary solution until EigenDA provides a library with either proto generation or, +preferably, a full Rust client implementation.
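For reference, a minimal sketch of exercising the new client directly, assuming the `EigenConfig`/`EigenSecrets` types and `EigenClient::new` added in this diff; the disperser URL and private key below are placeholders, and in the server this construction happens inside the wiring layer rather than in user code:

```rust
use zksync_config::{configs::da_client::eigen::EigenSecrets, EigenConfig};
use zksync_da_client::DataAvailabilityClient;
use zksync_da_clients::eigen::EigenClient;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder disperser endpoint; point this at a real EigenDA disperser.
    let config = EigenConfig {
        rpc_node_url: "https://disperser.example.com:443".to_string(),
        inclusion_polling_interval_ms: 1_000,
    };
    // Placeholder key (64 hex chars); never hard-code real secrets.
    let private_key = "11"
        .repeat(32)
        .parse()
        .map_err(|e| anyhow::format_err!("failed to parse the private key: {}", e))?;
    let secrets = EigenSecrets { private_key };

    let client = EigenClient::new(config, secrets).await?;
    let response = client
        .dispatch_blob(1, b"example pubdata".to_vec())
        .await
        .map_err(|e| anyhow::anyhow!("dispatch failed: {:?}", e))?;
    println!("dispatch response: {:?}", response);
    Ok(())
}
```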
diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs new file mode 100644 index 000000000000..d977620526aa --- /dev/null +++ b/core/node/da_clients/src/eigen/client.rs @@ -0,0 +1,65 @@ +use std::{str::FromStr, sync::Arc}; + +use async_trait::async_trait; +use secp256k1::SecretKey; +use subxt_signer::ExposeSecret; +use zksync_config::{configs::da_client::eigen::EigenSecrets, EigenConfig}; +use zksync_da_client::{ + types::{DAError, DispatchResponse, InclusionData}, + DataAvailabilityClient, +}; + +use super::sdk::RawEigenClient; +use crate::utils::to_non_retriable_da_error; + +#[derive(Debug, Clone)] +pub struct EigenClient { + client: Arc, +} + +impl EigenClient { + pub async fn new(config: EigenConfig, secrets: EigenSecrets) -> anyhow::Result { + let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str()) + .map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?; + + Ok(EigenClient { + client: Arc::new( + RawEigenClient::new( + config.rpc_node_url, + config.inclusion_polling_interval_ms, + private_key, + ) + .await?, + ), + }) + } +} + +#[async_trait] +impl DataAvailabilityClient for EigenClient { + async fn dispatch_blob( + &self, + _: u32, // batch number + data: Vec, + ) -> Result { + let blob_id = self + .client + .dispatch_blob(data) + .await + .map_err(to_non_retriable_da_error)?; + + Ok(DispatchResponse::from(blob_id)) + } + + async fn get_inclusion_data(&self, _: &str) -> Result, DAError> { + Ok(Some(InclusionData { data: vec![] })) + } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn blob_size_limit(&self) -> Option { + Some(1920 * 1024) // 2mb - 128kb as a buffer + } +} diff --git a/core/node/da_clients/src/eigen/generated/common.rs b/core/node/da_clients/src/eigen/generated/common.rs new file mode 100644 index 000000000000..0599b9af4127 --- /dev/null +++ b/core/node/da_clients/src/eigen/generated/common.rs @@ -0,0 +1,63 @@ +// This file is @generated by prost-build. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct G1Commitment { + /// The X coordinate of the KZG commitment. This is the raw byte representation of the field element. + #[prost(bytes = "vec", tag = "1")] + pub x: ::prost::alloc::vec::Vec, + /// The Y coordinate of the KZG commitment. This is the raw byte representation of the field element. + #[prost(bytes = "vec", tag = "2")] + pub y: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct G2Commitment { + /// The A0 element of the X coordinate of G2 point. + #[prost(bytes = "vec", tag = "1")] + pub x_a0: ::prost::alloc::vec::Vec, + /// The A1 element of the X coordinate of G2 point. + #[prost(bytes = "vec", tag = "2")] + pub x_a1: ::prost::alloc::vec::Vec, + /// The A0 element of the Y coordinate of G2 point. + #[prost(bytes = "vec", tag = "3")] + pub y_a0: ::prost::alloc::vec::Vec, + /// The A1 element of the Y coordinate of G2 point. + #[prost(bytes = "vec", tag = "4")] + pub y_a1: ::prost::alloc::vec::Vec, +} +/// BlobCommitment represents commitment of a specific blob, containing its +/// KZG commitment, degree proof, the actual degree, and data length in number of symbols. 
+#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobCommitment { + #[prost(message, optional, tag = "1")] + pub commitment: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub length_commitment: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub length_proof: ::core::option::Option, + #[prost(uint32, tag = "4")] + pub data_length: u32, +} +/// BlobCertificate is what gets attested by the network +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobCertificate { + #[prost(uint32, tag = "1")] + pub version: u32, + #[prost(bytes = "vec", tag = "2")] + pub blob_key: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub blob_commitment: ::core::option::Option, + #[prost(uint32, repeated, tag = "4")] + pub quorum_numbers: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "5")] + pub reference_block_number: u32, +} +/// A chunk of a blob. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChunkData { + #[prost(bytes = "vec", tag = "1")] + pub data: ::prost::alloc::vec::Vec, +} diff --git a/core/node/da_clients/src/eigen/generated/disperser.rs b/core/node/da_clients/src/eigen/generated/disperser.rs new file mode 100644 index 000000000000..7e94d910ecb7 --- /dev/null +++ b/core/node/da_clients/src/eigen/generated/disperser.rs @@ -0,0 +1,517 @@ +// This file is @generated by prost-build. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AuthenticatedRequest { + #[prost(oneof = "authenticated_request::Payload", tags = "1, 2")] + pub payload: ::core::option::Option, +} +/// Nested message and enum types in `AuthenticatedRequest`. +pub mod authenticated_request { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Payload { + #[prost(message, tag = "1")] + DisperseRequest(super::DisperseBlobRequest), + #[prost(message, tag = "2")] + AuthenticationData(super::AuthenticationData), + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AuthenticatedReply { + #[prost(oneof = "authenticated_reply::Payload", tags = "1, 2")] + pub payload: ::core::option::Option, +} +/// Nested message and enum types in `AuthenticatedReply`. +pub mod authenticated_reply { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Payload { + #[prost(message, tag = "1")] + BlobAuthHeader(super::BlobAuthHeader), + #[prost(message, tag = "2")] + DisperseReply(super::DisperseBlobReply), + } +} +/// BlobAuthHeader contains information about the blob for the client to verify and sign. +/// - Once payments are enabled, the BlobAuthHeader will contain the KZG commitment to the blob, which the client +/// will verify and sign. Having the client verify the KZG commitment instead of calculating it avoids +/// the need for the client to have the KZG structured reference string (SRS), which can be large. +/// The signed KZG commitment prevents the disperser from sending a different blob to the DA Nodes +/// than the one the client sent. +/// - In the meantime, the BlobAuthHeader contains a simple challenge parameter is used to prevent +/// replay attacks in the event that a signature is leaked. 
+#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobAuthHeader { + #[prost(uint32, tag = "1")] + pub challenge_parameter: u32, +} +/// AuthenticationData contains the signature of the BlobAuthHeader. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AuthenticationData { + #[prost(bytes = "vec", tag = "1")] + pub authentication_data: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DisperseBlobRequest { + /// The data to be dispersed. + /// The size of data must be <= 2MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format + /// where the lower address has more significant bits. The integer must stay in the valid range to be interpreted + /// as a field element on the bn254 curve. The valid range is + /// 0 <= x < 21888242871839275222246405745257275088548364400416034343698204186575808495617 + /// containing slightly less than 254 bits and more than 253 bits. If any one of the 32 bytes chunk is outside the range, + /// the whole request is deemed as invalid, and rejected. + #[prost(bytes = "vec", tag = "1")] + pub data: ::prost::alloc::vec::Vec, + /// The quorums to which the blob will be sent, in addition to the required quorums which are configured + /// on the EigenDA smart contract. If required quorums are included here, an error will be returned. + /// The disperser will ensure that the encoded blobs for each quorum are all processed + /// within the same batch. + #[prost(uint32, repeated, tag = "2")] + pub custom_quorum_numbers: ::prost::alloc::vec::Vec, + /// The account ID of the client. This should be a hex-encoded string of the ECSDA public key + /// corresponding to the key used by the client to sign the BlobAuthHeader. + #[prost(string, tag = "3")] + pub account_id: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DisperseBlobReply { + /// The status of the blob associated with the request_id. + #[prost(enumeration = "BlobStatus", tag = "1")] + pub result: i32, + /// The request ID generated by the disperser. + /// Once a request is accepted (although not processed), a unique request ID will be + /// generated. + /// Two different DisperseBlobRequests (determined by the hash of the DisperseBlobRequest) + /// will have different IDs, and the same DisperseBlobRequest sent repeatedly at different + /// times will also have different IDs. + /// The client should use this ID to query the processing status of the request (via + /// the GetBlobStatus API). + #[prost(bytes = "vec", tag = "2")] + pub request_id: ::prost::alloc::vec::Vec, +} +/// BlobStatusRequest is used to query the status of a blob. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobStatusRequest { + #[prost(bytes = "vec", tag = "1")] + pub request_id: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobStatusReply { + /// The status of the blob. + #[prost(enumeration = "BlobStatus", tag = "1")] + pub status: i32, + /// The blob info needed for clients to confirm the blob against the EigenDA contracts. + #[prost(message, optional, tag = "2")] + pub info: ::core::option::Option, +} +/// RetrieveBlobRequest contains parameters to retrieve the blob. 
+#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RetrieveBlobRequest { + #[prost(bytes = "vec", tag = "1")] + pub batch_header_hash: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "2")] + pub blob_index: u32, +} +/// RetrieveBlobReply contains the retrieved blob data +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RetrieveBlobReply { + #[prost(bytes = "vec", tag = "1")] + pub data: ::prost::alloc::vec::Vec, +} +/// BlobInfo contains information needed to confirm the blob against the EigenDA contracts +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobInfo { + #[prost(message, optional, tag = "1")] + pub blob_header: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub blob_verification_proof: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobHeader { + /// KZG commitment of the blob. + #[prost(message, optional, tag = "1")] + pub commitment: ::core::option::Option, + /// The length of the blob in symbols (each symbol is 32 bytes). + #[prost(uint32, tag = "2")] + pub data_length: u32, + /// The params of the quorums that this blob participates in. + #[prost(message, repeated, tag = "3")] + pub blob_quorum_params: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobQuorumParam { + /// The ID of the quorum. + #[prost(uint32, tag = "1")] + pub quorum_number: u32, + /// The max percentage of stake within the quorum that can be held by or delegated + /// to adversarial operators. Currently, this and the next parameter are standardized + /// across the quorum using values read from the EigenDA contracts. + #[prost(uint32, tag = "2")] + pub adversary_threshold_percentage: u32, + /// The min percentage of stake that must attest in order to consider + /// the dispersal is successful. + #[prost(uint32, tag = "3")] + pub confirmation_threshold_percentage: u32, + /// The length of each chunk. + #[prost(uint32, tag = "4")] + pub chunk_length: u32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlobVerificationProof { + /// batch_id is an incremental ID assigned to a batch by EigenDAServiceManager + #[prost(uint32, tag = "1")] + pub batch_id: u32, + /// The index of the blob in the batch (which is logically an ordered list of blobs). + #[prost(uint32, tag = "2")] + pub blob_index: u32, + #[prost(message, optional, tag = "3")] + pub batch_metadata: ::core::option::Option, + /// inclusion_proof is a merkle proof for a blob header's inclusion in a batch + #[prost(bytes = "vec", tag = "4")] + pub inclusion_proof: ::prost::alloc::vec::Vec, + /// indexes of quorums in BatchHeader.quorum_numbers that match the quorums in BlobHeader.blob_quorum_params + /// Ex. BlobHeader.blob_quorum_params = [ + /// { + /// quorum_number = 0, + /// ... + /// }, + /// { + /// quorum_number = 3, + /// ... + /// }, + /// { + /// quorum_number = 5, + /// ... 
+ /// }, + /// ] + /// BatchHeader.quorum_numbers = \[0, 5, 3\] => 0x000503 + /// Then, quorum_indexes = \[0, 2, 1\] => 0x000201 + #[prost(bytes = "vec", tag = "5")] + pub quorum_indexes: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BatchMetadata { + #[prost(message, optional, tag = "1")] + pub batch_header: ::core::option::Option, + /// The hash of all public keys of the operators that did not sign the batch. + #[prost(bytes = "vec", tag = "2")] + pub signatory_record_hash: ::prost::alloc::vec::Vec, + /// The fee payment paid by users for dispersing this batch. It's the bytes + /// representation of a big.Int value. + #[prost(bytes = "vec", tag = "3")] + pub fee: ::prost::alloc::vec::Vec, + /// The Ethereum block number at which the batch is confirmed onchain. + #[prost(uint32, tag = "4")] + pub confirmation_block_number: u32, + /// This is the hash of the ReducedBatchHeader defined onchain, see: + /// + /// The is the message that the operators will sign their signatures on. + #[prost(bytes = "vec", tag = "5")] + pub batch_header_hash: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BatchHeader { + /// The root of the merkle tree with the hashes of blob headers as leaves. + #[prost(bytes = "vec", tag = "1")] + pub batch_root: ::prost::alloc::vec::Vec, + /// All quorums associated with blobs in this batch. Sorted in ascending order. + /// Ex. \[0, 2, 1\] => 0x000102 + #[prost(bytes = "vec", tag = "2")] + pub quorum_numbers: ::prost::alloc::vec::Vec, + /// The percentage of stake that has signed for this batch. + /// The quorum_signed_percentages\[i\] is percentage for the quorum_numbers\[i\]. + #[prost(bytes = "vec", tag = "3")] + pub quorum_signed_percentages: ::prost::alloc::vec::Vec, + /// The Ethereum block number at which the batch was created. + /// The Disperser will encode and disperse the blobs based on the onchain info + /// (e.g. operator stakes) at this block number. + #[prost(uint32, tag = "4")] + pub reference_block_number: u32, +} +/// BlobStatus represents the status of a blob. +/// The status of a blob is updated as the blob is processed by the disperser. +/// The status of a blob can be queried by the client using the GetBlobStatus API. +/// Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state: +/// - PROCESSING +/// - DISPERSING +/// - CONFIRMED +/// Terminal states are states that will not be updated to a different state: +/// - FAILED +/// - FINALIZED +/// - INSUFFICIENT_SIGNATURES +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum BlobStatus { + Unknown = 0, + /// PROCESSING means that the blob is currently being processed by the disperser + Processing = 1, + /// CONFIRMED means that the blob has been dispersed to DA Nodes and the dispersed + /// batch containing the blob has been confirmed onchain + Confirmed = 2, + /// FAILED means that the blob has failed permanently (for reasons other than insufficient + /// signatures, which is a separate state) + Failed = 3, + /// FINALIZED means that the block containing the blob's confirmation transaction has been finalized on Ethereum + Finalized = 4, + /// INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met + /// for at least one quorum. 
+ InsufficientSignatures = 5, + /// DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain + Dispersing = 6, +} +impl BlobStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + BlobStatus::Unknown => "UNKNOWN", + BlobStatus::Processing => "PROCESSING", + BlobStatus::Confirmed => "CONFIRMED", + BlobStatus::Failed => "FAILED", + BlobStatus::Finalized => "FINALIZED", + BlobStatus::InsufficientSignatures => "INSUFFICIENT_SIGNATURES", + BlobStatus::Dispersing => "DISPERSING", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNKNOWN" => Some(Self::Unknown), + "PROCESSING" => Some(Self::Processing), + "CONFIRMED" => Some(Self::Confirmed), + "FAILED" => Some(Self::Failed), + "FINALIZED" => Some(Self::Finalized), + "INSUFFICIENT_SIGNATURES" => Some(Self::InsufficientSignatures), + "DISPERSING" => Some(Self::Dispersing), + _ => None, + } + } +} +/// Generated client implementations. +pub mod disperser_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// Disperser defines the public APIs for dispersing blobs. + #[derive(Debug, Clone)] + pub struct DisperserClient { + inner: tonic::client::Grpc, + } + impl DisperserClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl DisperserClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> DisperserClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + DisperserClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// This API accepts blob to disperse from clients. + /// This executes the dispersal async, i.e. it returns once the request + /// is accepted. The client could use GetBlobStatus() API to poll the the + /// processing status of the blob. + pub async fn disperse_blob( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/disperser.Disperser/DisperseBlob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("disperser.Disperser", "DisperseBlob")); + self.inner.unary(req, path, codec).await + } + /// DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the + /// client to authenticate itself via the AuthenticationData message. The protoco is as follows: + /// 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message + /// 2. The Disperser sends back a BlobAuthHeader message containing information for the client to + /// verify and sign. + /// 3. The client verifies the BlobAuthHeader and sends back the signed BlobAuthHeader in an + /// AuthenticationData message. + /// 4. The Disperser verifies the signature and returns a DisperseBlobReply message. + pub async fn disperse_blob_authenticated( + &mut self, + request: impl tonic::IntoStreamingRequest< + Message = super::AuthenticatedRequest, + >, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/disperser.Disperser/DisperseBlobAuthenticated", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("disperser.Disperser", "DisperseBlobAuthenticated"), + ); + self.inner.streaming(req, path, codec).await + } + /// This API is meant to be polled for the blob status. + pub async fn get_blob_status( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/disperser.Disperser/GetBlobStatus", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("disperser.Disperser", "GetBlobStatus")); + self.inner.unary(req, path, codec).await + } + /// This retrieves the requested blob from the Disperser's backend. + /// This is a more efficient way to retrieve blobs than directly retrieving + /// from the DA Nodes (see detail about this approach in + /// api/proto/retriever/retriever.proto). + /// The blob should have been initially dispersed via this Disperser service + /// for this API to work. 
+ pub async fn retrieve_blob( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/disperser.Disperser/RetrieveBlob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("disperser.Disperser", "RetrieveBlob")); + self.inner.unary(req, path, codec).await + } + } +} diff --git a/core/node/da_clients/src/eigen/mod.rs b/core/node/da_clients/src/eigen/mod.rs new file mode 100644 index 000000000000..699eae894246 --- /dev/null +++ b/core/node/da_clients/src/eigen/mod.rs @@ -0,0 +1,14 @@ +mod client; +mod sdk; + +pub use self::client::EigenClient; + +#[allow(clippy::all)] +pub(crate) mod disperser { + include!("generated/disperser.rs"); +} + +#[allow(clippy::all)] +pub(crate) mod common { + include!("generated/common.rs"); +} diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs new file mode 100644 index 000000000000..4013cafea298 --- /dev/null +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -0,0 +1,217 @@ +use std::{str::FromStr, time::Duration}; + +use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tonic::{ + transport::{Channel, ClientTlsConfig, Endpoint}, + Streaming, +}; + +use crate::eigen::{ + disperser, + disperser::{ + authenticated_request::Payload::{AuthenticationData, DisperseRequest}, + disperser_client::DisperserClient, + AuthenticatedReply, BlobAuthHeader, BlobVerificationProof, DisperseBlobReply, + }, +}; + +#[derive(Debug, Clone)] +pub struct RawEigenClient { + client: DisperserClient, + polling_interval: Duration, + private_key: SecretKey, + account_id: String, +} + +impl RawEigenClient { + pub(crate) const BUFFER_SIZE: usize = 1000; + + pub async fn new( + rpc_node_url: String, + inclusion_polling_interval_ms: u64, + private_key: SecretKey, + ) -> anyhow::Result { + let endpoint = + Endpoint::from_str(rpc_node_url.as_str())?.tls_config(ClientTlsConfig::new())?; + let client = DisperserClient::connect(endpoint) + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?; + let polling_interval = Duration::from_millis(inclusion_polling_interval_ms); + + let account_id = get_account_id(&private_key); + + Ok(RawEigenClient { + client, + polling_interval, + private_key, + account_id, + }) + } + + pub async fn dispatch_blob(&self, data: Vec) -> anyhow::Result { + let mut client_clone = self.client.clone(); + let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); + + let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); + + // 1. send DisperseBlobRequest + self.disperse_data(data, &tx).await?; + + // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` + let mut response_stream = response_stream.await?.into_inner(); + + // 2. receive BlobAuthHeader + let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?; + + // 3. sign and send BlobAuthHeader + self.submit_authentication_data(blob_auth_header.clone(), &tx) + .await?; + + // 4. 
receive DisperseBlobReply + let reply = response_stream + .next() + .await + .ok_or_else(|| anyhow::anyhow!("No response from server"))? + .unwrap() + .payload + .ok_or_else(|| anyhow::anyhow!("No payload in response"))?; + + let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else { + return Err(anyhow::anyhow!("Unexpected response from server")); + }; + + // 5. poll for blob status until it reaches the Confirmed state + let verification_proof = self + .await_for_inclusion(client_clone, disperse_reply) + .await?; + let blob_id = format!( + "{}:{}", + verification_proof.batch_id, verification_proof.blob_index + ); + tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id); + + Ok(blob_id) + } + + async fn disperse_data( + &self, + data: Vec, + tx: &mpsc::Sender, + ) -> anyhow::Result<()> { + let req = disperser::AuthenticatedRequest { + payload: Some(DisperseRequest(disperser::DisperseBlobRequest { + data, + custom_quorum_numbers: vec![], + account_id: self.account_id.clone(), + })), + }; + + tx.send(req) + .await + .map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e)) + } + + async fn submit_authentication_data( + &self, + blob_auth_header: BlobAuthHeader, + tx: &mpsc::Sender, + ) -> anyhow::Result<()> { + // TODO: replace challenge_parameter with actual auth header when it is available + let digest = zksync_basic_types::web3::keccak256( + &blob_auth_header.challenge_parameter.to_be_bytes(), + ); + let signature: RecoverableSignature = secp256k1::Secp256k1::signing_only() + .sign_ecdsa_recoverable( + &secp256k1::Message::from_slice(&digest[..])?, + &self.private_key, + ); + let (recovery_id, sig) = signature.serialize_compact(); + + let mut signature = Vec::with_capacity(65); + signature.extend_from_slice(&sig); + signature.push(recovery_id.to_i32() as u8); + + let req = disperser::AuthenticatedRequest { + payload: Some(AuthenticationData(disperser::AuthenticationData { + authentication_data: signature, + })), + }; + + tx.send(req) + .await + .map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e)) + } + + async fn receive_blob_auth_header( + &self, + response_stream: &mut Streaming, + ) -> anyhow::Result { + let reply = response_stream + .next() + .await + .ok_or_else(|| anyhow::anyhow!("No response from server"))?; + + let Ok(reply) = reply else { + return Err(anyhow::anyhow!("Err from server: {:?}", reply)); + }; + + let reply = reply + .payload + .ok_or_else(|| anyhow::anyhow!("No payload in response"))?; + + if let disperser::authenticated_reply::Payload::BlobAuthHeader(blob_auth_header) = reply { + Ok(blob_auth_header) + } else { + Err(anyhow::anyhow!("Unexpected response from server")) + } + } + + async fn await_for_inclusion( + &self, + mut client: DisperserClient, + disperse_blob_reply: DisperseBlobReply, + ) -> anyhow::Result { + let polling_request = disperser::BlobStatusRequest { + request_id: disperse_blob_reply.request_id, + }; + + loop { + tokio::time::sleep(self.polling_interval).await; + let resp = client + .get_blob_status(polling_request.clone()) + .await? + .into_inner(); + + match disperser::BlobStatus::try_from(resp.status)? 
{ + disperser::BlobStatus::Processing | disperser::BlobStatus::Dispersing => {} + disperser::BlobStatus::Failed => { + return Err(anyhow::anyhow!("Blob dispatch failed")) + } + disperser::BlobStatus::InsufficientSignatures => { + return Err(anyhow::anyhow!("Insufficient signatures")) + } + disperser::BlobStatus::Confirmed | disperser::BlobStatus::Finalized => { + let verification_proof = resp + .info + .ok_or_else(|| anyhow::anyhow!("No blob header in response"))? + .blob_verification_proof + .ok_or_else(|| anyhow::anyhow!("No blob verification proof in response"))?; + + return Ok(verification_proof); + } + + _ => return Err(anyhow::anyhow!("Received unknown blob status")), + } + } + } +} + +fn get_account_id(secret_key: &SecretKey) -> String { + let public_key = + secp256k1::PublicKey::from_secret_key(&secp256k1::Secp256k1::new(), secret_key); + let hex = hex::encode(public_key.serialize_uncompressed()); + + format!("0x{}", hex) +} diff --git a/core/node/da_clients/src/lib.rs b/core/node/da_clients/src/lib.rs index 8515c128ff3f..8a4c565a650a 100644 --- a/core/node/da_clients/src/lib.rs +++ b/core/node/da_clients/src/lib.rs @@ -1,5 +1,6 @@ pub mod avail; pub mod celestia; +pub mod eigen; pub mod no_da; pub mod object_store; mod utils; diff --git a/core/node/da_clients/src/no_da.rs b/core/node/da_clients/src/no_da.rs index 2710c9ce9d9b..db0557510ed2 100644 --- a/core/node/da_clients/src/no_da.rs +++ b/core/node/da_clients/src/no_da.rs @@ -15,7 +15,7 @@ impl DataAvailabilityClient for NoDAClient { } async fn get_inclusion_data(&self, _: &str) -> Result, DAError> { - return Ok(Some(InclusionData::default())); + Ok(Some(InclusionData::default())) } fn clone_boxed(&self) -> Box { diff --git a/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs b/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs new file mode 100644 index 000000000000..d5391ee433f9 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/da_clients/eigen.rs @@ -0,0 +1,46 @@ +use zksync_config::{configs::da_client::eigen::EigenSecrets, EigenConfig}; +use zksync_da_client::DataAvailabilityClient; +use zksync_da_clients::eigen::EigenClient; + +use crate::{ + implementations::resources::da_client::DAClientResource, + wiring_layer::{WiringError, WiringLayer}, + IntoContext, +}; + +#[derive(Debug)] +pub struct EigenWiringLayer { + config: EigenConfig, + secrets: EigenSecrets, +} + +impl EigenWiringLayer { + pub fn new(config: EigenConfig, secrets: EigenSecrets) -> Self { + Self { config, secrets } + } +} + +#[derive(Debug, IntoContext)] +#[context(crate = crate)] +pub struct Output { + pub client: DAClientResource, +} + +#[async_trait::async_trait] +impl WiringLayer for EigenWiringLayer { + type Input = (); + type Output = Output; + + fn layer_name(&self) -> &'static str { + "eigen_client_layer" + } + + async fn wire(self, _input: Self::Input) -> Result { + let client: Box = + Box::new(EigenClient::new(self.config, self.secrets).await?); + + Ok(Self::Output { + client: DAClientResource(client), + }) + } +} diff --git a/core/node/node_framework/src/implementations/layers/da_clients/mod.rs b/core/node/node_framework/src/implementations/layers/da_clients/mod.rs index 6bb6ce4fb877..c7865c74f3b1 100644 --- a/core/node/node_framework/src/implementations/layers/da_clients/mod.rs +++ b/core/node/node_framework/src/implementations/layers/da_clients/mod.rs @@ -1,4 +1,5 @@ pub mod avail; pub mod celestia; +pub mod eigen; pub mod no_da; pub mod object_store;
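As a usage note, selecting the new client via environment variables mirrors the `from_env_eigen_client` test added above; a hedged sketch of the selection path follows, where the URL and key values are dummies set only for illustration (a real deployment supplies them through the environment or secrets files):

```rust
use zksync_config::configs::{secrets::DataAvailabilitySecrets, DAClientConfig};
use zksync_env_config::FromEnv;

fn load_eigen_da_from_env() -> anyhow::Result<(DAClientConfig, DataAvailabilitySecrets)> {
    // Dummy values for illustration only; a real deployment sets these externally.
    std::env::set_var("DA_CLIENT", "Eigen");
    std::env::set_var("DA_RPC_NODE_URL", "http://127.0.0.1:4242");
    std::env::set_var("DA_INCLUSION_POLLING_INTERVAL_MS", "1000");
    // Placeholder, not a real key.
    std::env::set_var("DA_SECRETS_PRIVATE_KEY", "11".repeat(32));

    let config = DAClientConfig::from_env()?;
    let secrets = DataAvailabilitySecrets::from_env()?;
    anyhow::ensure!(matches!(config, DAClientConfig::Eigen(_)));
    anyhow::ensure!(matches!(secrets, DataAvailabilitySecrets::Eigen(_)));
    Ok((config, secrets))
}
```

The main node then pairs these values in the `node_builder.rs` match shown earlier, adding `EigenWiringLayer::new(config, secret)` only when both the client config and the secrets are the Eigen variants.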