diff --git a/Cargo.lock b/Cargo.lock index 513a18f56c2..fa07d1578ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5710,6 +5710,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tower", "tracing", "uint", "zeroize", diff --git a/base_layer/core/Cargo.toml b/base_layer/core/Cargo.toml index b9d212d1e8c..7f66301c8ee 100644 --- a/base_layer/core/Cargo.toml +++ b/base_layer/core/Cargo.toml @@ -76,6 +76,7 @@ tokio = { version = "1.23", features = ["time", "sync", "macros"] } tracing = "0.1.26" uint = { version = "0.9", default-features = false } zeroize = "1" +tower = "0.4.11" [dev-dependencies] tari_p2p = { path = "../../base_layer/p2p", features = ["test-mocks"] } diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 189ac0d8724..05775cd111c 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -20,11 +20,18 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{path::Path, sync::Arc, time::Duration}; - +use std::{iter, path::Path, sync::Arc, time::Duration}; +use lmdb_zero::open; +use rand::distributions::Alphanumeric; +use tokio::sync::{broadcast, mpsc}; +use tower::ServiceBuilder; use rand::rngs::OsRng; +use rand::{Rng, thread_rng}; use tari_common::configuration::Network; -use tari_comms::{peer_manager::{NodeIdentity, PeerFeatures}, protocol::messaging::MessagingEventSender, transports::MemoryTransport, CommsNode, UnspawnedCommsNode}; +use tari_comms::{peer_manager::{NodeIdentity, PeerFeatures}, protocol::messaging::MessagingEventSender, transports::MemoryTransport, CommsNode, UnspawnedCommsNode, CommsBuilder, pipeline}; +use tari_comms::backoff::ConstantBackoff; +use tari_comms::peer_manager::Peer; +use tari_comms::protocol::messaging::MessagingProtocolExtension; use tari_comms::protocol::rpc::RpcServer; use tari_comms_dht::{outbound::OutboundMessageRequester, Dht}; use tari_core::{base_node::{ @@ -48,9 +55,13 @@ use tari_core::{base_node::{ InternalConsistencyValidator, }}; use tari_core::chain_storage::async_db::AsyncBlockchainDb; -use tari_p2p::{comms_connector::{pubsub_connector, InboundDomainConnector}, initialization::initialize_local_test_comms, P2pConfig, services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer}}; +use tari_core::chain_storage::BlockchainBackend; +use tari_p2p::{comms_connector::{pubsub_connector, InboundDomainConnector}, P2pConfig, services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer}}; +use tari_p2p::initialization::{add_seed_peers, CommsInitializationError, MESSAGING_PROTOCOL_ID}; use tari_service_framework::{RegisterHandle, ServiceHandles, StackBuilder}; -use tari_shutdown::Shutdown; +use tari_shutdown::{Shutdown, ShutdownSignal}; +use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig}; +use tari_storage::LMDBWrapper; use crate::helpers::mock_state_machine::MockBaseNodeStateMachine; @@ -90,6 +101,7 @@ pub struct BaseNodeBuilder { mempool_config: Option, mempool_service_config: Option, liveness_service_config: Option, + p2p_config: Option, validators: Option>, consensus_manager: Option, network: NetworkConsensus, @@ -105,6 +117,7 @@ impl BaseNodeBuilder { mempool_config: None, mempool_service_config: None, liveness_service_config: None, + p2p_config: None, validators: None, consensus_manager: None, network, @@ -141,6 +154,12 @@ impl BaseNodeBuilder { self } + /// Set the p2p configuration + pub fn with_p2p_config(mut self, config: P2pConfig) -> Self { + self.p2p_config = Some(config); + self + } + pub fn with_validators( mut self, block: impl CandidateBlockValidator + 'static, @@ -188,6 +207,7 @@ impl BaseNodeBuilder { mempool, consensus_manager.clone(), self.liveness_service_config.unwrap_or_default(), + self.p2p_config.unwrap_or_default(), data_path, ) .await; @@ -236,6 +256,7 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface pub async fn create_network_with_2_base_nodes_with_config>( mempool_service_config: MempoolServiceConfig, liveness_service_config: LivenessConfig, + p2p_config: P2pConfig, consensus_manager: ConsensusManager, data_path: P, ) -> (NodeInterfaces, NodeInterfaces, ConsensusManager) { @@ -246,6 +267,7 @@ pub async fn create_network_with_2_base_nodes_with_config>( .with_node_identity(alice_node_identity.clone()) .with_mempool_service_config(mempool_service_config.clone()) .with_liveness_service_config(liveness_service_config.clone()) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) .start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap()) .await; @@ -254,6 +276,7 @@ pub async fn create_network_with_2_base_nodes_with_config>( .with_peers(vec![alice_node_identity]) .with_mempool_service_config(mempool_service_config) .with_liveness_service_config(liveness_service_config) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) .start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap()) .await; @@ -340,16 +363,17 @@ pub fn random_node_identity() -> Arc { // Helper function for starting the comms stack. #[allow(dead_code)] -async fn setup_comms_services( +async fn setup_comms_services( node_identity: Arc, peers: Vec>, publisher: InboundDomainConnector, data_path: &str, -) -> (CommsNode, Dht, MessagingEventSender, Shutdown) { +) -> (CommsNode, Dht, MessagingEventSender, Shutdown) +{ let peers = peers.into_iter().map(|p| p.to_peer()).collect(); let shutdown = Shutdown::new(); - let (comms, dht, messaging_events) = initialize_local_test_comms( + let (comms, dht, messaging_events) = initialize_local_test_comms_with_rpc( node_identity, publisher, data_path, @@ -363,6 +387,96 @@ async fn setup_comms_services( (comms, dht, messaging_events, shutdown) } +/// Initialize Tari Comms configured for tests +pub async fn initialize_local_test_comms_with_rpc, T: BlockchainBackend>( + node_identity: Arc, + connector: InboundDomainConnector, + data_path: P, + discovery_request_timeout: Duration, + seed_peers: Vec, + shutdown_signal: ShutdownSignal, +) -> Result<(CommsNode, Dht, MessagingEventSender), CommsInitializationError> +{ + let peer_database_name = { + let mut rng = thread_rng(); + iter::repeat(()) + .map(|_| rng.sample(Alphanumeric) as char) + .take(8) + .collect::() + }; + std::fs::create_dir_all(&data_path).unwrap(); + let datastore = LMDBBuilder::new() + .set_path(&data_path) + .set_env_flags(open::NOLOCK) + .set_env_config(LMDBConfig::default()) + .set_max_number_of_databases(1) + .add_database(&peer_database_name, lmdb_zero::db::CREATE) + .build() + .unwrap(); + let peer_database = datastore.get_handle(&peer_database_name).unwrap(); + let peer_database = LMDBWrapper::new(Arc::new(peer_database)); + + //---------------------------------- Comms --------------------------------------------// + + let comms = CommsBuilder::new() + .allow_test_addresses() + .with_listener_address(node_identity.first_public_address().unwrap()) + .with_listener_liveness_max_sessions(1) + .with_node_identity(node_identity) + .with_user_agent(&"/test/1.0") + .with_peer_storage(peer_database, None) + .with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500))) + .with_min_connectivity(1) + .with_network_byte(Network::LocalNet.as_byte()) + .with_shutdown_signal(shutdown_signal) + .build()?; + + add_seed_peers(&comms.peer_manager(), &comms.node_identity(), seed_peers).await?; + + // Create outbound channel + let (outbound_tx, outbound_rx) = mpsc::channel(10); + + let dht = Dht::builder() + .local_test() + .with_outbound_sender(outbound_tx) + .with_discovery_timeout(discovery_request_timeout) + .build( + comms.node_identity(), + comms.peer_manager(), + comms.connectivity(), + comms.shutdown_signal(), + ) + .await?; + + let dht_outbound_layer = dht.outbound_middleware_layer(); + let (event_sender, _) = broadcast::channel(100); + let pipeline = pipeline::Builder::new() + .with_outbound_pipeline(outbound_rx, |sink| { + ServiceBuilder::new().layer(dht_outbound_layer).service(sink) + }) + .max_concurrent_inbound_tasks(10) + .with_inbound_pipeline( + ServiceBuilder::new() + .layer(dht.inbound_middleware_layer()) + .service(connector), + ) + .build(); + + let comms = comms + .add_protocol_extension( + MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline) + .enable_message_received_event(), + ) + .spawn_with_transport(MemoryTransport) + .await?; + + comms + .node_identity() + .add_public_address(comms.listening_address().clone()); + + Ok((comms, dht, event_sender)) +} + // Helper function for starting the services of the Base node. async fn setup_base_node_services( node_identity: Arc, @@ -371,6 +485,7 @@ async fn setup_base_node_services( mempool: Mempool, consensus_manager: ConsensusManager, liveness_service_config: LivenessConfig, + p2p_config: P2pConfig, data_path: &str, ) -> NodeInterfaces { let (publisher, subscription_factory) = pubsub_connector(100); @@ -402,11 +517,18 @@ async fn setup_base_node_services( .await .unwrap(); - let comms = handles - .take_handle::() - .expect("P2pInitializer was not added to the stack or did not add UnspawnedCommsNode"); - let p2p_config = P2pConfig::default(); - let comms = setup_base_node_rpc_service(comms, &handles, blockchain_db.into(), &p2p_config); + let base_node_service = handles.expect_handle::(); + let rpc_server = RpcServer::builder() + .with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions) + .with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer) + .finish(); + let rpc_server = rpc_server + .add_service(base_node::create_base_node_sync_rpc_service( + blockchain_db.clone().into(), + base_node_service, + )); + handles.register(rpc_server.get_handle()); + comms.add_protocol_extension(rpc_server); let outbound_nci = handles.expect_handle::(); let local_nci = handles.expect_handle::(); @@ -437,28 +559,3 @@ async fn setup_base_node_services( state_machine_handle, } } - -fn setup_base_node_rpc_service( - comms: UnspawnedCommsNode, - handles: &ServiceHandles, - db: AsyncBlockchainDb, - config: &P2pConfig, -) -> UnspawnedCommsNode { - let base_node_service = handles.expect_handle::(); - let rpc_server = RpcServer::builder() - .with_maximum_simultaneous_sessions(config.rpc_max_simultaneous_sessions) - .with_maximum_sessions_per_client(config.rpc_max_sessions_per_peer) - .finish(); - - // Add your RPC services here ‍🏴‍☠️️☮️🌊 - let rpc_server = rpc_server - .add_service(base_node::create_base_node_sync_rpc_service( - db.clone(), - base_node_service, - )); - - handles.register(rpc_server.get_handle()); - - comms.add_protocol_extension(rpc_server) -} - diff --git a/base_layer/core/tests/tests/header_sync.rs b/base_layer/core/tests/tests/header_sync.rs index 8f75b42a1ad..ddfe662a5a5 100644 --- a/base_layer/core/tests/tests/header_sync.rs +++ b/base_layer/core/tests/tests/header_sync.rs @@ -46,6 +46,7 @@ use tempfile::tempdir; use tokio::{ sync::{broadcast, watch}, }; +use tari_p2p::P2pConfig; use crate::helpers::{ block_builders::{append_block, chain_block, create_genesis_block}, @@ -74,6 +75,7 @@ async fn test_header_sync() { auto_ping_interval: Some(Duration::from_millis(100)), ..Default::default() }, + P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), ) diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index f87eed31b0d..f36c738837b 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -424,7 +424,7 @@ fn acquire_exclusive_file_lock(db_path: &Path) -> Result,