Skip to content

Commit

Permalink
Fix ingress limit on solver
Browse files Browse the repository at this point in the history
  • Loading branch information
parasj committed Jan 27, 2022
1 parent dd8e698 commit 7106059
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 78 deletions.
2 changes: 1 addition & 1 deletion skylark/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def replicate_json(
if total_transfer_size_mb % n_chunks != 0:
logger.warning(f"total_transfer_size_mb ({total_transfer_size_mb}) is not a multiple of n_chunks ({n_chunks})")
chunk_size_mb = total_transfer_size_mb // n_chunks

if use_random_data:
job = ReplicationJob(
source_region=topo.source_region(),
Expand Down
13 changes: 7 additions & 6 deletions skylark/cli/cli_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,25 @@ def solve_throughput(
# save results
tput.print_solution(solution)
if solution.is_feasible:
replication_topo = tput.to_replication_topology(solution)
if out:
with open(out, "w") as f:
f.write(replication_topo.to_json())
if visualize:
g = tput.plot_graphviz(solution)
if g is not None:
try:
for f in Path("/tmp/throughput_graph.gv.*").glob("*"):
for f in Path("/tmp/").glob("throughput_graph.gv*"):
f.unlink()
g.render(filename="/tmp/throughput_graph.gv", quiet_view=True, format="pdf")
g.render(filename="/tmp/throughput_graph.gv", format="png")
except FileNotFoundError as e:
logger.error(f"Could not render graph: {e}")
replication_topo = tput.to_replication_topology(solution)
if out:
with open(out, "w") as f:
f.write(replication_topo.to_json())
if visualize:
g_rt = replication_topo.to_graphviz()
if g_rt is not None:
try:
for f in Path("/tmp/replication_topo.gv.*").glob("*"):
for f in Path("/tmp/").glob("replication_topo.gv*"):
f.unlink()
g_rt.render(filename="/tmp/replication_topo.gv", quiet_view=True, format="pdf")
g_rt.render(filename="/tmp/replication_topo.gv", format="png")
Expand Down
4 changes: 2 additions & 2 deletions skylark/replicate/replication_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ def sink_instances(self) -> Set[ReplicationTopologyGateway]:
if src in nodes:
nodes.remove(src)
return nodes

def source_region(self) -> str:
    """Return the cloud region shared by every source gateway instance.

    Raises:
        AssertionError: if the source instances span more than one region.
        IndexError: if there are no source instances at all.
    """
    instances = list(self.source_instances())
    # Guard the [0] access so an empty list falls through to the same
    # IndexError the return statement would raise.
    expected = instances[0].region if instances else None
    assert all(node.region == expected for node in instances), "All source instances must be in the same region"
    return instances[0].region

def sink_region(self) -> str:
instances = list(self.sink_instances())
assert all(i.region == instances[0].region for i in instances), "All sink instances must be in the same region"
Expand Down
114 changes: 45 additions & 69 deletions skylark/replicate/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,14 @@ def solve_min_cost(

# connection limits
constraints.append(conn >= 0)
# constraints.append(cp.sum(conn, axis=1) <= instances_per_region * p.benchmarked_throughput_connections)
constraints.append(cp.sum(conn, axis=0) <= instances_per_region * p.benchmarked_throughput_connections)
constraints.append(cp.sum(conn, axis=1) <= instances_per_region * p.benchmarked_throughput_connections) # egress

# ingress limit (hack)
# currently, the solver will put a ton of connections that fan into the final region
# ideally, this limit would apply to every region, but then we get strange solutions
# instead, this constraint limits fan in to the final region
for sink in sinks:
constraints.append(cp.sum(conn[:, sink]) <= instances_per_region[sink] * p.benchmarked_throughput_connections)

# flow capacity constraint
adjusted_edge_capacity_gigabits = cp.multiply(p.const_throughput_grid_gbits, conn / p.benchmarked_throughput_connections)
Expand Down Expand Up @@ -276,10 +282,12 @@ def plot_graphviz(self, solution: ThroughputSolution) -> gv.Digraph:
link_cost = self.get_path_cost(src, dst)
label = f"{solution.var_edge_flow_gigabits[i, j]:.2f} Gbps (of {solution.problem.const_throughput_grid_gbits[i, j]:.2f}Gbps), "
label += f"\n${link_cost:.4f}/GB over {solution.var_conn[i, j]:.1f}c"
g.edge(src.replace(":", "/"), dst.replace(":", "/"), label=label)
src_label = f"{src.replace(':', '/')}x{solution.var_instances_per_region[i]:.1f}"
dst_label = f"{dst.replace(':', '/')}x{solution.var_instances_per_region[j]:.1f}"
g.edge(src_label, dst_label, label=label)
return g

def to_replication_topology(self, solution: ThroughputSolution) -> ReplicationTopology:
def to_replication_topology(self, solution: ThroughputSolution, ingress_hard_limit=64, egress_hard_limit=64) -> ReplicationTopology:
regions = self.get_regions()
Edge = namedtuple("Edge", ["src_region", "src_instance_idx", "dst_region", "dst_instance_idx", "connections"])

Expand All @@ -290,79 +298,47 @@ def to_replication_topology(self, solution: ThroughputSolution) -> ReplicationTo
src_instance_idx, src_instance_connections = 0, 0
for j, dst in enumerate(regions):
if solution.var_edge_flow_gigabits[i, j] > 0:
# add up to solution.problem.benchmarked_throughput_connections connections to an instance
# if more than that, add another instance for any remaining connections and continue
connections_to_allocate = np.ceil(solution.var_conn[i, j]).astype(int)

# if this edge would exceed the instance connection limit, partially add connections to current instance and increment instance
if connections_to_allocate + src_instance_connections > solution.problem.benchmarked_throughput_connections:
partial_conn = solution.problem.benchmarked_throughput_connections - src_instance_connections
connections_to_allocate = connections_to_allocate - partial_conn
assert connections_to_allocate >= 0, f"connections_to_allocate = {connections_to_allocate}"
assert partial_conn >= 0, f"partial_conn = {partial_conn}"
src_edges.append(
Edge(
src_region=src,
src_instance_idx=src_instance_idx,
dst_region=dst,
dst_instance_idx=None,
connections=partial_conn,
)
)
src_instance_idx += 1
src_instance_connections = 0

# add remaining connections
if connections_to_allocate > 0:
src_edges.append(
Edge(
src_region=src,
src_instance_idx=src_instance_idx,
dst_region=dst,
dst_instance_idx=None,
connections=connections_to_allocate,
)
)
src_instance_connections += connections_to_allocate
connections_to_allocate = np.rint(solution.var_conn[i, j]).astype(int)
while connections_to_allocate > 0:
# if this edge would exceed the instance connection limit, partially add connections to current instance and increment instance
if connections_to_allocate + src_instance_connections > egress_hard_limit:
partial_conn = egress_hard_limit - src_instance_connections
connections_to_allocate -= partial_conn
assert connections_to_allocate >= 0, f"connections_to_allocate = {connections_to_allocate}"
assert partial_conn >= 0, f"partial_conn = {partial_conn}"
if partial_conn > 0:
src_edges.append(Edge(src, src_instance_idx, dst, None, partial_conn))
src_instance_idx += 1
src_instance_connections = 0
else:
partial_conn = connections_to_allocate
connections_to_allocate = 0
src_edges.append(Edge(src, src_instance_idx, dst, None, partial_conn))
src_instance_connections += partial_conn
n_instances[i] = src_instance_idx + 1

# assign destination instances (currently None) to Edges
# ingress_conn_per_instance = np.sum(solution.var_conn, axis=0) / n_instances
# balance connections across instances by assigning connections from a source instance to the next
# destination instance until ingress_conn_per_instance is reached, then increment destination instance.
# ensure the total number of destination instances is the same as the number of source instances
dst_edges = []
ingress_conn_per_instance = {r: np.ceil(np.sum(solution.var_conn[:, i]) / n_instances[i]) for i, r in enumerate(regions)}
dsts_instance_idx = {i: 0 for i in regions}
dsts_instance_conn = {i: 0 for i in regions}
for e in src_edges:
connections_to_allocate = np.ceil(e.connections).astype(int)
if connections_to_allocate + dsts_instance_conn[e.dst_region] > ingress_conn_per_instance[e.dst_region]:
partial_conn = ingress_conn_per_instance[e.dst_region] - dsts_instance_conn[e.dst_region]
connections_to_allocate = connections_to_allocate - partial_conn
dst_edges.append(
Edge(
src_region=e.src_region,
src_instance_idx=e.src_instance_idx,
dst_region=e.dst_region,
dst_instance_idx=dsts_instance_idx[e.dst_region],
connections=partial_conn,
)
)
dsts_instance_idx[e.dst_region] += 1
dsts_instance_conn[e.dst_region] = 0

if connections_to_allocate > 0:
dst_edges.append(
Edge(
src_region=e.src_region,
src_instance_idx=e.src_instance_idx,
dst_region=e.dst_region,
dst_instance_idx=dsts_instance_idx[e.dst_region],
connections=connections_to_allocate,
connections_to_allocate = np.rint(e.connections).astype(int)
while connections_to_allocate > 0:
if connections_to_allocate + dsts_instance_conn[e.dst_region] > ingress_hard_limit:
partial_conn = ingress_hard_limit - dsts_instance_conn[e.dst_region]
connections_to_allocate = connections_to_allocate - partial_conn
if partial_conn > 0:
dst_edges.append(
Edge(e.src_region, e.src_instance_idx, e.dst_region, dsts_instance_idx[e.dst_region], partial_conn)
)
dsts_instance_idx[e.dst_region] += 1
dsts_instance_conn[e.dst_region] = 0
else:
dst_edges.append(
Edge(e.src_region, e.src_instance_idx, e.dst_region, dsts_instance_idx[e.dst_region], connections_to_allocate)
)
)
dsts_instance_conn[e.dst_region] += connections_to_allocate
dsts_instance_conn[e.dst_region] += connections_to_allocate
connections_to_allocate = 0

# build ReplicationTopology
replication_topology = ReplicationTopology()
Expand Down

0 comments on commit 7106059

Please sign in to comment.