diff --git a/airflow/jobs.py b/airflow/jobs.py index 7a7472112ffb2..2936eedae1b9c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -880,18 +880,19 @@ def _execute(self): 'although the task says it is running.'.format(key)) self.logger.error(msg) ti.handle_failure(msg) + tasks_to_run.pop(key) # task reports skipped elif ti.state == State.SKIPPED: - skipped.add(key) self.logger.error("Skipping {} ".format(key)) + skipped.add(key) + tasks_to_run.pop(key) # anything else is a failure else: - failed.add(key) self.logger.error("Task instance {} failed".format(key)) - - tasks_to_run.pop(key) + failed.add(key) + tasks_to_run.pop(key) # executor reports success elif state == State.SUCCESS: @@ -909,6 +910,12 @@ def _execute(self): failed.add(key) tasks_to_run.pop(key) + # task reports skipped + elif ti.state == State.SKIPPED: + self.logger.info("Task instance {} skipped".format(key)) + skipped.add(key) + tasks_to_run.pop(key) + # this probably won't ever be triggered elif ti in not_ready: self.logger.info( diff --git a/tests/jobs.py b/tests/jobs.py index c7c8e6243cbe6..be72da60e7e18 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -18,6 +18,7 @@ from __future__ import unicode_literals import datetime +import logging import os import unittest @@ -37,39 +38,52 @@ class BackfillJobTest(unittest.TestCase): def setUp(self): self.parser = cli.CLIFactory.get_parser() - self.dagbag = DagBag() + self.dagbag = DagBag(include_examples=True) def test_backfill_examples(self): """ Test backfilling example dags """ + + # some DAGs really are just examples... but try to make them work! + skip_dags = [ + 'example_http_operator', + ] + + logger = logging.getLogger('BackfillJobTest.test_backfill_examples') dags = [ dag for dag in self.dagbag.dags.values() - if dag.dag_id in ('example_bash_operator',)] + if 'example_dags' in dag.full_filepath + and dag.dag_id not in skip_dags + ] + for dag in dags: dag.clear( start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - for dag in dags: + + for i, dag in enumerate(sorted(dags, key=lambda d: d.dag_id)): + logger.info('*** Running example DAG #{}: {}'.format(i, dag.dag_id)) job = BackfillJob( dag=dag, start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE) + end_date=DEFAULT_DATE, + ignore_first_depends_on_past=True) job.run() def test_trap_executor_error(self): """ Test that errors setting up tasks (before tasks run) are caught. - + Executors run tasks with the `airflow run` command. If a task runs, its state (success, failure, or other) is stored in the database and `airflow run` exits without error. However, if an error is raised before the task runs, then the task won't be able to update its status. This can put the executor into an infinite loop of trying to run the - task. - - To counteract that, the executor traps errors coming from - `airflow run` (essentially looking for returncode != 0). + task. + + To counteract that, the executor traps errors coming from + `airflow run` (essentially looking for returncode != 0). This unit test creates such an error by trying to run a subdag whose dag_id has changed and therefore can't be found. If the trap is working properly, the error will be caught