From 68fd1028f9f6964d10651b41b3e1d0633d1d5ba2 Mon Sep 17 00:00:00 2001 From: aldenhu Date: Thu, 24 Oct 2024 05:48:56 +0000 Subject: [PATCH] Move parsing of subscribable events off critical path --- Cargo.lock | 3 + execution/executor-types/Cargo.toml | 3 + .../executor-types/src/execution_output.rs | 12 +-- execution/executor-types/src/lib.rs | 1 + execution/executor-types/src/planned.rs | 75 +++++++++++++++++++ .../src/workflow/do_get_execution_output.rs | 43 ++++++----- 6 files changed, 113 insertions(+), 24 deletions(-) create mode 100644 execution/executor-types/src/planned.rs diff --git a/Cargo.lock b/Cargo.lock index 2a7e5feea00f61..43cacda921c0bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1549,6 +1549,7 @@ dependencies = [ "anyhow", "aptos-crypto", "aptos-drop-helper", + "aptos-infallible", "aptos-scratchpad", "aptos-secure-net", "aptos-storage-interface", @@ -1557,6 +1558,8 @@ dependencies = [ "criterion", "derive_more", "itertools 0.13.0", + "once_cell", + "rayon", "serde", "thiserror", ] diff --git a/execution/executor-types/Cargo.toml b/execution/executor-types/Cargo.toml index 9a449959bd01a9..4a4dbaf9877996 100644 --- a/execution/executor-types/Cargo.toml +++ b/execution/executor-types/Cargo.toml @@ -16,6 +16,7 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-crypto = { workspace = true } aptos-drop-helper = { workspace = true } +aptos-infallible = { workspace = true } aptos-scratchpad = { workspace = true } aptos-secure-net = { workspace = true } aptos-storage-interface = { workspace = true } @@ -24,6 +25,8 @@ bcs = { workspace = true } criterion = { workspace = true } derive_more = { workspace = true } itertools = { workspace = true } +once_cell = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs index 9da0e20aaeb11f..b9e2ac48061446 100644 --- a/execution/executor-types/src/execution_output.rs +++ b/execution/executor-types/src/execution_output.rs @@ -4,7 +4,7 @@ #![forbid(unsafe_code)] -use crate::transactions_with_output::TransactionsWithOutput; +use crate::{planned::Planned, transactions_with_output::TransactionsWithOutput}; use aptos_drop_helper::DropHelper; use aptos_storage_interface::{cached_state_view::StateCache, state_delta::StateDelta}; use aptos_types::{ @@ -34,7 +34,7 @@ impl ExecutionOutput { state_cache: StateCache, block_end_info: Option, next_epoch_state: Option, - subscribable_events: Vec, + subscribable_events: Planned>, ) -> Self { if is_block { // If it's a block, ensure it ends with state checkpoint. @@ -73,7 +73,7 @@ impl ExecutionOutput { state_cache: StateCache::new_empty(state.current.clone()), block_end_info: None, next_epoch_state: None, - subscribable_events: vec![], + subscribable_events: Planned::ready(vec![]), }) } @@ -90,7 +90,7 @@ impl ExecutionOutput { state_cache: StateCache::new_dummy(), block_end_info: None, next_epoch_state: None, - subscribable_events: vec![], + subscribable_events: Planned::ready(vec![]), }) } @@ -109,7 +109,7 @@ impl ExecutionOutput { state_cache: StateCache::new_dummy(), block_end_info: None, next_epoch_state: self.next_epoch_state.clone(), - subscribable_events: vec![], + subscribable_events: Planned::ready(vec![]), }) } @@ -155,7 +155,7 @@ pub struct Inner { /// Only present if the block is the last block of an epoch, and is parsed output of the /// state cache. pub next_epoch_state: Option, - pub subscribable_events: Vec, + pub subscribable_events: Planned>, } impl Inner { diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index ea9918f5a9f416..bffbbab12cd64e 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -36,6 +36,7 @@ use std::{ mod error; pub mod execution_output; mod ledger_update_output; +pub mod planned; pub mod state_checkpoint_output; pub mod state_compute_result; pub mod transactions_with_output; diff --git a/execution/executor-types/src/planned.rs b/execution/executor-types/src/planned.rs new file mode 100644 index 00000000000000..e16206ce9b6347 --- /dev/null +++ b/execution/executor-types/src/planned.rs @@ -0,0 +1,75 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_infallible::Mutex; +use once_cell::sync::OnceCell; +use rayon::ThreadPool; +use std::{ops::Deref, sync::mpsc::Receiver}; + +#[derive(Debug)] +pub struct Planned { + value: OnceCell, + rx: OnceCell>>, +} + +impl Planned { + pub fn place_holder() -> Self { + Self { + value: OnceCell::new(), + rx: OnceCell::new(), + } + } + + pub fn plan(&self, thread_pool: &ThreadPool, getter: impl FnOnce() -> T + Send + 'static) + where + T: Send + 'static, + { + let (tx, rx) = std::sync::mpsc::channel(); + + thread_pool.spawn(move || { + tx.send(getter()).ok(); + }); + + self.rx.set(Mutex::new(rx)).expect("Already planned."); + } + + pub fn ready(t: T) -> Self { + Self { + value: OnceCell::with_value(t), + rx: OnceCell::new(), + } + } + + pub fn get(&self) -> &T { + if let Some(t) = self.value.get() { + t + } else { + let rx = self.rx.get().expect("Not planned").lock(); + if self.value.get().is_none() { + let t = rx.recv().expect("Plan failed."); + self.value.set(t).map_err(|_| "").expect("Already set."); + } + self.value.get().expect("Must have been set.") + } + } +} + +impl Deref for Planned { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.get() + } +} + +pub trait Plan { + fn plan(&self, getter: impl FnOnce() -> T + Send + 'static) -> Planned; +} + +impl Plan for ThreadPool { + fn plan(&self, getter: impl FnOnce() -> T + Send + 'static) -> Planned { + let planned = Planned::::place_holder(); + planned.plan(self, getter); + planned + } +} diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 808a90c10c972a..c2c34a0d4e0f78 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -12,7 +12,7 @@ use aptos_executor_service::{ remote_executor_client::{get_remote_addresses, REMOTE_SHARDED_BLOCK_EXECUTOR}, }; use aptos_executor_types::{ - execution_output::ExecutionOutput, should_forward_to_subscription_service, + execution_output::ExecutionOutput, planned::Planned, should_forward_to_subscription_service, transactions_with_output::TransactionsWithOutput, }; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; @@ -24,6 +24,7 @@ use aptos_types::{ config::BlockExecutorConfigFromOnchain, partitioner::{ExecutableTransactions, PartitionedTransactions}, }, + contract_event::ContractEvent, epoch_state::EpochState, on_chain_config::{ConfigurationResource, OnChainConfig, ValidatorSet}, state_store::{ @@ -311,21 +312,8 @@ impl Parser { .then(|| Self::ensure_next_epoch_state(&to_commit)) .transpose()? }; - let subscribable_events = { - let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__subscribable_events"]); - to_commit - .transaction_outputs() - .iter() - .flat_map(|o| { - o.events() - .iter() - .filter(|e| should_forward_to_subscription_service(e)) - }) - .cloned() - .collect_vec() - }; - Ok(ExecutionOutput::new( + let out = ExecutionOutput::new( is_block, first_version, statuses_for_input_txns, @@ -335,8 +323,24 @@ impl Parser { state_cache, block_end_info, next_epoch_state, - subscribable_events, - )) + Planned::place_holder(), + ); + let ret = out.clone(); + ret.subscribable_events + .plan(THREAD_MANAGER.get_non_exe_cpu_pool(), move || { + Self::get_subscribable_events(&out) + }); + Ok(ret) + } + + fn get_subscribable_events(out: &ExecutionOutput) -> Vec { + out.to_commit + .transaction_outputs + .iter() + .flat_map(TransactionOutput::events) + .filter(|e| should_forward_to_subscription_service(e)) + .cloned() + .collect_vec() } fn extract_retries( @@ -543,6 +547,9 @@ mod tests { ]; let execution_output = Parser::parse(0, txns, txn_outs, StateCache::new_dummy(), None, None).unwrap(); - assert_eq!(vec![event_0, event_2], execution_output.subscribable_events); + assert_eq!( + vec![event_0, event_2], + *execution_output.subscribable_events + ); } }