Skip to content

Commit

Permalink
Use UDF specific names for server info file (#35)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing authored Mar 28, 2024
1 parent 9d81686 commit 819ddad
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 121 deletions.
45 changes: 19 additions & 26 deletions src/map.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use std::future::Future;
use std::path::PathBuf;
use tokio::sync::oneshot;

use chrono::{DateTime, Utc};
use tonic::{async_trait, Request, Response, Status};

use crate::shared;
Expand Down Expand Up @@ -121,21 +122,16 @@ pub struct Server<T> {
sock_addr: PathBuf,
max_message_size: usize,
server_info_file: PathBuf,
map_svc: Option<T>,
svc: Option<T>,
}

impl<T> Server<T> {
pub fn new(map_svc: T) -> Self {
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
Server {
sock_addr: "/var/run/numaflow/map.sock".into(),
max_message_size: 64 * 1024 * 1024,
server_info_file: server_info_file.into(),
map_svc: Some(map_svc),
server_info_file: "/var/run/numaflow/mapper-server-info".into(),
svc: Some(map_svc),
}
}

Expand All @@ -157,42 +153,38 @@ impl<T> Server<T> {
self
}

/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB.
/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB.
pub fn max_message_size(&self) -> usize {
self.max_message_size
}

/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info`
/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/mapper-server-info`
pub fn with_server_info_file(mut self, file: impl Into<PathBuf>) -> Self {
self.server_info_file = file.into();
self
}

/// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info`
/// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/mapper-server-info`
pub fn server_info_file(&self) -> &std::path::Path {
self.server_info_file.as_path()
}

/// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated.
pub async fn start_with_shutdown(
pub async fn start_with_shutdown<F>(
&mut self,
shutdown: oneshot::Receiver<()>,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Mapper + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.map_svc.take().unwrap();
let handler = self.svc.take().unwrap();
let map_svc = MapService { handler };
let map_svc = proto::map_server::MapServer::new(map_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = async {
shutdown
.await
.expect("Receiving message from shutdown channel");
};
tonic::transport::Server::builder()
.add_service(map_svc)
.serve_with_incoming_shutdown(listener, shutdown)
Expand All @@ -205,9 +197,7 @@ impl<T> Server<T> {
where
T: Mapper + Send + Sync + 'static,
{
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(shared::wait_for_signal(tx));
self.start_with_shutdown(rx).await
self.start_with_shutdown(shared::shutdown_signal()).await
}
}

Expand Down Expand Up @@ -238,7 +228,7 @@ mod tests {

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("server_info");
let server_info_file = tmp_dir.path().join("mapper-server-info");

let mut server = map::Server::new(Cat)
.with_server_info_file(&server_info_file)
Expand All @@ -250,7 +240,10 @@ mod tests {
assert_eq!(server.socket_file(), sock_file);

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });
let shutdown = async {
shutdown_rx.await.unwrap();
};
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await });

tokio::time::sleep(Duration::from_millis(50)).await;

Expand Down
8 changes: 2 additions & 6 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,9 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error +
where
T: Reducer + Send + Sync + 'static,
{
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
let socket_file = "/var/run/numaflow/reduce.sock";
let listener = shared::create_listener_stream(socket_file, server_info_file)?;
let listener =
shared::create_listener_stream(socket_file, "/var/run/numaflow/reducer-server-info")?;
let reduce_svc = ReduceService {
handler: Arc::new(m),
};
Expand Down
30 changes: 16 additions & 14 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use prost_types::Timestamp;
use tokio::sync::oneshot;
use tokio::signal;
use tokio_stream::wrappers::UnixListenerStream;
use tracing::info;

Expand Down Expand Up @@ -57,19 +57,21 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {
})
}

pub(crate) async fn wait_for_signal(tx: oneshot::Sender<()>) {
use tokio::signal::unix::{signal, SignalKind};
let mut interrupt =
signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler");
let mut termination =
signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler");
pub(crate) async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install SIGINT handler");
};

let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install SITERM handler")
.recv()
.await;
};
tokio::select! {
_ = interrupt.recv() => {
tracing::info!("Received SIGINT. Stopping gRPC server")
}
_ = termination.recv() => {
tracing::info!("Received SIGTERM. Stopping gRPC server")
}
_ = ctrl_c => {},
_ = terminate => {},
}
tx.send(()).expect("Sending shutdown signal to gRPC server");
}
10 changes: 4 additions & 6 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error +
where
T: SideInputer + Send + Sync + 'static,
{
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
let socket_file = "/var/run/numaflow/sideinput.sock";
let listener = crate::shared::create_listener_stream(socket_file, server_info_file)?;
let listener = crate::shared::create_listener_stream(
socket_file,
"/var/run/numaflow/sideinput-server-info",
)?;
let si_svc = SideInputService { handler: m };

tonic::transport::Server::builder()
Expand Down
43 changes: 18 additions & 25 deletions src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::future::Future;
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tonic::{Request, Status, Streaming};

use crate::shared;
Expand Down Expand Up @@ -178,15 +179,10 @@ pub struct Server<T> {

impl<T> Server<T> {
pub fn new(svc: T) -> Self {
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
Self {
sock_addr: "/var/run/numaflow/sink.sock".into(),
max_message_size: 64 * 1024 * 1024,
server_info_file: server_info_file.into(),
server_info_file: "/var/run/numaflow/sinker-server-info".into(),
svc: Some(svc),
}
}
Expand All @@ -198,40 +194,41 @@ impl<T> Server<T> {
self
}

/// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/map.sock`
/// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/sink.sock`
pub fn socket_file(&self) -> &std::path::Path {
self.sock_addr.as_path()
}

/// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 4MB.
/// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 64MB.
pub fn with_max_message_size(mut self, message_size: usize) -> Self {
self.max_message_size = message_size;
self
}

/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB.
/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB.
pub fn max_message_size(&self) -> usize {
self.max_message_size
}

/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info`
/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/sinker-server-info`
pub fn with_server_info_file(mut self, file: impl Into<PathBuf>) -> Self {
self.server_info_file = file.into();
self
}

/// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info`
/// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/sinker-server-info`
pub fn server_info_file(&self) -> &std::path::Path {
self.server_info_file.as_path()
}

/// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated.
pub async fn start_with_shutdown(
pub async fn start_with_shutdown<F>(
&mut self,
shutdown: oneshot::Receiver<()>,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Sinker + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
Expand All @@ -240,11 +237,6 @@ impl<T> Server<T> {
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = async {
shutdown
.await
.expect("Receiving message from shutdown channel");
};
tonic::transport::Server::builder()
.add_service(svc)
.serve_with_incoming_shutdown(listener, shutdown)
Expand All @@ -257,9 +249,7 @@ impl<T> Server<T> {
where
T: Sinker + Send + Sync + 'static,
{
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(shared::wait_for_signal(tx));
self.start_with_shutdown(rx).await
self.start_with_shutdown(shared::shutdown_signal()).await
}
}

Expand Down Expand Up @@ -315,8 +305,8 @@ mod tests {
}

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("server_info");
let sock_file = tmp_dir.path().join("sink.sock");
let server_info_file = tmp_dir.path().join("sinker-server-info");

let mut server = sink::Server::new(Logger)
.with_server_info_file(&server_info_file)
Expand All @@ -328,7 +318,10 @@ mod tests {
assert_eq!(server.socket_file(), sock_file);

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });
let shutdown = async {
shutdown_rx.await.unwrap();
};
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await });

tokio::time::sleep(Duration::from_millis(50)).await;

Expand Down
Loading

0 comments on commit 819ddad

Please sign in to comment.