diff --git a/base_layer/wallet/src/output_manager_service/handle.rs b/base_layer/wallet/src/output_manager_service/handle.rs index ff483c8f3b..710f52307a 100644 --- a/base_layer/wallet/src/output_manager_service/handle.rs +++ b/base_layer/wallet/src/output_manager_service/handle.rs @@ -48,6 +48,7 @@ pub enum OutputManagerRequest { GetBalance, AddOutput(Box), AddOutputWithTxId((TxId, Box)), + AddUnvalidatedOutput((TxId, Box)), UpdateOutputMetadataSignature(Box), GetRecipientTransaction(TransactionSenderMessage), GetCoinbaseTransaction((u64, MicroTari, MicroTari, u64)), @@ -131,6 +132,9 @@ impl fmt::Display for OutputManagerRequest { pre_image, fee_per_gram, ), + OutputManagerRequest::AddUnvalidatedOutput((t, v)) => { + write!(f, "AddUnvalidatedOutput ({}: {})", t, v.value) + }, } } } @@ -234,6 +238,21 @@ impl OutputManagerHandle { } } + pub async fn add_unvalidated_output( + &mut self, + tx_id: TxId, + output: UnblindedOutput, + ) -> Result<(), OutputManagerError> { + match self + .handle + .call(OutputManagerRequest::AddUnvalidatedOutput((tx_id, Box::new(output)))) + .await?? + { + OutputManagerResponse::OutputAdded => Ok(()), + _ => Err(OutputManagerError::UnexpectedApiResponse), + } + } + pub async fn update_output_metadata_signature( &mut self, output: TransactionOutput, diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index e47cbe9db1..e4738d8d09 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -339,6 +339,10 @@ where self.claim_sha_atomic_swap_with_hash(output_hash, pre_image, fee_per_gram) .await }, + OutputManagerRequest::AddUnvalidatedOutput((tx_id, uo)) => self + .add_unvalidated_output(tx_id, *uo) + .await + .map(|_| OutputManagerResponse::OutputAdded), } } @@ -417,7 +421,7 @@ where self.validate_outputs() } - /// Add an unblinded output to the unspent outputs list + /// Add an unblinded output to the outputs table and marks is as `Unspent`. pub async fn add_output(&mut self, tx_id: Option, output: UnblindedOutput) -> Result<(), OutputManagerError> { debug!( target: LOG_TARGET, @@ -431,6 +435,22 @@ where Ok(()) } + /// Add an unblinded output to the outputs table and marks is as `EncumberedToBeReceived`. This is so that it will + /// require a successful validation to confirm that it indeed spendable. + pub async fn add_unvalidated_output( + &mut self, + tx_id: TxId, + output: UnblindedOutput, + ) -> Result<(), OutputManagerError> { + debug!( + target: LOG_TARGET, + "Add unvalidated output of value {} to Output Manager", output.value + ); + let output = DbUnblindedOutput::from_unblinded_output(output, &self.resources.factories)?; + self.resources.db.add_unvalidated_output(tx_id, output).await?; + Ok(()) + } + /// Update an output's metadata signature, akin to 'finalize output' pub async fn update_output_metadata_signature( &mut self, diff --git a/base_layer/wallet/src/output_manager_service/storage/database.rs b/base_layer/wallet/src/output_manager_service/storage/database.rs index 127472d2d9..72caec2822 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database.rs @@ -128,6 +128,8 @@ pub trait OutputManagerBackend: Send + Sync + Clone { &self, current_tip_for_time_lock_calculation: Option, ) -> Result; + /// Import unvalidated output + fn add_unvalidated_output(&self, output: DbUnblindedOutput, tx_id: TxId) -> Result<(), OutputManagerStorageError>; } /// Holds the state of the KeyManager being used by the Output Manager Service @@ -264,6 +266,19 @@ where T: OutputManagerBackend + 'static Ok(()) } + pub async fn add_unvalidated_output( + &self, + tx_id: TxId, + output: DbUnblindedOutput, + ) -> Result<(), OutputManagerStorageError> { + let db_clone = self.db.clone(); + tokio::task::spawn_blocking(move || db_clone.add_unvalidated_output(output, tx_id)) + .await + .map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??; + + Ok(()) + } + pub async fn add_output_to_be_received( &self, tx_id: TxId, diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index fdd8661e9d..974c4a57fc 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -1166,6 +1166,30 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { } Ok(()) } + + fn add_unvalidated_output(&self, output: DbUnblindedOutput, tx_id: TxId) -> Result<(), OutputManagerStorageError> { + let start = Instant::now(); + let conn = self.database_connection.get_pooled_connection()?; + let acquire_lock = start.elapsed(); + + if OutputSql::find_by_commitment_and_cancelled(&output.commitment.to_vec(), false, &conn).is_ok() { + return Err(OutputManagerStorageError::DuplicateOutput); + } + let mut new_output = NewOutputSql::new(output, OutputStatus::EncumberedToBeReceived, Some(tx_id), None)?; + self.encrypt_if_necessary(&mut new_output)?; + new_output.commit(&conn)?; + + if start.elapsed().as_millis() > 0 { + trace!( + target: LOG_TARGET, + "sqlite profile - reinstate_cancelled_inbound_output: lock {} + db_op {} = {} ms", + acquire_lock.as_millis(), + (start.elapsed() - acquire_lock).as_millis(), + start.elapsed().as_millis() + ); + } + Ok(()) + } } impl TryFrom for OutputStatus { diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 1b71f2744d..1ea209b069 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -373,7 +373,7 @@ where .await?; self.output_manager_service - .add_output_with_tx_id(tx_id, unblinded_output.clone()) + .add_unvalidated_output(tx_id, unblinded_output.clone()) .await?; info!( diff --git a/base_layer/wallet/tests/wallet/mod.rs b/base_layer/wallet/tests/wallet/mod.rs index e2a2df6da2..3dcc79190b 100644 --- a/base_layer/wallet/tests/wallet/mod.rs +++ b/base_layer/wallet/tests/wallet/mod.rs @@ -744,7 +744,7 @@ async fn test_import_utxo() { let balance = alice_wallet.output_manager_service.get_balance().await.unwrap(); - assert_eq!(balance.available_balance, 20000 * uT); + assert_eq!(balance.pending_incoming_balance, 20000 * uT); let completed_tx = alice_wallet .transaction_service @@ -755,8 +755,6 @@ async fn test_import_utxo() { .expect("Tx should be in collection"); assert_eq!(completed_tx.amount, 20000 * uT); - let stored_utxo = alice_wallet.output_manager_service.get_unspent_outputs().await.unwrap()[0].clone(); - assert_eq!(stored_utxo, utxo); } #[test] diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index c783a1147a..fbd9046d76 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -4392,7 +4392,25 @@ pub unsafe extern "C" fn wallet_import_utxo( &(*spending_key).clone(), &Default::default(), )) { - Ok(tx_id) => tx_id, + Ok(tx_id) => { + if let Err(e) = (*wallet) + .runtime + .block_on((*wallet).wallet.output_manager_service.validate_txos()) + { + error = LibWalletError::from(WalletError::OutputManagerError(e)).code; + ptr::swap(error_out, &mut error as *mut c_int); + return 0; + } + if let Err(e) = (*wallet) + .runtime + .block_on((*wallet).wallet.transaction_service.validate_transactions()) + { + error = LibWalletError::from(WalletError::TransactionServiceError(e)).code; + ptr::swap(error_out, &mut error as *mut c_int); + return 0; + } + tx_id + }, Err(e) => { error = LibWalletError::from(e).code; ptr::swap(error_out, &mut error as *mut c_int);