Skip to content

Commit

Permalink
Add wallet FFI shutdown tests
Browse files Browse the repository at this point in the history
Added wallet FFI shutdown tests to prove that all services have been
terminted when the FFI `wallet_destroy` command has been run.
  • Loading branch information
hansieodendaal committed Nov 29, 2023
1 parent c54478f commit b3e15b7
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 2 deletions.
15 changes: 15 additions & 0 deletions base_layer/contacts/src/contacts_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub enum ContactsServiceRequest {
GetMessages(TariAddress, i64, i64),
SendReadConfirmation(TariAddress, Confirmation),
GetConversationalists,
ShutdownSignalTriggered,
}

#[derive(Debug)]
Expand All @@ -153,6 +154,7 @@ pub enum ContactsServiceResponse {
MessageSent,
ReadConfirmationSent,
Conversationalists(Vec<TariAddress>),
ShutdownSignalTriggered(bool),
}

#[derive(Clone)]
Expand Down Expand Up @@ -227,6 +229,19 @@ impl ContactsServiceHandle {
self.liveness_events.subscribe()
}

/// Returns a new `ShutdownSignal`
pub async fn shutdown_signal_triggered(&mut self) -> Result<bool, ContactsServiceError> {
match self
.request_response_service
.call(ContactsServiceRequest::ShutdownSignalTriggered)
.await??
{
ContactsServiceResponse::ShutdownSignalTriggered(c) => Ok(c),
_ => Err(ContactsServiceError::UnexpectedApiResponse),
}
}


pub fn get_messages_event_stream(&self) -> broadcast::Receiver<Arc<MessageDispatch>> {
self.message_events.subscribe()
}
Expand Down
10 changes: 9 additions & 1 deletion base_layer/contacts/src/contacts_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where T: ContactsBackend + 'static
let shutdown = self
.shutdown_signal
.take()
.expect("Output Manager Service initialized without shutdown signal");
.expect("Contacts Service initialized without shutdown signal");
pin_mut!(shutdown);

// Add all contacts as monitored peers to the liveness service
Expand Down Expand Up @@ -322,6 +322,14 @@ where T: ContactsBackend + 'static
let result = self.db.get_conversationlists();
Ok(result.map(ContactsServiceResponse::Conversationalists)?)
},
ContactsServiceRequest::ShutdownSignalTriggered => {
let result = if let Some(signal) = self.shutdown_signal.clone() {
signal.is_triggered()
} else {
false
};
Ok(ContactsServiceResponse::ShutdownSignalTriggered(result))
}
}
}

Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tari_key_manager = { path = "../key_manager" }
tari_common_types = { path = "../../base_layer/common_types"}
tari_test_utils = { path = "../../infrastructure/test_utils"}
tari_service_framework = { path = "../../base_layer/service_framework" }
tari_core = { path = "../../base_layer/core", default-features = false, features = ["base_node"] }
borsh = "0.10"

[build-dependencies]
Expand Down
310 changes: 310 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8578,6 +8578,9 @@ mod test {
};
use tari_common_types::{emoji, transaction::TransactionStatus, types::PrivateKey};
use tari_comms::peer_manager::PeerFeatures;
use tari_contacts::contacts_service::{
types::{Direction, Message, MessageMetadata},
};
use tari_core::{
covenant,
transactions::{
Expand Down Expand Up @@ -11051,4 +11054,311 @@ mod test {
let _spending_key = Box::from_raw(spending_key_ptr);
}
}

#[test]
#[allow(clippy::too_many_lines)]
pub fn test_wallet_shutdown() {
unsafe {
let mut error = 0;
let error_ptr = &mut error as *mut c_int;
let mut recovery_in_progress = false;
let recovery_in_progress_ptr = &mut recovery_in_progress as *mut bool;

// Create a new wallet for Alice
let db_name = CString::new(random::string(8).as_str()).unwrap();
let alice_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char;
let temp_dir = tempdir().unwrap();
let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap();
let alice_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char;
let alice_transport_type = transport_memory_create();
let address = transport_memory_get_address(alice_transport_type, error_ptr);
let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned();
let alice_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char;
let network = CString::new(NETWORK_STRING).unwrap();
let alice_network_str: *const c_char = CString::into_raw(network) as *const c_char;

let alice_config = comms_config_create(
alice_address_str,
alice_transport_type,
alice_db_name_str,
alice_db_path_str,
20,
10800,
error_ptr,
);
let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char;
let alice_wallet_ptr = wallet_create(
alice_config,
ptr::null(),
0,
0,
passphrase,
ptr::null(),
alice_network_str,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
broadcast_callback,
mined_callback,
mined_unconfirmed_callback,
scanned_callback,
scanned_unconfirmed_callback,
transaction_send_result_callback,
tx_cancellation_callback,
txo_validation_complete_callback,
contacts_liveness_data_updated_callback,
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_callback,
recovery_in_progress_ptr,
error_ptr,
);
assert_eq!(error, 0);
string_destroy(alice_network_str as *mut c_char);
string_destroy(alice_db_name_str as *mut c_char);
string_destroy(alice_db_path_str as *mut c_char);
string_destroy(alice_address_str as *mut c_char);
transport_config_destroy(alice_transport_type);
comms_config_destroy(alice_config);

// Create a new wallet for bob
let db_name = CString::new(random::string(8).as_str()).unwrap();
let bob_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char;
let temp_dir = tempdir().unwrap();
let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap();
let bob_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char;
let bob_transport_type = transport_memory_create();
let address = transport_memory_get_address(bob_transport_type, error_ptr);
let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned();
let bob_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char;
let network = CString::new(NETWORK_STRING).unwrap();
let bob_network_str: *const c_char = CString::into_raw(network) as *const c_char;

let bob_config = comms_config_create(
bob_address_str,
bob_transport_type,
bob_db_name_str,
bob_db_path_str,
20,
10800,
error_ptr,
);
let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char;
let bob_wallet_ptr = wallet_create(
bob_config,
ptr::null(),
0,
0,
passphrase,
ptr::null(),
bob_network_str,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
broadcast_callback,
mined_callback,
mined_unconfirmed_callback,
scanned_callback,
scanned_unconfirmed_callback,
transaction_send_result_callback,
tx_cancellation_callback,
txo_validation_complete_callback,
contacts_liveness_data_updated_callback,
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_callback,
recovery_in_progress_ptr,
error_ptr,
);
assert_eq!(error, 0);
string_destroy(bob_network_str as *mut c_char);
string_destroy(bob_db_name_str as *mut c_char);
string_destroy(bob_db_path_str as *mut c_char);
string_destroy(bob_address_str as *mut c_char);
transport_config_destroy(bob_transport_type);
comms_config_destroy(bob_config);

// Add some peers
// - Wallet peer for Alice
let bob_wallet_comms = (*bob_wallet_ptr).wallet.comms.clone();
let bob_node_identity = bob_wallet_comms.node_identity();
let bob_peer_public_key_ptr = Box::into_raw(Box::new(bob_node_identity.public_key().clone()));
let bob_peer_address_ptr =
CString::into_raw(CString::new(bob_node_identity.first_public_address().unwrap().to_string()).unwrap())
as *const c_char;
wallet_add_base_node_peer(
alice_wallet_ptr,
bob_peer_public_key_ptr,
bob_peer_address_ptr,
error_ptr,
);
string_destroy(bob_peer_address_ptr as *mut c_char);
let _destroyed = Box::from_raw(bob_peer_public_key_ptr);
// - Wallet peer for Bob
let alice_wallet_comms = (*alice_wallet_ptr).wallet.comms.clone();
let alice_node_identity = alice_wallet_comms.node_identity();
let alice_peer_public_key_ptr = Box::into_raw(Box::new(alice_node_identity.public_key().clone()));
let alice_peer_address_ptr = CString::into_raw(
CString::new(alice_node_identity.first_public_address().unwrap().to_string()).unwrap(),
) as *const c_char;
wallet_add_base_node_peer(
bob_wallet_ptr,
alice_peer_public_key_ptr,
alice_peer_address_ptr,
error_ptr,
);
string_destroy(alice_peer_address_ptr as *mut c_char);
let _destroyed = Box::from_raw(alice_peer_public_key_ptr);

// Add some contacts
// - Contact for Alice
let bob_wallet_address = TariWalletAddress::new(bob_node_identity.public_key().clone(), Network::LocalNet);
let alice_contact_alias_ptr: *const c_char =
CString::into_raw(CString::new("bob").unwrap()) as *const c_char;
let alice_contact_address_ptr = Box::into_raw(Box::new(bob_wallet_address.clone()));
let alice_contact_ptr = contact_create(alice_contact_alias_ptr, alice_contact_address_ptr, true, error_ptr);
tari_address_destroy(alice_contact_address_ptr);
assert!(wallet_upsert_contact(alice_wallet_ptr, alice_contact_ptr, error_ptr));
contact_destroy(alice_contact_ptr);
// - Contact for Bob
let alice_wallet_address =
TariWalletAddress::new(alice_node_identity.public_key().clone(), Network::LocalNet);
let bob_contact_alias_ptr: *const c_char =
CString::into_raw(CString::new("alice").unwrap()) as *const c_char;
let bob_contact_address_ptr = Box::into_raw(Box::new(alice_wallet_address.clone()));
let bob_contact_ptr = contact_create(bob_contact_alias_ptr, bob_contact_address_ptr, true, error_ptr);
tari_address_destroy(bob_contact_address_ptr);
assert!(wallet_upsert_contact(bob_wallet_ptr, bob_contact_ptr, error_ptr));
contact_destroy(bob_contact_ptr);

println!("\n1 - use comms\n");

let alice_wallet_runtime = &(*alice_wallet_ptr).runtime;
let bob_connection = alice_wallet_runtime.block_on(
alice_wallet_comms
.connectivity()
.dial_peer(bob_node_identity.node_id().clone()),
);
println!("alice comms service: {:?}", bob_connection);
let bob_wallet_runtime = &(*bob_wallet_ptr).runtime;
let alice_connection = bob_wallet_runtime.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
);
println!("bob comms service: {:?}", alice_connection);
let mut alice_wallet_contacts_service = (*alice_wallet_ptr).wallet.contacts_service.clone();
let mut bob_wallet_contacts_service = (*bob_wallet_ptr).wallet.contacts_service.clone();
let mut alice_msg_count = 0;
let mut bob_msg_count = 0;
for i in 0..60 {
if alice_msg_count < 5 {
let alice_message_result =
alice_wallet_runtime.block_on(alice_wallet_contacts_service.send_message(Message {
body: vec![i],
metadata: vec![MessageMetadata::default()],
address: bob_wallet_address.clone(),
direction: Direction::Outbound,
stored_at: i as u64,
delivery_confirmation_at: None,
read_confirmation_at: None,
message_id: vec![i],
}));
if alice_message_result.is_ok() {
alice_msg_count += 1;
println!("alice contacts service: try {} message {:?}", i, alice_message_result);
}
}
if bob_msg_count < 5 {
let bob_message_result =
bob_wallet_runtime.block_on(bob_wallet_contacts_service.send_message(Message {
body: vec![i],
metadata: vec![MessageMetadata::default()],
address: alice_wallet_address.clone(),
direction: Direction::Outbound,
stored_at: i as u64,
delivery_confirmation_at: None,
read_confirmation_at: None,
message_id: vec![i],
}));
if bob_message_result.is_ok() {
bob_msg_count += 1;
println!("bob contacts service: try {} message {:?}", i, bob_message_result);
}
}
if alice_msg_count >= 5 && bob_msg_count >= 5 {
break;
}
// wait a bit
alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(1000)).await });
}

println!("\n2 - trigger shutdown\n");
assert!(!(*alice_wallet_ptr).shutdown.is_triggered());
assert!(!alice_wallet_comms.shutdown_signal().is_triggered());
assert!(!alice_wallet_runtime.block_on(alice_wallet_contacts_service.shutdown_signal_triggered()).unwrap());

(*alice_wallet_ptr).shutdown.trigger();
alice_wallet_runtime.block_on((*alice_wallet_ptr).wallet.clone().wait_until_shutdown());

assert!((*alice_wallet_ptr).shutdown.is_triggered());
println!("alice wallet shutdown {:?}", (*alice_wallet_ptr).shutdown);
println!("alice comms shutdown {:?}", alice_wallet_comms.shutdown_signal());

for i in 0..30 {
let comms_triggered = alice_wallet_comms.shutdown_signal().is_triggered();
let contacts_triggered = match alice_wallet_runtime.block_on(alice_wallet_contacts_service.shutdown_signal_triggered()) {
Ok(triggered) => triggered,
Err(_) => false,
};
println!("alice shutdown: {} - wallet {}, comms {}, contacts {}", i, (*alice_wallet_ptr).shutdown.is_triggered(), comms_triggered, contacts_triggered);
if comms_triggered && contacts_triggered {
break;
}
bob_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(1000)).await });
}
println!("\n3 - use comms\n");
let bob_connection = alice_wallet_runtime.block_on(
alice_wallet_comms
.connectivity()
.dial_peer(bob_node_identity.node_id().clone()),
);
println!("alice comms service: {:?}", bob_connection);
let alice_connection = bob_wallet_runtime.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
);
println!("bob comms service: {:?}", alice_connection);
let bob_online_status = alice_wallet_runtime
.block_on(alice_wallet_contacts_service.get_contact_online_status(Contact::from(&bob_wallet_address)));
println!("alice contacts service: {:?}", bob_online_status);
let alice_online_status = bob_wallet_runtime.block_on(
bob_wallet_contacts_service.get_contact_online_status(Contact::from(&alice_wallet_address.clone())),
);
println!("bob contacts service: {:?}", alice_online_status);
let alice_messages = alice_wallet_runtime
.block_on(alice_wallet_contacts_service.get_messages(bob_wallet_address, 1, 1));
println!("alice contacts service: {:?}", alice_messages);
for _ in 0..10 {
let bob_messages = bob_wallet_runtime
.block_on(bob_wallet_contacts_service.get_messages(alice_wallet_address.clone(), 1, 1));
println!("bob contacts service: {:?}", bob_messages);
if let Ok(messages) = bob_messages {
if messages.len() > 0 {
break;
}
}
bob_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(1000)).await });
}

// Cleanup
wallet_destroy(alice_wallet_ptr);
wallet_destroy(bob_wallet_ptr);
}
}
}
2 changes: 1 addition & 1 deletion infrastructure/shutdown/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ edition = "2018"
futures = "^0.3"

[dev-dependencies]
tokio = { version = "1", default-features = false, features = ["rt", "macros"] }
tokio = { version = "1", default-features = false, features = ["rt", "macros", "rt-multi-thread", "time"] }
Loading

0 comments on commit b3e15b7

Please sign in to comment.