diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index 09f2a5b91558bf..8332c7f0c29745 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -508,6 +508,7 @@ impl GasMeasurement { } } +/// FIXME(aldenhu): update static OTHER_LABELS: &[(&str, bool, &str)] = &[ ("1.", true, "verified_state_view"), ("2.", true, "state_checkpoint"), diff --git a/execution/executor-types/src/state_checkpoint_output.rs b/execution/executor-types/src/state_checkpoint_output.rs index c611d6c2c90a00..c5faf719a13967 100644 --- a/execution/executor-types/src/state_checkpoint_output.rs +++ b/execution/executor-types/src/state_checkpoint_output.rs @@ -7,9 +7,12 @@ use crate::transactions_with_output::TransactionsWithOutput; use aptos_crypto::HashValue; use aptos_drop_helper::DropHelper; use aptos_storage_interface::state_delta::StateDelta; -use aptos_types::{state_store::ShardedStateUpdates, transaction::TransactionStatus}; +use aptos_types::{ + state_store::{state_key::StateKey, state_value::StateValue}, + transaction::TransactionStatus, +}; use derive_more::Deref; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; #[derive(Default)] pub struct TransactionsByStatus { @@ -69,15 +72,13 @@ impl StateCheckpointOutput { pub fn new( parent_state: Arc, result_state: Arc, - state_updates_before_last_checkpoint: Option, - per_version_state_updates: Vec, + state_updates_before_last_checkpoint: Option>>, state_checkpoint_hashes: Vec>, ) -> Self { Self::new_impl(Inner { parent_state, result_state, state_updates_before_last_checkpoint, - per_version_state_updates, state_checkpoint_hashes, }) } @@ -87,7 +88,6 @@ impl StateCheckpointOutput { parent_state: state.clone(), result_state: state, state_updates_before_last_checkpoint: None, - per_version_state_updates: vec![], state_checkpoint_hashes: vec![], }) } @@ -111,8 +111,7 @@ impl StateCheckpointOutput { pub struct Inner { pub parent_state: Arc, pub result_state: Arc, - pub state_updates_before_last_checkpoint: Option, - pub per_version_state_updates: Vec, + pub state_updates_before_last_checkpoint: Option>>, pub state_checkpoint_hashes: Vec>, } diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs index cbcfe661a5dd39..f2c7ed3b059d59 100644 --- a/execution/executor-types/src/state_compute_result.rs +++ b/execution/executor-types/src/state_compute_result.rs @@ -157,7 +157,6 @@ impl StateComputeResult { transactions: self.execution_output.to_commit.txns(), transaction_outputs: self.execution_output.to_commit.transaction_outputs(), transaction_infos: &self.ledger_update_output.transaction_infos, - per_version_state_updates: &self.state_checkpoint_output.per_version_state_updates, base_state_version: self.state_checkpoint_output.parent_state.base_version, latest_in_memory_state: &self.state_checkpoint_output.result_state, state_updates_until_last_checkpoint: self diff --git a/execution/executor/src/types/in_memory_state_calculator_v2.rs b/execution/executor/src/types/in_memory_state_calculator_v2.rs index 21987a51c39b3b..7488277555f0e6 100644 --- a/execution/executor/src/types/in_memory_state_calculator_v2.rs +++ b/execution/executor/src/types/in_memory_state_calculator_v2.rs @@ -17,15 +17,13 @@ use aptos_storage_interface::{ }; use aptos_types::{ state_store::{ - create_empty_sharded_state_updates, state_key::StateKey, - state_storage_usage::StateStorageUsage, state_value::StateValue, ShardedStateUpdates, + state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, }, - transaction::Version, + transaction::{TransactionOutput, Version}, write_set::{TransactionWrite, WriteSet}, }; -use arr_macro::arr; use dashmap::DashMap; -use itertools::zip_eq; +use itertools::Itertools; use rayon::prelude::*; use std::{collections::HashMap, ops::Deref, sync::Arc}; @@ -42,11 +40,6 @@ impl InMemoryStateCalculatorV2 { Self::validate_input_for_block(parent_state, &execution_output.to_commit)?; } - let state_updates_vec = Self::get_sharded_state_updates( - execution_output.to_commit.transaction_outputs(), - |txn_output| txn_output.write_set(), - ); - // If there are multiple checkpoints in the chunk, we only calculate the SMT (and its root // hash) for the last one. let last_checkpoint_index = execution_output.to_commit.get_last_checkpoint_index(); @@ -54,7 +47,11 @@ impl InMemoryStateCalculatorV2 { Self::calculate_impl( parent_state, &execution_output.state_cache, - state_updates_vec, + execution_output + .to_commit + .transaction_outputs + .iter() + .map(TransactionOutput::write_set), last_checkpoint_index, execution_output.is_block, known_state_checkpoints, @@ -67,22 +64,20 @@ impl InMemoryStateCalculatorV2 { last_checkpoint_index: Option, write_sets: &[WriteSet], ) -> Result { - let state_updates_vec = Self::get_sharded_state_updates(write_sets, |write_set| write_set); - Self::calculate_impl( parent_state, state_cache, - state_updates_vec, + write_sets, last_checkpoint_index, false, Option::>::None, ) } - fn calculate_impl( + fn calculate_impl<'a>( parent_state: &Arc, state_cache: &StateCache, - state_updates_vec: Vec, + write_sets: impl IntoIterator, last_checkpoint_index: Option, is_block: bool, known_state_checkpoints: Option>>, @@ -96,25 +91,24 @@ impl InMemoryStateCalculatorV2 { } = state_cache; assert!(frozen_base.smt.is_the_same(&parent_state.current)); + let write_sets = write_sets.into_iter().collect_vec(); + let num_txns = write_sets.len(); let (updates_before_last_checkpoint, updates_after_last_checkpoint) = if let Some(index) = last_checkpoint_index { ( - Self::calculate_updates(&state_updates_vec[..=index]), - Self::calculate_updates(&state_updates_vec[index + 1..]), + Self::calculate_updates(&write_sets[..=index]), + Self::calculate_updates(&write_sets[index + 1..]), ) } else { - ( - create_empty_sharded_state_updates(), - Self::calculate_updates(&state_updates_vec), - ) + (HashMap::new(), Self::calculate_updates(&write_sets)) }; + let all_updates = Self::calculate_updates(&write_sets); - let num_txns = state_updates_vec.len(); - - let usage = Self::calculate_usage(parent_state.current.usage(), sharded_state_cache, &[ - &updates_before_last_checkpoint, - &updates_after_last_checkpoint, - ]); + let usage = Self::calculate_usage( + parent_state.current.usage(), + sharded_state_cache, + &all_updates, + ); let first_version = parent_state.current_version.map_or(0, |v| v + 1); let proof_reader = ProofReader::new(proofs); @@ -181,11 +175,7 @@ impl InMemoryStateCalculatorV2 { } else { let mut updates_since_latest_checkpoint = parent_state.updates_since_base.deref().deref().clone(); - zip_eq( - updates_since_latest_checkpoint.iter_mut(), - updates_after_last_checkpoint, - ) - .for_each(|(base, delta)| base.extend(delta)); + updates_since_latest_checkpoint.extend(updates_after_last_checkpoint); updates_since_latest_checkpoint }; @@ -209,54 +199,25 @@ impl InMemoryStateCalculatorV2 { parent_state.clone(), Arc::new(result_state), last_checkpoint_index.map(|_| updates_before_last_checkpoint), - state_updates_vec, state_checkpoint_hashes, )) } - fn get_sharded_state_updates<'a, T, F>( - outputs: &'a [T], - write_set_fn: F, - ) -> Vec - where - T: Sync + 'a, - F: Fn(&'a T) -> &'a WriteSet + Sync, - { - let _timer = OTHER_TIMERS.timer_with(&["get_sharded_state_updates"]); + fn calculate_updates<'a>( + write_sets: &'a [&'a WriteSet], + ) -> HashMap> { + let _timer = OTHER_TIMERS.timer_with(&["calculate_updates"]); - outputs - .par_iter() - .map(|output| { - let mut updates = arr![HashMap::new(); 16]; - write_set_fn(output) + write_sets + .iter() + .flat_map(|write_set| { + write_set .iter() - .for_each(|(state_key, write_op)| { - updates[state_key.get_shard_id() as usize] - .insert(state_key.clone(), write_op.as_state_value()); - }); - updates + .map(|(key, op)| (key.clone(), op.as_state_value())) }) .collect() } - fn calculate_updates(state_updates_vec: &[ShardedStateUpdates]) -> ShardedStateUpdates { - let _timer = OTHER_TIMERS.timer_with(&["calculate_updates"]); - let mut updates: ShardedStateUpdates = create_empty_sharded_state_updates(); - updates - .par_iter_mut() - .enumerate() - .for_each(|(i, per_shard_update)| { - per_shard_update.extend( - state_updates_vec - .iter() - .flat_map(|hms| &hms[i]) - .map(|(k, v)| (k.clone(), v.clone())) - .collect::>(), - ) - }); - updates - } - fn add_to_delta( k: &StateKey, v: &Option, @@ -280,7 +241,7 @@ impl InMemoryStateCalculatorV2 { fn calculate_usage( old_usage: StateStorageUsage, sharded_state_cache: &ShardedStateCache, - updates: &[&ShardedStateUpdates; 2], + updates: &HashMap>, ) -> StateStorageUsage { let _timer = OTHER_TIMERS .with_label_values(&["calculate_usage"]) @@ -288,38 +249,21 @@ impl InMemoryStateCalculatorV2 { if old_usage.is_untracked() { return StateStorageUsage::new_untracked(); } - let (items_delta, bytes_delta) = updates[0] + + let (items_delta, bytes_delta) = updates .par_iter() - .zip_eq(updates[1].par_iter()) - .enumerate() - .map( - |(i, (shard_updates_before_checkpoint, shard_updates_after_checkpoint))| { - let mut items_delta = 0i64; - let mut bytes_delta = 0i64; - let num_updates_before_checkpoint = shard_updates_before_checkpoint.len(); - for (index, (k, v)) in shard_updates_before_checkpoint - .iter() - .chain(shard_updates_after_checkpoint.iter()) - .enumerate() - { - // Ignore updates before the checkpoint if there is an update for the same - // key after the checkpoint. - if index < num_updates_before_checkpoint - && shard_updates_after_checkpoint.contains_key(k) - { - continue; - } - Self::add_to_delta( - k, - v, - sharded_state_cache.shard(i as u8), - &mut items_delta, - &mut bytes_delta, - ); - } - (items_delta, bytes_delta) - }, - ) + .map(|(key, value)| { + let mut items_delta = 0i64; + let mut bytes_delta = 0i64; + Self::add_to_delta( + key, + value, + sharded_state_cache.shard(key.get_shard_id()), + &mut items_delta, + &mut bytes_delta, + ); + (items_delta, bytes_delta) + }) .reduce( || (0i64, 0i64), |(items_now, bytes_now), (items_delta, bytes_delta)| { @@ -334,7 +278,7 @@ impl InMemoryStateCalculatorV2 { fn make_checkpoint( latest_checkpoint: FrozenSparseMerkleTree, - updates: &ShardedStateUpdates, + updates: &HashMap>, usage: StateStorageUsage, proof_reader: &ProofReader, ) -> Result> { @@ -342,12 +286,10 @@ impl InMemoryStateCalculatorV2 { // Update SMT. // - // TODO(grao): Consider use the sharded updates directly instead of flatten. - let smt_updates: Vec<_> = updates + // TODO(aldenhu): avoid collecting into vec + let smt_updates = updates .iter() - .flatten() - .map(|(key, value)| (key.hash(), value.as_ref())) - .collect(); + .map(|(key, value)| (key.hash(), value.as_ref())); let new_checkpoint = latest_checkpoint.batch_update(smt_updates, usage, proof_reader)?; Ok(new_checkpoint) } @@ -365,7 +307,7 @@ impl InMemoryStateCalculatorV2 { base.current_version, ); ensure!( - base.updates_since_base.iter().all(|shard| shard.is_empty()), + base.updates_since_base.is_empty(), "Base state is corrupted, updates_since_base is not empty at a checkpoint." ); diff --git a/storage/aptosdb/src/db/fake_aptosdb.rs b/storage/aptosdb/src/db/fake_aptosdb.rs index 948d7e95042804..254cad17e35938 100644 --- a/storage/aptosdb/src/db/fake_aptosdb.rs +++ b/storage/aptosdb/src/db/fake_aptosdb.rs @@ -118,10 +118,9 @@ impl FakeBufferedState { new_state_after_checkpoint.base_version > self.state_after_checkpoint.base_version, "Diff between base and latest checkpoints provided, while they are the same.", ); - combine_sharded_state_updates( - &mut self.state_after_checkpoint.updates_since_base, - updates_until_next_checkpoint_since_current, - ); + self.state_after_checkpoint + .updates_since_base + .extend(updates_until_next_checkpoint_since_current); self.state_after_checkpoint.current = new_state_after_checkpoint.base.clone(); self.state_after_checkpoint.current_version = new_state_after_checkpoint.base_version; let state_after_checkpoint = self diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index 8389c0549b44ea..ca1e46d8cd9c0a 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -1,14 +1,15 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use crate::state_store::buffered_state::BufferedState; use aptos_config::config::{ BUFFERED_STATE_TARGET_ITEMS_FOR_TEST, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD}; use aptos_infallible::Mutex; -use aptos_types::state_store::{create_empty_sharded_state_updates, ShardedStateUpdates}; use std::default::Default; use aptos_storage_interface::cached_state_view::ShardedStateCache; use aptos_storage_interface::state_delta::StateDelta; use aptos_types::transaction::{TransactionStatus, TransactionToCommit}; +use aptos_types::write_set::TransactionWrite; impl AptosDB { /// This opens db in non-readonly mode, without the pruner. @@ -133,8 +134,7 @@ pub struct ChunkToCommitOwned { transaction_infos: Vec, base_state_version: Option, latest_in_memory_state: Arc, - per_version_state_updates: Vec, - state_updates_until_last_checkpoint: Option, + state_updates_until_last_checkpoint: Option>>, sharded_state_cache: ShardedStateCache, is_reconfig: bool, } @@ -146,13 +146,13 @@ impl ChunkToCommitOwned { base_state_version: Option, latest_in_memory_state: &StateDelta, ) -> Self { - let (transactions, transaction_outputs, transaction_infos, per_version_state_updates) = Self::disassemble_txns_to_commit(txns_to_commit); + let (transactions, transaction_outputs, transaction_infos) = Self::disassemble_txns_to_commit(txns_to_commit); let state_updates_until_last_checkpoint = Self::gather_state_updates_until_last_checkpoint( first_version, latest_in_memory_state, - &per_version_state_updates, &transaction_infos, + &transaction_outputs, ); Self { @@ -162,7 +162,6 @@ impl ChunkToCommitOwned { transaction_infos, base_state_version, latest_in_memory_state: Arc::new(latest_in_memory_state.clone()), - per_version_state_updates, state_updates_until_last_checkpoint, sharded_state_cache: ShardedStateCache::default(), is_reconfig: false, @@ -177,7 +176,6 @@ impl ChunkToCommitOwned { transaction_infos: &self.transaction_infos, base_state_version: self.base_state_version, latest_in_memory_state: &self.latest_in_memory_state, - per_version_state_updates: &self.per_version_state_updates, state_updates_until_last_checkpoint: self.state_updates_until_last_checkpoint.as_ref(), sharded_state_cache: &self.sharded_state_cache, is_reconfig: self.is_reconfig, @@ -185,11 +183,11 @@ impl ChunkToCommitOwned { } fn disassemble_txns_to_commit(txns_to_commit: &[TransactionToCommit]) -> ( - Vec, Vec, Vec, Vec, + Vec, Vec, Vec ) { txns_to_commit.iter().map(|txn_to_commit| { let TransactionToCommit { - transaction, transaction_info, state_updates, write_set, events, is_reconfig: _, transaction_auxiliary_data + transaction, transaction_info, write_set, events, is_reconfig: _, transaction_auxiliary_data } = txn_to_commit; let transaction_output = TransactionOutput::new( @@ -200,16 +198,16 @@ impl ChunkToCommitOwned { transaction_auxiliary_data.clone(), ); - (transaction.clone(), transaction_output, transaction_info.clone(), state_updates.clone()) + (transaction.clone(), transaction_output, transaction_info.clone()) }).multiunzip() } pub fn gather_state_updates_until_last_checkpoint( first_version: Version, latest_in_memory_state: &StateDelta, - per_version_state_updates: &[ShardedStateUpdates], transaction_infos: &[TransactionInfo], - ) -> Option { + transaction_outputs: &[TransactionOutput], + ) -> Option>> { if let Some(latest_checkpoint_version) = latest_in_memory_state.base_version { if latest_checkpoint_version >= first_version { let idx = (latest_checkpoint_version - first_version) as usize; @@ -219,15 +217,9 @@ impl ChunkToCommitOwned { latest_checkpoint_version, first_version + idx as u64 ); - let mut sharded_state_updates = create_empty_sharded_state_updates(); - sharded_state_updates.par_iter_mut().enumerate().for_each( - |(shard_id, state_updates_shard)| { - per_version_state_updates[..=idx].iter().for_each(|updates| { - state_updates_shard.extend(updates[shard_id].clone()); - }) - }, - ); - return Some(sharded_state_updates); + return Some(transaction_outputs[..=idx].iter().flat_map(|txn_out| { + txn_out.write_set().iter().map(|(k, op)| (k.clone(), op.as_state_value())) + }).collect()) } } diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index a098cd4966aec1..b24190a6ea54d4 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -343,7 +343,7 @@ impl AptosDB { // TODO(grao): Make state_store take sharded state updates. self.state_store.put_value_sets( - chunk.per_version_state_updates, + chunk.transaction_outputs.iter().map(TransactionOutput::write_set), chunk.first_version, chunk.latest_in_memory_state.current.usage(), Some(chunk.sharded_state_cache), diff --git a/storage/aptosdb/src/db/test_helper.rs b/storage/aptosdb/src/db/test_helper.rs index 9128226c2c4e3e..6b2b1b7f16ed2c 100644 --- a/storage/aptosdb/src/db/test_helper.rs +++ b/storage/aptosdb/src/db/test_helper.rs @@ -33,9 +33,8 @@ use aptos_types::{ proptest_types::{AccountInfoUniverse, BlockGen}, state_store::{state_key::StateKey, state_value::StateValue}, transaction::{Transaction, TransactionInfo, TransactionToCommit, Version}, + write_set::TransactionWrite, }; -#[cfg(test)] -use arr_macro::arr; use proptest::{collection::vec, prelude::*, sample::Index}; use std::{collections::HashMap, fmt::Debug}; @@ -69,11 +68,11 @@ pub(crate) fn update_store( enable_sharding: bool, ) -> HashValue { use aptos_storage_interface::{jmt_update_refs, jmt_updates}; + use aptos_types::write_set::WriteSet; + let mut root_hash = *aptos_crypto::hash::SPARSE_MERKLE_PLACEHOLDER_HASH; for (i, (key, value)) in input.enumerate() { let value_state_set = vec![(&key, value.as_ref())].into_iter().collect(); - let mut sharded_value_state_set = arr![HashMap::new(); 16]; - sharded_value_state_set[key.get_shard_id() as usize].insert(key.clone(), value.clone()); let jmt_updates = jmt_updates(&value_state_set); let version = first_version + i as Version; root_hash = store @@ -88,7 +87,7 @@ pub(crate) fn update_store( let schema_batch = SchemaBatch::new(); store .put_value_sets( - &[sharded_value_state_set], + &[WriteSet::new_for_test([(key.clone(), value.clone())])], version, StateStorageUsage::new_untracked(), None, @@ -114,14 +113,11 @@ pub(crate) fn update_store( pub fn update_in_memory_state(state: &mut StateDelta, txns_to_commit: &[TransactionToCommit]) { let mut next_version = state.current_version.map_or(0, |v| v + 1); for txn_to_commit in txns_to_commit { - txn_to_commit - .state_updates() - .iter() - .flatten() - .for_each(|(key, value)| { - state.updates_since_base[key.get_shard_id() as usize] - .insert(key.clone(), value.clone()); - }); + txn_to_commit.write_set.iter().for_each(|(key, op)| { + state + .updates_since_base + .insert(key.clone(), op.as_state_value()); + }); next_version += 1; if txn_to_commit.has_state_checkpoint_hash() { state.current = state @@ -130,7 +126,6 @@ pub fn update_in_memory_state(state: &mut StateDelta, txns_to_commit: &[Transact state .updates_since_base .iter() - .flatten() .map(|(k, v)| (k.hash(), v.as_ref())) .collect(), &ProofReader::new_empty(), @@ -139,9 +134,7 @@ pub fn update_in_memory_state(state: &mut StateDelta, txns_to_commit: &[Transact state.current_version = next_version.checked_sub(1); state.base = state.current.clone(); state.base_version = state.current_version; - state.updates_since_base.iter_mut().for_each(|shard| { - shard.clear(); - }); + state.updates_since_base.clear(); } } @@ -152,7 +145,6 @@ pub fn update_in_memory_state(state: &mut StateDelta, txns_to_commit: &[Transact state .updates_since_base .iter() - .flatten() .map(|(k, v)| (k.hash(), v.as_ref())) .collect(), &ProofReader::new_empty(), @@ -326,9 +318,8 @@ fn gen_snapshot_version( updates.extend( txns_to_commit[0..=idx] .iter() - .flat_map(|x| x.state_updates().clone()) - .flatten() - .collect::>(), + .flat_map(|x| x.write_set().iter()) + .map(|(k, op)| (k.clone(), op.as_state_value())), ); if updates.len() >= threshold { snapshot_version = Some(cur_ver + idx as u64); @@ -337,17 +328,15 @@ fn gen_snapshot_version( updates.extend( txns_to_commit[idx + 1..] .iter() - .flat_map(|x| x.state_updates().clone()) - .flatten() - .collect::>(), + .flat_map(|t| t.write_set().iter()) + .map(|(k, op)| (k.clone(), op.as_state_value())), ); } else { updates.extend( txns_to_commit .iter() - .flat_map(|x| x.state_updates().clone()) - .flatten() - .collect::>(), + .flat_map(|x| x.write_set().iter()) + .map(|(k, op)| (k.clone(), op.as_state_value())), ); } snapshot_version @@ -457,7 +446,7 @@ fn verify_snapshots( txns_to_commit: Vec<&TransactionToCommit>, ) { let mut cur_version = start_version; - let mut updates: HashMap<&StateKey, Option<&StateValue>> = HashMap::new(); + let mut updates: HashMap> = HashMap::new(); for snapshot_version in snapshot_versions { let start = (cur_version - start_version) as usize; let end = (snapshot_version - start_version) as usize; @@ -472,19 +461,14 @@ fn verify_snapshots( updates.extend( txns_to_commit[start..=end] .iter() - .flat_map(|x| { - x.state_updates() - .iter() - .flatten() - .map(|(k, v_opt)| (k, v_opt.as_ref())) - }) - .collect::>>(), + .flat_map(|x| x.write_set().iter()) + .map(|(k, op)| (k.clone(), op.as_state_value())), ); for (state_key, state_value) in &updates { let (state_value_in_db, proof) = db .get_state_value_with_proof_by_version(state_key, snapshot_version) .unwrap(); - assert_eq!(state_value_in_db.as_ref(), *state_value); + assert_eq!(state_value_in_db.as_ref(), state_value.as_ref()); proof .verify( expected_root_hash, @@ -818,10 +802,11 @@ pub fn verify_committed_transactions( ); // Fetch and verify account states. - for (state_key, state_value) in txn_to_commit.state_updates().iter().flatten() { - updates.insert(state_key, state_value); + for (state_key, write_op) in txn_to_commit.write_set().iter() { + let state_value = write_op.as_state_value(); let state_value_in_db = db.get_state_value_by_version(state_key, cur_ver).unwrap(); - assert_eq!(state_value_in_db, *state_value); + assert_eq!(state_value_in_db, state_value); + updates.insert(state_key, state_value); } if !txn_to_commit.has_state_checkpoint_hash() { @@ -952,7 +937,7 @@ pub fn put_as_state_root(db: &AptosDB, version: Version, key: StateKey, value: S .clone(); in_memory_state.current = smt; in_memory_state.current_version = Some(version); - in_memory_state.updates_since_base[key.get_shard_id() as usize].insert(key, Some(value)); + in_memory_state.updates_since_base.insert(key, Some(value)); db.state_store .buffered_state() .lock() diff --git a/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs b/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs index cc15fc12a7f13c..9a15d2fe6d82de 100644 --- a/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs +++ b/storage/aptosdb/src/pruner/state_merkle_pruner/test.rs @@ -29,8 +29,8 @@ use aptos_types::{ state_value::{StaleStateValueByKeyHashIndex, StaleStateValueIndex, StateValue}, }, transaction::Version, + write_set::WriteSet, }; -use arr_macro::arr; use proptest::{prelude::*, proptest}; use std::{collections::HashMap, sync::Arc}; @@ -39,13 +39,11 @@ fn put_value_set( value_set: Vec<(StateKey, StateValue)>, version: Version, ) -> HashValue { - let mut sharded_value_set = arr![HashMap::new(); 16]; + let write_set = + WriteSet::new_for_test(value_set.iter().map(|(k, v)| (k.clone(), Some(v.clone())))); let value_set: HashMap<_, _> = value_set .iter() - .map(|(key, value)| { - sharded_value_set[key.get_shard_id() as usize].insert(key.clone(), Some(value.clone())); - (key, Some(value)) - }) + .map(|(key, value)| (key, Some(value))) .collect(); let jmt_updates = jmt_updates(&value_set); @@ -63,7 +61,7 @@ fn put_value_set( let enable_sharding = state_store.state_kv_db.enabled_sharding(); state_store .put_value_sets( - &[sharded_value_set], + &[write_set], version, StateStorageUsage::new_untracked(), None, diff --git a/storage/aptosdb/src/state_store/buffered_state.rs b/storage/aptosdb/src/state_store/buffered_state.rs index 2050af1e1de3a1..2b9236527f4c74 100644 --- a/storage/aptosdb/src/state_store/buffered_state.rs +++ b/storage/aptosdb/src/state_store/buffered_state.rs @@ -11,10 +11,11 @@ use aptos_logger::info; use aptos_scratchpad::SmtAncestors; use aptos_storage_interface::{db_ensure as ensure, state_delta::StateDelta, AptosDbError, Result}; use aptos_types::{ - state_store::{combine_sharded_state_updates, state_value::StateValue, ShardedStateUpdates}, + state_store::{state_key::StateKey, state_value::StateValue}, transaction::Version, }; use std::{ + collections::HashMap, sync::{ mpsc, mpsc::{Sender, SyncSender}, @@ -111,12 +112,7 @@ impl BufferedState { let take_out_to_commit = { let state_until_checkpoint = self.state_until_checkpoint.as_ref().expect("Must exist"); - state_until_checkpoint - .updates_since_base - .iter() - .map(|shard| shard.len()) - .sum::() - >= self.target_items + state_until_checkpoint.updates_since_base.len() >= self.target_items || state_until_checkpoint.current_version.map_or(0, |v| v + 1) - state_until_checkpoint.base_version.map_or(0, |v| v + 1) >= TARGET_SNAPSHOT_INTERVAL_IN_VERSION @@ -154,7 +150,9 @@ impl BufferedState { /// This method updates the buffered state with new data. pub fn update( &mut self, - updates_until_next_checkpoint_since_current_option: Option<&ShardedStateUpdates>, + updates_until_next_checkpoint_since_current_option: Option< + &HashMap>, + >, new_state_after_checkpoint: &StateDelta, sync_commit: bool, ) -> Result<()> { @@ -172,9 +170,10 @@ impl BufferedState { new_state_after_checkpoint.base_version > self.state_after_checkpoint.base_version, "Diff between base and latest checkpoints provided, while they are the same.", ); - combine_sharded_state_updates( - &mut self.state_after_checkpoint.updates_since_base, - updates_until_next_checkpoint_since_current, + self.state_after_checkpoint.updates_since_base.extend( + updates_until_next_checkpoint_since_current + .iter() + .map(|(k, v)| (k.clone(), v.clone())), ); self.state_after_checkpoint.current = new_state_after_checkpoint.base.clone(); self.state_after_checkpoint.current_version = new_state_after_checkpoint.base_version; diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index ae59a5159e5321..d7e7fe7695500f 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -48,6 +48,7 @@ use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::Mutex; use aptos_jellyfish_merkle::iterator::JellyfishMerkleIterator; use aptos_logger::info; +use aptos_metrics_core::TimerHelper; use aptos_schemadb::SchemaBatch; use aptos_scratchpad::{SmtAncestors, SparseMerkleTree}; use aptos_storage_interface::{ @@ -60,19 +61,20 @@ use aptos_storage_interface::{ use aptos_types::{ proof::{definition::LeafCount, SparseMerkleProofExt, SparseMerkleRangeProof}, state_store::{ - create_empty_sharded_state_updates, state_key::{prefix::StateKeyPrefix, StateKey}, state_storage_usage::StateStorageUsage, state_value::{ StaleStateValueByKeyHashIndex, StaleStateValueIndex, StateValue, StateValueChunkWithProof, }, - ShardedStateUpdates, StateViewId, + StateViewId, }, transaction::Version, write_set::{TransactionWrite, WriteSet}, }; +use arr_macro::arr; use claims::{assert_ge, assert_le}; +use itertools::Itertools; use rayon::prelude::*; use std::{collections::HashSet, ops::Deref, sync::Arc}; @@ -92,6 +94,12 @@ const MAX_WRITE_SETS_AFTER_SNAPSHOT: LeafCount = buffered_state::TARGET_SNAPSHOT pub const MAX_COMMIT_PROGRESS_DIFFERENCE: u64 = 1_000_000; +type ShardedKvUpdates = [Vec<((StateKey, Version), Option)>; NUM_STATE_SHARDS]; + +fn empty_kv_updates() -> ShardedKvUpdates { + arr![vec![]; 16] +} + pub(crate) struct StateDb { pub ledger_db: Arc, pub state_merkle_db: Arc, @@ -648,20 +656,8 @@ impl StateStore { sharded_state_kv_batches: &ShardedStateKvSchemaBatch, enable_sharding: bool, ) -> Result<()> { - let value_state_sets: Vec = write_sets - .iter() - .map(|ws| { - let mut sharded_state_updates = create_empty_sharded_state_updates(); - ws.iter().for_each(|(key, value)| { - sharded_state_updates[key.get_shard_id() as usize] - .insert(key.clone(), value.as_state_value()); - }); - sharded_state_updates - }) - .collect::>(); - self.put_value_sets( - &value_state_sets, + &write_sets, first_version, StateStorageUsage::new_untracked(), None, // state cache @@ -673,9 +669,9 @@ impl StateStore { } /// Put the `value_state_sets` into its own CF. - pub fn put_value_sets( + pub fn put_value_sets<'a>( &self, - value_state_sets: &[ShardedStateUpdates], + write_sets: impl IntoIterator, first_version: Version, expected_usage: StateStorageUsage, sharded_state_cache: Option<&ShardedStateCache>, @@ -684,13 +680,15 @@ impl StateStore { enable_sharding: bool, last_checkpoint_index: Option, ) -> Result<()> { - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["put_value_sets"]) - .start_timer(); + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_value_sets"]); + + let (kv_updates_per_shard, num_versions) = + Self::get_sharded_kv_updates(first_version, write_sets); self.put_stats_and_indices( - value_state_sets, + &kv_updates_per_shard, first_version, + num_versions, expected_usage, sharded_state_cache, ledger_batch, @@ -699,49 +697,56 @@ impl StateStore { enable_sharding, )?; - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["add_state_kv_batch"]) - .start_timer(); - self.put_state_values( - value_state_sets, - first_version, + &kv_updates_per_shard, sharded_state_kv_batches, enable_sharding, ) } + fn get_sharded_kv_updates<'a>( + first_version: Version, + write_sets: impl IntoIterator, + ) -> (ShardedKvUpdates, usize) { + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["get_sharded_kv_updates"]); + + let mut updates_by_shard = empty_kv_updates(); + let num_versions = write_sets + .into_iter() + .enumerate() + .map(|(idx, write_set)| { + let version = first_version + idx as Version; + write_set.iter().for_each(|(state_key, write_op)| { + updates_by_shard[state_key.get_shard_id() as usize] + .push(((state_key.clone(), version), write_op.as_state_value())); + }); + }) + .count(); + + (updates_by_shard, num_versions) + } + pub fn put_state_values( &self, - value_state_sets: &[ShardedStateUpdates], - first_version: Version, + kv_updates_per_shard: &ShardedKvUpdates, sharded_state_kv_batches: &ShardedStateKvSchemaBatch, enable_sharding: bool, ) -> Result<()> { + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["add_state_kv_batch"]); + sharded_state_kv_batches .par_iter() - .enumerate() - .try_for_each(|(shard_id, batch)| { - value_state_sets - .par_iter() - .enumerate() - .flat_map_iter(|(i, shards)| { - let version = first_version + i as Version; - let kvs = &shards[shard_id]; - kvs.iter().map(move |(k, v)| { - if enable_sharding { - batch.put::( - &(k.clone().hash(), version), - v, - ) - } else { - batch.put::(&(k.clone(), version), v) - } - }) - }) - .collect::>() - })?; - Ok(()) + .zip_eq(kv_updates_per_shard.par_iter()) + .try_for_each(|(batch, updates)| { + updates.iter().try_for_each(|(key_and_ver, val)| { + if enable_sharding { + let (key, ver) = key_and_ver; + batch.put::(&(CryptoHash::hash(key), *ver), val) + } else { + batch.put::(key_and_ver, val) + } + }) + }) } pub fn get_usage(&self, version: Option) -> Result { @@ -762,8 +767,9 @@ impl StateStore { /// extra stale index as 1 cover the latter case. pub fn put_stats_and_indices( &self, - value_state_sets: &[ShardedStateUpdates], + per_shard_kv_updates: &ShardedKvUpdates, first_version: Version, + num_versions: usize, expected_usage: StateStorageUsage, // If not None, it must contains all keys in the value_state_sets. // TODO(grao): Restructure this function. @@ -773,11 +779,7 @@ impl StateStore { last_checkpoint_index: Option, enable_sharding: bool, ) -> Result<()> { - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["put_stats_and_indices"]) - .start_timer(); - - let num_versions = value_state_sets.len(); + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stats_and_indices"]); let base_version = first_version.checked_sub(1); let mut usage = self.get_usage(base_version)?; @@ -785,27 +787,29 @@ impl StateStore { let mut state_cache_with_version = &ShardedStateCache::default(); if let Some(base_version) = base_version { - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["put_stats_and_indices__total_get"]) - .start_timer(); + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stats_and_indices__total_get"]); if let Some(sharded_state_cache) = sharded_state_cache { // For some entries the base value version is None, here is to fiil those in. // See `ShardedStateCache`. self.prepare_version_in_cache(base_version, sharded_state_cache)?; state_cache_with_version = sharded_state_cache; } else { - let key_set = value_state_sets - .iter() - .flat_map(|sharded_states| sharded_states.iter().flatten()) - .map(|(key, _)| key) - .collect::>(); + // TODO(aldenhu): get all updates from StateDelta directly + let key_set = { + let _timer = OTHER_TIMERS_SECONDS + .timer_with(&["put_stats_and_indices__get_all_updates"]); + per_shard_kv_updates + .iter() + .flatten() + .map(|((key, _ver), _val)| key) + .collect::>() + }; THREAD_MANAGER.get_high_pri_io_pool().scope(|s| { for key in key_set { let cache = state_cache_with_version.shard(key.get_shard_id()); s.spawn(move |_| { let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["put_stats_and_indices__get_state_value"]) - .start_timer(); + .timer_with(&["put_stats_and_indices__get_state_value"]); let version_and_value = self .state_db .get_state_value_with_version_by_version(key, base_version) @@ -821,26 +825,80 @@ impl StateStore { } } - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&["put_stats_and_indices__calculate_total_size"]) - .start_timer(); + let usage_deltas = Self::put_stale_state_value_index( + first_version, + num_versions, + per_shard_kv_updates, + sharded_state_kv_batches, + enable_sharding, + &mut state_cache_with_version, + ); + + for i in 0..num_versions { + let mut items_delta = 0; + let mut bytes_delta = 0; + for usage_delta in usage_deltas.iter() { + items_delta += usage_delta[i].0; + bytes_delta += usage_delta[i].1; + } + usage = StateStorageUsage::new( + (usage.items() as i64 + items_delta) as usize, + (usage.bytes() as i64 + bytes_delta) as usize, + ); + if (i == num_versions - 1) || Some(i) == last_checkpoint_index { + let version = first_version + i as u64; + info!("Write usage at version {version}, {usage:?}."); + batch.put::(&version, &usage.into())? + } + } + + if !expected_usage.is_untracked() { + ensure!( + expected_usage == usage, + "Calculated state db usage at version {} not expected. expected: {:?}, calculated: {:?}, base version: {:?}, base version usage: {:?}", + first_version + num_versions as u64 - 1, + expected_usage, + usage, + base_version, + base_version_usage, + ); + } + + STATE_ITEMS.set(usage.items() as i64); + TOTAL_STATE_BYTES.set(usage.bytes() as i64); + + Ok(()) + } + + fn put_stale_state_value_index( + first_version: Version, + num_versions: usize, + per_shard_kv_updates: &ShardedKvUpdates, + sharded_state_kv_batches: &ShardedStateKvSchemaBatch, + enable_sharding: bool, + state_cache_with_version: &mut &ShardedStateCache, + ) -> Vec> { + let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stale_kv_index"]); // calculate total state size in bytes let usage_deltas: Vec> = state_cache_with_version .par_iter() + .zip_eq(per_shard_kv_updates.par_iter()) .enumerate() - .map(|(shard_id, cache)| { - let _timer = OTHER_TIMERS_SECONDS - .with_label_values(&[&format!( - "put_stats_and_indices__calculate_total_size__shard_{shard_id}" - )]) - .start_timer(); - let mut usage_delta = Vec::with_capacity(num_versions); - for (idx, kvs) in value_state_sets.iter().enumerate() { - let version = first_version + idx as Version; + .map(|(shard_id, (cache, kv_updates))| { + let _timer = + OTHER_TIMERS_SECONDS.timer_with(&[&format!("put_stale_kv_index__{shard_id}")]); + + let mut usage_delta = vec![]; + let mut iter = kv_updates.iter(); + + // TODO(aldenhu): no need to iter by version after we calcualte the usage elsewhere + for version in first_version..first_version + num_versions as Version { + let ver_iter = iter.take_while_ref(|((_, ver), _)| *ver == version); + let mut items_delta = 0; let mut bytes_delta = 0; - for (key, value) in kvs[shard_id].iter() { + for ((key, _ver), value) in ver_iter { if let Some(value) = value { items_delta += 1; bytes_delta += (key.size() + value.size()) as i64; @@ -918,41 +976,7 @@ impl StateStore { usage_delta }) .collect(); - - for i in 0..num_versions { - let mut items_delta = 0; - let mut bytes_delta = 0; - for usage_delta in usage_deltas.iter() { - items_delta += usage_delta[i].0; - bytes_delta += usage_delta[i].1; - } - usage = StateStorageUsage::new( - (usage.items() as i64 + items_delta) as usize, - (usage.bytes() as i64 + bytes_delta) as usize, - ); - if (i == num_versions - 1) || Some(i) == last_checkpoint_index { - let version = first_version + i as u64; - info!("Write usage at version {version}, {usage:?}."); - batch.put::(&version, &usage.into())? - } - } - - if !expected_usage.is_untracked() { - ensure!( - expected_usage == usage, - "Calculated state db usage at version {} not expected. expected: {:?}, calculated: {:?}, base version: {:?}, base version usage: {:?}", - first_version + value_state_sets.len() as u64 - 1, - expected_usage, - usage, - base_version, - base_version_usage, - ); - } - - STATE_ITEMS.set(usage.items() as i64); - TOTAL_STATE_BYTES.set(usage.bytes() as i64); - - Ok(()) + usage_deltas } pub(crate) fn shard_state_value_batch( diff --git a/storage/aptosdb/src/state_store/state_snapshot_committer.rs b/storage/aptosdb/src/state_store/state_snapshot_committer.rs index fa608ff32c2ea5..c334fea4963a71 100644 --- a/storage/aptosdb/src/state_store/state_snapshot_committer.rs +++ b/storage/aptosdb/src/state_store/state_snapshot_committer.rs @@ -4,6 +4,7 @@ //! This file defines the state snapshot committer running in background thread within StateStore. use crate::{ + common::NUM_STATE_SHARDS, metrics::OTHER_TIMERS_SECONDS, state_store::{ buffered_state::CommitMessage, @@ -12,14 +13,19 @@ use crate::{ }, versioned_node_cache::VersionedNodeCache, }; +use aptos_crypto::hash::CryptoHash; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_logger::trace; +use aptos_metrics_core::TimerHelper; use aptos_scratchpad::SmtAncestors; -use aptos_storage_interface::{jmt_update_refs, jmt_updates, state_delta::StateDelta, Result}; +use aptos_storage_interface::{jmt_update_refs, state_delta::StateDelta, Result}; use aptos_types::state_store::state_value::StateValue; +use arr_macro::arr; +use itertools::Itertools; use rayon::prelude::*; use static_assertions::const_assert; use std::{ + collections::HashMap, sync::{ mpsc, mpsc::{Receiver, SyncSender}, @@ -95,8 +101,35 @@ impl StateSnapshotCommitter { .get_shard_persisted_versions(base_version) .unwrap(); + let jmt_updates_by_shard = { + let _timer = + OTHER_TIMERS_SECONDS.timer_with(&["get_sharded_jmt_updates"]); + let mut ret = arr![HashMap::new(); 16]; + delta_to_commit + .updates_since_base + .iter() + .for_each(|(key, value)| { + ret[key.get_shard_id() as usize].insert( + key, + // value.as_ref().map(|v| (v.hash(), key.clone())), + value, + ); + }); + ret.map(|hash_map| { + hash_map + .into_iter() + .map(|(key, val_opt)| { + ( + key.hash(), + val_opt.as_ref().map(|val| (val.hash(), key.clone())), + ) + }) + .collect_vec() + }) + }; + THREAD_MANAGER.get_non_exe_cpu_pool().install(|| { - (0..16) + (0..NUM_STATE_SHARDS as u8) .into_par_iter() .map(|shard_id| { let node_hashes = delta_to_commit @@ -104,12 +137,7 @@ impl StateSnapshotCommitter { .new_node_hashes_since(&delta_to_commit.base, shard_id); self.state_db.state_merkle_db.merklize_value_set_for_shard( shard_id, - jmt_update_refs(&jmt_updates( - &delta_to_commit.updates_since_base[shard_id as usize] - .iter() - .map(|(k, v)| (k, v.as_ref())) - .collect(), - )), + jmt_update_refs(&jmt_updates_by_shard[shard_id as usize]), Some(&node_hashes), version, base_version, diff --git a/storage/aptosdb/src/state_store/state_store_test.rs b/storage/aptosdb/src/state_store/state_store_test.rs index 5abad363445461..058f058a405ecc 100644 --- a/storage/aptosdb/src/state_store/state_store_test.rs +++ b/storage/aptosdb/src/state_store/state_store_test.rs @@ -25,7 +25,6 @@ use aptos_types::{ state_store::state_key::inner::StateKeyTag, AptosCoinType, }; -use arr_macro::arr; use proptest::{collection::hash_map, prelude::*}; use std::collections::HashMap; @@ -35,13 +34,11 @@ fn put_value_set( version: Version, base_version: Option, ) -> HashValue { - let mut sharded_value_set = arr![HashMap::new(); 16]; + let write_set = + WriteSet::new_for_test(value_set.iter().map(|(k, v)| (k.clone(), Some(v.clone())))); let value_set: HashMap<_, _> = value_set .iter() - .map(|(key, value)| { - sharded_value_set[key.get_shard_id() as usize].insert(key.clone(), Some(value.clone())); - (key, Some(value)) - }) + .map(|(key, value)| (key, Some(value))) .collect(); let jmt_updates = jmt_updates(&value_set); @@ -53,7 +50,7 @@ fn put_value_set( let state_kv_metadata_batch = SchemaBatch::new(); state_store .put_value_sets( - &[sharded_value_set], + &[write_set], version, StateStorageUsage::new_untracked(), None, diff --git a/storage/scratchpad/src/sparse_merkle/mod.rs b/storage/scratchpad/src/sparse_merkle/mod.rs index 6b9f35e1454390..afff0bd188caf5 100644 --- a/storage/scratchpad/src/sparse_merkle/mod.rs +++ b/storage/scratchpad/src/sparse_merkle/mod.rs @@ -492,9 +492,9 @@ where /// all at once. /// Since the tree is immutable, existing tree remains the same and may share parts with the /// new, returned tree. - pub fn batch_update( + pub fn batch_update<'a>( &self, - updates: Vec<(HashValue, Option<&V>)>, + updates: impl IntoIterator)>, usage: StateStorageUsage, proof_reader: &impl ProofRead, ) -> Result { diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs index 105fee2074d755..93b48e0ebe195a 100644 --- a/storage/storage-interface/src/chunk_to_commit.rs +++ b/storage/storage-interface/src/chunk_to_commit.rs @@ -3,9 +3,10 @@ use crate::{cached_state_view::ShardedStateCache, state_delta::StateDelta}; use aptos_types::{ - state_store::ShardedStateUpdates, + state_store::{state_key::StateKey, state_value::StateValue}, transaction::{Transaction, TransactionInfo, TransactionOutput, Version}, }; +use std::collections::HashMap; #[derive(Clone)] pub struct ChunkToCommit<'a> { @@ -15,8 +16,7 @@ pub struct ChunkToCommit<'a> { pub transaction_infos: &'a [TransactionInfo], pub base_state_version: Option, pub latest_in_memory_state: &'a StateDelta, - pub per_version_state_updates: &'a [ShardedStateUpdates], - pub state_updates_until_last_checkpoint: Option<&'a ShardedStateUpdates>, + pub state_updates_until_last_checkpoint: Option<&'a HashMap>>, pub sharded_state_cache: &'a ShardedStateCache, pub is_reconfig: bool, } diff --git a/storage/storage-interface/src/state_delta.rs b/storage/storage-interface/src/state_delta.rs index ea93d8afa910d5..8d2ee191a3da4a 100644 --- a/storage/storage-interface/src/state_delta.rs +++ b/storage/storage-interface/src/state_delta.rs @@ -6,11 +6,11 @@ use aptos_drop_helper::DropHelper; use aptos_scratchpad::SparseMerkleTree; use aptos_types::{ state_store::{ - combine_sharded_state_updates, create_empty_sharded_state_updates, - state_storage_usage::StateStorageUsage, state_value::StateValue, ShardedStateUpdates, + state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, }, transaction::Version, }; +use std::{collections::HashMap, ops::DerefMut}; /// This represents two state sparse merkle trees at their versions in memory with the updates /// reflecting the difference of `current` on top of `base`. @@ -25,7 +25,7 @@ pub struct StateDelta { pub base_version: Option, pub current: SparseMerkleTree, pub current_version: Option, - pub updates_since_base: DropHelper, + pub updates_since_base: DropHelper>>, } impl StateDelta { @@ -34,7 +34,7 @@ impl StateDelta { base_version: Option, current: SparseMerkleTree, current_version: Option, - updates_since_base: ShardedStateUpdates, + updates_since_base: HashMap>, ) -> Self { assert!(base.is_family(¤t)); assert!(base_version.map_or(0, |v| v + 1) <= current_version.map_or(0, |v| v + 1)); @@ -49,13 +49,7 @@ impl StateDelta { pub fn new_empty() -> Self { let smt = SparseMerkleTree::new_empty(); - Self::new( - smt.clone(), - None, - smt, - None, - create_empty_sharded_state_updates(), - ) + Self::new(smt.clone(), None, smt, None, HashMap::new()) } pub fn new_at_checkpoint( @@ -69,13 +63,14 @@ impl StateDelta { checkpoint_version, smt, checkpoint_version, - create_empty_sharded_state_updates(), + HashMap::new(), ) } - pub fn merge(&mut self, other: StateDelta) { + pub fn merge(&mut self, mut other: StateDelta) { assert!(other.follow(self)); - combine_sharded_state_updates(&mut self.updates_since_base, &other.updates_since_base); + self.updates_since_base + .extend(other.updates_since_base.deref_mut().drain()); self.current = other.current; self.current_version = other.current_version; diff --git a/types/src/proptest_types.rs b/types/src/proptest_types.rs index fb26fdf7ce988f..4338137b5fface 100644 --- a/types/src/proptest_types.rs +++ b/types/src/proptest_types.rs @@ -19,7 +19,7 @@ use crate::{ ledger_info::{generate_ledger_info_with_sig, LedgerInfo, LedgerInfoWithSignatures}, on_chain_config::{Features, ValidatorSet}, proof::TransactionInfoListWithProof, - state_store::{state_key::StateKey, state_value::StateValue}, + state_store::state_key::StateKey, transaction::{ block_epilogue::BlockEndInfo, ChangeSet, ExecutionStatus, Module, RawTransaction, Script, SignatureCheckedTransaction, SignedTransaction, Transaction, TransactionArgument, @@ -41,8 +41,6 @@ use aptos_crypto::{ traits::*, HashValue, }; -use arr_macro::arr; -use bytes::Bytes; use move_core_types::language_storage::TypeTag; use proptest::{ collection::{vec, SizeRange}, @@ -53,7 +51,7 @@ use proptest::{ use proptest_derive::Arbitrary; use serde_json::Value; use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet}, iter::Iterator, sync::Arc, }; @@ -793,33 +791,22 @@ impl TransactionToCommitGen { .map(|(index, event_gen)| event_gen.materialize(index, universe)) .collect(); - let (state_updates, write_set): (HashMap<_, _>, BTreeMap<_, _>) = self + let write_set: BTreeMap<_, _> = self .account_state_gens .into_iter() .flat_map(|(index, account_gen)| { account_gen.materialize(index, universe).into_iter().map( move |(state_key, value)| { - ( - ( - state_key.clone(), - Some(StateValue::new_legacy(Bytes::copy_from_slice(&value))), - ), - (state_key, WriteOp::legacy_modification(value.into())), - ) + (state_key, WriteOp::legacy_modification(value.into())) }, ) }) - .unzip(); - let mut sharded_state_updates = arr![HashMap::new(); 16]; - state_updates.into_iter().for_each(|(k, v)| { - sharded_state_updates[k.get_shard_id() as usize].insert(k, v); - }); + .collect(); TransactionToCommit::new( Transaction::UserTransaction(transaction), TransactionInfo::new_placeholder(self.gas_used, None, self.status), - sharded_state_updates, - WriteSetMut::new(write_set).freeze().expect("Cannot fail"), + WriteSet::new(write_set).unwrap(), events, false, /* event_gen never generates reconfig events */ TransactionAuxiliaryData::default(), @@ -1177,7 +1164,6 @@ impl BlockGen { Some(HashValue::random()), ExecutionStatus::Success, ), - arr![HashMap::new(); 16], WriteSet::default(), Vec::new(), false, diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index f0601df0074009..85fa9d1015653e 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -13,7 +13,6 @@ use crate::{ ledger_info::LedgerInfo, on_chain_config::{FeatureFlag, Features}, proof::{TransactionInfoListWithProof, TransactionInfoWithProof}, - state_store::ShardedStateUpdates, transaction::authenticator::{ AccountAuthenticator, AnyPublicKey, AnySignature, SingleKeyAuthenticator, TransactionAuthenticator, @@ -54,8 +53,6 @@ pub mod user_transaction_context; pub mod webauthn; pub use self::block_epilogue::{BlockEndInfo, BlockEpiloguePayload}; -#[cfg(any(test, feature = "fuzzing"))] -use crate::state_store::create_empty_sharded_state_updates; use crate::{ block_metadata_ext::BlockMetadataExt, contract_event::TransactionEvent, executable::ModulePath, fee_statement::FeeStatement, keyless::FederatedKeylessPublicKey, @@ -1515,7 +1512,6 @@ impl Display for TransactionInfo { pub struct TransactionToCommit { pub transaction: Transaction, pub transaction_info: TransactionInfo, - pub state_updates: ShardedStateUpdates, pub write_set: WriteSet, pub events: Vec, pub is_reconfig: bool, @@ -1526,7 +1522,6 @@ impl TransactionToCommit { pub fn new( transaction: Transaction, transaction_info: TransactionInfo, - state_updates: ShardedStateUpdates, write_set: WriteSet, events: Vec, is_reconfig: bool, @@ -1535,7 +1530,6 @@ impl TransactionToCommit { TransactionToCommit { transaction, transaction_info, - state_updates, write_set, events, is_reconfig, @@ -1548,7 +1542,6 @@ impl TransactionToCommit { Self { transaction: Transaction::StateCheckpoint(HashValue::zero()), transaction_info: TransactionInfo::dummy(), - state_updates: create_empty_sharded_state_updates(), write_set: Default::default(), events: vec![], is_reconfig: false, @@ -1608,10 +1601,6 @@ impl TransactionToCommit { self.transaction_info = txn_info } - pub fn state_updates(&self) -> &ShardedStateUpdates { - &self.state_updates - } - pub fn write_set(&self) -> &WriteSet { &self.write_set } diff --git a/types/src/write_set.rs b/types/src/write_set.rs index 159b33e7ba63a7..162461d8733335 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -461,6 +461,22 @@ impl WriteSet { Self::V0(write_set) => write_set.0, } } + + pub fn new(write_ops: impl IntoIterator) -> Result { + WriteSetMut::new(write_ops).freeze() + } + + pub fn new_for_test(kvs: impl IntoIterator)>) -> Self { + Self::new(kvs.into_iter().map(|(k, v_opt)| { + ( + k, + v_opt.map_or_else(WriteOp::legacy_deletion, |v| { + WriteOp::legacy_modification(v.bytes().clone()) + }), + ) + })) + .expect("Must succeed") + } } impl Deref for WriteSet {