diff --git a/Cargo.lock b/Cargo.lock index 23fc82ffa..f77386d3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8236,7 +8236,6 @@ dependencies = [ "jsonrpc-derive", "jsonrpc-pubsub", "libp2p", - "libp2p-swarm", "log", "parity-scale-codec", "rand 0.8.5", diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 2d9b2ead8..d2b8138a8 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -21,7 +21,6 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot- sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.18" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.18" } libp2p = "0.40.0" -libp2p-swarm = "0.31.0" futures = "0.3.15" bincode = "1.3.1" log = "0.4.11" diff --git a/protocol/src/error.rs b/protocol/src/error.rs index ea63ddd0e..d2f7d9c46 100644 --- a/protocol/src/error.rs +++ b/protocol/src/error.rs @@ -21,8 +21,8 @@ use futures::channel::oneshot; use futures::Future; use libp2p::core::connection::ConnectionLimit; use libp2p::core::transport::TransportError; +use libp2p::{gossipsub, swarm}; use std::pin::Pin; -// use substrate_subxt::MetadataError; /// Protocol Result typedef. pub type Result = std::result::Result; @@ -42,14 +42,16 @@ pub enum Error { Transport(TransportError), /// Libp2p connection limit error. ConnectionLimit(ConnectionLimit), - // /// Transaction sending error. - // SubmitFailure(substrate_subxt::Error), /// Codec error. Codec(bincode::Error), /// Unable to decode address. Ss58CodecError, - /// Unable to get metadata. - MetadataError, + /// Libp2p swarm dial error. + DialError, + /// Libp2p gossipsub subscribe error. + SubscriptionError, + /// Libp2p gossipsub publish error. + PublishError, /// Other error. Other(String), } @@ -60,12 +62,6 @@ impl<'a> From<&'a str> for Error { } } -// impl From for Error { -// fn from(_: MetadataError) -> Self { -// Error::MetadataError -// } -// } - impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { @@ -75,3 +71,21 @@ impl std::error::Error for Error { } } } + +impl From for Error { + fn from(_: swarm::DialError) -> Self { + Error::DialError + } +} + +impl From for Error { + fn from(_: gossipsub::error::SubscriptionError) -> Self { + Error::SubscriptionError + } +} + +impl From for Error { + fn from(_: gossipsub::error::PublishError) -> Self { + Error::PublishError + } +} diff --git a/protocol/src/pubsub.rs b/protocol/src/pubsub.rs index 3033a1efb..931117ae0 100644 --- a/protocol/src/pubsub.rs +++ b/protocol/src/pubsub.rs @@ -18,18 +18,16 @@ //! Robonomics Network broadcasting layer. use crate::error::FutureResult; -use futures::Stream; +use futures::channel::mpsc; use std::fmt; -use std::pin::Pin; +pub use gossipsub::PubSub as Gossipsub; pub use libp2p::{Multiaddr, PeerId}; pub mod discovery; pub mod gossipsub; pub mod pubsubapi; -pub use gossipsub::PubSub as Gossipsub; - /// Robonomics PubSub message. #[derive(PartialEq, Eq, Clone, Debug)] pub struct Message { @@ -44,7 +42,7 @@ impl fmt::Display for Message { } /// Stream of incoming messages. -pub type Inbox = Pin + Send>>; +pub type Inbox = mpsc::UnboundedReceiver; /// Robonomics Publisher/Subscriber. pub trait PubSub { @@ -75,5 +73,9 @@ pub trait PubSub { fn unsubscribe(&self, topic_name: &T) -> FutureResult; /// Publish message into the topic by name. - fn publish>>(&self, topic_name: &T, message: M); + fn publish>>( + &self, + topic_name: &T, + message: M, + ) -> FutureResult; } diff --git a/protocol/src/pubsub/gossipsub.rs b/protocol/src/pubsub/gossipsub.rs index d1e4edcc0..f6a232f93 100644 --- a/protocol/src/pubsub/gossipsub.rs +++ b/protocol/src/pubsub/gossipsub.rs @@ -32,9 +32,9 @@ use libp2p::gossipsub::{ Gossipsub, GossipsubConfigBuilder, GossipsubEvent, GossipsubMessage, MessageAuthenticity, MessageId, Sha256Topic as Topic, TopicHash, }; +use libp2p::swarm::SwarmEvent; use libp2p::{Multiaddr, PeerId, Swarm}; -use libp2p_swarm::SwarmEvent; -//use libp2p::swarm::SwarmEvent; + use std::{ collections::hash_map::{DefaultHasher, HashMap}, hash::{Hash, Hasher}, @@ -52,7 +52,7 @@ enum ToWorkerMsg { Listeners(oneshot::Sender>), Subscribe(String, mpsc::UnboundedSender), Unsubscribe(String, oneshot::Sender), - Publish(String, Vec), + Publish(String, Vec, oneshot::Sender), } struct PubSubWorker { @@ -113,56 +113,52 @@ impl PubSubWorker { target: "robonomics-pubsub", "Listener for address {} created: {:?}", address, listener ); + Ok(listener) } fn listeners(&self) -> Vec { let listeners = Swarm::listeners(&self.swarm).cloned().collect(); log::debug!(target: "robonomics-pubsub", "Listeners: {:?}", listeners); + listeners } - fn connect(&mut self, address: Multiaddr) -> bool { - log::debug!(target: "robonomics-pubsub", "Connecting to {}", address); + fn connect(&mut self, address: Multiaddr) -> Result<()> { + Swarm::dial_addr(&mut self.swarm, address.clone())?; + log::debug!(target: "robonomics-pubsub", "Connected to {}", address); - Swarm::dial_addr(&mut self.swarm, address).is_ok() + Ok(()) } fn subscribe( &mut self, topic_name: String, inbox: mpsc::UnboundedSender, - ) -> bool { + ) -> Result { let topic = Topic::new(topic_name.clone()); - let subscribed = self.swarm.behaviour_mut().subscribe(&topic); - if subscribed.is_ok() { - log::debug!(target: "robonomics-pubsub", "Subscribed to {}", topic_name); - self.inbox.insert(topic.hash(), inbox); - } else { - log::warn!(target: "robonomics-pubsub", - "Subscription error {:?}", subscribed); - } - subscribed.is_ok() + self.swarm.behaviour_mut().subscribe(&topic)?; + self.inbox.insert(topic.hash(), inbox); + log::debug!(target: "robonomics-pubsub", "Subscribed to {}", topic_name); + + Ok(true) } - fn unsubscribe(&mut self, topic_name: String) -> bool { + fn unsubscribe(&mut self, topic_name: String) -> Result { let topic = Topic::new(topic_name.clone()); - let unsubscribed = self.swarm.behaviour_mut().unsubscribe(&topic); - if unsubscribed.is_ok() { - log::debug!(target: "robonomics-pubsub", "Unsubscribed from {}", topic_name); - self.inbox.remove(&topic.hash()); - } else { - log::warn!(target: "robonomics-pubsub", - "Unsubscribe error {:?}", unsubscribed); - } - unsubscribed.is_ok() + self.swarm.behaviour_mut().unsubscribe(&topic)?; + self.inbox.remove(&topic.hash()); + log::debug!(target: "robonomics-pubsub", "Unsubscribed from {}", topic_name); + + Ok(true) } - fn publish(&mut self, topic_name: String, message: Vec) { + fn publish(&mut self, topic_name: String, message: Vec) -> Result { + let topic = Topic::new(topic_name.clone()); + self.swarm.behaviour_mut().publish(topic.clone(), message)?; log::debug!(target: "robonomics-pubsub", "Publish to {}", topic_name); - let topic = Topic::new(topic_name); - let _ = self.swarm.behaviour_mut().publish(topic, message); + Ok(true) } } @@ -203,7 +199,9 @@ impl Future for PubSubWorker { }, _ => {} }, - Poll::Ready(None) | Poll::Pending => break, + Poll::Ready(None) | Poll::Pending => { + break; + } } } @@ -213,20 +211,20 @@ impl Future for PubSubWorker { ToWorkerMsg::Listen(addr, result) => { let _ = result.send(self.listen(addr).is_ok()); } - ToWorkerMsg::Connect(addr, result) => { - let _ = result.send(self.connect(addr)); - } ToWorkerMsg::Listeners(result) => { let _ = result.send(self.listeners()); } + ToWorkerMsg::Connect(addr, result) => { + let _ = result.send(self.connect(addr).is_ok()); + } ToWorkerMsg::Subscribe(topic_name, inbox) => { let _ = self.subscribe(topic_name, inbox); } ToWorkerMsg::Unsubscribe(topic_name, result) => { - let _ = result.send(self.unsubscribe(topic_name)); + let _ = result.send(self.unsubscribe(topic_name).is_ok()); } - ToWorkerMsg::Publish(topic_name, message) => { - self.publish(topic_name, message); + ToWorkerMsg::Publish(topic_name, message, result) => { + let _ = result.send(self.publish(topic_name, message).is_ok()); } }, Poll::Ready(None) | Poll::Pending => break, @@ -283,7 +281,8 @@ impl super::PubSub for PubSub { let _ = self .to_worker .unbounded_send(ToWorkerMsg::Subscribe(topic_name.to_string(), sender)); - receiver.boxed() + + receiver } fn unsubscribe(&self, topic_name: &T) -> FutureResult { @@ -294,9 +293,17 @@ impl super::PubSub for PubSub { receiver.boxed() } - fn publish>>(&self, topic_name: &T, message: M) { - let _ = self - .to_worker - .unbounded_send(ToWorkerMsg::Publish(topic_name.to_string(), message.into())); + fn publish>>( + &self, + topic_name: &T, + message: M, + ) -> FutureResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Publish( + topic_name.to_string(), + message.into(), + sender, + )); + receiver.boxed() } } diff --git a/protocol/src/pubsub/pubsubapi.rs b/protocol/src/pubsub/pubsubapi.rs index 5e63da8e5..5588e6fb7 100644 --- a/protocol/src/pubsub/pubsubapi.rs +++ b/protocol/src/pubsub/pubsubapi.rs @@ -16,21 +16,77 @@ // /////////////////////////////////////////////////////////////////////////////// //! Robonomics Publisher/Subscriber protocol. - -use crate::pubsub::{Gossipsub, Message, PubSub}; -use futures::{executor, StreamExt}; +//! +//! A basic pubsub client demonstrating libp2p and the gossipsub protocol. +//! +//! Using two terminal windows, start two instances. +//! ```sh +//! target/debug/robonomics --dev --tmp -l rpc=trace +//! target/debug/robonomics --dev --tmp --ws-port 9991 -l rpc=trace +//! ``` +//! +//! Then using two terminal windows, start two clients. +//! One of them will send messages, the other one will catch them and print. +//! You can of course open more terminal windows and add more participants. +//! +//! Pubsub subscribe: +//! +//! ```{python} +//! import time +//! import robonomicsinterface as RI +//! from robonomicsinterface import PubSub +//! +//! def subscription_handler(obj, update_nr, subscription_id): +//! print(obj['params']['result']) +//! if update_nr >= 2: +//! return 0 +//! +//! interface = RI.RobonomicsInterface(remote_ws="ws://127.0.0.1:9944") +//! pubsub = PubSub(interface) +//! +//! print(pubsub.listen("/ip4/127.0.0.1/tcp/44440")) +//! time.sleep(2) +//! print(pubsub.connect("/ip4/127.0.0.1/tcp/44441")) +//! print(pubsub.subscribe("topic_name", result_handler=subscription_handler)) +//! ``` +//! +//! Pubsub publish: +//! +//! ```{python} +//! import time +//! import robonomicsinterface as RI +//! from robonomicsinterface import PubSub +//! +//! interface = RI.RobonomicsInterface(remote_ws="ws://127.0.0.1:9991") +//! pubsub = PubSub(interface) +//! +//! print(pubsub.listen("/ip4/127.0.0.1/tcp/44441")) +//! time.sleep(2) +//! print(pubsub.connect("/ip4/127.0.0.1/tcp/44440")) +//! +//! for i in range(10): +//! time.sleep(2) +//! print("publish:", pubsub.publish("topic_name", "message_" + str(time.time()))) +//! ``` + +use crate::pubsub::{Gossipsub, PubSub}; +use futures::executor; use jsonrpc_core::Result; use jsonrpc_derive::rpc; -use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; +use jsonrpc_pubsub::{ + typed::{Sink, Subscriber}, + SubscriptionId, +}; use libp2p::Multiaddr; use rand::Rng; use std::sync::{Arc, Mutex}; -use std::{collections::HashMap, thread}; +use std::{collections::HashMap, str, thread}; #[derive(Clone)] pub struct PubSubApi { pubsub: Arc, - subscriptions: Arc>>, + subscriptions: Arc>>>, + topics: Arc>>, } impl PubSubApi { @@ -38,11 +94,12 @@ impl PubSubApi { PubSubApi { pubsub, subscriptions: Arc::new(Mutex::new(HashMap::new())), + topics: Arc::new(Mutex::new(HashMap::new())), } } } -#[rpc(server)] +#[rpc] pub trait PubSubT { type Metadata; @@ -102,43 +159,54 @@ impl PubSubT for PubSubApi { executor::block_on(async { self.pubsub.connect(address).await }).or(Ok(false)) } - fn subscribe(&self, _: Self::Metadata, subscriber: Subscriber, topic_name: String) { - let inbox = self.pubsub.subscribe(&topic_name); + fn subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber, topic_name: String) { + let mut inbox = self.pubsub.subscribe(&topic_name.clone()); let mut rng = rand::thread_rng(); let subscription_id = SubscriptionId::Number(rng.gen()); + let sink = subscriber.assign_id(subscription_id.clone()).unwrap(); self.subscriptions .lock() .unwrap() - .insert(subscription_id.clone(), topic_name); - - thread::spawn(move || { - let sink = subscriber.assign_id(subscription_id).unwrap(); - let _ = inbox.map(|m: Message| { - let _ = sink.notify(Ok(m.to_string())); - }); + .insert(subscription_id.clone(), sink.clone()); + self.topics + .lock() + .unwrap() + .insert(subscription_id, topic_name); + + thread::spawn(move || loop { + match inbox.try_next() { + // Message is fetched. + Ok(Some(message)) => { + if let Ok(message) = str::from_utf8(&message.data) { + let _ = sink.notify(Ok(message.to_string())); + } else { + continue; + } + } + // Channel is closed and no messages left in the queue. + Ok(None) => break, + + // There are no messages available, but channel is not yet closed. + Err(_) => {} + } }); } - fn unsubscribe( - &self, - _: Option, - subscription_id: SubscriptionId, - ) -> Result { - if let Some(topic_name) = self.subscriptions.lock().unwrap().get(&subscription_id) { - self.pubsub.unsubscribe(&topic_name); - self.subscriptions.lock().unwrap().remove(&subscription_id); - Ok(true) - } else { - Ok(false) - } + fn unsubscribe(&self, _meta: Option, sid: SubscriptionId) -> Result { + if let Some(topic_name) = self.topics.lock().unwrap().remove(&sid) { + let _ = self.subscriptions.lock().unwrap().remove(&sid); + let _ = executor::block_on(async { self.pubsub.unsubscribe(&topic_name).await }); + }; + + Ok(true) } fn publish(&self, topic_name: String, message: String) -> Result { executor::block_on(async { self.pubsub .publish(&topic_name, message.as_bytes().to_vec()) - }); - - Ok(true) + .await + }) + .or(Ok(false)) } }