diff --git a/samples/apps/autogen-studio/autogenstudio/chatmanager.py b/samples/apps/autogen-studio/autogenstudio/chatmanager.py index a91401e6663..e8ed3abfd62 100644 --- a/samples/apps/autogen-studio/autogenstudio/chatmanager.py +++ b/samples/apps/autogen-studio/autogenstudio/chatmanager.py @@ -1,13 +1,12 @@ -import asyncio import os from datetime import datetime from queue import Queue from typing import Any, Dict, List, Optional, Tuple, Union -import websockets -from fastapi import WebSocket, WebSocketDisconnect +from loguru import logger from .datamodel import Message +from .websocket_connection_manager import WebSocketConnectionManager from .workflowmanager import WorkflowManager @@ -17,15 +16,19 @@ class AutoGenChatManager: using an automated workflow configuration and message queue. """ - def __init__(self, message_queue: Queue) -> None: + def __init__( + self, message_queue: Queue, websocket_manager: WebSocketConnectionManager = None, human_input_timeout: int = 180 + ) -> None: """ Initializes the AutoGenChatManager with a message queue. :param message_queue: A queue to use for sending messages asynchronously. """ self.message_queue = message_queue + self.websocket_manager = websocket_manager + self.a_human_input_timeout = human_input_timeout - def send(self, message: str) -> None: + def send(self, message: dict) -> None: """ Sends a message by putting it into the message queue. @@ -34,6 +37,45 @@ def send(self, message: str) -> None: if self.message_queue is not None: self.message_queue.put_nowait(message) + async def a_send(self, message: dict) -> None: + """ + Asynchronously sends a message via the WebSocketManager class + + :param message: The message string to be sent. + """ + for connection, socket_client_id in self.websocket_manager.active_connections: + if message["connection_id"] == socket_client_id: + logger.info( + f"Sending message to connection_id: {message['connection_id']}. Connection ID: {socket_client_id}" + ) + await self.websocket_manager.send_message(message, connection) + else: + logger.info( + f"Skipping message for connection_id: {message['connection_id']}. Connection ID: {socket_client_id}" + ) + + async def a_prompt_for_input(self, prompt: dict, timeout: int = 60) -> str: + """ + Sends the user a prompt and waits for a response asynchronously via the WebSocketManager class + + :param message: The message string to be sent. + """ + + for connection, socket_client_id in self.websocket_manager.active_connections: + if prompt["connection_id"] == socket_client_id: + logger.info( + f"Sending message to connection_id: {prompt['connection_id']}. Connection ID: {socket_client_id}" + ) + try: + result = await self.websocket_manager.get_input(prompt, connection, timeout) + return result + except Exception as e: + return f"Error: {e}\nTERMINATE" + else: + logger.info( + f"Skipping message for connection_id: {prompt['connection_id']}. Connection ID: {socket_client_id}" + ) + def chat( self, message: Message, @@ -72,6 +114,7 @@ def chat( history=history, work_dir=work_dir, send_message_function=self.send, + a_send_message_function=self.a_send, connection_id=connection_id, ) @@ -82,96 +125,55 @@ def chat( result_message.session_id = message.session_id return result_message - -class WebSocketConnectionManager: - """ - Manages WebSocket connections including sending, broadcasting, and managing the lifecycle of connections. - """ - - def __init__( + async def a_chat( self, - active_connections: List[Tuple[WebSocket, str]] = None, - active_connections_lock: asyncio.Lock = None, - ) -> None: - """ - Initializes WebSocketConnectionManager with an optional list of active WebSocket connections. - - :param active_connections: A list of tuples, each containing a WebSocket object and its corresponding client_id. - """ - if active_connections is None: - active_connections = [] - self.active_connections_lock = active_connections_lock - self.active_connections: List[Tuple[WebSocket, str]] = active_connections - - async def connect(self, websocket: WebSocket, client_id: str) -> None: + message: Message, + history: List[Dict[str, Any]], + workflow: Any = None, + connection_id: Optional[str] = None, + user_dir: Optional[str] = None, + **kwargs, + ) -> Message: """ - Accepts a new WebSocket connection and appends it to the active connections list. + Processes an incoming message according to the agent's workflow configuration + and generates a response. - :param websocket: The WebSocket instance representing a client connection. - :param client_id: A string representing the unique identifier of the client. + :param message: An instance of `Message` representing an incoming message. + :param history: A list of dictionaries, each representing a past interaction. + :param flow_config: An instance of `AgentWorkFlowConfig`. If None, defaults to a standard configuration. + :param connection_id: An optional connection identifier. + :param kwargs: Additional keyword arguments. + :return: An instance of `Message` representing a response. """ - await websocket.accept() - async with self.active_connections_lock: - self.active_connections.append((websocket, client_id)) - print(f"New Connection: {client_id}, Total: {len(self.active_connections)}") - async def disconnect(self, websocket: WebSocket) -> None: - """ - Disconnects and removes a WebSocket connection from the active connections list. + # create a working director for workflow based on user_dir/session_id/time_hash + work_dir = os.path.join( + user_dir, + str(message.session_id), + datetime.now().strftime("%Y%m%d_%H-%M-%S"), + ) + os.makedirs(work_dir, exist_ok=True) - :param websocket: The WebSocket instance to remove. - """ - async with self.active_connections_lock: - try: - self.active_connections = [conn for conn in self.active_connections if conn[0] != websocket] - print(f"Connection Closed. Total: {len(self.active_connections)}") - except ValueError: - print("Error: WebSocket connection not found") - - async def disconnect_all(self) -> None: - """ - Disconnects all active WebSocket connections. - """ - for connection, _ in self.active_connections[:]: - await self.disconnect(connection) + # if no flow config is provided, use the default + if workflow is None: + raise ValueError("Workflow must be specified") - async def send_message(self, message: Union[Dict, str], websocket: WebSocket) -> None: - """ - Sends a JSON message to a single WebSocket connection. + workflow_manager = WorkflowManager( + workflow=workflow, + history=history, + work_dir=work_dir, + send_message_function=self.send, + a_send_message_function=self.a_send, + a_human_input_function=self.a_prompt_for_input, + a_human_input_timeout=self.a_human_input_timeout, + connection_id=connection_id, + ) - :param message: A JSON serializable dictionary containing the message to send. - :param websocket: The WebSocket instance through which to send the message. - """ - try: - async with self.active_connections_lock: - await websocket.send_json(message) - except WebSocketDisconnect: - print("Error: Tried to send a message to a closed WebSocket") - await self.disconnect(websocket) - except websockets.exceptions.ConnectionClosedOK: - print("Error: WebSocket connection closed normally") - await self.disconnect(websocket) - except Exception as e: - print(f"Error in sending message: {str(e)}", message) - await self.disconnect(websocket) - - async def broadcast(self, message: Dict) -> None: - """ - Broadcasts a JSON message to all active WebSocket connections. + message_text = message.content.strip() + result_message: Message = await workflow_manager.a_run( + message=f"{message_text}", clear_history=False, history=history + ) - :param message: A JSON serializable dictionary containing the message to broadcast. - """ - # Create a message dictionary with the desired format - message_dict = {"message": message} - - for connection, _ in self.active_connections[:]: - try: - if connection.client_state == websockets.protocol.State.OPEN: - # Call send_message method with the message dictionary and current WebSocket connection - await self.send_message(message_dict, connection) - else: - print("Error: WebSocket connection is closed") - await self.disconnect(connection) - except (WebSocketDisconnect, websockets.exceptions.ConnectionClosedOK) as e: - print(f"Error: WebSocket disconnected or closed({str(e)})") - await self.disconnect(connection) + result_message.user_id = message.user_id + result_message.session_id = message.session_id + return result_message diff --git a/samples/apps/autogen-studio/autogenstudio/database/utils.py b/samples/apps/autogen-studio/autogenstudio/database/utils.py index 189fa1baf8d..ac77a916149 100644 --- a/samples/apps/autogen-studio/autogenstudio/database/utils.py +++ b/samples/apps/autogen-studio/autogenstudio/database/utils.py @@ -175,6 +175,13 @@ def init_db_samples(dbmanager: Any): model="gpt-4-1106-preview", description="OpenAI GPT-4 model", user_id="guestuser@gmail.com", api_type="open_ai" ) + anthropic_sonnet_model = Model( + model="claude-3-5-sonnet-20240620", + description="Anthropic's Claude 3.5 Sonnet model", + api_type="anthropic", + user_id="guestuser@gmail.com", + ) + # skills generate_pdf_skill = Skill( name="generate_and_save_pdf", @@ -303,6 +310,7 @@ def init_db_samples(dbmanager: Any): session.add(google_gemini_model) session.add(azure_model) session.add(gpt_4_model) + session.add(anthropic_sonnet_model) session.add(generate_image_skill) session.add(generate_pdf_skill) session.add(user_proxy) diff --git a/samples/apps/autogen-studio/autogenstudio/version.py b/samples/apps/autogen-studio/autogenstudio/version.py index 3d83da06d44..bf51c1b62b6 100644 --- a/samples/apps/autogen-studio/autogenstudio/version.py +++ b/samples/apps/autogen-studio/autogenstudio/version.py @@ -1,3 +1,3 @@ -VERSION = "0.1.4" +VERSION = "0.1.6" __version__ = VERSION APP_NAME = "autogenstudio" diff --git a/samples/apps/autogen-studio/autogenstudio/web/app.py b/samples/apps/autogen-studio/autogenstudio/web/app.py index 5926f6c64a1..bbd087f52ea 100644 --- a/samples/apps/autogen-studio/autogenstudio/web/app.py +++ b/samples/apps/autogen-studio/autogenstudio/web/app.py @@ -12,13 +12,14 @@ from loguru import logger from openai import OpenAIError -from ..chatmanager import AutoGenChatManager, WebSocketConnectionManager +from ..chatmanager import AutoGenChatManager from ..database import workflow_from_id from ..database.dbmanager import DBManager from ..datamodel import Agent, Message, Model, Response, Session, Skill, Workflow from ..profiler import Profiler from ..utils import check_and_cast_datetime_fields, init_app_folders, md5_hash, test_model from ..version import VERSION +from ..websocket_connection_manager import WebSocketConnectionManager profiler = Profiler() managers = {"chat": None} # manage calls to autogen @@ -64,11 +65,17 @@ def message_handler(): database_engine_uri = folders["database_engine_uri"] dbmanager = DBManager(engine_uri=database_engine_uri) +HUMAN_INPUT_TIMEOUT_SECONDS = 180 + @asynccontextmanager async def lifespan(app: FastAPI): print("***** App started *****") - managers["chat"] = AutoGenChatManager(message_queue=message_queue) + managers["chat"] = AutoGenChatManager( + message_queue=message_queue, + websocket_manager=websocket_manager, + human_input_timeout=HUMAN_INPUT_TIMEOUT_SECONDS, + ) dbmanager.create_db_and_tables() yield @@ -449,7 +456,7 @@ async def run_session_workflow(message: Message, session_id: int, workflow_id: i user_dir = os.path.join(folders["files_static_root"], "user", md5_hash(message.user_id)) os.makedirs(user_dir, exist_ok=True) workflow = workflow_from_id(workflow_id, dbmanager=dbmanager) - agent_response: Message = managers["chat"].chat( + agent_response: Message = await managers["chat"].a_chat( message=message, history=user_message_history, user_dir=user_dir, diff --git a/samples/apps/autogen-studio/autogenstudio/websocket_connection_manager.py b/samples/apps/autogen-studio/autogenstudio/websocket_connection_manager.py new file mode 100644 index 00000000000..73f7ef89681 --- /dev/null +++ b/samples/apps/autogen-studio/autogenstudio/websocket_connection_manager.py @@ -0,0 +1,135 @@ +import asyncio +from typing import Any, Dict, List, Optional, Tuple, Union + +import websockets +from fastapi import WebSocket, WebSocketDisconnect + + +class WebSocketConnectionManager: + """ + Manages WebSocket connections including sending, broadcasting, and managing the lifecycle of connections. + """ + + def __init__( + self, + active_connections: List[Tuple[WebSocket, str]] = None, + active_connections_lock: asyncio.Lock = None, + ) -> None: + """ + Initializes WebSocketConnectionManager with an optional list of active WebSocket connections. + + :param active_connections: A list of tuples, each containing a WebSocket object and its corresponding client_id. + """ + if active_connections is None: + active_connections = [] + self.active_connections_lock = active_connections_lock + self.active_connections: List[Tuple[WebSocket, str]] = active_connections + + async def connect(self, websocket: WebSocket, client_id: str) -> None: + """ + Accepts a new WebSocket connection and appends it to the active connections list. + + :param websocket: The WebSocket instance representing a client connection. + :param client_id: A string representing the unique identifier of the client. + """ + await websocket.accept() + async with self.active_connections_lock: + self.active_connections.append((websocket, client_id)) + print(f"New Connection: {client_id}, Total: {len(self.active_connections)}") + + async def disconnect(self, websocket: WebSocket) -> None: + """ + Disconnects and removes a WebSocket connection from the active connections list. + + :param websocket: The WebSocket instance to remove. + """ + async with self.active_connections_lock: + try: + self.active_connections = [conn for conn in self.active_connections if conn[0] != websocket] + print(f"Connection Closed. Total: {len(self.active_connections)}") + except ValueError: + print("Error: WebSocket connection not found") + + async def disconnect_all(self) -> None: + """ + Disconnects all active WebSocket connections. + """ + for connection, _ in self.active_connections[:]: + await self.disconnect(connection) + + async def send_message(self, message: Union[Dict, str], websocket: WebSocket) -> None: + """ + Sends a JSON message to a single WebSocket connection. + + :param message: A JSON serializable dictionary containing the message to send. + :param websocket: The WebSocket instance through which to send the message. + """ + try: + async with self.active_connections_lock: + await websocket.send_json(message) + except WebSocketDisconnect: + print("Error: Tried to send a message to a closed WebSocket") + await self.disconnect(websocket) + except websockets.exceptions.ConnectionClosedOK: + print("Error: WebSocket connection closed normally") + await self.disconnect(websocket) + except Exception as e: + print(f"Error in sending message: {str(e)}", message) + await self.disconnect(websocket) + + async def get_input(self, prompt: Union[Dict, str], websocket: WebSocket, timeout: int = 60) -> str: + """ + Sends a JSON message to a single WebSocket connection as a prompt for user input. + Waits on a user response or until the given timeout elapses. + + :param prompt: A JSON serializable dictionary containing the message to send. + :param websocket: The WebSocket instance through which to send the message. + """ + response = "Error: Unexpected response.\nTERMINATE" + try: + async with self.active_connections_lock: + await websocket.send_json(prompt) + result = await asyncio.wait_for(websocket.receive_json(), timeout=timeout) + data = result.get("data") + if data: + response = data.get("content", "Error: Unexpected response format\nTERMINATE") + else: + response = "Error: Unexpected response format\nTERMINATE" + + except asyncio.TimeoutError: + response = f"The user was timed out after {timeout} seconds of inactivity.\nTERMINATE" + except WebSocketDisconnect: + print("Error: Tried to send a message to a closed WebSocket") + await self.disconnect(websocket) + response = "The user was disconnected\nTERMINATE" + except websockets.exceptions.ConnectionClosedOK: + print("Error: WebSocket connection closed normally") + await self.disconnect(websocket) + response = "The user was disconnected\nTERMINATE" + except Exception as e: + print(f"Error in sending message: {str(e)}", prompt) + await self.disconnect(websocket) + response = f"Error: {e}\nTERMINATE" + + return response + + async def broadcast(self, message: Dict) -> None: + """ + Broadcasts a JSON message to all active WebSocket connections. + + :param message: A JSON serializable dictionary containing the message to broadcast. + """ + # Create a message dictionary with the desired format + message_dict = {"message": message} + + for connection, _ in self.active_connections[:]: + try: + if connection.client_state == websockets.protocol.State.OPEN: + # Call send_message method with the message dictionary and current WebSocket connection + await self.send_message(message_dict, connection) + else: + print("Error: WebSocket connection is closed") + await self.disconnect(connection) + except (WebSocketDisconnect, websockets.exceptions.ConnectionClosedOK) as e: + print(f"Error: WebSocket disconnected or closed({str(e)})") + await self.disconnect(connection) diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py index f5065e85e5c..2da3b58b7ce 100644 --- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py +++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py @@ -2,7 +2,7 @@ import os import time from datetime import datetime -from typing import Any, Dict, List, Optional, Union +from typing import Any, Coroutine, Dict, List, Optional, Union import autogen @@ -40,6 +40,9 @@ def __init__( work_dir: str = None, clear_work_dir: bool = True, send_message_function: Optional[callable] = None, + a_send_message_function: Optional[Coroutine] = None, + a_human_input_function: Optional[callable] = None, + a_human_input_timeout: Optional[int] = 60, connection_id: Optional[str] = None, ) -> None: """ @@ -51,6 +54,9 @@ def __init__( work_dir (str): The working directory. clear_work_dir (bool): If set to True, clears the working directory. send_message_function (Optional[callable]): The function to send messages. + a_send_message_function (Optional[Coroutine]): Async coroutine to send messages. + a_human_input_function (Optional[callable]): Async coroutine to prompt the user for input. + a_human_input_timeout (Optional[int]): A time (in seconds) to wait for user input. After this time, the a_human_input_function will timeout and end the conversation. connection_id (Optional[str]): The connection identifier. """ if isinstance(workflow, str): @@ -67,6 +73,9 @@ def __init__( # TODO - improved typing for workflow self.workflow_skills = [] self.send_message_function = send_message_function + self.a_send_message_function = a_send_message_function + self.a_human_input_function = a_human_input_function + self.a_human_input_timeout = a_human_input_timeout self.connection_id = connection_id self.work_dir = work_dir or "work_dir" self.code_executor_pool = { @@ -112,6 +121,36 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c else: raise ValueError("Sender and receiver agents are not defined in the workflow configuration.") + async def _a_run_workflow( + self, message: str, history: Optional[List[Message]] = None, clear_history: bool = False + ) -> None: + """ + Asynchronously runs the workflow based on the provided configuration. + + Args: + message: The initial message to start the chat. + history: A list of messages to populate the agents' history. + clear_history: If set to True, clears the chat history before initiating. + + """ + for agent in self.workflow.get("agents", []): + if agent.get("link").get("agent_type") == "sender": + self.sender = self.load(agent.get("agent")) + elif agent.get("link").get("agent_type") == "receiver": + self.receiver = self.load(agent.get("agent")) + if self.sender and self.receiver: + # save all agent skills to skills.py + save_skills_to_file(self.workflow_skills, self.work_dir) + if history: + self._populate_history(history) + await self.sender.a_initiate_chat( + self.receiver, + message=message, + clear_history=clear_history, + ) + else: + raise ValueError("Sender and receiver agents are not defined in the workflow configuration.") + def _serialize_agent( self, agent: Agent, @@ -182,7 +221,9 @@ def process_message( "connection_id": self.connection_id, "message_type": "agent_message", } - # if the agent will respond to the message, or the message is sent by a groupchat agent. This avoids adding groupchat broadcast messages to the history (which are sent with request_reply=False), or when agent populated from history + # if the agent will respond to the message, or the message is sent by a groupchat agent. + # This avoids adding groupchat broadcast messages to the history (which are sent with request_reply=False), + # or when agent populated from history if request_reply is not False or sender_type == "groupchat": self.agent_history.append(message_payload) # add to history if self.send_message_function: # send over the message queue @@ -193,6 +234,53 @@ def process_message( ) self.send_message_function(socket_msg.dict()) + async def a_process_message( + self, + sender: autogen.Agent, + receiver: autogen.Agent, + message: Dict, + request_reply: bool = False, + silent: bool = False, + sender_type: str = "agent", + ) -> None: + """ + Asynchronously processes the message and adds it to the agent history. + + Args: + + sender: The sender of the message. + receiver: The receiver of the message. + message: The message content. + request_reply: If set to True, the message will be added to agent history. + silent: determining verbosity. + sender_type: The type of the sender of the message. + """ + + message = message if isinstance(message, dict) else {"content": message, "role": "user"} + message_payload = { + "recipient": receiver.name, + "sender": sender.name, + "message": message, + "timestamp": datetime.now().isoformat(), + "sender_type": sender_type, + "connection_id": self.connection_id, + "message_type": "agent_message", + } + # if the agent will respond to the message, or the message is sent by a groupchat agent. + # This avoids adding groupchat broadcast messages to the history (which are sent with request_reply=False), + # or when agent populated from history + if request_reply is not False or sender_type == "groupchat": + self.agent_history.append(message_payload) # add to history + socket_msg = SocketMessage( + type="agent_message", + data=message_payload, + connection_id=self.connection_id, + ) + if self.a_send_message_function: # send over the message queue + await self.a_send_message_function(socket_msg.dict()) + elif self.send_message_function: # send over the message queue + self.send_message_function(socket_msg.dict()) + def _populate_history(self, history: List[Message]) -> None: """ Populates the agent message history from the provided list of messages. @@ -222,6 +310,12 @@ def sanitize_agent(self, agent: Dict) -> Agent: """ """ skills = agent.get("skills", []) + + # When human input mode is not NEVER and no model is attached, the ui is passing bogus llm_config. + configured_models = agent.get("models") + if not configured_models or len(configured_models) == 0: + agent["config"]["llm_config"] = False + agent = Agent.model_validate(agent) agent.config.is_termination_msg = agent.config.is_termination_msg or ( lambda x: "TERMINATE" in x.get("content", "").rstrip()[-20:] @@ -284,6 +378,10 @@ def load(self, agent: Any) -> autogen.Agent: agent = ExtendedGroupChatManager( groupchat=groupchat, message_processor=self.process_message, + a_message_processor=self.a_process_message, + a_human_input_function=self.a_human_input_function, + a_human_input_timeout=self.a_human_input_timeout, + connection_id=self.connection_id, llm_config=agent.config.llm_config.model_dump(), ) return agent @@ -293,11 +391,19 @@ def load(self, agent: Any) -> autogen.Agent: agent = ExtendedConversableAgent( **self._serialize_agent(agent), message_processor=self.process_message, + a_message_processor=self.a_process_message, + a_human_input_function=self.a_human_input_function, + a_human_input_timeout=self.a_human_input_timeout, + connection_id=self.connection_id, ) elif agent.type == "userproxy": agent = ExtendedConversableAgent( **self._serialize_agent(agent), message_processor=self.process_message, + a_message_processor=self.a_process_message, + a_human_input_function=self.a_human_input_function, + a_human_input_timeout=self.a_human_input_timeout, + connection_id=self.connection_id, ) else: raise ValueError(f"Unknown agent type: {agent.type}") @@ -409,6 +515,40 @@ def run(self, message: str, history: Optional[List[Message]] = None, clear_histo ) return result_message + async def a_run( + self, message: str, history: Optional[List[Message]] = None, clear_history: bool = False + ) -> Message: + """ + Asynchronously initiates a chat between the sender and receiver agents with an initial message + and an option to clear the history. + + Args: + message: The initial message to start the chat. + clear_history: If set to True, clears the chat history before initiating. + """ + + start_time = time.time() + await self._a_run_workflow(message=message, history=history, clear_history=clear_history) + end_time = time.time() + + output = self._generate_output(message, self.workflow.get("summary_method", "last")) + + usage = self._get_usage_summary() + # print("usage", usage) + + result_message = Message( + content=output, + role="assistant", + meta={ + "messages": self.agent_history, + "summary_method": self.workflow.get("summary_method", "last"), + "time": end_time - start_time, + "files": get_modified_files(start_time, end_time, source_dir=self.work_dir), + "usage": usage, + }, + ) + return result_message + class SequentialWorkflowManager: """ @@ -422,6 +562,9 @@ def __init__( work_dir: str = None, clear_work_dir: bool = True, send_message_function: Optional[callable] = None, + a_send_message_function: Optional[Coroutine] = None, + a_human_input_function: Optional[callable] = None, + a_human_input_timeout: Optional[int] = 60, connection_id: Optional[str] = None, ) -> None: """ @@ -433,6 +576,9 @@ def __init__( work_dir (str): The working directory. clear_work_dir (bool): If set to True, clears the working directory. send_message_function (Optional[callable]): The function to send messages. + a_send_message_function (Optional[Coroutine]): Async coroutine to send messages. + a_human_input_function (Optional[callable]): Async coroutine to prompt for human input. + a_human_input_timeout (Optional[int]): A time (in seconds) to wait for user input. After this time, the a_human_input_function will timeout and end the conversation. connection_id (Optional[str]): The connection identifier. """ if isinstance(workflow, str): @@ -448,6 +594,9 @@ def __init__( # TODO - improved typing for workflow self.send_message_function = send_message_function + self.a_send_message_function = a_send_message_function + self.a_human_input_function = a_human_input_function + self.a_human_input_timeout = a_human_input_timeout self.connection_id = connection_id self.work_dir = work_dir or "work_dir" if clear_work_dir: @@ -498,6 +647,8 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c work_dir=self.work_dir, clear_work_dir=True, send_message_function=self.send_message_function, + a_send_message_function=self.a_send_message_function, + a_human_input_timeout=self.a_human_input_timeout, connection_id=self.connection_id, ) task_prompt = ( @@ -519,6 +670,72 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c print(f"======== end of sequence === {i}============") self.agent_history.extend(result.meta.get("messages", [])) + async def _a_run_workflow( + self, message: str, history: Optional[List[Message]] = None, clear_history: bool = False + ) -> None: + """ + Asynchronously runs the workflow based on the provided configuration. + + Args: + message: The initial message to start the chat. + history: A list of messages to populate the agents' history. + clear_history: If set to True, clears the chat history before initiating. + + """ + user_proxy = { + "config": { + "name": "user_proxy", + "human_input_mode": "NEVER", + "max_consecutive_auto_reply": 25, + "code_execution_config": "local", + "default_auto_reply": "TERMINATE", + "description": "User Proxy Agent Configuration", + "llm_config": False, + "type": "userproxy", + } + } + sequential_history = [] + for i, agent in enumerate(self.workflow.get("agents", [])): + workflow = Workflow( + name="agent workflow", type=WorkFlowType.autonomous, summary_method=WorkFlowSummaryMethod.llm + ) + workflow = workflow.model_dump(mode="json") + agent = agent.get("agent") + workflow["agents"] = [ + {"agent": user_proxy, "link": {"agent_type": "sender"}}, + {"agent": agent, "link": {"agent_type": "receiver"}}, + ] + + auto_workflow = AutoWorkflowManager( + workflow=workflow, + history=history, + work_dir=self.work_dir, + clear_work_dir=True, + send_message_function=self.send_message_function, + a_send_message_function=self.a_send_message_function, + a_human_input_function=self.a_human_input_function, + a_human_input_timeout=self.a_human_input_timeout, + connection_id=self.connection_id, + ) + task_prompt = ( + f""" + Your primary instructions are as follows: + {agent.get("task_instruction")} + Context for addressing your task is below: + ======= + {str(sequential_history)} + ======= + Now address your task: + """ + if i > 0 + else message + ) + result = await auto_workflow.a_run(message=task_prompt, clear_history=clear_history) + sequential_history.append(result.content) + self.model_client = auto_workflow.receiver.client + print(f"======== end of sequence === {i}============") + self.agent_history.extend(result.meta.get("messages", [])) + def _generate_output( self, message_text: str, @@ -587,6 +804,36 @@ def run(self, message: str, history: Optional[List[Message]] = None, clear_histo ) return result_message + async def a_run( + self, message: str, history: Optional[List[Message]] = None, clear_history: bool = False + ) -> Message: + """ + Asynchronously initiates a chat between the sender and receiver agents with an initial message + and an option to clear the history. + + Args: + message: The initial message to start the chat. + clear_history: If set to True, clears the chat history before initiating. + """ + + start_time = time.time() + await self._a_run_workflow(message=message, history=history, clear_history=clear_history) + end_time = time.time() + output = self._generate_output(message, self.workflow.get("summary_method", "last")) + + result_message = Message( + content=output, + role="assistant", + meta={ + "messages": self.agent_history, + "summary_method": self.workflow.get("summary_method", "last"), + "time": end_time - start_time, + "files": get_modified_files(start_time, end_time, source_dir=self.work_dir), + "task": message, + }, + ) + return result_message + class WorkflowManager: """ @@ -600,6 +847,9 @@ def __new__( work_dir: str = None, clear_work_dir: bool = True, send_message_function: Optional[callable] = None, + a_send_message_function: Optional[Coroutine] = None, + a_human_input_function: Optional[callable] = None, + a_human_input_timeout: Optional[int] = 60, connection_id: Optional[str] = None, ) -> None: """ @@ -611,6 +861,9 @@ def __new__( work_dir (str): The working directory. clear_work_dir (bool): If set to True, clears the working directory. send_message_function (Optional[callable]): The function to send messages. + a_send_message_function (Optional[Coroutine]): Async coroutine to send messages. + a_human_input_function (Optional[callable]): Async coroutine to prompt for user input. + a_human_input_timeout (Optional[int]): A time (in seconds) to wait for user input. After this time, the a_human_input_function will timeout and end the conversation. connection_id (Optional[str]): The connection identifier. """ if isinstance(workflow, str): @@ -631,6 +884,9 @@ def __new__( work_dir=work_dir, clear_work_dir=clear_work_dir, send_message_function=send_message_function, + a_send_message_function=a_send_message_function, + a_human_input_function=a_human_input_function, + a_human_input_timeout=a_human_input_timeout, connection_id=connection_id, ) elif self.workflow.get("type") == WorkFlowType.sequential.value: @@ -640,14 +896,32 @@ def __new__( work_dir=work_dir, clear_work_dir=clear_work_dir, send_message_function=send_message_function, + a_send_message_function=a_send_message_function, + a_human_input_function=a_human_input_function, + a_human_input_timeout=a_human_input_timeout, connection_id=connection_id, ) class ExtendedConversableAgent(autogen.ConversableAgent): - def __init__(self, message_processor=None, *args, **kwargs): + def __init__( + self, + message_processor=None, + a_message_processor=None, + a_human_input_function=None, + a_human_input_timeout: Optional[int] = 60, + connection_id=None, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) self.message_processor = message_processor + self.a_message_processor = a_message_processor + self.a_human_input_function = a_human_input_function + self.a_human_input_response = None + self.a_human_input_timeout = a_human_input_timeout + self.connection_id = connection_id def receive( self, @@ -660,14 +934,79 @@ def receive( self.message_processor(sender, self, message, request_reply, silent, sender_type="agent") super().receive(message, sender, request_reply, silent) + async def a_receive( + self, + message: Union[Dict, str], + sender: autogen.Agent, + request_reply: Optional[bool] = None, + silent: Optional[bool] = False, + ) -> None: + if self.a_message_processor: + await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="agent") + elif self.message_processor: + self.message_processor(sender, self, message, request_reply, silent, sender_type="agent") + await super().a_receive(message, sender, request_reply, silent) + + # Strangely, when the response from a_get_human_input == "" (empty string) the libs call into the + # sync version. I guess that's "just in case", but it's odd because replying with an empty string + # is the intended way for the user to signal the underlying libs that they want to system to go forward + # with whatever function call, tool call or AI generated response the request calls for. Oh well, + # Que Sera Sera. + def get_human_input(self, prompt: str) -> str: + if self.a_human_input_response is None: + return super().get_human_input(prompt) + else: + response = self.a_human_input_response + self.a_human_input_response = None + return response + + async def a_get_human_input(self, prompt: str) -> str: + if self.message_processor and self.a_human_input_function: + message_dict = {"content": prompt, "role": "system", "type": "user-input-request"} + + message_payload = { + "recipient": self.name, + "sender": "system", + "message": message_dict, + "timestamp": datetime.now().isoformat(), + "sender_type": "system", + "connection_id": self.connection_id, + "message_type": "agent_message", + } + + socket_msg = SocketMessage( + type="user_input_request", + data=message_payload, + connection_id=self.connection_id, + ) + self.a_human_input_response = await self.a_human_input_function( + socket_msg.dict(), self.a_human_input_timeout + ) + return self.a_human_input_response -"" + else: + result = await super().a_get_human_input(prompt) + return result class ExtendedGroupChatManager(autogen.GroupChatManager): - def __init__(self, message_processor=None, *args, **kwargs): + def __init__( + self, + message_processor=None, + a_message_processor=None, + a_human_input_function=None, + a_human_input_timeout: Optional[int] = 60, + connection_id=None, + *args, + **kwargs, + ): super().__init__(*args, **kwargs) self.message_processor = message_processor + self.a_message_processor = a_message_processor + self.a_human_input_function = a_human_input_function + self.a_human_input_response = None + self.a_human_input_timeout = a_human_input_timeout + self.connection_id = connection_id def receive( self, @@ -679,3 +1018,49 @@ def receive( if self.message_processor: self.message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") super().receive(message, sender, request_reply, silent) + + async def a_receive( + self, + message: Union[Dict, str], + sender: autogen.Agent, + request_reply: Optional[bool] = None, + silent: Optional[bool] = False, + ) -> None: + if self.a_message_processor: + await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="agent") + elif self.message_processor: + self.message_processor(sender, self, message, request_reply, silent, sender_type="agent") + await super().a_receive(message, sender, request_reply, silent) + + def get_human_input(self, prompt: str) -> str: + if self.a_human_input_response is None: + return super().get_human_input(prompt) + else: + response = self.a_human_input_response + self.a_human_input_response = None + return response + + async def a_get_human_input(self, prompt: str) -> str: + if self.message_processor and self.a_human_input_function: + message_dict = {"content": prompt, "role": "system", "type": "user-input-request"} + + message_payload = { + "recipient": self.name, + "sender": "system", + "message": message_dict, + "timestamp": datetime.now().isoformat(), + "sender_type": "system", + "connection_id": self.connection_id, + "message_type": "agent_message", + } + socket_msg = SocketMessage( + type="user_input_request", + data=message_payload, + connection_id=self.connection_id, + ) + result = await self.a_human_input_function(socket_msg.dict(), self.a_human_input_timeout) + return result + + else: + result = await super().a_get_human_input(prompt) + return result diff --git a/samples/apps/autogen-studio/frontend/src/components/atoms.tsx b/samples/apps/autogen-studio/frontend/src/components/atoms.tsx index a0864153f5a..8f52e60281b 100644 --- a/samples/apps/autogen-studio/frontend/src/components/atoms.tsx +++ b/samples/apps/autogen-studio/frontend/src/components/atoms.tsx @@ -49,7 +49,7 @@ export const SectionHeader = ({ icon, }: IProps) => { return ( -
+

{/* {count !== null && {count}} */} {icon && <>{icon}} @@ -72,6 +72,7 @@ export const IconButton = ({ }: IProps) => { return ( { return (