Skip to content

Commit

Permalink
Propagate /v1/queue/empty to clients (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon authored Oct 15, 2023
1 parent 96e4f39 commit 16695d0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
27 changes: 18 additions & 9 deletions libsignal-service/src/messagepipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub enum WebSocketStreamItem {
KeepAliveRequest,
}

pub enum Incoming {
Envelope(Envelope),
QueueEmpty,
}

#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))]
#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)]
pub trait WebSocketService {
Expand Down Expand Up @@ -52,7 +57,7 @@ impl MessagePipe {
/// Worker task that processes the websocket into Envelopes
async fn run(
mut self,
mut sink: Sender<Result<Envelope, ServiceError>>,
mut sink: Sender<Result<Incoming, ServiceError>>,
) -> Result<(), mpsc::SendError> {
let mut ws = self.ws.clone();
let mut stream = ws
Expand All @@ -61,10 +66,12 @@ impl MessagePipe {

while let Some((request, responder)) = stream.next().await {
// WebsocketConnection::onMessage(ByteString)
let env =
self.process_request(request, responder).await.transpose();
if let Some(env) = env {
if let Some(env) =
self.process_request(request, responder).await.transpose()
{
sink.send(env).await?;
} else {
log::trace!("got empty message in websocket");
}
}

Expand All @@ -77,7 +84,7 @@ impl MessagePipe {
&mut self,
request: WebSocketRequestMessage,
responder: oneshot::Sender<WebSocketResponseMessage>,
) -> Result<Option<Envelope>, ServiceError> {
) -> Result<Option<Incoming>, ServiceError> {
// Java: MessagePipe::read
let response = WebSocketResponseMessage::from_request(&request);

Expand All @@ -90,14 +97,16 @@ impl MessagePipe {
reason: "Request without body.".into(),
});
};
Some(Envelope::decrypt(
Some(Incoming::Envelope(Envelope::decrypt(
body,
self.credentials
.signaling_key
.as_ref()
.expect("signaling_key required to decrypt envelopes"),
request.is_signal_key_encrypted(),
)?)
)?))
} else if request.is_queue_empty() {
Some(Incoming::QueueEmpty)
} else {
None
};
Expand All @@ -114,12 +123,12 @@ impl MessagePipe {
/// Returns the stream of `Envelope`s
///
/// Envelopes yielded are acknowledged.
pub fn stream(self) -> impl Stream<Item = Result<Envelope, ServiceError>> {
pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
let (sink, stream) = mpsc::channel(1);

let stream = stream.map(Some);
let runner = self.run(sink).map(|e| {
log::info!("Sink was closed. Reason: {:?}", e);
log::info!("sink was closed: {:?}", e);
None
});

Expand Down
1 change: 1 addition & 0 deletions libsignal-service/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
}),
(Type::Request, Some(request), _) => {
let (sink, recv) = oneshot::channel();
log::trace!("sending request with body");
self.request_sink.send((request, sink)).await.map_err(
|_| ServiceError::WsError {
reason: "request handler failed".into(),
Expand Down

0 comments on commit 16695d0

Please sign in to comment.