push task_state as QuorumProposalRecv task state into helper functions, rename consensus2
pls148 committed Oct 14, 2024
1 parent a4604dd commit e83aba5
Showing 12 changed files with 59 additions and 92 deletions.
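The common thread across these diffs: helper functions that previously took each borrowed field as its own parameter now borrow the task state as a whole, and the consensus2 task is renamed to consensus. A minimal sketch of the parameter-collapsing pattern, using hypothetical names rather than HotShot's actual types:

```rust
// Hypothetical task state; the fields stand in for the many borrows that the
// old helper signatures threaded through individually.
struct TaskState {
    cur_view: u64,
    cur_view_time: i64,
}

// Before: every field the helper touches is a separate parameter, so adding a
// field means touching every signature in the call chain.
fn update_view_split(new_view: u64, cur_view: &mut u64, cur_view_time: &mut i64) {
    *cur_view = new_view;
    *cur_view_time = 0;
}

// After: the helper borrows the whole task state; call sites shrink and the
// clippy::too_many_arguments allowances can be dropped.
fn update_view_whole(new_view: u64, task_state: &mut TaskState) {
    task_state.cur_view = new_view;
    task_state.cur_view_time = 0;
}

fn main() {
    let mut ts = TaskState { cur_view: 1, cur_view_time: 0 };
    update_view_split(2, &mut ts.cur_view, &mut ts.cur_view_time);
    update_view_whole(3, &mut ts);
    assert_eq!(ts.cur_view, 3);
}
```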
2 changes: 0 additions & 2 deletions crates/hotshot/src/lib.rs
@@ -350,8 +350,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
     /// Panics if sending genesis fails
     #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
     pub async fn start_consensus(&self) {
-        tracing::error!("HotShot is running with the dependency tasks feature enabled!!");
-
         #[cfg(all(feature = "rewind", not(debug_assertions)))]
         compile_error!("Cannot run rewind in production builds!");

4 changes: 2 additions & 2 deletions crates/hotshot/src/tasks/mod.rs
@@ -217,14 +217,14 @@ pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>,

     {
         use hotshot_task_impls::{
-            consensus2::Consensus2TaskState, quorum_proposal::QuorumProposalTaskState,
+            consensus::ConsensusTaskState, quorum_proposal::QuorumProposalTaskState,
             quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState,
         };
 
         handle.add_task(QuorumProposalTaskState::<TYPES, I, V>::create_from(handle).await);
         handle.add_task(QuorumVoteTaskState::<TYPES, I, V>::create_from(handle).await);
         handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
-        handle.add_task(Consensus2TaskState::<TYPES, I, V>::create_from(handle).await);
+        handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
     }
 
     #[cfg(feature = "rewind")]
4 changes: 2 additions & 2 deletions crates/hotshot/src/tasks/task_state.rs
@@ -13,7 +13,7 @@ use async_compatibility_layer::art::async_spawn;
 use async_trait::async_trait;
 use chrono::Utc;
 use hotshot_task_impls::{
-    builder::BuilderClient, consensus2::Consensus2TaskState, da::DaTaskState,
+    builder::BuilderClient, consensus::ConsensusTaskState, da::DaTaskState,
     quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState,
     quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState,
     transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState,
@@ -299,7 +299,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState

 #[async_trait]
 impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState<TYPES, I, V>
-    for Consensus2TaskState<TYPES, I, V>
+    for ConsensusTaskState<TYPES, I, V>
 {
     async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
         let consensus = handle.hotshot.consensus();
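The impl above only changes its target type. The underlying pattern, each task state constructing itself asynchronously from the system handle, looks roughly like this simplified sketch (the real trait is generic over TYPES, I, V and takes a SystemContextHandle; the types below are illustrative stand-ins):

```rust
use async_trait::async_trait;

// Illustrative stand-ins for the real handle and task-state types.
struct Handle {
    id: u64,
}

struct ConsensusTaskState {
    id: u64,
}

// Sketch of the CreateTaskState pattern: each task state knows how to build
// itself from the shared system handle.
#[async_trait]
trait CreateTaskState<H>: Sized {
    async fn create_from(handle: &H) -> Self;
}

#[async_trait]
impl CreateTaskState<Handle> for ConsensusTaskState {
    async fn create_from(handle: &Handle) -> Self {
        Self { id: handle.id }
    }
}

fn main() {
    let handle = Handle { id: 7 };
    let task = futures::executor::block_on(ConsensusTaskState::create_from(&handle));
    assert_eq!(task.id, 7);
}
```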
@@ -21,9 +21,9 @@ use hotshot_types::{
 };
 use tracing::{debug, error, instrument};
 
-use super::Consensus2TaskState;
+use super::ConsensusTaskState;
 use crate::{
-    consensus2::Versions,
+    consensus::Versions,
     events::HotShotEvent,
     helpers::{broadcast_event, cancel_task},
     vote_collection::handle_vote,
@@ -38,7 +38,7 @@ pub(crate) async fn handle_quorum_vote_recv<
     vote: &QuorumVote<TYPES>,
     event: Arc<HotShotEvent<TYPES>>,
     sender: &Sender<Arc<HotShotEvent<TYPES>>>,
-    task_state: &mut Consensus2TaskState<TYPES, I, V>,
+    task_state: &mut ConsensusTaskState<TYPES, I, V>,
 ) -> Result<()> {
     // Are we the leader for this view?
     ensure!(
@@ -73,7 +73,7 @@ pub(crate) async fn handle_timeout_vote_recv<
     vote: &TimeoutVote<TYPES>,
     event: Arc<HotShotEvent<TYPES>>,
     sender: &Sender<Arc<HotShotEvent<TYPES>>>,
-    task_state: &mut Consensus2TaskState<TYPES, I, V>,
+    task_state: &mut ConsensusTaskState<TYPES, I, V>,
 ) -> Result<()> {
     // Are we the leader for this view?
     ensure!(
@@ -108,7 +108,7 @@ pub(crate) async fn handle_view_change<
 >(
     new_view_number: TYPES::Time,
     sender: &Sender<Arc<HotShotEvent<TYPES>>>,
-    task_state: &mut Consensus2TaskState<TYPES, I, V>,
+    task_state: &mut ConsensusTaskState<TYPES, I, V>,
 ) -> Result<()> {
     ensure!(
         new_view_number > task_state.cur_view,
@@ -205,7 +205,7 @@ pub(crate) async fn handle_view_change<
 pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
     view_number: TYPES::Time,
     sender: &Sender<Arc<HotShotEvent<TYPES>>>,
-    task_state: &mut Consensus2TaskState<TYPES, I, V>,
+    task_state: &mut ConsensusTaskState<TYPES, I, V>,
 ) -> Result<()> {
     ensure!(
         task_state.cur_view < view_number,
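Apart from the type rename, every handler in this file keeps the same shape: borrow the task state mutably, guard preconditions with ensure!, then update state and emit follow-up events. A condensed sketch of that shape, with stand-in types rather than HotShot's real task state and event stream:

```rust
use anyhow::{ensure, Result};

// Stand-ins for the real task state and event stream.
struct ConsensusState {
    cur_view: u64,
}

enum Event {
    ViewChange(u64),
}

// The shared handler shape: validate against task state, mutate it, emit events.
fn handle_view_change(
    new_view: u64,
    out: &mut Vec<Event>, // stands in for the async event sender
    task_state: &mut ConsensusState,
) -> Result<()> {
    ensure!(
        new_view > task_state.cur_view,
        "New view is not greater than our current view"
    );
    task_state.cur_view = new_view;
    out.push(Event::ViewChange(new_view));
    Ok(())
}

fn main() -> Result<()> {
    let mut state = ConsensusState { cur_view: 1 };
    let mut events = Vec::new();
    handle_view_change(2, &mut events, &mut state)?;
    assert_eq!(state.cur_view, 2);
    Ok(())
}
```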
@@ -37,7 +37,7 @@ use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap};
 mod handlers;
 
 /// Task state for the Consensus task.
-pub struct Consensus2TaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
+pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
     /// Our public key
     pub public_key: TYPES::SignatureKey,

@@ -96,9 +96,9 @@ pub struct Consensus2TaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
     /// Lock for a decided upgrade
     pub upgrade_lock: UpgradeLock<TYPES, V>,
 }
-impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Consensus2TaskState<TYPES, I, V> {
+impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskState<TYPES, I, V> {
     /// Handles a consensus event received on the event stream
-    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, last_decided_view = *self.last_decided_view), name = "Consensus replica task", level = "error", target = "Consensus2TaskState")]
+    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, last_decided_view = *self.last_decided_view), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
     pub async fn handle(
         &mut self,
         event: Arc<HotShotEvent<TYPES>>,
@@ -151,7 +151,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Consensus2TaskS

 #[async_trait]
 impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
-    for Consensus2TaskState<TYPES, I, V>
+    for ConsensusTaskState<TYPES, I, V>
 {
     type Event = HotShotEvent<TYPES>;

81 changes: 35 additions & 46 deletions crates/task-impls/src/helpers.rs
@@ -41,7 +41,7 @@ use hotshot_types::{
 use tokio::task::JoinHandle;
 use tracing::{debug, info, instrument, warn};
 
-use crate::{events::HotShotEvent, request::REQUEST_TIMEOUT};
+use crate::{events::HotShotEvent, request::REQUEST_TIMEOUT, quorum_proposal_recv::QuorumProposalRecvTaskState};
 
 /// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
 #[instrument(skip_all)]
@@ -436,33 +436,24 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
 ///
 /// # Errors
 /// If any validation or state update fails.
-/// TODO - This should just take the QuorumProposalRecv task state after
-/// we merge the dependency tasks.
-#[allow(clippy::too_many_arguments)]
 #[allow(clippy::too_many_lines)]
-#[instrument(skip_all, fields(id = id, view = *proposal.data.view_number()))]
+#[instrument(skip_all, fields(id = task_state.id, view = *proposal.data.view_number()))]
 pub async fn validate_proposal_safety_and_liveness<
     TYPES: NodeType,
     I: NodeImplementation<TYPES>,
     V: Versions,
 >(
     proposal: Proposal<TYPES, QuorumProposal<TYPES>>,
     parent_leaf: Leaf<TYPES>,
-    consensus: OuterConsensus<TYPES>,
-    decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
-    quorum_membership: Arc<TYPES::Membership>,
+    task_state: &mut QuorumProposalRecvTaskState<TYPES, I, V>,
     event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
     sender: TYPES::SignatureKey,
-    event_sender: Sender<Event<TYPES>>,
-    id: u64,
-    upgrade_lock: UpgradeLock<TYPES, V>,
-    storage: Arc<RwLock<I::Storage>>,
 ) -> Result<()> {
     let view_number = proposal.data.view_number();
 
     let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data);
     ensure!(
-        proposed_leaf.parent_commitment() == parent_leaf.commit(&upgrade_lock).await,
+        proposed_leaf.parent_commitment() == parent_leaf.commit(&task_state.upgrade_lock).await,
         "Proposed leaf does not extend the parent leaf."
     );
 
@@ -471,19 +462,19 @@ pub async fn validate_proposal_safety_and_liveness<
     );
     let view = View {
         view_inner: ViewInner::Leaf {
-            leaf: proposed_leaf.commit(&upgrade_lock).await,
+            leaf: proposed_leaf.commit(&task_state.upgrade_lock).await,
             state,
             delta: None, // May be updated to `Some` in the vote task.
         },
     };
 
     {
-        let mut consensus_write = consensus.write().await;
+        let mut consensus_write = task_state.consensus.write().await;
         if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) {
             tracing::trace!("{e:?}");
         }
         consensus_write
-            .update_saved_leaves(proposed_leaf.clone(), &upgrade_lock)
+            .update_saved_leaves(proposed_leaf.clone(), &task_state.upgrade_lock)
             .await;
 
         // Update our internal storage of the proposal. The proposal is valid, so
@@ -502,14 +493,14 @@ pub async fn validate_proposal_safety_and_liveness<

     UpgradeCertificate::validate(
         &proposal.data.upgrade_certificate,
-        &quorum_membership,
-        &upgrade_lock,
+        &task_state.quorum_membership,
+        &task_state.upgrade_lock,
     )
     .await?;
 
     // Validate that the upgrade certificate is re-attached, if we saw one on the parent
     proposed_leaf
-        .extends_upgrade(&parent_leaf, &decided_upgrade_certificate)
+        .extends_upgrade(&parent_leaf, &task_state.upgrade_lock.decided_upgrade_certificate)
         .await?;
 
     let justify_qc = proposal.data.justify_qc.clone();
@@ -518,7 +509,7 @@ pub async fn validate_proposal_safety_and_liveness<

     // Liveness check.
     {
-        let read_consensus = consensus.read().await;
+        let read_consensus = task_state.consensus.read().await;
         let liveness_check = justify_qc.view_number() > read_consensus.locked_view();
 
         // Safety check.
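The gate shown here is the classic HotStuff vote condition: accept a proposal if its justify QC is newer than the locked view (liveness) or if the proposed branch extends the locked one (safety). A toy model of that rule on bare view numbers; the real check operates on leaves, QCs, and the validated state map:

```rust
// Toy model of the liveness/safety gate, on view numbers only.
fn proposal_acceptable(justify_qc_view: u64, extends_locked_branch: bool, locked_view: u64) -> bool {
    let liveness_check = justify_qc_view > locked_view; // QC newer than our lock
    let safety_check = extends_locked_branch; // branch extends the locked leaf
    liveness_check || safety_check
}

fn main() {
    let locked_view = 10;
    assert!(proposal_acceptable(11, false, locked_view)); // live: newer QC
    assert!(proposal_acceptable(5, true, locked_view)); // safe: extends the lock
    assert!(!proposal_acceptable(5, false, locked_view)); // neither: reject
}
```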
@@ -542,7 +533,7 @@
                     view_number,
                     event: EventType::Error { error: Arc::new(e) },
                 },
-                &event_sender,
+                &task_state.output_event_stream,
             )
             .await;
         }
@@ -553,7 +544,7 @@

     // Update our persistent storage of the proposal. If we cannot store the proposal, return
     // an error so we don't vote.
-    storage.write().await.append_proposal(&proposal).await?;
+    task_state.storage.write().await.append_proposal(&proposal).await?;
 
     // We accept the proposal, notify the application layer
     broadcast_event(
@@ -564,7 +555,7 @@ pub async fn validate_proposal_safety_and_liveness<
                 sender,
             },
         },
-        &event_sender,
+        &task_state.output_event_stream,
     )
     .await;

@@ -665,36 +656,34 @@ pub async fn validate_proposal_view_and_certs<TYPES: NodeType, V: Versions>(
 ///
 /// # Errors
 /// Returns an [`anyhow::Error`] when the new view is not greater than the current view.
-/// TODO: Remove args when we merge dependency tasks.
-#[allow(clippy::too_many_arguments)]
-pub(crate) async fn update_view<TYPES: NodeType>(
+pub(crate) async fn update_view<
+    TYPES: NodeType,
+    I: NodeImplementation<TYPES>,
+    V: Versions,
+>(
     new_view: TYPES::Time,
     event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
-    timeout: u64,
-    consensus: OuterConsensus<TYPES>,
-    cur_view: &mut TYPES::Time,
-    cur_view_time: &mut i64,
-    timeout_task: &mut JoinHandle<()>,
-    output_event_stream: &Sender<Event<TYPES>>,
-    is_old_view_leader: bool,
+    task_state: &mut QuorumProposalRecvTaskState<TYPES, I, V>,
 ) -> Result<()> {
 
     ensure!(
-        new_view > *cur_view,
+        new_view > task_state.cur_view,
         "New view is not greater than our current view"
     );
 
-    let old_view = *cur_view;
+    let is_old_view_leader = task_state.quorum_membership.leader(task_state.cur_view) == task_state.public_key;
+    let old_view = task_state.cur_view;
 
     debug!("Updating view from {} to {}", *old_view, *new_view);
 
     if *old_view / 100 != *new_view / 100 {
         info!("Progress: entered view {:>6}", *new_view);
     }
 
-    *cur_view = new_view;
+    task_state.cur_view = new_view;
 
     // The next view is just the current view + 1
-    let next_view = *cur_view + 1;
+    let next_view = task_state.cur_view + 1;
 
futures::join! {
broadcast_event(Arc::new(HotShotEvent::ViewChange(new_view)), event_stream),
Expand All @@ -705,7 +694,7 @@ pub(crate) async fn update_view<TYPES: NodeType>(
view_number: old_view,
},
},
output_event_stream,
&task_state.output_event_stream,
)
};

@@ -715,7 +704,7 @@
     // Nuance: We timeout on the view + 1 here because that means that we have
     // not seen evidence to transition to this new view
     let view_number = next_view;
-    let timeout = Duration::from_millis(timeout);
+    let timeout = Duration::from_millis(task_state.timeout);
     async move {
         async_sleep(timeout).await;
         broadcast_event(
@@ -727,30 +716,30 @@ pub(crate) async fn update_view<TYPES: NodeType>(
     });
 
     // cancel the old timeout task
-    cancel_task(std::mem::replace(timeout_task, new_timeout_task)).await;
+    cancel_task(std::mem::replace(&mut task_state.timeout_task, new_timeout_task)).await;
 
-    let consensus = consensus.upgradable_read().await;
+    let consensus = task_state.consensus.upgradable_read().await;
     consensus
         .metrics
         .current_view
-        .set(usize::try_from(cur_view.u64()).unwrap());
+        .set(usize::try_from(task_state.cur_view.u64()).unwrap());
     let new_view_time = Utc::now().timestamp();
     if is_old_view_leader {
         #[allow(clippy::cast_precision_loss)]
         consensus
             .metrics
             .view_duration_as_leader
-            .add_point((new_view_time - *cur_view_time) as f64);
+            .add_point((new_view_time - task_state.cur_view_time) as f64);
     }
-    *cur_view_time = new_view_time;
+    task_state.cur_view_time = new_view_time;

     // Do the comparison before the subtraction to avoid potential overflow, since
     // `last_decided_view` may be greater than `cur_view` if the node is catching up.
-    if usize::try_from(cur_view.u64()).unwrap()
+    if usize::try_from(task_state.cur_view.u64()).unwrap()
         > usize::try_from(consensus.last_decided_view().u64()).unwrap()
     {
         consensus.metrics.number_of_views_since_last_decide.set(
-            usize::try_from(cur_view.u64()).unwrap()
+            usize::try_from(task_state.cur_view.u64()).unwrap()
                 - usize::try_from(consensus.last_decided_view().u64()).unwrap(),
         );
     }
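One detail the refactor preserves is worth spelling out: the metrics code above compares before subtracting because the view counters are unsigned, and during catch-up last_decided_view can exceed cur_view. A minimal illustration of the guard:

```rust
// Compare first: an unchecked `cur_view - last_decided` on unsigned integers
// would underflow when the node is catching up (panicking in debug builds,
// wrapping in release).
fn views_since_last_decide(cur_view: u64, last_decided: u64) -> u64 {
    if cur_view > last_decided {
        cur_view - last_decided
    } else {
        0
    }
}

fn main() {
    assert_eq!(views_since_last_decide(12, 10), 2);
    assert_eq!(views_since_last_decide(10, 12), 0); // catching up: no underflow
}
```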
2 changes: 1 addition & 1 deletion crates/task-impls/src/lib.rs
@@ -8,7 +8,7 @@
 //! consensus in an event driven way
 /// The task which implements the core state logic of consensus.
-pub mod consensus2;
+pub mod consensus;
 
 /// The task which handles the logic for the quorum vote.
 pub mod quorum_vote;
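For downstream code, the module rename is a one-line import change (a hypothetical consumer shown; the paths match the imports in the diffs above):

```rust
// Before this commit:
// use hotshot_task_impls::consensus2::Consensus2TaskState;

// After:
use hotshot_task_impls::consensus::ConsensusTaskState;
```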