Skip to content

Commit

Permalink
[Core] Avoid job scheduling race condition (#4310)
Browse files Browse the repository at this point in the history
* Avoid job schedule race condition

* format

* format

* Avoid race for cancel
  • Loading branch information
Michaelvll authored Nov 9, 2024
1 parent 1b3d968 commit 2944014
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions sky/skylet/job_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
import sqlite3
import subprocess
import time
import typing
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional

import colorama
import filelock
Expand All @@ -24,9 +23,6 @@
from sky.utils import db_utils
from sky.utils import log_utils

if typing.TYPE_CHECKING:
from ray.dashboard.modules.job import pydantic_models as ray_pydantic

logger = sky_logging.init_logger(__name__)

_LINUX_NEW_LINE = '\n'
Expand Down Expand Up @@ -184,12 +180,20 @@ def _run_job(self, job_id: int, run_cmd: str):
def schedule_step(self, force_update_jobs: bool = False) -> None:
if force_update_jobs:
update_status()
pending_jobs = self._get_pending_jobs()
pending_job_ids = self._get_pending_job_ids()
# TODO(zhwu, mraheja): One optimization would be to allow more than one
# job to stay in the pending state after ray job submit, so that a large
# number of jobs can be scheduled faster.
for job_id, run_cmd, submit, created_time in pending_jobs:
for job_id in pending_job_ids:
with filelock.FileLock(_get_lock_path(job_id)):
pending_job = _get_pending_job(job_id)
if pending_job is None:
# Pending job can be removed by another thread, due to the
# job being scheduled already.
continue
run_cmd = pending_job['run_cmd']
submit = pending_job['submit']
created_time = pending_job['created_time']
# We don't have to refresh the job status before checking, as
# the job status will only be stale in rare cases where ray job
# crashes; or the job stays in INIT state for a long time.
Expand All @@ -208,8 +212,8 @@ def schedule_step(self, force_update_jobs: bool = False) -> None:
self._run_job(job_id, run_cmd)
return

def _get_pending_jobs(self) -> List[Tuple[int, str, int, int]]:
"""Returns the metadata for jobs in the pending jobs table
def _get_pending_job_ids(self) -> List[int]:
"""Returns the job ids in the pending jobs table
Only the ids are returned; each job's run command, submit time and
creation time are fetched separately via _get_pending_job().
Expand All @@ -220,9 +224,10 @@ def _get_pending_jobs(self) -> List[Tuple[int, str, int, int]]:
class FIFOScheduler(JobScheduler):
"""First in first out job scheduler"""

def _get_pending_jobs(self) -> List[Tuple[int, str, int, int]]:
return list(
_CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id'))
def _get_pending_job_ids(self) -> List[int]:
rows = _CURSOR.execute(
'SELECT job_id FROM pending_jobs ORDER BY job_id').fetchall()
return [row[0] for row in rows]


scheduler = FIFOScheduler()
Expand Down Expand Up @@ -519,11 +524,16 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]:


def _get_pending_job(job_id: int) -> Optional[Dict[str, Any]]:
rows = _CURSOR.execute('SELECT created_time, submit FROM pending_jobs '
f'WHERE job_id={job_id!r}')
rows = _CURSOR.execute(
'SELECT created_time, submit, run_cmd FROM pending_jobs '
f'WHERE job_id={job_id!r}')
for row in rows:
created_time, submit = row
return {'created_time': created_time, 'submit': submit}
created_time, submit, run_cmd = row
return {
'created_time': created_time,
'submit': submit,
'run_cmd': run_cmd
}
return None


Expand Down Expand Up @@ -794,7 +804,9 @@ def cancel_jobs_encoded_results(jobs: Optional[List[int]],
logger.warning(str(e))
continue

if job['status'] in [
# Get the job status again to avoid a race condition.
job_status = get_status_no_lock(job['job_id'])
if job_status in [
JobStatus.PENDING, JobStatus.SETTING_UP, JobStatus.RUNNING
]:
_set_status_no_lock(job['job_id'], JobStatus.CANCELLED)
Expand Down

0 comments on commit 2944014

Please sign in to comment.