Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Phs 520 resume after failed #918

Merged
merged 11 commits into from
Jun 19, 2024
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
## Unreleased

### Breaking Changes
...
- `RunRepository.example_output` now returns `None` and issues a `UserWarning` when there is no associated record for the given `run_id` instead of raising a `ValueError`.
- `RunRepository.example_outputs` now returns an empty list and issues a `UserWarning` when there is no associated record for the given `run_id` instead of raising a `ValueError`.

### Features
...
- `Runner.run_dataset` can now be resumed after failure by setting the `resume_from_recovery_data` flag to `True` and calling `Runner.run_dataset` again.
- For `InMemoryRunRepository` based `Runner`s this is limited to runs that failed with an exception that did not crash the whole process/kernel.
- For `FileRunRepository` based `Runner`s, even runs that crashed the whole process can be resumed.
- `DatasetRepository.examples` now accepts an optional parameter `examples_to_skip` to enable skipping of `Example`s with the provided IDs.

### Fixes
...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ def examples(
dataset_id: str,
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
examples_to_skip: Optional[frozenset[str]] = None,
) -> Iterable[Example[Input, ExpectedOutput]]:
"""Returns all :class:`Example`s for the given dataset ID sorted by their ID.

Args:
dataset_id: Dataset ID whose examples should be retrieved.
input_type: Input type of the example.
expected_output_type: Expected output type of the example.
examples_to_skip: Optional set of example IDs. Those examples will be excluded from the output.

Returns:
:class:`Iterable` of :class`Example`s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ def examples(
dataset_id: str,
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
examples_to_skip: Optional[frozenset[str]] = None,
) -> Iterable[Example[Input, ExpectedOutput]]:
examples_to_skip = examples_to_skip or frozenset()
example_path = self.path_to_str(self._dataset_examples_path(dataset_id))
if not self._file_system.exists(example_path):
raise ValueError(
Expand All @@ -118,12 +120,13 @@ def examples(
example_path, "r", encoding="utf-8"
) as examples_file:
# Mypy does not accept dynamic types
examples = [
Example[input_type, expected_output_type].model_validate_json( # type: ignore
json_data=example
)
for example in examples_file
]
examples = []
for example in examples_file:
current_example = Example[
input_type, expected_output_type # type: ignore
].model_validate_json(json_data=example)
if current_example.id not in examples_to_skip:
examples.append(current_example)

return sorted(examples, key=lambda example: example.id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,21 @@ def examples(
dataset_id: str,
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
examples_to_skip: Optional[frozenset[str]] = None,
) -> Iterable[Example[Input, ExpectedOutput]]:
examples_to_skip = examples_to_skip or frozenset()
if dataset_id not in self._datasets_and_examples:
raise ValueError(
f"Repository does not contain a dataset with id: {dataset_id}"
)
return cast(
Iterable[Example[Input, ExpectedOutput]],
sorted(
self._datasets_and_examples[dataset_id][1],
[
example
for example in self._datasets_and_examples[dataset_id][1]
if example.id not in examples_to_skip
],
key=lambda example: example.id,
),
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Iterable, Sequence
from typing import cast
from typing import Optional, cast

from datasets import Dataset as HFDataset # type: ignore
from datasets import DatasetDict, IterableDataset, IterableDatasetDict
Expand Down Expand Up @@ -71,18 +71,21 @@ def examples(
dataset_id: str,
input_type: type[Input],
expected_output_type: type[ExpectedOutput],
examples_to_skip: Optional[frozenset[str]] = None,
) -> Iterable[Example[Input, ExpectedOutput]]:
examples_to_skip = examples_to_skip or frozenset()
answers = "ABCD"
assert input_type == MultipleChoiceInput
assert expected_output_type == str
for index, sample in enumerate(self._huggingface_dataset["test"]):
yield Example(
input=cast(
Input,
MultipleChoiceInput(
question=sample["question"], choices=sample["choices"]
if str(index) not in examples_to_skip:
yield Example(
input=cast(
Input,
MultipleChoiceInput(
question=sample["question"], choices=sample["choices"]
),
),
),
expected_output=cast(ExpectedOutput, answers[sample["answer"]]),
id=str(index),
)
expected_output=cast(ExpectedOutput, answers[sample["answer"]]),
id=str(index),
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def read_utf8(self, path: Path) -> str:
str, self._file_system.read_text(self.path_to_str(path), encoding="utf-8")
)

def remove_file(self, path: Path) -> None:
    """Delete the file at ``path`` from the wrapped file system.

    Args:
        path: Path of the file to remove.
    """
    # Convert the Path to a string first, consistent with the other
    # fsspec calls in this class (see ``read_utf8`` and ``exists``);
    # fsspec's AbstractFileSystem expects string paths.
    self._file_system.rm_file(self.path_to_str(path))

def exists(self, path: Path) -> bool:
    """Return whether ``path`` exists in the wrapped file system."""
    str_path = self.path_to_str(path)
    return cast(bool, self._file_system.exists(str_path))

Expand Down
72 changes: 58 additions & 14 deletions src/intelligence_layer/evaluation/run/file_run_repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import warnings
from collections.abc import Iterable, Sequence
from pathlib import Path
from typing import Optional

from fsspec import AbstractFileSystem # type: ignore
from fsspec.implementations.local import LocalFileSystem # type: ignore

from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output
Expand All @@ -10,10 +12,16 @@
FileSystemBasedRepository,
)
from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview
from intelligence_layer.evaluation.run.run_repository import RunRepository
from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository


class FileSystemRunRepository(RunRepository, FileSystemBasedRepository):
TMP_FILE_TYPE: str = "tmp"

def __init__(self, file_system: AbstractFileSystem, root_directory: Path) -> None:
    """Initialize both base classes of this repository.

    Each base's ``__init__`` is called explicitly (rather than via a
    cooperative ``super()`` chain) because the two bases take different
    constructor arguments.
    """
    FileSystemBasedRepository.__init__(self, file_system, root_directory)
    RunRepository.__init__(self)

def store_run_overview(self, overview: RunOverview) -> None:
self.write_utf8(
self._run_overview_path(overview.id),
Expand All @@ -23,6 +31,38 @@ def store_run_overview(self, overview: RunOverview) -> None:
# create empty folder just in case no examples are ever saved
self.mkdir(self._run_directory(overview.id))

def _tmp_file_path(self, tmp_hash: str) -> Path:
    """Path of the temporary recovery file associated with ``tmp_hash``."""
    return self._run_directory(f"{tmp_hash}.{self.TMP_FILE_TYPE}")

def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None:
    # Persist a fresh RecoveryData record containing only the run ID so a
    # later call to ``finished_examples`` can detect and resume this run.
    self.write_utf8(
        self._tmp_file_path(tmp_hash),
        RecoveryData(run_id=run_id).model_dump_json(),
        create_parents=True,
    )

def _delete_temporary_run_data(self, tmp_hash: str) -> None:
    """Remove the temporary recovery file once it is no longer needed."""
    tmp_path = self._tmp_file_path(tmp_hash)
    self.remove_file(tmp_path)

def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None:
    # Read-modify-write of the recovery file: load the current record,
    # append the finished example ID, and persist the updated record.
    # NOTE(review): this is not atomic — a crash between read and write
    # could lose the latest entry; presumably acceptable for best-effort
    # recovery, but confirm against the resume logic.
    data = RecoveryData.model_validate_json(
        self.read_utf8(self._tmp_file_path(tmp_hash))
    )
    data.finished_examples.append(example_id)
    self.write_utf8(
        self._tmp_file_path(tmp_hash),
        data.model_dump_json(),
        create_parents=True,
    )

def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]:
    """Load the recovery data stored under ``tmp_hash``.

    Returns:
        The parsed :class:`RecoveryData`, or ``None`` when no temporary
        recovery file exists for ``tmp_hash`` (i.e. nothing to resume).
    """
    try:
        return RecoveryData.model_validate_json(
            self.read_utf8(self._tmp_file_path(tmp_hash))
        )
    except FileNotFoundError:
        return None

def run_overview(self, run_id: str) -> Optional[RunOverview]:
file_path = self._run_overview_path(run_id)
if not self.exists(file_path):
Expand All @@ -46,32 +86,26 @@ def example_output(
self, run_id: str, example_id: str, output_type: type[Output]
) -> Optional[ExampleOutput[Output]]:
file_path = self._example_output_path(run_id, example_id)
if not self.exists(file_path.parent):
raise ValueError(f"Repository does not contain a run with id: {run_id}")
if not self.exists(file_path):
warnings.warn(
f'Repository does not contain a run with id: "{run_id}"', UserWarning
)
return None
content = self.read_utf8(file_path)
# mypy does not accept dynamic types
return ExampleOutput[output_type].model_validate_json( # type: ignore
json_data=content
)

def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
    """Return the tracer parsed from the example's trace file, or ``None``
    when no trace has been recorded for this run/example pair."""
    trace_path = self._example_trace_path(run_id, example_id)
    return self._parse_log(trace_path) if self.exists(trace_path) else None

def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
    """Create a :class:`FileTracer` that writes to the example's trace path."""
    return FileTracer(self._example_trace_path(run_id, example_id))

def example_outputs(
self, run_id: str, output_type: type[Output]
) -> Iterable[ExampleOutput[Output]]:
path = self._run_output_directory(run_id)
if not self.exists(path):
raise ValueError(f"Repository does not contain a run with id: {run_id}")
warnings.warn(
f'Repository does not contain a run with id: "{run_id}"', UserWarning
)
return []

example_outputs = []
for file_name in self.file_names(path):
Expand Down Expand Up @@ -102,6 +136,16 @@ def _run_output_directory(self, run_id: str) -> Path:
def _run_overview_path(self, run_id: str) -> Path:
    """JSON file path that stores the overview of ``run_id``."""
    run_dir = self._run_directory(run_id)
    return run_dir.with_suffix(".json")

def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
    """Return the tracer for the given example, or ``None`` when no trace
    file exists for it."""
    file_path = self._example_trace_path(run_id, example_id)
    if not self.exists(file_path):
        return None
    return self._parse_log(file_path)

def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
    """Create a file-backed tracer for the given run/example pair."""
    file_path = self._example_trace_path(run_id, example_id)
    return FileTracer(file_path)

def _trace_directory(self, run_id: str) -> Path:
    """Directory holding the trace files of ``run_id``."""
    return self._run_directory(run_id) / "trace"
Expand Down
49 changes: 35 additions & 14 deletions src/intelligence_layer/evaluation/run/in_memory_run_repository.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
import warnings
from collections import defaultdict
from collections.abc import Iterable, Sequence
from typing import Optional, cast

from intelligence_layer.core import InMemoryTracer, Output, PydanticSerializable
from intelligence_layer.core.tracer.tracer import Tracer
from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview
from intelligence_layer.evaluation.run.run_repository import RunRepository
from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository


class InMemoryRunRepository(RunRepository):
def __init__(self) -> None:
    super().__init__()
    # Outputs per run ID; defaultdict so unknown run IDs yield empty lists.
    self._example_outputs: dict[str, list[ExampleOutput[PydanticSerializable]]] = (
        defaultdict(list)
    )
    # Tracers keyed by "<run_id>/<example_id>".
    self._example_traces: dict[str, Tracer] = dict()
    # Run overviews keyed by run ID.
    self._run_overviews: dict[str, RunOverview] = dict()
    # In-memory recovery bookkeeping, keyed by temporary run hash.
    self._recovery_data: dict[str, RecoveryData] = dict()

def store_run_overview(self, overview: RunOverview) -> None:
    """Store ``overview`` and ensure an output list (possibly empty)
    exists for its run ID."""
    self._run_overviews[overview.id] = overview
    # Make the run known even if no example outputs are ever stored.
    self._example_outputs.setdefault(overview.id, [])

def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None:
    # Start tracking a run: a fresh RecoveryData with only the run ID set.
    self._recovery_data[tmp_hash] = RecoveryData(run_id=run_id)

def _delete_temporary_run_data(self, tmp_hash: str) -> None:
    """Drop the recovery bookkeeping for ``tmp_hash``.

    Raises:
        KeyError: if no recovery data was stored under ``tmp_hash``.
    """
    self._recovery_data.pop(tmp_hash)

def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None:
    # Record that this example completed so a resumed run can skip it.
    # Assumes ``_create_temporary_run_data`` was called first for
    # ``tmp_hash`` (raises KeyError otherwise).
    self._recovery_data[tmp_hash].finished_examples.append(example_id)

def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]:
    """Return the recovery data stored under ``tmp_hash``, or ``None``
    when the hash is unknown (i.e. nothing to resume)."""
    return self._recovery_data.get(tmp_hash)

def run_overview(self, run_id: str) -> Optional[RunOverview]:
    """Look up the stored overview for ``run_id``; ``None`` if unknown."""
    return self._run_overviews.get(run_id)

Expand All @@ -36,29 +54,24 @@ def example_output(
self, run_id: str, example_id: str, output_type: type[Output]
) -> Optional[ExampleOutput[Output]]:
if run_id not in self._example_outputs:
raise ValueError(f"Repository does not contain a run with id: {run_id}")

if run_id not in self._example_outputs:
warnings.warn(
f'Repository does not contain a run with id: "{run_id}"', UserWarning
)
return None

for example_output in self._example_outputs[run_id]:
if example_output.example_id == example_id:
return cast(ExampleOutput[Output], example_output)
return None

def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
    """Return the in-memory tracer recorded for the example, if any."""
    trace_key = f"{run_id}/{example_id}"
    return self._example_traces.get(trace_key)

def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
    """Create, register and return a fresh in-memory tracer for the example."""
    new_tracer = InMemoryTracer()
    self._example_traces[f"{run_id}/{example_id}"] = new_tracer
    return new_tracer

def example_outputs(
self, run_id: str, output_type: type[Output]
) -> Iterable[ExampleOutput[Output]]:
if run_id not in self._run_overviews:
raise ValueError(f"Repository does not contain a run with id: {run_id}")
if run_id not in self._example_outputs and run_id not in self._run_overviews:
warnings.warn(
f'Repository does not contain a run with id: "{run_id}"', UserWarning
)
return []

return (
cast(ExampleOutput[Output], example_output)
Expand All @@ -75,3 +88,11 @@ def example_output_ids(self, run_id: str) -> Sequence[str]:
for example_output in self._example_outputs[run_id]
]
)

def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
    # Tracers are keyed by "<run_id>/<example_id>"; ``None`` when absent.
    return self._example_traces.get(f"{run_id}/{example_id}")

def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
    """Create, register and return an in-memory tracer for the example."""
    tracer = InMemoryTracer()
    self._example_traces[f"{run_id}/{example_id}"] = tracer
    return tracer
Loading