Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate /v1/queue/empty to clients #253

Merged
merged 1 commit into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions libsignal-service/src/messagepipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub enum WebSocketStreamItem {
KeepAliveRequest,
}

pub enum Incoming {
Envelope(Envelope),
QueueEmpty,
}

#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))]
#[cfg_attr(not(feature = "unsend-futures"), async_trait::async_trait)]
pub trait WebSocketService {
Expand Down Expand Up @@ -52,7 +57,7 @@ impl MessagePipe {
/// Worker task that processes the websocket into Envelopes
async fn run(
mut self,
mut sink: Sender<Result<Envelope, ServiceError>>,
mut sink: Sender<Result<Incoming, ServiceError>>,
) -> Result<(), mpsc::SendError> {
let mut ws = self.ws.clone();
let mut stream = ws
Expand All @@ -61,10 +66,12 @@ impl MessagePipe {

while let Some((request, responder)) = stream.next().await {
// WebsocketConnection::onMessage(ByteString)
let env =
self.process_request(request, responder).await.transpose();
if let Some(env) = env {
if let Some(env) =
self.process_request(request, responder).await.transpose()
{
sink.send(env).await?;
} else {
log::trace!("got empty message in websocket");
}
}

Expand All @@ -77,7 +84,7 @@ impl MessagePipe {
&mut self,
request: WebSocketRequestMessage,
responder: oneshot::Sender<WebSocketResponseMessage>,
) -> Result<Option<Envelope>, ServiceError> {
) -> Result<Option<Incoming>, ServiceError> {
// Java: MessagePipe::read
let response = WebSocketResponseMessage::from_request(&request);

Expand All @@ -90,14 +97,16 @@ impl MessagePipe {
reason: "Request without body.".into(),
});
};
Some(Envelope::decrypt(
Some(Incoming::Envelope(Envelope::decrypt(
body,
self.credentials
.signaling_key
.as_ref()
.expect("signaling_key required to decrypt envelopes"),
request.is_signal_key_encrypted(),
)?)
)?))
} else if request.is_queue_empty() {
Some(Incoming::QueueEmpty)
} else {
None
};
Expand All @@ -114,12 +123,12 @@ impl MessagePipe {
/// Returns the stream of `Envelope`s
///
/// Envelopes yielded are acknowledged.
pub fn stream(self) -> impl Stream<Item = Result<Envelope, ServiceError>> {
pub fn stream(self) -> impl Stream<Item = Result<Incoming, ServiceError>> {
let (sink, stream) = mpsc::channel(1);

let stream = stream.map(Some);
let runner = self.run(sink).map(|e| {
log::info!("Sink was closed. Reason: {:?}", e);
log::info!("sink was closed: {:?}", e);
None
});

Expand Down
1 change: 1 addition & 0 deletions libsignal-service/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
}),
(Type::Request, Some(request), _) => {
let (sink, recv) = oneshot::channel();
log::trace!("sending request with body");
self.request_sink.send((request, sink)).await.map_err(
|_| ServiceError::WsError {
reason: "request handler failed".into(),
Expand Down
Loading