Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove slow operations from critical path #3788

Merged
merged 10 commits into from
Oct 28, 2024
18 changes: 9 additions & 9 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub enum HotShotEvent<TYPES: NodeType> {
/// 2. The proposal has been correctly signed by the leader of the current view
/// 3. The justify QC is valid
/// 4. The proposal passes either liveness or safety check.
QuorumProposalValidated(QuorumProposal<TYPES>, Leaf<TYPES>),
QuorumProposalValidated(Proposal<TYPES, QuorumProposal<TYPES>>, Leaf<TYPES>),
/// A quorum proposal is missing for a view that we need.
QuorumProposalRequestSend(
ProposalRequestPayload<TYPES>,
Expand Down Expand Up @@ -267,9 +267,14 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
Some(v.view_number())
}
HotShotEvent::QuorumProposalRecv(proposal, _)
| HotShotEvent::QuorumProposalSend(proposal, _) => Some(proposal.data.view_number()),
| HotShotEvent::QuorumProposalSend(proposal, _)
| HotShotEvent::QuorumProposalValidated(proposal, _)
| HotShotEvent::QuorumProposalResponseSend(_, proposal)
| HotShotEvent::QuorumProposalResponseRecv(proposal)
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteSend(vote) => Some(vote.view_number()),
HotShotEvent::QuorumProposalValidated(proposal, _) => Some(proposal.view_number()),
HotShotEvent::DaProposalRecv(proposal, _)
| HotShotEvent::DaProposalValidated(proposal, _)
| HotShotEvent::DaProposalSend(proposal, _) => Some(proposal.data.view_number()),
Expand Down Expand Up @@ -311,11 +316,6 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
}
HotShotEvent::QuorumProposalRequestSend(req, _)
| HotShotEvent::QuorumProposalRequestRecv(req, _) => Some(req.view_number),
HotShotEvent::QuorumProposalResponseSend(_, proposal)
| HotShotEvent::QuorumProposalResponseRecv(proposal)
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteDependenciesValidated(view_number)
| HotShotEvent::ViewChange(view_number)
| HotShotEvent::ViewSyncTimeout(view_number, _, _)
Expand Down Expand Up @@ -398,7 +398,7 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
HotShotEvent::QuorumProposalValidated(proposal, _) => write!(
f,
"QuorumProposalValidated(view_number={:?})",
proposal.view_number()
proposal.data.view_number()
),
HotShotEvent::DaProposalSend(proposal, _) => write!(
f,
Expand Down
14 changes: 1 addition & 13 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use hotshot_types::{
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
storage::Storage,
BlockPayload, ValidatedState,
},
utils::{Terminator, View, ViewInner},
Expand Down Expand Up @@ -546,17 +545,6 @@ pub async fn validate_proposal_safety_and_liveness<
});
}

// Update our persistent storage of the proposal. If we cannot store the proposal, return
// an error so we don't vote.
task_state
.storage
.write()
.await
.append_proposal(&proposal)
.await
.wrap()
.context(error!("Failed to append proposal in storage!"))?;

// We accept the proposal, notify the application layer
broadcast_event(
Event {
Expand All @@ -573,7 +561,7 @@ pub async fn validate_proposal_safety_and_liveness<
// Notify other tasks
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalValidated(
proposal.data.clone(),
proposal.clone(),
parent_leaf,
)),
&event_stream,
Expand Down
54 changes: 41 additions & 13 deletions crates/task-impls/src/quorum_proposal_recv/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use std::sync::Arc;

use async_broadcast::{broadcast, Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLockUpgradableReadGuard;
use committable::Committable;
use hotshot_types::{
Expand All @@ -19,6 +20,7 @@ use hotshot_types::{
traits::{
election::Membership,
node_implementation::{NodeImplementation, NodeType},
signature_key::SignatureKey,
storage::Storage,
ValidatedState,
},
Expand Down Expand Up @@ -104,6 +106,35 @@ async fn validate_proposal_liveness<TYPES: NodeType, I: NodeImplementation<TYPES
Ok(())
}

/// Spawn a task which will fire a request to get a proposal, and store it.
#[allow(clippy::too_many_arguments)]
fn spawn_fetch_proposal<TYPES: NodeType, V: Versions>(
view: TYPES::View,
event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
membership: Arc<TYPES::Membership>,
consensus: OuterConsensus<TYPES>,
sender_public_key: TYPES::SignatureKey,
sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: UpgradeLock<TYPES, V>,
) {
async_spawn(async move {
let lock = upgrade_lock;

let _ = fetch_proposal(
view,
event_sender,
event_receiver,
membership,
consensus,
sender_public_key,
sender_private_key,
&lock,
)
.await;
});
Comment on lines +111 to +135
Copy link
Collaborator

@rob-maron rob-maron Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this get included with the [task] cancellation logic [on new views] somehow? Do we want it to be?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's probably ok to not cancel this because it is bounded anyway: it will exit as soon as it either gets the proposal or fails

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah bounded at 500ms, and doesn't do anything heavy after sending the request

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be bad to add an async_timeout() on the entire fetch_proposal call? I guess the question is whether it's okay for fetch_proposal to abort partway through execution.

I agree that it's unlikely fetch_proposal will ever get stuck outside the part where we already have an async_timeout, but it makes me a bit uncomfortable to drop the handle of tasks that need both a read lock and a write lock (at the very least, I think this might obscure a deadlock if we run into one)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a timeout in fetch_proposal. I don't think we prevent a deadlock by cancelling this task either, though I'm not sure. Personally I feel it's better to time out just the actual request for the proposal, because it seems bad to actually get the proposal and then time out waiting for the write lock to put it into our internal state.

}

/// Handles the `QuorumProposalRecv` event by first validating the cert itself for the view, and then
/// updating the states, which runs when the proposal cannot be found in the internal state map.
///
Expand Down Expand Up @@ -155,17 +186,16 @@ pub(crate) async fn handle_quorum_proposal_recv<
.await;

// Get the parent leaf and state.
let mut parent_leaf = task_state
let parent_leaf = task_state
.consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.data.leaf_commit)
.cloned();

parent_leaf = match parent_leaf {
Some(p) => Some(p),
None => fetch_proposal(
if parent_leaf.is_none() {
spawn_fetch_proposal(
justify_qc.view_number(),
event_sender.clone(),
event_receiver.clone(),
Expand All @@ -176,16 +206,14 @@ pub(crate) async fn handle_quorum_proposal_recv<
// incorrectly.
task_state.public_key.clone(),
task_state.private_key.clone(),
&task_state.upgrade_lock,
)
.await
.ok(),
};
let consensus_reader = task_state.consensus.read().await;
task_state.upgrade_lock.clone(),
);
}
let consensus_read = task_state.consensus.read().await;
pls148 marked this conversation as resolved.
Show resolved Hide resolved

let parent = match parent_leaf {
Some(leaf) => {
if let (Some(state), _) = consensus_reader.state_and_delta(leaf.view_number()) {
if let (Some(state), _) = consensus_read.state_and_delta(leaf.view_number()) {
Some((leaf, Arc::clone(&state)))
} else {
bail!("Parent state not found! Consensus internally inconsistent");
Expand All @@ -194,7 +222,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
None => None,
};

if justify_qc.view_number() > consensus_reader.high_qc().view_number {
if justify_qc.view_number() > consensus_read.high_qc().view_number {
if let Err(e) = task_state
.storage
.write()
Expand All @@ -205,7 +233,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
bail!("Failed to store High QC, not voting; error = {:?}", e);
}
}
drop(consensus_reader);
drop(consensus_read);

let mut consensus_writer = task_state.consensus.write().await;
if let Err(e) = consensus_writer.update_high_qc(justify_qc.clone()) {
Expand Down
21 changes: 15 additions & 6 deletions crates/task-impls/src/quorum_vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
match event.as_ref() {
#[allow(unused_assignments)]
HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
let proposal_payload_comm = proposal.block_header.payload_commitment();
let proposal_payload_comm = proposal.data.block_header.payload_commitment();
if let Some(ref comm) = payload_commitment {
if proposal_payload_comm != *comm {
tracing::error!("Quorum proposal has inconsistent payload commitment with DAC or VID.");
Expand All @@ -295,11 +295,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
payload_commitment = Some(proposal_payload_comm);
}
let parent_commitment = parent_leaf.commit(&self.upgrade_lock).await;
let proposed_leaf = Leaf::from_quorum_proposal(proposal);
let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data);
if proposed_leaf.parent_commitment() != parent_commitment {
tracing::warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote.");
return;
}
// Update our persistent storage of the proposal. If we cannot store the proposal, log the
// error and return early so we don't vote.
if let Err(e) = self.storage.write().await.append_proposal(proposal).await {
tracing::error!("failed to store proposal, not voting. error = {e:#}");
return;
}
leaf = Some(proposed_leaf);
}
HotShotEvent::DaCertificateValidated(cert) => {
Expand Down Expand Up @@ -424,7 +430,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
let event_view = match dependency_type {
VoteDependency::QuorumProposal => {
if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
proposal.view_number
proposal.data.view_number
} else {
return false;
}
Expand Down Expand Up @@ -549,19 +555,22 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
let cur_epoch = self.consensus.read().await.cur_epoch();
tracing::trace!("Received Proposal for view {}", *proposal.view_number());
tracing::trace!(
"Received Proposal for view {}",
*proposal.data.view_number()
);

// Handle the event before creating the dependency task.
if let Err(e) =
handle_quorum_proposal_validated(proposal, &event_sender, self).await
handle_quorum_proposal_validated(&proposal.data, &event_sender, self).await
{
tracing::debug!(
"Failed to handle QuorumProposalValidated event; error = {e:#}"
);
}

self.create_dependency_task_if_new(
proposal.view_number,
proposal.data.view_number,
cur_epoch,
event_receiver,
&event_sender,
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequest
) -> Result<()> {
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _) => {
let prop_view = proposal.view_number();
let prop_view = proposal.data.view_number();
let cur_epoch = self.consensus.read().await.cur_epoch();

// If we already have the VID shares for the next view, do nothing.
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/src/byzantine/byzantine_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
];
}
HotShotEvent::QuorumProposalValidated(proposal, _) => {
self.validated_proposals.push(proposal.clone());
self.validated_proposals.push(proposal.data.clone());
}
_ => {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn test_quorum_proposal_recv_task() {
.await,
)),
exact(QuorumProposalValidated(
proposals[1].data.clone(),
proposals[1].clone(),
leaves[0].clone(),
)),
exact(ViewChange(ViewNumber::new(2))),
Expand Down
8 changes: 4 additions & 4 deletions crates/testing/tests/tests_1/quorum_vote_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_quorum_vote_task_success() {
// Send the quorum proposal, DAC, VID share data, and validated state, in which case a dummy
// vote can be formed and the view number will be updated.
let inputs = vec![random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[1], vids[1].0[0].clone()),
]];
Expand Down Expand Up @@ -150,11 +150,11 @@ async fn test_quorum_vote_task_miss_dependency() {
// Send two of quorum proposal, DAC, VID share data, in which case there's no vote.
let inputs = vec![
random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())),
],
random![
QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()),
QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()),
DaCertificateRecv(dacs[2].clone()),
],
random![
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn test_quorum_vote_task_incorrect_dependency() {

// Send the correct quorum proposal and DAC, and incorrect VID share data.
let inputs = vec![random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[0], vids[0].0[0].clone()),
]];
Expand Down
10 changes: 5 additions & 5 deletions crates/testing/tests/tests_1/upgrade_task_with_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,27 @@ async fn test_upgrade_task_with_vote() {

let inputs = vec![
random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[1], vids[1].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()),
QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()),
DaCertificateRecv(dacs[2].clone()),
VidShareRecv(leaders[2], vids[2].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[3].data.clone(), leaves[2].clone()),
QuorumProposalValidated(proposals[3].clone(), leaves[2].clone()),
DaCertificateRecv(dacs[3].clone()),
VidShareRecv(leaders[3], vids[3].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[4].data.clone(), leaves[3].clone()),
QuorumProposalValidated(proposals[4].clone(), leaves[3].clone()),
DaCertificateRecv(dacs[4].clone()),
VidShareRecv(leaders[4], vids[4].0[0].clone()),
],
random![QuorumProposalValidated(
proposals[5].data.clone(),
proposals[5].clone(),
leaves[5].clone()
),],
];
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/tests_1/vote_dependency_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn test_vote_dependency_handle() {
// the dependency handles do not (yet) work with the existing test suite.
let all_inputs = vec![
DaCertificateValidated(dacs[1].clone()),
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
VidShareValidated(vids[1].0[0].clone()),
]
.into_iter()
Expand Down
Loading