Skip to content

Commit

Permalink
Add wallet FFI shutdown tests
Browse files Browse the repository at this point in the history
- Added wallet FFI shutdown tests to prove that all services have been
  terminted when the FFI `wallet_destroy` command has been run.
- Added additional safeguards in the FFI `wallet_destroy` method to ensure the
  wallet has shut down.
  • Loading branch information
hansieodendaal committed Dec 7, 2023
1 parent 0bc62b4 commit c3c5575
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 1 deletion.
2 changes: 1 addition & 1 deletion base_layer/contacts/src/contacts_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where T: ContactsBackend + 'static
let shutdown = self
.shutdown_signal
.take()
.expect("Output Manager Service initialized without shutdown signal");
.expect("Contacts Service initialized without shutdown signal");
pin_mut!(shutdown);

// Add all contacts as monitored peers to the liveness service
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tari_key_manager = { path = "../key_manager" }
tari_common_types = { path = "../../base_layer/common_types"}
tari_test_utils = { path = "../../infrastructure/test_utils"}
tari_service_framework = { path = "../../base_layer/service_framework" }
tari_core = { path = "../../base_layer/core", default-features = false, features = ["base_node"] }
borsh = "1.2"

[build-dependencies]
Expand Down
308 changes: 308 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8249,8 +8249,22 @@ pub unsafe extern "C" fn emoji_set_destroy(emoji_set: *mut EmojiSet) {
pub unsafe extern "C" fn wallet_destroy(wallet: *mut TariWallet) {
if !wallet.is_null() {
let mut w = Box::from_raw(wallet);
let wallet_comms = w.wallet.comms.clone();
w.shutdown.trigger();
w.runtime.block_on(w.wallet.wait_until_shutdown());
// The wallet should be shutdown by now; these are just additional confirmations
loop {
if w.shutdown.is_triggered() &&
wallet_comms.shutdown_signal().is_triggered() &&
w.runtime
.block_on(wallet_comms.connectivity().get_connectivity_status())
.is_err()
{
break;
};
w.runtime
.block_on(async { tokio::time::sleep(Duration::from_millis(250)).await });
}
}
}

Expand Down Expand Up @@ -8575,6 +8589,7 @@ mod test {
use once_cell::sync::Lazy;
use tari_common_types::{emoji, transaction::TransactionStatus, types::PrivateKey};
use tari_comms::peer_manager::PeerFeatures;
use tari_contacts::contacts_service::types::{Direction, Message, MessageMetadata};
use tari_core::{
covenant,
transactions::{
Expand All @@ -8583,6 +8598,7 @@ mod test {
},
};
use tari_key_manager::{mnemonic::MnemonicLanguage, mnemonic_wordlists};
use tari_p2p::initialization::MESSAGING_PROTOCOL_ID;
use tari_script::script;
use tari_test_utils::random;
use tempfile::tempdir;
Expand Down Expand Up @@ -11046,4 +11062,296 @@ mod test {
let _spending_key = Box::from_raw(spending_key_ptr);
}
}

#[test]
#[allow(clippy::too_many_lines)]
pub fn test_wallet_shutdown() {
unsafe {
let mut error = 0;
let error_ptr = &mut error as *mut c_int;
let mut recovery_in_progress = false;
let recovery_in_progress_ptr = &mut recovery_in_progress as *mut bool;

// Create a new wallet for Alice
let db_name = CString::new(random::string(8).as_str()).unwrap();
let alice_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char;
let temp_dir = tempdir().unwrap();
let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap();
let alice_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char;
let alice_transport_type = transport_memory_create();
let address = transport_memory_get_address(alice_transport_type, error_ptr);
let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned();
let alice_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char;
let network = CString::new(NETWORK_STRING).unwrap();
let alice_network_str: *const c_char = CString::into_raw(network) as *const c_char;

let alice_config = comms_config_create(
alice_address_str,
alice_transport_type,
alice_db_name_str,
alice_db_path_str,
20,
10800,
error_ptr,
);
let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char;
let alice_wallet_ptr = wallet_create(
alice_config,
ptr::null(),
0,
0,
passphrase,
ptr::null(),
alice_network_str,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
broadcast_callback,
mined_callback,
mined_unconfirmed_callback,
scanned_callback,
scanned_unconfirmed_callback,
transaction_send_result_callback,
tx_cancellation_callback,
txo_validation_complete_callback,
contacts_liveness_data_updated_callback,
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_callback,
recovery_in_progress_ptr,
error_ptr,
);
assert_eq!(error, 0);
string_destroy(alice_network_str as *mut c_char);
string_destroy(alice_db_name_str as *mut c_char);
string_destroy(alice_db_path_str as *mut c_char);
string_destroy(alice_address_str as *mut c_char);
transport_config_destroy(alice_transport_type);
comms_config_destroy(alice_config);

// Create a new wallet for bob
let db_name = CString::new(random::string(8).as_str()).unwrap();
let bob_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char;
let temp_dir = tempdir().unwrap();
let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap();
let bob_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char;
let bob_transport_type = transport_memory_create();
let address = transport_memory_get_address(bob_transport_type, error_ptr);
let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned();
let bob_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char;
let network = CString::new(NETWORK_STRING).unwrap();
let bob_network_str: *const c_char = CString::into_raw(network) as *const c_char;

let bob_config = comms_config_create(
bob_address_str,
bob_transport_type,
bob_db_name_str,
bob_db_path_str,
20,
10800,
error_ptr,
);
let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char;
let bob_wallet_ptr = wallet_create(
bob_config,
ptr::null(),
0,
0,
passphrase,
ptr::null(),
bob_network_str,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
broadcast_callback,
mined_callback,
mined_unconfirmed_callback,
scanned_callback,
scanned_unconfirmed_callback,
transaction_send_result_callback,
tx_cancellation_callback,
txo_validation_complete_callback,
contacts_liveness_data_updated_callback,
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_callback,
recovery_in_progress_ptr,
error_ptr,
);
assert_eq!(error, 0);
string_destroy(bob_network_str as *mut c_char);
string_destroy(bob_db_name_str as *mut c_char);
string_destroy(bob_db_path_str as *mut c_char);
string_destroy(bob_address_str as *mut c_char);
transport_config_destroy(bob_transport_type);
comms_config_destroy(bob_config);

// Add some peers
// - Wallet peer for Alice (add Bob as a base node peer; not how it will be done in production but good
// enough for the test as we just need to make sure the wallet can connect to a peer)
let bob_wallet_comms = (*bob_wallet_ptr).wallet.comms.clone();
let bob_node_identity = bob_wallet_comms.node_identity();
let bob_peer_public_key_ptr = Box::into_raw(Box::new(bob_node_identity.public_key().clone()));
let bob_peer_address_ptr =
CString::into_raw(CString::new(bob_node_identity.first_public_address().unwrap().to_string()).unwrap())
as *const c_char;
wallet_add_base_node_peer(
alice_wallet_ptr,
bob_peer_public_key_ptr,
bob_peer_address_ptr,
error_ptr,
);
string_destroy(bob_peer_address_ptr as *mut c_char);
let _destroyed = Box::from_raw(bob_peer_public_key_ptr);
// - Wallet peer for Bob (add Alice as a base node peer; same as above)
let alice_wallet_comms = (*alice_wallet_ptr).wallet.comms.clone();
let alice_node_identity = alice_wallet_comms.node_identity();
let alice_peer_public_key_ptr = Box::into_raw(Box::new(alice_node_identity.public_key().clone()));
let alice_peer_address_ptr = CString::into_raw(
CString::new(alice_node_identity.first_public_address().unwrap().to_string()).unwrap(),
) as *const c_char;
wallet_add_base_node_peer(
bob_wallet_ptr,
alice_peer_public_key_ptr,
alice_peer_address_ptr,
error_ptr,
);
string_destroy(alice_peer_address_ptr as *mut c_char);
let _destroyed = Box::from_raw(alice_peer_public_key_ptr);

// Add some contacts
// - Contact for Alice
let bob_wallet_address = TariWalletAddress::new(bob_node_identity.public_key().clone(), Network::LocalNet);
let alice_contact_alias_ptr: *const c_char =
CString::into_raw(CString::new("bob").unwrap()) as *const c_char;
let alice_contact_address_ptr = Box::into_raw(Box::new(bob_wallet_address.clone()));
let alice_contact_ptr = contact_create(alice_contact_alias_ptr, alice_contact_address_ptr, true, error_ptr);
tari_address_destroy(alice_contact_address_ptr);
assert!(wallet_upsert_contact(alice_wallet_ptr, alice_contact_ptr, error_ptr));
contact_destroy(alice_contact_ptr);
// - Contact for Bob
let alice_wallet_address =
TariWalletAddress::new(alice_node_identity.public_key().clone(), Network::LocalNet);
let bob_contact_alias_ptr: *const c_char =
CString::into_raw(CString::new("alice").unwrap()) as *const c_char;
let bob_contact_address_ptr = Box::into_raw(Box::new(alice_wallet_address.clone()));
let bob_contact_ptr = contact_create(bob_contact_alias_ptr, bob_contact_address_ptr, true, error_ptr);
tari_address_destroy(bob_contact_address_ptr);
assert!(wallet_upsert_contact(bob_wallet_ptr, bob_contact_ptr, error_ptr));
contact_destroy(bob_contact_ptr);

// Use comms service - do `dial_peer` for both wallets (we do not 'assert!' here to not make the test flaky)
// Note: This loop is just to make sure we actually connect as the first attempts do not always succeed
let alice_wallet_runtime = &(*alice_wallet_ptr).runtime;
let bob_wallet_runtime = &(*bob_wallet_ptr).runtime;
let mut alice_dialed_bob = false;
let mut bob_dialed_alice = false;
let mut dial_count = 0;
loop {
dial_count += 1;
if !alice_dialed_bob {
alice_dialed_bob = alice_wallet_runtime
.block_on(
alice_wallet_comms
.connectivity()
.dial_peer(bob_node_identity.node_id().clone()),
)
.is_ok();
}
if !bob_dialed_alice {
bob_dialed_alice = bob_wallet_runtime
.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
)
.is_ok();
}
if alice_dialed_bob && bob_dialed_alice || dial_count > 10 {
break;
}
// Wait a bit before the next attempt
alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await });
}

// Use contacts service - send some messages for both wallets
let mut alice_wallet_contacts_service = (*alice_wallet_ptr).wallet.contacts_service.clone();
let mut bob_wallet_contacts_service = (*bob_wallet_ptr).wallet.contacts_service.clone();
let mut alice_msg_count = 0;
let mut bob_msg_count = 0;
// Note: This loop is just to make sure we actually send a couple of messages as the first attempts do not
// always succeed (we do not 'assert!' here to not make the test flaky)
for i in 0..60 {
if alice_msg_count < 5 {
let alice_message_result =
alice_wallet_runtime.block_on(alice_wallet_contacts_service.send_message(Message {
body: vec![i],
metadata: vec![MessageMetadata::default()],
address: bob_wallet_address.clone(),
direction: Direction::Outbound,
stored_at: u64::from(i),
delivery_confirmation_at: None,
read_confirmation_at: None,
message_id: vec![i],
}));
if alice_message_result.is_ok() {
alice_msg_count += 1;
}
}
if bob_msg_count < 5 {
let bob_message_result =
bob_wallet_runtime.block_on(bob_wallet_contacts_service.send_message(Message {
body: vec![i],
metadata: vec![MessageMetadata::default()],
address: alice_wallet_address.clone(),
direction: Direction::Outbound,
stored_at: u64::from(i),
delivery_confirmation_at: None,
read_confirmation_at: None,
message_id: vec![i],
}));
if bob_message_result.is_ok() {
bob_msg_count += 1;
}
}
if alice_msg_count >= 5 && bob_msg_count >= 5 {
break;
}
// Wait a bit before the next attempt
alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await });
}

// Trigger Alice wallet shutdown (same as `pub unsafe extern "C" fn wallet_destroy(wallet: *mut TariWallet)`
wallet_destroy(alice_wallet_ptr);

// Bob's peer connection to Alice will still be active for a short while until Bob figures out Alice is
// gone, and a 'dial_peer' command to Alice from Bob may return the previous connection state, but it
// should not be possible to do anything with the connection.
let bob_comms_dial_peer = bob_wallet_runtime.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
);
if let Ok(mut connection_to_alice) = bob_comms_dial_peer {
if bob_wallet_runtime
.block_on(connection_to_alice.open_substream(&MESSAGING_PROTOCOL_ID.clone()))
.is_ok()
{
panic!("Connection to Alice should not be active!");
}
}

// - Bob can still retrieve messages Alice sent
let bob_contacts_get_messages =
bob_wallet_runtime.block_on(bob_wallet_contacts_service.get_messages(alice_wallet_address, 1, 1));
assert!(bob_contacts_get_messages.is_ok());

// Cleanup
wallet_destroy(bob_wallet_ptr);
}
}
}

0 comments on commit c3c5575

Please sign in to comment.