Skip to content

Commit

Permalink
[replay-verify] Add an option to skip txns that are known to break ba…
Browse files Browse the repository at this point in the history
…ckward compatibility (#5747)
  • Loading branch information
runtian-zhou authored and zekun000 committed Dec 21, 2022
1 parent 5d88aaa commit d0e0a26
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 24 deletions.
9 changes: 8 additions & 1 deletion execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

#![forbid(unsafe_code)]

use std::{cmp::max, collections::HashMap, sync::Arc};
use std::{
cmp::max,
collections::{BTreeSet, HashMap},
sync::Arc,
};

use anyhow::Result;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -122,6 +126,9 @@ pub trait TransactionReplayer: Send {
&self,
transactions: Vec<Transaction>,
transaction_infos: Vec<TransactionInfo>,
writesets: Vec<WriteSet>,
events: Vec<Vec<ContractEvent>>,
txns_to_skip: Arc<BTreeSet<Version>>,
) -> Result<()>;

fn commit(&self) -> Result<Arc<ExecutedChunk>>;
Expand Down
110 changes: 98 additions & 12 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@ use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_state_view::StateViewId;
use aptos_types::{
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
transaction::{
Transaction, TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof,
Transaction, TransactionInfo, TransactionListWithProof, TransactionOutput,
TransactionOutputListWithProof, TransactionStatus, Version,
},
write_set::WriteSet,
};
use aptos_vm::VMExecutor;
use executor_types::{
ChunkCommitNotification, ChunkExecutorTrait, ExecutedChunk, TransactionReplayer,
};
use fail::fail_point;
use std::{marker::PhantomData, sync::Arc};
use std::{collections::BTreeSet, marker::PhantomData, sync::Arc};
use storage_interface::{
cached_state_view::CachedStateView, sync_proof_fetcher::SyncProofFetcher, DbReaderWriter,
ExecutedTrees,
Expand Down Expand Up @@ -297,13 +300,18 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutor<V> {
&self,
transactions: Vec<Transaction>,
transaction_infos: Vec<TransactionInfo>,
writesets: Vec<WriteSet>,
events: Vec<Vec<ContractEvent>>,
txns_to_skip: Arc<BTreeSet<Version>>,
) -> Result<()> {
self.maybe_initialize()?;
self.inner
.read()
.as_ref()
.expect("not reset")
.replay(transactions, transaction_infos)
self.inner.read().as_ref().expect("not reset").replay(
transactions,
transaction_infos,
writesets,
events,
txns_to_skip,
)
}

fn commit(&self) -> Result<Arc<ExecutedChunk>> {
Expand All @@ -313,6 +321,60 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutor<V> {

impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
    /// Replays `transactions` against the DB, except that any version listed in
    /// `txns_to_skip` is not re-executed through the VM; its known historical
    /// output (taken from `writesets`/`events` and the expected
    /// `TransactionInfo`) is applied directly instead. This lets replay proceed
    /// past transactions whose execution is no longer reproducible (e.g. ones
    /// that broke backward compatibility).
    ///
    /// `writesets[i]` / `events[i]` correspond to `transactions[i]`, i.e. to
    /// version `current_begin_version + 1 + i`.
    fn replay(
        &self,
        mut transactions: Vec<Transaction>,
        mut transaction_infos: Vec<TransactionInfo>,
        writesets: Vec<WriteSet>,
        events: Vec<Vec<ContractEvent>>,
        txns_to_skip: Arc<BTreeSet<Version>>,
    ) -> Result<()> {
        // Version of the last transaction already committed; the first entry
        // of `transactions` is therefore at `current_begin_version + 1`.
        let current_begin_version = {
            self.commit_queue
                .lock()
                .persisted_and_latest_view()
                .1
                .version()
                .ok_or_else(|| anyhow!("Current version is not available"))?
        };

        // Loop invariant: `transactions[0]` is at version `offset + 1`.
        let mut offset = current_begin_version;
        let total_length = transactions.len();

        // Visit only the skip-list versions that fall inside this batch.
        for version in txns_to_skip
            .range(current_begin_version + 1..current_begin_version + total_length as u64 + 1)
        {
            // Keep versions `offset + 1 ..= *version`; everything after the
            // skipped txn goes to `remaining` for the next round.
            let remaining = transactions.split_off((version - offset) as usize);
            let remaining_info = transaction_infos.split_off((version - offset) as usize);
            let txn_to_skip = transactions.pop().unwrap();
            let txn_info = transaction_infos.pop().unwrap();

            // Execute everything before the skipped txn through the VM.
            self.replay_impl(transactions, transaction_infos)?;

            // `writesets`/`events` are indexed from the start of the batch, so
            // the skipped txn sits at `version - current_begin_version - 1`.
            self.apply_transaction_and_output(
                txn_to_skip,
                TransactionOutput::new(
                    writesets[(version - current_begin_version - 1) as usize].clone(),
                    events[(version - current_begin_version - 1) as usize].clone(),
                    txn_info.gas_used(),
                    TransactionStatus::Keep(txn_info.status().clone()),
                ),
                txn_info,
            )?;

            transactions = remaining;
            transaction_infos = remaining_info;
            // `remaining[0]` is at version `*version + 1`, so the invariant
            // `transactions[0] == offset + 1` requires `offset = *version`.
            // (`*version + 1` is off by one: on the next skipped version it
            // would pop the transaction *before* the one to skip and pair it
            // with the wrong writeset/info.)
            offset = *version;
        }
        // Replay whatever remains after the last skipped transaction.
        self.replay_impl(transactions, transaction_infos)
    }

    fn commit(&self) -> Result<Arc<ExecutedChunk>> {
        self.commit_chunk_impl()
    }
}

impl<V: VMExecutor> ChunkExecutorInner<V> {
fn replay_impl(
&self,
transactions: Vec<Transaction>,
mut transaction_infos: Vec<TransactionInfo>,
Expand All @@ -322,13 +384,14 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {

let mut executed_chunk = ExecutedChunk::default();
let mut to_run = Some(transactions);

while !to_run.as_ref().unwrap().is_empty() {
// Execute transactions.
let state_view = self.state_view(&latest_view)?;
let txns = to_run.take().unwrap();
let (executed, to_discard, to_retry) =
ChunkOutput::by_transaction_execution::<V>(txns, state_view)?
.apply_to_ledger(&latest_view)?;
let chunk_output = ChunkOutput::by_transaction_execution::<V>(txns, state_view)?;

let (executed, to_discard, to_retry) = chunk_output.apply_to_ledger(&latest_view)?;

// Accumulate result and deal with retry
ensure_no_discard(to_discard)?;
Expand All @@ -347,7 +410,30 @@ impl<V: VMExecutor> TransactionReplayer for ChunkExecutorInner<V> {
Ok(())
}

fn commit(&self) -> Result<Arc<ExecutedChunk>> {
self.commit_chunk_impl()
/// Applies a single transaction using a pre-computed `TransactionOutput`
/// instead of re-executing it through the VM, then verifies the applied
/// result against `expected_info` before enqueueing it for commit.
///
/// Used by `replay` for transactions whose historical output is known but
/// which can no longer be reproduced by execution.
fn apply_transaction_and_output(
    &self,
    txn: Transaction,
    output: TransactionOutput,
    expected_info: TransactionInfo,
) -> Result<()> {
    let (_persisted_view, latest_view) = self.commit_queue.lock().persisted_and_latest_view();

    // "Overriding" (typo "Overiding" fixed): we bypass the VM for this txn.
    info!(
        "Overriding the output of txn at version: {:?}",
        latest_view.version().unwrap(),
    );

    let chunk_output = ChunkOutput::by_transaction_output(
        vec![(txn, output)],
        self.state_view(&latest_view)?,
    )?;

    let (executed, to_discard, _to_retry) = chunk_output.apply_to_ledger(&latest_view)?;

    // A known-good historical output must never be discarded; fail loudly.
    ensure_no_discard(to_discard)?;
    // The applied output must reproduce the expected on-chain TransactionInfo.
    executed.ensure_transaction_infos_match(&vec![expected_info])?;
    self.commit_queue.lock().enqueue(executed);
    Ok(())
}
}
4 changes: 2 additions & 2 deletions execution/executor/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use std::{iter::once, sync::Arc};
use std::{collections::BTreeSet, iter::once, sync::Arc};

use proptest::prelude::*;

Expand Down Expand Up @@ -661,7 +661,7 @@ proptest! {
// replay txns in one batch across epoch boundary,
// and the replayer should deal with `Retry`s automatically
let replayer = chunk_executor_tests::TestExecutor::new();
replayer.executor.replay(block.txns, txn_infos).unwrap();
replayer.executor.replay(block.txns, txn_infos, vec![], vec![], Arc::new(BTreeSet::new())).unwrap();
replayer.executor.commit().unwrap();
let replayed_db = replayer.db.reader.clone();
prop_assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions storage/backup/backup-cli/src/backup_types/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ fn test_end_to_end_impl(d: TestData) {
global_restore_opt,
store,
None, /* epoch_history */
vec![],
)
.run(),
)
Expand Down
44 changes: 35 additions & 9 deletions storage/backup/backup-cli/src/backup_types/transaction/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ use futures::{
stream::{Peekable, Stream, TryStreamExt},
StreamExt,
};
use itertools::zip_eq;
use itertools::{izip, Itertools};
use std::{
cmp::{max, min},
collections::BTreeSet,
pin::Pin,
sync::Arc,
time::Instant,
Expand Down Expand Up @@ -82,6 +83,7 @@ struct LoadedChunk {
pub txns: Vec<Transaction>,
pub txn_infos: Vec<TransactionInfo>,
pub event_vecs: Vec<Vec<ContractEvent>>,
pub write_sets: Vec<WriteSet>,
pub range_proof: TransactionAccumulatorRangeProof,
pub ledger_info: LedgerInfoWithSignatures,
}
Expand All @@ -96,13 +98,15 @@ impl LoadedChunk {
let mut txns = Vec::new();
let mut txn_infos = Vec::new();
let mut event_vecs = Vec::new();
let mut write_sets = Vec::new();

while let Some(record_bytes) = file.read_record_bytes().await? {
let (txn, txn_info, events, _write_set): (_, _, _, WriteSet) =
let (txn, txn_info, events, write_set): (_, _, _, WriteSet) =
bcs::from_bytes(&record_bytes)?;
txns.push(txn);
txn_infos.push(txn_info);
event_vecs.push(events);
write_sets.push(write_set);
}

ensure!(
Expand Down Expand Up @@ -145,6 +149,7 @@ impl LoadedChunk {
event_vecs,
range_proof,
ledger_info,
write_sets,
})
}
}
Expand All @@ -155,13 +160,15 @@ impl TransactionRestoreController {
global_opt: GlobalRestoreOptions,
storage: Arc<dyn BackupStorage>,
epoch_history: Option<Arc<EpochHistory>>,
txns_to_skip: Vec<Version>,
) -> Self {
let inner = TransactionRestoreBatchController::new(
global_opt,
storage,
vec![opt.manifest_handle],
opt.replay_from_version,
epoch_history,
txns_to_skip.into_iter().collect(),
);

Self { inner }
Expand All @@ -181,6 +188,7 @@ pub struct TransactionRestoreBatchController {
manifest_handles: Vec<FileHandle>,
replay_from_version: Option<Version>,
epoch_history: Option<Arc<EpochHistory>>,
txns_to_skip: Arc<BTreeSet<Version>>,
}

impl TransactionRestoreBatchController {
Expand All @@ -190,13 +198,15 @@ impl TransactionRestoreBatchController {
manifest_handles: Vec<FileHandle>,
replay_from_version: Option<Version>,
epoch_history: Option<Arc<EpochHistory>>,
txns_to_skip: Vec<Version>,
) -> Self {
Self {
global_opt,
storage,
manifest_handles,
replay_from_version,
epoch_history,
txns_to_skip: Arc::new(txns_to_skip.into_iter().collect()),
}
}

Expand Down Expand Up @@ -325,7 +335,11 @@ impl TransactionRestoreBatchController {
global_first_version: Version,
loaded_chunk_stream: impl Stream<Item = Result<LoadedChunk>> + Unpin,
restore_handler: &RestoreHandler,
) -> Result<Option<impl Stream<Item = Result<(Transaction, TransactionInfo)>>>> {
) -> Result<
Option<
impl Stream<Item = Result<(Transaction, TransactionInfo, WriteSet, Vec<ContractEvent>)>>,
>,
> {
let next_expected_version = self
.global_opt
.run_mode
Expand Down Expand Up @@ -359,6 +373,7 @@ impl TransactionRestoreBatchController {
mut txns,
mut txn_infos,
mut event_vecs,
mut write_sets,
range_proof: _,
ledger_info: _,
} = chunk;
Expand All @@ -368,6 +383,7 @@ impl TransactionRestoreBatchController {
txns.drain(num_to_keep..);
txn_infos.drain(num_to_keep..);
event_vecs.drain(num_to_keep..);
write_sets.drain(num_to_keep..);
last_version = target_version;
}

Expand All @@ -377,6 +393,7 @@ impl TransactionRestoreBatchController {
let txns_to_save: Vec<_> = txns.drain(..num_to_save).collect();
let txn_infos_to_save: Vec<_> = txn_infos.drain(..num_to_save).collect();
let event_vecs_to_save: Vec<_> = event_vecs.drain(..num_to_save).collect();
write_sets.drain(..num_to_save);

tokio::task::spawn_blocking(move || {
restore_handler.save_transactions(
Expand All @@ -398,7 +415,9 @@ impl TransactionRestoreBatchController {
}

Ok(stream::iter(
zip_eq(txns, txn_infos).into_iter().map(Result::<_>::Ok),
izip!(txns, txn_infos, write_sets, event_vecs)
.into_iter()
.map(Result::<_>::Ok),
))
})
})
Expand All @@ -422,7 +441,9 @@ impl TransactionRestoreBatchController {
async fn replay_transactions(
&self,
restore_handler: &RestoreHandler,
txns_to_execute_stream: impl Stream<Item = Result<(Transaction, TransactionInfo)>>,
txns_to_execute_stream: impl Stream<
Item = Result<(Transaction, TransactionInfo, WriteSet, Vec<ContractEvent>)>,
>,
) -> Result<()> {
let first_version = self.replay_from_version.unwrap();
restore_handler.reset_state_store();
Expand All @@ -434,15 +455,20 @@ impl TransactionRestoreBatchController {
.try_chunks(BATCH_SIZE)
.err_into::<anyhow::Error>()
.map_ok(|chunk| {
let (txns, txn_infos): (Vec<_>, Vec<_>) = chunk.into_iter().unzip();
let (txns, txn_infos, write_sets, events): (Vec<_>, Vec<_>, Vec<_>, Vec<_>) =
chunk.into_iter().multiunzip();
let chunk_replayer = chunk_replayer.clone();
let txns_to_skip = self.txns_to_skip.clone();

async move {
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["replay_txn_chunk"])
.start_timer();
tokio::task::spawn_blocking(move || chunk_replayer.replay(txns, txn_infos))
.err_into::<anyhow::Error>()
.await
tokio::task::spawn_blocking(move || {
chunk_replayer.replay(txns, txn_infos, write_sets, events, txns_to_skip)
})
.err_into::<anyhow::Error>()
.await
}
})
.try_buffered_x(self.global_opt.concurrent_downloads, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ fn end_to_end() {
.unwrap(),
store,
None, /* epoch_history */
vec![],
)
.run(),
)
Expand Down
1 change: 1 addition & 0 deletions storage/backup/backup-cli/src/bin/db-restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ async fn main_impl() -> Result<()> {
global_opt,
storage.init_storage().await?,
None, /* epoch_history */
vec![],
)
.run()
.await?;
Expand Down
Loading

0 comments on commit d0e0a26

Please sign in to comment.