Skip to content

Commit

Permalink
feat: add pull method
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 28, 2024
1 parent 45a3143 commit 727cbb9
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 3 deletions.
1 change: 0 additions & 1 deletion lib/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ semver = "1.0.23"
lazy_static = "1.5.0"

[dev-dependencies]
lazy_static = "1.5.0"
paste = "1.0.15"
tempdir = "0.3.7"
uuid = { version = "1.8.0", features = ["v4"] }
Expand Down
8 changes: 6 additions & 2 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ use std::collections::HashMap;
use anyhow::Result;
use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior};

use super::Persister;
use super::{PaymentState, Persister};
use crate::{
sync::model::{sync::Record, RecordType, SyncOutgoingChanges, SyncSettings, SyncState},
sync::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData},
sync::Record,
RecordType, SyncOutgoingChanges, SyncSettings, SyncState,
},
utils,
};

Expand Down
181 changes: 181 additions & 0 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{anyhow, Result};

use crate::{persist::Persister, prelude::Signer};

use self::client::SyncerClient;
use self::model::sync::Record;
use self::model::DecryptedRecord;
use self::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData},
sync::ListChangesRequest,
RecordType, SyncState,
};

pub(crate) mod client;
pub(crate) mod model;

Expand All @@ -30,4 +39,176 @@ impl SyncService {
}
}

fn commit_record(
&self,
decrypted_record: &DecryptedRecord,
sync_state: SyncState,
is_update: bool,
last_commit_time: Option<u32>,
) -> Result<()> {
match decrypted_record.data.clone() {
SyncData::Chain(chain_data) => self.persister.commit_incoming_chain_swap(
&chain_data,
sync_state,
is_update,
last_commit_time,
)?,
SyncData::Send(send_data) => self.persister.commit_incoming_send_swap(
&send_data,
sync_state,
is_update,
last_commit_time,
)?,
SyncData::Receive(receive_data) => self.persister.commit_incoming_receive_swap(
&receive_data,
sync_state,
is_update,
last_commit_time,
)?,
}
Ok(())
}

fn load_sync_data(&self, data_id: &str, record_type: RecordType) -> Result<SyncData> {
let data = match record_type {
RecordType::Receive => {
let receive_data: ReceiveSyncData = self
.persister
.fetch_receive_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Receive swap {data_id}"))?
.into();
SyncData::Receive(receive_data)
}
RecordType::Send => {
let send_data: SendSyncData = self
.persister
.fetch_send_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Send swap {data_id}"))?
.into();
SyncData::Send(send_data)
}
RecordType::Chain => {
let chain_data: ChainSyncData = self
.persister
.fetch_chain_swap_by_id(data_id)?
.ok_or(anyhow!("Could not find Chain swap {data_id}"))?
.into();
SyncData::Chain(chain_data)
}
};
Ok(data)
}

async fn fetch_and_save_records(&self) -> Result<()> {
log::info!("Initiating record pull");

let local_latest_revision = self
.persister
.get_sync_settings()?
.latest_revision
.unwrap_or(0);
let req = ListChangesRequest::new(local_latest_revision, self.signer.clone())?;
let incoming_records = self.client.pull(req).await?.changes;

self.persister.set_incoming_records(&incoming_records)?;
let remote_latest_revision = incoming_records.last().map(|record| record.revision);
if let Some(latest_revision) = remote_latest_revision {
self.persister.set_sync_settings(HashMap::from([(
"latest_revision",
latest_revision.to_string(),
)]))?;
log::info!(
"Successfully pulled and persisted records. New latest revision: {latest_revision}"
);
} else {
log::info!("No new records found. Local latest revision: {local_latest_revision}");
}

Ok(())
}

async fn handle_pull(&self, new_record: Record) -> Result<()> {
log::info!("Handling pull for record record_id {}", &new_record.id);

// Step 3: Check whether or not record is applicable (from its schema_version)
if !new_record.is_applicable()? {
return Err(anyhow!("Record is not applicable: schema_version too high"));
}

// Step 4: Check whether we already have this record, and if the revision is newer
let maybe_sync_state = self.persister.get_sync_state_by_record_id(&new_record.id)?;
if let Some(sync_state) = &maybe_sync_state {
if sync_state.record_revision >= new_record.revision {
log::info!("Remote record revision is lower or equal to the persisted one. Skipping update.");
return Ok(());
}
}

// Step 5: Decrypt the incoming record
let mut decrypted_record = new_record.decrypt(self.signer.clone())?;

// Step 6: Merge with outgoing records, if present
let maybe_outgoing_changes = self
.persister
.get_sync_outgoing_changes_by_id(&decrypted_record.id)?;

if let Some(outgoing_changes) = &maybe_outgoing_changes {
if let Some(updated_fields) = &outgoing_changes.updated_fields {
let local_data =
self.load_sync_data(decrypted_record.data.id(), outgoing_changes.record_type)?;
decrypted_record.data.merge(&local_data, updated_fields)?;
}
}

// Step 7: Apply the changes and update sync state
let new_sync_state = SyncState {
data_id: decrypted_record.data.id().to_string(),
record_id: decrypted_record.id.clone(),
record_revision: decrypted_record.revision,
is_local: maybe_sync_state
.as_ref()
.map(|state| state.is_local)
.unwrap_or(false),
};
let is_update = maybe_sync_state.is_some();
let last_commit_time = maybe_outgoing_changes.map(|details| details.commit_time);
self.commit_record(
&decrypted_record,
new_sync_state,
is_update,
last_commit_time,
)?;

log::info!(
"Successfully pulled record record_id {}",
&decrypted_record.id
);

Ok(())
}

async fn pull(&self) -> Result<()> {
// Step 1: Fetch and save incoming records from remote, then update local tip
self.fetch_and_save_records().await?;

// Step 2: Grab all pending incoming records from the database, merge them with
// outgoing if necessary, then apply
let mut succeded = vec![];
let incoming_records = self.persister.get_incoming_records()?;
for new_record in incoming_records {
let record_id = new_record.id.clone();
if let Err(err) = self.handle_pull(new_record).await {
log::debug!("Could not handle incoming record {record_id}: {err:?}");
continue;
}
succeded.push(record_id);
}

if !succeded.is_empty() {
self.persister.remove_incoming_records(succeded)?;
}

Ok(())
}
}
}
44 changes: 44 additions & 0 deletions lib/core/src/sync/model/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ pub(crate) struct ChainSyncData {
pub(crate) description: Option<String>,
}

impl ChainSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) {
for field in updated_fields {
match field.as_str() {
"accept_zero_conf" => self.accept_zero_conf = other.accept_zero_conf,
_ => continue,
}
}
}
}

impl From<ChainSwap> for ChainSyncData {
fn from(value: ChainSwap) -> Self {
Self {
Expand Down Expand Up @@ -55,6 +66,17 @@ pub(crate) struct SendSyncData {
pub(crate) description: Option<String>,
}

impl SendSyncData {
pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) {
for field in updated_fields {
match field.as_str() {
"preimage" => clone_if_set(&mut self.preimage, &other.preimage),
_ => continue,
}
}
}
}

impl From<SendSwap> for SendSyncData {
fn from(value: SendSwap) -> Self {
Self {
Expand Down Expand Up @@ -127,4 +149,26 @@ impl SyncData {
pub(crate) fn to_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}

pub(crate) fn merge(&mut self, other: &Self, updated_fields: &[String]) -> anyhow::Result<()> {
match (self, other) {
(SyncData::Chain(ref mut base), SyncData::Chain(other)) => {
base.merge(other, updated_fields)
}
(SyncData::Send(ref mut base), SyncData::Send(other)) => {
base.merge(other, updated_fields)
}
(SyncData::Receive(ref mut _base), SyncData::Receive(_other)) => {
log::warn!("Attempting to merge for unnecessary type SyncData::Receive");
}
_ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")),
};
Ok(())
}
}

fn clone_if_set<T: Clone>(s: &mut Option<T>, other: &Option<T>) {
if other.is_some() {
s.clone_from(other)
}
}
81 changes: 81 additions & 0 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
use std::sync::Arc;

use self::{data::SyncData, sync::Record};
use crate::prelude::Signer;
use anyhow::Result;
use lazy_static::lazy_static;
use lwk_wollet::hashes::hex::DisplayHex;
use openssl::sha::sha256;
use rusqlite::{
types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
ToSql,
};
use semver::Version;

pub(crate) mod client;
pub(crate) mod data;
pub(crate) mod sync;

const MESSAGE_PREFIX: &[u8; 13] = b"realtimesync:";
lazy_static! {
static ref CURRENT_SCHEMA_VERSION: Version = Version::parse("0.0.1").unwrap();
}

#[derive(Copy, Clone)]
pub(crate) enum RecordType {
Expand Down Expand Up @@ -55,3 +67,72 @@ pub(crate) struct SyncOutgoingChanges {
pub(crate) commit_time: u32,
pub(crate) updated_fields: Option<Vec<String>>,
}

pub(crate) struct DecryptedRecord {
pub(crate) revision: u64,
pub(crate) id: String,
#[allow(dead_code)]
pub(crate) schema_version: String,
pub(crate) data: SyncData,
}

impl Record {
pub(crate) fn new(
data: SyncData,
revision: u64,
signer: Arc<Box<dyn Signer>>,
) -> Result<Self, anyhow::Error> {
let id = Self::get_id_from_sync_data(&data);
let data = data.to_bytes()?;
let data = signer
.ecies_encrypt(&data)
.map_err(|err| anyhow::anyhow!("Could not encrypt sync data: {err:?}"))?;
let schema_version = CURRENT_SCHEMA_VERSION.to_string();
Ok(Self {
id,
revision,
schema_version,
data,
})
}

fn id(prefix: String, data_id: &str) -> String {
sha256((prefix + ":" + data_id).as_bytes()).to_lower_hex_string()
}

pub(crate) fn get_id_from_sync_data(data: &SyncData) -> String {
let prefix = match data {
SyncData::Chain(_) => "chain-swap",
SyncData::Send(_) => "send-swap",
SyncData::Receive(_) => "receive-swap",
}
.to_string();
Self::id(prefix, data.id())
}

pub(crate) fn get_id_from_record_type(record_type: RecordType, data_id: &str) -> String {
let prefix = match record_type {
RecordType::Chain => "chain-swap",
RecordType::Send => "send-swap",
RecordType::Receive => "receive-swap",
}
.to_string();
Self::id(prefix, data_id)
}

pub(crate) fn is_applicable(&self) -> Result<bool> {
let record_version = Version::parse(&self.schema_version)?;
Ok(CURRENT_SCHEMA_VERSION.major >= record_version.major)
}

pub(crate) fn decrypt(self, signer: Arc<Box<dyn Signer>>) -> Result<DecryptedRecord> {
let dec_data = signer.ecies_decrypt(self.data.as_slice())?;
let data = serde_json::from_slice(&dec_data)?;
Ok(DecryptedRecord {
id: self.id,
revision: self.revision,
schema_version: self.schema_version,
data,
})
}
}

0 comments on commit 727cbb9

Please sign in to comment.