Skip to content

Commit

Permalink
feat(dht): convenience function for DHT discover then connect
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Feb 22, 2022
1 parent ab52f5e commit fccb738
Show file tree
Hide file tree
Showing 18 changed files with 273 additions and 103 deletions.
3 changes: 2 additions & 1 deletion applications/tari_base_node/src/commands/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::{
cmp,
io::{self, Write},
ops::Deref,
str::FromStr,
string::ToString,
sync::Arc,
Expand Down Expand Up @@ -364,7 +365,7 @@ impl CommandHandler {
println!("🌎 Peer discovery started.");
let peer = self
.discovery_service
.discover_peer(dest_pubkey.clone(), NodeDestination::PublicKey(dest_pubkey))
.discover_peer(dest_pubkey.deref().clone(), NodeDestination::PublicKey(dest_pubkey))
.await?;
println!("⚡️ Discovery succeeded in {}ms!", start.elapsed().as_millis());
println!("This peer was found:");
Expand Down
7 changes: 5 additions & 2 deletions applications/tari_console_wallet/src/automation/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,17 @@ pub async fn discover_peer(
) -> Result<(), CommandError> {
use ParsedArgument::*;
let dest_public_key = match args[0].clone() {
PublicKey(key) => Ok(Box::new(key)),
PublicKey(key) => Ok(key),
_ => Err(CommandError::Argument),
}?;

let start = Instant::now();
println!("🌎 Peer discovery started.");
match dht_service
.discover_peer(dest_public_key.clone(), NodeDestination::PublicKey(dest_public_key))
.discover_peer(
dest_public_key.clone(),
NodeDestination::PublicKey(Box::new(dest_public_key)),
)
.await
{
Ok(peer) => {
Expand Down
4 changes: 1 addition & 3 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,7 @@ impl DanNode {
let chain_storage = SqliteStorageService {};
let wallet_client = GrpcWalletClient::new(config.wallet_grpc_address);
let checkpoint_manager = ConcreteCheckpointManager::new(asset_definition.clone(), wallet_client);
let connectivity = handles.expect_handle();
let validator_node_client_factory =
TariCommsValidatorNodeClientFactory::new(connectivity, dht.discovery_service_requester());
let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(dht.dht_requester());
let mut consensus_worker = ConsensusWorker::<DefaultServiceSpecification>::new(
receiver,
outbound,
Expand Down
8 changes: 3 additions & 5 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tari_common::{
exit_codes::{ExitCode, ExitError},
GlobalConfig,
};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::PeerFeatures, NodeIdentity};
use tari_comms::{peer_manager::PeerFeatures, NodeIdentity};
use tari_comms_dht::Dht;
use tari_dan_core::services::{ConcreteAssetProcessor, ConcreteAssetProxy, MempoolServiceHandle, ServiceSpecification};
use tari_dan_storage_sqlite::SqliteDbFactory;
Expand Down Expand Up @@ -124,10 +124,8 @@ async fn run_node(config: GlobalConfig, create_id: bool) -> Result<(), ExitError
.await?;

let asset_processor = ConcreteAssetProcessor::default();
let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(
handles.expect_handle::<ConnectivityRequester>(),
handles.expect_handle::<Dht>().discovery_service_requester(),
);
let validator_node_client_factory =
TariCommsValidatorNodeClientFactory::new(handles.expect_handle::<Dht>().dht_requester());
let asset_proxy: ConcreteAssetProxy<DefaultServiceSpecification> = ConcreteAssetProxy::new(
GrpcBaseNodeClient::new(validator_node_config.base_node_grpc_address),
validator_node_client_factory,
Expand Down
62 changes: 9 additions & 53 deletions applications/tari_validator_node/src/p2p/services/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,8 @@ use std::convert::TryInto;
use async_trait::async_trait;
use log::*;
use tari_common_types::types::PublicKey;
use tari_comms::{
connection_manager::ConnectionManagerError,
connectivity::{ConnectivityError, ConnectivityRequester},
peer_manager::{NodeId, PeerManagerError},
PeerConnection,
};
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
use tari_comms::PeerConnection;
use tari_comms_dht::DhtRequester;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{Node, SchemaState, SideChainBlock, StateOpLogEntry, TemplateId, TreeNodeHash},
Expand All @@ -44,48 +39,14 @@ use crate::p2p::{proto::validator_node as proto, rpc};
const LOG_TARGET: &str = "tari::validator_node::p2p::services::rpc_client";

pub struct TariCommsValidatorNodeRpcClient {
connectivity: ConnectivityRequester,
dht_discovery: DhtDiscoveryRequester,
dht: DhtRequester,
address: PublicKey,
}

impl TariCommsValidatorNodeRpcClient {
async fn create_connection(&mut self) -> Result<PeerConnection, ValidatorNodeClientError> {
match self.connectivity.dial_peer(NodeId::from(self.address.clone())).await {
Ok(connection) => Ok(connection),
Err(connectivity_error) => {
dbg!(&connectivity_error);
match &connectivity_error {
ConnectivityError::ConnectionFailed(err) => {
match err {
ConnectionManagerError::PeerConnectionError(_) |
ConnectionManagerError::DialConnectFailedAllAddresses |
ConnectionManagerError::PeerIdentityNoValidAddresses |
ConnectionManagerError::PeerManagerError(PeerManagerError::PeerNotFoundError) => {
// Try discover, then dial again
// TODO: Should make discovery and connect the responsibility of the DHT layer
self.dht_discovery
.discover_peer(
Box::new(self.address.clone()),
NodeDestination::PublicKey(Box::new(self.address.clone())),
)
.await?;
if let Some(conn) = self
.connectivity
.get_connection(NodeId::from(self.address.clone()))
.await?
{
return Ok(conn);
}
Ok(self.connectivity.dial_peer(NodeId::from(self.address.clone())).await?)
},
_ => Err(connectivity_error.into()),
}
},
_ => Err(connectivity_error.into()),
}
},
}
let conn = self.dht.dial_or_discover_peer(self.address.clone()).await?;
Ok(conn)
}
}

Expand Down Expand Up @@ -280,16 +241,12 @@ impl ValidatorNodeRpcClient for TariCommsValidatorNodeRpcClient {

#[derive(Clone)]
pub struct TariCommsValidatorNodeClientFactory {
connectivity_requester: ConnectivityRequester,
dht_discovery: DhtDiscoveryRequester,
dht: DhtRequester,
}

impl TariCommsValidatorNodeClientFactory {
pub fn new(connectivity_requester: ConnectivityRequester, dht_discovery: DhtDiscoveryRequester) -> Self {
Self {
connectivity_requester,
dht_discovery,
}
pub fn new(dht: DhtRequester) -> Self {
Self { dht }
}
}

Expand All @@ -299,8 +256,7 @@ impl ValidatorNodeClientFactory for TariCommsValidatorNodeClientFactory {

fn create_client(&self, address: &Self::Addr) -> Self::Client {
TariCommsValidatorNodeRpcClient {
connectivity: self.connectivity_requester.clone(),
dht_discovery: self.dht_discovery.clone(),
dht: self.dht.clone(),
address: address.clone(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub async fn discovery(wallets: &[TestNode], messaging_events_rx: &mut NodeEvent
.dht
.discovery_service_requester()
.discover_peer(
Box::new(wallet2.node_identity().public_key().clone()),
wallet2.node_identity().public_key().clone(),
wallet2.node_identity().node_id().clone().into(),
)
.await;
Expand Down
Loading

0 comments on commit fccb738

Please sign in to comment.