Skip to content

Commit

Permalink
Eventhubs remove iot and linkredirect (#7450)
Browse files Browse the repository at this point in the history
* Remove iothub/link-redirect related code

* Remove self._running from consumer and producer

* Remove IoT related params "operation" and "device"

* Remove exception from close()

* add iterator long running test

* small bug fix

* small bug fix

* Fix connection properties bug and format

* Changed product to azure-eventhub in user agent

* Fix a type hint

* Improve stress script

* Print to console configurable

* small changes

* Disable tracking last enqueued event properties for uamqp 1.2.2

* use different consumer group

* fix an issue about consumer group

* fix an issue about consumer group

* Fix a get_properties bug
  • Loading branch information
YijunXieMS authored Oct 2, 2019
1 parent 634f495 commit 2ea6424
Show file tree
Hide file tree
Showing 27 changed files with 377 additions and 1,307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,34 @@ def __init__(self):
self._client = None
self._handler = None
self._name = None
self._running = False
self._closed = False

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)
self.close()

def _check_closed(self):
if self._error:
if self._closed:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self._redirected = redirect
self._running = False
self._close_connection()

def _open(self):
"""
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
"""Open the EventHubConsumer/EventHubProducer using the supplied connection.
"""
# pylint: disable=protected-access
if not self._running:
if self._handler:
self._handler.close()
if self._redirected:
alt_creds = {
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access
self._client._address.hostname,
self._client._get_auth(**alt_creds)
self._client._create_auth()
))
while not self._handler.client_ready():
time.sleep(0.05)
Expand All @@ -66,7 +54,8 @@ def _open(self):
self._running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
if self._handler:
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self._running = False

def _close_connection(self):
Expand All @@ -76,8 +65,6 @@ def _close_connection(self):
def _handle_exception(self, exception):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, self)

return _handle_exception(exception, self)

def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
Expand All @@ -102,16 +89,11 @@ def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
raise last_exception

def close(self, exception=None):
# type:(Exception) -> None
def close(self):
# type:() -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.
:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception
this will be a no op.
Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
Expand All @@ -122,16 +104,8 @@ def close(self, exception=None):
:caption: Close down the handler.
"""
self._running = False
if self._error: # type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self._redirected = exception
elif isinstance(exception, EventHubError):
self._error = exception
elif exception:
self._error = EventHubError(str(exception))
else:
self._error = EventHubError("{} handler is closed.".format(self._name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
self._running = False
self._closed = True

Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,35 @@ def __init__(self):
self._client = None
self._handler = None
self._name = None
self._running = False
self._closed = False

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_val)
await self.close()

def _check_closed(self):
if self._error:
if self._closed:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

async def _redirect(self, redirect):
self._redirected = redirect
self._running = False
await self._close_connection()

async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
"""
# pylint: disable=protected-access
if not self._running:
if self._handler:
await self._handler.close_async()
if self._redirected:
alt_creds = {
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
await self._handler.open_async(connection=await self._client._conn_manager.get_connection(
self._client._address.hostname,
self._client._get_auth(**alt_creds)
self._client._create_auth()
))
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
Expand All @@ -67,7 +56,8 @@ async def _open(self):
self._running = True

async def _close_handler(self):
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
if self._handler:
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
self._running = False

async def _close_connection(self):
Expand Down Expand Up @@ -103,16 +93,11 @@ async def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
raise last_exception

async def close(self, exception=None):
# type: (Exception) -> None
async def close(self):
# type: () -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.
:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception
this will be a no op.
Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -123,18 +108,7 @@ async def close(self, exception=None):
:caption: Close down the handler.
"""
self._running = False
if self._error: #type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self._redirected = exception
elif isinstance(exception, EventHubError):
self._error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self._error = ConnectError(str(exception), exception)
elif exception:
self._error = EventHubError(str(exception))
else:
self._error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
self._running = False
self._closed = True
49 changes: 8 additions & 41 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,19 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

def _create_auth(self, username=None, password=None):
def _create_auth(self):
"""
Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate
the session.
:param username: The name of the shared access policy.
:type username: str
:param password: The shared access key.
:type password: str
"""
http_proxy = self._config.http_proxy
transport_type = self._config.transport_type
auth_timeout = self._config.auth_timeout

if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return
username = username or self._auth_config['username']
password = password or self._auth_config['password']
username = self._credential.policy
password = self._credential.key
if "@sas.root" in username:
return authentication.SASLPlain(
self._host, username, password, http_proxy=http_proxy, transport_type=transport_type)
Expand Down Expand Up @@ -117,14 +113,10 @@ async def _try_delay(self, retried_times, last_exception, timeout_time=None, ent
raise last_exception

async def _management_request(self, mgmt_msg, op_type):
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password": self._auth_config.get("iot_password")
}

retried_times = 0
last_exception = None
while retried_times <= self._config.max_retries:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_auth = self._create_auth()
mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing)
try:
conn = await self._conn_manager.get_connection(self._host, mgmt_auth)
Expand All @@ -142,18 +134,8 @@ async def _management_request(self, mgmt_msg, op_type):
retried_times += 1
finally:
await mgmt_client.close_async()

async def _iothub_redirect(self):
async with self._lock:
if self._is_iothub and not self._iothub_redirect_info:
if not self._redirect_consumer:
self._redirect_consumer = self.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events')
async with self._redirect_consumer:
await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access
self._redirect_consumer = None
log.info("%r returns an exception %r", self._container_id, last_exception)
raise last_exception

async def get_properties(self):
# type:() -> Dict[str, Any]
Expand All @@ -168,8 +150,6 @@ async def get_properties(self):
:rtype: dict
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
output = {}
Expand Down Expand Up @@ -209,8 +189,6 @@ async def get_partition_properties(self, partition):
:rtype: dict
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
mgmt_msg = Message(application_properties={'name': self.eh_name,
'partition': partition})
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')
Expand Down Expand Up @@ -246,9 +224,6 @@ def create_consumer(
:param owner_level: The priority of the exclusive consumer. The client will create an exclusive
consumer if owner_level is set.
:type owner_level: int
:param operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:type track_last_enqueued_event_properties: bool
Expand All @@ -272,14 +247,12 @@ def create_consumer(
"""
owner_level = kwargs.get("owner_level")
operation = kwargs.get("operation")
prefetch = kwargs.get("prefetch") or self._config.prefetch
track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)
loop = kwargs.get("loop")

path = self._address.path + operation if operation else self._address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self._address.hostname, path, consumer_group, partition_id)
self._address.hostname, self._address.path, consumer_group, partition_id)
handler = EventHubConsumer(
self, source_url, event_position=event_position, owner_level=owner_level,
prefetch=prefetch,
Expand All @@ -289,7 +262,6 @@ def create_consumer(
def create_producer(
self, *,
partition_id: str = None,
operation: str = None,
send_timeout: float = None,
loop: asyncio.AbstractEventLoop = None
) -> EventHubProducer:
Expand All @@ -300,9 +272,6 @@ def create_producer(
If omitted, the events will be distributed to available partitions via
round-robin.
:type partition_id: str
:param operation: An optional operation to be appended to the hostname in the target URL.
The value must start with `/` character.
:type operation: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: float
Expand All @@ -320,8 +289,6 @@ def create_producer(
"""

target = "amqps://{}{}".format(self._address.hostname, self._address.path)
if operation:
target = target + operation
send_timeout = self._config.send_timeout if send_timeout is None else send_timeout

handler = EventHubProducer(
Expand Down
Loading

0 comments on commit 2ea6424

Please sign in to comment.