Skip to content

Commit

Permalink
AIP-84 Migrate Clear Dag Run public endpoint to FastAPI (#42975)
Browse files Browse the repository at this point in the history
* add clear_dag_run

* add tests

* Merge branch 'main' of https://github.com/apache/airflow into kalyan/API-84/clear_dag_run

* add ti response

* add

* use logical_date

* fix tests

* remove async

* Update airflow/api_fastapi/core_api/routes/public/dag_run.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* Update airflow/api_fastapi/core_api/datamodels/dag_run.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* remove type ignore

* update ti state and assert it

* reuse state

* remove breakpoint

* feedback

---------

Co-authored-by: Pierre Jeambrun <[email protected]>
  • Loading branch information
rawwar and pierrejeambrun authored Nov 15, 2024
1 parent 19303ca commit c3aabba
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 3 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str, session: Session = NEW
return dagrun_schema.dump(dag_run)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.RUN)
@action_logging
@provide_session
Expand Down
6 changes: 6 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class DAGRunPatchBody(BaseModel):
note: str | None = Field(None, max_length=1000)


class DAGRunClearBody(BaseModel):
"""DAG Run serializer for clear endpoint body."""

dry_run: bool = True


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

Expand Down
68 changes: 68 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,65 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
post:
tags:
- DagRun
summary: Clear Dag Run
operationId: clear_dag_run
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunClearBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/TaskInstanceCollectionResponse'
- $ref: '#/components/schemas/DAGRunResponse'
title: Response Clear Dag Run
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dagSources/{file_token}:
get:
tags:
Expand Down Expand Up @@ -4383,6 +4442,15 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunClearBody:
properties:
dry_run:
type: boolean
title: Dry Run
default: true
type: object
title: DAGRunClearBody
description: DAG Run serializer for clear endpoint body.
DAGRunPatchBody:
properties:
state:
Expand Down
53 changes: 53 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dag_run import (
DAGRunClearBody,
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunResponse,
)
from airflow.api_fastapi.core_api.datamodels.task_instances import (
TaskInstanceCollectionResponse,
TaskInstanceResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import DAG, DagRun

Expand Down Expand Up @@ -142,3 +147,51 @@ def patch_dag_run(
dag_run = session.get(DagRun, dag_run.id)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)


@dag_run_router.post(
"/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND])
)
def clear_dag_run(
dag_id: str,
dag_run_id: str,
body: DAGRunClearBody,
request: Request,
session: Annotated[Session, Depends(get_session)],
) -> TaskInstanceCollectionResponse | DAGRunResponse:
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
start_date = dag_run.logical_date
end_date = dag_run.logical_date

if body.dry_run:
task_instances = dag.clear(
start_date=start_date,
end_date=end_date,
task_ids=None,
only_failed=False,
dry_run=True,
session=session,
)

return TaskInstanceCollectionResponse(
task_instances=[
TaskInstanceResponse.model_validate(ti, from_attributes=True) for ti in task_instances
],
total_entries=len(task_instances),
)
else:
dag.clear(
start_date=dag_run.start_date,
end_date=dag_run.end_date,
task_ids=None,
only_failed=False,
session=session,
)
dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
return DAGRunResponse.model_validate(dag_run_cleared, from_attributes=True)
36 changes: 35 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,40 @@ def set_task_group_state(

return altered

@overload
def clear(
self,
*,
dry_run: Literal[True],
task_ids: Collection[str | tuple[str, int]] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
only_failed: bool = False,
only_running: bool = False,
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
) -> list[TaskInstance]: ... # pragma: no cover

@overload
def clear(
self,
*,
task_ids: Collection[str | tuple[str, int]] | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
only_failed: bool = False,
only_running: bool = False,
confirm_prompt: bool = False,
dag_run_state: DagRunState = DagRunState.QUEUED,
dry_run: Literal[False] = False,
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
) -> int: ... # pragma: no cover

@provide_session
def clear(
self,
Expand All @@ -1418,7 +1452,7 @@ def clear(
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
) -> int | Iterable[TaskInstance]:
) -> int | list[TaskInstance]:
"""
Clear a set of task instances associated with the current dag for a specified date range.
Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,9 @@ export type ConnectionServicePostConnectionMutationResult = Awaited<
export type ConnectionServiceTestConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.testConnection>
>;
export type DagRunServiceClearDagRunMutationResult = Awaited<
ReturnType<typeof DagRunService.clearDagRun>
>;
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
Expand Down
47 changes: 47 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
BackfillPostBody,
ConnectionBody,
DAGPatchBody,
DAGRunClearBody,
DAGRunPatchBody,
DagRunState,
DagWarningType,
Expand Down Expand Up @@ -1814,6 +1815,52 @@ export const useConnectionServiceTestConnection = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Clear Dag Run
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagRunServiceClearDagRun = <
TData = Common.DagRunServiceClearDagRunMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunClearBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunClearBody;
},
TContext
>({
mutationFn: ({ dagId, dagRunId, requestBody }) =>
DagRunService.clearDagRun({
dagId,
dagRunId,
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Post Pool
* Create a Pool.
Expand Down
13 changes: 13 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,19 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;

export const $DAGRunClearBody = {
properties: {
dry_run: {
type: "boolean",
title: "Dry Run",
default: true,
},
},
type: "object",
title: "DAGRunClearBody",
description: "DAG Run serializer for clear endpoint body.",
} as const;

export const $DAGRunPatchBody = {
properties: {
state: {
Expand Down
32 changes: 32 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import type {
DeleteDagRunResponse,
PatchDagRunData,
PatchDagRunResponse,
ClearDagRunData,
ClearDagRunResponse,
GetDagSourceData,
GetDagSourceResponse,
GetDagStatsData,
Expand Down Expand Up @@ -735,6 +737,36 @@ export class DagRunService {
},
});
}

/**
* Clear Dag Run
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @returns unknown Successful Response
* @throws ApiError
*/
public static clearDagRun(
data: ClearDagRunData,
): CancelablePromise<ClearDagRunResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
path: {
dag_id: data.dagId,
dag_run_id: data.dagRunId,
},
body: data.requestBody,
mediaType: "application/json",
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class DagSourceService {
Expand Down
Loading

0 comments on commit c3aabba

Please sign in to comment.