Skip to content

Commit

Permalink
feat: optimize pending transactions inbound query (#3500)
Browse files Browse the repository at this point in the history
Description
---
- Optimized pending inbound transaction query by doing filtering with SQL.
- Expanded the unit tests to also test the new query.

Motivation and Context
---
This is a part of a series of PRs to reduce the memory footprint of the console wallet.

How Has This Been Tested?
---
Unit tests, wallet cucumber tests
  • Loading branch information
hansieodendaal authored Oct 28, 2021
1 parent 3dfb503 commit 4ea02e7
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 9 deletions.
2 changes: 2 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ pub enum TransactionStorageError {
AeadError(String),
#[error("Transaction (TxId: '{0}') is not mined")]
TransactionNotMined(TxId),
#[error("Conversion error: `{0}`")]
ByteArrayError(#[from] ByteArrayError),
}

/// This error type is used to return TransactionServiceErrors from inside a Transaction Service protocol but also
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,9 +1478,9 @@ where
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError> {
let inbound_txs = self.db.get_pending_inbound_transactions().await?;
for (tx_id, tx) in inbound_txs {
self.restart_receive_transaction_protocol(tx_id, tx.source_public_key.clone(), join_handles);
let inbound_txs = self.db.get_pending_inbound_transaction_sender_info().await?;
for txn in inbound_txs {
self.restart_receive_transaction_protocol(txn.tx_id, txn.source_public_key, join_handles);
}

Ok(())
Expand Down
23 changes: 20 additions & 3 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use aes_gcm::Aes256Gcm;
use chrono::Utc;
use log::*;

use crate::transaction_service::storage::models::WalletTransaction;
use crate::transaction_service::storage::{models::WalletTransaction, sqlite_db::InboundTransactionSenderInfo};
use std::{
collections::HashMap,
fmt,
Expand Down Expand Up @@ -123,11 +123,14 @@ pub trait TransactionBackend: Send + Sync + Clone {
num_confirmations: u64,
is_confirmed: bool,
) -> Result<(), TransactionStorageError>;

/// Clears the mined block and height of a transaction
fn set_transaction_as_unmined(&self, tx_id: TxId) -> Result<(), TransactionStorageError>;

/// Mark all transactions as unvalidated
fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError>;
/// Get transaction sender info for all pending inbound transactions
fn get_pending_inbound_transaction_sender_info(
&self,
) -> Result<Vec<InboundTransactionSenderInfo>, TransactionStorageError>;
}

#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -793,6 +796,20 @@ where T: TransactionBackend + 'static
.map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

pub async fn get_pending_inbound_transaction_sender_info(
&self,
) -> Result<Vec<InboundTransactionSenderInfo>, TransactionStorageError> {
let db_clone = self.db.clone();

let t = tokio::task::spawn_blocking(move || match db_clone.get_pending_inbound_transaction_sender_info() {
Ok(v) => Ok(v),
Err(e) => log_error(DbKey::PendingInboundTransactions, e),
})
.await
.map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(t)
}
}

impl Display for DbKey {
Expand Down
84 changes: 81 additions & 3 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,67 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
);
Ok(())
}

fn get_pending_inbound_transaction_sender_info(
&self,
) -> Result<Vec<InboundTransactionSenderInfo>, TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let mut sender_info: Vec<InboundTransactionSenderInfo> = vec![];
match InboundTransactionSenderInfoSql::get_pending_inbound_transaction_sender_info(&(*conn)) {
Ok(info) => {
for item in info {
sender_info.push(InboundTransactionSenderInfo::try_from(item)?);
}
},
Err(e) => return Err(e),
}
trace!(
target: LOG_TARGET,
"sqlite profile - get_pending_inbound_transaction_sender_info: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);
Ok(sender_info)
}
}

#[derive(Debug, PartialEq)]
pub struct InboundTransactionSenderInfo {
pub(crate) tx_id: u64,
pub(crate) source_public_key: CommsPublicKey,
}

impl TryFrom<InboundTransactionSenderInfoSql> for InboundTransactionSenderInfo {
type Error = TransactionStorageError;

fn try_from(i: InboundTransactionSenderInfoSql) -> Result<Self, Self::Error> {
Ok(Self {
tx_id: i.tx_id as u64,
source_public_key: CommsPublicKey::from_bytes(&*i.source_public_key)
.map_err(TransactionStorageError::ByteArrayError)?,
})
}
}

#[derive(Clone, Queryable)]
pub struct InboundTransactionSenderInfoSql {
pub tx_id: i64,
pub source_public_key: Vec<u8>,
}

impl InboundTransactionSenderInfoSql {
pub fn get_pending_inbound_transaction_sender_info(
conn: &SqliteConnection,
) -> Result<Vec<InboundTransactionSenderInfoSql>, TransactionStorageError> {
let query_result = inbound_transactions::table
.select((inbound_transactions::tx_id, inbound_transactions::source_public_key))
.filter(inbound_transactions::cancelled.eq(false as i32))
.load::<InboundTransactionSenderInfoSql>(conn)?;
Ok(query_result)
}
}

#[derive(Clone, Debug, Queryable, Insertable, PartialEq)]
Expand Down Expand Up @@ -1796,6 +1857,7 @@ mod test {
models::{CompletedTransaction, InboundTransaction, OutboundTransaction},
sqlite_db::{
CompletedTransactionSql,
InboundTransactionSenderInfo,
InboundTransactionSql,
OutboundTransactionSql,
TransactionServiceSqliteDatabase,
Expand Down Expand Up @@ -2400,7 +2462,7 @@ mod test {
}

#[test]
fn test_get_tranactions_to_be_rebroadcast() {
fn test_customized_transactional_queries() {
let db_name = format!("{}.sqlite3", string(8).as_str());
let temp_dir = tempdir().unwrap();
let db_folder = temp_dir.path().to_str().unwrap().to_string();
Expand All @@ -2411,6 +2473,7 @@ mod test {

embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed");

let mut info_list_reference: Vec<InboundTransactionSenderInfo> = vec![];
for i in 0..1000 {
let (valid, cancelled, status, coinbase_block_height) = match i % 13 {
0 => (true, i % 3 == 0, TransactionStatus::Completed, None),
Expand Down Expand Up @@ -2452,20 +2515,35 @@ mod test {
mined_height: None,
mined_in_block: None,
};
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap();
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx.clone()).unwrap();
completed_tx_sql.commit(&conn).unwrap();

let inbound_tx = InboundTransaction::from(completed_tx);
let inbound_tx_sql = InboundTransactionSql::try_from(inbound_tx.clone()).unwrap();
inbound_tx_sql.commit(&conn).unwrap();

if !cancelled {
info_list_reference.push(InboundTransactionSenderInfo {
tx_id: inbound_tx.tx_id,
source_public_key: inbound_tx.source_public_key,
})
}
}

let connection = WalletDbConnection::new(conn, None);
let db1 = TransactionServiceSqliteDatabase::new(connection, None);

let txn_list = db1.get_transactions_to_be_broadcast().unwrap();
assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185);
assert_eq!(txn_list.len(), 185);
for txn in &txn_list {
assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast);
assert!(txn.valid);
assert!(!txn.cancelled);
assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0));
}

let info_list = db1.get_pending_inbound_transaction_sender_info().unwrap();
assert_eq!(info_list.len(), 941);
assert_eq!(info_list, info_list_reference);
}
}

0 comments on commit 4ea02e7

Please sign in to comment.