diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index f4524df2d..2146f4939 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1,9 +1,12 @@ pub use crate::inbox_owner::SigningError; use crate::logger::init_logger; use crate::{FfiSubscribeError, GenericError}; +use prost::Message; use std::{collections::HashMap, convert::TryInto, sync::Arc}; use tokio::sync::Mutex; use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient; +use xmtp_content_types::reaction::ReactionCodec; +use xmtp_content_types::ContentCodec; use xmtp_id::associations::verify_signed_with_public_context; use xmtp_id::scw_verifier::RemoteSignatureVerifier; use xmtp_id::{ @@ -19,8 +22,8 @@ use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate; use xmtp_mls::groups::scoped_client::LocalScopedGroupClient; use xmtp_mls::groups::HmacKey; use xmtp_mls::storage::group::ConversationType; -use xmtp_mls::storage::group_message::SortDirection; use xmtp_mls::storage::group_message::{ContentType, MsgQueryArgs}; +use xmtp_mls::storage::group_message::{SortDirection, StoredGroupMessageWithReactions}; use xmtp_mls::{ api::ApiClientWrapper, builder::ClientBuilder, @@ -46,7 +49,8 @@ use xmtp_mls::{ }, AbortHandle, GenericStreamHandle, StreamHandle, }; -use xmtp_proto::xmtp::mls::message_contents::DeviceSyncKind; +use xmtp_proto::xmtp::mls::message_contents::content_types::ReactionV2; +use xmtp_proto::xmtp::mls::message_contents::{DeviceSyncKind, EncodedContent}; pub type RustXmtpClient = MlsClient; #[derive(uniffi::Object, Clone)] @@ -1438,6 +1442,37 @@ impl FfiConversation { Ok(messages) } + pub async fn find_messages_with_reactions( + &self, + opts: FfiListMessagesOptions, + ) -> Result, GenericError> { + let delivery_status = opts.delivery_status.map(|status| status.into()); + let direction = opts.direction.map(|dir| dir.into()); + let kind = match self.conversation_type().await? { + FfiConversationType::Group => None, + FfiConversationType::Dm => Some(GroupMessageKind::Application), + FfiConversationType::Sync => None, + }; + + let messages: Vec = self + .inner + .find_messages_with_reactions(&MsgQueryArgs { + sent_before_ns: opts.sent_before_ns, + sent_after_ns: opts.sent_after_ns, + kind, + delivery_status, + limit: opts.limit, + direction, + content_types: opts + .content_types + .map(|types| types.into_iter().map(Into::into).collect()), + })? + .into_iter() + .map(|msg| msg.into()) + .collect(); + Ok(messages) + } + pub async fn process_streamed_conversation_message( &self, envelope_bytes: Vec, @@ -1769,6 +1804,135 @@ impl From for DeliveryStatus { } } +#[derive(uniffi::Record)] +pub struct FfiMessageWithReactions { + pub message: FfiMessage, + pub reactions: Vec, +} + +impl From for FfiMessageWithReactions { + fn from(msg_with_reactions: StoredGroupMessageWithReactions) -> Self { + Self { + message: msg_with_reactions.message.into(), + reactions: msg_with_reactions + .reactions + .into_iter() + .map(|reaction| reaction.into()) + .collect(), + } + } +} + +#[derive(uniffi::Record, Clone, Default)] +pub struct FfiReaction { + pub reference: String, + pub reference_inbox_id: String, + pub action: FfiReactionAction, + pub content: String, + pub schema: FfiReactionSchema, +} + +impl From for ReactionV2 { + fn from(reaction: FfiReaction) -> Self { + ReactionV2 { + reference: reaction.reference, + reference_inbox_id: reaction.reference_inbox_id, + action: reaction.action.into(), + content: reaction.content, + schema: reaction.schema.into(), + } + } +} + +impl From for FfiReaction { + fn from(reaction: ReactionV2) -> Self { + FfiReaction { + reference: reaction.reference, + reference_inbox_id: reaction.reference_inbox_id, + action: match reaction.action { + 1 => FfiReactionAction::Added, + 2 => FfiReactionAction::Removed, + _ => FfiReactionAction::Unknown, + }, + content: reaction.content, + schema: match reaction.schema { + 1 => FfiReactionSchema::Unicode, + 2 => FfiReactionSchema::Shortcode, + 3 => FfiReactionSchema::Custom, + _ => FfiReactionSchema::Unknown, + }, + } + } +} + +#[uniffi::export] +pub fn encode_reaction(reaction: FfiReaction) -> Result, GenericError> { + // Convert FfiReaction to Reaction + let reaction: ReactionV2 = reaction.into(); + + // Use ReactionCodec to encode the reaction + let encoded = ReactionCodec::encode(reaction) + .map_err(|e| GenericError::Generic { err: e.to_string() })?; + + // Encode the EncodedContent to bytes + let mut buf = Vec::new(); + encoded + .encode(&mut buf) + .map_err(|e| GenericError::Generic { err: e.to_string() })?; + + Ok(buf) +} + +#[uniffi::export] +pub fn decode_reaction(bytes: Vec) -> Result { + // Decode bytes into EncodedContent + let encoded_content = EncodedContent::decode(bytes.as_slice()) + .map_err(|e| GenericError::Generic { err: e.to_string() })?; + + // Use ReactionCodec to decode into Reaction and convert to FfiReaction + ReactionCodec::decode(encoded_content) + .map(Into::into) + .map_err(|e| GenericError::Generic { err: e.to_string() }) +} + +#[derive(uniffi::Enum, Clone, Default, PartialEq, Debug)] +pub enum FfiReactionAction { + Unknown, + #[default] + Added, + Removed, +} + +impl From for i32 { + fn from(action: FfiReactionAction) -> Self { + match action { + FfiReactionAction::Unknown => 0, + FfiReactionAction::Added => 1, + FfiReactionAction::Removed => 2, + } + } +} + +#[derive(uniffi::Enum, Clone, Default, PartialEq, Debug)] +pub enum FfiReactionSchema { + Unknown, + #[default] + Unicode, + Shortcode, + Custom, +} + +impl From for i32 { + fn from(schema: FfiReactionSchema) -> Self { + match schema { + FfiReactionSchema::Unknown => 0, + FfiReactionSchema::Unicode => 1, + FfiReactionSchema::Shortcode => 2, + FfiReactionSchema::Custom => 3, + } + } +} + #[derive(uniffi::Record, Clone)] pub struct FfiMessage { pub id: Vec, @@ -1974,12 +2138,13 @@ mod tests { FfiPreferenceUpdate, FfiXmtpClient, }; use crate::{ - connect_to_backend, get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, - FfiConsentEntityType, FfiConsentState, FfiContentType, FfiConversation, - FfiConversationCallback, FfiConversationMessageKind, FfiCreateGroupOptions, FfiDirection, - FfiGroupPermissionsOptions, FfiInboxOwner, FfiListConversationsOptions, - FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, - FfiPermissionUpdateType, FfiSubscribeError, + connect_to_backend, decode_reaction, encode_reaction, get_inbox_id_for_address, + inbox_owner::SigningError, FfiConsent, FfiConsentEntityType, FfiConsentState, + FfiContentType, FfiConversation, FfiConversationCallback, FfiConversationMessageKind, + FfiCreateGroupOptions, FfiDirection, FfiGroupPermissionsOptions, FfiInboxOwner, + FfiListConversationsOptions, FfiListMessagesOptions, FfiMessageWithReactions, + FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, + FfiReaction, FfiReactionAction, FfiReactionSchema, FfiSubscribeError, }; use ethers::utils::hex; use prost::Message; @@ -2004,7 +2169,10 @@ mod tests { storage::EncryptionKey, InboxOwner, }; - use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent}; + use xmtp_proto::xmtp::mls::message_contents::{ + content_types::{ReactionAction, ReactionSchema, ReactionV2}, + ContentTypeId, EncodedContent, + }; const HISTORY_SYNC_URL: &str = "http://localhost:5558"; @@ -5254,4 +5422,128 @@ mod tests { let text_message = TextCodec::decode(latest_message_encoded_content).unwrap(); assert_eq!(text_message, "hey alix"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_send_and_receive_reaction() { + // Create two test clients + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Create a conversation between them + let alix_conversation = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + // Send initial message to react to + let mut buf = Vec::new(); + TextCodec::encode("Hello world".to_string()) + .unwrap() + .encode(&mut buf) + .unwrap(); + alix_conversation.send(buf).await.unwrap(); + + // Have Bo sync to get the conversation and message + bo.conversations().sync().await.unwrap(); + let bo_conversation = bo.conversation(alix_conversation.id()).unwrap(); + bo_conversation.sync().await.unwrap(); + + // Get the message to react to + let messages = bo_conversation + .find_messages(FfiListMessagesOptions::default()) + .await + .unwrap(); + let message_to_react_to = &messages[0]; + + // Create and send reaction + let ffi_reaction = FfiReaction { + reference: hex::encode(message_to_react_to.id.clone()), + reference_inbox_id: alix.inbox_id(), + action: FfiReactionAction::Added, + content: "👍".to_string(), + schema: FfiReactionSchema::Unicode, + }; + let bytes_to_send = encode_reaction(ffi_reaction).unwrap(); + bo_conversation.send(bytes_to_send).await.unwrap(); + + // Have Alix sync to get the reaction + alix_conversation.sync().await.unwrap(); + + // Get reactions for the original message + let messages = alix_conversation + .find_messages(FfiListMessagesOptions::default()) + .await + .unwrap(); + + // Verify reaction details + assert_eq!(messages.len(), 3); + let received_reaction = &messages[2]; + let message_content = received_reaction.content.clone(); + let reaction = decode_reaction(message_content).unwrap(); + assert_eq!(reaction.content, "👍"); + assert_eq!(reaction.action, FfiReactionAction::Added); + assert_eq!(reaction.reference_inbox_id, alix.inbox_id()); + assert_eq!( + reaction.reference, + hex::encode(message_to_react_to.id.clone()) + ); + assert_eq!(reaction.schema, FfiReactionSchema::Unicode); + + // Test find_messages_with_reactions query + let messages_with_reactions: Vec = alix_conversation + .find_messages_with_reactions(FfiListMessagesOptions::default()) + .await + .unwrap(); + assert_eq!(messages_with_reactions.len(), 2); + let message_with_reactions = &messages_with_reactions[1]; + assert_eq!(message_with_reactions.reactions.len(), 1); + let message_content = message_with_reactions.reactions[0].content.clone(); + let slice: &[u8] = message_content.as_slice(); + let encoded_content = EncodedContent::decode(slice).unwrap(); + let reaction = ReactionV2::decode(encoded_content.content.as_slice()).unwrap(); + assert_eq!(reaction.content, "👍"); + assert_eq!(reaction.action, ReactionAction::Added as i32); + assert_eq!(reaction.reference_inbox_id, alix.inbox_id()); + assert_eq!( + reaction.reference, + hex::encode(message_to_react_to.id.clone()) + ); + assert_eq!(reaction.schema, ReactionSchema::Unicode as i32); + } + + #[tokio::test] + async fn test_reaction_encode_decode() { + // Create a test reaction + let original_reaction = FfiReaction { + reference: "123abc".to_string(), + reference_inbox_id: "test_inbox_id".to_string(), + action: FfiReactionAction::Added, + content: "👍".to_string(), + schema: FfiReactionSchema::Unicode, + }; + + // Encode the reaction + let encoded_bytes = encode_reaction(original_reaction.clone()) + .expect("Should encode reaction successfully"); + + // Decode the reaction + let decoded_reaction = + decode_reaction(encoded_bytes).expect("Should decode reaction successfully"); + + // Verify the decoded reaction matches the original + assert_eq!(decoded_reaction.reference, original_reaction.reference); + assert_eq!( + decoded_reaction.reference_inbox_id, + original_reaction.reference_inbox_id + ); + assert!(matches!(decoded_reaction.action, FfiReactionAction::Added)); + assert_eq!(decoded_reaction.content, original_reaction.content); + assert!(matches!( + decoded_reaction.schema, + FfiReactionSchema::Unicode + )); + } } diff --git a/xmtp_content_types/src/reaction.rs b/xmtp_content_types/src/reaction.rs index 3f599bdf9..f70b925bc 100644 --- a/xmtp_content_types/src/reaction.rs +++ b/xmtp_content_types/src/reaction.rs @@ -20,7 +20,7 @@ impl ContentCodec for ReactionCodec { ContentTypeId { authority_id: ReactionCodec::AUTHORITY_ID.to_string(), type_id: ReactionCodec::TYPE_ID.to_string(), - version_major: 1, + version_major: 2, version_minor: 0, } } diff --git a/xmtp_mls/migrations/2025-01-03-002434_create_group_message_parent_id/down.sql b/xmtp_mls/migrations/2025-01-03-002434_create_group_message_parent_id/down.sql new file mode 100644 index 000000000..1045a6775 --- /dev/null +++ b/xmtp_mls/migrations/2025-01-03-002434_create_group_message_parent_id/down.sql @@ -0,0 +1,3 @@ +DROP INDEX idx_group_messages_reference_id; +ALTER TABLE group_messages +DROP COLUMN reference_id; \ No newline at end of file diff --git a/xmtp_mls/migrations/2025-01-03-002434_create_group_message_parent_id/up.sql b/xmtp_mls/migrations/2025-01-03-002434_create_group_message_parent_id/up.sql new file mode 100644 index 000000000..52732f957 --- /dev/null +++ b/xmtp_mls/migrations/2025-01-03-002434_create_group_message_parent_id/up.sql @@ -0,0 +1,3 @@ +ALTER TABLE group_messages +ADD COLUMN reference_id BINARY; +CREATE INDEX idx_group_messages_reference_id ON group_messages(reference_id); \ No newline at end of file diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 69be71c2e..989782cb0 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -699,6 +699,7 @@ where version_major: conversation_item.version_major?, version_minor: conversation_item.version_minor?, authority_id: conversation_item.authority_id?, + reference_id: None, // conversation_item does not use message reference_id }) }); diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 522969392..339025918 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -559,6 +559,7 @@ where version_major: queryable_content_fields.version_major, version_minor: queryable_content_fields.version_minor, authority_id: queryable_content_fields.authority_id, + reference_id: queryable_content_fields.reference_id, } .store_or_ignore(provider.conn_ref())? } @@ -591,6 +592,7 @@ where version_major: 0, version_minor: 0, authority_id: "unknown".to_string(), + reference_id: None, } .store_or_ignore(provider.conn_ref())?; @@ -624,6 +626,7 @@ where version_major: 0, version_minor: 0, authority_id: "unknown".to_string(), + reference_id: None, } .store_or_ignore(provider.conn_ref())?; @@ -971,6 +974,7 @@ where version_major: content_type.version_major as i32, version_minor: content_type.version_minor as i32, authority_id: content_type.authority_id.to_string(), + reference_id: None, }; msg.store_or_ignore(conn)?; diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 70f9ae97c..0d4049d9a 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -35,6 +35,7 @@ use openmls_traits::OpenMlsProvider; use prost::Message; use thiserror::Error; use tokio::sync::Mutex; +use xmtp_content_types::reaction::ReactionCodec; use self::device_sync::DeviceSyncError; pub use self::group_permissions::PreconfiguredPolicies; @@ -58,7 +59,11 @@ use self::{ intents::IntentError, validated_commit::CommitValidationError, }; -use crate::storage::{group::DmIdExt, group_message::ContentType, NotFound, StorageError}; +use crate::storage::{ + group::DmIdExt, + group_message::{ContentType, StoredGroupMessageWithReactions}, + NotFound, StorageError, +}; use xmtp_common::time::now_ns; use xmtp_proto::xmtp::mls::{ api::v1::{ @@ -66,6 +71,7 @@ use xmtp_proto::xmtp::mls::{ GroupMessage, }, message_contents::{ + content_types::ReactionV2, plaintext_envelope::{Content, V1}, EncodedContent, PlaintextEnvelope, }, @@ -320,6 +326,7 @@ pub struct QueryableContentFields { pub version_major: i32, pub version_minor: i32, pub authority_id: String, + pub reference_id: Option>, } impl Default for QueryableContentFields { @@ -329,20 +336,38 @@ impl Default for QueryableContentFields { version_major: 0, version_minor: 0, authority_id: String::new(), + reference_id: None, } } } -impl From for QueryableContentFields { - fn from(content: EncodedContent) -> Self { +impl TryFrom for QueryableContentFields { + type Error = prost::DecodeError; + + fn try_from(content: EncodedContent) -> Result { let content_type_id = content.r#type.unwrap_or_default(); + let reference_id = match ( + content_type_id.type_id.as_str(), + content_type_id.version_major, + ) { + (ReactionCodec::TYPE_ID, major) if major >= 2 => { + let reaction = ReactionV2::decode(content.content.as_slice())?; + hex::decode(reaction.reference).ok() + } + (ReactionCodec::TYPE_ID, _) => { + // TODO: Implement JSON deserialization for legacy reaction format + None + } + _ => None, + }; - QueryableContentFields { + Ok(QueryableContentFields { content_type: content_type_id.type_id.into(), version_major: content_type_id.version_major as i32, version_minor: content_type_id.version_minor as i32, authority_id: content_type_id.authority_id.to_string(), - } + reference_id, + }) } } @@ -746,7 +771,14 @@ impl MlsGroup { // Return early with default if decoding fails or type is missing EncodedContent::decode(message) .inspect_err(|e| tracing::debug!("Failed to decode message as EncodedContent: {}", e)) - .map(QueryableContentFields::from) + .and_then(|content| { + QueryableContentFields::try_from(content).inspect_err(|e| { + tracing::debug!( + "Failed to convert EncodedContent to QueryableContentFields: {}", + e + ) + }) + }) .unwrap_or_default() } @@ -792,6 +824,7 @@ impl MlsGroup { version_major: queryable_content_fields.version_major, version_minor: queryable_content_fields.version_minor, authority_id: queryable_content_fields.authority_id, + reference_id: queryable_content_fields.reference_id, }; group_message.store(provider.conn_ref())?; @@ -818,6 +851,17 @@ impl MlsGroup { Ok(messages) } + /// Query the database for stored messages. Optionally filtered by time, kind, delivery_status + /// and limit + pub fn find_messages_with_reactions( + &self, + args: &MsgQueryArgs, + ) -> Result, GroupError> { + let conn = self.context().store().conn()?; + let messages = conn.get_group_messages_with_reactions(&self.group_id, args)?; + Ok(messages) + } + /// /// Add members to the group by account address /// diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index e4d05c86e..20335e88c 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use diesel::{ backend::Backend, deserialize::{self, FromSql, FromSqlRow}, @@ -6,6 +8,7 @@ use diesel::{ serialize::{self, IsNull, Output, ToSql}, sql_types::Integer, }; + use serde::{Deserialize, Serialize}; use xmtp_content_types::{ attachment, group_updated, membership_change, reaction, read_receipt, remote_attachment, reply, @@ -53,6 +56,14 @@ pub struct StoredGroupMessage { pub version_minor: i32, /// The ID of the authority defining the content type pub authority_id: String, + /// The ID of a referenced message + pub reference_id: Option>, +} + +pub struct StoredGroupMessageWithReactions { + pub message: StoredGroupMessage, + // Messages who's reference_id matches this message's id + pub reactions: Vec, } #[derive(Clone, Debug, PartialEq)] @@ -213,7 +224,7 @@ impl_fetch!(StoredGroupMessage, group_messages, Vec); impl_store!(StoredGroupMessage, group_messages); impl_store_or_ignore!(StoredGroupMessage, group_messages); -#[derive(Default)] +#[derive(Default, Clone)] pub struct MsgQueryArgs { pub sent_after_ns: Option, pub sent_before_ns: Option, @@ -282,6 +293,85 @@ impl DbConnection { Ok(self.raw_query(|conn| query.load::(conn))?) } + /// Query for group messages with their reactions + #[allow(clippy::too_many_arguments)] + pub fn get_group_messages_with_reactions( + &self, + group_id: &[u8], + args: &MsgQueryArgs, + ) -> Result, StorageError> { + // First get all the main messages + let mut modified_args = args.clone(); + // filter out reactions from the main query so we don't get them twice + let content_types = match modified_args.content_types.clone() { + Some(content_types) => { + let mut content_types = content_types.clone(); + content_types.retain(|content_type| *content_type != ContentType::Reaction); + Some(content_types) + } + None => Some(vec![ + ContentType::Text, + ContentType::GroupMembershipChange, + ContentType::GroupUpdated, + ContentType::ReadReceipt, + ContentType::Reply, + ContentType::Attachment, + ContentType::RemoteAttachment, + ContentType::TransactionReference, + ContentType::Unknown, + ]), + }; + + modified_args.content_types = content_types; + let messages = self.get_group_messages(group_id, &modified_args)?; + + // Then get all reactions for these messages in a single query + let message_ids: Vec<&[u8]> = messages.iter().map(|m| m.id.as_slice()).collect(); + + let mut reactions_query = dsl::group_messages + .filter(dsl::group_id.eq(group_id)) + .filter(dsl::reference_id.is_not_null()) + .filter(dsl::reference_id.eq_any(message_ids)) + .into_boxed(); + + // Apply the same sorting as the main messages + reactions_query = match args.direction.as_ref().unwrap_or(&SortDirection::Ascending) { + SortDirection::Ascending => reactions_query.order(dsl::sent_at_ns.asc()), + SortDirection::Descending => reactions_query.order(dsl::sent_at_ns.desc()), + }; + + let reactions: Vec = + self.raw_query(|conn| reactions_query.load(conn))?; + + // Group reactions by parent message id + let mut reactions_by_reference: HashMap, Vec> = HashMap::new(); + + for reaction in reactions { + if let Some(reference_id) = &reaction.reference_id { + reactions_by_reference + .entry(reference_id.clone()) + .or_default() + .push(reaction); + } + } + + // Combine messages with their reactions + let messages_with_reactions: Vec = messages + .into_iter() + .map(|message| { + let message_clone = message.clone(); + StoredGroupMessageWithReactions { + message, + reactions: reactions_by_reference + .remove(&message_clone.id) + .unwrap_or_default(), + } + }) + .collect(); + + Ok(messages_with_reactions) + } + /// Get a particular group message pub fn get_group_message>( &self, @@ -370,6 +460,7 @@ pub(crate) mod tests { version_major: 0, version_minor: 0, authority_id: "unknown".to_string(), + reference_id: None, } } diff --git a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs index e3f160d14..f687f4fb9 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema_gen.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema_gen.rs @@ -47,6 +47,7 @@ diesel::table! { version_minor -> Integer, version_major -> Integer, authority_id -> Text, + reference_id -> Nullable, } }