Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
dermanyang committed Nov 6, 2024
2 parents eccb90e + 13a9587 commit 4b67e5d
Show file tree
Hide file tree
Showing 52 changed files with 4,377 additions and 285 deletions.
2 changes: 1 addition & 1 deletion .github/actions/rust-smoke-tests/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ runs:
# We always try to create the artifact, but it only creates on flaky or failed smoke tests -- when the directories are empty.
- name: Upload smoke test logs for failed and flaky tests
if: ${{ failure() || success() }}
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: failed-smoke-test-logs
# Retain all smoke test data except for the db (which may be large).
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/backport-to-release-branches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ permissions:

jobs:
permission-check:
if: github.event.pull_request.merged == true && contains(join(github.event.pull_request.labels.*.name, ','), 'v1.')
runs-on: ubuntu-latest
steps:
- name: Check repository permission for user which triggered workflow
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/module-verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
secrets: inherit
with:
GIT_SHA: ${{ inputs.GIT_SHA }}
BUCKET: aptos-mainnet-backup-backup-831a69a8
BUCKET: aptos-mainnet-backup-backup-6addc21b
SUB_DIR: e1
BACKUP_CONFIG_TEMPLATE_PATH: terraform/helm/fullnode/files/backup/s3-public.yaml
# workflow config
Expand Down
23 changes: 12 additions & 11 deletions .github/workflows/workflow-run-forge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ env:
VERBOSE: true
FORGE_NUM_VALIDATORS: ${{ inputs.FORGE_NUM_VALIDATORS }}
FORGE_NUM_VALIDATOR_FULLNODES: ${{ inputs.FORGE_NUM_VALIDATOR_FULLNODES }}
FORGE_JUNIT_XML_PATH: ${{ inputs.SEND_RESULTS_TO_TRUNK && '/tmp/test.xml' || '' }}
# FORGE_JUNIT_XML_PATH: ${{ inputs.SEND_RESULTS_TO_TRUNK && '/tmp/test.xml' || '' }}

# TODO: should we migrate this to a composite action, so that we can skip it
# at the call site, and don't need to wrap each step in an if statement?
Expand Down Expand Up @@ -234,13 +234,14 @@ jobs:
- run: echo "Skipping forge test!"
if: ${{ inputs.SKIP_JOB }}

- name: Upload results
# Run this step even if the test step ahead fails
if: ${{ !inputs.SKIP_JOB && inputs.SEND_RESULTS_TO_TRUNK && !cancelled() }}
uses: trunk-io/analytics-uploader@main
with:
# Configured in the nextest.toml file
junit-paths: ${{ env.FORGE_JUNIT_XML_PATH }}
org-slug: aptoslabs
token: ${{ secrets.TRUNK_API_TOKEN }}
continue-on-error: true
# TEMP disable till fixed
# - name: Upload results
# # Run this step even if the test step ahead fails
# if: ${{ !inputs.SKIP_JOB && inputs.SEND_RESULTS_TO_TRUNK && !cancelled() }}
# uses: trunk-io/analytics-uploader@main
# with:
# # Configured in the nextest.toml file
# junit-paths: ${{ env.FORGE_JUNIT_XML_PATH }}
# org-slug: aptoslabs
# token: ${{ secrets.TRUNK_API_TOKEN }}
# continue-on-error: true
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
/crates/aptos-open-api @banool @gregnazario

# Owners for the aptos-protos crate
/crates/aptos-protos @banool @bowenyang007 @jillxuu @larry-aptos @rtso
/crates/aptos-protos @banool @bowenyang007 @jillxuu @larry-aptos @rtso @aptos-labs/ecosystem-infra

/crates/aptos-rest-client @banool @gregnazario

Expand Down
258 changes: 184 additions & 74 deletions aptos-move/aptos-workspace-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use anyhow::{anyhow, Context, Result};
use aptos::node::local_testnet::HealthChecker;
use aptos_config::config::{NodeConfig, TableInfoServiceMode};
use aptos_faucet_core::server::{FunderKeyEnum, RunConfig};
use aptos_node::{load_node_config, start_and_report_ports};
use aptos_types::network_address::{NetworkAddress, Protocol};
use futures::channel::oneshot;
use futures::{channel::oneshot, future::Shared, FutureExt};
use rand::{rngs::StdRng, SeedableRng};
use std::{
future::Future,
net::{IpAddr, Ipv4Addr},
path::Path,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Duration,
};
use url::Url;

const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

/// Converts a future into a shared one by putting the error into an `Arc`.
///
/// `Shared` requires its output type to be `Clone` (enforced here via `T: Clone`);
/// wrapping the error in an `Arc` makes the error side cheaply cloneable too, so
/// every clone of the shared future can observe the same failure.
fn make_shared<F, T, E>(fut: F) -> Shared<impl Future<Output = Result<T, Arc<E>>>>
where
    T: Clone,
    F: Future<Output = Result<T, E>>,
{
    // `Arc::new` passed directly — the closure `|err| Arc::new(err)` is redundant
    // (clippy::redundant_closure).
    fut.map(|r| r.map_err(Arc::new)).shared()
}

/// Sets all ports in the node config to zero so the OS can assign them random ones.
pub fn zero_all_ports(config: &mut NodeConfig) {
// TODO: Double check if all ports are covered.

Expand All @@ -42,7 +55,17 @@ pub fn zero_all_ports(config: &mut NodeConfig) {
}
}

async fn spawn_node(test_dir: &Path) -> Result<()> {
/// Starts a local node and returns three futures:
/// 1. A future for the node API, which resolves to the port number once the service is fully up.
/// 2. A future for the indexer gRPC, which resolves to the port number once the service is fully up.
/// 3. A final future that resolves when the node stops.
fn start_node(
test_dir: &Path,
) -> Result<(
impl Future<Output = Result<u16>>,
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>>,
)> {
let rng = StdRng::from_entropy();

let mut node_config = load_node_config(
Expand All @@ -62,17 +85,11 @@ async fn spawn_node(test_dir: &Path) -> Result<()> {

node_config.indexer_table_info.table_info_service_mode = TableInfoServiceMode::IndexingOnly;

node_config
.api
.address
.set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
node_config
.indexer_grpc
.address
.set_ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
node_config.api.address.set_ip(IP_LOCAL_HOST);
node_config.indexer_grpc.address.set_ip(IP_LOCAL_HOST);

node_config.admin_service.address = "127.0.0.1".to_string();
node_config.inspection_service.address = "127.0.0.1".to_string();
node_config.admin_service.address = IP_LOCAL_HOST.to_string();
node_config.inspection_service.address = IP_LOCAL_HOST.to_string();

let (api_port_tx, api_port_rx) = oneshot::channel();
let (indexer_grpc_port_tx, indexer_grpc_port_rx) = oneshot::channel();
Expand All @@ -91,74 +108,169 @@ async fn spawn_node(test_dir: &Path) -> Result<()> {
}
};

let _node_thread_handle = thread::spawn(move || {
let res = run_node();
let node_thread_handle = thread::spawn(run_node);

if let Err(err) = res {
println!("Node stopped unexpectedly {:?}", err);
}
let fut_node_finish = async {
let join_handle = tokio::task::spawn_blocking(move || -> Result<()> {
node_thread_handle
.join()
.map_err(|_err| anyhow!("failed to wait for node thread"))?
});

join_handle
.await
.map_err(|err| anyhow!("failed to join node task: {}", err))?
};

let fut_api = async move {
let api_port = api_port_rx.await?;

let api_health_checker = HealthChecker::NodeApi(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(),
);
api_health_checker.wait(None).await?;

println!(
"Node API is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, api_port
);

Ok(api_port)
};

let fut_indexer_grpc = async move {
let indexer_grpc_port = indexer_grpc_port_rx.await?;

let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, indexer_grpc_port)).unwrap(),
);

indexer_grpc_health_checker.wait(None).await?;
println!(
"Transaction stream is ready. Endpoint: http://{}:{}/",
IP_LOCAL_HOST, indexer_grpc_port
);

Ok(indexer_grpc_port)
};

Ok((fut_api, fut_indexer_grpc, fut_node_finish))
}

/// Starts the faucet service and returns two futures.
/// 1. A future that resolves to the port used, once the faucet service is fully up.
/// 2. A future that resolves, when the service stops.
fn start_faucet(
test_dir: PathBuf,
fut_node_api: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Send + 'static,
fut_indexer_grpc: impl Future<Output = Result<u16, Arc<anyhow::Error>>> + Send + 'static,
) -> (
impl Future<Output = Result<u16>>,
impl Future<Output = Result<()>> + 'static,
) {
let (faucet_port_tx, faucet_port_rx) = oneshot::channel();

let handle_faucet = tokio::spawn(async move {
let api_port = fut_node_api
.await
.map_err(anyhow::Error::msg)
.context("failed to start faucet: node api did not start successfully")?;

fut_indexer_grpc
.await
.map_err(anyhow::Error::msg)
.context("failed to start faucet: indexer grpc did not start successfully")?;

let faucet_run_config = RunConfig::build_for_cli(
Url::parse(&format!("http://{}:{}", IP_LOCAL_HOST, api_port)).unwrap(),
IP_LOCAL_HOST.to_string(),
0,
FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
false,
None,
);

faucet_run_config.run_and_report_port(faucet_port_tx).await
});

let api_port = api_port_rx.await?;
let indexer_grpc_port = indexer_grpc_port_rx.await?;
let fut_faucet_finish = async move {
handle_faucet
.await
.map_err(|err| anyhow!("failed to join handle task: {}", err))?
};

let api_health_checker = HealthChecker::NodeApi(
Url::parse(&format!(
"http://{}:{}",
node_config.api.address.ip(),
api_port
))
.unwrap(),
);
let indexer_grpc_health_checker = HealthChecker::DataServiceGrpc(
Url::parse(&format!(
"http://{}:{}",
node_config.indexer_grpc.address.ip(),
indexer_grpc_port
))
.unwrap(),
);
let fut_faucet_port = async move {
let faucet_port = faucet_port_rx
.await
.context("failed to receive faucet port")?;

api_health_checker.wait(None).await?;
eprintln!(
"Node API is ready. Endpoint: http://127.0.0.1:{}/",
api_port
);
let faucet_health_checker =
HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string());
faucet_health_checker.wait(None).await?;

indexer_grpc_health_checker.wait(None).await?;
eprintln!(
"Transaction stream is ready. Endpoint: http://127.0.0.1:{}/",
indexer_grpc_port
);
println!(
"Faucet is ready. Endpoint: http://{}:{}",
IP_LOCAL_HOST, faucet_port
);

let faucet_run_config = RunConfig::build_for_cli(
Url::parse(&format!(
"http://{}:{}",
node_config.api.address.ip(),
api_port
))
.unwrap(),
"127.0.0.1".to_string(),
0,
FunderKeyEnum::KeyFile(test_dir.join("mint.key")),
false,
None,
Ok(faucet_port)
};

(fut_faucet_port, fut_faucet_finish)
}

async fn start_all_services(test_dir: &Path) -> Result<()> {
// Step 1: spawn all services.
let (fut_node_api, fut_indexer_grpc, fut_node_finish) = start_node(test_dir)?;

let fut_node_api = make_shared(fut_node_api);
let fut_indexer_grpc = make_shared(fut_indexer_grpc);
let (fut_faucet, fut_faucet_finish) = start_faucet(
test_dir.to_owned(),
fut_node_api.clone(),
fut_indexer_grpc.clone(),
);

let (faucet_port_tx, faucet_port_rx) = oneshot::channel();
tokio::spawn(faucet_run_config.run_and_report_port(faucet_port_tx));
let (res_node_api, res_indexer_grpc, res_faucet) =
tokio::join!(fut_node_api, fut_indexer_grpc, fut_faucet);

let faucet_port = faucet_port_rx.await?;
// Step 2: wait for all services to be up.
res_node_api
.map_err(anyhow::Error::msg)
.context("failed to start node api")?;
res_indexer_grpc
.map_err(anyhow::Error::msg)
        .context("failed to start indexer grpc")?;
res_faucet.context("failed to start faucet")?;

let faucet_health_checker =
HealthChecker::http_checker_from_port(faucet_port, "Faucet".to_string());
faucet_health_checker.wait(None).await?;
eprintln!(
"Faucet is ready. Endpoint: http://127.0.0.1:{}",
faucet_port
println!(
"Indexer API is ready. Endpoint: http://{}:0/",
IP_LOCAL_HOST
);

eprintln!("Indexer API is ready. Endpoint: http://127.0.0.1:0/");
println!("ALL SERVICES STARTED SUCCESSFULLY");

// Step 3: wait for services to stop.
tokio::pin!(fut_node_finish);
tokio::pin!(fut_faucet_finish);

let mut finished: u64 = 0;
while finished < 2 {
tokio::select! {
res = &mut fut_node_finish => {
if let Err(err) = res {
                    eprintln!("Node exited with error: {}", err);
}
finished += 1;
}
res = &mut fut_faucet_finish => {
if let Err(err) = res {
                    eprintln!("Faucet exited with error: {}", err);
}
finished += 1;
}
}
}

Ok(())
}
Expand All @@ -169,9 +281,7 @@ async fn main() -> Result<()> {

println!("Test directory: {}", test_dir.path().display());

spawn_node(test_dir.path()).await?;
start_all_services(test_dir.path()).await?;

loop {
tokio::time::sleep(Duration::from_millis(200)).await;
}
Ok(())
}
Loading

0 comments on commit 4b67e5d

Please sign in to comment.