From 073ccade3a815d27f863b127c1edeff6337ae541 Mon Sep 17 00:00:00 2001
From: Shishir Patil
Date: Tue, 22 Nov 2022 12:20:34 -0800
Subject: [PATCH 1/2] default transfer fallback cmd

---
 skyplane/cli/cli.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/skyplane/cli/cli.py b/skyplane/cli/cli.py
index 84f6e11d0..2c8a1c37b 100644
--- a/skyplane/cli/cli.py
+++ b/skyplane/cli/cli.py
@@ -203,7 +203,11 @@ def cp(
 and (job.transfer_size / GB) < cloud_config.get_flag("native_cmd_threshold_gb")
 and small_transfer_cmd
 ):
- typer.secho(f"Transfer is small enough to delegate to native tools. Delegating to: {small_transfer_cmd}", fg="yellow")
+ typer.secho(
+ f"Transfer of {job.transfer_size / GB}GB is small enough to delegate to native tools. Delegating to: {small_transfer_cmd}",
+ fg="yellow",
+ )
+ typer.secho("You can adjust this threshold with `skyplane config set native_cmd_threshold_gb`")
 os.system(small_transfer_cmd)
 return 0
 else:

From f3684858847720733dd0bd032dd471a6f8b4c200 Mon Sep 17 00:00:00 2001
From: Shishir Patil
Date: Tue, 22 Nov 2022 12:21:45 -0800
Subject: [PATCH 2/2] format from prev commit

---
 skyplane/api/client.py | 16 +++++++--
 skyplane/api/provision/dataplane.py | 36 +++++++++++++++----
 skyplane/api/provision/provisioner.py | 20 +++++++++--
 skyplane/api/transfer_job.py | 15 ++++++--
 skyplane/broadcast/gateway/gateway_daemon.py | 13 +++++--
 .../gateway/operators/gateway_receiver.py | 3 --
 skyplane/cli/experiments/cli_query.py | 12 +++++--
 skyplane/compute/aws/aws_cloud_provider.py | 5 ++-
 skyplane/compute/gcp/gcp_network.py | 12 +++++--
 skyplane/compute/key_utils.py | 6 +++-
 skyplane/obj_store/gcs_interface.py | 6 +++-
 skyplane/obj_store/s3_interface.py | 2 +-
 12 files changed, 117 insertions(+), 29 deletions(-)

diff --git a/skyplane/api/client.py b/skyplane/api/client.py
index b21475647..97084ab74 100644
--- a/skyplane/api/client.py
+++ b/skyplane/api/client.py
@@ -41,7 +41,12 @@ def __init__(
 self.log_dir.mkdir(parents=True, exist_ok=True)
 logger.open_log_file(self.log_dir / "client.log")

- self.provisioner = Provisioner(host_uuid=self.clientid, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth,)
+ self.provisioner = Provisioner(
+ host_uuid=self.clientid,
+ aws_auth=self.aws_auth,
+ azure_auth=self.azure_auth,
+ gcp_auth=self.gcp_auth,
+ )

 def copy(self, src: str, dst: str, recursive: bool = False, num_vms: int = 1):
 provider_src, bucket_src, self.src_prefix = parse_path(src)
@@ -71,7 +76,14 @@ def dataplane(
 num_connections: int = 32,
 ) -> Dataplane:
 if type == "direct":
- planner = DirectPlanner(src_cloud_provider, src_region, dst_cloud_provider, dst_region, n_vms, num_connections,)
+ planner = DirectPlanner(
+ src_cloud_provider,
+ src_region,
+ dst_cloud_provider,
+ dst_region,
+ n_vms,
+ num_connections,
+ )
 topo = planner.plan()
 logger.fs.info(f"[SkyplaneClient.direct_dataplane] Topology: {topo.to_json()}")
 return Dataplane(clientid=self.clientid, topology=topo, provisioner=self.provisioner, transfer_config=self.transfer_config)

diff --git a/skyplane/api/provision/dataplane.py b/skyplane/api/provision/dataplane.py
index 179db716a..a02b59be4 100644
--- a/skyplane/api/provision/dataplane.py
+++ b/skyplane/api/provision/dataplane.py
@@ -38,7 +38,11 @@ class Dataplane:
 """A Dataplane represents a concrete Skyplane network, including topology and VMs."""

 def __init__(
- self, clientid: str, topology: ReplicationTopology, provisioner: "Provisioner", transfer_config: TransferConfig,
+ self,
+ clientid: str,
+ topology: ReplicationTopology,
provisioner: "Provisioner", + transfer_config: TransferConfig, ): self.clientid = clientid self.topology = topology @@ -87,11 +91,17 @@ def provision( # initialize clouds self.provisioner.init_global( - aws=len(aws_nodes_to_provision) > 0, azure=len(azure_nodes_to_provision) > 0, gcp=len(gcp_nodes_to_provision) > 0, + aws=len(aws_nodes_to_provision) > 0, + azure=len(azure_nodes_to_provision) > 0, + gcp=len(gcp_nodes_to_provision) > 0, ) # provision VMs - uuids = self.provisioner.provision(authorize_firewall=allow_firewall, max_jobs=max_jobs, spinner=spinner,) + uuids = self.provisioner.provision( + authorize_firewall=allow_firewall, + max_jobs=max_jobs, + spinner=spinner, + ) # bind VMs to nodes servers = [self.provisioner.get_node(u) for u in uuids] @@ -108,7 +118,8 @@ def provision( self.provisioned = True def _start_gateway( - gateway_node: ReplicationTopologyGateway, gateway_server: compute.Server, + gateway_node: ReplicationTopologyGateway, + gateway_server: compute.Server, ): # map outgoing ports setup_args = {} @@ -162,7 +173,8 @@ def deprovision(self, max_jobs: int = 64, spinner: bool = False): raise finally: self.provisioner.deprovision( - max_jobs=max_jobs, spinner=spinner, + max_jobs=max_jobs, + spinner=spinner, ) self.provisioned = False @@ -189,13 +201,23 @@ def source_gateways(self) -> List[compute.Server]: def sink_gateways(self) -> List[compute.Server]: return [self.bound_nodes[n] for n in self.topology.sink_instances()] if self.provisioned else [] - def queue_copy(self, src: str, dst: str, recursive: bool = False,) -> str: + def queue_copy( + self, + src: str, + dst: str, + recursive: bool = False, + ) -> str: job = CopyJob(src, dst, recursive, requester_pays=self.transfer_config.requester_pays) logger.fs.debug(f"[SkyplaneClient] Queued copy job {job}") self.jobs_to_dispatch.append(job) return job.uuid - def queue_sync(self, src: str, dst: str, recursive: bool = False,) -> str: + def queue_sync( + self, + src: str, + dst: str, + recursive: bool = False, + ) -> str: job = SyncJob(src, dst, recursive, requester_pays=self.transfer_config.requester_pays) logger.fs.debug(f"[SkyplaneClient] Queued sync job {job}") self.jobs_to_dispatch.append(job) diff --git a/skyplane/api/provision/provisioner.py b/skyplane/api/provision/provisioner.py index 2fb8c4134..58fbc69b3 100644 --- a/skyplane/api/provision/provisioner.py +++ b/skyplane/api/provision/provisioner.py @@ -96,7 +96,11 @@ def _provision_task(self, task: ProvisionerTask): assert self.gcp.auth.enabled(), "GCP credentials not configured" # todo specify network tier in ReplicationTopology server = self.gcp.provision_instance( - task.region, task.vm_type, use_spot_instances=task.spot, gcp_premium_network=False, tags=task.tags, + task.region, + task.vm_type, + use_spot_instances=task.spot, + gcp_premium_network=False, + tags=task.tags, ) else: raise NotImplementedError(f"Unknown provider {task.cloud_provider}") @@ -131,7 +135,12 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner # provision VMs logger.fs.info(f"[Provisioner.provision] Provisioning {len(provision_tasks)} VMs") results: List[Tuple[ProvisionerTask, compute.Server]] = do_parallel( - self._provision_task, provision_tasks, n=max_jobs, spinner=spinner, spinner_persist=spinner, desc="Provisioning VMs", + self._provision_task, + provision_tasks, + n=max_jobs, + spinner=spinner, + spinner_persist=spinner, + desc="Provisioning VMs", ) # configure firewall @@ -193,7 +202,12 @@ def deprovision_gateway_instance(server: compute.Server): 
logger.warning("Azure deprovisioning is very slow. Please be patient.") logger.fs.info(f"[Provisioner.deprovision] Deprovisioning {len(servers)} VMs") do_parallel( - deprovision_gateway_instance, servers, n=max_jobs, spinner=spinner, spinner_persist=False, desc="Deprovisioning VMs", + deprovision_gateway_instance, + servers, + n=max_jobs, + spinner=spinner, + spinner_persist=False, + desc="Deprovisioning VMs", ) # clean up firewall diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 98e458dd8..2604a0a76 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -48,7 +48,10 @@ def __init__( self.concurrent_multipart_chunk_threads = concurrent_multipart_chunk_threads def _run_multipart_chunk_thread( - self, exit_event: threading.Event, in_queue: "Queue[Tuple[ObjectStoreObject, ObjectStoreObject]]", out_queue: "Queue[Chunk]", + self, + exit_event: threading.Event, + in_queue: "Queue[Tuple[ObjectStoreObject, ObjectStoreObject]]", + out_queue: "Queue[Chunk]", ): """Chunks large files into many small chunks.""" region = self.dest_iface.region_tag() @@ -221,7 +224,10 @@ def chunk( multipart_send_queue.put((src_obj, dst_obj)) else: yield Chunk( - src_key=src_obj.key, dest_key=dst_obj.key, chunk_id=uuid.uuid4().hex, chunk_length_bytes=src_obj.size, + src_key=src_obj.key, + dest_key=dst_obj.key, + chunk_id=uuid.uuid4().hex, + chunk_length_bytes=src_obj.size, ) if self.transfer_config.multipart_enabled: @@ -322,7 +328,10 @@ def gen_transfer_pairs(self, chunker: Optional[Chunker] = None) -> Generator[Tup ) def dispatch( - self, dataplane: "Dataplane", transfer_config: TransferConfig, dispatch_batch_size: int = 64, + self, + dataplane: "Dataplane", + transfer_config: TransferConfig, + dispatch_batch_size: int = 64, ) -> Generator[ChunkRequest, None, None]: """Dispatch transfer job to specified gateways.""" chunker = Chunker(self.src_iface, self.dst_iface, transfer_config) diff --git a/skyplane/broadcast/gateway/gateway_daemon.py b/skyplane/broadcast/gateway/gateway_daemon.py index 53f60312e..bad50473b 100644 --- a/skyplane/broadcast/gateway/gateway_daemon.py +++ b/skyplane/broadcast/gateway/gateway_daemon.py @@ -35,7 +35,12 @@ class GatewayDaemon: def __init__( - self, region: str, chunk_dir: PathLike, max_incoming_ports=64, use_tls=True, use_e2ee=False, + self, + region: str, + chunk_dir: PathLike, + max_incoming_ports=64, + use_tls=True, + use_e2ee=False, ): # read gateway program gateway_program_path = Path(os.environ["GATEWAY_PROGRAM_FILE"]).expanduser() @@ -302,5 +307,9 @@ def exit_handler(signum, frame): args = parser.parse_args() os.makedirs(args.chunk_dir) - daemon = GatewayDaemon(region=args.region, chunk_dir=args.chunk_dir, use_tls=not args.disable_tls,) + daemon = GatewayDaemon( + region=args.region, + chunk_dir=args.chunk_dir, + use_tls=not args.disable_tls, + ) daemon.run() diff --git a/skyplane/broadcast/gateway/operators/gateway_receiver.py b/skyplane/broadcast/gateway/operators/gateway_receiver.py index e93ce4b59..571f594cb 100644 --- a/skyplane/broadcast/gateway/operators/gateway_receiver.py +++ b/skyplane/broadcast/gateway/operators/gateway_receiver.py @@ -18,9 +18,6 @@ from skyplane.utils.timer import Timer - - - class GatewayReceiver: def __init__( self, diff --git a/skyplane/cli/experiments/cli_query.py b/skyplane/cli/experiments/cli_query.py index 20cb638b0..3f7725a89 100644 --- a/skyplane/cli/experiments/cli_query.py +++ b/skyplane/cli/experiments/cli_query.py @@ -6,15 +6,21 @@ def util_grid_throughput( - src: str, dest: str, 
src_tier: str = "PREMIUM", dest_tier: str = "PREMIUM", + src: str, + dest: str, + src_tier: str = "PREMIUM", + dest_tier: str = "PREMIUM", ): with path("skyplane.data", "throughput.csv") as throughput_grid_path: solver = ThroughputSolver(throughput_grid_path) - print(solver.get_path_throughput(src, dest, src_tier, dest_tier) / 2 ** 30) + print(solver.get_path_throughput(src, dest, src_tier, dest_tier) / 2**30) def util_grid_cost( - src: str, dest: str, src_tier: str = "PREMIUM", dest_tier: str = "PREMIUM", + src: str, + dest: str, + src_tier: str = "PREMIUM", + dest_tier: str = "PREMIUM", ): with path("skyplane.data", "throughput.csv") as throughput_grid_path: solver = ThroughputSolver(throughput_grid_path) diff --git a/skyplane/compute/aws/aws_cloud_provider.py b/skyplane/compute/aws/aws_cloud_provider.py index 5762817b9..ed69755b6 100644 --- a/skyplane/compute/aws/aws_cloud_provider.py +++ b/skyplane/compute/aws/aws_cloud_provider.py @@ -196,7 +196,10 @@ def start_instance(subnet_id: str): } ], BlockDeviceMappings=[ - {"DeviceName": "/dev/sda1", "Ebs": {"DeleteOnTermination": True, "VolumeSize": disk_size, "VolumeType": "gp2"},} + { + "DeviceName": "/dev/sda1", + "Ebs": {"DeleteOnTermination": True, "VolumeSize": disk_size, "VolumeType": "gp2"}, + } ], NetworkInterfaces=[ { diff --git a/skyplane/compute/gcp/gcp_network.py b/skyplane/compute/gcp/gcp_network.py index abe6c8cb6..df0fa1ba8 100644 --- a/skyplane/compute/gcp/gcp_network.py +++ b/skyplane/compute/gcp/gcp_network.py @@ -112,8 +112,16 @@ def create_default_firewall_rules(errors, self): priority=65533, ) self.create_firewall_rule( - "skyplane-default-allow-ssh", ["0.0.0.0/0"], ["22"], ["tcp"], priority=65533, + "skyplane-default-allow-ssh", + ["0.0.0.0/0"], + ["22"], + ["tcp"], + priority=65533, ) self.create_firewall_rule( - "skyplane-default-allow-icmp", ["0.0.0.0/0"], [], ["icmp"], priority=65533, + "skyplane-default-allow-icmp", + ["0.0.0.0/0"], + [], + ["icmp"], + priority=65533, ) diff --git a/skyplane/compute/key_utils.py b/skyplane/compute/key_utils.py index 6e4fac0c8..cdb17c328 100644 --- a/skyplane/compute/key_utils.py +++ b/skyplane/compute/key_utils.py @@ -9,7 +9,11 @@ def generate_keypair(pubkey_path: PathLike, pem_path: PathLike): - key = rsa.generate_private_key(backend=crypto_default_backend(), public_exponent=65537, key_size=4096,) + key = rsa.generate_private_key( + backend=crypto_default_backend(), + public_exponent=65537, + key_size=4096, + ) private_key = key.private_bytes( crypto_serialization.Encoding.PEM, crypto_serialization.PrivateFormat.TraditionalOpenSSL, crypto_serialization.NoEncryption() ) diff --git a/skyplane/obj_store/gcs_interface.py b/skyplane/obj_store/gcs_interface.py index bffa26e11..4f03754f4 100644 --- a/skyplane/obj_store/gcs_interface.py +++ b/skyplane/obj_store/gcs_interface.py @@ -214,7 +214,11 @@ def upload_object(self, src_file_path, dst_object_name, part_number=None, upload # send XML api request headers = {"Content-MD5": b64_md5sum} if check_md5 else None response = self.send_xml_request( - dst_object_name, {"uploadId": upload_id, "partNumber": part_number}, "PUT", headers=headers, data=open(src_file_path, "rb"), + dst_object_name, + {"uploadId": upload_id, "partNumber": part_number}, + "PUT", + headers=headers, + data=open(src_file_path, "rb"), ) # check response diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index b41181446..c7de5eea1 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -129,7 +129,7 @@ 
 size_bytes=None,
 write_at_offset=False,
 generate_md5=False,
- write_block_size=2 ** 16,
+ write_block_size=2**16,
 ) -> Tuple[Optional[str], Optional[bytes]]:
 src_object_name, dst_file_path = str(src_object_name), str(dst_file_path)