From e7630ac288c3917dc3efd0a77ae274bc26c107d6 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Fri, 20 Dec 2024 20:40:45 +0100 Subject: [PATCH] feat(conversations): create conversation list with last message (#1437) --- bindings_ffi/src/mls.rs | 191 ++++++++++++++++++ xmtp_mls/Cargo.toml | 1 + .../down.sql | 1 + .../up.sql | 47 +++++ xmtp_mls/src/client.rs | 38 ++++ xmtp_mls/src/groups/mod.rs | 5 + .../encrypted_store/conversation_list.rs | 190 +++++++++++++++++ xmtp_mls/src/storage/encrypted_store/group.rs | 9 +- .../storage/encrypted_store/group_message.rs | 2 +- xmtp_mls/src/storage/encrypted_store/mod.rs | 1 + .../src/storage/encrypted_store/schema.rs | 26 +++ 11 files changed, 509 insertions(+), 2 deletions(-) create mode 100644 xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/down.sql create mode 100644 xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql create mode 100644 xmtp_mls/src/storage/encrypted_store/conversation_list.rs diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 193ea8732..ec9ce607b 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -947,6 +947,26 @@ impl FfiConversations { Ok(convo_list) } + pub async fn list_conversations( + &self, + ) -> Result>, GenericError> { + let inner = self.inner_client.as_ref(); + let convo_list: Vec> = inner + .list_conversations()? + .into_iter() + .map(|conversation_item| { + Arc::new(FfiConversationListItem { + conversation: conversation_item.group.into(), + last_message: conversation_item + .last_message + .map(|stored_message| stored_message.into()), + }) + }) + .collect(); + + Ok(convo_list) + } + pub async fn list_groups( &self, opts: FfiListConversationsOptions, @@ -1142,6 +1162,13 @@ pub struct FfiConversation { inner: MlsGroup, } +#[derive(uniffi::Object)] +#[allow(unused_variables, dead_code)] +pub struct FfiConversationListItem { + conversation: FfiConversation, + last_message: Option, +} + impl From> for FfiConversation { fn from(mls_group: MlsGroup) -> FfiConversation { FfiConversation { inner: mls_group } @@ -2667,6 +2694,170 @@ mod tests { assert!(stream_messages.is_closed()); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_list_conversations_last_message() { + // Step 1: Setup test client Alix and bo + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Step 2: Create a group and add messages + let alix_conversations = alix.conversations(); + + // Create a group + let group = alix_conversations + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Add messages to the group + group + .send("First message".as_bytes().to_vec()) + .await + .unwrap(); + group + .send("Second message".as_bytes().to_vec()) + .await + .unwrap(); + + // Step 3: Synchronize conversations + alix_conversations + .sync_all_conversations(None) + .await + .unwrap(); + + // Step 4: List conversations and verify + let conversations = alix_conversations.list_conversations().await.unwrap(); + + // Ensure the group is included + assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); + + let last_message = conversations[0].last_message.as_ref().unwrap(); + assert_eq!( + last_message.content, + "Second message".as_bytes().to_vec(), + "Last message content should be the most recent" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_list_conversations_no_messages() { + // Step 1: Setup test clients Alix and Bo + let alix = new_test_client().await; + let bo = new_test_client().await; + + let alix_conversations = alix.conversations(); + + // Step 2: Create a group with Bo but do not send messages + alix_conversations + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Step 3: Synchronize conversations + alix_conversations + .sync_all_conversations(None) + .await + .unwrap(); + + // Step 4: List conversations and verify + let conversations = alix_conversations.list_conversations().await.unwrap(); + + // Ensure the group is included + assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); + + // Verify that the last_message is None + assert!( + conversations[0].last_message.is_none(), + "Last message should be None since no messages were sent" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_conversation_list_ordering() { + // Step 1: Setup test client + let client = new_test_client().await; + let conversations_api = client.conversations(); + + // Step 2: Create Group A + let group_a = conversations_api + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 3: Create Group B + let group_b = conversations_api + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 4: Send a message to Group A + group_a + .send("Message to Group A".as_bytes().to_vec()) + .await + .unwrap(); + + // Step 5: Create Group C + let group_c = conversations_api + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 6: Synchronize conversations + conversations_api + .sync_all_conversations(None) + .await + .unwrap(); + + // Step 7: Fetch the conversation list + let conversations = conversations_api.list_conversations().await.unwrap(); + + // Step 8: Assert the correct order of conversations + assert_eq!( + conversations.len(), + 3, + "There should be exactly 3 conversations" + ); + + // Verify the order: Group C, Group A, Group B + assert_eq!( + conversations[0].conversation.inner.group_id, group_c.inner.group_id, + "Group C should be the first conversation" + ); + assert_eq!( + conversations[1].conversation.inner.group_id, group_a.inner.group_id, + "Group A should be the second conversation" + ); + assert_eq!( + conversations[2].conversation.inner.group_id, group_b.inner.group_id, + "Group B should be the third conversation" + ); + + // Verify the last_message field for Group A and None for others + assert!( + conversations[0].last_message.is_none(), + "Group C should have no messages" + ); + assert!( + conversations[1].last_message.is_some(), + "Group A should have a last message" + ); + assert_eq!( + conversations[1].last_message.as_ref().unwrap().content, + "Message to Group A".as_bytes().to_vec(), + "Group A's last message content should match" + ); + assert!( + conversations[2].last_message.is_none(), + "Group B should have no messages" + ); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_can_sync_all_groups() { let alix = new_test_client().await; diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index 0e160e485..9954151a5 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -107,6 +107,7 @@ diesel = { workspace = true, features = [ "r2d2", "returning_clauses_for_sqlite_3_35", "sqlite", + "32-column-tables" ] } dyn-clone.workspace = true libsqlite3-sys = { workspace = true } diff --git a/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/down.sql b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/down.sql new file mode 100644 index 000000000..64a220cd8 --- /dev/null +++ b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/down.sql @@ -0,0 +1 @@ +DROP VIEW IF EXISTS conversation_list; \ No newline at end of file diff --git a/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql new file mode 100644 index 000000000..01d5433ab --- /dev/null +++ b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql @@ -0,0 +1,47 @@ +CREATE VIEW conversation_list AS +WITH ranked_messages AS ( + SELECT + gm.group_id, + gm.id AS message_id, + gm.decrypted_message_bytes, + gm.sent_at_ns, + gm.kind AS message_kind, + gm.sender_installation_id, + gm.sender_inbox_id, + gm.delivery_status, + gm.content_type, + gm.version_major, + gm.version_minor, + gm.authority_id, + ROW_NUMBER() OVER (PARTITION BY gm.group_id ORDER BY gm.sent_at_ns DESC) AS row_num + FROM + group_messages gm + WHERE + gm.kind = 1 +) +SELECT + g.id AS id, + g.created_at_ns, + g.membership_state, + g.installations_last_checked, + g.added_by_inbox_id, + g.welcome_id, + g.dm_inbox_id, + g.rotated_at_ns, + g.conversation_type, + rm.message_id, + rm.decrypted_message_bytes, + rm.sent_at_ns, + rm.message_kind, + rm.sender_installation_id, + rm.sender_inbox_id, + rm.delivery_status, + rm.content_type, + rm.version_major, + rm.version_minor, + rm.authority_id +FROM + groups g + LEFT JOIN ranked_messages rm + ON g.id = rm.group_id AND rm.row_num = 1 +ORDER BY COALESCE(rm.sent_at_ns, g.created_at_ns) DESC; \ No newline at end of file diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index d1041dfcf..3d10e03f1 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -33,6 +33,7 @@ use xmtp_proto::xmtp::mls::api::v1::{ #[cfg(any(test, feature = "test-utils"))] use crate::groups::device_sync::WorkerHandle; +use crate::groups::ConversationListItem; use crate::{ api::ApiClientWrapper, groups::{ @@ -669,6 +670,43 @@ where .collect()) } + pub fn list_conversations(&self) -> Result>, ClientError> { + Ok(self + .store() + .conn()? + .fetch_conversation_list()? + .into_iter() + .map(|conversation_item| { + let message = conversation_item.message_id.and_then(|message_id| { + // Only construct StoredGroupMessage if all fields are Some + Some(StoredGroupMessage { + id: message_id, + group_id: conversation_item.id.clone(), + decrypted_message_bytes: conversation_item.decrypted_message_bytes?, + sent_at_ns: conversation_item.sent_at_ns?, + sender_installation_id: conversation_item.sender_installation_id?, + sender_inbox_id: conversation_item.sender_inbox_id?, + kind: conversation_item.kind?, + delivery_status: conversation_item.delivery_status?, + content_type: conversation_item.content_type?, + version_major: conversation_item.version_major?, + version_minor: conversation_item.version_minor?, + authority_id: conversation_item.authority_id?, + }) + }); + + ConversationListItem { + group: MlsGroup::new( + self.clone(), + conversation_item.id, + conversation_item.created_at_ns, + ), + last_message: message, + } + }) + .collect()) + } + /// Upload a Key Package to the network and publish the signed identity update /// from the provided SignatureRequest pub async fn register_identity( diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 5f0bfa22c..15871b599 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -276,6 +276,11 @@ pub struct MlsGroup { mutex: Arc>, } +pub struct ConversationListItem { + pub group: MlsGroup, + pub last_message: Option, +} + #[derive(Default)] pub struct GroupMetadataOptions { pub name: Option, diff --git a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs new file mode 100644 index 000000000..e82d2fd66 --- /dev/null +++ b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs @@ -0,0 +1,190 @@ +use crate::storage::group::{ConversationType, GroupMembershipState}; +use crate::storage::group_message::{ContentType, DeliveryStatus, GroupMessageKind}; +use crate::storage::schema::conversation_list::dsl::conversation_list; +use crate::storage::{DbConnection, StorageError}; +use diesel::{QueryDsl, Queryable, RunQueryDsl, Table}; +use serde::{Deserialize, Serialize}; + +#[derive(Queryable, Debug, Clone, Deserialize, Serialize)] +#[diesel(table_name = conversation_list)] +#[diesel(primary_key(id))] +/// Combined view of a group and its messages, now named `conversation_list`. +pub struct ConversationListItem { + /// group_id + pub id: Vec, + /// Based on timestamp of the welcome message + pub created_at_ns: i64, + /// Enum, [`GroupMembershipState`] representing access to the group + pub membership_state: GroupMembershipState, + /// Track when the latest, most recent installations were checked + pub installations_last_checked: i64, + /// The inbox_id of who added the user to the group + pub added_by_inbox_id: String, + /// The sequence id of the welcome message + pub welcome_id: Option, + /// The inbox_id of the DM target + pub dm_inbox_id: Option, + /// The last time the leaf node encryption key was rotated + pub rotated_at_ns: i64, + /// Enum, [`ConversationType`] signifies the group conversation type which extends to who can access it. + pub conversation_type: ConversationType, + /// Id of the message. Nullable because not every group has messages. + pub message_id: Option>, + /// Contents of message after decryption. + pub decrypted_message_bytes: Option>, + /// Time in nanoseconds the message was sent. + pub sent_at_ns: Option, + /// Group Message Kind Enum: 1 = Application, 2 = MembershipChange + pub kind: Option, + /// The ID of the App Installation this message was sent from. + pub sender_installation_id: Option>, + /// The Inbox ID of the Sender + pub sender_inbox_id: Option, + /// We optimistically store messages before sending. + pub delivery_status: Option, + /// The Content Type of the message + pub content_type: Option, + /// The content type version major + pub version_major: Option, + /// The content type version minor + pub version_minor: Option, + /// The ID of the authority defining the content type + pub authority_id: Option, +} + +impl DbConnection { + pub fn fetch_conversation_list(&self) -> Result, StorageError> { + let query = conversation_list + .select(conversation_list::all_columns()) + .into_boxed(); + Ok(self.raw_query(|conn| query.load::(conn))?) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use crate::storage::group::tests::{generate_group, generate_group_with_created_at}; + use crate::storage::tests::with_connection; + use crate::Store; + use wasm_bindgen_test::wasm_bindgen_test; + + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_single_group_multiple_messages() { + with_connection(|conn| { + // Create a group + let group = generate_group(None); + group.store(conn).unwrap(); + + // Insert multiple messages into the group + for i in 1..5 { + let message = + crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group.id), + Some(i * 1000), + None, + ); + message.store(conn).unwrap(); + } + + // Fetch the conversation list + let conversation_list = conn.fetch_conversation_list().unwrap(); + assert_eq!(conversation_list.len(), 1, "Should return one group"); + assert_eq!( + conversation_list[0].id, group.id, + "Returned group ID should match the created group" + ); + assert_eq!( + conversation_list[0].sent_at_ns.unwrap(), + 4000, + "Last message should be the most recent one" + ); + }) + .await + } + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_three_groups_specific_ordering() { + with_connection(|conn| { + // Create three groups + let group_a = generate_group_with_created_at(None, 5000); // Created after last message + let group_b = generate_group_with_created_at(None, 2000); // Created before last message + let group_c = generate_group_with_created_at(None, 1000); // Created before last message with no messages + + group_a.store(conn).unwrap(); + group_b.store(conn).unwrap(); + group_c.store(conn).unwrap(); + // Add a message to group_b + let message = crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group_b.id), + Some(3000), // Last message timestamp + None, + ); + message.store(conn).unwrap(); + + // Fetch the conversation list + let conversation_list = conn.fetch_conversation_list().unwrap(); + + assert_eq!(conversation_list.len(), 3, "Should return all three groups"); + assert_eq!( + conversation_list[0].id, group_a.id, + "Group created after the last message should come first" + ); + assert_eq!( + conversation_list[1].id, group_b.id, + "Group with the last message should come second" + ); + assert_eq!( + conversation_list[2].id, group_c.id, + "Group created before the last message with no messages should come last" + ); + }) + .await + } + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_group_with_newer_message_update() { + with_connection(|conn| { + // Create a group + let group = generate_group(None); + group.store(conn).unwrap(); + + // Add an initial message + let first_message = + crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group.id), + Some(1000), + None, + ); + first_message.store(conn).unwrap(); + + // Fetch the conversation list and check last message + let mut conversation_list = conn.fetch_conversation_list().unwrap(); + assert_eq!(conversation_list.len(), 1, "Should return one group"); + assert_eq!( + conversation_list[0].sent_at_ns.unwrap(), + 1000, + "Last message should match the first message" + ); + + // Add a newer message + let second_message = + crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group.id), + Some(2000), + None, + ); + second_message.store(conn).unwrap(); + + // Fetch the conversation list again and validate the last message is updated + conversation_list = conn.fetch_conversation_list().unwrap(); + assert_eq!( + conversation_list[0].sent_at_ns.unwrap(), + 2000, + "Last message should now match the second (newest) message" + ); + }) + .await + } +} diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index 547f85134..0c6642018 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -554,8 +554,15 @@ pub(crate) mod tests { /// Generate a test group pub fn generate_group(state: Option) -> StoredGroup { + // Default behavior: Use `now_ns()` as the creation time + generate_group_with_created_at(state, now_ns()) + } + + pub fn generate_group_with_created_at( + state: Option, + created_at_ns: i64, + ) -> StoredGroup { let id = rand_vec::<24>(); - let created_at_ns = now_ns(); let membership_state = state.unwrap_or(GroupMembershipState::Allowed); StoredGroup::new( id, diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index cfc3d599c..30fd41b44 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -401,7 +401,7 @@ pub(crate) mod tests { use wasm_bindgen_test::wasm_bindgen_test; use xmtp_common::{assert_err, assert_ok, rand_time, rand_vec}; - fn generate_message( + pub(crate) fn generate_message( kind: Option, group_id: Option<&[u8]>, sent_at_ns: Option, diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index e89e306bd..dfeecd843 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -12,6 +12,7 @@ pub mod association_state; pub mod consent_record; +mod conversation_list; pub mod db_connection; pub mod group; pub mod group_intent; diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index 9dba148f6..640b71f44 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -125,6 +125,31 @@ diesel::table! { } } +diesel::table! { + conversation_list (id) { + id -> Binary, + created_at_ns -> BigInt, + membership_state -> Integer, + installations_last_checked -> BigInt, + added_by_inbox_id -> Text, + welcome_id -> Nullable, + dm_inbox_id -> Nullable, + rotated_at_ns -> BigInt, + conversation_type -> Integer, + message_id -> Nullable, + decrypted_message_bytes -> Nullable, + sent_at_ns -> Nullable, + message_kind -> Nullable, + sender_installation_id -> Nullable, + sender_inbox_id -> Nullable, + delivery_status -> Nullable, + content_type -> Nullable, + version_major -> Nullable, + version_minor -> Nullable, + authority_id -> Nullable + } +} + diesel::joinable!(group_intents -> groups (group_id)); diesel::joinable!(group_messages -> groups (group_id)); @@ -142,4 +167,5 @@ diesel::allow_tables_to_appear_in_same_query!( refresh_state, user_preferences, wallet_addresses, + conversation_list );