Skip to content

Commit

Permalink
[EGv2] Binary mode (#32922)
Browse files Browse the repository at this point in the history
* [EGv2] Build Release (#30325)

* move old sdk under legacy

* gen typespec code

* naming changes from archboard

* samples

* update patch naming

* update imports with new gen

* update samples

* update client naming on aio

* update receive op

* update async to close client

* update receive()

* update gen code

* moving around samples

* updating samples

* update samples

* update patch and samples

* patch internalmodels

* spacing

* updating model patch

* update patch models

* add both models back

* update docstring

* update docs

* updating patch for receive

* old EG models

* add reject samples

* patch

* update format

* update patch

* eventgrid_client exceptions

* update test imports

* update total sample

* receive patch fix

* add in more tests

* update test file

* remove locktoken model

* remove LockToken in patch

* remove event delivery delay

* eg client exceptions

* .8.5 generation, and deliveryCount

* rename sample

* update version for beta

* changelog

* updating for gen

* regen

* generate via commit

* publish result

* fix docstring

* publish docstring

* return type

* publish result

* return publish result -- is none

* format

* update Publish result model

* deliverycount patch

* update from main

* add copyright

* added to readme

* remove from readme

* force publish_result response

* update patch tp unindent

* cspell

* update mypy.ini

* import order

* mark livetest

* update operations init

* rename async

* mypy

* ignore mypy

* pylint

* pylint

* ignore pylint for now to avoid gen code errors

* ignore samples until ARM setup

* update patches

* remove publish result

* remove PublishResult

* remove publishresult

* comma

Co-authored-by: swathipil <[email protected]>

* update publishResult

* change to .value

* gen code " to '

* remove comment

* ran black

* update changelog

* update sample readme

* gen code without query name

* gen code

* update tsp commit

* remove publishresult

* readme disclaimer

* update changelog

---------

Co-authored-by: swathipil <[email protected]>

* Beta LiveTests (#30728)

* add bicep file for tests

* update output

* update test

* secret sanitization

* refactor failing test

* update conftest

* update assets and sanitizers

* update preparer loc

* update conftest

* conftest

* update conftest

* remove variables for now

* update assets

* update tests

* try to update regex

* update recordings

* update conftest

* update preparer

* update test

* update exception test

* update tests

* update asset

* update conftest

* pr comments

* default needs to be eastus

* import

* [EGv2] Build Release (#30325)

* move old sdk under legacy

* gen typespec code

* naming changes from archboard

* samples

* update patch naming

* update imports with new gen

* update samples

* update client naming on aio

* update receive op

* update async to close client

* update receive()

* update gen code

* moving around samples

* updating samples

* update samples

* update patch and samples

* patch internalmodels

* spacing

* updating model patch

* update patch models

* add both models back

* update docstring

* update docs

* updating patch for receive

* old EG models

* add reject samples

* patch

* update format

* update patch

* eventgrid_client exceptions

* update test imports

* update total sample

* receive patch fix

* add in more tests

* update test file

* remove locktoken model

* remove LockToken in patch

* remove event delivery delay

* eg client exceptions

* .8.5 generation, and deliveryCount

* rename sample

* update version for beta

* changelog

* updating for gen

* regen

* generate via commit

* publish result

* fix docstring

* publish docstring

* return type

* publish result

* return publish result -- is none

* format

* update Publish result model

* deliverycount patch

* update from main

* add copyright

* added to readme

* remove from readme

* force publish_result response

* update patch tp unindent

* cspell

* update mypy.ini

* import order

* mark livetest

* update operations init

* rename async

* mypy

* ignore mypy

* pylint

* pylint

* ignore pylint for now to avoid gen code errors

* ignore samples until ARM setup

* update patches

* remove publish result

* remove PublishResult

* remove publishresult

* comma

Co-authored-by: swathipil <[email protected]>

* update publishResult

* change to .value

* gen code " to '

* remove comment

* ran black

* update changelog

* update sample readme

* gen code without query name

* gen code

* update tsp commit

* remove publishresult

* readme disclaimer

* update changelog

---------

Co-authored-by: swathipil <[email protected]>

* fix merge

* dont go to generated before binary

* update patch

* update patches

* eventgrid client patch

* changes

* add

* update test

* update tyoe checking

* pass through binary_mode for now --

* update patch aio

* add async func

* update

* sys

* update kwargs

* add Todo and start adding more tests

* update

* differentiate between binary and not

* update binary

* no base64 in binary mode

* binary

* try JSONEncoder on everything if not str/bytes

* update test

* update test

* update changes

* whitespace

* space

* remove commented

* str serialize extensions?

* xml test

* encode extensions as object

* update test

* update extension serialization for  deserialize

* move flag to operation level

* extra comma

* dont raise httpresponse

* update patch

* accept dict cloud events

* spacing

* remove content_type check

* add live test

* remove live test mark

* update

* use env vars

* update test

* only run live test

* comment

* typo

* error incorrect

* start comments

* update test

* add sample

* update tests

* update docstrings to add clarity

* update err message

* remove generated cloud event

* update sample

* update

* update samples to include dict

* update patch

* spacing

* add comments

* formatting

* update doc

* update tests

* update tests

* tests

* skip tests for now

* typo

* add dict binary mode

* update docstring

* update patch to allow throw error

* first pass at comments

* update patch eror

* nit

---------

Co-authored-by: swathipil <[email protected]>
  • Loading branch information
l0lawrence and swathipil committed Apr 22, 2024
1 parent efdc1b7 commit 8c9da89
Show file tree
Hide file tree
Showing 11 changed files with 826 additions and 59 deletions.
342 changes: 306 additions & 36 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py

Large diffs are not rendered by default.

202 changes: 190 additions & 12 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_operations/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,36 @@
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List, overload, Union, Any, Optional
from typing import List, overload, Union, Any, Optional, Callable, Dict, TypeVar
import sys
from azure.core.messaging import CloudEvent
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.pipeline import PipelineResponse
from azure.core.rest import HttpRequest, AsyncHttpResponse
from azure.core.utils import case_insensitive_dict
from ...models._patch import ReceiveResult, ReceiveDetails
from ..._operations._patch import _cloud_event_to_generated
from ..._operations._patch import _to_http_request
from ._operations import EventGridClientOperationsMixin as OperationsMixin

from ... import models as _models
from ..._model_base import _deserialize
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

class EventGridClientOperationsMixin(OperationsMixin):

@overload
async def publish_cloud_events(
self,
topic_name: str,
body: List[CloudEvent],
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents-batch+json; charset=utf-8",
**kwargs: Any
) -> None:
Expand All @@ -33,6 +48,10 @@ async def publish_cloud_events(
:type topic_name: str
:param body: Array of Cloud Events being published. Required.
:type body: list[~azure.core.messaging.CloudEvent]
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If 'datacontenttype` is not specified the default content type is `application/cloudevents-batch+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents-batch+json;
charset=utf-8".
:paramtype content_type: str
Expand All @@ -49,6 +68,7 @@ async def publish_cloud_events(
topic_name: str,
body: CloudEvent,
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents+json; charset=utf-8",
**kwargs: Any
) -> None:
Expand All @@ -62,6 +82,78 @@ async def publish_cloud_events(
:type topic_name: str
:param body: Single Cloud Event being published. Required.
:type body: ~azure.core.messaging.CloudEvent
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
:keyword bool stream: Whether to stream the response of this operation. Defaults to False. You
will have to context manage the returned stream.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""

@overload
async def publish_cloud_events(
self,
topic_name: str,
body: Dict[str, Any],
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents+json; charset=utf-8",
**kwargs: Any
) -> None:
"""Publish Single Cloud Event to namespace topic. In case of success, the server responds with an
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
various error codes. For example, 401: which indicates authorization failure, 403: which
indicates quota exceeded or message is too large, 410: which indicates that specific topic is
not found, 400: for bad request, and 500: for internal server error.
:param topic_name: Topic Name. Required.
:type topic_name: str
:param body: Single Cloud Event being published. Required.
:type body: dict[str, Any]
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
:keyword bool stream: Whether to stream the response of this operation. Defaults to False. You
will have to context manage the returned stream.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""

@overload
async def publish_cloud_events(
self,
topic_name: str,
body: List[Dict[str, Any]],
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents-batch+json; charset=utf-8",
**kwargs: Any
) -> None:
"""Publish Single Cloud Event to namespace topic. In case of success, the server responds with an
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
various error codes. For example, 401: which indicates authorization failure, 403: which
indicates quota exceeded or message is too large, 410: which indicates that specific topic is
not found, 400: for bad request, and 500: for internal server error.
:param topic_name: Topic Name. Required.
:type topic_name: str
:param body: Batch of Cloud Events being published. Required.
:type body: list[dict[str, Any]]
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If 'datacontenttype` is not specified, the default content type is `application/cloudevents-batch+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
Expand All @@ -74,7 +166,12 @@ async def publish_cloud_events(

@distributed_trace_async
async def publish_cloud_events(
self, topic_name: str, body: Union[List[CloudEvent], CloudEvent], **kwargs
self,
topic_name: str,
body: Union[List[CloudEvent], CloudEvent, List[Dict[str, Any]], Dict[str, Any]],
*,
binary_mode: bool = False,
**kwargs
) -> None:
"""Publish Batch Cloud Event or Events to namespace topic. In case of success, the server responds with an
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
Expand All @@ -85,7 +182,11 @@ async def publish_cloud_events(
:param topic_name: Topic Name. Required.
:type topic_name: str
:param body: Cloud Event or Array of Cloud Events being published. Required.
:type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent]
:type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] or dict[str, any] or list[dict[str, any]]
:keyword bool binary_mode: Whether to publish the events in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If not specified, the default content type is "application/cloudevents+json; charset=utf-8".
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
Expand All @@ -95,16 +196,30 @@ async def publish_cloud_events(
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""

# Check that the body is a CloudEvent or list of CloudEvents even if dict
if isinstance(body, dict) or (isinstance(body, list) and isinstance(body[0], dict)):
try:
if isinstance(body, list):
body = [CloudEvent.from_dict(event) for event in body]
else:
body = CloudEvent.from_dict(body)
except AttributeError:
raise TypeError("Incorrect type for body. Expected CloudEvent,"
" list of CloudEvents, dict, or list of dicts."
" If dict passed, must follow the CloudEvent format.")


if isinstance(body, CloudEvent):
kwargs["content_type"] = "application/cloudevents+json; charset=utf-8"
internal_body = _cloud_event_to_generated(body)
await self._publish_cloud_event(topic_name, internal_body, **kwargs)
else:
await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs)
elif isinstance(body, list):
kwargs["content_type"] = "application/cloudevents-batch+json; charset=utf-8"
internal_body_list = []
for item in body:
internal_body_list.append(_cloud_event_to_generated(item))
await self._publish_cloud_events(topic_name, internal_body_list, **kwargs)
await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs)
else:
raise TypeError("Incorrect type for body. Expected CloudEvent,"
" list of CloudEvents, dict, or list of dicts."
" If dict passed, must follow the CloudEvent format.")

@distributed_trace_async
async def receive_cloud_events(
Expand Down Expand Up @@ -158,6 +273,69 @@ async def receive_cloud_events(
receive_result_deserialized = ReceiveResult(value=detail_items)
return receive_result_deserialized

async def _publish(self, topic_name: str, event: Any, api_version, binary_mode, **kwargs: Any) -> None:

error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})

_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}

cls: ClsType[_models._models.PublishResult] = kwargs.pop( # pylint: disable=protected-access
'cls', None
)

content_type: str = kwargs.pop('content_type', _headers.pop('content-type', "application/cloudevents+json; charset=utf-8"))

# Given that we know the cloud event is binary mode, we can convert it to a HTTP request
http_request = _to_http_request(
topic_name=topic_name,
api_version=api_version,
headers=_headers,
params=_params,
content_type=content_type,
event=event,
binary_mode=binary_mode,
**kwargs
)

_stream = kwargs.pop("stream", False)

path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)

_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
http_request,
stream=_stream,
**kwargs
)

response = pipeline_response.http_response

if response.status_code not in [200]:
if _stream:
await response.read() # Load the body in memory and close the socket
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)

if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.PublishResult, # pylint: disable=protected-access
response.json()
)

if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore

return deserialized # type: ignore


__all__: List[str] = [
"EventGridClientOperationsMixin"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import os
import asyncio
import json
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.aio import EventGridClient
from azure.eventgrid.models import *
from azure.core.messaging import CloudEvent
from azure.core.exceptions import HttpResponseError


EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]

# Create a client
client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY))


async def run():
async with client:
# Publish a CloudEvent
try:
# Publish CloudEvent in binary mode with str encoded as bytes
cloud_event_dict = {"data":b"HI", "source":"https://example.com", "type":"example", "datacontenttype":"text/plain"}
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict)

# Publish CloudEvent in binary mode with json encoded as bytes
cloud_event = CloudEvent(data=json.dumps({"hello":"data"}).encode("utf-8"), source="https://example.com", type="example", datacontenttype="application/json")
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event, binary_mode=True)

receive_result = await client.receive_cloud_events(
topic_name=TOPIC_NAME,
event_subscription_name=EVENT_SUBSCRIPTION_NAME,
max_events=10,
max_wait_time=10,
)
for details in receive_result.value:
cloud_event_received = details.event
print("CloudEvent: ", cloud_event_received)
print("Data: ", cloud_event_received.data)
except HttpResponseError:
raise

if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(run())
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,30 @@

EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
TOPIC_NAME: str = os.environ["TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENT_SUBSCRIPTION_NAME"]
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]

# Create a client
client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY))


async def run():
async with client:

# Publish a CloudEvent as dict
try:
cloud_event_dict = {"data": "hello", "source": "https://example.com", "type": "example"}
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict)
except HttpResponseError:
raise

# Publish a list of CloudEvents as dict
try:
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=[cloud_event_dict, cloud_event_dict])
except HttpResponseError:
raise


# Publish a CloudEvent
try:
cloud_event = CloudEvent(
Expand Down
Loading

0 comments on commit 8c9da89

Please sign in to comment.