remove tenant id logs #3063

Open · wants to merge 5 commits into base: main
96 changes: 77 additions & 19 deletions backend/danswer/background/celery/apps/app_base.py
@@ -8,6 +8,8 @@
from celery import Task
from celery.app import trace
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
from celery.signals import task_prerun
from celery.states import READY_STATES
from celery.utils.log import get_task_logger
from celery.worker import strategy # type: ignore
@@ -33,7 +35,8 @@
from danswer.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import SENTRY_DSN

from shared_configs.configs import TENANT_ID_PREFIX
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR

logger = setup_logger()

@@ -54,8 +57,8 @@ def on_task_prerun(
sender: Any | None = None,
task_id: str | None = None,
task: Task | None = None,
args: tuple | None = None,
kwargs: dict | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
**kwds: Any,
) -> None:
pass
@@ -332,46 +335,60 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:


def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any
loglevel: int,
logfile: str | None,
format: str,
colorize: bool,
**kwargs: Any,
) -> None:
# TODO: could unhardcode format and colorize and accept these as options from
# celery's config

# reformats the root logger
root_logger = logging.getLogger()
root_logger.handlers = []

root_handler = logging.StreamHandler() # Set up a handler for the root logger
# Define the log format
log_format = (
"%(levelname)-8s %(asctime)s %(filename)15s:%(lineno)-4d: %(name)s %(message)s"
)

# Set up the root handler
root_handler = logging.StreamHandler()
root_formatter = ColoredFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
root_handler.setFormatter(root_formatter)
root_logger.addHandler(root_handler) # Apply the handler to the root logger
root_logger.addHandler(root_handler)

if logfile:
root_file_handler = logging.FileHandler(logfile)
root_file_formatter = PlainFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
root_file_handler.setFormatter(root_file_formatter)
root_logger.addHandler(root_file_handler)

root_logger.setLevel(loglevel)

# reformats celery's task logger
# Configure the task logger
task_logger.handlers = []

task_handler = logging.StreamHandler()
task_handler.addFilter(TenantContextFilter())
task_formatter = CeleryTaskColoredFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
task_handler = logging.StreamHandler() # Set up a handler for the task logger
task_handler.setFormatter(task_formatter)
task_logger.addHandler(task_handler) # Apply the handler to the task logger
task_logger.addHandler(task_handler)

if logfile:
task_file_handler = logging.FileHandler(logfile)
task_file_handler.addFilter(TenantContextFilter())
task_file_formatter = CeleryTaskPlainFormatter(
"%(asctime)s %(filename)30s %(lineno)4s: %(message)s",
log_format,
datefmt="%m/%d/%Y %I:%M:%S %p",
)
task_file_handler.setFormatter(task_file_formatter)
@@ -380,10 +397,51 @@ def on_setup_logging(
task_logger.setLevel(loglevel)
task_logger.propagate = False

# hide celery task received spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] received"
# Hide celery task received and succeeded/failed messages
strategy.logger.setLevel(logging.WARNING)

# hide celery task succeeded/failed spam
# e.g. "Task check_for_pruning[a1e96171-0ba8-4e00-887b-9fbf7442eab3] succeeded in 0.03137450001668185s: None"
trace.logger.setLevel(logging.WARNING)


class TenantContextFilter(logging.Filter):
    """Logging filter to inject tenant ID into the logger's name."""

def filter(self, record: logging.LogRecord) -> bool:
if not MULTI_TENANT:
record.name = ""
return True

tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
if tenant_id:
tenant_id = tenant_id.split(TENANT_ID_PREFIX)[-1][:5]
record.name = f"[t:{tenant_id}]"
else:
record.name = ""
return True


@task_prerun.connect
def set_tenant_id(
sender: Any | None = None,
task_id: str | None = None,
task: Task | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
**other_kwargs: Any,
) -> None:
"""Signal handler to set tenant ID in context var before task starts."""
tenant_id = kwargs.get("tenant_id") if kwargs else None
CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)


@task_postrun.connect
def reset_tenant_id(
sender: Any | None = None,
task_id: str | None = None,
task: Task | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
**other_kwargs: Any,
) -> None:
"""Signal handler to reset tenant ID in context var after task ends."""
CURRENT_TENANT_ID_CONTEXTVAR.reset()
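For readers unfamiliar with the pattern introduced above: the new `TenantContextFilter` and the `task_prerun`/`task_postrun` handlers cooperate through a `contextvars.ContextVar` so that every log record emitted while a task runs carries a shortened tenant ID in its logger name. A minimal, self-contained sketch of that mechanism (standalone Python, using a hypothetical `CURRENT_TENANT_ID` variable rather than the project's `CURRENT_TENANT_ID_CONTEXTVAR`, and slicing the raw ID instead of splitting on `TENANT_ID_PREFIX`) looks roughly like this:

```python
import contextvars
import logging

# Hypothetical stand-in for shared_configs.contextvars.CURRENT_TENANT_ID_CONTEXTVAR.
CURRENT_TENANT_ID: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "current_tenant_id", default=None
)


class TenantContextFilter(logging.Filter):
    """Inject a shortened tenant ID into the record's logger name, as the PR's filter does."""

    def filter(self, record: logging.LogRecord) -> bool:
        tenant_id = CURRENT_TENANT_ID.get()
        record.name = f"[t:{tenant_id[:5]}]" if tenant_id else ""
        return True


handler = logging.StreamHandler()
handler.addFilter(TenantContextFilter())
handler.setFormatter(logging.Formatter("%(levelname)-8s %(name)s %(message)s"))

demo_logger = logging.getLogger("demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False

# Roughly what set_tenant_id / reset_tenant_id do around a task body.
token = CURRENT_TENANT_ID.set("tenant_abc123")
demo_logger.info("processing document")   # e.g. "INFO     [t:tenan] processing document"
CURRENT_TENANT_ID.reset(token)
demo_logger.info("outside task context")  # e.g. "INFO      outside task context"
```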
23 changes: 11 additions & 12 deletions backend/danswer/background/celery/apps/beat.py
@@ -42,18 +42,18 @@ def tick(self) -> float:
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
logger.info("Reload interval reached, initiating tenant task update")
logger.info("Reload interval reached, initiating task update")
self._update_tenant_tasks()
self._last_reload = now
logger.info("Tenant task update completed, reset reload timer")
logger.info("Task update completed, reset reload timer")
return retval

def _update_tenant_tasks(self) -> None:
logger.info("Starting tenant task update process")
logger.info("Starting task update process")
try:
logger.info("Fetching all tenant IDs")
logger.info("Fetching all IDs")
tenant_ids = get_all_tenant_ids()
logger.info(f"Found {len(tenant_ids)} tenants")
logger.info(f"Found {len(tenant_ids)} IDs")

logger.info("Fetching tasks to schedule")
tasks_to_schedule = fetch_versioned_implementation(
@@ -68,11 +68,11 @@ def _update_tenant_tasks(self) -> None:
for task_name, _ in current_schedule:
if "-" in task_name:
existing_tenants.add(task_name.split("-")[-1])
logger.info(f"Found {len(existing_tenants)} existing tenants in schedule")
logger.info(f"Found {len(existing_tenants)} existing items in schedule")

for tenant_id in tenant_ids:
if tenant_id not in existing_tenants:
logger.info(f"Processing new tenant: {tenant_id}")
logger.info(f"Processing new item: {tenant_id}")

for task in tasks_to_schedule():
task_name = f"{task['name']}-{tenant_id}"
@@ -118,11 +118,10 @@ def _update_tenant_tasks(self) -> None:
logger.info("Schedule update completed successfully")
else:
logger.info("Schedule is up to date, no changes needed")

except (AttributeError, KeyError):
logger.exception("Failed to process task configuration")
except Exception:
logger.exception("Unexpected error updating tenant tasks")
except (AttributeError, KeyError) as e:
logger.exception(f"Failed to process task configuration: {str(e)}")
except Exception as e:
logger.exception(f"Unexpected error updating tasks: {str(e)}")

def _should_update_schedule(
self, current_schedule: dict, new_schedule: dict
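As a side note on the scheduler touched above: it builds per-tenant beat entries by suffixing each task name with the tenant ID and later recovers the tenant by splitting the entry name on `-`. A rough, self-contained sketch of that naming convention (with invented task definitions, not the project's actual schedule returned by `fetch_versioned_implementation`) is:

```python
from datetime import timedelta
from typing import Any

# Invented task definitions standing in for the project's real schedule.
tasks_to_schedule = [
    {"name": "check_for_indexing", "task": "check_for_indexing", "schedule": timedelta(seconds=15)},
    {"name": "check_for_pruning", "task": "check_for_pruning", "schedule": timedelta(hours=1)},
]
tenant_ids = ["tenant_abc", "tenant_xyz"]

# One beat entry per (task, tenant) pair; tenant_id is passed through as a task kwarg.
new_schedule: dict[str, dict[str, Any]] = {}
for tenant_id in tenant_ids:
    for task in tasks_to_schedule:
        task_name = f"{task['name']}-{tenant_id}"
        new_schedule[task_name] = {
            "task": task["task"],
            "schedule": task["schedule"],
            "kwargs": {"tenant_id": tenant_id},
        }

# The tenant can be recovered from an entry name the same way _update_tenant_tasks does it.
assert "check_for_pruning-tenant_xyz".split("-")[-1] == "tenant_xyz"
```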
@@ -76,7 +76,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str | None) -> N
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
task_logger.exception("Unexpected exception during connector deletion check")
finally:
if lock_beat.owned():
lock_beat.release()
@@ -132,14 +132,14 @@ def try_generate_document_cc_pair_cleanup_tasks(
redis_connector_index = redis_connector.new_index(search_settings.id)
if redis_connector_index.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (indexing in progress): "
"Connector deletion - Delayed (indexing in progress): "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings.id}"
)

if redis_connector.prune.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (pruning in progress): "
"Connector deletion - Delayed (pruning in progress): "
f"cc_pair={cc_pair_id}"
)

@@ -170,7 +170,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
# return 0

task_logger.info(
f"RedisConnectorDeletion.generate_tasks finished. "
"RedisConnectorDeletion.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)

9 changes: 1 addition & 8 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -185,7 +185,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
task_logger.exception("Unexpected exception during indexing check")
finally:
if lock_beat.owned():
lock_beat.release()
@@ -371,7 +371,6 @@ def try_creating_indexing_task(
redis_connector_index.set_fence(payload)
task_logger.exception(
f"Unexpected exception: "
f"tenant={tenant_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
)
@@ -393,7 +392,6 @@ def connector_indexing_proxy_task(
"""celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
task_logger.info(
f"Indexing proxy - starting: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
@@ -412,15 +410,13 @@
if not job:
task_logger.info(
f"Indexing proxy - spawn failed: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
return

task_logger.info(
f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
@@ -445,7 +441,6 @@ def connector_indexing_proxy_task(
task_logger.error(
f"Indexing proxy - spawned task exceptioned: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"error={job.exception()}"
@@ -456,7 +451,6 @@

task_logger.info(
f"Indexing proxy - finished: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
@@ -644,7 +638,6 @@ def connector_indexing_task(

logger.info(
f"Indexing spawned task finished: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
6 changes: 3 additions & 3 deletions backend/danswer/background/celery/tasks/pruning/tasks.py
@@ -84,7 +84,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> None:
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
task_logger.exception("Unexpected exception during pruning check")
finally:
if lock_beat.owned():
lock_beat.release()
@@ -294,7 +294,7 @@ def connector_pruning_generator_task(
doc_ids_to_remove = list(all_indexed_document_ids - all_connector_doc_ids)

task_logger.info(
f"Pruning set collected: "
"Pruning set collected: "
f"cc_pair={cc_pair_id} "
f"docs_to_remove={len(doc_ids_to_remove)} "
f"doc_source={cc_pair.connector.source}"
@@ -310,7 +310,7 @@ def connector_pruning_generator_task(
return None

task_logger.info(
f"RedisConnector.prune.generate_tasks finished. "
"RedisConnector.prune.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)

14 changes: 4 additions & 10 deletions backend/danswer/background/celery/tasks/shared/tasks.py
@@ -59,7 +59,7 @@ def document_by_cc_pair_cleanup_task(
connector / credential pair from the access list
(6) delete all relevant entries from postgres
"""
task_logger.info(f"tenant={tenant_id} doc={document_id}")
task_logger.info(f"doc={document_id}")

try:
with get_session_with_tenant(tenant_id) as db_session:
@@ -128,16 +128,13 @@ def document_by_cc_pair_cleanup_task(
db_session.commit()

task_logger.info(
f"tenant={tenant_id} "
f"doc={document_id} "
f"action={action} "
f"refcount={count} "
f"chunks={chunks_affected}"
)
except SoftTimeLimitExceeded:
task_logger.info(
f"SoftTimeLimitExceeded exception. tenant={tenant_id} doc={document_id}"
)
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
return False
except Exception as ex:
if isinstance(ex, RetryError):
@@ -154,15 +151,12 @@
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: "
f"tenant={tenant_id} "
f"doc={document_id} "
f"status={e.response.status_code}"
)
return False

task_logger.exception(
f"Unexpected exception: tenant={tenant_id} doc={document_id}"
)
task_logger.exception(f"Unexpected exception: doc={document_id}")

if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
# Still retrying. Exponential backoff from 2^4 to 2^6 ... i.e. 16, 32, 64
@@ -173,7 +167,7 @@
# eventually gets fixed out of band via stale document reconciliation
task_logger.info(
f"Max retries reached. Marking doc as dirty for reconciliation: "
f"tenant={tenant_id} doc={document_id}"
f"doc={document_id}"
)
with get_session_with_tenant(tenant_id):
mark_document_as_modified(document_id, db_session)
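The retry comment above mentions exponential backoff from 2^4 to 2^6 seconds (16, 32, 64). The retry call itself is elided from this diff; a generic Celery sketch of that backoff pattern (hypothetical task and names, not the project's code) would be:

```python
from celery import Celery

app = Celery("example")


@app.task(bind=True, max_retries=3)
def cleanup_document(self, document_id: str) -> bool:
    try:
        # ... perform the cleanup work ...
        return True
    except Exception as e:
        # Retries 0, 1, 2 wait 2**4, 2**5, 2**6 seconds (16, 32, 64) before re-running.
        countdown = 2 ** (self.request.retries + 4)
        raise self.retry(exc=e, countdown=countdown)
```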