From 37de85ad240f4e240754c9a63a28368612a8dd4c Mon Sep 17 00:00:00 2001
From: Paras Jain
Date: Sat, 29 Jan 2022 19:40:01 -0800
Subject: [PATCH] Update solver

---
 skylark/benchmark/pareto_speedups.py | 91 +++++++++++-----------------
 skylark/replicate/solver.py          | 45 +++++++++-----
 2 files changed, 65 insertions(+), 71 deletions(-)

diff --git a/skylark/benchmark/pareto_speedups.py b/skylark/benchmark/pareto_speedups.py
index d46b9f3f7..fd6a00208 100644
--- a/skylark/benchmark/pareto_speedups.py
+++ b/skylark/benchmark/pareto_speedups.py
@@ -1,6 +1,7 @@
 import argparse
 import pickle
 import pickle
+from typing import List
 
 import cvxpy as cp
 import numpy as np
@@ -9,82 +10,57 @@
 from tqdm import tqdm
 
 from skylark import GB, skylark_root
-from skylark.replicate.solver import ThroughputSolverILP
+from skylark.replicate.solver import ThroughputProblem, ThroughputSolution, ThroughputSolverILP
 
 
 @ray.remote
-def benchmark(
-    src, dst, min_throughput, gbyte_to_transfer=1, instance_limit=1, max_connections_per_path=64, max_connections_per_node=64, log_dir=None
-):
-    solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput_mini.csv")
-    solution = solver.solve_min_cost(
-        src,
-        dst,
-        required_throughput_gbits=min_throughput * instance_limit,
-        gbyte_to_transfer=gbyte_to_transfer,
-        instance_limit=instance_limit,
-        max_connections_per_path=max_connections_per_path,
-        max_connections_per_node=max_connections_per_node,
-        solver=cp.CBC,
-        solver_verbose=False
-    )
-    if solution["feasible"]:
-        baseline_throughput = solver.get_path_throughput(src, dst) / GB
-        baseline_cost = solver.get_path_cost(src, dst) * gbyte_to_transfer
-        return dict(
-            src=src,
-            dst=dst,
-            min_throughput=min_throughput,
-            gbyte_to_transfer=gbyte_to_transfer,
-            instance_limit=instance_limit,
-            max_connections_per_path=max_connections_per_path,
-            max_connections_per_node=max_connections_per_node,
-            cost=solution["cost"],
-            baseline_cost=baseline_cost,
-            throughput=solution["throughput"],
-            baseline_throughput=baseline_throughput,
-            throughput_speedup=solution["throughput"] / baseline_throughput,
-            cost_factor=solution["cost"] / baseline_cost,
-            solution=solution,
-        )
-    else:
-        return None
+def benchmark(p: ThroughputProblem) -> ThroughputSolution:
+    solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput.csv")
+    solution = solver.solve_min_cost(p=p, instance_cost_multipler=1, solver=cp.CBC, solver_verbose=False)
+    solution.problem.const_throughput_grid_gbits = None
+    solution.problem.const_cost_per_gb_grid = None
+    return solution
 
 
 def main(args):
-    ray.init()
-    solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput_mini.csv")
+    ray.init(address="auto")
+    solver = ThroughputSolverILP(skylark_root / "profiles" / "throughput.csv")
     regions = solver.get_regions()
-    # regions = np.random.choice(regions, size=6, replace=False)
+    regions = np.random.choice(regions, size=3, replace=False)
 
-    configs = []
+    problems = []
     for src in regions:
        for dst in regions:
            if src != dst:
                for instance_limit in [1, 2, 4]:
-                    for min_throughput in np.linspace(0, args.max_throughput * instance_limit, args.num_throughputs)[1:]:
-                        configs.append(dict(
-                            src=src,
-                            dst=dst,
-                            min_throughput=min_throughput,
-                            gbyte_to_transfer=args.gbyte_to_transfer,
-                            instance_limit=instance_limit,
-                            max_connections_per_path=64,
-                            max_connections_per_node=64,
-                        ))
+                    for min_throughput in np.linspace(0, args.max_throughput * instance_limit, args.num_throughputs):
+                        problems.append(
+                            ThroughputProblem(
+                                src=src,
+                                dst=dst,
+                                required_throughput_gbits=min_throughput,
+                                gbyte_to_transfer=args.gbyte_to_transfer,
+                                instance_limit=instance_limit,
+                            )
+                        )
 
     results = []
-    for config in tqdm(configs, desc="dispatch"):
-        results.append(benchmark.remote(**config))
+    for problem in tqdm(problems, desc="dispatch"):
+        results.append(benchmark.remote(problem))
 
     # get batches of results with ray.get, update tqdm progress bar
+    n_feasible, n_infeasible = 0, 0
     remaining_refs = results
     results_out = []
-    with tqdm(total=len(results)) as pbar:
+    with tqdm(total=len(results), desc="Solve") as pbar:
         while remaining_refs:
             ready_refs, remaining_refs = ray.wait(remaining_refs, num_returns=1)
-            results_out.extend(ray.get(ready_refs))
+            sol: List[ThroughputSolution] = ray.get(ready_refs)
+            results_out.extend(sol)
             pbar.update(len(ready_refs))
+            n_feasible += len([s for s in sol if s.is_feasible])
+            n_infeasible += len([s for s in sol if not s.is_feasible])
+            pbar.set_postfix(feasible=n_feasible, infeasible=n_infeasible)
 
     results_out = [r for r in results_out if r is not None]
     with open(skylark_root / "data" / "pareto.pkl", "wb") as f:
@@ -92,12 +68,13 @@ def main(args):
     # df = pd.DataFrame(results_out)
     # df.to_pickle(skylark_root / "data" / "pareto.pkl")
     print(f"Saved {len(results_out)} results to {skylark_root / 'data' / 'pareto.pkl'}")
+    breakpoint()
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--max-throughput", type=float, default=20)
-    parser.add_argument("--num-throughputs", type=int, default=100)
+    parser.add_argument("--max-throughput", type=float, default=12.5)
+    parser.add_argument("--num-throughputs", type=int, default=40)
     parser.add_argument("--gbyte-to-transfer", type=float, default=1)
     args = parser.parse_args()
     main(args)
diff --git a/skylark/replicate/solver.py b/skylark/replicate/solver.py
index f2339bf0a..0be5f0dcb 100644
--- a/skylark/replicate/solver.py
+++ b/skylark/replicate/solver.py
@@ -53,24 +53,28 @@ class ThroughputSolution:
     cost_total: Optional[float] = None
     transfer_runtime_s: Optional[float] = None
 
+    # estimated values of baseline throughput, cost, and speedup
+    baseline_throughput_gbits: Optional[float] = None
+    baseline_cost: Optional[float] = None
+
 
 class ThroughputSolver:
     def __init__(self, df_path, default_throughput=0.0):
-        self.df = pd.read_csv(df_path).set_index(["src_region", "dst_region"])
+        self.df = pd.read_csv(df_path).set_index(["src_region", "src_tier", "dst_region", "dst_tier"]).sort_index()
         self.default_throughput = default_throughput
 
-    def get_path_throughput(self, src, dst):
+    def get_path_throughput(self, src, dst, src_tier="PREMIUM", dst_tier="PREMIUM"):
         if src == dst:
             return self.default_throughput
-        elif (src, dst) not in self.df.index:
+        elif (src, dst, src_tier, dst_tier) not in self.df.index:
             return None
-        return self.df.loc[(src, dst), "throughput_sent"]
+        return self.df.loc[(src, dst, src_tier, dst_tier), "throughput_sent"]
 
-    def get_path_cost(self, src, dst):
-        return CloudProvider.get_transfer_cost(src, dst)
+    def get_path_cost(self, src, dst, premium_tier=True):
+        return CloudProvider.get_transfer_cost(src, dst, premium_tier=premium_tier)
 
     def get_regions(self):
-        return list(sorted(set(list(self.df.index.levels[0].unique()) + list(self.df.index.levels[1].unique()))))
+        return list(sorted(set(list(self.df.index.levels[0].unique()) + list(self.df.index.levels[2].unique()))))
 
     def get_throughput_grid(self):
         regions = self.get_regions()
@@ -123,14 +127,23 @@ def plot_throughput_grid(self, data_grid, title="Throughput (Gbps)"):
 
 
 class ThroughputSolverILP(ThroughputSolver):
+    def get_baseline_throughput_and_cost(self, p: ThroughputProblem, src_idx: int, dst_idx: int) -> Tuple[float, float]:
+        throughput = max(p.instance_limit * p.const_throughput_grid_gbits[src_idx, dst_idx], 1e-6)
+        transfer_s = p.gbyte_to_transfer * GBIT_PER_GBYTE / throughput
+        cost = p.cost_per_instance_hr * p.instance_limit * transfer_s / 3600
+        return throughput, cost
+
     def solve_min_cost(
         self, p: ThroughputProblem, instance_cost_multipler: float = 1.0, solver=cp.GLPK, solver_verbose=False, save_lp_path=None
     ):
-        logger.debug(f"Solving for problem {p}")
-
-        regions = self.get_regions()
-        sources = [regions.index(p.src)]
-        sinks = [regions.index(p.dst)]
+        try:
+            regions = self.get_regions()
+            sources = [regions.index(p.src)]
+            sinks = [regions.index(p.dst)]
+        except ValueError as e:
+            logger.error(f"{p.src} or {p.dst} not in regions")
+            logger.error(f"regions: {regions}")
+            raise e
 
         # define constants
         if p.const_throughput_grid_gbits is None:
@@ -211,6 +224,7 @@ def solve_min_cost(
         else:
             prob.solve(solver=solver, verbose=solver_verbose)
 
+        baseline_throughput, baseline_cost = self.get_baseline_throughput_and_cost(p, sources[0], sinks[0])
         if prob.status == "optimal":
             return ThroughputSolution(
                 problem=p,
@@ -224,10 +238,13 @@ def solve_min_cost(
                 cost_instance=instance_cost.value,
                 cost_total=instance_cost.value + cost_egress.value,
                 transfer_runtime_s=runtime_s,
+                baseline_throughput_gbits=baseline_throughput,
+                baseline_cost=baseline_cost,
             )
         else:
-            logger.warning(f"Solver status: {prob.status}")
-            return ThroughputSolution(problem=p, is_feasible=False)
+            return ThroughputSolution(
+                problem=p, is_feasible=False, baseline_throughput_gbits=baseline_throughput, baseline_cost=baseline_cost
+            )
 
     def print_solution(self, solution: ThroughputSolution):
         if solution.is_feasible: