diff --git a/docs/research/download_slowdown_detection.ipynb b/docs/research/download_slowdown_detection.ipynb new file mode 100644 index 00000000..95912296 --- /dev/null +++ b/docs/research/download_slowdown_detection.ipynb @@ -0,0 +1,335 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setting Thresholds For Anomaly Detection in Download Speed\n", + "\n", + "### Background\n", + "Worker Agent uses Job Attachment module to download input files from an S3 bucket. During the download process, Job Attachment provides progress reports periodically using callback, which include information about the current (or average) transfer rate (in bytes/s). We are looking to implement a feature where, If the transfer rate falls below a certain **speed threshold** for a duration bigger than a specific **time threshold**, the download process is halted. This is to identify transfer speeds slow enough to reasonably imply system or network issues.\n", + "\n", + "### Objective\n", + "The key question is how to determine the hard-coded threshold values for transfer rate and time duration. These values should keep a balance between not prematurely terminating downloads and not allowing them to continue for too long with very low transfer speeds.\n", + "\n", + "### Simulation Approach\n", + "Since we don't have real-world data about the transfer rate yet, we decided to simulate the download process based on the following assumptions:\n", + "\n", + "1. Transfer speed\n", + "We assume the transfer speed at any given time (unless it is within the slowdown duration) follows a normal distribution. The mean for this distribution is derived from average internet speeds of North America, which is 12.5 MB/s. The standard deviation is set as 2.5 (without any specific underlying rationale.)\n", + "\n", + "2. Slowdown occurrence\n", + "For each download session in our simulation, we assume that a single time of significant slowdown will occur. The start time, duration, and rate of this slowdown are randomly determined as below, where `expected end time` is (Transfer size) / (Mean speed). \n", + " - start time: a random value from [0, `expected end time`]\n", + " - end time: a random value from [1, 2 * `expected end time` - start time + 1]\n", + " - We assume that problematic slowdowns could last long enough to raise concerns about time and monetary costs, so we consider durations up to twice the expected total time.\n", + " - slowdown rate: a random value from [99.9%, 99.99%]\n", + " - We assume the speed should drop huge enough to feel almost stopped.\n", + "\n", + "For each simulated download session, we test various combinations of speed and time thresholds.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Parameters and Assumptions" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "MEAN_SPEED = 12.5 * 10**6 # 100 Mbps in B/s\n", + "STD_DEV_FOR_SPEED = 2.5 * 10**6 # MEAN_SPEED / 5\n", + "SLOWDOWN_RATE_RANGE = (0.999, 0.9999) # 99.9% to 99.99% slowdown (e.g., 100 Mbps --> 0.1 Mbps)\n", + "TRANSFER_SIZES = [10 * 10**9] # bytes\n", + "SPEED_THRESHOLDS = [\n", + " 1 * 10**3,\n", + " 2 * 10**3,\n", + " 5 * 10**3,\n", + " 10 * 10**3,\n", + " 50 * 10**3,\n", + " 100 * 10**3,\n", + " 200 * 10**3,\n", + " 500 * 10**3,\n", + " 1 * 10**6,\n", + " 2 * 10**6,\n", + " 5 * 10**6,\n", + " 10 * 10**6,\n", + "] # B/s,\n", + "TIME_THRESHOLDS = [1, 2, 3, 4, 5, 10, 15, 20] # minutes\n", + "MEASURE_INTERVAL = 1 * 60 # seconds" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Classes / methods for Running Simulation \n" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "from typing import List\n", + "import numpy as np\n", + "import random\n", + "\n", + "\n", + "@dataclass\n", + "class SimulationEnv:\n", + " \"\"\"\n", + " Transfer speed profile, start time and duration of delay occurrence\n", + " \"\"\"\n", + "\n", + " transfer_size: int\n", + " speed_profile: list[tuple[float, int]]\n", + " slowdown_speed_mean: float\n", + " slowdown_start: int\n", + " slowdown_end: int\n", + " slowdown_duration: int\n", + "\n", + "\n", + "@dataclass\n", + "class SimulationResult:\n", + " \"\"\"\n", + " A combination of speed threshold and time threshold... and a simulation result.\n", + " \"\"\"\n", + "\n", + " speed_threshold: float\n", + " time_threshold: int\n", + " false_positive: bool\n", + " completed_but_cancelled: bool = False\n", + " worker_time: float = 0 # Time taken to complete the download / max possible time\n", + "\n", + " # Make the class iterable\n", + " def __iter__(self):\n", + " yield self\n", + "\n", + " @classmethod\n", + " def get_average_time(cls, results: List[\"SimulationResult\"]) -> float:\n", + " if not results:\n", + " return 0.0\n", + "\n", + " total = sum(result.worker_time for result in results)\n", + " return total / len(results)\n", + "\n", + "\n", + "def create_simulation_env(\n", + " transfer_size: int,\n", + ") -> SimulationEnv:\n", + " \"\"\"\n", + " Create a simulation environment with a randomized speed profile and\n", + " a single occurance of randomized slowdown.\n", + " \"\"\"\n", + " slowdown_start = random.randint(0, transfer_size / MEAN_SPEED) # seconds\n", + " slowdown_duration = random.randint(\n", + " 1, transfer_size * 2 / MEAN_SPEED - slowdown_start + 1\n", + " ) # seconds\n", + " slowdown_end = slowdown_start + slowdown_duration\n", + " slowdown_rate = random.uniform(SLOWDOWN_RATE_RANGE[0], SLOWDOWN_RATE_RANGE[1])\n", + " slowdown_speed_mean = (1 - slowdown_rate) * MEAN_SPEED # in B/s\n", + " slowdown_speed_std_dev = (1 - slowdown_rate) * 0.1\n", + "\n", + " time = 0\n", + " downloaded_so_far = 0 # bytes\n", + " speed_profile: list[tuple[float, int]] = []\n", + "\n", + " while downloaded_so_far < transfer_size:\n", + " if time >= slowdown_start and time < slowdown_end:\n", + " speed = np.random.normal(slowdown_speed_mean, slowdown_speed_std_dev)\n", + " else:\n", + " speed = np.random.normal(MEAN_SPEED, STD_DEV_FOR_SPEED)\n", + " current_speed = speed\n", + " # calculate how much data is downloaded (cumulative)\n", + " downloaded_so_far += current_speed * 1\n", + " time += 1\n", + " speed_profile.append((speed, transfer_size - downloaded_so_far))\n", + "\n", + " return SimulationEnv(\n", + " transfer_size=transfer_size,\n", + " speed_profile=speed_profile,\n", + " slowdown_speed_mean=slowdown_speed_mean,\n", + " slowdown_start=slowdown_start,\n", + " slowdown_end=slowdown_end,\n", + " slowdown_duration=slowdown_duration,\n", + " )\n", + "\n", + "\n", + "# Run a simulation with thresholds combination\n", + "def run_simulation(\n", + " simulation_env: SimulationEnv,\n", + " measure_interval: int,\n", + " speed_threshold: float,\n", + " time_threshold: int,\n", + ") -> SimulationResult:\n", + " speeds_to_avg = [] # in B/s\n", + " time_in_interval = 0 # in seconds\n", + " worker_time = 0 # seconds\n", + " consecutive_slow_speed_count = 0 # count\n", + " false_positive = None\n", + " total_time_expectation = simulation_env.transfer_size / MEAN_SPEED # seconds\n", + " # Total time taken until the download completes without slowdown detection or cancelling\n", + " max_time = len(simulation_env.speed_profile) # seconds\n", + "\n", + " for i, (speed, _) in enumerate(simulation_env.speed_profile):\n", + " speeds_to_avg.append(speed)\n", + " time_in_interval += 1\n", + " worker_time += 1\n", + "\n", + " if time_in_interval >= measure_interval:\n", + " # calculate avg speed over measure_interval\n", + " assert len(speeds_to_avg) > 0\n", + " avg_speed = sum(speeds_to_avg) / len(speeds_to_avg) # bytes/s\n", + " speeds_to_avg = []\n", + "\n", + " # check the measured speed against the threshold\n", + " if avg_speed < speed_threshold:\n", + " consecutive_slow_speed_count += 1\n", + " if consecutive_slow_speed_count >= time_threshold:\n", + " false_positive = (\n", + " i < simulation_env.slowdown_start or i >= simulation_env.slowdown_end\n", + " )\n", + " # After detecting the slowdown and canceling the download, we assume that the user retries\n", + " # from the beginning when the network and speed are back to normal.\n", + " worker_time += total_time_expectation\n", + " return SimulationResult(\n", + " speed_threshold=speed_threshold,\n", + " time_threshold=time_threshold,\n", + " false_positive=false_positive,\n", + " completed_but_cancelled=True,\n", + " worker_time=worker_time / max_time,\n", + " )\n", + " else:\n", + " consecutive_slow_speed_count = 0\n", + " time_in_interval = 0\n", + "\n", + " return SimulationResult(\n", + " speed_threshold=speed_threshold,\n", + " time_threshold=time_threshold,\n", + " false_positive=False,\n", + " completed_but_cancelled=False,\n", + " worker_time=worker_time / max_time,\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run Simulation and Visulize Results by Heatmap" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from collections import defaultdict\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "import matplotlib.pyplot as plt\n", + "\n", + "num_simulations = 1000\n", + "results = defaultdict(list)\n", + "for transfer_size in TRANSFER_SIZES:\n", + " for _ in range(num_simulations):\n", + " sim_env = create_simulation_env(transfer_size)\n", + " for speed_threshold in SPEED_THRESHOLDS:\n", + " for time_threshold in TIME_THRESHOLDS:\n", + " key = (speed_threshold, time_threshold)\n", + " # Run a simulation and append the result to dictionary\n", + " results[key].append(\n", + " run_simulation(sim_env, MEASURE_INTERVAL, speed_threshold, time_threshold)\n", + " )\n", + "\n", + " # the list of all speed thresholds used in simulations\n", + " speed_thresholds = [key[0] / 10**3 for key in results.keys()] # KB/s\n", + " # the list of all time thresholds used in simulations\n", + " time_thresholds = [key[1] for key in results.keys()]\n", + " # the list of average times for each combination of speed and time thresholds\n", + " avg_time_list = [\n", + " SimulationResult.get_average_time(results_list) for results_list in list(results.values())\n", + " ]\n", + "\n", + " df = pd.DataFrame(\n", + " {\"Speed thres\": speed_thresholds, \"Time thres\": time_thresholds, \"Avg time\": avg_time_list}\n", + " )\n", + "\n", + " pivot_table = df.pivot_table(values=\"Avg time\", index=\"Time thres\", columns=\"Speed thres\")\n", + "\n", + " # Draw a heatmap\n", + " plt.figure(figsize=(10, 8))\n", + " sns.heatmap(pivot_table, cmap=\"coolwarm\", annot=True, fmt=\".2f\", cbar=True)\n", + " plt.xlabel(\"Speed Threshold (KB/s)\")\n", + " plt.ylabel(\"Time Threshold (Minutes)\")\n", + " plt.title(\"Heatmap of Worker Runtime\")\n", + " plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Sample Results and Analysis" + ] + }, + { + "attachments": { + "image.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![image.png](attachment:image.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This simulation is based on a lot of assumptions, making it difficult to be a good reflection of real-world situation, and there may be many disagreement in interpreting the results. Nevertheless, some key points that can be derived from these heatmap results are as follows:\n", + "\n", + "- When the speed threshold is set too low (less than 10 KB/s), even if a significant speed degradation occurs from the original speed (99.99% or more), this slowdown itself may not be detected.\n", + "\n", + "- When the time threshold exceeds a certain level (more than 4-5 minutes), it does not seem to have a significant impact on time or cost savings in the context of threshold-based cancellation. (However, as mentioned earlier, caution is required in interpreting these results: we do not assume extremely long durations of slowdown - in this simulation, we limit it to twice the expected download time. Furthermore, we assume that after halting the download, a new session starts to download files from the beginning. Therefore, in some cases, it may appear that it is better to continue downloading until it fully recovers the normal speed, rather than stopping it in the middle.)\n", + "\n", + "- Based on this heatmap, the most suitable range for threshold values appears to be as follows:\n", + " - Speed threshold: 50 - 200 KB/s\n", + " - Time threshold: 1 - 3 minutes\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "deadline-cloud-worker-agent", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.8" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/deadline_worker_agent/sessions/session.py b/src/deadline_worker_agent/sessions/session.py index 9a6b7250..e8e20542 100644 --- a/src/deadline_worker_agent/sessions/session.py +++ b/src/deadline_worker_agent/sessions/session.py @@ -765,12 +765,50 @@ def sync_asset_inputs( if self._asset_sync is None: return - def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: + # A transfer rate below 50 Kb/s is considered concerning or potentially stalled. + TRANSFER_RATE_THRESHOLD = 50 * 10**3 # 50 KB/s + # Each progress report callback takes 1 min, so 2 reports amount to 2 mins in total + LOW_TRANSFER_COUNT_THRESHOLD = 2 + low_transfer_count = 0 + + def progress_handler(job_attachment_download_status: ProgressReportMetadata) -> bool: + """ + Callback for Job Attachments' sync_inputs() to track the the file transfer progress. + It performs checks on the tarnsfer rate and decides whether to continue the process. + + Args: + job_attachment_download_status: contains information about the currenet progress. + + Returns: + True if the operation should continue as normal or False to cancel. + """ + # Check the trasfer rate from the progress report. Counts the successive low transfer + # rates, and if the count exceeds the spcified threshold, cancels the download and + # fails the current (sync_input_job_attachments) action. + nonlocal low_transfer_count + transfer_rate = job_attachment_download_status.transferRate + if transfer_rate < TRANSFER_RATE_THRESHOLD: + low_transfer_count += 1 + else: + low_transfer_count = 0 + if low_transfer_count >= LOW_TRANSFER_COUNT_THRESHOLD: + cancel.set() + self.update_action( + action_status=ActionStatus( + state=ActionState.FAILED, + fail_message=( + "Input syncing failed due to successive low transfer rates (< 50 Kb/s). " + "The transfer rate was below the threshold for the last two checks." + ), + ), + ) + return False + self.update_action( action_status=ActionStatus( state=ActionState.RUNNING, - status_message=job_upload_status.progressMessage, - progress=job_upload_status.progress, + status_message=job_attachment_download_status.progressMessage, + progress=job_attachment_download_status.progress, ), ) return not cancel.is_set() diff --git a/test/unit/sessions/test_session.py b/test/unit/sessions/test_session.py index 2992f8b3..ef908681 100644 --- a/test/unit/sessions/test_session.py +++ b/test/unit/sessions/test_session.py @@ -49,6 +49,7 @@ JobAttachmentsFileSystem, PosixFileSystemPermissionSettings, ) +from deadline.job_attachments.progress_tracker import ProgressReportMetadata, ProgressStatus import deadline_worker_agent.sessions.session as session_mod @@ -617,6 +618,52 @@ def test_sync_asset_inputs( else: session.sync_asset_inputs(cancel=cancel, **args) # type: ignore[arg-type] + def test_sync_asset_inputs_cacellation_by_low_transfer_rate( + self, + session: Session, + mock_asset_sync: MagicMock, + ): + """ + Tests that the session is failed if the sync_inputs function reports successive + low transfer rates. + """ + LOW_TRANSFER_COUNT_THRESHOLD = 2 + + # Mock out the AssetSync's sync_inputs function to simulate multiple + # consecutive low transfer rates. + def mock_sync_inputs(on_downloading_files, *args, **kwargs): + low_transfer_rate_report = ProgressReportMetadata( + status=ProgressStatus.DOWNLOAD_IN_PROGRESS, + progress=0.0, + transferRate=10, + progressMessage="", + ) + for _ in range(LOW_TRANSFER_COUNT_THRESHOLD): + on_downloading_files(low_transfer_rate_report) + return ({}, {}) + + mock_asset_sync.sync_inputs = mock_sync_inputs + mock_cancel = MagicMock(spec=Event) + + with patch.object(session, "update_action") as mock_update_action: + session.sync_asset_inputs( + cancel=mock_cancel, + job_attachment_details=JobAttachmentDetails( + manifests=[], + job_attachments_file_system=JobAttachmentsFileSystem.COPIED, + ), + ) + mock_cancel.set.assert_called_once() + mock_update_action.assert_called_with( + action_status=ActionStatus( + state=ActionState.FAILED, + fail_message=( + "Input syncing failed due to successive low transfer rates (< 50 Kb/s). " + "The transfer rate was below the threshold for the last two checks." + ), + ), + ) + class TestSessionInnerRun: """Test cases for Session._run()"""