From 4e99aadb88f68cd53db40cd99107f0d91477dc19 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 1 May 2024 14:58:50 +0300 Subject: [PATCH 01/22] initial gossip client --- iroh-gossip/src/proto/plumtree.rs | 2 +- iroh-gossip/src/proto/topic.rs | 2 +- iroh/src/client.rs | 5 + iroh/src/client/gossip.rs | 40 ++++ iroh/src/gossip_dispatcher.rs | 319 ++++++++++++++++++++++++++++++ iroh/src/lib.rs | 1 + iroh/src/node.rs | 2 + iroh/src/node/builder.rs | 3 + iroh/src/node/rpc.rs | 7 + iroh/src/rpc_protocol.rs | 48 ++++- 10 files changed, 426 insertions(+), 3 deletions(-) create mode 100644 iroh/src/client/gossip.rs create mode 100644 iroh/src/gossip_dispatcher.rs diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs index c1a96dea10..365630cff9 100644 --- a/iroh-gossip/src/proto/plumtree.rs +++ b/iroh-gossip/src/proto/plumtree.rs @@ -84,7 +84,7 @@ pub enum Event { Received(GossipEvent), } -#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd)] +#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] pub struct GossipEvent { /// The content of the gossip message. #[debug("<{}b>", content.len())] diff --git a/iroh-gossip/src/proto/topic.rs b/iroh-gossip/src/proto/topic.rs index df36578dbb..dc573fae45 100644 --- a/iroh-gossip/src/proto/topic.rs +++ b/iroh-gossip/src/proto/topic.rs @@ -112,7 +112,7 @@ impl Message { } /// An event to be emitted to the application for a particular topic. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] pub enum Event { /// We have a new, direct neighbor in the swarm membership layer for this topic NeighborUp(PI), diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 840cb103b2..5d01e3d7cd 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -13,6 +13,7 @@ pub mod quic; mod authors; mod blobs; mod docs; +mod gossip; mod node; mod tags; @@ -22,6 +23,7 @@ pub use self::blobs::{ BlobStatus, Client as BlobsClient, }; pub use self::docs::{Client as DocsClient, Doc, Entry, LiveEvent}; +pub use self::gossip::Client as GossipClient; pub use self::node::Client as NodeClient; pub use self::tags::Client as TagsClient; @@ -38,6 +40,8 @@ pub struct Iroh { pub authors: AuthorsClient, /// Client for tags operations. pub tags: TagsClient, + /// Client for tags operations. + pub gossip: GossipClient, } impl Iroh @@ -51,6 +55,7 @@ where blobs: BlobsClient { rpc: rpc.clone() }, docs: DocsClient { rpc: rpc.clone() }, authors: AuthorsClient { rpc: rpc.clone() }, + gossip: GossipClient { rpc: rpc.clone() }, tags: TagsClient { rpc }, } } diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs new file mode 100644 index 0000000000..cec8d344bc --- /dev/null +++ b/iroh/src/client/gossip.rs @@ -0,0 +1,40 @@ +use std::collections::BTreeSet; + +use anyhow::Result; +use futures_lite::{Stream, StreamExt}; +use futures_util::Sink; +use iroh_gossip::proto::TopicId; +use iroh_net::NodeId; +use quic_rpc::{RpcClient, ServiceConnection}; + +use crate::rpc_protocol::{ + GossipSubscribeRequest, GossipSubscribeResponse, GossipSubscribeUpdate, ProviderService, +}; + +/// Iroh gossip client. +#[derive(Debug, Clone)] +pub struct Client { + pub(super) rpc: RpcClient, +} + +impl Client +where + C: ServiceConnection, +{ + /// Subscribe to a gossip topic. + pub async fn subscribe( + &self, + topic: TopicId, + bootstrap: BTreeSet, + ) -> Result<( + impl Sink, + impl Stream>, + )> { + let (sink, stream) = self + .rpc + .bidi(GossipSubscribeRequest { topic, bootstrap }) + .await?; + let stream = stream.map(|item| anyhow::Ok(item??)); + Ok((sink, stream)) + } +} diff --git a/iroh/src/gossip_dispatcher.rs b/iroh/src/gossip_dispatcher.rs new file mode 100644 index 0000000000..3739dba39b --- /dev/null +++ b/iroh/src/gossip_dispatcher.rs @@ -0,0 +1,319 @@ +//! A gossip engine that manages gossip subscriptions and updates. +use std::{ + collections::{btree_map::Entry, BTreeMap, BTreeSet}, + pin::Pin, + sync::{Arc, Mutex}, +}; + +use futures_util::Stream; +use iroh_base::rpc::{RpcError, RpcResult}; +use iroh_gossip::{ + net::{Event, Gossip}, + proto::TopicId, +}; +use iroh_net::{util::AbortingJoinHandle, NodeId}; + +use crate::rpc_protocol::{GossipSubscribeRequest, GossipSubscribeResponse, GossipSubscribeUpdate}; + +/// A gossip engine that manages gossip subscriptions and updates. +#[derive(Debug, Clone)] +pub struct GossipDispatcher { + gossip: Gossip, + inner: Arc>, +} + +/// The mutable state of the gossip engine. +#[derive(Debug)] +struct State { + current_subscriptions: BTreeMap, + /// the single task that dispatches gossip events to all subscribed streams + /// + /// this isn't really part of the mutable state, but it needs to live somewhere + task: Option>, +} + +/// The maximum number of messages that can be buffered in a subscription. +/// +/// If this limit is reached, the subscriber will receive a `Lagged` response, +/// the message will be dropped, and the subscriber will be closed. +/// +/// This is to prevent a single slow subscriber from blocking the dispatch loop. +/// If a subscriber is lagging, it should be closed and re-opened. +const SUBSCRIPTION_CAPACITY: usize = 128; +type UpdateStream = Box + Send + Sync + Unpin + 'static>; +type ResponseSink = flume::Sender>; + +#[derive(derive_more::Debug)] +enum TopicState { + /// The topic is currently joining. + /// Making new subscriptions is allowed, but they will have to wait for the join to finish. + Joining { + #[debug(skip)] + waiting: Vec<(UpdateStream, ResponseSink)>, + peers: BTreeSet, + #[allow(dead_code)] + join_task: AbortingJoinHandle<()>, + }, + /// The topic is currently live. + /// New subscriptions can be immediately added. + Live { + live: Vec<(AbortingJoinHandle<()>, ResponseSink)>, + }, + /// The topic is currently quitting. + /// We can't make new subscriptions without waiting for the quit to finish. + Quitting { + #[debug(skip)] + waiting: Vec<(UpdateStream, ResponseSink)>, + peers: BTreeSet, + #[allow(dead_code)] + quit_task: AbortingJoinHandle<()>, + }, +} + +impl TopicState { + fn into_senders(self) -> Vec { + match self { + TopicState::Joining { waiting, .. } | TopicState::Quitting { waiting, .. } => { + waiting.into_iter().map(|(_, send)| send).collect() + } + TopicState::Live { live } => live.into_iter().map(|(_, send)| send).collect(), + } + } +} + +impl GossipDispatcher { + /// Create a new gossip engine with the given gossip instance. + pub fn spawn(gossip: Gossip) -> Self { + let inner = Arc::new(Mutex::new(State { + current_subscriptions: BTreeMap::new(), + task: None, + })); + let res = Self { gossip, inner }; + let dispatch_task = spawn_owned(res.clone().dispatch_task()); + res.inner.lock().unwrap().task = Some(dispatch_task); + res + } + + async fn quit_task(self, topic: TopicId) { + let res = self.gossip.quit(topic).await; + let mut inner = self.inner.lock().unwrap(); + if let Some(TopicState::Quitting { waiting, peers, .. }) = + inner.current_subscriptions.remove(&topic) + { + match res { + Ok(()) => { + if waiting.is_empty() { + return; + } + let bootstrap = peers.iter().copied().collect(); + let join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); + inner.current_subscriptions.insert( + topic, + TopicState::Joining { + waiting, + peers, + join_task, + }, + ); + } + Err(e) => { + // notify all waiting streams that there is something wrong with the topic + let error = RpcError::from(e); + for (_, send) in waiting { + send.try_send(Err(error.clone())).ok(); + } + } + } + } + } + + fn try_send(entry: &(AbortingJoinHandle<()>, ResponseSink), event: &Event) -> bool { + let (task, send) = entry; + if task.is_finished() { + return false; + } + if send.is_disconnected() { + return false; + } + if let Some(cap) = send.capacity() { + if send.len() >= cap - 1 { + send.try_send(Ok(GossipSubscribeResponse::Lagged)).ok(); + return false; + } + } + send.try_send(Ok(GossipSubscribeResponse::Event(event.clone()))) + .is_ok() + } + + /// Dispatch gossip events to all subscribed streams. + /// + /// This should not fail unless the gossip instance is faulty. + async fn dispatch_loop(self) -> anyhow::Result<()> { + use futures_lite::stream::StreamExt; + let stream = self.gossip.clone().subscribe_all(); + tokio::pin!(stream); + while let Some(item) = stream.next().await { + let (topic, event) = item?; + let mut inner = self.inner.lock().unwrap(); + if let Some(TopicState::Live { live }) = inner.current_subscriptions.get_mut(&topic) { + live.retain(|entry| Self::try_send(entry, &event)); + if live.is_empty() { + let quit_task = tokio::task::spawn(self.clone().quit_task(topic)); + inner.current_subscriptions.insert( + topic, + TopicState::Quitting { + waiting: vec![], + peers: BTreeSet::new(), + quit_task: quit_task.into(), + }, + ); + } + } + } + Ok(()) + } + + /// Dispatch gossip events to all subscribed streams, and handle the unlikely case of a dispatch loop failure. + async fn dispatch_task(self) { + if let Err(cause) = self.clone().dispatch_loop().await { + // dispatch task failed. Not sure what to do here. + tracing::error!("Gossip dispatch task failed: {}", cause); + let mut inner = self.inner.lock().unwrap(); + let error = RpcError::from(cause); + for (_, state) in std::mem::take(&mut inner.current_subscriptions) { + for sender in state.into_senders() { + sender.try_send(Err(error.clone())).ok(); + } + } + } + } + + /// Handle updates from the client. + async fn update_loop( + gossip: Gossip, + topic: TopicId, + mut updates: UpdateStream, + ) -> anyhow::Result<()> { + use futures_lite::stream::StreamExt; + while let Some(update) = Pin::new(&mut updates).next().await { + match update { + GossipSubscribeUpdate::Broadcast(msg) => { + gossip.broadcast(topic, msg).await?; + } + GossipSubscribeUpdate::BroadcastNeighbors(msg) => { + gossip.broadcast_neighbors(topic, msg).await?; + } + } + } + Ok(()) + } + + /// Handle updates from the client, and handle update loop failure. + async fn update_task(self, topic: TopicId, updates: UpdateStream) { + let Err(e) = Self::update_loop(self.gossip.clone(), topic, updates).await else { + return; + }; + let mut inner = self.inner.lock().unwrap(); + // we got an error while sending to the topic + if let Some(TopicState::Live { live }) = inner.current_subscriptions.remove(&topic) { + let error = RpcError::from(e); + // notify all live streams that sending to the topic failed + for (_, send) in live { + send.try_send(Err(error.clone())).ok(); + } + } + } + + /// Call join, then await the result. + async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec) -> anyhow::Result<()> { + gossip.join(topic, bootstrap).await?.await?; + Ok(()) + } + + /// Join a gossip topic and handle turning waiting streams into live streams. + async fn join_task(self, topic: TopicId, bootstrap: BTreeSet) { + let res = Self::join(self.gossip.clone(), topic, bootstrap.into_iter().collect()).await; + let mut inner = self.inner.lock().unwrap(); + if let Some(TopicState::Joining { waiting, .. }) = + inner.current_subscriptions.remove(&topic) + { + match res { + Ok(()) => { + let mut live = vec![]; + for (updates, send) in waiting { + // if the stream is disconnected, we don't need to keep it and start the update task + if send.is_disconnected() { + continue; + } + let task = spawn_owned(self.clone().update_task(topic, updates)); + live.push((task, send)); + } + inner + .current_subscriptions + .insert(topic, TopicState::Live { live }); + } + Err(e) => { + // notify all waiting streams that the subscription failed + let error = RpcError::from(e); + for (_, send) in waiting { + send.try_send(Err(error.clone())).ok(); + } + } + } + } + } + + /// Subscribe to a gossip topic. + pub fn subscribe( + &self, + msg: GossipSubscribeRequest, + updates: UpdateStream, + ) -> impl Stream> { + let topic = msg.topic; + let mut inner = self.inner.lock().unwrap(); + let (send, recv) = flume::bounded(SUBSCRIPTION_CAPACITY); + match inner.current_subscriptions.entry(topic) { + Entry::Vacant(entry) => { + // There is no existing subscription, so we need to start a new one. + let waiting = vec![(updates, send)]; + let this = self.clone(); + let join_task = spawn_owned(this.clone().join_task(topic, msg.bootstrap.clone())); + entry.insert(TopicState::Joining { + waiting, + peers: msg.bootstrap, + join_task, + }); + } + Entry::Occupied(mut entry) => { + // There is already a subscription + let state = entry.get_mut(); + match state { + TopicState::Joining { waiting, peers, .. } => { + // We are joining, so we need to wait with creating the update task. + peers.extend(msg.bootstrap.into_iter()); + waiting.push((updates, send)); + } + TopicState::Quitting { waiting, peers, .. } => { + // We are quitting, so we need to wait with creating the update task. + peers.extend(msg.bootstrap.into_iter()); + waiting.push((updates, send)); + } + TopicState::Live { live } => { + // There is already a live subscription, so we can immediately start the update task. + let task = spawn_owned(self.clone().update_task(topic, updates)); + live.push((task, send)); + } + } + } + } + recv.into_stream() + } +} + +fn spawn_owned(f: F) -> AbortingJoinHandle +where + F: std::future::Future + Send + 'static, + T: Send + 'static, +{ + tokio::spawn(f).into() +} diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 69ce9b98bb..484a3e0da6 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -14,6 +14,7 @@ pub use iroh_base::base32; pub mod client; pub mod dial; +pub mod gossip_dispatcher; pub mod node; pub mod rpc_protocol; pub mod sync_engine; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 9866a652a8..d76d2a26e1 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -31,6 +31,7 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::debug; +use crate::gossip_dispatcher::GossipDispatcher; use crate::rpc_protocol::{ProviderRequest, ProviderResponse}; use crate::sync_engine::SyncEngine; use crate::ticket::BlobTicket; @@ -106,6 +107,7 @@ struct NodeInner { #[debug("rt")] rt: LocalPoolHandle, pub(crate) sync: SyncEngine, + gossip: GossipDispatcher, downloader: Downloader, } diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index d964d88a71..0e10bc39fe 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -33,6 +33,7 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ client::quic::RPC_ALPN, + gossip_dispatcher::GossipDispatcher, node::{Event, NodeInner}, rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}, sync_engine::SyncEngine, @@ -398,6 +399,7 @@ where self.blobs_store.clone(), downloader.clone(), ); + let gossip_dispatcher = GossipDispatcher::spawn(gossip.clone()); let sync_db = sync.sync.clone(); let callbacks = Callbacks::default(); @@ -424,6 +426,7 @@ where gc_task, rt: lp.clone(), sync, + gossip: gossip_dispatcher, downloader, }); let task = { diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 29941fc680..bb7d51eb6f 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -277,6 +277,13 @@ impl Handler { }) .await } + GossipSubscribe(msg) => { + chan.bidi_streaming(msg, handler, |handler, req, updates| { + handler.inner.gossip.subscribe(req, Box::new(updates)) + }) + .await + } + GossipSubscribeUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), } }); } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 14ab810946..cef5e8c96b 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -7,7 +7,11 @@ //! response, while others like provide have a stream of responses. //! //! Note that this is subject to change. The RPC protocol is not yet stable. -use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf}; +use std::{ + collections::{BTreeMap, BTreeSet}, + net::SocketAddr, + path::PathBuf, +}; use bytes::Bytes; use derive_more::{From, TryInto}; @@ -18,6 +22,7 @@ use iroh_bytes::{ store::{BaoBlobSize, ConsistencyCheckProgress}, util::Tag, }; +use iroh_gossip::proto::TopicId; use iroh_net::{ key::PublicKey, magic_endpoint::{ConnectionInfo, NodeAddr}, @@ -1117,6 +1122,42 @@ pub struct NodeStatsResponse { pub stats: BTreeMap, } +/// Join a gossip topic +#[derive(Serialize, Deserialize, Debug)] +pub struct GossipSubscribeRequest { + /// The topic to join + pub topic: TopicId, + /// The initial bootstrap nodes + pub bootstrap: BTreeSet, +} + +/// Send a gossip message +#[derive(Serialize, Deserialize, Debug)] +pub enum GossipSubscribeUpdate { + /// Broadcast a message to all nodes in the swarm + Broadcast(Bytes), + /// Broadcast a message to all direct neighbors + BroadcastNeighbors(Bytes), +} + +/// Update from a subscribed gossip topic +#[derive(Serialize, Deserialize, Debug)] +pub enum GossipSubscribeResponse { + /// A message was received + Event(iroh_gossip::net::Event), + /// We missed some messages + Lagged, +} + +impl Msg for GossipSubscribeRequest { + type Pattern = BidiStreaming; +} + +impl BidiStreamingMsg for GossipSubscribeRequest { + type Update = GossipSubscribeUpdate; + type Response = RpcResult; +} + /// The RPC service for the iroh provider process. #[derive(Debug, Clone)] pub struct ProviderService; @@ -1177,6 +1218,9 @@ pub enum ProviderRequest { AuthorImport(AuthorImportRequest), AuthorExport(AuthorExportRequest), AuthorDelete(AuthorDeleteRequest), + + GossipSubscribe(GossipSubscribeRequest), + GossipSubscribeUpdate(GossipSubscribeUpdate), } /// The response enum, listing all possible responses. @@ -1234,6 +1278,8 @@ pub enum ProviderResponse { AuthorImport(RpcResult), AuthorExport(RpcResult), AuthorDelete(RpcResult), + + GossipSubscribeUpdate(RpcResult), } impl Service for ProviderService { From 48a299300f5c4caffc7a2b291c2eaebeac5a2df8 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 1 May 2024 16:17:48 +0300 Subject: [PATCH 02/22] Add CLI and make a fuew things public --- Cargo.lock | 1 + iroh-cli/Cargo.toml | 1 + iroh-cli/src/commands.rs | 1 + iroh-cli/src/commands/gossip.rs | 79 +++++++++++++++++++++++++++++++++ iroh-cli/src/commands/rpc.rs | 12 ++++- iroh-gossip/src/proto.rs | 2 +- iroh/src/client/gossip.rs | 5 ++- iroh/src/gossip_dispatcher.rs | 2 +- iroh/src/rpc_protocol.rs | 43 +++++++++++++++++- 9 files changed, 138 insertions(+), 8 deletions(-) create mode 100644 iroh-cli/src/commands/gossip.rs diff --git a/Cargo.lock b/Cargo.lock index 023a441b94..668d77c627 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2471,6 +2471,7 @@ dependencies = [ "flume", "futures-buffered", "futures-lite", + "futures-util", "hex", "human-time", "indicatif", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 78b5ae7274..a248a9139e 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -35,6 +35,7 @@ dirs-next = "2.0.0" flume = "0.11.0" futures-buffered = "0.2.4" futures-lite = "2.3" +futures-util = { version = "0.3.30", features = ["futures-sink"] } hex = "0.4.3" human-time = "0.1.6" indicatif = { version = "0.17", features = ["tokio"] } diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index b2fd8c42e4..6f3e30f2d0 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -16,6 +16,7 @@ pub(crate) mod blob; pub(crate) mod console; pub(crate) mod doc; pub(crate) mod doctor; +pub(crate) mod gossip; pub(crate) mod node; pub(crate) mod rpc; pub(crate) mod start; diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs new file mode 100644 index 0000000000..62ea762b9a --- /dev/null +++ b/iroh-cli/src/commands/gossip.rs @@ -0,0 +1,79 @@ +use anyhow::{Context, Result}; +use bao_tree::blake3; +use clap::Subcommand; +use futures_lite::StreamExt; +use futures_util::SinkExt; +use iroh::net::NodeId; +use iroh::rpc_protocol::{ + GossipEvent, GossipMessage, GossipSubscribeResponse, GossipSubscribeUpdate, +}; +use iroh::{client::Iroh, rpc_protocol::ProviderService}; +use quic_rpc::ServiceConnection; +use tokio::io::AsyncBufReadExt; + +#[derive(Subcommand, Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum GossipCommands { + /// Subscribe to a topic + Subscribe { + #[clap(long)] + topic: String, + bootstrap: Vec, + #[clap(long, short)] + verbose: bool, + }, +} + +impl GossipCommands { + pub async fn run(self, iroh: &Iroh) -> Result<()> + where + C: ServiceConnection, + { + match self { + Self::Subscribe { + topic, + bootstrap, + verbose, + } => { + let bootstrap = bootstrap.into_iter().collect(); + let topic = blake3::hash(topic.as_ref()).into(); + + let (mut sink, mut stream) = iroh.gossip.subscribe(topic, bootstrap).await?; + let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines(); + loop { + tokio::select! { + line = input_lines.next_line() => { + let line = line.context("failed to read from stdin")?; + if let Some(line) = line { + sink.send(GossipSubscribeUpdate::Broadcast(line.into())).await?; + } else { + break; + } + } + res = stream.next() => { + let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?; + match res { + GossipSubscribeResponse::Event(event) => { + if verbose { + println!("{:?}", event); + } else { + match event { + GossipEvent::Received(GossipMessage { content, .. }) => { + println!("{:?}", content); + } + _ => {} + } + } + } + GossipSubscribeResponse::Lagged => { + anyhow::bail!("gossip stream lagged"); + } + }; + } + } + } + } + } + Ok(()) + } +} diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index 036ecf08c7..a22d67206a 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -6,8 +6,8 @@ use quic_rpc::ServiceConnection; use crate::config::ConsoleEnv; use super::{ - author::AuthorCommands, blob::BlobCommands, doc::DocCommands, node::NodeCommands, - tag::TagCommands, + author::AuthorCommands, blob::BlobCommands, doc::DocCommands, gossip::GossipCommands, + node::NodeCommands, tag::TagCommands, }; #[derive(Subcommand, Debug, Clone)] @@ -42,6 +42,13 @@ pub enum RpcCommands { #[clap(subcommand)] command: NodeCommands, }, + /// Manage gossip + /// + /// Gossip is a way to broadcast messages to a group of nodes. + Gossip { + #[clap(subcommand)] + command: GossipCommands, + }, /// Manage tags /// /// Tags are local, human-readable names for things iroh should keep. @@ -68,6 +75,7 @@ impl RpcCommands { Self::Doc { command } => command.run(iroh, env).await, Self::Author { command } => command.run(iroh, env).await, Self::Tag { command } => command.run(iroh).await, + Self::Gossip { command } => command.run(iroh).await, } } } diff --git a/iroh-gossip/src/proto.rs b/iroh-gossip/src/proto.rs index 25eb6f1384..105a9d29d5 100644 --- a/iroh-gossip/src/proto.rs +++ b/iroh-gossip/src/proto.rs @@ -59,7 +59,7 @@ pub mod util; #[cfg(test)] mod tests; -pub use plumtree::Scope; +pub use plumtree::{DeliveryScope, Scope}; pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId}; pub use topic::{Command, Config, Event, IO}; diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs index cec8d344bc..2436a3de64 100644 --- a/iroh/src/client/gossip.rs +++ b/iroh/src/client/gossip.rs @@ -2,7 +2,7 @@ use std::collections::BTreeSet; use anyhow::Result; use futures_lite::{Stream, StreamExt}; -use futures_util::Sink; +use futures_util::{Sink, SinkExt}; use iroh_gossip::proto::TopicId; use iroh_net::NodeId; use quic_rpc::{RpcClient, ServiceConnection}; @@ -27,7 +27,7 @@ where topic: TopicId, bootstrap: BTreeSet, ) -> Result<( - impl Sink, + impl Sink, impl Stream>, )> { let (sink, stream) = self @@ -35,6 +35,7 @@ where .bidi(GossipSubscribeRequest { topic, bootstrap }) .await?; let stream = stream.map(|item| anyhow::Ok(item??)); + let sink = sink.sink_map_err(|_| anyhow::anyhow!("send error")); Ok((sink, stream)) } } diff --git a/iroh/src/gossip_dispatcher.rs b/iroh/src/gossip_dispatcher.rs index 3739dba39b..73c422a70a 100644 --- a/iroh/src/gossip_dispatcher.rs +++ b/iroh/src/gossip_dispatcher.rs @@ -141,7 +141,7 @@ impl GossipDispatcher { return false; } } - send.try_send(Ok(GossipSubscribeResponse::Event(event.clone()))) + send.try_send(Ok(GossipSubscribeResponse::Event(event.clone().into()))) .is_ok() } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index cef5e8c96b..99b1bb0ea8 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -22,10 +22,11 @@ use iroh_bytes::{ store::{BaoBlobSize, ConsistencyCheckProgress}, util::Tag, }; -use iroh_gossip::proto::TopicId; +use iroh_gossip::proto::{DeliveryScope, TopicId}; use iroh_net::{ key::PublicKey, magic_endpoint::{ConnectionInfo, NodeAddr}, + NodeId, }; use iroh_sync::{ @@ -1144,7 +1145,7 @@ pub enum GossipSubscribeUpdate { #[derive(Serialize, Deserialize, Debug)] pub enum GossipSubscribeResponse { /// A message was received - Event(iroh_gossip::net::Event), + Event(GossipEvent), /// We missed some messages Lagged, } @@ -1158,6 +1159,44 @@ impl BidiStreamingMsg for GossipSubscribeRequest { type Response = RpcResult; } +/// Gossip event +/// An event to be emitted to the application for a particular topic. +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub enum GossipEvent { + /// We have a new, direct neighbor in the swarm membership layer for this topic + NeighborUp(NodeId), + /// We dropped direct neighbor in the swarm membership layer for this topic + NeighborDown(NodeId), + /// A gossip message was received for this topic + Received(GossipMessage), +} + +impl From> for GossipEvent { + fn from(event: iroh_gossip::proto::Event) -> Self { + match event { + iroh_gossip::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id), + iroh_gossip::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id), + iroh_gossip::proto::Event::Received(message) => Self::Received(GossipMessage { + content: message.content, + scope: message.scope, + delivered_from: message.delivered_from, + }), + } + } +} + +/// A gossip message +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub struct GossipMessage { + /// The content of the message + pub content: Bytes, + /// The scope of the message. + /// This tells us if the message is from a direct neighbor or actual gossip. + pub scope: DeliveryScope, + /// The node that delivered the message. This is not the same as the original author. + pub delivered_from: NodeId, +} + /// The RPC service for the iroh provider process. #[derive(Debug, Clone)] pub struct ProviderService; From 488241227e39ec4701f9febf46d3ab1112afa93f Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 1 May 2024 16:46:37 +0300 Subject: [PATCH 03/22] more comments --- iroh/src/gossip_dispatcher.rs | 64 +++++++++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/iroh/src/gossip_dispatcher.rs b/iroh/src/gossip_dispatcher.rs index 73c422a70a..89339e83a0 100644 --- a/iroh/src/gossip_dispatcher.rs +++ b/iroh/src/gossip_dispatcher.rs @@ -40,7 +40,9 @@ struct State { /// This is to prevent a single slow subscriber from blocking the dispatch loop. /// If a subscriber is lagging, it should be closed and re-opened. const SUBSCRIPTION_CAPACITY: usize = 128; +/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds. type UpdateStream = Box + Send + Sync + Unpin + 'static>; +/// Type alias for a sink of gossip responses. type ResponseSink = flume::Sender>; #[derive(derive_more::Debug)] @@ -48,29 +50,41 @@ enum TopicState { /// The topic is currently joining. /// Making new subscriptions is allowed, but they will have to wait for the join to finish. Joining { + /// Stream/sink pairs that are waiting for the topic to become live. #[debug(skip)] waiting: Vec<(UpdateStream, ResponseSink)>, - peers: BTreeSet, + /// Set of bootstrap nodes we are using. + bootstrap: BTreeSet, + /// The task that is driving the join future. #[allow(dead_code)] join_task: AbortingJoinHandle<()>, }, /// The topic is currently live. /// New subscriptions can be immediately added. Live { + /// Task/sink pairs that are currently live. + /// The task is the task that is sending broadcast messages to the topic. live: Vec<(AbortingJoinHandle<()>, ResponseSink)>, }, /// The topic is currently quitting. /// We can't make new subscriptions without waiting for the quit to finish. Quitting { + /// Stream/sink pairs that are waiting for the topic to quit so + /// it can be joined again. #[debug(skip)] waiting: Vec<(UpdateStream, ResponseSink)>, - peers: BTreeSet, + /// Set of bootstrap nodes we are using. + /// + /// This is used to re-join the topic after quitting. + bootstrap: BTreeSet, + /// The task that is driving the quit future. #[allow(dead_code)] quit_task: AbortingJoinHandle<()>, }, } impl TopicState { + /// Extract all senders from the state. fn into_senders(self) -> Vec { match self { TopicState::Joining { waiting, .. } | TopicState::Quitting { waiting, .. } => { @@ -82,7 +96,7 @@ impl TopicState { } impl GossipDispatcher { - /// Create a new gossip engine with the given gossip instance. + /// Create a new gossip dispatcher with the given gossip instance. pub fn spawn(gossip: Gossip) -> Self { let inner = Arc::new(Mutex::new(State { current_subscriptions: BTreeMap::new(), @@ -94,24 +108,31 @@ impl GossipDispatcher { res } + /// Quit a gossip topic and handle the result of the quitting. + /// + /// On quit success, will try to join the topic again with the bootstrap nodes we have accumulated while waiting for quit to finish. + /// On quit failure, all waiting streams will be notified with the error and removed. async fn quit_task(self, topic: TopicId) { let res = self.gossip.quit(topic).await; let mut inner = self.inner.lock().unwrap(); - if let Some(TopicState::Quitting { waiting, peers, .. }) = - inner.current_subscriptions.remove(&topic) + if let Some(TopicState::Quitting { + waiting, + bootstrap: peers, + .. + }) = inner.current_subscriptions.remove(&topic) { match res { Ok(()) => { if waiting.is_empty() { return; } - let bootstrap = peers.iter().copied().collect(); + let bootstrap = peers.clone(); let join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); inner.current_subscriptions.insert( topic, TopicState::Joining { waiting, - peers, + bootstrap: peers, join_task, }, ); @@ -127,20 +148,28 @@ impl GossipDispatcher { } } + /// Try to send an event to a sink. + /// + /// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full. fn try_send(entry: &(AbortingJoinHandle<()>, ResponseSink), event: &Event) -> bool { let (task, send) = entry; + // This means that we stop sending to the stream when the update side is finished. if task.is_finished() { return false; } + // If the stream is disconnected, we don't need to send to it. if send.is_disconnected() { return false; } + // Check if the send buffer is almost full, and send a lagged response if it is. if let Some(cap) = send.capacity() { if send.len() >= cap - 1 { send.try_send(Ok(GossipSubscribeResponse::Lagged)).ok(); return false; } } + // Send the event to the stream. + // We are the owner of the stream, so we can be sure that there is still room. send.try_send(Ok(GossipSubscribeResponse::Event(event.clone().into()))) .is_ok() } @@ -163,7 +192,7 @@ impl GossipDispatcher { topic, TopicState::Quitting { waiting: vec![], - peers: BTreeSet::new(), + bootstrap: BTreeSet::new(), quit_task: quit_task.into(), }, ); @@ -225,6 +254,8 @@ impl GossipDispatcher { } /// Call join, then await the result. + /// + /// Basically just flattens the two stages of joining into one. async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec) -> anyhow::Result<()> { gossip.join(topic, bootstrap).await?.await?; Ok(()) @@ -280,7 +311,7 @@ impl GossipDispatcher { let join_task = spawn_owned(this.clone().join_task(topic, msg.bootstrap.clone())); entry.insert(TopicState::Joining { waiting, - peers: msg.bootstrap, + bootstrap: msg.bootstrap, join_task, }); } @@ -288,12 +319,22 @@ impl GossipDispatcher { // There is already a subscription let state = entry.get_mut(); match state { - TopicState::Joining { waiting, peers, .. } => { + TopicState::Joining { + waiting, + bootstrap: peers, + .. + } => { // We are joining, so we need to wait with creating the update task. + // + // TODO: should we merge the bootstrap nodes and try to join with all of them? peers.extend(msg.bootstrap.into_iter()); waiting.push((updates, send)); } - TopicState::Quitting { waiting, peers, .. } => { + TopicState::Quitting { + waiting, + bootstrap: peers, + .. + } => { // We are quitting, so we need to wait with creating the update task. peers.extend(msg.bootstrap.into_iter()); waiting.push((updates, send)); @@ -310,6 +351,7 @@ impl GossipDispatcher { } } +/// tokio::spawn but returns an `AbortingJoinHandle` that owns the task. fn spawn_owned(f: F) -> AbortingJoinHandle where F: std::future::Future + Send + 'static, From b88398e743935c5975571e8b4fcb54c929ddc5e8 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 1 May 2024 16:47:05 +0300 Subject: [PATCH 04/22] fix(iroh-sync): Ignore all gossip events that are not for the sync engine --- iroh/src/sync_engine/gossip.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/iroh/src/sync_engine/gossip.rs b/iroh/src/sync_engine/gossip.rs index 86853cab38..5c5d9e6806 100644 --- a/iroh/src/sync_engine/gossip.rs +++ b/iroh/src/sync_engine/gossip.rs @@ -147,6 +147,10 @@ impl GossipActor { }, }; let namespace: NamespaceId = topic.as_bytes().into(); + if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) { + trace!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic"); + return Ok(()); + } if let Err(err) = self.on_gossip_event_inner(namespace, event).await { error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event"); } From 2d4ef9e0bc8ce37a2a11373226f6e8994849b4da Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 1 May 2024 17:35:44 +0300 Subject: [PATCH 05/22] More logging. --- iroh/src/gossip_dispatcher.rs | 11 ++++++++++- iroh/src/sync_engine/gossip.rs | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/iroh/src/gossip_dispatcher.rs b/iroh/src/gossip_dispatcher.rs index 89339e83a0..2634e7c5e8 100644 --- a/iroh/src/gossip_dispatcher.rs +++ b/iroh/src/gossip_dispatcher.rs @@ -197,6 +197,11 @@ impl GossipDispatcher { }, ); } + } else { + tracing::trace!( + "Received event for unknown topic, possibly sync {}", + hex::encode(topic) + ); } } Ok(()) @@ -257,7 +262,11 @@ impl GossipDispatcher { /// /// Basically just flattens the two stages of joining into one. async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec) -> anyhow::Result<()> { - gossip.join(topic, bootstrap).await?.await?; + tracing::error!("Joining gossip topic {:?}", topic); + let join = gossip.join(topic, bootstrap).await?; + tracing::error!("Waiting for joint to gossip topic {:?} to succeed", topic); + join.await?; + tracing::error!("Joined gossip topic {:?}", topic); Ok(()) } diff --git a/iroh/src/sync_engine/gossip.rs b/iroh/src/sync_engine/gossip.rs index 5c5d9e6806..3dfd5b08fd 100644 --- a/iroh/src/sync_engine/gossip.rs +++ b/iroh/src/sync_engine/gossip.rs @@ -148,7 +148,7 @@ impl GossipActor { }; let namespace: NamespaceId = topic.as_bytes().into(); if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) { - trace!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic"); + error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic"); return Ok(()); } if let Err(err) = self.on_gossip_event_inner(namespace, event).await { From f53cef5bb6b0076e31c597a27856eeca65b07494 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 1 May 2024 17:52:00 +0300 Subject: [PATCH 06/22] clippy --- iroh-cli/src/commands/gossip.rs | 9 ++------- iroh/src/gossip_dispatcher.rs | 4 ++-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs index 62ea762b9a..da94b88c2e 100644 --- a/iroh-cli/src/commands/gossip.rs +++ b/iroh-cli/src/commands/gossip.rs @@ -56,13 +56,8 @@ impl GossipCommands { GossipSubscribeResponse::Event(event) => { if verbose { println!("{:?}", event); - } else { - match event { - GossipEvent::Received(GossipMessage { content, .. }) => { - println!("{:?}", content); - } - _ => {} - } + } else if let GossipEvent::Received(GossipMessage { content, .. }) = event { + println!("{:?}", content); } } GossipSubscribeResponse::Lagged => { diff --git a/iroh/src/gossip_dispatcher.rs b/iroh/src/gossip_dispatcher.rs index 2634e7c5e8..b6323cba7b 100644 --- a/iroh/src/gossip_dispatcher.rs +++ b/iroh/src/gossip_dispatcher.rs @@ -336,7 +336,7 @@ impl GossipDispatcher { // We are joining, so we need to wait with creating the update task. // // TODO: should we merge the bootstrap nodes and try to join with all of them? - peers.extend(msg.bootstrap.into_iter()); + peers.extend(msg.bootstrap); waiting.push((updates, send)); } TopicState::Quitting { @@ -345,7 +345,7 @@ impl GossipDispatcher { .. } => { // We are quitting, so we need to wait with creating the update task. - peers.extend(msg.bootstrap.into_iter()); + peers.extend(msg.bootstrap); waiting.push((updates, send)); } TopicState::Live { live } => { From 2201a40c1668f67d4a5a2bfda579812dfd45abe4 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 5 Jun 2024 16:41:12 +0300 Subject: [PATCH 07/22] Move the gossip types into the gossip dispatcher --- iroh/src/gossip_dispatcher.rs | 57 ++++++++++++++++++++++++++++++-- iroh/src/rpc_protocol.rs | 62 +++-------------------------------- 2 files changed, 59 insertions(+), 60 deletions(-) diff --git a/iroh/src/gossip_dispatcher.rs b/iroh/src/gossip_dispatcher.rs index b6323cba7b..64df99513f 100644 --- a/iroh/src/gossip_dispatcher.rs +++ b/iroh/src/gossip_dispatcher.rs @@ -5,15 +5,66 @@ use std::{ sync::{Arc, Mutex}, }; +use bytes::Bytes; use futures_util::Stream; use iroh_base::rpc::{RpcError, RpcResult}; use iroh_gossip::{ net::{Event, Gossip}, - proto::TopicId, + proto::{DeliveryScope, TopicId}, }; -use iroh_net::{util::AbortingJoinHandle, NodeId}; +use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId}; +use serde::{Deserialize, Serialize}; -use crate::rpc_protocol::{GossipSubscribeRequest, GossipSubscribeResponse, GossipSubscribeUpdate}; +/// Join a gossip topic +#[derive(Serialize, Deserialize, Debug)] +pub struct GossipSubscribeRequest { + /// The topic to join + pub topic: TopicId, + /// The initial bootstrap nodes + pub bootstrap: BTreeSet, +} + +/// Send a gossip message +#[derive(Serialize, Deserialize, Debug)] +pub enum GossipSubscribeUpdate { + /// Broadcast a message to all nodes in the swarm + Broadcast(Bytes), + /// Broadcast a message to all direct neighbors + BroadcastNeighbors(Bytes), +} + +/// Update from a subscribed gossip topic +#[derive(Serialize, Deserialize, Debug)] +pub enum GossipSubscribeResponse { + /// A message was received + Event(GossipEvent), + /// We missed some messages + Lagged, +} + +/// Gossip event +/// An event to be emitted to the application for a particular topic. +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub enum GossipEvent { + /// We have a new, direct neighbor in the swarm membership layer for this topic + NeighborUp(NodeId), + /// We dropped direct neighbor in the swarm membership layer for this topic + NeighborDown(NodeId), + /// A gossip message was received for this topic + Received(GossipMessage), +} + +/// A gossip message +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub struct GossipMessage { + /// The content of the message + pub content: Bytes, + /// The scope of the message. + /// This tells us if the message is from a direct neighbor or actual gossip. + pub scope: DeliveryScope, + /// The node that delivered the message. This is not the same as the original author. + pub delivered_from: NodeId, +} /// A gossip engine that manages gossip subscriptions and updates. #[derive(Debug, Clone)] diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 99b1bb0ea8..23d6195453 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -7,11 +7,7 @@ //! response, while others like provide have a stream of responses. //! //! Note that this is subject to change. The RPC protocol is not yet stable. -use std::{ - collections::{BTreeMap, BTreeSet}, - net::SocketAddr, - path::PathBuf, -}; +use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf}; use bytes::Bytes; use derive_more::{From, TryInto}; @@ -22,7 +18,6 @@ use iroh_bytes::{ store::{BaoBlobSize, ConsistencyCheckProgress}, util::Tag, }; -use iroh_gossip::proto::{DeliveryScope, TopicId}; use iroh_net::{ key::PublicKey, magic_endpoint::{ConnectionInfo, NodeAddr}, @@ -45,6 +40,10 @@ pub use iroh_base::rpc::{RpcError, RpcResult}; use iroh_bytes::store::{ExportFormat, ExportMode}; pub use iroh_bytes::{provider::AddProgress, store::ValidateProgress}; +pub use crate::gossip_dispatcher::{ + GossipEvent, GossipMessage, GossipSubscribeRequest, GossipSubscribeResponse, + GossipSubscribeUpdate, +}; use crate::sync_engine::LiveEvent; pub use crate::ticket::DocTicket; pub use iroh_bytes::util::SetTagOption; @@ -1123,33 +1122,6 @@ pub struct NodeStatsResponse { pub stats: BTreeMap, } -/// Join a gossip topic -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipSubscribeRequest { - /// The topic to join - pub topic: TopicId, - /// The initial bootstrap nodes - pub bootstrap: BTreeSet, -} - -/// Send a gossip message -#[derive(Serialize, Deserialize, Debug)] -pub enum GossipSubscribeUpdate { - /// Broadcast a message to all nodes in the swarm - Broadcast(Bytes), - /// Broadcast a message to all direct neighbors - BroadcastNeighbors(Bytes), -} - -/// Update from a subscribed gossip topic -#[derive(Serialize, Deserialize, Debug)] -pub enum GossipSubscribeResponse { - /// A message was received - Event(GossipEvent), - /// We missed some messages - Lagged, -} - impl Msg for GossipSubscribeRequest { type Pattern = BidiStreaming; } @@ -1159,18 +1131,6 @@ impl BidiStreamingMsg for GossipSubscribeRequest { type Response = RpcResult; } -/// Gossip event -/// An event to be emitted to the application for a particular topic. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] -pub enum GossipEvent { - /// We have a new, direct neighbor in the swarm membership layer for this topic - NeighborUp(NodeId), - /// We dropped direct neighbor in the swarm membership layer for this topic - NeighborDown(NodeId), - /// A gossip message was received for this topic - Received(GossipMessage), -} - impl From> for GossipEvent { fn from(event: iroh_gossip::proto::Event) -> Self { match event { @@ -1185,18 +1145,6 @@ impl From> for GossipEvent { } } -/// A gossip message -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] -pub struct GossipMessage { - /// The content of the message - pub content: Bytes, - /// The scope of the message. - /// This tells us if the message is from a direct neighbor or actual gossip. - pub scope: DeliveryScope, - /// The node that delivered the message. This is not the same as the original author. - pub delivered_from: NodeId, -} - /// The RPC service for the iroh provider process. #[derive(Debug, Clone)] pub struct ProviderService; From 6b83cd22608f95651ddd5aebdd139f69aff8ff10 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 5 Jun 2024 17:37:26 +0300 Subject: [PATCH 08/22] move dispatcher into iroh-gossip under a feature flag --- Cargo.lock | 2 + iroh-cli/src/commands/gossip.rs | 5 +- iroh-gossip/Cargo.toml | 7 +- .../src/dispatcher.rs | 106 ++++++++++-------- iroh-gossip/src/lib.rs | 2 + iroh/src/client/gossip.rs | 6 +- iroh/src/lib.rs | 1 - iroh/src/node.rs | 3 +- iroh/src/node/builder.rs | 6 +- iroh/src/node/rpc.rs | 9 +- iroh/src/rpc_protocol.rs | 41 +++---- 11 files changed, 111 insertions(+), 77 deletions(-) rename iroh/src/gossip_dispatcher.rs => iroh-gossip/src/dispatcher.rs (84%) diff --git a/Cargo.lock b/Cargo.lock index 668d77c627..3510fbf838 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2561,7 +2561,9 @@ dependencies = [ "clap", "derive_more", "ed25519-dalek", + "flume", "futures-lite", + "futures-util", "genawaiter", "indexmap 2.2.6", "iroh-base", diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs index da94b88c2e..cc9317e8fa 100644 --- a/iroh-cli/src/commands/gossip.rs +++ b/iroh-cli/src/commands/gossip.rs @@ -4,9 +4,8 @@ use clap::Subcommand; use futures_lite::StreamExt; use futures_util::SinkExt; use iroh::net::NodeId; -use iroh::rpc_protocol::{ - GossipEvent, GossipMessage, GossipSubscribeResponse, GossipSubscribeUpdate, -}; +use iroh::node::GossipEvent; +use iroh::rpc_protocol::{GossipMessage, GossipSubscribeResponse, GossipSubscribeUpdate}; use iroh::{client::Iroh, rpc_protocol::ProviderService}; use quic_rpc::ServiceConnection; use tokio::io::AsyncBufReadExt; diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index cd83f63df5..7af7dbfc8d 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -38,6 +38,10 @@ tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", " tokio-util = { version = "0.7.8", optional = true, features = ["codec"] } genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] } +# dispatcher dependencies (optional) +futures-util = { version = "0.3.30", optional = true } +flume = { version = "0.11", optional = true } + [dev-dependencies] clap = { version = "4", features = ["derive"] } iroh-test = { path = "../iroh-test" } @@ -46,8 +50,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.4.0" [features] -default = ["net"] +default = ["net", "dispatcher"] net = ["dep:futures-lite", "dep:iroh-net", "dep:quinn", "dep:tokio", "dep:tokio-util"] +dispatcher = ["dep:flume", "dep:futures-util"] [[example]] name = "chat" diff --git a/iroh/src/gossip_dispatcher.rs b/iroh-gossip/src/dispatcher.rs similarity index 84% rename from iroh/src/gossip_dispatcher.rs rename to iroh-gossip/src/dispatcher.rs index 64df99513f..8d37fb2d06 100644 --- a/iroh/src/gossip_dispatcher.rs +++ b/iroh-gossip/src/dispatcher.rs @@ -1,32 +1,38 @@ -//! A gossip engine that manages gossip subscriptions and updates. +//! A higher level wrapper for the gossip engine that manages multiple gossip subscriptions and updates. use std::{ collections::{btree_map::Entry, BTreeMap, BTreeSet}, pin::Pin, sync::{Arc, Mutex}, }; +use crate::{ + net::{Event as IrohGossipEvent, Gossip}, + proto::{DeliveryScope, TopicId}, +}; use bytes::Bytes; use futures_util::Stream; use iroh_base::rpc::{RpcError, RpcResult}; -use iroh_gossip::{ - net::{Event, Gossip}, - proto::{DeliveryScope, TopicId}, -}; use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId}; use serde::{Deserialize, Serialize}; /// Join a gossip topic #[derive(Serialize, Deserialize, Debug)] -pub struct GossipSubscribeRequest { - /// The topic to join - pub topic: TopicId, +pub struct Options { /// The initial bootstrap nodes pub bootstrap: BTreeSet, + /// The maximum number of messages that can be buffered in a subscription. + /// + /// If this limit is reached, the subscriber will receive a `Lagged` response, + /// the message will be dropped, and the subscriber will be closed. + /// + /// This is to prevent a single slow subscriber from blocking the dispatch loop. + /// If a subscriber is lagging, it should be closed and re-opened. + pub subscription_capacity: usize, } /// Send a gossip message #[derive(Serialize, Deserialize, Debug)] -pub enum GossipSubscribeUpdate { +pub enum Command { /// Broadcast a message to all nodes in the swarm Broadcast(Bytes), /// Broadcast a message to all direct neighbors @@ -35,7 +41,7 @@ pub enum GossipSubscribeUpdate { /// Update from a subscribed gossip topic #[derive(Serialize, Deserialize, Debug)] -pub enum GossipSubscribeResponse { +pub enum Event { /// A message was received Event(GossipEvent), /// We missed some messages @@ -51,12 +57,26 @@ pub enum GossipEvent { /// We dropped direct neighbor in the swarm membership layer for this topic NeighborDown(NodeId), /// A gossip message was received for this topic - Received(GossipMessage), + Received(Message), +} + +impl From> for GossipEvent { + fn from(event: crate::proto::Event) -> Self { + match event { + crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id), + crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id), + crate::proto::Event::Received(message) => Self::Received(Message { + content: message.content, + scope: message.scope, + delivered_from: message.delivered_from, + }), + } + } } /// A gossip message #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] -pub struct GossipMessage { +pub struct Message { /// The content of the message pub content: Bytes, /// The scope of the message. @@ -83,18 +103,10 @@ struct State { task: Option>, } -/// The maximum number of messages that can be buffered in a subscription. -/// -/// If this limit is reached, the subscriber will receive a `Lagged` response, -/// the message will be dropped, and the subscriber will be closed. -/// -/// This is to prevent a single slow subscriber from blocking the dispatch loop. -/// If a subscriber is lagging, it should be closed and re-opened. -const SUBSCRIPTION_CAPACITY: usize = 128; /// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds. -type UpdateStream = Box + Send + Sync + Unpin + 'static>; -/// Type alias for a sink of gossip responses. -type ResponseSink = flume::Sender>; +type CommandStream = Box + Send + Sync + Unpin + 'static>; +/// Type alias for a sink of gossip events. +type EventSink = flume::Sender>; #[derive(derive_more::Debug)] enum TopicState { @@ -103,7 +115,7 @@ enum TopicState { Joining { /// Stream/sink pairs that are waiting for the topic to become live. #[debug(skip)] - waiting: Vec<(UpdateStream, ResponseSink)>, + waiting: Vec<(CommandStream, EventSink)>, /// Set of bootstrap nodes we are using. bootstrap: BTreeSet, /// The task that is driving the join future. @@ -115,7 +127,7 @@ enum TopicState { Live { /// Task/sink pairs that are currently live. /// The task is the task that is sending broadcast messages to the topic. - live: Vec<(AbortingJoinHandle<()>, ResponseSink)>, + live: Vec<(AbortingJoinHandle<()>, EventSink)>, }, /// The topic is currently quitting. /// We can't make new subscriptions without waiting for the quit to finish. @@ -123,7 +135,7 @@ enum TopicState { /// Stream/sink pairs that are waiting for the topic to quit so /// it can be joined again. #[debug(skip)] - waiting: Vec<(UpdateStream, ResponseSink)>, + waiting: Vec<(CommandStream, EventSink)>, /// Set of bootstrap nodes we are using. /// /// This is used to re-join the topic after quitting. @@ -136,7 +148,7 @@ enum TopicState { impl TopicState { /// Extract all senders from the state. - fn into_senders(self) -> Vec { + fn into_senders(self) -> Vec { match self { TopicState::Joining { waiting, .. } | TopicState::Quitting { waiting, .. } => { waiting.into_iter().map(|(_, send)| send).collect() @@ -202,7 +214,7 @@ impl GossipDispatcher { /// Try to send an event to a sink. /// /// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full. - fn try_send(entry: &(AbortingJoinHandle<()>, ResponseSink), event: &Event) -> bool { + fn try_send(entry: &(AbortingJoinHandle<()>, EventSink), event: &IrohGossipEvent) -> bool { let (task, send) = entry; // This means that we stop sending to the stream when the update side is finished. if task.is_finished() { @@ -215,13 +227,13 @@ impl GossipDispatcher { // Check if the send buffer is almost full, and send a lagged response if it is. if let Some(cap) = send.capacity() { if send.len() >= cap - 1 { - send.try_send(Ok(GossipSubscribeResponse::Lagged)).ok(); + send.try_send(Ok(Event::Lagged)).ok(); return false; } } // Send the event to the stream. // We are the owner of the stream, so we can be sure that there is still room. - send.try_send(Ok(GossipSubscribeResponse::Event(event.clone().into()))) + send.try_send(Ok(Event::Event(event.clone().into()))) .is_ok() } @@ -249,10 +261,7 @@ impl GossipDispatcher { ); } } else { - tracing::trace!( - "Received event for unknown topic, possibly sync {}", - hex::encode(topic) - ); + tracing::trace!("Received event for unknown topic, possibly sync {topic}",); } } Ok(()) @@ -277,15 +286,15 @@ impl GossipDispatcher { async fn update_loop( gossip: Gossip, topic: TopicId, - mut updates: UpdateStream, + mut updates: CommandStream, ) -> anyhow::Result<()> { use futures_lite::stream::StreamExt; while let Some(update) = Pin::new(&mut updates).next().await { match update { - GossipSubscribeUpdate::Broadcast(msg) => { + Command::Broadcast(msg) => { gossip.broadcast(topic, msg).await?; } - GossipSubscribeUpdate::BroadcastNeighbors(msg) => { + Command::BroadcastNeighbors(msg) => { gossip.broadcast_neighbors(topic, msg).await?; } } @@ -294,7 +303,7 @@ impl GossipDispatcher { } /// Handle updates from the client, and handle update loop failure. - async fn update_task(self, topic: TopicId, updates: UpdateStream) { + async fn update_task(self, topic: TopicId, updates: CommandStream) { let Err(e) = Self::update_loop(self.gossip.clone(), topic, updates).await else { return; }; @@ -355,23 +364,24 @@ impl GossipDispatcher { } /// Subscribe to a gossip topic. - pub fn subscribe( + pub fn subscribe_with_opts( &self, - msg: GossipSubscribeRequest, - updates: UpdateStream, - ) -> impl Stream> { - let topic = msg.topic; + topic: TopicId, + options: Options, + updates: CommandStream, + ) -> impl Stream> { let mut inner = self.inner.lock().unwrap(); - let (send, recv) = flume::bounded(SUBSCRIPTION_CAPACITY); + let (send, recv) = flume::bounded(options.subscription_capacity); match inner.current_subscriptions.entry(topic) { Entry::Vacant(entry) => { // There is no existing subscription, so we need to start a new one. let waiting = vec![(updates, send)]; let this = self.clone(); - let join_task = spawn_owned(this.clone().join_task(topic, msg.bootstrap.clone())); + let join_task = + spawn_owned(this.clone().join_task(topic, options.bootstrap.clone())); entry.insert(TopicState::Joining { waiting, - bootstrap: msg.bootstrap, + bootstrap: options.bootstrap, join_task, }); } @@ -387,7 +397,7 @@ impl GossipDispatcher { // We are joining, so we need to wait with creating the update task. // // TODO: should we merge the bootstrap nodes and try to join with all of them? - peers.extend(msg.bootstrap); + peers.extend(options.bootstrap); waiting.push((updates, send)); } TopicState::Quitting { @@ -396,7 +406,7 @@ impl GossipDispatcher { .. } => { // We are quitting, so we need to wait with creating the update task. - peers.extend(msg.bootstrap); + peers.extend(options.bootstrap); waiting.push((updates, send)); } TopicState::Live { live } => { diff --git a/iroh-gossip/src/lib.rs b/iroh-gossip/src/lib.rs index 9c6dd3f27e..70a015a787 100644 --- a/iroh-gossip/src/lib.rs +++ b/iroh-gossip/src/lib.rs @@ -2,6 +2,8 @@ #![deny(missing_docs, rustdoc::broken_intra_doc_links)] +#[cfg(feature = "dispatcher")] +pub mod dispatcher; pub mod metrics; #[cfg(feature = "net")] pub mod net; diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs index 2436a3de64..c9c87feea8 100644 --- a/iroh/src/client/gossip.rs +++ b/iroh/src/client/gossip.rs @@ -32,7 +32,11 @@ where )> { let (sink, stream) = self .rpc - .bidi(GossipSubscribeRequest { topic, bootstrap }) + .bidi(GossipSubscribeRequest { + topic, + bootstrap, + subscription_capacity: 1024, + }) .await?; let stream = stream.map(|item| anyhow::Ok(item??)); let sink = sink.sink_map_err(|_| anyhow::anyhow!("send error")); diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 484a3e0da6..69ce9b98bb 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -14,7 +14,6 @@ pub use iroh_base::base32; pub mod client; pub mod dial; -pub mod gossip_dispatcher; pub mod node; pub mod rpc_protocol; pub mod sync_engine; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index d76d2a26e1..9ba8de7a07 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -16,6 +16,7 @@ use iroh_bytes::downloader::Downloader; use iroh_bytes::store::Store as BaoStore; use iroh_bytes::BlobFormat; use iroh_bytes::Hash; +use iroh_gossip::dispatcher::GossipDispatcher; use iroh_net::relay::RelayUrl; use iroh_net::util::AbortingJoinHandle; use iroh_net::{ @@ -31,7 +32,6 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::debug; -use crate::gossip_dispatcher::GossipDispatcher; use crate::rpc_protocol::{ProviderRequest, ProviderResponse}; use crate::sync_engine::SyncEngine; use crate::ticket::BlobTicket; @@ -41,6 +41,7 @@ mod rpc; mod rpc_status; pub use builder::{Builder, GcPolicy, NodeDiscoveryConfig, StorageConfig}; +pub use iroh_gossip::dispatcher::GossipEvent; pub use rpc_status::RpcStatus; type EventCallback = Box BoxFuture<()> + 'static + Sync + Send>; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 0e10bc39fe..4ca82ed072 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -14,7 +14,10 @@ use iroh_bytes::{ protocol::Closed, store::{GcMarkEvent, GcSweepEvent, Map, Store as BaoStore}, }; -use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; +use iroh_gossip::{ + dispatcher::GossipDispatcher, + net::{Gossip, GOSSIP_ALPN}, +}; use iroh_net::{ discovery::{dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery, Discovery}, magic_endpoint::get_alpn, @@ -33,7 +36,6 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ client::quic::RPC_ALPN, - gossip_dispatcher::GossipDispatcher, node::{Event, NodeInner}, rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService}, sync_engine::SyncEngine, diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index bb7d51eb6f..f962704f63 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -279,7 +279,14 @@ impl Handler { } GossipSubscribe(msg) => { chan.bidi_streaming(msg, handler, |handler, req, updates| { - handler.inner.gossip.subscribe(req, Box::new(updates)) + handler.inner.gossip.subscribe_with_opts( + req.topic, + iroh_gossip::dispatcher::Options { + bootstrap: req.bootstrap, + subscription_capacity: req.subscription_capacity, + }, + Box::new(updates), + ) }) .await } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 23d6195453..266b0ad0df 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -7,7 +7,11 @@ //! response, while others like provide have a stream of responses. //! //! Note that this is subject to change. The RPC protocol is not yet stable. -use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf}; +use std::{ + collections::{BTreeMap, BTreeSet}, + net::SocketAddr, + path::PathBuf, +}; use bytes::Bytes; use derive_more::{From, TryInto}; @@ -40,13 +44,13 @@ pub use iroh_base::rpc::{RpcError, RpcResult}; use iroh_bytes::store::{ExportFormat, ExportMode}; pub use iroh_bytes::{provider::AddProgress, store::ValidateProgress}; -pub use crate::gossip_dispatcher::{ - GossipEvent, GossipMessage, GossipSubscribeRequest, GossipSubscribeResponse, - GossipSubscribeUpdate, -}; use crate::sync_engine::LiveEvent; pub use crate::ticket::DocTicket; pub use iroh_bytes::util::SetTagOption; +pub use iroh_gossip::dispatcher::{ + Command as GossipSubscribeUpdate, Event as GossipSubscribeResponse, Message as GossipMessage, +}; +use iroh_gossip::proto::TopicId; /// A 32-byte key or token pub type KeyBytes = [u8; 32]; @@ -1122,6 +1126,19 @@ pub struct NodeStatsResponse { pub stats: BTreeMap, } +/// A request to the node to subscribe to gossip events. +/// +/// This is basically a topic and additional optins +#[derive(Serialize, Deserialize, Debug)] +pub struct GossipSubscribeRequest { + /// The topic to subscribe to + pub topic: TopicId, + /// The nodes to bootstrap the subscription from + pub bootstrap: BTreeSet, + /// The capacity of the subscription + pub subscription_capacity: usize, +} + impl Msg for GossipSubscribeRequest { type Pattern = BidiStreaming; } @@ -1131,20 +1148,6 @@ impl BidiStreamingMsg for GossipSubscribeRequest { type Response = RpcResult; } -impl From> for GossipEvent { - fn from(event: iroh_gossip::proto::Event) -> Self { - match event { - iroh_gossip::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id), - iroh_gossip::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id), - iroh_gossip::proto::Event::Received(message) => Self::Received(GossipMessage { - content: message.content, - scope: message.scope, - delivered_from: message.delivered_from, - }), - } - } -} - /// The RPC service for the iroh provider process. #[derive(Debug, Clone)] pub struct ProviderService; From b19ed510cfedc0fceb7a05d54541ae598666c8f0 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 5 Jun 2024 18:02:15 +0300 Subject: [PATCH 09/22] some renaming --- iroh-cli/src/commands/gossip.rs | 2 +- iroh-gossip/src/dispatcher.rs | 10 +++++----- iroh/src/node/builder.rs | 2 +- iroh/src/node/rpc.rs | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs index cc9317e8fa..81644585bf 100644 --- a/iroh-cli/src/commands/gossip.rs +++ b/iroh-cli/src/commands/gossip.rs @@ -52,7 +52,7 @@ impl GossipCommands { res = stream.next() => { let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?; match res { - GossipSubscribeResponse::Event(event) => { + GossipSubscribeResponse::Gossip(event) => { if verbose { println!("{:?}", event); } else if let GossipEvent::Received(GossipMessage { content, .. }) = event { diff --git a/iroh-gossip/src/dispatcher.rs b/iroh-gossip/src/dispatcher.rs index 8d37fb2d06..061b1ef78b 100644 --- a/iroh-gossip/src/dispatcher.rs +++ b/iroh-gossip/src/dispatcher.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; /// Join a gossip topic #[derive(Serialize, Deserialize, Debug)] -pub struct Options { +pub struct SubscribeOptions { /// The initial bootstrap nodes pub bootstrap: BTreeSet, /// The maximum number of messages that can be buffered in a subscription. @@ -43,7 +43,7 @@ pub enum Command { #[derive(Serialize, Deserialize, Debug)] pub enum Event { /// A message was received - Event(GossipEvent), + Gossip(GossipEvent), /// We missed some messages Lagged, } @@ -160,7 +160,7 @@ impl TopicState { impl GossipDispatcher { /// Create a new gossip dispatcher with the given gossip instance. - pub fn spawn(gossip: Gossip) -> Self { + pub fn new(gossip: Gossip) -> Self { let inner = Arc::new(Mutex::new(State { current_subscriptions: BTreeMap::new(), task: None, @@ -233,7 +233,7 @@ impl GossipDispatcher { } // Send the event to the stream. // We are the owner of the stream, so we can be sure that there is still room. - send.try_send(Ok(Event::Event(event.clone().into()))) + send.try_send(Ok(Event::Gossip(event.clone().into()))) .is_ok() } @@ -367,7 +367,7 @@ impl GossipDispatcher { pub fn subscribe_with_opts( &self, topic: TopicId, - options: Options, + options: SubscribeOptions, updates: CommandStream, ) -> impl Stream> { let mut inner = self.inner.lock().unwrap(); diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 4ca82ed072..6cc91b5cd8 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -401,7 +401,7 @@ where self.blobs_store.clone(), downloader.clone(), ); - let gossip_dispatcher = GossipDispatcher::spawn(gossip.clone()); + let gossip_dispatcher = GossipDispatcher::new(gossip.clone()); let sync_db = sync.sync.clone(); let callbacks = Callbacks::default(); diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index f962704f63..b36ab0218f 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -281,7 +281,7 @@ impl Handler { chan.bidi_streaming(msg, handler, |handler, req, updates| { handler.inner.gossip.subscribe_with_opts( req.topic, - iroh_gossip::dispatcher::Options { + iroh_gossip::dispatcher::SubscribeOptions { bootstrap: req.bootstrap, subscription_capacity: req.subscription_capacity, }, From 5d6b7d0aa4b756dd922a3d2834391609f261f332 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 5 Jun 2024 19:06:55 +0300 Subject: [PATCH 10/22] fix weird unused warning --- iroh-net/src/portmapper/current_mapping.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh-net/src/portmapper/current_mapping.rs b/iroh-net/src/portmapper/current_mapping.rs index 0c2273ece3..4c1c66591f 100644 --- a/iroh-net/src/portmapper/current_mapping.rs +++ b/iroh-net/src/portmapper/current_mapping.rs @@ -184,9 +184,6 @@ mod tests { // for testing a mapping is simply an ip, port pair type M = (Ipv4Addr, NonZeroU16); - const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero - unsafe { NonZeroU16::new_unchecked(9586) }; - const TEST_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::LOCALHOST; const HALF_LIFETIME_SECS: u64 = 1; impl Mapping for M { @@ -201,6 +198,9 @@ mod tests { #[tokio::test] #[ntest::timeout(2500)] async fn report_renew_expire_report() { + const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero + unsafe { NonZeroU16::new_unchecked(9586) }; + const TEST_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::LOCALHOST; let (mut c, mut watcher) = CurrentMapping::::new(); let now = std::time::Instant::now(); c.update(Some((TEST_IP, TEST_PORT))); From 865bc0d1d02b1c78e90721bccd1e8f01b4155466 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 7 Jun 2024 13:35:49 +0300 Subject: [PATCH 11/22] PR review - remove some error level loggings - more docs for subscribe - add subscribe opts - allow passing in raw topic --- iroh-cli/src/commands/gossip.rs | 35 +++++++++++++++++++++++++++++---- iroh-gossip/src/dispatcher.rs | 14 +++++-------- iroh-net/src/util.rs | 1 + iroh/src/client/gossip.rs | 34 ++++++++++++++++++++++++++++---- 4 files changed, 67 insertions(+), 17 deletions(-) diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs index 26c7f82424..858ef5c9c5 100644 --- a/iroh-cli/src/commands/gossip.rs +++ b/iroh-cli/src/commands/gossip.rs @@ -1,8 +1,9 @@ use anyhow::{Context, Result}; use bao_tree::blake3; -use clap::Subcommand; +use clap::{ArgGroup, Subcommand}; use futures_lite::StreamExt; use futures_util::SinkExt; +use iroh::client::gossip::SubscribeOpts; use iroh::client::{Iroh, RpcService}; use iroh::net::NodeId; use quic_rpc::ServiceConnection; @@ -12,9 +13,20 @@ use tokio::io::AsyncBufReadExt; #[allow(clippy::large_enum_variant)] pub enum GossipCommands { /// Subscribe to a topic + #[command(group( + ArgGroup::new("input") + .required(true) + .args(&["topic", "raw_topic"]) + ))] Subscribe { + /// Topic string to subscribe to. + /// + /// This will be hashed with BLAKE3 to get the actual topic ID. #[clap(long)] - topic: String, + topic: Option, + /// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters. + #[clap(long)] + raw_topic: Option, bootstrap: Vec, #[clap(long, short)] verbose: bool, @@ -29,13 +41,28 @@ impl GossipCommands { match self { Self::Subscribe { topic, + raw_topic, bootstrap, verbose, } => { let bootstrap = bootstrap.into_iter().collect(); - let topic = blake3::hash(topic.as_ref()).into(); + let topic = match (topic, raw_topic) { + (Some(topic), None) => blake3::hash(topic.as_bytes()).into(), + (None, Some(raw_topic)) => { + let mut slice = [0; 32]; + hex::decode_to_slice(raw_topic, &mut slice) + .context("failed to decode raw topic")?; + slice.into() + } + _ => anyhow::bail!("either topic or raw_topic must be provided"), + }; + // blake3::hash(topic.as_ref()).into(); + let opts = SubscribeOpts { + bootstrap, + subscription_capacity: 1024, + }; - let (mut sink, mut stream) = iroh.gossip.subscribe(topic, bootstrap).await?; + let (mut sink, mut stream) = iroh.gossip.subscribe_with_opts(topic, opts).await?; let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines(); loop { tokio::select! { diff --git a/iroh-gossip/src/dispatcher.rs b/iroh-gossip/src/dispatcher.rs index 061b1ef78b..3da9452290 100644 --- a/iroh-gossip/src/dispatcher.rs +++ b/iroh-gossip/src/dispatcher.rs @@ -119,8 +119,7 @@ enum TopicState { /// Set of bootstrap nodes we are using. bootstrap: BTreeSet, /// The task that is driving the join future. - #[allow(dead_code)] - join_task: AbortingJoinHandle<()>, + _join_task: AbortingJoinHandle<()>, }, /// The topic is currently live. /// New subscriptions can be immediately added. @@ -190,13 +189,13 @@ impl GossipDispatcher { return; } let bootstrap = peers.clone(); - let join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); + let _join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); inner.current_subscriptions.insert( topic, TopicState::Joining { waiting, bootstrap: peers, - join_task, + _join_task, }, ); } @@ -322,11 +321,8 @@ impl GossipDispatcher { /// /// Basically just flattens the two stages of joining into one. async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec) -> anyhow::Result<()> { - tracing::error!("Joining gossip topic {:?}", topic); let join = gossip.join(topic, bootstrap).await?; - tracing::error!("Waiting for joint to gossip topic {:?} to succeed", topic); join.await?; - tracing::error!("Joined gossip topic {:?}", topic); Ok(()) } @@ -377,12 +373,12 @@ impl GossipDispatcher { // There is no existing subscription, so we need to start a new one. let waiting = vec![(updates, send)]; let this = self.clone(); - let join_task = + let _join_task = spawn_owned(this.clone().join_task(topic, options.bootstrap.clone())); entry.insert(TopicState::Joining { waiting, bootstrap: options.bootstrap, - join_task, + _join_task, }); } Entry::Occupied(mut entry) => { diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index e94655b51f..a6aa824af0 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -14,6 +14,7 @@ pub mod chain; /// A join handle that owns the task it is running, and aborts it when dropped. #[derive(Debug, derive_more::Deref)] +#[must_use = "aborting join handle will abort the task when dropped"] pub struct AbortingJoinHandle { handle: tokio::task::JoinHandle, } diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs index 2f0356c5c7..fd1c7614b1 100644 --- a/iroh/src/client/gossip.rs +++ b/iroh/src/client/gossip.rs @@ -18,15 +18,41 @@ pub struct Client { pub(super) rpc: RpcClient, } +/// Options for subscribing to a gossip topic. +#[derive(Debug, Clone)] +pub struct SubscribeOpts { + /// Bootstrap nodes to connect to. + pub bootstrap: BTreeSet, + /// Subscription capacity. + pub subscription_capacity: usize, +} + impl Client where C: ServiceConnection, { /// Subscribe to a gossip topic. - pub async fn subscribe( + /// + /// Returns a sink to send updates to the topic and a stream of responses. + /// + /// Updates are either [Broadcast](iroh_gossip::dispatcher::Command::Broadcast) + /// or [BroadcastNeighbors](iroh_gossip::dispatcher::Command::BroadcastNeighbors). + /// + /// Broadcasts are gossiped to the entire swarm, while BroadcastNeighbors are sent to + /// just the immediate neighbors of the node. + /// + /// Responses are either [Gossip](iroh_gossip::dispatcher::Event::Gossip) or + /// [Lagged](iroh_gossip::dispatcher::Event::Lagged). + /// + /// Gossip events contain the actual message content, as well as information about the + /// immediate neighbors of the node. + /// + /// A Lagged event indicates that the gossip stream has not been consumed quickly enough. + /// You can adjust the buffer size with the [] option. + pub async fn subscribe_with_opts( &self, topic: TopicId, - bootstrap: BTreeSet, + opts: SubscribeOpts, ) -> Result<( impl Sink, impl Stream>, @@ -35,8 +61,8 @@ where .rpc .bidi(GossipSubscribeRequest { topic, - bootstrap, - subscription_capacity: 1024, + bootstrap: opts.bootstrap, + subscription_capacity: opts.subscription_capacity, }) .await?; let stream = stream.map(|item| anyhow::Ok(item??)); From cb216f65b8106b683bb597912b60b7a6291ce677 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 28 Jun 2024 17:15:07 +0300 Subject: [PATCH 12/22] very simple gossip smoke test - connect two nodes - send a message from node 1 - recv it on node 2 --- iroh/tests/client.rs | 92 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 iroh/tests/client.rs diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs new file mode 100644 index 0000000000..540df7261d --- /dev/null +++ b/iroh/tests/client.rs @@ -0,0 +1,92 @@ +use bytes::Bytes; +use futures_lite::{Stream, StreamExt}; +use futures_util::SinkExt; +use iroh::client::{gossip::SubscribeOpts, Iroh}; +use iroh_gossip::{ + dispatcher::{Command, Event, GossipEvent}, + proto::TopicId, +}; +use iroh_net::{key::SecretKey, NodeAddr}; +use tokio::task::JoinHandle; + +/// Spawn an iroh node in a separate thread and tokio runtime, and return +/// the address and client. +fn spawn_node() -> (NodeAddr, Iroh) { + let (sender, receiver) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + runtime.block_on(async move { + let secret_key = SecretKey::generate(); + let node = iroh::node::Builder::default() + .secret_key(secret_key) + .spawn() + .await?; + let addr = node.my_addr().await?; + sender.send((addr, node.client().clone()))?; + node.cancel_token().cancelled().await; + anyhow::Ok(()) + })?; + anyhow::Ok(()) + }); + receiver.recv().unwrap() +} + +/// Await `n` messages from a stream of gossip events. +fn await_messages( + mut stream: impl Stream> + Unpin + Send + Sync + 'static, + n: usize, +) -> JoinHandle> { + tokio::spawn(async move { + let mut res = Vec::new(); + while let Some(msg) = stream.next().await { + match msg.unwrap() { + Event::Gossip(GossipEvent::Received(msg)) => { + res.push(msg.content); + if res.len() >= n { + break; + } + } + _ => {} + } + } + res + }) +} + +#[tokio::test] +async fn gossip_smoke() { + let _ = tracing_subscriber::fmt::try_init(); + let (addr1, node1) = spawn_node(); + let (_addr2, node2) = spawn_node(); + let gossip1 = node1.gossip(); + let gossip2 = node2.gossip(); + let topic = TopicId::from([0u8; 32]); + let (mut sink1, _stream2) = gossip1 + .subscribe_with_opts( + topic, + SubscribeOpts { + bootstrap: Default::default(), + subscription_capacity: 10, + }, + ) + .await + .unwrap(); + let (_sink2, stream2) = gossip2 + .subscribe_with_opts( + topic, + SubscribeOpts { + bootstrap: [addr1.node_id].into_iter().collect(), + subscription_capacity: 10, + }, + ) + .await + .unwrap(); + sink1 + .send(Command::Broadcast("hello".into())) + .await + .unwrap(); + let msgs = await_messages(stream2, 1).await.unwrap(); + assert_eq!(msgs, vec![Bytes::from("hello")]); +} From 3656b5a9ae51bc5e3e3645df1f41ac1dbe96c9a4 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 28 Jun 2024 17:52:40 +0300 Subject: [PATCH 13/22] add bootstrap node in both directions --- iroh/src/client/gossip.rs | 9 +++++++++ iroh/tests/client.rs | 8 ++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs index 3c109ace06..723e3a7bba 100644 --- a/iroh/src/client/gossip.rs +++ b/iroh/src/client/gossip.rs @@ -28,6 +28,15 @@ pub struct SubscribeOpts { pub subscription_capacity: usize, } +impl Default for SubscribeOpts { + fn default() -> Self { + Self { + bootstrap: BTreeSet::new(), + subscription_capacity: 256, + } + } +} + impl Client { /// Subscribe to a gossip topic. /// diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs index 540df7261d..93536d59ea 100644 --- a/iroh/tests/client.rs +++ b/iroh/tests/client.rs @@ -59,7 +59,7 @@ fn await_messages( async fn gossip_smoke() { let _ = tracing_subscriber::fmt::try_init(); let (addr1, node1) = spawn_node(); - let (_addr2, node2) = spawn_node(); + let (addr2, node2) = spawn_node(); let gossip1 = node1.gossip(); let gossip2 = node2.gossip(); let topic = TopicId::from([0u8; 32]); @@ -67,8 +67,8 @@ async fn gossip_smoke() { .subscribe_with_opts( topic, SubscribeOpts { - bootstrap: Default::default(), - subscription_capacity: 10, + bootstrap: [addr2.node_id].into_iter().collect(), + ..Default::default() }, ) .await @@ -78,7 +78,7 @@ async fn gossip_smoke() { topic, SubscribeOpts { bootstrap: [addr1.node_id].into_iter().collect(), - subscription_capacity: 10, + ..Default::default() }, ) .await From 3771dc21ab9eb4c45d7d35027711d5600df5330b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 11:27:14 +0300 Subject: [PATCH 14/22] Add new request to add a NodeAddr via rpc otherwise it is impossible to write certain tests using just the client. --- iroh/src/client/node.rs | 12 +++++++++--- iroh/src/node/rpc.rs | 15 +++++++++++---- iroh/src/rpc_protocol.rs | 10 ++++++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 7091c78a49..5898182072 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -9,9 +9,9 @@ use iroh_net::{endpoint::ConnectionInfo, relay::RelayUrl, NodeAddr, NodeId}; use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ - CounterStats, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, - NodeConnectionsRequest, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, - NodeStatusRequest, + CounterStats, NodeAddAddrRequest, NodeAddrRequest, NodeConnectionInfoRequest, + NodeConnectionInfoResponse, NodeConnectionsRequest, NodeIdRequest, NodeRelayRequest, + NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, }; use super::{flatten, Iroh}; @@ -56,6 +56,12 @@ impl Iroh { Ok(addr) } + /// Add a known node address to the node. + pub async fn add_node_addr(&self, addr: NodeAddr) -> Result<()> { + self.rpc.rpc(NodeAddAddrRequest { addr }).await??; + Ok(()) + } + /// Get the relay server we are connected to. pub async fn my_relay(&self) -> Result> { let relay = self.rpc.rpc(NodeRelayRequest).await??; diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index f24854b139..c71f961b39 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -46,10 +46,10 @@ use crate::rpc_protocol::{ BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, - ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, - NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, - NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, - NodeWatchResponse, Request, RpcService, SetTagOption, + ListTagsRequest, NodeAddAddrRequest, NodeAddrRequest, NodeConnectionInfoRequest, + NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, + NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, + NodeWatchRequest, NodeWatchResponse, Request, RpcService, SetTagOption, }; mod docs; @@ -141,6 +141,7 @@ impl Handler { .await } NodeConnectionInfo(msg) => chan.rpc(msg, self, Self::node_connection_info).await, + NodeAddAddr(msg) => chan.rpc(msg, self, Self::node_add_addr).await, BlobList(msg) => chan.server_streaming(msg, self, Self::blob_list).await, BlobListIncomplete(msg) => { chan.server_streaming(msg, self, Self::blob_list_incomplete) @@ -996,6 +997,12 @@ impl Handler { Ok(NodeConnectionInfoResponse { conn_info }) } + async fn node_add_addr(self, req: NodeAddAddrRequest) -> RpcResult<()> { + let NodeAddAddrRequest { addr } = req; + self.inner.endpoint.add_node_addr(addr)?; + Ok(()) + } + async fn create_collection( self, req: CreateCollectionRequest, diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 4e7d5a6068..b0e52f3aca 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -365,6 +365,15 @@ impl RpcMsg for NodeAddrRequest { type Response = RpcResult; } +#[derive(Serialize, Deserialize, Debug)] +pub struct NodeAddAddrRequest { + pub addr: NodeAddr, +} + +impl RpcMsg for NodeAddAddrRequest { + type Response = RpcResult<()>; +} + #[derive(Serialize, Deserialize, Debug)] pub struct NodeRelayRequest; @@ -1046,6 +1055,7 @@ pub enum Request { NodeStatus(NodeStatusRequest), NodeId(NodeIdRequest), NodeAddr(NodeAddrRequest), + NodeAddAddr(NodeAddAddrRequest), NodeRelay(NodeRelayRequest), NodeStats(NodeStatsRequest), NodeShutdown(NodeShutdownRequest), From 609626fa57f9ac4ce31ecca49bc38716d186835d Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 11:49:58 +0300 Subject: [PATCH 15/22] Move the various node ops into a separate subservice impl deref for backwards compat (maybe we can remove this at some point) --- iroh/src/client.rs | 17 +++++++++++++++-- iroh/src/client/node.rs | 12 ++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/iroh/src/client.rs b/iroh/src/client.rs index ce9647f5fd..1b93e1db2c 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -2,6 +2,7 @@ use futures_lite::{Stream, StreamExt}; use ref_cast::RefCast; +use std::ops::Deref; #[doc(inline)] pub use crate::rpc_protocol::RpcService; @@ -24,10 +25,9 @@ pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; pub mod authors; pub mod blobs; pub mod docs; +pub mod node; pub mod tags; -mod node; - /// Iroh rpc client - boxed so that we can have a concrete type. pub(crate) type RpcClient = quic_rpc::RpcClient>; @@ -38,6 +38,14 @@ pub struct Iroh { rpc: RpcClient, } +impl Deref for Iroh { + type Target = node::Client; + + fn deref(&self) -> &Self::Target { + self.node() + } +} + impl Iroh { /// Create a new high-level client to a Iroh node from the low-level RPC client. pub fn new(rpc: RpcClient) -> Self { @@ -63,6 +71,11 @@ impl Iroh { pub fn tags(&self) -> &tags::Client { tags::Client::ref_cast(&self.rpc) } + + /// Node client + pub fn node(&self) -> &node::Client { + node::Client::ref_cast(&self.rpc) + } } fn flatten( diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 5898182072..2af115fc0f 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -6,6 +6,7 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_base::key::PublicKey; use iroh_net::{endpoint::ConnectionInfo, relay::RelayUrl, NodeAddr, NodeId}; +use ref_cast::RefCast; use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ @@ -14,9 +15,16 @@ use crate::rpc_protocol::{ NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, }; -use super::{flatten, Iroh}; +use super::{flatten, RpcClient}; -impl Iroh { +/// Iroh node client. +#[derive(Debug, Clone, RefCast)] +#[repr(transparent)] +pub struct Client { + pub(super) rpc: RpcClient, +} + +impl Client { /// Get statistics of the running node. pub async fn stats(&self) -> Result> { let res = self.rpc.rpc(NodeStatsRequest {}).await??; From eaf4e866ea7bac34c15a024bfd4e9877cc234b8a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 12:35:16 +0300 Subject: [PATCH 16/22] Make clippy happy --- iroh/src/node/rpc.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index c71f961b39..bb87bdcb4a 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -997,6 +997,8 @@ impl Handler { Ok(NodeConnectionInfoResponse { conn_info }) } + // This method is called as an RPC method, which have to be async + #[allow(clippy::unused_async)] async fn node_add_addr(self, req: NodeAddAddrRequest) -> RpcResult<()> { let NodeAddAddrRequest { addr } = req; self.inner.endpoint.add_node_addr(addr)?; From 51dfde83bde87321e676a56a454aec35cb0a8340 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 12:42:57 +0300 Subject: [PATCH 17/22] Make sure the nodes know each others addr without discovery this test is not meant to be a discovery test... --- iroh/tests/client.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs index 93536d59ea..b062113fe0 100644 --- a/iroh/tests/client.rs +++ b/iroh/tests/client.rs @@ -62,6 +62,8 @@ async fn gossip_smoke() { let (addr2, node2) = spawn_node(); let gossip1 = node1.gossip(); let gossip2 = node2.gossip(); + node1.add_node_addr(addr2.clone()).await.unwrap(); + node2.add_node_addr(addr1.clone()).await.unwrap(); let topic = TopicId::from([0u8; 32]); let (mut sink1, _stream2) = gossip1 .subscribe_with_opts( From 0b227c0f4c5417546f8243af44e4936600c0c3de Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 13:09:25 +0300 Subject: [PATCH 18/22] shut up clippy --- iroh/src/rpc_protocol.rs | 2 +- iroh/tests/client.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 397e28eea5..fa952427a0 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -1052,7 +1052,7 @@ pub struct NodeStatsResponse { /// A request to the node to subscribe to gossip events. /// -/// This is basically a topic and additional optins +/// This is basically a topic and additional options #[derive(Serialize, Deserialize, Debug)] pub struct GossipSubscribeRequest { /// The topic to subscribe to diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs index b062113fe0..edbdff44e4 100644 --- a/iroh/tests/client.rs +++ b/iroh/tests/client.rs @@ -40,6 +40,7 @@ fn await_messages( ) -> JoinHandle> { tokio::spawn(async move { let mut res = Vec::new(); + #[allow(clippy::single_match)] while let Some(msg) = stream.next().await { match msg.unwrap() { Event::Gossip(GossipEvent::Received(msg)) => { From 23634484028deaee412799a8de1a421b8a7b11cc Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 13:56:59 +0300 Subject: [PATCH 19/22] Fix test compile error --- iroh/tests/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs index edbdff44e4..e1adfe7178 100644 --- a/iroh/tests/client.rs +++ b/iroh/tests/client.rs @@ -23,7 +23,7 @@ fn spawn_node() -> (NodeAddr, Iroh) { .secret_key(secret_key) .spawn() .await?; - let addr = node.my_addr().await?; + let addr = node.node_addr().await?; sender.send((addr, node.client().clone()))?; node.cancel_token().cancelled().await; anyhow::Ok(()) From 4fc00c24637d0caa5bad1f32d821e81850cba6d8 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 1 Jul 2024 14:24:05 +0300 Subject: [PATCH 20/22] Add short subscribe fn --- iroh/src/client/gossip.rs | 20 +++++++++++++++ iroh/tests/client.rs | 52 +++++++++++++++++---------------------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs index 723e3a7bba..4c2c12b084 100644 --- a/iroh/src/client/gossip.rs +++ b/iroh/src/client/gossip.rs @@ -76,4 +76,24 @@ impl Client { let sink = sink.sink_map_err(|_| anyhow::anyhow!("send error")); Ok((sink, stream)) } + + /// Subscribe to a gossip topic with default options. + pub async fn subscribe( + &self, + topic: impl Into, + bootstrap: impl IntoIterator>, + ) -> Result<( + impl Sink, + impl Stream>, + )> { + let bootstrap = bootstrap.into_iter().map(Into::into).collect(); + self.subscribe_with_opts( + topic.into(), + SubscribeOpts { + bootstrap, + ..Default::default() + }, + ) + .await + } } diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs index e1adfe7178..071b0dc469 100644 --- a/iroh/tests/client.rs +++ b/iroh/tests/client.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; -use iroh::client::{gossip::SubscribeOpts, Iroh}; +use iroh::client::Iroh; use iroh_gossip::{ dispatcher::{Command, Event, GossipEvent}, proto::TopicId, @@ -57,39 +57,33 @@ fn await_messages( } #[tokio::test] -async fn gossip_smoke() { +async fn gossip_smoke() -> TestResult { let _ = tracing_subscriber::fmt::try_init(); let (addr1, node1) = spawn_node(); let (addr2, node2) = spawn_node(); let gossip1 = node1.gossip(); let gossip2 = node2.gossip(); - node1.add_node_addr(addr2.clone()).await.unwrap(); - node2.add_node_addr(addr1.clone()).await.unwrap(); + node1.add_node_addr(addr2.clone()).await?; + node2.add_node_addr(addr1.clone()).await?; let topic = TopicId::from([0u8; 32]); - let (mut sink1, _stream2) = gossip1 - .subscribe_with_opts( - topic, - SubscribeOpts { - bootstrap: [addr2.node_id].into_iter().collect(), - ..Default::default() - }, - ) - .await - .unwrap(); - let (_sink2, stream2) = gossip2 - .subscribe_with_opts( - topic, - SubscribeOpts { - bootstrap: [addr1.node_id].into_iter().collect(), - ..Default::default() - }, - ) - .await - .unwrap(); - sink1 - .send(Command::Broadcast("hello".into())) - .await - .unwrap(); - let msgs = await_messages(stream2, 1).await.unwrap(); + let (mut sink1, _stream2) = gossip1.subscribe(topic, [addr2.node_id]).await?; + let (_sink2, stream2) = gossip2.subscribe(topic, [addr1.node_id]).await?; + sink1.send(Command::Broadcast("hello".into())).await?; + let msgs = await_messages(stream2, 1).await?; assert_eq!(msgs, vec![Bytes::from("hello")]); + Ok(()) +} + +// An uninhabited error type that allows us to use `?` in tests instead of `unwrap`. +// +// Any use of `?` in a test will immediately panic with the error message. +#[derive(Debug)] +enum TestError {} + +type TestResult = Result; + +impl From for TestError { + fn from(error: E) -> Self { + panic!("Test failed: {:?}", error); + } } From 41d067b63473928103cb268e736856a3e99b53bd Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 4 Jul 2024 22:15:07 +0300 Subject: [PATCH 21/22] Use TestResult crate instead of implementing it --- Cargo.lock | 7 +++++++ iroh/Cargo.toml | 1 + iroh/tests/client.rs | 15 +-------------- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d93125de8..212d5a40c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2468,6 +2468,7 @@ dependencies = [ "strum 0.25.0", "tempfile", "testdir", + "testresult", "thiserror", "tokio", "tokio-stream", @@ -5672,6 +5673,12 @@ dependencies = [ "whoami", ] +[[package]] +name = "testresult" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d72e255c0541f86589b0287139b70bd941a197ea4cea8fd8f87afe9c965a99e4" + [[package]] name = "thiserror" version = "1.0.61" diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 316b53d62a..5699900302 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -75,6 +75,7 @@ rand_chacha = "0.3.1" regex = { version = "1.7.1", features = ["std"] } serde_json = "1.0.107" testdir = "0.9.1" +testresult = "0.4.0" tokio = { version = "1", features = ["macros", "io-util", "rt"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs index 071b0dc469..c8164dd783 100644 --- a/iroh/tests/client.rs +++ b/iroh/tests/client.rs @@ -7,6 +7,7 @@ use iroh_gossip::{ proto::TopicId, }; use iroh_net::{key::SecretKey, NodeAddr}; +use testresult::TestResult; use tokio::task::JoinHandle; /// Spawn an iroh node in a separate thread and tokio runtime, and return @@ -73,17 +74,3 @@ async fn gossip_smoke() -> TestResult { assert_eq!(msgs, vec![Bytes::from("hello")]); Ok(()) } - -// An uninhabited error type that allows us to use `?` in tests instead of `unwrap`. -// -// Any use of `?` in a test will immediately panic with the error message. -#[derive(Debug)] -enum TestError {} - -type TestResult = Result; - -impl From for TestError { - fn from(error: E) -> Self { - panic!("Test failed: {:?}", error); - } -} From c4172368ca96e3994fd48597aeefb68c6b768e0c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 4 Jul 2024 22:24:56 +0300 Subject: [PATCH 22/22] remove must_use from AbortingJoinHandle --- iroh-net/src/util.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index a6aa824af0..e94655b51f 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -14,7 +14,6 @@ pub mod chain; /// A join handle that owns the task it is running, and aborts it when dropped. #[derive(Debug, derive_more::Deref)] -#[must_use = "aborting join handle will abort the task when dropped"] pub struct AbortingJoinHandle { handle: tokio::task::JoinHandle, }