Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP benchmark #27

Merged
merged 4 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions skylark/benchmark/network/throughput.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
from datetime import datetime
from typing import List, Tuple
import re

from loguru import logger
from tqdm import tqdm
Expand Down Expand Up @@ -32,6 +33,7 @@ def parse_args():
parser.add_argument("--iperf_connection_list", type=int, nargs="+", default=[128], help="List of connections to test")
parser.add_argument("--iperf3_runtime", type=int, default=4, help="Runtime for iperf3 in seconds")
parser.add_argument("--iperf3_congestion", type=str, default="cubic", help="Congestion control algorithm for iperf3")
parser.add_argument("--iperf3_mode", type=str, default="tcp", help="Mode for iperf3")
args = parser.parse_args()

# filter by valid regions
Expand All @@ -43,7 +45,7 @@ def parse_args():

def main(args):
data_dir = skylark_root / "data"
log_dir = data_dir / "logs" / "throughput" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_dir = data_dir / "logs" / f"throughput_{args.iperf3_mode}" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_dir.mkdir(exist_ok=True, parents=True)

aws = AWSCloudProvider()
Expand Down Expand Up @@ -77,36 +79,64 @@ def main(args):
instance_list.extend([i for ilist in gcp_standard_instances.values() for i in ilist])

def setup(instance: Server):
instance.run_command("sudo apt-get update")
instance.run_command("sudo apt-get install -y iperf3")
instance.run_command("pkill iperf3")
instance.run_command("iperf3 -s -D")
instance.run_command("(sudo apt-get update && sudo apt-get install -y iperf3 nuttcp); pkill iperf3 nuttcp")
if args.iperf3_mode == "tcp":
instance.run_command("iperf3 -s -D -J")
else:
instance.run_command("nuttcp -S -u -w4m -l1460")
if args.iperf3_congestion == "bbr":
instance.run_command("sudo sysctl -w net.ipv4.tcp_congestion_control=bbr")
instance.run_command("sudo sysctl -w net.core.default_qdisc=fq")
instance.run_command("sudo sysctl -w net.ipv4.tcp_available_congestion_control=bbr")
instance.run_command(
"sudo sysctl -w net.ipv4.tcp_congestion_control=bbr; sudo sysctl -w net.core.default_qdisc=fq; sudo sysctl -w net.ipv4.tcp_available_congestion_control=bbr"
)

do_parallel(setup, instance_list, progress_bar=True, n=24, desc="Setup")

# start iperf3 clients on each pair of instances
def start_iperf3_client(arg_pair: Tuple[Server, Server]):
instance_src, instance_dst = arg_pair
stdout, stderr = instance_src.run_command(
f"iperf3 -J -C {args.iperf3_congestion} -t {args.iperf3_runtime} -P 32 -c {instance_dst.public_ip()}"
)
try:
result = json.loads(stdout)
except json.JSONDecodeError:
logger.error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}")
return None
throughput_sent = result["end"]["sum_sent"]["bits_per_second"]
throughput_received = result["end"]["sum_received"]["bits_per_second"]
tqdm.write(
f"({instance_src.region_tag}:{instance_src.network_tier()} -> {instance_dst.region_tag}:{instance_dst.network_tier()}) is {throughput_sent / 1e9:0.2f} Gbps"
)
if args.iperf3_mode == "tcp":
stdout, stderr = instance_src.run_command(
f"iperf3 -J -Z -C {args.iperf3_congestion} -t {args.iperf3_runtime} -P 32 -c {instance_dst.public_ip()}"
)
try:
result = json.loads(stdout)
except json.JSONDecodeError:
logger.error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}")
return None

throughput_sent = result["end"]["sum_sent"]["bits_per_second"]
throughput_received = result["end"]["sum_received"]["bits_per_second"]
cpu_utilization = result["end"]["cpu_utilization_percent"]["host_total"]
tqdm.write(
f"({instance_src.region_tag}:{instance_src.network_tier()} -> {instance_dst.region_tag}:{instance_dst.network_tier()}) is {throughput_sent / 1e9:0.2f} Gbps"
)
out_rec = dict(throughput_sent=throughput_sent, throughput_received=throughput_received, cpu_utilization=cpu_utilization)
elif args.iperf3_mode == "udp":
stdout, stderr = instance_src.run_command(f"nuttcp -uu -l1460 -w4m -T {args.iperf3_runtime} {instance_dst.public_ip()}")
# example return:
# " 659.1631 MB / 2.00 sec = 2764.6834 Mbps 99 %TX 53 %RX 36045 / 711028 drop/pkt 5.07 %loss"
# " 208.2891 MB / 4.00 sec = 436.7620 Mbps 11 %TX 8 %RX 0 / 213288 drop/pkt 0.00 %loss"
# " 1490.9326 MB / 4.00 sec = 3126.9937 Mbps 99 %TX 55 %RX 179 / 1526894 drop/pkt 0.01172 %loss"
# returns: data_sent, runtime, throughput, cpu_tx, cpu_rx, dropped_packets, total_packets, loss_percent
regex = r"\s*(?P<data_sent>\d+\.\d+)\s*MB\s*/\s*(?P<runtime>\d+\.\d+)\s*sec\s*=\s*(?P<throughput_mbps>\d+\.\d+)\s*Mbps\s*(?P<cpu_tx>\d+)\s*%TX\s*(?P<cpu_rx>\d+)\s*%RX\s*(?P<dropped_packets>\d+)\s*\/\s*(?P<total_packets>\d+)\s*drop/pkt\s*(?P<loss_percent>\d+\.?\d*)\s*%loss"
match = re.search(regex, stdout)
if match is None:
logger.error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) nuttcp client failed: {stdout} {stderr}")
return None
else:
out_rec = match.groupdict()
out_rec["throughput_mbps"] = float(out_rec["throughput_mbps"])
out_rec["cpu_tx"] = float(out_rec["cpu_tx"])
out_rec["cpu_rx"] = float(out_rec["cpu_rx"])
out_rec["dropped_packets"] = float(out_rec["dropped_packets"])
out_rec["total_packets"] = float(out_rec["total_packets"])
out_rec["loss_percent"] = float(out_rec["loss_percent"])
tqdm.write(
f"({instance_src.region_tag}:{instance_src.network_tier()} -> {instance_dst.region_tag}:{instance_dst.network_tier()}) is {out_rec['throughput_mbps'] / 1e3:0.2f} Gbps w/ loss {out_rec['loss_percent']:0.2f}%"
)
instance_src.close_server()
instance_dst.close_server()
return throughput_sent, throughput_received, result
return out_rec

throughput_results = []
instance_pairs = [(i1, i2) for i1 in instance_list for i2 in instance_list if i1 != i2]
Expand All @@ -131,9 +161,9 @@ def start_iperf3_client(arg_pair: Tuple[Server, Server]):
"dst_instance_class": pair[1].instance_class(),
"src_network_tier": pair[0].network_tier(),
"dst_network_tier": pair[1].network_tier(),
"throughput_sent": result[0],
"throughput_received": result[1],
}
if result is not None:
result_rec.update(result)
throughput_results.append(result_rec)

throughput_dir = data_dir / "throughput" / "iperf3"
Expand Down
6 changes: 4 additions & 2 deletions skylark/compute/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ def is_up():
except Exception as e:
logger.debug(f"wait_for_ready exception: {e}")
return False
ping_return = subprocess.run(["ping", "-c", "1", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return ping_return.returncode == 0
if ip is not None:
ping_return = subprocess.run(["ping", "-c", "1", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return ping_return.returncode == 0
return False

wait_for(is_up, timeout=timeout, interval=interval)

Expand Down