diff --git a/backend/danswer/background/celery/apps/app_base.py b/backend/danswer/background/celery/apps/app_base.py index f1f375886a0..60190e88303 100644 --- a/backend/danswer/background/celery/apps/app_base.py +++ b/backend/danswer/background/celery/apps/app_base.py @@ -1,7 +1,6 @@ import logging import multiprocessing import time -from contextvars import ContextVar from typing import Any import requests @@ -37,6 +36,7 @@ from shared_configs.configs import MULTI_TENANT from shared_configs.configs import SENTRY_DSN from shared_configs.configs import TENANT_ID_PREFIX +from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR logger = setup_logger() @@ -341,12 +341,16 @@ def on_setup_logging( colorize: bool, **kwargs: Any, ) -> None: - # Clear existing handlers to avoid duplicate logs + # TODO: could unhardcode format and colorize and accept these as options from + # celery's config + root_logger = logging.getLogger() root_logger.handlers = [] # Define the log format - log_format = "%(levelname)-8s %(asctime)s %(name)-40s %(filename)15s:%(lineno)-4d: %(message)s" + log_format = ( + "%(levelname)-8s %(asctime)s %(filename)15s:%(lineno)-4d: %(name)s %(message)s" + ) # Set up the root handler root_handler = logging.StreamHandler() @@ -372,7 +376,7 @@ def on_setup_logging( task_logger.handlers = [] task_handler = logging.StreamHandler() - task_handler.addFilter(TenantContextFilter()) # Apply filter here + task_handler.addFilter(TenantContextFilter()) task_formatter = CeleryTaskColoredFormatter( log_format, datefmt="%m/%d/%Y %I:%M:%S %p", @@ -382,7 +386,7 @@ def on_setup_logging( if logfile: task_file_handler = logging.FileHandler(logfile) - task_file_handler.addFilter(TenantContextFilter()) # Apply filter here + task_file_handler.addFilter(TenantContextFilter()) task_file_formatter = CeleryTaskPlainFormatter( log_format, datefmt="%m/%d/%Y %I:%M:%S %p", @@ -404,18 +408,18 @@ class TenantContextFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: if not MULTI_TENANT: + record.name = "" return True - tenant_id = current_tenant.get() + tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get() if tenant_id: - record.name = f"[tenant:{tenant_id[len(TENANT_ID_PREFIX):len(TENANT_ID_PREFIX)+5]}] {record.name}" + tenant_id = tenant_id.split(TENANT_ID_PREFIX)[-1][:5] + record.name = f"[t:{tenant_id}]" + else: + record.name = "" return True -# Global context variable for tenant_id -current_tenant = ContextVar("current_tenant", default=None) - - @task_prerun.connect def set_tenant_id( sender: Any | None = None, @@ -427,7 +431,7 @@ def set_tenant_id( ) -> None: """Signal handler to set tenant ID in context var before task starts.""" tenant_id = kwargs.get("tenant_id") if kwargs else None - current_tenant.set(tenant_id) + CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id) @task_postrun.connect @@ -440,4 +444,4 @@ def reset_tenant_id( **other_kwargs: Any, ) -> None: """Signal handler to reset tenant ID in context var after task ends.""" - current_tenant.set(None) + CURRENT_TENANT_ID_CONTEXTVAR.set(None) diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py index 3529498f86c..3a9d36bca2b 100644 --- a/backend/danswer/background/celery/apps/beat.py +++ b/backend/danswer/background/celery/apps/beat.py @@ -51,9 +51,9 @@ def tick(self) -> float: def _update_tenant_tasks(self) -> None: logger.info("Starting task update process") try: - logger.info("Fetching all tenant IDs") + logger.info("Fetching all IDs") tenant_ids = get_all_tenant_ids() - logger.info(f"Found {len(tenant_ids)} items") + logger.info(f"Found {len(tenant_ids)} IDs") logger.info("Fetching tasks to schedule") tasks_to_schedule = fetch_versioned_implementation(