From 1409ad2f09f601becdff6495497c33d56f6af214 Mon Sep 17 00:00:00 2001 From: Vladislav Volosnikov Date: Mon, 19 Aug 2024 15:59:23 +0200 Subject: [PATCH] Revert use of spawn_blocking in LWG/NWG --- .../witness_generator/src/leaf_aggregation.rs | 80 ++++++++-------- .../witness_generator/src/node_aggregation.rs | 91 +++++++++---------- 2 files changed, 81 insertions(+), 90 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation.rs index 2cfae1600287..d8cad84e777d 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::base_circuit_type_into_recursive_leaf_circuit_type; -use tokio::{runtime::Handle, sync::Semaphore}; +use tokio::sync::Semaphore; use zkevm_test_harness::{ witness::recursive_aggregation::{ compute_leaf_params, create_leaf_witness, split_recursion_queue, @@ -298,48 +298,44 @@ pub async fn process_leaf_aggregation_job( let base_vk = job.base_vk.clone(); let leaf_params = (circuit_id, job.leaf_params.clone()); - let handle = tokio::task::spawn_blocking(move || { - let async_task = async { - let _permit = semaphore - .acquire() - .await - .expect("failed to get permit to process queues chunk"); - - let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await; - let base_proofs = proofs - .into_iter() - .map(|wrapper| match wrapper { - FriProofWrapper::Base(base_proof) => base_proof, - FriProofWrapper::Recursive(_) => { - panic!( - "Expected only base proofs for leaf agg {} {}", - job.circuit_id, job.block_number - ); - } - }) - .collect(); - - let (_, circuit) = create_leaf_witness( - circuit_id.into(), - queue, - base_proofs, - &base_vk, - &leaf_params, - ); - - save_recursive_layer_prover_input_artifacts( - job.block_number, - circuit_idx, - vec![circuit], - AggregationRound::LeafAggregation, - 0, - &*object_store, - None, - ) + let handle = tokio::task::spawn(async move { + let _permit = semaphore + .acquire() .await - }; - - Handle::current().block_on(async_task) + .expect("failed to get permit to process queues chunk"); + + let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await; + let base_proofs = proofs + .into_iter() + .map(|wrapper| match wrapper { + FriProofWrapper::Base(base_proof) => base_proof, + FriProofWrapper::Recursive(_) => { + panic!( + "Expected only base proofs for leaf agg {} {}", + job.circuit_id, job.block_number + ); + } + }) + .collect(); + + let (_, circuit) = create_leaf_witness( + circuit_id.into(), + queue, + base_proofs, + &base_vk, + &leaf_params, + ); + + save_recursive_layer_prover_input_artifacts( + job.block_number, + circuit_idx, + vec![circuit], + AggregationRound::LeafAggregation, + 0, + &*object_store, + None, + ) + .await }); handles.push(handle); diff --git a/prover/crates/bin/witness_generator/src/node_aggregation.rs b/prover/crates/bin/witness_generator/src/node_aggregation.rs index 4f396fd4b5a5..a7dce2a513d8 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::RECURSION_ARITY; -use tokio::{runtime::Handle, sync::Semaphore}; +use tokio::sync::Semaphore; use zkevm_test_harness::witness::recursive_aggregation::{ compute_node_vk_commitment, create_node_witness, }; @@ -138,56 +138,51 @@ impl NodeAggregationWitnessGenerator { let vk = vk.clone(); let all_leafs_layer_params = job.all_leafs_layer_params.clone(); - let handle = tokio::task::spawn_blocking(move || { - let async_task = async { - let _permit = semaphore - .acquire() - .await - .expect("failed to get permit to process queues chunk"); - - let proofs = - load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await; - let mut recursive_proofs = vec![]; - for wrapper in proofs { - match wrapper { - FriProofWrapper::Base(_) => { - panic!( - "Expected only recursive proofs for node agg {} {}", - job.circuit_id, job.block_number - ); - } - FriProofWrapper::Recursive(recursive_proof) => { - recursive_proofs.push(recursive_proof) - } + let handle = tokio::task::spawn(async move { + let _permit = semaphore + .acquire() + .await + .expect("failed to get permit to process queues chunk"); + + let proofs = load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await; + let mut recursive_proofs = vec![]; + for wrapper in proofs { + match wrapper { + FriProofWrapper::Base(_) => { + panic!( + "Expected only recursive proofs for node agg {} {}", + job.circuit_id, job.block_number + ); + } + FriProofWrapper::Recursive(recursive_proof) => { + recursive_proofs.push(recursive_proof) } } + } + + let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness( + &chunk, + recursive_proofs, + &vk, + node_vk_commitment, + &all_leafs_layer_params, + ); + + let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts( + job.block_number, + circuit_idx, + vec![recursive_circuit], + AggregationRound::NodeAggregation, + job.depth + 1, + &*object_store, + Some(job.circuit_id), + ) + .await; - let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness( - &chunk, - recursive_proofs, - &vk, - node_vk_commitment, - &all_leafs_layer_params, - ); - - let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts( - job.block_number, - circuit_idx, - vec![recursive_circuit], - AggregationRound::NodeAggregation, - job.depth + 1, - &*object_store, - Some(job.circuit_id), - ) - .await; - - ( - (result_circuit_id, input_queue), - recursive_circuit_id_and_url, - ) - }; - - Handle::current().block_on(async_task) + ( + (result_circuit_id, input_queue), + recursive_circuit_id_and_url, + ) }); handles.push(handle);