Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh): Gossip client #2258

Merged
merged 30 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4e99aad
initial gossip client
rklaehn May 1, 2024
48a2993
Add CLI
rklaehn May 1, 2024
4882412
more comments
rklaehn May 1, 2024
b88398e
fix(iroh-sync): Ignore all gossip events that are not for the sync en…
rklaehn May 1, 2024
2d4ef9e
More logging.
rklaehn May 1, 2024
f53cef5
clippy
rklaehn May 1, 2024
2201a40
Move the gossip types into the gossip dispatcher
rklaehn Jun 5, 2024
6b83cd2
move dispatcher into iroh-gossip under a feature flag
rklaehn Jun 5, 2024
b19ed51
some renaming
rklaehn Jun 5, 2024
5e4b557
Merge branch 'main' into gossip-client
rklaehn Jun 5, 2024
5d6b7d0
fix weird unused warning
rklaehn Jun 5, 2024
865bc0d
PR review
rklaehn Jun 7, 2024
12f564e
Merge branch 'main' into gossip-client
rklaehn Jun 19, 2024
150dcd9
Merge branch 'main' into gossip-client
rklaehn Jun 28, 2024
cb216f6
very simple gossip smoke test
rklaehn Jun 28, 2024
3656b5a
add bootstrap node in both directions
rklaehn Jun 28, 2024
3771dc2
Add new request to add a NodeAddr via rpc
rklaehn Jul 1, 2024
609626f
Move the various node ops into a separate subservice
rklaehn Jul 1, 2024
07d2d30
Merge branch 'main' into gossip-client
rklaehn Jul 1, 2024
eaf4e86
Make clippy happy
rklaehn Jul 1, 2024
c4c8ce9
Merge branch 'add-addr-request' into gossip-client
rklaehn Jul 1, 2024
51dfde8
Make sure the nodes know each others addr without discovery
rklaehn Jul 1, 2024
0b227c0
shut up clippy
rklaehn Jul 1, 2024
08fd369
Merge branch 'main' into gossip-client
rklaehn Jul 1, 2024
2363448
Fix test compile error
rklaehn Jul 1, 2024
4fc00c2
Add short subscribe fn
rklaehn Jul 1, 2024
ede9549
Merge branch 'main' into gossip-client
rklaehn Jul 2, 2024
e71b0c8
Merge branch 'main' into gossip-client
rklaehn Jul 4, 2024
41d067b
Use TestResult crate instead of implementing it
rklaehn Jul 4, 2024
c417236
remove must_use from AbortingJoinHandle
rklaehn Jul 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ dirs-next = "2.0.0"
flume = "0.11.0"
futures-buffered = "0.2.4"
futures-lite = "2.3"
futures-util = { version = "0.3.30", features = ["futures-sink"] }
hex = "0.4.3"
human-time = "0.1.6"
indicatif = { version = "0.17", features = ["tokio"] }
iroh = { version = "0.19.0", path = "../iroh", features = ["metrics"] }
iroh-gossip = { version = "0.19.0", path = "../iroh-gossip" }
iroh-metrics = { version = "0.19.0", path = "../iroh-metrics" }
parking_lot = "0.12.1"
pkarr = { version = "1.1.5", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions iroh-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod blob;
pub(crate) mod console;
pub(crate) mod doc;
pub(crate) mod doctor;
pub(crate) mod gossip;
pub(crate) mod node;
pub(crate) mod rpc;
pub(crate) mod start;
Expand Down
94 changes: 94 additions & 0 deletions iroh-cli/src/commands/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use anyhow::{Context, Result};
use bao_tree::blake3;
use clap::{ArgGroup, Subcommand};
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh::client::gossip::SubscribeOpts;
use iroh::client::Iroh;
use iroh::net::NodeId;
use tokio::io::AsyncBufReadExt;

#[derive(Subcommand, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum GossipCommands {
/// Subscribe to a topic
#[command(group(
ArgGroup::new("input")
.required(true)
.args(&["topic", "raw_topic"])
))]
Subscribe {
/// Topic string to subscribe to.
///
/// This will be hashed with BLAKE3 to get the actual topic ID.
#[clap(long)]
topic: Option<String>,
/// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
#[clap(long)]
raw_topic: Option<String>,
bootstrap: Vec<NodeId>,
#[clap(long, short)]
verbose: bool,
},
}

impl GossipCommands {
pub async fn run(self, iroh: &Iroh) -> Result<()> {
match self {
Self::Subscribe {
topic,
raw_topic,
bootstrap,
verbose,
} => {
let bootstrap = bootstrap.into_iter().collect();
let topic = match (topic, raw_topic) {
(Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
(None, Some(raw_topic)) => {
let mut slice = [0; 32];
hex::decode_to_slice(raw_topic, &mut slice)
.context("failed to decode raw topic")?;
slice.into()
}
_ => anyhow::bail!("either topic or raw_topic must be provided"),
};
// blake3::hash(topic.as_ref()).into();
let opts = SubscribeOpts {
bootstrap,
subscription_capacity: 1024,
};

let (mut sink, mut stream) = iroh.gossip().subscribe_with_opts(topic, opts).await?;
let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
tokio::select! {
line = input_lines.next_line() => {
let line = line.context("failed to read from stdin")?;
if let Some(line) = line {
sink.send(iroh_gossip::dispatcher::Command::Broadcast(line.into())).await?;
} else {
break;
}
}
res = stream.next() => {
let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
match res {
iroh_gossip::dispatcher::Event::Gossip(event) => {
if verbose {
println!("{:?}", event);
} else if let iroh_gossip::dispatcher::GossipEvent::Received(iroh_gossip::dispatcher::Message { content, .. }) = event {
println!("{:?}", content);
}
}
iroh_gossip::dispatcher::Event::Lagged => {
anyhow::bail!("gossip stream lagged");
}
};
}
}
}
}
}
Ok(())
}
}
12 changes: 10 additions & 2 deletions iroh-cli/src/commands/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use iroh::client::Iroh;
use crate::config::ConsoleEnv;

use super::{
author::AuthorCommands, blob::BlobCommands, doc::DocCommands, node::NodeCommands,
tag::TagCommands,
author::AuthorCommands, blob::BlobCommands, doc::DocCommands, gossip::GossipCommands,
node::NodeCommands, tag::TagCommands,
};

#[derive(Subcommand, Debug, Clone)]
Expand Down Expand Up @@ -41,6 +41,13 @@ pub enum RpcCommands {
#[clap(subcommand)]
command: NodeCommands,
},
/// Manage gossip
///
/// Gossip is a way to broadcast messages to a group of nodes.
Gossip {
#[clap(subcommand)]
command: GossipCommands,
},
/// Manage tags
///
/// Tags are local, human-readable names for things iroh should keep.
Expand All @@ -64,6 +71,7 @@ impl RpcCommands {
Self::Doc { command } => command.run(iroh, env).await,
Self::Author { command } => command.run(iroh, env).await,
Self::Tag { command } => command.run(iroh).await,
Self::Gossip { command } => command.run(iroh).await,
}
}
}
7 changes: 6 additions & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }

# dispatcher dependencies (optional)
futures-util = { version = "0.3.30", optional = true }
flume = { version = "0.11", optional = true }

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
iroh-test = { path = "../iroh-test" }
Expand All @@ -45,8 +49,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.4.0"

[features]
default = ["net"]
default = ["net", "dispatcher"]
net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
dispatcher = ["dep:flume", "dep:futures-util"]

[[example]]
name = "chat"
Expand Down
Loading
Loading