Skip to content

Commit

Permalink
Added Grpc server stream result
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Aug 9, 2024
1 parent e925bfa commit 8234052
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
4 changes: 4 additions & 0 deletions my-grpc-extensions/src/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use tokio::sync::mpsc::error::SendTimeoutError;
#[cfg(not(feature = "adjust-server-stream"))]
const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_secs(30);

pub type SendStream<TDest> = tonic::Response<
Pin<Box<dyn futures_util::Stream<Item = Result<TDest, tonic::Status>> + Send + Sync + 'static>>,
>;

pub async fn create_empty_stream<TDest>() -> Result<
tonic::Response<
Pin<
Expand Down
3 changes: 3 additions & 0 deletions my-grpc-extensions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ mod ssh;
pub use ssh::*;
#[cfg(feature = "with-ssh")]
pub extern crate my_ssh;

#[cfg(feature = "grpc-server")]
pub mod server_stream_result;
27 changes: 27 additions & 0 deletions my-grpc-extensions/src/server_stream_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::pin::Pin;

use tokio::sync::mpsc::Sender;

use crate::grpc_server::SendStream;

pub struct GrpcServerStreamResult<TModel: Send + Sync + 'static> {
tx: Sender<Result<TModel, tonic::Status>>,
}

impl<TModel: Send + Sync + 'static> GrpcServerStreamResult<TModel> {
pub fn new() -> (Self, SendStream<TModel>) {
let (tx, rx) = tokio::sync::mpsc::channel(32768);

let output_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let response: Pin<
Box<dyn futures::Stream<Item = Result<TModel, tonic::Status>> + Send + Sync + 'static>,
> = Box::pin(output_stream);

let result = tonic::Response::new(response);
(Self { tx }, result)
}

pub async fn send_item(&mut self, item: TModel) {
self.tx.send(Ok(item)).await.unwrap()
}
}

0 comments on commit 8234052

Please sign in to comment.