diff --git a/skyplane/api/provisioner.py b/skyplane/api/provisioner.py index 29c7f0304..a07281738 100644 --- a/skyplane/api/provisioner.py +++ b/skyplane/api/provisioner.py @@ -85,6 +85,7 @@ def get_node(self, uuid: str) -> compute.Server: return self.provisioned_vms[uuid] def _provision_task(self, task: ProvisionerTask): + print("provision", task.region) with Timer() as t: if task.cloud_provider == "aws": assert self.aws.auth.enabled(), "AWS credentials not configured" diff --git a/skyplane/broadcast/bc_dataplane.py b/skyplane/broadcast/bc_dataplane.py index 27faf4429..f37ddaf07 100644 --- a/skyplane/broadcast/bc_dataplane.py +++ b/skyplane/broadcast/bc_dataplane.py @@ -114,7 +114,7 @@ def add_operator_receive_send( # if no regions to forward data to if len(next_regions) == 0: - print(f"{region} has no next region to forward data to: {g.edges.data()}") + #print(f"{region} has no next region to forward data to: {g.edges.data()}") return False # region name --> ips in this region @@ -280,6 +280,8 @@ def current_gw_programs(self): self.add_operator_receive_send(solution_graph, node_gateway_program, node, partitions, obj_store=None) gateway_programs[node] = self.remap_keys(node_gateway_program.to_dict()) + assert len(gateway_programs[node]) > 0, f"Empty gateway program {node}" + print("PROGRAM", gateway_programs[node]) return gateway_programs @@ -301,6 +303,8 @@ def _start_gateway( if authorize_ssh_pub_key: gateway_server.copy_public_key(authorize_ssh_pub_key) + print("current program", self.current_gw_programs) + gateway_server.start_gateway( {}, # don't need setup arguments here to pass as outgoing_ports gateway_programs=self.current_gw_programs, # NOTE: BC pass in gateway programs diff --git a/skyplane/broadcast/bc_planner.py b/skyplane/broadcast/bc_planner.py index 298394092..7cd645433 100644 --- a/skyplane/broadcast/bc_planner.py +++ b/skyplane/broadcast/bc_planner.py @@ -1,4 +1,5 @@ import os +from random import sample import subprocess from pathlib import Path @@ -578,6 +579,12 @@ def solve_partition( constraints = [] + # dirty hack to ban some regions + print(nodes) + #for banned_node in ["aws:eu-south-2"]: + # i = nodes.index(banned_node) + # constraints.append(v[i] == 1) + # constraints on VM per region for i in range(num_nodes): constraints.append(v[i] <= max_vm_per_region - existing_vms[i]) @@ -676,10 +683,11 @@ def solve_partition( i[edges.index(e)] = 1 # keep future solutions feasible by making sure destinations have # enough remaining ingress to recieve the remaining data - constraints.append( - cp.sum(i @ p) * partition_size_gb * 8 + existing_ingress[node_i] - <= s * ingress_limit[node_i] * (v[node_i] + existing_vms[node_i]) - remaining_data_size_gb * 8 - ) + if node in dest_v: + constraints.append( + cp.sum(i @ p) * partition_size_gb * 8 + existing_ingress[node_i] + <= s * ingress_limit[node_i] * (v[node_i] + existing_vms[node_i]) - remaining_data_size_gb * 8 + ) prob = cp.Problem(obj, constraints) @@ -716,6 +724,12 @@ def plan_iterative( g = g.subgraph(src_dst_li + sampled).copy() print(f"Filter node (only use): {src_dst_li + sampled}") + # banned nodes + sampled = list(self.G.nodes) + sampled.remove("aws:eu-south-2") + sampled.remove("aws:eu-central-2") + g = g.subgraph(sampled).copy() + cost = np.array([e[2] for e in g.edges(data="cost")]) tp = np.array([e[2] for e in g.edges(data="throughput")]) edges = list(g.edges) @@ -860,6 +874,12 @@ def plan( g = g.subgraph(src_dst_li + sampled).copy() print(f"Filter node (only use): {src_dst_li + sampled}") + # banned nodes + sampled = list(self.G.nodes) + sampled.remove("aws:eu-south-2") + sampled.remove("aws:eu-central-2") + g = g.subgraph(sampled).copy() + cost = np.array([e[2] for e in g.edges(data="cost")]) tp = np.array([e[2] for e in g.edges(data="throughput")]) diff --git a/skyplane/broadcast/gateway/gateway_daemon.py b/skyplane/broadcast/gateway/gateway_daemon.py index 63bb5a5eb..1365d8eb2 100644 --- a/skyplane/broadcast/gateway/gateway_daemon.py +++ b/skyplane/broadcast/gateway/gateway_daemon.py @@ -43,6 +43,7 @@ def __init__( print("starting gateway daemon", gateway_program_path) pprint(gateway_program) + assert len(gateway_program) > 0, f"Cannot have empty gateway program {gateway_program}" self.use_tls = use_tls @@ -253,6 +254,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ pprint(gateway_program) # create operator tree for each partition + total_p = 0 for program_group in gateway_program: partitions = program_group["partitions"] program = program_group["value"] @@ -278,7 +280,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ self.chunk_store.add_partition(str(partition), queue) # create DAG for this partition group - total_p = create_gateway_operators_helper( + total_p += create_gateway_operators_helper( self.chunk_store.chunk_requests[str(partition)], # incoming chunk requests for partition program, # single partition program partitions diff --git a/skyplane/broadcast/impl/bc_transfer_job.py b/skyplane/broadcast/impl/bc_transfer_job.py index 013ba11ce..46b442739 100644 --- a/skyplane/broadcast/impl/bc_transfer_job.py +++ b/skyplane/broadcast/impl/bc_transfer_job.py @@ -184,7 +184,12 @@ def dispatch_id_maps_to_dst(): ) end = time.time() if reply.status != 200: + print("failed to dispatch") + print(server.instance_name()) + print(server.public_ip()) + time.sleep(100000) raise Exception(f"Failed to dispatch chunk requests {server.instance_name()}: {reply.data.decode('utf-8')}") + logger.fs.debug( f"Dispatched {len(batch)} chunk requests to {server.instance_name()} ({n_bytes} bytes) in {end - start:.2f} seconds" ) diff --git a/skyplane/broadcast/test/bc_objstore.py b/skyplane/broadcast/test/bc_objstore.py index 23bf65450..3dd995983 100644 --- a/skyplane/broadcast/test/bc_objstore.py +++ b/skyplane/broadcast/test/bc_objstore.py @@ -30,8 +30,8 @@ def start_transfer(args): # source_file = f"s3://broadcast-opt-{src_region}/test_replication/" # dest_files = [f"s3://broadcast-opt-{d}/skyplane/" for d in dst_regions] - source_file = "s3://broadcast-exp1-ap-east-1/OPT-66B/" - dest_files = [f"s3://broadcast-exp1-{d}/OPT-66B/" for d in dst_regions] + source_file = "s3://broadcast-opt-ap-east-1/test_replication/" + dest_files = [f"s3://broadcast-opt-{d}/test_replication/" for d in dst_regions] # source_file = "s3://skyplane-broadcast/imagenet-images/" # dest_files = [f"s3://broadcast-exp1-{d}/imagenet-images/" for d in dst_regions] diff --git a/skyplane/utils/fn.py b/skyplane/utils/fn.py index 076963339..78d9bcafc 100644 --- a/skyplane/utils/fn.py +++ b/skyplane/utils/fn.py @@ -42,7 +42,7 @@ def wrapped_fn(args): try: return args, func(args) except Exception as e: - logger.error(f"Error running {func.__name__}: {e}") + logger.error(f"Error running {func.__name__}, {args}: {e}") raise results = []