
Fix on_failure_callback when task receives SIGKILL #15537

Merged: 4 commits, May 5, 2021

Conversation

ephraimbuddy
Contributor

@ephraimbuddy ephraimbuddy commented Apr 26, 2021

This PR fixes a case where a task would not call the on_failure_callback
when it was killed, for example by the OOM killer. The issue was that the task PID was being set
in the wrong place, so the local task job heartbeat was not checking the
correct PID of the process runner and task.

Now, instead of being set in the check_and_change_state_before_execution method,
the task PID is set correctly in the _run_raw_task method.

Closes: #11086
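The change can be sketched roughly as follows. This is a minimal, hypothetical illustration of the idea, not the actual Airflow code; the class and method names are simplified stand-ins for the real `TaskInstance` methods:

```python
import os


class TaskInstance:
    """Hypothetical sketch of the relevant TaskInstance state (heavily simplified)."""

    def __init__(self):
        self.pid = None

    def check_and_change_state_before_execution(self):
        # Before this PR, the PID was recorded here. This method runs in a
        # supervising process, so the recorded PID could point at the wrong
        # process, and the heartbeat's PID comparison was unreliable.
        pass

    def _run_raw_task(self):
        # After this PR, the PID is recorded here, in the process that
        # actually executes the task, so the heartbeat comparison is
        # meaningful even when the task is killed with SIGKILL.
        self.pid = os.getpid()


ti = TaskInstance()
ti.check_and_change_state_before_execution()
assert ti.pid is None  # no PID recorded prematurely
ti._run_raw_task()
assert ti.pid == os.getpid()  # PID recorded where the task actually runs
```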



@boring-cyborg bot added the "area:Scheduler" label Apr 26, 2021
@ephraimbuddy ephraimbuddy requested a review from houqp April 26, 2021 17:46
@kaxil kaxil added this to the Airflow 2.0.3 milestone Apr 28, 2021
tests/jobs/test_local_task_job.py: review thread (outdated, resolved)
@uranusjr
Member

Can test_process_kill_call_on_failure_callback and test_process_sigkill_call_on_failure_callback be combined with pytest.mark.parametrize()? They look much too similar to me. (Potentially test_mark_success_on_success_callback as well.)

@ephraimbuddy
Contributor Author

> Can test_process_kill_call_on_failure_callback and test_process_sigkill_call_on_failure_callback be combined with pytest.mark.parametrize()? They look much too similar to me. (Potentially test_mark_success_on_success_callback as well.)

I have parametrized them, except test_mark_success_on_success_callback, because of the many if statements I had to deal with.
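Combining two near-identical tests with parametrize can be sketched like this. The test name, signal values, and body below are hypothetical stand-ins, not the actual tests from this PR; the real tests spawn a task process and deliver the signal to it:

```python
import signal

import pytest


# Hypothetical sketch: one test body parametrized over the signal sent to
# the task, replacing two tests that differed only in the signal used.
@pytest.mark.parametrize(
    "sig", [signal.SIGTERM, signal.SIGKILL], ids=["sigterm", "sigkill"]
)
def test_signal_calls_on_failure_callback(sig):
    received = []

    def on_failure_callback(context):
        received.append(context["signal"])

    # Stand-in for "run the task and kill it with `sig`"; the real test
    # would start a LocalTaskJob and send the signal to the task process.
    on_failure_callback({"signal": sig})

    assert received == [sig]
```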

@github-actions bot added the "full tests needed" label May 5, 2021
@github-actions

github-actions bot commented May 5, 2021

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@ashb ashb merged commit 817b599 into apache:master May 5, 2021
@ashb ashb deleted the fix-task-sigkill branch May 5, 2021 20:02
@potiuk potiuk removed this from the Airflow 2.0.3 milestone May 9, 2021
@huozhanfeng
Contributor

huozhanfeng commented Jun 16, 2021

This PR causes DAGs that use Hive operators to fail because they can't get the right PID.

The Hive task's process tree looks like this:

1. airflow  30539  1721  9 16:31 ?        00:00:00 airflow task supervisor: ['airflow', 'tasks', 'run', 'GAEA_hive_task_test'...
2. root     30542 30539  0 16:31 ?        00:00:00 sudo -E -H -u user airflow tasks run GAEA_hive_task_test...
3. user+    30544 30542 48 16:31 ?        00:00:02 /usr/local/python3/bin/python3.7 /bin/airflow tasks run GAEA_hive_task_test...
4. user+    30555 30544 99 16:31 ?        00:00:07 /usr/local/java/bin/java -Xmx1024m -Dhdp.version=2.6.5.0-292 -Djava.net.preferIPv4Stack=true ...

The error log is as follows:

[2021-06-19 16:31:52,619] {hive.py:256} INFO - Logging initialized using configuration in file:/etc/hive/2.6.5.0-292/0/hive-log4j.properties
[2021-06-19 16:31:54,304] {local_task_job.py:193} WARNING - Recorded pid 30544 does not match the current pid 30542
[2021-06-19 16:31:54,310] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 30542
[2021-06-19 16:31:54,326] {taskinstance.py:1264} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-06-19 16:31:54,326] {logging_mixin.py:104} INFO - Killing the Hive job
[2021-06-19 16:32:54,355] {process_utils.py:113} WARNING - process psutil.Process(pid=30542, name='sudo', status='sleeping', started='16:31:48') did not respond to SIGTERM. Trying SIGKILL
[2021-06-19 16:32:54,357] {process_utils.py:113} WARNING - process psutil.Process(pid=30555, name='java', status='zombie', started='16:31:50') did not respond to SIGTERM. Trying SIGKILL
[2021-06-19 16:32:54,357] {process_utils.py:113} WARNING - process psutil.Process(pid=30544, name='airflow', status='sleeping', started='16:31:48') did not respond to SIGTERM. Trying SIGKILL
[2021-06-19 16:32:54,375] {process_utils.py:66} INFO - Process psutil.Process(pid=30542, name='sudo', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='16:31:48') (30542) terminated with exit code Negsignal.SIGKILL
[2021-06-19 16:32:54,403] {process_utils.py:66} INFO - Process psutil.Process(pid=30555, name='java', status='terminated', started='16:31:50') (30555) terminated with exit code None
[2021-06-19 16:32:54,403] {process_utils.py:66} INFO - Process psutil.Process(pid=30544, name='airflow', status='terminated', started='16:31:48') (30544) terminated with exit code None
[2021-06-19 16:32:54,404] {standard_task_runner.py:130} ERROR - Job 1354 was killed before it finished (likely due to running out of memory)

The demo Hive operator task runs successfully after reverting all the code of this PR, so please consider improving or reverting the PR.
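The "Recorded pid ... does not match" warning in the log above is consistent with the heartbeat comparing the wrong ends of the process tree when run_as_user impersonation inserts a `sudo` process between the runner and the task. A rough sketch of that comparison, using the PIDs from the process tree above (this is a hypothetical simplification, not the real LocalTaskJob heartbeat code):

```python
# With impersonation the tree is: supervisor -> sudo -> airflow task.
# The PID recorded by the task itself (the grandchild) then differs from
# the PID the runner knows about (its direct child, the sudo process).
SUPERVISOR_PID = 30539
SUDO_PID = 30542  # runner's direct child: what the runner sees
TASK_PID = 30544  # process actually running the task: what gets recorded


def heartbeat_pid_mismatch(recorded_pid, runner_child_pid):
    """Return True if the heartbeat would kill the job over a PID mismatch."""
    if recorded_pid != runner_child_pid:
        print(
            f"Recorded pid {recorded_pid} does not match "
            f"the current pid {runner_child_pid}"
        )
        return True
    return False


# Without impersonation the two PIDs agree and the task is left alone:
assert heartbeat_pid_mismatch(TASK_PID, TASK_PID) is False
# With impersonation they differ, so a healthy task gets SIGTERMed:
assert heartbeat_pid_mismatch(TASK_PID, SUDO_PID) is True
```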

@ephraimbuddy
Contributor Author

> The PR will cause dags that use hive operations to fail because they can't get the right PID.
>
> the hive task process info like these
>
> 1. airflow 20391 20170 11 17:41 ? 00:00:00 airflow task supervisor: ['airflow...
> 2. root 20394 20391 0 17:41 ? 00:00:00 sudo -E -H -u user airflow tasks run ...
> 3. user+ 20396 20394 61 17:41 ? 00:00:02 /usr/local/python3/bin/python3.7 /bin/airflow tasks run...
> 4. user+ 20407 20396 99 17:41 ? 00:00:04 /usr/local/java/bin/java -Xmx1024m -Dhdp.version=2.6.5.0-292 -Djava.net.preferIPv4...
>
> The error log as follows:
>
> [2021-06-15 16:40:10,222] {hive.py:256} INFO - Logging initialized using configuration in file:/etc/hive/2.6.5.0-292/0/hive-log4j.properties
> [2021-06-15 16:40:11,871] {local_task_job.py:193} WARNING - Recorded pid 30962 does not match the current pid 30960
> [2021-06-15 16:40:11,877] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 30960
> [2021-06-15 16:40:11,901] {taskinstance.py:1264} ERROR - Received SIGTERM. Terminating subprocesses.
> [2021-06-15 16:40:11,902] {logging_mixin.py:104} INFO - Killing the Hive job
>
> The demo with hive operation runs success after deleting all the code of this PR. So please consider improving the PR or repeal it.

Interesting. cc: @ashb

@huozhanfeng
Contributor

huozhanfeng commented Jun 18, 2021

> Interesting. cc: @ashb

@ashb @jedcunningham hello, could you please help to take a look? thanks.

@ashb
Member

ashb commented Jun 18, 2021

In that example I don't see pid 30960 anywhere -- do you know what process it was?

I suspect this might be to do with impersonation, rather than anything hive in particular.

@huozhanfeng
Contributor

huozhanfeng commented Jun 19, 2021

> In that example I don't see pid 30960 anywhere -- do you know what process it was?
>
> I suspect this might be to do with impersonation, rather than anything hive in particular.

@ashb Sorry for my mistake of copying the wrong logs; I have updated the comment using a new Airflow task. I agree with your suspicion: in my environment, shell-type tasks run normally, while tasks that spawn subprocesses all fail to run.

@ashb ashb added this to the Airflow 2.1.2 milestone Jun 21, 2021
@ephraimbuddy
Contributor Author

Hi @huozhanfeng, I'm taking a look at fixing this, but I would appreciate it if you could give me reproducible steps using the Bash or Python operator.

@huozhanfeng
Contributor

> Hi @huozhanfeng, I'm taking a look at fixing this but would like it if you can give me a reproducible step using Bash or python operator.

Sure. In my opinion, you can mock a Bash or Python operator that spawns a Java subprocess to reproduce it. In my environment, Python operators that use the Apache Sqoop tool fail to run, and Hive operators fail to run.
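A minimal reproduction along those lines might look like the DAG sketch below. This is hypothetical: the dag_id, task_id, command, and run_as_user value are placeholders, and it assumes a worker where run_as_user triggers the sudo-based impersonation path that inserts the extra process between the runner and the task:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical reproduction sketch: a Bash task run under impersonation,
# so the runner's direct child is a `sudo` process rather than the task.
with DAG(
    dag_id="repro_pid_mismatch",  # placeholder name
    start_date=datetime(2021, 6, 1),
    schedule_interval=None,
) as dag:
    spawn_subprocess = BashOperator(
        task_id="spawn_subprocess",
        # Placeholder command: any child process that outlives a few
        # heartbeats should do; it does not have to be Java or Sqoop.
        bash_command="sleep 120",
        run_as_user="someuser",  # triggers the sudo impersonation path
    )
```

If the bug is impersonation-related rather than Hive-specific, this task should be killed with a "Recorded pid ... does not match the current pid" warning a few heartbeats after it starts.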

Linked issue: on_failure_callback not called when task receives termination signal