diff --git a/backend/danswer/background/celery/apps/app_base.py b/backend/danswer/background/celery/apps/app_base.py index 7d3e7644ed9..2c72a2f7103 100644 --- a/backend/danswer/background/celery/apps/app_base.py +++ b/backend/danswer/background/celery/apps/app_base.py @@ -8,6 +8,8 @@ from celery import Task from celery.app import trace from celery.exceptions import WorkerShutdown +from celery.signals import task_postrun +from celery.signals import task_prerun from celery.states import READY_STATES from celery.utils.log import get_task_logger from celery.worker import strategy # type: ignore @@ -33,7 +35,8 @@ from danswer.utils.logger import setup_logger from shared_configs.configs import MULTI_TENANT from shared_configs.configs import SENTRY_DSN - +from shared_configs.configs import TENANT_ID_PREFIX +from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR logger = setup_logger() @@ -54,8 +57,8 @@ def on_task_prerun( sender: Any | None = None, task_id: str | None = None, task: Task | None = None, - args: tuple | None = None, - kwargs: dict | None = None, + args: tuple[Any, ...] | None = None, + kwargs: dict[str, Any] | None = None, **kwds: Any, ) -> None: pass @@ -332,26 +335,36 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None: def on_setup_logging( - loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any + loglevel: int, + logfile: str | None, + format: str, + colorize: bool, + **kwargs: Any, ) -> None: # TODO: could unhardcode format and colorize and accept these as options from # celery's config - # reformats the root logger root_logger = logging.getLogger() + root_logger.handlers = [] - root_handler = logging.StreamHandler() # Set up a handler for the root logger + # Define the log format + log_format = ( + "%(levelname)-8s %(asctime)s %(filename)15s:%(lineno)-4d: %(name)s %(message)s" + ) + + # Set up the root handler + root_handler = logging.StreamHandler() root_formatter = ColoredFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) root_handler.setFormatter(root_formatter) - root_logger.addHandler(root_handler) # Apply the handler to the root logger + root_logger.addHandler(root_handler) if logfile: root_file_handler = logging.FileHandler(logfile) root_file_formatter = PlainFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) root_file_handler.setFormatter(root_file_formatter) @@ -359,19 +372,23 @@ def on_setup_logging( root_logger.setLevel(loglevel) - # reformats celery's task logger + # Configure the task logger + task_logger.handlers = [] + + task_handler = logging.StreamHandler() + task_handler.addFilter(TenantContextFilter()) task_formatter = CeleryTaskColoredFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) - task_handler = logging.StreamHandler() # Set up a handler for the task logger task_handler.setFormatter(task_formatter) - task_logger.addHandler(task_handler) # Apply the handler to the task logger + task_logger.addHandler(task_handler) if logfile: task_file_handler = logging.FileHandler(logfile) + task_file_handler.addFilter(TenantContextFilter()) task_file_formatter = CeleryTaskPlainFormatter( - "%(asctime)s %(filename)30s %(lineno)4s: %(message)s", + log_format, datefmt="%m/%d/%Y %I:%M:%S %p", ) task_file_handler.setFormatter(task_file_formatter) @@ -380,10 +397,51 @@ def on_setup_logging( task_logger.setLevel(loglevel) task_logger.propagate = False - # hide celery task received spam - # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received" + # Hide celery task received and succeeded/failed messages strategy.logger.setLevel(logging.WARNING) - - # hide celery task succeeded/failed spam - # e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None" trace.logger.setLevel(logging.WARNING) + + +class TenantContextFilter(logging.Filter): + + """Logging filter to inject tenant ID into the logger's name.""" + + def filter(self, record: logging.LogRecord) -> bool: + if not MULTI_TENANT: + record.name = "" + return True + + tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get() + if tenant_id: + tenant_id = tenant_id.split(TENANT_ID_PREFIX)[-1][:5] + record.name = f"[t:{tenant_id}]" + else: + record.name = "" + return True + + +@task_prerun.connect +def set_tenant_id( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple[Any, ...] | None = None, + kwargs: dict[str, Any] | None = None, + **other_kwargs: Any, +) -> None: + """Signal handler to set tenant ID in context var before task starts.""" + tenant_id = kwargs.get("tenant_id") if kwargs else None + CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id) + + +@task_postrun.connect +def reset_tenant_id( + sender: Any | None = None, + task_id: str | None = None, + task: Task | None = None, + args: tuple[Any, ...] | None = None, + kwargs: dict[str, Any] | None = None, + **other_kwargs: Any, +) -> None: + """Signal handler to reset tenant ID in context var after task ends.""" + CURRENT_TENANT_ID_CONTEXTVAR.reset() diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py index 979cf07cbb1..3a9d36bca2b 100644 --- a/backend/danswer/background/celery/apps/beat.py +++ b/backend/danswer/background/celery/apps/beat.py @@ -42,18 +42,18 @@ def tick(self) -> float: self._last_reload is None or (now - self._last_reload) > self._reload_interval ): - logger.info("Reload interval reached, initiating tenant task update") + logger.info("Reload interval reached, initiating task update") self._update_tenant_tasks() self._last_reload = now - logger.info("Tenant task update completed, reset reload timer") + logger.info("Task update completed, reset reload timer") return retval def _update_tenant_tasks(self) -> None: - logger.info("Starting tenant task update process") + logger.info("Starting task update process") try: - logger.info("Fetching all tenant IDs") + logger.info("Fetching all IDs") tenant_ids = get_all_tenant_ids() - logger.info(f"Found {len(tenant_ids)} tenants") + logger.info(f"Found {len(tenant_ids)} IDs") logger.info("Fetching tasks to schedule") tasks_to_schedule = fetch_versioned_implementation( @@ -68,11 +68,11 @@ def _update_tenant_tasks(self) -> None: for task_name, _ in current_schedule: if "-" in task_name: existing_tenants.add(task_name.split("-")[-1]) - logger.info(f"Found {len(existing_tenants)} existing tenants in schedule") + logger.info(f"Found {len(existing_tenants)} existing items in schedule") for tenant_id in tenant_ids: if tenant_id not in existing_tenants: - logger.info(f"Processing new tenant: {tenant_id}") + logger.info(f"Processing new item: {tenant_id}") for task in tasks_to_schedule(): task_name = f"{task['name']}-{tenant_id}" @@ -118,11 +118,10 @@ def _update_tenant_tasks(self) -> None: logger.info("Schedule update completed successfully") else: logger.info("Schedule is up to date, no changes needed") - - except (AttributeError, KeyError): - logger.exception("Failed to process task configuration") - except Exception: - logger.exception("Unexpected error updating tenant tasks") + except (AttributeError, KeyError) as e: + logger.exception(f"Failed to process task configuration: {str(e)}") + except Exception as e: + logger.exception(f"Unexpected error updating tasks: {str(e)}") def _should_update_schedule( self, current_schedule: dict, new_schedule: dict diff --git a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py index 360481015bb..125efb2685c 100644 --- a/backend/danswer/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/danswer/background/celery/tasks/connector_deletion/tasks.py @@ -76,7 +76,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during connector deletion check") finally: if lock_beat.owned(): lock_beat.release() @@ -132,14 +132,14 @@ def try_generate_document_cc_pair_cleanup_tasks( redis_connector_index = redis_connector.new_index(search_settings.id) if redis_connector_index.fenced: raise TaskDependencyError( - f"Connector deletion - Delayed (indexing in progress): " + "Connector deletion - Delayed (indexing in progress): " f"cc_pair={cc_pair_id} " f"search_settings={search_settings.id}" ) if redis_connector.prune.fenced: raise TaskDependencyError( - f"Connector deletion - Delayed (pruning in progress): " + "Connector deletion - Delayed (pruning in progress): " f"cc_pair={cc_pair_id}" ) @@ -170,7 +170,7 @@ def try_generate_document_cc_pair_cleanup_tasks( # return 0 task_logger.info( - f"RedisConnectorDeletion.generate_tasks finished. " + "RedisConnectorDeletion.generate_tasks finished. " f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" ) diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 666defd9586..dbe8dd00cd3 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -185,7 +185,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during indexing check") finally: if lock_beat.owned(): lock_beat.release() @@ -371,7 +371,6 @@ def try_creating_indexing_task( redis_connector_index.set_fence(payload) task_logger.exception( f"Unexpected exception: " - f"tenant={tenant_id} " f"cc_pair={cc_pair.id} " f"search_settings={search_settings.id}" ) @@ -393,7 +392,6 @@ def connector_indexing_proxy_task( """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" task_logger.info( f"Indexing proxy - starting: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -412,7 +410,6 @@ def connector_indexing_proxy_task( if not job: task_logger.info( f"Indexing proxy - spawn failed: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -420,7 +417,6 @@ def connector_indexing_proxy_task( task_logger.info( f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -445,7 +441,6 @@ def connector_indexing_proxy_task( task_logger.error( f"Indexing proxy - spawned task exceptioned: " f"attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id} " f"error={job.exception()}" @@ -456,7 +451,6 @@ def connector_indexing_proxy_task( task_logger.info( f"Indexing proxy - finished: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) @@ -644,7 +638,6 @@ def connector_indexing_task( logger.info( f"Indexing spawned task finished: attempt={index_attempt_id} " - f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) diff --git a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py index af80e6b886c..7bba5ff6e76 100644 --- a/backend/danswer/background/celery/tasks/pruning/tasks.py +++ b/backend/danswer/background/celery/tasks/pruning/tasks.py @@ -84,7 +84,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during pruning check") finally: if lock_beat.owned(): lock_beat.release() @@ -294,7 +294,7 @@ def connector_pruning_generator_task( doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids) task_logger.info( - f"Pruning set collected: " + "Pruning set collected: " f"cc_pair={cc_pair_id} " f"docs_to_remove={len(doc_ids_to_remove)} " f"doc_source={cc_pair.connector.source}" @@ -310,7 +310,7 @@ def connector_pruning_generator_task( return None task_logger.info( - f"RedisConnector.prune.generate_tasks finished. " + "RedisConnector.prune.generate_tasks finished. " f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}" ) diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py index 116e7e1ff7e..bf6622577bf 100644 --- a/backend/danswer/background/celery/tasks/shared/tasks.py +++ b/backend/danswer/background/celery/tasks/shared/tasks.py @@ -59,7 +59,7 @@ def document_by_cc_pair_cleanup_task( connector / credential pair from the access list (6) delete all relevant entries from postgres """ - task_logger.info(f"tenant={tenant_id} doc={document_id}") + task_logger.info(f"doc={document_id}") try: with get_session_with_tenant(tenant_id) as db_session: @@ -128,16 +128,13 @@ def document_by_cc_pair_cleanup_task( db_session.commit() task_logger.info( - f"tenant={tenant_id} " f"doc={document_id} " f"action={action} " f"refcount={count} " f"chunks={chunks_affected}" ) except SoftTimeLimitExceeded: - task_logger.info( - f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" - ) + task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}") return False except Exception as ex: if isinstance(ex, RetryError): @@ -154,15 +151,12 @@ def document_by_cc_pair_cleanup_task( if e.response.status_code == HTTPStatus.BAD_REQUEST: task_logger.exception( f"Non-retryable HTTPStatusError: " - f"tenant={tenant_id} " f"doc={document_id} " f"status={e.response.status_code}" ) return False - task_logger.exception( - f"Unexpected exception: tenant={tenant_id} doc={document_id}" - ) + task_logger.exception(f"Unexpected exception: doc={document_id}") if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES: # Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64 @@ -173,7 +167,7 @@ def document_by_cc_pair_cleanup_task( # eventually gets fixed out of band via stale document reconciliation task_logger.info( f"Max retries reached. Marking doc as dirty for reconciliation: " - f"tenant={tenant_id} doc={document_id}" + f"doc={document_id}" ) with get_session_with_tenant(tenant_id): mark_document_as_modified(document_id, db_session) diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index b01a0eac815..e151660d024 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -152,7 +152,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> None: "Soft time limit exceeded, task is being terminated gracefully." ) except Exception: - task_logger.exception(f"Unexpected exception: tenant={tenant_id}") + task_logger.exception("Unexpected exception during vespa metadata sync") finally: if lock_beat.owned(): lock_beat.release() @@ -802,13 +802,9 @@ def vespa_metadata_sync_task( # the sync might repeat again later mark_document_as_synced(document_id, db_session) - task_logger.info( - f"tenant={tenant_id} doc={document_id} action=sync chunks={chunks_affected}" - ) + task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}") except SoftTimeLimitExceeded: - task_logger.info( - f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}" - ) + task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}") except Exception as ex: if isinstance(ex, RetryError): task_logger.warning(f"Retry failed: {ex.last_attempt.attempt_number}") @@ -824,14 +820,13 @@ def vespa_metadata_sync_task( if e.response.status_code == HTTPStatus.BAD_REQUEST: task_logger.exception( f"Non-retryable HTTPStatusError: " - f"tenant={tenant_id} " f"doc={document_id} " f"status={e.response.status_code}" ) return False task_logger.exception( - f"Unexpected exception: tenant={tenant_id} doc={document_id}" + f"Unexpected exception during vespa metadata sync: doc={document_id}" ) # Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64