Skip to content

Commit

Permalink
Fix Hanging CI (#3697)
Browse files Browse the repository at this point in the history
* shutdown completion task last

* fix shutdown order

* fmt

* log everything at info when test fails

* clear failed, logging

* fix build

* different log level

* no capture again

* typo

* move logging + do startups in parallel

* fmt

* change initial timeout

* remove nocapture

* nocapture again

* fix

* only log nodes started when there are nodes starting

* log exit

* log when timeout starts

* log id and view

* only shutdown from 1 place

* fix build, remove handles from completion task

* leave one up in cdn test

* more logs, less threads, maybe fix?

* actual fix

* lint fmt

* allow more than 1 thread, tweaks

* remove nocapture

* move byzantine tests to ci-3

* rebalance tests more

* one more test to 4

* only spawn first timeout when starting consensus

* cleanup

* fix justfile lint tokio

* fix justfile

* sleep longer, nocapture to debug

* info

* fix another hot loop maybe

* don't spawn r/r tasks for cdn that do nothing

* lint no sleep

* lower log level in libp2p

* lower builder test threshold

* remove nocapture for the last time, hopefully

* remove cleanup_previous_timeouts_on_view
  • Loading branch information
bfish713 authored Sep 27, 2024
1 parent 4b54b64 commit ce7c0a3
Show file tree
Hide file tree
Showing 23 changed files with 166 additions and 188 deletions.
20 changes: 19 additions & 1 deletion crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
};

use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::join;
Expand Down Expand Up @@ -370,6 +370,24 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
self.start_view
)
});

// Clone the event stream that we send the timeout event to
let event_stream = self.internal_event_stream.0.clone();
let next_view_timeout = self.config.next_view_timeout;
let start_view = self.start_view;

// Spawn a task that will sleep for the next view timeout and then send a timeout event
// if not cancelled
async_spawn({
async move {
async_sleep(Duration::from_millis(next_view_timeout)).await;
broadcast_event(
Arc::new(HotShotEvent::Timeout(start_view + 1)),
&event_stream,
)
.await;
}
});
#[cfg(feature = "dependency-tasks")]
{
if let Some(validated_state) = consensus.validated_state_map().get(&self.start_view) {
Expand Down
12 changes: 4 additions & 8 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
sync::{atomic::AtomicBool, Arc},
};

use async_compatibility_layer::art::async_spawn;
use async_trait::async_trait;
use chrono::Utc;
use hotshot_task_impls::{
Expand Down Expand Up @@ -220,7 +221,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
consensus: OuterConsensus::new(consensus),
Expand All @@ -232,7 +232,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
payload_commitment_and_metadata: None,
vote_collectors: BTreeMap::default(),
timeout_vote_collectors: BTreeMap::default(),
timeout_task,
timeout_task: async_spawn(async {}),
spawned_tasks: BTreeMap::new(),
formed_upgrade_certificate: None,
proposal_cert: None,
Expand Down Expand Up @@ -282,7 +282,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
latest_proposed_view: handle.cur_view().await,
Expand All @@ -297,7 +296,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
private_key: handle.private_key().clone(),
storage: Arc::clone(&handle.storage),
timeout: handle.hotshot.config.next_view_timeout,
timeout_task,
round_start_delay: handle.hotshot.config.round_start_delay,
id: handle.hotshot.id,
formed_upgrade_certificate: None,
Expand All @@ -312,7 +310,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
public_key: handle.public_key().clone(),
Expand All @@ -323,7 +320,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
network: Arc::clone(&handle.hotshot.network),
quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_task,
timeout_task: async_spawn(async {}),
timeout: handle.hotshot.config.next_view_timeout,
round_start_delay: handle.hotshot.config.round_start_delay,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
Expand All @@ -343,7 +340,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
public_key: handle.public_key().clone(),
Expand All @@ -359,7 +355,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
cur_view: handle.cur_view().await,
cur_view_time: Utc::now().timestamp(),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
timeout_task,
timeout_task: async_spawn(async {}),
timeout: handle.hotshot.config.next_view_timeout,
consensus: OuterConsensus::new(consensus),
last_decided_view: handle.cur_view().await,
Expand Down
5 changes: 2 additions & 3 deletions crates/hotshot/src/traits/networking/push_cdn_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>

// Error if we stopped unexpectedly
if let Err(err) = marshal.start().await {
error!("broker stopped: {err}");
error!("marshal stopped: {err}");
}
});

Expand Down Expand Up @@ -447,8 +447,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
async fn spawn_request_receiver_task(
&self,
) -> Option<mpsc::Receiver<(Vec<u8>, NetworkMsgResponseChannel<Vec<u8>>)>> {
let (mut _tx, rx) = mpsc::channel(1);
Some(rx)
None
}

/// Pause sending and receiving on the PushCDN network.
Expand Down
31 changes: 2 additions & 29 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,19 @@

//! Provides an event-streaming handle for a [`SystemContext`] running in the background
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use futures::Stream;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_task_impls::events::HotShotEvent;
use hotshot_types::{
consensus::Consensus,
data::Leaf,
error::HotShotError,
traits::{election::Membership, network::ConnectedNetwork, node_implementation::NodeType},
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;

use crate::{traits::NodeImplementation, types::Event, Memberships, SystemContext, Versions};
Expand Down Expand Up @@ -236,26 +231,4 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
pub fn storage(&self) -> Arc<RwLock<I::Storage>> {
Arc::clone(&self.storage)
}

/// A helper function to spawn the initial timeout task from a given `SystemContextHandle`.
#[must_use]
pub fn spawn_initial_timeout_task(&self) -> JoinHandle<()> {
// Clone the event stream that we send the timeout event to
let event_stream = self.internal_event_stream.0.clone();
let next_view_timeout = self.hotshot.config.next_view_timeout;
let start_view = self.hotshot.start_view;

// Spawn a task that will sleep for the next view timeout and then send a timeout event
// if not cancelled
async_spawn({
async move {
async_sleep(Duration::from_millis(next_view_timeout)).await;
broadcast_event(
Arc::new(HotShotEvent::Timeout(start_view + 1)),
&event_stream,
)
.await;
}
})
}
}
12 changes: 8 additions & 4 deletions crates/libp2p-networking/src/network/behaviours/dht/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,20 @@ impl DHTBootstrapTask {
/// Task's loop
async fn run_loop(mut self) {
loop {
tracing::debug!("looping bootstrap");
if self.in_progress {
match self.rx.next().await {
Some(InputEvent::BootstrapFinished) => {
tracing::debug!("Bootstrap finished");
self.in_progress = false;
}
Some(InputEvent::ShutdownBootstrap) => {
tracing::debug!("ShutdownBootstrap received, shutting down");
tracing::info!("ShutdownBootstrap received, shutting down");
break;
}
Some(_) => {}
Some(InputEvent::StartBootstrap) => {
tracing::warn!("Trying to start bootstrap that's already in progress");
continue;
}
None => {
tracing::debug!("Bootstrap channel closed, exiting loop");
break;
Expand All @@ -74,7 +76,9 @@ impl DHTBootstrapTask {
tracing::debug!("ShutdownBootstrap received, shutting down");
break;
}
Some(_) => {}
Some(InputEvent::BootstrapFinished) => {
tracing::debug!("not in progress got bootstrap finished");
}
None => {
tracing::debug!("Bootstrap channel closed, exiting loop");
break;
Expand Down
3 changes: 1 addition & 2 deletions crates/libp2p-networking/src/network/behaviours/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use libp2p::kad::{
};
use libp2p_identity::PeerId;
use store::ValidatedStore;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, warn};

/// Additional DHT record functionality
pub mod record;
Expand Down Expand Up @@ -464,7 +464,6 @@ impl<K: SignatureKey + 'static> DHTBehaviour<K> {
..
} => {
if num_remaining == 0 {
info!("Finished bootstrapping");
self.finish_bootstrap();
} else {
debug!("Bootstrap in progress, {} nodes remaining", num_remaining);
Expand Down
26 changes: 26 additions & 0 deletions crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use committable::Committable;
use futures::future::join_all;
use handlers::publish_proposal_from_commitment_and_metadata;
use hotshot_task::task::TaskState;
Expand Down Expand Up @@ -311,6 +312,31 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
warn!("Failed to handle QuorumProposalValidated event {e:#}");
}
}
HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
// Make sure that this request came from who we think it did
if !req.key.validate(signature, req.commit().as_ref()) {
warn!("Invalid signature key on proposal request.");
return;
}

if let Some(quorum_proposal) = self
.consensus
.read()
.await
.last_proposals()
.get(&req.view_number)
{
broadcast_event(
HotShotEvent::QuorumProposalResponseSend(
req.key.clone(),
quorum_proposal.clone(),
)
.into(),
&event_sender,
)
.await;
}
}
HotShotEvent::QuorumVoteRecv(ref vote) => {
debug!("Received quorum vote: {:?}", vote.view_number());
if self.quorum_membership.leader(vote.view_number() + 1) != self.public_key {
Expand Down
36 changes: 31 additions & 5 deletions crates/task-impls/src/consensus2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

use std::sync::Arc;

use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::helpers::broadcast_event;
use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};
use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use committable::Committable;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
Expand All @@ -28,11 +34,6 @@ use hotshot_types::{
use tokio::task::JoinHandle;
use tracing::instrument;

use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};

/// Event handlers for use in the `handle` method.
mod handlers;

Expand Down Expand Up @@ -112,6 +113,31 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Consensus2TaskS
tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
}
}
HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
// Make sure that this request came from who we think it did
if !req.key.validate(signature, req.commit().as_ref()) {
tracing::warn!("Invalid signature key on proposal request.");
return;
}

if let Some(quorum_proposal) = self
.consensus
.read()
.await
.last_proposals()
.get(&req.view_number)
{
broadcast_event(
HotShotEvent::QuorumProposalResponseSend(
req.key.clone(),
quorum_proposal.clone(),
)
.into(),
&sender,
)
.await;
}
}
HotShotEvent::TimeoutVoteRecv(ref vote) => {
if let Err(e) =
handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ impl<
.await;
None
}

_ => None,
}
}
Expand Down
3 changes: 0 additions & 3 deletions crates/task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>
/// Round start delay from config, in milliseconds.
pub round_start_delay: u64,

/// timeout task handle
pub timeout_task: JoinHandle<()>,

/// This node's storage ref
pub storage: Arc<RwLock<I::Storage>>,

Expand Down
Loading

0 comments on commit ce7c0a3

Please sign in to comment.