diff --git a/INTHEWILD.md b/INTHEWILD.md index ed9e8c7fae2cf..6b5f97f72fb2f 100644 --- a/INTHEWILD.md +++ b/INTHEWILD.md @@ -31,6 +31,7 @@ Currently, **officially** using Airflow: 1. [99](https://99taxis.com) [[@fbenevides](https://github.com/fbenevides), [@gustavoamigo](https://github.com/gustavoamigo) & [@mmmaia](https://github.com/mmmaia)] 1. [AdBOOST](https://www.adboost.sk) [[AdBOOST](https://github.com/AdBOOST)] 1. [Adobe](https://www.adobe.com/) [[@mishikaSingh](https://github.com/mishikaSingh), [@ramandumcs](https://github.com/ramandumcs), [@vardancse](https://github.com/vardancse)] +1. [Adyen](https://www.adyen.com/) [[@jorricks](https://github.com/jorricks)] 1. [Agari](https://github.com/agaridata) [[@r39132](https://github.com/r39132)] 1. [Agoda](https://agoda.com) [[@akki](https://github.com/akki)] 1. [Airbnb](https://airbnb.io/) [[@mistercrunch](https://github.com/mistercrunch), [@artwr](https://github.com/artwr)] diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 27d13b6f5af3c..c926c043d5f9e 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -36,7 +36,7 @@ @security.requires_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), ] ) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 3926b87bf24d1..ba22b794208d8 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -229,7 +229,7 @@ def get_task_instances_batch(session=None): @security.requires_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] @@ -263,7 +263,7 @@ def post_clear_task_instances(dag_id: str, session=None): @security.requires_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] diff --git a/airflow/www/views.py b/airflow/www/views.py index 24f9021c651a2..3d2be590c76fa 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -26,9 +26,10 @@ import traceback from collections import defaultdict from datetime import timedelta +from functools import wraps from json import JSONDecodeError from operator import itemgetter -from typing import Iterable, List, Optional, Tuple +from typing import Callable, Iterable, List, Optional, Set, Tuple, Union from urllib.parse import parse_qsl, unquote, urlencode, urlparse import lazy_object_proxy @@ -1359,7 +1360,7 @@ def xcom(self, session=None): @expose('/run', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1587,7 +1588,7 @@ def _clear_dag_tis( @expose('/clear', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1631,7 +1632,7 @@ def clear(self): @expose('/dagrun_clear', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1653,7 +1654,7 @@ def dagrun_clear(self): @expose('/blocked', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), ] ) @@ -1761,7 +1762,7 @@ def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origi @expose('/dagrun_failed', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), ] ) @@ -1777,7 +1778,7 @@ def dagrun_failed(self): @expose('/dagrun_success', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), ] ) @@ -1855,7 +1856,7 @@ def _mark_task_instance_state( @expose('/failed', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -1889,7 +1890,7 @@ def failed(self): @expose('/success', methods=['POST']) @auth.has_access( [ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ] ) @@ -2895,6 +2896,14 @@ def apply(self, query, func): return query.filter(self.model.dag_id.in_(filter_dag_ids)) +class DagEditFilter(BaseFilter): + """Filter using DagIDs""" + + def apply(self, query, func): # pylint: disable=redefined-outer-name,unused-argument + filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user) + return query.filter(self.model.dag_id.in_(filter_dag_ids)) + + class AirflowModelView(ModelView): """Airflow Mode View.""" @@ -2904,6 +2913,71 @@ class AirflowModelView(ModelView): CustomSQLAInterface = wwwutils.CustomSQLAInterface +class AirflowPrivilegeVerifierModelView(AirflowModelView): + """ + This ModelView prevents ability to pass primary keys of objects relating to DAGs you shouldn't be able to + edit. This only holds for the add, update and delete operations. + You will still need to use the `action_has_dag_edit_access()` for actions. + """ + + @staticmethod + def validate_dag_edit_access(item: Union[DagRun, TaskInstance]): + """Validates whether the user has 'can_edit' access for this specific DAG.""" + if not current_app.appbuilder.sm.can_edit_dag(item.dag_id): + raise AirflowException(f"Access denied for dag_id {item.dag_id}") + + def pre_add(self, item: Union[DagRun, TaskInstance]): + self.validate_dag_edit_access(item) + + def pre_update(self, item: Union[DagRun, TaskInstance]): + self.validate_dag_edit_access(item) + + def pre_delete(self, item: Union[DagRun, TaskInstance]): + self.validate_dag_edit_access(item) + + def post_add_redirect(self): # Required to prevent redirect loop + return redirect(self.get_default_url()) + + def post_edit_redirect(self): # Required to prevent redirect loop + return redirect(self.get_default_url()) + + def post_delete_redirect(self): # Required to prevent redirect loop + return redirect(self.get_default_url()) + + +def action_has_dag_edit_access(action_func: Callable) -> Callable: + """Decorator for actions which verifies you have DAG edit access on the given tis/drs.""" + + @wraps(action_func) + def check_dag_edit_acl_for_actions( + self, + items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]], + *args, + **kwargs, + ) -> None: + if items is None: + dag_ids: Set[str] = set() + elif isinstance(items, list): + dag_ids = {item.dag_id for item in items if item is not None} + elif isinstance(items, TaskInstance) or isinstance(items, DagRun): + dag_ids = {items.dag_id} + else: + raise ValueError( + "Was expecting the first argument of the action to be of type " + "Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]." + f"Was of type: {type(items)}" + ) + + for dag_id in dag_ids: + if not current_app.appbuilder.sm.can_edit_dag(dag_id): + flash(f"Access denied for dag_id {dag_id}", "danger") + logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id) + return redirect(self.get_default_url()) + return action_func(self, items, *args, **kwargs) + + return check_dag_edit_acl_for_actions + + class SlaMissModelView(AirflowModelView): """View to show SlaMiss table""" @@ -3435,7 +3509,7 @@ class JobModelView(AirflowModelView): } -class DagRunModelView(AirflowModelView): +class DagRunModelView(AirflowPrivilegeVerifierModelView): """View to show records from DagRun table""" route_base = '/dagrun' @@ -3484,7 +3558,7 @@ class DagRunModelView(AirflowModelView): base_order = ('execution_date', 'desc') - base_filters = [['dag_id', DagFilter, lambda: []]] + base_filters = [['dag_id', DagEditFilter, lambda: []]] edit_form = DagRunEditForm @@ -3499,6 +3573,7 @@ class DagRunModelView(AirflowModelView): } @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False) + @action_has_dag_edit_access @provide_session def action_muldelete(self, items, session=None): """Multiple delete.""" @@ -3507,6 +3582,7 @@ def action_muldelete(self, items, session=None): return redirect(self.get_redirect()) @action('set_running', "Set state to 'running'", '', single=False) + @action_has_dag_edit_access @provide_session def action_set_running(self, drs, session=None): """Set state to running.""" @@ -3529,6 +3605,7 @@ def action_set_running(self, drs, session=None): "All running task instances would also be marked as failed, are you sure?", single=False, ) + @action_has_dag_edit_access @provide_session def action_set_failed(self, drs, session=None): """Set state to failed.""" @@ -3555,6 +3632,7 @@ def action_set_failed(self, drs, session=None): "All task instances would also be marked as success, are you sure?", single=False, ) + @action_has_dag_edit_access @provide_session def action_set_success(self, drs, session=None): """Set state to success.""" @@ -3576,6 +3654,7 @@ def action_set_success(self, drs, session=None): return redirect(self.get_default_url()) @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False) + @action_has_dag_edit_access @provide_session def action_clear(self, drs, session=None): """Clears the state.""" @@ -3684,7 +3763,7 @@ def duration_f(self): } -class TaskInstanceModelView(AirflowModelView): +class TaskInstanceModelView(AirflowPrivilegeVerifierModelView): """View to show records from TaskInstance table""" route_base = '/taskinstance' @@ -3763,7 +3842,7 @@ class TaskInstanceModelView(AirflowModelView): base_order = ('job_id', 'asc') - base_filters = [['dag_id', DagFilter, lambda: []]] + base_filters = [['dag_id', DagEditFilter, lambda: []]] def log_url_formatter(self): """Formats log URL.""" @@ -3793,7 +3872,6 @@ def duration_f(self): 'duration': duration_f, } - @provide_session @action( 'clear', lazy_gettext('Clear'), @@ -3803,6 +3881,8 @@ def duration_f(self): ), single=False, ) + @action_has_dag_edit_access + @provide_session def action_clear(self, task_instances, session=None): """Clears the action.""" try: @@ -3837,11 +3917,7 @@ def set_task_instance_state(self, tis, target_state, session=None): flash('Failed to set state', 'error') @action('set_running', "Set state to 'running'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_running(self, tis): """Set state to 'running'""" self.set_task_instance_state(tis, State.RUNNING) @@ -3849,11 +3925,7 @@ def action_set_running(self, tis): return redirect(self.get_redirect()) @action('set_failed', "Set state to 'failed'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_failed(self, tis): """Set state to 'failed'""" self.set_task_instance_state(tis, State.FAILED) @@ -3861,11 +3933,7 @@ def action_set_failed(self, tis): return redirect(self.get_redirect()) @action('set_success', "Set state to 'success'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_success(self, tis): """Set state to 'success'""" self.set_task_instance_state(tis, State.SUCCESS) @@ -3873,11 +3941,7 @@ def action_set_success(self, tis): return redirect(self.get_redirect()) @action('set_retry', "Set state to 'up_for_retry'", '', single=False) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ] - ) + @action_has_dag_edit_access def action_set_retry(self, tis): """Set state to 'up_for_retry'""" self.set_task_instance_state(tis, State.UP_FOR_RETRY) diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 0aa13b25e7a50..33093b0a1578e 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -25,7 +25,7 @@ from airflow.utils import timezone from airflow.utils.session import create_session, provide_session from airflow.utils.types import DagRunType -from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user +from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_runs @@ -47,6 +47,18 @@ def configured_app(minimal_app_for_api): (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), ], ) + create_user( + app, # type: ignore + username="test_dag_view_only", + role_name="TestViewDags", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), + ], + ) create_user( app, # type: ignore username="test_view_dags", @@ -71,9 +83,11 @@ def configured_app(minimal_app_for_api): yield app delete_user(app, username="test") # type: ignore + delete_user(app, username="test_dag_view_only") # type: ignore delete_user(app, username="test_view_dags") # type: ignore delete_user(app, username="test_granular_permissions") # type: ignore delete_user(app, username="test_no_permissions") # type: ignore + delete_roles(app) class TestDagRunEndpoint: @@ -1070,7 +1084,10 @@ def test_should_raises_401_unauthenticated(self): assert_401(response) - def test_should_raises_403_unauthorized(self): + @parameterized.expand( + ["test_dag_view_only", "test_view_dags", "test_granular_permissions", "test_no_permissions"] + ) + def test_should_raises_403_unauthorized(self, username): self._create_dag("TEST_DAG_ID") response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns", @@ -1078,6 +1095,6 @@ def test_should_raises_403_unauthorized(self): "dag_run_id": "TEST_DAG_RUN_ID_1", "execution_date": self.default_time, }, - environ_overrides={'REMOTE_USER': "test_view_dags"}, + environ_overrides={'REMOTE_USER': username}, ) assert response.status_code == 403 diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 79ff113e6823b..024100ddc9c9c 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -27,7 +27,7 @@ from airflow.utils.state import State from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType -from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user +from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user from tests.test_utils.db import clear_db_runs, clear_db_sla_miss DEFAULT_DATETIME_1 = datetime(2020, 1, 1) @@ -44,16 +44,43 @@ def configured_app(minimal_app_for_api): role_name="Test", permissions=[ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), ], ) + create_user( + app, # type: ignore + username="test_dag_read_only", + role_name="TestDagReadOnly", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), + ], + ) + create_user( + app, # type: ignore + username="test_task_read_only", + role_name="TestTaskReadOnly", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + ], + ) create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore yield app delete_user(app, username="test") # type: ignore + delete_user(app, username="test_dag_read_only") # type: ignore + delete_user(app, username="test_task_read_only") # type: ignore + delete_user(app, username="test_no_permissions") # type: ignore + delete_roles(app) class TestTaskInstanceEndpoint: @@ -133,7 +160,9 @@ def create_task_instances( class TestGetTaskInstance(TestTaskInstanceEndpoint): - def test_should_respond_200(self, session): + @parameterized.expand(["test", "test_dag_read_only", "test_task_read_only"]) + @provide_session + def test_should_respond_200(self, username, session): self.create_task_instances(session) # Update ti and set operator to None to # test that operator field is nullable. @@ -144,7 +173,7 @@ def test_should_respond_200(self, session): session.commit() response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context", - environ_overrides={"REMOTE_USER": "test"}, + environ_overrides={"REMOTE_USER": username}, ) assert response.status_code == 200 assert response.json == { @@ -479,6 +508,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"queue": ["test_queue_1", "test_queue_2"]}, 2, + "test", ), ( "test pool filter", @@ -491,6 +521,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"pool": ["test_pool_1", "test_pool_2"]}, 2, + "test_dag_read_only", ), ( "test state filter", @@ -503,6 +534,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"state": ["running", "queued"]}, 2, + "test_task_read_only", ), ( "test duration filter", @@ -515,6 +547,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): True, {"duration_gte": 100, "duration_lte": 200}, 3, + "test", ), ( "test end date filter", @@ -530,6 +563,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): "end_date_lte": DEFAULT_DATETIME_STR_2, }, 2, + "test_task_read_only", ), ( "test start date filter", @@ -545,6 +579,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): "start_date_lte": DEFAULT_DATETIME_STR_2, }, 2, + "test_dag_read_only", ), ( "with execution date filter", @@ -563,12 +598,13 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint): "execution_date_lte": (DEFAULT_DATETIME_1 + dt.timedelta(days=2)), }, 3, + "test", ), ] ) @provide_session def test_should_respond_200( - self, _, task_instances, update_extras, single_dag_run, payload, expected_ti_count, session + self, _, task_instances, update_extras, single_dag_run, payload, expected_ti_count, username, session ): self.create_task_instances( session, @@ -578,7 +614,7 @@ def test_should_respond_200( ) response = self.client.post( "/api/v1/dags/~/dagRuns/~/taskInstances/list", - environ_overrides={"REMOTE_USER": "test"}, + environ_overrides={"REMOTE_USER": username}, json=payload, ) assert response.status_code == 200, response.json @@ -984,10 +1020,11 @@ def test_should_raises_401_unauthenticated(self): ) assert_401(response) - def test_should_raise_403_forbidden(self): + @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"]) + def test_should_raise_403_forbidden(self, username: str): response = self.client.post( "/api/v1/dags/example_python_operator/clearTaskInstances", - environ_overrides={'REMOTE_USER': "test_no_permissions"}, + environ_overrides={'REMOTE_USER': username}, json={ "dry_run": False, "reset_dag_runs": True, @@ -1094,10 +1131,11 @@ def test_should_raises_401_unauthenticated(self): ) assert_401(response) - def test_should_raise_403_forbidden(self): + @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"]) + def test_should_raise_403_forbidden(self, username): response = self.client.post( "/api/v1/dags/example_python_operator/updateTaskInstancesState", - environ_overrides={'REMOTE_USER': "test_no_permissions"}, + environ_overrides={'REMOTE_USER': username}, json={ "dry_run": True, "task_id": "print_the_context", diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 0341a4b2ac754..c5d8c06b95aa6 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -647,7 +647,7 @@ def user_all_dags_edit_tis(acl_app): username="user_all_dags_edit_tis", role_name="role_all_dags_edit_tis", permissions=[ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), ], diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py index aa989cf7631f1..af5783d82e3ef 100644 --- a/tests/www/views/test_views_dagrun.py +++ b/tests/www/views/test_views_dagrun.py @@ -16,11 +16,42 @@ # specific language governing permissions and limitations # under the License. import pytest +import werkzeug from airflow.models import DagBag, DagRun, TaskInstance +from airflow.security import permissions from airflow.utils import timezone from airflow.utils.session import create_session -from tests.test_utils.www import check_content_in_response +from airflow.www.views import DagRunModelView +from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user +from tests.test_utils.www import check_content_in_response, client_with_login +from tests.www.views.test_views_tasks import _get_appbuilder_pk_string + + +@pytest.fixture(scope="module") +def client_dr_without_dag_edit(app): + create_user( + app, + username="all_dr_permissions_except_dag_edit", + role_name="all_dr_permissions_except_dag_edit", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN), + ], + ) + + yield client_with_login( + app, + username="all_dr_permissions_except_dag_edit", + password="all_dr_permissions_except_dag_edit", + ) + + delete_user(app, username="all_dr_permissions_except_dag_edit") # type: ignore + delete_roles(app) @pytest.fixture(scope="module", autouse=True) @@ -42,6 +73,19 @@ def reset_dagrun(): session.query(TaskInstance).delete() +def test_create_dagrun_permission_denied(session, client_dr_without_dag_edit): + data = { + "state": "running", + "dag_id": "example_bash_operator", + "execution_date": "2018-07-06 05:06:03", + "run_id": "test_list_dagrun_includes_conf", + "conf": '{"include": "me"}', + } + + with pytest.raises(werkzeug.test.ClientRedirectError): + client_dr_without_dag_edit.post('/dagrun/add', data=data, follow_redirects=True) + + def test_clear_dag_runs_action(session, admin_client): dag = DagBag().get_dag("example_bash_operator") task0 = dag.get_task("runme_0") @@ -65,7 +109,54 @@ def test_clear_dag_runs_action(session, admin_client): assert [ti.state for ti in session.query(TaskInstance).all()] == [None, None] -def test_clear_dag_runs_action_fails(admin_client): - data = {"action": "clear", "rowid": ["0"]} - resp = admin_client.post("/dagrun/action_post", data=data, follow_redirects=True) - check_content_in_response("Failed to clear state", resp) +@pytest.fixture() +def running_dag_run(session): + dag = DagBag().get_dag("example_bash_operator") + task0 = dag.get_task("runme_0") + task1 = dag.get_task("runme_1") + execution_date = timezone.datetime(2016, 1, 9) + dr = dag.create_dagrun( + state="running", + execution_date=execution_date, + run_id="test_clear_dag_runs_action", + session=session, + ) + session.add(dr) + tis = [ + TaskInstance(task0, execution_date, state="success"), + TaskInstance(task1, execution_date, state="failed"), + ] + session.bulk_save_objects(tis) + session.flush() + return dr + + +def test_delete_dagrun(session, admin_client, running_dag_run): + composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run) + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1 + admin_client.post(f"/dagrun/delete/{composite_key}", follow_redirects=True) + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 0 + + +def test_delete_dagrun_permission_denied(session, client_dr_without_dag_edit, running_dag_run): + composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run) + + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1 + resp = client_dr_without_dag_edit.post(f"/dagrun/delete/{composite_key}", follow_redirects=True) + assert resp.status_code == 404 # If it doesn't fully succeed it gives a 404. + assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1 + + +@pytest.mark.parametrize( + "action", + ["clear", "set_success", "set_failed", "set_running"], + ids=["clear", "success", "failed", "running"], +) +def test_set_dag_runs_action_permission_denied(client_dr_without_dag_edit, running_dag_run, action): + running_dag_id = running_dag_run.id + resp = client_dr_without_dag_edit.post( + "/dagrun/action_post", + data={"action": action, "rowid": [str(running_dag_id)]}, + follow_redirects=True, + ) + check_content_in_response(f"Access denied for dag_id {running_dag_run.dag_id}", resp) diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py index ce2ffe4759c5a..39a0be88eeba5 100644 --- a/tests/www/views/test_views_decorators.py +++ b/tests/www/views/test_views_decorators.py @@ -16,13 +16,17 @@ # specific language governing permissions and limitations # under the License. import urllib.parse +from typing import List +from unittest import mock import pytest -from airflow.models import DagBag, Log +from airflow.models import DagBag, DagRun, Log, TaskInstance from airflow.utils import dates, timezone from airflow.utils.state import State from airflow.utils.types import DagRunType +from airflow.www import app +from airflow.www.views import action_has_dag_edit_access from tests.test_utils.db import clear_db_runs from tests.test_utils.www import check_content_in_response @@ -78,6 +82,11 @@ def dagruns(bash_dag, sub_dag, xcom_dag): clear_db_runs() +@action_has_dag_edit_access +def some_view_action_which_requires_dag_edit_access(*args) -> bool: + return True + + def _check_last_log(session, dag_id, event, execution_date): logs = ( session.query( @@ -150,3 +159,8 @@ def test_calendar(admin_client, dagruns): datestr = bash_dagrun.execution_date.date().isoformat() expected = rf'{{\"date\":\"{datestr}\",\"state\":\"running\",\"count\":1}}' check_content_in_response(expected, resp) + + +def test_action_has_dag_edit_access_exception(): + with pytest.raises(ValueError): + some_view_action_which_requires_dag_edit_access(None, "some_incorrect_value") diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 7c94006f288ae..7414337c4f4d4 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -27,15 +27,18 @@ from airflow.executors.celery_executor import CeleryExecutor from airflow.models import DagBag, DagModel, TaskInstance from airflow.models.dagcode import DagCode +from airflow.security import permissions from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES from airflow.utils import dates, timezone from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.types import DagRunType +from airflow.www.views import TaskInstanceModelView +from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs -from tests.test_utils.www import check_content_in_response, check_content_not_in_response +from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login DEFAULT_DATE = dates.days_ago(2) @@ -72,6 +75,32 @@ def init_dagruns(app, reset_dagruns): clear_db_runs() +@pytest.fixture(scope="module") +def client_ti_without_dag_edit(app): + create_user( + app, + username="all_ti_permissions_except_dag_edit", + role_name="all_ti_permissions_except_dag_edit", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE), + ], + ) + + yield client_with_login( + app, + username="all_ti_permissions_except_dag_edit", + password="all_ti_permissions_except_dag_edit", + ) + + delete_user(app, username="all_ti_permissions_except_dag_edit") # type: ignore + delete_roles(app) + + @pytest.mark.parametrize( "url, contents", [ @@ -601,3 +630,45 @@ def test_external_log_redirect_link_with_external_log_handler_not_shown( ctx = templates[0].local_context assert not ctx['show_external_log_redirect'] assert ctx['external_log_name'] is None + + +def _get_appbuilder_pk_string(model_view_cls, instance) -> str: + """Utility to get Flask-Appbuilder's string format "pk" for an object. + + Used to generate requests to FAB action views without *too* much difficulty. + The implementation relies on FAB internals, but unfortunately I don't see + a better way around it. + + Example usage:: + + >>> from airflow.www.views import TaskInstanceModelView + >>> ti = session.Query(TaskInstance).filter(...).one() + >>> pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti) + >>> client.post("...", data={"action": "...", "rowid": pk}) + """ + pk_value = model_view_cls.datamodel.get_pk_value(instance) + return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value) + + +@pytest.mark.parametrize( + "action", + ["clear", "set_success", "set_failed", "set_running"], + ids=["clear", "success", "failed", "running"], +) +def test_set_task_instance_action_permission_denied(session, client_ti_without_dag_edit, action): + task_id = "runme_0" + + # Set the state to success for clearing. + ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id) + ti_q.update({"state": State.SUCCESS}) + session.commit() + + # Send a request to clear. + rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one()) + expected_message = f"Access denied for dag_id {ti_q.one().dag_id}" + resp = client_ti_without_dag_edit.post( + "/taskinstance/action_post", + data={"action": action, "rowid": [rowid]}, + follow_redirects=True, + ) + check_content_in_response(expected_message, resp)