From 727cbb991b65195a53dd129d65ff285646af7f7e Mon Sep 17 00:00:00 2001 From: yse Date: Thu, 14 Nov 2024 19:15:29 +0100 Subject: [PATCH] feat: add `pull` method --- lib/core/Cargo.toml | 1 - lib/core/src/persist/sync.rs | 8 +- lib/core/src/sync/mod.rs | 181 ++++++++++++++++++++++++++++++++ lib/core/src/sync/model/data.rs | 44 ++++++++ lib/core/src/sync/model/mod.rs | 81 ++++++++++++++ 5 files changed, 312 insertions(+), 3 deletions(-) diff --git a/lib/core/Cargo.toml b/lib/core/Cargo.toml index 475953357..1110009ef 100644 --- a/lib/core/Cargo.toml +++ b/lib/core/Cargo.toml @@ -58,7 +58,6 @@ semver = "1.0.23" lazy_static = "1.5.0" [dev-dependencies] -lazy_static = "1.5.0" paste = "1.0.15" tempdir = "0.3.7" uuid = { version = "1.8.0", features = ["v4"] } diff --git a/lib/core/src/persist/sync.rs b/lib/core/src/persist/sync.rs index 956a9c162..72a9d1724 100644 --- a/lib/core/src/persist/sync.rs +++ b/lib/core/src/persist/sync.rs @@ -3,9 +3,13 @@ use std::collections::HashMap; use anyhow::Result; use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior}; -use super::Persister; +use super::{PaymentState, Persister}; use crate::{ - sync::model::{sync::Record, RecordType, SyncOutgoingChanges, SyncSettings, SyncState}, + sync::model::{ + data::{ChainSyncData, ReceiveSyncData, SendSyncData}, + sync::Record, + RecordType, SyncOutgoingChanges, SyncSettings, SyncState, + }, utils, }; diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index fafc8d037..7366ac93f 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use anyhow::{anyhow, Result}; @@ -5,6 +6,14 @@ use anyhow::{anyhow, Result}; use crate::{persist::Persister, prelude::Signer}; use self::client::SyncerClient; +use self::model::sync::Record; +use self::model::DecryptedRecord; +use self::model::{ + data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData}, + sync::ListChangesRequest, + RecordType, SyncState, +}; + pub(crate) mod client; pub(crate) mod model; @@ -30,4 +39,176 @@ impl SyncService { } } + fn commit_record( + &self, + decrypted_record: &DecryptedRecord, + sync_state: SyncState, + is_update: bool, + last_commit_time: Option, + ) -> Result<()> { + match decrypted_record.data.clone() { + SyncData::Chain(chain_data) => self.persister.commit_incoming_chain_swap( + &chain_data, + sync_state, + is_update, + last_commit_time, + )?, + SyncData::Send(send_data) => self.persister.commit_incoming_send_swap( + &send_data, + sync_state, + is_update, + last_commit_time, + )?, + SyncData::Receive(receive_data) => self.persister.commit_incoming_receive_swap( + &receive_data, + sync_state, + is_update, + last_commit_time, + )?, + } + Ok(()) + } + + fn load_sync_data(&self, data_id: &str, record_type: RecordType) -> Result { + let data = match record_type { + RecordType::Receive => { + let receive_data: ReceiveSyncData = self + .persister + .fetch_receive_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Receive swap {data_id}"))? + .into(); + SyncData::Receive(receive_data) + } + RecordType::Send => { + let send_data: SendSyncData = self + .persister + .fetch_send_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Send swap {data_id}"))? + .into(); + SyncData::Send(send_data) + } + RecordType::Chain => { + let chain_data: ChainSyncData = self + .persister + .fetch_chain_swap_by_id(data_id)? + .ok_or(anyhow!("Could not find Chain swap {data_id}"))? + .into(); + SyncData::Chain(chain_data) + } + }; + Ok(data) + } + + async fn fetch_and_save_records(&self) -> Result<()> { + log::info!("Initiating record pull"); + + let local_latest_revision = self + .persister + .get_sync_settings()? + .latest_revision + .unwrap_or(0); + let req = ListChangesRequest::new(local_latest_revision, self.signer.clone())?; + let incoming_records = self.client.pull(req).await?.changes; + + self.persister.set_incoming_records(&incoming_records)?; + let remote_latest_revision = incoming_records.last().map(|record| record.revision); + if let Some(latest_revision) = remote_latest_revision { + self.persister.set_sync_settings(HashMap::from([( + "latest_revision", + latest_revision.to_string(), + )]))?; + log::info!( + "Successfully pulled and persisted records. New latest revision: {latest_revision}" + ); + } else { + log::info!("No new records found. Local latest revision: {local_latest_revision}"); + } + + Ok(()) + } + + async fn handle_pull(&self, new_record: Record) -> Result<()> { + log::info!("Handling pull for record record_id {}", &new_record.id); + + // Step 3: Check whether or not record is applicable (from its schema_version) + if !new_record.is_applicable()? { + return Err(anyhow!("Record is not applicable: schema_version too high")); + } + + // Step 4: Check whether we already have this record, and if the revision is newer + let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?; + if let Some(sync_state) = &maybe_sync_state { + if sync_state.record_revision >= new_record.revision { + log::info!("Remote record revision is lower or equal to the persisted one. Skipping update."); + return Ok(()); + } + } + + // Step 5: Decrypt the incoming record + let mut decrypted_record = new_record.decrypt(self.signer.clone())?; + + // Step 6: Merge with outgoing records, if present + let maybe_outgoing_changes = self + .persister + .get_sync_outgoing_changes_by_id(&decrypted_record.id)?; + + if let Some(outgoing_changes) = &maybe_outgoing_changes { + if let Some(updated_fields) = &outgoing_changes.updated_fields { + let local_data = + self.load_sync_data(decrypted_record.data.id(), outgoing_changes.record_type)?; + decrypted_record.data.merge(&local_data, updated_fields)?; + } + } + + // Step 7: Apply the changes and update sync state + let new_sync_state = SyncState { + data_id: decrypted_record.data.id().to_string(), + record_id: decrypted_record.id.clone(), + record_revision: decrypted_record.revision, + is_local: maybe_sync_state + .as_ref() + .map(|state| state.is_local) + .unwrap_or(false), + }; + let is_update = maybe_sync_state.is_some(); + let last_commit_time = maybe_outgoing_changes.map(|details| details.commit_time); + self.commit_record( + &decrypted_record, + new_sync_state, + is_update, + last_commit_time, + )?; + + log::info!( + "Successfully pulled record record_id {}", + &decrypted_record.id + ); + + Ok(()) + } + + async fn pull(&self) -> Result<()> { + // Step 1: Fetch and save incoming records from remote, then update local tip + self.fetch_and_save_records().await?; + + // Step 2: Grab all pending incoming records from the database, merge them with + // outgoing if necessary, then apply + let mut succeded = vec![]; + let incoming_records = self.persister.get_incoming_records()?; + for new_record in incoming_records { + let record_id = new_record.id.clone(); + if let Err(err) = self.handle_pull(new_record).await { + log::debug!("Could not handle incoming record {record_id}: {err:?}"); + continue; + } + succeded.push(record_id); + } + + if !succeded.is_empty() { + self.persister.remove_incoming_records(succeded)?; + } + + Ok(()) + } +} } diff --git a/lib/core/src/sync/model/data.rs b/lib/core/src/sync/model/data.rs index 1c09030fd..e80881e90 100644 --- a/lib/core/src/sync/model/data.rs +++ b/lib/core/src/sync/model/data.rs @@ -20,6 +20,17 @@ pub(crate) struct ChainSyncData { pub(crate) description: Option, } +impl ChainSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) { + for field in updated_fields { + match field.as_str() { + "accept_zero_conf" => self.accept_zero_conf = other.accept_zero_conf, + _ => continue, + } + } + } +} + impl From for ChainSyncData { fn from(value: ChainSwap) -> Self { Self { @@ -55,6 +66,17 @@ pub(crate) struct SendSyncData { pub(crate) description: Option, } +impl SendSyncData { + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) { + for field in updated_fields { + match field.as_str() { + "preimage" => clone_if_set(&mut self.preimage, &other.preimage), + _ => continue, + } + } + } +} + impl From for SendSyncData { fn from(value: SendSwap) -> Self { Self { @@ -127,4 +149,26 @@ impl SyncData { pub(crate) fn to_bytes(&self) -> serde_json::Result> { serde_json::to_vec(self) } + + pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) -> anyhow::Result<()> { + match (self, other) { + (SyncData::Chain(ref mut base), SyncData::Chain(other)) => { + base.merge(other, updated_fields) + } + (SyncData::Send(ref mut base), SyncData::Send(other)) => { + base.merge(other, updated_fields) + } + (SyncData::Receive(ref mut _base), SyncData::Receive(_other)) => { + log::warn!("Attempting to merge for unnecessary type SyncData::Receive"); + } + _ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")), + }; + Ok(()) + } +} + +fn clone_if_set(s: &mut Option, other: &Option) { + if other.is_some() { + s.clone_from(other) + } } diff --git a/lib/core/src/sync/model/mod.rs b/lib/core/src/sync/model/mod.rs index 18a6e90ad..98e460bce 100644 --- a/lib/core/src/sync/model/mod.rs +++ b/lib/core/src/sync/model/mod.rs @@ -1,13 +1,25 @@ +use std::sync::Arc; + +use self::{data::SyncData, sync::Record}; +use crate::prelude::Signer; +use anyhow::Result; +use lazy_static::lazy_static; +use lwk_wollet::hashes::hex::DisplayHex; +use openssl::sha::sha256; use rusqlite::{ types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef}, ToSql, }; +use semver::Version; pub(crate) mod client; pub(crate) mod data; pub(crate) mod sync; const MESSAGE_PREFIX: &[u8; 13] = b"realtimesync:"; +lazy_static! { + static ref CURRENT_SCHEMA_VERSION: Version = Version::parse("0.0.1").unwrap(); +} #[derive(Copy, Clone)] pub(crate) enum RecordType { @@ -55,3 +67,72 @@ pub(crate) struct SyncOutgoingChanges { pub(crate) commit_time: u32, pub(crate) updated_fields: Option>, } + +pub(crate) struct DecryptedRecord { + pub(crate) revision: u64, + pub(crate) id: String, + #[allow(dead_code)] + pub(crate) schema_version: String, + pub(crate) data: SyncData, +} + +impl Record { + pub(crate) fn new( + data: SyncData, + revision: u64, + signer: Arc>, + ) -> Result { + let id = Self::get_id_from_sync_data(&data); + let data = data.to_bytes()?; + let data = signer + .ecies_encrypt(&data) + .map_err(|err| anyhow::anyhow!("Could not encrypt sync data: {err:?}"))?; + let schema_version = CURRENT_SCHEMA_VERSION.to_string(); + Ok(Self { + id, + revision, + schema_version, + data, + }) + } + + fn id(prefix: String, data_id: &str) -> String { + sha256((prefix + ":" + data_id).as_bytes()).to_lower_hex_string() + } + + pub(crate) fn get_id_from_sync_data(data: &SyncData) -> String { + let prefix = match data { + SyncData::Chain(_) => "chain-swap", + SyncData::Send(_) => "send-swap", + SyncData::Receive(_) => "receive-swap", + } + .to_string(); + Self::id(prefix, data.id()) + } + + pub(crate) fn get_id_from_record_type(record_type: RecordType, data_id: &str) -> String { + let prefix = match record_type { + RecordType::Chain => "chain-swap", + RecordType::Send => "send-swap", + RecordType::Receive => "receive-swap", + } + .to_string(); + Self::id(prefix, data_id) + } + + pub(crate) fn is_applicable(&self) -> Result { + let record_version = Version::parse(&self.schema_version)?; + Ok(CURRENT_SCHEMA_VERSION.major >= record_version.major) + } + + pub(crate) fn decrypt(self, signer: Arc>) -> Result { + let dec_data = signer.ecies_decrypt(self.data.as_slice())?; + let data = serde_json::from_slice(&dec_data)?; + Ok(DecryptedRecord { + id: self.id, + revision: self.revision, + schema_version: self.schema_version, + data, + }) + } +}