diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index f29e8a3341e1..97a4e44f4068 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -25,6 +25,7 @@ ### Features Added - Added support for connection using websocket and http proxy. +- Added support for custom endpoint connection over websocket. ## 5.9.0 (2022-05-10) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index 7817751b8b7b..5b48324bbe05 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -118,6 +118,14 @@ class EventHubConsumerClient( evaluation regardless of the load balancing strategy. Greedy strategy is used by default. :paramtype load_balancing_strategy: str or ~azure.eventhub.LoadBalancingStrategy + :keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to + the Event Hubs service, allowing network requests to be routed through any application gateways or + other paths needed for the host environment. Default is None. + The format would be like "sb://:". + If port is not specified in the `custom_endpoint_address`, by default port 443 will be used. + :keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to + authenticate the identity of the connection endpoint. + Default is None in which case `certifi.where()` will be used. .. admonition:: Example: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 21731e46ac34..623282d821ed 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -109,6 +109,22 @@ class EventHubProducerClient( :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity unless initiated by the service. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is `TransportType.Amqp` in which case port 5671 is used. + If the port 5671 is unavailable/blocked in the network environment, `TransportType.AmqpOverWebsocket` could + be used instead which uses port 443 for communication. + :paramtype transport_type: ~azure.eventhub.TransportType + :keyword Dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). + Additionally the following keys may also be present: `'username', 'password'`. + :keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to + the Event Hubs service, allowing network requests to be routed through any application gateways or + other paths needed for the host environment. Default is None. + The format would be like "sb://:". + If port is not specified in the `custom_endpoint_address`, by default port 443 will be used. + :keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to + authenticate the identity of the connection endpoint. + Default is None in which case `certifi.where()` will be used. .. admonition:: Example: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py index c73417d1e56f..34515131e24f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py @@ -18,6 +18,7 @@ from .constants import ( PORT, SECURE_PORT, + WEBSOCKET_PORT, MAX_CHANNELS, MAX_FRAME_SIZE_BYTES, HEADER_FRAME, @@ -99,6 +100,14 @@ def __init__(self, endpoint, **kwargs): self._port = PORT self.state = None # type: Optional[ConnectionState] + # Custom Endpoint + custom_endpoint_address = kwargs.get("custom_endpoint_address") + custom_endpoint = None + if custom_endpoint_address: + custom_parsed_url = urlparse(custom_endpoint_address) + custom_port = custom_parsed_url.port or WEBSOCKET_PORT + custom_endpoint = "{}:{}{}".format(custom_parsed_url.hostname, custom_port, custom_parsed_url.path) + transport = kwargs.get('transport') self._transport_type = kwargs.pop('transport_type', TransportType.Amqp) if transport: @@ -111,6 +120,7 @@ def __init__(self, endpoint, **kwargs): self._transport = sasl_transport( host=endpoint, credential=kwargs['sasl_credential'], + custom_endpoint=custom_endpoint, **kwargs ) else: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py index b98411c12e1c..344692ca4c1e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py @@ -149,6 +149,7 @@ def __init__(self, host, port=AMQP_PORT, connect_timeout=None, self.raise_on_initial_eintr = raise_on_initial_eintr self._read_buffer = BytesIO() self.host, self.port = to_host_port(host, port) + self.connect_timeout = connect_timeout or TIMEOUT_INTERVAL self.read_timeout = read_timeout self.write_timeout = write_timeout @@ -663,6 +664,7 @@ def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, ** self.sslopts = ssl if isinstance(ssl, dict) else {} self._connect_timeout = connect_timeout or TIMEOUT_INTERVAL self._host = host + self._custom_endpoint = kwargs.get("custom_endpoint") super().__init__( host, port, connect_timeout, **kwargs ) @@ -681,7 +683,7 @@ def connect(self): try: from websocket import create_connection self.ws = create_connection( - url="wss://{}".format(self._host), + url="wss://{}".format(self._custom_endpoint or self._host), subprotocols=[AMQP_WS_SUBPROTOCOL], timeout=self._connect_timeout, skip_utf8_validation=True, @@ -690,6 +692,7 @@ def connect(self): http_proxy_port=http_proxy_port, http_proxy_auth=http_proxy_auth ) + except ImportError: raise ValueError("Please install websocket-client library to use websocket transport.") diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py index 863285f7ca59..9a4f8f5a544c 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py @@ -194,7 +194,7 @@ async def open_async(self): self._connection = Connection( "amqps://" + self._hostname, sasl_credential=self._auth.sasl, - ssl={'ca_certs': certifi.where()}, + ssl={'ca_certs': self._connection_verify or certifi.where()}, container_id=self._name, max_frame_size=self._max_frame_size, channel_max=self._channel_max, @@ -202,7 +202,8 @@ async def open_async(self): properties=self._properties, network_trace=self._network_trace, transport_type=self._transport_type, - http_proxy=self._http_proxy + http_proxy=self._http_proxy, + custom_endpoint_address=self._custom_endpoint_address ) await self._connection.open() if not self._session: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index 68ba94881b63..6faff8eabb3d 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -26,6 +26,7 @@ MAX_FRAME_SIZE_BYTES, MAX_CHANNELS, HEADER_FRAME, + WEBSOCKET_PORT, ConnectionState, EMPTY_FRAME, TransportType @@ -91,6 +92,7 @@ def __init__(self, endpoint, **kwargs): self.transport = sasl_transport( host=endpoint, credential=kwargs['sasl_credential'], + custom_endpoint=custom_endpoint, **kwargs ) else: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index 66f3187ab7d7..89e7d29b274a 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -163,7 +163,6 @@ def __init__(self, host, port=AMQP_PORT, connect_timeout=None, self.raise_on_initial_eintr = raise_on_initial_eintr self._read_buffer = BytesIO() self.host, self.port = to_host_port(host, port) - self.connect_timeout = connect_timeout self.read_timeout = read_timeout self.write_timeout = write_timeout @@ -426,6 +425,7 @@ def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, ** self.socket_lock = asyncio.Lock() self.sslopts = ssl if isinstance(ssl, dict) else {} self._connect_timeout = connect_timeout or TIMEOUT_INTERVAL + self._custom_endpoint = kwargs.get("custom_endpoint") self.host = host self.ws = None self._http_proxy = kwargs.get('http_proxy', None) @@ -442,7 +442,7 @@ async def connect(self): try: from websocket import create_connection self.ws = create_connection( - url="wss://{}".format(self.host), + url="wss://{}".format(self._custom_endpoint or self.host), subprotocols=[AMQP_WS_SUBPROTOCOL], timeout=self._connect_timeout, skip_utf8_validation=True, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py index 25fbb125a4fc..551e610a9df4 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py @@ -160,6 +160,10 @@ def __init__(self, hostname, auth=None, **kwargs): self._transport_type = kwargs.pop('transport_type', TransportType.Amqp) self._http_proxy = kwargs.pop('http_proxy', None) + # Custom Endpoint + self._custom_endpoint_address = kwargs.get("custom_endpoint_address") + self._connection_verify = kwargs.get("connection_verify") + def __enter__(self): """Run Client in a context manager.""" self.open() @@ -239,7 +243,7 @@ def open(self): self._connection = Connection( "amqps://" + self._hostname, sasl_credential=self._auth.sasl, - ssl={'ca_certs':certifi.where()}, + ssl={'ca_certs':self._connection_verify or certifi.where()}, container_id=self._name, max_frame_size=self._max_frame_size, channel_max=self._channel_max, @@ -247,7 +251,8 @@ def open(self): properties=self._properties, network_trace=self._network_trace, transport_type=self._transport_type, - http_proxy=self._http_proxy + http_proxy=self._http_proxy, + custom_endpoint_address=self._custom_endpoint_address ) self._connection.open() if not self._session: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index 009e5ac8a1cf..f44630f83ace 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -123,6 +123,14 @@ class EventHubConsumerClient( evaluation regardless of the load balancing strategy. Greedy strategy is used by default. :paramtype load_balancing_strategy: str or ~azure.eventhub.LoadBalancingStrategy + :keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to + the Event Hubs service, allowing network requests to be routed through any application gateways or + other paths needed for the host environment. Default is None. + The format would be like "sb://:". + If port is not specified in the `custom_endpoint_address`, by default port 443 will be used. + :keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to + authenticate the identity of the connection endpoint. + Default is None in which case `certifi.where()` will be used. .. admonition:: Example: @@ -290,8 +298,17 @@ def from_connection_string( evaluation regardless of the load balancing strategy. Greedy strategy is used by default. :paramtype load_balancing_strategy: str or ~azure.eventhub.LoadBalancingStrategy + :keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to + the Event Hubs service, allowing network requests to be routed through any application gateways or + other paths needed for the host environment. Default is None. + The format would be like "sb://:". + If port is not specified in the `custom_endpoint_address`, by default port 443 will be used. + :keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to + authenticate the identity of the connection endpoint. + Default is None in which case `certifi.where()` will be used. :rtype: ~azure.eventhub.aio.EventHubConsumerClient + .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/async/test_websocket_async.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/async/test_websocket_async.py new file mode 100644 index 000000000000..d0ec02564973 --- /dev/null +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/async/test_websocket_async.py @@ -0,0 +1,38 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import pytest +import asyncio +import logging + +from azure.eventhub._pyamqp import authentication +from azure.eventhub._pyamqp.aio import ReceiveClientAsync +from azure.eventhub._pyamqp.constants import TransportType + +@pytest.mark.asyncio +@pytest.mark.skip() +async def test_event_hubs_client_web_socket(eventhub_config): + uri = "sb://{}/{}".format(eventhub_config['hostname'], eventhub_config['event_hub']) + sas_auth = SASTokenAuthAsync( + uri=uri, + audience=uri, + username=eventhub_config['key_name'], + password=eventhub_config['access_key'] + ) + + source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( + eventhub_config['hostname'], + eventhub_config['event_hub'], + eventhub_config['consumer_group'], + eventhub_config['partition']) + + receive_client = ReceiveClientAsync(eventhub_config['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=5000, prefetch=50, transport_type=TransportType.AmqpOverWebsocket) + await receive_client.open_async() + while not await receive_client.client_ready_async(): + await asyncio.sleep(0.05) + messages = await receive_client.receive_message_batch_async(max_batch_size=1) + logging.info(len(messages)) + logging.info(messages[0]) + await receive_client.close_async() diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_websocket.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_websocket.py new file mode 100644 index 000000000000..4e180b9cc243 --- /dev/null +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_websocket.py @@ -0,0 +1,28 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import pytest + +from azure.eventhub._pyamqp import authentication, ReceiveClient +from azure.eventhub._pyamqp.constants import TransportType + +@pytest.mark.skip() +def test_event_hubs_client_web_socket(live_eventhub): + uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub']) + sas_auth = authentication.SASTokenAuth( + uri=uri, + audience=uri, + username=live_eventhub['key_name'], + password=live_eventhub['access_key'] + ) + + source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( + live_eventhub['hostname'], + live_eventhub['event_hub'], + live_eventhub['consumer_group'], + live_eventhub['partition']) + + with ReceiveClient(live_eventhub['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=5000, prefetch=50, transport_type=TransportType.AmqpOverWebsocket) as receive_client: + receive_client.receive_message_batch(max_batch_size=10)