Skip to content

Commit

Permalink
Optimize output manager db operations
Browse files Browse the repository at this point in the history
Optimized output manager database operations to do as much work
as possible inside the SQL database instead of inside Rust code.
  • Loading branch information
hansieodendaal committed Sep 14, 2022
1 parent d0e2568 commit 9abc858
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 119 deletions.
119 changes: 66 additions & 53 deletions base_layer/wallet/src/contacts_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,57 +80,43 @@ impl ContactsBackend for ContactsServiceSqliteDatabase {

match op {
WriteOperation::Upsert(kvp) => match *kvp {
DbKeyValuePair::Contact(k, c) => match ContactSql::find_by_public_key(&k.to_vec(), &conn) {
Ok(found_c) => {
let _contact_sql = found_c.update(
UpdateContact {
alias: Some(c.alias),
last_seen: None,
latency: None,
},
&conn,
)?;
},
Err(_) => {
DbKeyValuePair::Contact(k, c) => {
if ContactSql::find_by_public_key_and_update(&conn, &k.to_vec(), UpdateContact {
alias: Some(c.clone().alias),
last_seen: None,
latency: None,
})
.is_err()
{
ContactSql::from(c).commit(&conn)?;
},
}
},
DbKeyValuePair::LastSeen(..) => return Err(ContactsServiceStorageError::OperationNotSupported),
},
WriteOperation::UpdateLastSeen(kvp) => match *kvp {
DbKeyValuePair::LastSeen(node_id, date_time, latency) => {
match ContactSql::find_by_node_id(&node_id.to_vec(), &conn) {
Ok(found_c) => {
let contact = found_c.update(
UpdateContact {
alias: None,
last_seen: Some(Some(date_time)),
latency: Some(latency),
},
&conn,
)?;
return Ok(Some(DbValue::PublicKey(Box::new(
PublicKey::from_vec(&contact.public_key)
.map_err(|_| ContactsServiceStorageError::ConversionError)?,
))));
},
Err(e) => return Err(e),
}
let contact = ContactSql::find_by_node_id_and_update(&conn, &node_id.to_vec(), UpdateContact {
alias: None,
last_seen: Some(Some(date_time)),
latency: Some(latency),
})?;
return Ok(Some(DbValue::PublicKey(Box::new(
PublicKey::from_vec(&contact.public_key)
.map_err(|_| ContactsServiceStorageError::ConversionError)?,
))));
},
DbKeyValuePair::Contact(..) => return Err(ContactsServiceStorageError::OperationNotSupported),
},
WriteOperation::Remove(k) => match k {
DbKey::Contact(k) => match ContactSql::find_by_public_key(&k.to_vec(), &conn) {
DbKey::Contact(k) => match ContactSql::find_by_public_key_and_delete(&conn, &k.to_vec()) {
Ok(c) => {
c.delete(&conn)?;
return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?))));
},
Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => (),
Err(e) => return Err(e),
},
DbKey::ContactId(id) => match ContactSql::find_by_node_id(&id.to_vec(), &conn) {
DbKey::ContactId(id) => match ContactSql::find_by_node_id_and_delete(&conn, &id.to_vec()) {
Ok(c) => {
c.delete(&conn)?;
return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?))));
},
Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => (),
Expand Down Expand Up @@ -186,28 +172,58 @@ impl ContactSql {
.first::<ContactSql>(conn)?)
}

pub fn delete(&self, conn: &SqliteConnection) -> Result<(), ContactsServiceStorageError> {
let num_deleted =
diesel::delete(contacts::table.filter(contacts::public_key.eq(&self.public_key))).execute(conn)?;
/// Find a particular Contact by their public key, and update it if it exists, returning the affected record
pub fn find_by_public_key_and_update(
conn: &SqliteConnection,
public_key: &[u8],
updated_contact: UpdateContact,
) -> Result<ContactSql, ContactsServiceStorageError> {
// Note: `get_result` not implemented for SQLite
diesel::update(contacts::table.filter(contacts::public_key.eq(public_key)))
.set(updated_contact)
.execute(conn)
.num_rows_affected_or_not_found(1)?;
ContactSql::find_by_public_key(public_key, conn)
}

if num_deleted == 0 {
/// Find a particular Contact by their public key, and delete it if it exists, returning the affected record
pub fn find_by_public_key_and_delete(
conn: &SqliteConnection,
public_key: &[u8],
) -> Result<ContactSql, ContactsServiceStorageError> {
// Note: `get_result` not implemented for SQLite
let contact = ContactSql::find_by_public_key(public_key, conn)?;
if diesel::delete(contacts::table.filter(contacts::public_key.eq(public_key))).execute(conn)? == 0 {
return Err(ContactsServiceStorageError::ValuesNotFound);
}

Ok(())
Ok(contact)
}

pub fn update(
&self,
updated_contact: UpdateContact,
/// Find a particular Contact by their node ID, and update it if it exists, returning the affected record
pub fn find_by_node_id_and_update(
conn: &SqliteConnection,
node_id: &[u8],
updated_contact: UpdateContact,
) -> Result<ContactSql, ContactsServiceStorageError> {
diesel::update(contacts::table.filter(contacts::public_key.eq(&self.public_key)))
// Note: `get_result` not implemented for SQLite
diesel::update(contacts::table.filter(contacts::node_id.eq(node_id)))
.set(updated_contact)
.execute(conn)
.num_rows_affected_or_not_found(1)?;
ContactSql::find_by_node_id(node_id, conn)
}

ContactSql::find_by_public_key(&self.public_key, conn)
/// Find a particular Contact by their node ID, and delete it if it exists, returning the affected record
pub fn find_by_node_id_and_delete(
conn: &SqliteConnection,
node_id: &[u8],
) -> Result<ContactSql, ContactsServiceStorageError> {
// Note: `get_result` not implemented for SQLite
let contact = ContactSql::find_by_node_id(node_id, conn)?;
if diesel::delete(contacts::table.filter(contacts::node_id.eq(node_id))).execute(conn)? == 0 {
return Err(ContactsServiceStorageError::ValuesNotFound);
}
Ok(contact)
}
}

Expand Down Expand Up @@ -306,7 +322,7 @@ mod test {
.unwrap()
);

ContactSql::from(contacts[0].clone()).delete(&conn).unwrap();
ContactSql::find_by_public_key_and_delete(&conn, &contacts[0].public_key.clone().to_vec()).unwrap();

let retrieved_contacts = ContactSql::index(&conn).unwrap();
assert_eq!(retrieved_contacts.len(), 2);
Expand All @@ -315,16 +331,13 @@ mod test {
.iter()
.any(|v| v == &ContactSql::from(contacts[0].clone())));

let c = ContactSql::find_by_public_key(&contacts[1].public_key.to_vec(), &conn).unwrap();
c.update(
UpdateContact {
let _c =
ContactSql::find_by_public_key_and_update(&conn, &contacts[1].public_key.to_vec(), UpdateContact {
alias: Some("Fred".to_string()),
last_seen: None,
latency: None,
},
&conn,
)
.unwrap();
})
.unwrap();

let c_updated = ContactSql::find_by_public_key(&contacts[1].public_key.to_vec(), &conn).unwrap();
assert_eq!(c_updated.alias, "Fred".to_string());
Expand Down
122 changes: 59 additions & 63 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ use std::{
use chacha20poly1305::XChaCha20Poly1305;
use chrono::NaiveDateTime;
use derivative::Derivative;
use diesel::{prelude::*, result::Error as DieselError, SqliteConnection};
use diesel::{
prelude::*,
r2d2::{ConnectionManager, PooledConnection},
result::Error as DieselError,
SqliteConnection,
};
use log::*;
pub use new_output_sql::NewOutputSql;
pub use output_sql::OutputSql;
Expand Down Expand Up @@ -715,29 +720,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

let outputs_to_be_received =
OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeReceived, &conn)?;
for o in &outputs_to_be_received {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeReceived),
..Default::default()
},
&conn,
)?;
}
update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::ShortTermEncumberedToBeReceived,
OutputStatus::EncumberedToBeReceived,
)?;

update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::ShortTermEncumberedToBeSpent,
OutputStatus::EncumberedToBeSpent,
)?;

let outputs_to_be_spent =
OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::ShortTermEncumberedToBeSpent, &conn)?;
for o in &outputs_to_be_spent {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeSpent),
..Default::default()
},
&conn,
)?;
}
if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
Expand All @@ -757,27 +753,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();

let outputs_to_be_received = OutputSql::index_status(OutputStatus::ShortTermEncumberedToBeReceived, &conn)?;
for o in &outputs_to_be_received {
o.update(
UpdateOutput {
status: Some(OutputStatus::CancelledInbound),
..Default::default()
},
&conn,
)?;
}
diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32)))
.set((outputs::status.eq(OutputStatus::CancelledInbound as i32),))
.execute(&conn)?;

diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32)))
.set((outputs::status.eq(OutputStatus::Unspent as i32),))
.execute(&conn)?;

let outputs_to_be_spent = OutputSql::index_status(OutputStatus::ShortTermEncumberedToBeSpent, &conn)?;
for o in &outputs_to_be_spent {
o.update(
UpdateOutput {
status: Some(OutputStatus::Unspent),
..Default::default()
},
&conn,
)?;
}
if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1093,16 +1076,12 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.execute(&conn)
.num_rows_affected_or_not_found(1)?;
} else {
let output = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::AbandonedCoinbase, &conn)?;
for o in output {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeReceived),
..Default::default()
},
&conn,
)?;
}
update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::AbandonedCoinbase,
OutputStatus::EncumberedToBeReceived,
)?;
};
if start.elapsed().as_millis() > 0 {
trace!(
Expand All @@ -1121,17 +1100,14 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
let start = Instant::now();
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();
let outputs = OutputSql::find_by_tx_id_and_status(tx_id, OutputStatus::CancelledInbound, &conn)?;

for o in outputs {
o.update(
UpdateOutput {
status: Some(OutputStatus::EncumberedToBeReceived),
..Default::default()
},
&conn,
)?;
}
update_outputs_with_tx_id_and_status_to_new_status(
&conn,
tx_id,
OutputStatus::CancelledInbound,
OutputStatus::EncumberedToBeReceived,
)?;

if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1231,6 +1207,26 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
}
}

fn update_outputs_with_tx_id_and_status_to_new_status(
conn: &PooledConnection<ConnectionManager<SqliteConnection>>,
tx_id: TxId,
from_status: OutputStatus,
to_status: OutputStatus,
) -> Result<(), OutputManagerStorageError> {
diesel::update(
outputs::table
.filter(
outputs::received_in_tx_id
.eq(Some(tx_id.as_u64() as i64))
.or(outputs::spent_in_tx_id.eq(Some(tx_id.as_u64() as i64))),
)
.filter(outputs::status.eq(from_status as i32)),
)
.set(outputs::status.eq(to_status as i32))
.execute(conn)?;
Ok(())
}

/// These are the fields that can be updated for an Output
#[derive(Default)]
pub struct UpdateOutput {
Expand Down
6 changes: 3 additions & 3 deletions common_sqlite/src/connection_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ impl ConnectionOptions {
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error> for ConnectionOptions {
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
(|| {
if let Some(d) = self.busy_timeout {
conn.batch_execute(&format!("PRAGMA busy_timeout = {};", d.as_millis()))?;
}
if self.enable_wal {
conn.batch_execute("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")?;
}
if self.enable_foreign_keys {
conn.batch_execute("PRAGMA foreign_keys = ON;")?;
}
if let Some(d) = self.busy_timeout {
conn.batch_execute(&format!("PRAGMA busy_timeout = {};", d.as_millis()))?;
}
Ok(())
})()
.map_err(diesel::r2d2::Error::QueryError)
Expand Down

0 comments on commit 9abc858

Please sign in to comment.