diff --git a/.github/workflows/integration-test-local.yml b/.github/workflows/integration-test-local.yml new file mode 100644 index 000000000..c73a9b9db --- /dev/null +++ b/.github/workflows/integration-test-local.yml @@ -0,0 +1,95 @@ +name: integration-test-local +on: [push] +concurrency: transfer-test +env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + SKYPLANE_USAGE_STATS_ENABLED: 0 +jobs: + integration: + if: (github.event_name == 'push' && github.ref == 'refs/heads/main') || (github.event_name == 'pull_request') + runs-on: ubuntu-latest + strategy: + max-parallel: 8 + matrix: + pairs: + # AWS + - aws:us-east-1 local + - local aws:us-east-2 + # GCP + - gcp:us-central1-a local + - local gcp:us-east1-a + # Azure + - azure:westus local + - local azure:westus + timeout-minutes: 40 + env: + STRATEGY_UUID: itest-${{ github.run_id }}-${{ github.run_attempt }}-${{ strategy.job-index }} + steps: + - uses: actions/checkout@v1 + - name: Install poetry + run: pipx install poetry + - name: Set up Python 3.10 + uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "poetry" + - name: Set Poetry config + run: | + poetry config virtualenvs.in-project false + poetry config virtualenvs.path ~/.virtualenvs + - name: Install Dependencies + run: poetry install -E aws -E azure -E gcp + if: steps.cache.outputs.cache-hit != 'true' + - id: 'auth' + uses: 'google-github-actions/auth@v1' + with: + credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}' + - name: Log into Azure + uses: azure/login@v1 + with: + creds: '{"clientId":"${{ secrets.AZURE_CLIENT_ID }}","clientSecret":"${{ secrets.AZURE_CLIENT_SECRET }}","subscriptionId":"${{ secrets.AZURE_SUBSCRIPTION_ID }}","tenantId":"${{ secrets.AZURE_TENANT_ID }}"}' + - name: Skyplane init + run: | + poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} + poetry run skyplane config set native_cmd_enabled false + cat ~/.skyplane/config + poetry run skyplane init -y + poetry run skyplane config set usage_stats false + - name: Single small file test + run: poetry run python tests/integration/cp_local.py ${{ matrix.pairs }} --n-files 1 --file-size-mb 32 + - name: 128 small files test + run: poetry run python tests/integration/cp_local.py ${{ matrix.pairs }} --n-files 128 --file-size-mb 1 + - name: Single large file test + run: poetry run python tests/integration/cp_local.py ${{ matrix.pairs }} --n-files 1 --file-size-mb 2000 + - name: Cleanup GCP service account + if: always() + run: gcloud iam service-accounts delete ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com + deprovision: + runs-on: ubuntu-latest + if: ${{ always() }} + needs: [integration] + env: + STRATEGY_UUID: itest-d-${{ github.run_id }}-${{ github.run_attempt }} + steps: + - uses: actions/checkout@v1 + - name: Set up Python 3.10 + uses: actions/setup-python@v4 + with: + python-version: "3.10" + - name: Install skyplane from pypi + run: pip install skyplane[aws,azure,gcp] + - id: 'auth' + uses: 'google-github-actions/auth@v1' + with: + credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}' + - name: Skyplane init + run: | + skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }} + skyplane init -y --disable-config-azure + skyplane config set usage_stats false + - name: Deprovision + run: skyplane deprovision + - name: Cleanup GCP service account + if: always() + run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ 
secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com diff --git a/.github/workflows/integration-test-multiple-sizes.yml b/.github/workflows/integration-test-multiple-sizes.yml index bab896fe6..3afe5245e 100644 --- a/.github/workflows/integration-test-multiple-sizes.yml +++ b/.github/workflows/integration-test-multiple-sizes.yml @@ -7,28 +7,27 @@ env: SKYPLANE_USAGE_STATS_ENABLED: 0 jobs: integration: - if: (github.event_name == 'push' && github.ref == 'refs/heads/main') || (github.event_name == 'pull_request') runs-on: ubuntu-latest strategy: max-parallel: 1 matrix: pairs: # AWS to AWS - - aws:us-east-1 aws:us-east-1 - - aws:us-east-2 aws:us-west-2 + - aws:us-east-1 aws:us-east-1 --multipart + - aws:us-east-2 aws:us-west-2 --multipart # GCP to GCP - - gcp:us-central1-a gcp:us-central1-a - - gcp:us-west1-a gcp:us-east1-a + - gcp:us-central1-a gcp:us-central1-a --multipart + - gcp:us-west1-a gcp:us-east1-a --multipart # Azure to Azure - #- azure:westus azure:westus - #- azure:eastus azure:westus + - azure:westus azure:westus + - azure:eastus azure:westus # cross cloud tests - - aws:us-west-1 gcp:us-west2-a - - gcp:us-west2-a aws:us-west-1 - #- aws:us-west-1 azure:westus - #- azure:westus aws:us-west-1 - #- gcp:us-west2-a azure:westus - #- azure:westus gcp:us-west2-a + - aws:us-west-1 gcp:us-west2-a --multipart + - gcp:us-west2-a aws:us-west-1 --multipart + - aws:us-west-1 azure:westus + - azure:westus aws:us-west-1 + - gcp:us-west2-a azure:westus + - azure:westus gcp:us-west2-a timeout-minutes: 40 env: STRATEGY_UUID: itest-${{ github.run_id }}-${{ github.run_attempt }}-${{ strategy.job-index }} diff --git a/poetry.lock b/poetry.lock index 610f3aa08..2e489065b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -299,18 +299,18 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.26.123" +version = "1.26.127" description = "The AWS SDK for Python" category = "main" optional = false python-versions = ">= 3.7" files = [ - {file = "boto3-1.26.123-py3-none-any.whl", hash = "sha256:2ccb1827dc9d654970375b9bbe1bd77eb6a50ec6fa76cf227a34b4bff144814f"}, - {file = "boto3-1.26.123.tar.gz", hash = "sha256:51722a3a791108236c8ce05e0ced030b13c08276001132d4153d29353038f8cc"}, + {file = "boto3-1.26.127-py3-none-any.whl", hash = "sha256:ded836536be41de9f9bd6a75e9feeb74d61b8c58ed7dc4ea89095082d7a616af"}, + {file = "boto3-1.26.127.tar.gz", hash = "sha256:ed31b2d35aad31418bd8093c5732c6296f785cc234333330df6b81396424d93b"}, ] [package.dependencies] -botocore = ">=1.29.123,<1.30.0" +botocore = ">=1.29.127,<1.30.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -319,14 +319,14 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.29.123" +version = "1.29.127" description = "Low-level, data-driven core of boto 3." 
category = "main" optional = false python-versions = ">= 3.7" files = [ - {file = "botocore-1.29.123-py3-none-any.whl", hash = "sha256:88f743e1db5bb65e3770af674400642f1d09fca159a1508bdbb8f8cc691589ca"}, - {file = "botocore-1.29.123.tar.gz", hash = "sha256:b69cdf1c6f451ab1314a363bf97753dfbfe1da93d33853ed241edbdcb806867d"}, + {file = "botocore-1.29.127-py3-none-any.whl", hash = "sha256:cf41d871b2a17d40bd579ce44dace18c875659ac13139a66680540fdf6f1c304"}, + {file = "botocore-1.29.127.tar.gz", hash = "sha256:d2f9d00df16058cb4a3572a66bb1831e846e5aaa7c0a3033dd47f9a80e2dd58b"}, ] [package.dependencies] @@ -800,14 +800,14 @@ testing = ["pre-commit"] [[package]] name = "flask" -version = "2.2.4" +version = "2.2.5" description = "A simple framework for building complex web applications." category = "main" optional = true python-versions = ">=3.7" files = [ - {file = "Flask-2.2.4-py3-none-any.whl", hash = "sha256:13f6329ddbfff11340939cd11919daf150a01358ded4b7e81c03c055dfecb559"}, - {file = "Flask-2.2.4.tar.gz", hash = "sha256:77504c4c097f56ac5f29b00f9009213010cf9d2923a288c0e0564a5db2bb53d6"}, + {file = "Flask-2.2.5-py3-none-any.whl", hash = "sha256:58107ed83443e86067e41eff4631b058178191a355886f8e479e347fa1285fdf"}, + {file = "Flask-2.2.5.tar.gz", hash = "sha256:edee9b0a7ff26621bd5a8c10ff484ae28737a2410d99b0bb9a6850c7fb977aa0"}, ] [package.dependencies] @@ -1001,14 +1001,14 @@ protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4 [[package]] name = "google-cloud-storage" -version = "2.8.0" +version = "2.9.0" description = "Google Cloud Storage API client library" category = "main" optional = true python-versions = ">=3.7" files = [ - {file = "google-cloud-storage-2.8.0.tar.gz", hash = "sha256:4388da1ff5bda6d729f26dbcaf1bfa020a2a52a7b91f0a8123edbda51660802c"}, - {file = "google_cloud_storage-2.8.0-py2.py3-none-any.whl", hash = "sha256:248e210c13bc109909160248af546a91cb2dabaf3d7ebbf04def9dd49f02dbb6"}, + {file = "google-cloud-storage-2.9.0.tar.gz", hash = "sha256:9b6ae7b509fc294bdacb84d0f3ea8e20e2c54a8b4bbe39c5707635fec214eff3"}, + {file = "google_cloud_storage-2.9.0-py2.py3-none-any.whl", hash = "sha256:83a90447f23d5edd045e0037982c270302e3aeb45fc1288d2c2ca713d27bad94"}, ] [package.dependencies] @@ -2164,25 +2164,25 @@ testing = ["google-api-core[grpc] (>=1.31.5)"] [[package]] name = "protobuf" -version = "4.22.3" +version = "4.22.4" description = "" category = "main" optional = true python-versions = ">=3.7" files = [ - {file = "protobuf-4.22.3-cp310-abi3-win32.whl", hash = "sha256:8b54f56d13ae4a3ec140076c9d937221f887c8f64954673d46f63751209e839a"}, - {file = "protobuf-4.22.3-cp310-abi3-win_amd64.whl", hash = "sha256:7760730063329d42a9d4c4573b804289b738d4931e363ffbe684716b796bde51"}, - {file = "protobuf-4.22.3-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:d14fc1a41d1a1909998e8aff7e80d2a7ae14772c4a70e4bf7db8a36690b54425"}, - {file = "protobuf-4.22.3-cp37-abi3-manylinux2014_aarch64.whl", hash = "sha256:70659847ee57a5262a65954538088a1d72dfc3e9882695cab9f0c54ffe71663b"}, - {file = "protobuf-4.22.3-cp37-abi3-manylinux2014_x86_64.whl", hash = "sha256:13233ee2b9d3bd9a5f216c1fa2c321cd564b93d8f2e4f521a85b585447747997"}, - {file = "protobuf-4.22.3-cp37-cp37m-win32.whl", hash = "sha256:ecae944c6c2ce50dda6bf76ef5496196aeb1b85acb95df5843cd812615ec4b61"}, - {file = "protobuf-4.22.3-cp37-cp37m-win_amd64.whl", hash = "sha256:d4b66266965598ff4c291416be429cef7989d8fae88b55b62095a2331511b3fa"}, - {file = "protobuf-4.22.3-cp38-cp38-win32.whl", hash = 
"sha256:f08aa300b67f1c012100d8eb62d47129e53d1150f4469fd78a29fa3cb68c66f2"}, - {file = "protobuf-4.22.3-cp38-cp38-win_amd64.whl", hash = "sha256:f2f4710543abec186aee332d6852ef5ae7ce2e9e807a3da570f36de5a732d88e"}, - {file = "protobuf-4.22.3-cp39-cp39-win32.whl", hash = "sha256:7cf56e31907c532e460bb62010a513408e6cdf5b03fb2611e4b67ed398ad046d"}, - {file = "protobuf-4.22.3-cp39-cp39-win_amd64.whl", hash = "sha256:e0e630d8e6a79f48c557cd1835865b593d0547dce221c66ed1b827de59c66c97"}, - {file = "protobuf-4.22.3-py3-none-any.whl", hash = "sha256:52f0a78141078077cfe15fe333ac3e3a077420b9a3f5d1bf9b5fe9d286b4d881"}, - {file = "protobuf-4.22.3.tar.gz", hash = "sha256:23452f2fdea754a8251d0fc88c0317735ae47217e0d27bf330a30eec2848811a"}, + {file = "protobuf-4.22.4-cp310-abi3-win32.whl", hash = "sha256:a4e661247896c2ffea4b894bca2d8657e752bedb8f3c66d7befa2557291be1e8"}, + {file = "protobuf-4.22.4-cp310-abi3-win_amd64.whl", hash = "sha256:7b42086d6027be2730151b49f27b2f5be40f3b036adf7b8da5917f4567f268c3"}, + {file = "protobuf-4.22.4-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:4bfb28d48628deacdb66a95aaa7b6640f3dc82b4edd34db444c7a3cdd90b01fb"}, + {file = "protobuf-4.22.4-cp37-abi3-manylinux2014_aarch64.whl", hash = "sha256:e98e26328d7c668541d1052b02de4205b1094ef6b2ce57167440d3e39876db48"}, + {file = "protobuf-4.22.4-cp37-abi3-manylinux2014_x86_64.whl", hash = "sha256:8fd329e5dd7b6c4b878cab4b85bb6cec880e2adaf4e8aa2c75944dcbb05e1ff1"}, + {file = "protobuf-4.22.4-cp37-cp37m-win32.whl", hash = "sha256:b7728b5da9eee15c0aa3baaee79e94fa877ddcf7e3d2f34b1eab586cd26eea89"}, + {file = "protobuf-4.22.4-cp37-cp37m-win_amd64.whl", hash = "sha256:f4a711588c3a79b6f9c44af4d7f4a2ae868e27063654683932ab6462f90e9656"}, + {file = "protobuf-4.22.4-cp38-cp38-win32.whl", hash = "sha256:11b28b4e779d7f275e3ea0efa3938f4d4e8ed3ca818f9fec3b193f8e9ada99fd"}, + {file = "protobuf-4.22.4-cp38-cp38-win_amd64.whl", hash = "sha256:144d5b46df5e44f914f715accaadf88d617242ba5a40cacef4e8de7effa79954"}, + {file = "protobuf-4.22.4-cp39-cp39-win32.whl", hash = "sha256:5128b4d5efcaef92189e076077ae389700606ff81d2126b8361dc01f3e026197"}, + {file = "protobuf-4.22.4-cp39-cp39-win_amd64.whl", hash = "sha256:9537ae27d43318acf8ce27d0359fe28e6ebe4179c3350bc055bb60ff4dc4fcd3"}, + {file = "protobuf-4.22.4-py3-none-any.whl", hash = "sha256:3b21074b7fb748d8e123acaef9fa63a84fdc1436dc71199d2317b139f77dd6f4"}, + {file = "protobuf-4.22.4.tar.gz", hash = "sha256:21fbaef7f012232eb8d6cb8ba334e931fc6ff8570f5aaedc77d5b22a439aa909"}, ] [[package]] @@ -2527,21 +2527,21 @@ docs = ["Sphinx (>=3.3,<4.0)", "sphinx-autobuild (>=2020.9.1,<2021.0.0)", "sphin [[package]] name = "requests" -version = "2.29.0" +version = "2.30.0" description = "Python HTTP for Humans." 
category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "requests-2.29.0-py3-none-any.whl", hash = "sha256:e8f3c9be120d3333921d213eef078af392fba3933ab7ed2d1cba3b56f2568c3b"}, - {file = "requests-2.29.0.tar.gz", hash = "sha256:f2e34a75f4749019bb0e3effb66683630e4ffeaf75819fb51bebef1bf5aef059"}, + {file = "requests-2.30.0-py3-none-any.whl", hash = "sha256:10e94cc4f3121ee6da529d358cdaeaff2f1c409cd377dbc72b825852f2f7e294"}, + {file = "requests-2.30.0.tar.gz", hash = "sha256:239d7d4458afcb28a692cdd298d87542235f4ca8d36d03a15bfc128a6559a2f4"}, ] [package.dependencies] certifi = ">=2017.4.17" charset-normalizer = ">=2,<4" idna = ">=2.5,<4" -urllib3 = ">=1.21.1,<1.27" +urllib3 = ">=1.21.1,<3" [package.extras] socks = ["PySocks (>=1.5.6,!=1.5.7)"] @@ -2603,14 +2603,14 @@ pyasn1 = ">=0.1.3" [[package]] name = "s3transfer" -version = "0.6.0" +version = "0.6.1" description = "An Amazon S3 Transfer Manager" category = "main" optional = false python-versions = ">= 3.7" files = [ - {file = "s3transfer-0.6.0-py3-none-any.whl", hash = "sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd"}, - {file = "s3transfer-0.6.0.tar.gz", hash = "sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947"}, + {file = "s3transfer-0.6.1-py3-none-any.whl", hash = "sha256:3c0da2d074bf35d6870ef157158641178a4204a6e689e82546083e31e0311346"}, + {file = "s3transfer-0.6.1.tar.gz", hash = "sha256:640bb492711f4c0c0905e1f62b6aaeb771881935ad27884852411f8e9cacbca9"}, ] [package.dependencies] @@ -2778,24 +2778,25 @@ files = [ [[package]] name = "typer" -version = "0.7.0" +version = "0.9.0" description = "Typer, build great CLIs. Easy to code. Based on Python type hints." category = "main" optional = false python-versions = ">=3.6" files = [ - {file = "typer-0.7.0-py3-none-any.whl", hash = "sha256:b5e704f4e48ec263de1c0b3a2387cd405a13767d2f907f44c1a08cbad96f606d"}, - {file = "typer-0.7.0.tar.gz", hash = "sha256:ff797846578a9f2a201b53442aedeb543319466870fbe1c701eab66dd7681165"}, + {file = "typer-0.9.0-py3-none-any.whl", hash = "sha256:5d96d986a21493606a358cae4461bd8cdf83cbf33a5aa950ae629ca3b51467ee"}, + {file = "typer-0.9.0.tar.gz", hash = "sha256:50922fd79aea2f4751a8e0408ff10d2662bd0c8bbfa84755a699f3bada2978b2"}, ] [package.dependencies] click = ">=7.1.1,<9.0.0" +typing-extensions = ">=3.7.4.3" [package.extras] -all = ["colorama (>=0.4.3,<0.5.0)", "rich (>=10.11.0,<13.0.0)", "shellingham (>=1.3.0,<2.0.0)"] +all = ["colorama (>=0.4.3,<0.5.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"] dev = ["autoflake (>=1.3.1,<2.0.0)", "flake8 (>=3.8.3,<4.0.0)", "pre-commit (>=2.17.0,<3.0.0)"] doc = ["cairosvg (>=2.5.2,<3.0.0)", "mdx-include (>=1.4.1,<2.0.0)", "mkdocs (>=1.1.2,<2.0.0)", "mkdocs-material (>=8.1.4,<9.0.0)", "pillow (>=9.3.0,<10.0.0)"] -test = ["black (>=22.3.0,<23.0.0)", "coverage (>=6.2,<7.0)", "isort (>=5.0.6,<6.0.0)", "mypy (==0.910)", "pytest (>=4.4.0,<8.0.0)", "pytest-cov (>=2.10.0,<5.0.0)", "pytest-sugar (>=0.9.4,<0.10.0)", "pytest-xdist (>=1.32.0,<4.0.0)", "rich (>=10.11.0,<13.0.0)", "shellingham (>=1.3.0,<2.0.0)"] +test = ["black (>=22.3.0,<23.0.0)", "coverage (>=6.2,<7.0)", "isort (>=5.0.6,<6.0.0)", "mypy (==0.910)", "pytest (>=4.4.0,<8.0.0)", "pytest-cov (>=2.10.0,<5.0.0)", "pytest-sugar (>=0.9.4,<0.10.0)", "pytest-xdist (>=1.32.0,<4.0.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"] [[package]] name = "typing-extensions" diff --git a/skyplane/api/dataplane.py b/skyplane/api/dataplane.py index 289857ff1..1b2003790 100644 --- 
a/skyplane/api/dataplane.py +++ b/skyplane/api/dataplane.py @@ -226,7 +226,7 @@ def copy_gateway_logs(self): # copy logs from all gateways in parallel def copy_log(instance): out_file = self.transfer_dir / f"gateway_{instance.uuid()}.stdout" - err_file = self.transfer_dir / f"gateway_{instance.uuid()}.stdout" + err_file = self.transfer_dir / f"gateway_{instance.uuid()}.stderr" logger.fs.info(f"[Dataplane.copy_gateway_logs] Copying logs from {instance.uuid()}: {out_file}") instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout") instance.download_file("/tmp/gateway.stdout", out_file) diff --git a/skyplane/api/tracker.py b/skyplane/api/tracker.py index f481bf229..9d2c462d4 100644 --- a/skyplane/api/tracker.py +++ b/skyplane/api/tracker.py @@ -1,6 +1,5 @@ import functools from pprint import pprint -from pprint import pprint import json import time from abc import ABC @@ -222,7 +221,12 @@ def monitor_single_dst_helper(dst_region): job.finalize() except Exception as e: UsageClient.log_exception( - "finalize job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms + "finalize job", + e, + args, + self.dataplane.topology.src_region_tag, + self.dataplane.topology.dest_region_tags[0], + session_start_timestamp_ms, ) raise e end_time = int(time.time()) @@ -308,7 +312,6 @@ def monitor_transfer(pd, self, region_tag): self.job_pending_chunk_ids[job_uuid][region_tag] = self.job_pending_chunk_ids[job_uuid][region_tag].difference( job_complete_chunk_ids ) - # sleep time.sleep(0.05) @@ -362,6 +365,7 @@ def query_bytes_remaining(self, region_tag: Optional[str] = None): ] ) logger.fs.debug(f"[TransferProgressTracker] Bytes remaining per job: {bytes_remaining_per_job}") + print(f"[TransferProgressTracker] Bytes remaining per job: {bytes_remaining_per_job}") return sum(bytes_remaining_per_job.values()) def query_bytes_dispatched(self): diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index f308d6a79..2183ac31a 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -1,4 +1,5 @@ import json +import time import typer import math import queue @@ -18,6 +19,7 @@ import urllib3 from rich import print as rprint +from functools import partial from skyplane import exceptions from skyplane.api.config import TransferConfig @@ -31,6 +33,7 @@ from skyplane.utils.definitions import MB from skyplane.utils.fn import do_parallel from skyplane.utils.path import parse_path +from skyplane.utils.retry import retry_backoff if TYPE_CHECKING: from skyplane.api.dataplane import Dataplane @@ -554,21 +557,25 @@ def http_pool(self): self._http_pool = urllib3.PoolManager(retries=urllib3.Retry(total=3)) return self._http_pool - def gen_transfer_pairs(self, chunker: Optional[Chunker] = None) -> Generator[TransferPair, None, None]: + def gen_transfer_pairs( + self, + chunker: Optional[Chunker] = None, + transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()), + ) -> Generator[TransferPair, None, None]: """Generate transfer pairs for the transfer job. 
:param chunker: chunker that makes the chunk requests :type chunker: Chunker """ if chunker is None: # used for external access to transfer pair list - chunker = Chunker(self.src_iface, self.dst_ifaces, TransferConfig()) + chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config) # TODO: should read in existing transfer config yield from chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, self.recursive, self._pre_filter_fn) def dispatch( self, dataplane: "Dataplane", - transfer_config: TransferConfig, dispatch_batch_size: int = 100, # 6.4 GB worth of chunks + transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()), ) -> Generator[Chunk, None, None]: """Dispatch transfer job to specified gateways. @@ -665,7 +672,9 @@ def finalize(self): def complete_fn(batch): for req in batch: logger.fs.debug(f"Finalize upload id {req['upload_id']} for key {req['key']}") - obj_store_interface.complete_multipart_upload(req["key"], req["upload_id"]) + + # retry - sometimes slight delay before object store knows all parts are uploaded + retry_backoff(partial(obj_store_interface.complete_multipart_upload, req["key"], req["upload_id"]), initial_backoff=0.5) do_parallel(complete_fn, batches, n=8) @@ -733,14 +742,18 @@ def __init__( [not isinstance(iface, ObjectStoreInterface) for iface in self.dst_ifaces] ), "Destination must be a object store interface" - def gen_transfer_pairs(self, chunker: Optional[Chunker] = None) -> Generator[TransferPair, None, None]: + def gen_transfer_pairs( + self, + chunker: Optional[Chunker] = None, + transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()), + ) -> Generator[TransferPair, None, None]: """Generate transfer pairs for the transfer job. 
:param chunker: chunker that makes the chunk requests :type chunker: Chunker """ if chunker is None: # used for external access to transfer pair list - chunker = Chunker(self.src_iface, self.dst_ifaces, TransferConfig()) + chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config) transfer_pair_gen = chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, self.recursive, self._pre_filter_fn) # only single destination supported diff --git a/skyplane/cli/cli_transfer.py b/skyplane/cli/cli_transfer.py index 8c01036cb..637948fbc 100644 --- a/skyplane/cli/cli_transfer.py +++ b/skyplane/cli/cli_transfer.py @@ -10,7 +10,7 @@ import skyplane from skyplane.api.config import TransferConfig, AWSConfig, GCPConfig, AzureConfig, IBMCloudConfig -from skyplane.api.transfer_job import CopyJob +from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.cli.impl.cp_replicate_fallback import ( replicate_onprem_cp_cmd, replicate_onprem_sync_cmd, @@ -21,7 +21,7 @@ from skyplane.api.usage import UsageClient from skyplane.config import SkyplaneConfig from skyplane.config_paths import cloud_config, config_path -from skyplane.obj_store.object_store_interface import ObjectStoreInterface +from skyplane.obj_store.object_store_interface import ObjectStoreInterface, StorageInterface from skyplane.obj_store.file_system_interface import FileSystemInterface from skyplane.cli.impl.progress_bar import ProgressBarTransferHook from skyplane.utils import logger @@ -54,6 +54,14 @@ def __init__(self, src_region_tag: str, dst_region_tag: str, args: Dict[str, Any self.src_region_tag, self.dst_region_tag = src_region_tag, dst_region_tag self.args = args self.aws_config, self.azure_config, self.gcp_config, self.ibmcloud_config = self.to_api_config(skyplane_config or cloud_config) + + # update config + # TODO: set remaining config params + if skyplane_config: + skyplane_config.set_flag("multipart_enabled", str(self.args["multipart"])) + if cloud_config: + cloud_config.set_flag("multipart_enabled", str(self.args["multipart"])) + self.transfer_config = self.make_transfer_config(skyplane_config or cloud_config) self.client = skyplane.SkyplaneClient( aws_config=self.aws_config, @@ -190,15 +198,24 @@ def make_pipeline(self, **solver_args) -> skyplane.Pipeline: logger.fs.debug(f"Using pipeline: {pipeline}") return pipeline - def confirm_transfer(self, pipeline: skyplane.Pipeline, dp: skyplane.Dataplane, query_n: int = 5, ask_to_confirm_transfer=True) -> bool: + def confirm_transfer( + self, pipeline: skyplane.Pipeline, src_region_tag: str, dest_region_tags: List[str], query_n: int = 5, ask_to_confirm_transfer=True + ) -> bool: """Prompts the user to confirm their transfer by querying the first query_n files from the TransferJob""" if not len(pipeline.jobs_to_dispatch) > 0: typer.secho("No jobs to dispatch.") return False transfer_pair_gen = pipeline.jobs_to_dispatch[0].gen_transfer_pairs() # type: ignore - console.print( - f"[bold yellow]Will transfer objects from {dp.topology.src_region_tag} to {dp.topology.dest_region_tags}[/bold yellow]" - ) + if len(dest_region_tags) == 1: + console.print(f"[bold yellow]Will transfer objects from {src_region_tag} to {dest_region_tags[0]}[/bold yellow]") + else: + console.print(f"[bold yellow]Will transfer objects from {src_region_tag} to {dest_region_tags}[/bold yellow]") + + if src_region_tag.startswith("local") or dest_region_tags[0].startswith("local"): + # TODO: should still pass cost estimate + console.print(f"[yellow]Note: local transfers are not 
monitored by Skyplane[yellow]") + return True + topology = pipeline.planner.plan(pipeline.jobs_to_dispatch) sorted_counts = sorted(topology.per_region_count().items(), key=lambda x: x[0]) console.print( @@ -248,12 +265,11 @@ def confirm_transfer(self, pipeline: skyplane.Pipeline, dp: skyplane.Dataplane, console.print("[green]Transfer starting[/green]") return True - def estimate_small_transfer(self, pipeline: skyplane.Pipeline, size_threshold_bytes: float, query_n: int = 1000) -> bool: + def estimate_small_transfer(self, job: TransferJob, size_threshold_bytes: float, query_n: int = 1000) -> bool: """Estimates if the transfer is small by querying up to `query_n` files from the TransferJob. If it exceeds the file size limit, then it will fall back to the cloud CLIs.""" - if len(pipeline.jobs_to_dispatch) != 1: - return False - job = pipeline.jobs_to_dispatch[0] + + # TODO: why shouldn't this include sync? if not isinstance(job, CopyJob): return False transfer_pair_gen = job.gen_transfer_pairs() @@ -281,6 +297,114 @@ def force_deprovision(dp: skyplane.Dataplane): signal.signal(signal.SIGINT, s) +def run_transfer( + src: str, + dst: str, + recursive: bool, + debug: bool, + multipart: bool, + confirm: bool, + max_instances: int, + max_connections: int, + solver: str, + cmd: str, +): + assert cmd == "cp" or cmd == "sync", f"Invalid command: {cmd}" + if not debug: + register_exception_handler() + print_header() + + provider_src, bucket_src, path_src = parse_path(src) + provider_dst, bucket_dst, path_dst = parse_path(dst) + src_region_tag = StorageInterface.create(f"{provider_src}:infer", bucket_src).region_tag() + dst_region_tag = StorageInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag() + args = { + "cmd": cmd, + "recursive": True, + "debug": debug, + "multipart": multipart, + "confirm": confirm, + "max_instances": max_instances, + "max_connections": max_connections, + "solver": solver, + } + + # create CLI object + cli = SkyplaneCLI(src_region_tag=src_region_tag, dst_region_tag=dst_region_tag, args=args) + if not cli.check_config(): + typer.secho( + f"Skyplane configuration file is not valid. 
Please reset your config by running `rm {config_path}` and then rerunning `skyplane init` to fix.", + fg="red", + ) + return 1 + + # create pipeline and queue transfer + pipeline = cli.make_pipeline(planning_algorithm=solver, max_instances=max_instances) + if cli.args["cmd"] == "cp": + pipeline.queue_copy(src, dst, recursive=recursive) + else: + pipeline.queue_sync(src, dst) + + # confirm transfer + if not cli.confirm_transfer(pipeline, src_region_tag, [dst_region_tag], 5, ask_to_confirm_transfer=not confirm): + return 1 + + # local->local transfers not supported (yet) + if provider_src == "local" and provider_dst == "local": + raise NotImplementedError("Local->local transfers not supported (yet)") + + # fall back options: local->cloud, cloud->local, small cloud->cloud transfers + if provider_src == "local" or provider_dst == "local": + if cli.args["cmd"] == "cp": + return 0 if cli.transfer_cp_onprem(src, dst, recursive) else 1 + else: + return 0 if cli.transfer_sync_onprem(src, dst) else 1 + elif cloud_config.get_flag("native_cmd_enabled"): + # fallback option: transfer is too small + if cli.args["cmd"] == "cp": + job = CopyJob(src, [dst], recursive=recursive) # TODO: rever to using pipeline + if cli.estimate_small_transfer(job, cloud_config.get_flag("native_cmd_threshold_gb") * GB): + small_transfer_status = cli.transfer_cp_small(src, dst, recursive) + return 0 if small_transfer_status else 1 + else: + job = SyncJob(src, [dst], recursive=recursive) + if cli.estimate_small_transfer(job, cloud_config.get_flag("native_cmd_threshold_gb") * GB): + small_transfer_status = cli.transfer_sync_small(src, dst) + return 0 if small_transfer_status else 1 + + # dataplane must be created after transfers are queued + dp = pipeline.create_dataplane(debug=debug) + with dp.auto_deprovision(): + try: + dp.provision(spinner=True) + dp.run(pipeline.jobs_to_dispatch, hooks=ProgressBarTransferHook(dp.topology.dest_region_tags)) + except KeyboardInterrupt: + logger.fs.warning("Transfer cancelled by user (KeyboardInterrupt).") + console.print("\n[red]Transfer cancelled by user. Copying gateway logs and exiting.[/red]") + try: + dp.copy_gateway_logs() + force_deprovision(dp) + except Exception as e: + logger.fs.exception(e) + console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") + console.print(e) + UsageClient.log_exception("cli_cp", e, args, cli.src_region_tag, cli.dst_region_tag) + console.print("[bold red]Deprovisioning was interrupted! 
VMs may still be running which will incur charges.[/bold red]") + console.print("[bold red]Please manually deprovision the VMs by running `skyplane deprovision`.[/bold red]") + return 1 + except skyplane.exceptions.SkyplaneException as e: + console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") + console.print(e.pretty_print_str()) + UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) + force_deprovision(dp) + except Exception as e: + logger.fs.exception(e) + console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") + console.print(e) + UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) + force_deprovision(dp) + + def cp( src: str, dst: str, @@ -326,117 +450,7 @@ def cp( :param solver: The solver to use for the transfer (default: direct) :type solver: str """ - if not debug: - register_exception_handler() - print_header() - provider_src, bucket_src, path_src = parse_path(src) - provider_dst, bucket_dst, path_dst = parse_path(dst) - if provider_src in ("local", "nfs"): - src_region_tag = FileSystemInterface.create(f"{provider_src}:infer", path_src).region_tag() - else: - src_region_tag = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src).region_tag() - - if provider_dst in ("local", "nfs"): - dst_region_tag = FileSystemInterface.create(f"{provider_dst}:infer", path_dst).region_tag() - else: - dst_region_tag = ObjectStoreInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag() - - args = { - "cmd": "cp", - "recursive": recursive, - "debug": debug, - "multipart": multipart, - "confirm": confirm, - "max_instances": max_instances, - "max_connections": max_connections, - "solver": solver, - } - - cli = SkyplaneCLI(src_region_tag=src_region_tag, dst_region_tag=dst_region_tag, args=args) - if not cli.check_config(): - typer.secho( - f"Skyplane configuration file is not valid. 
Please reset your config by running `rm {config_path}` and then rerunning `skyplane init` to fix.", - fg="red", - ) - return 1 - - # dp = cli.make_dataplane( - # solver_type=solver, - # n_vms=max_instances, - # n_connections=max_connections, - # solver_required_throughput_gbits=solver_required_throughput_gbits, - # debug=debug, - # ) - pipeline = cli.make_pipeline(planning_algorithm=solver, max_instances=max_instances) - pipeline.queue_copy(src, dst, recursive=recursive) - - # dataplane must be created after transfers are queued - dp = pipeline.create_dataplane(debug=debug) - - if provider_src in ("local", "nfs") and provider_dst in ("aws", "gcp", "azure"): - # manually create dataplane for queued transfer - with dp.auto_deprovision(): - try: - if not cli.confirm_transfer(pipeline, dp, 5, ask_to_confirm_transfer=not confirm): - return 1 - dp.provision(spinner=True) - dp.run(pipeline.jobs_to_dispatch, hooks=ProgressBarTransferHook(dp.topology.dest_region_tags)) - except skyplane.exceptions.SkyplaneException as e: - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e.pretty_print_str()) - UsageClient.log_exception("cli_query_objstore", e, args, src_region_tag, dst_region_tag) - return 1 - # return 0 if cli.transfer_cp_onprem(src, dst, recursive) else 1 - elif provider_src in ("aws", "gcp", "azure", "hdfs", "ibmcloud") and provider_dst in ("aws", "gcp", "azure", "ibmcloud"): - # todo support ILP solver params - # dp = cli.make_dataplane( - # solver_type=solver, - # solver_required_throughput_gbits=solver_required_throughput_gbits, - # n_vms=max_instances, - # n_connections=max_connections, - # debug=debug, - # ) - with dp.auto_deprovision(): - # dp.queue_copy(src, dst, recursive=recursive) - if cloud_config.get_flag("native_cmd_enabled") and cli.estimate_small_transfer( - pipeline, cloud_config.get_flag("native_cmd_threshold_gb") * GB - ): - small_transfer_status = cli.transfer_cp_small(src, dst, recursive) - if small_transfer_status: - return 0 - try: - if not cli.confirm_transfer(pipeline, dp, 5, ask_to_confirm_transfer=not confirm): - return 1 - dp.provision(spinner=True) - # dp.run(ProgressBarTransferHook()) - dp.run(pipeline.jobs_to_dispatch, hooks=ProgressBarTransferHook(dp.topology.dest_region_tags)) - except KeyboardInterrupt: - logger.fs.warning("Transfer cancelled by user (KeyboardInterrupt).") - console.print("\n[red]Transfer cancelled by user. Copying gateway logs and exiting.[/red]") - dp.copy_gateway_logs() - try: - force_deprovision(dp) - except Exception as e: - logger.fs.exception(e) - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e) - UsageClient.log_exception("cli_cp", e, args, cli.src_region_tag, cli.dst_region_tag) - console.print("[bold red]Deprovisioning was interrupted! 
VMs may still be running which will incur charges.[/bold red]") - console.print("[bold red]Please manually deprovision the VMs by running `skyplane deprovision`.[/bold red]") - return 1 - except skyplane.exceptions.SkyplaneException as e: - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e.pretty_print_str()) - UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) - force_deprovision(dp) - except Exception as e: - logger.fs.exception(e) - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e) - UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) - force_deprovision(dp) - if dp.provisioned: - typer.secho("Dataplane is not deprovisioned! Run `skyplane deprovision` to force deprovision VMs.", fg="red") + return run_transfer(src, dst, recursive, debug, multipart, confirm, max_instances, max_connections, solver, "cp") def sync( @@ -485,82 +499,4 @@ def sync( :param solver: The solver to use for the transfer (default: direct) :type solver: str """ - if not debug: - register_exception_handler() - print_header() - provider_src, bucket_src, path_src = parse_path(src) - provider_dst, bucket_dst, path_dst = parse_path(dst) - src_region_tag = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src).region_tag() - dst_region_tag = ObjectStoreInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag() - args = { - "cmd": "sync", - "recursive": True, - "debug": debug, - "multipart": multipart, - "confirm": confirm, - "max_instances": max_instances, - "max_connections": max_connections, - "solver": solver, - } - - cli = SkyplaneCLI(src_region_tag=src_region_tag, dst_region_tag=dst_region_tag, args=args) - if not cli.check_config(): - typer.secho( - f"Skyplane configuration file is not valid. Please reset your config by running `rm {config_path}` and then rerunning `skyplane init` to fix.", - fg="red", - ) - return 1 - - if provider_src in ("local", "hdfs", "nfs") or provider_dst in ("local", "hdfs", "nfs"): - if provider_src == "hdfs" or provider_dst == "hdfs": - typer.secho("HDFS is not supported yet.", fg="red") - return 1 - return 0 if cli.transfer_sync_onprem(src, dst) else 1 - elif provider_src in ("aws", "gcp", "azure") and provider_dst in ("aws", "gcp", "azure"): - # todo support ILP solver params - print() - pipeline = cli.make_pipeline(planning_algorithm=solver, max_instances=max_instances) - pipeline.queue_sync(src, dst) - - dp = pipeline.create_dataplane(debug=True) - - with dp.auto_deprovision(): - if cloud_config.get_flag("native_cmd_enabled") and cli.estimate_small_transfer( - pipeline, cloud_config.get_flag("native_cmd_threshold_gb") * GB - ): - small_transfer_status = cli.transfer_sync_small(src, dst) - if small_transfer_status: - return 0 - try: - console.print("[yellow]Note: sync must query the destination bucket to diff objects. This may take a while.[/yellow]") - if not cli.confirm_transfer(pipeline, dp, 5, ask_to_confirm_transfer=not confirm): - return 1 - dp.provision(spinner=True) - dp.run(pipeline.jobs_to_dispatch, hooks=ProgressBarTransferHook(dp.topology.dest_region_tags)) - except KeyboardInterrupt: - logger.fs.warning("Transfer cancelled by user (KeyboardInterrupt).") - console.print("\n[red]Transfer cancelled by user. 
Copying gateway logs and exiting.[/red]") - dp.copy_gateway_logs() - try: - force_deprovision(dp) - except Exception as e: - logger.fs.exception(e) - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e) - UsageClient.log_exception("cli_cp", e, args, cli.src_region_tag, cli.dst_region_tag) - console.print("[bold red]Deprovisioning was interrupted! VMs may still be running which will incur charges.[/bold red]") - console.print("[bold red]Please manually deprovision the VMs by running `skyplane deprovision`.[/bold red]") - return 1 - except skyplane.exceptions.SkyplaneException as e: - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e.pretty_print_str()) - UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) - return 1 - except Exception as e: - logger.fs.exception(e) - console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") - console.print(e) - UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) - force_deprovision(dp) - if dp.provisioned: - typer.secho("Dataplane is not deprovisioned! Run `skyplane deprovision` to force deprovision VMs.", fg="red") + return run_transfer(src, dst, False, debug, multipart, confirm, max_instances, max_connections, solver, "sync") diff --git a/skyplane/gateway/gateway_daemon_api.py b/skyplane/gateway/gateway_daemon_api.py index c31cd1892..94fe5e7a4 100644 --- a/skyplane/gateway/gateway_daemon_api.py +++ b/skyplane/gateway/gateway_daemon_api.py @@ -144,7 +144,16 @@ def pull_chunk_status_queue(self, timeout=0.5): # else: # print(f"[gateway_api] chunk {chunk_id}: after {handle} state = {elem['state']}") - self.chunk_status_log.append(elem) + + # only update chunk status log with terminal operators + # otherwise, the client needs to filter chunk updates depending on whether the operator is terminal or not + # this would require us to inform the client about the terminal operators, which seems annoying (though doable) + # we can change this if we need to profile the chunk status progression through the DAG in detail + if elem["state"] == ChunkState.complete.name: + if handle in self.terminal_operators[elem["partition"]]: + self.chunk_status_log.append(elem) + else: + self.chunk_status_log.append(elem) def run(self): self.server.serve_forever() diff --git a/skyplane/gateway/operators/gateway_operator.py b/skyplane/gateway/operators/gateway_operator.py index 2069338e2..f76823b53 100644 --- a/skyplane/gateway/operators/gateway_operator.py +++ b/skyplane/gateway/operators/gateway_operator.py @@ -568,5 +568,7 @@ def process(self, chunk_req: ChunkRequest): ), max_retries=1, ) - logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} to {self.bucket_name}") + logger.debug( + f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} partition {chunk_req.chunk.part_number} to {self.bucket_name}" + ) return True diff --git a/skyplane/obj_store/file_system_interface.py b/skyplane/obj_store/file_system_interface.py index cd37b2b83..6f59ec67e 100644 --- a/skyplane/obj_store/file_system_interface.py +++ b/skyplane/obj_store/file_system_interface.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from typing import Iterator, List, Optional from skyplane.obj_store.storage_interface import StorageInterface +import os @dataclass @@ -24,6 +25,9 @@ def real_path(self): class FileSystemInterface(StorageInterface): + def region_tag(self) -> str: + return "local" + def 
path(self) -> str: raise NotImplementedError() diff --git a/skyplane/obj_store/posix_file_interface.py b/skyplane/obj_store/posix_file_interface.py index cb3e61dd2..c5a3656dd 100644 --- a/skyplane/obj_store/posix_file_interface.py +++ b/skyplane/obj_store/posix_file_interface.py @@ -54,10 +54,10 @@ def exists(self, obj_name: str): return os.path.exists(obj_name) def region_tag(self) -> str: - return "gcp:us-central1-a" + return "local" def bucket(self) -> str: - return "" + return self.dir_path def create_bucket(self, region_tag: str): return None diff --git a/skyplane/obj_store/s3_interface.py b/skyplane/obj_store/s3_interface.py index 07c74af5a..55b5493f0 100644 --- a/skyplane/obj_store/s3_interface.py +++ b/skyplane/obj_store/s3_interface.py @@ -86,14 +86,13 @@ def create_bucket(self, aws_region): else: s3_client.create_bucket(Bucket=self.bucket_name, CreateBucketConfiguration={"LocationConstraint": aws_region}) else: - print("bucket already exists", aws_region, self.bucket_name) + logger.warning(f"Bucket {self.bucket_name} in region {aws_region} already exists") def delete_bucket(self): # delete 1000 keys at a time for batch in batch_generator(self.list_objects(), 1000): self.delete_objects([obj.key for obj in batch]) assert len(list(self.list_objects())) == 0, f"Bucket not empty after deleting all keys {list(self.list_objects())}" - # delete bucket self._s3_client().delete_bucket(Bucket=self.bucket_name) diff --git a/skyplane/obj_store/storage_interface.py b/skyplane/obj_store/storage_interface.py index 8db1ce097..84d8e8a19 100644 --- a/skyplane/obj_store/storage_interface.py +++ b/skyplane/obj_store/storage_interface.py @@ -56,5 +56,10 @@ def create(region_tag: str, bucket: str): logger.fs.debug(f"attempting to create hdfs bucket {bucket}") return HDFSInterface(host=bucket) + elif region_tag.startswith("local"): + # from skyplane.obj_store.file_system_interface import FileSystemInterface + from skyplane.obj_store.posix_file_interface import POSIXInterface + + return POSIXInterface(bucket) else: raise ValueError(f"Invalid region_tag {region_tag} - could not create interface") diff --git a/skyplane/utils/path.py b/skyplane/utils/path.py index 4334d5001..ea23e781b 100644 --- a/skyplane/utils/path.py +++ b/skyplane/utils/path.py @@ -59,4 +59,6 @@ def is_plausible_local_path(path_test: str): else: if not is_plausible_local_path(path): logger.warning(f"Local path '{path}' does not exist") - return "local", None, path + + # path is a substitute for the bucket + return "local", path, path diff --git a/tests/integration/cp.py b/tests/integration/cp.py index 3250ab822..689f9f90f 100644 --- a/tests/integration/cp.py +++ b/tests/integration/cp.py @@ -37,7 +37,7 @@ def setup_buckets(src_region, dest_region, n_files=1, file_size_mb=1): return src_bucket_name, dest_bucket_name, src_prefix, dest_prefix -def run(src_region, dest_region, n_files=1, file_size_mb=1, multipart=True): +def run(src_region, dest_region, n_files=1, file_size_mb=1, multipart=False): logger.info( f"Running skyplane cp integration test with config " + f"src_region={src_region}, " diff --git a/tests/integration/cp_local.py b/tests/integration/cp_local.py new file mode 100644 index 000000000..fca726c55 --- /dev/null +++ b/tests/integration/cp_local.py @@ -0,0 +1,149 @@ +import argparse +import time +import os +import tempfile +import uuid +from skyplane.utils.definitions import MB +from skyplane.obj_store.object_store_interface import ObjectStoreInterface +from skyplane.cli.cli import cp, sync +from skyplane.utils import logger + + +def 
setup_buckets(region, n_files=1, file_size_mb=1, write=False): + provider, zone = region.split(":") + if provider == "azure": + bucket_name = f"integration{zone}/{str(uuid.uuid4()).replace('-', '')}" + else: + bucket_name = f"integration{zone}-{str(uuid.uuid4())[:8]}" + logger.debug(f"creating buckets {bucket_name}") + iface = ObjectStoreInterface.create(region, bucket_name) + iface.create_bucket(zone) + + prefix = f"{uuid.uuid4()}" + if write: + with tempfile.NamedTemporaryFile() as tmp: + fpath = tmp.name + with open(fpath, "wb+") as f: + f.write(os.urandom(int(file_size_mb * MB))) + for i in range(n_files): + iface.upload_object(fpath, f"{prefix}/{i}", mime_type="text/plain") + + return iface, bucket_name, prefix + + +def run(src_region, dest_region, n_files=1, file_size_mb=1, multipart=False): + logger.info( + f"Running skyplane [cp/sync] integration test with config " + + f"src_region={src_region}, " + + f"dest_region={dest_region}, " + + f"n_files={n_files}, " + + f"file_size_mb={file_size_mb}, " + + f"multipart={multipart}" + ) + + # map region to path + def map_path(region, bucket, prefix): + provider, _ = region.split(":") + if provider == "aws": + return f"s3://{bucket}/{prefix}" + elif provider == "azure": + storage_account, container = bucket.split("/") + return f"https://{storage_account}.blob.core.windows.net/{container}/{prefix}" + elif provider == "gcp": + return f"gs://{bucket}/{prefix}" + else: + raise Exception(f"Unknown provider {provider}") + + # create temporary files + for mode in ["sync", "cp"]: + return_code = 0 + if n_files == 1 and mode == "cp": + tmp = tempfile.NamedTemporaryFile() + fpath = tmp.name + with open(fpath, "wb+") as f: + f.write(os.urandom(int(file_size_mb * MB))) + elif n_files > 1: # create directory + tmp = tempfile.TemporaryDirectory() + fpath = tmp.name + for i in range(n_files): + with open(f"{fpath}/{i}", "wb+") as f: + f.write(os.urandom(int(file_size_mb * MB))) + else: + continue + + if src_region == "local": + iface, bucket_name, prefix = setup_buckets(dest_region) + if mode == "cp": + return_code = cp( + fpath, + map_path(dest_region, bucket_name, prefix), + recursive=(n_files > 1), + debug=False, + multipart=multipart, + confirm=True, + max_instances=1, + max_connections=1, + solver="direct", + solver_required_throughput_gbits=1, + ) + elif n_files > 1: + return_code = sync( + fpath, + map_path(dest_region, bucket_name, prefix), + debug=False, + multipart=multipart, + confirm=True, + max_instances=1, + max_connections=1, + solver="direct", + solver_required_throughput_gbits=1, + ) + elif dest_region == "local": + iface, bucket_name, prefix = setup_buckets(src_region, n_files=n_files, file_size_mb=file_size_mb, write=True) + if mode == "cp": + return_code = cp( + map_path(src_region, bucket_name, prefix), + fpath, + recursive=True, + debug=False, + multipart=multipart, + confirm=True, + max_instances=1, + max_connections=1, + solver="direct", + solver_required_throughput_gbits=1, + ) + elif n_files > 1: + return_code = sync( + map_path(src_region, bucket_name, prefix), + fpath, + debug=False, + multipart=multipart, + confirm=True, + max_instances=1, + max_connections=1, + solver="direct", + solver_required_throughput_gbits=1, + ) + else: + raise ValueError("This script only tests local transfers") + + iface.delete_bucket() + + if return_code > 0: + return return_code + + return return_code + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("src", help="source region") + parser.add_argument("dest", 
help="destination region") + parser.add_argument("--n-files", type=int, default=1) + parser.add_argument("--file-size-mb", type=int, default=1) + parser.add_argument("--multipart", action="store_true") + args = parser.parse_args() + + return_code = run(args.src, args.dest, n_files=args.n_files, file_size_mb=args.file_size_mb, multipart=args.multipart) + exit(return_code) diff --git a/tests/unit_nocloud/test_common.py b/tests/unit_nocloud/test_common.py index c66df90a3..bdc377c71 100644 --- a/tests/unit_nocloud/test_common.py +++ b/tests/unit_nocloud/test_common.py @@ -3,9 +3,9 @@ def test_parse_path(): # test local - assert parse_path("/") == ("local", None, "/") - assert parse_path("/tmp") == ("local", None, "/tmp") - assert parse_path("does-not-exist-0000000/file") == ("local", None, "does-not-exist-0000000/file") + assert parse_path("/") == ("local", "/", "/") + assert parse_path("/tmp") == ("local", "/tmp", "/tmp") + assert parse_path("does-not-exist-0000000/file") == ("local", "does-not-exist-0000000/file", "does-not-exist-0000000/file") # test s3:// assert parse_path("s3://bucket") == ("aws", "bucket", "")