diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py
index 5c63d932..86c31b0c 100644
--- a/src/deadline/job_attachments/upload.py
+++ b/src/deadline/job_attachments/upload.py
@@ -65,6 +65,10 @@ LIST_OBJECT_THRESHOLD: int = 100
 CHUNK_SIZE_FOR_MULTIPART_UPLOAD = 8 * (1024**2)  # 8 MB (Default chunk size for multipart upload)
 MAX_WORKERS_FOR_MULTIPART_UPLOAD = 10  # Max workers for multipart upload
 
+# We split the input files into two queues: one for small files and one for large files.
+# The threshold that separates small files from large files must be a multiple of the
+# default S3 multipart upload chunk size (8 MB).
+SMALL_FILE_THRESHOLD: int = 160 * (1024**2)  # 160 MB
 
 logger = logging.getLogger("deadline.job_attachments.upload")
 
@@ -191,6 +195,13 @@ def upload_input_files(
             sum((source_root.joinpath(file.path)).stat().st_size for file in files_to_skip),
         )
 
+        # Split the input into separate 'small file' and 'large file' queues.
+        # Separating 'large' files from 'small' files lets us process the 'large' files serially.
+        # This wastes less bandwidth if uploads are cancelled, since it is better to run the
+        # multi-threaded multipart upload on one large file at a time than on several at once.
+        (small_file_queue, large_file_queue) = self._separate_files_by_size(files_to_upload)
+
+        # First, process the whole 'small file' queue with parallel object uploads.
         # TODO: tune this. max_worker defaults to 5 * number of processors. We can run into issues here
         # if we thread too aggressively on slower internet connections. So for now let's set it to 5,
         # which would the number of threads with one processor.
@@ -205,7 +216,7 @@ def upload_input_files(
                     check_if_in_s3,
                     progress_tracker,
                 ): file
-                for file in files_to_upload
+                for file in small_file_queue
             }
            # surfaces any exceptions in the thread
            for future in concurrent.futures.as_completed(futures):
@@ -213,6 +224,19 @@ def upload_input_files(
                if progress_tracker and not is_uploaded:
                    progress_tracker.increase_skipped(1, file_size)
 
+        # Now process the whole 'large file' queue with serial object uploads (but still with parallel multipart uploads).
+        for file in large_file_queue:
+            (is_uploaded, file_size) = self.upload_object_to_cas(
+                file,
+                s3_bucket,
+                source_root,
+                s3_cas_prefix,
+                check_if_in_s3,
+                progress_tracker,
+            )
+            if progress_tracker and not is_uploaded:
+                progress_tracker.increase_skipped(1, file_size)
+
         # to report progress 100% at the end, and
         # to check if the job submission was canceled in the middle of processing the last batch of files.
         if progress_tracker:
@@ -222,6 +246,23 @@ def upload_input_files(
                 "File upload cancelled.", progress_tracker.get_summary_statistics()
             )
 
+    def _separate_files_by_size(
+        self,
+        files_to_upload: list[base_manifest.BaseManifestPath],
+        size_threshold: int = SMALL_FILE_THRESHOLD,
+    ) -> Tuple[list[base_manifest.BaseManifestPath], list[base_manifest.BaseManifestPath]]:
+        """
+        Splits the given list of files into two queues: one for small files and one for large files.
+        """
+        small_file_queue: list[base_manifest.BaseManifestPath] = []
+        large_file_queue: list[base_manifest.BaseManifestPath] = []
+        for file in files_to_upload:
+            if file.size <= size_threshold:
+                small_file_queue.append(file)
+            else:
+                large_file_queue.append(file)
+        return (small_file_queue, large_file_queue)
+
     def upload_object_to_cas(
         self,
         file: base_manifest.BaseManifestPath,
diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py
index 9e7f7569..064b6785 100644
--- a/test/unit/deadline_job_attachments/test_upload.py
+++ b/test/unit/deadline_job_attachments/test_upload.py
@@ -22,7 +22,11 @@
 from moto import mock_sts
 
 import deadline
-from deadline.job_attachments.asset_manifests import BaseManifestModel, ManifestVersion
+from deadline.job_attachments.asset_manifests import (
+    BaseManifestModel,
+    BaseManifestPath,
+    ManifestVersion,
+)
 from deadline.job_attachments.exceptions import (
     JobAttachmentsS3ClientError,
     MissingS3BucketError,
@@ -2060,6 +2064,63 @@ def test_get_asset_groups_for_windows_case_insensitive(
         }
         assert result[0].outputs == {Path("C:\\username\\docs\\outputs")}
 
+    @pytest.mark.parametrize(
+        "input_files, size_threshold, expected_queues",
+        [
+            (
+                [],
+                100 * (1024**2),  # 100 MB
+                ([], []),
+            ),
+            (
+                [
+                    BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                ],
+                100 * (1024**2),  # 100 MB
+                (
+                    [
+                        BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                        BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    ],
+                    [
+                        BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                    ],
+                ),
+            ),
+            (
+                [
+                    BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                ],
+                800 * (1024**2),  # 800 MB
+                (
+                    [
+                        BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                        BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    ],
+                    [],
+                ),
+            ),
+        ],
+    )
+    def test_separate_files_by_size(
+        self,
+        input_files: List[BaseManifestPath],
+        size_threshold: int,
+        expected_queues: Tuple[List[BaseManifestPath], List[BaseManifestPath]],
+    ):
+        """
+        Tests that the helper method `_separate_files_by_size` works as expected.
+        """
+        s3_asset_uploader = S3AssetUploader()
+        actual_queues = s3_asset_uploader._separate_files_by_size(
+            files_to_upload=input_files,
+            size_threshold=size_threshold,
+        )
+        assert actual_queues == expected_queues
+
 
 def assert_progress_report_last_callback(
     num_input_files: int,