From 0eb41b5952c2ce1884594c82bbf05835912b9812 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 9 Sep 2021 14:24:24 +0100 Subject: [PATCH] Limit the number of queued dagruns created by the Scheduler (#18065) There's no limit to the amount of queued dagruns to create currently and it has become a concern with issues raised against it. See #18023 and #17979 Co-authored-by: Sam Wheating --- airflow/config_templates/config.yml | 8 +++ airflow/config_templates/default_airflow.cfg | 4 ++ airflow/jobs/scheduler_job.py | 21 +++++++- ...8_add_index_on_state_dag_id_for_queued_.py | 52 +++++++++++++++++++ airflow/models/dagrun.py | 10 ++++ docs/apache-airflow/migrations-ref.rst | 4 +- tests/jobs/test_scheduler_job.py | 25 +++++++++ 7 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 239ffd73a8db3..6d301fdc9e1a4 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -195,6 +195,14 @@ type: string example: ~ default: "16" + - name: max_queued_runs_per_dag + description: | + The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs + if it reaches the limit. This is not configurable at the DAG level. + version_added: ~ + type: string + example: ~ + default: "16" - name: load_examples description: | Whether to load the DAG examples that ship with Airflow. It's good to diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index b0046271c71ba..1b11bd7908332 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -131,6 +131,10 @@ dags_are_paused_at_creation = True # which is defaulted as ``max_active_runs_per_dag``. max_active_runs_per_dag = 16 +# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs +# if it reaches the limit. This is not configurable at the DAG level. +max_queued_runs_per_dag = 16 + # Whether to load the DAG examples that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production # environment diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 4d8a704b0e4f3..ec10c054f1d32 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -823,14 +823,31 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) -> existing_dagruns = ( session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all() ) + max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag') + + queued_runs_of_dags = defaultdict( + int, + session.query(DagRun.dag_id, func.count('*')) + .filter( # We use `list` here because SQLA doesn't accept a set + # We use set to avoid duplicate dag_ids + DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})), + DagRun.state == State.QUEUED, + ) + .group_by(DagRun.dag_id) + .all(), + ) for dag_model in dag_models: + # Lets quickly check if we have exceeded the number of queued dagruns per dags + total_queued = queued_runs_of_dags[dag_model.dag_id] + if total_queued >= max_queued_dagruns: + continue + try: dag = self.dagbag.get_dag(dag_model.dag_id, session=session) except SerializedDagNotFound: self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id) continue - dag_hash = self.dagbag.dags_hash.get(dag.dag_id) # Explicitly check if the DagRun already exists. This is an edge case # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after` @@ -841,6 +858,7 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) -> # create a new one. This is so that in the next Scheduling loop we try to create new runs # instead of falling in a loop of Integrity Error. if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns: + dag.create_dagrun( run_type=DagRunType.SCHEDULED, execution_date=dag_model.next_dagrun, @@ -851,6 +869,7 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) -> dag_hash=dag_hash, creating_job_id=self.id, ) + queued_runs_of_dags[dag_model.dag_id] += 1 dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun) # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py new file mode 100644 index 0000000000000..7326d73abff07 --- /dev/null +++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add index on state, dag_id for queued dagrun + +Revision ID: ccde3e26fe78 +Revises: 092435bf5d12 +Create Date: 2021-09-08 16:35:34.867711 + +""" + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = 'ccde3e26fe78' +down_revision = '092435bf5d12' +branch_labels = None +depends_on = None + + +def upgrade(): + """Apply Add index on state, dag_id for queued dagrun""" + with op.batch_alter_table('dag_run') as batch_op: + batch_op.create_index( + 'idx_dag_run_queued_dags', + ["state", "dag_id"], + postgres_where=text("state='queued'"), + mssql_where=text("state='queued'"), + sqlite_where=text("state='queued'"), + ) + + +def downgrade(): + """Unapply Add index on state, dag_id for queued dagrun""" + with op.batch_alter_table('dag_run') as batch_op: + batch_op.drop_index('idx_dag_run_queued_dags') diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index aa32fdc9f9242..59b01a7f99501 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -112,6 +112,16 @@ class DagRun(Base, LoggingMixin): mssql_where=text("state='running'"), sqlite_where=text("state='running'"), ), + # since mysql lacks filtered/partial indices, this creates a + # duplicate index on mysql. Not the end of the world + Index( + 'idx_dag_run_queued_dags', + 'state', + 'dag_id', + postgres_where=text("state='queued'"), + mssql_where=text("state='queued'"), + sqlite_where=text("state='queued'"), + ), ) task_instances = relationship(TI, back_populates="dag_run") diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index a5446b2d4a138..20ed044aa7a2e 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ -| ``092435bf5d12`` (head) | ``7b2661a43ba3`` | | Add ``max_active_runs`` column to ``dag_model`` table | +| ``ccde3e26fe78`` (head) | ``092435bf5d12`` | | Add index on state, dag_id for queued ``dagrun`` | ++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ +| ``092435bf5d12`` | ``7b2661a43ba3`` | | Add ``max_active_runs`` column to ``dag_model`` table | +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ | ``7b2661a43ba3`` | ``142555e44c17`` | | Change TaskInstance and TaskReschedule tables from execution_date to run_id. | +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index ddeaf5f77b253..fe443417fd9f4 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1019,6 +1019,31 @@ def test_cleanup_methods_all_called(self, mock_processor_agent): self.scheduler_job.executor.end.assert_called_once() mock_processor_agent.return_value.end.reset_mock(side_effect=True) + def test_theres_limit_to_queued_dagruns_in_a_dag(self, dag_maker): + """This tests that there's limit to the number of queued dagrun scheduler can create in a dag""" + with dag_maker() as dag: + DummyOperator(task_id='mytask') + + session = settings.Session() + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.executor = MockExecutor() + self.scheduler_job.processor_agent = mock.MagicMock() + + self.scheduler_job.dagbag = dag_maker.dagbag + + session = settings.Session() + orm_dag = session.query(DagModel).get(dag.dag_id) + assert orm_dag is not None + for _ in range(20): + self.scheduler_job._create_dag_runs([orm_dag], session) + assert session.query(DagRun).count() == 16 + + with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}): + clear_db_runs() + for i in range(20): + self.scheduler_job._create_dag_runs([orm_dag], session) + assert session.query(DagRun).count() == 5 + def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): """ Test if a a dagrun will not be scheduled if max_dag_runs