diff --git a/vmicore/rust_src/Cargo.toml b/vmicore/rust_src/Cargo.toml index 2a8a03d8..08f128cb 100644 --- a/vmicore/rust_src/Cargo.toml +++ b/vmicore/rust_src/Cargo.toml @@ -7,22 +7,23 @@ edition = "2021" crate-type = ["lib", "staticlib"] [dependencies] -tonic = "0.11.0" -prost = "0.12.3" -prost-types = "0.12.3" -tokio = { version = "1.22", features = ["macros", "rt-multi-thread", "net"] } +tonic = "0.12.3" +prost = "0.13.3" +prost-types = "0.13.3" +tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "net"] } +tokio-stream = "0.1.16" +hyper-util = "0.1.10" cxx = "1.0" triggered = "0.1.2" -async-std = "1.12" -async-stream = "0.3.3" -futures-core = "0.3.25" -chrono = "0.4.23" +async-std = "1.13" +async-stream = "0.3.6" +futures-core = "0.3.31" +chrono = "0.4.38" thiserror = "1" -futures = { version = "0.3", default-features = false, features = ["alloc"] } [dev-dependencies] -tower = "0.4.13" -ctrlc = "3.4.0" +tower = "0.5.1" +ctrlc = "3.4.5" [build-dependencies] -tonic-build = "0.11.0" +tonic-build = "0.12.3" diff --git a/vmicore/rust_src/build.rs b/vmicore/rust_src/build.rs index 32c342b4..da902a1a 100644 --- a/vmicore/rust_src/build.rs +++ b/vmicore/rust_src/build.rs @@ -1,5 +1,5 @@ fn main() -> Result<(), Box> { - tonic_build::configure().build_client(true).compile( + tonic_build::configure().build_client(true).compile_protos( &[ "./protos/pkg/logging/service/v1/log_svc.proto", "./protos/pkg/vmi/v1/vmi_svc.proto", diff --git a/vmicore/rust_src/examples/grpc_client.rs b/vmicore/rust_src/examples/grpc_client.rs index 613ef0a8..d19cc6d6 100644 --- a/vmicore/rust_src/examples/grpc_client.rs +++ b/vmicore/rust_src/examples/grpc_client.rs @@ -1,5 +1,6 @@ use std::error::Error; +use hyper_util::rt::TokioIo; use tokio::{net::UnixStream, try_join}; use tonic::{ transport::{Channel, Endpoint, Uri}, @@ -14,6 +15,8 @@ use rust_grpc_server::pkg::logging::{ }; use rust_grpc_server::pkg::vmi::v1::{vmi_service_client::VmiServiceClient, ListenForEventsRequest}; +const UNIX_SOCKET_ADDR: &str = "/tmp/vmi.sock"; + async fn log_client(channel: Channel, trigger: Trigger, listener: Listener) -> Result<(), Box> { let mut client = LogServiceClient::new(channel); @@ -59,13 +62,11 @@ async fn vmi_client(channel: Channel, trigger: Trigger, listener: Listener) -> R #[tokio::main] async fn main() -> Result<(), Box> { - let addr = "/tmp/vmi.sock"; - // Url not used let channel = Endpoint::try_from("http://[::]:50051")? - .connect_with_connector(service_fn(move |_: Uri| { + .connect_with_connector(service_fn(|_: Uri| async { // Connect to a Uds socket - UnixStream::connect(addr) + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(UNIX_SOCKET_ADDR).await?)) })) .await?; diff --git a/vmicore/rust_src/src/grpc_server.rs b/vmicore/rust_src/src/grpc_server.rs index f28197d4..5a777ad4 100644 --- a/vmicore/rust_src/src/grpc_server.rs +++ b/vmicore/rust_src/src/grpc_server.rs @@ -13,13 +13,13 @@ use crate::pkg::vmi::v1::{ use async_std::channel::{unbounded, Receiver, Sender}; use async_std::task; use cxx::CxxVector; -use futures::TryFutureExt; use std::error::Error; use std::fmt::Debug; use std::pin::Pin; use std::result; use std::time::SystemTime; use tokio::net::UnixListener; +use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::Server; use tonic::{Status, Streaming}; @@ -78,11 +78,7 @@ impl GRPCServer { } let uds = UnixListener::bind(addr_ref)?; - async_stream::stream! { - loop { - yield uds.accept().map_ok(|(st, _)| crate::unix_socket::UnixStream(st)).await; - } - } + UnixListenerStream::new(uds) }; Server::builder() diff --git a/vmicore/rust_src/src/lib.rs b/vmicore/rust_src/src/lib.rs index 53e1843a..5fbfadc3 100644 --- a/vmicore/rust_src/src/lib.rs +++ b/vmicore/rust_src/src/lib.rs @@ -5,7 +5,6 @@ mod grpc_logger; pub mod grpc_server; mod grpc_vmi_service; mod logging; -mod unix_socket; pub mod pkg { #![allow(clippy::derive_partial_eq_without_eq)] diff --git a/vmicore/rust_src/src/unix_socket.rs b/vmicore/rust_src/src/unix_socket.rs deleted file mode 100644 index 12bd3f67..00000000 --- a/vmicore/rust_src/src/unix_socket.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tonic::transport::server::Connected; - -#[derive(Debug)] -pub struct UnixStream(pub tokio::net::UnixStream); - -impl Connected for UnixStream { - type ConnectInfo = UdsConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - UdsConnectInfo { - peer_addr: self.0.peer_addr().ok().map(Arc::new), - peer_cred: self.0.peer_cred().ok(), - } - } -} - -#[derive(Clone, Debug)] -pub struct UdsConnectInfo { - pub peer_addr: Option>, - pub peer_cred: Option, -} - -impl AsyncRead for UnixStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } -} - -impl AsyncWrite for UnixStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } -}