Skip to content

Commit

Permalink
Upgrade libp2p to version 0.53.1
Browse files Browse the repository at this point in the history
Upgrade libp2p to version 0.53.1 using a forked version for now due
to the lack of support of workers in the `websocket-websys`
transport.
This also modifies most of the network code to use the adopted libp2p
naming standard.

Co-authored-by: hrxi <[email protected]>
  • Loading branch information
hrxi authored and jsdanielh committed Nov 17, 2023
1 parent 6bf3132 commit d13d4aa
Show file tree
Hide file tree
Showing 27 changed files with 1,552 additions and 2,621 deletions.
2,416 changes: 666 additions & 1,750 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,5 @@ tokio-console = ["console-subscriber", "logging", "tokio/tracing"]
tokio-websocket = ["nimiq-network-libp2p/tokio-websocket"]
validator = ["database-storage", "nimiq-mempool", "nimiq-validator", "nimiq-validator-network", "nimiq-rpc-server"]
wallet = ["database-storage", "nimiq-wallet"]
wasm-websocket = ["nimiq-network-libp2p/wasm-websocket"]
web-logging = ["nimiq-log", "time/wasm-bindgen", "tracing-subscriber", "tracing-web"]
zkp-prover = ["nimiq-zkp/zkp-prover", "nimiq-zkp-circuits/zkp-prover", "nimiq-zkp-component/zkp-prover", "nimiq-zkp-primitives/zkp-prover"]
2 changes: 1 addition & 1 deletion network-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async-trait = "0.1"
bitflags = { version = "2.0", features = ["serde"] }
futures = { package = "futures-util", version = "0.3" }
log = { package = "tracing", version = "0.1", features = ["log"] }
multiaddr = "0.16"
multiaddr = "0.18"
serde = "1.0"
thiserror = "1.0"
tokio = { version = "1.32", features = ["rt"] }
Expand Down
11 changes: 7 additions & 4 deletions network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ futures = { package = "futures-util", version = "0.3" }
hex = "0.4"
instant = { version = "0.1", features = [ "wasm-bindgen" ] }
ip_network = "0.4"
libp2p-websys-transport = { git = "https://github.com/jsdanielh/libp2p-websys-transport.git", optional = true }
log = { package = "tracing", version = "0.1", features = ["log"] }
parking_lot = "0.12"
pin-project = "1.1"
Expand All @@ -36,6 +35,7 @@ serde-big-array = "0.5"
thiserror = "1.0"
tokio = { version = "1.32", features = ["macros", "rt", "tracing"] }
tokio-stream = "0.1"
void = "1.0"
wasm-timer = "0.2"

nimiq-bls = { workspace = true }
Expand All @@ -51,8 +51,9 @@ nimiq-utils = { workspace = true, features = [
] }
nimiq-validator-network = { workspace = true }

# For now we need to use a fork
[target.'cfg(not(target_family = "wasm"))'.dependencies]
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-features = false, features = [
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", branch = "libp2p_0.53.1", default-features = false, features = [
"gossipsub",
"identify",
"kad",
Expand All @@ -61,11 +62,13 @@ libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-feature
"ping",
"request-response",
"serde",
"tokio",
"yamux",
] }

# For now we need to use a fork
[target.'cfg(target_family = "wasm")'.dependencies]
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-features = false, features = [
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", branch = "libp2p_0.53.1", default-features = false, features = [
"gossipsub",
"identify",
"kad",
Expand All @@ -76,6 +79,7 @@ libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-feature
"serde",
"yamux",
"wasm-bindgen",
"websocket-websys",
] }

[dev-dependencies]
Expand All @@ -88,4 +92,3 @@ nimiq-test-log = { workspace = true }
metrics = ["prometheus-client"]
tokio-time = ["tokio/time"]
tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"]
wasm-websocket = ["libp2p-websys-transport"]
1 change: 1 addition & 0 deletions network-libp2p/TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
HandlerOutEvent → HandlerToSwarm?
181 changes: 91 additions & 90 deletions network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,178 +1,178 @@
use std::{iter, sync::Arc};

use libp2p::{
core::either::EitherError,
gossipsub::{
error::GossipsubHandlerError, Gossipsub, GossipsubEvent, MessageAuthenticity,
PeerScoreParams, PeerScoreThresholds,
},
identify::{Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent},
kad::{store::MemoryStore, Kademlia, KademliaEvent},
ping::{
Behaviour as PingBehaviour, Config as PingConfig, Event as PingEvent,
Failure as PingFailure,
},
request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent as ReqResEvent,
},
swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour},
Multiaddr, PeerId,
connection_limits, gossipsub, identify,
kad::{self, store::MemoryStore},
ping, request_response,
swarm::NetworkBehaviour,
Multiaddr, PeerId, StreamProtocol,
};
use nimiq_utils::time::OffsetTime;
use parking_lot::RwLock;
use void::Void;

use crate::{
connection_pool::{
behaviour::{ConnectionPoolBehaviour, ConnectionPoolEvent},
handler::ConnectionPoolHandlerError,
},
discovery::{
behaviour::{DiscoveryBehaviour, DiscoveryEvent},
handler::DiscoveryHandlerError,
peer_contacts::PeerContactBook,
},
dispatch::codecs::typed::{IncomingRequest, MessageCodec, OutgoingResponse, ReqResProtocol},
connection_pool,
discovery::{self, peer_contacts::PeerContactBook},
dispatch::codecs::typed::{IncomingRequest, MessageCodec, OutgoingResponse},
Config,
};

pub type NimiqNetworkBehaviourError = EitherError<
EitherError<
EitherError<
EitherError<
EitherError<
EitherError<std::io::Error, DiscoveryHandlerError>,
GossipsubHandlerError,
>,
std::io::Error,
>,
PingFailure,
>,
ConnectionPoolHandlerError,
>,
ConnectionHandlerUpgrErr<std::io::Error>,
>;

pub type RequestResponseEvent = ReqResEvent<IncomingRequest, OutgoingResponse>;
/// Maximum simultaneous libp2p connections per peer
const MAX_CONNECTIONS_PER_PEER: u32 = 2;

pub type RequestResponseEvent = request_response::Event<IncomingRequest, OutgoingResponse>;

#[derive(Debug)]
pub enum NimiqEvent {
Dht(KademliaEvent),
Discovery(DiscoveryEvent),
Gossip(GossipsubEvent),
Identify(IdentifyEvent),
Ping(PingEvent),
Pool(ConnectionPoolEvent),
pub enum Event {
Dht(kad::Event),
Discovery(discovery::Event),
Gossip(gossipsub::Event),
Identify(identify::Event),
Ping(ping::Event),
Pool(connection_pool::Event),
RequestResponse(RequestResponseEvent),
Void,
}

impl From<KademliaEvent> for NimiqEvent {
fn from(event: KademliaEvent) -> Self {
impl From<kad::Event> for Event {
fn from(event: kad::Event) -> Self {
Self::Dht(event)
}
}

impl From<DiscoveryEvent> for NimiqEvent {
fn from(event: DiscoveryEvent) -> Self {
impl From<discovery::Event> for Event {
fn from(event: discovery::Event) -> Self {
Self::Discovery(event)
}
}

impl From<GossipsubEvent> for NimiqEvent {
fn from(event: GossipsubEvent) -> Self {
impl From<gossipsub::Event> for Event {
fn from(event: gossipsub::Event) -> Self {
Self::Gossip(event)
}
}

impl From<IdentifyEvent> for NimiqEvent {
fn from(event: IdentifyEvent) -> Self {
impl From<identify::Event> for Event {
fn from(event: identify::Event) -> Self {
Self::Identify(event)
}
}

impl From<ConnectionPoolEvent> for NimiqEvent {
fn from(event: ConnectionPoolEvent) -> Self {
impl From<connection_pool::Event> for Event {
fn from(event: connection_pool::Event) -> Self {
Self::Pool(event)
}
}

impl From<PingEvent> for NimiqEvent {
fn from(event: PingEvent) -> Self {
impl From<ping::Event> for Event {
fn from(event: ping::Event) -> Self {
Self::Ping(event)
}
}

impl From<RequestResponseEvent> for NimiqEvent {
impl From<RequestResponseEvent> for Event {
fn from(event: RequestResponseEvent) -> Self {
Self::RequestResponse(event)
}
}

impl From<Void> for Event {
fn from(_event: Void) -> Self {
Self::Void
}
}

/// Network behaviour.
/// This is composed of several other behaviours that build a tree of behaviours using
/// the `NetworkBehaviour` macro and the order of listed behaviours matters.
/// The first behaviours are behaviours that can close connections before establishing them
/// such as connection limits and the connection pool. They must be at the top since they
/// other behaviours such as request-response do not handle well that a connection is
/// denied in a behaviour that is "after".
/// See: https://github.com/libp2p/rust-libp2p/pull/4777#discussion_r1389951783.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "NimiqEvent")]
pub struct NimiqBehaviour {
pub dht: Kademlia<MemoryStore>,
pub discovery: DiscoveryBehaviour,
pub gossipsub: Gossipsub,
pub identify: IdentifyBehaviour,
pub ping: PingBehaviour,
pub pool: ConnectionPoolBehaviour,
pub request_response: RequestResponse<MessageCodec>,
#[behaviour(out_event = "Event")]
pub struct Behaviour {
pub connection_limits: connection_limits::Behaviour,
pub pool: connection_pool::Behaviour,
pub discovery: discovery::Behaviour,
pub dht: kad::Behaviour<MemoryStore>,
pub gossipsub: gossipsub::Behaviour,
pub identify: identify::Behaviour,
pub ping: ping::Behaviour,
pub request_response: request_response::Behaviour<MessageCodec>,
}

impl NimiqBehaviour {
impl Behaviour {
pub fn new(
config: Config,
clock: Arc<OffsetTime>,
contacts: Arc<RwLock<PeerContactBook>>,
peer_score_params: PeerScoreParams,
peer_score_params: gossipsub::PeerScoreParams,
) -> Self {
let public_key = config.keypair.public();
let peer_id = public_key.to_peer_id();

// DHT behaviour
let store = MemoryStore::new(peer_id);
let dht = Kademlia::with_config(peer_id, store, config.kademlia);
let mut dht = kad::Behaviour::with_config(peer_id, store, config.kademlia);
// Fixme: This could be avoided with a protocol such as Autonat that properly set external addresses to the
// swarm and also avoids us to add addresses that are purely connection candidates.
dht.set_mode(Some(kad::Mode::Server));

// Discovery behaviour
let discovery = DiscoveryBehaviour::new(
let discovery = discovery::Behaviour::new(
config.discovery.clone(),
config.keypair.clone(),
Arc::clone(&contacts),
clock,
);

// Gossipsub behaviour
let thresholds = PeerScoreThresholds::default();
let mut gossipsub = Gossipsub::new(MessageAuthenticity::Author(peer_id), config.gossipsub)
.expect("Wrong configuration");
let thresholds = gossipsub::PeerScoreThresholds::default();
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Author(peer_id),
config.gossipsub,
)
.expect("Wrong configuration");
gossipsub
.with_peer_score(peer_score_params, thresholds)
.expect("Valid score params and thresholds");

// Identify behaviour
let identify_config = IdentifyConfig::new("/albatross/2.0".to_string(), public_key);
let identify = IdentifyBehaviour::new(identify_config);
let identify_config = identify::Config::new("/albatross/2.0".to_string(), public_key);
let identify = identify::Behaviour::new(identify_config);

// Ping behaviour:
// - Send a ping every 15 seconds and timeout at 20 seconds.
// - The ping behaviour will close the connection if a ping timeouts.
let ping = PingBehaviour::new(PingConfig::new());
let ping = ping::Behaviour::new(ping::Config::new());

// Connection pool behaviour
let pool = ConnectionPoolBehaviour::new(
let pool = connection_pool::Behaviour::new(
Arc::clone(&contacts),
peer_id,
config.seeds,
config.discovery.required_services,
);

// Request Response behaviour
let codec = MessageCodec::default();
let protocol = ReqResProtocol::Version1;
let config = RequestResponseConfig::default();
let request_response =
RequestResponse::new(codec, iter::once((protocol, ProtocolSupport::Full)), config);
let protocol = StreamProtocol::new("/nimiq/reqres/0.0.1");
let config = request_response::Config::default();
let request_response = request_response::Behaviour::new(
iter::once((protocol, request_response::ProtocolSupport::Full)),
config,
);

// Connection limits behaviour
let limits = connection_limits::ConnectionLimits::default()
.with_max_pending_incoming(Some(16))
.with_max_pending_outgoing(Some(16))
.with_max_established_incoming(Some(4800))
.with_max_established_outgoing(Some(4800))
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
let connection_limits = connection_limits::Behaviour::new(limits);

Self {
dht,
Expand All @@ -182,6 +182,7 @@ impl NimiqBehaviour {
ping,
pool,
request_response,
connection_limits,
}
}

Expand Down
Loading

0 comments on commit d13d4aa

Please sign in to comment.