Skip to content

Commit

Permalink
Add REST API endpoint for bulk update of DAGs (#19758)
Browse files Browse the repository at this point in the history
Added endpoint for bulk update of DAGs in the airflow stable API
  • Loading branch information
megan-parker authored Mar 14, 2022
1 parent 3931394 commit 408a7d6
Show file tree
Hide file tree
Showing 3 changed files with 533 additions and 17 deletions.
58 changes: 50 additions & 8 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def get_dags(
cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
dags_query = dags_query.filter(or_(*cond))

total_entries = len(dags_query.all())
total_entries = dags_query.count()

dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()

Expand All @@ -100,25 +100,67 @@ def get_dags(
@provide_session
def patch_dag(*, dag_id: str, update_mask: UpdateMask = None, session: Session = NEW_SESSION) -> APIResponse:
"""Update the specific DAG"""
try:
patch_body = dag_schema.load(request.json, session=session)
except ValidationError as err:
raise BadRequest(detail=str(err.messages))
if update_mask:
patch_body_ = {}
if update_mask != ['is_paused']:
raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
patch_body_[update_mask[0]] = patch_body[update_mask[0]]
patch_body = patch_body_
dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
if not dag:
raise NotFound(f"Dag with id: '{dag_id}' not found")
dag.is_paused = patch_body['is_paused']
session.flush()
return dag_schema.dump(dag)


@security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
@format_parameters({'limit': check_limit})
@provide_session
def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
"""Patch multiple DAGs."""
try:
patch_body = dag_schema.load(request.json, session=session)
except ValidationError as err:
raise BadRequest("Invalid Dag schema", detail=str(err.messages))
raise BadRequest(detail=str(err.messages))
if update_mask:
patch_body_ = {}
if len(update_mask) > 1:
if update_mask != ['is_paused']:
raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
update_mask = update_mask[0]
if update_mask != 'is_paused':
raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
patch_body_[update_mask] = patch_body[update_mask]
patch_body = patch_body_
setattr(dag, 'is_paused', patch_body['is_paused'])
session.commit()
return dag_schema.dump(dag)
if only_active:
dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
else:
dags_query = session.query(DagModel).filter(~DagModel.is_subdag)

if dag_id_pattern == '~':
dag_id_pattern = '%'
dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)

dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
if tags:
cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
dags_query = dags_query.filter(or_(*cond))

total_entries = dags_query.count()

dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()

dags_to_update = {dag.dag_id for dag in dags}
session.query(DagModel).filter(DagModel.dag_id.in_(dags_to_update)).update(
{DagModel.is_paused: patch_body['is_paused']}, synchronize_session='fetch'
)

session.flush()

return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))


@security.requires_access([(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG)])
Expand Down
69 changes: 60 additions & 9 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ paths:
/dags:
get:
summary: List DAGs
description: >
List DAGs in the database.
`dag_id_pattern` can be set to match dags of a specific pattern
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
operationId: get_dags
tags: [DAG]
Expand All @@ -404,25 +408,56 @@ paths:
- $ref: '#/components/parameters/PageOffset'
- $ref: '#/components/parameters/OrderBy'
- $ref: '#/components/parameters/FilterTags'
- name: only_active
- $ref: '#/components/parameters/OnlyActive'
- name: dag_id_pattern
in: query
schema:
type: boolean
default: true
type: string
required: false
description: |
Only return active DAGs.
If set, only return DAGs with dag_ids matching this pattern.
responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/DAGCollection'
'401':
$ref: '#/components/responses/Unauthenticated'

*New in version 2.1.1*
patch:
summary: Update DAGs
description: >
Update DAGs of a given dag_id_pattern using UpdateMask.
This endpoint allows specifying `~` as the dag_id_pattern to update all DAGs.
*New in version 2.3.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
operationId: patch_dags
parameters:
- $ref: '#/components/parameters/PageLimit'
- $ref: '#/components/parameters/PageOffset'
- $ref: '#/components/parameters/FilterTags'
- $ref: '#/components/parameters/UpdateMask'
- $ref: '#/components/parameters/OnlyActive'
- name: dag_id_pattern
in: query
schema:
type: string
required: false
required: true
description: |
If set, only return DAGs with dag_ids matching this pattern.
*New in version 2.3.0*
If set, only update DAGs with dag_ids matching this pattern.
tags: [DAG]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAG'
example:
is_paused: true
responses:
'200':
description: Success.
Expand All @@ -432,6 +467,10 @@ paths:
$ref: '#/components/schemas/DAGCollection'
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'

/dags/{dag_id}:
parameters:
Expand Down Expand Up @@ -3804,6 +3843,18 @@ components:
*New in version 2.1.0*
OnlyActive:
in: query
name: only_active
schema:
type: boolean
default: true
required: false
description: |
Only filter active DAGs.
*New in version 2.1.1*
# Other parameters

FileToken:
Expand Down
Loading

0 comments on commit 408a7d6

Please sign in to comment.