Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(group): make MLS group thread safe #1349 #1404

Merged
merged 41 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
59a0b6e
wip
mchenani Dec 9, 2024
653f6c9
wip
mchenani Dec 9, 2024
e5b2bf1
wip
mchenani Dec 9, 2024
ab27ee3
fixed tests
mchenani Dec 11, 2024
a8a39f6
wip
mchenani Dec 9, 2024
2b57721
wip
mchenani Dec 9, 2024
5b352a7
wip
mchenani Dec 9, 2024
250eabb
fixed tests
mchenani Dec 11, 2024
a37c79b
Merge remote-tracking branch 'origin/mc/thread-safe-groups' into mc/t…
mchenani Dec 11, 2024
4adf5cf
fix after rebase
mchenani Dec 11, 2024
1472473
remove unneeded changes
mchenani Dec 11, 2024
fbc057c
fix clippy issues
mchenani Dec 11, 2024
dbd57e2
fix fmt
mchenani Dec 11, 2024
f9ad029
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 12, 2024
261f300
fix webassembly compile
insipx Dec 12, 2024
80b9a8c
Merge branch 'main' of github.com:xmtp/libxmtp into mc/thread-safe-gr…
insipx Dec 12, 2024
6511f09
fix tests
mchenani Dec 12, 2024
7473a24
remove unneeded comments
mchenani Dec 12, 2024
7ef4f3c
use mutex instead of semaphore
mchenani Dec 13, 2024
d288c6b
fix fmt
mchenani Dec 13, 2024
d455367
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 13, 2024
822236d
fix after conflicts
mchenani Dec 13, 2024
ba0b09c
fix linter
mchenani Dec 13, 2024
a9bbb5d
revert to semaphore
mchenani Dec 16, 2024
7189666
fix clippy
mchenani Dec 16, 2024
3098288
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 16, 2024
e7e454c
fix linter
mchenani Dec 16, 2024
eb4baf4
Merge remote-tracking branch 'origin/mc/thread-safe-groups' into mc/t…
mchenani Dec 16, 2024
494097c
make group.metadata async
mchenani Dec 16, 2024
e861ba9
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 16, 2024
2011115
fix tests
mchenani Dec 16, 2024
7cdf654
Merge remote-tracking branch 'origin/mc/thread-safe-groups' into mc/t…
mchenani Dec 16, 2024
948012f
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 17, 2024
e02b70b
pull changes from https://github.com/xmtp/libxmtp/tree/insipx/trouble…
mchenani Dec 17, 2024
ae6c21e
Merge remote-tracking branch 'origin/mc/thread-safe-groups' into mc/t…
mchenani Dec 17, 2024
6a1f73c
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 17, 2024
9144b65
fix tests
mchenani Dec 17, 2024
a151eb6
Merge remote-tracking branch 'origin/mc/thread-safe-groups' into mc/t…
mchenani Dec 17, 2024
7e7410f
Fix node bindings tests
rygine Dec 17, 2024
37ed3ec
fix fmt
mchenani Dec 17, 2024
3e43b68
Merge branch 'main' into mc/thread-safe-groups
mchenani Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ impl FfiConversation {

pub fn group_image_url_square(&self) -> Result<String, GenericError> {
let provider = self.inner.mls_provider()?;
Ok(self.inner.group_image_url_square(provider)?)
Ok(self.inner.group_image_url_square(&provider)?)
}

pub async fn update_group_description(
Expand All @@ -1412,7 +1412,7 @@ impl FfiConversation {

pub fn group_description(&self) -> Result<String, GenericError> {
let provider = self.inner.mls_provider()?;
Ok(self.inner.group_description(provider)?)
Ok(self.inner.group_description(&provider)?)
}

pub async fn update_group_pinned_frame_url(
Expand Down Expand Up @@ -1546,7 +1546,9 @@ impl FfiConversation {

pub fn group_metadata(&self) -> Result<Arc<FfiConversationMetadata>, GenericError> {
let provider = self.inner.mls_provider()?;
let metadata = self.inner.metadata(provider)?;
let metadata = tokio::task::block_in_place(|| {
futures::executor::block_on(self.inner.metadata(&provider))
})?;
Ok(Arc::new(FfiConversationMetadata {
inner: Arc::new(metadata),
}))
Expand All @@ -1558,7 +1560,9 @@ impl FfiConversation {

pub fn conversation_type(&self) -> Result<FfiConversationType, GenericError> {
let provider = self.inner.mls_provider()?;
let conversation_type = self.inner.conversation_type(&provider)?;
let conversation_type = tokio::task::block_in_place(|| {
futures::executor::block_on(self.inner.conversation_type(&provider))
})?;
Ok(conversation_type.into())
}
}
Expand Down
28 changes: 15 additions & 13 deletions bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ impl Conversation {
self.created_at_ns,
);
let provider = group.mls_provider().map_err(ErrorWrapper::from)?;
let conversation_type = group
.conversation_type(&provider)
.map_err(ErrorWrapper::from)?;
let conversation_type = tokio::task::block_in_place(|| {
futures::executor::block_on(group.conversation_type(&provider))
})
.map_err(ErrorWrapper::from)?;
let kind = match conversation_type {
ConversationType::Group => None,
ConversationType::Dm => Some(XmtpGroupMessageKind::Application),
Expand Down Expand Up @@ -248,7 +249,7 @@ impl Conversation {
);

let admin_list = group
.admin_list(group.mls_provider().map_err(ErrorWrapper::from)?)
.admin_list(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(admin_list)
Expand All @@ -263,7 +264,7 @@ impl Conversation {
);

let super_admin_list = group
.super_admin_list(group.mls_provider().map_err(ErrorWrapper::from)?)
.super_admin_list(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(super_admin_list)
Expand Down Expand Up @@ -449,7 +450,7 @@ impl Conversation {
);

let group_name = group
.group_name(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_name(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_name)
Expand Down Expand Up @@ -480,7 +481,7 @@ impl Conversation {
);

let group_image_url_square = group
.group_image_url_square(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_image_url_square(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_image_url_square)
Expand Down Expand Up @@ -511,7 +512,7 @@ impl Conversation {
);

let group_description = group
.group_description(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_description(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_description)
Expand Down Expand Up @@ -542,7 +543,7 @@ impl Conversation {
);

let group_pinned_frame_url = group
.group_pinned_frame_url(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_pinned_frame_url(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_pinned_frame_url)
Expand Down Expand Up @@ -585,7 +586,7 @@ impl Conversation {

Ok(
group
.is_active(group.mls_provider().map_err(ErrorWrapper::from)?)
.is_active(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?,
)
}
Expand All @@ -609,9 +610,10 @@ impl Conversation {
self.created_at_ns,
);

let metadata = group
.metadata(group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;
let metadata = tokio::task::block_in_place(|| {
futures::executor::block_on(group.metadata(&group.mls_provider()?))
})
.map_err(ErrorWrapper::from)?;

Ok(GroupMetadata { inner: metadata })
}
Expand Down
25 changes: 15 additions & 10 deletions bindings_wasm/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,18 @@ impl Conversation {
}

#[wasm_bindgen(js_name = findMessages)]
pub fn find_messages(&self, opts: Option<ListMessagesOptions>) -> Result<Vec<Message>, JsError> {
pub async fn find_messages(
&self,
opts: Option<ListMessagesOptions>,
) -> Result<Vec<Message>, JsError> {
let opts = opts.unwrap_or_default();
let group = self.to_mls_group();
let provider = group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?;
let conversation_type = group
.conversation_type(&provider)
.await
.map_err(|e| JsError::new(&format!("{e}")))?;
let kind = match conversation_type {
ConversationType::Group => None,
Expand Down Expand Up @@ -238,7 +242,7 @@ impl Conversation {
let group = self.to_mls_group();
let admin_list = group
.admin_list(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -252,7 +256,7 @@ impl Conversation {
let group = self.to_mls_group();
let super_admin_list = group
.super_admin_list(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand Down Expand Up @@ -398,7 +402,7 @@ impl Conversation {

let group_name = group
.group_name(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand Down Expand Up @@ -428,7 +432,7 @@ impl Conversation {

let group_image_url_square = group
.group_image_url_square(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -455,7 +459,7 @@ impl Conversation {

let group_description = group
.group_description(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand Down Expand Up @@ -485,7 +489,7 @@ impl Conversation {

let group_pinned_frame_url = group
.group_pinned_frame_url(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -505,7 +509,7 @@ impl Conversation {

group
.is_active(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -522,14 +526,15 @@ impl Conversation {
}

#[wasm_bindgen(js_name = groupMetadata)]
pub fn group_metadata(&self) -> Result<GroupMetadata, JsError> {
pub async fn group_metadata(&self) -> Result<GroupMetadata, JsError> {
let group = self.to_mls_group();
let metadata = group
.metadata(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
.await
.map_err(|e| JsError::new(&format!("{e}")))?;

Ok(GroupMetadata { inner: metadata })
Expand Down
4 changes: 2 additions & 2 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn logger() {
.from_env_lossy()
};

tracing_subscriber::registry()
let _ = tracing_subscriber::registry()
// structured JSON logger only if STRUCTURED=true
.with(is_structured.then(|| {
tracing_subscriber::fmt::layer()
Expand All @@ -61,7 +61,7 @@ pub fn logger() {
})
.with_filter(filter())
}))
.init();
.try_init();
insipx marked this conversation as resolved.
Show resolved Hide resolved
});
}

Expand Down
5 changes: 3 additions & 2 deletions examples/cli/serializable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ impl SerializableGroup {

let metadata = group
.metadata(
group
&group
.mls_provider()
.expect("MLS Provider could not be created"),
)
.expect("could not load metadata");
.await
.unwrap();
let permissions = group.permissions().expect("could not load permissions");

Self {
Expand Down
14 changes: 8 additions & 6 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,20 +862,22 @@ where
.map(|group| {
let active_group_count = Arc::clone(&active_group_count);
async move {
let mls_group = group.load_mls_group(provider)?;
tracing::info!(
inbox_id = self.inbox_id(),
"[{}] syncing group",
self.inbox_id()
);
tracing::info!(
inbox_id = self.inbox_id(),
group_epoch = mls_group.epoch().as_u64(),
"current epoch for [{}] in sync_all_groups() is Epoch: [{}]",
self.inbox_id(),
mls_group.epoch()
"[{}] syncing group",
self.inbox_id()
);
if mls_group.is_active() {
let is_active = group
.load_mls_group_with_lock_async(provider, |mls_group| async move {
Ok::<bool, GroupError>(mls_group.is_active())
})
.await?;
if is_active {
group.maybe_update_installations(provider, None).await?;

group.sync_with_conn(provider).await?;
Expand Down
7 changes: 4 additions & 3 deletions xmtp_mls/src/groups/intents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,9 +865,10 @@ pub(crate) mod tests {
};

let provider = group.client.mls_provider().unwrap();
let mut openmls_group = group.load_mls_group(&provider).unwrap();
let decrypted_message = openmls_group
.process_message(&provider, mls_message)
let decrypted_message = group
.load_mls_group_with_lock(&provider, |mut mls_group| {
Ok(mls_group.process_message(&provider, mls_message).unwrap())
})
.unwrap();

let staged_commit = match decrypted_message.into_content() {
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/groups/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ where
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<Vec<GroupMember>, GroupError> {
let openmls_group = self.load_mls_group(provider)?;
// TODO: Replace with try_into from extensions
let group_membership = extract_group_membership(openmls_group.extensions())?;
let group_membership = self.load_mls_group_with_lock(provider, |mls_group| {
Ok(extract_group_membership(mls_group.extensions())?)
})?;
let requests = group_membership
.members
.into_iter()
Expand Down
Loading
Loading