diff --git a/application/api/schemas.py b/application/api/schemas.py index 2e70a7b7..edf6c44e 100644 --- a/application/api/schemas.py +++ b/application/api/schemas.py @@ -67,11 +67,7 @@ class SQLSearchResult(BaseModel): class TaskSQLSearchResult(BaseModel): sub_task_query: str - sql: str - sql_data: list[Any] - data_show_type: str - sql_gen_process: str - data_analyse: str + sql_search_result: SQLSearchResult class KnowledgeSearchResult(BaseModel): diff --git a/application/api/service.py b/application/api/service.py index 952d95b9..e6f06bb4 100644 --- a/application/api/service.py +++ b/application/api/service.py @@ -8,6 +8,7 @@ from nlq.business.nlq_chain import NLQChain from nlq.business.profile import ProfileManagement from nlq.business.vector_store import VectorStore +from nlq.business.log_store import LogManagement from utils.apis import get_sql_result_tool from utils.database import get_db_url_dialect from nlq.business.suggested_question import SuggestedQuestionManagement as sqm @@ -16,6 +17,7 @@ generate_suggested_question, data_visualization from utils.opensearch import get_retrieve_opensearch from utils.text_search import normal_text_search, agent_text_search +from utils.tool import generate_log_id, get_current_time from .schemas import Question, Answer, Example, Option, SQLSearchResult, AgentSearchResult, KnowledgeSearchResult, \ TaskSQLSearchResult from .exception_handler import BizException @@ -179,6 +181,10 @@ def ask(question: Question) -> Answer: filter_deep_dive_sql_result = [] + log_id = generate_log_id() + current_time = get_current_time() + log_info = "" + all_profiles = ProfileManagement.get_all_profiles_with_info() database_profile = all_profiles[selected_profile] @@ -188,7 +194,7 @@ def ask(question: Question) -> Answer: sql_gen_process="", data_analyse="") - agent_search_response = AgentSearchResult(agent_summary="", agent_sql_search_result=[], sub_search_task=[]) + agent_search_response = AgentSearchResult(agent_summary="", 
agent_sql_search_result=[]) knowledge_search_result = KnowledgeSearchResult(knowledge_response="") @@ -232,6 +238,8 @@ def ask(question: Question) -> Answer: answer = Answer(query=search_box, query_intent="reject_search", knowledge_search_result=knowledge_search_result, sql_search_result=sql_search_result, agent_search_result=agent_search_response, suggested_question=[]) + LogManagement.add_log_to_database(log_id=log_id, profile_name=selected_profile, sql="", query=search_box, + intent="reject_search", log_info="", time_str=current_time) return answer elif search_intent_flag: normal_search_result = normal_text_search(search_box, model_type, @@ -246,6 +254,10 @@ def ask(question: Question) -> Answer: knowledge_search_result=knowledge_search_result, sql_search_result=sql_search_result, agent_search_result=agent_search_response, suggested_question=[]) + + LogManagement.add_log_to_database(log_id=log_id, profile_name=selected_profile, sql="", query=search_box, + intent="knowledge_search", log_info=knowledge_search_result.knowledge_response, + time_str=current_time) return answer else: @@ -298,6 +310,12 @@ def ask(question: Question) -> Answer: # sql_search_result.sql_data = [list(search_intent_result["data"].columns)] + search_intent_result[ # "data"].values.tolist() + log_info = search_intent_result["error_info"] + ";" + sql_search_result.data_analyse + LogManagement.add_log_to_database(log_id=log_id, profile_name=selected_profile, sql=sql_search_result.sql, query=search_box, + intent="normal_search", + log_info=log_info, + time_str=current_time) + answer = Answer(query=search_box, query_intent="normal_search", knowledge_search_result=knowledge_search_result, sql_search_result=sql_search_result, agent_search_result=agent_search_response, suggested_question=generate_suggested_question_list) @@ -311,13 +329,30 @@ def ask(question: Question) -> Answer: orient='records') filter_deep_dive_sql_result.append(agent_search_result[i]) each_task_sql_res = 
[list(each_task_res["data"].columns)] + each_task_res["data"].values.tolist() + + model_select_type, show_select_data = data_visualization(model_type, agent_search_result[i]["query"], + each_task_res["data"], + database_profile['prompt_map']) + + each_task_sql_response = agent_search_result[i]["response"] + sub_task_sql_result = SQLSearchResult(sql_data=show_select_data, sql=each_task_res["sql"], data_show_type=model_select_type, + sql_gen_process=each_task_sql_response, + data_analyse="") + each_task_sql_search_result = TaskSQLSearchResult(sub_task_query=agent_search_result[i]["query"], - sql_data=each_task_sql_res, - sql=each_task_res["sql"], data_show_type="table", - sql_gen_process="", - data_analyse="") + sql_search_result=sub_task_sql_result) agent_sql_search_result.append(each_task_sql_search_result) + sub_search_task.append(agent_search_result[i]["query"]) + log_info = "" + else: + log_info = agent_search_result[i]["query"] + "The SQL error Info: " + each_task_res["error_info"] + "。" + log_id = generate_log_id() + LogManagement.add_log_to_database(log_id=log_id, profile_name=selected_profile, sql=each_task_res["sql"], + query=search_box + "; The sub task is " + agent_search_result[i]["query"], + intent="agent_search", + log_info=log_info, + time_str=current_time) agent_data_analyse_result = data_analyse_tool(model_type, prompt_map, search_box, json.dumps(filter_deep_dive_sql_result, ensure_ascii=False), "agent") diff --git a/application/nlq/business/log_store.py b/application/nlq/business/log_store.py new file mode 100644 index 00000000..e33316ae --- /dev/null +++ b/application/nlq/business/log_store.py @@ -0,0 +1,13 @@ +import logging + +from nlq.data_access.dynamo_query_log import DynamoQueryLogDao + +logger = logging.getLogger(__name__) + + +class LogManagement: + query_log_dao = DynamoQueryLogDao() + + @classmethod + def add_log_to_database(cls, log_id, profile_name, sql, query, intent, log_info, time_str): + cls.query_log_dao.add_log(log_id, 
profile_name, sql, query, intent, log_info, time_str) diff --git a/application/nlq/data_access/dynamo_query_log.py b/application/nlq/data_access/dynamo_query_log.py index d93abfcf..02060d64 100644 --- a/application/nlq/data_access/dynamo_query_log.py +++ b/application/nlq/data_access/dynamo_query_log.py @@ -11,7 +11,7 @@ DYNAMODB_AWS_REGION = os.environ.get('DYNAMODB_AWS_REGION') -class DynamoQueryLog: +class DynamoQueryLogEntity: def __init__(self, log_id, profile_name, sql, query, intent, log_info, time_str): self.log_id = log_id self.profile_name = profile_name @@ -101,7 +101,14 @@ def create_table(self): raise def add(self, entity): - self.table.put_item(Item=entity.to_dict()) + try: + self.table.put_item(Item=entity.to_dict()) + except Exception as e: + logger.error("add log entity is error {}",e) def update(self, entity): self.table.put_item(Item=entity.to_dict()) + + def add_log(self, log_id, profile_name, sql, query, intent, log_info, time_str): + entity = DynamoQueryLogEntity(log_id, profile_name, sql, query, intent, log_info, time_str) + self.add(entity) diff --git a/application/utils/constant.py b/application/utils/constant.py index 5bbe2a4b..c4076291 100644 --- a/application/utils/constant.py +++ b/application/utils/constant.py @@ -2,6 +2,5 @@ PROFILE_QUESTION_TABLE_NAME = 'NlqSuggestedQuestion' DEFAULT_PROMPT_NAME = 'suggested_question_prompt_default' ACTIVE_PROMPT_NAME = 'suggested_question_prompt_active' -BEDROCK_MODEL_IDS = ['anthropic.claude-3-sonnet-20240229-v1:0', 'anthropic.claude-3-opus-20240229-v1:0', - 'anthropic.claude-3-haiku-20240307-v1:0', 'mistral.mixtral-8x7b-instruct-v0:1', - 'meta.llama3-70b-instruct-v1:0'] \ No newline at end of file +BEDROCK_MODEL_IDS = ['anthropic.claude-3-sonnet-20240229-v1:0', 'anthropic.claude-3-haiku-20240307-v1:0', + 'mistral.mixtral-8x7b-instruct-v0:1', 'meta.llama3-70b-instruct-v1:0'] \ No newline at end of file diff --git a/application/utils/tool.py b/application/utils/tool.py index e3584974..f6a9e43a 
100644 --- a/application/utils/tool.py +++ b/application/utils/tool.py @@ -1,12 +1,12 @@ import logging +import time +import random +from datetime import datetime logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - - - def get_generated_sql(generated_sql_response): sql = "" try: @@ -14,4 +14,31 @@ def get_generated_sql(generated_sql_response): except IndexError: logger.error("No SQL found in the LLM's response") logger.error(generated_sql_response) - return sql \ No newline at end of file + return sql + + +def generate_log_id(): + # 获取当前时间戳,精确到微秒 + timestamp = int(time.time() * 1000000) + # 添加随机数以增加唯一性 + random_part = random.randint(0, 9999) + # 拼接时间戳和随机数生成logID + log_id = f"{timestamp}{random_part:04d}" + return log_id + + +def get_current_time(): + # 获取当前时间 + now = datetime.now() + # 格式化时间,包括毫秒部分 + # 注意:strftime默认不直接支持毫秒,需要单独处理 + formatted_time = now.strftime('%Y-%m-%d %H:%M:%S') + return formatted_time + + +def get_generated_sql_explain(generated_sql_response): + index = generated_sql_response.find("</sql>") + if index != -1: + return generated_sql_response[index + len("</sql>"):] + else: + return generated_sql_response