Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 15, 2023
1 parent 27c3021 commit e642b50
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 47 deletions.
18 changes: 6 additions & 12 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use aptos_channels::{
aptos_channel::{self, Receiver},
message_queues::QueueStyle,
};
use aptos_consensus_types::common::{Author, Round};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_infallible::RwLock;
use aptos_logger::error;
Expand Down Expand Up @@ -84,14 +84,13 @@ impl DagBootstrapper {

fn bootstrap_dag_store(
&self,
initial_round: Round,
latest_ledger_info: LedgerInfo,
notifier: Arc<dyn Notifier>,
) -> (Arc<RwLock<Dag>>, OrderRule) {
let dag = Arc::new(RwLock::new(Dag::new(
self.epoch_state.clone(),
self.storage.clone(),
initial_round,
latest_ledger_info.round(),
DAG_WINDOW,
)));

Expand Down Expand Up @@ -177,10 +176,7 @@ impl DagBootstrapper {
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
shutdown_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
let adapter = Arc::new(NotifierAdapter::new(
ordered_nodes_tx,
self.storage.clone(),
));
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone()));

let sync_manager = DagStateSynchronizer::new(
self.epoch_state.clone(),
Expand All @@ -204,7 +200,7 @@ impl DagBootstrapper {

// TODO: fix
let (dag_store, order_rule) =
self.bootstrap_dag_store(0, ledger_info.ledger_info().clone(), adapter.clone());
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(
dag_store.clone(),
Expand Down Expand Up @@ -276,7 +272,7 @@ pub(super) fn bootstrap_dag_for_test(
let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(0, latest_ledger_info, adapter.clone());
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(
dag_store.clone(),
Expand All @@ -287,9 +283,7 @@ pub(super) fn bootstrap_dag_for_test(
let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

let dh_handle = tokio::spawn(async move {
handler.start(&mut dag_rpc_rx).await
});
let dh_handle = tokio::spawn(async move { handler.start(&mut dag_rpc_rx).await });
let df_handle = tokio::spawn(fetch_service.start());

(dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx)
Expand Down
4 changes: 1 addition & 3 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ use aptos_logger::error;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{
aggregate_signature::AggregateSignature,
block_info::{BlockInfo, Round},
block_info::{Round},
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
};
use futures::{
future::{AbortHandle, Abortable},
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl NetworkHandler {
.map(|r| r.into()),
DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) {
Ok(_) => {
let node = self.state_sync_trigger.check(node).await;
let node = self.state_sync_trigger.check(node).await
self.dag_driver
.process(node.certified_node())
.map(|r| r.into())
Expand Down
12 changes: 6 additions & 6 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ impl StateSyncTrigger {
/// This method checks if a state sync is required, and if so,
/// notifies the bootstraper and yields the current task infinitely,
/// to let the bootstraper can abort this task.
pub(super) async fn check(&self, node: CertifiedNodeMessage) -> CertifiedNodeMessage {
pub(super) async fn check(&self, node: CertifiedNodeMessage) -> anyhow::Result<CertifiedNodeMessage> {
let ledger_info = node.ledger_info();

self.notify_commit_proof(ledger_info).await;

if self.need_sync_for_ledger_info(ledger_info) {
self.bootstrap_notifier.send(node).await;
self.bootstrap_notifier.send(node).await?;
loop {
tokio::task::yield_now().await;
}
}
node
Ok(node)
}

/// Fast forward in the decoupled-execution pipeline if the block exists there
Expand Down Expand Up @@ -127,7 +127,7 @@ impl DagStateSynchronizer {
node: &CertifiedNodeMessage,
dag_fetcher: impl TDagFetcher,
current_dag_store: Arc<RwLock<Dag>>,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<Dag>> {
let commit_li = node.ledger_info();

{
Expand All @@ -151,7 +151,7 @@ impl DagStateSynchronizer {
))
.await;
// TODO: make sure to terminate DAG and yield to epoch manager
return Ok(());
return Ok(None);
}

// TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart,
Expand Down Expand Up @@ -197,6 +197,6 @@ impl DagStateSynchronizer {

// TODO: the caller should rebootstrap the order rule

Ok(())
Ok(Arc::into_inner(sync_dag_store).map(|r| r.into_inner()))
}
}
35 changes: 13 additions & 22 deletions consensus/src/dag/tests/dag_state_sync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
dag::{
adapter::Notifier,
dag_fetcher::{FetchRequestHandler, TDagFetcher},
dag_state_sync::DAG_WINDOW,
dag_state_sync::{DagStateSynchronizer, DAG_WINDOW},
dag_store::Dag,
storage::DAGStorage,
tests::{dag_test::MockStorage, helpers::generate_dag_nodes},
Expand Down Expand Up @@ -112,24 +112,17 @@ impl Notifier for MockNotifier {
async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {}
}

fn setup(
epoch_state: Arc<EpochState>,
dag_store: Arc<RwLock<Dag>>,
storage: Arc<dyn DAGStorage>,
) -> StateSyncManager {
let network = Arc::new(MockDAGNetworkSender {});
fn setup(epoch_state: Arc<EpochState>, storage: Arc<dyn DAGStorage>) -> DagStateSynchronizer {
let time_service = TimeService::mock();
let state_computer = Arc::new(EmptyStateComputer {});
let upstream_notifier = Arc::new(MockNotifier {});
let downstream_notifier = Arc::new(MockNotifier {});

StateSyncManager::new(
DagStateSynchronizer::new(
epoch_state,
network,
upstream_notifier,
downstream_notifier,
time_service,
state_computer,
storage,
dag_store,
)
}

Expand Down Expand Up @@ -193,24 +186,22 @@ async fn test_dag_state_sync() {

let sync_node_li = CertifiedNodeMessage::new(sync_to_node, sync_to_li);

let state_sync = setup(epoch_state.clone(), slow_dag.clone(), storage.clone());
let dag_fetcher = Arc::new(MockDagFetcher {
let state_sync = setup(epoch_state.clone(), storage.clone());
let dag_fetcher = MockDagFetcher {
target_dag: fast_dag.clone(),
epoch_state: epoch_state.clone(),
});
};

let sync_result = state_sync
.sync_to_highest_ordered_anchor(&sync_node_li, dag_fetcher)
.sync_dag_to(&sync_node_li, dag_fetcher, slow_dag.clone())
.await;
let new_dag = sync_result.unwrap().unwrap();

let dag_reader = new_dag.read();

assert_eq!(dag_reader.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round);
assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round);
assert_none!(dag_reader.highest_ordered_anchor_round(),);
assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round);
assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round);
assert_none!(new_dag.highest_ordered_anchor_round(),);
assert_eq!(
dag_reader.highest_committed_anchor_round(),
new_dag.highest_committed_anchor_round(),
LI_ROUNDS as Round
);
}
5 changes: 2 additions & 3 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
network_tests::{NetworkPlayground, TwinId},
test_utils::{
consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStateComputer, MockStorage,
consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage,
},
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
Expand All @@ -25,7 +25,6 @@ use aptos_network::{
transport::ConnectionMetadata,
ProtocolId,
};
use aptos_storage_interface::mock::MockDbReaderWriter;
use aptos_time_service::TimeService;
use aptos_types::{
epoch_state::EpochState,
Expand All @@ -35,7 +34,7 @@ use aptos_types::{
};
use claims::assert_gt;
use futures::{
stream::{select, AbortHandle, Select},
stream::{select, Select},
StreamExt,
};
use futures_channel::mpsc::UnboundedReceiver;
Expand Down

0 comments on commit e642b50

Please sign in to comment.