refactor(vm-runner): Allow switching between VMs for latest protocol version – follow ups #2567

Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci-core-reusable.yml
@@ -147,7 +147,7 @@ jobs:
base_token: ["Eth", "Custom"]
deployment_mode: ["Rollup", "Validium"]
env:
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}"
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,vm_playground,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}"

runs-on: [matterlabs-ci-runner]
steps:
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -1,15 +1,17 @@
use async_trait::async_trait;
use zksync_config::configs::ExperimentalVmPlaygroundConfig;
use zksync_node_framework_derive::{FromContext, IntoContext};
use zksync_state_keeper::MainBatchExecutor;
use zksync_types::L2ChainId;
use zksync_vm_runner::{
impls::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask},
ConcurrentOutputHandlerFactoryTask,
};

use crate::{
implementations::resources::pools::{MasterPool, PoolResource},
implementations::resources::{
healthcheck::AppHealthCheckResource,
pools::{MasterPool, PoolResource},
},
StopReceiver, Task, TaskId, WiringError, WiringLayer,
};

@@ -32,6 +34,8 @@ impl VmPlaygroundLayer {
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
@@ -55,7 +59,10 @@ impl WiringLayer for VmPlaygroundLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let Input { master_pool } = input;
let Input {
master_pool,
app_health,
} = input;

// - 1 connection for `StorageSyncTask` which can hold a long-term connection in case it needs to
// catch up cache.
@@ -64,19 +71,21 @@ impl WiringLayer for VmPlaygroundLayer {
// - 1 connection for the only running VM instance.
let connection_pool = master_pool.get_custom(3).await?;

let mut batch_executor = Box::new(MainBatchExecutor::new(false, false));
batch_executor.set_fast_vm_mode(self.config.fast_vm_mode);

let (playground, tasks) = VmPlayground::new(
connection_pool,
batch_executor,
self.config.fast_vm_mode,
self.config.db_path,
self.zksync_network_id,
self.config.first_processed_batch,
self.config.reset,
)
.await?;

app_health
.0
.insert_component(playground.health_check())
.map_err(WiringError::internal)?;

Ok(Output {
output_handler_factory_task: tasks.output_handler_factory_task,
loader_task: tasks.loader_task,
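Side note on the wiring change above (an illustration, not part of the diff): the layer now passes `fast_vm_mode` into `VmPlayground::new()` and registers the playground's health check with the app-wide `AppHealthCheck` before any tasks are spawned. The updater/subscriber split behind `HealthUpdater` / `ReactiveHealthCheck` can be sketched with a self-contained `tokio` watch channel; `MiniHealthUpdater`, `MiniHealthCheck` and `MiniStatus` below are hypothetical stand-ins, and the sketch assumes `tokio` with the `macros`, `rt-multi-thread`, and `sync` features.

```rust
use tokio::sync::watch;

/// Hypothetical stand-in for `HealthStatus` (details omitted).
#[derive(Clone, Debug, PartialEq)]
enum MiniStatus {
    NotReady,
    Ready,
}

/// Writer half kept by the component, cf. `HealthUpdater`.
struct MiniHealthUpdater(watch::Sender<MiniStatus>);

/// Reader half handed to the app-wide aggregator, cf. `ReactiveHealthCheck`.
#[derive(Clone)]
struct MiniHealthCheck(watch::Receiver<MiniStatus>);

impl MiniHealthUpdater {
    fn new() -> (MiniHealthCheck, Self) {
        let (tx, rx) = watch::channel(MiniStatus::NotReady);
        (MiniHealthCheck(rx), Self(tx))
    }

    fn update(&self, status: MiniStatus) {
        // `send_replace` swaps the stored value; subscribers observe it lazily.
        self.0.send_replace(status);
    }
}

impl MiniHealthCheck {
    async fn wait_for_ready(&mut self) {
        // Resolves immediately if the current value already satisfies the predicate.
        self.0
            .wait_for(|status| *status == MiniStatus::Ready)
            .await
            .expect("health updater was dropped");
    }
}

#[tokio::main]
async fn main() {
    let (check, updater) = MiniHealthUpdater::new();

    // The subscriber can be registered (e.g., in an app-level health map) before the component runs.
    let mut registered = check.clone();
    let waiter = tokio::spawn(async move { registered.wait_for_ready().await });

    // Later, the component reports readiness.
    updater.update(MiniStatus::Ready);
    waiter.await.unwrap();
    println!("component is ready");
}
```

This is only the generic pattern; in the actual layer, `playground.health_check()` hands back the subscriber half and `app_health.0.insert_component(...)` stores it.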
2 changes: 2 additions & 0 deletions core/node/vm_runner/Cargo.toml
@@ -22,7 +22,9 @@ zksync_utils.workspace = true
zksync_prover_interface.workspace = true
zksync_object_store.workspace = true
zksync_vm_utils.workspace = true
zksync_health_check.workspace = true

serde.workspace = true
tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
async-trait.workspace = true
51 changes: 45 additions & 6 deletions core/node/vm_runner/src/impls/playground.rs
@@ -6,26 +6,40 @@ use std::{

use anyhow::Context as _;
use async_trait::async_trait;
use serde::Serialize;
use tokio::{
fs,
sync::{oneshot, watch},
};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_state::RocksdbStorage;
use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, UpdatesManager};
use zksync_types::{L1BatchNumber, L2ChainId};
use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager};
use zksync_types::{vm::FastVmMode, L1BatchNumber, L2ChainId};

use crate::{
ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory,
StorageSyncTask, VmRunner, VmRunnerIo, VmRunnerStorage,
};

#[derive(Debug, Serialize)]
struct VmPlaygroundHealth {
vm_mode: FastVmMode,
last_processed_batch: L1BatchNumber,
}

impl From<VmPlaygroundHealth> for Health {
fn from(health: VmPlaygroundHealth) -> Self {
Health::from(HealthStatus::Ready).with_details(health)
}
}

/// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory
/// (so that the playground doesn't repeatedly process the same batches after a restart).
#[derive(Debug)]
pub struct VmPlayground {
pool: ConnectionPool<Core>,
batch_executor: Box<dyn BatchExecutor>,
batch_executor: MainBatchExecutor,
rocksdb_path: String,
chain_id: L2ChainId,
io: VmPlaygroundIo,
@@ -39,14 +53,14 @@ impl VmPlayground {
/// Creates a new playground.
pub async fn new(
pool: ConnectionPool<Core>,
batch_executor: Box<dyn BatchExecutor>,
vm_mode: FastVmMode,
rocksdb_path: String,
chain_id: L2ChainId,
first_processed_batch: L1BatchNumber,
reset_state: bool,
) -> anyhow::Result<(Self, VmPlaygroundTasks)> {
tracing::info!(
"Starting VM playground with executor {batch_executor:?}, first processed batch is #{first_processed_batch} \
"Starting VM playground with mode {vm_mode:?}, first processed batch is #{first_processed_batch} \
(reset processing: {reset_state:?})"
);

@@ -59,9 +73,14 @@ impl VmPlayground {
latest_processed_batch.unwrap_or(first_processed_batch)
};

let mut batch_executor = MainBatchExecutor::new(false, false);
batch_executor.set_fast_vm_mode(vm_mode);

let io = VmPlaygroundIo {
cursor_file_path,
vm_mode,
latest_processed_batch: Arc::new(watch::channel(latest_processed_batch).0),
health_updater: Arc::new(ReactiveHealthCheck::new("vm_playground").1),
};
let (output_handler_factory, output_handler_factory_task) =
ConcurrentOutputHandlerFactory::new(
@@ -92,6 +111,11 @@ impl VmPlayground {
))
}

/// Returns a health check for this component.
pub fn health_check(&self) -> ReactiveHealthCheck {
self.io.health_updater.subscribe()
}

#[cfg(test)]
pub(crate) fn io(&self) -> &VmPlaygroundIo {
&self.io
@@ -123,6 +147,8 @@ impl VmPlayground {
.with_context(|| format!("cannot create dir `{}`", self.rocksdb_path))?;

if let Some(reset_to_batch) = self.reset_to_batch {
self.io.health_updater.update(HealthStatus::Affected.into());

self.reset_rocksdb_cache(reset_to_batch).await?;
self.io
.write_cursor(reset_to_batch)
Expand All @@ -131,6 +157,8 @@ impl VmPlayground {
tracing::info!("Finished resetting playground state");
}

self.io.update_health();

let (loader, loader_task) = VmRunnerStorage::new(
self.pool.clone(),
self.rocksdb_path,
Expand All @@ -144,7 +172,7 @@ impl VmPlayground {
Box::new(self.io),
Arc::new(loader),
Box::new(self.output_handler_factory),
self.batch_executor,
Box::new(self.batch_executor),
);
vm_runner.run(stop_receiver).await
}
@@ -184,9 +212,11 @@ pub struct VmPlaygroundTasks {
#[derive(Debug, Clone)]
pub struct VmPlaygroundIo {
cursor_file_path: PathBuf,
vm_mode: FastVmMode,
// We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes
// aren't guaranteed to be atomic.
latest_processed_batch: Arc<watch::Sender<L1BatchNumber>>,
health_updater: Arc<HealthUpdater>,
}

impl VmPlaygroundIo {
@@ -218,6 +248,14 @@ impl VmPlaygroundIo {
})
}

fn update_health(&self) {
let health = VmPlaygroundHealth {
vm_mode: self.vm_mode,
last_processed_batch: *self.latest_processed_batch.borrow(),
};
self.health_updater.update(health.into());
}

#[cfg(test)]
pub(crate) fn subscribe_to_completed_batches(&self) -> watch::Receiver<L1BatchNumber> {
self.latest_processed_batch.subscribe()
@@ -268,6 +306,7 @@ impl VmRunnerIo for VmPlaygroundIo {
self.write_cursor(l1_batch_number).await?;
// We should only update the in-memory value after the write to the cursor file succeeded.
self.latest_processed_batch.send_replace(l1_batch_number);
self.update_health();
Ok(())
}
}
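For readers skimming the cursor logic above, here is the ordering it enforces, boiled down to a self-contained sketch (hypothetical `Cursor` type, not repo code; assumes `tokio` with the `fs`, `macros`, `rt-multi-thread` and `sync` features plus `anyhow`): the batch number is written to the plain-text cursor file first, and the in-memory `watch` value is bumped only after that write succeeds, so the in-memory cursor never runs ahead of what is persisted on disk.

```rust
use std::path::PathBuf;

use anyhow::Context as _;
use tokio::{fs, sync::watch};

/// Hypothetical, trimmed-down cursor: a plain-text file plus an in-memory watch channel.
struct Cursor {
    file_path: PathBuf,
    latest: watch::Sender<u32>,
}

impl Cursor {
    async fn advance(&self, batch_number: u32) -> anyhow::Result<()> {
        // Persist first...
        fs::write(&self.file_path, batch_number.to_string())
            .await
            .with_context(|| format!("failed writing cursor file `{}`", self.file_path.display()))?;
        // ...and only then update the in-memory value, mirroring the ordering in `VmPlaygroundIo`.
        self.latest.send_replace(batch_number);
        Ok(())
    }

    async fn read_persisted(&self) -> anyhow::Result<Option<u32>> {
        match fs::read_to_string(&self.file_path).await {
            Ok(contents) => Ok(Some(contents.trim().parse().context("malformed cursor file")?)),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(err).context("failed reading cursor file"),
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cursor = Cursor {
        file_path: std::env::temp_dir().join("vm_playground_cursor_demo"),
        latest: watch::channel(0).0,
    };
    cursor.advance(1).await?;
    assert_eq!(cursor.read_persisted().await?, Some(1));
    assert_eq!(*cursor.latest.borrow(), 1);
    Ok(())
}
```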
17 changes: 13 additions & 4 deletions core/node/vm_runner/src/tests/playground.rs
@@ -1,8 +1,8 @@
use test_casing::test_casing;
use tokio::sync::watch;
use zksync_health_check::HealthStatus;
use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
use zksync_state::RocksdbStorage;
use zksync_state_keeper::MainBatchExecutor;
use zksync_types::vm::FastVmMode;

use super::*;
@@ -33,11 +33,9 @@ async fn run_playground(
.unwrap();
}

let mut batch_executor = MainBatchExecutor::new(false, false);
batch_executor.set_fast_vm_mode(FastVmMode::Shadow);
let (playground, playground_tasks) = VmPlayground::new(
pool.clone(),
Box::new(batch_executor),
FastVmMode::Shadow,
rocksdb_dir.path().to_str().unwrap().to_owned(),
genesis_params.config().l2_chain_id,
L1BatchNumber(0),
@@ -62,6 +60,7 @@ async fn run_playground(
.unwrap(),
L1BatchNumber(1)
);
let mut health_check = playground.health_check();

let mut completed_batches = playground_io.subscribe_to_completed_batches();
let task_handles = [
@@ -78,6 +77,16 @@ async fn run_playground(
.wait_for(|&number| number == L1BatchNumber(1))
.await
.unwrap();
health_check
.wait_for(|health| {
if !matches!(health.status(), HealthStatus::Ready) {
return false;
}
let health_details = health.details().unwrap();
assert_eq!(health_details["vm_mode"], "shadow");
health_details["last_processed_batch"] == 1_u64
})
.await;

// Check that playground I/O works correctly.
assert_eq!(
70 changes: 67 additions & 3 deletions core/tests/ts-integration/src/context-owner.ts
@@ -2,7 +2,7 @@ import * as zksync from 'zksync-ethers';
import * as ethers from 'ethers';
import { BigNumberish } from 'ethers';

import { TestContext, TestEnvironment, TestWallets } from './types';
import { NodeMode, TestContext, TestEnvironment, TestWallets } from './types';
import { lookupPrerequisites } from './prerequisites';
import { Reporter } from './reporter';
import { scaledGasPrice } from './helpers';
@@ -541,17 +541,81 @@ export class TestContextOwner {
this.reporter.finishAction();
}

/**
* Waits until the VM playground processes all L1 batches. If the playground runs the new VM in shadow mode, this means
* that there is no divergence between the old and new VM execution. Outputs a warning if the VM playground isn't running, or isn't running in shadow mode.
*/
private async waitForVmPlayground() {
Member (review comment): May I ask you to add somewhat verbose comments on what these functions do and why we need them? There is a good chance that, in the future, someone will have a hard time understanding what's going on here.

while (true) {
const lastProcessedBatch = await this.lastPlaygroundBatch();
if (lastProcessedBatch === undefined) {
this.reporter.warn('The node does not run VM playground; run to check old / new VM divergence');
break;
}
const lastNodeBatch = await this.l2Provider.getL1BatchNumber();

this.reporter.debug(`VM playground progress: L1 batch #${lastProcessedBatch} / ${lastNodeBatch}`);
if (lastProcessedBatch >= lastNodeBatch) {
break;
}
await zksync.utils.sleep(500);
}
}

/**
* Returns the number of the last L1 batch processed by the VM playground, taking it from the node's health endpoint.
* Returns `undefined` if the VM playground isn't running or isn't in shadow mode.
*/
private async lastPlaygroundBatch() {
interface VmPlaygroundHealth {
readonly status: string;
readonly details?: {
vm_mode?: string;
last_processed_batch?: number;
};
}

interface NodeHealth {
readonly components: {
vm_playground?: VmPlaygroundHealth;
};
}

const healthcheckPort = process.env.API_HEALTHCHECK_PORT ?? '3071';
const nodeHealth = (await (await fetch(`http://127.0.0.1:${healthcheckPort}/health`)).json()) as NodeHealth;
const playgroundHealth = nodeHealth.components.vm_playground;
if (playgroundHealth === undefined) {
return undefined;
}
if (playgroundHealth.status !== 'ready') {
throw new Error(`Unexpected VM playground health status: ${playgroundHealth.status}`);
}
if (playgroundHealth.details?.vm_mode !== 'shadow') {
this.reporter.warn(
`VM playground mode is '${playgroundHealth.details?.vm_mode}'; should be set to 'shadow' to check VM divergence`
);
return undefined;
}
return playgroundHealth.details?.last_processed_batch ?? 0;
}

/**
* Performs context deinitialization.
*/
async teardownContext() {
// Reset the reporter context.
this.reporter = new Reporter();
try {
if (this.env.nodeMode == NodeMode.Main && this.env.network === 'localhost') {
// Check that the VM execution hasn't diverged using the VM playground. The component and thus the main node
// will crash on divergence, so we just need to make sure that the test doesn't exit before the VM playground
// processes all batches on the node.
this.reporter.startAction('Waiting for VM playground to catch up');
await this.waitForVmPlayground();
this.reporter.finishAction();
}
this.reporter.startAction(`Tearing down the context`);

await this.collectFunds();

this.reporter.finishAction();
} catch (error: any) {
// Report the issue to the console and mark the last action as failed.
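As a reference for the health payload that `lastPlaygroundBatch` parses, the relevant slice of the `/health` response can be modeled in Rust roughly as follows (a sketch mirroring the TypeScript interfaces above; the JSON sample is hypothetical and the code assumes `serde` with the `derive` feature, `serde_json`, and `anyhow`):

```rust
use serde::Deserialize;

/// Mirrors the `vm_playground` component entry inspected by `lastPlaygroundBatch`.
#[derive(Debug, Deserialize)]
struct VmPlaygroundHealth {
    status: String,
    details: Option<VmPlaygroundDetails>,
}

#[derive(Debug, Deserialize)]
struct VmPlaygroundDetails {
    vm_mode: Option<String>,
    last_processed_batch: Option<u64>,
}

#[derive(Debug, Deserialize)]
struct Components {
    vm_playground: Option<VmPlaygroundHealth>,
}

#[derive(Debug, Deserialize)]
struct NodeHealth {
    components: Components,
}

fn main() -> anyhow::Result<()> {
    // Hypothetical payload limited to the fields the integration test reads.
    let payload = r#"{
        "components": {
            "vm_playground": {
                "status": "ready",
                "details": { "vm_mode": "shadow", "last_processed_batch": 1 }
            }
        }
    }"#;

    let health: NodeHealth = serde_json::from_str(payload)?;
    let playground = health
        .components
        .vm_playground
        .expect("VM playground component is not reported");
    assert_eq!(playground.status, "ready");
    assert_eq!(
        playground.details.as_ref().and_then(|d| d.vm_mode.as_deref()),
        Some("shadow")
    );
    assert_eq!(
        playground.details.and_then(|d| d.last_processed_batch),
        Some(1)
    );
    Ok(())
}
```

Unknown fields in the real response are simply ignored by `serde`, so the model only has to name the fields the test actually reads.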