Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require can_edit on DAG privileges to modify TaskInstances and DagRuns #16634

Merged
merged 2 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions INTHEWILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Currently, **officially** using Airflow:
1. [Accenture](https://www.accenture.com/au-en) [[@nijanthanvijayakumar](https://github.com/nijanthanvijayakumar)]
1. [AdBOOST](https://www.adboost.sk) [[AdBOOST](https://github.com/AdBOOST)]
1. [Adobe](https://www.adobe.com/) [[@mishikaSingh](https://github.com/mishikaSingh), [@ramandumcs](https://github.com/ramandumcs), [@vardancse](https://github.com/vardancse)]
1. [Adyen](https://www.adyen.com/) [[@jorricks](https://github.com/jorricks)]
1. [Agari](https://github.com/agaridata) [[@r39132](https://github.com/r39132)]
1. [Agoda](https://agoda.com) [[@akki](https://github.com/akki)]
1. [Airbnb](https://airbnb.io/) [[@mistercrunch](https://github.com/mistercrunch), [@artwr](https://github.com/artwr)]
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
]
)
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def get_task_instances_batch(session=None):

@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
Expand Down Expand Up @@ -261,7 +261,7 @@ def post_clear_task_instances(dag_id: str, session=None):

@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
Expand Down
134 changes: 99 additions & 35 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
import traceback
from collections import defaultdict
from datetime import timedelta
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
from typing import Any, Iterable, List, Optional, Tuple
from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import parse_qsl, unquote, urlencode, urlparse

import lazy_object_proxy
Expand Down Expand Up @@ -1515,7 +1516,7 @@ def xcom(self, session=None):
@expose('/run', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -1793,7 +1794,7 @@ def _clear_dag_tis(
@expose('/clear', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -1837,7 +1838,7 @@ def clear(self):
@expose('/dagrun_clear', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand All @@ -1859,7 +1860,7 @@ def dagrun_clear(self):
@expose('/blocked', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
]
)
Expand Down Expand Up @@ -1966,7 +1967,7 @@ def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origi
@expose('/dagrun_failed', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
Expand All @@ -1982,7 +1983,7 @@ def dagrun_failed(self):
@expose('/dagrun_success', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
Expand Down Expand Up @@ -2026,7 +2027,7 @@ def _mark_task_instance_state(
@expose('/confirm', methods=['GET'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -2099,7 +2100,7 @@ def confirm(self):
@expose('/failed', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -2132,7 +2133,7 @@ def failed(self):
@expose('/success', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
Expand Down Expand Up @@ -3140,6 +3141,14 @@ def apply(self, query, func):
return query.filter(self.model.dag_id.in_(filter_dag_ids))


class DagEditFilter(BaseFilter):
"""Filter using DagIDs"""

def apply(self, query, func): # pylint: disable=redefined-outer-name,unused-argument
filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
return query.filter(self.model.dag_id.in_(filter_dag_ids))


class AirflowModelView(ModelView):
"""Airflow Mode View."""

Expand All @@ -3149,6 +3158,71 @@ class AirflowModelView(ModelView):
CustomSQLAInterface = wwwutils.CustomSQLAInterface


class AirflowPrivilegeVerifierModelView(AirflowModelView):
"""
This ModelView prevents ability to pass primary keys of objects relating to DAGs you shouldn't be able to
edit. This only holds for the add, update and delete operations.
You will still need to use the `action_has_dag_edit_access()` for actions.
"""

@staticmethod
def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
"""Validates whether the user has 'can_edit' access for this specific DAG."""
if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
raise AirflowException(f"Access denied for dag_id {item.dag_id}")

def pre_add(self, item: Union[DagRun, TaskInstance]):
self.validate_dag_edit_access(item)

def pre_update(self, item: Union[DagRun, TaskInstance]):
self.validate_dag_edit_access(item)

def pre_delete(self, item: Union[DagRun, TaskInstance]):
self.validate_dag_edit_access(item)

def post_add_redirect(self): # Required to prevent redirect loop
return redirect(self.get_default_url())

def post_edit_redirect(self): # Required to prevent redirect loop
return redirect(self.get_default_url())

def post_delete_redirect(self): # Required to prevent redirect loop
return redirect(self.get_default_url())


def action_has_dag_edit_access(action_func: Callable) -> Callable:
"""Decorator for actions which verifies you have DAG edit access on the given tis/drs."""

@wraps(action_func)
def check_dag_edit_acl_for_actions(
self,
items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]],
*args,
**kwargs,
) -> None:
if items is None:
dag_ids: Set[str] = set()
elif isinstance(items, list):
Jorricks marked this conversation as resolved.
Show resolved Hide resolved
dag_ids = {item.dag_id for item in items if item is not None}
elif isinstance(items, TaskInstance) or isinstance(items, DagRun):
dag_ids = {items.dag_id}
else:
raise ValueError(
"Was expecting the first argument of the action to be of type "
"Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]."
f"Was of type: {type(items)}"
)

for dag_id in dag_ids:
if not current_app.appbuilder.sm.can_edit_dag(dag_id):
flash(f"Access denied for dag_id {dag_id}", "danger")
logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id)
return redirect(self.get_default_url())
return action_func(self, items, *args, **kwargs)

return check_dag_edit_acl_for_actions


class SlaMissModelView(AirflowModelView):
"""View to show SlaMiss table"""

Expand Down Expand Up @@ -3812,7 +3886,7 @@ class JobModelView(AirflowModelView):
}


class DagRunModelView(AirflowModelView):
class DagRunModelView(AirflowPrivilegeVerifierModelView):
"""View to show records from DagRun table"""

route_base = '/dagrun'
Expand Down Expand Up @@ -3861,7 +3935,7 @@ class DagRunModelView(AirflowModelView):

base_order = ('execution_date', 'desc')

base_filters = [['dag_id', DagFilter, lambda: []]]
base_filters = [['dag_id', DagEditFilter, lambda: []]]

edit_form = DagRunEditForm

Expand All @@ -3876,6 +3950,7 @@ class DagRunModelView(AirflowModelView):
}

@action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
@action_has_dag_edit_access
@provide_session
def action_muldelete(self, items, session=None):
"""Multiple delete."""
Expand All @@ -3884,6 +3959,7 @@ def action_muldelete(self, items, session=None):
return redirect(self.get_redirect())

@action('set_running', "Set state to 'running'", '', single=False)
@action_has_dag_edit_access
@provide_session
def action_set_running(self, drs, session=None):
"""Set state to running."""
Expand All @@ -3906,6 +3982,7 @@ def action_set_running(self, drs, session=None):
"All running task instances would also be marked as failed, are you sure?",
single=False,
)
@action_has_dag_edit_access
@provide_session
def action_set_failed(self, drs, session=None):
"""Set state to failed."""
Expand All @@ -3932,6 +4009,7 @@ def action_set_failed(self, drs, session=None):
"All task instances would also be marked as success, are you sure?",
single=False,
)
@action_has_dag_edit_access
@provide_session
def action_set_success(self, drs, session=None):
"""Set state to success."""
Expand All @@ -3953,6 +4031,7 @@ def action_set_success(self, drs, session=None):
return redirect(self.get_default_url())

@action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
@action_has_dag_edit_access
@provide_session
def action_clear(self, drs, session=None):
"""Clears the state."""
Expand Down Expand Up @@ -4114,7 +4193,7 @@ class TriggerModelView(AirflowModelView):
}


class TaskInstanceModelView(AirflowModelView):
class TaskInstanceModelView(AirflowPrivilegeVerifierModelView):
"""View to show records from TaskInstance table"""

route_base = '/taskinstance'
Expand Down Expand Up @@ -4198,7 +4277,7 @@ class TaskInstanceModelView(AirflowModelView):

base_order = ('job_id', 'asc')

base_filters = [['dag_id', DagFilter, lambda: []]]
base_filters = [['dag_id', DagEditFilter, lambda: []]]

def log_url_formatter(self):
"""Formats log URL."""
Expand Down Expand Up @@ -4229,7 +4308,6 @@ def duration_f(self):
'duration': duration_f,
}

@provide_session
@action(
'clear',
lazy_gettext('Clear'),
Expand All @@ -4239,6 +4317,8 @@ def duration_f(self):
),
single=False,
)
@action_has_dag_edit_access
@provide_session
def action_clear(self, task_instances, session=None):
"""Clears the action."""
try:
Expand Down Expand Up @@ -4271,47 +4351,31 @@ def set_task_instance_state(self, tis, target_state, session=None):
flash('Failed to set state', 'error')

@action('set_running', "Set state to 'running'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_running(self, tis):
"""Set state to 'running'"""
self.set_task_instance_state(tis, State.RUNNING)
self.update_redirect()
return redirect(self.get_redirect())

@action('set_failed', "Set state to 'failed'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_failed(self, tis):
"""Set state to 'failed'"""
self.set_task_instance_state(tis, State.FAILED)
self.update_redirect()
return redirect(self.get_redirect())

@action('set_success', "Set state to 'success'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_success(self, tis):
"""Set state to 'success'"""
self.set_task_instance_state(tis, State.SUCCESS)
self.update_redirect()
return redirect(self.get_redirect())

@action('set_retry', "Set state to 'up_for_retry'", '', single=False)
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
]
)
@action_has_dag_edit_access
def action_set_retry(self, tis):
"""Set state to 'up_for_retry'"""
self.set_task_instance_state(tis, State.UP_FOR_RETRY)
Expand Down
Loading