From 07d6409e99090a14bb984322e5109eaa5f4efa70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= <4142+huitseeker@users.noreply.github.com> Date: Mon, 8 Aug 2022 10:07:13 -0400 Subject: [PATCH] fix: Mitigate issue #706 (#715) Context: The `BatchLoader` is establishing a primary->worker communication using a public address, which is adding latency and competing with outbound traffic. The proper fix is to use the `BlockWaiter`. The issue: We would like a mitigation to be deployed and effective sooner. The fix: We inspect the worker addresses used by the `BatchLoader`, and rewrite their hostname to localhost when it matches the local primary. --- Cargo.lock | 1 + executor/Cargo.toml | 1 + executor/src/lib.rs | 110 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 108 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca80070ef..9aa9b1c63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1345,6 +1345,7 @@ dependencies = [ "crypto", "futures", "indexmap", + "match_opt", "multiaddr", "mysten-network 0.1.0 (git+https://github.com/mystenlabs/mysten-infra.git?rev=d965a5a795dcdb4d1c7964acf556bc249fdc58aa)", "primary", diff --git a/executor/Cargo.toml b/executor/Cargo.toml index 82ce968cb..cbb59a58b 100644 --- a/executor/Cargo.toml +++ b/executor/Cargo.toml @@ -30,6 +30,7 @@ worker = { path = "../worker" } store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa" } mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa" } workspace-hack = { version = "0.1", path = "../workspace-hack" } +match_opt = "0.1.2" [dev-dependencies] rand = "0.7.3" diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 73c604a24..1c2685fe6 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -15,6 +15,7 @@ mod fixtures; mod execution_state; pub use errors::{ExecutionStateError, SubscriberError, 
SubscriberResult}; +use multiaddr::{Multiaddr, Protocol}; pub use state::ExecutionIndices; use crate::{batch_loader::BatchLoader, core::Core, subscriber::Subscriber}; @@ -23,7 +24,13 @@ use config::SharedCommittee; use consensus::ConsensusOutput; use crypto::PublicKey; use serde::de::DeserializeOwned; -use std::{fmt::Debug, sync::Arc}; +use std::{ + borrow::Cow, + collections::HashMap, + fmt::Debug, + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; use store::Store; use tokio::{ sync::{ @@ -131,17 +138,41 @@ impl Executor { ); // Spawn the batch loader. - let worker_addresses = committee + let mut worker_addresses: HashMap = committee .load() .authorities .iter() - .find(|(x, _)| *x == &name) - .map(|(_, authority)| authority) + .find_map(|v| match_opt::match_opt!(v, (x, authority) if *x == name => authority)) .expect("Our public key is not in the committee") .workers .iter() .map(|(id, x)| (*id, x.worker_to_worker.clone())) .collect(); + //////////////////////////////////////////////////////////////// + // TODO: remove this hack once #706 is fixed + //////////////////////////////////////////////////////////////// + + // retrieve our primary address + let our_primary_to_primary_address = committee + .load() + .primary(&name) + .expect("Out public key is not in the committee!") + .primary_to_primary; + // extract the hostname portion + let our_primary_hostname = our_primary_to_primary_address + .into_iter() + .flat_map(move |x| match x { + p @ Protocol::Ip4(_) | p @ Protocol::Ip6(_) | p @ Protocol::Dns(_) => Some(p), + _ => None, + }) + .next() + .expect("Could not find hostname in our primary address!"); + // Modify the worker addresses that we are about to use : would we talk better using a loopback address? 
+ for worker_address in worker_addresses.values_mut() { + replace_distant_by_localhost(worker_address, &our_primary_hostname); + } + //////////////////////////////////////////////////////////////// + let batch_loader_handle = BatchLoader::spawn( store, tx_reconfigure.subscribe(), @@ -158,3 +189,74 @@ impl Executor { ]) } } + +fn replace_distant_by_localhost(target: &mut Multiaddr, hostname_pattern: &Protocol) { + // does the hostname match our pattern exactly? + if target.iter().next() == Some(hostname_pattern.clone()) { + if let Some(replacement) = target.replace(0, move |x| match x { + Protocol::Ip4(_) => Some(Protocol::Ip4(Ipv4Addr::LOCALHOST)), + Protocol::Ip6(_) => Some(Protocol::Ip6(Ipv6Addr::LOCALHOST)), + Protocol::Dns(_) => Some(Protocol::Dns(Cow::Owned("localhost".to_owned()))), + _ => None, + }) { + tracing::debug!("Address for worker {} replaced by {}", target, replacement); + *target = replacement; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use multiaddr::{multiaddr, Protocol}; + use std::net::Ipv4Addr; + + #[test] + fn test_replace_distant_by_localhost() { + // IPV4 positive + let non_local: Ipv4Addr = "8.8.8.8".parse().unwrap(); + let mut addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr1, &Protocol::Ip4(non_local)); + assert_eq!(addr1, multiaddr!(Ip4(Ipv4Addr::LOCALHOST), Tcp(10000u16))); + + // IPV4 negative + let other_target: Ipv4Addr = "8.8.8.4".parse().unwrap(); + let addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16)); + let mut addr2 = multiaddr!(Ip4(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr2, &Protocol::Ip4(other_target)); + assert_eq!(addr2, addr1); + + // IPV6 positive + let non_local: Ipv6Addr = "2607:f0d0:1002:51::4".parse().unwrap(); + let mut addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr1, &Protocol::Ip6(non_local)); + assert_eq!(addr1, multiaddr!(Ip6(Ipv6Addr::LOCALHOST), Tcp(10000u16))); + + // IPV6 
negative + let other_target: Ipv6Addr = "2607:f0d0:1002:50::4".parse().unwrap(); + let addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16)); + let mut addr2 = multiaddr!(Ip6(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr2, &Protocol::Ip6(other_target)); + assert_eq!(addr2, addr1); + + // DNS positive + let non_local: Cow = Cow::Owned("google.com".to_owned()); + let mut addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr1, &Protocol::Dns(non_local.clone())); + let localhost: Cow = Cow::Owned("localhost".to_owned()); + assert_eq!(addr1, multiaddr!(Dns(localhost), Tcp(10000u16))); + + // DNS negative + let other_target: Cow = Cow::Owned("apple.com".to_owned()); + let addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16)); + let mut addr2 = multiaddr!(Dns(non_local), Tcp(10000u16)); + + replace_distant_by_localhost(&mut addr2, &Protocol::Dns(other_target)); + assert_eq!(addr2, addr1); + } +}