Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't Hold Active Receiver and Task Cleanup in Proposal Task #3784

Merged
merged 2 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState

Self {
latest_proposed_view: handle.cur_view().await,
proposal_dependencies: HashMap::new(),
proposal_dependencies: BTreeMap::new(),
network: Arc::clone(&handle.hotshot.network),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
consensus: OuterConsensus::new(consensus),
Expand Down
6 changes: 3 additions & 3 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
sync::Arc,
};

use async_broadcast::{Receiver, SendError, Sender};
use async_broadcast::{InactiveReceiver, Receiver, SendError, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn, async_timeout};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
Expand Down Expand Up @@ -363,7 +363,7 @@ pub async fn decide_from_proposal<TYPES: NodeType>(
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
next_proposal_view_number: TYPES::View,
event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
event_receiver: &InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
quorum_membership: Arc<TYPES::Membership>,
public_key: TYPES::SignatureKey,
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
Expand Down Expand Up @@ -391,7 +391,7 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
let _ = fetch_proposal(
parent_view_number,
event_sender.clone(),
event_receiver.clone(),
event_receiver.activate_cloned(),
quorum_membership,
consensus.clone(),
public_key.clone(),
Expand Down
11 changes: 6 additions & 5 deletions crates/task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use std::{marker::PhantomData, sync::Arc, time::Duration};

use async_broadcast::{Receiver, Sender};
use async_broadcast::{InactiveReceiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use hotshot_task::{
Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

/// The event receiver.
pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,

/// Immutable instance state
pub instance_state: Arc<TYPES::InstanceState>,
Expand Down Expand Up @@ -252,6 +252,7 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
#[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
async fn handle_dep_result(self, res: Self::Output) {
let high_qc_view_number = self.consensus.read().await.high_qc().view_number;
let event_receiver = self.receiver.activate_cloned();
if !self
.consensus
.read()
Expand All @@ -262,16 +263,16 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
// The proposal for the high qc view is missing, try to get it asynchronously
let membership = Arc::clone(&self.quorum_membership);
let event_sender = self.sender.clone();
let event_receiver = self.receiver.clone();
let sender_public_key = self.public_key.clone();
let sender_private_key = self.private_key.clone();
let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
let upgrade_lock = self.upgrade_lock.clone();
let rx = event_receiver.clone();
async_spawn(async move {
fetch_proposal(
high_qc_view_number,
event_sender,
event_receiver,
rx,
membership,
consensus,
sender_public_key,
Expand All @@ -282,7 +283,7 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
});
// Block on receiving the event from the event stream.
EventDependency::new(
self.receiver.clone(),
event_receiver,
Box::new(move |event| {
let event = event.as_ref();
if let HotShotEvent::ValidatedStateUpdated(view_number, _) = event {
Expand Down
26 changes: 18 additions & 8 deletions crates/task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{collections::HashMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};

use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use async_trait::async_trait;
use either::Either;
use futures::future::join_all;
use hotshot_task::{
dependency::{AndDependency, EventDependency, OrDependency},
dependency_task::DependencyTask,
Expand Down Expand Up @@ -49,7 +50,7 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>
pub latest_proposed_view: TYPES::View,

/// Table for the in-progress proposal dependency tasks.
pub proposal_dependencies: HashMap<TYPES::View, JoinHandle<()>>,
pub proposal_dependencies: BTreeMap<TYPES::View, JoinHandle<()>>,

/// The underlying network
pub network: Arc<I::Network>,
Expand Down Expand Up @@ -321,7 +322,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
latest_proposed_view: self.latest_proposed_view,
view_number,
sender: event_sender,
receiver: event_receiver,
receiver: event_receiver.deactivate(),
quorum_membership: Arc::clone(&self.quorum_membership),
public_key: self.public_key.clone(),
private_key: self.private_key.clone(),
Expand Down Expand Up @@ -533,10 +534,23 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
Arc::clone(&event),
)?;
}
HotShotEvent::ViewChange(view) | HotShotEvent::Timeout(view) => {
self.cancel_tasks(*view).await;
}
_ => {}
}
Ok(())
}
/// Cancel all tasks the consensus tasks has spawned before the given view
pub async fn cancel_tasks(&mut self, view: TYPES::View) {
let keep = self.proposal_dependencies.split_off(&view);
let mut cancel = Vec::new();
while let Some((_, task)) = self.proposal_dependencies.pop_first() {
cancel.push(cancel_task(task));
}
self.proposal_dependencies = keep;
join_all(cancel).await;
}
}

#[async_trait]
Expand All @@ -555,11 +569,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
}

async fn cancel_subtasks(&mut self) {
for handle in self
.proposal_dependencies
.drain()
.map(|(_view, handle)| handle)
{
while let Some((_, handle)) = self.proposal_dependencies.pop_first() {
#[cfg(async_executor_impl = "async-std")]
handle.cancel().await;
#[cfg(async_executor_impl = "tokio")]
Expand Down
1 change: 0 additions & 1 deletion crates/task-impls/src/quorum_proposal_recv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
let Some((_, handles)) = self.spawned_tasks.pop_first() else {
break;
};

for handle in handles {
#[cfg(async_executor_impl = "async-std")]
handle.cancel().await;
Expand Down