Skip to content

Commit

Permalink
[Stability] Fix GC for tasks (#2235)
Browse files Browse the repository at this point in the history
* remove cancelled polls from the taskmap

* remove debug prints

* merge current proposal changes
  • Loading branch information
rob-maron authored Dec 14, 2023
1 parent 6e33101 commit c436836
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 79 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ surf-disco = { workspace = true }
time = { workspace = true }
local-ip-address = "0.5.6"
dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.16" }
derive_more = "0.99.17"
portpicker = "0.1.1"

tracing = { workspace = true }
Expand Down
194 changes: 115 additions & 79 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_compatibility_layer::{
};
use async_lock::RwLock;
use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use hotshot_task::{boxed_sync, BoxSyncFuture};
use hotshot_types::{
message::{Message, MessagePurpose},
Expand Down Expand Up @@ -94,6 +95,79 @@ impl<TYPES: NodeType> WebServerNetwork<TYPES> {
}
}

/// `TaskChannel` is a type alias for an unbounded sender channel that sends `ConsensusIntentEvent`s.
///
/// This channel is used to send events to a task. The `K` type parameter is the type of the key used in the `ConsensusIntentEvent`.
///
/// # Examples
///
/// ```
/// let (tx, _rx): (TaskChannel<MyKey>, _) = tokio::sync::mpsc::unbounded_channel();
/// ```
///
/// # Note
///
/// This type alias is used in the context of a `TaskMap`, where each task is represented by a `TaskChannel`.
type TaskChannel<K> = UnboundedSender<ConsensusIntentEvent<K>>;

/// `TaskMap` is a wrapper around a `BTreeMap` that maps view numbers to tasks.
///
/// Each task is represented by a `TaskChannel` that can be used to send events to the task.
/// The key `K` is a type that implements the `SignatureKey` trait.
///
/// # Examples
///
/// ```
/// use your_crate::TaskMap;
/// let mut map: TaskMap<MyKey> = TaskMap::default();
/// ```
///
/// # Note
///
/// This struct is `Clone`, `Deref`, and `DerefMut`, so it can be used just like a `BTreeMap`.
#[derive(Debug, Clone, Deref, DerefMut)]
struct TaskMap<K: SignatureKey>(BTreeMap<u64, TaskChannel<K>>);

impl<K: SignatureKey> Default for TaskMap<K> {
fn default() -> Self {
Self(BTreeMap::default())
}
}

impl<K: SignatureKey> TaskMap<K> {
/// Prunes tasks that are polling for a view less than or equal to `current_view - 2`.
///
/// This method cancels and removes all entries in the task map that are polling for a view less than or equal to `current_view - 2`.
/// The cancellation is performed by sending a `cancel_event` to the task.
///
/// # Arguments
///
/// * `current_view` - The current view number. Tasks polling for a view less than or equal to `current_view - 2` will be pruned.
/// * `cancel_event_fn` - A function that takes a view number and returns a `ConsensusIntentEvent` to be sent to the task for cancellation.
///
/// # Examples
///
/// ```
/// let mut map: TaskMap<MyKey> = TaskMap::default();
/// map.prune_tasks(10, ConsensusIntentEvent::CancelPollForProposal).await;
/// ```
async fn prune_tasks(
&mut self,
current_view: u64,
cancel_event_fn: fn(u64) -> ConsensusIntentEvent<K>,
) {
let cutoff_view = current_view.saturating_sub(2);
let views_to_remove: Vec<_> = self.range(..cutoff_view).map(|(key, _)| *key).collect();

for view in views_to_remove {
let task = self.remove(&view);
if let Some(task) = task {
let _ = task.send(cancel_event_fn(view)).await;
}
}
}
}

/// Represents the core of web server networking
#[derive(Debug)]
struct Inner<TYPES: NodeType> {
Expand All @@ -118,26 +192,19 @@ struct Inner<TYPES: NodeType> {
tx_index: Arc<RwLock<u64>>,

/// Task map for quorum proposals.
proposal_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
proposal_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
vote_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
vid_disperse_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for DACs.
dac_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
dac_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for view sync certificates.
view_sync_cert_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
view_sync_cert_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for view sync votes.
view_sync_vote_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
view_sync_vote_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task map for transactions
txn_task_map:
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
txn_task_map: Arc<RwLock<TaskMap<TYPES::SignatureKey>>>,
/// Task polling for current propsal
current_proposal_task:
Arc<RwLock<Option<UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Expand Down Expand Up @@ -802,15 +869,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForProposal(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForProposal)
.await;
}
ConsensusIntentEvent::PollForVIDDisperse(view_number) => {
// Check if we already have a task for this (we shouldn't)
Expand Down Expand Up @@ -840,15 +902,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVIDDisperse)
.await;
}
ConsensusIntentEvent::PollForCurrentProposal => {
let mut proposal_task = self.inner.current_proposal_task.write().await;
Expand Down Expand Up @@ -899,15 +956,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVotes(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForVotes)
.await;
}

ConsensusIntentEvent::PollForDAC(view_number) => {
Expand All @@ -934,15 +986,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForDAC(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForDAC)
.await;
}

ConsensusIntentEvent::CancelPollForVotes(view_number) => {
Expand Down Expand Up @@ -986,17 +1033,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncCertificate(
*view,
))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(
view_number,
ConsensusIntentEvent::CancelPollForViewSyncCertificate,
)
.await;
}
ConsensusIntentEvent::PollForViewSyncVotes(view_number) => {
let mut task_map = self.inner.view_sync_vote_task_map.write().await;
Expand Down Expand Up @@ -1026,15 +1069,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncVotes(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(
view_number,
ConsensusIntentEvent::CancelPollForViewSyncVotes,
)
.await;
}

ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number) => {
Expand Down Expand Up @@ -1089,15 +1130,10 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForTransactions(*view))
.await;
}
// Cancel old, stale tasks
task_map
.prune_tasks(view_number, ConsensusIntentEvent::CancelPollForTransactions)
.await;
}
ConsensusIntentEvent::CancelPollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
Expand Down

0 comments on commit c436836

Please sign in to comment.