Originally developed as part of the Anoma node codebase and later extracted into a separate crate.
This crate implements a general-purpose infection-style gossip protocol that can be used to form peer-to-peer topologies and disseminate arbitrary data between nodes. The protocol scales to thousands of nodes within a single topic.
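To give a feel for why infection-style dissemination scales, here is a minimal, dependency-free simulation. It is only a sketch and not this crate's implementation: it assumes push-only gossip in synchronous rounds with a fixed fanout, and the names `Lcg` and `rounds_to_infect_all` are hypothetical helpers introduced for illustration.

```rust
/// Tiny deterministic pseudo-random generator (a linear congruential
/// generator), used only to keep this sketch free of external crates.
struct Lcg(u64);

impl Lcg {
    /// Returns a pseudo-random index in `0..bound` (`bound` must be > 0).
    fn next_below(&mut self, bound: usize) -> usize {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        // Use the upper bits, which are better distributed in an LCG.
        ((self.0 >> 33) as usize) % bound
    }
}

/// Simulates push gossip: node 0 starts with the message and, in every
/// round, each informed node forwards it to `fanout` random nodes.
/// Returns the number of rounds until all nodes are informed, or `None`
/// if that does not happen within `max_rounds`.
fn rounds_to_infect_all(
    nodes: usize,
    fanout: usize,
    seed: u64,
    max_rounds: usize,
) -> Option<usize> {
    if nodes == 0 {
        return Some(0);
    }
    let mut rng = Lcg(seed);
    let mut informed = vec![false; nodes];
    informed[0] = true;
    let mut informed_count = 1;
    let mut rounds = 0;
    while informed_count < nodes {
        if rounds == max_rounds {
            return None;
        }
        // Snapshot the senders so that nodes informed during this round
        // only start forwarding in the next one.
        let senders: Vec<usize> = (0..nodes).filter(|&i| informed[i]).collect();
        for _sender in senders {
            for _ in 0..fanout {
                let target = rng.next_below(nodes);
                if !informed[target] {
                    informed[target] = true;
                    informed_count += 1;
                }
            }
        }
        rounds += 1;
    }
    Some(rounds)
}
```

Because the informed set can grow by at most a factor of `1 + fanout` per round, the number of rounds is bounded below by a logarithm of the network size. Calling `rounds_to_infect_all(1000, 3, 42, 1000)` shows how few rounds a thousand-node simulation needs under these assumptions.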
Communication between peers is encrypted using secure node-to-node channels. Peer identity is based on an asymmetric encryption keypair generated by each node. Topics are identified by string identifiers and are joined by connecting to bootstrap nodes that are already members of the topic. A node does not have to specify any bootstrap nodes; in that case it can itself serve as a bootstrap node for other peers on the topic.
The public API of this crate is:
// API entry point:
pub struct Network { ... }

impl Network {
    pub fn new(config: network::Config, keypair: Keypair) -> Result<Self>;
    pub fn join(&mut self, config: topic::Config) -> Result<Topic>;
    pub async fn runloop(mut self);
}

// after joining a topic:
pub struct Topic { ... }

impl Topic {
    pub fn gossip(&self, data: Bytes);
}

// polling on new data gossiped by other peers
impl Stream for Topic {
    type Item = Bytes;
    fn poll_next(...) -> Poll<Option<Self::Item>>;
}
The basic unit of p2p interaction is a Topic. Each topic is a distinct instance of the HyParView membership protocol that maintains its own overlay of peers. A node can be a member of multiple topics, in which case it runs multiple concurrent instances of HyParView, each with its own peers and a different overlay. Topics may overlap; if you need to route messages between topics, take a look at the Unidirectional cross topic bridge and Bidirectional cross topic bridge examples.
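As a rough illustration of the per-topic state that a HyParView instance maintains, the sketch below models the protocol's two views: a small active view of connected peers and a larger passive view of backup candidates. It is a simplified model and not this crate's implementation. In particular, it evicts the oldest peer where the protocol picks a random one, and `Views`, `PeerId` and all method names are hypothetical.

```rust
use std::collections::VecDeque;

/// Hypothetical peer identifier, used for illustration only.
type PeerId = u32;

/// Simplified HyParView-style membership state for a single topic.
struct Views {
    active: VecDeque<PeerId>,  // peers we keep open connections to
    passive: VecDeque<PeerId>, // known peers kept as replacements
    max_active: usize,
    max_passive: usize,
}

impl Views {
    fn new(max_active: usize, max_passive: usize) -> Self {
        Views {
            active: VecDeque::new(),
            passive: VecDeque::new(),
            max_active,
            max_passive,
        }
    }

    /// Remembers a peer as a backup candidate, dropping the oldest
    /// candidate when the passive view is full.
    fn add_passive(&mut self, peer: PeerId) {
        if self.max_passive == 0
            || self.active.contains(&peer)
            || self.passive.contains(&peer)
        {
            return;
        }
        if self.passive.len() == self.max_passive {
            self.passive.pop_front();
        }
        self.passive.push_back(peer);
    }

    /// Adds a peer to the active view. When the view is full, one member
    /// is demoted to the passive view to make room.
    fn add_active(&mut self, peer: PeerId) {
        if self.max_active == 0 || self.active.contains(&peer) {
            return;
        }
        // A peer is never held in both views at once.
        self.passive.retain(|p| *p != peer);
        if self.active.len() == self.max_active {
            if let Some(evicted) = self.active.pop_front() {
                self.add_passive(evicted);
            }
        }
        self.active.push_back(peer);
    }

    /// Handles the failure of an active peer by promoting a passive peer.
    /// Returns the promoted peer, if any.
    fn on_peer_failed(&mut self, peer: PeerId) -> Option<PeerId> {
        let before = self.active.len();
        self.active.retain(|p| *p != peer);
        if self.active.len() == before {
            return None; // the peer was not in the active view
        }
        let replacement = self.passive.pop_front()?;
        self.active.push_back(replacement);
        Some(replacement)
    }
}
```

A node that is a member of several topics would hold one such structure per topic, which is why overlays of different topics can contain different peers.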
The networking API purposefully does not expose any information about the individual peers that make up a topic's p2p topology, nor any events other than the deduplicated gossip bytes received from other peers. Carrying such information inside gossiped messages is the job of higher-level constructs built on top of this library.
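One way a higher-level construct could carry peer information is to wrap every payload in an envelope before gossiping it. The sketch below is an assumption about how an application might do this and is not part of this crate: the `Envelope` type and its wire format (a 4-byte big-endian length, the sender name, then the payload) are hypothetical.

```rust
/// Hypothetical application-level envelope that carries the sender's
/// name alongside the payload.
#[derive(Debug, PartialEq, Eq)]
struct Envelope {
    sender: String,
    payload: Vec<u8>,
}

impl Envelope {
    /// Serializes as: 4-byte big-endian sender length, sender, payload.
    fn encode(&self) -> Vec<u8> {
        let sender = self.sender.as_bytes();
        let mut out = Vec::with_capacity(4 + sender.len() + self.payload.len());
        out.extend_from_slice(&(sender.len() as u32).to_be_bytes());
        out.extend_from_slice(sender);
        out.extend_from_slice(&self.payload);
        out
    }

    /// Parses an envelope, returning `None` for truncated or invalid input.
    fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 4 {
            return None;
        }
        let (len_bytes, rest) = bytes.split_at(4);
        let sender_len = u32::from_be_bytes([
            len_bytes[0],
            len_bytes[1],
            len_bytes[2],
            len_bytes[3],
        ]) as usize;
        if rest.len() < sender_len {
            return None;
        }
        let (sender, payload) = rest.split_at(sender_len);
        Some(Envelope {
            sender: String::from_utf8(sender.to_vec()).ok()?,
            payload: payload.to_vec(),
        })
    }
}
```

The encoded bytes could then be passed to `Topic::gossip` (for example via `envelope.encode().into()`), and every item received from the topic stream decoded with `Envelope::decode`. Note that a self-declared sender name is not authenticated; an application that needs that guarantee would have to sign its messages.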
Add the following to your Cargo.toml:
[dependencies]
hypar = "1.0"
use std::time::Duration;
use futures::StreamExt; // provides `next()`; assumes the `futures` crate is a dependency

// drives all networking across topics
let mut network = Network::default();

// join some topic by its name and a set of bootstrap nodes
let topic1 = network.join(topic::Config {
    name: "/example/topic1".into(),
    bootstrap: vec![ // Multiaddr
        "/ip4/1.2.3.4/tcp/12345".parse()?,
        "/ip6/2002:102:405::/tcp/12345".parse()?,
        "/dnsaddr/bootstrap.example.com/tcp/12345".parse()?
    ],
})?;

// create a background task that will print all
// incoming gossip messages on this topic to stdout.
tokio::spawn({
    // clone the handle so that `topic1` stays usable below
    let mut topic1 = topic1.clone();
    async move {
        while let Some(msg) = topic1.next().await {
            println!("topic1 message: {msg:?}");
        }
    }
});

// gossip a message on the topic every second to all peers
tokio::spawn({
    let topic1 = topic1.clone();
    async move {
        let mut counter = 1u64;
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            topic1.gossip(counter.to_be_bytes().to_vec().into());
            counter += 1;
        }
    }
});

// run the network runloop forever.
tokio::spawn(network.runloop()).await?;
Unidirectional cross topic bridge:
// drives all networking across topics
let mut network = Network::default();

// join two different topics/overlays
let topic1 = network.join(...)?;
let topic2 = network.join(...)?;

// listen on all incoming gossip from topic1
// and gossip a copy to topic2 overlay:
tokio::spawn(async move {
    let mut topic1 = topic1;
    while let Some(msg) = topic1.next().await {
        topic2.gossip(msg);
    }
});

// run the network runloop forever.
tokio::spawn(network.runloop()).await?;
Bidirectional cross topic bridge:
// drives all networking across topics
let mut network = Network::default();

// join two different topics/overlays
let topic1 = network.join(...)?;
let topic2 = network.join(...)?;

// listen on all incoming gossip from both topics and
// gossip a copy of each message to the other overlay:
tokio::spawn(async move {
    let mut topic1 = topic1;
    let mut topic2 = topic2;
    // keep forwarding; a single `select!` would handle only one message
    loop {
        tokio::select! {
            Some(msg) = topic1.next() => {
                topic2.gossip(msg);
            }
            Some(msg) = topic2.next() => {
                topic1.gossip(msg);
            }
            // both streams have ended
            else => break,
        }
    }
});

// run the network runloop forever.
tokio::spawn(network.runloop()).await?;