feat(evals): return partial results when llm function is interrupted (#1755)

* feat(evals): return partial results when llm function is interrupted

* Update src/phoenix/experimental/evals/functions/classify.py

* Update src/phoenix/experimental/evals/functions/classify.py

Co-authored-by: Roger Yang <[email protected]>

* Update src/phoenix/experimental/evals/functions/classify.py

Co-authored-by: Roger Yang <[email protected]>

* refactor tqdm

---------

Co-authored-by: Roger Yang <[email protected]>
mikeldking and RogerHYang authored Nov 16, 2023
1 parent 3f2b8f4 commit 1fb0849
Showing 5 changed files with 180 additions and 78 deletions.
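
The change follows one pattern throughout: preallocate one result slot per input row, wrap the evaluation loop in a try/except that also names KeyboardInterrupt, and return whatever was computed when the loop stops early. Below is a minimal, self-contained sketch of that pattern for orientation; it is illustrative only, and score_row plus the sample dataframe are hypothetical stand-ins, not Phoenix code.

import logging
from typing import List, Optional

import pandas as pd

logger = logging.getLogger(__name__)


def score_row(text: str) -> str:
    # Stand-in for the real LLM call; pretend the user hits Ctrl-C on the third row.
    if text == "What is C++?":
        raise KeyboardInterrupt
    return "relevant"


def classify_with_partial_results(dataframe: pd.DataFrame) -> pd.DataFrame:
    # One slot per input row, so rows that never run stay aligned as None.
    labels: List[Optional[str]] = [None] * len(dataframe)
    try:
        for index, text in enumerate(dataframe["input"]):
            labels[index] = score_row(text)
    except (Exception, KeyboardInterrupt) as e:
        # KeyboardInterrupt derives from BaseException, so it must be named explicitly.
        logger.error(e)
        print("Process was interrupted. The return value will be incomplete", e)
    return pd.DataFrame(data={"label": labels}, index=dataframe.index)


df = pd.DataFrame({"input": ["What is Python?", "What is Rust?", "What is C++?", "What is Go?"]})
print(classify_with_partial_results(df))  # the last two labels remain None
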
1 change: 1 addition & 0 deletions cspell.json
@@ -19,6 +19,7 @@
"NDJSON",
"numpy",
"openai",
"openinference",
"pydantic",
"quickstart",
"RERANKER",
66 changes: 39 additions & 27 deletions src/phoenix/experimental/evals/functions/classify.py
@@ -100,39 +100,51 @@ def llm_classify(
prompt_options = PromptOptions(provide_explanation=provide_explanation)
prompts = map_template(dataframe, eval_template, options=prompt_options)

labels: List[str] = []
explanations: List[Optional[str]] = []
labels: List[Optional[str]] = [None] * len(dataframe)
explanations: List[Optional[str]] = [None] * len(dataframe)

printif(verbose, f"Using prompt:\n\n{eval_template.prompt(prompt_options)}")
if generation_info := model.verbose_generation_info():
printif(verbose, generation_info)

for prompt in tqdm(prompts, bar_format=get_tqdm_progress_bar_formatter("llm_classify")):
with set_verbosity(model, verbose) as verbose_model:
response = verbose_model(prompt, instruction=system_instruction, **model_kwargs)
if not use_openai_function_call:
if provide_explanation:
unrailed_label, explanation = (
eval_template.extract_label_from_explanation(response),
response,
)
printif(
verbose and unrailed_label == NOT_PARSABLE,
f"- Could not parse {repr(response)}",
)
# Wrap the loop in a try / catch so that we can still return a dataframe
# even if the process is interrupted
try:
for index, prompt in enumerate(
tqdm(prompts, bar_format=get_tqdm_progress_bar_formatter("llm_classify"))
):
with set_verbosity(model, verbose) as verbose_model:
response = verbose_model(prompt, instruction=system_instruction, **model_kwargs)
if not use_openai_function_call:
if provide_explanation:
unrailed_label, explanation = (
eval_template.extract_label_from_explanation(response),
response,
)
printif(
verbose and unrailed_label == NOT_PARSABLE,
f"- Could not parse {repr(response)}",
)
else:
unrailed_label = response
explanation = None
else:
unrailed_label = response
explanation = None
else:
try:
function_arguments = json.loads(response, strict=False)
unrailed_label = function_arguments.get(_RESPONSE)
explanation = function_arguments.get(_EXPLANATION)
except json.JSONDecodeError:
unrailed_label = response
explanation = None
labels.append(_snap_to_rail(unrailed_label, rails, verbose=verbose))
explanations.append(explanation)
try:
function_arguments = json.loads(response, strict=False)
unrailed_label = function_arguments.get(_RESPONSE)
explanation = function_arguments.get(_EXPLANATION)
except json.JSONDecodeError:
unrailed_label = response
explanation = None
labels[index] = _snap_to_rail(unrailed_label, rails, verbose=verbose)
explanations[index] = explanation
except (Exception, KeyboardInterrupt) as e:
logger.error(e)
print(
"Process was interrupted. The return value will be incomplete",
e,
)

return pd.DataFrame(
data={
"label": labels,
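
Two details in this diff are worth calling out. First, the results lists change from append-as-you-go to preallocated None slots with index assignment, so an interrupted run still returns one row per input and unscored rows show up as None rather than silently going missing. Second, the except clause names KeyboardInterrupt explicitly because it derives from BaseException, not Exception, so a bare except Exception would not turn Ctrl-C into a partial result. A small illustration of the alignment point follows; the values are hypothetical, not Phoenix code.

from typing import List, Optional

rows = ["What is Python?", "What is Rust?", "What is C++?", "What is Go?"]

# Preallocated slots: stopping after two rows still yields one entry per input row.
labels: List[Optional[str]] = [None] * len(rows)
labels[0] = "relevant"
labels[1] = "irrelevant"
print(labels)  # ['relevant', 'irrelevant', None, None]

# Append-based results: the same interruption yields a shorter list that no
# longer lines up with the input rows.
appended: List[str] = []
appended.append("relevant")
appended.append("irrelevant")
print(appended)  # ['relevant', 'irrelevant']
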
21 changes: 15 additions & 6 deletions src/phoenix/experimental/evals/functions/generate.py
@@ -66,11 +66,20 @@ def llm_generate(

# For each prompt, generate and parse the response
output = []
for prompt in prompts:
logger.info(f"Prompt: {prompt}")
response = verbose_model(prompt, instruction=system_instruction)
parsed_response = output_parser(response)
output.append(parsed_response)

# Wrap the loop in a try / catch so that we can still return a dataframe
# even if the process is interrupted
try:
for prompt in prompts:
logger.info(f"Prompt: {prompt}")
response = verbose_model(prompt, instruction=system_instruction)
parsed_response = output_parser(response)
output.append(parsed_response)

except (Exception, KeyboardInterrupt) as e:
logger.error(e)
print(
"Process was interrupted. The return value will be incomplete",
e,
)
# Return the data as a dataframe
return pd.DataFrame(output)
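
Unlike llm_classify, llm_generate keeps appending to output, so an interrupted run returns a DataFrame with fewer rows than the input rather than None-filled rows. If that distinction matters, a caller can detect an incomplete run with a simple length check; the sketch below is a hypothetical caller-side check, not part of the library.

import pandas as pd

# Hypothetical frames standing in for an interrupted llm_generate-style run:
# three queries went in, only two generations came back before the error.
input_df = pd.DataFrame({"query": ["What is Python?", "What is C++?", "gobbledygook"]})
output_df = pd.DataFrame([{"category": "programming"}, {"category": "programming"}])

if len(output_df) < len(input_df):
    print(f"Incomplete run: {len(output_df)} of {len(input_df)} rows were generated")
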
115 changes: 70 additions & 45 deletions tests/experimental/evals/functions/test_classify.py
@@ -1,5 +1,5 @@
from contextlib import ExitStack
from itertools import product
from typing import List
from unittest.mock import MagicMock, patch

import httpx
@@ -38,6 +38,21 @@ def classification_dataframe():
)


@pytest.fixture
def classification_responses():
return [
"relevant",
"irrelevant",
"relevant",
"irrelevant",
]


@pytest.fixture
def classification_template():
return RAG_RELEVANCY_PROMPT_TEMPLATE


@pytest.mark.respx(base_url="https://api.openai.com/v1/chat/completions")
def test_llm_classify(
classification_dataframe, monkeypatch: pytest.MonkeyPatch, respx_mock: respx.mock
@@ -263,29 +278,19 @@ def test_llm_classify_shows_retry_info_with_verbose_flag(monkeypatch: pytest.Mon
mock_openai = MagicMock()
mock_openai.side_effect = openai_retry_errors

with ExitStack() as stack:
waiting_fn = "phoenix.experimental.evals.models.base.wait_random_exponential"
stack.enter_context(patch(waiting_fn, return_value=False))
stack.enter_context(patch.object(OpenAIModel, "_init_tiktoken", return_value=None))
stack.enter_context(patch.object(model._client.chat.completions, "create", mock_openai))
stack.enter_context(pytest.raises(model._openai.InternalServerError))
llm_classify(
dataframe=dataframe,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=model,
rails=["relevant", "irrelevant"],
verbose=True,
)
llm_classify(
dataframe=dataframe,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=model,
rails=["relevant", "irrelevant"],
verbose=True,
)

out, _ = capfd.readouterr()
assert "Failed attempt 1" in out, "Retry information should be printed"
assert "Request timed out" in out, "Retry information should be printed"
assert "Failed attempt 2" in out, "Retry information should be printed"
assert "test api error" in out, "Retry information should be printed"
assert "Failed attempt 3" in out, "Retry information should be printed"
assert "test api connection error" in out, "Retry information should be printed"
assert "Failed attempt 4" in out, "Retry information should be printed"
assert "test rate limit error" in out, "Retry information should be printed"
assert "Failed attempt 5" not in out, "Maximum retries should not be exceeded"


@@ -314,44 +319,30 @@ def test_llm_classify_does_not_persist_verbose_flag(monkeypatch: pytest.MonkeyPa
mock_openai = MagicMock()
mock_openai.side_effect = openai_retry_errors

with ExitStack() as stack:
waiting_fn = "phoenix.experimental.evals.models.base.wait_random_exponential"
stack.enter_context(patch(waiting_fn, return_value=False))
stack.enter_context(patch.object(OpenAIModel, "_init_tiktoken", return_value=None))
stack.enter_context(patch.object(model._client.chat.completions, "create", mock_openai))
stack.enter_context(pytest.raises(model._openai.OpenAIError))
llm_classify(
dataframe=dataframe,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=model,
rails=["relevant", "irrelevant"],
verbose=True,
)
llm_classify(
dataframe=dataframe,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=model,
rails=["relevant", "irrelevant"],
verbose=True,
)

out, _ = capfd.readouterr()
assert "Failed attempt 1" in out, "Retry information should be printed"
assert "Request timed out" in out, "Retry information should be printed"
assert "Failed attempt 2" not in out, "Retry information should be printed"

mock_openai.reset_mock()
mock_openai.side_effect = openai_retry_errors

with ExitStack() as stack:
waiting_fn = "phoenix.experimental.evals.models.base.wait_random_exponential"
stack.enter_context(patch(waiting_fn, return_value=False))
stack.enter_context(patch.object(OpenAIModel, "_init_tiktoken", return_value=None))
stack.enter_context(patch.object(model._client.chat.completions, "create", mock_openai))
stack.enter_context(pytest.raises(model._openai.APIError))
llm_classify(
dataframe=dataframe,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=model,
rails=["relevant", "irrelevant"],
)
llm_classify(
dataframe=dataframe,
template=RAG_RELEVANCY_PROMPT_TEMPLATE,
model=model,
rails=["relevant", "irrelevant"],
)

out, _ = capfd.readouterr()
assert "Failed attempt 1" not in out, "The `verbose` flag should not be persisted"
assert "Request timed out" not in out, "The `verbose` flag should not be persisted"


@pytest.mark.respx(base_url="https://api.openai.com/v1/chat/completions")
@@ -464,6 +455,40 @@ def test_run_relevance_eval_standard_dataframe(
]


@pytest.mark.respx(base_url="https://api.openai.com/v1/chat/completions", assert_all_called=False)
def test_classify_tolerance_to_exceptions(
classification_dataframe: pd.DataFrame,
classification_responses: List[str],
classification_template: str,
monkeypatch: pytest.MonkeyPatch,
respx_mock: respx.mock,
capfd,
):
with patch.object(OpenAIModel, "_init_tiktoken", return_value=None):
model = OpenAIModel(max_retries=0)
queries = classification_dataframe["input"].tolist()
for query, response in zip(queries, classification_responses):
matcher = M(content__contains=query)
# Simulate an error on the second query
if query == "What is C++?":
response = httpx.Response(500, json={"error": "Internal Server Error"})
else:
response = httpx.Response(200, json={"choices": [{"message": {"content": response}}]})
respx_mock.route(matcher).mock(return_value=response)

classification_df = llm_classify(
dataframe=classification_dataframe,
template=classification_template,
model=model,
rails=["relevant", "irrelevant"],
)

assert classification_df is not None
# Make sure there is a logger.error output
captured = capfd.readouterr()
assert "Process was interrupted" in captured.out


def test_run_relevance_eval_openinference_dataframe(
monkeypatch: pytest.MonkeyPatch,
respx_mock: respx.mock,
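
The new test asserts on the console message rather than on a raised exception, which is exactly the behavior this commit introduces: errors are logged and printed while the partial DataFrame is still returned. The same shape can be exercised in isolation with pytest's capfd fixture; the sketch below uses a hypothetical flaky_pipeline and is not the Phoenix test itself.

import pytest


def flaky_pipeline() -> list:
    # Stand-in for an eval loop that fails partway through but keeps its partial results.
    results = []
    try:
        for item in ["ok-1", "boom", "ok-2"]:
            if item == "boom":
                raise RuntimeError("Internal Server Error")
            results.append(item)
    except (Exception, KeyboardInterrupt) as e:
        print("Process was interrupted. The return value will be incomplete", e)
    return results


def test_partial_results_are_reported(capfd: pytest.CaptureFixture) -> None:
    results = flaky_pipeline()
    out, _ = capfd.readouterr()
    assert "Process was interrupted" in out
    assert results == ["ok-1"]
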
55 changes: 55 additions & 0 deletions tests/experimental/evals/functions/test_generate.py
@@ -124,3 +124,58 @@ def output_parser(response: str) -> Dict[str, str]:
assert generated["__error__"].tolist() == [np.nan] * 4 + [
"Expecting value: line 1 column 1 (char 0)"
]


@pytest.mark.respx(base_url="https://api.openai.com/v1/chat/completions", assert_all_called=False)
def test_classify_tolerance_to_exceptions(
monkeypatch: pytest.MonkeyPatch, respx_mock: respx.mock, capfd
):
monkeypatch.setenv(OPENAI_API_KEY_ENVVAR_NAME, "sk-0123456789")
with patch.object(OpenAIModel, "_init_tiktoken", return_value=None):
model = OpenAIModel(max_retries=0)
dataframe = pd.DataFrame(
[
{
"query": "What is Python?",
},
{
"query": "What is Python?",
},
{
"query": "What is C++?",
},
{
"query": "What is C++?",
},
{
"query": "gobbledygook",
},
]
)
responses = [
'{ "category": "programming", "language": "Python" }',
'{ "category": "programming", "language": "Python" }',
'{ "category": "programming", "language": "C++" }',
'{ "category": "programming", "language": "C++" }',
"gobbledygook",
]
queries = dataframe["query"].tolist()
for query, response in zip(queries, responses):
matcher = M(content__contains=query)
# Simulate an error on the second query
if query == "What is C++?":
response = httpx.Response(500, json={"error": "Internal Server Error"})
else:
response = httpx.Response(200, json={"choices": [{"message": {"content": response}}]})
respx_mock.route(matcher).mock(return_value=response)

df = llm_generate(
dataframe=dataframe,
template="Given {query}, generate output",
model=model,
)

assert df is not None
# Make sure there is a logger.error output
captured = capfd.readouterr()
assert "Process was interrupted" in captured.out

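Both new tests drive the error path by routing the OpenAI chat-completions endpoint through respx and returning a 500 for selected prompts. A stripped-down version of that mocking shape, independent of Phoenix, is sketched below; the request payloads are hypothetical placeholders.

import httpx
import respx

OPENAI_URL = "https://api.openai.com/v1/chat/completions"


@respx.mock
def test_second_call_returns_server_error() -> None:
    # Queue one successful completion followed by a 500, mirroring the tests above.
    respx.post(OPENAI_URL).mock(
        side_effect=[
            httpx.Response(200, json={"choices": [{"message": {"content": "relevant"}}]}),
            httpx.Response(500, json={"error": "Internal Server Error"}),
        ]
    )

    first = httpx.post(OPENAI_URL, json={"messages": []})
    assert first.json()["choices"][0]["message"]["content"] == "relevant"

    second = httpx.post(OPENAI_URL, json={"messages": []})
    assert second.status_code == 500

With max_retries=0 in the tests above, the mocked 500 surfaces immediately, which is what pushes execution into the new except branch and produces the "Process was interrupted" message the assertions look for.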