From 16695d0655e047709729c7839ae2f5e778db62a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Sun, 15 Oct 2023 12:14:42 +0200 Subject: [PATCH] Propagate /v1/queue/empty to clients (#253) --- libsignal-service/src/messagepipe.rs | 27 ++++++++++++++++++--------- libsignal-service/src/websocket.rs | 1 + 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index f9e54d3f8..f47290603 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -23,6 +23,11 @@ pub enum WebSocketStreamItem { KeepAliveRequest, } +pub enum Incoming { + Envelope(Envelope), + QueueEmpty, +} + #[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] #[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)] pub trait WebSocketService { @@ -52,7 +57,7 @@ impl MessagePipe { /// Worker task that processes the websocket into Envelopes async fn run( mut self, - mut sink: Sender>, + mut sink: Sender>, ) -> Result<(), mpsc::SendError> { let mut ws = self.ws.clone(); let mut stream = ws @@ -61,10 +66,12 @@ impl MessagePipe { while let Some((request, responder)) = stream.next().await { // WebsocketConnection::onMessage(ByteString) - let env = - self.process_request(request, responder).await.transpose(); - if let Some(env) = env { + if let Some(env) = + self.process_request(request, responder).await.transpose() + { sink.send(env).await?; + } else { + log::trace!("got empty message in websocket"); } } @@ -77,7 +84,7 @@ impl MessagePipe { &mut self, request: WebSocketRequestMessage, responder: oneshot::Sender, - ) -> Result, ServiceError> { + ) -> Result, ServiceError> { // Java: MessagePipe::read let response = WebSocketResponseMessage::from_request(&request); @@ -90,14 +97,16 @@ impl MessagePipe { reason: "Request without body.".into(), }); }; - Some(Envelope::decrypt( + Some(Incoming::Envelope(Envelope::decrypt( body, self.credentials .signaling_key .as_ref() .expect("signaling_key required to decrypt envelopes"), request.is_signal_key_encrypted(), - )?) + )?)) + } else if request.is_queue_empty() { + Some(Incoming::QueueEmpty) } else { None }; @@ -114,12 +123,12 @@ impl MessagePipe { /// Returns the stream of `Envelope`s /// /// Envelopes yielded are acknowledged. - pub fn stream(self) -> impl Stream> { + pub fn stream(self) -> impl Stream> { let (sink, stream) = mpsc::channel(1); let stream = stream.map(Some); let runner = self.run(sink).map(|e| { - log::info!("Sink was closed. Reason: {:?}", e); + log::info!("sink was closed: {:?}", e); None }); diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index 5c667649c..d9f22c701 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -104,6 +104,7 @@ impl SignalWebSocketProcess { }), (Type::Request, Some(request), _) => { let (sink, recv) = oneshot::channel(); + log::trace!("sending request with body"); self.request_sink.send((request, sink)).await.map_err( |_| ServiceError::WsError { reason: "request handler failed".into(),