Skip to content

Commit

Permalink
Merge branch 'main' into sdy/ans_txns
Browse files Browse the repository at this point in the history
  • Loading branch information
dermanyang authored Nov 5, 2024
2 parents e318e45 + 4af1500 commit c770646
Showing 1 changed file with 184 additions and 74 deletions.
258 changes: 184 additions & 74 deletions aptos-move/aptos-workspace-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use anyhow::{anyhow, Context, Result};
use aptos::node::local_testnet::HealthChecker;
use aptos_config::config::{NodeConfig, TableInfoServiceMode};
use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
use aptos_node::{load_node_config, start_and_report_ports};
use aptos_types::network_address::{NetworkAddress, Protocol};
use futures::channel::oneshot;
use futures::{channel::oneshot, future::Shared, FutureExt};
use rand::{rngs::StdRng, SeedableRng};
use std::{
future::Future,
net::{IpAddr, Ipv4Addr},
path::Path,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Duration,
};
use url::Url;

const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

/// Converts a future into a shared one by putting the error into an Arc.
fn make_shared<F, T, E>(fut: F) -> Shared<impl Future<Output = Result<T, Arc<E>>>>
where
T: Clone,
F: Future<Output = Result<T, E>>,
{
fut.map(|r| r.map_err(|err| Arc::new(err))).shared()
}

/// Sets all ports in the node config to zero so the OS can assign them random ones.
pub fn zero_all_ports(config: &mut NodeConfig) {
// TODO: Double check if all ports are covered.

Expand All @@ -42,7 +55,17 @@ pub fn zero_all_ports(config: &mut NodeConfig) {
}
}

async fn spawn_node(test_dir: &Path) -> Result<()> {
/// Starts a local node and returns three futures:
/// 1. A future for the node API, which resolves to the port number once the service is fully up.
/// 2. A future for the indexer gRPC, which resolves to the port number once the service is fully up.
/// 3. A final future that resolves when the node stops.
fn start_node(
test_dir: &Path,
) -> Result<(
impl Future<Output = Result<u16>>,
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>>,
)> {
let rng = StdRng::from_entropy();

let mut node_config = load_node_config(
Expand All @@ -62,17 +85,11 @@ async fn spawn_node(test_dir: &Path) -> Result<()> {

node_config.indexer_table_info.table_info_service_mode = TableInfoServiceMode::IndexingOnly;

node_config
.api
.address
.set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
node_config
.indexer_grpc
.address
.set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
node_config.api.address.set_ip(IP_LOCAL_HOST);
node_config.indexer_grpc.address.set_ip(IP_LOCAL_HOST);

node_config.admin_service.address = "127.0.0.1".to_string();
node_config.inspection_service.address = "127.0.0.1".to_string();
node_config.admin_service.address = IP_LOCAL_HOST.to_string();
node_config.inspection_service.address = IP_LOCAL_HOST.to_string();

let (api_port_tx, api_port_rx) = oneshot::channel();
let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel();
Expand All @@ -91,74 +108,169 @@ async fn spawn_node(test_dir: &Path) -> Result<()> {
}
};

let _node_thread_handle = thread::spawn(move || {
let res = run_node();
let node_thread_handle = thread::spawn(run_node);

if let Err(err) = res {
println!("Node stopped unexpectedly {:?}", err);
}
let fut_node_finish = async {
let join_handle = tokio::task::spawn_blocking(move || -> Result<()> {
node_thread_handle
.join()
.map_err(|_err| anyhow!("failed to wait for node thread"))?
});

join_handle
.await
.map_err(|err| anyhow!("failed to join node task: {}", err))?
};

let fut_api = async move {
let api_port = api_port_rx.await?;

let api_health_checker = HealthChecker::NodeApi(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(),
);
api_health_checker.wait(None).await?;

println!(
"Node API is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, api_port
);

Ok(api_port)
};

let fut_indexer_grpc = async move {
let indexer_grpc_port = indexer_grpc_port_rx.await?;

let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_grpc_port)).unwrap(),
);

indexer_grpc_health_checker.wait(None).await?;
println!(
"Transaction stream is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, indexer_grpc_port
);

Ok(indexer_grpc_port)
};

Ok((fut_api, fut_indexer_grpc, fut_node_finish))
}

/// Starts the faucet service and returns two futures.
/// 1. A future that resolves to the port used, once the faucet service is fully up.
/// 2. A future that resolves, when the service stops.
fn start_faucet(
test_dir: PathBuf,
fut_node_api: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Send + 'static,
fut_indexer_grpc: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Send + 'static,
) -> (
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>> + 'static,
) {
let (faucet_port_tx, faucet_port_rx) = oneshot::channel();

let handle_faucet = tokio::spawn(async move {
let api_port = fut_node_api
.await
.map_err(anyhow::Error::msg)
.context("failed to start faucet: node api did not start successfully")?;

fut_indexer_grpc
.await
.map_err(anyhow::Error::msg)
.context("failed to start faucet: indexer grpc did not start successfully")?;

let faucet_run_config = RunConfig::build_for_cli(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(),
IP_LOCAL_HOST.to_string(),
0,
FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
false,
None,
);

faucet_run_config.run_and_report_port(faucet_port_tx).await
});

let api_port = api_port_rx.await?;
let indexer_grpc_port = indexer_grpc_port_rx.await?;
let fut_faucet_finish = async move {
handle_faucet
.await
.map_err(|err| anyhow!("failed to join handle task: {}", err))?
};

let api_health_checker = HealthChecker::NodeApi(
Url::parse(&format!(
"http://{}:{}",
node_config.api.address.ip(),
api_port
))
.unwrap(),
);
let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc(
Url::parse(&format!(
"http://{}:{}",
node_config.indexer_grpc.address.ip(),
indexer_grpc_port
))
.unwrap(),
);
let fut_faucet_port = async move {
let faucet_port = faucet_port_rx
.await
.context("failed to receive faucet port")?;

api_health_checker.wait(None).await?;
eprintln!(
"Node API is ready. Endpoint: http://127.0.0.1:{}/",
api_port
);
let faucet_health_checker =
HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string());
faucet_health_checker.wait(None).await?;

indexer_grpc_health_checker.wait(None).await?;
eprintln!(
"Transaction stream is ready. Endpoint: http://127.0.0.1:{}/",
indexer_grpc_port
);
println!(
"Faucet is ready. Endpoint: http://{}:{}",
IP_LOCAL_HOST, faucet_port
);

let faucet_run_config = RunConfig::build_for_cli(
Url::parse(&format!(
"http://{}:{}",
node_config.api.address.ip(),
api_port
))
.unwrap(),
"127.0.0.1".to_string(),
0,
FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
false,
None,
Ok(faucet_port)
};

(fut_faucet_port, fut_faucet_finish)
}

async fn start_all_services(test_dir: &Path) -> Result<()> {
// Step 1: spawn all services.
let (fut_node_api, fut_indexer_grpc, fut_node_finish) = start_node(test_dir)?;

let fut_node_api = make_shared(fut_node_api);
let fut_indexer_grpc = make_shared(fut_indexer_grpc);
let (fut_faucet, fut_faucet_finish) = start_faucet(
test_dir.to_owned(),
fut_node_api.clone(),
fut_indexer_grpc.clone(),
);

let (faucet_port_tx, faucet_port_rx) = oneshot::channel();
tokio::spawn(faucet_run_config.run_and_report_port(faucet_port_tx));
let (res_node_api, res_indexer_grpc, res_faucet) =
tokio::join!(fut_node_api, fut_indexer_grpc, fut_faucet);

let faucet_port = faucet_port_rx.await?;
// Step 2: wait for all services to be up.
res_node_api
.map_err(anyhow::Error::msg)
.context("failed to start node api")?;
res_indexer_grpc
.map_err(anyhow::Error::msg)
.context("failed to start node api")?;
res_faucet.context("failed to start faucet")?;

let faucet_health_checker =
HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string());
faucet_health_checker.wait(None).await?;
eprintln!(
"Faucet is ready. Endpoint: http://127.0.0.1:{}",
faucet_port
println!(
"Indexer API is ready. Endpoint: http://{}:0/",
IP_LOCAL_HOST
);

eprintln!("Indexer API is ready. Endpoint: http://127.0.0.1:0/");
println!("ALL SERVICES STARTED SUCCESSFULLY");

// Step 3: wait for services to stop.
tokio::pin!(fut_node_finish);
tokio::pin!(fut_faucet_finish);

let mut finished: u64 = 0;
while finished < 2 {
tokio::select! {
res = &mut fut_node_finish => {
if let Err(err) = res {
eprintln!("Node existed with error: {}", err);
}
finished += 1;
}
res = &mut fut_faucet_finish => {
if let Err(err) = res {
eprintln!("Faucet existed with error: {}", err);
}
finished += 1;
}
}
}

Ok(())
}
Expand All @@ -169,9 +281,7 @@ async fn main() -> Result<()> {

println!("Test directory: {}", test_dir.path().display());

spawn_node(test_dir.path()).await?;
start_all_services(test_dir.path()).await?;

loop {
tokio::time::sleep(Duration::from_millis(200)).await;
}
Ok(())
}

0 comments on commit c770646

Please sign in to comment.