diff --git a/skylark/benchmark/network/throughput.py b/skylark/benchmark/network/throughput.py index 11ca508e4..4f0f40d2f 100644 --- a/skylark/benchmark/network/throughput.py +++ b/skylark/benchmark/network/throughput.py @@ -2,6 +2,7 @@ import json from datetime import datetime from typing import List, Tuple +import re from loguru import logger from tqdm import tqdm @@ -32,6 +33,7 @@ def parse_args(): parser.add_argument("--iperf_connection_list", type=int, nargs="+", default=[128], help="List of connections to test") parser.add_argument("--iperf3_runtime", type=int, default=4, help="Runtime for iperf3 in seconds") parser.add_argument("--iperf3_congestion", type=str, default="cubic", help="Congestion control algorithm for iperf3") + parser.add_argument("--iperf3_mode", type=str, default="tcp", help="Mode for iperf3") args = parser.parse_args() # filter by valid regions @@ -43,7 +45,7 @@ def parse_args(): def main(args): data_dir = skylark_root / "data" - log_dir = data_dir / "logs" / "throughput" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + log_dir = data_dir / "logs" / f"throughput_{args.iperf3_mode}" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S") log_dir.mkdir(exist_ok=True, parents=True) aws = AWSCloudProvider() @@ -77,36 +79,64 @@ def main(args): instance_list.extend([i for ilist in gcp_standard_instances.values() for i in ilist]) def setup(instance: Server): - instance.run_command("sudo apt-get update") - instance.run_command("sudo apt-get install -y iperf3") - instance.run_command("pkill iperf3") - instance.run_command("iperf3 -s -D") + instance.run_command("(sudo apt-get update && sudo apt-get install -y iperf3 nuttcp); pkill iperf3 nuttcp") + if args.iperf3_mode == "tcp": + instance.run_command("iperf3 -s -D -J") + else: + instance.run_command("nuttcp -S -u -w4m -l1460") if args.iperf3_congestion == "bbr": - instance.run_command("sudo sysctl -w net.ipv4.tcp_congestion_control=bbr") - instance.run_command("sudo sysctl -w net.core.default_qdisc=fq") - instance.run_command("sudo sysctl -w net.ipv4.tcp_available_congestion_control=bbr") + instance.run_command( + "sudo sysctl -w net.ipv4.tcp_congestion_control=bbr; sudo sysctl -w net.core.default_qdisc=fq; sudo sysctl -w net.ipv4.tcp_available_congestion_control=bbr" + ) do_parallel(setup, instance_list, progress_bar=True, n=24, desc="Setup") # start iperf3 clients on each pair of instances def start_iperf3_client(arg_pair: Tuple[Server, Server]): instance_src, instance_dst = arg_pair - stdout, stderr = instance_src.run_command( - f"iperf3 -J -C {args.iperf3_congestion} -t {args.iperf3_runtime} -P 32 -c {instance_dst.public_ip()}" - ) - try: - result = json.loads(stdout) - except json.JSONDecodeError: - logger.error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}") - return None - throughput_sent = result["end"]["sum_sent"]["bits_per_second"] - throughput_received = result["end"]["sum_received"]["bits_per_second"] - tqdm.write( - f"({instance_src.region_tag}:{instance_src.network_tier()} -> {instance_dst.region_tag}:{instance_dst.network_tier()}) is {throughput_sent / 1e9:0.2f} Gbps" - ) + if args.iperf3_mode == "tcp": + stdout, stderr = instance_src.run_command( + f"iperf3 -J -Z -C {args.iperf3_congestion} -t {args.iperf3_runtime} -P 32 -c {instance_dst.public_ip()}" + ) + try: + result = json.loads(stdout) + except json.JSONDecodeError: + logger.error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}") + return None + + throughput_sent = result["end"]["sum_sent"]["bits_per_second"] + throughput_received = result["end"]["sum_received"]["bits_per_second"] + cpu_utilization = result["end"]["cpu_utilization_percent"]["host_total"] + tqdm.write( + f"({instance_src.region_tag}:{instance_src.network_tier()} -> {instance_dst.region_tag}:{instance_dst.network_tier()}) is {throughput_sent / 1e9:0.2f} Gbps" + ) + out_rec = dict(throughput_sent=throughput_sent, throughput_received=throughput_received, cpu_utilization=cpu_utilization) + elif args.iperf3_mode == "udp": + stdout, stderr = instance_src.run_command(f"nuttcp -uu -l1460 -w4m -T {args.iperf3_runtime} {instance_dst.public_ip()}") + # example return: + # " 659.1631 MB / 2.00 sec = 2764.6834 Mbps 99 %TX 53 %RX 36045 / 711028 drop/pkt 5.07 %loss" + # " 208.2891 MB / 4.00 sec = 436.7620 Mbps 11 %TX 8 %RX 0 / 213288 drop/pkt 0.00 %loss" + # " 1490.9326 MB / 4.00 sec = 3126.9937 Mbps 99 %TX 55 %RX 179 / 1526894 drop/pkt 0.01172 %loss" + # returns: data_sent, runtime, throughput, cpu_tx, cpu_rx, dropped_packets, total_packets, loss_percent + regex = r"\s*(?P\d+\.\d+)\s*MB\s*/\s*(?P\d+\.\d+)\s*sec\s*=\s*(?P\d+\.\d+)\s*Mbps\s*(?P\d+)\s*%TX\s*(?P\d+)\s*%RX\s*(?P\d+)\s*\/\s*(?P\d+)\s*drop/pkt\s*(?P\d+\.?\d*)\s*%loss" + match = re.search(regex, stdout) + if match is None: + logger.error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) nuttcp client failed: {stdout} {stderr}") + return None + else: + out_rec = match.groupdict() + out_rec["throughput_mbps"] = float(out_rec["throughput_mbps"]) + out_rec["cpu_tx"] = float(out_rec["cpu_tx"]) + out_rec["cpu_rx"] = float(out_rec["cpu_rx"]) + out_rec["dropped_packets"] = float(out_rec["dropped_packets"]) + out_rec["total_packets"] = float(out_rec["total_packets"]) + out_rec["loss_percent"] = float(out_rec["loss_percent"]) + tqdm.write( + f"({instance_src.region_tag}:{instance_src.network_tier()} -> {instance_dst.region_tag}:{instance_dst.network_tier()}) is {out_rec['throughput_mbps'] / 1e3:0.2f} Gbps w/ loss {out_rec['loss_percent']:0.2f}%" + ) instance_src.close_server() instance_dst.close_server() - return throughput_sent, throughput_received, result + return out_rec throughput_results = [] instance_pairs = [(i1, i2) for i1 in instance_list for i2 in instance_list if i1 != i2] @@ -131,9 +161,9 @@ def start_iperf3_client(arg_pair: Tuple[Server, Server]): "dst_instance_class": pair[1].instance_class(), "src_network_tier": pair[0].network_tier(), "dst_network_tier": pair[1].network_tier(), - "throughput_sent": result[0], - "throughput_received": result[1], } + if result is not None: + result_rec.update(result) throughput_results.append(result_rec) throughput_dir = data_dir / "throughput" / "iperf3" diff --git a/skylark/compute/server.py b/skylark/compute/server.py index 0ce84daae..c206bc40a 100644 --- a/skylark/compute/server.py +++ b/skylark/compute/server.py @@ -124,8 +124,10 @@ def is_up(): except Exception as e: logger.debug(f"wait_for_ready exception: {e}") return False - ping_return = subprocess.run(["ping", "-c", "1", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - return ping_return.returncode == 0 + if ip is not None: + ping_return = subprocess.run(["ping", "-c", "1", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return ping_return.returncode == 0 + return False wait_for(is_up, timeout=timeout, interval=interval)