Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2p stability - support for updated ya-relay #2614

Merged
merged 28 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ffc29dc
Yagna is able to run, but doesn't do anything more
nieznanysprawiciel Jun 7, 2023
0ea1789
Semi-working yagna integration
nieznanysprawiciel Jun 7, 2023
5ee9d0f
Freeze ya-relay repository revision
nieznanysprawiciel Jun 13, 2023
9c8854d
Fix clippy
nieznanysprawiciel Jun 13, 2023
d676a53
Merge branch 'master' into p2p-stability
nieznanysprawiciel Jun 14, 2023
eefaa13
Allow back non_snake_case_lint
MrDarthShoe Jul 6, 2023
7141621
Apply changes in ya-relay-client api related to getting rid of channe…
nieznanysprawiciel Aug 9, 2023
139de27
Bump up ya-relay-client and stack to latest version on main
MrDarthShoe Aug 9, 2023
cae1984
merge master
staszek-krotki Sep 15, 2023
118a41d
smoltcp .10
staszek-krotki Sep 15, 2023
cc8bb55
IpAddress doesn't have unknowns
staszek-krotki Sep 15, 2023
af00a0e
FailFast::No
staszek-krotki Sep 18, 2023
822df7c
ya-relay version update
pwalski Sep 20, 2023
e615183
Goth tests with locked ya-runtime-vm and ya-relay versions
pwalski Sep 20, 2023
b4f48b2
Goth version update
pwalski Sep 21, 2023
b1e73a1
merge master
staszek-krotki Sep 25, 2023
5c57424
current ya-relay
staszek-krotki Sep 25, 2023
9377b8c
Cargo.lock
pwalski Sep 25, 2023
715ad75
poetry.lock
pwalski Sep 25, 2023
cf4c40c
goth config overrides with ya-relay and ya-runtime-vm artifacts versi…
pwalski Sep 27, 2023
c105c07
Merge branch 'master' into p2p-stability
pwalski Sep 27, 2023
25d1d29
configurable session_request_timeout
staszek-krotki Sep 28, 2023
5fffe46
Merge branch 'p2p-stability' of github.com:golemfactory/yagna into p2…
staszek-krotki Sep 28, 2023
61cb2b5
Merge branch 'master' into p2p-stability
scx1332 Sep 28, 2023
f641c84
Merge branch 'release/v0.13' into p2p-stability
prekucki Sep 28, 2023
8e0b38b
Merge branch 'release/v0.13' into p2p-stability
pwalski Oct 4, 2023
3a61c84
update ya-relay
staszek-krotki Oct 10, 2023
7598f5a
Merge branch 'release/v0.13' into p2p-stability
pwalski Oct 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 204 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 14 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ packet-trace-enable = [
"ya-vpn/packet-trace-enable",
"ya-file-logging/packet-trace-enable",
"ya-net/packet-trace-enable",
"ya-service-bus/packet-trace-enable"
"ya-service-bus/packet-trace-enable",
]

[[bin]]
Expand Down Expand Up @@ -214,7 +214,7 @@ members = [
"utils/fd-metrics",
"core/metrics",
"test-utils/test-framework",
"test-utils/test-framework/framework-macro"
"test-utils/test-framework/framework-macro",
]

[workspace.dependencies]
Expand Down Expand Up @@ -249,10 +249,10 @@ ya-service-api-interfaces = { path = "core/serv-api/interfaces" }
ya-service-api-web = { path = "core/serv-api/web" }

## SERVICE BUS
ya-service-bus = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68"}
ya-sb-proto = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68"}
ya-sb-router = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68"}
ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68"}
ya-service-bus = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68" }
ya-sb-proto = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68" }
ya-sb-router = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68" }
ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = "190f0d772f7ed0830d54a2cef77d7a177f276c68" }

#ya-service-bus = { path = "../ya-service-bus" }
#ya-sb-proto = { path = "../ya-service-bus/crates/proto" }
Expand All @@ -264,8 +264,14 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev =
#ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }

## RELAY and networking stack
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "3e89c4b7c6ad06a4fc08cf6fdfe22f4740759af5" }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "3e89c4b7c6ad06a4fc08cf6fdfe22f4740759af5" }

#ya-relay-stack = { path = "../ya-relay/crates/stack" }
#ya-relay-client = { path = "../ya-relay/client" }
#ya-relay-core = { path = "../ya-relay/crates/core" }
#ya-relay-proto = { path = "../ya-relay/crates/proto" }


## OTHERS
gftp = { path = "core/gftp" }
Expand Down
74 changes: 3 additions & 71 deletions core/gsb-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ async fn get_service_messages(
log::debug!("No old WS connection");
}
let handler = WsMessagesHandler { service };
let (_addr, resp) = ws::WsResponseBuilder::new(handler, &req, stream)
.protocols(&["gsb+flexbuffers"])
.start_with_addr()?;
let (_addr, resp) = ws::WsResponseBuilder::new(handler, &req, stream).start_with_addr()?;
Ok(resp)
}

Expand Down Expand Up @@ -128,7 +126,7 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use test_case::test_case;
use ya_core_model::gftp::{GetChunk, GftpChunk, UploadChunk};
use ya_core_model::gftp::{GetChunk, GftpChunk};
use ya_core_model::NodeId;
use ya_service_api_interfaces::Provider;
use ya_service_api_web::middleware::auth::dummy::DummyAuth;
Expand Down Expand Up @@ -257,72 +255,6 @@ mod tests {
);
}

#[actix_web::test]
#[serial]
async fn upload_chunk_test() {
let mut api = dummy_api();

let service_number = SERVICE_COUNTER.fetch_add(1, Ordering::SeqCst);
let service_addr = format!("{SERVICE_ADDR}_{service_number}");

let bind_req = api
.post(format!("/{}/{}", GSB_API_PATH, "services"))
.send_json(&ServiceRequest {
listen: ServiceListenRequest {
components: vec!["UploadChunk".to_string()],
on: service_addr.clone(),
},
});

let body =
verify_bind_service_response(bind_req, vec!["UploadChunk".to_string()], &service_addr)
.await;

let services_path = format!("gsb-api/v1/services/{}", body.services_id);
let mut ws_frames = api.ws_at(&services_path).await.unwrap();

let gsb_endpoint = ya_service_bus::typed::service(service_addr.clone());

let (gsb_res, ws_res) = tokio::join!(
async {
gsb_endpoint
.call(UploadChunk {
chunk: GftpChunk {
offset: 0,
content: vec![1, 2, 3],
},
})
.await
},
async {
println!("WS sleep");
tokio::time::sleep(Duration::from_millis(100)).await;
let ws_req = ws_frames.next().await;
let ws_req = match ws_req {
Some(Ok(Frame::Binary(ws_req))) => {
flexbuffers::from_slice::<TestWsRequest<UploadChunk>>(&ws_req).unwrap()
}
msg => panic!("Unexpected msg: {:?}", msg),
};
let ws_res = TestWsResponse {
id: ws_req.id,
payload: (),
};
let ws_res = flexbuffers::to_vec(ws_res).unwrap();
ws_frames
.send(ws::Message::Binary(Bytes::from(ws_res)))
.await
}
);

ws_res.unwrap();
gsb_res
.expect("Response is unit type")
.expect("Response is ok");

verify_delete_service(&mut api, &service_addr).await;
}

#[actix_web::test]
#[serial]
async fn ok_payload_test() {
Expand Down Expand Up @@ -449,7 +381,7 @@ mod tests {
#[test_case(r#"{ "id": "some", "error": {} }"#, Frame::Close(Some(CloseReason {
code: CloseCode::Policy,
description: Some("Failed to read response. Err: Missing 'payload' and 'error' fields. Id: some.".to_string()) }));
"Close when error empty error needs at least top level error name field"
"Close when error is empty because it needs at least top level error name field"
)]
#[actix_web::test]
#[serial]
Expand Down
11 changes: 6 additions & 5 deletions core/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ service = []
central-net = []
packet-trace-enable = [
"ya-packet-trace/enable",
"ya-relay-client/packet-trace-enable"
"ya-relay-client/packet-trace-enable",
]

[dependencies]
ya-client-model = "0.5"
ya-core-model = { version = "^0.9", features=["net", "identity"] }
ya-core-model = { version = "^0.9", features = ["net", "identity"] }

ya-relay-client = "0.6"
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "3e89c4b7c6ad06a4fc08cf6fdfe22f4740759af5" }
#ya-relay-client = "0.6"
#ya-relay-client = { path = "../../../ya-relay/client" }

ya-sb-proto = { version = "0.6.1" }
Expand All @@ -37,7 +38,7 @@ futures = "0.3"
humantime = "2.1"
lazy_static = "1.4"
log = "0.4"
metrics="0.12"
metrics = "0.12"
serde_json = "1.0"
structopt = "0.3"
strum = { version = "0.24", features = ["derive"] }
Expand All @@ -50,7 +51,7 @@ ethsign = { version = "0.8" }
tokio-util = { version = "0.7" }
url = { version = "2.2" }
prost = { version = "0.10" }
rand = { version = "0.7"}
rand = { version = "0.7" }

[dev-dependencies]
ya-sb-proto = "0.6.1"
Expand Down
4 changes: 3 additions & 1 deletion core/net/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use url::Url;
#[derive(
StructOpt, EnumString, EnumVariantNames, IntoStaticStr, Copy, Clone, Eq, PartialEq, Debug,
)]
#[strum(serialize_all = "lowercase")]
#[strum(serialize_all = "kebab-case")]
pub enum NetType {
Central,
Hybrid,
Expand All @@ -26,6 +26,8 @@ pub struct Config {
pub broadcast_size: u32,
#[structopt(env = "YA_NET_SESSION_EXPIRATION", parse(try_from_str = humantime::parse_duration), default_value = "15s")]
pub session_expiration: Duration,
#[structopt(env = "YA_NET_SESSION_REQUEST_TIMEOUT", parse(try_from_str = humantime::parse_duration), default_value = "3s")]
pub session_request_timeout: Duration,
}

impl Config {
Expand Down
4 changes: 2 additions & 2 deletions core/net/src/hybrid/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ya_core_model::net::local::{FindNodeResponse, GsbPingResponse, StatusError};
use ya_core_model::net::{
local as model, GenericNetError, GsbRemotePing, RemoteEndpoint, DIAGNOSTIC,
};
use ya_relay_client::ChannelMetrics;
use ya_relay_client::metrics::ChannelMetrics;
use ya_service_bus::timeout::IntoTimeoutFuture;
use ya_service_bus::typed::ServiceBinder;
use ya_service_bus::{typed as bus, RpcEndpoint};
Expand Down Expand Up @@ -145,7 +145,7 @@ pub async fn disconnect(node: NodeId) -> anyhow::Result<()> {
log::info!("Disconnecting from Node: {node}");

let client = Net::client().await?;
client.disconnect(node).await?
Ok(client.disconnect(node).await??)
}

pub async fn cli_ping(nodes: Vec<NodeId>) -> anyhow::Result<Vec<GsbPingResponse>> {
Expand Down
27 changes: 13 additions & 14 deletions core/net/src/hybrid/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![allow(dead_code)]
#![allow(unused)]

use std::collections::HashMap;
use std::convert::TryFrom;
use std::net::{IpAddr, SocketAddr};
Expand All @@ -10,7 +13,10 @@ use anyhow::{anyhow, bail};
use ya_core_model::net::local as model;
use ya_core_model::net::local::FindNodeResponse;
use ya_core_model::NodeId;
use ya_relay_client::{ChannelMetrics, Client, SessionDesc, SocketDesc, SocketState};
use ya_relay_client::metrics::ChannelMetrics;
use ya_relay_client::model::{SessionDesc, SocketDesc, SocketState};
use ya_relay_client::Client;
use ya_relay_client::SessionError;

lazy_static::lazy_static! {
static ref ADDRESS: Arc<RwLock<Option<Addr<ClientActor >>>> = Default::default();
Expand Down Expand Up @@ -118,7 +124,7 @@ proxy!(
connect,
|client: Client, msg: Connect| async move {
if msg.0.reliable_channel {
let _ = client.forward(msg.0.node).await?;
let _ = client.forward_reliable(msg.0.node).await?;
}

if msg.0.transfer_channel {
Expand All @@ -132,34 +138,27 @@ proxy!(
}
);
proxy!(
Disconnect(NodeId) -> anyhow::Result<()>,
Disconnect(NodeId) -> Result<(), SessionError>,
disconnect,
|client: Client, msg: Disconnect| async move {
let node_id = msg.0;
let node = client.sessions.get_node(node_id).await?;

if node.is_p2p() {
client.sessions.close_session(node.session).await?;
} else {
client.sessions.remove_node(node_id).await;
}
Ok(())
client.disconnect(node_id).await
}
);
proxy!(
GetAlias(NodeId) -> Option<NodeId>,
get_alias,
|client: Client, msg: GetAlias| async move { client.sessions.alias(&msg.0).await }
|client: Client, msg: GetAlias| async move { client.default_id(msg.0).await }
);
proxy!(
IsP2p(NodeId) -> bool,
is_p2p,
|client: Client, msg: IsP2p| async move { client.sessions.is_p2p(&msg.0).await }
|client: Client, msg: IsP2p| async move { client.is_p2p(msg.0).await }
);
proxy!(
GetRemoteId(SocketAddr) -> Option<NodeId>,
remote_id,
|client: Client, msg: GetRemoteId| async move { client.sessions.remote_id(&msg.0).await }
|client: Client, msg: GetRemoteId| async move { client.remote_id(&msg.0).await }
);
proxy!(
GetNodeId -> NodeId,
Expand Down
40 changes: 15 additions & 25 deletions core/net/src/hybrid/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use ya_core_model::net::local::{
BindBroadcastError, BroadcastMessage, NewNeighbour, SendBroadcastMessage, SendBroadcastStub,
};
use ya_core_model::{identity, net, NodeId};
use ya_relay_client::codec::forward::{PrefixedSink, PrefixedStream, SinkKind};
use ya_relay_client::channels::{ForwardReceiver, ForwardSender, PrefixedStream};
use ya_relay_client::crypto::CryptoProvider;
use ya_relay_client::proto::Payload;
use ya_relay_client::{Client, ClientBuilder, ForwardReceiver, TransportType};
use ya_relay_client::model::{Payload, TransportType};
use ya_relay_client::{Client, ClientBuilder, FailFast, GenericSender};
use ya_sb_proto::codec::GsbMessage;
use ya_sb_proto::CallReplyCode;
use ya_sb_util::RevPrefixes;
Expand All @@ -50,7 +50,7 @@ const DEFAULT_NET_RELAY_HOST: &str = "127.0.0.1:7464";
type BusSender = mpsc::Sender<ResponseChunk>;
type BusReceiver = mpsc::Receiver<ResponseChunk>;
type NetSender = mpsc::Sender<Payload>;
type NetSinkKind = SinkKind<NetSender, mpsc::SendError>;
type NetSinkKind = ForwardSender;
type NetSinkKey = (NodeId, TransportType);

lazy_static::lazy_static! {
Expand Down Expand Up @@ -124,7 +124,7 @@ pub async fn start_network(
counter!("net.connections.p2p", 0);
counter!("net.connections.relay", 0);

log::info!("Starting network (hybrid) with identity: {}", default_id);
log::info!("Starting network (hybrid) with identity: {default_id}");

let broadcast_size = config.broadcast_size;
let crypto = IdentityCryptoProvider::new(default_id);
Expand Down Expand Up @@ -231,7 +231,8 @@ async fn build_client(
.crypto(crypto)
.listen(config.bind_url.clone())
.expire_session_after(config.session_expiration)
.connect()
.session_request_timeout(config.session_request_timeout)
.connect(FailFast::No)
.build()
.await
}
Expand Down Expand Up @@ -513,7 +514,7 @@ fn forward_bus_to_net(

match state.forward_sink(remote_id, transport).await {
Ok(mut sink) => {
let _ = sink.send(msg).await.map_err(|_| {
let _ = sink.send(msg.into()).await.map_err(|_| {
let err = "Net: error sending message: session closed".to_string();
handler_reply_service_err(request_id, err, tx);
});
Expand Down Expand Up @@ -566,7 +567,7 @@ fn push_bus_to_net(

match state.forward_sink(remote_id, transport).await {
Ok(mut sink) => {
let _ = sink.send(msg).await.map_err(|_| {
let _ = sink.send(msg.into()).await.map_err(|_| {
log::debug!("Net: error sending message: session closed");
});
}
Expand Down Expand Up @@ -881,7 +882,7 @@ fn handle_request(

//stream.forward(sink).await?;
while let Some(item) = stream.next().await {
sink.send(item?).await.ok();
sink.send(item?.into()).await.ok();
log::debug!("Handled request: {request_id_sent} from: {caller_id}");
}

Expand Down Expand Up @@ -1024,22 +1025,11 @@ impl State {
.with(|c| c.borrow().clone())
.ok_or_else(|| anyhow::anyhow!("network not started"))?;

let forward: NetSinkKind = match transport {
TransportType::Unreliable => client.forward_unreliable(remote_id).await?.into(),
TransportType::Reliable => PrefixedSink::new(client.forward(remote_id).await?).into(),
TransportType::Transfer => {
PrefixedSink::new(client.forward_transfer(remote_id).await?).into()
}
};

// FIXME: yagna daemon doesn't handle connections; ya-relay-client does
// if client.sessions.has_p2p_connection(remote_id).await {
// counter!("net.connections.p2p", 1)
// } else {
// counter!("net.connections.relay", 1)
// };

Ok(forward)
Ok::<_, anyhow::Error>(match transport {
TransportType::Unreliable => client.forward_unreliable(remote_id).await?,
TransportType::Reliable => client.forward_reliable(remote_id).await?.framed(),
TransportType::Transfer => client.forward_transfer(remote_id).await?.framed(),
})
}

fn get_public_service(&self, addr: &str) -> Option<String> {
Expand Down
2 changes: 1 addition & 1 deletion core/vpn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mime = "0.3.16"
rand = "0.7.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
smoltcp = { package = "ya-smoltcp", version = "0.1" }
smoltcp = "0.10.0"
thiserror = "1.0"
tokio = { version = "1", features = ["time"] }
tokio-stream = "0.1.6"
Expand Down
Loading
Loading