Skip to content

Commit

Permalink
accomodate that LedgerCommitProgress was not recorded after all ledge…
Browse files Browse the repository at this point in the history
…r commits
  • Loading branch information
msmouse authored and perryjrandall committed Sep 13, 2023
1 parent d058198 commit 40e1161
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 78 deletions.
10 changes: 10 additions & 0 deletions storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,16 @@ impl EventStore {
Ok(())
}

pub fn latest_version(&self) -> Result<Option<Version>> {
let mut iter = self.event_db.iter::<EventSchema>(ReadOptions::default())?;
iter.seek_to_last();
if let Some(((version, _), _)) = iter.next().transpose()? {
Ok(Some(version))
} else {
Ok(None)
}
}

/// Prune a set of candidate events in the range of version in [begin, end) and all related indices
pub fn prune_events(
&self,
Expand Down
45 changes: 21 additions & 24 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const MAX_WRITE_SETS_AFTER_SNAPSHOT: LeafCount = buffered_state::TARGET_SNAPSHOT
* (buffered_state::ASYNC_COMMIT_CHANNEL_BUFFER_SIZE + 2 + 1/* Rendezvous channel */)
* 2;

const MAX_COMMIT_PROGRESS_DIFFERENCE: u64 = 100000;
pub const MAX_COMMIT_PROGRESS_DIFFERENCE: u64 = 100000;

static IO_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
rayon::ThreadPoolBuilder::new()
Expand Down Expand Up @@ -313,11 +313,13 @@ impl StateStore {
empty_buffered_state_for_restore: bool,
skip_usage: bool,
) -> Self {
Self::sync_commit_progress(
Arc::clone(&ledger_db),
Arc::clone(&state_kv_db),
/*crash_if_difference_is_too_large=*/ true,
);
if !hack_for_tests {
Self::sync_commit_progress(
Arc::clone(&ledger_db),
Arc::clone(&state_kv_db),
/*crash_if_difference_is_too_large=*/ true,
);
}
let state_db = Arc::new(StateDb {
ledger_db,
state_merkle_db,
Expand Down Expand Up @@ -387,29 +389,24 @@ impl StateStore {
.expect_version();
assert_ge!(state_kv_commit_progress, overall_commit_progress);

if ledger_commit_progress != overall_commit_progress {
info!(
ledger_commit_progress = ledger_commit_progress,
"Start truncation...",
);
let difference = ledger_commit_progress - overall_commit_progress;
if crash_if_difference_is_too_large {
assert_le!(difference, MAX_COMMIT_PROGRESS_DIFFERENCE);
}
// TODO(grao): Support truncation for splitted ledger DBs.
truncate_ledger_db(
ledger_db,
ledger_commit_progress,
overall_commit_progress,
difference as usize,
)
.expect("Failed to truncate ledger db.");
// LedgerCommitProgress was not guaranteed to commit after all ledger changes finish,
// have to attempt truncating every column family.
info!(
ledger_commit_progress = ledger_commit_progress,
"Attempt ledger truncation...",
);
let difference = ledger_commit_progress - overall_commit_progress;
if crash_if_difference_is_too_large {
assert_le!(difference, MAX_COMMIT_PROGRESS_DIFFERENCE);
}
// TODO(grao): Support truncation for split ledger DBs.
truncate_ledger_db(ledger_db, overall_commit_progress)
.expect("Failed to truncate ledger db.");

if state_kv_commit_progress != overall_commit_progress {
info!(
state_kv_commit_progress = state_kv_commit_progress,
"Start truncation..."
"Start state KV truncation..."
);
let difference = state_kv_commit_progress - overall_commit_progress;
if crash_if_difference_is_too_large {
Expand Down
161 changes: 107 additions & 54 deletions storage/aptosdb/src/utils/truncation_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use crate::{
},
state_kv_db::StateKvDb,
state_merkle_db::StateMerkleDb,
state_store::MAX_COMMIT_PROGRESS_DIFFERENCE,
utils::get_progress,
EventStore, TransactionStore, NUM_STATE_SHARDS,
};
use anyhow::Result;
use aptos_jellyfish_merkle::{node_type::NodeKey, StaleNodeIndex};
use aptos_logger::info;
use aptos_schemadb::{
schema::{Schema, SeekKeyCodec},
ReadOptions, SchemaBatch, DB,
Expand Down Expand Up @@ -61,34 +63,18 @@ pub(crate) fn get_state_merkle_commit_progress(
)
}

pub(crate) fn truncate_ledger_db(
ledger_db: Arc<LedgerDb>,
current_version: Version,
target_version: Version,
batch_size: usize,
) -> Result<()> {
let status = StatusLine::new(Progress::new(target_version));

pub(crate) fn truncate_ledger_db(ledger_db: Arc<LedgerDb>, target_version: Version) -> Result<()> {
let event_store = EventStore::new(ledger_db.event_db_arc());
let transaction_store = TransactionStore::new(Arc::clone(&ledger_db));

let mut current_version = current_version;
while current_version > target_version {
let start_version =
std::cmp::max(current_version - batch_size as u64 + 1, target_version + 1);
let end_version = current_version + 1;
// TODO(grao): Support splitted ledger DBs here.
truncate_ledger_db_single_batch(
ledger_db.metadata_db(),
&event_store,
&transaction_store,
start_version,
end_version,
)?;
current_version = start_version - 1;
status.set_current_version(current_version);
}
assert_eq!(current_version, target_version);
let start_version = target_version + 1;
// TODO(grao): Support split ledger DBs here.
truncate_ledger_db_single_batch(
ledger_db.clone(),
&event_store,
&transaction_store,
start_version,
)?;
Ok(())
}

Expand Down Expand Up @@ -234,20 +220,16 @@ pub(crate) fn num_frozen_nodes_in_accumulator(num_leaves: u64) -> u64 {
fn truncate_transaction_accumulator(
ledger_db: &DB,
start_version: Version,
end_version: Version,
batch: &SchemaBatch,
) -> Result<()> {
let num_frozen_nodes = num_frozen_nodes_in_accumulator(end_version);
let mut iter = ledger_db.iter::<TransactionAccumulatorSchema>(ReadOptions::default())?;
iter.seek_to_last();
let (position, _) = iter.next().transpose()?.unwrap();
assert_eq!(position.to_postorder_index() + 1, num_frozen_nodes);
let num_frozen_nodes = position.to_postorder_index() + 1;
let num_frozen_nodes_after = num_frozen_nodes_in_accumulator(start_version);
let mut num_nodes_to_delete = num_frozen_nodes - num_frozen_nodes_after;

let num_frozen_nodes_after_this_batch = num_frozen_nodes_in_accumulator(start_version);

let mut num_nodes_to_delete = num_frozen_nodes - num_frozen_nodes_after_this_batch;

let start_position = Position::from_postorder_index(num_frozen_nodes_after_this_batch)?;
let start_position = Position::from_postorder_index(num_frozen_nodes_after)?;
iter.seek(&start_position)?;

for item in iter {
Expand All @@ -262,56 +244,69 @@ fn truncate_transaction_accumulator(
}

fn truncate_ledger_db_single_batch(
ledger_db: &DB,
ledger_db: Arc<LedgerDb>,
event_store: &EventStore,
transaction_store: &TransactionStore,
start_version: Version,
end_version: Version,
) -> Result<()> {
let batch = SchemaBatch::new();

delete_transaction_index_data(transaction_store, start_version, end_version, &batch)?;
delete_per_epoch_data(ledger_db, start_version, end_version, &batch)?;
delete_per_version_data(start_version, end_version, &batch)?;
delete_transaction_index_data(transaction_store, start_version, &batch)?;
delete_per_epoch_data(ledger_db.metadata_db(), start_version, &batch)?;
delete_per_version_data(&ledger_db, start_version, &batch)?;

event_store.prune_events(start_version, end_version, &batch)?;
delete_event_data(event_store, start_version, &batch)?;

truncate_transaction_accumulator(ledger_db, start_version, end_version, &batch)?;
truncate_transaction_accumulator(
ledger_db.transaction_accumulator_db(),
start_version,
&batch,
)?;

batch.put::<DbMetadataSchema>(
&DbMetadataKey::LedgerCommitProgress,
&DbMetadataValue::Version(start_version - 1),
)?;
ledger_db.write_schemas(batch)
ledger_db.metadata_db().write_schemas(batch)
}

fn delete_transaction_index_data(
transaction_store: &TransactionStore,
start_version: Version,
end_version: Version,
batch: &SchemaBatch,
) -> Result<()> {
let transactions = transaction_store
.get_transaction_iter(start_version, (end_version - start_version) as usize)?
.get_transaction_iter(start_version, MAX_COMMIT_PROGRESS_DIFFERENCE as usize * 2)?
.collect::<Result<Vec<_>>>()?;
transaction_store.prune_transaction_by_account(&transactions, batch)?;
transaction_store.prune_transaction_by_hash(&transactions, batch)?;
let num_txns = transactions.len();
if num_txns > 0 {
info!(
start_version = start_version,
latest_version = start_version + num_txns as u64 - 1,
"Truncate transaction index data."
);
transaction_store.prune_transaction_by_account(&transactions, batch)?;
transaction_store.prune_transaction_by_hash(&transactions, batch)?;
}

Ok(())
}

fn delete_per_epoch_data(
ledger_db: &DB,
start_version: Version,
end_version: Version,
batch: &SchemaBatch,
) -> Result<()> {
let mut iter = ledger_db.iter::<LedgerInfoSchema>(ReadOptions::default())?;
iter.seek_to_last();
if let Some((epoch, ledger_info)) = iter.next().transpose()? {
let version = ledger_info.commit_info().version();
assert_lt!(version, end_version);
if version >= start_version {
info!(
version = version,
epoch = epoch,
"Truncate latest epoch data."
);
batch.delete::<LedgerInfoSchema>(&epoch)?;
}
}
Expand All @@ -321,7 +316,11 @@ fn delete_per_epoch_data(

for item in iter {
let (version, epoch) = item?;
assert_lt!(version, end_version);
info!(
version = version,
epoch = epoch,
"Truncate epoch ending data."
);
batch.delete::<EpochByVersionSchema>(&version)?;
batch.delete::<LedgerInfoSchema>(&epoch)?;
}
Expand All @@ -330,17 +329,71 @@ fn delete_per_epoch_data(
}

fn delete_per_version_data(
ledger_db: &LedgerDb,
start_version: Version,
end_version: Version,
batch: &SchemaBatch,
) -> Result<()> {
for version in start_version..end_version {
batch.delete::<TransactionInfoSchema>(&version)?;
batch.delete::<TransactionSchema>(&version)?;
batch.delete::<VersionDataSchema>(&version)?;
batch.delete::<WriteSetSchema>(&version)?;
delete_per_version_data_impl::<TransactionInfoSchema>(
ledger_db.transaction_info_db(),
start_version,
batch,
)?;
delete_per_version_data_impl::<TransactionSchema>(
ledger_db.transaction_db(),
start_version,
batch,
)?;
delete_per_version_data_impl::<VersionDataSchema>(
ledger_db.metadata_db(),
start_version,
batch,
)?;
delete_per_version_data_impl::<WriteSetSchema>(ledger_db.write_set_db(), start_version, batch)?;

Ok(())
}

fn delete_per_version_data_impl<S>(
ledger_db: &DB,
start_version: Version,
batch: &SchemaBatch,
) -> Result<()>
where
S: Schema<Key = Version>,
{
let mut iter = ledger_db.iter::<S>(ReadOptions::default())?;
iter.seek_to_last();
if let Some((lastest_version, _)) = iter.next().transpose()? {
if lastest_version >= start_version {
info!(
start_version = start_version,
latest_version = lastest_version,
cf_name = S::COLUMN_FAMILY_NAME,
"Truncate per version data."
);
for version in start_version..=lastest_version {
batch.delete::<S>(&version)?;
}
}
}
Ok(())
}

fn delete_event_data(
event_store: &EventStore,
start_version: Version,
batch: &SchemaBatch,
) -> Result<()> {
if let Some(latest_version) = event_store.latest_version()? {
if latest_version >= start_version {
info!(
start_version = start_version,
latest_version = latest_version,
"Truncate event data."
);
event_store.prune_events(start_version, latest_version + 1, batch)?;
}
}
Ok(())
}

Expand Down

0 comments on commit 40e1161

Please sign in to comment.