Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: External prover API metrics, refactoring #2630

Merged
merged 6 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/lib/dal/src/proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ProofGenerationDal<'_, '_> {
Ok(result)
}

pub async fn get_available_batch(&mut self) -> DalResult<L1BatchNumber> {
pub async fn get_latest_proven_batch(&mut self) -> DalResult<L1BatchNumber> {
let result = sqlx::query!(
r#"
SELECT
Expand Down
1 change: 1 addition & 0 deletions core/node/external_proof_integration_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ zksync_dal.workspace = true
tokio.workspace = true
bincode.workspace = true
anyhow.workspace = true
vise.workspace = true
1 change: 1 addition & 0 deletions core/node/external_proof_integration_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod error;
mod metrics;
mod processor;

use std::{net::SocketAddr, sync::Arc};
Expand Down
55 changes: 55 additions & 0 deletions core/node/external_proof_integration_api/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::time::Duration;

use tokio::time::Instant;
use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)]
#[metrics(label = "outcome", rename_all = "snake_case")]
pub(crate) enum CallOutcome {
Success,
Failure,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)]
#[metrics(label = "type", rename_all = "snake_case")]
pub(crate) enum Method {
GetLatestProofGenerationData,
GetSpecificProofGenerationData,
VerifyProof,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "external_proof_integration_api")]
pub(crate) struct ProofIntegrationApiMetrics {
#[metrics(labels = ["method", "outcome"], buckets = vise::Buckets::LATENCIES)]
pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram<Duration>, 2>,
}

pub(crate) struct MethodCallGuard {
method_type: Method,
outcome: CallOutcome,
started_at: Instant,
}

impl MethodCallGuard {
pub(crate) fn new(method_type: Method) -> Self {
MethodCallGuard {
method_type,
outcome: CallOutcome::Failure,
started_at: Instant::now(),
}
}

pub(crate) fn mark_successful(&mut self) {
self.outcome = CallOutcome::Success;
}
}

impl Drop for MethodCallGuard {
fn drop(&mut self) {
METRICS.call_latency[&(self.method_type, self.outcome)].observe(self.started_at.elapsed());
}
}

#[vise::register]
pub(crate) static METRICS: vise::Global<ProofIntegrationApiMetrics> = vise::Global::new();
78 changes: 47 additions & 31 deletions core/node/external_proof_integration_api/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use zksync_prover_interface::{
outputs::L1BatchProofForL1,
};

use crate::error::ProcessorError;
use crate::{
error::ProcessorError,
metrics::{Method, MethodCallGuard},
};

#[derive(Clone)]
pub(crate) struct Processor {
Expand All @@ -39,20 +42,55 @@ impl Processor {
}
}

pub(crate) async fn verify_proof(
&self,
Path(l1_batch_number): Path<u32>,
Json(payload): Json<VerifyProofRequest>,
) -> Result<(), ProcessorError> {
let mut guard = MethodCallGuard::new(Method::VerifyProof);

let l1_batch_number = L1BatchNumber(l1_batch_number);
tracing::info!(
"Received request to verify proof for batch: {:?}",
l1_batch_number
);

let serialized_proof = bincode::serialize(&payload.0)?;
let expected_proof = bincode::serialize(
&self
.blob_store
.get::<L1BatchProofForL1>((l1_batch_number, payload.0.protocol_version))
.await?,
)?;

if serialized_proof != expected_proof {
return Err(ProcessorError::InvalidProof);
}

guard.mark_successful();
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

#[tracing::instrument(skip_all)]
pub(crate) async fn get_proof_generation_data(
&mut self,
request: Json<OptionalProofGenerationDataRequest>,
) -> Result<Json<ProofGenerationDataResponse>, ProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let mut guard = match request.0 .0 {
Some(_) => MethodCallGuard::new(Method::GetSpecificProofGenerationData),
None => MethodCallGuard::new(Method::GetLatestProofGenerationData),
};

let latest_available_batch = self
.pool
.connection()
.await
.unwrap()
.proof_generation_dal()
.get_available_batch()
.get_latest_proven_batch()
.await?;

let l1_batch_number = if let Some(l1_batch_number) = request.0 .0 {
Expand All @@ -74,9 +112,13 @@ impl Processor {
.await;

match proof_generation_data {
Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
))))),
Ok(data) => {
guard.mark_successful();

Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
)))))
}
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -161,30 +203,4 @@ impl Processor {
l1_verifier_config: protocol_version.l1_verifier_config,
})
}

pub(crate) async fn verify_proof(
&self,
Path(l1_batch_number): Path<u32>,
Json(payload): Json<VerifyProofRequest>,
) -> Result<(), ProcessorError> {
let l1_batch_number = L1BatchNumber(l1_batch_number);
tracing::info!(
"Received request to verify proof for batch: {:?}",
l1_batch_number
);

let serialized_proof = bincode::serialize(&payload.0)?;
let expected_proof = bincode::serialize(
&self
.blob_store
.get::<L1BatchProofForL1>((l1_batch_number, payload.0.protocol_version))
.await?,
)?;

if serialized_proof != expected_proof {
return Err(ProcessorError::InvalidProof);
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ pub struct ExternalProofIntegrationApiLayer {
#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<ReplicaPool>,
pub replica_pool: PoolResource<ReplicaPool>,
pub object_store: ObjectStoreResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
#[context(task)]
pub task: ProverApiTask,
pub task: ExternalProofIntegrationApiTask,
}

impl ExternalProofIntegrationApiLayer {
Expand All @@ -59,13 +59,13 @@ impl WiringLayer for ExternalProofIntegrationApiLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let main_pool = input.master_pool.get().await?;
let replica_pool = input.replica_pool.get().await.unwrap();
let blob_store = input.object_store.0;

let task = ProverApiTask {
let task = ExternalProofIntegrationApiTask {
external_proof_integration_api_config: self.external_proof_integration_api_config,
blob_store,
main_pool,
replica_pool,
commitment_mode: self.commitment_mode,
};

Expand All @@ -74,15 +74,15 @@ impl WiringLayer for ExternalProofIntegrationApiLayer {
}

#[derive(Debug)]
pub struct ProverApiTask {
pub struct ExternalProofIntegrationApiTask {
external_proof_integration_api_config: ExternalProofIntegrationApiConfig,
blob_store: Arc<dyn ObjectStore>,
main_pool: ConnectionPool<Core>,
replica_pool: ConnectionPool<Core>,
commitment_mode: L1BatchCommitmentMode,
}

#[async_trait::async_trait]
impl Task for ProverApiTask {
impl Task for ExternalProofIntegrationApiTask {
fn id(&self) -> TaskId {
"external_proof_integration_api".into()
}
Expand All @@ -91,7 +91,7 @@ impl Task for ProverApiTask {
zksync_external_proof_integration_api::run_server(
self.external_proof_integration_api_config,
self.blob_store,
self.main_pool,
self.replica_pool,
self.commitment_mode,
stop_receiver.0,
)
Expand Down
Loading