diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index 8fc85a6031..4aea9b5ec8 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -100,7 +100,7 @@ pub enum HotShotEvent { /// 2. The proposal has been correctly signed by the leader of the current view /// 3. The justify QC is valid /// 4. The proposal passes either liveness or safety check. - QuorumProposalValidated(QuorumProposal, Leaf), + QuorumProposalValidated(Proposal>, Leaf), /// A quorum proposal is missing for a view that we need. QuorumProposalRequestSend( ProposalRequestPayload, @@ -267,9 +267,14 @@ impl HotShotEvent { Some(v.view_number()) } HotShotEvent::QuorumProposalRecv(proposal, _) - | HotShotEvent::QuorumProposalSend(proposal, _) => Some(proposal.data.view_number()), + | HotShotEvent::QuorumProposalSend(proposal, _) + | HotShotEvent::QuorumProposalValidated(proposal, _) + | HotShotEvent::QuorumProposalResponseSend(_, proposal) + | HotShotEvent::QuorumProposalResponseRecv(proposal) + | HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => { + Some(proposal.data.view_number()) + } HotShotEvent::QuorumVoteSend(vote) => Some(vote.view_number()), - HotShotEvent::QuorumProposalValidated(proposal, _) => Some(proposal.view_number()), HotShotEvent::DaProposalRecv(proposal, _) | HotShotEvent::DaProposalValidated(proposal, _) | HotShotEvent::DaProposalSend(proposal, _) => Some(proposal.data.view_number()), @@ -311,11 +316,6 @@ impl HotShotEvent { } HotShotEvent::QuorumProposalRequestSend(req, _) | HotShotEvent::QuorumProposalRequestRecv(req, _) => Some(req.view_number), - HotShotEvent::QuorumProposalResponseSend(_, proposal) - | HotShotEvent::QuorumProposalResponseRecv(proposal) - | HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => { - Some(proposal.data.view_number()) - } HotShotEvent::QuorumVoteDependenciesValidated(view_number) | HotShotEvent::ViewChange(view_number) | HotShotEvent::ViewSyncTimeout(view_number, _, _) @@ -398,7 +398,7 @@ impl Display for HotShotEvent { HotShotEvent::QuorumProposalValidated(proposal, _) => write!( f, "QuorumProposalValidated(view_number={:?})", - proposal.view_number() + proposal.data.view_number() ), HotShotEvent::DaProposalSend(proposal, _) => write!( f, diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 07b2a3604c..3c2d5c423a 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -30,7 +30,6 @@ use hotshot_types::{ election::Membership, node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions}, signature_key::SignatureKey, - storage::Storage, BlockPayload, ValidatedState, }, utils::{Terminator, View, ViewInner}, @@ -119,6 +118,9 @@ pub(crate) async fn fetch_proposal( } } + } else { + // If the dep returns early return none + return None; } } @@ -546,17 +548,6 @@ pub async fn validate_proposal_safety_and_liveness< }); } - // Update our persistent storage of the proposal. If we cannot store the proposal reutrn - // and error so we don't vote - task_state - .storage - .write() - .await - .append_proposal(&proposal) - .await - .wrap() - .context(error!("Failed to append proposal in storage!"))?; - // We accept the proposal, notify the application layer broadcast_event( Event { @@ -573,7 +564,7 @@ pub async fn validate_proposal_safety_and_liveness< // Notify other tasks broadcast_event( Arc::new(HotShotEvent::QuorumProposalValidated( - proposal.data.clone(), + proposal.clone(), parent_leaf, )), &event_stream, diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs index a1800a8178..488854e95d 100644 --- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs +++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use async_broadcast::{broadcast, Receiver, Sender}; +use async_compatibility_layer::art::async_spawn; use async_lock::RwLockUpgradableReadGuard; use committable::Committable; use hotshot_types::{ @@ -19,6 +20,7 @@ use hotshot_types::{ traits::{ election::Membership, node_implementation::{NodeImplementation, NodeType}, + signature_key::SignatureKey, storage::Storage, ValidatedState, }, @@ -104,6 +106,35 @@ async fn validate_proposal_liveness( + view: TYPES::View, + event_sender: Sender>>, + event_receiver: Receiver>>, + membership: Arc, + consensus: OuterConsensus, + sender_public_key: TYPES::SignatureKey, + sender_private_key: ::PrivateKey, + upgrade_lock: UpgradeLock, +) { + async_spawn(async move { + let lock = upgrade_lock; + + let _ = fetch_proposal( + view, + event_sender, + event_receiver, + membership, + consensus, + sender_public_key, + sender_private_key, + &lock, + ) + .await; + }); +} + /// Handles the `QuorumProposalRecv` event by first validating the cert itself for the view, and then /// updating the states, which runs when the proposal cannot be found in the internal state map. /// @@ -155,7 +186,7 @@ pub(crate) async fn handle_quorum_proposal_recv< .await; // Get the parent leaf and state. - let mut parent_leaf = task_state + let parent_leaf = task_state .consensus .read() .await @@ -163,9 +194,8 @@ pub(crate) async fn handle_quorum_proposal_recv< .get(&justify_qc.data.leaf_commit) .cloned(); - parent_leaf = match parent_leaf { - Some(p) => Some(p), - None => fetch_proposal( + if parent_leaf.is_none() { + spawn_fetch_proposal( justify_qc.view_number(), event_sender.clone(), event_receiver.clone(), @@ -176,11 +206,9 @@ pub(crate) async fn handle_quorum_proposal_recv< // incorrectly. task_state.public_key.clone(), task_state.private_key.clone(), - &task_state.upgrade_lock, - ) - .await - .ok(), - }; + task_state.upgrade_lock.clone(), + ); + } let consensus_reader = task_state.consensus.read().await; let parent = match parent_leaf { diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index a25b6d2213..d94e4cc105 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -285,7 +285,7 @@ impl + 'static, V: Versions> Handl match event.as_ref() { #[allow(unused_assignments)] HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => { - let proposal_payload_comm = proposal.block_header.payload_commitment(); + let proposal_payload_comm = proposal.data.block_header.payload_commitment(); if let Some(ref comm) = payload_commitment { if proposal_payload_comm != *comm { tracing::error!("Quorum proposal has inconsistent payload commitment with DAC or VID."); @@ -295,11 +295,17 @@ impl + 'static, V: Versions> Handl payload_commitment = Some(proposal_payload_comm); } let parent_commitment = parent_leaf.commit(&self.upgrade_lock).await; - let proposed_leaf = Leaf::from_quorum_proposal(proposal); + let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data); if proposed_leaf.parent_commitment() != parent_commitment { tracing::warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote."); return; } + // Update our persistent storage of the proposal. If we cannot store the proposal reutrn + // and error so we don't vote + if let Err(e) = self.storage.write().await.append_proposal(proposal).await { + tracing::error!("failed to store proposal, not voting. error = {e:#}"); + return; + } leaf = Some(proposed_leaf); } HotShotEvent::DaCertificateValidated(cert) => { @@ -424,7 +430,7 @@ impl, V: Versions> QuorumVoteTaskS let event_view = match dependency_type { VoteDependency::QuorumProposal => { if let HotShotEvent::QuorumProposalValidated(proposal, _) = event { - proposal.view_number + proposal.data.view_number } else { return false; } @@ -549,11 +555,14 @@ impl, V: Versions> QuorumVoteTaskS match event.as_ref() { HotShotEvent::QuorumProposalValidated(proposal, _leaf) => { let cur_epoch = self.consensus.read().await.cur_epoch(); - tracing::trace!("Received Proposal for view {}", *proposal.view_number()); + tracing::trace!( + "Received Proposal for view {}", + *proposal.data.view_number() + ); // Handle the event before creating the dependency task. if let Err(e) = - handle_quorum_proposal_validated(proposal, &event_sender, self).await + handle_quorum_proposal_validated(&proposal.data, &event_sender, self).await { tracing::debug!( "Failed to handle QuorumProposalValidated event; error = {e:#}" @@ -561,7 +570,7 @@ impl, V: Versions> QuorumVoteTaskS } self.create_dependency_task_if_new( - proposal.view_number, + proposal.data.view_number, cur_epoch, event_receiver, &event_sender, diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index 1344c31ee6..6964e38cd7 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -96,7 +96,7 @@ impl> TaskState for NetworkRequest ) -> Result<()> { match event.as_ref() { HotShotEvent::QuorumProposalValidated(proposal, _) => { - let prop_view = proposal.view_number(); + let prop_view = proposal.data.view_number(); let cur_epoch = self.consensus.read().await.cur_epoch(); // If we already have the VID shares for the next view, do nothing. diff --git a/crates/testing/src/byzantine/byzantine_behaviour.rs b/crates/testing/src/byzantine/byzantine_behaviour.rs index 825108e48b..e92e33cdd4 100644 --- a/crates/testing/src/byzantine/byzantine_behaviour.rs +++ b/crates/testing/src/byzantine/byzantine_behaviour.rs @@ -186,7 +186,7 @@ impl + std::fmt::Debug, V: Version ]; } HotShotEvent::QuorumProposalValidated(proposal, _) => { - self.validated_proposals.push(proposal.clone()); + self.validated_proposals.push(proposal.data.clone()); } _ => {} } diff --git a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs index 55ef973f9d..d5a83a1953 100644 --- a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs +++ b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs @@ -111,7 +111,7 @@ async fn test_quorum_proposal_recv_task() { .await, )), exact(QuorumProposalValidated( - proposals[1].data.clone(), + proposals[1].clone(), leaves[0].clone(), )), exact(ViewChange(ViewNumber::new(2))), diff --git a/crates/testing/tests/tests_1/quorum_vote_task.rs b/crates/testing/tests/tests_1/quorum_vote_task.rs index e981ed9bda..3030a1aea2 100644 --- a/crates/testing/tests/tests_1/quorum_vote_task.rs +++ b/crates/testing/tests/tests_1/quorum_vote_task.rs @@ -75,7 +75,7 @@ async fn test_quorum_vote_task_success() { // Send the quorum proposal, DAC, VID share data, and validated state, in which case a dummy // vote can be formed and the view number will be updated. let inputs = vec![random![ - QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()), + QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()), DaCertificateRecv(dacs[1].clone()), VidShareRecv(leaders[1], vids[1].0[0].clone()), ]]; @@ -150,11 +150,11 @@ async fn test_quorum_vote_task_miss_dependency() { // Send two of quorum proposal, DAC, VID share data, in which case there's no vote. let inputs = vec![ random![ - QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()), + QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()), VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())), ], random![ - QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()), + QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()), DaCertificateRecv(dacs[2].clone()), ], random![ @@ -223,7 +223,7 @@ async fn test_quorum_vote_task_incorrect_dependency() { // Send the correct quorum proposal and DAC, and incorrect VID share data. let inputs = vec![random![ - QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()), + QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()), DaCertificateRecv(dacs[1].clone()), VidShareRecv(leaders[0], vids[0].0[0].clone()), ]]; diff --git a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs index 88112683a5..212ca114bb 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs @@ -108,27 +108,27 @@ async fn test_upgrade_task_with_vote() { let inputs = vec![ random![ - QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()), + QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()), DaCertificateRecv(dacs[1].clone()), VidShareRecv(leaders[1], vids[1].0[0].clone()), ], random![ - QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()), + QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()), DaCertificateRecv(dacs[2].clone()), VidShareRecv(leaders[2], vids[2].0[0].clone()), ], random![ - QuorumProposalValidated(proposals[3].data.clone(), leaves[2].clone()), + QuorumProposalValidated(proposals[3].clone(), leaves[2].clone()), DaCertificateRecv(dacs[3].clone()), VidShareRecv(leaders[3], vids[3].0[0].clone()), ], random![ - QuorumProposalValidated(proposals[4].data.clone(), leaves[3].clone()), + QuorumProposalValidated(proposals[4].clone(), leaves[3].clone()), DaCertificateRecv(dacs[4].clone()), VidShareRecv(leaders[4], vids[4].0[0].clone()), ], random![QuorumProposalValidated( - proposals[5].data.clone(), + proposals[5].clone(), leaves[5].clone() ),], ]; diff --git a/crates/testing/tests/tests_1/vote_dependency_handle.rs b/crates/testing/tests/tests_1/vote_dependency_handle.rs index a65d360428..085e37862a 100644 --- a/crates/testing/tests/tests_1/vote_dependency_handle.rs +++ b/crates/testing/tests/tests_1/vote_dependency_handle.rs @@ -69,7 +69,7 @@ async fn test_vote_dependency_handle() { // the dependency handles do not (yet) work with the existing test suite. let all_inputs = vec![ DaCertificateValidated(dacs[1].clone()), - QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()), + QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()), VidShareValidated(vids[1].0[0].clone()), ] .into_iter()