diff --git a/scripted_tests/upload_cancel_test.py b/scripted_tests/upload_cancel_test.py
index 05b34bdfd..76f150e12 100644
--- a/scripted_tests/upload_cancel_test.py
+++ b/scripted_tests/upload_cancel_test.py
@@ -74,7 +74,7 @@ def run():
             file_path = root_path / f"medium_test{i}.txt"
             if not os.path.exists(file_path):
                 with file_path.open("wb") as f:
-                    f.write(os.urandom(102428800))  # 100 MB files
+                    f.write(os.urandom(100 * (1024**2)))  # 100 MB files
             files.append(str(file_path))
 
         # Make large files
@@ -84,7 +84,7 @@ def run():
             if not os.path.exists(file_path):
                 for i in range(1):
                     with file_path.open("ab") as f:
-                        f.write(os.urandom(1073741824))  # Write 1 GB at a time
+                        f.write(os.urandom(1 * (1024**3)))  # Write 1 GB at a time
             files.append(str(file_path))
 
         queue = get_queue(farm_id=farm_id, queue_id=queue_id)
diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py
index 00ba9f0f9..fce1efd74 100644
--- a/src/deadline/job_attachments/upload.py
+++ b/src/deadline/job_attachments/upload.py
@@ -61,7 +61,7 @@
 # TODO: full performance analysis to determine the ideal threshold
 LIST_OBJECT_THRESHOLD: int = 100
 
-
+SMALL_FILE_THRESHOLD: int = 400 * (1024**2)  # 400 MB
 
 logger = logging.getLogger("deadline.job_attachments.upload")
@@ -188,6 +188,10 @@ def upload_input_files(
             sum((source_root.joinpath(file.path)).stat().st_size for file in files_to_skip),
         )
 
+        # Split into separate 'large file' and 'small file' queues.
+        (small_file_queue, large_file_queue) = self._separate_files_by_size(files_to_upload)
+
+        # First, process the whole 'small file' queue with parallel object uploads.
         # TODO: tune this. max_worker defaults to 5 * number of processors. We can run into issues here
         # if we thread too aggressively on slower internet connections. So for now let's set it to 5,
         # which would the number of threads with one processor.
@@ -202,7 +206,7 @@
                     check_if_in_s3,
                     progress_tracker,
                 ): file
-                for file in files_to_upload
+                for file in small_file_queue
             }
             # surfaces any exceptions in the thread
             for future in concurrent.futures.as_completed(futures):
@@ -210,6 +214,19 @@
                 if progress_tracker and not is_uploaded:
                     progress_tracker.increase_skipped(1, file_size)
 
+        # Now process the whole 'large file' queue with serial object uploads (but still with parallel multi-part uploads).
+        for file in large_file_queue:
+            (is_uploaded, file_size) = self.upload_object_to_cas(
+                file,
+                s3_bucket,
+                source_root,
+                s3_cas_prefix,
+                check_if_in_s3,
+                progress_tracker,
+            )
+            if progress_tracker and not is_uploaded:
+                progress_tracker.increase_skipped(1, file_size)
+
         # to report progress 100% at the end, and
         # to check if the job submission was canceled in the middle of processing the last batch of files.
         if progress_tracker:
@@ -219,6 +236,23 @@
                     "File upload cancelled.", progress_tracker.get_summary_statistics()
                 )
 
+    def _separate_files_by_size(
+        self,
+        files_to_upload: list[base_manifest.BaseManifestPath],
+        size_threshold: int = SMALL_FILE_THRESHOLD,
+    ) -> Tuple[list[base_manifest.BaseManifestPath], list[base_manifest.BaseManifestPath]]:
+        """
+        Splits the given list of files into two queues: one for small files and one for large files.
+        """
+        small_file_queue: list[base_manifest.BaseManifestPath] = []
+        large_file_queue: list[base_manifest.BaseManifestPath] = []
+        for file in files_to_upload:
+            if file.size <= size_threshold:
+                small_file_queue.append(file)
+            else:
+                large_file_queue.append(file)
+        return (small_file_queue, large_file_queue)
+
     def upload_object_to_cas(
         self,
         file: base_manifest.BaseManifestPath,
diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py
index 84f7ca087..bd15372e0 100644
--- a/test/unit/deadline_job_attachments/test_upload.py
+++ b/test/unit/deadline_job_attachments/test_upload.py
@@ -23,7 +23,11 @@
 from moto import mock_sts
 
 import deadline
-from deadline.job_attachments.asset_manifests import BaseManifestModel, ManifestVersion
+from deadline.job_attachments.asset_manifests import (
+    BaseManifestModel,
+    BaseManifestPath,
+    ManifestVersion,
+)
 from deadline.job_attachments.exceptions import (
     AssetSyncError,
     JobAttachmentsS3ClientError,
@@ -2055,6 +2059,63 @@ def test_get_asset_groups_for_windows_case_insensitive(
         }
         assert result[0].outputs == {Path("C:\\username\\docs\\outputs")}
 
+    @pytest.mark.parametrize(
+        "input_files, size_threshold, expected_queues",
+        [
+            (
+                [],
+                100 * (1024**2),  # 100 MB
+                ([], []),
+            ),
+            (
+                [
+                    BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                ],
+                100 * (1024**2),  # 100 MB
+                (
+                    [
+                        BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                        BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    ],
+                    [
+                        BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                    ],
+                ),
+            ),
+            (
+                [
+                    BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                ],
+                800 * (1024**2),  # 800 MB
+                (
+                    [
+                        BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                        BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    ],
+                    [],
+                ),
+            ),
+        ],
+    )
+    def test_separate_files_by_size(
+        self,
+        input_files: list[BaseManifestPath],
+        size_threshold: int,
+        expected_queues: tuple[list[BaseManifestPath], list[BaseManifestPath]],
+    ):
+        """
+        Tests that the helper method `_separate_files_by_size` works as expected.
+        """
+        s3_asset_uploader = S3AssetUploader()
+        actual_queues = s3_asset_uploader._separate_files_by_size(
+            files_to_upload=input_files,
+            size_threshold=size_threshold,
+        )
+        assert actual_queues == expected_queues
+
 
 def assert_progress_report_last_callback(
     num_input_files: int,
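For reference, a minimal, self-contained sketch of the queue-splitting pattern this diff introduces. `FileEntry`, `upload`, and `separate_files_by_size` are hypothetical stand-ins (not the library's `base_manifest.BaseManifestPath` or `S3AssetUploader.upload_object_to_cas`); only the 400 MB threshold and the small/large split mirror the change above.

```python
# Illustrative sketch only: small files fan out to a thread pool, large files are
# handled one at a time, mirroring the pattern added in upload.py.
import concurrent.futures
from dataclasses import dataclass
from typing import List, Tuple

SMALL_FILE_THRESHOLD: int = 400 * (1024**2)  # 400 MB, same value as in the diff


@dataclass
class FileEntry:  # hypothetical stand-in for base_manifest.BaseManifestPath
    path: str
    size: int


def separate_files_by_size(
    files: List[FileEntry], size_threshold: int = SMALL_FILE_THRESHOLD
) -> Tuple[List[FileEntry], List[FileEntry]]:
    """Split files into (small, large) queues around the size threshold."""
    small = [f for f in files if f.size <= size_threshold]
    large = [f for f in files if f.size > size_threshold]
    return small, large


def upload(file: FileEntry) -> None:
    # Placeholder for the real per-object upload call.
    print(f"uploading {file.path} ({file.size} bytes)")


if __name__ == "__main__":
    files = [FileEntry("a.txt", 10 * 1024**2), FileEntry("b.bin", 900 * 1024**2)]
    small_queue, large_queue = separate_files_by_size(files)

    # Small files: parallel object uploads, as in the ThreadPoolExecutor block.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        futures = {pool.submit(upload, f): f for f in small_queue}
        for future in concurrent.futures.as_completed(futures):
            future.result()  # surface any exception raised in a worker thread

    # Large files: processed serially, as in the new large_file_queue loop.
    for f in large_queue:
        upload(f)
```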