Skip to content

Commit

Permalink
feat: introduce libp2p-stream
Browse files Browse the repository at this point in the history
For a while now, `rust-libp2p` provided the `request-response` abstraction which makes it easy for users to build request-response based protocols without having to implement a `NetworkBehaviour` themselves. This PR introduces an alpha version of `libp2p-stream`: a `NetworkBehaviour` that directly gives access to negotiated streams.

In addition to complementing `request-response`, `libp2p-stream` also diverges in its design from the remaining modules by offering a clonable `Control` that provides `async` functions.

Resolves: #4457.

Pull-Request: #5027.
  • Loading branch information
thomaseizinger authored Jan 16, 2024
1 parent cbbba79 commit b958897
Show file tree
Hide file tree
Showing 15 changed files with 1,077 additions and 3 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"examples/ping",
"examples/relay-server",
"examples/rendezvous",
"examples/stream",
"examples/upnp",
"hole-punching-tests",
"identity",
Expand Down Expand Up @@ -45,10 +46,11 @@ members = [
"protocols/relay",
"protocols/rendezvous",
"protocols/request-response",
"protocols/stream",
"protocols/upnp",
"swarm",
"swarm-derive",
"swarm-test",
"swarm",
"transports/dns",
"transports/noise",
"transports/plaintext",
Expand All @@ -57,11 +59,11 @@ members = [
"transports/tcp",
"transports/tls",
"transports/uds",
"transports/webrtc",
"transports/webrtc-websys",
"transports/webrtc",
"transports/websocket-websys",
"transports/websocket",
"transports/webtransport-websys",
"transports/websocket-websys",
"wasm-tests/webtransport-tests",
]
resolver = "2"
Expand Down Expand Up @@ -99,6 +101,7 @@ libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" }
libp2p-server = { version = "0.12.5", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.1", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" }
Expand Down
22 changes: 22 additions & 0 deletions examples/stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "stream-example"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"

[package.metadata.release]
release = false

[dependencies]
anyhow = "1"
futures = "0.3.29"
libp2p = { path = "../../libp2p", features = [ "tokio", "quic"] }
libp2p-stream = { path = "../../protocols/stream", version = "0.1.0-alpha" }
rand = "0.8"
tokio = { version = "1.35", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints]
workspace = true
35 changes: 35 additions & 0 deletions examples/stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## Description

This example shows the usage of the `stream::Behaviour`.
As a counter-part to the `request_response::Behaviour`, the `stream::Behaviour` allows users to write stream-oriented protocols whilst having minimal interaction with the `Swarm`.

In this showcase, we implement an echo protocol: All incoming data is echoed back to the dialer, until the stream is closed.

## Usage

To run the example, follow these steps:

1. Start an instance of the example in one terminal:

```sh
cargo run --bin stream-example
```

Observe printed listen address.

2. Start another instance in a new terminal, providing the listen address of the first one.

```sh
cargo run --bin stream-example -- <address>
```

3. Both terminals should now continuosly print messages.

## Conclusion

The `stream::Behaviour` is an "escape-hatch" from the way typical rust-libp2p protocols are written.
It is suitable for several scenarios including:

- prototyping of new protocols
- experimentation with rust-libp2p
- integration in `async/await`-heavy applications
154 changes: 154 additions & 0 deletions examples/stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::{io, time::Duration};

use anyhow::{Context, Result};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol};
use libp2p_stream as stream;
use rand::RngCore;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo");

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?,
)
.init();

let maybe_address = std::env::args()
.nth(1)
.map(|arg| arg.parse::<Multiaddr>())
.transpose()
.context("Failed to parse argument as `Multiaddr`")?;

let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_quic()
.with_behaviour(|_| stream::Behaviour::new())?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10)))
.build();

swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?;

let mut incoming_streams = swarm
.behaviour()
.new_control()
.accept(ECHO_PROTOCOL)
.unwrap();

// Deal with incoming streams.
// Spawning a dedicated task is just one way of doing this.
// libp2p doesn't care how you handle incoming streams but you _must_ handle them somehow.
// To mitigate DoS attacks, libp2p will internally drop incoming streams if your application cannot keep up processing them.
tokio::spawn(async move {
// This loop handles incoming streams _sequentially_ but that doesn't have to be the case.
// You can also spawn a dedicated task per stream if you want to.
// Be aware that this breaks backpressure though as spawning new tasks is equivalent to an unbounded buffer.
// Each task needs memory meaning an aggressive remote peer may force you OOM this way.

while let Some((peer, stream)) = incoming_streams.next().await {
match echo(stream).await {
Ok(n) => {
tracing::info!(%peer, "Echoed {n} bytes!");
}
Err(e) => {
tracing::warn!(%peer, "Echo failed: {e}");
continue;
}
};
}
});

// In this demo application, the dialing peer initiates the protocol.
if let Some(address) = maybe_address {
let Some(Protocol::P2p(peer_id)) = address.iter().last() else {
anyhow::bail!("Provided address does not end in `/p2p`");
};

swarm.dial(address)?;

tokio::spawn(connection_handler(peer_id, swarm.behaviour().new_control()));
}

// Poll the swarm to make progress.
loop {
let event = swarm.next().await.expect("never terminates");

match event {
libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
let listen_address = address.with_p2p(*swarm.local_peer_id()).unwrap();
tracing::info!(%listen_address);
}
event => tracing::trace!(?event),
}
}
}

/// A very simple, `async fn`-based connection handler for our custom echo protocol.
async fn connection_handler(peer: PeerId, mut control: stream::Control) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos.

let stream = match control.open_stream(peer, ECHO_PROTOCOL).await {
Ok(stream) => stream,
Err(error @ stream::OpenStreamError::UnsupportedProtocol(_)) => {
tracing::info!(%peer, %error);
return;
}
Err(error) => {
// Other errors may be temporary.
// In production, something like an exponential backoff / circuit-breaker may be more appropriate.
tracing::debug!(%peer, %error);
continue;
}
};

if let Err(e) = send(stream).await {
tracing::warn!(%peer, "Echo protocol failed: {e}");
continue;
}

tracing::info!(%peer, "Echo complete!")
}
}

async fn echo(mut stream: Stream) -> io::Result<usize> {
let mut total = 0;

let mut buf = [0u8; 100];

loop {
let read = stream.read(&mut buf).await?;
if read == 0 {
return Ok(total);
}

total += read;
stream.write_all(&buf[..read]).await?;
}
}

async fn send(mut stream: Stream) -> io::Result<()> {
let num_bytes = rand::random::<usize>() % 1000;

let mut bytes = vec![0; num_bytes];
rand::thread_rng().fill_bytes(&mut bytes);

stream.write_all(&bytes).await?;

let mut buf = vec![0; num_bytes];
stream.read_exact(&mut buf).await?;

if bytes != buf {
return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo"));
}

stream.close().await?;

Ok(())
}
3 changes: 3 additions & 0 deletions protocols/stream/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.1.0-alpha

Initial release.
27 changes: 27 additions & 0 deletions protocols/stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "libp2p-stream"
version = "0.1.0-alpha"
edition = "2021"
rust-version.workspace = true
description = "Generic stream protocols for libp2p"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
futures = "0.3.29"
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
libp2p-swarm = { workspace = true }
tracing = "0.1.37"
void = "1"
rand = "0.8"

[dev-dependencies]
libp2p-swarm-test = { workspace = true }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints]
workspace = true
Loading

0 comments on commit b958897

Please sign in to comment.