Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Revert "Network protocol upgrades (#2345)" #2388

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,129 changes: 534 additions & 595 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.52.1 AS builder
FROM rust:1.50.0 AS builder
RUN apt-get update && apt-get install -y cmake
COPY . lighthouse
ARG PORTABLE
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/eth2_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Sigma Prime <[email protected]>"]
edition = "2018"

[dependencies]
discv5 = { version = "0.1.0-beta.4", features = ["libp2p"] }
discv5 = { git = "https://github.com/sigp/discv5 ", rev = "02d2c896c66f8dc2b848c3996fedcd98e1dfec69", features = ["libp2p"] }
unsigned-varint = { version = "0.6.0", features = ["codec"] }
types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" }
Expand Down Expand Up @@ -42,9 +42,9 @@ regex = "1.3.9"
strum = { version = "0.20", features = ["derive"] }

[dependencies.libp2p]
version = "0.38.0"
version = "0.35.1"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio"]
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]

[dev-dependencies]
tokio = { version = "1.1.0", features = ["full"] }
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {

/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers.
/// Simply delegated to the corresponding behaviour's handler.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DelegateIn<TSpec: EthSpec> {
Gossipsub(<GossipHandler as ProtocolsHandler>::InEvent),
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::InEvent),
Expand Down Expand Up @@ -141,8 +141,8 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
.max(identify_proto.timeout());

let select = SelectUpgrade::new(
gossip_proto.into_upgrade().0,
SelectUpgrade::new(rpc_proto.into_upgrade().0, identify_proto.into_upgrade().0),
gossip_proto.into_upgrade().1,
SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1),
);

SubstreamProtocol::new(select, ()).with_timeout(timeout)
Expand Down Expand Up @@ -202,7 +202,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
match event {
DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev),
DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev),
DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev),
DelegateIn::Identify(()) => self.identify_handler.inject_event(()),
}
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
}
}

#[derive(Clone)]
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
Delegate(DelegateIn<TSpec>),
/// Start the shutdown process.
Expand Down
37 changes: 20 additions & 17 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use libp2p::{
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
MessageAuthenticity, MessageId, PeerScoreThresholds,
},
identify::{Identify, IdentifyConfig, IdentifyEvent},
identify::{Identify, IdentifyEvent},
swarm::{
AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler,
PollParameters, ProtocolsHandler,
Expand Down Expand Up @@ -151,14 +151,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
) -> error::Result<Self> {
let behaviour_log = log.new(o!());

let identify_config = if net_conf.private {
IdentifyConfig::new(
let identify = if net_conf.private {
Identify::new(
"".into(),
"".into(),
local_key.public(), // Still send legitimate public key
)
} else {
IdentifyConfig::new("eth2/1.0.0".into(), local_key.public())
.with_agent_version(lighthouse_version::version_with_platform())
Identify::new(
"lighthouse/libp2p".into(),
lighthouse_version::version_with_platform(),
local_key.public(),
)
};

let enr_fork_id = network_globals
Expand Down Expand Up @@ -217,7 +221,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()),
gossipsub,
identify: Identify::new(identify_config),
identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
.await?,
events: VecDeque::new(),
Expand Down Expand Up @@ -898,7 +902,11 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

fn on_identify_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Received { peer_id, mut info } => {
IdentifyEvent::Received {
peer_id,
mut info,
observed_addr,
} => {
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
debug!(
self.log,
Expand All @@ -913,13 +921,12 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
"protocol_version" => info.protocol_version,
"agent_version" => info.agent_version,
"listening_ addresses" => ?info.listen_addrs,
"observed_address" => ?info.observed_addr,
"observed_address" => ?observed_addr,
"protocols" => ?info.protocols
);
}
IdentifyEvent::Sent { .. } => {}
IdentifyEvent::Error { .. } => {}
IdentifyEvent::Pushed { .. } => {}
}
}

Expand Down Expand Up @@ -1164,16 +1171,12 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
}

fn inject_new_listener(&mut self, id: ListenerId) {
delegate_to_behaviours!(self, inject_new_listener, id);
}

fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_listen_addr, id, addr);
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
}

fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_expired_listen_addr, id, addr);
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
}

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
Expand Down
1 change: 0 additions & 1 deletion beacon_node/eth2_libp2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ impl Default for Config {
.query_parallelism(5)
.disable_report_discovered_peers()
.ip_limit() // limits /24 IP's in buckets.
.incoming_bucket_limit(8) // half the bucket size
.ping_interval(Duration::from_secs(300))
.build();

Expand Down
26 changes: 8 additions & 18 deletions beacon_node/eth2_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
let mut subscribed_topics: Vec<GossipKind> = vec![];

for topic_kind in &config.topics {
if swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
if swarm.subscribe_kind(topic_kind.clone()) {
subscribed_topics.push(topic_kind.clone());
} else {
warn!(log, "Could not subscribe to topic"; "topic" => %topic_kind);
Expand All @@ -244,9 +244,7 @@ impl<TSpec: EthSpec> Service<TSpec> {

/// Sends a request to a peer, with a given Id.
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
self.swarm
.behaviour_mut()
.send_request(peer_id, request_id, request);
self.swarm.send_request(peer_id, request_id, request);
}

/// Informs the peer that their request failed.
Expand All @@ -257,30 +255,22 @@ impl<TSpec: EthSpec> Service<TSpec> {
error: RPCResponseErrorCode,
reason: String,
) {
self.swarm
.behaviour_mut()
._send_error_reponse(peer_id, id, error, reason);
self.swarm._send_error_reponse(peer_id, id, error, reason);
}

/// Report a peer's action.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
self.swarm
.behaviour_mut()
.report_peer(peer_id, action, source);
self.swarm.report_peer(peer_id, action, source);
}

/// Disconnect and ban a peer, providing a reason.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
self.swarm
.behaviour_mut()
.goodbye_peer(peer_id, reason, source);
self.swarm.goodbye_peer(peer_id, reason, source);
}

/// Sends a response to a peer's request.
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
self.swarm
.behaviour_mut()
.send_successful_response(peer_id, id, response);
self.swarm.send_successful_response(peer_id, id, response);
}

pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
Expand Down Expand Up @@ -360,8 +350,8 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
fn build_transport(
local_private_key: Keypair,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
let tcp = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport)?;
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/eth2_libp2p/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn build_libp2p_instance(

#[allow(dead_code)]
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
node.swarm.behaviour().local_enr()
node.swarm.local_enr()
}

// Returns `n` libp2p peers in fully connected topology.
Expand Down Expand Up @@ -171,7 +171,7 @@ pub async fn build_node_pair(
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await;
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await;

let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone();
let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone();

// let the two nodes set up listeners
let sender_fut = async {
Expand Down
32 changes: 16 additions & 16 deletions beacon_node/eth2_libp2p/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn test_status_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -90,7 +90,7 @@ fn test_status_rpc() {
if request == rpc_request {
// send the response
debug!(log, "Receiver Received");
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
Expand Down Expand Up @@ -155,7 +155,7 @@ fn test_blocks_by_range_chunked_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -200,14 +200,14 @@ fn test_blocks_by_range_chunked_rpc() {
// send the response
warn!(log, "Receiver got request");
for _ in 1..=messages_to_send {
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
);
}
// send the stream termination
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
Expand Down Expand Up @@ -272,7 +272,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -341,7 +341,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
if message_info.is_some() {
messages_sent += 1;
let (peer_id, stream_id) = message_info.as_ref().unwrap();
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
*peer_id,
*stream_id,
rpc_response.clone(),
Expand Down Expand Up @@ -407,7 +407,7 @@ fn test_blocks_by_range_single_empty_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -450,14 +450,14 @@ fn test_blocks_by_range_single_empty_rpc() {
warn!(log, "Receiver got request");

for _ in 1..=messages_to_send {
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
);
}
// send the stream termination
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
Expand Down Expand Up @@ -525,7 +525,7 @@ fn test_blocks_by_root_chunked_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -568,15 +568,15 @@ fn test_blocks_by_root_chunked_rpc() {
debug!(log, "Receiver got request");

for _ in 1..=messages_to_send {
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
);
debug!(log, "Sending message");
}
// send the stream termination
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
Expand Down Expand Up @@ -649,7 +649,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -718,7 +718,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
if message_info.is_some() {
messages_sent += 1;
let (peer_id, stream_id) = message_info.as_ref().unwrap();
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
*peer_id,
*stream_id,
rpc_response.clone(),
Expand Down Expand Up @@ -764,7 +764,7 @@ fn test_goodbye_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a goodbye and disconnect
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().goodbye_peer(
sender.swarm.goodbye_peer(
&peer_id,
GoodbyeReason::IrrelevantNetwork,
ReportSource::SyncService,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ futures = "0.3.8"
store = { path = "../store" }
environment = { path = "../../lighthouse/environment" }
tree_hash = "0.1.1"
discv5 = { git = "https://github.com/sigp/discv5 ", rev = "02d2c896c66f8dc2b848c3996fedcd98e1dfec69", features = ["libp2p"] }
sensitive_url = { path = "../../common/sensitive_url" }
Loading