Skip to content

Commit

Permalink
refactor(db): Combine storage log pruning into single query (#2279)
Browse files Browse the repository at this point in the history
## What ❔

Minor follow-up for #2268
that combines both parts of log pruning into a single query.

## Why ❔

Easier to maintain and could be slightly more efficient since
intermediate data doesn't need to travel from Postgres to the node and
back.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored Jun 19, 2024
1 parent 3bf8966 commit 7f4e6ac
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 123 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

99 changes: 31 additions & 68 deletions core/lib/dal/src/pruning_dal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::ops;

use itertools::Itertools;
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use zksync_types::{L1BatchNumber, L2BlockNumber};

Expand Down Expand Up @@ -28,7 +27,6 @@ pub struct PruningInfo {
pub struct HardPruningStats {
pub deleted_l1_batches: u64,
pub deleted_l2_blocks: u64,
pub overwriting_logs: u64,
pub deleted_storage_logs: u64,
pub deleted_events: u64,
pub deleted_call_traces: u64,
Expand All @@ -42,14 +40,6 @@ enum PruneType {
Hard,
}

/// Raw database presentation of a primary key in the `miniblocks` table.
#[derive(Debug)]
struct StorageLogPrimaryKey {
hashed_key: Vec<u8>,
miniblock_number: i64,
operation_number: i32,
}

impl PruningDal<'_, '_> {
pub async fn get_pruning_info(&mut self) -> DalResult<PruningInfo> {
let pruning_info = sqlx::query!(
Expand Down Expand Up @@ -183,18 +173,9 @@ impl PruningDal<'_, '_> {
self.clear_transaction_fields(first_l2_block_to_prune..=last_l2_block_to_prune)
.await?;

// Storage log pruning is designed to use deterministic indexes and thus have predictable performance.
//
// - `get_pks_for_latest_logs` is guaranteed to use the block number index (that's the only WHERE condition),
// and the supplied range of blocks should be reasonably small.
// - `prune_storage_logs` is virtually guaranteed to use the primary key index since the query removes ranges w.r.t. this index.
//
// Combining these two queries or using more sophisticated queries leads to fluctuating performance due to
// unpredictable indexes being used.
let new_logs = self
.get_pks_for_latest_logs(first_l2_block_to_prune..=last_l2_block_to_prune)
let deleted_storage_logs = self
.prune_storage_logs(first_l2_block_to_prune..=last_l2_block_to_prune)
.await?;
let deleted_storage_logs = self.prune_storage_logs(&new_logs).await?;
let deleted_l1_batches = self.delete_l1_batches(last_l1_batch_to_prune).await?;
let deleted_l2_blocks = self.delete_l2_blocks(last_l2_block_to_prune).await?;

Expand All @@ -204,7 +185,6 @@ impl PruningDal<'_, '_> {
deleted_events,
deleted_l2_to_l1_logs,
deleted_call_traces,
overwriting_logs: new_logs.len() as u64,
deleted_storage_logs,
}
} else {
Expand Down Expand Up @@ -324,62 +304,45 @@ impl PruningDal<'_, '_> {
Ok(execution_result.rows_affected())
}

/// Gets primary keys for all latest logs in the specified L2 block range.
async fn get_pks_for_latest_logs(
/// Removes storage logs overwritten by the specified new logs.
async fn prune_storage_logs(
&mut self,
l2_blocks_to_prune: ops::RangeInclusive<L2BlockNumber>,
) -> DalResult<Vec<StorageLogPrimaryKey>> {
sqlx::query_as!(
StorageLogPrimaryKey,
r#"
SELECT DISTINCT
ON (hashed_key) hashed_key,
miniblock_number,
operation_number
FROM
storage_logs
WHERE
miniblock_number BETWEEN $1 AND $2
ORDER BY
hashed_key,
miniblock_number DESC,
operation_number DESC
"#,
i64::from(l2_blocks_to_prune.start().0),
i64::from(l2_blocks_to_prune.end().0)
)
.instrument("hard_prune_batches_range#get_latest_logs")
.with_arg("l2_blocks_to_prune", &l2_blocks_to_prune)
.report_latency()
.fetch_all(self.storage)
.await
}

/// Removes storage logs overwritten by the specified new logs.
async fn prune_storage_logs(&mut self, new_logs: &[StorageLogPrimaryKey]) -> DalResult<u64> {
let (hashed_keys, block_numbers, operation_numbers): (Vec<_>, Vec<_>, Vec<_>) = new_logs
.iter()
.map(|log| {
(
log.hashed_key.as_slice(),
log.miniblock_number,
log.operation_number,
)
})
.multiunzip();
) -> DalResult<u64> {
// Storage log pruning is designed to use deterministic indexes and thus have predictable performance.
//
// - The WITH query is guaranteed to use the block number index (that's the only WHERE condition),
// and the supplied range of blocks should be reasonably small.
// - The main DELETE query is virtually guaranteed to use the primary key index since it removes ranges w.r.t. this index.
//
// Using more sophisticated queries leads to fluctuating performance due to unpredictable indexes being used.
let execution_result = sqlx::query!(
r#"
DELETE FROM storage_logs USING UNNEST($1::bytea[], $2::BIGINT[], $3::INT[]) AS new_logs (hashed_key, miniblock_number, operation_number)
WITH
new_logs AS MATERIALIZED (
SELECT DISTINCT
ON (hashed_key) hashed_key,
miniblock_number,
operation_number
FROM
storage_logs
WHERE
miniblock_number BETWEEN $1 AND $2
ORDER BY
hashed_key,
miniblock_number DESC,
operation_number DESC
)
DELETE FROM storage_logs USING new_logs
WHERE
storage_logs.hashed_key = new_logs.hashed_key
AND (storage_logs.miniblock_number, storage_logs.operation_number) < (new_logs.miniblock_number, new_logs.operation_number)
"#,
&hashed_keys as &[&[u8]],
&block_numbers,
&operation_numbers
i64::from(l2_blocks_to_prune.start().0),
i64::from(l2_blocks_to_prune.end().0)
)
.instrument("hard_prune_batches_range#prune_storage_logs")
.with_arg("new_logs.len", &new_logs.len())
.with_arg("l2_blocks_to_prune", &l2_blocks_to_prune)
.report_latency()
.execute(self.storage)
.await?;
Expand Down
5 changes: 1 addition & 4 deletions core/node/db_pruner/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ enum PrunedEntityType {
L1Batch,
L2Block,
StorageLog,
OverwritingLog, // not really removed; just used to measure query complexity
Event,
L2ToL1Log,
CallTrace,
Expand Down Expand Up @@ -44,21 +43,19 @@ impl DbPrunerMetrics {
let HardPruningStats {
deleted_l1_batches,
deleted_l2_blocks,
overwriting_logs,
deleted_storage_logs,
deleted_events,
deleted_call_traces,
deleted_l2_to_l1_logs,
} = stats;
tracing::info!(
"Performed pruning of database, deleted {deleted_l1_batches} L1 batches, {deleted_l2_blocks} L2 blocks, \
{deleted_storage_logs} storage logs ({overwriting_logs} overwriting logs), \
{deleted_storage_logs} storage logs, \
{deleted_events} events, {deleted_call_traces} call traces, {deleted_l2_to_l1_logs} L2-to-L1 logs"
);

self.deleted_entities[&PrunedEntityType::L1Batch].observe(deleted_l1_batches);
self.deleted_entities[&PrunedEntityType::L2Block].observe(deleted_l2_blocks);
self.deleted_entities[&PrunedEntityType::OverwritingLog].observe(overwriting_logs);
self.deleted_entities[&PrunedEntityType::StorageLog].observe(deleted_storage_logs);
self.deleted_entities[&PrunedEntityType::Event].observe(deleted_events);
self.deleted_entities[&PrunedEntityType::L2ToL1Log].observe(deleted_l2_to_l1_logs);
Expand Down

0 comments on commit 7f4e6ac

Please sign in to comment.