Skip to content

Commit

Permalink
Require can_edit on DAG privileges to modify TaskInstances and DagRuns (
Browse files Browse the repository at this point in the history
apache#16634)

(cherry picked from commit f74d0ab)
(cherry picked from commit e9f0f90)
  • Loading branch information
Jorricks authored and jedcunningham committed Jan 13, 2022
1 parent a1d6146 commit 1609049
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 57 deletions.
1 change: 1 addition & 0 deletions INTHEWILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Currently, **officially** using Airflow:
1. [99](https://99taxis.com) [[@fbenevides](https://github.com/fbenevides), [@gustavoamigo](https://github.com/gustavoamigo) & [@mmmaia](https://github.com/mmmaia)]
1. [AdBOOST](https://www.adboost.sk) [[AdBOOST](https://github.com/AdBOOST)]
1. [Adobe](https://www.adobe.com/) [[@mishikaSingh](https://github.com/mishikaSingh), [@ramandumcs](https://github.com/ramandumcs), [@vardancse](https://github.com/vardancse)]
1. [Adyen](https://www.adyen.com/) [[@jorricks](https://github.com/jorricks)]
1. [Agari](https://github.com/agaridata) [[@r39132](https://github.com/r39132)]
1. [Agoda](https://agoda.com) [[@akki](https://github.com/akki)]
1. [Airbnb](https://airbnb.io/) [[@mistercrunch](https://github.com/mistercrunch), [@artwr](https://github.com/artwr)]
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
]
)
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def get_task_instances_batch(session=None):

@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
Expand Down Expand Up @@ -263,7 +263,7 @@ def post_clear_task_instances(dag_id: str, session=None):

@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
Expand Down
132 changes: 98 additions & 34 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import traceback
from collections import defaultdict
from datetime import timedelta
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
from typing import Iterable, List, Optional, Tuple
from typing import Callable, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import parse_qsl, unquote, urlencode, urlparse

import lazy_object_proxy
Expand Down Expand Up @@ -1364,7 +1365,7 @@ def xcom(self, session=None):
@expose('/run', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -1584,7 +1585,7 @@ def _clear_dag_tis(
@expose('/clear', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -1628,7 +1629,7 @@ def clear(self):
@expose('/dagrun_clear', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand All @@ -1650,7 +1651,7 @@ def dagrun_clear(self):
@expose('/blocked', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
]
)
Expand Down Expand Up @@ -1758,7 +1759,7 @@ def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origi
@expose('/dagrun_failed', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
Expand All @@ -1774,7 +1775,7 @@ def dagrun_failed(self):
@expose('/dagrun_success', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
Expand Down Expand Up @@ -1852,7 +1853,7 @@ def _mark_task_instance_state(
@expose('/failed', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -1886,7 +1887,7 @@ def failed(self):
@expose('/success', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -2892,6 +2893,14 @@ def apply(self, query, func):
return query.filter(self.model.dag_id.in_(filter_dag_ids))


class DagEditFilter(BaseFilter):
"""Filter using DagIDs"""

def apply(self, query, func): # pylint: disable=redefined-outer-name,unused-argument
filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
return query.filter(self.model.dag_id.in_(filter_dag_ids))


class AirflowModelView(ModelView):
"""Airflow Mode View."""

Expand All @@ -2901,6 +2910,71 @@ class AirflowModelView(ModelView):
CustomSQLAInterface = wwwutils.CustomSQLAInterface


class AirflowPrivilegeVerifierModelView(AirflowModelView):
"""
This ModelView prevents ability to pass primary keys of objects relating to DAGs you shouldn't be able to
edit. This only holds for the add, update and delete operations.
You will still need to use the `action_has_dag_edit_access()` for actions.
"""

@staticmethod
def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
"""Validates whether the user has 'can_edit' access for this specific DAG."""
if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
raise AirflowException(f"Access denied for dag_id {item.dag_id}")

def pre_add(self, item: Union[DagRun, TaskInstance]):
self.validate_dag_edit_access(item)

def pre_update(self, item: Union[DagRun, TaskInstance]):
self.validate_dag_edit_access(item)

def pre_delete(self, item: Union[DagRun, TaskInstance]):
self.validate_dag_edit_access(item)

def post_add_redirect(self): # Required to prevent redirect loop
return redirect(self.get_default_url())

def post_edit_redirect(self): # Required to prevent redirect loop
return redirect(self.get_default_url())

def post_delete_redirect(self): # Required to prevent redirect loop
return redirect(self.get_default_url())


def action_has_dag_edit_access(action_func: Callable) -> Callable:
"""Decorator for actions which verifies you have DAG edit access on the given tis/drs."""

@wraps(action_func)
def check_dag_edit_acl_for_actions(
self,
items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]],
*args,
**kwargs,
) -> None:
if items is None:
dag_ids: Set[str] = set()
elif isinstance(items, list):
dag_ids = {item.dag_id for item in items if item is not None}
elif isinstance(items, TaskInstance) or isinstance(items, DagRun):
dag_ids = {items.dag_id}
else:
raise ValueError(
"Was expecting the first argument of the action to be of type "
"Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]."
f"Was of type: {type(items)}"
)

for dag_id in dag_ids:
if not current_app.appbuilder.sm.can_edit_dag(dag_id):
flash(f"Access denied for dag_id {dag_id}", "danger")
logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id)
return redirect(self.get_default_url())
return action_func(self, items, *args, **kwargs)

return check_dag_edit_acl_for_actions


class SlaMissModelView(AirflowModelView):
"""View to show SlaMiss table"""

Expand Down Expand Up @@ -3426,7 +3500,7 @@ class JobModelView(AirflowModelView):
}


class DagRunModelView(AirflowModelView):
class DagRunModelView(AirflowPrivilegeVerifierModelView):
"""View to show records from DagRun table"""

route_base = '/dagrun'
Expand Down Expand Up @@ -3475,7 +3549,7 @@ class DagRunModelView(AirflowModelView):

base_order = ('execution_date', 'desc')

base_filters = [['dag_id', DagFilter, lambda: []]]
base_filters = [['dag_id', DagEditFilter, lambda: []]]

edit_form = DagRunEditForm

Expand All @@ -3490,6 +3564,7 @@ class DagRunModelView(AirflowModelView):
}

@action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
@action_has_dag_edit_access
@provide_session
def action_muldelete(self, items, session=None):
"""Multiple delete."""
Expand All @@ -3498,6 +3573,7 @@ def action_muldelete(self, items, session=None):
return redirect(self.get_redirect())

@action('set_running', "Set state to 'running'", '', single=False)
@action_has_dag_edit_access
@provide_session
def action_set_running(self, drs, session=None):
"""Set state to running."""
Expand All @@ -3520,6 +3596,7 @@ def action_set_running(self, drs, session=None):
"All running task instances would also be marked as failed, are you sure?",
single=False,
)
@action_has_dag_edit_access
@provide_session
def action_set_failed(self, drs, session=None):
"""Set state to failed."""
Expand All @@ -3546,6 +3623,7 @@ def action_set_failed(self, drs, session=None):
"All task instances would also be marked as success, are you sure?",
single=False,
)
@action_has_dag_edit_access
@provide_session
def action_set_success(self, drs, session=None):
"""Set state to success."""
Expand All @@ -3567,6 +3645,7 @@ def action_set_success(self, drs, session=None):
return redirect(self.get_default_url())

@action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
@action_has_dag_edit_access
@provide_session
def action_clear(self, drs, session=None):
"""Clears the state."""
Expand Down Expand Up @@ -3675,7 +3754,7 @@ def duration_f(self):
}


class TaskInstanceModelView(AirflowModelView):
class TaskInstanceModelView(AirflowPrivilegeVerifierModelView):
"""View to show records from TaskInstance table"""

route_base = '/taskinstance'
Expand Down Expand Up @@ -3754,7 +3833,7 @@ class TaskInstanceModelView(AirflowModelView):

base_order = ('job_id', 'asc')

base_filters = [['dag_id', DagFilter, lambda: []]]
base_filters = [['dag_id', DagEditFilter, lambda: []]]

def log_url_formatter(self):
"""Formats log URL."""
Expand Down Expand Up @@ -3784,7 +3863,6 @@ def duration_f(self):
'duration': duration_f,
}

@provide_session
@action(
'clear',
lazy_gettext('Clear'),
Expand All @@ -3794,6 +3872,8 @@ def duration_f(self):
),
single=False,
)
@action_has_dag_edit_access
@provide_session
def action_clear(self, task_instances, session=None):
"""Clears the action."""
try:
Expand Down Expand Up @@ -3828,47 +3908,31 @@ def set_task_instance_state(self, tis, target_state, session=None):
flash('Failed to set state', 'error')

@action('set_running', "Set state to 'running'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_running(self, tis):
"""Set state to 'running'"""
self.set_task_instance_state(tis, State.RUNNING)
self.update_redirect()
return redirect(self.get_redirect())

@action('set_failed', "Set state to 'failed'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_failed(self, tis):
"""Set state to 'failed'"""
self.set_task_instance_state(tis, State.FAILED)
self.update_redirect()
return redirect(self.get_redirect())

@action('set_success', "Set state to 'success'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_success(self, tis):
"""Set state to 'success'"""
self.set_task_instance_state(tis, State.SUCCESS)
self.update_redirect()
return redirect(self.get_redirect())

@action('set_retry', "Set state to 'up_for_retry'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_retry(self, tis):
"""Set state to 'up_for_retry'"""
self.set_task_instance_state(tis, State.UP_FOR_RETRY)
Expand Down
Loading

0 comments on commit 1609049

Please sign in to comment.