diff --git a/Cargo.lock b/Cargo.lock
index 41c7268343..212d5a40c2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2468,6 +2468,7 @@ dependencies = [
  "strum 0.25.0",
  "tempfile",
  "testdir",
+ "testresult",
  "thiserror",
  "tokio",
  "tokio-stream",
@@ -2589,10 +2590,12 @@ dependencies = [
  "flume",
  "futures-buffered",
  "futures-lite 2.3.0",
+ "futures-util",
  "hex",
  "human-time",
  "indicatif",
  "iroh",
+ "iroh-gossip",
  "iroh-metrics",
  "nix 0.27.1",
  "parking_lot",
@@ -2721,7 +2724,9 @@ dependencies = [
  "clap",
  "derive_more",
  "ed25519-dalek",
+ "flume",
  "futures-lite 2.3.0",
+ "futures-util",
  "genawaiter",
  "indexmap 2.2.6",
  "iroh-base",
@@ -5668,6 +5673,12 @@ dependencies = [
  "whoami",
 ]
 
+[[package]]
+name = "testresult"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d72e255c0541f86589b0287139b70bd941a197ea4cea8fd8f87afe9c965a99e4"
+
 [[package]]
 name = "thiserror"
 version = "1.0.61"
diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml
index 887ed55e19..1615b59bc2 100644
--- a/iroh-cli/Cargo.toml
+++ b/iroh-cli/Cargo.toml
@@ -36,10 +36,12 @@ dirs-next = "2.0.0"
 flume = "0.11.0"
 futures-buffered = "0.2.4"
 futures-lite = "2.3"
+futures-util = { version = "0.3.30", features = ["futures-sink"] }
 hex = "0.4.3"
 human-time = "0.1.6"
 indicatif = { version = "0.17", features = ["tokio"] }
 iroh = { version = "0.19.0", path = "../iroh", features = ["metrics"] }
+iroh-gossip = { version = "0.19.0", path = "../iroh-gossip" }
 iroh-metrics = { version = "0.19.0", path = "../iroh-metrics" }
 parking_lot = "0.12.1"
 pkarr = { version = "1.1.5", default-features = false }
diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs
index a9e8d44cdf..a07c38ead5 100644
--- a/iroh-cli/src/commands.rs
+++ b/iroh-cli/src/commands.rs
@@ -16,6 +16,7 @@ pub(crate) mod blob;
 pub(crate) mod console;
 pub(crate) mod doc;
 pub(crate) mod doctor;
+pub(crate) mod gossip;
 pub(crate) mod node;
 pub(crate) mod rpc;
 pub(crate) mod start;
diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs
new file mode 100644
index 0000000000..0ab236bdce
--- /dev/null
+++ b/iroh-cli/src/commands/gossip.rs
@@ -0,0 +1,94 @@
+use anyhow::{Context, Result};
+use bao_tree::blake3;
+use clap::{ArgGroup, Subcommand};
+use futures_lite::StreamExt;
+use futures_util::SinkExt;
+use iroh::client::gossip::SubscribeOpts;
+use iroh::client::Iroh;
+use iroh::net::NodeId;
+use tokio::io::AsyncBufReadExt;
+
+#[derive(Subcommand, Debug, Clone)]
+#[allow(clippy::large_enum_variant)]
+pub enum GossipCommands {
+    /// Subscribe to a topic
+    #[command(group(
+        ArgGroup::new("input")
+            .required(true)
+            .args(&["topic", "raw_topic"])
+    ))]
+    Subscribe {
+        /// Topic string to subscribe to.
+        ///
+        /// This will be hashed with BLAKE3 to get the actual topic ID.
+        #[clap(long)]
+        topic: Option<String>,
+        /// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
+        #[clap(long)]
+        raw_topic: Option<String>,
+        bootstrap: Vec<NodeId>,
+        #[clap(long, short)]
+        verbose: bool,
+    },
+}
+
+impl GossipCommands {
+    pub async fn run(self, iroh: &Iroh) -> Result<()> {
+        match self {
+            Self::Subscribe {
+                topic,
+                raw_topic,
+                bootstrap,
+                verbose,
+            } => {
+                let bootstrap = bootstrap.into_iter().collect();
+                let topic = match (topic, raw_topic) {
+                    (Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
+                    (None, Some(raw_topic)) => {
+                        let mut slice = [0; 32];
+                        hex::decode_to_slice(raw_topic, &mut slice)
+                            .context("failed to decode raw topic")?;
+                        slice.into()
+                    }
+                    _ => anyhow::bail!("either topic or raw_topic must be provided"),
+                };
+                let opts = SubscribeOpts {
+                    bootstrap,
+                    subscription_capacity: 1024,
+                };
+
+                let (mut sink, mut stream) = iroh.gossip().subscribe_with_opts(topic, opts).await?;
+                let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
+                loop {
+                    tokio::select! {
+                        line = input_lines.next_line() => {
+                            let line = line.context("failed to read from stdin")?;
+                            if let Some(line) = line {
+                                sink.send(iroh_gossip::dispatcher::Command::Broadcast(line.into())).await?;
+                            } else {
+                                break;
+                            }
+                        }
+                        res = stream.next() => {
+                            let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
+                            match res {
+                                iroh_gossip::dispatcher::Event::Gossip(event) => {
+                                    if verbose {
+                                        println!("{:?}", event);
+                                    } else if let iroh_gossip::dispatcher::GossipEvent::Received(iroh_gossip::dispatcher::Message { content, .. }) = event {
+                                        println!("{:?}", content);
+                                    }
+                                }
+                                iroh_gossip::dispatcher::Event::Lagged => {
+                                    anyhow::bail!("gossip stream lagged");
+                                }
+                            };
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs
index 414a894ddb..9911a6040f 100644
--- a/iroh-cli/src/commands/rpc.rs
+++ b/iroh-cli/src/commands/rpc.rs
@@ -5,8 +5,8 @@ use iroh::client::Iroh;
 use crate::config::ConsoleEnv;
 
 use super::{
-    author::AuthorCommands, blob::BlobCommands, doc::DocCommands, node::NodeCommands,
-    tag::TagCommands,
+    author::AuthorCommands, blob::BlobCommands, doc::DocCommands, gossip::GossipCommands,
+    node::NodeCommands, tag::TagCommands,
 };
 
 #[derive(Subcommand, Debug, Clone)]
@@ -41,6 +41,13 @@ pub enum RpcCommands {
         #[clap(subcommand)]
         command: NodeCommands,
     },
+    /// Manage gossip
+    ///
+    /// Gossip is a way to broadcast messages to a group of nodes.
+    Gossip {
+        #[clap(subcommand)]
+        command: GossipCommands,
+    },
     /// Manage tags
     ///
     /// Tags are local, human-readable names for things iroh should keep.
@@ -64,6 +71,7 @@ impl RpcCommands {
             Self::Doc { command } => command.run(iroh, env).await,
             Self::Author { command } => command.run(iroh, env).await,
             Self::Tag { command } => command.run(iroh).await,
+            Self::Gossip { command } => command.run(iroh).await,
         }
     }
 }
diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml
index 5c06e03046..9db0951666 100644
--- a/iroh-gossip/Cargo.toml
+++ b/iroh-gossip/Cargo.toml
@@ -37,6 +37,10 @@ tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "
 tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
 genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }
 
+# dispatcher dependencies (optional)
+futures-util = { version = "0.3.30", optional = true }
+flume = { version = "0.11", optional = true }
+
 [dev-dependencies]
 clap = { version = "4", features = ["derive"] }
 iroh-test = { path = "../iroh-test" }
@@ -45,8 +49,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 url = "2.4.0"
 
 [features]
-default = ["net"]
+default = ["net", "dispatcher"]
 net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
+dispatcher = ["dep:flume", "dep:futures-util"]
 
 [[example]]
 name = "chat"
diff --git a/iroh-gossip/src/dispatcher.rs b/iroh-gossip/src/dispatcher.rs
new file mode 100644
index 0000000000..3da9452290
--- /dev/null
+++ b/iroh-gossip/src/dispatcher.rs
@@ -0,0 +1,427 @@
+//! A higher-level wrapper for the gossip engine that manages multiple gossip subscriptions and updates.
+use std::{
+    collections::{btree_map::Entry, BTreeMap, BTreeSet},
+    pin::Pin,
+    sync::{Arc, Mutex},
+};
+
+use crate::{
+    net::{Event as IrohGossipEvent, Gossip},
+    proto::{DeliveryScope, TopicId},
+};
+use bytes::Bytes;
+use futures_util::Stream;
+use iroh_base::rpc::{RpcError, RpcResult};
+use iroh_net::{key::PublicKey, util::AbortingJoinHandle, NodeId};
+use serde::{Deserialize, Serialize};
+
+/// Options for joining a gossip topic
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SubscribeOptions {
+    /// The initial bootstrap nodes
+    pub bootstrap: BTreeSet<NodeId>,
+    /// The maximum number of messages that can be buffered in a subscription.
+    ///
+    /// If this limit is reached, the subscriber will receive a `Lagged` response,
+    /// the message will be dropped, and the subscriber will be closed.
+    ///
+    /// This is to prevent a single slow subscriber from blocking the dispatch loop.
+    /// If a subscriber is lagging, it should be closed and re-opened.
+    pub subscription_capacity: usize,
+}
+
+/// Send a gossip message
+#[derive(Serialize, Deserialize, Debug)]
+pub enum Command {
+    /// Broadcast a message to all nodes in the swarm
+    Broadcast(Bytes),
+    /// Broadcast a message to all direct neighbors
+    BroadcastNeighbors(Bytes),
+}
+
+/// Update from a subscribed gossip topic
+#[derive(Serialize, Deserialize, Debug)]
+pub enum Event {
+    /// A message was received
+    Gossip(GossipEvent),
+    /// We missed some messages
+    Lagged,
+}
+
+/// Gossip event
+///
+/// An event to be emitted to the application for a particular topic.
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
+pub enum GossipEvent {
+    /// We have a new, direct neighbor in the swarm membership layer for this topic
+    NeighborUp(NodeId),
+    /// We dropped a direct neighbor in the swarm membership layer for this topic
+    NeighborDown(NodeId),
+    /// A gossip message was received for this topic
+    Received(Message),
+}
+
+impl From<crate::proto::Event<PublicKey>> for GossipEvent {
+    fn from(event: crate::proto::Event<PublicKey>) -> Self {
+        match event {
+            crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id),
+            crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id),
+            crate::proto::Event::Received(message) => Self::Received(Message {
+                content: message.content,
+                scope: message.scope,
+                delivered_from: message.delivered_from,
+            }),
+        }
+    }
+}
+
+/// A gossip message
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
+pub struct Message {
+    /// The content of the message
+    pub content: Bytes,
+    /// The scope of the message.
+    /// This tells us if the message is from a direct neighbor or actual gossip.
+    pub scope: DeliveryScope,
+    /// The node that delivered the message. This is not the same as the original author.
+    pub delivered_from: NodeId,
+}
+
+/// A gossip engine that manages gossip subscriptions and updates.
+#[derive(Debug, Clone)]
+pub struct GossipDispatcher {
+    gossip: Gossip,
+    inner: Arc<Mutex<State>>,
+}
+
+/// The mutable state of the gossip engine.
+#[derive(Debug)]
+struct State {
+    current_subscriptions: BTreeMap<TopicId, TopicState>,
+    /// the single task that dispatches gossip events to all subscribed streams
+    ///
+    /// this isn't really part of the mutable state, but it needs to live somewhere
+    task: Option<AbortingJoinHandle<()>>,
+}
+
+/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds.
+type CommandStream = Box<dyn Stream<Item = Command> + Send + Sync + Unpin + 'static>;
+/// Type alias for a sink of gossip events.
+type EventSink = flume::Sender<RpcResult<Event>>;
+
+#[derive(derive_more::Debug)]
+enum TopicState {
+    /// The topic is currently joining.
+    /// Making new subscriptions is allowed, but they will have to wait for the join to finish.
+    Joining {
+        /// Stream/sink pairs that are waiting for the topic to become live.
+        #[debug(skip)]
+        waiting: Vec<(CommandStream, EventSink)>,
+        /// Set of bootstrap nodes we are using.
+        bootstrap: BTreeSet<NodeId>,
+        /// The task that is driving the join future.
+        _join_task: AbortingJoinHandle<()>,
+    },
+    /// The topic is currently live.
+    /// New subscriptions can be immediately added.
+    Live {
+        /// Task/sink pairs that are currently live.
+        /// The task is the task that is sending broadcast messages to the topic.
+        live: Vec<(AbortingJoinHandle<()>, EventSink)>,
+    },
+    /// The topic is currently quitting.
+    /// We can't make new subscriptions without waiting for the quit to finish.
+    Quitting {
+        /// Stream/sink pairs that are waiting for the topic to quit so
+        /// it can be joined again.
+        #[debug(skip)]
+        waiting: Vec<(CommandStream, EventSink)>,
+        /// Set of bootstrap nodes we are using.
+        ///
+        /// This is used to re-join the topic after quitting.
+        bootstrap: BTreeSet<NodeId>,
+        /// The task that is driving the quit future.
+        #[allow(dead_code)]
+        quit_task: AbortingJoinHandle<()>,
+    },
+}
+
+impl TopicState {
+    /// Extract all senders from the state.
+    fn into_senders(self) -> Vec<EventSink> {
+        match self {
+            TopicState::Joining { waiting, .. } | TopicState::Quitting { waiting, .. } => {
+                waiting.into_iter().map(|(_, send)| send).collect()
+            }
+            TopicState::Live { live } => live.into_iter().map(|(_, send)| send).collect(),
+        }
+    }
+}
+
+impl GossipDispatcher {
+    /// Create a new gossip dispatcher with the given gossip instance.
+    pub fn new(gossip: Gossip) -> Self {
+        let inner = Arc::new(Mutex::new(State {
+            current_subscriptions: BTreeMap::new(),
+            task: None,
+        }));
+        let res = Self { gossip, inner };
+        let dispatch_task = spawn_owned(res.clone().dispatch_task());
+        res.inner.lock().unwrap().task = Some(dispatch_task);
+        res
+    }
+
+    /// Quit a gossip topic and handle the result of the quitting.
+    ///
+    /// On quit success, this will try to join the topic again with the bootstrap nodes we have accumulated while waiting for the quit to finish.
+    /// On quit failure, all waiting streams will be notified with the error and removed.
+    async fn quit_task(self, topic: TopicId) {
+        let res = self.gossip.quit(topic).await;
+        let mut inner = self.inner.lock().unwrap();
+        if let Some(TopicState::Quitting {
+            waiting,
+            bootstrap: peers,
+            ..
+        }) = inner.current_subscriptions.remove(&topic)
+        {
+            match res {
+                Ok(()) => {
+                    if waiting.is_empty() {
+                        return;
+                    }
+                    let bootstrap = peers.clone();
+                    let _join_task = spawn_owned(self.clone().join_task(topic, bootstrap));
+                    inner.current_subscriptions.insert(
+                        topic,
+                        TopicState::Joining {
+                            waiting,
+                            bootstrap: peers,
+                            _join_task,
+                        },
+                    );
+                }
+                Err(e) => {
+                    // notify all waiting streams that there is something wrong with the topic
+                    let error = RpcError::from(e);
+                    for (_, send) in waiting {
+                        send.try_send(Err(error.clone())).ok();
+                    }
+                }
+            }
+        }
+    }
+
+    /// Try to send an event to a sink.
+    ///
+    /// This will not block when the sink is full; instead it sends a `Lagged` response once the sink is almost full.
+    fn try_send(entry: &(AbortingJoinHandle<()>, EventSink), event: &IrohGossipEvent) -> bool {
+        let (task, send) = entry;
+        // This means that we stop sending to the stream when the update side is finished.
+        if task.is_finished() {
+            return false;
+        }
+        // If the stream is disconnected, we don't need to send to it.
+        if send.is_disconnected() {
+            return false;
+        }
+        // Check if the send buffer is almost full, and send a lagged response if it is.
+        if let Some(cap) = send.capacity() {
+            if send.len() >= cap - 1 {
+                send.try_send(Ok(Event::Lagged)).ok();
+                return false;
+            }
+        }
+        // Send the event to the stream.
+        // We are the owner of the stream, so we can be sure that there is still room.
+        send.try_send(Ok(Event::Gossip(event.clone().into())))
+            .is_ok()
+    }
+
+    /// Dispatch gossip events to all subscribed streams.
+    ///
+    /// This should not fail unless the gossip instance is faulty.
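+    /// If it does fail, `dispatch_task` notifies all current subscribers of the error and drops the subscriptions.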
+    async fn dispatch_loop(self) -> anyhow::Result<()> {
+        use futures_lite::stream::StreamExt;
+        let stream = self.gossip.clone().subscribe_all();
+        tokio::pin!(stream);
+        while let Some(item) = stream.next().await {
+            let (topic, event) = item?;
+            let mut inner = self.inner.lock().unwrap();
+            if let Some(TopicState::Live { live }) = inner.current_subscriptions.get_mut(&topic) {
+                live.retain(|entry| Self::try_send(entry, &event));
+                if live.is_empty() {
+                    let quit_task = tokio::task::spawn(self.clone().quit_task(topic));
+                    inner.current_subscriptions.insert(
+                        topic,
+                        TopicState::Quitting {
+                            waiting: vec![],
+                            bootstrap: BTreeSet::new(),
+                            quit_task: quit_task.into(),
+                        },
+                    );
+                }
+            } else {
+                tracing::trace!("Received event for unknown topic, possibly sync: {topic}");
+            }
+        }
+        Ok(())
+    }
+
+    /// Dispatch gossip events to all subscribed streams, and handle the unlikely case of a dispatch loop failure.
+    async fn dispatch_task(self) {
+        if let Err(cause) = self.clone().dispatch_loop().await {
+            // The dispatch loop failed: notify all subscribers of the error and drop all subscriptions.
+            tracing::error!("Gossip dispatch task failed: {}", cause);
+            let mut inner = self.inner.lock().unwrap();
+            let error = RpcError::from(cause);
+            for (_, state) in std::mem::take(&mut inner.current_subscriptions) {
+                for sender in state.into_senders() {
+                    sender.try_send(Err(error.clone())).ok();
+                }
+            }
+        }
+    }
+
+    /// Handle updates from the client.
+    async fn update_loop(
+        gossip: Gossip,
+        topic: TopicId,
+        mut updates: CommandStream,
+    ) -> anyhow::Result<()> {
+        use futures_lite::stream::StreamExt;
+        while let Some(update) = Pin::new(&mut updates).next().await {
+            match update {
+                Command::Broadcast(msg) => {
+                    gossip.broadcast(topic, msg).await?;
+                }
+                Command::BroadcastNeighbors(msg) => {
+                    gossip.broadcast_neighbors(topic, msg).await?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Handle updates from the client, and handle update loop failure.
+    async fn update_task(self, topic: TopicId, updates: CommandStream) {
+        let Err(e) = Self::update_loop(self.gossip.clone(), topic, updates).await else {
+            return;
+        };
+        let mut inner = self.inner.lock().unwrap();
+        // we got an error while sending to the topic
+        if let Some(TopicState::Live { live }) = inner.current_subscriptions.remove(&topic) {
+            let error = RpcError::from(e);
+            // notify all live streams that sending to the topic failed
+            for (_, send) in live {
+                send.try_send(Err(error.clone())).ok();
+            }
+        }
+    }
+
+    /// Call join, then await the result.
+    ///
+    /// Basically just flattens the two stages of joining into one.
+    async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec<NodeId>) -> anyhow::Result<()> {
+        let join = gossip.join(topic, bootstrap).await?;
+        join.await?;
+        Ok(())
+    }
+
+    /// Join a gossip topic and handle turning waiting streams into live streams.
+    async fn join_task(self, topic: TopicId, bootstrap: BTreeSet<NodeId>) {
+        let res = Self::join(self.gossip.clone(), topic, bootstrap.into_iter().collect()).await;
+        let mut inner = self.inner.lock().unwrap();
+        if let Some(TopicState::Joining { waiting, .. }) =
+            inner.current_subscriptions.remove(&topic)
+        {
+            match res {
+                Ok(()) => {
+                    let mut live = vec![];
+                    for (updates, send) in waiting {
+                        // if the stream is disconnected, we don't need to keep it or start the update task
+                        if send.is_disconnected() {
+                            continue;
+                        }
+                        let task = spawn_owned(self.clone().update_task(topic, updates));
+                        live.push((task, send));
+                    }
+                    inner
+                        .current_subscriptions
+                        .insert(topic, TopicState::Live { live });
+                }
+                Err(e) => {
+                    // notify all waiting streams that the subscription failed
+                    let error = RpcError::from(e);
+                    for (_, send) in waiting {
+                        send.try_send(Err(error.clone())).ok();
+                    }
+                }
+            }
+        }
+    }
+
+    /// Subscribe to a gossip topic.
+    pub fn subscribe_with_opts(
+        &self,
+        topic: TopicId,
+        options: SubscribeOptions,
+        updates: CommandStream,
+    ) -> impl Stream<Item = RpcResult<Event>> {
+        let mut inner = self.inner.lock().unwrap();
+        let (send, recv) = flume::bounded(options.subscription_capacity);
+        match inner.current_subscriptions.entry(topic) {
+            Entry::Vacant(entry) => {
+                // There is no existing subscription, so we need to start a new one.
+                let waiting = vec![(updates, send)];
+                let this = self.clone();
+                let _join_task =
+                    spawn_owned(this.clone().join_task(topic, options.bootstrap.clone()));
+                entry.insert(TopicState::Joining {
+                    waiting,
+                    bootstrap: options.bootstrap,
+                    _join_task,
+                });
+            }
+            Entry::Occupied(mut entry) => {
+                // There is already a subscription
+                let state = entry.get_mut();
+                match state {
+                    TopicState::Joining {
+                        waiting,
+                        bootstrap: peers,
+                        ..
+                    } => {
+                        // We are still joining, so we must wait to create the update task.
+                        //
+                        // TODO: should we merge the bootstrap nodes and try to join with all of them?
+                        peers.extend(options.bootstrap);
+                        waiting.push((updates, send));
+                    }
+                    TopicState::Quitting {
+                        waiting,
+                        bootstrap: peers,
+                        ..
+                    } => {
+                        // We are quitting, so we must wait to create the update task.
+                        peers.extend(options.bootstrap);
+                        waiting.push((updates, send));
+                    }
+                    TopicState::Live { live } => {
+                        // There is already a live subscription, so we can immediately start the update task.
+                        let task = spawn_owned(self.clone().update_task(topic, updates));
+                        live.push((task, send));
+                    }
+                }
+            }
+        }
+        recv.into_stream()
+    }
+}
+
+/// tokio::spawn, but returns an `AbortingJoinHandle` that owns the task.
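+/// The spawned task is aborted when the returned handle is dropped.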
+fn spawn_owned<F, T>(f: F) -> AbortingJoinHandle<T>
+where
+    F: std::future::Future<Output = T> + Send + 'static,
+    T: Send + 'static,
+{
+    tokio::spawn(f).into()
+}
diff --git a/iroh-gossip/src/lib.rs b/iroh-gossip/src/lib.rs
index 9c6dd3f27e..70a015a787 100644
--- a/iroh-gossip/src/lib.rs
+++ b/iroh-gossip/src/lib.rs
@@ -2,6 +2,8 @@
 
 #![deny(missing_docs, rustdoc::broken_intra_doc_links)]
 
+#[cfg(feature = "dispatcher")]
+pub mod dispatcher;
 pub mod metrics;
 #[cfg(feature = "net")]
 pub mod net;
diff --git a/iroh-gossip/src/proto.rs b/iroh-gossip/src/proto.rs
index 25eb6f1384..105a9d29d5 100644
--- a/iroh-gossip/src/proto.rs
+++ b/iroh-gossip/src/proto.rs
@@ -59,7 +59,7 @@ pub mod util;
 
 #[cfg(test)]
 mod tests;
 
-pub use plumtree::Scope;
+pub use plumtree::{DeliveryScope, Scope};
 pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
 pub use topic::{Command, Config, Event, IO};
 
diff --git a/iroh-gossip/src/proto/plumtree.rs b/iroh-gossip/src/proto/plumtree.rs
index c1a96dea10..365630cff9 100644
--- a/iroh-gossip/src/proto/plumtree.rs
+++ b/iroh-gossip/src/proto/plumtree.rs
@@ -84,7 +84,7 @@ pub enum Event<PI> {
     Received(GossipEvent<PI>),
 }
 
-#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd)]
+#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
 pub struct GossipEvent<PI> {
     /// The content of the gossip message.
     #[debug("<{}b>", content.len())]
diff --git a/iroh-gossip/src/proto/topic.rs b/iroh-gossip/src/proto/topic.rs
index 0ac50d4f1f..64cffb783d 100644
--- a/iroh-gossip/src/proto/topic.rs
+++ b/iroh-gossip/src/proto/topic.rs
@@ -116,7 +116,7 @@ impl<PI> Message<PI> {
 }
 
 /// An event to be emitted to the application for a particular topic.
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
 pub enum Event<PI> {
     /// We have a new, direct neighbor in the swarm membership layer for this topic
     NeighborUp(PI),
diff --git a/iroh-net/src/portmapper/current_mapping.rs b/iroh-net/src/portmapper/current_mapping.rs
index 0c2273ece3..4c1c66591f 100644
--- a/iroh-net/src/portmapper/current_mapping.rs
+++ b/iroh-net/src/portmapper/current_mapping.rs
@@ -184,9 +184,6 @@ mod tests {
     // for testing a mapping is simply an ip, port pair
     type M = (Ipv4Addr, NonZeroU16);
 
-    const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero
-        unsafe { NonZeroU16::new_unchecked(9586) };
-    const TEST_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::LOCALHOST;
     const HALF_LIFETIME_SECS: u64 = 1;
 
     impl Mapping for M {
@@ -201,6 +198,9 @@ mod tests {
     #[tokio::test]
     #[ntest::timeout(2500)]
     async fn report_renew_expire_report() {
+        const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero
+            unsafe { NonZeroU16::new_unchecked(9586) };
+        const TEST_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::LOCALHOST;
         let (mut c, mut watcher) = CurrentMapping::<M>::new();
         let now = std::time::Instant::now();
         c.update(Some((TEST_IP, TEST_PORT)));
diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml
index 316b53d62a..5699900302 100644
--- a/iroh/Cargo.toml
+++ b/iroh/Cargo.toml
@@ -75,6 +75,7 @@ rand_chacha = "0.3.1"
 regex = { version = "1.7.1", features = ["std"] }
 serde_json = "1.0.107"
 testdir = "0.9.1"
+testresult = "0.4.0"
 tokio = { version = "1", features = ["macros", "io-util", "rt"] }
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 
diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index 1b93e1db2c..bf06a12426 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -25,6 +25,7 @@ pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN};
 
 pub mod authors;
 pub mod blobs;
 pub mod docs;
+pub mod gossip;
 pub mod node;
 pub mod tags;
 
@@ -72,6 +73,11 @@ impl Iroh {
         tags::Client::ref_cast(&self.rpc)
     }
 
+    /// Gossip client
+    pub fn gossip(&self) -> &gossip::Client {
+        gossip::Client::ref_cast(&self.rpc)
+    }
+
     /// Node client
     pub fn node(&self) -> &node::Client {
         node::Client::ref_cast(&self.rpc)
diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs
new file mode 100644
index 0000000000..40c695eeec
--- /dev/null
+++ b/iroh/src/client/gossip.rs
@@ -0,0 +1,99 @@
+//! Gossip client.
+use std::collections::BTreeSet;
+
+use anyhow::Result;
+use futures_lite::{Stream, StreamExt};
+use futures_util::{Sink, SinkExt};
+use iroh_gossip::proto::TopicId;
+use iroh_net::NodeId;
+use ref_cast::RefCast;
+
+use crate::rpc_protocol::gossip::{SubscribeRequest, SubscribeResponse, SubscribeUpdate};
+
+use super::RpcClient;
+
+/// Iroh gossip client.
+#[derive(Debug, Clone, RefCast)]
+#[repr(transparent)]
+pub struct Client {
+    pub(super) rpc: RpcClient,
+}
+
+/// Options for subscribing to a gossip topic.
+#[derive(Debug, Clone)]
+pub struct SubscribeOpts {
+    /// Bootstrap nodes to connect to.
+    pub bootstrap: BTreeSet<NodeId>,
+    /// Subscription capacity.
+    pub subscription_capacity: usize,
+}
+
+impl Default for SubscribeOpts {
+    fn default() -> Self {
+        Self {
+            bootstrap: BTreeSet::new(),
+            subscription_capacity: 256,
+        }
+    }
+}
+
+impl Client {
+    /// Subscribe to a gossip topic.
+    ///
+    /// Returns a sink to send updates to the topic and a stream of responses.
+    ///
+    /// Updates are either [Broadcast](iroh_gossip::dispatcher::Command::Broadcast)
+    /// or [BroadcastNeighbors](iroh_gossip::dispatcher::Command::BroadcastNeighbors).
+    ///
+    /// Broadcasts are gossiped to the entire swarm, while BroadcastNeighbors are sent to
+    /// just the immediate neighbors of the node.
+    ///
+    /// Responses are either [Gossip](iroh_gossip::dispatcher::Event::Gossip) or
+    /// [Lagged](iroh_gossip::dispatcher::Event::Lagged).
+    ///
+    /// Gossip events contain the actual message content, as well as information about the
+    /// immediate neighbors of the node.
+    ///
+    /// A Lagged event indicates that the gossip stream has not been consumed quickly enough.
+    /// You can adjust the buffer size with the [`SubscribeOpts::subscription_capacity`] option.
+    pub async fn subscribe_with_opts(
+        &self,
+        topic: TopicId,
+        opts: SubscribeOpts,
+    ) -> Result<(
+        impl Sink<SubscribeUpdate, Error = anyhow::Error>,
+        impl Stream<Item = Result<SubscribeResponse>>,
+    )> {
+        let (sink, stream) = self
+            .rpc
+            .bidi(SubscribeRequest {
+                topic,
+                bootstrap: opts.bootstrap,
+                subscription_capacity: opts.subscription_capacity,
+            })
+            .await?;
+        let stream = stream.map(|item| anyhow::Ok(item??));
+        let sink = sink.sink_map_err(|_| anyhow::anyhow!("send error"));
+        Ok((sink, stream))
+    }
+
+    /// Subscribe to a gossip topic with default options.
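+    ///
+    /// Example sketch (`ignore`d doc test; `client`, `topic`, and `peer` are assumed bindings):
+    ///
+    /// ```ignore
+    /// use futures_lite::StreamExt;
+    /// use futures_util::SinkExt;
+    /// use iroh_gossip::dispatcher::Command;
+    ///
+    /// let (mut sink, mut stream) = client.gossip().subscribe(topic, [peer]).await?;
+    /// sink.send(Command::Broadcast("hi".into())).await?;
+    /// let first_event = stream.next().await;
+    /// ```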
+    pub async fn subscribe(
+        &self,
+        topic: impl Into<TopicId>,
+        bootstrap: impl IntoIterator<Item = impl Into<NodeId>>,
+    ) -> Result<(
+        impl Sink<SubscribeUpdate, Error = anyhow::Error>,
+        impl Stream<Item = Result<SubscribeResponse>>,
+    )> {
+        let bootstrap = bootstrap.into_iter().map(Into::into).collect();
+        self.subscribe_with_opts(
+            topic.into(),
+            SubscribeOpts {
+                bootstrap,
+                ..Default::default()
+            },
+        )
+        .await
+    }
+}
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 3fd1475bdb..02f4dfad50 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -13,6 +13,7 @@ use futures_lite::StreamExt;
 use iroh_base::key::PublicKey;
 use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore};
 use iroh_blobs::{downloader::Downloader, protocol::Closed};
+use iroh_gossip::dispatcher::GossipDispatcher;
 use iroh_gossip::net::Gossip;
 use iroh_net::key::SecretKey;
 use iroh_net::Endpoint;
@@ -68,6 +69,7 @@ struct NodeInner {
     #[debug("rt")]
     rt: LocalPoolHandle,
     downloader: Downloader,
+    gossip_dispatcher: GossipDispatcher,
 }
 
 /// In memory node.
diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs
index 3b24996b6c..3472ce011a 100644
--- a/iroh/src/node/builder.rs
+++ b/iroh/src/node/builder.rs
@@ -14,7 +14,10 @@ use iroh_blobs::{
 };
 use iroh_docs::engine::DefaultAuthorStorage;
 use iroh_docs::net::DOCS_ALPN;
-use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
+use iroh_gossip::{
+    dispatcher::GossipDispatcher,
+    net::{Gossip, GOSSIP_ALPN},
+};
 use iroh_net::{
     discovery::{dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery},
     dns::DnsResolver,
@@ -545,6 +548,7 @@ where
             downloader.clone(),
         )
         .await?;
+        let gossip_dispatcher = GossipDispatcher::new(gossip.clone());
 
         // Initialize the internal RPC connection.
         let (internal_rpc, controller) = quic_rpc::transport::flume::connection(32);
@@ -564,6 +568,7 @@ where
             rt: lp,
             downloader,
             gossip,
+            gossip_dispatcher,
         });
 
         let protocol_builder = ProtocolBuilder {
diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs
index 26cc0dba1f..0b5357263a 100644
--- a/iroh/src/node/rpc.rs
+++ b/iroh/src/node/rpc.rs
@@ -53,7 +53,7 @@ use crate::rpc_protocol::{
         ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse,
         SetHashRequest,
     },
-    node,
+    gossip, node,
     node::{
         AddAddrRequest, AddrRequest, ConnectionInfoRequest, ConnectionInfoResponse,
         ConnectionsRequest, ConnectionsResponse, IdRequest, NodeWatchRequest, RelayRequest,
@@ -201,6 +201,30 @@ impl<D: BaoStore> Handler<D> {
         }
     }
 
+    async fn handle_gossip_request<E: ServiceEndpoint<RpcService>>(
+        self,
+        msg: gossip::Request,
+        chan: RpcChannel<RpcService, E>,
+    ) -> Result<(), RpcServerError<E>> {
+        use gossip::Request::*;
+        match msg {
+            Subscribe(msg) => {
+                chan.bidi_streaming(msg, self, |handler, req, updates| {
+                    handler.inner.gossip_dispatcher.subscribe_with_opts(
+                        req.topic,
+                        iroh_gossip::dispatcher::SubscribeOptions {
+                            bootstrap: req.bootstrap,
+                            subscription_capacity: req.subscription_capacity,
+                        },
+                        Box::new(updates),
+                    )
+                })
+                .await
+            }
+            Update(_msg) => Err(RpcServerError::UnexpectedUpdateMessage),
+        }
+    }
+
     async fn handle_authors_request<E: ServiceEndpoint<RpcService>>(
         self,
         msg: authors::Request,
@@ -401,6 +425,7 @@ impl<D: BaoStore> Handler<D> {
             Tags(msg) => self.handle_tags_request(msg, chan).await,
             Authors(msg) => self.handle_authors_request(msg, chan).await,
             Docs(msg) => self.handle_docs_request(msg, chan).await,
+            Gossip(msg) => self.handle_gossip_request(msg, chan).await,
         }
     }
 
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index 480c778546..bac8d577b2 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
 
 pub mod authors;
 pub mod blobs;
 pub mod docs;
+pub mod gossip;
 pub mod node;
 pub mod tags;
 
@@ -29,6 +30,7 @@ pub enum Request {
     Docs(docs::Request),
     Tags(tags::Request),
     Authors(authors::Request),
+    Gossip(gossip::Request),
 }
 
 /// The response enum, listing all possible responses.
@@ -41,6 +43,7 @@ pub enum Response {
     Tags(tags::Response),
     Docs(docs::Response),
     Authors(authors::Response),
+    Gossip(gossip::Response),
 }
 
 impl quic_rpc::Service for RpcService {
diff --git a/iroh/src/rpc_protocol/gossip.rs b/iroh/src/rpc_protocol/gossip.rs
new file mode 100644
index 0000000000..b2a589ea0e
--- /dev/null
+++ b/iroh/src/rpc_protocol/gossip.rs
@@ -0,0 +1,50 @@
+use std::collections::BTreeSet;
+
+use iroh_base::rpc::RpcResult;
+use iroh_gossip::proto::TopicId;
+use iroh_net::NodeId;
+use nested_enum_utils::enum_conversions;
+use quic_rpc::message::{BidiStreaming, BidiStreamingMsg, Msg};
+use serde::{Deserialize, Serialize};
+
+use super::RpcService;
+
+pub use iroh_gossip::dispatcher::Command as SubscribeUpdate;
+pub use iroh_gossip::dispatcher::Event as SubscribeResponse;
+
+#[allow(missing_docs)]
+#[derive(strum::Display, Debug, Serialize, Deserialize)]
+#[enum_conversions(super::Request)]
+pub enum Request {
+    Subscribe(SubscribeRequest),
+    Update(SubscribeUpdate),
+}
+
+#[allow(missing_docs)]
+#[derive(strum::Display, Debug, Serialize, Deserialize)]
+#[enum_conversions(super::Response)]
+pub enum Response {
+    Subscribe(RpcResult<SubscribeResponse>),
+}
+
+/// A request to the node to subscribe to gossip events.
+///
+/// This is basically a topic and additional options.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SubscribeRequest {
+    /// The topic to subscribe to
+    pub topic: TopicId,
+    /// The nodes to bootstrap the subscription from
+    pub bootstrap: BTreeSet<NodeId>,
+    /// The capacity of the subscription
+    pub subscription_capacity: usize,
+}
+
+impl Msg<RpcService> for SubscribeRequest {
+    type Pattern = BidiStreaming;
+}
+
+impl BidiStreamingMsg<RpcService> for SubscribeRequest {
+    type Update = SubscribeUpdate;
+    type Response = RpcResult<SubscribeResponse>;
+}
diff --git a/iroh/tests/client.rs b/iroh/tests/client.rs
new file mode 100644
index 0000000000..c8164dd783
--- /dev/null
+++ b/iroh/tests/client.rs
@@ -0,0 +1,76 @@
+use bytes::Bytes;
+use futures_lite::{Stream, StreamExt};
+use futures_util::SinkExt;
+use iroh::client::Iroh;
+use iroh_gossip::{
+    dispatcher::{Command, Event, GossipEvent},
+    proto::TopicId,
+};
+use iroh_net::{key::SecretKey, NodeAddr};
+use testresult::TestResult;
+use tokio::task::JoinHandle;
+
+/// Spawn an iroh node in a separate thread and tokio runtime, and return
+/// the address and client.
+fn spawn_node() -> (NodeAddr, Iroh) {
+    let (sender, receiver) = std::sync::mpsc::channel();
+    std::thread::spawn(move || {
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()?;
+        runtime.block_on(async move {
+            let secret_key = SecretKey::generate();
+            let node = iroh::node::Builder::default()
+                .secret_key(secret_key)
+                .spawn()
+                .await?;
+            let addr = node.node_addr().await?;
+            sender.send((addr, node.client().clone()))?;
+            node.cancel_token().cancelled().await;
+            anyhow::Ok(())
+        })?;
+        anyhow::Ok(())
+    });
+    receiver.recv().unwrap()
+}
+
+/// Await `n` messages from a stream of gossip events.
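+/// Returns a task handle that resolves to the collected message contents once `n` messages
+/// have been received (or the stream ended early).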
+fn await_messages(
+    mut stream: impl Stream<Item = anyhow::Result<Event>> + Unpin + Send + Sync + 'static,
+    n: usize,
+) -> JoinHandle<Vec<Bytes>> {
+    tokio::spawn(async move {
+        let mut res = Vec::new();
+        #[allow(clippy::single_match)]
+        while let Some(msg) = stream.next().await {
+            match msg.unwrap() {
+                Event::Gossip(GossipEvent::Received(msg)) => {
+                    res.push(msg.content);
+                    if res.len() >= n {
+                        break;
+                    }
+                }
+                _ => {}
+            }
+        }
+        res
+    })
+}
+
+#[tokio::test]
+async fn gossip_smoke() -> TestResult {
+    let _ = tracing_subscriber::fmt::try_init();
+    let (addr1, node1) = spawn_node();
+    let (addr2, node2) = spawn_node();
+    let gossip1 = node1.gossip();
+    let gossip2 = node2.gossip();
+    node1.add_node_addr(addr2.clone()).await?;
+    node2.add_node_addr(addr1.clone()).await?;
+    let topic = TopicId::from([0u8; 32]);
+    let (mut sink1, _stream1) = gossip1.subscribe(topic, [addr2.node_id]).await?;
+    let (_sink2, stream2) = gossip2.subscribe(topic, [addr1.node_id]).await?;
+    sink1.send(Command::Broadcast("hello".into())).await?;
+    let msgs = await_messages(stream2, 1).await?;
+    assert_eq!(msgs, vec![Bytes::from("hello")]);
+    Ok(())
+}
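Usage sketch (not part of the diff): the snippet below shows how the pieces added here compose end to end. It mirrors the `gossip_smoke` test above; `Node::memory()` as the in-memory builder entry point is an assumption, and error handling is minimal.

```rust
// Sketch only: two in-memory nodes exchange one gossip message via the new
// client API added in this PR. Assumes `iroh::node::Node::memory()` exists
// as the in-memory builder entry point.
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh_gossip::dispatcher::{Command, Event, GossipEvent};
use iroh_gossip::proto::TopicId;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let node1 = iroh::node::Node::memory().spawn().await?;
    let node2 = iroh::node::Node::memory().spawn().await?;
    // Make the nodes dialable from each other, as in the smoke test.
    let (addr1, addr2) = (node1.node_addr().await?, node2.node_addr().await?);
    node1.client().add_node_addr(addr2.clone()).await?;
    node2.client().add_node_addr(addr1.clone()).await?;

    let topic = TopicId::from([0u8; 32]);
    // Each side subscribes, bootstrapping off the other node's id.
    let (mut sink1, _stream1) = node1.client().gossip().subscribe(topic, [addr2.node_id]).await?;
    let (_sink2, mut stream2) = node2.client().gossip().subscribe(topic, [addr1.node_id]).await?;

    // Broadcast on node1, then wait for the message to arrive on node2.
    sink1.send(Command::Broadcast("hello".into())).await?;
    while let Some(event) = stream2.next().await {
        if let Event::Gossip(GossipEvent::Received(msg)) = event? {
            println!("received: {:?}", msg.content);
            break;
        }
    }
    Ok(())
}
```

Note that dropping the returned sink/stream pair is what eventually drives the dispatcher's `Live -> Quitting` transition, so holding `_sink2`/`_stream1` alive for the duration of the exchange is intentional.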