Squashed commit of the following:
commit f8f38db
Author: Paras Jain <[email protected]>
Date:   Thu Jan 27 00:50:18 2022 +0000

    Squashed commit of the following:

    commit b227477
    Author: Paras Jain <[email protected]>
    Date:   Wed Jan 26 16:48:59 2022 -0800

        More fixes

    commit 7106059
    Author: Paras Jain <[email protected]>
    Date:   Wed Jan 26 16:46:29 2022 -0800

        Fix ingress limit on solver

    commit dd8e698
    Author: Paras Jain <[email protected]>
    Date:   Wed Jan 26 23:42:20 2022 +0000

        test

    commit 898481c
    Author: Paras Jain <[email protected]>
    Date:   Wed Jan 26 01:44:03 2022 +0000

        transfer works!

    commit 313d2d9
    Author: Paras Jain <[email protected]>
    Date:   Tue Jan 25 22:04:46 2022 +0000

        Unhashable type fix

    commit d970d28
    Author: Paras Jain <[email protected]>
    Date:   Tue Jan 25 22:01:23 2022 +0000

        Fix monitoring

    commit fdb9255
    Author: Paras Jain <[email protected]>
    Date:   Tue Jan 25 21:48:14 2022 +0000

        Initial draft of a gateway with pre-allocated paths
parasj committed Jan 27, 2022
1 parent 615c554 commit 3e1eacd
Showing 7 changed files with 213 additions and 107 deletions.
10 changes: 10 additions & 0 deletions Dockerfile
@@ -1,5 +1,15 @@
# syntax=docker/dockerfile:1
FROM python:3.8-slim

# increase number of open files and concurrent TCP connections
RUN echo 'net.ipv4.ip_local_port_range = 12000 65535' >> /etc/sysctl.conf
RUN echo 'fs.file-max = 1048576' >> /etc/sysctl.conf
RUN mkdir -p /etc/security/
RUN echo '* soft nofile 1048576' >> /etc/security/limits.conf
RUN echo '* hard nofile 1048576' >> /etc/security/limits.conf
RUN echo 'root soft nofile 1048576' >> /etc/security/limits.conf
RUN echo 'root hard nofile 1048576' >> /etc/security/limits.conf

COPY scripts/requirements-gateway.txt /tmp/requirements-gateway.txt
RUN --mount=type=cache,target=/root/.cache/pip pip install --no-cache-dir --compile -r /tmp/requirements-gateway.txt && rm -r /tmp/requirements-gateway.txt

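Note on the sysctl lines above: values appended to /etc/sysctl.conf at build time are baked into the image but are not applied in a running container, since containers share the host kernel and nothing re-reads the file; the limits.conf entries similarly rely on PAM, which plain container entrypoints do not go through. These knobs generally have to be set on the host or at launch time, e.g. via docker run --sysctl net.ipv4.ip_local_port_range="12000 65535" --ulimit nofile=1048576:1048576 (flags shown for illustration; the repository's actual launch command is not part of this diff).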
98 changes: 95 additions & 3 deletions skylark/cli/cli.py
@@ -95,16 +95,16 @@ def replicate_random(
inter_region: Optional[str] = typer.Argument(None),
num_gateways: int = 1,
num_outgoing_connections: int = 16,
chunk_size_mb: int = 8,
n_chunks: int = 2048,
total_transfer_size_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"),
n_chunks: int = 512,
reuse_gateways: bool = True,
azure_subscription: Optional[str] = None,
gcp_project: Optional[str] = None,
gateway_docker_image: str = os.environ.get("SKYLARK_DOCKER_IMAGE", "ghcr.io/parasj/skylark:main"),
aws_instance_class: str = "m5.8xlarge",
azure_instance_class: str = "Standard_D32_v5",
gcp_instance_class: Optional[str] = "n2-standard-32",
gcp_use_premium_network: bool = False,
gcp_use_premium_network: bool = True,
key_prefix: str = "/test/replicate_random",
time_limit_seconds: Optional[int] = None,
log_interval_s: float = 1.0,
@@ -148,6 +148,9 @@ def replicate_random(
for node, gw in rc.bound_nodes.items():
logger.info(f"Provisioned {node}: {gw.gateway_log_viewer_url}")

if total_transfer_size_mb % n_chunks != 0:
logger.warning(f"total_transfer_size_mb ({total_transfer_size_mb}) is not a multiple of n_chunks ({n_chunks})")
chunk_size_mb = total_transfer_size_mb // n_chunks
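# with the defaults (2048 MB total, 512 chunks) this yields 4 MB per chunk;
# floor division truncates, so a non-divisible total transfers slightly less than requested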
job = ReplicationJob(
source_region=src_region,
source_bucket=None,
@@ -174,6 +177,95 @@
return 0 if stats["success"] else 1


@app.command()
def replicate_json(
path: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, help="Path to JSON file describing replication plan"),
total_transfer_size_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"),
n_chunks: int = 512,
# bucket options
use_random_data: bool = True,
bucket_prefix: str = "skylark",
key_prefix: str = "/test/replicate_random",
# gateway provisioning options
reuse_gateways: bool = True,
gateway_docker_image: str = os.environ.get("SKYLARK_DOCKER_IMAGE", "ghcr.io/parasj/skylark:main"),
# cloud provider specific options
azure_subscription: Optional[str] = None,
gcp_project: Optional[str] = None,
aws_instance_class: str = "m5.8xlarge",
azure_instance_class: str = "Standard_D32_v5",
gcp_instance_class: Optional[str] = "n2-standard-32",
gcp_use_premium_network: bool = True,
# logging options
time_limit_seconds: Optional[int] = None,
log_interval_s: float = 1.0,
):
"""Replicate objects from remote object store to another remote object store."""
print_header()
config = load_config()
gcp_project = gcp_project or config.get("gcp_project_id")
azure_subscription = azure_subscription or config.get("azure_subscription_id")
logger.debug(f"Loaded gcp_project: {gcp_project}, azure_subscription: {azure_subscription}")
check_ulimit()

with path.open("r") as f:
topo = ReplicationTopology.from_json(f.read())

rc = ReplicatorClient(
topo,
azure_subscription=azure_subscription,
gcp_project=gcp_project,
gateway_docker_image=gateway_docker_image,
aws_instance_class=aws_instance_class,
azure_instance_class=azure_instance_class,
gcp_instance_class=gcp_instance_class,
gcp_use_premium_network=gcp_use_premium_network,
)

if not reuse_gateways:
atexit.register(rc.deprovision_gateways)
else:
logger.warning(
f"Instances will remain up and may result in continued cloud billing. Remember to call `skylark deprovision` to deprovision gateways."
)
rc.provision_gateways(reuse_gateways)
for node, gw in rc.bound_nodes.items():
logger.info(f"Provisioned {node}: {gw.gateway_log_viewer_url}")

if total_transfer_size_mb % n_chunks != 0:
logger.warning(f"total_transfer_size_mb ({total_transfer_size_mb}) is not a multiple of n_chunks ({n_chunks})")
chunk_size_mb = total_transfer_size_mb // n_chunks

if use_random_data:
job = ReplicationJob(
source_region=topo.source_region(),
source_bucket=None,
dest_region=topo.sink_region(),
dest_bucket=None,
objs=[f"{key_prefix}/{i}" for i in range(n_chunks)],
random_chunk_size_mb=chunk_size_mb,
)
else:
raise NotImplementedError()

total_bytes = n_chunks * chunk_size_mb * MB
job = rc.run_replication_plan(job)
logger.info(f"{total_bytes / GB:.2f}GByte replication job launched")
stats = rc.monitor_transfer(
job,
show_pbar=True,
log_interval_s=log_interval_s,
time_limit_seconds=time_limit_seconds,
cancel_pending=False,
)
stats["success"] = stats["monitor_status"] == "completed"
stats["log"] = rc.get_chunk_status_log_df()

out_json = {k: v for k, v in stats.items() if k not in ["log", "completed_chunk_ids"]}
typer.echo(f"\n{json.dumps(out_json)}")
return 0 if stats["success"] else 1


@app.command()
def deprovision(azure_subscription: Optional[str] = None, gcp_project: Optional[str] = None):
"""Deprovision gateways."""
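The new replicate_json command closes the loop with the solver (cli_solver.py, next file): solve_throughput writes replication_topo.to_json() to its out path, and replicate_json reads the same JSON back through ReplicationTopology.from_json. Assuming typer's default hyphenated command naming (the exact subcommand path is not shown in this diff), an invocation might look like skylark replicate-json plan.json --size-total-mb 2048, which with the default n_chunks of 512 generates 512 random 4 MB chunks.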
15 changes: 9 additions & 6 deletions skylark/cli/cli_solver.py
@@ -69,24 +69,27 @@ def solve_throughput(
# save results
tput.print_solution(solution)
if solution.is_feasible:
replication_topo = tput.to_replication_topology(solution)
if out:
with open(out, "w") as f:
f.write(replication_topo.to_json())
if visualize:
g = tput.plot_graphviz(solution)
if g is not None:
try:
for f in Path("/tmp/throughput_graph.gv.*").glob("*"):
for f in Path("/tmp/").glob("throughput_graph.gv*"):
f.unlink()
g.render(filename="/tmp/throughput_graph.gv", quiet_view=True, format="pdf")
g.render(filename="/tmp/throughput_graph.gv", format="png")
except FileNotFoundError as e:
logger.error(f"Could not render graph: {e}")
replication_topo = tput.to_replication_topology(solution)
if out:
with open(out, "w") as f:
f.write(replication_topo.to_json())
if visualize:
g_rt = replication_topo.to_graphviz()
if g_rt is not None:
try:
for f in Path("/tmp/replication_topo.gv.*").glob("*"):
for f in Path("/tmp/").glob("replication_topo.gv*"):
f.unlink()
g_rt.render(filename="/tmp/replication_topo.gv", quiet_view=True, format="pdf")
g_rt.render(filename="/tmp/replication_topo.gv", format="png")
except FileNotFoundError as e:
logger.error(f"Could not render graph: {e}")
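The glob fix above is worth a note: pathlib applies glob() relative to the Path it is called on, so the old form searched inside a directory literally named throughput_graph.gv.* (which never exists) and stale renders were never deleted. A minimal sketch of the corrected cleanup:

from pathlib import Path

# glob() matches children of the Path it is called on; globbing /tmp/ with a
# filename pattern finds the stale throughput_graph.gv renders, whereas the
# old code globbed inside a nonexistent directory and silently matched nothing
for stale in Path("/tmp/").glob("throughput_graph.gv*"):
    stale.unlink()

The graph is then rendered both as a quiet_view PDF and as a PNG, so headless runs still produce an artifact.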
23 changes: 16 additions & 7 deletions skylark/compute/server.py
@@ -198,15 +198,24 @@ def start_gateway(

# increase TCP connections, enable BBR optionally and raise file limits
sysctl_updates = {
# congestion control window
"net.core.rmem_max": 2147483647,
"net.core.wmem_max": 2147483647,
"net.ipv4.tcp_rmem": "4096 87380 1073741824",
"net.ipv4.tcp_wmem": "4096 65536 1073741824",
"net.ipv4.tcp_tw_reuse": 1,
"net.core.somaxconn": 1024,
"net.core.netdev_max_backlog": 2000,
"net.ipv4.tcp_max_syn_backlog": 2048,
"fs.file-max": 1024 * 1024 * 1024,
"net.ipv4.tcp_rmem": "'4096 87380 1073741824'",
"net.ipv4.tcp_wmem": "'4096 65536 1073741824'",
# increase max number of TCP connections
# "net.ipv4.tcp_tw_reuse": 0,
# "net.ipv4.tcp_tw_recycle": 0,
"net.core.somaxconn": 65535,
"net.core.netdev_max_backlog": 4096,
"net.ipv4.tcp_max_syn_backlog": 32768,
# "net.ipv4.tcp_syn_retries": 1,
# "net.ipv4.tcp_synack_retries": 1,
# "net.ipv4.tcp_fin_timeout": 5,
# "net.ipv4.tcp_syncookies": 0,
"net.ipv4.ip_local_port_range": "'12000 65535'",
# increase file limit
"fs.file-max": 1048576,
}
if use_bbr:
sysctl_updates["net.core.default_qdisc"] = "fq"
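How sysctl_updates is ultimately applied is outside this hunk; a minimal sketch, assuming the dict is flattened into a single sysctl -w command executed on the gateway, which would explain the embedded single quotes around multi-valued settings:

# hypothetical apply step (assumed; not shown in this diff): flatten the dict
# into one shell command, where the embedded quotes keep multi-value settings
# such as 4096 87380 1073741824 together as a single key=value argument
updates = {
    "net.core.rmem_max": 2147483647,
    "net.ipv4.tcp_rmem": "'4096 87380 1073741824'",
}
cmd = "sudo sysctl -w " + " ".join(f"{k}={v}" for k, v in updates.items())
print(cmd)  # sudo sysctl -w net.core.rmem_max=2147483647 net.ipv4.tcp_rmem='4096 87380 1073741824'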
51 changes: 37 additions & 14 deletions skylark/replicate/replication_plan.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from collections import namedtuple
import dataclasses
import json
import shutil
from typing import Dict, List, Optional, Set, Tuple
@@ -16,20 +16,17 @@
@dataclass
class ReplicationTopologyGateway:
region: str
instance_idx: int
instance: int

def to_dict(self):
return {
"region": self.region,
"instance_idx": self.instance_idx,
}
return dataclasses.asdict(self)

@staticmethod
def from_dict(topology_dict: Dict):
return ReplicationTopologyGateway(
region=topology_dict["region"],
instance_idx=topology_dict["instance_idx"],
)
return ReplicationTopologyGateway(**topology_dict)

def __hash__(self) -> int:
return hash(self.region) + hash(self.instance)

def __hash__(self) -> int:
return hash(self.region) + hash(self.instance_idx)
@@ -60,6 +57,30 @@ def add_edge(self, src_region: str, src_instance: int, dest_region: str, dest_in
def get_outgoing_paths(self, src: ReplicationTopologyGateway):
return {dest_gateway: num_connections for src_gateway, dest_gateway, num_connections in self.edges if src_gateway == src}

def source_instances(self) -> Set[ReplicationTopologyGateway]:
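# a source gateway appears as an edge source but never as a destination,
# i.e. it has no incoming edges; sink_instances below is the mirror image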
nodes = set(k[0] for k in self.edges)
for _, dest, _ in self.edges:
if dest in nodes:
nodes.remove(dest)
return nodes

def sink_instances(self) -> Set[ReplicationTopologyGateway]:
nodes = set(k[1] for k in self.edges)
for src, _, _ in self.edges:
if src in nodes:
nodes.remove(src)
return nodes

def source_region(self) -> str:
instances = list(self.source_instances())
assert all(i.region == instances[0].region for i in instances), "All source instances must be in the same region"
return instances[0].region

def sink_region(self) -> str:
instances = list(self.sink_instances())
assert all(i.region == instances[0].region for i in instances), "All sink instances must be in the same region"
return instances[0].region

def to_json(self):
"""
Returns a JSON representation of the topology.
@@ -99,21 +120,23 @@ def to_graphviz(self):
subgraphs = {}
for src_gateway, dest_gateway, n_connections in self.edges:
# group node instances by region
src_region, src_instance = src_gateway.region, src_gateway.instance_idx
dest_region, dest_instance = dest_gateway.region, dest_gateway.instance_idx
src_region, src_instance = src_gateway.region, src_gateway.instance
dest_region, dest_instance = dest_gateway.region, dest_gateway.instance
src_region, dest_region = src_region.replace(":", "/"), dest_region.replace(":", "/")
src_node = f"{src_region}, {src_instance}"
dest_node = f"{dest_region}, {dest_instance}"

# make a subgraph for each region
if src_region not in subgraphs:
subgraphs[src_region] = gv.Digraph(name=f"cluster_{src_region}")
subgraphs[src_region].attr(label=src_region)
if dest_region not in subgraphs:
subgraphs[dest_region] = gv.Digraph(name=f"cluster_{dest_region}")
subgraphs[dest_region].attr(label=dest_region)

# add nodes
subgraphs[src_region].node(src_node, label=src_node, shape="box")
subgraphs[dest_region].node(dest_node, label=dest_node, shape="box")
subgraphs[src_region].node(src_node, label=str(src_instance), shape="box")
subgraphs[dest_region].node(dest_node, label=str(dest_instance), shape="box")

# add edges
g.edge(src_node, dest_node, label=f"{n_connections} connections")
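Two changes land in this file: ReplicationTopologyGateway.instance_idx becomes instance (with to_dict/from_dict collapsed onto dataclasses.asdict and **kwargs), and the class carries an explicit __hash__, which is likely the "Unhashable type fix" from the squashed log: a @dataclass with the default eq=True sets __hash__ to None unless one is defined by hand. A sketch of the standard alternative (not what this commit does; the region string below is an assumed example):

from dataclasses import dataclass

# frozen=True makes instances immutable and auto-generates __hash__ from the
# fields; a tuple hash is also more collision-resistant than the explicit
# hash(self.region) + hash(self.instance), which is insensitive to field order
@dataclass(frozen=True)
class ReplicationTopologyGateway:
    region: str
    instance: int

gateways = {ReplicationTopologyGateway("aws:us-east-1", 0)}  # hashable: fine as set/dict keys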
15 changes: 6 additions & 9 deletions skylark/replicate/replicator_client.py
@@ -212,12 +212,9 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob:
obj_file_size_bytes = job.src_obj_sizes() if job.source_bucket else None
for idx, obj in enumerate(job.objs):
if obj_file_size_bytes:
# object store objects
file_size_bytes = obj_file_size_bytes[obj]
else:
# random data
file_size_bytes = job.random_chunk_size_mb * MB

chunks.append(
Chunk(
key=obj,
Expand All @@ -228,9 +225,10 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob:
)

# partition chunks into roughly equal-sized batches (by bytes)
src_instances = [s for n, s in self.bound_nodes.items() if n.region == job.source_region]
src_instances = [self.bound_nodes[n] for n in self.topology.source_instances()]
chunk_lens = [c.chunk_length_bytes for c in chunks]
approx_bytes_per_connection = sum(chunk_lens) / len(src_instances)
assert sum(chunk_lens) > 0, f"No chunks to replicate, got {chunk_lens}"
batch_bytes = 0
chunk_batches = []
current_batch = []
@@ -265,9 +263,8 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob:
)
)

# send chunk requests to start gateways in parallel
with Timer("Dispatch chunk requests"):

# send chunk requests to start gateways in parallel
def send_chunk_requests(args: Tuple[Server, List[ChunkRequest]]):
hop_instance, chunk_requests = args
ip = gateway_ips[hop_instance]
@@ -276,7 +273,7 @@ def send_chunk_requests(args: Tuple[Server, List[ChunkRequest]]):
if reply.status_code != 200:
raise Exception(f"Failed to send chunk requests to gateway instance {hop_instance.instance_name()}: {reply.text}")

start_instances = list(zip(self.bound_nodes.values(), chunk_requests_sharded.values()))
start_instances = list(zip(src_instances, chunk_requests_sharded.values()))
do_parallel(send_chunk_requests, start_instances, n=-1)

job.chunk_requests = [cr for crlist in chunk_requests_sharded.values() for cr in crlist]
Expand All @@ -291,7 +288,7 @@ def get_chunk_status(args):
logs = []
for log_entry in reply.json()["chunk_status_log"]:
log_entry["region"] = node.region
log_entry["instance_idx"] = node.instance_idx
log_entry["instance"] = node.instance
log_entry["time"] = datetime.fromisoformat(log_entry["time"])
log_entry["state"] = ChunkState.from_str(log_entry["state"])
logs.append(log_entry)
@@ -338,7 +335,7 @@ def fn(s: Server):
# count completed bytes
last_log_df = (
log_df.groupby(["chunk_id"])
.apply(lambda df: df.sort_values(["region", "instance_idx", "time"], ascending=False).head(1))
.apply(lambda df: df.sort_values(["region", "instance", "time"], ascending=False).head(1))
.reset_index(drop=True)
)
is_complete_fn = lambda row: row["state"] >= ChunkState.upload_complete and row["region"] == job.dest_region
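Two fixes show up in replicator_client.py: batches and chunk requests are now derived from the topology's source_instances() instead of every bound node, and the chunk-status log columns follow the instance_idx to instance rename. The batching loop is only partially visible here; a simplified sketch of the greedy equal-bytes partition it implements (reconstructed from the visible lines, so details are approximate):

# fill the current batch until it reaches the per-instance byte budget, then
# start a new one; the final batch absorbs any remainder
def partition_chunks(chunk_lens, n_instances):
    target = sum(chunk_lens) / n_instances
    batches, current, current_bytes = [], [], 0
    for idx, size in enumerate(chunk_lens):
        current.append(idx)
        current_bytes += size
        if current_bytes >= target and len(batches) < n_instances - 1:
            batches.append(current)
            current, current_bytes = [], 0
    batches.append(current)
    return batches

print(partition_chunks([4, 4, 4, 4], 2))  # -> [[0, 1], [2, 3]]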