From dbee1bd16fede7b3026efedd9c0976a19ad4ced4 Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 09:35:18 +0300 Subject: [PATCH 1/7] adding a db client singleton class adding util functions for chroma interactions adding logs --- ai_adapter.py | 85 ++++++++++++++-------------- logger.py | 26 ++++++--- utils.py | 40 +++++++++++++ virtual_contributor_engine_expert.py | 2 +- 4 files changed, 103 insertions(+), 50 deletions(-) diff --git a/ai_adapter.py b/ai_adapter.py index 972f2af..3fab7c1 100644 --- a/ai_adapter.py +++ b/ai_adapter.py @@ -4,7 +4,7 @@ from config import config from langchain.prompts import ChatPromptTemplate from logger import setup_logger -from utils import history_as_messages, combine_documents +from utils import history_as_messages, combine_documents, load_context, load_knowledge from prompts import ( expert_system_prompt, bok_system_prompt, @@ -18,6 +18,19 @@ logger = setup_logger(__name__) +async def invoke(message): + try: + result = await query_chain(message) + return result + except Exception as inst: + logger.exception(inst) + return { + "answer": "Alkemio's VirtualContributor service is currently unavailable.", + "original_answer": "Alkemio's VirtualContributor service is currently unavailable.", + "sources": [], + } + + # how do we handle languages? 
not all spaces are in Dutch obviously # translating the question to the data _base language_ should be a separate call # so the translation could be used for embeddings retrieval @@ -46,40 +59,15 @@ async def query_chain(message): % (question, result.content) ) question = result.content + else: + logger.info("No history to handle, initial interaction") - knowledge_space_name = "%s-knowledge" % message["bodyOfKnowledgeID"] - context_space_name = "%s-context" % message["contextID"] - - logger.info( - "Query chaing invoked for question: %s; spaces are: %s and %s" - % (question, knowledge_space_name, context_space_name) - ) - - # try to rework those as retreivers - chroma_client = chromadb.HttpClient(host=config["db_host"], port=config["db_port"]) - knowledge_collection = chroma_client.get_collection( - knowledge_space_name, embedding_function=embed_func - ) - context_collection = chroma_client.get_collection( - context_space_name, embedding_function=embed_func - ) - knowledge_docs = knowledge_collection.query( - query_texts=[question], include=["documents", "metadatas"], n_results=4 - ) - context_docs = context_collection.query( - query_texts=[question], include=["documents", "metadatas"], n_results=4 - ) + knowledge_docs = load_knowledge(question, message["bodyOfKnowledgeID"]) - # logger.info(knowledge_docs["metadatas"]) - logger.info( - "Knowledge documents with ids [%s] selected" - % ",".join(list(knowledge_docs["ids"][0])) - ) - logger.info( - "Context documents with ids [%s] selected" - % ",".join(list(context_docs["ids"][0])) - ) + # TODO bring back the context space usage + # context_docs = load_context(question, message["contextID"]) + logger.info("Creating expert prompt. 
Applying system messages...") expert_prompt = ChatPromptTemplate.from_messages( [ ("system", expert_system_prompt), @@ -88,13 +76,17 @@ async def query_chain(message): ("system", limits_system_prompt), ] ) + logger.info("System messages applied.") + logger.info("Adding history...") expert_prompt += history_as_messages(history) + logger.info("History added.") + logger.info("Adding last question...") expert_prompt.append(("human", "{question}")) - + logger.info("Last question added added") expert_chain = expert_prompt | chat_llm - if knowledge_docs["documents"] and knowledge_docs["metadatas"]: - + if knowledge_docs["ids"] and knowledge_docs["metadatas"]: + logger.info("Invoking expert chain...") result = expert_chain.invoke( { "question": question, @@ -102,13 +94,16 @@ async def query_chain(message): } ) json_result = {} + logger.info("Expert chain invoked. Result is `%s`" % str(result.content)) try: - json_result = json.loads(result.content) # try to parse a valid JSON response from the main expert engine - except Exception as inst: - # if not log the error and use the result of the engine as plain string - logger.error(inst) - logger.error(traceback.format_exc()) + json_result = json.loads(str(result.content)) + logger.info("Engine chain returned valid JSON.") + except: + # not an actual error; beha viours is semi-expected + logger.info( + "Engine chain returned invalid JSON. Falling back to default result schema." + ) json_result = { "answer": result.content, "original_answer": result.content, @@ -121,21 +116,27 @@ async def query_chain(message): and "answer_language" in json_result and json_result["human_language"] != json_result["answer_language"] ): + target_lang = json_result["human_language"] + logger.info( + "Creating translsator chaing. 
Human language is %s; answer language is %s" + % (target_lang, json_result["answer_language"]) + ) translator_prompt = ChatPromptTemplate.from_messages( [("system", translator_system_prompt), ("human", "{text}")] ) - translator_chain = translator_prompt | chat_llm translation_result = translator_chain.invoke( { - "target_language": json_result["human_language"], + "target_language": target_lang, "text": json_result["answer"], } ) json_result["original_answer"] = json_result.pop("answer") json_result["answer"] = translation_result.content + logger.info("Translation completed. Result is: %s" % json_result["answer"]) else: + logger.info("Translation not needed or impossible.") json_result["original_answer"] = json_result["answer"] source_scores = json_result.pop("source_scores") diff --git a/logger.py b/logger.py index 092a3d3..8363744 100644 --- a/logger.py +++ b/logger.py @@ -2,21 +2,33 @@ import sys import io import os -from config import config, local_path, LOG_LEVEL +from config import local_path, LOG_LEVEL + def setup_logger(name): logger = logging.getLogger(name) - assert LOG_LEVEL in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] + assert LOG_LEVEL in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] logger.setLevel(getattr(logging, LOG_LEVEL)) # Set logger level - c_handler = logging.StreamHandler(io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)) - f_handler = logging.FileHandler(os.path.join(os.path.expanduser(local_path), 'app.log')) + c_handler = logging.StreamHandler( + io.TextIOWrapper(sys.stdout.buffer, line_buffering=True) + ) + f_handler = logging.FileHandler( + os.path.join(os.path.expanduser(local_path), "app.log") + ) c_handler.setLevel(level=getattr(logging, LOG_LEVEL)) f_handler.setLevel(logging.WARNING) - c_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m-%d %H:%M:%S') - f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m-%d %H:%M:%S') + c_format = 
logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + "%m-%d %H:%M:%S", + ) + f_format = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + "%m-%d %H:%M:%S", + ) + c_handler.setFormatter(c_format) f_handler.setFormatter(f_format) @@ -25,4 +37,4 @@ def setup_logger(name): logger.info(f"log level {os.path.basename(__file__)}: {LOG_LEVEL}") - return logger \ No newline at end of file + return logger diff --git a/utils.py b/utils.py index b9bb1f3..6c7789a 100644 --- a/utils.py +++ b/utils.py @@ -1,4 +1,12 @@ import re +from db_client import DbClient +from config import config +from models import chat_llm, condenser_llm, embed_func +from logger import setup_logger + +logger = setup_logger(__name__) + +logger.info(config) def clear_tags(message): @@ -15,6 +23,38 @@ def history_as_messages(history): return "\n".join(list(map(entry_as_message, history))) +def log_docs(purpose, docs): + if docs: + ids = list(docs["ids"][0]) + logger.info("%s documents with ids [%s] selected" % (purpose, ",".join(ids))) + + +def load_context(query, contextId): + collection_name = "%s-context" % contextId + docs = load_documents(query, collection_name) + log_docs(docs, "Context") + return docs + + +def load_knowledge(query, knolwedgeId): + collection_name = "%s-knowledge" % knolwedgeId + docs = load_documents(query, collection_name) + log_docs(docs, "Knowledge") + return docs + + +def load_documents(query, collection_name, num_docs=4): + try: + db_client = DbClient() + return db_client.query_docs(query, collection_name, num_docs) + except Exception as inst: + logger.error( + "Error querying collection %s for question %s" % (collection_name, query) + ) + logger.exception(inst) + return {} + + def combine_documents(docs, document_separator="\n\n"): chunks_array = [] for index, document in enumerate(docs["documents"][0]): diff --git a/virtual_contributor_engine_expert.py b/virtual_contributor_engine_expert.py index c5fcc6c..b2d60fa 100644 --- 
a/virtual_contributor_engine_expert.py +++ b/virtual_contributor_engine_expert.py @@ -73,7 +73,7 @@ async def query(user_id, message_body, language_code): logger.debug(f"\nlanguage: {user_data[user_id]['language']}\n") with get_openai_callback() as cb: - result = await ai_adapter.query_chain(message_body) + result = await ai_adapter.invoke(message_body) logger.debug(f"\nTotal Tokens: {cb.total_tokens}") logger.debug(f"\nPrompt Tokens: {cb.prompt_tokens}") From a87cd9399b8b5eedae85dfd357b1550ae54d5b38 Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 09:35:32 +0300 Subject: [PATCH 2/7] adding the db client class --- db_client.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 db_client.py diff --git a/db_client.py b/db_client.py new file mode 100644 index 0000000..eb6e666 --- /dev/null +++ b/db_client.py @@ -0,0 +1,31 @@ +import chromadb +from config import config +from logger import setup_logger + +logger = setup_logger(__name__) + + +class DbClient(object): + def __init__(self): + self.client = chromadb.HttpClient( + host=config["db_host"], port=config["db_port"] + ) + + def __new__(cls): + if not hasattr(cls, "instance"): + try: + cls.instance = super(DbClient, cls).__new__(cls) + except Exception as inst: + logger.error("Error connecting to vector db.") + logger.exception(inst) + + return cls.instance + + def query_docs(self, query, collection, embed_func, num_docs=4): + collection = self.client.get_collection( + collection, embedding_function=embed_func + ) + + return collection.query( + query_texts=[query], include=["documents", "metadatas"], n_results=num_docs + ) From 716b013ed5793c734e9a16b3f7e1831882b1faa8 Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 11:15:59 +0300 Subject: [PATCH 3/7] fix logging levels and use f-strings everywhere --- ai_adapter.py | 10 ++++------ utils.py | 22 ++++++++++---------- virtual_contributor_engine_expert.py | 30 
++++++++++++---------------- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/ai_adapter.py b/ai_adapter.py index 3fab7c1..9ead0b7 100644 --- a/ai_adapter.py +++ b/ai_adapter.py @@ -55,8 +55,7 @@ async def query_chain(message): {"question": question, "chat_history": history_as_messages(history)} ) logger.info( - "Original question is: '%s'; Rephrased question is: '%s" - % (question, result.content) + f"Original question is: '{question}'; Rephrased question is: '{result.content}" ) question = result.content else: @@ -94,7 +93,7 @@ async def query_chain(message): } ) json_result = {} - logger.info("Expert chain invoked. Result is `%s`" % str(result.content)) + logger.info(f"Expert chain invoked. Result is `{str(result.content)}`") try: # try to parse a valid JSON response from the main expert engine json_result = json.loads(str(result.content)) @@ -118,8 +117,7 @@ async def query_chain(message): ): target_lang = json_result["human_language"] logger.info( - "Creating translsator chaing. Human language is %s; answer language is %s" - % (target_lang, json_result["answer_language"]) + f"Creating translsator chaing. Human language is {target_lang}; answer language is {json_result['answer_language']}" ) translator_prompt = ChatPromptTemplate.from_messages( [("system", translator_system_prompt), ("human", "{text}")] @@ -134,7 +132,7 @@ async def query_chain(message): ) json_result["original_answer"] = json_result.pop("answer") json_result["answer"] = translation_result.content - logger.info("Translation completed. Result is: %s" % json_result["answer"]) + logger.info(f"Translation completed. Result is: {json_result['answer']}") else: logger.info("Translation not needed or impossible.") json_result["original_answer"] = json_result["answer"] diff --git a/utils.py b/utils.py index 6c7789a..dd3f76d 100644 --- a/utils.py +++ b/utils.py @@ -6,8 +6,6 @@ logger = setup_logger(__name__) -logger.info(config) - def clear_tags(message): return re.sub(r"-? 
?\[@?.*\]\(.*?\)", "", message).strip() @@ -15,29 +13,29 @@ def clear_tags(message): def entry_as_message(entry): if entry["role"] == "human": - return "%s: %s" % ("Human", clear_tags(entry["content"])) - return "%s: %s" % ("Assistant", clear_tags(entry["content"])) + return f"Human: {clear_tags(entry['content'])}" + return f"Assistant: {clear_tags(entry['content'])}" def history_as_messages(history): return "\n".join(list(map(entry_as_message, history))) -def log_docs(purpose, docs): +def log_docs(docs, purpose): if docs: ids = list(docs["ids"][0]) - logger.info("%s documents with ids [%s] selected" % (purpose, ",".join(ids))) + logger.info(f"{purpose} documents with ids [{','.join(ids)}] selected") def load_context(query, contextId): - collection_name = "%s-context" % contextId + collection_name = f"{contextId}-context" docs = load_documents(query, collection_name) log_docs(docs, "Context") return docs -def load_knowledge(query, knolwedgeId): - collection_name = "%s-knowledge" % knolwedgeId +def load_knowledge(query, knowledgeId): + collection_name = f"{knowledgeId}-knowledge" docs = load_documents(query, collection_name) log_docs(docs, "Knowledge") return docs @@ -46,10 +44,10 @@ def load_knowledge(query, knolwedgeId): def load_documents(query, collection_name, num_docs=4): try: db_client = DbClient() - return db_client.query_docs(query, collection_name, num_docs) + return db_client.query_docs(query, collection_name, embed_func, num_docs) except Exception as inst: logger.error( - "Error querying collection %s for question %s" % (collection_name, query) + f"Error querying collection {collection_name} for question `{query}`" ) logger.exception(inst) return {} @@ -58,6 +56,6 @@ def load_documents(query, collection_name, num_docs=4): def combine_documents(docs, document_separator="\n\n"): chunks_array = [] for index, document in enumerate(docs["documents"][0]): - chunks_array.append("[source:%s] %s" % (index, document)) + chunks_array.append(f"[source:{index}] 
{document}") return document_separator.join(chunks_array) diff --git a/virtual_contributor_engine_expert.py b/virtual_contributor_engine_expert.py index b2d60fa..6d15305 100644 --- a/virtual_contributor_engine_expert.py +++ b/virtual_contributor_engine_expert.py @@ -59,7 +59,7 @@ async def query(user_id, message_body, language_code): # trim the VC tag message_body["question"] = clear_tags(message_body["question"]) - logger.info(f"\nQuery from user {user_id}: {message_body['question']}\n") + logger.info(f"Query from user {user_id}: {message_body['question']}") if user_id not in user_data: user_data[user_id] = {} @@ -70,23 +70,23 @@ async def query(user_id, message_body, language_code): user_data[user_id]["language"] = language_code - logger.debug(f"\nlanguage: {user_data[user_id]['language']}\n") + logger.debug(f"language: {user_data[user_id]['language']}") with get_openai_callback() as cb: result = await ai_adapter.invoke(message_body) - logger.debug(f"\nTotal Tokens: {cb.total_tokens}") - logger.debug(f"\nPrompt Tokens: {cb.prompt_tokens}") - logger.debug(f"\nCompletion Tokens: {cb.completion_tokens}") - logger.debug(f"\nTotal Cost (USD): ${cb.total_cost}") + logger.debug(f"Total Tokens: {cb.total_tokens}") + logger.debug(f"Prompt Tokens: {cb.prompt_tokens}") + logger.debug(f"Completion Tokens: {cb.completion_tokens}") + logger.debug(f"Total Cost (USD): ${cb.total_cost}") - logger.debug(f"\n\nLLM result: {result}\n\n") + logger.debug(f"LLM result: {result}") user_data[user_id]["chat_history"].save_context( {"question": message_body["question"]}, {"answer": result["answer"]}, ) - logger.debug(f"new chat history {user_data[user_id]['chat_history']}\n") + logger.debug(f"new chat history {user_data[user_id]['chat_history']}") response = { "question": message_body["question"], "prompt_tokens": cb.prompt_tokens, @@ -110,14 +110,10 @@ async def on_request(message: aio_pika.abc.AbstractIncomingMessage): # Parse the message body as JSON body = json.loads(message.body) - 
logger.info(body) - # Get the user ID from the message body user_id = body["data"]["userID"] - logger.info( - f"\nrequest arriving for user id: {user_id}, deciding what to do\n\n" - ) + logger.info(f"request arriving for user id: {user_id}, deciding what to do") # If there's no lock for this user, create one if user_id not in user_locks: @@ -126,10 +122,10 @@ async def on_request(message: aio_pika.abc.AbstractIncomingMessage): # Check if the lock is locked if user_locks[user_id].locked(): logger.info( - f"existing task running for user id: {user_id}, waiting for it to finish first\n\n" + f"existing task running for user id: {user_id}, waiting for it to finish first" ) else: - logger.info(f"no task running for user id: {user_id}, let's move!\n\n") + logger.info(f"no task running for user id: {user_id}, let's move!") # Acquire the lock for this user async with user_locks[user_id]: @@ -141,7 +137,7 @@ async def process_message(message: aio_pika.abc.AbstractIncomingMessage): body = json.loads(message.body.decode()) user_id = body["data"].get("userID") - logger.info(body) + logger.debug(body) operation = body["pattern"]["cmd"] @@ -151,7 +147,7 @@ async def process_message(message: aio_pika.abc.AbstractIncomingMessage): if operation == "query": if "question" in body["data"]: logger.info( - f"query time for user id: {user_id}, let's call the query() function!\n\n" + f"query time for user id: {user_id}, let's call the query() function!" 
) response = await query(user_id, body["data"], "English") else: From 99e2af0dcc9bbcd9d48f571e9b93464d723986fb Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 11:17:15 +0300 Subject: [PATCH 4/7] bump version to 0.6.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 89dc65d..5e3a0be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "virtual-contributor-engine-expert" -version = "0.5.1" +version = "0.6.0" description = "Alkemio Generative AI Virtul Persona" authors = ["Alkemio BV "] license = "EUPL-1.2" From ac06cf7adfd9f4e152fe3a4735e02f450ec1fa35 Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 11:21:12 +0300 Subject: [PATCH 5/7] cleanup unneeded imports --- ai_adapter.py | 6 ++---- utils.py | 3 +-- virtual_contributor_engine_expert.py | 1 - 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/ai_adapter.py b/ai_adapter.py index 9ead0b7..d2efc0d 100644 --- a/ai_adapter.py +++ b/ai_adapter.py @@ -1,10 +1,8 @@ -import traceback -import chromadb import json from config import config from langchain.prompts import ChatPromptTemplate from logger import setup_logger -from utils import history_as_messages, combine_documents, load_context, load_knowledge +from utils import history_as_messages, combine_documents, load_knowledge from prompts import ( expert_system_prompt, bok_system_prompt, @@ -13,7 +11,7 @@ translator_system_prompt, condenser_system_prompt, ) -from models import chat_llm, condenser_llm, embed_func +from models import chat_llm, condenser_llm logger = setup_logger(__name__) diff --git a/utils.py b/utils.py index dd3f76d..73b0470 100644 --- a/utils.py +++ b/utils.py @@ -1,7 +1,6 @@ import re from db_client import DbClient -from config import config -from models import chat_llm, condenser_llm, embed_func +from models import embed_func from logger import setup_logger logger = setup_logger(__name__) diff 
--git a/virtual_contributor_engine_expert.py b/virtual_contributor_engine_expert.py index 6d15305..2384367 100644 --- a/virtual_contributor_engine_expert.py +++ b/virtual_contributor_engine_expert.py @@ -1,4 +1,3 @@ -import re from langchain.callbacks import get_openai_callback from langchain.memory import ConversationBufferWindowMemory import json From ff35192d6c00e21dfaf3707591da4252e7b0cd28 Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 14:52:55 +0300 Subject: [PATCH 6/7] clean up a return and fixing typos --- ai_adapter.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ai_adapter.py b/ai_adapter.py index d2efc0d..eba77c6 100644 --- a/ai_adapter.py +++ b/ai_adapter.py @@ -15,11 +15,12 @@ logger = setup_logger(__name__) +braek = 10 + async def invoke(message): try: - result = await query_chain(message) - return result + return query_chain(message) except Exception as inst: logger.exception(inst) return { @@ -79,7 +80,7 @@ async def query_chain(message): logger.info("History added.") logger.info("Adding last question...") expert_prompt.append(("human", "{question}")) - logger.info("Last question added added") + logger.info("Last question added.") expert_chain = expert_prompt | chat_llm if knowledge_docs["ids"] and knowledge_docs["metadatas"]: @@ -97,7 +98,7 @@ async def query_chain(message): json_result = json.loads(str(result.content)) logger.info("Engine chain returned valid JSON.") except: - # not an actual error; beha viours is semi-expected + # not an actual error; behaviour is semi-expected logger.info( "Engine chain returned invalid JSON. Falling back to default result schema." ) @@ -115,7 +116,7 @@ async def query_chain(message): ): target_lang = json_result["human_language"] logger.info( - f"Creating translsator chaing. Human language is {target_lang}; answer language is {json_result['answer_language']}" + f"Creating translsator chain. 
Human language is {target_lang}; answer language is {json_result['answer_language']}" ) translator_prompt = ChatPromptTemplate.from_messages( [("system", translator_system_prompt), ("human", "{text}")] From 88627e595ee952dcc7216c08cfbb677aeef0e9a1 Mon Sep 17 00:00:00 2001 From: Vladimir Aleksiev Date: Fri, 2 Aug 2024 16:18:07 +0300 Subject: [PATCH 7/7] remove unneeded variable --- ai_adapter.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ai_adapter.py b/ai_adapter.py index eba77c6..9017dd3 100644 --- a/ai_adapter.py +++ b/ai_adapter.py @@ -15,8 +15,6 @@ logger = setup_logger(__name__) -braek = 10 - async def invoke(message): try: