Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cp rework 2 #284

Merged
merged 14 commits into from
Apr 16, 2022
9 changes: 8 additions & 1 deletion skylark/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,14 @@ def cp(
for i in range(max_instances):
topo.add_edge(src_region, i, dst_region, i, num_connections)

replicate_helper(topo, source_bucket=bucket_src, dest_bucket=bucket_dst, src_key_prefix=path_src, dest_key_prefix=path_dst, reuse_gateways=reuse_gateways)
replicate_helper(
topo,
source_bucket=bucket_src,
dest_bucket=bucket_dst,
src_key_prefix=path_src,
dest_key_prefix=path_dst,
reuse_gateways=reuse_gateways,
)
else:
raise NotImplementedError(f"{provider_src} to {provider_dst} not supported yet")

Expand Down
31 changes: 25 additions & 6 deletions skylark/cli/cli_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Dict, List
from sys import platform
from typing import Dict, List
from urllib.parse import ParseResultBytes, parse_qs


import boto3
Expand Down Expand Up @@ -50,10 +51,24 @@ def is_plausible_local_path(path: str):

def parse_path(path: str):
if path.startswith("s3://"):
bucket_name, key_name = path[5:].split("/", 1)
parsed = path[5:].split("/", 1)
if len(parsed) == 1:
bucket_name, key_name = parsed[0], "/"
else:
if parsed[1] == "":
bucket_name, key_name = parsed[0], "/"
else:
bucket_name, key_name = parsed[0], parsed[1]
return "s3", bucket_name, key_name
elif path.startswith("gs://"):
bucket_name, key_name = path[5:].split("/", 1)
parsed = path[5:].split("/", 1)
if len(parsed) == 1:
bucket_name, key_name = parsed[0], "/"
else:
if parsed[1] == "":
bucket_name, key_name = parsed[0], "/"
else:
bucket_name, key_name = parsed[0], parsed[1]
return "gs", bucket_name, key_name
elif (path.startswith("https://") or path.startswith("http://")) and "blob.core.windows.net" in path:
regex = re.compile(r"https?://([^/]+).blob.core.windows.net/([^/]+)/(.*)")
Expand Down Expand Up @@ -228,15 +243,19 @@ def replicate_helper(
)
else:
# make replication job
objs = list(ObjectStoreInterface.create(topo.source_region(), source_bucket).list_objects(src_key_prefix))
src_objs = list(ObjectStoreInterface.create(topo.source_region(), source_bucket).list_objects(src_key_prefix))
dest_is_directory = False
if dest_key_prefix.endswith("/"):
dest_is_directory = True

job = ReplicationJob(
source_region=topo.source_region(),
source_bucket=source_bucket,
dest_region=topo.sink_region(),
dest_bucket=dest_bucket,
src_objs=[obj.key for obj in objs],
dest_objs=[dest_key_prefix + obj.key for obj in objs],
obj_sizes={obj.key: obj.size for obj in objs},
src_objs=[obj.key for obj in src_objs],
dest_objs=[dest_key_prefix + obj.key if dest_is_directory else dest_key_prefix for obj in src_objs],
obj_sizes={obj.key: obj.size for obj in src_objs},
)

rc = ReplicatorClient(
Expand Down