Skip to content

Commit

Permalink
chorre: fix code readability
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Sep 13, 2024
1 parent c117aa9 commit 205517a
Showing 1 changed file with 86 additions and 62 deletions.
148 changes: 86 additions & 62 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use std::time::Duration;
use crate::error::Error::SourceError;
use crate::error::{Error, ErrorKind};
use crate::shared::{self, prost_timestamp_from_utc};
use crate::source::proto::{AckRequest, AckResponse, ReadRequest};
use crate::source::proto::{AckRequest, AckResponse, ReadRequest, ReadResponse};
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{error, info};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/source.sock";
Expand Down Expand Up @@ -80,12 +81,88 @@ pub struct Offset {
pub partition_id: i32,
}

impl<T> SourceService<T>
where
T: Sourcer + Send + Sync + 'static,
{
async fn write_a_batch(
grpc_resp_tx: Sender<Result<ReadResponse, Status>>,
mut srx: Receiver<Message>,
) -> crate::error::Result<()> {
// even though we use bi-di; the user-defined source sees this as a 1/2 duplex
// server side streaming. this means that the below while loop will terminate
// after every batch of read has been returned.
while let Some(resp) = srx.recv().await {
grpc_resp_tx
.send(Ok(ReadResponse {
result: Some(proto::read_response::Result {
payload: resp.value,
offset: Some(proto::Offset {
offset: resp.offset.offset,
partition_id: resp.offset.partition_id,
}),
event_time: prost_timestamp_from_utc(resp.event_time),
keys: resp.keys,
headers: Default::default(),
}),
status: None,
}))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;
}

// send end of transmission on success
grpc_resp_tx
.send(Ok(ReadResponse {
result: None,
status: Some(proto::read_response::Status {
eot: true,
code: 0,
error: 0,
msg: None,
}),
}))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;

Ok(())
}

async fn forward_a_batch(
handler_fn: Arc<T>,
grpc_resp_tx: Sender<Result<ReadResponse, Status>>,
stx: Sender<Message>,
srx: Receiver<Message>,
request: proto::read_request::Request,
) -> crate::error::Result<()> {
let grpc_writer_handle: JoinHandle<Result<(), Error>> =
tokio::spawn(async move { Self::write_a_batch(grpc_resp_tx, srx).await });

handler_fn
.read(
SourceReadRequest {
count: request.num_records as usize,
timeout: Duration::from_millis(request.timeout_in_ms as u64),
},
stx,
)
.await;

grpc_writer_handle
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;

Ok(())
}
}

#[async_trait]
impl<T> proto::source_server::Source for SourceService<T>
where
T: Sourcer + Send + Sync + 'static,
{
type ReadFnStream = ReceiverStream<Result<proto::ReadResponse, Status>>;
type ReadFnStream = ReceiverStream<Result<ReadResponse, Status>>;

async fn read_fn(
&self,
Expand All @@ -111,72 +188,18 @@ where
.ok_or_else(|| SourceError(ErrorKind::InternalError("Stream closed".to_string())))?;

// tx,rx pair for sending data over to user-defined source
let (stx, mut srx) = mpsc::channel::<Message>(DEFAULT_CHANNEL_SIZE);
let (stx, srx) = mpsc::channel::<Message>(DEFAULT_CHANNEL_SIZE);

let Some(request) = read_request.request else {
panic!("request cannot be empty");
};
let request = read_request.request.ok_or_else(|| SourceError(ErrorKind::InternalError("Stream closed".to_string())))?;

// start the ud-source rx asynchronously and start populating the gRPC
// response, so it can be streamed to the gRPC client (numaflow).
let grpc_resp_tx = grpc_tx.clone();
let grpc_writer_handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
// even though we use bi-di; the user-defined source sees this as a 1/2 duplex
// server side streaming. this means that the below while loop will terminate
// after every batch of read has been returned.
while let Some(resp) = srx.recv().await {
grpc_resp_tx
.send(Ok(proto::ReadResponse {
result: Some(proto::read_response::Result {
payload: resp.value,
offset: Some(proto::Offset {
offset: resp.offset.offset,
partition_id: resp.offset.partition_id,
}),
event_time: prost_timestamp_from_utc(resp.event_time),
keys: resp.keys,
headers: Default::default(),
}),
status: None,
}))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;
}

// send end of transmission on success
grpc_resp_tx
.send(Ok(proto::ReadResponse {
result: None,
status: Some(proto::read_response::Status {
eot: true,
code: 0,
error: 0,
msg: None,
}),
}))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;

Ok(())
});

handler_fn
.read(
SourceReadRequest {
count: request.num_records as usize,
timeout: Duration::from_millis(request.timeout_in_ms as u64),
},
stx,
)
.await;

grpc_writer_handle
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;

Self::forward_a_batch(handler_fn.clone(), grpc_resp_tx, stx, srx, request).await?
}
_ = cln_token.cancelled() => {
eprintln!("Cancellation token triggered, shutting down");
info!("Cancellation token triggered, shutting down");
break;
}
}
Expand All @@ -188,6 +211,7 @@ where
tokio::spawn(async move {
// wait for grpc read handle, if there are any errors write to the grpc response channel
if let Err(e) = grpc_read_handle.await {
error!("shutting down the gRPC channel, {}", e);
tx.send(Err(Status::internal(e.to_string())))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))
Expand Down

0 comments on commit 205517a

Please sign in to comment.