diff --git a/Cargo.lock b/Cargo.lock index 11d4e3424..fa1e466d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,6 +261,58 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c1da3ae8dabd9c00f453a329dfe1fb28da3c0a72e2478cdcd93171740c20499" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-fs" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "279cf904654eeebfa37ac9bb1598880884924aab82e290aa65c9e77a0e142e06" +dependencies = [ + "async-lock", + "autocfg", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + [[package]] name = "async-graphql" version = "5.0.10" @@ -377,7 +429,35 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" dependencies = [ - "event-listener", + "event-listener 2.5.3", +] + +[[package]] +name = "async-net" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0434b1ed18ce1cf5769b8ac540e33f01fa9471058b5e89da9e06f3c882a8c12f" +dependencies = [ + "async-io", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-process" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +dependencies = [ + "async-io", + "async-lock", + "async-signal", + "blocking", + "cfg-if", + "event-listener 3.0.0", + "futures-lite", + "rustix 0.38.18", + "windows-sys 0.48.0", ] [[package]] @@ -391,6 +471,24 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "async-signal" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2a5415b7abcdc9cd7d63d6badba5288b2ca017e3fbd4173b8f405449f1a2399" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 0.38.18", + "signal-hook-registry", + "slab", + "windows-sys 0.48.0", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -413,6 +511,12 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "async-task" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" + [[package]] name = "async-trait" version = "0.1.73" @@ -454,6 +558,12 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -719,6 +829,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.0", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + [[package]] name = "bollard" version = "0.13.0" @@ -2269,6 +2395,17 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e56284f00d94c1bc7fd3c77027b4623c88c1f53d8d2394c6199f2921dea325" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.8" @@ -3151,6 +3288,7 @@ dependencies = [ "ipnet", "log", "rtnetlink", + "smol", "system-configuration", "tokio", "windows 0.34.0", @@ -3339,7 +3477,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.3", - "rustix 0.38.14", + "rustix 0.38.18", "windows-sys 0.48.0", ] @@ -3760,6 +3898,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-plaintext" +version = "0.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37266c683a757df713f7dcda0cdcb5ad4681355ffa1b37b77c113c176a531195" +dependencies = [ + "asynchronous-codec", + "bytes", + "futures", + "libp2p-core", + "libp2p-identity", + "log", + "quick-protobuf", + "unsigned-varint", +] + [[package]] name = "libp2p-quic" version = "0.9.2" @@ -3837,12 +3991,32 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "libp2p-swarm-test" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61761099882b9c4fe02d4d0fc47641e81381dd2a95a7b4ddeb0dc02f3daaaf16" +dependencies = [ + "async-trait", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-plaintext", + "libp2p-swarm", + "libp2p-tcp", + "libp2p-yamux", + "log", + "rand", +] + [[package]] name = "libp2p-tcp" version = "0.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09bfdfb6f945c5c014b87872a0bdb6e0aef90e92f380ef57cd9013f118f9289d" dependencies = [ + "async-io", "futures", "futures-timer", "if-watch", @@ -3975,9 +4149,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "lock_api" @@ -4353,6 +4527,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6471bf08e7ac0135876a9581bf3217ef0333c191c128d34878079f42ee150411" dependencies = [ + "async-io", "bytes", "futures", "libc", @@ -5006,6 +5181,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.0", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -5795,6 +5981,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322c53fd76a18698f1c27381d58091de3a043d356aa5bd0d510608b565f469a0" dependencies = [ + "async-global-executor", "futures", "log", "netlink-packet-route", @@ -5856,14 +6043,14 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.14" +version = "0.38.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" +checksum = "5a74ee2d7c2581cd139b42447d7d9389b889bdaad3a73f1ebb16f2a3237bb19c" dependencies = [ "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys 0.4.7", + "linux-raw-sys 0.4.10", "windows-sys 0.48.0", ] @@ -6380,6 +6567,23 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +[[package]] +name = "smol" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13f2b548cd8447f8de0fdf1c592929f70f4fc7039a05e47404b0d096ec6987a1" +dependencies = [ + "async-channel", + "async-executor", + "async-fs", + "async-io", + "async-lock", + "async-net", + "async-process", + "blocking", + "futures-lite", +] + [[package]] name = "snow" version = "0.9.3" @@ -6646,7 +6850,7 @@ dependencies = [ "cfg-if", "fastrand 2.0.0", "redox_syscall 0.3.5", - "rustix 0.38.14", + "rustix 0.38.18", "windows-sys 0.48.0", ] @@ -7247,6 +7451,7 @@ dependencies = [ "hyper", "lazy_static", "libp2p", + "libp2p-swarm-test", "pin-project", "prometheus-client", "prost", @@ -7604,12 +7809,14 @@ dependencies = [ "lazy_static", "libp2p", "proc_macro_sdk", + "prost", "rand", "rstest", "tokio", "tokio-stream", "tokio-util", "tonic 0.9.2", + "tonic-build", "topos-core", "topos-crypto", "topos-p2p", @@ -8206,7 +8413,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.14", + "rustix 0.38.18", ] [[package]] diff --git a/crates/topos-p2p/Cargo.toml b/crates/topos-p2p/Cargo.toml index 7c36d24d6..bf01e271f 100644 --- a/crates/topos-p2p/Cargo.toml +++ b/crates/topos-p2p/Cargo.toml @@ -38,6 +38,7 @@ http-body-util = "0.1.0-rc.3" hyper.workspace = true uuid.workspace = true pin-project = "1.1.3" +libp2p-swarm-test = "0.2.0" [dev-dependencies] test-log.workspace = true diff --git a/crates/topos-p2p/src/behaviour/grpc.rs b/crates/topos-p2p/src/behaviour/grpc.rs index 44eb4b9e4..a028b3ac9 100644 --- a/crates/topos-p2p/src/behaviour/grpc.rs +++ b/crates/topos-p2p/src/behaviour/grpc.rs @@ -82,6 +82,8 @@ pub(crate) struct Behaviour { next_inbound_request_id: Arc, /// The list of connected peers with the associated gRPC channel connected: HashMap>, + /// The list of known addresses for each peer managed by `add_address` and `remove_address` + addresses: HashMap>, /// The optional inbound stream to receive gRPC connections inbound_stream: Option>>, /// The list of pending outbound connections @@ -99,6 +101,7 @@ impl Behaviour { Self { service, connected: HashMap::new(), + addresses: HashMap::new(), inbound_stream: None, next_request_id: RequestId(1), next_inbound_request_id: Arc::new(AtomicU64::new(0)), @@ -108,6 +111,29 @@ impl Behaviour { } } + /// Adds a known address for a peer that can be used for + /// dialing attempts by the `Swarm` + /// + /// Addresses added in this way are only removed by `remove_address`. + #[cfg(test)] + pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) { + self.addresses.entry(*peer).or_default().push(address); + } + + /// Removes an address of a peer previously added via `add_address`. + #[cfg(test)] + #[allow(unused)] + pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) { + let mut last = false; + if let Some(addresses) = self.addresses.get_mut(peer) { + addresses.retain(|a| a != address); + last = addresses.is_empty(); + } + if last { + self.addresses.remove(peer); + } + } + /// Ask the behaviour to create a new outbound connection for the given peer. /// /// The return value is a [`OutboundConnection`] that can be used to check the status of the @@ -267,6 +293,7 @@ impl Behaviour { connection_id, }: DialFailure, ) { + println!("Dial failure {:?}", error); if let Some(peer_id) = peer_id { match error { DialError::DialPeerConditionFalse(_) => { @@ -360,6 +387,10 @@ impl NetworkBehaviour for Behaviour { addresses.extend(connections.iter().filter_map(|c| c.address.clone())); } + if let Some(more) = self.addresses.get(&peer_id) { + addresses.extend(more.into_iter().cloned()); + } + Ok(addresses) } @@ -370,24 +401,18 @@ impl NetworkBehaviour for Behaviour { event: libp2p::swarm::THandlerOutEvent, ) { match event { - Event::OutboundSuccess { - peer_id, - request_id, - channel, - } => {} - Event::OutboundFailure { - peer_id, - request_id, - error, - } => { - warn!("Unhandled OutboundFailure in gRPC Behaviour"); - } - Event::InboundNegotiatedStream { request_id, stream } => { + handler::event::Event::InboundNegotiatedStream { request_id, stream } => { if let Some(sender) = &mut self.inbound_stream { _ = sender.send(Ok(GrpcStream::new(stream, peer_id, connection_id))); + self.pending_events.push_back(ToSwarm::GenerateEvent( + Event::InboundNegotiatedConnection { + request_id, + connection_id, + }, + )); } } - Event::OutboundNegotiatedStream { request_id, stream } => { + handler::event::Event::OutboundNegotiatedStream { request_id, stream } => { let stream = GrpcStream::new(stream, peer_id, connection_id); let future = stream @@ -444,6 +469,7 @@ impl NetworkBehaviour for Behaviour { if let Some(conn_request_id) = &conn.request_id { if request_id == *conn_request_id { conn.channel = Some(channel.clone()); + break; } } @@ -453,6 +479,12 @@ impl NetworkBehaviour for Behaviour { // Notifying the channel to the initial sender if let Some(req) = self.pending_outbound_connections.remove(&peer_id) { let _ = req.notifier.send(Ok(channel.clone())); + self.pending_events.push_back(ToSwarm::GenerateEvent( + Event::OutboundNegotiatedConnection { + request_id: req.request_id, + peer_id, + }, + )); } return Poll::Ready(ToSwarm::GenerateEvent(Event::OutboundSuccess { diff --git a/crates/topos-p2p/src/behaviour/grpc/event.rs b/crates/topos-p2p/src/behaviour/grpc/event.rs index 0293de62d..808cc0d21 100644 --- a/crates/topos-p2p/src/behaviour/grpc/event.rs +++ b/crates/topos-p2p/src/behaviour/grpc/event.rs @@ -1,4 +1,4 @@ -use libp2p::PeerId; +use libp2p::{swarm::ConnectionId, PeerId}; use tonic::transport::Channel; use super::{OutboundError, RequestId}; @@ -11,18 +11,27 @@ pub enum Event { error: OutboundError, }, - InboundNegotiatedStream { + OutboundSuccess { + peer_id: PeerId, request_id: RequestId, - stream: libp2p::Stream, + channel: Channel, }, - OutboundNegotiatedStream { + // InboundNegotiatedStream { + // request_id: RequestId, + // stream: libp2p::Stream, + // }, + InboundNegotiatedConnection { request_id: RequestId, - stream: libp2p::Stream, + connection_id: ConnectionId, }, - OutboundSuccess { + + // OutboundNegotiatedStream { + // request_id: RequestId, + // stream: libp2p::Stream, + // }, + OutboundNegotiatedConnection { peer_id: PeerId, request_id: RequestId, - channel: Channel, }, } diff --git a/crates/topos-p2p/src/behaviour/grpc/handler.rs b/crates/topos-p2p/src/behaviour/grpc/handler.rs index 8db8623b1..c4751375e 100644 --- a/crates/topos-p2p/src/behaviour/grpc/handler.rs +++ b/crates/topos-p2p/src/behaviour/grpc/handler.rs @@ -11,12 +11,14 @@ use libp2p::swarm::{ handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}, ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, }; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use self::protocol::GrpcUpgradeProtocol; -use super::{Event, RequestId}; +use super::RequestId; +pub(crate) mod event; +use event::Event; mod protocol; /// Handler for gRPC connections @@ -42,7 +44,7 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = RequestId; - type ToBehaviour = Event; + type ToBehaviour = event::Event; type Error = void::Void; @@ -126,6 +128,7 @@ impl ConnectionHandler for Handler { } if let Some(request_id) = self.outbound_request_id.take() { + debug!("Starting outbound request SubstreamProtocol"); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(GrpcUpgradeProtocol {}, request_id), }); diff --git a/crates/topos-p2p/src/behaviour/grpc/handler/event.rs b/crates/topos-p2p/src/behaviour/grpc/handler/event.rs new file mode 100644 index 000000000..f0b6312d5 --- /dev/null +++ b/crates/topos-p2p/src/behaviour/grpc/handler/event.rs @@ -0,0 +1,13 @@ +use crate::behaviour::grpc::RequestId; + +#[derive(Debug)] +pub enum Event { + InboundNegotiatedStream { + request_id: RequestId, + stream: libp2p::Stream, + }, + OutboundNegotiatedStream { + request_id: RequestId, + stream: libp2p::Stream, + }, +} diff --git a/crates/topos-p2p/src/tests/behaviour/grpc.rs b/crates/topos-p2p/src/tests/behaviour/grpc.rs new file mode 100644 index 000000000..6a0a8e93a --- /dev/null +++ b/crates/topos-p2p/src/tests/behaviour/grpc.rs @@ -0,0 +1,202 @@ +use std::{future::IntoFuture, time::Duration}; + +use async_trait::async_trait; +use libp2p::{multiaddr::Protocol, swarm::SwarmEvent, Multiaddr, Swarm}; +use libp2p_swarm_test::SwarmExt; +use rstest::rstest; +use test_log::test; +use tokio::spawn; +use tokio_util::sync::CancellationToken; +use tonic::{transport::Server, Request, Response, Status}; +use topos_test_sdk::grpc::behaviour::helloworld::{ + self, + greeter_client::GreeterClient, + greeter_server::{Greeter, GreeterServer}, + HelloReply, HelloRequest, HelloWithDelayRequest, +}; +use tower::service_fn; + +use crate::behaviour::grpc; + +#[derive(Default)] +struct DummyServer {} + +#[async_trait] +impl Greeter for DummyServer { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + Ok(Response::new(HelloReply { + message: format!("Hello {}", request.into_inner().name), + })) + } + + async fn say_hello_with_delay( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + tokio::time::sleep(Duration::from_secs(request.delay_in_seconds)).await; + + Ok(Response::new(HelloReply { + message: format!("Hello {}", request.name), + })) + } +} + +#[rstest] +#[test(tokio::test)] +#[timeout(Duration::from_secs(5))] +async fn instantiate_grpc() { + let dummy = DummyServer {}; + let router = Server::builder().add_service(GreeterServer::new(dummy)); + let mut client_swarm = Swarm::new_ephemeral(|_| grpc::Behaviour::new(None)); + let mut server_swarm = Swarm::new_ephemeral(|_| grpc::Behaviour::new(Some(router))); + + let server_peer_id = *server_swarm.local_peer_id(); + + server_swarm.listen().await; + + let server_address = server_swarm.listeners().next().unwrap(); + + client_swarm + .behaviour_mut() + .add_address(&server_peer_id, server_address.clone()); + + let outbound_connection = client_swarm + .behaviour_mut() + .open_outbound_connection(&server_peer_id); + + let shutdown = CancellationToken::new(); + let client_shutdown = shutdown.child_token(); + let server_shutdown = shutdown.child_token(); + let client_swarm = async move { + loop { + tokio::select! { + event = client_swarm.next_swarm_event() => {} + _ = client_shutdown.cancelled() => { return client_swarm; } + } + } + }; + + let server_swarm = async move { + loop { + tokio::select! { + _ = server_swarm.next_swarm_event() => {} + _ = server_shutdown.cancelled() => { return server_swarm; } + } + } + }; + + let server_swarm = spawn(server_swarm); + let client_swarm = spawn(client_swarm); + let connection = outbound_connection.into_future().await.unwrap(); + + shutdown.cancel(); + + let server_swarm = server_swarm.await.unwrap(); + let client_swarm = client_swarm.await.unwrap(); + + assert_eq!( + server_swarm.connected_peers().collect::>(), + vec![client_swarm.local_peer_id()] + ); +} + +#[test(tokio::test)] +async fn opening_outbound_stream() {} + +#[test(tokio::test)] +async fn opening_outbound_stream_half_close() {} + +#[test(tokio::test)] +#[ignore = "Need to find a way to properly close the connection after sending the query"] +async fn closing_stream() { + let dummy = DummyServer {}; + let router = Server::builder().add_service(GreeterServer::new(dummy)); + let mut client_swarm = Swarm::new_ephemeral(|_| grpc::Behaviour::new(None)); + let mut server_swarm = Swarm::new_ephemeral(|_| grpc::Behaviour::new(Some(router))); + + let server_peer_id = *server_swarm.local_peer_id(); + + server_swarm.listen().await; + + let server_address = server_swarm.listeners().next().unwrap(); + client_swarm + .behaviour_mut() + .add_address(&server_peer_id, server_address.clone()); + + let outbound_connection = client_swarm + .behaviour_mut() + .open_outbound_connection(&server_peer_id); + + let client_swarm = async move { + loop { + client_swarm.next_swarm_event().await; + } + }; + let server_swarm = async move { + loop { + server_swarm.next_swarm_event().await; + } + }; + + spawn(server_swarm); + spawn(client_swarm); + let connection = outbound_connection.into_future().await.unwrap(); + + let mut client = GreeterClient::new(connection.channel); + + let result = client + .say_hello_with_delay(HelloWithDelayRequest { + name: "Simon".into(), + delay_in_seconds: 10, + }) + .await + .unwrap(); + + assert_eq!(result.into_inner().message, "Hello Simon"); +} + +#[test(tokio::test)] +async fn executing_query() { + let dummy = DummyServer {}; + let router = Server::builder().add_service(GreeterServer::new(dummy)); + let mut client_swarm = Swarm::new_ephemeral(|_| grpc::Behaviour::new(None)); + let mut server_swarm = Swarm::new_ephemeral(|_| grpc::Behaviour::new(Some(router))); + + let server_peer_id = *server_swarm.local_peer_id(); + + server_swarm.listen().await; + client_swarm.connect(&mut server_swarm).await; + let outbound_connection = client_swarm + .behaviour_mut() + .open_outbound_connection(&server_peer_id); + + let client_swarm = async move { + loop { + client_swarm.next_swarm_event().await; + } + }; + let server_swarm = async move { + loop { + server_swarm.next_swarm_event().await; + } + }; + + spawn(server_swarm); + spawn(client_swarm); + let connection = outbound_connection.into_future().await.unwrap(); + + let mut client = GreeterClient::new(connection.channel); + + let result = client + .say_hello(HelloRequest { + name: "Simon".into(), + }) + .await + .unwrap(); + + assert_eq!(result.into_inner().message, "Hello Simon"); +} diff --git a/crates/topos-p2p/src/tests/behaviour/mod.rs b/crates/topos-p2p/src/tests/behaviour/mod.rs new file mode 100644 index 000000000..f1d69e382 --- /dev/null +++ b/crates/topos-p2p/src/tests/behaviour/mod.rs @@ -0,0 +1 @@ +mod grpc; diff --git a/crates/topos-p2p/src/tests/mod.rs b/crates/topos-p2p/src/tests/mod.rs index ee3088f01..1a3b54a7b 100644 --- a/crates/topos-p2p/src/tests/mod.rs +++ b/crates/topos-p2p/src/tests/mod.rs @@ -1,2 +1,3 @@ +mod behaviour; mod dht; mod support; diff --git a/crates/topos-test-sdk/Cargo.toml b/crates/topos-test-sdk/Cargo.toml index 2ad620006..4aca0b825 100644 --- a/crates/topos-test-sdk/Cargo.toml +++ b/crates/topos-test-sdk/Cargo.toml @@ -18,6 +18,7 @@ topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper/", optional = true } topos-tce-synchronizer = { path = "../topos-tce-synchronizer/", optional = true } topos-tce-broadcast = { path = "../topos-tce-broadcast/", optional = true } topos-tce-transport = { path = "../topos-tce-transport/", optional = true } + hex.workspace = true futures.workspace = true lazy_static = { version = "1.4.0" } @@ -28,14 +29,18 @@ rstest.workspace = true tokio.workspace = true tokio-util.workspace = true tokio-stream.workspace = true -tonic = { workspace = true, optional = true } +prost.workspace = true +tonic = { workspace = true, default-features = false, features = [ + "prost", + "codegen", + "transport", +] } tracing.workspace = true [features] default = [] tce = [ "topos-core/api", - "dep:tonic", "topos-tce", "topos-tce-api", "topos-tce-broadcast", @@ -43,3 +48,8 @@ tce = [ "topos-tce-synchronizer", "topos-tce-transport", ] + +[build-dependencies] +tonic-build = { version = "0.8", default-features = false, features = [ + "prost", "transport" +] } diff --git a/crates/topos-test-sdk/build.rs b/crates/topos-test-sdk/build.rs index 9b34944c2..c655ef162 100644 --- a/crates/topos-test-sdk/build.rs +++ b/crates/topos-test-sdk/build.rs @@ -12,4 +12,13 @@ fn main() { "cargo:rustc-env=TOPOS_TEST_SDK_TMP={}", path.to_str().unwrap() ); + + let path = PathBuf::from("./src/grpc/behaviour/"); + let descriptor_path = path.join("helloworld_descriptor.bin"); + + tonic_build::configure() + .out_dir(path) + .file_descriptor_set_path(descriptor_path) + .compile(&["./proto/behaviour/helloworld.proto"], &["proto/"]) + .unwrap(); } diff --git a/crates/topos-test-sdk/proto/behaviour/helloworld.proto b/crates/topos-test-sdk/proto/behaviour/helloworld.proto new file mode 100644 index 000000000..20d9fc673 --- /dev/null +++ b/crates/topos-test-sdk/proto/behaviour/helloworld.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} + + // Send a greeting with a delay + rpc SayHelloWithDelay(HelloWithDelayRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The request message containing the user's name and the delay. +message HelloWithDelayRequest { + string name = 1; + uint64 delay_in_seconds = 2; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/crates/topos-test-sdk/src/grpc/behaviour/helloworld.rs b/crates/topos-test-sdk/src/grpc/behaviour/helloworld.rs new file mode 100644 index 000000000..d5ef40611 --- /dev/null +++ b/crates/topos-test-sdk/src/grpc/behaviour/helloworld.rs @@ -0,0 +1,328 @@ +/// The request message containing the user's name. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloRequest { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, +} +/// The request message containing the user's name and the delay. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloWithDelayRequest { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(uint64, tag = "2")] + pub delay_in_seconds: u64, +} +/// The response message containing the greetings +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloReply { + #[prost(string, tag = "1")] + pub message: ::prost::alloc::string::String, +} +/// Generated client implementations. +pub mod greeter_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// The greeting service definition. + #[derive(Debug, Clone)] + pub struct GreeterClient { + inner: tonic::client::Grpc, + } + impl GreeterClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: std::convert::TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl GreeterClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> GreeterClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + GreeterClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Sends a greeting + pub async fn say_hello( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/helloworld.Greeter/SayHello", + ); + self.inner.unary(request.into_request(), path, codec).await + } + /// Send a greeting with a delay + pub async fn say_hello_with_delay( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/helloworld.Greeter/SayHelloWithDelay", + ); + self.inner.unary(request.into_request(), path, codec).await + } + } +} +/// Generated server implementations. +pub mod greeter_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with GreeterServer. + #[async_trait] + pub trait Greeter: Send + Sync + 'static { + /// Sends a greeting + async fn say_hello( + &self, + request: tonic::Request, + ) -> Result, tonic::Status>; + /// Send a greeting with a delay + async fn say_hello_with_delay( + &self, + request: tonic::Request, + ) -> Result, tonic::Status>; + } + /// The greeting service definition. + #[derive(Debug)] + pub struct GreeterServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + } + struct _Inner(Arc); + impl GreeterServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + } + impl tonic::codegen::Service> for GreeterServer + where + T: Greeter, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/helloworld.Greeter/SayHello" => { + #[allow(non_camel_case_types)] + struct SayHelloSvc(pub Arc); + impl tonic::server::UnaryService + for SayHelloSvc { + type Response = super::HelloReply; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).say_hello(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = SayHelloSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/helloworld.Greeter/SayHelloWithDelay" => { + #[allow(non_camel_case_types)] + struct SayHelloWithDelaySvc(pub Arc); + impl< + T: Greeter, + > tonic::server::UnaryService + for SayHelloWithDelaySvc { + type Response = super::HelloReply; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { + (*inner).say_hello_with_delay(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = SayHelloWithDelaySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for GreeterServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for GreeterServer { + const NAME: &'static str = "helloworld.Greeter"; + } +} diff --git a/crates/topos-test-sdk/src/grpc/behaviour/helloworld_descriptor.bin b/crates/topos-test-sdk/src/grpc/behaviour/helloworld_descriptor.bin new file mode 100644 index 000000000..71d852492 Binary files /dev/null and b/crates/topos-test-sdk/src/grpc/behaviour/helloworld_descriptor.bin differ diff --git a/crates/topos-test-sdk/src/lib.rs b/crates/topos-test-sdk/src/lib.rs index cc06d308f..c1fa88e2b 100644 --- a/crates/topos-test-sdk/src/lib.rs +++ b/crates/topos-test-sdk/src/lib.rs @@ -15,6 +15,12 @@ use lazy_static::lazy_static; lazy_static! { pub static ref PORT_MAPPING: Mutex> = Mutex::new(HashSet::new()); } +pub mod grpc { + pub mod behaviour { + #[rustfmt::skip] + pub mod helloworld; + } +} pub mod constants { use proc_macro_sdk::generate_certificate_ids;