Add GCS support to skylark cp CLI command #97

Merged · 15 commits · Jan 27, 2022
6 changes: 4 additions & 2 deletions skylark/cli/cli.py
@@ -34,6 +34,8 @@
     copy_local_local,
     copy_local_s3,
     copy_s3_local,
+    copy_gcs_local,
+    copy_local_gcs,
     deprovision_skylark_instances,
     load_config,
     ls_local,
@@ -80,9 +82,9 @@ def cp(src: str, dst: str):
         copy_local_s3(Path(path_src), bucket_dst, path_dst)
     elif provider_src == "s3" and provider_dst == "local":
         copy_s3_local(bucket_src, path_src, Path(path_dst))
-    elif provider_src == "local" and provider_dst == "gcs":
+    elif provider_src == "local" and provider_dst == "gs":
         copy_local_gcs(Path(path_src), bucket_dst, path_dst)
-    elif provider_src == "gcs" and provider_dst == "local":
+    elif provider_src == "gs" and provider_dst == "local":
         copy_gcs_local(bucket_src, path_src, Path(path_dst))
     else:
         raise NotImplementedError(f"{provider_src} to {provider_dst} not supported yet")
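The dispatch above operates on (provider, bucket, path) triples, so a gs://bucket/key URI must be split before it reaches the elif chain. A minimal sketch of that parsing, assuming a urlparse-based helper rather than whatever Skylark's CLI actually uses:

from urllib.parse import urlparse

def parse_path(path: str):
    # Hypothetical helper for illustration; Skylark's real parser may differ.
    # "gs://bucket/key" -> ("gs", "bucket", "key"); anything without a known
    # scheme falls through as a local filesystem path.
    parsed = urlparse(path)
    if parsed.scheme in ("s3", "gs"):
        return parsed.scheme, parsed.netloc, parsed.path.lstrip("/")
    return "local", None, path

With triples of that shape, skylark cp /tmp/data gs://my-bucket/backups would take the local-to-gs branch and call copy_local_gcs.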
51 changes: 49 additions & 2 deletions skylark/cli/cli_helper.py
@@ -16,6 +16,7 @@
 from skylark.compute.gcp.gcp_cloud_provider import GCPCloudProvider
 from skylark.obj_store.object_store_interface import ObjectStoreObject
 from skylark.obj_store.s3_interface import S3Interface
+from skylark.obj_store.gcs_interface import GCSInterface
 from skylark.utils.utils import do_parallel
 from tqdm import tqdm
@@ -85,12 +86,58 @@ def copy_local_local(src: Path, dst: Path):
     copyfile(src, dst)
 
 
+# TODO: probably should merge this with the s3 function (duplicate code)
 def copy_local_gcs(src: Path, dst_bucket: str, dst_key: str):
-    raise NotImplementedError(f"GCS not yet supported")
+    gcs = GCSInterface(None, dst_bucket)
+    ops: List[concurrent.futures.Future] = []
+    path_mapping: Dict[concurrent.futures.Future, Path] = {}
+
+    def _copy(path: Path, dst_key: str, total_size=0.0):
+        if path.is_dir():
+            for child in path.iterdir():
+                total_size += _copy(child, os.path.join(dst_key, child.name))
+            return total_size
+        else:
+            future = gcs.upload_object(path, dst_key)
+            ops.append(future)
+            path_mapping[future] = path
+            return path.stat().st_size
+
+    total_bytes = _copy(src, dst_key)
+
+    # wait for all uploads to complete, displaying a progress bar
+    with tqdm(total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, desc="Uploading") as pbar:
+        for op in concurrent.futures.as_completed(ops):
+            op.result()
+            pbar.update(path_mapping[op].stat().st_size)

+# TODO: probably should merge this with the s3 function (duplicate code)
 def copy_gcs_local(src_bucket: str, src_key: str, dst: Path):
-    raise NotImplementedError(f"GCS not yet supported")
+    gcs = GCSInterface(None, src_bucket)
+    ops: List[concurrent.futures.Future] = []
+    obj_mapping: Dict[concurrent.futures.Future, ObjectStoreObject] = {}
+
+    # copy single object
+    def _copy(src_obj: ObjectStoreObject, dst: Path):
+        dst.parent.mkdir(exist_ok=True, parents=True)
+        future = gcs.download_object(src_obj.key, dst)
+        ops.append(future)
+        obj_mapping[future] = src_obj
+        return src_obj.size
+
+    total_bytes = 0.0
+    for obj in gcs.list_objects(prefix=src_key):
+        sub_key = obj.key[len(src_key) :]
+        sub_key = sub_key.lstrip("/")
+        dest_path = dst / sub_key
+        total_bytes += _copy(obj, dest_path)
+
+    # wait for all downloads to complete, displaying a progress bar
+    with tqdm(total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, desc="Downloading") as pbar:
+        for op in concurrent.futures.as_completed(ops):
+            op.result()
+            pbar.update(obj_mapping[op].size)
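Both new functions repeat the futures-plus-tqdm bookkeeping that the TODO comments flag as duplicated with the S3 code paths. One way the shared loop could be factored out, as a sketch of the idea rather than something this PR does:

import concurrent.futures

from tqdm import tqdm

def _await_with_progress(ops, sizes, desc: str):
    # Drain upload/download futures while advancing a byte-accurate progress
    # bar; `sizes` maps each future to the byte count it represents.
    # Illustrative helper only, not part of this PR.
    with tqdm(total=sum(sizes.values()), unit="B", unit_scale=True, unit_divisor=1024, desc=desc) as pbar:
        for op in concurrent.futures.as_completed(ops):
            op.result()
            pbar.update(sizes[op])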


 def copy_local_s3(src: Path, dst_bucket: str, dst_key: str, use_tls: bool = True):