Replace usages of task context logger with the log table (#40867)
Use Log table instead of task context logger

The task context logger is inefficient; recording these events in the Log table is the better approach.

---------

Co-authored-by: Vincent <[email protected]>
dstandish and vincbeck authored Jul 19, 2024
1 parent e7e8325 commit f684a58
Showing 9 changed files with 192 additions and 133 deletions.
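
At a high level, the change swaps per-event task-context logging for buffered rows destined for the log table: each executor appends Log objects to an in-memory deque through a new log_task_event() method, and the scheduler drains that deque into the database once per scheduler loop. Below is a minimal, self-contained sketch of that buffering pattern; the LogRecord dataclass, BufferingExecutor class, and flush_task_event_logs() helper are stand-ins invented for illustration, not the real Airflow model, executor, or scheduler code.

from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass
class LogRecord:
    """Stand-in for airflow.models.Log with only the fields this sketch uses."""

    event: str
    extra: str
    ti_key: tuple  # stand-in for TaskInstanceKey


class BufferingExecutor:
    """Executor side: buffer events in memory instead of logging them remotely."""

    def __init__(self) -> None:
        self._task_event_logs: deque[LogRecord] = deque()

    def log_task_event(self, *, event: str, extra: str, ti_key: tuple) -> None:
        # Cheap append; no database or log-handler round trip on the hot path.
        self._task_event_logs.append(LogRecord(event=event, extra=extra, ti_key=ti_key))


def flush_task_event_logs(buffer: deque[LogRecord]) -> list[LogRecord]:
    """Scheduler side: drain the records currently buffered and persist them.

    In the real code the drained objects go to session.bulk_save_objects().
    """
    return [buffer.popleft() for _ in range(len(buffer))]


if __name__ == "__main__":
    executor = BufferingExecutor()
    executor.log_task_event(
        event="task launch failure",
        extra="Task was in running set and could not be queued after 5 attempts.",
        ti_key=("example_dag", "example_task", "run_1", 1, -1),
    )
    saved = flush_task_event_logs(executor._task_event_logs)
    print(f"persisted {len(saved)} row(s); buffer now holds {len(executor._task_event_logs)}")
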
1 change: 1 addition & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -3372,6 +3372,7 @@ components:
owner:
description: Name of the user who triggered these events.
type: string
nullable: true
readOnly: true
extra:
description: |
42 changes: 26 additions & 16 deletions airflow/executors/base_executor.py
@@ -21,19 +21,18 @@
import logging
import sys
import warnings
from collections import defaultdict
from collections import defaultdict, deque
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import Log
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.task_context_logger import TaskContextLogger
from airflow.utils.state import TaskInstanceState

PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -131,6 +130,16 @@ def __init__(self, parallelism: int = PARALLELISM):
self.queued_tasks: dict[TaskInstanceKey, QueuedTaskInstanceType] = {}
self.running: set[TaskInstanceKey] = set()
self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
self._task_event_logs: deque[Log] = deque()
"""
Deque for storing task event log messages.
This attribute is only internally public and should not be manipulated
directly by subclasses.
:meta private:
"""

self.attempts: dict[TaskInstanceKey, RunningRetryAttemptType] = defaultdict(RunningRetryAttemptType)

def __repr__(self):
@@ -139,6 +148,10 @@ def __repr__(self):
def start(self): # pragma: no cover
"""Executors may need to get things started."""

def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
"""Add an event to the log table."""
self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))

def queue_command(
self,
task_instance: TaskInstance,
@@ -288,13 +301,20 @@ def trigger_tasks(self, open_slots: int) -> None:
# if it hasn't been much time since first check, let it be checked again next time
self.log.info("queued but still running; attempt=%s task=%s", attempt.total_tries, key)
continue

# Otherwise, we give up and remove the task from the queue.
self.send_message_to_task_logs(
logging.ERROR,
self.log.error(
"could not queue task %s (still running after %d attempts).",
key,
attempt.total_tries,
ti=ti,
)
self.log_task_event(
event="task launch failure",
extra=(
"Task was in running set and could not be queued "
f"after {attempt.total_tries} attempts."
),
ti_key=key,
)
del self.attempts[key]
del self.queued_tasks[key]
@@ -526,16 +546,6 @@ def send_callback(self, request: CallbackRequest) -> None:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)

@cached_property
def _task_context_logger(self) -> TaskContextLogger:
return TaskContextLogger(
component_name="Executor",
call_site_logger=self.log,
)

def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey):
self._task_context_logger._log(level, msg, *args, ti=ti)

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
"""
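
As a usage illustration, the sketch below shows how an executor records an event with the new API; MyExecutor and handle_launch_failure are hypothetical names chosen for the example, and it assumes an Airflow tree that already includes this commit (which adds log_task_event to BaseExecutor).

from __future__ import annotations

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstancekey import TaskInstanceKey


class MyExecutor(BaseExecutor):
    """Hypothetical executor subclass, for illustration only."""

    def handle_launch_failure(self, key: TaskInstanceKey, attempts: int) -> None:
        # Keep logging to the component's own logger as before ...
        self.log.error("could not queue task %s (still running after %d attempts).", key, attempts)
        # ... and buffer a row for the log table; the scheduler drains
        # self._task_event_logs and bulk-saves it on each loop.
        self.log_task_event(
            event="task launch failure",
            extra=f"Task was in running set and could not be queued after {attempts} attempts.",
            ti_key=key,
        )
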
82 changes: 57 additions & 25 deletions airflow/jobs/scheduler_job_runner.py
@@ -24,7 +24,7 @@
import sys
import time
import warnings
from collections import Counter, defaultdict
from collections import Counter, defaultdict, deque
from dataclasses import dataclass
from datetime import timedelta
from functools import lru_cache, partial
@@ -44,6 +44,7 @@
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
from airflow.models import Log
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
@@ -62,7 +63,6 @@
from airflow.utils import timezone
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.task_context_logger import TaskContextLogger
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import (
@@ -237,10 +237,6 @@ def __init__(
self.processor_agent: DagFileProcessorAgent | None = None

self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False)
self._task_context_logger: TaskContextLogger = TaskContextLogger(
component_name=self.job_type,
call_site_logger=self.log,
)

@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
@@ -740,6 +736,11 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:

return len(queued_tis)

@staticmethod
def _process_task_event_logs(log_records: deque[Log], session: Session):
objects = (log_records.popleft() for _ in range(len(log_records)))
session.bulk_save_objects(objects=objects, preserve_order=False)

def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int:
"""Respond to executor events."""
if not self._standalone_dag_processor and not self.processor_agent:
@@ -842,7 +843,8 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) ->
)
if info is not None:
msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt
self._task_context_logger.error(msg, ti=ti)
self.log.error(msg)
session.add(Log(event="state mismatch", extra=msg, task_instance=ti.key))

# Get task from the Serialized DAG
try:
@@ -1066,6 +1068,14 @@ def _run_scheduler_loop(self) -> None:
num_finished_events += self._process_executor_events(
executor=executor, session=session
)

for executor in self.job.executors:
try:
with create_session() as session:
self._process_task_event_logs(executor._task_event_logs, session)
except Exception:
self.log.exception("Something went wrong when trying to save task event logs.")

if self.processor_agent:
self.processor_agent.heartbeat()

@@ -1649,11 +1659,20 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self._task_context_logger.warning(
self.log.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
ti=ti,
)
session.add(
Log(
event="stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
@@ -1817,22 +1836,35 @@ def _find_zombies(self) -> None:
if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
log_message = (
f"Detected zombie job: {request} "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks)"
)
self._task_context_logger.error(log_message, ti=ti)
self.job.executor.send_callback(request)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
with create_session() as session:
for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
session.add(
Log(
event="heartbeat timeout",
task_instance=ti.key,
extra=(
f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
"seconds) and will be terminated. "
"See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks"
),
)
)
self.log.error(
"Detected zombie job: %s "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks)",
request,
)
self.job.executor.send_callback(request)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})

# [END find_zombies]

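
One detail of _process_task_event_logs worth calling out: the generator expression evaluates len(log_records) when it is created, so the flush pops exactly the entries that were buffered at that moment, while the executor is free to keep appending to the same deque (whose append()/popleft() are thread-safe in CPython). A small stand-alone demonstration of that behaviour:

from collections import deque

buffer: deque[str] = deque(["event-1", "event-2"])

# The outermost iterable of a generator expression is evaluated immediately,
# so the length is snapshotted here even though the pops happen lazily.
to_persist = (buffer.popleft() for _ in range(len(buffer)))

# Events appended after the snapshot are left for the next flush ...
buffer.append("event-3")

# ... so this flush only drains the first two entries.
print(list(to_persist))  # ['event-1', 'event-2']
print(list(buffer))      # ['event-3']
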
24 changes: 20 additions & 4 deletions airflow/models/log.py
@@ -17,12 +17,18 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import Column, Index, Integer, String, Text

from airflow.models.base import Base, StringID
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey


class Log(Base):
"""Used to actively log events to the database."""
@@ -49,22 +55,32 @@ class Log(Base):
Index("idx_log_task_instance", dag_id, task_id, run_id, map_index, try_number),
)

def __init__(self, event, task_instance=None, owner=None, owner_display_name=None, extra=None, **kwargs):
def __init__(
self,
event,
task_instance: TaskInstance | TaskInstanceKey | None = None,
owner=None,
owner_display_name=None,
extra=None,
**kwargs,
):
self.dttm = timezone.utcnow()
self.event = event
self.extra = extra

task_owner = None

self.execution_date = None
if task_instance:
self.dag_id = task_instance.dag_id
self.task_id = task_instance.task_id
self.execution_date = task_instance.execution_date
if execution_date := getattr(task_instance, "execution_date", None):
self.execution_date = execution_date
self.run_id = task_instance.run_id
self.try_number = task_instance.try_number
self.map_index = task_instance.map_index
if getattr(task_instance, "task", None):
task_owner = task_instance.task.owner
if task := getattr(task_instance, "task", None):
task_owner = task.owner

if "task_id" in kwargs:
self.task_id = kwargs["task_id"]
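
With the widened constructor, a Log row can be built from a bare TaskInstanceKey, which has no execution_date or task attribute; the getattr() fallbacks simply leave those fields unset rather than raising AttributeError. A short sketch, assuming an Airflow installation that already contains this commit (the dag and run identifiers are made up):

from airflow.models import Log
from airflow.models.taskinstancekey import TaskInstanceKey

key = TaskInstanceKey(
    dag_id="example_dag",
    task_id="example_task",
    run_id="manual__2024-07-19",
    try_number=1,
    map_index=-1,
)

# No execution_date or task attribute on the key, so those branches are skipped.
entry = Log(event="stuck in queued", task_instance=key, extra="Task will be marked as failed.")
print(entry.dag_id, entry.run_id, entry.try_number, entry.execution_date)
# example_dag manual__2024-07-19 1 None
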
33 changes: 20 additions & 13 deletions airflow/providers/amazon/aws/executors/batch/batch_executor.py
@@ -19,10 +19,9 @@

from __future__ import annotations

import contextlib
import logging
import time
from collections import deque
from contextlib import suppress
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Dict, List, Sequence

@@ -292,12 +291,19 @@ def attempt_submit_jobs(self):

if failure_reason:
if attempt_number >= int(self.__class__.MAX_SUBMIT_JOB_ATTEMPTS):
self.send_message_to_task_logs(
logging.ERROR,
"This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s",
self.log.error(
(
"This job has been unsuccessfully attempted too many times (%s). "
"Dropping the task. Reason: %s"
),
attempt_number,
failure_reason,
ti=key,
)
self.log_task_event(
event="batch job submit failure",
extra=f"This job has been unsuccessfully attempted too many times ({attempt_number}). "
f"Dropping the task. Reason: {failure_reason}",
ti_key=key,
)
self.fail(key=key)
else:
@@ -317,7 +323,7 @@ def attempt_submit_jobs(self):
exec_config=exec_config,
attempt_number=attempt_number,
)
with contextlib.suppress(AttributeError):
with suppress(AttributeError):
# TODO: Remove this when min_airflow_version is 2.10.0 or higher in Amazon provider.
# running_state is added in Airflow 2.10 and only needed to support task adoption
# (an optional executor feature).
@@ -458,10 +464,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
not_adopted_tis = [ti for ti in tis if ti not in adopted_tis]
return not_adopted_tis

def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey):
def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
# TODO: remove this method when min_airflow_version is set to higher than 2.10.0
try:
super().send_message_to_task_logs(level, msg, *args, ti=ti)
except AttributeError:
# ``send_message_to_task_logs`` is added in 2.10.0
self.log.error(msg, *args)
with suppress(AttributeError):
super().log_task_event(
event=event,
extra=extra,
ti_key=ti_key,
)
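
The provider override still has to run on Airflow cores older than 2.10.0, where BaseExecutor.log_task_event does not exist, so it wraps the super() call in contextlib.suppress(AttributeError) and degrades to a no-op there. A minimal stand-alone sketch of that compatibility pattern, using invented stand-in classes rather than the real executors:

from __future__ import annotations

from contextlib import suppress


class OldCoreExecutor:
    """Stand-in for a BaseExecutor that predates log_task_event (Airflow < 2.10)."""


class NewCoreExecutor(OldCoreExecutor):
    """Stand-in for a BaseExecutor that ships log_task_event."""

    def log_task_event(self, *, event: str, extra: str, ti_key) -> None:
        print(f"recorded {event!r} for {ti_key}")


class ProviderExecutor(OldCoreExecutor):  # swap the base to NewCoreExecutor to simulate a newer core
    def log_task_event(self, *, event: str, extra: str, ti_key) -> None:
        # On an old core, super() has no such attribute; suppress() makes this a no-op.
        with suppress(AttributeError):
            super().log_task_event(event=event, extra=extra, ti_key=ti_key)


ProviderExecutor().log_task_event(event="batch job submit failure", extra="...", ti_key="dag.task")
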