From 4c82c1d3fd99ae605bd195b38640e88b3016a088 Mon Sep 17 00:00:00 2001 From: Jarred Parr Date: Tue, 27 Aug 2024 14:02:55 -0600 Subject: [PATCH 1/5] add support for DA broadcast --- crates/task-impls/src/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index e658cca42b..b036733634 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -315,7 +315,7 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ProposalRequested(req.clone(), signature), )), - TransmitType::Direct(membership.leader(req.view_number)), + TransmitType::DaCommitteeBroadcast, ), HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => ( sender_key.clone(), From 3511f65ce24c09febe6e244b05320bf6b80aea5b Mon Sep 17 00:00:00 2001 From: Jarred Parr Date: Tue, 27 Aug 2024 15:38:05 -0600 Subject: [PATCH 2/5] working test --- Cargo.lock | 22 ++--- crates/task-impls/src/consensus/handlers.rs | 5 - crates/task-impls/src/consensus/mod.rs | 2 +- crates/task-impls/src/helpers.rs | 97 ++++++++++--------- .../src/quorum_proposal/handlers.rs | 4 - crates/task-impls/src/request.rs | 1 + 6 files changed, 64 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e64fd4b7b..4ce747f931 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3076,7 +3076,7 @@ dependencies = [ [[package]] name = "hotshot" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-broadcast", @@ -3142,7 +3142,7 @@ dependencies = [ [[package]] name = "hotshot-example-types" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-broadcast", @@ -3174,7 +3174,7 @@ dependencies = [ [[package]] name = "hotshot-examples" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-broadcast", @@ -3224,7 +3224,7 @@ dependencies = [ [[package]] name = "hotshot-fakeapi" -version = "0.5.71" +version = "0.5.72" 
dependencies = [ "anyhow", "async-lock 2.8.0", @@ -3242,7 +3242,7 @@ dependencies = [ [[package]] name = "hotshot-macros" -version = "0.5.71" +version = "0.5.72" dependencies = [ "derive_builder", "proc-macro2", @@ -3252,7 +3252,7 @@ dependencies = [ [[package]] name = "hotshot-orchestrator" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-compatibility-layer", @@ -3281,7 +3281,7 @@ dependencies = [ [[package]] name = "hotshot-stake-table" -version = "0.5.71" +version = "0.5.72" dependencies = [ "ark-bn254", "ark-ed-on-bn254", @@ -3302,7 +3302,7 @@ dependencies = [ [[package]] name = "hotshot-task" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-broadcast", @@ -3316,7 +3316,7 @@ dependencies = [ [[package]] name = "hotshot-task-impls" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-broadcast", @@ -3352,7 +3352,7 @@ dependencies = [ [[package]] name = "hotshot-testing" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-broadcast", @@ -4513,7 +4513,7 @@ dependencies = [ [[package]] name = "libp2p-networking" -version = "0.5.71" +version = "0.5.72" dependencies = [ "anyhow", "async-compatibility-layer", diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs index 1dd8d0d47c..32c49a9841 100644 --- a/crates/task-impls/src/consensus/handlers.rs +++ b/crates/task-impls/src/consensus/handlers.rs @@ -137,11 +137,6 @@ pub async fn create_and_send_proposal( proposed_leaf.view_number(), ); - consensus - .write() - .await - .update_last_proposed_view(message.clone())?; - async_sleep(Duration::from_millis(round_start_delay)).await; broadcast_event( diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs index a2af8fd41e..6c3b6fac21 100644 --- a/crates/task-impls/src/consensus/mod.rs +++ b/crates/task-impls/src/consensus/mod.rs @@ -420,7 +420,7 @@ impl, V: Versions> ConsensusTaskSt } if let Err(e) 
= self.consensus.write().await.update_high_qc(qc.clone()) { - tracing::error!("{e:?}"); + tracing::trace!("{e:?}"); } debug!( "Attempting to publish proposal after forming a QC for view {}", diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 0eb0160790..0b455183c6 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -469,24 +469,26 @@ pub async fn validate_proposal_safety_and_liveness }, }; - if let Err(e) = consensus - .write() - .await - .update_validated_state_map(view_number, view.clone()) { - tracing::trace!("{e:?}"); - } - consensus - .write() - .await - .update_saved_leaves(proposed_leaf.clone()); + let mut consensus_write = consensus.write().await; + if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) { + tracing::trace!("{e:?}"); + } + consensus_write.update_saved_leaves(proposed_leaf.clone()); - // Broadcast that we've updated our consensus state so that other tasks know it's safe to grab. - broadcast_event( - Arc::new(HotShotEvent::ValidatedStateUpdated(view_number, view)), - &event_stream, - ) - .await; + // Update our internal storage of the proposal. The proposal is valid, so + // we swallow this error and just log if it occurs. + if let Err(e) = consensus_write.update_last_proposed_view(proposal.clone()) { + tracing::debug!("Internal proposal update failed; error = {e:#}"); + }; + + // Broadcast that we've updated our consensus state so that other tasks know it's safe to grab. + broadcast_event( + Arc::new(HotShotEvent::ValidatedStateUpdated(view_number, view)), + &event_stream, + ) + .await; + } UpgradeCertificate::validate( &proposal.data.upgrade_certificate, @@ -505,37 +507,39 @@ pub async fn validate_proposal_safety_and_liveness // passes. // Liveness check. - let read_consensus = consensus.read().await; - let liveness_check = justify_qc.view_number() > read_consensus.locked_view(); - - // Safety check. 
- // Check if proposal extends from the locked leaf. - let outcome = read_consensus.visit_leaf_ancestors( - justify_qc.view_number(), - Terminator::Inclusive(read_consensus.locked_view()), - false, - |leaf, _, _| { - // if leaf view no == locked view no then we're done, report success by - // returning true - leaf.view_number() != read_consensus.locked_view() - }, - ); - let safety_check = outcome.is_ok(); - - ensure!(safety_check || liveness_check, { - if let Err(e) = outcome { - broadcast_event( - Event { - view_number, - event: EventType::Error { error: Arc::new(e) }, - }, - &event_sender, - ) - .await; - } + { + let read_consensus = consensus.read().await; + let liveness_check = justify_qc.view_number() > read_consensus.locked_view(); + + // Safety check. + // Check if proposal extends from the locked leaf. + let outcome = read_consensus.visit_leaf_ancestors( + justify_qc.view_number(), + Terminator::Inclusive(read_consensus.locked_view()), + false, + |leaf, _, _| { + // if leaf view no == locked view no then we're done, report success by + // returning true + leaf.view_number() != read_consensus.locked_view() + }, + ); + let safety_check = outcome.is_ok(); + + ensure!(safety_check || liveness_check, { + if let Err(e) = outcome { + broadcast_event( + Event { + view_number, + event: EventType::Error { error: Arc::new(e) }, + }, + &event_sender, + ) + .await; + } - format!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view()) - }); + format!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view()) + }); + } // We accept the proposal, notify the application layer @@ -550,6 +554,7 @@ pub async fn validate_proposal_safety_and_liveness &event_sender, ) .await; + // Notify other tasks broadcast_event( 
Arc::new(HotShotEvent::QuorumProposalValidated( diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs index f1fc665343..5a73bec2d1 100644 --- a/crates/task-impls/src/quorum_proposal/handlers.rs +++ b/crates/task-impls/src/quorum_proposal/handlers.rs @@ -229,10 +229,6 @@ impl ProposalDependencyHandle { proposed_leaf.view_number(), ); - self.consensus - .write() - .await - .update_last_proposed_view(message.clone())?; async_sleep(Duration::from_millis(self.round_start_delay)).await; broadcast_event( Arc::new(HotShotEvent::QuorumProposalSend( diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index c917c47256..faaa217ba9 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -88,6 +88,7 @@ type Signature = impl> TaskState for NetworkRequestState { type Event = HotShotEvent; + #[instrument(skip_all, target = "NetworkRequestState", fields(id = self.id))] async fn handle_event( &mut self, event: Arc, From 4e5f921b8b9f866beae3b4279c50f7665e492d2e Mon Sep 17 00:00:00 2001 From: Jarred Parr Date: Wed, 28 Aug 2024 09:21:04 -0600 Subject: [PATCH 3/5] tmp --- crates/task-impls/src/network.rs | 1 + crates/testing/tests/tests_2/catchup.rs | 65 +++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index b036733634..26fff22bcf 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -464,6 +464,7 @@ impl< { return; } + if let MessageKind::Consensus(SequencingMessage::General( GeneralConsensusMessage::Proposal(prop), )) = &message.kind diff --git a/crates/testing/tests/tests_2/catchup.rs b/crates/testing/tests/tests_2/catchup.rs index 9b1fcb4376..c119c4a410 100644 --- a/crates/testing/tests/tests_2/catchup.rs +++ b/crates/testing/tests/tests_2/catchup.rs @@ -421,3 +421,68 @@ async fn test_all_restart_cdn() { .run_test::() .await; } + 
+#[cfg(test)] +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_all_restart_one_da() { + use std::time::Duration; + + use hotshot_example_types::node_types::{CombinedImpl, TestTypes, TestVersions}; + use hotshot_testing::{ + block_builder::SimpleBuilderImplementation, + completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, + overall_safety_task::OverallSafetyPropertiesDescription, + spinning_task::{ChangeNode, SpinningTaskDescription, UpDown}, + test_builder::{TestDescription, TimingData}, + }; + + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + let timing_data = TimingData { + next_view_timeout: 2000, + ..Default::default() + }; + let mut metadata: TestDescription = + TestDescription::default(); + let mut catchup_nodes = vec![]; + for i in 1..20 { + catchup_nodes.push(ChangeNode { + idx: i, + updown: UpDown::Restart, + }) + } + + metadata.timing_data = timing_data; + metadata.start_nodes = 20; + metadata.num_nodes_with_stake = 20; + + // Explitily make the DA tiny to exaggerate a missing proposal. + // metadata.da_staked_committee_size = 9; + + metadata.spinning_properties = SpinningTaskDescription { + // Restart all the nodes in view 13 + node_changes: vec![(13, catchup_nodes)], + }; + metadata.view_sync_properties = + hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20); + + metadata.completion_task_description = + CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(60), + }, + ); + metadata.overall_safety_properties = OverallSafetyPropertiesDescription { + // Make sure we keep committing rounds after the catchup, but not the full 50. 
+ num_successful_views: 22, + num_failed_views: 15, + ..Default::default() + }; + + metadata + .gen_launcher(0) + .launch() + .run_test::() + .await; +} From 0d29eeee8fbf664bd45846f21a0494722dc49d70 Mon Sep 17 00:00:00 2001 From: Jarred Parr Date: Wed, 28 Aug 2024 11:09:32 -0600 Subject: [PATCH 4/5] add support for fused message type, verify failure condition and fix it --- crates/task-impls/src/consensus/handlers.rs | 3 ++- crates/task-impls/src/helpers.rs | 17 ++++++++++++-- crates/task-impls/src/network.rs | 22 ++++++++++++++++--- .../src/quorum_proposal_recv/handlers.rs | 3 ++- crates/testing/tests/tests_2/catchup.rs | 15 ++++++++++--- crates/types/src/traits/network.rs | 2 ++ 6 files changed, 52 insertions(+), 10 deletions(-) diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs index 32c49a9841..0668b87360 100644 --- a/crates/task-impls/src/consensus/handlers.rs +++ b/crates/task-impls/src/consensus/handlers.rs @@ -518,7 +518,7 @@ pub(crate) async fn handle_quorum_proposal_recv< .entry(proposal.data.view_number()) .or_default() .push(async_spawn( - validate_proposal_safety_and_liveness( + validate_proposal_safety_and_liveness::( proposal.clone(), parent_leaf, OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), @@ -529,6 +529,7 @@ pub(crate) async fn handle_quorum_proposal_recv< task_state.output_event_stream.clone(), task_state.id, task_state.upgrade_lock.clone(), + Arc::clone(&task_state.storage), ) .map(AnyhowTracing::err_as_debug), )); diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 0b455183c6..9fb4da777c 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -29,8 +29,9 @@ use hotshot_types::{ traits::{ block_contents::BlockHeader, election::Membership, - node_implementation::{ConsensusTime, NodeType, Versions}, + node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions}, 
signature_key::SignatureKey, + storage::Storage, BlockPayload, ValidatedState, }, utils::{Terminator, View, ViewInner}, @@ -438,7 +439,11 @@ pub(crate) async fn parent_leaf_and_state( #[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_lines)] #[instrument(skip_all, fields(id = id, view = *proposal.data.view_number()))] -pub async fn validate_proposal_safety_and_liveness( +pub async fn validate_proposal_safety_and_liveness< + TYPES: NodeType, + I: NodeImplementation, + V: Versions, +>( proposal: Proposal>, parent_leaf: Leaf, consensus: OuterConsensus, @@ -449,6 +454,7 @@ pub async fn validate_proposal_safety_and_liveness event_sender: Sender>, id: u64, upgrade_lock: UpgradeLock, + storage: Arc>, ) -> Result<()> { let view_number = proposal.data.view_number(); @@ -482,6 +488,13 @@ pub async fn validate_proposal_safety_and_liveness tracing::debug!("Internal proposal update failed; error = {e:#}"); }; + // Update our persistent storage of the proposal. We also intentionally swallow + // this error as it should not affect consensus and would, instead, imply an + // issue on the sequencer side. + if let Err(e) = storage.write().await.append_proposal(&proposal).await { + tracing::debug!("Persisting the proposal update failed; error = {e:#}"); + }; + // Broadcast that we've updated our consensus state so that other tasks know it's safe to grab. broadcast_event( Arc::new(HotShotEvent::ValidatedStateUpdated(view_number, view)), diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 26fff22bcf..5eff8ee713 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -39,8 +39,6 @@ pub fn quorum_filter(event: &Arc>) -> bool !matches!( event.as_ref(), HotShotEvent::QuorumProposalSend(_, _) - | HotShotEvent::QuorumProposalRequestSend(..) - | HotShotEvent::QuorumProposalResponseSend(..) 
| HotShotEvent::QuorumVoteSend(_) | HotShotEvent::DacSend(_, _) | HotShotEvent::TimeoutVoteSend(_) @@ -63,6 +61,8 @@ pub fn da_filter(event: &Arc>) -> bool { !matches!( event.as_ref(), HotShotEvent::DaProposalSend(_, _) + | HotShotEvent::QuorumProposalRequestSend(..) + | HotShotEvent::QuorumProposalResponseSend(..) | HotShotEvent::DaVoteSend(_) | HotShotEvent::ViewChange(_) ) @@ -315,7 +315,7 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ProposalRequested(req.clone(), signature), )), - TransmitType::DaCommitteeBroadcast, + TransmitType::DaCommitteeBroadcastAndLeader(membership.leader(req.view_number)), ), HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => ( sender_key.clone(), @@ -494,6 +494,22 @@ impl< net.da_broadcast_message(serialized_message, committee, broadcast_delay) .await } + TransmitType::DaCommitteeBroadcastAndLeader(recipient) => { + // Short-circuit exit from this call if we get an error during the direct leader broadcast. + // NOTE: An improvement to this is to check if the leader is in the DA committee but it's + // just a single extra message to the leader, so it's not an optimization that we need now. + if let Err(e) = net + .direct_message(serialized_message.clone(), recipient) + .await + { + error!("Failed to send message from network task: {e:?}"); + return; + } + + // Otherwise, send the next message. 
+ net.da_broadcast_message(serialized_message, committee, broadcast_delay) + .await + } }; match transmit_result { diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs index c5ae9cdca9..1658910986 100644 --- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs +++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs @@ -246,7 +246,7 @@ pub(crate) async fn handle_quorum_proposal_recv< }; // Validate the proposal - validate_proposal_safety_and_liveness( + validate_proposal_safety_and_liveness::( proposal.clone(), parent_leaf, OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)), @@ -257,6 +257,7 @@ pub(crate) async fn handle_quorum_proposal_recv< task_state.output_event_stream.clone(), task_state.id, task_state.upgrade_lock.clone(), + Arc::clone(&task_state.storage), ) .await?; diff --git a/crates/testing/tests/tests_2/catchup.rs b/crates/testing/tests/tests_2/catchup.rs index c119c4a410..e0b79f7302 100644 --- a/crates/testing/tests/tests_2/catchup.rs +++ b/crates/testing/tests/tests_2/catchup.rs @@ -422,6 +422,10 @@ async fn test_all_restart_cdn() { .await; } +/// This test case ensures that proposals persist off of a restart. We demonstrate this by +/// artificially removing node 0 (the only DA committee member) from the candidate pool, +/// meaning that the entire DA also does not have the proposal, but we're still able to +/// move on because the *leader* does have the proposal. 
#[cfg(test)] #[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] #[cfg_attr(async_executor_impl = "async-std", async_std::test)] @@ -445,6 +449,11 @@ async fn test_all_restart_one_da() { }; let mut metadata: TestDescription = TestDescription::default(); + + let node_0_down = vec![ChangeNode { + idx: 0, + updown: UpDown::Restart, + }]; let mut catchup_nodes = vec![]; for i in 1..20 { catchup_nodes.push(ChangeNode { @@ -457,12 +466,12 @@ async fn test_all_restart_one_da() { metadata.start_nodes = 20; metadata.num_nodes_with_stake = 20; - // Explitily make the DA tiny to exaggerate a missing proposal. - // metadata.da_staked_committee_size = 9; + // Explicitly make the DA tiny to exaggerate a missing proposal. + metadata.da_staked_committee_size = 1; metadata.spinning_properties = SpinningTaskDescription { // Restart all the nodes in view 13 - node_changes: vec![(13, catchup_nodes)], + node_changes: vec![(12, node_0_down), (13, catchup_nodes)], }; metadata.view_sync_properties = hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20); diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index 33fc55df30..d166834563 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -84,6 +84,8 @@ pub enum TransmitType { Broadcast, /// broadcast to DA committee DaCommitteeBroadcast, + /// broadcast to the leader and the DA + DaCommitteeBroadcastAndLeader(TYPES::SignatureKey), } /// Error type for networking From 62161b0431463a57fdbecdc180b163dac5dc160a Mon Sep 17 00:00:00 2001 From: Jarred Parr Date: Thu, 29 Aug 2024 08:56:11 -0600 Subject: [PATCH 5/5] rename --- crates/task-impls/src/network.rs | 4 ++-- crates/types/src/traits/network.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 5eff8ee713..319c23b12a 100644 --- a/crates/task-impls/src/network.rs +++ 
b/crates/task-impls/src/network.rs @@ -315,7 +315,7 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ProposalRequested(req.clone(), signature), )), - TransmitType::DaCommitteeBroadcastAndLeader(membership.leader(req.view_number)), + TransmitType::DaCommitteeAndLeaderBroadcast(membership.leader(req.view_number)), ), HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => ( sender_key.clone(), @@ -494,7 +494,7 @@ impl< net.da_broadcast_message(serialized_message, committee, broadcast_delay) .await } - TransmitType::DaCommitteeBroadcastAndLeader(recipient) => { + TransmitType::DaCommitteeAndLeaderBroadcast(recipient) => { // Short-circuit exit from this call if we get an error during the direct leader broadcast. // NOTE: An improvement to this is to check if the leader is in the DA committee but it's // just a single extra message to the leader, so it's not an optimization that we need now. diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index d166834563..d8d89e7c20 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -85,7 +85,7 @@ pub enum TransmitType { /// broadcast to DA committee DaCommitteeBroadcast, /// broadcast to the leader and the DA - DaCommitteeBroadcastAndLeader(TYPES::SignatureKey), + DaCommitteeAndLeaderBroadcast(TYPES::SignatureKey), } /// Error type for networking