Skip to content

Commit

Permalink
Remove slow operations from critical path (#3788)
Browse files Browse the repository at this point in the history
* move storage to end of proposal processing

* don't fetch proposal so until we really need it

* whoops wrong fetch

* fix test

* spawn fetch proposal

* revert test

* fixes after merge

* return no proposal if the dep errors

* rename to reader
  • Loading branch information
bfish713 authored Oct 28, 2024
1 parent 92df2ec commit 25c907e
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 50 deletions.
18 changes: 9 additions & 9 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub enum HotShotEvent<TYPES: NodeType> {
/// 2. The proposal has been correctly signed by the leader of the current view
/// 3. The justify QC is valid
/// 4. The proposal passes either liveness or safety check.
QuorumProposalValidated(QuorumProposal<TYPES>, Leaf<TYPES>),
QuorumProposalValidated(Proposal<TYPES, QuorumProposal<TYPES>>, Leaf<TYPES>),
/// A quorum proposal is missing for a view that we need.
QuorumProposalRequestSend(
ProposalRequestPayload<TYPES>,
Expand Down Expand Up @@ -267,9 +267,14 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
Some(v.view_number())
}
HotShotEvent::QuorumProposalRecv(proposal, _)
| HotShotEvent::QuorumProposalSend(proposal, _) => Some(proposal.data.view_number()),
| HotShotEvent::QuorumProposalSend(proposal, _)
| HotShotEvent::QuorumProposalValidated(proposal, _)
| HotShotEvent::QuorumProposalResponseSend(_, proposal)
| HotShotEvent::QuorumProposalResponseRecv(proposal)
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteSend(vote) => Some(vote.view_number()),
HotShotEvent::QuorumProposalValidated(proposal, _) => Some(proposal.view_number()),
HotShotEvent::DaProposalRecv(proposal, _)
| HotShotEvent::DaProposalValidated(proposal, _)
| HotShotEvent::DaProposalSend(proposal, _) => Some(proposal.data.view_number()),
Expand Down Expand Up @@ -311,11 +316,6 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
}
HotShotEvent::QuorumProposalRequestSend(req, _)
| HotShotEvent::QuorumProposalRequestRecv(req, _) => Some(req.view_number),
HotShotEvent::QuorumProposalResponseSend(_, proposal)
| HotShotEvent::QuorumProposalResponseRecv(proposal)
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteDependenciesValidated(view_number)
| HotShotEvent::ViewChange(view_number)
| HotShotEvent::ViewSyncTimeout(view_number, _, _)
Expand Down Expand Up @@ -398,7 +398,7 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
HotShotEvent::QuorumProposalValidated(proposal, _) => write!(
f,
"QuorumProposalValidated(view_number={:?})",
proposal.view_number()
proposal.data.view_number()
),
HotShotEvent::DaProposalSend(proposal, _) => write!(
f,
Expand Down
17 changes: 4 additions & 13 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use hotshot_types::{
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
storage::Storage,
BlockPayload, ValidatedState,
},
utils::{Terminator, View, ViewInner},
Expand Down Expand Up @@ -119,6 +118,9 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
}

}
} else {
// If the dep returns early return none
return None;
}
}

Expand Down Expand Up @@ -546,17 +548,6 @@ pub async fn validate_proposal_safety_and_liveness<
});
}

// Update our persistent storage of the proposal. If we cannot store the proposal reutrn
// and error so we don't vote
task_state
.storage
.write()
.await
.append_proposal(&proposal)
.await
.wrap()
.context(error!("Failed to append proposal in storage!"))?;

// We accept the proposal, notify the application layer
broadcast_event(
Event {
Expand All @@ -573,7 +564,7 @@ pub async fn validate_proposal_safety_and_liveness<
// Notify other tasks
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalValidated(
proposal.data.clone(),
proposal.clone(),
parent_leaf,
)),
&event_stream,
Expand Down
46 changes: 37 additions & 9 deletions crates/task-impls/src/quorum_proposal_recv/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use std::sync::Arc;

use async_broadcast::{broadcast, Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLockUpgradableReadGuard;
use committable::Committable;
use hotshot_types::{
Expand All @@ -19,6 +20,7 @@ use hotshot_types::{
traits::{
election::Membership,
node_implementation::{NodeImplementation, NodeType},
signature_key::SignatureKey,
storage::Storage,
ValidatedState,
},
Expand Down Expand Up @@ -104,6 +106,35 @@ async fn validate_proposal_liveness<TYPES: NodeType, I: NodeImplementation<TYPES
Ok(())
}

/// Spawn a task which will fire a request to get a proposal, and store it.
#[allow(clippy::too_many_arguments)]
fn spawn_fetch_proposal<TYPES: NodeType, V: Versions>(
view: TYPES::View,
event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
membership: Arc<TYPES::Membership>,
consensus: OuterConsensus<TYPES>,
sender_public_key: TYPES::SignatureKey,
sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: UpgradeLock<TYPES, V>,
) {
async_spawn(async move {
let lock = upgrade_lock;

let _ = fetch_proposal(
view,
event_sender,
event_receiver,
membership,
consensus,
sender_public_key,
sender_private_key,
&lock,
)
.await;
});
}

/// Handles the `QuorumProposalRecv` event by first validating the cert itself for the view, and then
/// updating the states, which runs when the proposal cannot be found in the internal state map.
///
Expand Down Expand Up @@ -155,17 +186,16 @@ pub(crate) async fn handle_quorum_proposal_recv<
.await;

// Get the parent leaf and state.
let mut parent_leaf = task_state
let parent_leaf = task_state
.consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.data.leaf_commit)
.cloned();

parent_leaf = match parent_leaf {
Some(p) => Some(p),
None => fetch_proposal(
if parent_leaf.is_none() {
spawn_fetch_proposal(
justify_qc.view_number(),
event_sender.clone(),
event_receiver.clone(),
Expand All @@ -176,11 +206,9 @@ pub(crate) async fn handle_quorum_proposal_recv<
// incorrectly.
task_state.public_key.clone(),
task_state.private_key.clone(),
&task_state.upgrade_lock,
)
.await
.ok(),
};
task_state.upgrade_lock.clone(),
);
}
let consensus_reader = task_state.consensus.read().await;

let parent = match parent_leaf {
Expand Down
21 changes: 15 additions & 6 deletions crates/task-impls/src/quorum_vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
match event.as_ref() {
#[allow(unused_assignments)]
HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
let proposal_payload_comm = proposal.block_header.payload_commitment();
let proposal_payload_comm = proposal.data.block_header.payload_commitment();
if let Some(ref comm) = payload_commitment {
if proposal_payload_comm != *comm {
tracing::error!("Quorum proposal has inconsistent payload commitment with DAC or VID.");
Expand All @@ -295,11 +295,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
payload_commitment = Some(proposal_payload_comm);
}
let parent_commitment = parent_leaf.commit(&self.upgrade_lock).await;
let proposed_leaf = Leaf::from_quorum_proposal(proposal);
let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data);
if proposed_leaf.parent_commitment() != parent_commitment {
tracing::warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote.");
return;
}
// Update our persistent storage of the proposal. If we cannot store the proposal reutrn
// and error so we don't vote
if let Err(e) = self.storage.write().await.append_proposal(proposal).await {
tracing::error!("failed to store proposal, not voting. error = {e:#}");
return;
}
leaf = Some(proposed_leaf);
}
HotShotEvent::DaCertificateValidated(cert) => {
Expand Down Expand Up @@ -424,7 +430,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
let event_view = match dependency_type {
VoteDependency::QuorumProposal => {
if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
proposal.view_number
proposal.data.view_number
} else {
return false;
}
Expand Down Expand Up @@ -549,19 +555,22 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
let cur_epoch = self.consensus.read().await.cur_epoch();
tracing::trace!("Received Proposal for view {}", *proposal.view_number());
tracing::trace!(
"Received Proposal for view {}",
*proposal.data.view_number()
);

// Handle the event before creating the dependency task.
if let Err(e) =
handle_quorum_proposal_validated(proposal, &event_sender, self).await
handle_quorum_proposal_validated(&proposal.data, &event_sender, self).await
{
tracing::debug!(
"Failed to handle QuorumProposalValidated event; error = {e:#}"
);
}

self.create_dependency_task_if_new(
proposal.view_number,
proposal.data.view_number,
cur_epoch,
event_receiver,
&event_sender,
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequest
) -> Result<()> {
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _) => {
let prop_view = proposal.view_number();
let prop_view = proposal.data.view_number();
let cur_epoch = self.consensus.read().await.cur_epoch();

// If we already have the VID shares for the next view, do nothing.
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/src/byzantine/byzantine_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
];
}
HotShotEvent::QuorumProposalValidated(proposal, _) => {
self.validated_proposals.push(proposal.clone());
self.validated_proposals.push(proposal.data.clone());
}
_ => {}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/tests_1/quorum_proposal_recv_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn test_quorum_proposal_recv_task() {
.await,
)),
exact(QuorumProposalValidated(
proposals[1].data.clone(),
proposals[1].clone(),
leaves[0].clone(),
)),
exact(ViewChange(ViewNumber::new(2))),
Expand Down
8 changes: 4 additions & 4 deletions crates/testing/tests/tests_1/quorum_vote_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_quorum_vote_task_success() {
// Send the quorum proposal, DAC, VID share data, and validated state, in which case a dummy
// vote can be formed and the view number will be updated.
let inputs = vec![random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[1], vids[1].0[0].clone()),
]];
Expand Down Expand Up @@ -150,11 +150,11 @@ async fn test_quorum_vote_task_miss_dependency() {
// Send two of quorum proposal, DAC, VID share data, in which case there's no vote.
let inputs = vec![
random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())),
],
random![
QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()),
QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()),
DaCertificateRecv(dacs[2].clone()),
],
random![
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn test_quorum_vote_task_incorrect_dependency() {

// Send the correct quorum proposal and DAC, and incorrect VID share data.
let inputs = vec![random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[0], vids[0].0[0].clone()),
]];
Expand Down
10 changes: 5 additions & 5 deletions crates/testing/tests/tests_1/upgrade_task_with_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,27 @@ async fn test_upgrade_task_with_vote() {

let inputs = vec![
random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[1], vids[1].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()),
QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()),
DaCertificateRecv(dacs[2].clone()),
VidShareRecv(leaders[2], vids[2].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[3].data.clone(), leaves[2].clone()),
QuorumProposalValidated(proposals[3].clone(), leaves[2].clone()),
DaCertificateRecv(dacs[3].clone()),
VidShareRecv(leaders[3], vids[3].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[4].data.clone(), leaves[3].clone()),
QuorumProposalValidated(proposals[4].clone(), leaves[3].clone()),
DaCertificateRecv(dacs[4].clone()),
VidShareRecv(leaders[4], vids[4].0[0].clone()),
],
random![QuorumProposalValidated(
proposals[5].data.clone(),
proposals[5].clone(),
leaves[5].clone()
),],
];
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/tests_1/vote_dependency_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn test_vote_dependency_handle() {
// the dependency handles do not (yet) work with the existing test suite.
let all_inputs = vec![
DaCertificateValidated(dacs[1].clone()),
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
VidShareValidated(vids[1].0[0].clone()),
]
.into_iter()
Expand Down

0 comments on commit 25c907e

Please sign in to comment.