diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index 18ef2c91d2..d24cd2e646 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -254,7 +254,7 @@ impl, V: Versions> CreateTaskState Self { latest_proposed_view: handle.cur_view().await, - proposal_dependencies: HashMap::new(), + proposal_dependencies: BTreeMap::new(), network: Arc::clone(&handle.hotshot.network), output_event_stream: handle.hotshot.external_event_stream.0.clone(), consensus: OuterConsensus::new(consensus), diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 07b2a3604c..745340ca1f 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -10,7 +10,7 @@ use std::{ sync::Arc, }; -use async_broadcast::{Receiver, SendError, Sender}; +use async_broadcast::{InactiveReceiver, Receiver, SendError, Sender}; use async_compatibility_layer::art::{async_sleep, async_spawn, async_timeout}; use async_lock::RwLock; #[cfg(async_executor_impl = "async-std")] @@ -363,7 +363,7 @@ pub async fn decide_from_proposal( pub(crate) async fn parent_leaf_and_state( next_proposal_view_number: TYPES::View, event_sender: &Sender>>, - event_receiver: &Receiver>>, + event_receiver: &InactiveReceiver>>, quorum_membership: Arc, public_key: TYPES::SignatureKey, private_key: ::PrivateKey, @@ -391,7 +391,7 @@ pub(crate) async fn parent_leaf_and_state( let _ = fetch_proposal( parent_view_number, event_sender.clone(), - event_receiver.clone(), + event_receiver.activate_cloned(), quorum_membership, consensus.clone(), public_key.clone(), diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index 04abda0669..29f78edf6b 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; -use async_broadcast::{Receiver, Sender}; +use async_broadcast::{InactiveReceiver, Sender}; use async_compatibility_layer::art::{async_sleep, async_spawn}; use async_lock::RwLock; use hotshot_task::{ @@ -69,7 +69,7 @@ pub struct ProposalDependencyHandle { pub sender: Sender>>, /// The event receiver. - pub receiver: Receiver>>, + pub receiver: InactiveReceiver>>, /// Immutable instance state pub instance_state: Arc, @@ -252,6 +252,7 @@ impl HandleDepOutput for ProposalDependencyHandle< #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)] async fn handle_dep_result(self, res: Self::Output) { let high_qc_view_number = self.consensus.read().await.high_qc().view_number; + let event_receiver = self.receiver.activate_cloned(); if !self .consensus .read() @@ -262,16 +263,16 @@ impl HandleDepOutput for ProposalDependencyHandle< // The proposal for the high qc view is missing, try to get it asynchronously let membership = Arc::clone(&self.quorum_membership); let event_sender = self.sender.clone(); - let event_receiver = self.receiver.clone(); let sender_public_key = self.public_key.clone(); let sender_private_key = self.private_key.clone(); let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)); let upgrade_lock = self.upgrade_lock.clone(); + let rx = event_receiver.clone(); async_spawn(async move { fetch_proposal( high_qc_view_number, event_sender, - event_receiver, + rx, membership, consensus, sender_public_key, @@ -282,7 +283,7 @@ impl HandleDepOutput for ProposalDependencyHandle< }); // Block on receiving the event from the event stream. EventDependency::new( - self.receiver.clone(), + event_receiver, Box::new(move |event| { let event = event.as_ref(); if let HotShotEvent::ValidatedStateUpdated(view_number, _) = event { diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index 022b608698..65d905b47b 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -4,7 +4,7 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -use std::{collections::HashMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use async_broadcast::{Receiver, Sender}; use async_lock::RwLock; @@ -12,6 +12,7 @@ use async_lock::RwLock; use async_std::task::JoinHandle; use async_trait::async_trait; use either::Either; +use futures::future::join_all; use hotshot_task::{ dependency::{AndDependency, EventDependency, OrDependency}, dependency_task::DependencyTask, @@ -49,7 +50,7 @@ pub struct QuorumProposalTaskState pub latest_proposed_view: TYPES::View, /// Table for the in-progress proposal dependency tasks. - pub proposal_dependencies: HashMap>, + pub proposal_dependencies: BTreeMap>, /// The underlying network pub network: Arc, @@ -321,7 +322,7 @@ impl, V: Versions> latest_proposed_view: self.latest_proposed_view, view_number, sender: event_sender, - receiver: event_receiver, + receiver: event_receiver.deactivate(), quorum_membership: Arc::clone(&self.quorum_membership), public_key: self.public_key.clone(), private_key: self.private_key.clone(), @@ -533,10 +534,23 @@ impl, V: Versions> Arc::clone(&event), )?; } + HotShotEvent::ViewChange(view) | HotShotEvent::Timeout(view) => { + self.cancel_tasks(*view).await; + } _ => {} } Ok(()) } + /// Cancel all tasks the consensus tasks has spawned before the given view + pub async fn cancel_tasks(&mut self, view: TYPES::View) { + let keep = self.proposal_dependencies.split_off(&view); + let mut cancel = Vec::new(); + while let Some((_, task)) = self.proposal_dependencies.pop_first() { + cancel.push(cancel_task(task)); + } + self.proposal_dependencies = keep; + join_all(cancel).await; + } } #[async_trait] @@ -555,11 +569,7 @@ impl, V: Versions> TaskState } async fn cancel_subtasks(&mut self) { - for handle in self - .proposal_dependencies - .drain() - .map(|(_view, handle)| handle) - { + while let Some((_, handle)) = self.proposal_dependencies.pop_first() { #[cfg(async_executor_impl = "async-std")] handle.cancel().await; #[cfg(async_executor_impl = "tokio")] diff --git a/crates/task-impls/src/quorum_proposal_recv/mod.rs b/crates/task-impls/src/quorum_proposal_recv/mod.rs index 7c3ab2ed24..c1f10a038e 100644 --- a/crates/task-impls/src/quorum_proposal_recv/mod.rs +++ b/crates/task-impls/src/quorum_proposal_recv/mod.rs @@ -169,7 +169,6 @@ impl, V: Versions> TaskState let Some((_, handles)) = self.spawned_tasks.pop_first() else { break; }; - for handle in handles { #[cfg(async_executor_impl = "async-std")] handle.cancel().await;