diff --git a/src/dfcx_scrapi/core/conversation.py b/src/dfcx_scrapi/core/conversation.py index ba488db3..6bf0ea33 100644 --- a/src/dfcx_scrapi/core/conversation.py +++ b/src/dfcx_scrapi/core/conversation.py @@ -18,6 +18,7 @@ import time import traceback import uuid +import json from typing import Dict, Any from operator import attrgetter @@ -39,8 +40,6 @@ format="[dfcx] %(levelname)s:%(message)s", level=logging.INFO ) -MAX_RETRIES = 3 - class DialogflowConversation(scrapi_base.ScrapiBase): """Class that wraps the SessionsClient to hold end to end conversations @@ -165,17 +164,20 @@ def progress_bar(current, total, bar_length=50, type_="Progress"): ) @staticmethod - def _build_query_params_object(parameters, current_page, disable_webhook): + def _build_query_params_object( + parameters, + current_page, + disable_webhook, + end_user_metadata=None): + + query_params = types.session.QueryParameters( + disable_webhook=disable_webhook, + current_page=current_page, + ) if parameters: - query_params = types.session.QueryParameters( - disable_webhook=disable_webhook, - parameters=parameters, - current_page=current_page, - ) - else: - query_params = types.session.QueryParameters( - disable_webhook=disable_webhook, current_page=current_page - ) + query_params.parameters = parameters + if end_user_metadata: + query_params.end_user_metadata = end_user_metadata return query_params @@ -301,23 +303,80 @@ def _page_id_mapper(self): self.agent_pages_map = agent_pages_map.reset_index(drop=True) - def _get_reply_results(self, utterance, page_id, results, i): + def _get_reply_results( + self, + send_obj, + page_id, + results, + i): """Get results of single text utterance to CX Agent. Args: - utterance: Text to send to the bot for testing. + send_obj: A dict to send to the bot for testing. 
page_id: Specified CX Page to send the utterance request to results: Pandas Dataframe to capture and store the results i: Internal tracking for Python Threading """ + response = self.reply( - send_obj={"text": utterance}, current_page=page_id, restart=True + send_obj=send_obj, current_page=page_id, restart=True ) target_page = response["page_name"] - results["target_page"][i] = target_page results["match"][i] = response["match"] + results["response_messages"][i] = response["response_messages"] + + def _extract_send_objs(self, test_set_mapped): + """Extracts send objects from a test set mapped data frame. + + NOTE - This is an internal method used by run_intent_detection to + manage parallel intent detection requests and should not be used as a + standalone function. + Args: + test_set_mapped (pandas.DataFrame): The test set mapped data frame. + + Returns: + list: A list of send objects, each containing 'text', 'params', and + 'end_user_metadata'. + """ + def _convert_to_json(json_in_str): + if pd.isna(json_in_str) or not json_in_str: + return None + try: + return json.loads(json_in_str) + except json.JSONDecodeError as e: + raise ValueError(f"""Invalid JSON string: {json_in_str} + \nError details: {e}""") from e + except TypeError as e: + raise ValueError(f"""Unexpected data type: {type(json_in_str)} + \nError details: {e}""") from e + + utterances = test_set_mapped["utterance"].tolist() + inject_params = ( + test_set_mapped["inject_parameters"] + .apply(_convert_to_json) + .tolist() + if "inject_parameters" in test_set_mapped.columns + else [None] * len(test_set_mapped) + ) + end_user_metadata = ( + test_set_mapped["end_user_metadata"] + .apply(_convert_to_json) + .tolist() + if "end_user_metadata" in test_set_mapped.columns + else [None] * len(test_set_mapped) + ) + return [ + { + "text": utterance, + "params": inject_param, + "end_user_metadata": metadata, + } + for (utterance, inject_param, metadata) in zip( + utterances, inject_params, end_user_metadata + ) + ] def 
_get_intent_detection(self, test_set: pd.DataFrame): """Gets the results of a subset of Intent Detection tests. @@ -334,20 +393,21 @@ def _get_intent_detection(self, test_set: pd.DataFrame): on=["flow_display_name", "page_display_name"], how="left", ) - utterances = list(test_set_mapped["utterance"]) page_ids = list(test_set_mapped["page_id"]) + send_objs = self._extract_send_objs(test_set_mapped) self._validate_test_set_input(test_set_mapped) - threads = [None] * len(utterances) + threads = [None] * len(test_set_mapped) results = { - "target_page": [None] * len(utterances), - "match":[None] * len(utterances), + "target_page": [None] * len(test_set_mapped), + "match":[None] * len(test_set_mapped), + "response_messages": [None] * len(test_set_mapped), } - for i, (utterance, page_id) in enumerate(zip(utterances, page_ids)): + for i, (send_obj, page_id) in enumerate(zip(send_objs, page_ids)): threads[i] = Thread( target=self._get_reply_results, - args=(utterance, page_id, results, i), + args=(send_obj, page_id, results, i), ) threads[i].start() @@ -356,12 +416,12 @@ def _get_intent_detection(self, test_set: pd.DataFrame): test_set_mapped["target_page"] = results["target_page"] test_set_mapped["match"] = results["match"] + test_set_mapped["response_messages"] = results["response_messages"] test_set_mapped = test_set_mapped.drop(columns=["page_id"]) intent_detection = test_set_mapped.copy() return intent_detection - def restart(self): """Starts a new session/conversation for this agent""" self.session_id = uuid.uuid4() @@ -389,7 +449,6 @@ def reply( self, send_obj: Dict[str, str], restart: bool = False, - retries: int = 0, current_page: str = None, checkpoints: bool = False, ): @@ -399,12 +458,12 @@ def reply( send_obj: Dictionary with the following structure: {'text': str, 'params': Dict[str,str], - 'dtmf': str} + 'dtmf': str, + 'end_user_metadata': Dict[str, str],} restart: Boolean flag that determines whether to use the existing session ID or start a new conversation 
with a new session ID. Passing True will create a new session ID on subsequent calls. Defaults to False. - retries: used for recurse calling this func if API fails current_page: Specify the page id to start the conversation from checkpoints: Boolean flag to enable/disable Checkpoint timer debugging. Defaults to False. @@ -416,9 +475,7 @@ def reply( """ text = send_obj.get("text") send_params = send_obj.get("params") - - if not text: - logging.warning(f"Input Text is empty. {send_obj}") + end_user_metadata = send_obj.get("end_user_metadata") if text and len(text) > 256: logging.warning( @@ -451,7 +508,7 @@ def reply( # Build Query Params object query_params = self._build_query_params_object( - send_params, current_page, disable_webhook + send_params, current_page, disable_webhook, end_user_metadata ) # Build Query Input object @@ -479,7 +536,18 @@ def reply( logging.error("text: %s", text) logging.error("query_params: %s", query_params) logging.error("query_input: %s", query_input) - return {} + return { + "response_messages": ( + "---- ERROR --- InternalServerError caught on " + f"CX.detect, {err}" + ), + "confidence": "", + "page_name": "", + "intent_name": "", + "match_type": "", + "match": None, + "params": "", + } except core_exceptions.ClientError as err: logging.error( @@ -493,13 +561,17 @@ def reply( logging.error("query_params %s", query_params) logging.error("query_input %s", query_input) - logging.error(traceback.print_exc()) + logging.error(traceback.format_exc()) - retries += 1 - if retries < MAX_RETRIES: - logging.error("retrying") - return self.reply(send_obj, restart=restart, retries=retries) - else: - logging.error("MAX_RETRIES exceeded") - return {} + return { + "response_messages": ( + f"""---- ERROR --- ClientError caught on CX.detect, {err}""" + ), + "confidence": "", + "page_name": "", + "intent_name": "", + "match_type": "", + "match": None, + "params": "", + } if checkpoints: self.checkpoint("<< got response") @@ -523,7 +595,6 @@ def reply( ) else: params = None - 
reply["response_messages"] = response_messages reply["confidence"] = query_result.intent_detection_confidence reply["page_name"] = query_result.current_page.display_name @@ -578,6 +649,8 @@ def run_intent_detection( - NOTE, when using the Default Start Page of a Flow you must define it as the special display name START_PAGE utterance: str + inject_parameters (optional): str + end_user_metadata (optional): str chunk_size: Determines the number of text requests to send in parallel. This should be adjusted based on your test_set size and the Quota limits set for your GCP project. Default is 300. @@ -593,6 +666,9 @@ def run_intent_detection( detected_intent: str confidence: float target_page: str + response_messages: str + match_type: str + parameters_set: str """ result = pd.DataFrame() @@ -609,6 +685,7 @@ def run_intent_detection( def _unpack_match(self, df: pd.DataFrame): """ Unpacks a 'match' column into four component columns. + If a match value is None, an empty string is used for each column. Args: df: dataframe containing a column named match of types.Match @@ -616,19 +693,23 @@ def _unpack_match(self, df: pd.DataFrame): Returns: A copy of df with columns match_type, confidence, parameters_set, and detected_intent instead of match. + """ - df = ( - df - .copy() + return ( + df.copy() .assign( match_type = lambda df: df.match.apply( - attrgetter("match_type._name_")), + lambda m: attrgetter("match_type._name_")(m) if m else "" + ), confidence = lambda df: df.match.apply( - attrgetter("confidence")), + lambda m: m.confidence if m else "" + ), parameters_set = lambda df: df.match.apply( - attrgetter("parameters")), + lambda m: m.parameters if m else "" + ), detected_intent = lambda df: df.match.apply( - attrgetter("intent.display_name")) + lambda m: m.intent.display_name if m else "" + ) ) .assign( parameters_set = lambda df: df.parameters_set.apply( @@ -637,4 +718,3 @@ def _unpack_match(self, df: pd.DataFrame): ) .drop(columns="match") ) - return df