diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index f63bc70e77e7a..8752cae192b68 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -130,15 +130,9 @@ impl ExecutionProxy { tx } - fn pre_commit_hook( - &self, - block: &Block, - payload_manager: Arc, - ) -> PreCommitHook { + fn pre_commit_hook(&self) -> PreCommitHook { let mut pre_commit_notifier = self.pre_commit_notifier.clone(); let state_sync_notifier = self.state_sync_notifier.clone(); - let payload = block.payload().cloned(); - let timestamp = block.timestamp_usecs(); Box::new(move |state_compute_result: &StateComputeResult| { let state_compute_result = state_compute_result.clone(); Box::pin(async move { @@ -157,15 +151,37 @@ impl ExecutionProxy { ) { error!(error = ?e, "Failed to notify state synchronizer"); } - - let payload_vec = payload.into_iter().collect(); - payload_manager.notify_commit(timestamp, payload_vec); })) .await .expect("Failed to send pre-commit notification"); }) }) } + + fn commit_hook( + &self, + blocks: &[Arc], + callback: StateComputerCommitCallBackType, + finality_proof: LedgerInfoWithSignatures, + ) -> NotificationType { + let payload_manager = self + .state + .read() + .as_ref() + .expect("must be set within an epoch") + .payload_manager + .clone(); + let blocks = blocks.to_vec(); + Box::pin(async move { + for block in blocks.iter() { + let payload = block.payload().cloned(); + let payload_vec = payload.into_iter().collect(); + let timestamp = block.timestamp_usecs(); + payload_manager.notify_commit(timestamp, payload_vec); + } + callback(&blocks, finality_proof); + }) + } } #[async_trait::async_trait] @@ -201,7 +217,7 @@ impl StateComputer for ExecutionProxy { let txn_notifier = self.txn_notifier.clone(); let transaction_generator = BlockPreparer::new( - payload_manager.clone(), + payload_manager, self.transaction_filter.clone(), transaction_deduper.clone(), transaction_shuffler.clone(), @@ -225,7 +241,7 @@ impl StateComputer for ExecutionProxy { parent_block_id, transaction_generator, block_executor_onchain_config, - self.pre_commit_hook(block, payload_manager), + self.pre_commit_hook(), lifetime_guard, ) .await; @@ -308,14 +324,9 @@ impl StateComputer for ExecutionProxy { ) .expect("spawn_blocking failed"); - let blocks = blocks.to_vec(); - let callback_fut = Box::pin(async move { - callback(&blocks, finality_proof); - }); - self.commit_notifier .clone() - .send(callback_fut) + .send(self.commit_hook(blocks, callback, finality_proof)) .await .expect("Failed to send commit notification");