Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EGv2] Binary mode #32922

Merged
merged 78 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 77 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
ec73729
[EGv2] Build Release (#30325)
l0lawrence May 19, 2023
c7188d3
Beta LiveTests (#30728)
l0lawrence Aug 25, 2023
7e5003c
[EGv2] Build Release (#30325)
l0lawrence May 19, 2023
951a9d8
fix merge
l0lawrence Oct 5, 2023
69d253b
dont go to generated before binary
l0lawrence Oct 5, 2023
000e3b3
update patch
l0lawrence Oct 12, 2023
97fb553
update patches
l0lawrence Oct 12, 2023
2e7bb32
eventgrid client patch
l0lawrence Oct 12, 2023
028484f
changes
l0lawrence Oct 12, 2023
503c9e8
add
l0lawrence Oct 12, 2023
92df675
update test
l0lawrence Oct 12, 2023
c2b9eed
update tyoe checking
l0lawrence Oct 13, 2023
31f0c02
pass through binary_mode for now --
l0lawrence Oct 13, 2023
67a7279
update patch aio
l0lawrence Oct 13, 2023
54b2c43
add async func
l0lawrence Oct 16, 2023
2bcb97f
update
l0lawrence Oct 16, 2023
7a650bb
sys
l0lawrence Oct 16, 2023
db034eb
update kwargs
l0lawrence Oct 16, 2023
e83ac70
add Todo and start adding more tests
l0lawrence Oct 17, 2023
0ec4a3a
update
l0lawrence Oct 17, 2023
8647028
differentiate between binary and not
l0lawrence Oct 17, 2023
2e3b639
update binary
l0lawrence Oct 17, 2023
3a1b72a
no base64 in binary mode
l0lawrence Oct 17, 2023
110f3f1
binary
l0lawrence Oct 17, 2023
015d1f4
try JSONEncoder on everything if not str/bytes
l0lawrence Oct 18, 2023
13f0ae3
update test
l0lawrence Oct 18, 2023
79dba1b
update test
l0lawrence Oct 20, 2023
47148f5
update changes
l0lawrence Oct 23, 2023
0a1307d
whitespace
l0lawrence Oct 23, 2023
9ca6f1d
space
l0lawrence Oct 23, 2023
52845d3
remove commented
l0lawrence Oct 23, 2023
fb53f33
str serialize extensions?
l0lawrence Oct 23, 2023
74c434d
xml test
l0lawrence Oct 23, 2023
cec8eaf
encode extensions as object
l0lawrence Oct 23, 2023
08eacb6
update test
l0lawrence Oct 23, 2023
fd002c1
update extension serialization for deserialize
l0lawrence Oct 23, 2023
dec4248
move flag to operation level
l0lawrence Oct 25, 2023
4eb1876
extra comma
l0lawrence Oct 25, 2023
60ff404
dont raise httpresponse
l0lawrence Oct 25, 2023
2b17321
update patch
l0lawrence Oct 25, 2023
6fe171d
accept dict cloud events
l0lawrence Oct 26, 2023
bb8ba88
spacing
l0lawrence Oct 26, 2023
633166e
remove content_type check
l0lawrence Oct 26, 2023
3b37f20
add live test
l0lawrence Oct 26, 2023
6b50b87
remove live test mark
l0lawrence Oct 27, 2023
bc997b2
update
l0lawrence Oct 27, 2023
efb5911
use env vars
l0lawrence Oct 27, 2023
4e046c8
update test
l0lawrence Oct 27, 2023
7db3968
only run live test
l0lawrence Oct 27, 2023
1d74cda
comment
l0lawrence Oct 30, 2023
0abd334
typo
l0lawrence Oct 31, 2023
9495362
error incorrect
l0lawrence Oct 31, 2023
58bd912
start comments
l0lawrence Nov 1, 2023
205758e
update test
l0lawrence Nov 1, 2023
67ac8d2
add sample
l0lawrence Nov 1, 2023
35ecda1
update tests
l0lawrence Nov 1, 2023
6611e67
update docstrings to add clarity
l0lawrence Nov 2, 2023
5016b76
update err message
l0lawrence Nov 2, 2023
1f07d6f
remove generated cloud event
l0lawrence Nov 2, 2023
dac9cf0
update sample
l0lawrence Nov 2, 2023
fc52a69
update
l0lawrence Nov 2, 2023
502c371
update samples to include dict
l0lawrence Nov 2, 2023
9a9c4b1
update patch
l0lawrence Nov 2, 2023
ba01c09
spacing
l0lawrence Nov 2, 2023
6bcbb77
add comments
l0lawrence Nov 2, 2023
464cd6b
formatting
l0lawrence Nov 2, 2023
8d0bbe6
update doc
l0lawrence Nov 2, 2023
ee013cc
update tests
l0lawrence Nov 2, 2023
5b0beac
update tests
l0lawrence Nov 2, 2023
3bf9dd2
tests
l0lawrence Nov 2, 2023
5c47708
skip tests for now
l0lawrence Nov 2, 2023
eeb1aaf
typo
l0lawrence Nov 3, 2023
df23b8b
add dict binary mode
l0lawrence Nov 3, 2023
b5b5236
update docstring
l0lawrence Nov 3, 2023
b21b5e6
update patch to allow throw error
l0lawrence Nov 3, 2023
11f9560
first pass at comments
l0lawrence Nov 3, 2023
89e889d
update patch eror
l0lawrence Nov 3, 2023
f3546d9
nit
l0lawrence Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 306 additions & 36 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

VERSION = "4.14.1"
VERSION = "4.15.1"
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,36 @@
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List, overload, Union, Any, Optional
from typing import List, overload, Union, Any, Optional, Callable, Dict, TypeVar
import sys
from azure.core.messaging import CloudEvent
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.pipeline import PipelineResponse
from azure.core.rest import HttpRequest, AsyncHttpResponse
from azure.core.utils import case_insensitive_dict
from ...models._patch import ReceiveResult, ReceiveDetails
from ..._operations._patch import _cloud_event_to_generated
from ..._operations._patch import _to_http_request
from ._operations import EventGridClientOperationsMixin as OperationsMixin

from ... import models as _models
from ..._model_base import _deserialize
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

class EventGridClientOperationsMixin(OperationsMixin):

@overload
async def publish_cloud_events(
self,
topic_name: str,
body: List[CloudEvent],
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents-batch+json; charset=utf-8",
**kwargs: Any
) -> None:
Expand All @@ -33,6 +48,10 @@ async def publish_cloud_events(
:type topic_name: str
:param body: Array of Cloud Events being published. Required.
:type body: list[~azure.core.messaging.CloudEvent]
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If 'datacontenttype` is not specified the default content type is `application/cloudevents-batch+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents-batch+json;
charset=utf-8".
:paramtype content_type: str
Expand All @@ -49,6 +68,7 @@ async def publish_cloud_events(
topic_name: str,
body: CloudEvent,
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents+json; charset=utf-8",
**kwargs: Any
) -> None:
Expand All @@ -62,6 +82,78 @@ async def publish_cloud_events(
:type topic_name: str
:param body: Single Cloud Event being published. Required.
:type body: ~azure.core.messaging.CloudEvent
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
:keyword bool stream: Whether to stream the response of this operation. Defaults to False. You
will have to context manage the returned stream.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""

@overload
async def publish_cloud_events(
self,
topic_name: str,
body: Dict[str, Any],
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents+json; charset=utf-8",
**kwargs: Any
) -> None:
"""Publish Single Cloud Event to namespace topic. In case of success, the server responds with an
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
various error codes. For example, 401: which indicates authorization failure, 403: which
indicates quota exceeded or message is too large, 410: which indicates that specific topic is
not found, 400: for bad request, and 500: for internal server error.

:param topic_name: Topic Name. Required.
:type topic_name: str
:param body: Single Cloud Event being published. Required.
:type body: dict[str, Any]
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
:keyword bool stream: Whether to stream the response of this operation. Defaults to False. You
will have to context manage the returned stream.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""

@overload
async def publish_cloud_events(
self,
topic_name: str,
body: List[Dict[str, Any]],
*,
binary_mode: bool = False,
content_type: str = "application/cloudevents-batch+json; charset=utf-8",
**kwargs: Any
) -> None:
"""Publish Single Cloud Event to namespace topic. In case of success, the server responds with an
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
various error codes. For example, 401: which indicates authorization failure, 403: which
indicates quota exceeded or message is too large, 410: which indicates that specific topic is
not found, 400: for bad request, and 500: for internal server error.

:param topic_name: Topic Name. Required.
:type topic_name: str
:param body: Batch of Cloud Events being published. Required.
:type body: list[dict[str, Any]]
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If 'datacontenttype` is not specified, the default content type is `application/cloudevents-batch+json; charset=utf-8`.
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
Expand All @@ -74,7 +166,12 @@ async def publish_cloud_events(

@distributed_trace_async
async def publish_cloud_events(
self, topic_name: str, body: Union[List[CloudEvent], CloudEvent], **kwargs
self,
topic_name: str,
body: Union[List[CloudEvent], CloudEvent, List[Dict[str, Any]], Dict[str, Any]],
*,
binary_mode: bool = False,
**kwargs
) -> None:
"""Publish Batch Cloud Event or Events to namespace topic. In case of success, the server responds with an
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
Expand All @@ -85,7 +182,11 @@ async def publish_cloud_events(
:param topic_name: Topic Name. Required.
:type topic_name: str
:param body: Cloud Event or Array of Cloud Events being published. Required.
:type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent]
:type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] or dict[str, any] or list[dict[str, any]]
:keyword bool binary_mode: Whether to publish the events in binary mode. Defaults to False.
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
If not specified, the default content type is "application/cloudevents+json; charset=utf-8".
Requires CloudEvent data to be passed in as bytes.
:keyword content_type: content type. Default value is "application/cloudevents+json;
charset=utf-8".
:paramtype content_type: str
Expand All @@ -95,16 +196,30 @@ async def publish_cloud_events(
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""

# Check that the body is a CloudEvent or list of CloudEvents even if dict
if isinstance(body, dict) or (isinstance(body, list) and isinstance(body[0], dict)):
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
try:
if isinstance(body, list):
body = [CloudEvent.from_dict(event) for event in body]
else:
body = CloudEvent.from_dict(body)
except AttributeError:
raise TypeError("Incorrect type for body. Expected CloudEvent,"
" list of CloudEvents, dict, or list of dicts."
" If dict passed, must follow the CloudEvent format.")


if isinstance(body, CloudEvent):
kwargs["content_type"] = "application/cloudevents+json; charset=utf-8"
internal_body = _cloud_event_to_generated(body)
await self._publish_cloud_event(topic_name, internal_body, **kwargs)
else:
await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs)
elif isinstance(body, list):
kwargs["content_type"] = "application/cloudevents-batch+json; charset=utf-8"
internal_body_list = []
for item in body:
internal_body_list.append(_cloud_event_to_generated(item))
await self._publish_cloud_events(topic_name, internal_body_list, **kwargs)
await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs)
else:
raise TypeError("Incorrect type for body. Expected CloudEvent,"
" list of CloudEvents, dict, or list of dicts."
" If dict passed, must follow the CloudEvent format.")

@distributed_trace_async
async def receive_cloud_events(
Expand Down Expand Up @@ -158,6 +273,69 @@ async def receive_cloud_events(
receive_result_deserialized = ReceiveResult(value=detail_items)
return receive_result_deserialized

async def _publish(self, topic_name: str, event: Any, api_version, binary_mode, **kwargs: Any) -> None:

error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})

_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}

cls: ClsType[_models._models.PublishResult] = kwargs.pop( # pylint: disable=protected-access
'cls', None
)

content_type: str = kwargs.pop('content_type', _headers.pop('content-type', "application/cloudevents+json; charset=utf-8"))

# Given that we know the cloud event is binary mode, we can convert it to a HTTP request
http_request = _to_http_request(
topic_name=topic_name,
api_version=api_version,
headers=_headers,
params=_params,
content_type=content_type,
event=event,
binary_mode=binary_mode,
**kwargs
)

_stream = kwargs.pop("stream", False)

path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
}
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)

_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
http_request,
stream=_stream,
**kwargs
)

response = pipeline_response.http_response

if response.status_code not in [200]:
if _stream:
await response.read() # Load the body in memory and close the socket
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)

if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.PublishResult, # pylint: disable=protected-access
response.json()
)

if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore

return deserialized # type: ignore


__all__: List[str] = [
"EventGridClientOperationsMixin"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import os
import asyncio
import json
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid.aio import EventGridClient
from azure.eventgrid.models import *
from azure.core.messaging import CloudEvent
from azure.core.exceptions import HttpResponseError


EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]

# Create a client
client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY))


async def run():
async with client:
# Publish a CloudEvent
try:
# Publish CloudEvent in binary mode with str encoded as bytes
cloud_event_dict = {"data":b"HI", "source":"https://example.com", "type":"example", "datacontenttype":"text/plain"}
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict)

# Publish CloudEvent in binary mode with json encoded as bytes
cloud_event = CloudEvent(data=json.dumps({"hello":"data"}).encode("utf-8"), source="https://example.com", type="example", datacontenttype="application/json")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking, but might be helpful to add a comment to say what you're showing for each cloudEvent that you construct here. something like/better than:
line 29

# publish cloud event dict
...
# binary encode dict data, then publish in binary_mode
...

await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event, binary_mode=True)

receive_result = await client.receive_cloud_events(
topic_name=TOPIC_NAME,
event_subscription_name=EVENT_SUBSCRIPTION_NAME,
max_events=10,
max_wait_time=10,
)
for details in receive_result.value:
cloud_event_received = details.event
print("CloudEvent: ", cloud_event_received)
print("Data: ", cloud_event_received.data)
except HttpResponseError:
raise

if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(run())
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,30 @@

EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
TOPIC_NAME: str = os.environ["TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENT_SUBSCRIPTION_NAME"]
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]

# Create a client
client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY))


async def run():
async with client:

# Publish a CloudEvent as dict
try:
cloud_event_dict = {"data": "hello", "source": "https://example.com", "type": "example"}
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict)
except HttpResponseError:
raise

# Publish a list of CloudEvents as dict
try:
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=[cloud_event_dict, cloud_event_dict])
except HttpResponseError:
raise


# Publish a CloudEvent
try:
cloud_event = CloudEvent(
Expand Down
Loading