Skip to content

Commit

Permalink
feat(job_attachments)!: cancelable multipart file upload (#111)
Browse files Browse the repository at this point in the history
Refactored file upload process to enable cancellation check during the
upload of files.

Signed-off-by: Gahyun Suh <[email protected]>
Co-authored-by: Gahyun Suh <[email protected]>
  • Loading branch information
gahyusuh and Gahyun Suh authored Nov 21, 2023
1 parent 7b3b3e6 commit af8a80a
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 105 deletions.
14 changes: 8 additions & 6 deletions scripted_tests/upload_cancel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
2. In the middle of hashing or uploading those files, you can send a cancel
signal by pressing 'k' and Enter keys in succession. Confirm that cancelling
is working as expected by checking the console output.
Note: This script generates test files in the /tmp/test_submit directory for testing
purposes, but it does not delete these files after the test is completed.
"""

MESSAGE_HOW_TO_CANCEL = (
"To stop the hash/upload process, please hit 'k' key and then 'Enter' key in succession.\n"
)

NUM_SMALL_FILES = 0
NUM_MEDIUM_FILES = 5
NUM_MEDIUM_FILES = 0
NUM_LARGE_FILES = 1

continue_reporting = True
Expand Down Expand Up @@ -74,17 +77,16 @@ def run():
file_path = root_path / f"medium_test{i}.txt"
if not os.path.exists(file_path):
with file_path.open("wb") as f:
f.write(os.urandom(102428800)) # 100 MB files
f.write(os.urandom(100 * (1024**2))) # 100 MB files
files.append(str(file_path))

# Make large files
if NUM_LARGE_FILES > 0:
for i in range(0, NUM_LARGE_FILES):
file_path = root_path / f"large_test{i}.txt"
if not os.path.exists(file_path):
for i in range(1):
with file_path.open("ab") as f:
f.write(os.urandom(1073741824)) # Write 1 GB at a time
with file_path.open("ab") as f:
f.write(os.urandom(1 * (1024**3))) # Write 1 GB at a time
files.append(str(file_path))

queue = get_queue(farm_id=farm_id, queue_id=queue_id)
Expand Down Expand Up @@ -117,7 +119,7 @@ def run():
)
except AssetSyncCancelledError as asce:
print(f"AssetSyncCancelledError: {asce}")
print(f"payload: {asce.summary_statistics}")
print(f"payload:\n{asce.summary_statistics}")

print(f"\nTotal test runtime: {time.perf_counter() - start_time}")

Expand Down
9 changes: 3 additions & 6 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from ._aws.aws_clients import get_boto3_session
from ._aws.deadline import get_job, get_queue
from .download import (
_progress_logger,
merge_asset_manifests,
download_files_from_manifests,
get_manifest_from_s3,
Expand Down Expand Up @@ -122,12 +121,10 @@ def _upload_output_files_to_s3(
continue

self.s3_uploader.upload_file_to_s3(
file.full_path,
Path(file.full_path),
s3_settings.s3BucketName,
file.s3_key,
progress_handler=_progress_logger(
file.file_size, progress_tracker.track_progress_callback
),
progress_tracker,
)

progress_tracker.total_time = time.perf_counter() - start_time
Expand Down Expand Up @@ -480,7 +477,7 @@ def sync_outputs(
)
else:
raise AssetSyncError(
"Error occurred while attempting to sync input files: "
"Error occurred while attempting to sync output files: "
f"No path mapping rule found for the source path {manifest_properties.rootPath}"
)
else:
Expand Down
4 changes: 2 additions & 2 deletions src/deadline/job_attachments/progress_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def convert_to_summary_statistics(self) -> SummaryStatistics:

class ProgressStatus(Enum):
"""
Reperesents the current stage of asset/file processing
Represents the current stage of asset/file processing
"""

NONE = ("NONE", "")
Expand Down Expand Up @@ -194,7 +194,7 @@ def do_nothing(*args, **kwargs) -> bool:

self._lock = Lock()

def track_progress(bytes_amount: int, current_file_done: bool) -> bool:
def track_progress(bytes_amount: int, current_file_done: Optional[bool] = False) -> bool:
"""
When uploading or downloading files using boto3, pass this to the `Callback` argument
so that the progress can be updated with the amount of bytes processed.
Expand Down
Loading

0 comments on commit af8a80a

Please sign in to comment.