Skip to content

Commit

Permalink
Stop locking BufferedState for read
Browse files Browse the repository at this point in the history
Move the state that readers need out of BufferedState, so that the read path
is not blocked by buffered state back pressure or sync commits.

Also, the pre-committed version is now guarded by the same lock as the latest
state, eliminating some edge cases.
  • Loading branch information
msmouse committed Nov 13, 2024
1 parent fbf5ad1 commit 431249e
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 174 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions storage/aptosdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ claims = { workspace = true }
clap = { workspace = true, optional = true }
crossbeam-channel = { workspace = true, optional = true }
dashmap = { workspace = true }
derive_more = { workspace = true }
either = { workspace = true }
hex = { workspace = true }
indicatif = { workspace = true, optional = true }
Expand Down
8 changes: 4 additions & 4 deletions storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
};
use aptos_crypto::HashValue;
use aptos_schemadb::{SchemaBatch, DB};
use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result};
use aptos_storage_interface::{db_ensure as ensure, state_delta::StateDelta, AptosDbError, Result};
use aptos_types::{
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
Expand Down Expand Up @@ -161,9 +161,9 @@ pub(crate) fn save_transactions(
)?;

ledger_db.write_schemas(ledger_db_batch)?;
ledger_db
.metadata_db()
.set_pre_committed_version(last_version);

*state_store.current_state().lock() =
StateDelta::new_empty_with_version(Some(last_version));
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/backup/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ proptest! {
fn test_get_transaction_iter(input in arb_blocks_to_commit()) {
let tmp_dir = TempPath::new();
let db = AptosDB::new_for_test(&tmp_dir);
let mut in_memory_state = db.state_store.buffered_state().lock().current_state().clone();
let mut in_memory_state = db.state_store.current_state_cloned();
let _ancestor = in_memory_state.base.clone();
let mut cur_ver: Version = 0;
for (txns_to_commit, ledger_info_with_sigs) in input.iter() {
Expand Down
7 changes: 1 addition & 6 deletions storage/aptosdb/src/db/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,7 @@ pub fn test_state_merkle_pruning_impl(
.unwrap();

// augment DB in blocks
let mut in_memory_state = db
.state_store
.buffered_state()
.lock()
.current_state()
.clone();
let mut in_memory_state = db.state_store.current_state_cloned();
let _ancester = in_memory_state.current.clone();
let mut next_ver: Version = 0;
let mut snapshot_versions = vec![];
Expand Down
5 changes: 1 addition & 4 deletions storage/aptosdb/src/db/fake_aptosdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,10 +994,7 @@ mod tests {

let mut in_memory_state = db
.inner
.buffered_state()
.lock()
.current_state()
.clone();
.get_latest_executed_trees().state;

let mut cur_ver: Version = 0;
for (txns_to_commit, ledger_info_with_sigs) in input.iter() {
Expand Down
16 changes: 7 additions & 9 deletions storage/aptosdb/src/db/include/aptosdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl DbReader for AptosDB {

fn get_pre_committed_version(&self) -> Result<Option<Version>> {
gauged_api("get_pre_committed_version", || {
Ok(self.ledger_db.metadata_db().get_pre_committed_version())
Ok(self.state_store.current_state().lock().current_version)
})
}

Expand Down Expand Up @@ -529,11 +529,8 @@ impl DbReader for AptosDB {

fn get_latest_executed_trees(&self) -> Result<ExecutedTrees> {
gauged_api("get_latest_executed_trees", || {
let buffered_state = self.state_store.buffered_state().lock();
let num_txns = buffered_state
.current_state()
.current_version
.map_or(0, |v| v + 1);
let current_state = self.state_store.current_state_cloned();
let num_txns = current_state.next_version();

let frozen_subtrees = self
.ledger_db
Expand All @@ -542,7 +539,7 @@ impl DbReader for AptosDB {
let transaction_accumulator =
Arc::new(InMemoryAccumulator::new(frozen_subtrees, num_txns)?);
let executed_trees = ExecutedTrees::new(
Arc::new(buffered_state.current_state().clone()),
Arc::new(current_state),
transaction_accumulator,
);
Ok(executed_trees)
Expand Down Expand Up @@ -643,9 +640,10 @@ impl DbReader for AptosDB {
gauged_api("get_latest_state_checkpoint_version", || {
Ok(self
.state_store
.buffered_state()
.current_state()
.lock()
.current_checkpoint_version())
.base_version
)
})
}

Expand Down
7 changes: 0 additions & 7 deletions storage/aptosdb/src/db/include/aptosdb_testonly.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::state_store::buffered_state::BufferedState;
use aptos_config::config::{ BUFFERED_STATE_TARGET_ITEMS_FOR_TEST, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD};
use aptos_infallible::Mutex;
use aptos_types::state_store::{create_empty_sharded_state_updates, ShardedStateUpdates};
use std::default::Default;
use aptos_storage_interface::cached_state_view::ShardedStateCache;
Expand Down Expand Up @@ -91,11 +89,6 @@ impl AptosDB {
)
}

/// This gets the current buffered_state in StateStore.
pub fn buffered_state(&self) -> &Mutex<BufferedState> {
self.state_store.buffered_state()
}

pub(crate) fn state_merkle_db(&self) -> Arc<StateMerkleDb> {
self.state_store.state_db.state_merkle_db.clone()
}
Expand Down
20 changes: 11 additions & 9 deletions storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ impl DbWriter for AptosDB {
sync_commit || chunk.is_reconfig,
)?;
}
self.ledger_db.metadata_db().set_pre_committed_version(chunk.expect_last_version());

*self.state_store.current_state().lock() = chunk.latest_in_memory_state.clone();

Ok(())
})
}
Expand Down Expand Up @@ -208,7 +210,6 @@ impl DbWriter for AptosDB {
.save_min_readable_version(version)?;

restore_utils::update_latest_ledger_info(self.ledger_db.metadata_db(), ledger_infos)?;
self.ledger_db.metadata_db().set_pre_committed_version(version);
self.state_store.reset();

Ok(())
Expand Down Expand Up @@ -238,19 +239,20 @@ impl AptosDB {

let num_transactions_in_db = self.get_pre_committed_version()?.map_or(0, |v| v + 1);
{
let buffered_state = self.state_store.buffered_state().lock();
let current_state_guard = self.state_store.current_state();
let current_state = current_state_guard.lock();
ensure!(
chunk.base_state_version == buffered_state.current_state().base_version,
chunk.base_state_version == current_state.base_version,
"base_state_version {:?} does not equal to the base_version {:?} in buffered state with current version {:?}",
chunk.base_state_version,
buffered_state.current_state().base_version,
buffered_state.current_state().current_version,
current_state.base_version,
current_state.current_version,
);

// Ensure the incoming committing requests are always consecutive and the version in
// buffered state is consistent with that in db.
let next_version_in_buffered_state = buffered_state
.current_state()
let next_version_in_buffered_state =
current_state
.current_version
.map(|version| version + 1)
.unwrap_or(0);
Expand Down Expand Up @@ -560,7 +562,7 @@ impl AptosDB {
version_to_commit: Version,
) -> Result<Option<Version>> {
let old_committed_ver = self.ledger_db.metadata_db().get_synced_version()?;
let pre_committed_ver = self.ledger_db.metadata_db().get_pre_committed_version();
let pre_committed_ver = self.state_store.current_state().lock().current_version;
ensure!(
old_committed_ver.is_none() || version_to_commit >= old_committed_ver.unwrap(),
"Version too old to commit. Committed: {:?}; Trying to commit with LI: {}",
Expand Down
2 changes: 2 additions & 0 deletions storage/aptosdb/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ pub struct AptosDB {
pub(crate) transaction_store: Arc<TransactionStore>,
ledger_pruner: LedgerPrunerManager,
_rocksdb_property_reporter: RocksdbPropertyReporter,
/// This is just to detect concurrent calls to `pre_commit_ledger()`
pre_commit_lock: std::sync::Mutex<()>,
/// This is just to detect concurrent calls to `commit_ledger()`
commit_lock: std::sync::Mutex<()>,
indexer: Option<Indexer>,
skip_index_and_usage: bool,
Expand Down
21 changes: 3 additions & 18 deletions storage/aptosdb/src/db/test_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,7 @@ pub fn test_save_blocks_impl(
let db =
AptosDB::new_for_test_with_buffered_state_target_items(&tmp_dir, snapshot_size_threshold);

let mut in_memory_state = db
.state_store
.buffered_state()
.lock()
.current_state()
.clone();
let mut in_memory_state = db.state_store.current_state_cloned();
let _ancester = in_memory_state.current.clone();
let _usage = _ancester.usage();
let num_batches = input.len();
Expand Down Expand Up @@ -941,12 +936,7 @@ pub fn put_as_state_root(db: &AptosDB, version: Version, key: StateKey, value: S
.metadata_db()
.put::<StateValueSchema>(&(key.clone(), version), &Some(value.clone()))
.unwrap();
let mut in_memory_state = db
.state_store
.buffered_state()
.lock()
.current_state()
.clone();
let mut in_memory_state = db.state_store.current_state_cloned();
in_memory_state.current = smt;
in_memory_state.current_version = Some(version);
in_memory_state.updates_since_base[key.get_shard_id() as usize].insert(key, Some(value));
Expand All @@ -965,12 +955,7 @@ pub fn test_sync_transactions_impl(
let db =
AptosDB::new_for_test_with_buffered_state_target_items(&tmp_dir, snapshot_size_threshold);

let mut in_memory_state = db
.state_store
.buffered_state()
.lock()
.current_state()
.clone();
let mut in_memory_state = db.state_store.current_state_cloned();
let _ancester = in_memory_state.current.clone();
let num_batches = input.len();
let mut cur_ver: Version = 0;
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db_debugger/truncate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ mod test {
let tmp_dir = TempPath::new();

let db = if input.1 { AptosDB::new_for_test_with_sharding(&tmp_dir, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD) } else { AptosDB::new_for_test(&tmp_dir) };
let mut in_memory_state = db.state_store.buffered_state().lock().current_state().clone();
let mut in_memory_state = db.state_store.current_state_cloned();
let _ancestor = in_memory_state.base.clone();
let mut version = 0;
for (txns_to_commit, ledger_info_with_sigs) in input.0.iter() {
Expand Down
37 changes: 4 additions & 33 deletions storage/aptosdb/src/ledger_db/ledger_metadata_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,12 @@ use anyhow::anyhow;
use aptos_schemadb::{SchemaBatch, DB};
use aptos_storage_interface::{block_info::BlockInfo, db_ensure as ensure, AptosDbError, Result};
use aptos_types::{
account_config::NewBlockEvent,
block_info::BlockHeight,
contract_event::ContractEvent,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
state_store::state_storage_usage::StateStorageUsage,
transaction::{AtomicVersion, Version},
account_config::NewBlockEvent, block_info::BlockHeight, contract_event::ContractEvent,
epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
state_store::state_storage_usage::StateStorageUsage, transaction::Version,
};
use arc_swap::ArcSwap;
use std::{
ops::Deref,
path::Path,
sync::{atomic::Ordering, Arc},
};
use std::{ops::Deref, path::Path, sync::Arc};

fn get_latest_ledger_info_in_db_impl(db: &DB) -> Result<Option<LedgerInfoWithSignatures>> {
let mut iter = db.iter::<LedgerInfoSchema>()?;
Expand All @@ -45,23 +37,16 @@ pub(crate) struct LedgerMetadataDb {
/// cache it in memory in order to avoid reading DB and deserializing the object frequently. It
/// should be updated every time new ledger info and signatures are persisted.
latest_ledger_info: ArcSwap<Option<LedgerInfoWithSignatures>>,

next_pre_commit_version: AtomicVersion,
}

impl LedgerMetadataDb {
pub(super) fn new(db: Arc<DB>) -> Self {
let latest_ledger_info = get_latest_ledger_info_in_db_impl(&db).expect("DB read failed.");
let latest_ledger_info = ArcSwap::from(Arc::new(latest_ledger_info));

let synced_version =
get_progress(&db, &DbMetadataKey::OverallCommitProgress).expect("DB read failed.");
let next_pre_commit_version = AtomicVersion::new(synced_version.map_or(0, |v| v + 1));

Self {
db,
latest_ledger_info,
next_pre_commit_version,
}
}

Expand Down Expand Up @@ -92,20 +77,6 @@ impl LedgerMetadataDb {
get_progress(&self.db, &DbMetadataKey::OverallCommitProgress)
}

pub(crate) fn get_pre_committed_version(&self) -> Option<Version> {
let next_version = self.next_pre_commit_version.load(Ordering::Acquire);
if next_version == 0 {
None
} else {
Some(next_version - 1)
}
}

pub(crate) fn set_pre_committed_version(&self, version: Version) {
self.next_pre_commit_version
.store(version + 1, Ordering::Release);
}

pub(crate) fn get_ledger_commit_progress(&self) -> Result<Version> {
get_progress(&self.db, &DbMetadataKey::LedgerCommitProgress)?
.ok_or_else(|| AptosDbError::NotFound("No LedgerCommitProgress in db.".to_string()))
Expand Down
Loading

0 comments on commit 431249e

Please sign in to comment.