diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 80a33bb00acc..8ee413fc3350 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features Added - Cut hard dependency on requests library +- Added a `from_json` method which now accepts storage QueueMessage, eventhub's EventData or ServiceBusMessage or simply json bytes to return a `CloudEvent` ### Breaking Changes diff --git a/sdk/core/azure-core/azure/core/messaging.py b/sdk/core/azure-core/azure/core/messaging.py index a677ae462dd1..0692986f388a 100644 --- a/sdk/core/azure-core/azure/core/messaging.py +++ b/sdk/core/azure-core/azure/core/messaging.py @@ -8,6 +8,7 @@ from base64 import b64decode from datetime import datetime from .utils._utils import _convert_to_isoformat, TZ_UTC +from .utils._messaging_shared import _get_json_content from .serialization import NULL try: @@ -181,3 +182,17 @@ def from_dict(cls, event): " The `source` and `type` params are required." ) return event_obj + + @classmethod + def from_json(cls, event): + # type: (Any) -> CloudEvent + """ + Returns the deserialized CloudEvent object when a json payload is provided. + :param event: The json string that should be converted into a CloudEvent. This can also be + a storage QueueMessage, eventhub's EventData or ServiceBusMessage + :type event: object + :rtype: CloudEvent + :raises ValueError: If the provided JSON is invalid. + """ + dict_event = _get_json_content(event) + return CloudEvent.from_dict(dict_event) diff --git a/sdk/core/azure-core/azure/core/utils/_messaging_shared.py b/sdk/core/azure-core/azure/core/utils/_messaging_shared.py new file mode 100644 index 000000000000..bc9307d00580 --- /dev/null +++ b/sdk/core/azure-core/azure/core/utils/_messaging_shared.py @@ -0,0 +1,41 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +# ========================================================================== +# This file contains duplicate code that is shared with azure-eventgrid. +# Both the files should always be identical. +# ========================================================================== + + + +import json +from azure.core.exceptions import raise_with_traceback + +def _get_json_content(obj): + """Event mixin to have methods that are common to different Event types + like CloudEvent, EventGridEvent etc. + """ + msg = "Failed to load JSON content from the object." + try: + # storage queue + return json.loads(obj.content) + except ValueError as err: + raise_with_traceback(ValueError, msg, err) + except AttributeError: + # eventhubs + try: + return json.loads(next(obj.body))[0] + except KeyError: + # servicebus + return json.loads(next(obj.body)) + except ValueError as err: + raise_with_traceback(ValueError, msg, err) + except: # pylint: disable=bare-except + try: + return json.loads(obj) + except ValueError as err: + raise_with_traceback(ValueError, msg, err) diff --git a/sdk/core/azure-core/tests/test_messaging_cloud_event.py b/sdk/core/azure-core/tests/test_messaging_cloud_event.py index c2d76b5a002c..c6773747a86d 100644 --- a/sdk/core/azure-core/tests/test_messaging_cloud_event.py +++ b/sdk/core/azure-core/tests/test_messaging_cloud_event.py @@ -3,13 +3,94 @@ # Licensed under the MIT License. # ------------------------------------ import pytest -import json +import uuid import datetime from azure.core.messaging import CloudEvent from azure.core.utils._utils import _convert_to_isoformat +from azure.core.utils._messaging_shared import _get_json_content from azure.core.serialization import NULL +class MockQueueMessage(object): + def __init__(self, content=None): + self.id = uuid.uuid4() + self.inserted_on = datetime.datetime.now() + self.expires_on = datetime.datetime.now() + datetime.timedelta(days=100) + self.dequeue_count = 1 + self.content = content + self.pop_receipt = None + self.next_visible_on = None + +class MockServiceBusReceivedMessage(object): + def __init__(self, body=None, **kwargs): + self.body=body + self.application_properties=None + self.session_id=None + self.message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce' + self.content_type='application/cloudevents+json; charset=utf-8' + self.correlation_id=None + self.to=None + self.reply_to=None + self.reply_to_session_id=None + self.subject=None + self.time_to_live=datetime.timedelta(days=14) + self.partition_key=None + self.scheduled_enqueue_time_utc=None + self.auto_renew_error=None, + self.dead_letter_error_description=None + self.dead_letter_reason=None + self.dead_letter_source=None + self.delivery_count=13 + self.enqueued_sequence_number=0 + self.enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000) + self.expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000) + self.sequence_number=11219 + self.lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + +class MockEventhubData(object): + def __init__(self, body=None): + self._last_enqueued_event_properties = {} + self._sys_properties = None + if body is None: + raise ValueError("EventData cannot be None.") + + # Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData + self.body=body + self._raw_amqp_message = "some amqp data" + self.message_id = None + self.content_type = None + self.correlation_id = None + + +class MockBody(object): + def __init__(self, data=None): + self.data = data + + def __iter__(self): + return self + + def __next__(self): + if not self.data: + return """{"id":"f208feff-099b-4bda-a341-4afd0fa02fef","source":"https://egsample.dev/sampleevent","data":"ServiceBus","type":"Azure.Sdk.Sample","time":"2021-07-22T22:27:38.960209Z","specversion":"1.0"}""" + return self.data + + next = __next__ + +class MockEhBody(object): + def __init__(self, data=None): + self.data = data + + def __iter__(self): + return self + + def __next__(self): + if not self.data: + return b'[{"id":"f208feff-099b-4bda-a341-4afd0fa02fef","source":"https://egsample.dev/sampleevent","data":"Eventhub","type":"Azure.Sdk.Sample","time":"2021-07-22T22:27:38.960209Z","specversion":"1.0"}]' + return self.data + + next = __next__ + + # Cloud Event tests def test_cloud_event_constructor(): event = CloudEvent( @@ -469,4 +550,183 @@ def test_wrong_schema_raises_no_type(): "specversion":"1.0", } with pytest.raises(ValueError, match="The event does not conform to the cloud event spec https://github.com/cloudevents/spec. The `source` and `type` params are required."): - CloudEvent.from_dict(cloud_custom_dict) \ No newline at end of file + CloudEvent.from_dict(cloud_custom_dict) + +def test_get_bytes_storage_queue(): + cloud_storage_dict = """{ + "id":"a0517898-9fa4-4e70-b4a3-afda1dd68672", + "source":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}", + "data":{ + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + }, + "type":"Microsoft.Storage.BlobCreated", + "time":"2021-02-18T20:18:10.581147898Z", + "specversion":"1.0" + }""" + obj = MockQueueMessage(content=cloud_storage_dict) + + dict = _get_json_content(obj) + assert dict.get('data') == { + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + } + assert dict.get('specversion') == "1.0" + +def test_get_bytes_storage_queue_wrong_content(): + cloud_storage_string = u'This is a random string which must fail' + obj = MockQueueMessage(content=cloud_storage_string) + + with pytest.raises(ValueError, match="Failed to load JSON content from the object."): + _get_json_content(obj) + +def test_get_bytes_servicebus(): + obj = MockServiceBusReceivedMessage( + body=MockBody(), + message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce', + content_type='application/cloudevents+json; charset=utf-8', + time_to_live=datetime.timedelta(days=14), + delivery_count=13, + enqueued_sequence_number=0, + enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000), + expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000), + sequence_number=11219, + lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + ) + dict = _get_json_content(obj) + assert dict.get('data') == "ServiceBus" + assert dict.get('specversion') == '1.0' + +def test_get_bytes_servicebus_wrong_content(): + obj = MockServiceBusReceivedMessage( + body=MockBody(data="random string"), + message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce', + content_type='application/json; charset=utf-8', + time_to_live=datetime.timedelta(days=14), + delivery_count=13, + enqueued_sequence_number=0, + enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000), + expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000), + sequence_number=11219, + lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + ) + + with pytest.raises(ValueError, match="Failed to load JSON content from the object."): + _get_json_content(obj) + +def test_get_bytes_eventhubs(): + obj = MockEventhubData( + body=MockEhBody() + ) + dict = _get_json_content(obj) + assert dict.get('data') == 'Eventhub' + assert dict.get('specversion') == '1.0' + +def test_get_bytes_eventhubs_wrong_content(): + obj = MockEventhubData( + body=MockEhBody(data='random string') + ) + + with pytest.raises(ValueError, match="Failed to load JSON content from the object."): + dict = _get_json_content(obj) + +def test_get_bytes_random_obj(): + json_str = '{"id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", "source": "https://egtest.dev/cloudcustomevent", "data": {"team": "event grid squad"}, "type": "Azure.Sdk.Sample", "time": "2020-08-07T02:06:08.11969Z", "specversion": "1.0"}' + random_obj = { + "id":"de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", + "source":"https://egtest.dev/cloudcustomevent", + "data":{"team": "event grid squad"}, + "type":"Azure.Sdk.Sample", + "time":"2020-08-07T02:06:08.11969Z", + "specversion":"1.0" + } + + assert _get_json_content(json_str) == random_obj + +def test_from_json_sb(): + obj = MockServiceBusReceivedMessage( + body=MockBody(), + message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce', + content_type='application/cloudevents+json; charset=utf-8', + time_to_live=datetime.timedelta(days=14), + delivery_count=13, + enqueued_sequence_number=0, + enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000), + expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000), + sequence_number=11219, + lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + ) + event = CloudEvent.from_json(obj) + + assert event.id == "f208feff-099b-4bda-a341-4afd0fa02fef" + assert event.data == "ServiceBus" + +def test_from_json_eh(): + obj = MockEventhubData( + body=MockEhBody() + ) + event = CloudEvent.from_json(obj) + assert event.id == "f208feff-099b-4bda-a341-4afd0fa02fef" + assert event.data == "Eventhub" + +def test_from_json_storage(): + cloud_storage_dict = """{ + "id":"a0517898-9fa4-4e70-b4a3-afda1dd68672", + "source":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}", + "data":{ + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + }, + "type":"Microsoft.Storage.BlobCreated", + "time":"2021-02-18T20:18:10.581147898Z", + "specversion":"1.0" + }""" + obj = MockQueueMessage(content=cloud_storage_dict) + event = CloudEvent.from_json(obj) + assert event.data == { + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + } + + +def test_from_json(): + json_str = '{"id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", "source": "https://egtest.dev/cloudcustomevent", "data": {"team": "event grid squad"}, "type": "Azure.Sdk.Sample", "time": "2020-08-07T02:06:08.11969Z", "specversion": "1.0"}' + event = CloudEvent.from_json(json_str) + + assert event.data == {"team": "event grid squad"} + assert event.time.year == 2020 + assert event.time.month == 8 + assert event.time.day == 7 + assert event.time.hour == 2 diff --git a/sdk/eventgrid/azure-eventgrid/CHANGELOG.md b/sdk/eventgrid/azure-eventgrid/CHANGELOG.md index dae285744e73..81753dfc36ba 100644 --- a/sdk/eventgrid/azure-eventgrid/CHANGELOG.md +++ b/sdk/eventgrid/azure-eventgrid/CHANGELOG.md @@ -1,5 +1,13 @@ # Release History +## 4.5.0 (Unreleased) + +### Features Added + +- `EventGridEvent`'s `from_dict` method now accepts objects from servicebus, eventhubs and storage directly. +- Added a `from_json` method which now accepts storage QueueMessage, eventhub's EventData or ServiceBusMessage or simply json bytes to return an `EventGridEvent` + + ## 4.4.0 (2021-07-19) - Bumped `msrest` dependency to `0.6.21` to align with mgmt package. diff --git a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_messaging_shared.py b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_messaging_shared.py new file mode 100644 index 000000000000..bc9307d00580 --- /dev/null +++ b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_messaging_shared.py @@ -0,0 +1,41 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +# ========================================================================== +# This file contains duplicate code that is shared with azure-eventgrid. +# Both the files should always be identical. +# ========================================================================== + + + +import json +from azure.core.exceptions import raise_with_traceback + +def _get_json_content(obj): + """Event mixin to have methods that are common to different Event types + like CloudEvent, EventGridEvent etc. + """ + msg = "Failed to load JSON content from the object." + try: + # storage queue + return json.loads(obj.content) + except ValueError as err: + raise_with_traceback(ValueError, msg, err) + except AttributeError: + # eventhubs + try: + return json.loads(next(obj.body))[0] + except KeyError: + # servicebus + return json.loads(next(obj.body)) + except ValueError as err: + raise_with_traceback(ValueError, msg, err) + except: # pylint: disable=bare-except + try: + return json.loads(obj) + except ValueError as err: + raise_with_traceback(ValueError, msg, err) diff --git a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_models.py b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_models.py index 02f79cb2dcc4..ecc74505a9db 100644 --- a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_models.py +++ b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_models.py @@ -3,10 +3,11 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- # pylint:disable=protected-access -from typing import Any +from typing import Any, cast import datetime as dt import uuid from msrest.serialization import UTC +from ._messaging_shared import _get_json_content from ._generated.models import ( EventGridEvent as InternalEventGridEvent, ) @@ -96,3 +97,17 @@ def __repr__(self): return "EventGridEvent(subject={}, event_type={}, id={}, event_time={})".format( self.subject, self.event_type, self.id, self.event_time )[:1024] + + @classmethod + def from_json(cls, event): + # type: (Any) -> EventGridEvent + """ + Returns the deserialized EventGridEvent object when a json payload is provided. + :param event: The json string that should be converted into a EventGridEvent. This can also be + a storage QueueMessage, eventhub's EventData or ServiceBusMessage + :type event: object + :rtype: EventGridEvent + :raises ValueError: If the provided JSON is invalid. + """ + dict_event = _get_json_content(event) + return cast(EventGridEvent, EventGridEvent.from_dict(dict_event)) diff --git a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_version.py b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_version.py index b5234b1c4677..5638bcf9e648 100644 --- a/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_version.py +++ b/sdk/eventgrid/azure-eventgrid/azure/eventgrid/_version.py @@ -9,4 +9,4 @@ # regenerated. # -------------------------------------------------------------------------- -VERSION = "4.4.0" +VERSION = "4.5.0" diff --git a/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_eventhub.py b/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_eventhub.py index b372d4ea6752..9aee835f0bfc 100644 --- a/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_eventhub.py +++ b/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_eventhub.py @@ -24,11 +24,9 @@ CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] EVENTHUB_NAME = os.environ["EVENT_HUB_NAME"] - - def on_event(partition_context, event): - dict_event = CloudEvent.from_dict(json.loads(event)[0]) - print("data: {}\n".format(deserialized_event.data)) + dict_event = CloudEvent.from_json(event) + print("data: {}\n".format(dict_event.data)) consumer_client = EventHubConsumerClient.from_connection_string( conn_str=CONNECTION_STR, diff --git a/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_storage_queue.py b/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_storage_queue.py index 2b94f5816c2b..4db27504690e 100644 --- a/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_storage_queue.py +++ b/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_cloud_events_from_storage_queue.py @@ -27,10 +27,10 @@ payload = qsc.get_queue_client( queue=queue_name, message_decode_policy=BinaryBase64DecodePolicy() - ).peek_messages() + ).peek_messages(max_messages=32) ## deserialize payload into a list of typed Events - events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload] + events = [CloudEvent.from_json(msg) for msg in payload] for event in events: print(type(event)) ## CloudEvent diff --git a/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_eventgrid_events_from_service_bus_queue.py b/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_eventgrid_events_from_service_bus_queue.py index 0756043a9ac7..9889ea87c112 100644 --- a/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_eventgrid_events_from_service_bus_queue.py +++ b/sdk/eventgrid/azure-eventgrid/samples/consume_samples/consume_eventgrid_events_from_service_bus_queue.py @@ -30,7 +30,7 @@ payload = sb_client.get_queue_receiver(queue_name).receive_messages() ## deserialize payload into a list of typed Events - events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload] + events = [EventGridEvent.from_json(msg) for msg in payload] for event in events: print(type(event)) ## EventGridEvent diff --git a/sdk/eventgrid/azure-eventgrid/tests/test_eg_event_get_bytes.py b/sdk/eventgrid/azure-eventgrid/tests/test_eg_event_get_bytes.py new file mode 100644 index 000000000000..1ae80ecde2c9 --- /dev/null +++ b/sdk/eventgrid/azure-eventgrid/tests/test_eg_event_get_bytes.py @@ -0,0 +1,266 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import datetime +import pytest +import uuid +from msrest.serialization import UTC +from azure.eventgrid._messaging_shared import _get_json_content +from azure.eventgrid import EventGridEvent + +class MockQueueMessage(object): + def __init__(self, content=None): + self.id = uuid.uuid4() + self.inserted_on = datetime.datetime.now() + self.expires_on = datetime.datetime.now() + datetime.timedelta(days=100) + self.dequeue_count = 1 + self.content = content + self.pop_receipt = None + self.next_visible_on = None + +class MockServiceBusReceivedMessage(object): + def __init__(self, body=None, **kwargs): + self.body=body + self.application_properties=None + self.session_id=None + self.message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce' + self.content_type='application/cloudevents+json; charset=utf-8' + self.correlation_id=None + self.to=None + self.reply_to=None + self.reply_to_session_id=None + self.subject=None + self.time_to_live=datetime.timedelta(days=14) + self.partition_key=None + self.scheduled_enqueue_time_utc=None + self.auto_renew_error=None, + self.dead_letter_error_description=None + self.dead_letter_reason=None + self.dead_letter_source=None + self.delivery_count=13 + self.enqueued_sequence_number=0 + self.enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000) + self.expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000) + self.sequence_number=11219 + self.lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + +class MockEventhubData(object): + def __init__(self, body=None): + self._last_enqueued_event_properties = {} + self._sys_properties = None + if body is None: + raise ValueError("EventData cannot be None.") + + # Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData + self.body=body + self._raw_amqp_message = "some amqp data" + self.message_id = None + self.content_type = None + self.correlation_id = None + + +class MockBody(object): + def __init__(self, data=None): + self.data = data + + def __iter__(self): + return self + + def __next__(self): + if not self.data: + return """{"id":"f208feff-099b-4bda-a341-4afd0fa02fef","subject":"https://egsample.dev/sampleevent","data":"ServiceBus","event_type":"Azure.Sdk.Sample","event_time":"2021-07-22T22:27:38.960209Z","data_version":"1.0"}""" + return self.data + + next = __next__ + + +class MockEhBody(object): + def __init__(self, data=None): + self.data = data + + def __iter__(self): + return self + + def __next__(self): + if not self.data: + return b'[{"id":"f208feff-099b-4bda-a341-4afd0fa02fef","subject":"https://egsample.dev/sampleevent","data":"Eventhub","event_type":"Azure.Sdk.Sample","event_time":"2021-07-22T22:27:38.960209Z","data_version":"1.0"}]' + return self.data + + next = __next__ + +def test_get_bytes_storage_queue(): + cloud_storage_dict = """{ + "id":"a0517898-9fa4-4e70-b4a3-afda1dd68672", + "subject":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}", + "data":{ + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + }, + "event_type":"Microsoft.Storage.BlobCreated", + "event_time":"2021-02-18T20:18:10.581147898Z", + "data_version":"1.0" + }""" + obj = MockQueueMessage(content=cloud_storage_dict) + + dict = _get_json_content(obj) + assert dict.get('data') == { + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + } + assert dict.get('data_version') == "1.0" + +def test_get_bytes_storage_queue_wrong_content(): + string = u'This is a random string which must fail' + obj = MockQueueMessage(content=string) + + with pytest.raises(ValueError, match="Failed to load JSON content from the object."): + _get_json_content(obj) + +def test_get_bytes_servicebus(): + obj = MockServiceBusReceivedMessage( + body=MockBody(), + message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce', + content_type='application/cloudevents+json; charset=utf-8', + time_to_live=datetime.timedelta(days=14), + delivery_count=13, + enqueued_sequence_number=0, + enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000), + expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000), + sequence_number=11219, + lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + ) + dict = _get_json_content(obj) + assert dict.get('data') == "ServiceBus" + assert dict.get('data_version') == '1.0' + +def test_get_bytes_servicebus_wrong_content(): + obj = MockServiceBusReceivedMessage( + body=MockBody(data='random'), + message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce', + content_type='application/json; charset=utf-8', + time_to_live=datetime.timedelta(days=14), + delivery_count=13, + enqueued_sequence_number=0, + enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000), + expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000), + sequence_number=11219, + lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + ) + with pytest.raises(ValueError, match="Failed to load JSON content from the object."): + dict = _get_json_content(obj) + + +def test_get_bytes_eventhubs(): + obj = MockEventhubData( + body=MockEhBody() + ) + dict = _get_json_content(obj) + assert dict.get('data') == 'Eventhub' + assert dict.get('data_version') == '1.0' + +def test_get_bytes_eventhubs_wrong_content(): + obj = MockEventhubData( + body=MockEhBody(data='random string') + ) + + with pytest.raises(ValueError, match="Failed to load JSON content from the object."): + dict = _get_json_content(obj) + + +def test_get_bytes_random_obj(): + json_str = '{"id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", "subject": "https://egtest.dev/cloudcustomevent", "data": {"team": "event grid squad"}, "event_type": "Azure.Sdk.Sample", "event_time": "2020-08-07T02:06:08.11969Z", "data_version": "1.0"}' + random_obj = { + "id":"de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", + "subject":"https://egtest.dev/cloudcustomevent", + "data":{"team": "event grid squad"}, + "event_type":"Azure.Sdk.Sample", + "event_time":"2020-08-07T02:06:08.11969Z", + "data_version":"1.0", + } + + assert _get_json_content(json_str) == random_obj + +def test_from_json_sb(): + obj = MockServiceBusReceivedMessage( + body=MockBody(), + message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce', + content_type='application/cloudevents+json; charset=utf-8', + time_to_live=datetime.timedelta(days=14), + delivery_count=13, + enqueued_sequence_number=0, + enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000), + expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000), + sequence_number=11219, + lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13' + ) + event = EventGridEvent.from_json(obj) + + assert event.id == "f208feff-099b-4bda-a341-4afd0fa02fef" + assert event.data == "ServiceBus" + +def test_from_json_eh(): + obj = MockEventhubData( + body=MockEhBody() + ) + event = EventGridEvent.from_json(obj) + assert event.id == "f208feff-099b-4bda-a341-4afd0fa02fef" + assert event.data == "Eventhub" + +def test_from_json_storage(): + eg_storage_dict = """{ + "id":"a0517898-9fa4-4e70-b4a3-afda1dd68672", + "subject":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}", + "data":{ + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + }, + "event_type":"Microsoft.Storage.BlobCreated", + "event_time":"2021-02-18T20:18:10.581147898Z", + "data_version":"1.0" + }""" + obj = MockQueueMessage(content=eg_storage_dict) + event = EventGridEvent.from_json(obj) + assert event.data == { + "api":"PutBlockList", + "client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760", + "request_id":"831e1650-001e-001b-66ab-eeb76e000000", + "e_tag":"0x8D4BCC2E4835CD0", + "content_type":"application/octet-stream", + "content_length":524288, + "blob_type":"BlockBlob", + "url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob", + "sequencer":"00000000000004420000000000028963", + "storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"} + } + + +def test_from_json(): + json_str = '{"id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", "subject": "https://egtest.dev/cloudcustomevent", "data": {"team": "event grid squad"}, "event_type": "Azure.Sdk.Sample", "event_time": "2020-08-07T02:06:08.11969Z", "data_version": "1.0"}' + event = EventGridEvent.from_json(json_str) + assert event.data == {"team": "event grid squad"} + assert event.event_time == datetime.datetime(2020, 8, 7, 2, 6, 8, 119690, UTC())