Skip to content

Commit

Permalink
refactor: Added permission check to actions
Browse files Browse the repository at this point in the history
Added permission check to actions using new check_permission function
  • Loading branch information
mike-pisman committed Oct 13, 2023
1 parent e5eb703 commit 3b83508
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 145 deletions.
57 changes: 32 additions & 25 deletions src/unipoll_api/actions/group.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from beanie import DeleteRules, WriteRules
from unipoll_api import AccountManager
from unipoll_api.documents import Policy, Workspace, Group, Account, create_link
from unipoll_api.documents import Policy, Workspace, Group, Account
from unipoll_api import actions
from unipoll_api.schemas import GroupSchemas, WorkspaceSchemas
from unipoll_api.exceptions import (GroupExceptions, WorkspaceExceptions)
Expand All @@ -22,6 +22,9 @@ async def get_groups(workspace: Workspace | None = None,
search_filter['members._id'] = account.id # type: ignore
search_result = await Group.find(search_filter, fetch_links=True).to_list()

# TODO: Rewrite to iterate over list of workspaces
# TODO: to avoid permission check for every group if the user has permission to get all groups

groups = []
for group in search_result:
try:
Expand All @@ -35,8 +38,10 @@ async def get_groups(workspace: Workspace | None = None,
# Create a new group with account as the owner
async def create_group(workspace: Workspace,
name: str,
description: str) -> GroupSchemas.GroupCreateOutput:
# await workspace.fetch_link(workspace.groups)
description: str,
check_permissions: bool = True) -> GroupSchemas.GroupCreateOutput:

await Permissions.check_permissions(workspace, "add_groups", check_permissions)
account = AccountManager.active_user.get()

# Check if group name is unique
Expand All @@ -57,15 +62,8 @@ async def create_group(workspace: Workspace,
# Add the account to group member list
await new_group.add_member(account, Permissions.GROUP_ALL_PERMISSIONS)

# Create a policy for the new group
permissions = Permissions.WORKSPACE_BASIC_PERMISSIONS # type: ignore
new_policy = Policy(policy_holder_type='group',
policy_holder=(await create_link(new_group)),
permissions=permissions,
parent_resource=workspace) # type: ignore

# Add the group and the policy to the workspace
workspace.policies.append(new_policy) # type: ignore
# Create a policy for the new group
await workspace.add_policy(new_group, Permissions.GROUP_BASIC_PERMISSIONS, False)
workspace.groups.append(new_group) # type: ignore
await Workspace.save(workspace, link_rule=WriteRules.WRITE)

Expand All @@ -74,17 +72,14 @@ async def create_group(workspace: Workspace,


# Get group
async def get_group(group: Group, include_members: bool = False, include_policies: bool = False) -> GroupSchemas.Group:
account = AccountManager.active_user.get()

# Check if the user has a permission to get all the groups in the workspace
workspace_permissions = await Permissions.get_all_permissions(group.workspace, account)
group_permissions = await Permissions.get_all_permissions(group, account)

if not (Permissions.check_permission(workspace_permissions, Permissions.WorkspacePermissions["get_groups"]) or
Permissions.check_permission(group_permissions, Permissions.GroupPermissions["get_group"])):
raise GroupExceptions.UserNotAuthorized(
account, group, f"to view group {group.id}")
async def get_group(group: Group,
include_members: bool = False,
include_policies: bool = False,
check_permissions: bool = True) -> GroupSchemas.Group:
try:
await Permissions.check_permissions(group.workspace, "get_groups", check_permissions)
except WorkspaceExceptions.UserNotAuthorized:
await Permissions.check_permissions(group, "get_group", check_permissions)

members = (await actions.MembersActions.get_members(group)).members if include_members else None
policies = (await actions.PolicyActions.get_policies(resource=group)).policies if include_policies else None
Expand All @@ -102,7 +97,13 @@ async def get_group(group: Group, include_members: bool = False, include_policie

# Update a group
async def update_group(group: Group,
group_data: GroupSchemas.GroupUpdateRequest) -> GroupSchemas.Group:
group_data: GroupSchemas.GroupUpdateRequest,
check_permissions: bool = True) -> GroupSchemas.Group:
try:
await Permissions.check_permissions(group.workspace, "update_groups", check_permissions)
except WorkspaceExceptions.UserNotAuthorized:
await Permissions.check_permissions(group, "update_group", check_permissions)

save_changes = False
workspace: Workspace = group.workspace # type: ignore
# The group must belong to a workspace
Expand Down Expand Up @@ -130,7 +131,13 @@ async def update_group(group: Group,


# Delete a group
async def delete_group(group: Group):
async def delete_group(group: Group,
check_permissions: bool = True):
try:
await Permissions.check_permissions(group.workspace, "delete_groups", check_permissions)
except WorkspaceExceptions.UserNotAuthorized:
await Permissions.check_permissions(group, "delete_group", check_permissions)

# await group.fetch_link(Group.workspace)
workspace: Workspace = group.workspace # type: ignore
workspace.groups = [
Expand Down
51 changes: 12 additions & 39 deletions src/unipoll_api/actions/members.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,13 @@
from unipoll_api.documents import Account, Group, ResourceID, Workspace
from unipoll_api.utils import Permissions
from unipoll_api.schemas import MemberSchemas
from unipoll_api import AccountManager
# from unipoll_api import AccountManager
from unipoll_api.exceptions import ResourceExceptions


async def get_members(resource: Workspace | Group) -> MemberSchemas.MemberList:
account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(resource, account)

if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["get_workspace_members"] # type: ignore
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["get_group_members"] # type: ignore
else:
raise ResourceExceptions.InternalServerError("Invalid resource type")

if not Permissions.check_permission(permissions, req_permissions):
ResourceExceptions.UserNotAuthorized(account, resource.resource_type, "to view members")
async def get_members(resource: Workspace | Group, check_permissions: bool = True) -> MemberSchemas.MemberList:
# Check if the user has permission to add members
await Permissions.check_permissions(resource, "add_members", check_permissions)

def build_member_scheme(member: Account) -> MemberSchemas.Member:
member_data = member.model_dump(include={'id', 'first_name', 'last_name', 'email'})
Expand All @@ -33,21 +23,10 @@ def build_member_scheme(member: Account) -> MemberSchemas.Member:

# Add groups/members to group
async def add_members(resource: Workspace | Group,
account_id_list: list[ResourceID]) -> MemberSchemas.MemberList:
account_id_list: list[ResourceID],
check_permissions: bool = True) -> MemberSchemas.MemberList:
# Check if the user has permission to add members
account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(resource, account)
if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["add_workspace_members"]
default_permissions = Permissions.WORKSPACE_BASIC_PERMISSIONS
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["add_group_members"] # type: ignore
default_permissions = Permissions.GROUP_BASIC_PERMISSIONS # type: ignore
else:
raise ResourceExceptions.InternalServerError("Invalid resource type")

if not Permissions.check_permission(permissions, req_permissions):
ResourceExceptions.UserNotAuthorized(account, resource.resource_type, "to add members")
await Permissions.check_permissions(resource, "add_members", check_permissions)

# Remove duplicates from the list of accounts
accounts = set(account_id_list)
Expand All @@ -58,6 +37,7 @@ async def add_members(resource: Workspace | Group,
# Add the accounts to the group member list with basic permissions

for account in account_list:
default_permissions = eval("Permissions." + resource.resource_type.upper() + "_BASIC_PERMISSIONS")
await resource.add_member(account, default_permissions, save=False)
await resource.save(link_rule=WriteRules.WRITE) # type: ignore

Expand All @@ -66,18 +46,11 @@ async def add_members(resource: Workspace | Group,


# Remove a member from a workspace
async def remove_member(resource: Workspace | Group, account: Account):
async def remove_member(resource: Workspace | Group,
account: Account,
permission_check: bool = True) -> MemberSchemas.MemberList:
# Check if the user has permission to add members
account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(resource, account)
if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["remove_workspace_members"] # type: ignore
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["remove_group_members"] # type: ignore
else:
raise ResourceExceptions.InternalServerError("Invalid resource type")
if not Permissions.check_permission(permissions, req_permissions):
ResourceExceptions.UserNotAuthorized(account, resource.resource_type, "to add members")
await Permissions.check_permissions(resource, "remove_members", permission_check)

# Check if the account is a member of the workspace
if account.id not in [ResourceID(member.id) for member in resource.members]: # type: ignore
Expand Down
66 changes: 17 additions & 49 deletions src/unipoll_api/actions/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,20 @@
from unipoll_api.schemas import MemberSchemas, PolicySchemas
from unipoll_api.exceptions import PolicyExceptions, ResourceExceptions
from unipoll_api.utils import Permissions
from unipoll_api.utils.permissions import check_permissions


# Helper function to get policies from a resource
# NOTE: This can be moved to utils.py
async def get_policies_from_resource(resource: Resource) -> list[Policy]:
account: Account = AccountManager.active_user.get()
req_permissions: Permissions.Permissions | None = None
policies: list[Policy] = []
if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"]
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["get_group_policies"]
elif resource.resource_type == "poll":
req_permissions = Permissions.PollPermissions["get_poll_policies"]
if req_permissions:
permissions = await Permissions.get_all_permissions(resource, account)
if Permissions.check_permission(permissions, req_permissions):
policies = resource.policies # type: ignore
else:
for policy in resource.policies:
if policy.policy_holder.ref.id == account.id: # type: ignore
policies.append(policy) # type: ignore
account = AccountManager.active_user.get()
if await check_permissions(resource, "get_policies"):
policies = resource.policies # type: ignore
else:
for policy in resource.policies:
if policy.policy_holder.ref.id == account.id: # type: ignore
policies.append(policy) # type: ignore
return policies


Expand Down Expand Up @@ -58,23 +50,10 @@ async def get_policies(policy_holder: Account | Group | None = None,
return PolicySchemas.PolicyList(policies=policy_list)


# @check_permissions(resource=policy.parent_resource, required_permissions="get_policy", permission_check=True)
async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySchemas.PolicyShort:
if permission_check:
account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(policy.parent_resource, account)
resource_type: str = policy.parent_resource.resource_type # type: ignore

if resource_type == "workspace": # type: ignore
req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"] # type: ignore
elif resource_type == "group": # type: ignore
req_permissions = Permissions.GroupPermissions["get_group_policies"] # type: ignore
elif resource_type == "poll": # type: ignore
req_permissions = Permissions.PollPermissions["get_poll_policies"] # type: ignore
else:
raise ResourceExceptions.InternalServerError("Unknown resource type")

if not Permissions.check_permission(permissions, req_permissions):
raise ResourceExceptions.UserNotAuthorized(account, "policy", "Get policy")
await policy.parent_resource.fetch_all_links() # type: ignore
await check_permissions(policy.parent_resource, "get_policies", permission_check)

# Convert policy_holder link to Member object
ph_type = policy.policy_holder_type
Expand All @@ -92,28 +71,17 @@ async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySch
permissions=permissions)


async def update_policy(policy: Policy, new_permissions: list[str]) -> PolicySchemas.PolicyOutput:
async def update_policy(policy: Policy,
new_permissions: list[str],
check_permissions: bool = True) -> PolicySchemas.PolicyOutput:

# BUG: since the parent_resource is of multiple types, it is not fetched properly, so we fetch it manually
await policy.parent_resource.fetch_all_links() # type: ignore

# Check if the user has the required permissions to update the policy
account: Account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(policy.parent_resource, account)
if policy.parent_resource.resource_type == "workspace": # type: ignore
ResourcePermissions = Permissions.WorkspacePermissions # type: ignore
req_permissions = Permissions.WorkspacePermissions["set_workspace_policy"] # type: ignore
elif policy.parent_resource.resource_type == "group": # type: ignore
ResourcePermissions = Permissions.GroupPermissions # type: ignore
req_permissions = Permissions.GroupPermissions["set_group_policy"] # type: ignore
elif policy.parent_resource.resource_type == "poll": # type: ignore
ResourcePermissions = Permissions.PollPermissions # type: ignore
req_permissions = Permissions.PollPermissions["set_poll_policy"] # type: ignore
else:
raise ResourceExceptions.InternalServerError("Unknown resource type")

if not Permissions.check_permission(permissions, req_permissions):
raise ResourceExceptions.UserNotAuthorized(account, "policy", "Update policy")
await Permissions.check_permissions(policy.parent_resource, "set_policy", check_permissions)
ResourcePermissions = eval(
"Permissions." + policy.parent_resource.resource_type.capitalize() + "Permissions") # type: ignore

# Calculate the new permission value from request
new_permission_value = 0
Expand Down
Loading

0 comments on commit 3b83508

Please sign in to comment.