Skip to content

Commit

Permalink
chore: update map to new proto (#8)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Aug 29, 2023
1 parent 2b44cdc commit c49c744
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 60 deletions.
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
fn main() {
tonic_build::configure()
.build_server(true)
.compile(&["proto/udf.proto", "proto/sink.proto"], &["proto"])
.compile(
&["proto/map.proto", "proto/udf.proto", "proto/sink.proto"],
&["proto"],
)
.unwrap_or_else(|e| panic!("failed to compile the proto, {:?}", e))
}
2 changes: 1 addition & 1 deletion examples/map-cat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ path = "src/main.rs"
[dependencies]
tonic = "0.9"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="main" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
26 changes: 6 additions & 20 deletions examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use numaflow::function::start_uds_server;
use numaflow::map::start_uds_server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -18,33 +18,19 @@ pub(crate) mod cat {
}
}

use numaflow::function;
use numaflow::function::{Datum, Message, Metadata};
use tokio::sync::mpsc::Receiver;
use numaflow::map;

#[tonic::async_trait]
impl function::FnHandler for Cat {
async fn map_handle<T>(&self, input: T) -> Vec<function::Message>
impl map::Mapper for Cat {
async fn map<T>(&self, input: T) -> Vec<map::Message>
where
T: function::Datum + Send + Sync + 'static,
T: map::Datum + Send + Sync + 'static,
{
vec![function::Message {
vec![map::Message {
keys: input.keys().clone(),
value: input.value().clone(),
tags: vec![],
}]
}

async fn reduce_handle<
T: Datum + Send + Sync + 'static,
U: Metadata + Send + Sync + 'static,
>(
&self,
_: Vec<String>,
_: Receiver<T>,
_: &U,
) -> Vec<Message> {
todo!()
}
}
}
2 changes: 1 addition & 1 deletion examples/map-tickgen-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.103", features = ["derive"] }
serde_json = "1.0.103"
chrono = "0.4.26"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="main" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
24 changes: 6 additions & 18 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use numaflow::function::start_uds_server;
use numaflow::map::start_uds_server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -11,7 +11,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

pub(crate) mod tickgen {
use chrono::{SecondsFormat, TimeZone, Utc};
use numaflow::function;
use numaflow::map;
use numaflow::function::{Datum, Message, Metadata};
use serde::Serialize;
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -69,17 +69,17 @@ pub(crate) mod tickgen {
}

#[tonic::async_trait]
impl function::FnHandler for TickGen {
async fn map_handle<T: function::Datum + Send + Sync + 'static>(
impl map::Mapper for TickGen {
async fn map<T: map::Datum + Send + Sync + 'static>(
&self,
input: T,
) -> Vec<function::Message> {
) -> Vec<map::Message> {
let value = input.value();
if let Ok(payload) = serde_json::from_slice::<Payload>(value) {
let ts = Utc
.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true);
vec![function::Message {
vec![map::Message {
keys: input.keys().clone(),
value: serde_json::to_vec(&ResultPayload {
value: payload.data.value,
Expand All @@ -92,17 +92,5 @@ pub(crate) mod tickgen {
vec![]
}
}

async fn reduce_handle<
T: Datum + Send + Sync + 'static,
U: Metadata + Send + Sync + 'static,
>(
&self,
_: Vec<String>,
_: Receiver<T>,
_: &U,
) -> Vec<Message> {
todo!()
}
}
}
43 changes: 43 additions & 0 deletions proto/map.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package map.v1;

service Map {
// MapFn applies a function to each map request element.
rpc MapFn(MapRequest) returns (MapResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/**
* MapRequest represents a request element.
*/
message MapRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

/**
* MapResponse represents a response element.
*/
message MapResponse {
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
6 changes: 3 additions & 3 deletions src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use user_defined_function::user_defined_function_server;
use user_defined_function::user_defined_function_server::UserDefinedFunction;
use user_defined_function::{DatumRequest, DatumResponse, DatumResponseList, ReadyResponse};

use crate::startup;
use crate::shared;

mod user_defined_function {
tonic::include_proto!("function.v1");
Expand Down Expand Up @@ -397,7 +397,7 @@ where
{
// TODO: make port configurable and pass it to info_file
let addr = "0.0.0.0:55551";
startup::write_info_file();
shared::write_info_file();

let addr: SocketAddr = addr.parse().unwrap();

Expand Down Expand Up @@ -434,7 +434,7 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: FnHandler + Send + Sync + 'static,
{
startup::write_info_file();
shared::write_info_file();

let path = "/var/run/numaflow/function.sock";
fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
Expand Down
7 changes: 5 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
//! [Reduce]: https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/
//! [User Defined Sinks]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/

// start up code
mod startup;
/// start up code
mod shared;

/// map is for writing the [map](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/) handlers.
pub mod map;

/// map and reduce for writing [map and reduce](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/) handlers.
pub mod function;
Expand Down
183 changes: 183 additions & 0 deletions src/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use chrono::{DateTime, Utc};
use tonic::{async_trait, Request, Response, Status};

use crate::map::mapper::{map_response, map_server, MapRequest, MapResponse, ReadyResponse};
use crate::shared;

mod mapper {
tonic::include_proto!("map.v1");
}

struct MapService<T> {
handler: T,
}

/// Mapper trait for implementing Map handler.
#[async_trait]
pub trait Mapper {
/// map_handle takes in an input element can can produce 0, 1, or more results. The input is a [`Datum`]
/// and the output is a [`Vec`] of [`Message`]. In a `map` function, each element is processed
/// independently and there is no state associated with the elements. More about map can be read
/// [here](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#map-udf).
///
/// # Example
///
/// Following is an example of a cat container that just copies the input to output.
///
/// ```rust
/// use numaflow::map::start_uds_server;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let map_handler = cat::Cat::new();
///
/// start_uds_server(map_handler).await?;
///
/// Ok(())
/// }
///
/// pub(crate) mod cat {
/// pub(crate) struct Cat {}
///
/// impl Cat {
/// pub(crate) fn new() -> Self {
/// Self {}
/// }
/// }
///
/// use numaflow::map;
///
/// #[tonic::async_trait]
/// impl map::Mapper for Cat {
/// async fn map<T>(&self, input: T) -> Vec<map::Message>
/// where
/// T: map::Datum + Send + Sync + 'static,
/// {
/// vec![map::Message {
/// keys: input.keys().clone(),
/// value: input.value().clone(),
/// tags: vec![],
/// }]
/// }
/// }
/// }
/// ```
async fn map<T: Datum + Send + Sync + 'static>(&self, input: T) -> Vec<Message>;
}

#[async_trait]
impl<T> map_server::Map for MapService<T>
where
T: Mapper + Send + Sync + 'static,
{
async fn map_fn(&self, request: Request<MapRequest>) -> Result<Response<MapResponse>, Status> {
let request = request.into_inner();

// call the map handle
let result = self.handler.map(OwnedMapRequest::new(request)).await;

let mut response_list = vec![];
// build the response struct
for message in result {
let datum_response = map_response::Result {
keys: message.keys,
value: message.value,
tags: message.tags,
};
response_list.push(datum_response);
}

// return the result
Ok(Response::new(MapResponse {
results: response_list,
}))
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<ReadyResponse>, Status> {
Ok(Response::new(ReadyResponse { ready: true }))
}
}

pub struct Message {
/// Keys are a collection of strings which will be passed on to the next vertex as is. It can
/// be an empty collection.
pub keys: Vec<String>,
/// Value is the value passed to the next vertex.
pub value: Vec<u8>,
/// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/).
pub tags: Vec<String>,
}

/// Datum trait represents an incoming element into the map/reduce handles of [`FnHandler`].
pub trait Datum {
/// keys are the keys in the (key, value) terminology of map/reduce paradigm.
/// Once called, it will replace the content with None, so subsequent calls will return None
fn keys(&self) -> &Vec<String>;
/// value is the value in (key, value) terminology of map/reduce paradigm.
/// Once called, it will replace the content with None, so subsequent calls will return None
fn value(&self) -> &Vec<u8>;
/// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this
/// time.
fn watermark(&self) -> DateTime<Utc>;
/// event_time is the time of the element as seen at source or aligned after a reduce operation.
fn event_time(&self) -> DateTime<Utc>;
}

/// Owned copy of MapRequest from Datum.
struct OwnedMapRequest {
keys: Vec<String>,
value: Vec<u8>,
watermark: DateTime<Utc>,
eventtime: DateTime<Utc>,
}

impl OwnedMapRequest {
fn new(mr: MapRequest) -> Self {
Self {
keys: mr.keys,
value: mr.value,
watermark: shared::utc_from_timestamp(mr.watermark),
eventtime: shared::utc_from_timestamp(mr.event_time),
}
}
}

impl Datum for OwnedMapRequest {
fn keys(&self) -> &Vec<String> {
&self.keys
}

fn value(&self) -> &Vec<u8> {
&self.value
}

fn watermark(&self) -> DateTime<Utc> {
self.watermark
}

fn event_time(&self) -> DateTime<Utc> {
self.eventtime
}
}

pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: Mapper + Send + Sync + 'static,
{
shared::write_info_file();

let path = "/var/run/numaflow/map.sock";
std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;

let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);

let map_svc = MapService { handler: m };

tonic::transport::Server::builder()
.add_service(map_server::MapServer::new(map_svc))
.serve_with_incoming(_uds_stream)
.await?;

Ok(())
}
Loading

0 comments on commit c49c744

Please sign in to comment.