Skip to content

Commit

Permalink
Return state sync notification instead of channel
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 15, 2023
1 parent e642b50 commit a29438b
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 64 deletions.
30 changes: 8 additions & 22 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
types::{CertifiedNodeMessage, DAGMessage},
};
use crate::{
dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher},
Expand Down Expand Up @@ -195,18 +195,10 @@ impl DagBootstrapper {
let mut shutdown_rx = shutdown_rx.into_stream();

loop {
let (rebootstrap_notification_tx, mut rebootstrap_notification_rx) =
tokio::sync::mpsc::channel(1);

// TODO: fix
let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(
dag_store.clone(),
adapter.clone(),
rebootstrap_notification_tx,
);
let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone());

let (handler, fetch_service) =
self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
Expand All @@ -221,17 +213,16 @@ impl DagBootstrapper {
let _ = df_handle.await;
return Ok(());
},
Some(node) = rebootstrap_notification_rx.recv() => {
certified_node_msg = handler.run(&mut dag_rpc_rx) => {
df_handle.abort();
let _ = df_handle.await;

let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

if let Err(e) = sync_manager.sync_dag_to(&node, dag_fetcher, dag_store.clone()).await {
if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await {
error!(error = ?e, "unable to sync");
}
},
_ = handler.start(&mut dag_rpc_rx) => {}
}
}
}
}
Expand All @@ -249,7 +240,7 @@ pub(super) fn bootstrap_dag_for_test(
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> (
JoinHandle<()>,
JoinHandle<CertifiedNodeMessage>,
JoinHandle<()>,
aptos_channel::Sender<Author, IncomingDAGRequest>,
UnboundedReceiver<OrderedBlocks>,
Expand All @@ -269,21 +260,16 @@ pub(super) fn bootstrap_dag_for_test(
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(
dag_store.clone(),
adapter.clone(),
rebootstrap_notification_tx,
);
let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone());

let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

let dh_handle = tokio::spawn(async move { handler.start(&mut dag_rpc_rx).await });
let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await });
let df_handle = tokio::spawn(fetch_service.start());

(dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx)
Expand Down
5 changes: 1 addition & 4 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{
block_info::{Round},
epoch_state::EpochState,
};
use aptos_types::{block_info::Round, epoch_state::EpochState};
use futures::{
future::{AbortHandle, Abortable},
FutureExt,
Expand Down
49 changes: 33 additions & 16 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
use super::{
dag_driver::DagDriver,
dag_fetcher::{FetchRequestHandler, FetchWaiter},
dag_state_sync::StateSyncTrigger,
types::TDAGMessage,
dag_state_sync::{
StateSyncStatus::{self, NeedsSync, Synced},
StateSyncTrigger,
},
types::{CertifiedNodeMessage, TDAGMessage},
CertifiedNode, Node,
};
use crate::{
Expand Down Expand Up @@ -53,16 +56,23 @@ impl NetworkHandler {
}
}

pub async fn start(
pub async fn run(
mut self,
dag_rpc_rx: &mut aptos_channel::Receiver<Author, IncomingDAGRequest>,
) {
) -> CertifiedNodeMessage {
// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
loop {
select! {
Some(msg) = dag_rpc_rx.next() => {
if let Err(e) = self.process_rpc(msg).await {
warn!(error = ?e, "error processing rpc");
match self.process_rpc(msg).await {
Ok(sync_status) => {
if let StateSyncStatus::NeedsSync(certified_node_msg) = sync_status {
return certified_node_msg;
}
},
Err(e) => {
warn!(error = ?e, "error processing rpc");
}
}
},
Some(res) = self.node_fetch_waiter.next() => {
Expand All @@ -79,7 +89,10 @@ impl NetworkHandler {
}
}

async fn process_rpc(&mut self, rpc_request: IncomingDAGRequest) -> anyhow::Result<()> {
async fn process_rpc(
&mut self,
rpc_request: IncomingDAGRequest,
) -> anyhow::Result<StateSyncStatus> {
let dag_message: DAGMessage = rpc_request.req.try_into()?;

let author = dag_message
Expand All @@ -94,14 +107,18 @@ impl NetworkHandler {
.verify(&self.epoch_state.verifier)
.and_then(|_| self.node_receiver.process(node))
.map(|r| r.into()),
DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) {
Ok(_) => {
let node = self.state_sync_trigger.check(node).await
self.dag_driver
.process(node.certified_node())
.map(|r| r.into())
},
Err(e) => Err(e),
DAGMessage::CertifiedNodeMsg(certified_node_msg) => {
match certified_node_msg.verify(&self.epoch_state.verifier) {
Ok(_) => match self.state_sync_trigger.check(certified_node_msg).await {
ret @ (NeedsSync(_), None) => return Ok(ret.0),
(Synced, Some(certified_node_msg)) => self
.dag_driver
.process(certified_node_msg.certified_node())
.map(|r| r.into()),
_ => unreachable!(),
},
Err(e) => Err(e),
}
},
DAGMessage::FetchRequest(request) => request
.verify(&self.epoch_state.verifier)
Expand All @@ -126,6 +143,6 @@ impl NetworkHandler {
.response_sender
.send(response)
.map_err(|_| anyhow::anyhow!("unable to respond to rpc"))
.into()
.map(|_| StateSyncStatus::Synced)
}
}
25 changes: 12 additions & 13 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,39 @@ use std::sync::Arc;
pub const DAG_WINDOW: usize = 1;
pub const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30;

pub(super) enum StateSyncStatus {
NeedsSync(CertifiedNodeMessage),
Synced,
}

pub(super) struct StateSyncTrigger {
dag_store: Arc<RwLock<Dag>>,
downstream_notifier: Arc<dyn Notifier>,
bootstrap_notifier: tokio::sync::mpsc::Sender<CertifiedNodeMessage>,
}

impl StateSyncTrigger {
pub(super) fn new(
dag_store: Arc<RwLock<Dag>>,
downstream_notifier: Arc<dyn Notifier>,
bootstrap_notifier: tokio::sync::mpsc::Sender<CertifiedNodeMessage>,
) -> Self {
pub(super) fn new(dag_store: Arc<RwLock<Dag>>, downstream_notifier: Arc<dyn Notifier>) -> Self {
Self {
dag_store,
downstream_notifier,
bootstrap_notifier,
}
}

/// This method checks if a state sync is required, and if so,
/// notifies the bootstraper and yields the current task infinitely,
/// to let the bootstraper can abort this task.
pub(super) async fn check(&self, node: CertifiedNodeMessage) -> anyhow::Result<CertifiedNodeMessage> {
pub(super) async fn check(
&self,
node: CertifiedNodeMessage,
) -> (StateSyncStatus, Option<CertifiedNodeMessage>) {
let ledger_info = node.ledger_info();

self.notify_commit_proof(ledger_info).await;

if self.need_sync_for_ledger_info(ledger_info) {
self.bootstrap_notifier.send(node).await?;
loop {
tokio::task::yield_now().await;
}
return (StateSyncStatus::NeedsSync(node), None);
}
Ok(node)
(StateSyncStatus::Synced, Some(node))
}

/// Fast forward in the decoupled-execution pipeline if the block exists there
Expand Down
5 changes: 1 addition & 4 deletions consensus/src/dag/tests/dag_state_sync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,5 @@ async fn test_dag_state_sync() {
assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round);
assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round);
assert_none!(new_dag.highest_ordered_anchor_round(),);
assert_eq!(
new_dag.highest_committed_anchor_round(),
LI_ROUNDS as Round
);
assert_eq!(new_dag.highest_committed_anchor_round(), LI_ROUNDS as Round);
}
8 changes: 3 additions & 5 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

use super::dag_test;
use crate::{
dag::bootstrap::bootstrap_dag_for_test,
dag::{bootstrap::bootstrap_dag_for_test, types::CertifiedNodeMessage},
experimental::buffer_manager::OrderedBlocks,
network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender},
network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
network_tests::{NetworkPlayground, TwinId},
test_utils::{
consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage,
},
test_utils::{consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage},
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
Expand Down Expand Up @@ -43,7 +41,7 @@ use std::sync::Arc;
use tokio::task::JoinHandle;

struct DagBootstrapUnit {
nh_task_handle: JoinHandle<()>,
nh_task_handle: JoinHandle<CertifiedNodeMessage>,
df_task_handle: JoinHandle<()>,
dag_rpc_tx: aptos_channel::Sender<Author, IncomingDAGRequest>,
network_events:
Expand Down

0 comments on commit a29438b

Please sign in to comment.