From 33a4502f68be6b25fbccb96ed1832c5b527bb02a Mon Sep 17 00:00:00 2001 From: Khalid Mammadov Date: Tue, 30 Nov 2021 16:33:55 +0000 Subject: [PATCH] Convert www.security test suite to pytest and remove residuals (#18961) Co-authored-by: Tzu-ping Chung --- tests/test_utils/api_connexion_utils.py | 16 +- tests/www/test_security.py | 1164 +++++++++++++---------- tests/www/views/conftest.py | 65 +- tests/www/views/test_views_base.py | 13 +- 4 files changed, 691 insertions(+), 567 deletions(-) diff --git a/tests/test_utils/api_connexion_utils.py b/tests/test_utils/api_connexion_utils.py index bee38c37b33dd..e8063f2cbc6a0 100644 --- a/tests/test_utils/api_connexion_utils.py +++ b/tests/test_utils/api_connexion_utils.py @@ -80,15 +80,23 @@ def create_role(app, name, permissions=None): return role +def set_user_single_role(app, username, role_name): + role = create_role(app, role_name) + user = app.appbuilder.sm.find_user(username) + if role not in user.roles: + user.roles = [role] + app.appbuilder.sm.update_user(user) + + def delete_role(app, name): - if app.appbuilder.sm.find_role(name): - app.appbuilder.sm.delete_role(name) + if name not in EXISTING_ROLES: + if app.appbuilder.sm.find_role(name): + app.appbuilder.sm.delete_role(name) def delete_roles(app): for role in app.appbuilder.sm.get_all_roles(): - if role.name not in EXISTING_ROLES: - app.appbuilder.sm.delete_role(role.name) + delete_role(app, role.name) def delete_user(app, username): diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 66cc26dc369c3..efa53ddd6d13c 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -15,9 +15,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +import contextlib import logging -import unittest from unittest import mock import pytest @@ -25,15 +24,15 @@ from flask_appbuilder.views import BaseView, ModelView from sqlalchemy import Column, Date, Float, Integer, String -from airflow import settings from airflow.exceptions import AirflowException from airflow.models import DagModel from airflow.models.dag import DAG from airflow.security import permissions from airflow.www import app as application from airflow.www.fab_security.sqla.models import assoc_permission_role +from airflow.www.security import AirflowSecurityManager from airflow.www.utils import CustomSQLAInterface -from tests.test_utils import api_connexion_utils +from tests.test_utils.api_connexion_utils import create_user_scope, delete_role, set_user_single_role from tests.test_utils.asserts import assert_queries_count from tests.test_utils.db import clear_db_dags, clear_db_runs from tests.test_utils.mock_security_manager import MockSecurityManager @@ -78,627 +77,738 @@ def some_action(self): return "action!" -class TestSecurity(unittest.TestCase): - @classmethod - def setUpClass(cls): - settings.configure_orm() - cls.session = settings.Session - cls.app = application.create_app(testing=True) - cls.appbuilder = cls.app.appbuilder - cls.app.config['WTF_CSRF_ENABLED'] = False - cls.security_manager = cls.appbuilder.sm - cls.delete_roles() - - def setUp(self): - clear_db_runs() - clear_db_dags() - self.db = SQLA(self.app) - self.appbuilder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews") - self.appbuilder.add_view(SomeModelView, "SomeModelView", category="ModelViews") - - log.debug("Complete setup!") - - @classmethod - def delete_roles(cls): - for role_name in [ - 'team-a', - 'MyRole1', - 'MyRole5', - 'Test_Role', - 'MyRole3', - 'MyRole2', - 'dag_permission_role', - ]: - api_connexion_utils.delete_role(cls.app, role_name) - - def expect_user_is_in_role(self, user, rolename): - self.security_manager.bulk_sync_roles([{'role': rolename, 'perms': []}]) - role = self.security_manager.find_role(rolename) - if not role: - self.security_manager.add_role(rolename) - role = self.security_manager.find_role(rolename) - user.roles = [role] - self.security_manager.update_user(user) - - def assert_user_has_dag_perms(self, perms, dag_id, user=None): - for perm in perms: - assert self._has_dag_perm(perm, dag_id, user), f"User should have '{perm}' on DAG '{dag_id}'" +def _clear_db_dag_and_runs(): + clear_db_runs() + clear_db_dags() - def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None): - for perm in perms: - assert not self._has_dag_perm( - perm, dag_id, user - ), f"User should not have '{perm}' on DAG '{dag_id}'" - - def _has_dag_perm(self, perm, dag_id, user): - return self.security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user) - - def _create_dag(self, dag_id): - dag_model = DagModel(dag_id=dag_id) - self.session.add(dag_model) - self.session.commit() - self.security_manager.sync_perm_for_dag(dag_id, access_control=None) - - def tearDown(self): - clear_db_runs() - clear_db_dags() - self.appbuilder = None - self.app = None - self.db = None - log.debug("Complete teardown!") - - def test_init_role_baseview(self): - role_name = 'MyRole7' - role_perms = [('can_some_other_action', 'AnotherBaseView')] - with pytest.warns( - DeprecationWarning, - match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.", - ): - self.security_manager.init_role(role_name, role_perms) - - role = self.appbuilder.sm.find_role(role_name) - assert role is not None - assert len(role_perms) == len(role.permissions) - - def test_bulk_sync_roles_baseview(self): - role_name = 'MyRole3' - role_perms = [('can_some_action', 'SomeBaseView')] - self.security_manager.bulk_sync_roles([{'role': role_name, 'perms': role_perms}]) - - role = self.appbuilder.sm.find_role(role_name) - assert role is not None - assert len(role_perms) == len(role.permissions) - - def test_bulk_sync_roles_modelview(self): - role_name = 'MyRole2' - role_perms = [ - ('can_list', 'SomeModelView'), - ('can_show', 'SomeModelView'), - ('can_add', 'SomeModelView'), - (permissions.ACTION_CAN_EDIT, 'SomeModelView'), - (permissions.ACTION_CAN_DELETE, 'SomeModelView'), - ] - mock_roles = [{'role': role_name, 'perms': role_perms}] - self.security_manager.bulk_sync_roles(mock_roles) - role = self.appbuilder.sm.find_role(role_name) - assert role is not None - assert len(role_perms) == len(role.permissions) +def _delete_dag_permissions(dag_id, security_manager): + dag_resource_name = permissions.resource_name_for_dag(dag_id) + for dag_action_name in security_manager.DAG_ACTIONS: + security_manager.delete_permission(dag_action_name, dag_resource_name) - # Check short circuit works - with assert_queries_count(2): # One for Permission, one for roles - self.security_manager.bulk_sync_roles(mock_roles) - def test_update_and_verify_permission_role(self): - role_name = 'Test_Role' - role_perms = [] - mock_roles = [{'role': role_name, 'perms': role_perms}] - self.security_manager.bulk_sync_roles(mock_roles) - role = self.security_manager.find_role(role_name) +def _create_dag_model(dag_id, session, security_manager): + dag_model = DagModel(dag_id=dag_id) + session.add(dag_model) + session.commit() + security_manager.sync_perm_for_dag(dag_id, access_control=None) + return dag_model - perm = self.security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE) - self.security_manager.add_permission_to_role(role, perm) - role_perms_len = len(role.permissions) - self.security_manager.bulk_sync_roles(mock_roles) - new_role_perms_len = len(role.permissions) +def _delete_dag_model(dag_model, session, security_manager): + session.delete(dag_model) + session.commit() + _delete_dag_permissions(dag_model.dag_id, security_manager) - assert role_perms_len == new_role_perms_len - assert new_role_perms_len == 1 - def test_verify_public_role_has_no_permissions(self): - public = self.appbuilder.sm.find_role("Public") +@contextlib.contextmanager +def _create_dag_model_context(dag_id, session, security_manager): + dag = _create_dag_model(dag_id, session, security_manager) + yield dag + _delete_dag_model(dag, session, security_manager) - assert public.permissions == [] - def test_verify_default_anon_user_has_no_accessible_dag_ids(self): - with self.app.app_context(): - user = mock.MagicMock() - user.is_anonymous = True - self.app.config['AUTH_ROLE_PUBLIC'] = 'Public' - assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()] +@pytest.fixture(scope="module", autouse=True) +def clear_db_after_suite(): + yield None + _clear_db_dag_and_runs() - self._create_dag("test_dag_id") - self.security_manager.sync_roles() - assert self.security_manager.get_accessible_dag_ids(user) == set() +@pytest.fixture(scope="function", autouse=True) +def clear_db_before_test(): + _clear_db_dag_and_runs() - def test_verify_default_anon_user_has_no_access_to_specific_dag(self): - with self.app.app_context(): - user = mock.MagicMock() - user.is_anonymous = True - self.app.config['AUTH_ROLE_PUBLIC'] = 'Public' - assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()] - dag_id = "test_dag_id" - self._create_dag(dag_id) - self.app.appbuilder.sm.sync_roles() +@pytest.fixture(scope="module") +def app(): + _app = application.create_app(testing=True) + _app.config['WTF_CSRF_ENABLED'] = False + return _app - assert self.app.appbuilder.sm.can_read_dag(dag_id, user) is False - assert self.app.appbuilder.sm.can_edit_dag(dag_id, user) is False - assert self._has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is False - assert self._has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is False - def test_verify_anon_user_with_admin_role_has_all_dag_access(self): - with self.app.app_context(): - self.app.config['AUTH_ROLE_PUBLIC'] = 'Admin' - user = mock.MagicMock() - user.is_anonymous = True +@pytest.fixture(scope="module") +def app_builder(app): + app_builder = app.appbuilder + app_builder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews") + app_builder.add_view(SomeModelView, "SomeModelView", category="ModelViews") + return app.appbuilder - assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()] - test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"] - for dag_id in test_dag_ids: - self._create_dag(dag_id) - self.security_manager.sync_roles() +@pytest.fixture(scope="module") +def security_manager(app_builder): + return app_builder.sm - assert self.security_manager.get_accessible_dag_ids(user) == set(test_dag_ids) - def test_verify_anon_user_with_admin_role_has_access_to_each_dag(self): - with self.app.app_context(): - user = mock.MagicMock() - user.is_anonymous = True - self.app.config['AUTH_ROLE_PUBLIC'] = 'Admin' +@pytest.fixture(scope="module") +def session(app_builder): + return app_builder.get_session - # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set. - user.roles = self.app.appbuilder.sm.get_user_roles(user) - assert user.roles == [self.app.appbuilder.sm.get_public_role()] - test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"] +@pytest.fixture(scope="module") +def db(app): + return SQLA(app) - for dag_id in test_dag_ids: - self._create_dag(dag_id) - self.security_manager.sync_roles() - for dag_id in test_dag_ids: - assert self.app.appbuilder.sm.can_read_dag(dag_id, user) is True - assert self.app.appbuilder.sm.can_edit_dag(dag_id, user) is True - assert self._has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is True - assert self._has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is True +@pytest.fixture(scope="function") +def role(request, app, security_manager): + params = request.param + _role = None + params['mock_roles'] = [{'role': params['name'], 'perms': params['permissions']}] + if params.get("create", True): + security_manager.bulk_sync_roles(params['mock_roles']) + _role = security_manager.find_role(params['name']) + yield _role, params + delete_role(app, params['name']) - def test_get_user_roles(self): - user = mock.MagicMock() - user.is_anonymous = False - roles = self.appbuilder.sm.find_role('Admin') - user.roles = roles - assert self.security_manager.get_user_roles(user) == roles - - def test_get_user_roles_for_anonymous_user(self): - viewer_role_perms = { - (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD), - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE), - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU), - (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS), + +@pytest.fixture(scope="function") +def mock_dag_models(request, session, security_manager): + dags_ids = request.param + dags = [_create_dag_model(dag_id, session, security_manager) for dag_id in dags_ids] + + yield dags_ids + + for dag in dags: + _delete_dag_model(dag, session, security_manager) + + +@pytest.fixture(scope="function") +def sample_dags(security_manager): + dags = [ + DAG('has_access_control', access_control={'Public': {permissions.ACTION_CAN_READ}}), + DAG('no_access_control'), + ] + + yield dags + + for dag in dags: + _delete_dag_permissions(dag.dag_id, security_manager) + + +@pytest.fixture(scope="module") +def has_dag_perm(security_manager): + def _has_dag_perm(perm, dag_id, user): + return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user) + + return _has_dag_perm + + +@pytest.fixture(scope="module") +def assert_user_has_dag_perms(has_dag_perm): + def _assert_user_has_dag_perms(perms, dag_id, user=None): + for perm in perms: + assert has_dag_perm(perm, dag_id, user), f"User should have '{perm}' on DAG '{dag_id}'" + + return _assert_user_has_dag_perms + + +@pytest.fixture(scope="module") +def assert_user_does_not_have_dag_perms(has_dag_perm): + def _assert_user_does_not_have_dag_perms(dag_id, perms, user=None): + for perm in perms: + assert not has_dag_perm(perm, dag_id, user), f"User should not have '{perm}' on DAG '{dag_id}'" + + return _assert_user_does_not_have_dag_perms + + +@pytest.mark.parametrize( + "role", + [{"name": "MyRole7", "permissions": [('can_some_other_action', 'AnotherBaseView')], "create": False}], + indirect=True, +) +def test_init_role_baseview(app, security_manager, role): + _, params = role + + with pytest.warns( + DeprecationWarning, + match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.", + ): + security_manager.init_role(params['name'], params['permissions']) + + _role = security_manager.find_role(params['name']) + assert _role is not None + assert len(_role.permissions) == len(params['permissions']) + + +@pytest.mark.parametrize( + "role", + [{"name": "MyRole3", "permissions": [('can_some_action', 'SomeBaseView')]}], + indirect=True, +) +def test_bulk_sync_roles_baseview(app, security_manager, role): + _role, params = role + assert _role is not None + assert len(_role.permissions) == len(params['permissions']) + + +@pytest.mark.parametrize( + "role", + [ + { + "name": "MyRole2", + "permissions": [ + ('can_list', 'SomeModelView'), + ('can_show', 'SomeModelView'), + ('can_add', 'SomeModelView'), + (permissions.ACTION_CAN_EDIT, 'SomeModelView'), + (permissions.ACTION_CAN_DELETE, 'SomeModelView'), + ], } - self.app.config['AUTH_ROLE_PUBLIC'] = 'Viewer' - - with self.app.app_context(): - user = mock.MagicMock() - user.is_anonymous = True - - perms_views = set() - for role in self.security_manager.get_user_roles(user): - perms_views.update({(perm.action.name, perm.resource.name) for perm in role.permissions}) - assert perms_views == viewer_role_perms - - @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles') - def test_get_current_user_permissions(self, mock_get_user_roles): - role_name = 'MyRole5' - role_perm = 'can_some_action' - role_vm = 'SomeBaseView' - username = 'get_current_user_permissions' - - with self.app.app_context(): - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - permissions=[ - (role_perm, role_vm), - ], - ) + ], + indirect=True, +) +def test_bulk_sync_roles_modelview(app, security_manager, role): + _role, params = role + assert role is not None + assert len(_role.permissions) == len(params['permissions']) + + # Check short circuit works + with assert_queries_count(2): # One for Permission, one for roles + security_manager.bulk_sync_roles(params['mock_roles']) + + +@pytest.mark.parametrize( + "role", + [{"name": "Test_Role", "permissions": []}], + indirect=True, +) +def test_update_and_verify_permission_role(app, security_manager, role): + _role, params = role + perm = security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE) + security_manager.add_permission_to_role(_role, perm) + role_perms_len = len(_role.permissions) + + security_manager.bulk_sync_roles(params['mock_roles']) + new_role_perms_len = len(_role.permissions) + + assert role_perms_len == new_role_perms_len + assert new_role_perms_len == 1 + + +def test_verify_public_role_has_no_permissions(security_manager): + public = security_manager.find_role("Public") + assert public.permissions == [] + + +def test_verify_default_anon_user_has_no_accessible_dag_ids(app, session, security_manager): + with app.app_context(): + user = mock.MagicMock() + user.is_anonymous = True + app.config['AUTH_ROLE_PUBLIC'] = 'Public' + assert security_manager.get_user_roles(user) == [security_manager.get_public_role()] + + with _create_dag_model_context("test_dag_id", session, security_manager): + security_manager.sync_roles() + + assert security_manager.get_accessible_dag_ids(user) == set() + + +def test_verify_default_anon_user_has_no_access_to_specific_dag(app, session, security_manager, has_dag_perm): + with app.app_context(): + user = mock.MagicMock() + user.is_anonymous = True + app.config['AUTH_ROLE_PUBLIC'] = 'Public' + assert security_manager.get_user_roles(user) == [security_manager.get_public_role()] + + dag_id = "test_dag_id" + with _create_dag_model_context(dag_id, session, security_manager): + security_manager.sync_roles() + + assert security_manager.can_read_dag(dag_id, user) is False + assert security_manager.can_edit_dag(dag_id, user) is False + assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is False + assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is False + + +@pytest.mark.parametrize( + "mock_dag_models", + [["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]], + indirect=True, +) +def test_verify_anon_user_with_admin_role_has_all_dag_access(app, session, security_manager, mock_dag_models): + test_dag_ids = mock_dag_models + with app.app_context(): + app.config['AUTH_ROLE_PUBLIC'] = 'Admin' + user = mock.MagicMock() + user.is_anonymous = True + + assert security_manager.get_user_roles(user) == [security_manager.get_public_role()] + + security_manager.sync_roles() + + assert security_manager.get_accessible_dag_ids(user) == set(test_dag_ids) + + +def test_verify_anon_user_with_admin_role_has_access_to_each_dag( + app, session, security_manager, has_dag_perm +): + with app.app_context(): + user = mock.MagicMock() + user.is_anonymous = True + app.config['AUTH_ROLE_PUBLIC'] = 'Admin' + + # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set. + user.roles = security_manager.get_user_roles(user) + assert user.roles == [security_manager.get_public_role()] + + test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"] + + for dag_id in test_dag_ids: + with _create_dag_model_context(dag_id, session, security_manager): + security_manager.sync_roles() + + assert security_manager.can_read_dag(dag_id, user) is True + assert security_manager.can_edit_dag(dag_id, user) is True + assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is True + assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is True + + +def test_get_user_roles(app_builder, security_manager): + user = mock.MagicMock() + user.is_anonymous = False + roles = app_builder.sm.find_role('Admin') + user.roles = roles + assert security_manager.get_user_roles(user) == roles + + +def test_get_user_roles_for_anonymous_user(app, security_manager): + viewer_role_perms = { + (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU), + (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS), + } + app.config['AUTH_ROLE_PUBLIC'] = 'Viewer' + + with app.app_context(): + user = mock.MagicMock() + user.is_anonymous = True + + perms_views = set() + for role in security_manager.get_user_roles(user): + perms_views.update({(perm.action.name, perm.resource.name) for perm in role.permissions}) + assert perms_views == viewer_role_perms + + +def test_get_current_user_permissions(app, security_manager, monkeypatch): + role_perm = 'can_some_action' + role_vm = 'SomeBaseView' + + from airflow.www.security import AirflowSecurityManager + + with app.app_context(): + with create_user_scope( + app, + username='get_current_user_permissions', + role_name='MyRole5', + permissions=[ + (role_perm, role_vm), + ], + ) as user: role = user.roles[0] - mock_get_user_roles.return_value = [role] + monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [role]) - assert self.security_manager.get_current_user_permissions() == {(role_perm, role_vm)} + assert security_manager.get_current_user_permissions() == {(role_perm, role_vm)} - mock_get_user_roles.return_value = [] - assert len(self.security_manager.get_current_user_permissions()) == 0 + monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: []) + assert len(security_manager.get_current_user_permissions()) == 0 - @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles') - def test_current_user_has_permissions(self, mock_get_user_roles): - with self.app.app_context(): - user = api_connexion_utils.create_user( - self.app, - username="current_user_has_permissions", - role_name="current_user_has_permissions", - permissions=[("can_some_action", "SomeBaseView")], - ) + +def test_current_user_has_permissions(app, security_manager, monkeypatch): + with app.app_context(): + with create_user_scope( + app, + username="current_user_has_permissions", + role_name="current_user_has_permissions", + permissions=[("can_some_action", "SomeBaseView")], + ) as user: role = user.roles[0] - mock_get_user_roles.return_value = [role] - assert self.security_manager.current_user_has_permissions() + monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [role]) + assert security_manager.current_user_has_permissions() # Role, but no permissions role.permissions = [] - assert not self.security_manager.current_user_has_permissions() + assert not security_manager.current_user_has_permissions() # No role - mock_get_user_roles.return_value = [] - assert not self.security_manager.current_user_has_permissions() + monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: []) + assert not security_manager.current_user_has_permissions() - def test_get_accessible_dag_ids(self): - role_name = 'MyRole1' - permission_action = [permissions.ACTION_CAN_READ] - dag_id = 'dag_id' - username = "ElUser" - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - permissions=[ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - ], - ) +def test_get_accessible_dag_ids(app, security_manager, session): + role_name = 'MyRole1' + permission_action = [permissions.ACTION_CAN_READ] + dag_id = 'dag_id' + username = "ElUser" + + with create_user_scope( + app, + username=username, + role_name=role_name, + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + ], + ) as user: dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *") - self.session.add(dag_model) - self.session.commit() + session.add(dag_model) + session.commit() - self.security_manager.sync_perm_for_dag( # type: ignore + security_manager.sync_perm_for_dag( # type: ignore dag_id, access_control={role_name: permission_action} ) - assert self.security_manager.get_accessible_dag_ids(user) == {'dag_id'} + assert security_manager.get_accessible_dag_ids(user) == {'dag_id'} - def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(self): - # In this test case, - # get_readable_dag_ids() don't return DAGs to which the user has CAN_EDIT action - username = "Monsieur User" - role_name = "MyRole1" - permission_action = [permissions.ACTION_CAN_EDIT] - dag_id = "dag_id" - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - permissions=[ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - ], - ) +def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(app, security_manager, session): + # In this test case, + # get_readable_dag_ids() don't return DAGs to which the user has CAN_EDIT action + username = "Monsieur User" + role_name = "MyRole1" + permission_action = [permissions.ACTION_CAN_EDIT] + dag_id = "dag_id" + + with create_user_scope( + app, + username=username, + role_name=role_name, + permissions=[ + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), + ], + ) as user: dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *") - self.session.add(dag_model) - self.session.commit() + session.add(dag_model) + session.commit() - self.security_manager.sync_perm_for_dag( # type: ignore + security_manager.sync_perm_for_dag( # type: ignore dag_id, access_control={role_name: permission_action} ) - assert self.security_manager.get_readable_dag_ids(user) == set() + assert security_manager.get_readable_dag_ids(user) == set() - @mock.patch('airflow.www.security.AirflowSecurityManager._has_resource_access') - def test_has_access(self, mock_has_resource_access): - user = mock.MagicMock() - user.is_anonymous = False - mock_has_resource_access.return_value = True - assert self.security_manager.has_access('perm', 'view', user) - - def test_sync_perm_for_dag_creates_permissions_on_resources(self): - test_dag_id = 'TEST_DAG' - prefixed_test_dag_id = f'DAG:{test_dag_id}' - self.security_manager.sync_perm_for_dag(test_dag_id, access_control=None) - assert ( - self.security_manager.get_permission(permissions.ACTION_CAN_READ, prefixed_test_dag_id) - is not None - ) - assert ( - self.security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id) - is not None + +def test_has_access(security_manager, monkeypatch): + user = mock.MagicMock() + user.is_anonymous = False + from airflow.www.security import AirflowSecurityManager + + monkeypatch.setattr(AirflowSecurityManager, "_has_resource_access", lambda a, b, c, d: True) + assert security_manager.has_access('perm', 'view', user) + + +def test_sync_perm_for_dag_creates_permissions_on_resources(security_manager): + test_dag_id = 'TEST_DAG' + prefixed_test_dag_id = f'DAG:{test_dag_id}' + security_manager.sync_perm_for_dag(test_dag_id, access_control=None) + assert security_manager.get_permission(permissions.ACTION_CAN_READ, prefixed_test_dag_id) is not None + assert security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id) is not None + + +def test_has_all_dag_access(security_manager, monkeypatch): + from airflow.www.security import AirflowSecurityManager + + monkeypatch.setattr(AirflowSecurityManager, "_has_role", lambda a, b: True) + assert security_manager.has_all_dags_access() + + monkeypatch.setattr(AirflowSecurityManager, "_has_role", lambda a, b: False) + monkeypatch.setattr(AirflowSecurityManager, "_has_perm", lambda a, b, c: False) + assert not security_manager.has_all_dags_access() + + monkeypatch.setattr(AirflowSecurityManager, "_has_perm", lambda a, b, c: True) + assert security_manager.has_all_dags_access() + + +def test_access_control_with_non_existent_role(security_manager): + with pytest.raises(AirflowException) as ctx: + security_manager._sync_dag_view_permissions( + dag_id='access-control-test', + access_control={ + 'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] + }, ) + assert "role does not exist" in str(ctx.value) - @mock.patch('airflow.www.security.AirflowSecurityManager._has_perm') - @mock.patch('airflow.www.security.AirflowSecurityManager._has_role') - def test_has_all_dag_access(self, mock_has_role, mock_has_perm): - mock_has_role.return_value = True - assert self.security_manager.has_all_dags_access() - - mock_has_role.return_value = False - mock_has_perm.return_value = False - assert not self.security_manager.has_all_dags_access() - - mock_has_perm.return_value = True - assert self.security_manager.has_all_dags_access() - - def test_access_control_with_non_existent_role(self): - with pytest.raises(AirflowException) as ctx: - self.security_manager._sync_dag_view_permissions( - dag_id='access-control-test', - access_control={ - 'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] - }, - ) - assert "role does not exist" in str(ctx.value) - - def test_all_dag_access_doesnt_give_non_dag_access(self): - username = 'dag_access_user' - role_name = 'dag_access_role' - with self.app.app_context(): - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - permissions=[ - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), - ], - ) - assert self.security_manager.has_access( - permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user - ) - assert not self.security_manager.has_access( + +def test_all_dag_access_doesnt_give_non_dag_access(app, security_manager): + username = 'dag_access_user' + role_name = 'dag_access_role' + with app.app_context(): + with create_user_scope( + app, + username=username, + role_name=role_name, + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + ], + ) as user: + assert security_manager.has_access(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user) + assert not security_manager.has_access( permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE, user ) - def test_access_control_with_invalid_permission(self): - invalid_actions = [ - 'can_varimport', # a real action, but not a member of DAG_ACTIONS - 'can_eat_pudding', # clearly not a real action - ] - username = "LaUser" - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name='team-a', - ) + +def test_access_control_with_invalid_permission(app, security_manager): + invalid_actions = [ + 'can_varimport', # a real action, but not a member of DAG_ACTIONS + 'can_eat_pudding', # clearly not a real action + ] + username = "LaUser" + rolename = "team-a" + with create_user_scope( + app, + username=username, + role_name=rolename, + ): for action in invalid_actions: - self.expect_user_is_in_role(user, rolename='team-a') with pytest.raises(AirflowException) as ctx: - self.security_manager._sync_dag_view_permissions( - 'access_control_test', access_control={'team-a': {action}} + security_manager._sync_dag_view_permissions( + 'access_control_test', access_control={rolename: {action}} ) assert "invalid permissions" in str(ctx.value) - def test_access_control_is_set_on_init(self): - username = 'access_control_is_set_on_init' - role_name = 'team-a' - with self.app.app_context(): - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - permissions=[], - ) - self.expect_user_is_in_role(user, rolename='team-a') - self.security_manager._sync_dag_view_permissions( + +def test_access_control_is_set_on_init( + app, + security_manager, + assert_user_has_dag_perms, + assert_user_does_not_have_dag_perms, +): + username = 'access_control_is_set_on_init' + role_name = 'team-a' + negated_role = 'NOT-team-a' + with app.app_context(): + with create_user_scope( + app, + username=username, + role_name=role_name, + permissions=[], + ) as user: + security_manager._sync_dag_view_permissions( 'access_control_test', - access_control={'team-a': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, + access_control={role_name: [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]}, ) - self.assert_user_has_dag_perms( + assert_user_has_dag_perms( perms=[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ], dag_id='access_control_test', user=user, ) - self.expect_user_is_in_role(user, rolename='NOT-team-a') - self.assert_user_does_not_have_dag_perms( + security_manager.bulk_sync_roles([{'role': negated_role, 'perms': []}]) + set_user_single_role(app, username, role_name=negated_role) + assert_user_does_not_have_dag_perms( perms=[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ], dag_id='access_control_test', user=user, ) - def test_access_control_stale_perms_are_revoked(self): - username = 'access_control_stale_perms_are_revoked' - role_name = 'team-a' - with self.app.app_context(): - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - permissions=[], - ) - self.expect_user_is_in_role(user, rolename='team-a') - self.security_manager._sync_dag_view_permissions( + +def test_access_control_stale_perms_are_revoked( + app, + security_manager, + assert_user_has_dag_perms, + assert_user_does_not_have_dag_perms, +): + username = 'access_control_stale_perms_are_revoked' + role_name = 'team-a' + with app.app_context(): + with create_user_scope( + app, + username=username, + role_name=role_name, + permissions=[], + ) as user: + set_user_single_role(app, username, role_name='team-a') + security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': READ_WRITE} ) - self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user) + assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user) - self.security_manager._sync_dag_view_permissions( + security_manager._sync_dag_view_permissions( 'access_control_test', access_control={'team-a': READ_ONLY} ) - self.assert_user_has_dag_perms( + assert_user_has_dag_perms( perms=[permissions.ACTION_CAN_READ], dag_id='access_control_test', user=user ) - self.assert_user_does_not_have_dag_perms( + assert_user_does_not_have_dag_perms( perms=[permissions.ACTION_CAN_EDIT], dag_id='access_control_test', user=user ) - def test_no_additional_dag_permission_views_created(self): - ab_perm_role = assoc_permission_role - - self.security_manager.sync_roles() - num_pv_before = self.db.session().query(ab_perm_role).count() - self.security_manager.sync_roles() - num_pv_after = self.db.session().query(ab_perm_role).count() - assert num_pv_before == num_pv_after - - def test_override_role_vm(self): - test_security_manager = MockSecurityManager(appbuilder=self.appbuilder) - assert len(test_security_manager.VIEWER_VMS) == 1 - assert test_security_manager.VIEWER_VMS == {'Airflow'} - - def test_correct_roles_have_perms_to_read_config(self): - roles_to_check = self.security_manager.get_all_roles() - assert len(roles_to_check) >= 5 - for role in roles_to_check: - if role.name in ["Admin", "Op"]: - assert self.security_manager.permission_exists_in_one_or_more_roles( - permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id] - ) - else: - assert not self.security_manager.permission_exists_in_one_or_more_roles( - permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id] - ), ( - f"{role.name} should not have {permissions.ACTION_CAN_READ} " - f"on {permissions.RESOURCE_CONFIG}" - ) - @mock.patch("airflow.www.security.DagBag") - def test_create_dag_specific_permissions(self, dagbag_mock): - access_control = {'Public': {permissions.ACTION_CAN_READ}} - dags = [ - DAG('has_access_control', access_control=access_control), - DAG('no_access_control'), - ] +def test_no_additional_dag_permission_views_created(db, security_manager): + ab_perm_role = assoc_permission_role - collect_dags_from_db_mock = mock.Mock() - dagbag = mock.Mock() + security_manager.sync_roles() + num_pv_before = db.session().query(ab_perm_role).count() + security_manager.sync_roles() + num_pv_after = db.session().query(ab_perm_role).count() + assert num_pv_before == num_pv_after - dagbag.dags = {dag.dag_id: dag for dag in dags} - dagbag.collect_dags_from_db = collect_dags_from_db_mock - dagbag_mock.return_value = dagbag - self.security_manager._sync_dag_view_permissions = mock.Mock() +def test_override_role_vm(app_builder): + test_security_manager = MockSecurityManager(appbuilder=app_builder) + assert len(test_security_manager.VIEWER_VMS) == 1 + assert test_security_manager.VIEWER_VMS == {'Airflow'} - for dag in dags: - dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) - all_perms = self.security_manager.get_all_permissions() - assert ('can_read', dag_resource_name) not in all_perms - assert ('can_edit', dag_resource_name) not in all_perms - self.security_manager.create_dag_specific_permissions() +def test_correct_roles_have_perms_to_read_config(security_manager): + roles_to_check = security_manager.get_all_roles() + assert len(roles_to_check) >= 5 + for role in roles_to_check: + if role.name in ["Admin", "Op"]: + assert security_manager.permission_exists_in_one_or_more_roles( + permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id] + ) + else: + assert not security_manager.permission_exists_in_one_or_more_roles( + permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id] + ), ( + f"{role.name} should not have {permissions.ACTION_CAN_READ} " + f"on {permissions.RESOURCE_CONFIG}" + ) - dagbag_mock.assert_called_once_with(read_dags_from_db=True) - collect_dags_from_db_mock.assert_called_once_with() - for dag in dags: - dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) - all_perms = self.security_manager.get_all_permissions() - assert ('can_read', dag_resource_name) in all_perms - assert ('can_edit', dag_resource_name) in all_perms +def test_create_dag_specific_permissions(session, security_manager, monkeypatch, sample_dags): - self.security_manager._sync_dag_view_permissions.assert_called_once_with( - permissions.resource_name_for_dag('has_access_control'), access_control - ) + access_control = {'Public': {permissions.ACTION_CAN_READ}} - del dagbag.dags["has_access_control"] - with assert_queries_count(1): # one query to get all perms; dagbag is mocked - self.security_manager.create_dag_specific_permissions() + collect_dags_from_db_mock = mock.Mock() + dagbag_mock = mock.Mock() + dagbag_mock.dags = {dag.dag_id: dag for dag in sample_dags} + dagbag_mock.collect_dags_from_db = collect_dags_from_db_mock + dagbag_class_mock = mock.Mock() + dagbag_class_mock.return_value = dagbag_mock + import airflow.www.security - def test_get_all_permissions(self): - with assert_queries_count(1): - perms = self.security_manager.get_all_permissions() + monkeypatch.setitem(airflow.www.security.__dict__, "DagBag", dagbag_class_mock) + security_manager._sync_dag_view_permissions = mock.Mock() - assert isinstance(perms, set) - for perm in perms: - assert isinstance(perm, tuple) - assert len(perm) == 2 - - assert ('can_read', 'Connections') in perms - - def test_get_all_non_dag_permissions(self): - with assert_queries_count(1): - pvs = self.security_manager._get_all_non_dag_permissions() - - assert isinstance(pvs, dict) - for (perm_name, viewmodel_name), perm in pvs.items(): - assert isinstance(perm_name, str) - assert isinstance(viewmodel_name, str) - assert isinstance(perm, self.security_manager.permission_model) - - assert ('can_read', 'Connections') in pvs - - def test_get_all_roles_with_permissions(self): - with assert_queries_count(1): - roles = self.security_manager._get_all_roles_with_permissions() - - assert isinstance(roles, dict) - for role_name, role in roles.items(): - assert isinstance(role_name, str) - assert isinstance(role, self.security_manager.role_model) - - assert 'Admin' in roles - - def test_prefixed_dag_id_is_deprecated(self): - with pytest.warns( - DeprecationWarning, - match=( - "`prefixed_dag_id` has been deprecated. " - "Please use `airflow.security.permissions.resource_name_for_dag` instead." - ), - ): - self.security_manager.prefixed_dag_id("hello") - - def test_parent_dag_access_applies_to_subdag(self): - username = 'dag_permission_user' - role_name = 'dag_permission_role' - parent_dag_name = "parent_dag" - with self.app.app_context(): - mock_roles = [ - { - 'role': role_name, - 'perms': [ - (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"), - (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"), - ], - } - ] - user = api_connexion_utils.create_user( - self.app, - username=username, - role_name=role_name, - ) - self.security_manager.bulk_sync_roles(mock_roles) - self.security_manager._sync_dag_view_permissions( + for dag in sample_dags: + dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) + all_perms = security_manager.get_all_permissions() + assert ('can_read', dag_resource_name) not in all_perms + assert ('can_edit', dag_resource_name) not in all_perms + + security_manager.create_dag_specific_permissions() + + dagbag_class_mock.assert_called_once_with(read_dags_from_db=True) + collect_dags_from_db_mock.assert_called_once_with() + + for dag in sample_dags: + dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) + all_perms = security_manager.get_all_permissions() + assert ('can_read', dag_resource_name) in all_perms + assert ('can_edit', dag_resource_name) in all_perms + + security_manager._sync_dag_view_permissions.assert_called_once_with( + permissions.resource_name_for_dag('has_access_control'), access_control + ) + + del dagbag_mock.dags["has_access_control"] + with assert_queries_count(1): # one query to get all perms; dagbag is mocked + security_manager.create_dag_specific_permissions() + + +def test_get_all_permissions(security_manager): + with assert_queries_count(1): + perms = security_manager.get_all_permissions() + + assert isinstance(perms, set) + for perm in perms: + assert isinstance(perm, tuple) + assert len(perm) == 2 + + assert ('can_read', 'Connections') in perms + + +def test_get_all_non_dag_permissions(security_manager): + with assert_queries_count(1): + pvs = security_manager._get_all_non_dag_permissions() + + assert isinstance(pvs, dict) + for (perm_name, viewmodel_name), perm in pvs.items(): + assert isinstance(perm_name, str) + assert isinstance(viewmodel_name, str) + assert isinstance(perm, security_manager.permission_model) + + assert ('can_read', 'Connections') in pvs + + +def test_get_all_roles_with_permissions(security_manager): + with assert_queries_count(1): + roles = security_manager._get_all_roles_with_permissions() + + assert isinstance(roles, dict) + for role_name, role in roles.items(): + assert isinstance(role_name, str) + assert isinstance(role, security_manager.role_model) + + assert 'Admin' in roles + + +def test_prefixed_dag_id_is_deprecated(security_manager): + with pytest.warns( + DeprecationWarning, + match=( + "`prefixed_dag_id` has been deprecated. " + "Please use `airflow.security.permissions.resource_name_for_dag` instead." + ), + ): + security_manager.prefixed_dag_id("hello") + + +def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms): + username = 'dag_permission_user' + role_name = 'dag_permission_role' + parent_dag_name = "parent_dag" + with app.app_context(): + mock_roles = [ + { + 'role': role_name, + 'perms': [ + (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"), + (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"), + ], + } + ] + with create_user_scope( + app, + username=username, + role_name=role_name, + ) as user: + security_manager.bulk_sync_roles(mock_roles) + security_manager._sync_dag_view_permissions( parent_dag_name, access_control={role_name: READ_WRITE} ) - self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user) - self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user) - self.assert_user_has_dag_perms( + assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user) + assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user) + assert_user_has_dag_perms( perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user ) diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py index 049742bc860c6..fab4ef41d6ce5 100644 --- a/tests/www/views/conftest.py +++ b/tests/www/views/conftest.py @@ -25,7 +25,7 @@ from airflow import settings from airflow.models import DagBag from airflow.www.app import create_app -from tests.test_utils.api_connexion_utils import delete_roles +from tests.test_utils.api_connexion_utils import delete_user from tests.test_utils.decorators import dont_initialize_flask_app_submodules from tests.test_utils.www import client_with_login @@ -66,42 +66,47 @@ def factory(): app.jinja_env.undefined = jinja2.StrictUndefined security_manager = app.appbuilder.sm - if not security_manager.find_user(username='test'): - security_manager.add_user( - username='test', - first_name='test', - last_name='test', - email='test@fab.org', - role=security_manager.find_role('Admin'), - password='test', - ) - if not security_manager.find_user(username='test_user'): - security_manager.add_user( - username='test_user', - first_name='test_user', - last_name='test_user', - email='test_user@fab.org', - role=security_manager.find_role('User'), - password='test_user', - ) - if not security_manager.find_user(username='test_viewer'): - security_manager.add_user( - username='test_viewer', - first_name='test_viewer', - last_name='test_viewer', - email='test_viewer@fab.org', - role=security_manager.find_role('Viewer'), - password='test_viewer', - ) + + test_users = [ + { + 'username': 'test_admin', + 'first_name': 'test_admin_first_name', + 'last_name': 'test_admin_last_name', + 'email': 'test_admin@fab.org', + 'role': security_manager.find_role('Admin'), + 'password': 'test_admin_password', + }, + { + 'username': 'test_user', + 'first_name': 'test_user_first_name', + 'last_name': 'test_user_last_name', + 'email': 'test_user@fab.org', + 'role': security_manager.find_role('User'), + 'password': 'test_user_password', + }, + { + 'username': 'test_viewer', + 'first_name': 'test_viewer_first_name', + 'last_name': 'test_viewer_last_name', + 'email': 'test_viewer@fab.org', + 'role': security_manager.find_role('Viewer'), + 'password': 'test_viewer_password', + }, + ] + + for user_dict in test_users: + if not security_manager.find_user(username=user_dict['username']): + security_manager.add_user(**user_dict) yield app - delete_roles(app) + for user_dict in test_users: + delete_user(app, user_dict['username']) @pytest.fixture() def admin_client(app): - return client_with_login(app, username="test", password="test") + return client_with_login(app, username="test_admin", password="test_admin") @pytest.fixture() diff --git a/tests/www/views/test_views_base.py b/tests/www/views/test_views_base.py index 807552b0a44f6..cc9968276e006 100644 --- a/tests/www/views/test_views_base.py +++ b/tests/www/views/test_views_base.py @@ -260,17 +260,18 @@ def test_views_post(admin_client, url, check_response): @pytest.mark.parametrize( - "url, client, content", + "url, client, content, username", [ - ("resetmypassword/form", "viewer_client", "Password Changed"), - ("resetpassword/form?pk=1", "admin_client", "Password Changed"), - ("resetpassword/form?pk=1", "viewer_client", "Access is Denied"), + ("resetmypassword/form", "viewer_client", "Password Changed", "test_viewer"), + ("resetpassword/form?pk={}", "admin_client", "Password Changed", "test_admin"), + ("resetpassword/form?pk={}", "viewer_client", "Access is Denied", "test_viewer"), ], ids=["my-viewer", "pk-admin", "pk-viewer"], ) -def test_resetmypasswordview_edit(request, url, client, content): +def test_resetmypasswordview_edit(app, request, url, client, content, username): + user = app.appbuilder.sm.find_user(username) resp = request.getfixturevalue(client).post( - url, data={'password': 'blah', 'conf_password': 'blah'}, follow_redirects=True + url.format(user.id), data={'password': 'blah', 'conf_password': 'blah'}, follow_redirects=True ) check_content_in_response(content, resp)