Skip to content

Commit

Permalink
Reworked the DialogflowConversation class
Browse files: browse the repository at this point in the history
  • Loading branch information
jkshj21 authored Aug 26, 2024
1 parent ed20f4a commit 8430b62
Showing 1 changed file with 127 additions and 47 deletions.
174 changes: 127 additions & 47 deletions src/dfcx_scrapi/core/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import time
import traceback
import uuid
import json

from typing import Dict, Any
from operator import attrgetter
Expand All @@ -39,8 +40,6 @@
format="[dfcx] %(levelname)s:%(message)s", level=logging.INFO
)

MAX_RETRIES = 3


class DialogflowConversation(scrapi_base.ScrapiBase):
"""Class that wraps the SessionsClient to hold end to end conversations
Expand Down Expand Up @@ -165,17 +164,20 @@ def progress_bar(current, total, bar_length=50, type_="Progress"):
)

@staticmethod
def _build_query_params_object(parameters, current_page, disable_webhook):
def _build_query_params_object(
parameters,
current_page,
disable_webhook,
end_user_metadata):

query_params = types.session.QueryParameters(
disable_webhook=disable_webhook,
current_page=current_page,
)
if parameters:
query_params = types.session.QueryParameters(
disable_webhook=disable_webhook,
parameters=parameters,
current_page=current_page,
)
else:
query_params = types.session.QueryParameters(
disable_webhook=disable_webhook, current_page=current_page
)
query_params.parameters = parameters
if end_user_metadata:
query_params.end_user_metadata = end_user_metadata

return query_params

Expand Down Expand Up @@ -301,23 +303,80 @@ def _page_id_mapper(self):

self.agent_pages_map = agent_pages_map.reset_index(drop=True)

def _get_reply_results(self, utterance, page_id, results, i):
def _get_reply_results(
self,
send_obj,
page_id,
results,
i):
"""Get results of single text utterance to CX Agent.
Args:
utterance: Text to send to the bot for testing.
send_obj: A dict to send to the bot for testing.
page_id: Specified CX Page to send the utterance request to
results: Pandas Dataframe to capture and store the results
i: Internal tracking for Python Threading
"""

response = self.reply(
send_obj={"text": utterance}, current_page=page_id, restart=True
send_obj=send_obj, current_page=page_id, restart=True
)

target_page = response["page_name"]

results["target_page"][i] = target_page
results["match"][i] = response["match"]
results["response_messages"][i] = response["response_messages"]

def _extract_send_objs(self, test_set_mapped):
"""Extracts send objects from a test set mapped data frame.
NOTE - This is an internal method used by run_intent_detection to
manage parallel intent detection requests and should not be used as a
standalone function.
Args:
test_set_mapped (pandas.DataFrame): The test set mapped data frame.
Returns:
list: A list of send objects, each containing 'text', 'params', and
'end_user_metadata'.
"""
def _convert_to_json(json_in_str):
if not json_in_str:
return None
try:
return json.loads(json_in_str)
except json.JSONDecodeError as e:
raise ValueError(f"""Invalid JSON string: {json_in_str}
\nError details: {e}""") from e
except TypeError as e:
raise ValueError(f"""Unexpected data type: {type(json_in_str)}
\nError details: {e}""") from e

utterances = test_set_mapped["utterance"].tolist()
inject_params = (
test_set_mapped["inject_parameters"]
.apply(_convert_to_json)
.tolist()
if "inject_parameters" in test_set_mapped.columns
else [None] * len(test_set_mapped)
)
end_user_metadata = (
test_set_mapped["end_user_metadata"]
.apply(_convert_to_json)
.tolist()
if "end_user_metadata" in test_set_mapped.columns
else [None] * len(test_set_mapped)
)
return [
{
"text": utterance,
"params": inject_param,
"end_user_metadata": metadata,
}
for (utterance, inject_param, metadata) in zip(
utterances, inject_params, end_user_metadata
)
]

def _get_intent_detection(self, test_set: pd.DataFrame):
"""Gets the results of a subset of Intent Detection tests.
Expand All @@ -334,20 +393,21 @@ def _get_intent_detection(self, test_set: pd.DataFrame):
on=["flow_display_name", "page_display_name"],
how="left",
)
utterances = list(test_set_mapped["utterance"])
page_ids = list(test_set_mapped["page_id"])
send_objs = self._extract_send_objs(test_set_mapped)

self._validate_test_set_input(test_set_mapped)

threads = [None] * len(utterances)
threads = [None] * len(test_set_mapped)
results = {
"target_page": [None] * len(utterances),
"match":[None] * len(utterances),
"target_page": [None] * len(test_set_mapped),
"match":[None] * len(test_set_mapped),
"response_messages": [None] * len(test_set_mapped),
}
for i, (utterance, page_id) in enumerate(zip(utterances, page_ids)):
for i, (send_obj, page_id) in enumerate(zip(send_objs, page_ids)):
threads[i] = Thread(
target=self._get_reply_results,
args=(utterance, page_id, results, i),
args=(send_obj, page_id, results, i),
)
threads[i].start()

Expand All @@ -356,12 +416,12 @@ def _get_intent_detection(self, test_set: pd.DataFrame):

test_set_mapped["target_page"] = results["target_page"]
test_set_mapped["match"] = results["match"]
test_set_mapped["response_messages"] = results["response_messages"]
test_set_mapped = test_set_mapped.drop(columns=["page_id"])
intent_detection = test_set_mapped.copy()

return intent_detection


def restart(self):
"""Starts a new session/conversation for this agent"""
self.session_id = uuid.uuid4()
Expand Down Expand Up @@ -389,7 +449,6 @@ def reply(
self,
send_obj: Dict[str, str],
restart: bool = False,
retries: int = 0,
current_page: str = None,
checkpoints: bool = False,
):
Expand All @@ -399,12 +458,12 @@ def reply(
send_obj: Dictionary with the following structure:
{'text': str,
'params': Dict[str,str],
'dtmf': str}
'dtmf': str,
'end_user_metadata': Dict[str, str],}
restart: Boolean flag that determines whether to use the existing
session ID or start a new conversation with a new session ID.
Passing True will create a new session ID on subsequent calls.
Defaults to False.
retries: used for recurse calling this func if API fails
current_page: Specify the page id to start the conversation from
checkpoints: Boolean flag to enable/disable Checkpoint timer
debugging. Defaults to False.
Expand All @@ -416,9 +475,7 @@ def reply(
"""
text = send_obj.get("text")
send_params = send_obj.get("params")

if not text:
logging.warning(f"Input Text is empty. {send_obj}")
end_user_metadata = send_obj.get("end_user_metadata")

if text and len(text) > 256:
logging.warning(
Expand Down Expand Up @@ -451,7 +508,7 @@ def reply(

# Build Query Params object
query_params = self._build_query_params_object(
send_params, current_page, disable_webhook
send_params, current_page, disable_webhook, end_user_metadata
)

# Build Query Input object
Expand Down Expand Up @@ -479,7 +536,18 @@ def reply(
logging.error("text: %s", text)
logging.error("query_params: %s", query_params)
logging.error("query_input: %s", query_input)
return {}
return {
"response_messages": (
f"""---- ERROR --- InternalServerError caught on CX.detect,
{err}"""
),
"confidence": "",
"page_name": "",
"intent_name": "",
"match_type": "",
"match": None,
"params": "",
}

except core_exceptions.ClientError as err:
logging.error(
Expand All @@ -493,13 +561,17 @@ def reply(
logging.error("query_params %s", query_params)
logging.error("query_input %s", query_input)
logging.error(traceback.print_exc())
retries += 1
if retries < MAX_RETRIES:
logging.error("retrying")
return self.reply(send_obj, restart=restart, retries=retries)
else:
logging.error("MAX_RETRIES exceeded")
return {}
return {
"response_messages": (
f"""---- ERROR --- ClientError caught on CX.detect, {err}"""
),
"confidence": "",
"page_name": "",
"intent_name": "",
"match_type": "",
"match": None,
"params": "",
}

if checkpoints:
self.checkpoint("<< got response")
Expand All @@ -523,7 +595,6 @@ def reply(
)
else:
params = None

reply["response_messages"] = response_messages
reply["confidence"] = query_result.intent_detection_confidence
reply["page_name"] = query_result.current_page.display_name
Expand Down Expand Up @@ -578,6 +649,8 @@ def run_intent_detection(
- NOTE, when using the Default Start Page of a Flow you must
define it as the special display name START_PAGE
utterance: str
inject_parameters (optional): str
end_user_metadata (optional): str
chunk_size: Determines the number of text requests to send in
parallel. This should be adjusted based on your test_set size and
the Quota limits set for your GCP project. Default is 300.
Expand All @@ -593,6 +666,9 @@ def run_intent_detection(
detected_intent: str
confidence: float
target_page: str
response_messages: str
match_type: str
parameters_set: str
"""

result = pd.DataFrame()
Expand All @@ -609,26 +685,31 @@ def run_intent_detection(

def _unpack_match(self, df: pd.DataFrame):
""" Unpacks a 'match' column into four component columns.
if a match column is None, then all four columns will be None.
Args:
df: dataframe containing a column named match of types.Match
Returns:
A copy of df with columns match_type, confidence, parameters_set,
and detected_intent instead of match.
"""
df = (
df
.copy()
return (
df.copy()
.assign(
match_type = lambda df: df.match.apply(
attrgetter("match_type._name_")),
lambda m: attrgetter("m.match_type._name_") if m else ""
),
confidence = lambda df: df.match.apply(
attrgetter("confidence")),
lambda m: m.confidence if m else ""
),
parameters_set = lambda df: df.match.apply(
attrgetter("parameters")),
lambda m: m.parameters if m else ""
),
detected_intent = lambda df: df.match.apply(
attrgetter("intent.display_name"))
lambda m: m.intent.display_name if m else ""
)
)
.assign(
parameters_set = lambda df: df.parameters_set.apply(
Expand All @@ -637,4 +718,3 @@ def _unpack_match(self, df: pd.DataFrame):
)
.drop(columns="match")
)
return df

0 comments on commit 8430b62

Please sign in to comment.