Skip to content

Commit

Permalink
Allow Proposal requesting from any DA node (#3619)
Browse files Browse the repository at this point in the history
* add support for DA broadcast

* working test

* tmp

* add support for fused message type, verify failure condition and fix it

* rename
  • Loading branch information
jparr721 authored Aug 29, 2024
1 parent 165cbad commit 85e84e0
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 74 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions crates/task-impls/src/consensus/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ pub async fn create_and_send_proposal<TYPES: NodeType, V: Versions>(
proposed_leaf.view_number(),
);

consensus
.write()
.await
.update_last_proposed_view(message.clone())?;

async_sleep(Duration::from_millis(round_start_delay)).await;

broadcast_event(
Expand Down Expand Up @@ -523,7 +518,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
.entry(proposal.data.view_number())
.or_default()
.push(async_spawn(
validate_proposal_safety_and_liveness(
validate_proposal_safety_and_liveness::<TYPES, I, V>(
proposal.clone(),
parent_leaf,
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
Expand All @@ -534,6 +529,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
task_state.output_event_stream.clone(),
task_state.id,
task_state.upgrade_lock.clone(),
Arc::clone(&task_state.storage),
)
.map(AnyhowTracing::err_as_debug),
));
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
}

if let Err(e) = self.consensus.write().await.update_high_qc(qc.clone()) {
tracing::error!("{e:?}");
tracing::trace!("{e:?}");
}
debug!(
"Attempting to publish proposal after forming a QC for view {}",
Expand Down
114 changes: 66 additions & 48 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ use hotshot_types::{
traits::{
block_contents::BlockHeader,
election::Membership,
node_implementation::{ConsensusTime, NodeType, Versions},
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
storage::Storage,
BlockPayload, ValidatedState,
},
utils::{Terminator, View, ViewInner},
Expand Down Expand Up @@ -438,7 +439,11 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = id, view = *proposal.data.view_number()))]
pub async fn validate_proposal_safety_and_liveness<TYPES: NodeType, V: Versions>(
pub async fn validate_proposal_safety_and_liveness<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
proposal: Proposal<TYPES, QuorumProposal<TYPES>>,
parent_leaf: Leaf<TYPES>,
consensus: OuterConsensus<TYPES>,
Expand All @@ -449,6 +454,7 @@ pub async fn validate_proposal_safety_and_liveness<TYPES: NodeType, V: Versions>
event_sender: Sender<Event<TYPES>>,
id: u64,
upgrade_lock: UpgradeLock<TYPES, V>,
storage: Arc<RwLock<I::Storage>>,
) -> Result<()> {
let view_number = proposal.data.view_number();

Expand All @@ -469,24 +475,33 @@ pub async fn validate_proposal_safety_and_liveness<TYPES: NodeType, V: Versions>
},
};

if let Err(e) = consensus
.write()
.await
.update_validated_state_map(view_number, view.clone())
{
tracing::trace!("{e:?}");
}
consensus
.write()
.await
.update_saved_leaves(proposed_leaf.clone());
let mut consensus_write = consensus.write().await;
if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) {
tracing::trace!("{e:?}");
}
consensus_write.update_saved_leaves(proposed_leaf.clone());

// Broadcast that we've updated our consensus state so that other tasks know it's safe to grab.
broadcast_event(
Arc::new(HotShotEvent::ValidatedStateUpdated(view_number, view)),
&event_stream,
)
.await;
// Update our internal storage of the proposal. The proposal is valid, so
// we swallow this error and just log if it occurs.
if let Err(e) = consensus_write.update_last_proposed_view(proposal.clone()) {
tracing::debug!("Internal proposal update failed; error = {e:#}");
};

// Update our persistent storage of the proposal. We also intentionally swallow
// this error as it should not affect consensus and would, instead, imply an
// issue on the sequencer side.
if let Err(e) = storage.write().await.append_proposal(&proposal).await {
tracing::debug!("Persisting the proposal update failed; error = {e:#}");
};

// Broadcast that we've updated our consensus state so that other tasks know it's safe to grab.
broadcast_event(
Arc::new(HotShotEvent::ValidatedStateUpdated(view_number, view)),
&event_stream,
)
.await;
}

UpgradeCertificate::validate(
&proposal.data.upgrade_certificate,
Expand All @@ -505,37 +520,39 @@ pub async fn validate_proposal_safety_and_liveness<TYPES: NodeType, V: Versions>
// passes.

// Liveness check.
let read_consensus = consensus.read().await;
let liveness_check = justify_qc.view_number() > read_consensus.locked_view();

// Safety check.
// Check if proposal extends from the locked leaf.
let outcome = read_consensus.visit_leaf_ancestors(
justify_qc.view_number(),
Terminator::Inclusive(read_consensus.locked_view()),
false,
|leaf, _, _| {
// if leaf view no == locked view no then we're done, report success by
// returning true
leaf.view_number() != read_consensus.locked_view()
},
);
let safety_check = outcome.is_ok();

ensure!(safety_check || liveness_check, {
if let Err(e) = outcome {
broadcast_event(
Event {
view_number,
event: EventType::Error { error: Arc::new(e) },
},
&event_sender,
)
.await;
}
{
let read_consensus = consensus.read().await;
let liveness_check = justify_qc.view_number() > read_consensus.locked_view();

// Safety check.
// Check if proposal extends from the locked leaf.
let outcome = read_consensus.visit_leaf_ancestors(
justify_qc.view_number(),
Terminator::Inclusive(read_consensus.locked_view()),
false,
|leaf, _, _| {
// if leaf view no == locked view no then we're done, report success by
// returning true
leaf.view_number() != read_consensus.locked_view()
},
);
let safety_check = outcome.is_ok();

ensure!(safety_check || liveness_check, {
if let Err(e) = outcome {
broadcast_event(
Event {
view_number,
event: EventType::Error { error: Arc::new(e) },
},
&event_sender,
)
.await;
}

format!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view())
});
format!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view())
});
}

// We accept the proposal, notify the application layer

Expand All @@ -550,6 +567,7 @@ pub async fn validate_proposal_safety_and_liveness<TYPES: NodeType, V: Versions>
&event_sender,
)
.await;

// Notify other tasks
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalValidated(
Expand Down
23 changes: 20 additions & 3 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ pub fn quorum_filter<TYPES: NodeType>(event: &Arc<HotShotEvent<TYPES>>) -> bool
!matches!(
event.as_ref(),
HotShotEvent::QuorumProposalSend(_, _)
| HotShotEvent::QuorumProposalRequestSend(..)
| HotShotEvent::QuorumProposalResponseSend(..)
| HotShotEvent::QuorumVoteSend(_)
| HotShotEvent::DacSend(_, _)
| HotShotEvent::TimeoutVoteSend(_)
Expand All @@ -63,6 +61,8 @@ pub fn da_filter<TYPES: NodeType>(event: &Arc<HotShotEvent<TYPES>>) -> bool {
!matches!(
event.as_ref(),
HotShotEvent::DaProposalSend(_, _)
| HotShotEvent::QuorumProposalRequestSend(..)
| HotShotEvent::QuorumProposalResponseSend(..)
| HotShotEvent::DaVoteSend(_)
| HotShotEvent::ViewChange(_)
)
Expand Down Expand Up @@ -315,7 +315,7 @@ impl<
MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::ProposalRequested(req.clone(), signature),
)),
TransmitType::Direct(membership.leader(req.view_number)),
TransmitType::DaCommitteeAndLeaderBroadcast(membership.leader(req.view_number)),
),
HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => (
sender_key.clone(),
Expand Down Expand Up @@ -464,6 +464,7 @@ impl<
{
return;
}

if let MessageKind::Consensus(SequencingMessage::General(
GeneralConsensusMessage::Proposal(prop),
)) = &message.kind
Expand Down Expand Up @@ -493,6 +494,22 @@ impl<
net.da_broadcast_message(serialized_message, committee, broadcast_delay)
.await
}
TransmitType::DaCommitteeAndLeaderBroadcast(recipient) => {
// Short-circuit exit from this call if we get an error during the direct leader broadcast.
// NOTE: An improvement to this is to check if the leader is in the DA committee but it's
// just a single extra message to the leader, so it's not an optimization that we need now.
if let Err(e) = net
.direct_message(serialized_message.clone(), recipient)
.await
{
error!("Failed to send message from network task: {e:?}");
return;
}

// Otherwise, send the next message.
net.da_broadcast_message(serialized_message, committee, broadcast_delay)
.await
}
};

match transmit_result {
Expand Down
4 changes: 0 additions & 4 deletions crates/task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,6 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
proposed_leaf.view_number(),
);

self.consensus
.write()
.await
.update_last_proposed_view(message.clone())?;
async_sleep(Duration::from_millis(self.round_start_delay)).await;
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalSend(
Expand Down
3 changes: 2 additions & 1 deletion crates/task-impls/src/quorum_proposal_recv/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
};

// Validate the proposal
validate_proposal_safety_and_liveness(
validate_proposal_safety_and_liveness::<TYPES, I, V>(
proposal.clone(),
parent_leaf,
OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
Expand All @@ -257,6 +257,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
task_state.output_event_stream.clone(),
task_state.id,
task_state.upgrade_lock.clone(),
Arc::clone(&task_state.storage),
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type Signature<TYPES> =
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequestState<TYPES, I> {
type Event = HotShotEvent<TYPES>;

#[instrument(skip_all, target = "NetworkRequestState", fields(id = self.id))]
async fn handle_event(
&mut self,
event: Arc<Self::Event>,
Expand Down
Loading

0 comments on commit 85e84e0

Please sign in to comment.