Skip to content

Commit

Permalink
feat: Fail Job Attachments action when consecutive transfer rates dro…
Browse files Browse the repository at this point in the history
…p below threshold

Signed-off-by: Gahyun Suh <[email protected]>
  • Loading branch information
gahyusuh committed Sep 21, 2023
1 parent 174ea42 commit 695d169
Show file tree
Hide file tree
Showing 3 changed files with 423 additions and 3 deletions.
335 changes: 335 additions & 0 deletions docs/research/download_slowdown_deteection.ipynb

Large diffs are not rendered by default.

44 changes: 41 additions & 3 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,12 +752,50 @@ def sync_asset_inputs(
if self._asset_sync is None:
return

def progress_handler(job_upload_status: ProgressReportMetadata) -> bool:
# A transfer rate below 1 Kb/s is considered concerning or potentially stalled.
TRANSFER_RATE_THRESHOLD = 50 * 10**3 # 50 KB/s
# Each progress report callback takes 1 min, so 2 reports amount to 2 mins in total
LOW_TRANSFER_COUNT_THRESHOLD = 2
low_transfer_count = 0

def progress_handler(job_attachment_download_status: ProgressReportMetadata) -> bool:
"""
Callback for Job Attachments' sync_inputs() to track the the file transfer progress.
It performs checks on the tarnsfer rate and decides whether to continue the process.
Args:
job_attachment_download_status: contains information about the currenet progress.
Returns:
True if the operation should continue as normal or False to cancel.
"""
# Check the trasfer rate from the progress report. Counts the successive low transfer
# rates, and if the count exceeds the spcified threshold, cancels the download and
# fails the current (sync_input_job_attachments) action.
nonlocal low_transfer_count
transfer_rate = job_attachment_download_status.transferRate
if transfer_rate < TRANSFER_RATE_THRESHOLD:
low_transfer_count += 1
else:
low_transfer_count = 0
if low_transfer_count >= LOW_TRANSFER_COUNT_THRESHOLD:
cancel.set()
self.update_action(
action_status=ActionStatus(
state=ActionState.FAILED,
fail_message=(
"Input syncing failed due to successive low transfer rates (< 1 Kb/s). "
"The transfer rate was below the threshold for the last three checks."
),
),
)
return False

self.update_action(
action_status=ActionStatus(
state=ActionState.RUNNING,
status_message=job_upload_status.progressMessage,
progress=job_upload_status.progress,
status_message=job_attachment_download_status.progressMessage,
progress=job_attachment_download_status.progress,
),
)
return not cancel.is_set()
Expand Down
47 changes: 47 additions & 0 deletions test/unit/sessions/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
AssetLoadingMethod,
PosixFileSystemPermissionSettings,
)
from deadline.job_attachments.progress_tracker import ProgressReportMetadata, ProgressStatus
import deadline_worker_agent.sessions.session as session_mod


Expand Down Expand Up @@ -627,6 +628,52 @@ def test_job_attachments_path_mapping_rules_compatibility(
session.sync_asset_inputs(cancel=cancel, **sync_asset_inputs_args) # type: ignore[arg-type]
# No errors on generating path mapping rules - success!

def test_sync_asset_inputs_cacellation_by_low_transfer_rate(
self,
session: Session,
mock_asset_sync: MagicMock,
):
"""
Tests that the session is failed if the sync_inputs function reports successive
low transfer rates.
"""
LOW_TRANSFER_COUNT_THRESHOLD = 3

# Mock out the AssetSync's sync_inputs function to simulate multiple
# consecutive low transfer rates.
def mock_sync_inputs(on_downloading_files, *args, **kwargs):
low_transfer_rate_report = ProgressReportMetadata(
status=ProgressStatus.DOWNLOAD_IN_PROGRESS,
progress=0.0,
transferRate=10,
progressMessage="",
)
for _ in range(LOW_TRANSFER_COUNT_THRESHOLD):
on_downloading_files(low_transfer_rate_report)
return ({}, {})

mock_asset_sync.sync_inputs = mock_sync_inputs
mock_cancel = MagicMock(spec=Event)

with patch.object(session, "update_action") as mock_update_action:
session.sync_asset_inputs(
cancel=mock_cancel,
job_attachment_details=JobAttachmentDetails(
manifests=[],
asset_loading_method=AssetLoadingMethod.PRELOAD,
),
)
mock_cancel.set.assert_called_once()
mock_update_action.assert_called_with(
action_status=ActionStatus(
state=ActionState.FAILED,
fail_message=(
"Input syncing failed due to successive low transfer rates (< 1 Kb/s). "
"The transfer rate was below the threshold for the last three checks."
),
),
)


class TestSessionInnerRun:
"""Test cases for Session._run()"""
Expand Down

0 comments on commit 695d169

Please sign in to comment.