Skip to content

Commit

Permalink
log bandwidth at StreamMuxer level
Browse files Browse the repository at this point in the history
An alternative approach to libp2p#3161 suggested in libp2p#3157 (comment)

Closes libp2p#3157
  • Loading branch information
melekes committed Nov 30, 2022
1 parent be0b62a commit be5d150
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 128 deletions.
123 changes: 41 additions & 82 deletions src/bandwidth.rs → core/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
core::{
transport::{TransportError, TransportEvent},
Transport,
},
Multiaddr,
};
use crate::muxing::{StreamMuxer, StreamMuxerEvent};

use futures::{
io::{IoSlice, IoSliceMut},
prelude::*,
ready,
};
use libp2p_core::transport::ListenerId;
use std::{
convert::TryFrom as _,
io,
Expand All @@ -43,8 +36,8 @@ use std::{
task::{Context, Poll},
};

/// Wraps around a `Transport` and counts the number of bytes that go through all the opened
/// connections.
/// Wraps around a `StreamMuxer` and counts the number of bytes that go through all the opened
/// streams.
#[derive(Clone)]
#[pin_project::pin_project]
pub struct BandwidthLogging<TInner> {
Expand All @@ -54,99 +47,57 @@ pub struct BandwidthLogging<TInner> {
}

impl<TInner> BandwidthLogging<TInner> {
/// Creates a new [`BandwidthLogging`] around the transport.
pub fn new(inner: TInner) -> (Self, Arc<BandwidthSinks>) {
let sink = Arc::new(BandwidthSinks {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
});

let trans = BandwidthLogging {
inner,
sinks: sink.clone(),
};

(trans, sink)
/// Creates a new [`BandwidthLogging`] around the stream muxer.
pub fn new(inner: TInner, sinks: Arc<BandwidthSinks>) -> Self {
Self { inner, sinks }
}
}

impl<TInner> Transport for BandwidthLogging<TInner>
impl<SMInner> StreamMuxer for BandwidthLogging<SMInner>
where
TInner: Transport,
SMInner: StreamMuxer,
{
type Output = BandwidthConnecLogging<TInner::Output>;
type Error = TInner::Error;
type ListenerUpgrade = BandwidthFuture<TInner::ListenerUpgrade>;
type Dial = BandwidthFuture<TInner::Dial>;
type Substream = BandwidthConnecLogging<SMInner::Substream>;
type Error = SMInner::Error;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(event) => {
let event = event.map_upgrade({
let sinks = this.sinks.clone();
|inner| BandwidthFuture { inner, sinks }
});
Poll::Ready(event)
}
Poll::Pending => Poll::Pending,
}
this.inner.poll(cx)
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner.listen_on(addr)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
self.inner.remove_listener(id)
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks.clone();
self.inner
.dial(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}

fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
let sinks = self.sinks.clone();
self.inner
.dial_as_listener(addr)
.map(move |fut| BandwidthFuture { inner: fut, sinks })
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
let inner = ready!(this.inner.poll_inbound(cx)?);
let logged = BandwidthConnecLogging {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}
}

/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth
/// counter.
#[pin_project::pin_project]
pub struct BandwidthFuture<TInner> {
#[pin]
inner: TInner,
sinks: Arc<BandwidthSinks>,
}

impl<TInner: TryFuture> Future for BandwidthFuture<TInner> {
type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let this = self.project();
let inner = ready!(this.inner.try_poll(cx)?);
let inner = ready!(this.inner.poll_outbound(cx)?);
let logged = BandwidthConnecLogging {
inner,
sinks: this.sinks.clone(),
};
Poll::Ready(Ok(logged))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.inner.poll_close(cx)
}
}

/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`].
Expand All @@ -156,6 +107,14 @@ pub struct BandwidthSinks {
}

impl BandwidthSinks {
/// Returns a new [`BandwidthSinks`].
pub fn new() -> Arc<Self> {
Arc::new(Self {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
})
}

/// Returns the total number of bytes that have been downloaded on all the connections spawned
/// through the [`BandwidthLogging`].
///
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub type Negotiated<T> = multistream_select::Negotiated<T>;
mod peer_id;
mod translation;

pub mod bandwidth;
pub mod connection;
pub mod either;
pub mod identity;
Expand Down
4 changes: 0 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ pub use libp2p_websocket as websocket;
#[doc(inline)]
pub use libp2p_yamux as yamux;

mod transport_ext;

pub mod bandwidth;
pub mod simple;

#[cfg(doc)]
Expand All @@ -154,7 +151,6 @@ pub use self::core::{
pub use self::multiaddr::{multiaddr as build_multiaddr, Multiaddr};
pub use self::simple::SimpleProtocol;
pub use self::swarm::Swarm;
pub use self::transport_ext::TransportExt;

/// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p:
///
Expand Down
42 changes: 0 additions & 42 deletions src/transport_ext.rs

This file was deleted.

20 changes: 20 additions & 0 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream};
use libp2p_core::connection::ConnectionId;
use libp2p_core::muxing::SubstreamBox;
use libp2p_core::{
bandwidth::{BandwidthLogging, BandwidthSinks},
connection::ConnectedPoint,
multiaddr::Protocol,
multihash::Multihash,
Expand All @@ -143,6 +144,7 @@ use std::{
convert::TryFrom,
error, fmt, io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use upgrade::UpgradeInfoSend as _;
Expand Down Expand Up @@ -457,6 +459,24 @@ where
SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build()
}

/// Adds a layer on the `Swarm` that logs all trafic that passes through the sockets
/// created by it.
///
/// This method returns an `Arc<BandwidthSinks>` that can be used to retreive the total number
/// of bytes transferred through the sockets.
pub fn with_bandwidth_logging(mut self) -> Arc<BandwidthSinks> {
let sinks = BandwidthSinks::new();
let sinks_copy = sinks.clone();
self.transport = Transport::map(self.transport, |(peer_id, stream_muxer_box), _| {
(
peer_id,
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)),
)
})
.boxed();
sinks
}

/// Returns information about the connections underlying the [`Swarm`].
pub fn network_info(&self) -> NetworkInfo {
let num_peers = self.pool.num_peers();
Expand Down

0 comments on commit be5d150

Please sign in to comment.