From 592e72a36daa2be9858604adb8f0667295750aec Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Thu, 15 Jun 2023 06:57:59 +0000 Subject: [PATCH 01/13] fix planner --- Dockerfile | 26 +++++++++++++------------- skyplane/api/pipeline.py | 2 +- skyplane/planner/planner.py | 3 ++- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index a861e1055..4a99ae0d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,19 +7,19 @@ RUN --mount=type=cache,target=/var/cache/apt apt-get update \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -#install HDFS Onprem Packages -RUN apt-get update && \ - apt-get install -y openjdk-11-jdk && \ - apt-get clean - -ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 - -RUN wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp \ - && tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp \ - && mv /tmp/hadoop-3.3.0 /usr/local/hadoop \ - && rm /tmp/hadoop-3.3.0.tar.gz - -ENV HADOOP_HOME /usr/local/hadoop +##install HDFS Onprem Packages +#RUN apt-get update && \ +# apt-get install -y openjdk-11-jdk && \ +# apt-get clean +# +#ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 +# +#RUN wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp \ +# && tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp \ +# && mv /tmp/hadoop-3.3.0 /usr/local/hadoop \ +# && rm /tmp/hadoop-3.3.0.tar.gz +# +#ENV HADOOP_HOME /usr/local/hadoop # configure stunnel RUN mkdir -p /etc/stunnel \ diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 5471ebf55..17adf740e 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -68,7 +68,7 @@ def __init__( # planner self.planning_algorithm = planning_algorithm if self.planning_algorithm == "direct": - self.planner = MulticastDirectPlanner(self.max_instances, 64) + self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config) elif self.planning_algorithm == "src_one_sided": self.planner = DirectPlannerSourceOneSided(self.max_instances, 64) elif self.planning_algorithm == "dst_one_sided": diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 59eeac33e..0110158cb 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -176,7 +176,8 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: vm_types = {v[0]: Planner._vcpus_to_vm(cloud_provider=v[0].split(":")[0], vcpus=v[1][0]) for v in vm_info} # type: ignore # Taking the minimum so that we can use the same number of instances for both source and destination - n_instances = min(v[1][1] for v in vm_info) # type: ignore + n_instances = min(self.n_instances, min(v[1][1] for v in vm_info)) # type: ignore + print("n_instances", n_instances) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): From 42256443d3b33d98ab5a6949c866bfc50285ae72 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Fri, 16 Jun 2023 02:12:15 +0000 Subject: [PATCH 02/13] add copy tests --- tests/integration/test_cp.py | 99 ++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 tests/integration/test_cp.py diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py new file mode 100644 index 000000000..cedb5c599 --- /dev/null +++ b/tests/integration/test_cp.py @@ -0,0 +1,99 @@ +import pytest +from skyplane.utils import logger +import time +from skyplane.api.client import SkyplaneClient +from skyplane.obj_store.object_store_interface import 
ObjectStoreInterface +import uuid +import os + +test_bucket = "gs://skyplane-test-bucket" # bucket containing test data + +# test cases +test_bucket_small_file = f"{test_bucket}/files_100000_size_4_mb" +test_bucket_large_file = f"{test_bucket}/file_1_size_416_gb" +test_bucket_empty_folder = f"{test_bucket}/empty_folder" + +region_tags = [ + "aws:us-west-2", + "azure:westus2", + "gcp:us-west2", + "gcp:us-east4", # TODO: make sure one is in same region as bucket +] + + +@pytest.mark.skip(reason="Shared function") +def setup_bucket(region_tag): + provider, region = region_tag.split(":") + if provider == "azure": + bucket_name = f"integration{region}/{str(uuid.uuid4()).replace('-', '')}" + else: + bucket_name = f"integration{region}-{str(uuid.uuid4())[:8]}" + + # create bucket + try: + iface = ObjectStoreInterface.create(region_tag, bucket_name) + iface.create_bucket(region) + except Exception as e: + logger.fs.error(f"Failed to create bucket {bucket_name}: {e}") + raise e + + return iface + +@pytest.fixture +def bucket(region_tag): + iface = setup_bucket(region_tag) + yield iface.bucket() + # cleanup + iface.delete_bucket() + +@pytest.fixture() +def azure_bucket(): + azure_region_tag = "azure:westus2" + iface = setup_bucket(azure_region_tag) + while not iface.bucket_exists(): + logger.fs.info(f"Waiting for bucket {iface.bucket()}") + time.sleep(1) + yield iface.bucket() + # cleanup + iface.delete_bucket() + +@pytest.fixture() +def aws_bucket(): + aws_region_tag = "aws:us-west-2" + iface = setup_bucket(aws_region_tag) + #while not iface.bucket_exists(): + # print("waiting for bucket...") + # logger.fs.info(f"Waiting for bucket {iface.bucket()}") + # time.sleep(1) + + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + + yield iface.bucket() + # cleanup + #iface.delete_bucket() + + +@pytest.mark.parametrize("test_case", [test_bucket_small_file, test_bucket_large_file, test_bucket_empty_folder]) +def test_cp_aws(aws_bucket, test_case): + + client = SkyplaneClient() + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + + print("AWS BUCKEt", aws_bucket) + + test_case = "files_100000_size_4_mb" + assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" + assert len(list(src_iface.list_objects(prefix=test_case))) > 0, f"Test case {test_bucket}/{test_case} does not exist in {test_bucket}" + client.copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) + + +# test one sided transfers + +# test multicast + +# test same region transfers + +# test multiple VMs + + + From 53d3439215c71cc4d28e2f3a7fabfcec6f019f4e Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Fri, 16 Jun 2023 04:57:29 +0000 Subject: [PATCH 03/13] initial integration tests --- tests/integration/test_cp.py | 88 ++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 23 deletions(-) diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index cedb5c599..6783503e1 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -1,4 +1,5 @@ import pytest +from skyplane.api.config import TransferConfig from skyplane.utils import logger import time from skyplane.api.client import SkyplaneClient @@ -7,19 +8,12 @@ import os test_bucket = "gs://skyplane-test-bucket" # bucket containing test data +test_region_tag = "gcp:us-west2" # test cases -test_bucket_small_file = f"{test_bucket}/files_100000_size_4_mb" -test_bucket_large_file = f"{test_bucket}/file_1_size_416_gb" -test_bucket_empty_folder = 
f"{test_bucket}/empty_folder" - -region_tags = [ - "aws:us-west-2", - "azure:westus2", - "gcp:us-west2", - "gcp:us-east4", # TODO: make sure one is in same region as bucket -] - +test_bucket_small_file = f"{test_bucket}/files_10000_size_4_mb" +test_bucket_large_file = f"{test_bucket}/file_1_size_16_gb" +#test_bucket_empty_folder = f"{test_bucket}/empty_folder" @pytest.mark.skip(reason="Shared function") def setup_bucket(region_tag): @@ -39,14 +33,33 @@ def setup_bucket(region_tag): return iface -@pytest.fixture +@pytest.fixture(scope="session") def bucket(region_tag): iface = setup_bucket(region_tag) yield iface.bucket() # cleanup iface.delete_bucket() -@pytest.fixture() +@pytest.fixture(scope="session") +def same_region_bucket(): + iface = setup_bucket(test_region_tag) + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + yield iface.bucket() + + # cleanup + iface.delete_bucket() + +@pytest.fixture(scope="session") +def gcp_bucket(): + region_tag = "gcp:europe-west2" + iface = setup_bucket(region_tag) + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + yield iface.bucket() + + # cleanup + iface.delete_bucket() + +@pytest.fixture(scope="session") def azure_bucket(): azure_region_tag = "azure:westus2" iface = setup_bucket(azure_region_tag) @@ -57,7 +70,7 @@ def azure_bucket(): # cleanup iface.delete_bucket() -@pytest.fixture() +@pytest.fixture(scope="session") def aws_bucket(): aws_region_tag = "aws:us-west-2" iface = setup_bucket(aws_region_tag) @@ -70,30 +83,59 @@ def aws_bucket(): yield iface.bucket() # cleanup - #iface.delete_bucket() + iface.delete_bucket() + +@pytest.fixture(scope="session") +def cloudflare_bucket(): + iface = setup_bucket("cloudflare:infer") + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + yield iface.bucket() + # cleanup + iface.delete_bucket() -@pytest.mark.parametrize("test_case", [test_bucket_small_file, test_bucket_large_file, test_bucket_empty_folder]) + +# TODO: add more parameters for bucket types +@pytest.mark.parametrize("test_case", [test_bucket_large_file]) #, test_bucket_empty_folder]) def test_cp_aws(aws_bucket, test_case): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) - print("AWS BUCKEt", aws_bucket) - - test_case = "files_100000_size_4_mb" assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" - assert len(list(src_iface.list_objects(prefix=test_case))) > 0, f"Test case {test_bucket}/{test_case} does not exist in {test_bucket}" + assert len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_case} does not exist in {test_bucket}" client.copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) + # assert sync has cost zero + pipeline = client.get_pipeline() + pipeline.queue_copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) + cost = pipeline.estimate_total_cost() + assert cost == 0, f"Cost is not zero {cost}, still objects to copy" + + # copy back + client.copy(f"s3://{aws_bucket}/{test_case}", f"{test_bucket}/aws/{test_case}", recursive=True) # test one sided transfers +def test_cp_one_sided(): + pass -# test multicast +# test multiple VMs +def test_cp_multiple_vms(aws_bucket): + print("starting") + client = SkyplaneClient() + print("created client") + pipeline = client.pipeline(max_instances=2) + print('created pipeline)') + pipeline.queue_copy(test_bucket_large_file, f"s3://{aws_bucket}/") + print('start') + 
pipeline.start(debug=True, progress=True) + print('started pipeline') -# test same region transfers -# test multiple VMs +# test multicast +def test_cp_multicast(aws_bucket, gcp_bucket, azure_bucket): + client = SkyplaneClient() + client.copy(test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"az://{azure_bucket}/"]) From 2429449e9ed035e11e6ffb681183d48ad851f16e Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Fri, 16 Jun 2023 06:30:13 +0000 Subject: [PATCH 04/13] planner fixe --- skyplane/api/client.py | 2 +- skyplane/compute/aws/aws_cloud_provider.py | 3 +- skyplane/planner/planner.py | 11 +++--- skyplane/utils/path.py | 2 ++ tests/integration/test_cp.py | 39 ++++++++++++++-------- 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index f7afae7e1..d931e6625 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -100,7 +100,7 @@ def copy(self, src: str, dst: str, recursive: bool = False): pipeline = self.pipeline() pipeline.queue_copy(src, dst, recursive=recursive) - pipeline.start() + pipeline.start(progress=True) def object_store(self): return ObjectStore() diff --git a/skyplane/compute/aws/aws_cloud_provider.py b/skyplane/compute/aws/aws_cloud_provider.py index 780ef6305..b32140e51 100644 --- a/skyplane/compute/aws/aws_cloud_provider.py +++ b/skyplane/compute/aws/aws_cloud_provider.py @@ -229,7 +229,8 @@ def start_instance(subnet_id: str): elif "VcpuLimitExceeded" in str(e): raise skyplane_exceptions.InsufficientVCPUException() from e elif "Invalid IAM Instance Profile name" in str(e): - logger.warning(str(e)) + # TODO: suppress this + logger.(str(e)) elif "InsufficientInstanceCapacity" in str(e): # try another subnet current_subnet_id = (current_subnet_id + 1) % len(subnets) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 1fa5a12f7..1e84c7af1 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -34,6 +34,7 @@ class Planner: def __init__(self, transfer_config: TransferConfig): self.transfer_config = transfer_config self.config = SkyplaneConfig.load_config(config_path) + self.n_instances = self.config.get_flag("max_instances") # Loading the quota information, add ibm cloud when it is supported self.quota_limits = {} @@ -117,11 +118,13 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") ) + config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") + # No quota limits (quota limits weren't initialized properly during skyplane init) if quota_limit is None: - return None + logger.warning(f"Quota information for {cloud_provider} wasn't loaded properly during `skyplane init`, using defaults.") + return config_vm_type, self.n_instances - config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") config_vcpus = self._vm_to_vcpus(cloud_provider, config_vm_type) if config_vcpus <= quota_limit: return config_vm_type, quota_limit // config_vcpus @@ -133,8 +136,7 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: break # shouldn't happen, but just in case we use more complicated vm types in the future - if vm_type is None or vcpus is None: - return None + assert vm_type is not None and vcpus is not None # number of instances allowed by the quota with the selected vm type n_instances = quota_limit // vcpus @@ -267,6 +269,7 @@ 
def plan(self, jobs: List[TransferJob]) -> TopologyPlan: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" + print("REGION TAGS", src_region_tag, dst_region_tags) plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) # Dynammically calculate n_instances based on quota limits diff --git a/skyplane/utils/path.py b/skyplane/utils/path.py index 3328fe956..2f57c1926 100644 --- a/skyplane/utils/path.py +++ b/skyplane/utils/path.py @@ -66,6 +66,8 @@ def is_plausible_local_path(path_test: str): else: if not is_plausible_local_path(path): logger.warning(f"Local path '{path}' does not exist") + if path.startswith("az://"): + logger.warning(f"Did you mean azure://...? If not, assuming local path.") # path is subsitutute for bucket return "local", path, path diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 6783503e1..112b993f0 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -97,23 +97,36 @@ def cloudflare_bucket(): # TODO: add more parameters for bucket types @pytest.mark.parametrize("test_case", [test_bucket_large_file]) #, test_bucket_empty_folder]) -def test_cp_aws(aws_bucket, test_case): +def test_big_file(aws_bucket, gcp_bucket, test_case): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" assert len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_case} does not exist in {test_bucket}" - client.copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) + client.copy(test_case, f"s3://{aws_bucket}/{test_case}") # assert sync has cost zero - pipeline = client.get_pipeline() - pipeline.queue_copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) - cost = pipeline.estimate_total_cost() - assert cost == 0, f"Cost is not zero {cost}, still objects to copy" + dst_iface = ObjectStoreInterface.create("aws:us-west-2", aws_bucket) + dst_objects = list(dst_iface.list_objects()) + assert dst_objects == [test_case], f"Object {test_case} not copied to {aws_bucket}: only container {dst_objects}" # copy back - client.copy(f"s3://{aws_bucket}/{test_case}", f"{test_bucket}/aws/{test_case}", recursive=True) + client.copy(f"s3://{aws_bucket}/{test_case}", f"gs://{gcp_bucket}/aws/{test_case}") + +@pytest.mark.parametrize("test_case", [test_bucket_small_file]) #, test_bucket_empty_folder]) +def test_many_files(aws_bucket, gcp_bucket, test_case): + + client = SkyplaneClient() + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + + assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" + assert len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_case} does not exist in {test_bucket}" + client.copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) + + # copy back + client.copy(f"s3://{aws_bucket}/{test_case}", f"gs://{gcp_bucket}/aws/{test_case}", recursive=True) + # test one sided transfers def test_cp_one_sided(): @@ -121,21 +134,19 @@ def test_cp_one_sided(): # test multiple VMs def test_cp_multiple_vms(aws_bucket): - print("starting") client = SkyplaneClient() - print("created client") pipeline = client.pipeline(max_instances=2) - 
print('created pipeline)') pipeline.queue_copy(test_bucket_large_file, f"s3://{aws_bucket}/") - print('start') pipeline.start(debug=True, progress=True) - print('started pipeline') # test multicast -def test_cp_multicast(aws_bucket, gcp_bucket, azure_bucket): +# TODO: add azure +def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket): client = SkyplaneClient() - client.copy(test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"az://{azure_bucket}/"]) + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + assert len(list(src_iface.list_objects(prefix=test_bucket_large_file.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_bucket_large_file} does not exist in {test_bucket}" + client.copy(test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"gs://{same_region_bucket}/"]) From 84ebdc278a2231c8aa8990c0516d3d24c3cf9a48 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Fri, 16 Jun 2023 06:32:59 +0000 Subject: [PATCH 05/13] actualy disable cloudflare --- skyplane/cli/cli_init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/cli/cli_init.py b/skyplane/cli/cli_init.py index e946de451..90b973236 100644 --- a/skyplane/cli/cli_init.py +++ b/skyplane/cli/cli_init.py @@ -535,7 +535,7 @@ def init( cloud_config = load_gcp_config(cloud_config, force_init=reinit_gcp, non_interactive=non_interactive) # load cloudflare config - if not reinit_cloudflare: # TODO: fix reinit logic + if not reinit_cloudflare and not disable_config_cloudflare: # TODO: fix reinit logic typer.secho("\n(4) Configuring Cloudflare R2:", fg="yellow", bold=True) if not disable_config_cloudflare: cloud_config = load_cloudflare_config(cloud_config, non_interactive=non_interactive) From 9fd0cd8204c7b8f9d9bc46ed6587237cd0a31877 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Fri, 16 Jun 2023 06:33:33 +0000 Subject: [PATCH 06/13] actualy disable cloudflare --- skyplane/compute/aws/aws_cloud_provider.py | 2 +- skyplane/planner/planner.py | 4 - skyplane/utils/path.py | 2 +- tests/integration/test_cp.py | 94 ++++++++++++---------- 4 files changed, 54 insertions(+), 48 deletions(-) diff --git a/skyplane/compute/aws/aws_cloud_provider.py b/skyplane/compute/aws/aws_cloud_provider.py index b32140e51..2c82e239e 100644 --- a/skyplane/compute/aws/aws_cloud_provider.py +++ b/skyplane/compute/aws/aws_cloud_provider.py @@ -230,7 +230,7 @@ def start_instance(subnet_id: str): raise skyplane_exceptions.InsufficientVCPUException() from e elif "Invalid IAM Instance Profile name" in str(e): # TODO: suppress this - logger.(str(e)) + logger.warning(str(e)) elif "InsufficientInstanceCapacity" in str(e): # try another subnet current_subnet_id = (current_subnet_id + 1) % len(subnets) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 1e84c7af1..85eed68a5 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -244,9 +244,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): - super().__init__(transfer_config) self.n_instances = n_instances self.n_connections = n_connections @@ -260,7 +258,6 @@ def __init__(self, n_instances: int, n_connections: int, transfer_config: Transf with self.transfer_config.azure_vcpu_file.open("r") as f: self.quota_limits["azure"] = json.load(f) - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = 
jobs[0].src_iface.region_tag() dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] @@ -474,4 +471,3 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: for dst_region_tag, program in dst_program.items(): plan.set_gateway_program(dst_region_tag, program) return plan - diff --git a/skyplane/utils/path.py b/skyplane/utils/path.py index 2f57c1926..9670934f8 100644 --- a/skyplane/utils/path.py +++ b/skyplane/utils/path.py @@ -66,7 +66,7 @@ def is_plausible_local_path(path_test: str): else: if not is_plausible_local_path(path): logger.warning(f"Local path '{path}' does not exist") - if path.startswith("az://"): + if path.startswith("az://"): logger.warning(f"Did you mean azure://...? If not, assuming local path.") # path is subsitutute for bucket diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 112b993f0..9ae1999f4 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -7,22 +7,23 @@ import uuid import os -test_bucket = "gs://skyplane-test-bucket" # bucket containing test data +test_bucket = "gs://skyplane-test-bucket" # bucket containing test data test_region_tag = "gcp:us-west2" -# test cases +# test cases test_bucket_small_file = f"{test_bucket}/files_10000_size_4_mb" test_bucket_large_file = f"{test_bucket}/file_1_size_16_gb" -#test_bucket_empty_folder = f"{test_bucket}/empty_folder" +# test_bucket_empty_folder = f"{test_bucket}/empty_folder" + @pytest.mark.skip(reason="Shared function") -def setup_bucket(region_tag): +def setup_bucket(region_tag): provider, region = region_tag.split(":") if provider == "azure": bucket_name = f"integration{region}/{str(uuid.uuid4()).replace('-', '')}" else: bucket_name = f"integration{region}-{str(uuid.uuid4())[:8]}" - + # create bucket try: iface = ObjectStoreInterface.create(region_tag, bucket_name) @@ -33,22 +34,25 @@ def setup_bucket(region_tag): return iface + @pytest.fixture(scope="session") -def bucket(region_tag): +def bucket(region_tag): iface = setup_bucket(region_tag) - yield iface.bucket() - # cleanup + yield iface.bucket() + # cleanup iface.delete_bucket() + @pytest.fixture(scope="session") -def same_region_bucket(): +def same_region_bucket(): iface = setup_bucket(test_region_tag) assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" yield iface.bucket() - # cleanup + # cleanup iface.delete_bucket() + @pytest.fixture(scope="session") def gcp_bucket(): region_tag = "gcp:europe-west2" @@ -56,25 +60,27 @@ def gcp_bucket(): assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" yield iface.bucket() - # cleanup + # cleanup iface.delete_bucket() + @pytest.fixture(scope="session") -def azure_bucket(): +def azure_bucket(): azure_region_tag = "azure:westus2" iface = setup_bucket(azure_region_tag) - while not iface.bucket_exists(): + while not iface.bucket_exists(): logger.fs.info(f"Waiting for bucket {iface.bucket()}") time.sleep(1) - yield iface.bucket() - # cleanup + yield iface.bucket() + # cleanup iface.delete_bucket() + @pytest.fixture(scope="session") -def aws_bucket(): +def aws_bucket(): aws_region_tag = "aws:us-west-2" iface = setup_bucket(aws_region_tag) - #while not iface.bucket_exists(): + # while not iface.bucket_exists(): # print("waiting for bucket...") # logger.fs.info(f"Waiting for bucket {iface.bucket()}") # time.sleep(1) @@ -85,8 +91,9 @@ def aws_bucket(): # cleanup iface.delete_bucket() + @pytest.fixture(scope="session") -def cloudflare_bucket(): +def cloudflare_bucket(): iface = setup_bucket("cloudflare:infer") assert 
iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" yield iface.bucket() @@ -95,58 +102,61 @@ def cloudflare_bucket(): iface.delete_bucket() -# TODO: add more parameters for bucket types -@pytest.mark.parametrize("test_case", [test_bucket_large_file]) #, test_bucket_empty_folder]) -def test_big_file(aws_bucket, gcp_bucket, test_case): - +# TODO: add more parameters for bucket types +@pytest.mark.parametrize("test_case", [test_bucket_large_file]) # , test_bucket_empty_folder]) +def test_big_file(aws_bucket, gcp_bucket, test_case): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" - assert len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_case} does not exist in {test_bucket}" + assert ( + len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_case} does not exist in {test_bucket}" client.copy(test_case, f"s3://{aws_bucket}/{test_case}") - # assert sync has cost zero + # assert sync has cost zero dst_iface = ObjectStoreInterface.create("aws:us-west-2", aws_bucket) dst_objects = list(dst_iface.list_objects()) - assert dst_objects == [test_case], f"Object {test_case} not copied to {aws_bucket}: only container {dst_objects}" + assert dst_objects == [test_case], f"Object {test_case} not copied to {aws_bucket}: only container {dst_objects}" - # copy back + # copy back client.copy(f"s3://{aws_bucket}/{test_case}", f"gs://{gcp_bucket}/aws/{test_case}") -@pytest.mark.parametrize("test_case", [test_bucket_small_file]) #, test_bucket_empty_folder]) -def test_many_files(aws_bucket, gcp_bucket, test_case): - + +@pytest.mark.parametrize("test_case", [test_bucket_small_file]) # , test_bucket_empty_folder]) +def test_many_files(aws_bucket, gcp_bucket, test_case): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" - assert len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_case} does not exist in {test_bucket}" + assert ( + len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_case} does not exist in {test_bucket}" client.copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) - # copy back + # copy back client.copy(f"s3://{aws_bucket}/{test_case}", f"gs://{gcp_bucket}/aws/{test_case}", recursive=True) -# test one sided transfers -def test_cp_one_sided(): - pass +# test one sided transfers +def test_cp_one_sided(): + pass -# test multiple VMs -def test_cp_multiple_vms(aws_bucket): + +# test multiple VMs +def test_cp_multiple_vms(aws_bucket): client = SkyplaneClient() pipeline = client.pipeline(max_instances=2) pipeline.queue_copy(test_bucket_large_file, f"s3://{aws_bucket}/") pipeline.start(debug=True, progress=True) -# test multicast -# TODO: add azure +# test multicast +# TODO: add azure def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) - assert len(list(src_iface.list_objects(prefix=test_bucket_large_file.replace(f"{test_bucket}/", "")))) > 0, f"Test case {test_bucket_large_file} does not exist in {test_bucket}" + assert ( + 
len(list(src_iface.list_objects(prefix=test_bucket_large_file.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_bucket_large_file} does not exist in {test_bucket}" client.copy(test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"gs://{same_region_bucket}/"]) - - - From d2e42001ad39b0362b4e07c6c1f8d331eee3681e Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Sun, 18 Jun 2023 07:05:15 +0000 Subject: [PATCH 07/13] test fixes --- skyplane/api/transfer_job.py | 21 +++++-- skyplane/cli/cli_cloud.py | 34 ++++++++++ skyplane/compute/aws/aws_cloud_provider.py | 2 +- skyplane/obj_store/azure_blob_interface.py | 3 + skyplane/planner/planner.py | 1 + tests/integration/test_cp.py | 73 +++++++++++++--------- 6 files changed, 100 insertions(+), 34 deletions(-) diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 898dcad26..2e82042cb 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -100,9 +100,12 @@ def _run_multipart_chunk_thread( while not exit_event.is_set(): try: transfer_pair = in_queue.get(block=False, timeout=0.1) + print("GOT TRANSFER PAIR") except queue.Empty: continue + print("start thread") + src_object = transfer_pair.src_obj dest_objects = transfer_pair.dst_objs dest_key = transfer_pair.dst_key @@ -113,7 +116,7 @@ def _run_multipart_chunk_thread( for dest_iface in self.dst_ifaces: dest_object = dest_objects[dest_iface.region_tag()] upload_id = dest_iface.initiate_multipart_upload(dest_object.key, mime_type=mime_type) - # print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") + print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") # store mapping between key and upload id for each region upload_id_mapping[dest_iface.region_tag()] = (src_object.key, upload_id) out_queue_chunks.put(GatewayMessage(upload_id_mapping=upload_id_mapping)) # send to output queue @@ -263,11 +266,13 @@ def transfer_pair_generator( for dst_iface in self.dst_ifaces: if not dst_iface.bucket_exists(): raise exceptions.MissingBucketException(f"Destination bucket {dst_iface.path()} does not exist or is not readable.") - + # query all source region objects logger.fs.debug(f"Querying objects in {self.src_iface.path()}") + print("PREFIX", src_prefix) n_objs = 0 for obj in self.src_iface.list_objects(src_prefix): + print('obj', obj) if prefilter_fn is None or prefilter_fn(obj): # collect list of destination objects dest_objs = {} @@ -318,8 +323,13 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> multipart_exit_event = threading.Event() multipart_chunk_threads = [] + + # TODO: remove after azure multipart implemented + azure_dest = any([dst_iface.provider() == "azure" for dst_iface in self.dst_ifaces]) + print([dst_iface.provider() == "azure" for dst_iface in self.dst_ifaces]) + # start chunking threads - if self.transfer_config.multipart_enabled: + if not azure_dest and self.transfer_config.multipart_enabled: for _ in range(self.concurrent_multipart_chunk_threads): t = threading.Thread( target=self._run_multipart_chunk_thread, @@ -332,9 +342,11 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> # begin chunking loop for transfer_pair in transfer_pair_generator: src_obj = transfer_pair.src_obj - if self.transfer_config.multipart_enabled and src_obj.size > self.transfer_config.multipart_threshold_mb * MB: + if not azure_dest and 
self.transfer_config.multipart_enabled and src_obj.size > self.transfer_config.multipart_threshold_mb * MB: + print("MULTIPART", transfer_pair) multipart_send_queue.put(transfer_pair) else: + print("NO MULTIPART") if transfer_pair.src_obj.size == 0: logger.fs.debug(f"Skipping empty object {src_obj.key}") continue @@ -803,6 +815,7 @@ def _enrich_dest_objs( if dest_obj.key in self._found_dest_objs: dest_obj.size = self._found_dest_objs[dest_obj.key].size dest_obj.last_modified = self._found_dest_objs[dest_obj.key].last_modified + print("par", src_obj, dest_obj) yield src_obj, dest_obj @classmethod diff --git a/skyplane/cli/cli_cloud.py b/skyplane/cli/cli_cloud.py index 37687b8dd..57d7d74ef 100644 --- a/skyplane/cli/cli_cloud.py +++ b/skyplane/cli/cli_cloud.py @@ -3,6 +3,7 @@ """ import json +import uuid import subprocess import time from collections import defaultdict @@ -281,6 +282,39 @@ def azure_check( iface = AzureBlobInterface(account, container) print(iface.container_client.get_container_properties()) + # write object temorarily + # check skyplane AzureBlobInterface + print(f"\n{hline}\n[bold]Checking Skyplane AzureBlobInterface[/bold]\n{hline}") + from skyplane.obj_store.azure_blob_interface import AzureBlobInterface + + iface = AzureBlobInterface(account, container) + print(iface.container_client.get_container_properties()) + + # check if writeable + rprint(f"\n{hline}\n[bold]Checking Skyplane AzureBlobInterface write access[/bold]\n{hline}") + import tempfile + import random + import string + + def generate_random_string(length): + """Generate a random string of given length""" + letters = string.ascii_letters + return "".join(random.choice(letters) for _ in range(length)) + + def create_temp_file(size): + """Create a temporary file with random data""" + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file_path = temp_file.name + random_data = generate_random_string(size) + temp_file.write(random_data.encode()) + return temp_file_path + + tmp_file = create_temp_file(1024) + tmp_object = f"skyplane-{uuid.uuid4()}" + iface.upload_object(tmp_file, tmp_object) + iface.delete_objects([tmp_object]) + + @app.command() def gcp_check( diff --git a/skyplane/compute/aws/aws_cloud_provider.py b/skyplane/compute/aws/aws_cloud_provider.py index 2c82e239e..90cc92012 100644 --- a/skyplane/compute/aws/aws_cloud_provider.py +++ b/skyplane/compute/aws/aws_cloud_provider.py @@ -229,7 +229,7 @@ def start_instance(subnet_id: str): elif "VcpuLimitExceeded" in str(e): raise skyplane_exceptions.InsufficientVCPUException() from e elif "Invalid IAM Instance Profile name" in str(e): - # TODO: suppress this + # TODO: suppress this logger.warning(str(e)) elif "InsufficientInstanceCapacity" in str(e): # try another subnet diff --git a/skyplane/obj_store/azure_blob_interface.py b/skyplane/obj_store/azure_blob_interface.py index f46c1bfd2..3ba36f200 100644 --- a/skyplane/obj_store/azure_blob_interface.py +++ b/skyplane/obj_store/azure_blob_interface.py @@ -26,6 +26,9 @@ def __init__(self, account_name: str, container_name: str, max_concurrency=1): self.container_name = container_name self.max_concurrency = max_concurrency # parallel upload/downloads, seems to cause issues if too high + def provider(self): + return "azure" + def path(self): return f"https://{self.account_name}.blob.core.windows.net/{self.container_name}" diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 85eed68a5..411db974f 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ 
-117,6 +117,7 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: quota_limit = self._get_quota_limits_for( cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") ) + print("QUOTA LIMIT", region_tag, quota_limit) config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 9ae1999f4..6fcc45d69 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -13,6 +13,7 @@ # test cases test_bucket_small_file = f"{test_bucket}/files_10000_size_4_mb" test_bucket_large_file = f"{test_bucket}/file_1_size_16_gb" +test_bucket_medium_file = f"{test_bucket}/files_100_size_64_mb" # test_bucket_empty_folder = f"{test_bucket}/empty_folder" @@ -20,7 +21,7 @@ def setup_bucket(region_tag): provider, region = region_tag.split(":") if provider == "azure": - bucket_name = f"integration{region}/{str(uuid.uuid4()).replace('-', '')}" + bucket_name = f"{str(uuid.uuid4())[:8]}/{str(uuid.uuid4()).replace('-', '')}" else: bucket_name = f"integration{region}-{str(uuid.uuid4())[:8]}" @@ -38,7 +39,7 @@ def setup_bucket(region_tag): @pytest.fixture(scope="session") def bucket(region_tag): iface = setup_bucket(region_tag) - yield iface.bucket() + yield iface # cleanup iface.delete_bucket() @@ -47,7 +48,7 @@ def bucket(region_tag): def same_region_bucket(): iface = setup_bucket(test_region_tag) assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" - yield iface.bucket() + yield iface # cleanup iface.delete_bucket() @@ -58,7 +59,7 @@ def gcp_bucket(): region_tag = "gcp:europe-west2" iface = setup_bucket(region_tag) assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" - yield iface.bucket() + yield iface # cleanup iface.delete_bucket() @@ -66,12 +67,12 @@ def gcp_bucket(): @pytest.fixture(scope="session") def azure_bucket(): - azure_region_tag = "azure:westus2" + azure_region_tag = "azure:westus" iface = setup_bucket(azure_region_tag) while not iface.bucket_exists(): logger.fs.info(f"Waiting for bucket {iface.bucket()}") time.sleep(1) - yield iface.bucket() + yield iface # cleanup iface.delete_bucket() @@ -80,14 +81,9 @@ def azure_bucket(): def aws_bucket(): aws_region_tag = "aws:us-west-2" iface = setup_bucket(aws_region_tag) - # while not iface.bucket_exists(): - # print("waiting for bucket...") - # logger.fs.info(f"Waiting for bucket {iface.bucket()}") - # time.sleep(1) - assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" - yield iface.bucket() + yield iface # cleanup iface.delete_bucket() @@ -103,39 +99,58 @@ def cloudflare_bucket(): # TODO: add more parameters for bucket types -@pytest.mark.parametrize("test_case", [test_bucket_large_file]) # , test_bucket_empty_folder]) -def test_big_file(aws_bucket, gcp_bucket, test_case): +@pytest.mark.parametrize("test_case, recursive", [(test_bucket_medium_file, True)]) +def test_azure(azure_bucket, gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + :param azure_bucket: destination interface + :param gcp_bucket: gcp bucket to copy FROM dstiface + :param test_case: test case from test_bucket to copy from + """ + print("DEST", azure_bucket, gcp_bucket) client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) - assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" + assert 
isinstance(azure_bucket.bucket(), str), f"Bucket name is not a string {azure_bucket.bucket()}" assert ( len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 ), f"Test case {test_case} does not exist in {test_bucket}" - client.copy(test_case, f"s3://{aws_bucket}/{test_case}") + print("test case", test_case) + client.copy(test_case, f"{azure_bucket.path()}/{test_case}", recursive=recursive) # assert sync has cost zero - dst_iface = ObjectStoreInterface.create("aws:us-west-2", aws_bucket) - dst_objects = list(dst_iface.list_objects()) - assert dst_objects == [test_case], f"Object {test_case} not copied to {aws_bucket}: only container {dst_objects}" + dst_objects = list(azure_bucket.list_objects()) + assert len(dst_objects) > 0, f"Object {test_case} not copied to {azure_bucket.bucket()}: only container {dst_objects}" + print(f"gs://{gcp_bucket}/azure/{test_case}") # copy back - client.copy(f"s3://{aws_bucket}/{test_case}", f"gs://{gcp_bucket}/aws/{test_case}") - - -@pytest.mark.parametrize("test_case", [test_bucket_small_file]) # , test_bucket_empty_folder]) -def test_many_files(aws_bucket, gcp_bucket, test_case): + client.copy(f"{azure_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/azure/", recursive=recursive) + +@pytest.mark.parametrize("test_case, recursive", [(test_bucket_large_file, False)]) +def test_aws(aws_bucket, gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + :param aws_bucket: destination interface + :param gcp_bucket: gcp bucket to copy FROM dstiface + :param test_case: test case from test_bucket to copy from + """ + print("DEST", aws_bucket, gcp_bucket) client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) - assert isinstance(aws_bucket, str), f"Bucket name is not a string {aws_bucket}" + assert isinstance(aws_bucket.bucket(), str), f"Bucket name is not a string {aws_bucket.bucket()}" assert ( len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 ), f"Test case {test_case} does not exist in {test_bucket}" - client.copy(test_case, f"s3://{aws_bucket}/{test_case}", recursive=True) + print("test case", test_case) + client.copy(test_case, f"{aws_bucket.path()}/{test_case}", recursive=recursive) + + # assert sync has cost zero + dst_objects = list(aws_bucket.list_objects()) + assert dst_objects == [test_case], f"Object {test_case} not copied to {aws_bucket.bucket()}: only container {dst_objects}" # copy back - client.copy(f"s3://{aws_bucket}/{test_case}", f"gs://{gcp_bucket}/aws/{test_case}", recursive=True) + client.copy(f"{aws_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/aws/", recursive=recursive) # test one sided transfers @@ -147,7 +162,7 @@ def test_cp_one_sided(): def test_cp_multiple_vms(aws_bucket): client = SkyplaneClient() pipeline = client.pipeline(max_instances=2) - pipeline.queue_copy(test_bucket_large_file, f"s3://{aws_bucket}/") + pipeline.queue_copy(test_bucket_large_file, f"s3://{aws_bucket.bucket()}/") pipeline.start(debug=True, progress=True) @@ -159,4 +174,4 @@ def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket): assert ( len(list(src_iface.list_objects(prefix=test_bucket_large_file.replace(f"{test_bucket}/", "")))) > 0 ), f"Test case {test_bucket_large_file} does not exist in {test_bucket}" - client.copy(test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"gs://{same_region_bucket}/"]) + client.copy(test_bucket_large_file, 
[f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"gs://{same_region_bucket}/", f"azure://{azure_bucket}/"]) From 00dd79bc21dba605f3e78c2d127d89ccff172e03 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 19 Jun 2023 06:07:00 +0000 Subject: [PATCH 08/13] cleanup --- skyplane/api/transfer_job.py | 24 ++++++--------- skyplane/cli/cli_cloud.py | 3 +- skyplane/obj_store/azure_blob_interface.py | 1 + skyplane/obj_store/gcs_interface.py | 6 ++-- skyplane/obj_store/r2_interface.py | 5 +++- skyplane/obj_store/s3_interface.py | 5 +++- skyplane/obj_store/storage_interface.py | 4 +++ skyplane/planner/planner.py | 4 +-- tests/integration/test_cp.py | 34 +++++++++++++++++----- 9 files changed, 55 insertions(+), 31 deletions(-) diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 2e82042cb..db4308ae4 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -100,12 +100,9 @@ def _run_multipart_chunk_thread( while not exit_event.is_set(): try: transfer_pair = in_queue.get(block=False, timeout=0.1) - print("GOT TRANSFER PAIR") except queue.Empty: continue - print("start thread") - src_object = transfer_pair.src_obj dest_objects = transfer_pair.dst_objs dest_key = transfer_pair.dst_key @@ -116,7 +113,7 @@ def _run_multipart_chunk_thread( for dest_iface in self.dst_ifaces: dest_object = dest_objects[dest_iface.region_tag()] upload_id = dest_iface.initiate_multipart_upload(dest_object.key, mime_type=mime_type) - print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") + # print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") # store mapping between key and upload id for each region upload_id_mapping[dest_iface.region_tag()] = (src_object.key, upload_id) out_queue_chunks.put(GatewayMessage(upload_id_mapping=upload_id_mapping)) # send to output queue @@ -266,13 +263,11 @@ def transfer_pair_generator( for dst_iface in self.dst_ifaces: if not dst_iface.bucket_exists(): raise exceptions.MissingBucketException(f"Destination bucket {dst_iface.path()} does not exist or is not readable.") - + # query all source region objects logger.fs.debug(f"Querying objects in {self.src_iface.path()}") - print("PREFIX", src_prefix) n_objs = 0 for obj in self.src_iface.list_objects(src_prefix): - print('obj', obj) if prefilter_fn is None or prefilter_fn(obj): # collect list of destination objects dest_objs = {} @@ -323,10 +318,8 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> multipart_exit_event = threading.Event() multipart_chunk_threads = [] - - # TODO: remove after azure multipart implemented - azure_dest = any([dst_iface.provider() == "azure" for dst_iface in self.dst_ifaces]) - print([dst_iface.provider() == "azure" for dst_iface in self.dst_ifaces]) + # TODO: remove after azure multipart implemented + azure_dest = any([dst_iface.provider == "azure" for dst_iface in self.dst_ifaces]) # start chunking threads if not azure_dest and self.transfer_config.multipart_enabled: @@ -342,11 +335,13 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> # begin chunking loop for transfer_pair in transfer_pair_generator: src_obj = transfer_pair.src_obj - if not azure_dest and self.transfer_config.multipart_enabled and src_obj.size > self.transfer_config.multipart_threshold_mb * MB: - print("MULTIPART", transfer_pair) + if ( + not azure_dest + and self.transfer_config.multipart_enabled + and src_obj.size 
> self.transfer_config.multipart_threshold_mb * MB + ): multipart_send_queue.put(transfer_pair) else: - print("NO MULTIPART") if transfer_pair.src_obj.size == 0: logger.fs.debug(f"Skipping empty object {src_obj.key}") continue @@ -815,7 +810,6 @@ def _enrich_dest_objs( if dest_obj.key in self._found_dest_objs: dest_obj.size = self._found_dest_objs[dest_obj.key].size dest_obj.last_modified = self._found_dest_objs[dest_obj.key].last_modified - print("par", src_obj, dest_obj) yield src_obj, dest_obj @classmethod diff --git a/skyplane/cli/cli_cloud.py b/skyplane/cli/cli_cloud.py index 57d7d74ef..87e6a1da8 100644 --- a/skyplane/cli/cli_cloud.py +++ b/skyplane/cli/cli_cloud.py @@ -283,7 +283,7 @@ def azure_check( print(iface.container_client.get_container_properties()) # write object temorarily - # check skyplane AzureBlobInterface + # check skyplane AzureBlobInterface print(f"\n{hline}\n[bold]Checking Skyplane AzureBlobInterface[/bold]\n{hline}") from skyplane.obj_store.azure_blob_interface import AzureBlobInterface @@ -313,7 +313,6 @@ def create_temp_file(size): tmp_object = f"skyplane-{uuid.uuid4()}" iface.upload_object(tmp_file, tmp_object) iface.delete_objects([tmp_object]) - @app.command() diff --git a/skyplane/obj_store/azure_blob_interface.py b/skyplane/obj_store/azure_blob_interface.py index 3ba36f200..72f0fef4f 100644 --- a/skyplane/obj_store/azure_blob_interface.py +++ b/skyplane/obj_store/azure_blob_interface.py @@ -26,6 +26,7 @@ def __init__(self, account_name: str, container_name: str, max_concurrency=1): self.container_name = container_name self.max_concurrency = max_concurrency # parallel upload/downloads, seems to cause issues if too high + @property def provider(self): return "azure" diff --git a/skyplane/obj_store/gcs_interface.py b/skyplane/obj_store/gcs_interface.py index 6ce6efb71..062b2a0d1 100644 --- a/skyplane/obj_store/gcs_interface.py +++ b/skyplane/obj_store/gcs_interface.py @@ -27,8 +27,10 @@ def __init__(self, bucket_name: str): self.auth = compute.GCPAuthentication() self._gcs_client = self.auth.get_storage_client() self._requests_session = requests.Session() - self.provider = "gcp" - # self.region_tag = self.a + + @property + def provider(self): + return "gcp" def path(self): return f"gs://{self.bucket_name}" diff --git a/skyplane/obj_store/r2_interface.py b/skyplane/obj_store/r2_interface.py index 063f3b431..af2e1a36b 100644 --- a/skyplane/obj_store/r2_interface.py +++ b/skyplane/obj_store/r2_interface.py @@ -39,7 +39,10 @@ def __init__(self, account_id: str, bucket_name: str): self.requester_pays = False self.bucket_name = bucket_name self.account_id = account_id - self.provider = "cloudflare" + + @property + def provider(self): + return "cloudflare" def path(self): return f"{self.endpoint_url}/{self.bucket_name}" diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index 535d2afb7..e7e59e594 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -24,7 +24,10 @@ def __init__(self, bucket_name: str): self.requester_pays = False self.bucket_name = bucket_name self._cached_s3_clients = {} - self.provider = "aws" + + @property + def provider(self): + return "aws" def path(self): return f"s3://{self.bucket_name}" diff --git a/skyplane/obj_store/storage_interface.py b/skyplane/obj_store/storage_interface.py index c32b50f1d..a430573f8 100644 --- a/skyplane/obj_store/storage_interface.py +++ b/skyplane/obj_store/storage_interface.py @@ -6,6 +6,10 @@ class StorageInterface: def bucket(self) -> str: 
return self.bucket_name + @property + def provider(self) -> str: + raise NotImplementedError() + def region_tag(self) -> str: raise NotImplementedError() diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 411db974f..998167d6d 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -176,10 +176,10 @@ def _get_vm_type_and_instances( class UnicastDirectPlanner(Planner): # DO NOT USE THIS - broken for single-region transfers - def __init__(self, n_instances: int, n_connections: int): + def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): + super().__init__(transfer_config) self.n_instances = n_instances self.n_connections = n_connections - super().__init__() def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # make sure only single destination diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 6fcc45d69..80d1c5c53 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -105,9 +105,9 @@ def test_azure(azure_bucket, gcp_bucket, test_case, recursive): Test copying a big file to different cloud providers :param azure_bucket: destination interface :param gcp_bucket: gcp bucket to copy FROM dstiface - :param test_case: test case from test_bucket to copy from + :param test_case: test case from test_bucket to copy from """ - print("DEST", azure_bucket, gcp_bucket) + print("DEST", azure_bucket.path(), gcp_bucket.path()) client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) @@ -126,15 +126,16 @@ def test_azure(azure_bucket, gcp_bucket, test_case, recursive): # copy back client.copy(f"{azure_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/azure/", recursive=recursive) -@pytest.mark.parametrize("test_case, recursive", [(test_bucket_large_file, False)]) + +@pytest.mark.parametrize("test_case, recursive", [(test_bucket_medium_file, True)]) def test_aws(aws_bucket, gcp_bucket, test_case, recursive): """ Test copying a big file to different cloud providers :param aws_bucket: destination interface :param gcp_bucket: gcp bucket to copy FROM dstiface - :param test_case: test case from test_bucket to copy from + :param test_case: test case from test_bucket to copy from """ - print("DEST", aws_bucket, gcp_bucket) + print("DEST", aws_bucket.path(), gcp_bucket.path()) client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) @@ -147,14 +148,29 @@ def test_aws(aws_bucket, gcp_bucket, test_case, recursive): # assert sync has cost zero dst_objects = list(aws_bucket.list_objects()) - assert dst_objects == [test_case], f"Object {test_case} not copied to {aws_bucket.bucket()}: only container {dst_objects}" + assert len(dst_objects) > 0, f"Object {test_case} not copied to {aws_bucket.bucket()}: only container {dst_objects}" + print(f"gs://{gcp_bucket}/azure/{test_case}") # copy back - client.copy(f"{aws_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/aws/", recursive=recursive) + client.copy(f"{aws_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/azure/", recursive=recursive) + + +def test_pipeline(gcp_bucket): + """Test pipeline's ability to run multiple copy jobs on a single dataplane""" + client = SkyplaneClient() + pipeline = client.pipeline() + + # queue two copy jobs + pipeline.queue_copy(test_bucket_large_file, gcp_bucket.path()) + pipeline.queue_copy(test_bucket_medium_file, gcp_bucket.path()) + + # start pipeline + 
pipeline.start(debug=True, progress=True) # test one sided transfers def test_cp_one_sided(): + # TODO: run on-sided tranfer between all cloud pairs pass @@ -174,4 +190,6 @@ def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket): assert ( len(list(src_iface.list_objects(prefix=test_bucket_large_file.replace(f"{test_bucket}/", "")))) > 0 ), f"Test case {test_bucket_large_file} does not exist in {test_bucket}" - client.copy(test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"gs://{same_region_bucket}/", f"azure://{azure_bucket}/"]) + client.copy( + test_bucket_large_file, [f"s3://{aws_bucket}/", f"gs://{gcp_bucket}/", f"gs://{same_region_bucket}/", f"azure://{azure_bucket}/"] + ) From 17d863dd56b98c172eee890bd50a45ff4f8b0175 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 19 Jun 2023 06:13:19 +0000 Subject: [PATCH 09/13] add all cloud tests --- tests/integration/test_cp.py | 65 +++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 80d1c5c53..a67585f6d 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -127,7 +127,9 @@ def test_azure(azure_bucket, gcp_bucket, test_case, recursive): client.copy(f"{azure_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/azure/", recursive=recursive) -@pytest.mark.parametrize("test_case, recursive", [(test_bucket_medium_file, True)]) +@pytest.mark.parametrize( + "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +) def test_aws(aws_bucket, gcp_bucket, test_case, recursive): """ Test copying a big file to different cloud providers @@ -135,7 +137,6 @@ def test_aws(aws_bucket, gcp_bucket, test_case, recursive): :param gcp_bucket: gcp bucket to copy FROM dstiface :param test_case: test case from test_bucket to copy from """ - print("DEST", aws_bucket.path(), gcp_bucket.path()) client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) @@ -143,16 +144,70 @@ def test_aws(aws_bucket, gcp_bucket, test_case, recursive): assert ( len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 ), f"Test case {test_case} does not exist in {test_bucket}" - print("test case", test_case) client.copy(test_case, f"{aws_bucket.path()}/{test_case}", recursive=recursive) # assert sync has cost zero dst_objects = list(aws_bucket.list_objects()) assert len(dst_objects) > 0, f"Object {test_case} not copied to {aws_bucket.bucket()}: only container {dst_objects}" - print(f"gs://{gcp_bucket}/azure/{test_case}") # copy back - client.copy(f"{aws_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/azure/", recursive=recursive) + client.copy(f"{aws_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/aws/", recursive=recursive) + + +@pytest.mark.parametrize( + "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +) +def test_cloudflare(cloudflare_bucket, gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + :param cloudflare_bucket: destination interface + :param gcp_bucket: gcp bucket to copy FROM dstiface + :param test_case: test case from test_bucket to copy from + """ + client = SkyplaneClient() + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + + assert isinstance(cloudflare_bucket.bucket(), str), 
f"Bucket name is not a string {cloudflare_bucket.bucket()}" + assert ( + len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_case} does not exist in {test_bucket}" + client.copy(test_case, f"{cloudflare_bucket.path()}/{test_case}", recursive=recursive) + + # assert sync has cost zero + dst_objects = list(cloudflare_bucket.list_objects()) + assert len(dst_objects) > 0, f"Object {test_case} not copied to {cloudflare_bucket.bucket()}: only container {dst_objects}" + + # copy back + client.copy(f"{cloudflare_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/cloudflare/", recursive=recursive) + + +@pytest.mark.parametrize( + "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +) +def test_gcp(gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + :param gcp_bucket: destination interface + :param gcp_bucket: gcp bucket to copy FROM dstiface + :param test_case: test case from test_bucket to copy from + """ + client = SkyplaneClient() + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + + assert isinstance(gcp_bucket.bucket(), str), f"Bucket name is not a string {gcp_bucket.bucket()}" + assert ( + len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_case} does not exist in {test_bucket}" + client.copy(test_case, f"{gcp_bucket.path()}/{test_case}", recursive=recursive) + + # assert sync has cost zero + dst_objects = list(gcp_bucket.list_objects()) + assert len(dst_objects) > 0, f"Object {test_case} not copied to {gcp_bucket.bucket()}: only container {dst_objects}" + + +def test_same_region(same_region_bucket): + client = SkyplaneClient() + client.copy(test_bucket_large_file, f"{same_region_bucket.path()}") def test_pipeline(gcp_bucket): From 7096e6386f1532acefecd2d449bd3d5c946197c7 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 19 Jun 2023 19:33:09 +0000 Subject: [PATCH 10/13] debugging pipeline --- skyplane/api/dataplane.py | 5 ++++- skyplane/api/pipeline.py | 1 + skyplane/api/tracker.py | 3 +++ skyplane/api/transfer_job.py | 17 ++++++++++------- skyplane/obj_store/s3_interface.py | 8 ++++++-- skyplane/planner/planner.py | 2 -- tests/integration/test_cp.py | 28 +++++++++++++++++++++++++--- 7 files changed, 49 insertions(+), 15 deletions(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index b09f0268a..6a5736fd2 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -64,6 +64,8 @@ def __init__( self.topology = topology self.provisioner = provisioner self.transfer_config = transfer_config + # disable for azure + # TODO: remove this self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3)) self.provisioning_lock = threading.Lock() self.provisioned = False @@ -234,7 +236,7 @@ def copy_log(instance): instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") instance.download_file("/tmp/gateway.stdout", out_file) instance.download_file("/tmp/gateway.stderr", err_file) - + print("COPY GATEWAY LOGS") do_parallel(copy_log, self.bound_nodes.values(), n=-1) def deprovision(self, max_jobs: int = 64, spinner: bool = False): @@ -307,6 +309,7 @@ def run_async(self, jobs: List[TransferJob], hooks: Optional[TransferHook] = Non """ if not self.provisioned: logger.error("Dataplane must be pre-provisioned. 
Call dataplane.provision() before starting a transfer") + print("discord", jobs) tracker = TransferProgressTracker(self, jobs, self.transfer_config, hooks) self.pending_transfers.append(tracker) tracker.start() diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 929a18832..f37f399dd 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -101,6 +101,7 @@ def start(self, debug=False, progress=False): ## create dataplane from plan # dp = Dataplane(self.clientid, topo, self.provisioner, self.transfer_config, self.transfer_dir, debug=debug) dp = self.create_dataplane(debug) + print("pipeline", self.jobs_to_dispatch) try: dp.provision(spinner=True) if progress: diff --git a/skyplane/api/tracker.py b/skyplane/api/tracker.py index 9f9d2e4a7..c7d2218ba 100644 --- a/skyplane/api/tracker.py +++ b/skyplane/api/tracker.py @@ -141,6 +141,8 @@ def run(self): chunk_streams = { job_uuid: job.dispatch(self.dataplane, transfer_config=self.transfer_config) for job_uuid, job in self.jobs.items() } + print("JOBS", self.jobs) + print(chunk_streams, "jobs", self.jobs) for job_uuid, job in self.jobs.items(): logger.fs.debug(f"[TransferProgressTracker] Dispatching job {job.uuid}") self.job_chunk_requests[job_uuid] = {} @@ -276,6 +278,7 @@ def monitor_transfer(pd, self, region_tag): # check for errors and exit if there are any (while setting debug flags) errors = self.dataplane.check_error_logs() if any(errors.values()): + print("errors", errors) logger.warning("Copying gateway logs...") self.dataplane.copy_gateway_logs() self.errors = errors diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index db4308ae4..134fc335c 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -113,7 +113,7 @@ def _run_multipart_chunk_thread( for dest_iface in self.dst_ifaces: dest_object = dest_objects[dest_iface.region_tag()] upload_id = dest_iface.initiate_multipart_upload(dest_object.key, mime_type=mime_type) - # print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") + print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") # store mapping between key and upload id for each region upload_id_mapping[dest_iface.region_tag()] = (src_object.key, upload_id) out_queue_chunks.put(GatewayMessage(upload_id_mapping=upload_id_mapping)) # send to output queue @@ -460,13 +460,16 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4())), + job_id: Optional[str] = None ): self.src_path = src_path self.dst_paths = dst_paths self.recursive = recursive self.requester_pays = requester_pays - self.uuid = uuid + if job_id is None: + self.uuid = str(uuid.uuid4()) + else: + self.uuid = job_id @property def transfer_type(self) -> str: @@ -559,9 +562,9 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4())), + job_id: Optional[str] = None ): - super().__init__(src_path, dst_paths, recursive, requester_pays, uuid) + super().__init__(src_path, dst_paths, recursive, requester_pays, job_id) self.transfer_list = [] self.multipart_transfer_list = [] @@ -750,9 +753,9 @@ def __init__( src_path: str, dst_paths: List[str] or str, requester_pays: bool = False, - uuid: str = field(init=False, default_factory=lambda: 
str(uuid.uuid4())), + job_id: Optional[str] = None ): - super().__init__(src_path, dst_paths, True, requester_pays, uuid) + super().__init__(src_path, dst_paths, True, requester_pays, job_id) self.transfer_list = [] self.multipart_transfer_list = [] diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index e7e59e594..5e909095c 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -45,8 +45,12 @@ def aws_region(self): if "An error occurred (AccessDenied) when calling the GetBucketLocation operation" in str(e): logger.warning(f"Bucket location {self.bucket_name} is not public. Assuming region is {default_region}") return default_region - logger.warning(f"Specified bucket {self.bucket_name} does not exist, got AWS error: {e}") - raise exceptions.MissingBucketException(f"S3 bucket {self.bucket_name} does not exist") from e + elif "An error occurred (InvalidAccessKeyId) when calling" in str(e): + logger.warning(f"Invalid AWS credentials. Check to make sure credentials configured properly.") + raise exceptions.PermissionsException(f"Invalid AWS credentials for accessing bucket {self.bucket_name}") + else: + logger.warning(f"Specified bucket {self.bucket_name} does not exist, got AWS error: {e}") + raise exceptions.MissingBucketException(f"S3 bucket {self.bucket_name} does not exist") from e def region_tag(self): return "aws:" + self.aws_region diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 998167d6d..c1c72ef0c 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -117,7 +117,6 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: quota_limit = self._get_quota_limits_for( cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") ) - print("QUOTA LIMIT", region_tag, quota_limit) config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") @@ -267,7 +266,6 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" - print("REGION TAGS", src_region_tag, dst_region_tags) plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) # Dynammically calculate n_instances based on quota limits diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index a67585f6d..7d6b4e03b 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -99,6 +99,9 @@ def cloudflare_bucket(): # TODO: add more parameters for bucket types +#@pytest.mark.parametrize( # tests large objects +# "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +#) @pytest.mark.parametrize("test_case, recursive", [(test_bucket_medium_file, True)]) def test_azure(azure_bucket, gcp_bucket, test_case, recursive): """ @@ -110,6 +113,7 @@ def test_azure(azure_bucket, gcp_bucket, test_case, recursive): print("DEST", azure_bucket.path(), gcp_bucket.path()) client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + print(azure_bucket.path()) assert isinstance(azure_bucket.bucket(), str), f"Bucket name is not a string {azure_bucket.bucket()}" assert ( @@ -139,6 +143,8 @@ def test_aws(aws_bucket, gcp_bucket, test_case, recursive): """ 
client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + print("test case:", test_case) + print(aws_bucket.path()) assert isinstance(aws_bucket.bucket(), str), f"Bucket name is not a string {aws_bucket.bucket()}" assert ( @@ -205,22 +211,36 @@ def test_gcp(gcp_bucket, test_case, recursive): assert len(dst_objects) > 0, f"Object {test_case} not copied to {gcp_bucket.bucket()}: only container {dst_objects}" +@pytest.mark.timeout(60*20) def test_same_region(same_region_bucket): client = SkyplaneClient() client.copy(test_bucket_large_file, f"{same_region_bucket.path()}") +@pytest.mark.timeout(60*20) def test_pipeline(gcp_bucket): """Test pipeline's ability to run multiple copy jobs on a single dataplane""" client = SkyplaneClient() pipeline = client.pipeline() # queue two copy jobs - pipeline.queue_copy(test_bucket_large_file, gcp_bucket.path()) - pipeline.queue_copy(test_bucket_medium_file, gcp_bucket.path()) + pipeline.queue_copy(test_bucket_large_file, f"{gcp_bucket.path()}/large/") + pipeline.queue_copy(test_bucket_medium_file, f"{gcp_bucket.path()}/medium/", recursive=True) # start pipeline - pipeline.start(debug=True, progress=True) + try: + pipeline.start(debug=True, progress=True) + except Exception as e: + print(e) + raise e + + assert len(list(gcp_bucket.list_objects(prefix="large/"))) > 0, f"No data from {test_bucket_large_file} transferred" + assert len(list(gcp_bucket.list_objects(prefix="medium/"))) > 0, f"No data from {test_bucket_medium_file} transferred" + + print("arge", list(gcp_bucket.list_objects(prefix="large/"))) + print("medium", list(gcp_bucket.list_objects(prefix="medium/"))) + + # test one sided transfers @@ -230,6 +250,7 @@ def test_cp_one_sided(): # test multiple VMs +@pytest.mark.timeout(60*20) def test_cp_multiple_vms(aws_bucket): client = SkyplaneClient() pipeline = client.pipeline(max_instances=2) @@ -239,6 +260,7 @@ def test_cp_multiple_vms(aws_bucket): # test multicast # TODO: add azure +@pytest.mark.timeout(60*20) def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) From 801371ce84630b6bd14bf01e6684a996098c0471 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 20 Jun 2023 22:18:21 +0000 Subject: [PATCH 11/13] fixed pipeline bug --- skyplane/api/tracker.py | 213 +++++++++++++++++++---------------- skyplane/api/transfer_job.py | 16 ++- skyplane/planner/planner.py | 8 +- tests/integration/test_cp.py | 2 +- 4 files changed, 134 insertions(+), 105 deletions(-) diff --git a/skyplane/api/tracker.py b/skyplane/api/tracker.py index c7d2218ba..435b35976 100644 --- a/skyplane/api/tracker.py +++ b/skyplane/api/tracker.py @@ -135,23 +135,27 @@ def run(self): # "src_spot_instance": getattr(self.transfer_config, f"{src_cloud_provider}_use_spot_instances"), # "dst_spot_instance": getattr(self.transfer_config, f"{dst_cloud_provider}_use_spot_instances"), } + # TODO: eventually jobs should be able to be concurrently dispatched and executed + # however this will require being able to handle conflicting multipart uploads ids + + # initialize everything first + for job_uuid, job in self.jobs.items(): + self.job_chunk_requests[job_uuid] = {} + self.job_pending_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} + self.job_complete_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} + session_start_timestamp_ms = 
int(time.time() * 1000) - try: + for job_uuid, job in self.jobs.items(): # pre-dispatch chunks to begin pre-buffering chunks - chunk_streams = { - job_uuid: job.dispatch(self.dataplane, transfer_config=self.transfer_config) for job_uuid, job in self.jobs.items() - } - print("JOBS", self.jobs) - print(chunk_streams, "jobs", self.jobs) - for job_uuid, job in self.jobs.items(): + try: + chunk_stream = job.dispatch(self.dataplane, transfer_config=self.transfer_config) logger.fs.debug(f"[TransferProgressTracker] Dispatching job {job.uuid}") - self.job_chunk_requests[job_uuid] = {} - self.job_pending_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} - self.job_complete_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} - - for chunk in chunk_streams[job_uuid]: + for chunk in chunk_stream: chunks_dispatched = [chunk] + # TODO: check chunk ID self.job_chunk_requests[job_uuid][chunk.chunk_id] = chunk + print("DISPATCHED", chunk.chunk_id, job_uuid) + assert job_uuid in self.job_chunk_requests and chunk.chunk_id in self.job_chunk_requests[job_uuid] self.hooks.on_chunk_dispatched(chunks_dispatched) for region in self.dataplane.topology.dest_region_tags: self.job_pending_chunk_ids[job_uuid][region].add(chunk.chunk_id) @@ -159,108 +163,111 @@ def run(self): logger.fs.debug( f"[TransferProgressTracker] Job {job.uuid} dispatched with {len(self.job_chunk_requests[job_uuid])} chunk requests" ) + except Exception as e: + UsageClient.log_exception( + "dispatch job", + e, + args, + self.dataplane.topology.src_region_tag, + self.dataplane.topology.dest_region_tags[0], # TODO: support multiple destinations + session_start_timestamp_ms, + ) + raise e - except Exception as e: - UsageClient.log_exception( - "dispatch job", - e, - args, - self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags[0], # TODO: support multiple destinations - session_start_timestamp_ms, - ) - raise e + print("AFTER DISPATCH", len(list(self.job_chunk_requests.keys())), job_uuid, self.job_chunk_requests) + + self.hooks.on_dispatch_end() + + print("MAPPING", self._chunk_to_job_map) + def monitor_single_dst_helper(dst_region): + start_time = time.time() + try: + self.monitor_transfer(dst_region) + except exceptions.SkyplaneGatewayException as err: + reformat_err = Exception(err.pretty_print_str()[37:]) + UsageClient.log_exception( + "monitor transfer", + reformat_err, + args, + self.dataplane.topology.src_region_tag, + dst_region, + session_start_timestamp_ms, + ) + raise err + except Exception as e: + UsageClient.log_exception( + "monitor transfer", e, args, self.dataplane.topology.src_region_tag, dst_region, session_start_timestamp_ms + ) + raise e + end_time = time.time() + + runtime_s = end_time - start_time + # transfer successfully completed + transfer_stats = { + "dst_region": dst_region, + "total_runtime_s": round(runtime_s, 4), + } + + results = [] + dest_regions = self.dataplane.topology.dest_region_tags + with ThreadPoolExecutor(max_workers=len(dest_regions)) as executor: + e2e_start_time = time.time() + try: + future_list = [executor.submit(monitor_single_dst_helper, dest) for dest in dest_regions] + for future in as_completed(future_list): + results.append(future.result()) + except Exception as e: + raise e + e2e_end_time = time.time() + transfer_stats = { + "total_runtime_s": e2e_end_time - e2e_start_time, + "throughput_gbits": self.query_bytes_dispatched() / (e2e_end_time - e2e_start_time) / GB * 8, + } + 
self.hooks.on_transfer_end() - self.hooks.on_dispatch_end() - def monitor_single_dst_helper(dst_region): - start_time = time.time() + start_time = int(time.time()) try: - self.monitor_transfer(dst_region) - except exceptions.SkyplaneGatewayException as err: - reformat_err = Exception(err.pretty_print_str()[37:]) + for job in self.jobs.values(): + logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}") + job.finalize() + except Exception as e: UsageClient.log_exception( - "monitor transfer", - reformat_err, + "finalize job", + e, args, self.dataplane.topology.src_region_tag, - dst_region, + self.dataplane.topology.dest_region_tags[0], session_start_timestamp_ms, ) - raise err + raise e + end_time = int(time.time()) + + # verify transfer + try: + for job in self.jobs.values(): + logger.fs.debug(f"[TransferProgressTracker] Verifying job {job.uuid}") + job.verify() except Exception as e: UsageClient.log_exception( - "monitor transfer", e, args, self.dataplane.topology.src_region_tag, dst_region, session_start_timestamp_ms + "verify job", + e, + args, + self.dataplane.topology.src_region_tag, + self.dataplane.topology.dest_region_tags[0], + session_start_timestamp_ms, ) raise e - end_time = time.time() - runtime_s = end_time - start_time # transfer successfully completed - transfer_stats = { - "dst_region": dst_region, - "total_runtime_s": round(runtime_s, 4), - } - - results = [] - dest_regions = self.dataplane.topology.dest_region_tags - with ThreadPoolExecutor(max_workers=len(dest_regions)) as executor: - e2e_start_time = time.time() - try: - future_list = [executor.submit(monitor_single_dst_helper, dest) for dest in dest_regions] - for future in as_completed(future_list): - results.append(future.result()) - except Exception as e: - raise e - e2e_end_time = time.time() - transfer_stats = { - "total_runtime_s": e2e_end_time - e2e_start_time, - "throughput_gbits": self.query_bytes_dispatched() / (e2e_end_time - e2e_start_time) / GB * 8, - } - self.hooks.on_transfer_end() - - start_time = int(time.time()) - try: - for job in self.jobs.values(): - logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}") - job.finalize() - except Exception as e: - UsageClient.log_exception( - "finalize job", - e, + UsageClient.log_transfer( + transfer_stats, args, self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags[0], + self.dataplane.topology.dest_region_tags, session_start_timestamp_ms, ) - raise e - end_time = int(time.time()) - - # verify transfer - try: - for job in self.jobs.values(): - logger.fs.debug(f"[TransferProgressTracker] Verifying job {job.uuid}") - job.verify() - except Exception as e: - UsageClient.log_exception( - "verify job", - e, - args, - self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags[0], - session_start_timestamp_ms, - ) - raise e - - # transfer successfully completed - UsageClient.log_transfer( - transfer_stats, - args, - self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags, - session_start_timestamp_ms, - ) - print_stats_completed(total_runtime_s=transfer_stats["total_runtime_s"], throughput_gbits=transfer_stats["throughput_gbits"]) + print_stats_completed(total_runtime_s=transfer_stats["total_runtime_s"], throughput_gbits=transfer_stats["throughput_gbits"]) @imports.inject("pandas") def monitor_transfer(pd, self, region_tag): @@ -278,7 +285,7 @@ def monitor_transfer(pd, self, region_tag): # check for errors and exit if there are any (while setting debug flags) 
errors = self.dataplane.check_error_logs() if any(errors.values()): - print("errors", errors) + print("error", errors) logger.warning("Copying gateway logs...") self.dataplane.copy_gateway_logs() self.errors = errors @@ -302,13 +309,20 @@ def monitor_transfer(pd, self, region_tag): # update job_complete_chunk_ids and job_pending_chunk_ids # TODO: do chunk-tracking per-destination for job_uuid, job in self.jobs.items(): - job_complete_chunk_ids = set(chunk_id for chunk_id in completed_chunk_ids if self._chunk_to_job_map[chunk_id] == job_uuid) + try: + job_complete_chunk_ids = set(chunk_id for chunk_id in completed_chunk_ids if self._chunk_to_job_map[chunk_id] == job_uuid) + except Exception as e: + print("chunk_to_job", self._chunk_to_job_map) + print("chunk req", self.job_chunk_requests) + raise e new_chunk_ids = ( self.job_complete_chunk_ids[job_uuid][region_tag] .union(job_complete_chunk_ids) .difference(self.job_complete_chunk_ids[job_uuid][region_tag]) ) completed_chunks = [] + for id in new_chunk_ids: + assert job_uuid in self.job_chunk_requests and id in self.job_chunk_requests[job_uuid], f"Missing chunk id {id} for job {job_uuid}: {self.job_chunk_requests}" for id in new_chunk_ids: completed_chunks.append(self.job_chunk_requests[job_uuid][id]) self.hooks.on_chunk_completed(completed_chunks, region_tag) @@ -322,8 +336,11 @@ def monitor_transfer(pd, self, region_tag): time.sleep(0.05) @property - @functools.lru_cache(maxsize=1) + # TODO: this is a very slow function, but we can't cache it since self.job_chunk_requests changes over time + # do not call it more often than necessary def _chunk_to_job_map(self): + for job in self.job_chunk_requests.keys(): + print(f"all chunks {job}", list(self.job_chunk_requests[job].keys())) return {chunk_id: job_uuid for job_uuid, cr_dict in self.job_chunk_requests.items() for chunk_id in cr_dict.keys()} def _query_chunk_status(self): diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 134fc335c..2fdc103f3 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -268,6 +268,7 @@ def transfer_pair_generator( logger.fs.debug(f"Querying objects in {self.src_iface.path()}") n_objs = 0 for obj in self.src_iface.list_objects(src_prefix): + print(obj) if prefilter_fn is None or prefilter_fn(obj): # collect list of destination objects dest_objs = {} @@ -334,6 +335,7 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> # begin chunking loop for transfer_pair in transfer_pair_generator: + #print("transfer_pair", transfer_pair.src_obj.key, transfer_pair.dst_objs) src_obj = transfer_pair.src_obj if ( not azure_dest @@ -460,7 +462,7 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - job_id: Optional[str] = None + job_id: Optional[str] = None, ): self.src_path = src_path self.dst_paths = dst_paths @@ -648,6 +650,9 @@ def dispatch( # send chunk requests to source gateways chunk_batch = [cr.chunk for cr in batch if cr.chunk is not None] + # TODO: allow multiple partition ids per chunk + for chunk in chunk_batch: # assign job UUID as partition ID + chunk.partition_id = self.uuid min_idx = queue_size.index(min(queue_size)) n_added = 0 while n_added < len(chunk_batch): @@ -656,6 +661,7 @@ def dispatch( assert Chunk.from_dict(chunk_batch[0].as_dict()) == chunk_batch[0], f"Invalid chunk request: {chunk_batch[0].as_dict}" # TODO: make async + print("dispatch chunks", [chunk.as_dict() for chunk in chunk_batch[n_added:]]) reply = 
self.http_pool.request( "POST", f"{server.gateway_api_url}/api/v1/chunk_requests", @@ -694,11 +700,14 @@ def finalize(self): def complete_fn(batch): for req in batch: - logger.fs.debug(f"Finalize upload id {req['upload_id']} for key {req['key']}") + logger.fs.debug(f"Finalize upload id {req['upload_id']} for key {req['key']} bucket {bucket}") retry_backoff(partial(obj_store_interface.complete_multipart_upload, req["key"], req["upload_id"]), initial_backoff=0.5) do_parallel(complete_fn, batches, n=8) + # TODO: Do NOT do this if we are pipelining multiple transfers - remove just what was completed + self.multipart_transfer_list = [] + def verify(self): """Verify the integrity of the transfered destination objects""" @@ -805,7 +814,9 @@ def _enrich_dest_objs( dest_prefix = dest_prefixes[i] logger.fs.debug(f"Querying objects in {dst_iface.bucket()}") if not hasattr(self, "_found_dest_objs"): + print("isting") self._found_dest_objs = {obj.key: obj for obj in dst_iface.list_objects(dest_prefix)} + print("done") for pair in transfer_pairs: src_obj = pair.src_obj dest_obj = list(pair.dst_objs.values())[0] @@ -813,6 +824,7 @@ def _enrich_dest_objs( if dest_obj.key in self._found_dest_objs: dest_obj.size = self._found_dest_objs[dest_obj.key].size dest_obj.last_modified = self._found_dest_objs[dest_obj.key].last_modified + print(src_obj, dest_obj) yield src_obj, dest_obj @classmethod diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index c1c72ef0c..f2131edad 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -213,7 +213,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: dst_bucket = job.dst_ifaces[0].bucket() # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) + partition_id = job.uuid # source region gateway program obj_store_read = src_program.add_operator( @@ -288,7 +288,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_provider = src_region_tag.split(":")[0] # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) + partition_id = job.uuid # source region gateway program obj_store_read = src_program.add_operator( @@ -380,7 +380,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_provider = src_region_tag.split(":")[0] # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) + partition_id = job.uuid # source region gateway program obj_store_read = src_program.add_operator( @@ -441,7 +441,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = job.src_iface.region_tag() src_provider = src_region_tag.split(":")[0] - partition_id = jobs.index(job) + partition_id = job.uuid # send to all destination dst_prefixes = job.dst_prefixes diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 7d6b4e03b..02567f798 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -219,7 +219,7 @@ def test_same_region(same_region_bucket): @pytest.mark.timeout(60*20) def test_pipeline(gcp_bucket): - """Test pipeline's ability to run multiple copy jobs on a single dataplane""" +"""Test pipeline's ability to run multiple copy jobs on a single dataplane""" client = SkyplaneClient() pipeline = client.pipeline() From 54ffa1f39be2ac109058cd7186880d859e8b3937 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Wed, 21 Jun 2023 03:40:47 +0000 Subject: [PATCH 12/13] cleanup --- 
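Note: this cleanup keeps the change from the previous commit that drops @functools.lru_cache from the _chunk_to_job_map property in tracker.py. Below is a minimal, self-contained sketch (illustrative only -- StaleTracker/FreshTracker are not Skyplane's actual classes) of why caching that property is unsafe while self.job_chunk_requests keeps growing as chunks are dispatched:

import functools


class StaleTracker:
    """Pre-fix behaviour: the property is computed once per instance and never refreshed."""

    def __init__(self):
        self.job_chunk_requests = {}  # job_uuid -> {chunk_id: chunk request}

    @property
    @functools.lru_cache(maxsize=1)
    def chunk_to_job_map(self):
        # lru_cache keys only on `self`, so later mutations of job_chunk_requests are invisible here
        return {cid: job for job, crs in self.job_chunk_requests.items() for cid in crs}


class FreshTracker:
    """Post-fix behaviour: recompute on every access (slower, but always up to date)."""

    def __init__(self):
        self.job_chunk_requests = {}

    @property
    def chunk_to_job_map(self):
        return {cid: job for job, crs in self.job_chunk_requests.items() for cid in crs}


stale, fresh = StaleTracker(), FreshTracker()
print(stale.chunk_to_job_map, fresh.chunk_to_job_map)  # {} {} -- both empty before dispatch
stale.job_chunk_requests["job-1"] = {"chunk-0": "cr"}
fresh.job_chunk_requests["job-1"] = {"chunk-0": "cr"}
print(stale.chunk_to_job_map)  # {} -- stale snapshot; looking up "chunk-0" would raise KeyError
print(fresh.chunk_to_job_map)  # {'chunk-0': 'job-1'} -- sees the newly dispatched chunk

Recomputing on every access is slower (hence the comment in tracker.py to avoid calling it more often than necessary), but it always reflects chunks dispatched after the first lookup.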
skyplane/api/dataplane.py | 5 +++-- skyplane/api/tracker.py | 32 ++++++++++++----------------- skyplane/api/transfer_job.py | 27 ++++++++---------------- skyplane/gateway/gateway_program.py | 4 ++-- skyplane/obj_store/s3_interface.py | 2 +- tests/integration/test_cp.py | 26 +++++++++++------------ 6 files changed, 39 insertions(+), 57 deletions(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index 6a5736fd2..4b4c3cd38 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -64,8 +64,8 @@ def __init__( self.topology = topology self.provisioner = provisioner self.transfer_config = transfer_config - # disable for azure - # TODO: remove this + # disable for azure + # TODO: remove this self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3)) self.provisioning_lock = threading.Lock() self.provisioned = False @@ -236,6 +236,7 @@ def copy_log(instance): instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") instance.download_file("/tmp/gateway.stdout", out_file) instance.download_file("/tmp/gateway.stderr", err_file) + print("COPY GATEWAY LOGS") do_parallel(copy_log, self.bound_nodes.values(), n=-1) diff --git a/skyplane/api/tracker.py b/skyplane/api/tracker.py index 435b35976..1354b90b1 100644 --- a/skyplane/api/tracker.py +++ b/skyplane/api/tracker.py @@ -135,10 +135,10 @@ def run(self): # "src_spot_instance": getattr(self.transfer_config, f"{src_cloud_provider}_use_spot_instances"), # "dst_spot_instance": getattr(self.transfer_config, f"{dst_cloud_provider}_use_spot_instances"), } - # TODO: eventually jobs should be able to be concurrently dispatched and executed - # however this will require being able to handle conflicting multipart uploads ids + # TODO: eventually jobs should be able to be concurrently dispatched and executed + # however this will require being able to handle conflicting multipart uploads ids - # initialize everything first + # initialize everything first for job_uuid, job in self.jobs.items(): self.job_chunk_requests[job_uuid] = {} self.job_pending_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} @@ -152,9 +152,8 @@ def run(self): logger.fs.debug(f"[TransferProgressTracker] Dispatching job {job.uuid}") for chunk in chunk_stream: chunks_dispatched = [chunk] - # TODO: check chunk ID + # TODO: check chunk ID self.job_chunk_requests[job_uuid][chunk.chunk_id] = chunk - print("DISPATCHED", chunk.chunk_id, job_uuid) assert job_uuid in self.job_chunk_requests and chunk.chunk_id in self.job_chunk_requests[job_uuid] self.hooks.on_chunk_dispatched(chunks_dispatched) for region in self.dataplane.topology.dest_region_tags: @@ -174,11 +173,8 @@ def run(self): ) raise e - print("AFTER DISPATCH", len(list(self.job_chunk_requests.keys())), job_uuid, self.job_chunk_requests) - self.hooks.on_dispatch_end() - print("MAPPING", self._chunk_to_job_map) def monitor_single_dst_helper(dst_region): start_time = time.time() try: @@ -225,7 +221,6 @@ def monitor_single_dst_helper(dst_region): } self.hooks.on_transfer_end() - start_time = int(time.time()) try: for job in self.jobs.values(): @@ -285,7 +280,6 @@ def monitor_transfer(pd, self, region_tag): # check for errors and exit if there are any (while setting debug flags) errors = self.dataplane.check_error_logs() if any(errors.values()): - print("error", errors) logger.warning("Copying gateway logs...") self.dataplane.copy_gateway_logs() self.errors = errors @@ -310,10 +304,10 @@ def monitor_transfer(pd, 
self, region_tag): # TODO: do chunk-tracking per-destination for job_uuid, job in self.jobs.items(): try: - job_complete_chunk_ids = set(chunk_id for chunk_id in completed_chunk_ids if self._chunk_to_job_map[chunk_id] == job_uuid) - except Exception as e: - print("chunk_to_job", self._chunk_to_job_map) - print("chunk req", self.job_chunk_requests) + job_complete_chunk_ids = set( + chunk_id for chunk_id in completed_chunk_ids if self._chunk_to_job_map[chunk_id] == job_uuid + ) + except Exception as e: raise e new_chunk_ids = ( self.job_complete_chunk_ids[job_uuid][region_tag] @@ -322,7 +316,9 @@ def monitor_transfer(pd, self, region_tag): ) completed_chunks = [] for id in new_chunk_ids: - assert job_uuid in self.job_chunk_requests and id in self.job_chunk_requests[job_uuid], f"Missing chunk id {id} for job {job_uuid}: {self.job_chunk_requests}" + assert ( + job_uuid in self.job_chunk_requests and id in self.job_chunk_requests[job_uuid] + ), f"Missing chunk id {id} for job {job_uuid}: {self.job_chunk_requests}" for id in new_chunk_ids: completed_chunks.append(self.job_chunk_requests[job_uuid][id]) self.hooks.on_chunk_completed(completed_chunks, region_tag) @@ -336,11 +332,9 @@ def monitor_transfer(pd, self, region_tag): time.sleep(0.05) @property - # TODO: this is a very slow function, but we can't cache it since self.job_chunk_requests changes over time - # do not call it more often than necessary + # TODO: this is a very slow function, but we can't cache it since self.job_chunk_requests changes over time + # do not call it more often than necessary def _chunk_to_job_map(self): - for job in self.job_chunk_requests.keys(): - print(f"all chunks {job}", list(self.job_chunk_requests[job].keys())) return {chunk_id: job_uuid for job_uuid, cr_dict in self.job_chunk_requests.items() for chunk_id in cr_dict.keys()} def _query_chunk_status(self): diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 2fdc103f3..aa78f8551 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -113,7 +113,7 @@ def _run_multipart_chunk_thread( for dest_iface in self.dst_ifaces: dest_object = dest_objects[dest_iface.region_tag()] upload_id = dest_iface.initiate_multipart_upload(dest_object.key, mime_type=mime_type) - print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") + # print(f"Created upload id for key {dest_object.key} with upload id {upload_id} for bucket {dest_iface.bucket_name}") # store mapping between key and upload id for each region upload_id_mapping[dest_iface.region_tag()] = (src_object.key, upload_id) out_queue_chunks.put(GatewayMessage(upload_id_mapping=upload_id_mapping)) # send to output queue @@ -268,7 +268,6 @@ def transfer_pair_generator( logger.fs.debug(f"Querying objects in {self.src_iface.path()}") n_objs = 0 for obj in self.src_iface.list_objects(src_prefix): - print(obj) if prefilter_fn is None or prefilter_fn(obj): # collect list of destination objects dest_objs = {} @@ -335,7 +334,7 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> # begin chunking loop for transfer_pair in transfer_pair_generator: - #print("transfer_pair", transfer_pair.src_obj.key, transfer_pair.dst_objs) + # print("transfer_pair", transfer_pair.src_obj.key, transfer_pair.dst_objs) src_obj = transfer_pair.src_obj if ( not azure_dest @@ -462,7 +461,7 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - job_id: Optional[str] = None, 
+ job_id: Optional[str] = None, ): self.src_path = src_path self.dst_paths = dst_paths @@ -564,7 +563,7 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - job_id: Optional[str] = None + job_id: Optional[str] = None, ): super().__init__(src_path, dst_paths, recursive, requester_pays, job_id) self.transfer_list = [] @@ -650,8 +649,8 @@ def dispatch( # send chunk requests to source gateways chunk_batch = [cr.chunk for cr in batch if cr.chunk is not None] - # TODO: allow multiple partition ids per chunk - for chunk in chunk_batch: # assign job UUID as partition ID + # TODO: allow multiple partition ids per chunk + for chunk in chunk_batch: # assign job UUID as partition ID chunk.partition_id = self.uuid min_idx = queue_size.index(min(queue_size)) n_added = 0 @@ -661,7 +660,6 @@ def dispatch( assert Chunk.from_dict(chunk_batch[0].as_dict()) == chunk_batch[0], f"Invalid chunk request: {chunk_batch[0].as_dict}" # TODO: make async - print("dispatch chunks", [chunk.as_dict() for chunk in chunk_batch[n_added:]]) reply = self.http_pool.request( "POST", f"{server.gateway_api_url}/api/v1/chunk_requests", @@ -706,7 +704,7 @@ def complete_fn(batch): do_parallel(complete_fn, batches, n=8) # TODO: Do NOT do this if we are pipelining multiple transfers - remove just what was completed - self.multipart_transfer_list = [] + self.multipart_transfer_list = [] def verify(self): """Verify the integrity of the transfered destination objects""" @@ -757,13 +755,7 @@ def size_gb(self): class SyncJob(CopyJob): """sync job that copies the source objects that does not exist in the destination bucket to the destination""" - def __init__( - self, - src_path: str, - dst_paths: List[str] or str, - requester_pays: bool = False, - job_id: Optional[str] = None - ): + def __init__(self, src_path: str, dst_paths: List[str] or str, requester_pays: bool = False, job_id: Optional[str] = None): super().__init__(src_path, dst_paths, True, requester_pays, job_id) self.transfer_list = [] self.multipart_transfer_list = [] @@ -814,9 +806,7 @@ def _enrich_dest_objs( dest_prefix = dest_prefixes[i] logger.fs.debug(f"Querying objects in {dst_iface.bucket()}") if not hasattr(self, "_found_dest_objs"): - print("isting") self._found_dest_objs = {obj.key: obj for obj in dst_iface.list_objects(dest_prefix)} - print("done") for pair in transfer_pairs: src_obj = pair.src_obj dest_obj = list(pair.dst_objs.values())[0] @@ -824,7 +814,6 @@ def _enrich_dest_objs( if dest_obj.key in self._found_dest_objs: dest_obj.size = self._found_dest_objs[dest_obj.key].size dest_obj.last_modified = self._found_dest_objs[dest_obj.key].last_modified - print(src_obj, dest_obj) yield src_obj, dest_obj @classmethod diff --git a/skyplane/gateway/gateway_program.py b/skyplane/gateway/gateway_program.py index a8c1808e4..f275b82e8 100644 --- a/skyplane/gateway/gateway_program.py +++ b/skyplane/gateway/gateway_program.py @@ -113,7 +113,7 @@ def __init__(self): def get_operators(self) -> List[GatewayOperator]: return list(self._ops.values()) - def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[int] = 0): + def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[str] = "default"): parent_op = self._ops[parent_handle] if parent_handle else None ops_handles = [] for op in ops: @@ -121,7 +121,7 @@ def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] return ops_handles - def 
add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[int] = 0): + def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[str] = "default"): parent_op = self._ops[parent_handle] if parent_handle else None if not parent_op: # root operation self._plan[partition_id].append(op) diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index 5e909095c..660c1eeff 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -47,7 +47,7 @@ def aws_region(self): return default_region elif "An error occurred (InvalidAccessKeyId) when calling" in str(e): logger.warning(f"Invalid AWS credentials. Check to make sure credentials configured properly.") - raise exceptions.PermissionsException(f"Invalid AWS credentials for accessing bucket {self.bucket_name}") + raise exceptions.PermissionsException(f"Invalid AWS credentials for accessing bucket {self.bucket_name}") else: logger.warning(f"Specified bucket {self.bucket_name} does not exist, got AWS error: {e}") raise exceptions.MissingBucketException(f"S3 bucket {self.bucket_name} does not exist") from e diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py index 02567f798..0eed71598 100644 --- a/tests/integration/test_cp.py +++ b/tests/integration/test_cp.py @@ -20,7 +20,7 @@ @pytest.mark.skip(reason="Shared function") def setup_bucket(region_tag): provider, region = region_tag.split(":") - if provider == "azure": + if provider == "azure" or provider == "cloudflare": bucket_name = f"{str(uuid.uuid4())[:8]}/{str(uuid.uuid4()).replace('-', '')}" else: bucket_name = f"integration{region}-{str(uuid.uuid4())[:8]}" @@ -28,7 +28,10 @@ def setup_bucket(region_tag): # create bucket try: iface = ObjectStoreInterface.create(region_tag, bucket_name) - iface.create_bucket(region) + if provider == "cloudflare": + iface.create_bucket() + else: + iface.create_bucket(region) except Exception as e: logger.fs.error(f"Failed to create bucket {bucket_name}: {e}") raise e @@ -99,9 +102,9 @@ def cloudflare_bucket(): # TODO: add more parameters for bucket types -#@pytest.mark.parametrize( # tests large objects +# @pytest.mark.parametrize( # tests large objects # "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] -#) +# ) @pytest.mark.parametrize("test_case, recursive", [(test_bucket_medium_file, True)]) def test_azure(azure_bucket, gcp_bucket, test_case, recursive): """ @@ -211,15 +214,15 @@ def test_gcp(gcp_bucket, test_case, recursive): assert len(dst_objects) > 0, f"Object {test_case} not copied to {gcp_bucket.bucket()}: only container {dst_objects}" -@pytest.mark.timeout(60*20) +@pytest.mark.timeout(60 * 20) def test_same_region(same_region_bucket): client = SkyplaneClient() client.copy(test_bucket_large_file, f"{same_region_bucket.path()}") -@pytest.mark.timeout(60*20) +@pytest.mark.timeout(60 * 20) def test_pipeline(gcp_bucket): -"""Test pipeline's ability to run multiple copy jobs on a single dataplane""" + """Test pipeline's ability to run multiple copy jobs on a single dataplane""" client = SkyplaneClient() pipeline = client.pipeline() @@ -237,11 +240,6 @@ def test_pipeline(gcp_bucket): assert len(list(gcp_bucket.list_objects(prefix="large/"))) > 0, f"No data from {test_bucket_large_file} transferred" assert len(list(gcp_bucket.list_objects(prefix="medium/"))) > 0, f"No data from {test_bucket_medium_file} transferred" - 
print("arge", list(gcp_bucket.list_objects(prefix="large/"))) - print("medium", list(gcp_bucket.list_objects(prefix="medium/"))) - - - # test one sided transfers def test_cp_one_sided(): @@ -250,7 +248,7 @@ def test_cp_one_sided(): # test multiple VMs -@pytest.mark.timeout(60*20) +@pytest.mark.timeout(60 * 20) def test_cp_multiple_vms(aws_bucket): client = SkyplaneClient() pipeline = client.pipeline(max_instances=2) @@ -260,7 +258,7 @@ def test_cp_multiple_vms(aws_bucket): # test multicast # TODO: add azure -@pytest.mark.timeout(60*20) +@pytest.mark.timeout(60 * 20) def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket): client = SkyplaneClient() src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) From e27b90d0ed1ab5943d4d7869d711a5cf84438145 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Wed, 21 Jun 2023 04:12:01 +0000 Subject: [PATCH 13/13] cleanup --- skyplane/api/pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index f37f399dd..929a18832 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -101,7 +101,6 @@ def start(self, debug=False, progress=False): ## create dataplane from plan # dp = Dataplane(self.clientid, topo, self.provisioner, self.transfer_config, self.transfer_dir, debug=debug) dp = self.create_dataplane(debug) - print("pipeline", self.jobs_to_dispatch) try: dp.provision(spinner=True) if progress: