From da21198d8c9bf6004ef1e615c16e2bd2e9bd5eba Mon Sep 17 00:00:00 2001
From: Balaji Arun <balaji@aptoslabs.com>
Date: Tue, 23 Jul 2024 17:21:49 -0700
Subject: [PATCH] [consensus] payload manager enum to trait refactor (#14029)

---
 consensus/src/block_preparer.rs               |   6 +-
 consensus/src/block_storage/block_store.rs    |   8 +-
 consensus/src/block_storage/sync_manager.rs   |   4 +-
 consensus/src/consensus_observer/observer.rs  |  17 +-
 consensus/src/dag/bootstrap.rs                |   8 +-
 consensus/src/dag/tests/helpers.rs            |  19 +-
 consensus/src/dag/tests/integration_tests.rs  |   4 +-
 consensus/src/epoch_manager.rs                |  26 +-
 consensus/src/payload_manager.rs              | 309 +++++++++++-------
 consensus/src/pipeline/execution_client.rs    |   8 +-
 .../src/quorum_store/quorum_store_builder.rs  |  12 +-
 consensus/src/recovery_manager.rs             |   6 +-
 consensus/src/round_manager_fuzzing.rs        |   4 +-
 consensus/src/round_manager_test.rs           |   4 +-
 consensus/src/state_computer.rs               |  11 +-
 consensus/src/state_computer_tests.rs         |  14 +-
 consensus/src/state_replication.rs            |   4 +-
 .../src/test_utils/mock_execution_client.rs   |   8 +-
 .../src/test_utils/mock_state_computer.rs     |   6 +-
 consensus/src/test_utils/mod.rs               |  10 +-
 consensus/src/twins/twins_node.rs             |   4 +-
 21 files changed, 291 insertions(+), 201 deletions(-)

diff --git a/consensus/src/block_preparer.rs b/consensus/src/block_preparer.rs
index 948c508901b8b..7e82e81647ec8 100644
--- a/consensus/src/block_preparer.rs
+++ b/consensus/src/block_preparer.rs
@@ -3,7 +3,7 @@
 
 use crate::{
     counters::{MAX_TXNS_FROM_BLOCK_TO_EXECUTE, TXN_SHUFFLE_SECONDS},
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     transaction_deduper::TransactionDeduper,
     transaction_filter::TransactionFilter,
     transaction_shuffler::TransactionShuffler,
@@ -15,7 +15,7 @@ use fail::fail_point;
 use std::sync::Arc;
 
 pub struct BlockPreparer {
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     txn_filter: Arc<TransactionFilter>,
     txn_deduper: Arc<dyn TransactionDeduper>,
     txn_shuffler: Arc<dyn TransactionShuffler>,
@@ -23,7 +23,7 @@ pub struct BlockPreparer {
 
 impl BlockPreparer {
     pub fn new(
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         txn_filter: Arc<TransactionFilter>,
         txn_deduper: Arc<dyn TransactionDeduper>,
         txn_shuffler: Arc<dyn TransactionShuffler>,
diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs
index fd732a66ccea2..36549f458e195 100644
--- a/consensus/src/block_storage/block_store.rs
+++ b/consensus/src/block_storage/block_store.rs
@@ -10,7 +10,7 @@ use crate::{
         BlockReader,
     },
     counters,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     persistent_liveness_storage::{
         PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
     },
@@ -74,7 +74,7 @@ pub struct BlockStore {
     time_service: Arc<dyn TimeService>,
     // consistent with round type
     vote_back_pressure_limit: Round,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     #[cfg(any(test, feature = "fuzzing"))]
     back_pressure_for_test: AtomicBool,
     order_vote_enabled: bool,
@@ -89,7 +89,7 @@ impl BlockStore {
         max_pruned_blocks_in_mem: usize,
         time_service: Arc<dyn TimeService>,
         vote_back_pressure_limit: Round,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         order_vote_enabled: bool,
         pending_blocks: Arc<Mutex<PendingBlocks>>,
     ) -> Self {
@@ -144,7 +144,7 @@ impl BlockStore {
         max_pruned_blocks_in_mem: usize,
         time_service: Arc<dyn TimeService>,
         vote_back_pressure_limit: Round,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         order_vote_enabled: bool,
         pending_blocks: Arc<Mutex<PendingBlocks>>,
     ) -> Self {
diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs
index a66c3efb0cd8f..12a025c351432 100644
--- a/consensus/src/block_storage/sync_manager.rs
+++ b/consensus/src/block_storage/sync_manager.rs
@@ -19,7 +19,7 @@ use crate::{
     monitor,
     network::{IncomingBlockRetrievalRequest, NetworkSender},
     network_interface::ConsensusMsg,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
     pipeline::execution_client::TExecutionClient,
 };
@@ -295,7 +295,7 @@ impl BlockStore {
         retriever: &'a mut BlockRetriever,
         storage: Arc<dyn PersistentLivenessStorage>,
         execution_client: Arc<dyn TExecutionClient>,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         order_vote_enabled: bool,
     ) -> anyhow::Result<RecoveryData> {
         info!(
diff --git a/consensus/src/consensus_observer/observer.rs b/consensus/src/consensus_observer/observer.rs
index 7b5beadd112a7..d50aff50bfe95 100644
--- a/consensus/src/consensus_observer/observer.rs
+++ b/consensus/src/consensus_observer/observer.rs
@@ -16,13 +16,14 @@ use crate::{
         payload_store::BlockPayloadStore,
         pending_blocks::PendingBlockStore,
         publisher::ConsensusPublisher,
-        subscription,
-        subscription::ConsensusObserverSubscription,
+        subscription::{self, ConsensusObserverSubscription},
     },
     dag::DagCommitSigner,
     network::{IncomingCommitRequest, IncomingRandGenRequest},
     network_interface::CommitMessage,
-    payload_manager::PayloadManager,
+    payload_manager::{
+        ConsensusObserverPayloadManager, DirectMempoolPayloadManager, TPayloadManager,
+    },
     pipeline::execution_client::TExecutionClient,
     state_replication::StateComputerCommitCallBackType,
 };
@@ -1116,13 +1117,13 @@ impl ConsensusObserver {
         );
 
         // Create the payload manager
-        let payload_manager = if self.quorum_store_enabled {
-            PayloadManager::ConsensusObserver(
+        let payload_manager: Arc<dyn TPayloadManager> = if self.quorum_store_enabled {
+            Arc::new(ConsensusObserverPayloadManager::new(
                 self.block_payload_store.get_block_payloads(),
                 self.consensus_publisher.clone(),
-            )
+            ))
         } else {
-            PayloadManager::DirectMempool
+            Arc::new(DirectMempoolPayloadManager {})
         };
 
         // Start the new epoch
@@ -1137,7 +1138,7 @@ impl ConsensusObserver {
             .start_epoch(
                 epoch_state.clone(),
                 dummy_signer,
-                Arc::new(payload_manager),
+                payload_manager,
                 &consensus_config,
                 &execution_config,
                 &randomness_config,
diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs
index dca8669eb6e07..e178bcd58fa46 100644
--- a/consensus/src/dag/bootstrap.rs
+++ b/consensus/src/dag/bootstrap.rs
@@ -32,7 +32,7 @@ use crate::{
     monitor,
     network::IncomingDAGRequest,
     payload_client::PayloadClient,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     pipeline::{buffer_manager::OrderedBlocks, execution_client::TExecutionClient},
 };
 use aptos_bounded_executor::BoundedExecutor;
@@ -330,7 +330,7 @@ pub struct DagBootstrapper {
     dag_network_sender: Arc<dyn TDAGNetworkSender>,
     proof_notifier: Arc<dyn ProofNotifier>,
     time_service: aptos_time_service::TimeService,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     payload_client: Arc<dyn PayloadClient>,
     ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
     execution_client: Arc<dyn TExecutionClient>,
@@ -355,7 +355,7 @@ impl DagBootstrapper {
         dag_network_sender: Arc<dyn TDAGNetworkSender>,
         proof_notifier: Arc<dyn ProofNotifier>,
         time_service: aptos_time_service::TimeService,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         payload_client: Arc<dyn PayloadClient>,
         ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
         execution_client: Arc<dyn TExecutionClient>,
@@ -731,7 +731,7 @@ pub(super) fn bootstrap_dag_for_test(
     dag_network_sender: Arc<dyn TDAGNetworkSender>,
     proof_notifier: Arc<dyn ProofNotifier>,
     time_service: aptos_time_service::TimeService,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     payload_client: Arc<dyn PayloadClient>,
     execution_client: Arc<dyn TExecutionClient>,
 ) -> (
diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs
index 9cd77cc915274..24dfe1be4a590 100644
--- a/consensus/src/dag/tests/helpers.rs
+++ b/consensus/src/dag/tests/helpers.rs
@@ -8,15 +8,30 @@ use crate::{
     },
     payload_manager::TPayloadManager,
 };
-use aptos_consensus_types::common::{Author, Payload, Round};
-use aptos_types::aggregate_signature::AggregateSignature;
+use aptos_consensus_types::{
+    block::Block,
+    common::{Author, Payload, Round},
+};
+use aptos_executor_types::ExecutorResult;
+use aptos_types::{aggregate_signature::AggregateSignature, transaction::SignedTransaction};
+use async_trait::async_trait;
 
 pub(super) const TEST_DAG_WINDOW: u64 = 5;
 
 pub(super) struct MockPayloadManager {}
 
+#[async_trait]
 impl TPayloadManager for MockPayloadManager {
     fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
+
+    fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}
+
+    async fn get_transactions(
+        &self,
+        _block: &Block,
+    ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
+        Ok((Vec::new(), None))
+    }
 }
 
 pub(super) struct MockOrderRule {}
diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs
index a0ae240cbdb75..2a19bcc3e88b7 100644
--- a/consensus/src/dag/tests/integration_tests.rs
+++ b/consensus/src/dag/tests/integration_tests.rs
@@ -7,7 +7,7 @@ use crate::{
     network::{IncomingDAGRequest, NetworkSender, RpcResponder},
     network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
     network_tests::{NetworkPlayground, TwinId},
-    payload_manager::PayloadManager,
+    payload_manager::DirectMempoolPayloadManager,
     pipeline::{buffer_manager::OrderedBlocks, execution_client::DummyExecutionClient},
     test_utils::{consensus_runtime, MockPayloadManager, MockStorage},
 };
@@ -78,7 +78,7 @@ impl DagBootstrapUnit {
         let network = Arc::new(network);
 
         let payload_client = Arc::new(MockPayloadManager::new(None));
-        let payload_manager = Arc::new(PayloadManager::DirectMempool);
+        let payload_manager = Arc::new(DirectMempoolPayloadManager::new());
 
         let execution_client = Arc::new(DummyExecutionClient);
 
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index 0de227d9fafde..1ddcb47111c53 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -37,7 +37,7 @@ use crate::{
     payload_client::{
         mixed::MixedPayloadClient, user::quorum_store_client::QuorumStoreClient, PayloadClient,
     },
-    payload_manager::PayloadManager,
+    payload_manager::{DirectMempoolPayloadManager, TPayloadManager},
     persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
     pipeline::execution_client::TExecutionClient,
     quorum_store::{
@@ -172,7 +172,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
     dag_rpc_tx: Option<aptos_channel::Sender<AccountAddress, IncomingDAGRequest>>,
     dag_shutdown_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
     dag_config: DagConsensusConfig,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     rand_storage: Arc<dyn RandStorage<AugmentedData>>,
     proof_cache: ProofCache,
     consensus_publisher: Option<Arc<ConsensusPublisher>>,
@@ -237,7 +237,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             dag_shutdown_tx: None,
             aptos_time_service,
             dag_config,
-            payload_manager: Arc::new(PayloadManager::DirectMempool),
+            payload_manager: Arc::new(DirectMempoolPayloadManager::new()),
             rand_storage,
             proof_cache: Cache::builder()
                 .max_capacity(node_config.consensus.proof_cache_capacity)
@@ -672,7 +672,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         epoch_state: &EpochState,
         network_sender: NetworkSender,
         consensus_config: &OnChainConsensusConfig,
-    ) -> (Arc<PayloadManager>, QuorumStoreClient, QuorumStoreBuilder) {
+    ) -> (
+        Arc<dyn TPayloadManager>,
+        QuorumStoreClient,
+        QuorumStoreBuilder,
+    ) {
         // Start QuorumStore
         let (consensus_to_quorum_store_tx, consensus_to_quorum_store_rx) =
             mpsc::channel(self.config.intra_consensus_channel_buffer_size);
@@ -755,7 +759,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         onchain_jwk_consensus_config: OnChainJWKConsensusConfig,
         network_sender: Arc<NetworkSender>,
         payload_client: Arc<dyn PayloadClient>,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         rand_config: Option<RandConfig>,
         fast_rand_config: Option<RandConfig>,
         rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
@@ -1194,7 +1198,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         &mut self,
         epoch_state: &EpochState,
         consensus_config: &OnChainConsensusConfig,
-    ) -> (NetworkSender, Arc<dyn PayloadClient>, Arc<PayloadManager>) {
+    ) -> (
+        NetworkSender,
+        Arc<dyn PayloadClient>,
+        Arc<dyn TPayloadManager>,
+    ) {
         self.set_epoch_start_metrics(epoch_state);
         self.quorum_store_enabled = self.enable_quorum_store(consensus_config);
         let network_sender = self.create_network_sender(epoch_state);
@@ -1225,7 +1233,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         jwk_consensus_config: OnChainJWKConsensusConfig,
         network_sender: NetworkSender,
         payload_client: Arc<dyn PayloadClient>,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         rand_config: Option<RandConfig>,
         fast_rand_config: Option<RandConfig>,
         rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
@@ -1271,7 +1279,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         onchain_jwk_consensus_config: OnChainJWKConsensusConfig,
         network_sender: NetworkSender,
         payload_client: Arc<dyn PayloadClient>,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         rand_config: Option<RandConfig>,
         fast_rand_config: Option<RandConfig>,
         rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
@@ -1550,7 +1558,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         buffered_proposal_tx: Option<aptos_channel::Sender<Author, VerifiedEvent>>,
         peer_id: AccountAddress,
         event: VerifiedEvent,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         pending_blocks: Arc<Mutex<PendingBlocks>>,
     ) {
         if let VerifiedEvent::ProposalMsg(proposal) = &event {
diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs
index 8d304fc45094e..a81ea46ada53c 100644
--- a/consensus/src/payload_manager.rs
+++ b/consensus/src/payload_manager.rs
@@ -23,6 +23,7 @@ use aptos_executor_types::{
 use aptos_infallible::Mutex;
 use aptos_logger::prelude::*;
 use aptos_types::transaction::SignedTransaction;
+use async_trait::async_trait;
 use futures::channel::mpsc::Sender;
 use std::{
     collections::{btree_map::Entry, BTreeMap},
@@ -30,32 +31,81 @@ use std::{
 };
 use tokio::sync::oneshot;
 
+/// A trait that defines the interface for a payload manager. The payload manager is responsible for
+/// resolving the transactions in a block's payload.
+#[async_trait]
 pub trait TPayloadManager: Send + Sync {
+    /// Notify the payload manager that a block has been committed. This indicates that the
+    /// transactions in the block's payload are no longer required for consensus.
+    fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>);
+
+    /// Prefetch the data for a payload. This is used to ensure that the data for a payload is
+    /// available when block is executed.
     fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64);
+
+    /// Get the transactions in a block's payload. This function returns a vector of transactions.
+    async fn get_transactions(
+        &self,
+        block: &Block,
+    ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)>;
 }
 
-/// Responsible to extract the transactions out of the payload and notify QuorumStore about commits.
-/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
-pub enum PayloadManager {
-    DirectMempool,
-    InQuorumStore(
-        Arc<dyn BatchReader>,
-        Sender<CoordinatorCommand>,
-        Option<Arc<ConsensusPublisher>>,
-    ),
-    ConsensusObserver(
-        Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
-        Option<Arc<ConsensusPublisher>>,
-    ),
+/// A payload manager that directly returns the transactions in a block's payload.
+pub struct DirectMempoolPayloadManager {}
+
+impl DirectMempoolPayloadManager {
+    pub fn new() -> Self {
+        Self {}
+    }
 }
 
-impl TPayloadManager for PayloadManager {
-    fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) {
-        self.prefetch_payload_data(payload, timestamp);
+#[async_trait]
+impl TPayloadManager for DirectMempoolPayloadManager {
+    fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}
+
+    fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
+
+    async fn get_transactions(
+        &self,
+        block: &Block,
+    ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
+        let Some(payload) = block.payload() else {
+            return Ok((Vec::new(), None));
+        };
+
+        match payload {
+            Payload::DirectMempool(txns) => Ok((txns.clone(), None)),
+            _ => unreachable!(
+                "DirectMempoolPayloadManager: Unacceptable payload type {}. Epoch: {}, Round: {}, Block: {}",
+                payload,
+                block.block_data().epoch(),
+                block.block_data().round(),
+                block.id()
+            ),
+        }
     }
 }
 
-impl PayloadManager {
+/// A payload manager that resolves the transactions in a block's payload from the quorum store.
+pub struct QuorumStorePayloadManager {
+    batch_reader: Arc<dyn BatchReader>,
+    coordinator_tx: Sender<CoordinatorCommand>,
+    maybe_consensus_publisher: Option<Arc<ConsensusPublisher>>,
+}
+
+impl QuorumStorePayloadManager {
+    pub fn new(
+        batch_reader: Arc<dyn BatchReader>,
+        coordinator_tx: Sender<CoordinatorCommand>,
+        maybe_consensus_publisher: Option<Arc<ConsensusPublisher>>,
+    ) -> Self {
+        Self {
+            batch_reader,
+            coordinator_tx,
+            maybe_consensus_publisher,
+        }
+    }
+
     fn request_transactions(
         proofs: Vec<ProofOfStore>,
         block_timestamp: u64,
@@ -80,72 +130,69 @@ impl PayloadManager {
         }
         receivers
     }
+}
 
-    ///Pass commit information to BatchReader and QuorumStore wrapper for their internal cleanups.
-    pub fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>) {
-        match self {
-            PayloadManager::DirectMempool | PayloadManager::ConsensusObserver(_, _) => {},
-            PayloadManager::InQuorumStore(batch_reader, coordinator_tx, _) => {
-                batch_reader.update_certified_timestamp(block_timestamp);
-
-                let batches: Vec<_> = payloads
-                    .into_iter()
-                    .flat_map(|payload| match payload {
-                        Payload::DirectMempool(_) => {
-                            unreachable!("InQuorumStore should be used");
-                        },
-                        Payload::InQuorumStore(proof_with_status) => proof_with_status
-                            .proofs
-                            .iter()
-                            .map(|proof| proof.info().clone())
-                            .collect::<Vec<_>>(),
-                        Payload::InQuorumStoreWithLimit(proof_with_status) => proof_with_status
-                            .proof_with_data
-                            .proofs
-                            .iter()
-                            .map(|proof| proof.info().clone())
-                            .collect::<Vec<_>>(),
-                        Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, _) => {
-                            inline_batches
+#[async_trait]
+impl TPayloadManager for QuorumStorePayloadManager {
+    fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>) {
+        self.batch_reader
+            .update_certified_timestamp(block_timestamp);
+
+        let batches: Vec<_> = payloads
+            .into_iter()
+            .flat_map(|payload| match payload {
+                Payload::DirectMempool(_) => {
+                    unreachable!("InQuorumStore should be used");
+                },
+                Payload::InQuorumStore(proof_with_status) => proof_with_status
+                    .proofs
+                    .iter()
+                    .map(|proof| proof.info().clone())
+                    .collect::<Vec<_>>(),
+                Payload::InQuorumStoreWithLimit(proof_with_status) => proof_with_status
+                    .proof_with_data
+                    .proofs
+                    .iter()
+                    .map(|proof| proof.info().clone())
+                    .collect::<Vec<_>>(),
+                Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, _) => {
+                    inline_batches
+                        .iter()
+                        .map(|(batch_info, _)| batch_info.clone())
+                        .chain(
+                            proof_with_data
+                                .proofs
                                 .iter()
-                                .map(|(batch_info, _)| batch_info.clone())
-                                .chain(
-                                    proof_with_data
-                                        .proofs
-                                        .iter()
-                                        .map(|proof| proof.info().clone()),
-                                )
-                                .collect::<Vec<_>>()
-                        },
-                    })
-                    .collect();
+                                .map(|proof| proof.info().clone()),
+                        )
+                        .collect::<Vec<_>>()
+                },
+            })
+            .collect();
 
-                let mut tx = coordinator_tx.clone();
+        let mut tx = self.coordinator_tx.clone();
 
-                if let Err(e) = tx.try_send(CoordinatorCommand::CommitNotification(
-                    block_timestamp,
-                    batches,
-                )) {
-                    warn!(
-                        "CommitNotification failed. Is the epoch shutting down? error: {}",
-                        e
-                    );
-                }
-            },
+        if let Err(e) = tx.try_send(CoordinatorCommand::CommitNotification(
+            block_timestamp,
+            batches,
+        )) {
+            warn!(
+                "CommitNotification failed. Is the epoch shutting down? error: {}",
+                e
+            );
         }
     }
 
-    /// Called from consensus to pre-fetch the transaction behind the batches in the block.
-    pub fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) {
+    fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) {
         let request_txns_and_update_status =
             move |proof_with_status: &ProofWithData, batch_reader: Arc<dyn BatchReader>| {
                 if proof_with_status.status.lock().is_some() {
                     return;
                 }
-                let receivers = PayloadManager::request_transactions(
+                let receivers = Self::request_transactions(
                     proof_with_status.proofs.clone(),
                     timestamp,
-                    batch_reader.clone(),
+                    batch_reader,
                 );
                 proof_with_status
                     .status
@@ -153,65 +200,46 @@ impl PayloadManager {
                     .replace(DataStatus::Requested(receivers));
             };
 
-        match self {
-            PayloadManager::DirectMempool | PayloadManager::ConsensusObserver(_, _) => {},
-            PayloadManager::InQuorumStore(batch_reader, _, _) => match payload {
-                Payload::InQuorumStore(proof_with_status) => {
-                    request_txns_and_update_status(proof_with_status, batch_reader.clone());
-                },
-                Payload::InQuorumStoreWithLimit(proof_with_data) => {
-                    request_txns_and_update_status(
-                        &proof_with_data.proof_with_data,
-                        batch_reader.clone(),
-                    );
-                },
-                Payload::QuorumStoreInlineHybrid(_, proof_with_data, _) => {
-                    request_txns_and_update_status(proof_with_data, batch_reader.clone());
-                },
-                Payload::DirectMempool(_) => {
-                    unreachable!()
-                },
+        match payload {
+            Payload::InQuorumStore(proof_with_status) => {
+                request_txns_and_update_status(proof_with_status, self.batch_reader.clone());
             },
-        }
+            Payload::InQuorumStoreWithLimit(proof_with_data) => {
+                request_txns_and_update_status(
+                    &proof_with_data.proof_with_data,
+                    self.batch_reader.clone(),
+                );
+            },
+            Payload::QuorumStoreInlineHybrid(_, proof_with_data, _) => {
+                request_txns_and_update_status(proof_with_data, self.batch_reader.clone());
+            },
+            Payload::DirectMempool(_) => {
+                unreachable!()
+            },
+        };
     }
 
-    /// Extract transaction from a given block
-    /// Assumes it is never called for the same block concurrently. Otherwise status can be None.
-    pub async fn get_transactions(
+    async fn get_transactions(
         &self,
         block: &Block,
     ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
-        let payload = match block.payload() {
-            Some(p) => p,
-            None => return Ok((Vec::new(), None)),
+        let Some(payload) = block.payload() else {
+            return Ok((Vec::new(), None));
         };
 
-        if let PayloadManager::ConsensusObserver(block_payloads, consensus_publisher) = self {
-            return get_transactions_for_observer(block, block_payloads, consensus_publisher).await;
-        }
-
-        let transaction_payload = match (self, payload) {
-            (PayloadManager::DirectMempool, Payload::DirectMempool(txns)) => {
-                return Ok((txns.clone(), None))
-            },
-            (
-                PayloadManager::InQuorumStore(batch_reader, _, _),
-                Payload::InQuorumStore(proof_with_data),
-            ) => {
+        let transaction_payload = match payload {
+            Payload::InQuorumStore(proof_with_data) => {
                 let transactions =
-                    process_payload(proof_with_data, batch_reader.clone(), block).await?;
+                    process_payload(proof_with_data, self.batch_reader.clone(), block).await?;
                 BlockTransactionPayload::new_in_quorum_store(
                     transactions,
                     proof_with_data.proofs.clone(),
                 )
             },
-            (
-                PayloadManager::InQuorumStore(batch_reader, _, _),
-                Payload::InQuorumStoreWithLimit(proof_with_data),
-            ) => {
+            Payload::InQuorumStoreWithLimit(proof_with_data) => {
                 let transactions = process_payload(
                     &proof_with_data.proof_with_data,
-                    batch_reader.clone(),
+                    self.batch_reader.clone(),
                     block,
                 )
                 .await?;
@@ -221,17 +249,14 @@ impl PayloadManager {
                     proof_with_data.max_txns_to_execute,
                 )
             },
-            (
-                PayloadManager::InQuorumStore(batch_reader, _, _),
-                Payload::QuorumStoreInlineHybrid(
-                    inline_batches,
-                    proof_with_data,
-                    max_txns_to_execute,
-                ),
+            Payload::QuorumStoreInlineHybrid(
+                inline_batches,
+                proof_with_data,
+                max_txns_to_execute,
             ) => {
                 let all_transactions = {
                     let mut all_txns =
-                        process_payload(proof_with_data, batch_reader.clone(), block).await?;
+                        process_payload(proof_with_data, self.batch_reader.clone(), block).await?;
                     all_txns.append(
                         &mut inline_batches
                             .iter()
@@ -252,7 +277,7 @@ impl PayloadManager {
                     inline_batches,
                 )
             },
-            (_, _) => unreachable!(
+            _ => unreachable!(
                 "Wrong payload {} epoch {}, round {}, id {}",
                 payload,
                 block.block_data().epoch(),
@@ -261,7 +286,7 @@ impl PayloadManager {
             ),
         };
 
-        if let PayloadManager::InQuorumStore(_, _, Some(consensus_publisher)) = self {
+        if let Some(consensus_publisher) = &self.maybe_consensus_publisher {
             let message = ConsensusObserverMessage::new_block_payload_message(
                 block.gen_block_info(HashValue::zero(), 0, None),
                 transaction_payload.clone(),
@@ -358,7 +383,7 @@ async fn process_payload(
                             "Oneshot channel to get a batch was dropped with error {:?}",
                             e
                         );
-                        let new_receivers = PayloadManager::request_transactions(
+                        let new_receivers = QuorumStorePayloadManager::request_transactions(
                             proof_with_data.proofs.clone(),
                             block.timestamp_usecs(),
                             batch_reader.clone(),
@@ -374,7 +399,7 @@ async fn process_payload(
                         vec_ret.push(data);
                     },
                     Ok(Err(e)) => {
-                        let new_receivers = PayloadManager::request_transactions(
+                        let new_receivers = QuorumStorePayloadManager::request_transactions(
                             proof_with_data.proofs.clone(),
                             block.timestamp_usecs(),
                             batch_reader.clone(),
@@ -398,3 +423,39 @@ async fn process_payload(
         },
     }
 }
+
+pub struct ConsensusObserverPayloadManager {
+    txns_pool: Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
+    consensus_publisher: Option<Arc<ConsensusPublisher>>,
+}
+
+impl ConsensusObserverPayloadManager {
+    pub fn new(
+        txns_pool: Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
+        consensus_publisher: Option<Arc<ConsensusPublisher>>,
+    ) -> Self {
+        Self {
+            txns_pool,
+            consensus_publisher,
+        }
+    }
+}
+
+#[async_trait]
+impl TPayloadManager for ConsensusObserverPayloadManager {
+    fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {
+        // noop
+    }
+
+    fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {
+        // noop
+    }
+
+    async fn get_transactions(
+        &self,
+        block: &Block,
+    ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
+        return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher)
+            .await;
+    }
+}
diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs
index 8f45dbb513702..ef8b1d1f0ad7f 100644
--- a/consensus/src/pipeline/execution_client.rs
+++ b/consensus/src/pipeline/execution_client.rs
@@ -8,7 +8,7 @@ use crate::{
     error::StateSyncError,
     network::{IncomingCommitRequest, IncomingRandGenRequest, NetworkSender},
     network_interface::{ConsensusMsg, ConsensusNetworkClient},
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     pipeline::{
         buffer_manager::{OrderedBlocks, ResetAck, ResetRequest, ResetSignal},
         decoupled_execution_utils::prepare_phases_and_buffer_manager,
@@ -60,7 +60,7 @@ pub trait TExecutionClient: Send + Sync {
         &self,
         epoch_state: Arc<EpochState>,
         commit_signer_provider: Arc<dyn CommitSignerProvider>,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         onchain_consensus_config: &OnChainConsensusConfig,
         onchain_execution_config: &OnChainExecutionConfig,
         onchain_randomness_config: &OnChainRandomnessConfig,
@@ -294,7 +294,7 @@ impl TExecutionClient for ExecutionProxyClient {
         &self,
         epoch_state: Arc<EpochState>,
         commit_signer_provider: Arc<dyn CommitSignerProvider>,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         onchain_consensus_config: &OnChainConsensusConfig,
         onchain_execution_config: &OnChainExecutionConfig,
         onchain_randomness_config: &OnChainRandomnessConfig,
@@ -485,7 +485,7 @@ impl TExecutionClient for DummyExecutionClient {
         &self,
         _epoch_state: Arc<EpochState>,
         _commit_signer_provider: Arc<dyn CommitSignerProvider>,
-        _payload_manager: Arc<PayloadManager>,
+        _payload_manager: Arc<dyn TPayloadManager>,
         _onchain_consensus_config: &OnChainConsensusConfig,
         _onchain_execution_config: &OnChainExecutionConfig,
         _onchain_randomness_config: &OnChainRandomnessConfig,
diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs
index 1dca357991396..f3b54b7e6aa5b 100644
--- a/consensus/src/quorum_store/quorum_store_builder.rs
+++ b/consensus/src/quorum_store/quorum_store_builder.rs
@@ -7,7 +7,7 @@ use crate::{
     error::error_kind,
     network::{IncomingBatchRetrievalRequest, NetworkSender},
     network_interface::ConsensusMsg,
-    payload_manager::PayloadManager,
+    payload_manager::{DirectMempoolPayloadManager, QuorumStorePayloadManager, TPayloadManager},
     quorum_store::{
         batch_coordinator::{BatchCoordinator, BatchCoordinatorCommand},
         batch_generator::{BackPressure, BatchGenerator, BatchGeneratorCommand},
@@ -51,7 +51,7 @@ impl QuorumStoreBuilder {
         &mut self,
         consensus_publisher: Option<Arc<ConsensusPublisher>>,
     ) -> (
-        Arc<PayloadManager>,
+        Arc<dyn TPayloadManager>,
         Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
     ) {
         match self {
@@ -100,10 +100,10 @@ impl DirectMempoolInnerBuilder {
     fn init_payload_manager(
         &mut self,
     ) -> (
-        Arc<PayloadManager>,
+        Arc<dyn TPayloadManager>,
         Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
     ) {
-        (Arc::from(PayloadManager::DirectMempool), None)
+        (Arc::from(DirectMempoolPayloadManager::new()), None)
     }
 
     fn start(self) {
@@ -432,13 +432,13 @@ impl InnerBuilder {
         &mut self,
         consensus_publisher: Option<Arc<ConsensusPublisher>>,
     ) -> (
-        Arc<PayloadManager>,
+        Arc<dyn TPayloadManager>,
         Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
     ) {
         let batch_reader = self.create_batch_store();
 
         (
-            Arc::from(PayloadManager::InQuorumStore(
+            Arc::from(QuorumStorePayloadManager::new(
                 batch_reader,
                 // TODO: remove after splitting out clean requests
                 self.coordinator_tx.clone(),
diff --git a/consensus/src/recovery_manager.rs b/consensus/src/recovery_manager.rs
index 9f1f34a1e0877..57e308570f93d 100644
--- a/consensus/src/recovery_manager.rs
+++ b/consensus/src/recovery_manager.rs
@@ -7,7 +7,7 @@ use crate::{
     error::error_kind,
     monitor,
     network::NetworkSender,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     persistent_liveness_storage::{PersistentLivenessStorage, RecoveryData},
     pipeline::execution_client::TExecutionClient,
     round_manager::VerifiedEvent,
@@ -33,7 +33,7 @@ pub struct RecoveryManager {
     execution_client: Arc<dyn TExecutionClient>,
     last_committed_round: Round,
     max_blocks_to_request: u64,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     order_vote_enabled: bool,
     pending_blocks: Arc<Mutex<PendingBlocks>>,
 }
@@ -46,7 +46,7 @@ impl RecoveryManager {
         execution_client: Arc<dyn TExecutionClient>,
         last_committed_round: Round,
         max_blocks_to_request: u64,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         order_vote_enabled: bool,
         pending_blocks: Arc<Mutex<PendingBlocks>>,
     ) -> Self {
diff --git a/consensus/src/round_manager_fuzzing.rs b/consensus/src/round_manager_fuzzing.rs
index a41b966a48ff9..85b797f50e93e 100644
--- a/consensus/src/round_manager_fuzzing.rs
+++ b/consensus/src/round_manager_fuzzing.rs
@@ -14,7 +14,7 @@ use crate::{
     metrics_safety_rules::MetricsSafetyRules,
     network::NetworkSender,
     network_interface::{ConsensusNetworkClient, DIRECT_SEND, RPC},
-    payload_manager::PayloadManager,
+    payload_manager::DirectMempoolPayloadManager,
     persistent_liveness_storage::{PersistentLivenessStorage, RecoveryData},
     pipeline::execution_client::DummyExecutionClient,
     round_manager::RoundManager,
@@ -90,7 +90,7 @@ fn build_empty_store(
         10, // max pruned blocks in mem
         Arc::new(SimulatedTimeService::new()),
         10,
-        Arc::from(PayloadManager::DirectMempool),
+        Arc::from(DirectMempoolPayloadManager::new()),
         false,
         Arc::new(Mutex::new(PendingBlocks::new())),
     ))
diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs
index 25406de2de182..a3da9e193df2b 100644
--- a/consensus/src/round_manager_test.rs
+++ b/consensus/src/round_manager_test.rs
@@ -16,7 +16,7 @@ use crate::{
     network::{IncomingBlockRetrievalRequest, NetworkSender},
     network_interface::{CommitMessage, ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
     network_tests::{NetworkPlayground, TwinId},
-    payload_manager::PayloadManager,
+    payload_manager::DirectMempoolPayloadManager,
     persistent_liveness_storage::RecoveryData,
     pipeline::buffer_manager::OrderedBlocks,
     round_manager::RoundManager,
@@ -291,7 +291,7 @@ impl NodeSetup {
             10, // max pruned blocks in mem
             time_service.clone(),
             10,
-            Arc::from(PayloadManager::DirectMempool),
+            Arc::from(DirectMempoolPayloadManager::new()),
             false,
             Arc::new(Mutex::new(PendingBlocks::new())),
         ));
diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs
index 2b635c06f94af..db4240cee67b2 100644
--- a/consensus/src/state_computer.rs
+++ b/consensus/src/state_computer.rs
@@ -9,7 +9,7 @@ use crate::{
     error::StateSyncError,
     execution_pipeline::ExecutionPipeline,
     monitor,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     state_replication::{StateComputer, StateComputerCommitCallBackType},
     transaction_deduper::TransactionDeduper,
     transaction_filter::TransactionFilter,
@@ -72,7 +72,7 @@ impl LogicalTime {
 #[derive(Clone)]
 struct MutableState {
     validators: Arc<[AccountAddress]>,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
     transaction_shuffler: Arc<dyn TransactionShuffler>,
     block_executor_onchain_config: BlockExecutorConfigFromOnchain,
     transaction_deduper: Arc<dyn TransactionDeduper>,
@@ -382,7 +382,7 @@ impl StateComputer for ExecutionProxy {
     fn new_epoch(
         &self,
         epoch_state: &EpochState,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         transaction_shuffler: Arc<dyn TransactionShuffler>,
         block_executor_onchain_config: BlockExecutorConfigFromOnchain,
         transaction_deduper: Arc<dyn TransactionDeduper>,
@@ -412,7 +412,8 @@ impl StateComputer for ExecutionProxy {
 #[tokio::test]
 async fn test_commit_sync_race() {
     use crate::{
-        error::MempoolError, transaction_deduper::create_transaction_deduper,
+        error::MempoolError, payload_manager::DirectMempoolPayloadManager,
+        transaction_deduper::create_transaction_deduper,
         transaction_shuffler::create_transaction_shuffler,
     };
     use aptos_config::config::transaction_filter_type::Filter;
@@ -544,7 +545,7 @@ async fn test_commit_sync_race() {
 
     executor.new_epoch(
         &EpochState::empty(),
-        Arc::new(PayloadManager::DirectMempool),
+        Arc::new(DirectMempoolPayloadManager {}),
         create_transaction_shuffler(TransactionShufflerType::NoShuffling),
         BlockExecutorConfigFromOnchain::new_no_block_limit(),
         create_transaction_deduper(TransactionDeduperType::NoDedup),
diff --git a/consensus/src/state_computer_tests.rs b/consensus/src/state_computer_tests.rs
index 308eeeaea8729..970f43084cef2 100644
--- a/consensus/src/state_computer_tests.rs
+++ b/consensus/src/state_computer_tests.rs
@@ -2,10 +2,10 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    error::MempoolError, payload_manager::PayloadManager, state_computer::ExecutionProxy,
-    state_replication::StateComputer, transaction_deduper::NoOpDeduper,
-    transaction_filter::TransactionFilter, transaction_shuffler::NoOpShuffler,
-    txn_notifier::TxnNotifier,
+    error::MempoolError, payload_manager::DirectMempoolPayloadManager,
+    state_computer::ExecutionProxy, state_replication::StateComputer,
+    transaction_deduper::NoOpDeduper, transaction_filter::TransactionFilter,
+    transaction_shuffler::NoOpShuffler, txn_notifier::TxnNotifier,
 };
 use aptos_config::config::transaction_filter_type::Filter;
 use aptos_consensus_notifications::{ConsensusNotificationSender, Error};
@@ -136,6 +136,8 @@ impl BlockExecutorTrait for DummyBlockExecutor {
 #[tokio::test]
 #[cfg(test)]
 async fn schedule_compute_should_discover_validator_txns() {
+    use crate::payload_manager::DirectMempoolPayloadManager;
+
     let executor = Arc::new(DummyBlockExecutor::new());
 
     let execution_policy = ExecutionProxy::new(
@@ -162,7 +164,7 @@ async fn schedule_compute_should_discover_validator_txns() {
 
     execution_policy.new_epoch(
         &epoch_state,
-        Arc::new(PayloadManager::DirectMempool),
+        Arc::new(DirectMempoolPayloadManager::new()),
         Arc::new(NoOpShuffler {}),
         BlockExecutorConfigFromOnchain::new_no_block_limit(),
         Arc::new(NoOpDeduper {}),
@@ -228,7 +230,7 @@ async fn commit_should_discover_validator_txns() {
 
     execution_policy.new_epoch(
         &epoch_state,
-        Arc::new(PayloadManager::DirectMempool),
+        Arc::new(DirectMempoolPayloadManager::new()),
         Arc::new(NoOpShuffler {}),
         BlockExecutorConfigFromOnchain::new_no_block_limit(),
         Arc::new(NoOpDeduper {}),
diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs
index 26da5fa80d163..fde757e1365bc 100644
--- a/consensus/src/state_replication.rs
+++ b/consensus/src/state_replication.rs
@@ -4,7 +4,7 @@
 
 use crate::{
     error::StateSyncError,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     state_computer::{PipelineExecutionResult, StateComputeResultFut},
     transaction_deduper::TransactionDeduper,
     transaction_shuffler::TransactionShuffler,
@@ -72,7 +72,7 @@ pub trait StateComputer: Send + Sync {
     fn new_epoch(
         &self,
         epoch_state: &EpochState,
-        payload_manager: Arc<PayloadManager>,
+        payload_manager: Arc<dyn TPayloadManager>,
         transaction_shuffler: Arc<dyn TransactionShuffler>,
         block_executor_onchain_config: BlockExecutorConfigFromOnchain,
         transaction_deduper: Arc<dyn TransactionDeduper>,
diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs
index f3ed33688bd47..30063920d5d83 100644
--- a/consensus/src/test_utils/mock_execution_client.rs
+++ b/consensus/src/test_utils/mock_execution_client.rs
@@ -5,7 +5,7 @@
 use crate::{
     error::StateSyncError,
     network::{IncomingCommitRequest, IncomingRandGenRequest},
-    payload_manager::PayloadManager,
+    payload_manager::{DirectMempoolPayloadManager, TPayloadManager},
     pipeline::{
         buffer_manager::OrderedBlocks, execution_client::TExecutionClient,
         signing_phase::CommitSignerProvider,
@@ -40,7 +40,7 @@ pub struct MockExecutionClient {
     executor_channel: UnboundedSender<OrderedBlocks>,
     consensus_db: Arc<MockStorage>,
     block_cache: Mutex<HashMap<HashValue, Payload>>,
-    payload_manager: Arc<PayloadManager>,
+    payload_manager: Arc<dyn TPayloadManager>,
 }
 
 impl MockExecutionClient {
@@ -54,7 +54,7 @@ impl MockExecutionClient {
             executor_channel,
             consensus_db,
             block_cache: Mutex::new(HashMap::new()),
-            payload_manager: Arc::from(PayloadManager::DirectMempool),
+            payload_manager: Arc::from(DirectMempoolPayloadManager::new()),
         }
     }
 
@@ -96,7 +96,7 @@ impl TExecutionClient for MockExecutionClient {
         &self,
         _epoch_state: Arc<EpochState>,
         _commit_signer_provider: Arc<dyn CommitSignerProvider>,
-        _payload_manager: Arc<PayloadManager>,
+        _payload_manager: Arc<dyn TPayloadManager>,
         _onchain_consensus_config: &OnChainConsensusConfig,
         _onchain_execution_config: &OnChainExecutionConfig,
         _onchain_randomness_config: &OnChainRandomnessConfig,
diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs
index aeef2a0ef6e97..286dc9137ffd6 100644
--- a/consensus/src/test_utils/mock_state_computer.rs
+++ b/consensus/src/test_utils/mock_state_computer.rs
@@ -4,7 +4,7 @@
 
 use crate::{
     error::StateSyncError,
-    payload_manager::PayloadManager,
+    payload_manager::TPayloadManager,
     pipeline::buffer_manager::OrderedBlocks,
     state_computer::{PipelineExecutionResult, StateComputeResultFut},
     state_replication::{StateComputer, StateComputerCommitCallBackType},
@@ -83,7 +83,7 @@ impl StateComputer for EmptyStateComputer {
     fn new_epoch(
         &self,
         _: &EpochState,
-        _: Arc<PayloadManager>,
+        _: Arc<dyn TPayloadManager>,
         _: Arc<dyn TransactionShuffler>,
         _: BlockExecutorConfigFromOnchain,
         _: Arc<dyn TransactionDeduper>,
@@ -149,7 +149,7 @@ impl StateComputer for RandomComputeResultStateComputer {
     fn new_epoch(
         &self,
         _: &EpochState,
-        _: Arc<PayloadManager>,
+        _: Arc<dyn TPayloadManager>,
         _: Arc<dyn TransactionShuffler>,
         _: BlockExecutorConfigFromOnchain,
         _: Arc<dyn TransactionDeduper>,
diff --git a/consensus/src/test_utils/mod.rs b/consensus/src/test_utils/mod.rs
index 406c06ca19023..cedf130d904a8 100644
--- a/consensus/src/test_utils/mod.rs
+++ b/consensus/src/test_utils/mod.rs
@@ -2,7 +2,10 @@
 // Parts of the project are originally copyright © Meta Platforms, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::block_storage::{BlockReader, BlockStore};
+use crate::{
+    block_storage::{BlockReader, BlockStore},
+    payload_manager::DirectMempoolPayloadManager,
+};
 use aptos_consensus_types::{
     block::{block_test_utils::certificate_for_genesis, Block},
     common::{Author, Round},
@@ -25,8 +28,7 @@ mod mock_state_computer;
 mod mock_storage;
 
 use crate::{
-    block_storage::pending_blocks::PendingBlocks, payload_manager::PayloadManager,
-    pipeline::execution_client::DummyExecutionClient,
+    block_storage::pending_blocks::PendingBlocks, pipeline::execution_client::DummyExecutionClient,
     util::mock_time_service::SimulatedTimeService,
 };
 use aptos_consensus_types::{block::block_test_utils::gen_test_certificate, common::Payload};
@@ -90,7 +92,7 @@ pub fn build_empty_tree() -> Arc<BlockStore> {
         10, // max pruned blocks in mem
         Arc::new(SimulatedTimeService::new()),
         10,
-        Arc::from(PayloadManager::DirectMempool),
+        Arc::from(DirectMempoolPayloadManager::new()),
         false,
         Arc::new(Mutex::new(PendingBlocks::new())),
     ))
diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs
index 4d347012544db..a1b511cb8c915 100644
--- a/consensus/src/twins/twins_node.rs
+++ b/consensus/src/twins/twins_node.rs
@@ -8,7 +8,7 @@ use crate::{
     network::NetworkTask,
     network_interface::{ConsensusNetworkClient, DIRECT_SEND, RPC},
     network_tests::{NetworkPlayground, TwinId},
-    payload_manager::PayloadManager,
+    payload_manager::DirectMempoolPayloadManager,
     pipeline::buffer_manager::OrderedBlocks,
     quorum_store::quorum_store_db::MockQuorumStoreDB,
     rand::rand_gen::storage::in_memory::InMemRandDb,
@@ -118,7 +118,7 @@ impl SMRNode {
         let reconfig_listener = ReconfigNotificationListener {
             notification_receiver: reconfig_events,
         };
-        let _commit_notifier = Arc::from(PayloadManager::DirectMempool);
+        let _commit_notifier = Arc::from(DirectMempoolPayloadManager::new());
         let mut configs = HashMap::new();
         configs.insert(
             ValidatorSet::CONFIG_ID,