State of this instance has been externally set to up_for_retry. Terminating instance. #16573

Closed
sodafountain opened this issue Jun 21, 2021 · 7 comments · Fixed by #19375
Labels
affected_version:2.1 Issues Reported for 2.1 area:core kind:bug This is a clearly a bug

Comments

@sodafountain

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.18.14

Environment:

Cloud provider or hardware configuration: Azure
OS (e.g. from /etc/os-release):
Kernel (e.g. uname -a):
Install tools:
Others:

What happened:

Occasionally, an Airflow task fails with the following error:

[2021-06-21 05:39:48,424] {local_task_job.py:184} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2021-06-21 05:39:48,425] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 259
[2021-06-21 05:39:48,426] {taskinstance.py:1238} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-06-21 05:39:48,426] {bash.py:185} INFO - Sending SIGTERM signal to bash process group
[2021-06-21 05:39:49,133] {process_utils.py:66} INFO - Process psutil.Process(pid=329, status='terminated', started='04:32:14') (329) terminated with exit code None
[2021-06-21 05:39:50,278] {taskinstance.py:1454} ERROR - Task received SIGTERM signal
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1284, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1309, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/bash.py", line 171, in execute
    for raw_line in iter(self.sub_process.stdout.readline, b''):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1240, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

There is no indication of what caused this error. The worker instance was healthy and the task did not hit its timeout.

What you expected to happen:

The task should complete successfully. If a task had to fail for an unavoidable reason (such as a timeout), it would be helpful to report the reason for the failure.

How to reproduce it:

I'm not able to reproduce it consistently. It happens every now and then with the same error as provided above.

I would also like to know how to debug these failures.

@sodafountain sodafountain added the kind:bug This is a clearly a bug label Jun 21, 2021
@eladkal
Contributor

eladkal commented Jul 1, 2021

Does it happen to all tasks or a specific task/dag?
Without being able to reproduce the issue it will be impossible to understand what needs to be fixed.

@peay

peay commented Jul 26, 2021

@eladkal not sure if it is the same issue, but I am seeing the same symptom on 2.1.1, and can reproduce easily by letting my DAGs run for an hour or two.

This affects tasks randomly in my DAGs, usually after they have been running for a few minutes. I have a single Celery worker pod, which was healthy whenever this has occurred so far. The Airflow audit log table has no indication that the task state was changed.

Here's a sample from a task log, where I have activated SQLAlchemy query logging.

# Task starting up
--------------------------------------------------------------------------------
[2021-07-23 11:11:51,133] {taskinstance.py:1088} INFO - Starting attempt 1 of 4
[2021-07-23 11:11:51,133] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-07-23 11:11:51,143] {base.py:727} INFO - BEGIN (implicit)
[2021-07-23 11:11:51,144] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
FROM task_instance 
WHERE task_instance.task_id = %(param_1)s AND task_instance.dag_id = %(param_2)s AND task_instance.execution_date = %(param_3)s
[2021-07-23 11:11:51,144] {base.py:1239} INFO - {'param_1': 'task-name-1', 'param_2': 'dag-name-1', 'param_3': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC'))}
[2021-07-23 11:11:51,147] {base.py:1234} INFO - INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s) RETURNING log.id
[2021-07-23 11:11:51,148] {base.py:1239} INFO - {'dttm': datetime.datetime(2021, 7, 23, 11, 11, 51, 133209, tzinfo=Timezone('UTC')), 'dag_id': 'dag-name-1', 'task_id': 'task-name-1', 'event': 'running', 'execution_date': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'owner': 'airflow', 'extra': None}
[2021-07-23 11:11:51,150] {base.py:1234} INFO - UPDATE task_instance SET start_date=%(start_date)s, state=%(state)s, try_number=%(try_number)s, hostname=%(hostname)s, job_id=%(job_id)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.execution_date = %(task_instance_execution_date)s
[2021-07-23 11:11:51,150] {base.py:1239} INFO - {'start_date': datetime.datetime(2021, 7, 23, 11, 11, 51, 122723, tzinfo=Timezone('UTC')), 'state': 'running', 'try_number': 1, 'hostname': 'airflow-worker-general-b899585b8-s5lx9', 'job_id': 773, 'task_instance_task_id': 'task-name-1', 'task_instance_dag_id': 'dag-name-1', 'task_instance_execution_date': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC'))}
[2021-07-23 11:11:51,151] {base.py:769} INFO - COMMIT

# At this point, the task state has been set and committed to `running`
# Omitting a few minutes of logs

...

# Task has been running for a few minutes OK
[2021-07-23 11:15:51,821] {base.py:769} INFO - COMMIT
[2021-07-23 11:15:51,838] {base.py:727} INFO - BEGIN (implicit)

# This apparently correctly says task instance state is running, since we don't stop here.
[2021-07-23 11:15:51,839] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
FROM task_instance 
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s 
 LIMIT %(param_1)s
[2021-07-23 11:15:51,839] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1', 'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}
[2021-07-23 11:15:51,842] {base.py:769} INFO - COMMIT

# Task-specific logs
[2021-07-23 11:15:53,022] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
[2021-07-23 11:15:58,030] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
[2021-07-23 11:16:03,037] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
[2021-07-23 11:16:08,044] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
[2021-07-23 11:16:13,054] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
[2021-07-23 11:16:18,062] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING

[2021-07-23 11:16:21,863] {base.py:727} INFO - BEGIN (implicit)

# Still OK
[2021-07-23 11:16:21,863] {base.py:1234} INFO - SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
FROM job 
WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
[2021-07-23 11:16:21,864] {base.py:1239} INFO - {'param_1': 773, 'job_type_1': 'LocalTaskJob'}
[2021-07-23 11:16:21,866] {base.py:1234} INFO - UPDATE job SET latest_heartbeat=%(latest_heartbeat)s WHERE job.id = %(job_id)s
[2021-07-23 11:16:21,866] {base.py:1239} INFO - {'latest_heartbeat': datetime.datetime(2021, 7, 23, 11, 15, 51, 821303, tzinfo=Timezone('UTC')), 'job_id': 773}
[2021-07-23 11:16:21,867] {base.py:769} INFO - COMMIT
[2021-07-23 11:16:21,879] {base.py:727} INFO - BEGIN (implicit)
[2021-07-23 11:16:21,880] {base.py:1234} INFO - SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
FROM job 
WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
[2021-07-23 11:16:21,880] {base.py:1239} INFO - {'param_1': 773, 'job_type_1': 'LocalTaskJob'}
[2021-07-23 11:16:21,882] {base.py:769} INFO - COMMIT
[2021-07-23 11:16:21,894] {base.py:727} INFO - BEGIN (implicit)

# This time, 30s later, it seems this says the task instance is `up_for_retry`, but why?
# I believe this query is https://github.com/apache/airflow/blob/2.1.1/airflow/jobs/local_task_job.py#L181
[2021-07-23 11:16:21,895] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
FROM task_instance 
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s 
 LIMIT %(param_1)s
[2021-07-23 11:16:21,895] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1', 'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}
[2021-07-23 11:16:21,898] {base.py:769} INFO - COMMIT

# Boom
[2021-07-23 11:16:21,899] {local_task_job.py:199} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2021-07-23 11:16:21,900] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 9303
[2021-07-23 11:16:21,900] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.

I have reviewed the logs for the scheduler, web server and worker with SQLAlchemy query logging as well in order to try and determine where the state is being altered, and found nothing... there is no UPDATE that sets the state to up_for_retry, whether for this DAG/task, or in general over all task instances -- there's just never a query parameter state set to up_for_retry for UPDATEs that I could see. I can provide those logs if needed.

At this stage, I am rather puzzled as to what is going on here. Are there more logs/checks I could enable to understand what's going on?
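
A search like the one described above can be scripted. The following is a minimal sketch that assumes the SQLAlchemy log layout shown in the excerpt (the statement on one line, its parameter dict on the next); `find_state_updates` is a hypothetical helper, not part of Airflow. It only helps if the logs of every component that writes to the database are scanned.

```python
import re

# Matches the "state" entry in the parameter dict that SQLAlchemy logs
# on the line right after a statement, e.g. {'state': 'running', ...}
STATE_PARAM = re.compile(r"'state': '(\w+)'")
TIMESTAMP = re.compile(r"^\[([^\]]+)\]")


def find_state_updates(lines):
    """Return (timestamp, state) for each UPDATE task_instance that sets a state."""
    updates = []
    pending = False  # True when the previous line was an UPDATE task_instance statement
    for line in lines:
        if pending:
            match = STATE_PARAM.search(line)
            if match:
                ts = TIMESTAMP.match(line)
                updates.append((ts.group(1) if ts else None, match.group(1)))
        pending = "UPDATE task_instance SET" in line
    return updates


# Shortened versions of two lines from the task log above.
sample = [
    "[2021-07-23 11:11:51,150] {base.py:1234} INFO - UPDATE task_instance SET state=%(state)s WHERE ...",
    "[2021-07-23 11:11:51,150] {base.py:1239} INFO - {'state': 'running', 'try_number': 1}",
]
state_updates = find_state_updates(sample)
```

Filtering `state_updates` for `up_for_retry` or `failed` then shows whether any scanned component issued the change.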

@peay

peay commented Jul 27, 2021

After more investigation, I was able to get to the bottom of it. I had missed the actual SQLAlchemy queries updating the task instance/task fail, as they are not performed by either the scheduler, web server or worker, but by the DAG processor (as part of file-level callbacks), whose logs are only available locally in my setup.

In my case, the scheduler thinks the job is a zombie. I had also missed that because the log message is at INFO level and includes neither the task ID nor the DAG ID. Maybe the representation of the task instance should be used to provide more context and make searching logs easier?

[2021-07-27 09:54:44,249] {dag_processing.py:1156} INFO
   Detected zombie job:
   {'full_filepath': '/opt/dags/dags.py', 'msg': 'Detected as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f7362fe5510>, 'is_failure_callback': True}
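
To illustrate the kind of message that would be easier to search for, here is a small sketch. `SimpleTaskInstanceStub` and `describe_zombie` are hypothetical names introduced only for this example; they are not Airflow's classes or its actual log format.

```python
from dataclasses import dataclass


@dataclass
class SimpleTaskInstanceStub:
    """Hypothetical stand-in that carries only the identifying fields."""

    dag_id: str
    task_id: str
    execution_date: str


def describe_zombie(ti):
    """Build a log message that names the affected task instance."""
    return (
        "Detected zombie job: "
        f"dag_id={ti.dag_id}, task_id={ti.task_id}, execution_date={ti.execution_date}"
    )


message = describe_zombie(
    SimpleTaskInstanceStub("dag-name-1", "task-name-1", "2021-07-13T19:00:00+00:00")
)
```

With the identifiers in the message, a plain text search for the DAG or task name finds the zombie detection directly.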

The scheduler is marking the job as failed in adopt_or_reset_orphaned_tasks, which marks as failed all jobs that have not sent a heartbeat in the last scheduler_health_check_threshold=30s.

This is apparently caused by a combination of two facts:

  • I had set job_heartbeat_sec to 30s, to avoid too much pressure on the database as my jobs are long.

  • Whenever the job heartbeats, it sets latest_heartbeat to 30s in the past, as shown in the following database logs. I am not sure if this is on purpose or a bug, but it certainly looks suspicious.

    2021-07-27 09:53:21 UTC  UPDATE job SET latest_heartbeat='2021-07-27T09:52:51.398617+00:00'::timestamptz WHERE job.id = 4451
    2021-07-27 09:53:51 UTC  UPDATE job SET latest_heartbeat='2021-07-27T09:53:21.480933+00:00'::timestamptz WHERE job.id = 4451
    2021-07-27 09:54:21 UTC  UPDATE job SET latest_heartbeat='2021-07-27T09:53:51.566774+00:00'::timestamptz WHERE job.id = 4451

In this example, adopt_or_reset_orphaned_tasks then ran 15s after the last heartbeat and did:

2021-07-27 09:54:36 UTC  UPDATE job SET state='failed' WHERE job.state = 'running' AND job.latest_heartbeat < '2021-07-27T09:54:06.555688+00:00'::timestamptz
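
The arithmetic behind this example can be checked with a few lines of Python. The timestamps are copied from the database logs above; the write time of the last heartbeat is only known to the second, so the "real" age is approximate.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc

# Values copied from the database logs above.
heartbeat_written_at = datetime(2021, 7, 27, 9, 54, 21, tzinfo=UTC)      # log line time, second precision
latest_heartbeat = datetime(2021, 7, 27, 9, 53, 51, 566774, tzinfo=UTC)  # value actually stored
cutoff = datetime(2021, 7, 27, 9, 54, 6, 555688, tzinfo=UTC)             # from the UPDATE ... 'failed'

# The cutoff is "now minus 30s", so the check itself ran 30s after the cutoff.
check_ran_at = cutoff + timedelta(seconds=30)

real_age = check_ran_at - heartbeat_written_at  # roughly 15.6s since the job last heartbeat
recorded_age = check_ran_at - latest_heartbeat  # roughly 45s according to the stored value

# The job matches the WHERE clause even though it heartbeat about 15s earlier.
marked_failed = latest_heartbeat < cutoff
```

The stored value makes the job look about 45s stale, which is past the 30s threshold, while the job had in fact written a heartbeat roughly 15s before the check.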

From my limited understanding, there seem to be two issues:

  • latest_heartbeat is set to 30s in the past instead of the current time point, as I would expect.

  • Although setting job_heartbeat_sec=30s makes things worse by increasing the odds of this occurring, it seems like this can occur as soon as job_heartbeat_sec > 15s. Generally, it seems odd that job_heartbeat_sec is not used in adopt_or_reset_orphaned_tasks instead of scheduler_health_check_threshold. In particular, if job_heartbeat_sec is much larger than scheduler_health_check_threshold, then won't adopt_or_reset_orphaned_tasks fail most jobs?
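
The 15s figure follows from a simple model, sketched below under the assumption (taken from the logs above, not from the Airflow source) that each heartbeat stores the time of the previous beat. The function names and the `lag_beats` parameter are hypothetical.

```python
def max_recorded_age(job_heartbeat_sec, lag_beats=1):
    """Oldest that the stored heartbeat of a healthy job can look, in seconds.

    Just before the next heartbeat, the stored value is one full interval old,
    plus lag_beats further intervals if each write stores an earlier beat's time.
    """
    return job_heartbeat_sec * (1 + lag_beats)


def healthy_job_can_be_failed(job_heartbeat_sec, threshold_sec=30, lag_beats=1):
    """True if a job that heartbeats on schedule can still exceed the threshold."""
    return max_recorded_age(job_heartbeat_sec, lag_beats) > threshold_sec


# With the observed one-beat lag and a 30s threshold:
at_15s = healthy_job_can_be_failed(15)  # False: stored value is at most 30s old
at_16s = healthy_job_can_be_failed(16)  # True: stored value can be 32s old
at_30s = healthy_job_can_be_failed(30)  # True: stored value can be 60s old
```

Under this model, even without the lag (`lag_beats=0`), any interval above the threshold would still put a healthy job at risk.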

@sodafountain
Author

sodafountain commented Jul 28, 2021

Thanks for your detailed explanation @peay

This makes a lot of sense. I think setting AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL to an appropriate value may help reduce these false positives.

In my case, I was connecting to the database through a load balancer, which sometimes ran out of ports and dropped connections, causing database connections from Airflow to fail. Some of those failed connections happened to be the heartbeats from these tasks. Since the database connection failures were fixed, I have not seen this error in my setup.

Thanks once again.

@collinmcnulty
Contributor

I just experienced this as well and was able to use @peay's answer. Perhaps there should be some sort of warning or check to make sure job_heartbeat_sec is no higher than scheduler_health_check_threshold
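
A check along those lines could look like the sketch below. It is standalone and reads an airflow.cfg-style text with the standard library rather than Airflow's own configuration API; it assumes both options live in the `[scheduler]` section, and `heartbeat_config_warnings` is a hypothetical helper.

```python
import configparser


def heartbeat_config_warnings(cfg_text):
    """Return warnings for heartbeat settings that risk false zombie detection."""
    # interpolation=None so that literal % characters in the file are left alone
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)

    heartbeat = parser.getfloat("scheduler", "job_heartbeat_sec")
    threshold = parser.getfloat("scheduler", "scheduler_health_check_threshold")

    warnings = []
    if heartbeat > threshold:
        warnings.append(
            f"job_heartbeat_sec ({heartbeat:g}s) is higher than "
            f"scheduler_health_check_threshold ({threshold:g}s); "
            "running jobs may be marked as failed between heartbeats"
        )
    return warnings


risky_cfg = """
[scheduler]
job_heartbeat_sec = 60
scheduler_health_check_threshold = 30
"""
risky_warnings = heartbeat_config_warnings(risky_cfg)
```

Given the heartbeat lag reported earlier in this thread, a stricter bound (for example, twice the heartbeat interval below the threshold) may be needed to rule out false positives entirely.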

@eladkal
Contributor

eladkal commented Aug 26, 2021

I just experienced this as well and was able to use @peay's answer. Perhaps there should be some sort of warning or check to make sure job_heartbeat_sec is no higher than scheduler_health_check_threshold

I didn't review the discussion here yet but if you have a fix (documentation / code change) we welcome PRs.

@crazyproger

Just found a failed task with exactly this message on Airflow 2.1.3.
End of the log for the first task attempt:

...
[2021-09-02 15:38:18,542] {python.py:151} INFO - Done. Returned value was: None
[2021-09-02 15:38:18,554] {taskinstance.py:1218} INFO - Marking task as SUCCESS. dag_id=<dag_id>, task_id=process, execution_date=20210809T150000, start_date=20210902T153811, end_date=20210902T153818
[2021-09-02 15:38:18,641] {local_task_job.py:151} INFO - Task exited with return code 0
[2021-09-02 15:38:18,651] {taskinstance.py:1512} INFO - Marking task as FAILED. dag_id=<dag_id>, task_id=process, execution_date=20210809T150000, start_date=20210902T153818, end_date=20210902T153818
[2021-09-02 15:38:18,749] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

Second attempt log (full):

*** Log file does not exist: /opt/airflow/logs/ <dag_id>/process/2021-08-09T15:00:00+00:00/2.log
*** Fetching from: http://<worker_host>:8793/log/ <dag_id>/process/2021-08-09T15:00:00+00:00/2.log

[2021-09-02 15:38:18,574] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: <dag_id>.process 2021-08-09T15:00:00+00:00 [queued]>
[2021-09-02 15:38:18,603] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance:  <dag_id>.process 2021-08-09T15:00:00+00:00 [queued]>
[2021-09-02 15:38:18,603] {taskinstance.py:1094} INFO - 
--------------------------------------------------------------------------------
[2021-09-02 15:38:18,603] {taskinstance.py:1095} INFO - Starting attempt 2 of 2
[2021-09-02 15:38:18,604] {taskinstance.py:1096} INFO - 
--------------------------------------------------------------------------------
[2021-09-02 15:38:18,619] {taskinstance.py:1114} INFO - Executing <Task(PythonOperator): process> on 2021-08-09T15:00:00+00:00
[2021-09-02 15:38:18,625] {standard_task_runner.py:52} INFO - Started process 18717 to run task
[2021-09-02 15:38:18,628] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', ' <dag_id>', 'process', '2021-08-09T15:00:00+00:00', '--job-id', '46046825', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/datalocker/upload_dags/inapps_hourly_6.py', '--cfg-path', '/tmp/tmpl8uwjzk1', '--error-file', '/tmp/tmpiq4kx8a0']
[2021-09-02 15:38:18,629] {standard_task_runner.py:77} INFO - Job 46046825: Subtask process
[2021-09-02 15:38:23,682] {local_task_job.py:209} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2021-09-02 15:38:23,684] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 18717
[2021-09-02 15:38:27,059] {process_utils.py:66} INFO - Process psutil.Process(pid=18717, status='terminated', exitcode=1, started='15:38:18') (18717) terminated with exit code 1
