From 3c6ae05d9fbc158bfd4f9d32436fcf319e81bfcc Mon Sep 17 00:00:00 2001 From: FelixFehse Date: Mon, 17 Jun 2024 09:49:55 +0200 Subject: [PATCH 01/11] WIP feat: resume runs after failed PHS-520 --- .../evaluation/dataset/dataset_repository.py | 15 +++++++++++++ .../dataset/file_dataset_repository.py | 20 +++++++++++++++++ .../dataset/in_memory_dataset_repository.py | 10 +++++++++ .../single_huggingface_dataset_repository.py | 6 +++++ .../run/in_memory_run_repository.py | 22 +++++++++++++++++++ .../evaluation/run/run_repository.py | 16 ++++++++++++++ 6 files changed, 89 insertions(+) diff --git a/src/intelligence_layer/evaluation/dataset/dataset_repository.py b/src/intelligence_layer/evaluation/dataset/dataset_repository.py index 999aab760..0a9e63019 100644 --- a/src/intelligence_layer/evaluation/dataset/dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/dataset_repository.py @@ -122,3 +122,18 @@ def examples( :class:`Iterable` of :class`Example`s. """ pass + + @abstractmethod + def example_ids( + self, + dataset_id: str + ) -> list[str]: + """Returns the sorted ids of all :class:`Example`s for a given dataset ID. + + Args: + dataset_id: Dataset ID to retrieve the example IDs from. + + Returns: + list of example IDs. + """ + pass diff --git a/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py index 85575c8ad..aba846383 100644 --- a/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py @@ -126,6 +126,26 @@ def examples( ] return sorted(examples, key=lambda example: example.id) + + + def example_ids( + self, + dataset_id: str + ) -> list[str]: + example_path = self.path_to_str(self._dataset_examples_path(dataset_id)) + if not self._file_system.exists(example_path): + raise ValueError( + f"Repository does not contain a dataset with id: {dataset_id}" + ) + + with self._file_system.open( + example_path, "r", encoding="utf-8" + ) as examples_file: + # Mypy does not accept dynamic types + ids = [example.id for example in examples_file] + + return sorted(ids) + def _dataset_root_directory(self) -> Path: return self._root_directory / "datasets" diff --git a/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py index 2ca418cf9..ed4e9e473 100644 --- a/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py @@ -89,3 +89,13 @@ def examples( key=lambda example: example.id, ), ) + + def example_ids( + self, + dataset_id: str, + ) -> Iterable[str]: + if dataset_id not in self._datasets_and_examples: + raise ValueError( + f"Repository does not contain a dataset with id: {dataset_id}" + ) + return sorted([example.id for example in self._datasets_and_examples[dataset_id][1]]) diff --git a/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py index 4608a248f..f3df986d7 100644 --- a/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py @@ -86,3 +86,9 @@ def examples( expected_output=cast(ExpectedOutput, answers[sample["answer"]]), id=str(index), ) + + def example_ids( + self, + dataset_id: str, + ) -> Iterable[str]: + return [str(i) for i in range(self._huggingface_dataset["test"])] diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index 5051615b2..bb8cb7f3b 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -21,6 +21,28 @@ def store_run_overview(self, overview: RunOverview) -> None: if overview.id not in self._example_outputs: self._example_outputs[overview.id] = [] + + + + @abstractmethod + def create_temporary_run_data(self, run_id: str) -> None: + ... + + @abstractmethod + def delete_temporary_run_data(self, run_id: str) -> None: + ... + + @abstractmethod + def temp_store_finished_example(self, run_id: str, example_id: str) -> None: + ... + + @abstractmethod + def unfinished_examples(self) -> dict[str, Sequence[str]]: + ... + + + + def run_overview(self, run_id: str) -> Optional[RunOverview]: return self._run_overviews.get(run_id, None) diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index b6b9a7f91..acd29dee0 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -27,6 +27,22 @@ def store_run_overview(self, overview: RunOverview) -> None: """ ... + @abstractmethod + def create_temporary_run_data(self, run_id: str) -> None: + ... + + @abstractmethod + def delete_temporary_run_data(self, run_id: str) -> None: + ... + + @abstractmethod + def temp_store_finished_example(self, run_id: str, example_id: str) -> None: + ... + + @abstractmethod + def unfinished_examples(self) -> dict[str, Sequence[str]]: + ... + @abstractmethod def run_overview(self, run_id: str) -> Optional[RunOverview]: """Returns a :class:`RunOverview` for the given ID. From 550ec38074d501b67dae550d21ee96dc7d8b28f3 Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Mon, 17 Jun 2024 11:33:37 +0200 Subject: [PATCH 02/11] feat: Add `examples_to_skip` parameter to `DatasetRepository.examples` interface and derive methods TASK:PHS-520 --- .../evaluation/dataset/dataset_repository.py | 17 ++------- .../dataset/file_dataset_repository.py | 35 +++++-------------- .../dataset/in_memory_dataset_repository.py | 18 ++++------ .../single_huggingface_dataset_repository.py | 29 +++++++-------- .../evaluation/run/file_run_repository.py | 8 +++++ .../run/in_memory_run_repository.py | 22 +++--------- .../evaluation/run/run_repository.py | 12 +++---- .../dataset/test_dataset_repository.py | 31 ++++++++++++++++ .../test_hugging_face_dataset_repository.py | 6 +++- 9 files changed, 83 insertions(+), 95 deletions(-) diff --git a/src/intelligence_layer/evaluation/dataset/dataset_repository.py b/src/intelligence_layer/evaluation/dataset/dataset_repository.py index 0a9e63019..92ac1c3d9 100644 --- a/src/intelligence_layer/evaluation/dataset/dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/dataset_repository.py @@ -110,6 +110,7 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: """Returns all :class:`Example`s for the given dataset ID sorted by their ID. @@ -117,23 +118,9 @@ def examples( dataset_id: Dataset ID whose examples should be retrieved. input_type: Input type of the example. expected_output_type: Expected output type of the example. + examples_to_skip: Optional list of example IDs. Those examples will be excluded from the output. Returns: :class:`Iterable` of :class`Example`s. """ pass - - @abstractmethod - def example_ids( - self, - dataset_id: str - ) -> list[str]: - """Returns the sorted ids of all :class:`Example`s for a given dataset ID. - - Args: - dataset_id: Dataset ID to retrieve the example IDs from. - - Returns: - list of example IDs. - """ - pass diff --git a/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py index aba846383..d17e788f3 100644 --- a/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py @@ -107,7 +107,9 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: + examples_to_skip = examples_to_skip or frozenset() example_path = self.path_to_str(self._dataset_examples_path(dataset_id)) if not self._file_system.exists(example_path): raise ValueError( @@ -118,34 +120,15 @@ def examples( example_path, "r", encoding="utf-8" ) as examples_file: # Mypy does not accept dynamic types - examples = [ - Example[input_type, expected_output_type].model_validate_json( # type: ignore - json_data=example - ) - for example in examples_file - ] + examples = [] + for example in examples_file: + current_example = Example[ + input_type, expected_output_type # type: ignore + ].model_validate_json(json_data=example) + if current_example.id not in examples_to_skip: + examples.append(current_example) return sorted(examples, key=lambda example: example.id) - - - def example_ids( - self, - dataset_id: str - ) -> list[str]: - example_path = self.path_to_str(self._dataset_examples_path(dataset_id)) - if not self._file_system.exists(example_path): - raise ValueError( - f"Repository does not contain a dataset with id: {dataset_id}" - ) - - with self._file_system.open( - example_path, "r", encoding="utf-8" - ) as examples_file: - # Mypy does not accept dynamic types - ids = [example.id for example in examples_file] - - return sorted(ids) - def _dataset_root_directory(self) -> Path: return self._root_directory / "datasets" diff --git a/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py index ed4e9e473..d75eb261f 100644 --- a/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py @@ -77,7 +77,9 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: + examples_to_skip = examples_to_skip or frozenset() if dataset_id not in self._datasets_and_examples: raise ValueError( f"Repository does not contain a dataset with id: {dataset_id}" @@ -85,17 +87,11 @@ def examples( return cast( Iterable[Example[Input, ExpectedOutput]], sorted( - self._datasets_and_examples[dataset_id][1], + [ + example + for example in self._datasets_and_examples[dataset_id][1] + if example.id not in examples_to_skip + ], key=lambda example: example.id, ), ) - - def example_ids( - self, - dataset_id: str, - ) -> Iterable[str]: - if dataset_id not in self._datasets_and_examples: - raise ValueError( - f"Repository does not contain a dataset with id: {dataset_id}" - ) - return sorted([example.id for example in self._datasets_and_examples[dataset_id][1]]) diff --git a/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py index f3df986d7..4b1227e2a 100644 --- a/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py @@ -1,5 +1,5 @@ from collections.abc import Iterable, Sequence -from typing import cast +from typing import Optional, cast from datasets import Dataset as HFDataset # type: ignore from datasets import DatasetDict, IterableDataset, IterableDatasetDict @@ -71,24 +71,21 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: + examples_to_skip = examples_to_skip or frozenset() answers = "ABCD" assert input_type == MultipleChoiceInput assert expected_output_type == str for index, sample in enumerate(self._huggingface_dataset["test"]): - yield Example( - input=cast( - Input, - MultipleChoiceInput( - question=sample["question"], choices=sample["choices"] + if str(index) not in examples_to_skip: + yield Example( + input=cast( + Input, + MultipleChoiceInput( + question=sample["question"], choices=sample["choices"] + ), ), - ), - expected_output=cast(ExpectedOutput, answers[sample["answer"]]), - id=str(index), - ) - - def example_ids( - self, - dataset_id: str, - ) -> Iterable[str]: - return [str(i) for i in range(self._huggingface_dataset["test"])] + expected_output=cast(ExpectedOutput, answers[sample["answer"]]), + id=str(index), + ) diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 984df154c..219208747 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -23,6 +23,14 @@ def store_run_overview(self, overview: RunOverview) -> None: # create empty folder just in case no examples are ever saved self.mkdir(self._run_directory(overview.id)) + def create_temporary_run_data(self, run_id: str) -> None: ... + + def delete_temporary_run_data(self, run_id: str) -> None: ... + + def temp_store_finished_example(self, run_id: str, example_id: str) -> None: ... + + def unfinished_examples(self) -> dict[str, Sequence[str]]: ... + def run_overview(self, run_id: str) -> Optional[RunOverview]: file_path = self._run_overview_path(run_id) if not self.exists(file_path): diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index bb8cb7f3b..9d250177c 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -21,27 +21,13 @@ def store_run_overview(self, overview: RunOverview) -> None: if overview.id not in self._example_outputs: self._example_outputs[overview.id] = [] + def create_temporary_run_data(self, run_id: str) -> None: ... + def delete_temporary_run_data(self, run_id: str) -> None: ... + def temp_store_finished_example(self, run_id: str, example_id: str) -> None: ... - @abstractmethod - def create_temporary_run_data(self, run_id: str) -> None: - ... - - @abstractmethod - def delete_temporary_run_data(self, run_id: str) -> None: - ... - - @abstractmethod - def temp_store_finished_example(self, run_id: str, example_id: str) -> None: - ... - - @abstractmethod - def unfinished_examples(self) -> dict[str, Sequence[str]]: - ... - - - + def unfinished_examples(self) -> dict[str, Sequence[str]]: ... def run_overview(self, run_id: str) -> Optional[RunOverview]: return self._run_overviews.get(run_id, None) diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index acd29dee0..bce3a7490 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -28,20 +28,16 @@ def store_run_overview(self, overview: RunOverview) -> None: ... @abstractmethod - def create_temporary_run_data(self, run_id: str) -> None: - ... + def create_temporary_run_data(self, run_id: str) -> None: ... @abstractmethod - def delete_temporary_run_data(self, run_id: str) -> None: - ... + def delete_temporary_run_data(self, run_id: str) -> None: ... @abstractmethod - def temp_store_finished_example(self, run_id: str, example_id: str) -> None: - ... + def temp_store_finished_example(self, run_id: str, example_id: str) -> None: ... @abstractmethod - def unfinished_examples(self) -> dict[str, Sequence[str]]: - ... + def unfinished_examples(self) -> dict[str, Sequence[str]]: ... @abstractmethod def run_overview(self, run_id: str) -> Optional[RunOverview]: diff --git a/tests/evaluation/dataset/test_dataset_repository.py b/tests/evaluation/dataset/test_dataset_repository.py index c92fc73cd..a348928ed 100644 --- a/tests/evaluation/dataset/test_dataset_repository.py +++ b/tests/evaluation/dataset/test_dataset_repository.py @@ -318,6 +318,37 @@ def test_examples_returns_all_examples_sorted_by_their_id( assert stored_examples == sorted(examples, key=lambda example: example.id) +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_examples_skips_blacklisted_examples( + repository_fixture: str, + request: FixtureRequest, +) -> None: + dataset_repository: DatasetRepository = request.getfixturevalue(repository_fixture) + examples = [ + Example( + input=DummyStringInput(), + expected_output=DummyStringOutput(), + ) + for _ in range(0, 10) + ] + examples_to_skip = frozenset(example.id for example in examples[2:5]) + dataset = dataset_repository.create_dataset( + examples=examples, dataset_name="test-dataset" + ) + + retrieved_examples = list( + dataset_repository.examples( + dataset.id, DummyStringInput, DummyStringOutput, examples_to_skip + ) + ) + + assert len(retrieved_examples) == len(examples) - len(examples_to_skip) + assert all(example.id not in examples_to_skip for example in retrieved_examples) + + @mark.parametrize( "repository_fixture", test_repository_fixtures, diff --git a/tests/evaluation/dataset/test_hugging_face_dataset_repository.py b/tests/evaluation/dataset/test_hugging_face_dataset_repository.py index fd8ec5958..9a417ba77 100644 --- a/tests/evaluation/dataset/test_hugging_face_dataset_repository.py +++ b/tests/evaluation/dataset/test_hugging_face_dataset_repository.py @@ -1,5 +1,6 @@ from collections.abc import Iterable, Sequence from pathlib import Path +from typing import Optional from unittest.mock import patch from uuid import uuid4 @@ -25,9 +26,12 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: self.counter += 1 - return super().examples(dataset_id, input_type, expected_output_type) + return super().examples( + dataset_id, input_type, expected_output_type, examples_to_skip + ) @fixture From c70dcbd15912b441e017c934fc8abaf50722432e Mon Sep 17 00:00:00 2001 From: FelixFehse Date: Mon, 17 Jun 2024 15:00:13 +0200 Subject: [PATCH 03/11] WIP: implement temporary file logging PHS-520 --- .../file_system_based_repository.py | 3 ++ .../evaluation/run/file_run_repository.py | 36 ++++++++++++++++--- .../evaluation/run/run_repository.py | 31 +++++++++++++--- .../run/test_file_run_repository.py | 17 +++++++++ 4 files changed, 77 insertions(+), 10 deletions(-) diff --git a/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py b/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py index e3bd3bed6..74fbe1304 100644 --- a/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py +++ b/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py @@ -30,6 +30,9 @@ def read_utf8(self, path: Path) -> str: str, self._file_system.read_text(self.path_to_str(path), encoding="utf-8") ) + def remove_file(self, path: Path) -> None: + self._file_system.rm_file(path) + def exists(self, path: Path) -> bool: return cast(bool, self._file_system.exists(self.path_to_str(path))) diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 219208747..6e9a3ab76 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Optional +from fsspec import AbstractFileSystem # type: ignore from fsspec.implementations.local import LocalFileSystem # type: ignore from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output @@ -14,6 +15,13 @@ class FileSystemRunRepository(RunRepository, FileSystemBasedRepository): + + TMP_FILE_TYPE : str = "tmp" + + def __init__(self, file_system: AbstractFileSystem, root_directory: Path) -> None: + FileSystemBasedRepository.__init__(self, file_system, root_directory) + RunRepository.__init__(self) + def store_run_overview(self, overview: RunOverview) -> None: self.write_utf8( self._run_overview_path(overview.id), @@ -23,13 +31,31 @@ def store_run_overview(self, overview: RunOverview) -> None: # create empty folder just in case no examples are ever saved self.mkdir(self._run_directory(overview.id)) - def create_temporary_run_data(self, run_id: str) -> None: ... - - def delete_temporary_run_data(self, run_id: str) -> None: ... + def _tmp_file_path(self, run_id: str) -> Path: + return self._run_directory(run_id + "." + self.TMP_FILE_TYPE) - def temp_store_finished_example(self, run_id: str, example_id: str) -> None: ... + def _create_temporary_run_data(self, run_id: str) -> None: + self.write_utf8( + self._tmp_file_path(run_id), + "", + create_parents=True, + ) - def unfinished_examples(self) -> dict[str, Sequence[str]]: ... + def _delete_temporary_run_data(self, run_id: str) -> None: + self.remove_file(self._tmp_file_path(run_id)) + + def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: + with self._file_system.open(self._tmp_file_path(run_id), mode="a") as f: + f.write(example_id + "\n") + + def unfinished_examples(self) -> dict[str, Sequence[str]]: + res = {} + path = self._tmp_file_path("").parent + file_names = self.file_names(path, file_type=self.TMP_FILE_TYPE) + for run_id in file_names: + text = self.read_utf8(path / f"{run_id}.{self.TMP_FILE_TYPE}") + res[run_id] = text.splitlines() + return res def run_overview(self, run_id: str) -> Optional[RunOverview]: file_path = self._run_overview_path(run_id) diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index bce3a7490..207bf590c 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -1,5 +1,7 @@ from abc import ABC, abstractmethod from collections.abc import Iterable, Sequence +from multiprocessing import Lock as lock +from multiprocessing.synchronize import Lock from typing import Optional from intelligence_layer.core import Output, Tracer @@ -18,6 +20,9 @@ class RunRepository(ABC): representing results of a dataset. """ + def __init__(self) -> None: + self.locks: dict[str, Lock] = {} + @abstractmethod def store_run_overview(self, overview: RunOverview) -> None: """Stores a :class:`RunOverview`. @@ -25,19 +30,35 @@ def store_run_overview(self, overview: RunOverview) -> None: Args: overview: The overview to be persisted. """ - ... + pass @abstractmethod - def create_temporary_run_data(self, run_id: str) -> None: ... + def _create_temporary_run_data(self, run_id: str) -> None: + pass @abstractmethod - def delete_temporary_run_data(self, run_id: str) -> None: ... + def _delete_temporary_run_data(self, run_id: str) -> None: + pass @abstractmethod - def temp_store_finished_example(self, run_id: str, example_id: str) -> None: ... + def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: + pass @abstractmethod - def unfinished_examples(self) -> dict[str, Sequence[str]]: ... + def unfinished_examples(self) -> dict[str, Sequence[str]]: + pass + + def create_temporary_run_data(self, run_id: str) -> None: + self.locks[run_id] = lock() + self._create_temporary_run_data(run_id) + + def delete_temporary_run_data(self, run_id: str) -> None: + del self.locks[run_id] + self._delete_temporary_run_data(run_id) + + def temp_store_finished_example(self, run_id: str, example_id: str) -> None: + with self.locks[run_id]: + self._temp_store_finished_example(run_id, example_id) @abstractmethod def run_overview(self, run_id: str) -> Optional[RunOverview]: diff --git a/tests/evaluation/run/test_file_run_repository.py b/tests/evaluation/run/test_file_run_repository.py index 600922879..060ebcb4c 100644 --- a/tests/evaluation/run/test_file_run_repository.py +++ b/tests/evaluation/run/test_file_run_repository.py @@ -1,4 +1,5 @@ import contextlib +import os from pydantic import BaseModel @@ -51,3 +52,19 @@ def test_example_run_does_not_create_a_folder( with contextlib.suppress(ValueError): file_run_repository.example_output("Non-existent", "Non-existent", DummyType) assert not file_run_repository._run_root_directory().exists() + + + +def test_temporary_file( + file_run_repository: FileRunRepository, +) -> None: + run_id = "abc123" + file_run_repository.create_temporary_run_data(run_id) + assert file_run_repository._tmp_file_path(run_id).exists() + + example_id = "9876lkjh" + file_run_repository.temp_store_finished_example(run_id=run_id, example_id=example_id) + with file_run_repository._tmp_file_path(run_id).open() as f: + lines = f.read().splitlines() + assert len(lines) == 1 + assert lines[0] == example_id From 47eece8fe8273b8bdce2c2afd6e1e716fcf0a206 Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Tue, 18 Jun 2024 09:48:19 +0200 Subject: [PATCH 04/11] feat: Refine test for run tracking Task: PHS-520 --- .../evaluation/run/file_run_repository.py | 11 +++--- .../run/in_memory_run_repository.py | 13 +++++-- .../evaluation/run/run_repository.py | 2 +- .../run/test_file_run_repository.py | 39 ++++++++++++------- 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 6e9a3ab76..bb89bd3df 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -15,8 +15,7 @@ class FileSystemRunRepository(RunRepository, FileSystemBasedRepository): - - TMP_FILE_TYPE : str = "tmp" + TMP_FILE_TYPE: str = "tmp" def __init__(self, file_system: AbstractFileSystem, root_directory: Path) -> None: FileSystemBasedRepository.__init__(self, file_system, root_directory) @@ -45,11 +44,13 @@ def _delete_temporary_run_data(self, run_id: str) -> None: self.remove_file(self._tmp_file_path(run_id)) def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: - with self._file_system.open(self._tmp_file_path(run_id), mode="a") as f: + with self._file_system.open( + self.path_to_str(self._tmp_file_path(run_id)), mode="a" + ) as f: f.write(example_id + "\n") - def unfinished_examples(self) -> dict[str, Sequence[str]]: - res = {} + def finished_examples(self) -> dict[str, Sequence[str]]: + res: dict[str, Sequence[str]] = {} path = self._tmp_file_path("").parent file_names = self.file_names(path, file_type=self.TMP_FILE_TYPE) for run_id in file_names: diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index 9d250177c..af5a5cc1f 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -10,6 +10,7 @@ class InMemoryRunRepository(RunRepository): def __init__(self) -> None: + super().__init__() self._example_outputs: dict[str, list[ExampleOutput[PydanticSerializable]]] = ( defaultdict(list) ) @@ -21,13 +22,17 @@ def store_run_overview(self, overview: RunOverview) -> None: if overview.id not in self._example_outputs: self._example_outputs[overview.id] = [] - def create_temporary_run_data(self, run_id: str) -> None: ... + def _create_temporary_run_data(self, run_id: str) -> None: + pass - def delete_temporary_run_data(self, run_id: str) -> None: ... + def _delete_temporary_run_data(self, run_id: str) -> None: + pass - def temp_store_finished_example(self, run_id: str, example_id: str) -> None: ... + def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: + pass - def unfinished_examples(self) -> dict[str, Sequence[str]]: ... + def finished_examples(self) -> dict[str, Sequence[str]]: + return dict() def run_overview(self, run_id: str) -> Optional[RunOverview]: return self._run_overviews.get(run_id, None) diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index 207bf590c..5a6068234 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -45,7 +45,7 @@ def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: pass @abstractmethod - def unfinished_examples(self) -> dict[str, Sequence[str]]: + def finished_examples(self) -> dict[str, Sequence[str]]: pass def create_temporary_run_data(self, run_id: str) -> None: diff --git a/tests/evaluation/run/test_file_run_repository.py b/tests/evaluation/run/test_file_run_repository.py index 060ebcb4c..6ed4b1770 100644 --- a/tests/evaluation/run/test_file_run_repository.py +++ b/tests/evaluation/run/test_file_run_repository.py @@ -1,5 +1,5 @@ import contextlib -import os +from uuid import uuid4 from pydantic import BaseModel @@ -54,17 +54,30 @@ def test_example_run_does_not_create_a_folder( assert not file_run_repository._run_root_directory().exists() - def test_temporary_file( - file_run_repository: FileRunRepository, + file_run_repository: FileRunRepository, ) -> None: - run_id = "abc123" - file_run_repository.create_temporary_run_data(run_id) - assert file_run_repository._tmp_file_path(run_id).exists() - - example_id = "9876lkjh" - file_run_repository.temp_store_finished_example(run_id=run_id, example_id=example_id) - with file_run_repository._tmp_file_path(run_id).open() as f: - lines = f.read().splitlines() - assert len(lines) == 1 - assert lines[0] == example_id + expected_run_dictionary = { + str(uuid4()): [str(uuid4()), str(uuid4())] for _ in range(2) + } + # Create + for run_id in expected_run_dictionary: + file_run_repository.create_temporary_run_data(run_id) + + # Write + for run_id, example_ids in expected_run_dictionary.items(): + for example_id in example_ids: + file_run_repository.temp_store_finished_example( + run_id=run_id, example_id=example_id + ) + + # Read + finished_examples = file_run_repository.finished_examples() + assert finished_examples == expected_run_dictionary + + # Delete + run_ids = list(expected_run_dictionary.keys()) + for run_id in run_ids: + file_run_repository.delete_temporary_run_data(run_id) + del expected_run_dictionary[run_id] + assert file_run_repository.finished_examples() == expected_run_dictionary From 9b4b6132b707dcf84959fc2e12efb85a4d29d8eb Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Tue, 18 Jun 2024 11:47:12 +0200 Subject: [PATCH 05/11] feat: Refactor file format for tmp run file into json Task: PHS-520 --- .../evaluation/run/file_run_repository.py | 41 ++++++++++--------- .../run/in_memory_run_repository.py | 4 +- .../evaluation/run/run_repository.py | 31 +++++++------- .../run/test_file_run_repository.py | 30 +++++++------- 4 files changed, 55 insertions(+), 51 deletions(-) diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index bb89bd3df..52fa2a61d 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -1,3 +1,4 @@ +import json from collections.abc import Iterable, Sequence from pathlib import Path from typing import Optional @@ -5,6 +6,7 @@ from fsspec import AbstractFileSystem # type: ignore from fsspec.implementations.local import LocalFileSystem # type: ignore +from intelligence_layer.connectors.base.json_serializable import SerializableDict from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output from intelligence_layer.core.tracer.tracer import Tracer from intelligence_layer.evaluation.infrastructure.file_system_based_repository import ( @@ -30,33 +32,34 @@ def store_run_overview(self, overview: RunOverview) -> None: # create empty folder just in case no examples are ever saved self.mkdir(self._run_directory(overview.id)) - def _tmp_file_path(self, run_id: str) -> Path: - return self._run_directory(run_id + "." + self.TMP_FILE_TYPE) + def _tmp_file_path(self, tmp_hash: str) -> Path: + return self._run_directory(tmp_hash + "." + self.TMP_FILE_TYPE) - def _create_temporary_run_data(self, run_id: str) -> None: + def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: self.write_utf8( - self._tmp_file_path(run_id), - "", + self._tmp_file_path(tmp_hash), + f"{{\"run_id\": \"{run_id}\", \"finished_examples\": []}}", create_parents=True, ) - def _delete_temporary_run_data(self, run_id: str) -> None: - self.remove_file(self._tmp_file_path(run_id)) + def _delete_temporary_run_data(self, tmp_hash: str) -> None: + self.remove_file(self._tmp_file_path(tmp_hash)) - def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: - with self._file_system.open( - self.path_to_str(self._tmp_file_path(run_id)), mode="a" - ) as f: - f.write(example_id + "\n") + def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: + data: SerializableDict = json.loads(self.read_utf8(self._tmp_file_path(tmp_hash))) + data["finished_examples"].append(example_id) + self.write_utf8( + self._tmp_file_path(tmp_hash), + json.dumps(data), + create_parents=True, + ) - def finished_examples(self) -> dict[str, Sequence[str]]: - res: dict[str, Sequence[str]] = {} + def finished_examples(self, tmp_hash: str) -> dict[str, Sequence[str]]: path = self._tmp_file_path("").parent - file_names = self.file_names(path, file_type=self.TMP_FILE_TYPE) - for run_id in file_names: - text = self.read_utf8(path / f"{run_id}.{self.TMP_FILE_TYPE}") - res[run_id] = text.splitlines() - return res + try: + return json.loads(self.read_utf8(path / f"{tmp_hash}.{self.TMP_FILE_TYPE}")) + except FileNotFoundError: + return {} def run_overview(self, run_id: str) -> Optional[RunOverview]: file_path = self._run_overview_path(run_id) diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index af5a5cc1f..221f7808f 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -22,10 +22,10 @@ def store_run_overview(self, overview: RunOverview) -> None: if overview.id not in self._example_outputs: self._example_outputs[overview.id] = [] - def _create_temporary_run_data(self, run_id: str) -> None: + def _create_temporary_run_data(self, tmp_hash: str) -> None: pass - def _delete_temporary_run_data(self, run_id: str) -> None: + def _delete_temporary_run_data(self, tmp_hash: str) -> None: pass def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index 5a6068234..f2c4dbdea 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -2,7 +2,7 @@ from collections.abc import Iterable, Sequence from multiprocessing import Lock as lock from multiprocessing.synchronize import Lock -from typing import Optional +from typing import Optional, final from intelligence_layer.core import Output, Tracer from intelligence_layer.evaluation.run.domain import ( @@ -33,32 +33,35 @@ def store_run_overview(self, overview: RunOverview) -> None: pass @abstractmethod - def _create_temporary_run_data(self, run_id: str) -> None: + def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: pass @abstractmethod - def _delete_temporary_run_data(self, run_id: str) -> None: + def _delete_temporary_run_data(self, tmp_hash: str) -> None: pass @abstractmethod - def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: + def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: pass @abstractmethod - def finished_examples(self) -> dict[str, Sequence[str]]: + def finished_examples(self, tmp_hash: str) -> dict[str, Sequence[str]]: pass - def create_temporary_run_data(self, run_id: str) -> None: - self.locks[run_id] = lock() - self._create_temporary_run_data(run_id) + @final + def create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: + self.locks[tmp_hash] = lock() + self._create_temporary_run_data(tmp_hash, run_id) - def delete_temporary_run_data(self, run_id: str) -> None: - del self.locks[run_id] - self._delete_temporary_run_data(run_id) + @final + def delete_temporary_run_data(self, tmp_hash: str) -> None: + del self.locks[tmp_hash] + self._delete_temporary_run_data(tmp_hash) - def temp_store_finished_example(self, run_id: str, example_id: str) -> None: - with self.locks[run_id]: - self._temp_store_finished_example(run_id, example_id) + @final + def temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: + with self.locks[tmp_hash]: + self._temp_store_finished_example(tmp_hash, example_id) @abstractmethod def run_overview(self, run_id: str) -> Optional[RunOverview]: diff --git a/tests/evaluation/run/test_file_run_repository.py b/tests/evaluation/run/test_file_run_repository.py index 6ed4b1770..e4fa86f85 100644 --- a/tests/evaluation/run/test_file_run_repository.py +++ b/tests/evaluation/run/test_file_run_repository.py @@ -57,27 +57,25 @@ def test_example_run_does_not_create_a_folder( def test_temporary_file( file_run_repository: FileRunRepository, ) -> None: - expected_run_dictionary = { - str(uuid4()): [str(uuid4()), str(uuid4())] for _ in range(2) - } + + test_hash = str(uuid4()) + run_id = str(uuid4()) + + expected_run_dictionary = {"run_id": run_id, "finished_examples": [str(uuid4()), str(uuid4())]} + # Create - for run_id in expected_run_dictionary: - file_run_repository.create_temporary_run_data(run_id) + file_run_repository.create_temporary_run_data(test_hash, run_id) # Write - for run_id, example_ids in expected_run_dictionary.items(): - for example_id in example_ids: - file_run_repository.temp_store_finished_example( - run_id=run_id, example_id=example_id - ) + for example_id in expected_run_dictionary["finished_examples"]: + file_run_repository.temp_store_finished_example( + tmp_hash=test_hash, example_id=example_id + ) # Read - finished_examples = file_run_repository.finished_examples() + finished_examples = file_run_repository.finished_examples(test_hash) assert finished_examples == expected_run_dictionary # Delete - run_ids = list(expected_run_dictionary.keys()) - for run_id in run_ids: - file_run_repository.delete_temporary_run_data(run_id) - del expected_run_dictionary[run_id] - assert file_run_repository.finished_examples() == expected_run_dictionary + file_run_repository.delete_temporary_run_data(test_hash) + assert file_run_repository.finished_examples(test_hash) == {} From cbbd485defa4d5c42494c43da3275bc1a9efe503 Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Tue, 18 Jun 2024 12:04:24 +0200 Subject: [PATCH 06/11] feat: Create class RecoveryData Task: PHS-520 --- .../evaluation/run/file_run_repository.py | 23 ++++++++++--------- .../run/in_memory_run_repository.py | 10 ++++---- .../evaluation/run/run_repository.py | 9 +++++++- .../run/test_file_run_repository.py | 12 ++++++---- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 52fa2a61d..946231372 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -1,4 +1,3 @@ -import json from collections.abc import Iterable, Sequence from pathlib import Path from typing import Optional @@ -6,14 +5,13 @@ from fsspec import AbstractFileSystem # type: ignore from fsspec.implementations.local import LocalFileSystem # type: ignore -from intelligence_layer.connectors.base.json_serializable import SerializableDict from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output from intelligence_layer.core.tracer.tracer import Tracer from intelligence_layer.evaluation.infrastructure.file_system_based_repository import ( FileSystemBasedRepository, ) from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview -from intelligence_layer.evaluation.run.run_repository import RunRepository +from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository class FileSystemRunRepository(RunRepository, FileSystemBasedRepository): @@ -38,7 +36,7 @@ def _tmp_file_path(self, tmp_hash: str) -> Path: def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: self.write_utf8( self._tmp_file_path(tmp_hash), - f"{{\"run_id\": \"{run_id}\", \"finished_examples\": []}}", + RecoveryData(run_id=run_id).model_dump_json(), create_parents=True, ) @@ -46,20 +44,23 @@ def _delete_temporary_run_data(self, tmp_hash: str) -> None: self.remove_file(self._tmp_file_path(tmp_hash)) def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: - data: SerializableDict = json.loads(self.read_utf8(self._tmp_file_path(tmp_hash))) - data["finished_examples"].append(example_id) + data = RecoveryData.model_validate_json( + self.read_utf8(self._tmp_file_path(tmp_hash)) + ) + data.finished_examples.append(example_id) self.write_utf8( self._tmp_file_path(tmp_hash), - json.dumps(data), + data.model_dump_json(), create_parents=True, ) - def finished_examples(self, tmp_hash: str) -> dict[str, Sequence[str]]: - path = self._tmp_file_path("").parent + def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: try: - return json.loads(self.read_utf8(path / f"{tmp_hash}.{self.TMP_FILE_TYPE}")) + return RecoveryData.model_validate_json( + self.read_utf8(self._tmp_file_path(tmp_hash)) + ) except FileNotFoundError: - return {} + return None def run_overview(self, run_id: str) -> Optional[RunOverview]: file_path = self._run_overview_path(run_id) diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index 221f7808f..1ac773233 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -5,7 +5,7 @@ from intelligence_layer.core import InMemoryTracer, Output, PydanticSerializable from intelligence_layer.core.tracer.tracer import Tracer from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview -from intelligence_layer.evaluation.run.run_repository import RunRepository +from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository class InMemoryRunRepository(RunRepository): @@ -22,17 +22,17 @@ def store_run_overview(self, overview: RunOverview) -> None: if overview.id not in self._example_outputs: self._example_outputs[overview.id] = [] - def _create_temporary_run_data(self, tmp_hash: str) -> None: + def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: pass def _delete_temporary_run_data(self, tmp_hash: str) -> None: pass - def _temp_store_finished_example(self, run_id: str, example_id: str) -> None: + def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: pass - def finished_examples(self) -> dict[str, Sequence[str]]: - return dict() + def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: + return None def run_overview(self, run_id: str) -> Optional[RunOverview]: return self._run_overviews.get(run_id, None) diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index f2c4dbdea..a6d0cfbe3 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -4,6 +4,8 @@ from multiprocessing.synchronize import Lock from typing import Optional, final +from pydantic import BaseModel + from intelligence_layer.core import Output, Tracer from intelligence_layer.evaluation.run.domain import ( ExampleOutput, @@ -12,6 +14,11 @@ ) +class RecoveryData(BaseModel): + run_id: str + finished_examples: list[str] = [] + + class RunRepository(ABC): """Base run repository interface. @@ -45,7 +52,7 @@ def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: pass @abstractmethod - def finished_examples(self, tmp_hash: str) -> dict[str, Sequence[str]]: + def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: pass @final diff --git a/tests/evaluation/run/test_file_run_repository.py b/tests/evaluation/run/test_file_run_repository.py index e4fa86f85..8eddee274 100644 --- a/tests/evaluation/run/test_file_run_repository.py +++ b/tests/evaluation/run/test_file_run_repository.py @@ -4,6 +4,7 @@ from pydantic import BaseModel from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository +from intelligence_layer.evaluation.run.run_repository import RecoveryData """Contains specific test for the FileRunRepository. For more generic tests, check the test_run_repository file.""" @@ -57,25 +58,26 @@ def test_example_run_does_not_create_a_folder( def test_temporary_file( file_run_repository: FileRunRepository, ) -> None: - test_hash = str(uuid4()) run_id = str(uuid4()) - expected_run_dictionary = {"run_id": run_id, "finished_examples": [str(uuid4()), str(uuid4())]} + expected_recovery_data = RecoveryData( + run_id=run_id, finished_examples=[str(uuid4()), str(uuid4())] + ) # Create file_run_repository.create_temporary_run_data(test_hash, run_id) # Write - for example_id in expected_run_dictionary["finished_examples"]: + for example_id in expected_recovery_data.finished_examples: file_run_repository.temp_store_finished_example( tmp_hash=test_hash, example_id=example_id ) # Read finished_examples = file_run_repository.finished_examples(test_hash) - assert finished_examples == expected_run_dictionary + assert finished_examples == expected_recovery_data # Delete file_run_repository.delete_temporary_run_data(test_hash) - assert file_run_repository.finished_examples(test_hash) == {} + assert file_run_repository.finished_examples(test_hash) is None From 077c053d4b69f3bea65e637def8df06a50eb79be Mon Sep 17 00:00:00 2001 From: FelixFehse Date: Tue, 18 Jun 2024 14:20:39 +0200 Subject: [PATCH 07/11] feat: runner can resume failed examples --- .../evaluation/run/runner.py | 46 +++++++++++++------ tests/evaluation/run/test_runner.py | 43 +++++++++++++++++ 2 files changed, 76 insertions(+), 13 deletions(-) diff --git a/src/intelligence_layer/evaluation/run/runner.py b/src/intelligence_layer/evaluation/run/runner.py index 20d96d790..94ecef776 100644 --- a/src/intelligence_layer/evaluation/run/runner.py +++ b/src/intelligence_layer/evaluation/run/runner.py @@ -75,6 +75,9 @@ def input_type(self) -> type[Input]: ) from None return cast(type[Input], input_type) + def _run_hash(self, dataset_id: str, run_description: str) -> str: + return str(hash(dataset_id + self.description + run_description)) + def run_dataset( self, dataset_id: str, @@ -86,6 +89,7 @@ def run_dataset( trace_examples_individually: bool = True, labels: Optional[set[str]] = None, metadata: Optional[SerializableDict] = None, + resume_from_recovery_data: bool = False, ) -> RunOverview: """Generates all outputs for the provided dataset. @@ -104,6 +108,7 @@ def run_dataset( trace_examples_individually: Flag to create individual tracers for each example. Defaults to True. labels: A list of labels for filtering. Defaults to an empty list. metadata: A dict for additional information about the run overview. Defaults to an empty dict. + resume_from_recovery_data: Flag to resume if execution crashed previously. Returns: An overview of the run. Outputs will not be returned but instead stored in the @@ -114,6 +119,28 @@ def run_dataset( if metadata is None: metadata = dict() + run_id = str(uuid4()) + tmp_hash = self._run_hash(dataset_id, description or "") + + recovery_data = self._run_repository.finished_examples(tmp_hash) + finished_examples: frozenset[str] = frozenset() + if recovery_data is not None and resume_from_recovery_data: + run_id = recovery_data.run_id + finished_examples = frozenset(recovery_data.finished_examples) + else: + self._run_repository.create_temporary_run_data(tmp_hash, run_id) + + examples = self._dataset_repository.examples( + dataset_id, + self.input_type(), + JsonValue, # type: ignore + examples_to_skip=finished_examples, + ) + if examples is None: + raise ValueError(f"Dataset with id {dataset_id} not found") + if num_examples: + examples = islice(examples, num_examples) + def run( example: Example[Input, ExpectedOutput], ) -> tuple[str, Output | FailedExampleRun]: @@ -128,27 +155,18 @@ def run( else: example_tracer = NoOpTracer() try: - return example.id, self._task.run(example.input, example_tracer) + output = self._task.run(example.input, example_tracer) + self._run_repository.temp_store_finished_example(tmp_hash, example.id) + return example.id, output except Exception as e: if abort_on_error: raise e print( f'FAILED RUN: example "{example.id}", {type(e).__qualname__}: "{e}"' ) + self._run_repository.temp_store_finished_example(tmp_hash, example.id) return example.id, FailedExampleRun.from_exception(e) - # mypy does not like union types - - examples = self._dataset_repository.examples( - dataset_id, - self.input_type(), - JsonValue, # type: ignore - ) - if examples is None: - raise ValueError(f"Dataset with id {dataset_id} not found") - if num_examples: - examples = islice(examples, num_examples) - run_id = str(uuid4()) start = utc_now() with ThreadPoolExecutor(max_workers=max_workers) as executor: ids_and_outputs = tqdm(executor.map(run, examples), desc="Running") @@ -165,6 +183,8 @@ def run( run_id=run_id, example_id=example_id, output=output ), ) + self._run_repository.delete_temporary_run_data(tmp_hash) + full_description = ( self.description + " : " + description if description else self.description ) diff --git a/tests/evaluation/run/test_runner.py b/tests/evaluation/run/test_runner.py index f3e32eb06..16282918a 100644 --- a/tests/evaluation/run/test_runner.py +++ b/tests/evaluation/run/test_runner.py @@ -12,6 +12,8 @@ InMemoryRunRepository, Runner, ) +from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository +from intelligence_layer.evaluation.run.run_repository import RecoveryData from tests.evaluation.conftest import FAIL_IN_TASK_INPUT, DummyTask @@ -97,6 +99,47 @@ def test_runner_aborts_on_error( runner.run_dataset(dataset_id, abort_on_error=True) +def test_runner_resumes_after_error_in_task( + in_memory_dataset_repository: InMemoryDatasetRepository, + file_run_repository: FileRunRepository, + sequence_examples: Iterable[Example[str, None]], +) -> None: + task = DummyTask() + runner = Runner( + task, in_memory_dataset_repository, file_run_repository, "dummy-runner" + ) + + dataset_id = in_memory_dataset_repository.create_dataset( + examples=sequence_examples, dataset_name="test-dataset" + ).id + + fail_example_id = "" + for example in sequence_examples: + if example.input != FAIL_IN_TASK_INPUT: + continue + fail_example_id = example.id + assert fail_example_id != "" + + run_description = "my_run" + tmp_hash = runner._run_hash(dataset_id, run_description) + + with pytest.raises(RuntimeError): + runner.run_dataset(dataset_id, abort_on_error=True, description=run_description) + + recovery_data = file_run_repository.finished_examples(tmp_hash) + assert fail_example_id not in recovery_data.finished_examples + + examples = in_memory_dataset_repository._datasets_and_examples[dataset_id][1] + for example in examples: + if example.input == FAIL_IN_TASK_INPUT: + example.input = "do_not_fail_me" + + runner.run_dataset(dataset_id, abort_on_error=True, description=run_description, resume_from_recovery_data=True) + assert file_run_repository.finished_examples(tmp_hash) is None + + # TODO : we are not yet correctly tracking the number of failed and successful example counts + + def test_runner_runs_n_examples( in_memory_dataset_repository: InMemoryDatasetRepository, in_memory_run_repository: InMemoryRunRepository, From 673a89597e5408a4cd15311f9ed613540ea83cd2 Mon Sep 17 00:00:00 2001 From: FelixFehse Date: Tue, 18 Jun 2024 17:57:05 +0200 Subject: [PATCH 08/11] fix: all tests passing --- .../evaluation/run/run_repository.py | 7 +++ .../evaluation/run/runner.py | 62 ++++++++++++------- tests/evaluation/run/test_runner.py | 19 ++++-- 3 files changed, 60 insertions(+), 28 deletions(-) diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index a6d0cfbe3..13c6300d6 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -111,6 +111,13 @@ def store_example_output(self, example_output: ExampleOutput[Output]) -> None: """ ... + @final + def store_example_output_parallel( + self, tmp_hash: str, example_output: ExampleOutput[Output] + ) -> None: + with self.locks[tmp_hash]: + self.store_example_output(example_output) + @abstractmethod def example_output( self, run_id: str, example_id: str, output_type: type[Output] diff --git a/src/intelligence_layer/evaluation/run/runner.py b/src/intelligence_layer/evaluation/run/runner.py index 94ecef776..9a407c2a3 100644 --- a/src/intelligence_layer/evaluation/run/runner.py +++ b/src/intelligence_layer/evaluation/run/runner.py @@ -6,7 +6,6 @@ from uuid import uuid4 from pydantic import JsonValue -from tqdm import tqdm from intelligence_layer.connectors.base.json_serializable import ( SerializableDict, @@ -143,7 +142,7 @@ def run_dataset( def run( example: Example[Input, ExpectedOutput], - ) -> tuple[str, Output | FailedExampleRun]: + ) -> None: if trace_examples_individually: example_tracer = self._run_repository.create_tracer_for_example( run_id, example.id @@ -154,52 +153,71 @@ def run( example_tracer = tracer else: example_tracer = NoOpTracer() + + output: Output | FailedExampleRun try: output = self._task.run(example.input, example_tracer) - self._run_repository.temp_store_finished_example(tmp_hash, example.id) - return example.id, output except Exception as e: if abort_on_error: raise e print( f'FAILED RUN: example "{example.id}", {type(e).__qualname__}: "{e}"' ) - self._run_repository.temp_store_finished_example(tmp_hash, example.id) - return example.id, FailedExampleRun.from_exception(e) + output = FailedExampleRun.from_exception(e) + + self._run_repository.store_example_output_parallel( + tmp_hash, + ExampleOutput[Output]( + run_id=run_id, example_id=example.id, output=output + ), + ) + self._run_repository.temp_store_finished_example(tmp_hash, example.id) start = utc_now() + with ThreadPoolExecutor(max_workers=max_workers) as executor: - ids_and_outputs = tqdm(executor.map(run, examples), desc="Running") - - failed_count = 0 - successful_count = 0 - for example_id, output in ids_and_outputs: - if isinstance(output, FailedExampleRun): - failed_count += 1 - else: - successful_count += 1 - self._run_repository.store_example_output( - ExampleOutput[Output]( - run_id=run_id, example_id=example_id, output=output - ), - ) + for _ in executor.map(run, examples): + pass # the result of the map must be retrieved for the exceptions to be raised. self._run_repository.delete_temporary_run_data(tmp_hash) full_description = ( self.description + " : " + description if description else self.description ) + run_overview = RunOverview( + dataset_id=dataset_id, + id=run_id, + start=start, + end=utc_now(), + failed_example_count=0, + successful_example_count=0, + description=full_description, + labels=labels, + metadata=metadata, + ) + self._run_repository.store_run_overview(run_overview) + + successful = 0 + failed = 0 + for example_output in self._run_repository.example_outputs( + run_id, self.output_type() + ): + if isinstance(example_output.output, FailedExampleRun): + failed += 1 + else: + successful += 1 run_overview = RunOverview( dataset_id=dataset_id, id=run_id, start=start, end=utc_now(), - failed_example_count=failed_count, - successful_example_count=successful_count, + failed_example_count=failed, + successful_example_count=successful, description=full_description, labels=labels, metadata=metadata, ) + self._run_repository.store_run_overview(run_overview) return run_overview diff --git a/tests/evaluation/run/test_runner.py b/tests/evaluation/run/test_runner.py index 16282918a..3ac71e205 100644 --- a/tests/evaluation/run/test_runner.py +++ b/tests/evaluation/run/test_runner.py @@ -1,4 +1,4 @@ -from collections.abc import Iterable +from collections.abc import Iterable, Sequence import pytest @@ -13,7 +13,6 @@ Runner, ) from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository -from intelligence_layer.evaluation.run.run_repository import RecoveryData from tests.evaluation.conftest import FAIL_IN_TASK_INPUT, DummyTask @@ -115,8 +114,8 @@ def test_runner_resumes_after_error_in_task( fail_example_id = "" for example in sequence_examples: - if example.input != FAIL_IN_TASK_INPUT: - continue + if example.input != FAIL_IN_TASK_INPUT: + continue fail_example_id = example.id assert fail_example_id != "" @@ -127,14 +126,22 @@ def test_runner_resumes_after_error_in_task( runner.run_dataset(dataset_id, abort_on_error=True, description=run_description) recovery_data = file_run_repository.finished_examples(tmp_hash) + assert recovery_data assert fail_example_id not in recovery_data.finished_examples - examples = in_memory_dataset_repository._datasets_and_examples[dataset_id][1] + examples: Sequence[Example[str, None]] = ( + in_memory_dataset_repository._datasets_and_examples[dataset_id][1] # type: ignore + ) for example in examples: if example.input == FAIL_IN_TASK_INPUT: example.input = "do_not_fail_me" - runner.run_dataset(dataset_id, abort_on_error=True, description=run_description, resume_from_recovery_data=True) + runner.run_dataset( + dataset_id, + abort_on_error=True, + description=run_description, + resume_from_recovery_data=True, + ) assert file_run_repository.finished_examples(tmp_hash) is None # TODO : we are not yet correctly tracking the number of failed and successful example counts From 8d378300e07dbcf62aa72e7f0e746002b8162e6f Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Wed, 19 Jun 2024 10:32:07 +0200 Subject: [PATCH 09/11] feat: Change behavior of example_outputs on nonexistent run_id. Task: PHS-520 --- CHANGELOG.md | 4 ++-- .../evaluation/run/file_run_repository.py | 4 +++- .../run/in_memory_run_repository.py | 6 ++++-- .../evaluation/run/runner.py | 19 +++++-------------- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9525b08a5..bd1cfbba4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,10 @@ ## Unreleased ### Breaking Changes -... + - `RunRepository.example_outputs()` now returns and empty list and a warning when there is no associated record for the given `run_id` instead of raising a `ValueError`. ### Features -... + - `Runner`s using the `FileRunRepository` can now be resumed after a crash or on exceptions by setting the `resume_from_recovery_data` flag of the `Runner.run_dataset` method to `True`. ### Fixes ... diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 946231372..68e15db99 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -1,3 +1,4 @@ +import warnings from collections.abc import Iterable, Sequence from pathlib import Path from typing import Optional @@ -110,7 +111,8 @@ def example_outputs( ) -> Iterable[ExampleOutput[Output]]: path = self._run_output_directory(run_id) if not self.exists(path): - raise ValueError(f"Repository does not contain a run with id: {run_id}") + warnings.warn(f"Repository does not contain a run with id: {run_id}") + return [] example_outputs = [] for file_name in self.file_names(path): diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index 1ac773233..d353f53fb 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -1,3 +1,4 @@ +import warnings from collections import defaultdict from collections.abc import Iterable, Sequence from typing import Optional, cast @@ -70,8 +71,9 @@ def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: - if run_id not in self._run_overviews: - raise ValueError(f"Repository does not contain a run with id: {run_id}") + if run_id not in self._example_outputs and run_id not in self._run_overviews: + warnings.warn(f"Repository does not contain a run with id: {run_id}") + return [] return ( cast(ExampleOutput[Output], example_output) diff --git a/src/intelligence_layer/evaluation/run/runner.py b/src/intelligence_layer/evaluation/run/runner.py index 9a407c2a3..288a916df 100644 --- a/src/intelligence_layer/evaluation/run/runner.py +++ b/src/intelligence_layer/evaluation/run/runner.py @@ -1,3 +1,4 @@ +import concurrent.futures from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor from inspect import get_annotations @@ -176,25 +177,15 @@ def run( start = utc_now() with ThreadPoolExecutor(max_workers=max_workers) as executor: - for _ in executor.map(run, examples): - pass # the result of the map must be retrieved for the exceptions to be raised. + futures = [executor.submit(run, example) for example in examples] + concurrent.futures.wait(futures) + for future in futures: + future.result() # result of the futures must be retrieved for exceptions to be raised self._run_repository.delete_temporary_run_data(tmp_hash) full_description = ( self.description + " : " + description if description else self.description ) - run_overview = RunOverview( - dataset_id=dataset_id, - id=run_id, - start=start, - end=utc_now(), - failed_example_count=0, - successful_example_count=0, - description=full_description, - labels=labels, - metadata=metadata, - ) - self._run_repository.store_run_overview(run_overview) successful = 0 failed = 0 From d5f9ce04e7ec062e6bc3c748a7684126c8962c9c Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Wed, 19 Jun 2024 12:00:11 +0200 Subject: [PATCH 10/11] feat: Enable resuming of InMemoryRunRepository based Runners * Unify behavior of example_output and example_outputs * Refactor tests Task: PHS-520 --- CHANGELOG.md | 7 ++- .../evaluation/run/file_run_repository.py | 29 ++++++----- .../run/in_memory_run_repository.py | 38 ++++++++------ .../evaluation/run/run_repository.py | 52 +++++++++---------- .../evaluation/run/runner.py | 2 +- .../run/test_file_run_repository.py | 33 ++---------- tests/evaluation/run/test_run_repository.py | 45 +++++++++++++++- .../classify/test_prompt_based_classify.py | 1 + 8 files changed, 117 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd1cfbba4..19458bcd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,13 @@ ## Unreleased ### Breaking Changes - - `RunRepository.example_outputs()` now returns and empty list and a warning when there is no associated record for the given `run_id` instead of raising a `ValueError`. +- `RunRepository.example_output` now returns `None` and prints a warning when there is no associated record for the given `run_id` instead of raising a `ValueError`. +- `RunRepository.example_outputs` now returns an empty list and prints a warning when there is no associated record for the given `run_id` instead of raising a `ValueError`. ### Features - - `Runner`s using the `FileRunRepository` can now be resumed after a crash or on exceptions by setting the `resume_from_recovery_data` flag of the `Runner.run_dataset` method to `True`. + - `Runner.run_dataset` can now be resumed after failure by setting the `resume_from_recovery_data` flag to `True` and calling `Runner.run_dataset` again. + - For `InMemoryRunRepository` based `Runner`s this is limited to runs that failed with an exception that did not crash the whole process/kernel. + - For `FileRunRepository` based `Runners` even runs that crashed the whole process can be resumed. ### Fixes ... diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 68e15db99..ba4f0b8b8 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -86,9 +86,10 @@ def example_output( self, run_id: str, example_id: str, output_type: type[Output] ) -> Optional[ExampleOutput[Output]]: file_path = self._example_output_path(run_id, example_id) - if not self.exists(file_path.parent): - raise ValueError(f"Repository does not contain a run with id: {run_id}") if not self.exists(file_path): + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) return None content = self.read_utf8(file_path) # mypy does not accept dynamic types @@ -96,22 +97,14 @@ def example_output( json_data=content ) - def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: - file_path = self._example_trace_path(run_id, example_id) - if not self.exists(file_path): - return None - return self._parse_log(file_path) - - def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: - file_path = self._example_trace_path(run_id, example_id) - return FileTracer(file_path) - def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: path = self._run_output_directory(run_id) if not self.exists(path): - warnings.warn(f"Repository does not contain a run with id: {run_id}") + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) return [] example_outputs = [] @@ -143,6 +136,16 @@ def _run_output_directory(self, run_id: str) -> Path: def _run_overview_path(self, run_id: str) -> Path: return self._run_directory(run_id).with_suffix(".json") + def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: + file_path = self._example_trace_path(run_id, example_id) + if not self.exists(file_path): + return None + return self._parse_log(file_path) + + def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: + file_path = self._example_trace_path(run_id, example_id) + return FileTracer(file_path) + def _trace_directory(self, run_id: str) -> Path: path = self._run_directory(run_id) / "trace" return path diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index d353f53fb..21ec362e0 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -17,6 +17,7 @@ def __init__(self) -> None: ) self._example_traces: dict[str, Tracer] = dict() self._run_overviews: dict[str, RunOverview] = dict() + self._recovery_data: dict[str, RecoveryData] = dict() def store_run_overview(self, overview: RunOverview) -> None: self._run_overviews[overview.id] = overview @@ -24,16 +25,19 @@ def store_run_overview(self, overview: RunOverview) -> None: self._example_outputs[overview.id] = [] def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: - pass + self._recovery_data[tmp_hash] = RecoveryData(run_id=run_id) def _delete_temporary_run_data(self, tmp_hash: str) -> None: - pass + del self._recovery_data[tmp_hash] def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: - pass + self._recovery_data[tmp_hash].finished_examples.append(example_id) def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: - return None + if tmp_hash in self._recovery_data: + return self._recovery_data[tmp_hash] + else: + return None def run_overview(self, run_id: str) -> Optional[RunOverview]: return self._run_overviews.get(run_id, None) @@ -50,9 +54,9 @@ def example_output( self, run_id: str, example_id: str, output_type: type[Output] ) -> Optional[ExampleOutput[Output]]: if run_id not in self._example_outputs: - raise ValueError(f"Repository does not contain a run with id: {run_id}") - - if run_id not in self._example_outputs: + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) return None for example_output in self._example_outputs[run_id]: @@ -60,19 +64,13 @@ def example_output( return cast(ExampleOutput[Output], example_output) return None - def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: - return self._example_traces.get(f"{run_id}/{example_id}") - - def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: - tracer = InMemoryTracer() - self._example_traces[f"{run_id}/{example_id}"] = tracer - return tracer - def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: if run_id not in self._example_outputs and run_id not in self._run_overviews: - warnings.warn(f"Repository does not contain a run with id: {run_id}") + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) return [] return ( @@ -90,3 +88,11 @@ def example_output_ids(self, run_id: str) -> Sequence[str]: for example_output in self._example_outputs[run_id] ] ) + + def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: + return self._example_traces.get(f"{run_id}/{example_id}") + + def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: + tracer = InMemoryTracer() + self._example_traces[f"{run_id}/{example_id}"] = tracer + return tracer diff --git a/src/intelligence_layer/evaluation/run/run_repository.py b/src/intelligence_layer/evaluation/run/run_repository.py index 13c6300d6..94ccf07cc 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -134,32 +134,6 @@ def example_output( """ ... - @abstractmethod - def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: - """Returns an :class:`Optional[Tracer]` for the given run ID and example ID. - - Args: - run_id: The ID of the linked run overview. - example_id: ID of the example whose :class:`Tracer` should be retrieved. - - Returns: - A :class:`Tracer` if it was found, `None` otherwise. - """ - ... - - @abstractmethod - def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: - """Creates and returns a :class:`Tracer` for the given run ID and example ID. - - Args: - run_id: The ID of the linked run overview. - example_id: ID of the example whose :class:`Tracer` should be retrieved. - - Returns: - A :.class:`Tracer`. - """ - ... - @abstractmethod def example_outputs( self, run_id: str, output_type: type[Output] @@ -216,3 +190,29 @@ def failed_example_outputs( """ results = self.example_outputs(run_id, output_type) return (r for r in results if isinstance(r.output, FailedExampleRun)) + + @abstractmethod + def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: + """Returns an :class:`Optional[Tracer]` for the given run ID and example ID. + + Args: + run_id: The ID of the linked run overview. + example_id: ID of the example whose :class:`Tracer` should be retrieved. + + Returns: + A :class:`Tracer` if it was found, `None` otherwise. + """ + ... + + @abstractmethod + def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: + """Creates and returns a :class:`Tracer` for the given run ID and example ID. + + Args: + run_id: The ID of the linked run overview. + example_id: ID of the example whose :class:`Tracer` should be retrieved. + + Returns: + A :.class:`Tracer`. + """ + ... diff --git a/src/intelligence_layer/evaluation/run/runner.py b/src/intelligence_layer/evaluation/run/runner.py index 288a916df..a8b74dff5 100644 --- a/src/intelligence_layer/evaluation/run/runner.py +++ b/src/intelligence_layer/evaluation/run/runner.py @@ -108,7 +108,7 @@ def run_dataset( trace_examples_individually: Flag to create individual tracers for each example. Defaults to True. labels: A list of labels for filtering. Defaults to an empty list. metadata: A dict for additional information about the run overview. Defaults to an empty dict. - resume_from_recovery_data: Flag to resume if execution crashed previously. + resume_from_recovery_data: Flag to resume if execution failed previously. Returns: An overview of the run. Outputs will not be returned but instead stored in the diff --git a/tests/evaluation/run/test_file_run_repository.py b/tests/evaluation/run/test_file_run_repository.py index 8eddee274..3bb88c88a 100644 --- a/tests/evaluation/run/test_file_run_repository.py +++ b/tests/evaluation/run/test_file_run_repository.py @@ -1,10 +1,9 @@ import contextlib -from uuid import uuid4 +import pytest from pydantic import BaseModel from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository -from intelligence_layer.evaluation.run.run_repository import RecoveryData """Contains specific test for the FileRunRepository. For more generic tests, check the test_run_repository file.""" @@ -33,6 +32,7 @@ def test_run_overview_does_not_create_a_folder( assert not file_run_repository._run_root_directory().exists() +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_runs_does_not_create_a_folder( file_run_repository: FileRunRepository, ) -> None: @@ -44,6 +44,7 @@ def test_example_runs_does_not_create_a_folder( assert not file_run_repository._run_root_directory().exists() +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_run_does_not_create_a_folder( file_run_repository: FileRunRepository, ) -> None: @@ -53,31 +54,3 @@ def test_example_run_does_not_create_a_folder( with contextlib.suppress(ValueError): file_run_repository.example_output("Non-existent", "Non-existent", DummyType) assert not file_run_repository._run_root_directory().exists() - - -def test_temporary_file( - file_run_repository: FileRunRepository, -) -> None: - test_hash = str(uuid4()) - run_id = str(uuid4()) - - expected_recovery_data = RecoveryData( - run_id=run_id, finished_examples=[str(uuid4()), str(uuid4())] - ) - - # Create - file_run_repository.create_temporary_run_data(test_hash, run_id) - - # Write - for example_id in expected_recovery_data.finished_examples: - file_run_repository.temp_store_finished_example( - tmp_hash=test_hash, example_id=example_id - ) - - # Read - finished_examples = file_run_repository.finished_examples(test_hash) - assert finished_examples == expected_recovery_data - - # Delete - file_run_repository.delete_temporary_run_data(test_hash) - assert file_run_repository.finished_examples(test_hash) is None diff --git a/tests/evaluation/run/test_run_repository.py b/tests/evaluation/run/test_run_repository.py index 3bdba5ea8..6b05b5147 100644 --- a/tests/evaluation/run/test_run_repository.py +++ b/tests/evaluation/run/test_run_repository.py @@ -9,6 +9,7 @@ from intelligence_layer.core import CompositeTracer, InMemoryTracer, utc_now from intelligence_layer.evaluation import ExampleOutput, RunOverview, RunRepository from intelligence_layer.evaluation.run.domain import FailedExampleRun +from intelligence_layer.evaluation.run.run_repository import RecoveryData from tests.evaluation.conftest import DummyStringInput test_repository_fixtures = [ @@ -62,6 +63,7 @@ def test_run_repository_stores_and_returns_example_output( "repository_fixture", test_repository_fixtures, ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_output_returns_none_for_not_existing_example_id( repository_fixture: str, request: FixtureRequest, @@ -82,6 +84,7 @@ def test_example_output_returns_none_for_not_existing_example_id( "repository_fixture", test_repository_fixtures, ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_output_returns_none_for_not_existing_run_id( repository_fixture: str, request: FixtureRequest, @@ -92,12 +95,16 @@ def test_example_output_returns_none_for_not_existing_run_id( example_output = ExampleOutput(run_id=run_id, example_id=example_id, output=None) run_repository.store_example_output(example_output) - with pytest.raises(ValueError): + assert ( run_repository.example_output("not-existing-run-id", example_id, type(None)) - with pytest.raises(ValueError): + is None + ) + assert ( run_repository.example_output( "not-existing-run-id", "not-existing-example-id", type(None) ) + is None + ) @mark.parametrize( @@ -304,3 +311,37 @@ def test_successful_example_outputs_returns_only_successful_examples( assert len(successful_outputs) == 1 assert successful_outputs[0].example_id == "2" + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_recovery_from_crash( + repository_fixture: str, + request: FixtureRequest, +) -> None: + run_repository: RunRepository = request.getfixturevalue(repository_fixture) + test_hash = str(uuid4()) + run_id = str(uuid4()) + + expected_recovery_data = RecoveryData( + run_id=run_id, finished_examples=[str(uuid4()), str(uuid4())] + ) + + # Create + run_repository.create_temporary_run_data(test_hash, run_id) + + # Write + for example_id in expected_recovery_data.finished_examples: + run_repository.temp_store_finished_example( + tmp_hash=test_hash, example_id=example_id + ) + + # Read + finished_examples = run_repository.finished_examples(test_hash) + assert finished_examples == expected_recovery_data + + # Delete + run_repository.delete_temporary_run_data(test_hash) + assert run_repository.finished_examples(test_hash) is None diff --git a/tests/examples/classify/test_prompt_based_classify.py b/tests/examples/classify/test_prompt_based_classify.py index 4c9e0613a..acebfc209 100644 --- a/tests/examples/classify/test_prompt_based_classify.py +++ b/tests/examples/classify/test_prompt_based_classify.py @@ -288,6 +288,7 @@ def test_can_aggregate_evaluations( assert aggregation_overview.statistics.percentage_correct == 0.5 +@pytest.mark.filterwarnings("ignore::UserWarning") def test_aggregating_evaluations_works_with_empty_list( classify_evaluator: Evaluator[ ClassifyInput, From a83f9a9430f7d0a14e4da96f0a078be3f37fde8c Mon Sep 17 00:00:00 2001 From: Sebastian Niehus Date: Wed, 19 Jun 2024 13:27:57 +0200 Subject: [PATCH 11/11] feat: Update CHANGELOG.md Task: PHS-520 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19458bcd8..08211b741 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - `Runner.run_dataset` can now be resumed after failure by setting the `resume_from_recovery_data` flag to `True` and calling `Runner.run_dataset` again. - For `InMemoryRunRepository` based `Runner`s this is limited to runs that failed with an exception that did not crash the whole process/kernel. - For `FileRunRepository` based `Runners` even runs that crashed the whole process can be resumed. + - `DatasetRepository.examples` now accepts an optional parameter `examples_to_skip` to enable skipping of `Example`s with the provided IDs. ### Fixes ...