-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
419 additions
and
0 deletions.
There are no files selected for viewing
77 changes: 77 additions & 0 deletions
77
python/packages/autogen-core/samples/semantic_router/README.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# Multi Agent Orchestration, Distributed Agent Runtime Example | ||
|
||
This repository is an example of how to run a distributed agent runtime. The system is composed of three main components: | ||
|
||
1. The agent host runtime, which is responsible for managing the eventing engine, and the pub/sub message system. | ||
2. The worker runtime, which is responsible for the lifecycle of the distributed agents, including the "semantic router". | ||
3. The user proxy, which is responsible for managing the user interface and the user interactions with the agents. | ||
|
||
|
||
## Example Scenario | ||
|
||
In this example, we have a simple scenario where we have a set of distributed agents (an "HR", and a "Finance" agent) which an enterprise may use to manage their HR and Finance operations. Each of these agents are independent, and can be running on different machines. While many multi-agent systems are built to have the agents collaborate to solve a difficult task - the goal of this example is to show how an enterprise may manage a large set of agents that are suited to individual tasks, and how to route a user to the most relevant agent for the task at hand. | ||
|
||
The way this system is designed, when a user initiates a session, the semantic router agent will identify the intent of the user (currently using the overly simple method of string matching), identify the most relevant agent, and then route the user to that agent. The agent will then manage the conversation with the user, and the user will be able to interact with the agent in a conversational manner. | ||
|
||
While the logic of the agents is simple in this example, the goal is to show how the distributed runtime capabilities of autogen supports this scenario independantly of the capabilities of the agents themselves. | ||
|
||
## Getting Started | ||
|
||
1. Install `autogen-core` and its dependencies | ||
|
||
## To run | ||
|
||
Since this example is meant to demonstrate a distributed runtime, the components of this example are meant to run in different processes - i.e. different terminals. | ||
|
||
In 2 separate terminals, run: | ||
|
||
```bash | ||
# Terminal 1, to run the Agent Host Runtime | ||
python run_host.py | ||
``` | ||
|
||
```bash | ||
# Terminal 2, to run the Worker Runtime | ||
python run_semantic_router.py | ||
``` | ||
|
||
The first terminal should log a series of events where the vrious agents are registered | ||
against the runtime. | ||
|
||
In the second terminal, you may enter a request related to finance or hr scenarios. | ||
In our simple example here, this means using one of the following keywords in your request: | ||
|
||
- For the finance agent: "finance", "money", "budget" | ||
- For the hr agent: "hr", "human resources", "employee" | ||
|
||
You will then see the host and worker runtimes send messages back and forth, routing to the correct | ||
agent, before the final response is printed. | ||
|
||
The conversation can then continue with the selected agent until the user sends a message containing "END",at which point the agent will be disconnected from the user and a new conversation can start. | ||
|
||
## Message Flow | ||
|
||
Using the "Topic" feature of the agent host runtime, the message flow of the system is as follows: | ||
|
||
```mermaid | ||
sequenceDiagram | ||
participant User | ||
participant Closure_Agent | ||
participant User_Proxy_Agent | ||
participant Semantic_Router | ||
participant Worker_Agent | ||
User->>User_Proxy_Agent: Send initial message | ||
Semantic_Router->>Worker_Agent: Route message to appropriate agent | ||
Worker_Agent->>User_Proxy_Agent: Respond to user message | ||
User_Proxy_Agent->>Closure_Agent: Forward message to externally facing Closure Agent | ||
Closure_Agent->>User: Expose the response to the User | ||
User->>Worker_Agent: Directly send follow up message | ||
Worker_Agent->>User_Proxy_Agent: Respond to user message | ||
User_Proxy_Agent->>Closure_Agent: Forward message to externally facing Closure Agent | ||
Closure_Agent->>User: Return response | ||
User->>Worker_Agent: Send "END" message | ||
Worker_Agent->>User_Proxy_Agent: Confirm session end | ||
User_Proxy_Agent->>Closure_Agent: Confirm session end | ||
Closure_Agent->>User: Display session end message | ||
``` |
70 changes: 70 additions & 0 deletions
70
python/packages/autogen-core/samples/semantic_router/_agents.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import asyncio | ||
import logging | ||
|
||
from _semantic_router_components import FinalResult, TerminationMessage, UserProxyMessage, WorkerAgentMessage | ||
from autogen_core.application.logging import TRACE_LOGGER_NAME | ||
from autogen_core.base import MessageContext | ||
from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler | ||
|
||
logging.basicConfig(level=logging.DEBUG) | ||
logger = logging.getLogger(f"{TRACE_LOGGER_NAME}.workers") | ||
|
||
|
||
class WorkerAgent(RoutedAgent): | ||
def __init__(self, name: str) -> None: | ||
super().__init__("A Worker Agent") | ||
self._name = name | ||
|
||
@message_handler | ||
async def my_message_handler(self, message: UserProxyMessage, ctx: MessageContext) -> None: | ||
assert ctx.topic_id is not None | ||
logger.debug(f"Received message from {message.source}: {message.content}") | ||
if "END" in message.content: | ||
await self.publish_message( | ||
TerminationMessage(reason="user terminated conversation", content=message.content, source=self.type), | ||
topic_id=DefaultTopicId(type="user_proxy", source=ctx.topic_id.source), | ||
) | ||
else: | ||
content = f"Hello from {self._name}! You said: {message.content}" | ||
logger.debug(f"Returning message: {content}") | ||
await self.publish_message( | ||
WorkerAgentMessage(content=content, source=ctx.topic_id.type), | ||
topic_id=DefaultTopicId(type="user_proxy", source=ctx.topic_id.source), | ||
) | ||
|
||
|
||
class UserProxyAgent(RoutedAgent): | ||
"""An agent that proxies user input from the console. Override the `get_user_input` | ||
method to customize how user input is retrieved. | ||
Args: | ||
description (str): The description of the agent. | ||
""" | ||
|
||
def __init__(self, description: str) -> None: | ||
super().__init__(description) | ||
|
||
# When a conversation ends | ||
@message_handler | ||
async def on_terminate(self, message: TerminationMessage, ctx: MessageContext) -> None: | ||
assert ctx.topic_id is not None | ||
"""Handle a publish now message. This method prompts the user for input, then publishes it.""" | ||
logger.debug(f"Ending conversation with {ctx.sender} because {message.reason}") | ||
await self.publish_message( | ||
FinalResult(content=message.content, source=self.id.key), | ||
topic_id=DefaultTopicId(type="response", source=ctx.topic_id.source), | ||
) | ||
|
||
# When the agent responds back, user proxy adds it to history and then | ||
# sends to Closure Agent for API to respond | ||
@message_handler | ||
async def on_agent_message(self, message: WorkerAgentMessage, ctx: MessageContext) -> None: | ||
assert ctx.topic_id is not None | ||
logger.debug(f"Received message from {message.source}: {message.content}") | ||
logger.debug("Publishing message to Closure Agent") | ||
await self.publish_message(message, topic_id=DefaultTopicId(type="response", source=ctx.topic_id.source)) | ||
|
||
async def get_user_input(self, prompt: str) -> str: | ||
"""Get user input from the console. Override this method to customize how user input is retrieved.""" | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor(None, input, prompt) |
58 changes: 58 additions & 0 deletions
58
python/packages/autogen-core/samples/semantic_router/_semantic_router_agent.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import logging | ||
|
||
from _semantic_router_components import AgentRegistryBase, IntentClassifierBase, TerminationMessage, UserProxyMessage | ||
from autogen_core.application.logging import TRACE_LOGGER_NAME | ||
from autogen_core.base import MessageContext | ||
from autogen_core.components import DefaultTopicId, RoutedAgent, default_subscription, message_handler | ||
|
||
logging.basicConfig(level=logging.WARNING) | ||
logger = logging.getLogger(f"{TRACE_LOGGER_NAME}.semantic_router") | ||
logger.setLevel(logging.DEBUG) | ||
|
||
|
||
@default_subscription | ||
class SemanticRouterAgent(RoutedAgent): | ||
def __init__(self, name: str, agent_registry: AgentRegistryBase, intent_classifier: IntentClassifierBase) -> None: | ||
super().__init__("Semantic Router Agent") | ||
self._name = name | ||
self._registry = agent_registry | ||
self._classifier = intent_classifier | ||
|
||
# The User has sent a message that needs to be routed | ||
@message_handler | ||
async def route_to_agent(self, message: UserProxyMessage, ctx: MessageContext) -> None: | ||
assert ctx.topic_id is not None | ||
logger.debug(f"Received message from {message.source}: {message.content}") | ||
session_id = ctx.topic_id.source | ||
intent = await self._identify_intent(message) | ||
agent = await self._find_agent(intent) | ||
await self.contact_agent(agent, message, session_id) | ||
|
||
## Identify the intent of the user message | ||
async def _identify_intent(self, message: UserProxyMessage) -> str: | ||
return await self._classifier.classify_intent(message.content) | ||
|
||
## Use a lookup, search, or LLM to identify the most relevant agent for the intent | ||
async def _find_agent(self, intent: str) -> str: | ||
logger.debug(f"Identified intent: {intent}") | ||
try: | ||
agent = await self._registry.get_agent(intent) | ||
return agent | ||
except KeyError: | ||
logger.debug("No relevant agent found for intent: " + intent) | ||
return "termination" | ||
|
||
## Forward user message to the appropriate agent, or end the thread. | ||
async def contact_agent(self, agent: str, message: UserProxyMessage, session_id: str) -> None: | ||
if agent == "termination": | ||
logger.debug("No relevant agent found") | ||
await self.publish_message( | ||
TerminationMessage(reason="No relevant agent found", content=message.content, source=self.type), | ||
DefaultTopicId(type="user_proxy", source=session_id), | ||
) | ||
else: | ||
logger.debug("Routing to agent: " + agent) | ||
await self.publish_message( | ||
UserProxyMessage(content=message.content, source=message.source), | ||
DefaultTopicId(type=agent, source=session_id), | ||
) |
57 changes: 57 additions & 0 deletions
57
python/packages/autogen-core/samples/semantic_router/_semantic_router_components.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from abc import ABC, abstractmethod | ||
from dataclasses import dataclass | ||
|
||
|
||
class IntentClassifierBase(ABC): | ||
@abstractmethod | ||
async def classify_intent(self, message: str) -> str: | ||
pass | ||
|
||
|
||
class AgentRegistryBase(ABC): | ||
@abstractmethod | ||
async def get_agent(self, intent: str) -> str: | ||
pass | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class BaseMessage: | ||
"""A basic message that stores the source of the message.""" | ||
|
||
source: str | ||
|
||
|
||
@dataclass | ||
class TextMessage(BaseMessage): | ||
content: str | ||
|
||
def __len__(self): | ||
return len(self.content) | ||
|
||
|
||
@dataclass | ||
class UserProxyMessage(TextMessage): | ||
"""A message that is sent from the user to the system, and needs to be routed to the appropriate agent.""" | ||
|
||
pass | ||
|
||
|
||
@dataclass | ||
class TerminationMessage(TextMessage): | ||
"""A message that is sent from the system to the user, indicating that the conversation has ended.""" | ||
|
||
reason: str | ||
|
||
|
||
@dataclass | ||
class WorkerAgentMessage(TextMessage): | ||
"""A message that is sent from a worker agent to the user.""" | ||
|
||
pass | ||
|
||
|
||
@dataclass | ||
class FinalResult(TextMessage): | ||
"""A message sent from the agent to the user, indicating the end of a conversation""" | ||
|
||
pass |
25 changes: 25 additions & 0 deletions
25
python/packages/autogen-core/samples/semantic_router/run_host.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import asyncio | ||
import logging | ||
import platform | ||
|
||
from autogen_core.application import WorkerAgentRuntimeHost | ||
from autogen_core.application.logging import TRACE_LOGGER_NAME | ||
|
||
|
||
async def run_host(): | ||
host = WorkerAgentRuntimeHost(address="localhost:50051") | ||
host.start() # Start a host service in the background. | ||
if platform.system() == "Windows": | ||
try: | ||
while True: | ||
await asyncio.sleep(1) | ||
except KeyboardInterrupt: | ||
await host.stop() | ||
else: | ||
await host.stop_when_signal() | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.DEBUG) | ||
logger = logging.getLogger(f"{TRACE_LOGGER_NAME}.host") | ||
asyncio.run(run_host()) |
Oops, something went wrong.