diff --git a/src/unipoll_api/actions/group.py b/src/unipoll_api/actions/group.py index bb71085..e5878cd 100644 --- a/src/unipoll_api/actions/group.py +++ b/src/unipoll_api/actions/group.py @@ -1,6 +1,6 @@ from beanie import DeleteRules, WriteRules from unipoll_api import AccountManager -from unipoll_api.documents import Policy, Workspace, Group, Account, create_link +from unipoll_api.documents import Policy, Workspace, Group, Account from unipoll_api import actions from unipoll_api.schemas import GroupSchemas, WorkspaceSchemas from unipoll_api.exceptions import (GroupExceptions, WorkspaceExceptions) @@ -22,6 +22,9 @@ async def get_groups(workspace: Workspace | None = None, search_filter['members._id'] = account.id # type: ignore search_result = await Group.find(search_filter, fetch_links=True).to_list() + # TODO: Rewrite to iterate over list of workspaces + # TODO: to avoid permission check for every group if the user has permission to get all groups + groups = [] for group in search_result: try: @@ -35,8 +38,10 @@ async def get_groups(workspace: Workspace | None = None, # Create a new group with account as the owner async def create_group(workspace: Workspace, name: str, - description: str) -> GroupSchemas.GroupCreateOutput: - # await workspace.fetch_link(workspace.groups) + description: str, + check_permissions: bool = True) -> GroupSchemas.GroupCreateOutput: + + await Permissions.check_permissions(workspace, "add_groups", check_permissions) account = AccountManager.active_user.get() # Check if group name is unique @@ -57,15 +62,8 @@ async def create_group(workspace: Workspace, # Add the account to group member list await new_group.add_member(account, Permissions.GROUP_ALL_PERMISSIONS) - # Create a policy for the new group - permissions = Permissions.WORKSPACE_BASIC_PERMISSIONS # type: ignore - new_policy = Policy(policy_holder_type='group', - policy_holder=(await create_link(new_group)), - permissions=permissions, - parent_resource=workspace) # type: ignore - - # Add the group and the policy to the workspace - workspace.policies.append(new_policy) # type: ignore + # Create a policy for the new group + await workspace.add_policy(new_group, Permissions.GROUP_BASIC_PERMISSIONS, False) workspace.groups.append(new_group) # type: ignore await Workspace.save(workspace, link_rule=WriteRules.WRITE) @@ -74,17 +72,14 @@ async def create_group(workspace: Workspace, # Get group -async def get_group(group: Group, include_members: bool = False, include_policies: bool = False) -> GroupSchemas.Group: - account = AccountManager.active_user.get() - - # Check if the user has a permission to get all the groups in the workspace - workspace_permissions = await Permissions.get_all_permissions(group.workspace, account) - group_permissions = await Permissions.get_all_permissions(group, account) - - if not (Permissions.check_permission(workspace_permissions, Permissions.WorkspacePermissions["get_groups"]) or - Permissions.check_permission(group_permissions, Permissions.GroupPermissions["get_group"])): - raise GroupExceptions.UserNotAuthorized( - account, group, f"to view group {group.id}") +async def get_group(group: Group, + include_members: bool = False, + include_policies: bool = False, + check_permissions: bool = True) -> GroupSchemas.Group: + try: + await Permissions.check_permissions(group.workspace, "get_groups", check_permissions) + except WorkspaceExceptions.UserNotAuthorized: + await Permissions.check_permissions(group, "get_group", check_permissions) members = (await actions.MembersActions.get_members(group)).members if include_members else None policies = (await actions.PolicyActions.get_policies(resource=group)).policies if include_policies else None @@ -102,7 +97,13 @@ async def get_group(group: Group, include_members: bool = False, include_policie # Update a group async def update_group(group: Group, - group_data: GroupSchemas.GroupUpdateRequest) -> GroupSchemas.Group: + group_data: GroupSchemas.GroupUpdateRequest, + check_permissions: bool = True) -> GroupSchemas.Group: + try: + await Permissions.check_permissions(group.workspace, "update_groups", check_permissions) + except WorkspaceExceptions.UserNotAuthorized: + await Permissions.check_permissions(group, "update_group", check_permissions) + save_changes = False workspace: Workspace = group.workspace # type: ignore # The group must belong to a workspace @@ -130,7 +131,13 @@ async def update_group(group: Group, # Delete a group -async def delete_group(group: Group): +async def delete_group(group: Group, + check_permissions: bool = True): + try: + await Permissions.check_permissions(group.workspace, "delete_groups", check_permissions) + except WorkspaceExceptions.UserNotAuthorized: + await Permissions.check_permissions(group, "delete_group", check_permissions) + # await group.fetch_link(Group.workspace) workspace: Workspace = group.workspace # type: ignore workspace.groups = [ diff --git a/src/unipoll_api/actions/members.py b/src/unipoll_api/actions/members.py index aa1724c..b1ba66f 100644 --- a/src/unipoll_api/actions/members.py +++ b/src/unipoll_api/actions/members.py @@ -3,23 +3,13 @@ from unipoll_api.documents import Account, Group, ResourceID, Workspace from unipoll_api.utils import Permissions from unipoll_api.schemas import MemberSchemas -from unipoll_api import AccountManager +# from unipoll_api import AccountManager from unipoll_api.exceptions import ResourceExceptions -async def get_members(resource: Workspace | Group) -> MemberSchemas.MemberList: - account = AccountManager.active_user.get() - permissions = await Permissions.get_all_permissions(resource, account) - - if resource.resource_type == "workspace": - req_permissions = Permissions.WorkspacePermissions["get_workspace_members"] # type: ignore - elif resource.resource_type == "group": - req_permissions = Permissions.GroupPermissions["get_group_members"] # type: ignore - else: - raise ResourceExceptions.InternalServerError("Invalid resource type") - - if not Permissions.check_permission(permissions, req_permissions): - ResourceExceptions.UserNotAuthorized(account, resource.resource_type, "to view members") +async def get_members(resource: Workspace | Group, check_permissions: bool = True) -> MemberSchemas.MemberList: + # Check if the user has permission to add members + await Permissions.check_permissions(resource, "add_members", check_permissions) def build_member_scheme(member: Account) -> MemberSchemas.Member: member_data = member.model_dump(include={'id', 'first_name', 'last_name', 'email'}) @@ -33,21 +23,10 @@ def build_member_scheme(member: Account) -> MemberSchemas.Member: # Add groups/members to group async def add_members(resource: Workspace | Group, - account_id_list: list[ResourceID]) -> MemberSchemas.MemberList: + account_id_list: list[ResourceID], + check_permissions: bool = True) -> MemberSchemas.MemberList: # Check if the user has permission to add members - account = AccountManager.active_user.get() - permissions = await Permissions.get_all_permissions(resource, account) - if resource.resource_type == "workspace": - req_permissions = Permissions.WorkspacePermissions["add_workspace_members"] - default_permissions = Permissions.WORKSPACE_BASIC_PERMISSIONS - elif resource.resource_type == "group": - req_permissions = Permissions.GroupPermissions["add_group_members"] # type: ignore - default_permissions = Permissions.GROUP_BASIC_PERMISSIONS # type: ignore - else: - raise ResourceExceptions.InternalServerError("Invalid resource type") - - if not Permissions.check_permission(permissions, req_permissions): - ResourceExceptions.UserNotAuthorized(account, resource.resource_type, "to add members") + await Permissions.check_permissions(resource, "add_members", check_permissions) # Remove duplicates from the list of accounts accounts = set(account_id_list) @@ -58,6 +37,7 @@ async def add_members(resource: Workspace | Group, # Add the accounts to the group member list with basic permissions for account in account_list: + default_permissions = eval("Permissions." + resource.resource_type.upper() + "_BASIC_PERMISSIONS") await resource.add_member(account, default_permissions, save=False) await resource.save(link_rule=WriteRules.WRITE) # type: ignore @@ -66,18 +46,11 @@ async def add_members(resource: Workspace | Group, # Remove a member from a workspace -async def remove_member(resource: Workspace | Group, account: Account): +async def remove_member(resource: Workspace | Group, + account: Account, + permission_check: bool = True) -> MemberSchemas.MemberList: # Check if the user has permission to add members - account = AccountManager.active_user.get() - permissions = await Permissions.get_all_permissions(resource, account) - if resource.resource_type == "workspace": - req_permissions = Permissions.WorkspacePermissions["remove_workspace_members"] # type: ignore - elif resource.resource_type == "group": - req_permissions = Permissions.GroupPermissions["remove_group_members"] # type: ignore - else: - raise ResourceExceptions.InternalServerError("Invalid resource type") - if not Permissions.check_permission(permissions, req_permissions): - ResourceExceptions.UserNotAuthorized(account, resource.resource_type, "to add members") + await Permissions.check_permissions(resource, "remove_members", permission_check) # Check if the account is a member of the workspace if account.id not in [ResourceID(member.id) for member in resource.members]: # type: ignore diff --git a/src/unipoll_api/actions/policy.py b/src/unipoll_api/actions/policy.py index 5c559c5..9d3a59e 100644 --- a/src/unipoll_api/actions/policy.py +++ b/src/unipoll_api/actions/policy.py @@ -3,28 +3,20 @@ from unipoll_api.schemas import MemberSchemas, PolicySchemas from unipoll_api.exceptions import PolicyExceptions, ResourceExceptions from unipoll_api.utils import Permissions +from unipoll_api.utils.permissions import check_permissions # Helper function to get policies from a resource # NOTE: This can be moved to utils.py async def get_policies_from_resource(resource: Resource) -> list[Policy]: - account: Account = AccountManager.active_user.get() - req_permissions: Permissions.Permissions | None = None policies: list[Policy] = [] - if resource.resource_type == "workspace": - req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"] - elif resource.resource_type == "group": - req_permissions = Permissions.GroupPermissions["get_group_policies"] - elif resource.resource_type == "poll": - req_permissions = Permissions.PollPermissions["get_poll_policies"] - if req_permissions: - permissions = await Permissions.get_all_permissions(resource, account) - if Permissions.check_permission(permissions, req_permissions): - policies = resource.policies # type: ignore - else: - for policy in resource.policies: - if policy.policy_holder.ref.id == account.id: # type: ignore - policies.append(policy) # type: ignore + account = AccountManager.active_user.get() + if await check_permissions(resource, "get_policies"): + policies = resource.policies # type: ignore + else: + for policy in resource.policies: + if policy.policy_holder.ref.id == account.id: # type: ignore + policies.append(policy) # type: ignore return policies @@ -58,23 +50,10 @@ async def get_policies(policy_holder: Account | Group | None = None, return PolicySchemas.PolicyList(policies=policy_list) +# @check_permissions(resource=policy.parent_resource, required_permissions="get_policy", permission_check=True) async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySchemas.PolicyShort: - if permission_check: - account = AccountManager.active_user.get() - permissions = await Permissions.get_all_permissions(policy.parent_resource, account) - resource_type: str = policy.parent_resource.resource_type # type: ignore - - if resource_type == "workspace": # type: ignore - req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"] # type: ignore - elif resource_type == "group": # type: ignore - req_permissions = Permissions.GroupPermissions["get_group_policies"] # type: ignore - elif resource_type == "poll": # type: ignore - req_permissions = Permissions.PollPermissions["get_poll_policies"] # type: ignore - else: - raise ResourceExceptions.InternalServerError("Unknown resource type") - - if not Permissions.check_permission(permissions, req_permissions): - raise ResourceExceptions.UserNotAuthorized(account, "policy", "Get policy") + await policy.parent_resource.fetch_all_links() # type: ignore + await check_permissions(policy.parent_resource, "get_policies", permission_check) # Convert policy_holder link to Member object ph_type = policy.policy_holder_type @@ -92,28 +71,17 @@ async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySch permissions=permissions) -async def update_policy(policy: Policy, new_permissions: list[str]) -> PolicySchemas.PolicyOutput: +async def update_policy(policy: Policy, + new_permissions: list[str], + check_permissions: bool = True) -> PolicySchemas.PolicyOutput: # BUG: since the parent_resource is of multiple types, it is not fetched properly, so we fetch it manually await policy.parent_resource.fetch_all_links() # type: ignore # Check if the user has the required permissions to update the policy - account: Account = AccountManager.active_user.get() - permissions = await Permissions.get_all_permissions(policy.parent_resource, account) - if policy.parent_resource.resource_type == "workspace": # type: ignore - ResourcePermissions = Permissions.WorkspacePermissions # type: ignore - req_permissions = Permissions.WorkspacePermissions["set_workspace_policy"] # type: ignore - elif policy.parent_resource.resource_type == "group": # type: ignore - ResourcePermissions = Permissions.GroupPermissions # type: ignore - req_permissions = Permissions.GroupPermissions["set_group_policy"] # type: ignore - elif policy.parent_resource.resource_type == "poll": # type: ignore - ResourcePermissions = Permissions.PollPermissions # type: ignore - req_permissions = Permissions.PollPermissions["set_poll_policy"] # type: ignore - else: - raise ResourceExceptions.InternalServerError("Unknown resource type") - - if not Permissions.check_permission(permissions, req_permissions): - raise ResourceExceptions.UserNotAuthorized(account, "policy", "Update policy") + await Permissions.check_permissions(policy.parent_resource, "set_policy", check_permissions) + ResourcePermissions = eval( + "Permissions." + policy.parent_resource.resource_type.capitalize() + "Permissions") # type: ignore # Calculate the new permission value from request new_permission_value = 0 diff --git a/src/unipoll_api/actions/poll.py b/src/unipoll_api/actions/poll.py index 0188f4f..636a673 100644 --- a/src/unipoll_api/actions/poll.py +++ b/src/unipoll_api/actions/poll.py @@ -1,5 +1,4 @@ from beanie import WriteRules -from unipoll_api import AccountManager from unipoll_api.documents import Poll, Workspace from unipoll_api.schemas import PollSchemas, QuestionSchemas, WorkspaceSchemas from unipoll_api.utils import Permissions @@ -7,25 +6,22 @@ from unipoll_api import actions -async def get_polls(workspace: Workspace | None = None) -> PollSchemas.PollList: - account = AccountManager.active_user.get() - req_permissions = Permissions.WorkspacePermissions["get_polls"] # type: ignore +async def get_polls(workspace: Workspace | None = None, + check_permissions: bool = True) -> PollSchemas.PollList: all_workspaces = [workspace] if workspace else await Workspace.find(fetch_links=True).to_list() poll_list = [] for workspace in all_workspaces: - permissions = await Permissions.get_all_permissions(workspace, account) - if Permissions.check_permission(permissions, req_permissions): + try: + await Permissions.check_permissions(workspace, "get_polls", check_permissions) poll_list += workspace.polls # type: ignore - else: - req_permissions = Permissions.WorkspacePermissions["get_poll"] + except ResourceExceptions.UserNotAuthorized: + poll: Poll for poll in workspace.polls: # type: ignore - if poll.public: # type: ignore + if poll.public: poll_list.append(poll) else: - permissions = await Permissions.get_all_permissions(poll, account) - if Permissions.check_permission(permissions, req_permissions): - poll_list.append(poll) + poll_list.append(await get_poll(poll, check_permissions)) # Build poll list and return the result for poll in poll_list: poll_list.append(PollSchemas.PollShort(**poll.model_dump())) # type: ignore @@ -33,7 +29,12 @@ async def get_polls(workspace: Workspace | None = None) -> PollSchemas.PollList: # Create a new poll in a workspace -async def create_poll(workspace: Workspace, input_data: PollSchemas.CreatePollRequest) -> PollSchemas.PollResponse: +async def create_poll(workspace: Workspace, + input_data: PollSchemas.CreatePollRequest, + check_permissions: bool = True) -> PollSchemas.PollResponse: + # Check if the user has permission to create polls + await Permissions.check_permissions(actions, "create_polls", check_permissions) + # Check if poll name is unique poll: Poll # For type hinting, until Link type is supported for poll in workspace.polls: # type: ignore @@ -63,21 +64,14 @@ async def create_poll(workspace: Workspace, input_data: PollSchemas.CreatePollRe async def get_poll(poll: Poll, include_questions: bool = False, - include_policies: bool = False) -> PollSchemas.PollResponse: - account = AccountManager.active_user.get() - questions = [] - policies = None + include_policies: bool = False, + check_permissions: bool = True) -> PollSchemas.PollResponse: + if not poll.public: + await Permissions.check_permissions(poll, "get_poll", check_permissions) - permissions = await Permissions.get_all_permissions(poll, account) # Fetch the resources if the user has the required permissions - if include_questions: - req_permissions = Permissions.PollPermissions["get_poll_questions"] # type: ignore - if poll.public or Permissions.check_permission(permissions, req_permissions): - questions = (await get_poll_questions(poll)).questions - if include_policies: - req_permissions = Permissions.PollPermissions["get_poll_policies"] # type: ignore - if Permissions.check_permission(permissions, req_permissions): - policies = (await actions.PolicyActions.get_policies(resource=poll)).policies + questions = (await get_poll_questions(poll)).questions if include_questions else None + policies = (await actions.PolicyActions.get_policies(resource=poll)).policies if include_policies else None workspace = WorkspaceSchemas.WorkspaceShort(**poll.workspace.model_dump()) # type: ignore @@ -92,8 +86,12 @@ async def get_poll(poll: Poll, policies=policies) -async def get_poll_questions(poll: Poll) -> QuestionSchemas.QuestionList: - print("Poll: ", poll.questions) +async def get_poll_questions(poll: Poll, + check_permissions: bool = True) -> QuestionSchemas.QuestionList: + # Check if the user has permission to get questions + if not poll.public: + await Permissions.check_permissions(poll, "get_questions", check_permissions) + question_list = [] for question in poll.questions: # question_data = question.model_dump() diff --git a/src/unipoll_api/actions/workspace.py b/src/unipoll_api/actions/workspace.py index 498823e..af693ba 100644 --- a/src/unipoll_api/actions/workspace.py +++ b/src/unipoll_api/actions/workspace.py @@ -46,7 +46,9 @@ async def get_workspace(workspace: Workspace, include_groups: bool = False, include_policies: bool = False, include_members: bool = False, - include_polls: bool = False) -> WorkspaceSchemas.Workspace: + include_polls: bool = False, + check_permissions: bool = True) -> WorkspaceSchemas.Workspace: + await Permissions.check_permissions(workspace, "get_workspace", check_permissions) groups = (await actions.GroupActions.get_groups(workspace)).groups if include_groups else None members = (await actions.MembersActions.get_members(workspace)).members if include_members else None policies = (await actions.PolicyActions.get_policies(resource=workspace)).policies if include_policies else None @@ -63,7 +65,10 @@ async def get_workspace(workspace: Workspace, # Update a workspace async def update_workspace(workspace: Workspace, - input_data: WorkspaceSchemas.WorkspaceUpdateRequest) -> WorkspaceSchemas.Workspace: + input_data: WorkspaceSchemas.WorkspaceUpdateRequest, + check_permissions: bool = True) -> WorkspaceSchemas.Workspace: + await Permissions.check_permissions(workspace, "update_workspace", check_permissions) + save_changes = False # Check if user suplied a name if input_data.name and input_data.name != workspace.name: @@ -84,8 +89,9 @@ async def update_workspace(workspace: Workspace, # Delete a workspace -async def delete_workspace(workspace: Workspace): - # BUG: Cannot delete groups +async def delete_workspace(workspace: Workspace, check_permissions: bool = True): + await Permissions.check_permissions(workspace, "delete_workspace", check_permissions) + for group in workspace.groups: await actions.GroupActions.delete_group(group) # type: ignore