fix(job_attachments): distinguish large/small file uploads
- Split the input files to upload into two separate queues based on size: one for smaller files and one for larger ones.
- First, process the whole "small file" queue with parallel object uploads. Then, once the small files are done, process the whole "large file" queue with serial object uploads, where each large object still uses a parallel multi-part upload (see the sketch after the changed-files summary below).

Signed-off-by: Gahyun Suh <[email protected]>
Gahyun Suh committed Nov 20, 2023
1 parent 7b3b3e6 commit cc6ec8e
Showing 3 changed files with 100 additions and 5 deletions.
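
To make the control flow concrete, here is a minimal sketch of the two-phase scheme described in the commit message. It is a sketch only: it assumes plain boto3 rather than the repository's S3AssetUploader, and upload_all, bucket, and the hard-coded tuning values are illustrative, not part of this commit. Small objects are fanned out across a thread pool; each large object is then uploaded one at a time, with boto3's TransferConfig splitting it into concurrently uploaded parts.

import concurrent.futures
import os

import boto3
from boto3.s3.transfer import TransferConfig

SMALL_FILE_THRESHOLD = 400 * (1024**2)  # mirrors the new constant in upload.py

def upload_all(bucket: str, files: list[str]) -> None:  # hypothetical helper
    s3 = boto3.client("s3")
    small = [f for f in files if os.path.getsize(f) <= SMALL_FILE_THRESHOLD]
    large = [f for f in files if os.path.getsize(f) > SMALL_FILE_THRESHOLD]

    # Phase 1: process the whole 'small file' queue with parallel object uploads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(s3.upload_file, f, bucket, f) for f in small]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # surfaces any exception raised in a worker thread

    # Phase 2: process the 'large file' queue serially; boto3 still uploads
    # each object's parts concurrently via multi-part upload.
    config = TransferConfig(multipart_threshold=8 * (1024**2), max_concurrency=10)
    for f in large:
        s3.upload_file(f, bucket, f, Config=config)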
4 changes: 2 additions & 2 deletions scripted_tests/upload_cancel_test.py
@@ -74,7 +74,7 @@ def run():
file_path = root_path / f"medium_test{i}.txt"
if not os.path.exists(file_path):
with file_path.open("wb") as f:
- f.write(os.urandom(102428800))  # 100 MB files
+ f.write(os.urandom(100 * (1024**2)))  # 100 MB files
files.append(str(file_path))

# Make large files
@@ -84,7 +84,7 @@ def run():
if not os.path.exists(file_path):
for i in range(1):
with file_path.open("ab") as f:
- f.write(os.urandom(1073741824))  # Write 1 GB at a time
+ f.write(os.urandom(1 * (1024**3)))  # Write 1 GB at a time
files.append(str(file_path))

queue = get_queue(farm_id=farm_id, queue_id=queue_id)
38 changes: 36 additions & 2 deletions src/deadline/job_attachments/upload.py
@@ -61,7 +61,7 @@

# TODO: full performance analysis to determine the ideal threshold
LIST_OBJECT_THRESHOLD: int = 100

+ SMALL_FILE_THRESHOLD: int = 400 * (1024**2)  # 400 MB

logger = logging.getLogger("deadline.job_attachments.upload")

@@ -188,6 +188,10 @@ def upload_input_files(
sum((source_root.joinpath(file.path)).stat().st_size for file in files_to_skip),
)

+ # Split the input files into separate 'small file' and 'large file' queues.
+ (small_file_queue, large_file_queue) = self._separate_files_by_size(files_to_upload)
+
+ # First, process the whole 'small file' queue with parallel object uploads.
# TODO: tune this. max_worker defaults to 5 * number of processors. We can run into issues here
# if we thread too aggressively on slower internet connections. So for now let's set it to 5,
# which would be the number of threads with one processor.
@@ -202,14 +206,27 @@ def upload_input_files(
check_if_in_s3,
progress_tracker,
): file
- for file in files_to_upload
+ for file in small_file_queue
}
# surfaces any exceptions in the thread
for future in concurrent.futures.as_completed(futures):
(is_uploaded, file_size) = future.result()
if progress_tracker and not is_uploaded:
progress_tracker.increase_skipped(1, file_size)

+ # Now process the whole 'large file' queue with serial object uploads (but still parallel multi-part upload).
+ for file in large_file_queue:
+     (is_uploaded, file_size) = self.upload_object_to_cas(
+         file,
+         s3_bucket,
+         source_root,
+         s3_cas_prefix,
+         check_if_in_s3,
+         progress_tracker,
+     )
+     if progress_tracker and not is_uploaded:
+         progress_tracker.increase_skipped(1, file_size)

# to report progress 100% at the end, and
# to check if the job submission was canceled in the middle of processing the last batch of files.
if progress_tracker:
@@ -219,6 +236,23 @@ def upload_input_files(
"File upload cancelled.", progress_tracker.get_summary_statistics()
)

+ def _separate_files_by_size(
+     self,
+     files_to_upload: list[base_manifest.BaseManifestPath],
+     size_threshold: int = SMALL_FILE_THRESHOLD,
+ ) -> Tuple[list[base_manifest.BaseManifestPath], list[base_manifest.BaseManifestPath]]:
+     """
+     Splits the given list of files into two queues: one for small files and one for large files.
+     """
+     small_file_queue: list[base_manifest.BaseManifestPath] = []
+     large_file_queue: list[base_manifest.BaseManifestPath] = []
+     for file in files_to_upload:
+         if file.size <= size_threshold:
+             small_file_queue.append(file)
+         else:
+             large_file_queue.append(file)
+     return (small_file_queue, large_file_queue)

def upload_object_to_cas(
self,
file: base_manifest.BaseManifestPath,
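As a quick illustration of the helper's boundary behavior (a sketch only; the import paths and file contents below are assumptions, not part of this commit): the <= comparison in _separate_files_by_size means a file whose size exactly equals the threshold still lands in the small-file queue, which the 100 MB parametrized test case below also exercises.

from deadline.job_attachments.asset_manifests import BaseManifestPath
from deadline.job_attachments.upload import S3AssetUploader  # assumed import path

uploader = S3AssetUploader()
at_threshold = BaseManifestPath(path="a.bin", hash="h1", size=400 * (1024**2), mtime=1)
over_threshold = BaseManifestPath(path="b.bin", hash="h2", size=400 * (1024**2) + 1, mtime=1)

(small_queue, large_queue) = uploader._separate_files_by_size([at_threshold, over_threshold])
assert small_queue == [at_threshold]  # size == threshold still counts as "small"
assert large_queue == [over_threshold]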
63 changes: 62 additions & 1 deletion test/unit/deadline_job_attachments/test_upload.py
@@ -23,7 +23,11 @@
from moto import mock_sts

import deadline
- from deadline.job_attachments.asset_manifests import BaseManifestModel, ManifestVersion
+ from deadline.job_attachments.asset_manifests import (
+     BaseManifestModel,
+     BaseManifestPath,
+     ManifestVersion,
+ )
from deadline.job_attachments.exceptions import (
AssetSyncError,
JobAttachmentsS3ClientError,
@@ -2055,6 +2059,63 @@ def test_get_asset_groups_for_windows_case_insensitive(
}
assert result[0].outputs == {Path("C:\\username\\docs\\outputs")}

+ @pytest.mark.parametrize(
+     "input_files, size_threshold, expected_queues",
+     [
+         (
+             [],
+             100 * (1024**2),  # 100 MB
+             ([], []),
+         ),
+         (
+             [
+                 BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                 BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                 BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+             ],
+             100 * (1024**2),  # 100 MB
+             (
+                 [
+                     BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                     BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                 ],
+                 [
+                     BaseManifestPath(path="", hash="", size=1000 * (1024**2), mtime=1),
+                 ],
+             ),
+         ),
+         (
+             [
+                 BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                 BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+             ],
+             800 * (1024**2),  # 800 MB
+             (
+                 [
+                     BaseManifestPath(path="", hash="", size=10 * (1024**2), mtime=1),
+                     BaseManifestPath(path="", hash="", size=100 * (1024**2), mtime=1),
+                 ],
+                 [],
+             ),
+         ),
+     ],
+ )
+ def test_separate_files_by_size(
+     self,
+     input_files: list[BaseManifestPath],
+     size_threshold: int,
+     expected_queues: tuple[list[BaseManifestPath], list[BaseManifestPath]],
+ ):
+     """
+     Tests that the helper method `_separate_files_by_size` works as expected.
+     """
+     s3_asset_uploader = S3AssetUploader()
+     actual_queues = s3_asset_uploader._separate_files_by_size(
+         files_to_upload=input_files,
+         size_threshold=size_threshold,
+     )
+     assert actual_queues == expected_queues


def assert_progress_report_last_callback(
num_input_files: int,
