From 4a5a3bb43603fdb38aa2ff3b2946384f65cd5b37 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sat, 12 Oct 2024 12:42:44 -0400 Subject: [PATCH 1/8] feat: implement DevBroker for development purposes --- .../common/messaging/messaging.py | 80 ++++++++++++++++--- .../common/messaging/messaging_builder.py | 3 + .../components/inputs_outputs/broker_base.py | 1 + .../components/inputs_outputs/broker_input.py | 16 ++-- 4 files changed, 85 insertions(+), 15 deletions(-) diff --git a/src/solace_ai_connector/common/messaging/messaging.py b/src/solace_ai_connector/common/messaging/messaging.py index 0844863..cb71459 100644 --- a/src/solace_ai_connector/common/messaging/messaging.py +++ b/src/solace_ai_connector/common/messaging/messaging.py @@ -1,5 +1,8 @@ -# messaging.py - Base class for EDA messaging services - +import threading +import queue +from typing import Dict, List, Any +import re +from copy import deepcopy class Messaging: def __init__(self, broker_properties: dict): @@ -11,14 +14,73 @@ def connect(self): def disconnect(self): raise NotImplementedError - def receive_message(self, timeout_ms): + def receive_message(self, timeout_ms, queue_id: str): + raise NotImplementedError + + def send_message(self, destination_name: str, payload: Any, user_properties: Dict = None): + raise NotImplementedError + + def subscribe(self, subscription: str, queue_id: str): raise NotImplementedError - # def is_connected(self): - # raise NotImplementedError +class DevBroker(Messaging): + def __init__(self, broker_properties: dict): + super().__init__(broker_properties) + self.subscriptions: Dict[str, List[str]] = {} + self.queues: Dict[str, queue.Queue] = {} + self.connected = False + self.lock = threading.Lock() + + def connect(self): + self.connected = True + + def disconnect(self): + self.connected = False + + def receive_message(self, timeout_ms, queue_id: str): + if not self.connected: + raise RuntimeError("DevBroker is not connected") + + try: + return self.queues[queue_id].get(timeout=timeout_ms/1000) + except queue.Empty: + return None + + def send_message(self, destination_name: str, payload: Any, user_properties: Dict = None): + if not self.connected: + raise RuntimeError("DevBroker is not connected") + + message = { + 'payload': payload, + 'topic': destination_name, + 'user_properties': user_properties or {} + } + + matching_queue_ids = self._get_matching_queue_ids(destination_name) + + for queue_id in matching_queue_ids: + # Clone the message for each queue to ensure isolation + self.queues[queue_id].put(deepcopy(message)) + + def subscribe(self, subscription: str, queue_id: str): + if not self.connected: + raise RuntimeError("DevBroker is not connected") + + with self.lock: + if queue_id not in self.queues: + self.queues[queue_id] = queue.Queue() + if subscription not in self.subscriptions: + self.subscriptions[subscription] = [] + self.subscriptions[subscription].append(queue_id) - # def send_message(self, destination_name: str, message: str): - # raise NotImplementedError + def _get_matching_queue_ids(self, topic: str) -> List[str]: + matching_queue_ids = [] + for subscription, queue_ids in self.subscriptions.items(): + if self._topic_matches(subscription, topic): + matching_queue_ids.extend(queue_ids) + return list(set(matching_queue_ids)) # Remove duplicates - # def subscribe(self, subscription: str, message_handler): #: MessageHandler): - # raise NotImplementedError + @staticmethod + def _topic_matches(subscription: str, topic: str) -> bool: + regex = subscription.replace(">", ".*").replace("*", "[^/]+") + return re.match(f"^{regex}$", topic) is not None diff --git a/src/solace_ai_connector/common/messaging/messaging_builder.py b/src/solace_ai_connector/common/messaging/messaging_builder.py index 423d246..0349cd0 100644 --- a/src/solace_ai_connector/common/messaging/messaging_builder.py +++ b/src/solace_ai_connector/common/messaging/messaging_builder.py @@ -1,6 +1,7 @@ """Class to build a Messaging Service object""" from .solace_messaging import SolaceMessaging +from .messaging import DevBroker # Make a Messaging Service builder - this is a factory for Messaging Service objects @@ -15,6 +16,8 @@ def from_properties(self, broker_properties: dict): def build(self): if self.broker_properties["broker_type"] == "solace": return SolaceMessaging(self.broker_properties) + elif self.broker_properties["broker_type"] == "dev_broker": + return DevBroker(self.broker_properties) raise ValueError( f"Unsupported broker type: {self.broker_properties['broker_type']}" diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_base.py b/src/solace_ai_connector/components/inputs_outputs/broker_base.py index 85ad933..1d87469 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_base.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_base.py @@ -33,6 +33,7 @@ class BrokerBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) self.broker_properties = self.get_broker_properties() + self.queue_id = self.generate_uuid() if self.broker_properties["broker_type"] not in ["test", "test_streaming"]: self.messaging_service = ( MessagingServiceBuilder() diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py index eeeb073..0e1d272 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py @@ -110,18 +110,16 @@ def invoke(self, message, data): def get_next_message(self, timeout_ms=None): if timeout_ms is None: timeout_ms = DEFAULT_TIMEOUT_MS - broker_message = self.messaging_service.receive_message(timeout_ms) + broker_message = self.messaging_service.receive_message(timeout_ms, self.queue_id) if not broker_message: return None self.current_broker_message = broker_message - payload = broker_message.get_payload_as_string() - if payload is None: - payload = broker_message.get_payload_as_bytes() + payload = broker_message.get('payload') payload = self.decode_payload(payload) - topic = broker_message.get_destination_name() - user_properties = broker_message.get_properties() + topic = broker_message.get('topic') + user_properties = broker_message.get('user_properties', {}) log.debug( "Received message from broker: topic=%s, user_properties=%s, payload length=%d", topic, @@ -130,6 +128,12 @@ def get_next_message(self, timeout_ms=None): ) return Message(payload=payload, topic=topic, user_properties=user_properties) + def connect(self): + super().connect() + if self.broker_properties.get("broker_subscriptions"): + for subscription in self.broker_properties["broker_subscriptions"]: + self.messaging_service.subscribe(subscription["topic"], self.queue_id) + def acknowledge_message(self, broker_message): self.messaging_service.ack_message(broker_message) From a5f926d03761ae7b50310b617c372806cce67fd1 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sat, 12 Oct 2024 12:52:12 -0400 Subject: [PATCH 2/8] refactor: convert Solace message to dictionary in receive_message --- .../common/messaging/solace_messaging.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/solace_ai_connector/common/messaging/solace_messaging.py b/src/solace_ai_connector/common/messaging/solace_messaging.py index ed33091..e752d00 100644 --- a/src/solace_ai_connector/common/messaging/solace_messaging.py +++ b/src/solace_ai_connector/common/messaging/solace_messaging.py @@ -246,8 +246,18 @@ def send_message( user_context=user_context, ) - def receive_message(self, timeout_ms): - return self.persistent_receivers[0].receive_message(timeout_ms) + def receive_message(self, timeout_ms, queue_id): + broker_message = self.persistent_receivers[0].receive_message(timeout_ms) + if broker_message is None: + return None + + # Convert Solace message to dictionary format + return { + 'payload': broker_message.get_payload_as_bytes(), + 'topic': broker_message.get_destination_name(), + 'user_properties': broker_message.get_properties(), + '_original_message': broker_message # Keep original message for acknowledgement + } def subscribe( self, subscription: str, persistent_receiver: PersistentMessageReceiver @@ -256,4 +266,7 @@ def subscribe( persistent_receiver.add_subscription(sub) def ack_message(self, broker_message): - self.persistent_receiver.ack(broker_message) + if '_original_message' in broker_message: + self.persistent_receiver.ack(broker_message['_original_message']) + else: + log.warning("Cannot acknowledge message: original Solace message not found") From 8d7c02226ba83a2c6b5a67c336475b6c77da2872 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sat, 12 Oct 2024 12:59:07 -0400 Subject: [PATCH 3/8] refactor: align DevBroker with solace_messaging subscription handling --- src/solace_ai_connector/common/messaging/messaging.py | 6 ++++++ .../components/inputs_outputs/broker_input.py | 8 +------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/solace_ai_connector/common/messaging/messaging.py b/src/solace_ai_connector/common/messaging/messaging.py index cb71459..b262350 100644 --- a/src/solace_ai_connector/common/messaging/messaging.py +++ b/src/solace_ai_connector/common/messaging/messaging.py @@ -33,6 +33,12 @@ def __init__(self, broker_properties: dict): def connect(self): self.connected = True + queue_name = self.broker_properties.get("queue_name") + subscriptions = self.broker_properties.get("subscriptions", []) + if queue_name: + self.queues[queue_name] = queue.Queue() + for subscription in subscriptions: + self.subscribe(subscription["topic"], queue_name) def disconnect(self): self.connected = False diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py index 0e1d272..48753b0 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py @@ -110,7 +110,7 @@ def invoke(self, message, data): def get_next_message(self, timeout_ms=None): if timeout_ms is None: timeout_ms = DEFAULT_TIMEOUT_MS - broker_message = self.messaging_service.receive_message(timeout_ms, self.queue_id) + broker_message = self.messaging_service.receive_message(timeout_ms, self.broker_properties["queue_name"]) if not broker_message: return None self.current_broker_message = broker_message @@ -128,12 +128,6 @@ def get_next_message(self, timeout_ms=None): ) return Message(payload=payload, topic=topic, user_properties=user_properties) - def connect(self): - super().connect() - if self.broker_properties.get("broker_subscriptions"): - for subscription in self.broker_properties["broker_subscriptions"]: - self.messaging_service.subscribe(subscription["topic"], self.queue_id) - def acknowledge_message(self, broker_message): self.messaging_service.ack_message(broker_message) From ae3dceddf0d1e7076bfb617223b07e9b69668126 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sat, 12 Oct 2024 13:08:20 -0400 Subject: [PATCH 4/8] More changes --- .../common/messaging/dev_broker_messaging.py | 80 +++++++++++++++++++ .../common/messaging/messaging.py | 79 ++---------------- .../common/messaging/messaging_builder.py | 2 +- .../common/messaging/solace_messaging.py | 15 ++-- .../components/inputs_outputs/broker_input.py | 10 ++- 5 files changed, 100 insertions(+), 86 deletions(-) create mode 100644 src/solace_ai_connector/common/messaging/dev_broker_messaging.py diff --git a/src/solace_ai_connector/common/messaging/dev_broker_messaging.py b/src/solace_ai_connector/common/messaging/dev_broker_messaging.py new file mode 100644 index 0000000..ba52d1c --- /dev/null +++ b/src/solace_ai_connector/common/messaging/dev_broker_messaging.py @@ -0,0 +1,80 @@ +"""This is a simple broker for testing purposes. It allows sending and receiving +messages to/from queues. It supports subscriptions based on topics.""" + +from typing import Dict, List, Any +import threading +import queue +import re +from copy import deepcopy +from .messaging import Messaging + + +class DevBroker(Messaging): + def __init__(self, broker_properties: dict): + super().__init__(broker_properties) + self.subscriptions: Dict[str, List[str]] = {} + self.queues: Dict[str, queue.Queue] = {} + self.connected = False + self.lock = threading.Lock() + + def connect(self): + self.connected = True + queue_name = self.broker_properties.get("queue_name") + subscriptions = self.broker_properties.get("subscriptions", []) + if queue_name: + self.queues[queue_name] = queue.Queue() + for subscription in subscriptions: + self.subscribe(subscription["topic"], queue_name) + + def disconnect(self): + self.connected = False + + def receive_message(self, timeout_ms, queue_id: str): + if not self.connected: + raise RuntimeError("DevBroker is not connected") + + try: + return self.queues[queue_id].get(timeout=timeout_ms / 1000) + except queue.Empty: + return None + + def send_message( + self, destination_name: str, payload: Any, user_properties: Dict = None + ): + if not self.connected: + raise RuntimeError("DevBroker is not connected") + + message = { + "payload": payload, + "topic": destination_name, + "user_properties": user_properties or {}, + } + + matching_queue_ids = self._get_matching_queue_ids(destination_name) + + for queue_id in matching_queue_ids: + # Clone the message for each queue to ensure isolation + self.queues[queue_id].put(deepcopy(message)) + + def subscribe(self, subscription: str, queue_id: str): + if not self.connected: + raise RuntimeError("DevBroker is not connected") + + with self.lock: + if queue_id not in self.queues: + self.queues[queue_id] = queue.Queue() + if subscription not in self.subscriptions: + self.subscriptions[subscription] = [] + self.subscriptions[subscription].append(queue_id) + + def _get_matching_queue_ids(self, topic: str) -> List[str]: + matching_queue_ids = [] + for subscription, queue_ids in self.subscriptions.items(): + if self._topic_matches(subscription, topic): + matching_queue_ids.extend(queue_ids) + return list(set(matching_queue_ids)) # Remove duplicates + + @staticmethod + def _topic_matches(subscription: str, topic: str) -> bool: + regex = subscription.replace(">", ".*").replace("*", "[^/]+") + return re.match(f"^{regex}$", topic) is not None diff --git a/src/solace_ai_connector/common/messaging/messaging.py b/src/solace_ai_connector/common/messaging/messaging.py index b262350..a5d5888 100644 --- a/src/solace_ai_connector/common/messaging/messaging.py +++ b/src/solace_ai_connector/common/messaging/messaging.py @@ -1,8 +1,5 @@ -import threading -import queue -from typing import Dict, List, Any -import re -from copy import deepcopy +from typing import Any, Dict + class Messaging: def __init__(self, broker_properties: dict): @@ -17,76 +14,10 @@ def disconnect(self): def receive_message(self, timeout_ms, queue_id: str): raise NotImplementedError - def send_message(self, destination_name: str, payload: Any, user_properties: Dict = None): + def send_message( + self, destination_name: str, payload: Any, user_properties: Dict = None + ): raise NotImplementedError def subscribe(self, subscription: str, queue_id: str): raise NotImplementedError - -class DevBroker(Messaging): - def __init__(self, broker_properties: dict): - super().__init__(broker_properties) - self.subscriptions: Dict[str, List[str]] = {} - self.queues: Dict[str, queue.Queue] = {} - self.connected = False - self.lock = threading.Lock() - - def connect(self): - self.connected = True - queue_name = self.broker_properties.get("queue_name") - subscriptions = self.broker_properties.get("subscriptions", []) - if queue_name: - self.queues[queue_name] = queue.Queue() - for subscription in subscriptions: - self.subscribe(subscription["topic"], queue_name) - - def disconnect(self): - self.connected = False - - def receive_message(self, timeout_ms, queue_id: str): - if not self.connected: - raise RuntimeError("DevBroker is not connected") - - try: - return self.queues[queue_id].get(timeout=timeout_ms/1000) - except queue.Empty: - return None - - def send_message(self, destination_name: str, payload: Any, user_properties: Dict = None): - if not self.connected: - raise RuntimeError("DevBroker is not connected") - - message = { - 'payload': payload, - 'topic': destination_name, - 'user_properties': user_properties or {} - } - - matching_queue_ids = self._get_matching_queue_ids(destination_name) - - for queue_id in matching_queue_ids: - # Clone the message for each queue to ensure isolation - self.queues[queue_id].put(deepcopy(message)) - - def subscribe(self, subscription: str, queue_id: str): - if not self.connected: - raise RuntimeError("DevBroker is not connected") - - with self.lock: - if queue_id not in self.queues: - self.queues[queue_id] = queue.Queue() - if subscription not in self.subscriptions: - self.subscriptions[subscription] = [] - self.subscriptions[subscription].append(queue_id) - - def _get_matching_queue_ids(self, topic: str) -> List[str]: - matching_queue_ids = [] - for subscription, queue_ids in self.subscriptions.items(): - if self._topic_matches(subscription, topic): - matching_queue_ids.extend(queue_ids) - return list(set(matching_queue_ids)) # Remove duplicates - - @staticmethod - def _topic_matches(subscription: str, topic: str) -> bool: - regex = subscription.replace(">", ".*").replace("*", "[^/]+") - return re.match(f"^{regex}$", topic) is not None diff --git a/src/solace_ai_connector/common/messaging/messaging_builder.py b/src/solace_ai_connector/common/messaging/messaging_builder.py index 0349cd0..d642c36 100644 --- a/src/solace_ai_connector/common/messaging/messaging_builder.py +++ b/src/solace_ai_connector/common/messaging/messaging_builder.py @@ -1,7 +1,7 @@ """Class to build a Messaging Service object""" from .solace_messaging import SolaceMessaging -from .messaging import DevBroker +from .dev_broker_messaging import DevBroker # Make a Messaging Service builder - this is a factory for Messaging Service objects diff --git a/src/solace_ai_connector/common/messaging/solace_messaging.py b/src/solace_ai_connector/common/messaging/solace_messaging.py index e752d00..4b03e7a 100644 --- a/src/solace_ai_connector/common/messaging/solace_messaging.py +++ b/src/solace_ai_connector/common/messaging/solace_messaging.py @@ -250,13 +250,14 @@ def receive_message(self, timeout_ms, queue_id): broker_message = self.persistent_receivers[0].receive_message(timeout_ms) if broker_message is None: return None - + # Convert Solace message to dictionary format return { - 'payload': broker_message.get_payload_as_bytes(), - 'topic': broker_message.get_destination_name(), - 'user_properties': broker_message.get_properties(), - '_original_message': broker_message # Keep original message for acknowledgement + "payload": broker_message.get_payload_as_string() + or broker_message.get_payload_as_bytes(), + "topic": broker_message.get_destination_name(), + "user_properties": broker_message.get_properties(), + "_original_message": broker_message, # Keep original message for acknowledgement } def subscribe( @@ -266,7 +267,7 @@ def subscribe( persistent_receiver.add_subscription(sub) def ack_message(self, broker_message): - if '_original_message' in broker_message: - self.persistent_receiver.ack(broker_message['_original_message']) + if "_original_message" in broker_message: + self.persistent_receiver.ack(broker_message["_original_message"]) else: log.warning("Cannot acknowledge message: original Solace message not found") diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py index 48753b0..2d277cb 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py @@ -110,16 +110,18 @@ def invoke(self, message, data): def get_next_message(self, timeout_ms=None): if timeout_ms is None: timeout_ms = DEFAULT_TIMEOUT_MS - broker_message = self.messaging_service.receive_message(timeout_ms, self.broker_properties["queue_name"]) + broker_message = self.messaging_service.receive_message( + timeout_ms, self.broker_properties["queue_name"] + ) if not broker_message: return None self.current_broker_message = broker_message - payload = broker_message.get('payload') + payload = broker_message.get("payload") payload = self.decode_payload(payload) - topic = broker_message.get('topic') - user_properties = broker_message.get('user_properties', {}) + topic = broker_message.get("topic") + user_properties = broker_message.get("user_properties", {}) log.debug( "Received message from broker: topic=%s, user_properties=%s, payload length=%d", topic, From 300b0ddba1a76a83098fa4abeec7186c1764b0e0 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sat, 12 Oct 2024 14:05:30 -0400 Subject: [PATCH 5/8] More tweaks --- .../common/messaging/messaging.py | 3 --- .../inputs_outputs/broker_request_response.py | 12 ++++++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/solace_ai_connector/common/messaging/messaging.py b/src/solace_ai_connector/common/messaging/messaging.py index a5d5888..5c7d310 100644 --- a/src/solace_ai_connector/common/messaging/messaging.py +++ b/src/solace_ai_connector/common/messaging/messaging.py @@ -18,6 +18,3 @@ def send_message( self, destination_name: str, payload: Any, user_properties: Dict = None ): raise NotImplementedError - - def subscribe(self, subscription: str, queue_id: str): - raise NotImplementedError diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py index cb217b9..b624811 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py @@ -224,7 +224,9 @@ def start_response_thread(self): def handle_responses(self): while not self.stop_signal.is_set(): try: - broker_message = self.messaging_service.receive_message(1000) + broker_message = self.messaging_service.receive_message( + 1000, self.reply_queue_name + ) if broker_message: self.process_response(broker_message) except Exception as e: @@ -248,12 +250,10 @@ def process_response(self, broker_message): topic = broker_message.get_topic() user_properties = broker_message.get_user_properties() else: - payload = broker_message.get_payload_as_string() - if payload is None: - payload = broker_message.get_payload_as_bytes() + payload = broker_message.get("payload") payload = self.decode_payload(payload) - topic = broker_message.get_destination_name() - user_properties = broker_message.get_properties() + topic = broker_message.get("topic") + user_properties = broker_message.get("user_properties", {}) metadata_json = user_properties.get( "__solace_ai_connector_broker_request_reply_metadata__" From 645a84a1aa333c246564a769b599f1ba012562c1 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sat, 12 Oct 2024 15:29:01 -0400 Subject: [PATCH 6/8] Last few issues --- .../common/messaging/dev_broker_messaging.py | 50 ++++++++++++++----- .../common/messaging/messaging.py | 6 ++- .../common/messaging/messaging_builder.py | 8 ++- .../components/inputs_outputs/broker_base.py | 2 +- .../inputs_outputs/broker_request_response.py | 7 +-- 5 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/solace_ai_connector/common/messaging/dev_broker_messaging.py b/src/solace_ai_connector/common/messaging/dev_broker_messaging.py index ba52d1c..d3d1daf 100644 --- a/src/solace_ai_connector/common/messaging/dev_broker_messaging.py +++ b/src/solace_ai_connector/common/messaging/dev_broker_messaging.py @@ -2,7 +2,6 @@ messages to/from queues. It supports subscriptions based on topics.""" from typing import Dict, List, Any -import threading import queue import re from copy import deepcopy @@ -10,12 +9,21 @@ class DevBroker(Messaging): - def __init__(self, broker_properties: dict): + def __init__(self, broker_properties: dict, flow_lock_manager, flow_kv_store): super().__init__(broker_properties) - self.subscriptions: Dict[str, List[str]] = {} - self.queues: Dict[str, queue.Queue] = {} + self.flow_lock_manager = flow_lock_manager + self.flow_kv_store = flow_kv_store self.connected = False - self.lock = threading.Lock() + self.subscriptions_lock = self.flow_lock_manager.get_lock("subscriptions") + with self.subscriptions_lock: + self.subscriptions = self.flow_kv_store.get("dev_broker:subscriptions") + if self.subscriptions is None: + self.subscriptions: Dict[str, List[str]] = {} + self.flow_kv_store.set("dev_broker:subscriptions", self.subscriptions) + self.queues = self.flow_kv_store.get("dev_broker:queues") + if self.queues is None: + self.queues: Dict[str, queue.Queue] = {} + self.flow_kv_store.set("dev_broker:queues", self.queues) def connect(self): self.connected = True @@ -39,7 +47,11 @@ def receive_message(self, timeout_ms, queue_id: str): return None def send_message( - self, destination_name: str, payload: Any, user_properties: Dict = None + self, + destination_name: str, + payload: Any, + user_properties: Dict = None, + user_context: Dict = None, ): if not self.connected: raise RuntimeError("DevBroker is not connected") @@ -56,25 +68,37 @@ def send_message( # Clone the message for each queue to ensure isolation self.queues[queue_id].put(deepcopy(message)) + if user_context and "callback" in user_context: + user_context["callback"](user_context) + def subscribe(self, subscription: str, queue_id: str): if not self.connected: raise RuntimeError("DevBroker is not connected") - with self.lock: + subscription = self._subscription_to_regex(subscription) + + with self.subscriptions_lock: if queue_id not in self.queues: self.queues[queue_id] = queue.Queue() if subscription not in self.subscriptions: self.subscriptions[subscription] = [] self.subscriptions[subscription].append(queue_id) + def ack_message(self, message): + pass + def _get_matching_queue_ids(self, topic: str) -> List[str]: matching_queue_ids = [] - for subscription, queue_ids in self.subscriptions.items(): - if self._topic_matches(subscription, topic): - matching_queue_ids.extend(queue_ids) - return list(set(matching_queue_ids)) # Remove duplicates + with self.subscriptions_lock: + for subscription, queue_ids in self.subscriptions.items(): + if self._topic_matches(subscription, topic): + matching_queue_ids.extend(queue_ids) + return list(set(matching_queue_ids)) # Remove duplicates @staticmethod def _topic_matches(subscription: str, topic: str) -> bool: - regex = subscription.replace(">", ".*").replace("*", "[^/]+") - return re.match(f"^{regex}$", topic) is not None + return re.match(f"^{subscription}$", topic) is not None + + @staticmethod + def _subscription_to_regex(subscription: str) -> str: + return subscription.replace("*", "[^/]+").replace(">", ".*") diff --git a/src/solace_ai_connector/common/messaging/messaging.py b/src/solace_ai_connector/common/messaging/messaging.py index 5c7d310..5847eff 100644 --- a/src/solace_ai_connector/common/messaging/messaging.py +++ b/src/solace_ai_connector/common/messaging/messaging.py @@ -15,6 +15,10 @@ def receive_message(self, timeout_ms, queue_id: str): raise NotImplementedError def send_message( - self, destination_name: str, payload: Any, user_properties: Dict = None + self, + destination_name: str, + payload: Any, + user_properties: Dict = None, + user_context: Dict = None, ): raise NotImplementedError diff --git a/src/solace_ai_connector/common/messaging/messaging_builder.py b/src/solace_ai_connector/common/messaging/messaging_builder.py index d642c36..826cdd4 100644 --- a/src/solace_ai_connector/common/messaging/messaging_builder.py +++ b/src/solace_ai_connector/common/messaging/messaging_builder.py @@ -6,8 +6,10 @@ # Make a Messaging Service builder - this is a factory for Messaging Service objects class MessagingServiceBuilder: - def __init__(self): + def __init__(self, flow_lock_manager, flow_kv_store): self.broker_properties = {} + self.flow_lock_manager = flow_lock_manager + self.flow_kv_store = flow_kv_store def from_properties(self, broker_properties: dict): self.broker_properties = broker_properties @@ -17,7 +19,9 @@ def build(self): if self.broker_properties["broker_type"] == "solace": return SolaceMessaging(self.broker_properties) elif self.broker_properties["broker_type"] == "dev_broker": - return DevBroker(self.broker_properties) + return DevBroker( + self.broker_properties, self.flow_lock_manager, self.flow_kv_store + ) raise ValueError( f"Unsupported broker type: {self.broker_properties['broker_type']}" diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_base.py b/src/solace_ai_connector/components/inputs_outputs/broker_base.py index 1d87469..d0bc649 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_base.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_base.py @@ -36,7 +36,7 @@ def __init__(self, module_info, **kwargs): self.queue_id = self.generate_uuid() if self.broker_properties["broker_type"] not in ["test", "test_streaming"]: self.messaging_service = ( - MessagingServiceBuilder() + MessagingServiceBuilder(self.flow_lock_manager, self.flow_kv_store) .from_properties(self.broker_properties) .build() ) diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py index b624811..4c33ddb 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py @@ -193,11 +193,12 @@ def __init__(self, **kwargs): ] self.test_mode = False - if self.broker_type == "solace": - self.connect() - elif self.broker_type == "test" or self.broker_type == "test_streaming": + if self.broker_type == "test" or self.broker_type == "test_streaming": self.test_mode = True self.setup_test_pass_through() + else: + self.connect() + self.start() def start(self): From 03d15e4c97736093e3a7f6c56ab879ff51793ea0 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sat, 12 Oct 2024 15:38:58 -0400 Subject: [PATCH 7/8] refactor: standardize use of 'queue_name' in dev_broker_messaging --- .../common/messaging/dev_broker_messaging.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/solace_ai_connector/common/messaging/dev_broker_messaging.py b/src/solace_ai_connector/common/messaging/dev_broker_messaging.py index d3d1daf..0d10cd6 100644 --- a/src/solace_ai_connector/common/messaging/dev_broker_messaging.py +++ b/src/solace_ai_connector/common/messaging/dev_broker_messaging.py @@ -37,12 +37,12 @@ def connect(self): def disconnect(self): self.connected = False - def receive_message(self, timeout_ms, queue_id: str): + def receive_message(self, timeout_ms, queue_name: str): if not self.connected: raise RuntimeError("DevBroker is not connected") try: - return self.queues[queue_id].get(timeout=timeout_ms / 1000) + return self.queues[queue_name].get(timeout=timeout_ms / 1000) except queue.Empty: return None @@ -62,38 +62,38 @@ def send_message( "user_properties": user_properties or {}, } - matching_queue_ids = self._get_matching_queue_ids(destination_name) + matching_queue_names = self._get_matching_queue_names(destination_name) - for queue_id in matching_queue_ids: + for queue_name in matching_queue_names: # Clone the message for each queue to ensure isolation - self.queues[queue_id].put(deepcopy(message)) + self.queues[queue_name].put(deepcopy(message)) if user_context and "callback" in user_context: user_context["callback"](user_context) - def subscribe(self, subscription: str, queue_id: str): + def subscribe(self, subscription: str, queue_name: str): if not self.connected: raise RuntimeError("DevBroker is not connected") subscription = self._subscription_to_regex(subscription) with self.subscriptions_lock: - if queue_id not in self.queues: - self.queues[queue_id] = queue.Queue() + if queue_name not in self.queues: + self.queues[queue_name] = queue.Queue() if subscription not in self.subscriptions: self.subscriptions[subscription] = [] - self.subscriptions[subscription].append(queue_id) + self.subscriptions[subscription].append(queue_name) def ack_message(self, message): pass - def _get_matching_queue_ids(self, topic: str) -> List[str]: - matching_queue_ids = [] + def _get_matching_queue_names(self, topic: str) -> List[str]: + matching_queue_names = [] with self.subscriptions_lock: - for subscription, queue_ids in self.subscriptions.items(): + for subscription, queue_names in self.subscriptions.items(): if self._topic_matches(subscription, topic): - matching_queue_ids.extend(queue_ids) - return list(set(matching_queue_ids)) # Remove duplicates + matching_queue_names.extend(queue_names) + return list(set(matching_queue_names)) # Remove duplicates @staticmethod def _topic_matches(subscription: str, topic: str) -> bool: From 7714909b64c8cac14225eed5946b19520349501e Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sat, 12 Oct 2024 15:42:09 -0400 Subject: [PATCH 8/8] Remove queue_id --- src/solace_ai_connector/components/inputs_outputs/broker_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_base.py b/src/solace_ai_connector/components/inputs_outputs/broker_base.py index d0bc649..c312740 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_base.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_base.py @@ -33,7 +33,6 @@ class BrokerBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) self.broker_properties = self.get_broker_properties() - self.queue_id = self.generate_uuid() if self.broker_properties["broker_type"] not in ["test", "test_streaming"]: self.messaging_service = ( MessagingServiceBuilder(self.flow_lock_manager, self.flow_kv_store)