From e83aba5534393a1df6bcdfff7e303223728b1a7d Mon Sep 17 00:00:00 2001 From: pls148 <184445976+pls148@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:14:36 -0700 Subject: [PATCH] push task_state as QuorumProposalRecv task state into helper functions, rename consensus2 --- crates/hotshot/src/lib.rs | 2 - crates/hotshot/src/tasks/mod.rs | 4 +- crates/hotshot/src/tasks/task_state.rs | 4 +- .../src/{consensus2 => consensus}/handlers.rs | 12 +-- .../src/{consensus2 => consensus}/mod.rs | 8 +- crates/task-impls/src/helpers.rs | 81 ++++++++----------- crates/task-impls/src/lib.rs | 2 +- .../src/quorum_proposal_recv/handlers.rs | 30 ++----- crates/testing/src/view_generator.rs | 2 +- .../tests_1/upgrade_task_with_proposal.rs | 2 +- .../tests/tests_1/upgrade_task_with_vote.rs | 2 +- justfile | 2 +- 12 files changed, 59 insertions(+), 92 deletions(-) rename crates/task-impls/src/{consensus2 => consensus}/handlers.rs (96%) rename crates/task-impls/src/{consensus2 => consensus}/mod.rs (95%) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index eb63cb630b..577318befb 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -350,8 +350,6 @@ impl, V: Versions> SystemContext, { use hotshot_task_impls::{ - consensus2::Consensus2TaskState, quorum_proposal::QuorumProposalTaskState, + consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState, }; handle.add_task(QuorumProposalTaskState::::create_from(handle).await); handle.add_task(QuorumVoteTaskState::::create_from(handle).await); handle.add_task(QuorumProposalRecvTaskState::::create_from(handle).await); - handle.add_task(Consensus2TaskState::::create_from(handle).await); + handle.add_task(ConsensusTaskState::::create_from(handle).await); } #[cfg(feature = "rewind")] diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index 1946912cd9..50d08221c4 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -13,7 +13,7 @@ use async_compatibility_layer::art::async_spawn; use async_trait::async_trait; use chrono::Utc; use hotshot_task_impls::{ - builder::BuilderClient, consensus2::Consensus2TaskState, da::DaTaskState, + builder::BuilderClient, consensus::ConsensusTaskState, da::DaTaskState, quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState, transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState, @@ -299,7 +299,7 @@ impl, V: Versions> CreateTaskState #[async_trait] impl, V: Versions> CreateTaskState - for Consensus2TaskState + for ConsensusTaskState { async fn create_from(handle: &SystemContextHandle) -> Self { let consensus = handle.hotshot.consensus(); diff --git a/crates/task-impls/src/consensus2/handlers.rs b/crates/task-impls/src/consensus/handlers.rs similarity index 96% rename from crates/task-impls/src/consensus2/handlers.rs rename to crates/task-impls/src/consensus/handlers.rs index c766334471..cc0fa65a41 100644 --- a/crates/task-impls/src/consensus2/handlers.rs +++ b/crates/task-impls/src/consensus/handlers.rs @@ -21,9 +21,9 @@ use hotshot_types::{ }; use tracing::{debug, error, instrument}; -use super::Consensus2TaskState; +use super::ConsensusTaskState; use crate::{ - consensus2::Versions, + consensus::Versions, events::HotShotEvent, helpers::{broadcast_event, cancel_task}, vote_collection::handle_vote, @@ -38,7 +38,7 @@ pub(crate) async fn handle_quorum_vote_recv< vote: &QuorumVote, event: Arc>, sender: &Sender>>, - task_state: &mut Consensus2TaskState, + task_state: &mut ConsensusTaskState, ) -> Result<()> { // Are we the leader for this view? ensure!( @@ -73,7 +73,7 @@ pub(crate) async fn handle_timeout_vote_recv< vote: &TimeoutVote, event: Arc>, sender: &Sender>>, - task_state: &mut Consensus2TaskState, + task_state: &mut ConsensusTaskState, ) -> Result<()> { // Are we the leader for this view? ensure!( @@ -108,7 +108,7 @@ pub(crate) async fn handle_view_change< >( new_view_number: TYPES::Time, sender: &Sender>>, - task_state: &mut Consensus2TaskState, + task_state: &mut ConsensusTaskState, ) -> Result<()> { ensure!( new_view_number > task_state.cur_view, @@ -205,7 +205,7 @@ pub(crate) async fn handle_view_change< pub(crate) async fn handle_timeout, V: Versions>( view_number: TYPES::Time, sender: &Sender>>, - task_state: &mut Consensus2TaskState, + task_state: &mut ConsensusTaskState, ) -> Result<()> { ensure!( task_state.cur_view < view_number, diff --git a/crates/task-impls/src/consensus2/mod.rs b/crates/task-impls/src/consensus/mod.rs similarity index 95% rename from crates/task-impls/src/consensus2/mod.rs rename to crates/task-impls/src/consensus/mod.rs index 7b139842f9..82610978a6 100644 --- a/crates/task-impls/src/consensus2/mod.rs +++ b/crates/task-impls/src/consensus/mod.rs @@ -37,7 +37,7 @@ use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap}; mod handlers; /// Task state for the Consensus task. -pub struct Consensus2TaskState, V: Versions> { +pub struct ConsensusTaskState, V: Versions> { /// Our public key pub public_key: TYPES::SignatureKey, @@ -96,9 +96,9 @@ pub struct Consensus2TaskState, V: /// Lock for a decided upgrade pub upgrade_lock: UpgradeLock, } -impl, V: Versions> Consensus2TaskState { +impl, V: Versions> ConsensusTaskState { /// Handles a consensus event received on the event stream - #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, last_decided_view = *self.last_decided_view), name = "Consensus replica task", level = "error", target = "Consensus2TaskState")] + #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, last_decided_view = *self.last_decided_view), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")] pub async fn handle( &mut self, event: Arc>, @@ -151,7 +151,7 @@ impl, V: Versions> Consensus2TaskS #[async_trait] impl, V: Versions> TaskState - for Consensus2TaskState + for ConsensusTaskState { type Event = HotShotEvent; diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 6390c360aa..199538e981 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -41,7 +41,7 @@ use hotshot_types::{ use tokio::task::JoinHandle; use tracing::{debug, info, instrument, warn}; -use crate::{events::HotShotEvent, request::REQUEST_TIMEOUT}; +use crate::{events::HotShotEvent, request::REQUEST_TIMEOUT, quorum_proposal_recv::QuorumProposalRecvTaskState}; /// Trigger a request to the network for a proposal for a view and wait for the response or timeout. #[instrument(skip_all)] @@ -436,11 +436,8 @@ pub(crate) async fn parent_leaf_and_state( /// /// # Errors /// If any validation or state update fails. -/// TODO - This should just take the QuorumProposalRecv task state after -/// we merge the dependency tasks. -#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_lines)] -#[instrument(skip_all, fields(id = id, view = *proposal.data.view_number()))] +#[instrument(skip_all, fields(id = task_state.id, view = *proposal.data.view_number()))] pub async fn validate_proposal_safety_and_liveness< TYPES: NodeType, I: NodeImplementation, @@ -448,21 +445,15 @@ pub async fn validate_proposal_safety_and_liveness< >( proposal: Proposal>, parent_leaf: Leaf, - consensus: OuterConsensus, - decided_upgrade_certificate: Arc>>>, - quorum_membership: Arc, + task_state: &mut QuorumProposalRecvTaskState, event_stream: Sender>>, sender: TYPES::SignatureKey, - event_sender: Sender>, - id: u64, - upgrade_lock: UpgradeLock, - storage: Arc>, ) -> Result<()> { let view_number = proposal.data.view_number(); let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data); ensure!( - proposed_leaf.parent_commitment() == parent_leaf.commit(&upgrade_lock).await, + proposed_leaf.parent_commitment() == parent_leaf.commit(&task_state.upgrade_lock).await, "Proposed leaf does not extend the parent leaf." ); @@ -471,19 +462,19 @@ pub async fn validate_proposal_safety_and_liveness< ); let view = View { view_inner: ViewInner::Leaf { - leaf: proposed_leaf.commit(&upgrade_lock).await, + leaf: proposed_leaf.commit(&task_state.upgrade_lock).await, state, delta: None, // May be updated to `Some` in the vote task. }, }; { - let mut consensus_write = consensus.write().await; + let mut consensus_write = task_state.consensus.write().await; if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) { tracing::trace!("{e:?}"); } consensus_write - .update_saved_leaves(proposed_leaf.clone(), &upgrade_lock) + .update_saved_leaves(proposed_leaf.clone(), &task_state.upgrade_lock) .await; // Update our internal storage of the proposal. The proposal is valid, so @@ -502,14 +493,14 @@ pub async fn validate_proposal_safety_and_liveness< UpgradeCertificate::validate( &proposal.data.upgrade_certificate, - &quorum_membership, - &upgrade_lock, + &task_state.quorum_membership, + &task_state.upgrade_lock, ) .await?; // Validate that the upgrade certificate is re-attached, if we saw one on the parent proposed_leaf - .extends_upgrade(&parent_leaf, &decided_upgrade_certificate) + .extends_upgrade(&parent_leaf, &task_state.upgrade_lock.decided_upgrade_certificate) .await?; let justify_qc = proposal.data.justify_qc.clone(); @@ -518,7 +509,7 @@ pub async fn validate_proposal_safety_and_liveness< // Liveness check. { - let read_consensus = consensus.read().await; + let read_consensus = task_state.consensus.read().await; let liveness_check = justify_qc.view_number() > read_consensus.locked_view(); // Safety check. @@ -542,7 +533,7 @@ pub async fn validate_proposal_safety_and_liveness< view_number, event: EventType::Error { error: Arc::new(e) }, }, - &event_sender, + &task_state.output_event_stream, ) .await; } @@ -553,7 +544,7 @@ pub async fn validate_proposal_safety_and_liveness< // Update our persistent storage of the proposal. If we cannot store the proposal reutrn // and error so we don't vote - storage.write().await.append_proposal(&proposal).await?; + task_state.storage.write().await.append_proposal(&proposal).await?; // We accept the proposal, notify the application layer broadcast_event( @@ -564,7 +555,7 @@ pub async fn validate_proposal_safety_and_liveness< sender, }, }, - &event_sender, + &task_state.output_event_stream, ) .await; @@ -665,25 +656,23 @@ pub async fn validate_proposal_view_and_certs( /// /// # Errors /// Returns an [`anyhow::Error`] when the new view is not greater than the current view. -/// TODO: Remove args when we merge dependency tasks. -#[allow(clippy::too_many_arguments)] -pub(crate) async fn update_view( +pub(crate) async fn update_view< + TYPES: NodeType, + I: NodeImplementation, + V: Versions, +>( new_view: TYPES::Time, event_stream: &Sender>>, - timeout: u64, - consensus: OuterConsensus, - cur_view: &mut TYPES::Time, - cur_view_time: &mut i64, - timeout_task: &mut JoinHandle<()>, - output_event_stream: &Sender>, - is_old_view_leader: bool, + task_state: &mut QuorumProposalRecvTaskState, ) -> Result<()> { + ensure!( - new_view > *cur_view, + new_view > task_state.cur_view, "New view is not greater than our current view" ); - let old_view = *cur_view; + let is_old_view_leader = task_state.quorum_membership.leader(task_state.cur_view) == task_state.public_key; + let old_view = task_state.cur_view; debug!("Updating view from {} to {}", *old_view, *new_view); @@ -691,10 +680,10 @@ pub(crate) async fn update_view( info!("Progress: entered view {:>6}", *new_view); } - *cur_view = new_view; + task_state.cur_view = new_view; // The next view is just the current view + 1 - let next_view = *cur_view + 1; + let next_view = task_state.cur_view + 1; futures::join! { broadcast_event(Arc::new(HotShotEvent::ViewChange(new_view)), event_stream), @@ -705,7 +694,7 @@ pub(crate) async fn update_view( view_number: old_view, }, }, - output_event_stream, + &task_state.output_event_stream, ) }; @@ -715,7 +704,7 @@ pub(crate) async fn update_view( // Nuance: We timeout on the view + 1 here because that means that we have // not seen evidence to transition to this new view let view_number = next_view; - let timeout = Duration::from_millis(timeout); + let timeout = Duration::from_millis(task_state.timeout); async move { async_sleep(timeout).await; broadcast_event( @@ -727,30 +716,30 @@ pub(crate) async fn update_view( }); // cancel the old timeout task - cancel_task(std::mem::replace(timeout_task, new_timeout_task)).await; + cancel_task(std::mem::replace(&mut task_state.timeout_task, new_timeout_task)).await; - let consensus = consensus.upgradable_read().await; + let consensus = task_state.consensus.upgradable_read().await; consensus .metrics .current_view - .set(usize::try_from(cur_view.u64()).unwrap()); + .set(usize::try_from(task_state.cur_view.u64()).unwrap()); let new_view_time = Utc::now().timestamp(); if is_old_view_leader { #[allow(clippy::cast_precision_loss)] consensus .metrics .view_duration_as_leader - .add_point((new_view_time - *cur_view_time) as f64); + .add_point((new_view_time - task_state.cur_view_time) as f64); } - *cur_view_time = new_view_time; + task_state.cur_view_time = new_view_time; // Do the comparison before the subtraction to avoid potential overflow, since // `last_decided_view` may be greater than `cur_view` if the node is catching up. - if usize::try_from(cur_view.u64()).unwrap() + if usize::try_from(task_state.cur_view.u64()).unwrap() > usize::try_from(consensus.last_decided_view().u64()).unwrap() { consensus.metrics.number_of_views_since_last_decide.set( - usize::try_from(cur_view.u64()).unwrap() + usize::try_from(task_state.cur_view.u64()).unwrap() - usize::try_from(consensus.last_decided_view().u64()).unwrap(), ); } diff --git a/crates/task-impls/src/lib.rs b/crates/task-impls/src/lib.rs index 754d2a972e..1f55bd9146 100644 --- a/crates/task-impls/src/lib.rs +++ b/crates/task-impls/src/lib.rs @@ -8,7 +8,7 @@ //! consensus in an event driven way /// The task which implements the core state logic of consensus. -pub mod consensus2; +pub mod consensus; /// The task which handles the logic for the quorum vote. pub mod quorum_vote; diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs index 37b7ba5d12..873ee0d880 100644 --- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs +++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs @@ -93,17 +93,10 @@ async fn validate_proposal_liveness( + if let Err(e) = update_view::( view_number, event_sender, - task_state.timeout, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - &mut task_state.cur_view, - &mut task_state.cur_view_time, - &mut task_state.timeout_task, - &task_state.output_event_stream, - task_state.quorum_membership.leader(cur_view) == task_state.public_key, + task_state, ) .await { @@ -138,7 +131,6 @@ pub(crate) async fn handle_quorum_proposal_recv< task_state: &mut QuorumProposalRecvTaskState, ) -> Result<()> { let quorum_proposal_sender_key = quorum_proposal_sender_key.clone(); - let cur_view = task_state.cur_view; validate_proposal_view_and_certs( proposal, @@ -250,29 +242,17 @@ pub(crate) async fn handle_quorum_proposal_recv< validate_proposal_safety_and_liveness::( proposal.clone(), parent_leaf, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate), - Arc::clone(&task_state.quorum_membership), + task_state, event_sender.clone(), quorum_proposal_sender_key, - task_state.output_event_stream.clone(), - task_state.id, - task_state.upgrade_lock.clone(), - Arc::clone(&task_state.storage), ) .await?; // NOTE: We could update our view with a valid TC but invalid QC, but that is not what we do here - if let Err(e) = update_view::( + if let Err(e) = update_view::( view_number, event_sender, - task_state.timeout, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - &mut task_state.cur_view, - &mut task_state.cur_view_time, - &mut task_state.timeout_task, - &task_state.output_event_stream, - task_state.quorum_membership.leader(cur_view) == task_state.public_key, + task_state, ) .await { diff --git a/crates/testing/src/view_generator.rs b/crates/testing/src/view_generator.rs index e574859669..315ac2d5e8 100644 --- a/crates/testing/src/view_generator.rs +++ b/crates/testing/src/view_generator.rs @@ -63,7 +63,7 @@ pub struct TestView { pub da_certificate: DaCertificate, pub transactions: Vec, upgrade_data: Option>, - pub formed_upgrade_certificate: Option>, + formed_upgrade_certificate: Option>, view_sync_finalize_data: Option>, timeout_cert_data: Option>, upgrade_lock: UpgradeLock, diff --git a/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs b/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs index 74ef7800e1..5bf95997c0 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs @@ -17,7 +17,7 @@ use hotshot_example_types::{ }; use hotshot_macros::{run_test, test_scripts}; use hotshot_task_impls::{ - consensus2::Consensus2TaskState, events::HotShotEvent::*, + consensus::ConsensusTaskState, events::HotShotEvent::*, quorum_proposal::QuorumProposalTaskState, upgrade::UpgradeTaskState, }; use hotshot_testing::{ diff --git a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs index d7bdb28dad..88112683a5 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs @@ -17,7 +17,7 @@ use hotshot_example_types::{ }; use hotshot_macros::{run_test, test_scripts}; use hotshot_task_impls::{ - consensus2::Consensus2TaskState, events::HotShotEvent::*, quorum_vote::QuorumVoteTaskState, + consensus::ConsensusTaskState, events::HotShotEvent::*, quorum_vote::QuorumVoteTaskState, upgrade::UpgradeTaskState, }; use hotshot_testing::{ diff --git a/justfile b/justfile index 64bf115ae1..c067269f75 100644 --- a/justfile +++ b/justfile @@ -145,7 +145,7 @@ test_quorum_proposal_recv_task: cargo test --lib --bins --tests --benches --workspace --no-fail-fast test_quorum_proposal_recv_task -- --test-threads=1 --nocapture test_upgrade_task: - echo Testing the upgrade task without dependency tasks + echo Testing the upgrade task cargo test --lib --bins --tests --benches --workspace --no-fail-fast test_upgrade_task -- --test-threads=1 --nocapture test_pkg := "hotshot"