diff --git a/core/lib/object_store/src/mirror.rs b/core/lib/object_store/src/mirror.rs index 948770e7b39c..cc76a49fd67d 100644 --- a/core/lib/object_store/src/mirror.rs +++ b/core/lib/object_store/src/mirror.rs @@ -23,7 +23,7 @@ impl MirroringObjectStore { #[async_trait] impl ObjectStore for MirroringObjectStore { - #[tracing::instrument(skip(self))] + #[tracing::instrument(name = "MirroringObjectStore::get_raw", skip(self))] async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { match self.mirror_store.get_raw(bucket, key).await { Ok(object) => { @@ -49,7 +49,11 @@ impl ObjectStore for MirroringObjectStore { } } - #[tracing::instrument(skip(self, value), fields(value.len = value.len()))] + #[tracing::instrument( + name = "MirroringObjectStore::put_raw", + skip(self, value), + fields(value.len = value.len()) + )] async fn put_raw( &self, bucket: Bucket, @@ -66,7 +70,7 @@ impl ObjectStore for MirroringObjectStore { Ok(()) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(name = "MirroringObjectStore::remove_raw", skip(self))] async fn remove_raw(&self, bucket: Bucket, key: &str) -> Result<(), ObjectStoreError> { self.inner.remove_raw(bucket, key).await?; // Only remove the value from the mirror once it has been removed in the underlying store diff --git a/core/lib/object_store/src/objects.rs b/core/lib/object_store/src/objects.rs index 897c93e0b6f8..f5bb3706d9d4 100644 --- a/core/lib/object_store/src/objects.rs +++ b/core/lib/object_store/src/objects.rs @@ -128,8 +128,15 @@ impl dyn ObjectStore + '_ { /// /// Returns an error if an object with the `key` does not exist, cannot be accessed, /// or cannot be deserialized. + #[tracing::instrument( + name = "ObjectStore::get", + skip_all, + fields(key) // Will be recorded within the function. + )] pub async fn get(&self, key: V::Key<'_>) -> Result { let key = V::encode_key(key); + // Record the key for tracing. + tracing::Span::current().record("key", key.as_str()); let bytes = self.get_raw(V::BUCKET, &key).await?; V::deserialize(bytes).map_err(ObjectStoreError::Serialization) } @@ -140,6 +147,11 @@ impl dyn ObjectStore + '_ { /// /// Returns an error if an object with the `encoded_key` does not exist, cannot be accessed, /// or cannot be deserialized. + #[tracing::instrument( + name = "ObjectStore::get_by_encoded_key", + skip_all, + fields(key = %encoded_key) + )] pub async fn get_by_encoded_key( &self, encoded_key: String, @@ -154,12 +166,19 @@ impl dyn ObjectStore + '_ { /// # Errors /// /// Returns an error if serialization or the insertion / replacement operation fails. + #[tracing::instrument( + name = "ObjectStore::put", + skip_all, + fields(key) // Will be recorded within the function. + )] pub async fn put( &self, key: V::Key<'_>, value: &V, ) -> Result { let key = V::encode_key(key); + // Record the key for tracing. + tracing::Span::current().record("key", key.as_str()); let bytes = value.serialize().map_err(ObjectStoreError::Serialization)?; self.put_raw(V::BUCKET, &key, bytes).await?; Ok(key) @@ -170,8 +189,15 @@ impl dyn ObjectStore + '_ { /// # Errors /// /// Returns I/O errors specific to the storage. + #[tracing::instrument( + name = "ObjectStore::put", + skip_all, + fields(key) // Will be recorded within the function. + )] pub async fn remove(&self, key: V::Key<'_>) -> Result<(), ObjectStoreError> { let key = V::encode_key(key); + // Record the key for tracing. + tracing::Span::current().record("key", key.as_str()); self.remove_raw(V::BUCKET, &key).await } diff --git a/core/lib/object_store/src/retries.rs b/core/lib/object_store/src/retries.rs index fafde47a9e2d..a1932d799f8e 100644 --- a/core/lib/object_store/src/retries.rs +++ b/core/lib/object_store/src/retries.rs @@ -19,7 +19,11 @@ enum Request<'a> { } impl Request<'_> { - #[tracing::instrument(skip(f))] // output request and store as a part of structured logs + #[tracing::instrument( + name = "object_store::Request::retry", + skip(f), // output request and store as a part of structured logs + fields(retries) // Will be recorded before returning from the function + )] async fn retry( self, store: &impl fmt::Debug, @@ -32,13 +36,13 @@ impl Request<'_> { { let mut retries = 1; let mut backoff_secs = 1; - loop { + let result = loop { match f().await { - Ok(result) => return Ok(result), + Ok(result) => break Ok(result), Err(err) if err.is_transient() => { if retries > max_retries { tracing::warn!(%err, "Exhausted {max_retries} retries performing request; returning last error"); - return Err(err); + break Err(err); } tracing::info!(%err, "Failed request, retries: {retries}/{max_retries}"); retries += 1; @@ -50,10 +54,12 @@ impl Request<'_> { } Err(err) => { tracing::warn!(%err, "Failed request with a fatal error"); - return Err(err); + break Err(err); } } - } + }; + tracing::Span::current().record("retries", retries); + result } } diff --git a/core/node/commitment_generator/src/lib.rs b/core/node/commitment_generator/src/lib.rs index 6dc1ef2d29fa..64e60b6dec0e 100644 --- a/core/node/commitment_generator/src/lib.rs +++ b/core/node/commitment_generator/src/lib.rs @@ -75,6 +75,7 @@ impl CommitmentGenerator { self.health_updater.subscribe() } + #[tracing::instrument(skip(self))] async fn calculate_aux_commitments( &self, l1_batch_number: L1BatchNumber, @@ -145,6 +146,7 @@ impl CommitmentGenerator { }) } + #[tracing::instrument(skip(self))] async fn prepare_input( &self, l1_batch_number: L1BatchNumber, @@ -285,6 +287,7 @@ impl CommitmentGenerator { Ok(input) } + #[tracing::instrument(skip(self))] async fn process_batch( &self, l1_batch_number: L1BatchNumber, @@ -307,6 +310,7 @@ impl CommitmentGenerator { Ok(artifacts) } + #[tracing::instrument(skip(self))] async fn step( &self, l1_batch_numbers: ops::RangeInclusive, @@ -384,6 +388,7 @@ impl CommitmentGenerator { } } + #[tracing::instrument(skip(self))] async fn next_batch_range(&self) -> anyhow::Result>> { let mut connection = self .connection_pool diff --git a/core/node/eth_sender/src/eth_tx_aggregator.rs b/core/node/eth_sender/src/eth_tx_aggregator.rs index ee5806c72f54..25d57b8cee39 100644 --- a/core/node/eth_sender/src/eth_tx_aggregator.rs +++ b/core/node/eth_sender/src/eth_tx_aggregator.rs @@ -340,7 +340,7 @@ impl EthTxAggregator { Ok(vk_hash) } - #[tracing::instrument(skip(self, storage))] + #[tracing::instrument(skip_all, name = "EthTxAggregator::loop_iteration")] async fn loop_iteration( &mut self, storage: &mut Connection<'_, Core>, diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index d2ee4380d68b..d9fa504e498f 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -630,7 +630,7 @@ impl EthTxManager { Ok(()) } - #[tracing::instrument(skip(self, storage))] + #[tracing::instrument(skip_all, name = "EthTxManager::loop_iteration")] async fn loop_iteration( &mut self, storage: &mut Connection<'_, Core>, diff --git a/core/node/eth_watch/src/lib.rs b/core/node/eth_watch/src/lib.rs index 72b6b29a2533..e964d63bb19d 100644 --- a/core/node/eth_watch/src/lib.rs +++ b/core/node/eth_watch/src/lib.rs @@ -93,6 +93,7 @@ impl EthWatch { }) } + #[tracing::instrument(name = "EthWatch::initialize_state", skip_all)] async fn initialize_state( client: &dyn EthClient, storage: &mut Connection<'_, Core>, @@ -166,7 +167,7 @@ impl EthWatch { Ok(()) } - #[tracing::instrument(skip_all)] + #[tracing::instrument(name = "EthWatch::loop_iteration", skip_all)] async fn loop_iteration( &mut self, storage: &mut Connection<'_, Core>, diff --git a/core/node/house_keeper/src/periodic_job.rs b/core/node/house_keeper/src/periodic_job.rs index 8ac4c3574408..5de17b92c047 100644 --- a/core/node/house_keeper/src/periodic_job.rs +++ b/core/node/house_keeper/src/periodic_job.rs @@ -3,6 +3,7 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; use tokio::sync::watch; +use tracing::Instrument; #[async_trait] pub trait PeriodicJob: Sync + Send { @@ -20,8 +21,12 @@ pub trait PeriodicJob: Sync + Send { "Starting periodic job: {} with frequency: {timeout:?}", Self::SERVICE_NAME ); + while !*stop_receiver.borrow_and_update() { self.run_routine_task() + .instrument( + tracing::info_span!("run_routine_task", service_name = %Self::SERVICE_NAME), + ) .await .context("run_routine_task()")?; // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index d0bd2f2b82c0..e2acf62dea8a 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -196,6 +196,7 @@ impl TreeUpdater { Ok(()) } + #[tracing::instrument(name = "TreeUpdater::step", skip(self, pool))] async fn step( &mut self, pool: &ConnectionPool, diff --git a/core/node/proof_data_handler/src/request_processor.rs b/core/node/proof_data_handler/src/request_processor.rs index 11d0aebfa806..ee266a88971e 100644 --- a/core/node/proof_data_handler/src/request_processor.rs +++ b/core/node/proof_data_handler/src/request_processor.rs @@ -45,6 +45,7 @@ impl RequestProcessor { } } + #[tracing::instrument(skip_all)] pub(crate) async fn get_proof_generation_data( &self, request: Json, @@ -105,6 +106,7 @@ impl RequestProcessor { /// /// Expects all the data to be present in the database. /// Will panic if any of the required data is missing. + #[tracing::instrument(skip(self))] async fn proof_generation_data_for_existing_batch( &self, l1_batch_number: L1BatchNumber, diff --git a/core/node/state_keeper/src/batch_executor/mod.rs b/core/node/state_keeper/src/batch_executor/mod.rs index 4577ab1b360a..da4eb194bff5 100644 --- a/core/node/state_keeper/src/batch_executor/mod.rs +++ b/core/node/state_keeper/src/batch_executor/mod.rs @@ -122,6 +122,7 @@ impl BatchExecutorHandle { } } + #[tracing::instrument(skip_all)] pub async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result { let tx_gas_limit = tx.gas_limit().as_u64(); @@ -161,6 +162,7 @@ impl BatchExecutorHandle { Ok(res) } + #[tracing::instrument(skip_all)] pub async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> { // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation // indeed has been processed. @@ -184,6 +186,7 @@ impl BatchExecutorHandle { Ok(()) } + #[tracing::instrument(skip_all)] pub async fn rollback_last_tx(&mut self) -> anyhow::Result<()> { // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation // indeed has been processed. @@ -207,6 +210,7 @@ impl BatchExecutorHandle { Ok(()) } + #[tracing::instrument(skip_all)] pub async fn finish_batch(mut self) -> anyhow::Result { let (response_sender, response_receiver) = oneshot::channel(); let send_failed = self diff --git a/core/node/state_keeper/src/io/output_handler.rs b/core/node/state_keeper/src/io/output_handler.rs index ec254eaec669..1ac017c04a04 100644 --- a/core/node/state_keeper/src/io/output_handler.rs +++ b/core/node/state_keeper/src/io/output_handler.rs @@ -64,6 +64,11 @@ impl OutputHandler { Ok(()) } + #[tracing::instrument( + name = "OutputHandler::handle_l2_block" + skip_all, + fields(l2_block = %updates_manager.l2_block.number) + )] pub(crate) async fn handle_l2_block( &mut self, updates_manager: &UpdatesManager, @@ -82,6 +87,11 @@ impl OutputHandler { Ok(()) } + #[tracing::instrument( + name = "OutputHandler::handle_l1_batch" + skip_all, + fields(l1_batch = %updates_manager.l1_batch.number) + )] pub(crate) async fn handle_l1_batch( &mut self, updates_manager: Arc, diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index 6c1718232a09..7675960ecbdc 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -6,6 +6,7 @@ use std::{ use anyhow::Context as _; use tokio::sync::watch; +use tracing::{info_span, Instrument}; use zksync_multivm::interface::{Halt, L1BatchEnv, SystemEnv}; use zksync_state::ReadStorageFactory; use zksync_types::{ @@ -285,6 +286,12 @@ impl ZkSyncStateKeeper { .with_context(|| format!("failed loading upgrade transaction for {protocol_version:?}")) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %cursor.l1_batch, + ) + )] async fn wait_for_new_batch_params( &mut self, cursor: &IoCursor, @@ -301,6 +308,12 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %cursor.l1_batch, + ) + )] async fn wait_for_new_batch_env( &mut self, cursor: &IoCursor, @@ -329,6 +342,13 @@ impl ZkSyncStateKeeper { } } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates.l1_batch.number, + l2_block = %updates.l2_block.number, + ) + )] async fn wait_for_new_l2_block_params( &mut self, updates: &UpdatesManager, @@ -349,6 +369,13 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates_manager.l1_batch.number, + l2_block = %updates_manager.l2_block.number, + ) + )] async fn start_next_l2_block( params: L2BlockParams, updates_manager: &mut UpdatesManager, @@ -364,6 +391,13 @@ impl ZkSyncStateKeeper { }) } + #[tracing::instrument( + skip_all, + fields( + l1_batch = %updates_manager.l1_batch.number, + l2_block = %updates_manager.l2_block.number, + ) + )] async fn seal_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { self.output_handler .handle_l2_block(updates_manager) @@ -381,6 +415,10 @@ impl ZkSyncStateKeeper { /// batch, we need to restore the state. We must ensure that every transaction is executed successfully. /// /// Additionally, it initialized the next L2 block timestamp. + #[tracing::instrument( + skip_all, + fields(n_blocks = %l2_blocks_to_reexecute.len()) + )] async fn restore_state( &mut self, batch_executor: &mut BatchExecutorHandle, @@ -481,6 +519,10 @@ impl ZkSyncStateKeeper { Ok(()) } + #[tracing::instrument( + skip_all, + fields(l1_batch = %updates_manager.l1_batch.number) + )] async fn process_l1_batch( &mut self, batch_executor: &mut BatchExecutorHandle, @@ -532,6 +574,7 @@ impl ZkSyncStateKeeper { let Some(tx) = self .io .wait_for_next_tx(POLL_WAIT_DURATION) + .instrument(info_span!("wait_for_next_tx")) .await .context("error waiting for next transaction")? else { @@ -674,6 +717,7 @@ impl ZkSyncStateKeeper { /// 2. Seal manager decided that batch is ready to be sealed. /// Note: this method doesn't mutate `updates_manager` in the end. However, reference should be mutable /// because we use `apply_and_rollback` method of `updates_manager.storage_writes_deduplicator`. + #[tracing::instrument(skip_all)] async fn process_one_tx( &mut self, batch_executor: &mut BatchExecutorHandle, diff --git a/core/node/vm_runner/src/impls/bwip.rs b/core/node/vm_runner/src/impls/bwip.rs index 7ab18397353d..48f243cd9bc5 100644 --- a/core/node/vm_runner/src/impls/bwip.rs +++ b/core/node/vm_runner/src/impls/bwip.rs @@ -154,6 +154,11 @@ impl StateKeeperOutputHandler for BasicWitnessInputProducerOutputHandler { Ok(()) } + #[tracing::instrument( + name = "BasicWitnessInputProducerOutputHandler::handle_l1_batch", + skip_all, + fields(l1_batch = %updates_manager.l1_batch.number) + )] async fn handle_l1_batch( &mut self, updates_manager: Arc, @@ -186,6 +191,7 @@ impl StateKeeperOutputHandler for BasicWitnessInputProducerOutputHandler { } } +#[tracing::instrument(skip_all)] async fn get_updates_manager_witness_input_data( connection: &mut Connection<'_, Core>, updates_manager: Arc, @@ -262,6 +268,7 @@ async fn get_updates_manager_witness_input_data( }) } +#[tracing::instrument(skip_all)] async fn assert_database_witness_input_data( connection: &mut Connection<'_, Core>, l1_batch_number: L1BatchNumber, diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index 3be37b77d114..dfd5251fd39b 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -142,6 +142,11 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { Ok(()) } + #[tracing::instrument( + name = "ProtectiveReadsOutputHandler::handle_l1_batch", + skip_all, + fields(l1_batch = %updates_manager.l1_batch.number) + )] async fn handle_l1_batch( &mut self, updates_manager: Arc, diff --git a/etc/env/configs/prover-local.toml b/etc/env/configs/prover-local.toml index 1850871bc2c8..c5a4fec8d197 100644 --- a/etc/env/configs/prover-local.toml +++ b/etc/env/configs/prover-local.toml @@ -1,8 +1,9 @@ # Config for running prover locally -__imports__ = [ "base", "l1-inits/.init.env", "l2-inits/dev.init.env" ] +__imports__ = [ "base", "l1-inits/.init.env", "l2-inits/prover-local.init.env" ] [eth_sender.sender] proof_sending_mode = "OnlyRealProofs" +pubdata_sending_mode = "Blobs" [fri_prover] cloud_type = "Local" diff --git a/infrastructure/zk/src/database.ts b/infrastructure/zk/src/database.ts index f83227ed37e5..a30d262b8389 100644 --- a/infrastructure/zk/src/database.ts +++ b/infrastructure/zk/src/database.ts @@ -143,10 +143,10 @@ export async function setupForDal(dalPath: DalPath, dbUrl: string, shouldCheck: process.chdir(process.env.ZKSYNC_HOME as string); } -export async function setup(opts: DbOpts) { +export async function setup(opts: DbOpts, shouldCheck: boolean = true) { let dals = getDals(opts); for (const [dalPath, dbUrl] of dals.entries()) { - await setupForDal(dalPath, dbUrl, true); + await setupForDal(dalPath, dbUrl, shouldCheck); } } diff --git a/infrastructure/zk/src/hyperchain_wizard.ts b/infrastructure/zk/src/hyperchain_wizard.ts index ba4c85454563..166ad2b19f5f 100644 --- a/infrastructure/zk/src/hyperchain_wizard.ts +++ b/infrastructure/zk/src/hyperchain_wizard.ts @@ -746,7 +746,8 @@ async function configDemoHyperchain(cmd: Command) { testTokenOptions: { envFile: process.env.CHAIN_ETH_NETWORK! }, // TODO(EVM-573): support Validium mode runObservability: false, - deploymentMode: DeploymentMode.Rollup + deploymentMode: DeploymentMode.Rollup, + shouldCheckPostgres: true }); env.mergeInitToEnv(); diff --git a/infrastructure/zk/src/init.ts b/infrastructure/zk/src/init.ts index 9ed6e178e51e..6dbad67b489c 100644 --- a/infrastructure/zk/src/init.ts +++ b/infrastructure/zk/src/init.ts @@ -72,9 +72,9 @@ const initSetup = async ({ ]); }; -const initDatabase = async (): Promise => { +const initDatabase = async (shouldCheck: boolean = true): Promise => { await announced('Drop postgres db', db.drop({ core: true, prover: true })); - await announced('Setup postgres db', db.setup({ core: true, prover: true })); + await announced('Setup postgres db', db.setup({ core: true, prover: true }, shouldCheck)); await announced('Clean rocksdb', clean(`db/${process.env.ZKSYNC_ENV!}`)); await announced('Clean backups', clean(`backups/${process.env.ZKSYNC_ENV!}`)); }; @@ -145,6 +145,7 @@ type InitDevCmdActionOptions = InitSetupOptions & { baseTokenName?: string; validiumMode?: boolean; localLegacyBridgeTesting?: boolean; + shouldCheckPostgres: boolean; // Whether to perform `cargo sqlx prepare --check` }; export const initDevCmdAction = async ({ skipEnvSetup, @@ -155,7 +156,8 @@ export const initDevCmdAction = async ({ baseTokenName, runObservability, validiumMode, - localLegacyBridgeTesting + localLegacyBridgeTesting, + shouldCheckPostgres }: InitDevCmdActionOptions): Promise => { if (localLegacyBridgeTesting) { await makeEraChainIdSameAsCurrent(); @@ -174,7 +176,7 @@ export const initDevCmdAction = async ({ await deployTestTokens(testTokenOptions); } await initBridgehubStateTransition(); - await initDatabase(); + await initDatabase(shouldCheckPostgres); await initHyperchain({ includePaymaster: true, baseTokenName, @@ -251,6 +253,7 @@ export const initCommand = new Command('init') '--local-legacy-bridge-testing', 'used to test LegacyBridge compatibily. The chain will have the same id as the era chain id, while eraChainId in L2SharedBridge will be 0' ) + .option('--should-check-postgres', 'Whether to perform cargo sqlx prepare --check during database setup', true) .description('Deploys the shared bridge and registers a hyperchain locally, as quickly as possible.') .action(initDevCmdAction); diff --git a/infrastructure/zk/src/reinit.ts b/infrastructure/zk/src/reinit.ts index 65f0b73d6540..abbbe28d72dc 100644 --- a/infrastructure/zk/src/reinit.ts +++ b/infrastructure/zk/src/reinit.ts @@ -16,7 +16,8 @@ const reinitDevCmdAction = async (): Promise => { skipTestTokenDeployment: true, // TODO(EVM-573): support Validium mode runObservability: true, - deploymentMode: DeploymentMode.Rollup + deploymentMode: DeploymentMode.Rollup, + shouldCheckPostgres: false }); }; diff --git a/prover/crates/bin/proof_fri_compressor/src/compressor.rs b/prover/crates/bin/proof_fri_compressor/src/compressor.rs index 0d9083a57c5c..dc5ca939d9b4 100644 --- a/prover/crates/bin/proof_fri_compressor/src/compressor.rs +++ b/prover/crates/bin/proof_fri_compressor/src/compressor.rs @@ -54,7 +54,9 @@ impl ProofCompressor { } } + #[tracing::instrument(skip(proof, _compression_mode))] pub fn compress_proof( + l1_batch: L1BatchNumber, proof: ZkSyncRecursionLayerProof, _compression_mode: u8, ) -> anyhow::Result { @@ -173,8 +175,7 @@ impl JobProcessor for ProofCompressor { let compression_mode = self.compression_mode; let block_number = *job_id; tokio::task::spawn_blocking(move || { - let _span = tracing::info_span!("compress", %block_number).entered(); - Self::compress_proof(job, compression_mode) + Self::compress_proof(block_number, job, compression_mode) }) } diff --git a/prover/crates/bin/proof_fri_compressor/src/initial_setup_keys.rs b/prover/crates/bin/proof_fri_compressor/src/initial_setup_keys.rs index 222d2ec69cc5..172c6054b7db 100644 --- a/prover/crates/bin/proof_fri_compressor/src/initial_setup_keys.rs +++ b/prover/crates/bin/proof_fri_compressor/src/initial_setup_keys.rs @@ -1,5 +1,6 @@ use std::{fs::create_dir_all, io::Cursor, path::Path, time::Duration}; +#[tracing::instrument(skip_all)] fn download_initial_setup(key_download_url: &str) -> reqwest::Result> { tracing::info!("Downloading initial setup from {:?}", key_download_url); @@ -32,6 +33,7 @@ fn download_initial_setup(key_download_url: &str) -> reqwest::Result> { .and_then(|response| response.bytes().map(|bytes| bytes.to_vec())) } +#[tracing::instrument(skip_all)] pub fn download_initial_setup_keys_if_not_present( initial_setup_key_path: &str, key_download_url: &str, diff --git a/prover/crates/bin/prover_fri/src/gpu_prover_job_processor.rs b/prover/crates/bin/prover_fri/src/gpu_prover_job_processor.rs index 6148ca3e0aed..04146473f646 100644 --- a/prover/crates/bin/prover_fri/src/gpu_prover_job_processor.rs +++ b/prover/crates/bin/prover_fri/src/gpu_prover_job_processor.rs @@ -99,6 +99,7 @@ pub mod gpu_prover { } } + #[tracing::instrument(name = "Prover::get_setup_data", skip_all)] fn get_setup_data( &self, key: ProverServiceDataKey, @@ -124,6 +125,11 @@ pub mod gpu_prover { }) } + #[tracing::instrument( + name = "Prover::prove", + skip_all, + fields(l1_batch = %job.witness_vector_artifacts.prover_job.block_number) + )] pub fn prove( job: GpuProverJob, setup_data: Arc, diff --git a/prover/crates/bin/prover_fri/src/socket_listener.rs b/prover/crates/bin/prover_fri/src/socket_listener.rs index e65471409e1e..2a0a4d67e894 100644 --- a/prover/crates/bin/prover_fri/src/socket_listener.rs +++ b/prover/crates/bin/prover_fri/src/socket_listener.rs @@ -107,6 +107,7 @@ pub mod gpu_socket_listener { } } + #[tracing::instrument(name = "SocketListener::handle_incoming_file", skip_all)] async fn handle_incoming_file(&self, mut stream: TcpStream) -> anyhow::Result<()> { let mut assembly: Vec = vec![]; let started_at = Instant::now(); @@ -123,8 +124,11 @@ pub mod gpu_socket_listener { METRICS.witness_vector_blob_time[&(file_size_in_gb as u64)] .observe(started_at.elapsed()); - let witness_vector = bincode::deserialize::(&assembly) - .context("Failed deserializing witness vector")?; + let deserialize_span = tracing::info_span!("deserialize_witness_vector"); + let witness_vector = deserialize_span.in_scope(|| { + bincode::deserialize::(&assembly) + .context("Failed deserializing witness vector") + })?; tracing::info!( "Deserialized witness vector after {:?}", started_at.elapsed() diff --git a/prover/crates/bin/prover_fri_gateway/src/proof_gen_data_fetcher.rs b/prover/crates/bin/prover_fri_gateway/src/proof_gen_data_fetcher.rs index 809df8ae8225..f3c850ae56c8 100644 --- a/prover/crates/bin/prover_fri_gateway/src/proof_gen_data_fetcher.rs +++ b/prover/crates/bin/prover_fri_gateway/src/proof_gen_data_fetcher.rs @@ -30,6 +30,11 @@ impl ProofGenDataFetcher { } impl ProofGenDataFetcher { + #[tracing::instrument( + name = "ProofGenDataFetcher::save_proof_gen_data", + skip_all, + fields(l1_batch = %data.l1_batch_number) + )] async fn save_proof_gen_data(&self, data: ProofGenerationData) { let store = &*self.0.blob_store; let witness_inputs = store diff --git a/prover/crates/bin/witness_generator/src/basic_circuits.rs b/prover/crates/bin/witness_generator/src/basic_circuits.rs index 859b8515805a..76d3dce5ac83 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits.rs @@ -185,6 +185,7 @@ impl JobProcessor for BasicWitnessGenerator { }) } + #[tracing::instrument(skip_all, fields(l1_batch = %job_id))] async fn save_result( &self, job_id: L1BatchNumber, @@ -243,7 +244,7 @@ impl JobProcessor for BasicWitnessGenerator { } } -#[allow(clippy::too_many_arguments)] +#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] async fn process_basic_circuits_job( object_store: &dyn ObjectStore, started_at: Instant, @@ -268,6 +269,7 @@ async fn process_basic_circuits_job( } } +#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] async fn update_database( prover_connection_pool: &ConnectionPool, started_at: Instant, @@ -305,6 +307,7 @@ async fn update_database( .await; } +#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] async fn get_artifacts( block_number: L1BatchNumber, object_store: &dyn ObjectStore, @@ -313,6 +316,7 @@ async fn get_artifacts( BasicWitnessGeneratorJob { block_number, job } } +#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] async fn save_scheduler_artifacts( block_number: L1BatchNumber, scheduler_partial_input: SchedulerCircuitInstanceWitness< @@ -341,6 +345,7 @@ async fn save_scheduler_artifacts( object_store.put(block_number, &wrapper).await.unwrap() } +#[tracing::instrument(skip_all, fields(l1_batch = %block_number, circuit_id = %circuit_id))] async fn save_recursion_queue( block_number: L1BatchNumber, circuit_id: u8, @@ -362,11 +367,7 @@ async fn save_recursion_queue( (circuit_id, blob_url, basic_circuit_count) } -async fn generate_witness( - block_number: L1BatchNumber, - object_store: &dyn ObjectStore, - input: WitnessInputData, -) -> ( +type Witness = ( Vec<(u8, String)>, Vec<(u8, String, usize)>, SchedulerCircuitInstanceWitness< @@ -375,7 +376,14 @@ async fn generate_witness( GoldilocksExt2, >, BlockAuxilaryOutputWitness, -) { +); + +#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] +async fn generate_witness( + block_number: L1BatchNumber, + object_store: &dyn ObjectStore, + input: WitnessInputData, +) -> Witness { let bootloader_contents = expand_bootloader_contents( &input.vm_run_data.initial_heap_content, input.vm_run_data.protocol_version, @@ -397,7 +405,11 @@ async fn generate_witness( let (circuit_sender, mut circuit_receiver) = tokio::sync::mpsc::channel(1); let (queue_sender, mut queue_receiver) = tokio::sync::mpsc::channel(1); + let make_circuits_span = tracing::info_span!("make_circuits"); + let make_circuits_span_copy = make_circuits_span.clone(); let make_circuits = tokio::task::spawn_blocking(move || { + let span = tracing::info_span!(parent: make_circuits_span_copy, "make_circuits_blocking"); + let witness_storage = WitnessStorage::new(input.vm_run_data.witness_block_state); let storage_view = StorageView::new(witness_storage).to_rc_ptr(); @@ -432,37 +444,42 @@ async fn generate_witness( path, input.eip_4844_blobs.blobs(), |circuit| { - circuit_sender.blocking_send(circuit).unwrap(); + let parent_span = span.clone(); + tracing::info_span!(parent: parent_span, "send_circuit").in_scope(|| { + circuit_sender.blocking_send(circuit).unwrap(); + }); }, |a, b, c| queue_sender.blocking_send((a as u8, b, c)).unwrap(), ); (scheduler_witness, block_witness) - }); + }) + .instrument(make_circuits_span); let mut circuit_urls = vec![]; let mut recursion_urls = vec![]; let mut circuits_present = HashSet::::new(); + let save_circuits_span = tracing::info_span!("save_circuits"); let save_circuits = async { loop { tokio::select! { - Some(circuit) = circuit_receiver.recv() => { + Some(circuit) = circuit_receiver.recv().instrument(tracing::info_span!("wait_for_circuit")) => { circuits_present.insert(circuit.numeric_circuit_type()); circuit_urls.push( save_circuit(block_number, circuit, circuit_urls.len(), object_store).await, ); } - Some((circuit_id, queue, inputs)) = queue_receiver.recv() => recursion_urls.push( - save_recursion_queue(block_number, circuit_id, queue, &inputs, object_store) - .await, - ), + Some((circuit_id, queue, inputs)) = queue_receiver.recv().instrument(tracing::info_span!("wait_for_queue")) => { + let urls = save_recursion_queue(block_number, circuit_id, queue, &inputs, object_store).await; + recursion_urls.push(urls); + } else => break, }; } - }; + }.instrument(save_circuits_span); - let (witnesses, ()) = tokio::join!(make_circuits, save_circuits); + let (witnesses, ()) = tokio::join!(make_circuits, save_circuits,); let (mut scheduler_witness, block_aux_witness) = witnesses.unwrap(); recursion_urls.retain(|(circuit_id, _, _)| circuits_present.contains(circuit_id)); diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation.rs index 76703d0d874d..b0eff0f100e1 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation.rs @@ -93,6 +93,10 @@ impl LeafAggregationWitnessGenerator { } } + #[tracing::instrument( + skip_all, + fields(l1_batch = %leaf_job.block_number, circuit_id = %leaf_job.circuit_id) + )] pub fn process_job_sync( leaf_job: LeafAggregationWitnessGeneratorJob, started_at: Instant, @@ -151,11 +155,7 @@ impl JobProcessor for LeafAggregationWitnessGenerator { job: LeafAggregationWitnessGeneratorJob, started_at: Instant, ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || { - let block_number = job.block_number; - let _span = tracing::info_span!("leaf_aggregation", %block_number).entered(); - Ok(Self::process_job_sync(job, started_at)) - }) + tokio::task::spawn_blocking(move || Ok(Self::process_job_sync(job, started_at))) } async fn save_result( @@ -209,6 +209,10 @@ impl JobProcessor for LeafAggregationWitnessGenerator { } } +#[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) +)] pub async fn prepare_leaf_aggregation_job( metadata: LeafAggregationJobMetadata, object_store: &dyn ObjectStore, @@ -257,6 +261,10 @@ pub async fn prepare_leaf_aggregation_job( }) } +#[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number, circuit_id = %job.circuit_id) +)] pub fn process_leaf_aggregation_job( started_at: Instant, job: LeafAggregationWitnessGeneratorJob, @@ -288,6 +296,10 @@ pub fn process_leaf_aggregation_job( } } +#[tracing::instrument( + skip_all, + fields(l1_batch = %block_number, circuit_id = %circuit_id) +)] async fn update_database( prover_connection_pool: &ConnectionPool, started_at: Instant, @@ -362,6 +374,10 @@ async fn update_database( transaction.commit().await.unwrap(); } +#[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) +)] async fn get_artifacts( metadata: &LeafAggregationJobMetadata, object_store: &dyn ObjectStore, @@ -376,6 +392,10 @@ async fn get_artifacts( .unwrap_or_else(|_| panic!("leaf aggregation job artifacts missing: {:?}", key)) } +#[tracing::instrument( + skip_all, + fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) +)] async fn save_artifacts( artifacts: LeafAggregationArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/node_aggregation.rs b/prover/crates/bin/witness_generator/src/node_aggregation.rs index 36b13d4357a9..b49928d6d27b 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation.rs @@ -93,6 +93,10 @@ impl NodeAggregationWitnessGenerator { } } + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number, circuit_id = %job.circuit_id) + )] pub fn process_job_sync( job: NodeAggregationWitnessGeneratorJob, started_at: Instant, @@ -182,13 +186,13 @@ impl JobProcessor for NodeAggregationWitnessGenerator { job: NodeAggregationWitnessGeneratorJob, started_at: Instant, ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || { - let block_number = job.block_number; - let _span = tracing::info_span!("node_aggregation", %block_number).entered(); - Ok(Self::process_job_sync(job, started_at)) - }) + tokio::task::spawn_blocking(move || Ok(Self::process_job_sync(job, started_at))) } + #[tracing::instrument( + skip_all, + fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) + )] async fn save_result( &self, job_id: u32, @@ -233,6 +237,10 @@ impl JobProcessor for NodeAggregationWitnessGenerator { } } +#[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) +)] pub async fn prepare_job( metadata: NodeAggregationJobMetadata, object_store: &dyn ObjectStore, @@ -284,6 +292,10 @@ pub async fn prepare_job( } #[allow(clippy::too_many_arguments)] +#[tracing::instrument( + skip_all, + fields(l1_batch = %block_number, circuit_id = %circuit_id) +)] async fn update_database( prover_connection_pool: &ConnectionPool, started_at: Instant, @@ -351,6 +363,10 @@ async fn update_database( transaction.commit().await.unwrap(); } +#[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) +)] async fn get_artifacts( metadata: &NodeAggregationJobMetadata, object_store: &dyn ObjectStore, @@ -366,6 +382,10 @@ async fn get_artifacts( .unwrap_or_else(|_| panic!("node aggregation job artifacts missing: {:?}", key)) } +#[tracing::instrument( + skip_all, + fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) +)] async fn save_artifacts( artifacts: NodeAggregationArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/recursion_tip.rs b/prover/crates/bin/witness_generator/src/recursion_tip.rs index 2f55621fecaf..2a57ffff85ff 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip.rs @@ -92,6 +92,10 @@ impl RecursionTipWitnessGenerator { } } + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number) + )] pub fn process_job_sync( job: RecursionTipWitnessGeneratorJob, started_at: Instant, @@ -191,13 +195,13 @@ impl JobProcessor for RecursionTipWitnessGenerator { job: RecursionTipWitnessGeneratorJob, started_at: Instant, ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || { - let block_number = job.block_number; - let _span = tracing::info_span!("recursion_tip", %block_number).entered(); - Ok(Self::process_job_sync(job, started_at)) - }) + tokio::task::spawn_blocking(move || Ok(Self::process_job_sync(job, started_at))) } + #[tracing::instrument( + skip_all, + fields(l1_batch = %job_id) + )] async fn save_result( &self, job_id: L1BatchNumber, @@ -272,6 +276,10 @@ impl JobProcessor for RecursionTipWitnessGenerator { } } +#[tracing::instrument( + skip_all, + fields(l1_batch = %l1_batch_number) +)] pub async fn prepare_job( l1_batch_number: L1BatchNumber, final_node_proof_job_ids: Vec<(u8, u32)>, diff --git a/prover/crates/bin/witness_generator/src/scheduler.rs b/prover/crates/bin/witness_generator/src/scheduler.rs index 80c4322e644e..f69d338061e2 100644 --- a/prover/crates/bin/witness_generator/src/scheduler.rs +++ b/prover/crates/bin/witness_generator/src/scheduler.rs @@ -74,6 +74,10 @@ impl SchedulerWitnessGenerator { } } + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number) + )] pub fn process_job_sync( job: SchedulerWitnessGeneratorJob, started_at: Instant, @@ -173,6 +177,10 @@ impl JobProcessor for SchedulerWitnessGenerator { }) } + #[tracing::instrument( + skip_all, + fields(l1_batch = %job_id) + )] async fn save_result( &self, job_id: L1BatchNumber, @@ -242,6 +250,10 @@ impl JobProcessor for SchedulerWitnessGenerator { } } +#[tracing::instrument( + skip_all, + fields(l1_batch = %l1_batch_number) +)] pub async fn prepare_job( l1_batch_number: L1BatchNumber, recursion_tip_job_id: u32, diff --git a/prover/crates/bin/witness_generator/src/utils.rs b/prover/crates/bin/witness_generator/src/utils.rs index a1046f258fc1..7671e2fd86db 100644 --- a/prover/crates/bin/witness_generator/src/utils.rs +++ b/prover/crates/bin/witness_generator/src/utils.rs @@ -122,6 +122,10 @@ impl StoredObject for SchedulerPartialInputWrapper { serialize_using_bincode!(); } +#[tracing::instrument( + skip_all, + fields(l1_batch = %block_number, circuit_id = %circuit.numeric_circuit_type()) +)] pub async fn save_circuit( block_number: L1BatchNumber, circuit: ZkSyncBaseLayerCircuit, @@ -143,6 +147,10 @@ pub async fn save_circuit( (circuit_id, blob_url) } +#[tracing::instrument( + skip_all, + fields(l1_batch = %block_number) +)] pub async fn save_recursive_layer_prover_input_artifacts( block_number: L1BatchNumber, aggregations: Vec<( @@ -174,6 +182,10 @@ pub async fn save_recursive_layer_prover_input_artifacts( ids_and_urls } +#[tracing::instrument( + skip_all, + fields(l1_batch = %block_number, circuit_id = %circuit_id) +)] pub async fn save_node_aggregations_artifacts( block_number: L1BatchNumber, circuit_id: u8, @@ -196,6 +208,7 @@ pub async fn save_node_aggregations_artifacts( .unwrap() } +#[tracing::instrument(skip_all)] pub async fn load_proofs_for_job_ids( job_ids: &[u32], object_store: &dyn ObjectStore, @@ -211,6 +224,7 @@ pub async fn load_proofs_for_job_ids( /// Note that recursion tip may not have proofs for some specific circuits (because the batch didn't contain them). /// In this scenario, we still need to pass a proof, but it won't be taken into account during proving. /// For this scenario, we use an empty_proof, but any proof would suffice. +#[tracing::instrument(skip_all)] pub async fn load_proofs_for_recursion_tip( job_ids: Vec<(u8, u32)>, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_vector_generator/src/generator.rs b/prover/crates/bin/witness_vector_generator/src/generator.rs index 5574f0f1578d..e26173067fb0 100644 --- a/prover/crates/bin/witness_vector_generator/src/generator.rs +++ b/prover/crates/bin/witness_vector_generator/src/generator.rs @@ -61,6 +61,10 @@ impl WitnessVectorGenerator { } } + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number) + )] pub fn generate_witness_vector( job: ProverJob, keystore: &Keystore, @@ -134,6 +138,11 @@ impl JobProcessor for WitnessVectorGenerator { }) } + #[tracing::instrument( + name = "WitnessVectorGenerator::save_result", + skip_all, + fields(l1_batch = %artifacts.prover_job.block_number) + )] async fn save_result( &self, job_id: Self::JobId, diff --git a/prover/docs/03_launch.md b/prover/docs/03_launch.md index 395a35e90227..203fb6e8cecf 100644 --- a/prover/docs/03_launch.md +++ b/prover/docs/03_launch.md @@ -15,6 +15,8 @@ It will create a config similar to `dev`, but with: You can always switch back to dev config via `zk env dev`. +**Important:** If you change environments, you have to do `zk init` again. + ## Enter the prover workspace All the commands for binaries in the prover workspace must be done from the prover folder: