Skip to content

Commit

Permalink
initial aptos workspace server impl (#14463)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgao1996 authored Oct 3, 2024
1 parent 6938caa commit 6140801
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 25 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"aptos-move/aptos-vm-logging",
"aptos-move/aptos-vm-profiling",
"aptos-move/aptos-vm-types",
"aptos-move/aptos-workspace-server",
"aptos-move/block-executor",
"aptos-move/e2e-benchmark",
"aptos-move/e2e-move-tests",
Expand Down
15 changes: 13 additions & 2 deletions api/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ use crate::{
transactions::TransactionsApi,
view_function::ViewFunctionApi,
};
use anyhow::Context as AnyhowContext;
use anyhow::{anyhow, Context as AnyhowContext};
use aptos_config::config::{ApiConfig, NodeConfig};
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReader;
use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use futures::channel::oneshot;
use poem::{
handler,
http::Method,
Expand All @@ -45,13 +46,14 @@ pub fn bootstrap(
db: Arc<dyn DbReader>,
mp_sender: MempoolClientSender,
indexer_reader: Option<Arc<dyn IndexerReader>>,
port_tx: Option<oneshot::Sender<u16>>,
) -> anyhow::Result<Runtime> {
let max_runtime_workers = get_max_runtime_workers(&config.api);
let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers));

let context = Context::new(chain_id, db, mp_sender, config.clone(), indexer_reader);

attach_poem_to_runtime(runtime.handle(), context.clone(), config, false)
attach_poem_to_runtime(runtime.handle(), context.clone(), config, false, port_tx)
.context("Failed to attach poem to runtime")?;

let context_cloned = context.clone();
Expand Down Expand Up @@ -167,6 +169,7 @@ pub fn attach_poem_to_runtime(
context: Context,
config: &NodeConfig,
random_port: bool,
port_tx: Option<oneshot::Sender<u16>>,
) -> anyhow::Result<SocketAddr> {
let context = Arc::new(context);

Expand Down Expand Up @@ -216,6 +219,13 @@ pub fn attach_poem_to_runtime(
let actual_address = *actual_address
.as_socket_addr()
.context("Failed to get socket addr from local addr for Poem webserver")?;

if let Some(port_tx) = port_tx {
port_tx
.send(actual_address.port())
.map_err(|_| anyhow!("Failed to send port"))?;
}

runtime_handle.spawn(async move {
let cors = Cors::new()
// To allow browsers to use cookies (for cookie-based sticky
Expand Down Expand Up @@ -351,6 +361,7 @@ mod tests {
context.db.clone(),
context.mempool.ac_client.clone(),
None,
None,
);
assert!(ret.is_ok());

Expand Down
5 changes: 3 additions & 2 deletions api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ pub fn new_test_context_inner(

// Configure the testing depending on which API version we're testing.
let runtime_handle = tokio::runtime::Handle::current();
let poem_address = attach_poem_to_runtime(&runtime_handle, context.clone(), &node_config, true)
.expect("Failed to attach poem to runtime");
let poem_address =
attach_poem_to_runtime(&runtime_handle, context.clone(), &node_config, true, None)
.expect("Failed to attach poem to runtime");
let api_specific_config = ApiSpecificConfig::V1(poem_address);

TestContext::new(
Expand Down
27 changes: 27 additions & 0 deletions aptos-move/aptos-workspace-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "aptos-workspace-server"
version = "0.1.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
aptos = { workspace = true }
aptos-cached-packages = { workspace = true }
aptos-config = { workspace = true }
aptos-faucet-core = { workspace = true }
aptos-node = { workspace = true }
aptos-types = { workspace = true }

anyhow = { workspace = true }
futures = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
176 changes: 176 additions & 0 deletions aptos-move/aptos-workspace-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use aptos::node::local_testnet::HealthChecker;
use aptos_config::config::NodeConfig;
use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
use aptos_node::{load_node_config, start_and_report_ports};
use aptos_types::network_address::{NetworkAddress, Protocol};
use futures::channel::oneshot;
use rand::{rngs::StdRng, SeedableRng};
use std::{
net::{IpAddr, Ipv4Addr},
path::Path,
thread,
time::Duration,
};
use url::Url;

pub fn zero_all_ports(config: &mut NodeConfig) {
// TODO: Double check if all ports are covered.

config.admin_service.port = 0;
config.api.address.set_port(0);
config.inspection_service.port = 0;
config.storage.backup_service_address.set_port(0);
config.indexer_grpc.address.set_port(0);

if let Some(network) = config.validator_network.as_mut() {
network.listen_address = NetworkAddress::from_protocols(vec![
Protocol::Ip4("0.0.0.0".parse().unwrap()),
Protocol::Tcp(0),
])
.unwrap();
}
for network in config.full_node_networks.iter_mut() {
network.listen_address = NetworkAddress::from_protocols(vec![
Protocol::Ip4("0.0.0.0".parse().unwrap()),
Protocol::Tcp(0),
])
.unwrap();
}
}

async fn spawn_node(test_dir: &Path) -> Result<()> {
let rng = StdRng::from_entropy();

let mut node_config = load_node_config(
&None,
&None,
test_dir,
false,
false,
false,
aptos_cached_packages::head_release_bundle(),
rng,
)?;

zero_all_ports(&mut node_config);
node_config.indexer_grpc.enabled = true;
node_config.indexer_grpc.use_data_service_interface = true;
node_config.storage.enable_indexer = true;

node_config
.api
.address
.set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
node_config
.indexer_grpc
.address
.set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));

node_config.admin_service.address = "127.0.0.1".to_string();
node_config.inspection_service.address = "127.0.0.1".to_string();

let (api_port_tx, api_port_rx) = oneshot::channel();
let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel();

let run_node = {
let test_dir = test_dir.to_owned();
let node_config = node_config.clone();
move || -> Result<()> {
start_and_report_ports(
node_config,
Some(test_dir.join("validator.log")),
false,
Some(api_port_tx),
Some(indexer_grpc_port_tx),
)
}
};

let _node_thread_handle = thread::spawn(move || {
let res = run_node();

if let Err(err) = res {
println!("Node stopped unexpectedly {:?}", err);
}
});

let api_port = api_port_rx.await?;
let indexer_grpc_port = indexer_grpc_port_rx.await?;

let api_health_checker = HealthChecker::NodeApi(
Url::parse(&format!(
"http://{}:{}",
node_config.api.address.ip(),
api_port
))
.unwrap(),
);
let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc(
Url::parse(&format!(
"http://{}:{}",
node_config.indexer_grpc.address.ip(),
indexer_grpc_port
))
.unwrap(),
);

api_health_checker.wait(None).await?;
eprintln!(
"Node API is ready. Endpoint: http://127.0.0.1:{}/",
api_port
);

indexer_grpc_health_checker.wait(None).await?;
eprintln!(
"Transaction stream is ready. Endpoint: http://127.0.0.1:{}/",
indexer_grpc_port
);

let faucet_run_config = RunConfig::build_for_cli(
Url::parse(&format!(
"http://{}:{}",
node_config.api.address.ip(),
api_port
))
.unwrap(),
"127.0.0.1".to_string(),
0,
FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
false,
None,
);

let (faucet_port_tx, faucet_port_rx) = oneshot::channel();
tokio::spawn(faucet_run_config.run_and_report_port(faucet_port_tx));

let faucet_port = faucet_port_rx.await?;

let faucet_health_checker =
HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string());
faucet_health_checker.wait(None).await?;
eprintln!(
"Faucet is ready. Endpoint: http://127.0.0.1:{}",
faucet_port
);

eprintln!("Indexer API is ready. Endpoint: http://127.0.0.1:0/");

Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
let test_dir = tempfile::tempdir()?;

println!("Test directory: {}", test_dir.path().display());

spawn_node(test_dir.path()).await?;

loop {
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
27 changes: 23 additions & 4 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use aptos_logger::{prelude::*, telemetry_log_writer::TelemetryLog, Level, Logger
use aptos_state_sync_driver::driver_factory::StateSyncRuntimes;
use aptos_types::{chain_id::ChainId, on_chain_config::OnChainJWKConsensusConfig};
use clap::Parser;
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use hex::{FromHex, FromHexError};
use rand::{rngs::StdRng, SeedableRng};
use std::{
Expand Down Expand Up @@ -209,11 +209,21 @@ pub struct AptosHandle {
_indexer_db_runtime: Option<Runtime>,
}

/// Start an Aptos node
pub fn start(
config: NodeConfig,
log_file: Option<PathBuf>,
create_global_rayon_pool: bool,
) -> anyhow::Result<()> {
start_and_report_ports(config, log_file, create_global_rayon_pool, None, None)
}

/// Start an Aptos node
pub fn start_and_report_ports(
config: NodeConfig,
log_file: Option<PathBuf>,
create_global_rayon_pool: bool,
api_port_tx: Option<oneshot::Sender<u16>>,
indexer_grpc_port_tx: Option<oneshot::Sender<u16>>,
) -> anyhow::Result<()> {
// Setup panic handler
aptos_crash_handler::setup_panic_handler();
Expand Down Expand Up @@ -252,8 +262,13 @@ pub fn start(
}

// Set up the node environment and start it
let _node_handle =
setup_environment_and_start_node(config, remote_log_receiver, Some(logger_filter_update))?;
let _node_handle = setup_environment_and_start_node(
config,
remote_log_receiver,
Some(logger_filter_update),
api_port_tx,
indexer_grpc_port_tx,
)?;
let term = Arc::new(AtomicBool::new(false));
while !term.load(Ordering::Acquire) {
thread::park();
Expand Down Expand Up @@ -597,6 +612,8 @@ pub fn setup_environment_and_start_node(
mut node_config: NodeConfig,
remote_log_rx: Option<mpsc::Receiver<TelemetryLog>>,
logger_filter_update_job: Option<LoggerFilterUpdater>,
api_port_tx: Option<oneshot::Sender<u16>>,
indexer_grpc_port_tx: Option<oneshot::Sender<u16>>,
) -> anyhow::Result<AptosHandle> {
// Log the node config at node startup
node_config.log_all_configs();
Expand Down Expand Up @@ -693,6 +710,8 @@ pub fn setup_environment_and_start_node(
chain_id,
indexer_db_opt,
update_receiver,
api_port_tx,
indexer_grpc_port_tx,
)?;

// Create mempool and get the consensus to mempool sender
Expand Down
Loading

0 comments on commit 6140801

Please sign in to comment.