Skip to content

Commit

Permalink
fix: fix bucket url and permission types
Browse files Browse the repository at this point in the history
* fix: allow non-timedelta expiry values for upload/download URLs
* fix: replace incorrect bucket permission values
Existing values will still work, with a deprecation warning.

---------

Co-authored-by: Tim Holm <[email protected]>
  • Loading branch information
jyecusch and tjholm authored Apr 10, 2024
1 parent 3698993 commit 6b20506
Showing 1 changed file with 60 additions and 16 deletions.
76 changes: 60 additions & 16 deletions nitric/resources/buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Callable, List, Literal, Optional, Union
from typing import Callable, List, Literal, Optional, Union, cast
from warnings import warn

import betterproto
Expand Down Expand Up @@ -146,20 +146,40 @@ async def delete(self):
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err) from grpc_err

async def upload_url(self, expiry: Optional[timedelta] = None):
"""Get a temporary writable URL to this file."""
return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)
async def upload_url(self, expiry: Optional[Union[timedelta, int]] = None):
"""
Get a temporary writable URL to this file.
async def download_url(self, expiry: Optional[timedelta] = None):
"""Get a temporary readable URL to this file."""
return await self.sign_url(mode=FileMode.READ, expiry=expiry)
Parameters:
async def sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[timedelta] = None):
"""Generate a signed URL for reading or writing to a file."""
warn("File.sign_url() is deprecated, use upload_url() or download_url() instead", DeprecationWarning)
expiry (timedelta or int, optional): The expiry time for the signed URL.
If an integer is provided, it is treated as seconds. Default is 600 seconds.
Returns:
str: The signed URL.
"""
return await self._sign_url(mode=FileMode.WRITE, expiry=expiry)

async def download_url(self, expiry: Optional[Union[timedelta, int]] = None):
"""
Get a temporary readable URL to this file.
Parameters:
expiry (timedelta or int, optional): The expiry time for the signed URL.
If an integer is provided, it is treated as seconds. Default is 600 seconds.
Returns:
str: The signed URL.
"""
return await self._sign_url(mode=FileMode.READ, expiry=expiry)

async def _sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[Union[timedelta, int]] = None):
"""Generate a signed URL for reading or writing to a file."""
if expiry is None:
expiry = timedelta(seconds=600)
if not isinstance(expiry, timedelta):
expiry = timedelta(seconds=expiry)

try:
response = await self._bucket._storage_stub.pre_sign_url( # type: ignore pylint: disable=protected-access
Expand All @@ -172,7 +192,25 @@ async def sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[timede
raise exception_from_grpc_error(grpc_err) from grpc_err


BucketPermission = Literal["reading", "writing", "deleting"]
LegacyBucketPermission = Literal["reading", "writing", "deleting"]
BucketPermission = Literal["read", "write", "delete"]

legacy_perms: List[LegacyBucketPermission] = ["reading", "writing", "deleting"]
new_perms: List[BucketPermission] = ["read", "write", "delete"]


def check_permission(permission: Union[LegacyBucketPermission, BucketPermission]) -> BucketPermission:
"""Check if the permission is valid and return the new permission if it is a legacy permission."""
if permission in legacy_perms:
new_perm = new_perms[legacy_perms.index(cast(LegacyBucketPermission, permission))]
warn(
f"The permission '{permission}' is deprecated. Use '{new_perm}' instead.", DeprecationWarning, stacklevel=2
)
return new_perm
elif permission in new_perms:
return cast(BucketPermission, permission)
else:
raise ValueError("Invalid permission value, must be one of 'read', 'write', or 'delete'.")


class BucketNotificationWorkerOptions:
Expand Down Expand Up @@ -211,19 +249,25 @@ async def _register(self) -> None:

def _perms_to_actions(self, *args: BucketPermission) -> List[Action]:
permission_actions_map: dict[BucketPermission, List[Action]] = {
"reading": [Action.BucketFileGet, Action.BucketFileList],
"writing": [Action.BucketFilePut],
"deleting": [Action.BucketFileDelete],
"read": [Action.BucketFileGet, Action.BucketFileList],
"write": [Action.BucketFilePut],
"delete": [Action.BucketFileDelete],
}

return [action for perm in args for action in permission_actions_map[perm]]

def _to_resource_id(self) -> ResourceIdentifier:
return ResourceIdentifier(name=self.name, type=ResourceType.Bucket) # type:ignore

def allow(self, perm: BucketPermission, *args: BucketPermission) -> BucketRef:
def allow(
self,
perm: Union[LegacyBucketPermission, BucketPermission],
*args: Union[LegacyBucketPermission, BucketPermission],
) -> BucketRef:
"""Request the required permissions for this resource."""
str_args = [str(perm)] + [str(permission) for permission in args]
all_perms: List[BucketPermission] = [check_permission(perm)] + [check_permission(p) for p in args]

str_args = [str(permission) for permission in all_perms]
self._register_policy(*str_args)

return BucketRef(self.name)
Expand Down

0 comments on commit 6b20506

Please sign in to comment.