Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Hanging CI #3697

Merged
merged 47 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
b581608
shutdown completion task last
bfish713 Sep 23, 2024
4ee6bca
fix shutdown order
bfish713 Sep 23, 2024
c93c222
Merge branch 'main' into bf/test-hang-fix
bfish713 Sep 23, 2024
6c88880
fmt
bfish713 Sep 23, 2024
b261433
log everything at info when test fails
bfish713 Sep 23, 2024
4bfa83d
clear failed, logging
bfish713 Sep 23, 2024
67a2f42
fix build
bfish713 Sep 23, 2024
77e5ab1
different log level
bfish713 Sep 23, 2024
c1f2b7f
no capture again
bfish713 Sep 23, 2024
78d7b3d
typo
bfish713 Sep 23, 2024
803273d
move logging + do startups in parallel
bfish713 Sep 24, 2024
a11ff20
fmt
bfish713 Sep 24, 2024
e347c39
change initial timeout
bfish713 Sep 24, 2024
82175e4
remove nocapture
bfish713 Sep 24, 2024
2b60753
nocapture again
bfish713 Sep 24, 2024
40569d6
Merge branch 'main' into bf/test-hang-fix
bfish713 Sep 24, 2024
a81ad8f
fix
bfish713 Sep 24, 2024
70c994b
only log nodes started when there are nodes starting
bfish713 Sep 24, 2024
11c403b
log exit
bfish713 Sep 24, 2024
ca9e508
log when timeout starts
bfish713 Sep 24, 2024
c90ef6a
log id and view
bfish713 Sep 25, 2024
827aeb4
only shutdown from 1 place
bfish713 Sep 25, 2024
b9e4c28
fix build, remove handles from completion task
bfish713 Sep 25, 2024
ca2a0d2
leave one up in cdn test
bfish713 Sep 25, 2024
07b54b0
more logs, less threads, maybe fix?
bfish713 Sep 25, 2024
74f0e27
actual fix
bfish713 Sep 25, 2024
b2ca805
lint fmt
bfish713 Sep 25, 2024
05794c4
Merge remote-tracking branch 'origin/main' into bf/test-hang-fix
bfish713 Sep 25, 2024
4cca1d0
allow more than 1 thread, tweaks
bfish713 Sep 25, 2024
6319aa0
remove nocapture
bfish713 Sep 25, 2024
f9af84f
move byzantine tests to ci-3
bfish713 Sep 25, 2024
a3ec2bb
rebalance tests more
bfish713 Sep 25, 2024
aafdc32
one more test to 4
bfish713 Sep 25, 2024
5f305bc
only spawn first timeout when starting consensus
bfish713 Sep 26, 2024
9df4ad2
cleanup
bfish713 Sep 26, 2024
609ceca
fix justfile lint tokio
bfish713 Sep 26, 2024
7e8fb31
fix justfile
bfish713 Sep 26, 2024
99ef939
sleep longer, nocapture to debug
bfish713 Sep 26, 2024
43343bd
info
bfish713 Sep 26, 2024
44fd242
fix another hot loop maybe
bfish713 Sep 26, 2024
a69f68c
don't spawn r/r tasks for cdn that do nothing
bfish713 Sep 26, 2024
369ab05
lint no sleep
bfish713 Sep 26, 2024
99b3f94
lower log level in libp2p
bfish713 Sep 26, 2024
d7e21b6
Merge branch 'main' into bf/test-hang-fix
bfish713 Sep 26, 2024
13fd192
lower builder test threshold
bfish713 Sep 26, 2024
d7290bd
remove nocapture for the last time, hopefully
bfish713 Sep 26, 2024
068bd02
remove cleanup_previous_timeouts_on_view
bfish713 Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
};

use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::join;
Expand Down Expand Up @@ -370,6 +370,24 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
self.start_view
)
});

// Clone the event stream that we send the timeout event to
let event_stream = self.internal_event_stream.0.clone();
let next_view_timeout = self.config.next_view_timeout;
let start_view = self.start_view;

// Spawn a task that will sleep for the next view timeout and then send a timeout event
// if not cancelled
async_spawn({
async move {
async_sleep(Duration::from_millis(next_view_timeout)).await;
broadcast_event(
Arc::new(HotShotEvent::Timeout(start_view + 1)),
&event_stream,
)
.await;
}
});
Comment on lines +379 to +390
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we cancel the task spawned here? Probably I'm missing something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't, but it can only run for the timeout duration; after that, if the node has been shut down before the timeout fires, it will just send an event into a closed stream (and error). The timeout itself will be ignored if progress is made. I think cleaning this up so we can cancel on view change is a good idea though.

#[cfg(feature = "dependency-tasks")]
{
if let Some(validated_state) = consensus.validated_state_map().get(&self.start_view) {
Expand Down
12 changes: 4 additions & 8 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
sync::{atomic::AtomicBool, Arc},
};

use async_compatibility_layer::art::async_spawn;
use async_trait::async_trait;
use chrono::Utc;
use hotshot_task_impls::{
Expand Down Expand Up @@ -220,7 +221,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
consensus: OuterConsensus::new(consensus),
Expand All @@ -232,7 +232,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
payload_commitment_and_metadata: None,
vote_collectors: BTreeMap::default(),
timeout_vote_collectors: BTreeMap::default(),
timeout_task,
timeout_task: async_spawn(async {}),
spawned_tasks: BTreeMap::new(),
formed_upgrade_certificate: None,
proposal_cert: None,
Expand Down Expand Up @@ -282,7 +282,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
latest_proposed_view: handle.cur_view().await,
Expand All @@ -297,7 +296,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
private_key: handle.private_key().clone(),
storage: Arc::clone(&handle.storage),
timeout: handle.hotshot.config.next_view_timeout,
timeout_task,
round_start_delay: handle.hotshot.config.round_start_delay,
id: handle.hotshot.id,
formed_upgrade_certificate: None,
Expand All @@ -312,7 +310,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
public_key: handle.public_key().clone(),
Expand All @@ -323,7 +320,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
network: Arc::clone(&handle.hotshot.network),
quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_task,
timeout_task: async_spawn(async {}),
timeout: handle.hotshot.config.next_view_timeout,
round_start_delay: handle.hotshot.config.round_start_delay,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
Expand All @@ -343,7 +340,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
{
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Self {
public_key: handle.public_key().clone(),
Expand All @@ -359,7 +355,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
cur_view: handle.cur_view().await,
cur_view_time: Utc::now().timestamp(),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
timeout_task,
timeout_task: async_spawn(async {}),
timeout: handle.hotshot.config.next_view_timeout,
consensus: OuterConsensus::new(consensus),
last_decided_view: handle.cur_view().await,
Expand Down
5 changes: 2 additions & 3 deletions crates/hotshot/src/traits/networking/push_cdn_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl<TYPES: NodeType> TestableNetworkingImplementation<TYPES>

// Error if we stopped unexpectedly
if let Err(err) = marshal.start().await {
error!("broker stopped: {err}");
error!("marshal stopped: {err}");
}
});

Expand Down Expand Up @@ -447,8 +447,7 @@ impl<K: SignatureKey + 'static> ConnectedNetwork<K> for PushCdnNetwork<K> {
async fn spawn_request_receiver_task(
&self,
) -> Option<mpsc::Receiver<(Vec<u8>, NetworkMsgResponseChannel<Vec<u8>>)>> {
let (mut _tx, rx) = mpsc::channel(1);
Some(rx)
None
}

/// Pause sending and receiving on the PushCDN network.
Expand Down
31 changes: 2 additions & 29 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,19 @@

//! Provides an event-streaming handle for a [`SystemContext`] running in the background

use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use futures::Stream;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_task_impls::events::HotShotEvent;
use hotshot_types::{
consensus::Consensus,
data::Leaf,
error::HotShotError,
traits::{election::Membership, network::ConnectedNetwork, node_implementation::NodeType},
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;

use crate::{traits::NodeImplementation, types::Event, Memberships, SystemContext, Versions};
Expand Down Expand Up @@ -236,26 +231,4 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
pub fn storage(&self) -> Arc<RwLock<I::Storage>> {
Arc::clone(&self.storage)
}

/// A helper function to spawn the initial timeout task from a given `SystemContextHandle`.
///
/// The spawned task sleeps for `next_view_timeout` milliseconds (read from the HotShot
/// config) and then broadcasts `HotShotEvent::Timeout(start_view + 1)` on the internal
/// event stream.
///
/// Returns the `JoinHandle` of the spawned task. The handle is marked `#[must_use]`;
/// presumably the caller keeps it so the timeout can be cancelled later — TODO confirm
/// against the callers.
#[must_use]
pub fn spawn_initial_timeout_task(&self) -> JoinHandle<()> {
// Clone the event stream that we send the timeout event to
let event_stream = self.internal_event_stream.0.clone();
// Copy the values the task needs out of `self` so the `async move` block below does
// not borrow the handle.
let next_view_timeout = self.hotshot.config.next_view_timeout;
let start_view = self.hotshot.start_view;

// Spawn a task that will sleep for the next view timeout and then send a timeout event
// if not cancelled
async_spawn({
async move {
// `next_view_timeout` is interpreted as a number of milliseconds.
async_sleep(Duration::from_millis(next_view_timeout)).await;
// The timeout event targets the view after the starting view.
broadcast_event(
Arc::new(HotShotEvent::Timeout(start_view + 1)),
&event_stream,
)
.await;
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,20 @@ impl DHTBootstrapTask {
/// Task's loop
async fn run_loop(mut self) {
loop {
tracing::debug!("looping bootstrap");
if self.in_progress {
match self.rx.next().await {
Some(InputEvent::BootstrapFinished) => {
tracing::debug!("Bootstrap finished");
self.in_progress = false;
}
Some(InputEvent::ShutdownBootstrap) => {
tracing::debug!("ShutdownBootstrap received, shutting down");
tracing::info!("ShutdownBootstrap received, shutting down");
break;
}
Some(_) => {}
Some(InputEvent::StartBootstrap) => {
tracing::warn!("Trying to start bootstrap that's already in progress");
continue;
}
None => {
tracing::debug!("Bootstrap channel closed, exiting loop");
break;
Expand All @@ -74,7 +76,9 @@ impl DHTBootstrapTask {
tracing::debug!("ShutdownBootstrap received, shutting down");
break;
}
Some(_) => {}
Some(InputEvent::BootstrapFinished) => {
tracing::debug!("not in progress got bootstrap finished");
}
None => {
tracing::debug!("Bootstrap channel closed, exiting loop");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use libp2p::kad::{
};
use libp2p_identity::PeerId;
use store::ValidatedStore;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, warn};

/// Additional DHT record functionality
pub mod record;
Expand Down Expand Up @@ -464,7 +464,6 @@ impl<K: SignatureKey + 'static> DHTBehaviour<K> {
..
} => {
if num_remaining == 0 {
info!("Finished bootstrapping");
self.finish_bootstrap();
} else {
debug!("Bootstrap in progress, {} nodes remaining", num_remaining);
Expand Down
26 changes: 26 additions & 0 deletions crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use committable::Committable;
use futures::future::join_all;
use handlers::publish_proposal_from_commitment_and_metadata;
use hotshot_task::task::TaskState;
Expand Down Expand Up @@ -311,6 +312,31 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
warn!("Failed to handle QuorumProposalValidated event {e:#}");
}
}
HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
// Make sure that this request came from who we think it did
if !req.key.validate(signature, req.commit().as_ref()) {
warn!("Invalid signature key on proposal request.");
return;
}

if let Some(quorum_proposal) = self
.consensus
.read()
.await
.last_proposals()
.get(&req.view_number)
{
broadcast_event(
HotShotEvent::QuorumProposalResponseSend(
req.key.clone(),
quorum_proposal.clone(),
)
.into(),
&event_sender,
)
.await;
}
}
HotShotEvent::QuorumVoteRecv(ref vote) => {
debug!("Received quorum vote: {:?}", vote.view_number());
if self.quorum_membership.leader(vote.view_number() + 1) != self.public_key {
Expand Down
36 changes: 31 additions & 5 deletions crates/task-impls/src/consensus2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

use std::sync::Arc;

use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::helpers::broadcast_event;
use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};
use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use committable::Committable;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
Expand All @@ -28,11 +34,6 @@ use hotshot_types::{
use tokio::task::JoinHandle;
use tracing::instrument;

use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};

/// Event handlers for use in the `handle` method.
mod handlers;

Expand Down Expand Up @@ -112,6 +113,31 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Consensus2TaskS
tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
}
}
HotShotEvent::QuorumProposalRequestRecv(req, signature) => {
// Make sure that this request came from who we think it did
if !req.key.validate(signature, req.commit().as_ref()) {
tracing::warn!("Invalid signature key on proposal request.");
return;
}

if let Some(quorum_proposal) = self
.consensus
.read()
.await
.last_proposals()
.get(&req.view_number)
{
broadcast_event(
HotShotEvent::QuorumProposalResponseSend(
req.key.clone(),
quorum_proposal.clone(),
)
.into(),
&sender,
)
.await;
}
}
HotShotEvent::TimeoutVoteRecv(ref vote) => {
if let Err(e) =
handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ impl<
.await;
None
}

_ => None,
}
}
Expand Down
3 changes: 0 additions & 3 deletions crates/task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>
/// Round start delay from config, in milliseconds.
pub round_start_delay: u64,

/// timeout task handle
pub timeout_task: JoinHandle<()>,

/// This node's storage ref
pub storage: Arc<RwLock<I::Storage>>,

Expand Down
Loading