Skip to content

Commit

Permalink
Stream preferences (#1424)
Browse files Browse the repository at this point in the history
* streaming wip

* the rest of the implementation

* test preference streaming

* comments

* typo
  • Loading branch information
codabrink authored Dec 16, 2024
1 parent 567751f commit 25758b4
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 18 deletions.
124 changes: 106 additions & 18 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use xmtp_id::{
},
InboxId,
};
use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate;
use xmtp_mls::groups::scoped_client::LocalScopedGroupClient;
use xmtp_mls::groups::HmacKey;
use xmtp_mls::storage::group::ConversationType;
Expand Down Expand Up @@ -1072,6 +1073,8 @@ impl FfiConversations {
FfiStreamCloser::new(handle)
}

/// Get notified when there is a new consent update either locally or is synced from another device
/// allowing the user to re-render the new state appropriately
pub async fn stream_consent(&self, callback: Arc<dyn FfiConsentCallback>) -> FfiStreamCloser {
let handle =
RustXmtpClient::stream_consent_with_callback(self.inner_client.clone(), move |msg| {
Expand All @@ -1083,6 +1086,25 @@ impl FfiConversations {

FfiStreamCloser::new(handle)
}

/// Get notified when a preference changes either locally or is synced from another device
/// allowing the user to re-render the new state appropriately.
pub async fn stream_preferences(
&self,
callback: Arc<dyn FfiPreferenceCallback>,
) -> FfiStreamCloser {
let handle = RustXmtpClient::stream_preferences_with_callback(
self.inner_client.clone(),
move |msg| match msg {
Ok(m) => callback.on_preference_update(
m.into_iter().filter_map(|v| v.try_into().ok()).collect(),
),
Err(e) => callback.on_error(e.into()),
},
);

FfiStreamCloser::new(handle)
}
}

impl From<FfiConversationType> for ConversationType {
Expand All @@ -1095,6 +1117,20 @@ impl From<FfiConversationType> for ConversationType {
}
}

impl TryFrom<UserPreferenceUpdate> for FfiPreferenceUpdate {
type Error = GenericError;
fn try_from(value: UserPreferenceUpdate) -> Result<Self, Self::Error> {
match value {
UserPreferenceUpdate::HmacKeyUpdate { key } => Ok(FfiPreferenceUpdate::HMAC { key }),
// These are filtered out in the stream and should not be here
// We're keeping preference update and consent streams separate right now.
UserPreferenceUpdate::ConsentUpdate(_) => Err(GenericError::Generic {
err: "Consent updates should be filtered out.".to_string(),
}),
}
}
}

#[derive(uniffi::Object)]
pub struct FfiConversation {
inner: MlsGroup<RustXmtpClient>,
Expand Down Expand Up @@ -1755,6 +1791,17 @@ pub trait FfiConsentCallback: Send + Sync {
fn on_error(&self, error: FfiSubscribeError);
}

#[uniffi::export(with_foreign)]
pub trait FfiPreferenceCallback: Send + Sync {
fn on_preference_update(&self, preference: Vec<FfiPreferenceUpdate>);
fn on_error(&self, error: FfiSubscribeError);
}

#[derive(uniffi::Enum)]
pub enum FfiPreferenceUpdate {
HMAC { key: Vec<u8> },
}

#[derive(uniffi::Object)]
pub struct FfiConversationMetadata {
inner: Arc<GroupMetadata>,
Expand Down Expand Up @@ -1818,7 +1865,10 @@ impl FfiGroupPermissions {

#[cfg(test)]
mod tests {
use super::{create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiXmtpClient};
use super::{
create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiPreferenceCallback,
FfiPreferenceUpdate, FfiXmtpClient,
};
use crate::{
get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, FfiConsentEntityType,
FfiConsentState, FfiConversation, FfiConversationCallback, FfiConversationMessageKind,
Expand All @@ -1827,12 +1877,9 @@ mod tests {
FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError,
};
use ethers::utils::hex;
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};
use tokio::{sync::Notify, time::error::Elapsed};
use xmtp_common::tmp_path;
Expand Down Expand Up @@ -1887,6 +1934,7 @@ mod tests {
messages: Mutex<Vec<FfiMessage>>,
conversations: Mutex<Vec<Arc<FfiConversation>>>,
consent_updates: Mutex<Vec<FfiConsent>>,
preference_updates: Mutex<Vec<FfiPreferenceUpdate>>,
notify: Notify,
inbox_id: Option<String>,
installation_id: Option<String>,
Expand Down Expand Up @@ -1973,6 +2021,23 @@ mod tests {
}
}

impl FfiPreferenceCallback for RustStreamCallback {
fn on_preference_update(&self, mut preference: Vec<super::FfiPreferenceUpdate>) {
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received consent update"
);
let mut preference_updates = self.preference_updates.lock().unwrap();
preference_updates.append(&mut preference);
self.notify.notify_one();
}

fn on_error(&self, error: FfiSubscribeError) {
log::error!("{}", error)
}
}

fn static_enc_key() -> EncryptionKey {
[2u8; 32]
}
Expand Down Expand Up @@ -4211,23 +4276,17 @@ mod tests {
let result = stream_a_callback.wait_for_delivery(Some(3)).await;
assert!(result.is_ok());

let start = Instant::now();
loop {
// update the sync group's messages to pipe them into the events
wait_for_ok(|| async {
alix_b
.conversations()
.sync_all_conversations(None)
.await
.unwrap();

if stream_b_callback.wait_for_delivery(Some(1)).await.is_ok() {
break;
}

if start.elapsed() > Duration::from_secs(5) {
panic!("Timed out while waiting for alix_b consent updates.");
}
}
stream_b_callback.wait_for_delivery(Some(1)).await
})
.await
.unwrap();

// two outgoing consent updates
assert_eq!(stream_a_callback.consent_updates_count(), 2);
Expand All @@ -4238,6 +4297,35 @@ mod tests {
b_stream.end_and_wait().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_stream_preferences() {
let wallet = generate_local_wallet();
let alix_a = new_test_client_with_wallet_and_history(wallet.clone()).await;
let stream_a_callback = Arc::new(RustStreamCallback::default());

let a_stream = alix_a
.conversations()
.stream_preferences(stream_a_callback.clone())
.await;

let _alix_b = new_test_client_with_wallet_and_history(wallet).await;

let result = stream_a_callback.wait_for_delivery(Some(3)).await;
assert!(result.is_ok());

let update = {
let mut a_updates = stream_a_callback.preference_updates.lock().unwrap();
assert_eq!(a_updates.len(), 1);

a_updates.pop().unwrap()
};

// We got the HMAC update
assert!(matches!(update, FfiPreferenceUpdate::HMAC { .. }));

a_stream.end_and_wait().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_set_and_get_group_consent() {
let alix = new_test_client().await;
Expand Down
61 changes: 61 additions & 0 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,44 @@ impl<C> LocalEvents<C> {
_ => None,
}
}

fn preference_filter(self) -> Option<Vec<UserPreferenceUpdate>> {
use LocalEvents::*;

match self {
OutgoingPreferenceUpdates(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(_) => None,
_ => Some(pu),
})
.collect();
Some(updates)
}
IncomingPreferenceUpdate(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(_) => None,
_ => Some(pu),
})
.collect();
Some(updates)
}
_ => None,
}
}
}

pub(crate) trait StreamMessages<C> {
fn stream_sync_messages(self) -> impl Stream<Item = Result<LocalEvents<C>, SubscribeError>>;
fn stream_consent_updates(
self,
) -> impl Stream<Item = Result<Vec<StoredConsentRecord>, SubscribeError>>;
fn stream_preference_updates(
self,
) -> impl Stream<Item = Result<Vec<UserPreferenceUpdate>, SubscribeError>>;
}

impl<C> StreamMessages<C> for broadcast::Receiver<LocalEvents<C>>
Expand All @@ -144,6 +175,16 @@ where
.map(Result::Ok)
})
}

fn stream_preference_updates(
self,
) -> impl Stream<Item = Result<Vec<UserPreferenceUpdate>, SubscribeError>> {
BroadcastStream::new(self).filter_map(|event| async {
xmtp_common::optify!(event, "Missed message due to event queue lag")
.and_then(LocalEvents::preference_filter)
.map(Result::Ok)
})
}
}

impl<T> StreamHandle<T> {
Expand Down Expand Up @@ -521,6 +562,26 @@ where
Ok::<_, ClientError>(())
})
}

pub fn stream_preferences_with_callback(
client: Arc<Client<ApiClient, V>>,
mut callback: impl FnMut(Result<Vec<UserPreferenceUpdate>, SubscribeError>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<(), ClientError>> {
let (tx, rx) = oneshot::channel();

crate::spawn(Some(rx), async move {
let receiver = client.local_events.subscribe();
let stream = receiver.stream_preference_updates();

futures::pin_mut!(stream);
let _ = tx.send(());
while let Some(message) = stream.next().await {
callback(message)
}
tracing::debug!("`stream_consent` stream ended, dropping stream");
Ok::<_, ClientError>(())
})
}
}

#[cfg(test)]
Expand Down

0 comments on commit 25758b4

Please sign in to comment.