Skip to content

Commit

Permalink
Add cucumber tests for chat
Browse files Browse the repository at this point in the history
This provides a barebones chat client used for testing within cucumber.
The client is relatively isolated from the test suite itself and could
be expanded on in the future.
  • Loading branch information
brianp committed Mar 29, 2023
1 parent 0ea3ab5 commit edaae1c
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 10 deletions.
5 changes: 4 additions & 1 deletion integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ tari_app_grpc = { path = "../applications/tari_app_grpc" }
tari_app_utilities = { path = "../applications/tari_app_utilities" }
tari_base_node = { path = "../applications/tari_base_node" }
tari_base_node_grpc_client = { path = "../clients/rust/base_node_grpc_client" }
tari_chat_client = { path = "tests/utils/chat_client" }
tari_common = { path = "../common" }
tari_common_sqlite = { path = "../common_sqlite" }
tari_common_types = { path = "../base_layer/common_types" }
tari_comms = { path = "../comms/core" }
tari_comms_dht = { path = "../comms/dht" }
tari_console_wallet = { path = "../applications/tari_console_wallet" }
tari_contacts = { path = "../base_layer/contacts" }
tari_core = { path = "../base_layer/core" }
tari_merge_mining_proxy = { path = "../applications/tari_merge_mining_proxy" }
tari_miner = { path = "../applications/tari_miner" }
Expand All @@ -24,8 +27,8 @@ tari_script = { path = "../infrastructure/tari_script" }
tari_shutdown = { path = "../infrastructure/shutdown" }
tari_utilities = { git = "https://github.com/tari-project/tari_utilities.git", tag="v0.4.10"}
tari_wallet = { path = "../base_layer/wallet" }
tari_wallet_grpc_client = { path = "../clients/rust/wallet_grpc_client" }
tari_wallet_ffi = { path = "../base_layer/wallet_ffi" }
tari_wallet_grpc_client = { path = "../clients/rust/wallet_grpc_client" }

anyhow = "1.0.53"
async-trait = "0.1.50"
Expand Down
61 changes: 56 additions & 5 deletions integration_tests/tests/cucumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ use cucumber::{event::ScenarioFinished, gherkin::Scenario, given, then, when, Wo
use futures::StreamExt;
use indexmap::IndexMap;
use log::*;
use rand::Rng;
use rand::{rngs::OsRng, Rng};
use serde_json::Value;
use tari_app_grpc::tari_rpc::{self as grpc};
use tari_app_utilities::utilities::UniPublicKey;
use tari_base_node::BaseNodeConfig;
use tari_base_node_grpc_client::grpc::{GetBlocksRequest, ListHeadersRequest};
use tari_chat_client::Client;
use tari_common::{configuration::Network, initialize_logging};
use tari_common_types::{
tari_address::TariAddress,
types::{BlindingFactor, ComAndPubSignature, Commitment, PrivateKey, PublicKey},
};
use tari_comms::multiaddr::Multiaddr;
use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity};
use tari_console_wallet::{
BurnTariArgs,
CliCommands,
Expand Down Expand Up @@ -100,8 +101,9 @@ use thiserror::Error;
use tokio::runtime::Runtime;

use crate::utils::{
base_node_process::{spawn_base_node, spawn_base_node_with_config, BaseNodeProcess},
base_node_process::{get_base_dir, spawn_base_node, spawn_base_node_with_config, BaseNodeProcess},
get_peer_addresses,
get_port,
merge_mining_proxy::{register_merge_mining_proxy_process, MergeMiningProxyProcess},
miner::{
mine_block,
Expand Down Expand Up @@ -147,6 +149,7 @@ pub struct TariWorld {
miners: IndexMap<String, MinerProcess>,
ffi_wallets: IndexMap<String, WalletFFI>,
wallets: IndexMap<String, WalletProcess>,
chat_clients: IndexMap<String, Client>,
merge_mining_proxies: IndexMap<String, MergeMiningProxyProcess>,
transactions: IndexMap<String, Transaction>,
wallet_addresses: IndexMap<String, String>, // values are strings representing tari addresses
Expand Down Expand Up @@ -277,10 +280,11 @@ impl TariWorld {

pub async fn after(&mut self, _scenario: &Scenario) {
self.base_nodes.clear();
self.seed_nodes.clear();
self.wallets.clear();
self.chat_clients.clear();
self.ffi_wallets.clear();
self.miners.clear();
self.seed_nodes.clear();
self.wallets.clear();
}
}

Expand Down Expand Up @@ -4980,6 +4984,53 @@ async fn merge_mining_ask_for_block_header_by_hash(world: &mut TariWorld, mining
world.last_merge_miner_response = merge_miner.get_block_header_by_hash(hash).await;
}

#[when(expr = "I have a chat client {word} connected to seed node {word}")]
async fn chat_client_connected_to_base_node(world: &mut TariWorld, name: String, seed_node_name: String) {
let base_node = world.get_node(&seed_node_name).unwrap();

let port = get_port(18000..18499).unwrap();
let temp_dir_path = get_base_dir()
.join("chat_clients")
.join(format!("port_{}", port))
.join(name.clone());
let address = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{}", port)).unwrap();
let identity = NodeIdentity::random(&mut OsRng, address, PeerFeatures::COMMUNICATION_NODE);

let mut client = Client::new(identity, vec![base_node.identity.to_peer()], temp_dir_path);
client.initialize().await;

world.chat_clients.insert(name, client);
}

#[when(regex = r"^I use (.+) to send a message '(.+)' to (.*)$")]
async fn send_message_to(world: &mut TariWorld, sender: String, message: String, receiver: String) {
let sender = world.chat_clients.get(&sender).unwrap();
let receiver = world.chat_clients.get(&receiver).unwrap();
let address = TariAddress::from_public_key(receiver.identity.public_key(), Network::LocalNet);

sender.send_message(address, message).await;
}

#[then(expr = "{word} will have {int} message(s) with {word}")]
async fn receive_n_messages(world: &mut TariWorld, receiver: String, message_count: u64, sender: String) {
let receiver = world.chat_clients.get(&receiver).unwrap();
let sender = world.chat_clients.get(&sender).unwrap();
let address = TariAddress::from_public_key(sender.identity.public_key(), Network::LocalNet);

let messages = receiver.get_messages(address).await;
assert_eq!(messages.len() as u64, message_count)
}

#[when(expr = "I add {word} as a contact to {word}")]
async fn add_as_contact(world: &mut TariWorld, receiver: String, sender: String) {
let receiver: &Client = world.chat_clients.get(&receiver).unwrap();
let sender: &Client = world.chat_clients.get(&sender).unwrap();

let address = TariAddress::from_public_key(receiver.identity.public_key(), Network::LocalNet);

sender.add_contact(&address).await;
}

fn flush_stdout(buffer: &Arc<Mutex<Vec<u8>>>) {
// After each test we flush the stdout to the logs.
info!(
Expand Down
44 changes: 44 additions & 0 deletions integration_tests/tests/features/Chat.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2023 The Tari Project
# SPDX-License-Identifier: BSD-3-Clause

Feature: Chat messaging

Scenario: A message is propagated between nodes
Given I have a seed node SEED_A
When I have a chat client CHAT_A connected to seed node SEED_A
When I have a chat client CHAT_B connected to seed node SEED_A
When I wait 5 seconds
When I use CHAT_A to send a message 'Hey there' to CHAT_B
When I wait 5 seconds
Then CHAT_B will have 1 message with CHAT_A

Scenario: A message is sent directly between nodes
Given I have a seed node SEED_A
When I have a chat client CHAT_A connected to seed node SEED_A
When I have a chat client CHAT_B connected to seed node SEED_A
When I add CHAT_B as a contact to CHAT_A
When I wait 5 seconds
When I use CHAT_A to send a message 'Hey there' to CHAT_B
When I wait 5 seconds
Then CHAT_B will have 1 message with CHAT_A

Scenario: Message counts are distinct
Given I have a seed node SEED_A
When I have a chat client CHAT_A connected to seed node SEED_A
When I have a chat client CHAT_B connected to seed node SEED_A
When I have a chat client CHAT_C connected to seed node SEED_A
When I wait 5 seconds
When I use CHAT_A to send a message 'Message 1ab' to CHAT_B
When I use CHAT_A to send a message 'Message 2ab' to CHAT_B
When I use CHAT_C to send a message 'Message 1cb' to CHAT_B
When I use CHAT_A to send a message 'Message 1ac' to CHAT_C
When I use CHAT_B to send a message 'Message 1bc' to CHAT_C
When I use CHAT_B to send a message 'Message 2bc' to CHAT_C
When I wait 5 seconds
Then CHAT_B will have 2 messages with CHAT_A
Then CHAT_B will have 3 messages with CHAT_C
Then CHAT_C will have 1 messages with CHAT_A
Then CHAT_C will have 3 messages with CHAT_B
Then CHAT_A will have 2 messages with CHAT_B
Then CHAT_A will have 1 messages with CHAT_C

5 changes: 1 addition & 4 deletions integration_tests/tests/utils/base_node_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
TariWorld,
};

#[derive(Clone)]
pub struct BaseNodeProcess {
pub name: String,
pub port: u64,
Expand Down Expand Up @@ -175,11 +176,7 @@ pub async fn spawn_base_node_with_config(
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap();
base_node_config.base_node.p2p.public_addresses =
vec![base_node_config.base_node.p2p.transport.tcp.listener_address.clone()];
// base_node_config.base_node.p2p.datastore_path = temp_dir_path.to_path_buf();
// base_node_config.base_node.p2p.peer_database_name = "peer_db.mdb".to_string();
base_node_config.base_node.p2p.dht = DhtConfig::default_local_test();
// base_node_config.base_node.p2p.dht.database_url =
// DbConnectionUrl::File(temp_dir_path.clone().join("dht.sqlite"));
base_node_config.base_node.p2p.dht.network_discovery.enabled = true;
base_node_config.base_node.p2p.allow_test_addresses = true;
base_node_config.base_node.storage.orphan_storage_capacity = 10;
Expand Down
23 changes: 23 additions & 0 deletions integration_tests/tests/utils/chat_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "tari_chat_client"
authors = ["The Tari Development Community"]
description = "Tari cucumber chat client"
license = "BSD-3-Clause"
version = "0.48.0-pre.1"
edition = "2018"

[dependencies]
tari_common_types = { path = "../../../../base_layer/common_types" }
tari_common_sqlite = { path = "../../../../common_sqlite" }
tari_comms = { path = "../../../../comms/core" }
tari_comms_dht = { path = "../../../../comms/dht" }
tari_contacts = { path = "../../../../base_layer/contacts" }
tari_p2p = { path = "../../../../base_layer/p2p" }
tari_service_framework= { path = "../../../../base_layer/service_framework" }
tari_shutdown = { path = "../../../../infrastructure/shutdown" }
tari_test_utils = { path = "../../../../infrastructure/test_utils" }
tari_storage = { path = "../../../../infrastructure/storage" }

anyhow = "1.0.41"
diesel = { version = "2.0.3", features = ["sqlite", "r2d2", "serde_json", "chrono", "64-column-tables"] }
lmdb-zero = "0.4.4"
149 changes: 149 additions & 0 deletions integration_tests/tests/utils/chat_client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
fmt::{Debug, Formatter},
path::PathBuf,
sync::Arc,
time::Duration,
};

use tari_common_types::tari_address::TariAddress;
use tari_comms::{peer_manager::Peer, CommsNode, NodeIdentity};
use tari_contacts::contacts_service::{
handle::ContactsServiceHandle,
types::{Message, MessageBuilder},
};
use tari_shutdown::Shutdown;

use crate::{database, networking};

#[derive(Clone)]
pub struct Client {
pub identity: Arc<NodeIdentity>,
pub base_dir: PathBuf,
pub seed_peers: Vec<Peer>,
pub shutdown: Shutdown,
pub contacts: Option<ContactsServiceHandle>,
}

impl Debug for Client {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("identity", &self.identity)
.field("base_dir", &self.base_dir)
.field("seed_peers", &self.seed_peers)
.field("shutdown", &self.shutdown)
.finish()
}
}

impl Drop for Client {
fn drop(&mut self) {
self.quit();
}
}

impl Client {
pub fn new(identity: NodeIdentity, seed_peers: Vec<Peer>, base_dir: PathBuf) -> Self {
Self {
identity: Arc::new(identity),
base_dir,
seed_peers,
shutdown: Shutdown::new(),
contacts: None,
}
}

pub async fn add_contact(&self, address: &TariAddress) {
if let Some(mut contacts_service) = self.contacts.clone() {
contacts_service
.upsert_contact(address.into())
.await
.expect("Contact wasn't added");
}
}

pub async fn send_message(&self, receiver: TariAddress, message: String) {
if let Some(mut contacts_service) = self.contacts.clone() {
contacts_service
.send_message(MessageBuilder::new().message(message).address(receiver).build())
.await
.expect("Message wasn't sent");
}
}

pub async fn get_messages(&self, sender: TariAddress) -> Vec<Message> {
let mut messages = vec![];
if let Some(mut contacts_service) = self.contacts.clone() {
messages = contacts_service
.get_messages(sender, 0, 0)
.await
.expect("Messages not fetched");
}

messages
}

pub async fn initialize(&mut self) {
println!("initializing chat");

let signal = self.shutdown.to_signal();
let db = database::create_chat_storage(self.base_dir.clone()).unwrap();

let (contacts, comms_node) = networking::start(
self.identity.clone(),
self.base_dir.clone(),
self.seed_peers.clone(),
db,
signal,
)
.await
.unwrap();

if !self.seed_peers.is_empty() {
loop {
println!("Waiting for peer connections...");
match wait_for_connectivity(comms_node.clone()).await {
Ok(_) => break,
Err(e) => println!("{}. Still waiting...", e),
}
}
}

self.contacts = Some(contacts);

println!("Connections established")
}

pub fn quit(&mut self) {
self.shutdown.trigger();
}
}

pub async fn wait_for_connectivity(comms: CommsNode) -> anyhow::Result<()> {
comms
.connectivity()
.wait_for_connectivity(Duration::from_secs(30))
.await?;
Ok(())
}
Loading

0 comments on commit edaae1c

Please sign in to comment.