Skip to content

Commit

Permalink
Silence send errors in network callbacks when receiver dropped (typed…
Browse files Browse the repository at this point in the history
…b#526)

## Release notes: usage and product changes

Downgrade the "channel closed" `SendError` from ERROR to DEBUG when the
receiving end of the stream is dropped before the stream is exhausted.
  • Loading branch information
dmitrii-ubskii committed Jan 8, 2024
1 parent 7a9f772 commit eec033f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
6 changes: 3 additions & 3 deletions rust/src/common/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ error_messages! { InternalError
RecvError() =
1: "Channel is closed.",
SendError() =
2: "Channel is closed.",
2: "Unable to send response over callback channel (receiver dropped).",
UnexpectedRequestType(String) =
3: "Unexpected request type for remote procedure call: {}.",
UnexpectedResponseType(String) =
Expand Down Expand Up @@ -215,8 +215,8 @@ impl From<tonic::transport::Error> for Error {
}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(err: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Other(err.to_string())
fn from(_err: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Internal(InternalError::SendError())
}
}

Expand Down
14 changes: 9 additions & 5 deletions rust/src/connection/network/transmitter/response_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/

use crossbeam::channel::Sender as SyncSender;
use log::error;
use log::{debug, error};
use tokio::sync::{mpsc::UnboundedSender, oneshot::Sender as AsyncOneshotSender};

use crate::{
Expand All @@ -43,8 +43,10 @@ impl<T> ResponseSink<T> {
Self::BlockingOneShot(sink) => sink.send(response).map_err(Error::from),
Self::Streamed(sink) => sink.send(response).map_err(Error::from),
};
if let Err(err) = result {
error!("{}", err);
match result {
Err(Error::Internal(err @ InternalError::SendError())) => debug!("{err}"),
Err(err) => error!("{err}"),
Ok(()) => (),
}
}

Expand All @@ -53,8 +55,10 @@ impl<T> ResponseSink<T> {
Self::Streamed(sink) => sink.send(response).map_err(Error::from),
_ => unreachable!("attempted to stream over a one-shot callback"),
};
if let Err(err) = result {
error!("{}", err);
match result {
Err(Error::Internal(err @ InternalError::SendError())) => debug!("{err}"),
Err(err) => error!("{err}"),
Ok(()) => (),
}
}

Expand Down

0 comments on commit eec033f

Please sign in to comment.