diff --git a/CHANGELOG.md b/CHANGELOG.md index 9525b08a5..08211b741 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,14 @@ ## Unreleased ### Breaking Changes -... +- `RunRepository.example_output` now returns `None` and emits a `UserWarning` instead of raising a `ValueError` when there is no record associated with the given `run_id`. +- `RunRepository.example_outputs` now returns an empty list and emits a `UserWarning` instead of raising a `ValueError` when there is no record associated with the given `run_id`. ### Features -... + - `Runner.run_dataset` can now be resumed after failure by setting the `resume_from_recovery_data` flag to `True` and calling `Runner.run_dataset` again. + - For `InMemoryRunRepository`-based `Runner`s this is limited to runs that failed with an exception that did not crash the whole process/kernel. + - For `FileRunRepository`-based `Runner`s, even runs that crashed the whole process can be resumed. + - `DatasetRepository.examples` now accepts an optional parameter `examples_to_skip` to enable skipping of `Example`s with the provided IDs. ### Fixes ... diff --git a/src/intelligence_layer/evaluation/dataset/dataset_repository.py b/src/intelligence_layer/evaluation/dataset/dataset_repository.py index 999aab760..92ac1c3d9 100644 --- a/src/intelligence_layer/evaluation/dataset/dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/dataset_repository.py @@ -110,6 +110,7 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: """Returns all :class:`Example`s for the given dataset ID sorted by their ID. @@ -117,6 +118,7 @@ Args: dataset_id: Dataset ID whose examples should be retrieved. input_type: Input type of the example. expected_output_type: Expected output type of the example. + examples_to_skip: Optional set of example IDs. Those examples will be excluded from the output. Returns: :class:`Iterable` of :class`Example`s. 
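Below is a minimal usage sketch of the two additions described in the changelog, included for reference only; it is not part of the diff. The `EchoTask`, the repository root directories, the demo dataset contents, and the assumption that both file-based repositories take a plain root path (and that `Task`/`TaskSpan`/`do_run` are exposed by `intelligence_layer.core`) are illustrative placeholders; only `Runner.run_dataset(..., resume_from_recovery_data=True)` and `DatasetRepository.examples(..., examples_to_skip=...)` come from the changes below.

```python
from pathlib import Path

from intelligence_layer.core import Task, TaskSpan
from intelligence_layer.evaluation import Example, Runner
from intelligence_layer.evaluation.dataset.file_dataset_repository import (
    FileDatasetRepository,
)
from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository


class EchoTask(Task[str, str]):
    """Placeholder task; any Task implementation works the same way."""

    def do_run(self, input: str, task_span: TaskSpan) -> str:
        return input


# Assumption: both file-based repositories are constructed from a local root directory.
dataset_repository = FileDatasetRepository(Path("datasets"))
run_repository = FileRunRepository(Path("runs"))

dataset_id = dataset_repository.create_dataset(
    examples=[Example(input="hello", expected_output="hello")],
    dataset_name="demo-dataset",
).id

runner = Runner(EchoTask(), dataset_repository, run_repository, "demo-runner")

try:
    runner.run_dataset(dataset_id, description="nightly")
except Exception:
    # A failed (or, with a FileRunRepository, crashed) run leaves recovery data
    # behind. Re-running with the same dataset_id and description plus
    # resume_from_recovery_data=True skips the examples that already finished.
    runner.run_dataset(
        dataset_id, description="nightly", resume_from_recovery_data=True
    )

# examples_to_skip excludes Examples with the given IDs when reading a dataset.
remaining = dataset_repository.examples(
    dataset_id, str, str, examples_to_skip=frozenset({"some-example-id"})
)
```

The same resume pattern is exercised by `test_runner_resumes_after_error_in_task` further down in this diff.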
diff --git a/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py index 85575c8ad..d17e788f3 100644 --- a/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/file_dataset_repository.py @@ -107,7 +107,9 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: + examples_to_skip = examples_to_skip or frozenset() example_path = self.path_to_str(self._dataset_examples_path(dataset_id)) if not self._file_system.exists(example_path): raise ValueError( @@ -118,12 +120,13 @@ def examples( example_path, "r", encoding="utf-8" ) as examples_file: # Mypy does not accept dynamic types - examples = [ - Example[input_type, expected_output_type].model_validate_json( # type: ignore - json_data=example - ) - for example in examples_file - ] + examples = [] + for example in examples_file: + current_example = Example[ + input_type, expected_output_type # type: ignore + ].model_validate_json(json_data=example) + if current_example.id not in examples_to_skip: + examples.append(current_example) return sorted(examples, key=lambda example: example.id) diff --git a/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py index 2ca418cf9..d75eb261f 100644 --- a/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/in_memory_dataset_repository.py @@ -77,7 +77,9 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: + examples_to_skip = examples_to_skip or frozenset() if dataset_id not in self._datasets_and_examples: raise ValueError( f"Repository does not contain a dataset with id: {dataset_id}" @@ -85,7 +87,11 @@ def examples( return cast( Iterable[Example[Input, ExpectedOutput]], sorted( - self._datasets_and_examples[dataset_id][1], + [ + example + for example in self._datasets_and_examples[dataset_id][1] + if example.id not in examples_to_skip + ], key=lambda example: example.id, ), ) diff --git a/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py b/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py index 4608a248f..4b1227e2a 100644 --- a/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py +++ b/src/intelligence_layer/evaluation/dataset/single_huggingface_dataset_repository.py @@ -1,5 +1,5 @@ from collections.abc import Iterable, Sequence -from typing import cast +from typing import Optional, cast from datasets import Dataset as HFDataset # type: ignore from datasets import DatasetDict, IterableDataset, IterableDatasetDict @@ -71,18 +71,21 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: + examples_to_skip = examples_to_skip or frozenset() answers = "ABCD" assert input_type == MultipleChoiceInput assert expected_output_type == str for index, sample in enumerate(self._huggingface_dataset["test"]): - yield Example( - input=cast( - Input, - MultipleChoiceInput( - 
question=sample["question"], choices=sample["choices"] + if str(index) not in examples_to_skip: + yield Example( + input=cast( + Input, + MultipleChoiceInput( + question=sample["question"], choices=sample["choices"] + ), ), - ), - expected_output=cast(ExpectedOutput, answers[sample["answer"]]), - id=str(index), - ) + expected_output=cast(ExpectedOutput, answers[sample["answer"]]), + id=str(index), + ) diff --git a/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py b/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py index e3bd3bed6..74fbe1304 100644 --- a/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py +++ b/src/intelligence_layer/evaluation/infrastructure/file_system_based_repository.py @@ -30,6 +30,9 @@ def read_utf8(self, path: Path) -> str: str, self._file_system.read_text(self.path_to_str(path), encoding="utf-8") ) + def remove_file(self, path: Path) -> None: + self._file_system.rm_file(path) + def exists(self, path: Path) -> bool: return cast(bool, self._file_system.exists(self.path_to_str(path))) diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 984df154c..ba4f0b8b8 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -1,7 +1,9 @@ +import warnings from collections.abc import Iterable, Sequence from pathlib import Path from typing import Optional +from fsspec import AbstractFileSystem # type: ignore from fsspec.implementations.local import LocalFileSystem # type: ignore from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output @@ -10,10 +12,16 @@ FileSystemBasedRepository, ) from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview -from intelligence_layer.evaluation.run.run_repository import RunRepository +from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository class FileSystemRunRepository(RunRepository, FileSystemBasedRepository): + TMP_FILE_TYPE: str = "tmp" + + def __init__(self, file_system: AbstractFileSystem, root_directory: Path) -> None: + FileSystemBasedRepository.__init__(self, file_system, root_directory) + RunRepository.__init__(self) + def store_run_overview(self, overview: RunOverview) -> None: self.write_utf8( self._run_overview_path(overview.id), @@ -23,6 +31,38 @@ def store_run_overview(self, overview: RunOverview) -> None: # create empty folder just in case no examples are ever saved self.mkdir(self._run_directory(overview.id)) + def _tmp_file_path(self, tmp_hash: str) -> Path: + return self._run_directory(tmp_hash + "." 
+ self.TMP_FILE_TYPE) + + def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: + self.write_utf8( + self._tmp_file_path(tmp_hash), + RecoveryData(run_id=run_id).model_dump_json(), + create_parents=True, + ) + + def _delete_temporary_run_data(self, tmp_hash: str) -> None: + self.remove_file(self._tmp_file_path(tmp_hash)) + + def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: + data = RecoveryData.model_validate_json( + self.read_utf8(self._tmp_file_path(tmp_hash)) + ) + data.finished_examples.append(example_id) + self.write_utf8( + self._tmp_file_path(tmp_hash), + data.model_dump_json(), + create_parents=True, + ) + + def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: + try: + return RecoveryData.model_validate_json( + self.read_utf8(self._tmp_file_path(tmp_hash)) + ) + except FileNotFoundError: + return None + def run_overview(self, run_id: str) -> Optional[RunOverview]: file_path = self._run_overview_path(run_id) if not self.exists(file_path): @@ -46,9 +86,10 @@ def example_output( self, run_id: str, example_id: str, output_type: type[Output] ) -> Optional[ExampleOutput[Output]]: file_path = self._example_output_path(run_id, example_id) - if not self.exists(file_path.parent): - raise ValueError(f"Repository does not contain a run with id: {run_id}") if not self.exists(file_path): + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) return None content = self.read_utf8(file_path) # mypy does not accept dynamic types @@ -56,22 +97,15 @@ def example_output( json_data=content ) - def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: - file_path = self._example_trace_path(run_id, example_id) - if not self.exists(file_path): - return None - return self._parse_log(file_path) - - def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: - file_path = self._example_trace_path(run_id, example_id) - return FileTracer(file_path) - def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: path = self._run_output_directory(run_id) if not self.exists(path): - raise ValueError(f"Repository does not contain a run with id: {run_id}") + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) + return [] example_outputs = [] for file_name in self.file_names(path): @@ -102,6 +136,16 @@ def _run_output_directory(self, run_id: str) -> Path: def _run_overview_path(self, run_id: str) -> Path: return self._run_directory(run_id).with_suffix(".json") + def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: + file_path = self._example_trace_path(run_id, example_id) + if not self.exists(file_path): + return None + return self._parse_log(file_path) + + def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: + file_path = self._example_trace_path(run_id, example_id) + return FileTracer(file_path) + def _trace_directory(self, run_id: str) -> Path: path = self._run_directory(run_id) / "trace" return path diff --git a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py index 5051615b2..21ec362e0 100644 --- a/src/intelligence_layer/evaluation/run/in_memory_run_repository.py +++ b/src/intelligence_layer/evaluation/run/in_memory_run_repository.py @@ -1,3 +1,4 @@ +import warnings from collections import defaultdict from collections.abc import Iterable, Sequence from typing 
import Optional, cast @@ -5,22 +6,39 @@ from intelligence_layer.core import InMemoryTracer, Output, PydanticSerializable from intelligence_layer.core.tracer.tracer import Tracer from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview -from intelligence_layer.evaluation.run.run_repository import RunRepository +from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository class InMemoryRunRepository(RunRepository): def __init__(self) -> None: + super().__init__() self._example_outputs: dict[str, list[ExampleOutput[PydanticSerializable]]] = ( defaultdict(list) ) self._example_traces: dict[str, Tracer] = dict() self._run_overviews: dict[str, RunOverview] = dict() + self._recovery_data: dict[str, RecoveryData] = dict() def store_run_overview(self, overview: RunOverview) -> None: self._run_overviews[overview.id] = overview if overview.id not in self._example_outputs: self._example_outputs[overview.id] = [] + def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: + self._recovery_data[tmp_hash] = RecoveryData(run_id=run_id) + + def _delete_temporary_run_data(self, tmp_hash: str) -> None: + del self._recovery_data[tmp_hash] + + def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: + self._recovery_data[tmp_hash].finished_examples.append(example_id) + + def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: + if tmp_hash in self._recovery_data: + return self._recovery_data[tmp_hash] + else: + return None + def run_overview(self, run_id: str) -> Optional[RunOverview]: return self._run_overviews.get(run_id, None) @@ -36,9 +54,9 @@ def example_output( self, run_id: str, example_id: str, output_type: type[Output] ) -> Optional[ExampleOutput[Output]]: if run_id not in self._example_outputs: - raise ValueError(f"Repository does not contain a run with id: {run_id}") - - if run_id not in self._example_outputs: + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) return None for example_output in self._example_outputs[run_id]: @@ -46,19 +64,14 @@ def example_output( return cast(ExampleOutput[Output], example_output) return None - def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: - return self._example_traces.get(f"{run_id}/{example_id}") - - def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: - tracer = InMemoryTracer() - self._example_traces[f"{run_id}/{example_id}"] = tracer - return tracer - def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: - if run_id not in self._run_overviews: - raise ValueError(f"Repository does not contain a run with id: {run_id}") + if run_id not in self._example_outputs and run_id not in self._run_overviews: + warnings.warn( + f'Repository does not contain a run with id: "{run_id}"', UserWarning + ) + return [] return ( cast(ExampleOutput[Output], example_output) @@ -75,3 +88,11 @@ def example_output_ids(self, run_id: str) -> Sequence[str]: for example_output in self._example_outputs[run_id] ] ) + + def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: + return self._example_traces.get(f"{run_id}/{example_id}") + + def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: + tracer = InMemoryTracer() + self._example_traces[f"{run_id}/{example_id}"] = tracer + return tracer diff --git a/src/intelligence_layer/evaluation/run/run_repository.py 
b/src/intelligence_layer/evaluation/run/run_repository.py index b6b9a7f91..94ccf07cc 100644 --- a/src/intelligence_layer/evaluation/run/run_repository.py +++ b/src/intelligence_layer/evaluation/run/run_repository.py @@ -1,6 +1,10 @@ from abc import ABC, abstractmethod from collections.abc import Iterable, Sequence -from typing import Optional +from multiprocessing import Lock as lock +from multiprocessing.synchronize import Lock +from typing import Optional, final + +from pydantic import BaseModel from intelligence_layer.core import Output, Tracer from intelligence_layer.evaluation.run.domain import ( @@ -10,6 +14,11 @@ ) +class RecoveryData(BaseModel): + run_id: str + finished_examples: list[str] = [] + + class RunRepository(ABC): """Base run repository interface. @@ -18,6 +27,9 @@ class RunRepository(ABC): representing results of a dataset. """ + def __init__(self) -> None: + self.locks: dict[str, Lock] = {} + @abstractmethod def store_run_overview(self, overview: RunOverview) -> None: """Stores a :class:`RunOverview`. @@ -25,7 +37,38 @@ def store_run_overview(self, overview: RunOverview) -> None: Args: overview: The overview to be persisted. """ - ... + pass + + @abstractmethod + def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: + pass + + @abstractmethod + def _delete_temporary_run_data(self, tmp_hash: str) -> None: + pass + + @abstractmethod + def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: + pass + + @abstractmethod + def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]: + pass + + @final + def create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: + self.locks[tmp_hash] = lock() + self._create_temporary_run_data(tmp_hash, run_id) + + @final + def delete_temporary_run_data(self, tmp_hash: str) -> None: + del self.locks[tmp_hash] + self._delete_temporary_run_data(tmp_hash) + + @final + def temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: + with self.locks[tmp_hash]: + self._temp_store_finished_example(tmp_hash, example_id) @abstractmethod def run_overview(self, run_id: str) -> Optional[RunOverview]: @@ -68,6 +111,13 @@ def store_example_output(self, example_output: ExampleOutput[Output]) -> None: """ ... + @final + def store_example_output_parallel( + self, tmp_hash: str, example_output: ExampleOutput[Output] + ) -> None: + with self.locks[tmp_hash]: + self.store_example_output(example_output) + @abstractmethod def example_output( self, run_id: str, example_id: str, output_type: type[Output] @@ -84,32 +134,6 @@ def example_output( """ ... - @abstractmethod - def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: - """Returns an :class:`Optional[Tracer]` for the given run ID and example ID. - - Args: - run_id: The ID of the linked run overview. - example_id: ID of the example whose :class:`Tracer` should be retrieved. - - Returns: - A :class:`Tracer` if it was found, `None` otherwise. - """ - ... - - @abstractmethod - def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: - """Creates and returns a :class:`Tracer` for the given run ID and example ID. - - Args: - run_id: The ID of the linked run overview. - example_id: ID of the example whose :class:`Tracer` should be retrieved. - - Returns: - A :.class:`Tracer`. - """ - ... 
- @abstractmethod def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: @@ -166,3 +190,29 @@ def failed_example_outputs( """ results = self.example_outputs(run_id, output_type) return (r for r in results if isinstance(r.output, FailedExampleRun)) + + @abstractmethod + def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]: + """Returns an :class:`Optional[Tracer]` for the given run ID and example ID. + + Args: + run_id: The ID of the linked run overview. + example_id: ID of the example whose :class:`Tracer` should be retrieved. + + Returns: + A :class:`Tracer` if it was found, `None` otherwise. + """ + ... + + @abstractmethod + def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: + """Creates and returns a :class:`Tracer` for the given run ID and example ID. + + Args: + run_id: The ID of the linked run overview. + example_id: ID of the example whose :class:`Tracer` should be retrieved. + + Returns: + A :class:`Tracer`. + """ + ... diff --git a/src/intelligence_layer/evaluation/run/runner.py b/src/intelligence_layer/evaluation/run/runner.py index 20d96d790..a8b74dff5 100644 --- a/src/intelligence_layer/evaluation/run/runner.py +++ b/src/intelligence_layer/evaluation/run/runner.py @@ -1,3 +1,4 @@ +import concurrent.futures from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor from inspect import get_annotations @@ -6,7 +7,6 @@ from uuid import uuid4 from pydantic import JsonValue -from tqdm import tqdm from intelligence_layer.connectors.base.json_serializable import ( SerializableDict, @@ -75,6 +75,9 @@ def input_type(self) -> type[Input]: ) from None return cast(type[Input], input_type) + def _run_hash(self, dataset_id: str, run_description: str) -> str: + return str(hash(dataset_id + self.description + run_description)) + def run_dataset( self, dataset_id: str, @@ -86,6 +89,7 @@ trace_examples_individually: bool = True, labels: Optional[set[str]] = None, metadata: Optional[SerializableDict] = None, + resume_from_recovery_data: bool = False, ) -> RunOverview: """Generates all outputs for the provided dataset. @@ -104,6 +108,7 @@ trace_examples_individually: Flag to create individual tracers for each example. Defaults to True. labels: A list of labels for filtering. Defaults to an empty list. metadata: A dict for additional information about the run overview. Defaults to an empty dict. + resume_from_recovery_data: Flag to resume a previously failed run from its recovery data instead of starting over. Defaults to False. Returns: An overview of the run. 
Outputs will not be returned but instead stored in the @@ -114,9 +119,31 @@ def run_dataset( if metadata is None: metadata = dict() + run_id = str(uuid4()) + tmp_hash = self._run_hash(dataset_id, description or "") + + recovery_data = self._run_repository.finished_examples(tmp_hash) + finished_examples: frozenset[str] = frozenset() + if recovery_data is not None and resume_from_recovery_data: + run_id = recovery_data.run_id + finished_examples = frozenset(recovery_data.finished_examples) + else: + self._run_repository.create_temporary_run_data(tmp_hash, run_id) + + examples = self._dataset_repository.examples( + dataset_id, + self.input_type(), + JsonValue, # type: ignore + examples_to_skip=finished_examples, + ) + if examples is None: + raise ValueError(f"Dataset with id {dataset_id} not found") + if num_examples: + examples = islice(examples, num_examples) + def run( example: Example[Input, ExpectedOutput], - ) -> tuple[str, Output | FailedExampleRun]: + ) -> None: if trace_examples_individually: example_tracer = self._run_repository.create_tracer_for_example( run_id, example.id @@ -127,59 +154,61 @@ def run( example_tracer = tracer else: example_tracer = NoOpTracer() + + output: Output | FailedExampleRun try: - return example.id, self._task.run(example.input, example_tracer) + output = self._task.run(example.input, example_tracer) except Exception as e: if abort_on_error: raise e print( f'FAILED RUN: example "{example.id}", {type(e).__qualname__}: "{e}"' ) - return example.id, FailedExampleRun.from_exception(e) + output = FailedExampleRun.from_exception(e) - # mypy does not like union types + self._run_repository.store_example_output_parallel( + tmp_hash, + ExampleOutput[Output]( + run_id=run_id, example_id=example.id, output=output + ), + ) + self._run_repository.temp_store_finished_example(tmp_hash, example.id) - examples = self._dataset_repository.examples( - dataset_id, - self.input_type(), - JsonValue, # type: ignore - ) - if examples is None: - raise ValueError(f"Dataset with id {dataset_id} not found") - if num_examples: - examples = islice(examples, num_examples) - run_id = str(uuid4()) start = utc_now() + with ThreadPoolExecutor(max_workers=max_workers) as executor: - ids_and_outputs = tqdm(executor.map(run, examples), desc="Running") - - failed_count = 0 - successful_count = 0 - for example_id, output in ids_and_outputs: - if isinstance(output, FailedExampleRun): - failed_count += 1 - else: - successful_count += 1 - self._run_repository.store_example_output( - ExampleOutput[Output]( - run_id=run_id, example_id=example_id, output=output - ), - ) + futures = [executor.submit(run, example) for example in examples] + concurrent.futures.wait(futures) + for future in futures: + future.result() # result of the futures must be retrieved for exceptions to be raised + self._run_repository.delete_temporary_run_data(tmp_hash) + full_description = ( self.description + " : " + description if description else self.description ) + successful = 0 + failed = 0 + for example_output in self._run_repository.example_outputs( + run_id, self.output_type() + ): + if isinstance(example_output.output, FailedExampleRun): + failed += 1 + else: + successful += 1 + run_overview = RunOverview( dataset_id=dataset_id, id=run_id, start=start, end=utc_now(), - failed_example_count=failed_count, - successful_example_count=successful_count, + failed_example_count=failed, + successful_example_count=successful, description=full_description, labels=labels, metadata=metadata, ) + 
self._run_repository.store_run_overview(run_overview) return run_overview diff --git a/tests/evaluation/dataset/test_dataset_repository.py b/tests/evaluation/dataset/test_dataset_repository.py index c92fc73cd..a348928ed 100644 --- a/tests/evaluation/dataset/test_dataset_repository.py +++ b/tests/evaluation/dataset/test_dataset_repository.py @@ -318,6 +318,37 @@ def test_examples_returns_all_examples_sorted_by_their_id( assert stored_examples == sorted(examples, key=lambda example: example.id) +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_examples_skips_blacklisted_examples( + repository_fixture: str, + request: FixtureRequest, +) -> None: + dataset_repository: DatasetRepository = request.getfixturevalue(repository_fixture) + examples = [ + Example( + input=DummyStringInput(), + expected_output=DummyStringOutput(), + ) + for _ in range(0, 10) + ] + examples_to_skip = frozenset(example.id for example in examples[2:5]) + dataset = dataset_repository.create_dataset( + examples=examples, dataset_name="test-dataset" + ) + + retrieved_examples = list( + dataset_repository.examples( + dataset.id, DummyStringInput, DummyStringOutput, examples_to_skip + ) + ) + + assert len(retrieved_examples) == len(examples) - len(examples_to_skip) + assert all(example.id not in examples_to_skip for example in retrieved_examples) + + @mark.parametrize( "repository_fixture", test_repository_fixtures, diff --git a/tests/evaluation/dataset/test_hugging_face_dataset_repository.py b/tests/evaluation/dataset/test_hugging_face_dataset_repository.py index fd8ec5958..9a417ba77 100644 --- a/tests/evaluation/dataset/test_hugging_face_dataset_repository.py +++ b/tests/evaluation/dataset/test_hugging_face_dataset_repository.py @@ -1,5 +1,6 @@ from collections.abc import Iterable, Sequence from pathlib import Path +from typing import Optional from unittest.mock import patch from uuid import uuid4 @@ -25,9 +26,12 @@ def examples( dataset_id: str, input_type: type[Input], expected_output_type: type[ExpectedOutput], + examples_to_skip: Optional[frozenset[str]] = None, ) -> Iterable[Example[Input, ExpectedOutput]]: self.counter += 1 - return super().examples(dataset_id, input_type, expected_output_type) + return super().examples( + dataset_id, input_type, expected_output_type, examples_to_skip + ) @fixture diff --git a/tests/evaluation/run/test_file_run_repository.py b/tests/evaluation/run/test_file_run_repository.py index 600922879..3bb88c88a 100644 --- a/tests/evaluation/run/test_file_run_repository.py +++ b/tests/evaluation/run/test_file_run_repository.py @@ -1,5 +1,6 @@ import contextlib +import pytest from pydantic import BaseModel from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository @@ -31,6 +32,7 @@ def test_run_overview_does_not_create_a_folder( assert not file_run_repository._run_root_directory().exists() +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_runs_does_not_create_a_folder( file_run_repository: FileRunRepository, ) -> None: @@ -42,6 +44,7 @@ def test_example_runs_does_not_create_a_folder( assert not file_run_repository._run_root_directory().exists() +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_run_does_not_create_a_folder( file_run_repository: FileRunRepository, ) -> None: diff --git a/tests/evaluation/run/test_run_repository.py b/tests/evaluation/run/test_run_repository.py index 3bdba5ea8..6b05b5147 100644 --- a/tests/evaluation/run/test_run_repository.py +++ 
b/tests/evaluation/run/test_run_repository.py @@ -9,6 +9,7 @@ from intelligence_layer.core import CompositeTracer, InMemoryTracer, utc_now from intelligence_layer.evaluation import ExampleOutput, RunOverview, RunRepository from intelligence_layer.evaluation.run.domain import FailedExampleRun +from intelligence_layer.evaluation.run.run_repository import RecoveryData from tests.evaluation.conftest import DummyStringInput test_repository_fixtures = [ @@ -62,6 +63,7 @@ def test_run_repository_stores_and_returns_example_output( "repository_fixture", test_repository_fixtures, ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_output_returns_none_for_not_existing_example_id( repository_fixture: str, request: FixtureRequest, @@ -82,6 +84,7 @@ def test_example_output_returns_none_for_not_existing_example_id( "repository_fixture", test_repository_fixtures, ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_example_output_returns_none_for_not_existing_run_id( repository_fixture: str, request: FixtureRequest, @@ -92,12 +95,16 @@ def test_example_output_returns_none_for_not_existing_run_id( example_output = ExampleOutput(run_id=run_id, example_id=example_id, output=None) run_repository.store_example_output(example_output) - with pytest.raises(ValueError): + assert ( run_repository.example_output("not-existing-run-id", example_id, type(None)) - with pytest.raises(ValueError): + is None + ) + assert ( run_repository.example_output( "not-existing-run-id", "not-existing-example-id", type(None) ) + is None + ) @mark.parametrize( @@ -304,3 +311,37 @@ def test_successful_example_outputs_returns_only_successful_examples( assert len(successful_outputs) == 1 assert successful_outputs[0].example_id == "2" + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_recovery_from_crash( + repository_fixture: str, + request: FixtureRequest, +) -> None: + run_repository: RunRepository = request.getfixturevalue(repository_fixture) + test_hash = str(uuid4()) + run_id = str(uuid4()) + + expected_recovery_data = RecoveryData( + run_id=run_id, finished_examples=[str(uuid4()), str(uuid4())] + ) + + # Create + run_repository.create_temporary_run_data(test_hash, run_id) + + # Write + for example_id in expected_recovery_data.finished_examples: + run_repository.temp_store_finished_example( + tmp_hash=test_hash, example_id=example_id + ) + + # Read + finished_examples = run_repository.finished_examples(test_hash) + assert finished_examples == expected_recovery_data + + # Delete + run_repository.delete_temporary_run_data(test_hash) + assert run_repository.finished_examples(test_hash) is None diff --git a/tests/evaluation/run/test_runner.py b/tests/evaluation/run/test_runner.py index f3e32eb06..3ac71e205 100644 --- a/tests/evaluation/run/test_runner.py +++ b/tests/evaluation/run/test_runner.py @@ -1,4 +1,4 @@ -from collections.abc import Iterable +from collections.abc import Iterable, Sequence import pytest @@ -12,6 +12,7 @@ InMemoryRunRepository, Runner, ) +from intelligence_layer.evaluation.run.file_run_repository import FileRunRepository from tests.evaluation.conftest import FAIL_IN_TASK_INPUT, DummyTask @@ -97,6 +98,55 @@ def test_runner_aborts_on_error( runner.run_dataset(dataset_id, abort_on_error=True) +def test_runner_resumes_after_error_in_task( + in_memory_dataset_repository: InMemoryDatasetRepository, + file_run_repository: FileRunRepository, + sequence_examples: Iterable[Example[str, None]], +) -> None: + task = DummyTask() + runner = Runner( + task, 
in_memory_dataset_repository, file_run_repository, "dummy-runner" + ) + + dataset_id = in_memory_dataset_repository.create_dataset( + examples=sequence_examples, dataset_name="test-dataset" + ).id + + fail_example_id = "" + for example in sequence_examples: + if example.input != FAIL_IN_TASK_INPUT: + continue + fail_example_id = example.id + assert fail_example_id != "" + + run_description = "my_run" + tmp_hash = runner._run_hash(dataset_id, run_description) + + with pytest.raises(RuntimeError): + runner.run_dataset(dataset_id, abort_on_error=True, description=run_description) + + recovery_data = file_run_repository.finished_examples(tmp_hash) + assert recovery_data + assert fail_example_id not in recovery_data.finished_examples + + examples: Sequence[Example[str, None]] = ( + in_memory_dataset_repository._datasets_and_examples[dataset_id][1] # type: ignore + ) + for example in examples: + if example.input == FAIL_IN_TASK_INPUT: + example.input = "do_not_fail_me" + + runner.run_dataset( + dataset_id, + abort_on_error=True, + description=run_description, + resume_from_recovery_data=True, + ) + assert file_run_repository.finished_examples(tmp_hash) is None + + # TODO : we are not yet correctly tracking the number of failed and successful example counts + + def test_runner_runs_n_examples( in_memory_dataset_repository: InMemoryDatasetRepository, in_memory_run_repository: InMemoryRunRepository, diff --git a/tests/examples/classify/test_prompt_based_classify.py b/tests/examples/classify/test_prompt_based_classify.py index 4c9e0613a..acebfc209 100644 --- a/tests/examples/classify/test_prompt_based_classify.py +++ b/tests/examples/classify/test_prompt_based_classify.py @@ -288,6 +288,7 @@ def test_can_aggregate_evaluations( assert aggregation_overview.statistics.percentage_correct == 0.5 +@pytest.mark.filterwarnings("ignore::UserWarning") def test_aggregating_evaluations_works_with_empty_list( classify_evaluator: Evaluator[ ClassifyInput,