Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/mdns: Make libp2p-mdns socket agnostic #1699

Merged
merged 5 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ default = [
"identify",
"kad",
"gossipsub",
"mdns",
"mdns-async-std",
"mplex",
"noise",
"ping",
Expand All @@ -38,7 +38,8 @@ floodsub = ["libp2p-floodsub"]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
gossipsub = ["libp2p-gossipsub"]
mdns = ["libp2p-mdns"]
mdns-async-std = ["libp2p-mdns", "libp2p-mdns/async-std"]
mdns-tokio = ["libp2p-mdns", "libp2p-mdns/tokio"]
mplex = ["libp2p-mplex"]
noise = ["libp2p-noise"]
ping = ["libp2p-ping"]
Expand Down Expand Up @@ -96,6 +97,7 @@ libp2p-websocket = { version = "0.22.0", path = "transports/websocket", optional
[dev-dependencies]
async-std = "1.6.2"
env_logger = "0.7.1"
tokio = { version = "0.2", features = ["io-util", "io-std", "stream"] }

[workspace]
members = [
Expand Down
3 changes: 2 additions & 1 deletion protocols/mdns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", optional = true }
data-encoding = "2.0"
dns-parser = "0.8"
either = "1.5.3"
Expand All @@ -22,6 +22,7 @@ log = "0.4"
net2 = "0.2"
rand = "0.7"
smallvec = "1.0"
tokio = { version = "0.2", default-features = false, features = ["udp"], optional = true }
void = "1.0"
wasm-timer = "0.2.4"

Expand Down
210 changes: 111 additions & 99 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::service::{MdnsService, MdnsPacket, build_query_response, build_service_discovery_response};
use crate::service::{MdnsPacket, build_query_response, build_service_discovery_response};
use futures::prelude::*;
use libp2p_core::{
Multiaddr,
Expand All @@ -41,11 +41,14 @@ use wasm_timer::{Delay, Instant};

const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60);

macro_rules! codegen {
($feature_name:expr, $behaviour_name:ident, $maybe_busy_wrapper:ident, $service_name:ty) => {

/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
pub struct Mdns {
pub struct $behaviour_name {
/// The inner service.
service: MaybeBusyMdnsService,
service: $maybe_busy_wrapper,

/// List of nodes that we have discovered, the address, and when their TTL expires.
///
Expand All @@ -63,37 +66,37 @@ pub struct Mdns {
/// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free`
/// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an
/// `MdnsPacket`.
enum MaybeBusyMdnsService {
Free(MdnsService),
Busy(Pin<Box<dyn Future<Output = (MdnsService, MdnsPacket)> + Send>>),
enum $maybe_busy_wrapper {
Free($service_name),
Busy(Pin<Box<dyn Future<Output = ($service_name, MdnsPacket)> + Send>>),
Poisoned,
}

impl fmt::Debug for MaybeBusyMdnsService {
impl fmt::Debug for $maybe_busy_wrapper {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MaybeBusyMdnsService::Free(service) => {
fmt.debug_struct("MaybeBusyMdnsService::Free")
$maybe_busy_wrapper::Free(service) => {
fmt.debug_struct("$maybe_busy_wrapper::Free")
.field("service", service)
.finish()
},
MaybeBusyMdnsService::Busy(_) => {
fmt.debug_struct("MaybeBusyMdnsService::Busy")
$maybe_busy_wrapper::Busy(_) => {
fmt.debug_struct("$maybe_busy_wrapper::Busy")
.finish()
}
MaybeBusyMdnsService::Poisoned => {
fmt.debug_struct("MaybeBusyMdnsService::Poisoned")
$maybe_busy_wrapper::Poisoned => {
fmt.debug_struct("$maybe_busy_wrapper::Poisoned")
.finish()
}
}
}
}

impl Mdns {
impl $behaviour_name {
/// Builds a new `Mdns` behaviour.
pub fn new() -> io::Result<Mdns> {
Ok(Mdns {
service: MaybeBusyMdnsService::Free(MdnsService::new()?),
pub fn new() -> io::Result<$behaviour_name> {
Ok($behaviour_name {
service: $maybe_busy_wrapper::Free(<$service_name>::new()?),
discovered_nodes: SmallVec::new(),
closest_expiration: None,
})
Expand All @@ -110,78 +113,7 @@ impl Mdns {
}
}

/// Event that can be produced by the `Mdns` behaviour.
#[derive(Debug)]
pub enum MdnsEvent {
/// Discovered nodes through mDNS.
Discovered(DiscoveredAddrsIter),

/// The given combinations of `PeerId` and `Multiaddr` have expired.
///
/// Each discovered record has a time-to-live. When this TTL expires and the address hasn't
/// been refreshed, we remove it from the list and emit it as an `Expired` event.
Expired(ExpiredAddrsIter),
}

/// Iterator that produces the list of addresses that have been discovered.
pub struct DiscoveredAddrsIter {
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
}

impl Iterator for DiscoveredAddrsIter {
type Item = (PeerId, Multiaddr);

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

impl ExactSizeIterator for DiscoveredAddrsIter {
}

impl fmt::Debug for DiscoveredAddrsIter {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("DiscoveredAddrsIter")
.finish()
}
}

/// Iterator that produces the list of addresses that have expired.
pub struct ExpiredAddrsIter {
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
}

impl Iterator for ExpiredAddrsIter {
type Item = (PeerId, Multiaddr);

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

impl ExactSizeIterator for ExpiredAddrsIter {
}

impl fmt::Debug for ExpiredAddrsIter {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("ExpiredAddrsIter")
.finish()
}
}

impl NetworkBehaviour for Mdns {
impl NetworkBehaviour for $behaviour_name {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = MdnsEvent;

Expand Down Expand Up @@ -247,32 +179,32 @@ impl NetworkBehaviour for Mdns {

// Polling the mDNS service, and obtain the list of nodes discovered this round.
let discovered = loop {
let service = mem::replace(&mut self.service, MaybeBusyMdnsService::Poisoned);
let service = mem::replace(&mut self.service, $maybe_busy_wrapper::Poisoned);

let packet = match service {
MaybeBusyMdnsService::Free(service) => {
self.service = MaybeBusyMdnsService::Busy(Box::pin(service.next()));
$maybe_busy_wrapper::Free(service) => {
self.service = $maybe_busy_wrapper::Busy(Box::pin(service.next()));
continue;
},
MaybeBusyMdnsService::Busy(mut fut) => {
$maybe_busy_wrapper::Busy(mut fut) => {
match fut.as_mut().poll(cx) {
Poll::Ready((service, packet)) => {
self.service = MaybeBusyMdnsService::Free(service);
self.service = $maybe_busy_wrapper::Free(service);
packet
},
Poll::Pending => {
self.service = MaybeBusyMdnsService::Busy(fut);
self.service = $maybe_busy_wrapper::Busy(fut);
return Poll::Pending;
}
}
},
MaybeBusyMdnsService::Poisoned => panic!("Mdns poisoned"),
$maybe_busy_wrapper::Poisoned => panic!("Mdns poisoned"),
};

match packet {
MdnsPacket::Query(query) => {
// MaybeBusyMdnsService should always be Free.
if let MaybeBusyMdnsService::Free(ref mut service) = self.service {
if let $maybe_busy_wrapper::Free(ref mut service) = self.service {
let resp = build_query_response(
query.query_id(),
params.local_peer_id().clone(),
Expand Down Expand Up @@ -324,7 +256,7 @@ impl NetworkBehaviour for Mdns {
},
MdnsPacket::ServiceDiscovery(disc) => {
// MaybeBusyMdnsService should always be Free.
if let MaybeBusyMdnsService::Free(ref mut service) = self.service {
if let $maybe_busy_wrapper::Free(ref mut service) = self.service {
let resp = build_service_discovery_response(
disc.query_id(),
MDNS_RESPONSE_TTL,
Expand All @@ -349,10 +281,90 @@ impl NetworkBehaviour for Mdns {
}
}

impl fmt::Debug for Mdns {
impl fmt::Debug for $behaviour_name {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Mdns")
.field("service", &self.service)
.finish()
}
}

};
}

#[cfg(feature = "async-std")]
codegen!("async-std", Mdns, MaybeBusyMdnsService, crate::service::MdnsService);

#[cfg(feature = "tokio")]
codegen!("tokio", TokioMdns, MaybeBusyTokioMdnsService, crate::service::TokioMdnsService);

/// Event that can be produced by the `Mdns` behaviour.
#[derive(Debug)]
pub enum MdnsEvent {
/// Discovered nodes through mDNS.
Discovered(DiscoveredAddrsIter),

/// The given combinations of `PeerId` and `Multiaddr` have expired.
///
/// Each discovered record has a time-to-live. When this TTL expires and the address hasn't
/// been refreshed, we remove it from the list and emit it as an `Expired` event.
Expired(ExpiredAddrsIter),
}

/// Iterator that produces the list of addresses that have been discovered.
pub struct DiscoveredAddrsIter {
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
}

impl Iterator for DiscoveredAddrsIter {
type Item = (PeerId, Multiaddr);

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

impl ExactSizeIterator for DiscoveredAddrsIter {
}

impl fmt::Debug for DiscoveredAddrsIter {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("DiscoveredAddrsIter")
.finish()
}
}

/// Iterator that produces the list of addresses that have expired.
pub struct ExpiredAddrsIter {
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
}

impl Iterator for ExpiredAddrsIter {
type Item = (PeerId, Multiaddr);

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

impl ExactSizeIterator for ExpiredAddrsIter {
}

impl fmt::Debug for ExpiredAddrsIter {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("ExpiredAddrsIter")
.finish()
}
}
8 changes: 6 additions & 2 deletions protocols/mdns/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ const SERVICE_NAME: &[u8] = b"_p2p._udp.local";
/// Hardcoded name of the service used for DNS-SD.
const META_QUERY_SERVICE: &[u8] = b"_services._dns-sd._udp.local";

pub use self::behaviour::{Mdns, MdnsEvent};
pub use self::service::MdnsService;
#[cfg(feature = "async-std")]
pub use self::{behaviour::Mdns, service::MdnsService};
#[cfg(feature = "tokio")]
pub use self::{behaviour::TokioMdns, service::TokioMdnsService};

pub use self::behaviour::MdnsEvent;

mod behaviour;
mod dns;
Expand Down
Loading