Skip to content

Commit

Permalink
chore: Batch ack requests and sink responses for better performance (#…
Browse files Browse the repository at this point in the history
…100)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 30, 2024
1 parent 9fb3c0a commit b59dc12
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 43 deletions.
8 changes: 5 additions & 3 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ pub(crate) mod simple_source {
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offset: Offset) {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
async fn ack(&self, offset: Vec<Offset>) {
for offset in offset {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
}
}

async fn pending(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion proto/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Result result = 1;
repeated Result results = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
2 changes: 1 addition & 1 deletion proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ message ReadResponse {
message AckRequest {
message Request {
// Required field holding the offset to be acked
Offset offset = 1;
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
4 changes: 2 additions & 2 deletions src/servers/sink.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pub struct TransmissionStatus {
/// SinkResponse is the individual response of each message written to the sink.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkResponse {
#[prost(message, optional, tag = "1")]
pub result: ::core::option::Option<sink_response::Result>,
#[prost(message, repeated, tag = "1")]
pub results: ::prost::alloc::vec::Vec<sink_response::Result>,
#[prost(message, optional, tag = "2")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "3")]
Expand Down
4 changes: 2 additions & 2 deletions src/servers/source.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ pub mod ack_request {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Request {
/// Required field holding the offset to be acked
#[prost(message, optional, tag = "1")]
pub offset: ::core::option::Option<super::Offset>,
#[prost(message, repeated, tag = "1")]
pub offsets: ::prost::alloc::vec::Vec<super::Offset>,
}
}
///
Expand Down
40 changes: 20 additions & 20 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ where
// loop until the global stream has been shutdown.
let mut global_stream_ended = false;
while !global_stream_ended {
let start = std::time::Instant::now();
// for every batch, we need to read from the stream. The end-of-batch is
// encoded in the request.
global_stream_ended = Self::process_sink_batch(
Expand All @@ -240,6 +241,7 @@ where
grpc_resp_tx.clone(),
)
.await?;
println!("Time taken for batch: {:?}", start.elapsed().as_micros());
}
Ok(())
}
Expand All @@ -259,20 +261,19 @@ where
// spawn the UDF
let sinker_handle = tokio::spawn(async move {
let responses = sink_handle.sink(rx).await;
for response in responses {
resp_tx
.send(Ok(SinkResponse {
result: Some(response.into()),
handshake: None,
status: None,
}))
.await
.expect("Sending response to channel");
}
resp_tx
.send(Ok(SinkResponse {
results: responses.into_iter().map(|r| r.into()).collect(),
handshake: None,
status: None,
}))
.await
.expect("Sending response to channel");

// send an EOT message to the client to indicate the end of transmission for this batch
resp_tx
.send(Ok(SinkResponse {
result: None,
results: vec![],
handshake: None,
status: Some(sink_pb::TransmissionStatus { eot: true }),
}))
Expand Down Expand Up @@ -385,7 +386,7 @@ where
if let Some(handshake) = handshake_request.handshake {
resp_tx
.send(Ok(SinkResponse {
result: None,
results: vec![],
handshake: Some(handshake),
status: None,
}))
Expand Down Expand Up @@ -641,32 +642,31 @@ mod tests {
let mut resp_stream = resp.into_inner();
// handshake response
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.handshake.is_some());

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_some());
let msg = &resp.result.unwrap();
assert!(!resp.results.is_empty());
let msg = &resp.results.get(0).unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "1");

// eot for first request
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.status.unwrap();
assert!(msg.eot);

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_some());
assert!(!resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.result.unwrap();
let msg = &resp.results.get(0).unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "2");

// eot for second request
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.status.unwrap();
assert!(msg.eot);
Expand Down Expand Up @@ -773,7 +773,7 @@ mod tests {

// handshake response
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.results.is_empty());
assert!(resp.handshake.is_some());

let err_resp = resp_stream.message().await;
Expand Down
32 changes: 18 additions & 14 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub trait Sourcer {
/// Reads the messages from the source and sends them to the transmitter.
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>);
/// Acknowledges the message that has been processed by the user-defined source.
async fn ack(&self, offset: Offset);
async fn ack(&self, offset: Vec<Offset>);
/// Returns the number of messages that are yet to be processed by the user-defined source.
async fn pending(&self) -> usize;
/// Returns the partitions associated with the source. This will be used by the platform to determine
Expand Down Expand Up @@ -275,14 +275,16 @@ where
let request = ack_request.request
.ok_or_else(|| SourceError(ErrorKind::InternalError("Invalid request, request can't be empty".to_string())))?;

let offset = request.offset
.ok_or_else(|| SourceError(ErrorKind::InternalError("Invalid request, offset can't be empty".to_string())))?;

handler_fn
.ack(Offset {
offset: offset.offset,
let offsets = request.offsets
.iter()
.map(|offset| Offset {
offset: offset.offset.clone(),
partition_id: offset.partition_id,
})
.collect();

handler_fn
.ack(offsets)
.await;

// the return of handler_fn implicitly means that the ack is successful; hence
Expand Down Expand Up @@ -602,11 +604,13 @@ mod tests {
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offset: Offset) {
self.yet_to_ack
.write()
.unwrap()
.remove(&String::from_utf8(offset.offset).unwrap());
async fn ack(&self, offset: Vec<Offset>) {
for offset in offset {
self.yet_to_ack
.write()
.unwrap()
.remove(&String::from_utf8(offset.offset).unwrap());
}
}

async fn pending(&self) -> usize {
Expand Down Expand Up @@ -705,10 +709,10 @@ mod tests {
for resp in response_values.iter() {
let ack_request = proto::AckRequest {
request: Some(proto::ack_request::Request {
offset: Some(proto::Offset {
offsets: vec![proto::Offset {
offset: resp.offset.clone().unwrap().offset,
partition_id: resp.offset.clone().unwrap().partition_id,
}),
}],
}),
handshake: None,
};
Expand Down

0 comments on commit b59dc12

Please sign in to comment.