Skip to content

Commit

Permalink
Shift metadata to the global network variables (#1631)
Browse files Browse the repository at this point in the history
## Issue Addressed

N/A

## Proposed Changes

Shifts the local `metadata` to `network_globals` making it accessible to the HTTP API and other areas of lighthouse.

## Additional Info

N/A
  • Loading branch information
AgeManning committed Sep 21, 2020
1 parent 7b97c4a commit 1db8daa
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 58 deletions.
76 changes: 23 additions & 53 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
use crate::rpc::*;
use crate::types::{EnrBitfield, GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
use crate::service::METADATA_FILENAME;
use crate::types::{GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
Expand All @@ -23,9 +24,9 @@ use libp2p::{
PeerId,
};
use slog::{crit, debug, o, trace, warn};
use ssz::{Decode, Encode};
use ssz::Encode;
use std::fs::File;
use std::io::{Read, Write};
use std::io::Write;
use std::path::PathBuf;
use std::{
collections::VecDeque,
Expand All @@ -38,7 +39,6 @@ use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId};
mod handler;

const MAX_IDENTIFY_ADDRESSES: usize = 10;
const METADATA_FILENAME: &str = "metadata";

/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
Expand All @@ -58,8 +58,6 @@ pub struct Behaviour<TSpec: EthSpec> {
events: VecDeque<BehaviourEvent<TSpec>>,
/// Queue of peers to disconnect and an optional reason for the disconnection.
peers_to_dc: VecDeque<(PeerId, Option<GoodbyeReason>)>,
/// The current meta data of the node, so respond to pings and get metadata
meta_data: MetaData<TSpec>,
/// A collections of variables accessible outside the network service.
network_globals: Arc<NetworkGlobals<TSpec>>,
/// Keeps track of the current EnrForkId for upgrading gossipsub topics.
Expand Down Expand Up @@ -95,8 +93,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.eth2()
.expect("Local ENR must have a fork id");

let meta_data = load_or_build_metadata(&net_conf.network_dir, &log);

let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone())
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;

Expand All @@ -115,7 +111,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.await?,
events: VecDeque::new(),
peers_to_dc: VecDeque::new(),
meta_data,
network_globals,
enr_fork_id,
waker: None,
Expand Down Expand Up @@ -407,21 +402,31 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

/// Updates the current meta data of the node to match the local ENR.
fn update_metadata(&mut self) {
self.meta_data.seq_number += 1;
self.meta_data.attnets = self
let local_attnets = self
.peer_manager
.discovery()
.local_enr()
.bitfield::<TSpec>()
.expect("Local discovery must have bitfield");

{
// write lock scope
let mut meta_data = self.network_globals.local_metadata.write();
meta_data.seq_number += 1;
meta_data.attnets = local_attnets;
}
// Save the updated metadata to disk
save_metadata_to_disk(&self.network_dir, self.meta_data.clone(), &self.log);
save_metadata_to_disk(
&self.network_dir,
self.network_globals.local_metadata.read().clone(),
&self.log,
);
}

/// Sends a Ping request to the peer.
fn ping(&mut self, id: RequestId, peer_id: PeerId) {
let ping = crate::rpc::Ping {
data: self.meta_data.seq_number,
data: self.network_globals.local_metadata.read().seq_number,
};
trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string());

Expand All @@ -432,7 +437,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Sends a Pong response to the peer.
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
let ping = crate::rpc::Ping {
data: self.meta_data.seq_number,
data: self.network_globals.local_metadata.read().seq_number,
};
trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string());
let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
Expand All @@ -448,7 +453,9 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

/// Sends a METADATA response to a peer.
fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) {
let event = RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone()));
let event = RPCCodedResponse::Success(RPCResponse::MetaData(
self.network_globals.local_metadata.read().clone(),
));
self.eth2_rpc.send_response(peer_id, id, event);
}

Expand Down Expand Up @@ -1107,45 +1114,8 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
StatusPeer(PeerId),
}

/// Load metadata from persisted file. Return default metadata if loading fails.
fn load_or_build_metadata<E: EthSpec>(network_dir: &PathBuf, log: &slog::Logger) -> MetaData<E> {
// Default metadata
let mut meta_data = MetaData {
seq_number: 0,
attnets: EnrBitfield::<E>::default(),
};
// Read metadata from persisted file if available
let metadata_path = network_dir.join(METADATA_FILENAME);
if let Ok(mut metadata_file) = File::open(metadata_path) {
let mut metadata_ssz = Vec::new();
if metadata_file.read_to_end(&mut metadata_ssz).is_ok() {
match MetaData::<E>::from_ssz_bytes(&metadata_ssz) {
Ok(persisted_metadata) => {
meta_data.seq_number = persisted_metadata.seq_number;
// Increment seq number if persisted attnet is not default
if persisted_metadata.attnets != meta_data.attnets {
meta_data.seq_number += 1;
}
debug!(log, "Loaded metadata from disk");
}
Err(e) => {
debug!(
log,
"Metadata from file could not be decoded";
"error" => format!("{:?}", e),
);
}
}
}
};

debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number);
save_metadata_to_disk(network_dir, meta_data.clone(), &log);
meta_data
}

/// Persist metadata to disk
fn save_metadata_to_disk<E: EthSpec>(dir: &PathBuf, metadata: MetaData<E>, log: &slog::Logger) {
pub fn save_metadata_to_disk<E: EthSpec>(dir: &PathBuf, metadata: MetaData<E>, log: &slog::Logger) {
let _ = std::fs::create_dir_all(&dir);
match File::create(dir.join(METADATA_FILENAME))
.and_then(|mut f| f.write_all(&metadata.as_ssz_bytes()))
Expand Down
55 changes: 52 additions & 3 deletions beacon_node/eth2_libp2p/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response};
use crate::behaviour::{
save_metadata_to_disk, Behaviour, BehaviourEvent, PeerRequestId, Request, Response,
};
use crate::discovery::enr;
use crate::multiaddr::Protocol;
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId};
use crate::types::{error, GossipKind};
use crate::rpc::{GoodbyeReason, MetaData, RPCResponseErrorCode, RequestId};
use crate::types::{error, EnrBitfield, GossipKind};
use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals, PeerAction};
use futures::prelude::*;
Expand All @@ -15,6 +17,7 @@ use libp2p::{
PeerId, Swarm, Transport,
};
use slog::{crit, debug, info, o, trace, warn};
use ssz::Decode;
use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
Expand All @@ -26,6 +29,8 @@ use types::{EnrForkId, EthSpec};
pub const NETWORK_KEY_FILENAME: &str = "key";
/// The maximum simultaneous libp2p connections per peer.
const MAX_CONNECTIONS_PER_PEER: usize = 1;
/// The filename to store our local metadata.
pub const METADATA_FILENAME: &str = "metadata";

/// The types of events than can be obtained from polling the libp2p service.
///
Expand Down Expand Up @@ -70,11 +75,15 @@ impl<TSpec: EthSpec> Service<TSpec> {
enr::build_or_load_enr::<TSpec>(local_keypair.clone(), config, enr_fork_id, &log)?;

let local_peer_id = enr.peer_id();

let meta_data = load_or_build_metadata(&config.network_dir, &log);

// set up a collection of variables accessible outside of the network crate
let network_globals = Arc::new(NetworkGlobals::new(
enr.clone(),
config.libp2p_port,
config.discovery_port,
meta_data,
&log,
));

Expand Down Expand Up @@ -420,3 +429,43 @@ fn strip_peer_id(addr: &mut Multiaddr) {
_ => {}
}
}

/// Load metadata from persisted file. Return default metadata if loading fails.
fn load_or_build_metadata<E: EthSpec>(
network_dir: &std::path::PathBuf,
log: &slog::Logger,
) -> MetaData<E> {
// Default metadata
let mut meta_data = MetaData {
seq_number: 0,
attnets: EnrBitfield::<E>::default(),
};
// Read metadata from persisted file if available
let metadata_path = network_dir.join(METADATA_FILENAME);
if let Ok(mut metadata_file) = File::open(metadata_path) {
let mut metadata_ssz = Vec::new();
if metadata_file.read_to_end(&mut metadata_ssz).is_ok() {
match MetaData::<E>::from_ssz_bytes(&metadata_ssz) {
Ok(persisted_metadata) => {
meta_data.seq_number = persisted_metadata.seq_number;
// Increment seq number if persisted attnet is not default
if persisted_metadata.attnets != meta_data.attnets {
meta_data.seq_number += 1;
}
debug!(log, "Loaded metadata from disk");
}
Err(e) => {
debug!(
log,
"Metadata from file could not be decoded";
"error" => format!("{:?}", e),
);
}
}
}
};

debug!(log, "Metadata sequence number"; "seq_num" => meta_data.seq_number);
save_metadata_to_disk(network_dir, meta_data.clone(), &log);
meta_data
}
12 changes: 11 additions & 1 deletion beacon_node/eth2_libp2p/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::peer_manager::PeerDB;
use crate::rpc::MetaData;
use crate::types::SyncState;
use crate::Client;
use crate::EnrExt;
Expand All @@ -22,21 +23,30 @@ pub struct NetworkGlobals<TSpec: EthSpec> {
pub listen_port_udp: AtomicU16,
/// The collection of known peers.
pub peers: RwLock<PeerDB<TSpec>>,
// The local meta data of our node.
pub local_metadata: RwLock<MetaData<TSpec>>,
/// The current gossipsub topic subscriptions.
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
/// The current sync status of the node.
pub sync_state: RwLock<SyncState>,
}

impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
pub fn new(enr: Enr, tcp_port: u16, udp_port: u16, log: &slog::Logger) -> Self {
pub fn new(
enr: Enr,
tcp_port: u16,
udp_port: u16,
local_metadata: MetaData<TSpec>,
log: &slog::Logger,
) -> Self {
NetworkGlobals {
local_enr: RwLock::new(enr.clone()),
peer_id: RwLock::new(enr.peer_id()),
listen_multiaddrs: RwLock::new(Vec::new()),
listen_port_tcp: AtomicU16::new(tcp_port),
listen_port_udp: AtomicU16::new(udp_port),
peers: RwLock::new(PeerDB::new(log)),
local_metadata: RwLock::new(local_metadata),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
}
Expand Down
11 changes: 10 additions & 1 deletion beacon_node/network/src/attestation_service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ mod tests {
migrate::NullMigrator,
};
use eth2_libp2p::discovery::{build_enr, Keypair};
use eth2_libp2p::rpc::methods::MetaData;
use eth2_libp2p::types::EnrBitfield;
use eth2_libp2p::{
discovery::CombinedKey, CombinedKeyExt, NetworkConfig, NetworkGlobals, SubnetDiscovery,
};
Expand Down Expand Up @@ -102,7 +104,14 @@ mod tests {
let enr_key = CombinedKey::from_libp2p(&Keypair::generate_secp256k1()).unwrap();
let enr = build_enr::<MinimalEthSpec>(&enr_key, &config, EnrForkId::default()).unwrap();

let network_globals: NetworkGlobals<MinimalEthSpec> = NetworkGlobals::new(enr, 0, 0, &log);
// Default metadata
let meta_data = MetaData {
seq_number: 0,
attnets: EnrBitfield::<MinimalEthSpec>::default(),
};

let network_globals: NetworkGlobals<MinimalEthSpec> =
NetworkGlobals::new(enr, 0, 0, meta_data, &log);
AttestationService::new(beacon_chain, Arc::new(network_globals), &log)
}

Expand Down

0 comments on commit 1db8daa

Please sign in to comment.