Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More lenient ipv6 auto-update #266

Merged
merged 19 commits into from
Oct 14, 2024
Merged
41 changes: 40 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ impl Service {
kbucket::Entry::Present(_, status)
if status.is_connected() && !status.is_incoming());

if should_count {
if should_count | self.require_more_ip_votes(socket.is_ipv6()) {
// get the advertised local addresses
let (local_ip4_socket, local_ip6_socket) = {
let local_enr = self.local_enr.read();
Expand Down Expand Up @@ -1345,6 +1345,19 @@ impl Service {
}
InsertResult::ValueUpdated | InsertResult::UpdatedPending => {}
InsertResult::Failed(reason) => {
// On large networks with limited IPv6 nodes, it is hard to get enough
// PONG votes in order to estimate our external IP address. Often the
// routing table can be full, and so we reject useful IPv6 here.
//
// If we are low on votes and we initiated this connection (i.e it was not
// forced on us) then lets get a PONG from this node.

if direction == ConnectionDirection::Outgoing
&& self.require_more_ip_votes(enr.udp6_socket().is_some())
{
self.send_ping(enr, None);
}

self.peers_to_ping.remove(&node_id);
trace!(%node_id, ?reason, "Could not insert node");
}
Expand Down Expand Up @@ -1535,6 +1548,32 @@ impl Service {
}
}

/// This is a helper function that determines if we need more votes for a specific IP
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
/// class.
///
/// If we are in dual-stack made and don't have enough votes for either ipv4 or ipv6 and the
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
/// requesting node/vote is what we need, then this will return true
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
fn require_more_ip_votes(&mut self, is_ipv6: bool) -> bool {
if !matches!(self.ip_mode, IpMode::DualStack) {
return false;
}

if let Some(ip_votes) = self.ip_votes.as_mut() {
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
match (ip_votes.majority(), is_ipv6) {
// We don't have enough ipv4 votes, but this is an IPv4-only node.
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
((None, Some(_)), false) |
// We don't have enough ipv6 votes, but this is an IPv6 node
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
((Some(_), None), true) |
// We don't have enough ipv6 or ipv4 nodes, ping this peer
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
((None, None), _,) => true,
// We have enough votes do nothing
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
((_, _), _,) => false,
}
} else {
false
}
}

/// A future that maintains the routing table and inserts nodes when required. This returns the
/// [`Event::NodeInserted`] variant if a new node has been inserted into the routing table.
async fn bucket_maintenance_poll(kbuckets: &Arc<RwLock<KBucketsTable<NodeId, Enr>>>) -> Event {
Expand Down
165 changes: 163 additions & 2 deletions src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ use crate::{
};
use enr::CombinedKey;
use parking_lot::RwLock;
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration};
use tokio::sync::{mpsc, oneshot};
use rand;
use std::{
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr},
sync::Arc,
time::Duration,
};
use tokio::sync::{
mpsc,
mpsc::{Sender, UnboundedReceiver},
oneshot,
};

/// Default UDP port number to use for tests requiring UDP exposure
pub const DEFAULT_UDP_PORT: u16 = 0;
Expand Down Expand Up @@ -102,6 +112,65 @@ async fn build_service<P: ProtocolIdentity>(
}
}

fn build_non_handler_service(
local_enr: Arc<RwLock<Enr>>,
enr_key: Arc<RwLock<CombinedKey>>,
filters: bool,
) -> (Service, UnboundedReceiver<HandlerIn>, Sender<HandlerOut>) {
let listen_config = ListenConfig::Ipv4 {
ip: local_enr.read().ip4().unwrap(),
port: local_enr.read().udp4().unwrap(),
};
let config = ConfigBuilder::new(listen_config).build();

// Fake's the handler with empty channels
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
let (handler_send, handler_recv_fake) = mpsc::unbounded_channel();
let (handler_send_fake, handler_recv) = mpsc::channel(1000);

let (table_filter, bucket_filter) = if filters {
(
Some(Box::new(kbucket::IpTableFilter) as Box<dyn kbucket::Filter<Enr>>),
Some(Box::new(kbucket::IpBucketFilter) as Box<dyn kbucket::Filter<Enr>>),
)
} else {
(None, None)
};

let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
local_enr.read().node_id().into(),
Duration::from_secs(60),
config.incoming_bucket_limit,
table_filter,
bucket_filter,
)));

let ip_vote = IpVote::new(10, Duration::from_secs(10000));

// create the required channels
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
let (_discv5_send, discv5_recv) = mpsc::channel(30);
let (_exit_send, exit) = oneshot::channel();

let service = Service {
local_enr,
enr_key,
kbuckets,
queries: QueryPool::new(config.query_timeout),
active_requests: Default::default(),
active_nodes_responses: HashMap::new(),
ip_votes: Some(ip_vote),
handler_send,
handler_recv,
handler_exit: None,
peers_to_ping: HashSetDelay::new(config.ping_interval),
discv5_recv,
event_stream: None,
exit,
config,
ip_mode: IpMode::DualStack,
};
(service, handler_recv_fake, handler_send_fake)
}

#[tokio::test]
async fn test_updating_connection_on_ping() {
init();
Expand Down Expand Up @@ -341,3 +410,95 @@ async fn test_handling_concurrent_responses() {
assert!(service.active_requests.is_empty());
assert!(service.active_nodes_responses.is_empty());
}

fn generate_rand_ipv4() -> Ipv4Addr {
let a: u8 = rand::random();
let b: u8 = rand::random();
let c: u8 = rand::random();
let d: u8 = rand::random();
Ipv4Addr::new(a, b, c, d)
}

fn generate_rand_ipv6() -> Ipv6Addr {
let a: u16 = rand::random();
let b: u16 = rand::random();
let c: u16 = rand::random();
let d: u16 = rand::random();
let e: u16 = rand::random();
let f: u16 = rand::random();
let g: u16 = rand::random();
let h: u16 = rand::random();
Ipv6Addr::new(a, b, c, d, e, f, g, h)
}

fn random_connection_direction() -> ConnectionDirection {
let outgoing: bool = rand::random();
if outgoing {
ConnectionDirection::Outgoing
} else {
ConnectionDirection::Incoming
}
}

#[tokio::test]
async fn test_ipv6_update_amongst_ipv4_dominated_network() {
init();

let enr_key = CombinedKey::generate_secp256k1();
let ip = std::net::Ipv4Addr::LOCALHOST;
let local_enr = Enr::builder()
.ip4(ip)
.udp4(DEFAULT_UDP_PORT)
.build(&enr_key)
.unwrap();

let (mut service, mut handler_recv, _handler_send) = build_non_handler_service(
Arc::new(RwLock::new(local_enr)),
Arc::new(RwLock::new(enr_key)),
false,
);

// Load up the routing table with 100 random ENRs
AgeManning marked this conversation as resolved.
Show resolved Hide resolved

for _ in 0..100 {
let key = CombinedKey::generate_secp256k1();
let ip = generate_rand_ipv4();
let enr = Enr::builder()
.ip4(ip)
.udp4(DEFAULT_UDP_PORT)
.build(&key)
.unwrap();

let direction = random_connection_direction();
service.inject_session_established(enr.clone(), direction);
}

// Attempt to add 10 IPv6 nodes and expect that we attempt to send 10 PING's to IPv6 nodes.
for _ in 0..10 {
let key = CombinedKey::generate_secp256k1();
let ip = generate_rand_ipv6();
let enr = Enr::builder()
.ip6(ip)
.udp6(DEFAULT_UDP_PORT)
.build(&key)
.unwrap();

let direction = ConnectionDirection::Outgoing;
service.inject_session_established(enr.clone(), direction);
}

// Collect all the messages to the handler and count the PING requests for ENR v6 addresses
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
let mut v6_pings = 0;
while let Ok(event) = handler_recv.try_recv() {
if let HandlerIn::Request(contact, request) = event {
if contact.node_address().socket_addr.is_ipv6()
&& matches!(request.body, RequestBody::Ping { .. })
{
v6_pings += 1
}
}
}

// Should be 10 ipv6 pings
assert_eq!(v6_pings, 10)
}