Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
perf(db): Try yet another storage log pruning approach (matter-labs#2268
Browse files Browse the repository at this point in the history
)

## What ❔

Structures storage log pruning differently by first loading primary keys
for the latest logs in the pruned block range, and then range-removing
older logs based on these PKs. Both of these queries are designed to use
particular indexes, making them have predictable performance.

## Why ❔

The current DB queries for storage log pruning sometimes use
unpredictable indexes and have suboptimal performance.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored and gabrieldemian committed Jun 21, 2024
1 parent 0c7345d commit 7c0a9fd
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 102 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 63 additions & 56 deletions core/lib/dal/src/pruning_dal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops;

use itertools::Itertools;
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use zksync_types::{L1BatchNumber, L2BlockNumber};

Expand Down Expand Up @@ -27,8 +28,8 @@ pub struct PruningInfo {
pub struct HardPruningStats {
pub deleted_l1_batches: u64,
pub deleted_l2_blocks: u64,
pub deleted_storage_logs_from_past_batches: u64,
pub deleted_storage_logs_from_pruned_batches: u64,
pub overwriting_logs: u64,
pub deleted_storage_logs: u64,
pub deleted_events: u64,
pub deleted_call_traces: u64,
pub deleted_l2_to_l1_logs: u64,
Expand All @@ -41,6 +42,14 @@ enum PruneType {
Hard,
}

/// Raw database presentation of a primary key in the `miniblocks` table.
#[derive(Debug)]
struct StorageLogPrimaryKey {
hashed_key: Vec<u8>,
miniblock_number: i64,
operation_number: i32,
}

impl PruningDal<'_, '_> {
pub async fn get_pruning_info(&mut self) -> DalResult<PruningInfo> {
let pruning_info = sqlx::query!(
Expand Down Expand Up @@ -174,17 +183,18 @@ impl PruningDal<'_, '_> {
self.clear_transaction_fields(first_l2_block_to_prune..=last_l2_block_to_prune)
.await?;

// The deleting of logs is split into two queries to make it faster,
// only the first query has to go through all previous logs
// and the query optimizer should be happy with it
let deleted_storage_logs_from_past_batches = self
.prune_storage_logs_from_past_l2_blocks(
first_l2_block_to_prune..=last_l2_block_to_prune,
)
.await?;
let deleted_storage_logs_from_pruned_batches = self
.prune_storage_logs_in_range(first_l2_block_to_prune..=last_l2_block_to_prune)
// Storage log pruning is designed to use deterministic indexes and thus have predictable performance.
//
// - `get_pks_for_latest_logs` is guaranteed to use the block number index (that's the only WHERE condition),
// and the supplied range of blocks should be reasonably small.
// - `prune_storage_logs` is virtually guaranteed to use the primary key index since the query removes ranges w.r.t. this index.
//
// Combining these two queries or using more sophisticated queries leads to fluctuating performance due to
// unpredictable indexes being used.
let new_logs = self
.get_pks_for_latest_logs(first_l2_block_to_prune..=last_l2_block_to_prune)
.await?;
let deleted_storage_logs = self.prune_storage_logs(&new_logs).await?;
let deleted_l1_batches = self.delete_l1_batches(last_l1_batch_to_prune).await?;
let deleted_l2_blocks = self.delete_l2_blocks(last_l2_block_to_prune).await?;

Expand All @@ -194,8 +204,8 @@ impl PruningDal<'_, '_> {
deleted_events,
deleted_l2_to_l1_logs,
deleted_call_traces,
deleted_storage_logs_from_past_batches,
deleted_storage_logs_from_pruned_batches,
overwriting_logs: new_logs.len() as u64,
deleted_storage_logs,
}
} else {
HardPruningStats::default()
Expand Down Expand Up @@ -314,65 +324,62 @@ impl PruningDal<'_, '_> {
Ok(execution_result.rows_affected())
}

async fn prune_storage_logs_from_past_l2_blocks(
/// Gets primary keys for all latest logs in the specified L2 block range.
async fn get_pks_for_latest_logs(
&mut self,
l2_blocks_to_prune: ops::RangeInclusive<L2BlockNumber>,
) -> DalResult<u64> {
let execution_result = sqlx::query!(
) -> DalResult<Vec<StorageLogPrimaryKey>> {
sqlx::query_as!(
StorageLogPrimaryKey,
r#"
DELETE FROM storage_logs
SELECT DISTINCT
ON (hashed_key) hashed_key,
miniblock_number,
operation_number
FROM
storage_logs
WHERE
storage_logs.miniblock_number < $1
AND hashed_key IN (
SELECT
hashed_key
FROM
storage_logs
WHERE
miniblock_number BETWEEN $1 AND $2
)
miniblock_number BETWEEN $1 AND $2
ORDER BY
hashed_key,
miniblock_number DESC,
operation_number DESC
"#,
i64::from(l2_blocks_to_prune.start().0),
i64::from(l2_blocks_to_prune.end().0)
)
.instrument("hard_prune_batches_range#prune_storage_logs_from_past_l2_blocks")
.instrument("hard_prune_batches_range#get_latest_logs")
.with_arg("l2_blocks_to_prune", &l2_blocks_to_prune)
.report_latency()
.execute(self.storage)
.await?;
Ok(execution_result.rows_affected())
.fetch_all(self.storage)
.await
}

async fn prune_storage_logs_in_range(
&mut self,
l2_blocks_to_prune: ops::RangeInclusive<L2BlockNumber>,
) -> DalResult<u64> {
/// Removes storage logs overwritten by the specified new logs.
async fn prune_storage_logs(&mut self, new_logs: &[StorageLogPrimaryKey]) -> DalResult<u64> {
let (hashed_keys, block_numbers, operation_numbers): (Vec<_>, Vec<_>, Vec<_>) = new_logs
.iter()
.map(|log| {
(
log.hashed_key.as_slice(),
log.miniblock_number,
log.operation_number,
)
})
.multiunzip();
let execution_result = sqlx::query!(
r#"
DELETE FROM storage_logs USING (
SELECT
hashed_key,
MAX(ARRAY[miniblock_number, operation_number]::INT[]) AS op
FROM
storage_logs
WHERE
miniblock_number BETWEEN $1 AND $2
GROUP BY
hashed_key
) AS last_storage_logs
DELETE FROM storage_logs USING UNNEST($1::bytea[], $2::BIGINT[], $3::INT[]) AS new_logs (hashed_key, miniblock_number, operation_number)
WHERE
storage_logs.miniblock_number BETWEEN $1 AND $2
AND last_storage_logs.hashed_key = storage_logs.hashed_key
AND (
storage_logs.miniblock_number != last_storage_logs.op[1]
OR storage_logs.operation_number != last_storage_logs.op[2]
)
storage_logs.hashed_key = new_logs.hashed_key
AND (storage_logs.miniblock_number, storage_logs.operation_number) < (new_logs.miniblock_number, new_logs.operation_number)
"#,
i64::from(l2_blocks_to_prune.start().0),
i64::from(l2_blocks_to_prune.end().0)
&hashed_keys as &[&[u8]],
&block_numbers,
&operation_numbers
)
.instrument("hard_prune_batches_range#prune_storage_logs_in_range")
.with_arg("l2_blocks_to_prune", &l2_blocks_to_prune)
.instrument("hard_prune_batches_range#prune_storage_logs")
.with_arg("new_logs.len", &new_logs.len())
.report_latency()
.execute(self.storage)
.await?;
Expand Down
6 changes: 2 additions & 4 deletions core/lib/dal/src/pruning_dal/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,7 @@ async fn storage_logs_pruning_works_correctly() {
&[random_storage_log(2, 3), random_storage_log(3, 4)],
);
assert_l2_block_storage_logs_equal(L2BlockNumber(1), &actual_logs, &[random_storage_log(1, 1)]);
assert_eq!(stats.deleted_storage_logs_from_past_batches, 0);
assert_eq!(stats.deleted_storage_logs_from_pruned_batches, 1);
assert_eq!(stats.deleted_storage_logs, 1);

let stats = transaction
.pruning_dal()
Expand All @@ -402,8 +401,7 @@ async fn storage_logs_pruning_works_correctly() {
&actual_logs,
&[random_storage_log(5, 7)],
);
assert_eq!(stats.deleted_storage_logs_from_past_batches, 1);
assert_eq!(stats.deleted_storage_logs_from_pruned_batches, 1);
assert_eq!(stats.deleted_storage_logs, 2);
}

#[tokio::test]
Expand Down
19 changes: 7 additions & 12 deletions core/node/db_pruner/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ pub(super) enum MetricPruneType {
enum PrunedEntityType {
L1Batch,
L2Block,
StorageLogFromPrunedBatch,
StorageLogFromPastBatch,
StorageLog,
OverwritingLog, // not really removed; just used to measure query complexity
Event,
L2ToL1Log,
CallTrace,
Expand Down Expand Up @@ -44,27 +44,22 @@ impl DbPrunerMetrics {
let HardPruningStats {
deleted_l1_batches,
deleted_l2_blocks,
deleted_storage_logs_from_past_batches,
deleted_storage_logs_from_pruned_batches,
overwriting_logs,
deleted_storage_logs,
deleted_events,
deleted_call_traces,
deleted_l2_to_l1_logs,
} = stats;
let deleted_storage_logs =
deleted_storage_logs_from_past_batches + deleted_storage_logs_from_pruned_batches;
tracing::info!(
"Performed pruning of database, deleted {deleted_l1_batches} L1 batches, {deleted_l2_blocks} L2 blocks, \
{deleted_storage_logs} storage logs ({deleted_storage_logs_from_pruned_batches} from pruned batches + \
{deleted_storage_logs_from_past_batches} from past batches), \
{deleted_storage_logs} storage logs ({overwriting_logs} overwriting logs), \
{deleted_events} events, {deleted_call_traces} call traces, {deleted_l2_to_l1_logs} L2-to-L1 logs"
);

self.deleted_entities[&PrunedEntityType::L1Batch].observe(deleted_l1_batches);
self.deleted_entities[&PrunedEntityType::L2Block].observe(deleted_l2_blocks);
self.deleted_entities[&PrunedEntityType::StorageLogFromPastBatch]
.observe(deleted_storage_logs_from_past_batches);
self.deleted_entities[&PrunedEntityType::StorageLogFromPrunedBatch]
.observe(deleted_storage_logs_from_pruned_batches);
self.deleted_entities[&PrunedEntityType::OverwritingLog].observe(overwriting_logs);
self.deleted_entities[&PrunedEntityType::StorageLog].observe(deleted_storage_logs);
self.deleted_entities[&PrunedEntityType::Event].observe(deleted_events);
self.deleted_entities[&PrunedEntityType::L2ToL1Log].observe(deleted_l2_to_l1_logs);
self.deleted_entities[&PrunedEntityType::CallTrace].observe(deleted_call_traces);
Expand Down

0 comments on commit 7c0a9fd

Please sign in to comment.