Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(db): Try yet another storage log pruning approach #2268

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 63 additions & 56 deletions core/lib/dal/src/pruning_dal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops;

use itertools::Itertools;
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use zksync_types::{L1BatchNumber, L2BlockNumber};

Expand Down Expand Up @@ -27,8 +28,8 @@ pub struct PruningInfo {
pub struct HardPruningStats {
pub deleted_l1_batches: u64,
pub deleted_l2_blocks: u64,
pub deleted_storage_logs_from_past_batches: u64,
pub deleted_storage_logs_from_pruned_batches: u64,
pub overwriting_logs: u64,
pub deleted_storage_logs: u64,
pub deleted_events: u64,
pub deleted_call_traces: u64,
pub deleted_l2_to_l1_logs: u64,
Expand All @@ -41,6 +42,14 @@ enum PruneType {
Hard,
}

/// Raw database representation of a primary key in the `storage_logs` table
/// (composite key: `hashed_key`, `miniblock_number`, `operation_number`).
/// Fetched by `get_pks_for_latest_logs` and used to delete overwritten logs
/// in `prune_storage_logs`.
#[derive(Debug)]
struct StorageLogPrimaryKey {
/// Hashed storage key, stored as raw bytes (`bytea` column).
hashed_key: Vec<u8>,
/// L2 block (miniblock) number the log belongs to (`BIGINT` column).
miniblock_number: i64,
/// Ordering of the log within its block (`INT` column).
operation_number: i32,
}

impl PruningDal<'_, '_> {
pub async fn get_pruning_info(&mut self) -> DalResult<PruningInfo> {
let pruning_info = sqlx::query!(
Expand Down Expand Up @@ -174,17 +183,18 @@ impl PruningDal<'_, '_> {
self.clear_transaction_fields(first_l2_block_to_prune..=last_l2_block_to_prune)
.await?;

// The deleting of logs is split into two queries to make it faster,
// only the first query has to go through all previous logs
// and the query optimizer should be happy with it
let deleted_storage_logs_from_past_batches = self
.prune_storage_logs_from_past_l2_blocks(
first_l2_block_to_prune..=last_l2_block_to_prune,
)
.await?;
let deleted_storage_logs_from_pruned_batches = self
.prune_storage_logs_in_range(first_l2_block_to_prune..=last_l2_block_to_prune)
// Storage log pruning is designed to use deterministic indexes and thus have predictable performance.
//
// - `get_pks_for_latest_logs` is guaranteed to use the block number index (that's the only WHERE condition),
// and the supplied range of blocks should be reasonably small.
// - `prune_storage_logs` is virtually guaranteed to use the primary key index since the query removes ranges w.r.t. this index.
//
// Combining these two queries or using more sophisticated queries leads to fluctuating performance due to
// unpredictable indexes being used.
let new_logs = self
.get_pks_for_latest_logs(first_l2_block_to_prune..=last_l2_block_to_prune)
.await?;
let deleted_storage_logs = self.prune_storage_logs(&new_logs).await?;
let deleted_l1_batches = self.delete_l1_batches(last_l1_batch_to_prune).await?;
let deleted_l2_blocks = self.delete_l2_blocks(last_l2_block_to_prune).await?;

Expand All @@ -194,8 +204,8 @@ impl PruningDal<'_, '_> {
deleted_events,
deleted_l2_to_l1_logs,
deleted_call_traces,
deleted_storage_logs_from_past_batches,
deleted_storage_logs_from_pruned_batches,
overwriting_logs: new_logs.len() as u64,
deleted_storage_logs,
}
} else {
HardPruningStats::default()
Expand Down Expand Up @@ -314,65 +324,62 @@ impl PruningDal<'_, '_> {
Ok(execution_result.rows_affected())
}

async fn prune_storage_logs_from_past_l2_blocks(
/// Gets primary keys for all latest logs in the specified L2 block range.
async fn get_pks_for_latest_logs(
&mut self,
l2_blocks_to_prune: ops::RangeInclusive<L2BlockNumber>,
) -> DalResult<u64> {
let execution_result = sqlx::query!(
) -> DalResult<Vec<StorageLogPrimaryKey>> {
sqlx::query_as!(
StorageLogPrimaryKey,
r#"
DELETE FROM storage_logs
SELECT DISTINCT
ON (hashed_key) hashed_key,
miniblock_number,
operation_number
FROM
storage_logs
WHERE
storage_logs.miniblock_number < $1
AND hashed_key IN (
SELECT
hashed_key
FROM
storage_logs
WHERE
miniblock_number BETWEEN $1 AND $2
)
miniblock_number BETWEEN $1 AND $2
ORDER BY
hashed_key,
miniblock_number DESC,
operation_number DESC
"#,
i64::from(l2_blocks_to_prune.start().0),
i64::from(l2_blocks_to_prune.end().0)
)
.instrument("hard_prune_batches_range#prune_storage_logs_from_past_l2_blocks")
.instrument("hard_prune_batches_range#get_latest_logs")
.with_arg("l2_blocks_to_prune", &l2_blocks_to_prune)
.report_latency()
.execute(self.storage)
.await?;
Ok(execution_result.rows_affected())
.fetch_all(self.storage)
.await
}

async fn prune_storage_logs_in_range(
&mut self,
l2_blocks_to_prune: ops::RangeInclusive<L2BlockNumber>,
) -> DalResult<u64> {
/// Removes storage logs overwritten by the specified new logs.
async fn prune_storage_logs(&mut self, new_logs: &[StorageLogPrimaryKey]) -> DalResult<u64> {
let (hashed_keys, block_numbers, operation_numbers): (Vec<_>, Vec<_>, Vec<_>) = new_logs
.iter()
.map(|log| {
(
log.hashed_key.as_slice(),
log.miniblock_number,
log.operation_number,
)
})
.multiunzip();
let execution_result = sqlx::query!(
r#"
DELETE FROM storage_logs USING (
SELECT
hashed_key,
MAX(ARRAY[miniblock_number, operation_number]::INT[]) AS op
FROM
storage_logs
WHERE
miniblock_number BETWEEN $1 AND $2
GROUP BY
hashed_key
) AS last_storage_logs
DELETE FROM storage_logs USING UNNEST($1::bytea[], $2::BIGINT[], $3::INT[]) AS new_logs (hashed_key, miniblock_number, operation_number)
WHERE
storage_logs.miniblock_number BETWEEN $1 AND $2
AND last_storage_logs.hashed_key = storage_logs.hashed_key
AND (
storage_logs.miniblock_number != last_storage_logs.op[1]
OR storage_logs.operation_number != last_storage_logs.op[2]
)
storage_logs.hashed_key = new_logs.hashed_key
AND (storage_logs.miniblock_number, storage_logs.operation_number) < (new_logs.miniblock_number, new_logs.operation_number)
"#,
i64::from(l2_blocks_to_prune.start().0),
i64::from(l2_blocks_to_prune.end().0)
&hashed_keys as &[&[u8]],
&block_numbers,
&operation_numbers
)
.instrument("hard_prune_batches_range#prune_storage_logs_in_range")
.with_arg("l2_blocks_to_prune", &l2_blocks_to_prune)
.instrument("hard_prune_batches_range#prune_storage_logs")
.with_arg("new_logs.len", &new_logs.len())
.report_latency()
.execute(self.storage)
.await?;
Expand Down
6 changes: 2 additions & 4 deletions core/lib/dal/src/pruning_dal/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,7 @@ async fn storage_logs_pruning_works_correctly() {
&[random_storage_log(2, 3), random_storage_log(3, 4)],
);
assert_l2_block_storage_logs_equal(L2BlockNumber(1), &actual_logs, &[random_storage_log(1, 1)]);
assert_eq!(stats.deleted_storage_logs_from_past_batches, 0);
assert_eq!(stats.deleted_storage_logs_from_pruned_batches, 1);
assert_eq!(stats.deleted_storage_logs, 1);

let stats = transaction
.pruning_dal()
Expand All @@ -402,8 +401,7 @@ async fn storage_logs_pruning_works_correctly() {
&actual_logs,
&[random_storage_log(5, 7)],
);
assert_eq!(stats.deleted_storage_logs_from_past_batches, 1);
assert_eq!(stats.deleted_storage_logs_from_pruned_batches, 1);
assert_eq!(stats.deleted_storage_logs, 2);
}

#[tokio::test]
Expand Down
19 changes: 7 additions & 12 deletions core/node/db_pruner/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ pub(super) enum MetricPruneType {
enum PrunedEntityType {
L1Batch,
L2Block,
StorageLogFromPrunedBatch,
StorageLogFromPastBatch,
StorageLog,
OverwritingLog, // not really removed; just used to measure query complexity
Event,
L2ToL1Log,
CallTrace,
Expand Down Expand Up @@ -44,27 +44,22 @@ impl DbPrunerMetrics {
let HardPruningStats {
deleted_l1_batches,
deleted_l2_blocks,
deleted_storage_logs_from_past_batches,
deleted_storage_logs_from_pruned_batches,
overwriting_logs,
deleted_storage_logs,
deleted_events,
deleted_call_traces,
deleted_l2_to_l1_logs,
} = stats;
let deleted_storage_logs =
deleted_storage_logs_from_past_batches + deleted_storage_logs_from_pruned_batches;
tracing::info!(
"Performed pruning of database, deleted {deleted_l1_batches} L1 batches, {deleted_l2_blocks} L2 blocks, \
{deleted_storage_logs} storage logs ({deleted_storage_logs_from_pruned_batches} from pruned batches + \
{deleted_storage_logs_from_past_batches} from past batches), \
{deleted_storage_logs} storage logs ({overwriting_logs} overwriting logs), \
{deleted_events} events, {deleted_call_traces} call traces, {deleted_l2_to_l1_logs} L2-to-L1 logs"
);

self.deleted_entities[&PrunedEntityType::L1Batch].observe(deleted_l1_batches);
self.deleted_entities[&PrunedEntityType::L2Block].observe(deleted_l2_blocks);
self.deleted_entities[&PrunedEntityType::StorageLogFromPastBatch]
.observe(deleted_storage_logs_from_past_batches);
self.deleted_entities[&PrunedEntityType::StorageLogFromPrunedBatch]
.observe(deleted_storage_logs_from_pruned_batches);
self.deleted_entities[&PrunedEntityType::OverwritingLog].observe(overwriting_logs);
self.deleted_entities[&PrunedEntityType::StorageLog].observe(deleted_storage_logs);
self.deleted_entities[&PrunedEntityType::Event].observe(deleted_events);
self.deleted_entities[&PrunedEntityType::L2ToL1Log].observe(deleted_l2_to_l1_logs);
self.deleted_entities[&PrunedEntityType::CallTrace].observe(deleted_call_traces);
Expand Down
Loading