Skip to content

Commit

Permalink
finish out the record provider trait
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Jan 9, 2025
1 parent b5734f1 commit 2c3d0e4
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 16 deletions.
22 changes: 12 additions & 10 deletions xmtp_mls/src/groups/device_sync/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,41 @@ use crate::storage::DbConnection;
use backup_element::{BackupElement, BackupRecordStreamer};
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::{ops::Range, sync::Arc};
use std::sync::Arc;
use xmtp_proto::xmtp::device_sync::consent_backup::ConsentRecordSave;

mod backup_element;

#[derive(Serialize, Deserialize)]
pub struct BackupMetadata {
exported_at_ns: u64,
exported_elements: Vec<BackupSelection>,
exported_elements: Vec<BackupOptionsElementSelection>,
/// Range of timestamp messages from_ns..to_ns
from_ns: u64,
to_ns: u64,
start_ns: Option<u64>,
end_ns: Option<u64>,
}

pub struct BackupOptions {
range_ns: Option<Range<u64>>,
elements: Vec<BackupSelection>,
start_ns: Option<u64>,
end_ns: Option<u64>,
elements: Vec<BackupOptionsElementSelection>,
}

#[derive(Serialize, Deserialize, Clone)]
pub enum BackupSelection {
pub enum BackupOptionsElementSelection {
Messages,
Consent,
}

impl BackupSelection {
impl BackupOptionsElementSelection {
fn to_streamers(
&self,
conn: &Arc<DbConnection>,
opts: &BackupOptions,
) -> Vec<Box<dyn Stream<Item = Vec<BackupElement>>>> {
match self {
Self::Consent => vec![Box::new(BackupRecordStreamer::<ConsentRecordSave>::new(
conn,
conn, opts,
))],
Self::Messages => vec![],
}
Expand All @@ -46,7 +48,7 @@ impl BackupOptions {
let input_streams = self
.elements
.iter()
.map(|e| e.to_streamers(conn))
.map(|e| e.to_streamers(conn, &self))
.collect::<Vec<_>>();

BackupWriter {
Expand Down
10 changes: 8 additions & 2 deletions xmtp_mls/src/groups/device_sync/backup/backup_element.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, ops::Range, sync::Arc};

use futures::Stream;
use serde::{Deserialize, Serialize};
Expand All @@ -8,6 +8,8 @@ use xmtp_proto::xmtp::device_sync::{

use crate::storage::DbConnection;

use super::BackupOptions;

pub(crate) mod consent_save;
pub(crate) mod group_save;
pub(crate) mod message_save;
Expand All @@ -31,14 +33,18 @@ trait BackupRecordProvider {
pub(super) struct BackupRecordStreamer<R> {
offset: i64,
conn: Arc<DbConnection>,
start_ns: Option<u64>,
end_ns: Option<u64>,
_phantom: PhantomData<R>,
}

impl<R> BackupRecordStreamer<R> {
pub(super) fn new(conn: &Arc<DbConnection>) -> Self {
pub(super) fn new(conn: &Arc<DbConnection>, opts: &BackupOptions) -> Self {
Self {
offset: 0,
conn: conn.clone(),
start_ns: opts.start_ns,
end_ns: opts.end_ns,
_phantom: PhantomData,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,42 @@
use crate::storage::group::{ConversationType, GroupMembershipState, StoredGroup};
use super::*;
use crate::storage::{
group::{ConversationType, GroupMembershipState, StoredGroup},
schema::groups,
};
use diesel::prelude::*;
use xmtp_proto::xmtp::device_sync::group_backup::{
ConversationTypeSave, GroupMembershipStateSave, GroupSave,
};

impl BackupRecordProvider for GroupSave {
const BATCH_SIZE: i64 = 100;
fn backup_records(streamer: &BackupRecordStreamer<Self>) -> Vec<BackupElement>
where
Self: Sized,
{
let mut query = groups::table.order_by(groups::id).into_boxed();

if let Some(start_ns) = streamer.start_ns {
query = query.filter(groups::created_at_ns.gt(start_ns as i64));
}
if let Some(end_ns) = streamer.end_ns {
query = query.filter(groups::created_at_ns.le(end_ns as i64));
}

query = query.limit(BATCH_SIZE).offset(streamer.offset);

let batch = streamer
.conn
.raw_query(|conn| query.load::<StoredGroup>(conn))
.expect("Failed to load group records");

batch
.into_iter()
.map(|record| BackupElement::Group(record.into()))
.collect()
}
}

impl From<GroupSave> for StoredGroup {
fn from(value: GroupSave) -> Self {
let membership_state = value.membership_state().into();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,43 @@
use super::*;
use crate::storage::{
group_message::{ContentType, DeliveryStatus, GroupMessageKind, StoredGroupMessage},
schema::group_messages,
};
use diesel::prelude::*;
use xmtp_proto::xmtp::device_sync::message_backup::{
ContentTypeSave, DeliveryStatusSave, GroupMessageKindSave, GroupMessageSave,
};

use crate::storage::group_message::{
ContentType, DeliveryStatus, GroupMessageKind, StoredGroupMessage,
};
impl BackupRecordProvider for GroupMessageSave {
const BATCH_SIZE: i64 = 100;
fn backup_records(streamer: &BackupRecordStreamer<Self>) -> Vec<BackupElement>
where
Self: Sized,
{
let mut query = group_messages::table
.order_by(group_messages::id)
.into_boxed();

if let Some(start_ns) = streamer.start_ns {
query = query.filter(group_messages::sent_at_ns.gt(start_ns as i64));
}
if let Some(end_ns) = streamer.end_ns {
query = query.filter(group_messages::sent_at_ns.le(end_ns as i64));
}

query = query.limit(BATCH_SIZE).offset(streamer.offset);

let batch = streamer
.conn
.raw_query(|conn| query.load::<StoredGroupMessage>(conn))
.expect("Failed to load group records");

batch
.into_iter()
.map(|record| BackupElement::Message(record.into()))
.collect()
}
}

impl From<GroupMessageSave> for StoredGroupMessage {
fn from(value: GroupMessageSave) -> Self {
Expand Down

0 comments on commit 2c3d0e4

Please sign in to comment.