Skip to content

Commit

Permalink
Send method changes (#16318)
Browse files Browse the repository at this point in the history
* commit -1

* async

* lint

* Update sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py

* Update sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py

* fix test

* Update sdk/eventgrid/azure-eventgrid/tests/test_eg_publisher_client_async.py

Co-authored-by: swathipil <[email protected]>

Co-authored-by: swathipil <[email protected]>
  • Loading branch information
Rakshith Bhyravabhotla and swathipil authored Jan 27, 2021
1 parent 1b10b4e commit db4ca2e
Show file tree
Hide file tree
Showing 17 changed files with 85 additions and 83 deletions.
1 change: 0 additions & 1 deletion sdk/eventgrid/azure-eventgrid/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
- `decode_cloud_event` is renamed to `deserialize_cloud_events`.
- `decode_eventgrid_event` is renamed to `deserialize_eventgrid_events`.
- The system events now exist in the `azure.eventgrid.systemevents` namespace instead of `azure.eventgrid.models` namespace.
- The `send` method in the `EventGridPubliserClient` is now replaced by the `send_events`.
- `topic_hostname` is renamed to `endpoint` in the `EventGridPublisherClient`.
- `data` is now a required param for `CloudEvent`.
- `azure.eventgrid.generate_shared_access_signature` method is now renamed to `generate_sas`.
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventgrid/azure-eventgrid/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ event = EventGridEvent(
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send_events(event)
client.send(event)
```

### Send a Cloud Event
Expand All @@ -120,7 +120,7 @@ event = CloudEvent(
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send_events(event)
client.send(event)
```

### Consume an Event Grid Event
Expand Down
8 changes: 8 additions & 0 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def _is_cloud_event(event):
except TypeError:
return False

def _is_eventgrid_event(event):
# type: (Any) -> bool
required = ('subject', 'event_type', 'data', 'data_version', 'id', 'event_time')
try:
return all([prop in event for prop in required])
except TypeError:
return False

def _eventgrid_data_typecheck(event):
try:
data = event.get('data')
Expand Down
27 changes: 14 additions & 13 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
_get_endpoint_only_fqdn,
_get_authentication_policy,
_is_cloud_event,
_is_eventgrid_event,
_eventgrid_data_typecheck
)
from ._generated._event_grid_publisher_client import EventGridPublisherClient as EventGridPublisherClientImpl
from ._policies import CloudEventDistributedTracingPolicy
from ._version import VERSION
from ._generated.models import CloudEvent as InternalCloudEvent, EventGridEvent as InternalEventGridEvent
from ._generated.models import CloudEvent as InternalCloudEvent

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
Expand Down Expand Up @@ -98,11 +99,14 @@ def _policies(credential, **kwargs):
return policies

@distributed_trace
def send_events(self, events, **kwargs):
def send(self, events, **kwargs):
# type: (SendType, Any) -> None
"""Sends event data to topic hostname specified during client initialization.
Multiple events can be published at once by seding a list of events. It is very
inefficient to loop the send method for each event instead of just using a list
and we highly recommend against it.
:param events: A list or an instance of CloudEvent/EventGridEvent/CustomEvent to be sent.
:param events: A list of CloudEvent/EventGridEvent/CustomEvent to be sent.
:type events: SendType
:keyword str content_type: The type of content to be used to send the events.
Has default value "application/json; charset=utf-8" for EventGridEvents,
Expand All @@ -113,27 +117,24 @@ def send_events(self, events, **kwargs):
if not isinstance(events, list):
events = cast(ListEventType, [events])

if all(isinstance(e, CloudEvent) for e in events) or all(_is_cloud_event(e) for e in events):
if isinstance(events[0], CloudEvent) or _is_cloud_event(events[0]):
try:
events = [cast(CloudEvent, e)._to_generated(**kwargs) for e in events] # pylint: disable=protected-access
except AttributeError:
pass # means it's a dictionary
kwargs.setdefault("content_type", "application/cloudevents-batch+json; charset=utf-8")
self._client.publish_cloud_event_events(
return self._client.publish_cloud_event_events(
self._endpoint,
cast(List[InternalCloudEvent], events),
**kwargs
)
elif all(isinstance(e, EventGridEvent) for e in events) or all(isinstance(e, dict) for e in events):
kwargs.setdefault("content_type", "application/json; charset=utf-8")
kwargs.setdefault("content_type", "application/json; charset=utf-8")
if isinstance(events[0], EventGridEvent) or _is_eventgrid_event(events[0]):
for event in events:
_eventgrid_data_typecheck(event)
self._client.publish_events(self._endpoint, cast(List[InternalEventGridEvent], events), **kwargs)
elif all(isinstance(e, CustomEvent) for e in events):
serialized_events = [dict(e) for e in events] # type: ignore
self._client.publish_custom_event_events(self._endpoint, cast(List, serialized_events), **kwargs)
else:
raise ValueError("Event schema is not correct.")
elif isinstance(events[0], CustomEvent):
events = [dict(e) for e in events] # type: ignore
return self._client.publish_custom_event_events(self._endpoint, cast(List, events), **kwargs)

def close(self):
# type: () -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
_get_endpoint_only_fqdn,
_get_authentication_policy,
_is_cloud_event,
_is_eventgrid_event,
_eventgrid_data_typecheck
)
from .._generated.aio import EventGridPublisherClient as EventGridPublisherClientAsync
from .._generated.models import CloudEvent as InternalCloudEvent, EventGridEvent as InternalEventGridEvent
from .._generated.models import CloudEvent as InternalCloudEvent
from .._version import VERSION

SendType = Union[
Expand Down Expand Up @@ -98,13 +99,16 @@ def _policies(
return policies

@distributed_trace_async
async def send_events(
async def send(
self,
events: SendType,
**kwargs: Any) -> None:
"""Sends event data to topic hostname specified during client initialization.
Multiple events can be published at once by seding a list of events. It is very
inefficient to loop the send method for each event instead of just using a list
and we highly recommend against it.
:param events: A list or an instance of CloudEvent/EventGridEvent/CustomEvent to be sent.
:param events: A list of CloudEvent/EventGridEvent/CustomEvent to be sent.
:type events: SendType
:keyword str content_type: The type of content to be used to send the events.
Has default value "application/json; charset=utf-8" for EventGridEvents,
Expand All @@ -115,37 +119,24 @@ async def send_events(
if not isinstance(events, list):
events = cast(ListEventType, [events])

if all(isinstance(e, CloudEvent) for e in events) or all(_is_cloud_event(e) for e in events):
if isinstance(events[0], CloudEvent) or _is_cloud_event(events[0]):
try:
events = [
cast(CloudEvent, e)._to_generated(**kwargs) for e in events # pylint: disable=protected-access
]
events = [cast(CloudEvent, e)._to_generated(**kwargs) for e in events] # pylint: disable=protected-access
except AttributeError:
pass # means it's a dictionary
kwargs.setdefault("content_type", "application/cloudevents-batch+json; charset=utf-8")
await self._client.publish_cloud_event_events(
return await self._client.publish_cloud_event_events(
self._endpoint,
cast(List[InternalCloudEvent], events),
**kwargs
)
elif all(isinstance(e, EventGridEvent) for e in events) or all(isinstance(e, dict) for e in events):
kwargs.setdefault("content_type", "application/json; charset=utf-8")
kwargs.setdefault("content_type", "application/json; charset=utf-8")
if isinstance(events[0], EventGridEvent) or _is_eventgrid_event(events[0]):
for event in events:
_eventgrid_data_typecheck(event)
await self._client.publish_events(
self._endpoint,
cast(List[InternalEventGridEvent], events),
**kwargs
)
elif all(isinstance(e, CustomEvent) for e in events):
serialized_events = [dict(e) for e in events] # type: ignore
await self._client.publish_custom_event_events(
self._endpoint,
cast(List, serialized_events),
**kwargs
)
else:
raise ValueError("Event schema is not correct.")
elif isinstance(events[0], CustomEvent):
events = [dict(e) for e in events] # type: ignore
return await self._client.publish_custom_event_events(self._endpoint, cast(List, events), **kwargs)

async def __aenter__(self) -> "EventGridPublisherClient":
await self._client.__aenter__()
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventgrid/azure-eventgrid/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ The `publish_events` API is replaced with `send` in v2.0. Additionally, `send` A

| In v1.3 | Equivalent in v2.0 | Sample |
|---|---|---|
|`EventGridClient(credentials).publish_events(topic_hostname, events)`|`EventGridPublisherClient(endpoint, credential).send_events(events)`|[Sample for client construction](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventgrid/azure-eventgrid/samples/champion_scenarios/cs5_publish_events_using_cloud_events_1.0_schema.py)|
|`EventGridClient(credentials).publish_events(topic_hostname, events)`|`EventGridPublisherClient(endpoint, credential).send(events)`|[Sample for client construction](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventgrid/azure-eventgrid/samples/champion_scenarios/cs5_publish_events_using_cloud_events_1.0_schema.py)|

### Consuming Events

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
credential = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential)

client.send_events([
client.send([
EventGridEvent(
event_type="Contoso.Items.ItemReceived",
data={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
credential = AzureSasCredential(signature)
client = EventGridPublisherClient(endpoint, credential)

client.send_events([
client.send([
EventGridEvent(
event_type="Contoso.Items.ItemReceived",
data={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
credential = AzureKeyCredential(domain_key)
client = EventGridPublisherClient(domain_hostname, credential)

client.send_events([
client.send([
EventGridEvent(
topic="MyCustomDomainTopic1",
event_type="Contoso.Items.ItemReceived",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
credential = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential)

client.send_events([
client.send([
CloudEvent(
type="Contoso.Items.ItemReceived",
source="/contoso/items",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def publish_event():
event_list.append(event)

# publish list of events
client.send_events(event_list)
client.send(event_list)
print("Batch of size {} published".format(len(event_list)))
time.sleep(randint(1, 5))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def publish_event():
event_list.append(event)

# publish list of events
client.send_events(event_list)
client.send(event_list)
print("Batch of size {} published".format(len(event_list)))
time.sleep(randint(1, 5))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def publish_event():
event_list.append(event)

# publish list of events
client.send_events(event_list)
client.send(event_list)
print("Batch of size {} published".format(len(event_list)))
time.sleep(randint(1, 5))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def publish_event():
event_list.append(event)

# publish list of events
client.send_events(event_list)
client.send(event_list)
print("Batch of size {} published".format(len(event_list)))
time.sleep(randint(1, 5))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def publish_event():
event_list.append(event)

# publish list of events
client.send_events(event_list)
client.send(event_list)
print("Batch of size {} published".format(len(event_list)))
time.sleep(randint(1, 5))

Expand Down
Loading

0 comments on commit db4ca2e

Please sign in to comment.