Skip to content

Commit

Permalink
feat(eigen-client-m0-implementation): optimize concurrent dispatcher (#…
Browse files Browse the repository at this point in the history
…345)

* initial commit

* optimize dispatch_batches fn

* remove commented code

* remove needless variables

* optimize inclusion_poller fn

* break loop if dispatch fail

* remove client_lock variable

* switch to retriable err

* replace arbitrary value with config
  • Loading branch information
juan518munoz authored Nov 19, 2024
1 parent 598d6e0 commit 54edd50
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 261 deletions.
4 changes: 2 additions & 2 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_da_client::{
};

use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser};
use crate::utils::to_non_retriable_da_error;
use crate::utils::{to_non_retriable_da_error, to_retriable_da_error};

/// EigenClient is a client for the Eigen DA service.
/// It can be configured to use one of two dispersal methods:
Expand Down Expand Up @@ -58,7 +58,7 @@ impl DataAvailabilityClient for EigenClient {
Disperser::Remote(remote_disperser) => remote_disperser
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
.map_err(to_retriable_da_error)?,
Disperser::Memory(memstore) => memstore
.clone()
.put_blob(data)
Expand Down
56 changes: 34 additions & 22 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{str::FromStr, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};

use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio::{
sync::{mpsc, Mutex},
time::Instant,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
Expand All @@ -28,7 +31,7 @@ use crate::eigen::{

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient {
client: DisperserClient<Channel>,
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: DisperserConfig,
verifier: Verifier,
Expand All @@ -40,9 +43,11 @@ impl RawEigenClient {
pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
let client = DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?;
let client = Arc::new(Mutex::new(
DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?,
));

let verifier_config = VerifierConfig {
verify_certs: true,
Expand Down Expand Up @@ -70,13 +75,16 @@ impl RawEigenClient {
account_id: String::default(), // Account Id is not used in non-authenticated mode
};

let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();
let disperse_reply = self
.client
.lock()
.await
.disperse_blob(request)
.await?
.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
Expand Down Expand Up @@ -116,19 +124,22 @@ impl RawEigenClient {
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::unbounded_channel();

let disperse_time = Instant::now();
let response_stream =
client_clone.disperse_blob_authenticated(UnboundedReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

// 1. send DisperseBlobRequest
let padded_data = convert_by_padding_empty_byte(&data);
self.disperse_data(padded_data, &tx)?;

// this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest`
let mut response_stream = response_stream.await?;
let mut response_stream = self
.client
.clone()
.lock()
.await
.disperse_blob_authenticated(UnboundedReceiverStream::new(rx))
.await?;
let response_stream = response_stream.get_mut();

// 2. receive BlobAuthHeader
Expand All @@ -151,9 +162,7 @@ impl RawEigenClient {
};

// 5. poll for blob status until it reaches the Confirmed state
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
Expand Down Expand Up @@ -255,7 +264,6 @@ impl RawEigenClient {

async fn await_for_inclusion(
&self,
mut client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
Expand All @@ -266,7 +274,10 @@ impl RawEigenClient {
while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout)
{
tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await;
let resp = client
let resp = self
.client
.lock()
.await
.get_blob_status(polling_request.clone())
.await?
.into_inner();
Expand Down Expand Up @@ -323,7 +334,8 @@ impl RawEigenClient {
.batch_header_hash;
let get_response = self
.client
.clone()
.lock()
.await
.retrieve_blob(disperser::RetrieveBlobRequest {
batch_header_hash,
blob_index,
Expand Down
Loading

0 comments on commit 54edd50

Please sign in to comment.