Skip to content

Commit

Permalink
feat: added auxiliary callback to push base node state changes #5109 (#…
Browse files Browse the repository at this point in the history
…5257)

Description
---
Extra function argument to `wallet_create` FFI.

#5109

Motivation and Context
---
Giving mobile app more information about the active base node
connection.

How Has This Been Tested?
---

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: stringhandler <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Aaron Feickert <[email protected]>
Co-authored-by: C.Lee Taylor <[email protected]>
Co-authored-by: SW van Heerden <[email protected]>
  • Loading branch information
6 people authored Apr 5, 2023
1 parent bfc92fd commit b7f7d31
Show file tree
Hide file tree
Showing 11 changed files with 794 additions and 5 deletions.
2 changes: 2 additions & 0 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ where
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
self.update_state(BaseNodeState {
node_id: None,
chain_metadata: None,
is_synced: None,
updated: None,
Expand Down Expand Up @@ -169,6 +170,7 @@ where
let height_of_longest_chain = chain_metadata.height_of_longest_chain();

self.update_state(BaseNodeState {
node_id: Some(base_node_id.clone()),
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/src/base_node_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use chrono::NaiveDateTime;
use futures::{future, StreamExt};
use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;
use tari_service_framework::reply_channel::Receiver;
use tari_shutdown::ShutdownSignal;
use tokio::sync::RwLock;
Expand All @@ -46,6 +47,7 @@ const LOG_TARGET: &str = "wallet::base_node_service::service";
/// State determined from Base Node Service Requests
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct BaseNodeState {
pub node_id: Option<NodeId>,
pub chain_metadata: Option<ChainMetadata>,
pub is_synced: Option<bool>,
pub updated: Option<NaiveDateTime>,
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/tests/support/base_node_service_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl MockBaseNodeService {
};

self.state = BaseNodeState {
node_id: None,
chain_metadata,
is_synced,
updated: None,
Expand All @@ -96,6 +97,7 @@ impl MockBaseNodeService {
pub fn set_default_base_node_state(&mut self) {
let metadata = ChainMetadata::new(i64::MAX as u64, FixedHash::zero(), 0, 0, 0, 0);
self.state = BaseNodeState {
node_id: None,
chain_metadata: Some(metadata),
is_synced: Some(true),
updated: None,
Expand Down
71 changes: 70 additions & 1 deletion base_layer/wallet_ffi/src/callback_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
use std::{ops::Deref, sync::Arc};

use log::*;
use tari_common_types::{tari_address::TariAddress, transaction::TxId};
use tari_common_types::{tari_address::TariAddress, transaction::TxId, types::BlockHash};
use tari_comms_dht::event::{DhtEvent, DhtEventReceiver};
use tari_contacts::contacts_service::handle::{ContactsLivenessData, ContactsLivenessEvent};
use tari_shutdown::ShutdownSignal;
use tari_wallet::{
base_node_service::{
handle::{BaseNodeEvent, BaseNodeEventReceiver},
service::BaseNodeState,
},
connectivity_service::OnlineStatus,
output_manager_service::{
handle::{OutputManagerEvent, OutputManagerEventReceiver, OutputManagerHandle},
Expand All @@ -58,6 +62,8 @@ use tari_wallet::{
};
use tokio::sync::{broadcast, watch};

use crate::ffi_basenode_state::TariBaseNodeState;

const LOG_TARGET: &str = "wallet::transaction_service::callback_handler";

pub struct CallbackHandler<TBackend>
Expand All @@ -79,7 +85,9 @@ where TBackend: TransactionBackend + 'static
callback_transaction_validation_complete: unsafe extern "C" fn(u64, u64),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
callback_base_node_state: unsafe extern "C" fn(*mut TariBaseNodeState),
db: TransactionDatabase<TBackend>,
base_node_service_event_stream: BaseNodeEventReceiver,
transaction_service_event_stream: TransactionEventReceiver,
output_manager_service_event_stream: OutputManagerEventReceiver,
output_manager_service: OutputManagerHandle,
Expand All @@ -97,6 +105,7 @@ where TBackend: TransactionBackend + 'static
#[allow(clippy::too_many_arguments)]
pub fn new(
db: TransactionDatabase<TBackend>,
base_node_service_event_stream: BaseNodeEventReceiver,
transaction_service_event_stream: TransactionEventReceiver,
output_manager_service_event_stream: OutputManagerEventReceiver,
output_manager_service: OutputManagerHandle,
Expand All @@ -121,6 +130,7 @@ where TBackend: TransactionBackend + 'static
callback_transaction_validation_complete: unsafe extern "C" fn(u64, u64),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
callback_base_node_state: unsafe extern "C" fn(*mut TariBaseNodeState),
) -> Self {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -204,7 +214,9 @@ where TBackend: TransactionBackend + 'static
callback_transaction_validation_complete,
callback_saf_messages_received,
callback_connectivity_status,
callback_base_node_state,
db,
base_node_service_event_stream,
transaction_service_event_stream,
output_manager_service_event_stream,
output_manager_service,
Expand Down Expand Up @@ -295,6 +307,7 @@ where TBackend: TransactionBackend + 'static
Err(_e) => error!(target: LOG_TARGET, "Error reading from Transaction Service event broadcast channel"),
}
},

result = self.output_manager_service_event_stream.recv() => {
match result {
Ok(msg) => {
Expand All @@ -318,6 +331,7 @@ where TBackend: TransactionBackend + 'static
Err(_e) => error!(target: LOG_TARGET, "Error reading from Output Manager Service event broadcast channel"),
}
},

result = self.dht_event_stream.recv() => {
match result {
Ok(msg) => {
Expand All @@ -329,11 +343,32 @@ where TBackend: TransactionBackend + 'static
Err(_e) => error!(target: LOG_TARGET, "Error reading from DHT event broadcast channel"),
}
}

Ok(_) = self.connectivity_status_watch.changed() => {
let status = *self.connectivity_status_watch.borrow();
trace!(target: LOG_TARGET, "Connectivity status change detected: {:?}", status);
self.connectivity_status_changed(status);
},

event = self.base_node_service_event_stream.recv() => {
match event {
Ok(msg) => {
trace!(target: LOG_TARGET, "Base Node Service Callback Handler event {:?}", msg);
match (*msg).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
trace!("base node state changed: {:#?}", state);
self.base_node_state_changed(state);
},

BaseNodeEvent::NewBlockDetected(_new_block_number) => {
//
},
}
},
Err(_e) => error!(target: LOG_TARGET, "failed to receive base node state event"),
}
},

event = self.contacts_liveness_events.recv() => {
match event {
Ok(liveness_event) => {
Expand Down Expand Up @@ -613,4 +648,38 @@ where TBackend: TransactionBackend + 'static
(self.callback_connectivity_status)(status as u64);
}
}

fn base_node_state_changed(&mut self, state: BaseNodeState) {
debug!(target: LOG_TARGET, "Calling Base Node State changed callback function");

let state = match state.chain_metadata {
None => TariBaseNodeState {
node_id: state.node_id,
height_of_longest_chain: 0,
best_block: BlockHash::zero(),
best_block_timestamp: 0,
pruning_horizon: 0,
pruned_height: 0,
is_node_synced: false,
updated_at: 0,
latency: 0,
},

Some(chain_metadata) => TariBaseNodeState {
node_id: state.node_id,
height_of_longest_chain: chain_metadata.height_of_longest_chain(),
best_block: *chain_metadata.best_block(),
best_block_timestamp: chain_metadata.timestamp(),
pruning_horizon: chain_metadata.pruning_horizon(),
pruned_height: chain_metadata.pruned_height(),
is_node_synced: state.is_synced.unwrap_or(false),
updated_at: state.updated.map(|ts| ts.timestamp_millis() as u64).unwrap_or(0),
latency: state.latency.map(|d| d.as_millis() as u64).unwrap_or(0),
},
};

unsafe {
(self.callback_base_node_state)(Box::into_raw(Box::new(state)));
}
}
}
59 changes: 56 additions & 3 deletions base_layer/wallet_ffi/src/callback_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ mod test {
mem::size_of,
sync::{Arc, Mutex},
thread,
time::Duration,
time::{Duration, SystemTime},
};

use chacha20poly1305::{Key, KeyInit, XChaCha20Poly1305};
use chrono::{NaiveDateTime, Utc};
use rand::{rngs::OsRng, RngCore};
use tari_common::configuration::Network;
use tari_common_types::{
chain_metadata::ChainMetadata,
tari_address::TariAddress,
transaction::{TransactionDirection, TransactionStatus},
types::{BlindingFactor, PrivateKey, PublicKey},
};
use tari_comms::peer_manager::NodeId;
use tari_comms_dht::event::DhtEvent;
use tari_contacts::contacts_service::{
handle::{ContactsLivenessData, ContactsLivenessEvent},
Expand All @@ -35,6 +37,7 @@ mod test {
use tari_service_framework::reply_channel;
use tari_shutdown::Shutdown;
use tari_wallet::{
base_node_service::{handle::BaseNodeEvent, service::BaseNodeState},
connectivity_service::OnlineStatus,
output_manager_service::{
handle::{OutputManagerEvent, OutputManagerHandle},
Expand All @@ -56,7 +59,11 @@ mod test {
time::Instant,
};

use crate::{callback_handler::CallbackHandler, output_manager_service_mock::MockOutputManagerService};
use crate::{
callback_handler::CallbackHandler,
ffi_basenode_state::TariBaseNodeState,
output_manager_service_mock::MockOutputManagerService,
};

#[derive(Debug)]
#[allow(clippy::struct_excessive_bools)]
Expand Down Expand Up @@ -84,6 +91,7 @@ mod test {
pub callback_transaction_validation_complete: u32,
pub saf_messages_received: bool,
pub connectivity_status_callback_called: u64,
pub base_node_state_changed_callback_invoked: bool,
}

impl CallbackState {
Expand Down Expand Up @@ -112,6 +120,7 @@ mod test {
tx_cancellation_callback_called_outbound: false,
saf_messages_received: false,
connectivity_status_callback_called: 0,
base_node_state_changed_callback_invoked: false,
}
}
}
Expand Down Expand Up @@ -245,6 +254,13 @@ mod test {
drop(lock);
}

unsafe extern "C" fn base_node_state_changed_callback(state: *mut TariBaseNodeState) {
let mut lock = CALLBACK_STATE.lock().unwrap();
lock.base_node_state_changed_callback_invoked = true;
drop(lock);
drop(Box::from_raw(state))
}

#[test]
#[allow(clippy::too_many_lines)]
fn test_callback_handler() {
Expand Down Expand Up @@ -408,6 +424,7 @@ mod test {
db.insert_completed_transaction(7u64.into(), faux_confirmed_tx.clone())
.unwrap();

let (base_node_event_sender, base_node_event_receiver) = broadcast::channel(20);
let (transaction_event_sender, transaction_event_receiver) = broadcast::channel(20);
let (oms_event_sender, oms_event_receiver) = broadcast::channel(20);
let (dht_event_sender, dht_event_receiver) = broadcast::channel(20);
Expand Down Expand Up @@ -442,6 +459,7 @@ mod test {

let callback_handler = CallbackHandler::new(
db,
base_node_event_receiver,
transaction_event_receiver,
oms_event_receiver,
oms_handle,
Expand All @@ -466,14 +484,49 @@ mod test {
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_changed_callback,
);

runtime.spawn(callback_handler.start());
let mut callback_balance_updated = 0;

let ts_now = NaiveDateTime::from_timestamp_millis(
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
)
.unwrap();

let chain_metadata = ChainMetadata::new(1, Default::default(), 0, 0, 123, ts_now.timestamp_millis() as u64);

base_node_event_sender
.send(Arc::new(BaseNodeEvent::BaseNodeStateChanged(BaseNodeState {
node_id: Some(NodeId::new()),
chain_metadata: Some(chain_metadata),
is_synced: Some(true),
updated: Some(NaiveDateTime::from_timestamp_millis(
ts_now.timestamp_millis() - (60 * 1000),
))
.unwrap(),
latency: Some(Duration::from_micros(500)),
})))
.unwrap();

let start = Instant::now();
while start.elapsed().as_secs() < 10 {
let lock = CALLBACK_STATE.lock().unwrap();

if lock.base_node_state_changed_callback_invoked {
break;
}
}
assert!(CALLBACK_STATE.lock().unwrap().base_node_state_changed_callback_invoked);

// The balance updated callback is bundled with other callbacks and will only fire if the balance actually
// changed from an initial zero balance.
// Balance updated should be detected with following event, total = 1 times
let mut callback_balance_updated = 0;

transaction_event_sender
.send(Arc::new(TransactionEvent::ReceivedTransaction(1u64.into())))
.unwrap();
Expand Down
Loading

0 comments on commit b7f7d31

Please sign in to comment.