diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 6c3b984f9110e..fa148cfe85a17 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -16,7 +16,7 @@ use super::{ }; use crate::{ dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, - experimental::buffer_manager::OrderedBlocks, + experimental::{buffer_manager::OrderedBlocks, commit_reliable_broadcast::DropGuard}, network::IncomingDAGRequest, state_replication::{PayloadClient, StateComputer}, }; @@ -36,7 +36,10 @@ use aptos_types::{ ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, validator_signer::ValidatorSigner, }; -use futures::{FutureExt, StreamExt}; +use futures::{ + stream::{AbortHandle, Abortable}, + FutureExt, StreamExt, +}; use futures_channel::{ mpsc::{UnboundedReceiver, UnboundedSender}, oneshot, @@ -174,8 +177,8 @@ impl DagBootstrapper { self, mut dag_rpc_rx: Receiver, ordered_nodes_tx: UnboundedSender, - shutdown_rx: oneshot::Receiver<()>, - ) -> anyhow::Result<()> { + mut shutdown_rx: oneshot::Receiver<()>, + ) { let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone())); let sync_manager = DagStateSynchronizer::new( @@ -192,8 +195,6 @@ impl DagBootstrapper { AggregateSignature::empty(), ); - let mut shutdown_rx = shutdown_rx.into_stream(); - loop { let (dag_store, order_rule) = self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); @@ -208,10 +209,10 @@ impl DagBootstrapper { // poll the network handler while waiting for rebootstrap notification or shutdown notification select! { biased; - _ = shutdown_rx.select_next_some() => { + _ = &mut shutdown_rx => { df_handle.abort(); let _ = df_handle.await; - return Ok(()); + return; }, certified_node_msg = handler.run(&mut dag_rpc_rx) => { df_handle.abort();