From ed2253169dedff795b736a5ebcdb4ba3b209cb4b Mon Sep 17 00:00:00 2001 From: Max Wang Date: Thu, 20 Jun 2024 15:34:01 -0400 Subject: [PATCH 1/3] Add Test Query Runner --- test_harness/result_collector.py | 26 +-- test_harness/run.py | 200 ++++++------------- test_harness/runner/Test Query Runner | 18 ++ test_harness/runner/generate_query.py | 101 ++++++++++ test_harness/runner/runner.py | 230 ++++++++++++++++++++++ test_harness/runner/smart_api_registry.py | 126 ++++++++++++ test_harness/utils.py | 57 ++++-- 7 files changed, 586 insertions(+), 172 deletions(-) create mode 100644 test_harness/runner/Test Query Runner create mode 100644 test_harness/runner/generate_query.py create mode 100644 test_harness/runner/runner.py create mode 100644 test_harness/runner/smart_api_registry.py diff --git a/test_harness/result_collector.py b/test_harness/result_collector.py index 43eefe8..9a5633a 100644 --- a/test_harness/result_collector.py +++ b/test_harness/result_collector.py @@ -1,8 +1,8 @@ """The Collector of Results.""" - +from typing import Union from translator_testing_model.datamodel.pydanticmodel import TestAsset, TestCase -from .utils import get_tag +from test_harness.utils import get_tag class ResultCollector: @@ -10,7 +10,7 @@ class ResultCollector: def __init__(self): """Initialize the Collector.""" - self.agents = ["ars", "aragorn", "arax", "bte", "improving", "unsecret", "cqs"] + self.agents = ["ars", "aragorn", "arax", "biothings-explorer", "improving-agent", "unsecret-agent", "cqs"] query_types = ["TopAnswer", "Acceptable", "BadButForgivable", "NeverShow"] self.result_types = { "PASSED": "PASSED", @@ -30,22 +30,22 @@ def __init__(self): header = ",".join(self.columns) self.csv = f"{header}\n" - def collect_result(self, test: TestCase, asset: TestAsset, result: dict, url: str): - """Add a single result to the total output.""" + def collect_result(self, test: TestCase, asset: TestAsset, report: dict, parent_pk: Union[str, None], url: str): + """Add a single report to the total output.""" # add result to stats - for agent in result["result"]: + for agent in self.agents: query_type = asset.expected_output - result_type = self.result_types.get( - get_tag(result["result"][agent]), "Test Error" - ) - self.stats[agent][query_type][result_type] += 1 + if agent in report: + result_type = self.result_types.get( + get_tag(report[agent]), "Test Error" + ) + self.stats[agent][query_type][result_type] += 1 # add result to csv agent_results = ",".join( - get_tag(result["result"][agent]) for agent in self.agents + get_tag(report[agent]) for agent in self.agents if agent in report ) - ars_pk = result["pks"].get("parent_pk", None) - pk_url = f"https://arax.ncats.io/?r={ars_pk}" if ars_pk is not None else "" + pk_url = f"https://arax.ncats.io/?r={parent_pk}" if parent_pk is not None else "" self.csv += ( f"""{asset.name},{url},{pk_url},{test.id},{asset.id},{agent_results}\n""" ) diff --git a/test_harness/run.py b/test_harness/run.py index b943237..bc777c1 100644 --- a/test_harness/run.py +++ b/test_harness/run.py @@ -1,7 +1,5 @@ """Run tests through the Test Runners.""" -from collections import defaultdict -import httpx import json import logging import time @@ -9,16 +7,16 @@ import traceback from typing import Dict -from ARS_Test_Runner.semantic_test import run_semantic_test as run_ars_test - +from ARS_Test_Runner.semantic_test import pass_fail_analysis # from benchmarks_runner import run_benchmarks from translator_testing_model.datamodel.pydanticmodel import TestCase -from .reporter import Reporter 
-from .slacker import Slacker -from .result_collector import ResultCollector -from .utils import normalize_curies, get_tag +from test_harness.runner.runner import QueryRunner +from test_harness.reporter import Reporter +from test_harness.slacker import Slacker +from test_harness.result_collector import ResultCollector +from test_harness.utils import get_tag, hash_test_asset async def run_tests( @@ -42,6 +40,9 @@ async def run_tests( f"Running {args['suite']} ({sum([len(test.test_assets) for test in tests.values()])} tests)...\n<{reporter.base_path}/test-runs/{reporter.test_run_id}|View in the Information Radiator>" ] ) + query_runner = QueryRunner(logger) + logger.info("Runner is getting service registry") + await query_runner.retrieve_registry(trapi_version="1.5.0") collector = ResultCollector() # loop over all tests for test in tqdm(tests.values()): @@ -51,31 +52,12 @@ async def run_tests( if not test.test_assets or not test.test_case_objective: logger.warning(f"Test has missing required fields: {test.id}") continue + + query_responses = await query_runner.run_queries(test) if test.test_case_objective == "AcceptanceTest": - assets = test.test_assets test_ids = [] - biolink_object_aspect_qualifier = "" - biolink_object_direction_qualifier = "" - for qualifier in test.qualifiers: - if qualifier.parameter == "biolink_object_aspect_qualifier": - biolink_object_aspect_qualifier = qualifier.value - elif qualifier.parameter == "biolink_object_direction_qualifier": - biolink_object_direction_qualifier = qualifier.value - # normalize all the curies - curies = [asset.output_id for asset in assets] - curies.append(test.test_case_input_id) - normalized_curies = await normalize_curies(test, logger) - input_curie = normalized_curies[test.test_case_input_id]["id"]["identifier"] - # try and get normalized input category, but default to original - # input_category = normalized_curies[test.test_case_input_id].get( - # "type", [test.input_category] - # )[0] - # TODO: figure out the right way to handle input category wrt normalization - input_category = test.input_category - - err_msg = "" - for asset in assets: + for asset in test.test_assets: # create test in Test Dashboard test_id = "" try: @@ -83,139 +65,79 @@ async def run_tests( test_ids.append(test_id) except Exception: logger.error(f"Failed to create test: {test.id}") + + test_asset_hash = hash_test_asset(asset) + test_query = query_responses.get(test_asset_hash) + if test_query is not None: + message = json.dumps(test_query["query"], indent=2) + else: + message = "Unable to retrieve response for test asset." 
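+                # upload the generated TRAPI query (or the failure note) to this test's log in the dashboard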
+ await reporter.upload_log( + test_id, + message, + ) - try: - test_input = json.dumps( - { - "environment": test.test_env, - "predicate": test.test_case_predicate_name, - "runner_settings": test.test_runner_settings, - "expected_output": asset.expected_output, - "biolink_object_aspect_qualifier": biolink_object_aspect_qualifier, - "biolink_object_direction_qualifier": biolink_object_direction_qualifier, - "input_category": input_category, - "input_curie": input_curie, - "output_curie": normalized_curies[asset.output_id]["id"][ - "identifier" - ], - }, - indent=2, - ) - await reporter.upload_log( - test_id, - "Calling ARS Test Runner with: {test_input}".format( - test_input=test_input - ), - ) - except Exception as e: - logger.error(str(e)) - logger.error(f"Failed to upload logs to test: {test.id}, {test_id}") - - # group all outputs together to make one Translator query - output_ids = [ - normalized_curies[asset.output_id]["id"]["identifier"] - for asset in assets - ] - expected_outputs = [asset.expected_output for asset in assets] - test_inputs = [ - test.test_env, - test.test_case_predicate_name, - test.test_runner_settings, - expected_outputs, - biolink_object_aspect_qualifier, - biolink_object_direction_qualifier, - input_category, - input_curie, - output_ids, - ] - try: - ars_result, ars_url = await run_ars_test(*test_inputs) - except Exception as e: - err_msg = f"ARS Test Runner failed with {traceback.format_exc()}" - logger.error(f"[{test.id}] {err_msg}") - ars_url = None - ars_result = { - "pks": {}, - # this will effectively act as a list that we access by index down below - "results": defaultdict(lambda: {"error": err_msg}), - } - # full_report[test["test_case_input_id"]]["ars"] = {"error": str(e)} - try: - ars_pk = ars_result.get("pks", {}).get("parent_pk") - if ars_pk and ars_url is not None: - async with httpx.AsyncClient() as client: - await client.post(f"{ars_url}retain/{ars_pk}") - except Exception as e: - logger.error("Failed to retain PK on ARS.") - # grab individual results for each asset - for index, (test_id, asset) in enumerate(zip(test_ids, assets)): - status = "PASSED" - try: - results = ars_result.get("results", []) - if isinstance(results, list): - test_result = { - "pks": ars_result.get("pks", {}), - "result": results[index], - } - elif isinstance(results, dict): - # make sure it has a single error message - assert "error" in results - test_result = { - "pks": ars_result.get("pks", {}), - "result": results, - } - else: - # got something completely unexpected from the ARS Test Runner - raise Exception() + if test_query is not None: + report = { + "pks": test_query["pks"], + "result": {}, + } + for agent, response in test_query["responses"].items(): + report["result"][agent] = {} + agent_report = report["result"][agent] + if response["status_code"] > 299: + agent_report["status"] = "FAILED" + if response["status_code"] == "598": + agent_report["message"] = "Timed out" + else: + agent_report["message"] = f"Status code: {response['status_code']}" + elif ( + response["response"]["message"]["results"] is None or + len(response["response"]["message"]["results"]) == 0 + ): + agent_report["status"] = "DONE" + agent_report["message"] = "No results" + else: + await pass_fail_analysis(report["result"], agent, response["response"]["message"]["results"], query_runner.normalized_curies[asset.output_id], asset.expected_output) + + status = "PASSED" # grab only ars result if it exists, otherwise default to failed - if test_result["result"].get("error") is not None: - status = 
"SKIPPED" - else: - status = ( - test_result["result"].get("ars", {}).get("status", "FAILED") - ) + ars_status = report["result"].get("ars", {}).get("status") + status = ars_status if ars_status is not None else "SKIPPED" full_report[status] += 1 + collector.collect_result( test, asset, - test_result, + report["result"], + test_query["pks"].get("parent_pk"), f"{reporter.base_path}/test-runs/{reporter.test_run_id}/tests/{test_id}", ) - if not err_msg and status != "SKIPPED": + + if status != "SKIPPED": # only upload ara labels if the test ran successfully try: labels = [ { "key": ara, - "value": get_tag(result), + "value": get_tag(report["result"][ara]), } - for ara, result in test_result["result"].items() + for ara in collector.agents if ara in report["result"] ] await reporter.upload_labels(test_id, labels) except Exception as e: logger.warning(f"[{test.id}] failed to upload labels: {e}") try: await reporter.upload_log( - test_id, json.dumps(test_result, indent=4) + test_id, json.dumps(report, indent=4) ) - except Exception as e: + except Exception: logger.error(f"[{test.id}] failed to upload logs.") - except Exception as e: - logger.error( - f"[{test.id}] failed to parse test results: {ars_result}" - ) - try: - await reporter.upload_log( - test_id, - f"Failed to parse results: {json.dumps(ars_result)}", - ) - except Exception as e: - logger.error(f"[{test.id}] failed to upload failure log.") + try: await reporter.finish_test(test_id, status) - except Exception as e: + except Exception: logger.error(f"[{test.id}] failed to upload finished status.") - # full_report[test["test_case_input_id"]]["ars"] = ars_result elif test.test_case_objective == "QuantitativeTest": continue assets = test.test_assets[0] @@ -269,6 +191,8 @@ async def run_tests( except Exception: logger.error(f"Failed to report errors with: {test.id}") + break + await slacker.post_notification( messages=[ """Test Suite: {test_suite}\nDuration: {duration} | Environment: {env}\n<{ir_url}|View in the Information Radiator>\n> Test Results:\n> Passed: {num_passed}, Failed: {num_failed}, Skipped: {num_skipped}""".format( diff --git a/test_harness/runner/Test Query Runner b/test_harness/runner/Test Query Runner new file mode 100644 index 0000000..ebc8a19 --- /dev/null +++ b/test_harness/runner/Test Query Runner @@ -0,0 +1,18 @@ +Test Query Runner + +Acceptance Tests +- Test Cases will be a single query +- Test Assets will have different output curies and expected outputs + +Performance Tests +- Test Assets will be separate queries +- It's assumed that all test assets in a single test case are unique, or that any duplicates will be run together as a single query +- Each Test Case can have an overall timeout for overall pass/fail, or maybe it's a green/yellow/red based on how long it takes from the ARS, and then each ARA will have outputs for each run query +- Test Assets for the Utilities should be full responses stored in the Tests repo + +Benchmark Tests +- Test Assets can be separate queries +- Each Test Case is a full benchmark + +Validation Tests +- Can be run on any existing response diff --git a/test_harness/runner/generate_query.py b/test_harness/runner/generate_query.py new file mode 100644 index 0000000..e651287 --- /dev/null +++ b/test_harness/runner/generate_query.py @@ -0,0 +1,101 @@ +"""Given a Test Asset, generate a TRAPI query.""" + +from translator_testing_model.datamodel.pydanticmodel import TestAsset + +from test_harness.utils import get_qualifier_constraints + + +MVP1 = { + "message": { + "query_graph": { + 
"nodes": { + "ON": {"categories": ["biolink:Disease"]}, + "SN": {"categories": ["biolink:ChemicalEntity"]}, + }, + "edges": { + "t_edge": { + "object": "ON", + "subject": "SN", + "predicates": ["biolink:treats"], + } + }, + } + } +} + +MVP2 = { + "message": { + "query_graph": { + "nodes": { + "ON": {"categories": ["biolink:Gene"]}, + "SN": {"categories": ["biolink:ChemicalEntity"]}, + }, + "edges": { + "t_edge": { + "object": "ON", + "subject": "SN", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "", + }, + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "", + }, + ] + } + ], + } + }, + } + } +} + + +def generate_query(test_asset: TestAsset) -> dict: + """Generate a TRAPI query.""" + query = {} + if test_asset.predicate_id == "biolink:treats": + # MVP1 + query = MVP1 + # add id to node + if test_asset.input_category == "biolink:Disease": + query["message"]["query_graph"]["nodes"]["ON"]["ids"] = [ + test_asset.input_id + ] + else: + raise Exception("Unsupported input category for MVP1") + # add knowledge_type + if "inferred" in test_asset.test_runner_settings: + query["message"]["query_graph"]["edges"]["t_edge"]["knowledge_type"] = "inferred" + elif test_asset.predicate_id == "biolink:affects": + # MVP2 + query = MVP2 + # add id to corresponding node + if test_asset.input_category == "biolink:ChemicalEntity": + query["message"]["query_graph"]["nodes"]["SN"]["ids"] = [ + test_asset.input_id + ] + elif test_asset.input_category == "biolink:Gene": + query["message"]["query_graph"]["nodes"]["ON"]["ids"] = [ + test_asset.input_id + ] + else: + raise Exception("Unsupported input category.") + # add qualifier constraints + aspect_qualifier, direction_qualifier = get_qualifier_constraints(test_asset) + query["message"]["query_graph"]["edges"]["t_edge"]["qualifier_constraints"][0][ + "qualifier_set" + ][0]["qualifier_value"] = aspect_qualifier + query["message"]["query_graph"]["edges"]["t_edge"]["qualifier_constraints"][0][ + "qualifier_set" + ][1]["qualifier_value"] = direction_qualifier + # add knowledge_type + if "inferred" in test_asset.test_runner_settings: + query["message"]["query_graph"]["edges"]["t_edge"]["knowledge_type"] = "inferred" + + return query diff --git a/test_harness/runner/runner.py b/test_harness/runner/runner.py new file mode 100644 index 0000000..325b9b4 --- /dev/null +++ b/test_harness/runner/runner.py @@ -0,0 +1,230 @@ +"""Translator Test Query Runner.""" + +import asyncio +import httpx +import logging +import time +from typing import Tuple, Dict + +from translator_testing_model.datamodel.pydanticmodel import TestCase +from test_harness.runner.smart_api_registry import retrieve_registry_from_smartapi +from test_harness.runner.generate_query import generate_query +from test_harness.utils import hash_test_asset, normalize_curies + +MAX_QUERY_TIME = 600 + +env_map = { + "dev": "development", + "ci": "staging", + "test": "testing", + "prod": "production", +} + + +class QueryRunner: + """Translator Test Query Runner.""" + + def __init__(self, logger: logging.Logger): + self.registry = {} + self.normalized_curies = {} + self.logger = logger + + async def retrieve_registry(self, trapi_version: str): + self.registry = await retrieve_registry_from_smartapi(trapi_version) + + async def run_query( + self, query_hash, semaphore, message, base_url, infores + ) -> Tuple[int, Dict[str, dict], Dict[str, str]]: + """Generate and 
run a single TRAPI query against a component.""" + # wait for opening in semaphore before sending next query + responses = {} + pks = {} + async with semaphore: + # handle some outlier urls + if infores == "infores:ars": + url = base_url + "/ars/api/submit" + elif infores == "infores:sri-answer-appraiser": + url = base_url + "/get_appraisal" + elif infores == "infores:sri-node-normalizer": + url = base_url + "/get_normalized_nodes" + elif "annotator" in base_url: + url = base_url + pass + else: + url = base_url + "/query" + # send message + response = {} + status_code = 418 + async with httpx.AsyncClient(timeout=600) as client: + try: + res = await client.post(url, json=message) + status_code = res.status_code + res.raise_for_status() + response = res.json() + except Exception as e: + self.logger.error(f"Something went wrong: {e}") + + if infores == "infores:ars": + # handle the ARS polling + parent_pk = response.get("pk", "") + ars_responses, pks = await self.get_ars_responses(parent_pk, base_url) + responses.update(ars_responses) + else: + single_infores = infores.split("infores:")[1] + # TODO: normalize this response + responses[single_infores] = { + "response": response, + "status_code": status_code, + } + + return query_hash, responses, pks + + async def run_queries( + self, + test_case: TestCase, + concurrency: int = 1, # for performance testing + ) -> Dict[int, dict]: + """Run all queries specified in a Test Case.""" + # normalize all the curies in a test case + self.normalized_curies.update(await normalize_curies(test_case, self.logger)) + # TODO: figure out the right way to handle input category wrt normalization + + queries: Dict[int, dict] = {} + for test_asset in test_case.test_assets: + test_asset.input_id = self.normalized_curies[test_asset.input_id] + # TODO: make this better + asset_hash = hash_test_asset(test_asset) + if asset_hash not in queries: + # generate query + query = generate_query(test_asset) + queries[asset_hash] = { + "query": query, + "responses": {}, + "pks": {}, + } + + self.logger.debug(queries) + # send queries to a single type of component at a time + for component in test_case.components: + # component = "ara" + # loop over all specified components, i.e. 
ars, ara, kp, utilities + semaphore = asyncio.Semaphore(concurrency) + self.logger.info(f"Sending queries to {self.registry[env_map[test_case.test_env]][component]}") + tasks = [ + asyncio.create_task( + self.run_query( + query_hash, + semaphore, + query["query"], + service["url"], + service["infores"], + ) + ) + for service in self.registry[env_map[test_case.test_env]][component] + for query_hash, query in queries.items() + ] + try: + all_responses = await asyncio.gather(*tasks, return_exceptions=True) + for query_hash, responses, pks in all_responses: + queries[query_hash]["responses"].update(responses) + queries[query_hash]["pks"].update(pks) + except Exception as e: + self.logger.error(f"Something went wrong with the queries: {e}") + + return queries + + async def get_ars_responses(self, parent_pk: str, base_url: str) -> Tuple[Dict[str, dict], Dict[str, str]]: + """Given a parent pk, get responses for all ARS things.""" + responses = {} + pks = { + "parent_pk": parent_pk, + } + async with httpx.AsyncClient(timeout=30) as client: + # retain this response for testing + # res = await client.post(f"{base_url}/ars/api/retain/{parent_pk}") + # res.raise_for_status() + # Get all children queries + res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y") + res.raise_for_status() + response = res.json() + + start_time = time.time() + for child in response.get("children", []): + child_pk = child["message"] + infores = child["actor"]["inforesid"].split("infores:")[1] + self.logger.info(f"Getting response for {infores}...") + # add child pk + pks[infores] = child_pk + current_time = time.time() + + response = None + # while we stay within the query max time + while current_time - start_time <= MAX_QUERY_TIME: + # get query status of child query + async with httpx.AsyncClient(timeout=30) as client: + res = await client.get(f"{base_url}/ars/api/messages/{child_pk}") + res.raise_for_status() + response = res.json() + status = response.get("fields", {}).get("status") + if status == "Done": + break + if status == "Error": + # query errored, need to capture + break + self.logger.info(f"{infores} is not Done, waiting...") + current_time = time.time() + await asyncio.sleep(5) + else: + self.logger.warning(f"Timed out getting ARS child messages after {MAX_QUERY_TIME / 60} minutes.") + + # add response to output + if response is not None: + self.logger.info(f"Got reponse for {infores}!") + responses[infores] = { + "response": response.get("fields", {}).get("data", {}), + "status_code": response.get("fields", {}).get("code", 410), + } + + # After getting all individual ARA responses, get and save the merged version + current_time = time.time() + while current_time - start_time <= MAX_QUERY_TIME: + async with httpx.AsyncClient(timeout=30) as client: + res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y") + res.raise_for_status() + response = res.json() + status = response.get("status") + if status == "Done" or status == "Error": + merged_pk = response.get("merged_version") + if merged_pk is None: + self.logger.error(f"Failed to get the ARS merged message from pk: {parent_pk}.") + pks["ars"] = "None" + responses["ars"] = { + "response": {"message": {"results": []}}, + "status_code": 410, + } + else: + # add final ars pk + pks["ars"] = merged_pk + # get full merged pk + res = await client.get(f"{base_url}/ars/api/messages/{merged_pk}") + res.raise_for_status() + merged_message = res.json() + responses["ars"] = { + "response": merged_message.get("fields", {}).get("data", 
{}), + "status_code": merged_message.get("fields", {}).get("code", 410), + } + self.logger.info("Got ARS merged message!") + break + else: + self.logger.info("ARS merging not done, waiting...") + current_time = time.time() + await asyncio.sleep(5) + else: + self.logger.warning(f"ARS merging took greater than {MAX_QUERY_TIME / 60} minutes.") + pks["ars"] = "None" + responses["ars"] = { + "response": {"message": {"results": []}}, + "status_code": 410, + } + + return responses, pks \ No newline at end of file diff --git a/test_harness/runner/smart_api_registry.py b/test_harness/runner/smart_api_registry.py new file mode 100644 index 0000000..c82f15e --- /dev/null +++ b/test_harness/runner/smart_api_registry.py @@ -0,0 +1,126 @@ +"""KP registry.""" + +import asyncio +from collections import defaultdict +import httpx +import json +import logging +import re + +LOGGER = logging.getLogger(__name__) + + +async def retrieve_registry_from_smartapi( + target_trapi_version="1.5.0", +): + """Returns a dict of smart api service endpoints defined with a dict like + { + "_id": _id, + "title": title, + "url": url, + "version": version, + } + """ + async with httpx.AsyncClient(timeout=30) as client: + try: + response = await client.get( + "https://smart-api.info/api/query?limit=1000&q=TRAPI" + ) + response.raise_for_status() + except httpx.HTTPError as e: + LOGGER.error("Failed to query smart api. Exiting...") + raise e + + registrations = response.json() + registry = defaultdict(lambda: defaultdict(list)) + for hit in registrations["hits"]: + try: + title = hit["info"]["title"] + except KeyError: + LOGGER.warning("No title for service. Cannot use.") + continue + # _id currently is missing on each "hit" (5/2/2022) + # https://github.com/SmartAPI/smartapi_registry/issues/7#issuecomment-1115007211 + try: + _id = hit["_id"] + except KeyError: + _id = title + try: + infores = hit["info"]["x-translator"]["infores"] + except KeyError: + LOGGER.warning( + "No x-translator.infores for %s (https://smart-api.info/registry?q=%s)", + title, + _id, + ) + infores = f"infores:{_id}" + try: + component = hit["info"]["x-translator"]["component"] + except KeyError: + LOGGER.warning( + "No x-translator.component for %s (https://smart-api.info/registry?q=%s)", + title, + _id, + ) + continue + try: + version = hit["info"]["x-trapi"]["version"] + except KeyError: + LOGGER.warning( + "No x-trapi.version for %s (https://smart-api.info/registry?q=%s)", + title, + _id, + ) + continue + regex = re.compile("[0-9]\.[0-9]") + trapi_version = regex.match(target_trapi_version) + if trapi_version is None or not version.startswith(trapi_version.group() + "."): + LOGGER.info( + f"TRAPI version != {f'{trapi_version.group()}.x' if trapi_version is not None else target_trapi_version} for %s (https://smart-api.info/registry?q=%s)", + title, + _id, + ) + continue + try: + for server in hit["servers"]: + try: + maturity = server["x-maturity"] + except KeyError: + LOGGER.warning(f"{infores} has no maturity") + continue + try: + url = server["url"] + if url.endswith("/"): + url = url[:-1] + except KeyError: + LOGGER.warning( + "No servers[0].url for %s (https://smart-api.info/registry?q=%s)", + title, + _id, + ) + continue + + endpoint_title = title + + registry[maturity][component.lower()].append( + { + "_id": _id, + "title": endpoint_title, + "infores": infores, + "url": url, + } + ) + except KeyError: + LOGGER.warning( + "No servers for %s (https://smart-api.info/registry?q=%s)", + title, + _id, + ) + continue + + return registry + + +if __name__ 
== "__main__": + registry = asyncio.run(retrieve_registry_from_smartapi()) + print(json.dumps(registry)) diff --git a/test_harness/utils.py b/test_harness/utils.py index 5ad5147..5e2deb6 100644 --- a/test_harness/utils.py +++ b/test_harness/utils.py @@ -2,9 +2,9 @@ import httpx import logging -from typing import Dict, Union, List +from typing import Dict, Union, List, Tuple -from translator_testing_model.datamodel.pydanticmodel import TestCase +from translator_testing_model.datamodel.pydanticmodel import TestCase, TestAsset NODE_NORM_URL = { "dev": "https://nodenormalization-sri.renci.org/1.4", @@ -19,10 +19,11 @@ async def normalize_curies( logger: logging.Logger = logging.getLogger(__name__), ) -> Dict[str, Dict[str, Union[Dict[str, str], List[str]]]]: """Normalize a list of curies.""" - node_norm = NODE_NORM_URL[test.test_env] + node_norm = NODE_NORM_URL.get(test.test_env) # collect all curies from test - curies = [asset.output_id for asset in test.test_assets] - curies.append(test.test_case_input_id) + curies = set([asset.output_id for asset in test.test_assets]) + curies.update([asset.input_id for asset in test.test_assets]) + curies.add(test.test_case_input_id) async with httpx.AsyncClient() as client: normalized_curies = {} @@ -30,7 +31,7 @@ async def normalize_curies( response = await client.post( node_norm + "/get_normalized_nodes", json={ - "curies": curies, + "curies": list(curies), "conflate": True, "drug_chemical_conflate": True, }, @@ -39,26 +40,15 @@ async def normalize_curies( response = response.json() for curie, attrs in response.items(): if attrs is None: - normalized_curies[curie] = { - "id": { - "identifier": "Unknown", - }, - "type": [ - "Unknown", - ], - } + normalized_curies[curie] = "Unknown" else: - normalized_curies[curie] = attrs + # choose the perferred id + normalized_curies[curie] = attrs["id"]["identifier"] except Exception as e: logger.error(f"Node norm failed with: {e}") logger.error("Using original curies.") for curie in curies: - normalized_curies[curie] = { - "id": { - "identifier": curie, - }, - # intentionally doesn't have a type so we can default to the original - } + normalized_curies[curie] = curie return normalized_curies @@ -70,3 +60,28 @@ def get_tag(result): if message: tag = message return tag + + +def hash_test_asset(test_asset: TestAsset) -> int: + """Given a test asset, return its unique hash.""" + asset_hash = hash( + ( + test_asset.input_id, + test_asset.predicate_id, + *[qualifier.value for qualifier in test_asset.qualifiers], + ) + ) + return asset_hash + + +def get_qualifier_constraints(test_asset: TestAsset) -> Tuple[str, str]: + """Get qualifier constraints from a Test Asset.""" + biolink_object_aspect_qualifier = "" + biolink_object_direction_qualifier = "" + for qualifier in test_asset.qualifiers: + if qualifier.parameter == "biolink_object_aspect_qualifier": + biolink_object_aspect_qualifier = qualifier.value + elif qualifier.parameter == "biolink_object_direction_qualifier": + biolink_object_direction_qualifier = qualifier.value + + return biolink_object_aspect_qualifier, biolink_object_direction_qualifier From 0b42ba2c9cef8805d7c75c0e8bcec840ba9b0507 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Thu, 20 Jun 2024 15:34:13 -0400 Subject: [PATCH 2/3] Bump minor version --- .gitignore | 1 + setup.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7e786de..777bad5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,6 @@ __pycache__ *.egg-info logs/ +*.log scripts/ .env diff --git 
a/setup.py b/setup.py index d45981d..5812523 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name="sri-test-harness", - version="0.1.15", + version="0.2.0", author="Max Wang", author_email="max@covar.com", url="https://github.com/TranslatorSRI/TestHarness", From 913b846fca772854337eec3fe961d07b037103b7 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Thu, 20 Jun 2024 15:38:51 -0400 Subject: [PATCH 3/3] Run black --- test_harness/result_collector.py | 24 ++++++++++++++--- test_harness/run.py | 30 +++++++++++++-------- test_harness/runner/generate_query.py | 8 ++++-- test_harness/runner/runner.py | 38 ++++++++++++++++++++------- 4 files changed, 74 insertions(+), 26 deletions(-) diff --git a/test_harness/result_collector.py b/test_harness/result_collector.py index 9a5633a..903df10 100644 --- a/test_harness/result_collector.py +++ b/test_harness/result_collector.py @@ -1,4 +1,5 @@ """The Collector of Results.""" + from typing import Union from translator_testing_model.datamodel.pydanticmodel import TestAsset, TestCase @@ -10,7 +11,15 @@ class ResultCollector: def __init__(self): """Initialize the Collector.""" - self.agents = ["ars", "aragorn", "arax", "biothings-explorer", "improving-agent", "unsecret-agent", "cqs"] + self.agents = [ + "ars", + "aragorn", + "arax", + "biothings-explorer", + "improving-agent", + "unsecret-agent", + "cqs", + ] query_types = ["TopAnswer", "Acceptable", "BadButForgivable", "NeverShow"] self.result_types = { "PASSED": "PASSED", @@ -30,7 +39,14 @@ def __init__(self): header = ",".join(self.columns) self.csv = f"{header}\n" - def collect_result(self, test: TestCase, asset: TestAsset, report: dict, parent_pk: Union[str, None], url: str): + def collect_result( + self, + test: TestCase, + asset: TestAsset, + report: dict, + parent_pk: Union[str, None], + url: str, + ): """Add a single report to the total output.""" # add result to stats for agent in self.agents: @@ -45,7 +61,9 @@ def collect_result(self, test: TestCase, asset: TestAsset, report: dict, parent_ agent_results = ",".join( get_tag(report[agent]) for agent in self.agents if agent in report ) - pk_url = f"https://arax.ncats.io/?r={parent_pk}" if parent_pk is not None else "" + pk_url = ( + f"https://arax.ncats.io/?r={parent_pk}" if parent_pk is not None else "" + ) self.csv += ( f"""{asset.name},{url},{pk_url},{test.id},{asset.id},{agent_results}\n""" ) diff --git a/test_harness/run.py b/test_harness/run.py index bc777c1..d59531b 100644 --- a/test_harness/run.py +++ b/test_harness/run.py @@ -8,6 +8,7 @@ from typing import Dict from ARS_Test_Runner.semantic_test import pass_fail_analysis + # from benchmarks_runner import run_benchmarks from translator_testing_model.datamodel.pydanticmodel import TestCase @@ -65,7 +66,7 @@ async def run_tests( test_ids.append(test_id) except Exception: logger.error(f"Failed to create test: {test.id}") - + test_asset_hash = hash_test_asset(asset) test_query = query_responses.get(test_asset_hash) if test_query is not None: @@ -90,16 +91,24 @@ async def run_tests( if response["status_code"] == "598": agent_report["message"] = "Timed out" else: - agent_report["message"] = f"Status code: {response['status_code']}" + agent_report["message"] = ( + f"Status code: {response['status_code']}" + ) elif ( - response["response"]["message"]["results"] is None or - len(response["response"]["message"]["results"]) == 0 + response["response"]["message"]["results"] is None + or len(response["response"]["message"]["results"]) == 0 ): agent_report["status"] = "DONE" agent_report["message"] 
= "No results" else: - await pass_fail_analysis(report["result"], agent, response["response"]["message"]["results"], query_runner.normalized_curies[asset.output_id], asset.expected_output) - + await pass_fail_analysis( + report["result"], + agent, + response["response"]["message"]["results"], + query_runner.normalized_curies[asset.output_id], + asset.expected_output, + ) + status = "PASSED" # grab only ars result if it exists, otherwise default to failed ars_status = report["result"].get("ars", {}).get("status") @@ -122,18 +131,17 @@ async def run_tests( "key": ara, "value": get_tag(report["result"][ara]), } - for ara in collector.agents if ara in report["result"] + for ara in collector.agents + if ara in report["result"] ] await reporter.upload_labels(test_id, labels) except Exception as e: logger.warning(f"[{test.id}] failed to upload labels: {e}") try: - await reporter.upload_log( - test_id, json.dumps(report, indent=4) - ) + await reporter.upload_log(test_id, json.dumps(report, indent=4)) except Exception: logger.error(f"[{test.id}] failed to upload logs.") - + try: await reporter.finish_test(test_id, status) except Exception: diff --git a/test_harness/runner/generate_query.py b/test_harness/runner/generate_query.py index e651287..0b357b9 100644 --- a/test_harness/runner/generate_query.py +++ b/test_harness/runner/generate_query.py @@ -71,7 +71,9 @@ def generate_query(test_asset: TestAsset) -> dict: raise Exception("Unsupported input category for MVP1") # add knowledge_type if "inferred" in test_asset.test_runner_settings: - query["message"]["query_graph"]["edges"]["t_edge"]["knowledge_type"] = "inferred" + query["message"]["query_graph"]["edges"]["t_edge"][ + "knowledge_type" + ] = "inferred" elif test_asset.predicate_id == "biolink:affects": # MVP2 query = MVP2 @@ -96,6 +98,8 @@ def generate_query(test_asset: TestAsset) -> dict: ][1]["qualifier_value"] = direction_qualifier # add knowledge_type if "inferred" in test_asset.test_runner_settings: - query["message"]["query_graph"]["edges"]["t_edge"]["knowledge_type"] = "inferred" + query["message"]["query_graph"]["edges"]["t_edge"][ + "knowledge_type" + ] = "inferred" return query diff --git a/test_harness/runner/runner.py b/test_harness/runner/runner.py index 325b9b4..3af542e 100644 --- a/test_harness/runner/runner.py +++ b/test_harness/runner/runner.py @@ -109,7 +109,9 @@ async def run_queries( # component = "ara" # loop over all specified components, i.e. ars, ara, kp, utilities semaphore = asyncio.Semaphore(concurrency) - self.logger.info(f"Sending queries to {self.registry[env_map[test_case.test_env]][component]}") + self.logger.info( + f"Sending queries to {self.registry[env_map[test_case.test_env]][component]}" + ) tasks = [ asyncio.create_task( self.run_query( @@ -133,7 +135,9 @@ async def run_queries( return queries - async def get_ars_responses(self, parent_pk: str, base_url: str) -> Tuple[Dict[str, dict], Dict[str, str]]: + async def get_ars_responses( + self, parent_pk: str, base_url: str + ) -> Tuple[Dict[str, dict], Dict[str, str]]: """Given a parent pk, get responses for all ARS things.""" responses = {} pks = { @@ -175,7 +179,9 @@ async def get_ars_responses(self, parent_pk: str, base_url: str) -> Tuple[Dict[s current_time = time.time() await asyncio.sleep(5) else: - self.logger.warning(f"Timed out getting ARS child messages after {MAX_QUERY_TIME / 60} minutes.") + self.logger.warning( + f"Timed out getting ARS child messages after {MAX_QUERY_TIME / 60} minutes." 
+ ) # add response to output if response is not None: @@ -189,14 +195,18 @@ async def get_ars_responses(self, parent_pk: str, base_url: str) -> Tuple[Dict[s current_time = time.time() while current_time - start_time <= MAX_QUERY_TIME: async with httpx.AsyncClient(timeout=30) as client: - res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y") + res = await client.get( + f"{base_url}/ars/api/messages/{parent_pk}?trace=y" + ) res.raise_for_status() response = res.json() status = response.get("status") if status == "Done" or status == "Error": merged_pk = response.get("merged_version") if merged_pk is None: - self.logger.error(f"Failed to get the ARS merged message from pk: {parent_pk}.") + self.logger.error( + f"Failed to get the ARS merged message from pk: {parent_pk}." + ) pks["ars"] = "None" responses["ars"] = { "response": {"message": {"results": []}}, @@ -206,12 +216,18 @@ async def get_ars_responses(self, parent_pk: str, base_url: str) -> Tuple[Dict[s # add final ars pk pks["ars"] = merged_pk # get full merged pk - res = await client.get(f"{base_url}/ars/api/messages/{merged_pk}") + res = await client.get( + f"{base_url}/ars/api/messages/{merged_pk}" + ) res.raise_for_status() merged_message = res.json() responses["ars"] = { - "response": merged_message.get("fields", {}).get("data", {}), - "status_code": merged_message.get("fields", {}).get("code", 410), + "response": merged_message.get("fields", {}).get( + "data", {} + ), + "status_code": merged_message.get("fields", {}).get( + "code", 410 + ), } self.logger.info("Got ARS merged message!") break @@ -220,11 +236,13 @@ async def get_ars_responses(self, parent_pk: str, base_url: str) -> Tuple[Dict[s current_time = time.time() await asyncio.sleep(5) else: - self.logger.warning(f"ARS merging took greater than {MAX_QUERY_TIME / 60} minutes.") + self.logger.warning( + f"ARS merging took greater than {MAX_QUERY_TIME / 60} minutes." + ) pks["ars"] = "None" responses["ars"] = { "response": {"message": {"results": []}}, "status_code": 410, } - return responses, pks \ No newline at end of file + return responses, pks
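
A minimal sketch of how the new QueryRunner is driven, mirroring the flow added to test_harness/run.py in this patch. The `test` argument is assumed to be a parsed TestCase from the harness pipeline; the driver function and the printout are illustrative only, while every call it makes (QueryRunner, retrieve_registry, run_queries, hash_test_asset) appears in the diff above.

import asyncio
import logging

from test_harness.runner.runner import QueryRunner
from test_harness.utils import hash_test_asset


async def drive(test):
    """Run all queries for one TestCase and inspect the collected responses."""
    logger = logging.getLogger(__name__)
    runner = QueryRunner(logger)
    # pull TRAPI 1.5 service endpoints from SmartAPI, grouped by maturity and component
    await runner.retrieve_registry(trapi_version="1.5.0")
    # one TRAPI query per unique test asset hash; ARS children are polled until Done/Error
    query_responses = await runner.run_queries(test)
    for asset in test.test_assets:
        test_query = query_responses.get(hash_test_asset(asset))
        if test_query is not None:
            # pks maps infores ids (plus "parent_pk"/"ars") to message pks;
            # responses maps infores ids to {"response", "status_code"}
            print(test_query["pks"].get("parent_pk"), sorted(test_query["responses"]))


# asyncio.run(drive(test_case))  # test_case: a TestCase instance (hypothetical here)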