diff --git a/INTHEWILD.md b/INTHEWILD.md index c1a5b3267474f..588db4287efcb 100644 --- a/INTHEWILD.md +++ b/INTHEWILD.md @@ -33,6 +33,7 @@ Currently, **officially** using Airflow: 1. [Accenture](https://www.accenture.com/au-en) [[@nijanthanvijayakumar](https://github.com/nijanthanvijayakumar)] 1. [AdBOOST](https://www.adboost.sk) [[AdBOOST](https://github.com/AdBOOST)] 1. [Adobe](https://www.adobe.com/) [[@mishikaSingh](https://github.com/mishikaSingh), [@ramandumcs](https://github.com/ramandumcs), [@vardancse](https://github.com/vardancse)] +1. [Adyen](https://www.adyen.com/) [[@jorricks](https://github.com/jorricks)] 1. [Agari](https://github.com/agaridata) [[@r39132](https://github.com/r39132)] 1. [Agoda](https://agoda.com) [[@akki](https://github.com/akki)] 1. [Airbnb](https://airbnb.io/) [[@mistercrunch](https://github.com/mistercrunch), [@artwr](https://github.com/artwr)] diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 29e0efc1c2058..93278049ba342 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -40,7 +40,7 @@ @security.requires_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), ] ) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 361d29e846295..cfa27ac57830d 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -227,7 +227,7 @@ def get_task_instances_batch(session=None): @security.requires_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] @@ -261,7 +261,7 @@ def post_clear_task_instances(dag_id: str, session=None): @security.requires_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] diff --git a/airflow/www/views.py b/airflow/www/views.py index ee099085246f8..b2c8712959b7e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -27,9 +27,10 @@ import traceback from collections import defaultdict from datetime import timedelta +from functools import wraps from json import JSONDecodeError from operator import itemgetter -from typing import Any, Iterable, List, Optional, Tuple +from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, Union from urllib.parse import parse_qsl, unquote, urlencode, urlparse import lazy_object_proxy @@ -1515,7 +1516,7 @@ def xcom(self, session=None): @expose('/run', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1793,7 +1794,7 @@ def _clear_dag_tis( @expose('/clear', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1837,7 +1838,7 @@ def clear(self): @expose('/dagrun_clear', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1859,7 +1860,7 @@ def dagrun_clear(self): @expose('/blocked', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), ] ) @@ -1966,7 +1967,7 @@ def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origi @expose('/dagrun_failed', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), ] ) @@ -1982,7 +1983,7 @@ def dagrun_failed(self): @expose('/dagrun_success', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), ] ) @@ -2026,7 +2027,7 @@ def _mark_task_instance_state( @expose('/confirm', methods=['GET']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -2099,7 +2100,7 @@ def confirm(self): @expose('/failed', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -2132,7 +2133,7 @@ def failed(self): @expose('/success', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -3140,6 +3141,14 @@ def apply(self, query, func): return query.filter(self.model.dag_id.in_(filter_dag_ids)) +class DagEditFilter(BaseFilter): + """Filter using DagIDs""" + + def apply(self, query, func): # pylint: disable=redefined-outer-name,unused-argument + filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user) + return query.filter(self.model.dag_id.in_(filter_dag_ids)) + + class AirflowModelView(ModelView): """Airflow Mode View.""" @@ -3149,6 +3158,71 @@ class AirflowModelView(ModelView): CustomSQLAInterface = wwwutils.CustomSQLAInterface +class AirflowPrivilegeVerifierModelView(AirflowModelView): + """ + This ModelView prevents ability to pass primary keys of objects relating to DAGs you shouldn't be able to + edit. This only holds for the add, update and delete operations. + You will still need to use the `action_has_dag_edit_access()` for actions. + """ + + @staticmethod + def validate_dag_edit_access(item: Union[DagRun, TaskInstance]): + """Validates whether the user has 'can_edit' access for this specific DAG.""" + if not current_app.appbuilder.sm.can_edit_dag(item.dag_id): + raise AirflowException(f"Access denied for dag_id {item.dag_id}") + + def pre_add(self, item: Union[DagRun, TaskInstance]): + self.validate_dag_edit_access(item) + + def pre_update(self, item: Union[DagRun, TaskInstance]): + self.validate_dag_edit_access(item) + + def pre_delete(self, item: Union[DagRun, TaskInstance]): + self.validate_dag_edit_access(item) + + def post_add_redirect(self): # Required to prevent redirect loop + return redirect(self.get_default_url()) + + def post_edit_redirect(self): # Required to prevent redirect loop + return redirect(self.get_default_url()) + + def post_delete_redirect(self): # Required to prevent redirect loop + return redirect(self.get_default_url()) + + +def action_has_dag_edit_access(action_func: Callable) -> Callable: + """Decorator for actions which verifies you have DAG edit access on the given tis/drs.""" + + @wraps(action_func) + def check_dag_edit_acl_for_actions( + self, + items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]], + *args, + **kwargs, + ) -> None: + if items is None: + dag_ids: Set[str] = set() + elif isinstance(items, list): + dag_ids = {item.dag_id for item in items if item is not None} + elif isinstance(items, TaskInstance) or isinstance(items, DagRun): + dag_ids = {items.dag_id} + else: + raise ValueError( + "Was expecting the first argument of the action to be of type " + "Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]." + f"Was of type: {type(items)}" + ) + + for dag_id in dag_ids: + if not current_app.appbuilder.sm.can_edit_dag(dag_id): + flash(f"Access denied for dag_id {dag_id}", "danger") + logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id) + return redirect(self.get_default_url()) + return action_func(self, items, *args, **kwargs) + + return check_dag_edit_acl_for_actions + + class SlaMissModelView(AirflowModelView): """View to show SlaMiss table""" @@ -3812,7 +3886,7 @@ class JobModelView(AirflowModelView): } -class DagRunModelView(AirflowModelView): +class DagRunModelView(AirflowPrivilegeVerifierModelView): """View to show records from DagRun table""" route_base = '/dagrun' @@ -3861,7 +3935,7 @@ class DagRunModelView(AirflowModelView): base_order = ('execution_date', 'desc') - base_filters = [['dag_id', DagFilter, lambda: []]] + base_filters = [['dag_id', DagEditFilter, lambda: []]] edit_form = DagRunEditForm @@ -3876,6 +3950,7 @@ class DagRunModelView(AirflowModelView): } @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False) + @action_has_dag_edit_access @provide_session def action_muldelete(self, items, session=None): """Multiple delete.""" @@ -3884,6 +3959,7 @@ def action_muldelete(self, items, session=None): return redirect(self.get_redirect()) @action('set_running', "Set state to 'running'", '', single=False) + @action_has_dag_edit_access @provide_session def action_set_running(self, drs, session=None): """Set state to running.""" @@ -3906,6 +3982,7 @@ def action_set_running(self, drs, session=None): "All running task instances would also be marked as failed, are you sure?", single=False, ) + @action_has_dag_edit_access @provide_session def action_set_failed(self, drs, session=None): """Set state to failed.""" @@ -3932,6 +4009,7 @@ def action_set_failed(self, drs, session=None): "All task instances would also be marked as success, are you sure?", single=False, ) + @action_has_dag_edit_access @provide_session def action_set_success(self, drs, session=None): """Set state to success.""" @@ -3953,6 +4031,7 @@ def action_set_success(self, drs, session=None): return redirect(self.get_default_url()) @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False) + @action_has_dag_edit_access @provide_session def action_clear(self, drs, session=None): """Clears the state.""" @@ -4114,7 +4193,7 @@ class TriggerModelView(AirflowModelView): } -class TaskInstanceModelView(AirflowModelView): +class TaskInstanceModelView(AirflowPrivilegeVerifierModelView): """View to show records from TaskInstance table""" route_base = '/taskinstance' @@ -4198,7 +4277,7 @@ class TaskInstanceModelView(AirflowModelView): base_order = ('job_id', 'asc') - base_filters = [['dag_id', DagFilter, lambda: []]] + base_filters = [['dag_id', DagEditFilter, lambda: []]] def log_url_formatter(self): """Formats log URL.""" @@ -4229,7 +4308,6 @@ def duration_f(self): 'duration': duration_f, } - @provide_session @action( 'clear', lazy_gettext('Clear'), @@ -4239,6 +4317,8 @@ def duration_f(self): ), single=False, ) + @action_has_dag_edit_access + @provide_session def action_clear(self, task_instances, session=None): """Clears the action.""" try: @@ -4271,11 +4351,7 @@ def set_task_instance_state(self, tis, target_state, session=None): flash('Failed to set state', 'error') @action('set_running', "Set state to 'running'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_running(self, tis): """Set state to 'running'""" self.set_task_instance_state(tis, State.RUNNING) @@ -4283,11 +4359,7 @@ def action_set_running(self, tis): return redirect(self.get_redirect()) @action('set_failed', "Set state to 'failed'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_failed(self, tis): """Set state to 'failed'""" self.set_task_instance_state(tis, State.FAILED) @@ -4295,11 +4367,7 @@ def action_set_failed(self, tis): return redirect(self.get_redirect()) @action('set_success', "Set state to 'success'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_success(self, tis): """Set state to 'success'""" self.set_task_instance_state(tis, State.SUCCESS) @@ -4307,11 +4375,7 @@ def action_set_success(self, tis): return redirect(self.get_redirect()) @action('set_retry', "Set state to 'up_for_retry'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_retry(self, tis): """Set state to 'up_for_retry'""" self.set_task_instance_state(tis, State.UP_FOR_RETRY) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 47d89cba7c44a..69c06b79dc80d 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -28,7 +28,7 @@ from airflow.utils import timezone from airflow.utils.session import create_session, provide_session from airflow.utils.types import DagRunType -from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user +from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_runs @@ -50,6 +50,18 @@ def configured_app(minimal_app_for_api): (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), ], ) + create_user( + app, # type: ignore + username="test_dag_view_only", + role_name="TestViewDags", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), + ], + ) create_user( app, # type: ignore username="test_view_dags", @@ -74,9 +86,11 @@ def configured_app(minimal_app_for_api): yield app delete_user(app, username="test") # type: ignore + delete_user(app, username="test_dag_view_only") # type: ignore delete_user(app, username="test_view_dags") # type: ignore delete_user(app, username="test_granular_permissions") # type: ignore delete_user(app, username="test_no_permissions") # type: ignore + delete_roles(app) class TestDagRunEndpoint: @@ -1145,7 +1159,10 @@ def test_should_raises_401_unauthenticated(self): assert_401(response) - def test_should_raises_403_unauthorized(self): + @parameterized.expand( + ["test_dag_view_only", "test_view_dags", "test_granular_permissions", "test_no_permissions"] + ) + def test_should_raises_403_unauthorized(self, username): self._create_dag("TEST_DAG_ID") response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns", @@ -1153,7 +1170,7 @@ def test_should_raises_403_unauthorized(self): "dag_run_id": "TEST_DAG_RUN_ID_1", "execution_date": self.default_time, }, - environ_overrides={'REMOTE_USER': "test_view_dags"}, + environ_overrides={'REMOTE_USER': username}, ) assert response.status_code == 403 diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 7696b76efe430..8ed4cda5a153f 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -28,7 +28,7 @@ from airflow.utils.state import State from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType -from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user +from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user from tests.test_utils.db import clear_db_runs, clear_db_sla_miss DEFAULT_DATETIME_1 = datetime(2020, 1, 1) @@ -45,16 +45,43 @@ def configured_app(minimal_app_for_api): role_name="Test", permissions=[ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ], ) + create_user( + app, # type: ignore + username="test_dag_read_only", + role_name="TestDagReadOnly", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), + ], + ) + create_user( + app, # type: ignore + username="test_task_read_only", + role_name="TestTaskReadOnly", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + ], + ) create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore yield app delete_user(app, username="test") # type: ignore + delete_user(app, username="test_dag_read_only") # type: ignore + delete_user(app, username="test_task_read_only") # type: ignore + delete_user(app, username="test_no_permissions") # type: ignore + delete_roles(app) class TestTaskInstanceEndpoint: @@ -133,7 +160,9 @@ def create_task_instances( class TestGetTaskInstance(TestTaskInstanceEndpoint): - def test_should_respond_200(self, session): + @parameterized.expand(["test", "test_dag_read_only", "test_task_read_only"]) + @provide_session + def test_should_respond_200(self, username, session): self.create_task_instances(session) # Update ti and set operator to None to # test that operator field is nullable. @@ -144,7 +173,7 @@ def test_should_respond_200(self, session): session.commit() response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", - environ_overrides={"REMOTE_USER": "test"}, + environ_overrides={"REMOTE_USER": username}, ) assert response.status_code == 200 assert response.json == { @@ -451,6 +480,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"queue": ["test_queue_1", "test_queue_2"]}, 2, + "test", ), ( "test pool filter", @@ -462,6 +492,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"pool": ["test_pool_1", "test_pool_2"]}, 2, + "test_dag_read_only", ), ( "test state filter", @@ -473,6 +504,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): False, {"state": ["running", "queued"]}, 2, + "test_task_read_only", ), ( "test duration filter", @@ -484,6 +516,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"duration_gte": 100, "duration_lte": 200}, 3, + "test", ), ( "test end date filter", @@ -498,6 +531,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): "end_date_lte": DEFAULT_DATETIME_STR_2, }, 2, + "test_task_read_only", ), ( "test start date filter", @@ -512,6 +546,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): "start_date_lte": DEFAULT_DATETIME_STR_2, }, 2, + "test_dag_read_only", ), ( "with execution date filter", @@ -529,11 +564,14 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): "execution_date_lte": (DEFAULT_DATETIME_1 + dt.timedelta(days=2)), }, 3, + "test", ), ] ) @provide_session - def test_should_respond_200(self, _, task_instances, update_extras, payload, expected_ti_count, session): + def test_should_respond_200( + self, _, task_instances, update_extras, payload, expected_ti_count, username, session + ): self.create_task_instances( session, update_extras=update_extras, @@ -541,7 +579,7 @@ def test_should_respond_200(self, _, task_instances, update_extras, payload, exp ) response = self.client.post( "/api/v1/dags/~/dagRuns/~/taskInstances/list", - environ_overrides={"REMOTE_USER": "test"}, + environ_overrides={"REMOTE_USER": username}, json=payload, ) assert response.status_code == 200, response.json @@ -943,10 +981,11 @@ def test_should_raises_401_unauthenticated(self): ) assert_401(response) - def test_should_raise_403_forbidden(self): + @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"]) + def test_should_raise_403_forbidden(self, username: str): response = self.client.post( "/api/v1/dags/example_python_operator/clearTaskInstances", - environ_overrides={'REMOTE_USER': "test_no_permissions"}, + environ_overrides={'REMOTE_USER': username}, json={ "dry_run": False, "reset_dag_runs": True, @@ -1055,10 +1094,11 @@ def test_should_raises_401_unauthenticated(self): ) assert_401(response) - def test_should_raise_403_forbidden(self): + @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"]) + def test_should_raise_403_forbidden(self, username): response = self.client.post( "/api/v1/dags/example_python_operator/updateTaskInstancesState", - environ_overrides={'REMOTE_USER': "test_no_permissions"}, + environ_overrides={'REMOTE_USER': username}, json={ "dry_run": True, "task_id": "print_the_context", diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 82b77c06b7299..de5a2817e6c89 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -697,7 +697,7 @@ def user_all_dags_edit_tis(acl_app): username="user_all_dags_edit_tis", role_name="role_all_dags_edit_tis", permissions=[ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), ], diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py index 919adff937bc7..ebd4f3084002c 100644 --- a/tests/www/views/test_views_dagrun.py +++ b/tests/www/views/test_views_dagrun.py @@ -16,11 +16,42 @@ # specific language governing permissions and limitations # under the License. import pytest +import werkzeug from airflow.models import DagBag, DagRun, TaskInstance +from airflow.security import permissions from airflow.utils import timezone from airflow.utils.session import create_session -from tests.test_utils.www import check_content_in_response +from airflow.www.views import DagRunModelView +from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user +from tests.test_utils.www import check_content_in_response, client_with_login +from tests.www.views.test_views_tasks import _get_appbuilder_pk_string + + +@pytest.fixture(scope="module") +def client_dr_without_dag_edit(app): + create_user( + app, + username="all_dr_permissions_except_dag_edit", + role_name="all_dr_permissions_except_dag_edit", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN), + ], + ) + + yield client_with_login( + app, + username="all_dr_permissions_except_dag_edit", + password="all_dr_permissions_except_dag_edit", + ) + + delete_user(app, username="all_dr_permissions_except_dag_edit") # type: ignore + delete_roles(app) @pytest.fixture(scope="module", autouse=True) @@ -42,6 +73,19 @@ def reset_dagrun(): session.query(TaskInstance).delete() +def test_create_dagrun_permission_denied(session, client_dr_without_dag_edit): + data = { + "state": "running", + "dag_id": "example_bash_operator", + "execution_date": "2018-07-06 05:06:03", + "run_id": "test_list_dagrun_includes_conf", + "conf": '{"include": "me"}', + } + + with pytest.raises(werkzeug.test.ClientRedirectError): + client_dr_without_dag_edit.post('/dagrun/add', data=data, follow_redirects=True) + + @pytest.fixture() def running_dag_run(session): dag = DagBag().get_dag("example_bash_operator") @@ -62,6 +106,22 @@ def running_dag_run(session): return dr +def test_delete_dagrun(session, admin_client, running_dag_run): + composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run) + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1 + admin_client.post(f"/dagrun/delete/{composite_key}", follow_redirects=True) + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 0 + + +def test_delete_dagrun_permission_denied(session, client_dr_without_dag_edit, running_dag_run): + composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run) + + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1 + resp = client_dr_without_dag_edit.post(f"/dagrun/delete/{composite_key}", follow_redirects=True) + assert resp.status_code == 404 # If it doesn't fully succeed it gives a 404. + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1 + + @pytest.mark.parametrize( "action, expected_ti_states, expected_message", [ @@ -134,3 +194,18 @@ def test_muldelete_dag_runs_action(session, admin_client, running_dag_run): assert resp.status_code == 200 assert session.query(TaskInstance).count() == 0 # Deletes associated TIs. assert session.query(DagRun).filter(DagRun.id == dag_run_id).count() == 0 + + +@pytest.mark.parametrize( + "action", + ["clear", "set_success", "set_failed", "set_running"], + ids=["clear", "success", "failed", "running"], +) +def test_set_dag_runs_action_permission_denied(client_dr_without_dag_edit, running_dag_run, action): + running_dag_id = running_dag_run.id + resp = client_dr_without_dag_edit.post( + "/dagrun/action_post", + data={"action": action, "rowid": [str(running_dag_id)]}, + follow_redirects=True, + ) + check_content_in_response(f"Access denied for dag_id {running_dag_run.dag_id}", resp) diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py index ce2ffe4759c5a..621d3a44ae21d 100644 --- a/tests/www/views/test_views_decorators.py +++ b/tests/www/views/test_views_decorators.py @@ -16,13 +16,17 @@ # specific language governing permissions and limitations # under the License. import urllib.parse +from typing import List +from unittest import mock import pytest -from airflow.models import DagBag, Log +from airflow.models import DagBag, DagRun, Log, TaskInstance from airflow.utils import dates, timezone from airflow.utils.state import State from airflow.utils.types import DagRunType +from airflow.www import app +from airflow.www.views import action_has_dag_edit_access from tests.test_utils.db import clear_db_runs from tests.test_utils.www import check_content_in_response @@ -78,6 +82,11 @@ def dagruns(bash_dag, sub_dag, xcom_dag): clear_db_runs() +@action_has_dag_edit_access +def some_view_action_which_requires_dag_edit_access(*args) -> bool: + return True + + def _check_last_log(session, dag_id, event, execution_date): logs = ( session.query( @@ -150,3 +159,48 @@ def test_calendar(admin_client, dagruns): datestr = bash_dagrun.execution_date.date().isoformat() expected = rf'{{\"date\":\"{datestr}\",\"state\":\"running\",\"count\":1}}' check_content_in_response(expected, resp) + + +@pytest.mark.parametrize( + "class_type, no_instances, no_unique_dags", + [ + (None, 0, 0), + (TaskInstance, 0, 0), + (TaskInstance, 1, 1), + (TaskInstance, 10, 1), + (TaskInstance, 10, 5), + (DagRun, 0, 0), + (DagRun, 1, 1), + (DagRun, 10, 1), + (DagRun, 10, 9), + ], +) +def test_action_has_dag_edit_access(create_task_instance, class_type, no_instances, no_unique_dags): + unique_dag_ids = [f"test_dag_id_{nr}" for nr in range(no_unique_dags)] + tis: List[TaskInstance] = [ + create_task_instance( + task_id=f"test_task_instance_{nr}", + execution_date=timezone.datetime(2021, 1, 1 + nr), + dag_id=unique_dag_ids[nr % len(unique_dag_ids)], + run_id=f"test_run_id_{nr}", + ) + for nr in range(no_instances) + ] + if class_type is None: + test_items = None + else: + test_items = tis if class_type == TaskInstance else [ti.get_dagrun() for ti in tis] + test_items = test_items[0] if len(test_items) == 1 else test_items + + with app.create_app(testing=True).app_context(): + with mock.patch("airflow.www.views.current_app.appbuilder.sm.can_edit_dag") as mocked_can_edit: + mocked_can_edit.return_value = True + assert not isinstance(test_items, list) or len(test_items) == no_instances + assert some_view_action_which_requires_dag_edit_access(None, test_items) is True + assert mocked_can_edit.call_count == no_unique_dags + clear_db_runs() + + +def test_action_has_dag_edit_access_exception(): + with pytest.raises(ValueError): + some_view_action_which_requires_dag_edit_access(None, "some_incorrect_value") diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 9376ae2f64945..05eca9a2a98a1 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -27,6 +27,7 @@ from airflow.executors.celery_executor import CeleryExecutor from airflow.models import DagBag, DagModel, TaskInstance from airflow.models.dagcode import DagCode +from airflow.security import permissions from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES from airflow.utils import dates, timezone from airflow.utils.log.logging_mixin import ExternalLoggingMixin @@ -34,9 +35,10 @@ from airflow.utils.state import State from airflow.utils.types import DagRunType from airflow.www.views import TaskInstanceModelView +from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs -from tests.test_utils.www import check_content_in_response, check_content_not_in_response +from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login DEFAULT_DATE = dates.days_ago(2) @@ -73,6 +75,32 @@ def init_dagruns(app, reset_dagruns): clear_db_runs() +@pytest.fixture(scope="module") +def client_ti_without_dag_edit(app): + create_user( + app, + username="all_ti_permissions_except_dag_edit", + role_name="all_ti_permissions_except_dag_edit", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE), + ], + ) + + yield client_with_login( + app, + username="all_ti_permissions_except_dag_edit", + password="all_ti_permissions_except_dag_edit", + ) + + delete_user(app, username="all_ti_permissions_except_dag_edit") # type: ignore + delete_roles(app) + + @pytest.mark.parametrize( "url, contents", [ @@ -594,6 +622,35 @@ def _get_appbuilder_pk_string(model_view_cls, instance) -> str: return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value) +def test_task_instance_delete(session, admin_client, create_task_instance): + task_instance_to_delete = create_task_instance( + task_id="test_task_instance_delete", + execution_date=timezone.utcnow(), + state=State.DEFERRED, + ) + composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete) + task_id = task_instance_to_delete.task_id + + assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1 + admin_client.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True) + assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 0 + + +def test_task_instance_delete_permission_denied(session, client_ti_without_dag_edit, create_task_instance): + task_instance_to_delete = create_task_instance( + task_id="test_task_instance_delete_permission_denied", + execution_date=timezone.utcnow(), + state=State.DEFERRED, + ) + composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete) + task_id = task_instance_to_delete.task_id + + assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1 + resp = client_ti_without_dag_edit.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True) + assert resp.status_code == 404 # If it doesn't fully succeed it gives a 404. + assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1 + + def test_task_instance_clear(session, admin_client): task_id = "runme_0" @@ -673,3 +730,27 @@ def test_task_instance_set_state_failure(admin_client, action): ) assert resp.status_code == 200 check_content_in_response("Failed to set state", resp) + + +@pytest.mark.parametrize( + "action", + ["clear", "set_success", "set_failed", "set_running"], + ids=["clear", "success", "failed", "running"], +) +def test_set_task_instance_action_permission_denied(session, client_ti_without_dag_edit, action): + task_id = "runme_0" + + # Set the state to success for clearing. + ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id) + ti_q.update({"state": State.SUCCESS}) + session.commit() + + # Send a request to clear. + rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one()) + expected_message = f"Access denied for dag_id {ti_q.one().dag_id}" + resp = client_ti_without_dag_edit.post( + "/taskinstance/action_post", + data={"action": action, "rowid": [rowid]}, + follow_redirects=True, + ) + check_content_in_response(expected_message, resp)