Skip to content

Commit

Permalink
feat: Bidirectional Streaming for Source (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 authored Sep 14, 2024
1 parent d3afabd commit 8e170f4
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 165 deletions.
8 changes: 3 additions & 5 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ pub(crate) mod simple_source {
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offsets: Vec<Offset>) {
for offset in offsets {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
}
async fn ack(&self, offset: Offset) {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
}

async fn pending(&self) -> usize {
Expand Down
48 changes: 36 additions & 12 deletions proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ package source.v1;

service Source {
// Read returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in ReadRequest.
// If the request timeout is reached on server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
rpc ReadFn(ReadRequest) returns (stream ReadResponse);
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a list of datum offsets.
// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request,
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(AckRequest) returns (AckResponse);
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(stream AckRequest) returns (AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);
Expand Down Expand Up @@ -60,9 +61,35 @@ message ReadResponse {
// We add this optional field to support the use case where the user defined source can provide keys for the datum.
// e.g. Kafka and Redis Stream message usually include information about the keys.
repeated string keys = 4;
// Optional list of headers associated with the datum.
// Headers are the metadata associated with the datum.
// e.g. Kafka and Redis Stream message usually include information about the headers.
map<string, string> headers = 5;
}
// Status describes the outcome of a read response: an end of transmission flag,
// a success/failure code, an error type, and an optional message.
message Status {
// Code to indicate the status of the response.
enum Code {
SUCCESS = 0;
FAILURE = 1;
}

// Error to indicate the error type. If the code is FAILURE, then the error field will be populated.
// NOTE(review): UNACKED is the zero value, so an unset error field presumably also reads as UNACKED;
// confirm that consumers check `code` before interpreting `error`.
enum Error {
UNACKED = 0;
OTHER = 1;
}

// End of transmission flag.
bool eot = 1;
// Whether the response represents a SUCCESS or a FAILURE.
Code code = 2;
// Error type; per the note on the enum above, populated when code is FAILURE.
Error error = 3;
// Optional free-form message. NOTE(review): presumably carries error details; confirm with the server implementation.
optional string msg = 4;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag, the status code,
// and the error type with an optional message.
Status status = 2;
}

/*
Expand All @@ -71,11 +98,8 @@ message ReadResponse {
*/
message AckRequest {
message Request {
// Required field holding a list of offsets to be acknowledged.
// The offsets must be strictly corresponding to the previously read batch,
// meaning the offsets must be in the same order as the datum responses in the ReadResponse.
// By enforcing ordering, we can save deserialization effort on the server side, assuming the server keeps a local copy of the raw/un-serialized offsets.
repeated Offset offsets = 1;
// Required field holding the offset to be acked
Offset offset = 1;
}
// Required field holding the request. Each request carries a single offset to be acknowledged.
Request request = 1;
Expand Down Expand Up @@ -146,4 +170,4 @@ message Offset {
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
int32 partition_id = 2;
}
}
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use thiserror::Error;

/// Shorthand for `std::result::Result` with the error type fixed to `Error`.
// NOTE(review): `Error` is presumably the error type declared later in this file; it is not visible in this excerpt.
pub type Result<T> = std::result::Result<T, Error>;

#[derive(Error, Debug, Clone)]
pub enum ErrorKind {
#[error("User Defined Error: {0}")]
Expand Down
Loading

0 comments on commit 8e170f4

Please sign in to comment.