Skip to content

Commit

Permalink
fix: add on method to BucketRef
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Jye Cusch <[email protected]>
  • Loading branch information
tjholm and jyecusch authored Apr 12, 2024
1 parent 6b20506 commit b8fa116
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 108 deletions.
1 change: 1 addition & 0 deletions nitric/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def run(cls) -> None:

loop.run_until_complete(asyncio.gather(*[wkr.start() for wkr in cls._workers]))
except KeyboardInterrupt:

print("\nexiting")
except ConnectionRefusedError:
raise NitricUnavailableException(
Expand Down
86 changes: 1 addition & 85 deletions nitric/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@
from opentelemetry import propagate

from nitric.proto.schedules.v1 import ServerMessage as ScheduleServerMessage
from nitric.proto.storage.v1 import BlobEventRequest, BlobEventType
from nitric.proto.topics.v1 import ClientMessage as TopicClientMessage
from nitric.proto.topics.v1 import MessageResponse as TopicResponse
from nitric.proto.topics.v1 import ServerMessage as TopicServerMessage

# from nitric.proto.websockets.v1 import ServerMessage as WebsocketServerMessage
# from nitric.proto.websockets.v1 import WebsocketEventResponse

Record = Dict[str, Union[str, List[str]]]
PROPAGATOR = propagate.get_global_textmap()

Expand Down Expand Up @@ -255,73 +251,6 @@ def __init__(self, request: AnyWebsocketRequest, response: Optional[AnyWebsocket
self.res = WebsocketResponse()


class BucketNotifyRequest:
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""

bucket_name: str
key: str
notification_type: BlobEventType

def __init__(self, bucket_name: str, key: str, notification_type: BlobEventType):
"""Construct a new EventRequest."""
self.bucket_name = bucket_name
self.key = key
self.notification_type = notification_type


class BucketNotifyResponse:
"""Represents the response to a trigger from a Bucket."""

def __init__(self, success: bool = True):
"""Construct a new BucketNotificationResponse."""
self.success = success


class BucketNotificationContext:
"""Represents the full request/response context for a bucket notification trigger."""

def __init__(self, request: BucketNotifyRequest, response: Optional[BucketNotifyResponse] = None):
"""Construct a new BucketNotificationContext."""
self.req = request
self.res = response if response else BucketNotifyResponse()


class FileNotifyRequest(BucketNotifyRequest):
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""

def __init__(
self,
bucket_name: str,
bucket_ref: Any, # can't import BucketRef due to circular dependency problems
key: str,
notification_type: BlobEventType,
):
"""Construct a new FileNotificationRequest."""
super().__init__(bucket_name=bucket_name, key=key, notification_type=notification_type)
self.file = bucket_ref.file(key)


class FileNotificationContext(BucketNotificationContext):
"""Represents the full request/response context for a bucket notification trigger."""

def __init__(self, request: FileNotifyRequest, response: Optional[BucketNotifyResponse] = None):
"""Construct a new FileNotificationContext."""
super().__init__(request=request, response=response)
self.req = request

@staticmethod
def _from_client_message_with_bucket(msg: BlobEventRequest, bucket_ref) -> FileNotificationContext:
"""Construct a new FileNotificationTrigger from a Bucket Notification trigger from the Nitric Membrane."""
return FileNotificationContext(
request=FileNotifyRequest(
bucket_name=msg.bucket_name,
key=msg.blob_event.key,
bucket_ref=bucket_ref,
notification_type=msg.blob_event.type,
)
)


# == Schedules ==


Expand Down Expand Up @@ -352,16 +281,7 @@ def __init__(self, msg: ScheduleServerMessage):
self.res = IntervalResponse(msg.id)


C = TypeVar(
"C",
TriggerContext,
HttpContext,
MessageContext,
FileNotificationContext,
BucketNotificationContext,
WebsocketContext,
IntervalContext,
)
C = TypeVar("C")


class Middleware(Protocol, Generic[C]):
Expand All @@ -383,15 +303,11 @@ async def __call__(self, ctx: C) -> C | None:
HttpMiddleware = Middleware[HttpContext]
EventMiddleware = Middleware[MessageContext]
IntervalMiddleware = Middleware[IntervalContext]
BucketNotificationMiddleware = Middleware[BucketNotificationContext]
FileNotificationMiddleware = Middleware[FileNotificationContext]
WebsocketMiddleware = Middleware[WebsocketContext]

HttpHandler = Handler[HttpContext]
EventHandler = Handler[MessageContext]
IntervalHandler = Handler[IntervalContext]
BucketNotificationHandler = Handler[BucketNotificationContext]
FileNotificationHandler = Handler[FileNotificationContext]
WebsocketHandler = Handler[WebsocketContext]


Expand Down
4 changes: 2 additions & 2 deletions nitric/resources/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __init__(
if middleware is None:
middleware = []
if security is None:
security = {}
security = []
self.middleware = middleware
self.security = security
self.path = path
Expand Down Expand Up @@ -202,7 +202,7 @@ def _route(self, match: str, opts: Optional[RouteOptions] = None) -> Route:
opts = RouteOptions()

if self.middleware is not None:
opts.middleware = self.middleware + opts.middleware
opts.middleware = (self.middleware + opts.middleware) if opts.middleware is not None else self.middleware

r = Route(self, match, opts)
self.routes.append(r)
Expand Down
138 changes: 121 additions & 17 deletions nitric/resources/buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@

from nitric.application import Nitric
from nitric.bidi import AsyncNotifierList
from nitric.context import BucketNotificationContext, BucketNotificationHandler, BucketNotifyRequest, FunctionServer
from nitric.context import FunctionServer, Handler, Middleware
from nitric.exception import InvalidArgumentException, exception_from_grpc_error
from nitric.proto.resources.v1 import Action, ResourceDeclareRequest, ResourceIdentifier, ResourceType
from nitric.proto.storage.v1 import (
BlobEventRequest,
BlobEventResponse,
BlobEventType,
ClientMessage,
Expand All @@ -54,6 +55,84 @@
from nitric.utils import new_default_channel


class BucketNotifyRequest:
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""

bucket_name: str
key: str
notification_type: BlobEventType
bucket: BucketRef
file: FileRef

def __init__(self, bucket_name: str, key: str, notification_type: BlobEventType):
"""Construct a new BucketNotifyRequest."""
self.bucket_name = bucket_name
self.key = key
self.notification_type = notification_type
self.bucket = BucketRef(bucket_name)
self.file = self.bucket.file(key)


class BucketNotifyResponse:
"""Represents the response to a trigger from a Bucket."""

def __init__(self, success: bool = True):
"""Construct a new BucketNotificationResponse."""
self.success = success


class BucketNotificationContext:
"""Represents the full request/response context for a bucket notification trigger."""

def __init__(self, request: BucketNotifyRequest, response: Optional[BucketNotifyResponse] = None):
"""Construct a new BucketNotificationContext."""
self.req = request
self.res = response if response else BucketNotifyResponse()


class FileNotifyRequest(BucketNotifyRequest):
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""

def __init__(
self,
bucket_name: str,
bucket_ref: BucketRef,
key: str,
notification_type: BlobEventType,
):
"""Construct a new FileNotificationRequest."""
super().__init__(bucket_name=bucket_name, key=key, notification_type=notification_type)
self.file = bucket_ref.file(key)


class FileNotificationContext(BucketNotificationContext):
"""Represents the full request/response context for a bucket notification trigger."""

def __init__(self, request: FileNotifyRequest, response: Optional[BucketNotifyResponse] = None):
"""Construct a new FileNotificationContext."""
super().__init__(request=request, response=response)
self.req = request

@staticmethod
def _from_client_message_with_bucket(msg: BlobEventRequest, bucket_ref) -> FileNotificationContext:
"""Construct a new FileNotificationTrigger from a Bucket Notification trigger from the Nitric Membrane."""
return FileNotificationContext(
request=FileNotifyRequest(
bucket_name=msg.bucket_name,
key=msg.blob_event.key,
bucket_ref=bucket_ref,
notification_type=msg.blob_event.type,
)
)


BucketNotificationMiddleware = Middleware[BucketNotificationContext]
BucketNotificationHandler = Handler[BucketNotificationContext]

FileNotificationMiddleware = Middleware[FileNotificationContext]
FileNotificationHandler = Handler[FileNotificationContext]


class BucketRef(object):
"""A reference to a deployed storage bucket, used to interact with the bucket at runtime."""

Expand Down Expand Up @@ -90,6 +169,21 @@ async def exists(self, key: str) -> bool:
)
return resp.exists

def on(
self, notification_type: str, notification_prefix_filter: str
) -> Callable[[BucketNotificationHandler], None]:
"""Create and return a bucket notification decorator for this bucket."""

def decorator(func: BucketNotificationHandler) -> None:
Listener(
bucket_name=self.name,
notification_type=notification_type,
notification_prefix_filter=notification_prefix_filter,
handler=func,
)

return decorator


class FileMode(Enum):
"""Definition of available operation modes for file signed URLs."""
Expand Down Expand Up @@ -121,7 +215,7 @@ async def write(self, body: bytes):
Will create the file if it doesn't already exist.
"""
try:
await self._bucket._storage_stub.write( # type: ignore pylint: disable=protected-access
await self._bucket._storage_stub.write(
storage_write_request=StorageWriteRequest(bucket_name=self._bucket.name, key=self.key, body=body)
)
except GRPCError as grpc_err:
Expand All @@ -130,7 +224,7 @@ async def write(self, body: bytes):
async def read(self) -> bytes:
"""Read this files contents from the bucket."""
try:
response = await self._bucket._storage_stub.read( # type: ignore pylint: disable=protected-access
response = await self._bucket._storage_stub.read(
storage_read_request=StorageReadRequest(bucket_name=self._bucket.name, key=self.key)
)
return response.body
Expand All @@ -140,7 +234,7 @@ async def read(self) -> bytes:
async def delete(self):
"""Delete this file from the bucket."""
try:
await self._bucket._storage_stub.delete( # type: ignore pylint: disable=protected-access
await self._bucket._storage_stub.delete(
storage_delete_request=StorageDeleteRequest(bucket_name=self._bucket.name, key=self.key)
)
except GRPCError as grpc_err:
Expand All @@ -150,27 +244,33 @@ async def upload_url(self, expiry: Optional[Union[timedelta, int]] = None):
"""
Get a temporary writable URL to this file.
Parameters:
expiry (timedelta or int, optional): The expiry time for the signed URL.
If an integer is provided, it is treated as seconds. Default is 600 seconds.
Parameters
----------
expiry : int, timedelta, optional
The expiry time for the signed URL.
If an integer is provided, it is treated as seconds. Default is 600 seconds.
Returns:
Returns
-------
str: The signed URL.
"""
return await self._sign_url(mode=FileMode.WRITE, expiry=expiry)

async def download_url(self, expiry: Optional[Union[timedelta, int]] = None):
"""
Get a temporary readable URL to this file.
Parameters:
expiry (timedelta or int, optional): The expiry time for the signed URL.
If an integer is provided, it is treated as seconds. Default is 600 seconds.
Parameters
----------
expiry : int, timedelta, optional
The expiry time for the signed URL.
If an integer is provided, it is treated as seconds. Default is 600 seconds.
Returns:
Returns
-------
str: The signed URL.
"""
return await self._sign_url(mode=FileMode.READ, expiry=expiry)

Expand All @@ -182,7 +282,7 @@ async def _sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[Union
expiry = timedelta(seconds=expiry)

try:
response = await self._bucket._storage_stub.pre_sign_url( # type: ignore pylint: disable=protected-access
response = await self._bucket._storage_stub.pre_sign_url(
storage_pre_sign_url_request=StoragePreSignUrlRequest(
bucket_name=self._bucket.name, key=self.key, operation=mode.to_request_operation(), expiry=expiry
)
Expand Down Expand Up @@ -257,7 +357,7 @@ def _perms_to_actions(self, *args: BucketPermission) -> List[Action]:
return [action for perm in args for action in permission_actions_map[perm]]

def _to_resource_id(self) -> ResourceIdentifier:
return ResourceIdentifier(name=self.name, type=ResourceType.Bucket) # type:ignore
return ResourceIdentifier(name=self.name, type=ResourceType.Bucket)

def allow(
self,
Expand Down Expand Up @@ -316,6 +416,7 @@ def __init__(
key_prefix_filter=notification_prefix_filter,
)

# noinspection PyProtectedMember
Nitric._register_worker(self)

async def _listener_request_iterator(self):
Expand Down Expand Up @@ -359,9 +460,12 @@ async def start(self) -> None:
print(f"Stream terminated: {e.message}")
except grpclib.exceptions.StreamTerminatedError:
print("Stream from membrane closed.")
except KeyboardInterrupt:
print("Keyboard interrupt")
finally:
print("Closing client stream")
channel.close()
print("Listener stopped")


def bucket(name: str) -> Bucket:
Expand All @@ -370,4 +474,4 @@ def bucket(name: str) -> Bucket:
If a bucket has already been registered with the same name, the original reference will be reused.
"""
return Nitric._create_resource(Bucket, name) # type: ignore pylint: disable=protected-access
return Nitric._create_resource(Bucket, name)
2 changes: 1 addition & 1 deletion nitric/resources/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def __init__(self, name: str):
super().__init__(name)

def _to_resource_id(self) -> ResourceIdentifier:
return ResourceIdentifier(name=self.name, type=ResourceType.Queue) # type:ignore
return ResourceIdentifier(name=self.name, type=ResourceType.Queue)

def _perms_to_actions(self, *args: QueuePermission) -> List[Action]:
permission_actions_map: dict[QueuePermission, List[Action]] = {
Expand Down
Loading

0 comments on commit b8fa116

Please sign in to comment.