diff --git a/aptos-move/aptos-workspace-server/src/main.rs b/aptos-move/aptos-workspace-server/src/main.rs index 0a6a5e5334355..8c730f28797dc 100644 --- a/aptos-move/aptos-workspace-server/src/main.rs +++ b/aptos-move/aptos-workspace-server/src/main.rs @@ -1,22 +1,35 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::Result; +use anyhow::{anyhow, Context, Result}; use aptos::node::local_testnet::HealthChecker; use aptos_config::config::{NodeConfig, TableInfoServiceMode}; use aptos_faucet_core::server::{FunderKeyEnum, RunConfig}; use aptos_node::{load_node_config, start_and_report_ports}; use aptos_types::network_address::{NetworkAddress, Protocol}; -use futures::channel::oneshot; +use futures::{channel::oneshot, future::Shared, FutureExt}; use rand::{rngs::StdRng, SeedableRng}; use std::{ + future::Future, net::{IpAddr, Ipv4Addr}, - path::Path, + path::{Path, PathBuf}, + sync::Arc, thread, - time::Duration, }; use url::Url; +const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + +/// Converts a future into a shared one by putting the error into an Arc. +fn make_shared(fut: F) -> Shared>>> +where + T: Clone, + F: Future>, +{ + fut.map(|r| r.map_err(|err| Arc::new(err))).shared() +} + +/// Sets all ports in the node config to zero so the OS can assign them random ones. pub fn zero_all_ports(config: &mut NodeConfig) { // TODO: Double check if all ports are covered. @@ -42,7 +55,17 @@ pub fn zero_all_ports(config: &mut NodeConfig) { } } -async fn spawn_node(test_dir: &Path) -> Result<()> { +/// Starts a local node and returns three futures: +/// 1. A future for the node API, which resolves to the port number once the service is fully up. +/// 2. A future for the indexer gRPC, which resolves to the port number once the service is fully up. +/// 3. A final future that resolves when the node stops. +fn start_node( + test_dir: &Path, +) -> Result<( + impl Future>, + impl Future>, + impl Future>, +)> { let rng = StdRng::from_entropy(); let mut node_config = load_node_config( @@ -62,17 +85,11 @@ async fn spawn_node(test_dir: &Path) -> Result<()> { node_config.indexer_table_info.table_info_service_mode = TableInfoServiceMode::IndexingOnly; - node_config - .api - .address - .set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); - node_config - .indexer_grpc - .address - .set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); + node_config.api.address.set_ip(IP_LOCAL_HOST); + node_config.indexer_grpc.address.set_ip(IP_LOCAL_HOST); - node_config.admin_service.address = "127.0.0.1".to_string(); - node_config.inspection_service.address = "127.0.0.1".to_string(); + node_config.admin_service.address = IP_LOCAL_HOST.to_string(); + node_config.inspection_service.address = IP_LOCAL_HOST.to_string(); let (api_port_tx, api_port_rx) = oneshot::channel(); let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel(); @@ -91,74 +108,169 @@ async fn spawn_node(test_dir: &Path) -> Result<()> { } }; - let _node_thread_handle = thread::spawn(move || { - let res = run_node(); + let node_thread_handle = thread::spawn(run_node); - if let Err(err) = res { - println!("Node stopped unexpectedly {:?}", err); - } + let fut_node_finish = async { + let join_handle = tokio::task::spawn_blocking(move || -> Result<()> { + node_thread_handle + .join() + .map_err(|_err| anyhow!("failed to wait for node thread"))? + }); + + join_handle + .await + .map_err(|err| anyhow!("failed to join node task: {}", err))? + }; + + let fut_api = async move { + let api_port = api_port_rx.await?; + + let api_health_checker = HealthChecker::NodeApi( + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(), + ); + api_health_checker.wait(None).await?; + + println!( + "Node API is ready. Endpoint: http://{}:{}/", + IP_LOCAL_HOST, api_port + ); + + Ok(api_port) + }; + + let fut_indexer_grpc = async move { + let indexer_grpc_port = indexer_grpc_port_rx.await?; + + let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc( + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_grpc_port)).unwrap(), + ); + + indexer_grpc_health_checker.wait(None).await?; + println!( + "Transaction stream is ready. Endpoint: http://{}:{}/", + IP_LOCAL_HOST, indexer_grpc_port + ); + + Ok(indexer_grpc_port) + }; + + Ok((fut_api, fut_indexer_grpc, fut_node_finish)) +} + +/// Starts the faucet service and returns two futures. +/// 1. A future that resolves to the port used, once the faucet service is fully up. +/// 2. A future that resolves, when the service stops. +fn start_faucet( + test_dir: PathBuf, + fut_node_api: impl Future>> + Send + 'static, + fut_indexer_grpc: impl Future>> + Send + 'static, +) -> ( + impl Future>, + impl Future> + 'static, +) { + let (faucet_port_tx, faucet_port_rx) = oneshot::channel(); + + let handle_faucet = tokio::spawn(async move { + let api_port = fut_node_api + .await + .map_err(anyhow::Error::msg) + .context("failed to start faucet: node api did not start successfully")?; + + fut_indexer_grpc + .await + .map_err(anyhow::Error::msg) + .context("failed to start faucet: indexer grpc did not start successfully")?; + + let faucet_run_config = RunConfig::build_for_cli( + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(), + IP_LOCAL_HOST.to_string(), + 0, + FunderKeyEnum::KeyFile(test_dir.join("mint.key")), + false, + None, + ); + + faucet_run_config.run_and_report_port(faucet_port_tx).await }); - let api_port = api_port_rx.await?; - let indexer_grpc_port = indexer_grpc_port_rx.await?; + let fut_faucet_finish = async move { + handle_faucet + .await + .map_err(|err| anyhow!("failed to join handle task: {}", err))? + }; - let api_health_checker = HealthChecker::NodeApi( - Url::parse(&format!( - "http://{}:{}", - node_config.api.address.ip(), - api_port - )) - .unwrap(), - ); - let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc( - Url::parse(&format!( - "http://{}:{}", - node_config.indexer_grpc.address.ip(), - indexer_grpc_port - )) - .unwrap(), - ); + let fut_faucet_port = async move { + let faucet_port = faucet_port_rx + .await + .context("failed to receive faucet port")?; - api_health_checker.wait(None).await?; - eprintln!( - "Node API is ready. Endpoint: http://127.0.0.1:{}/", - api_port - ); + let faucet_health_checker = + HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string()); + faucet_health_checker.wait(None).await?; - indexer_grpc_health_checker.wait(None).await?; - eprintln!( - "Transaction stream is ready. Endpoint: http://127.0.0.1:{}/", - indexer_grpc_port - ); + println!( + "Faucet is ready. Endpoint: http://{}:{}", + IP_LOCAL_HOST, faucet_port + ); - let faucet_run_config = RunConfig::build_for_cli( - Url::parse(&format!( - "http://{}:{}", - node_config.api.address.ip(), - api_port - )) - .unwrap(), - "127.0.0.1".to_string(), - 0, - FunderKeyEnum::KeyFile(test_dir.join("mint.key")), - false, - None, + Ok(faucet_port) + }; + + (fut_faucet_port, fut_faucet_finish) +} + +async fn start_all_services(test_dir: &Path) -> Result<()> { + // Step 1: spawn all services. + let (fut_node_api, fut_indexer_grpc, fut_node_finish) = start_node(test_dir)?; + + let fut_node_api = make_shared(fut_node_api); + let fut_indexer_grpc = make_shared(fut_indexer_grpc); + let (fut_faucet, fut_faucet_finish) = start_faucet( + test_dir.to_owned(), + fut_node_api.clone(), + fut_indexer_grpc.clone(), ); - let (faucet_port_tx, faucet_port_rx) = oneshot::channel(); - tokio::spawn(faucet_run_config.run_and_report_port(faucet_port_tx)); + let (res_node_api, res_indexer_grpc, res_faucet) = + tokio::join!(fut_node_api, fut_indexer_grpc, fut_faucet); - let faucet_port = faucet_port_rx.await?; + // Step 2: wait for all services to be up. + res_node_api + .map_err(anyhow::Error::msg) + .context("failed to start node api")?; + res_indexer_grpc + .map_err(anyhow::Error::msg) + .context("failed to start node api")?; + res_faucet.context("failed to start faucet")?; - let faucet_health_checker = - HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string()); - faucet_health_checker.wait(None).await?; - eprintln!( - "Faucet is ready. Endpoint: http://127.0.0.1:{}", - faucet_port + println!( + "Indexer API is ready. Endpoint: http://{}:0/", + IP_LOCAL_HOST ); - eprintln!("Indexer API is ready. Endpoint: http://127.0.0.1:0/"); + println!("ALL SERVICES STARTED SUCCESSFULLY"); + + // Step 3: wait for services to stop. + tokio::pin!(fut_node_finish); + tokio::pin!(fut_faucet_finish); + + let mut finished: u64 = 0; + while finished < 2 { + tokio::select! { + res = &mut fut_node_finish => { + if let Err(err) = res { + eprintln!("Node existed with error: {}", err); + } + finished += 1; + } + res = &mut fut_faucet_finish => { + if let Err(err) = res { + eprintln!("Faucet existed with error: {}", err); + } + finished += 1; + } + } + } Ok(()) } @@ -169,9 +281,7 @@ async fn main() -> Result<()> { println!("Test directory: {}", test_dir.path().display()); - spawn_node(test_dir.path()).await?; + start_all_services(test_dir.path()).await?; - loop { - tokio::time::sleep(Duration::from_millis(200)).await; - } + Ok(()) }