Revert "Remove loguru format"
This reverts commit 8ee89e5.
parasj committed Jan 28, 2022
1 parent 634de7e commit 8e89096
Showing 27 changed files with 153 additions and 152 deletions.
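Every changed pair in the diff below differs only by the tab between the bracketed level tag and the message. A minimal sketch of that prefix pattern (the helper name fmt is hypothetical; the code in this diff inlines the pattern at each call site):

# Sketch of the tab-prefixed log line appearing throughout this diff.
# The helper name `fmt` is hypothetical, not from this repo.
def fmt(level: str, msg: str) -> str:
    return f"[{level}]\t{msg}"

print(fmt("INFO", "Experiment tag: foo"))  # prints: [INFO]<TAB>Experiment tag: foo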
12 changes: 6 additions & 6 deletions nb/12-11-2021_get_transfer_cost.ipynb
@@ -164,7 +164,7 @@
"for src_region, region_data in data[\"regions\"].items():\n",
" src = region_name_map.get(src_region)\n",
" if src is None:\n",
" print("[INFO]\t" + f\"Missing region {src} ({src_region})\")\n",
" print("[INFO]" + f\"Missing region {src} ({src_region})\")\n",
" unparsed_regions.append(src_region)\n",
" else:\n",
" for dst_region, dst_region_data in region_data.items():\n",
@@ -178,7 +178,7 @@
" if match.group(\"volume\") == \"Next 10 TB\":\n",
" cost_per_gb.append(dict(src=src, dst=\"internet\", cost=cost))\n",
" else:\n",
" print("[ERROR]\t" + f\"Could not parse {dst_region}\")\n",
" print("[ERROR]" + f\"Could not parse {dst_region}\")\n",
" elif dst_region.startswith(\"DataTransfer InterRegion Outbound to\"):\n",
" regex = re.compile(r\"DataTransfer InterRegion Outbound to (?P<dst_region>.*)\")\n",
" match = regex.search(dst_region)\n",
@@ -189,7 +189,7 @@
" else:\n",
" cost_per_gb.append(dict(src=src, dst=dst, cost=cost))\n",
" else:\n",
" print("[ERROR]\t" + f\"Could not parse {dst_region}\")\n",
" print("[ERROR]" + f\"Could not parse {dst_region}\")\n",
" elif (\n",
" dst_region.startswith(\"Cloudfrontless\")\n",
" or dst_region.startswith(\"DirectoryService\")\n",
@@ -253,7 +253,7 @@
" )\n",
" )\n",
" else:\n",
" print("[ERROR]\t" + f'Could not parse {row[\"SKU description\"]}')\n",
" print("[ERROR]" + f'Could not parse {row[\"SKU description\"]}')\n",
" elif \"Internet\" in row[\"SKU description\"] and \"from\" in row[\"SKU description\"] and \"to\" not in row[\"SKU description\"]:\n",
" regex = re.compile(r\"Network Internet(?P<tier>.*) Egress from (?P<region>.*)\")\n",
" match = regex.search(row[\"SKU description\"])\n",
@@ -272,7 +272,7 @@
" )\n",
" )\n",
" else:\n",
" print("[ERROR]\t" + f'Could not parse {row[\"SKU description\"]}')\n",
" print("[ERROR]" + f'Could not parse {row[\"SKU description\"]}')\n",
" elif \"from\" in row[\"SKU description\"] and \"to\" in row[\"SKU description\"]:\n",
" regex = re.compile(r\".*from (?P<from>.*) to (?P<to>.*)\")\n",
" match = regex.search(row[\"SKU description\"])\n",
@@ -291,7 +291,7 @@
" )\n",
" )\n",
" else:\n",
" print("[ERROR]\t" + f'Could not parse {row[\"SKU description\"]}')\n",
" print("[ERROR]" + f'Could not parse {row[\"SKU description\"]}')\n",
" else:\n",
" out_rows.append(row)\n",
"df_gcp_bw = pd.DataFrame(bw_tuples)\n",
4 changes: 2 additions & 2 deletions skylark/benchmark/network/latency.py
@@ -73,14 +73,14 @@ def parse_ping_result(string):
m = re.search(regex, string)
return dict(min=float(m.group("min")), avg=float(m.group("avg")), max=float(m.group("max")), mdev=float(m.group("mdev")))
except Exception as e:
print("[EXCEPTION]\t" + e)
print("[EXCEPTION]" + e)
return {}

# save results
latency_results_out = []
for (i1, i2), r in latency_results:
row = dict(src=i1.region_tag, dst=i2.region_tag, ping_str=r, **parse_ping_result(r))
print("[INFO]\t" + row)
print("[INFO]" + row)
latency_results_out.append(row)

with open(str(data_dir / "latency.json"), "w") as f:
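For reference, parse_ping_result above extracts the summary statistics from ping's closing line. A self-contained sketch, under the assumption that the elided regex targets the standard "rtt min/avg/max/mdev" summary (the sample line is illustrative, not captured from this benchmark):

import re

# Assumed regex for ping's summary line; the actual pattern is elided in the hunk above.
regex = r"rtt min/avg/max/mdev = (?P<min>[\d.]+)/(?P<avg>[\d.]+)/(?P<max>[\d.]+)/(?P<mdev>[\d.]+) ms"
sample = "rtt min/avg/max/mdev = 0.031/0.045/0.063/0.012 ms"
m = re.search(regex, sample)
print({k: float(v) for k, v in m.groupdict().items()})
# {'min': 0.031, 'avg': 0.045, 'max': 0.063, 'mdev': 0.012}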
14 changes: 7 additions & 7 deletions skylark/benchmark/network/throughput.py
@@ -55,7 +55,7 @@ def parse_args():

def main(args):
experiment_tag = os.popen("bash scripts/utils/get_random_word_hash.sh").read()
print("[INFO]\t" + f"Experiment tag: {experiment_tag}")
print("[INFO]" + f"Experiment tag: {experiment_tag}")

data_dir = skylark_root / "data"
log_dir = data_dir / "logs" / f"throughput_{args.iperf3_mode}" / f"{experiment_tag}"
@@ -81,7 +81,7 @@ def main(args):
instance_list.extend([i for ilist in gcp_instances.values() for i in ilist])

if args.gcp_test_standard_network:
print("[INFO]\t" + f"Provisioning standard GCP instances")
print("[INFO]" + f"Provisioning standard GCP instances")
_, gcp_standard_instances = provision(
aws=aws,
azure=azure,
@@ -122,7 +122,7 @@ def start_iperf3_client(arg_pair: Tuple[Server, Server]):
try:
result = json.loads(stdout)
except json.JSONDecodeError:
print("[ERROR]\t" + f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}")
print("[ERROR]" + f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}")
return None

throughput_sent = result["end"]["sum_sent"]["bits_per_second"]
@@ -139,7 +139,7 @@ def start_iperf3_client(arg_pair: Tuple[Server, Server]):
regex = r"\s*(?P<data_sent>\d+\.\d+)\s*MB\s*/\s*(?P<runtime>\d+\.\d+)\s*sec\s*=\s*(?P<throughput_mbps>\d+\.\d+)\s*Mbps\s*(?P<cpu_tx>\d+)\s*%TX\s*(?P<cpu_rx>\d+)\s*%RX\s*(?P<dropped_packets>\d+)\s*\/\s*(?P<total_packets>\d+)\s*drop/pkt\s*(?P<loss_percent>\d+\.?\d*)\s*%loss"
match = re.search(regex, stdout)
if match is None:
print("[ERROR]\t" + f"({instance_src.region_tag} -> {instance_dst.region_tag}) nuttcp client failed: {stdout} {stderr}")
print("[ERROR]" + f"({instance_src.region_tag} -> {instance_dst.region_tag}) nuttcp client failed: {stdout} {stderr}")
return None
else:
out_rec = match.groupdict()
@@ -161,12 +161,12 @@ def start_iperf3_client(arg_pair: Tuple[Server, Server]):
print(instance_pairs)
groups = split_list(instance_pairs)
for group_idx, group in enumerate(groups):
print("[INFO]\t" + f"Group {group_idx}:")
print("[INFO]" + f"Group {group_idx}:")
for instance_pair in group:
print("[INFO]\t" + f"\t{instance_pair[0].region_tag} -> {instance_pair[1].region_tag}")
print("[INFO]" + f"\t{instance_pair[0].region_tag} -> {instance_pair[1].region_tag}")

if not questionary.confirm("Launch experiment?").ask():
print("[ERROR]\t" + "Exiting")
print("[ERROR]" + "Exiting")
sys.exit(1)

with tqdm(total=len(instance_pairs), desc="Total throughput evaluation") as pbar:
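The nuttcp branch above parses client output with the long regex shown in the hunk. A quick check of that exact pattern against a typical nuttcp summary line (the sample output is illustrative, not captured from this benchmark):

import re

regex = r"\s*(?P<data_sent>\d+\.\d+)\s*MB\s*/\s*(?P<runtime>\d+\.\d+)\s*sec\s*=\s*(?P<throughput_mbps>\d+\.\d+)\s*Mbps\s*(?P<cpu_tx>\d+)\s*%TX\s*(?P<cpu_rx>\d+)\s*%RX\s*(?P<dropped_packets>\d+)\s*\/\s*(?P<total_packets>\d+)\s*drop/pkt\s*(?P<loss_percent>\d+\.?\d*)\s*%loss"
sample = "1187.5625 MB / 10.00 sec = 996.2344 Mbps 13 %TX 10 %RX 0 / 152008 drop/pkt 0.00 %loss"
match = re.search(regex, sample)
print(match.groupdict()["throughput_mbps"])  # 996.2344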
4 changes: 2 additions & 2 deletions skylark/benchmark/network/traceroute.py
@@ -59,7 +59,7 @@ def main(args):
instance_list: List[Server] = [i for ilist in aws_instances.values() for i in ilist]
instance_list.extend([i for ilist in gcp_instances.values() for i in ilist])
if args.gcp_test_standard_network:
print("[INFO]\t" + f"Provisioning standard GCP instances")
print("[INFO]" + f"Provisioning standard GCP instances")
_, gcp_standard_instances = provision(
aws=aws,
gcp=gcp,
@@ -83,7 +83,7 @@ def setup(instance: Server):
def run_traceroute(arg_pair: Tuple[Server, Server]):
instance_src, instance_dst = arg_pair
src_ip, dst_ip = instance_src.public_ip, instance_dst.public_ip
print("[INFO]\t" + f"traceroute {instance_src.region_tag} -> {instance_dst.region_tag} ({dst_ip})")
print("[INFO]" + f"traceroute {instance_src.region_tag} -> {instance_dst.region_tag} ({dst_ip})")
stdout, stderr = instance_src.run_command(f"traceroute {dst_ip}")

parser = TracerouteParser()
12 changes: 6 additions & 6 deletions skylark/benchmark/replicate/benchmark_triangles.py
@@ -54,7 +54,7 @@ def bench_triangle(
rc.provision_gateways(reuse_instances=False)
atexit.register(rc.deprovision_gateways)
for node, gw in rc.bound_nodes.items():
print("[INFO]\t" + f"Provisioned {node}: {gw.gateway_log_viewer_url}")
print("[INFO]" + f"Provisioned {node}: {gw.gateway_log_viewer_url}")

job = ReplicationJob(
source_region=src_region,
@@ -67,14 +67,14 @@

total_bytes = n_chunks * chunk_size_mb * MB
job = rc.run_replication_plan(job)
print("[INFO]\t" + f"{total_bytes / GB:.2f}GByte replication job launched")
print("[INFO]" + f"{total_bytes / GB:.2f}GByte replication job launched")
stats = rc.monitor_transfer(job, show_pbar=False, time_limit_seconds=600)
stats["success"] = True
stats["log"] = rc.get_chunk_status_log_df()
rc.deprovision_gateways()
except Exception as e:
print("[ERROR]\t" + f"Failed to benchmark triangle {src_region} -> {dst_region}")
print("[EXCEPTION]\t" + (e)
print("[ERROR]" + f"Failed to benchmark triangle {src_region} -> {dst_region}")
print("[EXCEPTION]" + (e)

stats = {}
stats["error"] = str(e)
@@ -88,10 +88,10 @@ def bench_triangle(
stats["chunk_size_mb"] = chunk_size_mb
stats["n_chunks"] = n_chunks

print("[INFO]\t" + f"Stats:")
print("[INFO]" + f"Stats:")
for k, v in stats.items():
if k not in ["log", "completed_chunk_ids"]:
print("[INFO]\t" + f"\t{k}: {v}")
print("[INFO]" + f"\t{k}: {v}")

# compute hash of src_region, dst_region, inter_region, num_gateways, num_outgoing_connections, chunk_size_mb, n_chunks
arg_hash = hash((src_region, dst_region, inter_region, num_gateways, num_outgoing_connections, chunk_size_mb, n_chunks))
8 changes: 4 additions & 4 deletions skylark/benchmark/replicate/test_direct.py
@@ -33,7 +33,7 @@ def setup(tup):
server.run_command("sudo apt-get update && sudo apt-get install -y iperf3")
docker_installed = "Docker version" in server.run_command(f"sudo docker --version")[0]
if not docker_installed:
print("[DEBUG]\t" + f"[{server.region_tag}] Installing docker")
print("[DEBUG]" + f"[{server.region_tag}] Installing docker")
server.run_command("curl -fsSL https://get.docker.com -o get-docker.sh && sudo sh get-docker.sh")
out, err = server.run_command("sudo docker run --rm hello-world")
assert "Hello from Docker!" in out
@@ -47,9 +47,9 @@ def parse_output(output):
try:
return json.loads(last_line[-1])
except json.decoder.JSONDecodeError:
print("[ERROR]\t" + f"JSON parse error, stdout = '{stdout}', stderr = '{stderr}'")
print("[ERROR]" + f"JSON parse error, stdout = '{stdout}', stderr = '{stderr}'")
else:
print("[ERROR]\t" + f"No output from server, stderr = {stderr}")
print("[ERROR]" + f"No output from server, stderr = {stderr}")
return None


@@ -121,7 +121,7 @@ def main(args):
client_cmd = f"sudo docker run --rm --ipc=host --network=host --name=gateway_client {args.gateway_docker_image} /env/bin/python /pkg/skylark/replicate/gateway_client.py --dst_host {dst_server.public_ip} --dst_port 3333 --chunk_id 1"
dst_data = parse_output(src_server.run_command(client_cmd))
src_server.run_command("sudo docker kill gateway_server")
print("[INFO]\t" + f"Src to dst copy: {dst_data}")
print("[INFO]" + f"Src to dst copy: {dst_data}")

# run iperf server on dst
out, err = dst_server.run_command("iperf3 -s -D")
6 changes: 3 additions & 3 deletions skylark/benchmark/stop_all_instances.py
@@ -24,18 +24,18 @@ def stop_instance(instance: Server):
instances = []

if not args.disable_aws:
print("[INFO]\t" + "Getting matching AWS instances")
print("[INFO]" + "Getting matching AWS instances")
aws = AWSCloudProvider()
for _, instance_list in do_parallel(aws.get_matching_instances, aws.region_list(), progress_bar=True):
instances += instance_list

if args.gcp_project:
print("[INFO]\t" + "Getting matching GCP instances")
print("[INFO]" + "Getting matching GCP instances")
gcp = GCPCloudProvider(gcp_project=args.gcp_project)
instances += gcp.get_matching_instances()

if args.azure_subscription:
print("[INFO]\t" + "Getting matching Azure instances")
print("[INFO]" + "Getting matching Azure instances")
azure = AzureCloudProvider(azure_subscription=args.azure_subscription)
instances += azure.get_matching_instances()

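do_parallel appears throughout these files with the call shape do_parallel(fn, args, progress_bar=..., desc=...) and is unpacked as (arg, result) pairs. A minimal sketch consistent with those call sites (this signature is inferred from usage, not the repo's implementation):

from concurrent.futures import ThreadPoolExecutor

def do_parallel(func, args_list, progress_bar=False, desc=None):
    # progress_bar/desc are accepted only to mirror the call sites above;
    # this sketch ignores them. Returns (arg, result) pairs in input order.
    args = list(args_list)
    with ThreadPoolExecutor(max_workers=max(len(args), 1)) as pool:
        results = list(pool.map(func, args))
    return list(zip(args, results))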
12 changes: 6 additions & 6 deletions skylark/benchmark/utils.py
@@ -57,7 +57,7 @@ def provision(
# TODO: It might be significantly faster to provision AWS, Azure, and GCP concurrently (e.g., using threads)

if len(aws_regions_to_provision) > 0:
print("[INFO]\t" + f"Provisioning AWS instances in {aws_regions_to_provision}")
print("[INFO]" + f"Provisioning AWS instances in {aws_regions_to_provision}")
aws_instance_filter = {
"tags": {"skylark": "true"},
"instance_type": aws_instance_class,
@@ -67,15 +67,15 @@
aws_instances = refresh_instance_list(aws, aws_regions_to_provision, aws_instance_filter)
missing_aws_regions = set(aws_regions_to_provision) - set(aws_instances.keys())
if missing_aws_regions:
print("[INFO]\t" + f"(AWS) provisioning missing regions: {missing_aws_regions}")
print("[INFO]" + f"(AWS) provisioning missing regions: {missing_aws_regions}")
aws_provisioner = lambda r: aws.provision_instance(r, aws_instance_class)
results = do_parallel(aws_provisioner, missing_aws_regions, progress_bar=True, desc="provision aws")
for region, result in results:
aws_instances[region] = [result]
aws_instances = refresh_instance_list(aws, aws_regions_to_provision, aws_instance_filter)

if len(azure_regions_to_provision) > 0:
print("[INFO]\t" + f"Provisioning Azure instances in {azure_regions_to_provision}")
print("[INFO]" + f"Provisioning Azure instances in {azure_regions_to_provision}")
azure_instance_filter = {
"tags": {"skylark": "true"},
"instance_type": azure_instance_class,
@@ -84,7 +84,7 @@
azure_instances = refresh_instance_list(azure, azure_regions_to_provision, azure_instance_filter)
missing_azure_regions = set(azure_regions_to_provision) - set(azure_instances.keys())
if missing_azure_regions:
print("[INFO]\t" + f"(Azure) provisioning missing regions: {missing_azure_regions}")
print("[INFO]" + f"(Azure) provisioning missing regions: {missing_azure_regions}")
azure_provisioner = lambda r: azure.provision_instance(r, azure_instance_class)
results = do_parallel(azure_provisioner, missing_azure_regions, progress_bar=True, desc="provision Azure")
for region, result in results:
@@ -93,7 +93,7 @@
azure_instances = refresh_instance_list(azure, azure_regions_to_provision, azure_instance_filter)

if len(gcp_regions_to_provision) > 0:
print("[INFO]\t" + f"Provisioning GCP instances in {gcp_regions_to_provision}")
print("[INFO]" + f"Provisioning GCP instances in {gcp_regions_to_provision}")
gcp_instance_filter = {
"tags": {"skylark": "true"},
"instance_type": gcp_instance_class,
@@ -106,7 +106,7 @@
gcp_instances = refresh_instance_list(gcp, gcp_regions_to_provision, gcp_instance_filter)
missing_gcp_regions = set(gcp_regions_to_provision) - set(gcp_instances.keys())
if missing_gcp_regions:
print("[INFO]\t" + f"(GCP) provisioning missing regions: {missing_gcp_regions}")
print("[INFO]" + f"(GCP) provisioning missing regions: {missing_gcp_regions}")
gcp_provisioner = lambda r: gcp.provision_instance(r, gcp_instance_class, premium_network=gcp_use_premium_network)
results = do_parallel(
gcp_provisioner, missing_gcp_regions, progress_bar=True, desc=f"provision GCP (premium network = {gcp_use_premium_network})"
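The TODO at the top of this hunk suggests provisioning AWS, Azure, and GCP concurrently rather than sequentially. A minimal sketch of that idea with a thread pool (an outline under the TODO's assumption, not the repo's code; the three callables stand for the per-cloud blocks above):

from concurrent.futures import ThreadPoolExecutor

def provision_all_concurrently(provision_aws, provision_azure, provision_gcp):
    # Launch the three per-cloud provisioning routines in parallel threads
    # and wait for all of them, instead of running them one after another.
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(fn) for fn in (provision_aws, provision_azure, provision_gcp)]
        return [f.result() for f in futures]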