Skip to content

Commit

Permalink
Limit the number of queued dagruns created by the Scheduler (#18065)
Browse files Browse the repository at this point in the history
There's no limit to the amount of queued dagruns to create currently
and it has become a concern with issues raised against it. See #18023 and #17979

Co-authored-by: Sam Wheating <[email protected]>
  • Loading branch information
ephraimbuddy and SamWheating authored Sep 9, 2021
1 parent 28de326 commit 0eb41b5
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 2 deletions.
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@
type: string
example: ~
default: "16"
- name: max_queued_runs_per_dag
description: |
The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
if it reaches the limit. This is not configurable at the DAG level.
version_added: ~
type: string
example: ~
default: "16"
- name: load_examples
description: |
Whether to load the DAG examples that ship with Airflow. It's good to
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ dags_are_paused_at_creation = True
# which is defaulted as ``max_active_runs_per_dag``.
max_active_runs_per_dag = 16

# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
# if it reaches the limit. This is not configurable at the DAG level.
max_queued_runs_per_dag = 16

# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
Expand Down
21 changes: 20 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,14 +823,31 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
existing_dagruns = (
session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
)
max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')

queued_runs_of_dags = defaultdict(
int,
session.query(DagRun.dag_id, func.count('*'))
.filter( # We use `list` here because SQLA doesn't accept a set
# We use set to avoid duplicate dag_ids
DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
DagRun.state == State.QUEUED,
)
.group_by(DagRun.dag_id)
.all(),
)

for dag_model in dag_models:
# Lets quickly check if we have exceeded the number of queued dagruns per dags
total_queued = queued_runs_of_dags[dag_model.dag_id]
if total_queued >= max_queued_dagruns:
continue

try:
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue

dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
Expand All @@ -841,6 +858,7 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
# create a new one. This is so that in the next Scheduling loop we try to create new runs
# instead of falling in a loop of Integrity Error.
if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:

dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
Expand All @@ -851,6 +869,7 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
dag_hash=dag_hash,
creating_job_id=self.id,
)
queued_runs_of_dags[dag_model.dag_id] += 1
dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)

# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add index on state, dag_id for queued dagrun
Revision ID: ccde3e26fe78
Revises: 092435bf5d12
Create Date: 2021-09-08 16:35:34.867711
"""

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
revision = 'ccde3e26fe78'
down_revision = '092435bf5d12'
branch_labels = None
depends_on = None


def upgrade():
"""Apply Add index on state, dag_id for queued dagrun"""
with op.batch_alter_table('dag_run') as batch_op:
batch_op.create_index(
'idx_dag_run_queued_dags',
["state", "dag_id"],
postgres_where=text("state='queued'"),
mssql_where=text("state='queued'"),
sqlite_where=text("state='queued'"),
)


def downgrade():
"""Unapply Add index on state, dag_id for queued dagrun"""
with op.batch_alter_table('dag_run') as batch_op:
batch_op.drop_index('idx_dag_run_queued_dags')
10 changes: 10 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ class DagRun(Base, LoggingMixin):
mssql_where=text("state='running'"),
sqlite_where=text("state='running'"),
),
# since mysql lacks filtered/partial indices, this creates a
# duplicate index on mysql. Not the end of the world
Index(
'idx_dag_run_queued_dags',
'state',
'dag_id',
postgres_where=text("state='queued'"),
mssql_where=text("state='queued'"),
sqlite_where=text("state='queued'"),
),
)

task_instances = relationship(TI, back_populates="dag_run")
Expand Down
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``092435bf5d12`` (head) | ``7b2661a43ba3`` | | Add ``max_active_runs`` column to ``dag_model`` table |
| ``ccde3e26fe78`` (head) | ``092435bf5d12`` | | Add index on state, dag_id for queued ``dagrun`` |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``092435bf5d12`` | ``7b2661a43ba3`` | | Add ``max_active_runs`` column to ``dag_model`` table |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``7b2661a43ba3`` | ``142555e44c17`` | | Change TaskInstance and TaskReschedule tables from execution_date to run_id. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
Expand Down
25 changes: 25 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,31 @@ def test_cleanup_methods_all_called(self, mock_processor_agent):
self.scheduler_job.executor.end.assert_called_once()
mock_processor_agent.return_value.end.reset_mock(side_effect=True)

def test_theres_limit_to_queued_dagruns_in_a_dag(self, dag_maker):
"""This tests that there's limit to the number of queued dagrun scheduler can create in a dag"""
with dag_maker() as dag:
DummyOperator(task_id='mytask')

session = settings.Session()
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor()
self.scheduler_job.processor_agent = mock.MagicMock()

self.scheduler_job.dagbag = dag_maker.dagbag

session = settings.Session()
orm_dag = session.query(DagModel).get(dag.dag_id)
assert orm_dag is not None
for _ in range(20):
self.scheduler_job._create_dag_runs([orm_dag], session)
assert session.query(DagRun).count() == 16

with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}):
clear_db_runs()
for i in range(20):
self.scheduler_job._create_dag_runs([orm_dag], session)
assert session.query(DagRun).count() == 5

def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
"""
Test if a a dagrun will not be scheduled if max_dag_runs
Expand Down

0 comments on commit 0eb41b5

Please sign in to comment.