Parallel Evaluator (#401)
* added ThreadPoolExecutor to evaluator

* fixed type annotations
JohannesWesch authored Jan 22, 2024
1 parent 0ae39c9 commit 538399f
Showing 1 changed file with 61 additions and 29 deletions.
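In outline, the change replaces the sequential per-example loop with three pieces: a generator that lazily yields (example, eval_id, outputs) tuples, a ThreadPoolExecutor that fans those tuples out across worker threads via map, and a tqdm wrapper for progress reporting. A minimal standalone sketch of that pattern follows; make_inputs and work are illustrative stand-ins for the repository's generate_evaluation_inputs and evaluate helpers, not names from the codebase:

    from concurrent.futures import ThreadPoolExecutor
    from typing import Iterable, Tuple

    from tqdm import tqdm


    def make_inputs() -> Iterable[Tuple[int, int]]:
        # Stand-in for generate_evaluation_inputs(): lazily yields work items.
        for i in range(100):
            yield (i, i * 2)


    def work(args: Tuple[int, int]) -> int:
        # Stand-in for the per-item evaluate() helper.
        a, b = args
        return a + b


    with ThreadPoolExecutor(max_workers=10) as executor:
        # map() submits one task per item; iterating the tqdm-wrapped
        # result collects results in input order and advances the bar.
        results = list(
            tqdm(executor.map(work, make_inputs()), desc="Evaluating", total=100)
        )

Note that executor.map retrieves results lazily, so tqdm only reports progress if its iterator is actually consumed, which the sketch does via list().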
src/intelligence_layer/core/evaluation/evaluator.py (61 additions, 29 deletions)
@@ -1,4 +1,5 @@
 from abc import ABC, abstractmethod
+from concurrent.futures import ThreadPoolExecutor
 from functools import lru_cache
 from typing import (
     Callable,
@@ -8,6 +9,7 @@
     Mapping,
     Optional,
     Sequence,
+    Tuple,
     TypeVar,
     cast,
     final,
@@ -16,6 +18,8 @@
 )
 from uuid import uuid4

+from tqdm import tqdm
+
 from intelligence_layer.connectors import ArgillaClient, Field
 from intelligence_layer.connectors.argilla.argilla_client import (
     ArgillaEvaluation,
@@ -518,36 +522,64 @@ def load_overview(run_id: str) -> RunOverview:
             ),
             strict=True,
         )
-        for example_outputs in examples_zipped:
-            if not any(
-                isinstance(output.output, FailedExampleRun)
-                for output in example_outputs
-            ):
-                example_id = example_outputs[0].example_id
-                assert all(
-                    example_output.example_id == example_id
-                    for example_output in example_outputs
-                )
-                example = self._dataset_repository.example(
-                    dataset_id,
-                    example_id,
-                    self.input_type(),
-                    self.expected_output_type(),
-                )
-                assert example is not None
-                self.evaluate(
-                    example,
-                    eval_id,
-                    *[
-                        SuccessfulExampleOutput(
-                            run_id=example_output.run_id,
-                            example_id=example_output.example_id,
-                            output=example_output.output,
-                        )
+
+        def generate_evaluation_inputs() -> (
+            Iterable[
+                Tuple[
+                    Example[Input, ExpectedOutput],
+                    str,
+                    Sequence[SuccessfulExampleOutput[Output]],
+                ]
+            ]
+        ):
+            for example_outputs in examples_zipped:
+                if not any(
+                    isinstance(output.output, FailedExampleRun)
+                    for output in example_outputs
+                ):
+                    example_id = example_outputs[0].example_id
+                    assert all(
+                        example_output.example_id == example_id
                         for example_output in example_outputs
-                        if not isinstance(example_output.output, FailedExampleRun)
-                    ],
-                )
+                    )
+
+                    example = self._dataset_repository.example(
+                        dataset_id,
+                        example_id,
+                        self.input_type(),
+                        self.expected_output_type(),
+                    )
+                    assert example is not None
+
+                    yield (
+                        example,
+                        eval_id,
+                        [
+                            SuccessfulExampleOutput(
+                                run_id=example_output.run_id,
+                                example_id=example_output.example_id,
+                                output=example_output.output,
+                            )
+                            for example_output in example_outputs
+                            if not isinstance(example_output.output, FailedExampleRun)
+                        ],
+                    )
+
+        def evaluate(
+            args: Tuple[
+                Example[Input, ExpectedOutput],
+                str,
+                Sequence[SuccessfulExampleOutput[Output]],
+            ]
+        ) -> None:
+            example, eval_id, example_outputs = args
+            self.evaluate(example, eval_id, *example_outputs)
+
+        with ThreadPoolExecutor(max_workers=10) as executor:
+            tqdm(
+                executor.map(evaluate, generate_evaluation_inputs()),
+                desc="Evaluating",
+            )

         partial_overview = PartialEvaluationOverview(
             run_overviews=run_overviews,
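One caveat in the hunk above: executor.map submits every task eagerly and the with block waits for them all on exit, but the tqdm-wrapped iterator is never consumed, so the progress bar will not advance and any exception raised inside evaluate stays stored in its future instead of propagating. A hedged sketch of a consuming variant, reusing the names from the diff:

    with ThreadPoolExecutor(max_workers=10) as executor:
        # Draining the iterator drives the progress bar and re-raises
        # exceptions thrown by evaluate() in worker threads.
        for _ in tqdm(
            executor.map(evaluate, generate_evaluation_inputs()),
            desc="Evaluating",
        ):
            pass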
