diff --git a/nitric/resources/buckets.py b/nitric/resources/buckets.py index c2e2d72..2db40f1 100644 --- a/nitric/resources/buckets.py +++ b/nitric/resources/buckets.py @@ -22,7 +22,7 @@ from dataclasses import dataclass from datetime import timedelta from enum import Enum -from typing import Callable, List, Literal, Optional, Union +from typing import Callable, List, Literal, Optional, Union, cast from warnings import warn import betterproto @@ -146,20 +146,40 @@ async def delete(self): except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err) from grpc_err - async def upload_url(self, expiry: Optional[timedelta] = None): - """Get a temporary writable URL to this file.""" - return await self.sign_url(mode=FileMode.WRITE, expiry=expiry) + async def upload_url(self, expiry: Optional[Union[timedelta, int]] = None): + """ + Get a temporary writable URL to this file. - async def download_url(self, expiry: Optional[timedelta] = None): - """Get a temporary readable URL to this file.""" - return await self.sign_url(mode=FileMode.READ, expiry=expiry) + Parameters: - async def sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[timedelta] = None): - """Generate a signed URL for reading or writing to a file.""" - warn("File.sign_url() is deprecated, use upload_url() or download_url() instead", DeprecationWarning) + expiry (timedelta or int, optional): The expiry time for the signed URL. + If an integer is provided, it is treated as seconds. Default is 600 seconds. + Returns: + str: The signed URL. + """ + return await self._sign_url(mode=FileMode.WRITE, expiry=expiry) + + async def download_url(self, expiry: Optional[Union[timedelta, int]] = None): + """ + Get a temporary readable URL to this file. + + Parameters: + + expiry (timedelta or int, optional): The expiry time for the signed URL. + If an integer is provided, it is treated as seconds. Default is 600 seconds. + + Returns: + str: The signed URL. + """ + return await self._sign_url(mode=FileMode.READ, expiry=expiry) + + async def _sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[Union[timedelta, int]] = None): + """Generate a signed URL for reading or writing to a file.""" if expiry is None: expiry = timedelta(seconds=600) + if not isinstance(expiry, timedelta): + expiry = timedelta(seconds=expiry) try: response = await self._bucket._storage_stub.pre_sign_url( # type: ignore pylint: disable=protected-access @@ -172,7 +192,25 @@ async def sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[timede raise exception_from_grpc_error(grpc_err) from grpc_err -BucketPermission = Literal["reading", "writing", "deleting"] +LegacyBucketPermission = Literal["reading", "writing", "deleting"] +BucketPermission = Literal["read", "write", "delete"] + +legacy_perms: List[LegacyBucketPermission] = ["reading", "writing", "deleting"] +new_perms: List[BucketPermission] = ["read", "write", "delete"] + + +def check_permission(permission: Union[LegacyBucketPermission, BucketPermission]) -> BucketPermission: + """Check if the permission is valid and return the new permission if it is a legacy permission.""" + if permission in legacy_perms: + new_perm = new_perms[legacy_perms.index(cast(LegacyBucketPermission, permission))] + warn( + f"The permission '{permission}' is deprecated. Use '{new_perm}' instead.", DeprecationWarning, stacklevel=2 + ) + return new_perm + elif permission in new_perms: + return cast(BucketPermission, permission) + else: + raise ValueError("Invalid permission value, must be one of 'read', 'write', or 'delete'.") class BucketNotificationWorkerOptions: @@ -211,9 +249,9 @@ async def _register(self) -> None: def _perms_to_actions(self, *args: BucketPermission) -> List[Action]: permission_actions_map: dict[BucketPermission, List[Action]] = { - "reading": [Action.BucketFileGet, Action.BucketFileList], - "writing": [Action.BucketFilePut], - "deleting": [Action.BucketFileDelete], + "read": [Action.BucketFileGet, Action.BucketFileList], + "write": [Action.BucketFilePut], + "delete": [Action.BucketFileDelete], } return [action for perm in args for action in permission_actions_map[perm]] @@ -221,9 +259,15 @@ def _perms_to_actions(self, *args: BucketPermission) -> List[Action]: def _to_resource_id(self) -> ResourceIdentifier: return ResourceIdentifier(name=self.name, type=ResourceType.Bucket) # type:ignore - def allow(self, perm: BucketPermission, *args: BucketPermission) -> BucketRef: + def allow( + self, + perm: Union[LegacyBucketPermission, BucketPermission], + *args: Union[LegacyBucketPermission, BucketPermission], + ) -> BucketRef: """Request the required permissions for this resource.""" - str_args = [str(perm)] + [str(permission) for permission in args] + all_perms: List[BucketPermission] = [check_permission(perm)] + [check_permission(p) for p in args] + + str_args = [str(permission) for permission in all_perms] self._register_policy(*str_args) return BucketRef(self.name)