diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py
index 75c9053070d..38f21eef241 100644
--- a/kolibri/core/tasks/job.py
+++ b/kolibri/core/tasks/job.py
@@ -125,6 +125,10 @@ class Job(object):
         "result",
         "args",
         "kwargs",
+        "worker_host",
+        "worker_process",
+        "worker_thread",
+        "worker_extra",
     }

     JSON_KEYS = UPDATEABLE_KEYS | {
@@ -174,6 +178,10 @@ def from_job(cls, job, **kwargs):
         kwargs["long_running"] = job.long_running
         kwargs["extra_metadata"] = job.extra_metadata.copy()
         kwargs["facility_id"] = job.facility_id
+        kwargs["worker_host"] = job.worker_host
+        kwargs["worker_process"] = job.worker_process
+        kwargs["worker_thread"] = job.worker_thread
+        kwargs["worker_extra"] = job.worker_extra
         return cls(job.func, **kwargs)

     def __init__(
@@ -193,6 +201,10 @@ def __init__(
         total_progress=0,
         result=None,
         long_running=False,
+        worker_host=None,
+        worker_process=None,
+        worker_thread=None,
+        worker_extra=None,
     ):
         """
         Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter
@@ -232,6 +244,10 @@ def __init__(
         self.kwargs = kwargs or {}
         self._storage = None
         self.func = callable_to_import_path(func)
+        self.worker_host = worker_host
+        self.worker_process = worker_process
+        self.worker_thread = worker_thread
+        self.worker_extra = worker_extra

     def _check_storage_attached(self):
         if self._storage is None:
@@ -262,6 +278,23 @@ def update_metadata(self, **kwargs):
             self.extra_metadata[key] = value
         self.save_meta()

+    def update_worker_info(self, host=None, process=None, thread=None, extra=None):
+        if host is not None:
+            self.worker_host = host
+        if process is not None:
+            self.worker_process = process
+        if thread is not None:
+            self.worker_thread = thread
+        if extra is not None:
+            self.worker_extra = extra
+        self.storage.save_worker_info(
+            self.job_id,
+            host=self.worker_host,
+            process=self.worker_process,
+            thread=self.worker_thread,
+            extra=self.worker_extra,
+        )
+
     def check_for_cancel(self):
         if self.cancellable:
             if self.storage.check_job_canceled(self.job_id):
diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py
index 3e64bbaaa4e..cdd45ba26fa 100644
--- a/kolibri/core/tasks/storage.py
+++ b/kolibri/core/tasks/storage.py
@@ -79,6 +79,13 @@ class ORMJob(Base):
     scheduled_time = Column(DateTime())

+    # Optional references to the worker host, process and thread that are running this job,
+    # and any extra metadata that can be used by specific worker implementations.
+    worker_host = Column(String, nullable=True)
+    worker_process = Column(String, nullable=True)
+    worker_thread = Column(String, nullable=True)
+    worker_extra = Column(String, nullable=True)
+
     __table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),)

@@ -491,6 +498,28 @@ def save_job_meta(self, job):
     def save_job_as_cancellable(self, job_id, cancellable=True):
         self._update_job(job_id, cancellable=cancellable)

+    def save_worker_info(
+        self, job_id, host=None, process=None, thread=None, extra=None
+    ):
+        """
+        Generally we only want to capture/update, not erase, any of this information, so we only
+        update the fields that are non-None.
+ """ + kwargs = {} + if host is not None: + kwargs["worker_host"] = host + if process is not None: + kwargs["worker_process"] = process + if thread is not None: + kwargs["worker_thread"] = thread + if extra is not None: + kwargs["worker_extra"] = extra + if kwargs: + logger.debug( + "Updating worker info for job {} with {}".format(job_id, kwargs) + ) + self._update_job(job_id, **kwargs) + # Turning off the complexity warning for this function as moving the conditional validation checks # inline would be the simplest way to 'reduce' the complexity, but would make it less readable. def reschedule_finished_job_if_needed( # noqa: C901 @@ -715,6 +744,10 @@ def schedule( retry_interval=retry_interval, scheduled_time=naive_utc_datetime(dt), saved_job=job.to_json(), + worker_host=job.worker_host, + worker_process=job.worker_process, + worker_thread=job.worker_thread, + worker_extra=job.worker_extra, ) session.merge(orm_job) try: diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py index 3d52d5fef8a..778f0c805d7 100644 --- a/kolibri/core/tasks/worker.py +++ b/kolibri/core/tasks/worker.py @@ -12,7 +12,9 @@ logger = logging.getLogger(__name__) -def execute_job(job_id): +def execute_job( + job_id, worker_host=None, worker_process=None, worker_thread=None, worker_extra=None +): """ Call the function stored in the job.func. :return: None @@ -24,6 +26,8 @@ def execute_job(job_id): job = storage.get_job(job_id) + job.update_worker_info(worker_host, worker_process, worker_thread, worker_extra) + job.execute() connection.dispose() @@ -32,6 +36,22 @@ def execute_job(job_id): django_connection.close() +def execute_job_with_python_worker(job_id): + """ + Call execute_job but additionally with the current host, process and thread information taken + directly from python internals. + """ + import os + import threading + + execute_job( + job_id, + worker_host=os.uname()[1], + worker_process=str(os.getpid()), + worker_thread=str(threading.get_ident()), + ) + + class Worker(object): def __init__(self, connection, regular_workers=2, high_workers=1): # Internally, we use concurrent.future.Future to run and track @@ -166,7 +186,7 @@ def start_next_job(self, job): :return future: """ future = self.workers.submit( - execute_job, + execute_job_with_python_worker, job_id=job.job_id, )