diff --git a/src/deadline_worker_agent/sessions/session.py b/src/deadline_worker_agent/sessions/session.py index 56742653..f39476f9 100644 --- a/src/deadline_worker_agent/sessions/session.py +++ b/src/deadline_worker_agent/sessions/session.py @@ -752,12 +752,50 @@ def sync_asset_inputs( if self._asset_sync is None: return - def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: + # A transfer rate below 1 Kb/s is considered concerning or potentially stalled. + TRANSFER_RATE_THRESHOLD = 1000 # bytes/s + # Each progress report callback takes 5 mins, so 3 reports amount to 15 mins in total + LOW_TRANSFER_COUNT_THRESHOLD = 3 + low_transfer_count = 0 + + def progress_handler(job_attachment_download_status: ProgressReportMetadata) -> bool: + """ + Callback for Job Attachments' sync_inputs() to track the the file transfer progress. + It performs checks on the tarnsfer rate and decides whether to continue the process. + + Args: + job_attachment_download_status: contains information about the currenet progress. + + Returns: + True if the operation should continue as normal or False to cancel. + """ + # Check the trasfer rate from the progress report. Counts the successive low transfer + # rates, and if the count exceeds the spcified threshold, cancels the download and + # fails the current (sync_input_job_attachments) action. + nonlocal low_transfer_count + transfer_rate = job_attachment_download_status.transferRate + if transfer_rate < TRANSFER_RATE_THRESHOLD: + low_transfer_count += 1 + else: + low_transfer_count = 0 + if low_transfer_count >= LOW_TRANSFER_COUNT_THRESHOLD: + cancel.set() + self.update_action( + action_status=ActionStatus( + state=ActionState.FAILED, + fail_message=( + "Input syncing failed due to successive low transfer rates (< 1 Kb/s). " + "The transfer rate was below the threshold for the last three checks." + ), + ), + ) + return False + self.update_action( action_status=ActionStatus( state=ActionState.RUNNING, - status_message=job_upload_status.progressMessage, - progress=job_upload_status.progress, + status_message=job_attachment_download_status.progressMessage, + progress=job_attachment_download_status.progress, ), ) return not cancel.is_set() diff --git a/test/unit/sessions/test_session.py b/test/unit/sessions/test_session.py index e270ce6f..cd868092 100644 --- a/test/unit/sessions/test_session.py +++ b/test/unit/sessions/test_session.py @@ -48,6 +48,7 @@ AssetLoadingMethod, PosixFileSystemPermissionSettings, ) +from deadline.job_attachments.progress_tracker import ProgressReportMetadata, ProgressStatus import deadline_worker_agent.sessions.session as session_mod @@ -627,6 +628,52 @@ def test_job_attachments_path_mapping_rules_compatibility( session.sync_asset_inputs(cancel=cancel, **sync_asset_inputs_args) # type: ignore[arg-type] # No errors on generating path mapping rules - success! + def test_sync_asset_inputs_cacellation_by_low_transfer_rate( + self, + session: Session, + mock_asset_sync: MagicMock, + ): + """ + Tests that the session is failed if the sync_inputs function reports successive + low transfer rates. + """ + LOW_TRANSFER_COUNT_THRESHOLD = 3 + + # Mock out the AssetSync's sync_inputs function to simulate multiple + # consecutive low transfer rates. + def mock_sync_inputs(on_downloading_files, *args, **kwargs): + low_transfer_rate_report = ProgressReportMetadata( + status=ProgressStatus.DOWNLOAD_IN_PROGRESS, + progress=0.0, + transferRate=10, + progressMessage="", + ) + for _ in range(LOW_TRANSFER_COUNT_THRESHOLD): + on_downloading_files(low_transfer_rate_report) + return ({}, {}) + + mock_asset_sync.sync_inputs = mock_sync_inputs + mock_cancel = MagicMock(spec=Event) + + with patch.object(session, "update_action") as mock_update_action: + session.sync_asset_inputs( + cancel=mock_cancel, + job_attachment_details=JobAttachmentDetails( + manifests=[], + asset_loading_method=AssetLoadingMethod.PRELOAD, + ), + ) + mock_cancel.set.assert_called_once() + mock_update_action.assert_called_with( + action_status=ActionStatus( + state=ActionState.FAILED, + fail_message=( + "Input syncing failed due to successive low transfer rates (< 1 Kb/s). " + "The transfer rate was below the threshold for the last three checks." + ), + ), + ) + class TestSessionInnerRun: """Test cases for Session._run()"""