
Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs #13824

Closed
christmoore opened this issue Jan 22, 2021 · 32 comments
Labels
affected_version:2.0 Issues Reported for 2.0 area:logging area:providers kind:bug This is a clearly a bug priority:medium Bug that should be fixed before next release but would not block a release provider:amazon-aws AWS/Amazon - related issues

Comments

@christmoore

christmoore commented Jan 22, 2021

Apache Airflow version:
2.0.0

Environment:
Docker Stack
Celery Executor w/ Redis
3 Workers, Scheduler + Webserver
Cloudwatch remote config turned on

What happened:
Following execution of a DAG when using CloudWatch integration, the state of the Task Instance is being externally set, causing SIGTERM/SIGKILL signals to be sent. This produces error logs in the Workers, which is a nuisance for alert monitoring.

*** Reading remote log from Cloudwatch log_group: dev1-airflow-task log_stream: xxxx/task/2021-01-21T17_59_19.643994+00_00/1.log.
Dependencies all met for <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [queued]>
Dependencies all met for <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [queued]>
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
Executing <Task(TaskVerificationOperator): task > on 2021-01-21T17:59:19.643994+00:00
Started process 654 to run task
Running <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [running]> on host 88f99fbc97a8
Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=xxxxxxxx
AIRFLOW_CTX_DAG_OWNER=xxxxxxxx
AIRFLOW_CTX_DAG_ID=xxxxxxxx
AIRFLOW_CTX_TASK_ID=xxxxxx
AIRFLOW_CTX_EXECUTION_DATE=2021-01-21T17:59:19.643994+00:00
AIRFLOW_CTX_DAG_RUN_ID=85
Set new audit correlation_id xxxxxxxxxx-xxxxxx-xxxxxxxxx
Using connection to: id: xxxxx. Host: xxxxxxx, Port: 5432, Schema: xxxxxx, Login: xxxxxx, Password: XXXXXXXX, extra: None
Marking task as SUCCESS. dag_id=xxxxxx, task_id=xxxxxx, execution_date=20210121T175919, start_date=20210121T175936, end_date=20210121T175938
1 downstream tasks scheduled from follow-on schedule check

However, following the completion of the DAG, the following is appended to the logs:

State of this instance has been externally set to success. Terminating instance.
Sending Signals.SIGTERM to GPID 654
process psutil.Process(pid=654, name='xxxxx', status='sleeping', started='17:59:36') did not respond to SIGTERM. Trying SIGKILL
Process psutil.Process(pid=654, name='xxxxx', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='17:59:36') (654) terminated with exit code Negsignal.SIGKILL
Task exited with return code Negsignal.SIGKILL

This is a problem, because it causes the following to appear in Worker logs:

[2021-01-21 15:00:01,102: WARNING/ForkPoolWorker-8] Running <TaskInstance: xxxx.task 2021-01-21T14:00:00+00:00 [queued]> on host ip-172-31-3-210.ec2.internal
...
[2021-01-21 15:00:06,599: ERROR/ForkPoolWorker-8] Failed to execute task Task received SIGTERM signal.

What you expected to happen:
No errors should appear in the Worker logs if this SIGTERM/SIGKILL is intended

How to reproduce it:
Use Airflow w/ Celery Executor and Cloudwatch Remote Logging

Anything else we need to know:
Occurs every time, for every task in the DAG

@christmoore christmoore added the kind:bug This is a clearly a bug label Jan 22, 2021
@potiuk potiuk added this to the Airflow 2.0.1 milestone Jan 22, 2021
@vikramkoka vikramkoka added affected_version:2.0 Issues Reported for 2.0 area:logging priority:medium Bug that should be fixed before next release but would not block a release labels Jan 22, 2021
@mtraynham
Contributor

mtraynham commented Jan 23, 2021

One thing to note with this issue: I believe it is also preventing workers from consuming more work, via a 60-second sleep at the end of every task. We noticed that there was a ~30-40 second delay between 2 workers running tasks, as if all workers were busy doing something, but nothing was actually happening. It seemed work had been queued but was not being performed.

After looking at the logs for a given task in CloudWatch (which has the added benefit of including times in the log), there's a 60 second sleep after SIGTERM to when the process is finally killed with SIGKILL.

2021-01-23T15:00:02.682Z | Marking task as SUCCESS. dag_id=xxxxxxxx, task_id=xxxxxxx, execution_date=20210123T140000, start_date=20210123T150001, end_date=20210123T150002
2021-01-23T15:00:02.715Z | 0 downstream tasks scheduled from follow-on schedule check
2021-01-23T15:00:06.606Z | State of this instance has been externally set to success. Terminating instance.
2021-01-23T15:00:06.607Z | Sending Signals.SIGTERM to GPID 155
2021-01-23T15:01:06.617Z | process psutil.Process(pid=155, name='airflow task ru', status='sleeping', started='15:00:01') did not respond to SIGTERM. Trying SIGKILL
2021-01-23T15:01:06.625Z | Process psutil.Process(pid=155, name='airflow task ru', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='15:00:01') (155) terminated with exit code Negsignal.SIGKILL
2021-01-23T15:01:06.625Z | Task exited with return code Negsignal.SIGKILL

I imagine it's the configuration parameter KILLED_TASK_CLEANUP_TIME and its default value of 60 that influences that sleep.

https://github.com/apache/airflow/blob/master/airflow/utils/process_utils.py#L42-L52
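For illustration, here is a minimal sketch (not Airflow's actual implementation, just the same escalation pattern under an assumed 60-second timeout) of how a SIGTERM followed by a bounded wait and then a SIGKILL would produce the gap seen between the two log lines above:

import signal

import psutil

KILLED_TASK_CLEANUP_TIME = 60  # seconds; matches the default mentioned above


def reap_process(pid: int, timeout: int = KILLED_TASK_CLEANUP_TIME) -> None:
    """Send SIGTERM, wait up to `timeout` seconds, then escalate to SIGKILL."""
    proc = psutil.Process(pid)
    proc.send_signal(signal.SIGTERM)
    # wait_procs returns the processes that exited and those still alive
    _, alive = psutil.wait_procs([proc], timeout=timeout)
    for p in alive:
        # Still running after the timeout: this is the "did not respond to
        # SIGTERM. Trying SIGKILL" branch seen in the logs above.
        p.send_signal(signal.SIGKILL)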

@kaxil kaxil removed this from the Airflow 2.0.1 milestone Feb 3, 2021
@kaxil kaxil added area:providers provider:amazon-aws AWS/Amazon - related issues labels Feb 3, 2021
@Miksu82

Miksu82 commented Feb 16, 2021

It also seems that this issue sets all the JobRuns to a failed state, although the TaskInstance is correctly marked as success.

@density

density commented May 26, 2021

We're also experiencing this issue with ECSOperator and CloudWatch logging on Airflow 2.0.2. Changing stopTimeout in our ECS config didn't fix the issue and it doesn't seem to always set the DAG run to failed for us like it does for @Miksu82.

@andormarkus
Contributor

Hi @potiuk, who can help us with this bug?

@potiuk
Member

potiuk commented Jun 28, 2021

Maybe this is something the Amazon team can take a look at, @subashcanapathy? I think that's one of the obvious candidates that someone from AWS could help with, since this is a CloudWatch integration problem.

@ashb
Member

ashb commented Jun 29, 2021

This might be fixed by #16289 -- one thing to try to see if that is the case is to disable the mini scheduler run by setting AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False

@andormarkus
Contributor

Should I set AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False in 2.0.2 or wait till #16289 is released?
Is #16289 part of 2.1.1, or do we have to wait till 2.1.2?

@ashb
Member

ashb commented Jun 29, 2021

Set it to false and see if that fixes the problem. Sadly we missed that in 2.1.1, so it would have to wait for 2.1.2 which would be at least a few weeks.

And if setting that config doesn't help, then there is something else at fault here.

https://airflow.apache.org/docs/apache-airflow/2.0.2/configurations-ref.html#schedule-after-task-execution for reference.

@andormarkus
Contributor

Hi @ashb

I set AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False in Airflow 2.1.2 which does not have #16289.

All the tasks are successful on the Airflow side; however, in Flower they all show as failed.
The logs are showing up in CloudWatch as expected.

My airflow.cfg

[scheduler]
dag_dir_list_interval = 60
parsing_processes = 4
run_duration = 41460
schedule_after_task_execution = False
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

This is what the Celery logs look like for a given worker:

[2021-07-20 09:02:49,057: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'tasks', 'run', 'XXXXXXXX', 'eks.sensor', '2021-07-20T08:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/XXXXXXXX.py']
[2021-07-20 09:02:49,086: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/XXXXXXXX.py
[2021-07-20 09:02:51,442: INFO/ForkPoolWorker-15] Datasets List: 2
[2021-07-20 09:02:51,442: INFO/ForkPoolWorker-15] Start getting tables list from dataset: XXXXXXXX
[2021-07-20 09:02:51,585: INFO/ForkPoolWorker-15] Start getting tables list from dataset: XXXXXXXX
[2021-07-20 09:02:52,380: WARNING/ForkPoolWorker-15] Running <TaskInstance: XXXXXXXX.eks.sensor 2021-07-20T08:00:00+00:00 [queued]> on host airflow-worker-7c6b4f75f9-x67r9

[2021-07-20 09:03:18,710: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 117, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 100, in _execute
    self.task_runner.start()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 92, in _start_by_fork
    logging.shutdown()
  File "/usr/local/lib/python3.8/logging/__init__.py", line 2126, in shutdown
    h.flush()
  File "/home/airflow/.local/lib/python3.8/site-packages/watchtower/__init__.py", line 297, in flush
    q.join()
  File "/usr/local/lib/python3.8/queue.py", line 89, in join
    self.all_tasks_done.wait()
  File "/usr/local/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1286, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

[2021-07-20 09:04:18,738: ERROR/ForkPoolWorker-15] Failed to execute task [Errno 2] No such file or directory: '/tmp/tmpbwn0h8za'.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 117, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 145, in _execute
    self.on_kill()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 166, in on_kill
    self.task_runner.on_finish()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/base_task_runner.py", line 178, in on_finish
    self._error_file.close()
  File "/usr/local/lib/python3.8/tempfile.py", line 499, in close
    self._closer.close()
  File "/usr/local/lib/python3.8/tempfile.py", line 436, in close
    unlink(self.name)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpbwn0h8za'
[2021-07-20 09:04:18,834: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[0a8ca64c-01df-4868-a21d-b369d3f7a6cd] raised unexpected: AirflowException('Celery command failed on host: airflow-worker-7c6b4f75f9-x67r9')
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
    _execute_in_fork(command_to_exec)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-7c6b4f75f9-x67r9

@ephraimbuddy
Contributor

ephraimbuddy commented Jul 20, 2021

@andormarkus can you set this:
AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL=84600 and let me know the result?

If this works for you, you should remove it when 2.1.3 is out.

@andormarkus
Contributor

Hi @ephraimbuddy

I'm still having the same issue. The logs are the same.

airflow.cfg

[scheduler]
dag_dir_list_interval = 60
orphaned_tasks_check_interval = 84600
parsing_processes = 4
run_duration = 41460
schedule_after_task_execution = False
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

@andormarkus
Contributor

When I turned on remote logging to CloudWatch, it caused stability issues as well.
We are running this DAG hourly.

Screen Shot 2021-07-22 at 08 21 27

@john-jac
Contributor

Hi folks, have there been any new insights into this issue?

@kdickinson87

Has there been any movement on this, or any proposed fixes? I'm still getting the temp file location error. We are on 2.0.1, though I spun up a local container of 2.2.0 and still experienced this error, so that leads me to believe it still exists. Any updates?

@ephraimbuddy
Contributor

We have fixed this in #18269, released in 2.2.0.

@andormarkus
Contributor

Hi @ephraimbuddy

I have tested it with Airflow 2.2.2 and the problem still exists.

Example logs 1:

[2021-11-18 21:19:01,060: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[53ca9d3f-b59e-4a3f-9f21-009c32db5473] received
[2021-11-18 21:19:01,082: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'simple_dag', 'sleep', 'scheduled__2021-11-18T21:18:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/simple_dag.py']
[2021-11-18 21:19:01,082: INFO/ForkPoolWorker-16] Celery task ID: 53ca9d3f-b59e-4a3f-9f21-009c32db5473
[2021-11-18 21:19:01,123: INFO/ForkPoolWorker-16] Filling up the DagBag from /opt/airflow/dags/repo/dags/simple_dag.py
[2021-11-18 21:19:01,234: WARNING/ForkPoolWorker-16] Running <TaskInstance: simple_dag.sleep scheduled__2021-11-18T21:18:00+00:00 [queued]> on host airflow-worker-5997488b78-t2ftx
[2021-11-18 21:19:17,631: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[4ccd30db-274f-4f2d-a750-3df8ec6e856c] succeeded in 76.68239335156977s: None
[2021-11-18 21:19:17,852: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
    self.task_runner.start()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
    logging.shutdown()
  File "/usr/local/lib/python3.9/logging/__init__.py", line 2141, in shutdown
    h.flush()
  File "/home/airflow/.local/lib/python3.9/site-packages/watchtower/__init__.py", line 297, in flush
    q.join()
  File "/usr/local/lib/python3.9/queue.py", line 90, in join
    self.all_tasks_done.wait()
  File "/usr/local/lib/python3.9/threading.py", line 312, in wait
    waiter.acquire()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1413, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

@ephraimbuddy ephraimbuddy reopened this Nov 20, 2021
@john-jac
Contributor

Is it possible we're running into a CloudWatch quota?

Per the docs below: "There is a quota of 5 requests per second per log stream. Additional requests are throttled. This quota can't be changed."

https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html

@o-nikolas
Contributor

Hey folks,

I was looking into this today, but I can't seem to reproduce it.

@andormarkus I see in your last post you're using 2.2.2 and are running some simple_dag (specifically a sleep task). I'm assuming that's a test DAG. Do you mind sharing it to help me reproduce the error, along with any other relevant environment configuration you're using? Thanks!

@andormarkus
Contributor

Hi @o-nikolas

We are on helm chart 1.3.0 and Airflow 2.2.2-python3.9. Airflow is running on AWS EKS with the Celery executor (KEDA enabled). The logs attached in my previous comment were from the Celery worker logs.

Please let me know if you need more information.

Relevant section from helm chart configuration

config:
  logging:
    colored_console_log: "False"
    remote_logging: "True"
    remote_log_conn_id: aws_default
    remote_base_log_folder: "cloudwatch://${log_group_arn}"

simple_dag

"""Sample DAG."""

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def sleep() -> bool:
    """Sleep.

    Returns:
        bool: True
    """
    time.sleep(10)
    return True


with DAG("simple_dag", default_args=default_args, schedule_interval="* * * * *", catchup=False) as dag:
    t1 = PythonOperator(task_id="sleep", python_callable=sleep)

@rafidka

rafidka commented Mar 1, 2022

I did a deep dive on this issue and root-caused it.

TL;DR It is related to watchtower and has been fixed in version 2.0.0 and later. Airflow 2.2.4 now uses that version (see #19907), so if you are using it, you should be good. If not, you need to force-install watchtower version 2.0.0 or later. Notice, however, that watchtower made some changes to the CloudWatchLogHandler __init__ method, so you need to update the relevant code.


The issue is related to a combination of three factors: forking + threading + logging. This combination can lead to a deadlock when logs are being flushed after a task finishes execution. This means that the StandardTaskRunner will be stuck at this line. Now, since the task has actually finished (thus its state is success), but the process hasn't yet exited (and never will, since it is in a deadlock state), the heartbeat_callback will end up thinking that the task state was set externally, issue the following warning, and then SIGTERM the task:

State of this instance has been externally set to success. Terminating instance.
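
To make the deadlock concrete, here is a minimal, self-contained sketch (plain Python, not Airflow or watchtower code; names and timings are illustrative only). A background thread drains a queue the way a log shipper would; after a fork(), the child inherits the queue's unfinished-task counter but not the consumer thread, so its q.join() blocks forever, which is the situation logging.shutdown() -> flush() ends up in:

import os
import queue
import signal
import threading
import time

q = queue.Queue()


def consumer() -> None:
    # Stand-in for a thread shipping queued log records to CloudWatch.
    while True:
        q.get()
        time.sleep(0.1)
        q.task_done()


threading.Thread(target=consumer, daemon=True).start()

for i in range(100):
    q.put(f"log record {i}")  # records enqueued before the fork

pid = os.fork()  # Unix-only; only the calling thread exists in the child
if pid == 0:
    # Child: unfinished_tasks was copied by fork(), the consumer thread was
    # not, so this join() never returns -- the hang behind the SIGTERM above.
    q.join()
    os._exit(0)
else:
    time.sleep(3)
    exited_pid, _ = os.waitpid(pid, os.WNOHANG)
    print("child exited" if exited_pid else "child is stuck in q.join() (deadlock)")
    os.kill(pid, signal.SIGKILL)  # clean up, mirroring the SIGKILL escalation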

Notice that this could cause further issues:

  • Could not read remote logs from log_group: This error happens when we don't have the necessary log data in CloudWatch. This can easily happen with a task that writes logs at the end and gets interrupted by the SIGTERM, so no log is published.
  • Celery command failed on host: Obviously, when a SIGTERM is sent, the process will exit with a non-zero code, and Airflow ends up generating this error here.
  • Tasks executing multiple times: In case a Celery Executor + SQS is used (as in Amazon MWAA, for example), and since Airflow uses Celery's ack_late feature (see here), a SIGTERM signal will result in the task message not being deleted from the SQS queue; after its timeout, it will go back to the queue and be picked up again by another worker.


@andormarkus
Contributor

Hi @rafidka, thanks for the update.

Here are my current findings:

Used versions:

airflow: 2.2.4
watchtower: 2.0.1

Test setup: I have been running 5 simple DAGs every minute for the past few days:

Screen Shot 2022-03-01 at 19 31 50

From the Flower perspective everything looks good:
Screen Shot 2022-03-01 at 19 33 12

I have checked the worker logs, and a few times per hour I get the following error messages:

[2022-03-01 18:21:24,954: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 246, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
    self.task_runner.start()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
    logging.shutdown()
  File "/usr/local/lib/python3.9/logging/__init__.py", line 2141, in shutdown
    h.flush()
  File "/home/airflow/.local/lib/python3.9/site-packages/watchtower/__init__.py", line 432, in flush
    q.join()
  File "/usr/local/lib/python3.9/queue.py", line 90, in join
    self.all_tasks_done.wait()
  File "/usr/local/lib/python3.9/threading.py", line 312, in wait
    waiter.acquire()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1415, in signal_handler

The worker logs for a failed task look like this:

▶ kubectl -n airflow logs airflow-worker worker | grep ForkPoolWorker-16

[2022-03-01 19:08:00,654: INFO/ForkPoolWorker-16] Celery task ID: fee17d13-2423-4ed1-ab2f-3f1a3fd34551
[2022-03-01 19:08:00,709: INFO/ForkPoolWorker-16] Filling up the DagBag from /opt/airflow/dags/repo/dags/simple_dag_2.py
[2022-03-01 19:08:00,842: WARNING/ForkPoolWorker-16] Running <TaskInstance: simple_dag_2.sleep scheduled__2022-03-01T19:07:00+00:00 [queued]> on host airflow-worker-58b8d8789b-w7jwv
[2022-03-01 19:08:24,702: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
[2022-03-01 19:08:25,170: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[fee17d13-2423-4ed1-ab2f-3f1a3fd34551] succeeded in 24.535810169007163s: None

@rafidka Where should I find the "State of this instance has been externally set to success. Terminating instance." warning message? I cannot find it in the worker or scheduler logs.

Based on my testing, the "Celery command failed on host" error was fixed in Airflow 2.2.0.

@rafidka

rafidka commented Mar 1, 2022

@andormarkus, hmm, this is interesting. I didn't specifically test Airflow 2.2.4, but I tested 2.0.2 + watchtower 2.0.0 and beyond and couldn't reproduce the issue, so I assumed that since Airflow 2.2.4 is using watchtower 2.0.1, the issue should be resolved. I will see if I can do some testing and try to reproduce this issue.

Where should I find the "State of this instance has been externally set to success. Terminating instance." warning message? I cannot find it in the worker or scheduler logs.

This is a bit tricky, unfortunately. The thing is that when you configure Airflow to use CloudWatch logging, but the logging itself has issues, you are likely to miss some logs. On the other hand, when you stop using CloudWatch logging, the issue itself disappears (i.e., this is a Heisenbug situation). In my case, I had to modify the Airflow source code locally and then use file-based logging like this:

# Added where the "externally set" warning is emitted (the heartbeat_callback
# in airflow/jobs/local_task_job.py); requires "import inspect" and
# "import traceback" at the top of the module.
with open('/tmp/local_task_job.py.log', 'a') as f:
    print(f"State of this instance has been externally set to {ti.state}. ", file=f)
    print('Dumping stack trace:', file=f)
    frame = inspect.currentframe()
    stack_trace = traceback.format_stack(frame)
    print('\n'.join(stack_trace), file=f)

@andormarkus
Contributor

Hi @rafidka,

I'm so sorry, I was looking for this warning in Kubernetes, not in CloudWatch.

Here is the warning in CloudWatch:
Screen Shot 2022-03-01 at 20 43 43

Here is an exported log stream. It is very interesting: CloudWatch does not store the log level (info/warning/error), just the log message.

timestamp,message
1646162883080,Dependencies all met for <TaskInstance: simple_dag_1.sleep scheduled__2022-03-01T19:27:00+00:00 [queued]>
1646162883380,Dependencies all met for <TaskInstance: simple_dag_1.sleep scheduled__2022-03-01T19:27:00+00:00 [queued]>
1646162883380,"
--------------------------------------------------------------------------------"
1646162883380,Starting attempt 1 of 2
1646162883380,"
--------------------------------------------------------------------------------"
1646162883612,Executing <Task(PythonOperator): sleep> on 2022-03-01 19:27:00+00:00
1646162883618,Started process 163492 to run task
1646162883624,"Running: ['airflow', 'tasks', 'run', 'simple_dag_1', 'sleep', 'scheduled__2022-03-01T19:27:00+00:00', '--job-id', '70863', '--raw', '--subdir', 'DAGS_FOLDER/simple_dag_1.py', '--cfg-path', '/tmp/tmpth9fhum_', '--error-file', '/tmp/tmp4e258sys']"
1646162883625,Job 70863: Subtask sleep
1646162883853,Running <TaskInstance: simple_dag_1.sleep scheduled__2022-03-01T19:27:00+00:00 [running]> on host airflow-worker-58b8d8789b-w7jwv
1646162883926,"Exporting the following env vars:
[email protected]
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=simple_dag_1
AIRFLOW_CTX_TASK_ID=sleep
AIRFLOW_CTX_EXECUTION_DATE=2022-03-01T19:27:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-03-01T19:27:00+00:00"
1646162903946,Done. Returned value was: True
1646162903969,"Marking task as SUCCESS. dag_id=simple_dag_1, task_id=sleep, execution_date=20220301T192700, start_date=20220301T192803, end_date=20220301T192823"
1646162904149,State of this instance has been externally set to success. Terminating instance.
1646162904152,Received SIGTERM. Terminating subprocesses.
1646162904152,Sending Signals.SIGTERM to group 163492. PIDs of all processes in the group: [163492]
1646162904152,Sending the signal Signals.SIGTERM to group 163492
1646162904292,"Process psutil.Process(pid=163492, status='terminated', exitcode=1, started='19:28:03') (163492) terminated with exit code 1"

@rafidka

rafidka commented Mar 1, 2022

@andormarkus, what you are reporting above matches exactly the symptoms I've seen when there is a problem with logging. Could you please do a pip freeze and check the reported watchtower version?
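
For example, a quick way to confirm what is actually installed inside the worker image (assuming Python 3.8+; equivalent to grepping the pip freeze output):

# Print the installed Airflow and watchtower versions from inside the
# worker's Python environment.
from importlib.metadata import version

for pkg in ("apache-airflow", "watchtower"):
    print(pkg, version(pkg))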

@rafidka

rafidka commented Mar 1, 2022

I just tried Airflow 2.2.4 and it is working fine for me. On Airflow 2.2.4:

[2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn', 'scheduled__2022-03-01T22:04:03.337259+00:00', '--local', '--subdir', 'DAGS_FOLDER/print.py']
[2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Celery task ID: fd4cedea-bbaa-4d92-a536-e19aa0dfc34d
[2022-03-01 14:05:19,831: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/print.py
[2022-03-01 14:05:19,889: WARNING/ForkPoolWorker-16] Running <TaskInstance: print.execute_fn scheduled__2022-03-01T22:04:03.337259+00:00 [queued]> on host 000464fbb146
[2022-03-01 14:05:21,598: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[fd4cedea-bbaa-4d92-a536-e19aa0dfc34d] succeeded in 1.8837955820199568s: None

On Airflow 2.2.3:

[2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn', 'scheduled__2022-03-01T22:06:39.485677+00:00', '--local', '--subdir', 'DAGS_FOLDER/print.py']
[2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Celery task ID: c646cb26-e28a-4b33-8346-0e4ca4060232
[2022-03-01 14:07:55,978: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/print.py
[2022-03-01 14:07:56,049: WARNING/ForkPoolWorker-16] Running <TaskInstance: print.execute_fn scheduled__2022-03-01T22:06:39.485677+00:00 [queued]> on host e2196b74de7f
[2022-03-01 14:08:01,439: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
    self.task_runner.start()
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
    logging.shutdown()
  File "/usr/lib64/python3.7/logging/__init__.py", line 2036, in shutdown
    h.flush()
  File "/usr/local/lib/python3.7/site-packages/watchtower/__init__.py", line 297, in flush
    q.join()
  File "/usr/lib64/python3.7/queue.py", line 89, in join
    self.all_tasks_done.wait()
  File "/usr/lib64/python3.7/threading.py", line 296, in wait
    waiter.acquire()
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1410, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

For reference, this is the DAG I am testing with:

from datetime import timedelta

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import os
from datetime import datetime

NUM_LINES = 10000
DAG_ID = os.path.basename(__file__).replace(".py", "")

@dag(dag_id=DAG_ID, schedule_interval=timedelta(minutes=1), catchup=False, start_date=days_ago(0), tags=['test'])
def print_dag():
    @task()
    def execute_fn():
        for i in range(0, NUM_LINES):
            print(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])

    execute_fn_t = execute_fn()

test_dag_d = print_dag()

@andormarkus
Contributor

Hi @rafidka,

I'm running 5 'simple_dag.py' DAGs in parallel and I got one error every 5-10 minutes.

I will deploy your DAG tomorrow.

I'm running my code on Airflow 2.2.4 with watchtower 2.0.1.

@rafidka

rafidka commented Mar 1, 2022

I just tried a sleep DAG (similar to yours) and it also succeeded on Airflow 2.2.4:

[2022-03-01 14:25:43,323: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[9f11d899-b415-4541-8e4f-1221fa7b6b09] received
[2022-03-01 14:25:43,394: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'sleep', 'execute_fn', 'scheduled__2022-03-01T22:24:29.637891+00:00', '--local', '--subdir', 'DAGS_FOLDER/sleep.py']
[2022-03-01 14:25:43,394: INFO/ForkPoolWorker-16] Celery task ID: 9f11d899-b415-4541-8e4f-1221fa7b6b09
[2022-03-01 14:25:43,438: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/sleep.py
[2022-03-01 14:25:43,498: WARNING/ForkPoolWorker-16] Running <TaskInstance: sleep.execute_fn scheduled__2022-03-01T22:24:29.637891+00:00 [queued]> on host 1161269d3561
[2022-03-01 14:25:54,463: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[9f11d899-b415-4541-8e4f-1221fa7b6b09] succeeded in 11.133019998000236s: None

This is my DAG:

import time

from datetime import timedelta

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import os

DAG_ID = os.path.basename(__file__).replace(".py", "")

@dag(dag_id=DAG_ID, schedule_interval=timedelta(minutes=1), catchup=False, start_date=days_ago(0), tags=['test'])
def sleep_dag():
    @task()
    def execute_fn():
        time.sleep(10)

    execute_fn_t = execute_fn()

test_dag_d = sleep_dag()

I suspect your setup has some issue (perhaps some stale configuration or package). I would start clean or, even better, use Docker if you aren't already.

@andormarkus
Contributor

andormarkus commented Mar 3, 2022

Hello,

We are running our Airflow on AWS EKS with the latest helm chart and official Docker image. I don't think it is a setup issue.

I was able to reproduce it locally as well with docker compose:

  1. Download the official docker compose file: curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'.

  2. Create a test CloudWatch log group.

  3. Create a dags directory containing simple_dag.py 5 times.

  4. Edit the x-airflow-common.environment section in the docker-compose.yaml file in the following way:
    Leave the default environment variables as they are. Disable the loading of example DAGs by changing the following environment variable from true to false.

    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'

    Add the following environment variables. Adjust your region according to your preferences.

    AWS_DEFAULT_REGION: 'eu-central-1'
    AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
    AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
    AIRFLOW_CONN_AWS_DEFAULT: "aws://?region_name=eu-central-1"
    
    AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: AWS_DEFAULT
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: cloudwatch://<cloudwatch-log-group-arn>
    

    In the end it should look like this:

    environment:
     &airflow-common-env
     AIRFLOW__CORE__EXECUTOR: CeleryExecutor
     AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
     AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
     AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
     AIRFLOW__CORE__FERNET_KEY: ''
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
     AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    
     AWS_DEFAULT_REGION: 'eu-central-1'
     AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
     AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
     AWS_SESSION_TOKEN: ${AWS_SESSION_TOKEN}
     AIRFLOW_CONN_AWS_DEFAULT: "aws://?region_name=eu-central-1"
    
     AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
     AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: AWS_DEFAULT
     AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: cloudwatch://<cloudwatch-log-group-arn>
  5. Insert your AWS environment variables into the terminal (we are using AWS SSO to log in, etc.).

    export AWS_ACCESS_KEY_ID="########"
    export AWS_SECRET_ACCESS_KEY="########"
    export AWS_SESSION_TOKEN="########"
  6. Set the right Airflow user: echo -e "AIRFLOW_UID=$(id -u)" > .env

  7. Start the docker compose environment: docker compose up -d

  8. Log in to the web UI at http://localhost:8080/ and enable the DAGs.
    Screen Shot 2022-03-03 at 09 29 51

  9. Check the worker logs after a minimum of 60 minutes. It happens in only 1-2% of the DAG runs.

    ▶ docker logs airflow_dag_test-airflow-worker-1 --since=60m 2>&1 | grep ERROR
    [2022-03-03 08:34:21,817: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
    [2022-03-03 08:38:22,608: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
    [2022-03-03 09:01:21,553: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
    [2022-03-03 09:11:21,887: ERROR/ForkPoolWorker-3] Failed to execute task Task received SIGTERM signal.
    [2022-03-03 09:11:21,898: ERROR/ForkPoolWorker-2] Failed to execute task Task received SIGTERM signal.
    [2022-03-03 09:15:21,876: ERROR/ForkPoolWorker-15] Failed to execute task Task received SIGTERM signal.
    [2022-03-03 09:15:21,876: ERROR/ForkPoolWorker-1] Failed to execute task Task received SIGTERM signal.

@rafidka

rafidka commented Mar 3, 2022

@andormarkus , to avoid making assumptions, could you please share the exact code of simple_dag.py you used in your reproduction?

Also, it is worth noting that, based on the setup you mentioned above, the local executor will be used instead of the Celery executor. The testing I did was on the Celery executor.

@andormarkus
Contributor

@rafidka, I think you have misunderstood point 4 in my setup guide. I meant: leave the default environment variables as they are, just change AIRFLOW__CORE__LOAD_EXAMPLES from 'true' to 'false', and extend them with the new ones. I have edited that comment.

simple_dag.py is the same as before.

"""Sample DAG."""

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def sleep() -> bool:
    """Sleep.

    Returns:
        bool: True
    """
    time.sleep(20)
    return True


with DAG("simple_dag_1", default_args=default_args, schedule_interval="* * * * *", catchup=False) as dag:
    t1 = PythonOperator(task_id="sleep", python_callable=sleep)

@john-jac
Contributor

john-jac commented Mar 4, 2022

Thanks @andormarkus! Are you able to reproduce it when using the Celery executor?

@eladkal
Contributor

eladkal commented Sep 30, 2022

Thanks to everyone trying to get to the root cause of this issue.
According to several reports here, this issue is resolved and no longer reproducible, so I'm closing it.

@eladkal eladkal closed this as completed Sep 30, 2022