
ChatBedrockConverse sudden interruption and return: TypeError: Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'int'>. #223

Closed
DiogoPM9 opened this issue Oct 3, 2024 · 5 comments · Fixed by #275
DiogoPM9 commented Oct 3, 2024

Requirements:

langchain==0.3.1
langchain-aws==0.2.1
langchain-chroma==0.1.4
langchain-community==0.3.1
langchain-core==0.3.7
langchain-text-splitters==0.3.0
langgraph==0.2.28
langgraph-checkpoint==1.0.9
langsmith==0.1.129

I was developing a multi-agent workflow using LangChain and LangGraph.

One of my requirements was that the LLMs had to be provided by AWS Bedrock.

However, I noticed an issue with ChatBedrockConverse, where the invocation stops due to a failure when merging dictionaries.

While debugging, I replaced my LLM with one from OpenAI, which fixed the issue. However, since Bedrock is a hard requirement, this is not a solution; rather, it points to a problem in langchain-aws specifically.
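
For reference, the failure can be reproduced in isolation with merge_dicts, the langchain_core utility that ultimately raises the error (a minimal sketch; I am assuming the {"metrics": {"latencyMs": ...}} shape from the error message, and the latency values are arbitrary):

from langchain_core.utils._merge import merge_dicts

# Bedrock responses carry response_metadata like {"metrics": {"latencyMs": <int>}}.
left = {"metrics": {"latencyMs": 100}}
right = {"metrics": {"latencyMs": 250}}

# merge_dicts only knows how to merge str, dict, and list values; two
# differing ints under the same key fall through to the TypeError:
merge_dicts(left, right)
# TypeError: Additional kwargs key latencyMs already exists in left dict
# and value has unsupported type <class 'int'>.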

Code used:

from typing import Annotated
from typing_extensions import TypedDict

from dotenv import load_dotenv

from langchain_aws import ChatBedrockConverse
from langchain_openai import ChatOpenAI

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage
from langchain_core.messages.tool import ToolMessage
from langchain_core.tools import tool

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

load_dotenv()

@tool
def sales_records(
        start_date: str,
        end_date: str,
) -> dict:
    """
    Fetches the sales records

    Args:
        start_date: date to begin the search
        end_date: date to end the search
    Returns:
        a dictionary with the sales
    """
    return {"banana": 14, "apples": 20, "oranges": 2}


@tool
def context_retriever():
    """
    Retrieves the required context for the report.
    Returns:
        Context for the report.
    """
    return "The report must end with: And that is the way the news goes."


mistral = "mistral.mistral-large-2402-v1:0"
anthropic = "anthropic.claude-3-sonnet-20240229-v1:0"
# LLM = ChatBedrockConverse(
#         model=anthropic,
#         region_name=ZONE,
#         credentials_profile_name=PROFILE_NAME)

LLM = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key=API_KEY, 
)

class State(TypedDict):
    messages: Annotated[list, add_messages]
    feedback: str
    client_data: str
    context: str

members = ["ToolCaller", "ReportMaker", "FeedbackRequester", "FINISH"]

# Architect
architect_instructions = f"""
You are a supervisor responsible for the delivery of a Sales report. Your workers comprise {members}

The process to generate a report is as follows:

1. The ToolCaller will query the sales
2. The ToolCaller will retrieve the required context for the report
3. The Report Maker will generate a report
4. The FeedbackRequester asks the user for feedback
5. If there is feedback then call Report Maker again. 
6. The FINISH will end the task in case the user says that there is no feedback

Given the following user request, respond with the worker to act next.
If the worker is ToolCaller you must also add what it needs to do:
Example: ToolCaller, to retrieve data from athena.

For the other workers, only the name is to be generated.
Each worker will perform a task and respond with their results and status.
"""
architect_prompt = ChatPromptTemplate.from_messages([
    ("system", architect_instructions), 
    MessagesPlaceholder(variable_name="messages")])
architect_chain = architect_prompt | LLM

# Report Maker
report_instructions = """
You are responsible for generating a report on the data you receive. 
If there is feedback you must take it into account.
The data: {data}
The context: {context}
Output the following: This is a dummy report.
If there is feedback: {feedback}
Then add the following to the above: There is feedback
"""
report_prompt = ChatPromptTemplate.from_messages(
    [("system", report_instructions), MessagesPlaceholder(variable_name="messages")])
report_chain = report_prompt | LLM


tool_caller_instructions = """
You are an assistant that works together with other agents to produce a report. Your task is to use the tools
you have access to in order to provide the data to the other agents.

You must do everything you are tasked with before ending.
"""
tool_caller_prompt = ChatPromptTemplate.from_messages(
    [("system", tool_caller_instructions), MessagesPlaceholder(variable_name="messages")])
tool_caller_chain = tool_caller_prompt | LLM.bind_tools([sales_records, context_retriever])


# Nodes
def assistant(state: State) -> State:
    print("*************In the assitant node*************")
    result = architect_chain.invoke({"messages": state["messages"]})
    state["messages"].append(result)
    return state

def tool_caller(state: State) -> State:
    print("*************In the Tool Caller node*************")
    last_message = state["messages"][-1]
    output = tool_caller_chain.invoke({"messages": [
        HumanMessage(f"Given the following messages {last_message}, use the appropriate tool you have access to.")]})
    state["messages"].append(output)
    return state

def reporter(state: State) -> State:
    print("*************In the report node*************")
    feedback = state.get("feedback", "No feedback")
    report = report_chain.invoke({"messages": [HumanMessage("Generate a report given the data, context and feedback provided")],
                                  "context": state["context"],
                                  "data": state["client_data"],
                                  "feedback": feedback})
    state["messages"].append(report)
    return state
    
def human_feedback_node(state: State) -> State:
    print("*************In the human feedback Node*************")
    while True:
        choice = input("Would you like to add feedback? (Yes or No) ").strip()
        if choice in ["Yes", "No"]:
            break
        print("Invalid choice, please try again (Yes or No)")

    if choice == "Yes":
        while True:
            feedback = input("Please specify what feedback you would like to add: ").strip()
            if feedback:
                state["messages"].append(HumanMessage(content=feedback))
                state["feedback"] = feedback
                break
            print("Request cannot be empty. Please try again.")
    else:
        state["messages"].append(HumanMessage(content="No feedback to add. Finish."))
        state["feedback"] = False
    return state


def general_router(state: State):
    print("*************In the General router Node*************")
    last_message = state["messages"][-1]
    if "ToolCaller" in last_message.content:
        return "ToolCaller"
    elif last_message.content == "ReportMaker":
        return "ReportMaker"
    elif last_message.content == "FeedbackRequester":
        return "FeedbackRequester"
    elif last_message.content in ["There is no feedback.", "FINISH"]:
        return "__end__"

def tool_router(state: State) -> str:
    print("*************In the Tool router Node*************")
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "Tools"
    else:
        return "Architect"

def run_tool(state: State, tools: dict) -> State:
    print("*************running the tool*************")
    tool_calls = state["messages"][-1].tool_calls
    for tool_call in tool_calls:
        tool = tools[tool_call["name"]]
        result = tool.invoke(tool_call["args"])
        state["messages"].append(ToolMessage(content=str(result), 
                                             tool_call_id= tool_call["id"]))
        if tool.name == "sales_records":
            state["client_data"] = str(result)
        if tool.name == "context_retriever":
            state["context"] = result
    return state


workflow = StateGraph(State)
workflow.add_node("Architect", assistant)
workflow.add_node("ToolCaller", tool_caller)
workflow.add_node("ReportMaker", reporter)
workflow.add_node("FeedbackRequester", human_feedback_node)
workflow.add_node("Tools", lambda state: run_tool(state, tools={"sales_records":sales_records, "context_retriever": context_retriever}))


workflow.add_edge(START, "Architect")
workflow.add_conditional_edges(
    "Architect",
    general_router,
    {
        "__end__": END,
        "ToolCaller": "ToolCaller",
        "ReportMaker": "ReportMaker",
        "FeedbackRequester": "FeedbackRequester"
    }
)
workflow.add_conditional_edges(
    "ToolCaller",
    tool_router,
    {
        "Architect": "Architect",
        "Tools": "Tools",
    }
)
workflow.add_edge("Tools", "Architect")
workflow.add_edge("ReportMaker", "Architect")
workflow.add_edge("FeedbackRequester", "Architect")

graph = workflow.compile()

events = graph.invoke({"messages": HumanMessage(
    content="Give me a report for the following, start date:1998-01-10 00:00:00.000 and end_date:2005-09-13 00:00:00.000")})

The graph is as below:
[graph diagram image]

As you can see, the code executes smoothly with ChatOpenAI; however, switching to ChatBedrockConverse breaks the invocation.

When the graph is invoked, the architect successfully calls the Tools worker. However, when the architect receives the tool call output, it breaks. The output was the following:

*************In the assistant node*************
*************In the General router Node*************
*************In the Tool Caller node*************
*************In the Tool router Node*************
*************running the tool*************
*************In the assistant node*************
TypeError: Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'int'>.
langcarl bot added the investigate label Oct 3, 2024
tomaszbk commented Oct 4, 2024

I have this same issue.

3coins added the bedrock, langgraph, and bug (Something isn't working) labels and removed the mistral label Oct 9, 2024
3coins (Collaborator) commented Oct 12, 2024

@DiogoPM9
Thanks for reporting this bug. I can reproduce the issue in my testing. It seems to be caused by the latencyMs attribute present in Bedrock responses. While it's fine for this attribute to be present in the response, its type (int) is not supported in response_metadata when messages are merged; only dict, list, or str values can be merged. Updating the latencyMs value to a list should fix this issue.

File "/Users/pijain/projects/langchain-aws-dev/langchain-aws/libs/aws/langchain_aws/chat_models/bedrock_converse.py", line 682, in _messages_to_bedrock
    messages = merge_message_runs(messages)
....
File "/Users/pijain/Library/Caches/pypoetry/virtualenvs/langchain-aws-eH7P7gjZ-py3.10/lib/python3.10/site-packages/langchain_core/utils/_merge.py", line 59, in merge_dicts
    merged[right_k] = merge_dicts(merged[right_k], right_v)
  File "/Users/pijain/Library/Caches/pypoetry/virtualenvs/langchain-aws-eH7P7gjZ-py3.10/lib/python3.10/site-packages/langchain_core/utils/_merge.py", line 65, in merge_dicts
    raise TypeError(
TypeError: Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'int'>.
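
The same failure can be triggered with just two consecutive AI messages, since merge_message_runs combines their response_metadata via merge_dicts (a minimal sketch; the metadata shape mirrors the Bedrock metrics block and the values are arbitrary):

from langchain_core.messages import AIMessage, merge_message_runs

# Two consecutive AI messages both carrying Bedrock-style metrics,
# as happens when _messages_to_bedrock merges message runs:
left = AIMessage("a", response_metadata={"metrics": {"latencyMs": 100}})
right = AIMessage("b", response_metadata={"metrics": {"latencyMs": 250}})

merge_message_runs([left, right])
# TypeError: Additional kwargs key latencyMs already exists in left dict
# and value has unsupported type <class 'int'>.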

3coins (Collaborator) commented Oct 13, 2024

It seems we will need a change in this function to update latencyMs from int to list.

def _parse_response(response: Dict[str, Any]) -> AIMessage:
    anthropic_content = _bedrock_to_anthropic(
        response.pop("output")["message"]["content"]
    )
    tool_calls = _extract_tool_calls(anthropic_content)
    usage = UsageMetadata(_camel_to_snake_keys(response.pop("usage")))  # type: ignore[misc]
    return AIMessage(
        content=_str_if_single_text_block(anthropic_content),  # type: ignore[arg-type]
        usage_metadata=usage,
        response_metadata=response,
        tool_calls=tool_calls,
    )

Here is a simplified solution; however, we might need to handle other keys that could cause issues while merging messages as well.

response["metrics"]["latencyMs"] = [response["metrics"]["latencyMs"]]

DiogoPM9 (Author) commented

Thank you for having a look at this @3coins! Would you happen to know more or less when this would be fixed?

3coins added this to the v0.2.5 milestone Oct 28, 2024
wuodar commented Oct 31, 2024

I just faced a similar error with ChatBedrockConverse:

Additional kwargs key latencyMs already exists in left dict and value has unsupported type <class 'decimal.Decimal'>.

In this case it's not <class 'int'> but <class 'decimal.Decimal'>; in general, though, it's the same issue.
