Skip to content

Commit

Permalink
Mark all outputs and transactions to be revalidated. 
Browse files Browse the repository at this point in the history
Co-authored-by: Philip Robinson <[email protected]>
  • Loading branch information
SWvheerden and philipr-za committed Oct 21, 2021
1 parent 4bc7c2a commit a75ffca
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 1 deletion.
8 changes: 7 additions & 1 deletion applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ service Wallet {
rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse);
// Cancel pending transaction
rpc CancelTransaction (CancelTransactionRequest) returns (CancelTransactionResponse);
// Will triggger a complete revalidation of all wallet outputs.
rpc RevalidateAllTransactions (RevalidateRequest) returns (RevalidateResponse);
}

message GetVersionRequest { }
Expand Down Expand Up @@ -196,4 +198,8 @@ message CancelTransactionRequest {
message CancelTransactionResponse {
bool is_success = 1;
string failure_message = 2;
}
}

message RevalidateRequest{}

message RevalidateResponse{}
19 changes: 19 additions & 0 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use tari_app_grpc::{
GetVersionResponse,
ImportUtxosRequest,
ImportUtxosResponse,
RevalidateRequest,
RevalidateResponse,
TransactionDirection,
TransactionInfo,
TransactionStatus,
Expand Down Expand Up @@ -118,6 +120,23 @@ impl wallet_server::Wallet for WalletGrpcServer {
}))
}

async fn revalidate_all_transactions(
&self,
_request: Request<RevalidateRequest>,
) -> Result<Response<RevalidateResponse>, Status> {
let mut output_service = self.get_output_manager_service();
output_service
.revalidate_all_outputs()
.await
.map_err(|e| Status::unknown(e.to_string()))?;
let mut tx_service = self.get_transaction_service();
tx_service
.revalidate_all_transactions()
.await
.map_err(|e| Status::unknown(e.to_string()))?;
Ok(Response::new(RevalidateResponse {}))
}

async fn get_coinbase(
&self,
request: Request<GetCoinbaseRequest>,
Expand Down
9 changes: 9 additions & 0 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub enum OutputManagerRequest {
GetInvalidOutputs,
GetSeedWords,
ValidateUtxos,
RevalidateTxos,
CreateCoinSplit((MicroTari, usize, MicroTari, Option<u64>)),
ApplyEncryption(Box<Aes256Gcm>),
RemoveEncryption,
Expand Down Expand Up @@ -98,6 +99,7 @@ impl fmt::Display for OutputManagerRequest {
GetInvalidOutputs => write!(f, "GetInvalidOutputs"),
GetSeedWords => write!(f, "GetSeedWords"),
ValidateUtxos => write!(f, "ValidateUtxos"),
RevalidateTxos => write!(f, "RevalidateTxos"),
CreateCoinSplit(v) => write!(f, "CreateCoinSplit ({})", v.0),
ApplyEncryption(_) => write!(f, "ApplyEncryption"),
RemoveEncryption => write!(f, "RemoveEncryption"),
Expand Down Expand Up @@ -241,6 +243,13 @@ impl OutputManagerHandle {
}
}

pub async fn revalidate_all_outputs(&mut self) -> Result<u64, OutputManagerError> {
match self.handle.call(OutputManagerRequest::RevalidateTxos).await?? {
OutputManagerResponse::TxoValidationStarted(request_key) => Ok(request_key),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}

pub async fn get_recipient_transaction(
&mut self,
sender_message: TransactionSenderMessage,
Expand Down
9 changes: 9 additions & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ where
OutputManagerRequest::ValidateUtxos => {
self.validate_outputs().map(OutputManagerResponse::TxoValidationStarted)
},
OutputManagerRequest::RevalidateTxos => self
.revalidate_outputs()
.await
.map(OutputManagerResponse::TxoValidationStarted),
OutputManagerRequest::GetInvalidOutputs => {
let outputs = self
.fetch_invalid_outputs()
Expand Down Expand Up @@ -385,6 +389,11 @@ where
Ok(id)
}

async fn revalidate_outputs(&mut self) -> Result<u64, OutputManagerError> {
self.resources.db.set_outputs_to_be_revalidated().await?;
self.validate_outputs()
}

/// Add an unblinded output to the unspent outputs list
pub async fn add_output(&mut self, tx_id: Option<TxId>, output: UnblindedOutput) -> Result<(), OutputManagerError> {
debug!(
Expand Down
10 changes: 10 additions & 0 deletions base_layer/wallet/src/output_manager_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub trait OutputManagerBackend: Send + Sync + Clone {

fn set_output_to_unmined(&self, hash: Vec<u8>) -> Result<(), OutputManagerStorageError>;

fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError>;

fn mark_output_as_spent(
&self,
hash: Vec<u8>,
Expand Down Expand Up @@ -562,6 +564,14 @@ where T: OutputManagerBackend + 'static
Ok(())
}

pub async fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || db.set_outputs_to_be_revalidated())
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

pub async fn mark_output_as_spent(
&self,
hash: HashOutput,
Expand Down
25 changes: 25 additions & 0 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,31 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
Ok(())
}

fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
// Only update non-deleted utxos
let result = diesel::update(outputs::table.filter(outputs::marked_deleted_at_height.is_null()))
.set((
outputs::mined_height.eq::<Option<i64>>(None),
outputs::mined_in_block.eq::<Option<Vec<u8>>>(None),
outputs::mined_mmr_position.eq::<Option<i64>>(None),
))
.execute(&(*conn))?;

trace!(target: LOG_TARGET, "rows updated: {:?}", result);
trace!(
target: LOG_TARGET,
"sqlite profile - set_outputs_to_be_revalidated: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);

Ok(())
}

fn mark_output_as_spent(
&self,
hash: Vec<u8>,
Expand Down
13 changes: 13 additions & 0 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum TransactionServiceRequest {
GetNumConfirmationsRequired,
SetNumConfirmationsRequired(u64),
ValidateTransactions,
ReValidateTransactions,
}

impl fmt::Display for TransactionServiceRequest {
Expand Down Expand Up @@ -99,6 +100,7 @@ impl fmt::Display for TransactionServiceRequest {
Self::SetNumConfirmationsRequired(_) => f.write_str("SetNumConfirmationsRequired"),
Self::GetAnyTransaction(t) => f.write_str(&format!("GetAnyTransaction({})", t)),
TransactionServiceRequest::ValidateTransactions => f.write_str("ValidateTransactions"),
TransactionServiceRequest::ReValidateTransactions => f.write_str("ReValidateTransactions"),
}
}
}
Expand Down Expand Up @@ -396,6 +398,17 @@ impl TransactionServiceHandle {
}
}

pub async fn revalidate_all_transactions(&mut self) -> Result<(), TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::ReValidateTransactions)
.await??
{
TransactionServiceResponse::ValidationStarted(_) => Ok(()),
_ => Err(TransactionServiceError::UnexpectedApiResponse),
}
}

pub async fn set_normal_power_mode(&mut self) -> Result<(), TransactionServiceError> {
match self
.handle
Expand Down
12 changes: 12 additions & 0 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,10 @@ where
.start_transaction_validation_protocol(transaction_validation_join_handles)
.await
.map(TransactionServiceResponse::ValidationStarted),
TransactionServiceRequest::ReValidateTransactions => self
.start_transaction_revalidation(transaction_validation_join_handles)
.await
.map(TransactionServiceResponse::ValidationStarted),
};

// If the individual handlers did not already send the API response then do it here.
Expand Down Expand Up @@ -1544,6 +1548,14 @@ where
Ok(())
}

async fn start_transaction_revalidation(
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<u64, TransactionServiceError> {
self.resources.db.mark_all_transactions_as_unvalidated().await?;
self.start_transaction_validation_protocol(join_handles).await
}

async fn start_transaction_validation_protocol(
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
Expand Down
10 changes: 10 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ pub trait TransactionBackend: Send + Sync + Clone {

/// Clears the mined block and height of a transaction
fn set_transaction_as_unmined(&self, tx_id: TxId) -> Result<(), TransactionStorageError>;

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError>;
}

#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -752,6 +754,14 @@ where T: TransactionBackend + 'static
Ok(())
}

pub async fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> {
let db_clone = self.db.clone();
tokio::task::spawn_blocking(move || db_clone.mark_all_transactions_as_unvalidated())
.await
.map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

pub async fn set_transaction_mined_height(
&self,
tx_id: TxId,
Expand Down
22 changes: 22 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,28 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
Ok(result)
}

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let result = diesel::update(completed_transactions::table.filter(completed_transactions::cancelled.eq(0)))
.set((
completed_transactions::mined_height.eq::<Option<i64>>(None),
completed_transactions::mined_in_block.eq::<Option<Vec<u8>>>(None),
))
.execute(&(*conn))?;

trace!(target: LOG_TARGET, "rows updated: {:?}", result);
trace!(
target: LOG_TARGET,
"sqlite profile - set_transactions_to_be_revalidated: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);
Ok(())
}

fn set_transaction_as_unmined(&self, tx_id: u64) -> Result<(), TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
Expand Down
Loading

0 comments on commit a75ffca

Please sign in to comment.