
Commit

???
Co-authored-by: Sam Kumar <[email protected]>
parasj and samkumar committed Jan 28, 2022
1 parent 12e4115 commit 634de7e
Showing 4 changed files with 58 additions and 28 deletions.
36 changes: 35 additions & 1 deletion nb/01-27-2022_debug_status_df.ipynb
@@ -199,12 +199,46 @@
"print(completed_chunk_ids)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'sudo sysctl -w net.core.rmem_max=2147483647 net.core.wmem_max=2147483647 net.ipv4.tcp_rmem=4096 87380 1073741824 net.ipv4.tcp_wmem=4096 65536 1073741824'"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sysctl_updates = {\n",
" \"net.core.rmem_max\": 2147483647,\n",
" \"net.core.wmem_max\": 2147483647,\n",
" \"net.ipv4.tcp_rmem\": \"'4096 87380 1073741824'\",\n",
" \"net.ipv4.tcp_wmem\": \"'4096 65536 1073741824'\",\n",
"}\n",
"\"sudo sysctl -w {}\".format(\" \".join(f\"{k}={v}\" for k, v in sysctl_updates.items()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"source": [
"sysctl_updates = {\n",
" \"net.core.rmem_max\": 2147483647,\n",
" \"net.core.wmem_max\": 2147483647,\n",
" \"net.ipv4.tcp_rmem\": \"2 4 6\",\n",
" \"net.ipv4.tcp_wmem\": \"2 4 6\",\n",
"}\n",
"\"sudo sysctl -w {}\".format(\" \".join(f\"{k}={v}\" for k, v in sysctl_updates.items()))"
]
}
],
"metadata": {
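The cell added above builds a single "sudo sysctl -w" invocation from a dict of kernel settings. The multi-value keys (net.ipv4.tcp_rmem and net.ipv4.tcp_wmem take a min/default/max triple) have to reach sysctl as one shell argument, which is why the notebook embeds single quotes inside those values. Below is a rough sketch of the same idea, not part of this commit, that leans on shlex.quote instead of hand-written quotes; the buffer values are copied from the cell.

import shlex

sysctl_updates = {
    "net.core.rmem_max": 2147483647,
    "net.core.wmem_max": 2147483647,
    "net.ipv4.tcp_rmem": "4096 87380 1073741824",
    "net.ipv4.tcp_wmem": "4096 65536 1073741824",
}

# shlex.quote only quotes when needed, so single-number values stay bare while
# the min/default/max triples become one shell word each.
cmd = "sudo sysctl -w " + " ".join(
    f"{k}={shlex.quote(str(v))}" for k, v in sysctl_updates.items()
)
print(cmd)
# sudo sysctl -w net.core.rmem_max=2147483647 net.core.wmem_max=2147483647
#   net.ipv4.tcp_rmem='4096 87380 1073741824' net.ipv4.tcp_wmem='4096 65536 1073741824'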
16 changes: 11 additions & 5 deletions skylark/cli/experiments/throughput.py
@@ -133,6 +133,9 @@ def throughput_grid(
iperf3_runtime: int = typer.Option(5, help="Runtime for iperf3 in seconds"),
iperf3_connections: int = typer.Option(64, help="Number of connections to test"),
):
def check_stderr(tup):
assert tup[1].strip() == "", f"Command failed, err: {tup[1]}"

config = load_config()
gcp_project = gcp_project or config.get("gcp_project_id")
azure_subscription = azure_subscription or config.get("azure_subscription_id")
@@ -196,12 +199,15 @@ def throughput_grid(
# setup instances
def setup(server: Server):
sysctl_updates = {
"net.core.rmem_max": 2147483647,
"net.core.wmem_max": 2147483647,
"net.ipv4.tcp_rmem": "4096 87380 1073741824",
"net.ipv4.tcp_wmem": "4096 65536 1073741824",
"net.core.rmem_max": 134217728, # from 212992
"net.core.wmem_max": 134217728, # from 212992
"net.ipv4.tcp_rmem": "4096 87380 67108864", # from "4096 131072 6291456"
"net.ipv4.tcp_wmem": "4096 65536 67108864", # from "4096 16384 4194304"
"net.core.somaxconn": 65535,
"fs.file-max": 1024 * 1024 * 1024,
"net.ipv4.tcp_congestion_control": "cubic",
}
server.run_command("sudo sysctl -w {}".format(" ".join(f"{k}={v}" for k, v in sysctl_updates.items())))
check_stderr(server.run_command("sudo sysctl -w {}".format(" ".join(f"\"{k}={v}\"" for k, v in sysctl_updates.items()))))
server.run_command("(sudo apt-get update && sudo apt-get install -y iperf3); pkill iperf3; iperf3 -s -D -J")

do_parallel(setup, instance_list, progress_bar=True, n=-1, desc="Setup")
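The setup step in throughput.py now quotes each "k=v" pair as a whole, so settings whose values contain spaces survive the remote shell, and it routes the result through a local check_stderr helper that treats any stderr output as a failure. Here is a minimal local sketch of that pattern, assuming run_command returns a (stdout, stderr) tuple as the assert implies; the subprocess stand-in below is not the project's Server class.

import subprocess

def run_command(cmd):
    # Stand-in for Server.run_command; assumed to return (stdout, stderr).
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return proc.stdout, proc.stderr

def check_stderr(tup):
    # Same shape as the helper added to throughput_grid: fail loudly if the
    # command wrote anything to stderr.
    assert tup[1].strip() == "", f"Command failed, err: {tup[1]}"

sysctl_updates = {
    "net.core.rmem_max": 134217728,
    "net.ipv4.tcp_rmem": "4096 87380 67108864",
}
# Quoting the whole pair keeps the three-value tcp_rmem setting together.
cmd = "sudo sysctl -w {}".format(" ".join(f'"{k}={v}"' for k, v in sysctl_updates.items()))
print(cmd)                            # the command setup() would run on each instance
check_stderr(run_command("echo ok"))  # passes: nothing written to stderr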
14 changes: 4 additions & 10 deletions skylark/compute/server.py
@@ -199,16 +199,11 @@ def check_stderr(tup):
print("[DEBUG]\t" + desc_prefix + ": Installing docker")

# increase TCP connections, enable BBR optionally and raise file limits
n_sockets = 64
sysctl_updates = {
# "net.core.rmem_max": int(2147483647 // n_sockets),
# "net.core.wmem_max": int(2147483647 // n_sockets),
# "net.ipv4.tcp_rmem": "4096 87380 {}".format(int(1073741824 / n_sockets)), # todo should use number of incoming connections
# "net.ipv4.tcp_wmem": "4096 87380 {}".format(int(1073741824 / n_sockets)),
"net.core.rmem_max": "212992",
"net.core.wmem_max": "212992",
"net.ipv4.tcp_rmem": "4096 131072 6291456",
"net.ipv4.tcp_wmem": "4096 16384 4194304",
"net.core.rmem_max": 134217728, # from 212992
"net.core.wmem_max": 134217728, # from 212992
"net.ipv4.tcp_rmem": "4096 87380 67108864", # from "4096 131072 6291456"
"net.ipv4.tcp_wmem": "4096 65536 67108864", # from "4096 16384 4194304"
"net.core.somaxconn": 65535,
"fs.file-max": 1024 * 1024 * 1024,
}
@@ -217,7 +212,6 @@ def check_stderr(tup):
sysctl_updates["net.ipv4.tcp_congestion_control"] = "bbr"
else:
sysctl_updates["net.ipv4.tcp_congestion_control"] = "cubic"

check_stderr(self.run_command("sudo sysctl -w {}".format(" ".join(f'"{k}={v}"' for k, v in sysctl_updates.items()))))

# install docker and launch monitoring
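server.py now applies the same limits as the experiment script: 128 MiB core socket buffers (up from 212992 bytes), 64 MiB TCP autotuning maxima, a larger accept backlog and file-descriptor limit, and BBR or CUBIC congestion control depending on configuration. Below is a small verification sketch, not part of this commit, assuming a Linux host with sysctl on PATH; it reads each key back and compares it to the intended value.

import subprocess

expected = {
    "net.core.rmem_max": "134217728",
    "net.core.wmem_max": "134217728",
    "net.ipv4.tcp_rmem": "4096 87380 67108864",
    "net.ipv4.tcp_wmem": "4096 65536 67108864",
    "net.core.somaxconn": "65535",
    "net.ipv4.tcp_congestion_control": "cubic",  # "bbr" when BBR is enabled
}

for key, want in expected.items():
    out = subprocess.run(["sysctl", "-n", key], capture_output=True, text=True).stdout
    got = " ".join(out.split())  # tcp_rmem/tcp_wmem print tab-separated triples
    status = "ok" if got == want else f"MISMATCH (have {got!r}, want {want!r})"
    print(f"{key}: {status}")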
20 changes: 8 additions & 12 deletions skylark/test/test_replicator_client.py
@@ -86,21 +86,13 @@ def main(args):
obj_store_interface_src.create_bucket()
obj_store_interface_dst.create_bucket()

# TODO: fix this to get the key instead of S3Object
if not args.skip_upload:
# todo implement object store support
# pass
print("Not skipping upload...", src_bucket, dst_bucket)

# TODO: fix this to get the key instead of S3Object
matching_src_keys = list([obj.key for obj in obj_store_interface_src.list_objects(prefix=args.key_prefix)])
matching_dst_keys = list([obj.key for obj in obj_store_interface_dst.list_objects(prefix=args.key_prefix)])
if matching_src_keys:
if matching_src_keys and not args.skip_upload:
print("[WARN]\t" + f"Deleting {len(matching_src_keys)} objects from source bucket")
obj_store_interface_src.delete_objects(matching_src_keys)
if matching_dst_keys:
print("[WARN]\t" + f"Deleting {len(matching_dst_keys)} objects from destination bucket")
obj_store_interface_dst.delete_objects(matching_dst_keys)


# create test objects w/ random data
print("[INFO]\t" + "Creating test objects")
obj_keys = []
@@ -123,6 +115,11 @@ def main(args):
print("[INFO]\t" + f"Uploading {len(obj_keys)} to bucket {src_bucket}")
concurrent.futures.wait(futures)

matching_dst_keys = list([obj.key for obj in obj_store_interface_dst.list_objects(prefix=args.key_prefix)])
if matching_dst_keys:
print("[WARN]\t" + f"Deleting {len(matching_dst_keys)} objects from destination bucket")
obj_store_interface_dst.delete_objects(matching_dst_keys)

# cleanup temp files once done
for f in tmp_files:
os.remove(f)
@@ -168,7 +165,6 @@ def main(args):
dest_region=args.dest_region,
dest_bucket=dst_bucket,
objs=obj_keys,
random_chunk_size_mb=args.chunk_size_mb,
)

total_bytes = args.n_chunks * args.chunk_size_mb * MB
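In the test driver, cleanup of the source bucket is now gated on both finding keys under the prefix and actually uploading (not args.skip_upload), while the destination-bucket cleanup moves to after the upload futures complete. Below is a toy sketch of that list-then-delete flow; the in-memory store is only a stand-in, and unlike the real ObjectStoreInterface it returns bare keys rather than objects with a .key attribute.

class FakeObjectStore:
    # In-memory stand-in used only to illustrate the control flow.
    def __init__(self, objects=None):
        self.objects = set(objects or [])

    def list_objects(self, prefix=""):
        return [key for key in self.objects if key.startswith(prefix)]

    def delete_objects(self, keys):
        self.objects -= set(keys)

src = FakeObjectStore({"test/obj_0", "test/obj_1", "other/keep"})
dst = FakeObjectStore({"test/obj_0"})
skip_upload = False

# Source cleanup only when the test will re-upload fresh objects.
matching_src_keys = src.list_objects(prefix="test/")
if matching_src_keys and not skip_upload:
    print(f"[WARN]\tDeleting {len(matching_src_keys)} objects from source bucket")
    src.delete_objects(matching_src_keys)

# ... upload the test objects to the source bucket here ...

# Destination cleanup after the uploads, before the replication job runs.
matching_dst_keys = dst.list_objects(prefix="test/")
if matching_dst_keys:
    print(f"[WARN]\tDeleting {len(matching_dst_keys)} objects from destination bucket")
    dst.delete_objects(matching_dst_keys)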
