Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Add deferred_read native function in aggregators #8622

Closed
wants to merge 10 commits into from
72 changes: 65 additions & 7 deletions aptos-move/aptos-aggregator/src/aggregator_extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ pub enum AggregatorState {
NegativeDelta,
}

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct AggregatorHandle(pub AccountAddress);

/// Uniquely identifies each aggregator instance in storage.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct AggregatorID {
// A handle that is shared across all aggregator instances created by the
// same `AggregatorFactory` and which is used for fine-grained storage
Expand All @@ -35,6 +35,13 @@ pub struct AggregatorID {
pub key: AggregatorHandle,
}

/// Uniquely identifies each aggregator snapshot instance during the block execution.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct AggregatorSnapshotID {
pub key: u128,
}


impl AggregatorID {
pub fn new(handle: TableHandle, key: AggregatorHandle) -> Self {
AggregatorID { handle, key }
Expand Down Expand Up @@ -79,7 +86,7 @@ pub fn aggregator_id_for_test(key: u128) -> AggregatorID {
///
/// TODO: while we support tracking of the history, it is not yet fully used on
/// executor side because we don't know how to throw errors.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct History {
pub max_positive: u128,
pub min_negative: u128,
Expand Down Expand Up @@ -120,6 +127,24 @@ pub struct Aggregator {
history: Option<History>,
}

/// Internal AggregatorSnapshot data structure.
#[derive(Debug)]
pub struct AggregatorSnapshot {
// Describes a value of an aggregator.
value: u128,
// Describes a state of an aggregator.
state: AggregatorState,
// Describes an upper bound of an aggregator. If `value` exceeds it, the
// aggregator overflows.
// TODO: Currently this is a single u128 value since we use 0 as a trivial
// lower bound. If we want to support custom lower bounds, or have more
// complex postconditions, we should factor this out in its own struct.
limit: u128,
// Describes values seen by this aggregator. Note that if aggregator knows
// its value, then storing history doesn't make sense.
history: Option<History>,
}

impl Aggregator {
/// Records observed delta in history. Should be called after an operation
/// to record its side-effects.
Expand Down Expand Up @@ -277,8 +302,20 @@ impl Aggregator {
}

/// Unpacks aggregator into its fields.
pub fn into(self) -> (u128, AggregatorState, u128, Option<History>) {
(self.value, self.state, self.limit, self.history)
pub fn into(
self,
) -> (
u128,
AggregatorState,
u128,
Option<History>
) {
(
self.value,
self.state,
self.limit,
self.history,
)
}
}

Expand All @@ -294,6 +331,8 @@ pub struct AggregatorData {
destroyed_aggregators: BTreeSet<AggregatorID>,
// All aggregator instances that exist in the current transaction.
aggregators: BTreeMap<AggregatorID, Aggregator>,
// All aggregatorsnapshot instances that exist in the current transaction.
aggregator_snapshots: BTreeMap<AggregatorSnapshotID, AggregatorSnapshot>,
}

impl AggregatorData {
Expand All @@ -317,7 +356,7 @@ impl AggregatorData {
value: 0,
state: AggregatorState::PositiveDelta,
limit,
history: Some(History::new()),
history: Some(History::new())
});

if !aggregator_enabled {
Expand All @@ -339,12 +378,31 @@ impl AggregatorData {
value: 0,
state: AggregatorState::Data,
limit,
history: None,
history: None
};
self.aggregators.insert(id, aggregator);
self.new_aggregators.insert(id);
}

pub fn deferred_read(&mut self, id: AggregatorID) -> PartialVMResult<AggregatorSnapshotID> {
let aggregator = self
.aggregators
.get(&id)
.expect("Aggregator should exist to create a snapshot");
let snapshot = AggregatorSnapshot {
value: aggregator.value,
state: aggregator.state,
history: aggregator.history.clone(),
limit: aggregator.limit
};
// TODO: Use an atomic counter for snapshot_id
let snapshot_id = AggregatorSnapshotID {
key: 0
};
self.aggregator_snapshots.insert(snapshot_id, snapshot);
Ok(snapshot_id)
}

/// If aggregator has been used in this transaction, it is removed. Otherwise,
/// it is marked for deletion.
pub fn remove_aggregator(&mut self, id: AggregatorID) {
Expand Down
23 changes: 18 additions & 5 deletions aptos-move/aptos-aggregator/src/delta_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! (for accessing the storage) and an operation: a partial function with a
//! postcondition.

use crate::module::AGGREGATOR_MODULE;
use crate::{aggregator_extension::AggregatorID, module::AGGREGATOR_MODULE};
use aptos_state_view::StateView;
use aptos_types::{
state_store::state_key::StateKey,
Expand Down Expand Up @@ -33,6 +33,8 @@ pub struct DeltaOp {
limit: u128,
/// Delta which is the result of the execution.
update: DeltaUpdate,
/// This DeltaOp should be applied to this base_aggregator.
base_aggregator: Option<AggregatorID>,
}

/// Different delta functions.
Expand All @@ -44,12 +46,19 @@ pub enum DeltaUpdate {

impl DeltaOp {
/// Creates a new delta op.
pub fn new(update: DeltaUpdate, limit: u128, max_positive: u128, min_negative: u128) -> Self {
pub fn new(
update: DeltaUpdate,
limit: u128,
max_positive: u128,
min_negative: u128,
base_aggregator: Option<AggregatorID>,
) -> Self {
Self {
max_positive,
min_negative,
limit,
update,
base_aggregator,
}
}

Expand All @@ -58,6 +67,10 @@ impl DeltaOp {
self.update
}

pub fn base_aggregator(&self) -> Option<AggregatorID> {
self.base_aggregator
}

/// Returns the result of delta application to `base` or error if
/// postcondition is not satisfied.
pub fn apply_to(&self, base: u128) -> PartialVMResult<u128> {
Expand Down Expand Up @@ -241,7 +254,7 @@ pub fn subtraction(base: u128, value: u128) -> PartialVMResult<u128> {

/// Error for delta application. Can be used by delta partial functions
/// to return descriptive error messages and an appropriate error code.
fn abort_error(message: impl ToString, code: u64) -> PartialVMError {
pub fn abort_error(message: impl ToString, code: u64) -> PartialVMError {
PartialVMError::new(StatusCode::ABORTED)
.with_message(message.to_string())
.with_sub_status(code)
Expand Down Expand Up @@ -280,12 +293,12 @@ pub fn deserialize(value_bytes: &[u8]) -> u128 {

// Helper for tests, #[cfg(test)] doesn't work for cross-crate.
pub fn delta_sub(v: u128, limit: u128) -> DeltaOp {
DeltaOp::new(DeltaUpdate::Minus(v), limit, 0, v)
DeltaOp::new(DeltaUpdate::Minus(v), limit, 0, v, None)
}

// Helper for tests, #[cfg(test)] doesn't work for cross-crate.
pub fn delta_add(v: u128, limit: u128) -> DeltaOp {
DeltaOp::new(DeltaUpdate::Plus(v), limit, v, 0)
DeltaOp::new(DeltaUpdate::Plus(v), limit, v, 0, None)
}

/// `DeltaChangeSet` contains all access paths that one transaction wants to update with deltas.
Expand Down
2 changes: 2 additions & 0 deletions aptos-move/aptos-gas/src/aptos_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ crate::natives::define_gas_parameters_for_natives!(GasParameters, "aptos_framewo

[.aggregator.add.base, "aggregator.add.base", 300 * MUL],
[.aggregator.read.base, "aggregator.read.base", 300 * MUL],
// TODO: Find the right amount of gas to charge
[.aggregator.deferred_read.base, "aggregator.deferred_read.base", 200 * MUL],
[.aggregator.sub.base, "aggregator.sub.base", 300 * MUL],
[.aggregator.destroy.base, "aggregator.destroy.base", 500 * MUL],
[.aggregator_factory.new_aggregator.base, "aggregator_factory.new_aggregator.base", 500 * MUL],
Expand Down
16 changes: 16 additions & 0 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use aptos_mvhashmap::{
types::{MVDataError, MVDataOutput, TxnIndex, Version},
unsync_map::UnsyncMap,
MVHashMap,
versioned_data::EntryCell
};
use aptos_state_view::TStateView;
use aptos_types::{
Expand Down Expand Up @@ -471,6 +472,21 @@ where
) {
let (num_deltas, delta_keys) = last_input_output.delta_keys(txn_idx);
let mut delta_writes = Vec::with_capacity(num_deltas);
let resolved_promises = Vec::new();

// Resolve all the aggregator snapshots (derived aggregators).
// The DeltaOps corresponding to aggregator snapshots
for k in delta_keys {
versioned_cache.get_delta_value(&k, txn_idx).map(|entry_cell| {
if let EntryCell::Delta(delta_op, _) = entry_cell {
println!("Delta Op in worker commit hook {:?}", entry_cell);
if !matches!(delta_op.base_aggregator(), Some(base_aggregator)) {
println!("Didn't match");
}
}
});
}

for k in delta_keys {
// Note that delta materialization happens concurrently, but under concurrent
// commit_hooks (which may be dispatched by the coordinator), threads may end up
Expand Down
1 change: 1 addition & 0 deletions aptos-move/block-executor/src/unit_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn delta_chains() {
u128::MAX,
0,
0,
None,
),
)),
false => None,
Expand Down
65 changes: 46 additions & 19 deletions aptos-move/framework/aptos-framework/doc/aggregator.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ at the moment.**
- [Function `add`](#0x1_aggregator_add)
- [Function `sub`](#0x1_aggregator_sub)
- [Function `read`](#0x1_aggregator_read)
- [Function `deferred_read`](#0x1_aggregator_deferred_read)
- [Function `destroy`](#0x1_aggregator_destroy)
- [Specification](#@Specification_1)
- [Struct `Aggregator`](#@Specification_1_Aggregator)
Expand All @@ -32,7 +33,8 @@ at the moment.**
- [Function `destroy`](#@Specification_1_destroy)


<pre><code></code></pre>
<pre><code><b>use</b> <a href="promise.md#0x1_promise">0x1::promise</a>;
</code></pre>



Expand Down Expand Up @@ -204,6 +206,31 @@ Returns a value stored in this aggregator.



</details>

<a name="0x1_aggregator_deferred_read"></a>

## Function `deferred_read`

Returns a promise whhich acts as a placeholder for the aggregator read operation.
The promise will be resolved (the aggregator read operation) will be performed
when the transaction is committed.


<pre><code><b>public</b> <b>fun</b> <a href="aggregator.md#0x1_aggregator_deferred_read">deferred_read</a>(<a href="aggregator.md#0x1_aggregator">aggregator</a>: &<a href="aggregator.md#0x1_aggregator_Aggregator">aggregator::Aggregator</a>): <a href="promise.md#0x1_promise_Promise">promise::Promise</a>
</code></pre>



<details>
<summary>Implementation</summary>


<pre><code><b>public</b> <b>native</b> <b>fun</b> <a href="aggregator.md#0x1_aggregator_deferred_read">deferred_read</a>(<a href="aggregator.md#0x1_aggregator">aggregator</a>: &<a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>): Promise;
</code></pre>



</details>

<a name="0x1_aggregator_destroy"></a>
Expand Down Expand Up @@ -234,6 +261,24 @@ Destroys an aggregator and removes it from its <code>AggregatorFactory</code>.
## Specification



<a name="0x1_aggregator_spec_aggregator_set_val"></a>


<pre><code><b>native</b> <b>fun</b> <a href="aggregator.md#0x1_aggregator_spec_aggregator_set_val">spec_aggregator_set_val</a>(a: <a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>, v: u128): <a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>;
</code></pre>




<a name="0x1_aggregator_spec_aggregator_get_val"></a>


<pre><code><b>native</b> <b>fun</b> <a href="aggregator.md#0x1_aggregator_spec_aggregator_get_val">spec_aggregator_get_val</a>(a: <a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>): u128;
</code></pre>



<a name="@Specification_1_Aggregator"></a>

### Struct `Aggregator`
Expand Down Expand Up @@ -384,22 +429,4 @@ Destroys an aggregator and removes it from its <code>AggregatorFactory</code>.
</code></pre>




<a name="0x1_aggregator_spec_aggregator_set_val"></a>


<pre><code><b>native</b> <b>fun</b> <a href="aggregator.md#0x1_aggregator_spec_aggregator_set_val">spec_aggregator_set_val</a>(a: <a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>, v: u128): <a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>;
</code></pre>




<a name="0x1_aggregator_spec_aggregator_get_val"></a>


<pre><code><b>native</b> <b>fun</b> <a href="aggregator.md#0x1_aggregator_spec_aggregator_get_val">spec_aggregator_get_val</a>(a: <a href="aggregator.md#0x1_aggregator_Aggregator">Aggregator</a>): u128;
</code></pre>


[move-book]: https://aptos.dev/guides/move-guides/book/SUMMARY
1 change: 1 addition & 0 deletions aptos-move/framework/aptos-framework/doc/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ This is the reference documentation of the Aptos framework.
- [`0x1::object`](object.md#0x1_object)
- [`0x1::optional_aggregator`](optional_aggregator.md#0x1_optional_aggregator)
- [`0x1::primary_fungible_store`](primary_fungible_store.md#0x1_primary_fungible_store)
- [`0x1::promise`](promise.md#0x1_promise)
- [`0x1::reconfiguration`](reconfiguration.md#0x1_reconfiguration)
- [`0x1::resource_account`](resource_account.md#0x1_resource_account)
- [`0x1::stake`](stake.md#0x1_stake)
Expand Down
Loading