Skip to content

Commit

Permalink
Implement Azure Service Bus (Update and Receive) Subscription Operator (
Browse files Browse the repository at this point in the history
#25029)

Implement Azure Service Bus (Update and Receive) Subscription Operator
  • Loading branch information
bharanidharan14 authored Jul 14, 2022
1 parent 5e0160f commit 292440d
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 19 deletions.
36 changes: 36 additions & 0 deletions airflow/providers/microsoft/azure/hooks/asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,39 @@ def receive_message(
for msg in received_msgs:
self.log.info(msg)
receiver.complete_message(msg)

def receive_subscription_message(
self,
topic_name: str,
subscription_name: str,
max_message_count: Optional[int],
max_wait_time: Optional[float],
):
"""
Receive a batch of subscription message at once. This approach is optimal if you wish
to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.
:param subscription_name: The subscription name that will own the rule in topic
:param topic_name: The topic that will own the subscription rule.
:param max_message_count: Maximum number of messages in the batch.
Actual number returned will depend on prefetch_count and incoming stream rate.
Setting to None will fully depend on the prefetch config. The default value is 1.
:param max_wait_time: Maximum time to wait in seconds for the first message to arrive. If no
messages arrive, and no timeout is specified, this call will not return until the
connection is closed. If specified, an no messages arrive within the timeout period,
an empty list will be returned.
"""
if subscription_name is None:
raise TypeError("Subscription name cannot be None.")
if topic_name is None:
raise TypeError("Topic name cannot be None.")
with self.get_conn() as service_bus_client, service_bus_client.get_subscription_receiver(
topic_name, subscription_name
) as subscription_receiver:
with subscription_receiver:
received_msgs = subscription_receiver.receive_messages(
max_message_count=max_message_count, max_wait_time=max_wait_time
)
for msg in received_msgs:
self.log.info(msg)
subscription_receiver.complete_message(msg)
116 changes: 116 additions & 0 deletions airflow/providers/microsoft/azure/operators/asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,122 @@ def execute(self, context: "Context") -> None:
self.log.info("Created subscription %s", subscription.name)


class AzureServiceBusUpdateSubscriptionOperator(BaseOperator):
"""
Update an Azure ServiceBus Topic Subscription under a ServiceBus Namespace
by using ServiceBusAdministrationClient
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureServiceBusUpdateSubscriptionOperator`
:param topic_name: The topic that will own the to-be-created subscription.
:param subscription_name: Name of the subscription that need to be created.
:param max_delivery_count: The maximum delivery count. A message is automatically dead lettered
after this number of deliveries. Default value is 10.
:param dead_lettering_on_message_expiration: A value that indicates whether this subscription
has dead letter support when a message expires.
:param enable_batched_operations: Value that indicates whether server-side batched
operations are enabled.
:param azure_service_bus_conn_id: Reference to the
:ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`.
"""

template_fields: Sequence[str] = ("topic_name", "subscription_name")
ui_color = "#e4f0e8"

def __init__(
self,
*,
topic_name: str,
subscription_name: str,
max_delivery_count: Optional[int] = None,
dead_lettering_on_message_expiration: Optional[bool] = None,
enable_batched_operations: Optional[bool] = None,
azure_service_bus_conn_id: str = 'azure_service_bus_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.topic_name = topic_name
self.subscription_name = subscription_name
self.max_delivery_count = max_delivery_count
self.dl_on_message_expiration = dead_lettering_on_message_expiration
self.enable_batched_operations = enable_batched_operations
self.azure_service_bus_conn_id = azure_service_bus_conn_id

def execute(self, context: "Context") -> None:
"""Updates Subscription properties, by connecting to Service Bus Admin client"""
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

with hook.get_conn() as service_mgmt_conn:
subscription_prop = service_mgmt_conn.get_subscription(self.topic_name, self.subscription_name)
if self.max_delivery_count:
subscription_prop.max_delivery_count = self.max_delivery_count
if self.dl_on_message_expiration is not None:
subscription_prop.dead_lettering_on_message_expiration = self.dl_on_message_expiration
if self.enable_batched_operations is not None:
subscription_prop.enable_batched_operations = self.enable_batched_operations
# update by updating the properties in the model
service_mgmt_conn.update_subscription(self.topic_name, subscription_prop)
updated_subscription = service_mgmt_conn.get_subscription(self.topic_name, self.subscription_name)
self.log.info("Subscription Updated successfully %s", updated_subscription)


class ASBReceiveSubscriptionMessageOperator(BaseOperator):
"""
Receive a Batch messages from a Service Bus Subscription under specific Topic.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:ASBReceiveSubscriptionMessageOperator`
:param subscription_name: The subscription name that will own the rule in topic
:param topic_name: The topic that will own the subscription rule.
:param max_message_count: Maximum number of messages in the batch.
Actual number returned will depend on prefetch_count and incoming stream rate.
Setting to None will fully depend on the prefetch config. The default value is 1.
:param max_wait_time: Maximum time to wait in seconds for the first message to arrive. If no
messages arrive, and no timeout is specified, this call will not return until the
connection is closed. If specified, an no messages arrive within the timeout period,
an empty list will be returned.
:param azure_service_bus_conn_id: Reference to the
:ref:`Azure Service Bus connection <howto/connection:azure_service_bus>`.
"""

template_fields: Sequence[str] = ("topic_name", "subscription_name")
ui_color = "#e4f0e8"

def __init__(
self,
*,
topic_name: str,
subscription_name: str,
max_message_count: Optional[int] = 1,
max_wait_time: Optional[float] = 5,
azure_service_bus_conn_id: str = 'azure_service_bus_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.topic_name = topic_name
self.subscription_name = subscription_name
self.max_message_count = max_message_count
self.max_wait_time = max_wait_time
self.azure_service_bus_conn_id = azure_service_bus_conn_id

def execute(self, context: "Context") -> None:
"""
Receive Message in specific queue in Service Bus namespace,
by connecting to Service Bus client
"""
# Create the hook
hook = MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

# Receive message
hook.receive_subscription_message(
self.topic_name, self.subscription_name, self.max_message_count, self.max_wait_time
)


class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
"""
Deletes the topic subscription in the Azure ServiceBus namespace
Expand Down
32 changes: 32 additions & 0 deletions docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,38 @@ Below is an example of using this operator to execute an Azure Service Bus Creat
:start-after: [START howto_operator_create_service_bus_subscription]
:end-before: [END howto_operator_create_service_bus_subscription]

.. _howto/operator:AzureServiceBusUpdateSubscriptionOperator:

Update Azure Service Bus Subscription
======================================

To Update the Azure service bus topic Subscription which is already created, with specific Parameter you can use
:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusUpdateSubscriptionOperator`.

Below is an example of using this operator to execute an Azure Service Bus Update Subscription.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_service_bus_subscription]
:end-before: [END howto_operator_update_service_bus_subscription]

.. _howto/operator:ASBReceiveSubscriptionMessageOperator:

Receive Azure Service Bus Subscription Message
===============================================

To Receive a Batch messages from a Service Bus Subscription under specific Topic, you can use
:class:`~airflow.providers.microsoft.azure.operators.asb.ASBReceiveSubscriptionMessageOperator`.

Below is an example of using this operator to execute an Azure Service Bus Receive Subscription Message.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
:language: python
:dedent: 4
:start-after: [START howto_operator_receive_message_service_bus_subscription]
:end-before: [END howto_operator_receive_message_service_bus_subscription]

.. _howto/operator:AzureServiceBusSubscriptionDeleteOperator:

Delete Azure Service Bus Subscription
Expand Down
81 changes: 62 additions & 19 deletions tests/providers/microsoft/azure/hooks/test_asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,35 @@ def test_delete_queue_exception(self, mock_sb_admin_client):
with pytest.raises(TypeError):
hook.delete_queue(None)

@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn')
def test_delete_subscription(self, mock_sb_admin_client):
"""
Test Delete subscription functionality by passing subscription name and topic name,
assert the function with values, mock the azure service bus function `delete_subscription`
"""
subscription_name = "test_subscription_name"
topic_name = "test_topic_name"
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
hook.delete_subscription(subscription_name, topic_name)
expected_calls = [mock.call().__enter__().delete_subscription(topic_name, subscription_name)]
mock_sb_admin_client.assert_has_calls(expected_calls)

@pytest.mark.parametrize(
"mock_subscription_name, mock_topic_name",
[("subscription_1", None), (None, "topic_1")],
)
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
def test_delete_subscription_exception(
self, mock_sb_admin_client, mock_subscription_name, mock_topic_name
):
"""
Test `delete_subscription` functionality to raise AirflowException,
by passing subscription name and topic name as None and pytest raise Airflow Exception
"""
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
with pytest.raises(TypeError):
hook.delete_subscription(mock_subscription_name, mock_topic_name)


class TestMessageHook:
def setup_class(self) -> None:
Expand Down Expand Up @@ -202,31 +231,45 @@ def test_receive_message_exception(self, mock_sb_client):
with pytest.raises(TypeError):
hook.receive_message(None)

@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn')
def test_delete_subscription(self, mock_sb_admin_client):
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn')
def test_receive_subscription_message(self, mock_sb_client):
"""
Test Delete subscription functionality by passing subscription name and topic name,
assert the function with values, mock the azure service bus function `delete_subscription`
Test `receive_subscription_message` hook function and assert the function with mock value,
mock the azure service bus `receive_message` function of subscription
"""
subscription_name = "test_subscription_name"
topic_name = "test_topic_name"
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
hook.delete_subscription(subscription_name, topic_name)
expected_calls = [mock.call().__enter__().delete_subscription(topic_name, subscription_name)]
mock_sb_admin_client.assert_has_calls(expected_calls)
subscription_name = "subscription_1"
topic_name = "topic_name"
max_message_count = 10
max_wait_time = 5
hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
hook.receive_subscription_message(topic_name, subscription_name, max_message_count, max_wait_time)
expected_calls = [
mock.call()
.__enter__()
.get_subscription_receiver(subscription_name, topic_name)
.__enter__()
.receive_messages(max_message_count=max_message_count, max_wait_time=max_wait_time)
.get_subscription_receiver(subscription_name, topic_name)
.__exit__()
.mock_call()
.__exit__
]
mock_sb_client.assert_has_calls(expected_calls)

@pytest.mark.parametrize(
"mock_subscription_name, mock_topic_name",
[("subscription_1", None), (None, "topic_1")],
"mock_subscription_name, mock_topic_name, mock_max_count, mock_wait_time",
[("subscription_1", None, None, None), (None, "topic_1", None, None)],
)
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
def test_delete_subscription_exception(
self, mock_sb_admin_client, mock_subscription_name, mock_topic_name
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn')
def test_receive_subscription_message_exception(
self, mock_sb_client, mock_subscription_name, mock_topic_name, mock_max_count, mock_wait_time
):
"""
Test `delete_subscription` functionality to raise AirflowException,
by passing subscription name and topic name as None and pytest raise Airflow Exception
Test `receive_subscription_message` hook function to raise exception
by sending the subscription and topic name as none
"""
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
with pytest.raises(TypeError):
hook.delete_subscription(mock_subscription_name, mock_topic_name)
hook.receive_subscription_message(
mock_subscription_name, mock_topic_name, mock_max_count, mock_wait_time
)
Loading

0 comments on commit 292440d

Please sign in to comment.