AttributeError: 'TaskInstance' object has no attribute 'task' #25576

Closed
1 of 2 tasks
dKosarevsky opened this issue Aug 7, 2022 · 1 comment
Labels
area:core, invalid, kind:bug

Comments

dKosarevsky commented Aug 7, 2022

Apache Airflow version

2.3.3

What happened

Calling `ti.run()` on a task instance from inside a PythonOperator callable raises this error:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/dags/repo/dags/pik_digital/pik_dags/_queued_tasks_runner/__init__.py", line 39, in run_queued_tasks
    ti.run(session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1786, in run
    res = self.check_and_change_state_before_execution(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1307, in check_and_change_state_before_execution
    task = self.task
AttributeError: 'TaskInstance' object has no attribute 'task'

What you think should happen instead

The task should run instead of raising an error. I think this is related to "Hide some task instance attributes in details page" #23338.

How to reproduce

import logging

from sqlalchemy.sql.expression import or_
from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.dates import days_ago
from airflow.utils.db import provide_session
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.models.taskinstance import TaskInstance
from airflow.operators.latest_only import LatestOnlyOperator


logger = logging.getLogger(__name__)


@provide_session
def run_queued_tasks(session):
    filter_ = [
        or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
        TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(minutes=20)
    ]
    tis = session.query(TaskInstance).filter(*filter_).all()

    if not tis:
        raise AirflowSkipException("Skipped. Empty task instances list.")

    logger.info(f"Updating {len(tis)} task instances:")
    for ti in tis:
        logger.info(dict(
            dag_id=ti.dag_id,
            task_id=ti.task_id,
            state=ti.state,
            execution_date=ti.execution_date,
        ))
        ti.run(session=session)

    session.commit()
    logger.info("Done.")


with DAG(
        "_queued_tasks_runner",
        start_date=days_ago(1),
        schedule_interval="*/10 * * * *",
        default_args=dict(retries=3, retry_delay=timedelta(minutes=1)),
        max_active_runs=1,
        catchup=False,
        tags=["utils"],
        description="Utility DAG to Run TaskInstances stuck in queued state",
        is_paused_upon_creation=True,
) as dag:
    dag.doc_md = """
        DAG change the state to RUNNING.
    """
    latest = LatestOnlyOperator(task_id="latest", owner="@kosarevsky")
    run_tasks = PythonOperator(task_id="run_tasks", python_callable=run_queued_tasks)

    latest >> run_tasks

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==4.0.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.1.0
apache-airflow-providers-common-sql==1.0.0
apache-airflow-providers-docker==3.0.0
apache-airflow-providers-elasticsearch==4.0.0
apache-airflow-providers-ftp==3.0.0
apache-airflow-providers-google==8.1.0
apache-airflow-providers-grpc==3.0.0
apache-airflow-providers-hashicorp==3.0.0
apache-airflow-providers-http==3.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-microsoft-azure==4.0.0
apache-airflow-providers-microsoft-mssql==3.1.0
apache-airflow-providers-mysql==3.0.0
apache-airflow-providers-odbc==3.0.0
apache-airflow-providers-postgres==5.0.0
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-sftp==3.0.0
apache-airflow-providers-slack==5.0.0
apache-airflow-providers-sqlite==3.0.0
apache-airflow-providers-ssh==3.0.0
apache-airflow-providers-telegram==3.0.0
apache-airflow-providers-vertica==3.1.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

terraform, helm, k8s, cloud

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

dKosarevsky added the area:core and kind:bug labels Aug 7, 2022
potiuk (Member) commented Aug 7, 2022

Nope. You are doing it wrong. Why do you want to call the ti.run() method from within a Python callable? This is an absolutely unsupported way of using Airflow, and expecting it to work is a huge leap of faith.

Where did you get the idea from? Can you please point to the documents, blogs, or anything else suggesting that you should do this? We need to add a comment there saying that this is completely wrong.

potiuk closed this as completed Aug 7, 2022
potiuk added the invalid label Aug 7, 2022
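
For readers who land here from the traceback, here is a minimal sketch of one plausible reason the attribute is missing. It rests on two assumptions: that `task` is a plain Python attribute assigned only in the constructor (the traceback suggests this but does not prove it), and that the instances returned by `session.query(TaskInstance)` were built by the ORM without running `__init__`, which is how SQLAlchemy loads mapped objects. `FakeTaskInstance` and `load_like_orm` are hypothetical names used only for illustration; they are not Airflow's classes or API.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for an ORM-mapped model; not Airflow's class."""

    def __init__(self, task_id, task):
        self.task_id = task_id  # would be a database column
        self.task = task        # plain attribute, set only by the constructor

    def run(self):
        # Mirrors the failing line in the traceback: it reads self.task.
        return f"running {self.task}"


def load_like_orm(task_id):
    """Mimic an ORM load: build the object without calling __init__."""
    obj = FakeTaskInstance.__new__(FakeTaskInstance)
    obj.task_id = task_id  # column values are restored; `task` is not
    return obj


built = FakeTaskInstance("t1", task="operator-object")
loaded = load_like_orm("t1")

print(built.run())
try:
    loaded.run()
except AttributeError as exc:
    print(f"AttributeError: {exc}")
```

Under these assumptions, an instance queried from the database carries its column values but no operator, so any method that reads `self.task` fails in the way shown in the report.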