Skip to content

Commit

Permalink
Merge pull request #19 from TranslatorSRI/query_runner
Browse files Browse the repository at this point in the history
Query runner
  • Loading branch information
maximusunc authored Jun 22, 2024
2 parents 13f26ef + 913b846 commit 847960b
Show file tree
Hide file tree
Showing 9 changed files with 635 additions and 172 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
__pycache__
*.egg-info
logs/
*.log
scripts/
.env
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="sri-test-harness",
version="0.1.15",
version="0.2.0",
author="Max Wang",
author_email="[email protected]",
url="https://github.com/TranslatorSRI/TestHarness",
Expand Down
42 changes: 30 additions & 12 deletions test_harness/result_collector.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
"""The Collector of Results."""

from typing import Union
from translator_testing_model.datamodel.pydanticmodel import TestAsset, TestCase

from .utils import get_tag
from test_harness.utils import get_tag


class ResultCollector:
"""Collect results for easy dissemination."""

def __init__(self):
"""Initialize the Collector."""
self.agents = ["ars", "aragorn", "arax", "bte", "improving", "unsecret", "cqs"]
self.agents = [
"ars",
"aragorn",
"arax",
"biothings-explorer",
"improving-agent",
"unsecret-agent",
"cqs",
]
query_types = ["TopAnswer", "Acceptable", "BadButForgivable", "NeverShow"]
self.result_types = {
"PASSED": "PASSED",
Expand All @@ -30,22 +39,31 @@ def __init__(self):
header = ",".join(self.columns)
self.csv = f"{header}\n"

def collect_result(self, test: TestCase, asset: TestAsset, result: dict, url: str):
"""Add a single result to the total output."""
def collect_result(
self,
test: TestCase,
asset: TestAsset,
report: dict,
parent_pk: Union[str, None],
url: str,
):
"""Add a single report to the total output."""
# add result to stats
for agent in result["result"]:
for agent in self.agents:
query_type = asset.expected_output
result_type = self.result_types.get(
get_tag(result["result"][agent]), "Test Error"
)
self.stats[agent][query_type][result_type] += 1
if agent in report:
result_type = self.result_types.get(
get_tag(report[agent]), "Test Error"
)
self.stats[agent][query_type][result_type] += 1

# add result to csv
agent_results = ",".join(
get_tag(result["result"][agent]) for agent in self.agents
get_tag(report[agent]) for agent in self.agents if agent in report
)
pk_url = (
f"https://arax.ncats.io/?r={parent_pk}" if parent_pk is not None else ""
)
ars_pk = result["pks"].get("parent_pk", None)
pk_url = f"https://arax.ncats.io/?r={ars_pk}" if ars_pk is not None else ""
self.csv += (
f"""{asset.name},{url},{pk_url},{test.id},{asset.id},{agent_results}\n"""
)
208 changes: 70 additions & 138 deletions test_harness/run.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
"""Run tests through the Test Runners."""

from collections import defaultdict
import httpx
import json
import logging
import time
from tqdm import tqdm
import traceback
from typing import Dict

from ARS_Test_Runner.semantic_test import run_semantic_test as run_ars_test
from ARS_Test_Runner.semantic_test import pass_fail_analysis

# from benchmarks_runner import run_benchmarks

from translator_testing_model.datamodel.pydanticmodel import TestCase

from .reporter import Reporter
from .slacker import Slacker
from .result_collector import ResultCollector
from .utils import normalize_curies, get_tag
from test_harness.runner.runner import QueryRunner
from test_harness.reporter import Reporter
from test_harness.slacker import Slacker
from test_harness.result_collector import ResultCollector
from test_harness.utils import get_tag, hash_test_asset


async def run_tests(
Expand All @@ -42,6 +41,9 @@ async def run_tests(
f"Running {args['suite']} ({sum([len(test.test_assets) for test in tests.values()])} tests)...\n<{reporter.base_path}/test-runs/{reporter.test_run_id}|View in the Information Radiator>"
]
)
query_runner = QueryRunner(logger)
logger.info("Runner is getting service registry")
await query_runner.retrieve_registry(trapi_version="1.5.0")
collector = ResultCollector()
# loop over all tests
for test in tqdm(tests.values()):
Expand All @@ -51,31 +53,12 @@ async def run_tests(
if not test.test_assets or not test.test_case_objective:
logger.warning(f"Test has missing required fields: {test.id}")
continue

query_responses = await query_runner.run_queries(test)
if test.test_case_objective == "AcceptanceTest":
assets = test.test_assets
test_ids = []
biolink_object_aspect_qualifier = ""
biolink_object_direction_qualifier = ""
for qualifier in test.qualifiers:
if qualifier.parameter == "biolink_object_aspect_qualifier":
biolink_object_aspect_qualifier = qualifier.value
elif qualifier.parameter == "biolink_object_direction_qualifier":
biolink_object_direction_qualifier = qualifier.value

# normalize all the curies
curies = [asset.output_id for asset in assets]
curies.append(test.test_case_input_id)
normalized_curies = await normalize_curies(test, logger)
input_curie = normalized_curies[test.test_case_input_id]["id"]["identifier"]
# try and get normalized input category, but default to original
# input_category = normalized_curies[test.test_case_input_id].get(
# "type", [test.input_category]
# )[0]
# TODO: figure out the right way to handle input category wrt normalization
input_category = test.input_category

err_msg = ""
for asset in assets:
for asset in test.test_assets:
# create test in Test Dashboard
test_id = ""
try:
Expand All @@ -84,138 +67,85 @@ async def run_tests(
except Exception:
logger.error(f"Failed to create test: {test.id}")

try:
test_input = json.dumps(
{
"environment": test.test_env,
"predicate": test.test_case_predicate_name,
"runner_settings": test.test_runner_settings,
"expected_output": asset.expected_output,
"biolink_object_aspect_qualifier": biolink_object_aspect_qualifier,
"biolink_object_direction_qualifier": biolink_object_direction_qualifier,
"input_category": input_category,
"input_curie": input_curie,
"output_curie": normalized_curies[asset.output_id]["id"][
"identifier"
],
},
indent=2,
)
await reporter.upload_log(
test_id,
"Calling ARS Test Runner with: {test_input}".format(
test_input=test_input
),
)
except Exception as e:
logger.error(str(e))
logger.error(f"Failed to upload logs to test: {test.id}, {test_id}")
test_asset_hash = hash_test_asset(asset)
test_query = query_responses.get(test_asset_hash)
if test_query is not None:
message = json.dumps(test_query["query"], indent=2)
else:
message = "Unable to retrieve response for test asset."
await reporter.upload_log(
test_id,
message,
)

# group all outputs together to make one Translator query
output_ids = [
normalized_curies[asset.output_id]["id"]["identifier"]
for asset in assets
]
expected_outputs = [asset.expected_output for asset in assets]
test_inputs = [
test.test_env,
test.test_case_predicate_name,
test.test_runner_settings,
expected_outputs,
biolink_object_aspect_qualifier,
biolink_object_direction_qualifier,
input_category,
input_curie,
output_ids,
]
try:
ars_result, ars_url = await run_ars_test(*test_inputs)
except Exception as e:
err_msg = f"ARS Test Runner failed with {traceback.format_exc()}"
logger.error(f"[{test.id}] {err_msg}")
ars_url = None
ars_result = {
"pks": {},
# this will effectively act as a list that we access by index down below
"results": defaultdict(lambda: {"error": err_msg}),
}
# full_report[test["test_case_input_id"]]["ars"] = {"error": str(e)}
try:
ars_pk = ars_result.get("pks", {}).get("parent_pk")
if ars_pk and ars_url is not None:
async with httpx.AsyncClient() as client:
await client.post(f"{ars_url}retain/{ars_pk}")
except Exception as e:
logger.error("Failed to retain PK on ARS.")
# grab individual results for each asset
for index, (test_id, asset) in enumerate(zip(test_ids, assets)):
status = "PASSED"
try:
results = ars_result.get("results", [])
if isinstance(results, list):
test_result = {
"pks": ars_result.get("pks", {}),
"result": results[index],
}
elif isinstance(results, dict):
# make sure it has a single error message
assert "error" in results
test_result = {
"pks": ars_result.get("pks", {}),
"result": results,
}
else:
# got something completely unexpected from the ARS Test Runner
raise Exception()
if test_query is not None:
report = {
"pks": test_query["pks"],
"result": {},
}
for agent, response in test_query["responses"].items():
report["result"][agent] = {}
agent_report = report["result"][agent]
if response["status_code"] > 299:
agent_report["status"] = "FAILED"
if response["status_code"] == "598":
agent_report["message"] = "Timed out"
else:
agent_report["message"] = (
f"Status code: {response['status_code']}"
)
elif (
response["response"]["message"]["results"] is None
or len(response["response"]["message"]["results"]) == 0
):
agent_report["status"] = "DONE"
agent_report["message"] = "No results"
else:
await pass_fail_analysis(
report["result"],
agent,
response["response"]["message"]["results"],
query_runner.normalized_curies[asset.output_id],
asset.expected_output,
)

status = "PASSED"
# grab only ars result if it exists, otherwise default to failed
if test_result["result"].get("error") is not None:
status = "SKIPPED"
else:
status = (
test_result["result"].get("ars", {}).get("status", "FAILED")
)
ars_status = report["result"].get("ars", {}).get("status")
status = ars_status if ars_status is not None else "SKIPPED"
full_report[status] += 1

collector.collect_result(
test,
asset,
test_result,
report["result"],
test_query["pks"].get("parent_pk"),
f"{reporter.base_path}/test-runs/{reporter.test_run_id}/tests/{test_id}",
)
if not err_msg and status != "SKIPPED":

if status != "SKIPPED":
# only upload ara labels if the test ran successfully
try:
labels = [
{
"key": ara,
"value": get_tag(result),
"value": get_tag(report["result"][ara]),
}
for ara, result in test_result["result"].items()
for ara in collector.agents
if ara in report["result"]
]
await reporter.upload_labels(test_id, labels)
except Exception as e:
logger.warning(f"[{test.id}] failed to upload labels: {e}")
try:
await reporter.upload_log(
test_id, json.dumps(test_result, indent=4)
)
except Exception as e:
await reporter.upload_log(test_id, json.dumps(report, indent=4))
except Exception:
logger.error(f"[{test.id}] failed to upload logs.")
except Exception as e:
logger.error(
f"[{test.id}] failed to parse test results: {ars_result}"
)
try:
await reporter.upload_log(
test_id,
f"Failed to parse results: {json.dumps(ars_result)}",
)
except Exception as e:
logger.error(f"[{test.id}] failed to upload failure log.")

try:
await reporter.finish_test(test_id, status)
except Exception as e:
except Exception:
logger.error(f"[{test.id}] failed to upload finished status.")
# full_report[test["test_case_input_id"]]["ars"] = ars_result
elif test.test_case_objective == "QuantitativeTest":
continue
assets = test.test_assets[0]
Expand Down Expand Up @@ -269,6 +199,8 @@ async def run_tests(
except Exception:
logger.error(f"Failed to report errors with: {test.id}")

break

await slacker.post_notification(
messages=[
"""Test Suite: {test_suite}\nDuration: {duration} | Environment: {env}\n<{ir_url}|View in the Information Radiator>\n> Test Results:\n> Passed: {num_passed}, Failed: {num_failed}, Skipped: {num_skipped}""".format(
Expand Down
18 changes: 18 additions & 0 deletions test_harness/runner/Test Query Runner
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Test Query Runner

Acceptance Tests
- Test Cases will be a single query
- Test Assets will have different output curies and expected outputs

Performance Tests
- Test Assets will be separate queries
- It's assumed that all test assets in a single test case are unique, or that any duplicates will be run together as a single query
- Each Test Case can have an overall timeout for overall pass/fail, or maybe it's a green/yellow/red based on how long it takes from the ARS, and then each ARA will have outputs for each run query
- Test Assets for the Utilities should be full responses stored in the Tests repo

Benchmark Tests
- Test Assets can be separate queries
- Each Test Case is a full benchmark

Validation Tests
- Can be run on any existing response
Loading

0 comments on commit 847960b

Please sign in to comment.