Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of stream_all_messages, fix flaky stream tests #835

Merged
merged 35 commits into from
Jul 16, 2024
Merged
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
70dc4ef
quick pass at stream_all_msg refactor
insipx Jun 11, 2024
f6e5b66
tests pass
insipx Jun 12, 2024
292fad6
try replacing StreamCloser with JoinHandle
insipx Jun 12, 2024
f0c91bd
use Notify
insipx Jun 26, 2024
d55304e
Merge branch 'main' of github.com:xmtp/libxmtp into insipx/async-call…
insipx Jun 26, 2024
a56a5cd
fix, not needed to wait on new groups
insipx Jun 26, 2024
8695b91
slowly fixing streaming tests
insipx Jun 26, 2024
a0e53fd
stream closer
insipx Jun 27, 2024
5fb9717
Merge branch 'main' of github.com:xmtp/libxmtp into insipx/async-call…
insipx Jun 27, 2024
3748bee
ignore errors resulting from cancellation of the task
insipx Jun 27, 2024
7115f51
simplify stream_all_messages even further
insipx Jun 27, 2024
4f323c4
progress. a custom sync solution similiar to notify might be good
insipx Jun 27, 2024
95f3a24
debug stream_all_messages
insipx Jul 1, 2024
a9e1a91
remove printlns
insipx Jul 1, 2024
281c364
notify only stores one permit
insipx Jul 1, 2024
9b96dc6
comment about missed messages
insipx Jul 1, 2024
a8c2aae
whitespace
insipx Jul 1, 2024
a641d2f
Merge branch 'main' of github.com:xmtp/libxmtp into insipx/async-call…
insipx Jul 1, 2024
e669a0f
fix node bindings
insipx Jul 1, 2024
6e1a961
refactor to allow waiting for stream readiness
insipx Jul 1, 2024
5910e5c
annotate streams correctly
insipx Jul 1, 2024
36d1c34
fix mls tests
insipx Jul 1, 2024
866bd10
revert logging
insipx Jul 1, 2024
2bae690
use std::mem::replace to try and avoid lost messages
insipx Jul 1, 2024
f596e4b
fmt
insipx Jul 1, 2024
f619147
fmt
insipx Jul 1, 2024
a5799b0
fmt
insipx Jul 1, 2024
2fc66d1
Merge branch 'insipx/async-callback' of github.com:xmtp/libxmtp into …
insipx Jul 1, 2024
9c916ae
cleanup
insipx Jul 2, 2024
5069ca1
some tests still flaky
insipx Jul 9, 2024
2db52c3
Merge branch 'main' of github.com:xmtp/libxmtp into insipx/async-call…
insipx Jul 11, 2024
236e882
make tests easier to debug
insipx Jul 15, 2024
7df1306
default for deliveyr
insipx Jul 15, 2024
70419cf
lose all messages test is definitely flaky
insipx Jul 15, 2024
23150d5
Merge branch 'main' of github.com:xmtp/libxmtp into insipx/async-call…
insipx Jul 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
progress. a custom sync solution similiar to notify might be good
  • Loading branch information
insipx committed Jun 27, 2024
commit 4f323c4238670fb3713f194ebd8807c9a7d571c5
48 changes: 24 additions & 24 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use std::{
use futures::{Stream, StreamExt};
use prost::Message;
use tokio::{
sync::mpsc::{self, UnboundedSender},
sync::mpsc::self,
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -19,7 +19,7 @@ use crate::{
groups::{extract_group_id, GroupError, MlsGroup},
retry::Retry,
retry_async,
storage::group_message::StoredGroupMessage,
storage::{group_message::StoredGroupMessage, group::StoredGroup},
Client, XmtpApi,
};

@@ -29,6 +29,17 @@ pub(crate) struct MessagesStreamInfo {
pub cursor: u64,
}

impl From<StoredGroup> for (Vec<u8>, MessagesStreamInfo) {
fn from(group: StoredGroup) -> (Vec<u8>, MessagesStreamInfo) {
(
group.id,
MessagesStreamInfo {
convo_created_at_ns: group.created_at_ns,
cursor: 0
})
}
}

impl<ApiClient> Client<ApiClient>
where
ApiClient: XmtpApi,
@@ -215,21 +226,7 @@ where

client.sync_welcomes().await?;

let current_groups = client.store().conn()?.find_groups(None, None, None, None)?;

let mut group_id_to_info: HashMap<Vec<u8>, MessagesStreamInfo> = current_groups

.into_iter()
.map(|group| {
(
group.id.clone(),
MessagesStreamInfo {
convo_created_at_ns: group.created_at_ns,
cursor: 0,
},
)
})
.collect();
let mut group_id_to_info = client.store().conn()?.find_groups(None, None, None, None)?.into_iter().map(Into::into).collect::<HashMap<Vec<u8>, MessagesStreamInfo>>();

tokio::spawn(async move {
let client = client.clone();
@@ -405,23 +402,25 @@ mod tests {
.await
.unwrap();

tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let messages: Arc<Mutex<Vec<StoredGroupMessage>>> = Arc::new(Mutex::new(Vec::new()));
let messages_clone = messages.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_pointer = notify.clone();
let handle =
Client::<GrpcClient>::stream_all_messages_with_callback(caro.clone(), move |message| {
let text = String::from_utf8(message.decrypted_message_bytes.clone())
.unwrap_or("<not UTF8>".to_string());
println!("Received: {}", text);
notify_pointer.notify_one();
(*messages_clone.lock().unwrap()).push(message);
});

alix_group
.send_message("first".as_bytes(), &alix)
.await
.unwrap();

notify.notified().await;

let bo_group = bo
.create_group(None, GroupMetadataOptions::default())
.unwrap();
@@ -434,11 +433,13 @@ mod tests {
.send_message("second".as_bytes(), &bo)
.await
.unwrap();
notify.notified().await;

alix_group
.send_message("third".as_bytes(), &alix)
.await
.unwrap();
notify.notified().await;

let alix_group_2 = alix
.create_group(None, GroupMetadataOptions::default())
@@ -447,19 +448,18 @@ mod tests {
.add_members_by_inbox_id(&alix, vec![caro.inbox_id()])
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(300)).await;

alix_group
.send_message("fourth".as_bytes(), &alix)
.await
.unwrap();
notify.notified().await;

alix_group_2
.send_message("fifth".as_bytes(), &alix)
.await
.unwrap();

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
notify.notified().await;

{
let messages = messages.lock().unwrap();
@@ -468,7 +468,7 @@ mod tests {

let a = handle.abort_handle();
a.abort();
handle.await.unwrap();
let _ = handle.await;
assert!(a.is_finished());

alix_group
Loading