diff --git a/Cargo.lock b/Cargo.lock index 3b429c2e148c0..0e9aa46fcaee1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4691,11 +4691,19 @@ dependencies = [ "aptos-faucet-core", "aptos-node", "aptos-types", + "bollard", + "diesel", + "diesel-async", "futures", + "maplit", + "processor", "rand 0.7.3", + "server-framework", "tempfile", "tokio", + "tokio-util 0.7.10", "url", + "uuid", ] [[package]] diff --git a/aptos-move/aptos-workspace-server/Cargo.toml b/aptos-move/aptos-workspace-server/Cargo.toml index 10739a87132d2..e72f59d6b7d2a 100644 --- a/aptos-move/aptos-workspace-server/Cargo.toml +++ b/aptos-move/aptos-workspace-server/Cargo.toml @@ -12,6 +12,7 @@ repository = { workspace = true } rust-version = { workspace = true } [dependencies] +# aptos deps aptos = { workspace = true } aptos-cached-packages = { workspace = true } aptos-config = { workspace = true } @@ -19,9 +20,22 @@ aptos-faucet-core = { workspace = true } aptos-node = { workspace = true } aptos-types = { workspace = true } +# third party deps anyhow = { workspace = true } +bollard = { workspace = true } +diesel = { workspace = true, features = [ + "postgres_backend", +] } +diesel-async = { workspace = true } futures = { workspace = true } +maplit = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } url = { workspace = true } +uuid = { workspace = true } + +# indexer deps +processor = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "51a34901b40d7f75767ac907b4d2478104d6a515", default-features = false } +server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processors.git", rev = "51a34901b40d7f75767ac907b4d2478104d6a515" } diff --git a/aptos-move/aptos-workspace-server/src/common.rs b/aptos-move/aptos-workspace-server/src/common.rs new file mode 100644 index 0000000000000..a33302263bd68 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/common.rs @@ -0,0 +1,23 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Common utilities and constants for networking and asynchronous operations. + +use futures::{future::Shared, FutureExt}; +use std::{ + future::Future, + net::{IpAddr, Ipv4Addr}, + sync::Arc, +}; + +/// The local IP address services are bound to. +pub(crate) const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + +/// Converts a future into a shared future by wrapping the error in an `Arc`. +pub(crate) fn make_shared(fut: F) -> Shared>>> +where + T: Clone, + F: Future>, +{ + fut.map(|r| r.map_err(|err| Arc::new(err))).shared() +} diff --git a/aptos-move/aptos-workspace-server/src/main.rs b/aptos-move/aptos-workspace-server/src/main.rs index 8c730f28797dc..34a66ea96f942 100644 --- a/aptos-move/aptos-workspace-server/src/main.rs +++ b/aptos-move/aptos-workspace-server/src/main.rs @@ -1,277 +1,179 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::{anyhow, Context, Result}; -use aptos::node::local_testnet::HealthChecker; -use aptos_config::config::{NodeConfig, TableInfoServiceMode}; -use aptos_faucet_core::server::{FunderKeyEnum, RunConfig}; -use aptos_node::{load_node_config, start_and_report_ports}; -use aptos_types::network_address::{NetworkAddress, Protocol}; -use futures::{channel::oneshot, future::Shared, FutureExt}; -use rand::{rngs::StdRng, SeedableRng}; -use std::{ - future::Future, - net::{IpAddr, Ipv4Addr}, - path::{Path, PathBuf}, - sync::Arc, - thread, +//! This binary runs and manages a set of services that makes up a local Aptos network. +//! - node +//! - node API +//! - indexer grpc +//! - faucet +//! - indexer +//! - postgres db +//! - processors +//! - indexer API +//! +//! The services are bound to unique OS-assigned ports to allow for multiple local networks +//! to operate simultaneously, enabling testing and development in isolated environments. +//! +//! ## Key Features: +//! - Shared Futures +//! - The code makes extensive use of shared futures across multiple services, +//! ensuring orderly startup while maximizing parallel execution. +//! - Graceful Shutdown +//! - When a `Ctrl-C` signal is received or if any of the services fail to start +//! or exit unexpectedly, the system attempts to gracefully shut down all services, +//! cleaning up resources like Docker containers, volumes and networks. + +mod common; +mod services; + +use anyhow::{Context, Result}; +use common::make_shared; +use futures::TryFutureExt; +use services::{ + docker_common::create_docker_network, indexer_api::start_indexer_api, + processors::start_all_processors, }; -use url::Url; +use std::path::Path; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; -const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +async fn run_all_services(test_dir: &Path) -> Result<()> { + let instance_id = Uuid::new_v4(); -/// Converts a future into a shared one by putting the error into an Arc. -fn make_shared(fut: F) -> Shared>>> -where - T: Clone, - F: Future>, -{ - fut.map(|r| r.map_err(|err| Arc::new(err))).shared() -} - -/// Sets all ports in the node config to zero so the OS can assign them random ones. -pub fn zero_all_ports(config: &mut NodeConfig) { - // TODO: Double check if all ports are covered. - - config.admin_service.port = 0; - config.api.address.set_port(0); - config.inspection_service.port = 0; - config.storage.backup_service_address.set_port(0); - config.indexer_grpc.address.set_port(0); - - if let Some(network) = config.validator_network.as_mut() { - network.listen_address = NetworkAddress::from_protocols(vec![ - Protocol::Ip4("0.0.0.0".parse().unwrap()), - Protocol::Tcp(0), - ]) - .unwrap(); - } - for network in config.full_node_networks.iter_mut() { - network.listen_address = NetworkAddress::from_protocols(vec![ - Protocol::Ip4("0.0.0.0".parse().unwrap()), - Protocol::Tcp(0), - ]) - .unwrap(); - } -} - -/// Starts a local node and returns three futures: -/// 1. A future for the node API, which resolves to the port number once the service is fully up. -/// 2. A future for the indexer gRPC, which resolves to the port number once the service is fully up. -/// 3. A final future that resolves when the node stops. -fn start_node( - test_dir: &Path, -) -> Result<( - impl Future>, - impl Future>, - impl Future>, -)> { - let rng = StdRng::from_entropy(); - - let mut node_config = load_node_config( - &None, - &None, - test_dir, - false, - false, - false, - aptos_cached_packages::head_release_bundle(), - rng, - )?; - - zero_all_ports(&mut node_config); - node_config.indexer_grpc.enabled = true; - node_config.indexer_grpc.use_data_service_interface = true; + // Phase 0: Register the signal handler for ctrl-c. + let shutdown = CancellationToken::new(); + { + // TODO: Find a way to register the signal handler in a blocking manner without + // waiting for it to trigger. + let shutdown = shutdown.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); - node_config.indexer_table_info.table_info_service_mode = TableInfoServiceMode::IndexingOnly; + println!("\nCtrl-C received. Shutting down services. This may take a while.\n"); - node_config.api.address.set_ip(IP_LOCAL_HOST); - node_config.indexer_grpc.address.set_ip(IP_LOCAL_HOST); - - node_config.admin_service.address = IP_LOCAL_HOST.to_string(); - node_config.inspection_service.address = IP_LOCAL_HOST.to_string(); - - let (api_port_tx, api_port_rx) = oneshot::channel(); - let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel(); - - let run_node = { - let test_dir = test_dir.to_owned(); - let node_config = node_config.clone(); - move || -> Result<()> { - start_and_report_ports( - node_config, - Some(test_dir.join("validator.log")), - false, - Some(api_port_tx), - Some(indexer_grpc_port_tx), - ) - } - }; - - let node_thread_handle = thread::spawn(run_node); - - let fut_node_finish = async { - let join_handle = tokio::task::spawn_blocking(move || -> Result<()> { - node_thread_handle - .join() - .map_err(|_err| anyhow!("failed to wait for node thread"))? + shutdown.cancel(); }); + } - join_handle - .await - .map_err(|err| anyhow!("failed to join node task: {}", err))? - }; - - let fut_api = async move { - let api_port = api_port_rx.await?; - - let api_health_checker = HealthChecker::NodeApi( - Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(), - ); - api_health_checker.wait(None).await?; - - println!( - "Node API is ready. Endpoint: http://{}:{}/", - IP_LOCAL_HOST, api_port - ); - - Ok(api_port) - }; - - let fut_indexer_grpc = async move { - let indexer_grpc_port = indexer_grpc_port_rx.await?; - - let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc( - Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_grpc_port)).unwrap(), - ); - - indexer_grpc_health_checker.wait(None).await?; - println!( - "Transaction stream is ready. Endpoint: http://{}:{}/", - IP_LOCAL_HOST, indexer_grpc_port - ); - - Ok(indexer_grpc_port) - }; - - Ok((fut_api, fut_indexer_grpc, fut_node_finish)) -} - -/// Starts the faucet service and returns two futures. -/// 1. A future that resolves to the port used, once the faucet service is fully up. -/// 2. A future that resolves, when the service stops. -fn start_faucet( - test_dir: PathBuf, - fut_node_api: impl Future>> + Send + 'static, - fut_indexer_grpc: impl Future>> + Send + 'static, -) -> ( - impl Future>, - impl Future> + 'static, -) { - let (faucet_port_tx, faucet_port_rx) = oneshot::channel(); - - let handle_faucet = tokio::spawn(async move { - let api_port = fut_node_api - .await - .map_err(anyhow::Error::msg) - .context("failed to start faucet: node api did not start successfully")?; - - fut_indexer_grpc - .await - .map_err(anyhow::Error::msg) - .context("failed to start faucet: indexer grpc did not start successfully")?; - - let faucet_run_config = RunConfig::build_for_cli( - Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(), - IP_LOCAL_HOST.to_string(), - 0, - FunderKeyEnum::KeyFile(test_dir.join("mint.key")), - false, - None, - ); - - faucet_run_config.run_and_report_port(faucet_port_tx).await - }); - - let fut_faucet_finish = async move { - handle_faucet - .await - .map_err(|err| anyhow!("failed to join handle task: {}", err))? - }; - - let fut_faucet_port = async move { - let faucet_port = faucet_port_rx - .await - .context("failed to receive faucet port")?; - - let faucet_health_checker = - HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string()); - faucet_health_checker.wait(None).await?; - - println!( - "Faucet is ready. Endpoint: http://{}:{}", - IP_LOCAL_HOST, faucet_port - ); - - Ok(faucet_port) - }; - - (fut_faucet_port, fut_faucet_finish) -} - -async fn start_all_services(test_dir: &Path) -> Result<()> { - // Step 1: spawn all services. - let (fut_node_api, fut_indexer_grpc, fut_node_finish) = start_node(test_dir)?; + // Phase 1: Start all services. + // Node + let (fut_node_api, fut_indexer_grpc, fut_node_finish) = services::node::start_node(test_dir)?; let fut_node_api = make_shared(fut_node_api); let fut_indexer_grpc = make_shared(fut_indexer_grpc); - let (fut_faucet, fut_faucet_finish) = start_faucet( + + // Faucet + let (fut_faucet, fut_faucet_finish) = services::faucet::start_faucet( test_dir.to_owned(), fut_node_api.clone(), fut_indexer_grpc.clone(), ); - let (res_node_api, res_indexer_grpc, res_faucet) = - tokio::join!(fut_node_api, fut_indexer_grpc, fut_faucet); - - // Step 2: wait for all services to be up. - res_node_api - .map_err(anyhow::Error::msg) - .context("failed to start node api")?; - res_indexer_grpc - .map_err(anyhow::Error::msg) - .context("failed to start node api")?; - res_faucet.context("failed to start faucet")?; + // Docker Network + let docker_network_name = format!("aptos-workspace-{}", instance_id); + let (fut_docker_network, fut_docker_network_clean_up) = + create_docker_network(shutdown.clone(), docker_network_name); + + // Indexer part 1: postgres db + let (fut_postgres, fut_postgres_finish, fut_postgres_clean_up) = + services::postgres::start_postgres( + shutdown.clone(), + fut_docker_network.clone(), + instance_id, + ); + let fut_postgres = make_shared(fut_postgres); - println!( - "Indexer API is ready. Endpoint: http://{}:0/", - IP_LOCAL_HOST + // Indexer part 2: processors + let (fut_all_processors_ready, fut_any_processor_finish) = start_all_processors( + fut_node_api.clone(), + fut_indexer_grpc.clone(), + fut_postgres.clone(), + ); + let fut_all_processors_ready = make_shared(fut_all_processors_ready); + + // Indexer part 3: indexer API + let (fut_indexer_api, fut_indexer_api_finish, fut_indexer_api_clean_up) = start_indexer_api( + instance_id, + shutdown.clone(), + fut_docker_network.clone(), + fut_postgres.clone(), + fut_all_processors_ready.clone(), ); - println!("ALL SERVICES STARTED SUCCESSFULLY"); - - // Step 3: wait for services to stop. - tokio::pin!(fut_node_finish); - tokio::pin!(fut_faucet_finish); + // Phase 2: Wait for all services to be up. + let all_services_up = async move { + tokio::try_join!( + fut_node_api.map_err(anyhow::Error::msg), + fut_indexer_grpc.map_err(anyhow::Error::msg), + fut_faucet, + fut_postgres.map_err(anyhow::Error::msg), + fut_all_processors_ready.map_err(anyhow::Error::msg), + fut_indexer_api, + ) + }; + let clean_up_all = async move { + eprintln!("Running shutdown steps"); + fut_indexer_api_clean_up.await; + fut_postgres_clean_up.await; + fut_docker_network_clean_up.await; + }; + tokio::select! { + _ = shutdown.cancelled() => { + clean_up_all.await; - let mut finished: u64 = 0; - while finished < 2 { - tokio::select! { - res = &mut fut_node_finish => { - if let Err(err) = res { - eprintln!("Node existed with error: {}", err); + return Ok(()) + } + res = all_services_up => { + match res.context("one or more services failed to start") { + Ok(_) => println!("ALL SERVICES UP"), + Err(err) => { + eprintln!("\nOne or more services failed to start, will run shutdown steps\n"); + clean_up_all.await; + + return Err(err) } - finished += 1; } - res = &mut fut_faucet_finish => { - if let Err(err) = res { - eprintln!("Faucet existed with error: {}", err); - } - finished += 1; + } + } + + // Phase 3: Wait for services to stop, which should only happen in case of an error, or + // the shutdown signal to be received. + tokio::select! { + _ = shutdown.cancelled() => (), + res = fut_node_finish => { + eprintln!("Node exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_faucet_finish => { + eprintln!("Faucet exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_postgres_finish => { + eprintln!("Postgres exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_any_processor_finish => { + eprintln!("One of the processors exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); + } + } + res = fut_indexer_api_finish => { + eprintln!("Indexer API exited unexpectedly"); + if let Err(err) = res { + eprintln!("Error: {}", err); } } } + clean_up_all.await; + Ok(()) } @@ -281,7 +183,9 @@ async fn main() -> Result<()> { println!("Test directory: {}", test_dir.path().display()); - start_all_services(test_dir.path()).await?; + run_all_services(test_dir.path()).await?; + + println!("Finished running all services"); Ok(()) } diff --git a/aptos-move/aptos-workspace-server/src/services/docker_common.rs b/aptos-move/aptos-workspace-server/src/services/docker_common.rs new file mode 100644 index 0000000000000..7315a0fd28391 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/docker_common.rs @@ -0,0 +1,353 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::common::make_shared; +use anyhow::{anyhow, bail, Context, Result}; +use aptos::node::local_testnet::docker; +use bollard::{ + container::{CreateContainerOptions, InspectContainerOptions, StartContainerOptions}, + network::CreateNetworkOptions, + secret::ContainerInspectResponse, + volume::CreateVolumeOptions, +}; +use std::{future::Future, sync::Arc}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; + +/// Creates a Docker network asynchronously and provides a cleanup task for network removal. +/// +/// A cancellation token can be used to signal an early shutdown, allowing the creation task to +/// abort without performing any additional steps. +/// +/// It returns two futures +/// 1. One that creates the Docker network +/// 2. Another that handles the cleanup (removal of the network) +/// +/// As the caller, you should always await the cleanup task when you are ready to shutdown the +/// service. The cleanup task will only attempt to remove the network if it may have been created. +/// +/// Note that the cleanup is a "best-effort" operation -- success is not guaranteed due to +/// reliance on external commands, which may fail for various reasons. +pub fn create_docker_network( + shutdown: CancellationToken, + name: String, +) -> ( + impl Future>> + Clone, + impl Future, +) { + // Flag indicating whether cleanup is needed. + // + // Note: The `Arc>` is used to satisfy Rust's borrow checking rules. + // Exclusive access is ensured by the sequencing of the futures. + let needs_cleanup = Arc::new(Mutex::new(false)); + + let fut_create_network = make_shared({ + let needs_cleanup = needs_cleanup.clone(); + let name = name.clone(); + + let handle = tokio::spawn(async move { + let docker = tokio::select! { + _ = shutdown.cancelled() => { + bail!("failed to create docker network: cancelled") + } + res = docker::get_docker() => { + res.context("failed to create docker network")? + } + }; + + *needs_cleanup.lock().await = true; + + docker + .create_network(CreateNetworkOptions { + name: name.clone(), + internal: false, + check_duplicate: true, + ..Default::default() + }) + .await + .context("failed to create docker network")?; + + println!("Created docker network {}", name); + + Ok(name) + }); + + async move { + handle + .await + .map_err(|err| anyhow!("failed to join task handle: {}", err))? + } + }); + + let fut_clean_up = { + // Note: The creation task must be allowed to finish, even if a shutdown signal or other + // early abort signal is received. This is to prevent race conditions. + // + // Do not abort the creation task prematurely -- let it either finish or handle its own abort. + let fut_create_network = fut_create_network.clone(); + + async move { + _ = fut_create_network.await; + + let network_name = name.as_str(); + let cleanup = async move { + if *needs_cleanup.lock().await { + let docker = docker::get_docker().await?; + docker.remove_network(network_name).await?; + } + + anyhow::Ok(()) + }; + + match cleanup.await { + Ok(_) => { + println!("Removed docker network {}", name); + }, + Err(err) => { + eprintln!("Failed to remove docker network {}: {}", name, err) + }, + } + } + }; + + (fut_create_network, fut_clean_up) +} + +/// Creates a Docker volume asynchronously and provides a cleanup task for volume removal. +/// +/// A cancellation token can be used to signal an early shutdown, allowing the creation task to +/// abort without performing any additional steps. +/// +/// It returns two futures +/// 1. One that creates the Docker volume +/// 2. Another that handles the cleanup (removal of the volume) +/// +/// As the caller, you should always await the cleanup task when you are ready to shutdown the +/// service. The cleanup task will only attempt to remove the volume if it may have been created. +/// +/// Note that the cleanup is a "best-effort" operation -- success is not guaranteed due to +/// success is not guaranteed due to the reliance on external commands, which may fail for +/// various reasons. +pub fn create_docker_volume( + shutdown: CancellationToken, + name: String, +) -> ( + impl Future>> + Clone, + impl Future, +) { + // Flag indicating whether cleanup is needed. + // + // Note: The `Arc>` is used to satisfy Rust's borrow checking rules. + // Exclusive access is ensured by the sequencing of the futures. + let needs_cleanup = Arc::new(Mutex::new(false)); + + let fut_create_volume = make_shared({ + let needs_cleanup = needs_cleanup.clone(); + let name = name.clone(); + + let handle = tokio::spawn(async move { + let docker = tokio::select! { + _ = shutdown.cancelled() => { + bail!("failed to create docker volume: cancelled") + } + res = docker::get_docker() => { + res.context("failed to create docker volume")? + } + }; + + *needs_cleanup.lock().await = true; + + docker + .create_volume(CreateVolumeOptions { + name: name.clone(), + ..Default::default() + }) + .await + .context("failed to create docker volume")?; + + println!("Created docker volume {}", name); + + Ok(name) + }); + + async move { + handle + .await + .map_err(|err| anyhow!("failed to join task handle: {}", err))? + } + }); + + let fut_clean_up = { + // Note: The creation task must be allowed to finish, even if a shutdown signal or other + // early abort signal is received. This is to prevent race conditions. + // + // Do not abort the creation task prematurely -- let it either finish or handle its own abort. + let fut_create_volume = fut_create_volume.clone(); + + async move { + _ = fut_create_volume.await; + + let volume_name = name.as_str(); + let cleanup = async move { + if *needs_cleanup.lock().await { + let docker = docker::get_docker().await?; + docker.remove_volume(volume_name, None).await?; + } + + anyhow::Ok(()) + }; + + match cleanup.await { + Ok(_) => { + println!("Removed docker volume {}", name); + }, + Err(err) => { + eprintln!("Failed to remove docker volume {}: {}", name, err) + }, + } + } + }; + + (fut_create_volume, fut_clean_up) +} + +/// Creates, starts, and inspects a Docker container asynchronously, and provides a cleanup task +/// for stopping and removing the container. +/// +/// A cancellation token can be used to signal an early shutdown, allowing the creation task to +/// abort without performing any additional steps. +/// +/// It returns two futures +/// 1. One that creates the container, starts it, and inspects it. +/// It resolves with the container's inspection info, such as port binding. +/// 2. Another that handles the cleanup (stopping and removing the container) +/// +/// As the caller, you should always await the cleanup task when you are ready to shutdown the +/// service. The cleanup task will only attempt to stop or remove the container if it may have +/// gotten past the respective states. +/// +/// Note that the cleanup is a "best-effort" operation -- success is not guaranteed due to +/// success is not guaranteed due to the reliance on external commands, which may fail for +/// various reasons. +pub fn create_start_and_inspect_container( + shutdown: CancellationToken, + options: CreateContainerOptions, + config: bollard::container::Config, +) -> ( + impl Future, Arc>> + Clone, + impl Future, +) { + #[derive(PartialEq, Eq, Clone, Copy)] + enum State { + Initial = 0, + Created = 1, + Started = 2, + } + + // Flag indicating the current stage of the creation task and which resources need + // to be cleaned up. + // + // Note: The `Arc>` is used to satisfy Rust's borrow checking rules. + // Exclusive access is ensured by the sequencing of the futures. + let state = Arc::new(Mutex::new(State::Initial)); + let name = options.name.clone(); + + let fut_run = make_shared({ + let state = state.clone(); + let name = name.clone(); + + let handle = tokio::spawn(async move { + let docker = tokio::select! { + _ = shutdown.cancelled() => { + bail!("failed to create docker container: cancelled") + } + res = docker::get_docker() => { + res.context("failed to create docker container")? + } + }; + + let mut state = state.lock().await; + + *state = State::Created; + docker + .create_container(Some(options), config) + .await + .context("failed to create docker container")?; + println!("Created docker container {}", name); + + if shutdown.is_cancelled() { + bail!("failed to start docker container: cancelled") + } + *state = State::Started; + docker + .start_container(&name, None::>) + .await + .context("failed to start docker container")?; + println!("Started docker container {}", name); + + if shutdown.is_cancelled() { + bail!("failed to inspect docker container: cancelled") + } + let container_info = docker + .inspect_container(&name, Some(InspectContainerOptions::default())) + .await + .context("failed to inspect postgres container")?; + + Ok(Arc::new(container_info)) + }); + + async move { + handle + .await + .map_err(|err| anyhow!("failed to join task handle: {}", err))? + } + }); + + let fut_clean_up = { + let fut_run = fut_run.clone(); + + async move { + // Note: The creation task must be allowed to finish, even if a shutdown signal or other + // early abort signal is received. This is to prevent race conditions. + // + // Do not abort the creation task prematurely -- let it either finish or handle its own abort. + _ = fut_run.await; + + let state = state.lock().await; + + if *state == State::Initial { + return; + } + + let docker = match docker::get_docker().await { + Ok(docker) => docker, + Err(err) => { + eprintln!("Failed to clean up docker container {}: {}", name, err); + return; + }, + }; + + if *state == State::Started { + match docker.stop_container(name.as_str(), None).await { + Ok(_) => { + println!("Stopped docker container {}", name) + }, + Err(err) => { + eprintln!("Failed to stop docker container {}: {}", name, err) + }, + } + } + + match docker.remove_container(name.as_str(), None).await { + Ok(_) => { + println!("Removed docker container {}", name) + }, + Err(err) => { + eprintln!("Failed to remove docker container {}: {}", name, err) + }, + } + } + }; + + (fut_run, fut_clean_up) +} diff --git a/aptos-move/aptos-workspace-server/src/services/faucet.rs b/aptos-move/aptos-workspace-server/src/services/faucet.rs new file mode 100644 index 0000000000000..d44c8a7d4bd71 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/faucet.rs @@ -0,0 +1,81 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::common::IP_LOCAL_HOST; +use anyhow::{anyhow, Context, Result}; +use aptos::node::local_testnet::HealthChecker; +use aptos_faucet_core::server::{FunderKeyEnum, RunConfig}; +use futures::channel::oneshot; +use std::{future::Future, path::PathBuf, sync::Arc}; +use url::Url; + +/// Starts the faucet service. +/// +/// Prerequisites +/// - Node API +/// - Node indexer grpc +/// +/// This function returns two futures +/// - One that resolves to the port the faucet service is running on, once it has fully started. +/// - One that resolves when the faucet service stops, which should not normally happen unless +/// there is an error. +pub fn start_faucet( + test_dir: PathBuf, + fut_node_api: impl Future>> + Send + 'static, + fut_indexer_grpc: impl Future>> + Send + 'static, +) -> ( + impl Future>, + impl Future> + 'static, +) { + let (faucet_port_tx, faucet_port_rx) = oneshot::channel(); + + let handle_faucet = tokio::spawn(async move { + let api_port = fut_node_api + .await + .map_err(anyhow::Error::msg) + .context("failed to start faucet: node api did not start successfully")?; + + fut_indexer_grpc + .await + .map_err(anyhow::Error::msg) + .context("failed to start faucet: indexer grpc did not start successfully")?; + + println!("Starting faucet.."); + + let faucet_run_config = RunConfig::build_for_cli( + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(), + IP_LOCAL_HOST.to_string(), + 0, + FunderKeyEnum::KeyFile(test_dir.join("mint.key")), + false, + None, + ); + + faucet_run_config.run_and_report_port(faucet_port_tx).await + }); + + let fut_faucet_finish = async move { + handle_faucet + .await + .map_err(|err| anyhow!("failed to join task handle: {}", err))? + }; + + let fut_faucet_port = async move { + let faucet_port = faucet_port_rx + .await + .context("failed to receive faucet port")?; + + let faucet_health_checker = + HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string()); + faucet_health_checker.wait(None).await?; + + println!( + "Faucet is ready. Endpoint: http://{}:{}", + IP_LOCAL_HOST, faucet_port + ); + + Ok(faucet_port) + }; + + (fut_faucet_port, fut_faucet_finish) +} diff --git a/aptos-move/aptos-workspace-server/src/services/indexer_api.rs b/aptos-move/aptos-workspace-server/src/services/indexer_api.rs new file mode 100644 index 0000000000000..159470709912b --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/indexer_api.rs @@ -0,0 +1,256 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{ + docker_common::create_start_and_inspect_container, + postgres::get_postgres_connection_string_within_docker_network, +}; +use crate::common::{make_shared, IP_LOCAL_HOST}; +use anyhow::{anyhow, Context, Result}; +use aptos::node::local_testnet::{ + docker, + indexer_api::{post_metadata, HASURA_IMAGE, HASURA_METADATA}, + HealthChecker, +}; +use bollard::{ + container::{CreateContainerOptions, WaitContainerOptions}, + secret::{ContainerInspectResponse, HostConfig, PortBinding}, +}; +use futures::TryStreamExt; +use maplit::hashmap; +use std::{future::Future, sync::Arc}; +use tokio::{sync::Mutex, try_join}; +use tokio_util::sync::CancellationToken; +use url::Url; +use uuid::Uuid; + +const HASURA_DEFAULT_PORT: u16 = 8080; + +/// Extracts the host port assigned to the hasura container from its inspection data. +fn get_hasura_assigned_port(container_info: &ContainerInspectResponse) -> Option { + if let Some(port_bindings) = container_info + .network_settings + .as_ref() + .and_then(|ns| ns.ports.as_ref()) + { + if let Some(Some(bindings)) = port_bindings.get(&format!("{}/tcp", HASURA_DEFAULT_PORT)) { + if let Some(binding) = bindings.first() { + return binding + .host_port + .as_ref() + .and_then(|port| port.parse::().ok()); + } + } + } + None +} + +/// Returns the Docker container options and configuration to start a hasura container with +/// - The container bound to a Docker network (`network_name`). +/// - [`HASURA_DEFAULT_PORT`] mapped to a random OS-assigned host port. +fn create_container_options_and_config( + instance_id: Uuid, + network_name: String, +) -> ( + CreateContainerOptions, + bollard::container::Config, +) { + let postgres_connection_string = + get_postgres_connection_string_within_docker_network(instance_id); + + let host_config = HostConfig { + // Connect the container to the network we made in the postgres pre_run. + // This allows the indexer API to access the postgres container without + // routing through the host network. + network_mode: Some(network_name), + // This is necessary so connecting to the host postgres works on Linux. + extra_hosts: Some(vec!["host.docker.internal:host-gateway".to_string()]), + port_bindings: Some(hashmap! { + HASURA_DEFAULT_PORT.to_string() => Some(vec![PortBinding { + host_ip: Some(IP_LOCAL_HOST.to_string()), + host_port: None, + }]), + }), + ..Default::default() + }; + + let config = bollard::container::Config { + image: Some(HASURA_IMAGE.to_string()), + tty: Some(true), + exposed_ports: Some(hashmap! {HASURA_DEFAULT_PORT.to_string() => hashmap!{}}), + host_config: Some(host_config), + env: Some(vec![ + format!("PG_DATABASE_URL={}", postgres_connection_string), + format!( + "HASURA_GRAPHQL_METADATA_DATABASE_URL={}", + postgres_connection_string + ), + format!("INDEXER_V2_POSTGRES_URL={}", postgres_connection_string), + "HASURA_GRAPHQL_DEV_MODE=true".to_string(), + "HASURA_GRAPHQL_ENABLE_CONSOLE=true".to_string(), + // See the docs for the image, this is a magic path inside the + // container where they have already bundled in the UI assets. + "HASURA_GRAPHQL_CONSOLE_ASSETS_DIR=/srv/console-assets".to_string(), + format!("HASURA_GRAPHQL_SERVER_PORT={}", HASURA_DEFAULT_PORT), + ]), + ..Default::default() + }; + + let options = CreateContainerOptions { + name: format!("aptos-workspace-{}-indexer-api", instance_id), + ..Default::default() + }; + + (options, config) +} + +/// Starts the indexer API service, running in a docker container. +/// +/// Prerequisites +/// - Previous task to create the docker network +/// - Needs to be the same one the postgres container connects to +/// - Postgres DB (container) +/// - Indexer processors +/// +/// The function returns three futures: +/// - One that resolves to the host port that can be used to access the indexer API service +/// when it's fully up. +/// - One that resolves when the container stops (which it should not under normal operation). +/// - A cleanup task that stops the container and removes the associated data volume. +/// +/// As the caller, you should always await the cleanup task when you are ready to shutdown the +/// service. The cleanup is a "best-effort" operation -- success is not guaranteed +/// as it relies on external commands that may fail for various reasons. +pub fn start_indexer_api( + instance_id: Uuid, + shutdown: CancellationToken, + fut_docker_network: impl Future>> + + Clone + + Send + + 'static, + fut_postgres: impl Future>> + Clone + Send + 'static, + fut_all_processors_ready: impl Future>> + + Clone + + Send + + 'static, +) -> ( + impl Future>, + impl Future>, + impl Future, +) { + let fut_container_clean_up = Arc::new(Mutex::new(None)); + + let fut_create_indexer_api = make_shared({ + let fut_container_clean_up = fut_container_clean_up.clone(); + + async move { + let (docker_network_name, _postgres_port, _) = try_join!( + fut_docker_network, + fut_postgres, + fut_all_processors_ready + ) + .map_err(anyhow::Error::msg) + .context( + "failed to start indexer api server: one or more dependencies failed to start", + )?; + + println!("Starting indexer API.."); + + let (options, config) = + create_container_options_and_config(instance_id, docker_network_name); + let (fut_container, fut_container_cleanup) = + create_start_and_inspect_container(shutdown.clone(), options, config); + *fut_container_clean_up.lock().await = Some(fut_container_cleanup); + + let container_info = fut_container + .await + .map_err(anyhow::Error::msg) + .context("failed to start indexer api server")?; + + let indexer_api_port = get_hasura_assigned_port(&container_info) + .ok_or_else(|| anyhow!("failed to get indexer api server port"))?; + + anyhow::Ok(indexer_api_port) + } + }); + + let fut_indexer_api_port = { + let fut_create_indexer_api = fut_create_indexer_api.clone(); + + async move { + let indexer_api_port = fut_create_indexer_api.await.map_err(anyhow::Error::msg)?; + + let url = + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_api_port)).unwrap(); + + // The first health checker waits for the service to be up at all. + let health_checker = HealthChecker::Http(url.clone(), "Indexer API".to_string()); + health_checker + .wait(None) + .await + .context("failed to wait for indexer API to be ready")?; + + println!("Indexer API is up, applying hasura metadata.."); + + // Apply the hasura metadata, with the second health checker waiting for it to succeed. + post_metadata(url.clone(), HASURA_METADATA) + .await + .context("failed to apply hasura metadata")?; + + let health_checker_metadata = HealthChecker::IndexerApiMetadata(url); + health_checker_metadata + .wait(None) + .await + .context("failed to wait for indexer API to be ready")?; + + println!( + "Indexer API is ready. Endpoint: http://{}:{}/", + IP_LOCAL_HOST, indexer_api_port + ); + + anyhow::Ok(indexer_api_port) + } + }; + + let fut_indexer_api_finish = async move { + let docker = docker::get_docker() + .await + .context("failed to wait on indexer api container")?; + + // Wait for the container to stop (which it shouldn't). + let _wait = docker + .wait_container( + &format!("aptos-workspace-{}-indexer-api", instance_id), + Some(WaitContainerOptions { + condition: "not-running", + }), + ) + .try_collect::>() + .await + .context("failed to wait on indexer api container")?; + + anyhow::Ok(()) + }; + + let fut_indexer_api_clean_up = { + // Note: The creation task must be allowed to finish, even if a shutdown signal or other + // early abort signal is received. This is to prevent race conditions. + // + // Do not abort the creation task prematurely -- let it either finish or handle its own abort. + let fut_create_indexer_api = fut_create_indexer_api.clone(); + + async move { + _ = fut_create_indexer_api.await; + + if let Some(fut_container_clean_up) = fut_container_clean_up.lock().await.take() { + fut_container_clean_up.await; + } + } + }; + + ( + fut_indexer_api_port, + fut_indexer_api_finish, + fut_indexer_api_clean_up, + ) +} diff --git a/aptos-move/aptos-workspace-server/src/services/mod.rs b/aptos-move/aptos-workspace-server/src/services/mod.rs new file mode 100644 index 0000000000000..5752f990549a8 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/mod.rs @@ -0,0 +1,9 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod docker_common; +pub mod faucet; +pub mod indexer_api; +pub mod node; +pub mod postgres; +pub mod processors; diff --git a/aptos-move/aptos-workspace-server/src/services/node.rs b/aptos-move/aptos-workspace-server/src/services/node.rs new file mode 100644 index 0000000000000..0ceb165606426 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/node.rs @@ -0,0 +1,150 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::common::IP_LOCAL_HOST; +use anyhow::{bail, Result}; +use aptos::node::local_testnet::HealthChecker; +use aptos_config::config::{NodeConfig, TableInfoServiceMode}; +use aptos_node::{load_node_config, start_and_report_ports}; +use aptos_types::network_address::{NetworkAddress, Protocol}; +use futures::channel::oneshot; +use rand::{rngs::StdRng, SeedableRng}; +use std::{future::Future, path::Path, thread, time::Duration}; +use url::Url; + +/// Sets all ports in the node config to zero so the OS can assign them random ones. +fn zero_all_ports(config: &mut NodeConfig) { + // TODO: Double check if all ports are covered. + + config.admin_service.port = 0; + config.api.address.set_port(0); + config.inspection_service.port = 0; + config.storage.backup_service_address.set_port(0); + config.indexer_grpc.address.set_port(0); + + if let Some(network) = config.validator_network.as_mut() { + network.listen_address = NetworkAddress::from_protocols(vec![ + Protocol::Ip4("0.0.0.0".parse().unwrap()), + Protocol::Tcp(0), + ]) + .unwrap(); + } + for network in config.full_node_networks.iter_mut() { + network.listen_address = NetworkAddress::from_protocols(vec![ + Protocol::Ip4("0.0.0.0".parse().unwrap()), + Protocol::Tcp(0), + ]) + .unwrap(); + } +} + +/// Returns the URL for connecting to the indexer grpc service. +/// +/// Note: This can only be used by clients running directly on the host machine, +/// not from within a docker container. +pub fn get_data_service_url(indexer_grpc_port: u16) -> Url { + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_grpc_port)).unwrap() +} + +/// Starts a local node and returns three futures: +/// - A future for the node API, which resolves to the port number once the service is fully up. +/// - A future for the indexer gRPC, which resolves to the port number once the service is fully up. +/// - A future that resolves when the node stops, which should not normally happen unless there is +/// an error. +pub fn start_node( + test_dir: &Path, +) -> Result<( + impl Future>, + impl Future>, + impl Future>, +)> { + let rng = StdRng::from_entropy(); + + let mut node_config = load_node_config( + &None, + &None, + test_dir, + false, + false, + false, + aptos_cached_packages::head_release_bundle(), + rng, + )?; + + zero_all_ports(&mut node_config); + node_config.indexer_grpc.enabled = true; + node_config.indexer_grpc.use_data_service_interface = true; + + node_config.indexer_table_info.table_info_service_mode = TableInfoServiceMode::IndexingOnly; + + node_config.api.address.set_ip(IP_LOCAL_HOST); + node_config.indexer_grpc.address.set_ip(IP_LOCAL_HOST); + + node_config.admin_service.address = IP_LOCAL_HOST.to_string(); + node_config.inspection_service.address = IP_LOCAL_HOST.to_string(); + + let (api_port_tx, api_port_rx) = oneshot::channel(); + let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel(); + + let run_node = { + println!("Starting node.."); + + let test_dir = test_dir.to_owned(); + let node_config = node_config.clone(); + move || -> Result<()> { + start_and_report_ports( + node_config, + Some(test_dir.join("validator.log")), + false, + Some(api_port_tx), + Some(indexer_grpc_port_tx), + ) + } + }; + + let node_thread_handle = thread::spawn(run_node); + + let fut_node_finish = async move { + // Note: we cannot join the thread here because that will cause the future to block, + // preventing the runtime from existing. + loop { + if node_thread_handle.is_finished() { + bail!("node finished unexpectedly"); + } + tokio::time::sleep(Duration::from_millis(200)).await; + } + }; + + let fut_api = async move { + let api_port = api_port_rx.await?; + + let api_health_checker = HealthChecker::NodeApi( + Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(), + ); + api_health_checker.wait(None).await?; + + println!( + "Node API is ready. Endpoint: http://{}:{}/", + IP_LOCAL_HOST, api_port + ); + + Ok(api_port) + }; + + let fut_indexer_grpc = async move { + let indexer_grpc_port = indexer_grpc_port_rx.await?; + + let indexer_grpc_health_checker = + HealthChecker::DataServiceGrpc(get_data_service_url(indexer_grpc_port)); + + indexer_grpc_health_checker.wait(None).await?; + println!( + "Transaction stream is ready. Endpoint: http://{}:{}/", + IP_LOCAL_HOST, indexer_grpc_port + ); + + Ok(indexer_grpc_port) + }; + + Ok((fut_api, fut_indexer_grpc, fut_node_finish)) +} diff --git a/aptos-move/aptos-workspace-server/src/services/postgres.rs b/aptos-move/aptos-workspace-server/src/services/postgres.rs new file mode 100644 index 0000000000000..1a84798a626d1 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/postgres.rs @@ -0,0 +1,263 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::docker_common::{create_docker_volume, create_start_and_inspect_container}; +use crate::common::{make_shared, IP_LOCAL_HOST}; +use anyhow::{anyhow, Context, Result}; +use aptos::node::local_testnet::{docker, HealthChecker}; +use bollard::{ + container::{CreateContainerOptions, WaitContainerOptions}, + secret::{ContainerInspectResponse, HostConfig, PortBinding}, +}; +use futures::TryStreamExt; +use maplit::hashmap; +use std::{future::Future, sync::Arc}; +use tokio::{sync::Mutex, try_join}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +const POSTGRES_DEFAULT_PORT: u16 = 5432; +const POSTGRES_IMAGE: &str = "postgres:14.11"; +const POSTGRES_USER: &str = "postgres"; +const POSTGRES_DB_NAME: &str = "local-testnet"; + +/// Extracts the host port assigned to the postgres container from its inspection data. +fn get_postgres_assigned_port(container_info: &ContainerInspectResponse) -> Option { + if let Some(port_bindings) = container_info + .network_settings + .as_ref() + .and_then(|ns| ns.ports.as_ref()) + { + if let Some(Some(bindings)) = port_bindings.get(&format!("{}/tcp", POSTGRES_DEFAULT_PORT)) { + if let Some(binding) = bindings.first() { + return binding + .host_port + .as_ref() + .and_then(|port| port.parse::().ok()); + } + } + } + None +} + +/// Constructs a connection string for accessing the postgres database from the host. +/// +/// Note: This connection string is intended for use on the host only. +/// If you want to access the database from within another docker container, +/// use [`get_postgres_connection_string_within_docker_network`] instead. +pub fn get_postgres_connection_string(postgres_port: u16) -> String { + format!( + "postgres://{}@{}:{}/{}", + POSTGRES_USER, IP_LOCAL_HOST, postgres_port, POSTGRES_DB_NAME + ) +} + +/// Constructs a connection string for accessing the postgres database from within +/// another docker container. +/// +/// Note: This connection string is intended for use within a docker network only. +/// If you want to access the database from clients running on the host directly, +/// use [`get_postgres_connection_string`] instead. +pub fn get_postgres_connection_string_within_docker_network(instance_id: Uuid) -> String { + format!( + "postgres://{}@aptos-workspace-{}-postgres:{}/{}", + POSTGRES_USER, instance_id, POSTGRES_DEFAULT_PORT, POSTGRES_DB_NAME + ) +} + +/// Returns the Docker container options and configuration to start a postgres container with +/// - The container bound to a Docker network (`network_name`). +/// - [`POSTGRES_DEFAULT_PORT`] mapped to a random OS-assigned host port. +/// - A volume (`volume_name`) mounted for data storage. +/// - Environment variables to configure postgres with +/// - Authentication method: trust +/// - User: [`POSTGRES_USER`], +/// - Database: [`POSTGRES_DB_NAME`]. +fn create_container_options_and_config( + instance_id: Uuid, + network_name: String, + volume_name: String, +) -> ( + CreateContainerOptions, + bollard::container::Config, +) { + let host_config = Some(HostConfig { + // Bind the container to the network we created in the pre_run. This does + // not prevent the binary in the container from exposing itself to the host + // on 127.0.0.1. See more here: https://stackoverflow.com/a/77432636/3846032. + network_mode: Some(network_name.clone()), + port_bindings: Some(hashmap! { + POSTGRES_DEFAULT_PORT.to_string() => Some(vec![PortBinding { + host_ip: Some(IP_LOCAL_HOST.to_string()), + host_port: None, + }]), + }), + // Mount the volume in to the container. We use a volume because they are + // more performant and easier to manage via the Docker API. + binds: Some(vec![format!("{}:/var/lib/postgresql/data", volume_name)]), + ..Default::default() + }); + + let config = bollard::container::Config { + image: Some(POSTGRES_IMAGE.to_string()), + // We set this to false so the container keeps running after the CLI + // shuts down by default. We manually kill the container if applicable, + // for example if the user set --force-restart. + tty: Some(false), + exposed_ports: Some(hashmap! {POSTGRES_DEFAULT_PORT.to_string() => hashmap!{}}), + host_config, + env: Some(vec![ + // We run postgres without any auth + no password. + "POSTGRES_HOST_AUTH_METHOD=trust".to_string(), + format!("POSTGRES_USER={}", POSTGRES_USER), + format!("POSTGRES_DB={}", POSTGRES_DB_NAME), + // This tells where postgres to store the DB data on disk. This is the + // directory inside the container that is mounted from the host system. + // format!("PGDATA={}", DATA_PATH_IN_CONTAINER), + ]), + cmd: Some( + vec![ + "postgres", + "-c", + // The default is 100 as of Postgres 14.11. Given the localnet + // can be composed of many different processors all with their own + // connection pools, 100 is insufficient. + "max_connections=200", + "-c", + // The default is 128MB as of Postgres 14.11. We 2x that value to + // match the fact that we 2x'd max_connections. + "shared_buffers=256MB", + ] + .into_iter() + .map(|s| s.to_string()) + .collect(), + ), + ..Default::default() + }; + + let options = CreateContainerOptions { + name: format!("aptos-workspace-{}-postgres", instance_id), + ..Default::default() + }; + + (options, config) +} + +/// Starts a postgres container within a docker network. +/// +/// Prerequisites +/// - This depends on a previous task to create the docker network +/// +/// The function returns three futures: +/// - One that resolves to the host port that can be used to access the postgres container when +/// it's fully up. +/// - One that resolves when the container stops (which it should not under normal operation). +/// - A cleanup task that stops the container and removes the associated data volume. +/// +/// As the caller, you should always await the cleanup task when you are ready to shutdown the +/// service. The cleanup is a "best-effort" operation -- success is not guaranteed +/// as it relies on external commands that may fail for various reasons. +pub fn start_postgres( + shutdown: CancellationToken, + fut_network: impl Future>>, + instance_id: Uuid, +) -> ( + impl Future>, + impl Future>, + impl Future, +) { + println!("Starting postgres.."); + + let volume_name = format!("aptos-workspace-{}-postgres", instance_id); + let (fut_volume, fut_volume_clean_up) = create_docker_volume(shutdown.clone(), volume_name); + + let fut_container_clean_up = Arc::new(Mutex::new(None)); + + let fut_create_postgres = make_shared({ + let fut_container_clean_up = fut_container_clean_up.clone(); + + async move { + let (network_name, volume_name) = try_join!(fut_network, fut_volume) + .map_err(anyhow::Error::msg) + .context("failed to start postgres: one or more dependencies failed to start")?; + + let (options, config) = + create_container_options_and_config(instance_id, network_name, volume_name); + let (fut_container, fut_container_cleanup) = + create_start_and_inspect_container(shutdown.clone(), options, config); + *fut_container_clean_up.lock().await = Some(fut_container_cleanup); + + let container_info = fut_container + .await + .map_err(anyhow::Error::msg) + .context("failed to start postgres")?; + + let postgres_port = get_postgres_assigned_port(&container_info) + .ok_or_else(|| anyhow!("failed to get postgres port"))?; + + anyhow::Ok(postgres_port) + } + }); + + let fut_postgres_port = { + let fut_create_postgres = fut_create_postgres.clone(); + + async move { + let postgres_port = fut_create_postgres.await.map_err(anyhow::Error::msg)?; + + let health_checker = + HealthChecker::Postgres(get_postgres_connection_string(postgres_port)); + health_checker.wait(None).await?; + + println!( + "Postgres is ready. Endpoint: http://{}:{}", + IP_LOCAL_HOST, postgres_port + ); + + anyhow::Ok(postgres_port) + } + }; + + let fut_postgres_finish = async move { + let docker = docker::get_docker() + .await + .context("failed to wait on postgres container")?; + + // Wait for the container to stop (which it shouldn't). + let _wait = docker + .wait_container( + &format!("aptos-workspace-{}-postgres", instance_id), + Some(WaitContainerOptions { + condition: "not-running", + }), + ) + .try_collect::>() + .await + .context("failed to wait on postgres container")?; + + anyhow::Ok(()) + }; + + let fut_postgres_clean_up = { + // Note: The creation task must be allowed to finish, even if a shutdown signal or other + // early abort signal is received. This is to prevent race conditions. + // + // Do not abort the creation task prematurely -- let it either finish or handle its own abort. + let fut_create_postgres = fut_create_postgres.clone(); + + async move { + _ = fut_create_postgres.await; + + if let Some(fut_container_clean_up) = fut_container_clean_up.lock().await.take() { + fut_container_clean_up.await; + } + fut_volume_clean_up.await; + } + }; + + ( + fut_postgres_port, + fut_postgres_finish, + fut_postgres_clean_up, + ) +} diff --git a/aptos-move/aptos-workspace-server/src/services/processors.rs b/aptos-move/aptos-workspace-server/src/services/processors.rs new file mode 100644 index 0000000000000..97a0dc38a64c0 --- /dev/null +++ b/aptos-move/aptos-workspace-server/src/services/processors.rs @@ -0,0 +1,198 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{node::get_data_service_url, postgres::get_postgres_connection_string}; +use crate::common::make_shared; +use anyhow::{anyhow, Context, Result}; +use aptos::node::local_testnet::{processors::get_processor_config, HealthChecker}; +use diesel::Connection; +use diesel_async::{async_connection_wrapper::AsyncConnectionWrapper, pg::AsyncPgConnection}; +use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt}; +use processor::{ + gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorName, + utils::database::run_pending_migrations, IndexerGrpcProcessorConfig, +}; +use server_framework::RunnableConfig; +use std::{future::Future, sync::Arc}; +use tokio::try_join; + +/// Names of the processors to enable in the local network. +const PROCESSOR_NAMES: &[ProcessorName] = { + use ProcessorName::*; + + &[ + AccountTransactionsProcessor, + DefaultProcessor, + EventsProcessor, + FungibleAssetProcessor, + ObjectsProcessor, + StakeProcessor, + TokenV2Processor, + TransactionMetadataProcessor, + UserTransactionProcessor, + ] +}; + +/// Starts a single processor. +/// +/// Needs to await a task to bring up the prerequisite services and perform the DB migration, +/// shared among all processors. +/// +/// The function returns two futures: +/// - One that resolves when the processor is up. +/// - One that resolves when the processor stops (which it should not under normal operation). +fn start_processor( + fut_prerequisites: &(impl Future>> + + Clone + + Send + + 'static), + processor_name: &ProcessorName, +) -> ( + impl Future>, + impl Future>, +) { + let fut_prerequisites_ = fut_prerequisites.clone(); + let processor_name_ = processor_name.to_owned(); + let handle_processor = tokio::spawn(async move { + let (postgres_port, indexer_grpc_port) = + fut_prerequisites_.await.map_err(anyhow::Error::msg)?; + + println!("Starting processor {}..", processor_name_); + + let config = IndexerGrpcProcessorConfig { + processor_config: get_processor_config(&processor_name_)?, + postgres_connection_string: get_postgres_connection_string(postgres_port), + indexer_grpc_data_service_address: get_data_service_url(indexer_grpc_port), + + auth_token: "notused".to_string(), + grpc_http2_config: Default::default(), + starting_version: None, + ending_version: None, + number_concurrent_processing_tasks: None, + enable_verbose_logging: None, + // The default at the time of writing is 30 but we don't need that + // many in a localnet environment. + db_pool_size: Some(8), + gap_detection_batch_size: 50, + pb_channel_txn_chunk_size: 100_000, + per_table_chunk_sizes: Default::default(), + transaction_filter: Default::default(), + grpc_response_item_timeout_in_secs: 10, + deprecated_tables: Default::default(), + parquet_gap_detection_batch_size: DEFAULT_GAP_DETECTION_BATCH_SIZE, + }; + + config.run().await + }); + + let fut_processor_finish = async move { + handle_processor + .await + .map_err(|err| anyhow!("failed to join task handle: {}", err))? + }; + + let fut_prerequisites_ = fut_prerequisites.clone(); + let processor_name_ = processor_name.to_owned(); + let fut_processor_ready = async move { + let (postgres_port, _indexer_grpc_port) = + fut_prerequisites_.await.map_err(anyhow::Error::msg)?; + + let processor_health_checker = HealthChecker::Processor( + get_postgres_connection_string(postgres_port), + processor_name_.to_string(), + ); + + processor_health_checker.wait(None).await?; + + println!("Processor {} is ready.", processor_name_); + + Ok(()) + }; + + (fut_processor_ready, fut_processor_finish) +} + +/// Starts the indexer processor services. See [`PROCESSOR_NAMES`] for the full list. +/// +/// Prerequisites +/// - Node API +/// - Node indexer gRPC +/// - Postgres DB +/// +/// The function returns two futures: +/// - One that resolves when all processors are up. +/// - One that resolves when any of the processors stops (which it should not under normal operation). +pub fn start_all_processors( + fut_node_api: impl Future>> + Clone + Send + 'static, + fut_indexer_grpc: impl Future>> + Clone + Send + 'static, + fut_postgres: impl Future>> + Clone + Send + 'static, +) -> ( + impl Future>, + impl Future>, +) { + let fut_migration = async move { + let postgres_port = fut_postgres + .await + .map_err(anyhow::Error::msg) + .context("failed to run migration: postgres did not start successfully")?; + + println!("Starting migration.."); + + let connection_string = get_postgres_connection_string(postgres_port); + + tokio::task::spawn_blocking(move || { + // This lets us use the connection like a normal diesel connection. See more: + // https://docs.rs/diesel-async/latest/diesel_async/async_connection_wrapper/type.AsyncConnectionWrapper.html + let mut conn: AsyncConnectionWrapper = + AsyncConnectionWrapper::establish(&connection_string).with_context(|| { + format!("Failed to connect to postgres at {}", connection_string) + })?; + run_pending_migrations(&mut conn); + anyhow::Ok(()) + }) + .await + .map_err(|err| anyhow!("failed to join task handle: {}", err))??; + + println!("Migration done."); + + Ok(postgres_port) + }; + + let fut_prerequisites = make_shared::<_, _, anyhow::Error>(async move { + let (_node_api_port, indexer_grpc_port, postgres_port) = try_join!( + fut_node_api, + fut_indexer_grpc, + fut_migration + ) + .map_err(anyhow::Error::msg) + .context( + "failed to start processors: one or more prerequisites did not start successfully", + )?; + + Ok((postgres_port, indexer_grpc_port)) + }); + + let mut futs_ready = vec![]; + let mut futs_finish = vec![]; + + for processor_name in PROCESSOR_NAMES { + let (fut_ready, fut_finish) = start_processor(&fut_prerequisites, processor_name); + + futs_ready.push(fut_ready); + futs_finish.push(fut_finish); + } + + let fut_all_processors_ready = async move { + try_join_all(futs_ready) + .await + .map_err(|err| err.context("one or more processors did not start successfully"))?; + Ok(()) + }; + + let fut_any_processor_finish = async move { + let mut futs: FuturesUnordered<_> = futs_finish.into_iter().collect(); + futs.next().await.expect("there must be at least 1 future") + }; + + (fut_all_processors_ready, fut_any_processor_finish) +} diff --git a/crates/aptos/src/node/local_testnet/indexer_api.rs b/crates/aptos/src/node/local_testnet/indexer_api.rs index 8ff415a879605..7c9dc6dafe3ed 100644 --- a/crates/aptos/src/node/local_testnet/indexer_api.rs +++ b/crates/aptos/src/node/local_testnet/indexer_api.rs @@ -24,7 +24,7 @@ use std::{collections::HashSet, path::PathBuf, time::Duration}; use tracing::{info, warn}; const INDEXER_API_CONTAINER_NAME: &str = "local-testnet-indexer-api"; -const HASURA_IMAGE: &str = "hasura/graphql-engine:v2.44.0-ce"; +pub const HASURA_IMAGE: &str = "hasura/graphql-engine:v2.44.0-ce"; /// This Hasura metadata originates from the aptos-indexer-processors repo. /// @@ -42,7 +42,7 @@ const HASURA_IMAGE: &str = "hasura/graphql-engine:v2.44.0-ce"; /// This works fine today since all the key processors you'd need in a localnet /// are in the set of processors written in Rust. If this changes, we can explore /// alternatives, e.g. running processors in other languages using containers. -const HASURA_METADATA: &str = include_str!("hasura_metadata.json"); +pub const HASURA_METADATA: &str = include_str!("hasura_metadata.json"); /// Args related to running an indexer API for the localnet. #[derive(Debug, Parser)] @@ -337,7 +337,7 @@ impl ServiceManager for IndexerApiManager { } /// This submits a POST request to apply metadata to a Hasura API. -async fn post_metadata(url: Url, metadata_content: &str) -> Result<()> { +pub async fn post_metadata(url: Url, metadata_content: &str) -> Result<()> { // Parse the metadata content as JSON. let metadata_json: serde_json::Value = serde_json::from_str(metadata_content)?; diff --git a/crates/aptos/src/node/local_testnet/mod.rs b/crates/aptos/src/node/local_testnet/mod.rs index 638b44b3696e2..871a31a133aa7 100644 --- a/crates/aptos/src/node/local_testnet/mod.rs +++ b/crates/aptos/src/node/local_testnet/mod.rs @@ -1,18 +1,18 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -mod docker; -mod indexer_api; mod logging; mod postgres; -mod processors; mod ready_server; mod utils; // This is to allow external crates to use the localnode. +pub mod docker; pub mod faucet; pub mod health_checker; +pub mod indexer_api; pub mod node; +pub mod processors; pub mod traits; use self::{ diff --git a/crates/aptos/src/node/local_testnet/processors.rs b/crates/aptos/src/node/local_testnet/processors.rs index f9654f9c662b8..4be69ac2e5bf2 100644 --- a/crates/aptos/src/node/local_testnet/processors.rs +++ b/crates/aptos/src/node/local_testnet/processors.rs @@ -57,6 +57,64 @@ pub struct ProcessorManager { prerequisite_health_checkers: HashSet, } +pub fn get_processor_config(processor_name: &ProcessorName) -> Result { + Ok(match processor_name { + ProcessorName::AccountTransactionsProcessor => { + ProcessorConfig::AccountTransactionsProcessor + }, + ProcessorName::AnsProcessor => { + bail!("ANS processor is not supported in the localnet") + }, + ProcessorName::DefaultProcessor => ProcessorConfig::DefaultProcessor, + ProcessorName::EventsProcessor => ProcessorConfig::EventsProcessor, + ProcessorName::FungibleAssetProcessor => ProcessorConfig::FungibleAssetProcessor, + ProcessorName::MonitoringProcessor => { + bail!("Monitoring processor is not supported in the localnet") + }, + ProcessorName::NftMetadataProcessor => { + bail!("NFT Metadata processor is not supported in the localnet") + }, + ProcessorName::ObjectsProcessor => { + ProcessorConfig::ObjectsProcessor(ObjectsProcessorConfig { + query_retries: Default::default(), + query_retry_delay_ms: Default::default(), + }) + }, + ProcessorName::ParquetDefaultProcessor => { + bail!("ParquetDefaultProcessor is not supported in the localnet") + }, + ProcessorName::ParquetFungibleAssetProcessor => { + bail!("ParquetFungibleAssetProcessor is not supported in the localnet") + }, + ProcessorName::ParquetTransactionMetadataProcessor => { + bail!("ParquetTransactionMetadataProcessor is not supported in the localnet") + }, + ProcessorName::ParquetAnsProcessor => { + bail!("ParquetAnsProcessor is not supported in the localnet") + }, + ProcessorName::ParquetEventsProcessor => { + bail!("ParquetEventsProcessor is not supported in the localnet") + }, + ProcessorName::ParquetTokenV2Processor => { + bail!("ParquetTokenV2Processor is not supported in the localnet") + }, + ProcessorName::StakeProcessor => ProcessorConfig::StakeProcessor(StakeProcessorConfig { + query_retries: Default::default(), + query_retry_delay_ms: Default::default(), + }), + ProcessorName::TokenV2Processor => { + ProcessorConfig::TokenV2Processor(TokenV2ProcessorConfig { + query_retries: Default::default(), + query_retry_delay_ms: Default::default(), + }) + }, + ProcessorName::TransactionMetadataProcessor => { + ProcessorConfig::TransactionMetadataProcessor + }, + ProcessorName::UserTransactionProcessor => ProcessorConfig::UserTransactionProcessor, + }) +} + impl ProcessorManager { fn new( processor_name: &ProcessorName, @@ -64,63 +122,7 @@ impl ProcessorManager { data_service_url: Url, postgres_connection_string: String, ) -> Result { - let processor_config = match processor_name { - ProcessorName::AccountTransactionsProcessor => { - ProcessorConfig::AccountTransactionsProcessor - }, - ProcessorName::AnsProcessor => { - bail!("ANS processor is not supported in the localnet") - }, - ProcessorName::DefaultProcessor => ProcessorConfig::DefaultProcessor, - ProcessorName::EventsProcessor => ProcessorConfig::EventsProcessor, - ProcessorName::FungibleAssetProcessor => ProcessorConfig::FungibleAssetProcessor, - ProcessorName::MonitoringProcessor => { - bail!("Monitoring processor is not supported in the localnet") - }, - ProcessorName::NftMetadataProcessor => { - bail!("NFT Metadata processor is not supported in the localnet") - }, - ProcessorName::ObjectsProcessor => { - ProcessorConfig::ObjectsProcessor(ObjectsProcessorConfig { - query_retries: Default::default(), - query_retry_delay_ms: Default::default(), - }) - }, - ProcessorName::ParquetDefaultProcessor => { - bail!("ParquetDefaultProcessor is not supported in the localnet") - }, - ProcessorName::ParquetFungibleAssetProcessor => { - bail!("ParquetFungibleAssetProcessor is not supported in the localnet") - }, - ProcessorName::ParquetTransactionMetadataProcessor => { - bail!("ParquetTransactionMetadataProcessor is not supported in the localnet") - }, - ProcessorName::ParquetAnsProcessor => { - bail!("ParquetAnsProcessor is not supported in the localnet") - }, - ProcessorName::ParquetEventsProcessor => { - bail!("ParquetEventsProcessor is not supported in the localnet") - }, - ProcessorName::ParquetTokenV2Processor => { - bail!("ParquetTokenV2Processor is not supported in the localnet") - }, - ProcessorName::StakeProcessor => { - ProcessorConfig::StakeProcessor(StakeProcessorConfig { - query_retries: Default::default(), - query_retry_delay_ms: Default::default(), - }) - }, - ProcessorName::TokenV2Processor => { - ProcessorConfig::TokenV2Processor(TokenV2ProcessorConfig { - query_retries: Default::default(), - query_retry_delay_ms: Default::default(), - }) - }, - ProcessorName::TransactionMetadataProcessor => { - ProcessorConfig::TransactionMetadataProcessor - }, - ProcessorName::UserTransactionProcessor => ProcessorConfig::UserTransactionProcessor, - }; + let processor_config = get_processor_config(processor_name)?; let config = IndexerGrpcProcessorConfig { processor_config, postgres_connection_string, @@ -174,6 +176,7 @@ impl ProcessorManager { /// Create the necessary tables in the DB for the processors to work. async fn run_migrations(&self) -> Result<()> { let connection_string = self.config.postgres_connection_string.clone(); + tokio::task::spawn_blocking(move || { // This lets us use the connection like a normal diesel connection. See more: // https://docs.rs/diesel-async/latest/diesel_async/async_connection_wrapper/type.AsyncConnectionWrapper.html