Skip to content

Commit

Permalink
feat: add dag_stats rest api endpoint (apache#41017)
Browse files Browse the repository at this point in the history
  • Loading branch information
dondaum authored and molcay committed Aug 19, 2024
1 parent 8c6557a commit 6141d15
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 0 deletions.
62 changes: 62 additions & 0 deletions airflow/api_connexion/endpoints/dag_stats_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from flask import g
from sqlalchemy import func, select

from airflow.api_connexion import security
from airflow.api_connexion.schemas.dag_stats_schema import (
dag_stats_collection_schema,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models.dag import DagRun
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
from airflow.www.extensions.init_auth_manager import get_auth_manager

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.api_connexion.types import APIResponse


@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_stats(*, dag_ids: str, session: Session = NEW_SESSION) -> APIResponse:
    """
    Get the number of DAG runs per state for each requested DAG.

    :param dag_ids: Comma-separated DAG IDs. Whitespace around each ID and empty
        entries (e.g. from a trailing comma) are ignored.
    :param session: Database session used to run the aggregation query.
    :return: The dumped ``dag_stats_collection_schema`` payload. ``dags`` holds one entry
        per DAG ID that was both requested and returned by the auth manager for ``GET``,
        sorted by DAG ID; each entry lists every ``DagRunState`` with its run count
        (0 when no run is in that state). ``total_entries`` is the number of entries.
    """
    allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(methods=["GET"], user=g.user)

    requested_dag_ids = {dag_id.strip() for dag_id in dag_ids.split(",")}
    requested_dag_ids.discard("")
    # Sets iterate in arbitrary order; sort so the response order is stable across requests.
    filter_dag_ids = sorted(requested_dag_ids.intersection(allowed_dag_ids))

    if not filter_dag_ids:
        # Nothing requested is readable: same empty result, without a database round trip.
        return dag_stats_collection_schema.dump({"dags": [], "total_entries": 0})

    query = (
        select(DagRun.dag_id, DagRun.state, func.count(DagRun.state))
        .where(DagRun.dag_id.in_(filter_dag_ids))
        .group_by(DagRun.dag_id, DagRun.state)
    )
    # The query only returns (dag_id, state) pairs that have at least one run.
    run_counts = {(dag_id, state): count for dag_id, state, count in session.execute(query)}

    dags = [
        {
            "dag_id": dag_id,
            # Emit every state so clients always get a complete list, defaulting to 0.
            "stats": [
                {"state": state, "count": run_counts.get((dag_id, state), 0)} for state in DagRunState
            ],
        }
        for dag_id in filter_dag_ids
    ]
    return dag_stats_collection_schema.dump({"dags": dags, "total_entries": len(dags)})
66 changes: 66 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2225,6 +2225,32 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

# Run statistics for the DAGs named in `dag_ids`; served by `get_dag_stats`
# in the controller module referenced below.
/dagStats:
  get:
    summary: List Dag statistics
    x-openapi-router-controller: airflow.api_connexion.endpoints.dag_stats_endpoint
    operationId: get_dag_stats
    tags: [DagStats]
    parameters:
      # Single string parameter; the IDs are comma-separated inside it.
      - name: dag_ids
        in: query
        schema:
          type: string
        required: true
        description: |
          One or more DAG IDs separated by commas to filter relevant Dags.
    responses:
      "200":
        description: Success.
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/DagStatsCollectionSchema"
      "401":
        $ref: "#/components/responses/Unauthenticated"
      "403":
        $ref: "#/components/responses/PermissionDenied"

/dagSources/{file_token}:
parameters:
- $ref: "#/components/parameters/FileToken"
Expand Down Expand Up @@ -3424,6 +3450,46 @@ components:
$ref: "#/components/schemas/DAGRun"
- $ref: "#/components/schemas/CollectionInfo"

# Response body of GET /dagStats: a `dags` array combined (allOf) with the
# shared CollectionInfo schema.
DagStatsCollectionSchema:
  type: object
  description: |
    Collection of Dag statistics.
  allOf:
    - type: object
      properties:
        dags:
          type: array
          items:
            $ref: "#/components/schemas/DagStatsCollectionItem"
    - $ref: "#/components/schemas/CollectionInfo"

# One element of `dags`: a DAG ID together with its list of per-state counts.
DagStatsCollectionItem:
  description: DagStats entry collection item.

  type: object
  properties:
    dag_id:
      type: string
      description: The DAG ID.
    stats:
      type: array
      nullable: true
      items:
        $ref: "#/components/schemas/DagStatsStateCollectionItem"

# One element of `stats`: a state and how many times it occurs.
# NOTE(review): the endpoint fills `state` from DagRunState and `count` from a
# count over DagRun rows, so "DAG state" below presumably means DAG run state.
DagStatsStateCollectionItem:
  description: DagStatsState entry collection item.

  type: object
  properties:
    state:
      type: string
      description: The DAG state.
    count:
      type: integer
      description: The DAG state count.

DagWarning:
type: object
properties:
Expand Down
47 changes: 47 additions & 0 deletions airflow/api_connexion/schemas/dag_stats_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from marshmallow import Schema, fields

from airflow.api_connexion.schemas.enum_schemas import DagStateField


class DagStatsStateSchema(Schema):
    """DagStatsState Schema: a single state together with its count."""

    # Both fields are dump_only: they are serialized on dump and not accepted on load.
    state = DagStateField(dump_only=True)
    count = fields.Int(dump_only=True)


class DagStatsSchema(Schema):
    """DagStats Schema: a DAG ID together with its list of per-state counts."""

    dag_id = fields.String(required=True)
    # List of nested DagStatsStateSchema entries.
    stats = fields.List(fields.Nested(DagStatsStateSchema))


class DagStatsCollectionSchema(Schema):
    """DagStatsCollection Schema: a list of DagStats entries plus their total count."""

    # List of nested DagStatsSchema entries, one per DAG.
    dags = fields.List(fields.Nested(DagStatsSchema))
    total_entries = fields.Int()


# Module-level schema instances, created once at import time.
dag_stats_state_schema = DagStatsStateSchema()
dag_stats_schema = DagStatsSchema()
dag_stats_collection_schema = DagStatsCollectionSchema()
50 changes: 50 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,9 @@ export interface paths {
};
};
};
"/dagStats": {
get: operations["get_dag_stats"];
};
"/dagSources/{file_token}": {
/** Get a source code using file token. */
get: operations["get_dag_source"];
Expand Down Expand Up @@ -1264,6 +1267,23 @@ export interface components {
DAGRunCollection: {
dag_runs?: components["schemas"]["DAGRun"][];
} & components["schemas"]["CollectionInfo"];
/** @description Collection of Dag statistics. */
DagStatsCollectionSchema: {
dags?: components["schemas"]["DagStatsCollectionItem"][];
} & components["schemas"]["CollectionInfo"];
/** @description DagStats entry collection item. */
DagStatsCollectionItem: {
/** @description The DAG ID. */
dag_id?: string;
stats?: components["schemas"]["DagStatsStateCollectionItem"][] | null;
};
/** @description DagStatsState entry collection item. */
DagStatsStateCollectionItem: {
/** @description The DAG state. */
state?: string;
/** @description The DAG state count. */
count?: number;
};
DagWarning: {
/** @description The dag_id. */
dag_id?: string;
Expand Down Expand Up @@ -4820,6 +4840,24 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
get_dag_stats: {
parameters: {
query: {
/** One or more DAG IDs separated by commas to filter relevant Dags. */
dag_ids: string;
};
};
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["DagStatsCollectionSchema"];
};
};
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
};
};
/** Get a source code using file token. */
get_dag_source: {
parameters: {
Expand Down Expand Up @@ -5433,6 +5471,15 @@ export type UpdateDagRunState = CamelCasedPropertiesDeep<
export type DAGRunCollection = CamelCasedPropertiesDeep<
components["schemas"]["DAGRunCollection"]
>;
export type DagStatsCollectionSchema = CamelCasedPropertiesDeep<
components["schemas"]["DagStatsCollectionSchema"]
>;
export type DagStatsCollectionItem = CamelCasedPropertiesDeep<
components["schemas"]["DagStatsCollectionItem"]
>;
export type DagStatsStateCollectionItem = CamelCasedPropertiesDeep<
components["schemas"]["DagStatsStateCollectionItem"]
>;
export type DagWarning = CamelCasedPropertiesDeep<
components["schemas"]["DagWarning"]
>;
Expand Down Expand Up @@ -5894,6 +5941,9 @@ export type GetTasksVariables = CamelCasedPropertiesDeep<
export type GetTaskVariables = CamelCasedPropertiesDeep<
operations["get_task"]["parameters"]["path"]
>;
export type GetDagStatsVariables = CamelCasedPropertiesDeep<
operations["get_dag_stats"]["parameters"]["query"]
>;
export type GetDagSourceVariables = CamelCasedPropertiesDeep<
operations["get_dag_source"]["parameters"]["path"]
>;
Expand Down
Loading

0 comments on commit 6141d15

Please sign in to comment.