From a06362ea2bb37038e98a3361fc5c02eb3dff20d1 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Mon, 25 Mar 2024 16:49:53 +0530 Subject: [PATCH] Use UDF specific names for server info file Signed-off-by: Sreekanth --- src/map.rs | 45 ++++++++++++++++++------------------------ src/reduce.rs | 8 ++------ src/shared.rs | 30 +++++++++++++++------------- src/sideinput.rs | 10 ++++------ src/sink.rs | 43 +++++++++++++++++----------------------- src/source.rs | 36 +++++++++++++-------------------- src/sourcetransform.rs | 36 +++++++++++++-------------------- 7 files changed, 87 insertions(+), 121 deletions(-) diff --git a/src/map.rs b/src/map.rs index 32dae55..d08074a 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,6 +1,7 @@ -use chrono::{DateTime, Utc}; +use std::future::Future; use std::path::PathBuf; -use tokio::sync::oneshot; + +use chrono::{DateTime, Utc}; use tonic::{async_trait, Request, Response, Status}; use crate::shared; @@ -121,21 +122,16 @@ pub struct Server { sock_addr: PathBuf, max_message_size: usize, server_info_file: PathBuf, - map_svc: Option, + svc: Option, } impl Server { pub fn new(map_svc: T) -> Self { - let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() { - "/var/run/numaflow/server-info" - } else { - "/tmp/numaflow.server-info" - }; Server { sock_addr: "/var/run/numaflow/map.sock".into(), max_message_size: 64 * 1024 * 1024, - server_info_file: server_info_file.into(), - map_svc: Some(map_svc), + server_info_file: "/var/run/numaflow/mapper-server-info".into(), + svc: Some(map_svc), } } @@ -157,42 +153,38 @@ impl Server { self } - /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB. + /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB. pub fn max_message_size(&self) -> usize { self.max_message_size } - /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info` + /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/mapper-server-info` pub fn with_server_info_file(mut self, file: impl Into) -> Self { self.server_info_file = file.into(); self } - /// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info` + /// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/mapper-server-info` pub fn server_info_file(&self) -> &std::path::Path { self.server_info_file.as_path() } /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. - pub async fn start_with_shutdown( + pub async fn start_with_shutdown( &mut self, - shutdown: oneshot::Receiver<()>, + shutdown: F, ) -> Result<(), Box> where T: Mapper + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; - let handler = self.map_svc.take().unwrap(); + let handler = self.svc.take().unwrap(); let map_svc = MapService { handler }; let map_svc = proto::map_server::MapServer::new(map_svc) .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = async { - shutdown - .await - .expect("Receiving message from shutdown channel"); - }; tonic::transport::Server::builder() .add_service(map_svc) .serve_with_incoming_shutdown(listener, shutdown) @@ -205,9 +197,7 @@ impl Server { where T: Mapper + Send + Sync + 'static, { - let (tx, rx) = oneshot::channel::<()>(); - tokio::spawn(shared::wait_for_signal(tx)); - self.start_with_shutdown(rx).await + self.start_with_shutdown(shared::shutdown_signal()).await } } @@ -238,7 +228,7 @@ mod tests { let tmp_dir = TempDir::new()?; let sock_file = tmp_dir.path().join("map.sock"); - let server_info_file = tmp_dir.path().join("server_info"); + let server_info_file = tmp_dir.path().join("mapper-server-info"); let mut server = map::Server::new(Cat) .with_server_info_file(&server_info_file) @@ -250,7 +240,10 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + let shutdown = async { + shutdown_rx.await.unwrap(); + }; + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/reduce.rs b/src/reduce.rs index 8c28068..18827cb 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -303,13 +303,9 @@ pub async fn start_uds_server(m: T) -> Result<(), Box) -> Option { }) } -pub(crate) async fn wait_for_signal(tx: oneshot::Sender<()>) { - use tokio::signal::unix::{signal, SignalKind}; - let mut interrupt = - signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler"); - let mut termination = - signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler"); +pub(crate) async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install SIGINT handler"); + }; + + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install SITERM handler") + .recv() + .await; + }; tokio::select! { - _ = interrupt.recv() => { - tracing::info!("Received SIGINT. Stopping gRPC server") - } - _ = termination.recv() => { - tracing::info!("Received SIGTERM. Stopping gRPC server") - } + _ = ctrl_c => {}, + _ = terminate => {}, } - tx.send(()).expect("Sending shutdown signal to gRPC server"); } diff --git a/src/sideinput.rs b/src/sideinput.rs index ce0d3ee..49ce051 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -46,13 +46,11 @@ pub async fn start_uds_server(m: T) -> Result<(), Box { impl Server { pub fn new(svc: T) -> Self { - let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() { - "/var/run/numaflow/server-info" - } else { - "/tmp/numaflow.server-info" - }; Self { sock_addr: "/var/run/numaflow/sink.sock".into(), max_message_size: 64 * 1024 * 1024, - server_info_file: server_info_file.into(), + server_info_file: "/var/run/numaflow/sinker-server-info".into(), svc: Some(svc), } } @@ -198,40 +194,41 @@ impl Server { self } - /// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/map.sock` + /// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/sink.sock` pub fn socket_file(&self) -> &std::path::Path { self.sock_addr.as_path() } - /// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 4MB. + /// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 64MB. pub fn with_max_message_size(mut self, message_size: usize) -> Self { self.max_message_size = message_size; self } - /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB. + /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB. pub fn max_message_size(&self) -> usize { self.max_message_size } - /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info` + /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/sinker-server-info` pub fn with_server_info_file(mut self, file: impl Into) -> Self { self.server_info_file = file.into(); self } - /// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info` + /// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/sinker-server-info` pub fn server_info_file(&self) -> &std::path::Path { self.server_info_file.as_path() } /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. - pub async fn start_with_shutdown( + pub async fn start_with_shutdown( &mut self, - shutdown: oneshot::Receiver<()>, + shutdown: F, ) -> Result<(), Box> where T: Sinker + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); @@ -240,11 +237,6 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = async { - shutdown - .await - .expect("Receiving message from shutdown channel"); - }; tonic::transport::Server::builder() .add_service(svc) .serve_with_incoming_shutdown(listener, shutdown) @@ -257,9 +249,7 @@ impl Server { where T: Sinker + Send + Sync + 'static, { - let (tx, rx) = oneshot::channel::<()>(); - tokio::spawn(shared::wait_for_signal(tx)); - self.start_with_shutdown(rx).await + self.start_with_shutdown(shared::shutdown_signal()).await } } @@ -315,8 +305,8 @@ mod tests { } let tmp_dir = TempDir::new()?; - let sock_file = tmp_dir.path().join("map.sock"); - let server_info_file = tmp_dir.path().join("server_info"); + let sock_file = tmp_dir.path().join("sink.sock"); + let server_info_file = tmp_dir.path().join("sinker-server-info"); let mut server = sink::Server::new(Logger) .with_server_info_file(&server_info_file) @@ -328,7 +318,10 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + let shutdown = async { + shutdown_rx.await.unwrap(); + }; + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/source.rs b/src/source.rs index 1e05698..cbd7f77 100644 --- a/src/source.rs +++ b/src/source.rs @@ -1,5 +1,6 @@ #![warn(missing_docs)] +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -7,7 +8,6 @@ use std::time::Duration; use crate::shared::{self, prost_timestamp_from_utc}; use chrono::{DateTime, Utc}; use tokio::sync::mpsc::{self, Sender}; -use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status}; @@ -205,15 +205,10 @@ pub struct Server { impl Server { /// Creates a new gRPC `Server` instance pub fn new(source_svc: T) -> Self { - let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() { - "/var/run/numaflow/server-info" - } else { - "/tmp/numaflow.server-info" - }; Server { sock_addr: "/var/run/numaflow/source.sock".into(), max_message_size: 64 * 1024 * 1024, - server_info_file: server_info_file.into(), + server_info_file: "/var/run/numaflow/sourcer-server-info".into(), svc: Some(source_svc), } } @@ -230,7 +225,7 @@ impl Server { self.sock_addr.as_path() } - /// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 4MB. + /// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 64MB. pub fn with_max_message_size(mut self, message_size: usize) -> Self { self.max_message_size = message_size; self @@ -241,24 +236,25 @@ impl Server { self.max_message_size } - /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info` + /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/sourcer-server-info` pub fn with_server_info_file(mut self, file: impl Into) -> Self { self.server_info_file = file.into(); self } - /// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info` + /// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/sourcer-server-info` pub fn server_info_file(&self) -> &std::path::Path { self.server_info_file.as_path() } /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. - pub async fn start_with_shutdown( + pub async fn start_with_shutdown( &mut self, - shutdown: oneshot::Receiver<()>, + shutdown: F, ) -> Result<(), Box> where T: Sourcer + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); @@ -270,11 +266,6 @@ impl Server { .max_decoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = async { - shutdown - .await - .expect("Receiving message from shutdown channel"); - }; tonic::transport::Server::builder() .add_service(source_svc) .serve_with_incoming_shutdown(listener, shutdown) @@ -287,9 +278,7 @@ impl Server { where T: Sourcer + Send + Sync + 'static, { - let (tx, rx) = oneshot::channel::<()>(); - tokio::spawn(shared::wait_for_signal(tx)); - self.start_with_shutdown(rx).await + self.start_with_shutdown(shared::shutdown_signal()).await } } @@ -374,7 +363,7 @@ mod tests { async fn source_server() -> Result<(), Box> { let tmp_dir = TempDir::new()?; let sock_file = tmp_dir.path().join("source.sock"); - let server_info_file = tmp_dir.path().join("server_info"); + let server_info_file = tmp_dir.path().join("sourcer-server-info"); let mut server = source::Server::new(Repeater::new(8)) .with_server_info_file(&server_info_file) @@ -386,7 +375,10 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + let shutdown = async { + shutdown_rx.await.unwrap(); + }; + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 48a432f..bae4548 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -1,7 +1,7 @@ +use std::future::Future; use std::path::PathBuf; use chrono::{DateTime, Utc}; -use tokio::sync::oneshot; use tonic::{async_trait, Request, Response, Status}; use crate::shared::{self, prost_timestamp_from_utc}; @@ -144,15 +144,10 @@ pub struct Server { impl Server { pub fn new(sourcetransformer_svc: T) -> Self { - let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() { - "/var/run/numaflow/server-info" - } else { - "/tmp/numaflow.server-info" - }; Server { sock_addr: "/var/run/numaflow/sourcetransform.sock".into(), max_message_size: 64 * 1024 * 1024, - server_info_file: server_info_file.into(), + server_info_file: "/var/run/numaflow/sourcetransformer-server-info".into(), svc: Some(sourcetransformer_svc), } } @@ -175,29 +170,30 @@ impl Server { self } - /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB. + /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB. pub fn max_message_size(&self) -> usize { self.max_message_size } - /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info` + /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/sourcetransformer-server-info` pub fn with_server_info_file(mut self, file: impl Into) -> Self { self.server_info_file = file.into(); self } - /// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info` + /// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/sourcetransformer-server-info` pub fn server_info_file(&self) -> &std::path::Path { self.server_info_file.as_path() } /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. - pub async fn start_with_shutdown( + pub async fn start_with_shutdown( &mut self, - shutdown: oneshot::Receiver<()>, + shutdown: F, ) -> Result<(), Box> where T: SourceTransformer + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); @@ -207,11 +203,6 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = async { - shutdown - .await - .expect("Receiving message from shutdown channel"); - }; tonic::transport::Server::builder() .add_service(sourcetrf_svc) .serve_with_incoming_shutdown(listener, shutdown) @@ -224,9 +215,7 @@ impl Server { where T: SourceTransformer + Send + Sync + 'static, { - let (tx, rx) = oneshot::channel::<()>(); - tokio::spawn(shared::wait_for_signal(tx)); - self.start_with_shutdown(rx).await + self.start_with_shutdown(shared::shutdown_signal()).await } } @@ -261,7 +250,7 @@ mod tests { let tmp_dir = TempDir::new()?; let sock_file = tmp_dir.path().join("sourcetransform.sock"); - let server_info_file = tmp_dir.path().join("server_info"); + let server_info_file = tmp_dir.path().join("sourcetransformer-server-info"); let mut server = sourcetransform::Server::new(NowCat) .with_server_info_file(&server_info_file) @@ -273,7 +262,10 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + let shutdown = async { + shutdown_rx.await.unwrap(); + }; + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await }); tokio::time::sleep(Duration::from_millis(50)).await;