From 384e802400ba9051a7b88802e3e8f2f021f8a737 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 21 May 2019 06:45:30 -0700 Subject: [PATCH] EventHubs track2 starter (#5330) * Move to under sdk * Remove policies * Remove debugging files * Rename Offset to EventPosition * make tests a namespace package * Revised test receive for new code * Revised test send for track two * Update async code from sync * Revise async receive and send live test for track2 * Use uamqp 1.2 * Resolve code review feedback * add queue_message to async sender * send_batch receives both list and iterator --- .../azure/eventhub/__init__.py | 22 +- .../azure/eventhub/aio/__init__.py | 9 + .../eventhub/aio/event_hubs_client_async.py | 238 ++++++++++ .../azure/eventhub/aio/receiver_async.py | 378 ++++++++++++++++ .../azure/eventhub/aio/sender_async.py | 405 +++++++++++++++++ .../azure-eventhubs/azure/eventhub/client.py | 416 ++---------------- .../azure/eventhub/client_abstract.py | 311 +++++++++++++ .../azure-eventhubs/azure/eventhub/common.py | 58 ++- .../azure/eventhub/configuration.py | 21 + .../azure/eventhub/receiver.py | 51 ++- .../azure-eventhubs/azure/eventhub/sender.py | 143 +++--- .../eventprocessorhost/eh_partition_pump.py | 7 +- .../eventprocessorhost/partition_manager.py | 5 +- sdk/eventhub/azure-eventhubs/conftest.py | 33 +- sdk/eventhub/azure-eventhubs/setup.py | 2 +- .../azure-eventhubs/tests/__init__.py | 0 .../tests/asynctests/test_receive_async.py | 236 ++++------ .../tests/asynctests/test_send_async.py | 139 ++---- .../azure-eventhubs/tests/test_receive.py | 182 +++----- .../azure-eventhubs/tests/test_send.py | 138 ++---- 20 files changed, 1863 insertions(+), 931 deletions(-) create mode 100644 sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py create mode 100644 sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py create mode 100644 sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py create mode 100644 sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py create mode 100644 sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py create mode 100644 sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py create mode 100644 sdk/eventhub/azure-eventhubs/tests/__init__.py diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 7067761d5ef6..e2bcc43ed877 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -5,15 +5,21 @@ __version__ = "1.3.1" -from azure.eventhub.common import EventData, EventHubError, Offset +from azure.eventhub.common import EventData, EventHubError, EventPosition from azure.eventhub.client import EventHubClient from azure.eventhub.sender import Sender from azure.eventhub.receiver import Receiver +from uamqp.constants import MessageSendResult +from uamqp.constants import TransportType + +__all__ = [ + "EventData", + "EventHubError", + "EventPosition", + "EventHubClient", + "Sender", + "Receiver", + "MessageSendResult", + "TransportType", +] -try: - from azure.eventhub.async_ops import ( - EventHubClientAsync, - AsyncSender, - AsyncReceiver) -except (ImportError, SyntaxError): - pass # Python 3 async features not supported diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py new file mode 100644 index 000000000000..020392000d1f --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py @@ -0,0 +1,9 @@ +from .event_hubs_client_async import EventHubClient +from .receiver_async import Receiver +from .sender_async import Sender + +__all__ = [ + "EventHubClient", + "Receiver", + "Sender" +] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py new file mode 100644 index 000000000000..275f76f6ee62 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/event_hubs_client_async.py @@ -0,0 +1,238 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import logging +import asyncio +import time +import datetime +import functools + +from uamqp import authentication, constants, types, errors +from uamqp import ( + Message, + AMQPClientAsync, +) + +from azure.eventhub.common import parse_sas_token +from azure.eventhub import ( + EventHubError) +from ..client_abstract import EventHubClientAbstract + +from .sender_async import Sender +from .receiver_async import Receiver + + +log = logging.getLogger(__name__) + + +class EventHubClient(EventHubClientAbstract): + """ + The EventHubClient class defines a high level interface for asynchronously + sending events to and receiving events from the Azure Event Hubs service. + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START create_eventhub_client_async] + :end-before: [END create_eventhub_client_async] + :language: python + :dedent: 4 + :caption: Create a new instance of the Event Hub client async. + + """ + + def _create_auth(self, username=None, password=None): + """ + Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate + the session. + + :param username: The name of the shared access policy. + :type username: str + :param password: The shared access key. + :type password: str + """ + http_proxy = self.config.http_proxy + transport_type = self.config.transport_type + auth_timeout = self.config.auth_timeout + if self.aad_credential and self.sas_token: + raise ValueError("Can't have both sas_token and aad_credential") + + elif self.aad_credential: + get_jwt_token = functools.partial(self.aad_credential.get_token, ['https://eventhubs.azure.net//.default']) + # TODO: should use async aad_credential.get_token. Check with Charles for async identity api + return authentication.JWTTokenAsync(self.auth_uri, self.auth_uri, + get_jwt_token, http_proxy=http_proxy, + transport_type=transport_type) + elif self.sas_token: + token = self.sas_token() if callable(self.sas_token) else self.sas_token + try: + expiry = int(parse_sas_token(token)['se']) + except (KeyError, TypeError, IndexError): + raise ValueError("Supplied SAS token has no valid expiry value.") + return authentication.SASTokenAsync( + self.auth_uri, self.auth_uri, token, + expires_at=expiry, + timeout=auth_timeout, + http_proxy=http_proxy, + transport_type=transport_type) + + username = username or self._auth_config['username'] + password = password or self._auth_config['password'] + if "@sas.root" in username: + return authentication.SASLPlain( + self.address.hostname, username, password, http_proxy=http_proxy, transport_type=transport_type) + return authentication.SASTokenAsync.from_shared_access_key( + self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type) + + async def get_eventhub_information(self): + """ + Get details on the specified EventHub async. + + :rtype: dict + """ + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} + try: + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug) + await mgmt_client.open_async() + mgmt_msg = Message(application_properties={'name': self.eh_name}) + response = await mgmt_client.mgmt_request_async( + mgmt_msg, + constants.READ_OPERATION, + op_type=b'com.microsoft:eventhub', + status_code_field=b'status-code', + description_fields=b'status-description') + eh_info = response.get_data() + output = {} + if eh_info: + output['name'] = eh_info[b'name'].decode('utf-8') + output['type'] = eh_info[b'type'].decode('utf-8') + output['created_at'] = datetime.datetime.fromtimestamp(float(eh_info[b'created_at'])/1000) + output['partition_count'] = eh_info[b'partition_count'] + output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']] + return output + finally: + await mgmt_client.close_async() + + def create_receiver( + self, consumer_group, partition, offset=None, epoch=None, operation=None, + prefetch=None, keep_alive=None, auto_reconnect=None, loop=None): + """ + Add an async receiver to the client for a particular consumer group and partition. + + :param consumer_group: The name of the consumer group. + :type consumer_group: str + :param partition: The ID of the partition. + :type partition: str + :param offset: The offset from which to start receiving. + :type offset: ~azure.eventhub.common.Offset + :param prefetch: The message prefetch count of the receiver. Default is 300. + :type prefetch: int + :operation: An optional operation to be appended to the hostname in the source URL. + The value must start with `/` character. + :type operation: str + :rtype: ~azure.eventhub.aio.receiver_async.ReceiverAsync + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START create_eventhub_client_async_receiver] + :end-before: [END create_eventhub_client_async_receiver] + :language: python + :dedent: 4 + :caption: Add an async receiver to the client for a particular consumer group and partition. + + """ + keep_alive = self.config.keep_alive if keep_alive is None else keep_alive + auto_reconnect = self.config.auto_reconnect if auto_reconnect is None else auto_reconnect + prefetch = self.config.prefetch if prefetch is None else prefetch + + path = self.address.path + operation if operation else self.address.path + source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( + self.address.hostname, path, consumer_group, partition) + handler = Receiver( + self, source_url, offset=offset, epoch=epoch, prefetch=prefetch, keep_alive=keep_alive, + auto_reconnect=auto_reconnect, loop=loop) + return handler + + def create_epoch_receiver( + self, consumer_group, partition, epoch, prefetch=300, operation=None, loop=None): + """ + Add an async receiver to the client with an epoch value. Only a single epoch receiver + can connect to a partition at any given time - additional epoch receivers must have + a higher epoch value or they will be rejected. If a 2nd epoch receiver has + connected, the first will be closed. + + :param consumer_group: The name of the consumer group. + :type consumer_group: str + :param partition: The ID of the partition. + :type partition: str + :param epoch: The epoch value for the receiver. + :type epoch: int + :param prefetch: The message prefetch count of the receiver. Default is 300. + :type prefetch: int + :operation: An optional operation to be appended to the hostname in the source URL. + The value must start with `/` character. + :type operation: str + :rtype: ~azure.eventhub.aio.receiver_async.ReceiverAsync + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START create_eventhub_client_async_epoch_receiver] + :end-before: [END create_eventhub_client_async_epoch_receiver] + :language: python + :dedent: 4 + :caption: Add an async receiver to the client with an epoch value. + + """ + return self.create_receiver(consumer_group, partition, epoch=epoch, prefetch=prefetch, + operation=operation, loop=loop) + + def create_sender( + self, partition=None, operation=None, send_timeout=None, keep_alive=None, auto_reconnect=None, loop=None): + """ + Add an async sender to the client to send ~azure.eventhub.common.EventData object + to an EventHub. + + :param partition: Optionally specify a particular partition to send to. + If omitted, the events will be distributed to available partitions via + round-robin. + :type partition: str + :operation: An optional operation to be appended to the hostname in the target URL. + The value must start with `/` character. + :type operation: str + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: int + :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during + periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not + be pinged. + :type keep_alive: int + :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + Default value is `True`. + :type auto_reconnect: bool + :rtype: ~azure.eventhub.aio.sender_async.SenderAsync + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START create_eventhub_client_async_sender] + :end-before: [END create_eventhub_client_async_sender] + :language: python + :dedent: 4 + :caption: Add an async sender to the client to + send ~azure.eventhub.common.EventData object to an EventHub. + + """ + target = "amqps://{}{}".format(self.address.hostname, self.address.path) + if operation: + target = target + operation + send_timeout = self.config.send_timeout if send_timeout is None else send_timeout + keep_alive = self.config.keep_alive if keep_alive is None else keep_alive + auto_reconnect = self.config.auto_reconnect if auto_reconnect is None else auto_reconnect + + handler = Sender( + self, target, partition=partition, send_timeout=send_timeout, + keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop) + return handler diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py new file mode 100644 index 000000000000..aafe4c8dcd20 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/receiver_async.py @@ -0,0 +1,378 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio +import uuid +import logging + +from uamqp import errors, types +from uamqp import ReceiveClientAsync, Source + +from azure.eventhub import EventHubError, EventData +from azure.eventhub.common import _error_handler + +log = logging.getLogger(__name__) + + +class Receiver(object): + """ + Implements the async API of a Receiver. + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START create_eventhub_client_async_receiver_instance] + :end-before: [END create_eventhub_client_async_receiver_instance] + :language: python + :dedent: 4 + :caption: Create a new instance of the Async Receiver. + + """ + timeout = 0 + _epoch = b'com.microsoft:epoch' + + def __init__( # pylint: disable=super-init-not-called + self, client, source, offset=None, prefetch=300, epoch=None, + keep_alive=None, auto_reconnect=False, loop=None): + """ + Instantiate an async receiver. + + :param client: The parent EventHubClientAsync. + :type client: ~azure.eventhub.aio.EventHubClientAsync + :param source: The source EventHub from which to receive events. + :type source: ~uamqp.address.Source + :param prefetch: The number of events to prefetch from the service + for processing. Default is 300. + :type prefetch: int + :param epoch: An optional epoch value. + :type epoch: int + :param loop: An event loop. + """ + self.loop = loop or asyncio.get_event_loop() + self.running = False + self.client = client + self.source = source + self.offset = offset + self.prefetch = prefetch + self.epoch = epoch + self.keep_alive = keep_alive + self.auto_reconnect = auto_reconnect + self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) + self.reconnect_backoff = 1 + self.redirected = None + self.error = None + self.properties = None + partition = self.source.split('/')[-1] + self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + if epoch: + self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} + self._handler = ReceiveClientAsync( + source, + auth=self.client.get_auth(), + debug=self.client.config.network_tracing, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), + loop=self.loop) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close(exc_val) + + def __aiter__(self): + self.messages_iter = self._handler.receive_messages_iter_async() + return self + + async def __anext__(self): + while True: + try: + message = await self.messages_iter.__anext__() + event_data = EventData(message=message) + self.offset = event_data.offset + return event_data + except (errors.TokenExpired, errors.AuthenticationException): + log.info("Receiver disconnected due to token error. Attempting reconnect.") + await self.reconnect() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("Receiver detached. Attempting reconnect.") + await self.reconnect() + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("Receiver detached. Attempting reconnect.") + await self.reconnect() + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Receive failed: {}".format(e)) + await self.close(exception=error) + raise error + + async def open(self): + """ + Open the Receiver using the supplied conneciton. + If the handler has previously been redirected, the redirect + context will be used to create a new handler before opening it. + + :param connection: The underlying client shared connection. + :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_receiver_open] + :end-before: [END eventhub_client_async_receiver_open] + :language: python + :dedent: 4 + :caption: Open the Receiver using the supplied conneciton. + + """ + # pylint: disable=protected-access + self.running = True + if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + self._handler = ReceiveClientAsync( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self._handler.client_ready_async(): + await asyncio.sleep(0.05) + + async def _reconnect(self): # pylint: disable=too-many-statements + # pylint: disable=protected-access + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + await self._handler.close_async() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClientAsync( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), + loop=self.loop) + try: + await self._handler.open_async() + while not await self._handler.client_ready_async(): + await asyncio.sleep(0.05) + return True + except errors.TokenExpired as shutdown: + log.info("AsyncReceiver disconnected due to token expiry. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + return False + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + return False + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.AMQPConnectionError as shutdown: + if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect: + log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.") + return False + log.info("AsyncReceiver connection error (%r). Shutting down.", shutdown) + error = EventHubError(str(shutdown)) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Receiver reconnect failed: {}".format(e)) + await self.close(exception=error) + raise error + + async def reconnect(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + while not await self._reconnect_async(): + await asyncio.sleep(self.reconnect_backoff) + + async def close(self, exception=None): + """ + Close down the handler. If the handler has already closed, + this will be a no op. An optional exception can be passed in to + indicate that the handler was shutdown due to error. + + :param exception: An optional exception if the handler is closing + due to an error. + :type exception: Exception + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_receiver_close] + :end-before: [END eventhub_client_async_receiver_close] + :language: python + :dedent: 4 + :caption: Close down the handler. + + """ + self.running = False + if self.error: + return + if isinstance(exception, errors.LinkRedirect): + self.redirected = exception + elif isinstance(exception, EventHubError): + self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = EventHubError(str(exception), exception) + elif exception: + self.error = EventHubError(str(exception)) + else: + self.error = EventHubError("This receive handler is now closed.") + await self._handler.close_async() + + async def receive(self, max_batch_size=None, timeout=None): + """ + Receive events asynchronously from the EventHub. + + :param max_batch_size: Receive a batch of events. Batch size will + be up to the maximum specified, but will return as soon as service + returns no new events. If combined with a timeout and no events are + retrieve before the time, the result will be empty. If no batch + size is supplied, the prefetch size will be the maximum. + :type max_batch_size: int + :rtype: list[~azure.eventhub.common.EventData] + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_receive] + :end-before: [END eventhub_client_async_receive] + :language: python + :dedent: 4 + :caption: Sends an event data and asynchronously waits + until acknowledgement is received or operation times out. + + """ + if self.error: + raise self.error + if not self.running: + await self.open() + data_batch = [] + try: + timeout_ms = 1000 * timeout if timeout else 0 + message_batch = await self._handler.receive_message_batch_async( + max_batch_size=max_batch_size, + timeout=timeout_ms) + for message in message_batch: + event_data = EventData(message=message) + self.offset = event_data.offset + data_batch.append(event_data) + return data_batch + except (errors.TokenExpired, errors.AuthenticationException): + log.info("AsyncReceiver disconnected due to token error. Attempting reconnect.") + await self.reconnect() + return data_batch + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + await self.reconnect() + return data_batch + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + await self.reconnect() + return data_batch + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Receive failed: {}".format(e)) + await self.close(exception=error) + raise error + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close(exc_val) + + def __aiter__(self): + self.messages_iter = self._handler.receive_messages_iter_async() + return self + + async def __anext__(self): + while True: + try: + message = await self.messages_iter.__anext__() + event_data = EventData(message=message) + self.offset = event_data.offset + return event_data + except (errors.TokenExpired, errors.AuthenticationException): + log.info("Receiver disconnected due to token error. Attempting reconnect.") + await self.reconnect() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("Receiver detached. Attempting reconnect.") + await self.reconnect() + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("Receiver detached. Attempting reconnect.") + await self.reconnect() + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Receive failed: {}".format(e)) + await self.close(exception=error) + raise error diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py new file mode 100644 index 000000000000..0ef46d519579 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py @@ -0,0 +1,405 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import uuid +import asyncio +import logging + +from uamqp import constants, errors +from uamqp import SendClientAsync + +from azure.eventhub import MessageSendResult +from azure.eventhub import EventHubError +from azure.eventhub.common import _error_handler, _BatchSendEventData + +log = logging.getLogger(__name__) + + +class Sender(object): + """ + Implements the async API of a Sender. + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START create_eventhub_client_async_sender_instance] + :end-before: [END create_eventhub_client_async_sender_instance] + :language: python + :dedent: 4 + :caption: Create a new instance of the Async Sender. + + """ + + def __init__( # pylint: disable=super-init-not-called + self, client, target, partition=None, send_timeout=60, + keep_alive=None, auto_reconnect=False, loop=None): + """ + Instantiate an EventHub event SenderAsync handler. + + :param client: The parent EventHubClientAsync. + :type client: ~azure.eventhub.aio.EventHubClientAsync + :param target: The URI of the EventHub to send to. + :type target: str + :param partition: The specific partition ID to send to. Default is `None`, in which case the service + will assign to all partitions using round-robin. + :type partition: str + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: int + :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during + periods of inactivity. The default value is `None`, i.e. no keep alive pings. + :type keep_alive: int + :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + Default value is `True`. + :type auto_reconnect: bool + :param loop: An event loop. If not specified the default event loop will be used. + """ + self.loop = loop or asyncio.get_event_loop() + self.running = False + self.client = client + self.target = target + self.partition = partition + self.keep_alive = keep_alive + self.auto_reconnect = auto_reconnect + self.timeout = send_timeout + self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) + self.reconnect_backoff = 1 + self.name = "EHSender-{}".format(uuid.uuid4()) + self.redirected = None + self.error = None + if partition: + self.target += "/Partitions/" + partition + self.name += "-partition{}".format(partition) + self._handler = SendClientAsync( + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), + loop=self.loop) + self._outcome = None + self._condition = None + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close(exc_val) + + async def open(self): + """ + Open the Sender using the supplied conneciton. + If the handler has previously been redirected, the redirect + context will be used to create a new handler before opening it. + + :param connection: The underlying client shared connection. + :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_sender_open] + :end-before: [END eventhub_client_async_sender_open] + :language: python + :dedent: 4 + :caption: Open the Sender using the supplied conneciton. + + """ + self.running = True + if self.redirected: + self.target = self.redirected.address + self._handler = SendClientAsync( + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self._handler.client_ready_async(): + await asyncio.sleep(0.05) + + async def _reconnect(self): + await self._handler.close_async() + unsent_events = self._handler.pending_messages + self._handler = SendClientAsync( + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), + loop=self.loop) + try: + await self._handler.open_async() + self._handler.queue_message(*unsent_events) + await self._handler.wait_async() + return True + except errors.TokenExpired as shutdown: + log.info("AsyncSender disconnected due to token expiry. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") + return False + log.info("AsyncSender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") + return False + log.info("AsyncSender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.AMQPConnectionError as shutdown: + if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect: + log.info("AsyncSender couldn't authenticate. Attempting reconnect.") + return False + log.info("AsyncSender connection error (%r). Shutting down.", shutdown) + error = EventHubError(str(shutdown)) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Sender reconnect failed: {}".format(e)) + await self.close(exception=error) + raise error + + async def reconnect(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + while not await self._reconnect(): + await asyncio.sleep(self.reconnect_backoff) + + async def close(self, exception=None): + """ + Close down the handler. If the handler has already closed, + this will be a no op. An optional exception can be passed in to + indicate that the handler was shutdown due to error. + + :param exception: An optional exception if the handler is closing + due to an error. + :type exception: Exception + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_sender_close] + :end-before: [END eventhub_client_async_sender_close] + :language: python + :dedent: 4 + :caption: Close down the handler. + + """ + self.running = False + if self.error: + return + if isinstance(exception, errors.LinkRedirect): + self.redirected = exception + elif isinstance(exception, EventHubError): + self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = EventHubError(str(exception), exception) + elif exception: + self.error = EventHubError(str(exception)) + else: + self.error = EventHubError("This send handler is now closed.") + await self._handler.close_async() + + async def _send_event_data(self, event_data): + if not self.running: + await self.open() + try: + self._handler.send_message(event_data.message) + if self._outcome != MessageSendResult.Ok: + raise Sender._error(self._outcome, self._condition) + except errors.MessageException as failed: + error = EventHubError(str(failed), failed) + await self.close(exception=error) + raise error + except (errors.TokenExpired, errors.AuthenticationException): + log.info("Sender disconnected due to token error. Attempting reconnect.") + await self.reconnect() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("Sender detached. Attempting reconnect.") + await self.reconnect() + else: + log.info("Sender detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("Sender detached. Attempting reconnect.") + await self.reconnect() + else: + log.info("Sender detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Send failed: {}".format(e)) + await self.close(exception=error) + raise error + else: + return self._outcome + + async def send(self, event_data): + """ + Sends an event data and asynchronously waits until + acknowledgement is received or operation times out. + + :param event_data: The event to be sent. + :type event_data: ~azure.eventhub.common.EventData + :raises: ~azure.eventhub.common.EventHubError if the message fails to + send. + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_send] + :end-before: [END eventhub_client_async_send] + :language: python + :dedent: 4 + :caption: Sends an event data and asynchronously waits + until acknowledgement is received or operation times out. + + """ + if self.error: + raise self.error + if event_data.partition_key and self.partition: + raise ValueError("EventData partition key cannot be used with a partition sender.") + event_data.message.on_send_complete = self._on_outcome + await self._send_event_data(event_data) + + async def send_batch(self, batch_event_data): + """ + Sends an event data and blocks until acknowledgement is + received or operation times out. + + :param event_data: The event to be sent. + :type event_data: ~azure.eventhub.common.EventData + :raises: ~azure.eventhub.common.EventHubError if the message fails to + send. + :return: The outcome of the message send. + :rtype: ~uamqp.constants.MessageSendResult + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sync_send] + :end-before: [END eventhub_client_sync_send] + :language: python + :dedent: 4 + :caption: Sends an event data and blocks until acknowledgement is received or operation times out. + + """ + if self.error: + raise self.error + + def verify_partition(event_datas): + ed_iter = iter(event_datas) + try: + ed = next(ed_iter) + partition_key = ed.partition_key + yield ed + except StopIteration: + raise ValueError("batch_event_data must not be empty") + for ed in ed_iter: + if ed.partition_key != partition_key: + raise ValueError("partition key of all EventData must be the same if being sent in a batch") + yield ed + + wrapper_event_data = _BatchSendEventData(verify_partition(batch_event_data)) + wrapper_event_data.message.on_send_complete = self._on_outcome + return await self._send_event_data(wrapper_event_data) + + def queue_message(self, event_data, callback=None): + """ + Transfers an event data and notifies the callback when the operation is done. + + :param event_data: The event to be sent. + :type event_data: ~azure.eventhub.common.EventData + :param callback: Callback to be run once the message has been send. + This must be a function that accepts two arguments. + :type callback: callable[~uamqp.constants.MessageSendResult, ~azure.eventhub.common.EventHubError] + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_transfer] + :end-before: [END eventhub_client_transfer] + :language: python + :dedent: 4 + :caption: Transfers an event data and notifies the callback when the operation is done. + + """ + if self.error: + raise self.error + if not self.running: + self.open() + if event_data.partition_key and self.partition: + raise ValueError("EventData partition key cannot be used with a partition sender.") + if callback: + event_data.message.on_send_complete = lambda o, c: callback(o, Sender._error(o, c)) + self._handler.queue_message(event_data.message) + + async def send_pending_messages(self): + """ + Wait until all transferred events have been sent. + """ + if self.error: + raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") + try: + await self._handler.wait_async() + except (errors.TokenExpired, errors.AuthenticationException): + log.info("AsyncSender disconnected due to token error. Attempting reconnect.") + await self.reconnect() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") + await self.reconnect() + else: + log.info("AsyncSender detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") + await self.reconnect() + else: + log.info("AsyncSender detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r).", e) + raise EventHubError("Send failed: {}".format(e)) + + def _on_outcome(self, outcome, condition): + """ + Called when the outcome is received for a delivery. + + :param outcome: The outcome of the message delivery - success or failure. + :type outcome: ~uamqp.constants.MessageSendResult + """ + self._outcome = outcome + self._condition = condition + + @staticmethod + def _error(outcome, condition): + return None if outcome == MessageSendResult.Ok else EventHubError(outcome, condition) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index a50babfca8c3..58cd975e4baf 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -25,67 +25,13 @@ from azure.eventhub.sender import Sender from azure.eventhub.receiver import Receiver from azure.eventhub.common import EventHubError, parse_sas_token +from .client_abstract import EventHubClientAbstract log = logging.getLogger(__name__) -def _parse_conn_str(conn_str): - endpoint = None - shared_access_key_name = None - shared_access_key = None - entity_path = None - for element in conn_str.split(';'): - key, _, value = element.partition('=') - if key.lower() == 'endpoint': - endpoint = value.rstrip('/') - elif key.lower() == 'hostname': - endpoint = value.rstrip('/') - elif key.lower() == 'sharedaccesskeyname': - shared_access_key_name = value - elif key.lower() == 'sharedaccesskey': - shared_access_key = value - elif key.lower() == 'entitypath': - entity_path = value - if not all([endpoint, shared_access_key_name, shared_access_key]): - raise ValueError("Invalid connection string") - return endpoint, shared_access_key_name, shared_access_key, entity_path - - -def _generate_sas_token(uri, policy, key, expiry=None): - """Create a shared access signiture token as a string literal. - :returns: SAS token as string literal. - :rtype: str - """ - from base64 import b64encode, b64decode - from hashlib import sha256 - from hmac import HMAC - if not expiry: - expiry = time.time() + 3600 # Default to 1 hour. - encoded_uri = quote_plus(uri) - ttl = int(expiry) - sign_key = '%s\n%d' % (encoded_uri, ttl) - signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest()) - result = { - 'sr': uri, - 'sig': signature, - 'se': str(ttl)} - if policy: - result['skn'] = policy - return 'SharedAccessSignature ' + urlencode(result) - - -def _build_uri(address, entity): - parsed = urlparse(address) - if parsed.path: - return address - if not entity: - raise ValueError("No EventHub specified") - address += "/" + str(entity) - return address - - -class EventHubClient(object): +class EventHubClient(EventHubClientAbstract): """ The EventHubClient class defines a high level interface for sending events to and receiving events from the Azure Event Hubs service. @@ -100,166 +46,6 @@ class EventHubClient(object): """ - def __init__( - self, address, username=None, password=None, debug=False, - http_proxy=None, auth_timeout=60, sas_token=None): - """ - Constructs a new EventHubClient with the given address URL. - - :param address: The full URI string of the Event Hub. This can optionally - include URL-encoded access name and key. - :type address: str - :param username: The name of the shared access policy. This must be supplied - if not encoded into the address. - :type username: str - :param password: The shared access key. This must be supplied if not encoded - into the address. - :type password: str - :param debug: Whether to output network trace logs to the logger. Default - is `False`. - :type debug: bool - :param http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). - Additionally the following keys may also be present: 'username', 'password'. - :type http_proxy: dict[str, Any] - :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. - :type auth_timeout: int - :param sas_token: A SAS token or function that returns a SAS token. If a function is supplied, - it will be used to retrieve subsequent tokens in the case of token expiry. The function should - take no arguments. - :type sas_token: str or callable - """ - self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] - self.sas_token = sas_token - self.address = urlparse(address) - self.eh_name = self.address.path.lstrip('/') - self.http_proxy = http_proxy - self.mgmt_target = "amqps://{}/{}".format(self.address.hostname, self.eh_name) - url_username = unquote_plus(self.address.username) if self.address.username else None - username = username or url_username - url_password = unquote_plus(self.address.password) if self.address.password else None - password = password or url_password - if (not username or not password) and not sas_token: - raise ValueError("Please supply either username and password, or a SAS token") - self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) - self._auth_config = {'username': username, 'password': password} - self.get_auth = functools.partial(self._create_auth) - self.debug = debug - self.auth_timeout = auth_timeout - - self.clients = [] - self.stopped = False - log.info("%r: Created the Event Hub client", self.container_id) - - @classmethod - def from_sas_token(cls, address, sas_token, eventhub=None, **kwargs): - """Create an EventHubClient from an existing auth token or token generator. - - :param address: The Event Hub address URL - :type address: str - :param sas_token: A SAS token or function that returns a SAS token. If a function is supplied, - it will be used to retrieve subsequent tokens in the case of token expiry. The function should - take no arguments. - :type sas_token: str or callable - :param eventhub: The name of the EventHub, if not already included in the address URL. - :type eventhub: str - :param debug: Whether to output network trace logs to the logger. Default - is `False`. - :type debug: bool - :param http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). - Additionally the following keys may also be present: 'username', 'password'. - :type http_proxy: dict[str, Any] - :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. - :type auth_timeout: int - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START create_eventhub_client_sas_token] - :end-before: [END create_eventhub_client_sas_token] - :language: python - :dedent: 4 - :caption: Create an EventHubClient from an existing auth token or token generator. - - """ - address = _build_uri(address, eventhub) - return cls(address, sas_token=sas_token, **kwargs) - - @classmethod - def from_connection_string(cls, conn_str, eventhub=None, **kwargs): - """Create an EventHubClient from a connection string. - - :param conn_str: The connection string. - :type conn_str: str - :param eventhub: The name of the EventHub, if the EntityName is - not included in the connection string. - :type eventhub: str - :param debug: Whether to output network trace logs to the logger. Default - is `False`. - :type debug: bool - :param http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). - Additionally the following keys may also be present: 'username', 'password'. - :type http_proxy: dict[str, Any] - :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. - :type auth_timeout: int - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START create_eventhub_client_connstr] - :end-before: [END create_eventhub_client_connstr] - :language: python - :dedent: 4 - :caption: Create an EventHubClient from a connection string. - - """ - address, policy, key, entity = _parse_conn_str(conn_str) - entity = eventhub or entity - address = _build_uri(address, entity) - return cls(address, username=policy, password=key, **kwargs) - - @classmethod - def from_iothub_connection_string(cls, conn_str, **kwargs): - """ - Create an EventHubClient from an IoTHub connection string. - - :param conn_str: The connection string. - :type conn_str: str - :param debug: Whether to output network trace logs to the logger. Default - is `False`. - :type debug: bool - :param http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). - Additionally the following keys may also be present: 'username', 'password'. - :type http_proxy: dict[str, Any] - :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. - :type auth_timeout: int - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START create_eventhub_client_iot_connstr] - :end-before: [END create_eventhub_client_iot_connstr] - :language: python - :dedent: 4 - :caption: Create an EventHubClient from an IoTHub connection string. - - """ - address, policy, key, _ = _parse_conn_str(conn_str) - hub_name = address.split('.')[0] - username = "{}@sas.root.{}".format(policy, hub_name) - password = _generate_sas_token(address, policy, key) - client = cls("amqps://" + address, username=username, password=password, **kwargs) - client._auth_config = { # pylint: disable=protected-access - 'iot_username': policy, - 'iot_password': key, - 'username': username, - 'password': password} - return client - def _create_auth(self, username=None, password=None): """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate @@ -270,7 +56,18 @@ def _create_auth(self, username=None, password=None): :param password: The shared access key. :type password: str """ - if self.sas_token: + http_proxy = self.config.http_proxy + transport_type = self.config.transport_type + auth_timeout = self.config.auth_timeout + if self.aad_credential and self.sas_token: + raise ValueError("Can't have both sas_token and aad_credential") + + elif self.aad_credential: + get_jwt_token = functools.partial(self.aad_credential.get_token, ['https://eventhubs.azure.net//.default']) + return authentication.JWTTokenAuth(self.auth_uri, self.auth_uri, + get_jwt_token, http_proxy=http_proxy, + transport_type=transport_type) + elif self.sas_token: token = self.sas_token() if callable(self.sas_token) else self.sas_token try: expiry = int(parse_sas_token(token)['se']) @@ -279,122 +76,19 @@ def _create_auth(self, username=None, password=None): return authentication.SASTokenAuth( self.auth_uri, self.auth_uri, token, expires_at=expiry, - timeout=self.auth_timeout, - http_proxy=self.http_proxy) + timeout=auth_timeout, + http_proxy=http_proxy, + transport_type=transport_type) username = username or self._auth_config['username'] password = password or self._auth_config['password'] if "@sas.root" in username: return authentication.SASLPlain( - self.address.hostname, username, password, http_proxy=self.http_proxy) + self.address.hostname, username, password, http_proxy=http_proxy, transport_type=transport_type) return authentication.SASTokenAuth.from_shared_access_key( - self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy) - - def create_properties(self): # pylint: disable=no-self-use - """ - Format the properties with which to instantiate the connection. - This acts like a user agent over HTTP. - - :rtype: dict - """ - properties = {} - properties["product"] = "eventhub.python" - properties["version"] = __version__ - properties["framework"] = "Python {}.{}.{}".format(*sys.version_info[0:3]) - properties["platform"] = sys.platform - return properties - - def _close_clients(self): - """ - Close all open Sender/Receiver clients. - """ - for client in self.clients: - client.close() + self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, transport_type=transport_type) - def _start_clients(self): - for client in self.clients: - try: - if not client.running: - client.open() - except Exception as exp: # pylint: disable=broad-except - client.close(exception=exp) - - def _process_redirect_uri(self, redirect): - redirect_uri = redirect.address.decode('utf-8') - auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") - self.address = urlparse(auth_uri) - self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) - self.eh_name = self.address.path.lstrip('/') - self.mgmt_target = redirect_uri - - def _handle_redirect(self, redirects): - if len(redirects) != len(self.clients): - raise EventHubError("Some clients are attempting to redirect the connection.") - if not all(r.hostname == redirects[0].hostname for r in redirects): - raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self._process_redirect_uri(redirects[0]) - for client in self.clients: - client.open() - - def run(self): - """ - Run the EventHubClient in blocking mode. - Opens the connection and starts running all Sender/Receiver clients. - Returns a list of the start up results. For a succcesful client start the - result will be `None`, otherwise the exception raised. - If all clients failed to start, then run will fail, shut down the connection - and raise an exception. - If at least one client starts up successfully the run command will succeed. - - :rtype: list[~azure.eventhub.common.EventHubError] - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START eventhub_client_run] - :end-before: [END eventhub_client_run] - :language: python - :dedent: 4 - :caption: Run the EventHubClient in blocking mode. - - """ - log.info("%r: Starting %r clients", self.container_id, len(self.clients)) - try: - self._start_clients() - redirects = [c.redirected for c in self.clients if c.redirected] - failed = [c.error for c in self.clients if c.error] - if failed and len(failed) == len(self.clients): - log.warning("%r: All clients failed to start.", self.container_id) - raise failed[0] - if failed: - log.warning("%r: %r clients failed to start.", self.container_id, len(failed)) - elif redirects: - self._handle_redirect(redirects) - except EventHubError: - self.stop() - raise - except Exception as e: - self.stop() - raise EventHubError(str(e)) - return failed - - def stop(self): - """ - Stop the EventHubClient and all its Sender/Receiver clients. - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START eventhub_client_stop] - :end-before: [END eventhub_client_stop] - :language: python - :dedent: 4 - :caption: Stop the EventHubClient and all its Sender/Receiver clients. - - """ - log.info("%r: Stopping %r clients", self.container_id, len(self.clients)) - self.stopped = True - self._close_clients() - - def get_eventhub_info(self): + def get_eventhub_information(self): """ Get details on the specified EventHub. Keys in the details dictionary include: @@ -409,7 +103,7 @@ def get_eventhub_info(self): """ alt_creds = { "username": self._auth_config.get("iot_username"), - "password":self._auth_config.get("iot_password")} + "password": self._auth_config.get("iot_password")} try: mgmt_auth = self._create_auth(**alt_creds) mgmt_client = uamqp.AMQPClient(self.mgmt_target, auth=mgmt_auth, debug=self.debug) @@ -433,9 +127,12 @@ def get_eventhub_info(self): finally: mgmt_client.close() - def add_receiver( - self, consumer_group, partition, offset=None, prefetch=300, - operation=None, keep_alive=30, auto_reconnect=True): + def create_receiver( + self, consumer_group, partition, offset=None, epoch=None, operation=None, + prefetch=None, + keep_alive=None, + auto_reconnect=None, + ): """ Add a receiver to the client for a particular consumer group and partition. @@ -461,56 +158,23 @@ def add_receiver( :caption: Add a receiver to the client for a particular consumer group and partition. """ + keep_alive = self.config.keep_alive if keep_alive is None else keep_alive + auto_reconnect = self.config.auto_reconnect if auto_reconnect is None else auto_reconnect + prefetch = self.config.prefetch if prefetch is None else prefetch + path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) handler = Receiver( - self, source_url, offset=offset, prefetch=prefetch, - keep_alive=keep_alive, auto_reconnect=auto_reconnect) - self.clients.append(handler) + self, source_url, offset=offset, epoch=epoch, prefetch=prefetch, keep_alive=keep_alive, auto_reconnect=auto_reconnect) return handler - def add_epoch_receiver( + def create_epoch_receiver( self, consumer_group, partition, epoch, prefetch=300, - operation=None, keep_alive=30, auto_reconnect=True): - """ - Add a receiver to the client with an epoch value. Only a single epoch receiver - can connect to a partition at any given time - additional epoch receivers must have - a higher epoch value or they will be rejected. If a 2nd epoch receiver has - connected, the first will be closed. + operation=None): + return self.create_receiver(consumer_group, partition, epoch=epoch, prefetch=prefetch, operation=operation) - :param consumer_group: The name of the consumer group. - :type consumer_group: str - :param partition: The ID of the partition. - :type partition: str - :param epoch: The epoch value for the receiver. - :type epoch: int - :param prefetch: The message prefetch count of the receiver. Default is 300. - :type prefetch: int - :operation: An optional operation to be appended to the hostname in the source URL. - The value must start with `/` character. - :type operation: str - :rtype: ~azure.eventhub.receiver.Receiver - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START create_eventhub_client_epoch_receiver] - :end-before: [END create_eventhub_client_epoch_receiver] - :language: python - :dedent: 4 - :caption: Add a receiver to the client with an epoch value. - - """ - path = self.address.path + operation if operation else self.address.path - source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( - self.address.hostname, path, consumer_group, partition) - handler = Receiver( - self, source_url, prefetch=prefetch, epoch=epoch, - keep_alive=keep_alive, auto_reconnect=auto_reconnect) - self.clients.append(handler) - return handler - - def add_sender(self, partition=None, operation=None, send_timeout=60, keep_alive=30, auto_reconnect=True): + def create_sender(self, partition=None, operation=None, send_timeout=None, keep_alive=None, auto_reconnect=None): """ Add a sender to the client to send EventData object to an EventHub. @@ -544,8 +208,10 @@ def add_sender(self, partition=None, operation=None, send_timeout=60, keep_alive target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: target = target + operation + send_timeout = self.config.send_timeout if send_timeout is None else send_timeout + keep_alive = self.config.keep_alive if keep_alive is None else keep_alive + auto_reconnect = self.config.auto_reconnect if auto_reconnect is None else auto_reconnect + handler = Sender( - self, target, partition=partition, send_timeout=send_timeout, - keep_alive=keep_alive, auto_reconnect=auto_reconnect) - self.clients.append(handler) + self, target, partition=partition, send_timeout=send_timeout, keep_alive=keep_alive, auto_reconnect=auto_reconnect) return handler diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py new file mode 100644 index 000000000000..1435d15bd2be --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -0,0 +1,311 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +from __future__ import unicode_literals + +import logging +import datetime +import sys +import uuid +import time +import functools +from abc import abstractmethod +try: + from urlparse import urlparse + from urllib import unquote_plus, urlencode, quote_plus +except ImportError: + from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus + +import uamqp +from uamqp import Message +from uamqp import authentication +from uamqp import constants + +from azure.eventhub import __version__ +from azure.eventhub.sender import Sender +from azure.eventhub.receiver import Receiver +from azure.eventhub.common import EventHubError, parse_sas_token +from azure.eventhub.configuration import Configuration + +log = logging.getLogger(__name__) + + +def _parse_conn_str(conn_str): + endpoint = None + shared_access_key_name = None + shared_access_key = None + entity_path = None + for element in conn_str.split(';'): + key, _, value = element.partition('=') + if key.lower() == 'endpoint': + endpoint = value.rstrip('/') + elif key.lower() == 'hostname': + endpoint = value.rstrip('/') + elif key.lower() == 'sharedaccesskeyname': + shared_access_key_name = value + elif key.lower() == 'sharedaccesskey': + shared_access_key = value + elif key.lower() == 'entitypath': + entity_path = value + if not all([endpoint, shared_access_key_name, shared_access_key]): + raise ValueError("Invalid connection string") + return endpoint, shared_access_key_name, shared_access_key, entity_path + + +def _generate_sas_token(uri, policy, key, expiry=None): + """Create a shared access signiture token as a string literal. + :returns: SAS token as string literal. + :rtype: str + """ + from base64 import b64encode, b64decode + from hashlib import sha256 + from hmac import HMAC + if not expiry: + expiry = time.time() + 3600 # Default to 1 hour. + encoded_uri = quote_plus(uri) + ttl = int(expiry) + sign_key = '%s\n%d' % (encoded_uri, ttl) + signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest()) + result = { + 'sr': uri, + 'sig': signature, + 'se': str(ttl)} + if policy: + result['skn'] = policy + return 'SharedAccessSignature ' + urlencode(result) + + +def _build_uri(address, entity): + parsed = urlparse(address) + if parsed.path: + return address + if not entity: + raise ValueError("No EventHub specified") + address += "/" + str(entity) + return address + + +class EventHubClientAbstract(object): + """ + The EventHubClient class defines a high level interface for sending + events to and receiving events from the Azure Event Hubs service. + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START create_eventhub_client] + :end-before: [END create_eventhub_client] + :language: python + :dedent: 4 + :caption: Create a new instance of the Event Hub client + + """ + + def __init__( + self, address, username=None, password=None, sas_token=None, aad_credential=None, **kwargs): + """ + Constructs a new EventHubClient with the given address URL. + + :param address: The full URI string of the Event Hub. This can optionally + include URL-encoded access name and key. + :type address: str + :param username: The name of the shared access policy. This must be supplied + if not encoded into the address. + :type username: str + :param password: The shared access key. This must be supplied if not encoded + into the address. + :type password: str + :param debug: Whether to output network trace logs to the logger. Default + is `False`. + :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :type auth_timeout: int + :param sas_token: A SAS token or function that returns a SAS token. If a function is supplied, + it will be used to retrieve subsequent tokens in the case of token expiry. The function should + take no arguments. + :type sas_token: str or callable + """ + self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] + self.sas_token = sas_token + self.address = urlparse(address) + self.aad_credential = aad_credential + self.eh_name = self.address.path.lstrip('/') + # self.http_proxy = kwargs.get("http_proxy") + self.keep_alive = kwargs.get("keep_alive", 30) + self.auto_reconnect = kwargs.get("auto_reconnect", True) + self.mgmt_target = "amqps://{}/{}".format(self.address.hostname, self.eh_name) + url_username = unquote_plus(self.address.username) if self.address.username else None + username = username or url_username + url_password = unquote_plus(self.address.password) if self.address.password else None + password = password or url_password + if (not username or not password) and not sas_token: + raise ValueError("Please supply either username and password, or a SAS token") + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self._auth_config = {'username': username, 'password': password} + self.get_auth = functools.partial(self._create_auth) + # self.debug = kwargs.get("debug", False) # debug + #self.auth_timeout = auth_timeout + + self.stopped = False + self.config = Configuration(**kwargs) + self.debug = self.config.network_tracing + + log.info("%r: Created the Event Hub client", self.container_id) + + @classmethod + def from_sas_token(cls, address, sas_token, eventhub=None, **kwargs): + """Create an EventHubClient from an existing auth token or token generator. + + :param address: The Event Hub address URL + :type address: str + :param sas_token: A SAS token or function that returns a SAS token. If a function is supplied, + it will be used to retrieve subsequent tokens in the case of token expiry. The function should + take no arguments. + :type sas_token: str or callable + :param eventhub: The name of the EventHub, if not already included in the address URL. + :type eventhub: str + :param debug: Whether to output network trace logs to the logger. Default + is `False`. + :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :type auth_timeout: int + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START create_eventhub_client_sas_token] + :end-before: [END create_eventhub_client_sas_token] + :language: python + :dedent: 4 + :caption: Create an EventHubClient from an existing auth token or token generator. + + """ + address = _build_uri(address, eventhub) + return cls(address, sas_token=sas_token, **kwargs) + + @classmethod + def from_connection_string(cls, conn_str, eventhub=None, **kwargs): + """Create an EventHubClient from a connection string. + + :param conn_str: The connection string. + :type conn_str: str + :param eventhub: The name of the EventHub, if the EntityName is + not included in the connection string. + :type eventhub: str + :param debug: Whether to output network trace logs to the logger. Default + is `False`. + :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :type auth_timeout: int + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START create_eventhub_client_connstr] + :end-before: [END create_eventhub_client_connstr] + :language: python + :dedent: 4 + :caption: Create an EventHubClient from a connection string. + + """ + address, policy, key, entity = _parse_conn_str(conn_str) + entity = eventhub or entity + address = _build_uri(address, entity) + return cls(address, username=policy, password=key, **kwargs) + + @classmethod + def from_iothub_connection_string(cls, conn_str, **kwargs): + """ + Create an EventHubClient from an IoTHub connection string. + + :param conn_str: The connection string. + :type conn_str: str + :param debug: Whether to output network trace logs to the logger. Default + is `False`. + :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :type auth_timeout: int + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START create_eventhub_client_iot_connstr] + :end-before: [END create_eventhub_client_iot_connstr] + :language: python + :dedent: 4 + :caption: Create an EventHubClient from an IoTHub connection string. + + """ + address, policy, key, _ = _parse_conn_str(conn_str) + hub_name = address.split('.')[0] + username = "{}@sas.root.{}".format(policy, hub_name) + password = _generate_sas_token(address, policy, key) + client = cls("amqps://" + address, username=username, password=password, **kwargs) + client._auth_config = { # pylint: disable=protected-access + 'iot_username': policy, + 'iot_password': key, + 'username': username, + 'password': password} + return client + + @classmethod + def from_aad_credential(cls, address, aad_credential, eventhub=None, **kwargs): + address = _build_uri(address, eventhub) + return cls(address, aad_credential=aad_credential, **kwargs) + + @abstractmethod + def _create_auth(self, username=None, password=None): + pass + + def create_properties(self): # pylint: disable=no-self-use + """ + Format the properties with which to instantiate the connection. + This acts like a user agent over HTTP. + + :rtype: dict + """ + properties = {} + properties["product"] = "eventhub.python" + properties["version"] = __version__ + properties["framework"] = "Python {}.{}.{}".format(*sys.version_info[0:3]) + properties["platform"] = sys.platform + return properties + + def _process_redirect_uri(self, redirect): + redirect_uri = redirect.address.decode('utf-8') + auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") + self.address = urlparse(auth_uri) + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self.eh_name = self.address.path.lstrip('/') + self.mgmt_target = redirect_uri + + @abstractmethod + def get_eventhub_information(self): + pass + + @abstractmethod + def create_receiver( + self, consumer_group, partition, epoch=None, offset=None, prefetch=300, + operation=None): + pass + + @abstractmethod + def create_sender(self, partition=None, operation=None, send_timeout=60): + pass diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 76e315d2a25e..03a602616c4d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -4,6 +4,7 @@ # -------------------------------------------------------------------------------------------- from __future__ import unicode_literals +from enum import Enum import datetime import calendar import json @@ -83,7 +84,7 @@ class EventData(object): PROP_TIMESTAMP = b"x-opt-enqueued-time" PROP_DEVICE_ID = b"iothub-connection-device-id" - def __init__(self, body=None, batch=None, to_device=None, message=None): + def __init__(self, body=None, to_device=None, message=None): """ Initialize EventData. @@ -102,9 +103,7 @@ def __init__(self, body=None, batch=None, to_device=None, message=None): self.msg_properties = MessageProperties() if to_device: self.msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device) - if batch: - self.message = BatchMessage(data=batch, multi_messages=True, properties=self.msg_properties) - elif message: + if message: self.message = message self.msg_properties = message.properties self._annotations = message.annotations @@ -136,7 +135,7 @@ def offset(self): :rtype: ~azure.eventhub.common.Offset """ try: - return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8')) + return EventPosition(self._annotations[EventData.PROP_OFFSET].decode('UTF-8')) except (KeyError, AttributeError): return None @@ -208,7 +207,7 @@ def application_properties(self, value): :type value: dict """ self._app_properties = value - properties = dict(self._app_properties) + properties = None if value is None else dict(self._app_properties) self.message.application_properties = properties @property @@ -258,23 +257,32 @@ def body_as_json(self, encoding='UTF-8'): except Exception as e: raise TypeError("Event data is not compatible with JSON type: {}".format(e)) + def encode_message(self): + return self.message.encode_message() -class Offset(object): + +class _BatchSendEventData(EventData): + def __init__(self, batch_event_data): + # TODO: rethink if to_device should be included in + self.message = BatchMessage(data=batch_event_data, multi_messages=True, properties=None) + + +class EventPosition(object): """ - The offset (position or timestamp) where a receiver starts. Examples: + The position(offset, sequence or timestamp) where a receiver starts. Examples: Beginning of the event stream: - >>> offset = Offset("-1") + >>> event_pos = EventPosition("-1") End of the event stream: - >>> offset = Offset("@latest") + >>> event_pos = EventPosition("@latest") Events after the specified offset: - >>> offset = Offset("12345") + >>> event_pos = EventPosition("12345") Events from the specified offset: - >>> offset = Offset("12345", True) + >>> event_pos = EventPosition("12345", True) Events after a datetime: - >>> offset = Offset(datetime.datetime.utcnow()) + >>> event_pos = EventPosition(datetime.datetime.utcnow()) Events after a specific sequence number: - >>> offset = Offset(1506968696002) + >>> event_pos = EventPosition(1506968696002) """ def __init__(self, value, inclusive=False): @@ -299,10 +307,30 @@ def selector(self): if isinstance(self.value, datetime.datetime): timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000) return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8') - if isinstance(self.value, six.integer_types): + elif isinstance(self.value, six.integer_types): return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8') return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8') + @staticmethod + def from_start_of_stream(): + return EventPosition("-1") + + @staticmethod + def from_end_of_stream(): + return EventPosition("@latest") + + @staticmethod + def from_offset(offset, inclusive=False): + return EventPosition(offset, inclusive) + + @staticmethod + def from_sequence(sequence, inclusive=False): + return EventPosition(sequence, inclusive) + + @staticmethod + def from_enqueued_time(enqueued_time, inclusive=False): + return EventPosition(enqueued_time, inclusive) + class EventHubError(Exception): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py new file mode 100644 index 000000000000..2d7a7be57638 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py @@ -0,0 +1,21 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from uamqp.constants import TransportType + + +class Configuration(object): + def __init__(self, **kwargs): + self.user_agent = kwargs.get("user_agent") + self.max_retries = kwargs.get("max_retries", 3) + self.network_tracing = kwargs.get("debug", False) + self.http_proxy = kwargs.get("http_proxy") + self.auto_reconnect = kwargs.get("auto_reconnect", False) + self.keep_alive = kwargs.get("keep_alive", 0) + self.transport_type = TransportType.AmqpOverWebsocket if self.http_proxy \ + else kwargs.get("transport_type", TransportType.Amqp) + self.auth_timeout = kwargs.get("auth_timeout", 60) + self.prefetch = kwargs.get("prefetch") + self.send_timeout = kwargs.get("send_timeout", 60) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py index 486c75b3c682..4577d0332af5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/receiver.py @@ -51,11 +51,12 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a self.client = client self.source = source self.offset = offset + self.iter_started = False self.prefetch = prefetch self.epoch = epoch self.keep_alive = keep_alive self.auto_reconnect = auto_reconnect - self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) self.reconnect_backoff = 1 self.properties = None self.redirected = None @@ -79,6 +80,52 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a client_name=self.name, properties=self.client.create_properties()) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close(exc_val) + + def __iter__(self): + if not self.running: + self.open() + if not self.iter_started: + self.iter_started = True + self.messages_iter = self._handler.receive_messages_iter() + return self + + def __next__(self): + while True: + try: + message = next(self.messages_iter) + event_data = EventData(message=message) + self.offset = event_data.offset + return event_data + except (errors.TokenExpired, errors.AuthenticationException): + log.info("Receiver disconnected due to token error. Attempting reconnect.") + self.reconnect() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("Receiver detached. Attempting reconnect.") + self.reconnect() + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("Receiver detached. Attempting reconnect.") + self.reconnect() + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Receive failed: {}".format(e)) + self.close(exception=error) + raise error + def open(self): """ Open the Receiver using the supplied conneciton. @@ -288,7 +335,7 @@ def receive(self, max_batch_size=None, timeout=None): if self.error: raise self.error if not self.running: - raise ValueError("Unable to receive until client has been started.") + self.open() data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py index 0a7334050a5f..ab113eac1c28 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/sender.py @@ -10,8 +10,9 @@ from uamqp import constants, errors from uamqp import SendClient +from uamqp.constants import MessageSendResult -from azure.eventhub.common import EventHubError, _error_handler +from azure.eventhub.common import EventHubError, EventData, _BatchSendEventData, _error_handler log = logging.getLogger(__name__) @@ -60,7 +61,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N self.error = None self.keep_alive = keep_alive self.auto_reconnect = auto_reconnect - self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) self.reconnect_backoff = 1 self.name = "EHSender-{}".format(uuid.uuid4()) if partition: @@ -78,6 +79,12 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N self._outcome = None self._condition = None + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close(exc_val) + def open(self): """ Open the Sender using the supplied conneciton. @@ -181,28 +188,6 @@ def get_handler_state(self): # pylint: disable=protected-access return self._handler._message_sender.get_state() - def has_started(self): - """ - Whether the handler has completed all start up processes such as - establishing the connection, session, link and authentication, and - is not ready to process messages. - **This function is now deprecated and will be removed in v2.0+.** - - :rtype: bool - """ - # pylint: disable=protected-access - timeout = False - auth_in_progress = False - if self._handler._connection.cbs: - timeout, auth_in_progress = self._handler._auth.handle_token() - if timeout: - raise EventHubError("Authorization timeout.") - if auth_in_progress: - return False - if not self._handler._client_ready(): - return False - return True - def close(self, exception=None): """ Close down the handler. If the handler has already closed, @@ -235,37 +220,12 @@ def close(self, exception=None): self.error = EventHubError("This send handler is now closed.") self._handler.close() - def send(self, event_data): - """ - Sends an event data and blocks until acknowledgement is - received or operation times out. - - :param event_data: The event to be sent. - :type event_data: ~azure.eventhub.common.EventData - :raises: ~azure.eventhub.common.EventHubError if the message fails to - send. - :return: The outcome of the message send. - :rtype: ~uamqp.constants.MessageSendResult - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START eventhub_client_sync_send] - :end-before: [END eventhub_client_sync_send] - :language: python - :dedent: 4 - :caption: Sends an event data and blocks until acknowledgement is received or operation times out. - - """ - if self.error: - raise self.error + def _send_event_data(self, event_data): if not self.running: - raise ValueError("Unable to send until client has been started.") - if event_data.partition_key and self.partition: - raise ValueError("EventData partition key cannot be used with a partition sender.") - event_data.message.on_send_complete = self._on_outcome + self.open() try: self._handler.send_message(event_data.message) - if self._outcome != constants.MessageSendResult.Ok: + if self._outcome != MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) except errors.MessageException as failed: error = EventHubError(str(failed), failed) @@ -300,7 +260,76 @@ def send(self, event_data): else: return self._outcome - def transfer(self, event_data, callback=None): + def send(self, event_data): + """ + Sends an event data and blocks until acknowledgement is + received or operation times out. + + :param event_data: The event to be sent. + :type event_data: ~azure.eventhub.common.EventData + :raises: ~azure.eventhub.common.EventHubError if the message fails to + send. + :return: The outcome of the message send. + :rtype: ~uamqp.constants.MessageSendResult + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sync_send] + :end-before: [END eventhub_client_sync_send] + :language: python + :dedent: 4 + :caption: Sends an event data and blocks until acknowledgement is received or operation times out. + + """ + if self.error: + raise self.error + if event_data.partition_key and self.partition: + raise ValueError("EventData partition key cannot be used with a partition sender.") + event_data.message.on_send_complete = self._on_outcome + return self._send_event_data(event_data) + + def send_batch(self, batch_event_data): + """ + Sends an event data and blocks until acknowledgement is + received or operation times out. + + :param event_data: The event to be sent. + :type event_data: ~azure.eventhub.common.EventData + :raises: ~azure.eventhub.common.EventHubError if the message fails to + send. + :return: The outcome of the message send. + :rtype: ~uamqp.constants.MessageSendResult + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sync_send] + :end-before: [END eventhub_client_sync_send] + :language: python + :dedent: 4 + :caption: Sends an event data and blocks until acknowledgement is received or operation times out. + + """ + if self.error: + raise self.error + + def verify_partition(event_datas): + ed_iter = iter(event_datas) + try: + ed = next(ed_iter) + partition_key = ed.partition_key + yield ed + except StopIteration: + raise ValueError("batch_event_data must not be empty") + for ed in ed_iter: + if ed.partition_key != partition_key: + raise ValueError("partition key of all EventData must be the same if being sent in a batch") + yield ed + + wrapper_event_data = _BatchSendEventData(verify_partition(batch_event_data)) + wrapper_event_data.message.on_send_complete = self._on_outcome + return self._send_event_data(wrapper_event_data) + + def queue_message(self, event_data, callback=None): """ Transfers an event data and notifies the callback when the operation is done. @@ -322,14 +351,14 @@ def transfer(self, event_data, callback=None): if self.error: raise self.error if not self.running: - raise ValueError("Unable to send until client has been started.") + self.open() if event_data.partition_key and self.partition: raise ValueError("EventData partition key cannot be used with a partition sender.") if callback: event_data.message.on_send_complete = lambda o, c: callback(o, Sender._error(o, c)) self._handler.queue_message(event_data.message) - def wait(self): + def send_pending_messages(self): """ Wait until all transferred events have been sent. @@ -345,7 +374,7 @@ def wait(self): if self.error: raise self.error if not self.running: - raise ValueError("Unable to send until client has been started.") + self.open() try: self._handler.wait() except (errors.TokenExpired, errors.AuthenticationException): @@ -385,4 +414,4 @@ def _on_outcome(self, outcome, condition): @staticmethod def _error(outcome, condition): - return None if outcome == constants.MessageSendResult.Ok else EventHubError(outcome, condition) + return None if outcome == MessageSendResult.Ok else EventHubError(outcome, condition) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/eh_partition_pump.py b/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/eh_partition_pump.py index e0aa25dc2e8d..d2c649f9a0a6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/eh_partition_pump.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/eh_partition_pump.py @@ -5,7 +5,8 @@ import logging import asyncio -from azure.eventhub import Offset, EventHubClientAsync +from azure.eventhub import EventPosition +from azure.eventhub.aio import EventHubClient from azure.eventprocessorhost.partition_pump import PartitionPump @@ -64,14 +65,14 @@ async def open_clients_async(self): """ await self.partition_context.get_initial_offset_async() # Create event hub client and receive handler and set options - self.eh_client = EventHubClientAsync( + self.eh_client = EventHubClient( self.host.eh_config.client_address, debug=self.host.eph_options.debug_trace, http_proxy=self.host.eph_options.http_proxy) self.partition_receive_handler = self.eh_client.add_async_receiver( self.partition_context.consumer_group_name, self.partition_context.partition_id, - Offset(self.partition_context.offset), + EventPosition(self.partition_context.offset), prefetch=self.host.eph_options.prefetch_count, keep_alive=self.host.eph_options.keep_alive_interval, auto_reconnect=self.host.eph_options.auto_reconnect_on_error, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/partition_manager.py index 41aaded73b56..d532846a5476 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/partition_manager.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventprocessorhost/partition_manager.py @@ -8,7 +8,8 @@ from queue import Queue from collections import Counter -from azure.eventhub import EventHubClientAsync +from azure.eventhub.aio import EventHubClient + from azure.eventprocessorhost.eh_partition_pump import EventHubPartitionPump from azure.eventprocessorhost.cancellation_token import CancellationToken @@ -36,7 +37,7 @@ async def get_partition_ids_async(self): """ if not self.partition_ids: try: - eh_client = EventHubClientAsync( + eh_client = EventHubClient( self.host.eh_config.client_address, debug=self.host.eph_options.debug_trace, http_proxy=self.host.eph_options.http_proxy) diff --git a/sdk/eventhub/azure-eventhubs/conftest.py b/sdk/eventhub/azure-eventhubs/conftest.py index 237a60918f17..ce6f83adc6af 100644 --- a/sdk/eventhub/azure-eventhubs/conftest.py +++ b/sdk/eventhub/azure-eventhubs/conftest.py @@ -19,7 +19,7 @@ collect_ignore.append("examples/async_examples") else: sys.path.append(os.path.join(os.path.dirname(__file__), "tests")) - from asynctests import MockEventProcessor + from tests.asynctests import MockEventProcessor from azure.eventprocessorhost import EventProcessorHost from azure.eventprocessorhost import EventHubPartitionPump from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager @@ -29,8 +29,7 @@ from azure.eventprocessorhost.partition_pump import PartitionPump from azure.eventprocessorhost.partition_manager import PartitionManager -from azure import eventhub -from azure.eventhub import EventHubClient, Receiver, Offset +from azure.eventhub import EventHubClient, Receiver, EventPosition def get_logger(filename, level=logging.INFO): @@ -71,7 +70,7 @@ def create_eventhub(eventhub_config, client=None): raise ValueError("EventHub creation failed.") -def cleanup_eventhub(servicebus_config, hub_name, client=None): +def cleanup_eventhub(eventhub_config, hub_name, client=None): from azure.servicebus.control_client import ServiceBusService client = client or ServiceBusService( service_namespace=eventhub_config['namespace'], @@ -166,36 +165,34 @@ def device_id(): @pytest.fixture() def connstr_receivers(connection_str): client = EventHubClient.from_connection_string(connection_str, debug=False) - eh_hub_info = client.get_eventhub_info() + eh_hub_info = client.get_eventhub_information() partitions = eh_hub_info["partition_ids"] - recv_offset = Offset("@latest") + recv_offset = EventPosition("@latest") receivers = [] for p in partitions: - receivers.append(client.add_receiver("$default", p, prefetch=500, offset=Offset("@latest"))) - - client.run() - - for r in receivers: - r.receive(timeout=1) + receiver = client.create_receiver("$default", p, prefetch=500, offset=EventPosition("@latest")) + receivers.append(receiver) + receiver.receive(timeout=1) yield connection_str, receivers - client.stop() + for r in receivers: + r.close() @pytest.fixture() def connstr_senders(connection_str): client = EventHubClient.from_connection_string(connection_str, debug=True) - eh_hub_info = client.get_eventhub_info() + eh_hub_info = client.get_eventhub_information() partitions = eh_hub_info["partition_ids"] senders = [] for p in partitions: - senders.append(client.add_sender(partition=p)) - - client.run() + sender = client.create_sender(partition=p) + senders.append(sender) yield connection_str, senders - client.stop() + for s in senders: + s.close() @pytest.fixture() diff --git a/sdk/eventhub/azure-eventhubs/setup.py b/sdk/eventhub/azure-eventhubs/setup.py index 034bdc14c115..1fdb12ec33d8 100644 --- a/sdk/eventhub/azure-eventhubs/setup.py +++ b/sdk/eventhub/azure-eventhubs/setup.py @@ -62,7 +62,7 @@ "tests", "tests.asynctests"]), install_requires=[ - 'uamqp~=1.1.0', + 'uamqp~=1.2.0', 'msrestazure>=0.4.32,<2.0.0', 'azure-common~=1.1', 'azure-storage-blob~=1.3' diff --git a/sdk/eventhub/azure-eventhubs/tests/__init__.py b/sdk/eventhub/azure-eventhubs/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index 6b086ff8202c..1be11107dae0 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -9,18 +9,17 @@ import pytest import time -from azure import eventhub -from azure.eventhub import EventData, Offset, EventHubError, EventHubClientAsync +from azure.eventhub import EventData, EventPosition, EventHubError +from azure.eventhub.aio import EventHubClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_end_of_stream_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Receiving only a single event")) @@ -28,20 +27,15 @@ async def test_receive_end_of_stream_async(connstr_senders): assert len(received) == 1 assert list(received[-1].body)[0] == b"Receiving only a single event" - except: - raise - finally: - await client.stop_async() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_offset_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -50,27 +44,22 @@ async def test_receive_with_offset_async(connstr_senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_async_receiver("$default", "0", offset=offset) - await client.run_async() - received = await offset_receiver.receive(timeout=5) - assert len(received) == 0 - senders[0].send(EventData(b"Message after offset")) - received = await offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - await client.stop_async() + offset_receiver = client.create_receiver("$default", "0", offset=offset) + async with offset_receiver: + received = await offset_receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Message after offset")) + received = await offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_inclusive_offset_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -79,24 +68,19 @@ async def test_receive_with_inclusive_offset_async(connstr_senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) - await client.run_async() - received = await offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - await client.stop_async() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset.value, inclusive=True)) + async with offset_receiver: + received = await offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_datetime_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -104,28 +88,23 @@ async def test_receive_with_datetime_async(connstr_senders): assert len(received) == 1 offset = received[0].enqueued_time - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset)) - await client.run_async() - received = await offset_receiver.receive(timeout=5) - assert len(received) == 0 - senders[0].send(EventData(b"Message after timestamp")) - time.sleep(1) - received = await offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - await client.stop_async() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset)) + async with offset_receiver: + received = await offset_receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Message after timestamp")) + time.sleep(1) + received = await offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_sequence_no_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -133,28 +112,23 @@ async def test_receive_with_sequence_no_async(connstr_senders): assert len(received) == 1 offset = received[0].sequence_number - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset)) - await client.run_async() - received = await offset_receiver.receive(timeout=5) - assert len(received) == 0 - senders[0].send(EventData(b"Message next in sequence")) - time.sleep(1) - received = await offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - await client.stop_async() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset)) + async with offset_receiver: + received = await offset_receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Message next in sequence")) + time.sleep(1) + received = await offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_inclusive_sequence_no_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -162,34 +136,25 @@ async def test_receive_with_inclusive_sequence_no_async(connstr_senders): assert len(received) == 1 offset = received[0].sequence_number - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset, inclusive=True)) - await client.run_async() - received = await offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - await client.stop_async() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset, inclusive=True)) + async with offset_receiver: + received = await offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_batch_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", prefetch=500, offset=Offset('@latest')) - await client.run_async() - try: + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", prefetch=500, offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 for i in range(10): senders[0].send(EventData(b"Data")) received = await receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 - except: - raise - finally: - await client.stop_async() async def pump(receiver, sleep=None): @@ -213,22 +178,16 @@ async def test_epoch_receiver_async(connstr_senders): connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=False) receivers = [] for epoch in [10, 20]: - receivers.append(client.add_async_epoch_receiver("$default", "0", epoch, prefetch=5)) - try: - await client.run_async() - outputs = await asyncio.gather( - pump(receivers[0]), - pump(receivers[1]), - return_exceptions=True) - assert isinstance(outputs[0], EventHubError) - assert outputs[1] == 1 - except: - raise - finally: - await client.stop_async() + receivers.append(client.create_epoch_receiver("$default", "0", epoch, prefetch=5)) + outputs = await asyncio.gather( + pump(receivers[0]), + pump(receivers[1]), + return_exceptions=True) + assert isinstance(outputs[0], EventHubError) + assert outputs[1] == 1 @pytest.mark.liveTest @@ -237,15 +196,14 @@ async def test_multiple_receiver_async(connstr_senders): connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClientAsync.from_connection_string(connection_str, debug=True) - partitions = await client.get_eventhub_info_async() + client = EventHubClient.from_connection_string(connection_str, debug=True) + partitions = await client.get_eventhub_information() assert partitions["partition_ids"] == ["0", "1"] receivers = [] for i in range(2): - receivers.append(client.add_async_receiver("$default", "0", prefetch=10)) + receivers.append(client.create_receiver("$default", "0", prefetch=10)) try: - await client.run_async() - more_partitions = await client.get_eventhub_info_async() + more_partitions = await client.get_eventhub_information() assert more_partitions["partition_ids"] == ["0", "1"] outputs = await asyncio.gather( pump(receivers[0]), @@ -253,10 +211,9 @@ async def test_multiple_receiver_async(connstr_senders): return_exceptions=True) assert isinstance(outputs[0], int) and outputs[0] == 1 assert isinstance(outputs[1], int) and outputs[1] == 1 - except: - raise finally: - await client.stop_async() + for r in receivers: + await r.close() @pytest.mark.liveTest @@ -265,22 +222,20 @@ async def test_epoch_receiver_after_non_epoch_receiver_async(connstr_senders): connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=False) receivers = [] - receivers.append(client.add_async_receiver("$default", "0", prefetch=10)) - receivers.append(client.add_async_epoch_receiver("$default", "0", 15, prefetch=10)) + receivers.append(client.create_receiver("$default", "0", prefetch=10)) + receivers.append(client.create_epoch_receiver("$default", "0", 15, prefetch=10)) try: - await client.run_async() outputs = await asyncio.gather( pump(receivers[0]), pump(receivers[1], sleep=5), return_exceptions=True) assert isinstance(outputs[0], EventHubError) assert isinstance(outputs[1], int) and outputs[1] == 1 - except: - raise finally: - await client.stop_async() + for r in receivers: + await r.close() @pytest.mark.liveTest @@ -289,51 +244,48 @@ async def test_non_epoch_receiver_after_epoch_receiver_async(connstr_senders): connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=False) receivers = [] - receivers.append(client.add_async_epoch_receiver("$default", "0", 15, prefetch=10)) - receivers.append(client.add_async_receiver("$default", "0", prefetch=10)) + receivers.append(client.create_epoch_receiver("$default", "0", 15, prefetch=10)) + receivers.append(client.create_receiver("$default", "0", prefetch=10)) try: - await client.run_async() outputs = await asyncio.gather( pump(receivers[0]), pump(receivers[1]), return_exceptions=True) assert isinstance(outputs[1], EventHubError) assert isinstance(outputs[0], int) and outputs[0] == 1 - except: - raise finally: - await client.stop_async() + for r in receivers: + await r.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_batch_with_app_prop_async(connstr_senders): - pytest.skip("Waiting on uAMQP release") + #pytest.skip("Waiting on uAMQP release") connection_str, senders = connstr_senders + app_prop_key = "raw_prop" + app_prop_value = "raw_value" + app_prop = {app_prop_key: app_prop_value} def batched(): for i in range(10): - yield "Event Data {}".format(i) + ed = EventData("Event Data {}".format(i)) + ed.application_properties = app_prop + yield ed for i in range(10, 20): - yield EventData("Event Data {}".format(i)) - - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - receiver = client.add_async_receiver("$default", "0", prefetch=500, offset=Offset('@latest')) - try: - await client.run_async() + ed = EventData("Event Data {}".format(i)) + ed.application_properties = app_prop + yield ed + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.create_receiver("$default", "0", prefetch=500, offset=EventPosition('@latest')) + async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 - app_prop_key = "raw_prop" - app_prop_value = "raw_value" - batch_app_prop = {app_prop_key:app_prop_value} - batch_event = EventData(batch=batched()) - batch_event.application_properties = batch_app_prop - - senders[0].send(batch_event) + senders[0].send_batch(batched()) await asyncio.sleep(1) @@ -344,7 +296,3 @@ def batched(): assert list(message.body)[0] == "Event Data {}".format(index).encode('utf-8') assert (app_prop_key.encode('utf-8') in message.application_properties) \ and (dict(message.application_properties)[app_prop_key.encode('utf-8')] == app_prop_value.encode('utf-8')) - except: - raise - finally: - await client.stop_async() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py index 917d7cde3b63..b17dad9cae2c 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py @@ -11,16 +11,16 @@ import time import json -from azure.eventhub import EventData, EventHubClientAsync +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_partition_key_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender() - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender() data_val = 0 for partition in [b"a", b"b", b"c", b"d", b"e", b"f"]: @@ -30,7 +30,6 @@ async def test_send_with_partition_key_async(connstr_receivers): data.partition_key = partition_key data_val += 1 await sender.send(data) - await client.stop_async() found_partition_keys = {} for index, partition in enumerate(receivers): @@ -47,15 +46,10 @@ async def test_send_with_partition_key_async(connstr_receivers): @pytest.mark.asyncio async def test_send_and_receive_zero_length_body_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender() - try: - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender() + async with sender: await sender.send(EventData("")) - except: - raise - finally: - await client.stop_async() received = [] for r in receivers: @@ -69,15 +63,10 @@ async def test_send_and_receive_zero_length_body_async(connstr_receivers): @pytest.mark.asyncio async def test_send_single_event_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender() - try: - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender() + async with sender: await sender.send(EventData(b"A single event")) - except: - raise - finally: - await client.stop_async() received = [] for r in receivers: @@ -93,17 +82,12 @@ async def test_send_batch_async(connstr_receivers): connection_str, receivers = connstr_receivers def batched(): for i in range(10): - yield "Event number {}".format(i) + yield EventData("Event number {}".format(i)) - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender() - try: - await client.run_async() - await sender.send(EventData(batch=batched())) - except: - raise - finally: - await client.stop_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender() + async with sender: + await sender.send_batch(batched()) time.sleep(1) received = [] @@ -119,15 +103,10 @@ def batched(): @pytest.mark.asyncio async def test_send_partition_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender(partition="1") - try: - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender(partition="1") + async with sender: await sender.send(EventData(b"Data")) - except: - raise - finally: - await client.stop_async() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 0 @@ -139,16 +118,11 @@ async def test_send_partition_async(connstr_receivers): @pytest.mark.asyncio async def test_send_non_ascii_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender(partition="0") - try: - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender(partition="0") + async with sender: await sender.send(EventData("é,è,à,ù,â,ê,î,ô,û")) await sender.send(EventData(json.dumps({"foo": "漢字"}))) - except: - raise - finally: - await client.stop_async() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 2 @@ -160,19 +134,15 @@ async def test_send_non_ascii_async(connstr_receivers): @pytest.mark.asyncio async def test_send_partition_batch_async(connstr_receivers): connection_str, receivers = connstr_receivers + def batched(): for i in range(10): - yield "Event number {}".format(i) + yield EventData("Event number {}".format(i)) - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender(partition="1") - try: - await client.run_async() - await sender.send(EventData(batch=batched())) - except: - raise - finally: - await client.stop_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender(partition="1") + async with sender: + await sender.send_batch(batched()) partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 0 @@ -184,15 +154,10 @@ def batched(): @pytest.mark.asyncio async def test_send_array_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender() - try: - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender() + async with sender: await sender.send(EventData([b"A", b"B", b"C"])) - except: - raise - finally: - await client.stop_async() received = [] for r in receivers: @@ -206,17 +171,12 @@ async def test_send_array_async(connstr_receivers): @pytest.mark.asyncio async def test_send_multiple_clients_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender_0 = client.add_async_sender(partition="0") - sender_1 = client.add_async_sender(partition="1") - try: - await client.run_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender_0 = client.create_sender(partition="0") + sender_1 = client.create_sender(partition="1") + async with sender_0 and sender_1: await sender_0.send(EventData(b"Message 0")) await sender_1.send(EventData(b"Message 1")) - except: - raise - finally: - await client.stop_async() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 1 @@ -227,31 +187,26 @@ async def test_send_multiple_clients_async(connstr_receivers): @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_batch_with_app_prop_async(connstr_receivers): - pytest.skip("Waiting on uAMQP release") + # pytest.skip("Waiting on uAMQP release") connection_str, receivers = connstr_receivers + app_prop_key = "raw_prop" + app_prop_value = "raw_value" + app_prop = {app_prop_key: app_prop_value} def batched(): for i in range(10): + ed = EventData("Event number {}".format(i)) + ed.application_properties = app_prop yield "Event number {}".format(i) for i in range(10, 20): - yield EventData("Event number {}".format(i)) + ed = EventData("Event number {}".format(i)) + ed.application_properties = app_prop + yield "Event number {}".format(i) - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) - sender = client.add_async_sender() - try: - await client.run_async() - - app_prop_key = "raw_prop" - app_prop_value = "raw_value" - batch_app_prop = {app_prop_key:app_prop_value} - batch_event = EventData(batch=batched()) - batch_event.application_properties = batch_app_prop - - await sender.send(batch_event) - except: - raise - finally: - await client.stop_async() + client = EventHubClient.from_connection_string(connection_str, debug=False) + sender = client.create_sender() + async with sender: + await sender.send_batch(batched()) time.sleep(1) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index 0b05bf78c842..51fbb3a6079a 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -9,14 +9,13 @@ import time import datetime -from azure import eventhub -from azure.eventhub import EventData, EventHubClient, Offset +from azure.eventhub import EventData, EventHubClient, EventPosition # def test_receive_without_events(connstr_senders): # connection_str, senders = connstr_senders # client = EventHubClient.from_connection_string(connection_str, debug=True) -# receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) +# receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) # finish = datetime.datetime.now() + datetime.timedelta(seconds=240) # count = 0 # try: @@ -38,10 +37,8 @@ def test_receive_end_of_stream(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) - try: - client.run() - + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + with receiver: received = receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Receiving only a single event")) @@ -50,22 +47,17 @@ def test_receive_end_of_stream(connstr_senders): assert received[0].body_as_str() == "Receiving only a single event" assert list(received[-1].body)[0] == b"Receiving only a single event" - except: - raise - finally: - client.stop() @pytest.mark.liveTest def test_receive_with_offset_sync(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - partitions = client.get_eventhub_info() + partitions = client.get_eventhub_information() assert partitions["partition_ids"] == ["0", "1"] - receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) - try: - client.run() - more_partitions = client.get_eventhub_info() + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + with receiver: + more_partitions = client.get_eventhub_information() assert more_partitions["partition_ids"] == ["0", "1"] received = receiver.receive(timeout=5) @@ -78,27 +70,22 @@ def test_receive_with_offset_sync(connstr_senders): assert list(received[0].body) == [b'Data'] assert received[0].body_as_str() == "Data" - offset_receiver = client.add_receiver("$default", "0", offset=offset) - client.run() - received = offset_receiver.receive(timeout=5) - assert len(received) == 0 - senders[0].send(EventData(b"Message after offset")) - received = offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - client.stop() + offset_receiver = client.create_receiver("$default", "0", offset=offset) + with offset_receiver: + received = offset_receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Message after offset")) + received = offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest def test_receive_with_inclusive_offset(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) - try: - client.run() + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + with receiver: received = receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -110,26 +97,21 @@ def test_receive_with_inclusive_offset(connstr_senders): assert list(received[0].body) == [b'Data'] assert received[0].body_as_str() == "Data" - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) - client.run() - received = offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - client.stop() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset.value, inclusive=True)) + with offset_receiver: + received = offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest def test_receive_with_datetime_sync(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - partitions = client.get_eventhub_info() + partitions = client.get_eventhub_information() assert partitions["partition_ids"] == ["0", "1"] - receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) - try: - client.run() - more_partitions = client.get_eventhub_info() + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + with receiver: + more_partitions = client.get_eventhub_information() assert more_partitions["partition_ids"] == ["0", "1"] received = receiver.receive(timeout=5) assert len(received) == 0 @@ -141,17 +123,13 @@ def test_receive_with_datetime_sync(connstr_senders): assert list(received[0].body) == [b'Data'] assert received[0].body_as_str() == "Data" - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset)) - client.run() - received = offset_receiver.receive(timeout=5) - assert len(received) == 0 - senders[0].send(EventData(b"Message after timestamp")) - received = offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - client.stop() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset)) + with offset_receiver: + received = offset_receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Message after timestamp")) + received = offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest @@ -167,9 +145,8 @@ def test_receive_with_custom_datetime_sync(connstr_senders): for i in range(5): senders[0].send(EventData(b"Message after timestamp")) - receiver = client.add_receiver("$default", "0", offset=Offset(offset)) - try: - client.run() + receiver = client.create_receiver("$default", "0", offset=EventPosition(offset)) + with receiver: all_received = [] received = receiver.receive(timeout=1) while received: @@ -180,20 +157,14 @@ def test_receive_with_custom_datetime_sync(connstr_senders): for received_event in all_received: assert received_event.body_as_str() == "Message after timestamp" assert received_event.enqueued_time > offset - except: - raise - finally: - client.stop() @pytest.mark.liveTest def test_receive_with_sequence_no(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) - try: - client.run() - + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + with receiver: received = receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) @@ -202,91 +173,73 @@ def test_receive_with_sequence_no(connstr_senders): assert len(received) == 1 offset = received[0].sequence_number - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset)) - client.run() - received = offset_receiver.receive(timeout=5) - assert len(received) == 0 - senders[0].send(EventData(b"Message next in sequence")) - time.sleep(1) - received = offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - client.stop() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset)) + with offset_receiver: + received = offset_receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Message next in sequence")) + time.sleep(1) + received = offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest def test_receive_with_inclusive_sequence_no(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) - try: - client.run() - + receiver = client.create_receiver("$default", "0", offset=EventPosition('@latest')) + with receiver: received = receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) received = receiver.receive(timeout=5) assert len(received) == 1 offset = received[0].sequence_number - - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset, inclusive=True)) - client.run() - received = offset_receiver.receive(timeout=5) - assert len(received) == 1 - except: - raise - finally: - client.stop() + offset_receiver = client.create_receiver("$default", "0", offset=EventPosition(offset, inclusive=True)) + with offset_receiver: + received = offset_receiver.receive(timeout=5) + assert len(received) == 1 @pytest.mark.liveTest def test_receive_batch(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) - receiver = client.add_receiver("$default", "0", prefetch=500, offset=Offset('@latest')) - try: - client.run() - + receiver = client.create_receiver("$default", "0", prefetch=500, offset=EventPosition('@latest')) + with receiver: received = receiver.receive(timeout=5) assert len(received) == 0 for i in range(10): senders[0].send(EventData(b"Data")) received = receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 - except: - raise - finally: - client.stop() @pytest.mark.liveTest def test_receive_batch_with_app_prop_sync(connstr_senders): - pytest.skip("Waiting on uAMQP release") + #pytest.skip("Waiting on uAMQP release") connection_str, senders = connstr_senders + app_prop_key = "raw_prop" + app_prop_value = "raw_value" + batch_app_prop = {app_prop_key: app_prop_value} def batched(): for i in range(10): - yield "Event Data {}".format(i) + ed = EventData("Event Data {}".format(i)) + ed.application_properties = batch_app_prop + yield ed for i in range(10, 20): - yield EventData("Event Data {}".format(i)) + ed = EventData("Event Data {}".format(i)) + ed.application_properties = batch_app_prop + yield ed client = EventHubClient.from_connection_string(connection_str, debug=False) - receiver = client.add_receiver("$default", "0", prefetch=500, offset=Offset('@latest')) - try: - client.run() - + receiver = client.create_receiver("$default", "0", prefetch=500, offset=EventPosition('@latest')) + with receiver: received = receiver.receive(timeout=5) assert len(received) == 0 - app_prop_key = "raw_prop" - app_prop_value = "raw_value" - batch_app_prop = {app_prop_key:app_prop_value} - batch_event = EventData(batch=batched()) - batch_event.application_properties = batch_app_prop - - senders[0].send(batch_event) + senders[0].send_batch(batched()) time.sleep(1) @@ -297,7 +250,4 @@ def batched(): assert list(message.body)[0] == "Event Data {}".format(index).encode('utf-8') assert (app_prop_key.encode('utf-8') in message.application_properties) \ and (dict(message.application_properties)[app_prop_key.encode('utf-8')] == app_prop_value.encode('utf-8')) - except: - raise - finally: - client.stop() + diff --git a/sdk/eventhub/azure-eventhubs/tests/test_send.py b/sdk/eventhub/azure-eventhubs/tests/test_send.py index f7a8ccc3b158..cdf1f0ebc6d0 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_send.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_send.py @@ -5,13 +5,11 @@ # license information. #-------------------------------------------------------------------------- -import os import pytest import time import json import sys -from azure import eventhub from azure.eventhub import EventData, EventHubClient @@ -19,10 +17,8 @@ def test_send_with_partition_key(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender() - try: - client.run() - + sender = client.create_sender() + with sender: data_val = 0 for partition in [b"a", b"b", b"c", b"d", b"e", b"f"]: partition_key = b"test_partition_" + partition @@ -31,10 +27,6 @@ def test_send_with_partition_key(connstr_receivers): data.partition_key = partition_key data_val += 1 sender.send(data) - except: - raise - finally: - client.stop() found_partition_keys = {} for index, partition in enumerate(receivers): @@ -53,15 +45,10 @@ def test_send_and_receive_large_body_size(connstr_receivers): pytest.skip("Skipping on OSX - open issue regarding message size") connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender() - try: - client.run() + sender = client.create_sender() + with sender: payload = 250 * 1024 sender.send(EventData("A" * payload)) - except: - raise - finally: - client.stop() received = [] for r in receivers: @@ -75,14 +62,9 @@ def test_send_and_receive_large_body_size(connstr_receivers): def test_send_and_receive_zero_length_body(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender() - try: - client.run() + sender = client.create_sender() + with sender: sender.send(EventData("")) - except: - raise - finally: - client.stop() received = [] for r in receivers: @@ -96,14 +78,9 @@ def test_send_and_receive_zero_length_body(connstr_receivers): def test_send_single_event(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender() - try: - client.run() + sender = client.create_sender() + with sender: sender.send(EventData(b"A single event")) - except: - raise - finally: - client.stop() received = [] for r in receivers: @@ -118,17 +95,12 @@ def test_send_batch_sync(connstr_receivers): connection_str, receivers = connstr_receivers def batched(): for i in range(10): - yield "Event number {}".format(i) + yield EventData("Event number {}".format(i)) client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender() - try: - client.run() - sender.send(EventData(batch=batched())) - except: - raise - finally: - client.stop() + sender = client.create_sender() + with sender: + sender.send_batch(batched()) time.sleep(1) received = [] @@ -144,14 +116,9 @@ def batched(): def test_send_partition(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender(partition="1") - try: - client.run() + sender = client.create_sender(partition="1") + with sender: sender.send(EventData(b"Data")) - except: - raise - finally: - client.stop() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 0 @@ -163,15 +130,10 @@ def test_send_partition(connstr_receivers): def test_send_non_ascii(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender(partition="0") - try: - client.run() + sender = client.create_sender(partition="0") + with sender: sender.send(EventData(u"é,è,à,ù,â,ê,î,ô,û")) sender.send(EventData(json.dumps({"foo": u"漢字"}))) - except: - raise - finally: - client.stop() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 2 @@ -184,18 +146,13 @@ def test_send_partition_batch(connstr_receivers): connection_str, receivers = connstr_receivers def batched(): for i in range(10): - yield "Event number {}".format(i) + yield EventData("Event number {}".format(i)) client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender(partition="1") - try: - client.run() - sender.send(EventData(batch=batched())) + sender = client.create_sender(partition="1") + with sender: + sender.send_batch(batched()) time.sleep(1) - except: - raise - finally: - client.stop() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 0 @@ -207,14 +164,9 @@ def batched(): def test_send_array_sync(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=True) - sender = client.add_sender() - try: - client.run() + sender = client.create_sender() + with sender: sender.send(EventData([b"A", b"B", b"C"])) - except: - raise - finally: - client.stop() received = [] for r in receivers: @@ -228,16 +180,12 @@ def test_send_array_sync(connstr_receivers): def test_send_multiple_clients(connstr_receivers): connection_str, receivers = connstr_receivers client = EventHubClient.from_connection_string(connection_str, debug=False) - sender_0 = client.add_sender(partition="0") - sender_1 = client.add_sender(partition="1") - try: - client.run() + sender_0 = client.create_sender(partition="0") + sender_1 = client.create_sender(partition="1") + with sender_0: sender_0.send(EventData(b"Message 0")) + with sender_1: sender_1.send(EventData(b"Message 1")) - except: - raise - finally: - client.stop() partition_0 = receivers[0].receive(timeout=2) assert len(partition_0) == 1 @@ -247,33 +195,27 @@ def test_send_multiple_clients(connstr_receivers): @pytest.mark.liveTest def test_send_batch_with_app_prop_sync(connstr_receivers): - pytest.skip("Waiting on uAMQP release") + #pytest.skip("Waiting on uAMQP release") connection_str, receivers = connstr_receivers + app_prop_key = "raw_prop" + app_prop_value = "raw_value" + app_prop = {app_prop_key: app_prop_value} + def batched(): for i in range(10): - yield "Event number {}".format(i) + ed = EventData("Event number {}".format(i)) + ed.application_properties = app_prop + yield ed for i in range(10, 20): - yield EventData("Event number {}".format(i)) + ed = EventData("Event number {}".format(i)) + ed.application_properties = app_prop + yield ed client = EventHubClient.from_connection_string(connection_str, debug=False) - sender = client.add_sender() - try: - client.run() - - app_prop_key = "raw_prop" - app_prop_value = "raw_value" - batch_app_prop = {app_prop_key:app_prop_value} - batch_event = EventData(batch=batched()) - batch_event.application_properties = batch_app_prop - - sender.send(batch_event) - except: - raise - finally: - client.stop() - + sender = client.create_sender() + with sender: + sender.send_batch(batched()) time.sleep(1) - received = [] for r in receivers: received.extend(r.receive(timeout=3))