From cfc9062a108f05194856e855df6c747b0afa2841 Mon Sep 17 00:00:00 2001 From: Libba Lawrence Date: Fri, 3 Nov 2023 14:49:42 -0700 Subject: [PATCH] [EGv2] Binary mode (#32922) * [EGv2] Build Release (#30325) * move old sdk under legacy * gen typespec code * naming changes from archboard * samples * update patch naming * update imports with new gen * update samples * update client naming on aio * update receive op * update async to close client * update receive() * update gen code * moving around samples * updating samples * update samples * update patch and samples * patch internalmodels * spacing * updating model patch * update patch models * add both models back * update docstring * update docs * updating patch for receive * old EG models * add reject samples * patch * update format * update patch * eventgrid_client exceptions * update test imports * update total sample * receive patch fix * add in more tests * update test file * remove locktoken model * remove LockToken in patch * remove event delivery delay * eg client exceptions * .8.5 generation, and deliveryCount * rename sample * update version for beta * changelog * updating for gen * regen * generate via commit * publish result * fix docstring * publish docstring * return type * publish result * return publish result -- is none * format * update Publish result model * deliverycount patch * update from main * add copyright * added to readme * remove from readme * force publish_result response * update patch tp unindent * cspell * update mypy.ini * import order * mark livetest * update operations init * rename async * mypy * ignore mypy * pylint * pylint * ignore pylint for now to avoid gen code errors * ignore samples until ARM setup * update patches * remove publish result * remove PublishResult * remove publishresult * comma Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * update publishResult * change to .value * gen code " to ' * remove comment * ran black * update changelog * update sample readme * gen code without query name * gen code * update tsp commit * remove publishresult * readme disclaimer * update changelog --------- Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * Beta LiveTests (#30728) * add bicep file for tests * update output * update test * secret sanitization * refactor failing test * update conftest * update assets and sanitizers * update preparer loc * update conftest * conftest * update conftest * remove variables for now * update assets * update tests * try to update regex * update recordings * update conftest * update preparer * update test * update exception test * update tests * update asset * update conftest * pr comments * default needs to be eastus * import * [EGv2] Build Release (#30325) * move old sdk under legacy * gen typespec code * naming changes from archboard * samples * update patch naming * update imports with new gen * update samples * update client naming on aio * update receive op * update async to close client * update receive() * update gen code * moving around samples * updating samples * update samples * update patch and samples * patch internalmodels * spacing * updating model patch * update patch models * add both models back * update docstring * update docs * updating patch for receive * old EG models * add reject samples * patch * update format * update patch * eventgrid_client exceptions * update test imports * update total sample * receive patch fix * add in more tests * update test file * remove locktoken model * remove LockToken in patch * remove event delivery delay * eg client exceptions * .8.5 generation, and deliveryCount * rename sample * update version for beta * changelog * updating for gen * regen * generate via commit * publish result * fix docstring * publish docstring * return type * publish result * return publish result -- is none * format * update Publish result model * deliverycount patch * update from main * add copyright * added to readme * remove from readme * force publish_result response * update patch tp unindent * cspell * update mypy.ini * import order * mark livetest * update operations init * rename async * mypy * ignore mypy * pylint * pylint * ignore pylint for now to avoid gen code errors * ignore samples until ARM setup * update patches * remove publish result * remove PublishResult * remove publishresult * comma Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * update publishResult * change to .value * gen code " to ' * remove comment * ran black * update changelog * update sample readme * gen code without query name * gen code * update tsp commit * remove publishresult * readme disclaimer * update changelog --------- Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * fix merge * dont go to generated before binary * update patch * update patches * eventgrid client patch * changes * add * update test * update tyoe checking * pass through binary_mode for now -- * update patch aio * add async func * update * sys * update kwargs * add Todo and start adding more tests * update * differentiate between binary and not * update binary * no base64 in binary mode * binary * try JSONEncoder on everything if not str/bytes * update test * update test * update changes * whitespace * space * remove commented * str serialize extensions? * xml test * encode extensions as object * update test * update extension serialization for deserialize * move flag to operation level * extra comma * dont raise httpresponse * update patch * accept dict cloud events * spacing * remove content_type check * add live test * remove live test mark * update * use env vars * update test * only run live test * comment * typo * error incorrect * start comments * update test * add sample * update tests * update docstrings to add clarity * update err message * remove generated cloud event * update sample * update * update samples to include dict * update patch * spacing * add comments * formatting * update doc * update tests * update tests * tests * skip tests for now * typo * add dict binary mode * update docstring * update patch to allow throw error * first pass at comments * update patch eror * nit --------- Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> --- .../azure/eventgrid/_operations/_patch.py | 342 ++++++++++++++++-- .../azure/eventgrid/aio/_operations/_patch.py | 202 ++++++++++- .../sample_binary_mode_async.py | 51 +++ .../sample_publish_operation_async.py | 19 +- .../sample_binary_mode.py | 43 +++ .../sample_publish_operation.py | 16 +- .../azure-eventgrid/tests/test_eg_client.py | 131 +++++++ .../tests/test_eg_client_exceptions.py | 4 +- .../azure-eventgrid/tests/test_exceptions.py | 5 +- .../tests/test_exceptions_async.py | 4 +- .../tests/unittests/test_binary_mode.py | 68 ++++ 11 files changed, 826 insertions(+), 59 deletions(-) create mode 100644 sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_binary_mode_async.py create mode 100644 sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_binary_mode.py create mode 100644 sdk/eventgrid/azure-eventgrid/tests/unittests/test_binary_mode.py diff --git a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py index dc841427df53..79d4cd307960 100644 --- a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py +++ b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py @@ -6,39 +6,33 @@ Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize """ import base64 -from typing import List, overload, Union, Any, Optional +import json +import sys +from typing import Any, Callable, Dict, IO, List, Optional, TypeVar, Union, overload + +from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error from azure.core.messaging import CloudEvent from azure.core.tracing.decorator import distributed_trace +from azure.core.pipeline import PipelineResponse +from azure.core.rest import HttpRequest, HttpResponse +from azure.core.utils import case_insensitive_dict + from ._operations import EventGridClientOperationsMixin as OperationsMixin -from ..models._models import CloudEvent as InternalCloudEvent +from .._model_base import _deserialize from ..models._patch import ReceiveResult, ReceiveDetails +from .. import models as _models +from .._serialization import Serializer +if sys.version_info >= (3, 9): + from collections.abc import MutableMapping +else: + from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports -def _cloud_event_to_generated(cloud_event, **kwargs): - data_kwargs = {} - - if isinstance(cloud_event.data, bytes): - data_kwargs["data_base64"] = base64.b64encode( - cloud_event.data - ) - else: - data_kwargs["data"] = cloud_event.data - - internal_event = InternalCloudEvent( - id=cloud_event.id, - source=cloud_event.source, - type=cloud_event.type, - specversion=cloud_event.specversion, - time=cloud_event.time, - dataschema=cloud_event.dataschema, - datacontenttype=cloud_event.datacontenttype, - subject=cloud_event.subject, - **data_kwargs, - **kwargs - ) - if cloud_event.extensions: - internal_event.update(cloud_event.extensions) - return internal_event +JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object +T = TypeVar('T') +ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]] +_SERIALIZER = Serializer() +_SERIALIZER.client_side_validation = False class EventGridClientOperationsMixin(OperationsMixin): @@ -48,6 +42,7 @@ def publish_cloud_events( topic_name: str, body: List[CloudEvent], *, + binary_mode: bool = False, content_type: str = "application/cloudevents-batch+json; charset=utf-8", **kwargs: Any ) -> None: @@ -61,6 +56,10 @@ def publish_cloud_events( :type topic_name: str :param body: Array of Cloud Events being published. Required. :type body: list[~azure.core.messaging.CloudEvent] + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. If 'datacontenttype` is not specified, + the default content type is `application/cloudevents-batch+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. :keyword content_type: content type. Default value is "application/cloudevents-batch+json; charset=utf-8". :paramtype content_type: str @@ -77,6 +76,7 @@ def publish_cloud_events( topic_name: str, body: CloudEvent, *, + binary_mode: bool = False, content_type: str = "application/cloudevents+json; charset=utf-8", **kwargs: Any ) -> None: @@ -90,6 +90,44 @@ def publish_cloud_events( :type topic_name: str :param body: Single Cloud Event being published. Required. :type body: ~azure.core.messaging.CloudEvent + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. + :keyword content_type: content type. Default value is "application/cloudevents+json; + charset=utf-8". + :paramtype content_type: str + :keyword bool stream: Whether to stream the response of this operation. Defaults to False. You + will have to context manage the returned stream. + :return: None + :rtype: None + :raises ~azure.core.exceptions.HttpResponseError: + """ + + @overload + def publish_cloud_events( + self, + topic_name: str, + body: Dict[str, Any], + *, + binary_mode: bool = False, + content_type: str = "application/cloudevents+json; charset=utf-8", + **kwargs: Any + ) -> None: + """Publish Single Cloud Event to namespace topic. In case of success, the server responds with an + HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return + various error codes. For example, 401: which indicates authorization failure, 403: which + indicates quota exceeded or message is too large, 410: which indicates that specific topic is + not found, 400: for bad request, and 500: for internal server error. + + :param topic_name: Topic Name. Required. + :type topic_name: str + :param body: Single Cloud Event being published. Required. + :type body: dict[str, Any] + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. :keyword content_type: content type. Default value is "application/cloudevents+json; charset=utf-8". :paramtype content_type: str @@ -99,10 +137,48 @@ def publish_cloud_events( :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ + + @overload + def publish_cloud_events( + self, + topic_name: str, + body: List[Dict[str, Any]], + *, + binary_mode: bool = False, + content_type: str = "application/cloudevents-batch+json; charset=utf-8", + **kwargs: Any + ) -> None: + """Publish Single Cloud Event to namespace topic. In case of success, the server responds with an + HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return + various error codes. For example, 401: which indicates authorization failure, 403: which + indicates quota exceeded or message is too large, 410: which indicates that specific topic is + not found, 400: for bad request, and 500: for internal server error. + :param topic_name: Topic Name. Required. + :type topic_name: str + :param body: Batch of Cloud Events being published. Required. + :type body: list[dict[str, Any]] + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If 'datacontenttype` is not specified, the default content type is `application/cloudevents-batch+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. + :keyword content_type: content type. Default value is "application/cloudevents-batch+json; charset=utf-8". + :paramtype content_type: str + :keyword bool stream: Whether to stream the response of this operation. Defaults to False. You + will have to context manage the returned stream. + :return: None + :rtype: None + :raises ~azure.core.exceptions.HttpResponseError: + """ + @distributed_trace def publish_cloud_events( - self, topic_name: str, body: Union[List[CloudEvent], CloudEvent], **kwargs + self, + topic_name: str, + body: Union[List[CloudEvent], CloudEvent, List[Dict[str, Any]], Dict[str, Any]], + *, + binary_mode: bool = False, + **kwargs ) -> None: """Publish Batch Cloud Event or Events to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return @@ -113,7 +189,11 @@ def publish_cloud_events( :param topic_name: Topic Name. Required. :type topic_name: str :param body: Cloud Event or array of Cloud Events being published. Required. - :type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] + :type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] or dict[str, any] or list[dict[str, any]] + :keyword bool binary_mode: Whether to publish the events in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If not specified, the default content type is "application/cloudevents+json; charset=utf-8". + Requires CloudEvent data to be passed in as bytes. :keyword content_type: content type. Default value is "application/cloudevents+json; charset=utf-8". :paramtype content_type: str @@ -123,17 +203,30 @@ def publish_cloud_events( :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ + + # Check that the body is a CloudEvent or list of CloudEvents even if dict + if isinstance(body, dict) or (isinstance(body, list) and isinstance(body[0], dict)): + try: + if isinstance(body, list): + body = [CloudEvent.from_dict(event) for event in body] + else: + body = CloudEvent.from_dict(body) + except AttributeError: + raise TypeError("Incorrect type for body. Expected CloudEvent," + " list of CloudEvents, dict, or list of dicts." + " If dict passed, must follow the CloudEvent format.") + if isinstance(body, CloudEvent): kwargs["content_type"] = "application/cloudevents+json; charset=utf-8" - internal_body = _cloud_event_to_generated(body) - self._publish_cloud_event(topic_name, internal_body, **kwargs) - else: + self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs) + elif isinstance(body, list): kwargs["content_type"] = "application/cloudevents-batch+json; charset=utf-8" - internal_body_list = [] - for item in body: - internal_body_list.append(_cloud_event_to_generated(item)) - self._publish_cloud_events(topic_name, internal_body_list, **kwargs) + self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs) + else: + raise TypeError("Incorrect type for body. Expected CloudEvent," + " list of CloudEvents, dict, or list of dicts." + " If dict passed, must follow the CloudEvent format.") @distributed_trace def receive_cloud_events( @@ -187,6 +280,183 @@ def receive_cloud_events( receive_result_deserialized = ReceiveResult(value=detail_items) return receive_result_deserialized + def _publish(self, topic_name: str, event: Any, api_version: str, binary_mode: Optional[bool] = False, **kwargs: Any) -> None: + + error_map = { + 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError + } + error_map.update(kwargs.pop('error_map', {}) or {}) + + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[_models._models.PublishResult] = kwargs.pop( # pylint: disable=protected-access + 'cls', None + ) + + content_type: str = kwargs.pop('content_type', _headers.pop('content-type', "application/cloudevents+json; charset=utf-8")) + + # Given that we know the cloud event is binary mode, we can convert it to a HTTP request + http_request = _to_http_request( + topic_name=topic_name, + api_version=api_version, + headers=_headers, + params=_params, + content_type=content_type, + event=event, + binary_mode=binary_mode, + **kwargs + ) + + _stream = kwargs.pop("stream", False) + + path_format_arguments = { + "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), + } + http_request.url = self._client.format_url(http_request.url, **path_format_arguments) + + # pipeline_response: PipelineResponse = self.send_request(http_request, **kwargs) + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + http_request, + stream=_stream, + **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + if _stream: + response.read() # Load the body in memory and close the socket + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + if _stream: + deserialized = response.iter_bytes() + else: + deserialized = _deserialize( + _models._models.PublishResult, # pylint: disable=protected-access + response.json() + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + +def _to_http_request(topic_name: str, **kwargs: Any) -> HttpRequest: + # Create a HTTP request for a binary mode CloudEvent + + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) + + event = kwargs.pop("event") + binary_mode = kwargs.pop("binary_mode", False) + + + if binary_mode: + # Content of the request is the data, if already in binary - no work needed + try: + if isinstance(event.data, bytes): + _content = event.data + else: + raise TypeError("CloudEvent data must be bytes when in binary mode." + "Did you forget to call `json.dumps()` and/or `encode()` on CloudEvent data?") + except AttributeError: + raise TypeError("Binary mode is not supported for batch CloudEvents. Set `binary_mode` to False when passing in a batch of CloudEvents.") + else: + # Content of the request is the serialized CloudEvent or serialized List[CloudEvent] + _content = _serialize_cloud_events(event) + + # content_type must be CloudEvent DataContentType when in binary mode + default_content_type = kwargs.pop('content_type', _headers.pop('content-type', "application/cloudevents+json; charset=utf-8")) + content_type: str = event.datacontenttype if (binary_mode and event.datacontenttype) else default_content_type + + api_version: str = kwargs.pop('api_version', _params.pop('api-version', "2023-10-01-preview")) + accept = _headers.pop('Accept', "application/json") + + # Construct URL + _url = "/topics/{topicName}:publish" + path_format_arguments = { + "topicName": _SERIALIZER.url("topic_name", topic_name, 'str'), + } + + _url: str = _url.format(**path_format_arguments) # type: ignore + + # Construct parameters + _params['api-version'] = _SERIALIZER.query("api_version", api_version, 'str') + + # Construct headers + _headers['content-type'] = _SERIALIZER.header("content_type", content_type, 'str') + _headers['Accept'] = _SERIALIZER.header("accept", accept, 'str') + + if binary_mode: + # Cloud Headers + _headers['ce-source'] = _SERIALIZER.header('ce-source', event.source, 'str') + _headers['ce-type'] = _SERIALIZER.header('ce-type', event.type, 'str') + if event.specversion: + _headers['ce-specversion'] = _SERIALIZER.header('ce-specversion', event.specversion, 'str') + if event.id: + _headers['ce-id'] = _SERIALIZER.header('ce-id', event.id, 'str') + if event.time: + _headers['ce-time'] = _SERIALIZER.header('ce-time', event.time, 'str') + if event.dataschema: + _headers['ce-dataschema'] = _SERIALIZER.header('ce-dataschema', event.dataschema, 'str') + if event.subject: + _headers['ce-subject'] = _SERIALIZER.header('ce-subject', event.subject, 'str') + if event.extensions: + for extension, value in event.extensions.items(): + _headers[f'ce-{extension}'] = _SERIALIZER.header('ce-extensions', value, 'str') + + return HttpRequest( + method="POST", + url=_url, + params=_params, + headers=_headers, + content=_content, # pass through content + **kwargs + ) + +def _serialize_cloud_events(events: Union[CloudEvent, List[CloudEvent]]) -> None: + # Serialize CloudEvent or List[CloudEvent] into a JSON string + is_list = isinstance(events, list) + data = {} + list_data = [] + for event in events if isinstance(events, list) else [events]: + # CloudEvent required fields but validate they are not set to None + if event.type: + data["type"] = _SERIALIZER.body(event.type, "str") + if event.specversion: + data["specversion"] = _SERIALIZER.body(event.specversion, "str") + if event.source: + data["source"] = _SERIALIZER.body(event.source, "str") + if event.id: + data["id"] = _SERIALIZER.body(event.id, "str") + + # Check if data is bytes and serialize to base64 + if isinstance(event.data, bytes): + data["data_base64"] = _SERIALIZER.serialize_bytearray(event.data) + elif event.data: + data["data"] = _SERIALIZER.body(event.data, "str") + + if event.subject: + data["subject"] = _SERIALIZER.body(event.subject, "str") + if event.time: + data["time"] = _SERIALIZER.body(event.time, "str") + if event.datacontenttype: + data["datacontenttype"] = _SERIALIZER.body(event.datacontenttype, "str") + if event.extensions: + for extension, value in event.extensions.items(): + data[extension] = _SERIALIZER.body(value, "str") + + # If single cloud event return the data + if not is_list: + return json.dumps(data) + else: + list_data.append(data) + # If list of cloud events return the list + return json.dumps(list_data) + __all__: List[str] = [ "EventGridClientOperationsMixin" diff --git a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_operations/_patch.py b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_operations/_patch.py index 4b940f11020d..6dc63a1bdfe0 100644 --- a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_operations/_patch.py +++ b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_operations/_patch.py @@ -5,21 +5,36 @@ """Customize generated code here. Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize """ -from typing import List, overload, Union, Any, Optional +from typing import List, overload, Union, Any, Optional, Callable, Dict, TypeVar +import sys from azure.core.messaging import CloudEvent +from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error from azure.core.tracing.decorator_async import distributed_trace_async +from azure.core.pipeline import PipelineResponse +from azure.core.rest import HttpRequest, AsyncHttpResponse +from azure.core.utils import case_insensitive_dict from ...models._patch import ReceiveResult, ReceiveDetails -from ..._operations._patch import _cloud_event_to_generated +from ..._operations._patch import _to_http_request from ._operations import EventGridClientOperationsMixin as OperationsMixin - +from ... import models as _models +from ..._model_base import _deserialize +if sys.version_info >= (3, 9): + from collections.abc import MutableMapping +else: + from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports +JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object +T = TypeVar('T') +ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]] class EventGridClientOperationsMixin(OperationsMixin): + @overload async def publish_cloud_events( self, topic_name: str, body: List[CloudEvent], *, + binary_mode: bool = False, content_type: str = "application/cloudevents-batch+json; charset=utf-8", **kwargs: Any ) -> None: @@ -33,6 +48,10 @@ async def publish_cloud_events( :type topic_name: str :param body: Array of Cloud Events being published. Required. :type body: list[~azure.core.messaging.CloudEvent] + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If 'datacontenttype` is not specified the default content type is `application/cloudevents-batch+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. :keyword content_type: content type. Default value is "application/cloudevents-batch+json; charset=utf-8". :paramtype content_type: str @@ -49,6 +68,7 @@ async def publish_cloud_events( topic_name: str, body: CloudEvent, *, + binary_mode: bool = False, content_type: str = "application/cloudevents+json; charset=utf-8", **kwargs: Any ) -> None: @@ -62,6 +82,78 @@ async def publish_cloud_events( :type topic_name: str :param body: Single Cloud Event being published. Required. :type body: ~azure.core.messaging.CloudEvent + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. + :keyword content_type: content type. Default value is "application/cloudevents+json; + charset=utf-8". + :paramtype content_type: str + :keyword bool stream: Whether to stream the response of this operation. Defaults to False. You + will have to context manage the returned stream. + :return: None + :rtype: None + :raises ~azure.core.exceptions.HttpResponseError: + """ + + @overload + async def publish_cloud_events( + self, + topic_name: str, + body: Dict[str, Any], + *, + binary_mode: bool = False, + content_type: str = "application/cloudevents+json; charset=utf-8", + **kwargs: Any + ) -> None: + """Publish Single Cloud Event to namespace topic. In case of success, the server responds with an + HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return + various error codes. For example, 401: which indicates authorization failure, 403: which + indicates quota exceeded or message is too large, 410: which indicates that specific topic is + not found, 400: for bad request, and 500: for internal server error. + + :param topic_name: Topic Name. Required. + :type topic_name: str + :param body: Single Cloud Event being published. Required. + :type body: dict[str, Any] + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. + :keyword content_type: content type. Default value is "application/cloudevents+json; + charset=utf-8". + :paramtype content_type: str + :keyword bool stream: Whether to stream the response of this operation. Defaults to False. You + will have to context manage the returned stream. + :return: None + :rtype: None + :raises ~azure.core.exceptions.HttpResponseError: + """ + + @overload + async def publish_cloud_events( + self, + topic_name: str, + body: List[Dict[str, Any]], + *, + binary_mode: bool = False, + content_type: str = "application/cloudevents-batch+json; charset=utf-8", + **kwargs: Any + ) -> None: + """Publish Single Cloud Event to namespace topic. In case of success, the server responds with an + HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return + various error codes. For example, 401: which indicates authorization failure, 403: which + indicates quota exceeded or message is too large, 410: which indicates that specific topic is + not found, 400: for bad request, and 500: for internal server error. + + :param topic_name: Topic Name. Required. + :type topic_name: str + :param body: Batch of Cloud Events being published. Required. + :type body: list[dict[str, Any]] + :keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If 'datacontenttype` is not specified, the default content type is `application/cloudevents-batch+json; charset=utf-8`. + Requires CloudEvent data to be passed in as bytes. :keyword content_type: content type. Default value is "application/cloudevents+json; charset=utf-8". :paramtype content_type: str @@ -74,7 +166,12 @@ async def publish_cloud_events( @distributed_trace_async async def publish_cloud_events( - self, topic_name: str, body: Union[List[CloudEvent], CloudEvent], **kwargs + self, + topic_name: str, + body: Union[List[CloudEvent], CloudEvent, List[Dict[str, Any]], Dict[str, Any]], + *, + binary_mode: bool = False, + **kwargs ) -> None: """Publish Batch Cloud Event or Events to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return @@ -85,7 +182,11 @@ async def publish_cloud_events( :param topic_name: Topic Name. Required. :type topic_name: str :param body: Cloud Event or Array of Cloud Events being published. Required. - :type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] + :type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] or dict[str, any] or list[dict[str, any]] + :keyword bool binary_mode: Whether to publish the events in binary mode. Defaults to False. + When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`. + If not specified, the default content type is "application/cloudevents+json; charset=utf-8". + Requires CloudEvent data to be passed in as bytes. :keyword content_type: content type. Default value is "application/cloudevents+json; charset=utf-8". :paramtype content_type: str @@ -95,16 +196,30 @@ async def publish_cloud_events( :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ + + # Check that the body is a CloudEvent or list of CloudEvents even if dict + if isinstance(body, dict) or (isinstance(body, list) and isinstance(body[0], dict)): + try: + if isinstance(body, list): + body = [CloudEvent.from_dict(event) for event in body] + else: + body = CloudEvent.from_dict(body) + except AttributeError: + raise TypeError("Incorrect type for body. Expected CloudEvent," + " list of CloudEvents, dict, or list of dicts." + " If dict passed, must follow the CloudEvent format.") + + if isinstance(body, CloudEvent): kwargs["content_type"] = "application/cloudevents+json; charset=utf-8" - internal_body = _cloud_event_to_generated(body) - await self._publish_cloud_event(topic_name, internal_body, **kwargs) - else: + await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs) + elif isinstance(body, list): kwargs["content_type"] = "application/cloudevents-batch+json; charset=utf-8" - internal_body_list = [] - for item in body: - internal_body_list.append(_cloud_event_to_generated(item)) - await self._publish_cloud_events(topic_name, internal_body_list, **kwargs) + await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs) + else: + raise TypeError("Incorrect type for body. Expected CloudEvent," + " list of CloudEvents, dict, or list of dicts." + " If dict passed, must follow the CloudEvent format.") @distributed_trace_async async def receive_cloud_events( @@ -158,6 +273,69 @@ async def receive_cloud_events( receive_result_deserialized = ReceiveResult(value=detail_items) return receive_result_deserialized + async def _publish(self, topic_name: str, event: Any, api_version, binary_mode, **kwargs: Any) -> None: + + error_map = { + 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError + } + error_map.update(kwargs.pop('error_map', {}) or {}) + + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[_models._models.PublishResult] = kwargs.pop( # pylint: disable=protected-access + 'cls', None + ) + + content_type: str = kwargs.pop('content_type', _headers.pop('content-type', "application/cloudevents+json; charset=utf-8")) + + # Given that we know the cloud event is binary mode, we can convert it to a HTTP request + http_request = _to_http_request( + topic_name=topic_name, + api_version=api_version, + headers=_headers, + params=_params, + content_type=content_type, + event=event, + binary_mode=binary_mode, + **kwargs + ) + + _stream = kwargs.pop("stream", False) + + path_format_arguments = { + "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True), + } + http_request.url = self._client.format_url(http_request.url, **path_format_arguments) + + _stream = kwargs.pop("stream", False) + pipeline_response: PipelineResponse = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access + http_request, + stream=_stream, + **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + if _stream: + await response.read() # Load the body in memory and close the socket + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + if _stream: + deserialized = response.iter_bytes() + else: + deserialized = _deserialize( + _models._models.PublishResult, # pylint: disable=protected-access + response.json() + ) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + __all__: List[str] = [ "EventGridClientOperationsMixin" diff --git a/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_binary_mode_async.py b/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_binary_mode_async.py new file mode 100644 index 000000000000..2c440f9f9e7f --- /dev/null +++ b/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_binary_mode_async.py @@ -0,0 +1,51 @@ +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +import os +import asyncio +import json +from azure.core.credentials import AzureKeyCredential +from azure.eventgrid.aio import EventGridClient +from azure.eventgrid.models import * +from azure.core.messaging import CloudEvent +from azure.core.exceptions import HttpResponseError + + +EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"] +EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"] +TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"] +EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"] + +# Create a client +client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY)) + + +async def run(): + async with client: + # Publish a CloudEvent + try: + # Publish CloudEvent in binary mode with str encoded as bytes + cloud_event_dict = {"data":b"HI", "source":"https://example.com", "type":"example", "datacontenttype":"text/plain"} + await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict) + + # Publish CloudEvent in binary mode with json encoded as bytes + cloud_event = CloudEvent(data=json.dumps({"hello":"data"}).encode("utf-8"), source="https://example.com", type="example", datacontenttype="application/json") + await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event, binary_mode=True) + + receive_result = await client.receive_cloud_events( + topic_name=TOPIC_NAME, + event_subscription_name=EVENT_SUBSCRIPTION_NAME, + max_events=10, + max_wait_time=10, + ) + for details in receive_result.value: + cloud_event_received = details.event + print("CloudEvent: ", cloud_event_received) + print("Data: ", cloud_event_received.data) + except HttpResponseError: + raise + +if __name__ == "__main__": + asyncio.get_event_loop().run_until_complete(run()) diff --git a/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_publish_operation_async.py b/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_publish_operation_async.py index bb519f1cde2c..5fd2486723a6 100644 --- a/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_publish_operation_async.py +++ b/sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_publish_operation_async.py @@ -14,8 +14,8 @@ EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"] EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"] -TOPIC_NAME: str = os.environ["TOPIC_NAME"] -EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENT_SUBSCRIPTION_NAME"] +TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"] +EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"] # Create a client client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY)) @@ -23,6 +23,21 @@ async def run(): async with client: + + # Publish a CloudEvent as dict + try: + cloud_event_dict = {"data": "hello", "source": "https://example.com", "type": "example"} + await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict) + except HttpResponseError: + raise + + # Publish a list of CloudEvents as dict + try: + await client.publish_cloud_events(topic_name=TOPIC_NAME, body=[cloud_event_dict, cloud_event_dict]) + except HttpResponseError: + raise + + # Publish a CloudEvent try: cloud_event = CloudEvent( diff --git a/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_binary_mode.py b/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_binary_mode.py new file mode 100644 index 000000000000..de3a4c3f0c32 --- /dev/null +++ b/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_binary_mode.py @@ -0,0 +1,43 @@ +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +import os +import json +from azure.core.credentials import AzureKeyCredential +from azure.eventgrid import EventGridClient +from azure.eventgrid.models import * +from azure.core.messaging import CloudEvent +from azure.core.exceptions import HttpResponseError + + +EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"] +EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"] +TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"] +EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"] + +# Create a client +client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY)) + + +# Publish a CloudEvent +try: + # Publish CloudEvent in binary mode with str encoded as bytes + cloud_event_dict = {"data":b"HI", "source":"https://example.com", "type":"example", "datacontenttype":"text/plain"} + client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict) + + # Publish CloudEvent in binary mode with json encoded as bytes + cloud_event = CloudEvent(data=json.dumps({"hello":"data"}).encode("utf-8"), source="https://example.com", type="example", datacontenttype="application/json") + client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event, binary_mode=True) + + # Receive a CloudEvent + receive_result = client.receive_cloud_events(topic_name=TOPIC_NAME, event_subscription_name=EVENT_SUBSCRIPTION_NAME, max_events=100) + for receive_details in receive_result.value: + cloud_event_received = receive_details.event + print("CloudEvent: ", cloud_event_received) + print("CloudEvent data: ", cloud_event_received.data) +except HttpResponseError: + raise + + diff --git a/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_publish_operation.py b/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_publish_operation.py index fe5460387824..3c4db3a37f3b 100644 --- a/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_publish_operation.py +++ b/sdk/eventgrid/azure-eventgrid/samples/sync_samples/eventgrid_client_samples/sample_publish_operation.py @@ -13,12 +13,24 @@ EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"] EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"] -TOPIC_NAME: str = os.environ["TOPIC_NAME"] -EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENT_SUBSCRIPTION_NAME"] +TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"] +EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"] # Create a client client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY)) +# Publish a CloudEvent as dict +try: + cloud_event_dict = {"data": "hello", "source": "https://example.com", "type": "example"} + client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict) +except HttpResponseError: + raise + +# Publish a list of CloudEvents as dict +try: + client.publish_cloud_events(topic_name=TOPIC_NAME, body=[cloud_event_dict, cloud_event_dict]) +except HttpResponseError: + raise # Publish a CloudEvent try: diff --git a/sdk/eventgrid/azure-eventgrid/tests/test_eg_client.py b/sdk/eventgrid/azure-eventgrid/tests/test_eg_client.py index 6f848c7c28a0..ed092f4e3b39 100644 --- a/sdk/eventgrid/azure-eventgrid/tests/test_eg_client.py +++ b/sdk/eventgrid/azure-eventgrid/tests/test_eg_client.py @@ -21,6 +21,137 @@ def create_eg_client(self, endpoint, key): endpoint=endpoint, credential=AzureKeyCredential(key) ) return client + + + @pytest.mark.live_test_only + def test_publish_binary_mode_xml(self): + eventgrid_endpoint = os.environ['EVENTGRID_ENDPOINT'] + eventgrid_key = os.environ['EVENTGRID_KEY'] + eventgrid_topic_name = os.environ['EVENTGRID_TOPIC_NAME'] + eventgrid_event_subscription_name = os.environ['EVENTGRID_EVENT_SUBSCRIPTION_NAME'] + client = self.create_eg_client(eventgrid_endpoint, eventgrid_key) + + from xml.etree import ElementTree as ET + xml_string = """test""" + tree = xml_string.encode('utf-8') + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data=tree, + datacontenttype="text/xml", + extensions={"extension1": "value1", "extension2": "value2"} + ) + + client.publish_cloud_events( + eventgrid_topic_name, body=event, binary_mode=True + ) + + time.sleep(5) + + events = client.receive_cloud_events(eventgrid_topic_name, eventgrid_event_subscription_name,max_events=1) + my_returned_event = events.value[0].event + assert my_returned_event.data == xml_string + assert my_returned_event.datacontenttype == 'text/xml' + assert my_returned_event.type == "Contoso.Items.ItemReceived" + + + @pytest.mark.live_test_only + def test_publish_binary_mode_cloud_event(self): + eventgrid_endpoint = os.environ['EVENTGRID_ENDPOINT'] + eventgrid_key = os.environ['EVENTGRID_KEY'] + eventgrid_topic_name = os.environ['EVENTGRID_TOPIC_NAME'] + eventgrid_event_subscription_name = os.environ['EVENTGRID_EVENT_SUBSCRIPTION_NAME'] + client = self.create_eg_client(eventgrid_endpoint, eventgrid_key) + + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data=b'this is binary data', + datacontenttype='text/plain' + ) + + client.publish_cloud_events( + eventgrid_topic_name, body=event, binary_mode=True + ) + + time.sleep(5) + + events = client.receive_cloud_events(eventgrid_topic_name, eventgrid_event_subscription_name,max_events=1) + my_returned_event = events.value[0].event + assert my_returned_event.data == 'this is binary data' + assert my_returned_event.datacontenttype == 'text/plain' + assert my_returned_event.type == "Contoso.Items.ItemReceived" + + + @pytest.mark.live_test_only + def test_publish_binary_mode_incorrect_cloud_event(self): + eventgrid_endpoint = os.environ['EVENTGRID_ENDPOINT'] + eventgrid_key = os.environ['EVENTGRID_KEY'] + eventgrid_topic_name = os.environ['EVENTGRID_TOPIC_NAME'] + client = self.create_eg_client(eventgrid_endpoint, eventgrid_key) + + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data={"key": "value"}, + datacontenttype='text/plain' + ) + + with pytest.raises(TypeError): + client.publish_cloud_events( + eventgrid_topic_name, body=event, binary_mode=True + ) + + @pytest.mark.live_test_only + def test_publish_binary_mode_list_cloud_event(self): + eventgrid_endpoint = os.environ['EVENTGRID_ENDPOINT'] + eventgrid_key = os.environ['EVENTGRID_KEY'] + eventgrid_topic_name = os.environ['EVENTGRID_TOPIC_NAME'] + eventgrid_event_subscription_name = os.environ['EVENTGRID_EVENT_SUBSCRIPTION_NAME'] + client = self.create_eg_client(eventgrid_endpoint, eventgrid_key) + + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data={"key": "value"}, + datacontenttype='text/plain' + ) + + with pytest.raises(TypeError): + client.publish_cloud_events( + eventgrid_topic_name, body=[event], binary_mode=True + ) + + @pytest.mark.live_test_only + def test_publish_binary_mode_combinations(self): + eventgrid_endpoint = os.environ['EVENTGRID_ENDPOINT'] + eventgrid_key = os.environ['EVENTGRID_KEY'] + eventgrid_topic_name = os.environ['EVENTGRID_TOPIC_NAME'] + eventgrid_event_subscription_name = os.environ['EVENTGRID_EVENT_SUBSCRIPTION_NAME'] + client = self.create_eg_client(eventgrid_endpoint, eventgrid_key) + + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data=b"hello", + datacontenttype='text/plain' + ) + + dict_event = {"type": "Contoso.Items.ItemReceived", "source": "source", "subject": "MySubject", "data": b"hello", "datacontenttype": "text/plain"} + + + client.publish_cloud_events( + eventgrid_topic_name, body=event, binary_mode=True + ) + + client.publish_cloud_events( + eventgrid_topic_name, body=dict_event, binary_mode=True + ) @pytest.mark.skip("need to update conftest") @EventGridPreparer() diff --git a/sdk/eventgrid/azure-eventgrid/tests/test_eg_client_exceptions.py b/sdk/eventgrid/azure-eventgrid/tests/test_eg_client_exceptions.py index 7f5f8ffa4a91..108b2c1a8e71 100644 --- a/sdk/eventgrid/azure-eventgrid/tests/test_eg_client_exceptions.py +++ b/sdk/eventgrid/azure-eventgrid/tests/test_eg_client_exceptions.py @@ -25,7 +25,7 @@ def create_eg_client(self, endpoint, key): @EventGridPreparer() @recorded_by_proxy - def test_publish_cloud_event_bad_request(self, eventgrid_endpoint, eventgrid_key): + def test_publish_cloud_event_bad_request(self, eventgrid_endpoint, eventgrid_key, eventgrid_topic_name): client = self.create_eg_client(eventgrid_endpoint, eventgrid_key) event = CloudEvent( type="Contoso.Items.ItemReceived", @@ -35,7 +35,7 @@ def test_publish_cloud_event_bad_request(self, eventgrid_endpoint, eventgrid_key ) with pytest.raises(HttpResponseError): - client.publish_cloud_events("testtopic1", [event]) + client.publish_cloud_events(eventgrid_topic_name, [event]) @EventGridPreparer() @recorded_by_proxy diff --git a/sdk/eventgrid/azure-eventgrid/tests/test_exceptions.py b/sdk/eventgrid/azure-eventgrid/tests/test_exceptions.py index 294ef662ab93..a6cddbd4bc1e 100644 --- a/sdk/eventgrid/azure-eventgrid/tests/test_exceptions.py +++ b/sdk/eventgrid/azure-eventgrid/tests/test_exceptions.py @@ -62,9 +62,8 @@ def test_raise_on_auth_error(self, eventgrid_topic_endpoint): @pytest.mark.skip("Fix during MQ - skip to unblock pipeline") @pytest.mark.live_test_only - @EventGridPreparer() - def test_raise_on_bad_resource(self, eventgrid_topic_key): - credential = AzureKeyCredential(eventgrid_topic_key) + def test_raise_on_bad_resource(self): + credential = AzureKeyCredential(os.environ["EVENTGRID_TOPIC_KEY"]) client = EventGridPublisherClient( "https://bad-resource.eastus-1.eventgrid.azure.net/api/events", credential, diff --git a/sdk/eventgrid/azure-eventgrid/tests/test_exceptions_async.py b/sdk/eventgrid/azure-eventgrid/tests/test_exceptions_async.py index 2779ab678bc4..d69eb481f763 100644 --- a/sdk/eventgrid/azure-eventgrid/tests/test_exceptions_async.py +++ b/sdk/eventgrid/azure-eventgrid/tests/test_exceptions_async.py @@ -69,8 +69,8 @@ async def test_raise_on_auth_error(self, eventgrid_topic_endpoint): @pytest.mark.live_test_only @EventGridPreparer() @pytest.mark.asyncio - async def test_raise_on_bad_resource(self, eventgrid_topic_key): - credential = AzureKeyCredential(eventgrid_topic_key) + async def test_raise_on_bad_resource(self): + credential = AzureKeyCredential(os.environ["EVENTGRID_TOPIC_KEY"]) client = EventGridPublisherClient( "https://bad-resource.eastus-1.eventgrid.azure.net/api/events", credential, diff --git a/sdk/eventgrid/azure-eventgrid/tests/unittests/test_binary_mode.py b/sdk/eventgrid/azure-eventgrid/tests/unittests/test_binary_mode.py new file mode 100644 index 000000000000..5cbcdc2c8f39 --- /dev/null +++ b/sdk/eventgrid/azure-eventgrid/tests/unittests/test_binary_mode.py @@ -0,0 +1,68 @@ +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +import pytest +import json +import base64 +from azure.eventgrid._operations._patch import _to_http_request +from azure.eventgrid._model_base import AzureJSONEncoder +from azure.eventgrid.models import * +from azure.core.messaging import CloudEvent + +class MyTestClass(object): + def __init__(self, name): + self.name = name + def __str__(self): + return self.name + +class TestEGClientExceptions(): + + def test_binary_request_format(self): + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data=b'this is binary data', + ) + + request = _to_http_request("https://eg-topic.westus2-1.eventgrid.azure.net/api/events", event=event, binary_mode=True) + + assert request.data == b"this is binary data" + assert request.headers.get("ce-source") == "source" + assert request.headers.get("ce-subject") == "MySubject" + assert request.headers.get("ce-type") == "Contoso.Items.ItemReceived" + + def test_binary_request_format_with_extensions_and_datacontenttype(self): + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data=b'this is my data', + datacontenttype="application/json", + extensions={"extension1": "value1", "extension2": "value2"} + ) + + request = _to_http_request("https://eg-topic.westus2-1.eventgrid.azure.net/api/events", event=event, binary_mode=True) + + assert request.data == b"this is my data" + assert request.headers.get("ce-source") == "source" + assert request.headers.get("ce-subject") == "MySubject" + assert request.headers.get("ce-type") == "Contoso.Items.ItemReceived" + assert request.headers.get("ce-extension1") == "value1" + + def test_class_binary_request_format_error(self): + test_class = MyTestClass("test") + event = CloudEvent( + type="Contoso.Items.ItemReceived", + source="source", + subject="MySubject", + data=test_class, + datacontenttype="application/json", + extensions={"extension1": "value1", "extension2": "value2"} + ) + + with pytest.raises(TypeError): + _to_http_request("https://eg-topic.westus2-1.eventgrid.azure.net/api/events", event=event, binary_mode=True) +