Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor metrics/make API use binaries #2735

Merged
merged 13 commits into from
Aug 28, 2024
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ categories = ["cryptography"]
anyhow = "1"
assert_matches = "1.5"
async-trait = "0.1"
axum = "0.7.5"
axum = { version = "0.7.5", features = ["multipart"] }
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
backon = "0.4.4"
bigdecimal = "0.4.5"
bincode = "1"
Expand Down
3 changes: 0 additions & 3 deletions core/lib/prover_interface/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ pub enum SubmitProofRequest {
SkippedProofGeneration,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OptionalProofGenerationDataRequest(pub Option<L1BatchNumber>);

#[derive(Debug, Serialize, Deserialize)]
pub struct VerifyProofRequest(pub Box<L1BatchProofForL1>);

Expand Down
30 changes: 19 additions & 11 deletions core/node/external_proof_integration_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ mod processor;
use std::{net::SocketAddr, sync::Arc};

use anyhow::Context;
use axum::{extract::Path, routing::post, Json, Router};
use axum::{
extract::{Multipart, Path},
middleware,
routing::{get, post},
Router,
};
use tokio::sync::watch;
use zksync_basic_types::commitment::L1BatchCommitmentMode;
use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig;
use zksync_dal::{ConnectionPool, Core};
use zksync_object_store::ObjectStore;
use zksync_prover_interface::api::{OptionalProofGenerationDataRequest, VerifyProofRequest};

use crate::processor::Processor;

Expand Down Expand Up @@ -50,25 +54,29 @@ async fn create_router(
let mut processor =
Processor::new(blob_store.clone(), connection_pool.clone(), commitment_mode);
let verify_proof_processor = processor.clone();
let specific_proof_processor = processor.clone();
Router::new()
.route(
"/proof_generation_data",
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
post(
// we use post method because the returned data is not idempotent,
// i.e we return different result on each call.
move |payload: Json<OptionalProofGenerationDataRequest>| async move {
processor.get_proof_generation_data(payload).await
},
),
get(move || async move { processor.get_proof_generation_data().await }),
)
.route(
"/proof_generation_data/:l1_batch_number",
get(move |l1_batch_number: Path<u32>| async move {
specific_proof_processor
.proof_generation_data_for_existing_batch(l1_batch_number)
.await
}),
)
.route(
"/verify_proof/:l1_batch_number",
post(
move |l1_batch_number: Path<u32>, payload: Json<VerifyProofRequest>| async move {
move |l1_batch_number: Path<u32>, multipart: Multipart| async move {
verify_proof_processor
.verify_proof(l1_batch_number, payload)
.verify_proof(l1_batch_number, multipart)
.await
},
),
)
.layer(middleware::from_fn(metrics::call_outcome_tracker))
}
49 changes: 29 additions & 20 deletions core/node/external_proof_integration_api/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use axum::{extract::Request, middleware::Next, response::Response};
use tokio::time::Instant;
use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics};

Expand All @@ -16,6 +17,7 @@ pub(crate) enum Method {
GetLatestProofGenerationData,
GetSpecificProofGenerationData,
VerifyProof,
Unknown,
}

#[derive(Debug, Metrics)]
Expand All @@ -25,30 +27,37 @@ pub(crate) struct ProofIntegrationApiMetrics {
pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram<Duration>, 2>,
}

pub(crate) struct MethodCallGuard {
method_type: Method,
outcome: CallOutcome,
started_at: Instant,
}
pub(crate) async fn call_outcome_tracker(request: Request, next: Next) -> Response {
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
let start = Instant::now();
let path = request.uri().path();

impl MethodCallGuard {
pub(crate) fn new(method_type: Method) -> Self {
MethodCallGuard {
method_type,
outcome: CallOutcome::Failure,
started_at: Instant::now(),
let method = if path.starts_with("/proof_generation_data") {
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(char) = path.get(22..23) {
if char == "/" && path.get(23..).is_some() {
Method::GetSpecificProofGenerationData
} else {
Method::GetLatestProofGenerationData
}
} else {
Method::GetLatestProofGenerationData
}
}
} else if path.starts_with("/verify_proof/") {
Method::VerifyProof
} else {
Method::Unknown
};

pub(crate) fn mark_successful(&mut self) {
self.outcome = CallOutcome::Success;
}
}
let response = next.run(request).await;

let outcome = if response.status().is_success() {
CallOutcome::Success
} else {
CallOutcome::Failure
};

METRICS.call_latency[&(method, outcome)].observe(start.elapsed());

impl Drop for MethodCallGuard {
fn drop(&mut self) {
METRICS.call_latency[&(self.method_type, self.outcome)].observe(self.started_at.elapsed());
}
response
}

#[vise::register]
Expand Down
136 changes: 94 additions & 42 deletions core/node/external_proof_integration_api/src/processor.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
use std::sync::Arc;

use axum::{extract::Path, Json};
use axum::{
extract::{Multipart, Path},
http::header,
response::IntoResponse,
};
use zksync_basic_types::{
basic_fri_types::Eip4844Blobs, commitment::L1BatchCommitmentMode, L1BatchNumber,
};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_object_store::{bincode, ObjectStore};
use zksync_prover_interface::{
api::{
OptionalProofGenerationDataRequest, ProofGenerationData, ProofGenerationDataResponse,
VerifyProofRequest,
},
api::{ProofGenerationData, VerifyProofRequest},
inputs::{
L1BatchMetadataHashes, VMRunWitnessInputData, WitnessInputData, WitnessInputMerklePaths,
},
outputs::L1BatchProofForL1,
};

use crate::{
error::ProcessorError,
metrics::{Method, MethodCallGuard},
};
use crate::error::ProcessorError;

#[derive(Clone)]
pub(crate) struct Processor {
Expand All @@ -45,17 +43,32 @@ impl Processor {
pub(crate) async fn verify_proof(
&self,
Path(l1_batch_number): Path<u32>,
Json(payload): Json<VerifyProofRequest>,
mut multipart: Multipart,
) -> Result<(), ProcessorError> {
let mut guard = MethodCallGuard::new(Method::VerifyProof);

let l1_batch_number = L1BatchNumber(l1_batch_number);
tracing::info!(
"Received request to verify proof for batch: {:?}",
l1_batch_number
);

let serialized_proof = bincode::serialize(&payload.0)?;
let mut serialized_proof = vec![];
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved

while let Some(mut data) = multipart.next_field().await.map_err(|err| {
tracing::error!("Failed to read field: {:?}", err);
ProcessorError::InvalidProof
})? {
while let Some(chunk) = data.chunk().await.map_err(|err| {
tracing::error!("Failed to read chunk: {:?}", err);
ProcessorError::InvalidProof
})? {
serialized_proof.extend_from_slice(&chunk);
}
}
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved

tracing::info!("Received proof is size: {}", serialized_proof.len());

let payload: VerifyProofRequest = bincode::deserialize(&serialized_proof)?;

let expected_proof = bincode::serialize(
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
&self
.blob_store
Expand All @@ -67,22 +80,12 @@ impl Processor {
return Err(ProcessorError::InvalidProof);
}

guard.mark_successful();

Ok(())
}

#[tracing::instrument(skip_all)]
pub(crate) async fn get_proof_generation_data(
&mut self,
request: Json<OptionalProofGenerationDataRequest>,
) -> Result<Json<ProofGenerationDataResponse>, ProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let mut guard = match request.0 .0 {
Some(_) => MethodCallGuard::new(Method::GetSpecificProofGenerationData),
None => MethodCallGuard::new(Method::GetLatestProofGenerationData),
};
pub(crate) async fn get_proof_generation_data(&mut self) -> impl IntoResponse {
tracing::info!("Received request for proof generation data");

let latest_available_batch = self
.pool
Expand All @@ -93,38 +96,87 @@ impl Processor {
.get_latest_proven_batch()
.await?;

let l1_batch_number = if let Some(l1_batch_number) = request.0 .0 {
if l1_batch_number > latest_available_batch {
tracing::error!(
"Requested batch is not available: {:?}, latest available batch is {:?}",
l1_batch_number,
latest_available_batch
);
return Err(ProcessorError::BatchNotReady(l1_batch_number));
let proof_generation_data = self
.proof_generation_data_for_existing_batch_internal(latest_available_batch)
.await;

match proof_generation_data {
Ok(data) => {
let data = bincode::serialize(&data)?;
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved

let headers = [
(header::CONTENT_TYPE, "application/octet-stream"),
(
header::CONTENT_DISPOSITION,
&format!(
"attachment; filename=\"witness_inputs_{}.bin\"",
latest_available_batch.0
),
),
];

Ok((headers, data).into_response())
}
Err(err) => Err(err),
}
}

#[tracing::instrument(skip(self))]
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) async fn proof_generation_data_for_existing_batch(
&self,
Path(l1_batch_number): Path<u32>,
) -> impl IntoResponse {
let l1_batch_number = L1BatchNumber(l1_batch_number);
tracing::info!(
"Received request for proof generation data for batch: {:?}",
l1_batch_number
} else {
latest_available_batch
};
);

let latest_available_batch = self
.pool
.connection()
.await
.unwrap()
.proof_generation_dal()
.get_latest_proven_batch()
.await?;

if l1_batch_number > latest_available_batch {
tracing::error!(
"Requested batch is not available: {:?}, latest available batch is {:?}",
l1_batch_number,
latest_available_batch
);
return Err(ProcessorError::BatchNotReady(l1_batch_number));
}

let proof_generation_data = self
.proof_generation_data_for_existing_batch(l1_batch_number)
.proof_generation_data_for_existing_batch_internal(latest_available_batch)
.await;

match proof_generation_data {
Ok(data) => {
guard.mark_successful();
let data = bincode::serialize(&data)?;

let headers = [
(header::CONTENT_TYPE, "text/bin; charset=utf-8"),
(
header::CONTENT_DISPOSITION,
&format!(
"attachment; filename=\"witness_inputs_{}.bin\"",
latest_available_batch.0
),
),
];

Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
)))))
Ok((headers, data).into_response())
}
Err(err) => Err(err),
}
}

#[tracing::instrument(skip(self))]
async fn proof_generation_data_for_existing_batch(
async fn proof_generation_data_for_existing_batch_internal(
&self,
l1_batch_number: L1BatchNumber,
) -> Result<ProofGenerationData, ProcessorError> {
Expand Down
Loading
Loading