diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index fd494850a..497d7f49d 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -615,7 +615,7 @@ impl FfiConversations { Ok(convo_list) } - pub fn stream(&self, callback: Box) -> FfiStreamCloser { + pub async fn stream(&self, callback: Box) -> FfiStreamCloser { let client = self.inner_client.clone(); let handle = RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| { @@ -629,7 +629,7 @@ impl FfiConversations { FfiStreamCloser::new(handle) } - pub fn stream_all_messages( + pub async fn stream_all_messages( &self, message_callback: Box, ) -> FfiStreamCloser { @@ -1117,7 +1117,7 @@ impl FfiGroup { Ok(()) } - pub fn stream(&self, message_callback: Box) -> FfiStreamCloser { + pub async fn stream(&self, message_callback: Box) -> FfiStreamCloser { let inner_client = Arc::clone(&self.inner_client); let handle = MlsGroup::stream_with_callback( inner_client, @@ -1266,7 +1266,7 @@ impl FfiStreamCloser { } } -#[uniffi::export] +#[uniffi::export(async_runtime = "tokio")] impl FfiStreamCloser { /// Signal the stream to end /// Does not wait for the stream to end. @@ -2010,7 +2010,8 @@ mod tests { let message_callbacks = RustStreamCallback::default(); let stream_messages = bo .conversations() - .stream_all_messages(Box::new(message_callbacks.clone())); + .stream_all_messages(Box::new(message_callbacks.clone())) + .await; stream_messages.wait_for_ready().await; // Create group and send first message @@ -2067,7 +2068,8 @@ mod tests { let message_callbacks = RustStreamCallback::default(); let stream_messages = bo .conversations() - .stream_all_messages(Box::new(message_callbacks.clone())); + .stream_all_messages(Box::new(message_callbacks.clone())) + .await; stream_messages.wait_for_ready().await; let first_msg_check = 2; @@ -2140,7 +2142,8 @@ mod tests { let stream = bola .conversations() - .stream(Box::new(stream_callback.clone())); + .stream(Box::new(stream_callback.clone())) + .await; amal.conversations() .create_group( @@ -2188,7 +2191,8 @@ mod tests { let stream = caro .conversations() - .stream_all_messages(Box::new(stream_callback.clone())); + .stream_all_messages(Box::new(stream_callback.clone())) + .await; stream.wait_for_ready().await; alix_group.send("first".as_bytes().to_vec()).await.unwrap(); @@ -2234,7 +2238,7 @@ mod tests { let bola_group = bola.group(amal_group.group_id.clone()).unwrap(); let stream_callback = RustStreamCallback::default(); - let stream_closer = bola_group.stream(Box::new(stream_callback.clone())); + let stream_closer = bola_group.stream(Box::new(stream_callback.clone())).await; stream_closer.wait_for_ready().await; @@ -2273,7 +2277,8 @@ mod tests { let stream_callback = RustStreamCallback::default(); let stream_closer = bola .conversations() - .stream_all_messages(Box::new(stream_callback.clone())); + .stream_all_messages(Box::new(stream_callback.clone())) + .await; stream_closer.wait_for_ready().await; amal_group.send(b"hello1".to_vec()).await.unwrap(); @@ -2368,11 +2373,15 @@ mod tests { // Stream all group messages let message_callback = RustStreamCallback::default(); let group_callback = RustStreamCallback::default(); - let stream_groups = bo.conversations().stream(Box::new(group_callback.clone())); + let stream_groups = bo + .conversations() + .stream(Box::new(group_callback.clone())) + .await; let stream_messages = bo .conversations() - .stream_all_messages(Box::new(message_callback.clone())); + .stream_all_messages(Box::new(message_callback.clone())) + .await; stream_messages.wait_for_ready().await; // Create group and send first message