Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub fix #278

Merged
merged 13 commits into from
May 24, 2022
Merged
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-
sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.18" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.18" }
libp2p = "0.40.0"
libp2p-swarm = "0.31.0"
futures = "0.3.15"
bincode = "1.3.1"
log = "0.4.11"
Expand Down
36 changes: 25 additions & 11 deletions protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use futures::channel::oneshot;
use futures::Future;
use libp2p::core::connection::ConnectionLimit;
use libp2p::core::transport::TransportError;
use libp2p::{gossipsub, swarm};
use std::pin::Pin;
// use substrate_subxt::MetadataError;

/// Protocol Result typedef.
pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -42,14 +42,16 @@ pub enum Error {
Transport(TransportError<std::io::Error>),
/// Libp2p connection limit error.
ConnectionLimit(ConnectionLimit),
// /// Transaction sending error.
// SubmitFailure(substrate_subxt::Error),
/// Codec error.
Codec(bincode::Error),
/// Unable to decode address.
Ss58CodecError,
/// Unable to get metadata.
MetadataError,
/// Libp2p swarm dial error.
DialError,
/// Libp2p gossipsub subscribe error.
SubscriptionError,
/// Libp2p gossipsub publish error.
PublishError,
/// Other error.
Other(String),
}
Expand All @@ -60,12 +62,6 @@ impl<'a> From<&'a str> for Error {
}
}

// impl From<MetadataError> for Error {
// fn from(_: MetadataError) -> Self {
// Error::MetadataError
// }
// }

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Expand All @@ -75,3 +71,21 @@ impl std::error::Error for Error {
}
}
}

impl From<swarm::DialError> for Error {
fn from(_: swarm::DialError) -> Self {
Error::DialError
}
}

impl From<gossipsub::error::SubscriptionError> for Error {
fn from(_: gossipsub::error::SubscriptionError) -> Self {
Error::SubscriptionError
}
}

impl From<gossipsub::error::PublishError> for Error {
fn from(_: gossipsub::error::PublishError) -> Self {
Error::PublishError
}
}
14 changes: 8 additions & 6 deletions protocol/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@
//! Robonomics Network broadcasting layer.

use crate::error::FutureResult;
use futures::Stream;
use futures::channel::mpsc;
use std::fmt;
use std::pin::Pin;

pub use gossipsub::PubSub as Gossipsub;
pub use libp2p::{Multiaddr, PeerId};

pub mod discovery;
pub mod gossipsub;
pub mod pubsubapi;

pub use gossipsub::PubSub as Gossipsub;

/// Robonomics PubSub message.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Message {
Expand All @@ -44,7 +42,7 @@ impl fmt::Display for Message {
}

/// Stream of incoming messages.
pub type Inbox = Pin<Box<dyn Stream<Item = Message> + Send>>;
pub type Inbox = mpsc::UnboundedReceiver<Message>;

/// Robonomics Publisher/Subscriber.
pub trait PubSub {
Expand Down Expand Up @@ -75,5 +73,9 @@ pub trait PubSub {
fn unsubscribe<T: ToString>(&self, topic_name: &T) -> FutureResult<bool>;

/// Publish message into the topic by name.
fn publish<T: ToString, M: Into<Vec<u8>>>(&self, topic_name: &T, message: M);
fn publish<T: ToString, M: Into<Vec<u8>>>(
&self,
topic_name: &T,
message: M,
) -> FutureResult<bool>;
}
89 changes: 48 additions & 41 deletions protocol/src/pubsub/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use libp2p::gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubEvent, GossipsubMessage, MessageAuthenticity,
MessageId, Sha256Topic as Topic, TopicHash,
};
use libp2p::swarm::SwarmEvent;
use libp2p::{Multiaddr, PeerId, Swarm};
use libp2p_swarm::SwarmEvent;
//use libp2p::swarm::SwarmEvent;

use std::{
collections::hash_map::{DefaultHasher, HashMap},
hash::{Hash, Hasher},
Expand All @@ -52,7 +52,7 @@ enum ToWorkerMsg {
Listeners(oneshot::Sender<Vec<Multiaddr>>),
Subscribe(String, mpsc::UnboundedSender<super::Message>),
Unsubscribe(String, oneshot::Sender<bool>),
Publish(String, Vec<u8>),
Publish(String, Vec<u8>, oneshot::Sender<bool>),
}

struct PubSubWorker {
Expand Down Expand Up @@ -113,56 +113,52 @@ impl PubSubWorker {
target: "robonomics-pubsub",
"Listener for address {} created: {:?}", address, listener
);

Ok(listener)
}

fn listeners(&self) -> Vec<Multiaddr> {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
log::debug!(target: "robonomics-pubsub", "Listeners: {:?}", listeners);

listeners
}

fn connect(&mut self, address: Multiaddr) -> bool {
log::debug!(target: "robonomics-pubsub", "Connecting to {}", address);
fn connect(&mut self, address: Multiaddr) -> Result<()> {
Swarm::dial_addr(&mut self.swarm, address.clone())?;
log::debug!(target: "robonomics-pubsub", "Connected to {}", address);

Swarm::dial_addr(&mut self.swarm, address).is_ok()
Ok(())
}

fn subscribe(
&mut self,
topic_name: String,
inbox: mpsc::UnboundedSender<super::Message>,
) -> bool {
) -> Result<bool> {
let topic = Topic::new(topic_name.clone());
let subscribed = self.swarm.behaviour_mut().subscribe(&topic);
if subscribed.is_ok() {
log::debug!(target: "robonomics-pubsub", "Subscribed to {}", topic_name);
self.inbox.insert(topic.hash(), inbox);
} else {
log::warn!(target: "robonomics-pubsub",
"Subscription error {:?}", subscribed);
}
subscribed.is_ok()
self.swarm.behaviour_mut().subscribe(&topic)?;
self.inbox.insert(topic.hash(), inbox);
log::debug!(target: "robonomics-pubsub", "Subscribed to {}", topic_name);

Ok(true)
}

fn unsubscribe(&mut self, topic_name: String) -> bool {
fn unsubscribe(&mut self, topic_name: String) -> Result<bool> {
let topic = Topic::new(topic_name.clone());
let unsubscribed = self.swarm.behaviour_mut().unsubscribe(&topic);
if unsubscribed.is_ok() {
log::debug!(target: "robonomics-pubsub", "Unsubscribed from {}", topic_name);
self.inbox.remove(&topic.hash());
} else {
log::warn!(target: "robonomics-pubsub",
"Unsubscribe error {:?}", unsubscribed);
}
unsubscribed.is_ok()
self.swarm.behaviour_mut().unsubscribe(&topic)?;
self.inbox.remove(&topic.hash());
log::debug!(target: "robonomics-pubsub", "Unsubscribed from {}", topic_name);

Ok(true)
}

fn publish(&mut self, topic_name: String, message: Vec<u8>) {
fn publish(&mut self, topic_name: String, message: Vec<u8>) -> Result<bool> {
let topic = Topic::new(topic_name.clone());
self.swarm.behaviour_mut().publish(topic.clone(), message)?;
log::debug!(target: "robonomics-pubsub", "Publish to {}", topic_name);

let topic = Topic::new(topic_name);
let _ = self.swarm.behaviour_mut().publish(topic, message);
Ok(true)
}
}

Expand Down Expand Up @@ -203,7 +199,9 @@ impl Future for PubSubWorker {
},
_ => {}
},
Poll::Ready(None) | Poll::Pending => break,
Poll::Ready(None) | Poll::Pending => {
break;
}
}
}

Expand All @@ -213,20 +211,20 @@ impl Future for PubSubWorker {
ToWorkerMsg::Listen(addr, result) => {
let _ = result.send(self.listen(addr).is_ok());
}
ToWorkerMsg::Connect(addr, result) => {
let _ = result.send(self.connect(addr));
}
ToWorkerMsg::Listeners(result) => {
let _ = result.send(self.listeners());
}
ToWorkerMsg::Connect(addr, result) => {
let _ = result.send(self.connect(addr).is_ok());
}
ToWorkerMsg::Subscribe(topic_name, inbox) => {
let _ = self.subscribe(topic_name, inbox);
}
ToWorkerMsg::Unsubscribe(topic_name, result) => {
let _ = result.send(self.unsubscribe(topic_name));
let _ = result.send(self.unsubscribe(topic_name).is_ok());
}
ToWorkerMsg::Publish(topic_name, message) => {
self.publish(topic_name, message);
ToWorkerMsg::Publish(topic_name, message, result) => {
let _ = result.send(self.publish(topic_name, message).is_ok());
}
},
Poll::Ready(None) | Poll::Pending => break,
Expand Down Expand Up @@ -283,7 +281,8 @@ impl super::PubSub for PubSub {
let _ = self
.to_worker
.unbounded_send(ToWorkerMsg::Subscribe(topic_name.to_string(), sender));
receiver.boxed()

receiver
}

fn unsubscribe<T: ToString>(&self, topic_name: &T) -> FutureResult<bool> {
Expand All @@ -294,9 +293,17 @@ impl super::PubSub for PubSub {
receiver.boxed()
}

fn publish<T: ToString, M: Into<Vec<u8>>>(&self, topic_name: &T, message: M) {
let _ = self
.to_worker
.unbounded_send(ToWorkerMsg::Publish(topic_name.to_string(), message.into()));
fn publish<T: ToString, M: Into<Vec<u8>>>(
&self,
topic_name: &T,
message: M,
) -> FutureResult<bool> {
let (sender, receiver) = oneshot::channel();
let _ = self.to_worker.unbounded_send(ToWorkerMsg::Publish(
topic_name.to_string(),
message.into(),
sender,
));
receiver.boxed()
}
}
Loading