diff --git a/client/src/_client.rs b/client/src/_client.rs
index 632559ed..fe3127e5 100644
--- a/client/src/_client.rs
+++ b/client/src/_client.rs
@@ -4,11 +4,13 @@
 use anyhow::{anyhow, bail};
 use derive_more::From;
 use futures::future::{join_all, AbortHandle};
-use futures::{FutureExt, TryFutureExt};
+use futures::{FutureExt, SinkExt, TryFutureExt};
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
+use std::future::Future;
 use std::iter::zip;
 use std::net::SocketAddr;
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::RwLock;
@@ -26,6 +28,7 @@
 pub use crate::_error::SessionError;
 pub use crate::_session::SessionDesc;
 pub use crate::_transport_layer::ForwardReceiver;
+use crate::_direct_session::DirectSession;
 pub use ya_relay_core::server_session::TransportType;
 pub use ya_relay_stack::{ChannelMetrics, SocketDesc, SocketState};
@@ -82,19 +85,20 @@ impl Client {
     }

     pub async fn remote_id(&self, addr: &SocketAddr) -> Option<NodeId> {
-        todo!()
-        // self.transport.session_layer.remote_id(addr).await
+        self.transport.session_layer.remote_id(addr).await
     }

     pub async fn sessions(&self) -> Vec<SessionDesc> {
-        todo!()
-        // self.transport
-        //     .session_layer
-        //     .sessions()
-        //     .await
-        //     .into_iter()
-        //     .map(|s| SessionDesc::from(s.as_ref()))
-        //     .collect()
+        self.transport
+            .session_layer
+            .sessions()
+            .await
+            .into_iter()
+            .filter_map(|s| {
+                s.upgrade()
+                    .map(|direct| SessionDesc::from(direct.raw.as_ref()))
+            })
+            .collect()
     }

     pub async fn find_node(
@@ -117,35 +121,38 @@ impl Client {
     #[inline]
     pub async fn session_metrics(&self) -> HashMap<NodeId, ChannelMetrics> {
-        todo!()
-        // let mut session_metrics = HashMap::new();
-        //
-        // let sessions = self.sessions.sessions().await;
-        // let sockets = self.sessions.virtual_tcp.sockets();
-        // for session in sessions {
-        //     let node_id = match self.remote_id(&session.remote).await {
-        //         Some(node_id) => node_id,
-        //         None => continue,
-        //     };
-        //
-        //     let virt_node = match self.transport.virtual_tcp.resolve_node(node_id).await {
-        //         Ok(virt_node) => virt_node,
-        //         Err(_) => continue,
-        //     };
-        //
-        //     sockets
-        //         .iter()
-        //         .filter_map(|(desc, metrics)| {
-        //             desc.remote
-        //                 .ip_endpoint()
-        //                 .ok()
-        //                 .filter(|endpoint| endpoint.addr == virt_node.endpoint.addr)
-        //                 .map(|_| metrics.clone().inner_mut().clone())
-        //         })
-        //         .reduce(|acc, item| acc + item)
-        //         .and_then(|metrics| session_metrics.insert(node_id, metrics));
-        // }
-        // session_metrics
+        let mut session_metrics = HashMap::new();
+
+        let sessions = self.transport.session_layer.sessions().await;
+        let sockets = self.transport.virtual_tcp.sockets();
+        for session in sessions {
+            let session = match session.upgrade() {
+                None => continue,
+                Some(session) => session,
+            };
+            let node_id = match self.remote_id(&session.raw.remote).await {
+                Some(node_id) => node_id,
+                None => continue,
+            };
+
+            let virt_node = match self.transport.virtual_tcp.resolve_node(node_id).await {
+                Ok(virt_node) => virt_node,
+                Err(_) => continue,
+            };
+
+            sockets
+                .iter()
+                .filter_map(|(desc, metrics)| {
+                    desc.remote
+                        .ip_endpoint()
+                        .ok()
+                        .filter(|endpoint| endpoint.addr == virt_node.address)
+                        .map(|_| metrics.clone().inner_mut().clone())
+                })
+                .reduce(|acc, item| acc + item)
+                .and_then(|metrics| session_metrics.insert(node_id, metrics));
+        }
+        session_metrics
     }

     #[inline]
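Reviewer note: `session_layer.sessions()` now hands out weak references, so both accessors above must `upgrade()` each entry and skip sessions dropped mid-iteration, while `session_metrics` folds one node's per-socket metrics with `reduce`. A minimal sketch of both patterns with plain `std` types (the `Session` struct and `bytes_sent` field are stand-ins invented for this example, not types from this crate):

```rust
use std::sync::{Arc, Weak};

struct Session {
    bytes_sent: u64, // stand-in for ChannelMetrics
}

fn live_bytes(sessions: &[Weak<Session>]) -> Option<u64> {
    sessions
        .iter()
        // upgrade() yields None once the last Arc is gone, so sessions
        // closed while we iterate are skipped instead of dangling.
        .filter_map(|weak| weak.upgrade())
        .map(|session| session.bytes_sent)
        // reduce() returns None for an empty iterator, mirroring how
        // session_metrics only inserts an entry when some socket matched.
        .reduce(|acc, item| acc + item)
}

fn main() {
    let live = Arc::new(Session { bytes_sent: 10 });
    let weak_live = Arc::downgrade(&live);
    let weak_dead = Arc::downgrade(&Arc::new(Session { bytes_sent: 99 })); // dropped immediately
    assert_eq!(live_bytes(&[weak_live, weak_dead]), Some(10));
}
```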
@@ -227,54 +234,128 @@ impl Client {
         self.transport.forward_unreliable(node_id).await
     }

+    pub async fn ping_sessions(&self) {
+        let sessions = self.transport.session_layer.sessions().await;
+        let ping_futures = sessions
+            .iter()
+            .filter_map(|session| {
+                session
+                    .upgrade()
+                    .map(|session| async move { session.raw.ping().await.ok() })
+            })
+            .collect::<Vec<_>>();
+
+        futures::future::join_all(ping_futures).await;
+    }
+
+    // Returns connected NodeIds. A single Node can have many identities, so the second
+    // tuple element contains the main NodeId (default Id).
+    pub async fn connected_nodes(&self) -> Vec<(NodeId, Option<NodeId>)> {
+        let ids = self.transport.session_layer.list_connected().await;
+        let aliases = join_all(
+            ids.iter()
+                .map(|id| self.transport.session_layer.default_id(*id))
+                .collect::<Vec<_>>(),
+        )
+        .await
+        .into_iter();
+        zip(ids.into_iter(), aliases).collect()
+    }
+
+    pub async fn reconnect_server(&self) {
+        if self.transport.session_layer.close_server_session().await {
+            log::info!("Reconnecting to Hybrid NET relay server");
+            let _ = self.transport.session_layer.server_session().await;
+        }
+    }
+
+    pub async fn broadcast(&self, data: Vec<u8>, count: u32) -> anyhow::Result<()> {
+        let node_ids = self
+            .neighbours(count)
+            .await
+            .map_err(|e| anyhow!("Unable to query neighbors: {e}"))?;
+
+        log::debug!("Broadcasting message to {} node(s)", node_ids.len());
+
+        let broadcast_futures = node_ids
+            .iter()
+            .map(|node_id| {
+                let data = data.clone();
+                let node_id = *node_id;
+
+                async move {
+                    log::trace!("Broadcasting message to [{node_id}]");
+
+                    match self.forward_unreliable(node_id).await {
+                        Ok(mut forward) => {
+                            if forward.send(data.into()).await.is_err() {
+                                bail!("Cannot broadcast to {node_id}: channel closed");
+                            }
+                        }
+                        Err(e) => {
+                            bail!("Cannot broadcast to {node_id}: {e}");
+                        }
+                    };
+                    anyhow::Result::<()>::Ok(())
+                }
+                .map_err(|e| log::debug!("Failed to broadcast: {e}"))
+                .map(|_| ())
+                .boxed_local()
+            })
+            .collect::<Vec<Pin<Box<dyn Future<Output = ()>>>>>();

+        futures::future::join_all(broadcast_futures).await;
+        Ok(())
+    }
+
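The broadcast fan-out above deliberately swallows per-node failures: each send is wrapped so an error becomes a log line, and `join_all` still waits for the rest. A self-contained sketch of that shape, where the hypothetical `send_to` stands in for `forward_unreliable` plus `SinkExt::send` (it is not this crate's API):

```rust
use futures::future::join_all;
use futures::FutureExt;
use std::future::Future;
use std::pin::Pin;

// Hypothetical transport call: node 2 simulates a dead peer.
async fn send_to(node: u32, _data: Vec<u8>) -> Result<(), String> {
    if node == 2 { Err("channel closed".into()) } else { Ok(()) }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let data = vec![1u8, 2, 3];
    let futures = [1u32, 2, 3]
        .into_iter()
        .map(|node| {
            let data = data.clone();
            async move {
                if let Err(e) = send_to(node, data).await {
                    // Errors are logged, not propagated: one dead peer
                    // must not abort the whole broadcast.
                    eprintln!("Cannot broadcast to {node}: {e}");
                }
            }
            .boxed_local()
        })
        .collect::<Vec<Pin<Box<dyn Future<Output = ()>>>>>();

    join_all(futures).await;
}
```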
     pub async fn neighbours(&self, count: u32) -> anyhow::Result<Vec<NodeId>> {
-        todo!()
-        // if let Some(neighbours) = {
-        //     let state = self.state.read().await;
-        //     state.neighbours.clone()
-        // } {
-        //     if neighbours.nodes.len() as u32 >= count
-        //         && neighbours.updated + self.config.neighbourhood_ttl > Instant::now()
-        //     {
-        //         return Ok(neighbours.nodes);
-        //     }
-        // }
-        //
-        // log::debug!("Asking NET relay Server for neighborhood ({}).", count);
-        //
-        // let neighbours = self
-        //     .transport
-        //     .session_layer
-        //     .server_session()
-        //     .await
-        //     .map_err(|e| anyhow!("Error establishing session with relay: {e}"))?
-        //     .neighbours(count)
-        //     .await?;
-        //
-        // let nodes = neighbours
-        //     .nodes
-        //     .into_iter()
-        //     .filter_map(|n| Identity::try_from(&n).map(|ident| ident.node_id).ok())
-        //     .collect::<Vec<NodeId>>();
-        //
-        // let prev_neighborhood = {
-        //     let mut state = self.state.write().await;
-        //     state.neighbours.replace(Neighbourhood {
-        //         updated: Instant::now(),
-        //         nodes: nodes.clone(),
-        //     })
-        // };
-        //
-        // // Compare neighborhood, to see which Nodes could have disappeared.
-        // if let Some(prev_neighbors) = prev_neighborhood {
-        //     tokio::task::spawn_local(
-        //         self.clone()
-        //             .check_nodes_connection(prev_neighbors, nodes.clone())
-        //             .map_err(|e| log::debug!("Checking disappeared neighbors failed. {}", e)),
-        //     );
-        // }
-        //
-        // Ok(nodes)
+        if let Some(neighbours) = {
+            let state = self.state.read().await;
+            state.neighbours.clone()
+        } {
+            if neighbours.nodes.len() as u32 >= count
+                && neighbours.updated + self.config.neighbourhood_ttl > Instant::now()
+            {
+                return Ok(neighbours.nodes);
+            }
+        }
+
+        log::debug!("Asking NET relay Server for neighborhood ({count}).");
+
+        let neighbours = self
+            .transport
+            .session_layer
+            .server_session()
+            .await
+            .map_err(|e| anyhow!("Error establishing session with relay: {e}"))?
+            .raw
+            .neighbours(count)
+            .await?;
+
+        let nodes = neighbours
+            .nodes
+            .into_iter()
+            .filter_map(|n| Identity::try_from(&n).map(|ident| ident.node_id).ok())
+            .collect::<Vec<NodeId>>();
+
+        let prev_neighborhood = {
+            let mut state = self.state.write().await;
+            state.neighbours.replace(Neighbourhood {
+                updated: Instant::now(),
+                nodes: nodes.clone(),
+            })
+        };
+
+        // Compare neighborhoods to see which Nodes could have disappeared.
+        if let Some(prev_neighbors) = prev_neighborhood {
+            tokio::task::spawn_local(
+                self.clone()
+                    .check_nodes_connection(prev_neighbors, nodes.clone())
+                    .map_err(|e| log::debug!("Checking disappeared neighbors failed. {e}")),
+            );
+        }
+
+        Ok(nodes)
     }

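The cache fast-path in `neighbours` only answers from the stored `Neighbourhood` when it holds at least `count` entries and is younger than `config.neighbourhood_ttl`. The same check in isolation, with a hypothetical `Cached` struct and `u64` ids standing in for `NodeId`:

```rust
use std::time::{Duration, Instant};

struct Cached {
    updated: Instant,
    nodes: Vec<u64>,
}

fn cached_or_none(cache: &Option<Cached>, count: u32, ttl: Duration) -> Option<Vec<u64>> {
    let entry = cache.as_ref()?;
    // Serve from cache only if it is both large enough and fresh enough;
    // otherwise fall through to querying the relay server.
    if entry.nodes.len() as u32 >= count && entry.updated + ttl > Instant::now() {
        return Some(entry.nodes.clone());
    }
    None
}

fn main() {
    let cache = Some(Cached { updated: Instant::now(), nodes: vec![1, 2, 3] });
    assert!(cached_or_none(&cache, 2, Duration::from_secs(60)).is_some());
    assert!(cached_or_none(&cache, 5, Duration::from_secs(60)).is_none()); // too few nodes
}
```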
     pub async fn invalidate_neighbourhood_cache(&self) {
@@ -286,117 +367,41 @@ impl Client {
         prev_neighbors: Neighbourhood,
         new_neighbors: Vec<NodeId>,
     ) -> anyhow::Result<()> {
-        todo!()
-        // let prev_neighbors: HashSet<_> = prev_neighbors.nodes.into_iter().collect();
-        // let new_neighbors: HashSet<_> = new_neighbors.into_iter().collect();
-        //
-        // let lost_neighbors = prev_neighbors
-        //     .difference(&new_neighbors)
-        //     .cloned()
-        //     .collect::<Vec<NodeId>>();
-        //
-        // if lost_neighbors.is_empty() {
-        //     return Ok(());
-        // }
-        //
-        // let server = self.transport.session_layer.server_session().await?;
-        // for neighbor in lost_neighbors {
-        //     if self.transport.session_layer.is_p2p(&neighbor).await {
-        //         continue;
-        //     }
-        //
-        //     log::debug!(
-        //         "Neighborhood changed. Checking state of Node [{}] on relay Server.",
-        //         neighbor
-        //     );
-        //
-        //     // If we can't find node on relay, most probably it lost connection.
-        //     // We remove this Node, otherwise we will have problems to connect to it later,
-        //     // because we will have outdated entry in our registry.
-        //     if server.find_node(neighbor).await.is_err() {
-        //         log::info!(
-        //             "Node [{}], which was earlier in our neighborhood, disconnected.",
-        //             neighbor
-        //         );
-        //         self.sessions.remove_node(neighbor).await;
-        //     }
-        // }
-        //
-        // Ok(())
-    }
+        let prev_neighbors: HashSet<_> = prev_neighbors.nodes.into_iter().collect();
+        let new_neighbors: HashSet<_> = new_neighbors.into_iter().collect();

-    pub async fn broadcast(&self, data: Vec<u8>, count: u32) -> anyhow::Result<()> {
-        todo!()
-        // let node_ids = self
-        //     .neighbours(count)
-        //     .await
-        //     .map_err(|e| anyhow!("Unable to query neighbors: {e}"))?;
-        //
-        // log::debug!("Broadcasting message to {} node(s)", node_ids.len());
-        //
-        // let broadcast_futures = node_ids
-        //     .iter()
-        //     .map(|node_id| {
-        //         let data = data.clone();
-        //         let node_id = *node_id;
-        //
-        //         async move {
-        //             log::trace!("Broadcasting message to [{node_id}]");
-        //
-        //             match self.forward_unreliable(node_id).await {
-        //                 Ok(mut forward) => {
-        //                     if forward.send(data.into()).await.is_err() {
-        //                         bail!("Cannot broadcast to {node_id}: channel closed");
-        //                     }
-        //                 }
-        //                 Err(e) => {
-        //                     bail!("Cannot broadcast to {node_id}: {e}");
-        //                 }
-        //             };
-        //             anyhow::Result::<()>::Ok(())
-        //         }
-        //         .map_err(|e| log::debug!("Failed to broadcast: {e}"))
-        //         .map(|_| ())
-        //         .boxed_local()
-        //     })
-        //     .collect::<Vec<Pin<Box<dyn Future<Output = ()>>>>>();
-        //
-        // futures::future::join_all(broadcast_futures).await;
-        // Ok(())
-    }
+        let lost_neighbors = prev_neighbors
+            .difference(&new_neighbors)
+            .cloned()
+            .collect::<Vec<NodeId>>();

-    pub async fn ping_sessions(&self) {
-        todo!()
-        // let sessions = self.sessions.sessions().await;
-        // let ping_futures = sessions
-        //     .iter()
-        //     .map(|session| async move { session.ping().await.ok() })
-        //     .collect::<Vec<_>>();
-        //
-        // futures::future::join_all(ping_futures).await;
-    }
+        if lost_neighbors.is_empty() {
+            return Ok(());
+        }

-    // Returns connected NodeIds. Single Node can have many identities, so the second
-    // tuple element contains main NodeId (default Id).
-    pub async fn connected_nodes(&self) -> Vec<(NodeId, Option<NodeId>)> {
-        todo!()
-        // let ids = self.sessions.list_identities().await;
-        // let aliases = join_all(
-        //     ids.iter()
-        //         .map(|id| self.sessions.alias(id))
-        //         .collect::<Vec<_>>(),
-        // )
-        // .await
-        // .into_iter();
-        // zip(ids.into_iter(), aliases).collect()
-    }
+        let server = self.transport.session_layer.server_session().await?;
+        for neighbor in lost_neighbors {
+            if self.transport.session_layer.is_p2p(neighbor).await {
+                continue;
+            }

-    pub async fn reconnect_server(&self) {
-        todo!()
-        // if self.sessions.drop_server_session().await {
-        //     log::info!("Reconnecting to Hybrid NET relay server");
-        //     let _ = self.sessions.server_session().await;
-        // }
+            log::debug!(
+                "Neighborhood changed. Checking state of Node [{}] on relay Server.",
+                neighbor
+            );
+
+            // If we can't find the node on the relay, it most probably lost its connection.
+            // We remove this Node, otherwise we will have trouble connecting to it later,
+            // because we would keep an outdated entry in our registry.
+            if server.raw.find_node(neighbor).await.is_err() {
+                log::info!(
+                    "Node [{neighbor}], which was earlier in our neighborhood, disconnected."
+                );
+                self.transport.session_layer.disconnect(neighbor).await;
+            }
+        }
+
+        Ok(())
     }

     pub async fn shutdown(&mut self) -> anyhow::Result<()> {
@@ -415,7 +420,6 @@ impl Client {
     }
 }

-/// TODO: Split to separate file to handle neighborhood management.
 #[derive(Clone)]
 pub(crate) struct Neighbourhood {
     updated: Instant,
@@ -428,9 +432,3 @@ pub struct Forwarded {
     pub node_id: NodeId,
     pub payload: Payload,
 }
-
-#[derive(From, Clone)]
-pub enum ForwardId {
-    SlotId(SlotId),
-    NodeId(NodeId),
-}
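`check_nodes_connection` reduces to a set difference: only nodes present in the previous neighbourhood but missing from the new one are looked up on the relay, and p2p peers are exempt because they are verified directly. A minimal sketch of the difference step, with `u64` ids standing in for `NodeId`:

```rust
use std::collections::HashSet;

fn main() {
    let prev: HashSet<u64> = [1, 2, 3].into_iter().collect();
    let new: HashSet<u64> = [2, 3, 4].into_iter().collect();

    // Nodes we knew before but don't see now are the only candidates
    // for a relay find_node check; node 4 is new and node 2/3 stayed.
    let lost: Vec<u64> = prev.difference(&new).cloned().collect();
    assert_eq!(lost, vec![1]);
}
```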
diff --git a/client/src/_session_layer.rs b/client/src/_session_layer.rs
index beaec2b7..9649ef0e 100644
--- a/client/src/_session_layer.rs
+++ b/client/src/_session_layer.rs
@@ -171,6 +171,19 @@ impl SessionLayer {
         }
     }

+    pub async fn remote_id(&self, addr: &SocketAddr) -> Option<NodeId> {
+        let state = self.state.read().await;
+        state
+            .p2p_sessions
+            .get(addr)
+            .map(|direct| direct.owner.default_id)
+    }
+
+    pub async fn list_connected(&self) -> Vec<NodeId> {
+        let state = self.state.read().await;
+        state.nodes.keys().cloned().collect()
+    }
+
     pub async fn default_id(&self, node_id: NodeId) -> Option<NodeId> {
         self.registry.get_entry(node_id).await.map(|entry| entry.id)
     }
@@ -365,6 +378,14 @@ impl SessionLayer {
         );
     }

+    pub(crate) async fn close_server_session(&self) -> bool {
+        if let Ok(session) = self.server_session().await {
+            let _ = self.close_session(session).await;
+            return true;
+        }
+        false
+    }
+
     /// Registers initialized session to be ready to use.
     pub(crate) async fn register_session(
         &self,
@@ -459,8 +480,6 @@ impl SessionLayer {
     /// route to destination.
     pub async fn session(&self, node_id: NodeId) -> Result<RoutingSender, SessionError> {
         if let Some(routing) = self.get_node_routing(node_id).await {
-            log::debug!("Resolving Node [{node_id}]. Returning already existing connection (route = {} ({})).", routing.route(), routing.session_type());
-
             // Why we need this ugly solution? Can't we just return `RoutingSender`?
             // The problem is that we can never have full knowledge about other Node's state.
             // And we don't know what he knows about our state. It is possible, that other Node will
@@ -473,6 +492,8 @@
             // And there is second reason: if we have many threads waiting for session, than someone who
             // will come later, will get through, but the rest of threads would wait for `Established` state.
             self.await_connected(node_id).await?;
+
+            log::trace!("Resolving Node [{node_id}]. Returning already existing connection (route = {} ({})).", routing.route(), routing.session_type());
             return Ok(routing);
         }

@@ -751,7 +772,9 @@ impl SessionLayer {
             self.config.node_id,
         );

-        todo!()
+        Err(SessionError::NotApplicable(
+            "ReverseConnection not implemented.".to_string(),
+        ))

         // // let mut awaiting = self.guarded.register_waiting_for_node(node_id).await?;
diff --git a/client/src/_tcp_registry.rs b/client/src/_tcp_registry.rs
index 71455841..5be54750 100644
--- a/client/src/_tcp_registry.rs
+++ b/client/src/_tcp_registry.rs
@@ -273,7 +273,12 @@ pub(crate) async fn async_drop(
         ChannelType::Transfer => node.message.state_notifier.clone(),
     }
     .send(result)
-    .map_err(|_e| log::warn!("Failed to send connection finished broadcast"))
+    .map_err(|_e| {
+        log::debug!(
+            "No one was waiting for info about established tcp connection with [{}]",
+            node.id()
+        )
+    })
     .ok();
 }
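The `_tcp_registry.rs` hunk demotes the failed-send log to debug because sending on a notifier that has no subscribers is an expected outcome, not a fault. This diff does not show `state_notifier`'s concrete type; assuming a broadcast-style channel, `tokio::sync::broadcast` reproduces the behaviour:

```rust
use tokio::sync::broadcast;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Hypothetical stand-in for the connection-state notifier.
    let (tx, rx) = broadcast::channel::<&'static str>(4);
    drop(rx); // nobody is currently waiting for the connection result

    // send() errs whenever there are no active receivers, which is a
    // normal situation here, hence debug instead of warn.
    if tx.send("Established").is_err() {
        println!("No one was waiting for info about the established tcp connection");
    }
}
```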
diff --git a/client/src/_transport_layer.rs b/client/src/_transport_layer.rs
index 914501c7..766d0824 100644
--- a/client/src/_transport_layer.rs
+++ b/client/src/_transport_layer.rs
@@ -177,8 +177,6 @@ impl TransportLayer {
         match self.forward_channel(node_id, channel).await {
             Some(tx) => Ok(tx),
             None => {
-                //self.virtual_tcp_fast_lane.borrow_mut().clear();
-
                 // Check if this isn't secondary identity. TcpLayer should always get default id.
                 // TODO: Consider how to handle changing identities.
                 let info = self.session_layer.query_node_info(node_id).await?;
diff --git a/client/src/_virtual_layer.rs b/client/src/_virtual_layer.rs
index a1b5ea22..b10549a6 100644
--- a/client/src/_virtual_layer.rs
+++ b/client/src/_virtual_layer.rs
@@ -26,7 +26,7 @@
 use crate::_client::Forwarded;
 use crate::_error::TcpError;
 use crate::_session_layer::SessionLayer;
 use crate::_tcp_registry::{
-    to_ipv6, ChannelType, TcpConnection, TcpLock, TcpPermit, TcpRegistry, TcpSender,
+    to_ipv6, ChannelType, TcpConnection, TcpLock, TcpPermit, TcpRegistry, TcpSender, VirtNode,
 };
 use crate::_transport_layer::ForwardReceiver;
@@ -91,6 +91,10 @@ impl TcpLayer {
         Ok(())
     }

+    pub async fn resolve_node(&self, node: NodeId) -> anyhow::Result<VirtNode> {
+        self.registry.resolve_node(node).await
+    }
+
     pub async fn remove_node(&self, node_id: NodeId) -> anyhow::Result<()> {
         let remote_ip = self.registry.resolve_ip(node_id).await;

@@ -112,13 +116,13 @@ impl TcpLayer {
         node_id: NodeId,
         channel: ChannelType,
     ) -> anyhow::Result<TcpSender> {
-        log::debug!("[VirtualTcp] Connecting to node [{node_id}], channel: {channel}.");
-
         print_sockets(&self.net);

         let myself = self.clone();
         let connection = match self.registry.connect_attempt(node_id, channel).await {
             TcpLock::Permit(mut permit) => {
+                log::debug!("[VirtualTcp] Connecting to node [{node_id}], channel: {channel}.");
+
                 // Spawning task protects us from dropping future during initialization.
                 tokio::task::spawn_local(async move {
                     permit.finish(myself.connect_internal(channel, &permit).await)
@@ -454,17 +458,17 @@ impl From<TransportType> for ChannelType {
 }

 pub fn print_sockets(network: &Network) {
-    log::debug!("[inet] existing sockets:");
+    log::trace!("[inet] existing sockets:");
     for (handle, meta, state) in network.sockets_meta() {
-        log::debug!("[inet] socket: {handle} ({}) {meta}", state.to_string());
+        log::trace!("[inet] socket: {handle} ({}) {meta}", state.to_string());
     }
-    log::debug!("[inet] existing connections:");
+    log::trace!("[inet] existing connections:");
     for (handle, meta) in network.handles.borrow_mut().iter() {
-        log::debug!("[inet] connection: {handle} {meta}");
+        log::trace!("[inet] connection: {handle} {meta}");
     }
-    log::debug!("[inet] listening sockets:");
+    log::trace!("[inet] listening sockets:");
     for handle in network.bindings.borrow_mut().iter() {
-        log::debug!("[inet] listening socket: {handle}");
+        log::trace!("[inet] listening socket: {handle}");
     }
 }
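With `print_sockets` demoted to trace, its per-socket dumps vanish from typical debug output. A sketch of the resulting behaviour, assuming the consuming binary initializes logging with the `env_logger` crate (the logger setup is not part of this diff):

```rust
fn main() {
    // At Debug, the trace-level socket dumps are filtered out;
    // raise to Trace (or run with RUST_LOG=trace) to see them again.
    env_logger::Builder::new()
        .filter_level(log::LevelFilter::Debug)
        .init();

    log::debug!("visible at the default debug level");
    log::trace!("[inet] existing sockets: ..."); // suppressed here
}
```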