-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hide some task instance attributes in details page #23338
Conversation
The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease. |
(cherry picked from commit f5f9c58)
(cherry picked from commit f5f9c58)
Hey because these attributes are hidden, we are getting AttributeError in Airflow-2.3.2. Traceback (most recent call last): |
@Vishal487 Is this on your all tasks and dags, or only a specific dag or task? If it is a specific dag/task, could you provide an example that I could test with? |
Hi, may be this example can help test: import logging
from sqlalchemy.sql.expression import or_
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.models.taskinstance import TaskInstance
from airflow.operators.latest_only import LatestOnlyOperator
logger = logging.getLogger(__name__)
@provide_session
def run_queued_tasks(session):
filter_ = [
or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(minutes=20)
]
tis = session.query(TaskInstance).filter(*filter_).all()
if not len(tis):
raise AirflowSkipException("Skipped. Empty task instances list.")
logger.info(f"Updating {len(tis)} task instances:")
for ti in tis:
logger.info(dict(
dag_id=ti.dag_id,
task_id=ti.task_id,
state=ti.state,
execution_date=ti.execution_date,
))
ti.run(session=session)
session.commit()
logger.info("Done.")
with DAG(
"_queued_tasks_runner",
start_date=days_ago(1),
schedule_interval="*/10 * * * *",
default_args=dict(retries=3, retry_delay=timedelta(minutes=1)),
max_active_runs=1,
catchup=False,
tags=["utils"],
description="Utility DAG to Run TaskInstances stuck in queued state",
is_paused_upon_creation=True,
) as dag:
dag.doc_md = """
DAG change the state to RUNNING.
"""
latest = LatestOnlyOperator(task_id="latest", owner="@kosarevsky")
run_tasks = PythonOperator(task_id="run_tasks", python_callable=run_queued_tasks)
latest >> run_tasks this code return: Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 171, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 189, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/airflow/dags/repo/dags/pik_digital/pik_dags/_queued_tasks_runner/__init__.py", line 39, in run_queued_tasks
ti.run(session=session)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1786, in run
res = self.check_and_change_state_before_execution(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1307, in check_and_change_state_before_execution
task = self.task
AttributeError: 'TaskInstance' object has no attribute 'task' upd: Airflow 2.3.3 + k8s |
This is completely different issue. You are completely abusing the way how internal tasks and methods of task instances and models are run. This example is great example of antipattern and things you should not do. |
In the task instance page at
/task
, we were showing some task instance attributes that were unnecessary.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragement file, named
{pr_number}.significant.rst
, in newsfragments.