diff --git a/.github/workflows/workflows/pr-agent.yaml b/.github/workflows/workflows/pr-agent.yaml new file mode 100644 index 00000000..79c6db90 --- /dev/null +++ b/.github/workflows/workflows/pr-agent.yaml @@ -0,0 +1,9 @@ +name: AI PR Agent + +on: + pull_request: + types: [opened, reopened, ready_for_review] + +jobs: + pr_agent_job: + uses: SolaceDev/ai-build-actions/.github/workflows/ai_pr.yaml@use_sonnet_3_5 \ No newline at end of file diff --git a/.pr_agent.toml b/.pr_agent.toml new file mode 100644 index 00000000..1b057f6e --- /dev/null +++ b/.pr_agent.toml @@ -0,0 +1,134 @@ +[config] +model="bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0" +model_turbo="bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0" +fallback_models="[bedrock/anthropic.claude-3-sonnet-20240229-v1:0]" +git_provider="github" +publish_output=true +publish_output_progress=false +verbosity_level=2 # 0,1,2 +use_extra_bad_extensions=false +use_wiki_settings_file=true +use_repo_settings_file=true +use_global_settings_file=true +ai_timeout=120 # 2minutes +max_description_tokens = 800 +max_commits_tokens = 500 +max_model_tokens = 64000 # Limits the maximum number of tokens that can be used by any model, regardless of the model's default capabilities. +patch_extra_lines = 200 +secret_provider="google_cloud_storage" +cli_mode=false +ai_disclaimer_title="" # Pro feature, title for a collapsible disclaimer to AI outputs +ai_disclaimer="" # Pro feature, full text for the AI disclaimer + +[pr_reviewer] # /review # +# enable/disable features +require_score_review=false +require_tests_review=true +require_estimate_effort_to_review=true +require_can_be_split_review=false +# soc2 +require_soc2_ticket=false +soc2_ticket_prompt="Does the PR description include a link to ticket in a project management system (e.g., Jira, Asana, Trello, etc.) ?" +# general options +num_code_suggestions=4 +inline_code_comments = true +ask_and_reflect=false +#automatic_review=true +persistent_comment=true +extra_instructions = "" +final_update_message = true +# review labels +enable_review_labels_security=true +enable_review_labels_effort=true +# specific configurations for incremental review (/review -i) +require_all_thresholds_for_incremental_review=false +minimal_commits_for_incremental_review=0 +minimal_minutes_for_incremental_review=0 +enable_help_text=true # Determines whether to include help text in the PR review. Enabled by default. +# auto approval +enable_auto_approval=false +maximal_review_effort=5 + +[pr_description] # /describe # +publish_labels=true +add_original_user_description=true +keep_original_user_title=true +generate_ai_title=false +use_bullet_points=true +extra_instructions = "" +enable_pr_type=true +final_update_message = true +enable_help_text=false +enable_help_comment=false +# describe as comment +publish_description_as_comment=false +publish_description_as_comment_persistent=true +## changes walkthrough section +enable_semantic_files_types=true +collapsible_file_list='adaptive' # true, false, 'adaptive' +inline_file_summary=false # false, true, 'table' +# markers +use_description_markers=false +include_generated_by_header=true + +[pr_code_suggestions] # /improve # +max_context_tokens=8000 +num_code_suggestions=4 +commitable_code_suggestions = false +extra_instructions = "" +rank_suggestions = false +enable_help_text=true +persistent_comment=false +# params for '/improve --extended' mode +auto_extended_mode=true +num_code_suggestions_per_chunk=5 +max_number_of_calls = 3 +parallel_calls = true +rank_extended_suggestions = false +final_clip_factor = 0.8 + +[pr_add_docs] # /add_docs # +extra_instructions = "" +docs_style = "Sphinx Style" # "Google Style with Args, Returns, Attributes...etc", "Numpy Style", "Sphinx Style", "PEP257", "reStructuredText" + +[pr_update_changelog] # /update_changelog # +push_changelog_changes=false +extra_instructions = "" + +[pr_analyze] # /analyze # + +[pr_test] # /test # +extra_instructions = "" +testing_framework = "" # specify the testing framework you want to use +num_tests=3 # number of tests to generate. max 5. +avoid_mocks=true # if true, the generated tests will prefer to use real objects instead of mocks +file = "" # in case there are several components with the same name, you can specify the relevant file +class_name = "" # in case there are several methods with the same name in the same file, you can specify the relevant class name +enable_help_text=true + +[pr_improve_component] # /improve_component # +num_code_suggestions=4 +extra_instructions = "" +file = "" # in case there are several components with the same name, you can specify the relevant file +class_name = "" + +[checks] # /checks (pro feature) # +enable_auto_checks_feedback=true +excluded_checks_list=["lint"] # list of checks to exclude, for example: ["check1", "check2"] +persistent_comment=true +enable_help_text=true + +[pr_help] # /help # + +[pr_config] # /config # + +[github] +# The type of deployment to create. Valid values are 'app' or 'user'. +deployment_type = "user" +ratelimit_retries = 5 +base_url = "https://api.github.com" +publish_inline_comments_fallback_with_verification = true +try_fix_invalid_inline_comments = true + +[litellm] +drop_params = true \ No newline at end of file diff --git a/docs/components/index.md b/docs/components/index.md index c55fcc9d..f599aa1b 100644 --- a/docs/components/index.md +++ b/docs/components/index.md @@ -17,6 +17,8 @@ | [langchain_vector_store_delete](langchain_vector_store_delete.md) | This component allows for entries in a LangChain Vector Store to be deleted. This is needed for the continued maintenance of the vector store. Due to the nature of langchain vector stores, you need to specify an embedding component even though it is not used in this component. | | [langchain_vector_store_embedding_index](langchain_vector_store_embedding_index.md) | Use LangChain Vector Stores to index text for later semantic searches. This will take text, run it through an embedding model and then store it in a vector database. | | [langchain_vector_store_embedding_search](langchain_vector_store_embedding_search.md) | Use LangChain Vector Stores to search a vector store with a semantic search. This will take text, run it through an embedding model with a query embedding and then find the closest matches in the store. | +| [litellm_chat_model](litellm_chat_model.md) | LiteLLM chat model component | +| [litellm_chat_model_with_history](litellm_chat_model_with_history.md) | LiteLLM model handler component with conversation history | | [message_filter](message_filter.md) | A filtering component. This will apply a user configurable expression. If the expression evaluates to True, the message will be passed on. If the expression evaluates to False, the message will be discarded. If the message is discarded, any previous components that require an acknowledgement will be acknowledged. | | [openai_chat_model](openai_chat_model.md) | OpenAI chat model component | | [openai_chat_model_with_history](openai_chat_model_with_history.md) | OpenAI chat model component with conversation history | @@ -30,3 +32,5 @@ | [websearch_bing](websearch_bing.md) | Perform a search query on Bing. | | [websearch_duckduckgo](websearch_duckduckgo.md) | Perform a search query on DuckDuckGo. | | [websearch_google](websearch_google.md) | Perform a search query on Google. | +| [websocket_input](websocket_input.md) | Listen for incoming messages on a websocket connection. | +| [websocket_output](websocket_output.md) | Send messages to a websocket connection. | diff --git a/docs/components/litellm_chat_model.md b/docs/components/litellm_chat_model.md new file mode 100644 index 00000000..5dd4cf6c --- /dev/null +++ b/docs/components/litellm_chat_model.md @@ -0,0 +1,84 @@ +# LiteLLMChatModel + +LiteLLM chat model component + +## Configuration Parameters + +```yaml +component_name: +component_module: litellm_chat_model +component_config: + action: + load_balancer: + embedding_params: + temperature: + stream_to_flow: + stream_to_next_component: + llm_mode: + stream_batch_size: + set_response_uuid_in_user_properties: + history_max_turns: + history_max_time: + history_max_turns: + history_max_time: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| action | True | inference | The action to perform (e.g., 'inference', 'embedding') | +| load_balancer | False | | Add a list of models to load balancer. | +| embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory.find more models at https://docs.litellm.ai/docs/providersfind more parameters at https://docs.litellm.ai/docs/completion/input | +| temperature | False | 0.7 | Sampling temperature to use | +| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | +| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | +| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | +| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | +| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | +| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | +| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | + + +## Component Input Schema + +``` +{ + messages: [ + { + role: , + content: + }, + ... + ], + clear_history_but_keep_depth: +} +``` +| Field | Required | Description | +| --- | --- | --- | +| messages | True | | +| messages[].role | True | | +| messages[].content | True | | +| clear_history_but_keep_depth | False | Clear history but keep the last N messages. If 0, clear all history. If not set, do not clear history. | + + +## Component Output Schema + +``` +{ + content: , + chunk: , + response_uuid: , + first_chunk: , + last_chunk: , + streaming: +} +``` +| Field | Required | Description | +| --- | --- | --- | +| content | True | The generated response from the model | +| chunk | False | The current chunk of the response | +| response_uuid | False | The UUID of the response | +| first_chunk | False | Whether this is the first chunk of the response | +| last_chunk | False | Whether this is the last chunk of the response | +| streaming | False | Whether this is a streaming response | diff --git a/docs/components/litellm_chat_model_with_history.md b/docs/components/litellm_chat_model_with_history.md new file mode 100644 index 00000000..8c5ea582 --- /dev/null +++ b/docs/components/litellm_chat_model_with_history.md @@ -0,0 +1,84 @@ +# LiteLLMChatModelWithHistory + +LiteLLM model handler component with conversation history + +## Configuration Parameters + +```yaml +component_name: +component_module: litellm_chat_model_with_history +component_config: + action: + load_balancer: + embedding_params: + temperature: + stream_to_flow: + stream_to_next_component: + llm_mode: + stream_batch_size: + set_response_uuid_in_user_properties: + history_max_turns: + history_max_time: + history_max_turns: + history_max_time: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| action | True | inference | The action to perform (e.g., 'inference', 'embedding') | +| load_balancer | False | | Add a list of models to load balancer. | +| embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory.find more models at https://docs.litellm.ai/docs/providersfind more parameters at https://docs.litellm.ai/docs/completion/input | +| temperature | False | 0.7 | Sampling temperature to use | +| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | +| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | +| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | +| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | +| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | +| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | +| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | + + +## Component Input Schema + +``` +{ + messages: [ + { + role: , + content: + }, + ... + ], + clear_history_but_keep_depth: +} +``` +| Field | Required | Description | +| --- | --- | --- | +| messages | True | | +| messages[].role | True | | +| messages[].content | True | | +| clear_history_but_keep_depth | False | Clear history but keep the last N messages. If 0, clear all history. If not set, do not clear history. | + + +## Component Output Schema + +``` +{ + content: , + chunk: , + response_uuid: , + first_chunk: , + last_chunk: , + streaming: +} +``` +| Field | Required | Description | +| --- | --- | --- | +| content | True | The generated response from the model | +| chunk | False | The current chunk of the response | +| response_uuid | False | The UUID of the response | +| first_chunk | False | Whether this is the first chunk of the response | +| last_chunk | False | Whether this is the last chunk of the response | +| streaming | False | Whether this is a streaming response | diff --git a/docs/components/openai_chat_model.md b/docs/components/openai_chat_model.md index b9cc6125..e41c6692 100644 --- a/docs/components/openai_chat_model.md +++ b/docs/components/openai_chat_model.md @@ -56,9 +56,19 @@ component_config: ``` { - content: + content: , + chunk: , + response_uuid: , + first_chunk: , + last_chunk: , + streaming: } ``` | Field | Required | Description | | --- | --- | --- | | content | True | The generated response from the model | +| chunk | False | The current chunk of the response | +| response_uuid | False | The UUID of the response | +| first_chunk | False | Whether this is the first chunk of the response | +| last_chunk | False | Whether this is the last chunk of the response | +| streaming | False | Whether this is a streaming response | diff --git a/docs/components/openai_chat_model_with_history.md b/docs/components/openai_chat_model_with_history.md index c72f818e..9c7c4dc3 100644 --- a/docs/components/openai_chat_model_with_history.md +++ b/docs/components/openai_chat_model_with_history.md @@ -62,9 +62,19 @@ component_config: ``` { - content: + content: , + chunk: , + response_uuid: , + first_chunk: , + last_chunk: , + streaming: } ``` | Field | Required | Description | | --- | --- | --- | | content | True | The generated response from the model | +| chunk | False | The current chunk of the response | +| response_uuid | False | The UUID of the response | +| first_chunk | False | Whether this is the first chunk of the response | +| last_chunk | False | Whether this is the last chunk of the response | +| streaming | False | Whether this is a streaming response | diff --git a/docs/components/websocket_input.md b/docs/components/websocket_input.md new file mode 100644 index 00000000..44bb2a1b --- /dev/null +++ b/docs/components/websocket_input.md @@ -0,0 +1,39 @@ +# WebsocketInput + +Listen for incoming messages on a websocket connection. + +## Configuration Parameters + +```yaml +component_name: +component_module: websocket_input +component_config: + listen_port: + serve_html: + html_path: + payload_encoding: + payload_format: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| listen_port | False | | Port to listen on (optional) | +| serve_html | False | False | Serve the example HTML file | +| html_path | False | examples/websocket/websocket_example_app.html | Path to the HTML file to serve | +| payload_encoding | False | none | Encoding for the payload (utf-8, base64, gzip, none) | +| payload_format | False | json | Format for the payload (json, yaml, text) | + + + +## Component Output Schema + +``` +{ + payload: { + + } +} +``` +| Field | Required | Description | +| --- | --- | --- | +| payload | True | The decoded JSON payload received from the WebSocket | diff --git a/docs/components/websocket_output.md b/docs/components/websocket_output.md new file mode 100644 index 00000000..d631dd96 --- /dev/null +++ b/docs/components/websocket_output.md @@ -0,0 +1,40 @@ +# WebsocketOutput + +Send messages to a websocket connection. + +## Configuration Parameters + +```yaml +component_name: +component_module: websocket_output +component_config: + listen_port: + serve_html: + html_path: + payload_encoding: + payload_format: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| listen_port | False | | Port to listen on (optional) | +| serve_html | False | False | Serve the example HTML file | +| html_path | False | examples/websocket/websocket_example_app.html | Path to the HTML file to serve | +| payload_encoding | False | none | Encoding for the payload (utf-8, base64, gzip, none) | +| payload_format | False | json | Format for the payload (json, yaml, text) | + + +## Component Input Schema + +``` +{ + payload: { + + }, + socket_id: +} +``` +| Field | Required | Description | +| --- | --- | --- | +| payload | True | The payload to be sent via WebSocket | +| socket_id | False | Identifier for the WebSocket connection | diff --git a/examples/llm/litellm_chat.yaml b/examples/llm/litellm_chat.yaml new file mode 100644 index 00000000..83ba283a --- /dev/null +++ b/examples/llm/litellm_chat.yaml @@ -0,0 +1,117 @@ +# This process will create a flow where the LiteLLM agent distributes requests across multiple LLM models. +# Solace -> LiteLLM -> Solace +# +# It will subscribe to `demo/question` and expect an event with the payload: +# +# The input message has the following schema: +# { +# "text": "" +# } +# +# Output is published to the topic `demo/question/response` +# +# It will then send an event back to Solace with the topic: `demo/question/response` +# +# Dependencies: +# pip install litellm +# +# required ENV variables: +# - OPENAI_API_KEY +# - OPENAI_API_ENDPOINT +# - OPENAI_MODEL_NAME +# - ANTHROPIC_MODEL_NAME +# - ANTHROPIC_API_KEY +# - ANTHROPIC_API_ENDPOINT +# - SOLACE_BROKER_URL +# - SOLACE_BROKER_USERNAME +# - SOLACE_BROKER_PASSWORD +# - SOLACE_BROKER_VPN +# +# Supported models: OpenAI, Anthropic, Azure, Huggingface, Ollama, Google VertexAI +# More models are available in https://docs.litellm.ai/docs/providers +# Note: For most models, the model provider’s name should be used as a prefix for the model name (e.g. azure/chatgpt-v-2) + +--- +log: + stdout_log_level: INFO + log_file_level: DEBUG + log_file: solace_ai_connector.log + +shared_config: + - broker_config: &broker_connection + broker_type: solace + broker_url: ${SOLACE_BROKER_URL} + broker_username: ${SOLACE_BROKER_USERNAME} + broker_password: ${SOLACE_BROKER_PASSWORD} + broker_vpn: ${SOLACE_BROKER_VPN} + +# Take from input broker and publish back to Solace +flows: + # broker input processing + - name: Simple template to LLM + components: + # Input from a Solace broker + - component_name: solace_sw_broker + component_module: broker_input + component_config: + <<: *broker_connection + broker_queue_name: demo_question + broker_subscriptions: + - topic: demo/question + qos: 1 + payload_encoding: utf-8 + payload_format: json + + # + # Do an LLM request + # + - component_name: llm_request + component_module: litellm_chat_model + component_config: + llm_mode: none # options: none or stream + load_balancer: + - model_name: "gpt-4o" # model alias + litellm_params: + model: ${OPENAI_MODEL_NAME} + api_key: ${OPENAI_API_KEY} + api_base: ${OPENAI_API_ENDPOINT} + temperature: 0.01 + # add any other parameters here + - model_name: "claude-3-5-sonnet" # model alias + litellm_params: + model: ${ANTHROPIC_MODEL_NAME} + api_key: ${ANTHROPIC_API_KEY} + api_base: ${ANTHROPIC_API_ENDPOINT} + # add any other parameters here + # add more models here + input_transforms: + - type: copy + source_expression: | + template:You are a helpful AI assistant. Please help with the user's request below: + + {{text://input.payload:text}} + + dest_expression: user_data.llm_input:messages.0.content + - type: copy + source_expression: static:user + dest_expression: user_data.llm_input:messages.0.role + input_selection: + source_expression: user_data.llm_input + + # Send response back to broker + - component_name: send_response + component_module: broker_output + component_config: + <<: *broker_connection + payload_encoding: utf-8 + payload_format: json + copy_user_properties: true + input_transforms: + - type: copy + source_expression: previous + dest_expression: user_data.output:payload + - type: copy + source_expression: template:{{text://input.topic}}/response + dest_expression: user_data.output:topic + input_selection: + source_expression: user_data.output diff --git a/examples/llm/litellm_chat_with_history.yaml b/examples/llm/litellm_chat_with_history.yaml new file mode 100644 index 00000000..3fcb96a8 --- /dev/null +++ b/examples/llm/litellm_chat_with_history.yaml @@ -0,0 +1,121 @@ +# This process will establish the following flow where the LiteLLM agent retains the history of questions and answers. +# Solace -> LiteLLM -> Solace +# +# It will subscribe to `demo/question` and expect an event with the payload: +# +# The input message has the following schema: +# { +# "text": "" +# "session_id": "" +# } +# +# Output is published to the topic `demo/question/response` +# +# It will then send an event back to Solace with the topic: `demo/question/response` +# +# Dependencies: +# pip install litellm +# +# Required ENV variables: +# - OPENAI_API_KEY +# - OPENAI_API_ENDPOINT +# - OPENAI_MODEL_NAME +# - ANTHROPIC_MODEL_NAME +# - ANTHROPIC_API_KEY +# - ANTHROPIC_API_ENDPOINT +# - SOLACE_BROKER_URL +# - SOLACE_BROKER_USERNAME +# - SOLACE_BROKER_PASSWORD +# - SOLACE_BROKER_VPN +# +# Supported models: OpenAI, Anthropic, Azure, Huggingface, Ollama, Google VertexAI +# More models are available in https://docs.litellm.ai/docs/providers +# Note: For most models, the model provider’s name should be used as a prefix for the model name (e.g. azure/chatgpt-v-2) + + +--- +log: + stdout_log_level: INFO + log_file_level: DEBUG + log_file: solace_ai_connector.log + +shared_config: + - broker_config: &broker_connection + broker_type: solace + broker_url: ${SOLACE_BROKER_URL} + broker_username: ${SOLACE_BROKER_USERNAME} + broker_password: ${SOLACE_BROKER_PASSWORD} + broker_vpn: ${SOLACE_BROKER_VPN} + +# Take from input broker and publish back to Solace +flows: + # broker input processing + - name: Simple template to LLM + components: + # Input from a Solace broker + - component_name: solace_sw_broker + component_module: broker_input + component_config: + <<: *broker_connection + broker_queue_name: demo_question67 + broker_subscriptions: + - topic: demo/question + qos: 1 + payload_encoding: utf-8 + payload_format: json + + # + # Do an LLM request + # + - component_name: llm_request + component_module: litellm_chat_model_with_history + component_config: + load_balancer: + - model_name: "gpt-4o" # model alias + litellm_params: + model: ${OPENAI_MODEL_NAME} + api_key: ${OPENAI_API_KEY} + api_base: ${OPENAI_API_ENDPOINT} + temperature: 0.01 + # add any other parameters here + - model_name: "claude-3-5-sonnet" # model alias + litellm_params: + model: ${ANTHROPIC_MODEL_NAME} + api_key: ${ANTHROPIC_API_KEY} + api_base: ${ANTHROPIC_API_ENDPOINT} + # add any other parameters here + # add more models here + input_transforms: + - type: copy + source_expression: | + template:You are a helpful AI assistant. Please help with the user's request below: + + {{text://input.payload:text}} + + dest_expression: user_data.llm_input:messages.0.content + - type: copy + source_expression: static:user + dest_expression: user_data.llm_input:messages.0.role + - type: copy + source_expression: input.payload:session_id + dest_expression: user_data.llm_input:session_id + input_selection: + source_expression: user_data.llm_input + + # Send response back to broker + - component_name: send_response + component_module: broker_output + component_config: + <<: *broker_connection + payload_encoding: utf-8 + payload_format: json + copy_user_properties: true + input_transforms: + - type: copy + source_expression: previous + dest_expression: user_data.output:payload + - type: copy + source_expression: template:{{text://input.topic}}/response + dest_expression: user_data.output:topic + input_selection: + source_expression: user_data.output diff --git a/examples/llm/litellm_embedding.yaml b/examples/llm/litellm_embedding.yaml new file mode 100644 index 00000000..37848593 --- /dev/null +++ b/examples/llm/litellm_embedding.yaml @@ -0,0 +1,106 @@ +# This below flow embeds the input text and sends it back to the Solace broker. +# Solace -> LiteLLM -> Solace +# +# It will subscribe to `demo/question` and expect an event with the payload: +# +# The input message has the following schema: +# { +# "items": ["item1", "item2", ...] +# } +# +# Output is published to the topic `demo/question/response` +# +# It will then send an event back to Solace with the topic: `demo/question/response` +# +# Dependencies: +# pip install litellm +# +# required ENV variables: +# - OPENAI_EMBEDDING_MODEL_NAME +# - OPENAI_API_KEY +# - OPENAI_API_ENDPOINT +# - AZURE_EMBEDDING_MODEL_NAME +# - AZURE_API_KEY +# - AZURE_API_ENDPOINT +# - SOLACE_BROKER_URL +# - SOLACE_BROKER_USERNAME +# - SOLACE_BROKER_PASSWORD +# - SOLACE_BROKER_VPN +# +# Supported models: OpenAI, Azure and Huggingface +# More models are available in https://docs.litellm.ai/docs/providers + +--- +log: + stdout_log_level: INFO + log_file_level: DEBUG + log_file: solace_ai_connector.log + +shared_config: + - broker_config: &broker_connection + broker_type: solace + broker_url: ${SOLACE_BROKER_URL} + broker_username: ${SOLACE_BROKER_USERNAME} + broker_password: ${SOLACE_BROKER_PASSWORD} + broker_vpn: ${SOLACE_BROKER_VPN} + +# Take from input broker and publish back to Solace +flows: + # broker input processing + - name: Simple template to LLM + components: + # Input from a Solace broker + - component_name: solace_sw_broker + component_module: broker_input + component_config: + <<: *broker_connection + broker_queue_name: demo_question434 + broker_subscriptions: + - topic: demo/question + qos: 1 + payload_encoding: utf-8 + payload_format: json + + # + # Do an LLM request + # + - component_name: llm_request + component_module: litellm_embeddings + component_config: + load_balancer: + - model_name: "text-embedding-ada-002" # model alias + litellm_params: + model: ${OPENAI_EMBEDDING_MODEL_NAME} + api_key: ${OPENAI_API_KEY} + api_base: ${OPENAI_API_ENDPOINT} + # add any other parameters here + - model_name: "text-embedding-3-large" # model alias + itellm_params: + model: ${AZURE_EMBEDDING_MODEL_NAME} + api_key: ${AZURE_API_KEY} + api_base: ${AZURE_API_ENDPOINT} + # add any other parameters here + input_transforms: + - type: copy + source_expression: input.payload + dest_expression: user_data.llm_input:items + input_selection: + source_expression: user_data.llm_input:items + + # Send response back to broker + - component_name: send_response + component_module: broker_output + component_config: + <<: *broker_connection + payload_encoding: utf-8 + payload_format: json + copy_user_properties: true + input_transforms: + - type: copy + source_expression: previous + dest_expression: user_data.output:payload + - type: copy + source_expression: template:{{text://input.topic}}/response + dest_expression: user_data.output:topic + input_selection: + source_expression: user_data.output diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index 9050bdc4..aaf9ee4f 100755 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -125,11 +125,11 @@ def import_module(module, base_path=None, component_package=None): ".components", ".components.general", ".components.general.for_testing", - ".components.general.langchain", - ".components.general.openai", + ".components.general.llm.langchain", + ".components.general.llm.openai", + ".components.general.llm.litellm", ".components.general.websearch", ".components.inputs_outputs", - ".components.general.filter", ".transforms", ".common", ]: diff --git a/src/solace_ai_connector/components/__init__.py b/src/solace_ai_connector/components/__init__.py index e9e54b94..d20da981 100755 --- a/src/solace_ai_connector/components/__init__.py +++ b/src/solace_ai_connector/components/__init__.py @@ -25,7 +25,7 @@ give_ack_output, ) -from .general.langchain import ( +from .general.llm.langchain import ( langchain_embeddings, langchain_vector_store_delete, langchain_chat_model, @@ -34,6 +34,12 @@ langchain_vector_store_embedding_search, ) +from .general.llm.litellm import ( + litellm_chat_model, + litellm_embeddings, + litellm_chat_model_with_history, +) + from .general.websearch import ( websearch_duckduckgo, websearch_google, @@ -57,20 +63,19 @@ from .general.iterate import Iterate from .general.message_filter import MessageFilter from .general.parser import Parser -from .general.langchain.langchain_base import LangChainBase -from .general.langchain.langchain_embeddings import LangChainEmbeddings -from .general.langchain.langchain_vector_store_delete import LangChainVectorStoreDelete -from .general.langchain.langchain_chat_model import LangChainChatModel -from .general.langchain.langchain_chat_model_with_history import ( +from .general.llm.langchain.langchain_base import LangChainBase +from .general.llm.langchain.langchain_embeddings import LangChainEmbeddings +from .general.llm.langchain.langchain_vector_store_delete import LangChainVectorStoreDelete +from .general.llm.langchain.langchain_chat_model import LangChainChatModel +from .general.llm.langchain.langchain_chat_model_with_history import ( LangChainChatModelWithHistory, ) -from .general.langchain.langchain_vector_store_embedding_index import ( +from .general.llm.langchain.langchain_vector_store_embedding_index import ( LangChainVectorStoreEmbeddingsIndex, ) -from .general.langchain.langchain_vector_store_embedding_search import ( +from .general.llm.langchain.langchain_vector_store_embedding_search import ( LangChainVectorStoreEmbeddingsSearch, ) from .general.websearch.websearch_duckduckgo import WebSearchDuckDuckGo from .general.websearch.websearch_google import WebSearchGoogle -from .general.websearch.websearch_bing import WebSearchBing - +from .general.websearch.websearch_bing import WebSearchBing \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/llm/common/chat_history_handler.py b/src/solace_ai_connector/components/general/llm/common/chat_history_handler.py new file mode 100644 index 00000000..c811f6eb --- /dev/null +++ b/src/solace_ai_connector/components/general/llm/common/chat_history_handler.py @@ -0,0 +1,86 @@ +"""Generic chat history handler.""" + +import time +from ....component_base import ComponentBase +from .....common.log import log + +class ChatHistoryHandler(ComponentBase): + def __init__(self, info, **kwargs): + super().__init__(info, **kwargs) + self.history_max_turns = self.get_config("history_max_turns", 10) + self.history_max_time = self.get_config("history_max_time", 3600) + self.history_key = f"{self.flow_name}_{self.name}_history" + + # Set up hourly timer for history cleanup + self.add_timer(3600000, "history_cleanup", interval_ms=3600000) + + def prune_history(self, session_id, history): + current_time = time.time() + if current_time - history[session_id]["last_accessed"] > self.history_max_time: + history[session_id]["messages"] = [] + elif len(history[session_id]["messages"]) > self.history_max_turns * 2: + history[session_id]["messages"] = history[session_id]["messages"][ + -self.history_max_turns * 2 : + ] + log.debug(f"Pruned history for session {session_id}") + self.make_history_start_with_user_message(session_id, history) + + def clear_history_but_keep_depth(self, session_id: str, depth: int, history): + if session_id in history: + messages = history[session_id]["messages"] + # If the depth is 0, then clear all history + if depth == 0: + history[session_id]["messages"] = [] + history[session_id]["last_accessed"] = time.time() + return + + # Check if the history is already shorter than the depth + if len(messages) <= depth: + # Do nothing, since the history is already shorter than the depth + return + + # If the message at depth is not a user message, then + # increment the depth until a user message is found + while depth < len(messages) and messages[-depth]["role"] != "user": + depth += 1 + history[session_id]["messages"] = messages[-depth:] + history[session_id]["last_accessed"] = time.time() + + # In the unlikely case that the history starts with a non-user message, + # remove it + self.make_history_start_with_user_message(session_id, history) + log.info(f"Cleared history for session {session_id}") + + def make_history_start_with_user_message(self, session_id, history): + if session_id in history: + messages = history[session_id]["messages"] + if messages: + if messages[0]["role"] == "system": + # Start from the second message if the first is "system" + start_index = 1 + else: + # Start from the first message otherwise + start_index = 0 + + while ( + start_index < len(messages) + and messages[start_index]["role"] != "user" + ): + messages.pop(start_index) + + def handle_timer_event(self, timer_data): + if timer_data["timer_id"] == "history_cleanup": + self.history_age_out() + + def history_age_out(self): + with self.get_lock(self.history_key): + history = self.kv_store_get(self.history_key) or {} + current_time = time.time() + for session_id in list(history.keys()): + if ( + current_time - history[session_id]["last_accessed"] + > self.history_max_time + ): + del history[session_id] + log.info(f"Removed history for session {session_id}") + self.kv_store_set(self.history_key, history) diff --git a/src/solace_ai_connector/components/general/langchain/__init__.py b/src/solace_ai_connector/components/general/llm/langchain/__init__.py similarity index 100% rename from src/solace_ai_connector/components/general/langchain/__init__.py rename to src/solace_ai_connector/components/general/llm/langchain/__init__.py diff --git a/src/solace_ai_connector/components/general/langchain/langchain_base.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_base.py similarity index 94% rename from src/solace_ai_connector/components/general/langchain/langchain_base.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_base.py index 4ffda1bb..6bf94f46 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_base.py +++ b/src/solace_ai_connector/components/general/llm/langchain/langchain_base.py @@ -2,8 +2,8 @@ import importlib -from ...component_base import ComponentBase -from ....common.utils import resolve_config_values +from ....component_base import ComponentBase +from .....common.utils import resolve_config_values class LangChainBase(ComponentBase): diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model.py similarity index 100% rename from src/solace_ai_connector/components/general/langchain/langchain_chat_model.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model.py diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_base.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model_base.py similarity index 99% rename from src/solace_ai_connector/components/general/langchain/langchain_chat_model_base.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model_base.py index bd3b464d..58c7ae5d 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_base.py +++ b/src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model_base.py @@ -5,7 +5,7 @@ from abc import abstractmethod from langchain_core.output_parsers import JsonOutputParser -from ....common.utils import get_obj_text +from .....common.utils import get_obj_text from langchain.schema.messages import ( HumanMessage, SystemMessage, diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model_with_history.py similarity index 99% rename from src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model_with_history.py index 9885c683..4569c30a 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/llm/langchain/langchain_chat_model_with_history.py @@ -16,7 +16,7 @@ SystemMessage, ) -from ....common.message import Message +from .....common.message import Message from .langchain_chat_model_base import ( LangChainChatModelBase, info_base, diff --git a/src/solace_ai_connector/components/general/langchain/langchain_embeddings.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_embeddings.py similarity index 100% rename from src/solace_ai_connector/components/general/langchain/langchain_embeddings.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_embeddings.py diff --git a/src/solace_ai_connector/components/general/langchain/langchain_vector_store_delete.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_delete.py similarity index 99% rename from src/solace_ai_connector/components/general/langchain/langchain_vector_store_delete.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_delete.py index 667d8828..7d847129 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_vector_store_delete.py +++ b/src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_delete.py @@ -3,7 +3,7 @@ # as well, so the configuration for this component will also include the # embedding model configuration -from ....common.log import log +from .....common.log import log from .langchain_vector_store_embedding_base import ( LangChainVectorStoreEmbeddingsBase, ) diff --git a/src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_base.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_base.py similarity index 100% rename from src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_base.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_base.py diff --git a/src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_index.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_index.py similarity index 100% rename from src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_index.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_index.py diff --git a/src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_search.py b/src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_search.py similarity index 99% rename from src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_search.py rename to src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_search.py index 1b974eb0..5295abdf 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_vector_store_embedding_search.py +++ b/src/solace_ai_connector/components/general/llm/langchain/langchain_vector_store_embedding_search.py @@ -4,7 +4,7 @@ embedding model configuration """ -from ....common.log import log +from .....common.log import log from .langchain_vector_store_embedding_base import ( LangChainVectorStoreEmbeddingsBase, ) diff --git a/src/solace_ai_connector/components/general/openai/__init__.py b/src/solace_ai_connector/components/general/llm/litellm/__init__.py similarity index 100% rename from src/solace_ai_connector/components/general/openai/__init__.py rename to src/solace_ai_connector/components/general/llm/litellm/__init__.py diff --git a/src/solace_ai_connector/components/general/llm/litellm/litellm_base.py b/src/solace_ai_connector/components/general/llm/litellm/litellm_base.py new file mode 100644 index 00000000..bcd5ddf1 --- /dev/null +++ b/src/solace_ai_connector/components/general/llm/litellm/litellm_base.py @@ -0,0 +1,128 @@ +"""Base class for LiteLLM chat models""" + +import uuid +import time +import asyncio +import litellm + +from ....component_base import ComponentBase +from .....common.message import Message +from .....common.log import log + +litellm_info_base = { + "class_name": "LiteLLMChatModelBase", + "description": "Base class for LiteLLM chat models", + "config_parameters": [ + { + "name": "load_balancer", + "required": False, + "description": ( + "Add a list of models to load balancer." + ), + "default": "", + }, + { + "name": "embedding_params", + "required": False, + "description": ( + "LiteLLM model parameters. The model, api_key and base_url are mandatory." + "find more models at https://docs.litellm.ai/docs/providers" + "find more parameters at https://docs.litellm.ai/docs/completion/input" + ), + "default": "", + }, + { + "name": "temperature", + "required": False, + "description": "Sampling temperature to use", + "default": 0.7, + }, + { + "name": "stream_to_flow", + "required": False, + "description": ( + "Name the flow to stream the output to - this must be configured for " + "llm_mode='stream'. This is mutually exclusive with stream_to_next_component." + ), + "default": "", + }, + { + "name": "stream_to_next_component", + "required": False, + "description": ( + "Whether to stream the output to the next component in the flow. " + "This is mutually exclusive with stream_to_flow." + ), + "default": False, + }, + { + "name": "llm_mode", + "required": False, + "description": ( + "The mode for streaming results: 'sync' or 'stream'. 'stream' " + "will just stream the results to the named flow. 'none' will " + "wait for the full response." + ), + "default": "none", + }, + { + "name": "stream_batch_size", + "required": False, + "description": "The minimum number of words in a single streaming result. Default: 15.", + "default": 15, + }, + { + "name": "set_response_uuid_in_user_properties", + "required": False, + "description": ( + "Whether to set the response_uuid in the user_properties of the " + "input_message. This will allow other components to correlate " + "streaming chunks with the full response." + ), + "default": False, + "type": "boolean", + }, + ], +} + + +class LiteLLMBase(ComponentBase): + def __init__(self, module_info, **kwargs): + super().__init__(module_info, **kwargs) + self.init() + self.init_load_balancer() + + def init(self): + litellm.suppress_debug_info = True + self.load_balancer = self.get_config("load_balancer") + self.stream_to_flow = self.get_config("stream_to_flow") + self.stream_to_next_component = self.get_config("stream_to_next_component") + self.llm_mode = self.get_config("llm_mode") + self.stream_batch_size = self.get_config("stream_batch_size") + self.set_response_uuid_in_user_properties = self.get_config( + "set_response_uuid_in_user_properties" + ) + if self.stream_to_flow and self.stream_to_next_component: + raise ValueError( + "stream_to_flow and stream_to_next_component are mutually exclusive" + ) + self.router = None + + def init_load_balancer(self): + """initialize a load balancer""" + try: + self.router = litellm.Router(model_list=self.load_balancer) + log.debug("Load balancer initialized with models: %s", self.load_balancer) + except Exception as e: + raise ValueError(f"Error initializing load balancer: {e}") + + def load_balance(self, messages, stream): + """load balance the messages""" + response = self.router.completion(model=self.load_balancer[0]["model_name"], + messages=messages, stream=stream) + log.debug("Load balancer response: %s", response) + return response + + def invoke(self, message, data): + """invoke the model""" + pass \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model.py b/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model.py new file mode 100644 index 00000000..5ae9c128 --- /dev/null +++ b/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model.py @@ -0,0 +1,11 @@ +"""LiteLLM chat model component""" + +from .litellm_chat_model_base import LiteLLMChatModelBase, litellm_chat_info_base + +info = litellm_chat_info_base.copy() +info["class_name"] = "LiteLLMChatModel" +info["description"] = "LiteLLM chat component" + +class LiteLLMChatModel(LiteLLMChatModelBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model_base.py b/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model_base.py new file mode 100644 index 00000000..9aea43cb --- /dev/null +++ b/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model_base.py @@ -0,0 +1,219 @@ +"""LiteLLM chat model component""" + +from .litellm_base import LiteLLMBase, litellm_info_base +from .....common.log import log + +litellm_chat_info_base = litellm_info_base.copy() +litellm_chat_info_base.update( + { + "class_name": "LiteLLMChatModelBase", + "description": "LiteLLM chat model base component", + "input_schema": { + "type": "object", + "properties": { + "messages": { + "type": "array", + "items": { + "type": "object", + "properties": { + "role": { + "type": "string", + "enum": ["system", "user", "assistant"], + }, + "content": {"type": "string"}, + }, + "required": ["role", "content"], + }, + }, + }, + "required": ["messages"], + }, + "output_schema": { + "type": "object", + "properties": { + "content": { + "type": "string", + "description": "The generated response from the model", + }, + "chunk": { + "type": "string", + "description": "The current chunk of the response", + }, + "response_uuid": { + "type": "string", + "description": "The UUID of the response", + }, + "first_chunk": { + "type": "boolean", + "description": "Whether this is the first chunk of the response", + }, + "last_chunk": { + "type": "boolean", + "description": "Whether this is the last chunk of the response", + }, + "streaming": { + "type": "boolean", + "description": "Whether this is a streaming response", + }, + }, + "required": ["content"], + }, + }, +) + +class LiteLLMChatModelBase(LiteLLMBase): + def __init__(self, info, **kwargs): + super().__init__(info, **kwargs) + + def invoke(self, message, data): + """invoke the model""" + messages = data.get("messages", []) + + if self.llm_mode == "stream": + return self.invoke_stream(message, messages) + else: + return self.invoke_non_stream(messages) + + def invoke_non_stream(self, messages): + """invoke the model without streaming""" + max_retries = 3 + while max_retries > 0: + try: + response = self.load_balance(messages, stream=False) + return {"content": response.choices[0].message.content} + except Exception as e: + log.error("Error invoking LiteLLM: %s", e) + max_retries -= 1 + if max_retries <= 0: + raise e + else: + time.sleep(1) + + def invoke_stream(self, message, messages): + """invoke the model with streaming""" + response_uuid = str(uuid.uuid4()) + if self.set_response_uuid_in_user_properties: + message.set_data("input.user_properties:response_uuid", response_uuid) + + aggregate_result = "" + current_batch = "" + first_chunk = True + + max_retries = 3 + while max_retries > 0: + try: + response = self.load_balance(messages, stream=True) + + for chunk in response: + # If we get any response, then don't retry + max_retries = 0 + if chunk.choices[0].delta.content is not None: + content = chunk.choices[0].delta.content + aggregate_result += content + current_batch += content + if len(current_batch.split()) >= self.stream_batch_size: + if self.stream_to_flow: + self.send_streaming_message( + message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + False, + ) + elif self.stream_to_next_component: + self.send_to_next_component( + message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + False, + ) + current_batch = "" + first_chunk = False + except Exception as e: + log.error("Error invoking LiteLLM: %s", e) + max_retries -= 1 + if max_retries <= 0: + raise e + else: + # Small delay before retrying + time.sleep(1) + + if self.stream_to_next_component: + # Just return the last chunk + return { + "content": aggregate_result, + "chunk": current_batch, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": True, + "streaming": True, + } + + if self.stream_to_flow: + self.send_streaming_message( + message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + True, + ) + + return {"content": aggregate_result, "response_uuid": response_uuid} + + def send_streaming_message( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): + message = Message( + payload={ + "chunk": chunk, + "content": aggregate_result, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + "streaming": True, + }, + user_properties=input_message.get_user_properties(), + ) + self.send_to_flow(self.stream_to_flow, message) + + def send_to_next_component( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): + message = Message( + payload={ + "chunk": chunk, + "content": aggregate_result, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + "streaming": True, + }, + user_properties=input_message.get_user_properties(), + ) + + result = { + "chunk": chunk, + "content": aggregate_result, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + "streaming": True, + } + + self.process_post_invoke(result, message) diff --git a/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model_with_history.py b/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model_with_history.py new file mode 100644 index 00000000..c98e3535 --- /dev/null +++ b/src/solace_ai_connector/components/general/llm/litellm/litellm_chat_model_with_history.py @@ -0,0 +1,105 @@ +"""LiteLLM chat model component with conversation history""" + +import time + +from .litellm_chat_model_base import LiteLLMChatModelBase, litellm_chat_info_base +from ..common.chat_history_handler import ChatHistoryHandler +from .....common.log import log + +info = litellm_chat_info_base.copy() +info["class_name"] = "LiteLLMChatModelWithHistory" +info["description"] = "LiteLLM model handler component with conversation history" +info["config_parameters"].extend( + [ + { + "name": "history_max_turns", + "required": False, + "description": "Maximum number of conversation turns to keep in history", + "default": 10, + }, + { + "name": "history_max_time", + "required": False, + "description": "Maximum time to keep conversation history (in seconds)", + "default": 3600, + }, + ] +) + +info["input_schema"]["properties"]["clear_history_but_keep_depth"] = { + "type": "integer", + "minimum": 0, + "description": "Clear history but keep the last N messages. If 0, clear all history. If not set, do not clear history.", +} + +class LiteLLMChatModelWithHistory(LiteLLMChatModelBase, ChatHistoryHandler): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.history_max_turns = self.get_config("history_max_turns", 10) + self.history_max_time = self.get_config("history_max_time", 3600) + self.history_key = f"{self.flow_name}_{self.name}_history" + + # Set up hourly timer for history cleanup + self.add_timer(3600000, "history_cleanup", interval_ms=3600000) + + def invoke(self, message, data): + session_id = data.get("session_id") + if not session_id: + raise ValueError("session_id is not provided") + + clear_history_but_keep_depth = data.get("clear_history_but_keep_depth") + try: + if clear_history_but_keep_depth is not None: + clear_history_but_keep_depth = max(0, int(clear_history_but_keep_depth)) + except (TypeError, ValueError): + log.error("Invalid clear_history_but_keep_depth value. Defaulting to 0.") + clear_history_but_keep_depth = 0 + messages = data.get("messages", []) + + with self.get_lock(self.history_key): + history = self.kv_store_get(self.history_key) or {} + if session_id not in history: + history[session_id] = {"messages": [], "last_accessed": time.time()} + + if clear_history_but_keep_depth is not None: + self.clear_history_but_keep_depth( + session_id, clear_history_but_keep_depth, history + ) + + session_history = history[session_id]["messages"] + log.debug(f"Session history: {session_history}") + + # If the passed in messages have a system message and the history's + # first message is a system message, then replace the history's first + # message with the passed in messages' system message + if ( + len(messages) + and messages[0]["role"] == "system" + and len(session_history) + and session_history[0]["role"] == "system" + ): + session_history[0] = messages[0] + session_history.extend(messages[1:]) + else: + session_history.extend(messages) + + history[session_id]["last_accessed"] = time.time() + + self.prune_history(session_id, history) + + response = super().invoke( + message, {"messages": history[session_id]["messages"]} + ) + + # Add the assistant's response to the history + history[session_id]["messages"].append( + { + "role": "assistant", + "content": response["content"], + } + ) + + self.kv_store_set(self.history_key, history) + log.debug(f"Updated history: {history}") + + return response \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/llm/litellm/litellm_embeddings.py b/src/solace_ai_connector/components/general/llm/litellm/litellm_embeddings.py new file mode 100644 index 00000000..7c521700 --- /dev/null +++ b/src/solace_ai_connector/components/general/llm/litellm/litellm_embeddings.py @@ -0,0 +1,51 @@ +"""LiteLLM embedding component""" + +from .litellm_base import LiteLLMBase, litellm_info_base +from .....common.log import log + +info = litellm_info_base.copy() +info.update( + { + "class_name": "LiteLLMEmbeddings", + "description": "Embed text using a LiteLLM model", + "input_schema": { + "type": "object", + "properties": { + "items": { + "type": "array", + "description": "A single element or a list of elements to embed", + }, + }, + "required": ["items"], + }, + "output_schema": { + "type": "object", + "properties": { + "embeddings": { + "type": "array", + "description": ( + "A list of floating point numbers representing the embeddings. " + "Its length is the size of vector that the embedding model produces" + ), + "items": {"type": "float"}, + } + }, + "required": ["embeddings"], + }, + } +) + +class LiteLLMEmbeddings(LiteLLMBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + + def invoke(self, message, data): + """invoke the embedding model""" + items = data.get("items", []) + + response = self.router.embedding(model=self.load_balancer[0]["model_name"], + input=items) + + # Extract the embedding data from the response + embedding_data = response['data'][0]['embedding'] + return {"embeddings": embedding_data} \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/llm/openai/__init__.py b/src/solace_ai_connector/components/general/llm/openai/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model.py b/src/solace_ai_connector/components/general/llm/openai/openai_chat_model.py similarity index 100% rename from src/solace_ai_connector/components/general/openai/openai_chat_model.py rename to src/solace_ai_connector/components/general/llm/openai/openai_chat_model.py diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py b/src/solace_ai_connector/components/general/llm/openai/openai_chat_model_base.py similarity index 98% rename from src/solace_ai_connector/components/general/openai/openai_chat_model_base.py rename to src/solace_ai_connector/components/general/llm/openai/openai_chat_model_base.py index d0a53d52..beabd07f 100755 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py +++ b/src/solace_ai_connector/components/general/llm/openai/openai_chat_model_base.py @@ -4,9 +4,9 @@ import time from openai import OpenAI -from ...component_base import ComponentBase -from ....common.message import Message -from ....common.log import log +from ....component_base import ComponentBase +from .....common.message import Message +from .....common.log import log openai_info_base = { "class_name": "OpenAIChatModelBase", diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py b/src/solace_ai_connector/components/general/llm/openai/openai_chat_model_with_history.py similarity index 54% rename from src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py rename to src/solace_ai_connector/components/general/llm/openai/openai_chat_model_with_history.py index ba7fe646..e9f0da80 100644 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/llm/openai/openai_chat_model_with_history.py @@ -4,6 +4,7 @@ import time from .openai_chat_model_base import OpenAIChatModelBase, openai_info_base +from ..common.chat_history_handler import ChatHistoryHandler info = openai_info_base.copy() info["class_name"] = "OpenAIChatModelWithHistory" @@ -33,7 +34,7 @@ } -class OpenAIChatModelWithHistory(OpenAIChatModelBase): +class OpenAIChatModelWithHistory(OpenAIChatModelBase, ChatHistoryHandler): def __init__(self, **kwargs): super().__init__(info, **kwargs) self.history_max_turns = self.get_config("history_max_turns", 10) @@ -45,6 +46,9 @@ def __init__(self, **kwargs): def invoke(self, message, data): session_id = data.get("session_id") + if not session_id: + raise ValueError("session_id is not provided") + clear_history_but_keep_depth = data.get("clear_history_but_keep_depth") try: if clear_history_but_keep_depth is not None: @@ -98,72 +102,4 @@ def invoke(self, message, data): self.kv_store_set(self.history_key, history) - return response - - def prune_history(self, session_id, history): - current_time = time.time() - if current_time - history[session_id]["last_accessed"] > self.history_max_time: - history[session_id]["messages"] = [] - elif len(history[session_id]["messages"]) > self.history_max_turns * 2: - history[session_id]["messages"] = history[session_id]["messages"][ - -self.history_max_turns * 2 : - ] - self.make_history_start_with_user_message(session_id, history) - - def clear_history_but_keep_depth(self, session_id: str, depth: int, history): - if session_id in history: - messages = history[session_id]["messages"] - # If the depth is 0, then clear all history - if depth == 0: - history[session_id]["messages"] = [] - history[session_id]["last_accessed"] = time.time() - return - - # Check if the history is already shorter than the depth - if len(messages) <= depth: - # Do nothing, since the history is already shorter than the depth - return - - # If the message at depth is not a user message, then - # increment the depth until a user message is found - while depth < len(messages) and messages[-depth]["role"] != "user": - depth += 1 - history[session_id]["messages"] = messages[-depth:] - history[session_id]["last_accessed"] = time.time() - - # In the unlikely case that the history starts with a non-user message, - # remove it - self.make_history_start_with_user_message(session_id, history) - - def make_history_start_with_user_message(self, session_id, history): - if session_id in history: - messages = history[session_id]["messages"] - if messages: - if messages[0]["role"] == "system": - # Start from the second message if the first is "system" - start_index = 1 - else: - # Start from the first message otherwise - start_index = 0 - - while ( - start_index < len(messages) - and messages[start_index]["role"] != "user" - ): - messages.pop(start_index) - - def handle_timer_event(self, timer_data): - if timer_data["timer_id"] == "history_cleanup": - self.history_age_out() - - def history_age_out(self): - with self.get_lock(self.history_key): - history = self.kv_store_get(self.history_key) or {} - current_time = time.time() - for session_id in list(history.keys()): - if ( - current_time - history[session_id]["last_accessed"] - > self.history_max_time - ): - del history[session_id] - self.kv_store_set(self.history_key, history) + return response \ No newline at end of file