Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2p stability - support for updated ya-relay #2614

Merged
merged 28 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ffc29dc
Yagna is able to run, but doesn't do anything more
nieznanysprawiciel Jun 7, 2023
0ea1789
Semi-working yagna integration
nieznanysprawiciel Jun 7, 2023
5ee9d0f
Freeze ya-relay repository revision
nieznanysprawiciel Jun 13, 2023
9c8854d
Fix clippy
nieznanysprawiciel Jun 13, 2023
d676a53
Merge branch 'master' into p2p-stability
nieznanysprawiciel Jun 14, 2023
eefaa13
Allow back non_snake_case_lint
MrDarthShoe Jul 6, 2023
7141621
Apply changes in ya-relay-client api related to getting rid of channe…
nieznanysprawiciel Aug 9, 2023
139de27
Bump up ya-relay-client and stack to latest version on main
MrDarthShoe Aug 9, 2023
cae1984
merge master
staszek-krotki Sep 15, 2023
118a41d
smoltcp .10
staszek-krotki Sep 15, 2023
cc8bb55
IpAddress doesn't have unknowns
staszek-krotki Sep 15, 2023
af00a0e
FailFast::No
staszek-krotki Sep 18, 2023
822df7c
ya-relay version update
pwalski Sep 20, 2023
e615183
Goth tests with locked ya-runtime-vm and ya-relay versions
pwalski Sep 20, 2023
b4f48b2
Goth version update
pwalski Sep 21, 2023
b1e73a1
merge master
staszek-krotki Sep 25, 2023
5c57424
current ya-relay
staszek-krotki Sep 25, 2023
9377b8c
Cargo.lock
pwalski Sep 25, 2023
715ad75
poetry.lock
pwalski Sep 25, 2023
cf4c40c
goth config overrides with ya-relay and ya-runtime-vm artifacts versi…
pwalski Sep 27, 2023
c105c07
Merge branch 'master' into p2p-stability
pwalski Sep 27, 2023
25d1d29
configurable session_request_timeout
staszek-krotki Sep 28, 2023
5fffe46
Merge branch 'p2p-stability' of github.com:golemfactory/yagna into p2…
staszek-krotki Sep 28, 2023
61cb2b5
Merge branch 'master' into p2p-stability
scx1332 Sep 28, 2023
f641c84
Merge branch 'release/v0.13' into p2p-stability
prekucki Sep 28, 2023
8e0b38b
Merge branch 'release/v0.13' into p2p-stability
pwalski Oct 4, 2023
3a61c84
update ya-relay
staszek-krotki Oct 10, 2023
7598f5a
Merge branch 'release/v0.13' into p2p-stability
pwalski Oct 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,658 changes: 894 additions & 764 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,17 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev =
#ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }

## RELAY and networking stack
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "29c860046ecc4419f0ed068da909adbd959a37ed" }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "29c860046ecc4419f0ed068da909adbd959a37ed" }
# TODO: We shouldn't reference `ya-relay-core` and `ya-relay-proto` directly
ya-relay-core = { git = "https://github.com/golemfactory/ya-relay.git", rev = "29c860046ecc4419f0ed068da909adbd959a37ed" }
ya-relay-proto = { git = "https://github.com/golemfactory/ya-relay.git", rev = "29c860046ecc4419f0ed068da909adbd959a37ed" }

#ya-relay-stack = { path = "../ya-relay/crates/stack" }
#ya-relay-client = { path = "../ya-relay/client" }
#ya-relay-core = { path = "../ya-relay/crates/core" }
#ya-relay-proto = { path = "../ya-relay/crates/proto" }


## OTHERS
gftp = { path = "core/gftp" }
Expand Down
2 changes: 1 addition & 1 deletion core/gsb-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ mod tests {
#[test_case(r#"{ "id": "some", "error": {} }"#, Frame::Close(Some(CloseReason {
code: CloseCode::Policy,
description: Some("Failed to read response. Err: Missing 'payload' and 'error' fields. Id: some.".to_string()) }));
"Close when error empty (error needs at least top level error name field)"
"Close when error is empty because it needs at least top level error name field"
)]
#[actix_web::test]
#[serial]
Expand Down
3 changes: 2 additions & 1 deletion core/net/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use url::Url;
#[derive(
StructOpt, EnumString, EnumVariantNames, IntoStaticStr, Copy, Clone, Eq, PartialEq, Debug,
)]
#[strum(serialize_all = "lowercase")]
#[strum(serialize_all = "kebab-case")]
pub enum NetType {
Central,
Hybrid,
HybridV2,
}

#[derive(StructOpt, Clone)]
Expand Down
64 changes: 64 additions & 0 deletions core/net/src/hybrid_v2/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use ya_core_model::net;
use ya_core_model::net::local::{
BindBroadcastError, BroadcastMessage, SendBroadcastMessage, ToEndpoint,
};
use ya_service_bus::{typed as bus, Error, RpcEndpoint, RpcMessage};

pub async fn broadcast<M, S>(
caller: S,
message: M,
) -> Result<
Result<
<SendBroadcastMessage<M> as RpcMessage>::Item,
<SendBroadcastMessage<M> as RpcMessage>::Error,
>,
Error,
>
where
M: BroadcastMessage + Send + Sync + Unpin + 'static,
S: ToString + 'static,
{
// TODO: We shouldn't use send_as. Put identity inside broadcasted message instead.
bus::service(net::local::BUS_ID)
.send_as(caller, SendBroadcastMessage::new(message))
.await
}

pub async fn bind_broadcast_with_caller<M, T, F>(
broadcast_address: &str,
handler: F,
) -> Result<(), BindBroadcastError>
where
M: BroadcastMessage + Send + Sync + 'static,
T: std::future::Future<
Output = Result<
<SendBroadcastMessage<M> as RpcMessage>::Item,
<SendBroadcastMessage<M> as RpcMessage>::Error,
>,
> + 'static,
F: FnMut(String, SendBroadcastMessage<M>) -> T + Send + 'static,
{
log::debug!("Creating broadcast topic {}.", M::TOPIC);

let address = broadcast_address.to_string();
let subscription = M::into_subscribe_msg(address.clone());

log::trace!(
"Binding broadcast handler for topic: {}",
subscription.topic()
);

bus::service(net::local::BUS_ID)
.send(subscription)
.await??;

log::debug!(
"Binding handler '{broadcast_address}' for broadcast topic {}.",
M::TOPIC
);

// We created endpoint address above. Now we must add handler, which will
// handle broadcasts forwarded to this address.
bus::bind_with_caller(broadcast_address, handler);
Ok(())
}
243 changes: 243 additions & 0 deletions core/net/src/hybrid_v2/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
use anyhow::anyhow;
use futures::future::join_all;
use futures::TryFutureExt;
use std::time::{Duration, Instant};
use ya_client_model::NodeId;

use ya_core_model::net as ya_net;
use ya_core_model::net::local::{FindNodeResponse, GsbPingResponse, StatusError};
use ya_core_model::net::{
local as model, GenericNetError, GsbRemotePing, RemoteEndpoint, DIAGNOSTIC,
};
use ya_relay_client::ChannelMetrics;
use ya_service_bus::timeout::IntoTimeoutFuture;
use ya_service_bus::typed::ServiceBinder;
use ya_service_bus::{typed as bus, RpcEndpoint};

use crate::hybrid_v2::Net;

pub(crate) fn bind_service() {
let _ = bus::bind(model::BUS_ID, |ping: model::GsbPing| {
cli_ping(ping.nodes).map_err(status_err)
});

let _ = bus::bind(model::BUS_ID, |msg: model::Connect| {
connect(msg).map_err(|e| GenericNetError(e.to_string()))
});

let _ = bus::bind(model::BUS_ID, |msg: model::Disconnect| {
disconnect(msg.node).map_err(|e| GenericNetError(e.to_string()))
});

ServiceBinder::new(DIAGNOSTIC, &(), ())
.bind(move |_, _caller: String, _msg: GsbRemotePing| async move { Ok(GsbRemotePing {}) });

let _ = bus::bind(model::BUS_ID, move |_: model::Status| {
async move {
let client = Net::client().await?;

Ok(model::StatusResponse {
node_id: client.node_id().await?,
listen_address: client.bind_addr().await?,
public_address: client.public_addr().await?,
sessions: client.sessions().await?.len(),
metrics: to_status_metrics(&mut client.metrics().await?),
})
}
.map_err(status_err)
});
let _ = bus::bind(model::BUS_ID, move |_: model::Sessions| {
async move {
let client = Net::client().await?;
let mut responses = Vec::new();
let now = Instant::now();

let mut metrics = client.session_metrics().await?;

for session in client.sessions().await? {
let node_id = client.remote_id(session.remote).await?;
let kind = match node_id {
Some(id) => {
let is_p2p = client.is_p2p(id).await?;
if is_p2p {
"p2p"
} else {
"relay"
}
}
None => "server",
};

let mut metric = node_id
.and_then(|node_id| metrics.remove(&node_id))
.unwrap_or_default();

responses.push(model::SessionResponse {
node_id,
id: session.id.to_string(),
session_type: kind.to_string(),
remote_address: session.remote,
seen: now - session.last_seen,
duration: now - session.created,
ping: session.last_ping,
metrics: to_status_metrics(&mut metric),
});
}

Ok(responses)
}
.map_err(status_err)
});
let _ = bus::bind(model::BUS_ID, move |_: model::Sockets| {
async move {
let client = Net::client().await?;

let sockets = client
.sockets()
.await?
.into_iter()
.map(|(desc, mut state)| model::SocketResponse {
protocol: desc.protocol.to_string().to_lowercase(),
state: state.to_string(),
local_port: desc.local.port_repr(),
remote_addr: desc.remote.addr_repr(),
remote_port: desc.remote.port_repr(),
metrics: to_status_metrics(state.inner_mut()),
})
.collect();

Ok(sockets)
}
.map_err(status_err)
});
let _ = bus::bind(model::BUS_ID, move |find: model::FindNode| {
async move {
let client = Net::client().await?;
let node_id: NodeId = find.node_id.parse()?;
client.find_node(node_id).await?
}
.map_err(status_err)
});
}

fn to_status_metrics(metrics: &mut ChannelMetrics) -> model::StatusMetrics {
let time = Instant::now();
model::StatusMetrics {
tx_total: metrics.tx.long.sum() as usize,
tx_avg: metrics.tx.long.average(time),
tx_current: metrics.tx.short.average(time),
rx_total: metrics.rx.long.sum() as usize,
rx_avg: metrics.rx.long.average(time),
rx_current: metrics.rx.short.average(time),
}
}

pub async fn connect(msg: model::Connect) -> anyhow::Result<FindNodeResponse> {
log::info!("Connecting to Node: {}", msg.node);

let client = Net::client().await?;
client.connect(msg.clone()).await??;

client.find_node(msg.node).await?
}

pub async fn disconnect(node: NodeId) -> anyhow::Result<()> {
log::info!("Disconnecting from Node: {node}");

let client = Net::client().await?;
Ok(client.disconnect(node).await??)
}

pub async fn cli_ping(nodes: Vec<NodeId>) -> anyhow::Result<Vec<GsbPingResponse>> {
let client = Net::client().await?;

// This will update sessions ping. We don't display them in this view
// but I think it is good place to enforce this.
client.ping_sessions().await?;

let nodes = match nodes.is_empty() {
true => client.connected_nodes().await?,
false => nodes.into_iter().map(|id| (id, None)).collect(),
};

let our_node_id = client.node_id().await?;
let ping_timeout = Duration::from_secs(10);

log::debug!("Ping: Num connected nodes: {}", nodes.len());

let mut results = join_all(
nodes
.iter()
.map(|(id, _)| {
let target_id = *id;

let udp_future = async move {
let udp_before = Instant::now();

ya_net::from(our_node_id)
.to(target_id)
.service_udp(ya_net::DIAGNOSTIC)
.send(GsbRemotePing {})
.timeout(Some(ping_timeout))
.await???;

anyhow::Ok(udp_before.elapsed())
}
.map_err(|e| anyhow!("(Udp ping). {e}"));

let tcp_future = async move {
let tcp_before = Instant::now();

ya_net::from(our_node_id)
.to(target_id)
.service(ya_net::DIAGNOSTIC)
.send(GsbRemotePing {})
.timeout(Some(ping_timeout))
.await???;

anyhow::Ok(tcp_before.elapsed())
}
.map_err(|e| anyhow!("(Tcp ping). {e}"));

futures::future::join(udp_future, tcp_future)
})
.collect::<Vec<_>>(),
)
.await
.into_iter()
.enumerate()
.map(|(idx, results)| {
if let Err(e) = &results.0 {
log::warn!("Failed to ping node: {} {e}", nodes[idx].0);
}
if let Err(e) = &results.1 {
log::warn!("Failed to ping node: {} {e}", nodes[idx].0);
}

let udp_ping = results.0.unwrap_or(ping_timeout);
let tcp_ping = results.1.unwrap_or(ping_timeout);

GsbPingResponse {
node_id: nodes[idx].0,
node_alias: nodes[idx].1,
tcp_ping,
udp_ping,
is_p2p: false, // Updated later
}
})
.collect::<Vec<_>>();

for result in &mut results {
let main_id = match client.get_alias(result.node_id).await.ok().flatten() {
Some(id) => id,
None => result.node_id,
};
result.is_p2p = client.is_p2p(main_id).await?;
}
Ok(results)
}

#[inline]
fn status_err(e: anyhow::Error) -> StatusError {
StatusError::RuntimeException(e.to_string())
}
Loading