Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 4, 2024
1 parent 551ea6e commit ea0ecfe
Show file tree
Hide file tree
Showing 20 changed files with 320 additions and 380 deletions.
1 change: 1 addition & 0 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ impl GasMeasurement {
}
}

/// FIXME(aldenhu): update
static OTHER_LABELS: &[(&str, bool, &str)] = &[
("1.", true, "verified_state_view"),
("2.", true, "state_checkpoint"),
Expand Down
15 changes: 7 additions & 8 deletions execution/executor-types/src/state_checkpoint_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use crate::transactions_with_output::TransactionsWithOutput;
use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_delta::StateDelta;
use aptos_types::{state_store::ShardedStateUpdates, transaction::TransactionStatus};
use aptos_types::{
state_store::{state_key::StateKey, state_value::StateValue},
transaction::TransactionStatus,
};
use derive_more::Deref;
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

#[derive(Default)]
pub struct TransactionsByStatus {
Expand Down Expand Up @@ -69,15 +72,13 @@ impl StateCheckpointOutput {
pub fn new(
parent_state: Arc<StateDelta>,
result_state: Arc<StateDelta>,
state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
per_version_state_updates: Vec<ShardedStateUpdates>,
state_updates_before_last_checkpoint: Option<HashMap<StateKey, Option<StateValue>>>,
state_checkpoint_hashes: Vec<Option<HashValue>>,
) -> Self {
Self::new_impl(Inner {
parent_state,
result_state,
state_updates_before_last_checkpoint,
per_version_state_updates,
state_checkpoint_hashes,
})
}
Expand All @@ -87,7 +88,6 @@ impl StateCheckpointOutput {
parent_state: state.clone(),
result_state: state,
state_updates_before_last_checkpoint: None,
per_version_state_updates: vec![],
state_checkpoint_hashes: vec![],
})
}
Expand All @@ -111,8 +111,7 @@ impl StateCheckpointOutput {
pub struct Inner {
pub parent_state: Arc<StateDelta>,
pub result_state: Arc<StateDelta>,
pub state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
pub per_version_state_updates: Vec<ShardedStateUpdates>,
pub state_updates_before_last_checkpoint: Option<HashMap<StateKey, Option<StateValue>>>,
pub state_checkpoint_hashes: Vec<Option<HashValue>>,
}

Expand Down
1 change: 0 additions & 1 deletion execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl StateComputeResult {
transactions: self.execution_output.to_commit.txns(),
transaction_outputs: self.execution_output.to_commit.transaction_outputs(),
transaction_infos: &self.ledger_update_output.transaction_infos,
per_version_state_updates: &self.state_checkpoint_output.per_version_state_updates,
base_state_version: self.state_checkpoint_output.parent_state.base_version,
latest_in_memory_state: &self.state_checkpoint_output.result_state,
state_updates_until_last_checkpoint: self
Expand Down
162 changes: 52 additions & 110 deletions execution/executor/src/types/in_memory_state_calculator_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ use aptos_storage_interface::{
};
use aptos_types::{
state_store::{
create_empty_sharded_state_updates, state_key::StateKey,
state_storage_usage::StateStorageUsage, state_value::StateValue, ShardedStateUpdates,
state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue,
},
transaction::Version,
transaction::{TransactionOutput, Version},
write_set::{TransactionWrite, WriteSet},
};
use arr_macro::arr;
use dashmap::DashMap;
use itertools::zip_eq;
use itertools::Itertools;
use rayon::prelude::*;
use std::{collections::HashMap, ops::Deref, sync::Arc};

Expand All @@ -42,19 +40,18 @@ impl InMemoryStateCalculatorV2 {
Self::validate_input_for_block(parent_state, &execution_output.to_commit)?;
}

let state_updates_vec = Self::get_sharded_state_updates(
execution_output.to_commit.transaction_outputs(),
|txn_output| txn_output.write_set(),
);

// If there are multiple checkpoints in the chunk, we only calculate the SMT (and its root
// hash) for the last one.
let last_checkpoint_index = execution_output.to_commit.get_last_checkpoint_index();

Self::calculate_impl(
parent_state,
&execution_output.state_cache,
state_updates_vec,
execution_output
.to_commit
.transaction_outputs
.iter()
.map(TransactionOutput::write_set),
last_checkpoint_index,
execution_output.is_block,
known_state_checkpoints,
Expand All @@ -67,22 +64,20 @@ impl InMemoryStateCalculatorV2 {
last_checkpoint_index: Option<usize>,
write_sets: &[WriteSet],
) -> Result<StateCheckpointOutput> {
let state_updates_vec = Self::get_sharded_state_updates(write_sets, |write_set| write_set);

Self::calculate_impl(
parent_state,
state_cache,
state_updates_vec,
write_sets,
last_checkpoint_index,
false,
Option::<Vec<_>>::None,
)
}

fn calculate_impl(
fn calculate_impl<'a>(
parent_state: &Arc<StateDelta>,
state_cache: &StateCache,
state_updates_vec: Vec<ShardedStateUpdates>,
write_sets: impl IntoIterator<Item = &'a WriteSet>,
last_checkpoint_index: Option<usize>,
is_block: bool,
known_state_checkpoints: Option<impl IntoIterator<Item = Option<HashValue>>>,
Expand All @@ -96,25 +91,24 @@ impl InMemoryStateCalculatorV2 {
} = state_cache;
assert!(frozen_base.smt.is_the_same(&parent_state.current));

let write_sets = write_sets.into_iter().collect_vec();
let num_txns = write_sets.len();
let (updates_before_last_checkpoint, updates_after_last_checkpoint) =
if let Some(index) = last_checkpoint_index {
(
Self::calculate_updates(&state_updates_vec[..=index]),
Self::calculate_updates(&state_updates_vec[index + 1..]),
Self::calculate_updates(&write_sets[..=index]),
Self::calculate_updates(&write_sets[index + 1..]),
)
} else {
(
create_empty_sharded_state_updates(),
Self::calculate_updates(&state_updates_vec),
)
(HashMap::new(), Self::calculate_updates(&write_sets))
};
let all_updates = Self::calculate_updates(&write_sets);

let num_txns = state_updates_vec.len();

let usage = Self::calculate_usage(parent_state.current.usage(), sharded_state_cache, &[
&updates_before_last_checkpoint,
&updates_after_last_checkpoint,
]);
let usage = Self::calculate_usage(
parent_state.current.usage(),
sharded_state_cache,
&all_updates,
);

let first_version = parent_state.current_version.map_or(0, |v| v + 1);
let proof_reader = ProofReader::new(proofs);
Expand Down Expand Up @@ -181,11 +175,7 @@ impl InMemoryStateCalculatorV2 {
} else {
let mut updates_since_latest_checkpoint =
parent_state.updates_since_base.deref().deref().clone();
zip_eq(
updates_since_latest_checkpoint.iter_mut(),
updates_after_last_checkpoint,
)
.for_each(|(base, delta)| base.extend(delta));
updates_since_latest_checkpoint.extend(updates_after_last_checkpoint);
updates_since_latest_checkpoint
};

Expand All @@ -209,54 +199,25 @@ impl InMemoryStateCalculatorV2 {
parent_state.clone(),
Arc::new(result_state),
last_checkpoint_index.map(|_| updates_before_last_checkpoint),
state_updates_vec,
state_checkpoint_hashes,
))
}

fn get_sharded_state_updates<'a, T, F>(
outputs: &'a [T],
write_set_fn: F,
) -> Vec<ShardedStateUpdates>
where
T: Sync + 'a,
F: Fn(&'a T) -> &'a WriteSet + Sync,
{
let _timer = OTHER_TIMERS.timer_with(&["get_sharded_state_updates"]);
fn calculate_updates<'a>(
write_sets: &'a [&'a WriteSet],
) -> HashMap<StateKey, Option<StateValue>> {
let _timer = OTHER_TIMERS.timer_with(&["calculate_updates"]);

outputs
.par_iter()
.map(|output| {
let mut updates = arr![HashMap::new(); 16];
write_set_fn(output)
write_sets
.iter()
.flat_map(|write_set| {
write_set
.iter()
.for_each(|(state_key, write_op)| {
updates[state_key.get_shard_id() as usize]
.insert(state_key.clone(), write_op.as_state_value());
});
updates
.map(|(key, op)| (key.clone(), op.as_state_value()))
})
.collect()
}

fn calculate_updates(state_updates_vec: &[ShardedStateUpdates]) -> ShardedStateUpdates {
let _timer = OTHER_TIMERS.timer_with(&["calculate_updates"]);
let mut updates: ShardedStateUpdates = create_empty_sharded_state_updates();
updates
.par_iter_mut()
.enumerate()
.for_each(|(i, per_shard_update)| {
per_shard_update.extend(
state_updates_vec
.iter()
.flat_map(|hms| &hms[i])
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>(),
)
});
updates
}

fn add_to_delta(
k: &StateKey,
v: &Option<StateValue>,
Expand All @@ -280,46 +241,29 @@ impl InMemoryStateCalculatorV2 {
fn calculate_usage(
old_usage: StateStorageUsage,
sharded_state_cache: &ShardedStateCache,
updates: &[&ShardedStateUpdates; 2],
updates: &HashMap<StateKey, Option<StateValue>>,
) -> StateStorageUsage {
let _timer = OTHER_TIMERS
.with_label_values(&["calculate_usage"])
.start_timer();
if old_usage.is_untracked() {
return StateStorageUsage::new_untracked();
}
let (items_delta, bytes_delta) = updates[0]

let (items_delta, bytes_delta) = updates
.par_iter()
.zip_eq(updates[1].par_iter())
.enumerate()
.map(
|(i, (shard_updates_before_checkpoint, shard_updates_after_checkpoint))| {
let mut items_delta = 0i64;
let mut bytes_delta = 0i64;
let num_updates_before_checkpoint = shard_updates_before_checkpoint.len();
for (index, (k, v)) in shard_updates_before_checkpoint
.iter()
.chain(shard_updates_after_checkpoint.iter())
.enumerate()
{
// Ignore updates before the checkpoint if there is an update for the same
// key after the checkpoint.
if index < num_updates_before_checkpoint
&& shard_updates_after_checkpoint.contains_key(k)
{
continue;
}
Self::add_to_delta(
k,
v,
sharded_state_cache.shard(i as u8),
&mut items_delta,
&mut bytes_delta,
);
}
(items_delta, bytes_delta)
},
)
.map(|(key, value)| {
let mut items_delta = 0i64;
let mut bytes_delta = 0i64;
Self::add_to_delta(
key,
value,
sharded_state_cache.shard(key.get_shard_id()),
&mut items_delta,
&mut bytes_delta,
);
(items_delta, bytes_delta)
})
.reduce(
|| (0i64, 0i64),
|(items_now, bytes_now), (items_delta, bytes_delta)| {
Expand All @@ -334,20 +278,18 @@ impl InMemoryStateCalculatorV2 {

fn make_checkpoint(
latest_checkpoint: FrozenSparseMerkleTree<StateValue>,
updates: &ShardedStateUpdates,
updates: &HashMap<StateKey, Option<StateValue>>,
usage: StateStorageUsage,
proof_reader: &ProofReader,
) -> Result<FrozenSparseMerkleTree<StateValue>> {
let _timer = OTHER_TIMERS.timer_with(&["make_checkpoint"]);

// Update SMT.
//
// TODO(grao): Consider use the sharded updates directly instead of flatten.
let smt_updates: Vec<_> = updates
// TODO(aldenhu): avoid collecting into vec
let smt_updates = updates
.iter()
.flatten()
.map(|(key, value)| (key.hash(), value.as_ref()))
.collect();
.map(|(key, value)| (key.hash(), value.as_ref()));
let new_checkpoint = latest_checkpoint.batch_update(smt_updates, usage, proof_reader)?;
Ok(new_checkpoint)
}
Expand All @@ -365,7 +307,7 @@ impl InMemoryStateCalculatorV2 {
base.current_version,
);
ensure!(
base.updates_since_base.iter().all(|shard| shard.is_empty()),
base.updates_since_base.is_empty(),
"Base state is corrupted, updates_since_base is not empty at a checkpoint."
);

Expand Down
7 changes: 3 additions & 4 deletions storage/aptosdb/src/db/fake_aptosdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ impl FakeBufferedState {
new_state_after_checkpoint.base_version > self.state_after_checkpoint.base_version,
"Diff between base and latest checkpoints provided, while they are the same.",
);
combine_sharded_state_updates(
&mut self.state_after_checkpoint.updates_since_base,
updates_until_next_checkpoint_since_current,
);
self.state_after_checkpoint
.updates_since_base
.extend(updates_until_next_checkpoint_since_current);
self.state_after_checkpoint.current = new_state_after_checkpoint.base.clone();
self.state_after_checkpoint.current_version = new_state_after_checkpoint.base_version;
let state_after_checkpoint = self
Expand Down
Loading

0 comments on commit ea0ecfe

Please sign in to comment.