Skip to content

Commit

Permalink
update execution client api
Browse files Browse the repository at this point in the history
  • Loading branch information
zjma committed Feb 28, 2024
1 parent d2bd397 commit 166f1b6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 56 deletions.
16 changes: 16 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
payload_manager: Arc<PayloadManager>,
rand_config: Option<RandConfig>,
features: Features,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
let epoch = epoch_state.epoch;
info!(
Expand Down Expand Up @@ -776,6 +777,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
&onchain_execution_config,
&features,
rand_config,
rand_msg_rx,
)
.await;

Expand Down Expand Up @@ -1017,6 +1019,14 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
.initialize_shared_component(&epoch_state, &consensus_config)
.await;

let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::<AccountAddress, IncomingRandGenRequest>(
QueueStyle::FIFO,
100,
None,
);

self.rand_manager_msg_tx = Some(rand_msg_tx);

if consensus_config.is_dag_enabled() {
self.start_new_epoch_with_dag(
epoch_state,
Expand All @@ -1027,6 +1037,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
payload_manager,
rand_config,
&features,
rand_msg_rx,
)
.await
} else {
Expand All @@ -1039,6 +1050,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
payload_manager,
rand_config,
&features,
rand_msg_rx,
)
.await
}
Expand Down Expand Up @@ -1080,6 +1092,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
payload_manager: Arc<PayloadManager>,
rand_config: Option<RandConfig>,
features: &Features,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
match self.storage.start() {
LivenessStorageData::FullRecoveryData(initial_data) => {
Expand All @@ -1094,6 +1107,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
payload_manager,
rand_config,
features.clone(),
rand_msg_rx,
)
.await
},
Expand All @@ -1120,6 +1134,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
payload_manager: Arc<PayloadManager>,
rand_config: Option<RandConfig>,
features: &Features,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
let epoch = epoch_state.epoch;
let consensus_key = new_consensus_key_from_storage(&self.config.safety_rules.backend)
Expand All @@ -1141,6 +1156,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
&on_chain_execution_config,
features,
rand_config,
rand_msg_rx,
)
.await;

Expand Down
101 changes: 47 additions & 54 deletions consensus/src/pipeline/execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ pub trait TExecutionClient: Send + Sync {
onchain_execution_config: &OnChainExecutionConfig,
features: &Features,
rand_config: Option<RandConfig>,
) -> Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>>;
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
);

/// This is needed for some DAG tests. Clean this up as a TODO.
fn get_execution_channel(&self) -> Option<UnboundedSender<OrderedBlocks>>;
Expand Down Expand Up @@ -170,7 +171,8 @@ impl ExecutionProxyClient {
commit_signer_provider: Arc<dyn CommitSignerProvider>,
epoch_state: Arc<EpochState>,
rand_config: Option<RandConfig>,
) -> Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>> {
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
let network_sender = NetworkSender::new(
self.author,
self.network_sender.clone(),
Expand All @@ -187,53 +189,44 @@ impl ExecutionProxyClient {
Some(&counters::BUFFER_MANAGER_MSGS),
);

let (
execution_ready_block_tx,
execution_ready_block_rx,
maybe_reset_tx_to_rand_manager,
maybe_rand_msg_tx,
) = if let Some(rand_config) = rand_config {
let (ordered_block_tx, ordered_block_rx) = unbounded::<OrderedBlocks>();
let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::<OrderedBlocks>();
let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::<
AccountAddress,
IncomingRandGenRequest,
>(QueueStyle::FIFO, 100, None);

let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::<ResetRequest>();
let consensus_key =
load_consensus_key_from_secure_storage(&self.consensus_config.safety_rules)
.expect("Failed in loading consensus key for ExecutionProxyClient.");
let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key));

let rand_manager = RandManager::<Share, AugmentedData>::new(
self.author,
epoch_state.clone(),
signer,
rand_config,
rand_ready_block_tx,
Arc::new(network_sender.clone()),
self.rand_storage.clone(),
self.bounded_executor.clone(),
);

tokio::spawn(rand_manager.start(
ordered_block_rx,
rand_msg_rx,
reset_rand_manager_rx,
self.bounded_executor.clone(),
));

(
ordered_block_tx,
rand_ready_block_rx,
Some(reset_tx_to_rand_manager),
Some(rand_msg_tx),
)
} else {
let (ordered_block_tx, ordered_block_rx) = unbounded();
(ordered_block_tx, ordered_block_rx, None, None)
};
let (execution_ready_block_tx, execution_ready_block_rx, maybe_reset_tx_to_rand_manager) =
if let Some(rand_config) = rand_config {
let (ordered_block_tx, ordered_block_rx) = unbounded::<OrderedBlocks>();
let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::<OrderedBlocks>();

let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::<ResetRequest>();
let consensus_key =
load_consensus_key_from_secure_storage(&self.consensus_config.safety_rules)
.expect("Failed in loading consensus key for ExecutionProxyClient.");
let signer = Arc::new(ValidatorSigner::new(self.author, consensus_key));

let rand_manager = RandManager::<Share, AugmentedData>::new(
self.author,
epoch_state.clone(),
signer,
rand_config,
rand_ready_block_tx,
Arc::new(network_sender.clone()),
self.rand_storage.clone(),
self.bounded_executor.clone(),
);

tokio::spawn(rand_manager.start(
ordered_block_rx,
rand_msg_rx,
reset_rand_manager_rx,
self.bounded_executor.clone(),
));

(
ordered_block_tx,
rand_ready_block_rx,
Some(reset_tx_to_rand_manager),
)
} else {
let (ordered_block_tx, ordered_block_rx) = unbounded();
(ordered_block_tx, ordered_block_rx, None)
};

self.handle.write().init(
execution_ready_block_tx,
Expand Down Expand Up @@ -266,8 +259,6 @@ impl ExecutionProxyClient {
tokio::spawn(signing_phase.start());
tokio::spawn(persisting_phase.start());
tokio::spawn(buffer_manager.start());

maybe_rand_msg_tx
}
}

Expand All @@ -282,11 +273,13 @@ impl TExecutionClient for ExecutionProxyClient {
onchain_execution_config: &OnChainExecutionConfig,
features: &Features,
rand_config: Option<RandConfig>,
) -> Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>> {
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
let maybe_rand_msg_tx = self.spawn_decoupled_execution(
commit_signer_provider,
epoch_state.clone(),
rand_config,
rand_msg_rx,
);

let transaction_shuffler =
Expand Down Expand Up @@ -457,8 +450,8 @@ impl TExecutionClient for DummyExecutionClient {
_onchain_execution_config: &OnChainExecutionConfig,
_features: &Features,
_rand_config: Option<RandConfig>,
) -> Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>> {
None
_rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
}

fn get_execution_channel(&self) -> Option<UnboundedSender<OrderedBlocks>> {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/test_utils/mock_execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ impl TExecutionClient for MockExecutionClient {
_onchain_execution_config: &OnChainExecutionConfig,
_features: &Features,
_rand_config: Option<RandConfig>,
) -> Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>> {
None
_rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
) {
}

fn get_execution_channel(&self) -> Option<UnboundedSender<OrderedBlocks>> {
Expand Down

0 comments on commit 166f1b6

Please sign in to comment.