Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAP] Abstraction of actor_connector to go along with runtime factory and runtime abstraction #3296

Merged
merged 42 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8f8a0bd
Added Runtime Factory to support multiple implementations
rajan-chari Jul 25, 2024
a241bdc
Rename to ComponentEnsemble to ZMQRuntime
rajan-chari Jul 25, 2024
b372e77
rename zmq_runtime
rajan-chari Jul 25, 2024
15b715e
rename zmq_runtime
rajan-chari Jul 25, 2024
d91d478
pre-commit fixes
rajan-chari Jul 26, 2024
9f27b79
pre-commit fix
rajan-chari Jul 26, 2024
b040b1d
Merge branch 'main' into rajan/cap-factory
rajan-chari Jul 26, 2024
3357b3e
Merge branch 'main' into rajan/cap-factory
rajan-chari Jul 29, 2024
83d9ae0
pre-commit fixes and default runtime
rajan-chari Jul 29, 2024
0e5cb67
pre-commit fixes
rajan-chari Jul 29, 2024
7a593f9
Rename constants
rajan-chari Jul 29, 2024
7aef126
Rename Constants
rajan-chari Jul 29, 2024
3d4af24
Merge branch 'main' into rajan/cap-factory
rajan-chari Jul 29, 2024
32965d8
Merge branch 'main' into rajan/cap-factory
rajan-chari Jul 29, 2024
a92bc04
Merge branch 'main' into rajan/cap-factory
thinkall Aug 1, 2024
fefa3cd
Merge branch 'microsoft:main' into rajan/cap-factory
rajan-chari Aug 5, 2024
81b1d92
Create interfaces for connectors
rajan-chari Aug 5, 2024
1dd77b8
pre-commit fixes
rajan-chari Aug 5, 2024
5e3ffad
pre-commit fixes
rajan-chari Aug 5, 2024
5480efb
pre-commit fixes
rajan-chari Aug 5, 2024
e8bbf7a
Merge branch 'main' into rajan/cap-factory
rajan-chari Aug 6, 2024
ec0387f
Merge branch 'main' into rajan/cap-factory
sonichi Aug 6, 2024
8944dfc
Merge branch 'main' into rajan/cap-factory
rajan-chari Aug 7, 2024
a89675e
lower case file names
rajan-chari Aug 7, 2024
4df6805
Merge branch 'rajan/cap-factory' of https://github.com/rajan-chari/au…
rajan-chari Aug 7, 2024
16ab32c
rename files to lower _case
rajan-chari Aug 7, 2024
ce37d8e
rename files to _lowercase
rajan-chari Aug 7, 2024
3d7209c
removed _
rajan-chari Aug 7, 2024
86cfb7a
Merge branch '0.2' into rajan/cap-factory
rysweet Oct 12, 2024
b4e3f93
Refactored to make Actor zmq agnostic
rajan-chari Oct 21, 2024
9827848
fix for refactor
rajan-chari Oct 21, 2024
9d0c04f
fix refactor, circular dependency
rajan-chari Oct 21, 2024
ba2a18d
pre-commit fixes
rajan-chari Oct 21, 2024
2f54a02
document classes
rajan-chari Oct 21, 2024
f9a8aac
pre-commit ruff
rajan-chari Oct 21, 2024
4c6b445
fix ruff issues
rajan-chari Oct 21, 2024
23dc8d0
ruff fixes
rajan-chari Oct 21, 2024
d38fda5
ruff fixes
rajan-chari Oct 21, 2024
380644e
actor connector documentation
rajan-chari Oct 21, 2024
735d5fb
better docs
rajan-chari Oct 21, 2024
af0912d
Merge branch '0.2' into rajan/cap-factory
rysweet Oct 21, 2024
ef17d54
Merge branch '0.2' into rajan/cap-factory
ekzhu Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions samples/apps/cap/py/autogencap/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xsub_url: str = "tcp://127.0.0.1:5556"
router_url: str = "tcp://127.0.0.1:5557"
dealer_url: str = "tcp://127.0.0.1:5558"
USE_COLOR_LOGGING = True
Original file line number Diff line number Diff line change
@@ -1,57 +1,38 @@
import threading
import traceback

import zmq
from .actor_runtime import IMessageReceiver, IMsgActor, IRuntime
from .debug_log import Debug, Info

from .Config import xpub_url
from .DebugLog import Debug, Error, Info


class Actor:
class Actor(IMsgActor):
def __init__(self, agent_name: str, description: str, start_thread: bool = True):
"""Initialize the Actor with a name, description, and threading option."""
self.actor_name: str = agent_name
self.agent_description: str = description
self.run = False
self._start_event = threading.Event()
self._start_thread = start_thread
self._msg_receiver: IMessageReceiver = None
self._runtime: IRuntime = None

def on_connect(self, network):
Debug(self.actor_name, f"is connecting to {network}")
def on_connect(self):
"""Connect the actor to the runtime."""
Debug(self.actor_name, f"is connecting to {self._runtime}")
Debug(self.actor_name, "connected")

def on_txt_msg(self, msg: str, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming text messages."""
Info(self.actor_name, f"InBox: {msg}")
return True

def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming binary messages."""
Info(self.actor_name, f"Msg: receiver=[{receiver}], msg_type=[{msg_type}]")
return True

def _msg_loop_init(self):
Debug(self.actor_name, "recv thread started")
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
self._socket.setsockopt(zmq.RCVTIMEO, 500)
self._socket.connect(xpub_url)
str_topic = f"{self.actor_name}"
Debug(self.actor_name, f"subscribe to: {str_topic}")
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
self._start_event.set()

def get_message(self):
try:
topic, msg_type, sender_topic, msg = self._socket.recv_multipart()
topic = topic.decode("utf-8") # Convert bytes to string
msg_type = msg_type.decode("utf-8") # Convert bytes to string
sender_topic = sender_topic.decode("utf-8") # Convert bytes to string
except zmq.Again:
return None # No message received, continue to next iteration
except Exception as e:
Error(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
return None
return topic, msg_type, sender_topic, msg

def dispatch_message(self, message):
"""Dispatch the received message based on its type."""
if message is None:
return
topic, msg_type, sender_topic, msg = message
Expand All @@ -65,40 +46,50 @@ def dispatch_message(self, message):
if not self.on_bin_msg(msg, msg_type, topic, sender_topic):
self.run = False

def get_message(self):
"""Retrieve a message from the runtime implementation."""
return self._msg_receiver.get_message()

def _msg_loop(self):
"""Main message loop for receiving and dispatching messages."""
try:
self._msg_loop_init()
self._msg_receiver = self._runtime.get_new_msg_receiver()
self._msg_receiver.init(self.actor_name)
self._start_event.set()
while self.run:
message = self.get_message()
message = self._msg_receiver.get_message()
self.dispatch_message(message)
except Exception as e:
Debug(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
finally:
self.run = False
# In case there was an exception at startup signal
# the main thread.
self._start_event.set()
self.run = False
Debug(self.actor_name, "recv thread ended")

def on_start(self, context: zmq.Context):
self._context = context
self.run: bool = True
def on_start(self, runtime: IRuntime):
"""Start the actor and its message receiving thread if applicable."""
self._runtime = runtime # Save the runtime
self.run = True
if self._start_thread:
self._thread = threading.Thread(target=self._msg_loop)
self._thread.start()
self._start_event.wait()
else:
self._msg_loop_init()
self._msg_receiver = self._runtime.get_new_msg_receiver()
self._msg_receiver.init(self.actor_name)

def disconnect_network(self, network):
"""Disconnect the actor from the network."""
Debug(self.actor_name, f"is disconnecting from {network}")
Debug(self.actor_name, "disconnected")
self.stop()

def stop(self):
"""Stop the actor and its message receiver."""
self.run = False
if self._start_thread:
self._thread.join()
self._socket.setsockopt(zmq.LINGER, 0)
self._socket.close()
self._msg_receiver.stop()
83 changes: 83 additions & 0 deletions samples/apps/cap/py/autogencap/actor_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, Tuple


class IActorConnector(ABC):
"""
Abstract base class for actor connectors. Each runtime will have a different implementation.
Obtain an instance of the correct connector from the runtime by calling the runtime's find_by_xyz
method.
"""

@abstractmethod
def send_txt_msg(self, msg: str) -> None:
"""
Send a text message to the actor.

Args:
msg (str): The text message to send.
"""
pass

@abstractmethod
def send_bin_msg(self, msg_type: str, msg: bytes) -> None:
"""
Send a binary message to the actor.

Args:
msg_type (str): The type of the binary message.
msg (bytes): The binary message to send.
"""
pass

@abstractmethod
def send_proto_msg(self, msg: Any) -> None:
"""
Send a protocol buffer message to the actor.

Args:
msg (Any): The protocol buffer message to send.
"""
pass

@abstractmethod
def send_recv_proto_msg(
self, msg: Any, num_attempts: int = 5
) -> Tuple[Optional[str], Optional[str], Optional[bytes]]:
"""
Send a protocol buffer message and receive a response from the actor.

Args:
msg (Any): The protocol buffer message to send.
num_attempts (int, optional): Number of attempts to send and receive. Defaults to 5.

Returns:
Tuple[Optional[str], Optional[str], Optional[bytes]]: A tuple containing the topic,
message type, and response message, or None if no response is received.
"""
pass

@abstractmethod
def send_recv_msg(
self, msg_type: str, msg: bytes, num_attempts: int = 5
) -> Tuple[Optional[str], Optional[str], Optional[bytes]]:
"""
Send a binary message and receive a response from the actor.

Args:
msg_type (str): The type of the binary message.
msg (bytes): The binary message to send.
num_attempts (int, optional): Number of attempts to send and receive. Defaults to 5.

Returns:
Tuple[Optional[str], Optional[str], Optional[bytes]]: A tuple containing the topic,
message type, and response message, or None if no response is received.
"""
pass

@abstractmethod
def close(self) -> None:
"""
Close the actor connector and release any resources.
"""
pass
86 changes: 79 additions & 7 deletions samples/apps/cap/py/autogencap/actor_runtime.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,108 @@
from abc import ABC, abstractmethod
from typing import List

from .Actor import Actor
from .ActorConnector import ActorConnector
from .actor_connector import IActorConnector
from .proto.CAP_pb2 import ActorInfo


class IMsgActor(ABC):
"""Abstract base class for message based actors."""

@abstractmethod
def on_connect(self, runtime: "IRuntime"):
"""Called when the actor connects to the runtime."""
pass

@abstractmethod
def on_txt_msg(self, msg: str, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming text messages."""
pass

@abstractmethod
def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> bool:
"""Handle incoming binary messages."""
pass

@abstractmethod
def on_start(self):
"""Called when the actor starts."""
pass

@abstractmethod
def stop(self):
"""Stop the actor."""
pass

@abstractmethod
def dispatch_message(self, message):
"""Dispatch the received message based on its type."""
pass


class IMessageReceiver(ABC):
"""Abstract base class for message receivers. Implementations are runtime specific."""

@abstractmethod
def init(self, actor_name: str):
"""Initialize the message receiver."""
pass

@abstractmethod
def add_listener(self, topic: str):
"""Add a topic to the message receiver."""
pass

@abstractmethod
def get_message(self):
"""Retrieve a message from the runtime implementation."""
pass

@abstractmethod
def stop(self):
"""Stop the message receiver."""
pass


# Abstract base class for the runtime environment
class IRuntime(ABC):
"""Abstract base class for the actor runtime environment."""

@abstractmethod
def register(self, actor: IMsgActor):
"""Register an actor with the runtime."""
pass

@abstractmethod
def register(self, actor: Actor):
def get_new_msg_receiver(self) -> IMessageReceiver:
"""Create and return a new message receiver."""
pass

@abstractmethod
def connect(self):
"""Connect the runtime to the messaging system."""
pass

@abstractmethod
def disconnect(self):
"""Disconnect the runtime from the messaging system."""
pass

@abstractmethod
def find_by_topic(self, topic: str) -> ActorConnector:
def find_by_topic(self, topic: str) -> IActorConnector:
"""Find an actor connector by topic."""
pass

@abstractmethod
def find_by_name(self, name: str) -> ActorConnector:
def find_by_name(self, name: str) -> IActorConnector:
"""Find an actor connector by name."""
pass

@abstractmethod
def find_termination(self) -> ActorConnector:
def find_termination(self) -> IActorConnector:
"""Find the termination actor connector."""
pass

@abstractmethod
def find_by_name_regex(self, name_regex) -> List[ActorInfo]:
def find_by_name_regex(self, name_regex) -> List["ActorInfo"]:
"""Find actors by name using a regular expression."""
pass
13 changes: 0 additions & 13 deletions samples/apps/cap/py/autogencap/ag_adapter/AGActor.py

This file was deleted.

11 changes: 11 additions & 0 deletions samples/apps/cap/py/autogencap/ag_adapter/ag_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from autogencap.actor import Actor
from autogencap.constants import Termination_Topic
from autogencap.debug_log import Debug


class AGActor(Actor):
def on_start(self, runtime):
super().on_start(runtime)
str_topic = Termination_Topic
self._msg_receiver.add_listener(str_topic)
Debug(self.actor_name, f"subscribe to: {str_topic}")
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from autogen import Agent, ConversableAgent

from ..actor_runtime import IRuntime
from .AutoGenConnector import AutoGenConnector
from .autogen_connector import AutoGenConnector


class AG2CAP(ConversableAgent):
Expand Down
4 changes: 2 additions & 2 deletions samples/apps/cap/py/autogencap/ag_adapter/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from autogen import ConversableAgent

from ..DebugLog import Info, Warn
from .CAP2AG import CAP2AG
from ..debug_log import Info, Warn
from .cap_to_ag import CAP2AG


class Agent:
Expand Down
Loading
Loading