Fix case where Executors fail to report failure, creating infinite loop #1220
Finally tracked this one down! Closes #1199
When tasks run, they try to catch any errors and, if there are any, mark themselves failed. In addition, `SequentialExecutor` and `LocalExecutor` both use `subprocess.Popen` to run Airflow commands, then `wait()` for the command to return. If an exception is raised, they mark the task as failed; otherwise they mark it as a success.

The problem here is that `Popen` doesn't actually raise errors -- you have to check the `returncode` explicitly. That means that Executors always report `SUCCESS` for their commands, even when the command fails.

This is usually not an issue because a task's own status takes precedence over the executor's status: if the task reports failure and the executor reports success, failure wins. However, this IS a problem (as described in #1199) when an error is raised before the task even has a chance to run. In that case, the task reports no status and the executor reports `SUCCESS`. There is a trap for this case in `jobs.py`, but unfortunately it leads to an infinite loop because the task is simply rescheduled.

This PR:
- Modifies `SequentialExecutor` and `LocalExecutor` to use `subprocess.check_call()` instead of `subprocess.Popen().wait()`. `check_call` raises `CalledProcessError` if the command's returncode is not 0 OR if there is a genuine error calling the function.
- Modifies `jobs.py` to check whether an `airflow run` command has failed more than 3 times. If it has, the job intervenes and marks the task as failed. This prevents the infinite loop.

This DAG demonstrates the issue (inspired by #1168). It (illegally) changes the name of its subdag, which means the subdag can't be found and an error is raised before any tasks run. In the current Airflow master, this DAG loops forever.
Run it: `airflow backfill -s 2016-01-01 -e 2016-01-01`
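The `Popen` vs. `check_call` difference described above can be shown with a minimal, self-contained sketch. This is illustrative Python, not Airflow's actual executor code; `failing_cmd` is a stand-in for an `airflow run` command that dies before the task records any state:

```python
# Sketch of the failure mode: a command that exits non-zero but raises nothing.
import subprocess
import sys

# Stand-in for a failing `airflow run` command (exits with returncode 1).
failing_cmd = [sys.executable, "-c", "import sys; sys.exit(1)"]

# Popen().wait() does NOT raise on failure -- it just returns the returncode.
# An executor that only catches exceptions would report SUCCESS here.
returncode = subprocess.Popen(failing_cmd).wait()
print(returncode)  # 1, and no exception was raised

# check_call() raises CalledProcessError for any non-zero returncode,
# so the executor can mark the task as failed.
try:
    subprocess.check_call(failing_cmd)
    state = "success"
except subprocess.CalledProcessError:
    state = "failed"
print(state)  # failed
```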
To support this, I have written a "light" DAG unit tester, but it is not part of this PR.
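The safety check added to `jobs.py` can be sketched roughly as follows. The function and attribute names here are hypothetical and the threshold handling is simplified; this is not the actual Airflow implementation:

```python
# Hypothetical sketch of the jobs.py safety check: if the same command keeps
# failing without the task ever reporting a state, mark the task failed
# instead of rescheduling it forever. All names are illustrative.
MAX_EXECUTOR_FAILURES = 3  # threshold from the PR description

def handle_executor_event(task, executor_state, failure_counts):
    """Reconcile the executor-reported state with the task's own state."""
    if executor_state == "failed" and task.state is None:
        # The command died before the task could record any state.
        failure_counts[task.key] = failure_counts.get(task.key, 0) + 1
        if failure_counts[task.key] >= MAX_EXECUTOR_FAILURES:
            # Intervene: mark failed rather than rescheduling indefinitely.
            task.state = "failed"
        # Otherwise leave the state unset so the task is rescheduled.
    return task.state
```

The key design point is the counter: without it, "executor failed, task has no state" always leads back to rescheduling, which is exactly the infinite loop this PR closes.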