diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 310a471c4b7f1..4c3cafb8052a7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -39,7 +39,8 @@ def process_subdir(subdir):
 def get_dag(args):
     dagbag = DagBag(process_subdir(args.subdir))
     if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
+        raise AirflowException(
+            'dag_id could not be found: {}'.format(args.dag_id))
     return dagbag.dags[args.dag_id]
 
 
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 34318f394b81b..7e4867a770e25 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -499,6 +499,7 @@ def process_dag(self, dag, executor):
         skip_tis = {(ti[0], ti[1]) for ti in qry.all()}
 
         descartes = [obj for obj in product(dag.tasks, active_runs)]
+        could_not_run = set()
         self.logger.info('Checking dependencies on {} tasks instances, minus {} '
                          'skippable ones'.format(len(descartes), len(skip_tis)))
         for task, dttm in descartes:
@@ -513,6 +514,23 @@ def process_dag(self, dag, executor):
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Firing task: {}'.format(ti))
                 executor.queue_task_instance(ti, pickle_id=pickle_id)
+            else:
+                could_not_run.add(ti)
+
+        # this type of deadlock happens when dagruns can't even start and so
+        # the TI's haven't been persisted to the database.
+        if len(could_not_run) == len(descartes):
+            self.logger.error(
+                'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
+            (session
+                .query(models.DagRun)
+                .filter(
+                    models.DagRun.dag_id == dag.dag_id,
+                    models.DagRun.state == State.RUNNING,
+                    models.DagRun.execution_date.in_(active_runs))
+                .update(
+                    {models.DagRun.state: State.FAILED},
+                    synchronize_session='fetch'))
 
         # Releasing the lock
         self.logger.debug("Unlocking DAG (scheduler_lock)")
@@ -781,11 +799,12 @@ def _execute(self):
 
         # Build a list of all instances to run
         tasks_to_run = {}
-        failed = []
-        succeeded = []
-        started = []
-        wont_run = []
-        not_ready_to_run = set()
+        failed = set()
+        succeeded = set()
+        started = set()
+        skipped = set()
+        not_ready = set()
+        deadlocked = set()
 
         for task in self.dag.tasks:
             if (not self.include_adhoc) and task.adhoc:
@@ -800,9 +819,8 @@ def _execute(self):
             session.commit()
 
         # Triggering what is ready to get triggered
-        deadlocked = False
         while tasks_to_run and not deadlocked:
-
+            not_ready.clear()
             for key, ti in list(tasks_to_run.items()):
                 ti.refresh_from_db()
 
@@ -810,18 +828,24 @@ def _execute(self):
                     self.ignore_first_depends_on_past and
                     ti.execution_date == (start_date or ti.start_date))
 
-                # Did the task finish without failing? -- then we're done
-                if (
-                        ti.state in (State.SUCCESS, State.SKIPPED) and
-                        key in tasks_to_run):
-                    succeeded.append(key)
-                    tasks_to_run.pop(key)
+                # The task was already marked successful or skipped by a
+                # different Job. Don't rerun it.
+                if key not in started:
+                    if ti.state == State.SUCCESS:
+                        succeeded.add(key)
+                        tasks_to_run.pop(key)
+                        continue
+                    elif ti.state == State.SKIPPED:
+                        skipped.add(key)
+                        tasks_to_run.pop(key)
+                        continue
 
-                # Is the task runnable? -- the run it
-                elif ti.is_queueable(
+                # Is the task runnable? -- then run it
+                if ti.is_queueable(
                         include_queued=True,
                         ignore_depends_on_past=ignore_depends_on_past,
                         flag_upstream_failed=True):
+                    self.logger.debug('Sending {} to executor'.format(ti))
                     executor.queue_task_instance(
                         ti,
                         mark_success=self.mark_success,
@@ -829,38 +853,22 @@ def _execute(self):
                         ignore_dependencies=self.ignore_dependencies,
                         ignore_depends_on_past=ignore_depends_on_past,
                         pool=self.pool)
-                    ti.state = State.RUNNING
-                    if key not in started:
-                        started.append(key)
-                    if ti in not_ready_to_run:
-                        not_ready_to_run.remove(ti)
-
-                # Mark the task as not ready to run. If the set of tasks
-                # that aren't ready ever equals the set of tasks to run,
-                # then the backfill is deadlocked
+                    started.add(key)
+
+                # Mark the task as not ready to run
                 elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
-                    not_ready_to_run.add(ti)
-                    if not_ready_to_run == set(tasks_to_run.values()):
-                        msg = 'BackfillJob is deadlocked: no tasks can be run.'
-                        if any(
-                                t.are_dependencies_met() !=
-                                t.are_dependencies_met(
-                                    ignore_depends_on_past=True)
-                                for t in tasks_to_run.values()):
-                            msg += (
-                                ' Some of the tasks that were unable to '
-                                'run have "depends_on_past=True". Try running '
-                                'the backfill with the option '
-                                '"ignore_first_depends_on_past=True" '
-                                ' or passing "-I" at the command line.')
-                        self.logger.error(msg)
-                        deadlocked = True
-                        wont_run.extend(not_ready_to_run)
-                        tasks_to_run.clear()
+                    self.logger.debug('Added {} to not_ready'.format(ti))
+                    not_ready.add(key)
 
             self.heartbeat()
             executor.heartbeat()
 
+            # If the set of tasks that aren't ready ever equals the set of
+            # tasks to run, then the backfill is deadlocked
+            if not_ready and not_ready == set(tasks_to_run):
+                deadlocked.update(tasks_to_run.values())
+                tasks_to_run.clear()
+
             # Reacting to events
             for key, state in list(executor.get_event_buffer().items()):
                 dag_id, task_id, execution_date = key
@@ -882,12 +890,12 @@ def _execute(self):
 
                 # task reports skipped
                 elif ti.state == State.SKIPPED:
-                    wont_run.append(key)
+                    skipped.add(key)
                     self.logger.error("Skipping {} ".format(key))
 
                 # anything else is a failure
                 else:
-                    failed.append(key)
+                    failed.add(key)
                     self.logger.error("Task instance {} failed".format(key))
                     tasks_to_run.pop(key)
 
@@ -899,18 +907,19 @@ def _execute(self):
                     if ti.state == State.SUCCESS:
                         self.logger.info(
                             'Task instance {} succeeded'.format(key))
-                        succeeded.append(key)
+                        succeeded.add(key)
                         tasks_to_run.pop(key)
 
                     # task reports failure
                     elif ti.state == State.FAILED:
                         self.logger.error("Task instance {} failed".format(key))
-                        failed.append(key)
+                        failed.add(key)
                         tasks_to_run.pop(key)
 
                     # this probably won't ever be triggered
-                    elif key in not_ready_to_run:
-                        continue
+                    elif key in not_ready:
+                        self.logger.info(
+                            "{} wasn't expected to run, but it did".format(ti))
 
                     # executor reports success but task does not - this is weird
                     elif ti.state not in (
@@ -939,29 +948,51 @@ def _execute(self):
                             ti.handle_failure(msg)
                             tasks_to_run.pop(key)
 
-            msg = (
-                "[backfill progress] "
-                "waiting: {0} | "
-                "succeeded: {1} | "
-                "kicked_off: {2} | "
-                "failed: {3} | "
-                "wont_run: {4} ").format(
-                    len(tasks_to_run),
-                    len(succeeded),
-                    len(started),
-                    len(failed),
-                    len(wont_run))
+            msg = ' | '.join([
+                "[backfill progress]",
+                "waiting: {0}",
+                "succeeded: {1}",
+                "kicked_off: {2}",
+                "failed: {3}",
+                "skipped: {4}",
+                "deadlocked: {5}"
+            ]).format(
+                len(tasks_to_run),
+                len(succeeded),
+                len(started),
+                len(failed),
+                len(skipped),
+                len(deadlocked))
             self.logger.info(msg)
 
         executor.end()
         session.close()
+
+        err = ''
         if failed:
-            msg = (
"------------------------------------------\n" - "Some tasks instances failed, " - "here's the list:\n{}".format(failed)) - raise AirflowException(msg) - self.logger.info("All done. Exiting.") + err += ( + "---------------------------------------------------\n" + "Some task instances failed:\n{}\n".format(failed)) + if deadlocked: + err += ( + '---------------------------------------------------\n' + 'BackfillJob is deadlocked.') + deadlocked_depends_on_past = any( + t.are_dependencies_met() != t.are_dependencies_met( + ignore_depends_on_past=True) + for t in deadlocked) + if deadlocked_depends_on_past: + err += ( + 'Some of the deadlocked tasks were unable to run because ' + 'of "depends_on_past" relationships. Try running the ' + 'backfill with the option ' + '"ignore_first_depends_on_past=True" or passing "-I" at ' + 'the command line.') + err += ' These tasks were unable to run:\n{}\n'.format(deadlocked) + if err: + raise AirflowException(err) + + self.logger.info("Backfill done. Exiting.") class LocalTaskJob(BaseJob): diff --git a/airflow/models.py b/airflow/models.py index 2cc51cc7b6705..6868d34d7bbe1 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -2398,7 +2398,8 @@ def get_active_runs(self): # AND there are unfinished tasks... any(ti.state in State.unfinished() for ti in task_instances) and # AND none of them have dependencies met... - all(not ti.are_dependencies_met() for ti in task_instances + all(not ti.are_dependencies_met(session=session) + for ti in task_instances if ti.state in State.unfinished())) for run in active_runs: diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 7824f15b3ffe6..169f5b6d5911f 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -69,8 +69,26 @@ def runnable(cls): cls.QUEUED ] + @classmethod + def finished(cls): + """ + A list of states indicating that a task started and completed a + run attempt. Note that the attempt could have resulted in failure or + have been interrupted; in any case, it is no longer running. + """ + return [ + cls.SUCCESS, + cls.SHUTDOWN, + cls.FAILED, + cls.SKIPPED, + ] + @classmethod def unfinished(cls): + """ + A list of states indicating that a task either has not completed + a run or has not even started. 
+        """
         return [
             cls.NONE,
             cls.QUEUED,
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 051ff32ef0c33..1c81bf0218f94 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -94,3 +94,24 @@ def fail():
     depends_on_past=True,
     dag=dag6,)
 dag6_task2.set_upstream(dag6_task1)
+
+
+# This DAG tests that a deadlocked subdag is properly caught
+dag7 = DAG(dag_id='test_subdag_deadlock', default_args=default_args)
+subdag7 = DAG(dag_id='test_subdag_deadlock.subdag', default_args=default_args)
+subdag7_task1 = PythonOperator(
+    task_id='test_subdag_fail',
+    dag=subdag7,
+    python_callable=fail)
+subdag7_task2 = DummyOperator(
+    task_id='test_subdag_dummy_1',
+    dag=subdag7,)
+subdag7_task3 = DummyOperator(
+    task_id='test_subdag_dummy_2',
+    dag=subdag7)
+dag7_subdag1 = SubDagOperator(
+    task_id='subdag',
+    dag=dag7,
+    subdag=subdag7)
+subdag7_task1.set_downstream(subdag7_task2)
+subdag7_task2.set_downstream(subdag7_task3)
diff --git a/tests/jobs.py b/tests/jobs.py
index 39343e40b5e6d..9f0070f6390d6 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -40,6 +40,9 @@ def setUp(self):
         self.dagbag = DagBag()
 
     def test_backfill_examples(self):
+        """
+        Test backfilling example dags
+        """
         dags = [
             dag for dag in self.dagbag.dags.values()
             if dag.dag_id in ('example_bash_operator',)]
@@ -56,12 +59,12 @@ def test_backfill_examples(self):
 
     def test_trap_executor_error(self):
         """
-        Test for https://github.com/airbnb/airflow/pull/1220
+        Test that errors setting up tasks (before tasks run) are caught
 
-        Test that errors setting up tasks (before tasks run) are properly
-        caught
+        Test for https://github.com/airbnb/airflow/pull/1220
         """
         dag = self.dagbag.get_dag('test_raise_executor_error')
+        dag.clear()
         job = BackfillJob(
             dag=dag,
             start_date=DEFAULT_DATE,
@@ -75,9 +78,9 @@ def run_with_timeout():
 
     def test_backfill_pooled_task(self):
         """
-        Test for https://github.com/airbnb/airflow/pull/1225
-
         Test that queued tasks are executed by BackfillJob
+
+        Test for https://github.com/airbnb/airflow/pull/1225
         """
         session = settings.Session()
         pool = Pool(pool='test_backfill_pooled_task_pool', slots=1)
@@ -85,6 +88,7 @@ def test_backfill_pooled_task(self):
         session.commit()
 
         dag = self.dagbag.get_dag('test_backfill_pooled_task_dag')
+        dag.clear()
 
         job = BackfillJob(
             dag=dag,
@@ -103,15 +107,18 @@ def test_backfill_pooled_task(self):
         self.assertEqual(ti.state, State.SUCCESS)
 
     def test_backfill_depends_on_past(self):
+        """
+        Test that backfill respects ignore_depends_on_past
+        """
         dag = self.dagbag.get_dag('test_depends_on_past')
+        dag.clear()
         run_date = DEFAULT_DATE + datetime.timedelta(days=5)
 
-        # import ipdb; ipdb.set_trace()
-        BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run()
-
-        # ti should not have run
-        ti = TI(dag.tasks[0], run_date)
-        ti.refresh_from_db()
-        self.assertIs(ti.state, None)
+        # backfill should deadlock
+        self.assertRaisesRegexp(
+            AirflowException,
+            'BackfillJob is deadlocked',
+            BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run)
 
         BackfillJob(
             dag=dag,
@@ -119,12 +126,15 @@ def test_backfill_depends_on_past(self):
             end_date=run_date,
             ignore_first_depends_on_past=True).run()
 
-        # ti should have run
+        # ti should have succeeded
         ti = TI(dag.tasks[0], run_date)
         ti.refresh_from_db()
         self.assertEquals(ti.state, State.SUCCESS)
 
     def test_cli_backfill_depends_on_past(self):
+        """
+        Test that the CLI respects the -I argument
+        """
         dag_id = 'test_dagrun_states_deadlock'
         run_date = DEFAULT_DATE + datetime.timedelta(days=1)
         args = [
@@ -135,8 +145,9 @@ def test_cli_backfill_depends_on_past(self):
             run_date.isoformat(),
         ]
         dag = self.dagbag.get_dag(dag_id)
+        dag.clear()
 
-        self.assertRaisesRegex(
+        self.assertRaisesRegexp(
             AirflowException,
             'BackfillJob is deadlocked',
             cli.backfill,
@@ -172,6 +183,7 @@ def evaluate_dagrun(
 
         scheduler = SchedulerJob()
         dag = self.dagbag.get_dag(dag_id)
+        dag.clear()
         dr = scheduler.schedule_dag(dag)
         if advance_execution_date:
             # run a second time to schedule a dagrun after the start_date
@@ -208,8 +220,7 @@ def evaluate_dagrun(
 
     def test_dagrun_fail(self):
         """
-        Test that a DagRun with one failed task and one incomplete root task
-        is marked a failure
+        DagRuns with one failed and one incomplete root task -> FAILED
         """
         self.evaluate_dagrun(
             dag_id='test_dagrun_states_fail',
@@ -219,8 +230,7 @@ def test_dagrun_fail(self):
 
     def test_dagrun_success(self):
         """
-        Test that a DagRun with one failed task and one successful root task
-        is marked a success
+        DagRuns with one failed and one successful root task -> SUCCESS
         """
         self.evaluate_dagrun(
             dag_id='test_dagrun_states_success',
@@ -230,8 +240,7 @@ def test_dagrun_success(self):
 
     def test_dagrun_root_fail(self):
         """
-        Test that a DagRun with one successful root task and one failed root
-        task is marked a failure
+        DagRuns with one successful and one failed root task -> FAILED
         """
         self.evaluate_dagrun(
             dag_id='test_dagrun_states_root_fail',
@@ -241,6 +250,8 @@ def test_dagrun_root_fail(self):
 
     def test_dagrun_deadlock(self):
         """
+        Deadlocked DagRun is marked a failure
+
         Test that a deadlocked dagrun is marked as a failure by having
         depends_on_past and an execution_date after the start_date
         """
@@ -253,6 +264,8 @@ def test_dagrun_deadlock(self):
 
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
         """
+        DagRun is marked a success if ignore_first_depends_on_past=True
+
         Test that an otherwise-deadlocked dagrun is marked as a success
         if ignore_first_depends_on_past=True and the dagrun execution_date
         is after the start_date.
diff --git a/tests/models.py b/tests/models.py
index 8f04f4bc1ec5d..f786b328bb223 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -227,6 +227,7 @@ def run_with_error(ti):
     def test_depends_on_past(self):
         dagbag = models.DagBag()
         dag = dagbag.get_dag('test_depends_on_past')
+        dag.clear()
         task = dag.tasks[0]
         run_date = task.start_date + datetime.timedelta(days=5)
         ti = TI(task, run_date)
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index b3dd4f1641527..0006f6068da3e 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -82,6 +82,16 @@ def test_subdag_pools(self):
         session.delete(pool_10)
         session.commit()
 
+    def test_subdag_deadlock(self):
+        dagbag = DagBag()
+        dag = dagbag.get_dag('test_subdag_deadlock')
+        dag.clear()
+        subdag = dagbag.get_dag('test_subdag_deadlock.subdag')
+        subdag.clear()
 
-if __name__ == "__main__":
-    unittest.main()
+        # first make sure subdag is deadlocked
+        self.assertRaisesRegexp(
+            AirflowException, 'deadlocked', subdag.run,
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        # now make sure dag picks up the subdag error
+        subdag.clear()
+        self.assertRaises(
+            AirflowException, dag.run,
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/utils.py b/tests/utils.py
index 692e63b4e9c0e..89a39d8302336 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -27,6 +27,9 @@ class LogUtilsTest(unittest.TestCase):
 
     def test_gcs_url_parse(self):
+        """
+        Test GCS URL parsing
+        """
         logging.info(
             'About to create a GCSLog object without a connection. This will '
             'log an error but testing will proceed.')
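
Reviewer note: the heart of this change is the per-pass deadlock check in BackfillJob._execute(). Each pass over tasks_to_run collects the keys it could not queue into not_ready; if that set ever equals the set of keys still waiting, no later pass can make progress, so the remainder is reported as deadlocked instead of looping forever. Below is a minimal standalone sketch of that check; the names (run_backfill, try_to_queue) are illustrative only and are not Airflow's API, and queueing is collapsed into immediate success to keep the sketch short.

# Sketch of the deadlock check added to BackfillJob._execute().
# `run_backfill` and `try_to_queue` are hypothetical names, not Airflow API.

def run_backfill(tasks_to_run, try_to_queue):
    """Queue tasks until all are done or no remaining task can progress.

    tasks_to_run: dict mapping task key -> task instance.
    try_to_queue: callable(ti) -> bool; True means the task was queued
        (treated here as immediately succeeded, for brevity).
    """
    deadlocked = set()
    while tasks_to_run and not deadlocked:
        not_ready = set()
        for key, ti in list(tasks_to_run.items()):
            if try_to_queue(ti):
                tasks_to_run.pop(key)
            else:
                not_ready.add(key)
        # If every task still waiting was not ready on this pass, no later
        # pass can change anything: the backfill is deadlocked.
        if not_ready and not_ready == set(tasks_to_run):
            deadlocked.update(tasks_to_run.values())
            tasks_to_run.clear()
    return deadlocked

if __name__ == '__main__':
    # A queueing rule that can never run anything -> everything deadlocks.
    print(run_backfill({'a': 'task_a', 'b': 'task_b'}, lambda ti: False))
    # A rule that always queues -> nothing deadlocks.
    print(run_backfill({'a': 'task_a', 'b': 'task_b'}, lambda ti: True))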