diff --git a/build.rs b/build.rs index 4acd84a..21a3ce7 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,9 @@ fn main() { tonic_build::configure() .build_server(true) - .compile(&["proto/udf.proto", "proto/sink.proto"], &["proto"]) + .compile( + &["proto/map.proto", "proto/udf.proto", "proto/sink.proto"], + &["proto"], + ) .unwrap_or_else(|e| panic!("failed to compile the proto, {:?}", e)) } diff --git a/examples/map-cat/Cargo.toml b/examples/map-cat/Cargo.toml index f6fa75f..adf8a98 100644 --- a/examples/map-cat/Cargo.toml +++ b/examples/map-cat/Cargo.toml @@ -11,4 +11,4 @@ path = "src/main.rs" [dependencies] tonic = "0.9" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } -numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="main" } \ No newline at end of file +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" } diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index 768e370..a44bd3f 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -1,4 +1,4 @@ -use numaflow::function::start_uds_server; +use numaflow::map::start_uds_server; #[tokio::main] async fn main() -> Result<(), Box> { @@ -18,33 +18,19 @@ pub(crate) mod cat { } } - use numaflow::function; - use numaflow::function::{Datum, Message, Metadata}; - use tokio::sync::mpsc::Receiver; + use numaflow::map; #[tonic::async_trait] - impl function::FnHandler for Cat { - async fn map_handle(&self, input: T) -> Vec + impl map::Mapper for Cat { + async fn map(&self, input: T) -> Vec where - T: function::Datum + Send + Sync + 'static, + T: map::Datum + Send + Sync + 'static, { - vec![function::Message { + vec![map::Message { keys: input.keys().clone(), value: input.value().clone(), tags: vec![], }] } - - async fn reduce_handle< - T: Datum + Send + Sync + 'static, - U: Metadata + Send + Sync + 'static, - >( - &self, - _: Vec, - _: Receiver, - _: &U, - ) -> Vec { - todo!() - } } } diff --git 
a/examples/map-tickgen-serde/Cargo.toml b/examples/map-tickgen-serde/Cargo.toml index 4d32aaf..b3ffc66 100644 --- a/examples/map-tickgen-serde/Cargo.toml +++ b/examples/map-tickgen-serde/Cargo.toml @@ -13,4 +13,4 @@ tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } serde = { version = "1.0.103", features = ["derive"] } serde_json = "1.0.103" chrono = "0.4.26" -numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="main" } +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" } diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index f3cf7ea..78fefb1 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -1,4 +1,4 @@ -use numaflow::function::start_uds_server; +use numaflow::map::start_uds_server; #[tokio::main] async fn main() -> Result<(), Box> { @@ -11,7 +11,7 @@ async fn main() -> Result<(), Box> { pub(crate) mod tickgen { use chrono::{SecondsFormat, TimeZone, Utc}; - use numaflow::function; + use numaflow::map; use numaflow::function::{Datum, Message, Metadata}; use serde::Serialize; use tokio::sync::mpsc::Receiver; @@ -69,17 +69,17 @@ pub(crate) mod tickgen { } #[tonic::async_trait] - impl function::FnHandler for TickGen { - async fn map_handle( + impl map::Mapper for TickGen { + async fn map( &self, input: T, - ) -> Vec { + ) -> Vec { let value = input.value(); if let Ok(payload) = serde_json::from_slice::(value) { let ts = Utc .timestamp_nanos(payload.created_ts) .to_rfc3339_opts(SecondsFormat::Nanos, true); - vec![function::Message { + vec![map::Message { keys: input.keys().clone(), value: serde_json::to_vec(&ResultPayload { value: payload.data.value, @@ -92,17 +92,5 @@ pub(crate) mod tickgen { vec![] } } - - async fn reduce_handle< - T: Datum + Send + Sync + 'static, - U: Metadata + Send + Sync + 'static, - >( - &self, - _: Vec, - _: Receiver, - _: &U, - ) -> Vec { - todo!() - } } } diff --git 
a/proto/map.proto b/proto/map.proto new file mode 100644 index 0000000..a8ab49b --- /dev/null +++ b/proto/map.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +package map.v1; + +service Map { + // MapFn applies a function to a single MapRequest and returns all of its results in one MapResponse. + rpc MapFn(MapRequest) returns (MapResponse); + + // IsReady is the heartbeat endpoint for gRPC. + rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); +} + +/** + * MapRequest represents a request element: its keys, payload value, event time and watermark. + */ +message MapRequest { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; +} + +/** + * MapResponse represents the response to one request; it carries a list of results (keys, value, tags). + */ +message MapResponse { + message Result { + repeated string keys = 1; + bytes value = 2; + repeated string tags = 3; + } + repeated Result results = 1; +} + +/** + * ReadyResponse is the health check result. + */ +message ReadyResponse { + bool ready = 1; +} \ No newline at end of file diff --git a/src/function.rs b/src/function.rs index 41b9fe5..c6a7edd 100644 --- a/src/function.rs +++ b/src/function.rs @@ -15,7 +15,7 @@ use user_defined_function::user_defined_function_server; use user_defined_function::user_defined_function_server::UserDefinedFunction; use user_defined_function::{DatumRequest, DatumResponse, DatumResponseList, ReadyResponse}; -use crate::startup; +use crate::shared; mod user_defined_function { tonic::include_proto!("function.v1"); @@ -397,7 +397,7 @@ where { // TODO: make port configurable and pass it to info_file let addr = "0.0.0.0:55551"; - startup::write_info_file(); + shared::write_info_file(); let addr: SocketAddr = addr.parse().unwrap(); @@ -434,7 +434,7 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: FnHandler + Send + Sync + 'static, { - startup::write_info_file(); + shared::write_info_file(); let path = "/var/run/numaflow/function.sock"; 
fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; diff --git a/src/lib.rs b/src/lib.rs index 13a8e58..fd0fef9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,11 @@ //! [Reduce]: https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/ //! [User Defined Sinks]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/ -// start up code -mod startup; +/// shared helpers: start-up info file and timestamp conversion +mod shared; + +/// map is for writing the [map](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/) handlers. +pub mod map; /// map and reduce for writing [map and reduce](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/) handlers. pub mod function; diff --git a/src/map.rs b/src/map.rs new file mode 100644 index 0000000..0012594 --- /dev/null +++ b/src/map.rs @@ -0,0 +1,183 @@ +use chrono::{DateTime, Utc}; +use tonic::{async_trait, Request, Response, Status}; + +use crate::map::mapper::{map_response, map_server, MapRequest, MapResponse, ReadyResponse}; +use crate::shared; + +mod mapper { + tonic::include_proto!("map.v1"); +} + +struct MapService { + handler: T, +} + +/// Mapper trait for implementing Map handler. +#[async_trait] +pub trait Mapper { + /// map takes in an input element and can produce 0, 1, or more results. The input is a [`Datum`] + /// and the output is a [`Vec`] of [`Message`]. In a `map` function, each element is processed + /// independently and there is no state associated with the elements. More about map can be read + /// [here](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#map-udf). + /// + /// # Example + /// + /// Following is an example of a cat container that just copies the input to output. 
+ /// + /// ```rust + /// use numaflow::map::start_uds_server; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let map_handler = cat::Cat::new(); + /// + /// start_uds_server(map_handler).await?; + /// + /// Ok(()) + /// } + /// + /// pub(crate) mod cat { + /// pub(crate) struct Cat {} + /// + /// impl Cat { + /// pub(crate) fn new() -> Self { + /// Self {} + /// } + /// } + /// + /// use numaflow::map; + /// + /// #[tonic::async_trait] + /// impl map::Mapper for Cat { + /// async fn map(&self, input: T) -> Vec + /// where + /// T: map::Datum + Send + Sync + 'static, + /// { + /// vec![map::Message { + /// keys: input.keys().clone(), + /// value: input.value().clone(), + /// tags: vec![], + /// }] + /// } + /// } + /// } + /// ``` + async fn map(&self, input: T) -> Vec; +} + +#[async_trait] +impl map_server::Map for MapService +where + T: Mapper + Send + Sync + 'static, +{ + async fn map_fn(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + + // hand the request to the user-defined handler, wrapped as an owned Datum + let result = self.handler.map(OwnedMapRequest::new(request)).await; + + let mut response_list = vec![]; + // convert each Message returned by the handler into a gRPC map_response::Result + for message in result { + let datum_response = map_response::Result { + keys: message.keys, + value: message.value, + tags: message.tags, + }; + response_list.push(datum_response); + } + + // wrap all converted results into a single MapResponse + Ok(Response::new(MapResponse { + results: response_list, + })) + } + + async fn is_ready(&self, _: Request<()>) -> Result, Status> { + Ok(Response::new(ReadyResponse { ready: true })) + } +} + +pub struct Message { + /// Keys are a collection of strings which will be passed on to the next vertex as is. It can + /// be an empty collection. + pub keys: Vec, + /// Value is the value passed to the next vertex. + pub value: Vec, + /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). 
+ pub tags: Vec, +} + +/// Datum trait represents an incoming element into the map handle of [`Mapper`]. +pub trait Datum { + /// keys are the keys in the (key, value) terminology of map/reduce paradigm. + /// The keys are returned by reference and are not consumed by the call. + fn keys(&self) -> &Vec; + /// value is the value in (key, value) terminology of map/reduce paradigm. + /// The value is returned by reference and is not consumed by the call. + fn value(&self) -> &Vec; + /// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this + /// time. + fn watermark(&self) -> DateTime; + /// event_time is the time of the element as seen at source or aligned after a reduce operation. + fn event_time(&self) -> DateTime; +} + +/// Owned copy of the gRPC MapRequest, exposed to handlers through the Datum trait. +struct OwnedMapRequest { + keys: Vec, + value: Vec, + watermark: DateTime, + eventtime: DateTime, +} + +impl OwnedMapRequest { + fn new(mr: MapRequest) -> Self { + Self { + keys: mr.keys, + value: mr.value, + watermark: shared::utc_from_timestamp(mr.watermark), + eventtime: shared::utc_from_timestamp(mr.event_time), + } + } +} + +impl Datum for OwnedMapRequest { + fn keys(&self) -> &Vec { + &self.keys + } + + fn value(&self) -> &Vec { + &self.value + } + + fn watermark(&self) -> DateTime { + self.watermark + } + + fn event_time(&self) -> DateTime { + self.eventtime + } +} + +pub async fn start_uds_server(m: T) -> Result<(), Box> +where + T: Mapper + Send + Sync + 'static, +{ + shared::write_info_file(); + + let path = "/var/run/numaflow/map.sock"; + std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; + + let uds = tokio::net::UnixListener::bind(path)?; + let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); + + let map_svc = MapService { handler: m }; + + tonic::transport::Server::builder() + 
.add_service(map_server::MapServer::new(map_svc)) + .serve_with_incoming(_uds_stream) + .await?; + + Ok(()) +} diff --git a/src/startup.rs b/src/shared.rs similarity index 73% rename from src/startup.rs rename to src/shared.rs index 0e86a92..ce476af 100644 --- a/src/startup.rs +++ b/src/shared.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; use std::fs; +use chrono::{DateTime, TimeZone, Utc}; +use prost_types::Timestamp; + pub(crate) fn write_info_file() { let path = if let Some(_) = std::env::var_os("NUMAFLOW_POD") { "/var/run/numaflow/server-info" @@ -23,3 +26,15 @@ pub(crate) fn write_info_file() { println!("wrote to {} {}", path, content); fs::write(path, content).unwrap(); } + +/// Converts an optional protobuf timestamp into a UTC `DateTime`. +/// A present timestamp is read as `seconds` since the Unix epoch plus `nanos` nanoseconds; +/// a missing timestamp maps to -1ns, i.e. one nanosecond before the epoch. +pub(crate) fn utc_from_timestamp(t: Option) -> DateTime { + if let Some(ref t) = t { + // total nanoseconds = seconds * 1e9 + nanos; saturating so extreme inputs cannot overflow + Utc.timestamp_nanos(t.seconds.saturating_mul(1_000_000_000).saturating_add(i64::from(t.nanos))) + } else { + Utc.timestamp_nanos(-1) + } +} diff --git a/src/sink.rs b/src/sink.rs index 125de07..521a96f 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,5 +1,4 @@ -use chrono::{DateTime, TimeZone, Utc}; -use prost_types::Timestamp; +use chrono::{DateTime, Utc}; use tokio::sync::mpsc; use tonic::transport::Server; use tonic::{Request, Status, Streaming}; @@ -7,8 +6,8 @@ use tonic::{Request, Status, Streaming}; use sinker_grpc::sink_server::SinkServer; use sinker_grpc::{ReadyResponse, SinkRequest, SinkResponse}; +use crate::shared; use crate::sink::sinker_grpc::sink_server::Sink; -use crate::startup; mod sinker_grpc { tonic::include_proto!("sink.v1"); @@ -123,6 +122,7 @@ pub trait Datum { fn id(&self) -> &str; } +/// Owned copy of SinkRequest from tonic. 
struct OwnedSinkRequest { keys: Vec, value: Vec, @@ -131,21 +131,13 @@ struct OwnedSinkRequest { id: String, } -fn utc_from_timestamp(t: Option) -> DateTime { - if let Some(ref t) = t { - Utc.timestamp_nanos(t.seconds * (t.nanos as i64)) - } else { - Utc.timestamp_nanos(-1) - } -} - impl OwnedSinkRequest { fn new(sr: SinkRequest) -> Self { Self { keys: sr.keys, value: sr.value, - watermark: utc_from_timestamp(sr.watermark), - eventtime: utc_from_timestamp(sr.event_time), + watermark: shared::utc_from_timestamp(sr.watermark), + eventtime: shared::utc_from_timestamp(sr.event_time), id: sr.id, } } @@ -233,7 +225,7 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: Sinker + Send + Sync + 'static, { - startup::write_info_file(); + shared::write_info_file(); let path = "/var/run/numaflow/sink.sock"; fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;