diff --git a/Cargo.lock b/Cargo.lock index 30d6b0c0cb..df60760987 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3109,6 +3109,7 @@ dependencies = [ "libp2p-networking", "lru 0.12.4", "num_enum", + "parking_lot", "portpicker", "rand 0.8.5", "serde", diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index 696a9cc7c4..9dd8e2a35a 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -61,6 +61,7 @@ blake3.workspace = true sha2 = { workspace = true } url = { workspace = true } num_enum = "0.7" +parking_lot = "0.12" [target.'cfg(all(async_executor_impl = "tokio"))'.dependencies] tokio = { workspace = true } diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index 5db94d0e14..106300e271 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -8,10 +8,10 @@ /// Provides trait to create task states from a `SystemContextHandle` pub mod task_state; -use std::{fmt::Debug, sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc}; use async_broadcast::broadcast; -use async_compatibility_layer::art::{async_sleep, async_spawn}; +use async_compatibility_layer::art::async_spawn; use async_lock::RwLock; use async_trait::async_trait; use futures::{ @@ -35,7 +35,7 @@ use hotshot_task_impls::{ use hotshot_types::{ consensus::Consensus, constants::EVENT_CHANNEL_SIZE, - message::{Messages, UpgradeLock}, + message::{Message, UpgradeLock}, request_response::RequestReceiver, traits::{ network::ConnectedNetwork, @@ -119,53 +119,37 @@ pub fn add_network_message_task< let task_handle = async_spawn(async move { futures::pin_mut!(shutdown_signal); - let recv_stream = stream::unfold((), |()| async { - let msgs = match network.recv_msgs().await { - Ok(msgs) => { - let mut deserialized_messages = Vec::new(); - for msg in msgs { - let deserialized_message = match upgrade_lock.deserialize(&msg).await { - Ok(deserialized) => deserialized, - Err(e) => { - tracing::error!("Failed to deserialize message: {}", e); - continue; - } - }; - deserialized_messages.push(deserialized_message); - } - Messages(deserialized_messages) - } - Err(err) => { - tracing::error!("failed to receive messages: {err}"); - Messages(vec![]) - } - }; - Some((msgs, ())) - }); - - let fused_recv_stream = recv_stream.boxed().fuse(); - futures::pin_mut!(fused_recv_stream); - loop { + // Wait for one of the following to resolve: futures::select! { + // Wait for a shutdown signal () = shutdown_signal => { tracing::error!("Shutting down network message task"); return; } - msgs_option = fused_recv_stream.next() => { - if let Some(msgs) = msgs_option { - if msgs.0.is_empty() { - // TODO: Stop sleeping here: https://github.com/EspressoSystems/HotShot/issues/2558 - async_sleep(Duration::from_millis(100)).await; - } else { - state.handle_messages(msgs.0).await; + + // Wait for a message from the network + message = network.recv_message().fuse() => { + // Make sure the message did not fail + let message = match message { + Ok(message) => message, + Err(e) => { + tracing::error!("Failed to receive message: {:?}", e); + continue; } - } else { - // Stream has ended, which shouldn't happen in this case. - // You might want to handle this situation, perhaps by breaking the loop or logging an error. - tracing::error!("Network message stream unexpectedly ended"); - return; - } + }; + + // Deserialize the message + let deserialized_message: Message = match upgrade_lock.deserialize(&message).await { + Ok(message) => message, + Err(e) => { + tracing::error!("Failed to deserialize message: {:?}", e); + continue; + } + }; + + // Handle the message + state.handle_message(deserialized_message).await; } } } diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 8403964d60..4bed1471b6 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -18,6 +18,8 @@ use std::{ time::Duration, }; +use parking_lot::RwLock as PlRwLock; + use async_broadcast::{broadcast, InactiveReceiver, Sender}; use async_compatibility_layer::{ art::{async_sleep, async_spawn}, @@ -68,7 +70,7 @@ pub struct CombinedNetworks { networks: Arc>, /// Last n seen messages to prevent processing duplicates - message_cache: Arc>>, + message_cache: Arc>>, /// How many times primary failed to deliver primary_fail_counter: Arc, @@ -106,7 +108,7 @@ impl CombinedNetworks { Self { networks, - message_cache: Arc::new(RwLock::new(LruCache::new( + message_cache: Arc::new(PlRwLock::new(LruCache::new( NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(), ))), primary_fail_counter: Arc::new(AtomicU64::new(0)), @@ -304,7 +306,7 @@ impl TestableNetworkingImplementation for CombinedNetwor ); // We want to use the same message cache between the two networks - let message_cache = Arc::new(RwLock::new(LruCache::new( + let message_cache = Arc::new(PlRwLock::new(LruCache::new( NonZeroUsize::new(COMBINED_NETWORK_CACHE_SIZE).unwrap(), ))); @@ -466,35 +468,29 @@ impl ConnectedNetwork for CombinedNetworks /// /// # Errors /// Does not error - async fn recv_msgs(&self) -> Result>, NetworkError> { - // recv on both networks because nodes may be accessible only on either. discard duplicates - // TODO: improve this algorithm: https://github.com/EspressoSystems/HotShot/issues/2089 - let mut primary_fut = self.primary().recv_msgs().fuse(); - let mut secondary_fut = self.secondary().recv_msgs().fuse(); - - let msgs = select! { - p = primary_fut => p?, - s = secondary_fut => s?, - }; + async fn recv_message(&self) -> Result, NetworkError> { + loop { + // Receive from both networks + let mut primary_fut = self.primary().recv_message().fuse(); + let mut secondary_fut = self.secondary().recv_message().fuse(); + + // Wait for one to return a message + let message = select! { + p = primary_fut => p?, + s = secondary_fut => s?, + }; - let mut filtered_msgs = Vec::with_capacity(msgs.len()); - - // For each message, - for msg in msgs { // Calculate hash of the message - let message_hash = calculate_hash_of(&msg); + let message_hash = calculate_hash_of(&message); - // Add the hash to the cache - if !self.message_cache.read().await.contains(&message_hash) { - // If the message is not in the cache, process it - filtered_msgs.push(msg.clone()); + // Check if the hash is in the cache + if !self.message_cache.read().contains(&message_hash) { + // Add the hash to the cache + self.message_cache.write().put(message_hash, ()); - // Add it to the cache - self.message_cache.write().await.put(message_hash, ()); + break Ok(message); } } - - Ok(filtered_msgs) } fn queue_node_lookup( diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index dbc4d36e5b..e75e601235 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -1055,12 +1055,12 @@ impl ConnectedNetwork for Libp2pNetwork { /// /// # Errors /// If there is a network-related failure. - #[instrument(name = "Libp2pNetwork::recv_msgs", skip_all)] - async fn recv_msgs(&self) -> Result>, NetworkError> { + #[instrument(name = "Libp2pNetwork::recv_message", skip_all)] + async fn recv_message(&self) -> Result, NetworkError> { let result = self .inner .receiver - .drain_at_least_one() + .recv() .await .map_err(|_x| NetworkError::ShutDown)?; diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 392dc9c005..ee3a55684f 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -371,19 +371,19 @@ impl ConnectedNetwork for MemoryNetwork { /// /// # Errors /// If the other side of the channel is closed - #[instrument(name = "MemoryNetwork::recv_msgs", skip_all)] - async fn recv_msgs(&self) -> Result>, NetworkError> { + #[instrument(name = "MemoryNetwork::recv_messages", skip_all)] + async fn recv_message(&self) -> Result, NetworkError> { let ret = self .inner .output .lock() .await - .drain_at_least_one() + .recv() .await .map_err(|_x| NetworkError::ShutDown)?; self.inner .in_flight_message_count - .fetch_sub(ret.len(), Ordering::Relaxed); + .fetch_sub(1, Ordering::Relaxed); Ok(ret) } } diff --git a/crates/hotshot/src/traits/networking/push_cdn_network.rs b/crates/hotshot/src/traits/networking/push_cdn_network.rs index 3c544a6e61..f675b01f37 100644 --- a/crates/hotshot/src/traits/networking/push_cdn_network.rs +++ b/crates/hotshot/src/traits/networking/push_cdn_network.rs @@ -546,7 +546,7 @@ impl ConnectedNetwork for PushCdnNetwork { /// /// # Errors /// - If we fail to receive messages. Will trigger a retry automatically. - async fn recv_msgs(&self) -> Result>, NetworkError> { + async fn recv_message(&self) -> Result, NetworkError> { // Receive a message let message = self.client.receive_message().await; @@ -577,7 +577,7 @@ impl ConnectedNetwork for PushCdnNetwork { return Ok(vec![]); }; - Ok(vec![message]) + Ok(message) } /// Do nothing here, as we don't need to look up nodes. diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 7102179585..2042a9fc04 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -102,109 +102,106 @@ pub struct NetworkMessageTaskState { impl NetworkMessageTaskState { #[instrument(skip_all, name = "Network message task", level = "trace")] - /// Handle the message. - pub async fn handle_messages(&mut self, messages: Vec>) { - // We will send only one event for a vector of transactions. - let mut transactions = Vec::new(); - for message in messages { - tracing::trace!("Received message from network:\n\n{message:?}"); - let sender = message.sender; - match message.kind { - MessageKind::Consensus(consensus_message) => { - let event = match consensus_message { - SequencingMessage::General(general_message) => match general_message { - GeneralConsensusMessage::Proposal(proposal) => { - HotShotEvent::QuorumProposalRecv(proposal, sender) - } - GeneralConsensusMessage::ProposalRequested(req, sig) => { - HotShotEvent::QuorumProposalRequestRecv(req, sig) - } - GeneralConsensusMessage::LeaderProposalAvailable(proposal) => { - HotShotEvent::QuorumProposalResponseRecv(proposal) - } - GeneralConsensusMessage::Vote(vote) => { - HotShotEvent::QuorumVoteRecv(vote.clone()) - } - GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => { - HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message) - } - GeneralConsensusMessage::ViewSyncPreCommitCertificate( - view_sync_message, - ) => HotShotEvent::ViewSyncPreCommitCertificate2Recv(view_sync_message), - - GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => { - HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message) - } - GeneralConsensusMessage::ViewSyncCommitCertificate( - view_sync_message, - ) => HotShotEvent::ViewSyncCommitCertificate2Recv(view_sync_message), - - GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => { - HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message) - } - GeneralConsensusMessage::ViewSyncFinalizeCertificate( - view_sync_message, - ) => HotShotEvent::ViewSyncFinalizeCertificate2Recv(view_sync_message), - - GeneralConsensusMessage::TimeoutVote(message) => { - HotShotEvent::TimeoutVoteRecv(message) - } - GeneralConsensusMessage::UpgradeProposal(message) => { - HotShotEvent::UpgradeProposalRecv(message, sender) - } - GeneralConsensusMessage::UpgradeVote(message) => { - error!("Received upgrade vote!"); - HotShotEvent::UpgradeVoteRecv(message) - } - }, - SequencingMessage::Da(da_message) => match da_message { - DaConsensusMessage::DaProposal(proposal) => { - HotShotEvent::DaProposalRecv(proposal, sender) - } - DaConsensusMessage::DaVote(vote) => { - HotShotEvent::DaVoteRecv(vote.clone()) - } - DaConsensusMessage::DaCertificate(cert) => { - HotShotEvent::DaCertificateRecv(cert) - } - DaConsensusMessage::VidDisperseMsg(proposal) => { - HotShotEvent::VidShareRecv(proposal) - } - }, - }; - // TODO (Keyao benchmarking) Update these event variants (similar to the - // `TransactionsRecv` event) so we can send one event for a vector of messages. - // - broadcast_event(Arc::new(event), &self.internal_event_stream).await; - } - MessageKind::Data(message) => match message { - DataMessage::SubmitTransaction(transaction, _) => { - transactions.push(transaction); - } - DataMessage::DataResponse(_) | DataMessage::RequestData(_) => { - warn!("Request and Response messages should not be received in the NetworkMessage task"); - } - }, + /// Handles a (deserialized) message from the network + pub async fn handle_message(&mut self, message: Message) { + tracing::trace!("Received message from network:\n\n{message:?}"); + + // Match the message kind and send the appropriate event to the internal event stream + let sender = message.sender; + match message.kind { + // Handle consensus messages + MessageKind::Consensus(consensus_message) => { + let event = match consensus_message { + SequencingMessage::General(general_message) => match general_message { + GeneralConsensusMessage::Proposal(proposal) => { + HotShotEvent::QuorumProposalRecv(proposal, sender) + } + GeneralConsensusMessage::ProposalRequested(req, sig) => { + HotShotEvent::QuorumProposalRequestRecv(req, sig) + } + GeneralConsensusMessage::LeaderProposalAvailable(proposal) => { + HotShotEvent::QuorumProposalResponseRecv(proposal) + } + GeneralConsensusMessage::Vote(vote) => { + HotShotEvent::QuorumVoteRecv(vote.clone()) + } + GeneralConsensusMessage::ViewSyncPreCommitVote(view_sync_message) => { + HotShotEvent::ViewSyncPreCommitVoteRecv(view_sync_message) + } + GeneralConsensusMessage::ViewSyncPreCommitCertificate( + view_sync_message, + ) => HotShotEvent::ViewSyncPreCommitCertificate2Recv(view_sync_message), + + GeneralConsensusMessage::ViewSyncCommitVote(view_sync_message) => { + HotShotEvent::ViewSyncCommitVoteRecv(view_sync_message) + } + GeneralConsensusMessage::ViewSyncCommitCertificate(view_sync_message) => { + HotShotEvent::ViewSyncCommitCertificate2Recv(view_sync_message) + } + + GeneralConsensusMessage::ViewSyncFinalizeVote(view_sync_message) => { + HotShotEvent::ViewSyncFinalizeVoteRecv(view_sync_message) + } + GeneralConsensusMessage::ViewSyncFinalizeCertificate(view_sync_message) => { + HotShotEvent::ViewSyncFinalizeCertificate2Recv(view_sync_message) + } + + GeneralConsensusMessage::TimeoutVote(message) => { + HotShotEvent::TimeoutVoteRecv(message) + } + GeneralConsensusMessage::UpgradeProposal(message) => { + HotShotEvent::UpgradeProposalRecv(message, sender) + } + GeneralConsensusMessage::UpgradeVote(message) => { + error!("Received upgrade vote!"); + HotShotEvent::UpgradeVoteRecv(message) + } + }, + SequencingMessage::Da(da_message) => match da_message { + DaConsensusMessage::DaProposal(proposal) => { + HotShotEvent::DaProposalRecv(proposal, sender) + } + DaConsensusMessage::DaVote(vote) => HotShotEvent::DaVoteRecv(vote.clone()), + DaConsensusMessage::DaCertificate(cert) => { + HotShotEvent::DaCertificateRecv(cert) + } + DaConsensusMessage::VidDisperseMsg(proposal) => { + HotShotEvent::VidShareRecv(proposal) + } + }, + }; + // TODO (Keyao benchmarking) Update these event variants (similar to the + // `TransactionsRecv` event) so we can send one event for a vector of messages. + // + broadcast_event(Arc::new(event), &self.internal_event_stream).await; + } - MessageKind::External(data) => { - // Send the external message to the external event stream so it can be processed + // Handle data messages + MessageKind::Data(message) => match message { + DataMessage::SubmitTransaction(transaction, _) => { broadcast_event( - Event { - view_number: TYPES::Time::new(1), - event: EventType::ExternalMessageReceived(data), - }, - &self.external_event_stream, + Arc::new(HotShotEvent::TransactionsRecv(vec![transaction])), + &self.internal_event_stream, ) .await; } - }; - } - if !transactions.is_empty() { - broadcast_event( - Arc::new(HotShotEvent::TransactionsRecv(transactions)), - &self.internal_event_stream, - ) - .await; + DataMessage::DataResponse(_) | DataMessage::RequestData(_) => { + warn!("Request and Response messages should not be received in the NetworkMessage task"); + } + }, + + // Handle external messages + MessageKind::External(data) => { + // Send the external message to the external event stream so it can be processed + broadcast_event( + Event { + view_number: TYPES::Time::new(1), + event: EventType::ExternalMessageReceived(data), + }, + &self.external_event_stream, + ) + .await; + } } } } diff --git a/crates/testing/src/test_task.rs b/crates/testing/src/test_task.rs index 3559c4cee9..536153b77c 100644 --- a/crates/testing/src/test_task.rs +++ b/crates/testing/src/test_task.rs @@ -8,15 +8,15 @@ use std::{sync::Arc, time::Duration}; use anyhow::Result; use async_broadcast::{Receiver, Sender}; -use async_compatibility_layer::art::{async_sleep, async_spawn, async_timeout}; +use async_compatibility_layer::art::{async_spawn, async_timeout}; #[cfg(async_executor_impl = "async-std")] use async_std::task::{spawn, JoinHandle}; use async_trait::async_trait; use futures::future::select_all; -use hotshot::types::Event; +use hotshot::types::{Event, Message}; use hotshot_task_impls::{events::HotShotEvent, network::NetworkMessageTaskState}; use hotshot_types::{ - message::{Messages, UpgradeLock}, + message::UpgradeLock, traits::{ network::ConnectedNetwork, node_implementation::{NodeType, Versions}, @@ -131,37 +131,27 @@ pub async fn add_network_message_test_task< async_spawn(async move { loop { - let msgs = match network.recv_msgs().await { - Ok(msgs) => { - let mut deserialized_messages = Vec::new(); - - for msg in msgs { - let deserialized_message = match upgrade_lock.deserialize(&msg).await { - Ok(deserialized) => deserialized, - Err(e) => { - tracing::error!("Failed to deserialize message: {}", e); - return; - } - }; - - deserialized_messages.push(deserialized_message); - } - - Messages(deserialized_messages) - } - Err(err) => { - error!("failed to receive messages: {err}"); - - // return zero messages so we sleep and try again - Messages(vec![]) + // Get the next message from the network + let message = match network.recv_message().await { + Ok(message) => message, + Err(e) => { + error!("Failed to receive message: {:?}", e); + continue; } }; - if msgs.0.is_empty() { - // TODO: Stop sleeping here: https://github.com/EspressoSystems/HotShot/issues/2558 - async_sleep(Duration::from_millis(100)).await; - } else { - state.handle_messages(msgs.0).await; - } + + // Deserialize the message + let deserialized_message: Message = + match upgrade_lock.deserialize(&message).await { + Ok(message) => message, + Err(e) => { + tracing::error!("Failed to deserialize message: {:?}", e); + continue; + } + }; + + // Handle the message + state.handle_message(deserialized_message).await; } }) } diff --git a/crates/testing/tests/tests_3/memory_network.rs b/crates/testing/tests/tests_3/memory_network.rs index 0ecbbccc50..21db37e42d 100644 --- a/crates/testing/tests/tests_3/memory_network.rs +++ b/crates/testing/tests/tests_3/memory_network.rs @@ -5,9 +5,9 @@ // along with the HotShot repository. If not, see . #![allow(clippy::panic)] -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use async_compatibility_layer::logging::setup_logging; +use async_compatibility_layer::{art::async_timeout, logging::setup_logging}; use hotshot::{ traits::{ election::static_committee::GeneralStaticCommittee, @@ -171,13 +171,16 @@ async fn memory_network_direct_queue() { .direct_message(serialized_message.clone(), pub_key_2) .await .expect("Failed to message node"); - let mut recv_messages = network2 - .recv_msgs() + let recv_message = network2 + .recv_message() .await .expect("Failed to receive message"); - let recv_message = recv_messages.pop().unwrap(); let deserialized_message = upgrade_lock.deserialize(&recv_message).await.unwrap(); - assert!(recv_messages.is_empty()); + assert!( + async_timeout(Duration::from_secs(1), network2.recv_message()) + .await + .is_err() + ); fake_message_eq(sent_message, deserialized_message); } @@ -191,13 +194,16 @@ async fn memory_network_direct_queue() { .direct_message(serialized_message.clone(), pub_key_1) .await .expect("Failed to message node"); - let mut recv_messages = network1 - .recv_msgs() + let recv_message = network1 + .recv_message() .await .expect("Failed to receive message"); - let recv_message = recv_messages.pop().unwrap(); let deserialized_message = upgrade_lock.deserialize(&recv_message).await.unwrap(); - assert!(recv_messages.is_empty()); + assert!( + async_timeout(Duration::from_secs(1), network1.recv_message()) + .await + .is_err() + ); fake_message_eq(sent_message, deserialized_message); } } @@ -226,13 +232,16 @@ async fn memory_network_broadcast_queue() { .broadcast_message(serialized_message.clone(), Topic::Da, BroadcastDelay::None) .await .expect("Failed to message node"); - let mut recv_messages = network2 - .recv_msgs() + let recv_message = network2 + .recv_message() .await .expect("Failed to receive message"); - let recv_message = recv_messages.pop().unwrap(); let deserialized_message = upgrade_lock.deserialize(&recv_message).await.unwrap(); - assert!(recv_messages.is_empty()); + assert!( + async_timeout(Duration::from_secs(1), network2.recv_message()) + .await + .is_err() + ); fake_message_eq(sent_message, deserialized_message); } @@ -250,13 +259,16 @@ async fn memory_network_broadcast_queue() { ) .await .expect("Failed to message node"); - let mut recv_messages = network1 - .recv_msgs() + let recv_message = network1 + .recv_message() .await .expect("Failed to receive message"); - let recv_message = recv_messages.pop().unwrap(); let deserialized_message = upgrade_lock.deserialize(&recv_message).await.unwrap(); - assert!(recv_messages.is_empty()); + assert!( + async_timeout(Duration::from_secs(1), network1.recv_message()) + .await + .is_err() + ); fake_message_eq(sent_message, deserialized_message); } } @@ -325,18 +337,18 @@ async fn memory_network_test_in_flight_message_count() { while TestableNetworkingImplementation::::in_flight_message_count(&network1).unwrap() > 0 { - network1.recv_msgs().await.unwrap(); + network1.recv_message().await.unwrap(); } while TestableNetworkingImplementation::::in_flight_message_count(&network2).unwrap() > messages.len() { - network2.recv_msgs().await.unwrap(); + network2.recv_message().await.unwrap(); } while TestableNetworkingImplementation::::in_flight_message_count(&network2).unwrap() > 0 { - network2.recv_msgs().await.unwrap(); + network2.recv_message().await.unwrap(); } assert_eq!( diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index d8d89e7c20..c67644eecd 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -304,7 +304,7 @@ pub trait ConnectedNetwork: Clone + Send + Sync + 'st /// /// # Errors /// If there is a network-related failure. - async fn recv_msgs(&self) -> Result>, NetworkError>; + async fn recv_message(&self) -> Result, NetworkError>; /// Ask request the network for some data. Returns the request ID for that data, /// The ID returned can be used for cancelling the request