Skip to content

Commit

Permalink
Skip duplicate message processing (#1022)
Browse files Browse the repository at this point in the history
## Summary

Often messages are received on a stream after they have already been synced. This can happen when you have multiple streams open, or when you are processing your own messages.

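Processing messages is expensive and requires locking the database for a transaction. We're better off skipping these duplicates when we can. This change does that by checking whether the last synced cursor ID for the group is greater than or equal to (>=) the ID of the incoming message; if it is, the message has already been covered by a sync and processing is skipped.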
  • Loading branch information
neekolas authored Aug 30, 2024
1 parent c4bc84f commit f7ac3c5
Showing 1 changed file with 63 additions and 47 deletions.
110 changes: 63 additions & 47 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use futures::Stream;

use super::{extract_message_v1, GroupError, MlsGroup};
use crate::storage::group_message::StoredGroupMessage;
use crate::storage::refresh_state::EntityKind;
use crate::storage::StorageError;
use crate::subscriptions::{MessagesStreamInfo, StreamHandle};
use crate::XmtpApi;
use crate::{retry::Retry, retry_async, Client};
use crate::{retry_sync, XmtpApi};
use prost::Message;
use xmtp_proto::xmtp::mls::api::v1::GroupMessage;

Expand All @@ -31,53 +33,55 @@ impl MlsGroup {
);
let created_ns = msgv1.created_ns;

let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
let msgv1 = msgv1.clone();
self.context
.store
.transaction_async(|provider| async move {
let mut openmls_group = self.load_mls_group(&provider)?;

// Attempt processing immediately, but fail if the message is not an Application Message
// Returning an error should roll back the DB tx
log::info!(
"current epoch for [{}] in process_stream_entry() is Epoch: [{}]",
client_id,
openmls_group.epoch()
);

self.process_message(
client_pointer.as_ref(),
&mut openmls_group,
&provider,
&msgv1,
false,
)
if !self.has_already_synced(msg_id)? {
let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
let msgv1 = msgv1.clone();
self.context
.store
.transaction_async(|provider| async move {
let mut openmls_group = self.load_mls_group(&provider)?;

// Attempt processing immediately, but fail if the message is not an Application Message
// Returning an error should roll back the DB tx
log::info!(
"current epoch for [{}] in process_stream_entry() is Epoch: [{}]",
client_id,
openmls_group.epoch()
);

self.process_message(
client_pointer.as_ref(),
&mut openmls_group,
&provider,
&msgv1,
false,
)
.await
.map_err(GroupError::ReceiveError)
})
.await
.map_err(GroupError::ReceiveError)
})
.await
})
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
// Swallow errors here, since another process may have successfully saved the message
// to the DB
match self.sync_with_conn(&client.mls_provider()?, &client).await {
Ok(_) => {
log::debug!("Sync triggered by streamed message successful")
}
Err(err) => {
log::warn!("Sync triggered by streamed message failed: {}", err);
}
};
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
})
);

if let Some(GroupError::ReceiveError(_)) = process_result.as_ref().err() {
// Swallow errors here, since another process may have successfully saved the message
// to the DB
match self.sync_with_conn(&client.mls_provider()?, &client).await {
Ok(_) => {
log::debug!("Sync triggered by streamed message successful")
}
Err(err) => {
log::warn!("Sync triggered by streamed message failed: {}", err);
}
};
} else if process_result.is_err() {
log::error!("Process stream entry {:?}", process_result.err());
}
}

// Load the message from the DB to handle cases where it may have been already processed in
Expand All @@ -91,6 +95,18 @@ impl MlsGroup {
Ok(new_message)
}

// Checks if a message has already been processed through a sync
fn has_already_synced(&self, id: u64) -> Result<bool, GroupError> {
let check_for_last_cursor = || -> Result<i64, StorageError> {
let conn = self.context.store.conn()?;
conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group)
};

let last_id = retry_sync!(Retry::default(), check_for_last_cursor)?;

Ok(last_id >= id as i64)
}

pub async fn process_streamed_group_message<ApiClient>(
&self,
envelope_bytes: Vec<u8>,
Expand Down

0 comments on commit f7ac3c5

Please sign in to comment.