Skip to content

Commit

Permalink
Add metrics for connection method
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Jun 28, 2023
1 parent 194b53d commit 87ede5e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
8 changes: 7 additions & 1 deletion client/src/_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use metrics::{describe_counter, register_counter, Unit};
use metrics::{describe_counter, describe_gauge, register_counter, register_gauge, Unit};

pub static SOURCE_ID: &str = "SourceId";
pub static TARGET_ID: &str = "TargetId";
Expand All @@ -13,6 +13,8 @@ pub fn register_metrics() {
register_counter!("ya-relay.packet.udp.outgoing.num");
register_counter!("ya-relay.packet.udp.incoming.size");
register_counter!("ya-relay.packet.udp.incoming.num");
register_gauge!("ya-relay.client.session.type");
register_gauge!("ya-relay.client.public-address");

describe_counter!(
"ya-relay.packet.tcp.outgoing.size",
Expand All @@ -24,4 +26,8 @@ pub fn register_metrics() {
Unit::Count,
"Number of outgoing tcp packets"
);
describe_gauge!(
"ya-relay.client.session.type",
"Type of established session with Node. Check `ConnectionMethod` for numbers meaning."
);
}
47 changes: 40 additions & 7 deletions client/src/_session_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use async_trait::async_trait;
use derive_more::Display;
use futures::future::{AbortHandle, LocalBoxFuture};
use futures::{FutureExt, SinkExt, TryFutureExt};
use metrics::gauge;
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::net::SocketAddr;
Expand All @@ -23,6 +24,7 @@ use crate::_session_state::{RelayedState, ReverseState, SessionState};
use crate::_session_traits::{SessionDeregistration, SessionRegistration};
use crate::_transport_layer::ForwardReceiver;

use crate::_metrics::TARGET_ID;
use ya_relay_core::identity::Identity;
use ya_relay_core::server_session::{Endpoint, NodeInfo, SessionId, TransportType};
use ya_relay_core::udp_stream::{udp_bind, OutStream};
Expand All @@ -37,11 +39,13 @@ use ya_relay_stack::Channel;

type ReqFingerprint = (Vec<u8>, u64);

/// Describes which method was used to establish connection.
/// Numbers mapping is used on Grafana metrics. 0 is reserved for no session.
#[derive(Copy, Clone, Display, PartialEq, Eq)]
pub enum ConnectionMethod {
Direct,
Reverse,
Relay,
Direct = 1,
Reverse = 2,
Relay = 3,
// Here should be NAT punching type(s) defined in the future
}

Expand Down Expand Up @@ -264,6 +268,7 @@ impl SessionDeregistration for SessionLayer {
}
}

gauge!("ya-relay.client.session.type", ConnectionMethod::no_connection(), TARGET_ID => session.owner.default_id.to_string());
log::info!(
"Session {} with [{}] ({}) closed",
session.raw.id,
Expand Down Expand Up @@ -680,7 +685,10 @@ impl SessionLayer {
log::debug!("Attempting to establish direct p2p connection with [{node_id}].");

match self.try_direct_session(node_id, permit).await {
Ok(session) => return Ok(session),
Ok(session) => {
gauge!("ya-relay.client.session.type", ConnectionMethod::Direct.metric(), TARGET_ID => node_id.to_string());
return Ok(session);
}
// We can still try other methods.
Err(SessionError::NotApplicable(e)) => {
log::debug!("Can't establish direct p2p session with [{node_id}]. {e}");
Expand All @@ -699,7 +707,10 @@ impl SessionLayer {
log::debug!("Attempting to establish reverse p2p connection with [{node_id}].");

match self.try_reverse_connection(node_id, permit).await {
Ok(session) => return Ok(session),
Ok(session) => {
gauge!("ya-relay.client.session.type", ConnectionMethod::Reverse.metric(), TARGET_ID => node_id.to_string());
return Ok(session);
}
// We can still try other methods.
Err(SessionError::NotApplicable(e)) => {
log::debug!("Can't establish reverse p2p session with [{node_id}]. {e}");
Expand Down Expand Up @@ -727,7 +738,10 @@ impl SessionLayer {
log::info!("Attempting to use relay Server to forward packets to [{node_id}]");

match self.try_relayed_connection(node_id, permit).await {
Ok(session) => return Ok(session),
Ok(session) => {
gauge!("ya-relay.client.session.type", ConnectionMethod::Relay.metric(), TARGET_ID => node_id.to_string());
return Ok(session);
}
Err(e) => log::debug!("Can't use relayed connection with [{node_id}]. {e}"),
}
} else {
Expand All @@ -742,7 +756,7 @@ impl SessionLayer {

pub async fn try_server_session(
&self,
_node_id: NodeId,
node_id: NodeId,
addr: SocketAddr,
permit: &SessionPermit,
) -> SessionResult<Arc<DirectSession>> {
Expand All @@ -759,9 +773,13 @@ impl SessionLayer {
.into_iter()
.find_map(|endpoint| endpoint.try_into().ok())
{
gauge!("ya-relay.client.public-address", 1.0);
self.set_public_addr(Some(addr)).await;
} else {
gauge!("ya-relay.client.public-address", 0.0);
}

gauge!("ya-relay.client.session.type", ConnectionMethod::Direct.metric(), TARGET_ID => node_id.to_string());
Ok(session)
}

Expand Down Expand Up @@ -981,6 +999,7 @@ impl SessionLayer {
.run_abortable(protocol.new_session(request_id, from, &permit, request))
.await,
)?;
gauge!("ya-relay.client.session.type", ConnectionMethod::Direct.metric(), TARGET_ID => remote_id.to_string());
Ok(())
}
SessionLock::Wait(waiter) => {
Expand All @@ -1001,6 +1020,10 @@ impl SessionLayer {
.await
}

/// TODO: Don't respond to ping when we don't have session with other Node.
/// In this case we should probably send disconnect, so the other Node will re-establish
/// session.
/// This doesn't work with relay, which sends ping to find out if we have public IP.
pub async fn on_ping(
&self,
session_id: Vec<u8>,
Expand Down Expand Up @@ -1412,6 +1435,16 @@ impl Handler for SessionLayer {
}
}

impl ConnectionMethod {
pub fn metric(&self) -> f64 {
(*self as u16) as f64
}

pub fn no_connection() -> f64 {
0.0
}
}

mod testing {
use crate::_session_layer::SessionLayer;
use crate::_session_protocol::SessionInitializer;
Expand Down

0 comments on commit 87ede5e

Please sign in to comment.