Skip to content

Commit

Permalink
Revert "Network protocol upgrades (#2345)" (#2388)
Browse files Browse the repository at this point in the history
## Issue Addressed

NA

## Proposed Changes

Reverts #2345 in the interests of getting v1.4.0 out this week. Once we have released that, we can go back to testing this again.

## Additional Info

NA
  • Loading branch information
paulhauner committed Jun 2, 2021
1 parent d34f922 commit 90ea075
Show file tree
Hide file tree
Showing 18 changed files with 613 additions and 691 deletions.
1,129 changes: 534 additions & 595 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.52.1 AS builder
FROM rust:1.50.0 AS builder
RUN apt-get update && apt-get install -y cmake
COPY . lighthouse
ARG PORTABLE
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/eth2_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Sigma Prime <[email protected]>"]
edition = "2018"

[dependencies]
discv5 = { version = "0.1.0-beta.4", features = ["libp2p"] }
discv5 = { git = "https://github.com/sigp/discv5 ", rev = "02d2c896c66f8dc2b848c3996fedcd98e1dfec69", features = ["libp2p"] }
unsigned-varint = { version = "0.6.0", features = ["codec"] }
types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" }
Expand Down Expand Up @@ -42,9 +42,9 @@ regex = "1.3.9"
strum = { version = "0.20", features = ["derive"] }

[dependencies.libp2p]
version = "0.38.0"
version = "0.35.1"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio"]
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]

[dev-dependencies]
tokio = { version = "1.1.0", features = ["full"] }
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {

/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers.
/// Simply delegated to the corresponding behaviour's handler.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DelegateIn<TSpec: EthSpec> {
Gossipsub(<GossipHandler as ProtocolsHandler>::InEvent),
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::InEvent),
Expand Down Expand Up @@ -141,8 +141,8 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
.max(identify_proto.timeout());

let select = SelectUpgrade::new(
gossip_proto.into_upgrade().0,
SelectUpgrade::new(rpc_proto.into_upgrade().0, identify_proto.into_upgrade().0),
gossip_proto.into_upgrade().1,
SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1),
);

SubstreamProtocol::new(select, ()).with_timeout(timeout)
Expand Down Expand Up @@ -202,7 +202,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
match event {
DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev),
DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev),
DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev),
DelegateIn::Identify(()) => self.identify_handler.inject_event(()),
}
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
}
}

#[derive(Clone)]
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
Delegate(DelegateIn<TSpec>),
/// Start the shutdown process.
Expand Down
37 changes: 20 additions & 17 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use libp2p::{
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
MessageAuthenticity, MessageId, PeerScoreThresholds,
},
identify::{Identify, IdentifyConfig, IdentifyEvent},
identify::{Identify, IdentifyEvent},
swarm::{
AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler,
PollParameters, ProtocolsHandler,
Expand Down Expand Up @@ -151,14 +151,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
) -> error::Result<Self> {
let behaviour_log = log.new(o!());

let identify_config = if net_conf.private {
IdentifyConfig::new(
let identify = if net_conf.private {
Identify::new(
"".into(),
"".into(),
local_key.public(), // Still send legitimate public key
)
} else {
IdentifyConfig::new("eth2/1.0.0".into(), local_key.public())
.with_agent_version(lighthouse_version::version_with_platform())
Identify::new(
"lighthouse/libp2p".into(),
lighthouse_version::version_with_platform(),
local_key.public(),
)
};

let enr_fork_id = network_globals
Expand Down Expand Up @@ -217,7 +221,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()),
gossipsub,
identify: Identify::new(identify_config),
identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
.await?,
events: VecDeque::new(),
Expand Down Expand Up @@ -898,7 +902,11 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

fn on_identify_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Received { peer_id, mut info } => {
IdentifyEvent::Received {
peer_id,
mut info,
observed_addr,
} => {
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
debug!(
self.log,
Expand All @@ -913,13 +921,12 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
"protocol_version" => info.protocol_version,
"agent_version" => info.agent_version,
"listening_ addresses" => ?info.listen_addrs,
"observed_address" => ?info.observed_addr,
"observed_address" => ?observed_addr,
"protocols" => ?info.protocols
);
}
IdentifyEvent::Sent { .. } => {}
IdentifyEvent::Error { .. } => {}
IdentifyEvent::Pushed { .. } => {}
}
}

Expand Down Expand Up @@ -1164,16 +1171,12 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
}

fn inject_new_listener(&mut self, id: ListenerId) {
delegate_to_behaviours!(self, inject_new_listener, id);
}

fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_listen_addr, id, addr);
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
}

fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_expired_listen_addr, id, addr);
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
}

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
Expand Down
1 change: 0 additions & 1 deletion beacon_node/eth2_libp2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ impl Default for Config {
.query_parallelism(5)
.disable_report_discovered_peers()
.ip_limit() // limits /24 IP's in buckets.
.incoming_bucket_limit(8) // half the bucket size
.ping_interval(Duration::from_secs(300))
.build();

Expand Down
26 changes: 8 additions & 18 deletions beacon_node/eth2_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
let mut subscribed_topics: Vec<GossipKind> = vec![];

for topic_kind in &config.topics {
if swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
if swarm.subscribe_kind(topic_kind.clone()) {
subscribed_topics.push(topic_kind.clone());
} else {
warn!(log, "Could not subscribe to topic"; "topic" => %topic_kind);
Expand All @@ -244,9 +244,7 @@ impl<TSpec: EthSpec> Service<TSpec> {

/// Sends a request to a peer, with a given Id.
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
self.swarm
.behaviour_mut()
.send_request(peer_id, request_id, request);
self.swarm.send_request(peer_id, request_id, request);
}

/// Informs the peer that their request failed.
Expand All @@ -257,30 +255,22 @@ impl<TSpec: EthSpec> Service<TSpec> {
error: RPCResponseErrorCode,
reason: String,
) {
self.swarm
.behaviour_mut()
._send_error_reponse(peer_id, id, error, reason);
self.swarm._send_error_reponse(peer_id, id, error, reason);
}

/// Report a peer's action.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
self.swarm
.behaviour_mut()
.report_peer(peer_id, action, source);
self.swarm.report_peer(peer_id, action, source);
}

/// Disconnect and ban a peer, providing a reason.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
self.swarm
.behaviour_mut()
.goodbye_peer(peer_id, reason, source);
self.swarm.goodbye_peer(peer_id, reason, source);
}

/// Sends a response to a peer's request.
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
self.swarm
.behaviour_mut()
.send_successful_response(peer_id, id, response);
self.swarm.send_successful_response(peer_id, id, response);
}

pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
Expand Down Expand Up @@ -360,8 +350,8 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
fn build_transport(
local_private_key: Keypair,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
let tcp = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport)?;
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/eth2_libp2p/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub async fn build_libp2p_instance(

#[allow(dead_code)]
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
node.swarm.behaviour().local_enr()
node.swarm.local_enr()
}

// Returns `n` libp2p peers in fully connected topology.
Expand Down Expand Up @@ -171,7 +171,7 @@ pub async fn build_node_pair(
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await;
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await;

let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone();
let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone();

// let the two nodes set up listeners
let sender_fut = async {
Expand Down
32 changes: 16 additions & 16 deletions beacon_node/eth2_libp2p/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn test_status_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -90,7 +90,7 @@ fn test_status_rpc() {
if request == rpc_request {
// send the response
debug!(log, "Receiver Received");
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
Expand Down Expand Up @@ -155,7 +155,7 @@ fn test_blocks_by_range_chunked_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -200,14 +200,14 @@ fn test_blocks_by_range_chunked_rpc() {
// send the response
warn!(log, "Receiver got request");
for _ in 1..=messages_to_send {
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
);
}
// send the stream termination
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
Expand Down Expand Up @@ -272,7 +272,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -341,7 +341,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
if message_info.is_some() {
messages_sent += 1;
let (peer_id, stream_id) = message_info.as_ref().unwrap();
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
*peer_id,
*stream_id,
rpc_response.clone(),
Expand Down Expand Up @@ -407,7 +407,7 @@ fn test_blocks_by_range_single_empty_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -450,14 +450,14 @@ fn test_blocks_by_range_single_empty_rpc() {
warn!(log, "Receiver got request");

for _ in 1..=messages_to_send {
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
);
}
// send the stream termination
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
Expand Down Expand Up @@ -525,7 +525,7 @@ fn test_blocks_by_root_chunked_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -568,15 +568,15 @@ fn test_blocks_by_root_chunked_rpc() {
debug!(log, "Receiver got request");

for _ in 1..=messages_to_send {
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
rpc_response.clone(),
);
debug!(log, "Sending message");
}
// send the stream termination
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
peer_id,
id,
Response::BlocksByRange(None),
Expand Down Expand Up @@ -649,7 +649,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().send_request(
sender.swarm.send_request(
peer_id,
RequestId::Sync(10),
rpc_request.clone(),
Expand Down Expand Up @@ -718,7 +718,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
if message_info.is_some() {
messages_sent += 1;
let (peer_id, stream_id) = message_info.as_ref().unwrap();
receiver.swarm.behaviour_mut().send_successful_response(
receiver.swarm.send_successful_response(
*peer_id,
*stream_id,
rpc_response.clone(),
Expand Down Expand Up @@ -764,7 +764,7 @@ fn test_goodbye_rpc() {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a goodbye and disconnect
debug!(log, "Sending RPC");
sender.swarm.behaviour_mut().goodbye_peer(
sender.swarm.goodbye_peer(
&peer_id,
GoodbyeReason::IrrelevantNetwork,
ReportSource::SyncService,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ futures = "0.3.8"
store = { path = "../store" }
environment = { path = "../../lighthouse/environment" }
tree_hash = "0.1.1"
discv5 = { git = "https://github.com/sigp/discv5 ", rev = "02d2c896c66f8dc2b848c3996fedcd98e1dfec69", features = ["libp2p"] }
sensitive_url = { path = "../../common/sensitive_url" }
Loading

0 comments on commit 90ea075

Please sign in to comment.