From 3e1eacd93bc15a9041c5691225e361ae958b5ed5 Mon Sep 17 00:00:00 2001
From: Paras Jain
Date: Thu, 27 Jan 2022 00:55:04 +0000
Subject: [PATCH] Squashed commit of the following:

commit f8f38dbf45619056533879792b05623f4934c300
Author: Paras Jain
Date:   Thu Jan 27 00:50:18 2022 +0000

    Squashed commit of the following:

    commit b22747797e5228dc26d2609131a6a5aaca37efcd
    Author: Paras Jain
    Date:   Wed Jan 26 16:48:59 2022 -0800

        More fixes

    commit 71060590f6fb03eb8346ab4f21b9ecbcfd5a3515
    Author: Paras Jain
    Date:   Wed Jan 26 16:46:29 2022 -0800

        Fix ingress limit on solver

    commit dd8e69824c04d63acb74261f116ab5d0b6f23558
    Author: Paras Jain
    Date:   Wed Jan 26 23:42:20 2022 +0000

        test

    commit 898481c17bbfca2dd567386fd556178c039c6bc1
    Author: Paras Jain
    Date:   Wed Jan 26 01:44:03 2022 +0000

        transfer works!

    commit 313d2d9da7314be3d18080995713343217dcfbd9
    Author: Paras Jain
    Date:   Tue Jan 25 22:04:46 2022 +0000

        Unhashable type fix

    commit d970d283cf46822e8660381e0aedcbcd205ec0cd
    Author: Paras Jain
    Date:   Tue Jan 25 22:01:23 2022 +0000

        Fix monitoring

    commit fdb9255f8b94e9974e78f453c88e3e1733b7d3e2
    Author: Paras Jain
    Date:   Tue Jan 25 21:48:14 2022 +0000

        Initial draft of a gateway with pre-allocated paths
---
 Dockerfile                             |  10 +++
 skylark/cli/cli.py                     |  98 +++++++++++++++++++++-
 skylark/cli/cli_solver.py              |  15 ++--
 skylark/compute/server.py              |  23 ++++--
 skylark/replicate/replication_plan.py  |  51 ++++++++----
 skylark/replicate/replicator_client.py |  15 ++--
 skylark/replicate/solver.py            | 108 +++++++++----------
 7 files changed, 213 insertions(+), 107 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 6ae4669b6..89a4b1ec7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,15 @@
 # syntax=docker/dockerfile:1
 FROM python:3.8-slim
+
+# increase number of open files and concurrent TCP connections
+RUN echo 'net.ipv4.ip_local_port_range = 12000 65535' >> /etc/sysctl.conf
+RUN echo 'fs.file-max = 1048576' >> /etc/sysctl.conf
+RUN mkdir -p /etc/security/
+RUN echo '* soft nofile 1048576' >> /etc/security/limits.conf
+RUN echo '* hard nofile 1048576' >> /etc/security/limits.conf
+RUN echo 'root soft nofile 1048576' >> /etc/security/limits.conf
+RUN echo 'root hard nofile 1048576' >> /etc/security/limits.conf
+
 COPY scripts/requirements-gateway.txt /tmp/requirements-gateway.txt
 RUN --mount=type=cache,target=/root/.cache/pip pip install --no-cache-dir --compile -r /tmp/requirements-gateway.txt && rm -r /tmp/requirements-gateway.txt
 
diff --git a/skylark/cli/cli.py b/skylark/cli/cli.py
index cf6e13d15..eaa0bf343 100644
--- a/skylark/cli/cli.py
+++ b/skylark/cli/cli.py
@@ -95,8 +95,8 @@ def replicate_random(
     inter_region: Optional[str] = typer.Argument(None),
     num_gateways: int = 1,
     num_outgoing_connections: int = 16,
-    chunk_size_mb: int = 8,
-    n_chunks: int = 2048,
+    total_transfer_size_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"),
+    n_chunks: int = 512,
     reuse_gateways: bool = True,
     azure_subscription: Optional[str] = None,
     gcp_project: Optional[str] = None,
@@ -104,7 +104,7 @@
     aws_instance_class: str = "m5.8xlarge",
     azure_instance_class: str = "Standard_D32_v5",
     gcp_instance_class: Optional[str] = "n2-standard-32",
-    gcp_use_premium_network: bool = False,
+    gcp_use_premium_network: bool = True,
     key_prefix: str = "/test/replicate_random",
     time_limit_seconds: Optional[int] = None,
     log_interval_s: float = 1.0,
@@ -148,6 +148,9 @@
     for node, gw in rc.bound_nodes.items():
{gw.gateway_log_viewer_url}") + if total_transfer_size_mb % n_chunks != 0: + logger.warning(f"total_transfer_size_mb ({total_transfer_size_mb}) is not a multiple of n_chunks ({n_chunks})") + chunk_size_mb = total_transfer_size_mb // n_chunks job = ReplicationJob( source_region=src_region, source_bucket=None, @@ -174,6 +177,95 @@ def replicate_random( return 0 if stats["success"] else 1 +@app.command() +def replicate_json( + path: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, help="Path to JSON file describing replication plan"), + total_transfer_size_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"), + n_chunks: int = 512, + # bucket options + use_random_data: bool = True, + bucket_prefix: str = "skylark", + key_prefix: str = "/test/replicate_random", + # gateway provisioning options + reuse_gateways: bool = True, + gateway_docker_image: str = os.environ.get("SKYLARK_DOCKER_IMAGE", "ghcr.io/parasj/skylark:main"), + # cloud provider specific options + azure_subscription: Optional[str] = None, + gcp_project: Optional[str] = None, + aws_instance_class: str = "m5.8xlarge", + azure_instance_class: str = "Standard_D32_v5", + gcp_instance_class: Optional[str] = "n2-standard-32", + gcp_use_premium_network: bool = True, + # logging options + time_limit_seconds: Optional[int] = None, + log_interval_s: float = 1.0, +): + """Replicate objects from remote object store to another remote object store.""" + print_header() + config = load_config() + gcp_project = gcp_project or config.get("gcp_project_id") + azure_subscription = azure_subscription or config.get("azure_subscription_id") + logger.debug(f"Loaded gcp_project: {gcp_project}, azure_subscription: {azure_subscription}") + check_ulimit() + + with path.open("r") as f: + topo = ReplicationTopology.from_json(f.read()) + + rc = ReplicatorClient( + topo, + azure_subscription=azure_subscription, + gcp_project=gcp_project, + gateway_docker_image=gateway_docker_image, + aws_instance_class=aws_instance_class, + azure_instance_class=azure_instance_class, + gcp_instance_class=gcp_instance_class, + gcp_use_premium_network=gcp_use_premium_network, + ) + + if not reuse_gateways: + atexit.register(rc.deprovision_gateways) + else: + logger.warning( + f"Instances will remain up and may result in continued cloud billing. Remember to call `skylark deprovision` to deprovision gateways." 
+        )
+    rc.provision_gateways(reuse_gateways)
+    for node, gw in rc.bound_nodes.items():
+        logger.info(f"Provisioned {node}: {gw.gateway_log_viewer_url}")
+
+    if total_transfer_size_mb % n_chunks != 0:
+        logger.warning(f"total_transfer_size_mb ({total_transfer_size_mb}) is not a multiple of n_chunks ({n_chunks})")
+    chunk_size_mb = total_transfer_size_mb // n_chunks
+
+    if use_random_data:
+        job = ReplicationJob(
+            source_region=topo.source_region(),
+            source_bucket=None,
+            dest_region=topo.sink_region(),
+            dest_bucket=None,
+            objs=[f"{key_prefix}/{i}" for i in range(n_chunks)],
+            random_chunk_size_mb=chunk_size_mb,
+        )
+    else:
+        raise NotImplementedError()
+
+    total_bytes = n_chunks * chunk_size_mb * MB
+    job = rc.run_replication_plan(job)
+    logger.info(f"{total_bytes / GB:.2f}GByte replication job launched")
+    stats = rc.monitor_transfer(
+        job,
+        show_pbar=True,
+        log_interval_s=log_interval_s,
+        time_limit_seconds=time_limit_seconds,
+        cancel_pending=False,
+    )
+    stats["success"] = stats["monitor_status"] == "completed"
+    stats["log"] = rc.get_chunk_status_log_df()
+
+    out_json = {k: v for k, v in stats.items() if k not in ["log", "completed_chunk_ids"]}
+    typer.echo(f"\n{json.dumps(out_json)}")
+    return 0 if stats["success"] else 1
+
+
 @app.command()
 def deprovision(azure_subscription: Optional[str] = None, gcp_project: Optional[str] = None):
     """Deprovision gateways."""
diff --git a/skylark/cli/cli_solver.py b/skylark/cli/cli_solver.py
index 383485a90..402af495c 100644
--- a/skylark/cli/cli_solver.py
+++ b/skylark/cli/cli_solver.py
@@ -69,24 +69,27 @@ def solve_throughput(
     # save results
     tput.print_solution(solution)
     if solution.is_feasible:
-        replication_topo = tput.to_replication_topology(solution)
-        if out:
-            with open(out, "w") as f:
-                f.write(replication_topo.to_json())
         if visualize:
             g = tput.plot_graphviz(solution)
             if g is not None:
                 try:
-                    for f in Path("/tmp/throughput_graph.gv.*").glob("*"):
+                    for f in Path("/tmp/").glob("throughput_graph.gv*"):
                         f.unlink()
                     g.render(filename="/tmp/throughput_graph.gv", quiet_view=True, format="pdf")
+                    g.render(filename="/tmp/throughput_graph.gv", format="png")
                 except FileNotFoundError as e:
                     logger.error(f"Could not render graph: {e}")
+        replication_topo = tput.to_replication_topology(solution)
+        if out:
+            with open(out, "w") as f:
+                f.write(replication_topo.to_json())
+        if visualize:
             g_rt = replication_topo.to_graphviz()
             if g_rt is not None:
                 try:
-                    for f in Path("/tmp/replication_topo.gv.*").glob("*"):
+                    for f in Path("/tmp/").glob("replication_topo.gv*"):
                         f.unlink()
                     g_rt.render(filename="/tmp/replication_topo.gv", quiet_view=True, format="pdf")
+                    g_rt.render(filename="/tmp/replication_topo.gv", format="png")
                 except FileNotFoundError as e:
                     logger.error(f"Could not render graph: {e}")
diff --git a/skylark/compute/server.py b/skylark/compute/server.py
index eaceff2fe..87b171c9c 100644
--- a/skylark/compute/server.py
+++ b/skylark/compute/server.py
@@ -198,15 +198,24 @@ def start_gateway(
 
         # increase TCP connections, enable BBR optionally and raise file limits
         sysctl_updates = {
+            # congestion control window
             "net.core.rmem_max": 2147483647,
             "net.core.wmem_max": 2147483647,
-            "net.ipv4.tcp_rmem": "4096 87380 1073741824",
-            "net.ipv4.tcp_wmem": "4096 65536 1073741824",
-            "net.ipv4.tcp_tw_reuse": 1,
-            "net.core.somaxconn": 1024,
-            "net.core.netdev_max_backlog": 2000,
-            "net.ipv4.tcp_max_syn_backlog": 2048,
-            "fs.file-max": 1024 * 1024 * 1024,
+            "net.ipv4.tcp_rmem": "'4096 87380 1073741824'",
+            "net.ipv4.tcp_wmem": "'4096 65536 1073741824'",
+            # increase max number of TCP connections
+            # "net.ipv4.tcp_tw_reuse": 0,
+            # "net.ipv4.tcp_tw_recycle": 0,
+            "net.core.somaxconn": 65535,
+            "net.core.netdev_max_backlog": 4096,
+            "net.ipv4.tcp_max_syn_backlog": 32768,
+            # "net.ipv4.tcp_syn_retries": 1,
+            # "net.ipv4.tcp_synack_retries": 1,
+            # "net.ipv4.tcp_fin_timeout": 5,
+            # "net.ipv4.tcp_syncookies": 0,
+            "net.ipv4.ip_local_port_range": "'12000 65535'",
+            # increase file limit
+            "fs.file-max": 1048576,
         }
         if use_bbr:
             sysctl_updates["net.core.default_qdisc"] = "fq"
diff --git a/skylark/replicate/replication_plan.py b/skylark/replicate/replication_plan.py
index 7a9e43b8d..975613ed3 100644
--- a/skylark/replicate/replication_plan.py
+++ b/skylark/replicate/replication_plan.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from collections import namedtuple
+import dataclasses
 import json
 import shutil
 from typing import Dict, List, Optional, Set, Tuple
@@ -16,20 +16,17 @@
 @dataclass
 class ReplicationTopologyGateway:
     region: str
-    instance_idx: int
+    instance: int
 
     def to_dict(self):
-        return {
-            "region": self.region,
-            "instance_idx": self.instance_idx,
-        }
+        return dataclasses.asdict(self)
 
     @staticmethod
     def from_dict(topology_dict: Dict):
-        return ReplicationTopologyGateway(
-            region=topology_dict["region"],
-            instance_idx=topology_dict["instance_idx"],
-        )
+        return ReplicationTopologyGateway(**topology_dict)
+
+    def __hash__(self) -> int:
+        return hash(self.region) + hash(self.instance)
 
     def __hash__(self) -> int:
         return hash(self.region) + hash(self.instance_idx)
@@ -60,6 +57,30 @@ def add_edge(self, src_region: str, src_instance: int, dest_region: str, dest_in
     def get_outgoing_paths(self, src: ReplicationTopologyGateway):
         return {dest_gateway: num_connections for src_gateway, dest_gateway, num_connections in self.edges if src_gateway == src}
 
+    def source_instances(self) -> Set[ReplicationTopologyGateway]:
+        nodes = set(k[0] for k in self.edges)
+        for _, dest, _ in self.edges:
+            if dest in nodes:
+                nodes.remove(dest)
+        return nodes
+
+    def sink_instances(self) -> Set[ReplicationTopologyGateway]:
+        nodes = set(k[1] for k in self.edges)
+        for src, _, _ in self.edges:
+            if src in nodes:
+                nodes.remove(src)
+        return nodes
+
+    def source_region(self) -> str:
+        instances = list(self.source_instances())
+        assert all(i.region == instances[0].region for i in instances), "All source instances must be in the same region"
+        return instances[0].region
+
+    def sink_region(self) -> str:
+        instances = list(self.sink_instances())
+        assert all(i.region == instances[0].region for i in instances), "All sink instances must be in the same region"
+        return instances[0].region
+
     def to_json(self):
         """
         Returns a JSON representation of the topology.
@@ -99,8 +120,8 @@ def to_graphviz(self):
         subgraphs = {}
         for src_gateway, dest_gateway, n_connections in self.edges:
             # group node instances by region
-            src_region, src_instance = src_gateway.region, src_gateway.instance_idx
-            dest_region, dest_instance = dest_gateway.region, dest_gateway.instance_idx
+            src_region, src_instance = src_gateway.region, src_gateway.instance
+            dest_region, dest_instance = dest_gateway.region, dest_gateway.instance
             src_region, dest_region = src_region.replace(":", "/"), dest_region.replace(":", "/")
             src_node = f"{src_region}, {src_instance}"
             dest_node = f"{dest_region}, {dest_instance}"
@@ -108,12 +129,14 @@
             # make a subgraph for each region
             if src_region not in subgraphs:
                 subgraphs[src_region] = gv.Digraph(name=f"cluster_{src_region}")
+                subgraphs[src_region].attr(label=src_region)
             if dest_region not in subgraphs:
                 subgraphs[dest_region] = gv.Digraph(name=f"cluster_{dest_region}")
+                subgraphs[dest_region].attr(label=dest_region)
 
             # add nodes
-            subgraphs[src_region].node(src_node, label=src_node, shape="box")
-            subgraphs[dest_region].node(dest_node, label=dest_node, shape="box")
+            subgraphs[src_region].node(src_node, label=str(src_instance), shape="box")
+            subgraphs[dest_region].node(dest_node, label=str(dest_instance), shape="box")
 
             # add edges
             g.edge(src_node, dest_node, label=f"{n_connections} connections")
diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py
index 603b49a83..fa1b4c1b5 100644
--- a/skylark/replicate/replicator_client.py
+++ b/skylark/replicate/replicator_client.py
@@ -212,12 +212,9 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob:
         obj_file_size_bytes = job.src_obj_sizes() if job.source_bucket else None
         for idx, obj in enumerate(job.objs):
             if obj_file_size_bytes:
-                # object store objects
                 file_size_bytes = obj_file_size_bytes[obj]
             else:
-                # random data
                 file_size_bytes = job.random_chunk_size_mb * MB
-
             chunks.append(
                 Chunk(
                     key=obj,
@@ -228,9 +225,10 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob:
             )
 
         # partition chunks into roughly equal-sized batches (by bytes)
-        src_instances = [s for n, s in self.bound_nodes.items() if n.region == job.source_region]
+        src_instances = [self.bound_nodes[n] for n in self.topology.source_instances()]
         chunk_lens = [c.chunk_length_bytes for c in chunks]
         approx_bytes_per_connection = sum(chunk_lens) / len(src_instances)
+        assert sum(chunk_lens) > 0, f"No chunks to replicate, got {chunk_lens}"
         batch_bytes = 0
         chunk_batches = []
         current_batch = []
@@ -265,9 +263,8 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob:
                 )
             )
 
-        # send chunk requests to start gateways in parallel
         with Timer("Dispatch chunk requests"):
-
+            # send chunk requests to start gateways in parallel
             def send_chunk_requests(args: Tuple[Server, List[ChunkRequest]]):
                 hop_instance, chunk_requests = args
                 ip = gateway_ips[hop_instance]
@@ -276,7 +273,7 @@ def send_chunk_requests(args: Tuple[Server, List[ChunkRequest]]):
                 if reply.status_code != 200:
                     raise Exception(f"Failed to send chunk requests to gateway instance {hop_instance.instance_name()}: {reply.text}")
 
-            start_instances = list(zip(self.bound_nodes.values(), chunk_requests_sharded.values()))
+            start_instances = list(zip(src_instances, chunk_requests_sharded.values()))
             do_parallel(send_chunk_requests, start_instances, n=-1)
 
         job.chunk_requests = [cr for crlist in chunk_requests_sharded.values() for cr in crlist]
@@ -291,7 +288,7 @@ def get_chunk_status(args):
            logs = []
            for log_entry in reply.json()["chunk_status_log"]:
                log_entry["region"] = node.region
-               log_entry["instance_idx"] = node.instance_idx
+               log_entry["instance"] = node.instance
                log_entry["time"] = datetime.fromisoformat(log_entry["time"])
                log_entry["state"] = ChunkState.from_str(log_entry["state"])
                logs.append(log_entry)
@@ -338,7 +335,7 @@ def fn(s: Server):
        # count completed bytes
        last_log_df = (
            log_df.groupby(["chunk_id"])
-           .apply(lambda df: df.sort_values(["region", "instance_idx", "time"], ascending=False).head(1))
+           .apply(lambda df: df.sort_values(["region", "instance", "time"], ascending=False).head(1))
            .reset_index(drop=True)
        )
        is_complete_fn = lambda row: row["state"] >= ChunkState.upload_complete and row["region"] == job.dest_region
diff --git a/skylark/replicate/solver.py b/skylark/replicate/solver.py
index 1bc1d7f92..c6d7e5eba 100644
--- a/skylark/replicate/solver.py
+++ b/skylark/replicate/solver.py
@@ -158,7 +158,9 @@ def solve_min_cost(
 
        # connection limits
        constraints.append(conn >= 0)
-       constraints.append(cp.sum(conn, axis=1) <= instances_per_region * p.benchmarked_throughput_connections)
+       constraints.append(cp.sum(conn, axis=1) <= instances_per_region * p.benchmarked_throughput_connections)  # egress
+       for i in range(len(regions)):
+           constraints.append(cp.sum(conn[:, i]) <= instances_per_region[i] * p.benchmarked_throughput_connections)  # ingress
 
        # flow capacity constraint
        adjusted_edge_capacity_gigabits = cp.multiply(p.const_throughput_grid_gbits, conn / p.benchmarked_throughput_connections)
@@ -275,10 +277,12 @@ def plot_graphviz(self, solution: ThroughputSolution) -> gv.Digraph:
                link_cost = self.get_path_cost(src, dst)
                label = f"{solution.var_edge_flow_gigabits[i, j]:.2f} Gbps (of {solution.problem.const_throughput_grid_gbits[i, j]:.2f}Gbps), "
                label += f"\n${link_cost:.4f}/GB over {solution.var_conn[i, j]:.1f}c"
-               g.edge(src.replace(":", "/"), dst.replace(":", "/"), label=label)
+               src_label = f"{src.replace(':', '/')}x{solution.var_instances_per_region[i]:.1f}"
+               dst_label = f"{dst.replace(':', '/')}x{solution.var_instances_per_region[j]:.1f}"
+               g.edge(src_label, dst_label, label=label)
        return g
 
-   def to_replication_topology(self, solution: ThroughputSolution) -> ReplicationTopology:
+   def to_replication_topology(self, solution: ThroughputSolution, ingress_hard_limit=64, egress_hard_limit=64) -> ReplicationTopology:
        regions = self.get_regions()
 
        Edge = namedtuple("Edge", ["src_region", "src_instance_idx", "dst_region", "dst_instance_idx", "connections"])
@@ -289,79 +293,47 @@ def to_replication_topology(self, solution: ThroughputSolution) -> ReplicationTo
            src_instance_idx, src_instance_connections = 0, 0
            for j, dst in enumerate(regions):
                if solution.var_edge_flow_gigabits[i, j] > 0:
-                   # add up to solution.problem.benchmarked_throughput_connections connections to an instance
-                   # if more than that, add another instance for any remaining connections and continue
-                   connections_to_allocate = np.ceil(solution.var_conn[i, j]).astype(int)
-
-                   # if this edge would exceed the instance connection limit, partially add connections to current instance and increment instance
-                   if connections_to_allocate + src_instance_connections > solution.problem.benchmarked_throughput_connections:
-                       partial_conn = solution.problem.benchmarked_throughput_connections - src_instance_connections
-                       connections_to_allocate = connections_to_allocate - partial_conn
-                       assert connections_to_allocate >= 0, f"connections_to_allocate = {connections_to_allocate}"
{partial_conn}" - src_edges.append( - Edge( - src_region=src, - src_instance_idx=src_instance_idx, - dst_region=dst, - dst_instance_idx=None, - connections=partial_conn, - ) - ) - src_instance_idx += 1 - src_instance_connections = 0 - - # add remaining connections - if connections_to_allocate > 0: - src_edges.append( - Edge( - src_region=src, - src_instance_idx=src_instance_idx, - dst_region=dst, - dst_instance_idx=None, - connections=connections_to_allocate, - ) - ) - src_instance_connections += connections_to_allocate + connections_to_allocate = np.rint(solution.var_conn[i, j]).astype(int) + while connections_to_allocate > 0: + # if this edge would exceed the instance connection limit, partially add connections to current instance and increment instance + if connections_to_allocate + src_instance_connections > egress_hard_limit: + partial_conn = egress_hard_limit - src_instance_connections + connections_to_allocate -= partial_conn + assert connections_to_allocate >= 0, f"connections_to_allocate = {connections_to_allocate}" + assert partial_conn >= 0, f"partial_conn = {partial_conn}" + if partial_conn > 0: + src_edges.append(Edge(src, src_instance_idx, dst, None, partial_conn)) + src_instance_idx += 1 + src_instance_connections = 0 + else: + partial_conn = connections_to_allocate + connections_to_allocate = 0 + src_edges.append(Edge(src, src_instance_idx, dst, None, partial_conn)) + src_instance_connections += partial_conn n_instances[i] = src_instance_idx + 1 # assign destination instances (currently None) to Edges - # ingress_conn_per_instance = np.sum(solution.var_conn, axis=0) / n_instances - # balance connections across instances by assigning connections from a source instance to the next - # destination instance until ingress_conn_per_instance is reached, then increment destination instance. 
-       # ensure the total number of destination instances is the same as the number of source instances
        dst_edges = []
-       ingress_conn_per_instance = {r: np.ceil(np.sum(solution.var_conn[:, i]) / n_instances[i]) for i, r in enumerate(regions)}
        dsts_instance_idx = {i: 0 for i in regions}
        dsts_instance_conn = {i: 0 for i in regions}
        for e in src_edges:
-           connections_to_allocate = np.ceil(e.connections).astype(int)
-           if connections_to_allocate + dsts_instance_conn[e.dst_region] > ingress_conn_per_instance[e.dst_region]:
-               partial_conn = ingress_conn_per_instance[e.dst_region] - dsts_instance_conn[e.dst_region]
-               connections_to_allocate = connections_to_allocate - partial_conn
-               dst_edges.append(
-                   Edge(
-                       src_region=e.src_region,
-                       src_instance_idx=e.src_instance_idx,
-                       dst_region=e.dst_region,
-                       dst_instance_idx=dsts_instance_idx[e.dst_region],
-                       connections=partial_conn,
-                   )
-               )
-               dsts_instance_idx[e.dst_region] += 1
-               dsts_instance_conn[e.dst_region] = 0
-
-           if connections_to_allocate > 0:
-               dst_edges.append(
-                   Edge(
-                       src_region=e.src_region,
-                       src_instance_idx=e.src_instance_idx,
-                       dst_region=e.dst_region,
-                       dst_instance_idx=dsts_instance_idx[e.dst_region],
-                       connections=connections_to_allocate,
+           connections_to_allocate = np.rint(e.connections).astype(int)
+           while connections_to_allocate > 0:
+               if connections_to_allocate + dsts_instance_conn[e.dst_region] > ingress_hard_limit:
+                   partial_conn = ingress_hard_limit - dsts_instance_conn[e.dst_region]
+                   connections_to_allocate = connections_to_allocate - partial_conn
+                   if partial_conn > 0:
+                       dst_edges.append(
+                           Edge(e.src_region, e.src_instance_idx, e.dst_region, dsts_instance_idx[e.dst_region], partial_conn)
+                       )
+                   dsts_instance_idx[e.dst_region] += 1
+                   dsts_instance_conn[e.dst_region] = 0
+               else:
+                   dst_edges.append(
+                       Edge(e.src_region, e.src_instance_idx, e.dst_region, dsts_instance_idx[e.dst_region], connections_to_allocate)
                    )
-               )
-               dsts_instance_conn[e.dst_region] += connections_to_allocate
+                   dsts_instance_conn[e.dst_region] += connections_to_allocate
+                   connections_to_allocate = 0
 
        # build ReplicationTopology
        replication_topology = ReplicationTopology()
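
Note on the solver change above: the rewritten to_replication_topology splits each region's (possibly fractional) connection count across gateway instances, capping every instance at an ingress/egress hard limit instead of the benchmarked per-instance throughput. The following is a minimal, standalone sketch of that greedy allocation for a single edge; it is not code from the patch, and the function and variable names are illustrative only (the 64-connection default mirrors the patch's ingress_hard_limit/egress_hard_limit defaults).

    def allocate_connections(conn_needed: int, hard_limit: int = 64):
        """Split conn_needed connections across instances, at most hard_limit per instance."""
        edges = []  # (instance_idx, connections) pairs
        instance_idx, instance_conns = 0, 0
        while conn_needed > 0:
            if conn_needed + instance_conns > hard_limit:
                # fill the current instance up to the hard limit, then open a new instance
                partial = hard_limit - instance_conns
                conn_needed -= partial
                if partial > 0:
                    edges.append((instance_idx, partial))
                instance_idx += 1
                instance_conns = 0
            else:
                # the remaining connections fit on the current instance
                edges.append((instance_idx, conn_needed))
                instance_conns += conn_needed
                conn_needed = 0
        return edges

    # e.g. allocate_connections(100) -> [(0, 64), (1, 36)], i.e. two gateway instances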