Skip to content

Commit

Permalink
Add SocketPool to Pinger
Browse files Browse the repository at this point in the history
In order to add SocketPool, Pinger had to be broken out of telio-utils
because of circular dependencies. LinkDetection had to be also updated
to use the new Pinger.
  • Loading branch information
tomasz-grz committed Jan 13, 2025
1 parent dc5d53a commit e5ec181
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 34 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ telio-task.workspace = true
telio-traversal.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true
once_cell.workspace = true
nat-detect.workspace = true
smart-default.workspace = true
Expand Down Expand Up @@ -201,6 +202,7 @@ telio-utils = { version = "0.1.0", path = "./crates/telio-utils" }
telio-wg = { version = "0.1.0", path = "./crates/telio-wg" }
telio-pq = { version = "0.1.0", path = "./crates/telio-pq" }
telio-pmtu = { version = "0.1.0", path = "./crates/telio-pmtu" }
telio-pinger = { version = "0.1.0", path = "./crates/telio-pinger" }

[profile.release]
opt-level = "s"
Expand Down
2 changes: 2 additions & 0 deletions crates/telio-nurse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ telio-proto.workspace = true
telio-task.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true
telio-sockets.workspace = true
once_cell.workspace = true

[dev-dependencies]
Expand Down
8 changes: 7 additions & 1 deletion crates/telio-nurse/src/nurse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use telio_crypto::{PublicKey, SecretKey};
use telio_lana::*;
use telio_model::event::Event;
use telio_sockets::SocketPool;
use telio_task::{
io::{chan, mc_chan, Chan, McChan},
task_exec, ExecError, Runtime, RuntimeExt, Task, WaitResponse,
Expand Down Expand Up @@ -60,9 +61,12 @@ impl Nurse {
io: NurseIo<'_>,
aggregator: Arc<ConnectivityDataAggregator>,
ipv6_enabled: bool,
sock_pool: Arc<SocketPool>,
) -> Self {
Self {
task: Task::start(State::new(public_key, config, io, aggregator, ipv6_enabled).await),
task: Task::start(
State::new(public_key, config, io, aggregator, ipv6_enabled, sock_pool).await,
),
}
}

Expand Down Expand Up @@ -130,6 +134,7 @@ impl State {
io: NurseIo<'_>,
aggregator: Arc<ConnectivityDataAggregator>,
ipv6_enabled: bool,
sock_pool: Arc<SocketPool>,
) -> Self {
let meshnet_id = Self::meshnet_id();
telio_log_debug!("Meshnet ID: {meshnet_id}");
Expand Down Expand Up @@ -181,6 +186,7 @@ impl State {
config_update_channel: config_update_channel.subscribe(),
},
ipv6_enabled,
sock_pool,
)))
} else {
None
Expand Down
23 changes: 17 additions & 6 deletions crates/telio-nurse/src/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use telio_model::features::RttType;
use telio_task::{io::mc_chan, Runtime, RuntimeExt, WaitResponse};
use telio_wg::uapi::{AnalyticsEvent, PeerState};

use telio_utils::{
interval, telio_log_debug, telio_log_trace, DualPingResults, DualTarget, IpStack, Pinger,
};
use telio_pinger::{DualPingResults, Pinger};
use telio_sockets::SocketPool;
use telio_utils::{interval, telio_log_debug, telio_log_trace, DualTarget, IpStack};

use crate::{config::QoSConfig, data::MeshConfigUpdateEvent};

Expand Down Expand Up @@ -240,15 +240,22 @@ impl Analytics {
///
/// * `config` - Config for QoS component.
/// * `io` - Channel(s) for communicating with WireGuard.
/// * `ipv6_enabled` - if ipv6 is supported.
/// * `socket_pool` - SocketPool used to protect the sockets.
///
/// # Returns
///
/// A new `Analytics` instance with the given configuration but with no nodes.
pub fn new(config: QoSConfig, io: Io, ipv6_enabled: bool) -> Self {
pub fn new(
config: QoSConfig,
io: Io,
ipv6_enabled: bool,
socket_pool: Arc<SocketPool>,
) -> Self {
let (ping_channel_tx, ping_channel_rx) = mpsc::channel(1);

let ping_backend = if config.rtt_types.contains(&RttType::Ping) {
Arc::new(Pinger::new(config.rtt_tries, ipv6_enabled).ok())
Arc::new(Pinger::new(config.rtt_tries, ipv6_enabled, Some(socket_pool)).ok())
} else {
Arc::new(None)
};
Expand Down Expand Up @@ -600,6 +607,7 @@ mod tests {
collections::HashSet,
net::{Ipv4Addr, Ipv6Addr},
};
use telio_sockets::NativeProtector;
use telio_task::{
io::{mc_chan::Tx, McChan},
task_exec, Task,
Expand Down Expand Up @@ -931,9 +939,12 @@ mod tests {
rtt_types: vec![RttType::Ping],
buckets: 5,
};
let sock_pool = Arc::new(SocketPool::new(
NativeProtector::new(true).expect("Failed to create NativeProtector"),
));

(
Analytics::new(config, io, true),
Analytics::new(config, io, true, sock_pool),
manual_trigger_channel.tx,
wg_channel.tx,
)
Expand Down
18 changes: 18 additions & 0 deletions crates/telio-pinger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "telio-pinger"
version = "0.1.0"
edition = "2021"
license = "GPL-3.0-only"
repository = "https://github.com/NordSecurity/libtelio"
publish = false

[dependencies]
socket2.workspace = true
telio-utils.workspace = true
telio-sockets.workspace = true
surge-ping.workspace = true
rand.workspace = true
tracing.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["net", "sync", "test-util"] }
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use std::{convert::TryInto, net::IpAddr};
use surge_ping::{Client, Config as PingerConfig, PingIdentifier, PingSequence, ICMP};

use crate::{telio_log_debug, telio_log_error, DualTarget};
use telio_sockets::SocketPool;
use telio_utils::{telio_log_debug, telio_log_error, telio_log_trace, DualTarget};

/// Information needed to check the reachability of endpoints.
///
Expand Down Expand Up @@ -42,20 +43,37 @@ pub struct DualPingResults {
impl Pinger {
const PING_TIMEOUT: Duration = Duration::from_secs(5);

/// Create new instance of `Ping`.
/// Create new instance of `Ping` with socket pool.
///
/// # Arguments
///
/// * `no_of_tries` - How many pings should be sent.
pub fn new(no_of_tries: u32, ipv6: bool) -> std::io::Result<Self> {
/// * `ipv6` - if ipv6 is supported.
/// * `sock_pool` - Optional SocketPool used to protect the sockets.
pub fn new(
no_of_tries: u32,
ipv6: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> std::io::Result<Self> {
let client_v6 = if ipv6 {
Some(Arc::new(Self::build_client(ICMP::V6)?))
let client_v6 = Arc::new(Self::build_client(ICMP::V6)?);
if let Some(ref socket_pool) = socket_pool {
telio_log_trace!("Making pinger IPv6 socket internal");
socket_pool.make_internal(client_v6.get_socket().get_native_sock())?;
}
Some(client_v6)
} else {
None
};

let client_v4 = Arc::new(Self::build_client(ICMP::V4)?);
if let Some(ref socket_pool) = socket_pool {
telio_log_trace!("Making pinger IPv4 socket internal");
socket_pool.make_internal(client_v4.get_socket().get_native_sock())?;
}

Ok(Self {
client_v4: Arc::new(Self::build_client(ICMP::V4)?),
client_v4,
client_v6,
no_of_tries,
})
Expand Down Expand Up @@ -139,9 +157,17 @@ impl Pinger {

let mut sum = Duration::default();

// TODO: Remove after testing
let mut payload = [0xFF; 56];
let data = b"DEADBEEF";
let offset = payload.len() - data.len();
payload[offset..].copy_from_slice(data);

for i in 0..self.no_of_tries {
match pinger
.ping(PingSequence(i.try_into().unwrap_or(0)), &[0; 56])
// TODO: Revert after testing
// .ping(PingSequence(i.try_into().unwrap_or(0)), &[0; 56])
.ping(PingSequence(i.try_into().unwrap_or(0)), &payload)
.await
{
Ok((_, duration)) => {
Expand Down Expand Up @@ -181,3 +207,41 @@ impl Pinger {
Client::new(&config_builder.build())
}
}

#[cfg(test)]
mod tests {
use super::*;
use telio_sockets::NativeProtector;

// Basic constructor test
#[tokio::test]
async fn test_pinger_new_v6_sock_pool() {
let pinger = Pinger::new(1, true, None).expect("Failed to create Pinger");
assert!(pinger.client_v4.get_socket().get_native_sock() > 0);
assert!(pinger.client_v6.is_some());
assert_eq!(pinger.no_of_tries, 1);
}

// Basic ping test
#[tokio::test]
async fn test_ping_localhost() {
let pinger = Pinger::new(
2,
false,
Some(Arc::new(SocketPool::new(
NativeProtector::new(true).expect("Failed to create Protector"),
))),
)
.expect("Failed to create Pinger");

let target =
DualTarget::new(("127.0.0.1".parse().ok(), None)).expect("Failed to create target");

let result = pinger.perform(target).await;
assert!(
result.v4.unwrap().successful_pings > 0,
"Expected at least one successful ping to 127.0.0.1"
);
assert!(result.v6.is_none());
}
}
4 changes: 0 additions & 4 deletions crates/telio-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ pub use interval::*;
pub mod ip_stack;
pub use ip_stack::*;

/// Basic ICMP Pinger
pub mod pinger;
pub use pinger::*;

/// Testing tools
pub mod test;

Expand Down
1 change: 1 addition & 0 deletions crates/telio-wg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ telio-model.workspace = true
telio-sockets.workspace = true
telio-task.workspace = true
telio-utils.workspace = true
telio-pinger.workspace = true
itertools.workspace = true

[dev-dependencies]
Expand Down
15 changes: 13 additions & 2 deletions crates/telio-wg/src/link_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use telio_model::{
features::FeatureLinkDetection,
mesh::{LinkState, NodeState},
};
use telio_sockets::SocketPool;
use telio_task::io::{chan, Chan};
use telio_utils::{
get_ip_stack, telio_err_with_log, telio_log_debug, telio_log_error, telio_log_trace,
Expand All @@ -35,10 +36,20 @@ pub struct LinkDetection {
}

impl LinkDetection {
pub fn new(cfg: FeatureLinkDetection, ipv6_enabled: bool) -> Self {
pub fn new(
cfg: FeatureLinkDetection,
ipv6_enabled: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> Self {
let ping_channel = Chan::default();
let enhanced_detection = if cfg.no_of_pings != 0 {
EnhancedDetection::start_with(ping_channel.rx, cfg.no_of_pings, ipv6_enabled).ok()
EnhancedDetection::start_with(
ping_channel.rx,
cfg.no_of_pings,
ipv6_enabled,
socket_pool,
)
.ok()
} else {
None
};
Expand Down
17 changes: 13 additions & 4 deletions crates/telio-wg/src/link_detection/enhanced_detection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use async_trait::async_trait;
use std::net::IpAddr;
use std::{net::IpAddr, sync::Arc};

use telio_pinger::Pinger;
use telio_sockets::SocketPool;
use telio_task::{io::chan, task_exec, Runtime, RuntimeExt, Task, WaitResponse};
use telio_utils::{ip_stack, DualTarget, IpStack, Pinger};
use telio_utils::{ip_stack, DualTarget, IpStack};

use telio_utils::telio_log_debug;
/// Component used to check the link state when we think it's down.
Expand All @@ -17,9 +19,15 @@ impl EnhancedDetection {
ping_channel: chan::Rx<(Vec<IpAddr>, Option<IpStack>)>,
no_of_pings: u32,
ipv6_enabled: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> std::io::Result<Self> {
Ok(Self {
task: Task::start(State::new(ping_channel, no_of_pings, ipv6_enabled)?),
task: Task::start(State::new(
ping_channel,
no_of_pings,
ipv6_enabled,
socket_pool,
)?),
})
}

Expand All @@ -38,10 +46,11 @@ impl State {
ping_channel: chan::Rx<(Vec<IpAddr>, Option<IpStack>)>,
no_of_pings: u32,
ipv6_enabled: bool,
socket_pool: Option<Arc<SocketPool>>,
) -> std::io::Result<Self> {
Ok(State {
ping_channel,
pinger: Pinger::new(no_of_pings, ipv6_enabled)?,
pinger: Pinger::new(no_of_pings, ipv6_enabled, socket_pool)?,
})
}
}
Expand Down
Loading

0 comments on commit e5ec181

Please sign in to comment.