Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade libp2p to version 0.53.1 #1977

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,438 changes: 675 additions & 1,763 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,5 @@ tokio-console = ["console-subscriber", "logging", "tokio/tracing"]
tokio-websocket = ["nimiq-network-libp2p/tokio-websocket"]
validator = ["database-storage", "nimiq-mempool", "nimiq-validator", "nimiq-validator-network", "nimiq-rpc-server"]
wallet = ["database-storage", "nimiq-wallet"]
wasm-websocket = ["nimiq-network-libp2p/wasm-websocket"]
web-logging = ["nimiq-log", "time/wasm-bindgen", "tracing-subscriber", "tracing-web"]
zkp-prover = ["nimiq-zkp/zkp-prover", "nimiq-zkp-circuits/zkp-prover", "nimiq-zkp-component/zkp-prover", "nimiq-zkp-primitives/zkp-prover"]
2 changes: 1 addition & 1 deletion network-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async-trait = "0.1"
bitflags = { version = "2.0", features = ["serde"] }
futures = { package = "futures-util", version = "0.3" }
log = { package = "tracing", version = "0.1", features = ["log"] }
multiaddr = "0.16"
multiaddr = "0.18"
serde = "1.0"
thiserror = "1.0"
tokio = { version = "1.32", features = ["rt"] }
Expand Down
12 changes: 8 additions & 4 deletions network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ futures = { package = "futures-util", version = "0.3" }
hex = "0.4"
instant = { version = "0.1", features = [ "wasm-bindgen" ] }
ip_network = "0.4"
libp2p-websys-transport = { git = "https://github.com/jsdanielh/libp2p-websys-transport.git", optional = true }
log = { package = "tracing", version = "0.1", features = ["log"] }
parking_lot = "0.12"
pin-project = "1.1"
Expand All @@ -36,6 +35,8 @@ serde-big-array = "0.5"
thiserror = "1.0"
tokio = { version = "1.32", features = ["macros", "rt", "tracing"] }
tokio-stream = "0.1"
unsigned-varint = "0.8"
void = "1.0"
wasm-timer = "0.2"

nimiq-bls = { workspace = true }
Expand All @@ -51,8 +52,9 @@ nimiq-utils = { workspace = true, features = [
] }
nimiq-validator-network = { workspace = true }

# For now we need to use a fork
[target.'cfg(not(target_family = "wasm"))'.dependencies]
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-features = false, features = [
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", branch = "libp2p_0.53.1", default-features = false, features = [
hrxi marked this conversation as resolved.
Show resolved Hide resolved
"gossipsub",
"identify",
"kad",
Expand All @@ -61,11 +63,13 @@ libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-feature
"ping",
"request-response",
"serde",
"tokio",
"yamux",
] }

# For now we need to use a fork
[target.'cfg(target_family = "wasm")'.dependencies]
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-features = false, features = [
libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", branch = "libp2p_0.53.1", default-features = false, features = [
"gossipsub",
"identify",
"kad",
Expand All @@ -76,6 +80,7 @@ libp2p = { git = "https://github.com/jsdanielh/rust-libp2p.git", default-feature
"serde",
"yamux",
"wasm-bindgen",
"websocket-websys",
] }

[dev-dependencies]
Expand All @@ -88,4 +93,3 @@ nimiq-test-log = { workspace = true }
metrics = ["prometheus-client"]
tokio-time = ["tokio/time"]
tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"]
wasm-websocket = ["libp2p-websys-transport"]
185 changes: 61 additions & 124 deletions network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,178 +1,114 @@
use std::{iter, sync::Arc};

use libp2p::{
core::either::EitherError,
gossipsub::{
error::GossipsubHandlerError, Gossipsub, GossipsubEvent, MessageAuthenticity,
PeerScoreParams, PeerScoreThresholds,
},
identify::{Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent},
kad::{store::MemoryStore, Kademlia, KademliaEvent},
ping::{
Behaviour as PingBehaviour, Config as PingConfig, Event as PingEvent,
Failure as PingFailure,
},
request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent as ReqResEvent,
},
swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour},
Multiaddr, PeerId,
connection_limits, gossipsub, identify,
kad::{self, store::MemoryStore},
ping, request_response,
swarm::NetworkBehaviour,
Multiaddr, PeerId, StreamProtocol,
};
use nimiq_utils::time::OffsetTime;
use parking_lot::RwLock;

use crate::{
connection_pool::{
behaviour::{ConnectionPoolBehaviour, ConnectionPoolEvent},
handler::ConnectionPoolHandlerError,
},
discovery::{
behaviour::{DiscoveryBehaviour, DiscoveryEvent},
handler::DiscoveryHandlerError,
peer_contacts::PeerContactBook,
},
dispatch::codecs::typed::{IncomingRequest, MessageCodec, OutgoingResponse, ReqResProtocol},
connection_pool,
discovery::{self, peer_contacts::PeerContactBook},
dispatch::codecs::typed::MessageCodec,
Config,
};

pub type NimiqNetworkBehaviourError = EitherError<
EitherError<
EitherError<
EitherError<
EitherError<
EitherError<std::io::Error, DiscoveryHandlerError>,
GossipsubHandlerError,
>,
std::io::Error,
>,
PingFailure,
>,
ConnectionPoolHandlerError,
>,
ConnectionHandlerUpgrErr<std::io::Error>,
>;
jsdanielh marked this conversation as resolved.
Show resolved Hide resolved

pub type RequestResponseEvent = ReqResEvent<IncomingRequest, OutgoingResponse>;

#[derive(Debug)]
pub enum NimiqEvent {
Dht(KademliaEvent),
Discovery(DiscoveryEvent),
Gossip(GossipsubEvent),
Identify(IdentifyEvent),
Ping(PingEvent),
Pool(ConnectionPoolEvent),
RequestResponse(RequestResponseEvent),
}

impl From<KademliaEvent> for NimiqEvent {
fn from(event: KademliaEvent) -> Self {
Self::Dht(event)
}
}

impl From<DiscoveryEvent> for NimiqEvent {
fn from(event: DiscoveryEvent) -> Self {
Self::Discovery(event)
}
}

impl From<GossipsubEvent> for NimiqEvent {
fn from(event: GossipsubEvent) -> Self {
Self::Gossip(event)
}
}

impl From<IdentifyEvent> for NimiqEvent {
fn from(event: IdentifyEvent) -> Self {
Self::Identify(event)
}
}

impl From<ConnectionPoolEvent> for NimiqEvent {
fn from(event: ConnectionPoolEvent) -> Self {
Self::Pool(event)
}
}

impl From<PingEvent> for NimiqEvent {
fn from(event: PingEvent) -> Self {
Self::Ping(event)
}
}

impl From<RequestResponseEvent> for NimiqEvent {
fn from(event: RequestResponseEvent) -> Self {
Self::RequestResponse(event)
}
}

/// Maximum simultaneous libp2p connections per peer
const MAX_CONNECTIONS_PER_PEER: u32 = 2;

/// Network behaviour.
/// This is composed of several other behaviours that build a tree of behaviours using
/// the `NetworkBehaviour` macro and the order of listed behaviours matters.
/// The first behaviours are behaviours that can close connections before establishing them
/// such as connection limits and the connection pool. They must be at the top since they
/// other behaviours such as request-response do not handle well that a connection is
/// denied in a behaviour that is "after".
/// See: https://github.com/libp2p/rust-libp2p/pull/4777#discussion_r1389951783.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "NimiqEvent")]
pub struct NimiqBehaviour {
pub dht: Kademlia<MemoryStore>,
pub discovery: DiscoveryBehaviour,
pub gossipsub: Gossipsub,
pub identify: IdentifyBehaviour,
pub ping: PingBehaviour,
pub pool: ConnectionPoolBehaviour,
pub request_response: RequestResponse<MessageCodec>,
pub struct Behaviour {
pub connection_limits: connection_limits::Behaviour,
pub pool: connection_pool::Behaviour,
pub discovery: discovery::Behaviour,
pub dht: kad::Behaviour<MemoryStore>,
pub gossipsub: gossipsub::Behaviour,
pub identify: identify::Behaviour,
pub ping: ping::Behaviour,
pub request_response: request_response::Behaviour<MessageCodec>,
}

impl NimiqBehaviour {
impl Behaviour {
pub fn new(
config: Config,
clock: Arc<OffsetTime>,
contacts: Arc<RwLock<PeerContactBook>>,
peer_score_params: PeerScoreParams,
peer_score_params: gossipsub::PeerScoreParams,
) -> Self {
let public_key = config.keypair.public();
let peer_id = public_key.to_peer_id();

// DHT behaviour
let store = MemoryStore::new(peer_id);
let dht = Kademlia::with_config(peer_id, store, config.kademlia);
let mut dht = kad::Behaviour::with_config(peer_id, store, config.kademlia);
// Fixme: This could be avoided with a protocol such as Autonat that properly set external addresses to the
// swarm and also avoids us to add addresses that are purely connection candidates.
dht.set_mode(Some(kad::Mode::Server));

// Discovery behaviour
let discovery = DiscoveryBehaviour::new(
let discovery = discovery::Behaviour::new(
config.discovery.clone(),
config.keypair.clone(),
Arc::clone(&contacts),
clock,
);

// Gossipsub behaviour
let thresholds = PeerScoreThresholds::default();
let mut gossipsub = Gossipsub::new(MessageAuthenticity::Author(peer_id), config.gossipsub)
.expect("Wrong configuration");
let thresholds = gossipsub::PeerScoreThresholds::default();
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Author(peer_id),
config.gossipsub,
)
.expect("Wrong configuration");
gossipsub
.with_peer_score(peer_score_params, thresholds)
.expect("Valid score params and thresholds");

// Identify behaviour
let identify_config = IdentifyConfig::new("/albatross/2.0".to_string(), public_key);
let identify = IdentifyBehaviour::new(identify_config);
let identify_config = identify::Config::new("/albatross/2.0".to_string(), public_key);
let identify = identify::Behaviour::new(identify_config);

// Ping behaviour:
// - Send a ping every 15 seconds and timeout at 20 seconds.
// - The ping behaviour will close the connection if a ping timeouts.
let ping = PingBehaviour::new(PingConfig::new());
let ping = ping::Behaviour::new(ping::Config::new());

// Connection pool behaviour
let pool = ConnectionPoolBehaviour::new(
let pool = connection_pool::Behaviour::new(
Arc::clone(&contacts),
peer_id,
config.seeds,
config.discovery.required_services,
);

// Request Response behaviour
let codec = MessageCodec::default();
let protocol = ReqResProtocol::Version1;
let config = RequestResponseConfig::default();
let request_response =
RequestResponse::new(codec, iter::once((protocol, ProtocolSupport::Full)), config);
let protocol = StreamProtocol::new("/nimiq/reqres/0.0.1");
jsdanielh marked this conversation as resolved.
Show resolved Hide resolved
let config = request_response::Config::default();
let request_response = request_response::Behaviour::new(
iter::once((protocol, request_response::ProtocolSupport::Full)),
config,
);

// Connection limits behaviour
let limits = connection_limits::ConnectionLimits::default()
.with_max_pending_incoming(Some(16))
.with_max_pending_outgoing(Some(16))
.with_max_established_incoming(Some(4800))
.with_max_established_outgoing(Some(4800))
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
let connection_limits = connection_limits::Behaviour::new(limits);

Self {
dht,
Expand All @@ -182,6 +118,7 @@ impl NimiqBehaviour {
ping,
pool,
request_response,
connection_limits,
}
}

Expand Down
27 changes: 11 additions & 16 deletions network-libp2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ use std::{
time::Duration,
};

use libp2p::{
gossipsub::{GossipsubConfig, GossipsubConfigBuilder, MessageId},
identity::Keypair,
kad::{KademliaBucketInserts, KademliaConfig, KademliaStoreInserts},
Multiaddr,
};
use libp2p::{gossipsub, identity::Keypair, kad, Multiaddr};
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::peer_info::Services;

use crate::discovery::{behaviour::DiscoveryConfig, peer_contacts::PeerContact};
use crate::discovery::{self, peer_contacts::PeerContact};

/// TLS settings for configuring a secure WebSocket
pub struct TlsConfig {
Expand All @@ -29,9 +24,9 @@ pub struct Config {
pub keypair: Keypair,
pub peer_contact: PeerContact,
pub seeds: Vec<Multiaddr>,
pub discovery: DiscoveryConfig,
pub kademlia: KademliaConfig,
pub gossipsub: GossipsubConfig,
pub discovery: discovery::Config,
pub kademlia: kad::Config,
pub gossipsub: gossipsub::Config,
pub memory_transport: bool,
pub required_services: Services,
pub tls: Option<TlsConfig>,
Expand All @@ -49,7 +44,7 @@ impl Config {
) -> Self {
// Hardcoding the minimum number of peers in mesh network before adding more
// TODO: Maybe change this to a mesh limits configuration argument of this function
let gossipsub = GossipsubConfigBuilder::default()
let gossipsub = gossipsub::ConfigBuilder::default()
.mesh_n_low(3)
.validate_messages()
.max_transmit_size(1_000_000) // TODO find a reasonable value for this parameter
Expand All @@ -61,25 +56,25 @@ impl Config {
let mut s = DefaultHasher::new();
message.topic.hash(&mut s);
message.data.hash(&mut s);
MessageId::from(s.finish().to_string())
gossipsub::MessageId::from(s.finish().to_string())
})
.build()
.expect("Invalid Gossipsub config");

let mut kademlia = KademliaConfig::default();
kademlia.set_kbucket_inserts(KademliaBucketInserts::OnConnected);
let mut kademlia = kad::Config::default();
kademlia.set_kbucket_inserts(kad::BucketInserts::OnConnected);
kademlia.set_record_ttl(Some(Duration::from_secs(5 * 60)));
kademlia.set_publication_interval(Some(Duration::from_secs(60)));

// Since we have a record TTL of 5 minutes, record replication is not needed right now
kademlia.set_replication_interval(None);
kademlia.set_record_filtering(KademliaStoreInserts::FilterBoth);
kademlia.set_record_filtering(kad::StoreInserts::FilterBoth);

Self {
keypair,
peer_contact,
seeds,
discovery: DiscoveryConfig::new(genesis_hash, required_services),
discovery: discovery::Config::new(genesis_hash, required_services),
kademlia,
gossipsub,
memory_transport,
Expand Down
Loading
Loading