Skip to content

Commit

Permalink
feat: let sql in wal mode provide async db, not app level spawn block…
Browse files Browse the repository at this point in the history
…ing (transaction service) (#4597)

Description
---
Removed spawn blocking calls for db operations from the wallet in the transaction service.
(This is the last  PR in a couple of PRs required to implement this fully throughout the wallet code.)

Motivation and Context
---
As per #3982 and #4555

How Has This Been Tested?
---
Unit tests
Cucumber tests
  • Loading branch information
hansieodendaal authored Sep 2, 2022
1 parent 07cf7fb commit e17c1f9
Show file tree
Hide file tree
Showing 12 changed files with 430 additions and 774 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
.await
.ok_or_else(|| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::Shutdown))?;

let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id).await {
let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id) {
Ok(tx) => tx,
Err(e) => {
error!(
Expand Down Expand Up @@ -275,7 +275,6 @@ where
self.resources
.db
.broadcast_completed_transaction(self.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::from(e)))?;
let _size = self
.resources
Expand Down Expand Up @@ -430,7 +429,7 @@ where
"Failed to Cancel outputs for TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
);
}
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason).await {
if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason) {
warn!(
target: LOG_TARGET,
"Failed to Cancel TxId: {} after failed sending attempt with error {:?}", self.tx_id, e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ where
.resources
.db
.transaction_exists(data.tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
trace!(
Expand Down Expand Up @@ -167,7 +166,6 @@ where
self.resources
.db
.add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let send_result = send_transaction_reply(
Expand All @@ -182,7 +180,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if send_result {
Expand Down Expand Up @@ -237,7 +234,7 @@ where
.ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))?
.fuse();

let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id).await {
let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id) {
Ok(tx) => tx,
Err(_e) => {
debug!(
Expand Down Expand Up @@ -295,7 +292,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

Expand Down Expand Up @@ -339,7 +335,6 @@ where
Ok(_) => self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?,
Err(e) => warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -456,8 +451,7 @@ where

self.resources
.db
.complete_inbound_transaction(self.id, completed_transaction.clone())
.await
.complete_inbound_transaction(self.id, completed_transaction)
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

info!(
Expand Down Expand Up @@ -486,17 +480,13 @@ where
"Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id
);

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ where
.resources
.db
.transaction_exists(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?
{
let fee = sender_protocol
Expand All @@ -337,14 +336,12 @@ where
self.resources
.db
.add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}
if transaction_status == TransactionStatus::Pending {
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
}

Expand Down Expand Up @@ -394,7 +391,6 @@ where
.resources
.db
.get_pending_outbound_transaction(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

if !outbound_tx.sender_protocol.is_collecting_single_signature() {
Expand Down Expand Up @@ -452,7 +448,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?
}
},
Expand Down Expand Up @@ -499,7 +494,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?;
Expand All @@ -521,7 +515,6 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(
self.id, TransactionServiceError::from(e))
)?
Expand Down Expand Up @@ -594,7 +587,6 @@ where
self.resources
.db
.complete_outbound_transaction(tx_id, completed_transaction.clone())
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;
info!(
target: LOG_TARGET,
Expand All @@ -615,7 +607,6 @@ where
self.resources
.db
.increment_send_count(tx_id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

let _size = self
Expand Down Expand Up @@ -905,20 +896,15 @@ where
self.resources
.db
.increment_send_count(self.id)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

self.resources
.db
.cancel_pending_transaction(self.id)
.await
.map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
"Pending Transaction does not exist and could not be cancelled: {:?}", e
);
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?;

self.resources
.output_manager_service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ where
let unconfirmed_transactions = self
.db
.fetch_unconfirmed_transactions_info()
.await
.for_protocol(self.operation_id)
.unwrap();

Expand Down Expand Up @@ -216,7 +215,7 @@ where
self.operation_id
);
let op_id = self.operation_id;
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().await.for_protocol(op_id)? {
while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().for_protocol(op_id)? {
let mined_height = last_mined_transaction
.mined_height
.ok_or_else(|| {
Expand Down Expand Up @@ -414,7 +413,6 @@ where
num_confirmations >= self.config.num_confirmations_required,
status.is_faux(),
)
.await
.for_protocol(self.operation_id)?;

if num_confirmations >= self.config.num_confirmations_required {
Expand Down Expand Up @@ -488,12 +486,10 @@ where
num_confirmations >= self.config.num_confirmations_required,
false,
)
.await
.for_protocol(self.operation_id)?;

self.db
.abandon_coinbase_transaction(tx_id)
.await
.for_protocol(self.operation_id)?;

self.publish_event(TransactionEvent::TransactionCancelled(
Expand All @@ -510,7 +506,6 @@ where
) -> Result<(), TransactionServiceProtocolError<OperationId>> {
self.db
.set_transaction_as_unmined(tx_id)
.await
.for_protocol(self.operation_id)?;

if *status == TransactionStatus::Coinbase {
Expand Down
Loading

0 comments on commit e17c1f9

Please sign in to comment.