From 82c11b261fc65f0962f79612393a8ce2a4ccb01d Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 28 May 2021 13:13:47 +1000 Subject: [PATCH] Feature Update (#66) * Dep update * Improve filtering * Temp commit * Another temp commit * Further progress * Temp commit * Further modifications * More updates * temp changes * More changes * More temp commit * Temp fighting trait generics * Filter trait complete * Temp commit * Compiles * Most tests passing * All tests pass * Add rt to tokio * Further testing and optimisations * Fix typo * Handle all cases of an insert or update * Version bump this update * Enable reading of table status * Make key accessible * Expose connection status * Remove ENR branch * Increase the accessibility of sub-components * Cargo fmt --- .github/workflows/build.yml | 2 +- Cargo.toml | 53 +-- examples/find_nodes.rs | 6 +- src/config.rs | 31 +- src/discv5.rs | 103 +++-- src/discv5/test.rs | 6 +- src/handler/crypto/ecdh.rs | 3 +- src/handler/mod.rs | 72 +++- src/handler/tests.rs | 2 +- src/kbucket.rs | 453 ++++++++++++++++++---- src/kbucket/bucket.rs | 649 ++++++++++++++++++++++++++------ src/kbucket/entry.rs | 57 +-- src/kbucket/filter.rs | 96 +++++ src/lib.rs | 3 +- src/packet/mod.rs | 2 +- src/query_pool/peers/closest.rs | 2 +- src/service.rs | 322 ++++++++++------ src/service/hashset_delay.rs | 131 +++++++ src/service/ip_vote.rs | 25 +- src/service/test.rs | 51 ++- src/socket/filter/config.rs | 8 + src/socket/filter/mod.rs | 64 +++- 22 files changed, 1702 insertions(+), 439 deletions(-) create mode 100644 src/kbucket/filter.rs create mode 100644 src/service/hashset_delay.rs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 868ff080f..c3ff8a06b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,7 +10,7 @@ jobs: - name: Get latest version of stable rust run: rustup update stable - name: Check formatting with cargofmt - run: cargo fmt --all -- --check --config merge_imports=true + run: cargo fmt --all -- --check --config imports_granularity=Crate release-tests-ubuntu: runs-on: ubuntu-latest needs: cargo-fmt diff --git a/Cargo.toml b/Cargo.toml index 48c83b608..fb194086a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "discv5" authors = ["Age Manning "] edition = "2018" -version = "0.1.0-beta.3" +version = "0.1.0-beta.4" description = "Implementation of the p2p discv5 discovery protocol" license = "MIT" repository = "https://github.com/sigp/discv5" @@ -15,41 +15,42 @@ exclude = [ ] [dependencies] -enr = { version = "0.5.0", features = ["k256", "ed25519"] } -tokio = { version = "1.1", features = ["net", "sync", "macros"] } -tokio-stream = "0.1.2" -tokio-util = { version = "0.6.2", features = ["time"] } -libp2p-core = { version = "0.27.0", optional = true } -zeroize = { version = "1.1.1", features = ["zeroize_derive"] } -futures = "0.3.8" -uint = { version = "0.9", default-features = false } -rlp = "0.5" -sha2 = "0.9.2" -hkdf = "0.10.0" -hex = "0.4.2" +enr = { version = "0.5.1", features = ["k256", "ed25519"] } +tokio = { version = "1.5.0", features = ["net", "sync", "macros", "rt"] } +tokio-stream = "0.1.5" +tokio-util = { version = "0.6.6", features = ["time"] } +libp2p-core = { version = "0.28.3", optional = true } +zeroize = { version = "1.3.0", features = ["zeroize_derive"] } +futures = "0.3.14" +uint = { version = "0.9.0", default-features = false } +rlp = "0.5.0" +sha2 = "0.9.3" +hkdf = "0.11.0" +hex = "0.4.3" fnv = "1.0.7" -arrayvec = "0.5.2" +arrayvec = "0.7.0" digest = "0.9.0" -rand = 
"0.7.3" -smallvec = "1.5.0" +rand = "0.8.3" +smallvec = "1.6.1" parking_lot = "0.11.1" -lru_time_cache = "0.11.2" +lru_time_cache = "0.11.10" lazy_static = "1.4.0" -aes-gcm = "0.8.0" +aes-gcm = "0.9.0" aes-ctr = "0.6.0" -k256 = { version = "0.7", features = ["zeroize", "ecdh", "sha2"] } -tracing = { version = "0.1.21", features = ["log"] } -tracing-subscriber = "0.2.15" +tracing = { version = "0.1.26", features = ["log"] } +tracing-subscriber = "0.2.18" +lru = "0.6.5" [dev-dependencies] +rand_07 = { package = "rand", version = "0.7" } quickcheck = "0.9.2" -env_logger = "0.8.2" +env_logger = "0.8.3" hex-literal = "0.3.1" simple_logger = "1.11.0" -tokio-util = { version = "0.6.2", features = ["time"] } -tokio = { version = "1.1", features = ["full"] } -rand_xorshift = "0.2.0" -rand_core = "0.5.1" +tokio-util = { version = "0.6.6", features = ["time"] } +tokio = { version = "1.5.0", features = ["full"] } +rand_xorshift = "0.3.0" +rand_core = "0.6.2" [features] libp2p = ["libp2p-core"] diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index 2ff5d4160..8b87c9a5d 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -36,7 +36,11 @@ //! //! For a simple CLI discovery service see [discv5-cli](https://github.com/AgeManning/discv5-cli) -use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder}; +use discv5::{ + enr, + enr::{k256, CombinedKey}, + Discv5, Discv5ConfigBuilder, +}; use std::{ net::{Ipv4Addr, SocketAddr}, time::Duration, diff --git a/src/config.rs b/src/config.rs index bbd74a7cc..e248426d0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use crate::{Enr, Executor, FilterConfig, PermitBanList}; +use crate::{kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, FilterConfig, PermitBanList}; ///! A set of configuration parameters to tune the discovery protocol. use std::time::Duration; @@ -11,6 +11,10 @@ pub struct Discv5Config { /// The request timeout for each UDP request. Default: 2 seconds. pub request_timeout: Duration, + /// The interval over which votes are remembered when determining our external IP. A lower + /// interval will respond faster to IP changes. Default is 30 seconds. + pub vote_duration: Duration, + /// The timeout after which a `QueryPeer` in an ongoing query is marked unresponsive. /// Unresponsive peers don't count towards the parallelism limits for a query. /// Hence, we may potentially end up making more requests to good peers. Default: 2 seconds. @@ -45,6 +49,10 @@ pub struct Discv5Config { /// /24 subnet in the kbuckets table. This is to mitigate eclipse attacks. Default: false. pub ip_limit: bool, + /// Sets a maximum limit to the number of incoming nodes (nodes that have dialed us) to exist per-bucket. This cannot be larger + /// than the bucket size (16). By default this is disabled (set to the maximum bucket size, 16). + pub incoming_bucket_limit: usize, + /// A filter used to decide whether to insert nodes into our local routing table. Nodes can be /// excluded if they do not pass this filter. The default is to accept all nodes. 
pub table_filter: fn(&Enr) -> bool, @@ -79,6 +87,7 @@ impl Default for Discv5Config { Self { enable_packet_filter: false, request_timeout: Duration::from_secs(1), + vote_duration: Duration::from_secs(30), query_peer_timeout: Duration::from_secs(2), query_timeout: Duration::from_secs(60), request_retries: 1, @@ -89,6 +98,7 @@ impl Default for Discv5Config { enr_peer_update_min: 10, query_parallelism: 3, ip_limit: false, + incoming_bucket_limit: MAX_NODES_PER_BUCKET, table_filter: |_| true, talkreq_callback: |_, _| Vec::new(), ping_interval: Duration::from_secs(300), @@ -132,6 +142,13 @@ impl Discv5ConfigBuilder { self } + /// The interval over which votes are remembered when determining our external IP. A lower + /// interval will respond faster to IP changes. Default is 30 seconds. + pub fn vote_duration(&mut self, vote_duration: Duration) -> &mut Self { + self.config.vote_duration = vote_duration; + self + } + /// The timeout for an entire query. Any peers discovered before this timeout are returned. pub fn query_timeout(&mut self, timeout: Duration) -> &mut Self { self.config.query_timeout = timeout; @@ -199,6 +216,13 @@ impl Discv5ConfigBuilder { self } + /// Sets a maximum limit to the number of incoming nodes (nodes that have dialed us) to exist per-bucket. This cannot be larger + /// than the bucket size (16). By default, half of every bucket (8 positions) is the largest number of nodes that we accept that dial us. + pub fn incoming_bucket_limit(&mut self, limit: usize) -> &mut Self { + self.config.incoming_bucket_limit = limit; + self + } + /// A filter used to decide whether to insert nodes into our local routing table. Nodes can be /// excluded if they do not pass this filter. pub fn table_filter(&mut self, filter: fn(&Enr) -> bool) -> &mut Self { @@ -250,6 +274,9 @@ impl Discv5ConfigBuilder { if self.config.executor.is_none() { self.config.executor = Some(Box::new(crate::executor::TokioExecutor::default())); }; + + assert!(self.config.incoming_bucket_limit <= MAX_NODES_PER_BUCKET); + self.config.clone() } } @@ -259,6 +286,7 @@ impl std::fmt::Debug for Discv5Config { let mut builder = f.debug_struct("Discv5Config"); let _ = builder.field("filter_enabled", &self.enable_packet_filter); let _ = builder.field("request_timeout", &self.request_timeout); + let _ = builder.field("vote_duration", &self.vote_duration); let _ = builder.field("query_timeout", &self.query_timeout); let _ = builder.field("query_peer_timeout", &self.query_peer_timeout); let _ = builder.field("request_retries", &self.request_retries); @@ -268,6 +296,7 @@ impl std::fmt::Debug for Discv5Config { let _ = builder.field("query_parallelism", &self.query_parallelism); let _ = builder.field("report_discovered_peers", &self.report_discovered_peers); let _ = builder.field("ip_limit", &self.ip_limit); + let _ = builder.field("incoming_bucket_limit", &self.incoming_bucket_limit); let _ = builder.field("ping_interval", &self.ping_interval); builder.finish() } diff --git a/src/discv5.rs b/src/discv5.rs index 76bfe2c9a..61e4108f9 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -14,7 +14,10 @@ use crate::{ error::{Discv5Error, QueryError, RequestError}, - kbucket::{self, ip_limiter, KBucketsTable, NodeStatus}, + kbucket::{ + self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, + NodeStatus, UpdateResult, + }, node_info::NodeContact, service::{QueryKind, Service, ServiceRequest}, Discv5Config, Enr, @@ -89,11 +92,26 @@ impl Discv5 { config.executor = 
Some(Box::new(crate::executor::TokioExecutor::default())); }; + // NOTE: Currently we don't expose custom filter support in the configuration. Users can + // optionally use the IP filter via the ip_limit configuration parameter. In the future, we + // may expose this functionality to the users if there is demand for it. + let (table_filter, bucket_filter) = if config.ip_limit { + ( + Some(Box::new(kbucket::IpTableFilter) as Box>), + Some(Box::new(kbucket::IpBucketFilter) as Box>), + ) + } else { + (None, None) + }; + let local_enr = Arc::new(RwLock::new(local_enr)); let enr_key = Arc::new(RwLock::new(enr_key)); let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( local_enr.read().node_id().into(), Duration::from_secs(60), + config.incoming_bucket_limit, + table_filter, + bucket_filter, ))); // Update the PermitBan list based on initial configuration @@ -163,35 +181,26 @@ impl Discv5 { let key = kbucket::Key::from(enr.node_id()); - // should the ENR be inserted or updated to a value that would exceed the IP limit ban - let ip_limit_ban = self.config.ip_limit - && !self - .kbuckets - .read() - .check(&key, &enr, |v, o, l| ip_limiter(v, &o, l)); - - match self.kbuckets.write().entry(&key) { - kbucket::Entry::Present(mut entry, _) => { - // still update an ENR, regardless of the IP limit ban - *entry.value() = enr; - } - kbucket::Entry::Pending(mut entry, _) => { - *entry.value() = enr; - } - kbucket::Entry::Absent(entry) => { - if !ip_limit_ban { - match entry.insert(enr, NodeStatus::Disconnected) { - kbucket::InsertResult::Inserted => {} - kbucket::InsertResult::Full => { - return Err("Table full"); - } - kbucket::InsertResult::Pending { .. } => {} - } - } - } - kbucket::Entry::SelfEntry => {} - }; - Ok(()) + match self.kbuckets.write().insert_or_update( + &key, + enr, + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Incoming, + }, + ) { + InsertResult::Inserted + | InsertResult::Pending { .. } + | InsertResult::StatusUpdated { .. } + | InsertResult::ValueUpdated + | InsertResult::Updated { .. } + | InsertResult::UpdatedPending => Ok(()), + InsertResult::Failed(FailureReason::BucketFull) => Err("Table full"), + InsertResult::Failed(FailureReason::BucketFilter) => Err("Failed bucket filter"), + InsertResult::Failed(FailureReason::TableFilter) => Err("Failed table filter"), + InsertResult::Failed(FailureReason::InvalidSelfUpdate) => Err("Invalid self update"), + InsertResult::Failed(_) => Err("Failed to insert ENR"), + } } /// Removes a `node_id` from the routing table. @@ -210,16 +219,13 @@ impl Discv5 { /// Returns `true` if node was in table and `false` otherwise. pub fn disconnect_node(&mut self, node_id: &NodeId) -> bool { let key = &kbucket::Key::from(*node_id); - match self.kbuckets.write().entry(key) { - kbucket::Entry::Present(entry, _) => { - entry.update(NodeStatus::Disconnected); - true - } - kbucket::Entry::Pending(entry, _) => { - entry.update(NodeStatus::Disconnected); - true - } - _ => false, + match self + .kbuckets + .write() + .update_node_status(key, ConnectionState::Disconnected, None) + { + UpdateResult::Failed(_) => false, + _ => true, } } @@ -228,7 +234,7 @@ impl Discv5 { self.kbuckets .write() .iter() - .filter(|entry| entry.status == NodeStatus::Connected) + .filter(|entry| entry.status.is_connected()) .count() } @@ -345,6 +351,21 @@ impl Discv5 { .collect() } + /// Returns an iterator over all the entries in the routing table. 
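Downstream, the richer insert results surface through `add_enr`, `connected_peers` and the `table_entries` accessor declared just below. A hypothetical caller-side sketch; the `discv5` handle and the incoming `enr` are assumptions, as is `add_enr` keeping its `Result<(), &'static str>` return:

```rust
use discv5::{Discv5, Enr};

// Illustration only: drive the updated table API from application code.
fn on_discovered(discv5: &mut Discv5, enr: Enr) {
    match discv5.add_enr(enr) {
        Ok(()) => {}
        // The kbucket failure reasons are now reported as static strings.
        Err("Table full") => println!("bucket full, ENR not inserted"),
        Err(reason) => println!("ENR not inserted: {}", reason),
    }

    println!("connected peers: {}", discv5.connected_peers());
    for (node_id, _enr, status) in discv5.table_entries() {
        println!("{:?} -> {:?}", node_id, status);
    }
}
```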
+ pub fn table_entries(&mut self) -> Vec<(NodeId, Enr, NodeStatus)> { + self.kbuckets + .write() + .iter() + .map(|entry| { + ( + *entry.node.key.preimage(), + entry.node.value.clone(), + entry.status.clone(), + ) + }) + .collect() + } + /// Requests the ENR of a node corresponding to multiaddr or multi-addr string. /// /// Only `ed25519` and `secp256k1` key types are currently supported. diff --git a/src/discv5/test.rs b/src/discv5/test.rs index 691137798..378436356 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -1,7 +1,7 @@ #![cfg(test)] use crate::{kbucket, Discv5, *}; -use enr::{CombinedKey, Enr, EnrBuilder, EnrKey, NodeId}; +use enr::{k256, CombinedKey, Enr, EnrBuilder, EnrKey, NodeId}; use rand_core::{RngCore, SeedableRng}; use std::{ collections::HashMap, @@ -424,7 +424,7 @@ async fn test_table_limits() { }) .collect(); for enr in enrs { - discv5.add_enr(enr.clone()).unwrap(); + let _ = discv5.add_enr(enr.clone()); // we expect some of these to fail the filter. } // Number of entries should be `table_limit`, i.e one node got restricted assert_eq!(discv5.kbuckets.read().iter_ref().count(), table_limit); @@ -474,7 +474,7 @@ async fn test_bucket_limits() { let config = Discv5ConfigBuilder::new().ip_limit().build(); let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); for enr in enrs { - discv5.add_enr(enr.clone()).unwrap(); + let _ = discv5.add_enr(enr.clone()); // we expect some of these to fail based on the filter. } // Number of entries should be equal to `bucket_limit`. diff --git a/src/handler/crypto/ecdh.rs b/src/handler/crypto/ecdh.rs index fe0c7a07d..9a73978dd 100644 --- a/src/handler/crypto/ecdh.rs +++ b/src/handler/crypto/ecdh.rs @@ -1,5 +1,6 @@ //! Implements the static ecdh algorithm required by discv5 in terms of the `k256` library. -use k256::{ +use super::k256::{ + self, ecdsa::{SigningKey, VerifyingKey}, elliptic_curve::sec1::ToEncodedPoint, }; diff --git a/src/handler/mod.rs b/src/handler/mod.rs index eaed8e01f..32184b16a 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -95,7 +95,7 @@ pub enum HandlerResponse { /// /// A session is only considered established once we have received a signed ENR from the /// node and received messages from it's `SocketAddr` matching it's ENR fields. - Established(Enr), + Established(Enr, ConnectionDirection), /// A Request has been received. Request(NodeAddress, Box), @@ -113,14 +113,26 @@ pub enum HandlerResponse { RequestFailed(RequestId, RequestError), } +/// How we connected to the node. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum ConnectionDirection { + /// The node contacted us. + Incoming, + /// We contacted the node. + Outgoing, +} + /// A reference for the application layer to send back when the handler requests any known /// ENR for the NodeContact. #[derive(Debug, Clone, PartialEq)] pub struct WhoAreYouRef(pub NodeAddress, MessageNonce); #[derive(Debug)] +/// A Challenge (WHOAREYOU) object used to handle and send WHOAREYOU requests. pub struct Challenge { + /// The challenge data received from the node. data: ChallengeData, + /// The remote's ENR if we know it. We can receive a challenge from an unknown node. remote_enr: Option, } @@ -139,10 +151,18 @@ pub(crate) struct RequestCall { /// If we receive a Nodes Response with a total greater than 1. This keeps track of the /// remaining responses expected. remaining_responses: Option, + /// Signifies if we are initiating the session with a random packet. This is only used to + /// determine the connection direction of the session. 
+ initiating_session: bool, } impl RequestCall { - fn new(contact: NodeContact, packet: Packet, request: Request) -> Self { + fn new( + contact: NodeContact, + packet: Packet, + request: Request, + initiating_session: bool, + ) -> Self { RequestCall { contact, packet, @@ -150,6 +170,7 @@ impl RequestCall { handshake_sent: false, retries: 1, remaining_responses: None, + initiating_session, } } @@ -203,7 +224,7 @@ type HandlerReturn = ( ); impl Handler { /// A new Session service which instantiates the UDP socket send/recv tasks. - pub(crate) async fn spawn( + pub async fn spawn( enr: Arc>, key: Arc>, listen_socket: SocketAddr, @@ -433,23 +454,27 @@ impl Handler { return Ok(()); } - let packet = { + let (packet, initiating_session) = { if let Some(session) = self.sessions.get_mut(&node_address) { // Encrypt the message and send - session + let packet = session .encrypt_message(self.node_id, &request.clone().encode()) - .map_err(|e| RequestError::EncryptionFailed(format!("{:?}", e)))? + .map_err(|e| RequestError::EncryptionFailed(format!("{:?}", e)))?; + (packet, false) } else { // No session exists, start a new handshake trace!( "Starting session. Sending random packet to: {}", node_address ); - Packet::new_random(&self.node_id).map_err(|e| RequestError::EntropyFailure(e))? + let packet = Packet::new_random(&self.node_id) + .map_err(|e| RequestError::EntropyFailure(e))?; + // We are initiating a new session + (packet, true) } }; - let call = RequestCall::new(contact, packet.clone(), request); + let call = RequestCall::new(contact, packet.clone(), request, initiating_session); // let the filter know we are expecting a response self.add_expected_response(node_address.socket_addr); let nonce = *packet.message_nonce(); @@ -644,11 +669,23 @@ impl Handler { .expect("All sent requests must have a node address"); match request_call.contact.clone() { NodeContact::Enr(enr) => { - // We already know the ENR. Send the handshake response packet + // NOTE: Here we decide if the session is outgoing or ingoing. The condition for an + // outgoing session is that we originally sent a RANDOM packet (signifying we did + // not have a session for a request) and the packet is not a PING (we are not + // trying to update an old session that may have expired. + let connection_direction = { + match (&request_call.initiating_session, &request_call.request.body) { + (true, RequestBody::Ping { .. }) => ConnectionDirection::Incoming, + (true, _) => ConnectionDirection::Outgoing, + (false, _) => ConnectionDirection::Incoming, + } + }; + // We already know the ENR. Send the handshake response packet trace!("Sending Authentication response to node: {}", node_address); request_call.packet = auth_packet.clone(); request_call.handshake_sent = true; + request_call.initiating_session = false; // Reinsert the request_call self.insert_active_request(request_call); // Send the actual packet to the send task. @@ -656,7 +693,7 @@ impl Handler { // Notify the application that the session has been established self.outbound_channel - .send(HandlerResponse::Established(*enr)) + .send(HandlerResponse::Established(*enr, connection_direction)) .await .unwrap_or_else(|e| warn!("Error with sending channel: {}", e)); } @@ -736,9 +773,14 @@ impl Handler { if self.verify_enr(&enr, &node_address) { // Session is valid // Notify the application + // The session established here are from WHOAREYOU packets that we sent. + // This occurs when a node established a connection with us. 
let _ = self .outbound_channel - .send(HandlerResponse::Established(enr)) + .send(HandlerResponse::Established( + enr, + ConnectionDirection::Incoming, + )) .await; self.new_session(node_address.clone(), session); self.handle_message( @@ -865,9 +907,15 @@ impl Handler { if let Some(enr) = nodes.pop() { if self.verify_enr(&enr, &node_address) { // Notify the application + // This can occur when we try to dial a node without an + // ENR. In this case we have attempted to establish the + // connection, so this is an outgoing connection. let _ = self .outbound_channel - .send(HandlerResponse::Established(enr)) + .send(HandlerResponse::Established( + enr, + ConnectionDirection::Outgoing, + )) .await; return; } diff --git a/src/handler/tests.rs b/src/handler/tests.rs index c758b8bf4..a6d303547 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -167,7 +167,7 @@ async fn multiple_messages() { let sender = async move { loop { match sender_handler_recv.recv().await { - Some(HandlerResponse::Established(_)) => { + Some(HandlerResponse::Established(_, _)) => { // now the session is established, send the rest of the messages for _ in 0..messages_to_send - 1 { let _ = sender_handler.send(HandlerRequest::Request( diff --git a/src/kbucket.rs b/src/kbucket.rs index bde54f628..aa97bfe5c 100644 --- a/src/kbucket.rs +++ b/src/kbucket.rs @@ -72,13 +72,19 @@ mod bucket; mod entry; +mod filter; mod key; pub use entry::*; -use crate::Enr; +pub use crate::handler::ConnectionDirection; use arrayvec::{self, ArrayVec}; use bucket::KBucket; +pub use bucket::{ + ConnectionState, FailureReason, InsertResult as BucketInsertResult, UpdateResult, + MAX_NODES_PER_BUCKET, +}; +pub use filter::{Filter, IpBucketFilter, IpTableFilter}; use std::{ collections::VecDeque, time::{Duration, Instant}, @@ -86,8 +92,6 @@ use std::{ /// Maximum number of k-buckets. const NUM_BUCKETS: usize = 256; -/// Number of permitted nodes in the same /24 subnet -const MAX_NODES_PER_SUBNET_TABLE: usize = 10; /// A key that can be returned from the `closest_keys` function, which indicates if the key matches the /// predicate or not. @@ -109,8 +113,8 @@ impl From> for Key { } /// A `KBucketsTable` represents a Kademlia routing table. -#[derive(Debug, Clone)] -pub struct KBucketsTable { +#[derive(Clone)] +pub struct KBucketsTable { /// The key identifying the local peer that owns the routing table. local_key: Key, /// The buckets comprising the routing table. @@ -118,6 +122,42 @@ pub struct KBucketsTable { /// The list of evicted entries that have been replaced with pending /// entries since the last call to [`KBucketsTable::take_applied_pending`]. applied_pending: VecDeque>, + /// Filter to be applied at the table level when adding/updating a node. + table_filter: Option>>, +} + +#[must_use] +#[derive(Debug, Clone)] +/// Informs if the record was inserted. +pub enum InsertResult { + /// The node didn't exist and the new record was inserted. + Inserted, + /// The node was inserted into a pending state. + Pending { + /// The key of the least-recently connected entry that is currently considered + /// disconnected and whose corresponding peer should be checked for connectivity + /// in order to prevent it from being evicted. If connectivity to the peer is + /// re-established, the corresponding entry should be updated with + /// [`NodeStatus::Connected`]. + disconnected: Key, + }, + /// The node existed and the status was updated. + StatusUpdated { + // Returns true if the status updated promoted a disconnected node to a connected node. 
+ promoted_to_connected: bool, + }, + /// The node existed and the value was updated. + ValueUpdated, + /// Both the status and value were updated. + Updated { + // Returns true if the status updated promoted a disconnected node to a connected node. + promoted_to_connected: bool, + }, + /// The pending slot was updated. + UpdatedPending, + /// The record failed to be inserted. This can happen to not passing table/bucket filters or + /// the bucket was full. + Failed(FailureReason), } /// A (type-safe) index into a `KBucketsTable`, i.e. a non-negative integer in the @@ -148,6 +188,7 @@ impl BucketIndex { impl KBucketsTable where TNodeId: Clone, + TVal: Eq, { /// Creates a new, empty Kademlia routing table with entries partitioned /// into buckets as per the Kademlia protocol. @@ -155,13 +196,255 @@ where /// The given `pending_timeout` specifies the duration after creation of /// a [`PendingEntry`] after which it becomes eligible for insertion into /// a full bucket, replacing the least-recently (dis)connected node. - pub fn new(local_key: Key, pending_timeout: Duration) -> Self { + /// + /// A filter can be applied that limits entries into a bucket based on the buckets contents. + /// Entries that fail the filter, will not be inserted. + pub fn new( + local_key: Key, + pending_timeout: Duration, + max_incoming_per_bucket: usize, + table_filter: Option>>, + bucket_filter: Option>>, + ) -> Self { KBucketsTable { local_key, buckets: (0..NUM_BUCKETS) - .map(|_| KBucket::new(pending_timeout)) + .map(|_| { + KBucket::new( + pending_timeout, + max_incoming_per_bucket, + bucket_filter.clone(), + ) + }) .collect(), applied_pending: VecDeque::new(), + table_filter, + } + } + + // Updates a node's status if it exists in the table. + // This checks all table and bucket filters before performing the update. + pub fn update_node_status( + &mut self, + key: &Key, + state: ConnectionState, + direction: Option, + ) -> UpdateResult { + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.applied_pending.push_back(applied) + } + + bucket.update_status(key, state, direction) + } else { + UpdateResult::NotModified // The key refers to our current node. + } + } + + /// Updates a node's value if it exists in the table. + /// + /// Optionally the connection state can be modified. + pub fn update_node( + &mut self, + key: &Key, + value: TVal, + state: Option, + ) -> UpdateResult { + // Apply the table filter + let mut passed_table_filter = true; + if let Some(table_filter) = self.table_filter.as_ref() { + // Check if the value is a duplicate before applying the table filter (optimisation). 
+ let duplicate = { + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(node) = bucket.get(&key) { + if node.value == value { + true + } else { + false + } + } else { + false + } + } else { + false + } + }; + + // If the Value is new, check the table filter + if !duplicate && !table_filter.filter(&value, &mut self.table_iter()) { + passed_table_filter = false; + } + } + + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.applied_pending.push_back(applied) + } + + if !passed_table_filter { + bucket.remove(key); + return UpdateResult::Failed(FailureReason::TableFilter); + } + + let update_result = bucket.update_value(key, value); + + match &update_result { + UpdateResult::Failed(_) => { + return update_result; + } + _ => {} + } + + // If we need to update the connection state, update it here. + let status_result = if let Some(state) = state { + bucket.update_status(key, state, None) + } else { + UpdateResult::NotModified + }; + + // Return an appropriate value + match (&update_result, &status_result) { + (_, UpdateResult::Failed(_)) => status_result, + (UpdateResult::Failed(_), _) => update_result, + (_, UpdateResult::UpdatedAndPromoted) => UpdateResult::UpdatedAndPromoted, + (UpdateResult::UpdatedPending, _) => UpdateResult::UpdatedPending, + (_, UpdateResult::UpdatedPending) => UpdateResult::UpdatedPending, + (UpdateResult::NotModified, UpdateResult::NotModified) => UpdateResult::NotModified, + (_, _) => UpdateResult::Updated, + } + } else { + UpdateResult::NotModified // The key refers to our current node. + } + } + + // Attempts to insert or update + pub fn insert_or_update( + &mut self, + key: &Key, + value: TVal, + status: NodeStatus, + ) -> InsertResult { + // Check the table filter + let mut passed_table_filter = true; + if let Some(table_filter) = self.table_filter.as_ref() { + // Check if the value is a duplicate before applying the table filter (optimisation). 
+ let duplicate = { + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(node) = bucket.get(&key) { + if node.value == value { + true + } else { + false + } + } else { + false + } + } else { + false + } + }; + + if !duplicate && !table_filter.filter(&value, &mut self.table_iter()) { + passed_table_filter = false; + } + } + + let index = BucketIndex::new(&self.local_key.distance(key)); + if let Some(i) = index { + let bucket = &mut self.buckets[i.get()]; + if let Some(applied) = bucket.apply_pending() { + self.applied_pending.push_back(applied) + } + + if !passed_table_filter { + bucket.remove(key); + return InsertResult::Failed(FailureReason::TableFilter); + } + + // If the node doesn't exist, insert it + if bucket.position(key).is_none() { + let node = Node { + key: key.clone(), + value, + status, + }; + match bucket.insert(node) { + bucket::InsertResult::NodeExists => unreachable!("Node must exist"), + bucket::InsertResult::Full => InsertResult::Failed(FailureReason::BucketFull), + bucket::InsertResult::TooManyIncoming => { + InsertResult::Failed(FailureReason::TooManyIncoming) + } + bucket::InsertResult::FailedFilter => { + InsertResult::Failed(FailureReason::BucketFilter) + } + bucket::InsertResult::Pending { disconnected } => { + InsertResult::Pending { disconnected } + } + bucket::InsertResult::Inserted => InsertResult::Inserted, + } + } else { + // The node exists in the bucket + // Attempt to update the status + let update_status = bucket.update_status(key, status.state, Some(status.direction)); + + if update_status.failed() { + // The node was removed from the table + return InsertResult::Failed(FailureReason::TooManyIncoming); + } + // Attempt to update the value + let update_value = bucket.update_value(key, value); + + match (update_value, update_status) { + (UpdateResult::Updated { .. }, UpdateResult::Updated) => { + InsertResult::Updated { + promoted_to_connected: false, + } + } + (UpdateResult::Updated { .. }, UpdateResult::UpdatedAndPromoted) => { + InsertResult::Updated { + promoted_to_connected: true, + } + } + (UpdateResult::Updated { .. }, UpdateResult::NotModified) + | (UpdateResult::Updated { .. }, UpdateResult::UpdatedPending) => { + InsertResult::ValueUpdated + } + (UpdateResult::NotModified, UpdateResult::Updated) => { + InsertResult::StatusUpdated { + promoted_to_connected: false, + } + } + (UpdateResult::NotModified, UpdateResult::UpdatedAndPromoted) => { + InsertResult::StatusUpdated { + promoted_to_connected: true, + } + } + (UpdateResult::NotModified, UpdateResult::NotModified) => { + InsertResult::Updated { + promoted_to_connected: false, + } + } + (UpdateResult::UpdatedPending, _) | (_, UpdateResult::UpdatedPending) => { + InsertResult::UpdatedPending + } + (UpdateResult::Failed(reason), _) => InsertResult::Failed(reason), + (_, UpdateResult::Failed(_)) => unreachable!("Status failure handled earlier."), + (UpdateResult::UpdatedAndPromoted, _) => { + unreachable!("Value update cannot promote a connection.") + } + } + } + } else { + // Cannot insert our local entry. + InsertResult::Failed(FailureReason::InvalidSelfUpdate) } } @@ -181,6 +464,8 @@ where /// Returns an `Entry` for the given key, representing the state of the entry /// in the routing table. + /// NOTE: This must be used with caution. Modifying values manually can bypass the internal + /// table filters and ingoing/outgoing limits. 
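For reference, the call shape of the new `insert_or_update` entry point above, sketched with the same constructor arguments that `Discv5::new` and the unit tests further down use. This is crate-internal code (it relies on the `kbucket` module's re-exports), so treat it as an illustration rather than public API:

```rust
use crate::kbucket::{
    ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, Key,
    NodeStatus, MAX_NODES_PER_BUCKET,
};
use enr::NodeId;
use std::time::Duration;

fn insert_incoming_sketch() {
    let local_key = Key::from(NodeId::random());
    let mut table = KBucketsTable::<NodeId, ()>::new(
        local_key,
        Duration::from_secs(60),
        MAX_NODES_PER_BUCKET, // no incoming restriction
        None,                 // no table filter
        None,                 // no bucket filter
    );

    let peer = Key::from(NodeId::random());
    let status = NodeStatus {
        state: ConnectionState::Connected,
        direction: ConnectionDirection::Incoming,
    };
    match table.insert_or_update(&peer, (), status) {
        InsertResult::Inserted | InsertResult::Pending { .. } => {}
        InsertResult::Failed(FailureReason::TooManyIncoming) => {
            // The per-bucket incoming limit was hit.
        }
        other => println!("other outcome: {:?}", other),
    }
}
```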
pub fn entry<'a>(&'a mut self, key: &'a Key) -> Entry<'a, TNodeId, TVal> { let index = BucketIndex::new(&self.local_key.distance(key)); if let Some(i) = index { @@ -201,27 +486,37 @@ where if let Some(applied) = table.apply_pending() { applied_pending.push_back(applied) } - table.iter().map(move |(n, status)| EntryRefView { + table.iter().map(move |n| EntryRefView { node: NodeRefView { key: &n.key, value: &n.value, }, - status, + status: n.status, }) }) } + /// Returns an iterator over all the entries in the routing table to give to a table filter. + /// + /// This differs from the regular iterator as it doesn't take ownership of self and doesn't try + /// to apply any pending nodes. + fn table_iter(&self) -> impl Iterator { + self.buckets + .iter() + .flat_map(move |table| table.iter().map(|n| &n.value)) + } + /// Returns an iterator over all the entries in the routing table. /// Does not add pending node to kbucket to get an iterator which /// takes a reference instead of a mutable reference. pub fn iter_ref(&self) -> impl Iterator> { self.buckets.iter().flat_map(move |table| { - table.iter().map(move |(n, status)| EntryRefView { + table.iter().map(move |n| EntryRefView { node: NodeRefView { key: &n.key, value: &n.value, }, - status, + status: n.status, }) }) } @@ -269,12 +564,15 @@ where // Note we search via distance in order for distance in distances { let bucket = &self.buckets[(distance - 1) as usize]; - for node in bucket.iter().map(|(n, status)| { + for node in bucket.iter().map(|n| { let node = NodeRefView { key: &n.key, value: &n.value, }; - EntryRefView { node, status } + EntryRefView { + node, + status: n.status, + } }) { matching_nodes.push(node); // Exit early if we have found enough nodes @@ -301,8 +599,8 @@ where iter: None, table: self, buckets_iter: ClosestBucketsIter::new(distance), - fmap: |b: &KBucket<_, _>| -> ArrayVec<_> { - b.iter().map(|(n, _)| n.key.clone()).collect() + fmap: |b: &KBucket<_, _>| -> ArrayVec<_, MAX_NODES_PER_BUCKET> { + b.iter().map(|n| n.key.clone()).collect() }, } } @@ -324,9 +622,9 @@ where iter: None, table: self, buckets_iter: ClosestBucketsIter::new(distance), - fmap: move |b: &KBucket| -> ArrayVec<_> { + fmap: move |b: &KBucket| -> ArrayVec<_, MAX_NODES_PER_BUCKET> { b.iter() - .map(|(n, _)| PredicateKey { + .map(|n| PredicateKey { key: n.key.clone(), predicate_match: predicate(&n.value), }) @@ -345,29 +643,11 @@ where None } } - - /// Checks if key and value can be inserted into the kbuckets table. - /// A single bucket can only have `MAX_NODES_PER_SUBNET_BUCKET` nodes per /24 subnet. - /// The entire table can only have `MAX_NODES_PER_SUBNET_TABLE` nodes per /24 subnet. - pub fn check( - &self, - key: &Key, - value: &TVal, - f: impl Fn(&TVal, Vec<&TVal>, usize) -> bool, - ) -> bool { - let bucket = self.get_bucket(key); - if let Some(b) = bucket { - let others = self.iter_ref().map(|e| e.node.value).collect(); - f(value, others, MAX_NODES_PER_SUBNET_TABLE) && b.check(value, f) - } else { - true - } - } } /// An iterator over (some projection of) the closest entries in a /// `KBucketsTable` w.r.t. some target `Key`. -struct ClosestIter<'a, TTarget, TNodeId, TVal, TMap, TOut> { +struct ClosestIter<'a, TTarget, TNodeId, TVal: Eq, TMap, TOut> { /// A reference to the target key whose distance to the local key determines /// the order in which the buckets are traversed. 
The resulting /// array from projecting the entries of each bucket using `fmap` is @@ -379,7 +659,7 @@ struct ClosestIter<'a, TTarget, TNodeId, TVal, TMap, TOut> { /// distance of the local key to the target. buckets_iter: ClosestBucketsIter, /// The iterator over the entries in the currently traversed bucket. - iter: Option>, + iter: Option>, /// The projection function / mapping applied on each bucket as /// it is encountered, producing the next `iter`ator. fmap: TMap, @@ -483,7 +763,8 @@ impl Iterator for ClosestIter<'_, TTarget, TNodeId, TVal, TMap, TOut> where TNodeId: Clone, - TMap: Fn(&KBucket) -> ArrayVec<[TOut; MAX_NODES_PER_BUCKET]>, + TVal: Eq, + TMap: Fn(&KBucket) -> ArrayVec, TOut: AsRef>, { type Item = TOut; @@ -519,18 +800,38 @@ where #[cfg(test)] mod tests { - use super::*; + use super::{bucket::InsertResult as BucketInsertResult, *}; use enr::NodeId; + fn connected_state() -> NodeStatus { + NodeStatus { + state: ConnectionState::Connected, + direction: ConnectionDirection::Outgoing, + } + } + + fn disconnected_state() -> NodeStatus { + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Outgoing, + } + } + #[test] fn basic_closest() { let local_key = Key::from(NodeId::random()); let other_id = Key::from(NodeId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut table = KBucketsTable::<_, ()>::new( + local_key, + Duration::from_secs(5), + MAX_NODES_PER_BUCKET, + None, + None, + ); if let Entry::Absent(entry) = table.entry(&other_id) { - match entry.insert((), NodeStatus::Connected) { - InsertResult::Inserted => (), + match entry.insert((), connected_state()) { + BucketInsertResult::Inserted => (), _ => panic!(), } } else { @@ -545,7 +846,13 @@ mod tests { #[test] fn update_local_id_fails() { let local_key = Key::from(NodeId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_secs(5)); + let mut table = KBucketsTable::<_, ()>::new( + local_key.clone(), + Duration::from_secs(5), + MAX_NODES_PER_BUCKET, + None, + None, + ); match table.entry(&local_key) { Entry::SelfEntry => (), _ => panic!(), @@ -555,7 +862,13 @@ mod tests { #[test] fn closest() { let local_key = Key::from(NodeId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut table = KBucketsTable::<_, ()>::new( + local_key, + Duration::from_secs(5), + MAX_NODES_PER_BUCKET, + None, + None, + ); let mut count = 0; loop { if count == 100 { @@ -563,8 +876,8 @@ mod tests { } let key = Key::from(NodeId::random()); if let Entry::Absent(e) = table.entry(&key) { - match e.insert((), NodeStatus::Connected) { - InsertResult::Inserted => count += 1, + match e.insert((), connected_state()) { + BucketInsertResult::Inserted => count += 1, _ => continue, } } else { @@ -575,7 +888,7 @@ mod tests { let mut expected_keys: Vec<_> = table .buckets .iter() - .flat_map(|t| t.iter().map(|(n, _)| n.key.clone())) + .flat_map(|t| t.iter().map(|n| n.key.clone())) .collect(); for _ in 0..10 { @@ -590,22 +903,29 @@ mod tests { #[test] fn applied_pending() { let local_key = Key::from(NodeId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key.clone(), Duration::from_millis(1)); + let mut table = KBucketsTable::<_, ()>::new( + local_key.clone(), + Duration::from_millis(1), + MAX_NODES_PER_BUCKET, + None, + None, + ); let expected_applied; let full_bucket_index; loop { let key = Key::from(NodeId::random()); if let Entry::Absent(e) = table.entry(&key) { - match 
e.insert((), NodeStatus::Disconnected) { - InsertResult::Full => { + match e.insert((), disconnected_state()) { + BucketInsertResult::Full => { if let Entry::Absent(e) = table.entry(&key) { - match e.insert((), NodeStatus::Connected) { - InsertResult::Pending { disconnected } => { + match e.insert((), connected_state()) { + BucketInsertResult::Pending { disconnected } => { expected_applied = AppliedPending { inserted: key.clone(), evicted: Some(Node { key: disconnected, value: (), + status: disconnected_state(), }), }; full_bucket_index = BucketIndex::new(&key.distance(&local_key)); @@ -630,7 +950,13 @@ mod tests { full_bucket.pending_mut().unwrap().set_ready_at(elapsed); match table.entry(&expected_applied.inserted) { - Entry::Present(_, NodeStatus::Connected) => {} + Entry::Present( + _, + NodeStatus { + state: ConnectionState::Connected, + direction: _direction, + }, + ) => {} x => panic!("Unexpected entry: {:?}", x), } @@ -643,24 +969,3 @@ mod tests { assert_eq!(None, table.take_applied_pending()); } } - -/// Takes an `ENR` to insert and a list of other `ENR`s to compare against. -/// Returns `true` if `ENR` can be inserted and `false` otherwise. -/// `enr` can be inserted if the count of enrs in `others` in the same /24 subnet as `ENR` -/// is less than `limit`. -pub fn ip_limiter(enr: &Enr, others: &[&Enr], limit: usize) -> bool { - let mut allowed = true; - if let Some(ip) = enr.ip() { - let count = others.iter().flat_map(|e| e.ip()).fold(0, |acc, x| { - if x.octets()[0..3] == ip.octets()[0..3] { - acc + 1 - } else { - acc - } - }); - if count >= limit { - allowed = false; - } - }; - allowed -} diff --git a/src/kbucket/bucket.rs b/src/kbucket/bucket.rs index f1eb2be91..78abfda98 100644 --- a/src/kbucket/bucket.rs +++ b/src/kbucket/bucket.rs @@ -34,17 +34,13 @@ use super::*; /// Maximum number of nodes in a bucket, i.e. the (fixed) `k` parameter. pub const MAX_NODES_PER_BUCKET: usize = 16; -const MAX_NODES_PER_SUBNET_BUCKET: usize = 2; /// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`. #[derive(Debug, Clone)] -pub struct PendingNode { +pub struct PendingNode { /// The pending node to insert. node: Node, - /// The status of the pending node. - status: NodeStatus, - /// The instant at which the pending node is eligible for insertion into a bucket. replace: Instant, } @@ -55,16 +51,42 @@ pub struct PendingNode { /// last status change determines the position of the node in a /// bucket. #[derive(PartialEq, Eq, Debug, Copy, Clone)] -pub enum NodeStatus { - /// The node is considered connected. +pub struct NodeStatus { + /// The direction (incoming or outgoing) for the node. If in the disconnected state, this + /// represents the last connection status. + pub direction: ConnectionDirection, + /// The connection state, connected or disconnected. + pub state: ConnectionState, +} + +/// The connection state of a node. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum ConnectionState { + /// The node is connected. Connected, /// The node is considered disconnected. 
Disconnected, } -impl PendingNode { +impl NodeStatus { + pub fn is_connected(&self) -> bool { + match self.state { + ConnectionState::Connected => true, + ConnectionState::Disconnected => false, + } + } + + pub fn is_incoming(&self) -> bool { + match self.direction { + ConnectionDirection::Outgoing => false, + ConnectionDirection::Incoming => true, + } + } +} + +impl PendingNode { pub fn status(&self) -> NodeStatus { - self.status + self.node.status } pub fn value_mut(&mut self) -> &mut TVal { @@ -80,11 +102,13 @@ impl PendingNode { /// in the Kademlia DHT together with an associated value (e.g. contact /// information). #[derive(Debug, Clone, PartialEq, Eq)] -pub struct Node { +pub struct Node { /// The key of the node, identifying the peer. pub key: Key, /// The associated value. pub value: TVal, + /// The status of the node. + pub status: NodeStatus, } /// The position of a node in a `KBucket`, i.e. a non-negative integer @@ -94,10 +118,10 @@ pub struct Position(usize); /// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values, /// ordered from least-recently connected to most-recently connected. -#[derive(Debug, Clone)] -pub struct KBucket { +#[derive(Clone)] +pub struct KBucket { /// The nodes contained in the bucket. - nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, + nodes: ArrayVec, MAX_NODES_PER_BUCKET>, /// The position (index) in `nodes` that marks the first connected node. /// @@ -122,6 +146,14 @@ pub struct KBucket { /// if the least-recently connected node is not updated as being connected /// in the meantime. pending_timeout: Duration, + + /// An optional filter that filters new entries given an iterator over current entries in + /// the bucket. + filter: Option>>, + + /// The maximum number of incoming connections allowed per bucket. Setting this to + /// MAX_NODES_PER_BUCKET means there is no restriction on incoming nodes. + max_incoming: usize, } /// The result of inserting an entry into a bucket. @@ -142,14 +174,63 @@ pub enum InsertResult { /// [`NodeStatus::Connected`]. disconnected: Key, }, + /// The attempted entry failed to pass the filter. + FailedFilter, + /// There were too many incoming nodes for this bucket. + TooManyIncoming, /// The entry was not inserted because the relevant bucket is full. Full, + /// The entry already exists. + NodeExists, +} + +/// The result of performing an update on a kbucket/table. +#[must_use] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum UpdateResult { + /// The node was updated successfully, + Updated, + /// The update promoted the node to a connected state from a disconnected state. + UpdatedAndPromoted, + /// The pending entry was updated. + UpdatedPending, + /// The update removed the node because it would violate the incoming peers condition. + Failed(FailureReason), + /// There were no changes made to the value of the node. + NotModified, +} + +impl UpdateResult { + // The update failed. + pub fn failed(&self) -> bool { + match self { + UpdateResult::Failed(_) => true, + _ => false, + } + } +} + +/// A reason for failing to update or insert a node into the bucket. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FailureReason { + /// Too many incoming nodes already in the bucket. + TooManyIncoming, + /// The node didn't pass the bucket filter. + BucketFilter, + /// The node didn't pass the table filter. + TableFilter, + /// The node didn't exist. + KeyNonExistant, + /// The bucket was full. 
+ BucketFull, + /// Cannot update self, + InvalidSelfUpdate, } /// The result of applying a pending node to a bucket, possibly /// replacing an existing node. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct AppliedPending { +pub struct AppliedPending { /// The key of the inserted pending node. pub inserted: Key, /// The node that has been evicted from the bucket to make room for the @@ -160,14 +241,21 @@ pub struct AppliedPending { impl KBucket where TNodeId: Clone, + TVal: Eq, { /// Creates a new `KBucket` with the given timeout for pending entries. - pub fn new(pending_timeout: Duration) -> Self { + pub fn new( + pending_timeout: Duration, + max_incoming: usize, + filter: Option>>, + ) -> Self { KBucket { nodes: ArrayVec::new(), first_connected_pos: None, pending: None, pending_timeout, + filter, + max_incoming, } } @@ -188,11 +276,8 @@ where } /// Returns an iterator over the nodes in the bucket, together with their status. - pub fn iter(&self) -> impl Iterator, NodeStatus)> { - self.nodes - .iter() - .enumerate() - .map(move |(p, n)| (n, self.status(Position(p)))) + pub fn iter(&self) -> impl Iterator> { + self.nodes.iter() } /// Inserts the pending node into the bucket, if its timeout has elapsed, @@ -205,15 +290,35 @@ where if let Some(pending) = self.pending.take() { if pending.replace <= Instant::now() { if self.nodes.is_full() { - if self.status(Position(0)) == NodeStatus::Connected { + // Apply bucket filters + + // Check if the bucket is full + if self.nodes[0].status.is_connected() { // The bucket is full with connected nodes. Drop the pending node. return None; } + // Check the custom filter + if let Some(filter) = self.filter.as_ref() { + if !filter.filter( + &pending.node.value, + &mut self.iter().map(|node| &node.value), + ) { + return None; + } + } + // Check the incoming node restriction + if pending.status().is_connected() && pending.status().is_incoming() { + // Make sure this doesn't violate the incoming conditions + if self.is_max_incoming() { + return None; + } + } + // The pending node will be inserted. let inserted = pending.node.key.clone(); // A connected pending node goes at the end of the list for // the connected peers, removing the least-recently connected. - if pending.status == NodeStatus::Connected { + if pending.status().is_connected() { let evicted = Some(self.nodes.remove(0)); self.first_connected_pos = self .first_connected_pos @@ -239,14 +344,14 @@ where } else { // There is room in the bucket, so just insert the pending node. let inserted = pending.node.key.clone(); - match self.insert(pending.node, pending.status) { + match self.insert(pending.node) { InsertResult::Inserted => { return Some(AppliedPending { inserted, evicted: None, }) } - _ => unreachable!("Bucket is not full."), + _ => unreachable!("Bucket is not full."), // Bucket filter should already be checked } } } else { @@ -260,13 +365,21 @@ where /// Updates the status of the pending node, if any. pub fn update_pending(&mut self, status: NodeStatus) { if let Some(pending) = &mut self.pending { - pending.status = status + pending.node.status = status } } /// Updates the status of the node referred to by the given key, if it is - /// in the bucket. - pub fn update(&mut self, key: &Key, status: NodeStatus) { + /// in the bucket. If the node is not in the bucket, or the update would violate a bucket + /// filter or incoming limits, returns an update result indicating the outcome. + /// An optional connection state can be given. 
If this is omitted the connection state will not + /// be modified. + pub fn update_status( + &mut self, + key: &Key, + state: ConnectionState, + direction: Option, + ) -> UpdateResult { // Remove the node from its current position and then reinsert it // with the desired status, which puts it at the end of either the // prefix list of disconnected nodes or the suffix list of connected @@ -274,11 +387,25 @@ where // respectively). if let Some(pos) = self.position(key) { // Remove the node from its current position. - let old_status = self.status(pos); - let node = self.nodes.remove(pos.0); + let mut node = self.nodes.remove(pos.0); + let old_status = node.status; + node.status.state = state; + if let Some(direction) = direction { + node.status.direction = direction; + } + + // Flag indicating if this update modified the entry. + let not_modified = old_status == node.status; + // Flag indicating we are upgrading to a connected status + let is_connected = if let ConnectionState::Connected = state { + true + } else { + false + }; + // Adjust `first_connected_pos` accordingly. - match old_status { - NodeStatus::Connected => { + match old_status.state { + ConnectionState::Connected => { if self.first_connected_pos.map_or(false, |p| p == pos.0) && pos.0 == self.nodes.len() { @@ -286,21 +413,87 @@ where self.first_connected_pos = None } } - NodeStatus::Disconnected => { + ConnectionState::Disconnected => { self.first_connected_pos = self.first_connected_pos.and_then(|p| p.checked_sub(1)) } } // If the least-recently connected node re-establishes its // connected status, drop the pending node. - if pos == Position(0) && status == NodeStatus::Connected { + if pos == Position(0) && is_connected { self.pending = None } // Reinsert the node with the desired status. - match self.insert(node, status) { - InsertResult::Inserted => {} + match self.insert(node) { + InsertResult::Inserted => { + if not_modified { + UpdateResult::NotModified + } else if !old_status.is_connected() && is_connected { + // This means the status was updated from a disconnected state to connected + // state + UpdateResult::UpdatedAndPromoted + } else { + UpdateResult::Updated + } + } + InsertResult::TooManyIncoming => { + UpdateResult::Failed(FailureReason::TooManyIncoming) + } // Node could not be inserted _ => unreachable!("The node is removed before being (re)inserted."), } + } else { + if let Some(pending) = &mut self.pending { + if &pending.node.key == key { + pending.node.status.state = state; + if let Some(direction) = direction { + pending.node.status.direction = direction; + } + UpdateResult::UpdatedPending + } else { + UpdateResult::Failed(FailureReason::KeyNonExistant) + } + } else { + UpdateResult::Failed(FailureReason::KeyNonExistant) + } + } + } + + /// Updates the value of the node referred to by the given key, if it is + /// in the bucket. If the node is not in the bucket, or the update would violate a bucket + /// filter or incoming limits, returns false and removes the node from the bucket. + /// NOTE: This does not update the position of the node in the table. It node will be removed + /// if it fails the filter however. + pub fn update_value(&mut self, key: &Key, value: TVal) -> UpdateResult { + // Remove the node from its current position, check the filter and add it back in. + if let Some(pos) = self.position(key) { + // Remove the node from its current position. 
+ let mut node = self.nodes.remove(pos.0); + if node.value == value { + self.nodes.insert(pos.0, node); + return UpdateResult::NotModified; + } else { + // check bucket filter + if let Some(filter) = self.filter.as_ref() { + if !filter.filter(&value, &mut self.iter().map(|node| &node.value)) { + self.nodes.remove(pos.0); + return UpdateResult::Failed(FailureReason::BucketFilter); + } + } + node.value = value; + self.nodes.insert(pos.0, node); + return UpdateResult::Updated; + } + } else { + if let Some(pending) = &mut self.pending { + if &pending.node.key == key { + pending.node.value = value; + UpdateResult::UpdatedPending + } else { + UpdateResult::Failed(FailureReason::KeyNonExistant) + } + } else { + UpdateResult::Failed(FailureReason::KeyNonExistant) + } } } @@ -308,7 +501,7 @@ where /// /// The status of the node to insert determines the result as follows: /// - /// * `NodeStatus::Connected`: If the bucket is full and either all nodes are connected + /// * `NodeStatus::ConnectedIncoming` or `NodeStatus::ConnectedOutgoing`: If the bucket is full and either all nodes are connected /// or there is already a pending node, insertion fails with `InsertResult::Full`. /// If the bucket is full but at least one node is disconnected and there is no pending /// node, the new node is inserted as pending, yielding `InsertResult::Pending`. @@ -321,20 +514,37 @@ where /// i.e. as the most-recently disconnected node. If there are no connected nodes, /// the new node is added as the last element of the bucket. /// - pub fn insert( - &mut self, - node: Node, - status: NodeStatus, - ) -> InsertResult { - match status { - NodeStatus::Connected => { + /// The insert can fail if a provided bucket filter does not pass. If a node is attempted + /// to be inserted that doesn't pass the bucket filter, `InsertResult::FailedFilter` will be + /// returned. Similarly, if the inserted node would violate the `max_incoming` value, the + /// result will return `InsertResult::TooManyIncoming`. + pub fn insert(&mut self, node: Node) -> InsertResult { + // Prevent inserting duplicate nodes. + if self.position(&node.key).is_some() { + return InsertResult::NodeExists; + } + + // check bucket filter + if let Some(filter) = self.filter.as_ref() { + if !filter.filter(&node.value, &mut self.iter().map(|node| &node.value)) { + return InsertResult::FailedFilter; + } + } + + match node.status.state { + ConnectionState::Connected => { + if node.status.is_incoming() { + // check the maximum counter + if self.is_max_incoming() { + return InsertResult::TooManyIncoming; + } + } if self.nodes.is_full() { if self.first_connected_pos == Some(0) || self.pending.is_some() { return InsertResult::Full; } else { self.pending = Some(PendingNode { node, - status: NodeStatus::Connected, replace: Instant::now() + self.pending_timeout, }); return InsertResult::Pending { @@ -347,7 +557,7 @@ where self.nodes.push(node); InsertResult::Inserted } - NodeStatus::Disconnected => { + ConnectionState::Disconnected => { if self.nodes.is_full() { return InsertResult::Full; } @@ -372,15 +582,6 @@ where } } - /// Returns the status of the node at the given position. - pub fn status(&self, pos: Position) -> NodeStatus { - if self.first_connected_pos.map_or(false, |i| pos.0 >= i) { - NodeStatus::Connected - } else { - NodeStatus::Disconnected - } - } - /// Gets the number of entries currently in the bucket. 
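To make the `max_incoming` handling above concrete, here is a sketch in the style of the tests at the bottom of this file (it assumes the tests module's `use super::*;` and `use enr::NodeId;` imports): cap a bucket at two incoming peers and watch the third insertion get rejected.

```rust
fn incoming_cap_sketch() {
    // Bucket that tolerates at most 2 connected, incoming peers; no bucket filter.
    let mut bucket = KBucket::<NodeId, ()>::new(Duration::from_secs(1), 2, None);
    for i in 0..3 {
        let node = Node {
            key: Key::from(NodeId::random()),
            value: (),
            status: NodeStatus {
                state: ConnectionState::Connected,
                direction: ConnectionDirection::Incoming,
            },
        };
        match bucket.insert(node) {
            InsertResult::Inserted => assert!(i < 2),
            InsertResult::TooManyIncoming => assert_eq!(i, 2),
            other => panic!("unexpected insert result: {:?}", other),
        }
    }
}
```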
pub fn num_entries(&self) -> usize { self.nodes.len() @@ -401,6 +602,19 @@ where self.nodes.iter().position(|p| &p.key == key).map(Position) } + /// Returns the state of the node at the given position. + pub fn status(&self, pos: Position) -> NodeStatus { + if let Some(node) = self.nodes.get(pos.0) { + node.status.clone() + } else { + // If the node isn't in the bucket, return the worst kind of state. + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Incoming, + } + } + } + /// Gets a mutable reference to the node identified by the given key. /// /// Returns `None` if the given key does not refer to an node in the @@ -409,14 +623,46 @@ where self.nodes.iter_mut().find(move |p| &p.key == key) } - /// Checks if value can be inserted into the kbuckets table. - /// A single bucket can only have 2 nodes per /24 subnet - pub fn check(&self, value: &TVal, f: impl Fn(&TVal, Vec<&TVal>, usize) -> bool) -> bool { - f( - value, - self.iter().map(|(e, _)| &e.value).collect(), - MAX_NODES_PER_SUBNET_BUCKET, - ) + /// Gets a reference to the node identified by the given key. + /// + /// Returns `None` if the given key does not refer to an node in the + /// bucket. + pub fn get(&mut self, key: &Key) -> Option<&Node> { + self.nodes.iter().find(move |p| &p.key == key) + } + + /// Returns whether the bucket has reached its maximum capacity of incoming nodes. This is used + /// to determine if new nodes can be added to the bucket or not. + fn is_max_incoming(&self) -> bool { + self.nodes + .iter() + .filter(|node| node.status.is_connected() && node.status.is_incoming()) + .count() + >= self.max_incoming + } +} + +impl std::fmt::Debug + for KBucket +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut builder = f.debug_struct("KBucket"); + let _ = builder.field("nodes", &self.nodes); + let _ = builder.field("first_connected_pos", &self.first_connected_pos); + let _ = builder.field("pending", &self.pending); + let _ = builder.field("pending_timeout", &self.pending_timeout); + let _ = builder.field("filter", &self.filter.is_some()); + let _ = builder.field("max_incoming", &self.max_incoming); + builder.finish() + } +} + +impl std::fmt::Display for ConnectionDirection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + ConnectionDirection::Incoming => write!(f, "Incoming"), + ConnectionDirection::Outgoing => write!(f, "Outgoing"), + } } } @@ -425,24 +671,41 @@ mod tests { use super::*; use enr::NodeId; use quickcheck::*; - use rand::Rng; + use rand_07::Rng; use std::collections::VecDeque; + fn connected_state() -> NodeStatus { + NodeStatus { + state: ConnectionState::Connected, + direction: ConnectionDirection::Outgoing, + } + } + + fn disconnected_state() -> NodeStatus { + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Outgoing, + } + } + impl Arbitrary for KBucket { fn arbitrary(g: &mut G) -> KBucket { let timeout = Duration::from_secs(g.gen_range(1, g.size() as u64)); - let mut bucket = KBucket::::new(timeout); + let mut bucket = KBucket::::new(timeout, MAX_NODES_PER_BUCKET, None); let num_nodes = g.gen_range(1, MAX_NODES_PER_BUCKET + 1); for _ in 0..num_nodes { let key = Key::from(NodeId::random()); - let node = Node { - key: key.clone(), - value: (), - }; - let status = NodeStatus::arbitrary(g); - match bucket.insert(node, status) { - InsertResult::Inserted => {} - _ => panic!(), + loop { + let node = Node { + key: key.clone(), + value: (), + status: 
NodeStatus::arbitrary(g), + }; + match bucket.insert(node) { + InsertResult::Inserted => break, + InsertResult::TooManyIncoming => {} + _ => panic!(), + } } } bucket @@ -451,10 +714,24 @@ mod tests { impl Arbitrary for NodeStatus { fn arbitrary(g: &mut G) -> NodeStatus { - if g.gen() { - NodeStatus::Connected - } else { - NodeStatus::Disconnected + match g.gen_range(1, 4) { + 1 => NodeStatus { + direction: ConnectionDirection::Incoming, + state: ConnectionState::Connected, + }, + 2 => NodeStatus { + direction: ConnectionDirection::Outgoing, + state: ConnectionState::Connected, + }, + 3 => NodeStatus { + direction: ConnectionDirection::Incoming, + state: ConnectionState::Disconnected, + }, + 4 => NodeStatus { + direction: ConnectionDirection::Outgoing, + state: ConnectionState::Disconnected, + }, + x => unreachable!("Should not generate numbers out of this range {}", x), } } } @@ -470,8 +747,12 @@ mod tests { let num_entries_start = bucket.num_entries(); for i in 0..MAX_NODES_PER_BUCKET - num_entries_start { let key = Key::from(NodeId::random()); - let node = Node { key, value: () }; - assert_eq!(InsertResult::Inserted, bucket.insert(node, status)); + let node = Node { + key, + value: (), + status, + }; + assert_eq!(InsertResult::Inserted, bucket.insert(node)); assert_eq!(bucket.num_entries(), num_entries_start + i + 1); } } @@ -479,7 +760,8 @@ mod tests { #[test] fn ordering() { fn prop(status: Vec) -> bool { - let mut bucket = KBucket::::new(Duration::from_secs(1)); + let mut bucket = + KBucket::::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None); // The expected lists of connected and disconnected nodes. let mut connected = VecDeque::new(); @@ -491,12 +773,14 @@ mod tests { let node = Node { key: key.clone(), value: (), + status, }; let full = bucket.num_entries() == MAX_NODES_PER_BUCKET; - if let InsertResult::Inserted = bucket.insert(node, status) { - let vec = match status { - NodeStatus::Connected => &mut connected, - NodeStatus::Disconnected => &mut disconnected, + if let InsertResult::Inserted = bucket.insert(node) { + let vec = if status.is_connected() { + &mut connected + } else { + &mut disconnected }; if full { vec.pop_front(); @@ -508,11 +792,11 @@ mod tests { // Get all nodes from the bucket, together with their status. let mut nodes = bucket .iter() - .map(|(n, s)| (s, n.key.clone())) + .map(|n| (n.status, n.key.clone())) .collect::>(); // Split the list of nodes at the first connected node. - let first_connected_pos = nodes.iter().position(|(s, _)| *s == NodeStatus::Connected); + let first_connected_pos = nodes.iter().position(|(status, _)| status.is_connected()); assert_eq!(bucket.first_connected_pos, first_connected_pos); let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p)); @@ -527,24 +811,33 @@ mod tests { #[test] fn full_bucket() { - let mut bucket = KBucket::::new(Duration::from_secs(1)); + let mut bucket = + KBucket::::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None); + let disconnected_status = NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Outgoing, + }; // Fill the bucket with disconnected nodes. - fill_bucket(&mut bucket, NodeStatus::Disconnected); + fill_bucket(&mut bucket, disconnected_status.clone()); // Trying to insert another disconnected node fails. 
let key = Key::from(NodeId::random()); - let node = Node { key, value: () }; - match bucket.insert(node, NodeStatus::Disconnected) { + let node = Node { + key, + value: (), + status: disconnected_status.clone(), + }; + match bucket.insert(node) { InsertResult::Full => {} x => panic!("{:?}", x), } // One-by-one fill the bucket with connected nodes, replacing the disconnected ones. for i in 0..MAX_NODES_PER_BUCKET { - let (first, first_status) = bucket.iter().next().unwrap(); + let first = bucket.iter().next().unwrap(); let first_disconnected = first.clone(); - assert_eq!(first_status, NodeStatus::Disconnected); + assert_eq!(first.status, disconnected_status); // Add a connected node, which is expected to be pending, scheduled to // replace the first (i.e. least-recently connected) node. @@ -552,8 +845,9 @@ mod tests { let node = Node { key: key.clone(), value: (), + status: connected_state(), }; - match bucket.insert(node.clone(), NodeStatus::Connected) { + match bucket.insert(node.clone()) { InsertResult::Pending { disconnected } => { assert_eq!(disconnected, first_disconnected.key) } @@ -561,7 +855,7 @@ mod tests { } // Trying to insert another connected node fails. - match bucket.insert(node.clone(), NodeStatus::Connected) { + match bucket.insert(node.clone()) { InsertResult::Full => {} x => panic!("{:?}", x), } @@ -579,7 +873,10 @@ mod tests { evicted: Some(first_disconnected) }) ); - assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); + assert_eq!( + Some(connected_state()), + bucket.iter().map(|v| v.status).last() + ); assert!(bucket.pending().is_none()); assert_eq!( Some(MAX_NODES_PER_BUCKET - (i + 1)), @@ -592,8 +889,12 @@ mod tests { // Trying to insert another connected node fails. let key = Key::from(NodeId::random()); - let node = Node { key, value: () }; - match bucket.insert(node, NodeStatus::Connected) { + let node = Node { + key, + value: (), + status: connected_state(), + }; + match bucket.insert(node) { InsertResult::Full => {} x => panic!("{:?}", x), } @@ -601,9 +902,10 @@ mod tests { #[test] fn full_bucket_discard_pending() { - let mut bucket = KBucket::::new(Duration::from_secs(1)); - fill_bucket(&mut bucket, NodeStatus::Disconnected); - let (first, _) = bucket.iter().next().unwrap(); + let mut bucket = + KBucket::::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None); + fill_bucket(&mut bucket, disconnected_state()); + let first = bucket.iter().next().unwrap(); let first_disconnected = first.clone(); // Add a connected pending node. @@ -611,8 +913,9 @@ mod tests { let node = Node { key: key.clone(), value: (), + status: connected_state(), }; - if let InsertResult::Pending { disconnected } = bucket.insert(node, NodeStatus::Connected) { + if let InsertResult::Pending { disconnected } = bucket.insert(node) { assert_eq!(&disconnected, &first_disconnected.key); } else { panic!() @@ -620,16 +923,16 @@ mod tests { assert!(bucket.pending().is_some()); // Update the status of the first disconnected node to be connected. - bucket.update(&first_disconnected.key, NodeStatus::Connected); + let _ = bucket.update_status(&first_disconnected.key, ConnectionState::Connected, None); // The pending node has been discarded. assert!(bucket.pending().is_none()); - assert!(bucket.iter().all(|(n, _)| n.key != key)); + assert!(bucket.iter().all(|n| n.key != key)); // The initially disconnected node is now the most-recently connected. 
assert_eq!( - Some((&first_disconnected, NodeStatus::Connected)), - bucket.iter().last() + Some((&first_disconnected.key, connected_state())), + bucket.iter().map(|v| (&v.key, v.status)).last() ); assert_eq!( bucket.position(&first_disconnected.key).map(|p| p.0), @@ -640,7 +943,7 @@ mod tests { } #[test] - fn bucket_update() { + fn bucket_update_status() { fn prop(mut bucket: KBucket, pos: Position, status: NodeStatus) -> bool { let num_nodes = bucket.num_entries(); @@ -651,27 +954,151 @@ mod tests { // Record the (ordered) list of status of all nodes in the bucket. let mut expected = bucket .iter() - .map(|(n, s)| (n.key.clone(), s)) + .map(|n| (n.key.clone(), n.status)) .collect::>(); // Update the node in the bucket. - bucket.update(&key, status); + let _ = bucket.update_status(&key, status.state, Some(status.direction)); // Check that the bucket now contains the node with the new status, // preserving the status and relative order of all other nodes. - let expected_pos = match status { - NodeStatus::Connected => num_nodes - 1, - NodeStatus::Disconnected => bucket.first_connected_pos.unwrap_or(num_nodes) - 1, + let expected_pos = if status.is_connected() { + num_nodes - 1 + } else { + bucket.first_connected_pos.unwrap_or(num_nodes) - 1 }; expected.remove(pos); expected.insert(expected_pos, (key, status)); let actual = bucket .iter() - .map(|(n, s)| (n.key.clone(), s)) + .map(|n| (n.key.clone(), n.status)) .collect::>(); expected == actual } quickcheck(prop as fn(_, _, _) -> _); } + + #[test] + fn table_update_status_connection() { + let max_incoming = 7; + let mut bucket = KBucket::::new(Duration::from_secs(1), max_incoming, None); + + let mut incoming_connected = 0; + let mut keys = Vec::new(); + for _ in 0..MAX_NODES_PER_BUCKET { + let key = Key::from(NodeId::random()); + keys.push(key.clone()); + incoming_connected += 1; + let direction = if incoming_connected <= max_incoming { + ConnectionDirection::Incoming + } else { + ConnectionDirection::Outgoing + }; + let status = NodeStatus { + state: ConnectionState::Connected, + direction, + }; + let node = Node { + key: key.clone(), + value: (), + status, + }; + assert_eq!(InsertResult::Inserted, bucket.insert(node)); + } + + // Bucket is full + // Attempt to modify a new state + let result = bucket.update_status( + &keys[max_incoming], + ConnectionState::Disconnected, + Some(ConnectionDirection::Incoming), + ); + assert_eq!(result, UpdateResult::Updated); + let result = bucket.update_status( + &keys[max_incoming], + ConnectionState::Connected, + Some(ConnectionDirection::Outgoing), + ); + assert_eq!(result, UpdateResult::UpdatedAndPromoted); + let result = bucket.update_status( + &keys[max_incoming], + ConnectionState::Connected, + Some(ConnectionDirection::Outgoing), + ); + assert_eq!(result, UpdateResult::NotModified); + let result = bucket.update_status( + &keys[max_incoming], + ConnectionState::Connected, + Some(ConnectionDirection::Incoming), + ); + assert_eq!(result, UpdateResult::Failed(FailureReason::TooManyIncoming)); + } + + #[test] + fn bucket_max_incoming_nodes() { + fn prop(status: Vec) -> bool { + let max_incoming_nodes = 5; + let mut bucket = + KBucket::::new(Duration::from_secs(1), max_incoming_nodes, None); + + // The expected lists of connected and disconnected nodes. + let mut connected = VecDeque::new(); + let mut disconnected = VecDeque::new(); + + // Fill the bucket, thereby populating the expected lists in insertion order. 
+ for status in status { + let key = Key::from(NodeId::random()); + let node = Node { + key: key.clone(), + value: (), + status, + }; + let full = bucket.num_entries() == MAX_NODES_PER_BUCKET; + match bucket.insert(node) { + InsertResult::Inserted => { + let vec = if status.is_connected() { + &mut connected + } else { + &mut disconnected + }; + if full { + vec.pop_front(); + } + vec.push_back((status, key.clone())); + } + InsertResult::FailedFilter => break, + _ => {} + } + } + + // Get all nodes from the bucket, together with their status. + let mut nodes = bucket + .iter() + .map(|n| (n.status, n.key.clone())) + .collect::>(); + + // Split the list of nodes at the first connected node. + let first_connected_pos = nodes.iter().position(|(status, _)| status.is_connected()); + assert_eq!(bucket.first_connected_pos, first_connected_pos); + let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p)); + + let number_of_incoming_nodes = bucket + .iter() + .filter(|n| n.status.is_connected() && n.status.is_incoming()) + .count(); + + assert!(number_of_incoming_nodes <= max_incoming_nodes); + + // All nodes before the first connected node must be disconnected and + // in insertion order. Similarly, all remaining nodes must be connected + // and in insertion order. + // The number of incoming nodes does not exceed the maximum limit. + nodes == Vec::from(disconnected) + && tail == Vec::from(connected) + && number_of_incoming_nodes <= 5 + } + + quickcheck(prop as fn(_) -> _); + } } diff --git a/src/kbucket/entry.rs b/src/kbucket/entry.rs index 6ce439c85..bba03ac6d 100644 --- a/src/kbucket/entry.rs +++ b/src/kbucket/entry.rs @@ -21,18 +21,21 @@ // This basis of this file has been taken from the rust-libp2p codebase: // https://github.com/libp2p/rust-libp2p -//! The `Entry` API for quering and modifying the entries of a `KBucketsTable` +//! The `Entry` API for querying and modifying the entries of a `KBucketsTable` //! representing the nodes participating in the Kademlia DHT. pub use super::{ - bucket::{AppliedPending, InsertResult, Node, NodeStatus, MAX_NODES_PER_BUCKET}, + bucket::{ + AppliedPending, ConnectionState, InsertResult, Node, NodeStatus, MAX_NODES_PER_BUCKET, + }, key::*, + ConnectionDirection, }; use super::*; /// An immutable by-reference view of a bucket entry. -pub struct EntryRefView<'a, TPeerId, TVal> { +pub struct EntryRefView<'a, TPeerId, TVal: Eq> { /// The node represented by the entry. pub node: NodeRefView<'a, TPeerId, TVal>, /// The status of the node identified by the key. @@ -40,7 +43,7 @@ pub struct EntryRefView<'a, TPeerId, TVal> { } /// An immutable by-reference view of a `Node`. -pub struct NodeRefView<'a, TPeerId, TVal> { +pub struct NodeRefView<'a, TPeerId, TVal: Eq> { pub key: &'a Key, pub value: &'a TVal, } @@ -48,14 +51,14 @@ pub struct NodeRefView<'a, TPeerId, TVal> { /// A cloned, immutable view of an entry that is either present in a bucket /// or pending insertion. #[derive(Clone, Debug)] -pub struct EntryView { +pub struct EntryView { /// The node represented by the entry. pub node: Node, /// The status of the node. pub status: NodeStatus, } -impl AsRef> for EntryView { +impl AsRef> for EntryView { fn as_ref(&self) -> &Key { &self.node.key } @@ -63,7 +66,7 @@ impl AsRef> for EntryView { /// A reference into a single entry of a `KBucketsTable`. #[derive(Debug)] -pub enum Entry<'a, TPeerId, TVal> { +pub enum Entry<'a, TPeerId, TVal: Eq> { /// The entry is present in a bucket. 
Present(PresentEntry<'a, TPeerId, TVal>, NodeStatus), /// The entry is pending insertion in a bucket. @@ -77,7 +80,7 @@ pub enum Entry<'a, TPeerId, TVal> { /// The internal representation of the different states of an `Entry`, /// referencing the associated key and bucket. #[derive(Debug)] -struct EntryRef<'a, TPeerId, TVal> { +struct EntryRef<'a, TPeerId, TVal: Eq> { bucket: &'a mut KBucket, key: &'a Key, } @@ -85,6 +88,7 @@ struct EntryRef<'a, TPeerId, TVal> { impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal> where TPeerId: Clone, + TVal: Eq, { /// Creates a new `Entry` for a `Key`, encapsulating access to a bucket. pub(super) fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { @@ -102,11 +106,12 @@ where /// An entry present in a bucket. #[derive(Debug)] -pub struct PresentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); +pub struct PresentEntry<'a, TPeerId, TVal: Eq>(EntryRef<'a, TPeerId, TVal>); impl<'a, TPeerId, TVal> PresentEntry<'a, TPeerId, TVal> where TPeerId: Clone, + TVal: Eq, { fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { PresentEntry(EntryRef { bucket, key }) @@ -122,20 +127,26 @@ where .value } - /// Sets the status of the entry to `NodeStatus::Disconnected`. - pub fn update(self, status: NodeStatus) -> Self { - self.0.bucket.update(self.0.key, status); + /// Sets the status of the entry. + pub fn update(self, state: ConnectionState, direction: Option) -> Self { + let _ = self.0.bucket.update_status(self.0.key, state, direction); Self::new(self.0.bucket, self.0.key) } + + /// Removes the entry from the table. + pub fn remove(self) { + self.0.bucket.remove(self.0.key); + } } /// An entry waiting for a slot to be available in a bucket. #[derive(Debug)] -pub struct PendingEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); +pub struct PendingEntry<'a, TPeerId, TVal: Eq>(EntryRef<'a, TPeerId, TVal>); -impl<'a, TPeerId, TVal> PendingEntry<'a, TPeerId, TVal> +impl<'a, TPeerId, TVal: Eq> PendingEntry<'a, TPeerId, TVal> where TPeerId: Clone, + TVal: Eq, { fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { PendingEntry(EntryRef { bucket, key }) @@ -155,15 +166,21 @@ where self.0.bucket.update_pending(status); PendingEntry::new(self.0.bucket, self.0.key) } + + /// Removes the entry from the table. + pub fn remove(self) { + self.0.bucket.remove(self.0.key); + } } /// An entry that is not present in any bucket. #[derive(Debug)] -pub struct AbsentEntry<'a, TPeerId, TVal>(EntryRef<'a, TPeerId, TVal>); +pub struct AbsentEntry<'a, TPeerId, TVal: Eq>(EntryRef<'a, TPeerId, TVal>); impl<'a, TPeerId, TVal> AbsentEntry<'a, TPeerId, TVal> where TPeerId: Clone, + TVal: Eq, { fn new(bucket: &'a mut KBucket, key: &'a Key) -> Self { AbsentEntry(EntryRef { bucket, key }) @@ -171,12 +188,10 @@ where /// Attempts to insert the entry into a bucket. pub fn insert(self, value: TVal, status: NodeStatus) -> InsertResult { - self.0.bucket.insert( - Node { - key: self.0.key.clone(), - value, - }, + self.0.bucket.insert(Node { + key: self.0.key.clone(), + value, status, - ) + }) } } diff --git a/src/kbucket/filter.rs b/src/kbucket/filter.rs new file mode 100644 index 000000000..36fe7fa9b --- /dev/null +++ b/src/kbucket/filter.rs @@ -0,0 +1,96 @@ +//! Provides a trait that can be implemented to apply a filter to a table or bucket. + +use crate::Enr; + +pub trait Filter: FilterClone + Send + Sync { + fn filter( + &self, + value_to_be_inserted: &TVal, + other_vals: &mut dyn Iterator, + ) -> bool; +} + +/// Allow the trait objects to be cloneable. 
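As a usage sketch for the new filter extension point: assuming the `Filter` trait above is generic over the stored value type, a user-defined filter might look like the snippet below. It is written against a simplified, self-contained `SimpleFilter<TVal>` stand-in so it compiles on its own (the real trait additionally requires `FilterClone + Send + Sync` so boxed filters can be cloned); the `MaxDuplicatesFilter` name is illustrative only.

// Sketch only: a simplified stand-in for the `Filter` trait above.
trait SimpleFilter<TVal> {
    fn filter(
        &self,
        value_to_be_inserted: &TVal,
        other_vals: &mut dyn Iterator<Item = &TVal>,
    ) -> bool;
}

/// An example filter: only admit a value while fewer than `limit` equal values
/// are already present in the bucket or table.
#[derive(Clone)]
struct MaxDuplicatesFilter {
    limit: usize,
}

impl<TVal: Eq> SimpleFilter<TVal> for MaxDuplicatesFilter {
    fn filter(
        &self,
        value_to_be_inserted: &TVal,
        other_vals: &mut dyn Iterator<Item = &TVal>,
    ) -> bool {
        let duplicates = other_vals
            .filter(|existing| *existing == value_to_be_inserted)
            .count();
        duplicates < self.limit
    }
}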
+pub trait FilterClone { + fn clone_box(&self) -> Box>; +} + +impl FilterClone for T +where + T: 'static + Filter + Clone, +{ + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } +} + +impl Clone for Box> { + fn clone(&self) -> Box> { + self.clone_box() + } +} + +// Implementation of an IP filter for buckets and for tables + +/// Number of permitted nodes in the same /24 subnet per table. +const MAX_NODES_PER_SUBNET_TABLE: usize = 10; +/// The number of nodes permitted in the same /24 subnet per bucket. +const MAX_NODES_PER_SUBNET_BUCKET: usize = 2; + +#[derive(Clone)] +pub struct IpTableFilter; + +impl Filter for IpTableFilter { + fn filter( + &self, + value_to_be_inserted: &Enr, + other_vals: &mut dyn Iterator, + ) -> bool { + ip_filter(value_to_be_inserted, other_vals, MAX_NODES_PER_SUBNET_TABLE) + } +} + +#[derive(Clone)] +pub struct IpBucketFilter; + +impl Filter for IpBucketFilter { + fn filter( + &self, + value_to_be_inserted: &Enr, + other_vals: &mut dyn Iterator, + ) -> bool { + ip_filter( + value_to_be_inserted, + other_vals, + MAX_NODES_PER_SUBNET_BUCKET, + ) + } +} + +fn ip_filter( + value_to_be_inserted: &Enr, + other_vals: &mut dyn Iterator, + limit: usize, +) -> bool { + if let Some(ip) = value_to_be_inserted.ip() { + let mut count = 0; + for enr in other_vals { + // Ignore duplicates + if enr == value_to_be_inserted { + continue; + } + + // Count the same /24 subnet + if let Some(other_ip) = enr.ip() { + if other_ip.octets()[0..3] == ip.octets()[0..3] { + count += 1; + } + } + if count >= limit { + return false; + } + } + } + // No IP, so no restrictions + true +} diff --git a/src/lib.rs b/src/lib.rs index 837816a3f..11f7bac90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,7 +113,7 @@ mod node_info; pub mod packet; pub mod permit_ban; mod query_pool; -mod rpc; +pub mod rpc; pub mod service; mod socket; @@ -126,6 +126,7 @@ pub use crate::discv5::{Discv5, Discv5Event}; pub use config::{Discv5Config, Discv5ConfigBuilder}; pub use error::{Discv5Error, QueryError, RequestError}; pub use executor::{Executor, TokioExecutor}; +pub use kbucket::{ConnectionDirection, ConnectionState, Key}; pub use permit_ban::PermitBanList; pub use socket::{FilterConfig, FilterConfigBuilder}; // re-export the ENR crate diff --git a/src/packet/mod.rs b/src/packet/mod.rs index d8d773f1a..69d3a4f9b 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -468,7 +468,7 @@ impl Packet { .expect("Can only be 2 bytes in size"), ); - let remaining_data = data[STATIC_HEADER_LENGTH..].to_vec(); + let remaining_data = data[IV_LENGTH + STATIC_HEADER_LENGTH..].to_vec(); if auth_data_size as usize > remaining_data.len() { return Err(PacketError::InvalidAuthDataSize); } diff --git a/src/query_pool/peers/closest.rs b/src/query_pool/peers/closest.rs index f69b42b61..25341b4df 100644 --- a/src/query_pool/peers/closest.rs +++ b/src/query_pool/peers/closest.rs @@ -448,7 +448,7 @@ mod tests { use super::*; use enr::NodeId; use quickcheck::*; - use rand::{thread_rng, Rng}; + use rand_07::{thread_rng, Rng}; use std::time::Duration; type TestQuery = FindNodeQuery; diff --git a/src/service.rs b/src/service.rs index e3db47663..e2e6619bf 100644 --- a/src/service.rs +++ b/src/service.rs @@ -20,7 +20,10 @@ use self::{ use crate::{ error::RequestError, handler::{Handler, HandlerRequest, HandlerResponse}, - kbucket::{self, ip_limiter, KBucketsTable, NodeStatus}, + kbucket::{ + self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, + NodeStatus, UpdateResult, + }, node_info::{NodeAddress, 
NodeContact}, packet::MAX_PACKET_SIZE, query_pool::{ @@ -34,45 +37,42 @@ use futures::prelude::*; use parking_lot::RwLock; use rpc::*; use std::{collections::HashMap, net::SocketAddr, sync::Arc, task::Poll}; -use tokio::{ - sync::{mpsc, oneshot}, - time::Interval, -}; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, trace, warn}; +mod hashset_delay; mod ip_vote; mod query_info; mod test; +use hashset_delay::HashSetDelay; + /// The number of distances (buckets) we simultaneously request from each peer. pub(crate) const DISTANCES_TO_REQUEST_PER_PEER: usize = 3; /// The types of requests to send to the Discv5 service. pub enum ServiceRequest { + /// A request to start a query. There are two types of queries: + /// - A FindNode Query - Searches for peers using a random target. + /// - A Predicate Query - Searches for peers closest to a random target that match a specified + /// predicate. StartQuery(QueryKind, oneshot::Sender>), + /// Find the ENR of a node given its multiaddr. FindEnr(NodeContact, oneshot::Sender>), + /// The TALK discv5 RPC function. Talk( NodeContact, Vec, Vec, oneshot::Sender, RequestError>>, ), + /// Sets up an event stream where the discv5 server will return various events such as + /// discovered nodes as it traverses the DHT. RequestEventStream(oneshot::Sender>), } use crate::discv5::PERMIT_BAN_LIST; -pub enum QueryKind { - FindNode { - target_node: NodeId, - }, - Predicate { - target_node: NodeId, - target_peer_no: usize, - predicate: Box bool + Send>, - }, -} - pub struct Service { /// Configuration parameters. config: Discv5Config, @@ -114,8 +114,8 @@ pub struct Service { /// The exit channel for the service. exit: oneshot::Receiver<()>, - /// An interval to check and ping all nodes in the routing table. - ping_heartbeat: Interval, + /// A queue of peers that require regular ping to check connectivity. + peers_to_ping: HashSetDelay, /// A channel that the service emits events on. event_stream: Option>, @@ -174,7 +174,10 @@ impl Service { ) -> Result<(oneshot::Sender<()>, mpsc::Sender), std::io::Error> { // process behaviour-level configuration parameters let ip_votes = if config.enr_update { - Some(IpVote::new(config.enr_peer_update_min)) + Some(IpVote::new( + config.enr_peer_update_min, + config.vote_duration, + )) } else { None }; @@ -208,7 +211,7 @@ impl Service { handler_send, handler_recv, handler_exit: Some(handler_exit), - ping_heartbeat: tokio::time::interval(config.ping_interval), + peers_to_ping: HashSetDelay::new(config.ping_interval), discv5_recv, event_stream: None, exit, @@ -265,8 +268,8 @@ impl Service { } Some(event) = self.handler_recv.recv() => { match event { - HandlerResponse::Established(enr) => { - self.inject_session_established(enr); + HandlerResponse::Established(enr, direction) => { + self.inject_session_established(enr,direction); } HandlerResponse::Request(node_address, request) => { self.handle_rpc_request(node_address, *request); @@ -327,8 +330,20 @@ impl Service { } } } - _ = self.ping_heartbeat.tick() => { - self.ping_connected_peers(); + Some(Ok(node_id)) = self.peers_to_ping.next() => { + // If the node is in the routing table, Ping it and re-queue the node. 
+ let key = kbucket::Key::from(node_id); + let enr = { + if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.write().entry(&key) { + // The peer is in the routing table, ping it and re-queue the ping + self.peers_to_ping.insert(node_id); + Some(entry.value().clone()) + } else { None } + }; + + if let Some(enr) = enr { + self.send_ping(enr); + } } } } @@ -370,10 +385,13 @@ impl Service { let target_key: kbucket::Key = target.key(); + // Map the TableEntry to an ENR. + let kbucket_predicate = |e: &Enr| predicate(&e); + let known_closest_peers: Vec> = { let mut kbuckets = self.kbuckets.write(); kbuckets - .closest_keys_predicate(&target_key, &predicate) + .closest_keys_predicate(&target_key, &kbucket_predicate) .collect() }; @@ -638,7 +656,19 @@ impl Service { // // only attempt the majority-update if the peer supplies an ipv4 address to // mitigate https://github.com/sigp/lighthouse/issues/2215 - if socket.is_ipv4() { + // + // Only count votes that from peers we have contacted. + let key: kbucket::Key = node_id.into(); + let should_count = match self.kbuckets.write().entry(&key) { + kbucket::Entry::Present(_, status) + if status.is_connected() && !status.is_incoming() => + { + true + } + _ => false, + }; + + if should_count && socket.is_ipv4() { let local_socket = self.local_enr.read().udp_socket(); if let Some(ref mut ip_votes) = self.ip_votes { ip_votes.insert(node_id, socket); @@ -653,7 +683,6 @@ impl Service { .set_udp_socket(majority_socket, &self.enr_key.read()) .is_ok() { - // alert known peers to our updated enr self.ping_connected_peers(); } } @@ -675,7 +704,7 @@ impl Service { }; self.send_rpc_request(active_request); } - self.connection_updated(node_id, Some(enr), NodeStatus::Connected); + self.connection_updated(node_id, ConnectionStatus::PongReceived(enr)); } } ResponseBody::Talk { response } => { @@ -718,6 +747,7 @@ impl Service { self.send_rpc_request(active_request); } + /// Ping all peers that are connected in the routing table. fn ping_connected_peers(&mut self) { // maintain the ping interval let connected_peers = { @@ -725,7 +755,7 @@ impl Service { kbuckets .iter() .filter_map(|entry| { - if entry.status == NodeStatus::Connected { + if entry.status.is_connected() { Some(entry.node.value.clone()) } else { None @@ -951,28 +981,31 @@ impl Service { } // ignore peers that don't pass the table filter - if (self.config.table_filter)(enr_ref) { + if (self.config.table_filter)(&enr_ref) { let key = kbucket::Key::from(enr_ref.node_id()); - if !self.config.ip_limit - || self + + // If the ENR exists in the routing table and the discovered ENR has a greater + // sequence number, perform some filter checks before updating the enr. 
+ + let must_update_enr = match self.kbuckets.write().entry(&key) { + kbucket::Entry::Present(mut entry, _) => entry.value().seq() < enr_ref.seq(), + kbucket::Entry::Pending(mut entry, _) => entry.value().seq() < enr_ref.seq(), + _ => false, + }; + + if must_update_enr { + match self .kbuckets - .read() - .check(&key, enr_ref, |v, o, l| ip_limiter(v, &o, l)) - { - match self.kbuckets.write().entry(&key) { - kbucket::Entry::Present(mut entry, _) => { - if entry.value().seq() < enr_ref.seq() { - trace!("ENR updated: {}", enr_ref); - *entry.value() = enr_ref.clone(); - } - } - kbucket::Entry::Pending(mut entry, _) => { - if entry.value().seq() < enr_ref.seq() { - trace!("ENR updated: {}", enr_ref); - *entry.value() = enr_ref.clone(); - } + .write() + .update_node(&key, enr_ref.clone(), None) + { + UpdateResult::Failed(reason) => { + self.peers_to_ping.remove(&enr_ref.node_id()); + debug!( + "Failed to update discovered ENR. Node: {}, Reason: {:?}", + source, reason + ); } - kbucket::Entry::Absent(_entry) => {} _ => {} } } @@ -1003,98 +1036,134 @@ impl Service { } /// Update the connection status of a node in the routing table. - fn connection_updated( - &mut self, - node_id: NodeId, - enr: Option, - mut new_status: NodeStatus, - ) { - let key = kbucket::Key::from(node_id); - if let Some(enr) = enr.as_ref() { - // ignore peers that don't pass the table filter - if !(self.config.table_filter)(enr) { - return; - } - - // should the ENR be inserted or updated to a value that would exceed the IP limit ban - if self.config.ip_limit - && !self - .kbuckets - .read() - .check(&key, enr, |v, o, l| ip_limiter(v, &o, l)) - { - // if the node status is connected and it would exceed the ip ban, consider it - // disconnected to be pruned. - new_status = NodeStatus::Disconnected; - } - } - - let mut event_to_send = None; + /// This tracks whether or not we should be pinging peers. Disconnected peers are removed from + /// the queue and newly added peers to the routing table are added to the queue. + fn connection_updated(&mut self, node_id: NodeId, new_status: ConnectionStatus) { + // Variables to that may require post-processing let mut ping_peer = None; - match self.kbuckets.write().entry(&key) { - kbucket::Entry::Present(mut entry, old_status) => { - if let Some(enr) = enr { - *entry.value() = enr; - } - if old_status != new_status { - entry.update(new_status); + let mut event_to_send = None; + + let key = kbucket::Key::from(node_id); + match new_status { + ConnectionStatus::Connected(enr, direction) => { + // attempt to update or insert the new ENR. 
+ let status = NodeStatus { + state: ConnectionState::Connected, + direction, + }; + match self.kbuckets.write().insert_or_update(&key, enr, status) { + InsertResult::Inserted => { + // We added this peer to the table + debug!("New connected node added to routing table: {}", node_id); + self.peers_to_ping.insert(node_id); + let event = Discv5Event::NodeInserted { + node_id, + replaced: None, + }; + event_to_send = Some(event); + } + InsertResult::Pending { disconnected } => { + ping_peer = Some(disconnected); + } + InsertResult::StatusUpdated { + promoted_to_connected, + } + | InsertResult::Updated { + promoted_to_connected, + } => { + // The node was updated + if promoted_to_connected { + debug!("Node promoted to connected: {}", node_id); + self.peers_to_ping.insert(node_id); + } + } + InsertResult::ValueUpdated | InsertResult::UpdatedPending => {} + InsertResult::Failed(reason) => { + self.peers_to_ping.remove(&node_id); + trace!("Could not insert node: {}, reason: {:?}", node_id, reason); + } } } - kbucket::Entry::Pending(mut entry, old_status) => { - if let Some(enr) = enr { - *entry.value() = enr; - } - if old_status != new_status { - entry.update(new_status); + ConnectionStatus::PongReceived(enr) => { + match self + .kbuckets + .write() + .update_node(&key, enr, Some(ConnectionState::Connected)) + { + UpdateResult::Failed(reason) => { + self.peers_to_ping.remove(&node_id); + debug!( + "Could not update ENR from pong. Node: {}, reason: {:?}", + node_id, reason + ); + } + update => { + debug!("Updated {:?}", update) + } // Updated ENR successfully. } } - kbucket::Entry::Absent(entry) => { - if new_status == NodeStatus::Connected { - // Note: If an ENR is not provided, no record is added - debug_assert!(enr.is_some()); - if let Some(enr) = enr { - match entry.insert(enr, new_status) { - kbucket::InsertResult::Inserted => { - let event = Discv5Event::NodeInserted { - node_id, - replaced: None, - }; - event_to_send = Some(event); - } - kbucket::InsertResult::Full => (), - kbucket::InsertResult::Pending { disconnected } => { - ping_peer = Some(*disconnected.preimage()); - } + ConnectionStatus::Disconnected => { + // If the node has disconnected, remove any ping timer for the node. + match self.kbuckets.write().update_node_status( + &key, + ConnectionState::Disconnected, + None, + ) { + UpdateResult::Failed(reason) => match reason { + FailureReason::KeyNonExistant => {} + others => { + warn!( + "Could not update node to disconnected. Node: {}, Reason: {:?}", + node_id, others + ); } + }, + _ => { + debug!("Node set to disconnected: {}", node_id) } } + self.peers_to_ping.remove(&node_id); } - _ => {} - } + }; + + // Post processing if let Some(event) = event_to_send { self.send_event(event); } + if let Some(node_id) = ping_peer { - if let Some(enr) = self.find_enr(&node_id) { - self.send_ping(enr); + let optional_enr = { + let node_key = kbucket::Key::from(node_id); + if let kbucket::Entry::Present(mut entry, _status) = + self.kbuckets.write().entry(&node_key) + { + // NOTE: We don't check the status of this peer. We try and ping outdated peers. + Some(entry.value().clone()) + } else { + None + } + }; + if let Some(enr) = optional_enr { + self.send_ping(enr) } } } /// The equivalent of libp2p `inject_connected()` for a udp session. We have no stream, but a /// session key-pair has been negotiated. 
- fn inject_session_established(&mut self, enr: Enr) { + fn inject_session_established(&mut self, enr: Enr, direction: ConnectionDirection) { // Ignore sessions with non-contactable ENRs if enr.udp_socket().is_none() { return; } let node_id = enr.node_id(); - debug!("Session established with Node: {}", node_id); - self.connection_updated(node_id, Some(enr.clone()), NodeStatus::Connected); - // send an initial ping and start the ping interval - self.send_ping(enr); + debug!( + "Session established with Node: {}, direction: {}", + node_id, direction + ); + self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction)); } /// A session could not be established or an RPC request timed-out (after a few retries, if @@ -1176,7 +1245,7 @@ impl Service { } } - self.connection_updated(node_id, None, NodeStatus::Disconnected); + self.connection_updated(node_id, ConnectionStatus::Disconnected); } } @@ -1239,3 +1308,26 @@ enum QueryEvent { /// The query has completed successfully. Finished(Box>), } + +/// The types of queries that can be made. +pub enum QueryKind { + /// A FindNode query. Searches for peers that are closest to a particular target. + FindNode { target_node: NodeId }, + /// A predicate query. Searches for peers that are close to a target but filtered by a specific + /// predicate and limited by a target peer count. + Predicate { + target_node: NodeId, + target_peer_no: usize, + predicate: Box bool + Send>, + }, +} + +/// Reporting the connection status of a node. +enum ConnectionStatus { + /// A node has started a new connection with us. + Connected(Enr, ConnectionDirection), + /// We received a Pong from a new node. Do not have the connection direction. + PongReceived(Enr), + /// The node has disconnected + Disconnected, +} diff --git a/src/service/hashset_delay.rs b/src/service/hashset_delay.rs new file mode 100644 index 000000000..fde9ab5d6 --- /dev/null +++ b/src/service/hashset_delay.rs @@ -0,0 +1,131 @@ +//! A simple hashset object coupled with a `delay_queue` which has entries that expire after a +//! fixed time. +//! +//! A `HashSetDelay` implements `Stream` which removes expired items from the map. + +/// The default delay for entries, in seconds. This is only used when `insert()` is used to add +/// entries. +const DEFAULT_DELAY: u64 = 30; + +use futures::prelude::*; +use std::{ + collections::HashMap, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use tokio_util::time::delay_queue::{self, DelayQueue}; + +pub struct HashSetDelay +where + K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, +{ + /// The given entries. + entries: HashMap, + /// A queue holding the timeouts of each entry. + expirations: DelayQueue, + /// The default expiration timeout of an entry. + default_entry_timeout: Duration, +} + +impl Default for HashSetDelay +where + K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, +{ + fn default() -> Self { + HashSetDelay::new(Duration::from_secs(DEFAULT_DELAY)) + } +} + +impl HashSetDelay +where + K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin, +{ + /// Creates a new instance of `HashSetDelay`. + pub fn new(default_entry_timeout: Duration) -> Self { + HashSetDelay { + entries: HashMap::new(), + expirations: DelayQueue::new(), + default_entry_timeout, + } + } + + /// Insert an entry into the mapping. Entries will expire after the `default_entry_timeout`. 
+    pub fn insert(&mut self, key: K) {
+        self.insert_at(key, self.default_entry_timeout);
+    }
+
+    /// Inserts an entry that will expire at a given instant.
+    pub fn insert_at(&mut self, key: K, entry_duration: Duration) {
+        if self.contains_key(&key) {
+            // update the timeout
+            self.update_timeout(&key, entry_duration);
+        } else {
+            let delay_key = self.expirations.insert(key.clone(), entry_duration);
+            self.entries.insert(key, delay_key);
+        }
+    }
+
+    /// Updates the timeout for a given key. Returns true if the key existed, false otherwise.
+    ///
+    /// Panics if the duration is too far in the future.
+    pub fn update_timeout(&mut self, key: &K, timeout: Duration) -> bool {
+        if let Some(delay_key) = self.entries.get_mut(&key) {
+            self.expirations.reset(&delay_key, timeout);
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Returns true if the key exists, false otherwise.
+    pub fn contains_key(&self, key: &K) -> bool {
+        self.entries.contains_key(key)
+    }
+
+    /// Returns the length of the mapping.
+    #[allow(dead_code)]
+    pub fn len(&self) -> usize {
+        self.entries.len()
+    }
+
+    /// Removes a key from the map, dropping its pending expiration as well.
+    ///
+    /// Returns true if the key was present, false otherwise.
+    pub fn remove(&mut self, key: &K) -> bool {
+        if let Some(delay_key) = self.entries.remove(key) {
+            self.expirations.remove(&delay_key);
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Removes all entries from the map.
+    #[allow(dead_code)]
+    pub fn clear(&mut self) {
+        self.entries.clear();
+        self.expirations.clear();
+    }
+}
+
+impl<K> Stream for HashSetDelay<K>
+where
+    K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
+{
+    type Item = Result<K, String>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.expirations.poll_expired(cx) {
+            Poll::Ready(Some(Ok(key))) => match self.entries.remove(key.get_ref()) {
+                Some(_delay_key) => Poll::Ready(Some(Ok(key.into_inner()))),
+                None => Poll::Ready(Some(Err("Value no longer exists in expirations".into()))),
+            },
+            Poll::Ready(Some(Err(e))) => {
+                Poll::Ready(Some(Err(format!("delay queue error: {:?}", e))))
+            }
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
diff --git a/src/service/ip_vote.rs b/src/service/ip_vote.rs
index ff0107d3c..3d34ffda3 100644
--- a/src/service/ip_vote.rs
+++ b/src/service/ip_vote.rs
@@ -6,19 +6,18 @@ use std::{
     time::{Duration, Instant},
 };
 
-/// The timeout before a report/vote expires. Currently set to a 5 minute window.
-const PING_VOTE_TIMEOUT: u64 = 300;
-
 /// A collection of IP:Ports for our node reported from external peers.
 pub(crate) struct IpVote {
     /// The current collection of IP:Port votes.
     votes: HashMap<NodeId, (SocketAddr, Instant)>,
     /// The minimum number of votes required before an IP/PORT is accepted.
     minimum_threshold: usize,
+    /// The time votes remain valid.
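For orientation, a minimal sketch of how a `HashSetDelay` is intended to be driven. It mirrors how the service polls `peers_to_ping` and re-queues each expired peer, but the type is private to the service module, so this is illustrative rather than a public API example; the `u64` key and the `ping_loop` name are placeholders.

// Sketch only: drive the delay set as a stream of expired keys.
use futures::StreamExt;

async fn ping_loop(mut peers_to_ping: HashSetDelay<u64>) {
    // Queue a peer; it re-surfaces once its (default) timeout expires.
    peers_to_ping.insert(42);

    // Each expired key is pinged and re-queued; the loop stops on an error or
    // when the set is empty and the stream ends.
    while let Some(Ok(peer_id)) = peers_to_ping.next().await {
        println!("pinging peer {}", peer_id);
        peers_to_ping.insert(peer_id);
    }
}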
+ vote_duration: Duration, } impl IpVote { - pub fn new(minimum_threshold: usize) -> Self { + pub fn new(minimum_threshold: usize, vote_duration: Duration) -> Self { // do not allow minimum thresholds less than 2 if minimum_threshold < 2 { panic!("Setting enr_peer_update_min to a value less than 2 will cause issues with discovery with peers behind NAT"); @@ -26,17 +25,13 @@ impl IpVote { IpVote { votes: HashMap::new(), minimum_threshold, + vote_duration, } } pub fn insert(&mut self, key: NodeId, socket: SocketAddr) { - self.votes.insert( - key, - ( - socket, - Instant::now() + Duration::from_secs(PING_VOTE_TIMEOUT), - ), - ); + self.votes + .insert(key, (socket, Instant::now() + self.vote_duration)); } /// Returns the majority `SocketAddr` if it exists. If there are not enough votes to meet the threshold this returns None. @@ -62,11 +57,11 @@ impl IpVote { #[cfg(test)] mod tests { - use super::{IpVote, NodeId, SocketAddr}; + use super::{Duration, IpVote, NodeId, SocketAddr}; #[test] fn test_three_way_vote_draw() { - let mut votes = IpVote::new(2); + let mut votes = IpVote::new(2, Duration::from_secs(10)); let socket_1 = SocketAddr::new("127.0.0.1".parse().unwrap(), 1); let socket_2 = SocketAddr::new("127.0.0.1".parse().unwrap(), 2); @@ -88,7 +83,7 @@ mod tests { #[test] fn test_majority_vote() { - let mut votes = IpVote::new(2); + let mut votes = IpVote::new(2, Duration::from_secs(10)); let socket_1 = SocketAddr::new("127.0.0.1".parse().unwrap(), 1); let socket_2 = SocketAddr::new("127.0.0.1".parse().unwrap(), 2); let socket_3 = SocketAddr::new("127.0.0.1".parse().unwrap(), 3); @@ -103,7 +98,7 @@ mod tests { #[test] fn test_below_threshold() { - let mut votes = IpVote::new(3); + let mut votes = IpVote::new(3, Duration::from_secs(10)); let socket_1 = SocketAddr::new("127.0.0.1".parse().unwrap(), 1); let socket_2 = SocketAddr::new("127.0.0.1".parse().unwrap(), 2); let socket_3 = SocketAddr::new("127.0.0.1".parse().unwrap(), 3); diff --git a/src/service/test.rs b/src/service/test.rs index 78b8a83bb..29b21b189 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -1,17 +1,19 @@ #![cfg(test)] +use super::*; + use crate::{ handler::Handler, kbucket, - kbucket::{KBucketsTable, NodeStatus}, + kbucket::{BucketInsertResult, KBucketsTable, NodeStatus}, node_info::NodeContact, query_pool::{QueryId, QueryPool}, rpc, rpc::RequestId, service::{ActiveRequest, Service}, - Discv5ConfigBuilder, + Discv5ConfigBuilder, Enr, }; -use enr::{CombinedKey, Enr, EnrBuilder}; +use enr::{CombinedKey, EnrBuilder}; use parking_lot::RwLock; use std::{ collections::HashMap, @@ -21,6 +23,20 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; +fn _connected_state() -> NodeStatus { + NodeStatus { + state: ConnectionState::Connected, + direction: ConnectionDirection::Outgoing, + } +} + +fn disconnected_state() -> NodeStatus { + NodeStatus { + state: ConnectionState::Disconnected, + direction: ConnectionDirection::Outgoing, + } +} + fn init() { let _ = tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) @@ -28,9 +44,10 @@ fn init() { } async fn build_service( - local_enr: Arc>>, + local_enr: Arc>, enr_key: Arc>, listen_socket: SocketAddr, + filters: bool, ) -> Service { let config = Discv5ConfigBuilder::new() .executor(Box::new(crate::executor::TokioExecutor::default())) @@ -45,9 +62,21 @@ async fn build_service( .await .unwrap(); + let (table_filter, bucket_filter) = if filters { + ( + Some(Box::new(kbucket::IpTableFilter) as Box>), + Some(Box::new(kbucket::IpBucketFilter) as 
Box>), + ) + } else { + (None, None) + }; + let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( local_enr.read().node_id().into(), Duration::from_secs(60), + config.incoming_bucket_limit, + table_filter, + bucket_filter, ))); // create the required channels @@ -65,7 +94,7 @@ async fn build_service( handler_send, handler_recv, handler_exit: Some(_handler_exit), - ping_heartbeat: tokio::time::interval(config.ping_interval), + peers_to_ping: HashSetDelay::new(config.ping_interval), discv5_recv, event_stream: None, exit, @@ -97,17 +126,19 @@ async fn test_updating_connection_on_ping() { Arc::new(RwLock::new(enr)), Arc::new(RwLock::new(enr_key1)), socket_addr, + false, ) .await; // Set up service with one disconnected node let key = kbucket::Key::from(enr2.node_id()); if let kbucket::Entry::Absent(entry) = service.kbuckets.write().entry(&key) { - match entry.insert(enr2.clone(), NodeStatus::Disconnected) { - kbucket::InsertResult::Inserted => {} - kbucket::InsertResult::Full => { + match entry.insert(enr2.clone(), disconnected_state()) { + BucketInsertResult::Inserted => {} + BucketInsertResult::Full => { panic!("Can't be full"); } - kbucket::InsertResult::Pending { .. } => {} + BucketInsertResult::Pending { .. } => {} + _ => panic!("Could not be inserted"), } } @@ -138,5 +169,5 @@ async fn test_updating_connection_on_ping() { service.handle_rpc_response(expected_return_addr, response); let buckets = service.kbuckets.read(); let node = buckets.iter_ref().next().unwrap(); - assert_eq!(node.status, NodeStatus::Connected); + assert!(node.status.is_connected()) } diff --git a/src/socket/filter/config.rs b/src/socket/filter/config.rs index 9e9fbb16b..d4d2931ce 100644 --- a/src/socket/filter/config.rs +++ b/src/socket/filter/config.rs @@ -11,6 +11,12 @@ pub struct FilterConfig { /// The maximum requests tolerated per IP per second. This must be less than /// `max_requests_per_second`. pub max_requests_per_ip_per_second: Option, + /// The maximum number of node-ids allowed per IP address before the IP address gets banned. + /// Having this set to None, disables this feature. Default value is 10. + pub max_nodes_per_ip: Option, + /// The maximum number of nodes that can be banned by a single IP before that IP gets banned. + /// The default is 5. + pub max_bans_per_ip: Option, } impl Default for FilterConfig { @@ -20,6 +26,8 @@ impl Default for FilterConfig { max_requests_per_second: 10, max_requests_per_node_per_second: Some(10.0), max_requests_per_ip_per_second: Some(10.0), + max_nodes_per_ip: Some(10), + max_bans_per_ip: Some(5), } } } diff --git a/src/socket/filter/mod.rs b/src/socket/filter/mod.rs index da6954668..440936ad4 100644 --- a/src/socket/filter/mod.rs +++ b/src/socket/filter/mod.rs @@ -3,7 +3,12 @@ use crate::{discv5::PERMIT_BAN_LIST, metrics::METRICS, node_info::NodeAddress, packet::Packet}; use cache::ReceivedPacketCache; use enr::NodeId; -use std::{collections::HashMap, net::SocketAddr, sync::atomic::Ordering}; +use lru::LruCache; +use std::{ + collections::{HashMap, HashSet}, + net::{IpAddr, SocketAddr}, + sync::atomic::Ordering, +}; use tracing::{debug, warn}; mod cache; @@ -11,23 +16,33 @@ mod config; pub use config::{FilterConfig, FilterConfigBuilder}; /// The maximum percentage of our unsolicited requests per second limit a node is able to consume -/// for `NUMBER_OF_WINDOWS` duration before being banned. +/// for `NUMBER_OF_WINDOWS` duration before being banned. /// This allows us to ban the IP/NodeId of an attacker spamming us with requests. 
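The new `max_nodes_per_ip` and `max_bans_per_ip` options in `FilterConfig` can be combined with the existing defaults when building a configuration. A small sketch follows; the values are arbitrary examples and it assumes the fields remain public and `Default` is implemented as shown above.

use discv5::FilterConfig;

fn strict_filter_config() -> FilterConfig {
    FilterConfig {
        // Ban an IP once it has presented 5 distinct node-ids...
        max_nodes_per_ip: Some(5),
        // ...or once 3 of its node-ids have been banned individually.
        max_bans_per_ip: Some(3),
        // Keep the remaining rate limits at their defaults.
        ..FilterConfig::default()
    }
}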
const MAX_PERCENT_OF_LIMIT_PER_NODE: f64 = 0.9; /// The number of windows to remember before banning a node. const NUMBER_OF_WINDOWS: usize = 5; +/// The maximum number of IPs to retain when calculating the number of nodes per IP. +const KNOWN_ADDRS_SIZE: usize = 500; +/// The number of IPs to retain at any given time that have banned nodes. +const BANNED_NODES_SIZE: usize = 50; /// The packet filter which decides whether we accept or reject incoming packets. pub(crate) struct Filter { /// Configuration for the packet filter. config: FilterConfig, /// An ordered (by time) collection of recently seen packets by SocketAddr. The packet data is not - /// stored here. This stores a 5 seconds of history to calculate a 5 second moving average for + /// stored here. This stores 5 seconds of history to calculate a 5 second moving average for /// the metrics. raw_packets_received: ReceivedPacketCache, /// An ordered (by time) collection of seen NodeIds that have passed the first filter check and /// have an associated NodeId. received_by_node: ReceivedPacketCache, + /// Keep track of node ids per socket. If someone is using too many node-ids per IP, they can + /// be banned. + known_addrs: LruCache>, + /// Keep track of Ips that have banned nodes. If a single IP has many nodes that get banned, + /// then we ban the IP address. + banned_nodes: LruCache, } impl Filter { @@ -42,6 +57,8 @@ impl Filter { config.max_requests_per_second * NUMBER_OF_WINDOWS, METRICS.moving_window, ), + known_addrs: LruCache::new(KNOWN_ADDRS_SIZE), + banned_nodes: LruCache::new(BANNED_NODES_SIZE), } } @@ -151,12 +168,53 @@ impl Filter { "Node has exceeded its request limit and is now banned {}", node_address.node_id ); + + // The node is being banned PERMIT_BAN_LIST .write() .ban_nodes .insert(node_address.node_id); + + // If we are tracking banned nodes per IP, add to the count. If the count is higher + // than our tolerance, ban the IP. + if let Some(max_bans_per_ip) = self.config.max_bans_per_ip { + let ip = node_address.socket_addr.ip(); + if let Some(banned_count) = self.banned_nodes.get_mut(&ip) { + *banned_count += 1; + if *banned_count >= max_bans_per_ip { + PERMIT_BAN_LIST.write().ban_ips.insert(ip); + } + } else { + self.banned_nodes.put(ip, 0); + } + } + return false; } + + // Check the nodes per IP filter configuration + if let Some(max_nodes_per_ip) = self.config.max_nodes_per_ip { + // This option is set, store the known nodes per IP. + let ip = node_address.socket_addr.ip(); + let known_nodes = { + if let Some(known_nodes) = self.known_addrs.get_mut(&ip) { + known_nodes.insert(node_address.node_id); + known_nodes.len() + } else { + let mut ids = HashSet::new(); + ids.insert(node_address.node_id); + self.known_addrs.put(ip, ids); + 1 + } + }; + + if known_nodes >= max_nodes_per_ip { + warn!("IP has exceeded its node-id limit and is now banned {}", ip); + PERMIT_BAN_LIST.write().ban_ips.insert(ip); + self.known_addrs.pop(&ip); + return false; + } + } } true
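To isolate the per-IP bookkeeping added above, here is a self-contained sketch of the same idea: a bounded LRU map from IP to the set of node-ids seen, where an IP is banned (and dropped from tracking) once it presents too many distinct node-ids. The `IpTracker` type, the `u64` node-id stand-in and the capacity of 500 are illustrative only.

// Sketch only: bounded per-IP node-id tracking, as used by the packet filter above.
use lru::LruCache;
use std::collections::HashSet;
use std::net::IpAddr;

struct IpTracker {
    /// Node-ids seen per IP. Bounded so an attacker cannot grow memory without limit.
    known_addrs: LruCache<IpAddr, HashSet<u64>>,
    max_nodes_per_ip: usize,
}

impl IpTracker {
    fn new(max_nodes_per_ip: usize) -> Self {
        IpTracker {
            // Mirrors KNOWN_ADDRS_SIZE above.
            known_addrs: LruCache::new(500),
            max_nodes_per_ip,
        }
    }

    /// Records `node_id` for `ip`. Returns true if the IP should now be banned
    /// because it has presented too many distinct node-ids.
    fn record(&mut self, ip: IpAddr, node_id: u64) -> bool {
        let count = if let Some(ids) = self.known_addrs.get_mut(&ip) {
            ids.insert(node_id);
            ids.len()
        } else {
            let mut ids = HashSet::new();
            ids.insert(node_id);
            self.known_addrs.put(ip, ids);
            1
        };

        if count >= self.max_nodes_per_ip {
            // Stop tracking an IP once it is banned.
            self.known_addrs.pop(&ip);
            true
        } else {
            false
        }
    }
}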