Feature Update (#66)
* Dep update

* Improve filtering

* Temp commit

* Another temp commit

* Further progress

* Temp commit

* Further modifications

* More updates

* temp changes

* More changes

* More temp commit

* Temp fighting trait generics

* Filter trait complete

* Temp commit

* Compiles

* Most tests passing

* All tests pass

* Add rt to tokio

* Further testing and optimisations

* Fix typo

* Handle all cases of an insert or update

* Version bump this update

* Enable reading of table status

* Make key accessible

* Expose connection status

* Remove ENR branch

* Increase the accessibility of sub-components

* Cargo fmt
AgeManning authored May 28, 2021
1 parent fa19557 commit 82c11b2
Showing 22 changed files with 1,702 additions and 439 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -10,7 +10,7 @@ jobs:
- name: Get latest version of stable rust
run: rustup update stable
- name: Check formatting with cargofmt
run: cargo fmt --all -- --check --config merge_imports=true
run: cargo fmt --all -- --check --config imports_granularity=Crate
release-tests-ubuntu:
runs-on: ubuntu-latest
needs: cargo-fmt
53 changes: 27 additions & 26 deletions Cargo.toml
@@ -2,7 +2,7 @@
name = "discv5"
authors = ["Age Manning <[email protected]>"]
edition = "2018"
version = "0.1.0-beta.3"
version = "0.1.0-beta.4"
description = "Implementation of the p2p discv5 discovery protocol"
license = "MIT"
repository = "https://github.com/sigp/discv5"
@@ -15,41 +15,42 @@ exclude = [
]

[dependencies]
enr = { version = "0.5.0", features = ["k256", "ed25519"] }
tokio = { version = "1.1", features = ["net", "sync", "macros"] }
tokio-stream = "0.1.2"
tokio-util = { version = "0.6.2", features = ["time"] }
libp2p-core = { version = "0.27.0", optional = true }
zeroize = { version = "1.1.1", features = ["zeroize_derive"] }
futures = "0.3.8"
uint = { version = "0.9", default-features = false }
rlp = "0.5"
sha2 = "0.9.2"
hkdf = "0.10.0"
hex = "0.4.2"
enr = { version = "0.5.1", features = ["k256", "ed25519"] }
tokio = { version = "1.5.0", features = ["net", "sync", "macros", "rt"] }
tokio-stream = "0.1.5"
tokio-util = { version = "0.6.6", features = ["time"] }
libp2p-core = { version = "0.28.3", optional = true }
zeroize = { version = "1.3.0", features = ["zeroize_derive"] }
futures = "0.3.14"
uint = { version = "0.9.0", default-features = false }
rlp = "0.5.0"
sha2 = "0.9.3"
hkdf = "0.11.0"
hex = "0.4.3"
fnv = "1.0.7"
arrayvec = "0.5.2"
arrayvec = "0.7.0"
digest = "0.9.0"
rand = "0.7.3"
smallvec = "1.5.0"
rand = "0.8.3"
smallvec = "1.6.1"
parking_lot = "0.11.1"
lru_time_cache = "0.11.2"
lru_time_cache = "0.11.10"
lazy_static = "1.4.0"
aes-gcm = "0.8.0"
aes-gcm = "0.9.0"
aes-ctr = "0.6.0"
k256 = { version = "0.7", features = ["zeroize", "ecdh", "sha2"] }
tracing = { version = "0.1.21", features = ["log"] }
tracing-subscriber = "0.2.15"
tracing = { version = "0.1.26", features = ["log"] }
tracing-subscriber = "0.2.18"
lru = "0.6.5"

[dev-dependencies]
rand_07 = { package = "rand", version = "0.7" }
quickcheck = "0.9.2"
env_logger = "0.8.2"
env_logger = "0.8.3"
hex-literal = "0.3.1"
simple_logger = "1.11.0"
tokio-util = { version = "0.6.2", features = ["time"] }
tokio = { version = "1.1", features = ["full"] }
rand_xorshift = "0.2.0"
rand_core = "0.5.1"
tokio-util = { version = "0.6.6", features = ["time"] }
tokio = { version = "1.5.0", features = ["full"] }
rand_xorshift = "0.3.0"
rand_core = "0.6.2"

[features]
libp2p = ["libp2p-core"]
6 changes: 5 additions & 1 deletion examples/find_nodes.rs
@@ -36,7 +36,11 @@
//!
//! For a simple CLI discovery service see [discv5-cli](https://github.com/AgeManning/discv5-cli)
use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder};
use discv5::{
enr,
enr::{k256, CombinedKey},
Discv5, Discv5ConfigBuilder,
};
use std::{
net::{Ipv4Addr, SocketAddr},
time::Duration,
31 changes: 30 additions & 1 deletion src/config.rs
@@ -1,4 +1,4 @@
use crate::{Enr, Executor, FilterConfig, PermitBanList};
use crate::{kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, FilterConfig, PermitBanList};
///! A set of configuration parameters to tune the discovery protocol.
use std::time::Duration;

@@ -11,6 +11,10 @@ pub struct Discv5Config {
/// The request timeout for each UDP request. Default: 2 seconds.
pub request_timeout: Duration,

/// The interval over which votes are remembered when determining our external IP. A lower
/// interval will respond faster to IP changes. Default is 30 seconds.
pub vote_duration: Duration,

/// The timeout after which a `QueryPeer` in an ongoing query is marked unresponsive.
/// Unresponsive peers don't count towards the parallelism limits for a query.
/// Hence, we may potentially end up making more requests to good peers. Default: 2 seconds.
@@ -45,6 +49,10 @@ pub struct Discv5Config {
/// /24 subnet in the kbuckets table. This is to mitigate eclipse attacks. Default: false.
pub ip_limit: bool,

/// Sets a maximum limit to the number of incoming nodes (nodes that have dialed us) to exist per-bucket. This cannot be larger
/// than the bucket size (16). By default this is disabled (set to the maximum bucket size, 16).
pub incoming_bucket_limit: usize,

/// A filter used to decide whether to insert nodes into our local routing table. Nodes can be
/// excluded if they do not pass this filter. The default is to accept all nodes.
pub table_filter: fn(&Enr) -> bool,
@@ -79,6 +87,7 @@ impl Default for Discv5Config {
Self {
enable_packet_filter: false,
request_timeout: Duration::from_secs(1),
vote_duration: Duration::from_secs(30),
query_peer_timeout: Duration::from_secs(2),
query_timeout: Duration::from_secs(60),
request_retries: 1,
@@ -89,6 +98,7 @@
enr_peer_update_min: 10,
query_parallelism: 3,
ip_limit: false,
incoming_bucket_limit: MAX_NODES_PER_BUCKET,
table_filter: |_| true,
talkreq_callback: |_, _| Vec::new(),
ping_interval: Duration::from_secs(300),
@@ -132,6 +142,13 @@ impl Discv5ConfigBuilder {
self
}

/// The interval over which votes are remembered when determining our external IP. A lower
/// interval will respond faster to IP changes. Default is 30 seconds.
pub fn vote_duration(&mut self, vote_duration: Duration) -> &mut Self {
self.config.vote_duration = vote_duration;
self
}

/// The timeout for an entire query. Any peers discovered before this timeout are returned.
pub fn query_timeout(&mut self, timeout: Duration) -> &mut Self {
self.config.query_timeout = timeout;
@@ -199,6 +216,13 @@ impl Discv5ConfigBuilder {
self
}

/// Sets a maximum limit to the number of incoming nodes (nodes that have dialed us) to exist per-bucket. This cannot be larger
/// than the bucket size (16). By default, half of every bucket (8 positions) is the largest number of nodes that we accept that dial us.
pub fn incoming_bucket_limit(&mut self, limit: usize) -> &mut Self {
self.config.incoming_bucket_limit = limit;
self
}

/// A filter used to decide whether to insert nodes into our local routing table. Nodes can be
/// excluded if they do not pass this filter.
pub fn table_filter(&mut self, filter: fn(&Enr) -> bool) -> &mut Self {
@@ -250,6 +274,9 @@ impl Discv5ConfigBuilder {
if self.config.executor.is_none() {
self.config.executor = Some(Box::new(crate::executor::TokioExecutor::default()));
};

assert!(self.config.incoming_bucket_limit <= MAX_NODES_PER_BUCKET);

self.config.clone()
}
}
@@ -259,6 +286,7 @@ impl std::fmt::Debug for Discv5Config {
let mut builder = f.debug_struct("Discv5Config");
let _ = builder.field("filter_enabled", &self.enable_packet_filter);
let _ = builder.field("request_timeout", &self.request_timeout);
let _ = builder.field("vote_duration", &self.vote_duration);
let _ = builder.field("query_timeout", &self.query_timeout);
let _ = builder.field("query_peer_timeout", &self.query_peer_timeout);
let _ = builder.field("request_retries", &self.request_retries);
@@ -268,6 +296,7 @@
let _ = builder.field("query_parallelism", &self.query_parallelism);
let _ = builder.field("report_discovered_peers", &self.report_discovered_peers);
let _ = builder.field("ip_limit", &self.ip_limit);
let _ = builder.field("incoming_bucket_limit", &self.incoming_bucket_limit);
let _ = builder.field("ping_interval", &self.ping_interval);
builder.finish()
}
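Taken together, the new configuration surface can be exercised as follows. This is a minimal sketch based only on the builder methods visible in this diff (vote_duration, ip_limit, incoming_bucket_limit); the 60-second vote duration and the per-bucket limit of 8 are illustrative values, not the defaults.

use discv5::Discv5ConfigBuilder;
use std::time::Duration;

fn main() {
    // Sketch only: remember IP-address votes for 60 seconds, enable the /24
    // IP limit, and accept at most 8 incoming (dialed-us) nodes per bucket.
    // build() asserts that the incoming limit does not exceed the bucket size (16).
    let config = Discv5ConfigBuilder::new()
        .vote_duration(Duration::from_secs(60))
        .ip_limit()
        .incoming_bucket_limit(8)
        .build();

    // Discv5Config implements Debug, so the chosen values can be inspected.
    println!("{:?}", config);
}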
103 changes: 62 additions & 41 deletions src/discv5.rs
@@ -14,7 +14,10 @@
use crate::{
error::{Discv5Error, QueryError, RequestError},
kbucket::{self, ip_limiter, KBucketsTable, NodeStatus},
kbucket::{
self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable,
NodeStatus, UpdateResult,
},
node_info::NodeContact,
service::{QueryKind, Service, ServiceRequest},
Discv5Config, Enr,
@@ -89,11 +92,26 @@ impl Discv5 {
config.executor = Some(Box::new(crate::executor::TokioExecutor::default()));
};

// NOTE: Currently we don't expose custom filter support in the configuration. Users can
// optionally use the IP filter via the ip_limit configuration parameter. In the future, we
// may expose this functionality to the users if there is demand for it.
let (table_filter, bucket_filter) = if config.ip_limit {
(
Some(Box::new(kbucket::IpTableFilter) as Box<dyn kbucket::Filter<Enr>>),
Some(Box::new(kbucket::IpBucketFilter) as Box<dyn kbucket::Filter<Enr>>),
)
} else {
(None, None)
};

let local_enr = Arc::new(RwLock::new(local_enr));
let enr_key = Arc::new(RwLock::new(enr_key));
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
local_enr.read().node_id().into(),
Duration::from_secs(60),
config.incoming_bucket_limit,
table_filter,
bucket_filter,
)));

// Update the PermitBan list based on initial configuration
@@ -163,35 +181,26 @@ impl Discv5 {

let key = kbucket::Key::from(enr.node_id());

// should the ENR be inserted or updated to a value that would exceed the IP limit ban
let ip_limit_ban = self.config.ip_limit
&& !self
.kbuckets
.read()
.check(&key, &enr, |v, o, l| ip_limiter(v, &o, l));

match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
// still update an ENR, regardless of the IP limit ban
*entry.value() = enr;
}
kbucket::Entry::Pending(mut entry, _) => {
*entry.value() = enr;
}
kbucket::Entry::Absent(entry) => {
if !ip_limit_ban {
match entry.insert(enr, NodeStatus::Disconnected) {
kbucket::InsertResult::Inserted => {}
kbucket::InsertResult::Full => {
return Err("Table full");
}
kbucket::InsertResult::Pending { .. } => {}
}
}
}
kbucket::Entry::SelfEntry => {}
};
Ok(())
match self.kbuckets.write().insert_or_update(
&key,
enr,
NodeStatus {
state: ConnectionState::Disconnected,
direction: ConnectionDirection::Incoming,
},
) {
InsertResult::Inserted
| InsertResult::Pending { .. }
| InsertResult::StatusUpdated { .. }
| InsertResult::ValueUpdated
| InsertResult::Updated { .. }
| InsertResult::UpdatedPending => Ok(()),
InsertResult::Failed(FailureReason::BucketFull) => Err("Table full"),
InsertResult::Failed(FailureReason::BucketFilter) => Err("Failed bucket filter"),
InsertResult::Failed(FailureReason::TableFilter) => Err("Failed table filter"),
InsertResult::Failed(FailureReason::InvalidSelfUpdate) => Err("Invalid self update"),
InsertResult::Failed(_) => Err("Failed to insert ENR"),
}
}

/// Removes a `node_id` from the routing table.
@@ -210,16 +219,13 @@ impl Discv5 {
/// Returns `true` if node was in table and `false` otherwise.
pub fn disconnect_node(&mut self, node_id: &NodeId) -> bool {
let key = &kbucket::Key::from(*node_id);
match self.kbuckets.write().entry(key) {
kbucket::Entry::Present(entry, _) => {
entry.update(NodeStatus::Disconnected);
true
}
kbucket::Entry::Pending(entry, _) => {
entry.update(NodeStatus::Disconnected);
true
}
_ => false,
match self
.kbuckets
.write()
.update_node_status(key, ConnectionState::Disconnected, None)
{
UpdateResult::Failed(_) => false,
_ => true,
}
}

@@ -228,7 +234,7 @@
self.kbuckets
.write()
.iter()
.filter(|entry| entry.status == NodeStatus::Connected)
.filter(|entry| entry.status.is_connected())
.count()
}

@@ -345,6 +351,21 @@ impl Discv5 {
.collect()
}

/// Returns an iterator over all the entries in the routing table.
pub fn table_entries(&mut self) -> Vec<(NodeId, Enr, NodeStatus)> {
self.kbuckets
.write()
.iter()
.map(|entry| {
(
*entry.node.key.preimage(),
entry.node.value.clone(),
entry.status.clone(),
)
})
.collect()
}

/// Requests the ENR of a node corresponding to multiaddr or multi-addr string.
///
/// Only `ed25519` and `secp256k1` key types are currently supported.
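For reference, a sketch of how the reworked insertion path and the newly exposed accessors might be used by a consumer. It assumes only what is visible in this diff: add_enr returning a descriptive Err string, table_entries, disconnect_node, and a connected-peer counter (the body appears above; the name connected_peers is assumed here). The ENR set-up is illustrative, not taken from the repository's examples.

use discv5::{
    enr::{CombinedKey, EnrBuilder},
    Discv5, Discv5ConfigBuilder,
};

fn main() {
    // Sketch only: local node identity and a default configuration.
    let enr_key = CombinedKey::generate_secp256k1();
    let local_enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
    let config = Discv5ConfigBuilder::new().build();
    let mut discv5 = Discv5::new(local_enr, enr_key, config).unwrap();

    // A second, locally generated ENR standing in for a discovered peer.
    let peer_key = CombinedKey::generate_secp256k1();
    let peer_enr = EnrBuilder::new("v4").build(&peer_key).unwrap();
    let peer_id = peer_enr.node_id();

    // add_enr now reports why an insertion was rejected (full bucket,
    // table/bucket filter, ...) instead of silently ignoring some cases.
    match discv5.add_enr(peer_enr) {
        Ok(()) => println!("peer inserted"),
        Err(reason) => println!("peer not inserted: {}", reason),
    }

    // Newly exposed views over the routing table.
    println!("connected peers: {}", discv5.connected_peers());
    for (node_id, _enr, status) in discv5.table_entries() {
        println!("{:?}: connected = {}", node_id, status.is_connected());
    }

    // Mark the peer as disconnected; returns false if it was not in the table.
    let was_known = discv5.disconnect_node(&peer_id);
    println!("peer was in table: {}", was_known);
}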
6 changes: 3 additions & 3 deletions src/discv5/test.rs
@@ -1,7 +1,7 @@
#![cfg(test)]

use crate::{kbucket, Discv5, *};
use enr::{CombinedKey, Enr, EnrBuilder, EnrKey, NodeId};
use enr::{k256, CombinedKey, Enr, EnrBuilder, EnrKey, NodeId};
use rand_core::{RngCore, SeedableRng};
use std::{
collections::HashMap,
@@ -424,7 +424,7 @@ async fn test_table_limits() {
})
.collect();
for enr in enrs {
discv5.add_enr(enr.clone()).unwrap();
let _ = discv5.add_enr(enr.clone()); // we expect some of these to fail the filter.
}
// Number of entries should be `table_limit`, i.e one node got restricted
assert_eq!(discv5.kbuckets.read().iter_ref().count(), table_limit);
@@ -474,7 +474,7 @@ async fn test_bucket_limits() {
let config = Discv5ConfigBuilder::new().ip_limit().build();
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
for enr in enrs {
discv5.add_enr(enr.clone()).unwrap();
let _ = discv5.add_enr(enr.clone()); // we expect some of these to fail based on the filter.
}

// Number of entries should be equal to `bucket_limit`.
3 changes: 2 additions & 1 deletion src/handler/crypto/ecdh.rs
@@ -1,5 +1,6 @@
//! Implements the static ecdh algorithm required by discv5 in terms of the `k256` library.
use k256::{
use super::k256::{
self,
ecdsa::{SigningKey, VerifyingKey},
elliptic_curve::sec1::ToEncodedPoint,
};
(Diffs for the remaining changed files are not shown here.)
