Fix handling of deadlocked jobs
- Raise an error when a backfill deadlocks

Deadlocked backfills didn’t raise AirflowExceptions, so
SubDagOperators didn’t recognize that their subdags
were failing.

- Fix bug with marking DagRuns as failed

- Let SchedulerJob mark DagRuns as deadlocked when there
are no TIs available; other deadlock metrics depend on TIs

- Add unit tests.
jlowin committed Apr 5, 2016
1 parent 4763720 commit 2e0421a
Showing 9 changed files with 188 additions and 89 deletions.
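For context on the first bullet of the commit message: a SubDagOperator executes its subdag by running a backfill over the parent task's execution date, and it only registers the subdag as failed if that backfill raises. A rough, simplified sketch of that relationship (paraphrased, not the actual operator code; the exact run() arguments are an assumption for illustration):

# Simplified paraphrase of how a SubDagOperator drives its subdag.
# The exact signature of DAG.run() is an assumption for illustration.
class SubDagOperatorSketch:
    def __init__(self, subdag):
        self.subdag = subdag

    def execute(self, context):
        ed = context['execution_date']
        # DAG.run() starts a BackfillJob for the subdag. Before this commit,
        # a deadlocked backfill only logged an error and returned, so this
        # call completed "successfully" and the parent DAG never saw the
        # failure. After this commit, the deadlock raises AirflowException,
        # which propagates out of execute() and fails the subdag task.
        self.subdag.run(start_date=ed, end_date=ed)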
3 changes: 2 additions & 1 deletion airflow/bin/cli.py
@@ -39,7 +39,8 @@ def process_subdir(subdir):
def get_dag(args):
dagbag = DagBag(process_subdir(args.subdir))
if args.dag_id not in dagbag.dags:
raise AirflowException('dag_id could not be found')
raise AirflowException(
'dag_id could not be found: {}'.format(args.dag_id))
return dagbag.dags[args.dag_id]


163 changes: 97 additions & 66 deletions airflow/jobs.py
@@ -499,6 +499,7 @@ def process_dag(self, dag, executor):
skip_tis = {(ti[0], ti[1]) for ti in qry.all()}

descartes = [obj for obj in product(dag.tasks, active_runs)]
could_not_run = set()
self.logger.info('Checking dependencies on {} tasks instances, minus {} '
'skippable ones'.format(len(descartes), len(skip_tis)))
for task, dttm in descartes:
@@ -513,6 +514,23 @@ def process_dag(self, dag, executor):
elif ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Firing task: {}'.format(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)
else:
could_not_run.add(ti)

# this type of deadlock happens when dagruns can't even start and so
# the TI's haven't been persisted to the database.
if len(could_not_run) == len(descartes):
self.logger.error(
'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
(session
.query(models.DagRun)
.filter(
models.DagRun.dag_id == dag.dag_id,
models.DagRun.state == State.RUNNING,
models.DagRun.execution_date.in_(active_runs))
.update(
{models.DagRun.state: State.FAILED},
synchronize_session='fetch'))

# Releasing the lock
self.logger.debug("Unlocking DAG (scheduler_lock)")
@@ -781,11 +799,12 @@ def _execute(self):

# Build a list of all instances to run
tasks_to_run = {}
failed = []
succeeded = []
started = []
wont_run = []
not_ready_to_run = set()
failed = set()
succeeded = set()
started = set()
skipped = set()
not_ready = set()
deadlocked = set()

for task in self.dag.tasks:
if (not self.include_adhoc) and task.adhoc:
@@ -800,67 +819,56 @@ def _execute(self):
session.commit()

# Triggering what is ready to get triggered
deadlocked = False
while tasks_to_run and not deadlocked:

not_ready.clear()
for key, ti in list(tasks_to_run.items()):

ti.refresh_from_db()
ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))

# Did the task finish without failing? -- then we're done
if (
ti.state in (State.SUCCESS, State.SKIPPED) and
key in tasks_to_run):
succeeded.append(key)
tasks_to_run.pop(key)
# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if key not in started:
if ti.state == State.SUCCESS:
succeeded.add(key)
tasks_to_run.pop(key)
continue
elif ti.state == State.SKIPPED:
skipped.add(key)
tasks_to_run.pop(key)
continue

# Is the task runnable? -- the run it
elif ti.is_queueable(
# Is the task runnable? -- then run it
if ti.is_queueable(
include_queued=True,
ignore_depends_on_past=ignore_depends_on_past,
flag_upstream_failed=True):
self.logger.debug('Sending {} to executor'.format(ti))
executor.queue_task_instance(
ti,
mark_success=self.mark_success,
pickle_id=pickle_id,
ignore_dependencies=self.ignore_dependencies,
ignore_depends_on_past=ignore_depends_on_past,
pool=self.pool)
ti.state = State.RUNNING
if key not in started:
started.append(key)
if ti in not_ready_to_run:
not_ready_to_run.remove(ti)

# Mark the task as not ready to run. If the set of tasks
# that aren't ready ever equals the set of tasks to run,
# then the backfill is deadlocked
started.add(key)

# Mark the task as not ready to run
elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
not_ready_to_run.add(ti)
if not_ready_to_run == set(tasks_to_run.values()):
msg = 'BackfillJob is deadlocked: no tasks can be run.'
if any(
t.are_dependencies_met() !=
t.are_dependencies_met(
ignore_depends_on_past=True)
for t in tasks_to_run.values()):
msg += (
' Some of the tasks that were unable to '
'run have "depends_on_past=True". Try running '
'the backfill with the option '
'"ignore_first_depends_on_past=True" '
' or passing "-I" at the command line.')
self.logger.error(msg)
deadlocked = True
wont_run.extend(not_ready_to_run)
tasks_to_run.clear()
self.logger.debug('Added {} to not_ready'.format(ti))
not_ready.add(key)

self.heartbeat()
executor.heartbeat()

# If the set of tasks that aren't ready ever equals the set of
# tasks to run, then the backfill is deadlocked
if not_ready and not_ready == set(tasks_to_run):
deadlocked.update(tasks_to_run.values())
tasks_to_run.clear()

# Reacting to events
for key, state in list(executor.get_event_buffer().items()):
dag_id, task_id, execution_date = key
@@ -882,12 +890,12 @@

# task reports skipped
elif ti.state == State.SKIPPED:
wont_run.append(key)
skipped.add(key)
self.logger.error("Skipping {} ".format(key))

# anything else is a failure
else:
failed.append(key)
failed.add(key)
self.logger.error("Task instance {} failed".format(key))

tasks_to_run.pop(key)
@@ -899,18 +907,19 @@
if ti.state == State.SUCCESS:
self.logger.info(
'Task instance {} succeeded'.format(key))
succeeded.append(key)
succeeded.add(key)
tasks_to_run.pop(key)

# task reports failure
elif ti.state == State.FAILED:
self.logger.error("Task instance {} failed".format(key))
failed.append(key)
failed.add(key)
tasks_to_run.pop(key)

# this probably won't ever be triggered
elif key in not_ready_to_run:
continue
elif ti in not_ready:
self.logger.info(
"{} wasn't expected to run, but it did".format(ti))

# executor reports success but task does not - this is weird
elif ti.state not in (
@@ -939,29 +948,51 @@
ti.handle_failure(msg)
tasks_to_run.pop(key)

msg = (
"[backfill progress] "
"waiting: {0} | "
"succeeded: {1} | "
"kicked_off: {2} | "
"failed: {3} | "
"wont_run: {4} ").format(
len(tasks_to_run),
len(succeeded),
len(started),
len(failed),
len(wont_run))
msg = ' | '.join([
"[backfill progress]",
"waiting: {0}",
"succeeded: {1}",
"kicked_off: {2}",
"failed: {3}",
"skipped: {4}",
"deadlocked: {5}"
]).format(
len(tasks_to_run),
len(succeeded),
len(started),
len(failed),
len(skipped),
len(deadlocked))
self.logger.info(msg)

executor.end()
session.close()

err = ''
if failed:
msg = (
"------------------------------------------\n"
"Some tasks instances failed, "
"here's the list:\n{}".format(failed))
raise AirflowException(msg)
self.logger.info("All done. Exiting.")
err += (
"---------------------------------------------------\n"
"Some task instances failed:\n{}\n".format(failed))
if deadlocked:
err += (
'---------------------------------------------------\n'
'BackfillJob is deadlocked.')
deadlocked_depends_on_past = any(
t.are_dependencies_met() != t.are_dependencies_met(
ignore_depends_on_past=True)
for t in deadlocked)
if deadlocked_depends_on_past:
err += (
'Some of the deadlocked tasks were unable to run because '
'of "depends_on_past" relationships. Try running the '
'backfill with the option '
'"ignore_first_depends_on_past=True" or passing "-I" at '
'the command line.')
err += ' These tasks were unable to run:\n{}\n'.format(deadlocked)
if err:
raise AirflowException(err)

self.logger.info("Backfill done. Exiting.")


class LocalTaskJob(BaseJob):
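The heart of the new backfill deadlock detection above is a set comparison: if a full pass over the waiting tasks readies nothing, no later pass can either, so the remaining tasks are declared deadlocked and reported in the final error. A self-contained toy sketch of that loop (plain strings and sets, not Airflow objects):

# Toy model of the backfill loop's deadlock check. Tasks are plain strings
# and "running" a task just marks it succeeded; the point is the
# not_ready == tasks_to_run comparison that ends the loop.
def run_backfill(tasks_to_run, is_runnable):
    succeeded, deadlocked = set(), set()
    not_ready, started = set(), set()
    while tasks_to_run and not deadlocked:
        not_ready.clear()
        for key in tasks_to_run:
            if is_runnable(key, succeeded):
                started.add(key)          # handed to the "executor"
            else:
                not_ready.add(key)
        # Deadlock: every task still waiting was not ready on this pass.
        if not_ready and not_ready == tasks_to_run:
            deadlocked.update(tasks_to_run)
            tasks_to_run.clear()
        # Pretend the executor finishes everything instantly.
        succeeded.update(started)
        tasks_to_run -= started
        started.clear()
    return succeeded, deadlocked

# 'b' waits on 'a'; 'c' waits on a task that never runs, so 'c' deadlocks.
deps = {'a': set(), 'b': {'a'}, 'c': {'missing'}}
done, stuck = run_backfill(set(deps), lambda k, finished: deps[k] <= finished)
assert done == {'a', 'b'} and stuck == {'c'}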
3 changes: 2 additions & 1 deletion airflow/models.py
@@ -2398,7 +2398,8 @@ def get_active_runs(self):
# AND there are unfinished tasks...
any(ti.state in State.unfinished() for ti in task_instances) and
# AND none of them have dependencies met...
all(not ti.are_dependencies_met() for ti in task_instances
all(not ti.are_dependencies_met(session=session)
for ti in task_instances
if ti.state in State.unfinished()))

for run in active_runs:
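The models.py change above threads the session into are_dependencies_met(); the surrounding predicate is what decides that a running DagRun is deadlocked. A hypothetical standalone helper expressing the same test (the function name and argument shape are illustrative):

# Hypothetical helper paraphrasing the deadlock test used by
# get_active_runs(): a running DagRun is deadlocked when it still has
# unfinished task instances and none of them can have dependencies met.
from airflow.utils.state import State

def dagrun_is_deadlocked(task_instances, session=None):
    unfinished = [ti for ti in task_instances
                  if ti.state in State.unfinished()]
    return bool(unfinished) and all(
        not ti.are_dependencies_met(session=session)
        for ti in unfinished)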
18 changes: 18 additions & 0 deletions airflow/utils/state.py
@@ -69,8 +69,26 @@ def runnable(cls):
cls.QUEUED
]

@classmethod
def finished(cls):
"""
A list of states indicating that a task started and completed a
run attempt. Note that the attempt could have resulted in failure or
have been interrupted; in any case, it is no longer running.
"""
return [
cls.SUCCESS,
cls.SHUTDOWN,
cls.FAILED,
cls.SKIPPED,
]

@classmethod
def unfinished(cls):
"""
A list of states indicating that a task either has not completed
a run or has not even started.
"""
return [
cls.NONE,
cls.QUEUED,
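A small usage sketch of the new finished()/unfinished() helpers, e.g. for summarizing a DagRun's progress (illustrative only; the helper name is not part of the commit):

from airflow.utils.state import State

def summarize_run(task_instances):
    # Partition task instances by whether they have completed a run attempt.
    finished = [ti for ti in task_instances if ti.state in State.finished()]
    unfinished = [ti for ti in task_instances if ti.state in State.unfinished()]
    return {'finished': len(finished), 'unfinished': len(unfinished)}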
21 changes: 21 additions & 0 deletions tests/dags/test_issue_1225.py
@@ -94,3 +94,24 @@ def fail():
depends_on_past=True,
dag=dag6,)
dag6_task2.set_upstream(dag6_task1)


# DAG tests that a deadlocked subdag is properly caught
dag7 = DAG(dag_id='test_subdag_deadlock', default_args=default_args)
subdag7 = DAG(dag_id='test_subdag_deadlock.subdag', default_args=default_args)
subdag7_task1 = PythonOperator(
task_id='test_subdag_fail',
dag=subdag7,
python_callable=fail)
subdag7_task2 = DummyOperator(
task_id='test_subdag_dummy_1',
dag=subdag7,)
subdag7_task3 = DummyOperator(
task_id='test_subdag_dummy_2',
dag=subdag7)
dag7_subdag1 = SubDagOperator(
task_id='subdag',
dag=dag7,
subdag=subdag7)
subdag7_task1.set_downstream(subdag7_task2)
subdag7_task2.set_downstream(subdag7_task3)