Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(wallet): detect base node change during txo_validation #4610

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions base_layer/wallet/src/connectivity_service/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ impl WalletConnectivityMock {
self.base_node_watch.send(Some(base_node_peer));
}

pub async fn base_node_changed(&mut self) -> Option<Peer> {
self.base_node_watch.changed().await;
self.base_node_watch.borrow().as_ref().cloned()
}

pub fn send_shutdown(&self) {
self.base_node_wallet_rpc_client.send(None);
self.base_node_sync_rpc_client.send(None);
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum OutputManagerError {
ServiceError(String),
#[error("Base node is not synced")]
BaseNodeNotSynced,
#[error("Base node changed")]
BaseNodeChanged,
#[error("Invalid Sender Message Type")]
InvalidSenderMessage,
#[error("Coinbase build error: `{0}`")]
Expand Down
76 changes: 53 additions & 23 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,40 +534,70 @@ where
}

fn validate_outputs(&mut self) -> Result<u64, OutputManagerError> {
if !self.resources.connectivity.is_base_node_set() {
return Err(OutputManagerError::NoBaseNodeKeysProvided);
}
let current_base_node = self
.resources
.connectivity
.get_current_base_node_id()
.ok_or(OutputManagerError::NoBaseNodeKeysProvided)?;
let id = OsRng.next_u64();
let utxo_validation = TxoValidationTask::new(
let txo_validation = TxoValidationTask::new(
id,
self.resources.db.clone(),
self.resources.connectivity.clone(),
self.resources.event_publisher.clone(),
self.resources.config.clone(),
);

let shutdown = self.resources.shutdown_signal.clone();
let mut shutdown = self.resources.shutdown_signal.clone();
let mut base_node_watch = self.resources.connectivity.get_current_base_node_watcher();
let event_publisher = self.resources.event_publisher.clone();
tokio::spawn(async move {
match utxo_validation.execute(shutdown).await {
Ok(id) => {
info!(
target: LOG_TARGET,
"UTXO Validation Protocol (Id: {}) completed successfully", id
);
},
Err(OutputManagerProtocolError { id, error }) => {
warn!(
target: LOG_TARGET,
"Error completing UTXO Validation Protocol (Id: {}): {:?}", id, error
);
if let Err(e) = event_publisher.send(Arc::new(OutputManagerEvent::TxoValidationFailure(id))) {
debug!(
target: LOG_TARGET,
"Error sending event because there are no subscribers: {:?}", e
);
let exec_fut = txo_validation.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
match result {
Ok(id) => {
info!(
target: LOG_TARGET,
"UTXO Validation Protocol (Id: {}) completed successfully", id
);
return;
},
Err(OutputManagerProtocolError { id, error }) => {
warn!(
target: LOG_TARGET,
"Error completing UTXO Validation Protocol (Id: {}): {:?}", id, error
);
if let Err(e) = event_publisher.send(Arc::new(OutputManagerEvent::TxoValidationFailure(id))) {
debug!(
target: LOG_TARGET,
"Error sending event because there are no subscribers: {:?}", e
);
}

return;
},
}
},
_ = shutdown.wait() => {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) shutting down because the system is shutting down", id);
return;
},
_ = base_node_watch.changed() => {
if let Some(peer) = base_node_watch.borrow().as_ref() {
if peer.node_id != current_base_node {
debug!(
target: LOG_TARGET,
"TXO Validation Protocol (Id: {}) cancelled because base node changed", id
);
return;
}
}

}
},
}
}
});
Ok(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use std::{

use log::*;
use tari_common_types::types::{BlockHash, FixedHash};
use tari_comms::protocol::rpc::RpcError::RequestFailed;
use tari_comms::{peer_manager::Peer, protocol::rpc::RpcError::RequestFailed};
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
proto::base_node::{QueryDeletedRequest, UtxoQueryRequest},
};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::sync::watch;

use crate::{
connectivity_service::WalletConnectivityInterface,
Expand All @@ -54,6 +54,7 @@ const LOG_TARGET: &str = "wallet::output_service::txo_validation_task";
pub struct TxoValidationTask<TBackend, TWalletConnectivity> {
operation_id: u64,
db: OutputManagerDatabase<TBackend>,
base_node_watch: watch::Receiver<Option<Peer>>,
connectivity: TWalletConnectivity,
event_publisher: OutputManagerEventSender,
config: OutputManagerServiceConfig,
Expand All @@ -74,23 +75,30 @@ where
Self {
operation_id,
db,
base_node_watch: connectivity.get_current_base_node_watcher(),
connectivity,
event_publisher,
config,
}
}

pub async fn execute(mut self, _shutdown: ShutdownSignal) -> Result<u64, OutputManagerProtocolError> {
pub async fn execute(mut self) -> Result<u64, OutputManagerProtocolError> {
let mut base_node_client = self
.connectivity
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(OutputManagerError::Shutdown)
.for_protocol(self.operation_id)?;

let base_node_peer = self
.base_node_watch
.borrow()
.as_ref()
.map(|p| p.node_id.clone())
.ok_or_else(|| OutputManagerProtocolError::new(self.operation_id, OutputManagerError::BaseNodeChanged))?;
debug!(
target: LOG_TARGET,
"Starting TXO validation protocol (Id: {})", self.operation_id,
"Starting TXO validation protocol with peer {} (Id: {})", base_node_peer, self.operation_id,
);

let last_mined_header = self.check_for_reorgs(&mut base_node_client).await?;
Expand All @@ -99,10 +107,11 @@ where

self.update_spent_outputs(&mut base_node_client, last_mined_header)
.await?;

self.publish_event(OutputManagerEvent::TxoValidationSuccess(self.operation_id));
debug!(
target: LOG_TARGET,
"Finished TXO validation protocol (Id: {})", self.operation_id,
"Finished TXO validation protocol from base node {} (Id: {})", base_node_peer, self.operation_id,
);
Ok(self.operation_id)
}
Expand Down Expand Up @@ -233,6 +242,7 @@ where
batch.len(),
self.operation_id
);

let (mined, unmined, tip_height) = self
.query_base_node_for_outputs(batch, wallet_client)
.await
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum TransactionServiceError {
AttemptedToBroadcastCoinbaseTransaction(TxId),
#[error("No Base Node public keys are provided for Base chain broadcast and monitoring")]
NoBaseNodeKeysProvided,
#[error("Base node changed during {task_name}")]
BaseNodeChanged { task_name: &'static str },
#[error("Error sending data to Protocol via registered channels")]
ProtocolChannelError,
#[error("Transaction detected as rejected by mempool")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
use log::*;
use tari_common_types::{
transaction::{TransactionStatus, TxId},
types::BlockHash,
types::{BlockHash, Signature},
};
use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
use tari_core::{
Expand All @@ -51,6 +51,7 @@ use crate::{
handle::{TransactionEvent, TransactionEventSender},
storage::{
database::{TransactionBackend, TransactionDatabase},
models::TxCancellationReason,
sqlite_db::UnconfirmedTransactionInfo,
},
},
Expand All @@ -67,9 +68,6 @@ pub struct TransactionValidationProtocol<TTransactionBackend, TWalletConnectivit
event_publisher: TransactionEventSender,
output_manager_handle: OutputManagerHandle,
}
use tari_common_types::types::Signature;

use crate::transaction_service::storage::models::TxCancellationReason;

#[allow(unused_variables)]
impl<TTransactionBackend, TWalletConnectivity> TransactionValidationProtocol<TTransactionBackend, TWalletConnectivity>
Expand Down Expand Up @@ -504,10 +502,6 @@ where
tx_id: TxId,
status: &TransactionStatus,
) -> Result<(), TransactionServiceProtocolError<OperationId>> {
self.db
.set_transaction_as_unmined(tx_id)
.for_protocol(self.operation_id)?;

if *status == TransactionStatus::Coinbase {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx_id, false).await {
warn!(
Expand All @@ -520,6 +514,10 @@ where
};
}

self.db
.set_transaction_as_unmined(tx_id)
.for_protocol(self.operation_id)?;

self.publish_event(TransactionEvent::TransactionBroadcast(tx_id));
Ok(())
}
Expand Down
33 changes: 29 additions & 4 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2180,9 +2180,12 @@ where
JoinHandle<Result<OperationId, TransactionServiceProtocolError<OperationId>>>,
>,
) -> Result<OperationId, TransactionServiceError> {
if !self.connectivity().is_base_node_set() {
return Err(TransactionServiceError::NoBaseNodeKeysProvided);
}
let current_base_node = self
.resources
.connectivity
.get_current_base_node_id()
.ok_or(TransactionServiceError::NoBaseNodeKeysProvided)?;

trace!(target: LOG_TARGET, "Starting transaction validation protocol");
let id = OperationId::new_random();

Expand All @@ -2195,7 +2198,29 @@ where
self.resources.output_manager_service.clone(),
);

let join_handle = tokio::spawn(protocol.execute());
let mut base_node_watch = self.connectivity().get_current_base_node_watcher();

let join_handle = tokio::spawn(async move {
let exec_fut = protocol.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
return result;
},
_ = base_node_watch.changed() => {
if let Some(peer) = base_node_watch.borrow().as_ref() {
if peer.node_id != current_base_node {
debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol");
return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged {
task_name: "transaction validation_protocol",
}));
}
}
}
}
}
});
join_handles.push(join_handle);

Ok(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,20 @@ async fn setup_output_manager_service<T: OutputManagerBackend + 'static, U: KeyM
mock_base_node_service.set_default_base_node_state();
task::spawn(mock_base_node_service.run());

let wallet_connectivity_mock = create_wallet_connectivity_mock();
let mut wallet_connectivity_mock = create_wallet_connectivity_mock();
// let (connectivity, connectivity_mock) = create_connectivity_mock();
// let connectivity_mock_state = connectivity_mock.get_shared_state();
// task::spawn(connectivity_mock.run());
let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE);

wallet_connectivity_mock.notify_base_node_set(server_node_identity.to_peer());
wallet_connectivity_mock.base_node_changed().await;

let service = BaseNodeWalletRpcMockService::new();
let rpc_service_state = service.get_state();

let server = BaseNodeWalletRpcServer::new(service);
let protocol_name = server.as_protocol_name();
let server_node_identity = build_node_identity(PeerFeatures::COMMUNICATION_NODE);

let mut mock_server = MockRpcServer::new(server, server_node_identity.clone());
mock_server.serve();
Expand Down Expand Up @@ -1302,7 +1305,6 @@ async fn test_txo_validation() {

let mut oms = setup_output_manager_service(backend, ks_backend, true).await;

oms.wallet_connectivity_mock.notify_base_node_set(oms.node_id.to_peer());
// Now we add the connection
let mut connection = oms
.mock_rpc_service
Expand Down Expand Up @@ -1854,7 +1856,6 @@ async fn test_txo_revalidation() {

let mut oms = setup_output_manager_service(backend, ks_backend, true).await;

oms.wallet_connectivity_mock.notify_base_node_set(oms.node_id.to_peer());
// Now we add the connection
let mut connection = oms
.mock_rpc_service
Expand Down
Loading