Skip to content

Commit

Permalink
Update solver
Browse files Browse the repository at this point in the history
  • Loading branch information
parasj committed Jan 30, 2022
1 parent a2863f9 commit 37de85a
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 71 deletions.
91 changes: 34 additions & 57 deletions skylark/benchmark/pareto_speedups.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import pickle
import pickle
from typing import List

import cvxpy as cp
import numpy as np
Expand All @@ -9,95 +10,71 @@
from tqdm import tqdm

from skylark import GB, skylark_root
from skylark.replicate.solver import ThroughputSolverILP
from skylark.replicate.solver import ThroughputProblem, ThroughputSolution, ThroughputSolverILP


@ray.remote
def benchmark(
src, dst, min_throughput, gbyte_to_transfer=1, instance_limit=1, max_connections_per_path=64, max_connections_per_node=64, log_dir=None
):
solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput_mini.csv")
solution = solver.solve_min_cost(
src,
dst,
required_throughput_gbits=min_throughput * instance_limit,
gbyte_to_transfer=gbyte_to_transfer,
instance_limit=instance_limit,
max_connections_per_path=max_connections_per_path,
max_connections_per_node=max_connections_per_node,
solver=cp.CBC,
solver_verbose=False
)
if solution["feasible"]:
baseline_throughput = solver.get_path_throughput(src, dst) / GB
baseline_cost = solver.get_path_cost(src, dst) * gbyte_to_transfer
return dict(
src=src,
dst=dst,
min_throughput=min_throughput,
gbyte_to_transfer=gbyte_to_transfer,
instance_limit=instance_limit,
max_connections_per_path=max_connections_per_path,
max_connections_per_node=max_connections_per_node,
cost=solution["cost"],
baseline_cost=baseline_cost,
throughput=solution["throughput"],
baseline_throughput=baseline_throughput,
throughput_speedup=solution["throughput"] / baseline_throughput,
cost_factor=solution["cost"] / baseline_cost,
solution=solution,
)
else:
return None
def benchmark(p: ThroughputProblem) -> ThroughputSolution:
solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput.csv")
solution = solver.solve_min_cost(p=p, instance_cost_multipler=1, solver=cp.CBC, solver_verbose=False)
solution.problem.const_throughput_grid_gbits = None
solution.problem.const_cost_per_gb_grid = None
return solution


def main(args):
ray.init()
solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput_mini.csv")
ray.init(address="auto")
solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput.csv")
regions = solver.get_regions()
# regions = np.random.choice(regions, size=6, replace=False)
regions = np.random.choice(regions, size=3, replace=False)

configs = []
problems = []
for src in regions:
for dst in regions:
if src != dst:
for instance_limit in [1, 2, 4]:
for min_throughput in np.linspace(0, args.max_throughput * instance_limit, args.num_throughputs)[1:]:
configs.append(dict(
src=src,
dst=dst,
min_throughput=min_throughput,
gbyte_to_transfer=args.gbyte_to_transfer,
instance_limit=instance_limit,
max_connections_per_path=64,
max_connections_per_node=64,
))
for min_throughput in np.linspace(0, args.max_throughput * instance_limit, args.num_throughputs):
problems.append(
ThroughputProblem(
src=src,
dst=dst,
required_throughput_gbits=min_throughput,
gbyte_to_transfer=args.gbyte_to_transfer,
instance_limit=instance_limit,
)
)

results = []
for config in tqdm(configs, desc="dispatch"):
results.append(benchmark.remote(**config))
for problem in tqdm(problems, desc="dispatch"):
results.append(benchmark.remote(problem))

# get batches of results with ray.get, update tqdm progress bar
n_feasible, n_infeasible = 0, 0
remaining_refs = results
results_out = []
with tqdm(total=len(results)) as pbar:
with tqdm(total=len(results), desc="Solve") as pbar:
while remaining_refs:
ready_refs, remaining_refs = ray.wait(remaining_refs, num_returns=1)
results_out.extend(ray.get(ready_refs))
sol: List[ThroughputSolution] = ray.get(ready_refs)
results_out.extend(sol)
pbar.update(len(ready_refs))
n_feasible += len([s for s in sol if s.is_feasible])
n_infeasible += len([s for s in sol if not s.is_feasible])
pbar.set_postfix(feasible=n_feasible, infeasible=n_infeasible)

results_out = [r for r in results_out if r is not None]
with open(skylark_root / "data" / "pareto.pkl", "wb") as f:
pickle.dump(results_out, f, protocol=pickle.HIGHEST_PROTOCOL)
# df = pd.DataFrame(results_out)
# df.to_pickle(skylark_root / "data" / "pareto.pkl")
print(f"Saved {len(results_out)} results to {skylark_root / 'data' / 'pareto.pkl'}")
breakpoint()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--max-throughput", type=float, default=20)
parser.add_argument("--num-throughputs", type=int, default=100)
parser.add_argument("--max-throughput", type=float, default=12.5)
parser.add_argument("--num-throughputs", type=int, default=40)
parser.add_argument("--gbyte-to-transfer", type=float, default=1)
args = parser.parse_args()
main(args)
45 changes: 31 additions & 14 deletions skylark/replicate/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,28 @@ class ThroughputSolution:
cost_total: Optional[float] = None
transfer_runtime_s: Optional[float] = None

# estimated values of baseline throughput, cost, and speedup
baseline_throughput_gbits: Optional[float] = None
baseline_cost: Optional[float] = None


class ThroughputSolver:
def __init__(self, df_path, default_throughput=0.0):
self.df = pd.read_csv(df_path).set_index(["src_region", "dst_region"])
self.df = pd.read_csv(df_path).set_index(["src_region", "src_tier", "dst_region", "dst_tier"]).sort_index()
self.default_throughput = default_throughput

def get_path_throughput(self, src, dst):
def get_path_throughput(self, src, dst, src_tier="PREMIUM", dst_tier="PREMIUM"):
if src == dst:
return self.default_throughput
elif (src, dst) not in self.df.index:
elif (src, dst, src_tier, dst_tier) not in self.df.index:
return None
return self.df.loc[(src, dst), "throughput_sent"]
return self.df.loc[(src, dst, src_tier, dst_tier), "throughput_sent"]

def get_path_cost(self, src, dst):
return CloudProvider.get_transfer_cost(src, dst)
def get_path_cost(self, src, dst, premium_tier=True):
return CloudProvider.get_transfer_cost(src, dst, premium_tier=premium_tier)

def get_regions(self):
return list(sorted(set(list(self.df.index.levels[0].unique()) + list(self.df.index.levels[1].unique()))))
return list(sorted(set(list(self.df.index.levels[0].unique()) + list(self.df.index.levels[2].unique()))))

def get_throughput_grid(self):
regions = self.get_regions()
Expand Down Expand Up @@ -123,14 +127,23 @@ def plot_throughput_grid(self, data_grid, title="Throughput (Gbps)"):


class ThroughputSolverILP(ThroughputSolver):
def get_baseline_throughput_and_cost(self, p: ThroughputProblem, src_idx: int, dst_idx: int) -> Tuple[float, float]:
throughput = max(p.instance_limit * p.const_throughput_grid_gbits[src_idx, dst_idx], 1e-6)
transfer_s = p.gbyte_to_transfer * GBIT_PER_GBYTE / throughput
cost = p.cost_per_instance_hr * p.instance_limit * transfer_s / 3600
return throughput, cost

def solve_min_cost(
self, p: ThroughputProblem, instance_cost_multipler: float = 1.0, solver=cp.GLPK, solver_verbose=False, save_lp_path=None
):
logger.debug(f"Solving for problem {p}")

regions = self.get_regions()
sources = [regions.index(p.src)]
sinks = [regions.index(p.dst)]
try:
regions = self.get_regions()
sources = [regions.index(p.src)]
sinks = [regions.index(p.dst)]
except ValueError as e:
logger.error(f"{p.src} or {p.dst} not in regions")
logger.error(f"regions: {regions}")
raise e

# define constants
if p.const_throughput_grid_gbits is None:
Expand Down Expand Up @@ -211,6 +224,7 @@ def solve_min_cost(
else:
prob.solve(solver=solver, verbose=solver_verbose)

baseline_throughput, baseline_cost = self.get_baseline_throughput_and_cost(p, sources[0], sinks[0])
if prob.status == "optimal":
return ThroughputSolution(
problem=p,
Expand All @@ -224,10 +238,13 @@ def solve_min_cost(
cost_instance=instance_cost.value,
cost_total=instance_cost.value + cost_egress.value,
transfer_runtime_s=runtime_s,
baseline_throughput_gbits=baseline_throughput,
baseline_cost=baseline_cost,
)
else:
logger.warning(f"Solver status: {prob.status}")
return ThroughputSolution(problem=p, is_feasible=False)
return ThroughputSolution(
problem=p, is_feasible=False, baseline_throughput_gbits=baseline_throughput, baseline_cost=baseline_cost
)

def print_solution(self, solution: ThroughputSolution):
if solution.is_feasible:
Expand Down

0 comments on commit 37de85a

Please sign in to comment.