Skip to content

Commit

Permalink
AIP-84: Migrating GET queued asset events for DAG to fastAPI (apache#…
Browse files Browse the repository at this point in the history
…43934)

* AIP-84: Migrating GET queued asset events for DAG to fastAPI

* adding test cases

* adding setup and teardown

* review comments part 1

* introducing _generate_queued_event_where_clause

* changing to bad request

* adding paginated_select

* moving _generate_queued_event_where_clause to assets.py

* moving datamodels to assets

* moving tests to assets

* reuse fixtures from TestGetAssets and inherit classes to simplify

* Move route to assets module

* Small adjustments

---------

Co-authored-by: pierrejeambrun <[email protected]>
  • Loading branch information
2 people authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent a417d0f commit eb7d1ad
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 17 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def delete_dag_asset_queued_event(
)


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
Expand Down
26 changes: 24 additions & 2 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar
from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload

from fastapi import Depends, HTTPException, Query
from pendulum.parsing.exceptions import ParserError
Expand Down Expand Up @@ -409,6 +409,27 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
if not date_to_check:
raise ValueError(f"{date_to_check} cannot be None.")
return _safe_parse_datetime_optional(date_to_check)


@overload
def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ...


@overload
def _safe_parse_datetime_optional(date_to_check: None) -> None: ...


def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None:
"""
Parse datetime and raise error for invalid dates.
Allow None values.
:param date_to_check: the string value to be parsed
"""
if date_to_check is None:
return None
try:
return timezone.parse(date_to_check, strict=True)
except (TypeError, ParserError):
Expand Down Expand Up @@ -614,7 +635,8 @@ def depends_float(


# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
DateTimeQuery = Annotated[datetime, AfterValidator(_safe_parse_datetime)]
OptionalDateTimeQuery = Annotated[Union[datetime, None], AfterValidator(_safe_parse_datetime_optional)]

# DAG
QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)]
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ class AssetEventCollectionResponse(BaseModel):
total_entries: int


class QueuedEventResponse(BaseModel):
"""Queued Event serializer for responses.."""

uri: str
dag_id: str
created_at: datetime


class QueuedEventCollectionResponse(BaseModel):
"""Queued Event Collection serializer for responses."""

queued_events: list[QueuedEventResponse]
total_entries: int


class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""

Expand Down
102 changes: 97 additions & 5 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ paths:
required: true
schema:
type: string
format: date-time
title: Start Date
- name: end_date
in: query
required: true
schema:
type: string
format: date-time
title: End Date
responses:
'200':
Expand Down Expand Up @@ -170,7 +172,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/:
/public/assets:
get:
tags:
- Asset
Expand Down Expand Up @@ -346,18 +348,19 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/events:
post:
tags:
- Asset
summary: Create Asset Event
description: Create asset events.
operationId: create_asset_event
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAssetEventsBody'
required: true
responses:
'200':
description: Successful Response
Expand All @@ -366,23 +369,23 @@ paths:
schema:
$ref: '#/components/schemas/AssetEventResponse'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
description: Not Found
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -434,6 +437,60 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/assets/queuedEvent:
get:
tags:
- Asset
summary: Get Dag Asset Queued Events
description: Get queued asset events for a DAG.
operationId: get_dag_asset_queued_events
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: before
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Before
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/QueuedEventCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down Expand Up @@ -5730,6 +5787,41 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
QueuedEventCollectionResponse:
properties:
queued_events:
items:
$ref: '#/components/schemas/QueuedEventResponse'
type: array
title: Queued Events
total_entries:
type: integer
title: Total Entries
type: object
required:
- queued_events
- total_entries
title: QueuedEventCollectionResponse
description: Queued Event Collection serializer for responses.
QueuedEventResponse:
properties:
uri:
type: string
title: Uri
dag_id:
type: string
title: Dag Id
created_at:
type: string
format: date-time
title: Created At
type: object
required:
- uri
- dag_id
- created_at
title: QueuedEventResponse
description: Queued Event serializer for responses..
ReprocessBehavior:
type: string
enum:
Expand Down
79 changes: 74 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

from datetime import datetime
from typing import Annotated

from fastapi import Depends, HTTPException, status
Expand All @@ -25,6 +26,7 @@

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
OptionalDateTimeQuery,
QueryAssetDagIdPatternSearch,
QueryAssetIdFilter,
QueryLimit,
Expand All @@ -43,18 +45,41 @@
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
QueuedEventCollectionResponse,
QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.utils import timezone

assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
assets_router = AirflowRouter(tags=["Asset"])


def _generate_queued_event_where_clause(
*,
dag_id: str | None = None,
uri: str | None = None,
before: datetime | None = None,
) -> list:
"""Get AssetDagRunQueue where clause."""
where_clause = []
if dag_id is not None:
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.asset_id.in_(
select(AssetModel.id).where(AssetModel.uri == uri),
),
)
if before is not None:
where_clause.append(AssetDagRunQueue.created_at < before)
return where_clause


@assets_router.get(
"/",
"/assets",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_assets(
Expand Down Expand Up @@ -89,7 +114,7 @@ def get_assets(


@assets_router.get(
"/events",
"/assets/events",
responses=create_openapi_http_exception_doc([404]),
)
def get_asset_events(
Expand Down Expand Up @@ -165,7 +190,7 @@ def create_asset_event(


@assets_router.get(
"/{uri:path}",
"/assets/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_asset(
Expand All @@ -183,3 +208,47 @@ def get_asset(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")

return AssetResponse.model_validate(asset, from_attributes=True)


@assets_router.get(
"/dags/{dag_id}/assets/queuedEvent",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
]
),
)
def get_dag_asset_queued_events(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
) -> QueuedEventCollectionResponse:
"""Get queued asset events for a DAG."""
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
query = (
select(AssetDagRunQueue, AssetModel.uri)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)

dag_asset_queued_events_select, total_entries = paginated_select(
query,
[],
)
adrqs = session.execute(dag_asset_queued_events_select).all()

if not adrqs:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")

queued_events = [
QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
for adrq, uri in adrqs
]

return QueuedEventCollectionResponse(
queued_events=[
QueuedEventResponse.model_validate(queued_event, from_attributes=True)
for queued_event in queued_events
],
total_entries=total_entries,
)
Loading

0 comments on commit eb7d1ad

Please sign in to comment.