Skip to content

Commit

Permalink
[eventhub] Custom Endpoint (Azure#24505)
Browse files Browse the repository at this point in the history
* sync ce

* async ce

* add string ending

* only pass to transport

* running into same recieve issue with sync

* fixing async - needs to pass to sasl

* remove logger

* stopping here

* adding prefix to fix sample

* add in prefetch

* fixing transport remove print

* host being overriden

* removing trace

* fix to use url async

* aligning sync/async pattern

* removing uneeded hostname switch

* string formatting

* changelog

* adding docstrings for supported events

* pr comments refactoring sync

* mirroring on async

* pr comment docstring

* removing import

* missing _

* missing ssl

* if no port given, we use default set in config

* async of same ^

* add default port in connection stage if port is none

* adding in docstring to cliet/connection string constructor

* custom_endpoint_address in client base async to match sync

* fix import on websocket test

* fix import 2

* skipping tests

* removing import

* pytest.mark
  • Loading branch information
l0lawrence authored and swathipil committed Aug 23, 2022
1 parent 57b529c commit bb8fa12
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 7 deletions.
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
### Features Added

- Added support for connection using websocket and http proxy.
- Added support for custom endpoint connection over websocket.

## 5.9.0 (2022-05-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ class EventHubConsumerClient(
evaluation regardless of the load balancing strategy.
Greedy strategy is used by default.
:paramtype load_balancing_strategy: str or ~azure.eventhub.LoadBalancingStrategy
:keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
.. admonition:: Example:
Expand Down
16 changes: 16 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ class EventHubProducerClient(
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp` in which case port 5671 is used.
If the port 5671 is unavailable/blocked in the network environment, `TransportType.AmqpOverWebsocket` could
be used instead which uses port 443 for communication.
:paramtype transport_type: ~azure.eventhub.TransportType
:keyword Dict http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
Additionally the following keys may also be present: `'username', 'password'`.
:keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
.. admonition:: Example:
Expand Down
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .constants import (
PORT,
SECURE_PORT,
WEBSOCKET_PORT,
MAX_CHANNELS,
MAX_FRAME_SIZE_BYTES,
HEADER_FRAME,
Expand Down Expand Up @@ -99,6 +100,14 @@ def __init__(self, endpoint, **kwargs):
self._port = PORT
self.state = None # type: Optional[ConnectionState]

# Custom Endpoint
custom_endpoint_address = kwargs.get("custom_endpoint_address")
custom_endpoint = None
if custom_endpoint_address:
custom_parsed_url = urlparse(custom_endpoint_address)
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = "{}:{}{}".format(custom_parsed_url.hostname, custom_port, custom_parsed_url.path)

transport = kwargs.get('transport')
self._transport_type = kwargs.pop('transport_type', TransportType.Amqp)
if transport:
Expand All @@ -111,6 +120,7 @@ def __init__(self, endpoint, **kwargs):
self._transport = sasl_transport(
host=endpoint,
credential=kwargs['sasl_credential'],
custom_endpoint=custom_endpoint,
**kwargs
)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, host, port=AMQP_PORT, connect_timeout=None,
self.raise_on_initial_eintr = raise_on_initial_eintr
self._read_buffer = BytesIO()
self.host, self.port = to_host_port(host, port)

self.connect_timeout = connect_timeout or TIMEOUT_INTERVAL
self.read_timeout = read_timeout
self.write_timeout = write_timeout
Expand Down Expand Up @@ -663,6 +664,7 @@ def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, **
self.sslopts = ssl if isinstance(ssl, dict) else {}
self._connect_timeout = connect_timeout or TIMEOUT_INTERVAL
self._host = host
self._custom_endpoint = kwargs.get("custom_endpoint")
super().__init__(
host, port, connect_timeout, **kwargs
)
Expand All @@ -681,7 +683,7 @@ def connect(self):
try:
from websocket import create_connection
self.ws = create_connection(
url="wss://{}".format(self._host),
url="wss://{}".format(self._custom_endpoint or self._host),
subprotocols=[AMQP_WS_SUBPROTOCOL],
timeout=self._connect_timeout,
skip_utf8_validation=True,
Expand All @@ -690,6 +692,7 @@ def connect(self):
http_proxy_port=http_proxy_port,
http_proxy_auth=http_proxy_auth
)

except ImportError:
raise ValueError("Please install websocket-client library to use websocket transport.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,16 @@ async def open_async(self):
self._connection = Connection(
"amqps://" + self._hostname,
sasl_credential=self._auth.sasl,
ssl={'ca_certs': certifi.where()},
ssl={'ca_certs': self._connection_verify or certifi.where()},
container_id=self._name,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
network_trace=self._network_trace,
transport_type=self._transport_type,
http_proxy=self._http_proxy
http_proxy=self._http_proxy,
custom_endpoint_address=self._custom_endpoint_address
)
await self._connection.open()
if not self._session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
MAX_FRAME_SIZE_BYTES,
MAX_CHANNELS,
HEADER_FRAME,
WEBSOCKET_PORT,
ConnectionState,
EMPTY_FRAME,
TransportType
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(self, endpoint, **kwargs):
self.transport = sasl_transport(
host=endpoint,
credential=kwargs['sasl_credential'],
custom_endpoint=custom_endpoint,
**kwargs
)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def __init__(self, host, port=AMQP_PORT, connect_timeout=None,
self.raise_on_initial_eintr = raise_on_initial_eintr
self._read_buffer = BytesIO()
self.host, self.port = to_host_port(host, port)

self.connect_timeout = connect_timeout
self.read_timeout = read_timeout
self.write_timeout = write_timeout
Expand Down Expand Up @@ -426,6 +425,7 @@ def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, **
self.socket_lock = asyncio.Lock()
self.sslopts = ssl if isinstance(ssl, dict) else {}
self._connect_timeout = connect_timeout or TIMEOUT_INTERVAL
self._custom_endpoint = kwargs.get("custom_endpoint")
self.host = host
self.ws = None
self._http_proxy = kwargs.get('http_proxy', None)
Expand All @@ -442,7 +442,7 @@ async def connect(self):
try:
from websocket import create_connection
self.ws = create_connection(
url="wss://{}".format(self.host),
url="wss://{}".format(self._custom_endpoint or self.host),
subprotocols=[AMQP_WS_SUBPROTOCOL],
timeout=self._connect_timeout,
skip_utf8_validation=True,
Expand Down
9 changes: 7 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def __init__(self, hostname, auth=None, **kwargs):
self._transport_type = kwargs.pop('transport_type', TransportType.Amqp)
self._http_proxy = kwargs.pop('http_proxy', None)

# Custom Endpoint
self._custom_endpoint_address = kwargs.get("custom_endpoint_address")
self._connection_verify = kwargs.get("connection_verify")

def __enter__(self):
"""Run Client in a context manager."""
self.open()
Expand Down Expand Up @@ -239,15 +243,16 @@ def open(self):
self._connection = Connection(
"amqps://" + self._hostname,
sasl_credential=self._auth.sasl,
ssl={'ca_certs':certifi.where()},
ssl={'ca_certs':self._connection_verify or certifi.where()},
container_id=self._name,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
network_trace=self._network_trace,
transport_type=self._transport_type,
http_proxy=self._http_proxy
http_proxy=self._http_proxy,
custom_endpoint_address=self._custom_endpoint_address
)
self._connection.open()
if not self._session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ class EventHubConsumerClient(
evaluation regardless of the load balancing strategy.
Greedy strategy is used by default.
:paramtype load_balancing_strategy: str or ~azure.eventhub.LoadBalancingStrategy
:keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
.. admonition:: Example:
Expand Down Expand Up @@ -290,8 +298,17 @@ def from_connection_string(
evaluation regardless of the load balancing strategy.
Greedy strategy is used by default.
:paramtype load_balancing_strategy: str or ~azure.eventhub.LoadBalancingStrategy
:keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
:rtype: ~azure.eventhub.aio.EventHubConsumerClient
.. admonition:: Example:
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import pytest
import asyncio
import logging

from azure.eventhub._pyamqp import authentication
from azure.eventhub._pyamqp.aio import ReceiveClientAsync
from azure.eventhub._pyamqp.constants import TransportType

@pytest.mark.asyncio
@pytest.mark.skip()
async def test_event_hubs_client_web_socket(eventhub_config):
uri = "sb://{}/{}".format(eventhub_config['hostname'], eventhub_config['event_hub'])
sas_auth = SASTokenAuthAsync(
uri=uri,
audience=uri,
username=eventhub_config['key_name'],
password=eventhub_config['access_key']
)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
eventhub_config['hostname'],
eventhub_config['event_hub'],
eventhub_config['consumer_group'],
eventhub_config['partition'])

receive_client = ReceiveClientAsync(eventhub_config['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=5000, prefetch=50, transport_type=TransportType.AmqpOverWebsocket)
await receive_client.open_async()
while not await receive_client.client_ready_async():
await asyncio.sleep(0.05)
messages = await receive_client.receive_message_batch_async(max_batch_size=1)
logging.info(len(messages))
logging.info(messages[0])
await receive_client.close_async()
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import pytest

from azure.eventhub._pyamqp import authentication, ReceiveClient
from azure.eventhub._pyamqp.constants import TransportType

@pytest.mark.skip()
def test_event_hubs_client_web_socket(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = authentication.SASTokenAuth(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['consumer_group'],
live_eventhub['partition'])

with ReceiveClient(live_eventhub['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=5000, prefetch=50, transport_type=TransportType.AmqpOverWebsocket) as receive_client:
receive_client.receive_message_batch(max_batch_size=10)

0 comments on commit bb8fa12

Please sign in to comment.