Skip to content

Commit

Permalink
Upgrade libp2p to version 0.53.1
Browse files Browse the repository at this point in the history
Upgrade libp2p to version 0.53.1 using a forked version for now due
to the lack of support of workers in the `websocket-websys`
transport.
This also modifies most of the network code to use the adopted libp2p
naming standard.

Co-authored-by: hrxi <[email protected]>
  • Loading branch information
hrxi authored and jsdanielh committed Nov 20, 2023
1 parent c32f338 commit c447dc2
Show file tree
Hide file tree
Showing 27 changed files with 1,647 additions and 2,700 deletions.
2,438 changes: 675 additions & 1,763 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,5 @@ tokio-console = ["console-subscriber", "logging", "tokio/tracing"]
tokio-websocket = ["nimiq-network-libp2p/tokio-websocket"]
validator = ["database-storage", "nimiq-mempool", "nimiq-validator", "nimiq-validator-network", "nimiq-rpc-server"]
wallet = ["database-storage", "nimiq-wallet"]
wasm-websocket = ["nimiq-network-libp2p/wasm-websocket"]
web-logging = ["nimiq-log", "time/wasm-bindgen", "tracing-subscriber", "tracing-web"]
zkp-prover = ["nimiq-zkp/zkp-prover", "nimiq-zkp-circuits/zkp-prover", "nimiq-zkp-component/zkp-prover", "nimiq-zkp-primitives/zkp-prover"]
2 changes: 1 addition & 1 deletion network-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async-trait = "0.1"
bitflags = { version = "2.0", features = ["serde"] }
futures = { package = "futures-util", version = "0.3" }
log = { package = "tracing", version = "0.1", features = ["log"] }
multiaddr = "0.16"
multiaddr = "0.18"
serde = "1.0"
thiserror = "1.0"
tokio = { version = "1.32", features = ["rt"] }
Expand Down
12 changes: 8 additions & 4 deletions network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ futures = { package = "futures-util", version = "0.3" }
hex = "0.4"
instant = { version = "0.1", features = [ "wasm-bindgen" ] }
ip_network = "0.4"
libp2p-websys-transport = { git = "https://github.com/jsdanielh/libp2p-websys-transport.git", optional = true }
log = { package = "tracing", version = "0.1", features = ["log"] }
parking_lot = "0.12"
pin-project = "1.1"
Expand All @@ -36,6 +35,8 @@ serde-big-array = "0.5"
thiserror = "1.0"
tokio = { version = "1.32", features = ["macros", "rt", "tracing"] }
tokio-stream = "0.1"
unsigned-varint = "0.8"
void = "1.0"
wasm-timer = "0.2"

nimiq-bls = { workspace = true }
Expand All @@ -51,8 +52,9 @@ nimiq-utils = { workspace = true, features = [
] }
nimiq-validator-network = { workspace = true }

# For now we need to use a fork
[target.'cfg(not(target_family = "wasm"))'.dependencies]
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-features = false, features = [
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", branch = "libp2p_0.53.1", default-features = false, features = [
"gossipsub",
"identify",
"kad",
Expand All @@ -61,11 +63,13 @@ libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-feature
"ping",
"request-response",
"serde",
"tokio",
"yamux",
] }

# For now we need to use a fork
[target.'cfg(target_family = "wasm")'.dependencies]
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-features = false, features = [
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", branch = "libp2p_0.53.1", default-features = false, features = [
"gossipsub",
"identify",
"kad",
Expand All @@ -76,6 +80,7 @@ libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-feature
"serde",
"yamux",
"wasm-bindgen",
"websocket-websys",
] }

[dev-dependencies]
Expand All @@ -88,4 +93,3 @@ nimiq-test-log = { workspace = true }
metrics = ["prometheus-client"]
tokio-time = ["tokio/time"]
tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"]
wasm-websocket = ["libp2p-websys-transport"]
185 changes: 61 additions & 124 deletions network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,178 +1,114 @@
use std::{iter, sync::Arc};

use libp2p::{
core::either::EitherError,
gossipsub::{
error::GossipsubHandlerError, Gossipsub, GossipsubEvent, MessageAuthenticity,
PeerScoreParams, PeerScoreThresholds,
},
identify::{Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent},
kad::{store::MemoryStore, Kademlia, KademliaEvent},
ping::{
Behaviour as PingBehaviour, Config as PingConfig, Event as PingEvent,
Failure as PingFailure,
},
request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent as ReqResEvent,
},
swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour},
Multiaddr, PeerId,
connection_limits, gossipsub, identify,
kad::{self, store::MemoryStore},
ping, request_response,
swarm::NetworkBehaviour,
Multiaddr, PeerId, StreamProtocol,
};
use nimiq_utils::time::OffsetTime;
use parking_lot::RwLock;

use crate::{
connection_pool::{
behaviour::{ConnectionPoolBehaviour, ConnectionPoolEvent},
handler::ConnectionPoolHandlerError,
},
discovery::{
behaviour::{DiscoveryBehaviour, DiscoveryEvent},
handler::DiscoveryHandlerError,
peer_contacts::PeerContactBook,
},
dispatch::codecs::typed::{IncomingRequest, MessageCodec, OutgoingResponse, ReqResProtocol},
connection_pool,
discovery::{self, peer_contacts::PeerContactBook},
dispatch::codecs::typed::MessageCodec,
Config,
};

pub type NimiqNetworkBehaviourError = EitherError<
EitherError<
EitherError<
EitherError<
EitherError<
EitherError<std::io::Error, DiscoveryHandlerError>,
GossipsubHandlerError,
>,
std::io::Error,
>,
PingFailure,
>,
ConnectionPoolHandlerError,
>,
ConnectionHandlerUpgrErr<std::io::Error>,
>;

pub type RequestResponseEvent = ReqResEvent<IncomingRequest, OutgoingResponse>;

#[derive(Debug)]
pub enum NimiqEvent {
Dht(KademliaEvent),
Discovery(DiscoveryEvent),
Gossip(GossipsubEvent),
Identify(IdentifyEvent),
Ping(PingEvent),
Pool(ConnectionPoolEvent),
RequestResponse(RequestResponseEvent),
}

impl From<KademliaEvent> for NimiqEvent {
fn from(event: KademliaEvent) -> Self {
Self::Dht(event)
}
}

impl From<DiscoveryEvent> for NimiqEvent {
fn from(event: DiscoveryEvent) -> Self {
Self::Discovery(event)
}
}

impl From<GossipsubEvent> for NimiqEvent {
fn from(event: GossipsubEvent) -> Self {
Self::Gossip(event)
}
}

impl From<IdentifyEvent> for NimiqEvent {
fn from(event: IdentifyEvent) -> Self {
Self::Identify(event)
}
}

impl From<ConnectionPoolEvent> for NimiqEvent {
fn from(event: ConnectionPoolEvent) -> Self {
Self::Pool(event)
}
}

impl From<PingEvent> for NimiqEvent {
fn from(event: PingEvent) -> Self {
Self::Ping(event)
}
}

impl From<RequestResponseEvent> for NimiqEvent {
fn from(event: RequestResponseEvent) -> Self {
Self::RequestResponse(event)
}
}

/// Maximum simultaneous libp2p connections per peer
const MAX_CONNECTIONS_PER_PEER: u32 = 2;

/// Network behaviour.
/// This is composed of several other behaviours that build a tree of behaviours using
/// the `NetworkBehaviour` macro and the order of listed behaviours matters.
/// The first behaviours are behaviours that can close connections before establishing them
/// such as connection limits and the connection pool. They must be at the top since they
/// other behaviours such as request-response do not handle well that a connection is
/// denied in a behaviour that is "after".
/// See: https://github.com/libp2p/rust-libp2p/pull/4777#discussion_r1389951783.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "NimiqEvent")]
pub struct NimiqBehaviour {
pub dht: Kademlia<MemoryStore>,
pub discovery: DiscoveryBehaviour,
pub gossipsub: Gossipsub,
pub identify: IdentifyBehaviour,
pub ping: PingBehaviour,
pub pool: ConnectionPoolBehaviour,
pub request_response: RequestResponse<MessageCodec>,
pub struct Behaviour {
pub connection_limits: connection_limits::Behaviour,
pub pool: connection_pool::Behaviour,
pub discovery: discovery::Behaviour,
pub dht: kad::Behaviour<MemoryStore>,
pub gossipsub: gossipsub::Behaviour,
pub identify: identify::Behaviour,
pub ping: ping::Behaviour,
pub request_response: request_response::Behaviour<MessageCodec>,
}

impl NimiqBehaviour {
impl Behaviour {
pub fn new(
config: Config,
clock: Arc<OffsetTime>,
contacts: Arc<RwLock<PeerContactBook>>,
peer_score_params: PeerScoreParams,
peer_score_params: gossipsub::PeerScoreParams,
) -> Self {
let public_key = config.keypair.public();
let peer_id = public_key.to_peer_id();

// DHT behaviour
let store = MemoryStore::new(peer_id);
let dht = Kademlia::with_config(peer_id, store, config.kademlia);
let mut dht = kad::Behaviour::with_config(peer_id, store, config.kademlia);
// Fixme: This could be avoided with a protocol such as Autonat that properly set external addresses to the
// swarm and also avoids us to add addresses that are purely connection candidates.
dht.set_mode(Some(kad::Mode::Server));

// Discovery behaviour
let discovery = DiscoveryBehaviour::new(
let discovery = discovery::Behaviour::new(
config.discovery.clone(),
config.keypair.clone(),
Arc::clone(&contacts),
clock,
);

// Gossipsub behaviour
let thresholds = PeerScoreThresholds::default();
let mut gossipsub = Gossipsub::new(MessageAuthenticity::Author(peer_id), config.gossipsub)
.expect("Wrong configuration");
let thresholds = gossipsub::PeerScoreThresholds::default();
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Author(peer_id),
config.gossipsub,
)
.expect("Wrong configuration");
gossipsub
.with_peer_score(peer_score_params, thresholds)
.expect("Valid score params and thresholds");

// Identify behaviour
let identify_config = IdentifyConfig::new("/albatross/2.0".to_string(), public_key);
let identify = IdentifyBehaviour::new(identify_config);
let identify_config = identify::Config::new("/albatross/2.0".to_string(), public_key);
let identify = identify::Behaviour::new(identify_config);

// Ping behaviour:
// - Send a ping every 15 seconds and timeout at 20 seconds.
// - The ping behaviour will close the connection if a ping timeouts.
let ping = PingBehaviour::new(PingConfig::new());
let ping = ping::Behaviour::new(ping::Config::new());

// Connection pool behaviour
let pool = ConnectionPoolBehaviour::new(
let pool = connection_pool::Behaviour::new(
Arc::clone(&contacts),
peer_id,
config.seeds,
config.discovery.required_services,
);

// Request Response behaviour
let codec = MessageCodec::default();
let protocol = ReqResProtocol::Version1;
let config = RequestResponseConfig::default();
let request_response =
RequestResponse::new(codec, iter::once((protocol, ProtocolSupport::Full)), config);
let protocol = StreamProtocol::new("/nimiq/reqres/0.0.1");
let config = request_response::Config::default();
let request_response = request_response::Behaviour::new(
iter::once((protocol, request_response::ProtocolSupport::Full)),
config,
);

// Connection limits behaviour
let limits = connection_limits::ConnectionLimits::default()
.with_max_pending_incoming(Some(16))
.with_max_pending_outgoing(Some(16))
.with_max_established_incoming(Some(4800))
.with_max_established_outgoing(Some(4800))
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
let connection_limits = connection_limits::Behaviour::new(limits);

Self {
dht,
Expand All @@ -182,6 +118,7 @@ impl NimiqBehaviour {
ping,
pool,
request_response,
connection_limits,
}
}

Expand Down
27 changes: 11 additions & 16 deletions network-libp2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ use std::{
time::Duration,
};

use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder, MessageId},
identity::Keypair,
kad::{KademliaBucketInserts, KademliaConfig, KademliaStoreInserts},
Multiaddr,
};
use libp2p::{gossipsub, identity::Keypair, kad, Multiaddr};
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::peer_info::Services;

use crate::discovery::{behaviour::DiscoveryConfig, peer_contacts::PeerContact};
use crate::discovery::{self, peer_contacts::PeerContact};

/// TLS settings for configuring a secure WebSocket
pub struct TlsConfig {
Expand All @@ -29,9 +24,9 @@ pub struct Config {
pub keypair: Keypair,
pub peer_contact: PeerContact,
pub seeds: Vec<Multiaddr>,
pub discovery: DiscoveryConfig,
pub kademlia: KademliaConfig,
pub gossipsub: GossipsubConfig,
pub discovery: discovery::Config,
pub kademlia: kad::Config,
pub gossipsub: gossipsub::Config,
pub memory_transport: bool,
pub required_services: Services,
pub tls: Option<TlsConfig>,
Expand All @@ -49,7 +44,7 @@ impl Config {
) -> Self {
// Hardcoding the minimum number of peers in mesh network before adding more
// TODO: Maybe change this to a mesh limits configuration argument of this function
let gossipsub = GossipsubConfigBuilder::default()
let gossipsub = gossipsub::ConfigBuilder::default()
.mesh_n_low(3)
.validate_messages()
.max_transmit_size(1_000_000) // TODO find a reasonable value for this parameter
Expand All @@ -61,25 +56,25 @@ impl Config {
let mut s = DefaultHasher::new();
message.topic.hash(&mut s);
message.data.hash(&mut s);
MessageId::from(s.finish().to_string())
gossipsub::MessageId::from(s.finish().to_string())
})
.build()
.expect("Invalid Gossipsub config");

let mut kademlia = KademliaConfig::default();
kademlia.set_kbucket_inserts(KademliaBucketInserts::OnConnected);
let mut kademlia = kad::Config::default();
kademlia.set_kbucket_inserts(kad::BucketInserts::OnConnected);
kademlia.set_record_ttl(Some(Duration::from_secs(5 * 60)));
kademlia.set_publication_interval(Some(Duration::from_secs(60)));

// Since we have a record TTL of 5 minutes, record replication is not needed right now
kademlia.set_replication_interval(None);
kademlia.set_record_filtering(KademliaStoreInserts::FilterBoth);
kademlia.set_record_filtering(kad::StoreInserts::FilterBoth);

Self {
keypair,
peer_contact,
seeds,
discovery: DiscoveryConfig::new(genesis_hash, required_services),
discovery: discovery::Config::new(genesis_hash, required_services),
kademlia,
gossipsub,
memory_transport,
Expand Down
Loading

0 comments on commit c447dc2

Please sign in to comment.