Skip to content

Commit

Permalink
Save reference_id to DB and add get_group_messages_with_reactions fun…
Browse files Browse the repository at this point in the history
…ction (#1454)

* save reference_id to DB and add get_group_messages_with_reactions function

* add find messages with reactions with tests to ffi bindings

---------

Co-authored-by: cameronvoell <[email protected]>
  • Loading branch information
cameronvoell and cameronvoell authored Jan 9, 2025
1 parent ab9bb05 commit 2695d5f
Show file tree
Hide file tree
Showing 9 changed files with 456 additions and 17 deletions.
310 changes: 301 additions & 9 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
pub use crate::inbox_owner::SigningError;
use crate::logger::init_logger;
use crate::{FfiSubscribeError, GenericError};
use prost::Message;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tokio::sync::Mutex;
use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient;
use xmtp_content_types::reaction::ReactionCodec;
use xmtp_content_types::ContentCodec;
use xmtp_id::associations::verify_signed_with_public_context;
use xmtp_id::scw_verifier::RemoteSignatureVerifier;
use xmtp_id::{
Expand All @@ -19,8 +22,8 @@ use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate;
use xmtp_mls::groups::scoped_client::LocalScopedGroupClient;
use xmtp_mls::groups::HmacKey;
use xmtp_mls::storage::group::ConversationType;
use xmtp_mls::storage::group_message::SortDirection;
use xmtp_mls::storage::group_message::{ContentType, MsgQueryArgs};
use xmtp_mls::storage::group_message::{SortDirection, StoredGroupMessageWithReactions};
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
Expand All @@ -46,7 +49,8 @@ use xmtp_mls::{
},
AbortHandle, GenericStreamHandle, StreamHandle,
};
use xmtp_proto::xmtp::mls::message_contents::DeviceSyncKind;
use xmtp_proto::xmtp::mls::message_contents::content_types::ReactionV2;
use xmtp_proto::xmtp::mls::message_contents::{DeviceSyncKind, EncodedContent};
pub type RustXmtpClient = MlsClient<TonicApiClient>;

#[derive(uniffi::Object, Clone)]
Expand Down Expand Up @@ -1438,6 +1442,37 @@ impl FfiConversation {
Ok(messages)
}

pub async fn find_messages_with_reactions(
&self,
opts: FfiListMessagesOptions,
) -> Result<Vec<FfiMessageWithReactions>, GenericError> {
let delivery_status = opts.delivery_status.map(|status| status.into());
let direction = opts.direction.map(|dir| dir.into());
let kind = match self.conversation_type().await? {
FfiConversationType::Group => None,
FfiConversationType::Dm => Some(GroupMessageKind::Application),
FfiConversationType::Sync => None,
};

let messages: Vec<FfiMessageWithReactions> = self
.inner
.find_messages_with_reactions(&MsgQueryArgs {
sent_before_ns: opts.sent_before_ns,
sent_after_ns: opts.sent_after_ns,
kind,
delivery_status,
limit: opts.limit,
direction,
content_types: opts
.content_types
.map(|types| types.into_iter().map(Into::into).collect()),
})?
.into_iter()
.map(|msg| msg.into())
.collect();
Ok(messages)
}

pub async fn process_streamed_conversation_message(
&self,
envelope_bytes: Vec<u8>,
Expand Down Expand Up @@ -1769,6 +1804,135 @@ impl From<FfiDeliveryStatus> for DeliveryStatus {
}
}

#[derive(uniffi::Record)]
pub struct FfiMessageWithReactions {
pub message: FfiMessage,
pub reactions: Vec<FfiMessage>,
}

impl From<StoredGroupMessageWithReactions> for FfiMessageWithReactions {
fn from(msg_with_reactions: StoredGroupMessageWithReactions) -> Self {
Self {
message: msg_with_reactions.message.into(),
reactions: msg_with_reactions
.reactions
.into_iter()
.map(|reaction| reaction.into())
.collect(),
}
}
}

#[derive(uniffi::Record, Clone, Default)]
pub struct FfiReaction {
pub reference: String,
pub reference_inbox_id: String,
pub action: FfiReactionAction,
pub content: String,
pub schema: FfiReactionSchema,
}

impl From<FfiReaction> for ReactionV2 {
fn from(reaction: FfiReaction) -> Self {
ReactionV2 {
reference: reaction.reference,
reference_inbox_id: reaction.reference_inbox_id,
action: reaction.action.into(),
content: reaction.content,
schema: reaction.schema.into(),
}
}
}

impl From<ReactionV2> for FfiReaction {
fn from(reaction: ReactionV2) -> Self {
FfiReaction {
reference: reaction.reference,
reference_inbox_id: reaction.reference_inbox_id,
action: match reaction.action {
1 => FfiReactionAction::Added,
2 => FfiReactionAction::Removed,
_ => FfiReactionAction::Unknown,
},
content: reaction.content,
schema: match reaction.schema {
1 => FfiReactionSchema::Unicode,
2 => FfiReactionSchema::Shortcode,
3 => FfiReactionSchema::Custom,
_ => FfiReactionSchema::Unknown,
},
}
}
}

#[uniffi::export]
pub fn encode_reaction(reaction: FfiReaction) -> Result<Vec<u8>, GenericError> {
// Convert FfiReaction to Reaction
let reaction: ReactionV2 = reaction.into();

// Use ReactionCodec to encode the reaction
let encoded = ReactionCodec::encode(reaction)
.map_err(|e| GenericError::Generic { err: e.to_string() })?;

// Encode the EncodedContent to bytes
let mut buf = Vec::new();
encoded
.encode(&mut buf)
.map_err(|e| GenericError::Generic { err: e.to_string() })?;

Ok(buf)
}

#[uniffi::export]
pub fn decode_reaction(bytes: Vec<u8>) -> Result<FfiReaction, GenericError> {
// Decode bytes into EncodedContent
let encoded_content = EncodedContent::decode(bytes.as_slice())
.map_err(|e| GenericError::Generic { err: e.to_string() })?;

// Use ReactionCodec to decode into Reaction and convert to FfiReaction
ReactionCodec::decode(encoded_content)
.map(Into::into)
.map_err(|e| GenericError::Generic { err: e.to_string() })
}

#[derive(uniffi::Enum, Clone, Default, PartialEq, Debug)]
pub enum FfiReactionAction {
Unknown,
#[default]
Added,
Removed,
}

impl From<FfiReactionAction> for i32 {
fn from(action: FfiReactionAction) -> Self {
match action {
FfiReactionAction::Unknown => 0,
FfiReactionAction::Added => 1,
FfiReactionAction::Removed => 2,
}
}
}

#[derive(uniffi::Enum, Clone, Default, PartialEq, Debug)]
pub enum FfiReactionSchema {
Unknown,
#[default]
Unicode,
Shortcode,
Custom,
}

impl From<FfiReactionSchema> for i32 {
fn from(schema: FfiReactionSchema) -> Self {
match schema {
FfiReactionSchema::Unknown => 0,
FfiReactionSchema::Unicode => 1,
FfiReactionSchema::Shortcode => 2,
FfiReactionSchema::Custom => 3,
}
}
}

#[derive(uniffi::Record, Clone)]
pub struct FfiMessage {
pub id: Vec<u8>,
Expand Down Expand Up @@ -1974,12 +2138,13 @@ mod tests {
FfiPreferenceUpdate, FfiXmtpClient,
};
use crate::{
connect_to_backend, get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent,
FfiConsentEntityType, FfiConsentState, FfiContentType, FfiConversation,
FfiConversationCallback, FfiConversationMessageKind, FfiCreateGroupOptions, FfiDirection,
FfiGroupPermissionsOptions, FfiInboxOwner, FfiListConversationsOptions,
FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet,
FfiPermissionUpdateType, FfiSubscribeError,
connect_to_backend, decode_reaction, encode_reaction, get_inbox_id_for_address,
inbox_owner::SigningError, FfiConsent, FfiConsentEntityType, FfiConsentState,
FfiContentType, FfiConversation, FfiConversationCallback, FfiConversationMessageKind,
FfiCreateGroupOptions, FfiDirection, FfiGroupPermissionsOptions, FfiInboxOwner,
FfiListConversationsOptions, FfiListMessagesOptions, FfiMessageWithReactions,
FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType,
FfiReaction, FfiReactionAction, FfiReactionSchema, FfiSubscribeError,
};
use ethers::utils::hex;
use prost::Message;
Expand All @@ -2004,7 +2169,10 @@ mod tests {
storage::EncryptionKey,
InboxOwner,
};
use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent};
use xmtp_proto::xmtp::mls::message_contents::{
content_types::{ReactionAction, ReactionSchema, ReactionV2},
ContentTypeId, EncodedContent,
};

const HISTORY_SYNC_URL: &str = "http://localhost:5558";

Expand Down Expand Up @@ -5254,4 +5422,128 @@ mod tests {
let text_message = TextCodec::decode(latest_message_encoded_content).unwrap();
assert_eq!(text_message, "hey alix");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_send_and_receive_reaction() {
// Create two test clients
let alix = new_test_client().await;
let bo = new_test_client().await;

// Create a conversation between them
let alix_conversation = alix
.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();
// Send initial message to react to
let mut buf = Vec::new();
TextCodec::encode("Hello world".to_string())
.unwrap()
.encode(&mut buf)
.unwrap();
alix_conversation.send(buf).await.unwrap();

// Have Bo sync to get the conversation and message
bo.conversations().sync().await.unwrap();
let bo_conversation = bo.conversation(alix_conversation.id()).unwrap();
bo_conversation.sync().await.unwrap();

// Get the message to react to
let messages = bo_conversation
.find_messages(FfiListMessagesOptions::default())
.await
.unwrap();
let message_to_react_to = &messages[0];

// Create and send reaction
let ffi_reaction = FfiReaction {
reference: hex::encode(message_to_react_to.id.clone()),
reference_inbox_id: alix.inbox_id(),
action: FfiReactionAction::Added,
content: "👍".to_string(),
schema: FfiReactionSchema::Unicode,
};
let bytes_to_send = encode_reaction(ffi_reaction).unwrap();
bo_conversation.send(bytes_to_send).await.unwrap();

// Have Alix sync to get the reaction
alix_conversation.sync().await.unwrap();

// Get reactions for the original message
let messages = alix_conversation
.find_messages(FfiListMessagesOptions::default())
.await
.unwrap();

// Verify reaction details
assert_eq!(messages.len(), 3);
let received_reaction = &messages[2];
let message_content = received_reaction.content.clone();
let reaction = decode_reaction(message_content).unwrap();
assert_eq!(reaction.content, "👍");
assert_eq!(reaction.action, FfiReactionAction::Added);
assert_eq!(reaction.reference_inbox_id, alix.inbox_id());
assert_eq!(
reaction.reference,
hex::encode(message_to_react_to.id.clone())
);
assert_eq!(reaction.schema, FfiReactionSchema::Unicode);

// Test find_messages_with_reactions query
let messages_with_reactions: Vec<FfiMessageWithReactions> = alix_conversation
.find_messages_with_reactions(FfiListMessagesOptions::default())
.await
.unwrap();
assert_eq!(messages_with_reactions.len(), 2);
let message_with_reactions = &messages_with_reactions[1];
assert_eq!(message_with_reactions.reactions.len(), 1);
let message_content = message_with_reactions.reactions[0].content.clone();
let slice: &[u8] = message_content.as_slice();
let encoded_content = EncodedContent::decode(slice).unwrap();
let reaction = ReactionV2::decode(encoded_content.content.as_slice()).unwrap();
assert_eq!(reaction.content, "👍");
assert_eq!(reaction.action, ReactionAction::Added as i32);
assert_eq!(reaction.reference_inbox_id, alix.inbox_id());
assert_eq!(
reaction.reference,
hex::encode(message_to_react_to.id.clone())
);
assert_eq!(reaction.schema, ReactionSchema::Unicode as i32);
}

#[tokio::test]
async fn test_reaction_encode_decode() {
// Create a test reaction
let original_reaction = FfiReaction {
reference: "123abc".to_string(),
reference_inbox_id: "test_inbox_id".to_string(),
action: FfiReactionAction::Added,
content: "👍".to_string(),
schema: FfiReactionSchema::Unicode,
};

// Encode the reaction
let encoded_bytes = encode_reaction(original_reaction.clone())
.expect("Should encode reaction successfully");

// Decode the reaction
let decoded_reaction =
decode_reaction(encoded_bytes).expect("Should decode reaction successfully");

// Verify the decoded reaction matches the original
assert_eq!(decoded_reaction.reference, original_reaction.reference);
assert_eq!(
decoded_reaction.reference_inbox_id,
original_reaction.reference_inbox_id
);
assert!(matches!(decoded_reaction.action, FfiReactionAction::Added));
assert_eq!(decoded_reaction.content, original_reaction.content);
assert!(matches!(
decoded_reaction.schema,
FfiReactionSchema::Unicode
));
}
}
2 changes: 1 addition & 1 deletion xmtp_content_types/src/reaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl ContentCodec<ReactionV2> for ReactionCodec {
ContentTypeId {
authority_id: ReactionCodec::AUTHORITY_ID.to_string(),
type_id: ReactionCodec::TYPE_ID.to_string(),
version_major: 1,
version_major: 2,
version_minor: 0,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP INDEX idx_group_messages_reference_id;
ALTER TABLE group_messages
DROP COLUMN reference_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE group_messages
ADD COLUMN reference_id BINARY;
CREATE INDEX idx_group_messages_reference_id ON group_messages(reference_id);
1 change: 1 addition & 0 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ where
version_major: conversation_item.version_major?,
version_minor: conversation_item.version_minor?,
authority_id: conversation_item.authority_id?,
reference_id: None, // conversation_item does not use message reference_id
})
});

Expand Down
Loading

0 comments on commit 2695d5f

Please sign in to comment.