Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API] can return the specified field when get dag/dagRun #36641

Merged
merged 7 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
from airflow.api_connexion.schemas.dag_schema import (
DAGCollection,
dag_detail_schema,
DAGCollectionSchema,
DAGDetailSchema,
DAGSchema,
dag_schema,
dags_collection_schema,
)
Expand All @@ -50,19 +52,27 @@

@security.requires_access_dag("GET")
@provide_session
def get_dag(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
def get_dag(
*, dag_id: str, fields: Collection[str] | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get basic information about a DAG."""
dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))

if dag is None:
raise NotFound("DAG not found", detail=f"The DAG with dag_id: {dag_id} was not found")

return dag_schema.dump(dag)
try:
dag_schema = DAGSchema(only=fields) if fields else DAGSchema()
except ValueError as e:
raise BadRequest("DAGSchema init error", detail=str(e))
return dag_schema.dump(
dag,
)


@security.requires_access_dag("GET")
@provide_session
def get_dag_details(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
def get_dag_details(
*, dag_id: str, fields: Collection[str] | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get details of DAG."""
dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
if not dag:
Expand All @@ -71,7 +81,10 @@ def get_dag_details(*, dag_id: str, session: Session = NEW_SESSION) -> APIRespon
for key, value in dag.__dict__.items():
if not key.startswith("_") and not hasattr(dag_model, key):
setattr(dag_model, key, value)

try:
dag_detail_schema = DAGDetailSchema(only=fields) if fields else DAGDetailSchema()
except ValueError as e:
raise BadRequest("DAGDetailSchema init error", detail=str(e))
return dag_detail_schema.dump(dag_model)


Expand All @@ -87,6 +100,7 @@ def get_dags(
only_active: bool = True,
paused: bool | None = None,
order_by: str = "dag_id",
fields: Collection[str] | None = None,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get all DAGs."""
Expand All @@ -113,7 +127,15 @@ def get_dags(
dags_query = apply_sorting(dags_query, order_by, {}, allowed_attrs)
dags = session.scalars(dags_query.offset(offset).limit(limit)).all()

return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
try:
dags_collection_schema = (
DAGCollectionSchema(only=[f"dags.{field}" for field in fields])
if fields
else DAGCollectionSchema()
)
return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
except ValueError as e:
raise BadRequest("DAGCollectionSchema error", detail=str(e))


@security.requires_access_dag("PUT")
Expand Down
25 changes: 21 additions & 4 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

from http import HTTPStatus
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Collection

import pendulum
from connexion import NoContent
Expand All @@ -41,6 +41,8 @@
)
from airflow.api_connexion.schemas.dag_run_schema import (
DAGRunCollection,
DAGRunCollectionSchema,
DAGRunSchema,
clear_dagrun_form_schema,
dagrun_collection_schema,
dagrun_schema,
Expand Down Expand Up @@ -91,15 +93,23 @@ def delete_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSI

@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION) -> APIResponse:
def get_dag_run(
*, dag_id: str, dag_run_id: str, fields: Collection[str] | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get a DAG Run."""
dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
if dag_run is None:
raise NotFound(
"DAGRun not found",
detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
)
return dagrun_schema.dump(dag_run)
try:
# parse fields to Schema @post_dump
dagrun_schema = DAGRunSchema(context={"fields": fields}) if fields else DAGRunSchema()
return dagrun_schema.dump(dag_run)
except ValueError as e:
# Invalid fields
raise BadRequest("DAGRunSchema error", detail=str(e))


@security.requires_access_dag("GET", DagAccessEntity.RUN)
Expand Down Expand Up @@ -210,6 +220,7 @@ def get_dag_runs(
offset: int | None = None,
limit: int | None = None,
order_by: str = "id",
fields: Collection[str] | None = None,
session: Session = NEW_SESSION,
):
"""Get all DAG Runs."""
Expand Down Expand Up @@ -241,7 +252,13 @@ def get_dag_runs(
order_by=order_by,
session=session,
)
return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, total_entries=total_entries))
try:
dagrun_collection_schema = (
DAGRunCollectionSchema(context={"fields": fields}) if fields else DAGRunCollectionSchema()
)
return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, total_entries=total_entries))
except ValueError as e:
raise BadRequest("DAGRunCollectionSchema error", detail=str(e))


@security.requires_access_dag("GET", DagAccessEntity.RUN)
Expand Down
23 changes: 23 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ paths:
- $ref: "#/components/parameters/FilterTags"
- $ref: "#/components/parameters/OnlyActive"
- $ref: "#/components/parameters/Paused"
- $ref: "#/components/parameters/ReturnFields"
- name: dag_id_pattern
in: query
schema:
Expand Down Expand Up @@ -497,6 +498,8 @@ paths:
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
operationId: get_dag
tags: [DAG]
parameters:
- $ref: "#/components/parameters/ReturnFields"
responses:
"200":
description: Success.
Expand Down Expand Up @@ -733,6 +736,7 @@ paths:
- $ref: "#/components/parameters/FilterUpdatedAtLTE"
- $ref: "#/components/parameters/FilterState"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/ReturnFields"
responses:
"200":
description: List of DAG runs.
Expand Down Expand Up @@ -814,6 +818,8 @@ paths:
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
operationId: get_dag_run
tags: [DAGRun]
parameters:
- $ref: "#/components/parameters/ReturnFields"
responses:
"200":
description: Success.
Expand Down Expand Up @@ -1755,6 +1761,8 @@ paths:
The response contains many DAG attributes, so the response can be large.
If possible, consider using GET /dags/{dag_id}.
tags: [DAG]
parameters:
- $ref: "#/components/parameters/ReturnFields"
responses:
"200":
description: Success.
Expand Down Expand Up @@ -3571,15 +3579,19 @@ components:
properties:
timezone:
$ref: "#/components/schemas/Timezone"
nullable: true
catchup:
type: boolean
readOnly: true
nullable: true
orientation:
type: string
readOnly: true
nullable: true
concurrency:
type: number
readOnly: true
nullable: true
start_date:
type: string
format: "date-time"
Expand All @@ -3591,6 +3603,7 @@ components:
*Changed in version 2.0.1*: Field becomes nullable.
dag_run_timeout:
$ref: "#/components/schemas/TimeDelta"
nullable: true
doc_md:
type: string
readOnly: true
Expand Down Expand Up @@ -5253,6 +5266,16 @@ components:
style: form
explode: false

ReturnFields:
in: query
name: fields
schema:
type: array
items:
type: string
description: |
List of field for return.

# Reusable request bodies
requestBodies: {}

Expand Down
12 changes: 11 additions & 1 deletion airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,18 @@ def autogenerate(self, data, **kwargs):
@post_dump
def autofill(self, data, **kwargs):
"""Populate execution_date from logical_date for compatibility."""
ret_data = {}
data["execution_date"] = data["logical_date"]
return data
if self.context.get("fields"):
ret_fields = self.context.get("fields")
for ret_field in ret_fields:
if ret_field not in data:
raise ValueError(f"{ret_field} not in DAGRunSchema")
ret_data[ret_field] = data[ret_field]
else:
ret_data = data

return ret_data


class SetDagRunStateFormSchema(Schema):
Expand Down
37 changes: 29 additions & 8 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1470,18 +1470,18 @@ export interface components {
* [airflow.models.dag.DAG](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG)
*/
DAGDetail: components["schemas"]["DAG"] & {
timezone?: components["schemas"]["Timezone"];
catchup?: boolean;
orientation?: string;
concurrency?: number;
timezone?: components["schemas"]["Timezone"] | null;
catchup?: boolean | null;
orientation?: string | null;
concurrency?: number | null;
/**
* Format: date-time
* @description The DAG's start date.
*
* *Changed in version 2.0.1*: Field becomes nullable.
*/
start_date?: string | null;
dag_run_timeout?: components["schemas"]["TimeDelta"];
dag_run_timeout?: components["schemas"]["TimeDelta"] | null;
doc_md?: string | null;
default_view?: string | null;
/**
Expand Down Expand Up @@ -2472,6 +2472,8 @@ export interface components {
* A comma-separated list of fully qualified names of fields.
*/
UpdateMask: string[];
/** @description List of field for return. */
ReturnFields: string[];
};
requestBodies: {};
headers: {};
Expand Down Expand Up @@ -2659,6 +2661,8 @@ export interface operations {
* *New in version 2.6.0*
*/
paused?: components["parameters"]["Paused"];
/** List of field for return. */
fields?: components["parameters"]["ReturnFields"];
/** If set, only return DAGs with dag_ids matching this pattern. */
dag_id_pattern?: string;
};
Expand Down Expand Up @@ -2733,6 +2737,10 @@ export interface operations {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
};
query: {
/** List of field for return. */
fields?: components["parameters"]["ReturnFields"];
};
};
responses: {
/** Success. */
Expand Down Expand Up @@ -2997,6 +3005,8 @@ export interface operations {
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
/** List of field for return. */
fields?: components["parameters"]["ReturnFields"];
};
};
responses: {
Expand Down Expand Up @@ -3063,6 +3073,10 @@ export interface operations {
/** The DAG run ID. */
dag_run_id: components["parameters"]["DAGRunID"];
};
query: {
/** List of field for return. */
fields?: components["parameters"]["ReturnFields"];
};
};
responses: {
/** Success. */
Expand Down Expand Up @@ -4098,6 +4112,10 @@ export interface operations {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
};
query: {
/** List of field for return. */
fields?: components["parameters"]["ReturnFields"];
};
};
responses: {
/** Success. */
Expand Down Expand Up @@ -4988,7 +5006,8 @@ export type PatchDagsVariables = CamelCasedPropertiesDeep<
operations["patch_dags"]["requestBody"]["content"]["application/json"]
>;
export type GetDagVariables = CamelCasedPropertiesDeep<
operations["get_dag"]["parameters"]["path"]
operations["get_dag"]["parameters"]["path"] &
operations["get_dag"]["parameters"]["query"]
>;
export type DeleteDagVariables = CamelCasedPropertiesDeep<
operations["delete_dag"]["parameters"]["path"]
Expand Down Expand Up @@ -5026,7 +5045,8 @@ export type GetDagRunsBatchVariables = CamelCasedPropertiesDeep<
operations["get_dag_runs_batch"]["requestBody"]["content"]["application/json"]
>;
export type GetDagRunVariables = CamelCasedPropertiesDeep<
operations["get_dag_run"]["parameters"]["path"]
operations["get_dag_run"]["parameters"]["path"] &
operations["get_dag_run"]["parameters"]["query"]
>;
export type DeleteDagRunVariables = CamelCasedPropertiesDeep<
operations["delete_dag_run"]["parameters"]["path"]
Expand Down Expand Up @@ -5133,7 +5153,8 @@ export type GetLogVariables = CamelCasedPropertiesDeep<
operations["get_log"]["parameters"]["query"]
>;
export type GetDagDetailsVariables = CamelCasedPropertiesDeep<
operations["get_dag_details"]["parameters"]["path"]
operations["get_dag_details"]["parameters"]["path"] &
operations["get_dag_details"]["parameters"]["query"]
>;
export type GetTasksVariables = CamelCasedPropertiesDeep<
operations["get_tasks"]["parameters"]["path"] &
Expand Down
Loading