Skip to content

Commit

Permalink
feat: add apply_incoming method
Browse files Browse the repository at this point in the history
Uses the `pull_changes` method to grab the remote records, merges them
with outgoing sync data (if necessary) and persists them
  • Loading branch information
hydra-yse committed Nov 10, 2024
1 parent d2a45fd commit 8b6a409
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lib/core/src/persist/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Persister {
// so we split up the insert into two statements.
let mut stmt = con.prepare(
"
INSERT INTO chain_swaps (
INSERT OR REPLACE INTO chain_swaps (
id,
id_hash,
direction,
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/persist/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Persister {

let mut stmt = con.prepare(
"
INSERT INTO receive_swaps (
INSERT OR REPLACE INTO receive_swaps (
id,
id_hash,
preimage,
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/persist/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Persister {

let mut stmt = con.prepare(
"
INSERT INTO send_swaps (
INSERT OR REPLACE INTO send_swaps (
id,
id_hash,
invoice,
Expand Down
109 changes: 109 additions & 0 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use anyhow::{anyhow, Result};
use crate::{persist::Persister, prelude::Signer};

use self::client::SyncerClient;
use self::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData},
sync::{ListChangesRequest, Record},
RecordType, SyncData, SyncOutgoingDetails, SyncState,
};
use self::model::{DecryptedRecord, SyncSettings};

pub(crate) mod client;
pub(crate) mod model;

Expand Down Expand Up @@ -41,4 +48,106 @@ impl SyncService {
Ok(self.client.list_changes(req).await?.changes)
}

fn apply_record(&self, decrypted_record: &DecryptedRecord) -> Result<()> {
match decrypted_record.data {
SyncData::Chain(chain_data) => self.persister.insert_chain_swap(chain_data.into())?,
SyncData::Send(send_data) => self.persister.insert_send_swap(send_data.into())?,
SyncData::Receive(receive_data) => {
self.persister.insert_receive_swap(receive_data.into())?
}
}
Ok(())
}

fn fetch_sync_data(&self, data_id: &str, record_type: RecordType) -> Result<SyncData> {
let data = match record_type {
RecordType::Receive => {
let receive_data: ReceiveSyncData = self
.persister
.fetch_receive_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Receive swap {data_id}"))?
.into();
SyncData::Receive(receive_data)
}
RecordType::Send => {
let send_data: SendSyncData = self
.persister
.fetch_send_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Send swap {data_id}"))?
.into();
SyncData::Send(send_data)
}
RecordType::Chain => {
let chain_data: ChainSyncData = self
.persister
.fetch_chain_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Chain swap {data_id}"))?
.into();
SyncData::Chain(chain_data)
}
};
Ok(data)
}

async fn apply_incoming(&self) -> Result<()> {
let incoming_records = self.pull_changes().await?;
let latest_revision = incoming_records.last().map(|record| record.revision);

for new_record in incoming_records {
// Step 1: Check whether or not record is updatable (from its schema_version)
if new_record.is_major_update()? {
self.persister.set_incoming_record(&new_record)?;
continue;
}

// Step 2: Check whether we already have this record, and if the revision is newer
let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?;
if let Some(sync_state) = maybe_sync_state {
if sync_state.record_revision > new_record.revision {
continue;
}
}

// Step 3: Decrypt the incoming record
let mut decrypted_record = new_record.decrypt(self.signer.clone())?;

// Step 4: Merge with outgoing records, if present
let maybe_outgoing_details = self
.persister
.get_sync_outgoing_details(&decrypted_record.id)?;

if let Some(outgoing_details) = maybe_outgoing_details {
if let Some(updated_fields) = outgoing_details.updated_fields_json {
let updated_fields: Vec<&str> = serde_json::from_str(&updated_fields)?;

let local_data = self.fetch_sync_data(
&decrypted_record.data.id(),
outgoing_details.record_type,
)?;
decrypted_record.data.merge(&local_data, updated_fields)?;
}
}

// Step 5: Apply the changes
self.apply_record(&decrypted_record)?;

// Step 6: Update sync state
self.persister.set_sync_state(SyncState {
data_id: decrypted_record.data.id().to_string(),
record_id: decrypted_record.id,
record_revision: decrypted_record.revision,
})?;
}

// Step 7: Update local tip
if let Some(latest_revision) = latest_revision {
self.persister.set_sync_settings(SyncSettings {
remote_url: self.remote_url.clone(),
latest_revision,
})?;
}

Ok(())
}

}
133 changes: 132 additions & 1 deletion lib/core/src/sync/model/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use crate::prelude::{ChainSwap, Direction, ReceiveSwap, SendSwap};
use crate::prelude::{ChainSwap, Direction, PaymentState, ReceiveSwap, SendSwap};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ChainSyncData {
Expand All @@ -21,6 +21,46 @@ pub(crate) struct ChainSyncData {
pub(crate) claim_address: Option<String>,
}

impl ChainSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<&str>) {
for field in updated_fields {
match field {
"description" => clone_if_changed(&mut self.description, &other.description),
"claim_address" => clone_if_changed(&mut self.claim_address, &other.claim_address),
"accept_zero_conf" => self.accept_zero_conf = other.accept_zero_conf,
_ => continue,
}
}
}
}

impl Into<ChainSwap> for ChainSyncData {
fn into(self) -> ChainSwap {
ChainSwap {
id: self.swap_id,
preimage: self.preimage,
direction: self.direction,
create_response_json: self.create_response_json,
claim_private_key: self.claim_private_key,
refund_private_key: self.refund_private_key,
payer_amount_sat: self.payer_amount_sat,
receiver_amount_sat: self.receiver_amount_sat,
claim_fees_sat: self.claim_fees_sat,
claim_address: self.claim_address,
lockup_address: self.lockup_address,
timeout_block_height: self.timeout_block_height,
accept_zero_conf: self.accept_zero_conf,
description: self.description,
server_lockup_tx_id: None,
user_lockup_tx_id: None,
claim_tx_id: None,
refund_tx_id: None,
created_at: self.created_at,
state: PaymentState::Created,
}
}
}

impl From<ChainSwap> for ChainSyncData {
fn from(value: ChainSwap) -> Self {
Self {
Expand Down Expand Up @@ -57,6 +97,39 @@ pub(crate) struct SendSyncData {
pub(crate) description: Option<String>,
}

impl SendSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<&str>) {
for field in updated_fields {
match field {
"preimage" => clone_if_changed(&mut self.preimage, &other.preimage),
"payment_hash" => clone_if_changed(&mut self.payment_hash, &other.payment_hash),
"description" => clone_if_changed(&mut self.description, &other.description),
_ => continue,
}
}
}
}

impl Into<SendSwap> for SendSyncData {
fn into(self) -> SendSwap {
SendSwap {
id: self.swap_id,
preimage: self.preimage,
create_response_json: self.create_response_json,
invoice: self.invoice,
payment_hash: self.payment_hash,
description: self.description,
payer_amount_sat: self.payer_amount_sat,
receiver_amount_sat: self.receiver_amount_sat,
refund_private_key: self.refund_private_key,
lockup_tx_id: None,
refund_tx_id: None,
created_at: self.created_at,
state: PaymentState::Created,
}
}
}

impl From<SendSwap> for SendSyncData {
fn from(value: SendSwap) -> Self {
Self {
Expand Down Expand Up @@ -91,6 +164,42 @@ pub(crate) struct ReceiveSyncData {
pub(crate) description: Option<String>,
}

impl ReceiveSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<&str>) {
for field in updated_fields {
match field {
"payment_hash" => clone_if_changed(&mut self.payment_hash, &other.payment_hash),
"description" => clone_if_changed(&mut self.description, &other.description),
_ => continue,
}
}
}
}

impl Into<ReceiveSwap> for ReceiveSyncData {
fn into(self) -> ReceiveSwap {
ReceiveSwap {
id: self.swap_id,
preimage: self.preimage,
create_response_json: self.create_response_json,
claim_private_key: self.claim_private_key,
invoice: self.invoice,
payment_hash: self.payment_hash,
description: self.description,
payer_amount_sat: self.payer_amount_sat,
receiver_amount_sat: self.receiver_amount_sat,
claim_fees_sat: self.claim_fees_sat,
mrh_address: self.mrh_address,
mrh_script_pubkey: self.mrh_script_pubkey,
mrh_tx_id: None,
claim_tx_id: None,
lockup_tx_id: None,
created_at: self.created_at,
state: PaymentState::Created,
}
}
}

impl From<ReceiveSwap> for ReceiveSyncData {
fn from(value: ReceiveSwap) -> Self {
Self {
Expand Down Expand Up @@ -131,4 +240,26 @@ impl SyncData {
pub(crate) fn to_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}

pub(crate) fn merge(&mut self, other: &Self, updated_fields: Vec<&str>) -> anyhow::Result<()> {
match (self, other) {
(SyncData::Chain(ref mut base), SyncData::Chain(other)) => {
base.merge(other, updated_fields)
}
(SyncData::Send(ref mut base), SyncData::Send(other)) => {
base.merge(other, updated_fields)
}
(SyncData::Receive(ref mut base), SyncData::Receive(other)) => {
base.merge(other, updated_fields)
}
_ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")),
};
Ok(())
}
}

fn clone_if_changed<T: Clone>(s: &mut Option<T>, other: &Option<T>) {
if other.is_some() {
s.clone_from(other)
}
}
58 changes: 58 additions & 0 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use std::sync::Arc;

use self::{data::SyncData, sync::Record};
use crate::prelude::Signer;
use anyhow::Result;
use rusqlite::{
types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
ToSql,
Expand All @@ -8,6 +13,7 @@ pub(crate) mod data;
pub(crate) mod sync;

const MESSAGE_PREFIX: &[u8; 13] = b"realtimesync:";
const CURRENT_SCHEMA_VERSION: f32 = 0.01;

#[derive(Copy, Clone)]
pub(crate) enum RecordType {
Expand Down Expand Up @@ -52,3 +58,55 @@ pub(crate) struct SyncOutgoingDetails {
pub(crate) record_type: RecordType,
pub(crate) updated_fields_json: Option<String>,
}

pub(crate) struct DecryptedRecord {
pub(crate) revision: u64,
pub(crate) id: String,
pub(crate) schema_version: String,
pub(crate) data: SyncData,
}

impl DecryptedRecord {
pub(crate) fn encrypt(self, signer: Arc<Box<dyn Signer>>) -> Result<Record, anyhow::Error> {
Record::new(self.id, self.data, self.revision, signer)
}
}

impl Record {
pub(crate) fn new(
id: String,
data: SyncData,
revision: u64,
signer: Arc<Box<dyn Signer>>,
) -> Result<Self, anyhow::Error> {
let data = data.to_bytes()?;
let data = signer
.ecies_encrypt(&data)
.map_err(|err| anyhow::anyhow!("Could not encrypt sync data: {err:?}"))?;
Ok(Self {
id,
revision,
schema_version: format!("{CURRENT_SCHEMA_VERSION:.2}"),
data,
})
}

pub(crate) fn is_major_update(&self) -> Result<bool> {
Ok(self
.schema_version
.parse::<f32>()?
.floor()
.le(&CURRENT_SCHEMA_VERSION.floor()))
}

pub(crate) fn decrypt(self, signer: Arc<Box<dyn Signer>>) -> Result<DecryptedRecord> {
let dec_data = signer.ecies_decrypt(self.data.as_slice())?;
let data = serde_json::from_slice(&dec_data)?;
Ok(DecryptedRecord {
id: self.id,
revision: self.revision,
schema_version: self.schema_version,
data,
})
}
}

0 comments on commit 8b6a409

Please sign in to comment.