
feat: add transfer_manager.upload_chunks_concurrently using the XML MPU API #1115

Merged (12 commits) on Sep 18, 2023
4 changes: 2 additions & 2 deletions google/cloud/storage/blob.py
@@ -1697,7 +1697,7 @@ def _get_writable_metadata(self):

return object_metadata

def _get_upload_arguments(self, client, content_type):
def _get_upload_arguments(self, client, content_type, filename=None):
Contributor: LGTM. Just a reminder that there are changes in the tm-metrics branch relevant to this private method as well.

Contributor Author: Yep, already resolved via a merge in a different branch.

"""Get required arguments for performing an upload.

The content type returned will be determined in order of precedence:
@@ -1716,7 +1716,7 @@ def _get_upload_arguments(self, client, content_type):
* An object metadata dictionary
* The ``content_type`` as a string (according to precedence)
"""
content_type = self._get_content_type(content_type)
content_type = self._get_content_type(content_type, filename=filename)
headers = {
**_get_default_headers(client._connection.user_agent, content_type),
**_get_encryption_headers(self._encryption_key),
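For context on the new filename parameter: _get_upload_arguments now forwards the filename to _get_content_type, presumably so that when neither an explicit content_type nor the blob's stored content type is available, the type can be guessed from the file extension. A rough sketch of that kind of fallback using only the standard-library mimetypes module (the exact precedence inside _get_content_type is not shown in this diff, so this is an approximation):

    import mimetypes

    def guess_content_type(content_type, blob_content_type, filename):
        # Explicit argument wins, then the blob's stored content type,
        # then a guess from the filename, then a generic default.
        guessed, _ = mimetypes.guess_type(filename) if filename else (None, None)
        return (
            content_type
            or blob_content_type
            or guessed
            or "application/octet-stream"
        )

    print(guess_content_type(None, None, "report.csv"))  # text/csv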
280 changes: 269 additions & 11 deletions google/cloud/storage/transfer_manager.py
@@ -26,6 +26,12 @@
from google.api_core import exceptions
from google.cloud.storage import Client
from google.cloud.storage import Blob
from google.cloud.storage.blob import _get_host_name
from google.cloud.storage.constants import _DEFAULT_TIMEOUT

from google.resumable_media.requests.upload import XMLMPUContainer
from google.resumable_media.requests.upload import XMLMPUPart


warnings.warn(
"The module `transfer_manager` is a preview feature. Functionality and API "
@@ -35,7 +41,14 @@

TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
DEFAULT_MAX_WORKERS = 8

METADATA_HEADER_TRANSLATION = {
"cacheControl": "Cache-Control",
"contentDisposition": "Content-Disposition",
"contentEncoding": "Content-Encoding",
"contentLanguage": "Content-Language",
"customTime": "x-goog-custom-time",
"storageClass": "x-goog-storage-class",
}

# Constants to be passed in as `worker_type`.
PROCESS = "process"
@@ -198,7 +211,7 @@ def upload_many(
futures.append(
executor.submit(
_call_method_on_maybe_pickled_blob,
_pickle_blob(blob) if needs_pickling else blob,
_pickle_client(blob) if needs_pickling else blob,
"upload_from_filename"
if isinstance(path_or_file, str)
else "upload_from_file",
@@ -343,7 +356,7 @@ def download_many(
futures.append(
executor.submit(
_call_method_on_maybe_pickled_blob,
_pickle_blob(blob) if needs_pickling else blob,
_pickle_client(blob) if needs_pickling else blob,
"download_to_filename"
if isinstance(path_or_file, str)
else "download_to_file",
@@ -733,7 +746,6 @@ def download_chunks_concurrently(
Checksumming (md5 or crc32c) is not supported for chunked operations. Any
`checksum` parameter passed in to download_kwargs will be ignored.

:type bucket: 'google.cloud.storage.bucket.Bucket'
:param bucket:
The bucket which contains the blobs to be downloaded

@@ -745,6 +757,12 @@
:param filename:
The destination filename or path.

:type chunk_size: int
:param chunk_size:
The size in bytes of each chunk to send. The optimal chunk size for
maximum throughput may vary depending on the exact network environment
and size of the blob.

:type download_kwargs: dict
:param download_kwargs:
A dictionary of keyword arguments to pass to the download method. Refer
@@ -809,7 +827,7 @@ def download_chunks_concurrently(

pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type)
# Pickle the blob ahead of time (just once, not once per chunk) if needed.
maybe_pickled_blob = _pickle_blob(blob) if needs_pickling else blob
maybe_pickled_blob = _pickle_client(blob) if needs_pickling else blob

futures = []

@@ -844,9 +862,249 @@
return None


def upload_chunks_concurrently(
filename,
blob,
content_type=None,
chunk_size=TM_DEFAULT_CHUNK_SIZE,

Contributor: This is technically the MPU part_size? Are we using chunk_size for consistency?

Contributor Author: It is the part size. I am using chunk_size for consistency with the similar download method.

deadline=None,
worker_type=PROCESS,
max_workers=DEFAULT_MAX_WORKERS,
*,
checksum="md5",
timeout=_DEFAULT_TIMEOUT,
):
"""Upload a single file in chunks, concurrently.

This function uses the XML MPU API to initialize an upload and upload a
file in chunks, concurrently with a worker pool.

The XML MPU API is significantly different from other uploads; please review
the documentation at https://cloud.google.com/storage/docs/multipart-uploads
before using this feature.

The library will attempt to cancel uploads that fail due to an exception.

Contributor: +1

If the upload fails in a way that precludes cancellation, such as a
hardware failure, process termination, or power outage, then the incomplete
upload may persist indefinitely. To mitigate this, set the
`AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket lifecycle
rules, or refer to the XML API documentation linked above to learn more
about how to list and delete individual incomplete uploads.

Using this feature with multiple threads is unlikely to improve upload
performance under normal circumstances due to Python interpreter threading
behavior. The default is therefore to use processes instead of threads.

ACL information cannot be sent with this function and should be set
separately with :class:`ObjectACL` methods.

:type filename: str
:param filename:
The path to the file to upload. File-like objects are not supported.

:type blob: `google.cloud.storage.Blob`
:param blob:
The blob to which to upload.

:type content_type: str
:param content_type: (Optional) Type of content being uploaded.

:type chunk_size: int
:param chunk_size:
The size in bytes of each chunk to send. The optimal chunk size for
maximum throughput may vary depending on the exact network environment
and size of the blob. The remote API has restrictions on the minimum
and maximum size allowable, see: https://cloud.google.com/storage/quotas#requests

:type deadline: int
:param deadline:
The number of seconds to wait for all threads to resolve. If the
deadline is reached, all threads will be terminated regardless of their
progress and concurrent.futures.TimeoutError will be raised. This can be
left as the default of None (no deadline) for most use cases.

:type worker_type: str
:param worker_type:
The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS
or google.cloud.storage.transfer_manager.THREAD.

Although the exact performance impact depends on the use case, in most
situations the PROCESS worker type will use more system resources (both
memory and CPU) and result in faster operations than THREAD workers.

Because the subprocesses of the PROCESS worker type can't access memory
from the main process, Client objects have to be serialized and then
recreated in each subprocess. The serialization of the Client object
for use in subprocesses is an approximation and may not capture every
detail of the Client object, especially if the Client was modified after
its initial creation or if `Client._http` was modified in any way.

THREAD worker types are observed to be relatively efficient for
operations with many small files, but not for operations with large
files. PROCESS workers are recommended for large file operations.

:type max_workers: int
:param max_workers:
The maximum number of workers to create to handle the workload.

With PROCESS workers, a larger number of workers will consume more
system resources (memory and CPU) at once.

How many workers is optimal depends heavily on the specific use case,
and the default is a conservative number that should work okay in most
cases without consuming excessive resources.

:type checksum: str
:param checksum:
(Optional) The checksum scheme to use: either 'md5', 'crc32c' or None.
Each individual part is checksummed. At present, the selected checksum
rule is only applied to parts and a separate checksum of the entire
resulting blob is not computed. Please compute and compare the checksum
of the file to the resulting blob separately if needed, using the
'crc32c' algorithm as per the XML MPU documentation.

:type timeout: float or tuple
:param timeout:
(Optional) The amount of time, in seconds, to wait
for the server response. See: :ref:`configuring_timeouts`

:raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
"""

bucket = blob.bucket
client = blob.client
transport = blob._get_transport(client)

hostname = _get_host_name(client._connection)
url = "{hostname}/{bucket}/{blob}".format(
hostname=hostname, bucket=bucket.name, blob=blob.name
)

base_headers, object_metadata, content_type = blob._get_upload_arguments(
client, content_type, filename=filename
)
headers = {**base_headers, **_headers_from_metadata(object_metadata)}

if blob.user_project is not None:
headers["x-goog-user-project"] = blob.user_project

# When a Customer-Managed Encryption Key is used to encrypt a Cloud Storage object
# at rest, the object resource metadata will store the version of the Key Management
# Service cryptographic material. If a Blob instance with KMS key metadata set is
# used to upload a new version of the object, the existing kmsKeyName version
# value can't be used in the upload request, so the client ignores it.
if blob.kms_key_name is not None and "cryptoKeyVersions" not in blob.kms_key_name:
headers["x-goog-encryption-kms-key-name"] = blob.kms_key_name

container = XMLMPUContainer(url, filename, headers=headers)
container.initiate(transport=transport, content_type=content_type)
upload_id = container.upload_id

size = os.path.getsize(filename)
num_of_parts = -(size // -chunk_size) # Ceiling division

pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type)
# Pickle the blob ahead of time (just once, not once per chunk) if needed.
maybe_pickled_client = _pickle_client(client) if needs_pickling else client

futures = []

with pool_class(max_workers=max_workers) as executor:

for part_number in range(1, num_of_parts + 1):
start = (part_number - 1) * chunk_size
end = min(part_number * chunk_size, size)

futures.append(
executor.submit(
_upload_part,
maybe_pickled_client,
url,
upload_id,
filename,
start=start,
end=end,
part_number=part_number,
checksum=checksum,
headers=headers,
)
)

concurrent.futures.wait(
futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
)

try:
# Harvest results and raise exceptions.
for future in futures:
part_number, etag = future.result()
container.register_part(part_number, etag)

container.finalize(blob._get_transport(client))
except Exception:
container.cancel(blob._get_transport(client))
raise
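A minimal usage sketch for upload_chunks_concurrently as defined above; the bucket name, object name, and local path are placeholders, and the module's preview-feature warning still applies. With the defaults shown in this diff, the file is split into 32 MiB parts and uploaded by 8 worker processes:

    from google.cloud import storage
    from google.cloud.storage import transfer_manager

    client = storage.Client()
    bucket = client.bucket("my-bucket")           # hypothetical bucket name
    blob = bucket.blob("backups/large-file.bin")  # hypothetical object name

    # Upload a large local file in 32 MiB parts using 8 worker processes.
    transfer_manager.upload_chunks_concurrently(
        "/tmp/large-file.bin",
        blob,
        chunk_size=32 * 1024 * 1024,
        max_workers=8,
    )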


def _upload_part(
maybe_pickled_client,
url,
upload_id,
filename,
start,
end,
part_number,
checksum,
headers,
):
"""Helper function that runs inside a thread or subprocess to upload a part.

`maybe_pickled_client` is either a Client (for threads) or a specially
pickled Client (for processes) because the default pickling mangles Client
objects."""

if isinstance(maybe_pickled_client, Client):
client = maybe_pickled_client
else:
client = pickle.loads(maybe_pickled_client)
part = XMLMPUPart(
url,
upload_id,
filename,
start=start,
end=end,
part_number=part_number,
checksum=checksum,
headers=headers,
)
part.upload(client._http)
return (part_number, part.etag)
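To make the part boundaries computed in upload_chunks_concurrently and consumed by _upload_part concrete, here is a small standalone sketch of the ceiling division and the resulting start/end offsets (the 100 MiB file size is an arbitrary example):

    CHUNK_SIZE = 32 * 1024 * 1024          # 32 MiB parts
    size = 100 * 1024 * 1024               # e.g. a 100 MiB file

    num_of_parts = -(size // -CHUNK_SIZE)  # ceiling division -> 4 parts

    for part_number in range(1, num_of_parts + 1):
        start = (part_number - 1) * CHUNK_SIZE
        end = min(part_number * CHUNK_SIZE, size)
        print(f"part {part_number}: bytes {start}-{end - 1}")
    # part 1: bytes 0-33554431
    # part 2: bytes 33554432-67108863
    # part 3: bytes 67108864-100663295
    # part 4: bytes 100663296-104857599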


def _headers_from_metadata(metadata):
"""Helper function to translate object metadata into a header dictionary."""

headers = {}
# Handle standard writable metadata
for key, value in metadata.items():
if key in METADATA_HEADER_TRANSLATION:
Contributor: QQ: are there any writable metadata fields that don't require translation? Just wanted to make sure that they are all captured in _headers_from_metadata. If not, this works!

Contributor Author: I believe this is the complete list according to the documentation. Note that custom metadata and some other features, like encryption, are not supported through this translation dictionary but are handled differently.

headers[METADATA_HEADER_TRANSLATION[key]] = value
# Handle custom metadata
if "metadata" in metadata:
for key, value in metadata["metadata"].items():
headers["x-goog-meta-" + key] = value
return headers
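A quick illustration of the translation performed by _headers_from_metadata above; the metadata values here are made up:

    metadata = {
        "cacheControl": "no-cache",
        "contentLanguage": "en",
        "storageClass": "NEARLINE",
        "metadata": {"color": "red"},  # custom metadata
    }
    # _headers_from_metadata(metadata) would produce:
    # {
    #     "Cache-Control": "no-cache",
    #     "Content-Language": "en",
    #     "x-goog-storage-class": "NEARLINE",
    #     "x-goog-meta-color": "red",
    # }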


def _download_and_write_chunk_in_place(
maybe_pickled_blob, filename, start, end, download_kwargs
):
"""Helper function that runs inside a thread or subprocess.

`maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
Blob (for processes) because the default pickling mangles Client objects
which are attached to Blobs."""

if isinstance(maybe_pickled_blob, Blob):
blob = maybe_pickled_blob
else:
@@ -863,9 +1121,9 @@ def _call_method_on_maybe_pickled_blob(
):
"""Helper function that runs inside a thread or subprocess.

`maybe_pickled_blob` is either a blob (for threads) or a specially pickled
blob (for processes) because the default pickling mangles clients which are
attached to blobs."""
`maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
Blob (for processes) because the default pickling mangles Client objects
which are attached to Blobs."""

if isinstance(maybe_pickled_blob, Blob):
blob = maybe_pickled_blob
@@ -894,8 +1152,8 @@ def _reduce_client(cl):
)


def _pickle_blob(blob):
"""Pickle a Blob (and its Bucket and Client) and return a bytestring."""
def _pickle_client(obj):
"""Pickle a Client or an object that owns a Client (like a Blob)"""

# We need a custom pickler to process Client objects, which are attached to
# Buckets (and therefore to Blobs in turn). Unfortunately, the Python
@@ -907,7 +1165,7 @@ def _pickle_blob(blob):
p = pickle.Pickler(f)
p.dispatch_table = copyreg.dispatch_table.copy()
p.dispatch_table[Client] = _reduce_client
p.dump(blob)
p.dump(obj)
return f.getvalue()
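The copyreg dispatch-table approach used in _pickle_client is the standard way to override pickling for a specific class on a single Pickler instance without changing global behavior. A generic sketch of the pattern with a stand-in class; the names below are illustrative and not the actual _reduce_client implementation, whose body is elided in this diff:

    import copyreg
    import io
    import pickle

    class Unpicklable:
        """Stand-in for a class whose default pickling is unusable."""
        def __init__(self, name):
            self.name = name

    def _reduce_unpicklable(obj):
        # Recreate the object from constructor args instead of its __dict__.
        return (Unpicklable, (obj.name,))

    f = io.BytesIO()
    p = pickle.Pickler(f)
    p.dispatch_table = copyreg.dispatch_table.copy()
    p.dispatch_table[Unpicklable] = _reduce_unpicklable
    p.dump(Unpicklable("example"))

    restored = pickle.loads(f.getvalue())
    print(restored.name)  # example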


2 changes: 1 addition & 1 deletion setup.py
@@ -31,7 +31,7 @@
"google-auth >= 1.25.0, < 3.0dev",
"google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0",
"google-cloud-core >= 2.3.0, < 3.0dev",
"google-resumable-media >= 2.3.2",
"google-resumable-media >= 2.6.0",
"requests >= 2.18.0, < 3.0.0dev",
]
extras = {"protobuf": ["protobuf<5.0.0dev"]}