
Commit

Merge branch 'main' of github.com:lambdaclass/zksync-era into validium_mode
ilitteri committed Dec 18, 2023
2 parents 8ea17e1 + c55a658 commit 432d278
Showing 67 changed files with 12,097 additions and 8,190 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-core-template.yml
@@ -42,6 +42,7 @@ jobs:
- external-node
- contract-verifier
- cross-external-nodes-checker
- snapshots-creator
steps:
- uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3
with:
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions core/bin/snapshots_creator/Cargo.toml
@@ -18,13 +18,13 @@ zksync_dal = { path = "../../lib/dal" }
zksync_env_config = { path = "../../lib/env_config" }
zksync_utils = { path = "../../lib/utils" }
zksync_types = { path = "../../lib/types" }
zksync_core = { path = "../../lib/zksync_core" }
zksync_object_store = { path = "../../lib/object_store" }
vlog = { path = "../../lib/vlog" }

anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
futures = "0.3"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0"

[dev-dependencies]
rand = "0.8"
49 changes: 48 additions & 1 deletion core/bin/snapshots_creator/src/chunking.rs
Expand Up @@ -3,7 +3,10 @@ use std::ops;
use zksync_types::{H256, U256};
use zksync_utils::u256_to_h256;

pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> ops::RangeInclusive<H256> {
pub(crate) fn get_chunk_hashed_keys_range(
chunk_id: u64,
chunks_count: u64,
) -> ops::RangeInclusive<H256> {
assert!(chunks_count > 0);
let mut stride = U256::MAX / chunks_count;
let stride_minus_one = if stride < U256::MAX {
@@ -20,3 +23,47 @@ pub fn get_chunk_hashed_keys_range(chunk_id: u64, chunks_count: u64) -> ops::Ran
}
u256_to_h256(start)..=u256_to_h256(end)
}

#[cfg(test)]
mod tests {
use zksync_utils::h256_to_u256;

use super::*;

#[test]
fn chunking_is_correct() {
for chunks_count in (2..10).chain([42, 256, 500, 1_001, 12_345]) {
println!("Testing chunks_count={chunks_count}");
let chunked_ranges: Vec<_> = (0..chunks_count)
.map(|chunk_id| get_chunk_hashed_keys_range(chunk_id, chunks_count))
.collect();

assert_eq!(*chunked_ranges[0].start(), H256::zero());
assert_eq!(
*chunked_ranges.last().unwrap().end(),
H256::repeat_byte(0xff)
);
for window in chunked_ranges.windows(2) {
let [prev_chunk, next_chunk] = window else {
unreachable!();
};
assert_eq!(
h256_to_u256(*prev_chunk.end()) + 1,
h256_to_u256(*next_chunk.start())
);
}

let chunk_sizes: Vec<_> = chunked_ranges
.iter()
.map(|chunk| h256_to_u256(*chunk.end()) - h256_to_u256(*chunk.start()) + 1)
.collect();

// Check that chunk sizes are roughly equal. Due to how chunks are constructed, the sizes
// of all chunks except for the last one are the same, and the last chunk size may be slightly smaller;
// the difference in sizes is less than the number of chunks.
let min_chunk_size = chunk_sizes.iter().copied().min().unwrap();
let max_chunk_size = chunk_sizes.iter().copied().max().unwrap();
assert!(max_chunk_size - min_chunk_size < U256::from(chunks_count));
}
}
}
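
Aside (not part of the diff): the chunking arithmetic in get_chunk_hashed_keys_range can be illustrated on a small keyspace. The sketch below is a standalone approximation using u64 instead of U256 and a hypothetical helper name; it only shows why every chunk except the last has the same size and why the size spread stays below the chunk count (the real function additionally guards the single-chunk case, where rounding the stride up would overflow).

// Standalone sketch: the round-up-stride chunking idea on a small u64 keyspace
// (0..=key_space_max) instead of the full U256 range.
fn chunk_range(chunk_id: u64, chunks_count: u64, key_space_max: u64) -> (u64, u64) {
    assert!(chunks_count > 1, "this sketch skips the single-chunk overflow guard");
    // Round the stride up so `chunks_count` chunks always cover the whole keyspace.
    let stride = key_space_max / chunks_count + 1;
    let start = stride * chunk_id;
    // Every chunk has size `stride` except possibly the last one, which is clamped
    // to the end of the keyspace and can be at most `chunks_count - 1` keys shorter.
    let end = (start + stride - 1).min(key_space_max);
    (start, end)
}

fn main() {
    // 10 chunks over 0..=255: chunks 0..=8 each cover 26 keys, the last covers 22.
    for chunk_id in 0..10 {
        println!("{:?}", chunk_range(chunk_id, 10, 255));
    }
}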
118 changes: 61 additions & 57 deletions core/bin/snapshots_creator/src/main.rs
@@ -1,11 +1,8 @@
mod chunking;

use std::{cmp::max, time::Duration};
//! Snapshot creator utility. Intended to run on a schedule, with each run creating a new snapshot.
use anyhow::Context as _;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::sync::{watch, Semaphore};
use vise::{Buckets, Gauge, Histogram, Metrics, Unit};
use zksync_config::{configs::PrometheusConfig, PostgresConfig, SnapshotsCreatorConfig};
use zksync_dal::ConnectionPool;
use zksync_env_config::{object_store::SnapshotsObjectStoreConfig, FromEnv};
@@ -18,28 +15,15 @@ use zksync_types::{
};
use zksync_utils::ceil_div;

use crate::chunking::get_chunk_hashed_keys_range;

#[derive(Debug, Metrics)]
#[metrics(prefix = "snapshots_creator")]
struct SnapshotsCreatorMetrics {
storage_logs_chunks_count: Gauge<u64>,

storage_logs_chunks_left_to_process: Gauge<u64>,

#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
snapshot_generation_duration: Histogram<Duration>,

snapshot_l1_batch: Gauge<u64>,

#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
storage_logs_processing_duration: Histogram<Duration>,
use crate::{
chunking::get_chunk_hashed_keys_range,
metrics::{FactoryDepsStage, StorageChunkStage, METRICS},
};

#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
factory_deps_processing_duration: Histogram<Duration>,
}
#[vise::register]
pub(crate) static METRICS: vise::Global<SnapshotsCreatorMetrics> = vise::Global::new();
mod chunking;
mod metrics;
#[cfg(test)]
mod tests;

async fn maybe_enable_prometheus_metrics(
stop_receiver: watch::Receiver<bool>,
@@ -70,14 +54,23 @@ async fn process_storage_logs_single_chunk(
) -> anyhow::Result<String> {
let _permit = semaphore.acquire().await?;
let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunks_count);
let latency = METRICS.storage_logs_processing_duration.start();
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;

let latency =
METRICS.storage_logs_processing_duration[&StorageChunkStage::LoadFromPostgres].start();
let logs = conn
.snapshots_creator_dal()
.get_storage_logs_chunk(miniblock_number, hashed_keys_range)
.await
.context("Error fetching storage logs count")?;
drop(conn);
let latency = latency.observe();
tracing::info!(
"Loaded chunk {chunk_id} ({} logs) from Postgres in {latency:?}",
logs.len()
);

let latency = METRICS.storage_logs_processing_duration[&StorageChunkStage::SaveToGcs].start();
let storage_logs_chunk = SnapshotStorageLogsChunk { storage_logs: logs };
let key = SnapshotStorageLogsStorageKey {
l1_batch_number,
Expand All @@ -87,18 +80,15 @@ async fn process_storage_logs_single_chunk(
.put(key, &storage_logs_chunk)
.await
.context("Error storing storage logs chunk in blob store")?;

let output_filepath_prefix = blob_store.get_storage_prefix::<SnapshotStorageLogsChunk>();
let output_filepath = format!("{output_filepath_prefix}/{filename}");
let latency = latency.observe();

let elapsed = latency.observe();
let tasks_left = METRICS.storage_logs_chunks_left_to_process.dec_by(1) - 1;
tracing::info!(
"Finished chunk number {chunk_id}, overall_progress {}/{}, step took {elapsed:?}, output stored in {output_filepath}",
chunks_count - tasks_left,
chunks_count
);

"Saved chunk {chunk_id} (overall progress {}/{chunks_count}) in {latency:?} to location: {output_filepath}",
chunks_count - tasks_left
);
Ok(output_filepath)
}

@@ -108,43 +98,58 @@ async fn process_factory_deps(
miniblock_number: MiniblockNumber,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<String> {
let latency = METRICS.factory_deps_processing_duration.start();
let mut conn = pool.access_storage_tagged("snapshots_creator").await?;

tracing::info!("Loading factory deps from Postgres...");
let latency =
METRICS.factory_deps_processing_duration[&FactoryDepsStage::LoadFromPostgres].start();
let factory_deps = conn
.snapshots_creator_dal()
.get_all_factory_deps(miniblock_number)
.await?;
let factory_deps = SnapshotFactoryDependencies { factory_deps };
drop(conn);
let latency = latency.observe();
tracing::info!("Loaded {} factory deps in {latency:?}", factory_deps.len());

tracing::info!("Saving factory deps to GCS...");
let latency = METRICS.factory_deps_processing_duration[&FactoryDepsStage::SaveToGcs].start();
let factory_deps = SnapshotFactoryDependencies { factory_deps };
let filename = blob_store
.put(l1_batch_number, &factory_deps)
.await
.context("Error storing factory deps in blob store")?;
let output_filepath_prefix = blob_store.get_storage_prefix::<SnapshotFactoryDependencies>();
let output_filepath = format!("{output_filepath_prefix}/{filename}");
let elapsed = latency.observe();
let latency = latency.observe();
tracing::info!(
"Finished factory dependencies, step took {elapsed:?} , output stored in {}",
output_filepath
"Saved {} factory deps in {latency:?} to location: {output_filepath}",
factory_deps.factory_deps.len()
);

Ok(output_filepath)
}

async fn run(
blob_store: Box<dyn ObjectStore>,
replica_pool: ConnectionPool,
master_pool: ConnectionPool,
min_chunk_count: u64,
) -> anyhow::Result<()> {
let latency = METRICS.snapshot_generation_duration.start();

let config = SnapshotsCreatorConfig::from_env().context("SnapshotsCreatorConfig::from_env")?;

let mut conn = replica_pool
.access_storage_tagged("snapshots_creator")
.await?;

// we subtract 1 so that after restore, EN node has at least one l1 batch to fetch
let l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await? - 1;
// We subtract 1 so that after restore, EN node has at least one L1 batch to fetch
let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?;
assert_ne!(
sealed_l1_batch_number,
L1BatchNumber(0),
"Cannot create snapshot when only the genesis L1 batch is present in Postgres"
);
let l1_batch_number = sealed_l1_batch_number - 1;

let mut master_conn = master_pool
.access_storage_tagged("snapshots_creator")
@@ -155,33 +160,31 @@ async fn run(
.await?
.is_some()
{
tracing::info!("Snapshot for L1 batch number {l1_batch_number} already exists, exiting",);
tracing::info!("Snapshot for L1 batch number {l1_batch_number} already exists, exiting");
return Ok(());
}
drop(master_conn);

let last_miniblock_number_in_batch = conn
let (_, last_miniblock_number_in_batch) = conn
.blocks_dal()
.get_miniblock_range_of_l1_batch(l1_batch_number)
.await?
.context("Error fetching last miniblock number")?
.1;
.context("Error fetching last miniblock number")?;
let distinct_storage_logs_keys_count = conn
.snapshots_creator_dal()
.get_distinct_storage_logs_keys_count(l1_batch_number)
.await?;

drop(conn);

let chunk_size = config.storage_logs_chunk_size;
// we force at least 10 chunks to avoid situations where only one chunk is created in tests
let chunks_count = max(10, ceil_div(distinct_storage_logs_keys_count, chunk_size));
// We force the minimum number of chunks to avoid situations where only one chunk is created in tests.
let chunks_count = ceil_div(distinct_storage_logs_keys_count, chunk_size).max(min_chunk_count);

METRICS.storage_logs_chunks_count.set(chunks_count);

tracing::info!(
"Creating snapshot for storage logs up to miniblock {last_miniblock_number_in_batch}, l1_batch {}",
l1_batch_number.0
"Creating snapshot for storage logs up to miniblock {last_miniblock_number_in_batch}, \
L1 batch {l1_batch_number}"
);
tracing::info!("Starting to generate {chunks_count} chunks of expected size {chunk_size}");

@@ -210,15 +213,14 @@ async fn run(
)
});
let mut storage_logs_output_files = futures::future::try_join_all(tasks).await?;
tracing::info!("Finished generating snapshot, storing progress in db");
// Sanity check: the number of files should equal the number of chunks.
assert_eq!(storage_logs_output_files.len(), chunks_count as usize);
storage_logs_output_files.sort();

tracing::info!("Finished generating snapshot, storing progress in Postgres");
let mut master_conn = master_pool
.access_storage_tagged("snapshots_creator")
.await?;

storage_logs_output_files.sort();
//sanity check
assert_eq!(storage_logs_output_files.len(), chunks_count as usize);
master_conn
.snapshots_dal()
.add_snapshot(
Expand All @@ -237,10 +239,12 @@ async fn run(
"storage_logs_chunks_count: {}",
METRICS.storage_logs_chunks_count.get()
);

Ok(())
}

/// Minimum number of storage log chunks to produce.
const MIN_CHUNK_COUNT: u64 = 10;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (stop_sender, stop_receiver) = watch::channel(false);
@@ -284,7 +288,7 @@ async fn main() -> anyhow::Result<()> {
.build()
.await?;

run(blob_store, replica_pool, master_pool).await?;
run(blob_store, replica_pool, master_pool, MIN_CHUNK_COUNT).await?;
tracing::info!("Finished running snapshot creator!");
stop_sender.send(true).ok();
Ok(())
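
Aside (not part of the diff): process_storage_logs_single_chunk bounds its parallelism with a Tokio Semaphore permit, and run() awaits the per-chunk futures with futures::future::try_join_all. A minimal standalone sketch of that pattern, with illustrative names and an illustrative concurrency limit:

// Standalone sketch of the bounded-concurrency pattern used above.
use std::sync::Arc;

use tokio::sync::Semaphore;

async fn process_chunk(semaphore: Arc<Semaphore>, chunk_id: u64) -> anyhow::Result<String> {
    // Only as many futures as there are permits proceed past this point at once.
    let _permit = semaphore.acquire().await?;
    // ... load the chunk from Postgres and upload it to the object store ...
    Ok(format!("chunk_{chunk_id}"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(4)); // illustrative concurrency limit
    let tasks = (0..10).map(|chunk_id| process_chunk(Arc::clone(&semaphore), chunk_id));
    // Fails fast on the first error; otherwise returns the outputs in input order.
    let outputs = futures::future::try_join_all(tasks).await?;
    println!("{outputs:?}");
    Ok(())
}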
