Commit c1eb83a

Rewrite BackfillJob logic for clarity
jlowin committed Apr 3, 2016
1 parent 6516a24 commit c1eb83a
Showing 1 changed file with 77 additions and 65 deletions.
airflow/jobs.py (142 changes: 77 additions & 65 deletions)
@@ -369,7 +369,6 @@ def import_errors(self, dagbag):
                 filename=filename, stacktrace=stacktrace))
         session.commit()
 
-
     def schedule_dag(self, dag):
         """
         This method checks whether a new DagRun needs to be created
@@ -506,6 +505,7 @@ def process_dag(self, dag, executor):
             if task.adhoc or (task.task_id, dttm) in skip_tis:
                 continue
             ti = TI(task, dttm)
+
             ti.refresh_from_db()
             if ti.state in (
                     State.RUNNING, State.QUEUED, State.SUCCESS, State.FAILED):
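
The guard above is a refresh-then-skip pattern: the scheduler reloads each task instance from the database and leaves it alone if another process already owns it or has finished it. A minimal sketch of the same check, assuming plain string state names (the real code uses airflow.utils.State constants; the helper name is hypothetical):

    # Hypothetical helper mirroring the skip condition in the hunk above.
    ACTIVE_OR_FINISHED = {'running', 'queued', 'success', 'failed'}

    def should_schedule(ti_state):
        # Skip task instances another process already picked up or completed.
        return ti_state not in ACTIVE_OR_FINISHED
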
@@ -594,7 +594,7 @@ def prioritize_queued(self, session, executor, dagbag):
 
             queue_size = len(tis)
             self.logger.info("Pool {pool} has {open_slots} slots, {queue_size} "
-                            "task instances in queue".format(**locals()))
+                             "task instances in queue".format(**locals()))
             if open_slots <= 0:
                 continue
             tis = sorted(
@@ -784,7 +784,7 @@ def _execute(self):
         succeeded = []
         started = []
         wont_run = []
-        could_not_run = set()
+        not_ready_to_run = set()
 
         for task in self.dag.tasks:
             if (not self.include_adhoc) and task.adhoc:
@@ -795,6 +795,8 @@ def _execute(self):
             for dttm in self.dag.date_range(start_date, end_date=end_date):
                 ti = models.TaskInstance(task, dttm)
                 tasks_to_run[ti.key] = ti
+                session.merge(ti)
+                session.commit()
 
         # Triggering what is ready to get triggered
         deadlocked = False
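
The two added lines persist every TaskInstance to the metadata database up front, instead of waiting for each task to run. The surrounding loop builds the tasks_to_run map keyed by ti.key, i.e. (dag_id, task_id, execution_date). A rough, self-contained sketch of that bookkeeping, with a simplified stand-in for airflow.models.TaskInstance (names here are illustrative, not the Airflow API):

    from collections import namedtuple
    from datetime import datetime, timedelta

    # Simplified stand-in for airflow.models.TaskInstance (illustrative only).
    TaskInstance = namedtuple('TaskInstance', ['dag_id', 'task_id', 'execution_date'])

    def build_tasks_to_run(dag_id, task_ids, start_date, end_date):
        # One TaskInstance per (task, execution date) pair, keyed the same
        # way ti.key is keyed in the diff above.
        tasks_to_run = {}
        dttm = start_date
        while dttm <= end_date:
            for task_id in task_ids:
                ti = TaskInstance(dag_id, task_id, dttm)
                tasks_to_run[(ti.dag_id, ti.task_id, ti.execution_date)] = ti
            dttm += timedelta(days=1)
        return tasks_to_run
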
@@ -829,16 +831,15 @@ def _execute(self):
                     ti.state = State.RUNNING
                     if key not in started:
                         started.append(key)
-                    if ti in could_not_run:
-                        could_not_run.remove(ti)
-
-                # if the task is not runnable and has no state indicating why
-                # (like FAILED or UP_FOR_RETRY), then it's just not ready
-                # to run. If the set of tasks that aren't ready ever equals
-                # the set of tasks to run, then the backfill is deadlocked
-                elif ti.state is None:
-                    could_not_run.add(ti)
-                    if could_not_run == set(tasks_to_run.values()):
+                    if ti in not_ready_to_run:
+                        not_ready_to_run.remove(ti)
+
+                # Mark the task as not ready to run. If the set of tasks
+                # that aren't ready ever equals the set of tasks to run,
+                # then the backfill is deadlocked
+                elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
+                    not_ready_to_run.add(ti)
+                    if not_ready_to_run == set(tasks_to_run.values()):
                         msg = 'BackfillJob is deadlocked: no tasks can be run.'
                         if any(
                                 t.are_dependencies_met() !=
@@ -847,13 +848,13 @@ def _execute(self):
                                 for t in tasks_to_run.values()):
                             msg += (
                                 ' Some of the tasks that were unable to '
-                                'run have depends_on_past=True. Try running '
-                                'the backfill with '
-                                'ignore_first_depends_on_past=True '
-                                '(or -I from the command line).')
+                                'run have "depends_on_past=True". Try running '
+                                'the backfill with the option '
+                                '"ignore_first_depends_on_past=True" '
+                                ' or passing "-I" at the command line.')
                         self.logger.error(msg)
                         deadlocked = True
-                        wont_run.extend(could_not_run)
+                        wont_run.extend(not_ready_to_run)
                         tasks_to_run.clear()
 
             self.heartbeat()
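
The deadlock check in this hunk reduces to a set comparison: if every task still waiting to run is also marked not ready, another pass over the loop cannot make progress. A minimal sketch of that invariant, assuming task instances are hashable (the function name is illustrative, not the Airflow API):

    def is_deadlocked(tasks_to_run, not_ready_to_run):
        # No task is runnable and none will become runnable: the set of
        # waiting tasks exactly equals the set of not-ready tasks.
        return bool(tasks_to_run) and not_ready_to_run == set(tasks_to_run.values())

The depends_on_past hint in the error message exists because the first date of a backfill with depends_on_past=True can never have its "previous" run satisfied, which is exactly this kind of self-inflicted deadlock; ignore_first_depends_on_past relaxes the check for that first date.
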
@@ -866,65 +867,76 @@ def _execute(self):
                     continue
                 ti = tasks_to_run[key]
                 ti.refresh_from_db()
-                if (
-                        ti.state in (State.FAILED, State.SKIPPED) or
-                        state == State.FAILED):
-                    # executor reports failure; task reports running
-                    if ti.state == State.RUNNING and state == State.FAILED:
+
+                # executor reports failure
+                if state == State.FAILED:
+
+                    # task reports running
+                    if ti.state == State.RUNNING:
                         msg = (
                             'Executor reports that task instance {} failed '
                             'although the task says it is running.'.format(key))
                         self.logger.error(msg)
                         ti.handle_failure(msg)
-                    # executor and task report failure
-                    elif ti.state == State.FAILED or state == State.FAILED:
-                        failed.append(key)
-                        self.logger.error("Task instance {} failed".format(key))
+
                     # task reports skipped
                     elif ti.state == State.SKIPPED:
                         wont_run.append(key)
                         self.logger.error("Skipping {} ".format(key))
+
+                    # anything else is a failure
+                    else:
+                        failed.append(key)
+                        self.logger.error("Task instance {} failed".format(key))
+
                     tasks_to_run.pop(key)
                     # Removing downstream tasks that also shouldn't run
                     for t in self.dag.get_task(task_id).get_flat_relatives(
                             upstream=False):
                         key = (ti.dag_id, t.task_id, execution_date)
                         if key in tasks_to_run:
                             wont_run.append(key)
                             tasks_to_run.pop(key)
-                # executor and task report success
-                elif ti.state == State.SUCCESS and state == State.SUCCESS:
-                    succeeded.append(key)
-                    tasks_to_run.pop(key)
-                elif state == State.SUCCESS and key in could_not_run:
-                    continue
-                # executor reports success but task does not -- this is weird
-                elif (
-                        ti.state not in (
+
+                # executor reports success
+                elif state == State.SUCCESS:
+
+                    # task reports success
+                    if ti.state == State.SUCCESS:
+                        self.logger.info(
+                            'Task instance {} succeeded'.format(key))
+                        succeeded.append(key)
+                        tasks_to_run.pop(key)
+
+                    # task reports failure
+                    elif ti.state == State.FAILED:
+                        self.logger.error("Task instance {} failed".format(key))
+                        failed.append(key)
+                        tasks_to_run.pop(key)
+
+                    # this probably won't ever be triggered
+                    elif key in not_ready_to_run:
+                        continue
+
+                    # executor reports success but task does not - this is weird
+                    elif ti.state not in (
                             State.SUCCESS,
                             State.QUEUED,
-                            State.UP_FOR_RETRY) and
-                        state == State.SUCCESS):
-                    self.logger.error(
-                        "The airflow run command failed "
-                        "at reporting an error. This should not occur "
-                        "in normal circumstances. Task state is '{}',"
-                        "reported state is '{}'. TI is {}"
-                        "".format(ti.state, state, ti))
-
-                    # if the executor fails 3 or more times, stop trying to
-                    # run the task
-                    executor_fails[key] += 1
-                    if executor_fails[key] >= 3:
-                        msg = (
-                            'The airflow run command failed to report an '
-                            'error for task {} three or more times. The task '
-                            'is being marked as failed. This is very unusual '
-                            'and probably means that an error is taking place '
-                            'before the task even starts.'.format(key))
-                        self.logger.error(msg)
-                        ti.handle_failure(msg)
-                        tasks_to_run.pop(key)
+                            State.UP_FOR_RETRY):
+                        self.logger.error(
+                            "The airflow run command failed "
+                            "at reporting an error. This should not occur "
+                            "in normal circumstances. Task state is '{}',"
+                            "reported state is '{}'. TI is {}"
+                            "".format(ti.state, state, ti))
+
+                        # if the executor fails 3 or more times, stop trying to
+                        # run the task
+                        executor_fails[key] += 1
+                        if executor_fails[key] >= 3:
+                            msg = (
+                                'The airflow run command failed to report an '
+                                'error for task {} three or more times. The '
+                                'task is being marked as failed. This is very '
+                                'unusual and probably means that an error is '
+                                'taking place before the task even '
+                                'starts.'.format(key))
+                            self.logger.error(msg)
+                            ti.handle_failure(msg)
+                            tasks_to_run.pop(key)
 
         msg = (
             "[backfill progress] "

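The core of the rewrite is reconciling two views of the same task: the state the executor reports and the state the TaskInstance recorded in the database. Restructuring the branches as nested if/elif blocks makes each (executor state, task state) pair explicit. A condensed sketch of the resulting decision table, using plain strings for states (illustrative only; the real code uses airflow.utils.State constants, prunes downstream tasks, handles the not_ready_to_run continue, and caps repeated executor failures at three, as the diff shows):

    def reconcile(executor_state, ti_state):
        # Map one (executor state, task state) pair to a backfill action.
        if executor_state == 'failed':
            if ti_state == 'running':
                return 'handle_failure'  # views disagree: fail the task
            if ti_state == 'skipped':
                return 'wont_run'
            return 'failed'              # anything else is a failure
        if executor_state == 'success':
            if ti_state == 'success':
                return 'succeeded'
            if ti_state == 'failed':
                return 'failed'
            if ti_state not in ('queued', 'up_for_retry'):
                return 'report_error'    # success reported, task disagrees
        return 'no_action'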