Skip to content

Commit

Permalink
Initial proof-of-concept for TeamOne BaseAgent class (#181)
Browse files Browse the repository at this point in the history
* Initial proof-of-concept for TeamOne BaseAgent class

* Fixed most hatch errors.

* Handle final Hatch error

---------

Co-authored-by: gagb <[email protected]>
  • Loading branch information
afourney and gagb authored Jul 8, 2024
1 parent 7d12c70 commit beada02
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 65 deletions.
60 changes: 60 additions & 0 deletions python/teams/team-one/src/team_one/agents/base_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import List, Tuple, Union

from agnext.components import Image, TypeRoutedAgent, message_handler
from agnext.components.models import (
AssistantMessage,
LLMMessage,
UserMessage,
)
from agnext.core import CancellationToken

from team_one.messages import BroadcastMessage, RequestReplyMessage

# Convenience type
UserContent = Union[str, List[Union[str, Image]]]


class BaseAgent(TypeRoutedAgent):
    """An agent that handles the RequestReply and Broadcast messages.

    Subclasses implement ``_generate_reply`` to produce a response; this base
    class takes care of recording broadcast messages in the chat history and of
    publishing the generated reply back to the team.
    """

    def __init__(
        self,
        description: str,
    ) -> None:
        super().__init__(description)
        # Running transcript of the conversation, appended to by both handlers.
        self._chat_history: List[LLMMessage] = []

    @message_handler
    async def handle_broadcast(self, message: BroadcastMessage, cancellation_token: CancellationToken) -> None:
        """Handle an incoming broadcast message by appending it to the chat history."""
        assert isinstance(message.content, UserMessage)
        self._chat_history.append(message.content)

    @message_handler
    async def handle_request_reply(self, message: RequestReplyMessage, cancellation_token: CancellationToken) -> None:
        """Respond to a reply request.

        Delegates to ``_generate_reply``, records the reply in the chat history
        as an ``AssistantMessage`` (flattened to plain text), then broadcasts the
        original (possibly multimodal) response, propagating the halt flag.
        """
        request_halt, response = await self._generate_reply(cancellation_token)

        # Convert the response to an acceptable (plain-string) format for the
        # assistant side of the chat history; images are rendered as "<image>".
        assistant_message = AssistantMessage(content=self._flatten_user_content(response), source=self.metadata["name"])
        self._chat_history.append(assistant_message)

        user_message = UserMessage(content=response, source=self.metadata["name"])
        await self.publish_message(BroadcastMessage(content=user_message, request_halt=request_halt))

    def _flatten_user_content(self, content: UserContent) -> str:
        """Return a plain-string rendering of *content*.

        Strings pass through unchanged; a list is joined line-by-line with each
        string item right-stripped and each ``Image`` replaced by "<image>".

        Raises:
            AssertionError: if *content* (or a list item) is of an unexpected type.
        """
        if isinstance(content, str):
            return content
        # Use the builtin ``list`` here: isinstance() against typing.List is
        # deprecated (Python 3.9+) and the builtin is the correct runtime check.
        if isinstance(content, list):
            converted: List[str] = []
            for item in content:
                if isinstance(item, str):
                    converted.append(item.rstrip())
                elif isinstance(item, Image):
                    converted.append("<image>")
                else:
                    raise AssertionError("Unexpected response type.")
            return "\n".join(converted)
        raise AssertionError("Unexpected response type.")

    async def _generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
        """Returns (request_halt, response_message). Subclasses must override."""
        raise NotImplementedError()
59 changes: 13 additions & 46 deletions python/teams/team-one/src/team_one/agents/coder.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import asyncio
import re
from typing import List, Optional, Union
from typing import List, Optional, Tuple, Union

from agnext.components import TypeRoutedAgent, message_handler
from agnext.components.code_executor import CodeBlock, CodeExecutor, LocalCommandLineCodeExecutor
from agnext.components.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
from agnext.core import CancellationToken

from team_one.messages import BroadcastMessage, RequestReplyMessage
from .base_agent import BaseAgent, UserContent


class Coder(TypeRoutedAgent):
class Coder(BaseAgent):
"""An agent that uses tools to write, execute, and debug Python code."""

DEFAULT_DESCRIPTION = "A Python coder assistant."
Expand Down Expand Up @@ -50,44 +46,22 @@ def __init__(
super().__init__(description)
self._model_client = model_client
self._system_messages = system_messages
self._chat_history: List[LLMMessage] = []

@message_handler
async def handle_broadcast(self, message: BroadcastMessage, cancellation_token: CancellationToken) -> None:
"""Handle an incoming broadcast message."""
assert isinstance(message.content, UserMessage)
self._chat_history.append(message.content)

@message_handler
async def handle_request_reply(self, message: RequestReplyMessage, cancellation_token: CancellationToken) -> None:
async def _generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
"""Respond to a reply request."""

# Make an inference to the model.
response = await self._model_client.create(self._system_messages + self._chat_history)
assert isinstance(response.content, str)
self._chat_history.append(AssistantMessage(content=response.content, source=self.metadata["name"]))

await self.publish_message(
BroadcastMessage(
content=UserMessage(content=response.content, source=self.metadata["name"]),
request_halt=("TERMINATE" in response.content),
)
)
return "TERMINATE" in response.content, response.content


class Executor(TypeRoutedAgent):
class Executor(BaseAgent):
def __init__(self, description: str, executor: Optional[CodeExecutor] = None) -> None:
super().__init__(description)
self._executor = executor or LocalCommandLineCodeExecutor()
self._chat_history: List[LLMMessage] = []

@message_handler
async def handle_broadcast(self, message: BroadcastMessage, cancellation_token: CancellationToken) -> None:
"""Handle an incoming broadcast message."""
self._chat_history.append(message.content)

@message_handler
async def handle_request_reply(self, message: RequestReplyMessage, cancellation_token: CancellationToken) -> None:
async def _generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
"""Respond to a reply request."""

# Extract code block from the message.
Expand All @@ -100,21 +74,14 @@ async def handle_request_reply(self, message: RequestReplyMessage, cancellation_
)
cancellation_token.link_future(future)
result = await future

str_result = (
f"The script ran, then exited with Unix exit code: {result.exit_code}\nIts output was:\n{result.output}"
)
await self.publish_message(
BroadcastMessage(content=UserMessage(content=str_result, source=self.metadata["name"]))
return (
False,
f"The script ran, then exited with Unix exit code: {result.exit_code}\nIts output was:\n{result.output}",
)
else:
await self.publish_message(
BroadcastMessage(
content=UserMessage(
content="No code block detected. Please provide a markdown-encoded code block to execute for the original task.",
source=self.metadata["name"],
)
)
return (
False,
"No code block detected. Please provide a markdown-encoded code block to execute for the original task.",
)

def _extract_execution_request(self, markdown_text: str) -> Union[str, None]:
Expand Down
24 changes: 5 additions & 19 deletions python/teams/team-one/src/team_one/agents/user_proxy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import asyncio
from typing import Tuple

from agnext.components import TypeRoutedAgent, message_handler
from agnext.components.models import UserMessage
from agnext.core import CancellationToken

from team_one.messages import BroadcastMessage, RequestReplyMessage
from .base_agent import BaseAgent, UserContent


class UserProxy(TypeRoutedAgent):
class UserProxy(BaseAgent):
"""An agent that allows the user to play the role of an agent in the conversation."""

DEFAULT_DESCRIPTION = "A human user."
Expand All @@ -18,25 +17,12 @@ def __init__(
) -> None:
super().__init__(description)

@message_handler
async def handle_broadcast(self, message: BroadcastMessage, cancellation_token: CancellationToken) -> None:
"""Handle an incoming broadcast message."""
pass

@message_handler
async def handle_request_reply(self, message: RequestReplyMessage, cancellation_token: CancellationToken) -> None:
async def _generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
"""Respond to a reply request."""

# Make an inference to the model.
response = await self.ainput("User input ('exit' to quit): ")

response = response.strip()

await self.publish_message(
BroadcastMessage(
content=UserMessage(content=response, source=self.metadata["name"]), request_halt=(response == "exit")
)
)
return response == "exit", response

async def ainput(self, prompt: str) -> str:
return await asyncio.to_thread(input, f"{prompt} ")

0 comments on commit beada02

Please sign in to comment.