Commit
Merge pull request #36 from alkem-io/tidy-up-logs
Tidy up logs and always respond to queries
valeksiev authored Aug 6, 2024
2 parents 8e96c34 + 88627e5 commit c240192
Showing 6 changed files with 148 additions and 77 deletions.
90 changes: 43 additions & 47 deletions ai_adapter.py
@@ -1,10 +1,8 @@
import traceback
import chromadb
import json
from config import config
from langchain.prompts import ChatPromptTemplate
from logger import setup_logger
from utils import history_as_messages, combine_documents
from utils import history_as_messages, combine_documents, load_knowledge
from prompts import (
expert_system_prompt,
bok_system_prompt,
@@ -13,11 +11,23 @@
translator_system_prompt,
condenser_system_prompt,
)
from models import chat_llm, condenser_llm, embed_func
from models import chat_llm, condenser_llm

logger = setup_logger(__name__)


async def invoke(message):
    try:
        return query_chain(message)
    except Exception as inst:
        logger.exception(inst)
        return {
            "answer": "Alkemio's VirtualContributor service is currently unavailable.",
            "original_answer": "Alkemio's VirtualContributor service is currently unavailable.",
            "sources": [],
        }


# how do we handle languages? not all spaces are in Dutch obviously
# translating the question to the data _base language_ should be a separate call
# so the translation could be used for embeddings retrieval
@@ -42,44 +52,18 @@ async def query_chain(message):
{"question": question, "chat_history": history_as_messages(history)}
)
logger.info(
"Original question is: '%s'; Rephrased question is: '%s"
% (question, result.content)
f"Original question is: '{question}'; Rephrased question is: '{result.content}"
)
question = result.content
else:
logger.info("No history to handle, initial interaction")

knowledge_space_name = "%s-knowledge" % message["bodyOfKnowledgeID"]
context_space_name = "%s-context" % message["contextID"]

logger.info(
"Query chaing invoked for question: %s; spaces are: %s and %s"
% (question, knowledge_space_name, context_space_name)
)

# try to rework those as retrievers
chroma_client = chromadb.HttpClient(host=config["db_host"], port=config["db_port"])
knowledge_collection = chroma_client.get_collection(
knowledge_space_name, embedding_function=embed_func
)
context_collection = chroma_client.get_collection(
context_space_name, embedding_function=embed_func
)
knowledge_docs = knowledge_collection.query(
query_texts=[question], include=["documents", "metadatas"], n_results=4
)
context_docs = context_collection.query(
query_texts=[question], include=["documents", "metadatas"], n_results=4
)
knowledge_docs = load_knowledge(question, message["bodyOfKnowledgeID"])

# logger.info(knowledge_docs["metadatas"])
logger.info(
"Knowledge documents with ids [%s] selected"
% ",".join(list(knowledge_docs["ids"][0]))
)
logger.info(
"Context documents with ids [%s] selected"
% ",".join(list(context_docs["ids"][0]))
)
# TODO bring back the context space usage
# context_docs = load_context(question, message["contextID"])

logger.info("Creating expert prompt. Applying system messages...")
expert_prompt = ChatPromptTemplate.from_messages(
[
("system", expert_system_prompt),
@@ -88,27 +72,34 @@ async def query_chain(message):
("system", limits_system_prompt),
]
)
logger.info("System messages applied.")
logger.info("Adding history...")
expert_prompt += history_as_messages(history)
logger.info("History added.")
logger.info("Adding last question...")
expert_prompt.append(("human", "{question}"))

logger.info("Last question added.")
expert_chain = expert_prompt | chat_llm

if knowledge_docs["documents"] and knowledge_docs["metadatas"]:

if knowledge_docs["ids"] and knowledge_docs["metadatas"]:
logger.info("Invoking expert chain...")
result = expert_chain.invoke(
{
"question": question,
"knowledge": combine_documents(knowledge_docs),
}
)
json_result = {}
logger.info(f"Expert chain invoked. Result is `{str(result.content)}`")
try:
json_result = json.loads(result.content)
# try to parse a valid JSON response from the main expert engine
except Exception as inst:
# if not log the error and use the result of the engine as plain string
logger.error(inst)
logger.error(traceback.format_exc())
json_result = json.loads(str(result.content))
logger.info("Engine chain returned valid JSON.")
except:
# not an actual error; behaviour is semi-expected
logger.info(
"Engine chain returned invalid JSON. Falling back to default result schema."
)
json_result = {
"answer": result.content,
"original_answer": result.content,
@@ -121,21 +112,26 @@
and "answer_language" in json_result
and json_result["human_language"] != json_result["answer_language"]
):
target_lang = json_result["human_language"]
logger.info(
f"Creating translsator chain. Human language is {target_lang}; answer language is {json_result['answer_language']}"
)
translator_prompt = ChatPromptTemplate.from_messages(
[("system", translator_system_prompt), ("human", "{text}")]
)

translator_chain = translator_prompt | chat_llm

translation_result = translator_chain.invoke(
{
"target_language": json_result["human_language"],
"target_language": target_lang,
"text": json_result["answer"],
}
)
json_result["original_answer"] = json_result.pop("answer")
json_result["answer"] = translation_result.content
logger.info(f"Translation completed. Result is: {json_result['answer']}")
else:
logger.info("Translation not needed or impossible.")
json_result["original_answer"] = json_result["answer"]

source_scores = json_result.pop("source_scores")
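The new top-level invoke wrapper above guarantees a well-formed reply even when the chain raises, and the try/except around json.loads downgrades gracefully when the engine answers in plain text rather than JSON. A minimal sketch of both patterns, separate from the diff — safe_invoke, run_chain, and parse_engine_result are illustrative names, not part of this commit:

import json
import logging

logger = logging.getLogger(__name__)

FALLBACK = {
    "answer": "Alkemio's VirtualContributor service is currently unavailable.",
    "original_answer": "Alkemio's VirtualContributor service is currently unavailable.",
    "sources": [],
}


async def safe_invoke(message, run_chain):
    # Whatever goes wrong inside the chain, the caller still gets a reply.
    try:
        return await run_chain(message)
    except Exception as inst:
        logger.exception(inst)
        return FALLBACK


def parse_engine_result(content):
    # Invalid JSON is semi-expected here, so it falls back to the default
    # result schema instead of being treated as an error.
    try:
        return json.loads(content)
    except (json.JSONDecodeError, TypeError):
        return {"answer": content, "original_answer": content, "sources": []}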
31 changes: 31 additions & 0 deletions db_client.py
@@ -0,0 +1,31 @@
import chromadb
from config import config
from logger import setup_logger

logger = setup_logger(__name__)


class DbClient(object):
    def __init__(self):
        self.client = chromadb.HttpClient(
            host=config["db_host"], port=config["db_port"]
        )

    def __new__(cls):
        if not hasattr(cls, "instance"):
            try:
                cls.instance = super(DbClient, cls).__new__(cls)
            except Exception as inst:
                logger.error("Error connecting to vector db.")
                logger.exception(inst)

        return cls.instance

    def query_docs(self, query, collection, embed_func, num_docs=4):
        collection = self.client.get_collection(
            collection, embedding_function=embed_func
        )

        return collection.query(
            query_texts=[query], include=["documents", "metadatas"], n_results=num_docs
        )
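DbClient caches its instance on the class in __new__, so every caller shares one wrapper around the Chroma HTTP connection. A short usage sketch, assuming the module imports cleanly:

from db_client import DbClient

a = DbClient()
b = DbClient()
assert a is b  # __new__ hands back the cached instance both times

One caveat worth knowing about this pattern: Python still runs __init__ on every construction, so the underlying chromadb.HttpClient is rebuilt on each DbClient() call even though the wrapper object itself is shared.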
26 changes: 19 additions & 7 deletions logger.py
@@ -2,21 +2,33 @@
import sys
import io
import os
from config import config, local_path, LOG_LEVEL
from config import local_path, LOG_LEVEL


def setup_logger(name):
logger = logging.getLogger(name)
assert LOG_LEVEL in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
assert LOG_LEVEL in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
logger.setLevel(getattr(logging, LOG_LEVEL)) # Set logger level

c_handler = logging.StreamHandler(io.TextIOWrapper(sys.stdout.buffer, line_buffering=True))
f_handler = logging.FileHandler(os.path.join(os.path.expanduser(local_path), 'app.log'))
c_handler = logging.StreamHandler(
io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
)
f_handler = logging.FileHandler(
os.path.join(os.path.expanduser(local_path), "app.log")
)

c_handler.setLevel(level=getattr(logging, LOG_LEVEL))
f_handler.setLevel(logging.WARNING)

c_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m-%d %H:%M:%S')
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m-%d %H:%M:%S')
c_format = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"%m-%d %H:%M:%S",
)
f_format = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"%m-%d %H:%M:%S",
)

c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)

@@ -25,4 +37,4 @@ def setup_logger(name):

logger.info(f"log level {os.path.basename(__file__)}: {LOG_LEVEL}")

return logger
return logger
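With this configuration the console handler follows LOG_LEVEL while the file handler stays pinned at WARNING, so app.log only collects warnings and above. A short usage sketch under those assumptions:

from logger import setup_logger

logger = setup_logger(__name__)
logger.info("reaches stdout whenever LOG_LEVEL permits")
logger.warning("reaches stdout and is also written to app.log")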
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "virtual-contributor-engine-expert"
version = "0.5.1"
version = "0.6.0"
description = "Alkemio Generative AI Virtul Persona"
authors = ["Alkemio BV <[email protected]>"]
license = "EUPL-1.2"
43 changes: 40 additions & 3 deletions utils.py
@@ -1,4 +1,9 @@
import re
from db_client import DbClient
from models import embed_func
from logger import setup_logger

logger = setup_logger(__name__)


def clear_tags(message):
@@ -7,17 +12,49 @@ def clear_tags(message):

def entry_as_message(entry):
if entry["role"] == "human":
return "%s: %s" % ("Human", clear_tags(entry["content"]))
return "%s: %s" % ("Assistant", clear_tags(entry["content"]))
return f"Human: {clear_tags(entry['content'])}"
return f"Assistant: {clear_tags(entry['content'])}"


def history_as_messages(history):
return "\n".join(list(map(entry_as_message, history)))


def log_docs(docs, purpose):
    if docs:
        ids = list(docs["ids"][0])
        logger.info(f"{purpose} documents with ids [{','.join(ids)}] selected")


def load_context(query, contextId):
    collection_name = f"{contextId}-context"
    docs = load_documents(query, collection_name)
    log_docs(docs, "Context")
    return docs


def load_knowledge(query, knowledgeId):
    collection_name = f"{knowledgeId}-knowledge"
    docs = load_documents(query, collection_name)
    log_docs(docs, "Knowledge")
    return docs


def load_documents(query, collection_name, num_docs=4):
    try:
        db_client = DbClient()
        return db_client.query_docs(query, collection_name, embed_func, num_docs)
    except Exception as inst:
        logger.error(
            f"Error querying collection {collection_name} for question `{query}`"
        )
        logger.exception(inst)
        return {}


def combine_documents(docs, document_separator="\n\n"):
    chunks_array = []
    for index, document in enumerate(docs["documents"][0]):
        chunks_array.append("[source:%s] %s" % (index, document))
        chunks_array.append(f"[source:{index}] {document}")

    return document_separator.join(chunks_array)
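For reference, combine_documents flattens a Chroma-style query result (lists nested per query) into the single labelled string that ai_adapter.py passes to the expert chain as its knowledge input. An illustrative call with fabricated data:

docs = {
    "ids": [["doc-1", "doc-2"]],
    "documents": [["First chunk text.", "Second chunk text."]],
    "metadatas": [[{}, {}]],
}

print(combine_documents(docs))
# [source:0] First chunk text.
#
# [source:1] Second chunk text.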
