When running under the Kolibri process bus, use a logging queue to prevent reentrant logging errors and file contention #12785

Merged 2 commits on Nov 8, 2024
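For context on the approach described in the title: this PR applies the standard library's queue-based logging pattern. Each process attaches a QueueHandler that puts LogRecords onto a shared queue, and a single QueueListener in the main process drains that queue and hands the records to the real handlers, so only one process ever writes to the log files. A minimal, standalone sketch of that general pattern (stdlib only; the names and handlers here are illustrative, not Kolibri's actual classes):

import logging
import multiprocessing
from logging.handlers import QueueHandler, QueueListener


def worker(log_queue):
    # Each worker swaps its handlers for a QueueHandler, so records are
    # pickled onto the shared queue instead of being written to files directly.
    root = logging.getLogger()
    root.handlers = [QueueHandler(log_queue)]
    root.setLevel(logging.INFO)
    logging.getLogger("example.worker").info("hello from a worker process")


if __name__ == "__main__":
    log_queue = multiprocessing.Queue()
    # The listener owns the only FileHandler; it drains the queue on a
    # background thread and writes records sequentially.
    file_handler = logging.FileHandler("example.log")
    listener = QueueListener(log_queue, file_handler)
    listener.start()

    processes = [multiprocessing.Process(target=worker, args=(log_queue,)) for _ in range(2)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

    listener.stop()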
3 changes: 2 additions & 1 deletion kolibri/core/tasks/main.py
@@ -25,10 +25,11 @@ def __job_storage():
""" :type: Storage """


def initialize_workers():
def initialize_workers(log_queue=None):
logger.info("Starting async task workers.")
return Worker(
connection=connection,
regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"],
high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"],
log_queue=log_queue,
)
10 changes: 5 additions & 5 deletions kolibri/core/tasks/test/taskrunner/conftest.py
@@ -1,6 +1,6 @@
import pytest

from kolibri.core.tasks import compat
from kolibri.utils import multiprocessing_compat


@pytest.fixture(params=[False, True], autouse=True)
@@ -24,7 +24,7 @@ class local(object):
from threading import local # noqa
from concurrent.futures import ThreadPoolExecutor as PoolExecutor # noqa

monkeypatch.setattr(compat, "Thread", Thread)
monkeypatch.setattr(compat, "Event", Event)
monkeypatch.setattr(compat, "local", local)
monkeypatch.setattr(compat, "PoolExecutor", PoolExecutor)
monkeypatch.setattr(multiprocessing_compat, "Thread", Thread)
monkeypatch.setattr(multiprocessing_compat, "Event", Event)
monkeypatch.setattr(multiprocessing_compat, "local", local)
monkeypatch.setattr(multiprocessing_compat, "PoolExecutor", PoolExecutor)
2 changes: 1 addition & 1 deletion kolibri/core/tasks/test/taskrunner/test_job_running.py
@@ -3,7 +3,6 @@

import pytest

from kolibri.core.tasks.compat import Event
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
@@ -13,6 +12,7 @@
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.utils import import_path_to_callable
from kolibri.core.tasks.worker import Worker
from kolibri.utils.multiprocessing_compat import Event


@pytest.fixture
6 changes: 3 additions & 3 deletions kolibri/core/tasks/utils.py
@@ -15,9 +15,9 @@

from kolibri.core.sqlite.utils import check_sqlite_integrity
from kolibri.core.sqlite.utils import repair_sqlite_db
from kolibri.core.tasks import compat
from kolibri.core.tasks.exceptions import UserCancelledError
from kolibri.utils import conf
from kolibri.utils import multiprocessing_compat
from kolibri.utils.options import FD_PER_THREAD
from kolibri.utils.system import get_fd_limit

@@ -27,7 +27,7 @@
# An object on which to store data about the current job
# So far the only use is to track the job, but other metadata
# could be added.
current_state_tracker = SimpleLazyObject(compat.local)
current_state_tracker = SimpleLazyObject(multiprocessing_compat.local)


def get_current_job():
@@ -76,7 +76,7 @@ def __init__(self, func, thread_name, wait_between_runs=1, *args, **kwargs):
:param thread_name: the name of the thread to use during logging and debugging
:param wait_between_runs: how many seconds to wait in between func calls.
"""
self.shutdown_event = compat.Event()
self.shutdown_event = multiprocessing_compat.Event()
self.thread_name = thread_name
self.thread_id = uuid.uuid4().hex
self.logger = logging.getLogger(
17 changes: 13 additions & 4 deletions kolibri/core/tasks/worker.py
@@ -3,17 +3,22 @@

from django.db import connection as django_connection

from kolibri.core.tasks.compat import PoolExecutor
from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.storage import Storage
from kolibri.core.tasks.utils import db_connection
from kolibri.core.tasks.utils import InfiniteLoopThread
from kolibri.utils.multiprocessing_compat import PoolExecutor

logger = logging.getLogger(__name__)


def execute_job(
job_id, worker_host=None, worker_process=None, worker_thread=None, worker_extra=None
job_id,
worker_host=None,
worker_process=None,
worker_thread=None,
worker_extra=None,
log_queue=None,
):
"""
Call the function stored in the job.func.
@@ -36,7 +41,7 @@ def execute_job(
django_connection.close()


def execute_job_with_python_worker(job_id):
def execute_job_with_python_worker(job_id, log_queue=None):
"""
Call execute_job but additionally with the current host, process and thread information taken
directly from python internals.
@@ -50,11 +55,12 @@ def execute_job_with_python_worker(job_id):
worker_host=socket.gethostname(),
worker_process=str(os.getpid()),
worker_thread=str(threading.get_ident()),
log_queue=log_queue,
)


class Worker(object):
def __init__(self, connection, regular_workers=2, high_workers=1):
def __init__(self, connection, regular_workers=2, high_workers=1, log_queue=None):
# Internally, we use concurrent.future.Future to run and track
# job executions. We need to keep track of which future maps to which
# job they were made from, and we use the job_future_mapping dict to do
@@ -74,6 +80,8 @@ def __init__(self, connection, regular_workers=2, high_workers=1):
# High workers run only 'high' priority jobs.
self.regular_workers = regular_workers
self.max_workers = regular_workers + high_workers
# Track any log queue that is passed in
self.log_queue = log_queue

self.workers = self.start_workers()
self.job_checker = self.start_job_checker()
@@ -191,6 +199,7 @@ def start_next_job(self, job):
future = self.workers.submit(
execute_job_with_python_worker,
job_id=job.job_id,
log_queue=self.log_queue,
)

# Check if the job ID already exists in the future_job_mapping dictionary
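The changes above thread log_queue from Worker.__init__ through start_next_job and execute_job_with_python_worker into execute_job, whose body is collapsed in this view. Presumably the collapsed portion hands the queue to the new setup_worker_logging helper before running the job function; the sketch below shows that assumed hookup with a hypothetical stand-in function, since the actual call site is not visible in this diff:

# Hypothetical sketch only: run_job_in_worker stands in for the collapsed
# part of execute_job, and the setup_worker_logging call is an assumption.
from kolibri.utils.logger import setup_worker_logging


def run_job_in_worker(job_id, log_queue=None):
    if log_queue is not None:
        # Re-point this worker process's loggers at the shared queue so its
        # output flows back to the listener in the main process rather than
        # opening the rotating log files directly.
        setup_worker_logging(log_queue)
    # ... fetch the job from storage and call job.func(), as execute_job does ...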
153 changes: 153 additions & 0 deletions kolibri/utils/logger.py
@@ -1,6 +1,11 @@
import logging
import os
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging.handlers import TimedRotatingFileHandler
from typing import Dict
from typing import List
from typing import Optional


GET_FILES_TO_DELETE = "getFilesToDelete"
@@ -18,6 +23,62 @@
}


# Type definition for mapping of logger names to their handlers
LoggerHandlerMap = Dict[str, List[logging.Handler]]


class LoggerAwareQueueHandler(QueueHandler):
"""
A QueueHandler that adds the logger name to the record so that it
can be properly handled in the listener.
"""

def __init__(self, queue, logger_name: str):
super().__init__(queue)
self.logger_name = logger_name

def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
"""Prepare a record for queuing, ensuring it can be pickled if needed"""
# Check at runtime whether multiprocessing is in use, i.e. whether records need pickle safety
from kolibri.utils.multiprocessing_compat import use_multiprocessing

# Only do pickle-safety preparation for logging if we're using multiprocessing
if use_multiprocessing():
if hasattr(record, "exc_info") and record.exc_info:
record.exc_text = (
logging.getLogger()
.handlers[0]
.formatter.formatException(record.exc_info)
)
record.exc_info = None
if hasattr(record, "args"):
record.args = tuple(str(arg) for arg in record.args)

record = super().prepare(record)
record._logger_name = self.logger_name
return record


class LoggerAwareQueueListener(QueueListener):
"""A QueueListener that routes records to their original logger's handlers"""

def __init__(self, queue, logger_handlers: LoggerHandlerMap):
super().__init__(queue)
self.logger_handlers = logger_handlers

def handle(self, record: logging.LogRecord) -> None:
"""Handle a record by sending it to the original logger's handlers"""
logger_name = getattr(record, "_logger_name", "")
handlers = self.logger_handlers.get(logger_name, [])

for handler in handlers:
try:
if record.levelno >= handler.level:
handler.handle(record)
except Exception:
handler.handleError(record)


class EncodingStreamHandler(logging.StreamHandler):
"""
A custom stream handler that encodes the log message to the specified encoding.
@@ -312,3 +373,95 @@ def get_logging_config(LOG_ROOT, debug=False, debug_database=False):
admin_logger_handlers = admin_logger.setdefault("handlers", [])
admin_logger_handlers.append("mail_admins")
return config


# Track if queue logging has been initialized for the current process
_queue_logging_initialized_for_process = False


class QueueLoggingInitializedError(RuntimeError):
pass


def _replace_handlers_with_queue(queue) -> LoggerHandlerMap:
"""
Internal function to replace all logger handlers with queue handlers.
Returns a dict of the original logger handlers for the listener to consume.
"""
global _queue_logging_initialized_for_process

if _queue_logging_initialized_for_process:
raise QueueLoggingInitializedError(
"Queue logging has already been initialized for this process"
)

logger_handlers: LoggerHandlerMap = {}

# Set up logging for all loggers
for logger_name in list(logging.root.manager.loggerDict.keys()) + [""]:
logger = logging.getLogger(logger_name)
if logger.handlers:
# Store the original handlers
logger_handlers[logger_name] = logger.handlers[:]

# Remove existing handlers
for handler in logger.handlers[:]:
logger.removeHandler(handler)

# Add queue handler
queue_handler = LoggerAwareQueueHandler(queue, logger_name)
logger.addHandler(queue_handler)

_queue_logging_initialized_for_process = True

return logger_handlers


def setup_queue_logging() -> LoggerAwareQueueListener:
"""
Sets up queue-based logging for the main process.
Returns the queue listener which can be used to stop logging and clean up.
"""
# Import Queue at function scope to avoid import order issues
from kolibri.utils.multiprocessing_compat import Queue

# Create queue using Kolibri's compatibility Queue
log_queue = Queue()

# Replace handlers and get original configurations
logger_handlers = _replace_handlers_with_queue(log_queue)

# Create and start listener with collected handlers
listener = LoggerAwareQueueListener(log_queue, logger_handlers)
listener.start()

return listener


def setup_worker_logging(queue) -> None:
"""Sets up logging in a worker to use the queue if not already configured."""
try:
_replace_handlers_with_queue(queue)
except QueueLoggingInitializedError:
pass


def cleanup_queue_logging(listener: Optional[LoggerAwareQueueListener]) -> None:
"""
Stops the queue listener and cleans up multiprocessing resources if needed.
"""
if not listener:
return

# Stop the listener to ensure pending logs are processed
listener.stop()

# Clean up queue if it's a multiprocessing queue
from kolibri.utils.multiprocessing_compat import use_multiprocessing

if use_multiprocessing():
try:
listener.queue.close()
listener.queue.join_thread()
except (ValueError, AttributeError):
pass
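Taken together, the new helpers in kolibri/utils/logger.py form a small lifecycle: setup_queue_logging() swaps every configured logger's handlers for LoggerAwareQueueHandlers and starts a LoggerAwareQueueListener in the main process, setup_worker_logging(queue) does the same swap in a worker process against the shared queue, and cleanup_queue_logging(listener) flushes and stops the listener on shutdown. A rough usage sketch under those definitions (the process boundaries and bus wiring are simplified here):

from kolibri.utils.logger import (
    cleanup_queue_logging,
    setup_queue_logging,
    setup_worker_logging,
)


def main_process():
    # Swap every configured logger's handlers for queue handlers and start a
    # listener thread that forwards queued records to the original handlers.
    listener = setup_queue_logging()
    # listener.queue is what gets shared with workers, e.g. via the process bus.
    return listener


def worker_process(log_queue):
    # In a separate process: point this process's loggers at the shared queue.
    # A repeat call is harmless; QueueLoggingInitializedError is swallowed.
    setup_worker_logging(log_queue)


def shutdown(listener):
    # Flush pending records, stop the listener thread and, when multiprocessing
    # is in use, close the queue and join its feeder thread.
    cleanup_queue_logging(listener)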
kolibri/utils/multiprocessing_compat.py
@@ -1,6 +1,7 @@
import multiprocessing
import threading
from concurrent import futures
from queue import Queue as ThreadingQueue

from kolibri.utils.conf import OPTIONS

@@ -29,6 +30,12 @@ def Event(*args, **kwargs):
return threading.Event(*args, **kwargs)


def Queue(*args, **kwargs):
if use_multiprocessing():
return multiprocessing.Queue(*args, **kwargs)
return ThreadingQueue(*args, **kwargs)


class _Local(object):
"""
Dummy class to use for a local object for multiprocessing
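The new Queue() helper mirrors the existing Thread/Event/local shims in this module: it returns a multiprocessing.Queue when worker processes are enabled and a plain queue.Queue otherwise, so callers such as setup_queue_logging never need to know which mode is active. A small illustrative check of that behaviour (assuming the helper behaves as defined above):

from kolibri.utils.multiprocessing_compat import Queue, use_multiprocessing

# The same call site works in both modes; only the backing type changes.
log_queue = Queue()
if use_multiprocessing():
    # Records placed on a multiprocessing.Queue must be picklable, which is
    # why LoggerAwareQueueHandler.prepare() pre-formats exceptions and
    # stringifies record args before queuing.
    print("process-safe queue:", type(log_queue))
else:
    print("thread-safe queue:", type(log_queue))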
21 changes: 20 additions & 1 deletion kolibri/utils/server.py
@@ -32,6 +32,8 @@
from .system import pid_exists
from kolibri.utils import conf
from kolibri.utils.android import on_android
from kolibri.utils.logger import cleanup_queue_logging
from kolibri.utils.logger import setup_queue_logging

try:
FileNotFoundError
@@ -274,7 +276,9 @@ def START(self):
from kolibri.core.tasks.main import initialize_workers

# Initialize the iceqube engine to handle queued tasks
self.worker = initialize_workers()
# Add a loose coupling between our LogPlugin and the ServicesPlugin
# by getting any log_queue that might be present on the bus
self.worker = initialize_workers(log_queue=getattr(self.bus, "log_queue", None))

def STOP(self):
if self.worker is not None:
@@ -534,9 +538,24 @@ def ENTER(self):


class LogPlugin(SimplePlugin):
def ENTER(self):
# Do this setup during INITIAL, so we wait
# until after any WSGI application has been
# imported, as that will trigger Django setup
# which will reinitialize logging, and override
# what we are doing here.
self.queue_listener = setup_queue_logging()
self.bus.log_queue = self.queue_listener.queue

def log(self, msg, level):
logger.log(level, msg)

def EXITED(self):
cleanup_queue_logging(self.queue_listener)

# Set this to priority 100 so that it gets executed after any other EXITED handlers.
EXITED.priority = 100


class SignalHandler(BaseSignalHandler):
def __init__(self, bus):
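The net effect on the process bus lifecycle: LogPlugin.ENTER starts the queue listener and publishes its queue as bus.log_queue; ServicesPlugin.START fetches it with getattr (keeping the coupling optional) and hands it to initialize_workers; and LogPlugin.EXITED, pinned to priority 100 so it fires after the other EXITED subscribers, stops the listener last so shutdown messages from those subscribers are still flushed. A condensed sketch of that ordering, with the bus and plugins reduced to illustrative stand-ins:

from kolibri.utils.logger import cleanup_queue_logging, setup_queue_logging


class Bus(object):
    """Stand-in for the process bus; the real bus drives these transitions."""


def enter(bus):
    # LogPlugin.ENTER: start the listener and publish its queue on the bus.
    bus.queue_listener = setup_queue_logging()
    bus.log_queue = bus.queue_listener.queue


def start(bus):
    # ServicesPlugin.START: workers receive the queue if one was published.
    from kolibri.core.tasks.main import initialize_workers
    bus.worker = initialize_workers(log_queue=getattr(bus, "log_queue", None))


def exited(bus):
    # LogPlugin.EXITED (priority 100): runs after other EXITED handlers so
    # their final log lines are drained before the listener stops.
    cleanup_queue_logging(getattr(bus, "queue_listener", None))


if __name__ == "__main__":
    bus = Bus()
    enter(bus)   # listener running, queue published
    start(bus)   # task workers wired to the shared queue
    exited(bus)  # flush and stop queue logging last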