From 7d5f2bad082425d0457e3f94e6119f739f3e9892 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Fri, 13 Sep 2024 23:04:42 +0800 Subject: [PATCH] Migrate public get dags to fastapi (#42196) * Migrate public get dags to fastapi * Fix CI * Fix CI lowest dependency resolution --- airflow/api_fastapi/db.py | 43 +++ airflow/api_fastapi/openapi/v1-generated.yaml | 259 ++++++++++++++++++ airflow/api_fastapi/serializers/__init__.py | 16 ++ airflow/api_fastapi/serializers/dags.py | 89 ++++++ airflow/api_fastapi/views/public/__init__.py | 5 + airflow/api_fastapi/views/public/dags.py | 78 ++++++ airflow/api_fastapi/views/ui/__init__.py | 4 +- airflow/api_fastapi/views/ui/datasets.py | 93 ++++--- hatch_build.py | 5 +- pyproject.toml | 5 + tests/api_fastapi/views/public/__init__.py | 16 ++ tests/api_fastapi/views/public/test_dags.py | 94 +++++++ 12 files changed, 660 insertions(+), 47 deletions(-) create mode 100644 airflow/api_fastapi/db.py create mode 100644 airflow/api_fastapi/serializers/__init__.py create mode 100644 airflow/api_fastapi/serializers/dags.py create mode 100644 airflow/api_fastapi/views/public/dags.py create mode 100644 tests/api_fastapi/views/public/__init__.py create mode 100644 tests/api_fastapi/views/public/test_dags.py diff --git a/airflow/api_fastapi/db.py b/airflow/api_fastapi/db.py new file mode 100644 index 0000000000000..932cd4532530d --- /dev/null +++ b/airflow/api_fastapi/db.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.utils.session import create_session + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + +async def get_session() -> Session: + """ + Dependency for providing a session. + + For non route function please use the :class:`airflow.utils.session.provide_session` decorator. + + Example usage: + + .. code:: python + + @router.get("/your_path") + def your_route(session: Annotated[Session, Depends(get_session)]): + pass + """ + with create_session() as session: + yield session diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index dcd67b84df845..f7549f3b424ab 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -34,8 +34,267 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags: + get: + tags: + - DAG + summary: Get Dags + description: Get all DAGs. 
+ operationId: get_dags_public_dags_get + parameters: + - name: limit + in: query + required: false + schema: + type: integer + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + - name: tags + in: query + required: false + schema: + anyOf: + - type: array + items: + type: string + - type: 'null' + title: Tags + - name: dag_id_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id Pattern + - name: only_active + in: query + required: false + schema: + type: boolean + default: true + title: Only Active + - name: paused + in: query + required: false + schema: + anyOf: + - type: boolean + - type: 'null' + title: Paused + - name: order_by + in: query + required: false + schema: + type: string + default: dag_id + title: Order By + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DAGCollectionResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: + DAGCollectionResponse: + properties: + dags: + items: + $ref: '#/components/schemas/DAGModelResponse' + type: array + title: Dags + total_entries: + type: integer + title: Total Entries + type: object + required: + - dags + - total_entries + title: DAGCollectionResponse + description: DAG Collection serializer for responses. + DAGModelResponse: + properties: + dag_id: + type: string + title: Dag Id + dag_display_name: + type: string + title: Dag Display Name + is_paused: + type: boolean + title: Is Paused + is_active: + type: boolean + title: Is Active + last_parsed_time: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Parsed Time + last_pickled: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Pickled + last_expired: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Last Expired + scheduler_lock: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Scheduler Lock + pickle_id: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Pickle Id + default_view: + anyOf: + - type: string + - type: 'null' + title: Default View + fileloc: + type: string + title: Fileloc + description: + anyOf: + - type: string + - type: 'null' + title: Description + timetable_summary: + anyOf: + - type: string + - type: 'null' + title: Timetable Summary + timetable_description: + anyOf: + - type: string + - type: 'null' + title: Timetable Description + tags: + items: + $ref: '#/components/schemas/DagTagPydantic' + type: array + title: Tags + max_active_tasks: + type: integer + title: Max Active Tasks + max_active_runs: + anyOf: + - type: integer + - type: 'null' + title: Max Active Runs + max_consecutive_failed_dag_runs: + type: integer + title: Max Consecutive Failed Dag Runs + has_task_concurrency_limits: + type: boolean + title: Has Task Concurrency Limits + has_import_errors: + type: boolean + title: Has Import Errors + next_dagrun: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun + next_dagrun_data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval Start + next_dagrun_data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Next Dagrun Data Interval End + next_dagrun_create_after: + anyOf: + - type: string + format: 
date-time + - type: 'null' + title: Next Dagrun Create After + owners: + items: + type: string + type: array + title: Owners + file_token: + type: string + title: File Token + description: Return file token. + readOnly: true + type: object + required: + - dag_id + - dag_display_name + - is_paused + - is_active + - last_parsed_time + - last_pickled + - last_expired + - scheduler_lock + - pickle_id + - default_view + - fileloc + - description + - timetable_summary + - timetable_description + - tags + - max_active_tasks + - max_active_runs + - max_consecutive_failed_dag_runs + - has_task_concurrency_limits + - has_import_errors + - next_dagrun + - next_dagrun_data_interval_start + - next_dagrun_data_interval_end + - next_dagrun_create_after + - owners + - file_token + title: DAGModelResponse + description: DAG serializer for responses. + DagTagPydantic: + properties: + name: + type: string + title: Name + dag_id: + type: string + title: Dag Id + type: object + required: + - name + - dag_id + title: DagTagPydantic + description: Serializable representation of the DagTag ORM SqlAlchemyModel used + by internal API. HTTPValidationError: properties: detail: diff --git a/airflow/api_fastapi/serializers/__init__.py b/airflow/api_fastapi/serializers/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/api_fastapi/serializers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/api_fastapi/serializers/dags.py b/airflow/api_fastapi/serializers/dags.py new file mode 100644 index 0000000000000..264f549e298a5 --- /dev/null +++ b/airflow/api_fastapi/serializers/dags.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from itsdangerous import URLSafeSerializer +from pydantic import ( + BaseModel, + computed_field, + field_validator, +) + +from airflow.configuration import conf +from airflow.serialization.pydantic.dag import DagTagPydantic + + +class DAGModelResponse(BaseModel): + """DAG serializer for responses.""" + + dag_id: str + dag_display_name: str + is_paused: bool + is_active: bool + last_parsed_time: datetime | None + last_pickled: datetime | None + last_expired: datetime | None + scheduler_lock: datetime | None + pickle_id: datetime | None + default_view: str | None + fileloc: str + description: str | None + timetable_summary: str | None + timetable_description: str | None + tags: list[DagTagPydantic] + max_active_tasks: int + max_active_runs: int | None + max_consecutive_failed_dag_runs: int + has_task_concurrency_limits: bool + has_import_errors: bool + next_dagrun: datetime | None + next_dagrun_data_interval_start: datetime | None + next_dagrun_data_interval_end: datetime | None + next_dagrun_create_after: datetime | None + owners: list[str] + + @field_validator("owners", mode="before") + @classmethod + def get_owners(cls, v: Any) -> list[str] | None: + """Convert owners attribute to DAG representation.""" + if not (v is None or isinstance(v, str)): + return v + + if v is None: + return [] + elif isinstance(v, str): + return v.split(",") + return v + + # Mypy issue https://github.com/python/mypy/issues/1362 + @computed_field # type: ignore[misc] + @property + def file_token(self) -> str: + """Return file token.""" + serializer = URLSafeSerializer(conf.get_mandatory_value("webserver", "secret_key")) + return serializer.dumps(self.fileloc) + + +class DAGCollectionResponse(BaseModel): + """DAG Collection serializer for responses.""" + + dags: list[DAGModelResponse] + total_entries: int diff --git a/airflow/api_fastapi/views/public/__init__.py b/airflow/api_fastapi/views/public/__init__.py index 230b4a26c37cb..b6466536c3359 100644 --- a/airflow/api_fastapi/views/public/__init__.py +++ b/airflow/api_fastapi/views/public/__init__.py @@ -19,4 +19,9 @@ from fastapi import APIRouter +from airflow.api_fastapi.views.public.dags import dags_router + public_router = APIRouter(prefix="/public") + + +public_router.include_router(dags_router) diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py new file mode 100644 index 0000000000000..e8fac138fc75e --- /dev/null +++ b/airflow/api_fastapi/views/public/dags.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy import or_, select +from sqlalchemy.orm import Session +from typing_extensions import Annotated + +from airflow.api_connexion.parameters import apply_sorting +from airflow.api_fastapi.db import get_session +from airflow.api_fastapi.serializers.dags import DAGCollectionResponse, DAGModelResponse +from airflow.models import DagModel +from airflow.models.dag import DagTag +from airflow.utils.db import get_query_count + +dags_router = APIRouter(tags=["DAG"]) + + +@dags_router.get("/dags") +async def get_dags( + *, + limit: int = 100, + offset: int = 0, + tags: Annotated[list[str] | None, Query()] = None, + dag_id_pattern: str | None = None, + only_active: bool = True, + paused: bool | None = None, + order_by: str = "dag_id", + session: Annotated[Session, Depends(get_session)], +) -> DAGCollectionResponse: + """Get all DAGs.""" + allowed_sorting_attrs = ["dag_id"] + dags_query = select(DagModel) + if only_active: + dags_query = dags_query.where(DagModel.is_active) + if paused is not None: + if paused: + dags_query = dags_query.where(DagModel.is_paused) + else: + dags_query = dags_query.where(~DagModel.is_paused) + if dag_id_pattern: + dags_query = dags_query.where(DagModel.dag_id.ilike(f"%{dag_id_pattern}%")) + + # TODO: Re-enable when permissions are handled. + # readable_dags = get_auth_manager().get_permitted_dag_ids(user=g.user) + # dags_query = dags_query.where(DagModel.dag_id.in_(readable_dags)) + + if tags: + cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags] + dags_query = dags_query.where(or_(*cond)) + + total_entries = get_query_count(dags_query, session=session) + dags_query = apply_sorting(dags_query, order_by, {}, allowed_sorting_attrs) + dags = session.scalars(dags_query.offset(offset).limit(limit)).all() + + try: + return DAGCollectionResponse( + dags=[DAGModelResponse.model_validate(dag, from_attributes=True) for dag in dags], + total_entries=total_entries, + ) + except ValueError as e: + raise HTTPException(400, f"DAGCollectionSchema error: {str(e)}") diff --git a/airflow/api_fastapi/views/ui/__init__.py b/airflow/api_fastapi/views/ui/__init__.py index aa539e2845fad..2d95e040403a7 100644 --- a/airflow/api_fastapi/views/ui/__init__.py +++ b/airflow/api_fastapi/views/ui/__init__.py @@ -18,8 +18,8 @@ from fastapi import APIRouter -from airflow.api_fastapi.views.ui.datasets import dataset_router +from airflow.api_fastapi.views.ui.datasets import datasets_router ui_router = APIRouter(prefix="/ui") -ui_router.include_router(dataset_router) +ui_router.include_router(datasets_router) diff --git a/airflow/api_fastapi/views/ui/datasets.py b/airflow/api_fastapi/views/ui/datasets.py index d6de8ebca0e02..484385031a23d 100644 --- a/airflow/api_fastapi/views/ui/datasets.py +++ b/airflow/api_fastapi/views/ui/datasets.py @@ -17,66 +17,71 @@ from __future__ import annotations -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy import and_, func, select +from sqlalchemy.orm import Session +from typing_extensions import Annotated +from airflow.api_fastapi.db import get_session from airflow.models import DagModel from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel -from airflow.utils.session import create_session -dataset_router = APIRouter(tags=["Dataset"]) +datasets_router = APIRouter(tags=["Dataset"]) # Ultimately we want async 
routes, with async sqlalchemy session / context manager. # Additional effort to make airflow utility code async, not handled for now and most likely part of the AIP-70 -@dataset_router.get("/next_run_datasets/{dag_id}", include_in_schema=False) -async def next_run_datasets(dag_id: str, request: Request) -> dict: +@datasets_router.get("/next_run_datasets/{dag_id}", include_in_schema=False) +async def next_run_datasets( + dag_id: str, + request: Request, + session: Annotated[Session, Depends(get_session)], +) -> dict: dag = request.app.state.dag_bag.get_dag(dag_id) if not dag: raise HTTPException(404, f"can't find dag {dag_id}") - with create_session() as session: - dag_model = DagModel.get_dagmodel(dag_id, session=session) + dag_model = DagModel.get_dagmodel(dag_id, session=session) - if dag_model is None: - raise HTTPException(404, f"can't find associated dag_model {dag_id}") + if dag_model is None: + raise HTTPException(404, f"can't find associated dag_model {dag_id}") - latest_run = dag_model.get_last_dagrun(session=session) + latest_run = dag_model.get_last_dagrun(session=session) - events = [ - dict(info._mapping) - for info in session.execute( - select( - DatasetModel.id, - DatasetModel.uri, - func.max(DatasetEvent.timestamp).label("lastUpdate"), - ) - .join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id) - .join( - DatasetDagRunQueue, - and_( - DatasetDagRunQueue.dataset_id == DatasetModel.id, - DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id, - ), - isouter=True, - ) - .join( - DatasetEvent, - and_( - DatasetEvent.dataset_id == DatasetModel.id, - ( - DatasetEvent.timestamp >= latest_run.execution_date - if latest_run and latest_run.execution_date - else True - ), + events = [ + dict(info._mapping) + for info in session.execute( + select( + DatasetModel.id, + DatasetModel.uri, + func.max(DatasetEvent.timestamp).label("lastUpdate"), + ) + .join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id) + .join( + DatasetDagRunQueue, + and_( + DatasetDagRunQueue.dataset_id == DatasetModel.id, + DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id, + ), + isouter=True, + ) + .join( + DatasetEvent, + and_( + DatasetEvent.dataset_id == DatasetModel.id, + ( + DatasetEvent.timestamp >= latest_run.execution_date + if latest_run and latest_run.execution_date + else True ), - isouter=True, - ) - .where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned) - .group_by(DatasetModel.id, DatasetModel.uri) - .order_by(DatasetModel.uri) + ), + isouter=True, ) - ] - data = {"dataset_expression": dag_model.dataset_expression, "events": events} - return data + .where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned) + .group_by(DatasetModel.id, DatasetModel.uri) + .order_by(DatasetModel.uri) + ) + ] + data = {"dataset_expression": dag_model.dataset_expression, "events": events} + return data diff --git a/hatch_build.py b/hatch_build.py index 6233712ce676e..efd3ccd560e56 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -424,6 +424,9 @@ "cryptography>=41.0.0", "deprecated>=1.2.13", "dill>=0.2.2", + # Required for python 3.8 and 3.9 to work with new annotations styles. 
Check package + # description on PyPI for more details: https://pypi.org/project/eval-type-backport/ + "eval-type-backport>=0.2.0", "fastapi[standard]>=0.112.2", "flask-caching>=2.0.0", # Flask-Session 0.6 add new arguments into the SqlAlchemySessionInterface constructor as well as @@ -462,7 +465,7 @@ 'pendulum>=3.0.0,<4.0;python_version>="3.12"', "pluggy>=1.5.0", "psutil>=5.8.0", - "pydantic>=2.3.0", + "pydantic>=2.6.0", "pygments>=2.0.1", "pyjwt>=2.0.0", "python-daemon>=3.0.0", diff --git a/pyproject.toml b/pyproject.toml index caddac66ad18d..8abae197ee7df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -360,6 +360,11 @@ combine-as-imports = true # Pydantic also require models to be imported during execution "airflow/serialization/pydantic/*.py" = ["I002", "UP007", "TCH001"] +# Failing to detect types and functions used in `Annotated[...]` syntax as required at runtime. +# Annotated is central for FastAPI dependency injection, skipping rules for FastAPI folders. +"airflow/api_fastapi/*" = ["TCH001", "TCH002"] +"tests/api_fastapi/*" = ["TCH001", "TCH002"] + # Ignore pydoc style from these "*.pyi" = ["D"] "scripts/*" = ["D", "PT"] # In addition ignore pytest specific rules diff --git a/tests/api_fastapi/views/public/__init__.py b/tests/api_fastapi/views/public/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/api_fastapi/views/public/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py new file mode 100644 index 0000000000000..8d83bf1d217c4 --- /dev/null +++ b/tests/api_fastapi/views/public/test_dags.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from datetime import datetime + +import pytest + +from airflow.models.dag import DAG, DagModel +from airflow.operators.empty import EmptyOperator +from airflow.utils.session import provide_session +from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags + +pytestmark = pytest.mark.db_test + +DAG_ID = "test_dag1" +DAG2_ID = "test_dag2" +DAG3_ID = "test_dag3" +TASK_ID = "op1" + + +@provide_session +def _create_deactivated_paused_dag(session=None): + dag_model = DagModel( + dag_id=DAG3_ID, + fileloc="/tmp/dag_del_1.py", + timetable_summary="2 2 * * *", + is_active=False, + is_paused=True, + ) + session.add(dag_model) + + +@pytest.fixture(autouse=True) +def setup() -> None: + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + with DAG( + DAG_ID, + schedule=None, + start_date=datetime(2020, 6, 15), + doc_md="details", + params={"foo": 1}, + tags=["example"], + ) as dag1: + EmptyOperator(task_id=TASK_ID) + + with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2: + EmptyOperator(task_id=TASK_ID) + + dag1.sync_to_db() + dag2.sync_to_db() + + _create_deactivated_paused_dag() + + +@pytest.mark.parametrize( + "query_params, expected_total_entries, expected_ids", + [ + ({}, 2, ["test_dag1", "test_dag2"]), + ({"limit": 1}, 2, ["test_dag1"]), + ({"offset": 1}, 2, ["test_dag2"]), + ({"tags": ["example"]}, 1, ["test_dag1"]), + ({"dag_id_pattern": "1"}, 1, ["test_dag1"]), + ({"only_active": False}, 3, ["test_dag1", "test_dag2", "test_dag3"]), + ({"paused": True, "only_active": False}, 1, ["test_dag3"]), + ({"paused": False}, 2, ["test_dag1", "test_dag2"]), + ({"order_by": "-dag_id"}, 2, ["test_dag2", "test_dag1"]), + ], +) +def test_get_dags(test_client, query_params, expected_total_entries, expected_ids): + response = test_client.get("/public/dags", params=query_params) + + assert response.status_code == 200 + body = response.json() + + assert body["total_entries"] == expected_total_entries + assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
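
A minimal usage sketch (not part of the patch) showing how the new GET /public/dags endpoint might be exercised outside the test suite. It assumes the FastAPI application factory is exposed as airflow.api_fastapi.app.create_app; that module is not shown in this diff, so treat the import path as an assumption and point it at whatever builds the app for the test_client fixture. Query parameter names follow the get_dags signature above.

    # Hedged sketch: call GET /public/dags through an in-process TestClient.
    # create_app is an assumed application factory; swap in the real one if it differs.
    from fastapi.testclient import TestClient

    from airflow.api_fastapi.app import create_app  # assumption, not part of this patch

    client = TestClient(create_app())

    # Active, unpaused DAGs whose dag_id contains "example", ordered by dag_id descending.
    response = client.get(
        "/public/dags",
        params={"dag_id_pattern": "example", "paused": False, "order_by": "-dag_id", "limit": 50},
    )
    assert response.status_code == 200
    body = response.json()
    print(body["total_entries"], [dag["dag_id"] for dag in body["dags"]])

The request shape matches what the parametrized cases in test_get_dags above send through the test_client fixture.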