From cc6ec8ebffca76d585e3e954329f0f47e333f7e8 Mon Sep 17 00:00:00 2001
From: Gahyun Suh
Date: Mon, 20 Nov 2023 17:44:41 +0000
Subject: [PATCH] fix(job_attachments): distinguish large/small file uploads

- Split the input files to upload into two separate queues based on their
  sizes, one for smaller files and another for larger ones.
- First, process the whole "small file" queue with parallel object uploads.
  Then, once the small files are done, process the whole "large file" queue
  with serial object uploads (but still parallel multi-part upload).

Signed-off-by: Gahyun Suh
---
 scripted_tests/upload_cancel_test.py        |  4 +-
 src/deadline/job_attachments/upload.py      | 38 ++++++++++-
 .../deadline_job_attachments/test_upload.py | 63 ++++++++++++++++++-
 3 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/scripted_tests/upload_cancel_test.py b/scripted_tests/upload_cancel_test.py
index 05b34bdfd..76f150e12 100644
--- a/scripted_tests/upload_cancel_test.py
+++ b/scripted_tests/upload_cancel_test.py
@@ -74,7 +74,7 @@ def run():
         file_path = root_path / f"medium_test{i}.txt"
         if not os.path.exists(file_path):
             with file_path.open("wb") as f:
-                f.write(os.urandom(102428800))  # 100 MB files
+                f.write(os.urandom(100 * (1024**2)))  # 100 MB files
         files.append(str(file_path))
 
     # Make large files
@@ -84,7 +84,7 @@ def run():
         if not os.path.exists(file_path):
             for i in range(1):
                 with file_path.open("ab") as f:
-                    f.write(os.urandom(1073741824))  # Write 1 GB at a time
+                    f.write(os.urandom(1 * (1024**3)))  # Write 1 GB at a time
         files.append(str(file_path))
 
     queue = get_queue(farm_id=farm_id, queue_id=queue_id)
diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py
index 00ba9f0f9..fce1efd74 100644
--- a/src/deadline/job_attachments/upload.py
+++ b/src/deadline/job_attachments/upload.py
@@ -61,7 +61,7 @@
 
 # TODO: full performance analysis to determine the ideal threshold
 LIST_OBJECT_THRESHOLD: int = 100
-
+SMALL_FILE_THRESHOLD: int = 400 * (1024**2)  # 400 MB
 
 logger = logging.getLogger("deadline.job_attachments.upload")
 
@@ -188,6 +188,10 @@ def upload_input_files(
             sum((source_root.joinpath(file.path)).stat().st_size for file in files_to_skip),
         )
 
+        # Split into a separate 'large file' and 'small file' queues.
+        (small_file_queue, large_file_queue) = self._separate_files_by_size(files_to_upload)
+
+        # First, process the whole 'small file' queue with parallel object uploads.
         # TODO: tune this. max_worker defaults to 5 * number of processors. We can run into issues here
         # if we thread too aggressively on slower internet connections. So for now let's set it to 5,
         # which would the number of threads with one processor.
@@ -202,7 +206,7 @@ def upload_input_files(
                     check_if_in_s3,
                     progress_tracker,
                 ): file
-                for file in files_to_upload
+                for file in small_file_queue
             }
             # surfaces any exceptions in the thread
             for future in concurrent.futures.as_completed(futures):
@@ -210,6 +214,19 @@ def upload_input_files(
                 if progress_tracker and not is_uploaded:
                     progress_tracker.increase_skipped(1, file_size)
 
+        # Now process the whole 'large file' queue with serial object uploads (but still parallel multi-part upload.)
+        for file in large_file_queue:
+            (is_uploaded, file_size) = self.upload_object_to_cas(
+                file,
+                s3_bucket,
+                source_root,
+                s3_cas_prefix,
+                check_if_in_s3,
+                progress_tracker,
+            )
+            if progress_tracker and not is_uploaded:
+                progress_tracker.increase_skipped(1, file_size)
+
         # to report progress 100% at the end, and
         # to check if the job submission was canceled in the middle of processing the last batch of files.
         if progress_tracker:
@@ -219,6 +236,23 @@
                     "File upload cancelled.", progress_tracker.get_summary_statistics()
                 )
 
+    def _separate_files_by_size(
+        self,
+        files_to_upload: list[base_manifest.BaseManifestPath],
+        size_threshold: int = SMALL_FILE_THRESHOLD,
+    ) -> Tuple[list[base_manifest.BaseManifestPath], list[base_manifest.BaseManifestPath]]:
+        """
+        Splits the given list of files into two queues: one for small files and one for large files.
+        """
+        small_file_queue: list[base_manifest.BaseManifestPath] = []
+        large_file_queue: list[base_manifest.BaseManifestPath] = []
+        for file in files_to_upload:
+            if file.size <= size_threshold:
+                small_file_queue.append(file)
+            else:
+                large_file_queue.append(file)
+        return (small_file_queue, large_file_queue)
+
     def upload_object_to_cas(
         self,
         file: base_manifest.BaseManifestPath,
diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py
index 84f7ca087..bd15372e0 100644
--- a/test/unit/deadline_job_attachments/test_upload.py
+++ b/test/unit/deadline_job_attachments/test_upload.py
@@ -23,7 +23,11 @@
 from moto import mock_sts
 
 import deadline
-from deadline.job_attachments.asset_manifests import BaseManifestModel, ManifestVersion
+from deadline.job_attachments.asset_manifests import (
+    BaseManifestModel,
+    BaseManifestPath,
+    ManifestVersion,
+)
 from deadline.job_attachments.exceptions import (
     AssetSyncError,
     JobAttachmentsS3ClientError,
@@ -2055,6 +2059,63 @@ def test_get_asset_groups_for_windows_case_insensitive(
         }
         assert result[0].outputs == {Path("C:\\username\\docs\\outputs")}
 
+    @pytest.mark.parametrize(
+        "input_files, size_threshold, expected_queues",
+        [
+            (
+                [],
+                100 * (1024**2),  # 100 MB
+                ([], []),
+            ),
+            (
+                [
+                    BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                ],
+                100 * (1024**2),  # 100 MB
+                (
+                    [
+                        BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                        BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    ],
+                    [
+                        BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                    ],
+                ),
+            ),
+            (
+                [
+                    BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                    BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                ],
+                800 * (1024**2),  # 800 MB
+                (
+                    [
+                        BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                        BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                    ],
+                    [],
+                ),
+            ),
+        ],
+    )
+    def test_separate_files_by_size(
+        self,
+        input_files: list[BaseManifestPath],
+        size_threshold: int,
+        expected_queues: tuple[list[BaseManifestPath], list[BaseManifestPath]],
+    ):
+        """
+        Tests that a helper method `_separate_files_by_size` is working as expected.
+ """ + a3_asset_uploader = S3AssetUploader() + actual_queues = a3_asset_uploader._separate_files_by_size( + files_to_upload=input_files, + size_threshold=size_threshold, + ) + assert actual_queues == expected_queues + def assert_progress_report_last_callback( num_input_files: int,