diff --git a/.gitignore b/.gitignore index 6cf8934..cca30d0 100644 --- a/.gitignore +++ b/.gitignore @@ -163,3 +163,7 @@ tests/test_data/**/*analysis.json test-results.xml report.html result/ + +# Generated Report Files +protocols_and_analyses.md +protocols_and_analyses.zip \ No newline at end of file diff --git a/action.yml b/action.yml index d54503f..4fb7ad3 100644 --- a/action.yml +++ b/action.yml @@ -7,6 +7,7 @@ inputs: description: 'relative path in the repository of the root folder in which to search for to search for protocols' required: false default: 'protocols' + type: string runs: using: 'docker' image: 'docker://ghcr.io/y3rsh/ot-analyze:main' diff --git a/ot_analyze.py b/ot_analyze.py index 802c5b1..6ba7eb2 100644 --- a/ot_analyze.py +++ b/ot_analyze.py @@ -1,13 +1,106 @@ import json import os +import pprint +import shutil import subprocess import time from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from enum import Enum, auto from pathlib import Path -from typing import List +from typing import Any, Dict, Iterator, List from write_failed_analysis import write_failed_analysis +ZIP_FILE_BASENAME = "protocols_and_analyses" +MARKDOWN_FILE_BASENAME = "summary" + +SUCCESS_COLOR = "#7beb73" +FAILURE_COLOR = "#eb7373" + +MARKDOWN_TEMPLATE = """ +# OT-Analyze Test Results + +## Result Breakdown + +Below is a list of protocols and their analysis results. + +{results} + +""" + +ANALYSIS_ERROR_TEMPLATE = """ + +Analysis Error: {analysis_error} + +""" + +RESULTS_TEMPLATE = """ +
+ +{protocol_file_name} [ {pass_or_fail} ] + +Protocol Type: {protocol_type} + +Analysis Execution Time: {execution_time:.2f} seconds + +{analysis_error} + +
+""" + + +class ProtocolType(Enum): + PROTOCOL_DESIGNER = auto() + PYTHON = auto() + +class AnalysisResult(Enum): + PASS = auto() + FAIL = auto() + + +@dataclass +class ProtocolPaths: + protocol_file: Path + analysis_file: Path + analysis_execution_time: float | None = None + + @property + def _analysis_file_content(self) -> Dict[str, Any]: + with open(self.analysis_file.absolute(), "r") as file: + return json.load(file) + + @property + def _analysis_success(self) -> bool: + return self._analysis_file_content["errors"] == [] + + @property + def analysis_error(self) -> str: + return self._analysis_file_content["errors"] + + @property + def analysis_result(self) -> AnalysisResult: + return ( + AnalysisResult.PASS + if self._analysis_success + else AnalysisResult.FAIL + ) + + @property + def protocol_file_name(self) -> str: + return self.protocol_file.name + + @property + def protocol_type(self) -> str: + return ( + ProtocolType.PYTHON + if self.protocol_file.suffix == ".py" + else ProtocolType.PROTOCOL_DESIGNER + ).name.title() + + def set_analysis_execution_time(self, analysis_execution_time: float) -> None: + self.analysis_execution_time = analysis_execution_time + def generate_analysis_path(protocol_file: Path) -> Path: """ @@ -20,14 +113,13 @@ def generate_analysis_path(protocol_file: Path) -> Path: return Path(protocol_file.parent, f"{protocol_file.stem}_analysis.json") -def analyze(protocol_file: Path): +def analyze(protocol_path: ProtocolPaths) -> float: start_time = time.time() # Start timing - analysis_file = generate_analysis_path(protocol_file) - custom_labware_directory = Path(protocol_file.parent, "custom_labware") + custom_labware_directory = Path(protocol_path.protocol_file.parent, "custom_labware") custom_labware = [] # PD protocols contain their own custom labware - if custom_labware_directory.is_dir() and protocol_file.suffix == ".py": + if custom_labware_directory.is_dir() and protocol_path.protocol_file.suffix == ".py": custom_labware = [ os.path.join(custom_labware_directory, file) for file in os.listdir(custom_labware_directory) if file.endswith(".json") ] @@ -39,23 +131,25 @@ def analyze(protocol_file: Path): "opentrons.cli", "analyze", "--json-output", - analysis_file, - protocol_file, + protocol_path.analysis_file, + protocol_path.protocol_file, ] + custom_labware try: subprocess.run(command, capture_output=True, text=True, check=True) except Exception as e: - print(f"Error in analysis of {protocol_file}") - write_failed_analysis(analysis_file, e) + print(f"Error in analysis of {protocol_path.protocol_file}") + write_failed_analysis(protocol_path.analysis_file, e) end_time = time.time() + protocol_path.set_analysis_execution_time(end_time) return end_time - start_time end_time = time.time() elapsed_time = end_time - start_time - print(f"Successful analysis of {protocol_file} completed in {elapsed_time:.2f} seconds") + protocol_path.set_analysis_execution_time(elapsed_time) + print(f"Successful analysis of {protocol_path.protocol_file} completed in {elapsed_time:.2f} seconds") return elapsed_time -def run_analyze_in_parallel(protocol_files: List[Path]): +def run_analyze_in_parallel(protocol_files: List[ProtocolPaths]): start_time = time.time() with ThreadPoolExecutor() as executor: futures = [executor.submit(analyze, file) for file in protocol_files] @@ -74,18 +168,6 @@ def run_analyze_in_parallel(protocol_files: List[Path]): ) -def find_python_protocols(directory: Path) -> List[Path]: - # Check if the provided path is a valid directory - - if not directory.is_dir(): - raise NotADirectoryError(f"The path {directory} is not a valid directory.") - - # Recursively find all .py files - python_files = list(directory.rglob("*.py")) - # TODO: shallow test that they are valid protocol files - return python_files - - def has_designer_application(json_file_path): try: with open(json_file_path, "r", encoding="utf-8") as file: @@ -97,25 +179,108 @@ def has_designer_application(json_file_path): return False -def find_pd_protocols(directory: Path) -> List[Path]: + + +def find_protocol_paths(repo_relative_path: Path) -> List[ProtocolPaths]: + def find_pd_protocols(directory: Path) -> List[Path]: # Check if the provided path is a valid directory - if not directory.is_dir(): - raise NotADirectoryError(f"The path {directory} is not a valid directory.") + if not directory.is_dir(): + raise NotADirectoryError(f"The path {directory} is not a valid directory.") + + # Recursively find all .json files + json_files = list(directory.rglob("*.json")) + filtered_json_files = [file for file in json_files if has_designer_application(file)] + + return filtered_json_files - # Recursively find all .json files - json_files = list(directory.rglob("*.json")) - filtered_json_files = [file for file in json_files if has_designer_application(file)] - return filtered_json_files + def find_python_protocols(directory: Path) -> List[Path]: + # Check if the provided path is a valid directory + if not directory.is_dir(): + raise NotADirectoryError(f"The path {directory} is not a valid directory.") + + # Recursively find all .py files + python_files = list(directory.rglob("*.py")) + # TODO: shallow test that they are valid protocol files + return python_files + return [ + ProtocolPaths(protocol_file, generate_analysis_path(protocol_file)) + for protocol_file + in find_python_protocols(repo_relative_path) + find_pd_protocols(repo_relative_path) + ] + +def create_zip(directory_path: Path): + absolute_directory_path = directory_path.absolute() + try: + archive_name = shutil.make_archive(ZIP_FILE_BASENAME, 'zip', absolute_directory_path, absolute_directory_path) + print(f"Zipfile created and saved to: {absolute_directory_path / archive_name}") + + except Exception as e: + print(f"Error: {e}") + +def create_markdown(protocol_paths: List[ProtocolPaths]) -> None: + def generate_result(protocol_path: ProtocolPaths) -> str: + if protocol_path.analysis_result == AnalysisResult.PASS: + summary_color = SUCCESS_COLOR + analysis_error = "" + else: + summary_color = FAILURE_COLOR + analysis_error = ANALYSIS_ERROR_TEMPLATE.format( + analysis_error="\n".join( + error["detail"] + for error + in protocol_path.analysis_error + ) + ) + + return RESULTS_TEMPLATE.format( + protocol_file_name=protocol_path.protocol_file_name, + protocol_type=protocol_path.protocol_type, + summary_color=summary_color, + pass_or_fail=protocol_path.analysis_result.name.upper(), + analysis_error=analysis_error, + execution_time=protocol_path.analysis_execution_time, + ) + markdown_content = MARKDOWN_TEMPLATE.format( + results="\n".join([generate_result(protocol_path) for protocol_path in protocol_paths]), + ) + markdown_file_name = f"{MARKDOWN_FILE_BASENAME}.md" + absolute_directory_path = Path.cwd() + with open(markdown_file_name, "w") as file: + file.write(markdown_content) + print(f"Markdown file created and saved to: {absolute_directory_path / markdown_file_name}") def main(): repo_relative_path = Path(os.getenv("GITHUB_WORKSPACE"), os.getenv("INPUT_BASE_DIRECTORY")) print(f"Analyzing all protocol files in {repo_relative_path}") - python_files = find_python_protocols(repo_relative_path) - pd_files = find_pd_protocols(repo_relative_path) - all_protocol_files = python_files + pd_files - run_analyze_in_parallel(all_protocol_files) + protocol_paths = find_protocol_paths(repo_relative_path) + run_analyze_in_parallel(protocol_paths) + create_zip(repo_relative_path) + create_markdown(protocol_paths) if __name__ == "__main__": - main() + import contextlib + + @contextlib.contextmanager + def set_env(**environ: Dict[str, str]) -> Iterator[None]: + old_environ = dict(os.environ) + os.environ.update(environ) + try: + yield + finally: + os.environ.clear() + os.environ.update(old_environ) + + environ_vars_to_add = {} + if os.getenv("GITHUB_WORKSPACE") is None: + environ_vars_to_add["GITHUB_WORKSPACE"] = str(Path(__file__).parent.absolute()) + + if os.getenv("INPUT_BASE_DIRECTORY") is None: + environ_vars_to_add["INPUT_BASE_DIRECTORY"] = "../ot-analyze-test/protocols" + if len(environ_vars_to_add) > 0: + print(f"Running with the following temporary environment variables: {environ_vars_to_add}") + + with set_env(**environ_vars_to_add): + main() + diff --git a/tests/test_analyze.py b/tests/test_analyze.py index 404f660..32dc623 100644 --- a/tests/test_analyze.py +++ b/tests/test_analyze.py @@ -1,7 +1,7 @@ import json from pathlib import Path -from ot_analyze import analyze, generate_analysis_path +from ot_analyze import ProtocolPaths, analyze, generate_analysis_path import test_data.data as td @@ -27,30 +27,36 @@ def check_errors_in_analysis(analysis: Path): def test_analyze_ot2_positive(): - analyze(td.POSITIVE_OT2) - check_no_errors_in_analysis(generate_analysis_path(td.POSITIVE_OT2)) + protocol_path = ProtocolPaths(td.POSITIVE_OT2, generate_analysis_path(td.POSITIVE_OT2)) + analyze(protocol_path) + check_no_errors_in_analysis(protocol_path.analysis_file) def test_analyze_flex_positive(): - analyze(td.POSITIVE_FLEX) - check_no_errors_in_analysis(generate_analysis_path(td.POSITIVE_FLEX)) + protocol_path = ProtocolPaths(td.POSITIVE_FLEX, generate_analysis_path(td.POSITIVE_FLEX)) + analyze(protocol_path) + check_no_errors_in_analysis(protocol_path.analysis_file) def test_analyze_ot2_negative(): - analyze(td.ERROR) - check_errors_in_analysis(generate_analysis_path(td.ERROR)) + protocol_path = ProtocolPaths(td.ERROR, generate_analysis_path(td.ERROR)) + analyze(protocol_path) + check_errors_in_analysis(protocol_path.analysis_file) def test_analyze_flex_negative(): - analyze(td.FLEX_ERROR) - check_errors_in_analysis(generate_analysis_path(td.FLEX_ERROR)) + protocol_path = ProtocolPaths(td.FLEX_ERROR, generate_analysis_path(td.FLEX_ERROR)) + analyze(protocol_path) + check_errors_in_analysis(protocol_path.analysis_file) def test_analyze_json_positive(): - analyze(td.JSON_POSITIVE) - check_no_errors_in_analysis(generate_analysis_path(td.JSON_POSITIVE)) + protocol_path = ProtocolPaths(td.JSON_POSITIVE, generate_analysis_path(td.JSON_POSITIVE)) + analyze(protocol_path) + check_no_errors_in_analysis(protocol_path.analysis_file) def test_analyze_json_error(): - analyze(td.JSON_ERROR) - check_errors_in_analysis(generate_analysis_path(td.JSON_ERROR)) + protocol_path = ProtocolPaths(td.JSON_ERROR, generate_analysis_path(td.JSON_ERROR)) + analyze(protocol_path) + check_errors_in_analysis(protocol_path.analysis_file)