Skip to content

Commit

Permalink
rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Oct 13, 2023
1 parent 2a12ee1 commit 96ffc72
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ tokio = { version = "1.23", features = ["time", "sync", "macros"] }
tracing = "0.1.26"
uint = { version = "0.9", default-features = false }
zeroize = "1"
tower = "0.4.11"

[dev-dependencies]
tari_p2p = { path = "../../base_layer/p2p", features = ["test-mocks"] }
Expand Down
173 changes: 135 additions & 38 deletions base_layer/core/tests/helpers/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{path::Path, sync::Arc, time::Duration};

use std::{iter, path::Path, sync::Arc, time::Duration};
use lmdb_zero::open;
use rand::distributions::Alphanumeric;
use tokio::sync::{broadcast, mpsc};
use tower::ServiceBuilder;
use rand::rngs::OsRng;
use rand::{Rng, thread_rng};
use tari_common::configuration::Network;
use tari_comms::{peer_manager::{NodeIdentity, PeerFeatures}, protocol::messaging::MessagingEventSender, transports::MemoryTransport, CommsNode, UnspawnedCommsNode};
use tari_comms::{peer_manager::{NodeIdentity, PeerFeatures}, protocol::messaging::MessagingEventSender, transports::MemoryTransport, CommsNode, UnspawnedCommsNode, CommsBuilder, pipeline};
use tari_comms::backoff::ConstantBackoff;
use tari_comms::peer_manager::Peer;
use tari_comms::protocol::messaging::MessagingProtocolExtension;
use tari_comms::protocol::rpc::RpcServer;
use tari_comms_dht::{outbound::OutboundMessageRequester, Dht};
use tari_core::{base_node::{
Expand All @@ -48,9 +55,13 @@ use tari_core::{base_node::{
InternalConsistencyValidator,
}};
use tari_core::chain_storage::async_db::AsyncBlockchainDb;
use tari_p2p::{comms_connector::{pubsub_connector, InboundDomainConnector}, initialization::initialize_local_test_comms, P2pConfig, services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer}};
use tari_core::chain_storage::BlockchainBackend;
use tari_p2p::{comms_connector::{pubsub_connector, InboundDomainConnector}, P2pConfig, services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer}};
use tari_p2p::initialization::{add_seed_peers, CommsInitializationError, MESSAGING_PROTOCOL_ID};
use tari_service_framework::{RegisterHandle, ServiceHandles, StackBuilder};
use tari_shutdown::Shutdown;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig};
use tari_storage::LMDBWrapper;

use crate::helpers::mock_state_machine::MockBaseNodeStateMachine;

Expand Down Expand Up @@ -90,6 +101,7 @@ pub struct BaseNodeBuilder {
mempool_config: Option<MempoolConfig>,
mempool_service_config: Option<MempoolServiceConfig>,
liveness_service_config: Option<LivenessConfig>,
p2p_config: Option<P2pConfig>,
validators: Option<Validators<TempDatabase>>,
consensus_manager: Option<ConsensusManager>,
network: NetworkConsensus,
Expand All @@ -105,6 +117,7 @@ impl BaseNodeBuilder {
mempool_config: None,
mempool_service_config: None,
liveness_service_config: None,
p2p_config: None,
validators: None,
consensus_manager: None,
network,
Expand Down Expand Up @@ -141,6 +154,12 @@ impl BaseNodeBuilder {
self
}

/// Set the p2p configuration
pub fn with_p2p_config(mut self, config: P2pConfig) -> Self {
self.p2p_config = Some(config);
self
}

pub fn with_validators(
mut self,
block: impl CandidateBlockValidator<TempDatabase> + 'static,
Expand Down Expand Up @@ -188,6 +207,7 @@ impl BaseNodeBuilder {
mempool,
consensus_manager.clone(),
self.liveness_service_config.unwrap_or_default(),
self.p2p_config.unwrap_or_default(),
data_path,
)
.await;
Expand Down Expand Up @@ -236,6 +256,7 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface
pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
mempool_service_config: MempoolServiceConfig,
liveness_service_config: LivenessConfig,
p2p_config: P2pConfig,
consensus_manager: ConsensusManager,
data_path: P,
) -> (NodeInterfaces, NodeInterfaces, ConsensusManager) {
Expand All @@ -246,6 +267,7 @@ pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
.with_node_identity(alice_node_identity.clone())
.with_mempool_service_config(mempool_service_config.clone())
.with_liveness_service_config(liveness_service_config.clone())
.with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap())
.await;
Expand All @@ -254,6 +276,7 @@ pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
.with_peers(vec![alice_node_identity])
.with_mempool_service_config(mempool_service_config)
.with_liveness_service_config(liveness_service_config)
.with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap())
.await;
Expand Down Expand Up @@ -340,16 +363,17 @@ pub fn random_node_identity() -> Arc<NodeIdentity> {

// Helper function for starting the comms stack.
#[allow(dead_code)]
async fn setup_comms_services(
async fn setup_comms_services<T: BlockchainBackend>(
node_identity: Arc<NodeIdentity>,
peers: Vec<Arc<NodeIdentity>>,
publisher: InboundDomainConnector,
data_path: &str,
) -> (CommsNode, Dht, MessagingEventSender, Shutdown) {
) -> (CommsNode, Dht, MessagingEventSender, Shutdown)
{
let peers = peers.into_iter().map(|p| p.to_peer()).collect();
let shutdown = Shutdown::new();

let (comms, dht, messaging_events) = initialize_local_test_comms(
let (comms, dht, messaging_events) = initialize_local_test_comms_with_rpc(
node_identity,
publisher,
data_path,
Expand All @@ -363,6 +387,96 @@ async fn setup_comms_services(
(comms, dht, messaging_events, shutdown)
}

/// Initialize Tari Comms configured for tests
pub async fn initialize_local_test_comms_with_rpc<P: AsRef<Path>, T: BlockchainBackend>(
node_identity: Arc<NodeIdentity>,
connector: InboundDomainConnector,
data_path: P,
discovery_request_timeout: Duration,
seed_peers: Vec<Peer>,
shutdown_signal: ShutdownSignal,
) -> Result<(CommsNode, Dht, MessagingEventSender), CommsInitializationError>
{
let peer_database_name = {
let mut rng = thread_rng();
iter::repeat(())
.map(|_| rng.sample(Alphanumeric) as char)
.take(8)
.collect::<String>()
};
std::fs::create_dir_all(&data_path).unwrap();
let datastore = LMDBBuilder::new()
.set_path(&data_path)
.set_env_flags(open::NOLOCK)
.set_env_config(LMDBConfig::default())
.set_max_number_of_databases(1)
.add_database(&peer_database_name, lmdb_zero::db::CREATE)
.build()
.unwrap();
let peer_database = datastore.get_handle(&peer_database_name).unwrap();
let peer_database = LMDBWrapper::new(Arc::new(peer_database));

//---------------------------------- Comms --------------------------------------------//

let comms = CommsBuilder::new()
.allow_test_addresses()
.with_listener_address(node_identity.first_public_address().unwrap())
.with_listener_liveness_max_sessions(1)
.with_node_identity(node_identity)
.with_user_agent(&"/test/1.0")
.with_peer_storage(peer_database, None)
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.with_min_connectivity(1)
.with_network_byte(Network::LocalNet.as_byte())
.with_shutdown_signal(shutdown_signal)
.build()?;

add_seed_peers(&comms.peer_manager(), &comms.node_identity(), seed_peers).await?;

// Create outbound channel
let (outbound_tx, outbound_rx) = mpsc::channel(10);

let dht = Dht::builder()
.local_test()
.with_outbound_sender(outbound_tx)
.with_discovery_timeout(discovery_request_timeout)
.build(
comms.node_identity(),
comms.peer_manager(),
comms.connectivity(),
comms.shutdown_signal(),
)
.await?;

let dht_outbound_layer = dht.outbound_middleware_layer();
let (event_sender, _) = broadcast::channel(100);
let pipeline = pipeline::Builder::new()
.with_outbound_pipeline(outbound_rx, |sink| {
ServiceBuilder::new().layer(dht_outbound_layer).service(sink)
})
.max_concurrent_inbound_tasks(10)
.with_inbound_pipeline(
ServiceBuilder::new()
.layer(dht.inbound_middleware_layer())
.service(connector),
)
.build();

let comms = comms
.add_protocol_extension(
MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline)
.enable_message_received_event(),
)
.spawn_with_transport(MemoryTransport)
.await?;

comms
.node_identity()
.add_public_address(comms.listening_address().clone());

Ok((comms, dht, event_sender))
}

// Helper function for starting the services of the Base node.
async fn setup_base_node_services(
node_identity: Arc<NodeIdentity>,
Expand All @@ -371,6 +485,7 @@ async fn setup_base_node_services(
mempool: Mempool,
consensus_manager: ConsensusManager,
liveness_service_config: LivenessConfig,
p2p_config: P2pConfig,
data_path: &str,
) -> NodeInterfaces {
let (publisher, subscription_factory) = pubsub_connector(100);
Expand Down Expand Up @@ -402,11 +517,18 @@ async fn setup_base_node_services(
.await
.unwrap();

let comms = handles
.take_handle::<UnspawnedCommsNode>()
.expect("P2pInitializer was not added to the stack or did not add UnspawnedCommsNode");
let p2p_config = P2pConfig::default();
let comms = setup_base_node_rpc_service(comms, &handles, blockchain_db.into(), &p2p_config);
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions)
.with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer)
.finish();
let rpc_server = rpc_server
.add_service(base_node::create_base_node_sync_rpc_service(
blockchain_db.clone().into(),
base_node_service,
));
handles.register(rpc_server.get_handle());
comms.add_protocol_extension(rpc_server);

let outbound_nci = handles.expect_handle::<OutboundNodeCommsInterface>();
let local_nci = handles.expect_handle::<LocalNodeCommsInterface>();
Expand Down Expand Up @@ -437,28 +559,3 @@ async fn setup_base_node_services(
state_machine_handle,
}
}

fn setup_base_node_rpc_service(
comms: UnspawnedCommsNode,
handles: &ServiceHandles,
db: AsyncBlockchainDb<B>,
config: &P2pConfig,
) -> UnspawnedCommsNode {
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(config.rpc_max_simultaneous_sessions)
.with_maximum_sessions_per_client(config.rpc_max_sessions_per_peer)
.finish();

// Add your RPC services here ‍🏴‍☠️️☮️🌊
let rpc_server = rpc_server
.add_service(base_node::create_base_node_sync_rpc_service(
db.clone(),
base_node_service,
));

handles.register(rpc_server.get_handle());

comms.add_protocol_extension(rpc_server)
}

2 changes: 2 additions & 0 deletions base_layer/core/tests/tests/header_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use tempfile::tempdir;
use tokio::{
sync::{broadcast, watch},
};
use tari_p2p::P2pConfig;

use crate::helpers::{
block_builders::{append_block, chain_block, create_genesis_block},
Expand Down Expand Up @@ -74,6 +75,7 @@ async fn test_header_sync() {
auto_ping_interval: Some(Duration::from_millis(100)),
..Default::default()
},
P2pConfig::default(),
consensus_manager,
temp_dir.path().to_str().unwrap(),
)
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ fn acquire_exclusive_file_lock(db_path: &Path) -> Result<File, CommsInitializati
///
/// ## Returns
/// A Result to determine if the call was successful or not, string will indicate the reason on error
async fn add_seed_peers(
pub async fn add_seed_peers(
peer_manager: &PeerManager,
node_identity: &NodeIdentity,
peers: Vec<Peer>,
Expand Down

0 comments on commit 96ffc72

Please sign in to comment.