Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-extra-features): Move inclusion logic #357

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ impl EigenClient {
client: Arc::new(client),
})
}

pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result<String> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, anyhow should not be used

let blob_info = self.client.get_inclusion_data(blob_id).await?;
Ok(blob_info)
}
}

#[async_trait]
Expand Down Expand Up @@ -60,7 +65,11 @@ impl DataAvailabilityClient for EigenClient {
}

async fn get_inclusion_data(&self, blob_id: &str) -> Result<Option<InclusionData>, DAError> {
let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError {
let blob_info = self
.get_commitment(blob_id)
.await
.map_err(to_non_retriable_da_error)?;
let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
Expand Down Expand Up @@ -123,8 +132,11 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();

let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -133,7 +145,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -163,8 +175,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -173,7 +187,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -203,8 +217,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -213,7 +229,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -277,8 +293,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -287,7 +305,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -317,8 +335,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -327,7 +347,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}
}
51 changes: 18 additions & 33 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tonic::{
Streaming,
};
use zksync_config::EigenConfig;
#[cfg(test)]
use zksync_da_client::types::DAError;

use super::{
Expand Down Expand Up @@ -79,28 +78,7 @@ impl RawEigenClient {
let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

self.loop_verify_certificate(blob_info.clone(), disperse_elapsed)
.await?;
let verification_proof = blob_info.blob_verification_proof.clone();
let blob_id = format!(
"{}:{}",
verification_proof.batch_id, verification_proof.blob_index
);
tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);

Ok(hex::encode(rlp::encode(&blob_info)))
Ok(hex::encode(disperse_reply.request_id))
}

async fn loop_verify_certificate(
Expand All @@ -126,7 +104,6 @@ impl RawEigenClient {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

Expand Down Expand Up @@ -155,18 +132,28 @@ impl RawEigenClient {
let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else {
return Err(anyhow::anyhow!("Unexpected response from server"));
};
Ok(hex::encode(disperse_reply.request_id))
}

// 5. poll for blob status until it reaches the Confirmed state
pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<String> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

let client_clone = self.client.clone();
let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await_for_inclusion(client_clone, blob_id.to_string())
.await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;

let disperse_elapsed = Instant::now() - disperse_time;
let data = self
.get_blob_data(&hex::encode(rlp::encode(&blob_info)))
.await?;
if data.is_none() {
return Err(anyhow::anyhow!("Failed to get blob data"));
}
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.verify_commitment(blob_info.blob_header.commitment.clone(), data.unwrap())
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

self.loop_verify_certificate(blob_info.clone(), disperse_elapsed)
Expand Down Expand Up @@ -265,10 +252,10 @@ impl RawEigenClient {
async fn await_for_inclusion(
&self,
client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
request_id: String,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
request_id: disperse_blob_reply.request_id,
request_id: hex::decode(request_id)?,
};

let blob_info = (|| async {
Expand Down Expand Up @@ -318,14 +305,13 @@ impl RawEigenClient {
Ok(blob_info)
}

#[cfg(test)]
pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
pub async fn get_blob_data(&self, blob_info: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
use anyhow::anyhow;
use zksync_da_client::types::DAError;

use crate::eigen::blob_info::BlobInfo;

let commit = hex::decode(blob_id).map_err(|_| DAError {
let commit = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
Expand Down Expand Up @@ -398,7 +384,6 @@ fn convert_by_padding_empty_byte(data: &[u8]) -> Vec<u8> {
valid_data
}

#[cfg(test)]
fn remove_empty_byte_from_padded_bytes(data: &[u8]) -> Vec<u8> {
let parse_size = DATA_CHUNK_SIZE;

Expand Down
Loading