
[AWS-MWAA] Sending SIGTERM due to status change detection after task completion #21497

Closed
1 of 2 tasks
mitsos1os opened this issue Feb 10, 2022 · 2 comments
Labels
area:core kind:bug

Comments


mitsos1os commented Feb 10, 2022

Apache Airflow version

2.0.2

What happened

We are running Airflow version 2.0.2 on AWS MWAA.

At some point, our MWAA setup started producing these errors on ALL tasks, asynchronously.
It is not related to the type of task, since it happens for ALL tasks (plain Python operators and ECS operators) in ALL DAGs.

More specifically:

1. Initially, the task completes successfully and the pipeline proceeds to the next tasks, as seen in these lines:

[2022-02-10 11:42:07,012] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=non_enterprise_24h_horizon, task_id=init_dag, execution_date=20220210T104200, start_date=20220210T114206, end_date=20220210T114207
[2022-02-10 11:42:07,012] {{taskinstance.py:1891}} DEBUG - Task Duration set to 0.247267
[2022-02-10 11:42:07,051] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check

2. However, about 5 seconds later an extra state-refresh check runs against the task, somehow concludes that the task's state was set externally, and starts trying to terminate it:

[2022-02-10 11:42:11,842] {{taskinstance.py:595}} DEBUG - Refreshing TaskInstance <TaskInstance: non_enterprise_24h_horizon.init_dag 2022-02-10T10:42:00+00:00 [running]> from DB
[2022-02-10 11:42:11,853] {{taskinstance.py:630}} DEBUG - Refreshed TaskInstance <TaskInstance: non_enterprise_24h_horizon.init_dag 2022-02-10T10:42:00+00:00 [success]>
[2022-02-10 11:42:11,855] {{logging_mixin.py:104}} INFO - [2022-02-10 11:42:11,855] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.
[2022-02-10 11:42:11,856] {{process_utils.py:100}} INFO - Sending Signals.SIGTERM to GPID 5523

The task is already finished, though, so the SIGTERM it sends times out, and an arbitrary exit code of -9 is reported together with an ERROR stating that this could be an out-of-memory error. That is not the case, however, since this code is simply assigned explicitly when the process cannot be found (as seen here), so the run finally errors out with:

[2022-02-10 11:43:11,885] {{process_utils.py:113}} WARNING - process psutil.Process(pid=5523, name='airflow task ru', status='sleeping', started='11:42:06') did not respond to SIGTERM. Trying SIGKILL
[2022-02-10 11:43:11,893] {{process_utils.py:66}} INFO - Process psutil.Process(pid=5523, name='airflow task ru', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='11:42:06') (5523) terminated with exit code Negsignal.SIGKILL
[2022-02-10 11:43:11,893] {{standard_task_runner.py:130}} ERROR - Job 415494 was killed before it finished (likely due to running out of memory)
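
For illustration only, here is a simplified, hypothetical paraphrase of the sequence suggested by the log lines above (local_task_job.py:188 and process_utils.py:100); this is not the actual Airflow source, and the function and parameter names are invented:

from typing import Optional

def heartbeat_check(state_in_db: str, raw_task_return_code: Optional[int]) -> str:
    """Toy model of the supervising job's periodic state check."""
    # The supervising job re-reads the task instance's state from the DB.
    if state_in_db != "running" and raw_task_return_code is None:
        # The child process has not reported its return code yet, so the state
        # change looks "external" and the instance gets terminated
        # (SIGTERM first, then SIGKILL after the escalation timeout).
        return "terminate"
    return "keep running"

# With schedule_after_task_execution enabled, the task process marks itself
# SUCCESS and schedules downstream tasks before reporting its return code,
# so this check can fire even though nothing was set externally:
print(heartbeat_check("success", None))  # -> "terminate"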

I tried removing all custom configuration entries in MWAA and, at the same time, reverting the DAGs to a previous version that did not produce these errors, but without any result.

UPDATE

I found exactly the same issue described in #13824, as well as the root cause in #16227 and its fix in PR #16289.

Since MWAA on AWS lags behind on Airflow updates, is there a suggested way to tackle this other than setting the AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION variable to False, which will affect performance?
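
For context, applying that workaround in an MWAA environment would presumably be done through the environment's Airflow configuration options, using the same "section.key" format as the entries listed under Deployment details below (an assumption about the MWAA console, not something verified here):

"scheduler.schedule_after_task_execution" = "False"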

It is also strange that this was not happening before and only started now... I also saw that it could be related to database load... Could this be the case?

What you expected to happen

I would expect the tasks to terminate gracefully after successful completion as shown in:

[2022-02-10 11:42:07,012] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=non_enterprise_24h_horizon, task_id=init_dag, execution_date=20220210T104200, start_date=20220210T114206, end_date=20220210T114207
[2022-02-10 11:42:07,012] {{taskinstance.py:1891}} DEBUG - Task Duration set to 0.247267
[2022-02-10 11:42:07,051] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check

and not to proceed down the external-state-change path that marks them for termination.

Complete log below:

[2022-02-10 11:42:07,012] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=non_enterprise_24h_horizon, task_id=init_dag, execution_date=20220210T104200, start_date=20220210T114206, end_date=20220210T114207
[2022-02-10 11:42:07,012] {{taskinstance.py:1891}} DEBUG - Task Duration set to 0.247267
[2022-02-10 11:42:07,051] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2022-02-10 11:42:07,053] {{cli_action_loggers.py:84}} DEBUG - Calling callbacks: []
[2022-02-10 11:42:11,842] {{taskinstance.py:595}} DEBUG - Refreshing TaskInstance <TaskInstance: non_enterprise_24h_horizon.init_dag 2022-02-10T10:42:00+00:00 [running]> from DB
[2022-02-10 11:42:11,853] {{taskinstance.py:630}} DEBUG - Refreshed TaskInstance <TaskInstance: non_enterprise_24h_horizon.init_dag 2022-02-10T10:42:00+00:00 [success]>
[2022-02-10 11:42:11,855] {{logging_mixin.py:104}} INFO - [2022-02-10 11:42:11,855] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.
[2022-02-10 11:42:11,856] {{process_utils.py:100}} INFO - Sending Signals.SIGTERM to GPID 5523
[2022-02-10 11:43:11,885] {{process_utils.py:113}} WARNING - process psutil.Process(pid=5523, name='airflow task ru', status='sleeping', started='11:42:06') did not respond to SIGTERM. Trying SIGKILL
[2022-02-10 11:43:11,893] {{process_utils.py:66}} INFO - Process psutil.Process(pid=5523, name='airflow task ru', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='11:42:06') (5523) terminated with exit code Negsignal.SIGKILL
[2022-02-10 11:43:11,893] {{standard_task_runner.py:130}} ERROR - Job 415494 was killed before it finished (likely due to running out of memory)

How to reproduce

This happens for every task, even for small Python ShortCircuitOperator tasks like:

from airflow.operators.python import ShortCircuitOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def validate_rate_config(bucket: str, key: str) -> bool:
    # Short-circuit downstream tasks unless the expected input object exists in S3.
    s3 = S3Hook(aws_conn_id="aws_default")
    return s3.check_for_key(key, bucket)


# dag_instance, index, rates_s3_bucket and inputs_key_prefix are defined elsewhere in the DAG file.
rate_config_validator = ShortCircuitOperator(
    dag=dag_instance,
    task_id=f"rate_config_validator_i{index}",
    op_kwargs={"bucket": rates_s3_bucket, "key": inputs_key_prefix + f"/input_{index}.json"},
    python_callable=validate_rate_config,
    retries=2,
)

Operating System

AWS MWAA - class: mw1.large

Versions of Apache Airflow Providers

As taken from the MWAA doc page and its requirements link (although it might be outdated):

apache-airflow-providers-amazon==1.3.0
apache-airflow-providers-celery==1.0.1
apache-airflow-providers-ftp==1.0.1
apache-airflow-providers-http==1.1.1
apache-airflow-providers-imap==1.0.1
apache-airflow-providers-sqlite==1.0.2

Deployment

MWAA

Deployment details

Some custom options had been tried, but they were also reverted to make sure they were not responsible for the error:

"celery.sync_parallelism" = 1
"scheduler.parsing_processes" = 6
"core.min_serialized_dag_update_interval" = 30
"core.min_serialized_dag_fetch_interval" = 35
"scheduler.processor_poll_interval" = 1

Anything else

It occurs every time.

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

mitsos1os added the area:core and kind:bug labels on Feb 10, 2022

boring-cyborg bot commented Feb 10, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk (Member) commented Feb 10, 2022

Setting the AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION variable to False affects performance just a little (the setting basically keeps the gap between finishing a task and starting its dependent tasks as small as possible). It does not affect the "overall" performance of scheduling, just the latency of specific cases. So I'd say it's a reasonable workaround.

Also, MWAA supports the 2.2 line of Airflow (with no easy migration, though), so I'd suggest this as another possibility - Airflow 2.2 has quite a number of improvements and fixes compared to 2.0, not to mention some new features - and I'd say trying to implement any other kind of workaround might be more costly than migrating to 2.2. So if you are considering some investment - I propose migrating to 2.2.

Moving this to a discussion, as it is not an issue any more.

apache locked and limited conversation to collaborators on Feb 10, 2022
potiuk converted this issue into discussion #21499 on Feb 10, 2022
