From b2741c5778ef572529eff2ddb6803e279a00eac5 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Thu, 15 Jun 2023 09:37:24 -0700 Subject: [PATCH 01/15] Make MulticastDirectPlanner take in `TransferConfig` (#872) --- Dockerfile | 26 +++++++++++++------------- skyplane/api/pipeline.py | 2 +- skyplane/planner/planner.py | 3 ++- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index a861e1055..4a99ae0d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,19 +7,19 @@ RUN --mount=type=cache,target=/var/cache/apt apt-get update \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -#install HDFS Onprem Packages -RUN apt-get update && \ - apt-get install -y openjdk-11-jdk && \ - apt-get clean - -ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 - -RUN wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp \ - && tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp \ - && mv /tmp/hadoop-3.3.0 /usr/local/hadoop \ - && rm /tmp/hadoop-3.3.0.tar.gz - -ENV HADOOP_HOME /usr/local/hadoop +##install HDFS Onprem Packages +#RUN apt-get update && \ +# apt-get install -y openjdk-11-jdk && \ +# apt-get clean +# +#ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 +# +#RUN wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp \ +# && tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp \ +# && mv /tmp/hadoop-3.3.0 /usr/local/hadoop \ +# && rm /tmp/hadoop-3.3.0.tar.gz +# +#ENV HADOOP_HOME /usr/local/hadoop # configure stunnel RUN mkdir -p /etc/stunnel \ diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 5471ebf55..17adf740e 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -68,7 +68,7 @@ def __init__( # planner self.planning_algorithm = planning_algorithm if self.planning_algorithm == "direct": - self.planner = MulticastDirectPlanner(self.max_instances, 64) + self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config) elif self.planning_algorithm == "src_one_sided": self.planner = DirectPlannerSourceOneSided(self.max_instances, 64) elif self.planning_algorithm == "dst_one_sided": diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 59eeac33e..0110158cb 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -176,7 +176,8 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: vm_types = {v[0]: Planner._vcpus_to_vm(cloud_provider=v[0].split(":")[0], vcpus=v[1][0]) for v in vm_info} # type: ignore # Taking the minimum so that we can use the same number of instances for both source and destination - n_instances = min(v[1][1] for v in vm_info) # type: ignore + n_instances = min(self.n_instances, min(v[1][1] for v in vm_info)) # type: ignore + print("n_instances", n_instances) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): From 3ba7d312efb7d5a2d214e3a5a29888058c872fdf Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Wed, 14 Jun 2023 21:43:44 -0700 Subject: [PATCH 02/15] Update tests for bucket cleanup + remove unnecessary installs (#865) S3 currently does not have proper bucket cleanup after testing leading to `TooManyBuckets` errors on tests after a certain point. This adds the cleanup logic to every integration test. Furthermore, the hadoop jdk installs were removed as they should not be required anymore. 
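For reference, the `MulticastDirectPlanner` change in PATCH 01 above is exercised as follows (a minimal usage sketch only; the argument values are illustrative and `jobs` stands in for an already-prepared list of `TransferJob`s):

```python
from skyplane.api.config import TransferConfig
from skyplane.planner.planner import MulticastDirectPlanner

# TransferConfig carries per-cloud settings (instance class, spot preference, ...)
transfer_config = TransferConfig()

# The planner now receives the transfer config as its third argument,
# matching the pipeline.py change in this patch.
planner = MulticastDirectPlanner(n_instances=8, n_connections=64, transfer_config=transfer_config)

# plan() can then consult transfer_config when sizing gateways:
# topology = planner.plan(jobs)  # jobs: List[TransferJob], prepared elsewhere
```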
--- .github/workflows/integration-test-local.yml | 25 ++++++++++++++----- .../integration-test-multiple-sizes.yml | 21 +++++++++++----- .github/workflows/integration-test.yml | 8 +++--- .github/workflows/pytest.yml | 3 --- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/.github/workflows/integration-test-local.yml b/.github/workflows/integration-test-local.yml index c73a9b9db..a19f2d813 100644 --- a/.github/workflows/integration-test-local.yml +++ b/.github/workflows/integration-test-local.yml @@ -73,23 +73,36 @@ jobs: STRATEGY_UUID: itest-d-${{ github.run_id }}-${{ github.run_attempt }} steps: - uses: actions/checkout@v1 + - name: Install poetry + run: pipx install poetry - name: Set up Python 3.10 uses: actions/setup-python@v4 with: python-version: "3.10" - - name: Install skyplane from pypi - run: pip install skyplane[aws,azure,gcp] + cache: "poetry" + - name: Set Poetry config + run: | + poetry config virtualenvs.in-project false + poetry config virtualenvs.path ~/.virtualenvs + - name: Install dependencies + run: poetry install -E aws -E azure -E gcp + if: steps.cache.outputs.cache-hit != 'true' - id: 'auth' uses: 'google-github-actions/auth@v1' with: credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}' - name: Skyplane init run: | - skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} - skyplane init -y --disable-config-azure - skyplane config set usage_stats false + poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} + poetry run skyplane init -y --disable-config-azure --disable-config-cloudflare + poetry run skyplane config set usage_stats false - name: Deprovision - run: skyplane deprovision + run: poetry run skyplane deprovision + - name: Delete matching S3 buckets + run: | + for pattern in "test-skyplane-" "skyplane-integration-us-east-1-" "integrationus-east-1-"; do + aws s3api list-buckets --query "Buckets[?starts_with(Name, \`${pattern}\`) == \`true\`].Name" --output text | tr '\t' '\n' | while read bucket; do aws s3 rb "s3://$bucket" --force; done + done - name: Cleanup GCP service account if: always() run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com diff --git a/.github/workflows/integration-test-multiple-sizes.yml b/.github/workflows/integration-test-multiple-sizes.yml index 3afe5245e..a6960ee08 100644 --- a/.github/workflows/integration-test-multiple-sizes.yml +++ b/.github/workflows/integration-test-multiple-sizes.yml @@ -79,23 +79,32 @@ jobs: STRATEGY_UUID: itest-d-${{ github.run_id }}-${{ github.run_attempt }} steps: - uses: actions/checkout@v1 + - name: Install poetry + run: pipx install poetry - name: Set up Python 3.10 uses: actions/setup-python@v4 with: python-version: "3.10" - - name: Install skyplane from pypi - run: pip install skyplane[aws,azure,gcp] + cache: "poetry" + - name: Install dependencies + run: poetry install -E aws -E azure -E gcp + if: steps.cache.outputs.cache-hit != 'true' - id: 'auth' uses: 'google-github-actions/auth@v1' with: credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}' - name: Skyplane init run: | - skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} - skyplane init -y --disable-config-azure - skyplane config set usage_stats false + poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} + poetry run skyplane init -y --disable-config-azure --disable-config-cloudflare + poetry run skyplane config set usage_stats false - name: Deprovision - run: 
skyplane deprovision + run: poetry run skyplane deprovision + - name: Delete matching S3 buckets + run: | + for pattern in "test-skyplane-" "skyplane-integration-us-east-1-" "integrationus-east-1-"; do + aws s3api list-buckets --query "Buckets[?starts_with(Name, \`${pattern}\`) == \`true\`].Name" --output text | tr '\t' '\n' | while read bucket; do aws s3 rb "s3://$bucket" --force; done + done - name: Cleanup GCP service account if: always() run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 1bfcd7ab6..eb7b4822c 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -90,11 +90,11 @@ jobs: credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}' - name: Skyplane init run: | - skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} - skyplane init -y --disable-config-azure - skyplane config set usage_stats false + poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} + poetry run skyplane init -y --disable-config-azure + poetry run skyplane config set usage_stats false - name: Deprovision - run: skyplane deprovision + run: poetry run skyplane deprovision - name: Delete matching S3 buckets run: | for pattern in "test-skyplane-" "skyplane-integration-us-east-1-" "integrationus-east-1-"; do diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 49e753645..b7ae05842 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -87,8 +87,6 @@ jobs: run: | poetry install -E gateway -E solver -E aws -E azure -E gcp -E ibm poetry run pip install -r requirements-dev.txt - poetry run sudo apt install default-jdk - poetry run wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp && tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp && sudo mv /tmp/hadoop-3.3.0 /usr/local/hadoop && rm /tmp/hadoop-3.3.0.tar.gz if: steps.cache.outputs.cache-hit != 'true' - name: Run cloud tests env: @@ -156,7 +154,6 @@ jobs: poetry install -E gateway -E solver -E aws -E azure -E gcp -E ibm poetry run pip install -r requirements-dev.txt poetry run sudo apt install default-jdk - poetry run wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp && tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp && sudo mv /tmp/hadoop-3.3.0 /usr/local/hadoop && rm /tmp/hadoop-3.3.0.tar.gz if: steps.cache.outputs.cache-hit != 'true' - id: 'auth' uses: 'google-github-actions/auth@v0' From faba29b92da2a976f5dd1a320b3d4b30d5bbfa25 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 12 Jun 2023 12:58:37 -0700 Subject: [PATCH 03/15] Disable cloudflare for integration tests (#864) --- skyplane/cli/cli_init.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/skyplane/cli/cli_init.py b/skyplane/cli/cli_init.py index 6b162805a..e946de451 100644 --- a/skyplane/cli/cli_init.py +++ b/skyplane/cli/cli_init.py @@ -535,9 +535,9 @@ def init( cloud_config = load_gcp_config(cloud_config, force_init=reinit_gcp, non_interactive=non_interactive) # load cloudflare config - if not reinit_cloudflare: - typer.secho("\n(1) configuring cloudflare R2:", fg="yellow", bold=True) - if not disable_config_aws: + if not reinit_cloudflare: # TODO: fix reinit logic + typer.secho("\n(4) Configuring Cloudflare R2:", fg="yellow", bold=True) + if not disable_config_cloudflare: cloud_config = 
load_cloudflare_config(cloud_config, non_interactive=non_interactive) # load IBMCloud config From c601c34c1716883fb64058069dc4dbbc81eee684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BCtfi=20Eren=20Erdo=C4=9Fan?= Date: Wed, 7 Jun 2023 20:49:20 -0700 Subject: [PATCH 04/15] little changes to the fall back logic + tests (#859) Instead of storing the vcpu limits as a hardcoded variable inside Planner, created a new .csv file that includes that information. When Planner is created, we read this csv file. Wrote test cases that include fake quota limits. --- skyplane/data/vcpu_info.csv | 27 +++ skyplane/planner/planner.py | 260 +++++++++++++-------------- tests/test_fall_back.py | 40 ----- tests/unit_nocloud/test_fall_back.py | 88 +++++++++ 4 files changed, 245 insertions(+), 170 deletions(-) create mode 100644 skyplane/data/vcpu_info.csv delete mode 100644 tests/test_fall_back.py create mode 100644 tests/unit_nocloud/test_fall_back.py diff --git a/skyplane/data/vcpu_info.csv b/skyplane/data/vcpu_info.csv new file mode 100644 index 000000000..cf48a3d3a --- /dev/null +++ b/skyplane/data/vcpu_info.csv @@ -0,0 +1,27 @@ +instance_name,cloud_provider,vcpu_cost +m5.8xlarge,aws,32 +m5.12xlarge,aws,48 +m5.24xlarge,aws,96 +m5.xlarge,aws,4 +m5.4xlarge,aws,16 +m5.large,aws,2 +m5.2xlarge,aws,8 +m5.16xlarge,aws,64 +n2-standard-128,gcp,128 +n2-standard-96,gcp,96 +n2-standard-80,gcp,80 +n2-standard-64,gcp,64 +n2-standard-48,gcp,48 +n2-standard-32,gcp,32 +n2-standard-16,gcp,16 +n2-standard-8,gcp,8 +n2-standard-4,gcp,4 +n2-standard-2,gcp,2 +Standard_D96_v5,azure,96 +Standard_D64_v5,azure,64 +Standard_D48_v5,azure,48 +Standard_D32_v5,azure,32 +Standard_D16_v5,azure,16 +Standard_D8_v5,azure,8 +Standard_D4_v5,azure,4 +Standard_D2_v5,azure,2 diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 0110158cb..a9c5ac235 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,6 +1,9 @@ +from collections import defaultdict from importlib.resources import path from typing import Dict, List, Optional, Tuple, Tuple import re +import os +import csv from skyplane import compute from skyplane.api.config import TransferConfig @@ -30,46 +33,112 @@ class Planner: def plan(self) -> TopologyPlan: raise NotImplementedError - @staticmethod - def _vm_to_vcpus(cloud_provider: str, vm: str) -> int: - if cloud_provider == "aws": - n_vcpus = re.findall(r"\d*x", vm)[0] - return 4 * int(n_vcpus.replace("x", "") or 1) if "x" in n_vcpus else int(n_vcpus) + def _vm_to_vcpus(self, cloud_provider: str, vm: str) -> int: + """Gets the vcpu_cost of the given vm instance (instance_name) + + :param cloud_provider: name of the cloud_provider + :type cloud_provider: str + :param instance_name: name of the vm instance + :type instance_name: str + """ + return self.vcpu_info[cloud_provider][vm] + + def _get_quota_limits_for(self, cloud_provider: str, region: str, spot: bool = False) -> Optional[int]: + """Gets the quota info from the saved files. Returns None if quota_info isn't loaded during `skyplane init` + or if the quota info doesn't include the region. 
+ + :param cloud_provider: name of the cloud provider of the region + :type cloud_provider: str + :param region: name of the region for which to get the quota for + :type region: int + :param spot: whether to use spot specified by the user config (default: False) + :type spot: bool + """ + quota_limits = self.quota_limits[cloud_provider] + if not quota_limits: + # User needs to reinitialize to save the quota information + logger.warning(f"Please run `skyplane init --reinit-{cloud_provider}` to load the quota information") + return None + if cloud_provider == "gcp": + region_family = "-".join(region.split("-")[:2]) + if region_family in quota_limits: + return quota_limits[region_family] elif cloud_provider == "azure": - n_vcpus = re.findall(r"\d+", vm) - return int(n_vcpus[0]) - else: - return int(vm.split("-")[-1]) - - @staticmethod - def _vcpus_to_vm(cloud_provider: str, vcpus: int) -> str: - if cloud_provider == "aws": - return "m5.large" if vcpus == 2 else "m5.xlarge" if vcpus == 4 else f"m5.{vcpus // 4}xlarge" - vm_family = "Standard_D{}_v5" if cloud_provider == "azure" else "n2-standard-{}" - return vm_family.format(vcpus) - - # @staticmethod - # def _split_vcpus(num_vcpus: int, quota_limit: int) -> Optional[Tuple]: - # """Splits the total/target number of vcpus used into smaller partitions according to the quota limit with the - # given vcpu options. These options are the same for all providers for now. Enforces that the vcpu_count is - # all the same for all partitions, meaning that for a num_vcpus=32, you cannot partition into 1 vm of 16 vcpus and - # 2 vms of 8 vcpus but can partition into 4 vms of 8 vcpus. Returns the number of partitions and their vcpus usage - # """ - # # If the desired vCPU count is within the quota limit, don't fall back - # if num_vcpus <= quota_limit: - # return (1, num_vcpus) - - # # Otherwise, try to split the desired vCPU count into smaller portions that are within the quota limit and use the largest option - # else: - # for vcpu_count in Planner._VCPUS: - # if vcpu_count <= quota_limit and vcpu_count <= num_vcpus: - # portions = num_vcpus // vcpu_count - # remaining_vcpus = num_vcpus - (vcpu_count * portions) - # # If the remaining vCPUs are 0, use the current option to launch all portions - # if remaining_vcpus == 0: - # return (portions, vcpu_count) # [vcpu_count] * portions - # # Return None if no valid vCPU portions were found - # return None + if region in quota_limits: + return quota_limits[region] + elif cloud_provider == "aws": + for quota in quota_limits: + if quota["region_name"] == region: + return quota["spot_standard_vcpus"] if spot else quota["on_demand_standard_vcpus"] + return None + + def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: + """Calculates the largest allowed vm type according to the regional quota limit as well as + how many of these vm types can we launch to avoid QUOTA_EXCEEDED errors. Returns None if quota + information wasn't properly loaded or allowed vcpu list is wrong. 
+ + :param region_tag: tag of the node we are calculating the above for, example -> "aws:us-east-1" + :type region_tag: str + """ + cloud_provider, region = region_tag.split(":") + + # Get the quota limit + quota_limit = self._get_quota_limits_for( + cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") + ) + + # No quota limits (quota limits weren't initialized properly during skyplane init) + if quota_limit is None: + return None + + config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") + config_vcpus = self._vm_to_vcpus(cloud_provider, config_vm_type) + if config_vcpus <= quota_limit: + return config_vm_type, quota_limit // config_vcpus + + vm_type, vcpus = None, None + for instance_name, vcpu_cost in sorted(self.vcpu_info[cloud_provider].items(), key=lambda x: x[1], reverse=True): + if vcpu_cost <= quota_limit: + vm_type, vcpus = instance_name, vcpu_cost + break + + # shouldn't happen, but just in case we use more complicated vm types in the future + if vm_type is None or vcpus is None: + return None + + # number of instances allowed by the quota with the selected vm type + n_instances = quota_limit // vcpus + logger.warning( + f"Falling back to instance class `{vm_type}` at {region_tag} " + f"due to cloud vCPU limit of {quota_limit}. You can visit https://skyplane.org/en/latest/increase_vcpus.html " + "to learn more about how to increase your cloud vCPU limits for any cloud provider." + ) + return (vm_type, n_instances) + + def _get_vm_type_and_instances( + self, src_region_tag: Optional[str] = None, dst_region_tags: Optional[List[str]] = None + ) -> Tuple[Dict[str, str], int]: + """Dynamically calculates the vm type each region can use (both the source region and all destination regions) + based on their quota limits and calculates the number of vms to launch in all regions by conservatively + taking the minimum of all regions to stay consistent. 
+ + :param src_region_tag: the source region tag (default: None) + :type src_region_tag: Optional[str] + :param dst_region_tags: a list of the destination region tags (defualt: None) + :type dst_region_tags: Optional[List[str]] + """ + # One of them has to provided + assert src_region_tag is not None or dst_region_tags is not None, "There needs to be at least one source or destination" + src_tags = [src_region_tag] if src_region_tag is not None else [] + dst_tags = dst_region_tags or [] + + # do_parallel returns tuples of (region_tag, (vm_type, n_instances)) + vm_info = do_parallel(self._calculate_vm_types, src_tags + dst_tags) + # Specifies the vm_type for each region + vm_types = {v[0]: v[1][0] for v in vm_info} # type: ignore + # Taking the minimum so that we can use the same number of instances for both source and destination + n_instances = min([self.n_instances] + [v[1][1] for v in vm_info]) # type: ignore + return vm_types, n_instances class UnicastDirectPlanner(Planner): @@ -92,11 +161,14 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) - # TODO: use VM limits to determine how many instances to create in each region + + # Dynammically calculate n_instances based on quota limits + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=[dst_region_tag]) + # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): - plan.add_gateway(src_region_tag) - plan.add_gateway(dst_region_tag) + for i in range(n_instances): + plan.add_gateway(src_region_tag, vm_types[src_region_tag]) + plan.add_gateway(dst_region_tag, vm_types[dst_region_tag]) # ids of gateways in dst region dst_gateways = plan.get_region_gateways(dst_region_tag) @@ -116,7 +188,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id ) mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) - for i in range(self.n_instances): + for i in range(n_instances): src_program.add_operator( GatewaySend(target_gateway_id=dst_gateways[i].gateway_id, region=src_region_tag, num_connections=self.n_connections), parent_handle=mux_or, @@ -170,14 +242,8 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) - # Dynammically calculate n_instances based on quota limits - do_parallel returns - # tuples of (vcpus, n_instances) - vm_info = do_parallel(self._calculate_vm_types, [src_region_tag] + dst_region_tags) # type: ignore - vm_types = {v[0]: Planner._vcpus_to_vm(cloud_provider=v[0].split(":")[0], vcpus=v[1][0]) for v in vm_info} # type: ignore - - # Taking the minimum so that we can use the same number of instances for both source and destination - n_instances = min(self.n_instances, min(v[1][1] for v in vm_info)) # type: ignore - print("n_instances", n_instances) + # Dynammically calculate n_instances based on quota limits + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): @@ -257,78 
+323,6 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan.set_gateway_program(dst_region_tag, program) return plan - def _get_quota_limits_for(self, cloud_provider: str, region: str, spot: bool = False) -> Optional[int]: - """Gets the quota info from the saved files. Returns None if quota_info isn't loaded during `skyplane init` - or if the quota info doesn't include the region. - - :param cloud_provider: name of the cloud provider of the region - :type cloud_provider: str - :param region: name of the region for which to get the quota for - :type region: int - :param spot: whether to use spot specified by the user config (default: False) - :type spot: bool - """ - quota_limits = self.quota_limits[cloud_provider] - if not quota_limits: - # User needs to reinitialize to save the quota information - logger.warning(f"Please run `skyplane init --reinit-{cloud_provider}` to load the quota information") - return None - if cloud_provider == "gcp": - region_family = "-".join(region.split("-")[:2]) - if region_family in quota_limits: - return quota_limits[region_family] - elif cloud_provider == "azure": - if region in quota_limits: - return quota_limits[region] - elif cloud_provider == "aws": - for quota in quota_limits: - if quota["region_name"] == region: - return quota["spot_standard_vcpus"] if spot else quota["on_demand_standard_vcpus"] - return None - - def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[int, int]]: - """Calculates the largest allowed vm type according to the regional quota limit as well as - how many of these vm types can we launch to avoid QUOTA_EXCEEDED errors. Returns None if quota - information wasn't properly loaded or allowed vcpu list is wrong. - - :param region_tag: tag of the node we are calculating the above for, example -> "aws:us-east-1" - :type region_tag: str - """ - cloud_provider, region = region_tag.split(":") - - # Get the quota limit - quota_limit = self._get_quota_limits_for( - cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") - ) - - # No quota limits (quota limits weren't initialized properly during skyplane init) - if quota_limit is None: - return None - - config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") - config_vcpus = Planner._vm_to_vcpus(cloud_provider, config_vm_type) - if config_vcpus <= quota_limit: - return config_vcpus, quota_limit // config_vcpus - - vcpus = None - for value in Planner._VCPUS: - if value <= quota_limit: - vcpus = value - break - - # shouldn't happen, but just in case we use more complicated vm types in the future - if vcpus is None: - return None - - # number of instances allowed by the quota with the selected vm type - n_instances = quota_limit // vcpus - logger.warning( - f"Falling back to instance class `{Planner._vcpus_to_vm(cloud_provider, vcpus)}` at {region_tag} " - f"due to cloud vCPU limit of {quota_limit}. You can visit https://skyplane.org/en/latest/increase_vcpus.html " - "to learn more about how to increase your cloud vCPU limits for any cloud provider." 
- ) - return (vcpus, n_instances) - class DirectPlannerSourceOneSided(Planner): """Planner that only creates VMs in the source region""" @@ -347,10 +341,13 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) - # TODO: use VM limits to determine how many instances to create in each region + + # Dynammically calculate n_instances based on quota limits + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag) + # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): - plan.add_gateway(src_region_tag) + for i in range(n_instances): + plan.add_gateway(src_region_tag, vm_types[src_region_tag]) # initialize gateway programs per region src_program = GatewayProgram() @@ -410,11 +407,14 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) - # TODO: use VM limits to determine how many instances to create in each region + + # Dynammically calculate n_instances based on quota limits + vm_types, n_instances = self._get_vm_type_and_instances(dst_region_tags=dst_region_tags) + # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): + for i in range(n_instances): for dst_region_tag in dst_region_tags: - plan.add_gateway(dst_region_tag) + plan.add_gateway(dst_region_tag, vm_types[dst_region_tag]) # initialize gateway programs per region dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags} diff --git a/tests/test_fall_back.py b/tests/test_fall_back.py deleted file mode 100644 index 28ee9e20f..000000000 --- a/tests/test_fall_back.py +++ /dev/null @@ -1,40 +0,0 @@ -# from typing import Dict, Tuple -# from skyplane import compute -# from skyplane.api.config import TransferConfig -# from skyplane.planner.planner import MulticastDirectPlanner, Planner -# from skyplane.utils.fn import do_parallel - -# quota_limits = { -# "aws": [ -# {"on_demand_standard_vcpus": 5, "spot_standard_vcpus": 5, "region_name": "us-east-1"}, -# ], -# "gcp": { -# "us-east1": 8, -# }, -# "azure": { -# "uaenorth": 4, -# }, -# } - - -# def test_fall_back(): -# transfer_config = TransferConfig() -# planner = MulticastDirectPlanner(n_instances=8, n_connections=100, transfer_config=transfer_config) -# planner.quota_limits = quota_limits - -# region_tags = ["aws:us-east-1", "azure:uaenorth", "gcp:us-east1"] -# test_vm_types = {"aws:us-east-1": "m5.xlarge", "azure:uaenorth": "Standard_D4_v5", "gcp:us-east1": "n2-standard-8"} -# test_n_instances = { -# "aws:us-east-1": 1, -# "azure:uaenorth": 1, -# "gcp:us-east1": 1, -# } - -# for i, src_region_tag in enumerate(region_tags): -# dst_region_tags = region_tags[:i] + region_tags[i + 1 :] -# vm_info = do_parallel(planner._calculate_vm_types, [src_region_tag] + dst_region_tags) # type: ignore -# vm_types = {v[0]: Planner._vcpus_to_vm(cloud_provider=v[0].split(":")[0], vcpus=v[1][0]) for v in vm_info} # type: ignore -# n_instances = min(v[1][1] for v in vm_info) # type: ignore - -# assert vm_types == test_vm_types, "vm types are calculated wrong" -# assert 
test_n_instances[src_region_tag] == n_instances diff --git a/tests/unit_nocloud/test_fall_back.py b/tests/unit_nocloud/test_fall_back.py new file mode 100644 index 000000000..17a016f65 --- /dev/null +++ b/tests/unit_nocloud/test_fall_back.py @@ -0,0 +1,88 @@ +from skyplane.api.config import TransferConfig +from skyplane.cli.cli_transfer import SkyplaneCLI +from skyplane.config import SkyplaneConfig +from skyplane.planner.planner import MulticastDirectPlanner + + +def test_fall_back_multiple_limits_and_types(): + # Regions for each provider + regions = { + "aws": "us-east-1", + "azure": "uaenorth", + "gcp": "us-east1", + } + + # Different quota limits + tests = [ + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 5, + "spot_standard_vcpus": 5, + "region_name": "us-east-1", + } + ], + "gcp": {"us-east1": 8}, + "azure": {"uaenorth": 4}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "aws:us-east-1": "m5.xlarge", + "gcp:us-east1": "n2-standard-8", + }, + 1, + ], + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 10, + "spot_standard_vcpus": 10, + "region_name": "us-east-1", + } + ], + "gcp": {"us-east1": 16}, + "azure": {"uaenorth": 8}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.2xlarge", + }, + 1, + ], + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 20, + "spot_standard_vcpus": 20, + "region_name": "us-east-1", + } + ], + "gcp": {"us-east1": 32}, + "azure": {"uaenorth": 16}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.4xlarge", + }, + 1, + ], + ] + + for test in tests: + quota_limits, expected_vm_types, expected_n_instances = test + transfer_config = TransferConfig() + planner = MulticastDirectPlanner(n_instances=8, n_connections=100, transfer_config=transfer_config) + planner.quota_limits = quota_limits + + region_tags = [f"{p}:{regions[p]}" for p in regions.keys()] + for i, src_region_tag in enumerate(region_tags): + dst_region_tags = region_tags[:i] + region_tags[i + 1 :] + vm_types, n_instances = planner._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) + + assert vm_types == expected_vm_types, f"vm types are calculated wrong {vm_types}" + assert n_instances == expected_n_instances, f"n_instances are calculated wrong {n_instances}" From ac4d589cd8bb8556ec29209d776ea9c27df9b488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BCtfi=20Eren=20Erdo=C4=9Fan?= Date: Wed, 7 Jun 2023 20:49:20 -0700 Subject: [PATCH 05/15] Cleanup planner to take in TransferConfig and remove unused planner code --- skyplane/api/pipeline.py | 4 +- skyplane/planner/planner.py | 94 ++++++++++++++----------------------- 2 files changed, 37 insertions(+), 61 deletions(-) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 17adf740e..929a18832 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -70,9 +70,9 @@ def __init__( if self.planning_algorithm == "direct": self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config) elif self.planning_algorithm == "src_one_sided": - self.planner = DirectPlannerSourceOneSided(self.max_instances, 64) + self.planner = DirectPlannerSourceOneSided(self.max_instances, 64, self.transfer_config) elif self.planning_algorithm == "dst_one_sided": - self.planner = DirectPlannerDestOneSided(self.max_instances, 64) + self.planner = DirectPlannerDestOneSided(self.max_instances, 64, self.transfer_config) else: raise ValueError(f"No such planning algorithm 
{planning_algorithm}") diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index a9c5ac235..1fa5a12f7 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,7 +1,6 @@ from collections import defaultdict from importlib.resources import path from typing import Dict, List, Optional, Tuple, Tuple -import re import os import csv @@ -24,12 +23,43 @@ import json from skyplane.utils.fn import do_parallel +from skyplane.config_paths import config_path, azure_standardDv5_quota_path, aws_quota_path, gcp_quota_path +from skyplane.config import SkyplaneConfig class Planner: # Only supporting "aws:m5.", "azure:StandardD_v5", and "gcp:n2-standard" instances for now _VCPUS = (96, 64, 48, 32, 16, 8, 4, 2) + def __init__(self, transfer_config: TransferConfig): + self.transfer_config = transfer_config + self.config = SkyplaneConfig.load_config(config_path) + + # Loading the quota information, add ibm cloud when it is supported + self.quota_limits = {} + if os.path.exists(aws_quota_path): + with aws_quota_path.open("r") as f: + self.quota_limits["aws"] = json.load(f) + if os.path.exists(azure_standardDv5_quota_path): + with azure_standardDv5_quota_path.open("r") as f: + self.quota_limits["azure"] = json.load(f) + if os.path.exists(gcp_quota_path): + with gcp_quota_path.open("r") as f: + self.quota_limits["gcp"] = json.load(f) + + # Loading the vcpu information - a dictionary of dictionaries + # {"cloud_provider": {"instance_name": vcpu_cost}} + self.vcpu_info = defaultdict(dict) + with path("skyplane.data", "vcpu_info.csv") as file_path: + with open(file_path, "r") as csvfile: + reader = csv.reader(csvfile) + next(reader) # Skip the header row + + for row in reader: + instance_name, cloud_provider, vcpu_cost = row + vcpu_cost = int(vcpu_cost) + self.vcpu_info[cloud_provider][instance_name] = vcpu_cost + def plan(self) -> TopologyPlan: raise NotImplementedError @@ -212,14 +242,12 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): - n_instances: int - n_connections: int - transfer_config: TransferConfig def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): + + super().__init__(transfer_config) self.n_instances = n_instances self.n_connections = n_connections - self.transfer_config = transfer_config # Loading the quota information, add ibm cloud when it is supported self.quota_limits = {} @@ -230,7 +258,6 @@ def __init__(self, n_instances: int, n_connections: int, transfer_config: Transf with self.transfer_config.azure_vcpu_file.open("r") as f: self.quota_limits["azure"] = json.load(f) - super().__init__() def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() @@ -324,14 +351,9 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: return plan -class DirectPlannerSourceOneSided(Planner): +class DirectPlannerSourceOneSided(MulticastDirectPlanner): """Planner that only creates VMs in the source region""" - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] @@ -389,14 +411,9 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: return plan -class DirectPlannerDestOneSided(Planner): +class DirectPlannerDestOneSided(MulticastDirectPlanner): """Planner that 
only creates instances in the destination region""" - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # only create in destination region src_region_tag = jobs[0].src_iface.region_tag() @@ -455,44 +472,3 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan.set_gateway_program(dst_region_tag, program) return plan - -class UnicastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") - - -class MulticastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") - - -class MulticastMDSTPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("MDST solver not implemented yet") - - -class MulticastSteinerTreePlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() - - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("Steiner tree solver not implemented yet") From 99db23ce60bdfbbb54f76ae8d988a198ee9b9c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BCtfi=20Eren=20Erdo=C4=9Fan?= Date: Mon, 19 Jun 2023 18:43:44 -0700 Subject: [PATCH 06/15] Enabled Azure Multipart (#863) Edited the AzureBlobInterface class: * Wrote the logic for staging/uploading a block for a multipart upload in the method upload_object * Created two functions to 1) initiate the multipart upload and 2) complete the multipart upload * Since Azure works differently than s3 and gcs in that it doesn't provide a global upload id for a destination object, I used the destination object name instead as an upload id to stay consistent with the other object stores. This pseudo-upload id is to keep track of which blocks and their blockIDs belong to in the CopyJob/SyncJob. * Upon completion of uploading/staging all blocks, all blocks for a destination object are committed together. More things to consider about this implementation: Upload ID handling: Azure doesn't really have a concept equivalent to AWS's upload IDs. Instead, blobs are created immediately and blocks are associated with a blob via block IDs. My workaround of using the blob name as the upload ID should work since I only use upload_id to distinguish between requests in the finalize() method Block IDs: It's worth noting that Azure requires block IDs to be of the same length. I've appropriately handled this by formatting the IDs to be of length len("{number of digits in max blocks supported by Azure (50000) = 5}{destination_object_key}"). 
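The encoding scheme described above amounts to the following helper (a condensed, standalone sketch of the `AzureBlobInterface.id_to_base64_encoding` method added in this patch):

```python
import base64

MAX_BLOCK_DIGITS = 5  # len("50000"): Azure supports at most 50,000 blocks per blob

def id_to_base64_encoding(part_number: int, dest_key: str) -> str:
    # Pad to MAX_BLOCK_DIGITS + len(dest_key) so every block ID for a given blob
    # has the same length, as Azure requires, then base64-encode the result.
    block_id = f"{part_number}{dest_key}".ljust(MAX_BLOCK_DIGITS + len(dest_key), "0")
    return base64.b64encode(block_id.encode("utf-8")).decode("utf-8")

# Example: parts 3 and 42 of the same destination key yield distinct IDs of
# identical length, which can later be committed together via commit_block_list.
```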
--------- Co-authored-by: Sarah Wooders --- skyplane/api/transfer_job.py | 20 ++- skyplane/obj_store/azure_blob_interface.py | 121 +++++++++++++++---- skyplane/obj_store/cos_interface.py | 4 +- skyplane/obj_store/file_system_interface.py | 4 +- skyplane/obj_store/gcs_interface.py | 4 +- skyplane/obj_store/hdfs_interface.py | 4 +- skyplane/obj_store/object_store_interface.py | 4 +- skyplane/obj_store/posix_file_interface.py | 4 +- skyplane/obj_store/s3_interface.py | 5 +- skyplane/planner/planner.py | 54 +++++---- 10 files changed, 164 insertions(+), 60 deletions(-) diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 898dcad26..0d6e50526 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -25,7 +25,7 @@ from skyplane import exceptions from skyplane.api.config import TransferConfig from skyplane.chunk import Chunk, ChunkRequest -from skyplane.obj_store.azure_blob_interface import AzureBlobObject +from skyplane.obj_store.azure_blob_interface import AzureBlobInterface, AzureBlobObject from skyplane.obj_store.gcs_interface import GCSObject from skyplane.obj_store.r2_interface import R2Object from skyplane.obj_store.storage_interface import StorageInterface @@ -158,8 +158,15 @@ def _run_multipart_chunk_thread( region = dest_iface.region_tag() dest_object = dest_objects[region] _, upload_id = upload_id_mapping[region] + + metadata = None + # Convert parts to base64 and store mime_type if destination interface is AzureBlobInterface + if isinstance(dest_iface, AzureBlobInterface): + block_ids = list(map(lambda part_num: AzureBlobInterface.id_to_base64_encoding(part_num, dest_object.key), parts)) + metadata = (block_ids, mime_type) + self.multipart_upload_requests.append( - dict(upload_id=upload_id, key=dest_object.key, parts=parts, region=region, bucket=bucket) + dict(upload_id=upload_id, key=dest_object.key, parts=parts, region=region, bucket=bucket, metadata=metadata) ) else: mime_type = None @@ -646,6 +653,7 @@ def dispatch( assert Chunk.from_dict(chunk_batch[0].as_dict()) == chunk_batch[0], f"Invalid chunk request: {chunk_batch[0].as_dict}" # TODO: make async + st = time.time() reply = self.http_pool.request( "POST", f"{server.gateway_api_url}/api/v1/chunk_requests", @@ -654,9 +662,10 @@ def dispatch( ) if reply.status != 200: raise Exception(f"Failed to dispatch chunk requests {server.instance_name()}: {reply.data.decode('utf-8')}") + et = time.time() reply_json = json.loads(reply.data.decode("utf-8")) - logger.fs.debug(f"Added {n_added} chunks to server {server}: {reply_json}") n_added += reply_json["n_added"] + logger.fs.debug(f"Added {n_added} chunks to server {server} in {et-st}: {reply_json}") queue_size[min_idx] = reply_json["qsize"] # update queue size # dont try again with some gateway min_idx = (min_idx + 1) % len(src_gateways) @@ -685,7 +694,10 @@ def finalize(self): def complete_fn(batch): for req in batch: logger.fs.debug(f"Finalize upload id {req['upload_id']} for key {req['key']}") - retry_backoff(partial(obj_store_interface.complete_multipart_upload, req["key"], req["upload_id"]), initial_backoff=0.5) + retry_backoff( + partial(obj_store_interface.complete_multipart_upload, req["key"], req["upload_id"], req["metadata"]), + initial_backoff=0.5, + ) do_parallel(complete_fn, batches, n=8) diff --git a/skyplane/obj_store/azure_blob_interface.py b/skyplane/obj_store/azure_blob_interface.py index f46c1bfd2..04e8c3cae 100644 --- a/skyplane/obj_store/azure_blob_interface.py +++ b/skyplane/obj_store/azure_blob_interface.py @@ -3,13 
+3,17 @@ import os from functools import lru_cache -from typing import Iterator, List, Optional, Tuple +from typing import Any, Iterator, List, Optional, Tuple from skyplane import exceptions, compute from skyplane.exceptions import NoSuchObjectException from skyplane.obj_store.azure_storage_account_interface import AzureStorageAccountInterface from skyplane.obj_store.object_store_interface import ObjectStoreInterface, ObjectStoreObject from skyplane.utils import logger, imports +from azure.storage.blob import ContentSettings + + +MAX_BLOCK_DIGITS = 5 class AzureBlobObject(ObjectStoreObject): @@ -149,25 +153,100 @@ def download_object( @imports.inject("azure.storage.blob", pip_extra="azure") def upload_object(azure_blob, self, src_file_path, dst_object_name, part_number=None, upload_id=None, check_md5=None, mime_type=None): - if part_number is not None or upload_id is not None: - # todo implement multipart upload - raise NotImplementedError("Multipart upload is not implemented for Azure") + """Uses the BlobClient instead of ContainerClient since BlobClient allows for + block/part level manipulation for multi-part uploads + """ src_file_path, dst_object_name = str(src_file_path), str(dst_object_name) - with open(src_file_path, "rb") as f: - print(f"Uploading {src_file_path} to {dst_object_name}") - blob_client = self.container_client.upload_blob( - name=dst_object_name, - data=f, - length=os.path.getsize(src_file_path), - max_concurrency=self.max_concurrency, - overwrite=True, - content_settings=azure_blob.ContentSettings(content_type=mime_type), - ) - if check_md5: - b64_md5sum = base64.b64encode(check_md5).decode("utf-8") if check_md5 else None - blob_md5 = blob_client.get_blob_properties().properties.content_settings.content_md5 - if b64_md5sum != blob_md5: - raise exceptions.ChecksumMismatchException( - f"Checksum mismatch for object {dst_object_name} in Azure container {self.container_name}, " - + f"expected {b64_md5sum}, got {blob_md5}" + print(f"Uploading {src_file_path} to {dst_object_name}") + + try: + blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=dst_object_name) + + # multipart upload + if part_number is not None and upload_id is not None: + with open(src_file_path, "rb") as f: + block_id = AzureBlobInterface.id_to_base64_encoding(part_number=part_number, dest_key=dst_object_name) + blob_client.stage_block(block_id=block_id, data=f, length=os.path.getsize(src_file_path)) # stage the block + return + + # single upload + with open(src_file_path, "rb") as f: + blob_client.upload_blob( + data=f, + length=os.path.getsize(src_file_path), + max_concurrency=self.max_concurrency, + overwrite=True, + content_settings=azure_blob.ContentSettings(content_type=mime_type), ) + + # check MD5 if required + if check_md5: + b64_md5sum = base64.b64encode(check_md5).decode("utf-8") if check_md5 else None + blob_md5 = blob_client.get_blob_properties().properties.content_settings.content_md5 + if b64_md5sum != blob_md5: + raise exceptions.ChecksumMismatchException( + f"Checksum mismatch for object {dst_object_name} in Azure container {self.container_name}, " + + f"expected {b64_md5sum}, got {blob_md5}" + ) + except Exception as e: + raise ValueError(f"Failed to upload {dst_object_name} to bucket {self.container_name} upload id {upload_id}: {e}") + + def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[str] = None) -> str: + """Azure does not have an equivalent function to return an upload ID like s3 and gcs do. 
+ Blocks in Azure are uploaded and associated with an ID, and can then be committed in a single operation to create the blob. + We will just return the dst_object_name (blob name) as the "upload_id" to keep the return type consistent for the multipart thread. + + :param dst_object_name: name of the destination object, also our psuedo-uploadID + :type dst_object_name: str + :param mime_type: unused in this function but is kept for consistency with the other interfaces (default: None) + :type mime_type: str + """ + + assert len(dst_object_name) > 0, f"Destination object name must be non-empty: '{dst_object_name}'" + + return dst_object_name + + @imports.inject("azure.storage.blob", pip_extra="azure") + def complete_multipart_upload(azure_blob, self, dst_object_name: str, upload_id: str, metadata: Optional[Any] = None) -> None: + """After all blocks of a blob are uploaded/staged with their unique block_id, + in order to complete the multipart upload, we commit them together. + + :param dst_object_name: name of the destination object, also is used to index into our block mappings + :type dst_object_name: str + :param upload_id: upload_id to index into our block id mappings, should be the same as the dst_object_name in Azure + :type upload_id: str + :param metadata: In Azure, this custom data is the blockID list (parts) and the object mime_type from the TransferJob instance (default: None) + :type metadata: Optional[Any] + """ + + assert upload_id == dst_object_name, "In Azure, upload_id should be the same as the blob name." + assert metadata is not None, "In Azure, the custom data should exist for multipart" + + # Decouple the custom data + block_list, mime_type = metadata + assert block_list != [], "The blockID list shouldn't be empty for Azure multipart" + block_list = list(map(lambda block_id: azure_blob.BlobBlock(block_id=block_id), block_list)) + + blob_client = self.blob_service_client.get_blob_client(container=self.container_name, blob=dst_object_name) + try: + # The below operation will create the blob from the uploaded blocks. + blob_client.commit_block_list(block_list=block_list, content_settings=azure_blob.ContentSettings(content_type=mime_type)) + except Exception as e: + raise exceptions.SkyplaneException(f"Failed to complete multipart upload for {dst_object_name}: {str(e)}") + + @staticmethod + def id_to_base64_encoding(part_number: int, dest_key: str) -> str: + """Azure expects all blockIDs to be Base64 strings. This function serves to convert the part numbers to + base64-encoded strings of the same length. 
The maximum number of blocks one blob supports in Azure is + 50,000 blocks, so the maximum length to pad zeroes to will be (#digits in 50,000 = len("50000") = 5) + len(dest_key) + + :param part_number: part number of the block, determined while splitting the date into chunks before the transfer + :type part_number: int + :param dest_key: destination object key, used to distinguish between different objects during concurrent uploads to the same container + """ + max_length = MAX_BLOCK_DIGITS + len(dest_key) + block_id = f"{part_number}{dest_key}" + block_id = block_id.ljust(max_length, "0") # pad with zeroes to get consistent length + block_id = block_id.encode("utf-8") + block_id = base64.b64encode(block_id).decode("utf-8") + return block_id diff --git a/skyplane/obj_store/cos_interface.py b/skyplane/obj_store/cos_interface.py index 9f9745cfb..e37d69c7d 100644 --- a/skyplane/obj_store/cos_interface.py +++ b/skyplane/obj_store/cos_interface.py @@ -2,7 +2,7 @@ import hashlib import os from functools import lru_cache -from typing import Iterator, List, Optional, Tuple +from typing import Any, Iterator, List, Optional, Tuple from skyplane import exceptions @@ -217,7 +217,7 @@ def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[st else: raise exceptions.SkyplaneException(f"Failed to initiate multipart upload for {dst_object_name}: {response}") - def complete_multipart_upload(self, dst_object_name, upload_id): + def complete_multipart_upload(self, dst_object_name, upload_id, metadata: Optional[Any] = None): print("complete multipart upload") cos_client = self._cos_client() all_parts = [] diff --git a/skyplane/obj_store/file_system_interface.py b/skyplane/obj_store/file_system_interface.py index 6f59ec67e..e04c8530c 100644 --- a/skyplane/obj_store/file_system_interface.py +++ b/skyplane/obj_store/file_system_interface.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Iterator, List, Optional +from typing import Any, Iterator, List, Optional from skyplane.obj_store.storage_interface import StorageInterface import os @@ -53,7 +53,7 @@ def delete_files(self, paths: List[str]): def initiate_multipart_upload(self, dst_object_name: str) -> str: raise ValueError("Multipart uploads not supported") - def complete_multipart_upload(self, dst_object_name: str, upload_id: str) -> None: + def complete_multipart_upload(self, dst_object_name: str, upload_id: str, metadata: Optional[Any] = None) -> None: raise ValueError("Multipart uploads not supported") @staticmethod diff --git a/skyplane/obj_store/gcs_interface.py b/skyplane/obj_store/gcs_interface.py index 6ce6efb71..14bc565c0 100644 --- a/skyplane/obj_store/gcs_interface.py +++ b/skyplane/obj_store/gcs_interface.py @@ -6,7 +6,7 @@ from xml.etree import ElementTree import requests -from typing import Iterator, List, Optional, Tuple +from typing import Any, Iterator, List, Optional, Tuple from skyplane import exceptions, compute from skyplane.config_paths import cloud_config @@ -254,7 +254,7 @@ def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[st response = self.send_xml_request(dst_object_name, {"uploads": None}, "POST", content_type=mime_type) return ElementTree.fromstring(response.content)[2].text - def complete_multipart_upload(self, dst_object_name, upload_id): + def complete_multipart_upload(self, dst_object_name, upload_id, metadata: Optional[Any] = None): # get parts xml_data = ElementTree.Element("CompleteMultipartUpload") next_part_number_marker = None diff --git 
a/skyplane/obj_store/hdfs_interface.py b/skyplane/obj_store/hdfs_interface.py index 0437994a5..45fe2a940 100644 --- a/skyplane/obj_store/hdfs_interface.py +++ b/skyplane/obj_store/hdfs_interface.py @@ -3,7 +3,7 @@ import os from pyarrow import fs from dataclasses import dataclass -from typing import Iterator, List, Optional +from typing import Any, Iterator, List, Optional from skyplane.exceptions import NoSuchObjectException from skyplane.obj_store.object_store_interface import ObjectStoreInterface, ObjectStoreObject from skyplane.utils import logger @@ -150,7 +150,7 @@ def write_file(self, file_name, data, offset=0): def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[str] = None) -> str: raise NotImplementedError(f"Multipart upload is not supported for the POSIX file system.") - def complete_multipart_upload(self, dst_object_name: str, upload_id: str) -> None: + def complete_multipart_upload(self, dst_object_name: str, upload_id: str, metadata: Optional[Any] = None) -> None: raise NotImplementedError(f"Multipart upload is not supported for the POSIX file system.") @lru_cache(maxsize=1024) diff --git a/skyplane/obj_store/object_store_interface.py b/skyplane/obj_store/object_store_interface.py index 99b52501e..cf8d7260b 100644 --- a/skyplane/obj_store/object_store_interface.py +++ b/skyplane/obj_store/object_store_interface.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from typing import Iterator, List, Optional, Tuple +from typing import Any, Iterator, List, Optional, Tuple from skyplane.obj_store.storage_interface import StorageInterface from skyplane.utils import logger @@ -82,5 +82,5 @@ def delete_objects(self, keys: List[str]): def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[str] = None) -> str: raise ValueError("Multipart uploads not supported") - def complete_multipart_upload(self, dst_object_name: str, upload_id: str) -> None: + def complete_multipart_upload(self, dst_object_name: str, upload_id: str, metadata: Optional[Any] = None) -> None: raise ValueError("Multipart uploads not supported") diff --git a/skyplane/obj_store/posix_file_interface.py b/skyplane/obj_store/posix_file_interface.py index c5a3656dd..1b31fcf3a 100644 --- a/skyplane/obj_store/posix_file_interface.py +++ b/skyplane/obj_store/posix_file_interface.py @@ -2,7 +2,7 @@ import os import sys from dataclasses import dataclass -from typing import Iterator, List, Optional +from typing import Any, Iterator, List, Optional from skyplane.exceptions import NoSuchObjectException from skyplane.obj_store.object_store_interface import ObjectStoreInterface, ObjectStoreObject import mimetypes @@ -139,7 +139,7 @@ def write_file(self, file_name, data, offset=0): def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[str] = None) -> str: raise NotImplementedError(f"Multipart upload is not supported for the POSIX file system.") - def complete_multipart_upload(self, dst_object_name: str, upload_id: str) -> None: + def complete_multipart_upload(self, dst_object_name: str, upload_id: str, metadata: Optional[Any] = None) -> None: raise NotImplementedError(f"Multipart upload is not supported for the POSIX file system.") @lru_cache(maxsize=1024) diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index 535d2afb7..430f7c98e 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -3,7 +3,7 @@ import os from functools import lru_cache -from typing import Iterator, List, Optional, Tuple 
+from typing import Any, Iterator, List, Optional, Tuple from skyplane import exceptions, compute from skyplane.exceptions import NoSuchObjectException @@ -43,6 +43,7 @@ def aws_region(self): logger.warning(f"Bucket location {self.bucket_name} is not public. Assuming region is {default_region}") return default_region logger.warning(f"Specified bucket {self.bucket_name} does not exist, got AWS error: {e}") + print("Error getting AWS region", e) raise exceptions.MissingBucketException(f"S3 bucket {self.bucket_name} does not exist") from e def region_tag(self): @@ -227,7 +228,7 @@ def initiate_multipart_upload(self, dst_object_name: str, mime_type: Optional[st else: raise exceptions.SkyplaneException(f"Failed to initiate multipart upload for {dst_object_name}: {response}") - def complete_multipart_upload(self, dst_object_name, upload_id): + def complete_multipart_upload(self, dst_object_name, upload_id, metadata: Optional[Any] = None): s3_client = self._s3_client() all_parts = [] while True: diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 1fa5a12f7..3ea1f743a 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -17,6 +17,8 @@ GatewayWriteObjectStore, GatewayReceive, GatewaySend, + GatewayGenData, + GatewayWriteLocal, ) from skyplane.api.transfer_job import TransferJob @@ -28,9 +30,6 @@ class Planner: - # Only supporting "aws:m5.", "azure:StandardD_v5", and "gcp:n2-standard" instances for now - _VCPUS = (96, 64, 48, 32, 16, 8, 4, 2) - def __init__(self, transfer_config: TransferConfig): self.transfer_config = transfer_config self.config = SkyplaneConfig.load_config(config_path) @@ -173,10 +172,10 @@ def _get_vm_type_and_instances( class UnicastDirectPlanner(Planner): # DO NOT USE THIS - broken for single-region transfers - def __init__(self, n_instances: int, n_connections: int): + def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): self.n_instances = n_instances self.n_connections = n_connections - super().__init__() + super().__init__(transfer_config) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # make sure only single destination @@ -242,26 +241,15 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): - - super().__init__(transfer_config) self.n_instances = n_instances self.n_connections = n_connections - - # Loading the quota information, add ibm cloud when it is supported - self.quota_limits = {} - with self.transfer_config.aws_vcpu_file.open("r") as f: - self.quota_limits["aws"] = json.load(f) - with self.transfer_config.gcp_vcpu_file.open("r") as f: - self.quota_limits["gcp"] = json.load(f) - with self.transfer_config.azure_vcpu_file.open("r") as f: - self.quota_limits["azure"] = json.load(f) - + super().__init__(transfer_config) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] + # jobs must have same sources and destinations for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" @@ -270,13 +258,17 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) # Dynammically calculate n_instances based on quota limits - vm_types, n_instances = 
self._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) + if src_region_tag.split(":")[0] == "test": + vm_types = None + n_instances = self.n_instances + else: + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): - plan.add_gateway(src_region_tag, vm_types[src_region_tag]) + plan.add_gateway(src_region_tag, vm_types[src_region_tag] if vm_types else None) for dst_region_tag in dst_region_tags: - plan.add_gateway(dst_region_tag, vm_types[dst_region_tag]) + plan.add_gateway(dst_region_tag, vm_types[dst_region_tag] if vm_types else None) # initialize gateway programs per region dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags} @@ -295,6 +287,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: obj_store_read = src_program.add_operator( GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id ) + # send to all destination mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id) dst_prefixes = job.dst_prefixes @@ -472,3 +465,22 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan.set_gateway_program(dst_region_tag, program) return plan + +class UnicastILPPlanner(Planner): + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + raise NotImplementedError("ILP solver not implemented yet") + + +class MulticastILPPlanner(Planner): + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + raise NotImplementedError("ILP solver not implemented yet") + + +class MulticastMDSTPlanner(Planner): + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + raise NotImplementedError("MDST solver not implemented yet") + + +class MulticastSteinerTreePlanner(Planner): + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + raise NotImplementedError("Steiner tree solver not implemented yet") From ed52e189726f6838710645580e4c96102feebc93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BCtfi=20Eren=20Erdo=C4=9Fan?= Date: Mon, 19 Jun 2023 18:57:00 -0700 Subject: [PATCH 07/15] added more fallback tests + using a file (#873) * Modified the tests so that they load from an actual quota file instead of me defining a dictionary. 
* Modified planner so that it can accept a file name for the quota limits (default to the skyplane config quota files) * Added more tests for error conditions (no quota file is provided + quota file is provided but the requested region is not included in the quota file) --------- Co-authored-by: Sarah Wooders Co-authored-by: Asim Biswal --- skyplane/planner/planner.py | 76 +++++---- tests/unit_nocloud/test_fall_back.py | 225 ++++++++++++++++++++++++--- 2 files changed, 250 insertions(+), 51 deletions(-) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 3ea1f743a..e8a41eb79 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -17,8 +17,6 @@ GatewayWriteObjectStore, GatewayReceive, GatewaySend, - GatewayGenData, - GatewayWriteLocal, ) from skyplane.api.transfer_job import TransferJob @@ -30,21 +28,27 @@ class Planner: - def __init__(self, transfer_config: TransferConfig): + def __init__(self, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): self.transfer_config = transfer_config self.config = SkyplaneConfig.load_config(config_path) + self.n_instances = self.config.get_flag("max_instances") # Loading the quota information, add ibm cloud when it is supported - self.quota_limits = {} - if os.path.exists(aws_quota_path): - with aws_quota_path.open("r") as f: - self.quota_limits["aws"] = json.load(f) - if os.path.exists(azure_standardDv5_quota_path): - with azure_standardDv5_quota_path.open("r") as f: - self.quota_limits["azure"] = json.load(f) - if os.path.exists(gcp_quota_path): - with gcp_quota_path.open("r") as f: - self.quota_limits["gcp"] = json.load(f) + quota_limits = {} + if quota_limits_file is not None: + with open(quota_limits_file, "r") as f: + quota_limits = json.load(f) + else: + if os.path.exists(aws_quota_path): + with aws_quota_path.open("r") as f: + quota_limits["aws"] = json.load(f) + if os.path.exists(azure_standardDv5_quota_path): + with azure_standardDv5_quota_path.open("r") as f: + quota_limits["azure"] = json.load(f) + if os.path.exists(gcp_quota_path): + with gcp_quota_path.open("r") as f: + quota_limits["gcp"] = json.load(f) + self.quota_limits = quota_limits # Loading the vcpu information - a dictionary of dictionaries # {"cloud_provider": {"instance_name": vcpu_cost}} @@ -83,10 +87,9 @@ def _get_quota_limits_for(self, cloud_provider: str, region: str, spot: bool = F :param spot: whether to use spot specified by the user config (default: False) :type spot: bool """ - quota_limits = self.quota_limits[cloud_provider] + quota_limits = self.quota_limits.get(cloud_provider, None) if not quota_limits: # User needs to reinitialize to save the quota information - logger.warning(f"Please run `skyplane init --reinit-{cloud_provider}` to load the quota information") return None if cloud_provider == "gcp": region_family = "-".join(region.split("-")[:2]) @@ -116,11 +119,16 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: cloud_provider=cloud_provider, region=region, spot=getattr(self.transfer_config, f"{cloud_provider}_use_spot_instances") ) + config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") + # No quota limits (quota limits weren't initialized properly during skyplane init) if quota_limit is None: - return None + logger.warning( + f"Quota limit file not found for {region_tag}. 
Try running `skyplane init --reinit-{cloud_provider}` to load the quota information" + ) + # return default instance type and number of instances + return config_vm_type, self.n_instances - config_vm_type = getattr(self.transfer_config, f"{cloud_provider}_instance_class") config_vcpus = self._vm_to_vcpus(cloud_provider, config_vm_type) if config_vcpus <= quota_limit: return config_vm_type, quota_limit // config_vcpus @@ -144,9 +152,7 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: ) return (vm_type, n_instances) - def _get_vm_type_and_instances( - self, src_region_tag: Optional[str] = None, dst_region_tags: Optional[List[str]] = None - ) -> Tuple[Dict[str, str], int]: + def _get_vm_type_and_instances(self, src_region_tag: str, dst_region_tags: List[str]) -> Tuple[Dict[str, str], int]: """Dynamically calculates the vm type each region can use (both the source region and all destination regions) based on their quota limits and calculates the number of vms to launch in all regions by conservatively taking the minimum of all regions to stay consistent. @@ -156,10 +162,16 @@ def _get_vm_type_and_instances( :param dst_region_tags: a list of the destination region tags (defualt: None) :type dst_region_tags: Optional[List[str]] """ + # One of them has to provided - assert src_region_tag is not None or dst_region_tags is not None, "There needs to be at least one source or destination" - src_tags = [src_region_tag] if src_region_tag is not None else [] - dst_tags = dst_region_tags or [] + # assert src_region_tag is not None or dst_region_tags is not None, "There needs to be at least one source or destination" + src_tags = [src_region_tag] # if src_region_tag is not None else [] + dst_tags = dst_region_tags # or [] + + assert len(src_region_tag.split(":")) == 2, f"Source region tag {src_region_tag} must be in the form of `cloud_provider:region`" + assert ( + len(dst_region_tags[0].split(":")) == 2 + ), f"Destination region tag {dst_region_tags} must be in the form of `cloud_provider:region`" # do_parallel returns tuples of (region_tag, (vm_type, n_instances)) vm_info = do_parallel(self._calculate_vm_types, src_tags + dst_tags) @@ -172,10 +184,10 @@ def _get_vm_type_and_instances( class UnicastDirectPlanner(Planner): # DO NOT USE THIS - broken for single-region transfers - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): + def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): + super().__init__(transfer_config, quota_limits_file) self.n_instances = n_instances self.n_connections = n_connections - super().__init__(transfer_config) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: # make sure only single destination @@ -184,6 +196,12 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() dst_region_tag = jobs[0].dst_ifaces[0].region_tag() + + assert len(src_region_tag.split(":")) == 2, f"Source region tag {src_region_tag} must be in the form of `cloud_provider:region`" + assert ( + len(dst_region_tag.split(":")) == 2 + ), f"Destination region tag {dst_region_tag} must be in the form of `cloud_provider:region`" + # jobs must have same sources and destinations for job in jobs[1:]: assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" @@ -241,10 +259,10 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): - def 
__init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): + def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): + super().__init__(transfer_config, quota_limits_file) self.n_instances = n_instances self.n_connections = n_connections - super().__init__(transfer_config) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() @@ -358,7 +376,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) # Dynammically calculate n_instances based on quota limits - vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag=src_region_tag) + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag, dst_region_tags) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): @@ -419,7 +437,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) # Dynammically calculate n_instances based on quota limits - vm_types, n_instances = self._get_vm_type_and_instances(dst_region_tags=dst_region_tags) + vm_types, n_instances = self._get_vm_type_and_instances(src_region_tag, dst_region_tags) # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions for i in range(n_instances): diff --git a/tests/unit_nocloud/test_fall_back.py b/tests/unit_nocloud/test_fall_back.py index 17a016f65..a5dc17294 100644 --- a/tests/unit_nocloud/test_fall_back.py +++ b/tests/unit_nocloud/test_fall_back.py @@ -1,17 +1,210 @@ +import json +import os + +import pytest from skyplane.api.config import TransferConfig -from skyplane.cli.cli_transfer import SkyplaneCLI -from skyplane.config import SkyplaneConfig from skyplane.planner.planner import MulticastDirectPlanner -def test_fall_back_multiple_limits_and_types(): - # Regions for each provider - regions = { - "aws": "us-east-1", - "azure": "uaenorth", - "gcp": "us-east1", - } +REGIONS = { + "aws": "us-east-1", + "azure": "uaenorth", + "gcp": "us-east1", +} +QUOTA_FILE = "test_quota_file" + + +@pytest.mark.skip(reason="Shared function") +def run_quota_tests(tests): + for test in tests: + quota_limits, expected_vm_types, expected_n_instances = test + + # Overwrite the test quota file - also creates one + with open(QUOTA_FILE, "w") as f: + f.write(json.dumps(quota_limits, indent=2)) + + transfer_config = TransferConfig() + planner = MulticastDirectPlanner(n_instances=8, n_connections=100, transfer_config=transfer_config, quota_limits_file=QUOTA_FILE) + + region_tags = [f"{p}:{REGIONS[p]}" for p in REGIONS.keys()] + for i, src_region_tag in enumerate(region_tags): + dst_region_tags = region_tags[:i] + region_tags[i + 1 :] + vm_types, n_instances = planner._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) + + assert vm_types == expected_vm_types, f"vm types are calculated wrong - expected: {expected_vm_types}, calculated: {vm_types}" + assert ( + n_instances == expected_n_instances + ), f"n_instances are calculated wrong - expected: {expected_n_instances}, calculated: {n_instances}" + + if os.path.exists(QUOTA_FILE): + # Delete the temporary quota file + os.remove(QUOTA_FILE) + + +def test_fall_back_no_quota_limit_exists(): + # 1. 
Test for when the quota limit file doesn't exist + no_quota_tests = [ + # No AWS + [ + { + "gcp": {"us-east1": 8}, + "azure": {"uaenorth": 4}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "aws:us-east-1": "m5.8xlarge", + "gcp:us-east1": "n2-standard-8", + }, + 1, + ], + # No GCP + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 10, + "spot_standard_vcpus": 10, + "region_name": "us-east-1", + } + ], + "azure": {"uaenorth": 8}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.2xlarge", + }, + 1, + ], + # No Azure + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 20, + "spot_standard_vcpus": 20, + "region_name": "us-east-1", + } + ], + "gcp": {"us-east1": 32}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.4xlarge", + }, + 1, + ], + # Only AWS + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 10, + "spot_standard_vcpus": 10, + "region_name": "us-east-1", + } + ], + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.2xlarge", + }, + 1, + ], + # Only GCP + [ + { + "gcp": {"us-east1": 32}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.8xlarge", + }, + 2, + ], + # Only Azure + [ + { + "azure": {"uaenorth": 8}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.8xlarge", + }, + 4, + ], + ] + # 2. Test for when the quota limit file exists but doesn't include a region + no_region_tests = [ + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 5, + "spot_standard_vcpus": 5, + "region_name": "us-west-1", # us-east-1 doesn't exist + } + ], + "gcp": {"us-east1": 8}, + "azure": {"uaenorth": 4}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "aws:us-east-1": "m5.8xlarge", + "gcp:us-east1": "n2-standard-8", + }, + 1, + ], + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 10, + "spot_standard_vcpus": 10, + "region_name": "us-east-1", + } + ], + "gcp": {"us-west1": 16}, # us-east1 doesn't exist + "azure": {"uaenorth": 8}, + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.2xlarge", + }, + 1, + ], + [ + { + "aws": [ + { + "on_demand_standard_vcpus": 20, + "spot_standard_vcpus": 20, + "region_name": "us-east-1", + } + ], + "gcp": {"us-east1": 32}, + "azure": {"westindia": 16}, # uaenorth doesn't exist + }, + { + "azure:uaenorth": "Standard_D2_v5", + "gcp:us-east1": "n2-standard-16", + "aws:us-east-1": "m5.4xlarge", + }, + 1, + ], + ] + + for test in (no_quota_tests, no_region_tests): + run_quota_tests(test) + + +def test_fall_back_multiple_limits_and_types(): # Different quota limits tests = [ [ @@ -73,16 +266,4 @@ def test_fall_back_multiple_limits_and_types(): ], ] - for test in tests: - quota_limits, expected_vm_types, expected_n_instances = test - transfer_config = TransferConfig() - planner = MulticastDirectPlanner(n_instances=8, n_connections=100, transfer_config=transfer_config) - planner.quota_limits = quota_limits - - region_tags = [f"{p}:{regions[p]}" for p in regions.keys()] - for i, src_region_tag in enumerate(region_tags): - dst_region_tags = region_tags[:i] + region_tags[i + 1 :] - vm_types, n_instances = planner._get_vm_type_and_instances(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags) - - assert vm_types == expected_vm_types, f"vm types are calculated wrong {vm_types}" - assert n_instances == expected_n_instances, f"n_instances 
are calculated wrong {n_instances}" + run_quota_tests(tests) From 4e68500e16d87bd6e6831292a2232cadb5e85f41 Mon Sep 17 00:00:00 2001 From: Shu Liu Date: Tue, 20 Jun 2023 20:54:17 -0700 Subject: [PATCH 08/15] add compression and encryption --- skyplane/api/dataplane.py | 2 +- skyplane/api/pipeline.py | 8 +++-- skyplane/chunk.py | 25 +++++++++---- skyplane/gateway/gateway_daemon.py | 12 ++++--- .../gateway/operators/gateway_operator.py | 36 +++++++++++-------- .../gateway/operators/gateway_receiver.py | 29 +++++++++++---- skyplane/planner/planner.py | 36 +++++++++++-------- 7 files changed, 97 insertions(+), 51 deletions(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index b09f0268a..613bd3e1e 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -117,7 +117,7 @@ def _start_gateway( gateway_docker_image=gateway_docker_image, gateway_program_path=str(gateway_program_filename), gateway_info_path=f"{gateway_log_dir}/gateway_info.json", - e2ee_key_bytes=None, # TODO: remove + e2ee_key_bytes=e2ee_key_bytes, # TODO: remove use_bbr=self.transfer_config.use_bbr, # TODO: remove use_compression=self.transfer_config.use_compression, use_socket_tls=self.transfer_config.use_socket_tls, diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 929a18832..acd59a6e8 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -39,6 +39,7 @@ def __init__( transfer_config: TransferConfig, # cloud_regions: dict, max_instances: Optional[int] = 1, + n_connections: Optional[int] = 128, planning_algorithm: Optional[str] = "direct", debug: Optional[bool] = False, ): @@ -54,6 +55,7 @@ def __init__( # self.cloud_regions = cloud_regions # TODO: set max instances with VM CPU limits and/or config self.max_instances = max_instances + self.n_connections = n_connections self.provisioner = provisioner self.transfer_config = transfer_config self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3)) @@ -68,11 +70,11 @@ def __init__( # planner self.planning_algorithm = planning_algorithm if self.planning_algorithm == "direct": - self.planner = MulticastDirectPlanner(self.max_instances, 64, self.transfer_config) + self.planner = MulticastDirectPlanner(self.max_instances, self.n_connections, self.transfer_config) elif self.planning_algorithm == "src_one_sided": - self.planner = DirectPlannerSourceOneSided(self.max_instances, 64, self.transfer_config) + self.planner = DirectPlannerSourceOneSided(self.max_instances, self.n_connections, self.transfer_config) elif self.planning_algorithm == "dst_one_sided": - self.planner = DirectPlannerDestOneSided(self.max_instances, 64, self.transfer_config) + self.planner = DirectPlannerDestOneSided(self.max_instances, self.n_connections, self.transfer_config) else: raise ValueError(f"No such planning algorithm {planning_algorithm}") diff --git a/skyplane/chunk.py b/skyplane/chunk.py index 8f84a3808..8b425a1a1 100644 --- a/skyplane/chunk.py +++ b/skyplane/chunk.py @@ -26,9 +26,13 @@ class Chunk: part_number: Optional[int] = None upload_id: Optional[str] = None # TODO: for broadcast, this is not used - def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, is_compressed: bool = False): + def to_wire_header(self, n_chunks_left_on_socket: int, wire_length: int, raw_wire_length: int, is_compressed: bool = False): return WireProtocolHeader( - chunk_id=self.chunk_id, data_len=wire_length, is_compressed=is_compressed, n_chunks_left_on_socket=n_chunks_left_on_socket + chunk_id=self.chunk_id, + 
data_len=wire_length, + raw_data_len=raw_wire_length, + is_compressed=is_compressed, + n_chunks_left_on_socket=n_chunks_left_on_socket, ) def as_dict(self): @@ -94,6 +98,7 @@ class WireProtocolHeader: chunk_id: str # 128bit UUID data_len: int # long + raw_data_len: int # long (uncompressed, unecrypted) is_compressed: bool # char n_chunks_left_on_socket: int # long @@ -110,8 +115,8 @@ def protocol_version(): @staticmethod def length_bytes(): - # magic (8) + protocol_version (4) + chunk_id (16) + data_len (8) + is_compressed (1) + n_chunks_left_on_socket (8) - return 8 + 4 + 16 + 8 + 1 + 8 + # magic (8) + protocol_version (4) + chunk_id (16) + data_len (8) + raw_data_len(8) + is_compressed (1) + n_chunks_left_on_socket (8) + return 8 + 4 + 16 + 8 + 8 + 1 + 8 @staticmethod def from_bytes(data: bytes): @@ -124,10 +129,15 @@ def from_bytes(data: bytes): raise ValueError(f"Invalid protocol version, got {version} but expected {WireProtocolHeader.protocol_version()}") chunk_id = data[12:28].hex() chunk_len = int.from_bytes(data[28:36], byteorder="big") - is_compressed = bool(int.from_bytes(data[36:37], byteorder="big")) - n_chunks_left_on_socket = int.from_bytes(data[37:45], byteorder="big") + raw_chunk_len = int.from_bytes(data[36:44], byteorder="big") + is_compressed = bool(int.from_bytes(data[44:45], byteorder="big")) + n_chunks_left_on_socket = int.from_bytes(data[45:53], byteorder="big") return WireProtocolHeader( - chunk_id=chunk_id, data_len=chunk_len, is_compressed=is_compressed, n_chunks_left_on_socket=n_chunks_left_on_socket + chunk_id=chunk_id, + data_len=chunk_len, + raw_data_len=raw_chunk_len, + is_compressed=is_compressed, + n_chunks_left_on_socket=n_chunks_left_on_socket, ) def to_bytes(self): @@ -138,6 +148,7 @@ def to_bytes(self): assert len(chunk_id_bytes) == 16 out_bytes += chunk_id_bytes out_bytes += self.data_len.to_bytes(8, byteorder="big") + out_bytes += self.raw_data_len.to_bytes(8, byteorder="big") out_bytes += self.is_compressed.to_bytes(1, byteorder="big") out_bytes += self.n_chunks_left_on_socket.to_bytes(8, byteorder="big") assert len(out_bytes) == WireProtocolHeader.length_bytes(), f"{len(out_bytes)} != {WireProtocolHeader.length_bytes()}" diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py index f4b34bd4c..bfde83887 100644 --- a/skyplane/gateway/gateway_daemon.py +++ b/skyplane/gateway/gateway_daemon.py @@ -36,9 +36,10 @@ def __init__( self, region: str, chunk_dir: PathLike, - max_incoming_ports=64, + max_incoming_ports=128, use_tls=True, - use_e2ee=False, + use_e2ee=True, # TODO: read from operator field + use_compression=True, # TODO: read from operator field ): # read gateway program gateway_program_path = Path(os.environ["GATEWAY_PROGRAM_FILE"]).expanduser() @@ -71,6 +72,7 @@ def __init__( e2ee_key_path = Path(os.environ["E2EE_KEY_FILE"]).expanduser() with open(e2ee_key_path, "rb") as f: self.e2ee_key_bytes = f.read() + print("Server side E2EE key loaded: ", self.e2ee_key_bytes) else: self.e2ee_key_bytes = None @@ -88,7 +90,7 @@ def __init__( error_queue=self.error_queue, max_pending_chunks=max_incoming_ports, use_tls=self.use_tls, - use_compression=False, # use_compression, + use_compression=use_compression, e2ee_key_bytes=self.e2ee_key_bytes, ) @@ -184,7 +186,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ region=self.region, input_queue=input_queue, output_queue=output_queue, - n_processes=1, # dummy wait thread, not actual reciever + n_processes=1, # dummy wait thread, not actual receiver 
chunk_store=self.chunk_store, error_event=self.error_event, error_queue=self.error_queue, @@ -230,7 +232,7 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_ error_queue=self.error_queue, chunk_store=self.chunk_store, use_tls=self.use_tls, - use_compression=False, # operator["compress"], + use_compression=op["compress"], e2ee_key_bytes=self.e2ee_key_bytes, n_processes=op["num_connections"], ) diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py index e722543d3..322b5c925 100644 --- a/skyplane/gateway/operators/gateway_operator.py +++ b/skyplane/gateway/operators/gateway_operator.py @@ -6,6 +6,7 @@ import socket import ssl import time +import lz4.frame import traceback from functools import partial from multiprocessing import Event, Process, Queue @@ -134,7 +135,7 @@ def process(self, chunk_req: ChunkRequest): return False # check to see if file is completed downloading - # Successfully recieved chunk 38400a29812142a486eaefcdebedf371, 161867776 0, 67108864 + # Successfully received chunk 38400a29812142a486eaefcdebedf371, 161867776 0, 67108864 with open(chunk_file_path, "rb") as f: data = f.read() if len(data) < chunk_req.chunk.chunk_length_bytes: @@ -144,7 +145,7 @@ def process(self, chunk_req: ChunkRequest): len(data) == chunk_req.chunk.chunk_length_bytes ), f"Downloaded chunk length does not match expected length: {len(data)}, {chunk_req.chunk.chunk_length_bytes}" print( - f"[{self.handle}:{self.worker_id}] Successfully recieved chunk {chunk_req.chunk.chunk_id}, {len(data)}, {chunk_req.chunk.chunk_length_bytes}" + f"[{self.handle}:{self.worker_id}] Successfully received chunk {chunk_req.chunk.chunk_id}, {len(data)}, {chunk_req.chunk.chunk_length_bytes}" ) return True @@ -328,17 +329,24 @@ def process(self, chunk_req: ChunkRequest, dst_host: str): assert len(data) == chunk.chunk_length_bytes, f"chunk {chunk_id} has size {len(data)} but should be {chunk.chunk_length_bytes}" wire_length = len(data) - # compressed_length = None - # if self.use_compression and self.region == chunk_req.src_region: - # data = lz4.frame.compress(data) - # wire_length = len(data) - # compressed_length = wire_length - # if self.e2ee_secretbox is not None and self.region == chunk_req.src_region: - # data = self.e2ee_secretbox.encrypt(data) - # wire_length = len(data) + raw_wire_length = wire_length + compressed_length = None + + if self.use_compression: + data = lz4.frame.compress(data) + wire_length = len(data) + compressed_length = wire_length + if self.e2ee_secretbox is not None: + data = self.e2ee_secretbox.encrypt(data) + wire_length = len(data) # send chunk header - header = chunk.to_wire_header(n_chunks_left_on_socket=len(chunk_ids) - idx - 1, wire_length=wire_length, is_compressed=False) + header = chunk.to_wire_header( + n_chunks_left_on_socket=len(chunk_ids) - idx - 1, + wire_length=wire_length, + raw_wire_length=raw_wire_length, + is_compressed=(compressed_length is not None), + ) # print(f"[sender-{self.worker_id}]:{chunk_id} sending chunk header {header}") header.to_socket(sock) # print(f"[sender-{self.worker_id}]:{chunk_id} sent chunk header") @@ -528,10 +536,10 @@ def process(self, chunk_req: ChunkRequest, **args): # else: # self.chunk_store.update_chunk_checksum(chunk_req.chunk.chunk_id, md5sum) - recieved_chunk_size = self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).stat().st_size + received_chunk_size = self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).stat().st_size assert ( - 
recieved_chunk_size == chunk_req.chunk.chunk_length_bytes - ), f"Downloaded chunk {chunk_req.chunk.chunk_id} to {fpath} has incorrect size (expected {chunk_req.chunk.chunk_length_bytes} but got {recieved_chunk_size}, {chunk_req.chunk.chunk_length_bytes})" + received_chunk_size == chunk_req.chunk.chunk_length_bytes + ), f"Downloaded chunk {chunk_req.chunk.chunk_id} to {fpath} has incorrect size (expected {chunk_req.chunk.chunk_length_bytes} but got {received_chunk_size}, {chunk_req.chunk.chunk_length_bytes})" logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_req.chunk.chunk_id} from {self.bucket_name}") return True diff --git a/skyplane/gateway/operators/gateway_receiver.py b/skyplane/gateway/operators/gateway_receiver.py index 168df31e1..aeed0e341 100644 --- a/skyplane/gateway/operators/gateway_receiver.py +++ b/skyplane/gateway/operators/gateway_receiver.py @@ -3,6 +3,7 @@ import socket import ssl import time +import lz4.frame import traceback from contextlib import closing from multiprocessing import Event, Process, Value, Queue @@ -152,8 +153,8 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): # TODO: this wont work # chunk_request = self.chunk_store.get_chunk_request(chunk_header.chunk_id) - # should_decrypt = self.e2ee_secretbox is not None and chunk_request.dst_region == self.region - # should_decompress = chunk_header.is_compressed and chunk_request.dst_region == self.region + should_decrypt = self.e2ee_secretbox is not None # and chunk_request.dst_region == self.region + should_decompress = chunk_header.is_compressed # and chunk_request.dst_region == self.region # wait for space # while self.chunk_store.remaining_bytes() < chunk_header.data_len * self.max_pending_chunks: @@ -170,7 +171,7 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): fpath = self.chunk_store.get_chunk_file_path(chunk_header.chunk_id) with fpath.open("wb") as f: socket_data_len = chunk_header.data_len - chunk_received_size = 0 + chunk_received_size, chunk_received_size_decompressed = 0, 0 to_write = bytearray(socket_data_len) to_write_view = memoryview(to_write) while socket_data_len > 0: @@ -187,6 +188,18 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): ) to_write = bytes(to_write) + if should_decrypt: + to_write = self.e2ee_secretbox.decrypt(to_write) + print(f"[receiver:{server_port}]:{chunk_header.chunk_id} Decrypting {len(to_write)} bytes") + + if should_decompress: + data_batch_decompressed = lz4.frame.decompress(to_write) + chunk_received_size_decompressed += len(data_batch_decompressed) + to_write = data_batch_decompressed + print( + f"[receiver:{server_port}]:{chunk_header.chunk_id} Decompressing {len(to_write)} bytes to {chunk_received_size_decompressed} bytes" + ) + # try to write data until successful while True: try: @@ -194,15 +207,17 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]): f.write(to_write) f.flush() + # check write succeeds + assert os.path.exists(fpath) + # check size file_size = os.path.getsize(fpath) - if file_size == chunk_header.data_len: + if file_size == chunk_header.raw_data_len: break - elif file_size >= chunk_header.data_len: - raise ValueError(f"[Gateway] File size {file_size} greater than chunk size {chunk_header.data_len}") + elif file_size >= chunk_header.raw_data_len: + raise ValueError(f"[Gateway] File size {file_size} greater than chunk size {chunk_header.raw_data_len}") except Exception as e: print(e) - print( f"[receiver:{server_port}]: No remaining space with bytes 
{self.chunk_store.remaining_bytes()} data len {chunk_header.data_len} max pending {self.max_pending_chunks}, total space {init_space}" ) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 1fa5a12f7..8e706ead4 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -220,13 +220,19 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) for i in range(n_instances): src_program.add_operator( - GatewaySend(target_gateway_id=dst_gateways[i].gateway_id, region=src_region_tag, num_connections=self.n_connections), + GatewaySend( + target_gateway_id=dst_gateways[i].gateway_id, + region=src_region_tag, + num_connections=self.n_connections, + compress=True, + encrypt=True, + ), parent_handle=mux_or, partition_id=partition_id, ) # dst region gateway program - recv_op = dst_program.add_operator(GatewayReceive(), partition_id=partition_id) + recv_op = dst_program.add_operator(GatewayReceive(decompress=True, decrypt=True), partition_id=partition_id) dst_program.add_operator( GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=recv_op, partition_id=partition_id ) @@ -242,22 +248,20 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig): - super().__init__(transfer_config) self.n_instances = n_instances self.n_connections = n_connections + self.transfer_config = transfer_config # Loading the quota information, add ibm cloud when it is supported - self.quota_limits = {} - with self.transfer_config.aws_vcpu_file.open("r") as f: - self.quota_limits["aws"] = json.load(f) - with self.transfer_config.gcp_vcpu_file.open("r") as f: - self.quota_limits["gcp"] = json.load(f) - with self.transfer_config.azure_vcpu_file.open("r") as f: - self.quota_limits["azure"] = json.load(f) - + # self.quota_limits = {} + # with self.transfer_config.aws_vcpu_file.open("r") as f: + # self.quota_limits["aws"] = json.load(f) + # with self.transfer_config.gcp_vcpu_file.open("r") as f: + # self.quota_limits["gcp"] = json.load(f) + # with self.transfer_config.azure_vcpu_file.open("r") as f: + # self.quota_limits["azure"] = json.load(f) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() @@ -327,13 +331,18 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: region=dst_region_tag, num_connections=int(self.n_connections / len(dst_gateways)), private_ip=private_ip, + compress=self.transfer_config.use_compression, + encrypt=self.transfer_config.use_e2ee, ), parent_handle=mux_or, partition_id=partition_id, ) # each gateway also recieves data from source - recv_op = dst_program[dst_region_tag].add_operator(GatewayReceive(), partition_id=partition_id) + recv_op = dst_program[dst_region_tag].add_operator( + GatewayReceive(decompress=self.transfer_config.use_compression, decrypt=self.transfer_config.use_e2ee), + partition_id=partition_id, + ) dst_program[dst_region_tag].add_operator( GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), parent_handle=recv_op, @@ -471,4 +480,3 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: for dst_region_tag, program in dst_program.items(): plan.set_gateway_program(dst_region_tag, program) return plan - From 923a533b0c70d30aa28810d9dec32b1ec6f60ed8 Mon Sep 17 00:00:00 2001 
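For reference, the compression/encryption patch above grows WireProtocolHeader from 45 to 53 bytes by inserting a raw_data_len field between data_len and is_compressed. The snippet below is a minimal, standalone Python sketch of that byte layout, not Skyplane's actual implementation: the MAGIC and VERSION constants are illustrative placeholders, and the pack/unpack helpers only mirror the offsets used in the from_bytes/to_bytes changes above.

import uuid

# Placeholder constants, assumed for illustration only; the real values live in WireProtocolHeader.
MAGIC = 0x534B59504C414E45  # 8-byte magic number (placeholder)
VERSION = 5                 # 4-byte protocol version (placeholder)


def pack_header(chunk_id: str, data_len: int, raw_data_len: int, is_compressed: bool, n_chunks_left: int) -> bytes:
    out = MAGIC.to_bytes(8, "big")           # magic (8)
    out += VERSION.to_bytes(4, "big")        # protocol_version (4)
    out += uuid.UUID(chunk_id).bytes         # chunk_id (16)
    out += data_len.to_bytes(8, "big")       # bytes on the wire, after compression/encryption (8)
    out += raw_data_len.to_bytes(8, "big")   # new field: original chunk length before compression/encryption (8)
    out += is_compressed.to_bytes(1, "big")  # is_compressed flag (1)
    out += n_chunks_left.to_bytes(8, "big")  # n_chunks_left_on_socket (8)
    assert len(out) == 53                    # 8 + 4 + 16 + 8 + 8 + 1 + 8
    return out


def unpack_header(data: bytes) -> dict:
    assert int.from_bytes(data[0:8], "big") == MAGIC, "invalid magic"
    assert int.from_bytes(data[8:12], "big") == VERSION, "unexpected protocol version"
    return {
        "chunk_id": data[12:28].hex(),
        "data_len": int.from_bytes(data[28:36], "big"),
        "raw_data_len": int.from_bytes(data[36:44], "big"),
        "is_compressed": bool(int.from_bytes(data[44:45], "big")),
        "n_chunks_left_on_socket": int.from_bytes(data[45:53], "big"),
    }


# Round-trip with illustrative values: a 4 KiB chunk compressed down to 512 bytes on the wire.
header = pack_header(uuid.uuid4().hex, data_len=512, raw_data_len=4096, is_compressed=True, n_chunks_left=3)
assert unpack_header(header)["raw_data_len"] == 4096

The receiver-side change above depends on this extra field: after decrypting and decompressing, the size of the file written to disk is checked against raw_data_len rather than the on-wire data_len.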
From: Lynn Liu Date: Tue, 20 Jun 2023 21:14:24 -0700 Subject: [PATCH 09/15] change back to 64 connections --- skyplane/api/pipeline.py | 2 +- skyplane/gateway/gateway_daemon.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index acd59a6e8..ca8a45c23 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -39,7 +39,7 @@ def __init__( transfer_config: TransferConfig, # cloud_regions: dict, max_instances: Optional[int] = 1, - n_connections: Optional[int] = 128, + n_connections: Optional[int] = 64, planning_algorithm: Optional[str] = "direct", debug: Optional[bool] = False, ): diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py index bfde83887..51af38c80 100644 --- a/skyplane/gateway/gateway_daemon.py +++ b/skyplane/gateway/gateway_daemon.py @@ -36,7 +36,7 @@ def __init__( self, region: str, chunk_dir: PathLike, - max_incoming_ports=128, + max_incoming_ports=64, use_tls=True, use_e2ee=True, # TODO: read from operator field use_compression=True, # TODO: read from operator field From d6c5430a78ffb9f1d1c26af3547a9644ab163835 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 20 Jun 2023 23:14:46 -0500 Subject: [PATCH 10/15] Add pytest integration tests (#874) --- skyplane/api/client.py | 2 +- skyplane/api/dataplane.py | 4 + skyplane/api/tracker.py | 206 ++++++++-------- skyplane/api/transfer_job.py | 39 +-- skyplane/cli/cli_cloud.py | 33 +++ skyplane/cli/cli_init.py | 2 +- skyplane/compute/aws/aws_cloud_provider.py | 1 + skyplane/gateway/gateway_program.py | 4 +- skyplane/obj_store/azure_blob_interface.py | 4 + skyplane/obj_store/gcs_interface.py | 6 +- skyplane/obj_store/r2_interface.py | 5 +- skyplane/obj_store/s3_interface.py | 14 +- skyplane/obj_store/storage_interface.py | 4 + skyplane/planner/planner.py | 34 +-- skyplane/utils/path.py | 2 + tests/integration/test_cp.py | 270 +++++++++++++++++++++ 16 files changed, 482 insertions(+), 148 deletions(-) create mode 100644 tests/integration/test_cp.py diff --git a/skyplane/api/client.py b/skyplane/api/client.py index f7afae7e1..d931e6625 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -100,7 +100,7 @@ def copy(self, src: str, dst: str, recursive: bool = False): pipeline = self.pipeline() pipeline.queue_copy(src, dst, recursive=recursive) - pipeline.start() + pipeline.start(progress=True) def object_store(self): return ObjectStore() diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index b09f0268a..4b4c3cd38 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -64,6 +64,8 @@ def __init__( self.topology = topology self.provisioner = provisioner self.transfer_config = transfer_config + # disable for azure + # TODO: remove this self.http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3)) self.provisioning_lock = threading.Lock() self.provisioned = False @@ -235,6 +237,7 @@ def copy_log(instance): instance.download_file("/tmp/gateway.stdout", out_file) instance.download_file("/tmp/gateway.stderr", err_file) + print("COPY GATEWAY LOGS") do_parallel(copy_log, self.bound_nodes.values(), n=-1) def deprovision(self, max_jobs: int = 64, spinner: bool = False): @@ -307,6 +310,7 @@ def run_async(self, jobs: List[TransferJob], hooks: Optional[TransferHook] = Non """ if not self.provisioned: logger.error("Dataplane must be pre-provisioned. 
Call dataplane.provision() before starting a transfer") + print("discord", jobs) tracker = TransferProgressTracker(self, jobs, self.transfer_config, hooks) self.pending_transfers.append(tracker) tracker.start() diff --git a/skyplane/api/tracker.py b/skyplane/api/tracker.py index 9f9d2e4a7..1354b90b1 100644 --- a/skyplane/api/tracker.py +++ b/skyplane/api/tracker.py @@ -135,21 +135,26 @@ def run(self): # "src_spot_instance": getattr(self.transfer_config, f"{src_cloud_provider}_use_spot_instances"), # "dst_spot_instance": getattr(self.transfer_config, f"{dst_cloud_provider}_use_spot_instances"), } + # TODO: eventually jobs should be able to be concurrently dispatched and executed + # however this will require being able to handle conflicting multipart uploads ids + + # initialize everything first + for job_uuid, job in self.jobs.items(): + self.job_chunk_requests[job_uuid] = {} + self.job_pending_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} + self.job_complete_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} + session_start_timestamp_ms = int(time.time() * 1000) - try: + for job_uuid, job in self.jobs.items(): # pre-dispatch chunks to begin pre-buffering chunks - chunk_streams = { - job_uuid: job.dispatch(self.dataplane, transfer_config=self.transfer_config) for job_uuid, job in self.jobs.items() - } - for job_uuid, job in self.jobs.items(): + try: + chunk_stream = job.dispatch(self.dataplane, transfer_config=self.transfer_config) logger.fs.debug(f"[TransferProgressTracker] Dispatching job {job.uuid}") - self.job_chunk_requests[job_uuid] = {} - self.job_pending_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} - self.job_complete_chunk_ids[job_uuid] = {region: set() for region in self.dataplane.topology.dest_region_tags} - - for chunk in chunk_streams[job_uuid]: + for chunk in chunk_stream: chunks_dispatched = [chunk] + # TODO: check chunk ID self.job_chunk_requests[job_uuid][chunk.chunk_id] = chunk + assert job_uuid in self.job_chunk_requests and chunk.chunk_id in self.job_chunk_requests[job_uuid] self.hooks.on_chunk_dispatched(chunks_dispatched) for region in self.dataplane.topology.dest_region_tags: self.job_pending_chunk_ids[job_uuid][region].add(chunk.chunk_id) @@ -157,108 +162,107 @@ def run(self): logger.fs.debug( f"[TransferProgressTracker] Job {job.uuid} dispatched with {len(self.job_chunk_requests[job_uuid])} chunk requests" ) + except Exception as e: + UsageClient.log_exception( + "dispatch job", + e, + args, + self.dataplane.topology.src_region_tag, + self.dataplane.topology.dest_region_tags[0], # TODO: support multiple destinations + session_start_timestamp_ms, + ) + raise e - except Exception as e: - UsageClient.log_exception( - "dispatch job", - e, - args, - self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags[0], # TODO: support multiple destinations - session_start_timestamp_ms, - ) - raise e - - self.hooks.on_dispatch_end() + self.hooks.on_dispatch_end() + + def monitor_single_dst_helper(dst_region): + start_time = time.time() + try: + self.monitor_transfer(dst_region) + except exceptions.SkyplaneGatewayException as err: + reformat_err = Exception(err.pretty_print_str()[37:]) + UsageClient.log_exception( + "monitor transfer", + reformat_err, + args, + self.dataplane.topology.src_region_tag, + dst_region, + session_start_timestamp_ms, + ) + raise err + except Exception as e: + UsageClient.log_exception( + 
"monitor transfer", e, args, self.dataplane.topology.src_region_tag, dst_region, session_start_timestamp_ms + ) + raise e + end_time = time.time() + + runtime_s = end_time - start_time + # transfer successfully completed + transfer_stats = { + "dst_region": dst_region, + "total_runtime_s": round(runtime_s, 4), + } + + results = [] + dest_regions = self.dataplane.topology.dest_region_tags + with ThreadPoolExecutor(max_workers=len(dest_regions)) as executor: + e2e_start_time = time.time() + try: + future_list = [executor.submit(monitor_single_dst_helper, dest) for dest in dest_regions] + for future in as_completed(future_list): + results.append(future.result()) + except Exception as e: + raise e + e2e_end_time = time.time() + transfer_stats = { + "total_runtime_s": e2e_end_time - e2e_start_time, + "throughput_gbits": self.query_bytes_dispatched() / (e2e_end_time - e2e_start_time) / GB * 8, + } + self.hooks.on_transfer_end() - def monitor_single_dst_helper(dst_region): - start_time = time.time() + start_time = int(time.time()) try: - self.monitor_transfer(dst_region) - except exceptions.SkyplaneGatewayException as err: - reformat_err = Exception(err.pretty_print_str()[37:]) + for job in self.jobs.values(): + logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}") + job.finalize() + except Exception as e: UsageClient.log_exception( - "monitor transfer", - reformat_err, + "finalize job", + e, args, self.dataplane.topology.src_region_tag, - dst_region, + self.dataplane.topology.dest_region_tags[0], session_start_timestamp_ms, ) - raise err + raise e + end_time = int(time.time()) + + # verify transfer + try: + for job in self.jobs.values(): + logger.fs.debug(f"[TransferProgressTracker] Verifying job {job.uuid}") + job.verify() except Exception as e: UsageClient.log_exception( - "monitor transfer", e, args, self.dataplane.topology.src_region_tag, dst_region, session_start_timestamp_ms + "verify job", + e, + args, + self.dataplane.topology.src_region_tag, + self.dataplane.topology.dest_region_tags[0], + session_start_timestamp_ms, ) raise e - end_time = time.time() - runtime_s = end_time - start_time # transfer successfully completed - transfer_stats = { - "dst_region": dst_region, - "total_runtime_s": round(runtime_s, 4), - } - - results = [] - dest_regions = self.dataplane.topology.dest_region_tags - with ThreadPoolExecutor(max_workers=len(dest_regions)) as executor: - e2e_start_time = time.time() - try: - future_list = [executor.submit(monitor_single_dst_helper, dest) for dest in dest_regions] - for future in as_completed(future_list): - results.append(future.result()) - except Exception as e: - raise e - e2e_end_time = time.time() - transfer_stats = { - "total_runtime_s": e2e_end_time - e2e_start_time, - "throughput_gbits": self.query_bytes_dispatched() / (e2e_end_time - e2e_start_time) / GB * 8, - } - self.hooks.on_transfer_end() - - start_time = int(time.time()) - try: - for job in self.jobs.values(): - logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}") - job.finalize() - except Exception as e: - UsageClient.log_exception( - "finalize job", - e, + UsageClient.log_transfer( + transfer_stats, args, self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags[0], + self.dataplane.topology.dest_region_tags, session_start_timestamp_ms, ) - raise e - end_time = int(time.time()) - - # verify transfer - try: - for job in self.jobs.values(): - logger.fs.debug(f"[TransferProgressTracker] Verifying job {job.uuid}") - job.verify() - except 
Exception as e: - UsageClient.log_exception( - "verify job", - e, - args, - self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags[0], - session_start_timestamp_ms, - ) - raise e - - # transfer successfully completed - UsageClient.log_transfer( - transfer_stats, - args, - self.dataplane.topology.src_region_tag, - self.dataplane.topology.dest_region_tags, - session_start_timestamp_ms, - ) - print_stats_completed(total_runtime_s=transfer_stats["total_runtime_s"], throughput_gbits=transfer_stats["throughput_gbits"]) + print_stats_completed(total_runtime_s=transfer_stats["total_runtime_s"], throughput_gbits=transfer_stats["throughput_gbits"]) @imports.inject("pandas") def monitor_transfer(pd, self, region_tag): @@ -299,13 +303,22 @@ def monitor_transfer(pd, self, region_tag): # update job_complete_chunk_ids and job_pending_chunk_ids # TODO: do chunk-tracking per-destination for job_uuid, job in self.jobs.items(): - job_complete_chunk_ids = set(chunk_id for chunk_id in completed_chunk_ids if self._chunk_to_job_map[chunk_id] == job_uuid) + try: + job_complete_chunk_ids = set( + chunk_id for chunk_id in completed_chunk_ids if self._chunk_to_job_map[chunk_id] == job_uuid + ) + except Exception as e: + raise e new_chunk_ids = ( self.job_complete_chunk_ids[job_uuid][region_tag] .union(job_complete_chunk_ids) .difference(self.job_complete_chunk_ids[job_uuid][region_tag]) ) completed_chunks = [] + for id in new_chunk_ids: + assert ( + job_uuid in self.job_chunk_requests and id in self.job_chunk_requests[job_uuid] + ), f"Missing chunk id {id} for job {job_uuid}: {self.job_chunk_requests}" for id in new_chunk_ids: completed_chunks.append(self.job_chunk_requests[job_uuid][id]) self.hooks.on_chunk_completed(completed_chunks, region_tag) @@ -319,7 +332,8 @@ def monitor_transfer(pd, self, region_tag): time.sleep(0.05) @property - @functools.lru_cache(maxsize=1) + # TODO: this is a very slow function, but we can't cache it since self.job_chunk_requests changes over time + # do not call it more often than necessary def _chunk_to_job_map(self): return {chunk_id: job_uuid for job_uuid, cr_dict in self.job_chunk_requests.items() for chunk_id in cr_dict.keys()} diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 0d6e50526..9d8fdeceb 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -325,8 +325,11 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> multipart_exit_event = threading.Event() multipart_chunk_threads = [] + # TODO: remove after azure multipart implemented + azure_dest = any([dst_iface.provider == "azure" for dst_iface in self.dst_ifaces]) + # start chunking threads - if self.transfer_config.multipart_enabled: + if not azure_dest and self.transfer_config.multipart_enabled: for _ in range(self.concurrent_multipart_chunk_threads): t = threading.Thread( target=self._run_multipart_chunk_thread, @@ -338,8 +341,13 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) -> # begin chunking loop for transfer_pair in transfer_pair_generator: + # print("transfer_pair", transfer_pair.src_obj.key, transfer_pair.dst_objs) src_obj = transfer_pair.src_obj - if self.transfer_config.multipart_enabled and src_obj.size > self.transfer_config.multipart_threshold_mb * MB: + if ( + not azure_dest + and self.transfer_config.multipart_enabled + and src_obj.size > self.transfer_config.multipart_threshold_mb * MB + ): multipart_send_queue.put(transfer_pair) else: if 
transfer_pair.src_obj.size == 0: @@ -460,13 +468,16 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4())), + job_id: Optional[str] = None, ): self.src_path = src_path self.dst_paths = dst_paths self.recursive = recursive self.requester_pays = requester_pays - self.uuid = uuid + if job_id is None: + self.uuid = str(uuid.uuid4()) + else: + self.uuid = job_id @property def transfer_type(self) -> str: @@ -559,9 +570,9 @@ def __init__( dst_paths: List[str] or str, recursive: bool = False, requester_pays: bool = False, - uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4())), + job_id: Optional[str] = None, ): - super().__init__(src_path, dst_paths, recursive, requester_pays, uuid) + super().__init__(src_path, dst_paths, recursive, requester_pays, job_id) self.transfer_list = [] self.multipart_transfer_list = [] @@ -645,6 +656,9 @@ def dispatch( # send chunk requests to source gateways chunk_batch = [cr.chunk for cr in batch if cr.chunk is not None] + # TODO: allow multiple partition ids per chunk + for chunk in chunk_batch: # assign job UUID as partition ID + chunk.partition_id = self.uuid min_idx = queue_size.index(min(queue_size)) n_added = 0 while n_added < len(chunk_batch): @@ -701,6 +715,9 @@ def complete_fn(batch): do_parallel(complete_fn, batches, n=8) + # TODO: Do NOT do this if we are pipelining multiple transfers - remove just what was completed + self.multipart_transfer_list = [] + def verify(self): """Verify the integrity of the transfered destination objects""" @@ -750,14 +767,8 @@ def size_gb(self): class SyncJob(CopyJob): """sync job that copies the source objects that does not exist in the destination bucket to the destination""" - def __init__( - self, - src_path: str, - dst_paths: List[str] or str, - requester_pays: bool = False, - uuid: str = field(init=False, default_factory=lambda: str(uuid.uuid4())), - ): - super().__init__(src_path, dst_paths, True, requester_pays, uuid) + def __init__(self, src_path: str, dst_paths: List[str] or str, requester_pays: bool = False, job_id: Optional[str] = None): + super().__init__(src_path, dst_paths, True, requester_pays, job_id) self.transfer_list = [] self.multipart_transfer_list = [] diff --git a/skyplane/cli/cli_cloud.py b/skyplane/cli/cli_cloud.py index 37687b8dd..87e6a1da8 100644 --- a/skyplane/cli/cli_cloud.py +++ b/skyplane/cli/cli_cloud.py @@ -3,6 +3,7 @@ """ import json +import uuid import subprocess import time from collections import defaultdict @@ -281,6 +282,38 @@ def azure_check( iface = AzureBlobInterface(account, container) print(iface.container_client.get_container_properties()) + # write object temorarily + # check skyplane AzureBlobInterface + print(f"\n{hline}\n[bold]Checking Skyplane AzureBlobInterface[/bold]\n{hline}") + from skyplane.obj_store.azure_blob_interface import AzureBlobInterface + + iface = AzureBlobInterface(account, container) + print(iface.container_client.get_container_properties()) + + # check if writeable + rprint(f"\n{hline}\n[bold]Checking Skyplane AzureBlobInterface write access[/bold]\n{hline}") + import tempfile + import random + import string + + def generate_random_string(length): + """Generate a random string of given length""" + letters = string.ascii_letters + return "".join(random.choice(letters) for _ in range(length)) + + def create_temp_file(size): + """Create a temporary file with random data""" + with tempfile.NamedTemporaryFile(delete=False) as 
temp_file: + temp_file_path = temp_file.name + random_data = generate_random_string(size) + temp_file.write(random_data.encode()) + return temp_file_path + + tmp_file = create_temp_file(1024) + tmp_object = f"skyplane-{uuid.uuid4()}" + iface.upload_object(tmp_file, tmp_object) + iface.delete_objects([tmp_object]) + @app.command() def gcp_check( diff --git a/skyplane/cli/cli_init.py b/skyplane/cli/cli_init.py index e946de451..90b973236 100644 --- a/skyplane/cli/cli_init.py +++ b/skyplane/cli/cli_init.py @@ -535,7 +535,7 @@ def init( cloud_config = load_gcp_config(cloud_config, force_init=reinit_gcp, non_interactive=non_interactive) # load cloudflare config - if not reinit_cloudflare: # TODO: fix reinit logic + if not reinit_cloudflare and not disable_config_cloudflare: # TODO: fix reinit logic typer.secho("\n(4) Configuring Cloudflare R2:", fg="yellow", bold=True) if not disable_config_cloudflare: cloud_config = load_cloudflare_config(cloud_config, non_interactive=non_interactive) diff --git a/skyplane/compute/aws/aws_cloud_provider.py b/skyplane/compute/aws/aws_cloud_provider.py index 780ef6305..90cc92012 100644 --- a/skyplane/compute/aws/aws_cloud_provider.py +++ b/skyplane/compute/aws/aws_cloud_provider.py @@ -229,6 +229,7 @@ def start_instance(subnet_id: str): elif "VcpuLimitExceeded" in str(e): raise skyplane_exceptions.InsufficientVCPUException() from e elif "Invalid IAM Instance Profile name" in str(e): + # TODO: suppress this logger.warning(str(e)) elif "InsufficientInstanceCapacity" in str(e): # try another subnet diff --git a/skyplane/gateway/gateway_program.py b/skyplane/gateway/gateway_program.py index a8c1808e4..f275b82e8 100644 --- a/skyplane/gateway/gateway_program.py +++ b/skyplane/gateway/gateway_program.py @@ -113,7 +113,7 @@ def __init__(self): def get_operators(self) -> List[GatewayOperator]: return list(self._ops.values()) - def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[int] = 0): + def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[str] = "default"): parent_op = self._ops[parent_handle] if parent_handle else None ops_handles = [] for op in ops: @@ -121,7 +121,7 @@ def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] return ops_handles - def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[int] = 0): + def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[str] = "default"): parent_op = self._ops[parent_handle] if parent_handle else None if not parent_op: # root operation self._plan[partition_id].append(op) diff --git a/skyplane/obj_store/azure_blob_interface.py b/skyplane/obj_store/azure_blob_interface.py index 04e8c3cae..a55a6fc8c 100644 --- a/skyplane/obj_store/azure_blob_interface.py +++ b/skyplane/obj_store/azure_blob_interface.py @@ -30,6 +30,10 @@ def __init__(self, account_name: str, container_name: str, max_concurrency=1): self.container_name = container_name self.max_concurrency = max_concurrency # parallel upload/downloads, seems to cause issues if too high + @property + def provider(self): + return "azure" + def path(self): return f"https://{self.account_name}.blob.core.windows.net/{self.container_name}" diff --git a/skyplane/obj_store/gcs_interface.py b/skyplane/obj_store/gcs_interface.py index 14bc565c0..7a79c332e 100644 --- a/skyplane/obj_store/gcs_interface.py +++ b/skyplane/obj_store/gcs_interface.py @@ 
-27,8 +27,10 @@ def __init__(self, bucket_name: str): self.auth = compute.GCPAuthentication() self._gcs_client = self.auth.get_storage_client() self._requests_session = requests.Session() - self.provider = "gcp" - # self.region_tag = self.a + + @property + def provider(self): + return "gcp" def path(self): return f"gs://{self.bucket_name}" diff --git a/skyplane/obj_store/r2_interface.py b/skyplane/obj_store/r2_interface.py index 063f3b431..af2e1a36b 100644 --- a/skyplane/obj_store/r2_interface.py +++ b/skyplane/obj_store/r2_interface.py @@ -39,7 +39,10 @@ def __init__(self, account_id: str, bucket_name: str): self.requester_pays = False self.bucket_name = bucket_name self.account_id = account_id - self.provider = "cloudflare" + + @property + def provider(self): + return "cloudflare" def path(self): return f"{self.endpoint_url}/{self.bucket_name}" diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index 430f7c98e..f502cd77e 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -24,7 +24,10 @@ def __init__(self, bucket_name: str): self.requester_pays = False self.bucket_name = bucket_name self._cached_s3_clients = {} - self.provider = "aws" + + @property + def provider(self): + return "aws" def path(self): return f"s3://{self.bucket_name}" @@ -42,9 +45,12 @@ def aws_region(self): if "An error occurred (AccessDenied) when calling the GetBucketLocation operation" in str(e): logger.warning(f"Bucket location {self.bucket_name} is not public. Assuming region is {default_region}") return default_region - logger.warning(f"Specified bucket {self.bucket_name} does not exist, got AWS error: {e}") - print("Error getting AWS region", e) - raise exceptions.MissingBucketException(f"S3 bucket {self.bucket_name} does not exist") from e + elif "An error occurred (InvalidAccessKeyId) when calling" in str(e): + logger.warning(f"Invalid AWS credentials. 
Check to make sure credentials configured properly.") + raise exceptions.PermissionsException(f"Invalid AWS credentials for accessing bucket {self.bucket_name}") + else: + logger.warning(f"Specified bucket {self.bucket_name} does not exist, got AWS error: {e}") + raise exceptions.MissingBucketException(f"S3 bucket {self.bucket_name} does not exist") from e def region_tag(self): return "aws:" + self.aws_region diff --git a/skyplane/obj_store/storage_interface.py b/skyplane/obj_store/storage_interface.py index c32b50f1d..a430573f8 100644 --- a/skyplane/obj_store/storage_interface.py +++ b/skyplane/obj_store/storage_interface.py @@ -6,6 +6,10 @@ class StorageInterface: def bucket(self) -> str: return self.bucket_name + @property + def provider(self) -> str: + raise NotImplementedError() + def region_tag(self) -> str: raise NotImplementedError() diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index e8a41eb79..6bca6c60a 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -140,8 +140,7 @@ def _calculate_vm_types(self, region_tag: str) -> Optional[Tuple[str, int]]: break # shouldn't happen, but just in case we use more complicated vm types in the future - if vm_type is None or vcpus is None: - return None + assert vm_type is not None and vcpus is not None # number of instances allowed by the quota with the selected vm type n_instances = quota_limit // vcpus @@ -228,7 +227,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: dst_bucket = job.dst_ifaces[0].bucket() # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) + partition_id = job.uuid # source region gateway program obj_store_read = src_program.add_operator( @@ -259,11 +258,12 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: class MulticastDirectPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): super().__init__(transfer_config, quota_limits_file) self.n_instances = n_instances self.n_connections = n_connections - + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = jobs[0].src_iface.region_tag() dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] @@ -299,7 +299,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_provider = src_region_tag.split(":")[0] # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) + partition_id = job.uuid # source region gateway program obj_store_read = src_program.add_operator( @@ -392,7 +392,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_provider = src_region_tag.split(":")[0] # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) + partition_id = job.uuid # source region gateway program obj_store_read = src_program.add_operator( @@ -453,7 +453,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: src_region_tag = job.src_iface.region_tag() src_provider = src_region_tag.split(":")[0] - partition_id = jobs.index(job) + partition_id = job.uuid # send to all destination dst_prefixes = job.dst_prefixes @@ -482,23 +482,3 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan: for dst_region_tag, program in dst_program.items(): plan.set_gateway_program(dst_region_tag, program) return plan - - -class UnicastILPPlanner(Planner): - def plan(self, jobs: List[TransferJob]) -> 
TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") - - -class MulticastILPPlanner(Planner): - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") - - -class MulticastMDSTPlanner(Planner): - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("MDST solver not implemented yet") - - -class MulticastSteinerTreePlanner(Planner): - def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("Steiner tree solver not implemented yet") diff --git a/skyplane/utils/path.py b/skyplane/utils/path.py index 3328fe956..9670934f8 100644 --- a/skyplane/utils/path.py +++ b/skyplane/utils/path.py @@ -66,6 +66,8 @@ def is_plausible_local_path(path_test: str): else: if not is_plausible_local_path(path): logger.warning(f"Local path '{path}' does not exist") + if path.startswith("az://"): + logger.warning(f"Did you mean azure://...? If not, assuming local path.") # path is subsitutute for bucket return "local", path, path diff --git a/tests/integration/test_cp.py b/tests/integration/test_cp.py new file mode 100644 index 000000000..0eed71598 --- /dev/null +++ b/tests/integration/test_cp.py @@ -0,0 +1,270 @@ +import pytest +from skyplane.api.config import TransferConfig +from skyplane.utils import logger +import time +from skyplane.api.client import SkyplaneClient +from skyplane.obj_store.object_store_interface import ObjectStoreInterface +import uuid +import os + +test_bucket = "gs://skyplane-test-bucket" # bucket containing test data +test_region_tag = "gcp:us-west2" + +# test cases +test_bucket_small_file = f"{test_bucket}/files_10000_size_4_mb" +test_bucket_large_file = f"{test_bucket}/file_1_size_16_gb" +test_bucket_medium_file = f"{test_bucket}/files_100_size_64_mb" +# test_bucket_empty_folder = f"{test_bucket}/empty_folder" + + +@pytest.mark.skip(reason="Shared function") +def setup_bucket(region_tag): + provider, region = region_tag.split(":") + if provider == "azure" or provider == "cloudflare": + bucket_name = f"{str(uuid.uuid4())[:8]}/{str(uuid.uuid4()).replace('-', '')}" + else: + bucket_name = f"integration{region}-{str(uuid.uuid4())[:8]}" + + # create bucket + try: + iface = ObjectStoreInterface.create(region_tag, bucket_name) + if provider == "cloudflare": + iface.create_bucket() + else: + iface.create_bucket(region) + except Exception as e: + logger.fs.error(f"Failed to create bucket {bucket_name}: {e}") + raise e + + return iface + + +@pytest.fixture(scope="session") +def bucket(region_tag): + iface = setup_bucket(region_tag) + yield iface + # cleanup + iface.delete_bucket() + + +@pytest.fixture(scope="session") +def same_region_bucket(): + iface = setup_bucket(test_region_tag) + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + yield iface + + # cleanup + iface.delete_bucket() + + +@pytest.fixture(scope="session") +def gcp_bucket(): + region_tag = "gcp:europe-west2" + iface = setup_bucket(region_tag) + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + yield iface + + # cleanup + iface.delete_bucket() + + +@pytest.fixture(scope="session") +def azure_bucket(): + azure_region_tag = "azure:westus" + iface = setup_bucket(azure_region_tag) + while not iface.bucket_exists(): + logger.fs.info(f"Waiting for bucket {iface.bucket()}") + time.sleep(1) + yield iface + # cleanup + iface.delete_bucket() + + +@pytest.fixture(scope="session") +def aws_bucket(): + aws_region_tag = "aws:us-west-2" + iface = 
setup_bucket(aws_region_tag) + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + + yield iface + # cleanup + iface.delete_bucket() + + +@pytest.fixture(scope="session") +def cloudflare_bucket(): + iface = setup_bucket("cloudflare:infer") + assert iface.bucket_exists(), f"Bucket {iface.bucket()} does not exist" + yield iface.bucket() + + # cleanup + iface.delete_bucket() + + +# TODO: add more parameters for bucket types +# @pytest.mark.parametrize( # tests large objects +# "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +# ) +@pytest.mark.parametrize("test_case, recursive", [(test_bucket_medium_file, True)]) +def test_azure(azure_bucket, gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + :param azure_bucket: destination interface + :param gcp_bucket: gcp bucket to copy FROM dstiface + :param test_case: test case from test_bucket to copy from + """ + print("DEST", azure_bucket.path(), gcp_bucket.path()) + client = SkyplaneClient() + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + print(azure_bucket.path()) + + assert isinstance(azure_bucket.bucket(), str), f"Bucket name is not a string {azure_bucket.bucket()}" + assert ( + len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_case} does not exist in {test_bucket}" + print("test case", test_case) + client.copy(test_case, f"{azure_bucket.path()}/{test_case}", recursive=recursive) + + # assert sync has cost zero + dst_objects = list(azure_bucket.list_objects()) + assert len(dst_objects) > 0, f"Object {test_case} not copied to {azure_bucket.bucket()}: only container {dst_objects}" + + print(f"gs://{gcp_bucket}/azure/{test_case}") + # copy back + client.copy(f"{azure_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/azure/", recursive=recursive) + + +@pytest.mark.parametrize( + "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +) +def test_aws(aws_bucket, gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + :param aws_bucket: destination interface + :param gcp_bucket: gcp bucket to copy FROM dstiface + :param test_case: test case from test_bucket to copy from + """ + client = SkyplaneClient() + src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1]) + print("test case:", test_case) + print(aws_bucket.path()) + + assert isinstance(aws_bucket.bucket(), str), f"Bucket name is not a string {aws_bucket.bucket()}" + assert ( + len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0 + ), f"Test case {test_case} does not exist in {test_bucket}" + client.copy(test_case, f"{aws_bucket.path()}/{test_case}", recursive=recursive) + + # assert sync has cost zero + dst_objects = list(aws_bucket.list_objects()) + assert len(dst_objects) > 0, f"Object {test_case} not copied to {aws_bucket.bucket()}: only container {dst_objects}" + + # copy back + client.copy(f"{aws_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/aws/", recursive=recursive) + + +@pytest.mark.parametrize( + "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)] +) +def test_cloudflare(cloudflare_bucket, gcp_bucket, test_case, recursive): + """ + Test copying a big file to different cloud providers + 
:param cloudflare_bucket: destination interface
+    :param gcp_bucket: gcp bucket to copy FROM dstiface
+    :param test_case: test case from test_bucket to copy from
+    """
+    client = SkyplaneClient()
+    src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1])
+
+    assert isinstance(cloudflare_bucket.bucket(), str), f"Bucket name is not a string {cloudflare_bucket.bucket()}"
+    assert (
+        len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0
+    ), f"Test case {test_case} does not exist in {test_bucket}"
+    client.copy(test_case, f"{cloudflare_bucket.path()}/{test_case}", recursive=recursive)
+
+    # assert sync has cost zero
+    dst_objects = list(cloudflare_bucket.list_objects())
+    assert len(dst_objects) > 0, f"Object {test_case} not copied to {cloudflare_bucket.bucket()}: only container {dst_objects}"
+
+    # copy back
+    client.copy(f"{cloudflare_bucket.path()}/{test_case}", f"gs://{gcp_bucket.bucket()}/cloudflare/", recursive=recursive)
+
+
+@pytest.mark.parametrize(
+    "test_case, recursive", [(test_bucket_medium_file, True), (test_bucket_large_file, False), (test_bucket_small_file, True)]
+)
+def test_gcp(gcp_bucket, test_case, recursive):
+    """
+    Test copying a big file to different cloud providers
+    :param gcp_bucket: destination interface
+    :param test_case: test case from test_bucket to copy from
+    """
+    client = SkyplaneClient()
+    src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1])
+
+    assert isinstance(gcp_bucket.bucket(), str), f"Bucket name is not a string {gcp_bucket.bucket()}"
+    assert (
+        len(list(src_iface.list_objects(prefix=test_case.replace(f"{test_bucket}/", "")))) > 0
+    ), f"Test case {test_case} does not exist in {test_bucket}"
+    client.copy(test_case, f"{gcp_bucket.path()}/{test_case}", recursive=recursive)
+
+    # assert sync has cost zero
+    dst_objects = list(gcp_bucket.list_objects())
+    assert len(dst_objects) > 0, f"Object {test_case} not copied to {gcp_bucket.bucket()}: only container {dst_objects}"
+
+
+@pytest.mark.timeout(60 * 20)
+def test_same_region(same_region_bucket):
+    client = SkyplaneClient()
+    client.copy(test_bucket_large_file, f"{same_region_bucket.path()}")
+
+
+@pytest.mark.timeout(60 * 20)
+def test_pipeline(gcp_bucket):
+    """Test pipeline's ability to run multiple copy jobs on a single dataplane"""
+    client = SkyplaneClient()
+    pipeline = client.pipeline()
+
+    # queue two copy jobs
+    pipeline.queue_copy(test_bucket_large_file, f"{gcp_bucket.path()}/large/")
+    pipeline.queue_copy(test_bucket_medium_file, f"{gcp_bucket.path()}/medium/", recursive=True)
+
+    # start pipeline
+    try:
+        pipeline.start(debug=True, progress=True)
+    except Exception as e:
+        print(e)
+        raise e
+
+    assert len(list(gcp_bucket.list_objects(prefix="large/"))) > 0, f"No data from {test_bucket_large_file} transferred"
+    assert len(list(gcp_bucket.list_objects(prefix="medium/"))) > 0, f"No data from {test_bucket_medium_file} transferred"
+
+
+# test one-sided transfers
+def test_cp_one_sided():
+    # TODO: run one-sided transfers between all cloud pairs
+    pass
+
+
+# test multiple VMs
+@pytest.mark.timeout(60 * 20)
+def test_cp_multiple_vms(aws_bucket):
+    client = SkyplaneClient()
+    pipeline = client.pipeline(max_instances=2)
+    pipeline.queue_copy(test_bucket_large_file, f"s3://{aws_bucket.bucket()}/")
+    pipeline.start(debug=True, progress=True)
+
+
+# test multicast
+# TODO: add azure
+@pytest.mark.timeout(60 * 20)
+def test_cp_multicast(aws_bucket, gcp_bucket, same_region_bucket):
+    client = SkyplaneClient()
+    src_iface = ObjectStoreInterface.create("gcp:us-west2", test_bucket.split("://")[1])
+    assert (
+        len(list(src_iface.list_objects(prefix=test_bucket_large_file.replace(f"{test_bucket}/", "")))) > 0
+    ), f"Test case {test_bucket_large_file} does not exist in {test_bucket}"
+    client.copy(
+        test_bucket_large_file, [f"s3://{aws_bucket.bucket()}/", f"gs://{gcp_bucket.bucket()}/", f"gs://{same_region_bucket.bucket()}/"]
+    )

From aa8a630b89ab8f4d20ab0cceb496e5bc17e88b5c Mon Sep 17 00:00:00 2001
From: Lynn Liu
Date: Tue, 20 Jun 2023 21:46:58 -0700
Subject: [PATCH 11/15] reformat

---
 skyplane/planner/planner.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py
index 7d37bc221..ca69d9259 100644
--- a/skyplane/planner/planner.py
+++ b/skyplane/planner/planner.py
@@ -264,12 +264,11 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
 
 
 class MulticastDirectPlanner(Planner):
-
     def __init__(self, n_instances: int, n_connections: int, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None):
         super().__init__(transfer_config, quota_limits_file)
         self.n_instances = n_instances
         self.n_connections = n_connections
-    
+
     def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
         src_region_tag = jobs[0].src_iface.region_tag()
         dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces]

From 22dca429ec7c0b4a90f25261826246d578e83551 Mon Sep 17 00:00:00 2001
From: Lynn Liu
Date: Tue, 20 Jun 2023 23:12:28 -0700
Subject: [PATCH 12/15] typo

---
 skyplane/gateway/gateway_daemon.py             | 8 ++++----
 skyplane/gateway/operators/gateway_operator.py | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/skyplane/gateway/gateway_daemon.py b/skyplane/gateway/gateway_daemon.py
index 51af38c80..06ef3e1ad 100644
--- a/skyplane/gateway/gateway_daemon.py
+++ b/skyplane/gateway/gateway_daemon.py
@@ -22,7 +22,7 @@
     GatewayWriteLocal,
     GatewayObjStoreReadOperator,
     GatewayObjStoreWriteOperator,
-    GatewayWaitReciever,
+    GatewayWaitReceiver,
 )
 from skyplane.gateway.operators.gateway_receiver import GatewayReceiver
 from skyplane.utils import logger
@@ -81,7 +81,7 @@ def __init__(
         self.num_required_terminal = {}
         self.operators = self.create_gateway_operators(gateway_program)
 
-        # single gateway reciever
+        # single gateway receiver
         self.gateway_receiver = GatewayReceiver(
             "reciever",
             region=region,
@@ -180,8 +180,8 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
 
             # create operators
             if op["op_type"] == "receive":
-                # wait for chunks from reciever
-                operators[handle] = GatewayWaitReciever(
+                # wait for chunks from receiver
+                operators[handle] = GatewayWaitReceiver(
                     handle=handle,
                     region=self.region,
                     input_queue=input_queue,
diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py
index 322b5c925..c574caebc 100644
--- a/skyplane/gateway/operators/gateway_operator.py
+++ b/skyplane/gateway/operators/gateway_operator.py
@@ -122,11 +122,11 @@ def process(self, chunk_req: ChunkRequest, **args):
         pass
 
 
-class GatewayWaitReciever(GatewayOperator):
+class GatewayWaitReceiver(GatewayOperator):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
-    # TODO: alternative (potentially better performnace) implementation: connect via queue with GatewayReciever to listen
+    # TODO: alternative (potentially better performance) implementation: connect via queue with GatewayReceiver to 
listen # for download completition events - join with chunk request queue from ChunkStore def process(self, chunk_req: ChunkRequest): chunk_file_path = self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id) From 41800911b4b25064adcb0d76d54ce4e62d9a26fc Mon Sep 17 00:00:00 2001 From: Lynn Liu Date: Sat, 8 Jul 2023 18:06:16 -0700 Subject: [PATCH 13/15] reformat --- skyplane/api/transfer_job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index fa906d92d..02b92d439 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -159,6 +159,7 @@ def _run_multipart_chunk_thread( # Convert parts to base64 and store mime_type if destination interface is AzureBlobInterface if dest_iface.provider == "azure": from skyplane.obj_store.azure_blob_interface import AzureBlobInterface + block_ids = list(map(lambda part_num: AzureBlobInterface.id_to_base64_encoding(part_num, dest_object.key), parts)) metadata = (block_ids, mime_type) From b82de44aa2ab3f13fcd310b88373697ba274616c Mon Sep 17 00:00:00 2001 From: Lynn Liu Date: Mon, 10 Jul 2023 16:00:24 -0700 Subject: [PATCH 14/15] remove print --- skyplane/api/dataplane.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index 2b4a84ea8..40530771b 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -236,8 +236,7 @@ def copy_log(instance): instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") instance.download_file("/tmp/gateway.stdout", out_file) instance.download_file("/tmp/gateway.stderr", err_file) - - print("COPY GATEWAY LOGS") + do_parallel(copy_log, self.bound_nodes.values(), n=-1) def deprovision(self, max_jobs: int = 64, spinner: bool = False): @@ -310,7 +309,6 @@ def run_async(self, jobs: List[TransferJob], hooks: Optional[TransferHook] = Non """ if not self.provisioned: logger.error("Dataplane must be pre-provisioned. Call dataplane.provision() before starting a transfer") - print("discord", jobs) tracker = TransferProgressTracker(self, jobs, self.transfer_config, hooks) self.pending_transfers.append(tracker) tracker.start() From 5825c62cfb56a83480be5fb2778d7da41c58d0a6 Mon Sep 17 00:00:00 2001 From: Lynn Liu Date: Mon, 10 Jul 2023 16:39:13 -0700 Subject: [PATCH 15/15] reformat --- skyplane/api/dataplane.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index 40530771b..df6fab67b 100644 --- a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -236,7 +236,7 @@ def copy_log(instance): instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") instance.download_file("/tmp/gateway.stdout", out_file) instance.download_file("/tmp/gateway.stderr", err_file) - + do_parallel(copy_log, self.bound_nodes.values(), n=-1) def deprovision(self, max_jobs: int = 64, spinner: bool = False):
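
One pattern worth calling out from the object-store diffs above: `StorageInterface` now exposes `provider` as a read-only property that raises `NotImplementedError`, and each concrete interface (S3, GCS, R2, Azure Blob) overrides it to return its provider tag, which is what lets call sites such as the Azure block-ID branch in `transfer_job.py` dispatch on `dest_iface.provider` instead of checking concrete types. Below is a minimal standalone sketch of that pattern; the `Fake*` class names and the `finalize_parts` helper are illustrative only and do not exist in the Skyplane codebase.

class BaseInterface:
    @property
    def provider(self) -> str:
        # Concrete interfaces are expected to override this with their provider tag.
        raise NotImplementedError()


class FakeS3Interface(BaseInterface):
    @property
    def provider(self) -> str:
        return "aws"


class FakeAzureInterface(BaseInterface):
    @property
    def provider(self) -> str:
        return "azure"


def finalize_parts(dest_iface: BaseInterface, parts):
    # Dispatch on the provider tag rather than on the concrete class,
    # mirroring the shape of the Azure-specific branch in the multipart upload path.
    # Here the Azure case maps part numbers to block-id strings; other providers
    # keep the raw part numbers.
    if dest_iface.provider == "azure":
        return [f"block-{p:05d}" for p in parts]
    return parts


assert finalize_parts(FakeS3Interface(), [1, 2]) == [1, 2]
assert finalize_parts(FakeAzureInterface(), [1, 2]) == ["block-00001", "block-00002"]

Keeping the base implementation as a property that raises makes a missing override fail loudly on first access rather than silently returning a stale attribute value.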