Skip to content

Commit

Permalink
proposal dep takes inactive receiver and cancel tasks (#3784)
Browse files Browse the repository at this point in the history
  • Loading branch information
bfish713 authored Oct 28, 2024
1 parent 5d9ed74 commit bd976bd
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 18 deletions.
2 changes: 1 addition & 1 deletion crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState

Self {
latest_proposed_view: handle.cur_view().await,
proposal_dependencies: HashMap::new(),
proposal_dependencies: BTreeMap::new(),
network: Arc::clone(&handle.hotshot.network),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
consensus: OuterConsensus::new(consensus),
Expand Down
6 changes: 3 additions & 3 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
sync::Arc,
};

use async_broadcast::{Receiver, SendError, Sender};
use async_broadcast::{InactiveReceiver, Receiver, SendError, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn, async_timeout};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
Expand Down Expand Up @@ -363,7 +363,7 @@ pub async fn decide_from_proposal<TYPES: NodeType>(
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
next_proposal_view_number: TYPES::View,
event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
event_receiver: &InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
quorum_membership: Arc<TYPES::Membership>,
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
Expand Down Expand Up @@ -391,7 +391,7 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
let _ = fetch_proposal(
parent_view_number,
event_sender.clone(),
event_receiver.clone(),
event_receiver.activate_cloned(),
quorum_membership,
consensus.clone(),
public_key.clone(),
Expand Down
11 changes: 6 additions & 5 deletions crates/task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use std::{marker::PhantomData, sync::Arc, time::Duration};

use async_broadcast::{Receiver, Sender};
use async_broadcast::{InactiveReceiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use hotshot_task::{
Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

/// The event receiver.
pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

/// Immutable instance state
pub instance_state: Arc<TYPES::InstanceState>,
Expand Down Expand Up @@ -252,6 +252,7 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
#[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
async fn handle_dep_result(self, res: Self::Output) {
let high_qc_view_number = self.consensus.read().await.high_qc().view_number;
let event_receiver = self.receiver.activate_cloned();
if !self
.consensus
.read()
Expand All @@ -262,16 +263,16 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
// The proposal for the high qc view is missing, try to get it asynchronously
let membership = Arc::clone(&self.quorum_membership);
let event_sender = self.sender.clone();
let event_receiver = self.receiver.clone();
let sender_public_key = self.public_key.clone();
let sender_private_key = self.private_key.clone();
let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
let upgrade_lock = self.upgrade_lock.clone();
let rx = event_receiver.clone();
async_spawn(async move {
fetch_proposal(
high_qc_view_number,
event_sender,
event_receiver,
rx,
membership,
consensus,
sender_public_key,
Expand All @@ -282,7 +283,7 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
});
// Block on receiving the event from the event stream.
EventDependency::new(
self.receiver.clone(),
event_receiver,
Box::new(move |event| {
let event = event.as_ref();
if let HotShotEvent::ValidatedStateUpdated(view_number, _) = event {
Expand Down
26 changes: 18 additions & 8 deletions crates/task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{collections::HashMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};

use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use either::Either;
use futures::future::join_all;
use hotshot_task::{
dependency::{AndDependency, EventDependency, OrDependency},
dependency_task::DependencyTask,
Expand Down Expand Up @@ -49,7 +50,7 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>
pub latest_proposed_view: TYPES::View,

/// Table for the in-progress proposal dependency tasks.
pub proposal_dependencies: HashMap<TYPES::View, JoinHandle<()>>,
pub proposal_dependencies: BTreeMap<TYPES::View, JoinHandle<()>>,

/// The underlying network
pub network: Arc<I::Network>,
Expand Down Expand Up @@ -321,7 +322,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
latest_proposed_view: self.latest_proposed_view,
view_number,
sender: event_sender,
receiver: event_receiver,
receiver: event_receiver.deactivate(),
quorum_membership: Arc::clone(&self.quorum_membership),
public_key: self.public_key.clone(),
private_key: self.private_key.clone(),
Expand Down Expand Up @@ -533,10 +534,23 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
Arc::clone(&event),
)?;
}
HotShotEvent::ViewChange(view) | HotShotEvent::Timeout(view) => {
self.cancel_tasks(*view).await;
}
_ => {}
}
Ok(())
}
/// Cancel all tasks the consensus tasks has spawned before the given view
pub async fn cancel_tasks(&mut self, view: TYPES::View) {
let keep = self.proposal_dependencies.split_off(&view);
let mut cancel = Vec::new();
while let Some((_, task)) = self.proposal_dependencies.pop_first() {
cancel.push(cancel_task(task));
}
self.proposal_dependencies = keep;
join_all(cancel).await;
}
}

#[async_trait]
Expand All @@ -555,11 +569,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
}

async fn cancel_subtasks(&mut self) {
for handle in self
.proposal_dependencies
.drain()
.map(|(_view, handle)| handle)
{
while let Some((_, handle)) = self.proposal_dependencies.pop_first() {
#[cfg(async_executor_impl = "async-std")]
handle.cancel().await;
#[cfg(async_executor_impl = "tokio")]
Expand Down
1 change: 0 additions & 1 deletion crates/task-impls/src/quorum_proposal_recv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
let Some((_, handles)) = self.spawned_tasks.pop_first() else {
break;
};

for handle in handles {
#[cfg(async_executor_impl = "async-std")]
handle.cancel().await;
Expand Down

0 comments on commit bd976bd

Please sign in to comment.