Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Batch ack requests and sink responses for better performance #100

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ pub(crate) mod simple_source {
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offset: Offset) {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
/// Acknowledges a batch of offsets by removing each one from `yet_to_ack`.
///
/// Each offset's bytes are decoded as UTF-8 to form the key that is removed.
///
/// # Panics
///
/// Panics if an offset is not valid UTF-8 or if the `yet_to_ack` lock is poisoned.
async fn ack(&self, offset: Vec<Offset>) {
// Take the write lock once for the whole batch instead of once per offset.
// There is no `.await` while the guard is alive, so it is never held across
// a suspension point.
let mut pending = self.yet_to_ack.write().unwrap();
for acked in offset {
let key = String::from_utf8(acked.offset).unwrap();
pending.remove(&key);
}
}

async fn pending(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion proto/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Result result = 1;
repeated Result results = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
2 changes: 1 addition & 1 deletion proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ message ReadResponse {
message AckRequest {
message Request {
// Required field holding the offsets to be acked
Offset offset = 1;
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
4 changes: 2 additions & 2 deletions src/servers/sink.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pub struct TransmissionStatus {
/// SinkResponse carries the results for a batch of messages written to the sink.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkResponse {
#[prost(message, optional, tag = "1")]
pub result: ::core::option::Option<sink_response::Result>,
#[prost(message, repeated, tag = "1")]
pub results: ::prost::alloc::vec::Vec<sink_response::Result>,
#[prost(message, optional, tag = "2")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "3")]
Expand Down
4 changes: 2 additions & 2 deletions src/servers/source.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ pub mod ack_request {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Request {
/// Required field holding the offsets to be acked
#[prost(message, optional, tag = "1")]
pub offset: ::core::option::Option<super::Offset>,
#[prost(message, repeated, tag = "1")]
pub offsets: ::prost::alloc::vec::Vec<super::Offset>,
}
}
///
Expand Down
40 changes: 20 additions & 20 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ where
// loop until the global stream has been shutdown.
let mut global_stream_ended = false;
while !global_stream_ended {
let start = std::time::Instant::now();
// for every batch, we need to read from the stream. The end-of-batch is
// encoded in the request.
global_stream_ended = Self::process_sink_batch(
Expand All @@ -240,6 +241,7 @@ where
grpc_resp_tx.clone(),
)
.await?;
println!("Time taken for batch: {:?}", start.elapsed().as_micros());
}
Ok(())
}
Expand All @@ -259,20 +261,19 @@ where
// spawn the UDF
let sinker_handle = tokio::spawn(async move {
let responses = sink_handle.sink(rx).await;
for response in responses {
resp_tx
.send(Ok(SinkResponse {
result: Some(response.into()),
handshake: None,
status: None,
}))
.await
.expect("Sending response to channel");
}
resp_tx
.send(Ok(SinkResponse {
results: responses.into_iter().map(|r| r.into()).collect(),
handshake: None,
status: None,
}))
.await
.expect("Sending response to channel");

// send an EOT message to the client to indicate the end of transmission for this batch
resp_tx
.send(Ok(SinkResponse {
result: None,
results: vec![],
handshake: None,
status: Some(sink_pb::TransmissionStatus { eot: true }),
}))
Expand Down Expand Up @@ -385,7 +386,7 @@ where
if let Some(handshake) = handshake_request.handshake {
resp_tx
.send(Ok(SinkResponse {
result: None,
results: vec![],
handshake: Some(handshake),
status: None,
}))
Expand Down Expand Up @@ -641,32 +642,31 @@ mod tests {
let mut resp_stream = resp.into_inner();
// handshake response
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.handshake.is_some());

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_some());
let msg = &resp.result.unwrap();
assert!(!resp.results.is_empty());
let msg = &resp.results.get(0).unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "1");

// eot for first request
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.status.unwrap();
assert!(msg.eot);

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_some());
assert!(!resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.result.unwrap();
let msg = &resp.results.get(0).unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "2");

// eot for second request
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.status.unwrap();
assert!(msg.eot);
Expand Down Expand Up @@ -773,7 +773,7 @@ mod tests {

// handshake response
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.results.is_empty());
assert!(resp.handshake.is_some());

let err_resp = resp_stream.message().await;
Expand Down
32 changes: 18 additions & 14 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub trait Sourcer {
/// Reads the messages from the source and sends them to the transmitter.
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>);
/// Acknowledges the messages at the given offsets that have been processed by the user-defined source.
async fn ack(&self, offset: Offset);
async fn ack(&self, offset: Vec<Offset>);
/// Returns the number of messages that are yet to be processed by the user-defined source.
async fn pending(&self) -> usize;
/// Returns the partitions associated with the source. This will be used by the platform to determine
Expand Down Expand Up @@ -275,14 +275,16 @@ where
let request = ack_request.request
.ok_or_else(|| SourceError(ErrorKind::InternalError("Invalid request, request can't be empty".to_string())))?;

let offset = request.offset
.ok_or_else(|| SourceError(ErrorKind::InternalError("Invalid request, offset can't be empty".to_string())))?;

handler_fn
.ack(Offset {
offset: offset.offset,
let offsets = request.offsets
.iter()
.map(|offset| Offset {
offset: offset.offset.clone(),
partition_id: offset.partition_id,
})
.collect();

handler_fn
.ack(offsets)
.await;

// the return of handler_fn implicitly means that the ack is successful; hence
Expand Down Expand Up @@ -602,11 +604,13 @@ mod tests {
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offset: Offset) {
self.yet_to_ack
.write()
.unwrap()
.remove(&String::from_utf8(offset.offset).unwrap());
/// Acknowledges a batch of offsets by removing each one from `yet_to_ack`.
///
/// # Panics
///
/// Panics if an offset is not valid UTF-8 or if the `yet_to_ack` lock is poisoned.
async fn ack(&self, offset: Vec<Offset>) {
// Acquire the write lock once for the whole batch rather than per offset;
// nothing is awaited while the guard is held.
let mut pending = self.yet_to_ack.write().unwrap();
for acked in offset {
pending.remove(&String::from_utf8(acked.offset).unwrap());
}
}

async fn pending(&self) -> usize {
Expand Down Expand Up @@ -705,10 +709,10 @@ mod tests {
for resp in response_values.iter() {
let ack_request = proto::AckRequest {
request: Some(proto::ack_request::Request {
offset: Some(proto::Offset {
offsets: vec![proto::Offset {
offset: resp.offset.clone().unwrap().offset,
partition_id: resp.offset.clone().unwrap().partition_id,
}),
}],
}),
handshake: None,
};
Expand Down
Loading