Skip to content

Commit

Permalink
Add SocketPool to Pinger
Browse files Browse the repository at this point in the history
In order to add SocketPool, Pinger had to be broken out of telio-utils
because of circular dependencies. LinkDetection had to be also updated
to use the new Pinger.
  • Loading branch information
tomasz-grz committed Jan 15, 2025
1 parent 6dffa1e commit 7312c7f
Show file tree
Hide file tree
Showing 15 changed files with 185 additions and 35 deletions.
Empty file.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ telio-task.workspace = true
telio-traversal.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true
once_cell.workspace = true
nat-detect.workspace = true
smart-default.workspace = true
Expand Down Expand Up @@ -199,6 +200,7 @@ telio-utils = { version = "0.1.0", path = "./crates/telio-utils" }
telio-wg = { version = "0.1.0", path = "./crates/telio-wg" }
telio-pq = { version = "0.1.0", path = "./crates/telio-pq" }
telio-pmtu = { version = "0.1.0", path = "./crates/telio-pmtu" }
telio-pinger = { version = "0.1.0", path = "./crates/telio-pinger" }

[profile.release]
opt-level = "s"
Expand Down
2 changes: 2 additions & 0 deletions crates/telio-nurse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ telio-proto.workspace = true
telio-task.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true
telio-sockets.workspace = true
once_cell.workspace = true

[dev-dependencies]
Expand Down
8 changes: 7 additions & 1 deletion crates/telio-nurse/src/nurse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use telio_crypto::{PublicKey, SecretKey};
use telio_lana::*;
use telio_model::event::Event;
use telio_sockets::SocketPool;
use telio_task::{
io::{chan, mc_chan, Chan, McChan},
task_exec, ExecError, Runtime, RuntimeExt, Task, WaitResponse,
Expand Down Expand Up @@ -60,9 +61,12 @@ impl Nurse {
io: NurseIo<'_>,
aggregator: Arc<ConnectivityDataAggregator>,
ipv6_enabled: bool,
sock_pool: Arc<SocketPool>,
) -> Self {
Self {
task: Task::start(State::new(public_key, config, io, aggregator, ipv6_enabled).await),
task: Task::start(
State::new(public_key, config, io, aggregator, ipv6_enabled, sock_pool).await,
),
}
}

Expand Down Expand Up @@ -130,6 +134,7 @@ impl State {
io: NurseIo<'_>,
aggregator: Arc<ConnectivityDataAggregator>,
ipv6_enabled: bool,
sock_pool: Arc<SocketPool>,
) -> Self {
let meshnet_id = Self::meshnet_id();
telio_log_debug!("Meshnet ID: {meshnet_id}");
Expand Down Expand Up @@ -181,6 +186,7 @@ impl State {
config_update_channel: config_update_channel.subscribe(),
},
ipv6_enabled,
sock_pool,
)))
} else {
None
Expand Down
27 changes: 21 additions & 6 deletions crates/telio-nurse/src/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use telio_model::features::RttType;
use telio_task::{io::mc_chan, Runtime, RuntimeExt, WaitResponse};
use telio_wg::uapi::{AnalyticsEvent, PeerState};

use telio_utils::{
interval, telio_log_debug, telio_log_trace, DualPingResults, DualTarget, IpStack, Pinger,
};
use telio_pinger::{DualPingResults, Pinger};
use telio_sockets::SocketPool;
use telio_utils::{interval, telio_log_debug, telio_log_trace, DualTarget, IpStack};

use crate::{config::QoSConfig, data::MeshConfigUpdateEvent};

Expand Down Expand Up @@ -240,15 +240,22 @@ impl Analytics {
///
/// * `config` - Config for QoS component.
/// * `io` - Channel(s) for communicating with WireGuard.
/// * `ipv6_enabled` - if ipv6 is supported.
/// * `socket_pool` - SocketPool used to protect the sockets.
///
/// # Returns
///
/// A new `Analytics` instance with the given configuration but with no nodes.
pub fn new(config: QoSConfig, io: Io, ipv6_enabled: bool) -> Self {
pub fn new(
config: QoSConfig,
io: Io,
ipv6_enabled: bool,
socket_pool: Arc<SocketPool>,
) -> Self {
let (ping_channel_tx, ping_channel_rx) = mpsc::channel(1);

let ping_backend = if config.rtt_types.contains(&RttType::Ping) {
Arc::new(Pinger::new(config.rtt_tries, ipv6_enabled).ok())
Arc::new(Pinger::new(config.rtt_tries, ipv6_enabled, Some(socket_pool)).ok())
} else {
Arc::new(None)
};
Expand Down Expand Up @@ -600,6 +607,7 @@ mod tests {
collections::HashSet,
net::{Ipv4Addr, Ipv6Addr},
};
use telio_sockets::NativeProtector;
use telio_task::{
io::{mc_chan::Tx, McChan},
task_exec, Task,
Expand Down Expand Up @@ -931,9 +939,16 @@ mod tests {
rtt_types: vec![RttType::Ping],
buckets: 5,
};
let sock_pool = Arc::new(SocketPool::new(
NativeProtector::new(
#[cfg(target_os = "macos")]
true,
)
.expect("Failed to create NativeProtector"),
));

(
Analytics::new(config, io, true),
Analytics::new(config, io, true, sock_pool),
manual_trigger_channel.tx,
wg_channel.tx,
)
Expand Down
18 changes: 18 additions & 0 deletions crates/telio-pinger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "telio-pinger"
version = "0.1.0"
edition = "2021"
license = "GPL-3.0-only"
repository = "https://github.com/NordSecurity/libtelio"
publish = false

[dependencies]
socket2.workspace = true
telio-utils.workspace = true
telio-sockets.workspace = true
surge-ping.workspace = true
rand.workspace = true
tracing.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["net", "sync", "test-util"] }
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use std::{convert::TryInto, net::IpAddr};
use surge_ping::{Client, Config as PingerConfig, PingIdentifier, PingSequence, ICMP};

use crate::{telio_log_debug, telio_log_error, DualTarget};
use telio_sockets::SocketPool;
use telio_utils::{telio_log_debug, telio_log_error, telio_log_trace, DualTarget};

/// Information needed to check the reachability of endpoints.
///
Expand Down Expand Up @@ -42,20 +43,37 @@ pub struct DualPingResults {
impl Pinger {
const PING_TIMEOUT: Duration = Duration::from_secs(5);

/// Create new instance of `Ping`.
/// Create new instance of `Ping` with socket pool.
///
/// # Arguments
///
/// * `no_of_tries` - How many pings should be sent.
pub fn new(no_of_tries: u32, ipv6: bool) -> std::io::Result<Self> {
/// * `ipv6` - if ipv6 is supported.
/// * `socket_pool` - Optional SocketPool used to protect the sockets.
pub fn new(
no_of_tries: u32,
ipv6: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> std::io::Result<Self> {
let client_v6 = if ipv6 {
Some(Arc::new(Self::build_client(ICMP::V6)?))
let client_v6 = Arc::new(Self::build_client(ICMP::V6)?);
if let Some(ref socket_pool) = socket_pool {
telio_log_trace!("Making pinger IPv6 socket internal");
socket_pool.make_internal(client_v6.get_socket().get_native_sock())?;
}
Some(client_v6)
} else {
None
};

let client_v4 = Arc::new(Self::build_client(ICMP::V4)?);
if let Some(ref socket_pool) = socket_pool {
telio_log_trace!("Making pinger IPv4 socket internal");
socket_pool.make_internal(client_v4.get_socket().get_native_sock())?;
}

Ok(Self {
client_v4: Arc::new(Self::build_client(ICMP::V4)?),
client_v4,
client_v6,
no_of_tries,
})
Expand Down Expand Up @@ -181,3 +199,45 @@ impl Pinger {
Client::new(&config_builder.build())
}
}

#[cfg(test)]
mod tests {
use super::*;
use telio_sockets::NativeProtector;

// Basic constructor test
#[tokio::test]
async fn test_pinger_new_v6_sock_pool() {
let pinger = Pinger::new(1, true, None).expect("Failed to create Pinger");
assert!(pinger.client_v4.get_socket().get_native_sock() > 0);
assert!(pinger.client_v6.is_some());
assert_eq!(pinger.no_of_tries, 1);
}

// Basic ping test
#[tokio::test]
async fn test_ping_localhost() {
let pinger = Pinger::new(
2,
false,
Some(Arc::new(SocketPool::new(
NativeProtector::new(
#[cfg(target_os = "macos")]
true,
)
.expect("Failed to create Protector"),
))),
)
.expect("Failed to create Pinger");

let target =
DualTarget::new(("127.0.0.1".parse().ok(), None)).expect("Failed to create target");

let result = pinger.perform(target).await;
assert!(
result.v4.unwrap().successful_pings > 0,
"Expected at least one successful ping to 127.0.0.1"
);
assert!(result.v6.is_none());
}
}
1 change: 0 additions & 1 deletion crates/telio-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ serde.workspace = true
smart-default.workspace = true
sn_fake_clock = { workspace = true, optional = true }
socket2.workspace = true
surge-ping.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["time"] }
tracing.workspace = true
Expand Down
4 changes: 0 additions & 4 deletions crates/telio-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ pub use interval::*;
pub mod ip_stack;
pub use ip_stack::*;

/// Basic ICMP Pinger
pub mod pinger;
pub use pinger::*;

/// Testing tools
pub mod test;

Expand Down
1 change: 1 addition & 0 deletions crates/telio-wg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ telio-model.workspace = true
telio-sockets.workspace = true
telio-task.workspace = true
telio-utils.workspace = true
telio-pinger.workspace = true
itertools.workspace = true

[dev-dependencies]
Expand Down
15 changes: 13 additions & 2 deletions crates/telio-wg/src/link_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use telio_model::{
features::FeatureLinkDetection,
mesh::{LinkState, NodeState},
};
use telio_sockets::SocketPool;
use telio_task::io::{chan, Chan};
use telio_utils::{
get_ip_stack, telio_err_with_log, telio_log_debug, telio_log_error, telio_log_trace,
Expand All @@ -35,10 +36,20 @@ pub struct LinkDetection {
}

impl LinkDetection {
pub fn new(cfg: FeatureLinkDetection, ipv6_enabled: bool) -> Self {
pub fn new(
cfg: FeatureLinkDetection,
ipv6_enabled: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> Self {
let ping_channel = Chan::default();
let enhanced_detection = if cfg.no_of_pings != 0 {
EnhancedDetection::start_with(ping_channel.rx, cfg.no_of_pings, ipv6_enabled).ok()
EnhancedDetection::start_with(
ping_channel.rx,
cfg.no_of_pings,
ipv6_enabled,
socket_pool,
)
.ok()
} else {
None
};
Expand Down
17 changes: 13 additions & 4 deletions crates/telio-wg/src/link_detection/enhanced_detection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use async_trait::async_trait;
use std::net::IpAddr;
use std::{net::IpAddr, sync::Arc};

use telio_pinger::Pinger;
use telio_sockets::SocketPool;
use telio_task::{io::chan, task_exec, Runtime, RuntimeExt, Task, WaitResponse};
use telio_utils::{ip_stack, DualTarget, IpStack, Pinger};
use telio_utils::{ip_stack, DualTarget, IpStack};

use telio_utils::telio_log_debug;
/// Component used to check the link state when we think it's down.
Expand All @@ -17,9 +19,15 @@ impl EnhancedDetection {
ping_channel: chan::Rx<(Vec<IpAddr>, Option<IpStack>)>,
no_of_pings: u32,
ipv6_enabled: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> std::io::Result<Self> {
Ok(Self {
task: Task::start(State::new(ping_channel, no_of_pings, ipv6_enabled)?),
task: Task::start(State::new(
ping_channel,
no_of_pings,
ipv6_enabled,
socket_pool,
)?),
})
}

Expand All @@ -38,10 +46,11 @@ impl State {
ping_channel: chan::Rx<(Vec<IpAddr>, Option<IpStack>)>,
no_of_pings: u32,
ipv6_enabled: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> std::io::Result<Self> {
Ok(State {
ping_channel,
pinger: Pinger::new(no_of_pings, ipv6_enabled)?,
pinger: Pinger::new(no_of_pings, ipv6_enabled, socket_pool)?,
})
}
}
Expand Down
Loading

0 comments on commit 7312c7f

Please sign in to comment.