From d9c79707cc069a47f6d13e9dc0d067c51a2aa8fb Mon Sep 17 00:00:00 2001 From: Rati Gelashvili Date: Sat, 5 Oct 2024 02:03:22 +0400 Subject: [PATCH] Resource Groups: don't recompute size, test serialized size, parallel finalization (#14489) * Resource Groups: don't recompute size, test serialized size * Moving finalizing groups to (parallel) materialization stage --- Cargo.lock | 2 - .../src/resource_group_adapter.rs | 101 +- aptos-move/aptos-vm/src/block_executor/mod.rs | 5 + .../src/move_vm_ext/write_op_converter.rs | 98 +- .../block-executor/src/captured_reads.rs | 3 - aptos-move/block-executor/src/executor.rs | 215 ++- .../block-executor/src/executor_utilities.rs | 108 +- .../src/proptest_types/baseline.rs | 7 +- .../src/proptest_types/types.rs | 131 +- aptos-move/block-executor/src/task.rs | 5 +- .../src/txn_last_input_output.rs | 55 +- .../block-executor/src/unit_tests/mod.rs | 57 +- aptos-move/block-executor/src/view.rs | 71 +- aptos-move/mvhashmap/Cargo.toml | 2 - aptos-move/mvhashmap/src/lib.rs | 8 +- aptos-move/mvhashmap/src/types.rs | 28 +- aptos-move/mvhashmap/src/unit_tests/mod.rs | 8 +- .../src/unit_tests/proptest_types.rs | 43 +- aptos-move/mvhashmap/src/unsync_map.rs | 162 +- aptos-move/mvhashmap/src/versioned_data.rs | 78 +- .../mvhashmap/src/versioned_delayed_fields.rs | 2 +- .../mvhashmap/src/versioned_group_data.rs | 1470 ++++++++--------- aptos-move/mvhashmap/src/versioned_modules.rs | 2 +- 23 files changed, 1463 insertions(+), 1198 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d362783ff0d38..ff101bc3974dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2905,8 +2905,6 @@ dependencies = [ "claims", "crossbeam", "dashmap", - "derivative", - "move-binary-format", "move-core-types", "move-vm-types", "proptest", diff --git a/aptos-move/aptos-vm-types/src/resource_group_adapter.rs b/aptos-move/aptos-vm-types/src/resource_group_adapter.rs index 02030e40c5972..87293388dca28 100644 --- a/aptos-move/aptos-vm-types/src/resource_group_adapter.rs +++ 
b/aptos-move/aptos-vm-types/src/resource_group_adapter.rs @@ -3,7 +3,8 @@ use crate::resolver::{ResourceGroupSize, ResourceGroupView, TResourceGroupView, TResourceView}; use aptos_types::{ - serde_helper::bcs_utils::bcs_size_of_byte_array, state_store::state_key::StateKey, + error::code_invariant_error, serde_helper::bcs_utils::bcs_size_of_byte_array, + state_store::state_key::StateKey, }; use bytes::Bytes; use move_binary_format::errors::{PartialVMError, PartialVMResult}; @@ -273,6 +274,104 @@ impl TResourceGroupView for ResourceGroupAdapter<'_> { } } +// We set SPECULATIVE_EXECUTION_ABORT_ERROR here, as the error can happen due to +// speculative reads (and in a non-speculative context, e.g. during commit, it +// is a more serious error and block execution must abort). +// BlockExecutor is responsible with handling this error. +fn group_size_arithmetics_error() -> PartialVMError { + PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR) + .with_message("Group size arithmetics error while applying updates".to_string()) +} + +// Updates a given ResourceGroupSize (an abstract representation allowing the computation +// of bcs serialized size) size, to reflect the state after removing a resource in a group +// with size old_tagged_resource_size. +pub fn decrement_size_for_remove_tag( + size: &mut ResourceGroupSize, + old_tagged_resource_size: u64, +) -> PartialVMResult<()> { + match size { + ResourceGroupSize::Concrete(_) => Err(code_invariant_error(format!( + "Unexpected ResourceGroupSize::Concrete in decrement_size_for_add_tag \ + (removing resource w. 
size = {old_tagged_resource_size})" + )) + .into()), + ResourceGroupSize::Combined { + num_tagged_resources, + all_tagged_resources_size, + } => { + *num_tagged_resources = num_tagged_resources + .checked_sub(1) + .ok_or_else(group_size_arithmetics_error)?; + *all_tagged_resources_size = all_tagged_resources_size + .checked_sub(old_tagged_resource_size) + .ok_or_else(group_size_arithmetics_error)?; + Ok(()) + }, + } +} + +// Updates a given ResourceGroupSize (an abstract representation allowing the computation +// of bcs serialized size) size, to reflect the state after adding a resource in a group +// with size new_tagged_resource_size. +pub fn increment_size_for_add_tag( + size: &mut ResourceGroupSize, + new_tagged_resource_size: u64, +) -> PartialVMResult<()> { + match size { + ResourceGroupSize::Concrete(_) => Err(code_invariant_error(format!( + "Unexpected ResourceGroupSize::Concrete in increment_size_for_add_tag \ + (adding resource w. size = {new_tagged_resource_size})" + )) + .into()), + ResourceGroupSize::Combined { + num_tagged_resources, + all_tagged_resources_size, + } => { + *num_tagged_resources = num_tagged_resources + .checked_add(1) + .ok_or_else(group_size_arithmetics_error)?; + *all_tagged_resources_size = all_tagged_resources_size + .checked_add(new_tagged_resource_size) + .ok_or_else(group_size_arithmetics_error)?; + Ok(()) + }, + } +} + +// Checks an invariant that iff a resource group exists, it must have a > 0 size. 
+pub fn check_size_and_existence_match( + size: &ResourceGroupSize, + exists: bool, + state_key: &StateKey, +) -> PartialVMResult<()> { + if exists { + if size.get() == 0 { + Err( + PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR).with_message( + format!( + "Group tag count/size shouldn't be 0 for an existing group: {:?}", + state_key + ), + ), + ) + } else { + Ok(()) + } + } else if size.get() > 0 { + Err( + PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR).with_message( + format!( + "Group tag count/size should be 0 for a new group: {:?}", + state_key + ), + ), + ) + } else { + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 47a5622ecca58..2f2fd0abdca75 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -33,6 +33,7 @@ use aptos_types::{ use aptos_vm_logging::{flush_speculative_logs, init_speculative_logs}; use aptos_vm_types::{ abstract_write_op::AbstractResourceWriteOp, environment::Environment, output::VMOutput, + resolver::ResourceGroupSize, }; use move_core_types::{ language_storage::StructTag, @@ -118,6 +119,7 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { ) -> Vec<( StateKey, WriteOp, + ResourceGroupSize, BTreeMap>)>, )> { self.vm_output @@ -131,6 +133,9 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput { Some(( key.clone(), group_write.metadata_op().clone(), + group_write + .maybe_group_op_size() + .unwrap_or(ResourceGroupSize::zero_combined()), group_write .inner_ops() .iter() diff --git a/aptos-move/aptos-vm/src/move_vm_ext/write_op_converter.rs b/aptos-move/aptos-vm/src/move_vm_ext/write_op_converter.rs index 36cbdc4a23486..a60e719c90e91 100644 --- a/aptos-move/aptos-vm/src/move_vm_ext/write_op_converter.rs +++ b/aptos-move/aptos-vm/src/move_vm_ext/write_op_converter.rs @@ -9,8 +9,11 @@ use 
aptos_types::{ write_set::WriteOp, }; use aptos_vm_types::{ - abstract_write_op::GroupWrite, resolver::ResourceGroupSize, - resource_group_adapter::group_tagged_resource_size, + abstract_write_op::GroupWrite, + resource_group_adapter::{ + check_size_and_existence_match, decrement_size_for_remove_tag, group_tagged_resource_size, + increment_size_for_add_tag, + }, }; use bytes::Bytes; use move_binary_format::errors::{PartialVMError, PartialVMResult}; @@ -18,7 +21,6 @@ use move_core_types::{ effects::Op as MoveStorageOp, language_storage::StructTag, value::MoveTypeLayout, vm_status::StatusCode, }; -use move_vm_types::delayed_values::error::code_invariant_error; use std::{collections::BTreeMap, sync::Arc}; pub(crate) struct WriteOpConverter<'r> { @@ -47,96 +49,6 @@ macro_rules! convert_impl { }; } -// We set SPECULATIVE_EXECUTION_ABORT_ERROR here, as the error can happen due to -// speculative reads (and in a non-speculative context, e.g. during commit, it -// is a more serious error and block execution must abort). -// BlockExecutor is responsible with handling this error. 
-fn group_size_arithmetics_error() -> PartialVMError { - PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR) - .with_message("Group size arithmetics error while applying updates".to_string()) -} - -fn decrement_size_for_remove_tag( - size: &mut ResourceGroupSize, - old_tagged_resource_size: u64, -) -> PartialVMResult<()> { - match size { - ResourceGroupSize::Concrete(_) => Err(code_invariant_error( - "Unexpected ResourceGroupSize::Concrete in decrement_size_for_remove_tag", - )), - ResourceGroupSize::Combined { - num_tagged_resources, - all_tagged_resources_size, - } => { - *num_tagged_resources = num_tagged_resources - .checked_sub(1) - .ok_or_else(group_size_arithmetics_error)?; - *all_tagged_resources_size = all_tagged_resources_size - .checked_sub(old_tagged_resource_size) - .ok_or_else(group_size_arithmetics_error)?; - Ok(()) - }, - } -} - -fn increment_size_for_add_tag( - size: &mut ResourceGroupSize, - new_tagged_resource_size: u64, -) -> PartialVMResult<()> { - match size { - ResourceGroupSize::Concrete(_) => Err(PartialVMError::new( - StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, - ) - .with_message( - "Unexpected ResourceGroupSize::Concrete in increment_size_for_add_tag".to_string(), - )), - ResourceGroupSize::Combined { - num_tagged_resources, - all_tagged_resources_size, - } => { - *num_tagged_resources = num_tagged_resources - .checked_add(1) - .ok_or_else(group_size_arithmetics_error)?; - *all_tagged_resources_size = all_tagged_resources_size - .checked_add(new_tagged_resource_size) - .ok_or_else(group_size_arithmetics_error)?; - Ok(()) - }, - } -} - -fn check_size_and_existence_match( - size: &ResourceGroupSize, - exists: bool, - state_key: &StateKey, -) -> PartialVMResult<()> { - if exists { - if size.get() == 0 { - Err( - PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR).with_message( - format!( - "Group tag count/size shouldn't be 0 for an existing group: {:?}", - state_key - ), - ), - ) - } else { - Ok(()) - } - } 
else if size.get() > 0 { - Err( - PartialVMError::new(StatusCode::SPECULATIVE_EXECUTION_ABORT_ERROR).with_message( - format!( - "Group tag count/size should be 0 for a new group: {:?}", - state_key - ), - ), - ) - } else { - Ok(()) - } -} - impl<'r> WriteOpConverter<'r> { convert_impl!(convert_module, get_module_state_value_metadata); diff --git a/aptos-move/block-executor/src/captured_reads.rs b/aptos-move/block-executor/src/captured_reads.rs index 58176d7e4fb61..7e2f83a74ada5 100644 --- a/aptos-move/block-executor/src/captured_reads.rs +++ b/aptos-move/block-executor/src/captured_reads.rs @@ -622,9 +622,6 @@ impl CapturedReads { Err(Uninitialized) => { unreachable!("May not be uninitialized if captured for validation"); }, - Err(TagSerializationError(_)) => { - unreachable!("Should not require tag serialization"); - }, } }) }) diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 6b67ac0b2e95b..90be88920d6a1 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -15,7 +15,11 @@ use crate::{ scheduler::{DependencyStatus, ExecutionTaskType, Scheduler, SchedulerTask, Wave}, task::{ExecutionStatus, ExecutorTask, TransactionOutput}, txn_commit_hook::TransactionCommitHook, - txn_last_input_output::{KeyKind, TxnLastInputOutput}, + txn_last_input_output::{ + KeyKind, + KeyKind::{Group, Module, Resource}, + TxnLastInputOutput, + }, types::ReadWriteSummary, view::{LatestView, ParallelState, SequentialState, ViewState}, }; @@ -43,7 +47,7 @@ use aptos_types::{ write_set::{TransactionWrite, WriteOp}, }; use aptos_vm_logging::{alert, clear_speculative_txn_logs, init_speculative_logs, prelude::*}; -use aptos_vm_types::change_set::randomly_check_layout_matches; +use aptos_vm_types::{change_set::randomly_check_layout_matches, resolver::ResourceGroupSize}; use bytes::Bytes; use claims::assert_none; use core::panic; @@ -116,7 +120,7 @@ where let execute_result = 
executor.execute_transaction(&sync_view, txn, idx_to_execute); let mut prev_modified_keys = last_input_output - .modified_keys(idx_to_execute) + .modified_keys::(idx_to_execute) .map_or(HashMap::new(), |keys| keys.collect()); let mut prev_modified_delayed_fields = last_input_output @@ -124,23 +128,52 @@ where .map_or(HashSet::new(), |keys| keys.collect()); let mut read_set = sync_view.take_parallel_reads(); + if read_set.is_incorrect_use() { + return Err(PanicOr::from(code_invariant_error(format!( + "Incorrect use detected in CapturedReads after executing txn = {} incarnation = {}", + idx_to_execute, incarnation + )))); + } // For tracking whether it's required to (re-)validate the suffix of transactions in the block. // May happen, for instance, when the recent execution wrote outside of the previous write/delta // set (vanilla Block-STM rule), or if resource group size or metadata changed from an estimate // (since those resource group validations rely on estimates). let mut needs_suffix_validation = false; + let mut group_keys_and_tags: Vec<(T::Key, HashSet)> = vec![]; let mut apply_updates = |output: &E::Output| -> Result< Vec<(T::Key, Arc, Option>)>, // Cached resource writes PanicError, > { - for (group_key, group_metadata_op, group_ops) in - output.resource_group_write_set().into_iter() - { - if prev_modified_keys.remove(&group_key).is_none() { - // Previously no write to the group at all. 
- needs_suffix_validation = true; - } + let group_output = output.resource_group_write_set(); + group_keys_and_tags = group_output + .iter() + .map(|(key, _, _, ops)| { + let tags = ops.iter().map(|(tag, _)| tag.clone()).collect(); + (key.clone(), tags) + }) + .collect(); + for (group_key, group_metadata_op, group_size, group_ops) in group_output.into_iter() { + let prev_tags = match prev_modified_keys.remove(&group_key) { + Some(Group(tags)) => tags, + Some(Resource) => { + return Err(code_invariant_error(format!( + "Group key {:?} recorded as resource KeyKind", + group_key, + ))); + }, + Some(Module) => { + return Err(code_invariant_error(format!( + "Group key {:?} recorded as module KeyKind", + group_key, + ))); + }, + None => { + // Previously no write to the group at all. + needs_suffix_validation = true; + HashSet::new() + }, + }; if versioned_cache.data().write_metadata( group_key.clone(), @@ -150,12 +183,15 @@ where ) { needs_suffix_validation = true; } + if versioned_cache.group_data().write( group_key, idx_to_execute, incarnation, group_ops.into_iter(), - ) { + group_size, + prev_tags, + )? { needs_suffix_validation = true; } } @@ -281,7 +317,7 @@ where match kind { Resource => versioned_cache.data().remove(&k, idx_to_execute), Module => versioned_cache.modules().remove(&k, idx_to_execute), - Group => { + Group(tags) => { // A change in state observable during speculative execution // (which includes group metadata and size) changes, suffix // re-validation is needed. 
For resources where speculative @@ -296,7 +332,9 @@ where needs_suffix_validation = true; versioned_cache.data().remove(&k, idx_to_execute); - versioned_cache.group_data().remove(&k, idx_to_execute); + versioned_cache + .group_data() + .remove(&k, idx_to_execute, tags); }, }; } @@ -305,7 +343,13 @@ where versioned_cache.delayed_fields().remove(&id, idx_to_execute); } - if !last_input_output.record(idx_to_execute, read_set, result, resource_write_set) { + if !last_input_output.record( + idx_to_execute, + read_set, + result, + resource_write_set, + group_keys_and_tags, + ) { // Module R/W is an expected fallback behavior, no alert is required. debug!("[Execution] At txn {}, Module read & write", idx_to_execute); @@ -320,17 +364,16 @@ where idx_to_validate: TxnIndex, last_input_output: &TxnLastInputOutput, versioned_cache: &MVHashMap, - ) -> Result { + ) -> bool { let _timer = TASK_VALIDATE_SECONDS.start_timer(); let read_set = last_input_output .read_set(idx_to_validate) .expect("[BlockSTM]: Prior read-set must be recorded"); - if read_set.is_incorrect_use() { - return Err(code_invariant_error( - "Incorrect use detected in CapturedReads", - )); - } + assert!( + !read_set.is_incorrect_use(), + "Incorrect use must be handled after execution" + ); // Note: we validate delayed field reads only at try_commit. // TODO[agg_v2](optimize): potentially add some basic validation. @@ -340,10 +383,8 @@ where // until commit, but mark as estimates). // TODO: validate modules when there is no r/w fallback. 
- Ok( - read_set.validate_data_reads(versioned_cache.data(), idx_to_validate) - && read_set.validate_group_reads(versioned_cache.group_data(), idx_to_validate), - ) + read_set.validate_data_reads(versioned_cache.data(), idx_to_validate) + && read_set.validate_group_reads(versioned_cache.group_data(), idx_to_validate) } fn update_transaction_on_abort( @@ -357,16 +398,18 @@ where clear_speculative_txn_logs(txn_idx as usize); // Not valid and successfully aborted, mark the latest write/delta sets as estimates. - if let Some(keys) = last_input_output.modified_keys(txn_idx) { + if let Some(keys) = last_input_output.modified_keys::(txn_idx) { for (k, kind) in keys { use KeyKind::*; match kind { Resource => versioned_cache.data().mark_estimate(&k, txn_idx), Module => versioned_cache.modules().mark_estimate(&k, txn_idx), - Group => { + Group(tags) => { // Validation for both group size and metadata is based on values. // Execution may wait for estimates. - versioned_cache.group_data().mark_estimate(&k, txn_idx); + versioned_cache + .group_data() + .mark_estimate(&k, txn_idx, tags); // Group metadata lives in same versioned cache as data / resources. // We are not marking metadata change as estimate, but after @@ -494,8 +537,7 @@ where scheduler.finish_execution_during_commit(txn_idx)?; - let validation_result = - Self::validate(txn_idx, last_input_output, versioned_cache)?; + let validation_result = Self::validate(txn_idx, last_input_output, versioned_cache); if !validation_result || !Self::validate_commit_ready(txn_idx, versioned_cache, last_input_output) .unwrap_or(false) @@ -546,29 +588,6 @@ where } } - let finalized_groups = groups_to_finalize!(last_input_output, txn_idx) - .map(|((group_key, metadata_op), is_read_needing_exchange)| { - // finalize_group copies Arc of values and the Tags (TODO: optimize as needed). - // TODO[agg_v2]: have a test that fails if we don't do the if. 
- let finalized_result = if is_read_needing_exchange { - versioned_cache - .group_data() - .get_last_committed_group(&group_key) - } else { - versioned_cache - .group_data() - .finalize_group(&group_key, txn_idx) - }; - map_finalized_group::( - group_key, - finalized_result, - metadata_op, - is_read_needing_exchange, - ) - }) - .collect::, _>>()?; - - last_input_output.record_finalized_group(txn_idx, finalized_groups); defer! { scheduler.add_to_commit_queue(txn_idx); } @@ -678,7 +697,21 @@ where shared_counter, ); let latest_view = LatestView::new(base_view, ViewState::Sync(parallel_state), txn_idx); - let finalized_groups = last_input_output.take_finalized_group(txn_idx); + + let finalized_groups = groups_to_finalize!(last_input_output, txn_idx) + .map(|((group_key, metadata_op), is_read_needing_exchange)| { + let finalize_group = versioned_cache + .group_data() + .finalize_group(&group_key, txn_idx); + + map_finalized_group::( + group_key, + finalize_group, + metadata_op, + is_read_needing_exchange, + ) + }) + .collect::, _>>()?; let materialized_finalized_groups = map_id_to_values_in_group_writes(finalized_groups, &latest_view)?; @@ -817,7 +850,7 @@ where scheduler_task = match scheduler_task { SchedulerTask::ValidationTask(txn_idx, incarnation, wave) => { - let valid = Self::validate(txn_idx, last_input_output, versioned_cache)?; + let valid = Self::validate(txn_idx, last_input_output, versioned_cache); Self::update_on_validation( txn_idx, incarnation, @@ -990,10 +1023,10 @@ where unsync_map.write(key, write_op, layout); } - for (group_key, metadata_op, group_ops) in output.resource_group_write_set().into_iter() { - for (value_tag, (group_op, maybe_layout)) in group_ops.into_iter() { - unsync_map.insert_group_op(&group_key, value_tag, group_op, maybe_layout)?; - } + for (group_key, metadata_op, group_size, group_ops) in + output.resource_group_write_set().into_iter() + { + unsync_map.insert_group_ops(&group_key, group_ops, group_size)?; 
unsync_map.write(group_key, Arc::new(metadata_op), None); } @@ -1180,22 +1213,26 @@ where // previously failed in bcs serialization for preparing final transaction outputs. // TODO: remove this fallback when txn errors can be created from block executor. - let finalize = |group_key| -> BTreeMap<_, _> { - unsync_map - .finalize_group(&group_key) - .map(|(resource_tag, value_with_layout)| { - let value = match value_with_layout { - ValueWithLayout::RawFromStorage(value) - | ValueWithLayout::Exchanged(value, _) => value, - }; - ( - resource_tag, - value - .extract_raw_bytes() - .expect("Deletions should already be applied"), - ) - }) - .collect() + let finalize = |group_key| -> (BTreeMap<_, _>, ResourceGroupSize) { + let (group, size) = unsync_map.finalize_group(&group_key); + + ( + group + .map(|(resource_tag, value_with_layout)| { + let value = match value_with_layout { + ValueWithLayout::RawFromStorage(value) + | ValueWithLayout::Exchanged(value, _) => value, + }; + ( + resource_tag, + value + .extract_raw_bytes() + .expect("Deletions should already be applied"), + ) + }) + .collect(), + size, + ) }; // The IDs are not exchanged but it doesn't change the types (Bytes) or size. 
@@ -1207,16 +1244,25 @@ where true }); - let finalized_group = finalize(group_key.clone()); - bcs::to_bytes(&finalized_group).is_err() + let (finalized_group, group_size) = finalize(group_key.clone()); + match bcs::to_bytes(&finalized_group) { + Ok(group) => { + (!finalized_group.is_empty() || group_size.get() != 0) + && group.len() as u64 != group_size.get() + }, + Err(_) => true, + } }) || output.resource_group_write_set().into_iter().any( - |(group_key, _, group_ops)| { + |(group_key, _, output_group_size, group_ops)| { fail_point!("fail-point-resource-group-serialization", |_| { true }); - let mut finalized_group = finalize(group_key); + let (mut finalized_group, group_size) = finalize(group_key); + if output_group_size.get() != group_size.get() { + return false; + } for (value_tag, (group_op, _)) in group_ops { if group_op.is_deletion() { finalized_group.remove(&value_tag); @@ -1229,7 +1275,13 @@ where ); } } - bcs::to_bytes(&finalized_group).is_err() + match bcs::to_bytes(&finalized_group) { + Ok(group) => { + (!finalized_group.is_empty() || group_size.get() != 0) + && group.len() as u64 != group_size.get() + }, + Err(_) => true, + } }, ); @@ -1256,8 +1308,9 @@ where { let finalized_groups = groups_to_finalize!(output,) .map(|((group_key, metadata_op), is_read_needing_exchange)| { - let finalized_group = - Ok(unsync_map.finalize_group(&group_key).collect()); + let (group_ops_iter, group_size) = + unsync_map.finalize_group(&group_key); + let finalized_group = Ok((group_ops_iter.collect(), group_size)); map_finalized_group::( group_key, finalized_group, diff --git a/aptos-move/block-executor/src/executor_utilities.rs b/aptos-move/block-executor/src/executor_utilities.rs index e4fee9852054e..5588ce8da62a3 100644 --- a/aptos-move/block-executor/src/executor_utilities.rs +++ b/aptos-move/block-executor/src/executor_utilities.rs @@ -13,6 +13,7 @@ use aptos_types::{ write_set::TransactionWrite, }; use aptos_vm_logging::{alert, prelude::*}; +use 
aptos_vm_types::resolver::ResourceGroupSize; use bytes::Bytes; use fail::fail_point; use move_core_types::value::MoveTypeLayout; @@ -86,14 +87,22 @@ pub(crate) use resource_writes_to_materialize; pub(crate) fn map_finalized_group( group_key: T::Key, - finalized_group: anyhow::Result)>>, + finalized_group: anyhow::Result<(Vec<(T::Tag, ValueWithLayout)>, ResourceGroupSize)>, metadata_op: T::Value, is_read_needing_exchange: bool, -) -> Result<(T::Key, T::Value, Vec<(T::Tag, ValueWithLayout)>), PanicError> { +) -> Result< + ( + T::Key, + T::Value, + Vec<(T::Tag, ValueWithLayout)>, + ResourceGroupSize, + ), + PanicError, +> { let metadata_is_deletion = metadata_op.is_deletion(); match finalized_group { - Ok(finalized_group) => { + Ok((finalized_group, group_size)) => { if is_read_needing_exchange && metadata_is_deletion { // Value needed exchange but was not written / modified during the txn // execution: may not be empty. @@ -108,7 +117,7 @@ pub(crate) fn map_finalized_group( metadata_is_deletion ))) } else { - Ok((group_key, metadata_op, finalized_group)) + Ok((group_key, metadata_op, finalized_group, group_size)) } }, Err(e) => Err(code_invariant_error(format!( @@ -119,7 +128,12 @@ pub(crate) fn map_finalized_group( } pub(crate) fn serialize_groups( - finalized_groups: Vec<(T::Key, T::Value, Vec<(T::Tag, Arc)>)>, + finalized_groups: Vec<( + T::Key, + T::Value, + Vec<(T::Tag, Arc)>, + ResourceGroupSize, + )>, ) -> Result, ResourceGroupSerializationError> { fail_point!( "fail-point-resource-group-serialization", @@ -129,27 +143,45 @@ pub(crate) fn serialize_groups( finalized_groups .into_iter() - .map(|(group_key, mut metadata_op, finalized_group)| { - let btree: BTreeMap = finalized_group - .into_iter() - .map(|(resource_tag, arc_v)| { - let bytes = arc_v - .extract_raw_bytes() - .expect("Deletions should already be applied"); - (resource_tag, bytes) - }) - .collect(); + .map( + |(group_key, mut metadata_op, finalized_group, group_size)| { + let btree: BTreeMap = 
finalized_group + .into_iter() + .map(|(resource_tag, arc_v)| { + let bytes = arc_v + .extract_raw_bytes() + .expect("Deletions should already be applied"); + (resource_tag, bytes) + }) + .collect(); - bcs::to_bytes(&btree) - .map_err(|e| { - alert!("Unexpected resource group error {:?}", e); - ResourceGroupSerializationError - }) - .map(|group_bytes| { - metadata_op.set_bytes(group_bytes.into()); - (group_key, metadata_op) - }) - }) + match bcs::to_bytes(&btree) { + Ok(group_bytes) => { + if (!btree.is_empty() || group_size.get() != 0) + && group_bytes.len() as u64 != group_size.get() + { + alert!( + "Serialized resource group size mismatch key = {:?} num items {}, \ + len {} recorded size {}, op {:?}", + group_key, + btree.len(), + group_bytes.len(), + group_size.get(), + metadata_op, + ); + Err(ResourceGroupSerializationError) + } else { + metadata_op.set_bytes(group_bytes.into()); + Ok((group_key, metadata_op)) + } + }, + Err(e) => { + alert!("Unexpected resource group error {:?}", e); + Err(ResourceGroupSerializationError) + }, + } + }, + ) .collect() } @@ -169,11 +201,24 @@ pub(crate) fn map_id_to_values_in_group_writes< S: TStateView + Sync, X: Executable + 'static, >( - finalized_groups: Vec<(T::Key, T::Value, Vec<(T::Tag, ValueWithLayout)>)>, + finalized_groups: Vec<( + T::Key, + T::Value, + Vec<(T::Tag, ValueWithLayout)>, + ResourceGroupSize, + )>, latest_view: &LatestView, -) -> Result)>)>, PanicError> { +) -> Result< + Vec<( + T::Key, + T::Value, + Vec<(T::Tag, Arc)>, + ResourceGroupSize, + )>, + PanicError, +> { let mut patched_finalized_groups = Vec::with_capacity(finalized_groups.len()); - for (group_key, group_metadata_op, resource_vec) in finalized_groups.into_iter() { + for (group_key, group_metadata_op, resource_vec, group_size) in finalized_groups.into_iter() { let mut patched_resource_vec = Vec::with_capacity(resource_vec.len()); for (tag, value_with_layout) in resource_vec.into_iter() { let value = match value_with_layout { @@ -185,7 +230,12 
@@ pub(crate) fn map_id_to_values_in_group_writes< }; patched_resource_vec.push((tag, value)); } - patched_finalized_groups.push((group_key, group_metadata_op, patched_resource_vec)); + patched_finalized_groups.push(( + group_key, + group_metadata_op, + patched_resource_vec, + group_size, + )); } Ok(patched_finalized_groups) } diff --git a/aptos-move/block-executor/src/proptest_types/baseline.rs b/aptos-move/block-executor/src/proptest_types/baseline.rs index 21430370818e1..b2d72577fa3c0 100644 --- a/aptos-move/block-executor/src/proptest_types/baseline.rs +++ b/aptos-move/block-executor/src/proptest_types/baseline.rs @@ -310,11 +310,11 @@ impl BaselineOutput { .for_each(|(baseline_read, result_read)| baseline_read.assert_read_result(result_read)); // Update group world. - for (group_key, v, updates) in output.group_writes.iter() { + for (group_key, v, group_size, updates) in output.group_writes.iter() { group_metadata.insert(group_key.clone(), v.as_state_value_metadata()); + let group_map = group_world.entry(group_key).or_insert(base_map.clone()); for (tag, v) in updates { - let group_map = group_world.entry(group_key).or_insert(base_map.clone()); if v.is_deletion() { assert_some!(group_map.remove(tag)); } else { @@ -324,6 +324,9 @@ impl BaselineOutput { assert_eq!(existed, v.is_modification()); } } + let computed_size = + group_size_as_sum(group_map.iter().map(|(t, v)| (t, v.len()))).unwrap(); + assert_eq!(computed_size, *group_size); } // Test recorded finalized group writes: it should contain the whole group, and diff --git a/aptos-move/block-executor/src/proptest_types/types.rs b/aptos-move/block-executor/src/proptest_types/types.rs index 648c8d86d51c4..70cf194fb86ad 100644 --- a/aptos-move/block-executor/src/proptest_types/types.rs +++ b/aptos-move/block-executor/src/proptest_types/types.rs @@ -25,7 +25,12 @@ use aptos_types::{ transaction::BlockExecutableTransaction as Transaction, write_set::{TransactionWrite, WriteOp, WriteOpKind}, }; -use 
aptos_vm_types::resolver::{TExecutorView, TResourceGroupView}; +use aptos_vm_types::{ + resolver::{ResourceGroupSize, TExecutorView, TResourceGroupView}, + resource_group_adapter::{ + decrement_size_for_remove_tag, group_tagged_resource_size, increment_size_for_add_tag, + }, +}; use bytes::Bytes; use claims::{assert_ge, assert_le, assert_ok}; use move_core_types::{identifier::IdentStr, value::MoveTypeLayout}; @@ -928,6 +933,8 @@ where let mut group_writes = vec![]; for (key, metadata, inner_ops) in behavior.group_writes.iter() { let mut new_inner_ops = HashMap::new(); + let group_size = view.resource_group_size(key).unwrap(); + let mut new_group_size = view.resource_group_size(key).unwrap(); for (tag, inner_op) in inner_ops.iter() { let exists = view .get_resource_from_group(key, tag, None) @@ -940,46 +947,92 @@ where // inner op is either deletion or creation. assert!(!inner_op.is_modification()); - if exists == inner_op.is_deletion() { - // insert the provided inner op. - new_inner_ops.insert(*tag, inner_op.clone()); - } - if exists && inner_op.is_creation() { - // Adjust the type, otherwise executor will assert. 
- if inner_op.bytes().unwrap()[0] % 4 < 3 || *tag == RESERVED_TAG { - new_inner_ops.insert( - *tag, + let maybe_op = if exists { + Some( + if inner_op.is_creation() + && (inner_op.bytes().unwrap()[0] % 4 < 3 + || *tag == RESERVED_TAG) + { ValueType::new( inner_op.bytes.clone(), StateValueMetadata::none(), WriteOpKind::Modification, - ), - ); - } else { - new_inner_ops.insert( - *tag, + ) + } else { ValueType::new( None, StateValueMetadata::none(), WriteOpKind::Deletion, - ), - ); + ) + }, + ) + } else { + inner_op.is_creation().then(|| inner_op.clone()) + }; + + if let Some(new_inner_op) = maybe_op { + if exists { + let old_tagged_value_size = + view.resource_size_in_group(key, tag).unwrap(); + let old_size = + group_tagged_resource_size(tag, old_tagged_value_size).unwrap(); + // let _ = + // decrement_size_for_remove_tag(&mut new_group_size, old_size); + if decrement_size_for_remove_tag(&mut new_group_size, old_size) + .is_err() + { + // Check it only happens for speculative executions that may not + // commit by returning incorrect (empty) output. + return ExecutionStatus::Success(MockOutput::skip_output()); + } + } + if !new_inner_op.is_deletion() { + let new_size = group_tagged_resource_size( + tag, + inner_op.bytes.as_ref().unwrap().len(), + ) + .unwrap(); + if increment_size_for_add_tag(&mut new_group_size, new_size) + .is_err() + { + // Check it only happens for speculative executions that may not + // commit by returning incorrect (empty) output. + return ExecutionStatus::Success(MockOutput::skip_output()); + } } + + new_inner_ops.insert(*tag, new_inner_op); } } - if !inner_ops.is_empty() { - // Not testing metadata_op here, always modification. 
- group_writes.push(( - key.clone(), - ValueType::new( - Some(Bytes::new()), - metadata.clone(), - WriteOpKind::Modification, - ), - new_inner_ops, - )); + if !new_inner_ops.is_empty() { + if group_size.get() > 0 + && new_group_size == ResourceGroupSize::zero_combined() + { + // TODO: reserved tag currently prevents this code from being run. + // Group got deleted. + group_writes.push(( + key.clone(), + ValueType::new(None, metadata.clone(), WriteOpKind::Deletion), + new_group_size, + new_inner_ops, + )); + } else { + let op_kind = if group_size.get() == 0 { + WriteOpKind::Creation + } else { + WriteOpKind::Modification + }; + + // Not testing metadata_op here, always modification. + group_writes.push(( + key.clone(), + ValueType::new(Some(Bytes::new()), metadata.clone(), op_kind), + new_group_size, + new_inner_ops, + )); + } } } @@ -1024,7 +1077,7 @@ pub(crate) enum GroupSizeOrMetadata { pub(crate) struct MockOutput { pub(crate) writes: Vec<(K, ValueType)>, // Key, metadata_op, inner_ops - pub(crate) group_writes: Vec<(K, ValueType, HashMap)>, + pub(crate) group_writes: Vec<(K, ValueType, ResourceGroupSize, HashMap)>, pub(crate) deltas: Vec<(K, DeltaOp)>, pub(crate) events: Vec, pub(crate) read_results: Vec>>, @@ -1111,15 +1164,17 @@ where ) -> Vec<( K, ValueType, + ResourceGroupSize, BTreeMap>)>, )> { self.group_writes .iter() .cloned() - .map(|(group_key, metadata_v, inner_ops)| { + .map(|(group_key, metadata_v, group_size, inner_ops)| { ( group_key, metadata_v, + group_size, inner_ops.into_iter().map(|(k, v)| (k, (v, None))).collect(), ) }) @@ -1165,12 +1220,26 @@ where fn incorporate_materialized_txn_output( &self, aggregator_v1_writes: Vec<(::Key, WriteOp)>, - _patched_resource_write_set: Vec<( + patched_resource_write_set: Vec<( ::Key, ::Value, )>, _patched_events: Vec<::Event>, ) -> Result<(), PanicError> { + let resources: HashMap<::Key, ::Value> = + patched_resource_write_set.clone().into_iter().collect(); + for (key, _, size, _) in &self.group_writes { + 
let v = resources.get(key).unwrap(); + if v.is_deletion() { + assert_eq!(*size, ResourceGroupSize::zero_combined()); + } else { + assert_eq!( + size.get(), + resources.get(key).unwrap().bytes().map_or(0, |b| b.len()) as u64 + ); + } + } + assert_ok!(self.materialized_delta_writes.set(aggregator_v1_writes)); // TODO[agg_v2](tests): Set the patched resource write set and events. But that requires the function // to take &mut self as input diff --git a/aptos-move/block-executor/src/task.rs b/aptos-move/block-executor/src/task.rs index edf3242322501..6b87a482f2993 100644 --- a/aptos-move/block-executor/src/task.rs +++ b/aptos-move/block-executor/src/task.rs @@ -14,7 +14,7 @@ use aptos_types::{ transaction::BlockExecutableTransaction as Transaction, write_set::WriteOp, }; -use aptos_vm_types::resolver::{TExecutorView, TResourceGroupView}; +use aptos_vm_types::resolver::{ResourceGroupSize, TExecutorView, TResourceGroupView}; use move_core_types::{value::MoveTypeLayout, vm_status::StatusCode}; use std::{ collections::{BTreeMap, HashSet}, @@ -144,6 +144,7 @@ pub trait TransactionOutput: Send + Sync + Debug { ) -> Vec<( ::Key, ::Value, + ResourceGroupSize, BTreeMap< ::Tag, ( @@ -161,7 +162,7 @@ pub trait TransactionOutput: Send + Sync + Debug { )> { self.resource_group_write_set() .into_iter() - .map(|(key, op, _)| (key, op)) + .map(|(key, op, _, _)| (key, op)) .collect() } diff --git a/aptos-move/block-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs index 2de1490bf40c0..95ffa57680608 100644 --- a/aptos-move/block-executor/src/txn_last_input_output.rs +++ b/aptos-move/block-executor/src/txn_last_input_output.rs @@ -9,7 +9,7 @@ use crate::{ types::{InputOutputKey, ReadWriteSummary}, }; use aptos_logger::error; -use aptos_mvhashmap::types::{TxnIndex, ValueWithLayout}; +use aptos_mvhashmap::types::TxnIndex; use aptos_types::{ error::{code_invariant_error, PanicError}, fee_statement::FeeStatement, @@ -44,21 +44,15 @@ macro_rules! 
forward_on_success_or_skip_rest { }}; } -pub(crate) enum KeyKind { +pub(crate) enum KeyKind { Resource, Module, - Group, + // Contains the set of tags for the given group key. + Group(HashSet), } pub struct TxnLastInputOutput, E: Debug> { inputs: Vec>>>, // txn_idx -> input. - // Set once when the group outputs are committed sequentially, to be processed later by - // concurrent materialization / output preparation. - finalized_groups: Vec< - CachePadded< - ExplicitSyncWrapper)>)>>, - >, - >, // TODO: Consider breaking down the outputs when storing (avoid traversals, cache below). outputs: Vec>>>, // txn_idx -> output. @@ -68,6 +62,8 @@ pub struct TxnLastInputOutput, E: arced_resource_writes: Vec< CachePadded, Option>)>>>, >, + resource_group_keys_and_tags: + Vec)>>>>, // Record all writes and reads to access paths corresponding to modules (code) in any // (speculative) executions. Used to avoid a potential race with module publishing and @@ -90,9 +86,10 @@ impl, E: Debug + Send + Clone> arced_resource_writes: (0..num_txns) .map(|_| CachePadded::new(ExplicitSyncWrapper::>::new(vec![]))) .collect(), - finalized_groups: (0..num_txns) + resource_group_keys_and_tags: (0..num_txns) .map(|_| CachePadded::new(ExplicitSyncWrapper::>::new(vec![]))) .collect(), + module_writes: DashSet::new(), module_reads: DashSet::new(), } @@ -132,6 +129,7 @@ impl, E: Debug + Send + Clone> input: CapturedReads, output: ExecutionStatus, arced_resource_writes: Vec<(T::Key, Arc, Option>)>, + group_keys_and_tags: Vec<(T::Key, HashSet)>, ) -> bool { let written_modules = match &output { ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => { @@ -149,6 +147,7 @@ impl, E: Debug + Send + Clone> } *self.arced_resource_writes[txn_idx as usize].acquire() = arced_resource_writes; + *self.resource_group_keys_and_tags[txn_idx as usize].acquire() = group_keys_and_tags; self.inputs[txn_idx as usize].store(Some(Arc::new(input))); self.outputs[txn_idx as 
usize].store(Some(Arc::new(output))); @@ -275,10 +274,21 @@ impl, E: Debug + Send + Clone> // Extracts a set of paths (keys) written or updated during execution from transaction // output, .1 for each item is false for non-module paths and true for module paths. - pub(crate) fn modified_keys( + // If TAKE_GROUP_TAGS is set, the final HashSet of tags is moved for the group key - + // should be called once for each incarnation / record due to 'take'. if TAKE_GROUP_TAGS + // is false, stored modified group resource tags in the group are cloned out. + pub(crate) fn modified_keys( &self, txn_idx: TxnIndex, - ) -> Option> { + ) -> Option)>> { + let group_keys_and_tags: Vec<(T::Key, HashSet)> = if TAKE_GROUP_TAGS { + std::mem::take(&mut self.resource_group_keys_and_tags[txn_idx as usize].acquire()) + } else { + self.resource_group_keys_and_tags[txn_idx as usize] + .acquire() + .clone() + }; + self.outputs[txn_idx as usize] .load_full() .and_then(|txn_output| match txn_output.as_ref() { @@ -300,9 +310,9 @@ impl, E: Debug + Send + Clone> .map(|k| (k, KeyKind::Module)), ) .chain( - t.resource_group_metadata_ops() + group_keys_and_tags .into_iter() - .map(|(k, _)| (k, KeyKind::Group)), + .map(|(k, tags)| (k, KeyKind::Group(tags))), ), ), ExecutionStatus::Abort(_) @@ -373,21 +383,6 @@ impl, E: Debug + Send + Clone> ) } - pub(crate) fn record_finalized_group( - &self, - txn_idx: TxnIndex, - finalized_groups: Vec<(T::Key, T::Value, Vec<(T::Tag, ValueWithLayout)>)>, - ) { - *self.finalized_groups[txn_idx as usize].acquire() = finalized_groups; - } - - pub(crate) fn take_finalized_group( - &self, - txn_idx: TxnIndex, - ) -> Vec<(T::Key, T::Value, Vec<(T::Tag, ValueWithLayout)>)> { - std::mem::take(&mut self.finalized_groups[txn_idx as usize].acquire()) - } - pub(crate) fn take_resource_write_set( &self, txn_idx: TxnIndex, diff --git a/aptos-move/block-executor/src/unit_tests/mod.rs b/aptos-move/block-executor/src/unit_tests/mod.rs index 82472dd15aad5..743e2400240ff 100644 --- 
a/aptos-move/block-executor/src/unit_tests/mod.rs +++ b/aptos-move/block-executor/src/unit_tests/mod.rs @@ -28,8 +28,9 @@ use aptos_types::{ contract_event::TransactionEvent, executable::{ExecutableTestType, ModulePath}, state_store::state_value::StateValueMetadata, + write_set::WriteOpKind, }; -use claims::assert_matches; +use claims::{assert_matches, assert_ok}; use fail::FailScenario; use rand::{prelude::*, random}; use std::{ @@ -41,6 +42,60 @@ use std::{ sync::Arc, }; +#[test] +fn test_resource_group_deletion() { + let mut group_creation: MockIncarnation, MockEvent> = + MockIncarnation::new(vec![KeyType::(1, false)], vec![], vec![], vec![], 10); + group_creation.group_writes.push(( + KeyType::(100, false), + StateValueMetadata::none(), + HashMap::from([(101, ValueType::from_value(vec![5], true))]), + )); + let mut group_deletion: MockIncarnation, MockEvent> = + MockIncarnation::new(vec![KeyType::(1, false)], vec![], vec![], vec![], 10); + group_deletion.group_writes.push(( + KeyType::(100, false), + StateValueMetadata::none(), + HashMap::from([( + 101, + ValueType::new(None, StateValueMetadata::none(), WriteOpKind::Deletion), + )]), + )); + let t_0 = MockTransaction::from_behavior(group_creation); + let t_1 = MockTransaction::from_behavior(group_deletion); + + let transactions = Vec::from([t_0, t_1]); + + let data_view = NonEmptyGroupDataView::> { + group_keys: HashSet::new(), + }; + let executor_thread_pool = Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(num_cpus::get()) + .build() + .unwrap(), + ); + let block_executor = BlockExecutor::< + MockTransaction, MockEvent>, + MockTask, MockEvent>, + NonEmptyGroupDataView>, + NoOpTransactionCommitHook, MockEvent>, usize>, + ExecutableTestType, + >::new( + BlockExecutorConfig::new_no_block_limit(num_cpus::get()), + executor_thread_pool, + None, + ); + + assert_ok!(block_executor.execute_transactions_sequential( + (), + &transactions, + &data_view, + false + )); + 
assert_ok!(block_executor.execute_transactions_parallel(&(), &transactions, &data_view)); +} + #[test] fn resource_group_bcs_fallback() { let no_group_incarnation_1: MockIncarnation, MockEvent> = MockIncarnation::new( diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 54841a1dff1db..871c4166dcc32 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -140,7 +140,11 @@ trait ResourceState { } trait ResourceGroupState { - fn set_raw_group_base_values(&self, group_key: T::Key, base_values: Vec<(T::Tag, T::Value)>); + fn set_raw_group_base_values( + &self, + group_key: T::Key, + base_values: Vec<(T::Tag, T::Value)>, + ) -> PartialVMResult<()>; fn read_cached_group_tagged_data( &self, @@ -512,9 +516,6 @@ impl<'a, T: Transaction, X: Executable> ParallelState<'a, T, X> { .with_message("Interrupted as block execution was halted".to_string())); } }, - Err(TagSerializationError(e)) => { - return Err(e); - }, } } } @@ -663,10 +664,19 @@ impl<'a, T: Transaction, X: Executable> ResourceState for ParallelState<'a, T } impl<'a, T: Transaction, X: Executable> ResourceGroupState for ParallelState<'a, T, X> { - fn set_raw_group_base_values(&self, group_key: T::Key, base_values: Vec<(T::Tag, T::Value)>) { + fn set_raw_group_base_values( + &self, + group_key: T::Key, + base_values: Vec<(T::Tag, T::Value)>, + ) -> PartialVMResult<()> { self.versioned_map .group_data() - .set_raw_base_values(group_key.clone(), base_values); + .set_raw_base_values(group_key.clone(), base_values) + .map_err(|e| { + self.captured_reads.borrow_mut().mark_incorrect_use(); + PartialVMError::new(StatusCode::UNEXPECTED_DESERIALIZATION_ERROR) + .with_message(e.to_string()) + }) } fn read_cached_group_tagged_data( @@ -757,9 +767,6 @@ impl<'a, T: Transaction, X: Executable> ResourceGroupState for ParallelState< .with_message("Interrupted as block execution was halted".to_string())); } }, - Err(TagSerializationError(_)) => { - 
unreachable!("Reading a resource does not require tag serialization"); - }, } } } @@ -770,6 +777,7 @@ pub(crate) struct SequentialState<'a, T: Transaction, X: Executable> { pub(crate) read_set: RefCell>, pub(crate) start_counter: u32, pub(crate) counter: &'a RefCell, + // TODO: Move to UnsyncMap. pub(crate) incorrect_use: RefCell, } @@ -867,9 +875,18 @@ impl<'a, T: Transaction, X: Executable> ResourceState for SequentialState<'a, } impl<'a, T: Transaction, X: Executable> ResourceGroupState for SequentialState<'a, T, X> { - fn set_raw_group_base_values(&self, group_key: T::Key, base_values: Vec<(T::Tag, T::Value)>) { + fn set_raw_group_base_values( + &self, + group_key: T::Key, + base_values: Vec<(T::Tag, T::Value)>, + ) -> PartialVMResult<()> { self.unsync_map - .set_group_base_values(group_key.clone(), base_values); + .set_group_base_values(group_key.clone(), base_values) + .map_err(|e| { + *self.incorrect_use.borrow_mut() = true; + PartialVMError::new(StatusCode::UNEXPECTED_DESERIALIZATION_ERROR) + .with_message(e.to_string()) + }) } fn read_cached_group_tagged_data( @@ -1013,7 +1030,11 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< pub fn is_incorrect_use(&self) -> bool { match &self.latest_view { - ViewState::Sync(state) => state.captured_reads.borrow().is_incorrect_use(), + ViewState::Sync(_) => { + // Parallel executor accesses captured reads directly and does not use this API. + true + }, + // TODO: store incorrect use in UnsyncMap and eliminate this API. ViewState::Unsync(state) => *state.incorrect_use.borrow(), } } @@ -1257,7 +1278,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< return Ok(None); } match self.get_resource_state_value_metadata(key)? { - Some(metadata) => match unsync_map.get_group_size(key)? 
{ + Some(metadata) => match unsync_map.get_group_size(key) { GroupReadResult::Size(group_size) => { Ok(Some((key.clone(), (metadata, group_size.get())))) }, @@ -1352,7 +1373,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< bcs::from_bytes(state_value.bytes()).map_err(|e| { PartialVMError::new(StatusCode::UNEXPECTED_DESERIALIZATION_ERROR) .with_message(format!( - "Failed to deserialize the resource group at {:? }: {:?}", + "Failed to deserialize the resource group at {:?}: {:?}", group_key, e )) })?, @@ -1372,7 +1393,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< self.latest_view .get_resource_group_state() - .set_raw_group_base_values(group_key.clone(), base_group_sentinel_ops); + .set_raw_group_base_values(group_key.clone(), base_group_sentinel_ops)?; self.latest_view.get_resource_state().set_base_value( group_key.clone(), ValueWithLayout::RawFromStorage(Arc::new(metadata_op)), @@ -1439,7 +1460,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceGr ) -> PartialVMResult { let mut group_read = match &self.latest_view { ViewState::Sync(state) => state.read_group_size(group_key, self.txn_idx)?, - ViewState::Unsync(state) => state.unsync_map.get_group_size(group_key)?, + ViewState::Unsync(state) => state.unsync_map.get_group_size(group_key), }; if matches!(group_read, GroupReadResult::Uninitialized) { @@ -1447,7 +1468,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceGr group_read = match &self.latest_view { ViewState::Sync(state) => state.read_group_size(group_key, self.txn_idx)?, - ViewState::Unsync(state) => state.unsync_map.get_group_size(group_key)?, + ViewState::Unsync(state) => state.unsync_map.get_group_size(group_key), } }; @@ -1489,22 +1510,6 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceGr Ok(group_read.into_value().0) } - fn resource_size_in_group( - &self, - _group_key: &Self::GroupKey, - _resource_tag: &Self::ResourceTag, - ) -> PartialVMResult { - 
unimplemented!("Currently resolved by ResourceGroupAdapter"); - } - - fn resource_exists_in_group( - &self, - _group_key: &Self::GroupKey, - _resource_tag: &Self::ResourceTag, - ) -> PartialVMResult { - unimplemented!("Currently resolved by ResourceGroupAdapter"); - } - fn release_group_cache( &self, ) -> Option>> { diff --git a/aptos-move/mvhashmap/Cargo.toml b/aptos-move/mvhashmap/Cargo.toml index 0ac539ed1b839..3fd3f93b1f68f 100644 --- a/aptos-move/mvhashmap/Cargo.toml +++ b/aptos-move/mvhashmap/Cargo.toml @@ -22,8 +22,6 @@ bytes = { workspace = true } claims = { workspace = true } crossbeam = { workspace = true } dashmap = { workspace = true } -derivative = { workspace = true } -move-binary-format = { workspace = true } move-core-types = { workspace = true } move-vm-types = { workspace = true } serde = { workspace = true } diff --git a/aptos-move/mvhashmap/src/lib.rs b/aptos-move/mvhashmap/src/lib.rs index a9f2ffe8cd3b1..1421fe6921735 100644 --- a/aptos-move/mvhashmap/src/lib.rs +++ b/aptos-move/mvhashmap/src/lib.rs @@ -53,10 +53,10 @@ impl< pub fn new() -> MVHashMap { MVHashMap { - data: VersionedData::new(), - group_data: VersionedGroupData::new(), - delayed_fields: VersionedDelayedFields::new(), - modules: VersionedModules::new(), + data: VersionedData::empty(), + group_data: VersionedGroupData::empty(), + delayed_fields: VersionedDelayedFields::empty(), + modules: VersionedModules::empty(), } } diff --git a/aptos-move/mvhashmap/src/types.rs b/aptos-move/mvhashmap/src/types.rs index 31967be962d0f..d386dd7fe7278 100644 --- a/aptos-move/mvhashmap/src/types.rs +++ b/aptos-move/mvhashmap/src/types.rs @@ -10,8 +10,6 @@ use aptos_types::{ }; use aptos_vm_types::resolver::ResourceGroupSize; use bytes::Bytes; -use derivative::Derivative; -use move_binary_format::errors::PartialVMError; use move_core_types::value::MoveTypeLayout; use std::sync::{atomic::AtomicU32, Arc}; @@ -28,14 +26,13 @@ pub struct StorageVersion; // TODO: Find better representations for this, a 
similar one for TxnIndex. pub type Version = Result<(TxnIndex, Incarnation), StorageVersion>; -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub(crate) enum Flag { - Done, - Estimate, + Done = 0, + Estimate = 1, } -#[derive(Debug, Derivative)] -#[derivative(PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq)] pub enum MVGroupError { /// The base group contents are not initialized. Uninitialized, @@ -43,8 +40,6 @@ pub enum MVGroupError { TagNotFound, /// A dependency on other transaction has been found during the read. Dependency(TxnIndex), - /// Tag serialization is needed for group size computation. - TagSerializationError(#[derivative(PartialEq = "ignore")] PartialVMError), } /// Returned as Err(..) when failed to read from the multi-version data-structure. @@ -79,14 +74,25 @@ impl GroupReadResult { pub fn into_value(self) -> (Option, Option>) { match self { GroupReadResult::Value(maybe_bytes, maybe_layout) => (maybe_bytes, maybe_layout), - _ => unreachable!("Expected a value"), + GroupReadResult::Size(size) => { + unreachable!("Expected group value, found size {:?}", size) + }, + GroupReadResult::Uninitialized => { + unreachable!("Expected group value, found uninitialized") + }, } } pub fn into_size(self) -> ResourceGroupSize { match self { GroupReadResult::Size(size) => size, - _ => unreachable!("Expected size"), + GroupReadResult::Value(maybe_bytes, maybe_layout) => unreachable!( + "Expected size, found value bytes = {:?}, layout = {:?}", + maybe_bytes, maybe_layout + ), + GroupReadResult::Uninitialized => { + unreachable!("Expected group size, found uninitialized") + }, } } } diff --git a/aptos-move/mvhashmap/src/unit_tests/mod.rs b/aptos-move/mvhashmap/src/unit_tests/mod.rs index 4575e04912158..3f189ca68b189 100644 --- a/aptos-move/mvhashmap/src/unit_tests/mod.rs +++ b/aptos-move/mvhashmap/src/unit_tests/mod.rs @@ -298,7 +298,7 @@ fn create_write_read_placeholder_struct() { fn materialize_delta_shortcut() { use MVDataOutput::*; - 
let vd: VersionedData>, TestValue> = VersionedData::new(); + let vd: VersionedData>, TestValue> = VersionedData::empty(); let ap = KeyType(b"/foo/b".to_vec()); let limit = 10000; @@ -343,7 +343,7 @@ fn materialize_delta_shortcut() { #[test] #[should_panic] fn aggregator_base_mismatch() { - let vd: VersionedData>, TestValue> = VersionedData::new(); + let vd: VersionedData>, TestValue> = VersionedData::empty(); let ap = KeyType(b"/foo/b".to_vec()); vd.set_base_value( @@ -361,7 +361,7 @@ fn aggregator_base_mismatch() { #[test] #[should_panic] fn commit_without_deltas() { - let vd: VersionedData>, TestValue> = VersionedData::new(); + let vd: VersionedData>, TestValue> = VersionedData::empty(); let ap = KeyType(b"/foo/b".to_vec()); // Must panic as there are no deltas at all. @@ -371,7 +371,7 @@ fn commit_without_deltas() { #[test] #[should_panic] fn commit_without_entry() { - let vd: VersionedData>, TestValue> = VersionedData::new(); + let vd: VersionedData>, TestValue> = VersionedData::empty(); let ap = KeyType(b"/foo/b".to_vec()); vd.add_delta(ap.clone(), 8, delta_add(20, 1000)); diff --git a/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs b/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs index 09c532229d584..9cecda2c82e76 100644 --- a/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs +++ b/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs @@ -13,11 +13,12 @@ use aptos_types::{ state_store::state_value::StateValue, write_set::{TransactionWrite, WriteOpKind}, }; +use aptos_vm_types::resolver::ResourceGroupSize; use bytes::Bytes; use claims::assert_none; use proptest::{collection::vec, prelude::*, sample::Index, strategy::Strategy}; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, fmt::Debug, hash::Hash, sync::{ @@ -240,9 +241,21 @@ where let value = Value::new(None); let idx = idx as TxnIndex; if test_group { + map.group_data + .set_raw_base_values(key.clone(), vec![]) + .unwrap(); map.group_data() - 
.write(key.clone(), idx, 0, vec![(5, (value, None))]); - map.group_data().mark_estimate(&key, idx); + .write( + key.clone(), + idx, + 0, + vec![(5, (value, None))], + ResourceGroupSize::zero_combined(), + HashSet::new(), + ) + .unwrap(); + map.group_data() + .mark_estimate(&key, idx, [5usize].into_iter().collect()); } else { map.data().write(key.clone(), idx, 0, Arc::new(value), None); map.data().mark_estimate(&key, idx); @@ -293,12 +306,12 @@ where assert_value(v); break; }, - Err(MVGroupError::Uninitialized) => { + Err(MVGroupError::Uninitialized) + | Err(MVGroupError::TagNotFound) => { assert_eq!(baseline, ExpectedOutput::NotInMap, "{:?}", idx); break; }, Err(MVGroupError::Dependency(_i)) => (), - Err(_) => unreachable!("Unreachable error cases for test"), } } else { match map @@ -350,7 +363,15 @@ where let value = Value::new(None); if test_group { map.group_data() - .write(key, idx as TxnIndex, 1, vec![(5, (value, None))]); + .write( + key, + idx as TxnIndex, + 1, + vec![(5, (value, None))], + ResourceGroupSize::zero_combined(), + HashSet::new(), + ) + .unwrap(); } else { map.data() .write(key, idx as TxnIndex, 1, Arc::new(value), None); @@ -361,7 +382,15 @@ where let value = Value::new(Some(v.clone())); if test_group { map.group_data() - .write(key, idx as TxnIndex, 1, vec![(5, (value, None))]); + .write( + key, + idx as TxnIndex, + 1, + vec![(5, (value, None))], + ResourceGroupSize::zero_combined(), + HashSet::new(), + ) + .unwrap(); } else { map.data() .write(key, idx as TxnIndex, 1, Arc::new(value), None); diff --git a/aptos-move/mvhashmap/src/unsync_map.rs b/aptos-move/mvhashmap/src/unsync_map.rs index 6867d56fcd5ea..7a11c73b1d780 100644 --- a/aptos-move/mvhashmap/src/unsync_map.rs +++ b/aptos-move/mvhashmap/src/unsync_map.rs @@ -6,6 +6,7 @@ use crate::{ utils::module_hash, BlockStateStats, }; +use anyhow::anyhow; use aptos_aggregator::types::DelayedFieldValue; use aptos_crypto::hash::HashValue; use aptos_types::{ @@ -13,8 +14,7 @@ use aptos_types::{ 
executable::{Executable, ExecutableDescriptor, ModulePath}, write_set::TransactionWrite, }; -use aptos_vm_types::resource_group_adapter::group_size_as_sum; -use move_binary_format::errors::PartialVMResult; +use aptos_vm_types::{resolver::ResourceGroupSize, resource_group_adapter::group_size_as_sum}; use move_core_types::value::MoveTypeLayout; use serde::Serialize; use std::{ @@ -44,7 +44,7 @@ pub struct UnsyncMap< resource_map: RefCell>>, // Optional hash can store the hash of the module to avoid re-computations. module_map: RefCell, Option)>>, - group_cache: RefCell>>>>, + group_cache: RefCell>, ResourceGroupSize)>>>, executable_cache: RefCell>>, executable_bytes: RefCell, delayed_field_map: RefCell>, @@ -102,18 +102,31 @@ impl< &self, group_key: K, base_values: impl IntoIterator, - ) { - let base_map = base_values + ) -> anyhow::Result<()> { + let base_map: HashMap> = base_values .into_iter() .map(|(t, v)| (t, ValueWithLayout::RawFromStorage(Arc::new(v)))) .collect(); + let base_size = group_size_as_sum( + base_map + .iter() + .flat_map(|(t, v)| v.bytes_len().map(|s| (t, s))), + ) + .map_err(|e| { + anyhow!( + "Tag serialization error in resource group at {:?}: {:?}", + group_key.clone(), + e + ) + })?; assert!( self.group_cache .borrow_mut() - .insert(group_key, RefCell::new(base_map)) + .insert(group_key, RefCell::new((base_map, base_size))) .is_none(), "UnsyncMap group cache must be empty to provide base values" ); + Ok(()) } pub fn update_tagged_base_value_with_layout( @@ -128,19 +141,15 @@ impl< .get_mut(&group_key) .expect("Unable to fetch the entry for the group key in group_cache") .borrow_mut() + .0 .insert(tag, ValueWithLayout::Exchanged(Arc::new(value), layout)); } - pub fn get_group_size(&self, group_key: &K) -> PartialVMResult { - Ok(match self.group_cache.borrow().get(group_key) { - Some(group_map) => GroupReadResult::Size(group_size_as_sum( - group_map - .borrow() - .iter() - .flat_map(|(t, v)| v.bytes_len().map(|s| (t, s))), - )?), + pub fn 
get_group_size(&self, group_key: &K) -> GroupReadResult { + match self.group_cache.borrow().get(group_key) { + Some(entry) => GroupReadResult::Size(entry.borrow().1), None => GroupReadResult::Uninitialized, - }) + } } pub fn fetch_group_tagged_data( @@ -153,6 +162,7 @@ impl< |group_map| { group_map .borrow() + .0 .get(value_tag) .cloned() .ok_or(UnsyncGroupError::TagNotFound) @@ -161,17 +171,41 @@ impl< } /// Contains the latest group ops for the given group key. - pub fn finalize_group(&self, group_key: &K) -> impl Iterator)> { - self.group_cache - .borrow() + pub fn finalize_group( + &self, + group_key: &K, + ) -> ( + impl Iterator)>, + ResourceGroupSize, + ) { + let binding = self.group_cache.borrow(); + let group = binding .get(group_key) .expect("Resource group must be cached") - .borrow() - .clone() - .into_iter() + .borrow(); + + (group.0.clone().into_iter(), group.1) } - pub fn insert_group_op( + pub fn insert_group_ops( + &self, + group_key: &K, + group_ops: impl IntoIterator>))>, + group_size: ResourceGroupSize, + ) -> Result<(), PanicError> { + for (value_tag, (group_op, maybe_layout)) in group_ops.into_iter() { + self.insert_group_op(group_key, value_tag, group_op, maybe_layout)?; + } + self.group_cache + .borrow_mut() + .get_mut(group_key) + .expect("Resource group must be cached") + .borrow_mut() + .1 = group_size; + Ok(()) + } + + fn insert_group_op( &self, group_key: &K, value_tag: T, @@ -186,6 +220,7 @@ impl< .get_mut(group_key) .expect("Resource group must be cached") .borrow_mut() + .0 .entry(value_tag.clone()), v.write_op_kind(), ) { @@ -199,11 +234,12 @@ impl< entry.insert(ValueWithLayout::Exchanged(Arc::new(v), maybe_layout)); }, (l, r) => { - println!("WriteOp kind {:?} not consistent with previous value at tag {:?}. 
l: {:?}, r: {:?}", v.write_op_kind(), value_tag, l, r); return Err(code_invariant_error(format!( - "WriteOp kind {:?} not consistent with previous value at tag {:?}", + "WriteOp kind {:?} not consistent with previous value at tag {:?}. Existing: {:?}, new: {:?}", v.write_op_kind(), - value_tag + value_tag, + l, + r, ))); }, } @@ -227,6 +263,7 @@ impl< self.group_cache.borrow().get(key).map(|group_map| { group_map .borrow() + .0 .iter() .map(|(tag, value)| (Arc::new(tag.clone()), value.clone())) .collect() @@ -327,7 +364,7 @@ mod test { map: &UnsyncMap>, usize, TestValue, ExecutableTestType, ()>, key: &KeyType>, ) -> HashMap> { - map.finalize_group(key).collect() + map.finalize_group(key).0.collect() } // TODO[agg_v2](test) Add tests with non trivial layout @@ -340,7 +377,8 @@ mod test { ap.clone(), // base tag 1, 2, 3 (1..4).map(|i| (i, TestValue::with_kind(i, true))), - ); + ) + .unwrap(); assert_ok!(map.insert_group_op(&ap, 2, TestValue::with_kind(202, false), None)); assert_ok!(map.insert_group_op(&ap, 3, TestValue::with_kind(203, false), None)); let committed = finalize_group_as_hashmap(&map, &ap); @@ -424,14 +462,14 @@ mod test { let ap = KeyType(b"/foo/f".to_vec()); let map = UnsyncMap::>, usize, TestValue, ExecutableTestType, ()>::new(); - map.set_group_base_values( + assert_ok!(map.set_group_base_values( ap.clone(), (1..4).map(|i| (i, TestValue::with_kind(i, true))), - ); - map.set_group_base_values( + )); + assert_ok!(map.set_group_base_values( ap.clone(), (1..4).map(|i| (i, TestValue::with_kind(i, true))), - ); + )); } #[should_panic] @@ -449,7 +487,7 @@ mod test { let ap = KeyType(b"/foo/b".to_vec()); let map = UnsyncMap::>, usize, TestValue, ExecutableTestType, ()>::new(); - let _ = map.finalize_group(&ap).collect::>(); + let _ = map.finalize_group(&ap).0.collect::>(); } #[test] @@ -457,13 +495,14 @@ mod test { let ap = KeyType(b"/foo/f".to_vec()); let map = UnsyncMap::>, usize, TestValue, ExecutableTestType, ()>::new(); - 
assert_ok_eq!(map.get_group_size(&ap), GroupReadResult::Uninitialized); + assert_eq!(map.get_group_size(&ap), GroupReadResult::Uninitialized); map.set_group_base_values( ap.clone(), // base tag 1, 2, 3, 4 (1..5).map(|i| (i, TestValue::creation_with_len(1))), - ); + ) + .unwrap(); let tag: usize = 5; let one_entry_len = TestValue::creation_with_len(1).bytes().unwrap().len(); @@ -471,13 +510,9 @@ mod test { let three_entry_len = TestValue::creation_with_len(3).bytes().unwrap().len(); let four_entry_len = TestValue::creation_with_len(4).bytes().unwrap().len(); - let exp_size = group_size_as_sum(vec![(&tag, one_entry_len); 4].into_iter()).unwrap(); - assert_ok_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); + let base_size = group_size_as_sum(vec![(&tag, one_entry_len); 4].into_iter()).unwrap(); + assert_eq!(map.get_group_size(&ap), GroupReadResult::Size(base_size)); - assert_err!(map.insert_group_op(&ap, 0, TestValue::modification_with_len(2), None)); - assert_ok!(map.insert_group_op(&ap, 0, TestValue::creation_with_len(2), None)); - assert_err!(map.insert_group_op(&ap, 1, TestValue::creation_with_len(2), None)); - assert_ok!(map.insert_group_op(&ap, 1, TestValue::modification_with_len(2), None)); let exp_size = group_size_as_sum(vec![(&tag, two_entry_len); 2].into_iter().chain(vec![ ( &tag, @@ -486,10 +521,26 @@ mod test { 3 ])) .unwrap(); - assert_ok_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); + assert_err!(map.insert_group_ops( + &ap, + vec![(0, (TestValue::modification_with_len(2), None))], + exp_size, + )); + assert_err!(map.insert_group_ops( + &ap, + vec![(1, (TestValue::creation_with_len(2), None))], + exp_size, + )); + assert_ok!(map.insert_group_ops( + &ap, + vec![ + (0, (TestValue::creation_with_len(2), None)), + (1, (TestValue::modification_with_len(2), None)) + ], + exp_size + )); + assert_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); - assert_ok!(map.insert_group_op(&ap, 4, 
TestValue::modification_with_len(3), None)); - assert_ok!(map.insert_group_op(&ap, 5, TestValue::creation_with_len(3), None)); let exp_size = group_size_as_sum( vec![(&tag, one_entry_len); 2] .into_iter() @@ -497,10 +548,16 @@ mod test { .chain(vec![(&tag, three_entry_len); 2]), ) .unwrap(); - assert_ok_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); + assert_ok!(map.insert_group_ops( + &ap, + vec![ + (4, (TestValue::modification_with_len(3), None)), + (5, (TestValue::creation_with_len(3), None)), + ], + exp_size + )); + assert_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); - assert_ok!(map.insert_group_op(&ap, 0, TestValue::modification_with_len(4), None)); - assert_ok!(map.insert_group_op(&ap, 1, TestValue::modification_with_len(4), None)); let exp_size = group_size_as_sum( vec![(&tag, one_entry_len); 2] .into_iter() @@ -508,7 +565,15 @@ mod test { .chain(vec![(&tag, four_entry_len); 2]), ) .unwrap(); - assert_ok_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); + assert_ok!(map.insert_group_ops( + &ap, + vec![ + (0, (TestValue::modification_with_len(4), None)), + (1, (TestValue::modification_with_len(4), None)) + ], + exp_size + )); + assert_eq!(map.get_group_size(&ap), GroupReadResult::Size(exp_size)); } #[test] @@ -526,7 +591,8 @@ mod test { ap.clone(), // base tag 1, 2, 3, 4 (1..5).map(|i| (i, TestValue::creation_with_len(i))), - ); + ) + .unwrap(); for i in 1..5 { assert_ok_eq!( diff --git a/aptos-move/mvhashmap/src/versioned_data.rs b/aptos-move/mvhashmap/src/versioned_data.rs index a9eb066f0617d..aed65b5a9bd40 100644 --- a/aptos-move/mvhashmap/src/versioned_data.rs +++ b/aptos-move/mvhashmap/src/versioned_data.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::types::{ - Flag, Incarnation, MVDataError, MVDataOutput, ShiftedTxnIndex, TxnIndex, ValueWithLayout, + Incarnation, MVDataError, MVDataOutput, ShiftedTxnIndex, TxnIndex, ValueWithLayout, }; use anyhow::Result; use 
aptos_aggregator::delta_change_set::DeltaOp; @@ -17,19 +17,24 @@ use std::{ fmt::Debug, hash::Hash, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, }; +pub(crate) const FLAG_DONE: bool = false; +pub(crate) const FLAG_ESTIMATE: bool = true; + /// Every entry in shared multi-version data-structure has an "estimate" flag /// and some content. -struct Entry { +/// TODO: can remove pub(crate) once aggregator V1 is deprecated. +pub(crate) struct Entry { /// Actual contents. - cell: EntryCell, + pub(crate) value: V, - /// Used to mark the entry as a "write estimate". - flag: Flag, + /// Used to mark the entry as a "write estimate". Stored as an atomic so + /// marking an estimate can proceed w. read lock. + flag: AtomicBool, } /// Represents the content of a single entry in multi-version data-structure. @@ -49,7 +54,7 @@ enum EntryCell { /// A versioned value internally is represented as a BTreeMap from indices of /// transactions that update the given access path & the corresponding entries. struct VersionedValue { - versioned_map: BTreeMap>>, + versioned_map: BTreeMap>>>, } /// Maps each key (access path) to an internal versioned value representation. 
@@ -58,36 +63,39 @@ pub struct VersionedData { total_base_value_size: AtomicU64, } -impl Entry { - fn new_write_from(incarnation: Incarnation, value: ValueWithLayout) -> Entry { - Entry { - cell: EntryCell::Write(incarnation, value), - flag: Flag::Done, - } - } +fn new_write_entry(incarnation: Incarnation, value: ValueWithLayout) -> Entry> { + Entry::new(EntryCell::Write(incarnation, value)) +} - fn new_delta_from(data: DeltaOp) -> Entry { +fn new_delta_entry(data: DeltaOp) -> Entry> { + Entry::new(EntryCell::Delta(data, None)) +} + +impl Entry { + pub(crate) fn new(value: V) -> Entry { Entry { - cell: EntryCell::Delta(data, None), - flag: Flag::Done, + value, + flag: AtomicBool::new(FLAG_DONE), } } - fn flag(&self) -> Flag { - self.flag + pub(crate) fn is_estimate(&self) -> bool { + self.flag.load(Ordering::Relaxed) == FLAG_ESTIMATE } - fn mark_estimate(&mut self) { - self.flag = Flag::Estimate; + pub(crate) fn mark_estimate(&self) { + self.flag.store(FLAG_ESTIMATE, Ordering::Relaxed); } +} +impl Entry> { // The entry must be a delta, will record the provided value as a base value // shortcut (the value in storage before block execution). If a value was already // recorded, the new value is asserted for equality. fn record_delta_shortcut(&mut self, value: u128) { use crate::versioned_data::EntryCell::Delta; - self.cell = match self.cell { + self.value = match self.value { Delta(delta_op, maybe_shortcut) => { if let Some(prev_value) = maybe_shortcut { assert_eq!(value, prev_value, "Recording different shortcuts"); @@ -121,14 +129,14 @@ impl VersionedValue { // During traversal, all aggregator deltas have to be accumulated together. let mut accumulator: Option> = None; while let Some((idx, entry)) = iter.next_back() { - if entry.flag() == Flag::Estimate { + if entry.is_estimate() { // Found a dependency. 
return Err(Dependency( idx.idx().expect("May not depend on storage version"), )); } - match (&entry.cell, accumulator.as_mut()) { + match (&entry.value, accumulator.as_mut()) { (EntryCell::Write(incarnation, data), None) => { // Resolve to the write if no deltas were applied in between. return Ok(Versioned( @@ -214,7 +222,7 @@ impl VersionedValue { } impl VersionedData { - pub(crate) fn new() -> Self { + pub(crate) fn empty() -> Self { Self { values: DashMap::new(), total_base_value_size: AtomicU64::new(0), @@ -233,16 +241,16 @@ impl VersionedData { let mut v = self.values.entry(key).or_default(); v.versioned_map.insert( ShiftedTxnIndex::new(txn_idx), - CachePadded::new(Entry::new_delta_from(delta)), + CachePadded::new(new_delta_entry(delta)), ); } /// Mark an entry from transaction 'txn_idx' at access path 'key' as an estimated write /// (for future incarnation). Will panic if the entry is not in the data-structure. pub fn mark_estimate(&self, key: &K, txn_idx: TxnIndex) { - let mut v = self.values.get_mut(key).expect("Path must exist"); + let v = self.values.get(key).expect("Path must exist"); v.versioned_map - .get_mut(&ShiftedTxnIndex::new(txn_idx)) + .get(&ShiftedTxnIndex::new(txn_idx)) .expect("Entry by the txn must exist to mark estimate") .mark_estimate(); } @@ -295,10 +303,10 @@ impl VersionedData { self.total_base_value_size .fetch_add(base_size as u64, Ordering::Relaxed); } - v.insert(CachePadded::new(Entry::new_write_from(0, value))); + v.insert(CachePadded::new(new_write_entry(0, value))); }, Occupied(mut o) => { - if let EntryCell::Write(i, existing_value) = &o.get().cell { + if let EntryCell::Write(i, existing_value) = &o.get().value { assert!(*i == 0); match (existing_value, &value) { (RawFromStorage(ev), RawFromStorage(v)) => { @@ -311,7 +319,7 @@ impl VersionedData { }, (RawFromStorage(_), Exchanged(_, _)) => { // Received more info, update. 
- o.insert(CachePadded::new(Entry::new_write_from(0, value))); + o.insert(CachePadded::new(new_write_entry(0, value))); }, (Exchanged(ev, e_layout), Exchanged(v, layout)) => { // base value may have already been provided by another transaction @@ -345,7 +353,7 @@ impl VersionedData { let mut v = self.values.entry(key).or_default(); let prev_entry = v.versioned_map.insert( ShiftedTxnIndex::new(txn_idx), - CachePadded::new(Entry::new_write_from( + CachePadded::new(new_write_entry( incarnation, ValueWithLayout::Exchanged(data, maybe_layout), )), @@ -353,7 +361,7 @@ impl VersionedData { // Assert that the previous entry for txn_idx, if present, had lower incarnation. assert!(prev_entry.map_or(true, |entry| -> bool { - if let EntryCell::Write(i, _) = entry.cell { + if let EntryCell::Write(i, _) = entry.value { i < incarnation } else { true @@ -376,7 +384,7 @@ impl VersionedData { let mut v = self.values.entry(key).or_default(); let prev_entry = v.versioned_map.insert( ShiftedTxnIndex::new(txn_idx), - CachePadded::new(Entry::new_write_from( + CachePadded::new(new_write_entry( incarnation, ValueWithLayout::Exchanged(arc_data.clone(), None), )), @@ -384,7 +392,7 @@ impl VersionedData { // Changes versioned metadata that was stored. prev_entry.map_or(true, |entry| -> bool { - if let EntryCell::Write(_, existing_v) = &entry.cell { + if let EntryCell::Write(_, existing_v) = &entry.value { arc_data.as_state_value_metadata() != existing_v .extract_value_no_layout() diff --git a/aptos-move/mvhashmap/src/versioned_delayed_fields.rs b/aptos-move/mvhashmap/src/versioned_delayed_fields.rs index bd1c4deccd71e..042e0024c8dd0 100644 --- a/aptos-move/mvhashmap/src/versioned_delayed_fields.rs +++ b/aptos-move/mvhashmap/src/versioned_delayed_fields.rs @@ -402,7 +402,7 @@ impl VersionedDelayedFields { /// Part of the big multi-versioned data-structure, which creates different types of /// versioned maps (including this one for delayed fields), and delegates access. 
Hence, /// new should only be used from the crate. - pub(crate) fn new() -> Self { + pub(crate) fn empty() -> Self { Self { values: DashMap::new(), next_idx_to_commit: CachePadded::new(AtomicTxnIndex::new(0)), diff --git a/aptos-move/mvhashmap/src/versioned_group_data.rs b/aptos-move/mvhashmap/src/versioned_group_data.rs index bf728828d64d6..afa857a49a9e1 100644 --- a/aptos-move/mvhashmap/src/versioned_group_data.rs +++ b/aptos-move/mvhashmap/src/versioned_group_data.rs @@ -1,473 +1,259 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::types::{ - Flag, Incarnation, MVGroupError, ShiftedTxnIndex, TxnIndex, ValueWithLayout, Version, +use crate::{ + types::{ + Incarnation, MVDataError, MVDataOutput, MVGroupError, ShiftedTxnIndex, TxnIndex, + ValueWithLayout, Version, + }, + versioned_data::Entry as SizeEntry, + VersionedData, +}; +use anyhow::{anyhow, bail}; +use aptos_types::{ + error::{code_invariant_error, PanicError}, + write_set::{TransactionWrite, WriteOpKind}, }; -use anyhow::bail; -use aptos_types::write_set::{TransactionWrite, WriteOpKind}; use aptos_vm_types::{resolver::ResourceGroupSize, resource_group_adapter::group_size_as_sum}; -use claims::{assert_matches, assert_none, assert_some}; -use crossbeam::utils::CachePadded; +use claims::assert_some; use dashmap::DashMap; use move_core_types::value::MoveTypeLayout; use serde::Serialize; use std::{ collections::{ - btree_map::{self, BTreeMap}, - HashMap, + btree_map::{BTreeMap, Entry::Vacant}, + HashSet, }, fmt::Debug, hash::Hash, sync::Arc, }; -struct GroupEntry { - incarnation: Incarnation, - // Note: can be a raw pointer (different data-structure holds the value during the - // lifetime), but would require unsafe access. - value: ValueWithLayout, - flag: Flag, -} - -impl GroupEntry { - fn new(incarnation: Incarnation, value: ValueWithLayout) -> Self { - Self { - incarnation, - value, - flag: Flag::Done, - } - } -} - -/// Represents a group value, i.e. 
a key that does not correspond to a single value, -/// but instead a collection of values each associated with a tag. -/// -/// Implementation note: due to DashMap in VersionedGroupData, the updates are atomic. -/// If this changes, we must maintain invariants on insertion / deletion order among -/// members (e.g. versioned_map then idx_to_update, deletion vice versa). -pub(crate) struct VersionedGroupValue { - /// While versioned_map maps tags to versioned entries for the tag, idx_to_update - /// maps a transaction index to all corresponding group updates. ShiftedTxnIndex is used - /// to dedicated index 0 for base (storage version, prior to block execution) values. - versioned_map: HashMap>>>, - /// Mapping transaction indices to the set of group member updates. As it is required - /// to provide base values from storage, and since all versions including storage are - /// represented in the same data-structure, the key set corresponds to all relevant - /// tags (group membership is not fixed, see aip-9). - /// Note: if we do not garbage collect final idx_to_update contents until the end of - /// block execution (lifetime of the data-structure), then we can have other structures - /// hold raw pointers to the values as an optimization. - idx_to_update: BTreeMap>>>, - - /// Group contents corresponding to the latest committed version. - committed_group: HashMap>, - - /// Group size has changed between speculative executions. Useful to know for the best - /// heuristic behavior when reading the group size (e.g. wait on the dependency or not). - size_changed: bool, +#[derive(Default)] +struct VersionedGroupSize { + size_entries: BTreeMap>, + // Determines whether it is safe for size queries to read the value from an entry marked as + // ESTIMATE. The heuristic checks on every write, whether the same size would be returned + // after the respective write took effect. Once set, the flag remains set to true. + // TODO: Handle remove similarly. 
May want to depend on transaction indices, i.e. if size + // has changed early in the block, it may not have an influence on much later transactions. + size_has_changed: bool, } /// Maps each key (access path) to an internal VersionedValue. pub struct VersionedGroupData { - group_values: DashMap>, + // TODO: Optimize the key representation to avoid cloning and concatenation for APIs + // such as get, where only & of the key is needed. + values: VersionedData<(K, T), V>, + // TODO: Once AggregatorV1 is deprecated (no V: TransactionWrite trait bound), + // switch to VersionedVersionedData. + // If an entry exists for a group key in DashMap, the group is considered initialized. + group_sizes: DashMap, + + // Stores a set of tags for this group, basically a superset of all tags encountered in + // group related APIs. The accesses are synchronized with group size entry (for now), + // but it is stored separately for conflict free read-path for txn materialization + // (as the contents of group_tags are used in preparing finalized group contents). + // Note: The contents of group_tags are non-deterministic, but finalize_group filters + // out tags for which the latest value does not exist. The implementation invariant is + // that the contents observed in the multi-versioned map after index is committed + // must correspond to the outputs recorded by the committed transaction incarnations. + // (and the correctness of the outputs is the responsibility of BlockSTM validation). 
+ group_tags: DashMap>, } -impl Default - for VersionedGroupValue +impl< + K: Hash + Clone + Debug + Eq, + T: Hash + Clone + Debug + Eq + Serialize, + V: TransactionWrite, + > VersionedGroupData { - fn default() -> Self { + pub(crate) fn empty() -> Self { Self { - versioned_map: HashMap::new(), - idx_to_update: BTreeMap::new(), - committed_group: HashMap::new(), - size_changed: false, + values: VersionedData::empty(), + group_sizes: DashMap::new(), + group_tags: DashMap::new(), } } -} -impl VersionedGroupValue { - fn set_raw_base_values(&mut self, values: impl Iterator) { - let zero_idx = ShiftedTxnIndex::zero_idx(); - match self.idx_to_update.get(&zero_idx) { - Some(previous) => { - // base value may have already been provided by another transaction - // executed simultaneously and asking for the same resource group. - // Value from storage must be identical, but then delayed field - // identifier exchange could've modified it. - // - // If they are RawFromStorage, they need to be identical. - // Assert the length of bytes for efficiency (instead of full equality) - for (tag, v) in values { - let prev_v = previous - .get(&tag) - .expect("Reading twice from storage must be consistent"); - if let ValueWithLayout::RawFromStorage(prev_v) = prev_v { - assert_eq!(v.bytes().map(|b| b.len()), prev_v.bytes().map(|b| b.len())); - } - } - }, - // For base value, incarnation is irrelevant, and is always set to 0. - None => { - self.write( - zero_idx, - 0, - values.map(|(k, v)| (k, ValueWithLayout::RawFromStorage(Arc::new(v)))), + pub(crate) fn num_keys(&self) -> usize { + self.group_sizes.len() + } + + pub fn set_raw_base_values( + &self, + group_key: K, + base_values: Vec<(T, V)>, + ) -> anyhow::Result<()> { + let mut group_sizes = self.group_sizes.entry(group_key.clone()).or_default(); + + if let Vacant(entry) = group_sizes.size_entries.entry(ShiftedTxnIndex::zero_idx()) { + // Perform group size computation if base not already provided. 
+ let group_size = group_size_as_sum::( + base_values + .iter() + .flat_map(|(tag, value)| value.bytes().map(|b| (tag.clone(), b.len()))), + ) + .map_err(|e| { + anyhow!( + "Tag serialization error in resource group at {:?}: {:?}", + group_key.clone(), + e + ) + })?; + + entry.insert(SizeEntry::new(group_size)); + + let mut superset_tags = self.group_tags.entry(group_key.clone()).or_default(); + for (tag, value) in base_values.into_iter() { + superset_tags.insert(tag.clone()); + self.values.set_base_value( + (group_key.clone(), tag), + ValueWithLayout::RawFromStorage(Arc::new(value)), ); - }, + } } + + Ok(()) } - fn update_tagged_base_value_with_layout( - &mut self, + pub fn update_tagged_base_value_with_layout( + &self, + group_key: K, tag: T, value: V, layout: Option>, ) { - let zero_idx = ShiftedTxnIndex::zero_idx(); - let v = ValueWithLayout::Exchanged(Arc::new(value), layout.clone()); - - use btree_map::Entry::*; - match self - .versioned_map - .entry(tag.clone()) - .or_default() - .entry(zero_idx.clone()) - { - Occupied(mut o) => { - match &o.get().value { - ValueWithLayout::RawFromStorage(_) => { - o.insert(CachePadded::new(GroupEntry::new(0, v.clone()))); - - assert_matches!( - self.idx_to_update - .get_mut(&zero_idx) - .expect("Base version must exist when updating for exchange") - .insert(tag.clone(), v.clone()), - Some(ValueWithLayout::RawFromStorage(_)) - ); - - let existing = self - .committed_group - .get_mut(&tag) - .expect("Tag must exist in committed when updating for exchange"); - assert_matches!(existing, &mut ValueWithLayout::RawFromStorage(_)); - *existing = v; - }, - ValueWithLayout::Exchanged(_, _) => { - // already exchanged, skipping. 
- }, - } - }, - Vacant(_) => { - unreachable!("Base version must exist when updating for exchange") - }, - }; + self.values.set_base_value( + (group_key, tag), + ValueWithLayout::Exchanged(Arc::new(value), layout.clone()), + ); } - fn write( - &mut self, - shifted_idx: ShiftedTxnIndex, + /// Writes new resource group values (and size) specified by tag / value pair + /// iterators. Returns true if a new tag is written compared to the previous + /// incarnation (set of previous tags provided as a parameter), or if the size + /// as observed after the new write differs from before the write took place. + /// In these cases the caller (Block-STM) may have to do certain validations. + pub fn write( + &self, + group_key: K, + txn_idx: TxnIndex, incarnation: Incarnation, - values: impl Iterator)>, - ) -> bool { - let zero_idx = ShiftedTxnIndex::zero_idx(); - let at_base_version = shifted_idx == zero_idx; - - // Remove any prior entries. - let mut prev_tag_and_sizes: HashMap> = - self.remove(shifted_idx.clone()).into_iter().collect(); - - // Changes the set of values, or the size of the entries (that might have been - // used even when marked as an estimate, if self.size_changed was still false). - // Note: we can flag if an estimate entry's size was used, or if the group size - // read observed self.size_changed == false. Otherwise, as in vanilla Block-STM, - // it would suffice to simply check if the re-execution writes outside of the - // prior (group) write-set. Not implemented (yet), as for this optimization to - // be useful, the group metadata checks also need to be handled similarly. - let mut changes_behavior = false; - - let arc_map = values - .map(|(tag, v)| { - changes_behavior |= prev_tag_and_sizes.remove(&tag) != Some(v.bytes_len()); - - // Update versioned_map. 
- self.versioned_map.entry(tag.clone()).or_default().insert( - shifted_idx.clone(), - CachePadded::new(GroupEntry::new(incarnation, v.clone())), - ); - - (tag, v) - }) - .collect(); - - if !prev_tag_and_sizes.is_empty() { - changes_behavior = true; - } - - assert_none!( - self.idx_to_update - .insert(shifted_idx, CachePadded::new(arc_map)), - "prev_map previously removed and processed." - ); - - if at_base_version { - // base version is from storage and final - immediately treat as committed. - self.commit_idx(zero_idx, true) - .expect("Marking storage version as committed must succeed"); - } + values: impl IntoIterator>))>, + size: ResourceGroupSize, + mut prev_tags: HashSet, + ) -> Result { + let mut ret = false; + let mut tags_to_write = vec![]; - if changes_behavior && incarnation > 0 { - // Incarnation 0 sets the group contents the first time, but this is not - // considered as changing size between speculative executions - all later - // incarnations, however, are considered. - self.size_changed = true; - } + { + let superset_tags = self.group_tags.get(&group_key).ok_or_else(|| { + // Due to read-before-write. + code_invariant_error("Group (tags) must be initialized to write to") + })?; + + for (tag, (value, layout)) in values.into_iter() { + if !superset_tags.contains(&tag) { + tags_to_write.push(tag.clone()); + } - changes_behavior - } + ret |= !prev_tags.remove(&tag); - fn mark_estimate(&mut self, txn_idx: TxnIndex) { - let shifted_idx = ShiftedTxnIndex::new(txn_idx); - let idx_updates = self - .idx_to_update - .get(&shifted_idx) - .expect("Group updates must exist at the index to mark estimate"); - - // estimate flag lives in GroupEntry, w. value in versioned_map to simplify reading - // based on txn_idx and tag. marking estimates occurs per txn (data MVHashMap exposes - // the interface for txn_idx & key). Hence, we must mark tags individually. 
- for (tag, _) in idx_updates.iter() { - self.versioned_map - .get_mut(tag) - .expect("Versioned entry must exist for tag") - .get_mut(&shifted_idx) - .expect("Versioned entry must exist") - .flag = Flag::Estimate; + self.values.write( + (group_key.clone(), tag), + txn_idx, + incarnation, + Arc::new(value), + layout, + ); + } } - } - fn remove(&mut self, shifted_idx: ShiftedTxnIndex) -> Vec<(T, Option)> { - // Remove idx updates first, then entries. - let idx_update_tags: Vec<(T, Option)> = self - .idx_to_update - .remove(&shifted_idx) - .map_or(vec![], |map| { - map.into_inner() - .into_iter() - .map(|(tag, v)| (tag, v.bytes_len())) - .collect() - }); - - // Similar to mark_estimate, need to remove an individual entry for each tag. - for (tag, _) in idx_update_tags.iter() { - assert_some!( - self.versioned_map - .get_mut(tag) - .expect("Versioned entry must exist for tag") - .remove(&shifted_idx), - "Entry for tag / idx must exist to be removed" - ); + for prev_tag in prev_tags { + let key = (group_key.clone(), prev_tag); + self.values.remove(&key, txn_idx); } - idx_update_tags - } - - // Records the latest committed op for each tag in the group (removed tags ar excluded). 
- fn commit_idx( - &mut self, - shifted_idx: ShiftedTxnIndex, - allow_new_modification: bool, - ) -> anyhow::Result<()> { - use std::collections::hash_map::Entry::*; - use WriteOpKind::*; - - let idx_updates = self - .idx_to_update - .get(&shifted_idx) - .expect("Group updates must exist at the index to commit"); - for (tag, v) in idx_updates.iter() { - match (self.committed_group.entry(tag.clone()), v.write_op_kind()) { - (Occupied(entry), Deletion) => { - entry.remove(); - }, - (Occupied(mut entry), Modification) => { - entry.insert(v.clone()); - }, - (Vacant(entry), Creation) => { - entry.insert(v.clone()); - }, - (Vacant(entry), Modification) if allow_new_modification => { - entry.insert(v.clone()); - }, - (Occupied(mut entry), Creation) if entry.get().write_op_kind() == Deletion => { - entry.insert(v.clone()); - }, - (e, _) => { - bail!( - "[{shifted_idx:?}] WriteOp kind {:?} not consistent with previous value at tag {tag:?}, value: {e:?}", - v.write_op_kind(), - ); - }, - } + if !tags_to_write.is_empty() { + let mut superset_tags = self + .group_tags + .get_mut(&group_key) + .expect("Group must be initialized"); + superset_tags.extend(tags_to_write); } - Ok(()) - } - - fn get_committed_group(&self) -> Vec<(T, ValueWithLayout)> { - self.committed_group.clone().into_iter().collect() - } - - fn get_latest_tagged_value( - &self, - tag: &T, - txn_idx: TxnIndex, - ) -> Result<(Version, ValueWithLayout), MVGroupError> { - let common_error = || -> MVGroupError { - if self - .idx_to_update - .contains_key(&ShiftedTxnIndex::zero_idx()) - { - MVGroupError::TagNotFound - } else { - MVGroupError::Uninitialized - } - }; - - self.versioned_map - .get(tag) - .ok_or(common_error()) - .and_then(|tree| { - match tree - .range(ShiftedTxnIndex::zero_idx()..ShiftedTxnIndex::new(txn_idx)) - .next_back() - { - Some((idx, entry)) => { - if entry.flag == Flag::Estimate { - Err(MVGroupError::Dependency( - idx.idx() - .expect("Base version cannot be marked as estimate"), - )) - } else { 
- Ok(( - idx.idx().map(|idx| (idx, entry.incarnation)), - entry.value.clone(), - )) - } - }, - None => Err(common_error()), + let mut group_sizes = self.group_sizes.get_mut(&group_key).ok_or_else(|| { + // Due to read-before-write. + code_invariant_error("Group (sizes) must be initialized to write to") + })?; + + if !(group_sizes.size_has_changed && ret) { + let (size_changed, update_flag) = group_sizes + .size_entries + .range(ShiftedTxnIndex::zero_idx()..ShiftedTxnIndex::new(txn_idx + 1)) + .next_back() + .ok_or_else(|| { + code_invariant_error("Initialized group sizes must contain storage version") + }) + .map(|(idx, prev_size)| { + ( + prev_size.value != size, + // Update the size_has_changed flag if the entry isn't the base value + // (which may be non-existent) or if the incarnation > 0. + *idx != ShiftedTxnIndex::zero_idx() || incarnation > 0, + ) + })?; + + if size_changed { + ret = true; + if update_flag { + group_sizes.size_has_changed = true; } - }) - } - - fn get_latest_group_size(&self, txn_idx: TxnIndex) -> Result { - if !self - .idx_to_update - .contains_key(&ShiftedTxnIndex::zero_idx()) - { - return Err(MVGroupError::Uninitialized); - } - - let sizes = self - .versioned_map - .iter() - .flat_map(|(tag, tree)| { - tree.range(ShiftedTxnIndex::zero_idx()..ShiftedTxnIndex::new(txn_idx)) - .next_back() - .and_then(|(idx, entry)| { - // We would like to use the value in an estimated entry if size never changed - // between speculative executions, i.e. to depend on estimates only when the - // size has changed. In this case, execution can wait on a dependency, while - // validation can short circuit to fail. 
- if entry.flag == Flag::Estimate && self.size_changed { - Some(Err(MVGroupError::Dependency( - idx.idx().expect("May not depend on storage version"), - ))) - } else { - entry - .value - .bytes_len() - .map(|bytes_len| Ok((tag, bytes_len))) - } - }) - }) - .collect::, MVGroupError>>()?; - group_size_as_sum(sizes.into_iter()).map_err(MVGroupError::TagSerializationError) - } -} - -impl< - K: Hash + Clone + Debug + Eq, - T: Hash + Clone + Debug + Eq + Serialize, - V: TransactionWrite, - > VersionedGroupData -{ - pub(crate) fn new() -> Self { - Self { - group_values: DashMap::new(), + } } - } - - pub(crate) fn num_keys(&self) -> usize { - self.group_values.len() - } - pub fn set_raw_base_values(&self, key: K, base_values: impl IntoIterator) { - // Incarnation is irrelevant for storage version, set to 0. - self.group_values - .entry(key) - .or_default() - .set_raw_base_values(base_values.into_iter()); - } - - pub fn update_tagged_base_value_with_layout( - &self, - key: K, - tag: T, - value: V, - layout: Option>, - ) { - // Incarnation is irrelevant for storage version, set to 0. - self.group_values - .entry(key) - .or_default() - .update_tagged_base_value_with_layout(tag, value, layout); - } + group_sizes + .size_entries + .insert(ShiftedTxnIndex::new(txn_idx), SizeEntry::new(size)); - pub fn write( - &self, - key: K, - txn_idx: TxnIndex, - incarnation: Incarnation, - values: impl IntoIterator>))>, - ) -> bool { - self.group_values.entry(key).or_default().write( - ShiftedTxnIndex::new(txn_idx), - incarnation, - values - .into_iter() - .map(|(k, (v, l))| (k, ValueWithLayout::Exchanged(Arc::new(v), l))), - ) + Ok(ret) } /// Mark all entry from transaction 'txn_idx' at access path 'key' as an estimated write /// (for future incarnation). Will panic if the entry is not in the data-structure. 
- pub fn mark_estimate(&self, key: &K, txn_idx: TxnIndex) { - self.group_values - .get_mut(key) + pub fn mark_estimate(&self, group_key: &K, txn_idx: TxnIndex, tags: HashSet) { + for tag in tags { + let key = (group_key.clone(), tag); + self.values.mark_estimate(&key, txn_idx); + } + + self.group_sizes + .get(group_key) .expect("Path must exist") - .mark_estimate(txn_idx); + .size_entries + .get(&ShiftedTxnIndex::new(txn_idx)) + .expect("Entry by the txn must exist to mark estimate") + .mark_estimate(); } /// Remove all entries from transaction 'txn_idx' at access path 'key'. - pub fn remove(&self, key: &K, txn_idx: TxnIndex) { - let mut group = self.group_values.get_mut(key).expect("Path must exist"); - let removed = group.remove(ShiftedTxnIndex::new(txn_idx)); - - if !removed.is_empty() { - group.size_changed = true; + pub fn remove(&self, group_key: &K, txn_idx: TxnIndex, tags: HashSet) { + for tag in tags { + let key = (group_key.clone(), tag); + self.values.remove(&key, txn_idx); } + + // TODO: consider setting size_has_changed flag if e.g. the size observed + // after remove is different. + assert_some!( + self.group_sizes + .get_mut(group_key) + .expect("Path must exist") + .size_entries + .remove(&ShiftedTxnIndex::new(txn_idx)), + "Entry for the txn must exist to be deleted" + ); } /// Read the latest value corresponding to a tag at a given group (identified by key). @@ -477,40 +263,60 @@ impl< /// group to the provided layout. 
pub fn fetch_tagged_data( &self, - key: &K, + group_key: &K, tag: &T, txn_idx: TxnIndex, ) -> Result<(Version, ValueWithLayout), MVGroupError> { - match self.group_values.get(key) { - Some(g) => g.get_latest_tagged_value(tag, txn_idx), - None => Err(MVGroupError::Uninitialized), + let key = (group_key.clone(), tag.clone()); + let initialized = self.group_sizes.contains_key(group_key); + + match self.values.fetch_data(&key, txn_idx) { + Ok(MVDataOutput::Versioned(version, value)) => Ok((version, value)), + Err(MVDataError::Uninitialized) => Err(if initialized { + MVGroupError::TagNotFound + } else { + MVGroupError::Uninitialized + }), + Err(MVDataError::Dependency(dep_idx)) => Err(MVGroupError::Dependency(dep_idx)), + Ok(MVDataOutput::Resolved(_)) + | Err(MVDataError::Unresolved(_)) + | Err(MVDataError::DeltaApplicationFailure) => { + unreachable!("Not using aggregatorV1") + }, } } - /// Returns the sum of latest sizes of all group members (and respective tags), collected - /// based on the recorded list of tags. If the latest entry at a tag is marked as estimate - /// and the group size has changed between speculative executions then a dependency is - /// returned. Otherwise, the size is computed including the sizes of estimated entries. - /// This works w. Block-STM, because a validation wave is triggered when any group entry - /// size changes after re-execution (also when an entry is added or removed). 
pub fn get_group_size( &self, - key: &K, + group_key: &K, txn_idx: TxnIndex, ) -> Result { - match self.group_values.get(key) { - Some(g) => g.get_latest_group_size(txn_idx), + match self.group_sizes.get(group_key) { + Some(g) => g + .size_entries + .range(ShiftedTxnIndex::zero_idx()..ShiftedTxnIndex::new(txn_idx)) + .next_back() + .map(|(idx, size)| { + if size.is_estimate() && g.size_has_changed { + Err(MVGroupError::Dependency( + idx.idx().expect("May not depend on storage version"), + )) + } else { + Ok(size.value) + } + }) + .unwrap_or(Err(MVGroupError::Uninitialized)), None => Err(MVGroupError::Uninitialized), } } pub fn validate_group_size( &self, - key: &K, + group_key: &K, txn_idx: TxnIndex, group_size_to_validate: ResourceGroupSize, ) -> bool { - self.get_group_size(key, txn_idx) == Ok(group_size_to_validate) + self.get_group_size(group_key, txn_idx) == Ok(group_size_to_validate) } /// For a given key that corresponds to a group, and an index of a transaction the last @@ -528,21 +334,36 @@ impl< /// modification otherwise). When consistent, the output is Ok(..). 
pub fn finalize_group( &self, - key: &K, + group_key: &K, txn_idx: TxnIndex, - ) -> anyhow::Result)>> { - let mut v = self.group_values.get_mut(key).expect("Path must exist"); - - v.commit_idx(ShiftedTxnIndex::new(txn_idx), false)?; - Ok(v.get_committed_group()) - } - - pub fn get_last_committed_group( - &self, - key: &K, - ) -> anyhow::Result)>> { - let v = self.group_values.get_mut(key).expect("Path must exist"); - Ok(v.get_committed_group()) + ) -> anyhow::Result<(Vec<(T, ValueWithLayout)>, ResourceGroupSize)> { + let superset_tags = self + .group_tags + .get(group_key) + .expect("Group tags must be set") + .clone(); + + let committed_group = superset_tags + .into_iter() + .map( + |tag| match self.fetch_tagged_data(group_key, &tag, txn_idx + 1) { + Ok((_, value)) => Ok((value.write_op_kind() != WriteOpKind::Deletion) + .then(|| (tag, value.clone()))), + Err(MVGroupError::TagNotFound) => Ok(None), + Err(e) => { + bail!("Unexpected error in finalize group fetching value {:?}", e) + }, + }, + ) + .collect::>>()? 
+ .into_iter() + .flatten() + .collect(); + Ok(( + committed_group, + self.get_group_size(group_key, txn_idx + 1) + .map_err(|e| anyhow!("Unexpected error in finalize group get size {:?}", e))?, + )) } } @@ -553,7 +374,10 @@ mod test { test::{KeyType, TestValue}, StorageVersion, }; - use claims::{assert_err, assert_matches, assert_none, assert_ok_eq, assert_some_eq}; + use claims::{ + assert_err, assert_matches, assert_none, assert_ok, assert_ok_eq, assert_some_eq, + }; + use std::collections::HashMap; use test_case::test_case; #[should_panic] @@ -562,14 +386,14 @@ mod test { #[test_case(2)] fn group_no_path_exists(test_idx: usize) { let ap = KeyType(b"/foo/b".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); + let map = VersionedGroupData::>, usize, TestValue>::empty(); match test_idx { 0 => { - map.mark_estimate(&ap, 1); + map.mark_estimate(&ap, 1, HashSet::new()); }, 1 => { - map.remove(&ap, 2); + map.remove(&ap, 2, HashSet::new()); }, 2 => { let _ = map.finalize_group(&ap, 0); @@ -579,111 +403,247 @@ } #[test] - fn group_uninitialized() { + fn group_write_behavior_changes() { let ap_0 = KeyType(b"/foo/a".to_vec()); let ap_1 = KeyType(b"/foo/b".to_vec()); - let ap_2 = KeyType(b"/foo/c".to_vec()); + let map = VersionedGroupData::>, usize, TestValue>::empty(); + assert_ok!(map.set_raw_base_values(ap_0.clone(), vec![])); + assert_ok!(map.set_raw_base_values(ap_1.clone(), vec![])); + + let test_values = vec![ + (0usize, (TestValue::creation_with_len(1), None)), + (1usize, (TestValue::creation_with_len(1), None)), + ]; + let test_tags: HashSet = (0..2).collect(); + + // Sizes do not need to be accurate with respect to written values for test. 
+ let fake_size = ResourceGroupSize::Combined { + num_tagged_resources: 2, + all_tagged_resources_size: 20, + }; + let fake_changed_size = ResourceGroupSize::Combined { + num_tagged_resources: 3, + all_tagged_resources_size: 20, + }; + + let check_write = |ap: &KeyType>, + idx, + incarnation, + size, + prev_tags, + expected_write_ret, + expected_size_changed| { + assert_ok_eq!( + map.write( + ap.clone(), + idx, + incarnation, + test_values.clone().into_iter(), + size, + prev_tags, + ), + expected_write_ret + ); + + assert_eq!( + map.group_sizes.get(ap).unwrap().size_has_changed, + expected_size_changed, + ); + assert_eq!( + map.group_sizes + .get(ap) + .unwrap() + .size_entries + .get(&ShiftedTxnIndex::new(idx)) + .unwrap() + .value, + size + ); + }; + + // Incarnation 0 changes behavior due to empty prior tags, leading to write returning Ok(false), + // but it should not set the size_changed flag. + check_write(&ap_0, 3, 0, fake_size, HashSet::new(), true, false); + // However, if the first write is by incarnation >0, then size_has_changed will also be set. + check_write(&ap_1, 5, 1, fake_size, HashSet::new(), true, true); + + // Incarnation 1 does not change size. + check_write(&ap_0, 3, 1, fake_size, test_tags.clone(), false, false); + // Even with incarnation > 0, observed size does not change. + check_write(&ap_0, 4, 1, fake_size, HashSet::new(), true, false); + + // Incarnation 2 changes size. + check_write( + &ap_0, + 3, + 2, + fake_changed_size, + test_tags.clone(), + true, + true, + ); + // Once size_changed is set, it stays true. 
+ check_write( + &ap_0, + 3, + 3, + fake_changed_size, + test_tags.clone(), + false, + true, + ); + check_write(&ap_0, 6, 0, fake_changed_size, HashSet::new(), true, true); + } + + #[test] + fn group_initialize_and_write() { + let ap = KeyType(b"/foo/a".to_vec()); + let ap_empty = KeyType(b"/foo/b".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); + let map = VersionedGroupData::>, usize, TestValue>::empty(); + assert_matches!(map.get_group_size(&ap, 3), Err(MVGroupError::Uninitialized)); assert_matches!( - map.get_group_size(&ap_0, 3), + map.fetch_tagged_data(&ap, &1, 3), Err(MVGroupError::Uninitialized) ); - map.write( - ap_1.clone(), + // Does not need to be accurate. + let idx_3_size = ResourceGroupSize::Combined { + num_tagged_resources: 2, + all_tagged_resources_size: 20, + }; + // Write should fail because group is not initialized (R before W, where + // and read causes the base values/size to be set). + assert_err!(map.write( + ap.clone(), + 3, + 1, + (0..2).map(|i| (i, (TestValue::creation_with_len(1), None))), + idx_3_size, + HashSet::new(), + )); + assert_ok!(map.set_raw_base_values(ap.clone(), vec![])); + // Write should now succeed. + assert_ok!(map.write( + ap.clone(), 3, 1, // tags 0, 1, 2. (0..2).map(|i| (i, (TestValue::creation_with_len(1), None))), + idx_3_size, + HashSet::new(), + )); + + // Check sizes. + assert_ok_eq!(map.get_group_size(&ap, 4), idx_3_size); + assert_ok_eq!( + map.get_group_size(&ap, 3), + ResourceGroupSize::zero_combined() ); - // Size should be uninitialized even if the output of lower txn is stored - // (as long as the base isn't set). + // Check values. assert_matches!( - map.get_group_size(&ap_1, 3), - Err(MVGroupError::Uninitialized) - ); - assert_matches!( - map.get_group_size(&ap_1, 4), - Err(MVGroupError::Uninitialized) + map.fetch_tagged_data(&ap, &1, 3), + Err(MVGroupError::TagNotFound) ); - // for reading a tag at ap_1, w.o. returning size, idx = 3 is Uninitialized. 
assert_matches!( - map.fetch_tagged_data(&ap_1, &1, 3), - Err(MVGroupError::Uninitialized) + map.fetch_tagged_data(&ap, &3, 4), + Err(MVGroupError::TagNotFound) ); // ... but idx = 4 should find the previously stored value. assert_eq!( - map.fetch_tagged_data(&ap_1, &1, 4).unwrap(), - // Arc compares by value, no return size, incarnation. + map.fetch_tagged_data(&ap, &1, 4).unwrap(), ( Ok((3, 1)), ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(1)), None) ) ); - // ap_0 should still be uninitialized. + + // ap_empty should still be uninitialized. + assert_matches!( + map.fetch_tagged_data(&ap_empty, &1, 3), + Err(MVGroupError::Uninitialized) + ); assert_matches!( - map.fetch_tagged_data(&ap_0, &1, 3), + map.get_group_size(&ap_empty, 3), Err(MVGroupError::Uninitialized) ); + } - map.write( - ap_2.clone(), + #[test] + fn group_base_and_write() { + let ap = KeyType(b"/foo/a".to_vec()); + let map = VersionedGroupData::>, usize, TestValue>::empty(); + + // base tags 0, 1. + let base_values = vec![ + (0usize, TestValue::creation_with_len(1)), + (1usize, TestValue::creation_with_len(2)), + ]; + assert_ok!(map.set_raw_base_values(ap.clone(), base_values)); + + assert_ok!(map.write( + ap.clone(), 4, 0, // tags 1, 2. (1..3).map(|i| (i, (TestValue::creation_with_len(4), None))), - ); - assert_matches!( - map.fetch_tagged_data(&ap_2, &2, 4), - Err(MVGroupError::Uninitialized) - ); - map.set_raw_base_values( - ap_2.clone(), - // base tags 0, 1. - (0..2).map(|i| (i, TestValue::creation_with_len(2))), - ); + ResourceGroupSize::zero_combined(), + HashSet::new(), + )); - // Tag not found vs not initialized, assert_matches!( - map.fetch_tagged_data(&ap_2, &2, 4), + map.fetch_tagged_data(&ap, &2, 4), Err(MVGroupError::TagNotFound) ); assert_matches!( - map.fetch_tagged_data(&ap_2, &4, 5), + map.fetch_tagged_data(&ap, &3, 5), Err(MVGroupError::TagNotFound) ); - // vs finding a versioned entry from txn 4, vs from storage. 
assert_eq!( - map.fetch_tagged_data(&ap_2, &2, 5).unwrap(), + map.fetch_tagged_data(&ap, &2, 5).unwrap(), ( Ok((4, 0)), ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(4)), None) ) ); assert_eq!( - map.fetch_tagged_data(&ap_2, &0, 5).unwrap(), + map.fetch_tagged_data(&ap, &1, 4).unwrap(), ( Err(StorageVersion), ValueWithLayout::RawFromStorage(Arc::new(TestValue::creation_with_len(2))) ) ); + assert_eq!( + map.fetch_tagged_data(&ap, &0, 6).unwrap(), + ( + Err(StorageVersion), + ValueWithLayout::RawFromStorage(Arc::new(TestValue::creation_with_len(1))) + ) + ); } #[test] fn group_read_write_estimate() { use MVGroupError::*; let ap = KeyType(b"/foo/f".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); + let map = VersionedGroupData::>, usize, TestValue>::empty(); - map.write( + let idx_5_size = ResourceGroupSize::Combined { + num_tagged_resources: 2, + all_tagged_resources_size: 20, + }; + + assert_ok!(map.set_raw_base_values(ap.clone(), vec![])); + assert_ok!(map.write( ap.clone(), 5, 3, // tags 0, 1, values are derived from [txn_idx, incarnation] seed. (0..2).map(|i| (i, (TestValue::new(vec![5, 3]), None))), - ); + idx_5_size, + HashSet::new(), + )); assert_eq!( map.fetch_tagged_data(&ap, &1, 12).unwrap(), ( @@ -691,13 +651,15 @@ mod test { ValueWithLayout::Exchanged(Arc::new(TestValue::new(vec![5, 3])), None) ) ); - map.write( + assert_ok!(map.write( ap.clone(), 10, 1, // tags 1, 2, values are derived from [txn_idx, incarnation] seed. 
(1..3).map(|i| (i, (TestValue::new(vec![10, 1]), None))), - ); + ResourceGroupSize::zero_combined(), + HashSet::new(), + )); assert_eq!( map.fetch_tagged_data(&ap, &1, 12).unwrap(), ( @@ -706,10 +668,10 @@ mod test { ) ); - map.mark_estimate(&ap, 10); + map.mark_estimate(&ap, 10, (1..3).collect()); assert_matches!(map.fetch_tagged_data(&ap, &1, 12), Err(Dependency(10))); assert_matches!(map.fetch_tagged_data(&ap, &2, 12), Err(Dependency(10))); - assert_matches!(map.fetch_tagged_data(&ap, &3, 12), Err(Uninitialized)); + assert_matches!(map.fetch_tagged_data(&ap, &3, 12), Err(TagNotFound)); assert_eq!( map.fetch_tagged_data(&ap, &0, 12).unwrap(), ( @@ -717,8 +679,9 @@ mod test { ValueWithLayout::Exchanged(Arc::new(TestValue::new(vec![5, 3])), None) ) ); + assert_matches!(map.get_group_size(&ap, 12), Err(Dependency(10))); - map.remove(&ap, 10); + map.remove(&ap, 10, (1..3).collect()); assert_eq!( map.fetch_tagged_data(&ap, &0, 12).unwrap(), ( @@ -733,43 +696,20 @@ mod test { ValueWithLayout::Exchanged(Arc::new(TestValue::new(vec![5, 3])), None) ) ); + + // Size should also be removed at 10. 
+ assert_ok_eq!(map.get_group_size(&ap, 12), idx_5_size); } #[test] - fn latest_group_size() { - use MVGroupError::*; + fn group_size_changed_dependency() { let ap = KeyType(b"/foo/f".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); - - map.write( - ap.clone(), - 5, - 3, - // tags 0, 1 - (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), - ); - - map.write( - ap.clone(), - 5, - 3, - // tags 0, 1 - (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), - ); - assert_matches!(map.get_group_size(&ap, 12), Err(Uninitialized)); - - map.set_raw_base_values( - ap.clone(), - // base tag 1, 2, 3, 4 - (1..5).map(|i| (i, TestValue::creation_with_len(1))), - ); + let map = VersionedGroupData::>, usize, TestValue>::empty(); let tag: usize = 5; let one_entry_len = TestValue::creation_with_len(1).bytes().unwrap().len(); let two_entry_len = TestValue::creation_with_len(2).bytes().unwrap().len(); - let three_entry_len = TestValue::creation_with_len(3).bytes().unwrap().len(); - let four_entry_len = TestValue::creation_with_len(4).bytes().unwrap().len(); - let exp_size = group_size_as_sum(vec![(&tag, two_entry_len); 2].into_iter().chain(vec![ + let idx_5_size = group_size_as_sum(vec![(&tag, two_entry_len); 2].into_iter().chain(vec![ ( &tag, one_entry_len @@ -777,363 +717,339 @@ mod test { 3 ])) .unwrap(); - assert_ok_eq!(map.get_group_size(&ap, 12), exp_size); - - map.write( - ap.clone(), - 10, - 1, - // tags 4, 5 - (4..6).map(|i| (i, (TestValue::creation_with_len(3), None))), - ); - let exp_size_12 = group_size_as_sum( - vec![(&tag, one_entry_len); 2] - .into_iter() - .chain(vec![(&tag, two_entry_len); 2]) - .chain(vec![(&tag, three_entry_len); 2]), - ) - .unwrap(); - assert_ok_eq!(map.get_group_size(&ap, 12), exp_size_12); - assert_ok_eq!(map.get_group_size(&ap, 10), exp_size); - - map.mark_estimate(&ap, 5); - assert_matches!(map.get_group_size(&ap, 12), Err(Dependency(5))); - let exp_size_4 = group_size_as_sum(vec![(&tag, one_entry_len); 
4].into_iter()).unwrap(); - - assert_ok_eq!(map.get_group_size(&ap, 4), exp_size_4); + let base_size = group_size_as_sum(vec![(&tag, one_entry_len); 4].into_iter()).unwrap(); + let idx_5_size_with_ones = + group_size_as_sum(vec![(&tag, one_entry_len); 5].into_iter()).unwrap(); - map.write( + assert_ok!(map.set_raw_base_values( ap.clone(), - 6, - 1, - (0..2).map(|i| (i, (TestValue::creation_with_len(4), None))), - ); - let exp_size_7 = group_size_as_sum(vec![(&tag, one_entry_len); 3].into_iter().chain(vec![ - ( - &tag, - four_entry_len - ); - 2 - ])) - .unwrap(); - - assert_ok_eq!(map.get_group_size(&ap, 7), exp_size_7); - assert_matches!(map.get_group_size(&ap, 6), Err(Dependency(5))); - - map.remove(&ap, 5); - assert_ok_eq!(map.get_group_size(&ap, 6), exp_size_4); - } - - #[test] - fn size_changed_dependency() { - let ap = KeyType(b"/foo/f".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); - - map.write( + // base tag 1, 2, 3, 4 + (1..5) + .map(|i| (i, TestValue::creation_with_len(1))) + .collect(), + )); + assert_ok!(map.write( ap.clone(), 5, 0, // tags 0, 1 (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), - ); + idx_5_size, + HashSet::new(), + )); - map.set_raw_base_values( - ap.clone(), - // base tag 1, 2, 3, 4 - (1..5).map(|i| (i, TestValue::creation_with_len(1))), - ); // Incarnation 0 and base values should not affect size_changed flag. 
- assert!(!map.group_values.get(&ap).unwrap().size_changed); + assert!(!map.group_sizes.get(&ap).unwrap().size_has_changed); - let tag: usize = 5; - let one_entry_len = TestValue::creation_with_len(1).bytes().unwrap().len(); - let two_entry_len = TestValue::creation_with_len(2).bytes().unwrap().len(); - let exp_size = group_size_as_sum(vec![(&tag, two_entry_len); 2].into_iter().chain(vec![ - ( - &tag, - one_entry_len - ); - 3 - ])) - .unwrap(); - let exp_size_with_ones = - group_size_as_sum(vec![(&tag, one_entry_len); 5].into_iter()).unwrap(); + assert_ok_eq!(map.get_group_size(&ap, 5), base_size); + assert!(map.validate_group_size(&ap, 4, base_size)); + assert!(!map.validate_group_size(&ap, 5, idx_5_size)); + assert_ok_eq!(map.get_group_size(&ap, 6), idx_5_size); // Despite estimates, should still return size. - map.mark_estimate(&ap, 5); - assert_ok_eq!(map.get_group_size(&ap, 12), exp_size); - assert!(map.validate_group_size(&ap, 12, exp_size)); - assert!(!map.validate_group_size(&ap, 12, exp_size_with_ones)); - - // Same write again won't change size. - map.write( - ap.clone(), - 5, - 1, - (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), + map.mark_estimate(&ap, 5, (0..2).collect()); + assert_ok_eq!(map.get_group_size(&ap, 12), idx_5_size); + assert!(map.validate_group_size(&ap, 12, idx_5_size)); + assert!(!map.validate_group_size(&ap, 12, ResourceGroupSize::zero_combined())); + + // Different write, same size again. + assert_ok_eq!( + map.write( + ap.clone(), + 5, + 1, + (0..3).map(|i| (i, (TestValue::creation_with_len(2), None))), + idx_5_size, + (0..2).collect(), + ), + true ); - assert!(!map.group_values.get(&ap).unwrap().size_changed); - map.mark_estimate(&ap, 5); - assert_ok_eq!(map.get_group_size(&ap, 12), exp_size); - assert!(map.validate_group_size(&ap, 12, exp_size)); - assert!(!map.validate_group_size(&ap, 12, exp_size_with_ones)); - - // Removing nothing won't change size. 
- map.remove(&ap, 6); - assert!(!map.group_values.get(&ap).unwrap().size_changed); - - map.write( + assert!(!map.group_sizes.get(&ap).unwrap().size_has_changed); + map.mark_estimate(&ap, 5, (0..2).collect()); + assert_ok_eq!(map.get_group_size(&ap, 12), idx_5_size); + assert!(map.validate_group_size(&ap, 12, idx_5_size)); + assert!(!map.validate_group_size(&ap, 12, ResourceGroupSize::zero_concrete())); + + // Remove currently does not affect size_has_changed. + map.remove(&ap, 5, (0..3).collect()); + assert!(!map.group_sizes.get(&ap).unwrap().size_has_changed); + assert_ok_eq!(map.get_group_size(&ap, 4), base_size); + assert!(map.validate_group_size(&ap, 6, base_size)); + + assert_ok!(map.write( ap.clone(), 5, 2, - (0..2).map(|i| (i, (TestValue::creation_with_len(1), None))), - ); + (0..3).map(|i| (i, (TestValue::creation_with_len(1), None))), + idx_5_size_with_ones, + (0..2).collect(), + )); // Size has changed between speculative writes. - assert!(map.group_values.get(&ap).unwrap().size_changed); - assert_ok_eq!(map.get_group_size(&ap, 12), exp_size_with_ones); - assert!(map.validate_group_size(&ap, 12, exp_size_with_ones)); - assert!(!map.validate_group_size(&ap, 12, exp_size)); + assert!(map.group_sizes.get(&ap).unwrap().size_has_changed); + assert_ok_eq!(map.get_group_size(&ap, 10), idx_5_size_with_ones); + assert!(map.validate_group_size(&ap, 10, idx_5_size_with_ones)); + assert!(!map.validate_group_size(&ap, 10, idx_5_size)); + assert_ok_eq!(map.get_group_size(&ap, 3), base_size); - map.mark_estimate(&ap, 5); + map.mark_estimate(&ap, 5, (0..3).collect()); assert_matches!( map.get_group_size(&ap, 12), Err(MVGroupError::Dependency(5)) ); - assert!(!map.validate_group_size(&ap, 12, exp_size_with_ones)); - assert!(!map.validate_group_size(&ap, 12, exp_size)); - - // Next check that size change gets properly set w. differing set of writes. 
- let ap_1 = KeyType(b"/foo/1".to_vec()); - let ap_2 = KeyType(b"/foo/2".to_vec()); - let ap_3 = KeyType(b"/foo/3".to_vec()); - - map.write( - ap_1.clone(), - 5, - 0, - // tags 0, 1 - (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), - ); - assert!(!map.group_values.get(&ap_1).unwrap().size_changed); - map.write( - ap_1.clone(), - 5, - 1, - // tags 0, 1 - (0..1).map(|i| (i, (TestValue::creation_with_len(2), None))), - ); - assert!(map.group_values.get(&ap_1).unwrap().size_changed); + assert!(!map.validate_group_size(&ap, 12, idx_5_size_with_ones)); + assert!(!map.validate_group_size(&ap, 12, idx_5_size)); + } - map.write( - ap_2.clone(), - 5, - 0, - // tags 0, 1 - (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), + #[test] + fn group_write_tags_change_behavior() { + let ap = KeyType(b"/foo/1".to_vec()); + + let map = VersionedGroupData::>, usize, TestValue>::empty(); + assert_ok!(map.set_raw_base_values(ap.clone(), vec![],)); + + assert_ok_eq!( + map.write( + ap.clone(), + 5, + 0, + // tags 0, 1 + (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), + ResourceGroupSize::zero_combined(), + HashSet::new(), + ), + true, ); - assert!(!map.group_values.get(&ap_2).unwrap().size_changed); - map.write( - ap_2.clone(), - 5, - 1, - // tags 0, 1 - (1..3).map(|i| (i, (TestValue::creation_with_len(2), None))), + // Write changes behavior (requiring re-validation) because of tags only when + // the new tags are not contained in the old tags. Not when a tag is no longer + // written. This is because no information about a resource in a group is + // validated by equality (group size and metadata are stored separately) - + // and in this sense resources in group are like normal resources. 
+ assert_ok_eq!( + map.write( + ap.clone(), + 5, + 1, + // tags 0 - contained among {0, 1} + (0..1).map(|i| (i, (TestValue::creation_with_len(2), None))), + ResourceGroupSize::zero_combined(), + (0..2).collect(), + ), + false ); - assert!(map.group_values.get(&ap_2).unwrap().size_changed); - - map.write( - ap_3.clone(), - 5, - 0, - // tags 0, 1 - (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), + assert_ok_eq!( + map.write( + ap.clone(), + 5, + 2, + // tags 0, 1 - not contained among {0} + (0..2).map(|i| (i, (TestValue::creation_with_len(2), None))), + ResourceGroupSize::zero_combined(), + (0..1).collect(), + ), + true ); - assert!(!map.group_values.get(&ap_3).unwrap().size_changed); - map.remove(&ap_3, 5); - assert!(map.group_values.get(&ap_3).unwrap().size_changed); } fn finalize_group_as_hashmap( map: &VersionedGroupData>, usize, TestValue>, key: &KeyType>, idx: TxnIndex, - ) -> HashMap> { - map.finalize_group(key, idx).unwrap().into_iter().collect() + ) -> ( + HashMap>, + ResourceGroupSize, + ) { + let (group, size) = map.finalize_group(key, idx).unwrap(); + + (group.into_iter().collect(), size) } #[test] - fn group_commit_idx() { + fn group_finalize() { let ap = KeyType(b"/foo/f".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); + let map = VersionedGroupData::>, usize, TestValue>::empty(); + + let base_values: Vec<_> = (1..4) + .map(|i| (i, TestValue::creation_with_len(i))) + .collect(); - map.set_raw_base_values( + assert_ok!(map.set_raw_base_values( ap.clone(), // base tag 1, 2, 3 - (1..4).map(|i| (i, TestValue::with_kind(i, true))), - ); - map.write( + base_values.clone(), + )); + let base_size = group_size_as_sum( + base_values + .into_iter() + .map(|(tag, value)| (tag, value.bytes().unwrap().len())), + ) + .unwrap(); + + // Does not need to be accurate. 
+ let idx_3_size = ResourceGroupSize::Combined { + num_tagged_resources: 2, + all_tagged_resources_size: 20, + }; + let idx_5_size = ResourceGroupSize::Combined { + num_tagged_resources: 5, + all_tagged_resources_size: 50, + }; + let idx_7_size = ResourceGroupSize::Combined { + num_tagged_resources: 7, + all_tagged_resources_size: 70, + }; + let idx_8_size = ResourceGroupSize::Combined { + num_tagged_resources: 8, + all_tagged_resources_size: 80, + }; + + assert_ok!(map.write( ap.clone(), 7, 3, // insert at 0, remove at 1. vec![ - (0, (TestValue::with_kind(100, true), None)), + (0, (TestValue::creation_with_len(100), None)), (1, (TestValue::deletion(), None)), ], - ); - map.write( + idx_7_size, + HashSet::new(), + )); + assert_ok!(map.write( ap.clone(), 3, 0, // tags 2, 3 - (2..4).map(|i| (i, (TestValue::with_kind(200 + i, false), None))), - ); - let committed_3 = finalize_group_as_hashmap(&map, &ap, 3); + (2..4).map(|i| (i, (TestValue::creation_with_len(200 + i), None))), + idx_3_size, + HashSet::new(), + )); + + let (finalized_3, size_3) = finalize_group_as_hashmap(&map, &ap, 3); + // Finalize returns size recorded by txn 3, while get_group_size at txn index + // 3 must return the size recorded below it. + assert_eq!(size_3, idx_3_size); + assert_ok_eq!(map.get_group_size(&ap, 3), base_size,); + // The value at tag 1 is from base, while 2 and 3 are from txn 3. 
// (Arc compares with value equality) - assert_eq!(committed_3.len(), 3); + assert_eq!(finalized_3.len(), 3); assert_some_eq!( - committed_3.get(&1), - &ValueWithLayout::RawFromStorage(Arc::new(TestValue::with_kind(1, true))) + finalized_3.get(&1), + &ValueWithLayout::RawFromStorage(Arc::new(TestValue::creation_with_len(1))) ); assert_some_eq!( - committed_3.get(&2), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(202, false)), None) + finalized_3.get(&2), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(202)), None) ); assert_some_eq!( - committed_3.get(&3), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(203, false)), None) + finalized_3.get(&3), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(203)), None) ); - map.write(ap.clone(), 5, 3, vec![ - (3, (TestValue::with_kind(303, false), None)), - (4, (TestValue::with_kind(304, true), None)), - ]); - let committed_5 = finalize_group_as_hashmap(&map, &ap, 5); - assert_eq!(committed_5.len(), 4); + assert_ok!(map.write( + ap.clone(), + 5, + 3, + vec![ + (3, (TestValue::creation_with_len(303), None)), + (4, (TestValue::creation_with_len(304), None)), + ], + idx_5_size, + HashSet::new(), + )); + // Finalize should work even for indices without writes. 
+ let (finalized_6, size_6) = finalize_group_as_hashmap(&map, &ap, 6); + assert_eq!(size_6, idx_5_size); + assert_eq!(finalized_6.len(), 4); assert_some_eq!( - committed_5.get(&1), - &ValueWithLayout::RawFromStorage(Arc::new(TestValue::with_kind(1, true))) + finalized_6.get(&1), + &ValueWithLayout::RawFromStorage(Arc::new(TestValue::creation_with_len(1))) ); assert_some_eq!( - committed_5.get(&2), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(202, false)), None) + finalized_6.get(&2), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(202)), None) ); assert_some_eq!( - committed_5.get(&3), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(303, false)), None) + finalized_6.get(&3), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(303)), None) ); assert_some_eq!( - committed_5.get(&4), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(304, true)), None) + finalized_6.get(&4), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(304)), None) ); - let committed_7 = finalize_group_as_hashmap(&map, &ap, 7); - assert_eq!(committed_7.len(), 4); + let (finalized_7, size_7) = finalize_group_as_hashmap(&map, &ap, 7); + assert_eq!(size_7, idx_7_size); + assert_eq!(finalized_7.len(), 4); assert_some_eq!( - committed_7.get(&0), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(100, true)), None) + finalized_7.get(&0), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(100)), None) ); - assert_none!(committed_7.get(&1)); + assert_none!(finalized_7.get(&1)); assert_some_eq!( - committed_7.get(&2), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(202, false)), None) + finalized_7.get(&2), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(202)), None) ); assert_some_eq!( - committed_7.get(&3), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(303, false)), None) + finalized_7.get(&3), + 
&ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(303)), None) ); assert_some_eq!( - committed_7.get(&4), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(304, true)), None) + finalized_7.get(&4), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(304)), None) ); - map.write( + assert_ok!(map.write( ap.clone(), 8, 0, // re-insert at 1, remove everything else vec![ (0, (TestValue::deletion(), None)), - (1, (TestValue::with_kind(400, true), None)), + (1, (TestValue::creation_with_len(400), None)), (2, (TestValue::deletion(), None)), (3, (TestValue::deletion(), None)), (4, (TestValue::deletion(), None)), ], - ); - let committed_8 = finalize_group_as_hashmap(&map, &ap, 8); - assert_eq!(committed_8.len(), 1); + idx_8_size, + HashSet::new(), + )); + let (finalized_8, size_8) = finalize_group_as_hashmap(&map, &ap, 8); + assert_eq!(size_8, idx_8_size); + assert_eq!(finalized_8.len(), 1); assert_some_eq!( - committed_8.get(&1), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(400, true)), None) + finalized_8.get(&1), + &ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(400)), None) ); } // TODO[agg_v2](test) Test with non trivial layout. #[test] - fn group_commit_op_kind_checks() { + fn group_base_layout() { let ap = KeyType(b"/foo/f".to_vec()); - let map = VersionedGroupData::>, usize, TestValue>::new(); - - map.set_raw_base_values( - ap.clone(), - // base tag 1, 2, 3 - (1..4).map(|i| (i, TestValue::with_kind(i, true))), - ); - map.write( - ap.clone(), - 3, - 2, - // remove at 0, must fail commit. - vec![(0, (TestValue::deletion(), None))], - ); - assert_err!(map.finalize_group(&ap, 3)); + let map = VersionedGroupData::>, usize, TestValue>::empty(); - map.write( - ap.clone(), - 3, - 2, - // modify at 0, must fail commit. 
- vec![(0, (TestValue::with_kind(100, false), None))], - ); - assert_err!(map.finalize_group(&ap, 3)); - - map.write( - ap.clone(), - 3, - 2, - // create at 1, must fail commit - vec![(1, (TestValue::with_kind(101, true), None))], + assert_ok!(map.set_raw_base_values(ap.clone(), vec![(1, TestValue::creation_with_len(1))],)); + assert_eq!( + map.fetch_tagged_data(&ap, &1, 6).unwrap(), + ( + Err(StorageVersion), + ValueWithLayout::RawFromStorage(Arc::new(TestValue::creation_with_len(1))) + ) ); - assert_err!(map.finalize_group(&ap, 3)); - // sanity check the commit succeeds with proper kind. - map.write( + map.update_tagged_base_value_with_layout( ap.clone(), - 3, - 2, - // modify at 0, must fail commit. - vec![ - (0, (TestValue::with_kind(100, true), None)), - (1, (TestValue::with_kind(101, false), None)), - ], - ); - let committed = finalize_group_as_hashmap(&map, &ap, 3); - assert_some_eq!( - committed.get(&0), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(100, true)), None) - ); - assert_some_eq!( - committed.get(&1), - &ValueWithLayout::Exchanged(Arc::new(TestValue::with_kind(101, false)), None) - ); - assert_some_eq!( - committed.get(&2), - &ValueWithLayout::RawFromStorage(Arc::new(TestValue::with_kind(2, true))) + 1, + TestValue::creation_with_len(1), + None, ); - assert_some_eq!( - committed.get(&3), - &ValueWithLayout::RawFromStorage(Arc::new(TestValue::with_kind(3, true))) + assert_eq!( + map.fetch_tagged_data(&ap, &1, 6).unwrap(), + ( + Err(StorageVersion), + ValueWithLayout::Exchanged(Arc::new(TestValue::creation_with_len(1)), None) + ) ); } } diff --git a/aptos-move/mvhashmap/src/versioned_modules.rs b/aptos-move/mvhashmap/src/versioned_modules.rs index edab94b933fdd..828677c52e4e8 100644 --- a/aptos-move/mvhashmap/src/versioned_modules.rs +++ b/aptos-move/mvhashmap/src/versioned_modules.rs @@ -101,7 +101,7 @@ impl Default for VersionedValue { } impl VersionedModules { - pub(crate) fn new() -> Self { + pub(crate) fn empty() -> Self { Self { 
values: DashMap::new(), }