diff --git a/.gitignore b/.gitignore index 0bdbf412..a60d4cf3 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ .ipynb_checkpoints *scratch.ipynb *scratch.py +tmp/ # IPython profile_default/ diff --git a/src/dfcx_scrapi/agent_extract/__init__.py b/src/dfcx_scrapi/agent_extract/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/dfcx_scrapi/agent_extract/agents.py b/src/dfcx_scrapi/agent_extract/agents.py new file mode 100644 index 00000000..eaa4153d --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/agents.py @@ -0,0 +1,146 @@ +"""Agent processing methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import os +import shutil +from typing import Dict + +from dfcx_scrapi.core import agents +from dfcx_scrapi.core import operations +from dfcx_scrapi.core import scrapi_base +from dfcx_scrapi.agent_extract import graph +from dfcx_scrapi.agent_extract import flows +from dfcx_scrapi.agent_extract import intents +from dfcx_scrapi.agent_extract import entity_types +from dfcx_scrapi.agent_extract import test_cases +from dfcx_scrapi.agent_extract import webhooks +from dfcx_scrapi.agent_extract import gcs_utils +from dfcx_scrapi.agent_extract import types + +# logging config +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + +class Agents(scrapi_base.ScrapiBase): + """Agent Metadata methods and functions.""" + def __init__( + self, + agent_id: str, + lang_code: str = "en", + creds_path: str = None, + creds_dict: Dict = None, + creds=None, + scope=False + ): + super().__init__( + creds_path=creds_path, + creds_dict=creds_dict, + creds=creds, + scope=scope, + ) + self.agent_id = agent_id + self.lang_code = lang_code + self._core_agents = agents.Agents(creds=creds) + self.gcs = gcs_utils.GcsUtils() + self.flows = flows.Flows() + self.intents = intents.Intents() + self.etypes = entity_types.EntityTypes() + self.webhooks = webhooks.Webhooks() + self.tcs = test_cases.TestCases() + self.ops = operations.Operations() + + @staticmethod + def prep_local_dir(agent_local_path: str): + """Prepare the local directory for agent zip file.""" + if os.path.isdir(agent_local_path): + logging.info("Cleaning up old directory...") + shutil.rmtree(agent_local_path) + logging.info(f"Making temp directory: {agent_local_path}") + os.mkdir(agent_local_path) + else: + os.mkdir(agent_local_path) + + def await_lro(self, lro: str): + """Wait for long running operation to complete.""" + try: + i = 0 + while not self.ops.get_lro(lro).done: + time.sleep(1) + i += 1 + if i == 20: + break + + except UserWarning: + print("LRO Failed.") + + return True + + def export_agent(self, agent_id: str, gcs_bucket_uri: str, + environment_display_name: str = None): + """Handle the agent export, LRO and logging.""" + export_start = time.time() + logging.info("Exporting agent...") + lro = self._core_agents.export_agent( + 
agent_id=agent_id,gcs_bucket_uri=gcs_bucket_uri, data_format="JSON", + environment_display_name=environment_display_name) + + + self.await_lro(lro) + logging.info("Export Complete.") + logging.debug(f"EXPORT: {time.time() - export_start}") + + def download_and_extract(self, agent_local_path: str, gcs_bucket_uri: str): + """Handle download from GCS and extracting ZIP file.""" + if not os.path.exists(agent_local_path): + os.makedirs(agent_local_path) + + download_start = time.time() + logging.info("Downloading agent file from GCS Bucket...") + agent_file = self.gcs.download_gcs( + gcs_path=gcs_bucket_uri, local_path=agent_local_path) + logging.info("Download complete.") + logging.debug(f"DOWNLOAD: {time.time() - download_start}") + + self.gcs.unzip(agent_file, agent_local_path) + + + def process_agent(self, agent_id: str, gcs_bucket_uri: str, + environment_display_name: str = None): + """Process the specified Agent for offline data gathering.""" + agent_local_path = "tmp/agent" + self.prep_local_dir(agent_local_path) + self.export_agent(agent_id, gcs_bucket_uri, environment_display_name) + self.download_and_extract(agent_local_path, gcs_bucket_uri) + + logging.info("Processing Agent...") + data = types.AgentData() + data.graph = graph.Graph() + data.lang_code = self.lang_code + data.agent_id = agent_id + data = self.flows.process_flows_directory(agent_local_path, data) + data = self.intents.process_intents_directory(agent_local_path, data) + data = self.etypes.process_entity_types_directory( + agent_local_path, data) + data = self.webhooks.process_webhooks_directory(agent_local_path, data) + data = self.tcs.process_test_cases_directory(agent_local_path, data) + logging.info("Processing Complete.") + + return data diff --git a/src/dfcx_scrapi/agent_extract/common.py b/src/dfcx_scrapi/agent_extract/common.py new file mode 100644 index 00000000..26b86ff3 --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/common.py @@ -0,0 +1,80 @@ +"""Common methods and helper functions used throughout library.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +from dfcx_scrapi.agent_extract import types + +# logging config +logging.basicConfig( + level=logging.INFO, + format="%(message)s", +) + +class Common: + """Common methods and helper functions used throughout library.""" + + @staticmethod + def parse_filepath(in_path: str, resource_type: str) -> str: + """Parse file path to provide quick reference for resource.""" + + regex_map = { + "flow": r".*\/flows\/([^\/]*)", + "page": r".*\/pages\/([^\/]*)\.", + "entity_type": r".*\/entityTypes\/([^\/]*)", + "intent": r".*\/intents\/([^\/]*)", + "route_group": r".*\/transitionRouteGroups\/([^\/]*)", + "webhook": r".*\/webhooks\/([^\/]*)\." 
+        }
+        resource_name = re.match(regex_map[resource_type], in_path).groups()[0]
+
+        return resource_name
+
+    @staticmethod
+    def clean_display_name(display_name: str):
+        """Replace special characters from the map for the given display name."""
+        patterns = {
+            "%22": '"',
+            "%23": "#",
+            "%24": "$",
+            "%26": "&",
+            "%27": "'",
+            "%28": "(",
+            "%29": ")",
+            "%2b": "+",
+            "%2c": ",",
+            "%2f": "/",
+            "%3a": ":",
+            "%3c": "<",
+            "%3d": "=",
+            "%3e": ">",
+            "%3f": "?",
+            "%5b": "[",
+            "%5d": "]",
+            "%e2%80%9c": "“",
+            "%e2%80%9d": "”",
+        }
+
+        for key, value in patterns.items():
+            if key in display_name:
+                display_name = display_name.replace(key, value)
+
+        return display_name
+
+    @staticmethod
+    def check_lang_code(lang_code: str, stats: types.AgentData):
+        """Check to see if file lang_code matches user input lang_code."""
+        return stats.lang_code == lang_code
diff --git a/src/dfcx_scrapi/agent_extract/entity_types.py b/src/dfcx_scrapi/agent_extract/entity_types.py
new file mode 100644
index 00000000..17469b58
--- /dev/null
+++ b/src/dfcx_scrapi/agent_extract/entity_types.py
@@ -0,0 +1,168 @@
+"""Entity Type processing methods and functions."""
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+from typing import Dict
+
+from dfcx_scrapi.agent_extract import common
+from dfcx_scrapi.agent_extract import types
+
+class EntityTypes:
+    """Entity Type processing methods and functions."""
+
+    def __init__(self):
+        self.common = common.Common()
+
+    @staticmethod
+    def build_entity_type_path_list(agent_local_path: str):
+        """Builds a list of dirs, each representing an Entity Type directory.
+
+        Ex: /path/to/agent/entityTypes/
+
+        This dir path can then be used to find the next level of information
+        in the directory by appending the appropriate next dir structures like:
+        - .json, for the Entity Type object
+        - /entities, for the Entities dir
+        """
+        root_dir = agent_local_path + "/entityTypes"
+
+        entity_type_paths = []
+
+        for entity_type_dir in os.listdir(root_dir):
+            entity_type_dir_path = f"{root_dir}/{entity_type_dir}"
+            entity_type_paths.append(entity_type_dir_path)
+
+        return entity_type_paths
+
+    @staticmethod
+    def build_lang_code_paths(etype: types.EntityType):
+        """Builds dict of lang codes and file locations.
+
+        The language_codes and paths for each file are stored in a dictionary
+        inside of the Entity Type dataclass. This dict is accessed later to
+        lint each file and provide reporting based on each language code.
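+
+        For example (illustrative language codes and paths only), an Entity
+        Type with "en" and "fr-ca" entity files would be stored as:
+            {"en": {"file_path": ".../entities/en.json"},
+             "fr-ca": {"file_path": ".../entities/fr-ca.json"}}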
+ """ + root_dir = etype.dir_path + "/entities" + + for lang_file in os.listdir(root_dir): + lang_code = lang_file.split(".")[0] + lang_code_path = f"{root_dir}/{lang_file}" + etype.entities[lang_code] = {"file_path": lang_code_path} + + @staticmethod + def build_excluded_phrases_path(etype: types.EntityType, lang_code: str): + """Builds a dict of excluded phrases and file locations.""" + root_dir = etype.dir_path + "/excludedPhrases" + lang_code_path = f"{root_dir}/{lang_code}.json" + + return lang_code_path + + @staticmethod + def process_entity_type_metadata(etype: types.EntityType): + """Extract metadata for Entity Type for later processing.""" + metadata_file = etype.dir_path + f"/{etype.display_name}.json" + + with open(metadata_file, "r", encoding="UTF-8") as etype_file: + etype.data = json.load(etype_file) + etype.resource_id = etype.data.get("name", None) + etype.kind = etype.data.get("kind", None) + etype.auto_expansion = etype.data.get("autoExpansionMode", None) + etype.fuzzy_extraction = etype.data.get( + "enableFuzzyExtraction", False) + + etype_file.close() + + def process_excluded_phrases_language_codes( + self, data: Dict[str, str], lang_code_path: str): + """Process all ecluded phrases lang_code files.""" + with open(lang_code_path, "r", encoding="UTF-8") as ent_file: + new_data = json.load(ent_file) + data["excluded_phrases"] = new_data.get("excludedPhrases", None) + + return data + + def process_excluded_phrases(self, etype: types.EntityType, lang_code: str, + data: Dict[str, str]): + """Process the excluded phrases if they exist.""" + if "excludedPhrases" in os.listdir(etype.dir_path): + lang_code_path = self.build_excluded_phrases_path(etype, lang_code) + data = self.process_excluded_phrases_language_codes( + data, lang_code_path) + + return data + + def process_language_codes( + self, etype: types.EntityType, stats: types.AgentData): + """Process all Entity Type lang_code files.""" + for lang_code in etype.entities: + ent_file_path = etype.entities[lang_code]["file_path"] + + if not self.common.check_lang_code(lang_code, stats): + continue + + with open(ent_file_path, "r", encoding="UTF-8") as ent_file: + data = json.load(ent_file) + data["name"] = f"{stats.agent_id}/entityTypes/"\ + f"{etype.resource_id}" + data["display_name"] = etype.display_name + data["kind"] = etype.kind + data["entities"] = data.get("entities", None) + data = self.process_excluded_phrases(etype, lang_code, data) + stats.entity_types.append(data) + + ent_file.close() + + return stats + + def process_entities(self, etype: types.EntityType, stats: types.AgentData): + """Process the Entity files inside of an Entity Type.""" + if "entities" in os.listdir(etype.dir_path): + self.build_lang_code_paths(etype) + stats = self.process_language_codes(etype, stats) + + return stats + + def process_entity_type( + self, etype: types.EntityType, stats: types.AgentData): + """Process a Single Entity Type dir and all subdirectories.""" + + etype.display_name = self.common.parse_filepath( + etype.dir_path, "entity_type") + etype.display_name = self.common.clean_display_name(etype.display_name) + + self.process_entity_type_metadata(etype) + stats = self.process_entities(etype, stats) + stats.total_entity_types += 1 + + return stats + + def process_entity_types_directory( + self, agent_local_path: str, stats: types.AgentData): + """Processing the Entity Types dir in the JSON Package structure.""" + # Create a list of all Entity Type paths to iter through + entity_type_paths = 
self.build_entity_type_path_list(agent_local_path) + + for entity_type_path in entity_type_paths: + etype = types.EntityType() + etype.dir_path = entity_type_path + + stats = self.process_entity_type(etype, stats) + full_etype_id = f"{stats.agent_id}/entityTypes/{etype.resource_id}" + stats.entity_types_map[etype.display_name] = full_etype_id + + return stats diff --git a/src/dfcx_scrapi/agent_extract/flows.py b/src/dfcx_scrapi/agent_extract/flows.py new file mode 100644 index 00000000..6d991dac --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/flows.py @@ -0,0 +1,301 @@ +"""Flow extract methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +from typing import List + +from dfcx_scrapi.agent_extract import graph +from dfcx_scrapi.agent_extract import common +from dfcx_scrapi.agent_extract import types +from dfcx_scrapi.agent_extract import pages +from dfcx_scrapi.agent_extract import routes +from dfcx_scrapi.agent_extract import route_groups + + +class Flows: + """Flow processing methods and functions.""" + + def __init__(self): + self.common = common.Common() + self.pages = pages.Pages() + self.rgs = route_groups.RouteGroups() + self.routes = routes.Fulfillments() + self.special_pages = [ + "End Session", + "End Flow", + "Start Page", + "Current Page", + "Previous Page", + ] + + @staticmethod + def build_flow_path_list(agent_local_path: str): + """Builds a list of dirs, each representing a Flow directory. + + Ex: /path/to/agent/flows/ + + This dir path can then be used to find the next level of information + in the directory by appending the appropriate next dir structures like: + - .json, for the Flow object + - /transitionRouteGroups, for the Route Groups dir + - /pages, for the Pages dir + """ + root_dir = agent_local_path + "/flows" + + flow_paths = [] + + for flow_dir in os.listdir(root_dir): + flow_dir_path = f"{root_dir}/{flow_dir}" + flow_paths.append(flow_dir_path) + + return flow_paths + + @staticmethod + def remove_flow_pages_from_set(input_set: set) -> set: + """Remove any transitions tagged with FLOW. + + Some route transitions go to Flow instead of Page. For these + transitions, we tag them with `FLOW` for easier identification later. + However, when reporting on Graph inconsistencies like Dangling or + Unreachable pages, we want to remove these from any result sets as they + are not relevant. + """ + filtered_set = set() + + for page in input_set: + if "FLOW" not in page: + filtered_set.add(page) + + return filtered_set + + def find_unreachable_pages(self, flow: types.Flow): + """Find Unreachable Pages in the graph. + + An Unreachable Page is defined as: + - A Page which has no incoming edge when traversed from Start Page. + That is, it is unreachable in the graph by any practical means. + - A Page which is connected to a root unreachable page. That is, a + page that could have both incoming or outgoing routes, but due to + its connectedness to the root orphan page, is unreachable in the + graph. 
+ + Here we will compute the symmetric difference of 2 sets: + - Active Pages (i.e. Pages that were reachable in the graph) + - Used Pages (i.e. Pages that were used by some Route) + + If an Unreachable Page has children that it routes to, those children + will appear in Used Pages, although they will ultimately be + unreachable. It's possible for an Unreachable Page to route back to an + Active Page in the graph. For these instances, we don't want to count + those pages as unreachable, because they are reachable via other + sections of the graph. + """ + filtered_set = flow.active_pages.symmetric_difference( + flow.graph.used_nodes + ) + filtered_set = self.remove_flow_pages_from_set(filtered_set) + flow.unreachable_pages.update(filtered_set) + + return flow + + def find_unused_pages(self, flow: types.Flow): + """Find Unused Pages in the graph. + + An Unused Page is defined as: + - A Page which has no incoming or outgoing edge AND + - A Page which exists in the Agent design time, but which is not + present anywhere in the graph, either visible or non-visible. + + Here we will compute the difference of 2 sets: + - All Pages (i.e. Pages that exist in the Agent Design Time) + - Used Pages (i.e. Pages that were used by some Route) + + The resulting set will consist of 2 types of Pages: + - Truly Unused Pages + - Unreachable Root Pages + + Unreachable Root Pages end up in the results due to the fact that no + other Active Page is pointing to them. We remove these from the + resulting set before presenting the Truly Unused Pages. + """ + + # Discard special pages as they are non-relevant for final outcome + for page in self.special_pages: + flow.all_pages.discard(page) + + prelim_unused = flow.all_pages.difference(flow.graph.used_nodes) + + # Filter out Unreachable Root Pages + filtered_set = set() + + for page in prelim_unused: + if page not in flow.graph.edges: + filtered_set.add(page) + else: + flow.unreachable_pages.add(page) + + flow.unused_pages = filtered_set + + return flow + + def recurse_edges( + self, edges: List, page: types.Page, dangling: set, visited: set + ): + """Recursive method searching graph edges for Active / Dangling Pages. + + A byproduct of searching for Dangling Pages in the graph is that we can + produce a set of Active Pages in the graph. These are pages that are + reachable when traversing from the Start Page. These can then be used + to determine Unreachable Pages in another method. + """ + # For Flow Start Pages, we prepend the Flow name for later + # identification. For this section, we'll need to strip it off to + # compare with the other sets. + if page in edges: + for inner_page in edges[page]: + if inner_page not in visited: + visited.add(inner_page) + dangling, visited = self.recurse_edges( + edges, inner_page, dangling, visited + ) + + else: + dangling.add(page) + + return dangling, visited + + def find_dangling_pages(self, flow: types.Flow): + """Find Dangling Pages in the graph. + + Dangling Page is defined as: + - Any page that exists in the graph that has no outgoing edge + Active Page is defined as: + - Any page that is reachable via an active route in the graph and can + be traced back to the Start Page. + + These pages can result in a conversational "dead end" which is + potentially unrecoverable. + A byproduct of searching for the dangling pages is locating all of the + "active" pages. These are the pages that are "visited" as we traverse + the graph. We'll also return Active Pages in this method since they + will be used for downstream tasks. 
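+
+        As a small illustration (hypothetical page names): given the edges
+        {"Demo: Start Page": ["Collect Info"], "Collect Info": ["Confirm"]},
+        the traversal marks "Collect Info" and "Confirm" as active, and flags
+        "Confirm" as dangling because it has no outgoing edge.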
+ """ + + flow.dangling_pages, flow.active_pages = self.recurse_edges( + flow.graph.edges, + f"{flow.display_name}: Start Page", + flow.dangling_pages, + flow.active_pages, + ) + + # Clean up Special Pages + for page in self.special_pages: + flow.dangling_pages.discard(page) + + flow.dangling_pages = self.remove_flow_pages_from_set( + flow.dangling_pages + ) + + return flow + + def process_start_page(self, flow: types.Flow, stats: types.AgentData): + """Process a single Flow Path file.""" + with open(flow.start_page_file, "r", encoding="UTF-8") as flow_file: + page = types.Page(flow=flow) + page.display_name = f"{flow.display_name}: Start Page" + + # We keep track of an instance specific Flow graph for the current + # Flow, and then a main Graph for the entire agent. + flow.graph.add_node(page.display_name) + stats.graph.add_node(page.display_name) + + page.data = json.load(flow_file) + page.events = page.data.get("eventHandlers", None) + page.routes = page.data.get("transitionRoutes", None) + page.route_groups = page.data.get("transitionRouteGroups", None) + stats.flows.append(page.data) + + flow.resource_id = page.data.get("name", None) + + # Order of processing is important + stats = self.routes.process_routes(page, stats) + stats = self.routes.process_events(page, stats) + + if page.route_groups: + page, stats = self.routes.set_route_group_targets(page, stats) + + flow_file.close() + + full_flow_id = f"{stats.agent_id}/flows/{flow.resource_id}" + stats.flows_map[flow.display_name] = full_flow_id + stats.flow_page_map[flow.display_name] = { + "id": full_flow_id, + "pages": {} + } + + return stats + + def process_flow(self, flow: types.Flow, stats: types.AgentData): + """Process a Single Flow dir and all subdirectories.""" + flow.file_name = self.common.parse_filepath(flow.dir_path, "flow") + flow.display_name = self.common.clean_display_name(flow.file_name) + + flow.start_page_file = f"{flow.dir_path}/{flow.file_name}.json" + + stats.pages[flow.display_name] = [] + stats.active_intents[flow.display_name] = [] + stats = self.process_start_page(flow, stats) + stats = self.pages.process_pages_directory(flow, stats) + stats = self.rgs.process_route_groups_directory(flow, stats) + + # Order of Find Operations is important here! + flow = self.find_unused_pages(flow) + flow = self.find_dangling_pages(flow) + flow = self.find_unreachable_pages(flow) + + stats.active_pages[flow.display_name] = flow.active_pages + stats.unused_pages[flow.display_name] = flow.unused_pages + stats.unreachable_pages[flow.display_name] = flow.unreachable_pages + + return stats + + def process_flows_directory( + self, agent_local_path: str, stats: types.AgentData): + """Process the top level Flows dir in the JSON Package structure. + + The following files/dirs exist under the `flows` dir: + - Flow object (i.e. Flow START_PAGE) + - transitionRouteGroups + - pages + + In Dialogflow CX, the START_PAGE of each Flow is a special kind of Page + that exists within the Flow object itself. In this method, we will lint + the Flow object, all files in the transitionRouteGroups dir and all + files in the pages dir. 
+ """ + # Create a list of all Flow paths to iter through + flow_paths = self.build_flow_path_list(agent_local_path) + stats.total_flows = len(flow_paths) + + for flow_path in flow_paths: + flow = types.Flow() + flow.graph = graph.Graph() + flow.dir_path = flow_path + stats = self.process_flow(flow, stats) + + return stats diff --git a/src/dfcx_scrapi/agent_extract/gcs_utils.py b/src/dfcx_scrapi/agent_extract/gcs_utils.py new file mode 100644 index 00000000..c3daff89 --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/gcs_utils.py @@ -0,0 +1,68 @@ +"""Utils for Cloud Storage and local file manipulation.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zipfile +from google.cloud import storage +from google.oauth2 import service_account + + +class GcsUtils: + """Utils for Cloud Storage and local file manipulation.""" + + def __init__(self, creds_path: str = None, project_id: str = None): + if creds_path and project_id: + self.creds = service_account.Credentials.from_service_account_file( + creds_path + ) + self.gcs_client = storage.Client( + credentials=self.creds, project=project_id + ) + + else: + self.gcs_client = storage.Client() + + @staticmethod + def unzip(agent_zip_file_path: str, extract_path: str): + """Unzip file locally.""" + with zipfile.ZipFile(agent_zip_file_path, "r") as zip_ref: + zip_ref.extractall(extract_path) + + @staticmethod + def check_for_gcs_file(file_path: str) -> bool: + """Validates GCS path vs. local path.""" + is_gcs_file = False + + file_prefix = file_path.split("/")[0] + if file_prefix == "gs:": + is_gcs_file = True + + return is_gcs_file + + def download_gcs(self, gcs_path: str, local_path: str = None): + """Downloads the specified GCS file to local machine.""" + path = gcs_path.split("//")[1] + bucket = path.split("/", 1)[0] + gcs_object = path.split("/", 1)[1] + file_name = gcs_object.split("/")[-1] + bucket = self.gcs_client.bucket(bucket) + blob = storage.Blob(gcs_object, bucket) + + if local_path: + file_name = local_path + "/" + file_name + + blob.download_to_filename(file_name) + + return file_name diff --git a/src/dfcx_scrapi/agent_extract/graph.py b/src/dfcx_scrapi/agent_extract/graph.py new file mode 100644 index 00000000..b94217ce --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/graph.py @@ -0,0 +1,45 @@ +"""Utility class for managing graph structure.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import collections
+
+class Graph:
+    """Utility class for managing graph structure."""
+
+    def __init__(self):
+        self.nodes = set()
+        self.edges = collections.defaultdict(list)
+        self.used_nodes = set()
+
+    def add_node(self, node):
+        """Add node to set of all nodes, regardless of use in graph."""
+        self.nodes.add(node)
+
+    def add_edge(self, node1, node2):
+        self.edges[node1].append(node2)
+
+    def add_used_node(self, node):
+        """Add node to set of active in use nodes for the graph."""
+        self.used_nodes.add(node)
+
+    def remove_node(self, node):
+        self.nodes.remove(node)
+
+    def remove_edge(self, node1, node2):
+        self.edges[node1].remove(node2)
+
+    def __str__(self):
+        return f"Graph({self.nodes}, {self.edges})"
diff --git a/src/dfcx_scrapi/agent_extract/intents.py b/src/dfcx_scrapi/agent_extract/intents.py
new file mode 100644
index 00000000..e8d46dca
--- /dev/null
+++ b/src/dfcx_scrapi/agent_extract/intents.py
@@ -0,0 +1,166 @@
+"""Intent processing methods and functions."""
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+from dfcx_scrapi.agent_extract import common
+from dfcx_scrapi.agent_extract import types
+
+
+class Intents:
+    """Intent processing methods and functions."""
+
+    def __init__(self):
+        self.common = common.Common()
+
+    @staticmethod
+    def parse_lang_code(lang_code_path: str) -> str:
+        """Extract the language_code from the given file path."""
+
+        first_parse = lang_code_path.split("/")[-1]
+        lang_code = first_parse.split(".")[0]
+
+        return lang_code
+
+    @staticmethod
+    def build_lang_code_paths(intent: types.Intent):
+        """Builds dict of lang codes and file locations.
+
+        The language_codes and paths for each file are stored in a dictionary
+        inside of the Intent dataclass. This dict is accessed later to process
+        each file and provide reporting based on each language code.
+        """
+        root_dir = intent.dir_path + "/trainingPhrases"
+
+        for lang_file in os.listdir(root_dir):
+            lang_code = lang_file.split(".")[0]
+            lang_code_path = f"{root_dir}/{lang_file}"
+            intent.training_phrases[lang_code] = {"file_path": lang_code_path}
+
+    @staticmethod
+    def build_intent_path_list(agent_local_path: str):
+        """Builds a list of dirs, each representing an Intent directory.
+ + Ex: /path/to/agent/intents/ + + This dir path can be used to find the next level of information + in the directory by appending the appropriate next dir structures like: + - .json, for the Intent object metadata + - /trainingPhrases, for the Training Phrases dir + """ + root_dir = agent_local_path + "/intents" + + intent_paths = [] + + for intent_dir in os.listdir(root_dir): + intent_dir_path = f"{root_dir}/{intent_dir}" + intent_paths.append(intent_dir_path) + + return intent_paths + + def process_intent_metadata( + self, intent: types.Intent): + """Process the metadata file for a single Intent.""" + intent.metadata_file = f"{intent.dir_path}/{intent.display_name}.json" + + try: + with open(intent.metadata_file, "r", encoding="UTF-8") as meta_file: + intent.data = json.load(meta_file) + intent.resource_id = intent.data.get("name", None) + intent.labels = intent.data.get("labels", None) + intent.description = intent.data.get("description", None) + intent.parameters = intent.data.get("parameters", None) + + meta_file.close() + + except FileNotFoundError: + pass + + def process_language_codes( + self, intent: types.Intent, stats: types.AgentData): + """Process all training phrase lang_code files.""" + + for lang_code in intent.training_phrases: + tp_file = intent.training_phrases[lang_code]["file_path"] + + if not self.common.check_lang_code(lang_code, stats): + continue + + with open(tp_file, "r", encoding="UTF-8") as tps: + data = json.load(tps) + data["name"] = f"{stats.agent_id}/intents/{intent.resource_id}" + data["display_name"] = intent.display_name + data["labels"] = intent.labels + data["description"] = intent.description + data["parameters"] = intent.parameters + stats.intents.append(data) + stats.total_training_phrases += len(data["trainingPhrases"]) + + tps.close() + + return stats + + def process_training_phrases( + self, intent: types.Intent, stats: types.AgentData): + """Process the Training Phrase dir for a single Intent.""" + if "trainingPhrases" in os.listdir(intent.dir_path): + self.build_lang_code_paths(intent) + stats = self.process_language_codes(intent, stats) + + return stats + + def process_intent(self, intent: types.Intent, stats: types.AgentData): + """Process a single Intent directory and associated files.""" + intent.display_name = self.common.parse_filepath( + intent.dir_path, "intent") + intent.display_name = self.common.clean_display_name( + intent.display_name) + + self.process_intent_metadata(intent) + stats = self.process_training_phrases(intent, stats) + stats.total_intents += 1 + + return stats + + def process_intents_directory( + self, agent_local_path: str, stats: types.AgentData): + """Processing the top level Intents Dir in the JSON Package structure. + + The following files/dirs exist under the `intents` dir: + - Directory + - trainingPhrases + - .json + - Object + + In Dialogflow CX, the Training Phrases of each Intent are stored in + individual .json files by language code under each Intent Display + Name. In this method, we will process all Intent dirs, including the + training phrase files and metadata objects for each Intent. 
+ """ + # Create a list of all Intent paths to iter through + intent_paths = self.build_intent_path_list(agent_local_path) + stats.intents = [] + + for intent_path in intent_paths: + intent = types.Intent() + intent.dir_path = intent_path + + stats = self.process_intent(intent, stats) + full_intent_id = f"{stats.agent_id}/intents/{intent.resource_id}" + stats.intents_map[intent.display_name] = full_intent_id + + return stats diff --git a/src/dfcx_scrapi/agent_extract/pages.py b/src/dfcx_scrapi/agent_extract/pages.py new file mode 100644 index 00000000..51a710c6 --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/pages.py @@ -0,0 +1,138 @@ +"""Pages processing methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +from typing import Dict, Any + +from dfcx_scrapi.agent_extract import common +from dfcx_scrapi.agent_extract import types +from dfcx_scrapi.agent_extract import routes + + +class Pages: + """Pages processing methods and functions.""" + + def __init__(self): + self.common = common.Common() + self.routes = routes.Fulfillments() + + @staticmethod + def build_page_path_list(flow_path: str): + """Builds a list of files, each representing a Page. + + Ex: /path/to/agent/flows//pages/.json + """ + pages_path = f"{flow_path}/pages" + + page_paths = [] + + for page in os.listdir(pages_path): + page_file_path = f"{pages_path}/{page}" + page_paths.append(page_file_path) + + return page_paths + + @staticmethod + def get_form_parameter_data(param: Dict[str, Any], page: types.Page): + fp = types.FormParameter(page=page) + fp.display_name = param.get("displayName", None) + fp.entity_type = param.get("entityType", None) + fp.required = param.get("required", None) + + fp.fill_behavior = param.get("fillBehavior", None) + + if fp.fill_behavior: + fp.init_fulfillment = fp.fill_behavior.get( + "initialPromptFulfillment", None) + fp.reprompt_handlers = fp.fill_behavior.get( + "repromptEventHandlers", None) + + fp.advanced_settings = page.form.get("advancedSettings", None) + + if fp.advanced_settings: + fp.dtmf_settings = fp.advanced_settings.get("dtmfSettings", None) + + return fp + + def process_form(self, page: types.Page, stats: types.AgentData): + """Process the Form and sub-resources within it for the Page.""" + parameters = page.form.get("parameters", None) + + if parameters: + for param in parameters: + fp = self.get_form_parameter_data(param, page) + stats = self.routes.process_reprompt_handlers(fp, stats) + + return stats + + + def process_page(self, page: types.Page, stats: types.AgentData): + """Process a Single Page file.""" + page.display_name = self.common.parse_filepath(page.page_file, "page") + page.display_name = self.common.clean_display_name(page.display_name) + + stats.graph.add_node(page.display_name) + page.flow.graph.add_node(page.display_name) + + page.flow.all_pages.add(page.display_name) + + with open(page.page_file, "r", encoding="UTF-8") as page_file: + page.data = json.load(page_file) + page.entry = 
page.data.get("entryFulfillment", None) + page.events = page.data.get("eventHandlers", None) + page.form = page.data.get("form", None) + page.routes = page.data.get("transitionRoutes", None) + page.route_groups = page.data.get("transitionRouteGroups", None) + page.resource_id = page.data.get("name", None) + + # Order of linting is important here + stats = self.routes.process_entry(page, stats) + stats = self.routes.process_routes(page, stats) + stats = self.routes.process_events(page, stats) + stats = self.process_form(page, stats) + + if page.route_groups: + page, stats = self.routes.set_route_group_targets(page, stats) + + page_file.close() + + full_flow_id = f"{stats.agent_id}/flows/{page.flow.resource_id}" + full_page_id = f"{full_flow_id}/pages/{page.resource_id}" + stats.pages[page.flow.display_name].append(page.data) + stats.flow_page_map[ + page.flow.display_name]["pages"][page.display_name] = full_page_id + + return stats + + def process_pages_directory(self, flow: types.Flow, stats: types.AgentData): + """Process the Pages dir inside a specific Flow dir. + + Some Flows may not contain Pages, so we check for the existence + of the directory before traversing + """ + if "pages" in os.listdir(flow.dir_path): + page_paths = self.build_page_path_list(flow.dir_path) + + for page_path in page_paths: + page = types.Page(flow=flow) + page.agent_id = flow.agent_id + page.page_file = page_path + stats.total_pages += 1 + stats = self.process_page(page, stats) + + return stats diff --git a/src/dfcx_scrapi/agent_extract/route_groups.py b/src/dfcx_scrapi/agent_extract/route_groups.py new file mode 100644 index 00000000..3a73e50a --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/route_groups.py @@ -0,0 +1,100 @@ +"""Route Groups processing methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import json + +from dfcx_scrapi.agent_extract import common +from dfcx_scrapi.agent_extract import types +from dfcx_scrapi.agent_extract import routes + + +class RouteGroups: + """Route Groups processing methods and functions.""" + + def __init__(self): + self.special_pages = [ + "End Session", + "End Flow", + "Start Page", + "Current Page", + "Previous Page", + ] + + self.common = common.Common() + self.routes = routes.Fulfillments() + + @staticmethod + def build_route_group_path_list(flow_local_path: str): + """Builds a list of files, each representing a Route Group. 
+ + Ex: /path/to/agent/flows//transitionRouteGroups/ + """ + root_dir = flow_local_path + "/transitionRouteGroups" + + if "transitionRouteGroups" in os.listdir(flow_local_path): + rg_paths = [] + + for rg_file in os.listdir(root_dir): + rg_file_path = f"{root_dir}/{rg_file}" + rg_paths.append(rg_file_path) + + return rg_paths + + def process_route_group(self, rg: types.RouteGroup, stats: types.AgentData): + """Process a single Route Group.""" + rg.display_name = self.common.parse_filepath(rg.rg_file, "route_group") + rg.display_name = self.common.clean_display_name(rg.display_name) + + with open(rg.rg_file, "r", encoding="UTF-8") as route_group_file: + rg.data = json.load(route_group_file) + rg.resource_id = rg.data.get("name", None) + rg.display_name = rg.data.get("displayName", None) + rg.routes = rg.data.get("transitionRoutes", None) + + stats = self.routes.process_routes(rg, stats) + + route_group_file.close() + + full_flow_id = f"{stats.agent_id}/flows/{rg.flow.resource_id}" + full_rg_id = f"{full_flow_id}/transitionRouteGroups/{rg.resource_id}" + stats.route_groups_map[ + rg.flow.display_name]["route_groups"][rg.display_name] = full_rg_id + stats.route_groups[rg.flow.display_name].append(rg.data) + + return stats + + def process_route_groups_directory( + self, flow: types.Flow, stats: types.AgentData): + """Process Route Groups dir in the JSON Package structure.""" + if "transitionRouteGroups" in os.listdir(flow.dir_path): + # Create a list of all Route Group paths to iter through + rg_paths = self.build_route_group_path_list(flow.dir_path) + stats.total_route_groups += len(rg_paths) + + full_flow_id = f"{stats.agent_id}/flows/{flow.resource_id}" + stats.route_groups_map[flow.display_name] = { + "id": full_flow_id, + "route_groups": {} + } + stats.route_groups[flow.display_name] = [] + + for rg_path in rg_paths: + rg = types.RouteGroup(flow=flow) + rg.rg_file = rg_path + stats = self.process_route_group(rg, stats) + + return stats diff --git a/src/dfcx_scrapi/agent_extract/routes.py b/src/dfcx_scrapi/agent_extract/routes.py new file mode 100644 index 00000000..f91d61ed --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/routes.py @@ -0,0 +1,320 @@ +"""Fulfillment routes processing methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Any + +from dfcx_scrapi.agent_extract import common +from dfcx_scrapi.agent_extract import types + + +class Fulfillments: + """Fulfillment routes processing methods and functions.""" + + def __init__(self): + self.common = common.Common() + self.route_parameters = {} + + @staticmethod + def check_for_webhook(page: types.Page, path: Dict[str, Any]): + """Check the current route for existence of webhook.""" + if "webhook" in path: + page.has_webhook = True + + @staticmethod + def check_for_webhook_event_handlers(route: types.Fulfillment): + """Check for Webhook Error Event Handler on Page. 
+ + In this method, we're interested in the following conditions: + - Page is currently flagged w/webhook = True + - Page HAS NOT been flagged w/having a webhook error handler + - The trigger MATCHES the pattern 'webhook.error' + + If a Page and its Route meet all the criteria, we'll flip the bit. + Otherwise, the webhook handler bit will remain False, causing a rule + flag.""" + + if all( + [ + route.page.has_webhook, + not route.page.has_webhook_event_handler, + "webhook.error" in route.trigger, + ] + ): + route.page.has_webhook_event_handler = True + + @staticmethod + def check_for_intent(route: types.Fulfillment): + """Check route data to see if Intent is present.""" + intent = None + if "intent" in route.data: + intent = route.data["intent"] + + return intent + + def process_intents_in_routes( + self, route: types.Fulfillment, stats: types.AgentData): + intent = self.check_for_intent(route) + if intent: + pair = (intent, route.page.display_name) + stats.active_intents[ + route.page.flow.display_name].append(pair) + + return stats + + def collect_transition_route_trigger(self, route): + """Inspect route and return all Intent/Condition info.""" + + trigger = [] + + if "intent" in route.data: + trigger.append("intent") + + if "condition" in route.data: + trigger.append("condition") + + if len(trigger) > 0: + trigger = "+".join(trigger) + + return trigger + + def get_trigger_info(self, route): + """Extract trigger info from route based on primary key.""" + + if route.fulfillment_type == "event": + trigger = f"event : {route.data.get('event', None)}" + + if route.fulfillment_type == "reprompt_handler": + trigger = f"{route.parameter} : event : "\ + f"{route.data.get('event', None)}" + + if route.fulfillment_type == "transition_route": + intent_condition = self.collect_transition_route_trigger(route) + trigger = f"route : {intent_condition}" + + return trigger + + def set_route_group_targets(self, page: types.Page, stats: types.AgentData): + """Determine Route Targets for Route Group routes.""" + current_page = page.display_name + + for route_group in page.route_groups: + page.flow.graph.add_edge(current_page, route_group) + page.flow.graph.add_used_node(route_group) + + stats.graph.add_edge(current_page, route_group) + stats.graph.add_used_node(route_group) + + return page, stats + + def set_route_targets( + self, route: types.Fulfillment, stats: types.AgentData): + """Determine the Route Targets for the specified route. + + Primary function is to build out the graph structure for the + Flow based on the current page and where the routes are pointing to. + The graph structure can then be traversed later to determine any errors + or inconsistencies in design. 
+ """ + current_page = route.page.display_name + + route.target_flow = route.data.get("targetFlow", None) + route.target_page = route.data.get("targetPage", None) + + if route.target_page: + route.page.flow.graph.add_edge(current_page, route.target_page) + route.page.flow.graph.add_used_node(route.target_page) + + stats.graph.add_edge(current_page, route.target_page) + stats.graph.add_used_node(route.target_page) + + if route.target_flow: + route.page.flow.graph.add_edge( + current_page, f"FLOW: {route.target_flow}") + route.page.flow.graph.add_used_node(f"FLOW: {route.target_flow}") + + stats.graph.add_edge( + current_page, f"FLOW: {route.target_flow}" + ) + stats.graph.add_used_node(f"FLOW: {route.target_flow}") + + return route, stats + + def update_route_parameters( + self, route: types.Fulfillment, item: Dict[str, str]): + """Update the Route Parameters map based on new info.""" + flow_name = route.page.flow.display_name + page_name = route.page.display_name + + flow_data = self.route_parameters.get(flow_name, None) + page_data = None + + if flow_data: + page_data = flow_data.get(page_name, None) + + # Flow and Page already exists, append to existing list. + if page_data: + self.route_parameters[flow_name][page_name].append(item) + + # Flow data exists, but not Page, so only create the Page list. + elif flow_data and not page_data: + self.route_parameters[flow_name][page_name] = [item] + + # Neither the Flow or Page data exists, so create it all. + else: + self.route_parameters[flow_name] = {page_name: [item]} + + + def process_fulfillment_type( + self, stats: types.AgentData, route: types.Fulfillment, path: object, + key: str): + """Parse through specific fulfillment types.""" + fulfillment_data = path.get(key, None) + + if fulfillment_data: + for item in fulfillment_data: + # This is where each message type will exist + # text, custom payload, etc. + + if "text" in item: + for text in item["text"]["text"]: + route.text = text + + if "parameter" in item: + self.update_route_parameters(route, item) + + return stats + + def process_reprompt_handlers( + self, fp: types.FormParameter, stats: types.AgentData): + """Processing for Reprompt Event Handlers inside Form parameters. + + While Reprompt Event Handlers are technically Events, they differ from + standard Page level Events because they act on the FormParameter data + structure, not Fulfillment Route data structure as standard Events do. 
+ """ + if not fp.reprompt_handlers: + return stats + + for handler in fp.reprompt_handlers: + route = types.Fulfillment(page=fp.page) + route.data = handler + route.agent_id = fp.page.agent_id + route.fulfillment_type = "reprompt_handler" + route.parameter = fp.display_name + route.trigger = self.get_trigger_info(route) + route, stats = self.set_route_targets(route, stats) + path = route.data.get("triggerFulfillment", None) + event = route.data.get("event", None) + + stats = self.process_intents_in_routes(route, stats) + + if not path and not event: + continue + + # Flag for Webhook Handler + self.check_for_webhook(fp.page, path) + + stats = self.process_fulfillment_type( + stats, route, path, "messages") + + return stats + + def process_events(self, page: types.Page, stats: types.AgentData): + """Parse through all Page Event Handlers.""" + if not page.events: + return stats + + for route_data in page.events: + route = types.Fulfillment(page=page) + route.data = route_data + route.agent_id = page.agent_id + route.fulfillment_type = "event" + route.trigger = self.get_trigger_info(route) + route, stats = self.set_route_targets(route, stats) + path = route.data.get("triggerFulfillment", None) + event = route.data.get("event", None) + + stats = self.process_intents_in_routes(route, stats) + + if not path and not event: + continue + + # Flag for Webhook Handler + self.check_for_webhook_event_handlers(route) + + stats = self.process_fulfillment_type( + stats, route, path, "messages") + + return stats + + def process_routes(self, page: types.Page, stats: types.AgentData): + """Parse through all Transition Routes.""" + tf_key = "triggerFulfillment" + + if not page.routes: + return stats + + for route_data in page.routes: + route = types.Fulfillment(page=page) + route.data = route_data + route.agent_id = page.agent_id + route.fulfillment_type = "transition_route" + route.trigger = self.get_trigger_info(route) + route, stats = self.set_route_targets(route, stats) + + stats = self.process_intents_in_routes(route, stats) + + path = route.data.get(tf_key, None) + + if not path: + continue + + # Flag for Webhook Handler + self.check_for_webhook(page, path) + + stats = self.process_fulfillment_type( + stats, route, path, "messages") + + # Preset Params processed here + stats = self.process_fulfillment_type( + stats, route, path, "setParameterActions" + ) + + return stats + + def process_entry(self, page: types.Page, stats: types.AgentData): + """Process Entry Fulfillment on a single page file. + + The Entry Fulfillment to a Page only has 1 "route" (i.e. itself) so + there is no need to loop through multiple routes, as they don't + exist for Entry Fulfillment. + """ + + if not page.entry: + return stats + + route = types.Fulfillment(page=page) + route.data = page.entry + route.agent_id = page.agent_id + route.fulfillment_type = "entry" + route.trigger = "entry" + path = route.data + + self.check_for_webhook(page, path) + + stats = self.process_fulfillment_type(stats, route, path, "messages") + + return stats diff --git a/src/dfcx_scrapi/agent_extract/test_cases.py b/src/dfcx_scrapi/agent_extract/test_cases.py new file mode 100644 index 00000000..ee5be205 --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/test_cases.py @@ -0,0 +1,184 @@ +"""Test Case processing methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +from typing import Dict, List, Any + +from dfcx_scrapi.agent_extract import common +from dfcx_scrapi.agent_extract import types + + +class TestCases: + """Test Case processing methods and functions.""" + + def __init__(self): + self.common = common.Common() + + @staticmethod + def build_test_case_path_list(agent_local_path: str): + """Builds a list of files, each representing a test case.""" + root_dir = agent_local_path + "/testCases" + + test_case_paths = [] + + for test_case in os.listdir(root_dir): + end = test_case.split(".")[-1] + if end == "json": + test_case_path = f"{root_dir}/{test_case}" + test_case_paths.append(test_case_path) + + return test_case_paths + + @staticmethod + def get_test_case_intent_phrase_pair( + tc: types.TestCase) -> List[Dict[str, str]]: + """Parse Test Case and return a list of intents in use. + + This method will produce a List of Dicts where the contents of each + dict is the Training Phrase and associated Triggered Intent as listed + in the Test Case Conversation Turn. This information is used to compare + the User Input training phrase with the actual training phrases that + exist in the Intent resource. + + The dict format is as follows: + { + training_phrase: , + intent: + } + """ + intent_data = [] + + if tc.conversation_turns: + for turn in tc.conversation_turns: + user = turn["userInput"] + agent = turn["virtualAgentOutput"] + intent = agent.get("triggeredIntent", None) + phrase = user.get("input", None) + + text = phrase.get("text", None) + + if text: + text = text["text"] + + if intent and text: + intent_data.append( + { + "user_utterance": text, + "intent": intent["name"], + "status": "valid", + "training_phrases": [], + } + ) + + return intent_data + + @staticmethod + def get_test_case_intent_data(agent_local_path: str): + """Collect all Intent Files and Training Phrases for Test Case.""" + intents_path = agent_local_path + "/intents" + + intent_paths = [] + + for intent_dir in os.listdir(intents_path): + intent_dir_path = f"{intents_path}/{intent_dir}" + intent_paths.append( + {"intent": intent_dir, "file_path": intent_dir_path} + ) + + return intent_paths + + @staticmethod + def flatten_tp_data(tp_data: List[Any]): + """Flatten the Training Phrase proto to a list of strings.""" + cleaned_tps = [] + + for tp in tp_data["trainingPhrases"]: + parts_list = [part["text"].lower() for part in tp["parts"]] + cleaned_tps.append("".join(parts_list)) + + return cleaned_tps + + def gather_intent_tps(self, tc: types.TestCase): + """Collect all TPs associated with Intent data in Test Case.""" + tc.associated_intent_data = {} + + for i, pair in enumerate(tc.intent_data): + intent_dir = tc.agent_path + "/intents/" + pair["intent"] + + try: + if "trainingPhrases" in os.listdir(intent_dir): + training_phrases_path = intent_dir + "/trainingPhrases" + + for lang_file in os.listdir(training_phrases_path): + # lang_code = lang_file.split(".")[0] + lang_code_path = f"{training_phrases_path}/{lang_file}" + + with open( + lang_code_path, "r", encoding="UTF-8" + ) as tp_file: + tp_data = json.load(tp_file) + cleaned_tps = 
self.flatten_tp_data(tp_data) + + tp_file.close() + + tc.intent_data[i]["training_phrases"].extend( + cleaned_tps + ) + tc.associated_intent_data[pair["intent"]] = cleaned_tps + + except FileNotFoundError: + tc.intent_data[i]["status"] = "invalid_intent" + tc.has_invalid_intent = True + continue + + return tc + + def process_test_case(self, tc: types.TestCase, stats: types.AgentData): + """Process a single Test Case file.""" + + with open(tc.dir_path, "r", encoding="UTF-8") as tc_file: + tc.data = json.load(tc_file) + tc.resource_id = tc.data.get("name", None) + tc.display_name = tc.data.get("displayName", None) + tc.tags = tc.data.get("tags", None) + tc.conversation_turns = tc.data.get( + "testCaseConversationTurns", None + ) + tc.test_config = tc.data.get("testConfig", None) + + full_tc_id = f"{stats.agent_id}/testCases/{tc.resource_id}" + tc.data["name"] = full_tc_id + stats.test_cases.append(tc.data) + + tc_file.close() + + return stats + + def process_test_cases_directory( + self, agent_local_path: str, stats: types.AgentData): + """Processing the test cases dir in the JSON package structure.""" + test_case_paths = self.build_test_case_path_list(agent_local_path) + stats.total_test_cases = len(test_case_paths) + + for test_case in test_case_paths: + tc = types.TestCase() + tc.dir_path = test_case + tc.agent_path = agent_local_path + stats = self.process_test_case(tc, stats) + + return stats diff --git a/src/dfcx_scrapi/agent_extract/types.py b/src/dfcx_scrapi/agent_extract/types.py new file mode 100644 index 00000000..ac25c34d --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/types.py @@ -0,0 +1,212 @@ +"""Collection of Type Classes used for offline processing.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from typing import Dict, List, Any, Tuple
+from dataclasses import dataclass, field
+
+from dfcx_scrapi.agent_extract import graph as graph_class
+
+@dataclass
+class AgentMetadata:
+    """Used to track the current Agent Metadata attributes."""
+
+    default_language_code: str = None
+    dtmf_settings: bool = False
+    logging_enabled: bool = False
+    speech_adaptation: bool = False
+
+
+@dataclass
+class Flow:
+    """Used to track current Flow Attributes."""
+    agent_id: str = None
+    all_pages: set = field(default_factory=set)
+    active_pages: set = field(default_factory=set)
+    data: Dict[str, Any] = field(default_factory=dict)
+    dangling_pages: set = field(default_factory=set)
+    dir_path: str = None  # Full Directory Path for this Flow
+    display_name: str = None  # Flow Display Name (removed special chars)
+    file_name: str = None  # Original Name of Flow (includes special chars)
+    graph: graph_class.Graph = None
+    resource_id: str = None
+    resource_type: str = "flow"
+    start_page_file: str = None  # File Path Location of START_PAGE
+    unreachable_pages: set = field(default_factory=set)
+    unused_pages: set = field(default_factory=set)
+
+@dataclass
+class Page:
+    """Used to track current Page Attributes."""
+
+    agent_id: str = None
+    data: Dict[str, Any] = None
+    display_name: str = None
+    entry: Dict[str, Any] = None
+    events: List[object] = None
+    flow: Flow = None
+    form: Dict[str, Any] = None
+    has_webhook: bool = False
+    has_webhook_event_handler: bool = False
+    page_file: str = None
+    resource_id: str = None
+    resource_type: str = "page"
+    routes: List[object] = None
+    route_groups: List[str] = None
+
+@dataclass
+class FormParameter:
+    """Tracks Form Parameter attributes within a Page."""
+
+    advanced_settings: str = None
+    agent_id: str = None
+    data: Dict[str, Any] = None
+    display_name: str = None
+    dtmf_settings: str = None
+    entity_type: str = None
+    fill_behavior: Dict[str, Any] = None
+    init_fulfillment: Dict[str, Any] = None
+    page: Page = None
+    reprompt_handlers: Dict[str, Any] = None
+    required: bool = True
+
+
+@dataclass
+class RouteGroup:
+    """Used to track current RouteGroup Attributes."""
+
+    agent_id: str = None
+    data: Dict[str, Any] = None
+    display_name: str = None
+    flow: Flow = None
+    resource_id: str = None
+    resource_type: str = "route_group"
+    rg_file: str = None
+    routes: List[object] = None
+
+@dataclass
+class Fulfillment:
+    """Used to track current Fulfillment Attributes."""
+
+    agent_id: str = None
+    data: Dict[str, Any] = None
+    display_name: str = None  # Inherited from Page for easy logging
+    fulfillment_type: str = None  # transition_route | event
+    page: Page = None
+    parameter: str = None  # Used for Reprompt Event Handlers
+    target_flow: str = None
+    target_page: str = None
+    text: str = None
+    trigger: str = None
+    resource_type: str = "fulfillment"
+
+@dataclass
+class Intent:
+    """Used to track current Intent Attributes."""
+
+    agent_id: str = None
+    data: Dict[str, Any] = None
+    description: str = None
+    display_name: str = None
+    dir_path: str = None
+    labels: Dict[str, str] = None
+    metadata_file: str = None
+    parameters: List[Dict[str, str]] = field(default_factory=list)
+    resource_id: str = None
+    resource_type: str = "intent"
+    training_phrases: Dict[str, Any] = field(default_factory=dict)
+
+@dataclass
+class EntityType:
+    """Used to track current Entity Type Attributes."""
+
+    agent_id: str = None
+    auto_expansion: str = None
+    data: Dict[str, Any] = None
+    dir_path: str = None  # Full Directory Path for this Entity Type
+    display_name: str = None  # Entity Type 
Display Name + entities: Dict[str, Any] = field(default_factory=dict) # Map + excluded_phrases: Dict[str, Any] = field(default_factory=dict) # Map + fuzzy_extraction: bool = False + kind: str = None # The kind of Entity Type represented + resource_id: str = None + resource_type: str = "entity_type" + +@dataclass +class TestCase: + """Used to track current Test Case Attributes.""" + + associated_intent_data: Dict[str, Any] = None + agent_id: str = None + agent_path: str = None + conversation_turns: List[Any] = None + data: Dict[str, Any] = None + dir_path: str = None + display_name: str = None + has_invalid_intent: bool = False + intent_data: List[str] = None + qualified: bool = False + resource_id: str = None + resource_type: str = "test_case" + tags: List[str] = None + test_config: Dict[str, Any] = None + +@dataclass +class Webhook: + """Used to track current Webhook attributes.""" + + agent_id: str = None + agent_path: str = None + data: Dict[str, Any] = None + dir_path: str = None + display_name: str = None + resource_id: str = None + resource_type: str = "webhook" + service_type: str = None + timeout: int = 0 + +@dataclass +class AgentData: + """Used to track agent data for each section processed.""" + active_intents: Dict[str, List[Tuple[str, str]]] = field( + default_factory=dict) + active_pages: Dict[str, set] = field(default_factory=dict) + agent_id: str = None + entity_types: List[Dict[str, Any]] = field(default_factory=list) + entity_types_map: Dict[str, Any] = field(default_factory=dict) + flow_page_map: Dict[str, Any] = field(default_factory=dict) + flows: List[Dict[str, Any]] = field(default_factory=list) + flows_map: Dict[str, Any] = field(default_factory=dict) + graph: graph_class.Graph = None + intents: List[Dict[str, Any]] = field(default_factory=list) + intents_map: Dict[str, Any] = field(default_factory=dict) + lang_code: str = "en" + pages: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) + route_groups: Dict[str, List[Dict[str, Any]]] = field(default_factory=dict) + route_groups_map: Dict[str, Any] = field(default_factory=dict) + test_cases: List[Dict[str, Any]] = field(default_factory=list) + unreachable_pages: Dict[str, set] = field(default_factory=dict) + unused_pages: Dict[str, set] = field(default_factory=dict) + webhooks: List[Dict[str, Any]] = field(default_factory=list) + webhooks_map: Dict[str, Any] = field(default_factory=dict) + + total_flows: int = 0 + total_pages: int = 0 + total_intents: int = 0 + total_training_phrases: int = 0 + total_entity_types: int = 0 + total_route_groups: int = 0 + total_test_cases: int = 0 + total_webhooks: int = 0 diff --git a/src/dfcx_scrapi/agent_extract/webhooks.py b/src/dfcx_scrapi/agent_extract/webhooks.py new file mode 100644 index 00000000..38467080 --- /dev/null +++ b/src/dfcx_scrapi/agent_extract/webhooks.py @@ -0,0 +1,95 @@ +"""Webhook processing methods and functions.""" + +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
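+# Usage sketch (illustrative only; assumes the agent was already exported and
+# unzipped to a local path, shown here as a placeholder): Webhooks walks the
+# exported `webhooks/` directory and appends each webhook's JSON payload to
+# AgentData.webhooks, e.g.
+#
+#   from dfcx_scrapi.agent_extract import types, webhooks
+#
+#   data = types.AgentData()
+#   data = webhooks.Webhooks().process_webhooks_directory("tmp/agent", data)
+#   print(data.total_webhooks, list(data.webhooks_map))
+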
+
+import json
+import os
+
+from dfcx_scrapi.agent_extract import common
+from dfcx_scrapi.agent_extract import types
+
+class Webhooks:
+    """Webhook processing methods and functions."""
+
+    def __init__(self):
+        self.common = common.Common()
+
+    @staticmethod
+    def build_webhook_path_list(agent_local_path: str):
+        """Builds a list of webhook file locations."""
+        root_dir = agent_local_path + "/webhooks"
+
+        webhook_paths = []
+
+        for webhook_file in os.listdir(root_dir):
+            webhook_file_path = f"{root_dir}/{webhook_file}"
+            webhook_paths.append(webhook_file_path)
+
+        return webhook_paths
+
+    @staticmethod
+    def get_service_type(webhook: types.Webhook) -> str:
+        """Get the type of Webhook Service that is configured."""
+        if "genericWebService" in webhook.data:
+            webhook.service_type = "Generic Web Service"
+
+        else:
+            webhook.service_type = "Other"
+
+        return webhook.service_type
+
+    def process_webhook(self, webhook: types.Webhook, stats: types.AgentData
+                        ) -> types.AgentData:
+        """Process a single Webhook file."""
+
+        with open(webhook.dir_path, "r", encoding="UTF-8") as webhook_file:
+            webhook.data = json.load(webhook_file)
+            webhook.resource_id = webhook.data.get("name", None)
+            webhook.display_name = webhook.data.get("displayName", None)
+            webhook.service_type = self.get_service_type(webhook)
+
+            timeout_dict = webhook.data.get("timeout", None)
+            if timeout_dict:
+                webhook.timeout = timeout_dict.get("seconds", None)
+
+            webhook_file.close()
+
+        full_webhook_id = f"{stats.agent_id}/webhooks/{webhook.resource_id}"
+        webhook.data["name"] = full_webhook_id
+        stats.webhooks.append(webhook.data)
+        stats.total_webhooks += 1
+
+        return stats
+
+    def process_webhooks_directory(self, agent_local_path: str,
+                                   stats: types.AgentData) -> types.AgentData:
+        """Processing the top level Webhooks Dir in the JSON Package structure.
+
+        The following files exist under the `webhooks` dir:
+        - <webhook-display-name>.json
+        """
+        # Create a list of all Webhook paths to iter through
+        webhook_paths = self.build_webhook_path_list(agent_local_path)
+
+        for webhook_path in webhook_paths:
+            webhook = types.Webhook()
+            webhook.dir_path = webhook_path
+
+            stats = self.process_webhook(webhook, stats)
+
+            full_webhook_id = f"{stats.agent_id}/webhooks/{webhook.resource_id}"
+            stats.webhooks_map[webhook.display_name] = full_webhook_id
+
+        return stats
diff --git a/src/dfcx_scrapi/core/agents.py b/src/dfcx_scrapi/core/agents.py
index f4897250..7e441528 100644
--- a/src/dfcx_scrapi/core/agents.py
+++ b/src/dfcx_scrapi/core/agents.py
@@ -364,7 +364,11 @@ def export_agent(
         self,
         agent_id: str,
         gcs_bucket_uri: str,
-        environment_display_name: str = None
+        environment_display_name: str = None,
+        data_format: str = "BLOB",
+        git_branch: str = None,
+        git_commit_message: str = None,
+        include_bq_export_settings: bool = False
     ) -> str:
         """Exports the specified CX agent to Google Cloud Storage bucket.

@@ -374,17 +378,39 @@ def export_agent(
           gcs_bucket_uri: The Google Cloud Storage bucket/filepath to export the
             agent to in the following format:
              `gs://<bucket-name>/<object-name>`
-          environment_display_name: (Optional) CX Agent environment display name
+          environment_display_name: CX Agent environment display name
            as string. If not set, DRAFT environment is assumed.
+          data_format: Optional. The data format of the exported agent. If not
+            specified, ``BLOB`` is assumed.
+          git_branch: Optional. The Git branch to commit the exported agent to.
+          git_commit_message: Optional. The Git Commit message to send. Only
+            applicable if using `git_branch` arg.
+          include_bq_export_settings: Optional. Whether to include the
+            BigQuery export settings in the exported agent.

     Returns:
       A Long Running Operation (LRO) ID that can be used to check the
       status of the export using dfcx_scrapi.core.operations->get_lro()
     """
+        blob_format = types.agent.ExportAgentRequest.DataFormat(1)
+        json_format = types.agent.ExportAgentRequest.DataFormat(4)
+
         request = types.agent.ExportAgentRequest()
         request.name = agent_id
         request.agent_uri = gcs_bucket_uri
+        request.include_bigquery_export_settings = include_bq_export_settings
+
+        if data_format in ["JSON", "ZIP", "JSON_PACKAGE"]:
+            request.data_format = json_format
+        else:
+            request.data_format = blob_format
+
+        if git_branch:
+            git_settings = types.agent.ExportAgentRequest.GitDestination()
+            git_settings.tracking_branch = git_branch
+            git_settings.commit_message = git_commit_message
+            request.git_destination = git_settings

         if environment_display_name:
             self._environments = environments.Environments(creds=self.creds)
diff --git a/src/dfcx_scrapi/core/flows.py b/src/dfcx_scrapi/core/flows.py
index 8fd7de97..da236136 100644
--- a/src/dfcx_scrapi/core/flows.py
+++ b/src/dfcx_scrapi/core/flows.py
@@ -15,11 +15,13 @@
 # limitations under the License.

 import logging
+import time
 from typing import Dict, List
 from google.cloud.dialogflowcx_v3beta1 import services
 from google.cloud.dialogflowcx_v3beta1 import types
 from google.protobuf import field_mask_pb2
 from dfcx_scrapi.core import scrapi_base
+from dfcx_scrapi.core import pages

 # logging config
 logging.basicConfig(
@@ -54,6 +56,7 @@ def __init__(

         self.flow_id = flow_id
         self.agent_id = agent_id
+        self.pages = pages.Pages(creds=self.creds)

     # TODO: Migrate to Flow Builder class when ready
     @staticmethod
@@ -129,6 +132,38 @@ def get_flows_map(self, agent_id: str, reverse=False):

         return flows_dict

+    def get_flow_page_map(
+        self, agent_id: str, rate_limit: float = 1.0
+    ) -> Dict[str, Dict[str, str]]:
+        """Exports a user friendly dict containing Flows, Pages, and IDs.
+
+        This method builds on top of `get_flows_map` and builds out a nested
+        dictionary containing all of the Page Display Names and UUIDs
+        contained within each Flow. Output Format:
+          {
+            <flow_display_name>: {
+              'id': <flow_id>,
+              'pages': { <page_display_name>: <page_id> }
+            }
+          }
+
+        Args:
+          agent_id: the formatted CX Agent ID to use
+          rate_limit: seconds to wait between each Pages API call, to avoid
+            exhausting API quota
+
+        Returns:
+          Dictionary containing Flow Names/UUIDs and Page Names/UUIDs
+        """
+        flow_page_map = {}
+
+        flows_map = self.get_flows_map(agent_id, reverse=True)
+
+        for flow in flows_map:
+            pages_map = self.pages.get_pages_map(
+                flows_map[flow], reverse=True)
+            flow_page_map[flow] = {"id": flows_map[flow], "pages": pages_map}
+            time.sleep(rate_limit)
+
+        return flow_page_map
+
     @scrapi_base.api_call_counter_decorator
     def train_flow(self, flow_id: str) -> str:
         """Trains the specified flow.
diff --git a/src/dfcx_scrapi/core/test_cases.py b/src/dfcx_scrapi/core/test_cases.py
index 78976aea..b705a278 100644
--- a/src/dfcx_scrapi/core/test_cases.py
+++ b/src/dfcx_scrapi/core/test_cases.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
- +import pandas as pd import logging from typing import Dict, List @@ -23,6 +23,8 @@ from google.protobuf import field_mask_pb2 from dfcx_scrapi.core import scrapi_base +from dfcx_scrapi.core import flows +from dfcx_scrapi.core import pages # logging config logging.basicConfig( @@ -59,6 +61,120 @@ def __init__( self.test_case_id = test_case_id self.client_options = self._set_region(self.test_case_id) + def _convert_test_result_to_string(self, test_case: types.TestCase) -> str: + """Converts the Enum result to a string.""" + if test_case.last_test_result.test_result == 0: + return "TEST_RESULT_UNSPECIFIED" + elif test_case.last_test_result.test_result == 1: + return "PASSED" + elif test_case.last_test_result.test_result == 2: + return "FAILED" + else: + return "" + + def _convert_test_result_to_bool(self, test_case: types.TestCase) -> bool: + """Converts the String result to a boolean.""" + test_result = self._convert_test_result_to_string(test_case) + + if test_result == "PASSED": + return True + elif test_result == "FAILED": + return False + else: + return None + + def _get_flow_id_from_test_config( + self, test_case: types.TestCase) -> str: + """Attempt to get the Flow ID from the Test Case Test Config.""" + if "flow" in test_case.test_config: + return test_case.test_config.flow + elif "page" in test_case.test_config: + return "/".join(test_case.test_config.page.split("/")[:8]) + else: + agent_id = "/".join(test_case.name.split("/")[:6]) + return f"{agent_id}/flows/00000000-0000-0000-0000-000000000000" + + def _get_page_id_from_test_config( + self, test_case: types.TestCase, flow_id: str) -> str: + """Attempt to get the Page ID from the Test Case Test Config.""" + if "page" in test_case.test_config: + return test_case.test_config.page + else: + return f"{flow_id}/pages/START_PAGE" + + def _get_page_display_name( + self, flow_id: str, page_id: str, + pages_map: Dict[str, Dict[str, str]]) -> str: + """Get the Page Display Name from the Pages Map based on the Page ID.""" + page_map = pages_map.get(flow_id, None) + page = "START_PAGE" + if page_map: + page = page_map.get(page_id, None) + + return page + + def _process_test_case(self, test_case, flows_map, pages_map): + """Takes a response from list_test_cases and returns a single row + dataframe of the test case result. 
+ + Args: + test_case: The test case response + flows_map: A dictionary mapping flow IDs to flow display names + pages_map: A dictionary with keys as flow IDs and values as + dictionaries mapping page IDs to page display names for that flow + + Returns: A dataframe with columns: + display_name, id, short_id, tags, creation_time, start_flow, + start_page, test_result, passed, test_time + """ + flow_id = self._get_flow_id_from_test_config(test_case) + page_id = self._get_page_id_from_test_config(test_case, flow_id) + page = self._get_page_display_name(flow_id, page_id, pages_map) + test_result = self._convert_test_result_to_bool(test_case) + + return pd.DataFrame( + { + "display_name": [test_case.display_name], + "id": [test_case.name], + "short_id": [test_case.name.split("/")[-1]], + "tags": [",".join(test_case.tags)], + "creation_time": [test_case.creation_time], + "start_flow": [flows_map.get(flow_id, None)], + "start_page": [page], + # "test_result": [test_result], + "passed": [test_result], + "test_time": [test_case.last_test_result.test_time] + } + ) + + def _retest_cases( + self, test_case_df: pd.DataFrame, retest_ids: List[str] + ) -> pd.DataFrame: + print("To retest:", len(retest_ids)) + response = self.batch_run_test_cases(retest_ids, self.agent_id) + for result in response.results: + # Results may not be in the same order as they went in + # Process the name a bit to remove the /results/id part + tc_id_full = "/".join(result.name.split("/")[:-2]) + tc_id = tc_id_full.rsplit("/", maxsplit=1)[-1] + + # Update dataframe where id = tc_id_full + # row = test_case_df.loc[test_case_df['id']==tc_id_full] + test_case_df.loc[ + test_case_df["id"] == tc_id_full, "short_id" + ] = tc_id + # test_case_df.loc[ + # test_case_df["id"] == tc_id_full, "test_result" + # ] = str(result.test_result) + test_case_df.loc[ + test_case_df["id"] == tc_id_full, "test_time" + ] = result.test_time + test_case_df.loc[test_case_df["id"] == tc_id_full,"passed"] = ( + str(result.test_result) == "TestResult.PASSED" + ) + + return test_case_df + @scrapi_base.api_call_counter_decorator def list_test_cases( self, agent_id: str = None, include_conversation_turns: bool = False @@ -453,3 +569,50 @@ def calculate_coverage(self, coverage_type: int, agent_id: str = None): ) response = client.calculate_coverage(request) return response + + def get_test_case_results_df(self, agent_id=None, retest_all=False): + """Convert Test Cases to Dataframe. + + Gets the test case results for this agent, and generates a dataframe + with their details. Any tests without a result will be run in a batch. + + Args: + agent_id: The agent to create the test case for. 
Format:
+              `projects/<Project ID>/locations/<Location ID>/agents/<Agent ID>`
+            retest_all: if true, all test cases are re-run,
+                regardless of whether or not they had a result
+
+        Returns:
+            DataFrame of test case results for this agent, with columns:
+            display_name, id, short_id, tags, creation_time, start_flow,
+            start_page, passed, test_time
+        """
+        if agent_id:
+            self.agent_id = agent_id
+
+        dfcx_flows = flows.Flows(creds=self.creds, agent_id=self.agent_id)
+        dfcx_pages = pages.Pages(creds=self.creds)
+        flows_map = dfcx_flows.get_flows_map(agent_id=self.agent_id)
+        pages_map = {}
+        for flow_id in flows_map.keys():
+            pages_map[flow_id] = dfcx_pages.get_pages_map(flow_id=flow_id)
+
+        test_case_results = self.list_test_cases(self.agent_id)
+        retest_ids = []
+        test_case_rows = []
+
+        for test_case in test_case_results:
+            row = self._process_test_case(test_case, flows_map, pages_map)
+            test_case_rows.append(row)
+            test_result = self._convert_test_result_to_string(test_case)
+            if retest_all or test_result == "TEST_RESULT_UNSPECIFIED":
+                retest_ids.append(test_case.name)
+
+        # Create dataframe
+        test_case_df = pd.concat(test_case_rows)
+
+        # Retest any that haven't been run yet
+        if len(retest_ids) > 0:
+            test_case_df = self._retest_cases(test_case_df, retest_ids)
+
+        return test_case_df
diff --git a/src/dfcx_scrapi/tools/agent_checker_util.py b/src/dfcx_scrapi/tools/agent_checker_util.py
new file mode 100644
index 00000000..ff6e331a
--- /dev/null
+++ b/src/dfcx_scrapi/tools/agent_checker_util.py
@@ -0,0 +1,203 @@
+"""A set of Utility methods to check resource stats on DFCX Agents."""
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
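+# Construction sketch (illustrative; all resource IDs below are placeholders):
+# the checker exports the agent to GCS and downloads/processes it up front,
+# so the caller needs write access to the supplied bucket, e.g.
+#
+#   checker = AgentCheckerUtil(
+#       agent_id="projects/<Project ID>/locations/<Location ID>"
+#                "/agents/<Agent ID>",
+#       gcs_bucket_uri="gs://<bucket-name>/<object-name>",
+#   )
+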
+ +import logging +import time +from typing import Dict, List +import pandas as pd + +from dfcx_scrapi.core import scrapi_base +from dfcx_scrapi.agent_extract import agents + +# logging config +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + +class AgentCheckerUtil(scrapi_base.ScrapiBase): + """Utility class for checking DFCX Agents.""" + + def __init__( + self, + agent_id: str, + gcs_bucket_uri: str, + creds_path: str = None, + creds_dict: Dict = None, + creds=None, + scope=False, + ): + super().__init__( + creds_path=creds_path, + creds_dict=creds_dict, + creds=creds, + scope=scope, + ) + self.agent_id = agent_id + self.special_pages = [ + "End Session", + "End Flow", + "Start Page", + "Current Page", + "Previous Page", + ] + + startup_time = time.time() + self.extract = agents.Agents(agent_id) + processing_time = time.time() + logging.debug(f"STARTUP: {processing_time - startup_time}") + + self.data = self.extract.process_agent(agent_id, gcs_bucket_uri) + logging.debug(f"TOTAL PROCESSING: {time.time() - processing_time}") + + self.active_intents_df = self.active_intents_to_dataframe() + + def _filter_special_pages(self, page: str, filter_special_pages: bool): + """Recursion helper to check for special page match.""" + if filter_special_pages and page in self.special_pages: + return True + + return False + + def _recurse_edges(self, edges: Dict[str, List[str]], page: str, + visited: set, depth: int, max_depth: int, + filter_special_pages: bool): + """Recursion method used to traverse the agent graph for page data. + + Args: + edges: The set of graph edges collected from the agent. + page: The current Page Display Name + visited: A set of visited Page nodes + depth: The current recursion depth + max_depth: The max recursion depth + filter_special_pages: Will discard all self.special_pages from output + if set to False. + """ + if depth == max_depth: + return visited + + if page in edges: + for inner_page in edges[page]: + if self._filter_special_pages(inner_page, filter_special_pages): + return visited + + if inner_page not in visited: + visited.add(inner_page) + visited = self._recurse_edges( + edges, inner_page, visited, depth+1, max_depth, + filter_special_pages) + + return visited + + def _mark_unreachable_pages(self, df: pd.DataFrame) -> pd.DataFrame: + """Mark dataframe rows True if the page is unreachable in graph.""" + for idx, row in df.iterrows(): + for page in self.data.unreachable_pages[row["flow"]]: + if row["page"] == page: + df.loc[idx, "unreachable"] = True + + return df + + def get_reachable_pages( + self, + flow_display_name: str, + page_display_name: str = "Start Page", + max_depth: int = 1, + filter_special_pages = True) -> List[str]: + """Get all pages in the graph that are reachable via transition routes, + starting from a given Flow and Page. + + Args: + flow_display_name: The display name of the flow + page_display_name: The display name of the page. Defaults to + "Start Page" + max_depth: The max recursion depth to search the graph from the + provided starting point. For example, a max_depth of 2 would produce + all reachable Pages that are 2 transition routes away from the + starting Flow/Page. Defaults to 1. + filter_special_pages: Will filter out all self.special_pages. Defaults + to True. 
+ """ + if page_display_name in ["START", "START_PAGE", "Start", "Start Page"]: + page_display_name = "Start Page" + page_display_name = f"{flow_display_name}: {page_display_name}" + + visited = self._recurse_edges( + self.data.graph.edges, page_display_name, set(), 0, max_depth, + filter_special_pages) + + return list(visited) + + def active_intents_to_dataframe(self) -> pd.DataFrame: + """Gets all intents referenced in the agent, across all flows and pages, + and produces a dataframe listing which flows/pages reference each + intent. + + Returns: + A dataframe with columns + intent - the intent display name + flow - the Flow Display Name where the intent resides + page - the Page Display Name where the intent resides + unreachable - Denotes whether the Flow/Page/Intent combination is + unreachable in the graph. + """ + df = pd.DataFrame({ + "intent": pd.Series(dtype="str"), + "flow": pd.Series(dtype="str"), + "page": pd.Series(dtype="str"), + "unreachable": pd.Series(dtype="bool") + }) + + # Loop over active_intents, create temp dataframe, then concat with the + # main dataframe to build out the complete Flow/Page/Intent dataset. + for flow in self.data.active_intents: + for pair in self.data.active_intents[flow]: + intent = pair[0] + page = pair[1] + temp = pd.DataFrame({ + "intent": [intent], + "flow": [flow], + "page": [page], + "unreachable": [False]}) + df = pd.concat([df, temp]) + + df = df.reset_index(drop=True) + + # Finally, determine what rows are unreachable. + self.active_intents_df = self._mark_unreachable_pages(df) + + return self.active_intents_df + + def get_unused_intents(self) -> List: + """Get all unused Intents across the agent.""" + if self.active_intents_df.empty: + self.active_intents_df = self.active_intents_to_dataframe() + active_intents_set = set(self.active_intents_df.intent.to_list()) + all_intents_set = set(self.data.intents_map.keys()) + + return list(all_intents_set.difference(active_intents_set)) + + def get_unreachable_intents(self) -> pd.DataFrame: + """Get all unreachable Intents across the agent. + + An Intent is unreachable if it resides on a page that is also + unreachable. + """ + if self.active_intents_df.empty: + self.active_intents_df = self.active_intents_to_dataframe() + + return self.active_intents_df[self.active_intents_df["unreachable"]]
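+
+# Query sketch (illustrative; assumes a `checker` constructed as in the note
+# near the top of this module): once processing completes, the helpers below
+# answer reachability and intent usage questions offline, with no further
+# API calls, e.g.
+#
+#   reachable = checker.get_reachable_pages(
+#       flow_display_name="Default Start Flow", max_depth=3)
+#   active_df = checker.active_intents_to_dataframe()
+#   unused = checker.get_unused_intents()
+#   unreachable_df = checker.get_unreachable_intents()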