Skip to content

Commit

Permalink
feat(wallet_ffi)!: add base node connectivity callback to wallet ffi (#…
Browse files Browse the repository at this point in the history
…3796)

Description
---
This PR adds a new callback to the FFI interface that fires when there is a change in the wallet’s connectivity to the set base node.

This information was already available from the Wallet Connectivity Service via a watch channel and this PR just hooks up this watch channel to the Wallet FFI callback handler.

How Has This Been Tested?
---
The unit test for the callback handler is updated to test the addition.
  • Loading branch information
philipr-za authored Feb 5, 2022
1 parent cfc42dd commit 66ea697
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 3 deletions.
27 changes: 27 additions & 0 deletions base_layer/wallet_ffi/src/callback_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use tari_comms::types::CommsPublicKey;
use tari_comms_dht::event::{DhtEvent, DhtEventReceiver};
use tari_shutdown::ShutdownSignal;
use tari_wallet::{
connectivity_service::OnlineStatus,
output_manager_service::{
handle::{OutputManagerEvent, OutputManagerEventReceiver, OutputManagerHandle},
service::Balance,
Expand All @@ -66,6 +67,7 @@ use tari_wallet::{
},
},
};
use tokio::sync::watch;

const LOG_TARGET: &str = "wallet::transaction_service::callback_handler";

Expand All @@ -85,6 +87,7 @@ where TBackend: TransactionBackend + 'static
callback_balance_updated: unsafe extern "C" fn(*mut Balance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
db: TransactionDatabase<TBackend>,
transaction_service_event_stream: TransactionEventReceiver,
output_manager_service_event_stream: OutputManagerEventReceiver,
Expand All @@ -93,6 +96,7 @@ where TBackend: TransactionBackend + 'static
shutdown_signal: Option<ShutdownSignal>,
comms_public_key: CommsPublicKey,
balance_cache: Balance,
connectivity_status_watch: watch::Receiver<OnlineStatus>,
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -107,6 +111,7 @@ where TBackend: TransactionBackend + 'static
dht_event_stream: DhtEventReceiver,
shutdown_signal: ShutdownSignal,
comms_public_key: CommsPublicKey,
connectivity_status_watch: watch::Receiver<OnlineStatus>,
callback_received_transaction: unsafe extern "C" fn(*mut InboundTransaction),
callback_received_transaction_reply: unsafe extern "C" fn(*mut CompletedTransaction),
callback_received_finalized_transaction: unsafe extern "C" fn(*mut CompletedTransaction),
Expand All @@ -120,6 +125,7 @@ where TBackend: TransactionBackend + 'static
callback_balance_updated: unsafe extern "C" fn(*mut Balance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
) -> Self {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -173,6 +179,10 @@ where TBackend: TransactionBackend + 'static
target: LOG_TARGET,
"SafMessagesReceivedCallback -> Assigning Fn: {:?}", callback_saf_messages_received
);
info!(
target: LOG_TARGET,
"ConnectivityStatusCallback -> Assigning Fn: {:?}", callback_connectivity_status
);

Self {
callback_received_transaction,
Expand All @@ -188,6 +198,7 @@ where TBackend: TransactionBackend + 'static
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_messages_received,
callback_connectivity_status,
db,
transaction_service_event_stream,
output_manager_service_event_stream,
Expand All @@ -196,6 +207,7 @@ where TBackend: TransactionBackend + 'static
shutdown_signal: Some(shutdown_signal),
comms_public_key,
balance_cache: Balance::zero(),
connectivity_status_watch,
}
}

Expand Down Expand Up @@ -302,6 +314,11 @@ where TBackend: TransactionBackend + 'static
Err(_e) => error!(target: LOG_TARGET, "Error reading from DHT event broadcast channel"),
}
}
Ok(_) = self.connectivity_status_watch.changed() => {
let status = *self.connectivity_status_watch.borrow();
trace!(target: LOG_TARGET, "Connectivity status change detected: {:?}", status);
self.connectivity_status_changed(status);
},
_ = shutdown_signal.wait() => {
info!(target: LOG_TARGET, "Transaction Callback Handler shutting down because the shutdown signal was received");
break;
Expand Down Expand Up @@ -516,4 +533,14 @@ where TBackend: TransactionBackend + 'static
(self.callback_saf_messages_received)();
}
}

fn connectivity_status_changed(&mut self, status: OnlineStatus) {
debug!(
target: LOG_TARGET,
"Calling Connectivity Status changed callback function"
);
unsafe {
(self.callback_connectivity_status)(status as u64);
}
}
}
28 changes: 27 additions & 1 deletion base_layer/wallet_ffi/src/callback_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod test {
use tari_service_framework::reply_channel;
use tari_shutdown::Shutdown;
use tari_wallet::{
connectivity_service::OnlineStatus,
output_manager_service::{
handle::{OutputManagerEvent, OutputManagerHandle},
service::Balance,
Expand All @@ -60,7 +61,11 @@ mod test {
},
},
};
use tokio::{runtime::Runtime, sync::broadcast, time::Instant};
use tokio::{
runtime::Runtime,
sync::{broadcast, watch},
time::Instant,
};

use crate::{callback_handler::CallbackHandler, output_manager_service_mock::MockOutputManagerService};

Expand All @@ -81,6 +86,7 @@ mod test {
pub callback_balance_updated: u32,
pub callback_transaction_validation_complete: u32,
pub saf_messages_received: bool,
pub connectivity_status_callback_called: u64,
}

impl CallbackState {
Expand All @@ -101,6 +107,7 @@ mod test {
tx_cancellation_callback_called_inbound: false,
tx_cancellation_callback_called_outbound: false,
saf_messages_received: false,
connectivity_status_callback_called: 0,
}
}
}
Expand Down Expand Up @@ -200,6 +207,12 @@ mod test {
drop(lock);
}

unsafe extern "C" fn connectivity_status_callback(status: u64) {
let mut lock = CALLBACK_STATE.lock().unwrap();
lock.connectivity_status_callback_called += status + 1;
drop(lock);
}

#[test]
fn test_callback_handler() {
let runtime = Runtime::new().unwrap();
Expand Down Expand Up @@ -300,6 +313,8 @@ mod test {
runtime.spawn(mock_output_manager_service.run());
assert_eq!(balance, runtime.block_on(oms_handle.get_balance()).unwrap());

let (connectivity_tx, connectivity_rx) = watch::channel(OnlineStatus::Offline);

let callback_handler = CallbackHandler::new(
db,
transaction_event_receiver,
Expand All @@ -308,6 +323,7 @@ mod test {
dht_event_receiver,
shutdown_signal.to_signal(),
PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
connectivity_rx,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
Expand All @@ -321,6 +337,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
);

runtime.spawn(callback_handler.start());
Expand Down Expand Up @@ -506,6 +523,14 @@ mod test {
dht_event_sender
.send(Arc::new(DhtEvent::StoreAndForwardMessagesReceived))
.unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Offline).unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Connecting).unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Online).unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Connecting).unwrap();

thread::sleep(Duration::from_secs(10));

Expand All @@ -525,6 +550,7 @@ mod test {
assert_eq!(lock.callback_txo_validation_complete, 3);
assert_eq!(lock.callback_balance_updated, 5);
assert_eq!(lock.callback_transaction_validation_complete, 7);
assert_eq!(lock.connectivity_status_callback_called, 7);

drop(lock);
}
Expand Down
25 changes: 25 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ use tari_p2p::{
use tari_shutdown::Shutdown;
use tari_utilities::{hex, hex::Hex};
use tari_wallet::{
connectivity_service::WalletConnectivityInterface,
contacts_service::storage::database::Contact,
error::{WalletError, WalletStorageError},
storage::{
Expand Down Expand Up @@ -3260,6 +3261,13 @@ unsafe fn init_logging(
/// `callback_saf_message_received` - The callback function pointer that will be called when the Dht has determined that
/// is has connected to enough of its neighbours to be confident that it has received any SAF messages that were waiting
/// for it.
/// `callback_connectivity_status` - This callback is called when the status of connection to the set base node
/// changes. it will return an enum encoded as an integer as follows:
/// pub enum OnlineStatus {
/// Connecting, // 0
/// Online, // 1
/// Offline, // 2
/// }
/// `recovery_in_progress` - Pointer to an bool which will be modified to indicate if there is an outstanding recovery
/// that should be completed or not to an error code should one occur, may not be null. Functions as an out parameter.
/// `error_out` - Pointer to an int which will be modified
Expand Down Expand Up @@ -3292,6 +3300,7 @@ pub unsafe extern "C" fn wallet_create(
callback_balance_updated: unsafe extern "C" fn(*mut TariBalance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
recovery_in_progress: *mut bool,
error_out: *mut c_int,
) -> *mut TariWallet {
Expand Down Expand Up @@ -3483,6 +3492,7 @@ pub unsafe extern "C" fn wallet_create(
w.dht_service.subscribe_dht_events(),
w.comms.shutdown_signal(),
w.comms.node_identity().public_key().clone(),
w.wallet_connectivity.get_connectivity_status_watch(),
callback_received_transaction,
callback_received_transaction_reply,
callback_received_finalized_transaction,
Expand All @@ -3496,6 +3506,7 @@ pub unsafe extern "C" fn wallet_create(
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_messages_received,
callback_connectivity_status,
);

runtime.spawn(callback_handler.start());
Expand Down Expand Up @@ -6199,6 +6210,10 @@ mod test {
// assert!(true); //optimized out by compiler
}

unsafe extern "C" fn connectivity_status_callback(_status: u64) {
// assert!(true); //optimized out by compiler
}

const NETWORK_STRING: &str = "dibbler";

#[test]
Expand Down Expand Up @@ -6571,6 +6586,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6607,6 +6623,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6709,6 +6726,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6756,6 +6774,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6786,6 +6805,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand All @@ -6811,6 +6831,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6857,6 +6878,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6932,6 +6954,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -7138,6 +7161,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -7192,6 +7216,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down
8 changes: 8 additions & 0 deletions base_layer/wallet_ffi/wallet.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ struct TariPublicKeys *comms_list_connected_public_keys(struct TariWallet *walle
/// `callback_saf_message_received` - The callback function pointer that will be called when the Dht has determined that
/// is has connected to enough of its neighbours to be confident that it has received any SAF messages that were waiting
/// for it.
/// `callback_connectivity_status` - This callback is called when the status of connection to the set base node changes.
/// it will return an enum encoded as an integer as follows:
/// pub enum OnlineStatus {
/// Connecting, // 0
/// Online, // 1
/// Offline, // 2
/// }
/// `recovery_in_progress` - Pointer to an bool which will be modified to indicate if there is an outstanding recovery
/// that should be completed or not to an error code should one occur, may not be null. Functions as an out parameter.
/// `error_out` - Pointer to an int which will be modified
Expand Down Expand Up @@ -515,6 +522,7 @@ struct TariWallet *wallet_create(struct TariCommsConfig *config,
void (*callback_balance_updated)(struct TariBalance *),
void (*callback_transaction_validation_complete)(unsigned long long, bool),
void (*callback_saf_message_received)(),
void (*callback_connectivity_status)(unsigned long long),
bool *recovery_in_progress,
int *error_out);

Expand Down
8 changes: 7 additions & 1 deletion integration_tests/helpers/ffi/ffiInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class InterfaceFFI {
this.ptr,
this.ptr,
this.ptr,
this.ptr,
this.boolPtr,
this.intPtr,
],
Expand Down Expand Up @@ -1165,6 +1166,9 @@ class InterfaceFFI {
fn
);
}
static createCallbackConnectivityStatus(fn) {
return ffi.Callback(this.void, [this.ulonglong], fn);
}
//endregion

static walletCreate(
Expand All @@ -1186,7 +1190,8 @@ class InterfaceFFI {
callback_txo_validation_complete,
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_message_received
callback_saf_message_received,
callback_connectivity_status
) {
let error = this.initError();
let recovery_in_progress = this.initBool();
Expand All @@ -1211,6 +1216,7 @@ class InterfaceFFI {
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_message_received,
callback_connectivity_status,
recovery_in_progress,
error
);
Expand Down
Loading

0 comments on commit 66ea697

Please sign in to comment.