Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 15, 2023
1 parent a29438b commit df5a89d
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
};
use crate::{
dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher},
experimental::buffer_manager::OrderedBlocks,
experimental::{buffer_manager::OrderedBlocks, commit_reliable_broadcast::DropGuard},
network::IncomingDAGRequest,
state_replication::{PayloadClient, StateComputer},
};
Expand All @@ -36,7 +36,10 @@ use aptos_types::{
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
validator_signer::ValidatorSigner,
};
use futures::{FutureExt, StreamExt};
use futures::{
stream::{AbortHandle, Abortable},
FutureExt, StreamExt,
};
use futures_channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
Expand Down Expand Up @@ -174,8 +177,8 @@ impl DagBootstrapper {
self,
mut dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
shutdown_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
mut shutdown_rx: oneshot::Receiver<()>,
) {
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone()));

let sync_manager = DagStateSynchronizer::new(
Expand All @@ -192,8 +195,6 @@ impl DagBootstrapper {
AggregateSignature::empty(),
);

let mut shutdown_rx = shutdown_rx.into_stream();

loop {
let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());
Expand All @@ -208,10 +209,10 @@ impl DagBootstrapper {
// poll the network handler while waiting for rebootstrap notification or shutdown notification
select! {
biased;
_ = shutdown_rx.select_next_some() => {
_ = &mut shutdown_rx => {
df_handle.abort();
let _ = df_handle.await;
return Ok(());
return;
},
certified_node_msg = handler.run(&mut dag_rpc_rx) => {
df_handle.abort();
Expand Down

0 comments on commit df5a89d

Please sign in to comment.