Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont error if intent is already published #1423

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1619,33 +1619,53 @@ pub(crate) mod tests {

use super::*;
use crate::builder::ClientBuilder;
use futures::future;
use std::sync::Arc;
use xmtp_cryptography::utils::generate_local_wallet;

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))]
/// This test is not reproducible in webassembly, b/c webassembly has only one thread.
#[cfg_attr(
not(target_arch = "wasm32"),
tokio::test(flavor = "multi_thread", worker_threads = 10)
)]
#[cfg(not(target_family = "wasm"))]
async fn publish_intents_worst_case_scenario() {
let wallet = generate_local_wallet();
let amal = Arc::new(ClientBuilder::new_test_client(&wallet).await);
let amal_group: Arc<MlsGroup<_>> =
Arc::new(amal.create_group(None, Default::default()).unwrap());
let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await);
let amal_group_a: Arc<MlsGroup<_>> =
Arc::new(amal_a.create_group(None, Default::default()).unwrap());

amal_group.send_message_optimistic(b"1").unwrap();
amal_group.send_message_optimistic(b"2").unwrap();
amal_group.send_message_optimistic(b"3").unwrap();
amal_group.send_message_optimistic(b"4").unwrap();
amal_group.send_message_optimistic(b"5").unwrap();
amal_group.send_message_optimistic(b"6").unwrap();
let conn = amal_a.context().store().conn().unwrap();
let provider: Arc<XmtpOpenMlsProvider> = Arc::new(conn.into());

let conn = amal.context().store().conn().unwrap();
let provider: XmtpOpenMlsProvider = conn.into();
// create group intent
amal_group_a.sync().await.unwrap();
assert_eq!(provider.conn_ref().intents_deleted(), 1);

let mut futures = vec![];
for _ in 0..10 {
futures.push(amal_group.publish_intents(&provider))
for _ in 0..100 {
let s = xmtp_common::rand_string::<100>();
amal_group_a.send_message_optimistic(s.as_bytes()).unwrap();
}

let mut set = tokio::task::JoinSet::new();
for _ in 0..50 {
let g = amal_group_a.clone();
let p = provider.clone();
set.spawn(async move { g.publish_intents(&p).await });
}

let res = set.join_all().await;
let errs: Vec<&Result<_, _>> = res.iter().filter(|r| r.is_err()).collect();
errs.iter().for_each(|e| {
tracing::error!("{}", e.as_ref().unwrap_err());
});

let published = provider.conn_ref().intents_published();
assert_eq!(published, 101);
let created = provider.conn_ref().intents_created();
assert_eq!(created, 101);
if !errs.is_empty() {
panic!("Errors during publish");
}
future::join_all(futures).await;
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
Expand Down
20 changes: 16 additions & 4 deletions xmtp_mls/src/storage/encrypted_store/group_intent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,22 @@ impl DbConnection {
})?;

match res {
// If nothing matched the query, return an error. Either ID or state was wrong
0 => Err(StorageError::NotFound(format!(
"ToPublish intent {intent_id} for publish"
))),
// If nothing matched the query, check if its already published, otherwise return an error. Either ID or state was wrong
0 => {
let already_published = self.raw_query(|conn| {
dsl::group_intents
.filter(dsl::id.eq(intent_id))
.first::<StoredGroupIntent>(conn)
});

if already_published.is_ok() {
Ok(())
} else {
Err(StorageError::NotFound(format!(
"Published intent {intent_id} for commit"
)))
}
}
_ => Ok(()),
}
}
Expand Down