From f81c9ca4e63ddd083af9e5d616c92ffe1fe40c3e Mon Sep 17 00:00:00 2001 From: pls148 <184445976+pls148@users.noreply.github.com> Date: Thu, 10 Oct 2024 18:00:27 -0600 Subject: [PATCH] remove consensus, extraneous parameter for update_view, repoint deps --- crates/hotshot/src/lib.rs | 2 - crates/hotshot/src/tasks/mod.rs | 7 - crates/hotshot/src/tasks/task_state.rs | 38 +- crates/task-impls/src/consensus/handlers.rs | 891 ------------------ crates/task-impls/src/consensus/mod.rs | 744 --------------- crates/task-impls/src/helpers.rs | 29 +- crates/task-impls/src/lib.rs | 3 - .../src/quorum_proposal_recv/handlers.rs | 4 +- .../src/quorum_proposal_recv/mod.rs | 1 - crates/testing/src/predicates/event.rs | 55 ++ crates/testing/src/predicates/mod.rs | 4 - .../src/predicates/upgrade_with_consensus.rs | 61 -- crates/testing/src/view_generator.rs | 2 +- .../testing/tests/tests_1/consensus_task.rs | 698 -------------- .../tests_1/quorum_proposal_recv_task.rs | 3 - .../tests/tests_1/quorum_proposal_task.rs | 2 - .../testing/tests/tests_1/quorum_vote_task.rs | 1 - crates/testing/tests/tests_1/test_success.rs | 3 - .../tests_1/upgrade_task_with_consensus.rs | 678 ------------- .../tests_1/upgrade_task_with_proposal.rs | 1 - .../tests/tests_1/upgrade_task_with_vote.rs | 2 - .../tests/tests_1/vote_dependency_handle.rs | 2 - 22 files changed, 68 insertions(+), 3163 deletions(-) delete mode 100644 crates/task-impls/src/consensus/handlers.rs delete mode 100644 crates/task-impls/src/consensus/mod.rs delete mode 100644 crates/testing/src/predicates/upgrade_with_consensus.rs delete mode 100644 crates/testing/tests/tests_1/consensus_task.rs delete mode 100644 crates/testing/tests/tests_1/upgrade_task_with_consensus.rs diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index fa85394aeb..eb63cb630b 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -350,7 +350,6 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext, } { - #![cfg(not(feature = "dependency-tasks"))] - use hotshot_task_impls::consensus::ConsensusTaskState; - - handle.add_task(ConsensusTaskState::::create_from(handle).await); - } - { - #![cfg(feature = "dependency-tasks")] use hotshot_task_impls::{ consensus2::Consensus2TaskState, quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState, diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index c1a10f4646..c9fe9b58eb 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -13,7 +13,7 @@ use async_compatibility_layer::art::async_spawn; use async_trait::async_trait; use chrono::Utc; use hotshot_task_impls::{ - builder::BuilderClient, consensus::ConsensusTaskState, consensus2::Consensus2TaskState, + builder::BuilderClient, consensus2::Consensus2TaskState, da::DaTaskState, quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState, transactions::TransactionTaskState, @@ -214,42 +214,6 @@ impl, V: Versions> CreateTaskState } } -#[async_trait] -impl, V: Versions> CreateTaskState - for ConsensusTaskState -{ - async fn create_from(handle: &SystemContextHandle) -> Self { - let consensus = handle.hotshot.consensus(); - - Self { - consensus: OuterConsensus::new(consensus), - instance_state: handle.hotshot.instance_state(), - timeout: handle.hotshot.config.next_view_timeout, - round_start_delay: handle.hotshot.config.round_start_delay, - cur_view: handle.cur_view().await, - cur_view_time: Utc::now().timestamp(), - payload_commitment_and_metadata: None, - vote_collectors: BTreeMap::default(), - timeout_vote_collectors: BTreeMap::default(), - timeout_task: async_spawn(async {}), - spawned_tasks: BTreeMap::new(), - formed_upgrade_certificate: None, - proposal_cert: None, - output_event_stream: handle.hotshot.external_event_stream.0.clone(), - current_proposal: None, - id: handle.hotshot.id, - public_key: handle.public_key().clone(), - private_key: handle.private_key().clone(), - network: Arc::clone(&handle.hotshot.network), - timeout_membership: handle.hotshot.memberships.quorum_membership.clone().into(), - quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(), - da_membership: handle.hotshot.memberships.da_membership.clone().into(), - storage: Arc::clone(&handle.storage), - upgrade_lock: handle.hotshot.upgrade_lock.clone(), - } - } -} - #[async_trait] impl, V: Versions> CreateTaskState for QuorumVoteTaskState diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs deleted file mode 100644 index 23e3459d27..0000000000 --- a/crates/task-impls/src/consensus/handlers.rs +++ /dev/null @@ -1,891 +0,0 @@ -// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) -// This file is part of the HotShot repository. - -// You should have received a copy of the MIT License -// along with the HotShot repository. If not, see . - -use anyhow::{bail, ensure, Context, Result}; -use async_broadcast::{Receiver, Sender}; -use async_compatibility_layer::art::{async_sleep, async_spawn}; -use async_lock::RwLock; -#[cfg(async_executor_impl = "async-std")] -use async_std::task::JoinHandle; -use chrono::Utc; -use core::time::Duration; -use futures::FutureExt; -use hotshot_types::{ - consensus::{CommitmentAndMetadata, OuterConsensus, View}, - data::{null_block, Leaf, QuorumProposal, ViewChangeEvidence}, - event::{Event, EventType}, - message::{GeneralConsensusMessage, Proposal}, - simple_certificate::UpgradeCertificate, - simple_vote::QuorumData, - traits::{ - block_contents::BlockHeader, - election::Membership, - node_implementation::{ConsensusTime, NodeImplementation, NodeType}, - signature_key::SignatureKey, - states::ValidatedState, - storage::Storage, - }, - utils::ViewInner, - vote::{Certificate, HasViewNumber}, -}; -use std::{marker::PhantomData, sync::Arc}; -#[cfg(async_executor_impl = "tokio")] -use tokio::task::JoinHandle; -use tracing::{debug, error, info, instrument, warn}; -use vbs::version::StaticVersionType; - -use super::ConsensusTaskState; -use crate::{ - consensus::{UpgradeLock, Versions}, - events::HotShotEvent, - helpers::{ - broadcast_event, decide_from_proposal, fetch_proposal, parent_leaf_and_state, update_view, - validate_proposal_safety_and_liveness, validate_proposal_view_and_certs, AnyhowTracing, - SEND_VIEW_CHANGE_EVENT, - }, -}; - -/// Create the header for a proposal, build the proposal, and broadcast -/// the proposal send evnet. -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all, fields(id = id, view = *view))] -pub async fn create_and_send_proposal( - public_key: TYPES::SignatureKey, - private_key: ::PrivateKey, - consensus: OuterConsensus, - event_stream: Sender>>, - view: TYPES::Time, - commitment_and_metadata: CommitmentAndMetadata, - parent_leaf: Leaf, - state: Arc, - upgrade_cert: Option>, - proposal_cert: Option>, - round_start_delay: u64, - instance_state: Arc, - upgrade_lock: UpgradeLock, - id: u64, -) -> Result<()> { - let consensus_read = consensus.read().await; - let vid_share = consensus_read - .vid_shares() - .get(&view) - .map(|shares| shares.get(&public_key).cloned()) - .context(format!( - "Cannot propopse without our VID share, view {view:?}" - ))? - .context("Failed to get vid share")?; - drop(consensus_read); - - let version = upgrade_lock - .version(view) - .await - .context("Failed to get version number")?; - - let block_header = if version < V::Marketplace::VERSION { - TYPES::BlockHeader::new_legacy( - state.as_ref(), - instance_state.as_ref(), - &parent_leaf, - commitment_and_metadata.commitment, - commitment_and_metadata.builder_commitment, - commitment_and_metadata.metadata, - commitment_and_metadata.fees.first().clone(), - vid_share.data.common, - version, - ) - .await - .context("Failed to construct legacy block header")? - } else { - TYPES::BlockHeader::new_marketplace( - state.as_ref(), - instance_state.as_ref(), - &parent_leaf, - commitment_and_metadata.commitment, - commitment_and_metadata.builder_commitment, - commitment_and_metadata.metadata, - commitment_and_metadata.fees.to_vec(), - vid_share.data.common, - commitment_and_metadata.auction_result, - version, - ) - .await - .context("Failed to construct marketplace block header")? - }; - - let proposal = QuorumProposal { - block_header, - view_number: view, - justify_qc: consensus.read().await.high_qc().clone(), - proposal_certificate: proposal_cert, - upgrade_certificate: upgrade_cert, - }; - - let proposed_leaf = Leaf::from_quorum_proposal(&proposal); - - ensure!(proposed_leaf.parent_commitment() == parent_leaf.commit(&upgrade_lock).await); - - let signature = TYPES::SignatureKey::sign( - &private_key, - proposed_leaf.commit(&upgrade_lock).await.as_ref(), - )?; - - let message = Proposal { - data: proposal, - signature, - _pd: PhantomData, - }; - - debug!( - "Sending proposal for view {:?} ID: {}", - proposed_leaf.view_number(), - id, - ); - - async_sleep(Duration::from_millis(round_start_delay)).await; - - broadcast_event( - Arc::new(HotShotEvent::QuorumProposalSend( - message.clone(), - public_key, - )), - &event_stream, - ) - .await; - - Ok(()) -} - -/// Send a proposal for the view `view` from the latest high_qc given an upgrade cert. This is the -/// standard case proposal scenario. -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -pub async fn publish_proposal_from_commitment_and_metadata( - view: TYPES::Time, - sender: Sender>>, - receiver: Receiver>>, - quorum_membership: Arc, - public_key: TYPES::SignatureKey, - private_key: ::PrivateKey, - consensus: OuterConsensus, - delay: u64, - formed_upgrade_certificate: Option>, - upgrade_lock: UpgradeLock, - commitment_and_metadata: Option>, - proposal_cert: Option>, - instance_state: Arc, - id: u64, -) -> Result> { - let (parent_leaf, state) = parent_leaf_and_state( - view, - &sender, - &receiver, - quorum_membership, - public_key.clone(), - private_key.clone(), - OuterConsensus::new(Arc::clone(&consensus.inner_consensus)), - &upgrade_lock, - ) - .await?; - - // In order of priority, we should try to attach: - // - the parent certificate if it exists, or - // - our own certificate that we formed. - // In either case, we need to ensure that the certificate is still relevant. - // - // Note: once we reach a point of potentially propose with our formed upgrade certificate, we will ALWAYS drop it. If we cannot immediately use it for whatever reason, we choose to discard it. - // It is possible that multiple nodes form separate upgrade certificates for the some upgrade if we are not careful about voting. But this shouldn't bother us: the first leader to propose is the one whose certificate will be used. And if that fails to reach a decide for whatever reason, we may lose our own certificate, but something will likely have gone wrong there anyway. - let mut proposal_upgrade_certificate = parent_leaf - .upgrade_certificate() - .or(formed_upgrade_certificate); - - if let Some(cert) = proposal_upgrade_certificate.clone() { - if cert - .is_relevant(view, Arc::clone(&upgrade_lock.decided_upgrade_certificate)) - .await - .is_err() - { - proposal_upgrade_certificate = None; - } - } - - // We only want to proposal to be attached if any of them are valid. - let proposal_certificate = proposal_cert - .as_ref() - .filter(|cert| cert.is_valid_for_view(&view)) - .cloned(); - - ensure!( - commitment_and_metadata.is_some(), - "Cannot propose because we don't have the VID payload commitment and metadata" - ); - - // This is a safe unwrap due to the prior ensure call. - let commitment_and_metadata = commitment_and_metadata.unwrap(); - - ensure!( - commitment_and_metadata.block_view == view, - "Cannot propose because our VID payload commitment and metadata is for an older view." - ); - - let create_and_send_proposal_handle = async_spawn(async move { - match create_and_send_proposal::( - public_key, - private_key, - OuterConsensus::new(Arc::clone(&consensus.inner_consensus)), - sender, - view, - commitment_and_metadata, - parent_leaf.clone(), - state, - proposal_upgrade_certificate, - proposal_certificate, - delay, - instance_state, - upgrade_lock, - id, - ) - .await - { - Ok(()) => {} - Err(e) => { - tracing::error!("Failed to send proposal: {}", e); - } - }; - }); - - Ok(create_and_send_proposal_handle) -} - -/// Handle the received quorum proposal. -/// -/// Returns the proposal that should be used to set the `cur_proposal` for other tasks. -#[allow(clippy::too_many_lines)] -#[instrument(skip_all)] -pub(crate) async fn handle_quorum_proposal_recv< - TYPES: NodeType, - I: NodeImplementation, - V: Versions, ->( - proposal: &Proposal>, - sender: &TYPES::SignatureKey, - event_sender: Sender>>, - event_receiver: Receiver>>, - task_state: &mut ConsensusTaskState, -) -> Result>> { - let sender = sender.clone(); - debug!( - "Received Quorum Proposal for view {}", - *proposal.data.view_number - ); - - let cur_view = task_state.cur_view; - - validate_proposal_view_and_certs( - proposal, - task_state.cur_view, - &task_state.quorum_membership, - &task_state.timeout_membership, - &task_state.upgrade_lock, - ) - .await - .context("Failed to validate proposal view and attached certs")?; - - let view = proposal.data.view_number(); - let justify_qc = proposal.data.justify_qc.clone(); - - if !justify_qc - .is_valid_cert( - task_state.quorum_membership.as_ref(), - &task_state.upgrade_lock, - ) - .await - { - let consensus = task_state.consensus.read().await; - consensus.metrics.invalid_qc.update(1); - bail!("Invalid justify_qc in proposal for view {}", *view); - } - - // NOTE: We could update our view with a valid TC but invalid QC, but that is not what we do here - if let Err(e) = update_view::( - view, - &event_sender, - task_state.timeout, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - &mut task_state.cur_view, - &mut task_state.cur_view_time, - &mut task_state.timeout_task, - &task_state.output_event_stream, - SEND_VIEW_CHANGE_EVENT, - task_state.quorum_membership.leader(cur_view) == task_state.public_key, - ) - .await - { - debug!("Failed to update view; error = {e:#}"); - } - - let mut parent_leaf = task_state - .consensus - .read() - .await - .saved_leaves() - .get(&justify_qc.date().leaf_commit) - .cloned(); - - parent_leaf = match parent_leaf { - Some(p) => Some(p), - None => fetch_proposal( - justify_qc.view_number(), - event_sender.clone(), - event_receiver.clone(), - Arc::clone(&task_state.quorum_membership), - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - task_state.public_key.clone(), - task_state.private_key.clone(), - &task_state.upgrade_lock, - ) - .await - .ok(), - }; - let consensus_read = task_state.consensus.read().await; - - // Get the parent leaf and state. - let parent = match parent_leaf { - Some(leaf) => { - if let (Some(state), _) = consensus_read.state_and_delta(leaf.view_number()) { - Some((leaf, Arc::clone(&state))) - } else { - bail!("Parent state not found! Consensus internally inconsistent"); - } - } - None => None, - }; - - if justify_qc.view_number() > consensus_read.high_qc().view_number { - if let Err(e) = task_state - .storage - .write() - .await - .update_high_qc(justify_qc.clone()) - .await - { - bail!("Failed to store High QC not voting. Error: {:?}", e); - } - } - - drop(consensus_read); - let mut consensus_write = task_state.consensus.write().await; - - if let Err(e) = consensus_write.update_high_qc(justify_qc.clone()) { - tracing::trace!("{e:?}"); - } - - // Justify qc's leaf commitment is not the same as the parent's leaf commitment, but it should be (in this case) - let Some((parent_leaf, _parent_state)) = parent else { - warn!( - "Proposal's parent missing from storage with commitment: {:?}", - justify_qc.date().leaf_commit - ); - let leaf = Leaf::from_quorum_proposal(&proposal.data); - - let state = Arc::new( - >::from_header( - &proposal.data.block_header, - ), - ); - - if let Err(e) = consensus_write.update_validated_state_map( - view, - View { - view_inner: ViewInner::Leaf { - leaf: leaf.commit(&task_state.upgrade_lock).await, - state, - delta: None, - }, - }, - ) { - tracing::trace!("{e:?}"); - } - - consensus_write - .update_saved_leaves(leaf.clone(), &task_state.upgrade_lock) - .await; - let new_leaves = consensus_write.saved_leaves().clone(); - let new_state = consensus_write.validated_state_map().clone(); - drop(consensus_write); - - if let Err(e) = task_state - .storage - .write() - .await - .update_undecided_state(new_leaves, new_state) - .await - { - warn!("Couldn't store undecided state. Error: {:?}", e); - } - - // If we are missing the parent from storage, the safety check will fail. But we can - // still vote if the liveness check succeeds. - let consensus_read = task_state.consensus.read().await; - let liveness_check = justify_qc.view_number() > consensus_read.locked_view(); - - let high_qc = consensus_read.high_qc().clone(); - let locked_view = consensus_read.locked_view(); - - drop(consensus_read); - - let mut current_proposal = None; - if liveness_check { - current_proposal = Some(proposal.data.clone()); - let new_view = proposal.data.view_number + 1; - - // This is for the case where we form a QC but have not yet seen the previous proposal ourselves - let should_propose = task_state.quorum_membership.leader(new_view) - == task_state.public_key - && high_qc.view_number == current_proposal.clone().unwrap().view_number; - - let qc = high_qc.clone(); - if should_propose { - debug!( - "Attempting to publish proposal after voting for liveness; now in view: {}", - *new_view - ); - let create_and_send_proposal_handle = - publish_proposal_from_commitment_and_metadata( - qc.view_number + 1, - event_sender, - event_receiver, - Arc::clone(&task_state.quorum_membership), - task_state.public_key.clone(), - task_state.private_key.clone(), - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - task_state.round_start_delay, - task_state.formed_upgrade_certificate.clone(), - task_state.upgrade_lock.clone(), - task_state.payload_commitment_and_metadata.clone(), - task_state.proposal_cert.clone(), - Arc::clone(&task_state.instance_state), - task_state.id, - ) - .await?; - - task_state - .spawned_tasks - .entry(view) - .or_default() - .push(create_and_send_proposal_handle); - } - } else { - warn!(?high_qc, ?proposal.data, ?locked_view, "Failed liveneess check; cannot find parent either."); - } - - return Ok(current_proposal); - }; - - task_state - .spawned_tasks - .entry(proposal.data.view_number()) - .or_default() - .push(async_spawn( - validate_proposal_safety_and_liveness::( - proposal.clone(), - parent_leaf, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate), - Arc::clone(&task_state.quorum_membership), - event_sender.clone(), - sender, - task_state.output_event_stream.clone(), - task_state.id, - task_state.upgrade_lock.clone(), - Arc::clone(&task_state.storage), - ) - .map(AnyhowTracing::err_as_debug), - )); - Ok(None) -} - -/// Handle `QuorumProposalValidated` event content and submit a proposal if possible. -#[allow(clippy::too_many_lines)] -#[instrument(skip_all)] -pub async fn handle_quorum_proposal_validated< - TYPES: NodeType, - I: NodeImplementation, - V: Versions, ->( - proposal: &QuorumProposal, - event_sender: Sender>>, - event_receiver: Receiver>>, - task_state: &mut ConsensusTaskState, -) -> Result<()> { - let view = proposal.view_number(); - task_state.current_proposal = Some(proposal.clone()); - - let res = decide_from_proposal( - proposal, - OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), - Arc::clone(&task_state.upgrade_lock.decided_upgrade_certificate), - &task_state.public_key, - ) - .await; - - if let Some(cert) = res.decided_upgrade_cert { - let mut decided_certificate_lock = task_state - .upgrade_lock - .decided_upgrade_certificate - .write() - .await; - *decided_certificate_lock = Some(cert.clone()); - drop(decided_certificate_lock); - - let _ = task_state - .storage - .write() - .await - .update_decided_upgrade_certificate(Some(cert.clone())) - .await; - } - - let mut consensus = task_state.consensus.write().await; - if let Some(new_locked_view) = res.new_locked_view_number { - if let Err(e) = consensus.update_locked_view(new_locked_view) { - tracing::trace!("{e:?}"); - } - } - - drop(consensus); - - let new_view = task_state.current_proposal.clone().unwrap().view_number + 1; - // In future we can use the mempool model where we fetch the proposal if we don't have it, instead of having to wait for it here - // This is for the case where we form a QC but have not yet seen the previous proposal ourselves - let should_propose = task_state.quorum_membership.leader(new_view) == task_state.public_key - && task_state.consensus.read().await.high_qc().view_number - == task_state.current_proposal.clone().unwrap().view_number; - - if let Some(new_decided_view) = res.new_decided_view_number { - task_state.cancel_tasks(new_decided_view).await; - } - task_state.current_proposal = Some(proposal.clone()); - task_state - .spawn_vote_task(view, event_sender.clone(), event_receiver.clone()) - .await; - if should_propose { - debug!( - "Attempting to publish proposal after voting; now in view: {}", - *new_view - ); - if let Err(e) = task_state - .publish_proposal(new_view, event_sender.clone(), event_receiver.clone()) - .await - { - debug!("Failed to propose; error = {e:?}"); - }; - } - - #[allow(clippy::cast_precision_loss)] - if let Some(new_anchor_view) = res.new_decided_view_number { - let block_size = res.included_txns.map(|set| set.len().try_into().unwrap()); - let decide_sent = broadcast_event( - Event { - view_number: new_anchor_view, - event: EventType::Decide { - leaf_chain: Arc::new(res.leaf_views), - qc: Arc::new(res.new_decide_qc.unwrap()), - block_size, - }, - }, - &task_state.output_event_stream, - ); - let mut consensus = task_state.consensus.write().await; - - let old_anchor_view = consensus.last_decided_view(); - consensus.collect_garbage(old_anchor_view, new_anchor_view); - if let Err(e) = consensus.update_last_decided_view(new_anchor_view) { - tracing::trace!("{e:?}"); - } - consensus - .metrics - .last_decided_time - .set(Utc::now().timestamp().try_into().unwrap()); - consensus.metrics.invalid_qc.set(0); - consensus - .metrics - .last_decided_view - .set(usize::try_from(consensus.last_decided_view().u64()).unwrap()); - let cur_number_of_views_per_decide_event = - *task_state.cur_view - consensus.last_decided_view().u64(); - consensus - .metrics - .number_of_views_per_decide_event - .add_point(cur_number_of_views_per_decide_event as f64); - - debug!( - "Sending Decide for view {:?}", - consensus.last_decided_view() - ); - drop(consensus); - debug!("Decided txns len {:?}", block_size); - decide_sent.await; - broadcast_event( - Arc::new(HotShotEvent::LeafDecided(res.leaves_decided)), - &event_sender, - ) - .await; - debug!("decide send succeeded"); - } - - Ok(()) -} - -/// Private key, latest decided upgrade certificate, committee membership, and event stream, for -/// sending the vote. -pub(crate) struct VoteInfo { - /// The private key of the voting node. - pub private_key: <::SignatureKey as SignatureKey>::PrivateKey, - - /// The locked upgrade of the voting node. - pub upgrade_lock: UpgradeLock, - - /// The DA Membership handle - pub da_membership: Arc<::Membership>, - - /// The event sending stream. - pub event_sender: Sender>>, - - /// The event receiver stream. - pub event_receiver: Receiver>>, -} - -#[allow(clippy::too_many_arguments)] -#[allow(clippy::too_many_lines)] -#[allow(unused_variables)] -/// Check if we are able to vote, like whether the proposal is valid, -/// whether we have DAC and VID share, and if so, vote. -#[instrument(skip_all, fields(id = id, view = *cur_view))] -pub async fn update_state_and_vote_if_able< - TYPES: NodeType, - I: NodeImplementation, - V: Versions, ->( - cur_view: TYPES::Time, - proposal: QuorumProposal, - public_key: TYPES::SignatureKey, - private_key: ::PrivateKey, - consensus: OuterConsensus, - storage: Arc>, - quorum_membership: Arc, - instance_state: Arc, - vote_info: VoteInfo, - id: u64, - upgrade_lock: &UpgradeLock, -) -> bool { - use hotshot_types::simple_vote::QuorumVote; - - if !quorum_membership.has_stake(&public_key) { - debug!("We were not chosen for quorum committee on {:?}", cur_view); - return false; - } - - let read_consnesus = consensus.read().await; - // Only vote if you has seen the VID share for this view - let Some(vid_shares) = read_consnesus.vid_shares().get(&proposal.view_number) else { - debug!( - "We have not seen the VID share for this view {:?} yet, so we cannot vote.", - proposal.view_number - ); - return false; - }; - let Some(vid_share) = vid_shares.get(&public_key).cloned() else { - debug!("we have not seen our VID share yet"); - return false; - }; - - if let Some(upgrade_cert) = &vote_info - .upgrade_lock - .decided_upgrade_certificate - .read() - .await - .clone() - { - if upgrade_cert.upgrading_in(cur_view) - && Some(proposal.block_header.payload_commitment()) - != null_block::commitment(quorum_membership.total_nodes()) - { - info!("Refusing to vote on proposal because it does not have a null commitment, and we are between versions. Expected:\n\n{:?}\n\nActual:{:?}", null_block::commitment(quorum_membership.total_nodes()), Some(proposal.block_header.payload_commitment())); - return false; - } - } - - // Only vote if you have the DA cert - // ED Need to update the view number this is stored under? - let Some(cert) = read_consnesus.saved_da_certs().get(&cur_view).cloned() else { - return false; - }; - drop(read_consnesus); - - let view = cert.view_number; - // TODO: do some of this logic without the vote token check, only do that when voting. - let justify_qc = proposal.justify_qc.clone(); - let mut parent = consensus - .read() - .await - .saved_leaves() - .get(&justify_qc.date().leaf_commit) - .cloned(); - parent = match parent { - Some(p) => Some(p), - None => fetch_proposal( - justify_qc.view_number(), - vote_info.event_sender.clone(), - vote_info.event_receiver.clone(), - Arc::clone(&quorum_membership), - OuterConsensus::new(Arc::clone(&consensus.inner_consensus)), - public_key.clone(), - private_key.clone(), - upgrade_lock, - ) - .await - .ok(), - }; - - let read_consnesus = consensus.read().await; - - // Justify qc's leaf commitment is not the same as the parent's leaf commitment, but it should be (in this case) - let Some(parent) = parent else { - error!( - "Proposal's parent missing from storage with commitment: {:?}, proposal view {:?}", - justify_qc.date().leaf_commit, - proposal.view_number, - ); - return false; - }; - let (Some(parent_state), _) = read_consnesus.state_and_delta(parent.view_number()) else { - warn!("Parent state not found! Consensus internally inconsistent"); - return false; - }; - drop(read_consnesus); - - let version = match vote_info.upgrade_lock.version(view).await { - Ok(version) => version, - Err(e) => { - error!("Failed to calculate the version: {e:?}"); - return false; - } - }; - let Ok((validated_state, state_delta)) = parent_state - .validate_and_apply_header( - instance_state.as_ref(), - &parent, - &proposal.block_header.clone(), - vid_share.data.common.clone(), - version, - ) - .await - else { - warn!("Block header doesn't extend the proposal!"); - return false; - }; - - let state = Arc::new(validated_state); - let delta = Arc::new(state_delta); - let parent_commitment = parent.commit(upgrade_lock).await; - - let proposed_leaf = Leaf::from_quorum_proposal(&proposal); - if proposed_leaf.parent_commitment() != parent_commitment { - return false; - } - - // Validate the DAC. - let message = if cert - .is_valid_cert(vote_info.da_membership.as_ref(), upgrade_lock) - .await - { - // Validate the block payload commitment for non-genesis DAC. - if cert.date().payload_commit != proposal.block_header.payload_commitment() { - warn!( - "Block payload commitment does not equal da cert payload commitment. View = {}", - *view - ); - return false; - } - if let Ok(vote) = QuorumVote::::create_signed_vote( - QuorumData { - leaf_commit: proposed_leaf.commit(upgrade_lock).await, - }, - view, - &public_key, - &vote_info.private_key, - &vote_info.upgrade_lock, - ) - .await - { - GeneralConsensusMessage::::Vote(vote) - } else { - error!("Unable to sign quorum vote!"); - return false; - } - } else { - error!( - "Invalid DAC in proposal! Skipping proposal. {:?} cur view is: {:?}", - cert, cur_view - ); - return false; - }; - - let mut consensus = consensus.write().await; - if let Err(e) = consensus.update_validated_state_map( - cur_view, - View { - view_inner: ViewInner::Leaf { - leaf: proposed_leaf.commit(upgrade_lock).await, - state: Arc::clone(&state), - delta: Some(Arc::clone(&delta)), - }, - }, - ) { - tracing::trace!("{e:?}"); - } - consensus - .update_saved_leaves(proposed_leaf.clone(), upgrade_lock) - .await; - let new_leaves = consensus.saved_leaves().clone(); - let new_state = consensus.validated_state_map().clone(); - drop(consensus); - - if let Err(e) = storage - .write() - .await - .update_undecided_state(new_leaves, new_state) - .await - { - error!("Couldn't store undecided state. Error: {:?}", e); - } - - if let GeneralConsensusMessage::Vote(vote) = message { - debug!( - "Sending vote to next quorum leader {:?}", - vote.view_number() + 1 - ); - // Add to the storage that we have received the VID disperse for a specific view - if let Err(e) = storage.write().await.append_vid(&vid_share).await { - warn!( - "Failed to store VID Disperse Proposal with error {:?}, aborting vote", - e - ); - return false; - } - broadcast_event( - Arc::new(HotShotEvent::QuorumVoteSend(vote)), - &vote_info.event_sender, - ) - .await; - return true; - } - debug!( - "Received VID share, but couldn't find DAC cert for view {:?}", - *proposal.view_number(), - ); - false -} diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs deleted file mode 100644 index b8fad36a08..0000000000 --- a/crates/task-impls/src/consensus/mod.rs +++ /dev/null @@ -1,744 +0,0 @@ -// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) -// This file is part of the HotShot repository. - -// You should have received a copy of the MIT License -// along with the HotShot repository. If not, see . - -use std::{collections::BTreeMap, sync::Arc}; - -use anyhow::Result; -use async_broadcast::{Receiver, Sender}; -use async_compatibility_layer::art::async_spawn; -use async_lock::RwLock; -#[cfg(async_executor_impl = "async-std")] -use async_std::task::JoinHandle; -use async_trait::async_trait; -use futures::future::join_all; -use handlers::publish_proposal_from_commitment_and_metadata; -use hotshot_task::task::TaskState; -use hotshot_types::{ - consensus::{CommitmentAndMetadata, OuterConsensus}, - data::{QuorumProposal, VidDisperseShare, ViewChangeEvidence}, - event::{Event, EventType}, - message::{Proposal, UpgradeLock}, - simple_certificate::{QuorumCertificate, TimeoutCertificate, UpgradeCertificate}, - simple_vote::{QuorumVote, TimeoutData, TimeoutVote}, - traits::{ - election::Membership, - node_implementation::{NodeImplementation, NodeType, Versions}, - signature_key::SignatureKey, - storage::Storage, - }, - vid::vid_scheme, - vote::{Certificate, HasViewNumber}, -}; -use jf_vid::VidScheme; -#[cfg(async_executor_impl = "tokio")] -use tokio::task::JoinHandle; -use tracing::{debug, error, info, instrument, warn}; - -use crate::{ - consensus::handlers::{ - handle_quorum_proposal_recv, handle_quorum_proposal_validated, - update_state_and_vote_if_able, VoteInfo, - }, - events::HotShotEvent, - helpers::{broadcast_event, cancel_task, update_view, DONT_SEND_VIEW_CHANGE_EVENT}, - vote_collection::{handle_vote, VoteCollectorsMap}, -}; - -/// Helper functions to handle proposal-related functionality. -pub(crate) mod handlers; - -/// The state for the consensus task. Contains all of the information for the implementation -/// of consensus -pub struct ConsensusTaskState, V: Versions> { - /// Our public key - pub public_key: TYPES::SignatureKey, - /// Our Private Key - pub private_key: ::PrivateKey, - /// Reference to consensus. The replica will require a write lock on this. - pub consensus: OuterConsensus, - /// Immutable instance state - pub instance_state: Arc, - /// View timeout from config. - pub timeout: u64, - /// Round start delay from config, in milliseconds. - pub round_start_delay: u64, - /// View number this view is executing in. - pub cur_view: TYPES::Time, - - /// Timestamp this view starts at. - pub cur_view_time: i64, - - /// The commitment to the current block payload and its metadata submitted to DA. - pub payload_commitment_and_metadata: Option>, - - /// The underlying network - pub network: Arc, - - /// Membership for Timeout votes/certs - pub timeout_membership: Arc, - - /// Membership for Quorum Certs/votes - pub quorum_membership: Arc, - - /// Membership for DA committee Votes/certs - pub da_membership: Arc, - - /// A map of `QuorumVote` collector tasks. - pub vote_collectors: VoteCollectorsMap, QuorumCertificate, V>, - - /// A map of `TimeoutVote` collector tasks. - pub timeout_vote_collectors: - VoteCollectorsMap, TimeoutCertificate, V>, - - /// timeout task handle - pub timeout_task: JoinHandle<()>, - - /// Spawned tasks related to a specific view, so we can cancel them when - /// they are stale - pub spawned_tasks: BTreeMap>>, - - /// The most recent upgrade certificate this node formed. - /// Note: this is ONLY for certificates that have been formed internally, - /// so that we can propose with them. - /// - /// Certificates received from other nodes will get reattached regardless of this fields, - /// since they will be present in the leaf we propose off of. - pub formed_upgrade_certificate: Option>, - - /// last View Sync Certificate or Timeout Certificate this node formed. - pub proposal_cert: Option>, - - /// Output events to application - pub output_event_stream: async_broadcast::Sender>, - - /// The most recent proposal we have, will correspond to the current view if Some() - /// Will be none if the view advanced through timeout/view_sync - pub current_proposal: Option>, - - // ED Should replace this with config information since we need it anyway - /// The node's id - pub id: u64, - - /// This node's storage ref - pub storage: Arc>, - - /// Lock for a decided upgrade - pub upgrade_lock: UpgradeLock, -} - -impl, V: Versions> ConsensusTaskState { - /// Cancel all tasks the consensus tasks has spawned before the given view - pub async fn cancel_tasks(&mut self, view: TYPES::Time) { - let keep = self.spawned_tasks.split_off(&view); - let mut cancel = Vec::new(); - while let Some((_, tasks)) = self.spawned_tasks.pop_first() { - let mut to_cancel = tasks.into_iter().map(cancel_task).collect(); - cancel.append(&mut to_cancel); - } - self.spawned_tasks = keep; - join_all(cancel).await; - } - - /// Validate the VID disperse is correctly signed and has the correct share. - fn validate_disperse( - &self, - sender: &TYPES::SignatureKey, - disperse: &Proposal>, - ) -> bool { - let view = disperse.data.view_number(); - let payload_commitment = disperse.data.payload_commitment; - - // Check sender of VID disperse share is signed by DA committee member - let validate_sender = sender.validate(&disperse.signature, payload_commitment.as_ref()) - && self.da_membership.committee_members(view).contains(sender); - - // Check whether the data satisfies one of the following. - // * From the right leader for this view. - // * Calculated and signed by the current node. - let validated = self - .public_key - .validate(&disperse.signature, payload_commitment.as_ref()) - || self - .quorum_membership - .leader(view) - .validate(&disperse.signature, payload_commitment.as_ref()); - if !validate_sender && !validated { - return false; - } - - // Validate the VID share. - // NOTE: `verify_share` returns a nested `Result`, so we must check both the inner - // and outer results - matches!( - vid_scheme(self.quorum_membership.total_nodes()).verify_share( - &disperse.data.share, - &disperse.data.common, - &payload_commitment, - ), - Ok(Ok(())) - ) - } - - /// Publishes a proposal - #[instrument(skip_all, target = "ConsensusTaskState", fields(id = self.id, view = *self.cur_view))] - async fn publish_proposal( - &mut self, - view: TYPES::Time, - event_sender: Sender>>, - event_receiver: Receiver>>, - ) -> Result<()> { - let create_and_send_proposal_handle = publish_proposal_from_commitment_and_metadata( - view, - event_sender, - event_receiver, - Arc::clone(&self.quorum_membership), - self.public_key.clone(), - self.private_key.clone(), - OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), - self.round_start_delay, - self.formed_upgrade_certificate.clone(), - self.upgrade_lock.clone(), - self.payload_commitment_and_metadata.clone(), - self.proposal_cert.clone(), - Arc::clone(&self.instance_state), - self.id, - ) - .await?; - - self.spawned_tasks - .entry(view) - .or_default() - .push(create_and_send_proposal_handle); - - Ok(()) - } - - /// Spawn a vote task for the given view. Will try to vote - /// and emit a `QuorumVoteSend` event we should vote on the current proposal - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), target = "ConsensusTaskState")] - async fn spawn_vote_task( - &mut self, - view: TYPES::Time, - event_sender: Sender>>, - event_receiver: Receiver>>, - ) { - let Some(proposal) = self.current_proposal.clone() else { - return; - }; - if proposal.view_number() != view { - return; - } - let upgrade = self.upgrade_lock.clone(); - let pub_key = self.public_key.clone(); - let priv_key = self.private_key.clone(); - let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)); - let storage = Arc::clone(&self.storage); - let quorum_mem = Arc::clone(&self.quorum_membership); - let da_mem = Arc::clone(&self.da_membership); - let instance_state = Arc::clone(&self.instance_state); - let id = self.id; - let handle = async_spawn(async move { - let upgrade_lock = upgrade.clone(); - update_state_and_vote_if_able::( - view, - proposal, - pub_key, - priv_key.clone(), - consensus, - storage, - quorum_mem, - instance_state, - VoteInfo { - private_key: priv_key, - upgrade_lock: upgrade, - da_membership: da_mem, - event_sender, - event_receiver, - }, - id, - &upgrade_lock, - ) - .await; - }); - self.spawned_tasks.entry(view).or_default().push(handle); - } - - /// Handles a consensus event received on the event stream - #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")] - pub async fn handle( - &mut self, - event: Arc>, - event_sender: Sender>>, - event_receiver: Receiver>>, - ) { - match event.as_ref() { - HotShotEvent::QuorumProposalRecv(proposal, sender) => { - debug!("proposal recv view: {:?}", proposal.data.view_number()); - match handle_quorum_proposal_recv( - proposal, - sender, - event_sender.clone(), - event_receiver.clone(), - self, - ) - .await - { - Ok(Some(current_proposal)) => { - let view = current_proposal.view_number(); - self.current_proposal = Some(current_proposal); - self.spawn_vote_task(view, event_sender, event_receiver) - .await; - } - Ok(None) => {} - Err(e) => debug!("Failed to propose {e:#}"), - } - } - HotShotEvent::QuorumProposalValidated(proposal, _) => { - debug!("proposal validated view: {:?}", proposal.view_number()); - if let Err(e) = handle_quorum_proposal_validated( - proposal, - event_sender.clone(), - event_receiver.clone(), - self, - ) - .await - { - warn!("Failed to handle QuorumProposalValidated event {e:#}"); - } - } - HotShotEvent::QuorumVoteRecv(ref vote) => { - debug!("Received quorum vote: {:?}", vote.view_number()); - if self.quorum_membership.leader(vote.view_number() + 1) != self.public_key { - error!( - "We are not the leader for view {} are we the leader for view + 1? {}", - *vote.view_number() + 1, - self.quorum_membership.leader(vote.view_number() + 2) == self.public_key - ); - return; - } - - handle_vote( - &mut self.vote_collectors, - vote, - self.public_key.clone(), - &self.quorum_membership, - self.id, - &event, - &event_sender, - &self.upgrade_lock, - ) - .await; - } - HotShotEvent::TimeoutVoteRecv(ref vote) => { - if self.timeout_membership.leader(vote.view_number() + 1) != self.public_key { - error!( - "We are not the leader for view {} are we the leader for view + 1? {}", - *vote.view_number() + 1, - self.timeout_membership.leader(vote.view_number() + 2) == self.public_key - ); - return; - } - - handle_vote( - &mut self.timeout_vote_collectors, - vote, - self.public_key.clone(), - &self.quorum_membership, - self.id, - &event, - &event_sender, - &self.upgrade_lock, - ) - .await; - } - HotShotEvent::QcFormed(cert) => match cert { - either::Right(qc) => { - self.proposal_cert = Some(ViewChangeEvidence::Timeout(qc.clone())); - - debug!( - "Attempting to publish proposal after forming a TC for view {}", - *qc.view_number - ); - - if let Err(e) = self - .publish_proposal(qc.view_number + 1, event_sender, event_receiver) - .await - { - debug!("Failed to propose; error = {e:?}"); - }; - } - either::Left(qc) => { - if let Err(e) = self.storage.write().await.update_high_qc(qc.clone()).await { - error!("Failed to store High QC of QC we formed. Error: {:?}", e); - } - - if let Err(e) = self.consensus.write().await.update_high_qc(qc.clone()) { - tracing::trace!("{e:?}"); - } - debug!( - "Attempting to publish proposal after forming a QC for view {}", - *qc.view_number - ); - - if let Err(e) = self - .publish_proposal(qc.view_number + 1, event_sender, event_receiver) - .await - { - debug!("Failed to propose; error = {e:?}"); - }; - } - }, - #[cfg(not(feature = "dependency-tasks"))] - HotShotEvent::UpgradeCertificateFormed(cert) => { - debug!( - "Upgrade certificate received for view {}!", - *cert.view_number - ); - - // Update our current upgrade_cert as long as we still have a chance of reaching a decide on it in time. - if cert.data.decide_by >= self.cur_view + 3 { - debug!("Updating current formed_upgrade_certificate"); - - self.formed_upgrade_certificate = Some(cert.clone()); - } - } - HotShotEvent::DaCertificateRecv(cert) => { - debug!("DAC Received for view {}!", *cert.view_number); - let view = cert.view_number; - - self.consensus - .write() - .await - .update_saved_da_certs(view, cert.clone()); - let Some(proposal) = self.current_proposal.clone() else { - return; - }; - if proposal.view_number() != view { - return; - } - self.spawn_vote_task(view, event_sender, event_receiver) - .await; - } - HotShotEvent::VidShareRecv(sender, disperse) => { - let view = disperse.data.view_number(); - - debug!( - "VID disperse received for view: {:?} in consensus task", - view - ); - - // Allow VID disperse date that is one view older, in case we have updated the - // view. - // Adding `+ 1` on the LHS rather than `- 1` on the RHS, to avoid the overflow - // error due to subtracting the genesis view number. - if view + 1 < self.cur_view { - info!("Throwing away VID disperse data that is more than one view older"); - return; - } - - debug!("VID disperse data is not more than one view older."); - - if !self.validate_disperse(sender, disperse) { - warn!("Failed to validated the VID dispersal/share sig."); - return; - } - - self.consensus - .write() - .await - .update_vid_shares(view, disperse.clone()); - if disperse.data.recipient_key != self.public_key { - return; - } - let Some(proposal) = self.current_proposal.clone() else { - return; - }; - if proposal.view_number() != view { - return; - } - self.spawn_vote_task(view, event_sender.clone(), event_receiver.clone()) - .await; - } - HotShotEvent::ViewChange(new_view) => { - let new_view = *new_view; - tracing::trace!("View Change event for view {} in consensus task", *new_view); - - let old_view_number = self.cur_view; - - // If we have a decided upgrade certificate, the protocol version may also have - // been upgraded. - if let Some(cert) = self - .upgrade_lock - .decided_upgrade_certificate - .read() - .await - .clone() - { - if new_view == cert.data.new_version_first_view { - error!( - "Version upgraded based on a decided upgrade cert: {:?}", - cert - ); - } - } - - if let Some(commitment_and_metadata) = &self.payload_commitment_and_metadata { - if commitment_and_metadata.block_view < old_view_number { - self.payload_commitment_and_metadata = None; - } - } - - // update the view in state to the one in the message - // Publish a view change event to the application - // Returns if the view does not need updating. - if let Err(e) = update_view::( - new_view, - &event_sender, - self.timeout, - OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)), - &mut self.cur_view, - &mut self.cur_view_time, - &mut self.timeout_task, - &self.output_event_stream, - DONT_SEND_VIEW_CHANGE_EVENT, - self.quorum_membership.leader(old_view_number) == self.public_key, - ) - .await - { - tracing::trace!("Failed to update view; error = {e}"); - return; - } - } - HotShotEvent::Timeout(view) => { - let view = *view; - // NOTE: We may optionally have the timeout task listen for view change events - if self.cur_view >= view { - return; - } - if !self.timeout_membership.has_stake(&self.public_key) { - debug!( - "We were not chosen for consensus committee on {:?}", - self.cur_view - ); - return; - } - - let Ok(vote) = TimeoutVote::create_signed_vote( - TimeoutData { view }, - view, - &self.public_key, - &self.private_key, - &self.upgrade_lock, - ) - .await - else { - error!("Failed to sign TimeoutData!"); - return; - }; - - broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), &event_sender).await; - broadcast_event( - Event { - view_number: view, - event: EventType::ViewTimeout { view_number: view }, - }, - &self.output_event_stream, - ) - .await; - debug!( - "We did not receive evidence for view {} in time, sending timeout vote for that view!", - *view - ); - - broadcast_event( - Event { - view_number: view, - event: EventType::ReplicaViewTimeout { view_number: view }, - }, - &self.output_event_stream, - ) - .await; - let consensus = self.consensus.read().await; - consensus.metrics.number_of_timeouts.add(1); - if self.quorum_membership.leader(view) == self.public_key { - consensus.metrics.number_of_timeouts_as_leader.add(1); - } - } - HotShotEvent::SendPayloadCommitmentAndMetadata( - payload_commitment, - builder_commitment, - metadata, - view, - fees, - auction_result, - ) => { - let view = *view; - debug!( - "got commit and meta {:?}, view {:?}", - payload_commitment, view - ); - self.payload_commitment_and_metadata = Some(CommitmentAndMetadata { - commitment: *payload_commitment, - builder_commitment: builder_commitment.clone(), - metadata: metadata.clone(), - fees: fees.clone(), - block_view: view, - auction_result: auction_result.clone(), - }); - if self.quorum_membership.leader(view) == self.public_key - && self.consensus.read().await.high_qc().view_number() + 1 == view - { - if let Err(e) = self - .publish_proposal(view, event_sender.clone(), event_receiver.clone()) - .await - { - error!("Failed to propose; error = {e:?}"); - }; - } - - if let Some(cert) = &self.proposal_cert { - if !cert.is_valid_for_view(&view) { - self.proposal_cert = None; - info!("Failed to propose off SendPayloadCommitmentAndMetadata because we had view change evidence, but it was not current."); - return; - } - match cert { - ViewChangeEvidence::Timeout(tc) => { - if self.quorum_membership.leader(tc.view_number() + 1) - == self.public_key - { - if let Err(e) = self - .publish_proposal(view, event_sender, event_receiver) - .await - { - debug!("Failed to propose; error = {e:?}"); - }; - } - } - ViewChangeEvidence::ViewSync(vsc) => { - if self.quorum_membership.leader(vsc.view_number()) == self.public_key { - if let Err(e) = self - .publish_proposal(view, event_sender, event_receiver) - .await - { - debug!("Failed to propose; error = {e:?}"); - }; - } - } - } - } - } - HotShotEvent::ViewSyncFinalizeCertificate2Recv(certificate) => { - if !certificate - .is_valid_cert(self.quorum_membership.as_ref(), &self.upgrade_lock) - .await - { - error!( - "View Sync Finalize certificate {:?} was invalid", - certificate.date() - ); - return; - } - - let view = certificate.view_number; - - if self.quorum_membership.leader(view) == self.public_key { - self.proposal_cert = Some(ViewChangeEvidence::ViewSync(certificate.clone())); - - debug!( - "Attempting to publish proposal after forming a View Sync Finalized Cert for view {}", - *certificate.view_number - ); - - if let Err(e) = self - .publish_proposal(view, event_sender, event_receiver) - .await - { - debug!("Failed to propose; error = {e:?}"); - }; - } - } - HotShotEvent::QuorumVoteSend(vote) => { - let Some(proposal) = self.current_proposal.clone() else { - return; - }; - let new_view = proposal.view_number() + 1; - // In future we can use the mempool model where we fetch the proposal if we don't have it, instead of having to wait for it here - // This is for the case where we form a QC but have not yet seen the previous proposal ourselves - let should_propose = self.quorum_membership.leader(new_view) == self.public_key - && self.consensus.read().await.high_qc().view_number == proposal.view_number(); - - if should_propose { - debug!( - "Attempting to publish proposal after voting; now in view: {}", - *new_view - ); - if let Err(e) = self - .publish_proposal(new_view, event_sender.clone(), event_receiver.clone()) - .await - { - debug!("failed to propose e = {:?}", e); - } - } - if proposal.view_number() <= vote.view_number() { - self.current_proposal = None; - } - } - HotShotEvent::QuorumProposalSend(proposal, _) => { - if self - .payload_commitment_and_metadata - .as_ref() - .is_some_and(|p| p.block_view <= proposal.data.view_number()) - { - self.payload_commitment_and_metadata = None; - } - if let Some(cert) = &self.proposal_cert { - let view = match cert { - ViewChangeEvidence::Timeout(tc) => tc.view_number() + 1, - ViewChangeEvidence::ViewSync(vsc) => vsc.view_number(), - }; - if view < proposal.data.view_number() { - self.proposal_cert = None; - } - } - } - _ => {} - } - } -} - -#[async_trait] -impl, V: Versions> TaskState - for ConsensusTaskState -{ - type Event = HotShotEvent; - - async fn handle_event( - &mut self, - event: Arc, - sender: &Sender>, - receiver: &Receiver>, - ) -> Result<()> { - self.handle(event, sender.clone(), receiver.clone()).await; - - Ok(()) - } - - async fn cancel_subtasks(&mut self) { - while !self.spawned_tasks.is_empty() { - let Some((_, handles)) = self.spawned_tasks.pop_first() else { - break; - }; - - for handle in handles { - #[cfg(async_executor_impl = "async-std")] - handle.cancel().await; - #[cfg(async_executor_impl = "tokio")] - handle.abort(); - } - } - } -} diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index fa0156c424..6390c360aa 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -660,12 +660,6 @@ pub async fn validate_proposal_view_and_certs( Ok(()) } -/// Constant which tells [`update_view`] to send a view change event when called. -pub(crate) const SEND_VIEW_CHANGE_EVENT: bool = true; - -/// Constant which tells `update_view` to not send a view change event when called. -pub const DONT_SEND_VIEW_CHANGE_EVENT: bool = false; - /// Update the view if it actually changed, takes a mutable reference to the `cur_view` and the /// `timeout_task` which are updated during the operation of the function. /// @@ -682,7 +676,6 @@ pub(crate) async fn update_view( cur_view_time: &mut i64, timeout_task: &mut JoinHandle<()>, output_event_stream: &Sender>, - send_view_change_event: bool, is_old_view_leader: bool, ) -> Result<()> { ensure!( @@ -703,20 +696,18 @@ pub(crate) async fn update_view( // The next view is just the current view + 1 let next_view = *cur_view + 1; - if send_view_change_event { - futures::join! { - broadcast_event(Arc::new(HotShotEvent::ViewChange(new_view)), event_stream), - broadcast_event( - Event { + futures::join! { + broadcast_event(Arc::new(HotShotEvent::ViewChange(new_view)), event_stream), + broadcast_event( + Event { + view_number: old_view, + event: EventType::ViewFinished { view_number: old_view, - event: EventType::ViewFinished { - view_number: old_view, - }, }, - output_event_stream, - ) - }; - } + }, + output_event_stream, + ) + }; // Spawn a timeout task if we did actually update view let new_timeout_task = async_spawn({ diff --git a/crates/task-impls/src/lib.rs b/crates/task-impls/src/lib.rs index ed3dc5a0ee..754d2a972e 100644 --- a/crates/task-impls/src/lib.rs +++ b/crates/task-impls/src/lib.rs @@ -7,9 +7,6 @@ //! The consensus layer for hotshot. This currently implements sequencing //! consensus in an event driven way -/// the task which implements the main parts of consensus -pub mod consensus; - /// The task which implements the core state logic of consensus. pub mod consensus2; diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs index 7145c3bde8..37b7ba5d12 100644 --- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs +++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs @@ -33,7 +33,7 @@ use crate::{ events::HotShotEvent, helpers::{ broadcast_event, fetch_proposal, update_view, validate_proposal_safety_and_liveness, - validate_proposal_view_and_certs, SEND_VIEW_CHANGE_EVENT, + validate_proposal_view_and_certs, }, quorum_proposal_recv::{UpgradeLock, Versions}, }; @@ -103,7 +103,6 @@ async fn validate_proposal_liveness, V: Versions> event_sender: Sender>>, event_receiver: Receiver>>, ) { - #[cfg(feature = "dependency-tasks")] if let HotShotEvent::QuorumProposalRecv(proposal, sender) = event.as_ref() { match handle_quorum_proposal_recv( proposal, diff --git a/crates/testing/src/predicates/event.rs b/crates/testing/src/predicates/event.rs index 22be12227d..9c0b4d9960 100644 --- a/crates/testing/src/predicates/event.rs +++ b/crates/testing/src/predicates/event.rs @@ -272,3 +272,58 @@ where }); Box::new(EventPredicate { check, info }) } + +pub fn vid_share_validated() -> Box> +where + TYPES: NodeType, +{ + let info = "VidShareValidated".to_string(); + let check: EventCallback = Arc::new(move |e: Arc>| { + matches!(e.as_ref(), VidShareValidated(..)) + }); + Box::new(EventPredicate { check, info }) +} + +pub fn da_certificate_validated() -> Box> +where + TYPES: NodeType, +{ + let info = "DaCertificateValidated".to_string(); + let check: EventCallback = Arc::new(move |e: Arc>| { + matches!(e.as_ref(), DaCertificateValidated(..)) + }); + Box::new(EventPredicate { check, info }) +} + +pub fn quorum_proposal_preliminarily_validated() -> Box> +where + TYPES: NodeType, +{ + let info = "QuorumProposalPreliminarilyValidated".to_string(); + let check: EventCallback = Arc::new(move |e: Arc>| { + matches!(e.as_ref(), QuorumProposalPreliminarilyValidated(..)) + }); + Box::new(EventPredicate { check, info }) +} + +pub fn high_qc_updated() -> Box> +where + TYPES: NodeType, +{ + let info = "HighQcUpdated".to_string(); + let check: EventCallback = Arc::new(move |e: Arc>| { + matches!(e.as_ref(), HighQcUpdated(..)) + }); + Box::new(EventPredicate { check, info }) +} + +pub fn quorum_vote_dependencies_validated() -> Box> +where + TYPES: NodeType, +{ + let info = "QuorumVoteDependenciesValidated".to_string(); + let check: EventCallback = Arc::new(move |e: Arc>| { + matches!(e.as_ref(), QuorumVoteDependenciesValidated(..)) + }); + Box::new(EventPredicate { check, info }) +} diff --git a/crates/testing/src/predicates/mod.rs b/crates/testing/src/predicates/mod.rs index c4c05e7f11..0c3d344ece 100644 --- a/crates/testing/src/predicates/mod.rs +++ b/crates/testing/src/predicates/mod.rs @@ -5,11 +5,7 @@ // along with the HotShot repository. If not, see . pub mod event; -#[cfg(not(feature = "dependency-tasks"))] -pub mod upgrade_with_consensus; -#[cfg(feature = "dependency-tasks")] pub mod upgrade_with_proposal; -#[cfg(feature = "dependency-tasks")] pub mod upgrade_with_vote; use async_trait::async_trait; diff --git a/crates/testing/src/predicates/upgrade_with_consensus.rs b/crates/testing/src/predicates/upgrade_with_consensus.rs deleted file mode 100644 index 7a63edd583..0000000000 --- a/crates/testing/src/predicates/upgrade_with_consensus.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) -// This file is part of the HotShot repository. - -// You should have received a copy of the MIT License -// along with the HotShot repository. If not, see . - -#![cfg(not(feature = "dependency-tasks"))] - -use std::sync::Arc; - -use async_trait::async_trait; -use hotshot_example_types::node_types::{MemoryImpl, TestTypes, TestVersions}; -use hotshot_task_impls::consensus::ConsensusTaskState; -use hotshot_types::simple_certificate::UpgradeCertificate; - -use crate::predicates::{Predicate, PredicateResult}; - -type ConsensusTaskTestState = ConsensusTaskState; - -type UpgradeCertCallback = - Arc>>) -> bool + Send + Sync>; - -pub struct UpgradeCertPredicate { - check: UpgradeCertCallback, - info: String, -} - -impl std::fmt::Debug for UpgradeCertPredicate { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.info) - } -} - -#[async_trait] -impl Predicate for UpgradeCertPredicate { - async fn evaluate(&self, input: &ConsensusTaskTestState) -> PredicateResult { - let upgrade_cert = input - .upgrade_lock - .decided_upgrade_certificate - .read() - .await - .clone(); - PredicateResult::from((self.check)(upgrade_cert.into())) - } - - async fn info(&self) -> String { - self.info.clone() - } -} - -pub fn no_decided_upgrade_certificate() -> Box { - let info = "expected decided_upgrade_certificate to be None".to_string(); - let check: UpgradeCertCallback = Arc::new(move |s| s.is_none()); - Box::new(UpgradeCertPredicate { info, check }) -} - -pub fn decided_upgrade_certificate() -> Box { - let info = "expected decided_upgrade_certificate to be Some(_)".to_string(); - let check: UpgradeCertCallback = Arc::new(move |s| s.is_some()); - Box::new(UpgradeCertPredicate { info, check }) -} diff --git a/crates/testing/src/view_generator.rs b/crates/testing/src/view_generator.rs index 315ac2d5e8..e574859669 100644 --- a/crates/testing/src/view_generator.rs +++ b/crates/testing/src/view_generator.rs @@ -63,7 +63,7 @@ pub struct TestView { pub da_certificate: DaCertificate, pub transactions: Vec, upgrade_data: Option>, - formed_upgrade_certificate: Option>, + pub formed_upgrade_certificate: Option>, view_sync_finalize_data: Option>, timeout_cert_data: Option>, upgrade_lock: UpgradeLock, diff --git a/crates/testing/tests/tests_1/consensus_task.rs b/crates/testing/tests/tests_1/consensus_task.rs deleted file mode 100644 index d5e39eab51..0000000000 --- a/crates/testing/tests/tests_1/consensus_task.rs +++ /dev/null @@ -1,698 +0,0 @@ -// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) -// This file is part of the HotShot repository. - -// You should have received a copy of the MIT License -// along with the HotShot repository. If not, see . - -#![cfg(not(feature = "dependency-tasks"))] -// TODO: Remove after integration of dependency-tasks -#![allow(unused_imports)] - -use std::{sync::Arc, time::Duration}; - -use futures::StreamExt; -use hotshot::tasks::task_state::CreateTaskState; -use hotshot_example_types::{ - block_types::TestMetadata, - node_types::{MemoryImpl, TestTypes, TestVersions}, - state_types::TestInstanceState, -}; -use hotshot_macros::{run_test, test_scripts}; -use hotshot_task_impls::{consensus::ConsensusTaskState, events::HotShotEvent::*}; -use hotshot_testing::{ - all_predicates, - helpers::{ - build_fake_view_with_leaf, build_system_handle, key_pair_for_id, - permute_input_with_index_order, vid_scheme_from_view_number, vid_share, - }, - predicates::event::{ - all_predicates, exact, quorum_proposal_send, quorum_proposal_validated, quorum_vote_send, - timeout_vote_send, validated_state_updated, - }, - random, - script::{Expectations, InputOrder, TaskScript}, - serial, - view_generator::TestViewGenerator, -}; -use hotshot_types::{ - data::{null_block, ViewChangeEvidence, ViewNumber}, - simple_vote::{TimeoutData, TimeoutVote, ViewSyncFinalizeData}, - traits::{ - election::Membership, - node_implementation::{ConsensusTime, Versions}, - }, - utils::BuilderCommitment, - vote::HasViewNumber, -}; -use jf_vid::VidScheme; -use sha2::Digest; -use vec1::vec1; - -const TIMEOUT: Duration = Duration::from_millis(35); - -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_consensus_task() { - use vbs::version::StaticVersionType; - - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(2) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - // Make some empty encoded transactions, we just care about having a commitment handy for the - // later calls. We need the VID commitment to be able to propose later. - let mut vid = vid_scheme_from_view_number::(&quorum_membership, ViewNumber::new(2)); - let encoded_transactions = Vec::new(); - let vid_disperse = vid.disperse(&encoded_transactions).unwrap(); - let payload_commitment = vid_disperse.commit; - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut leaves = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - for view in (&mut generator).take(2).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - leaves.push(view.leaf.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - } - - let cert = proposals[1].data.justify_qc.clone(); - let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); - - let inputs = vec![ - random![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - DaCertificateRecv(dacs[0].clone()), - VidShareRecv(leaders[0], vid_share(&vids[0].0, handle.public_key())), - ], - serial![ - VidShareRecv(leaders[0], vid_share(&vids[1].0, handle.public_key())), - QuorumProposalRecv(proposals[1].clone(), leaders[1]), - QcFormed(either::Left(cert)), - SendPayloadCommitmentAndMetadata( - payload_commitment, - builder_commitment, - TestMetadata { - num_transactions: 0, - }, - ViewNumber::new(2), - vec1![null_block::builder_fee::( - quorum_membership.total_nodes(), - ::Base::VERSION, - ) - .unwrap()], - None, - ), - ], - ]; - - let expectations = vec![ - Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[0].clone())), - ]), - Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(2))), - quorum_proposal_validated(), - quorum_proposal_send(), - ]), - ]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} - -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_consensus_vote() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(2) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut leaves = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - for view in (&mut generator).take(2).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - leaves.push(view.leaf.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - } - - // Send a proposal, vote on said proposal, update view based on proposal QC, receive vote as next leader - let inputs = vec![random![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - DaCertificateRecv(dacs[0].clone()), - VidShareRecv(leaders[0], vid_share(&vids[0].0, handle.public_key())), - QuorumVoteRecv(votes[0].clone()), - ]]; - - let expectations = vec![Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[0].clone())), - ])]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} - -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_view_sync_finalize_propose() { - use hotshot_example_types::{block_types::TestMetadata, state_types::TestValidatedState}; - use hotshot_types::data::null_block; - use vbs::version::StaticVersionType; - - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(4) - .await - .0; - let (priv_key, pub_key) = key_pair_for_id::(4); - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - // Make some empty encoded transactions, we just care about having a commitment handy for the - // later calls. We need the VID commitment to be able to propose later. - let mut vid = vid_scheme_from_view_number::(&quorum_membership, ViewNumber::new(4)); - let encoded_transactions = Vec::new(); - let vid_disperse = vid.disperse(&encoded_transactions).unwrap(); - let payload_commitment = vid_disperse.commit; - - let view_sync_finalize_data: ViewSyncFinalizeData = ViewSyncFinalizeData { - relay: 4, - round: ViewNumber::new(4), - }; - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut votes = Vec::new(); - let mut vids = Vec::new(); - let mut dacs = Vec::new(); - - generator.next().await; - let view = generator.current_view.clone().unwrap(); - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - vids.push(view.vid_proposal.clone()); - dacs.push(view.da_certificate.clone()); - - // Skip two views - generator.advance_view_number_by(2); - - // Initiate a view sync finalize - generator.add_view_sync_finalize(view_sync_finalize_data); - - // Build the next proposal from view 1 - generator.next_from_anscestor_view(view.clone()).await; - let view = generator.current_view.unwrap(); - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - vids.push(view.vid_proposal); - - // Handle the view sync finalize cert, get the requisite data, propose. - let cert = match proposals[1].data.proposal_certificate.clone().unwrap() { - ViewChangeEvidence::ViewSync(vsc) => vsc, - _ => panic!("Found a TC when there should have been a view sync cert"), - }; - - let builder_commitment = BuilderCommitment::from_raw_digest(sha2::Sha256::new().finalize()); - let timeout_vote_view_2 = TimeoutVote::create_signed_vote( - TimeoutData { - view: ViewNumber::new(2), - }, - ViewNumber::new(2), - &pub_key, - &priv_key, - &handle.hotshot.upgrade_lock, - ) - .await - .unwrap(); - - let timeout_vote_view_3 = TimeoutVote::create_signed_vote( - TimeoutData { - view: ViewNumber::new(3), - }, - ViewNumber::new(3), - &pub_key, - &priv_key, - &handle.hotshot.upgrade_lock, - ) - .await - .unwrap(); - - let inputs = vec![ - serial![VidShareRecv( - leaders[0], - vid_share(&vids[0].0, handle.public_key()) - )], - random![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - DaCertificateRecv(dacs[0].clone()), - ], - serial![Timeout(ViewNumber::new(2)), Timeout(ViewNumber::new(3))], - serial![VidShareRecv( - leaders[0], - vid_share(&vids[1].0, handle.public_key()) - )], - random![ - QuorumProposalRecv(proposals[1].clone(), leaders[1]), - TimeoutVoteRecv(timeout_vote_view_2), - TimeoutVoteRecv(timeout_vote_view_3), - ViewSyncFinalizeCertificate2Recv(cert), - SendPayloadCommitmentAndMetadata( - payload_commitment, - builder_commitment, - TestMetadata { - num_transactions: 0, - }, - ViewNumber::new(4), - vec1![null_block::builder_fee::( - 4, - ::Base::VERSION - ) - .unwrap()], - None, - ), - ], - ]; - - let expectations = vec![ - Expectations::from_outputs(vec![]), - Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[0].clone())), - ]), - Expectations::from_outputs(vec![timeout_vote_send(), timeout_vote_send()]), - Expectations::from_outputs(vec![]), - Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(4))), - quorum_proposal_validated(), - quorum_proposal_send(), - ]), - ]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} - -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -/// Makes sure that, when a valid ViewSyncFinalize certificate is available, the consensus task -/// will indeed vote if the cert is valid and matches the correct view number. -async fn test_view_sync_finalize_vote() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(5) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let view_sync_finalize_data: ViewSyncFinalizeData = ViewSyncFinalizeData { - relay: 4, - round: ViewNumber::new(5), - }; - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut votes = Vec::new(); - let mut vids = Vec::new(); - let mut dacs = Vec::new(); - for view in (&mut generator).take(3).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - vids.push(view.vid_proposal.clone()); - dacs.push(view.da_certificate.clone()); - } - - // Each call to `take` moves us to the next generated view. We advance to view - // 3 and then add the finalize cert for checking there. - generator.add_view_sync_finalize(view_sync_finalize_data); - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - vids.push(view.vid_proposal.clone()); - dacs.push(view.da_certificate.clone()); - } - - // When we're on the latest view. We want to set the quorum - // certificate to be the previous highest QC (before the timeouts). This will be distinct from - // the view sync cert, which is saying "hey, I'm _actually_ at view 4, but my highest QC is - // only for view 1." This forces the QC to be for view 1, and we can move on under this - // assumption. - - // Try to view sync at view 4. - let cert = match proposals[3].data.proposal_certificate.clone().unwrap() { - ViewChangeEvidence::ViewSync(vsc) => vsc, - _ => panic!("Found a TC when there should have been a view sync cert"), - }; - - let inputs = vec![ - serial![VidShareRecv( - leaders[0], - vid_share(&vids[0].0, handle.public_key()) - )], - random![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - DaCertificateRecv(dacs[0].clone()), - ], - serial![Timeout(ViewNumber::new(2)), Timeout(ViewNumber::new(3))], - random![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - ViewSyncFinalizeCertificate2Recv(cert), - ], - ]; - - let expectations = vec![ - Expectations::from_outputs(vec![]), - Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[0].clone())) - ]), - Expectations::from_outputs(vec![timeout_vote_send(), timeout_vote_send()]), - Expectations::from_outputs(all_predicates![ - validated_state_updated(), - quorum_proposal_validated(), - quorum_vote_send() - ]), - ]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} - -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -/// Makes sure that, when a valid ViewSyncFinalize certificate is available, the consensus task -/// will NOT vote when the certificate matches a different view number. -async fn test_view_sync_finalize_vote_fail_view_number() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(5) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let view_sync_finalize_data: ViewSyncFinalizeData = ViewSyncFinalizeData { - relay: 4, - round: ViewNumber::new(10), - }; - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut votes = Vec::new(); - let mut vids = Vec::new(); - let mut dacs = Vec::new(); - for view in (&mut generator).take(3).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - vids.push(view.vid_proposal.clone()); - dacs.push(view.da_certificate.clone()); - } - - // Each call to `take` moves us to the next generated view. We advance to view - // 3 and then add the finalize cert for checking there. - generator.add_view_sync_finalize(view_sync_finalize_data); - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - vids.push(view.vid_proposal.clone()); - dacs.push(view.da_certificate.clone()); - } - - // When we're on the latest view. We want to set the quorum - // certificate to be the previous highest QC (before the timeouts). This will be distinct from - // the view sync cert, which is saying "hey, I'm _actually_ at view 4, but my highest QC is - // only for view 1." This forces the QC to be for view 1, and we can move on under this - // assumption. - - let mut cert = match proposals[3].data.proposal_certificate.clone().unwrap() { - ViewChangeEvidence::ViewSync(vsc) => vsc, - _ => panic!("Found a TC when there should have been a view sync cert"), - }; - - // Force this to fail by making the cert happen for a view we've never seen. This will - // intentionally skip the proposal for this node so we can get the proposal and fail to vote. - cert.view_number = ViewNumber::new(10); - - // Get a good proposal first. - let good_proposal = proposals[0].clone(); - - // Now We introduce an error by setting a different view number as well, this makes the task check - // for a view sync or timeout cert. This value could be anything as long as it is not the - // previous view number. - proposals[0].data.justify_qc.view_number = proposals[3].data.justify_qc.view_number; - - let inputs = vec![ - random![ - QuorumProposalRecv(good_proposal, leaders[0]), - DaCertificateRecv(dacs[0].clone()), - ], - serial![VidShareRecv( - leaders[0], - vid_share(&vids[0].0, handle.public_key()) - )], - serial![Timeout(ViewNumber::new(2)), Timeout(ViewNumber::new(3))], - random![ - ViewSyncFinalizeCertificate2Recv(cert), - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - ], - ]; - - let expectations = vec![ - Expectations::from_outputs(all_predicates![ - quorum_proposal_validated(), - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - ]), - Expectations::from_outputs(vec![exact(QuorumVoteSend(votes[0].clone()))]), - Expectations::from_outputs(vec![timeout_vote_send(), timeout_vote_send()]), - // We get no output here due to the invalid view number. - Expectations::from_outputs(vec![]), - ]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} - -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_vid_disperse_storage_failure() { - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(2) - .await - .0; - - // Set the error flag here for the system handle. This causes it to emit an error on append. - handle.storage().write().await.should_return_err = true; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - } - - let inputs = vec![random![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - DaCertificateRecv(dacs[0].clone()), - VidShareRecv(leaders[0], vid_share(&vids[0].0, handle.public_key())), - ]]; - - let expectations = vec![Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - ])]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} - -/// Tests that VID shares that return validation with an Ok(Err) result -/// are correctly rejected -#[cfg(test)] -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -#[cfg(feature = "test-srs")] -async fn test_invalid_vid_disperse() { - use hotshot_testing::{ - helpers::{build_payload_commitment, build_vid_proposal}, - test_builder::TestDescription, - }; - use hotshot_types::traits::{ - consensus_api::ConsensusApi, network::Topic, node_implementation::NodeType, - }; - - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(0) - .await - .0; - - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let mut generator = - TestViewGenerator::generate(quorum_membership.clone(), da_membership.clone()); - - let mut proposals = Vec::new(); - let mut leaders = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - leaders.push(view.leader_public_key); - votes.push(view.create_quorum_vote(&handle)); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - } - - let vid_scheme = - vid_scheme_from_view_number::(&quorum_membership, ViewNumber::new(0)); - - let corrupt_share = vid_scheme.corrupt_share_index(vids[0].0[0].data.share.clone()); - - // Corrupt one of the shares - let mut share = vid_share(&vids[0].0, handle.public_key()); - share.data.share = corrupt_share; - - let inputs = vec![random![ - VidShareRecv(share), - DaCertificateRecv(dacs[0].clone()), - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - ]]; - - // If verify_share does not correctly handle this case, a `QuorumVote` - // will be emitted and cause a test failure - let expectations = vec![Expectations::from_outputs(all_predicates![ - validated_state_updated(), - exact(ViewChange(ViewNumber::new(1))), - quorum_proposal_validated(), - ])]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: TIMEOUT, - state: consensus_state, - expectations, - }; - - run_test![inputs, consensus_script].await; -} diff --git a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs index 940e30f62e..55ef973f9d 100644 --- a/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs +++ b/crates/testing/tests/tests_1/quorum_proposal_recv_task.rs @@ -6,7 +6,6 @@ // TODO: Remove after integration #![allow(unused_imports)] -#![cfg(feature = "dependency-tasks")] use committable::Committable; use futures::StreamExt; @@ -38,7 +37,6 @@ use hotshot_types::{ }; #[cfg(test)] -#[cfg(feature = "dependency-tasks")] #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_quorum_proposal_recv_task() { @@ -131,7 +129,6 @@ async fn test_quorum_proposal_recv_task() { } #[cfg(test)] -#[cfg(feature = "dependency-tasks")] #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] async fn test_quorum_proposal_recv_task_liveness_check() { diff --git a/crates/testing/tests/tests_1/quorum_proposal_task.rs b/crates/testing/tests/tests_1/quorum_proposal_task.rs index 8e5bb64259..455f304114 100644 --- a/crates/testing/tests/tests_1/quorum_proposal_task.rs +++ b/crates/testing/tests/tests_1/quorum_proposal_task.rs @@ -4,8 +4,6 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -#![cfg(feature = "dependency-tasks")] - use std::time::Duration; use futures::StreamExt; diff --git a/crates/testing/tests/tests_1/quorum_vote_task.rs b/crates/testing/tests/tests_1/quorum_vote_task.rs index 72fb7d97e6..e981ed9bda 100644 --- a/crates/testing/tests/tests_1/quorum_vote_task.rs +++ b/crates/testing/tests/tests_1/quorum_vote_task.rs @@ -5,7 +5,6 @@ // along with the HotShot repository. If not, see . #![allow(clippy::panic)] -#![cfg(feature = "dependency-tasks")] use std::time::Duration; diff --git a/crates/testing/tests/tests_1/test_success.rs b/crates/testing/tests/tests_1/test_success.rs index c4ba56416c..a2ee5f1572 100644 --- a/crates/testing/tests/tests_1/test_success.rs +++ b/crates/testing/tests/tests_1/test_success.rs @@ -10,7 +10,6 @@ use hotshot_example_types::node_types::{ Libp2pImpl, MemoryImpl, PushCdnImpl, TestConsecutiveLeaderTypes, TestTypes, TestTypesRandomizedLeader, TestVersions, }; -#[cfg(feature = "dependency-tasks")] use hotshot_example_types::testable_delay::{ DelayConfig, DelayOptions, DelaySettings, SupportedTraitTypesForAsyncDelay, }; @@ -41,7 +40,6 @@ cross_tests!( }, ); -#[cfg(feature = "dependency-tasks")] cross_tests!( TestName: test_success_with_async_delay, Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], @@ -74,7 +72,6 @@ cross_tests!( }, ); -#[cfg(feature = "dependency-tasks")] cross_tests!( TestName: test_success_with_async_delay_2, Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], diff --git a/crates/testing/tests/tests_1/upgrade_task_with_consensus.rs b/crates/testing/tests/tests_1/upgrade_task_with_consensus.rs deleted file mode 100644 index e226726246..0000000000 --- a/crates/testing/tests/tests_1/upgrade_task_with_consensus.rs +++ /dev/null @@ -1,678 +0,0 @@ -// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) -// This file is part of the HotShot repository. - -// You should have received a copy of the MIT License -// along with the HotShot repository. If not, see . - -#![cfg(not(feature = "dependency-tasks"))] -// TODO: Remove after integration of dependency-tasks -#![cfg(not(feature = "dependency-tasks"))] -#![allow(unused_imports)] - -use std::time::Duration; - -use futures::StreamExt; -use hotshot::{tasks::task_state::CreateTaskState, types::SystemContextHandle}; -use hotshot_example_types::{ - block_types::TestTransaction, - node_types::{MemoryImpl, TestTypes, TestVersions}, - state_types::TestInstanceState, -}; -use hotshot_macros::test_scripts; -use hotshot_task_impls::{ - consensus::ConsensusTaskState, events::HotShotEvent::*, upgrade::UpgradeTaskState, -}; -use hotshot_testing::{ - helpers::{build_fake_view_with_leaf, vid_share}, - predicates::{event::*, upgrade_with_consensus::*}, - script::{Expectations, TaskScript}, - view_generator::TestViewGenerator, -}; -use hotshot_types::{ - data::{null_block, ViewNumber}, - simple_vote::UpgradeProposalData, - traits::{ - election::Membership, - node_implementation::{ConsensusTime, Versions}, - }, - vote::HasViewNumber, -}; -use vbs::version::{StaticVersionType, Version}; -use vec1::vec1; - -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -/// Tests that we correctly update our internal consensus state when reaching a decided upgrade certificate. -async fn test_upgrade_task_vote() { - use hotshot_testing::helpers::build_system_handle; - - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(1) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let old_version = Version { major: 0, minor: 1 }; - let new_version = Version { major: 0, minor: 2 }; - - let upgrade_data: UpgradeProposalData = UpgradeProposalData { - old_version, - new_version, - decide_by: ViewNumber::new(6), - new_version_hash: [0u8; 12].to_vec(), - old_version_last_view: ViewNumber::new(6), - new_version_first_view: ViewNumber::new(7), - }; - - let mut proposals = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - let mut leaders = Vec::new(); - let mut leaves = Vec::new(); - - let mut generator = TestViewGenerator::generate(quorum_membership.clone(), da_membership); - - for view in (&mut generator).take(2).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - leaves.push(view.leaf.clone()); - } - - generator.add_upgrade(upgrade_data); - - for view in generator.take(4).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - leaves.push(view.leaf.clone()); - } - let inputs = vec![ - vec![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidShareRecv(leaders[0], vid_share(&vids[0].0, handle.public_key())), - DaCertificateRecv(dacs[0].clone()), - ], - vec![ - QuorumProposalRecv(proposals[1].clone(), leaders[1]), - VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())), - DaCertificateRecv(dacs[1].clone()), - ], - vec![ - QuorumProposalRecv(proposals[2].clone(), leaders[2]), - DaCertificateRecv(dacs[2].clone()), - VidShareRecv(leaders[2], vid_share(&vids[2].0, handle.public_key())), - ], - vec![ - QuorumProposalRecv(proposals[3].clone(), leaders[3]), - DaCertificateRecv(dacs[3].clone()), - VidShareRecv(leaders[3], vid_share(&vids[3].0, handle.public_key())), - ], - vec![ - QuorumProposalRecv(proposals[4].clone(), leaders[4]), - DaCertificateRecv(dacs[4].clone()), - VidShareRecv(leaders[4], vid_share(&vids[4].0, handle.public_key())), - ], - vec![QuorumProposalRecv(proposals[5].clone(), leaders[5])], - ]; - - let expectations = vec![ - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(1))), - validated_state_updated(), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[0].clone())), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(2))), - validated_state_updated(), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[1].clone())), - ], - task_state_asserts: vec![no_decided_upgrade_certificate()], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(3))), - validated_state_updated(), - quorum_proposal_validated(), - exact(QuorumVoteSend(votes[2].clone())), - ], - task_state_asserts: vec![no_decided_upgrade_certificate()], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(4))), - validated_state_updated(), - quorum_proposal_validated(), - leaf_decided(), - exact(QuorumVoteSend(votes[3].clone())), - ], - task_state_asserts: vec![no_decided_upgrade_certificate()], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(5))), - validated_state_updated(), - quorum_proposal_validated(), - leaf_decided(), - exact(QuorumVoteSend(votes[4].clone())), - ], - task_state_asserts: vec![no_decided_upgrade_certificate()], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(6))), - validated_state_updated(), - quorum_proposal_validated(), - leaf_decided(), - ], - task_state_asserts: vec![decided_upgrade_certificate()], - }, - ]; - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let mut consensus_script = TaskScript { - timeout: Duration::from_millis(65), - state: consensus_state, - expectations, - }; - - test_scripts![inputs, consensus_script].await; -} - -#[cfg_attr( - async_executor_impl = "tokio", - tokio::test(flavor = "multi_thread", worker_threads = 2) -)] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -/// Test that we correctly form and include an `UpgradeCertificate` when receiving votes. -async fn test_upgrade_task_propose() { - use std::sync::Arc; - - use hotshot_testing::helpers::build_system_handle; - - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(3) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let other_handles = futures::future::join_all((0..=9).map(build_system_handle)).await; - - let old_version = Version { major: 0, minor: 1 }; - let new_version = Version { major: 0, minor: 2 }; - - let upgrade_data: UpgradeProposalData = UpgradeProposalData { - old_version, - new_version, - decide_by: ViewNumber::new(4), - new_version_hash: [0u8; 12].to_vec(), - old_version_last_view: ViewNumber::new(5), - new_version_first_view: ViewNumber::new(7), - }; - - let mut proposals = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - let mut leaders = Vec::new(); - let mut views = Vec::new(); - - let mut generator = TestViewGenerator::generate(quorum_membership.clone(), da_membership); - - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - generator.add_upgrade(upgrade_data.clone()); - - for view in generator.take(4).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - let mut upgrade_votes = Vec::new(); - - for handle in other_handles { - upgrade_votes.push( - views[2] - .create_upgrade_vote(upgrade_data.clone(), &handle.0) - .await, - ); - } - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let upgrade_state = - UpgradeTaskState::::create_from(&handle).await; - - let upgrade_vote_recvs: Vec<_> = upgrade_votes.into_iter().map(UpgradeVoteRecv).collect(); - - let inputs = vec![ - vec![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidShareRecv(leaders[0], vid_share(&vids[0].0, handle.public_key())), - DaCertificateRecv(dacs[0].clone()), - ], - upgrade_vote_recvs, - vec![ - QuorumProposalRecv(proposals[1].clone(), leaders[1]), - DaCertificateRecv(dacs[1].clone()), - VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())), - ], - vec![ - VidShareRecv(leaders[2], vid_share(&vids[2].0, handle.public_key())), - SendPayloadCommitmentAndMetadata( - vids[2].0[0].data.payload_commitment, - proposals[2].data.block_header.builder_commitment.clone(), - proposals[2].data.block_header.metadata, - ViewNumber::new(3), - vec1![null_block::builder_fee::( - quorum_membership.total_nodes(), - ::Base::VERSION - ) - .unwrap()], - None, - ), - QcFormed(either::Either::Left(proposals[2].data.justify_qc.clone())), - ], - ]; - - let mut consensus_script = TaskScript { - timeout: Duration::from_millis(35), - state: consensus_state, - expectations: vec![ - Expectations { - output_asserts: vec![ - exact::(ViewChange(ViewNumber::new(1))), - validated_state_updated(), - quorum_proposal_validated::(), - quorum_vote_send::(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact::(ViewChange(ViewNumber::new(2))), - validated_state_updated(), - quorum_proposal_validated::(), - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![quorum_proposal_send_with_upgrade_certificate::()], - task_state_asserts: vec![], - }, - ], - }; - - let mut upgrade_script = TaskScript { - timeout: Duration::from_millis(35), - state: upgrade_state, - expectations: vec![ - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![upgrade_certificate_formed::()], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - ], - }; - - test_scripts![inputs, consensus_script, upgrade_script].await; -} - -#[cfg_attr( - async_executor_impl = "tokio", - tokio::test(flavor = "multi_thread", worker_threads = 2) -)] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -/// Test that we correctly handle blank blocks between versions. -/// Specifically, this test schedules an upgrade between views 4 and 8, -/// and ensures that: -/// - we correctly vote affirmatively on a QuorumProposal with a null block payload in view 5 -/// - we correctly propose with a null block payload in view 6, even if we have indications to do otherwise (via SendPayloadCommitmentAndMetadata, VID etc). -/// - we correctly reject a QuorumProposal with a non-null block payload in view 7. -async fn test_upgrade_task_blank_blocks() { - use hotshot_testing::helpers::build_system_handle; - - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - let handle = build_system_handle::(6) - .await - .0; - let quorum_membership = handle.hotshot.memberships.quorum_membership.clone(); - let da_membership = handle.hotshot.memberships.da_membership.clone(); - - let old_version = Version { major: 0, minor: 1 }; - let new_version = Version { major: 0, minor: 2 }; - - let builder_fee = null_block::builder_fee::( - quorum_membership.total_nodes(), - ::Base::VERSION, - ) - .unwrap(); - - let upgrade_data: UpgradeProposalData = UpgradeProposalData { - old_version, - new_version, - decide_by: ViewNumber::new(7), - new_version_hash: [0u8; 12].to_vec(), - old_version_last_view: ViewNumber::new(6), - new_version_first_view: ViewNumber::new(8), - }; - - let mut proposals = Vec::new(); - let mut votes = Vec::new(); - let mut dacs = Vec::new(); - let mut vids = Vec::new(); - let mut leaders = Vec::new(); - let mut views = Vec::new(); - - let mut generator = TestViewGenerator::generate(quorum_membership.clone(), da_membership); - - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - generator.add_upgrade(upgrade_data.clone()); - - for view in (&mut generator).take(3).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - // We are now in the upgrade period, and set the transactions to null for the QuorumProposalRecv in view 5. - // Our node should vote affirmatively on this. - generator.add_transactions(vec![]); - - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - // The transactions task generates an empty transaction set in this view, - // because we are proposing between versions. - generator.add_transactions(vec![]); - - for view in (&mut generator).take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - // For view 7, we set the transactions to something not null. The node should fail to vote on this. - generator.add_transactions(vec![TestTransaction::new(vec![0])]); - - for view in generator.take(1).collect::>().await { - proposals.push(view.quorum_proposal.clone()); - votes.push(view.create_quorum_vote(&handle).await); - dacs.push(view.da_certificate.clone()); - vids.push(view.vid_proposal.clone()); - leaders.push(view.leader_public_key); - views.push(view.clone()); - } - - let consensus_state = - ConsensusTaskState::::create_from(&handle).await; - let upgrade_state = - UpgradeTaskState::::create_from(&handle).await; - - let inputs = vec![ - vec![ - QuorumProposalRecv(proposals[0].clone(), leaders[0]), - VidShareRecv(leaders[0], vid_share(&vids[0].0, handle.public_key())), - DaCertificateRecv(dacs[0].clone()), - ], - vec![ - QuorumProposalRecv(proposals[1].clone(), leaders[1]), - VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())), - DaCertificateRecv(dacs[1].clone()), - SendPayloadCommitmentAndMetadata( - vids[1].0[0].data.payload_commitment, - proposals[1].data.block_header.builder_commitment.clone(), - proposals[1].data.block_header.metadata, - ViewNumber::new(2), - vec1![builder_fee.clone()], - None, - ), - ], - vec![ - DaCertificateRecv(dacs[2].clone()), - VidShareRecv(leaders[2], vid_share(&vids[2].0, handle.public_key())), - SendPayloadCommitmentAndMetadata( - vids[2].0[0].data.payload_commitment, - proposals[2].data.block_header.builder_commitment.clone(), - proposals[2].data.block_header.metadata, - ViewNumber::new(3), - vec1![builder_fee.clone()], - None, - ), - QuorumProposalRecv(proposals[2].clone(), leaders[2]), - ], - vec![ - DaCertificateRecv(dacs[3].clone()), - VidShareRecv(leaders[3], vid_share(&vids[3].0, handle.public_key())), - SendPayloadCommitmentAndMetadata( - vids[3].0[0].data.payload_commitment, - proposals[3].data.block_header.builder_commitment.clone(), - proposals[3].data.block_header.metadata, - ViewNumber::new(4), - vec1![builder_fee.clone()], - None, - ), - QuorumProposalRecv(proposals[3].clone(), leaders[3]), - ], - vec![ - DaCertificateRecv(dacs[4].clone()), - VidShareRecv(leaders[4], vid_share(&vids[4].0, handle.public_key())), - SendPayloadCommitmentAndMetadata( - vids[4].0[0].data.payload_commitment, - proposals[4].data.block_header.builder_commitment.clone(), - proposals[4].data.block_header.metadata, - ViewNumber::new(5), - vec1![builder_fee.clone()], - None, - ), - QuorumProposalRecv(proposals[4].clone(), leaders[4]), - ], - vec![ - DaCertificateRecv(dacs[5].clone()), - VidShareRecv(leaders[5], vid_share(&vids[5].0, handle.public_key())), - SendPayloadCommitmentAndMetadata( - vids[5].0[0].data.payload_commitment, - proposals[5].data.block_header.builder_commitment.clone(), - proposals[5].data.block_header.metadata, - ViewNumber::new(6), - vec1![builder_fee.clone()], - None, - ), - QuorumProposalRecv(proposals[5].clone(), leaders[5]), - QcFormed(either::Either::Left(proposals[5].data.justify_qc.clone())), - ], - vec![ - DaCertificateRecv(dacs[6].clone()), - VidShareRecv(leaders[6], vid_share(&vids[6].0, handle.public_key())), - SendPayloadCommitmentAndMetadata( - vids[6].0[0].data.payload_commitment, - proposals[6].data.block_header.builder_commitment.clone(), - proposals[6].data.block_header.metadata, - ViewNumber::new(7), - vec1![builder_fee], - None, - ), - QuorumProposalRecv(proposals[6].clone(), leaders[6]), - ], - ]; - - let mut consensus_script = TaskScript { - timeout: Duration::from_millis(35), - state: consensus_state, - expectations: vec![ - Expectations { - output_asserts: vec![ - exact::(ViewChange(ViewNumber::new(1))), - validated_state_updated(), - quorum_proposal_validated(), - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(2))), - validated_state_updated(), - quorum_proposal_validated(), - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(3))), - validated_state_updated(), - quorum_proposal_validated(), - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(4))), - validated_state_updated(), - quorum_proposal_validated(), - leaf_decided(), - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(5))), - validated_state_updated(), - quorum_proposal_validated(), - leaf_decided(), - // This is between versions, but we are receiving a null block and hence should vote affirmatively on it. - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(6))), - validated_state_updated(), - quorum_proposal_validated(), - quorum_proposal_send_with_null_block(quorum_membership.total_nodes()), - leaf_decided(), - quorum_vote_send(), - ], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![ - exact(ViewChange(ViewNumber::new(7))), - validated_state_updated(), - quorum_proposal_validated(), - leaf_decided(), - // We do NOT expect a quorum_vote_send() because we have set the block to be non-null in this view. - ], - task_state_asserts: vec![], - }, - ], - }; - - let mut upgrade_script = TaskScript { - timeout: Duration::from_millis(35), - state: upgrade_state, - expectations: vec![ - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - Expectations { - output_asserts: vec![], - task_state_asserts: vec![], - }, - ], - }; - - test_scripts![inputs, consensus_script, upgrade_script].await; -} diff --git a/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs b/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs index b429fddaf4..7b2f40e73e 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_proposal.rs @@ -4,7 +4,6 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -#![cfg(feature = "dependency-tasks")] // TODO: Remove after integration of dependency-tasks #![allow(unused_imports)] diff --git a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs index 49805d53d5..8cda11c0e3 100644 --- a/crates/testing/tests/tests_1/upgrade_task_with_vote.rs +++ b/crates/testing/tests/tests_1/upgrade_task_with_vote.rs @@ -4,7 +4,6 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -#![cfg(feature = "dependency-tasks")] // TODO: Remove after integration of dependency-tasks #![allow(unused_imports)] @@ -40,7 +39,6 @@ use vbs::version::Version; const TIMEOUT: Duration = Duration::from_millis(65); -#[cfg(feature = "dependency-tasks")] #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] /// Tests that we correctly update our internal quorum vote state when reaching a decided upgrade diff --git a/crates/testing/tests/tests_1/vote_dependency_handle.rs b/crates/testing/tests/tests_1/vote_dependency_handle.rs index 62d1d8040b..48fcffe5e7 100644 --- a/crates/testing/tests/tests_1/vote_dependency_handle.rs +++ b/crates/testing/tests/tests_1/vote_dependency_handle.rs @@ -1,5 +1,3 @@ -#![cfg(feature = "dependency-tasks")] - use std::time::Duration; use async_broadcast::broadcast;