diff --git a/xmtp_mls/src/groups/device_sync/backup.rs b/xmtp_mls/src/groups/device_sync/backup.rs index c4eac5379..165dd2293 100644 --- a/xmtp_mls/src/groups/device_sync/backup.rs +++ b/xmtp_mls/src/groups/device_sync/backup.rs @@ -1,10 +1,10 @@ +use crate::storage::DbConnection; use backup_element::{BackupElement, BackupRecordStreamer}; use futures::Stream; use serde::{Deserialize, Serialize}; +use std::{ops::Range, sync::Arc}; use xmtp_proto::xmtp::device_sync::consent_backup::ConsentRecordSave; -use crate::storage::DbConnection; - mod backup_element; #[derive(Serialize, Deserialize)] @@ -17,8 +17,7 @@ pub struct BackupMetadata { } pub struct BackupOptions { - from_ns: u64, - to_ns: u64, + range_ns: Option>, elements: Vec, } @@ -31,7 +30,7 @@ pub enum BackupSelection { impl BackupSelection { fn to_streamers( &self, - conn: &'static DbConnection, + conn: &Arc, ) -> Vec>>> { match self { Self::Consent => vec![Box::new(BackupRecordStreamer::::new( @@ -43,7 +42,7 @@ impl BackupSelection { } impl BackupOptions { - pub fn write(self, conn: &'static DbConnection) -> BackupWriter { + pub fn write(self, conn: &Arc) -> BackupWriter { let input_streams = self .elements .iter() @@ -61,12 +60,3 @@ struct BackupWriter { options: BackupOptions, input_streams: Vec>>>>, } - -impl Stream for BackupWriter { - type Item = Vec; - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - } -} diff --git a/xmtp_mls/src/groups/device_sync/backup/backup_element.rs b/xmtp_mls/src/groups/device_sync/backup/backup_element.rs index f60b8893a..647526de8 100644 --- a/xmtp_mls/src/groups/device_sync/backup/backup_element.rs +++ b/xmtp_mls/src/groups/device_sync/backup/backup_element.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use std::{marker::PhantomData, sync::Arc}; use futures::Stream; use serde::{Deserialize, Serialize}; @@ -23,28 +23,28 @@ pub enum BackupElement { trait BackupRecordProvider { const BATCH_SIZE: i64; - fn backup_records(streamer: &BackupRecordStreamer<'_, Self>) -> Vec + fn backup_records(streamer: &BackupRecordStreamer) -> Vec where Self: Sized; } -pub(super) struct BackupRecordStreamer<'a, R> { +pub(super) struct BackupRecordStreamer { offset: i64, - conn: &'a DbConnection, + conn: Arc, _phantom: PhantomData, } -impl<'a, R> BackupRecordStreamer<'a, R> { - pub(super) fn new(conn: &'a DbConnection) -> Self { +impl BackupRecordStreamer { + pub(super) fn new(conn: &Arc) -> Self { Self { offset: 0, - conn, + conn: conn.clone(), _phantom: PhantomData, } } } -impl<'a, R> Stream for BackupRecordStreamer<'a, R> +impl Stream for BackupRecordStreamer where R: BackupRecordProvider + Unpin, { @@ -57,7 +57,7 @@ where // Get a mutable reference to self let this = self.get_mut(); - let batch = R::backup_records(&*this); + let batch = R::backup_records(this); // If no records found, we've reached the end of the stream if batch.is_empty() { diff --git a/xmtp_mls/src/groups/device_sync/backup/backup_element/consent_save.rs b/xmtp_mls/src/groups/device_sync/backup/backup_element/consent_save.rs index 90255c891..abd2cb1c7 100644 --- a/xmtp_mls/src/groups/device_sync/backup/backup_element/consent_save.rs +++ b/xmtp_mls/src/groups/device_sync/backup/backup_element/consent_save.rs @@ -10,7 +10,7 @@ use xmtp_proto::xmtp::device_sync::consent_backup::{ impl BackupRecordProvider for ConsentRecordSave { const BATCH_SIZE: i64 = 100; - fn backup_records(streamer: &BackupRecordStreamer<'_, Self>) -> Vec + fn backup_records(streamer: &BackupRecordStreamer) -> Vec where Self: Sized, {