From 3a0f551436b659afb2208fd558ddb846f4d62d98 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 9 Oct 2023 15:18:48 -0700 Subject: [PATCH] docs: Add snippets for upload_chunks_concurrently and add chunk_size (#1135) * docs: Add snippets for upload_chunks_concurrently and add chunk_size * switch from 'processes' to 'workers' in sample nomenclature * copyright * tests --- samples/snippets/snippets_test.py | 61 ++++++++++++++----- ...torage_transfer_manager_download_bucket.py | 9 +-- ...er_manager_download_chunks_concurrently.py | 20 ++++-- .../storage_transfer_manager_download_many.py | 9 +-- ...sfer_manager_upload_chunks_concurrently.py | 57 +++++++++++++++++ ...orage_transfer_manager_upload_directory.py | 9 +-- .../storage_transfer_manager_upload_many.py | 9 +-- 7 files changed, 140 insertions(+), 34 deletions(-) create mode 100644 samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 2da7bb94c..8014411e8 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -75,6 +75,7 @@ import storage_transfer_manager_download_bucket import storage_transfer_manager_download_chunks_concurrently import storage_transfer_manager_download_many +import storage_transfer_manager_upload_chunks_concurrently import storage_transfer_manager_upload_directory import storage_transfer_manager_upload_many import storage_upload_file @@ -243,7 +244,10 @@ def test_upload_blob_with_kms(test_bucket): with tempfile.NamedTemporaryFile() as source_file: source_file.write(b"test") storage_upload_with_kms_key.upload_blob_with_kms( - test_bucket.name, source_file.name, blob_name, KMS_KEY, + test_bucket.name, + source_file.name, + blob_name, + KMS_KEY, ) bucket = storage.Client().bucket(test_bucket.name) kms_blob = bucket.get_blob(blob_name) @@ -396,7 +400,10 @@ def test_move_blob(test_bucket_create, test_blob): print(f"test_move_blob not found in bucket {test_bucket_create.name}") storage_move_file.move_blob( - bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob", + bucket.name, + test_blob.name, + test_bucket_create.name, + "test_move_blob", ) assert test_bucket_create.get_blob("test_move_blob") is not None @@ -412,7 +419,10 @@ def test_copy_blob(test_blob): pass storage_copy_file.copy_blob( - bucket.name, test_blob.name, bucket.name, "test_copy_blob", + bucket.name, + test_blob.name, + bucket.name, + "test_copy_blob", ) assert bucket.get_blob("test_copy_blob") is not None @@ -551,7 +561,10 @@ def test_define_bucket_website_configuration(test_bucket): def test_object_get_kms_key(test_bucket): with tempfile.NamedTemporaryFile() as source_file: storage_upload_with_kms_key.upload_blob_with_kms( - test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY, + test_bucket.name, + source_file.name, + "test_upload_blob_encrypted", + KMS_KEY, ) kms_key = storage_object_get_kms_key.object_get_kms_key( test_bucket.name, "test_upload_blob_encrypted" @@ -568,7 +581,10 @@ def test_storage_compose_file(test_bucket): with tempfile.NamedTemporaryFile() as dest_file: destination = storage_compose_file.compose_file( - test_bucket.name, source_files[0], source_files[1], dest_file.name, + test_bucket.name, + source_files[0], + source_files[1], + dest_file.name, ) composed = destination.download_as_string() @@ -608,7 +624,8 @@ def test_change_default_storage_class(test_bucket, capsys): def test_change_file_storage_class(test_blob, capsys): blob = 
storage_change_file_storage_class.change_file_storage_class( - test_blob.bucket.name, test_blob.name, + test_blob.bucket.name, + test_blob.name, ) out, _ = capsys.readouterr() assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out @@ -694,7 +711,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): test_bucket.name, BLOB_NAMES, source_directory="{}/".format(uploads), - processes=8, + workers=8, ) out, _ = capsys.readouterr() @@ -706,7 +723,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): storage_transfer_manager_download_bucket.download_bucket_with_transfer_manager( test_bucket.name, destination_directory=os.path.join(downloads, ""), - processes=8, + workers=8, max_results=10000, ) out, _ = capsys.readouterr() @@ -720,7 +737,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): test_bucket.name, blob_names=BLOB_NAMES, destination_directory=os.path.join(downloads, ""), - processes=8, + workers=8, ) out, _ = capsys.readouterr() @@ -763,9 +780,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys): with tempfile.NamedTemporaryFile() as file: file.write(b"test") - storage_upload_file.upload_blob( - test_bucket.name, file.name, BLOB_NAME - ) + storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME) with tempfile.TemporaryDirectory() as downloads: # Download the file. @@ -773,8 +788,26 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys): test_bucket.name, BLOB_NAME, os.path.join(downloads, BLOB_NAME), - processes=8, + workers=8, ) out, _ = capsys.readouterr() - assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out + assert ( + "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) + in out + ) + + +def test_transfer_manager_upload_chunks_concurrently(test_bucket, capsys): + BLOB_NAME = "test_file.txt" + + with tempfile.NamedTemporaryFile() as file: + file.write(b"test") + file.flush() + + storage_transfer_manager_upload_chunks_concurrently.upload_chunks_concurrently( + test_bucket.name, file.name, BLOB_NAME + ) + + out, _ = capsys.readouterr() + assert "File {} uploaded to {}".format(file.name, BLOB_NAME) in out diff --git a/samples/snippets/storage_transfer_manager_download_bucket.py b/samples/snippets/storage_transfer_manager_download_bucket.py index 4f21ee6e9..5d94a67ae 100644 --- a/samples/snippets/storage_transfer_manager_download_bucket.py +++ b/samples/snippets/storage_transfer_manager_download_bucket.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_download_bucket] def download_bucket_with_transfer_manager( - bucket_name, destination_directory="", processes=8, max_results=1000 + bucket_name, destination_directory="", workers=8, max_results=1000 ): """Download all of the blobs in a bucket, concurrently in a process pool. @@ -40,8 +40,9 @@ def download_bucket_with_transfer_manager( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 # The maximum number of results to fetch from bucket.list_blobs(). 
This # sample code fetches all of the blobs up to max_results and queues them all @@ -60,7 +61,7 @@ def download_bucket_with_transfer_manager( blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)] results = transfer_manager.download_many_to_path( - bucket, blob_names, destination_directory=destination_directory, max_workers=processes + bucket, blob_names, destination_directory=destination_directory, max_workers=workers ) for name, result in zip(blob_names, results): diff --git a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py index 9ddec094e..b6ac9982d 100644 --- a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py +++ b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py @@ -13,7 +13,9 @@ # limitations under the License. # [START storage_transfer_manager_download_chunks_concurrently] -def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8): +def download_chunks_concurrently( + bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, workers=8 +): """Download a single file in chunks, concurrently in a process pool.""" # The ID of your GCS bucket @@ -25,11 +27,17 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8): # The destination filename or path # filename = "" + # The size of each chunk. The performance impact of this value depends on + # the use case. The remote service has a minimum of 5 MiB and a maximum of + # 5 GiB. + # chunk_size = 32 * 1024 * 1024 (32 MiB) + # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from google.cloud.storage import Client, transfer_manager @@ -37,7 +45,11 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8): bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) - transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes) + transfer_manager.download_chunks_concurrently( + blob, filename, chunk_size=chunk_size, max_workers=workers + ) print("Downloaded {} to {}.".format(blob_name, filename)) + + # [END storage_transfer_manager_download_chunks_concurrently] diff --git a/samples/snippets/storage_transfer_manager_download_many.py b/samples/snippets/storage_transfer_manager_download_many.py index 500eea1ce..02cb9b887 100644 --- a/samples/snippets/storage_transfer_manager_download_many.py +++ b/samples/snippets/storage_transfer_manager_download_many.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_download_many] def download_many_blobs_with_transfer_manager( - bucket_name, blob_names, destination_directory="", processes=8 + bucket_name, blob_names, destination_directory="", workers=8 ): """Download blobs in a list by name, concurrently in a process pool. @@ -46,8 +46,9 @@ def download_many_blobs_with_transfer_manager( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. 
Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from google.cloud.storage import Client, transfer_manager @@ -55,7 +56,7 @@ def download_many_blobs_with_transfer_manager( bucket = storage_client.bucket(bucket_name) results = transfer_manager.download_many_to_path( - bucket, blob_names, destination_directory=destination_directory, max_workers=processes + bucket, blob_names, destination_directory=destination_directory, max_workers=workers ) for name, result in zip(blob_names, results): diff --git a/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py new file mode 100644 index 000000000..009f09648 --- /dev/null +++ b/samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py @@ -0,0 +1,57 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START storage_transfer_manager_upload_chunks_concurrently] +def upload_chunks_concurrently( + bucket_name, + source_filename, + destination_blob_name, + chunk_size=32 * 1024 * 1024, + workers=8, +): + """Upload a single file, in chunks, concurrently in a process pool.""" + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The path to your file to upload + # source_filename = "local/path/to/file" + + # The ID of your GCS object + # destination_blob_name = "storage-object-name" + + # The size of each chunk. The performance impact of this value depends on + # the use case. The remote service has a minimum of 5 MiB and a maximum of + # 5 GiB. + # chunk_size = 32 * 1024 * 1024 (32 MiB) + + # The maximum number of processes to use for the operation. The performance + # impact of this value depends on the use case. Each additional process + # occupies some CPU and memory resources until finished. Threads can be used + # instead of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(destination_blob_name) + + transfer_manager.upload_chunks_concurrently( + source_filename, blob, chunk_size=chunk_size, max_workers=workers + ) + + print(f"File {source_filename} uploaded to {destination_blob_name}.") + + +# [END storage_transfer_manager_upload_chunks_concurrently] diff --git a/samples/snippets/storage_transfer_manager_upload_directory.py b/samples/snippets/storage_transfer_manager_upload_directory.py index c0dbb9c9c..329ca1081 100644 --- a/samples/snippets/storage_transfer_manager_upload_directory.py +++ b/samples/snippets/storage_transfer_manager_upload_directory.py @@ -13,7 +13,7 @@ # limitations under the License. 
# [START storage_transfer_manager_upload_directory] -def upload_directory_with_transfer_manager(bucket_name, source_directory, processes=8): +def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=8): """Upload every file in a directory, including all files in subdirectories. Each blob name is derived from the filename, not including the `directory` @@ -33,8 +33,9 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, proces # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from pathlib import Path @@ -65,7 +66,7 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, proces # Start the upload. results = transfer_manager.upload_many_from_filenames( - bucket, string_paths, source_directory=source_directory, max_workers=processes + bucket, string_paths, source_directory=source_directory, max_workers=workers ) for name, result in zip(string_paths, results): diff --git a/samples/snippets/storage_transfer_manager_upload_many.py b/samples/snippets/storage_transfer_manager_upload_many.py index 2ed647650..1b9b9fc89 100644 --- a/samples/snippets/storage_transfer_manager_upload_many.py +++ b/samples/snippets/storage_transfer_manager_upload_many.py @@ -14,7 +14,7 @@ # [START storage_transfer_manager_upload_many] def upload_many_blobs_with_transfer_manager( - bucket_name, filenames, source_directory="", processes=8 + bucket_name, filenames, source_directory="", workers=8 ): """Upload every file in a list to a bucket, concurrently in a process pool. @@ -43,8 +43,9 @@ def upload_many_blobs_with_transfer_manager( # The maximum number of processes to use for the operation. The performance # impact of this value depends on the use case, but smaller files usually # benefit from a higher number of processes. Each additional process occupies - # some CPU and memory resources until finished. - # processes=8 + # some CPU and memory resources until finished. Threads can be used instead + # of processes by passing `worker_type=transfer_manager.THREAD`. + # workers=8 from google.cloud.storage import Client, transfer_manager @@ -52,7 +53,7 @@ def upload_many_blobs_with_transfer_manager( bucket = storage_client.bucket(bucket_name) results = transfer_manager.upload_many_from_filenames( - bucket, filenames, source_directory=source_directory, max_workers=processes + bucket, filenames, source_directory=source_directory, max_workers=workers ) for name, result in zip(filenames, results):
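
Note (not part of the patch): a minimal sketch of how the chunked-upload API added above might be called with a thread pool instead of a process pool, as the updated comments describe. The bucket name, object name, and local path are placeholders, and worker_type support is assumed per the `worker_type=transfer_manager.THREAD` note in the snippets.

    from google.cloud.storage import Client, transfer_manager

    storage_client = Client()
    blob = storage_client.bucket("your-bucket-name").blob("storage-object-name")

    # Upload one large file in 32 MiB chunks, using threads rather than
    # processes (assumed supported by the library version this PR targets).
    transfer_manager.upload_chunks_concurrently(
        "local/path/to/file",
        blob,
        chunk_size=32 * 1024 * 1024,
        max_workers=8,
        worker_type=transfer_manager.THREAD,
    )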