From ef28fd131f3de0f4b111023302e1fe69d616dd96 Mon Sep 17 00:00:00 2001 From: brianp Date: Tue, 12 Sep 2023 22:49:25 +0200 Subject: [PATCH] Expand reply / metadata functionality --- Cargo.lock | 1 + base_layer/chat_ffi/chat.h | 49 ++++++- base_layer/chat_ffi/src/lib.rs | 137 ++++++++++++++++-- .../examples/chat_client/src/client.rs | 23 ++- .../src/contacts_service/types/message.rs | 7 +- .../contacts_service/types/message_builder.rs | 11 +- integration_tests/Cargo.toml | 1 + integration_tests/src/chat_ffi.rs | 91 ++++++++---- .../tests/features/ChatFFI.feature | 10 ++ integration_tests/tests/steps/chat_steps.rs | 97 ++++++++++++- 10 files changed, 372 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8b97e5cf67..c6b882b8e23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5758,6 +5758,7 @@ dependencies = [ "time", "tokio", "tonic 0.6.2", + "uuid", ] [[package]] diff --git a/base_layer/chat_ffi/chat.h b/base_layer/chat_ffi/chat.h index f7db2686294..8a9a773b459 100644 --- a/base_layer/chat_ffi/chat.h +++ b/base_layer/chat_ffi/chat.h @@ -16,6 +16,8 @@ struct ChatClientFFI; struct ChatMessages; +struct Message; + struct TariAddress; struct TransportConfig; @@ -116,20 +118,57 @@ void destroy_chat_config(struct ApplicationConfig *config); * * ## Arguments * `client` - The Client pointer + * `message` - Pointer to a Message struct + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `()` - Does not return a value, equivalent to void in C + * + * # Safety + * The ```receiver``` should be destroyed after use + */ +void send_chat_message(struct ChatClientFFI *client, struct Message *message, int *error_out); + +/** + * Creates a message and returns a ptr to it + * + * ## Arguments * `receiver` - A string containing a tari address * `message` - The peer seeds config for the node * `error_out` - Pointer to an int which will be modified * * ## Returns - * `()` - Does not return a value, equivalent to void in C + * `*mut Message` - Does not return a value, equivalent to void in C * * # Safety * The ```receiver``` should be destroyed after use */ -void send_chat_message(struct ChatClientFFI *client, - struct TariAddress *receiver, - const char *message_c_char, - int *error_out); +struct Message *create_chat_message(struct TariAddress *receiver, + const char *message, + int *error_out); + +/** + * Creates message metadata + * + * ## Arguments + * `message` - A pointer to a message *IMPORTANT: This pointer will be consumed, and dropped during this function call. + * A new pointer for a new message will be returned* + * `metadata_type` - An int8 that maps to MessageMetadataType enum + * '0' -> Reply + * '1' -> TokenRequest + * `data` - contents for the metadata + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `*mut Message` - a new pointer to the extended message + * + * ## Safety + * `message` Argument is dropped during this function. + */ +struct Message *add_chat_message_metadata(struct Message *message, + int *metadata_type, + const char *data_char, + int *error_out); /** * Add a contact diff --git a/base_layer/chat_ffi/src/lib.rs b/base_layer/chat_ffi/src/lib.rs index 195c66fca72..bb4dca5c040 100644 --- a/base_layer/chat_ffi/src/lib.rs +++ b/base_layer/chat_ffi/src/lib.rs @@ -50,7 +50,7 @@ use tari_common_types::tari_address::TariAddress; use tari_comms::multiaddr::Multiaddr; use tari_contacts::contacts_service::{ handle::{DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE}, - types::Message, + types::{Message, MessageBuilder, MessageMetadata, MessageMetadataType}, }; use tari_p2p::{SocksAuthentication, TorControlAuthentication, TorTransportConfig, TransportConfig, TransportType}; use tari_utilities::hex; @@ -485,8 +485,7 @@ unsafe fn init_logging(log_path: PathBuf, error_out: *mut c_int) { /// /// ## Arguments /// `client` - The Client pointer -/// `receiver` - A string containing a tari address -/// `message` - The peer seeds config for the node +/// `message` - Pointer to a Message struct /// `error_out` - Pointer to an int which will be modified /// /// ## Returns @@ -495,12 +494,7 @@ unsafe fn init_logging(log_path: PathBuf, error_out: *mut c_int) { /// # Safety /// The ```receiver``` should be destroyed after use #[no_mangle] -pub unsafe extern "C" fn send_chat_message( - client: *mut ChatClientFFI, - receiver: *mut TariAddress, - message_c_char: *const c_char, - error_out: *mut c_int, -) { +pub unsafe extern "C" fn send_chat_message(client: *mut ChatClientFFI, message: *mut Message, error_out: *mut c_int) { let mut error = 0; ptr::swap(error_out, &mut error as *mut c_int); @@ -509,23 +503,138 @@ pub unsafe extern "C" fn send_chat_message( ptr::swap(error_out, &mut error as *mut c_int); } + if message.is_null() { + error = LibChatError::from(InterfaceError::NullError("message".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + (*client) + .runtime + .block_on((*client).client.send_message((*message).clone())); +} + +/// Creates a message and returns a ptr to it +/// +/// ## Arguments +/// `receiver` - A string containing a tari address +/// `message` - The peer seeds config for the node +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `*mut Message` - Does not return a value, equivalent to void in C +/// +/// # Safety +/// The ```receiver``` should be destroyed after use +#[no_mangle] +pub unsafe extern "C" fn create_chat_message( + receiver: *mut TariAddress, + message: *const c_char, + error_out: *mut c_int, +) -> *mut Message { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + if receiver.is_null() { error = LibChatError::from(InterfaceError::NullError("receiver".to_string())).code; ptr::swap(error_out, &mut error as *mut c_int); } - let message = match CStr::from_ptr(message_c_char).to_str() { + let message_str = match CStr::from_ptr(message).to_str() { Ok(str) => str.to_string(), Err(e) => { error = LibChatError::from(InterfaceError::InvalidArgument(e.to_string())).code; ptr::swap(error_out, &mut error as *mut c_int); - return; + return ptr::null_mut(); }, }; - (*client) - .runtime - .block_on((*client).client.send_message((*receiver).clone(), message)); + let message_out = MessageBuilder::new() + .address((*receiver).clone()) + .message(message_str) + .build(); + + Box::into_raw(Box::new(message_out)) +} + +/// Creates message metadata +/// +/// ## Arguments +/// `message` - A pointer to a message *IMPORTANT: This pointer will be consumed, and dropped during this function call. +/// A new pointer for a new message will be returned* +/// `metadata_type` - An int8 that maps to MessageMetadataType enum +/// '0' -> Reply +/// '1' -> TokenRequest +/// `data` - contents for the metadata +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `*mut Message` - a new pointer to the extended message +/// +/// ## Safety +/// `message` Argument is dropped during this function. +#[no_mangle] +pub unsafe extern "C" fn add_chat_message_metadata( + message: *mut Message, + metadata_type: *mut c_int, + data_char: *const c_char, + error_out: *mut c_int, +) -> *mut Message { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if message.is_null() { + error = LibChatError::from(InterfaceError::NullError("message".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return ptr::null_mut(); + } + + if metadata_type.is_null() { + error = LibChatError::from(InterfaceError::NullError("metadata type".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return ptr::null_mut(); + } + + let metadata_type = match u8::try_from(*metadata_type) { + Ok(n) => match MessageMetadataType::from_byte(n) { + Some(t) => t, + None => { + error = LibChatError::from(InterfaceError::InvalidArgument( + "Couldn't convert byte to Metadata type".to_string(), + )) + .code; + ptr::swap(error_out, &mut error as *mut c_int); + return ptr::null_mut(); + }, + }, + Err(e) => { + error = LibChatError::from(InterfaceError::InvalidArgument(e.to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return ptr::null_mut(); + }, + }; + + if data_char.is_null() { + error = LibChatError::from(InterfaceError::NullError("data".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return ptr::null_mut(); + } + + let data = match CStr::from_ptr(data_char).to_str() { + Ok(str) => str.as_bytes().into(), + Err(e) => { + error = LibChatError::from(InterfaceError::InvalidArgument(e.to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + return ptr::null_mut(); + }, + }; + + let metadata = MessageMetadata { metadata_type, data }; + let new_message = Box::into_raw(Box::new( + MessageBuilder::from((*message).clone()).metadata(metadata).build(), + )); + drop(Box::from_raw(message)); + + new_message } /// Add a contact diff --git a/base_layer/contacts/examples/chat_client/src/client.rs b/base_layer/contacts/examples/chat_client/src/client.rs index 1e80fff8727..f487717d7f4 100644 --- a/base_layer/contacts/examples/chat_client/src/client.rs +++ b/base_layer/contacts/examples/chat_client/src/client.rs @@ -33,7 +33,7 @@ use tari_comms::{CommsNode, NodeIdentity}; use tari_contacts::contacts_service::{ handle::ContactsServiceHandle, service::ContactOnlineStatus, - types::{Message, MessageBuilder}, + types::{Message, MessageBuilder, MessageMetadata, MessageMetadataType}, }; use tari_shutdown::Shutdown; @@ -44,9 +44,11 @@ const LOG_TARGET: &str = "contacts::chat_client"; #[async_trait] pub trait ChatClient { async fn add_contact(&self, address: &TariAddress); + fn add_metadata(&self, message: Message, metadata_type: MessageMetadataType, data: String) -> Message; async fn check_online_status(&self, address: &TariAddress) -> ContactOnlineStatus; - async fn send_message(&self, receiver: TariAddress, message: String); + fn create_message(&self, receiver: &TariAddress, message: String) -> Message; async fn get_messages(&self, sender: &TariAddress, limit: u64, page: u64) -> Vec; + async fn send_message(&self, message: Message); fn identity(&self) -> &NodeIdentity; fn shutdown(&mut self); } @@ -148,10 +150,10 @@ impl ChatClient for Client { ContactOnlineStatus::Offline } - async fn send_message(&self, receiver: TariAddress, message: String) { + async fn send_message(&self, message: Message) { if let Some(mut contacts_service) = self.contacts.clone() { contacts_service - .send_message(MessageBuilder::new().message(message).address(receiver).build()) + .send_message(message) .await .expect("Message wasn't sent"); } @@ -168,6 +170,19 @@ impl ChatClient for Client { messages } + + fn create_message(&self, receiver: &TariAddress, message: String) -> Message { + MessageBuilder::new().address(receiver.clone()).message(message).build() + } + + fn add_metadata(&self, message: Message, metadata_type: MessageMetadataType, data: String) -> Message { + let metadata = MessageMetadata { + metadata_type, + data: data.into_bytes(), + }; + + MessageBuilder::from(message).metadata(metadata).build() + } } pub async fn wait_for_connectivity(comms: CommsNode) -> anyhow::Result<()> { diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs index a24d23072ba..2147a373fca 100644 --- a/base_layer/contacts/src/contacts_service/types/message.rs +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -43,7 +43,7 @@ pub struct Message { } #[repr(u8)] -#[derive(FromPrimitive, Debug, Copy, Clone, Default)] +#[derive(FromPrimitive, Debug, Copy, Clone, Default, PartialEq)] pub enum Direction { Inbound = 0, #[default] @@ -67,10 +67,11 @@ pub struct MessageMetadata { } #[repr(u8)] -#[derive(FromPrimitive, Debug, Copy, Clone, Default, Deserialize, Serialize)] +#[derive(FromPrimitive, Debug, Copy, Clone, Default, Deserialize, Serialize, PartialEq)] pub enum MessageMetadataType { + Reply = 0, #[default] - TokenRequest = 0, + TokenRequest = 1, } impl MessageMetadataType { diff --git a/base_layer/contacts/src/contacts_service/types/message_builder.rs b/base_layer/contacts/src/contacts_service/types/message_builder.rs index da4fff8b9c3..74b85f42379 100644 --- a/base_layer/contacts/src/contacts_service/types/message_builder.rs +++ b/base_layer/contacts/src/contacts_service/types/message_builder.rs @@ -21,7 +21,6 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use tari_common_types::tari_address::TariAddress; -use tari_utilities::ByteArray; use uuid::Uuid; use crate::contacts_service::types::{message::MessageMetadata, Message}; @@ -33,7 +32,7 @@ pub struct MessageBuilder { impl MessageBuilder { pub fn new() -> Self { - let message_id = Uuid::new_v4().into_bytes().to_vec(); + let message_id = Uuid::new_v4().to_string().into_bytes(); Self { inner: Message { @@ -78,3 +77,11 @@ impl MessageBuilder { self.inner.clone() } } + +impl From for MessageBuilder { + fn from(message: Message) -> Self { + Self { + inner: Message { ..message }, + } + } +} diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 8dc78ca48b4..4a085033643 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -49,6 +49,7 @@ thiserror = "^1.0.20" time = "0.3.15" tokio = { version = "1.10", features = ["macros", "time", "sync", "rt-multi-thread"] } tonic = "0.6.2" +uuid = { version = "1.3", features = ["v4"] } [package.metadata.cargo-machete] ignored = ["minotari_wallet_ffi", "minotari_chat_ffi"] diff --git a/integration_tests/src/chat_ffi.rs b/integration_tests/src/chat_ffi.rs index 9dd2f77f778..a5ca90fd02b 100644 --- a/integration_tests/src/chat_ffi.rs +++ b/integration_tests/src/chat_ffi.rs @@ -41,7 +41,10 @@ use tari_comms::{ peer_manager::{Peer, PeerFeatures}, NodeIdentity, }; -use tari_contacts::contacts_service::{service::ContactOnlineStatus, types::Message}; +use tari_contacts::contacts_service::{ + service::ContactOnlineStatus, + types::{Message, MessageMetadataType}, +}; use crate::{chat_client::test_config, get_port}; @@ -60,24 +63,26 @@ extern "C" fn callback_message_received(_state: *mut c_void) { extern "C" { pub fn create_chat_client( config: *mut c_void, - out_error: *const c_int, + error_out: *const c_int, callback_contact_status_change: unsafe extern "C" fn(*mut c_void), callback_message_received: unsafe extern "C" fn(*mut c_void), ) -> *mut ClientFFI; - pub fn send_chat_message( - client: *mut ClientFFI, - receiver: *mut c_void, - message: *const c_char, - out_error: *const c_int, - ); - pub fn add_chat_contact(client: *mut ClientFFI, address: *mut c_void, out_error: *const c_int); - pub fn check_online_status(client: *mut ClientFFI, address: *mut c_void, out_error: *const c_int) -> c_int; + pub fn create_chat_message(receiver: *mut c_void, message: *const c_char, error_out: *const c_int) -> *mut c_void; + pub fn send_chat_message(client: *mut ClientFFI, message: *mut c_void, error_out: *const c_int); + pub fn add_chat_message_metadata( + message: *mut c_void, + metadata_type: *const c_int, + data: *const c_char, + error_out: *const c_int, + ) -> *mut c_void; + pub fn add_chat_contact(client: *mut ClientFFI, address: *mut c_void, error_out: *const c_int); + pub fn check_online_status(client: *mut ClientFFI, address: *mut c_void, error_out: *const c_int) -> c_int; pub fn get_chat_messages( client: *mut ClientFFI, sender: *mut c_void, limit: *mut c_void, page: *mut c_void, - out_error: *const c_int, + error_out: *const c_int, ) -> *mut c_void; pub fn destroy_chat_client_ffi(client: *mut ClientFFI); } @@ -99,8 +104,8 @@ impl ChatClient for ChatFFI { let address_ptr = Box::into_raw(Box::new(address.to_owned())) as *mut c_void; - let out_error = Box::into_raw(Box::new(0)); - unsafe { add_chat_contact(client.0, address_ptr, out_error) } + let error_out = Box::into_raw(Box::new(0)); + unsafe { add_chat_contact(client.0, address_ptr, error_out) } } async fn check_online_status(&self, address: &TariAddress) -> ContactOnlineStatus { @@ -109,23 +114,20 @@ impl ChatClient for ChatFFI { let address_ptr = Box::into_raw(Box::new(address.clone())) as *mut c_void; let result; - let out_error = Box::into_raw(Box::new(0)); - unsafe { result = check_online_status(client.0, address_ptr, out_error) } + let error_out = Box::into_raw(Box::new(0)); + unsafe { result = check_online_status(client.0, address_ptr, error_out) } ContactOnlineStatus::from_byte(u8::try_from(result).unwrap()).expect("A valid u8 from FFI status") } - async fn send_message(&self, receiver: TariAddress, message: String) { + async fn send_message(&self, message: Message) { let client = self.ptr.lock().unwrap(); - let message_c_str = CString::new(message).unwrap(); - let message_c_char: *const c_char = CString::into_raw(message_c_str) as *const c_char; - - let receiver_ptr = Box::into_raw(Box::new(receiver)) as *mut c_void; - let out_error = Box::into_raw(Box::new(0)); + let error_out = Box::into_raw(Box::new(0)); + let message_ptr = Box::into_raw(Box::new(message)) as *mut c_void; unsafe { - send_chat_message(client.0, receiver_ptr, message_c_char, out_error); + send_chat_message(client.0, message_ptr, error_out); } } @@ -136,16 +138,53 @@ impl ChatClient for ChatFFI { let messages; unsafe { - let out_error = Box::into_raw(Box::new(0)); + let error_out = Box::into_raw(Box::new(0)); let limit = Box::into_raw(Box::new(limit)) as *mut c_void; let page = Box::into_raw(Box::new(page)) as *mut c_void; - let all_messages = get_chat_messages(client.0, address_ptr, limit, page, out_error) as *mut Vec; + let all_messages = get_chat_messages(client.0, address_ptr, limit, page, error_out) as *mut Vec; messages = (*all_messages).clone(); } messages } + fn create_message(&self, receiver: &TariAddress, message: String) -> Message { + let address_ptr = Box::into_raw(Box::new(receiver.to_owned())) as *mut c_void; + + let message_c_str = CString::new(message).unwrap(); + let message_c_char: *const c_char = CString::into_raw(message_c_str) as *const c_char; + + let error_out = Box::into_raw(Box::new(0)); + + let message; + + unsafe { + let message_ptr = create_chat_message(address_ptr, message_c_char, error_out) as *mut Message; + message = (*message_ptr).clone(); + } + + message + } + + fn add_metadata(&self, message: Message, metadata_type: MessageMetadataType, data: String) -> Message { + let message_ptr = Box::into_raw(Box::new(message)) as *mut c_void; + let message_type = Box::into_raw(Box::new(metadata_type.as_byte())) as *const c_int; + + let data_c_str = CString::new(data).unwrap(); + let data_c_char: *const c_char = CString::into_raw(data_c_str) as *const c_char; + + let error_out = Box::into_raw(Box::new(0)); + + let message; + unsafe { + let message_ptr = + add_chat_message_metadata(message_ptr, message_type, data_c_char, error_out) as *mut Message; + message = (*message_ptr).clone(); + } + + message + } + fn identity(&self) -> &NodeIdentity { &self.identity } @@ -189,14 +228,14 @@ pub async fn spawn_ffi_chat_client(name: &str, seed_peers: Vec, base_dir: let client_ptr; - let out_error = Box::into_raw(Box::new(0)); + let error_out = Box::into_raw(Box::new(0)); unsafe { *ChatCallback::instance().contact_status_change.lock().unwrap() = 0; client_ptr = create_chat_client( config_ptr, - out_error, + error_out, callback_contact_status_change, callback_message_received, ); diff --git a/integration_tests/tests/features/ChatFFI.feature b/integration_tests/tests/features/ChatFFI.feature index 3842ff156b5..4a98580b698 100644 --- a/integration_tests/tests/features/ChatFFI.feature +++ b/integration_tests/tests/features/ChatFFI.feature @@ -45,3 +45,13 @@ Feature: Chat FFI messaging Given I have a seed node SEED_A When I have a chat FFI client CHAT_A connected to seed node SEED_A Then I can shutdown CHAT_A without a problem + + Scenario: Reply to message + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When I use CHAT_B to send a reply saying 'oh hai' to CHAT_A's message 'Hey there' + Then CHAT_B will have 2 messages with CHAT_A + Then CHAT_A will have 2 messages with CHAT_B + Then CHAT_A will have a replied to message from CHAT_B with 'oh hai' diff --git a/integration_tests/tests/steps/chat_steps.rs b/integration_tests/tests/steps/chat_steps.rs index 5fbe417efa6..9bc9a8aee55 100644 --- a/integration_tests/tests/steps/chat_steps.rs +++ b/integration_tests/tests/steps/chat_steps.rs @@ -28,6 +28,7 @@ use tari_common_types::tari_address::TariAddress; use tari_contacts::contacts_service::{ handle::{DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE}, service::ContactOnlineStatus, + types::{Direction, Message, MessageMetadata, MessageMetadataType}, }; use tari_integration_tests::{chat_client::spawn_chat_client, TariWorld}; @@ -65,7 +66,51 @@ async fn send_message_to(world: &mut TariWorld, sender: String, message: String, let receiver = world.chat_clients.get(&receiver).unwrap(); let address = TariAddress::from_public_key(receiver.identity().public_key(), Network::LocalNet); - sender.send_message(address, message).await; + let message = sender.create_message(&address, message); + + sender.send_message(message).await; +} + +#[when(regex = r"^I use (.+) to send a reply saying '(.+)' to (.*)'s message '(.*)'$")] +async fn i_reply_to_message( + world: &mut TariWorld, + sender: String, + outbound_msg: String, + receiver: String, + inbound_msg: String, +) { + let sender = world.chat_clients.get(&sender).unwrap(); + let receiver = world.chat_clients.get(&receiver).unwrap(); + let address = TariAddress::from_public_key(receiver.identity().public_key(), Network::LocalNet); + + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let messages: Vec = (*sender) + .get_messages(&address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let inbound_chat_message = messages + .iter() + .find(|m| m.body == inbound_msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + let message = sender.create_message(&address, outbound_msg); + let message = sender.add_metadata( + message, + MessageMetadataType::Reply, + String::from_utf8(inbound_chat_message.message_id).expect("bytes to uuid"), + ); + + sender.send_message(message).await; + return; + } + + panic!("Never received incoming chat message",) } #[then(expr = "{word} will have {int} message(s) with {word}")] @@ -128,3 +173,53 @@ async fn wait_for_contact_to_be_online(world: &mut TariWorld, client: String, co last_status ) } + +#[then(regex = r"^(.+) will have a replied to message from (.*) with '(.*)'$")] +async fn have_replied_message(world: &mut TariWorld, receiver: String, sender: String, inbound_reply: String) { + let receiver = world.chat_clients.get(&receiver).unwrap(); + let sender = world.chat_clients.get(&sender).unwrap(); + let address = TariAddress::from_public_key(sender.identity().public_key(), Network::LocalNet); + + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let messages: Vec = (*receiver) + .get_messages(&address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + // 1 message out, 1 message back = 2 + if messages.len() < 2 { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let inbound_chat_message = messages + .iter() + .find(|m| m.body == inbound_reply.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + let outbound_chat_message = messages + .iter() + .find(|m| m.direction == Direction::Outbound) + .expect("no message with that direction found") + .clone(); + + let metadata: &MessageMetadata = &inbound_chat_message.metadata[0]; + + // Metadata data is a reply type + assert_eq!( + metadata.metadata_type, + MessageMetadataType::Reply, + "Metadata type is wrong" + ); + + // Metadata data contains id to original message + assert_eq!( + metadata.data, outbound_chat_message.message_id, + "Message id does not match" + ); + + return; + } + + panic!("Never received incoming chat message",) +}