Skip to content

Commit

Permalink
fix(job_attachments): distinguish large/small file uploads
Browse files Browse the repository at this point in the history
- Split the input files to upload into two separate queues based on their sizes, one for smaller files and another for larger ones.
- First, process the whole "small file" queue with parallel object uploads. Then, once the small files are done, process the whole "large file" queue with serial object uploads (but still parallel multi-part upload).

Signed-off-by: Gahyun Suh <[email protected]>
  • Loading branch information
Gahyun Suh committed Nov 21, 2023
1 parent af8a80a commit b3b061d
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
43 changes: 42 additions & 1 deletion src/deadline/job_attachments/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
LIST_OBJECT_THRESHOLD: int = 100
CHUNK_SIZE_FOR_MULTIPART_UPLOAD = 8 * (1024**2) # 8 MB (Default chunk size for multipart upload)
MAX_WORKERS_FOR_MULTIPART_UPLOAD = 10 # Max workers for multipart upload
# We split the input files into two queues - one for small files, the other for large files.
# The threshold to draw a line on large vs. small file must be a multiple of the S3 multipart
# upload chunk size, 8 MB, which is a default size.
SMALL_FILE_THRESHOLD: int = 160 * (1024**2) # 160 MB

logger = logging.getLogger("deadline.job_attachments.upload")

Expand Down Expand Up @@ -191,6 +195,13 @@ def upload_input_files(
sum((source_root.joinpath(file.path)).stat().st_size for file in files_to_skip),
)

# Split into a separate 'large file' and 'small file' queues.
# Separate 'large' files from 'small' files so that we can process 'large' files serially.
# This wastes less bandwidth if uploads are cancelled, as it's better to use the multi-threaded
# multi-part upload for a single large file than multiple large files at the same time.
(small_file_queue, large_file_queue) = self._separate_files_by_size(files_to_upload)

# First, process the whole 'small file' queue with parallel object uploads.
# TODO: tune this. max_worker defaults to 5 * number of processors. We can run into issues here
# if we thread too aggressively on slower internet connections. So for now let's set it to 5,
# which would the number of threads with one processor.
Expand All @@ -205,14 +216,27 @@ def upload_input_files(
check_if_in_s3,
progress_tracker,
): file
for file in files_to_upload
for file in small_file_queue
}
# surfaces any exceptions in the thread
for future in concurrent.futures.as_completed(futures):
(is_uploaded, file_size) = future.result()
if progress_tracker and not is_uploaded:
progress_tracker.increase_skipped(1, file_size)

# Now process the whole 'large file' queue with serial object uploads (but still parallel multi-part upload.)
for file in large_file_queue:
(is_uploaded, file_size) = self.upload_object_to_cas(
file,
s3_bucket,
source_root,
s3_cas_prefix,
check_if_in_s3,
progress_tracker,
)
if progress_tracker and not is_uploaded:
progress_tracker.increase_skipped(1, file_size)

# to report progress 100% at the end, and
# to check if the job submission was canceled in the middle of processing the last batch of files.
if progress_tracker:
Expand All @@ -222,6 +246,23 @@ def upload_input_files(
"File upload cancelled.", progress_tracker.get_summary_statistics()
)

def _separate_files_by_size(
self,
files_to_upload: list[base_manifest.BaseManifestPath],
size_threshold: int = SMALL_FILE_THRESHOLD,
) -> Tuple[list[base_manifest.BaseManifestPath], list[base_manifest.BaseManifestPath]]:
"""
Splits the given list of files into two queues: one for small files and one for large files.
"""
small_file_queue: list[base_manifest.BaseManifestPath] = []
large_file_queue: list[base_manifest.BaseManifestPath] = []
for file in files_to_upload:
if file.size <= size_threshold:
small_file_queue.append(file)
else:
large_file_queue.append(file)
return (small_file_queue, large_file_queue)

def upload_object_to_cas(
self,
file: base_manifest.BaseManifestPath,
Expand Down
63 changes: 62 additions & 1 deletion test/unit/deadline_job_attachments/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
from moto import mock_sts

import deadline
from deadline.job_attachments.asset_manifests import BaseManifestModel, ManifestVersion
from deadline.job_attachments.asset_manifests import (
BaseManifestModel,
BaseManifestPath,
ManifestVersion,
)
from deadline.job_attachments.exceptions import (
JobAttachmentsS3ClientError,
MissingS3BucketError,
Expand Down Expand Up @@ -2060,6 +2064,63 @@ def test_get_asset_groups_for_windows_case_insensitive(
}
assert result[0].outputs == {Path("C:\\username\\docs\\outputs")}

@pytest.mark.parametrize(
"input_files, size_threshold, expected_queues",
[
(
[],
100 * (1024**2), # 100 MB
([], []),
),
(
[
BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
],
100 * (1024**2), # 100 MB
(
[
BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
],
[
BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
],
),
),
(
[
BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
],
800 * (1024**2), # 800 MB
(
[
BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
],
[],
),
),
],
)
def test_separate_files_by_size(
self,
input_files: List[BaseManifestPath],
size_threshold: int,
expected_queues: Tuple[List[BaseManifestPath], List[BaseManifestPath]],
):
"""
Tests that a helper method `_separate_files_by_size` is working as expected.
"""
a3_asset_uploader = S3AssetUploader()
actual_queues = a3_asset_uploader._separate_files_by_size(
files_to_upload=input_files,
size_threshold=size_threshold,
)
assert actual_queues == expected_queues


def assert_progress_report_last_callback(
num_input_files: int,
Expand Down

0 comments on commit b3b061d

Please sign in to comment.