Skip to content

Commit

Permalink
feat: add sync_outgoing
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 16, 2024
1 parent 3e95eff commit c2d2f82
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
122 changes: 122 additions & 0 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,126 @@ impl Persister {
Ok(())
}

pub(crate) fn set_sync_outgoing_details_and_state(
&self,
details: SyncOutgoingDetails,
sync_state: SyncState,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

let updated_fields_json = match details.updated_fields {
Some(fields) => Some(serde_json::to_string(&fields)?),
None => None,
};

tx.execute(
"
INSERT OR REPLACE INTO sync_outgoing(record_id, record_type, commit_time, updated_fields_json)
VALUES(:record_id, :record_type, :commit_time, :updated_fields_json)
",
named_params! {
":record_id": &details.record_id,
":record_type": &details.record_type,
":commit_time": &details.commit_time,
":updated_fields_json": updated_fields_json,
},
)?;

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}

fn select_sync_outgoing_details_query(where_clauses: Vec<String>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
where_clause_str.push_str(where_clauses.join(" AND ").as_str());
}

format!(
"
SELECT
record_id,
record_type,
commit_time,
updated_fields_json
FROM sync_outgoing
{where_clause_str}
"
)
}

fn sql_row_to_sync_outgoing_details(row: &Row) -> Result<SyncOutgoingDetails> {
let record_id = row.get(0)?;
let record_type = row.get(1)?;
let commit_time = row.get(2)?;
let updated_fields = match row.get::<_, Option<String>>(3)? {
Some(fields) => Some(serde_json::from_str(&fields)?),
None => None,
};

Ok(SyncOutgoingDetails {
record_id,
record_type,
commit_time,
updated_fields,
})
}

pub(crate) fn get_sync_outgoing_details(&self) -> Result<Vec<SyncOutgoingDetails>> {
let con = self.get_connection()?;

let query = Self::select_sync_outgoing_details_query(vec![]);
let mut stmt = con.prepare(&query)?;
let mut rows = stmt.query([])?;

let mut outgoing_details = vec![];
while let Some(row) = rows.next()? {
let detail = Self::sql_row_to_sync_outgoing_details(row)?;
outgoing_details.push(detail);
}

Ok(outgoing_details)
}

pub(crate) fn get_sync_outgoing_details_by_id(
&self,
record_id: &str,
) -> Result<Option<SyncOutgoingDetails>> {
let con = self.get_connection()?;
let query =
Self::select_sync_outgoing_details_query(vec!["record_id = :record_id".to_string()]);
let mut stmt = con.prepare(&query)?;
let mut rows = stmt.query(named_params! {
":record_id": record_id,
})?;

if let Some(row) = rows.next()? {
return Ok(Some(Self::sql_row_to_sync_outgoing_details(row)?));
}

Ok(None)
}

pub(crate) fn remove_sync_outgoing_details(&self, record_id: &str) -> Result<()> {
let con = self.get_connection()?;

con.execute(
"DELETE FROM sync_outgoing WHERE record_id = :record_id",
named_params! {
":record_id": record_id
},
)?;

Ok(())
}
}
32 changes: 32 additions & 0 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@ use rusqlite::{

pub(crate) mod sync;

#[derive(Copy, Clone)]
pub(crate) enum RecordType {
Receive = 0,
Send = 1,
Chain = 2,
}

impl ToSql for RecordType {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(rusqlite::types::ToSqlOutput::from(*self as i8))
}
}

impl FromSql for RecordType {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
match value {
ValueRef::Integer(i) => match i as u8 {
0 => Ok(Self::Receive),
1 => Ok(Self::Send),
2 => Ok(Self::Chain),
_ => Err(FromSqlError::OutOfRange(i)),
},
_ => Err(FromSqlError::InvalidType),
}
}
}

pub(crate) struct SyncState {
pub(crate) data_id: String,
Expand All @@ -18,3 +44,9 @@ pub(crate) struct SyncSettings {
pub(crate) latest_revision: Option<u64>,
}

pub(crate) struct SyncOutgoingDetails {
pub(crate) record_id: String,
pub(crate) record_type: RecordType,
pub(crate) commit_time: u32,
pub(crate) updated_fields: Option<Vec<String>>,
}

0 comments on commit c2d2f82

Please sign in to comment.