Skip to content

Commit

Permalink
[consensus] payload manager enum to trait refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jul 22, 2024
1 parent ff35414 commit 3120643
Show file tree
Hide file tree
Showing 21 changed files with 296 additions and 208 deletions.
6 changes: 3 additions & 3 deletions consensus/src/block_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
counters::{MAX_TXNS_FROM_BLOCK_TO_EXECUTE, TXN_SHUFFLE_SECONDS},
payload_manager::PayloadManager,
payload_manager::TPayloadManager,
transaction_deduper::TransactionDeduper,
transaction_filter::TransactionFilter,
transaction_shuffler::TransactionShuffler,
Expand All @@ -15,15 +15,15 @@ use fail::fail_point;
use std::sync::Arc;

pub struct BlockPreparer {
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
txn_filter: Arc<TransactionFilter>,
txn_deduper: Arc<dyn TransactionDeduper>,
txn_shuffler: Arc<dyn TransactionShuffler>,
}

impl BlockPreparer {
pub fn new(
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
txn_filter: Arc<TransactionFilter>,
txn_deduper: Arc<dyn TransactionDeduper>,
txn_shuffler: Arc<dyn TransactionShuffler>,
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
BlockReader,
},
counters,
payload_manager::PayloadManager,
payload_manager::TPayloadManager,
persistent_liveness_storage::{
PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
},
Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct BlockStore {
time_service: Arc<dyn TimeService>,
// consistent with round type
vote_back_pressure_limit: Round,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
#[cfg(any(test, feature = "fuzzing"))]
back_pressure_for_test: AtomicBool,
order_vote_enabled: bool,
Expand All @@ -89,7 +89,7 @@ impl BlockStore {
max_pruned_blocks_in_mem: usize,
time_service: Arc<dyn TimeService>,
vote_back_pressure_limit: Round,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
) -> Self {
Expand Down Expand Up @@ -144,7 +144,7 @@ impl BlockStore {
max_pruned_blocks_in_mem: usize,
time_service: Arc<dyn TimeService>,
vote_back_pressure_limit: Round,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
pending_blocks: Arc<Mutex<PendingBlocks>>,
) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
monitor,
network::{IncomingBlockRetrievalRequest, NetworkSender},
network_interface::ConsensusMsg,
payload_manager::PayloadManager,
payload_manager::TPayloadManager,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
pipeline::execution_client::TExecutionClient,
};
Expand Down Expand Up @@ -295,7 +295,7 @@ impl BlockStore {
retriever: &'a mut BlockRetriever,
storage: Arc<dyn PersistentLivenessStorage>,
execution_client: Arc<dyn TExecutionClient>,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
order_vote_enabled: bool,
) -> anyhow::Result<RecoveryData> {
info!(
Expand Down
17 changes: 9 additions & 8 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ use crate::{
payload_store::BlockPayloadStore,
pending_blocks::PendingBlockStore,
publisher::ConsensusPublisher,
subscription,
subscription::ConsensusObserverSubscription,
subscription::{self, ConsensusObserverSubscription},
},
dag::DagCommitSigner,
network::{IncomingCommitRequest, IncomingRandGenRequest},
network_interface::CommitMessage,
payload_manager::PayloadManager,
payload_manager::{
ConsensusObserverPayloadManager, DirectMempoolPayloadManager, TPayloadManager,
},
pipeline::execution_client::TExecutionClient,
state_replication::StateComputerCommitCallBackType,
};
Expand Down Expand Up @@ -1094,13 +1095,13 @@ impl ConsensusObserver {
);

// Create the payload manager
let payload_manager = if self.quorum_store_enabled {
PayloadManager::ConsensusObserver(
let payload_manager: Arc<dyn TPayloadManager> = if self.quorum_store_enabled {
Arc::new(ConsensusObserverPayloadManager::new(
self.block_payload_store.get_block_payloads(),
self.consensus_publisher.clone(),
)
))
} else {
PayloadManager::DirectMempool
Arc::new(DirectMempoolPayloadManager {})
};

// Start the new epoch
Expand All @@ -1115,7 +1116,7 @@ impl ConsensusObserver {
.start_epoch(
epoch_state.clone(),
dummy_signer,
Arc::new(payload_manager),
payload_manager,
&consensus_config,
&execution_config,
&randomness_config,
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
monitor,
network::IncomingDAGRequest,
payload_client::PayloadClient,
payload_manager::PayloadManager,
payload_manager::TPayloadManager,
pipeline::{buffer_manager::OrderedBlocks, execution_client::TExecutionClient},
};
use aptos_bounded_executor::BoundedExecutor;
Expand Down Expand Up @@ -330,7 +330,7 @@ pub struct DagBootstrapper {
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
payload_client: Arc<dyn PayloadClient>,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
execution_client: Arc<dyn TExecutionClient>,
Expand All @@ -355,7 +355,7 @@ impl DagBootstrapper {
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
payload_client: Arc<dyn PayloadClient>,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
execution_client: Arc<dyn TExecutionClient>,
Expand Down Expand Up @@ -731,7 +731,7 @@ pub(super) fn bootstrap_dag_for_test(
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
payload_client: Arc<dyn PayloadClient>,
execution_client: Arc<dyn TExecutionClient>,
) -> (
Expand Down
19 changes: 17 additions & 2 deletions consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,30 @@ use crate::{
},
payload_manager::TPayloadManager,
};
use aptos_consensus_types::common::{Author, Payload, Round};
use aptos_types::aggregate_signature::AggregateSignature;
use aptos_consensus_types::{
block::Block,
common::{Author, Payload, Round},
};
use aptos_executor_types::ExecutorResult;
use aptos_types::{aggregate_signature::AggregateSignature, transaction::SignedTransaction};
use async_trait::async_trait;

pub(super) const TEST_DAG_WINDOW: u64 = 5;

pub(super) struct MockPayloadManager {}

#[async_trait]
impl TPayloadManager for MockPayloadManager {
fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}

fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}

async fn get_transactions(
&self,
block: &Block,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
Ok((Vec::new(), None))
}
}

pub(super) struct MockOrderRule {}
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
network::{IncomingDAGRequest, NetworkSender, RpcResponder},
network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
network_tests::{NetworkPlayground, TwinId},
payload_manager::PayloadManager,
payload_manager::{DirectMempoolPayloadManager, TPayloadManager},
pipeline::{buffer_manager::OrderedBlocks, execution_client::DummyExecutionClient},
test_utils::{consensus_runtime, MockPayloadManager, MockStorage},
};
Expand Down Expand Up @@ -78,7 +78,7 @@ impl DagBootstrapUnit {
let network = Arc::new(network);

let payload_client = Arc::new(MockPayloadManager::new(None));
let payload_manager = Arc::new(PayloadManager::DirectMempool);
let payload_manager = Arc::new(DirectMempoolPayloadManager::new());

let execution_client = Arc::new(DummyExecutionClient);

Expand Down
26 changes: 17 additions & 9 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
payload_client::{
mixed::MixedPayloadClient, user::quorum_store_client::QuorumStoreClient, PayloadClient,
},
payload_manager::PayloadManager,
payload_manager::{DirectMempoolPayloadManager, TPayloadManager},
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
pipeline::execution_client::TExecutionClient,
quorum_store::{
Expand Down Expand Up @@ -172,7 +172,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
dag_rpc_tx: Option<aptos_channel::Sender<AccountAddress, IncomingDAGRequest>>,
dag_shutdown_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
dag_config: DagConsensusConfig,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
rand_storage: Arc<dyn RandStorage<AugmentedData>>,
proof_cache: ProofCache,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
Expand Down Expand Up @@ -237,7 +237,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
dag_shutdown_tx: None,
aptos_time_service,
dag_config,
payload_manager: Arc::new(PayloadManager::DirectMempool),
payload_manager: Arc::new(DirectMempoolPayloadManager::new()),
rand_storage,
proof_cache: Cache::builder()
.max_capacity(node_config.consensus.proof_cache_capacity)
Expand Down Expand Up @@ -672,7 +672,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
epoch_state: &EpochState,
network_sender: NetworkSender,
consensus_config: &OnChainConsensusConfig,
) -> (Arc<PayloadManager>, QuorumStoreClient, QuorumStoreBuilder) {
) -> (
Arc<dyn TPayloadManager>,
QuorumStoreClient,
QuorumStoreBuilder,
) {
// Start QuorumStore
let (consensus_to_quorum_store_tx, consensus_to_quorum_store_rx) =
mpsc::channel(self.config.intra_consensus_channel_buffer_size);
Expand Down Expand Up @@ -755,7 +759,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
onchain_jwk_consensus_config: OnChainJWKConsensusConfig,
network_sender: Arc<NetworkSender>,
payload_client: Arc<dyn PayloadClient>,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
rand_config: Option<RandConfig>,
fast_rand_config: Option<RandConfig>,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
Expand Down Expand Up @@ -1194,7 +1198,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
&mut self,
epoch_state: &EpochState,
consensus_config: &OnChainConsensusConfig,
) -> (NetworkSender, Arc<dyn PayloadClient>, Arc<PayloadManager>) {
) -> (
NetworkSender,
Arc<dyn PayloadClient>,
Arc<dyn TPayloadManager>,
) {
self.set_epoch_start_metrics(epoch_state);
self.quorum_store_enabled = self.enable_quorum_store(consensus_config);
let network_sender = self.create_network_sender(epoch_state);
Expand Down Expand Up @@ -1225,7 +1233,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
jwk_consensus_config: OnChainJWKConsensusConfig,
network_sender: NetworkSender,
payload_client: Arc<dyn PayloadClient>,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
rand_config: Option<RandConfig>,
fast_rand_config: Option<RandConfig>,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
Expand Down Expand Up @@ -1271,7 +1279,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
onchain_jwk_consensus_config: OnChainJWKConsensusConfig,
network_sender: NetworkSender,
payload_client: Arc<dyn PayloadClient>,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
rand_config: Option<RandConfig>,
fast_rand_config: Option<RandConfig>,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
Expand Down Expand Up @@ -1550,7 +1558,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
buffered_proposal_tx: Option<aptos_channel::Sender<Author, VerifiedEvent>>,
peer_id: AccountAddress,
event: VerifiedEvent,
payload_manager: Arc<PayloadManager>,
payload_manager: Arc<dyn TPayloadManager>,
pending_blocks: Arc<Mutex<PendingBlocks>>,
) {
if let VerifiedEvent::ProposalMsg(proposal) = &event {
Expand Down
Loading

0 comments on commit 3120643

Please sign in to comment.