From f7ac3c5cb11c74352d45ec5fe6dcd5884915db30 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Thu, 29 Aug 2024 20:11:24 -0700 Subject: [PATCH] Skip duplicate message processing (#1022) ## Summary Often messages are received on a stream after they have already been synced. This can happen when you have multiple streams open, or when you are processing your own messages. Processing messages is expensive and requires locking the database for a transaction. We're better off skipping these duplicates if we can. This does that by checking if the last synced ID is > the ID of the message. --- xmtp_mls/src/groups/subscriptions.rs | 110 +++++++++++++++------------ 1 file changed, 63 insertions(+), 47 deletions(-) diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index a7f0cb8a8..88cf2faa6 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -6,9 +6,11 @@ use futures::Stream; use super::{extract_message_v1, GroupError, MlsGroup}; use crate::storage::group_message::StoredGroupMessage; +use crate::storage::refresh_state::EntityKind; +use crate::storage::StorageError; use crate::subscriptions::{MessagesStreamInfo, StreamHandle}; -use crate::XmtpApi; use crate::{retry::Retry, retry_async, Client}; +use crate::{retry_sync, XmtpApi}; use prost::Message; use xmtp_proto::xmtp::mls::api::v1::GroupMessage; @@ -31,53 +33,55 @@ impl MlsGroup { ); let created_ns = msgv1.created_ns; - let client_pointer = client.clone(); - let process_result = retry_async!( - Retry::default(), - (async { - let client_pointer = client_pointer.clone(); - let client_id = client_id.clone(); - let msgv1 = msgv1.clone(); - self.context - .store - .transaction_async(|provider| async move { - let mut openmls_group = self.load_mls_group(&provider)?; - - // Attempt processing immediately, but fail if the message is not an Application Message - // Returning an error should roll back the DB tx - log::info!( - "current epoch for [{}] in process_stream_entry() is Epoch: [{}]", - client_id, - openmls_group.epoch() - ); - - self.process_message( - client_pointer.as_ref(), - &mut openmls_group, - &provider, - &msgv1, - false, - ) + if !self.has_already_synced(msg_id)? { + let client_pointer = client.clone(); + let process_result = retry_async!( + Retry::default(), + (async { + let client_pointer = client_pointer.clone(); + let client_id = client_id.clone(); + let msgv1 = msgv1.clone(); + self.context + .store + .transaction_async(|provider| async move { + let mut openmls_group = self.load_mls_group(&provider)?; + + // Attempt processing immediately, but fail if the message is not an Application Message + // Returning an error should roll back the DB tx + log::info!( + "current epoch for [{}] in process_stream_entry() is Epoch: [{}]", + client_id, + openmls_group.epoch() + ); + + self.process_message( + client_pointer.as_ref(), + &mut openmls_group, + &provider, + &msgv1, + false, + ) + .await + .map_err(GroupError::ReceiveError) + }) .await - .map_err(GroupError::ReceiveError) - }) - .await - }) - ); - - if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() { - // Swallow errors here, since another process may have successfully saved the message - // to the DB - match self.sync_with_conn(&client.mls_provider()?, &client).await { - Ok(_) => { - log::debug!("Sync triggered by streamed message successful") - } - Err(err) => { - log::warn!("Sync triggered by streamed message failed: {}", err); - } - }; - } else if process_result.is_err() { - log::error!("Process stream entry {:?}", process_result.err()); + }) + ); + + if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() { + // Swallow errors here, since another process may have successfully saved the message + // to the DB + match self.sync_with_conn(&client.mls_provider()?, &client).await { + Ok(_) => { + log::debug!("Sync triggered by streamed message successful") + } + Err(err) => { + log::warn!("Sync triggered by streamed message failed: {}", err); + } + }; + } else if process_result.is_err() { + log::error!("Process stream entry {:?}", process_result.err()); + } } // Load the message from the DB to handle cases where it may have been already processed in @@ -91,6 +95,18 @@ impl MlsGroup { Ok(new_message) } + // Checks if a message has already been processed through a sync + fn has_already_synced(&self, id: u64) -> Result { + let check_for_last_cursor = || -> Result { + let conn = self.context.store.conn()?; + conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group) + }; + + let last_id = retry_sync!(Retry::default(), check_for_last_cursor)?; + + Ok(last_id >= id as i64) + } + pub async fn process_streamed_group_message( &self, envelope_bytes: Vec,