diff --git a/airflow/jobs.py b/airflow/jobs.py index 7e4867a770e25..c7cbcc7903897 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -571,8 +571,6 @@ def process_events(self, executor, dagbag): # collect queued tasks for prioritiztion if ti.state == State.QUEUED: self.queued_tis.add(ti) - elif ti in self.queued_tis: - self.queued_tis.remove(ti) else: # special instructions for failed executions could go here pass @@ -601,6 +599,8 @@ def prioritize_queued(self, session, executor, dagbag): else: d[ti.pool].append(ti) + self.queued_tis.clear() + dag_blacklist = set(dagbag.paused_dags()) for pool, tis in list(d.items()): if not pool: diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py index 1c81bf0218f94..898cc04991eb9 100644 --- a/tests/dags/test_issue_1225.py +++ b/tests/dags/test_issue_1225.py @@ -115,3 +115,15 @@ def fail(): subdag=subdag7) subdag7_task1.set_downstream(subdag7_task2) subdag7_task2.set_downstream(subdag7_task3) + +# DAG tests that queued tasks are run +dag8 = DAG( + dag_id='test_scheduled_queued_tasks', + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + default_args=default_args) +dag8_task1 = PythonOperator( + python_callable=fail, + task_id='test_queued_task', + dag=dag8, + pool='test_queued_pool') diff --git a/tests/jobs.py b/tests/jobs.py index 9f0070f6390d6..ac892404c362e 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -76,7 +76,7 @@ def run_with_timeout(): job.run() self.assertRaises(AirflowException, run_with_timeout) - def test_backfill_pooled_task(self): + def test_backfill_pooled_tasks(self): """ Test that queued tasks are executed by BackfillJob @@ -262,6 +262,35 @@ def test_dagrun_deadlock(self): dagrun_state=State.FAILED, advance_execution_date=True) + def test_scheduler_pooled_tasks(self): + """ + Test that the scheduler handles queued tasks correctly + See issue #1299 + """ + session = settings.Session() + if not ( + session.query(Pool) + .filter(Pool.pool == 'test_queued_pool') + .first()): + pool = Pool(pool='test_queued_pool', slots=5) + session.merge(pool) + session.commit() + session.close() + + dag_id = 'test_scheduled_queued_tasks' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + + scheduler = SchedulerJob(dag_id, num_runs=10) + scheduler.run() + + task_1 = dag.tasks[0] + ti = TI(task_1, dag.start_date) + ti.refresh_from_db() + self.assertEqual(ti.state, State.FAILED) + + dag.clear() + def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): """ DagRun is marked a success if ignore_first_depends_on_past=True