SLAs for only one task in a DAG #16389

Closed
hkarakaki opened this issue Jun 11, 2021 · 7 comments
Labels: affected_version:2.1, kind:bug

Comments

hkarakaki commented Jun 11, 2021

Apache Airflow version: 2.1.0
Kubernetes version (if you are using kubernetes) (use kubectl version): NA
Environment:

What happened:
We migrated from 1.10.15 this week and noticed that SLA misses are not reported in the UI and the SLA callback is not triggered. Our largest DAGs usually contain a checkpoint task, and the SLA is defined only for that task. The same task also helps us set up cross-DAG dependencies.

What you expected to happen:
SLA misses are registered in the UI and trigger the callback.

What do you think went wrong?
I think tasks without an SLA should be excluded from the loop here:

max_tis: List[TI] = (
    session.query(TI)
    .filter(
        TI.dag_id == dag.dag_id,
        TI.task_id == qry.c.task_id,
        TI.execution_date == qry.c.max_ti,
    )
    .all()
)
ts = timezone.utcnow()
for ti in max_tis:
    task = dag.get_task(ti.task_id)
    if task.sla and not isinstance(task.sla, timedelta):
        raise TypeError(
            f"SLA is expected to be timedelta object, got "
            f"{type(task.sla)} in {task.dag_id}:{task.task_id}"
        )
    dttm = dag.following_schedule(ti.execution_date)
    while dttm < timezone.utcnow():
        following_schedule = dag.following_schedule(dttm)
        if following_schedule + task.sla < timezone.utcnow():
            session.merge(
                SlaMiss(task_id=ti.task_id, dag_id=ti.dag_id, execution_date=dttm, timestamp=ts)
            )
        dttm = dag.following_schedule(dttm)
session.commit()
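A minimal sketch of that suggestion, assuming plain dictionaries in place of Airflow objects so it runs without Airflow: `task_slas` (hypothetical) maps each task_id to what `dag.get_task(task_id).sla` would return, and `max_ti_task_ids` (hypothetical) stands for the task_ids of the `max_tis` rows. Only tasks whose SLA is a `timedelta` are kept, so `following_schedule + task.sla` is never evaluated with `None`.

```python
from datetime import timedelta
from typing import Dict, List, Optional

# Hypothetical stand-in: task_id -> the task's `sla` attribute (None if unset).
TaskSlas = Dict[str, Optional[timedelta]]


def tasks_to_check(task_slas: TaskSlas, max_ti_task_ids: List[str]) -> List[str]:
    """Keep only the task instances whose task defines a timedelta SLA.

    Tasks declared without `sla=` (None) are dropped before the loop, so the
    later `following_schedule + task.sla` addition only ever sees a timedelta.
    """
    return [
        task_id
        for task_id in max_ti_task_ids
        if isinstance(task_slas.get(task_id), timedelta)
    ]


# Mirrors the reproduction DAG: only bash_task has an SLA.
slas: TaskSlas = {"bash_task": timedelta(seconds=2), "bash_task_2": None}
print(tasks_to_check(slas, ["bash_task", "bash_task_2"]))  # ['bash_task']
```

In the scheduler, the same filter could be applied to `max_tis` before the `for` loop. This only illustrates the idea; it is not necessarily how Airflow ended up fixing it.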

How to reproduce it:
With this DAG you can check that the SLA miss is not detected:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is deprecated in 2.x

with DAG(
    dag_id="sla_trigger",
    schedule_interval="*/5 * * * *",
    start_date=datetime(2021, 6, 10),
) as dag:
    # The only task with an SLA: it sleeps 30s, so a 2s SLA is always missed.
    BashOperator(
        task_id="bash_task",
        bash_command="sleep 30",
        sla=timedelta(seconds=2),
    )
    # No SLA on this task, so task.sla is None.
    BashOperator(
        task_id="bash_task_2",
        bash_command="sleep 30",
    )

To get the error shown below, you also need to add an SLA miss callback function to the DAG.
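For reference, a minimal SLA miss callback might look like the sketch below. It assumes the five-argument signature `(dag, task_list, blocking_task_list, slas, blocking_tis)` that Airflow 2.x passes to a DAG's `sla_miss_callback`; the function name `sla_miss_alert` is hypothetical, and it uses no Airflow imports so it can be exercised on its own.

```python
from types import SimpleNamespace
from typing import Any, List


def sla_miss_alert(dag: Any, task_list: str, blocking_task_list: str,
                   slas: List[Any], blocking_tis: List[Any]) -> str:
    """Hypothetical SLA miss callback for DAG(..., sla_miss_callback=...)."""
    # In practice, replace print() with a real alert (e-mail, chat, pager, ...).
    message = f"SLA missed in {dag.dag_id}: {task_list}"
    print(message)
    # Returned only so the sketch can be tested without a scheduler.
    return message


# Exercising it with a fake DAG object; the scheduler would pass real objects.
fake_dag = SimpleNamespace(dag_id="sla_trigger")
sla_miss_alert(fake_dag, "bash_task", "", [], [])  # prints "SLA missed in sla_trigger: bash_task"
```

In the reproduction DAG it would be attached with `DAG(..., sla_miss_callback=sla_miss_alert)`.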

Anything else we need to know:
This error happens every time, for every DAG that has an SLA on only some of its tasks, and the same DAG definitions worked fine in v1.10.15.

Scheduler's log: we've seen this error in the scheduler for all the DAGs:
[2021-06-09 20:30:57,679] {scheduler_job.py:396} INFO - Running SLA Checks for company_dag
[2021-06-09 20:30:57,763] {scheduler_job.py:569} ERROR - Error executing SlaCallbackRequest callback for file: /opt/airflow/tasks/airflow_dags/eu-central-1/company_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/scheduler_job.py", line 565, in execute_callbacks
    self.manage_slas(dagbag.dags.get(request.dag_id))
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/scheduler_job.py", line 433, in manage_slas
    if following_schedule + task.sla < timezone.utcnow():
TypeError: unsupported operand type(s) for +: 'datetime.datetime' and 'NoneType'
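The last frame can be reproduced without Airflow: a task declared without `sla=` has `task.sla` set to `None`, and adding `None` to a `datetime` raises this same `TypeError`. The timestamp below is arbitrary.

```python
from datetime import datetime, timedelta

following_schedule = datetime(2021, 6, 9, 20, 35)  # arbitrary schedule time

# A task with sla=timedelta(...) works: datetime + timedelta -> datetime.
deadline = following_schedule + timedelta(seconds=2)
print(deadline)  # 2021-06-09 20:35:02

# A task declared without sla= has task.sla == None, so the same addition fails.
sla = None
try:
    following_schedule + sla  # raises before producing a value
    error_message = ""
except TypeError as exc:
    error_message = str(exc)
print(error_message)
```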
hkarakaki added the kind:bug label on Jun 11, 2021

boring-cyborg bot commented Jun 11, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

ashb added the affected_version:2.1 label on Jun 11, 2021
jhtimmins self-assigned this on Jun 11, 2021
Tonkonozhenko (Contributor) commented

@jhtimmins any updates? We're facing the same issue.

tseruga commented Aug 26, 2021

To add some more findings from my personal testing - this seems to only appear when using the context manager style of defining a DAG (e.g. with DAG(...) as dag:)

tazimmerman commented

> To add some more findings from my personal testing - this seems to only appear when using the context manager style of defining a DAG (e.g. with DAG(...) as dag:)

As of 2.1.4 this also happens without the context manager.

Traceback (most recent call last):
  File "/local/airflow/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 545, in execute_callbacks
    elif isinstance(request, SlaCallbackRequest):
  File "/local/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/local/airflow/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 413, in manage_slas
    if following_schedule + task.sla < timezone.utcnow():
TypeError: unsupported operand type(s) for +: 'datetime.datetime' and 'NoneType'

From what I can tell, it's because the check at the start of the manage_slas function (in airflow/dag_processing/processor.py) returns early only when no task has a valid SLA, while the rest of the code assumes that every task has a valid SLA (by valid I mean a timedelta, which can be added to a datetime). A DAG in which only some tasks define an SLA therefore passes the check and then fails on the first task without one:

            while dttm < timezone.utcnow():
                following_schedule = dag.following_schedule(dttm)
                if following_schedule + task.sla < timezone.utcnow():
                    session.merge(
                        SlaMiss(task_id=ti.task_id, dag_id=ti.dag_id, execution_date=dttm, timestamp=ts)
                    )
                dttm = dag.following_schedule(dttm)

I'm not sure whether there are other repercussions, but an easy fix seems to be changing that `if` line to:

    if task.sla and following_schedule + task.sla < timezone.utcnow():
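The reasoning above can be checked with a small simulation. It is a sketch under simplifying assumptions: a hypothetical `following_schedule()` helper stands in for `dag.following_schedule()` on the reproduction DAG's `*/5` schedule, every task starts from the same `execution_date`, and `find_sla_misses` (also hypothetical) returns `(task_id, dttm)` pairs instead of merging `SlaMiss` rows. The early `any(...)` exit only skips DAGs in which no task has a `timedelta` SLA, so one SLA is enough to reach the loop, where the `sla and` guard keeps `bash_task_2` from raising.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

FIVE_MINUTES = timedelta(minutes=5)


def following_schedule(dttm: datetime) -> datetime:
    """Hypothetical stand-in for dag.following_schedule() on a */5 schedule."""
    return dttm + FIVE_MINUTES


def find_sla_misses(task_slas: Dict[str, Optional[timedelta]],
                    execution_date: datetime,
                    now: datetime) -> List[Tuple[str, datetime]]:
    # Early exit, like the check at the top of manage_slas: skip the DAG only
    # when *no* task has a timedelta SLA. One SLA is enough to get past it.
    if not any(isinstance(sla, timedelta) for sla in task_slas.values()):
        return []
    misses = []
    for task_id, sla in task_slas.items():
        dttm = following_schedule(execution_date)
        while dttm < now:
            nxt = following_schedule(dttm)
            # The proposed guard: without `sla and`, a task whose sla is None
            # raises TypeError here even though another task has an SLA.
            if sla and nxt + sla < now:
                misses.append((task_id, dttm))
            dttm = following_schedule(dttm)
    return misses


print(find_sla_misses(
    {"bash_task": timedelta(seconds=2), "bash_task_2": None},
    execution_date=datetime(2021, 6, 10),
    now=datetime(2021, 6, 10, 0, 12),
))  # [('bash_task', datetime.datetime(2021, 6, 10, 0, 5))]
```

One design note: `sla and ...` also skips a zero-length `timedelta(0)` SLA, because it is falsy; testing `isinstance(sla, timedelta)` instead would avoid that edge case.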

tazimmerman commented Oct 8, 2021

It appears that commit 2fd3f27 will resolve the issue.

ReadytoRocc (Contributor) commented

I tested with the Astronomer ap-airflow image quay.io/astronomer/ap-airflow:2.2.0-onbuild and no longer see this behavior. SLA misses now appear in the UI for bash_task when using the provided DAG.

eladkal (Contributor) commented Feb 22, 2022

Great, closing the issue then.

eladkal closed this as completed on Feb 22, 2022
8 participants