From d0e0a269880363e1d2aa14aceb514cfb99ea9038 Mon Sep 17 00:00:00 2001
From: runtianz
Date: Thu, 8 Dec 2022 12:24:35 -0800
Subject: [PATCH] [replay-verify] Add an option to skip txns that are known to break backward compatibility (#5747)

---
 execution/executor-types/src/lib.rs          |   9 +-
 execution/executor/src/chunk_executor.rs     | 110 ++++++++++++++++--
 execution/executor/src/tests/mod.rs          |   4 +-
 .../backup-cli/src/backup_types/tests.rs     |   1 +
 .../src/backup_types/transaction/restore.rs  |  44 +++++--
 .../src/backup_types/transaction/tests.rs    |   1 +
 .../backup/backup-cli/src/bin/db-restore.rs  |   1 +
 .../backup-cli/src/bin/replay-verify.rs      |   7 ++
 .../src/coordinators/replay_verify.rs        |   4 +
 .../backup-cli/src/coordinators/restore.rs   |   1 +
 .../backup-cli/src/coordinators/verify.rs    |   1 +
 11 files changed, 159 insertions(+), 24 deletions(-)

diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs
index ffde655278855..597ba2984f94f 100644
--- a/execution/executor-types/src/lib.rs
+++ b/execution/executor-types/src/lib.rs
@@ -3,7 +3,11 @@
 
 #![forbid(unsafe_code)]
 
-use std::{cmp::max, collections::HashMap, sync::Arc};
+use std::{
+    cmp::max,
+    collections::{BTreeSet, HashMap},
+    sync::Arc,
+};
 
 use anyhow::Result;
 use serde::{Deserialize, Serialize};
@@ -122,6 +126,9 @@ pub trait TransactionReplayer: Send {
         &self,
         transactions: Vec<Transaction>,
         transaction_infos: Vec<TransactionInfo>,
+        writesets: Vec<WriteSet>,
+        events: Vec<Vec<ContractEvent>>,
+        txns_to_skip: Arc<BTreeSet<Version>>,
     ) -> Result<()>;
 
     fn commit(&self) -> Result<Arc<ExecutedChunk>>;
diff --git a/execution/executor/src/chunk_executor.rs b/execution/executor/src/chunk_executor.rs
index df1fc24f4ab9e..e33c39c8c2f8a 100644
--- a/execution/executor/src/chunk_executor.rs
+++ b/execution/executor/src/chunk_executor.rs
@@ -20,17 +20,20 @@ use aptos_infallible::{Mutex, RwLock};
 use aptos_logger::prelude::*;
 use aptos_state_view::StateViewId;
 use aptos_types::{
+    contract_event::ContractEvent,
     ledger_info::LedgerInfoWithSignatures,
     transaction::{
-        Transaction, TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof,
+        Transaction, TransactionInfo, TransactionListWithProof, TransactionOutput,
+        TransactionOutputListWithProof, TransactionStatus,
         Version,
     },
+    write_set::WriteSet,
 };
 use aptos_vm::VMExecutor;
 use executor_types::{
     ChunkCommitNotification, ChunkExecutorTrait, ExecutedChunk, TransactionReplayer,
 };
 use fail::fail_point;
-use std::{marker::PhantomData, sync::Arc};
+use std::{collections::BTreeSet, marker::PhantomData, sync::Arc};
 use storage_interface::{
     cached_state_view::CachedStateView, sync_proof_fetcher::SyncProofFetcher, DbReaderWriter,
     ExecutedTrees,
@@ -297,13 +300,18 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutor<V> {
         &self,
         transactions: Vec<Transaction>,
         transaction_infos: Vec<TransactionInfo>,
+        writesets: Vec<WriteSet>,
+        events: Vec<Vec<ContractEvent>>,
+        txns_to_skip: Arc<BTreeSet<Version>>,
     ) -> Result<()> {
         self.maybe_initialize()?;
-        self.inner
-            .read()
-            .as_ref()
-            .expect("not reset")
-            .replay(transactions, transaction_infos)
+        self.inner.read().as_ref().expect("not reset").replay(
+            transactions,
+            transaction_infos,
+            writesets,
+            events,
+            txns_to_skip,
+        )
     }
 
     fn commit(&self) -> Result<Arc<ExecutedChunk>> {
@@ -313,6 +321,60 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
     fn replay(
+        &self,
+        mut transactions: Vec<Transaction>,
+        mut transaction_infos: Vec<TransactionInfo>,
+        writesets: Vec<WriteSet>,
+        events: Vec<Vec<ContractEvent>>,
+        txns_to_skip: Arc<BTreeSet<Version>>,
+    ) -> Result<()> {
+        let current_begin_version = {
+            self.commit_queue
+                .lock()
+                .persisted_and_latest_view()
+                .1
+                .version()
+                .ok_or_else(|| anyhow!("Current version is not available"))?
+        };
+
+        let mut offset = current_begin_version;
+        let total_length = transactions.len();
+
+        for version in txns_to_skip
+            .range(current_begin_version + 1..current_begin_version + total_length as u64 + 1)
+        {
+            let remaining = transactions.split_off((version - offset) as usize);
+            let remaining_info = transaction_infos.split_off((version - offset) as usize);
+            let txn_to_skip = transactions.pop().unwrap();
+            let txn_info = transaction_infos.pop().unwrap();
+
+            self.replay_impl(transactions, transaction_infos)?;
+
+            self.apply_transaction_and_output(
+                txn_to_skip,
+                TransactionOutput::new(
+                    writesets[(version - current_begin_version - 1) as usize].clone(),
+                    events[(version - current_begin_version - 1) as usize].clone(),
+                    txn_info.gas_used(),
+                    TransactionStatus::Keep(txn_info.status().clone()),
+                ),
+                txn_info,
+            )?;
+
+            transactions = remaining;
+            transaction_infos = remaining_info;
+            offset = version + 1;
+        }
+        self.replay_impl(transactions, transaction_infos)
+    }
+
+    fn commit(&self) -> Result<Arc<ExecutedChunk>> {
+        self.commit_chunk_impl()
+    }
+}
+
+impl<V: VMExecutor> ChunkExecutorInner<V> {
+    fn replay_impl(
         &self,
         transactions: Vec<Transaction>,
         mut transaction_infos: Vec<TransactionInfo>,
@@ -322,13 +384,14 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
         let mut executed_chunk = ExecutedChunk::default();
         let mut to_run = Some(transactions);
+
         while !to_run.as_ref().unwrap().is_empty() {
             // Execute transactions.
             let state_view = self.state_view(&latest_view)?;
             let txns = to_run.take().unwrap();
-            let (executed, to_discard, to_retry) =
-                ChunkOutput::by_transaction_execution::<V>(txns, state_view)?
-                    .apply_to_ledger(&latest_view)?;
+            let chunk_output = ChunkOutput::by_transaction_execution::<V>(txns, state_view)?;
+
+            let (executed, to_discard, to_retry) = chunk_output.apply_to_ledger(&latest_view)?;
 
             // Accumulate result and deal with retry
             ensure_no_discard(to_discard)?;
@@ -347,7 +410,30 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
         Ok(())
     }
 
-    fn commit(&self) -> Result<Arc<ExecutedChunk>> {
-        self.commit_chunk_impl()
+    fn apply_transaction_and_output(
+        &self,
+        txn: Transaction,
+        output: TransactionOutput,
+        expected_info: TransactionInfo,
+    ) -> Result<()> {
+        let (_persisted_view, latest_view) = self.commit_queue.lock().persisted_and_latest_view();
+
+        info!(
+            "Overriding the output of txn at version: {:?}",
+            latest_view.version().unwrap(),
+        );
+
+        let chunk_output = ChunkOutput::by_transaction_output(
+            vec![(txn, output)],
+            self.state_view(&latest_view)?,
+        )?;
+
+        let (executed, to_discard, _to_retry) = chunk_output.apply_to_ledger(&latest_view)?;
+
+        // Accumulate result and deal with retry
+        ensure_no_discard(to_discard)?;
+        executed.ensure_transaction_infos_match(&vec![expected_info])?;
+        self.commit_queue.lock().enqueue(executed);
+        Ok(())
     }
 }
diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs
index bb15d7918ad64..959e369a31ff8 100644
--- a/execution/executor/src/tests/mod.rs
+++ b/execution/executor/src/tests/mod.rs
@@ -1,7 +1,7 @@
 // Copyright (c) Aptos
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{iter::once, sync::Arc};
+use std::{collections::BTreeSet, iter::once, sync::Arc};
 
 use proptest::prelude::*;
 
@@ -661,7 +661,7 @@ proptest! {
        // replay txns in one batch across epoch boundary,
        // and the replayer should deal with `Retry`s automatically
        let replayer = chunk_executor_tests::TestExecutor::new();
-       replayer.executor.replay(block.txns, txn_infos).unwrap();
+       replayer.executor.replay(block.txns, txn_infos, vec![], vec![], Arc::new(BTreeSet::new())).unwrap();
        replayer.executor.commit().unwrap();
        let replayed_db = replayer.db.reader.clone();
        prop_assert_eq!(
diff --git a/storage/backup/backup-cli/src/backup_types/tests.rs b/storage/backup/backup-cli/src/backup_types/tests.rs
index 9274739e3de11..2777c90215d17 100644
--- a/storage/backup/backup-cli/src/backup_types/tests.rs
+++ b/storage/backup/backup-cli/src/backup_types/tests.rs
@@ -154,6 +154,7 @@ fn test_end_to_end_impl(d: TestData) {
                 global_restore_opt,
                 store,
                 None, /* epoch_history */
+                vec![],
             )
             .run(),
         )
diff --git a/storage/backup/backup-cli/src/backup_types/transaction/restore.rs b/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
index 973e83d223c9f..fe4da1e051fd1 100644
--- a/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
+++ b/storage/backup/backup-cli/src/backup_types/transaction/restore.rs
@@ -41,9 +41,10 @@ use futures::{
     stream::{Peekable, Stream, TryStreamExt},
     StreamExt,
 };
-use itertools::zip_eq;
+use itertools::{izip, Itertools};
 use std::{
     cmp::{max, min},
+    collections::BTreeSet,
     pin::Pin,
     sync::Arc,
     time::Instant,
@@ -82,6 +83,7 @@ struct LoadedChunk {
     pub txns: Vec<Transaction>,
     pub txn_infos: Vec<TransactionInfo>,
     pub event_vecs: Vec<Vec<ContractEvent>>,
+    pub write_sets: Vec<WriteSet>,
     pub range_proof: TransactionAccumulatorRangeProof,
     pub ledger_info: LedgerInfoWithSignatures,
 }
@@ -96,13 +98,15 @@ impl LoadedChunk {
         let mut txns = Vec::new();
         let mut txn_infos = Vec::new();
         let mut event_vecs = Vec::new();
+        let mut write_sets = Vec::new();
 
         while let Some(record_bytes) = file.read_record_bytes().await? {
-            let (txn, txn_info, events, _write_set): (_, _, _, WriteSet) =
+            let (txn, txn_info, events, write_set): (_, _, _, WriteSet) =
                 bcs::from_bytes(&record_bytes)?;
             txns.push(txn);
             txn_infos.push(txn_info);
             event_vecs.push(events);
+            write_sets.push(write_set);
         }
 
         ensure!(
@@ -145,6 +149,7 @@ impl LoadedChunk {
             event_vecs,
             range_proof,
             ledger_info,
+            write_sets,
         })
     }
 }
@@ -155,6 +160,7 @@ impl TransactionRestoreController {
         global_opt: GlobalRestoreOptions,
         storage: Arc<dyn BackupStorage>,
         epoch_history: Option<Arc<EpochHistory>>,
+        txns_to_skip: Vec<Version>,
     ) -> Self {
         let inner = TransactionRestoreBatchController::new(
             global_opt,
@@ -162,6 +168,7 @@ impl TransactionRestoreController {
             vec![opt.manifest_handle],
             opt.replay_from_version,
             epoch_history,
+            txns_to_skip.into_iter().collect(),
         );
 
         Self { inner }
@@ -181,6 +188,7 @@ pub struct TransactionRestoreBatchController {
     manifest_handles: Vec<FileHandle>,
     replay_from_version: Option<Version>,
     epoch_history: Option<Arc<EpochHistory>>,
+    txns_to_skip: Arc<BTreeSet<Version>>,
 }
 
 impl TransactionRestoreBatchController {
@@ -190,6 +198,7 @@ impl TransactionRestoreBatchController {
         manifest_handles: Vec<FileHandle>,
         replay_from_version: Option<Version>,
         epoch_history: Option<Arc<EpochHistory>>,
+        txns_to_skip: Vec<Version>,
     ) -> Self {
         Self {
             global_opt,
@@ -197,6 +206,7 @@ impl TransactionRestoreBatchController {
             manifest_handles,
             replay_from_version,
             epoch_history,
+            txns_to_skip: Arc::new(txns_to_skip.into_iter().collect()),
         }
     }
@@ -325,7 +335,11 @@ impl TransactionRestoreBatchController {
         global_first_version: Version,
         loaded_chunk_stream: impl Stream<Item = Result<LoadedChunk>> + Unpin,
         restore_handler: &RestoreHandler,
-    ) -> Result<Option<impl Stream<Item = Result<(Transaction, TransactionInfo)>>>> {
+    ) -> Result<
+        Option<
+            impl Stream<Item = Result<(Transaction, TransactionInfo, WriteSet, Vec<ContractEvent>)>>,
+        >,
+    > {
         let next_expected_version = self
             .global_opt
             .run_mode
@@ -359,6 +373,7 @@ impl TransactionRestoreBatchController {
                     mut txns,
                     mut txn_infos,
                     mut event_vecs,
+                    mut write_sets,
                     range_proof: _,
                     ledger_info: _,
                 } = chunk;
@@ -368,6 +383,7 @@ impl TransactionRestoreBatchController {
                     txns.drain(num_to_keep..);
                     txn_infos.drain(num_to_keep..);
                     event_vecs.drain(num_to_keep..);
+                    write_sets.drain(num_to_keep..);
                     last_version = target_version;
                 }
 
@@ -377,6 +393,7 @@ impl TransactionRestoreBatchController {
                     let txns_to_save: Vec<_> = txns.drain(..num_to_save).collect();
                     let txn_infos_to_save: Vec<_> = txn_infos.drain(..num_to_save).collect();
                     let event_vecs_to_save: Vec<_> = event_vecs.drain(..num_to_save).collect();
+                    write_sets.drain(..num_to_save);
 
                     tokio::task::spawn_blocking(move || {
                         restore_handler.save_transactions(
@@ -398,7 +415,9 @@ impl TransactionRestoreBatchController {
                     }
 
                     Ok(stream::iter(
-                        zip_eq(txns, txn_infos).into_iter().map(Result::<_>::Ok),
+                        izip!(txns, txn_infos, write_sets, event_vecs)
+                            .into_iter()
+                            .map(Result::<_>::Ok),
                     ))
                 })
             })
@@ -422,7 +441,9 @@ impl TransactionRestoreBatchController {
     async fn replay_transactions(
         &self,
         restore_handler: &RestoreHandler,
-        txns_to_execute_stream: impl Stream<Item = Result<(Transaction, TransactionInfo)>>,
+        txns_to_execute_stream: impl Stream<
+            Item = Result<(Transaction, TransactionInfo, WriteSet, Vec<ContractEvent>)>,
+        >,
     ) -> Result<()> {
         let first_version = self.replay_from_version.unwrap();
         restore_handler.reset_state_store();
@@ -434,15 +455,20 @@ impl TransactionRestoreBatchController {
             .try_chunks(BATCH_SIZE)
             .err_into::<anyhow::Error>()
             .map_ok(|chunk| {
-                let (txns, txn_infos): (Vec<_>, Vec<_>) = chunk.into_iter().unzip();
+                let (txns, txn_infos, write_sets, events): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) =
+                    chunk.into_iter().multiunzip();
                 let chunk_replayer = chunk_replayer.clone();
+                let txns_to_skip = self.txns_to_skip.clone();
+
                 async move {
                     let _timer = OTHER_TIMERS_SECONDS
                         .with_label_values(&["replay_txn_chunk"])
                         .start_timer();
-                    tokio::task::spawn_blocking(move || chunk_replayer.replay(txns, txn_infos))
-                        .err_into::<anyhow::Error>()
-                        .await
+                    tokio::task::spawn_blocking(move || {
+                        chunk_replayer.replay(txns, txn_infos, write_sets, events, txns_to_skip)
+                    })
+                    .err_into::<anyhow::Error>()
+                    .await
                 }
             })
             .try_buffered_x(self.global_opt.concurrent_downloads, 1)
diff --git a/storage/backup/backup-cli/src/backup_types/transaction/tests.rs b/storage/backup/backup-cli/src/backup_types/transaction/tests.rs
index 579e9d47a9a5f..1d071f94a83f5 100644
--- a/storage/backup/backup-cli/src/backup_types/transaction/tests.rs
+++ b/storage/backup/backup-cli/src/backup_types/transaction/tests.rs
@@ -90,6 +90,7 @@ fn end_to_end() {
                 .unwrap(),
                 store,
                 None, /* epoch_history */
+                vec![],
             )
             .run(),
         )
diff --git a/storage/backup/backup-cli/src/bin/db-restore.rs b/storage/backup/backup-cli/src/bin/db-restore.rs
index bf7dbadfe313f..5f6869ca9cdfe 100644
--- a/storage/backup/backup-cli/src/bin/db-restore.rs
+++ b/storage/backup/backup-cli/src/bin/db-restore.rs
@@ -92,6 +92,7 @@ async fn main_impl() -> Result<()> {
                 global_opt,
                 storage.init_storage().await?,
                 None, /* epoch_history */
+                vec![],
             )
             .run()
             .await?;
diff --git a/storage/backup/backup-cli/src/bin/replay-verify.rs b/storage/backup/backup-cli/src/bin/replay-verify.rs
index ff75f31c0f496..49399d1e114ec 100644
--- a/storage/backup/backup-cli/src/bin/replay-verify.rs
+++ b/storage/backup/backup-cli/src/bin/replay-verify.rs
@@ -48,6 +48,12 @@ struct Opt {
     end_version: Option<Version>,
     #[clap(long)]
     validate_modules: bool,
+    #[clap(
+        long,
+        multiple = true,
+        help = "Skip the execution for txns that are known to break compatibility."
+    )]
+    txns_to_skip: Vec<Version>,
 }
 
 #[tokio::main]
@@ -82,6 +88,7 @@ async fn main_impl() -> Result<()> {
         opt.start_version.unwrap_or(0),
         opt.end_version.unwrap_or(Version::MAX),
         opt.validate_modules,
+        opt.txns_to_skip,
     )?
    .run()
    .await
diff --git a/storage/backup/backup-cli/src/coordinators/replay_verify.rs b/storage/backup/backup-cli/src/coordinators/replay_verify.rs
index 6b54f26bd597e..32a83dc6a1b54 100644
--- a/storage/backup/backup-cli/src/coordinators/replay_verify.rs
+++ b/storage/backup/backup-cli/src/coordinators/replay_verify.rs
@@ -28,6 +28,7 @@ pub struct ReplayVerifyCoordinator {
     start_version: Version,
     end_version: Version,
     validate_modules: bool,
+    txns_to_skip: Vec<Version>,
 }
 
 impl ReplayVerifyCoordinator {
@@ -41,6 +42,7 @@ impl ReplayVerifyCoordinator {
         start_version: Version,
         end_version: Version,
         validate_modules: bool,
+        txns_to_skip: Vec<Version>,
     ) -> Result<Self> {
         Ok(Self {
             storage,
@@ -52,6 +54,7 @@ impl ReplayVerifyCoordinator {
             start_version,
             end_version,
             validate_modules,
+            txns_to_skip,
         })
     }
 
@@ -132,6 +135,7 @@ impl ReplayVerifyCoordinator {
             txn_manifests,
             Some(replay_transactions_from_version), /* replay_from_version */
             None, /* epoch_history */
+            self.txns_to_skip,
         )
         .run()
         .await?;
diff --git a/storage/backup/backup-cli/src/coordinators/restore.rs b/storage/backup/backup-cli/src/coordinators/restore.rs
index fd2803add4f01..8b51e6e298394 100644
--- a/storage/backup/backup-cli/src/coordinators/restore.rs
+++ b/storage/backup/backup-cli/src/coordinators/restore.rs
@@ -186,6 +186,7 @@ impl RestoreCoordinator {
             txn_manifests,
             Some(version + 1),
             epoch_history,
+            vec![],
         )
         .run()
         .await?;
diff --git a/storage/backup/backup-cli/src/coordinators/verify.rs b/storage/backup/backup-cli/src/coordinators/verify.rs
index 5936b1b8e436b..e9765f5679188 100644
--- a/storage/backup/backup-cli/src/coordinators/verify.rs
+++ b/storage/backup/backup-cli/src/coordinators/verify.rs
@@ -117,6 +117,7 @@ impl VerifyCoordinator {
             txn_manifests,
             None, /* replay_from_version */
             Some(epoch_history),
+            vec![],
         )
         .run()
         .await?;
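
Illustrative note, not part of the diff above: the heart of the new `replay` is the loop that walks `txns_to_skip` over the chunk's version window with `BTreeSet::range`, carves each skipped txn out of the batch with `Vec::split_off` and `pop`, replays the prefix through the normal execution path, and applies the skipped txn from its recorded write set and events instead of executing it. The standalone Rust sketch below mirrors that index arithmetic under simplified assumptions: plain strings stand in for transactions, the window starts at the first item's own version rather than at the last persisted version (so the offsets differ by one from the patch), and the name `split_around_skips` is invented for the illustration.

// Sketch only: mirrors the split_off/pop bookkeeping of the new replay(),
// with &str items standing in for transactions.
use std::collections::BTreeSet;

/// Splits `items` (whose first element has version `first_version`) into the runs
/// that would be replayed normally, plus the items pulled out for skipping.
fn split_around_skips(
    first_version: u64,
    mut items: Vec<&'static str>,
    skips: &BTreeSet<u64>,
) -> (Vec<Vec<&'static str>>, Vec<&'static str>) {
    let mut runs = Vec::new();
    let mut skipped = Vec::new();
    let mut offset = first_version;
    let end = first_version + items.len() as u64;

    for &version in skips.range(first_version..end) {
        // Keep everything up to and including the skipped version, park the rest.
        let remaining = items.split_off((version - offset) as usize + 1);
        // The last kept element is the skipped item itself.
        let skip = items.pop().expect("skip version is inside the window");
        runs.push(items);   // would go through the normal execution path
        skipped.push(skip); // would be applied from its recorded output instead
        items = remaining;
        offset = version + 1;
    }
    runs.push(items); // trailing run after the last skipped version
    (runs, skipped)
}

fn main() {
    // Versions 100..=104, with version 102 known to break replay.
    let skips: BTreeSet<u64> = [102].into_iter().collect();
    let items = vec!["t100", "t101", "t102", "t103", "t104"];
    let (runs, skipped) = split_around_skips(100, items, &skips);
    assert_eq!(runs, vec![vec!["t100", "t101"], vec!["t103", "t104"]]);
    assert_eq!(skipped, vec!["t102"]);
    println!("runs: {:?}, skipped: {:?}", runs, skipped);
}

Splitting the batch this way keeps the common path (chunked execution through replay_impl) unchanged and confines the per-version override to exactly the versions passed on the command line.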