Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add contacts liveness service to base layer wallet #3857

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ where B: BlockchainBackend + 'static
.add_initializer(mempool_sync)
.add_initializer(LivenessInitializer::new(
LivenessConfig {
auto_ping_interval: Some(Duration::from_secs(config.auto_ping_interval)),
auto_ping_interval: Some(Duration::from_secs(config.metadata_auto_ping_interval)),
monitored_peers: sync_peers.clone(),
..Default::default()
},
Expand Down
3 changes: 2 additions & 1 deletion applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, path::PathBuf, str::FromStr, sync::Arc};
use std::{fs, path::PathBuf, str::FromStr, sync::Arc, time::Duration};

use log::*;
use rpassword::prompt_password_stdout;
Expand Down Expand Up @@ -453,6 +453,7 @@ pub async fn init_wallet(
Some(config.buffer_rate_limit_console_wallet),
Some(updater_config),
config.autoupdate_check_interval,
Some(Duration::from_secs(config.contacts_auto_ping_interval)),
);

let mut wallet = Wallet::start(
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl AppState {
},
};

let contact = Contact { alias, public_key };
let contact = Contact::new(alias, public_key, None, None);
inner.wallet.contacts_service.upsert_contact(contact).await?;

inner.refresh_contacts_state().await?;
Expand Down
2 changes: 2 additions & 0 deletions base_layer/p2p/src/proto/liveness.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ enum MetadataKey {
MetadataKeyNone = 0;
// The value for this key contains chain metadata
MetadataKeyChainMetadata = 1;
// The value for this key contains empty data
MetadataKeyContactsLiveness = 2;
}
24 changes: 24 additions & 0 deletions base_layer/p2p/src/services/liveness/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub enum LivenessRequest {
GetNetworkAvgLatency,
/// Set the metadata attached to each ping/pong message
SetMetadataEntry(MetadataKey, Vec<u8>),
/// Add a monitored peer to the basic config
AddMonitoredPeer(NodeId),
/// Remove a monitored peer from the basic config
RemoveMonitoredPeer(NodeId),
}

/// Response type for `LivenessService`
Expand Down Expand Up @@ -153,6 +157,26 @@ impl LivenessHandle {
}
}

/// Add a monitored peer to the basic config if not present
pub async fn check_add_monitored_peer(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
match self.handle.call(LivenessRequest::AddMonitoredPeer(node_id)).await?? {
LivenessResponse::Ok => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Remove a monitored peer from the basic config if present
pub async fn check_remove_monitored_peer(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
match self
.handle
.call(LivenessRequest::RemoveMonitoredPeer(node_id))
.await??
{
LivenessResponse::Ok => Ok(()),
_ => Err(LivenessError::UnexpectedApiResponse),
}
}

/// Retrieve the average latency for a given node
pub async fn get_avg_latency(&mut self, node_id: NodeId) -> Result<Option<Duration>, LivenessError> {
match self.handle.call(LivenessRequest::GetAvgLatency(node_id)).await?? {
Expand Down
6 changes: 6 additions & 0 deletions base_layer/p2p/src/services/liveness/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ impl LivenessMock {
SetMetadataEntry(_, _) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
},
AddMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
},
RemoveMonitoredPeer(_) => {
reply.send(Ok(LivenessResponse::Ok)).unwrap();
},
}
}
}
23 changes: 20 additions & 3 deletions base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tari_comms_dht::{
};
use tari_service_framework::reply_channel::RequestContext;
use tari_shutdown::ShutdownSignal;
use tokio::{time, time::MissedTickBehavior};
use tokio::{sync::RwLock, time, time::MissedTickBehavior};
use tokio_stream::wrappers;

use super::{
Expand Down Expand Up @@ -63,6 +63,7 @@ pub struct LivenessService<THandleStream, TPingStream> {
outbound_messaging: OutboundMessageRequester,
event_publisher: LivenessEventSender,
shutdown_signal: ShutdownSignal,
monitored_peers: Arc<RwLock<Vec<NodeId>>>,
}

impl<TRequestStream, TPingStream> LivenessService<TRequestStream, TPingStream>
Expand All @@ -88,7 +89,8 @@ where
outbound_messaging,
event_publisher,
shutdown_signal,
config,
config: config.clone(),
monitored_peers: Arc::new(RwLock::new(config.monitored_peers)),
}
}

Expand Down Expand Up @@ -254,10 +256,25 @@ where
self.state.set_metadata_entry(key, value);
Ok(LivenessResponse::Ok)
},
AddMonitoredPeer(node_id) => {
let node_id_exists = { self.monitored_peers.read().await.iter().any(|val| val == &node_id) };
if !node_id_exists {
self.monitored_peers.write().await.push(node_id.clone());
}
Ok(LivenessResponse::Ok)
},
RemoveMonitoredPeer(node_id) => {
let node_id_exists = { self.monitored_peers.read().await.iter().position(|val| *val == node_id) };
if let Some(pos) = node_id_exists {
self.monitored_peers.write().await.swap_remove(pos);
}
Ok(LivenessResponse::Ok)
},
}
}

async fn start_ping_round(&mut self) -> Result<(), LivenessError> {
let monitored_peers = { self.monitored_peers.read().await.clone() };
let selected_peers = self
.connectivity
.select_connections(ConnectivitySelection::random_nodes(
Expand All @@ -267,7 +284,7 @@ where
.await?
.into_iter()
.map(|c| c.peer_node_id().clone())
.chain(self.config.monitored_peers.clone())
.chain(monitored_peers)
.collect::<Vec<_>>();

if selected_peers.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tempfile = "3.1.0"
thiserror = "1.0.26"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.4"
prost = "0.9"

[dependencies.tari_core]
path = "../../base_layer/core"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This migration is only meant for fresh databases on a testnet reset, so the down is not needed
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Copyright 2021. The Tari Project
--
-- Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
-- following conditions are met:
--
-- 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
-- disclaimer.
--
-- 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
-- following disclaimer in the documentation and/or other materials provided with the distribution.
--
-- 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
-- products derived from this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
-- INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-- SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-- WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
-- USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

PRAGMA foreign_keys=OFF;
DROP TABLE contacts;
CREATE TABLE contacts (
public_key BLOB PRIMARY KEY NOT NULL UNIQUE,
node_id BLOB NOT NULL UNIQUE,
alias TEXT NOT NULL,
last_seen DATETIME,
latency INTEGER
);
PRAGMA foreign_keys=ON;
3 changes: 3 additions & 0 deletions base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct WalletConfig {
pub base_node_service_config: BaseNodeServiceConfig,
pub updater_config: Option<AutoUpdateConfig>,
pub autoupdate_check_interval: Option<Duration>,
pub contacts_auto_ping_interval: Duration,
}

impl WalletConfig {
Expand All @@ -59,6 +60,7 @@ impl WalletConfig {
rate_limit: Option<usize>,
updater_config: Option<AutoUpdateConfig>,
autoupdate_check_interval: Option<Duration>,
contacts_auto_ping_interval: Option<Duration>,
) -> Self {
Self {
comms_config,
Expand All @@ -71,6 +73,7 @@ impl WalletConfig {
base_node_service_config: base_node_service_config.unwrap_or_default(),
updater_config,
autoupdate_check_interval,
contacts_auto_ping_interval: contacts_auto_ping_interval.unwrap_or_else(|| Duration::from_secs(20)),
}
}
}
3 changes: 3 additions & 0 deletions base_layer/wallet/src/contacts_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use diesel::result::Error as DieselError;
use tari_p2p::services::liveness::error::LivenessError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;

Expand All @@ -37,6 +38,8 @@ pub enum ContactsServiceError {
ContactsServiceStorageError(#[from] ContactsServiceStorageError),
#[error("Transport channel error: `{0}`")]
TransportChannelError(#[from] TransportChannelError),
#[error("Livenessl error: `{0}`")]
LivenessError(#[from] LivenessError),
}

#[derive(Debug, Error)]
Expand Down
Loading