Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native cmd fallback #694

Merged
merged 2 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions skyplane/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ def __init__(
self.log_dir.mkdir(parents=True, exist_ok=True)
logger.open_log_file(self.log_dir / "client.log")

self.provisioner = Provisioner(host_uuid=self.clientid, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth,)
self.provisioner = Provisioner(
host_uuid=self.clientid,
aws_auth=self.aws_auth,
azure_auth=self.azure_auth,
gcp_auth=self.gcp_auth,
)

def copy(self, src: str, dst: str, recursive: bool = False, num_vms: int = 1):
provider_src, bucket_src, self.src_prefix = parse_path(src)
Expand Down Expand Up @@ -71,7 +76,14 @@ def dataplane(
num_connections: int = 32,
) -> Dataplane:
if type == "direct":
planner = DirectPlanner(src_cloud_provider, src_region, dst_cloud_provider, dst_region, n_vms, num_connections,)
planner = DirectPlanner(
src_cloud_provider,
src_region,
dst_cloud_provider,
dst_region,
n_vms,
num_connections,
)
topo = planner.plan()
logger.fs.info(f"[SkyplaneClient.direct_dataplane] Topology: {topo.to_json()}")
return Dataplane(clientid=self.clientid, topology=topo, provisioner=self.provisioner, transfer_config=self.transfer_config)
Expand Down
36 changes: 29 additions & 7 deletions skyplane/api/provision/dataplane.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ class Dataplane:
"""A Dataplane represents a concrete Skyplane network, including topology and VMs."""

def __init__(
self, clientid: str, topology: ReplicationTopology, provisioner: "Provisioner", transfer_config: TransferConfig,
self,
clientid: str,
topology: ReplicationTopology,
provisioner: "Provisioner",
transfer_config: TransferConfig,
):
self.clientid = clientid
self.topology = topology
Expand Down Expand Up @@ -87,11 +91,17 @@ def provision(

# initialize clouds
self.provisioner.init_global(
aws=len(aws_nodes_to_provision) > 0, azure=len(azure_nodes_to_provision) > 0, gcp=len(gcp_nodes_to_provision) > 0,
aws=len(aws_nodes_to_provision) > 0,
azure=len(azure_nodes_to_provision) > 0,
gcp=len(gcp_nodes_to_provision) > 0,
)

# provision VMs
uuids = self.provisioner.provision(authorize_firewall=allow_firewall, max_jobs=max_jobs, spinner=spinner,)
uuids = self.provisioner.provision(
authorize_firewall=allow_firewall,
max_jobs=max_jobs,
spinner=spinner,
)

# bind VMs to nodes
servers = [self.provisioner.get_node(u) for u in uuids]
Expand All @@ -108,7 +118,8 @@ def provision(
self.provisioned = True

def _start_gateway(
gateway_node: ReplicationTopologyGateway, gateway_server: compute.Server,
gateway_node: ReplicationTopologyGateway,
gateway_server: compute.Server,
):
# map outgoing ports
setup_args = {}
Expand Down Expand Up @@ -162,7 +173,8 @@ def deprovision(self, max_jobs: int = 64, spinner: bool = False):
raise
finally:
self.provisioner.deprovision(
max_jobs=max_jobs, spinner=spinner,
max_jobs=max_jobs,
spinner=spinner,
)
self.provisioned = False

Expand All @@ -189,13 +201,23 @@ def source_gateways(self) -> List[compute.Server]:
def sink_gateways(self) -> List[compute.Server]:
return [self.bound_nodes[n] for n in self.topology.sink_instances()] if self.provisioned else []

def queue_copy(self, src: str, dst: str, recursive: bool = False,) -> str:
def queue_copy(
self,
src: str,
dst: str,
recursive: bool = False,
) -> str:
job = CopyJob(src, dst, recursive, requester_pays=self.transfer_config.requester_pays)
logger.fs.debug(f"[SkyplaneClient] Queued copy job {job}")
self.jobs_to_dispatch.append(job)
return job.uuid

def queue_sync(self, src: str, dst: str, recursive: bool = False,) -> str:
def queue_sync(
self,
src: str,
dst: str,
recursive: bool = False,
) -> str:
job = SyncJob(src, dst, recursive, requester_pays=self.transfer_config.requester_pays)
logger.fs.debug(f"[SkyplaneClient] Queued sync job {job}")
self.jobs_to_dispatch.append(job)
Expand Down
20 changes: 17 additions & 3 deletions skyplane/api/provision/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ def _provision_task(self, task: ProvisionerTask):
assert self.gcp.auth.enabled(), "GCP credentials not configured"
# todo specify network tier in ReplicationTopology
server = self.gcp.provision_instance(
task.region, task.vm_type, use_spot_instances=task.spot, gcp_premium_network=False, tags=task.tags,
task.region,
task.vm_type,
use_spot_instances=task.spot,
gcp_premium_network=False,
tags=task.tags,
)
else:
raise NotImplementedError(f"Unknown provider {task.cloud_provider}")
Expand Down Expand Up @@ -131,7 +135,12 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner
# provision VMs
logger.fs.info(f"[Provisioner.provision] Provisioning {len(provision_tasks)} VMs")
results: List[Tuple[ProvisionerTask, compute.Server]] = do_parallel(
self._provision_task, provision_tasks, n=max_jobs, spinner=spinner, spinner_persist=spinner, desc="Provisioning VMs",
self._provision_task,
provision_tasks,
n=max_jobs,
spinner=spinner,
spinner_persist=spinner,
desc="Provisioning VMs",
)

# configure firewall
Expand Down Expand Up @@ -193,7 +202,12 @@ def deprovision_gateway_instance(server: compute.Server):
logger.warning("Azure deprovisioning is very slow. Please be patient.")
logger.fs.info(f"[Provisioner.deprovision] Deprovisioning {len(servers)} VMs")
do_parallel(
deprovision_gateway_instance, servers, n=max_jobs, spinner=spinner, spinner_persist=False, desc="Deprovisioning VMs",
deprovision_gateway_instance,
servers,
n=max_jobs,
spinner=spinner,
spinner_persist=False,
desc="Deprovisioning VMs",
)

# clean up firewall
Expand Down
15 changes: 12 additions & 3 deletions skyplane/api/transfer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ def __init__(
self.concurrent_multipart_chunk_threads = concurrent_multipart_chunk_threads

def _run_multipart_chunk_thread(
self, exit_event: threading.Event, in_queue: "Queue[Tuple[ObjectStoreObject, ObjectStoreObject]]", out_queue: "Queue[Chunk]",
self,
exit_event: threading.Event,
in_queue: "Queue[Tuple[ObjectStoreObject, ObjectStoreObject]]",
out_queue: "Queue[Chunk]",
):
"""Chunks large files into many small chunks."""
region = self.dest_iface.region_tag()
Expand Down Expand Up @@ -221,7 +224,10 @@ def chunk(
multipart_send_queue.put((src_obj, dst_obj))
else:
yield Chunk(
src_key=src_obj.key, dest_key=dst_obj.key, chunk_id=uuid.uuid4().hex, chunk_length_bytes=src_obj.size,
src_key=src_obj.key,
dest_key=dst_obj.key,
chunk_id=uuid.uuid4().hex,
chunk_length_bytes=src_obj.size,
)

if self.transfer_config.multipart_enabled:
Expand Down Expand Up @@ -322,7 +328,10 @@ def gen_transfer_pairs(self, chunker: Optional[Chunker] = None) -> Generator[Tup
)

def dispatch(
self, dataplane: "Dataplane", transfer_config: TransferConfig, dispatch_batch_size: int = 64,
self,
dataplane: "Dataplane",
transfer_config: TransferConfig,
dispatch_batch_size: int = 64,
) -> Generator[ChunkRequest, None, None]:
"""Dispatch transfer job to specified gateways."""
chunker = Chunker(self.src_iface, self.dst_iface, transfer_config)
Expand Down
13 changes: 11 additions & 2 deletions skyplane/broadcast/gateway/gateway_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@

class GatewayDaemon:
def __init__(
self, region: str, chunk_dir: PathLike, max_incoming_ports=64, use_tls=True, use_e2ee=False,
self,
region: str,
chunk_dir: PathLike,
max_incoming_ports=64,
use_tls=True,
use_e2ee=False,
):
# read gateway program
gateway_program_path = Path(os.environ["GATEWAY_PROGRAM_FILE"]).expanduser()
Expand Down Expand Up @@ -302,5 +307,9 @@ def exit_handler(signum, frame):
args = parser.parse_args()

os.makedirs(args.chunk_dir)
daemon = GatewayDaemon(region=args.region, chunk_dir=args.chunk_dir, use_tls=not args.disable_tls,)
daemon = GatewayDaemon(
region=args.region,
chunk_dir=args.chunk_dir,
use_tls=not args.disable_tls,
)
daemon.run()
3 changes: 0 additions & 3 deletions skyplane/broadcast/gateway/operators/gateway_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
from skyplane.utils.timer import Timer





class GatewayReceiver:
def __init__(
self,
Expand Down
6 changes: 5 additions & 1 deletion skyplane/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,11 @@ def cp(
and (job.transfer_size / GB) < cloud_config.get_flag("native_cmd_threshold_gb")
and small_transfer_cmd
):
typer.secho(f"Transfer is small enough to delegate to native tools. Delegating to: {small_transfer_cmd}", fg="yellow")
typer.secho(
f"Transfer of {job.transfer_size / GB} is small enough to delegate to native tools. Delegating to: {small_transfer_cmd}",
fg="yellow",
)
typer.secho(f"You can change this by `skyplane config set native_cmd_threshold_gb <value>`")
os.system(small_transfer_cmd)
return 0
else:
Expand Down
12 changes: 9 additions & 3 deletions skyplane/cli/experiments/cli_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@


def util_grid_throughput(
src: str, dest: str, src_tier: str = "PREMIUM", dest_tier: str = "PREMIUM",
src: str,
dest: str,
src_tier: str = "PREMIUM",
dest_tier: str = "PREMIUM",
):
with path("skyplane.data", "throughput.csv") as throughput_grid_path:
solver = ThroughputSolver(throughput_grid_path)
print(solver.get_path_throughput(src, dest, src_tier, dest_tier) / 2 ** 30)
print(solver.get_path_throughput(src, dest, src_tier, dest_tier) / 2**30)


def util_grid_cost(
src: str, dest: str, src_tier: str = "PREMIUM", dest_tier: str = "PREMIUM",
src: str,
dest: str,
src_tier: str = "PREMIUM",
dest_tier: str = "PREMIUM",
):
with path("skyplane.data", "throughput.csv") as throughput_grid_path:
solver = ThroughputSolver(throughput_grid_path)
Expand Down
5 changes: 4 additions & 1 deletion skyplane/compute/aws/aws_cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ def start_instance(subnet_id: str):
}
],
BlockDeviceMappings=[
{"DeviceName": "/dev/sda1", "Ebs": {"DeleteOnTermination": True, "VolumeSize": disk_size, "VolumeType": "gp2"},}
{
"DeviceName": "/dev/sda1",
"Ebs": {"DeleteOnTermination": True, "VolumeSize": disk_size, "VolumeType": "gp2"},
}
],
NetworkInterfaces=[
{
Expand Down
12 changes: 10 additions & 2 deletions skyplane/compute/gcp/gcp_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,16 @@ def create_default_firewall_rules(errors, self):
priority=65533,
)
self.create_firewall_rule(
"skyplane-default-allow-ssh", ["0.0.0.0/0"], ["22"], ["tcp"], priority=65533,
"skyplane-default-allow-ssh",
["0.0.0.0/0"],
["22"],
["tcp"],
priority=65533,
)
self.create_firewall_rule(
"skyplane-default-allow-icmp", ["0.0.0.0/0"], [], ["icmp"], priority=65533,
"skyplane-default-allow-icmp",
["0.0.0.0/0"],
[],
["icmp"],
priority=65533,
)
6 changes: 5 additions & 1 deletion skyplane/compute/key_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@


def generate_keypair(pubkey_path: PathLike, pem_path: PathLike):
key = rsa.generate_private_key(backend=crypto_default_backend(), public_exponent=65537, key_size=4096,)
key = rsa.generate_private_key(
backend=crypto_default_backend(),
public_exponent=65537,
key_size=4096,
)
private_key = key.private_bytes(
crypto_serialization.Encoding.PEM, crypto_serialization.PrivateFormat.TraditionalOpenSSL, crypto_serialization.NoEncryption()
)
Expand Down
6 changes: 5 additions & 1 deletion skyplane/obj_store/gcs_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ def upload_object(self, src_file_path, dst_object_name, part_number=None, upload
# send XML api request
headers = {"Content-MD5": b64_md5sum} if check_md5 else None
response = self.send_xml_request(
dst_object_name, {"uploadId": upload_id, "partNumber": part_number}, "PUT", headers=headers, data=open(src_file_path, "rb"),
dst_object_name,
{"uploadId": upload_id, "partNumber": part_number},
"PUT",
headers=headers,
data=open(src_file_path, "rb"),
)

# check response
Expand Down
2 changes: 1 addition & 1 deletion skyplane/obj_store/s3_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def download_object(
size_bytes=None,
write_at_offset=False,
generate_md5=False,
write_block_size=2 ** 16,
write_block_size=2**16,
) -> Tuple[Optional[str], Optional[bytes]]:
src_object_name, dst_file_path = str(src_object_name), str(dst_file_path)

Expand Down