Skip to content

Commit

Permalink
Add fields for tracking where jobs are executed
Browse files Browse the repository at this point in the history
  • Loading branch information
bjester committed Dec 6, 2023
1 parent fb70ecd commit 4dfd8cc
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 2 deletions.
33 changes: 33 additions & 0 deletions kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class Job(object):
"result",
"args",
"kwargs",
"worker_host",
"worker_process",
"worker_thread",
"worker_extra",
}

JSON_KEYS = UPDATEABLE_KEYS | {
Expand Down Expand Up @@ -174,6 +178,10 @@ def from_job(cls, job, **kwargs):
kwargs["long_running"] = job.long_running
kwargs["extra_metadata"] = job.extra_metadata.copy()
kwargs["facility_id"] = job.facility_id
kwargs["worker_host"] = job.worker_host
kwargs["worker_process"] = job.worker_process
kwargs["worker_thread"] = job.worker_thread
kwargs["worker_extra"] = job.worker_extra
return cls(job.func, **kwargs)

def __init__(
Expand All @@ -193,6 +201,10 @@ def __init__(
total_progress=0,
result=None,
long_running=False,
worker_host=None,
worker_process=None,
worker_thread=None,
worker_extra=None,
):
"""
Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter
Expand Down Expand Up @@ -232,6 +244,10 @@ def __init__(
self.kwargs = kwargs or {}
self._storage = None
self.func = callable_to_import_path(func)
self.worker_host = worker_host
self.worker_process = worker_process
self.worker_thread = worker_thread
self.worker_extra = worker_extra

def _check_storage_attached(self):
if self._storage is None:
Expand Down Expand Up @@ -262,6 +278,23 @@ def update_metadata(self, **kwargs):
self.extra_metadata[key] = value
self.save_meta()

def update_worker_info(self, host=None, process=None, thread=None, extra=None):
if host is not None:
self.worker_host = host
if process is not None:
self.worker_process = process
if thread is not None:
self.worker_thread = thread
if extra is not None:
self.worker_extra = extra
self.storage.save_worker_info(
self.job_id,
host=self.worker_host,
process=self.worker_process,
thread=self.worker_process,
extra=self.worker_extra,
)

def check_for_cancel(self):
if self.cancellable:
if self.storage.check_job_canceled(self.job_id):
Expand Down
33 changes: 33 additions & 0 deletions kolibri/core/tasks/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class ORMJob(Base):

scheduled_time = Column(DateTime())

# Optional references to the worker host, process and thread that are running this job,
# and any extra metadata that can be used by specific worker implementations.
worker_host = Column(String, nullable=True)
worker_process = Column(String, nullable=True)
worker_thread = Column(String, nullable=True)
worker_extra = Column(String, nullable=True)

__table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),)


Expand Down Expand Up @@ -491,6 +498,28 @@ def save_job_meta(self, job):
def save_job_as_cancellable(self, job_id, cancellable=True):
self._update_job(job_id, cancellable=cancellable)

def save_worker_info(
self, job_id, host=None, process=None, thread=None, extra=None
):
"""
Generally we only want to capture/update, not erase, any of this information so we only
update the fields that are non-None.
"""
kwargs = {}
if host is not None:
kwargs["worker_host"] = host
if process is not None:
kwargs["worker_process"] = process
if thread is not None:
kwargs["worker_thread"] = thread
if extra is not None:
kwargs["worker_extra"] = extra
if kwargs:
logger.debug(
"Updating worker info for job {} with {}".format(job_id, kwargs)
)
self._update_job(job_id, **kwargs)

# Turning off the complexity warning for this function as moving the conditional validation checks
# inline would be the simplest way to 'reduce' the complexity, but would make it less readable.
def reschedule_finished_job_if_needed( # noqa: C901
Expand Down Expand Up @@ -715,6 +744,10 @@ def schedule(
retry_interval=retry_interval,
scheduled_time=naive_utc_datetime(dt),
saved_job=job.to_json(),
worker_host=job.worker_host,
worker_process=job.worker_process,
worker_thread=job.worker_thread,
worker_extra=job.worker_extra,
)
session.merge(orm_job)
try:
Expand Down
24 changes: 22 additions & 2 deletions kolibri/core/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
logger = logging.getLogger(__name__)


def execute_job(job_id):
def execute_job(
job_id, worker_host=None, worker_process=None, worker_thread=None, worker_extra=None
):
"""
Call the function stored in the job.func.
:return: None
Expand All @@ -24,6 +26,8 @@ def execute_job(job_id):

job = storage.get_job(job_id)

job.update_worker_info(worker_host, worker_process, worker_thread, worker_extra)

job.execute()

connection.dispose()
Expand All @@ -32,6 +36,22 @@ def execute_job(job_id):
django_connection.close()


def execute_job_with_python_worker(job_id):
"""
Call execute_job but additionally with the current host, process and thread information taken
directly from python internals.
"""
import os
import threading

execute_job(
job_id,
worker_host=os.uname()[1],
worker_process=str(os.getpid()),
worker_thread=str(threading.get_ident()),
)


class Worker(object):
def __init__(self, connection, regular_workers=2, high_workers=1):
# Internally, we use concurrent.future.Future to run and track
Expand Down Expand Up @@ -166,7 +186,7 @@ def start_next_job(self, job):
:return future:
"""
future = self.workers.submit(
execute_job,
execute_job_with_python_worker,
job_id=job.job_id,
)

Expand Down

0 comments on commit 4dfd8cc

Please sign in to comment.