From 32aa62d413a74e39c767f8e84c3d8d068b5bc036 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 27 Jul 2023 09:32:38 -0700 Subject: [PATCH 01/11] intermediate commit --- google/cloud/storage/transfer_manager.py | 108 +++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 0b65702d4..c0b33a4c0 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -843,6 +843,114 @@ def download_chunks_concurrently( future.result() return None +def upload_chunks_concurrently( + filename, + blob, + content_type=None, # FIXME - check against other similar args and positions + chunk_size=TM_DEFAULT_CHUNK_SIZE, + upload_kwargs=None, + deadline=None, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, +): + """Upload a single file in chunks, concurrently.""" + + # TODO: + # async/pool + # automatic chunk division + # use requests query params constructors instead of strings + # use resumable media + # impl retries + # impl md5 of chunks + # open bug for crc32c of finished product (also other tm stuff) + + from google.cloud.storage.blob import _get_host_name + from xml.etree import ElementTree + + _MPU_URL_TEMPLATE = "https://{bucket}.{hostname}/{blob}{query}" + _MPU_INITIATE_QUERY = "?uploads" + _MPU_CHUNK_QUERY_TEMPLATE = "?partNumber=${partNumber}&uploadId=${this.uploadId}" + + bucket = blob.bucket + client = blob.client + timeout = 60 # fixme + + #hostname = _get_host_name(blob.bucket.client._connection) + hostname = "storage.googleapis.com" + url = _MPU_URL_TEMPLATE.format( + hostname=hostname, bucket=bucket.name, blob=blob.name, query=_MPU_INITIATE_QUERY + ) + content_type = blob._get_content_type(content_type) + info = blob._get_upload_arguments(client, content_type) + headers, object_metadata, content_type = info + + response = client._base_connection._make_request( + "POST", url, headers=headers, timeout=timeout + ) + + print(response.text) + + root = ElementTree.fromstring(response.text) + upload_id = root.find("{http://s3.amazonaws.com/doc/2006-03-01/}UploadId").text # FIXME: turn into a constant + + print("upload id {}".format(upload_id)) + + _CHUNK_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" + + part = 1 # fixme + chunk_query = _CHUNK_QUERY_TEMPLATE.format(part=part, upload_id=upload_id) + chunk_url = _MPU_URL_TEMPLATE.format( + hostname=hostname, bucket=bucket.name, blob=blob.name, query=chunk_query + ) + + with open(filename, 'r') as f: + response = client._base_connection._make_request("PUT", chunk_url, headers=headers, timeout=timeout, data=f.read()) + print("chunk headers:") + print(response.headers) + etag = response.headers['etag'] + + _FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}" + final_query = _FINAL_QUERY_TEMPLATE.format(upload_id=upload_id) + final_url = _MPU_URL_TEMPLATE.format( + hostname=hostname, bucket=bucket.name, blob=blob.name, query=final_query + ) + + final_xml_root = ElementTree.Element("CompleteMultipartUpload") + part = ElementTree.SubElement(final_xml_root, "Part") # put in a loop + ElementTree.SubElement(part, "PartNumber").text = "1" + ElementTree.SubElement(part, "ETag").text = etag + final_xml = ElementTree.tostring(final_xml_root) + print("final xml:") + print(final_xml) + + response = client._base_connection._make_request("POST", final_url, headers=headers, timeout=timeout, data=final_xml) + print("finish:") + print(response.text) + +# 
https://github.com/googleapis/nodejs-storage/pull/2192/files + +# "andrewsg-testindex.htmlABPnzm6owR0finnvnKpRLuaA_xsAF1-HO5L6C8qx8amqNzr5kx5zzYwzVYMs" + + # this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`; + # const url = `${this.baseUrl}?uploads`; + # try { + # const res = await this.authClient.request({ + # method: 'POST', + # url, + # }); + # if (res.data && res.data.error) { + # throw res.data.error; + # } + # const parsedXML = this.xmlParser.parse(res.data); + # this.uploadId = parsedXML.InitiateMultipartUploadResult.UploadId; +# Get the true URL +# Compose an XML request to start the process +# Send request + + + + + def _download_and_write_chunk_in_place( maybe_pickled_blob, filename, start, end, download_kwargs From 33871f572b375c80b1890c757ae11e155201f1c0 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 16 Aug 2023 15:31:49 -0700 Subject: [PATCH 02/11] temporary commit --- google/cloud/storage/transfer_manager.py | 135 ++++++++++++----------- 1 file changed, 68 insertions(+), 67 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index c0b33a4c0..2204c27df 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -27,6 +27,9 @@ from google.cloud.storage import Client from google.cloud.storage import Blob +from google.resumable_media.requests.upload import XMLMPUContainer +from google.resumable_media.requests.upload import XMLMPUPart + warnings.warn( "The module `transfer_manager` is a preview feature. Functionality and API " "may change. This warning will be removed in a future release." @@ -843,33 +846,32 @@ def download_chunks_concurrently( future.result() return None + def upload_chunks_concurrently( filename, blob, - content_type=None, # FIXME - check against other similar args and positions + content_type=None, chunk_size=TM_DEFAULT_CHUNK_SIZE, - upload_kwargs=None, deadline=None, worker_type=PROCESS, max_workers=DEFAULT_MAX_WORKERS, + *, + checksum=None ): """Upload a single file in chunks, concurrently.""" # TODO: - # async/pool - # automatic chunk division - # use requests query params constructors instead of strings - # use resumable media - # impl retries - # impl md5 of chunks - # open bug for crc32c of finished product (also other tm stuff) + # open bug for crc32c of finished product (also other tm checksum stuff) + # metadata + # headers + # resumable media docs in init from google.cloud.storage.blob import _get_host_name from xml.etree import ElementTree _MPU_URL_TEMPLATE = "https://{bucket}.{hostname}/{blob}{query}" _MPU_INITIATE_QUERY = "?uploads" - _MPU_CHUNK_QUERY_TEMPLATE = "?partNumber=${partNumber}&uploadId=${this.uploadId}" + _CHUNK_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" bucket = blob.bucket client = blob.client @@ -878,78 +880,61 @@ def upload_chunks_concurrently( #hostname = _get_host_name(blob.bucket.client._connection) hostname = "storage.googleapis.com" url = _MPU_URL_TEMPLATE.format( - hostname=hostname, bucket=bucket.name, blob=blob.name, query=_MPU_INITIATE_QUERY - ) - content_type = blob._get_content_type(content_type) - info = blob._get_upload_arguments(client, content_type) - headers, object_metadata, content_type = info + hostname=hostname, bucket=bucket.name, blob=blob.name, query="" + ) # FIXME - response = client._base_connection._make_request( - "POST", url, headers=headers, timeout=timeout - ) + container = XMLMPUContainer(url, filename) - 
print(response.text) + content_type = blob._get_content_type(content_type, filename=filename) - root = ElementTree.fromstring(response.text) - upload_id = root.find("{http://s3.amazonaws.com/doc/2006-03-01/}UploadId").text # FIXME: turn into a constant + container.initiate(transport=blob._get_transport(client), content_type=content_type) + upload_id = container.upload_id - print("upload id {}".format(upload_id)) + size = os.path.getsize(filename) - _CHUNK_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" + num_of_parts = -(size // -chunk_size) - part = 1 # fixme - chunk_query = _CHUNK_QUERY_TEMPLATE.format(part=part, upload_id=upload_id) - chunk_url = _MPU_URL_TEMPLATE.format( - hostname=hostname, bucket=bucket.name, blob=blob.name, query=chunk_query - ) - with open(filename, 'r') as f: - response = client._base_connection._make_request("PUT", chunk_url, headers=headers, timeout=timeout, data=f.read()) - print("chunk headers:") - print(response.headers) - etag = response.headers['etag'] - - _FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}" - final_query = _FINAL_QUERY_TEMPLATE.format(upload_id=upload_id) - final_url = _MPU_URL_TEMPLATE.format( - hostname=hostname, bucket=bucket.name, blob=blob.name, query=final_query - ) - - final_xml_root = ElementTree.Element("CompleteMultipartUpload") - part = ElementTree.SubElement(final_xml_root, "Part") # put in a loop - ElementTree.SubElement(part, "PartNumber").text = "1" - ElementTree.SubElement(part, "ETag").text = etag - final_xml = ElementTree.tostring(final_xml_root) - print("final xml:") - print(final_xml) + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + # Pickle the blob ahead of time (just once, not once per chunk) if needed. + maybe_pickled_client = _pickle_blob(client) if needs_pickling else client - response = client._base_connection._make_request("POST", final_url, headers=headers, timeout=timeout, data=final_xml) - print("finish:") - print(response.text) + futures = [] -# https://github.com/googleapis/nodejs-storage/pull/2192/files + with pool_class(max_workers=max_workers) as executor: -# "andrewsg-testindex.htmlABPnzm6owR0finnvnKpRLuaA_xsAF1-HO5L6C8qx8amqNzr5kx5zzYwzVYMs" + for part_number in range(1, num_of_parts+1): + start = (part_number-1) * chunk_size + end = min(part_number * chunk_size, size) - # this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`; - # const url = `${this.baseUrl}?uploads`; - # try { - # const res = await this.authClient.request({ - # method: 'POST', - # url, - # }); - # if (res.data && res.data.error) { - # throw res.data.error; - # } - # const parsedXML = this.xmlParser.parse(res.data); - # this.uploadId = parsedXML.InitiateMultipartUploadResult.UploadId; -# Get the true URL -# Compose an XML request to start the process -# Send request + futures.append( + executor.submit( + _upload_part, + maybe_pickled_client, url, upload_id, filename, start=start, end=end, part_number=part_number, checksum=checksum + ) + ) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + # Harvest results and raise exceptions. 
+ for future in futures: + part_number, etag = future.result() + container._parts[str(part_number)] = etag + container.finalize(blob._get_transport(client)) +def _upload_part( + maybe_pickled_client, url, upload_id, filename, start, end, part_number, checksum +): + if isinstance(maybe_pickled_client, Client): + client = maybe_pickled_client + else: + client = pickle.loads(maybe_pickled_client) + part = XMLMPUPart(url, upload_id, filename, start=start, end=end, part_number=part_number, checksum=checksum) + part.upload(client._http) + return (part_number, part.etag) def _download_and_write_chunk_in_place( @@ -1002,6 +987,22 @@ def _reduce_client(cl): ) +def _pickle_client(client): + """Pickle a Client and return a bytestring.""" + + # We need a custom pickler to process Client objects, which are attached to + # Buckets (and therefore to Blobs in turn). Unfortunately, the Python + # multiprocessing library doesn't seem to have a good way to use a custom + # pickler, and using copyreg will mutate global state and affect code + # outside of the client library. Instead, we'll pre-pickle the object and + # pass the bytestring in. + f = io.BytesIO() + p = pickle.Pickler(f) + p.dispatch_table = copyreg.dispatch_table.copy() + p.dispatch_table[Client] = _reduce_client + p.dump(client) + return f.getvalue() + def _pickle_blob(blob): """Pickle a Blob (and its Bucket and Client) and return a bytestring.""" From ff40e9904cb6e84f82b94ee6279bb1c25440d7c2 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 1 Sep 2023 17:56:03 -0700 Subject: [PATCH 03/11] xml mpu support, unit tests and docstrings --- google/cloud/storage/transfer_manager.py | 221 +++++++++++++++++------ setup.py | 2 +- tests/unit/test_transfer_manager.py | 128 ++++++++++++- 3 files changed, 286 insertions(+), 65 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 2204c27df..ca8abde80 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -26,10 +26,13 @@ from google.api_core import exceptions from google.cloud.storage import Client from google.cloud.storage import Blob +from google.cloud.storage.blob import _get_host_name +from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.resumable_media.requests.upload import XMLMPUContainer from google.resumable_media.requests.upload import XMLMPUPart + warnings.warn( "The module `transfer_manager` is a preview feature. Functionality and API " "may change. This warning will be removed in a future release." @@ -201,7 +204,7 @@ def upload_many( futures.append( executor.submit( _call_method_on_maybe_pickled_blob, - _pickle_blob(blob) if needs_pickling else blob, + _pickle_client(blob) if needs_pickling else blob, "upload_from_filename" if isinstance(path_or_file, str) else "upload_from_file", @@ -346,7 +349,7 @@ def download_many( futures.append( executor.submit( _call_method_on_maybe_pickled_blob, - _pickle_blob(blob) if needs_pickling else blob, + _pickle_client(blob) if needs_pickling else blob, "download_to_filename" if isinstance(path_or_file, str) else "download_to_file", @@ -736,7 +739,6 @@ def download_chunks_concurrently( Checksumming (md5 or crc32c) is not supported for chunked operations. Any `checksum` parameter passed in to download_kwargs will be ignored. 
- :type bucket: 'google.cloud.storage.bucket.Bucket' :param bucket: The bucket which contains the blobs to be downloaded @@ -748,6 +750,12 @@ def download_chunks_concurrently( :param filename: The destination filename or path. + :type chunk_size: int + :param chunk_size: + The size in bytes of each chunk to send. The optimal chunk size for + maximum throughput may vary depending on the exact network environment + and size of the blob. + :type download_kwargs: dict :param download_kwargs: A dictionary of keyword arguments to pass to the download method. Refer @@ -812,7 +820,7 @@ def download_chunks_concurrently( pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) # Pickle the blob ahead of time (just once, not once per chunk) if needed. - maybe_pickled_blob = _pickle_blob(blob) if needs_pickling else blob + maybe_pickled_blob = _pickle_client(blob) if needs_pickling else blob futures = [] @@ -856,61 +864,145 @@ def upload_chunks_concurrently( worker_type=PROCESS, max_workers=DEFAULT_MAX_WORKERS, *, - checksum=None + checksum="md5", + timeout=_DEFAULT_TIMEOUT, ): - """Upload a single file in chunks, concurrently.""" + """Upload a single file in chunks, concurrently. - # TODO: - # open bug for crc32c of finished product (also other tm checksum stuff) - # metadata - # headers - # resumable media docs in init + This function uses the XML MPU API to initialize an upload and upload a + file in chunks, concurrently with a worker pool. - from google.cloud.storage.blob import _get_host_name - from xml.etree import ElementTree + The XML MPU API is significantly different from other uploads; please review + the documentation at https://cloud.google.com/storage/docs/multipart-uploads + before using this feature. - _MPU_URL_TEMPLATE = "https://{bucket}.{hostname}/{blob}{query}" - _MPU_INITIATE_QUERY = "?uploads" - _CHUNK_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" + Blob metadata beyond the name is not currently transmitted with this + feature. Please set blob metadata separately after uploading. - bucket = blob.bucket - client = blob.client - timeout = 60 # fixme + Encryption is also not supported at present. Please do not use customer- + supplied encryption keys or customer-managed encryption keys with this + feature. - #hostname = _get_host_name(blob.bucket.client._connection) - hostname = "storage.googleapis.com" - url = _MPU_URL_TEMPLATE.format( - hostname=hostname, bucket=bucket.name, blob=blob.name, query="" - ) # FIXME + Using this feature with multiple threads is unlikely to improve upload + performance under normal circumstances due to Python interpreter threading + behavior. The default is therefore to use processes instead of threads. - container = XMLMPUContainer(url, filename) + :type filename: str + :param filename: + The path to the file to upload. File-like objects are not supported. + + :type blob: `google.cloud.storage.Blob` + :param blob: + The blob to which to upload. + + :type content_type: str + :param content_type: (Optional) Type of content being uploaded. + + :type chunk_size: int + :param chunk_size: + The size in bytes of each chunk to send. The optimal chunk size for + maximum throughput may vary depending on the exact network environment + and size of the blob. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. 
This can be + left as the default of None (no deadline) for most use cases. + + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :type checksum: str + :param checksum: + (Optional) The checksum scheme to use: either 'md5', 'crc32c' or None. + Each individual part is checksummed. At present, the selected checksum + rule is only applied to parts and a separate checksum of the entire + resulting blob is not computed. Please compute and compare the checksum + of the file to the resulting blob separately if needed, using the + 'crc32c' algorithm as per the XML MPU documentation. + + :type timeout: float or tuple + :param timeout: + (Optional) The amount of time, in seconds, to wait + for the server response. See: :ref:`configuring_timeouts` + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + """ + + # TODO: + # make base url creation more robust than string addition - consider also using that for query strings in resumable media + # metadata + # other special purpose header stuff like encryption + # figure out custom client stuff + + bucket = blob.bucket + client = blob.client + transport = blob._get_transport(client) + + hostname = _get_host_name(client._connection) + url = hostname + "/" + bucket.name + "/" + blob.name # FIXME: make this nicer content_type = blob._get_content_type(content_type, filename=filename) - container.initiate(transport=blob._get_transport(client), content_type=content_type) + container = XMLMPUContainer(url, filename) + container.initiate(transport=transport, content_type=content_type) upload_id = container.upload_id size = os.path.getsize(filename) - num_of_parts = -(size // -chunk_size) - pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) # Pickle the blob ahead of time (just once, not once per chunk) if needed. 
- maybe_pickled_client = _pickle_blob(client) if needs_pickling else client + maybe_pickled_client = _pickle_client(client) if needs_pickling else client futures = [] with pool_class(max_workers=max_workers) as executor: - for part_number in range(1, num_of_parts+1): - start = (part_number-1) * chunk_size + for part_number in range(1, num_of_parts + 1): + start = (part_number - 1) * chunk_size end = min(part_number * chunk_size, size) futures.append( executor.submit( _upload_part, - maybe_pickled_client, url, upload_id, filename, start=start, end=end, part_number=part_number, checksum=checksum + maybe_pickled_client, + url, + upload_id, + filename, + start=start, + end=end, + part_number=part_number, + checksum=checksum, ) ) @@ -918,21 +1010,40 @@ def upload_chunks_concurrently( futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED ) - # Harvest results and raise exceptions. - for future in futures: - part_number, etag = future.result() - container._parts[str(part_number)] = etag + try: + # Harvest results and raise exceptions. + for future in futures: + part_number, etag = future.result() + container.register_part(part_number, etag) + + container.finalize(blob._get_transport(client)) + except Exception: + container.cancel(blob._get_transport(client)) + raise - container.finalize(blob._get_transport(client)) def _upload_part( maybe_pickled_client, url, upload_id, filename, start, end, part_number, checksum ): + """Helper function that runs inside a thread or subprocess to upload a part. + + `maybe_pickled_client` is either a Client (for threads) or a specially + pickled Client (for processes) because the default pickling mangles Client + objects.""" + if isinstance(maybe_pickled_client, Client): client = maybe_pickled_client else: client = pickle.loads(maybe_pickled_client) - part = XMLMPUPart(url, upload_id, filename, start=start, end=end, part_number=part_number, checksum=checksum) + part = XMLMPUPart( + url, + upload_id, + filename, + start=start, + end=end, + part_number=part_number, + checksum=checksum, + ) part.upload(client._http) return (part_number, part.etag) @@ -940,6 +1051,12 @@ def _upload_part( def _download_and_write_chunk_in_place( maybe_pickled_blob, filename, start, end, download_kwargs ): + """Helper function that runs inside a thread or subprocess. + + `maybe_pickled_blob` is either a Blob (for threads) or a specially pickled + Blob (for processes) because the default pickling mangles Client objects + which are attached to Blobs.""" + if isinstance(maybe_pickled_blob, Blob): blob = maybe_pickled_blob else: @@ -956,9 +1073,9 @@ def _call_method_on_maybe_pickled_blob( ): """Helper function that runs inside a thread or subprocess. - `maybe_pickled_blob` is either a blob (for threads) or a specially pickled - blob (for processes) because the default pickling mangles clients which are - attached to blobs.""" + `maybe_pickled_blob` is either a Blob (for threads) or a specially pickled + Blob (for processes) because the default pickling mangles Client objects + which are attached to Blobs.""" if isinstance(maybe_pickled_blob, Blob): blob = maybe_pickled_blob @@ -987,24 +1104,8 @@ def _reduce_client(cl): ) -def _pickle_client(client): - """Pickle a Client and return a bytestring.""" - - # We need a custom pickler to process Client objects, which are attached to - # Buckets (and therefore to Blobs in turn). 
Unfortunately, the Python - # multiprocessing library doesn't seem to have a good way to use a custom - # pickler, and using copyreg will mutate global state and affect code - # outside of the client library. Instead, we'll pre-pickle the object and - # pass the bytestring in. - f = io.BytesIO() - p = pickle.Pickler(f) - p.dispatch_table = copyreg.dispatch_table.copy() - p.dispatch_table[Client] = _reduce_client - p.dump(client) - return f.getvalue() - -def _pickle_blob(blob): - """Pickle a Blob (and its Bucket and Client) and return a bytestring.""" +def _pickle_client(obj): + """Pickle a Client or an object that owns a Client (like a Blob)""" # We need a custom pickler to process Client objects, which are attached to # Buckets (and therefore to Blobs in turn). Unfortunately, the Python @@ -1016,7 +1117,7 @@ def _pickle_blob(blob): p = pickle.Pickler(f) p.dispatch_table = copyreg.dispatch_table.copy() p.dispatch_table[Client] = _reduce_client - p.dump(blob) + p.dump(obj) return f.getvalue() diff --git a/setup.py b/setup.py index e2b5cc7a4..a57f972ff 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ "google-auth >= 1.25.0, < 3.0dev", "google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0", "google-cloud-core >= 2.3.0, < 3.0dev", - "google-resumable-media >= 2.3.2", + "google-resumable-media >= 2.6.0", "requests >= 2.18.0, < 3.0.0dev", ] extras = {"protobuf": ["protobuf<5.0.0dev"]} diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 685f48579..3e2a48c52 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -18,6 +18,7 @@ from google.cloud.storage import transfer_manager from google.cloud.storage import Blob +from google.cloud.storage import Client from google.api_core import exceptions @@ -33,6 +34,8 @@ FAKE_ENCODING = "fake_gzip" DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} CHUNK_SIZE = 8 +HOSTNAME = "https://example.com/" +URL = "https://example.com/bucket/blob/" # Used in subprocesses only, so excluded from coverage @@ -600,6 +603,85 @@ def test_download_chunks_concurrently_passes_concurrency_options(): wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) +def test_upload_chunks_concurrently(): + blob_mock = mock.Mock() + blob_mock.name = "blob" + blob_mock.bucket.name = "bucket" + transport = mock.Mock() + blob_mock._get_transport = mock.Mock(return_value=transport) + blob_mock._get_content_type = mock.Mock(return_value=FAKE_CONTENT_TYPE) + blob_mock.client = _PickleableMockClient(identify_as_client=True) + FILENAME = "file_a.txt" + SIZE = 2048 + + container_mock = mock.Mock() + container_mock.upload_id = "abcd" + part_mock = mock.Mock() + ETAG = "efgh" + part_mock.etag = ETAG + + with mock.patch("os.path.getsize", return_value=SIZE), mock.patch( + "google.cloud.storage.transfer_manager.XMLMPUContainer", + return_value=container_mock, + ), mock.patch( + "google.cloud.storage.transfer_manager.XMLMPUPart", return_value=part_mock + ): + transfer_manager.upload_chunks_concurrently( + FILENAME, + blob_mock, + chunk_size=SIZE // 2, + worker_type=transfer_manager.THREAD, + ) + container_mock.initiate.assert_called_once_with( + transport=transport, content_type=FAKE_CONTENT_TYPE + ) + container_mock.register_part.assert_any_call(1, ETAG) + container_mock.register_part.assert_any_call(2, ETAG) + container_mock.finalize.assert_called_once_with(transport) + part_mock.upload.assert_called_with(blob_mock.client._http) + + +def 
test_upload_chunks_concurrently_passes_concurrency_options(): + blob_mock = mock.Mock() + blob_mock.name = "blob" + blob_mock.bucket.name = "bucket" + transport = mock.Mock() + blob_mock._get_transport = mock.Mock(return_value=transport) + blob_mock._get_content_type = mock.Mock(return_value=FAKE_CONTENT_TYPE) + blob_mock.client = _PickleableMockClient(identify_as_client=True) + FILENAME = "file_a.txt" + SIZE = 2048 + + container_mock = mock.Mock() + container_mock.upload_id = "abcd" + + MAX_WORKERS = 7 + DEADLINE = 10 + + with mock.patch("os.path.getsize", return_value=SIZE), mock.patch( + "google.cloud.storage.transfer_manager.XMLMPUContainer", + return_value=container_mock, + ), mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + try: + transfer_manager.upload_chunks_concurrently( + FILENAME, + blob_mock, + chunk_size=SIZE // 2, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, + deadline=DEADLINE, + ) + except ValueError: + pass # The futures don't actually work, so we expect this to abort. + # Conveniently, that gives us a chance to test the auto-delete + # exception handling feature. + container_mock.cancel.assert_called_once_with(transport) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + class _PickleableMockBlob: def __init__( self, @@ -623,6 +705,26 @@ def download_to_file(self, *args, **kwargs): return "SUCCESS" +class _PickleableMockConnection: + @staticmethod + def get_api_base_url_for_mtls(): + return HOSTNAME + + +class _PickleableMockClient: + def __init__(self, identify_as_client=False): + self._http = None + self._connection = _PickleableMockConnection() + self.identify_as_client = identify_as_client + + @property + def __class__(self): + if self.identify_as_client: + return Client + else: + return _PickleableMockClient + + # Used in subprocesses only, so excluded from coverage def _validate_blob_token_in_subprocess_for_chunk( maybe_pickled_blob, filename, **kwargs @@ -665,13 +767,13 @@ def test__LazyClient(): assert len(fake_cache) == 1 -def test__pickle_blob(): +def test__pickle_client(): # This test nominally has coverage, but doesn't assert that the essential - # copyreg behavior in _pickle_blob works. Unfortunately there doesn't seem + # copyreg behavior in _pickle_client works. Unfortunately there doesn't seem # to be a good way to check that without actually creating a Client, which # will spin up HTTP connections undesirably. This is more fully checked in - # the system tests, though. - pkl = transfer_manager._pickle_blob(FAKE_RESULT) + # the system tests. 
+ pkl = transfer_manager._pickle_client(FAKE_RESULT) assert pickle.loads(pkl) == FAKE_RESULT @@ -685,6 +787,24 @@ def test__download_and_write_chunk_in_place(): assert result == "SUCCESS" +def test__upload_part(): + pickled_mock = pickle.dumps(_PickleableMockClient()) + FILENAME = "file_a.txt" + UPLOAD_ID = "abcd" + ETAG = "efgh" + + part = mock.Mock() + part.etag = ETAG + with mock.patch( + "google.cloud.storage.transfer_manager.XMLMPUPart", return_value=part + ): + result = transfer_manager._upload_part( + pickled_mock, URL, UPLOAD_ID, FILENAME, 0, 256, 1, None + ) + part.upload.assert_called_once() + assert result == (1, ETAG) + + def test__get_pool_class_and_requirements_error(): with pytest.raises(ValueError): transfer_manager._get_pool_class_and_requirements("garbage") From 2aa0551e22098d93215c24370f03da4ba1967194 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 6 Sep 2023 17:19:06 -0700 Subject: [PATCH 04/11] integration tests --- google/cloud/storage/transfer_manager.py | 19 ++++++---- tests/system/test_transfer_manager.py | 47 ++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index ca8abde80..436830751 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -876,8 +876,17 @@ def upload_chunks_concurrently( the documentation at https://cloud.google.com/storage/docs/multipart-uploads before using this feature. + Downloads that fail due to an exception will be proactively canceled by the + library. If the download fails due to reason that precludes cancellation, + such as a hardware failure, process termination or power outage, then the + incomplete upload may persist indefinitely. To mitigate this, set + the `AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket + lifecycle rules, or refer to the XML API documentation linked above to learn + more about how to list and delete individual downloads. + Blob metadata beyond the name is not currently transmitted with this - feature. Please set blob metadata separately after uploading. + feature. Please set blob metadata separately after uploading. This includes + storage class and custom time options. Encryption is also not supported at present. Please do not use customer- supplied encryption keys or customer-managed encryption keys with this @@ -959,12 +968,6 @@ def upload_chunks_concurrently( :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. """ - # TODO: - # make base url creation more robust than string addition - consider also using that for query strings in resumable media - # metadata - # other special purpose header stuff like encryption - # figure out custom client stuff - bucket = blob.bucket client = blob.client transport = blob._get_transport(client) @@ -978,7 +981,7 @@ def upload_chunks_concurrently( upload_id = container.upload_id size = os.path.getsize(filename) - num_of_parts = -(size // -chunk_size) + num_of_parts = -(size // -chunk_size) # Ceiling division pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) # Pickle the blob ahead of time (just once, not once per chunk) if needed. 
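For illustration, a standalone sketch of the part-splitting arithmetic used above (the `-(size // -chunk_size)` ceiling division); the concrete numbers below are made up:

    size = 2049          # hypothetical file size in bytes: two full chunks plus one byte
    chunk_size = 1024

    num_of_parts = -(size // -chunk_size)   # ceiling division without math.ceil -> 3

    for part_number in range(1, num_of_parts + 1):
        start = (part_number - 1) * chunk_size
        end = min(part_number * chunk_size, size)   # the final part may be short
        print(part_number, start, end)
    # 1 0 1024
    # 2 1024 2048
    # 3 2048 2049
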
diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index bc7e0d31e..610f48547 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -171,3 +171,50 @@ def test_download_chunks_concurrently(shared_bucket, file_data): ) with open(threaded_filename, "rb") as file_obj: assert _base64_md5hash(file_obj) == source_file["hash"] + + +def test_upload_chunks_concurrently(shared_bucket, file_data): + source_file = file_data["big"] + filename = source_file["path"] + blob_name = "mpu_file" + upload_blob = shared_bucket.blob(blob_name) + chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API + assert os.path.getsize(filename) > chunk_size # Won't make a good test otherwise + + transfer_manager.upload_chunks_concurrently( + filename, upload_blob, chunk_size=chunk_size, deadline=DEADLINE + ) + + with tempfile.NamedTemporaryFile() as tmp: + download_blob = shared_bucket.blob(blob_name) + download_blob.download_to_file(tmp) + tmp.seek(0) + + with open(source_file["path"], "rb") as sf: + source_contents = sf.read() + temp_contents = tmp.read() + assert source_contents == temp_contents + + # Also test threaded mode + blob_name = "mpu_threaded" + upload_blob = shared_bucket.blob(blob_name) + chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API + assert os.path.getsize(filename) > chunk_size # Won't make a good test otherwise + + transfer_manager.upload_chunks_concurrently( + filename, + upload_blob, + chunk_size=chunk_size, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + ) + + with tempfile.NamedTemporaryFile() as tmp: + download_blob = shared_bucket.blob(blob_name) + download_blob.download_to_file(tmp) + tmp.seek(0) + + with open(source_file["path"], "rb") as sf: + source_contents = sf.read() + temp_contents = tmp.read() + assert source_contents == temp_contents From d89b56a3b40c35f48f950c2a331974808620a7e0 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 13 Sep 2023 09:39:33 -0700 Subject: [PATCH 05/11] add support for metadata --- google/cloud/storage/blob.py | 4 +- google/cloud/storage/transfer_manager.py | 61 ++++++++++++++++-------- tests/system/test_transfer_manager.py | 54 ++++++++++++++++++++- 3 files changed, 96 insertions(+), 23 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 0d663e775..4c493485f 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -1697,7 +1697,7 @@ def _get_writable_metadata(self): return object_metadata - def _get_upload_arguments(self, client, content_type): + def _get_upload_arguments(self, client, content_type, filename=None): """Get required arguments for performing an upload. 
The content type returned will be determined in order of precedence: @@ -1716,7 +1716,7 @@ def _get_upload_arguments(self, client, content_type): * An object metadata dictionary * The ``content_type`` as a string (according to precedence) """ - content_type = self._get_content_type(content_type) + content_type = self._get_content_type(content_type, filename=filename) headers = { **_get_default_headers(client._connection.user_agent, content_type), **_get_encryption_headers(self._encryption_key), diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 436830751..2269012bb 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -41,7 +41,14 @@ TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024 DEFAULT_MAX_WORKERS = 8 - +METADATA_HEADER_TRANSLATION = { + "cacheControl": "Cache-Control", + "contentDisposition": "Content-Disposition", + "contentEncoding": "Content-Encoding", + "contentLanguage": "Content-Language", + "customTime": "x-goog-custom-time", + "storageClass": "x-goog-storage-class", +} # Constants to be passed in as `worker_type`. PROCESS = "process" @@ -876,26 +883,21 @@ def upload_chunks_concurrently( the documentation at https://cloud.google.com/storage/docs/multipart-uploads before using this feature. - Downloads that fail due to an exception will be proactively canceled by the - library. If the download fails due to reason that precludes cancellation, - such as a hardware failure, process termination or power outage, then the - incomplete upload may persist indefinitely. To mitigate this, set - the `AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket - lifecycle rules, or refer to the XML API documentation linked above to learn - more about how to list and delete individual downloads. - - Blob metadata beyond the name is not currently transmitted with this - feature. Please set blob metadata separately after uploading. This includes - storage class and custom time options. - - Encryption is also not supported at present. Please do not use customer- - supplied encryption keys or customer-managed encryption keys with this - feature. + The library will attempt to cancel uploads that fail due to an exception. + If the upload fails in a way that precludes cancellation, such as a + hardware failure, process termination, or power outage, then the incomplete + upload may persist indefinitely. To mitigate this, set the + `AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket lifecycle + rules, or refer to the XML API documentation linked above to learn more + about how to list and delete individual downloads. Using this feature with multiple threads is unlikely to improve upload performance under normal circumstances due to Python interpreter threading behavior. The default is therefore to use processes instead of threads. + ACL information cannot be sent with this function and should be set + separately with :class:`ObjectACL` methods. + :type filename: str :param filename: The path to the file to upload. File-like objects are not supported. 
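As an aside, a sketch of how the translation table above is applied by the `_headers_from_metadata` helper added later in this patch; the `object_metadata` dict is a hypothetical example of the writable metadata for a blob with these fields set:

    from google.cloud.storage.transfer_manager import _headers_from_metadata  # added in this patch

    object_metadata = {
        "name": "blob",                    # not in the table, so no header is emitted
        "cacheControl": "private",
        "storageClass": "NEARLINE",
        "metadata": {"key_a": "value_a"},  # custom metadata becomes x-goog-meta-* headers
    }

    headers = _headers_from_metadata(object_metadata)
    # {'Cache-Control': 'private',
    #  'x-goog-storage-class': 'NEARLINE',
    #  'x-goog-meta-key_a': 'value_a'}
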
@@ -973,10 +975,12 @@ def upload_chunks_concurrently( transport = blob._get_transport(client) hostname = _get_host_name(client._connection) - url = hostname + "/" + bucket.name + "/" + blob.name # FIXME: make this nicer - content_type = blob._get_content_type(content_type, filename=filename) + url = "{hostname}/{bucket}/{blob}".format(hostname=hostname, bucket=bucket.name, blob=blob.name) - container = XMLMPUContainer(url, filename) + base_headers, object_metadata, content_type = blob._get_upload_arguments(client, content_type, filename=filename) + headers = {**base_headers, **_headers_from_metadata(object_metadata)} + + container = XMLMPUContainer(url, filename, headers=headers) container.initiate(transport=transport, content_type=content_type) upload_id = container.upload_id @@ -1006,6 +1010,7 @@ def upload_chunks_concurrently( end=end, part_number=part_number, checksum=checksum, + headers=headers ) ) @@ -1026,7 +1031,7 @@ def upload_chunks_concurrently( def _upload_part( - maybe_pickled_client, url, upload_id, filename, start, end, part_number, checksum + maybe_pickled_client, url, upload_id, filename, start, end, part_number, checksum, headers ): """Helper function that runs inside a thread or subprocess to upload a part. @@ -1046,11 +1051,27 @@ def _upload_part( end=end, part_number=part_number, checksum=checksum, + headers=headers ) part.upload(client._http) return (part_number, part.etag) +def _headers_from_metadata(metadata): + """Helper function to translate object metadata into a header dictionary.""" + + headers = {} + # Handle standard writable metadata + for key, value in metadata.items(): + if key in METADATA_HEADER_TRANSLATION: + headers[METADATA_HEADER_TRANSLATION[key]] = value + # Handle custom metadata + if "metadata" in metadata: + for key, value in metadata["metadata"].items(): + headers["x-goog-meta-" + key] = value + return headers + + def _download_and_write_chunk_in_place( maybe_pickled_blob, filename, start, end, download_kwargs ): diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 610f48547..996fe0e97 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -173,7 +173,7 @@ def test_download_chunks_concurrently(shared_bucket, file_data): assert _base64_md5hash(file_obj) == source_file["hash"] -def test_upload_chunks_concurrently(shared_bucket, file_data): +def test_upload_chunks_concurrently(shared_bucket, file_data, blobs_to_delete): source_file = file_data["big"] filename = source_file["path"] blob_name = "mpu_file" @@ -181,6 +181,8 @@ def test_upload_chunks_concurrently(shared_bucket, file_data): chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API assert os.path.getsize(filename) > chunk_size # Won't make a good test otherwise + blobs_to_delete.append(upload_blob) + transfer_manager.upload_chunks_concurrently( filename, upload_blob, chunk_size=chunk_size, deadline=DEADLINE ) @@ -218,3 +220,53 @@ def test_upload_chunks_concurrently(shared_bucket, file_data): source_contents = sf.read() temp_contents = tmp.read() assert source_contents == temp_contents + +def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blobs_to_delete): + import datetime + from google.cloud._helpers import UTC +# from google.cloud._helpers import _RFC3339_MICROS + + now = datetime.datetime.utcnow().replace(tzinfo=UTC) +# NOW = now.strftime(_RFC3339_MICROS) + + custom_metadata = {"key_a": "value_a", "key_b": "value_b"} + + METADATA = { + "cache_control": "private", + 
"content_disposition": "inline", + "content_language": "en-US", + "custom_time": now, + "metadata": custom_metadata, + "storage_class": "NEARLINE", + } + + source_file = file_data["big"] + filename = source_file["path"] + blob_name = "mpu_file_with_metadata" + upload_blob = shared_bucket.blob(blob_name) + + blobs_to_delete.append(upload_blob) + + for key, value in METADATA.items(): + setattr(upload_blob, key, value) + + chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API + assert os.path.getsize(filename) > chunk_size # Won't make a good test otherwise + + transfer_manager.upload_chunks_concurrently( + filename, upload_blob, chunk_size=chunk_size, deadline=DEADLINE + ) + + with tempfile.NamedTemporaryFile() as tmp: + download_blob = shared_bucket.get_blob(blob_name) + + for key, value in METADATA.items(): + assert getattr(download_blob, key) == value + + download_blob.download_to_file(tmp) + tmp.seek(0) + + with open(source_file["path"], "rb") as sf: + source_contents = sf.read() + temp_contents = tmp.read() + assert source_contents == temp_contents From 7d113457d378d5905549e893702b2db77d7a6b7a Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 13 Sep 2023 16:41:44 +0000 Subject: [PATCH 06/11] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/storage/transfer_manager.py | 22 +++++++++++++++++----- tests/system/test_transfer_manager.py | 10 +++++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 2269012bb..0b731801b 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -975,9 +975,13 @@ def upload_chunks_concurrently( transport = blob._get_transport(client) hostname = _get_host_name(client._connection) - url = "{hostname}/{bucket}/{blob}".format(hostname=hostname, bucket=bucket.name, blob=blob.name) + url = "{hostname}/{bucket}/{blob}".format( + hostname=hostname, bucket=bucket.name, blob=blob.name + ) - base_headers, object_metadata, content_type = blob._get_upload_arguments(client, content_type, filename=filename) + base_headers, object_metadata, content_type = blob._get_upload_arguments( + client, content_type, filename=filename + ) headers = {**base_headers, **_headers_from_metadata(object_metadata)} container = XMLMPUContainer(url, filename, headers=headers) @@ -1010,7 +1014,7 @@ def upload_chunks_concurrently( end=end, part_number=part_number, checksum=checksum, - headers=headers + headers=headers, ) ) @@ -1031,7 +1035,15 @@ def upload_chunks_concurrently( def _upload_part( - maybe_pickled_client, url, upload_id, filename, start, end, part_number, checksum, headers + maybe_pickled_client, + url, + upload_id, + filename, + start, + end, + part_number, + checksum, + headers, ): """Helper function that runs inside a thread or subprocess to upload a part. 
@@ -1051,7 +1063,7 @@ def _upload_part( end=end, part_number=part_number, checksum=checksum, - headers=headers + headers=headers, ) part.upload(client._http) return (part_number, part.etag) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 996fe0e97..55cfe728c 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -221,13 +221,17 @@ def test_upload_chunks_concurrently(shared_bucket, file_data, blobs_to_delete): temp_contents = tmp.read() assert source_contents == temp_contents -def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blobs_to_delete): + +def test_upload_chunks_concurrently_with_metadata( + shared_bucket, file_data, blobs_to_delete +): import datetime from google.cloud._helpers import UTC -# from google.cloud._helpers import _RFC3339_MICROS + + # from google.cloud._helpers import _RFC3339_MICROS now = datetime.datetime.utcnow().replace(tzinfo=UTC) -# NOW = now.strftime(_RFC3339_MICROS) + # NOW = now.strftime(_RFC3339_MICROS) custom_metadata = {"key_a": "value_a", "key_b": "value_b"} From 04ba357e25880d5be130a126065611e622d7d822 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 14 Sep 2023 12:03:18 -0700 Subject: [PATCH 07/11] encryption support --- google/cloud/storage/transfer_manager.py | 15 +++ tests/system/conftest.py | 36 +++++++ tests/system/test_transfer_manager.py | 118 ++++++++++++++++++++++- 3 files changed, 164 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 2269012bb..03bb88313 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -980,6 +980,21 @@ def upload_chunks_concurrently( base_headers, object_metadata, content_type = blob._get_upload_arguments(client, content_type, filename=filename) headers = {**base_headers, **_headers_from_metadata(object_metadata)} + if blob.user_project is not None: + headers["x-goog-user-project"] = blob.user_project + + # When a Customer Managed Encryption Key is used to encrypt Cloud Storage object + # at rest, object resource metadata will store the version of the Key Management + # Service cryptographic material. If a Blob instance with KMS Key metadata set is + # used to upload a new version of the object then the existing kmsKeyName version + # value can't be used in the upload request and the client instead ignores it. 
+ if ( + blob.kms_key_name is not None + and "cryptoKeyVersions" not in blob.kms_key_name + ): + headers["x-goog-encryption-kms-key-name"] = blob.kms_key_name + + container = XMLMPUContainer(url, filename, headers=headers) container.initiate(transport=transport, content_type=content_type) upload_id = container.upload_id diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 26d5c785e..ac48acac2 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -46,6 +46,19 @@ ebh_bucket_iteration = 0 +_key_name_format = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}" + +keyring_name = "gcs-test" +default_key_name = "gcs-test" +alt_key_name = "gcs-test-alternate" + +def _kms_key_name(client, bucket, key_name): + return _key_name_format.format( + client.project, + bucket.location.lower(), + keyring_name, + key_name, + ) @pytest.fixture(scope="session") def storage_client(): @@ -218,3 +231,26 @@ def file_data(): file_data["hash"] = _base64_md5hash(file_obj) return _file_data + +@pytest.fixture(scope="session") +def kms_bucket_name(): + return _helpers.unique_name("gcp-systest-kms") + + +@pytest.fixture(scope="session") +def kms_bucket(storage_client, kms_bucket_name, no_mtls): + bucket = _helpers.retry_429_503(storage_client.create_bucket)(kms_bucket_name) + + yield bucket + + _helpers.delete_bucket(bucket) + + +@pytest.fixture(scope="session") +def kms_key_name(storage_client, kms_bucket): + return _kms_key_name(storage_client, kms_bucket, default_key_name) + + +@pytest.fixture(scope="session") +def alt_kms_key_name(storage_client, kms_bucket): + return _kms_key_name(storage_client, kms_bucket, alt_key_name) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 996fe0e97..af4cd9b61 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -16,6 +16,8 @@ import tempfile import os +import pytest + from google.cloud.storage import transfer_manager from google.cloud.storage._helpers import _base64_md5hash @@ -23,6 +25,16 @@ DEADLINE = 30 +encryption_key = "b23ff11bba187db8c37077e6af3b25b8" + + +def _check_blob_hash(blob, info): + md5_hash = blob.md5_hash + if not isinstance(md5_hash, bytes): + md5_hash = md5_hash.encode("utf-8") + + assert md5_hash == info["hash"] + def test_upload_many(shared_bucket, file_data, blobs_to_delete): FILE_BLOB_PAIRS = [ @@ -224,11 +236,8 @@ def test_upload_chunks_concurrently(shared_bucket, file_data, blobs_to_delete): def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blobs_to_delete): import datetime from google.cloud._helpers import UTC -# from google.cloud._helpers import _RFC3339_MICROS now = datetime.datetime.utcnow().replace(tzinfo=UTC) -# NOW = now.strftime(_RFC3339_MICROS) - custom_metadata = {"key_a": "value_a", "key_b": "value_b"} METADATA = { @@ -245,8 +254,6 @@ def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blob blob_name = "mpu_file_with_metadata" upload_blob = shared_bucket.blob(blob_name) - blobs_to_delete.append(upload_blob) - for key, value in METADATA.items(): setattr(upload_blob, key, value) @@ -256,6 +263,7 @@ def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blob transfer_manager.upload_chunks_concurrently( filename, upload_blob, chunk_size=chunk_size, deadline=DEADLINE ) + blobs_to_delete.append(upload_blob) with tempfile.NamedTemporaryFile() as tmp: download_blob = shared_bucket.get_blob(blob_name) @@ -270,3 +278,103 @@ def 
test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blob source_contents = sf.read() temp_contents = tmp.read() assert source_contents == temp_contents + + +def test_upload_chunks_concurrently_with_content_encoding(shared_bucket, file_data, blobs_to_delete): + import gzip + + METADATA = { + "content_encoding": "gzip", + } + + source_file = file_data["big"] + filename = source_file["path"] + blob_name = "mpu_file_encoded" + upload_blob = shared_bucket.blob(blob_name) + + for key, value in METADATA.items(): + setattr(upload_blob, key, value) + + chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API + + with tempfile.NamedTemporaryFile() as tmp_gzip: + with open(filename, 'rb') as f: + compressed_bytes = gzip.compress(f.read()) + + tmp_gzip.write(compressed_bytes) + tmp_gzip.seek(0) + transfer_manager.upload_chunks_concurrently( + tmp_gzip.name, upload_blob, chunk_size=chunk_size, deadline=DEADLINE + ) + blobs_to_delete.append(upload_blob) + + with tempfile.NamedTemporaryFile() as tmp: + download_blob = shared_bucket.get_blob(blob_name) + + for key, value in METADATA.items(): + assert getattr(download_blob, key) == value + + download_blob.download_to_file(tmp) + tmp.seek(0) + + with open(source_file["path"], "rb") as sf: + source_contents = sf.read() + temp_contents = tmp.read() + assert source_contents == temp_contents + +def test_upload_chunks_concurrently_with_encryption_key(shared_bucket, file_data, blobs_to_delete): + source_file = file_data["big"] + filename = source_file["path"] + blob_name = "mpu_file_encrypted" + upload_blob = shared_bucket.blob(blob_name, encryption_key=encryption_key) + + chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API + assert os.path.getsize(filename) > chunk_size # Won't make a good test otherwise + + transfer_manager.upload_chunks_concurrently( + filename, upload_blob, chunk_size=chunk_size, deadline=DEADLINE + ) + blobs_to_delete.append(upload_blob) + + with tempfile.NamedTemporaryFile() as tmp: + download_blob = shared_bucket.get_blob(blob_name, encryption_key=encryption_key) + + download_blob.download_to_file(tmp) + tmp.seek(0) + + with open(source_file["path"], "rb") as sf: + source_contents = sf.read() + temp_contents = tmp.read() + assert source_contents == temp_contents + + with tempfile.NamedTemporaryFile() as tmp: + keyless_blob = shared_bucket.get_blob(blob_name) + + with pytest.raises(exceptions.BadRequest): + keyless_blob.download_to_file(tmp) + + +def test_upload_chunks_concurrently_with_kms(kms_bucket, file_data, blobs_to_delete, kms_key_name): + source_file = file_data["big"] + filename = source_file["path"] + blob_name = "mpu_file_kms" + blob = kms_bucket.blob(blob_name, kms_key_name=kms_key_name) + + chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API + assert os.path.getsize(filename) > chunk_size # Won't make a good test otherwise + + transfer_manager.upload_chunks_concurrently( + filename, blob, chunk_size=chunk_size, deadline=DEADLINE + ) + blobs_to_delete.append(blob) + blob.reload() + assert blob.kms_key_name.startswith(kms_key_name) + + with tempfile.NamedTemporaryFile() as tmp: + blob.download_to_file(tmp) + tmp.seek(0) + + with open(source_file["path"], "rb") as sf: + source_contents = sf.read() + temp_contents = tmp.read() + assert source_contents == temp_contents From 1313c9e3d838149fcb71bc2b184265ed90c34a47 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 14 Sep 2023 16:32:02 -0700 Subject: [PATCH 08/11] unit tests for mpu --- 
google/cloud/storage/transfer_manager.py | 28 +++-- tests/system/conftest.py | 3 + tests/system/test_transfer_manager.py | 20 +++- tests/unit/test_transfer_manager.py | 146 +++++++++++++++++++---- 4 files changed, 159 insertions(+), 38 deletions(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 03bb88313..7bb3e8bdf 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -975,9 +975,13 @@ def upload_chunks_concurrently( transport = blob._get_transport(client) hostname = _get_host_name(client._connection) - url = "{hostname}/{bucket}/{blob}".format(hostname=hostname, bucket=bucket.name, blob=blob.name) + url = "{hostname}/{bucket}/{blob}".format( + hostname=hostname, bucket=bucket.name, blob=blob.name + ) - base_headers, object_metadata, content_type = blob._get_upload_arguments(client, content_type, filename=filename) + base_headers, object_metadata, content_type = blob._get_upload_arguments( + client, content_type, filename=filename + ) headers = {**base_headers, **_headers_from_metadata(object_metadata)} if blob.user_project is not None: @@ -988,13 +992,9 @@ def upload_chunks_concurrently( # Service cryptographic material. If a Blob instance with KMS Key metadata set is # used to upload a new version of the object then the existing kmsKeyName version # value can't be used in the upload request and the client instead ignores it. - if ( - blob.kms_key_name is not None - and "cryptoKeyVersions" not in blob.kms_key_name - ): + if blob.kms_key_name is not None and "cryptoKeyVersions" not in blob.kms_key_name: headers["x-goog-encryption-kms-key-name"] = blob.kms_key_name - container = XMLMPUContainer(url, filename, headers=headers) container.initiate(transport=transport, content_type=content_type) upload_id = container.upload_id @@ -1025,7 +1025,7 @@ def upload_chunks_concurrently( end=end, part_number=part_number, checksum=checksum, - headers=headers + headers=headers, ) ) @@ -1046,7 +1046,15 @@ def upload_chunks_concurrently( def _upload_part( - maybe_pickled_client, url, upload_id, filename, start, end, part_number, checksum, headers + maybe_pickled_client, + url, + upload_id, + filename, + start, + end, + part_number, + checksum, + headers, ): """Helper function that runs inside a thread or subprocess to upload a part. 
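For reference, a standalone illustration of the user-project and KMS header selection reformatted above; the project and key names are made up:

    user_project = "my-billing-project"
    kms_key_name = "projects/p/locations/us/keyRings/r/cryptoKeys/k"

    headers = {}
    if user_project is not None:
        headers["x-goog-user-project"] = user_project
    if kms_key_name is not None and "cryptoKeyVersions" not in kms_key_name:
        # A key name that pins a specific version can't be reused for a new
        # upload, so in that case no encryption header is sent and the key is ignored.
        headers["x-goog-encryption-kms-key-name"] = kms_key_name

    print(headers)
    # {'x-goog-user-project': 'my-billing-project',
    #  'x-goog-encryption-kms-key-name': 'projects/p/locations/us/keyRings/r/cryptoKeys/k'}
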
@@ -1066,7 +1074,7 @@ def _upload_part( end=end, part_number=part_number, checksum=checksum, - headers=headers + headers=headers, ) part.upload(client._http) return (part_number, part.etag) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index ac48acac2..fe90ceb80 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -52,6 +52,7 @@ default_key_name = "gcs-test" alt_key_name = "gcs-test-alternate" + def _kms_key_name(client, bucket, key_name): return _key_name_format.format( client.project, @@ -60,6 +61,7 @@ def _kms_key_name(client, bucket, key_name): key_name, ) + @pytest.fixture(scope="session") def storage_client(): from google.cloud.storage import Client @@ -232,6 +234,7 @@ def file_data(): return _file_data + @pytest.fixture(scope="session") def kms_bucket_name(): return _helpers.unique_name("gcp-systest-kms") diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index af4cd9b61..fc7bc2d51 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -233,7 +233,10 @@ def test_upload_chunks_concurrently(shared_bucket, file_data, blobs_to_delete): temp_contents = tmp.read() assert source_contents == temp_contents -def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blobs_to_delete): + +def test_upload_chunks_concurrently_with_metadata( + shared_bucket, file_data, blobs_to_delete +): import datetime from google.cloud._helpers import UTC @@ -280,7 +283,9 @@ def test_upload_chunks_concurrently_with_metadata(shared_bucket, file_data, blob assert source_contents == temp_contents -def test_upload_chunks_concurrently_with_content_encoding(shared_bucket, file_data, blobs_to_delete): +def test_upload_chunks_concurrently_with_content_encoding( + shared_bucket, file_data, blobs_to_delete +): import gzip METADATA = { @@ -298,7 +303,7 @@ def test_upload_chunks_concurrently_with_content_encoding(shared_bucket, file_da chunk_size = 5 * 1024 * 1024 # Minimum supported by XML MPU API with tempfile.NamedTemporaryFile() as tmp_gzip: - with open(filename, 'rb') as f: + with open(filename, "rb") as f: compressed_bytes = gzip.compress(f.read()) tmp_gzip.write(compressed_bytes) @@ -322,7 +327,10 @@ def test_upload_chunks_concurrently_with_content_encoding(shared_bucket, file_da temp_contents = tmp.read() assert source_contents == temp_contents -def test_upload_chunks_concurrently_with_encryption_key(shared_bucket, file_data, blobs_to_delete): + +def test_upload_chunks_concurrently_with_encryption_key( + shared_bucket, file_data, blobs_to_delete +): source_file = file_data["big"] filename = source_file["path"] blob_name = "mpu_file_encrypted" @@ -354,7 +362,9 @@ def test_upload_chunks_concurrently_with_encryption_key(shared_bucket, file_data keyless_blob.download_to_file(tmp) -def test_upload_chunks_concurrently_with_kms(kms_bucket, file_data, blobs_to_delete, kms_key_name): +def test_upload_chunks_concurrently_with_kms( + kms_bucket, file_data, blobs_to_delete, kms_key_name +): source_file = file_data["big"] filename = source_file["path"] blob_name = "mpu_file_kms" diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 3e2a48c52..479341899 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -34,8 +34,9 @@ FAKE_ENCODING = "fake_gzip" DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} CHUNK_SIZE = 8 -HOSTNAME = "https://example.com/" -URL = "https://example.com/bucket/blob/" +HOSTNAME = 
"https://example.com" +URL = "https://example.com/bucket/blob" +USER_AGENT = "agent" # Used in subprocesses only, so excluded from coverage @@ -604,13 +605,15 @@ def test_download_chunks_concurrently_passes_concurrency_options(): def test_upload_chunks_concurrently(): - blob_mock = mock.Mock() - blob_mock.name = "blob" - blob_mock.bucket.name = "bucket" - transport = mock.Mock() - blob_mock._get_transport = mock.Mock(return_value=transport) - blob_mock._get_content_type = mock.Mock(return_value=FAKE_CONTENT_TYPE) - blob_mock.client = _PickleableMockClient(identify_as_client=True) + bucket = mock.Mock() + bucket.name = "bucket" + bucket.client = _PickleableMockClient(identify_as_client=True) + transport = bucket.client._http + bucket.user_project = None + + blob = Blob("blob", bucket) + blob.content_type = FAKE_CONTENT_TYPE + FILENAME = "file_a.txt" SIZE = 2048 @@ -628,27 +631,28 @@ def test_upload_chunks_concurrently(): ): transfer_manager.upload_chunks_concurrently( FILENAME, - blob_mock, + blob, chunk_size=SIZE // 2, worker_type=transfer_manager.THREAD, ) container_mock.initiate.assert_called_once_with( - transport=transport, content_type=FAKE_CONTENT_TYPE + transport=transport, content_type=blob.content_type ) container_mock.register_part.assert_any_call(1, ETAG) container_mock.register_part.assert_any_call(2, ETAG) - container_mock.finalize.assert_called_once_with(transport) - part_mock.upload.assert_called_with(blob_mock.client._http) + container_mock.finalize.assert_called_once_with(bucket.client._http) + part_mock.upload.assert_called_with(transport) def test_upload_chunks_concurrently_passes_concurrency_options(): - blob_mock = mock.Mock() - blob_mock.name = "blob" - blob_mock.bucket.name = "bucket" - transport = mock.Mock() - blob_mock._get_transport = mock.Mock(return_value=transport) - blob_mock._get_content_type = mock.Mock(return_value=FAKE_CONTENT_TYPE) - blob_mock.client = _PickleableMockClient(identify_as_client=True) + bucket = mock.Mock() + bucket.name = "bucket" + bucket.client = _PickleableMockClient(identify_as_client=True) + transport = bucket.client._http + bucket.user_project = None + + blob = Blob("blob", bucket) + FILENAME = "file_a.txt" SIZE = 2048 @@ -667,7 +671,7 @@ def test_upload_chunks_concurrently_passes_concurrency_options(): try: transfer_manager.upload_chunks_concurrently( FILENAME, - blob_mock, + blob, chunk_size=SIZE // 2, worker_type=transfer_manager.THREAD, max_workers=MAX_WORKERS, @@ -682,6 +686,100 @@ def test_upload_chunks_concurrently_passes_concurrency_options(): wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) +def test_upload_chunks_concurrently_with_metadata_and_encryption(): + import datetime + from google.cloud._helpers import UTC + from google.cloud._helpers import _RFC3339_MICROS + + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + now_str = now.strftime(_RFC3339_MICROS) + + custom_metadata = {"key_a": "value_a", "key_b": "value_b"} + encryption_key = "b23ff11bba187db8c37077e6af3b25b8" + kms_key_name = "sample_key_name" + + METADATA = { + "cache_control": "private", + "content_disposition": "inline", + "content_language": "en-US", + "custom_time": now, + "metadata": custom_metadata, + "storage_class": "NEARLINE", + } + + bucket = mock.Mock() + bucket.name = "bucket" + bucket.client = _PickleableMockClient(identify_as_client=True) + transport = bucket.client._http + user_project = "my_project" + bucket.user_project = user_project + + blob = Blob("blob", bucket, kms_key_name=kms_key_name) + 
blob.content_type = FAKE_CONTENT_TYPE + + for key, value in METADATA.items(): + setattr(blob, key, value) + blob.metadata = {**custom_metadata} + blob.encryption_key = encryption_key + + FILENAME = "file_a.txt" + SIZE = 2048 + + container_mock = mock.Mock() + container_mock.upload_id = "abcd" + part_mock = mock.Mock() + ETAG = "efgh" + part_mock.etag = ETAG + container_cls_mock = mock.Mock(return_value=container_mock) + + invocation_id = "b9f8cbb0-6456-420c-819d-3f4ee3c0c455" + + with mock.patch("os.path.getsize", return_value=SIZE), mock.patch( + "google.cloud.storage.transfer_manager.XMLMPUContainer", new=container_cls_mock + ), mock.patch( + "google.cloud.storage.transfer_manager.XMLMPUPart", return_value=part_mock + ), mock.patch( + "google.cloud.storage._helpers._get_invocation_id", + return_value="gccl-invocation-id/" + invocation_id, + ): + transfer_manager.upload_chunks_concurrently( + FILENAME, + blob, + chunk_size=SIZE // 2, + worker_type=transfer_manager.THREAD, + ) + expected_headers = { + "Accept": "application/json", + "Accept-Encoding": "gzip, deflate", + "User-Agent": "agent", + "X-Goog-API-Client": "agent gccl-invocation-id/{}".format(invocation_id), + "content-type": FAKE_CONTENT_TYPE, + "x-upload-content-type": FAKE_CONTENT_TYPE, + "X-Goog-Encryption-Algorithm": "AES256", + "X-Goog-Encryption-Key": "YjIzZmYxMWJiYTE4N2RiOGMzNzA3N2U2YWYzYjI1Yjg=", + "X-Goog-Encryption-Key-Sha256": "B25Y4hgVlNXDliAklsNz9ykLk7qvgqDrSbdds5iu8r4=", + "Cache-Control": "private", + "Content-Disposition": "inline", + "Content-Language": "en-US", + "x-goog-storage-class": "NEARLINE", + "x-goog-custom-time": now_str, + "x-goog-meta-key_a": "value_a", + "x-goog-meta-key_b": "value_b", + "x-goog-user-project": "my_project", + "x-goog-encryption-kms-key-name": "sample_key_name", + } + container_cls_mock.assert_called_once_with( + URL, FILENAME, headers=expected_headers + ) + container_mock.initiate.assert_called_once_with( + transport=transport, content_type=blob.content_type + ) + container_mock.register_part.assert_any_call(1, ETAG) + container_mock.register_part.assert_any_call(2, ETAG) + container_mock.finalize.assert_called_once_with(transport) + part_mock.upload.assert_called_with(blob.client._http) + + class _PickleableMockBlob: def __init__( self, @@ -710,10 +808,12 @@ class _PickleableMockConnection: def get_api_base_url_for_mtls(): return HOSTNAME + user_agent = USER_AGENT + class _PickleableMockClient: def __init__(self, identify_as_client=False): - self._http = None + self._http = "my_transport" # used as an identifier for "called_with" self._connection = _PickleableMockConnection() self.identify_as_client = identify_as_client @@ -799,7 +899,7 @@ def test__upload_part(): "google.cloud.storage.transfer_manager.XMLMPUPart", return_value=part ): result = transfer_manager._upload_part( - pickled_mock, URL, UPLOAD_ID, FILENAME, 0, 256, 1, None + pickled_mock, URL, UPLOAD_ID, FILENAME, 0, 256, 1, None, {"key", "value"} ) part.upload.assert_called_once() assert result == (1, ETAG) From 43fd95c91fdafe61eac65e46a471684586e5a615 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 14 Sep 2023 16:35:51 -0700 Subject: [PATCH 09/11] docs update --- google/cloud/storage/transfer_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 7bb3e8bdf..5cb9b6c46 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -913,7 +913,8 @@ def 
upload_chunks_concurrently( :param chunk_size: The size in bytes of each chunk to send. The optimal chunk size for maximum throughput may vary depending on the exact network environment - and size of the blob. + and size of the blob. The remote API has restrictions on the minimum + and maximum size allowable, see: https://cloud.google.com/storage/quotas#requests :type deadline: int :param deadline: From 1c641972b51a8b892542026c0abfcb6e1e82a2bf Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 18 Sep 2023 09:59:35 -0700 Subject: [PATCH 10/11] fix unit test issue --- tests/unit/test_transfer_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 479341899..ea9cabf2d 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -533,7 +533,7 @@ def test_download_chunks_concurrently(): blob_mock.download_to_filename.return_value = FAKE_RESULT - with mock.patch("__main__.open", mock.mock_open()): + with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): result = transfer_manager.download_chunks_concurrently( blob_mock, FILENAME, @@ -558,7 +558,7 @@ def test_download_chunks_concurrently_raises_on_start_and_end(): MULTIPLE = 4 blob_mock.size = CHUNK_SIZE * MULTIPLE - with mock.patch("__main__.open", mock.mock_open()): + with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): with pytest.raises(ValueError): transfer_manager.download_chunks_concurrently( blob_mock, @@ -591,7 +591,7 @@ def test_download_chunks_concurrently_passes_concurrency_options(): with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( "concurrent.futures.wait" - ) as wait_patch, mock.patch("__main__.open", mock.mock_open()): + ) as wait_patch, mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): transfer_manager.download_chunks_concurrently( blob_mock, FILENAME, @@ -844,7 +844,7 @@ def test_download_chunks_concurrently_with_processes(): with mock.patch( "google.cloud.storage.transfer_manager._download_and_write_chunk_in_place", new=_validate_blob_token_in_subprocess_for_chunk, - ), mock.patch("__main__.open", mock.mock_open()): + ), mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): result = transfer_manager.download_chunks_concurrently( blob, FILENAME, @@ -880,7 +880,7 @@ def test__pickle_client(): def test__download_and_write_chunk_in_place(): pickled_mock = pickle.dumps(_PickleableMockBlob()) FILENAME = "file_a.txt" - with mock.patch("__main__.open", mock.mock_open()): + with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): result = transfer_manager._download_and_write_chunk_in_place( pickled_mock, FILENAME, 0, 8, {} ) From 1049a9acceefbe659b7989e7b5d6cf9200196f0b Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 18 Sep 2023 17:01:42 +0000 Subject: [PATCH 11/11] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- tests/unit/test_transfer_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index ea9cabf2d..f1d760043 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ 
-591,7 +591,9 @@ def test_download_chunks_concurrently_passes_concurrency_options(): with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( "concurrent.futures.wait" - ) as wait_patch, mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()): + ) as wait_patch, mock.patch( + "google.cloud.storage.transfer_manager.open", mock.mock_open() + ): transfer_manager.download_chunks_concurrently( blob_mock, FILENAME,
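
For context, a minimal usage sketch of the API added by this series, as it stands after the final patch. The bucket and file names are hypothetical; chunk_size, max_workers, deadline, worker_type, and checksum follow the signature introduced above, and chunk_size must respect the service's minimum/maximum part-size limits:

    from google.cloud.storage import Client, transfer_manager

    client = Client()
    bucket = client.bucket("my-bucket")        # hypothetical bucket name
    blob = bucket.blob("large-file.bin")       # hypothetical object name

    # Split the local file into 32 MiB parts and upload them concurrently
    # through the XML multipart upload API.
    transfer_manager.upload_chunks_concurrently(
        "/path/to/large-file.bin",
        blob,
        chunk_size=32 * 1024 * 1024,
        max_workers=8,
    )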