Skip to content

Commit

Permalink
feat: add migration step for new remotes
Browse files Browse the repository at this point in the history
[skip ci]
  • Loading branch information
hydra-yse committed Nov 28, 2024
1 parent 593a38b commit e9f9798
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 7 deletions.
61 changes: 54 additions & 7 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,72 @@ impl Persister {
Ok(sync_settings)
}

fn set_sync_setting_stmt(con: &Connection) -> rusqlite::Result<Statement> {
con.prepare("INSERT OR REPLACE INTO sync_settings(key, value) VALUES(:key, :value)")
}

pub(crate) fn set_sync_settings(&self, map: HashMap<&'static str, String>) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

for (key, value) in map {
tx.execute(
"INSERT OR REPLACE INTO sync_settings(key, value) VALUES(:key, :value)",
named_params! {
":key": key,
":value": value,
},
)?;
Self::set_sync_setting_stmt(&tx)?.execute(named_params! {
":key": key,
":value": value,
})?;
}

tx.commit()?;

Ok(())
}

pub(crate) fn set_new_remote(&self, remote_url: String) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

tx.execute("DELETE FROM sync_state", [])?;
tx.execute("DELETE FROM sync_incoming", [])?;
tx.execute("DELETE FROM sync_outgoing", [])?;

let swap_tables = HashMap::from([
("receive_swaps", RecordType::Receive),
("send_swaps", RecordType::Send),
("chain_swaps", RecordType::Chain),
]);
for (table_name, record_type) in swap_tables {
let mut stmt = tx.prepare(&format!("SELECT id FROM {table_name}"))?;
let mut rows = stmt.query([])?;

while let Some(row) = rows.next()? {
let data_id: String = row.get(0)?;
let record_id = Record::get_id_from_record_type(record_type, &data_id);

tx.execute(
"
INSERT INTO sync_outgoing(record_id, data_id, record_type, commit_time)
VALUES(:record_id, :data_id, :record_type, :commit_time)
",
named_params! {
":record_id": record_id,
":data_id": data_id,
":record_type": record_type,
":commit_time": utils::now(),
},
)?;
}
}

Self::set_sync_setting_stmt(&tx)?.execute(named_params! {
":key": "remote_url",
":value": remote_url
})?;

tx.commit()?;

Ok(())
}

pub(crate) fn get_incoming_records(&self) -> Result<Vec<Record>> {
let con = self.get_connection()?;

Expand Down
12 changes: 12 additions & 0 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ impl SyncService {
}
}

fn check_remote_change(&self) -> Result<()> {
match self
.persister
.get_sync_settings()?
.remote_url
.is_some_and(|url| url == self.remote_url)
{
true => Ok(()),
false => self.persister.set_new_remote(self.remote_url.clone()),
}
}

pub(crate) async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<()>) -> Result<()> {
self.client.connect(self.remote_url.clone()).await?;

Expand Down

0 comments on commit e9f9798

Please sign in to comment.