Skip to content

Commit

Permalink
feat: fix subscriptions and add bucket notifications (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
HomelessDinosaur authored May 17, 2023
2 parents c31870d + e35bd28 commit deb42f4
Show file tree
Hide file tree
Showing 56 changed files with 2,272 additions and 595 deletions.
2 changes: 1 addition & 1 deletion docs/nitric/api/documents.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ <h1 class="title">Module <code>nitric.api.documents</code></h1>
from grpclib import GRPCError

from nitric.api.const import MAX_SUB_COLLECTION_DEPTH
from nitric.api.exception import exception_from_grpc_error
from nitric.exception import exception_from_grpc_error
from nitric.proto.nitric.document.v1 import (
DocumentServiceStub,
Collection as CollectionMessage,
Expand Down
2 changes: 1 addition & 1 deletion docs/nitric/api/events.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ <h1 class="title">Module <code>nitric.api.events</code></h1>

from grpclib import GRPCError

from nitric.api.exception import exception_from_grpc_error
from nitric.exception import exception_from_grpc_error
from nitric.utils import new_default_channel, _struct_from_dict
from nitric.proto.nitric.event.v1 import (
EventServiceStub,
Expand Down
9 changes: 2 additions & 7 deletions docs/nitric/api/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ <h2 class="section-title" id="header-submodules">Sub-modules</h2>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="nitric.api.exception" href="exception.html">nitric.api.exception</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="nitric.api.queues" href="queues.html">nitric.api.queues</a></code></dt>
<dd>
<div class="desc"></div>
Expand Down Expand Up @@ -421,7 +417,7 @@ <h3>Methods</h3>

def bucket(self, name: str):
&#34;&#34;&#34;Return a reference to a bucket from the connected storage service.&#34;&#34;&#34;
return BucketRef(_storage=self, name=name)</code></pre>
return BucketRef(_storage=self, name=name, _server=None)</code></pre>
</details>
<h3>Methods</h3>
<dl>
Expand All @@ -436,7 +432,7 @@ <h3>Methods</h3>
</summary>
<pre><code class="python">def bucket(self, name: str):
&#34;&#34;&#34;Return a reference to a bucket from the connected storage service.&#34;&#34;&#34;
return BucketRef(_storage=self, name=name)</code></pre>
return BucketRef(_storage=self, name=name, _server=None)</code></pre>
</details>
</dd>
</dl>
Expand Down Expand Up @@ -586,7 +582,6 @@ <h1>Index</h1>
<li><code><a title="nitric.api.const" href="const.html">nitric.api.const</a></code></li>
<li><code><a title="nitric.api.documents" href="documents.html">nitric.api.documents</a></code></li>
<li><code><a title="nitric.api.events" href="events.html">nitric.api.events</a></code></li>
<li><code><a title="nitric.api.exception" href="exception.html">nitric.api.exception</a></code></li>
<li><code><a title="nitric.api.queues" href="queues.html">nitric.api.queues</a></code></li>
<li><code><a title="nitric.api.secrets" href="secrets.html">nitric.api.secrets</a></code></li>
<li><code><a title="nitric.api.storage" href="storage.html">nitric.api.storage</a></code></li>
Expand Down
14 changes: 10 additions & 4 deletions docs/nitric/api/queues.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ <h1 class="title">Module <code>nitric.api.queues</code></h1>

from grpclib import GRPCError

from nitric.api.exception import FailedPreconditionException, exception_from_grpc_error, InvalidArgumentException
from nitric.exception import FailedPreconditionException, exception_from_grpc_error, InvalidArgumentException
from nitric.utils import new_default_channel, _struct_from_dict, _dict_from_struct
from nitric.proto.nitric.queue.v1 import (
QueueServiceStub,
Expand Down Expand Up @@ -183,7 +183,9 @@ <h1 class="title">Module <code>nitric.api.queues</code></h1>
task = Task()

if isinstance(task, dict):
# TODO: handle tasks that are just a payload
# Handle if its just a payload
if task.get(&#34;payload&#34;) is None:
task = {&#34;payload&#34;: task}
task = Task(**task)

try:
Expand Down Expand Up @@ -332,7 +334,9 @@ <h3>Class variables</h3>
task = Task()

if isinstance(task, dict):
# TODO: handle tasks that are just a payload
# Handle if its just a payload
if task.get(&#34;payload&#34;) is None:
task = {&#34;payload&#34;: task}
task = Task(**task)

try:
Expand Down Expand Up @@ -471,7 +475,9 @@ <h3>Methods</h3>
task = Task()

if isinstance(task, dict):
# TODO: handle tasks that are just a payload
# Handle if its just a payload
if task.get(&#34;payload&#34;) is None:
task = {&#34;payload&#34;: task}
task = Task(**task)

try:
Expand Down
2 changes: 1 addition & 1 deletion docs/nitric/api/secrets.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ <h1 class="title">Module <code>nitric.api.secrets</code></h1>

from grpclib import GRPCError

from nitric.api.exception import exception_from_grpc_error
from nitric.exception import exception_from_grpc_error
from nitric.utils import new_default_channel
from nitric.proto.nitric.secret.v1 import (
SecretServiceStub,
Expand Down
96 changes: 82 additions & 14 deletions docs/nitric/api/storage.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ <h1 class="title">Module <code>nitric.api.storage</code></h1>
# limitations under the License.
#
from dataclasses import dataclass
from typing import Union

from grpclib import GRPCError
from nitric.api.exception import exception_from_grpc_error, InvalidArgumentException
from nitric.exception import exception_from_grpc_error, InvalidArgumentException
from nitric.application import Nitric
from nitric.faas import FunctionServer, FileNotificationWorkerOptions, FileNotificationMiddleware
from nitric.utils import new_default_channel
from nitric.proto.nitric.storage.v1 import (
StorageServiceStub,
Expand All @@ -59,6 +62,7 @@ <h1 class="title">Module <code>nitric.api.storage</code></h1>
StorageListFilesRequest,
)
from enum import Enum
from warnings import warn


class Storage(object):
Expand All @@ -80,15 +84,16 @@ <h1 class="title">Module <code>nitric.api.storage</code></h1>

def bucket(self, name: str):
&#34;&#34;&#34;Return a reference to a bucket from the connected storage service.&#34;&#34;&#34;
return BucketRef(_storage=self, name=name)
return BucketRef(_storage=self, name=name, _server=None)


@dataclass(frozen=True, order=True)
@dataclass(order=True)
class BucketRef(object):
&#34;&#34;&#34;A reference to a bucket in a storage service, used to the perform operations on that bucket.&#34;&#34;&#34;

_storage: Storage
name: str
_server: Union[FunctionServer, None]

def file(self, key: str):
&#34;&#34;&#34;Return a reference to a file in this bucket.&#34;&#34;&#34;
Expand All @@ -101,6 +106,22 @@ <h1 class="title">Module <code>nitric.api.storage</code></h1>
)
return [self.file(f.key) for f in resp.files]

def on(self, notification_type: str, notification_prefix_filter: str):
&#34;&#34;&#34;Create and return a bucket notification decorator for this bucket.&#34;&#34;&#34;

def decorator(func: FileNotificationMiddleware):
self._server = FunctionServer(
FileNotificationWorkerOptions(
bucket=self,
notification_type=notification_type,
notification_prefix_filter=notification_prefix_filter,
)
)
self._server.bucket_notification(func)
Nitric._register_worker(self._server)

return decorator


class FileMode(Enum):
&#34;&#34;&#34;Definition of available operation modes for file signed URLs.&#34;&#34;&#34;
Expand Down Expand Up @@ -160,14 +181,15 @@ <h1 class="title">Module <code>nitric.api.storage</code></h1>

async def upload_url(self, expiry: int = 600):
&#34;&#34;&#34;Get a temporary writable URL to this file.&#34;&#34;&#34;
await self.sign_url(mode=FileMode.WRITE, expiry=expiry)
return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)

async def download_url(self, expiry: int = 600):
&#34;&#34;&#34;Get a temporary readable URL to this file.&#34;&#34;&#34;
await self.sign_url(mode=FileMode.READ, expiry=expiry)
return await self.sign_url(mode=FileMode.READ, expiry=expiry)

async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600):
&#34;&#34;&#34;Generate a signed URL for reading or writing to a file.&#34;&#34;&#34;
warn(&#34;File.sign_url() is deprecated, use upload_url() or download_url() instead&#34;, DeprecationWarning)
try:
response = await self._storage._storage_stub.pre_sign_url(
storage_pre_sign_url_request=StoragePreSignUrlRequest(
Expand All @@ -190,20 +212,21 @@ <h2 class="section-title" id="header-classes">Classes</h2>
<dl>
<dt id="nitric.api.storage.BucketRef"><code class="flex name class">
<span>class <span class="ident">BucketRef</span></span>
<span>(</span><span>_storage: <a title="nitric.api.storage.Storage" href="#nitric.api.storage.Storage">Storage</a>, name: str)</span>
<span>(</span><span>_storage: <a title="nitric.api.storage.Storage" href="#nitric.api.storage.Storage">Storage</a>, name: str, _server: Optional[<a title="nitric.faas.FunctionServer" href="../faas.html#nitric.faas.FunctionServer">FunctionServer</a>])</span>
</code></dt>
<dd>
<div class="desc"><p>A reference to a bucket in a storage service, used to the perform operations on that bucket.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">@dataclass(frozen=True, order=True)
<pre><code class="python">@dataclass(order=True)
class BucketRef(object):
&#34;&#34;&#34;A reference to a bucket in a storage service, used to the perform operations on that bucket.&#34;&#34;&#34;

_storage: Storage
name: str
_server: Union[FunctionServer, None]

def file(self, key: str):
&#34;&#34;&#34;Return a reference to a file in this bucket.&#34;&#34;&#34;
Expand All @@ -214,7 +237,23 @@ <h2 class="section-title" id="header-classes">Classes</h2>
resp = await self._storage._storage_stub.list_files(
storage_list_files_request=StorageListFilesRequest(bucket_name=self.name)
)
return [self.file(f.key) for f in resp.files]</code></pre>
return [self.file(f.key) for f in resp.files]

def on(self, notification_type: str, notification_prefix_filter: str):
&#34;&#34;&#34;Create and return a bucket notification decorator for this bucket.&#34;&#34;&#34;

def decorator(func: FileNotificationMiddleware):
self._server = FunctionServer(
FileNotificationWorkerOptions(
bucket=self,
notification_type=notification_type,
notification_prefix_filter=notification_prefix_filter,
)
)
self._server.bucket_notification(func)
Nitric._register_worker(self._server)

return decorator</code></pre>
</details>
<h3>Class variables</h3>
<dl>
Expand Down Expand Up @@ -256,6 +295,32 @@ <h3>Methods</h3>
return [self.file(f.key) for f in resp.files]</code></pre>
</details>
</dd>
<dt id="nitric.api.storage.BucketRef.on"><code class="name flex">
<span>def <span class="ident">on</span></span>(<span>self, notification_type: str, notification_prefix_filter: str)</span>
</code></dt>
<dd>
<div class="desc"><p>Create and return a bucket notification decorator for this bucket.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def on(self, notification_type: str, notification_prefix_filter: str):
&#34;&#34;&#34;Create and return a bucket notification decorator for this bucket.&#34;&#34;&#34;

def decorator(func: FileNotificationMiddleware):
self._server = FunctionServer(
FileNotificationWorkerOptions(
bucket=self,
notification_type=notification_type,
notification_prefix_filter=notification_prefix_filter,
)
)
self._server.bucket_notification(func)
Nitric._register_worker(self._server)

return decorator</code></pre>
</details>
</dd>
</dl>
</dd>
<dt id="nitric.api.storage.File"><code class="flex name class">
Expand Down Expand Up @@ -310,14 +375,15 @@ <h3>Methods</h3>

async def upload_url(self, expiry: int = 600):
&#34;&#34;&#34;Get a temporary writable URL to this file.&#34;&#34;&#34;
await self.sign_url(mode=FileMode.WRITE, expiry=expiry)
return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)

async def download_url(self, expiry: int = 600):
&#34;&#34;&#34;Get a temporary readable URL to this file.&#34;&#34;&#34;
await self.sign_url(mode=FileMode.READ, expiry=expiry)
return await self.sign_url(mode=FileMode.READ, expiry=expiry)

async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600):
&#34;&#34;&#34;Generate a signed URL for reading or writing to a file.&#34;&#34;&#34;
warn(&#34;File.sign_url() is deprecated, use upload_url() or download_url() instead&#34;, DeprecationWarning)
try:
response = await self._storage._storage_stub.pre_sign_url(
storage_pre_sign_url_request=StoragePreSignUrlRequest(
Expand Down Expand Up @@ -367,7 +433,7 @@ <h3>Methods</h3>
</summary>
<pre><code class="python">async def download_url(self, expiry: int = 600):
&#34;&#34;&#34;Get a temporary readable URL to this file.&#34;&#34;&#34;
await self.sign_url(mode=FileMode.READ, expiry=expiry)</code></pre>
return await self.sign_url(mode=FileMode.READ, expiry=expiry)</code></pre>
</details>
</dd>
<dt id="nitric.api.storage.File.read"><code class="name flex">
Expand Down Expand Up @@ -401,6 +467,7 @@ <h3>Methods</h3>
</summary>
<pre><code class="python">async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600):
&#34;&#34;&#34;Generate a signed URL for reading or writing to a file.&#34;&#34;&#34;
warn(&#34;File.sign_url() is deprecated, use upload_url() or download_url() instead&#34;, DeprecationWarning)
try:
response = await self._storage._storage_stub.pre_sign_url(
storage_pre_sign_url_request=StoragePreSignUrlRequest(
Expand All @@ -423,7 +490,7 @@ <h3>Methods</h3>
</summary>
<pre><code class="python">async def upload_url(self, expiry: int = 600):
&#34;&#34;&#34;Get a temporary writable URL to this file.&#34;&#34;&#34;
await self.sign_url(mode=FileMode.WRITE, expiry=expiry)</code></pre>
return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)</code></pre>
</details>
</dd>
<dt id="nitric.api.storage.File.write"><code class="name flex">
Expand Down Expand Up @@ -545,7 +612,7 @@ <h3>Methods</h3>

def bucket(self, name: str):
&#34;&#34;&#34;Return a reference to a bucket from the connected storage service.&#34;&#34;&#34;
return BucketRef(_storage=self, name=name)</code></pre>
return BucketRef(_storage=self, name=name, _server=None)</code></pre>
</details>
<h3>Methods</h3>
<dl>
Expand All @@ -560,7 +627,7 @@ <h3>Methods</h3>
</summary>
<pre><code class="python">def bucket(self, name: str):
&#34;&#34;&#34;Return a reference to a bucket from the connected storage service.&#34;&#34;&#34;
return BucketRef(_storage=self, name=name)</code></pre>
return BucketRef(_storage=self, name=name, _server=None)</code></pre>
</details>
</dd>
</dl>
Expand All @@ -587,6 +654,7 @@ <h4><code><a title="nitric.api.storage.BucketRef" href="#nitric.api.storage.Buck
<li><code><a title="nitric.api.storage.BucketRef.file" href="#nitric.api.storage.BucketRef.file">file</a></code></li>
<li><code><a title="nitric.api.storage.BucketRef.files" href="#nitric.api.storage.BucketRef.files">files</a></code></li>
<li><code><a title="nitric.api.storage.BucketRef.name" href="#nitric.api.storage.BucketRef.name">name</a></code></li>
<li><code><a title="nitric.api.storage.BucketRef.on" href="#nitric.api.storage.BucketRef.on">on</a></code></li>
</ul>
</li>
<li>
Expand Down
17 changes: 10 additions & 7 deletions docs/nitric/application.html
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ <h1 class="title">Module <code>nitric.application</code></h1>
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient

from nitric.faas import FunctionServer
from nitric.api.exception import NitricUnavailableException
from nitric.exception import NitricUnavailableException

# from nitric.resources.base import BaseResource
from typing import Dict, List, Type, Any, TypeVar
Expand Down Expand Up @@ -95,9 +95,9 @@ <h1 class="title">Module <code>nitric.application</code></h1>
)

@classmethod
def _create_tracer(cls) -&gt; TracerProvider:
local_run = &#34;OTELCOL_BIN&#34; not in environ
samplePercent = int(getenv(&#34;NITRIC_TRACE_SAMPLE_PERCENT&#34;, &#34;100&#34;)) / 100.0
def _create_tracer(cls, local: bool = True, sampler: int = 100) -&gt; TracerProvider:
local_run = local or &#34;OTELCOL_BIN&#34; not in environ
samplePercent = int(getenv(&#34;NITRIC_TRACE_SAMPLE_PERCENT&#34;, sampler)) / 100.0

# If its a local run use a console exporter, otherwise export using OTEL Protocol
exporter = OTLPSpanExporter(endpoint=&#34;http://localhost:4317&#34;, insecure=True)
Expand All @@ -123,6 +123,7 @@ <h1 class="title">Module <code>nitric.application</code></h1>
This will execute in an existing event loop if there is one, otherwise it will attempt to create its own.
&#34;&#34;&#34;
provider = cls._create_tracer()
print(cls._workers)
try:
try:
loop = asyncio.get_running_loop()
Expand Down Expand Up @@ -191,9 +192,9 @@ <h2 class="section-title" id="header-classes">Classes</h2>
)

@classmethod
def _create_tracer(cls) -&gt; TracerProvider:
local_run = &#34;OTELCOL_BIN&#34; not in environ
samplePercent = int(getenv(&#34;NITRIC_TRACE_SAMPLE_PERCENT&#34;, &#34;100&#34;)) / 100.0
def _create_tracer(cls, local: bool = True, sampler: int = 100) -&gt; TracerProvider:
local_run = local or &#34;OTELCOL_BIN&#34; not in environ
samplePercent = int(getenv(&#34;NITRIC_TRACE_SAMPLE_PERCENT&#34;, sampler)) / 100.0

# If its a local run use a console exporter, otherwise export using OTEL Protocol
exporter = OTLPSpanExporter(endpoint=&#34;http://localhost:4317&#34;, insecure=True)
Expand All @@ -219,6 +220,7 @@ <h2 class="section-title" id="header-classes">Classes</h2>
This will execute in an existing event loop if there is one, otherwise it will attempt to create its own.
&#34;&#34;&#34;
provider = cls._create_tracer()
print(cls._workers)
try:
try:
loop = asyncio.get_running_loop()
Expand Down Expand Up @@ -256,6 +258,7 @@ <h3>Static methods</h3>
This will execute in an existing event loop if there is one, otherwise it will attempt to create its own.
&#34;&#34;&#34;
provider = cls._create_tracer()
print(cls._workers)
try:
try:
loop = asyncio.get_running_loop()
Expand Down
Loading

0 comments on commit deb42f4

Please sign in to comment.