Skip to content

Commit

Permalink
[smoke tests] when getting "random" ports, use the filesystem to avoi…
Browse files Browse the repository at this point in the history
…d conflicts between parallel test cases (processes) (#5421)

### Description

This should eliminate an entire class of smoke test flakiness in CICD due to conflicting ports.

Running `lsof` on flaky tests in CICD, we observed that a parallel test can take a port from a node that another test is still using, in the window between that node being stopped and restarted. When a node N goes down (by design, in the test), a port that N was using is taken by a node M in another test; when N restarts, the conflict occurs. (With truly random port selection this should be very hard to hit, yet in practice we see it at least once a day. Possibly the OS prioritizes recently released ephemeral ports.)

cargo nextest runs parallel tests in different processes, so we need to avoid conflicts between processes. This change introduces a counter file that tracks the last port used, plus a lock file that guards the counter against races.

### Test Plan

Run smoke tests in CICD. Also run `cargo test` locally as a sanity check that the behavior is not changed for non-nextest uses.
  • Loading branch information
bchocho authored Nov 10, 2022
1 parent b85aad4 commit 16781dc
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ rust-version = { workspace = true }
[dependencies]
anyhow = "1.0.57"
bcs = { git = "https://github.com/aptos-labs/bcs", rev = "2cde3e8446c460cb17b0c1d6bac7e27e964ac169" }
byteorder = "1.4.3"
get_if_addrs = { version = "0.5.3", default-features = false }
mirai-annotations = "1.12.0"
poem-openapi = { version = "2.0.10", features = ["url"] }
Expand Down
179 changes: 170 additions & 9 deletions config/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,93 @@
// SPDX-License-Identifier: Apache-2.0

use crate::config::NodeConfig;
use aptos_crypto::_once_cell::sync::Lazy;
use aptos_logger::prelude::*;
use aptos_types::{
network_address::{NetworkAddress, Protocol},
transaction::Transaction,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use get_if_addrs::get_if_addrs;
use rand::rngs::OsRng;
use rand::Rng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use std::fs::{File, OpenOptions};
use std::io::Seek;
use std::net::{TcpListener, TcpStream};
use std::ops::Range;
use std::time::Duration;
use std::{env, fs, thread};

// Maximum number of bind attempts the port helpers make before they panic.
const MAX_PORT_RETRIES: u16 = 1000;
// Using non-ephemeral ports, to avoid conflicts with OS-selected ports (i.e., bind on port 0)
// NOTE(review): RANDOM_PORT_RANGE is only referenced by the OsRng-based selection line; it looks
// like a leftover from before the switch to bind(0) -- confirm whether it is still needed.
const RANDOM_PORT_RANGE: Range<u16> = 10000..30000;
// Range of ports that PORT_VECTOR is built from (the ports handed out by get_unique_port).
const UNIQUE_PORT_RANGE: Range<u16> = 10000..30000;
// Consistent seed across processes
// It seeds the StdRng that shuffles PORT_VECTOR, so the shuffle is driven by the same seed in
// every process.
const PORT_SEED: [u8; 32] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
];
// Value of the NEXTEST_RUN_ID environment variable, read once on first access; `None` when the
// variable is unset or cannot be read.
// See https://nexte.st/book/env-vars.html#environment-variables-nextest-sets
static NEXTEST_RUN_ID: Lazy<Option<String>> = Lazy::new(|| env::var("NEXTEST_RUN_ID").ok());
// Every port of UNIQUE_PORT_RANGE, shuffled once with an RNG seeded from PORT_SEED.
static PORT_VECTOR: Lazy<Vec<u16>> = Lazy::new(|| {
    let mut shuffled_ports = UNIQUE_PORT_RANGE.collect::<Vec<u16>>();
    let mut seeded_rng = rand::rngs::StdRng::from_seed(PORT_SEED);
    shuffled_ports.shuffle(&mut seeded_rng);
    shuffled_ports
});

/// Open handles for the shared port counter and the lock file that guards it.
/// Dropping a value of this type deletes the lock file at `lock_path()`.
struct PortCounterFiles {
    // Counter file; get_unique_port reads and rewrites a single big-endian u16 in it.
counter_file: File,
    // Handle to the lock file. It is not read anywhere in the code visible here.
_lock_file: File,
}

impl PortCounterFiles {
fn new(counter_file: File, lock_file: File) -> Self {
Self {
counter_file,
_lock_file: lock_file,
}
}
}

impl Drop for PortCounterFiles {
fn drop(&mut self) {
fs::remove_file(&lock_path()).unwrap();
}
}

/// Return an available port.
///
/// When `NEXTEST_RUN_ID` holds a value the port is taken from `get_unique_port`; otherwise it
/// is taken from `get_random_port`.
pub fn get_available_port() -> u16 {
    match &*NEXTEST_RUN_ID {
        Some(_) => get_unique_port(),
        None => get_random_port(),
    }
}

/// Return an ephemeral, available port. On unix systems, the port returned will be in the
/// TIME_WAIT state ensuring that the OS won't hand out this port for some grace period.
/// Callers should be able to bind to this port given they use SO_REUSEADDR.
pub fn get_available_port() -> u16 {
fn get_random_port() -> u16 {
for _ in 0..MAX_PORT_RETRIES {
if let Ok(port) = get_random_port() {
if let Ok(port) = try_bind(None) {
return port;
}
}

panic!("Error: could not find an available port");
}

fn get_random_port() -> ::std::io::Result<u16> {
// Choose a random port and try to bind
let port = OsRng.gen_range(RANDOM_PORT_RANGE.start, RANDOM_PORT_RANGE.end);
fn try_bind(port: Option<u16>) -> ::std::io::Result<u16> {
// Use the provided port or 0 to request a random available port from the OS
let port = if let Some(provided_port) = port {
provided_port
} else {
0
};
let listener = TcpListener::bind(("localhost", port))?;
let addr = listener.local_addr()?;

Expand All @@ -44,6 +101,110 @@ fn get_random_port() -> ::std::io::Result<u16> {
Ok(addr.port())
}

/// Path of the lock file for the current nextest run.
///
/// Panics if `NEXTEST_RUN_ID` is `None`.
fn lock_path() -> String {
    let run_id = NEXTEST_RUN_ID.as_deref().unwrap();
    format!("/tmp/aptos-port-counter.{}.lock", run_id)
}

/// Path of the port counter file for the current nextest run.
///
/// Panics if `NEXTEST_RUN_ID` is `None`.
fn counter_path() -> String {
    let run_id = NEXTEST_RUN_ID.as_deref().unwrap();
    format!("/tmp/aptos-port-counter.{}", run_id)
}

/// Hands out a port using a counter kept on the filesystem, for use under cargo nextest.
///
/// cargo nextest runs tests concurrently in different processes. A plain bind(0) was observed
/// to produce flaky tests when nodes are restarted within a test; likely the OS prioritizes
/// recently released ports.
///
/// The stored counter is read as a big-endian u16 index into `PORT_VECTOR`. A value that is
/// out of range or cannot be read restarts the sequence at 0. After a port has been bound, the
/// counter file is truncated and rewritten with the updated index.
fn get_unique_port() -> u16 {
    let mut files = open_counter_file();

    let stored_counter = files.counter_file.read_u16::<BigEndian>();
    let start_index = match stored_counter {
        Ok(value) if (value as usize) < PORT_VECTOR.len() => value,
        Ok(_) => 0,
        Err(_) => {
            warn!(
                "Unable to read port counter from file {}, starting from 0",
                counter_path()
            );
            0
        }
    };

    let (port, next_index) = bind_port_from_counter(start_index);

    // Replace the previous contents with the new counter value.
    files.counter_file.set_len(0).unwrap();
    files.counter_file.rewind().unwrap();
    files
        .counter_file
        .write_u16::<BigEndian>(next_index)
        .unwrap();

    port
}

fn open_counter_file() -> PortCounterFiles {
for i in 0..100 {
match OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&lock_path())
{
Ok(lock_file) => match OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&counter_path())
{
Ok(counter_file) => return PortCounterFiles::new(counter_file, lock_file),
Err(_) => {
panic!("Could not read {}", counter_path());
}
},
Err(_) => {
info!("Lock could not be acquired, attempt {}", i);
thread::sleep(Duration::from_millis(100));
}
}
}
panic!("Could not acquire lock to: {}", lock_path());
}

/// Walks `PORT_VECTOR` starting at index `counter` until a port can be bound.
///
/// Returns the bound port together with the index that follows the last entry tried; the index
/// wraps to 0 at the end of `PORT_VECTOR`.
///
/// # Panics
/// Panics if `counter` is not a valid index into `PORT_VECTOR`, or if `MAX_PORT_RETRIES`
/// consecutive candidates all fail to bind.
fn bind_port_from_counter(mut counter: u16) -> (u16, u16) {
    for attempt in 0..MAX_PORT_RETRIES {
        let candidate = PORT_VECTOR[counter as usize];
        // Advance before binding so the returned counter already points past `candidate`.
        counter = if counter as usize + 1 == PORT_VECTOR.len() {
            0
        } else {
            counter + 1
        };

        if let Ok(bound_port) = try_bind(Some(candidate)) {
            return (bound_port, counter);
        }
        info!(
            "Conflicting port: {}, on count {} and attempt {}",
            candidate, counter, attempt
        );
    }

    panic!(
        "Error: could not find an available port. Counter: {}",
        counter
    );
}

/// Extracts one local non-loopback IP address, if one exists. Otherwise returns None.
pub fn get_local_ip() -> Option<NetworkAddress> {
get_if_addrs().ok().and_then(|if_addrs| {
Expand Down

0 comments on commit 16781dc

Please sign in to comment.