diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 0e7be118f8..e9981b8b5f 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -24,7 +24,7 @@ use hotshot_task_impls::rewind::RewindTaskState; use hotshot_task_impls::{ da::DaTaskState, events::HotShotEvent, - network::{self, NetworkEventTaskState, NetworkMessageTaskState}, + network::{NetworkEventTaskState, NetworkMessageTaskState}, request::NetworkRequestState, response::{run_response_task, NetworkResponseState}, transactions::TransactionTaskState, @@ -163,15 +163,15 @@ pub fn add_network_event_task< NET: ConnectedNetwork, >( handle: &mut SystemContextHandle, - channel: Arc, - membership: TYPES::Membership, - filter: fn(&Arc>) -> bool, + network: Arc, + quorum_membership: TYPES::Membership, + da_membership: TYPES::Membership, ) { let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState { - channel, + network, view: TYPES::Time::genesis(), - membership, - filter, + quorum_membership, + da_membership, storage: Arc::clone(&handle.storage()), consensus: Arc::clone(&handle.consensus()), upgrade_lock: handle.hotshot.upgrade_lock.clone(), @@ -502,31 +502,7 @@ where handle, Arc::clone(&network), quorum_membership.clone(), - network::quorum_filter, - ); - self.add_network_event_task( - handle, - Arc::clone(&network), - quorum_membership.clone(), - network::upgrade_filter, - ); - self.add_network_event_task( - handle, - Arc::clone(&network), da_membership, - network::da_filter, - ); - self.add_network_event_task( - handle, - Arc::clone(&network), - quorum_membership.clone(), - network::view_sync_filter, - ); - self.add_network_event_task( - handle, - Arc::clone(&network), - quorum_membership, - network::vid_filter, ); } @@ -535,10 +511,10 @@ where &self, handle: &mut SystemContextHandle, channel: Arc<>::Network>, - membership: TYPES::Membership, - filter: fn(&Arc>) -> bool, + quorum_membership: TYPES::Membership, + da_membership: TYPES::Membership, ) { - add_network_event_task(handle, channel, membership, filter); + add_network_event_task(handle, channel, quorum_membership, da_membership); } } @@ -561,7 +537,6 @@ pub async fn add_network_message_and_request_receiver_tasks< ) { let network = Arc::clone(&handle.network); - add_network_message_task(handle, &network); add_network_message_task(handle, &network); add_request_network_task(handle).await; @@ -572,38 +547,13 @@ pub async fn add_network_message_and_request_receiver_tasks< pub fn add_network_event_tasks, V: Versions>( handle: &mut SystemContextHandle, ) { - let network = Arc::clone(&handle.network); let quorum_membership = handle.memberships.quorum_membership.clone(); let da_membership = handle.memberships.da_membership.clone(); add_network_event_task( handle, - Arc::clone(&network), - quorum_membership.clone(), - network::quorum_filter, - ); - add_network_event_task( - handle, - Arc::clone(&network), - quorum_membership.clone(), - network::upgrade_filter, - ); - add_network_event_task( - handle, - Arc::clone(&network), - da_membership, - network::da_filter, - ); - add_network_event_task( - handle, - Arc::clone(&network), - quorum_membership.clone(), - network::view_sync_filter, - ); - add_network_event_task( - handle, - Arc::clone(&network), + Arc::clone(&handle.network), quorum_membership, - network::vid_filter, + da_membership, ); } diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 718d752f27..dcd7e85945 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -38,63 +38,6 @@ use crate::{ helpers::broadcast_event, }; -/// quorum filter -pub fn quorum_filter(event: &Arc>) -> bool { - !matches!( - event.as_ref(), - HotShotEvent::QuorumProposalSend(_, _) - | HotShotEvent::QuorumVoteSend(_) - | HotShotEvent::DacSend(_, _) - | HotShotEvent::TimeoutVoteSend(_) - | HotShotEvent::ViewChange(_) - ) -} - -/// upgrade filter -pub fn upgrade_filter(event: &Arc>) -> bool { - !matches!( - event.as_ref(), - HotShotEvent::UpgradeProposalSend(_, _) - | HotShotEvent::UpgradeVoteSend(_) - | HotShotEvent::ViewChange(_) - ) -} - -/// DA filter -pub fn da_filter(event: &Arc>) -> bool { - !matches!( - event.as_ref(), - HotShotEvent::DaProposalSend(_, _) - | HotShotEvent::QuorumProposalRequestSend(..) - | HotShotEvent::QuorumProposalResponseSend(..) - | HotShotEvent::VidResponseSend(..) - | HotShotEvent::VidRequestSend(..) - | HotShotEvent::DaVoteSend(_) - | HotShotEvent::ViewChange(_) - ) -} - -/// vid filter -pub fn vid_filter(event: &Arc>) -> bool { - !matches!( - event.as_ref(), - HotShotEvent::VidDisperseSend(_, _) | HotShotEvent::ViewChange(_) - ) -} - -/// view sync filter -pub fn view_sync_filter(event: &Arc>) -> bool { - !matches!( - event.as_ref(), - HotShotEvent::ViewSyncPreCommitCertificate2Send(_, _) - | HotShotEvent::ViewSyncCommitCertificate2Send(_, _) - | HotShotEvent::ViewSyncFinalizeCertificate2Send(_, _) - | HotShotEvent::ViewSyncPreCommitVoteSend(_) - | HotShotEvent::ViewSyncCommitVoteSend(_) - | HotShotEvent::ViewSyncFinalizeVoteSend(_) - | HotShotEvent::ViewChange(_) - ) -} /// the network message task state #[derive(Clone)] pub struct NetworkMessageTaskState { @@ -235,18 +178,17 @@ impl NetworkMessageTaskState { pub struct NetworkEventTaskState< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage, > { - /// comm channel - pub channel: Arc, + /// comm network + pub network: Arc, /// view number pub view: TYPES::Time, - /// membership for the channel - pub membership: TYPES::Membership, - // TODO ED Need to add exchange so we can get the recipient key and our own key? - /// Filter which returns false for the events that this specific network task cares about - pub filter: fn(&Arc>) -> bool, + /// quorum for the network + pub quorum_membership: TYPES::Membership, + /// da for the network + pub da_membership: TYPES::Membership, /// Storage to store actionable events pub storage: Arc>, /// Shared consensus state @@ -259,9 +201,9 @@ pub struct NetworkEventTaskState< impl< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage + 'static, - > TaskState for NetworkEventTaskState + > TaskState for NetworkEventTaskState { type Event = HotShotEvent; @@ -271,11 +213,7 @@ impl< _sender: &Sender>, _receiver: &Receiver>, ) -> Result<()> { - let membership = self.membership.clone(); - - if !(self.filter)(&event) { - self.handle(event, &membership).await; - } + self.handle(event).await; Ok(()) } @@ -286,24 +224,20 @@ impl< impl< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage + 'static, - > NetworkEventTaskState + > NetworkEventTaskState { /// Handle the given event. /// /// Returns the completion status. #[instrument(skip_all, fields(view = *self.view), name = "Network Task", level = "error")] - pub async fn handle( - &mut self, - event: Arc>, - membership: &TYPES::Membership, - ) { + pub async fn handle(&mut self, event: Arc>) { let mut maybe_action = None; if let Some((sender, message_kind, transmit)) = - self.parse_event(event, &mut maybe_action, membership).await + self.parse_event(event, &mut maybe_action).await { - self.spawn_transmit_task(message_kind, membership, maybe_action, transmit, sender); + self.spawn_transmit_task(message_kind, maybe_action, transmit, sender); }; } @@ -336,11 +270,11 @@ impl< messages.insert(recipient, serialized_message); } - let net = Arc::clone(&self.channel); + let net = Arc::clone(&self.network); let storage = Arc::clone(&self.storage); let state = Arc::clone(&self.consensus); async_spawn(async move { - if NetworkEventTaskState::::maybe_record_action( + if NetworkEventTaskState::::maybe_record_action( Some(HotShotAction::VidDisperse), storage, state, @@ -393,7 +327,6 @@ impl< &mut self, event: Arc>, maybe_action: &mut Option, - membership: &TYPES::Membership, ) -> Option<( ::SignatureKey, MessageKind, @@ -419,7 +352,7 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::Vote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number() + 1)), + TransmitType::Direct(self.quorum_membership.leader(vote.view_number() + 1)), )) } HotShotEvent::QuorumProposalRequestSend(req, signature) => Some(( @@ -427,7 +360,9 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ProposalRequested(req.clone(), signature), )), - TransmitType::DaCommitteeAndLeaderBroadcast(membership.leader(req.view_number)), + TransmitType::DaCommitteeAndLeaderBroadcast( + self.quorum_membership.leader(req.view_number), + ), )), HotShotEvent::QuorumProposalResponseSend(sender_key, proposal) => Some(( sender_key.clone(), @@ -457,10 +392,9 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::Da( DaConsensusMessage::DaVote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number())), + TransmitType::Direct(self.quorum_membership.leader(vote.view_number())), )) } - // ED NOTE: This needs to be broadcasted to all nodes, not just ones on the DA committee HotShotEvent::DacSend(certificate, sender) => { *maybe_action = Some(HotShotAction::DaCert); Some(( @@ -476,21 +410,30 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number() + vote.date().relay)), + TransmitType::Direct( + self.quorum_membership + .leader(vote.view_number() + vote.date().relay), + ), )), HotShotEvent::ViewSyncCommitVoteSend(vote) => Some(( vote.signing_key(), MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ViewSyncCommitVote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number() + vote.date().relay)), + TransmitType::Direct( + self.quorum_membership + .leader(vote.view_number() + vote.date().relay), + ), )), HotShotEvent::ViewSyncFinalizeVoteSend(vote) => Some(( vote.signing_key(), MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number() + vote.date().relay)), + TransmitType::Direct( + self.quorum_membership + .leader(vote.view_number() + vote.date().relay), + ), )), HotShotEvent::ViewSyncPreCommitCertificate2Send(certificate, sender) => Some(( sender, @@ -520,7 +463,7 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::TimeoutVote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number() + 1)), + TransmitType::Direct(self.quorum_membership.leader(vote.view_number() + 1)), )) } HotShotEvent::UpgradeProposalSend(proposal, sender) => Some(( @@ -537,13 +480,13 @@ impl< MessageKind::::from_consensus_message(SequencingMessage::General( GeneralConsensusMessage::UpgradeVote(vote.clone()), )), - TransmitType::Direct(membership.leader(vote.view_number())), + TransmitType::Direct(self.quorum_membership.leader(vote.view_number())), )) } HotShotEvent::ViewChange(view) => { self.view = view; - self.channel - .update_view::(self.view.u64(), membership) + self.network + .update_view::(self.view.u64(), &self.quorum_membership) .await; None } @@ -570,7 +513,6 @@ impl< fn spawn_transmit_task( &self, message_kind: MessageKind, - membership: &TYPES::Membership, maybe_action: Option, transmit: TransmitType, sender: TYPES::SignatureKey, @@ -587,14 +529,14 @@ impl< kind: message_kind, }; let view = message.kind.view_number(); - let committee = membership.committee_members(view); - let committee_topic = membership.committee_topic(); - let net = Arc::clone(&self.channel); + let committee_topic = self.quorum_membership.committee_topic(); + let da_committee = self.da_membership.committee_members(view); + let net = Arc::clone(&self.network); let storage = Arc::clone(&self.storage); let state = Arc::clone(&self.consensus); let upgrade_lock = self.upgrade_lock.clone(); async_spawn(async move { - if NetworkEventTaskState::::maybe_record_action( + if NetworkEventTaskState::::maybe_record_action( maybe_action, Arc::clone(&storage), state, @@ -631,23 +573,19 @@ impl< .await } TransmitType::DaCommitteeBroadcast => { - net.da_broadcast_message(serialized_message, committee, broadcast_delay) + net.da_broadcast_message(serialized_message, da_committee, broadcast_delay) .await } TransmitType::DaCommitteeAndLeaderBroadcast(recipient) => { - // Short-circuit exit from this call if we get an error during the direct leader broadcast. - // NOTE: An improvement to this is to check if the leader is in the DA committee but it's - // just a single extra message to the leader, so it's not an optimization that we need now. if let Err(e) = net .direct_message(serialized_message.clone(), recipient) .await { error!("Failed to send message from network task: {e:?}"); - return; } // Otherwise, send the next message. - net.da_broadcast_message(serialized_message, committee, broadcast_delay) + net.da_broadcast_message(serialized_message, da_committee, broadcast_delay) .await } }; @@ -685,11 +623,11 @@ pub mod test { pub struct NetworkEventTaskStateModifier< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage, > { /// The real `NetworkEventTaskState` - pub network_event_task_state: NetworkEventTaskState, + pub network_event_task_state: NetworkEventTaskState, /// A function that takes the result of `NetworkEventTaskState::parse_event` and /// changes it before transmitting on the network. pub modifier: Arc>, @@ -698,23 +636,24 @@ pub mod test { impl< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage + 'static, - > NetworkEventTaskStateModifier + > NetworkEventTaskStateModifier { /// Handles the received event modifying it before sending on the network. - pub async fn handle( - &mut self, - event: Arc>, - membership: &TYPES::Membership, - ) { + pub async fn handle(&mut self, event: Arc>) { let mut maybe_action = None; if let Some((mut sender, mut message_kind, mut transmit)) = - self.parse_event(event, &mut maybe_action, membership).await + self.parse_event(event, &mut maybe_action).await { // Modify the values acquired by parsing the event. - (self.modifier)(&mut sender, &mut message_kind, &mut transmit, membership); - self.spawn_transmit_task(message_kind, membership, maybe_action, transmit, sender); + (self.modifier)( + &mut sender, + &mut message_kind, + &mut transmit, + &self.quorum_membership, + ); + self.spawn_transmit_task(message_kind, maybe_action, transmit, sender); } } } @@ -723,9 +662,9 @@ pub mod test { impl< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage + 'static, - > TaskState for NetworkEventTaskStateModifier + > TaskState for NetworkEventTaskStateModifier { type Event = HotShotEvent; @@ -735,11 +674,7 @@ pub mod test { _sender: &Sender>, _receiver: &Receiver>, ) -> Result<()> { - let membership = self.network_event_task_state.membership.clone(); - - if !(self.network_event_task_state.filter)(&event) { - self.handle(event, &membership).await; - } + self.handle(event).await; Ok(()) } @@ -750,11 +685,11 @@ pub mod test { impl< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage, - > Deref for NetworkEventTaskStateModifier + > Deref for NetworkEventTaskStateModifier { - type Target = NetworkEventTaskState; + type Target = NetworkEventTaskState; fn deref(&self) -> &Self::Target { &self.network_event_task_state @@ -764,9 +699,9 @@ pub mod test { impl< TYPES: NodeType, V: Versions, - COMMCHANNEL: ConnectedNetwork, + NET: ConnectedNetwork, S: Storage, - > DerefMut for NetworkEventTaskStateModifier + > DerefMut for NetworkEventTaskStateModifier { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.network_event_task_state diff --git a/crates/testing/src/byzantine/byzantine_behaviour.rs b/crates/testing/src/byzantine/byzantine_behaviour.rs index 58eb6ed58d..beab8b5ae6 100644 --- a/crates/testing/src/byzantine/byzantine_behaviour.rs +++ b/crates/testing/src/byzantine/byzantine_behaviour.rs @@ -340,15 +340,15 @@ impl + std::fmt::Debug, V: Version fn add_network_event_task( &self, handle: &mut SystemContextHandle, - channel: Arc<>::Network>, - membership: TYPES::Membership, - filter: fn(&Arc>) -> bool, + network: Arc<>::Network>, + quorum_membership: TYPES::Membership, + da_membership: TYPES::Membership, ) { let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState { - channel, + network, view: TYPES::Time::genesis(), - membership, - filter, + quorum_membership, + da_membership, storage: Arc::clone(&handle.storage()), consensus: Arc::clone(&handle.consensus()), upgrade_lock: handle.hotshot.upgrade_lock.clone(), diff --git a/crates/testing/tests/tests_1/network_task.rs b/crates/testing/tests/tests_1/network_task.rs index cb623926d1..03a855a05f 100644 --- a/crates/testing/tests/tests_1/network_task.rs +++ b/crates/testing/tests/tests_1/network_task.rs @@ -12,10 +12,7 @@ use async_lock::RwLock; use hotshot::traits::implementations::MemoryNetwork; use hotshot_example_types::node_types::{MemoryImpl, TestTypes, TestVersions}; use hotshot_task::task::{ConsensusTaskRegistry, Task}; -use hotshot_task_impls::{ - events::HotShotEvent, - network::{self, NetworkEventTaskState}, -}; +use hotshot_task_impls::{events::HotShotEvent, network::NetworkEventTaskState}; use hotshot_testing::{ helpers::build_system_handle, test_builder::TestDescription, test_task::add_network_message_test_task, view_generator::TestViewGenerator, @@ -63,10 +60,10 @@ async fn test_network_task() { ::Membership::new(all_nodes.clone(), all_nodes, Topic::Global); let network_state: NetworkEventTaskState, _> = NetworkEventTaskState { - channel: network.clone(), + network: network.clone(), view: ViewNumber::new(0), - membership: membership.clone(), - filter: network::quorum_filter, + quorum_membership: membership.clone(), + da_membership: membership.clone(), upgrade_lock: upgrade_lock.clone(), storage, consensus, @@ -139,10 +136,10 @@ async fn test_network_storage_fail() { ::Membership::new(all_nodes.clone(), all_nodes, Topic::Global); let network_state: NetworkEventTaskState, _> = NetworkEventTaskState { - channel: network.clone(), + network: network.clone(), view: ViewNumber::new(0), - membership: membership.clone(), - filter: network::quorum_filter, + quorum_membership: membership.clone(), + da_membership: membership.clone(), upgrade_lock: upgrade_lock.clone(), storage, consensus,