Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-m0-implementation): grafana metrics #334

Merged
4 changes: 2 additions & 2 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_da_client::{
};

use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser};
use crate::utils::to_non_retriable_da_error;
use crate::utils::{to_non_retriable_da_error, to_retriable_da_error};

/// EigenClient is a client for the Eigen DA service.
/// It can be configured to use one of two dispersal methods:
Expand Down Expand Up @@ -58,7 +58,7 @@ impl DataAvailabilityClient for EigenClient {
Disperser::Remote(remote_disperser) => remote_disperser
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
.map_err(to_retriable_da_error)?,
Disperser::Memory(memstore) => memstore
.clone()
.put_blob(data)
Expand Down
63 changes: 59 additions & 4 deletions core/node/da_clients/src/eigen/eigenda-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,27 @@ cargo install --path zkstack_cli/crates/zkstack --force --locked
zkstack containers --observability true
```

3. Create `eigen_da` chain
3. Temporary metrics setup (until `era-observabilty` changes are also merged)

a. Setup the observability container at least once so the `era-observability` directory is cloned.

```bash
zkstack containers --observability true
```

b. Add `lambda` remote to the `era-observability` project:

```bash
cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git
```

c. Fetch and checkout the `eigenda` branch:

```bash
git fetch lambda && git checkout eigenda
```

4. Create `eigen_da` chain

```bash
zkstack chain create \
Expand All @@ -91,7 +111,7 @@ zkstack chain create \
--set-as-default false
```

4. Initialize created ecosystem
5. Initialize created ecosystem

```bash
zkstack ecosystem init \
Expand All @@ -107,7 +127,42 @@ zkstack ecosystem init \

You may enable observability here if you want to.

5. Start the server
6. Setup grafana dashboard for Data Availability

a. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file:

```yaml
prometheus:
listener_port: 3414 # <- this is the port
```

(around line 108)

Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port:

```yaml
- job_name: 'zksync'
scrape_interval: 5s
honor_labels: true
static_configs:
- targets: ['host.docker.internal:3312'] # <- change this to the port
```

b. Enable the Data Availability Grafana dashboard

```bash
mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json
```

c. Restart the era-observability container

```bash
docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart
```

(this can also be done through the docker dashboard)

7. Start the server

```bash
zkstack server --chain eigen_da
Expand All @@ -125,7 +180,7 @@ And with the server running on one terminal, you can run the server integration
following command:

```bash
zkstack dev test --chain eigen_da
zkstack dev test integration --chain eigen_da
```

## Mainnet/Testnet setup
Expand Down
79 changes: 44 additions & 35 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::{str::FromStr, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};

use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio::{
sync::{mpsc, Mutex},
time::Instant,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
Streaming,
Expand All @@ -28,7 +31,7 @@ use crate::eigen::{

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient {
client: DisperserClient<Channel>,
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: DisperserConfig,
verifier: Verifier,
Expand All @@ -37,14 +40,14 @@ pub(crate) struct RawEigenClient {
pub(crate) const DATA_CHUNK_SIZE: usize = 32;

impl RawEigenClient {
pub(crate) const BUFFER_SIZE: usize = 1000;

pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
let client = DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?;
let client = Arc::new(Mutex::new(
DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?,
));

let verifier_config = VerifierConfig {
verify_certs: true,
Expand Down Expand Up @@ -72,13 +75,16 @@ impl RawEigenClient {
account_id: String::default(), // Account Id is not used in non-authenticated mode
};

let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();
let disperse_reply = self
.client
.lock()
.await
.disperse_blob(request)
.await?
.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
Expand Down Expand Up @@ -118,25 +124,29 @@ impl RawEigenClient {
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);
let (tx, rx) = mpsc::unbounded_channel();

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

// 1. send DisperseBlobRequest
self.disperse_data(padded_data, &tx).await?;
let padded_data = convert_by_padding_empty_byte(&data);
self.disperse_data(padded_data, &tx)?;

// this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest`
let mut response_stream = response_stream.await?.into_inner();
let mut response_stream = self
.client
.clone()
.lock()
.await
.disperse_blob_authenticated(UnboundedReceiverStream::new(rx))
.await?;
let response_stream = response_stream.get_mut();

// 2. receive BlobAuthHeader
let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?;
let blob_auth_header = self.receive_blob_auth_header(response_stream).await?;

// 3. sign and send BlobAuthHeader
self.submit_authentication_data(blob_auth_header.clone(), &tx)
.await?;
self.submit_authentication_data(blob_auth_header.clone(), &tx)?;

// 4. receive DisperseBlobReply
let reply = response_stream
Expand All @@ -152,9 +162,7 @@ impl RawEigenClient {
};

// 5. poll for blob status until it reaches the Confirmed state
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
Expand Down Expand Up @@ -183,10 +191,10 @@ impl RawEigenClient {
}
}

async fn disperse_data(
fn disperse_data(
&self,
data: Vec<u8>,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
let req = disperser::AuthenticatedRequest {
payload: Some(DisperseRequest(disperser::DisperseBlobRequest {
Expand All @@ -197,14 +205,13 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e))
}

async fn submit_authentication_data(
fn submit_authentication_data(
&self,
blob_auth_header: BlobAuthHeader,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
// TODO: replace challenge_parameter with actual auth header when it is available
let digest = zksync_basic_types::web3::keccak256(
Expand All @@ -228,7 +235,6 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e))
}

Expand Down Expand Up @@ -258,7 +264,6 @@ impl RawEigenClient {

async fn await_for_inclusion(
&self,
mut client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
Expand All @@ -269,7 +274,10 @@ impl RawEigenClient {
while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout)
{
tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await;
let resp = client
let resp = self
.client
.lock()
.await
.get_blob_status(polling_request.clone())
.await?
.into_inner();
Expand Down Expand Up @@ -326,7 +334,8 @@ impl RawEigenClient {
.batch_header_hash;
let get_response = self
.client
.clone()
.lock()
.await
.retrieve_blob(disperser::RetrieveBlobRequest {
batch_header_hash,
blob_index,
Expand Down
Loading
Loading