Python: Enable mypy for the HuggingFace connectors. Increase unit test code coverage. #7176

Merged (4 commits) on Jul 9, 2024
Changes from 3 commits
4 changes: 0 additions & 4 deletions python/mypy.ini
@@ -21,10 +21,6 @@ ignore_errors = true
 ignore_errors = true
 # TODO (eavanvalkenburg): remove this: https://github.com/microsoft/semantic-kernel/issues/7132
 
-[mypy-semantic_kernel.connectors.ai.hugging_face.*]
-ignore_errors = true
-# TODO (eavanvalkenburg): remove this: https://github.com/microsoft/semantic-kernel/issues/7133
-
 [mypy-semantic_kernel.connectors.ai.ollama.*]
 ignore_errors = true
 # TODO (eavanvalkenburg): remove this: https://github.com/microsoft/semantic-kernel/issues/7134
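With the override gone, mypy now type-checks the HuggingFace connectors on every run. One way to reproduce the check locally, assuming a repo checkout with mypy installed (this mypy.ini lives in the python/ directory):

cd python
mypy semantic_kernel/connectors/ai/hugging_face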
python/semantic_kernel/connectors/ai/hugging_face/services/hf_text_completion.py
@@ -1,22 +1,26 @@
 # Copyright (c) Microsoft. All rights reserved.
 
 import logging
+import sys
 from collections.abc import AsyncGenerator
 from threading import Thread
-from typing import TYPE_CHECKING, Any, Literal
+from typing import Any, Literal
+
+if sys.version_info >= (3, 12):
+    from typing import override
+else:
+    from typing_extensions import override  # pragma: no cover
 
 import torch
 from transformers import AutoTokenizer, TextIteratorStreamer, pipeline
 
 from semantic_kernel.connectors.ai.hugging_face.hf_prompt_execution_settings import HuggingFacePromptExecutionSettings
+from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
 from semantic_kernel.connectors.ai.text_completion_client_base import TextCompletionClientBase
 from semantic_kernel.contents.streaming_text_content import StreamingTextContent
 from semantic_kernel.contents.text_content import TextContent
 from semantic_kernel.exceptions import ServiceInvalidExecutionSettingsError, ServiceResponseException
 
-if TYPE_CHECKING:
-    from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
-
 logger: logging.Logger = logging.getLogger(__name__)

@@ -29,7 +33,7 @@ def __init__(
         self,
         ai_model_id: str,
         task: str | None = "text2text-generation",
-        device: int | None = -1,
+        device: int = -1,
         service_id: str | None = None,
         model_kwargs: dict[str, Any] | None = None,
         pipeline_kwargs: dict[str, Any] | None = None,
@@ -39,22 +43,21 @@ def __init__(
         Args:
             ai_model_id (str): Hugging Face model card string, see
                 https://huggingface.co/models
-            device (Optional[int]): Device to run the model on, defaults to CPU, 0+ for GPU,
-                -- None if using device_map instead. (If both device and device_map
-                are specified, device overrides device_map. If unintended,
-                it can lead to unexpected behavior.)
-            service_id (Optional[str]): Service ID for the AI service.
-            task (Optional[str]): Model completion task type, options are:
+            device (int): Device to run the model on, defaults to CPU, 0+ for GPU,
+                -- None if using device_map instead. (If both device and device_map
+                are specified, device overrides device_map. If unintended,
+                it can lead to unexpected behavior.) (optional)
+            service_id (str): Service ID for the AI service. (optional)
+            task (str): Model completion task type, options are:
                 - summarization: takes a long text and returns a shorter summary.
                 - text-generation: takes incomplete text and returns a set of completion candidates.
                 - text2text-generation (default): takes an input prompt and returns a completion.
-                text2text-generation is the default as it behaves more like GPT-3+.
-            log : Logger instance. (Deprecated)
-            model_kwargs (Optional[Dict[str, Any]]): Additional dictionary of keyword arguments
-                passed along to the model's `from_pretrained(..., **model_kwargs)` function.
-            pipeline_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments passed along
+                    text2text-generation is the default as it behaves more like GPT-3+. (optional)
+            model_kwargs (dict[str, Any]): Additional dictionary of keyword arguments
+                passed along to the model's `from_pretrained(..., **model_kwargs)` function. (optional)
+            pipeline_kwargs (dict[str, Any]): Additional keyword arguments passed along
                 to the specific pipeline init (see the documentation for the corresponding pipeline class
-                for possible values).
+                for possible values). (optional)
 
         Note that this model will be downloaded from the Hugging Face model hub.
         """
@@ -65,18 +68,19 @@ def __init__(
             model_kwargs=model_kwargs,
             **pipeline_kwargs or {},
         )
+        resolved_device = f"cuda:{device}" if device >= 0 and torch.cuda.is_available() else "cpu"
         super().__init__(
             service_id=service_id,
             ai_model_id=ai_model_id,
             task=task,
-            device=(f"cuda:{device}" if device >= 0 and torch.cuda.is_available() else "cpu"),
+            device=resolved_device,
             generator=generator,
         )
 
     async def get_text_contents(
         self,
         prompt: str,
-        settings: HuggingFacePromptExecutionSettings,
+        settings: PromptExecutionSettings,
     ) -> list[TextContent]:
         """This is the method that is called from the kernel to get a response from a text-optimized LLM.

@@ -88,9 +92,12 @@ async def get_text_contents(
             List[TextContent]: A list of TextContent objects representing the response(s) from the LLM.
         """
         try:
+            if not isinstance(settings, HuggingFacePromptExecutionSettings):
+                settings = self.get_prompt_execution_settings_from_settings(settings)
+            assert isinstance(settings, HuggingFacePromptExecutionSettings)  # nosec
             results = self.generator(prompt, **settings.prepare_settings_dict())
         except Exception as e:
-            raise ServiceResponseException("Hugging Face completion failed", e) from e
+            raise ServiceResponseException("Hugging Face completion failed") from e
         if isinstance(results, list):
             return [self._create_text_content(results, result) for result in results]
         return [self._create_text_content(results, results)]
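The isinstance check and coercion added above are what let callers hand the service a plain PromptExecutionSettings. A rough sketch of the equivalent conversion, assuming get_prompt_execution_settings_from_settings delegates to the from_prompt_execution_settings classmethod on the settings base class:

from semantic_kernel.connectors.ai.hugging_face.hf_prompt_execution_settings import HuggingFacePromptExecutionSettings
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings

generic = PromptExecutionSettings(extension_data={"max_new_tokens": 25})
# Re-validate the generic settings as the HuggingFace-specific settings model.
specific = HuggingFacePromptExecutionSettings.from_prompt_execution_settings(generic)
assert specific.max_new_tokens == 25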
@@ -105,7 +112,7 @@ def _create_text_content(self, response: Any, candidate: dict[str, str]) -> TextContent:
     async def get_streaming_text_contents(
         self,
         prompt: str,
-        settings: HuggingFacePromptExecutionSettings,
+        settings: PromptExecutionSettings,
     ) -> AsyncGenerator[list[StreamingTextContent], Any]:
         """Streams a text completion using a Hugging Face model.

@@ -118,6 +125,10 @@
         Yields:
             List[StreamingTextContent]: List of StreamingTextContent objects.
         """
+        if not isinstance(settings, HuggingFacePromptExecutionSettings):
+            settings = self.get_prompt_execution_settings_from_settings(settings)
+        assert isinstance(settings, HuggingFacePromptExecutionSettings)  # nosec
+
         if settings.num_return_sequences > 1:
             raise ServiceInvalidExecutionSettingsError(
                 "HuggingFace TextIteratorStreamer does not stream multiple responses in a parseable format. \
@@ -139,10 +150,10 @@
             ]
 
             thread.join()
-
         except Exception as e:
-            raise ServiceResponseException("Hugging Face completion failed", e) from e
+            raise ServiceResponseException("Hugging Face completion failed") from e
 
-    def get_prompt_execution_settings_class(self) -> "PromptExecutionSettings":
+    @override
+    def get_prompt_execution_settings_class(self) -> type["PromptExecutionSettings"]:
         """Create a request settings object."""
         return HuggingFacePromptExecutionSettings
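Taken together, the widened signatures mean callers never have to construct HuggingFacePromptExecutionSettings themselves. A hedged end-to-end sketch (model id illustrative; requires torch and transformers):

import asyncio

from semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion import HuggingFaceTextCompletion
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings


async def main() -> None:
    service = HuggingFaceTextCompletion(ai_model_id="gpt2", task="text-generation")
    # A plain PromptExecutionSettings is accepted and coerced internally.
    settings = PromptExecutionSettings(extension_data={"max_new_tokens": 25})
    async for chunk in service.get_streaming_text_contents("Hello, world", settings):
        print(chunk[0].text, end="")


asyncio.run(main())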
python/semantic_kernel/connectors/ai/hugging_face/services/hf_text_embedding.py
@@ -7,7 +7,7 @@
 if sys.version_info >= (3, 12):
     from typing import override
 else:
-    from typing_extensions import override
+    from typing_extensions import override  # pragma: no cover
 
 import sentence_transformers
 import torch
@@ -28,16 +28,16 @@ class HuggingFaceTextEmbedding(EmbeddingGeneratorBase):
     def __init__(
         self,
         ai_model_id: str,
-        device: int | None = -1,
+        device: int = -1,
         service_id: str | None = None,
     ) -> None:
         """Initializes a new instance of the HuggingFaceTextEmbedding class.
 
         Args:
             ai_model_id (str): Hugging Face model card string, see
                 https://huggingface.co/sentence-transformers
-            device (Optional[int]): Device to run the model on, -1 for CPU, 0+ for GPU.
-            service_id (Optional[str]): Service ID for the model.
+            device (int): Device to run the model on, -1 for CPU, 0+ for GPU. (optional)
+            service_id (str): Service ID for the model. (optional)
 
         Note that this model will be downloaded from the Hugging Face model hub.
         """
Unit tests for HuggingFaceTextCompletion
@@ -1,11 +1,14 @@
 # Copyright (c) Microsoft. All rights reserved.
 
-from unittest.mock import Mock, patch
+from threading import Thread
+from unittest.mock import MagicMock, Mock, patch
 
 import pytest
+from transformers import TextIteratorStreamer
 
 from semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion import HuggingFaceTextCompletion
 from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
+from semantic_kernel.exceptions import KernelInvokeException, ServiceResponseException
 from semantic_kernel.functions.kernel_arguments import KernelArguments
 from semantic_kernel.kernel import Kernel
 from semantic_kernel.prompt_template.prompt_template_config import PromptTemplateConfig
@@ -46,8 +49,9 @@ async def test_text_completion(model_name, task, input_str):
     # Configure LLM service
     with patch("semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.pipeline") as patched_pipeline:
         patched_pipeline.return_value = mock_pipeline
+        service = HuggingFaceTextCompletion(service_id=model_name, ai_model_id=model_name, task=task)
         kernel.add_service(
-            service=HuggingFaceTextCompletion(service_id=model_name, ai_model_id=model_name, task=task),
+            service=service,
         )
 
         exec_settings = PromptExecutionSettings(service_id=model_name, extension_data={"max_new_tokens": 25})
@@ -68,3 +72,148 @@ async def test_text_completion(model_name, task, input_str):
 
         await kernel.invoke(function_name="TestFunction", plugin_name="TestPlugin", arguments=arguments)
         assert mock_pipeline.call_args.args[0] == input_str
+
+
+@pytest.mark.asyncio
+async def test_text_completion_throws():
+    kernel = Kernel()
+
+    model_name = "patrickvonplaten/t5-tiny-random"
+    task = "text2text-generation"
+    input_str = "translate English to Dutch: Hello, how are you?"
+
+    with patch("semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.pipeline") as patched_pipeline:
+        mock_generator = Mock()
+        mock_generator.side_effect = Exception("Test exception")
+        patched_pipeline.return_value = mock_generator
+        service = HuggingFaceTextCompletion(service_id=model_name, ai_model_id=model_name, task=task)
+        kernel.add_service(service=service)
+
+        exec_settings = PromptExecutionSettings(service_id=model_name, extension_data={"max_new_tokens": 25})
+
+        prompt = "{{$input}}"
+        prompt_template_config = PromptTemplateConfig(template=prompt, execution_settings=exec_settings)
+
+        kernel.add_function(
+            prompt_template_config=prompt_template_config,
+            function_name="TestFunction",
+            plugin_name="TestPlugin",
+            prompt_execution_settings=exec_settings,
+        )
+
+        arguments = KernelArguments(input=input_str)
+
+        with pytest.raises(
+            KernelInvokeException, match="Error occurred while invoking function: 'TestPlugin-TestFunction'"
+        ):
+            await kernel.invoke(function_name="TestFunction", plugin_name="TestPlugin", arguments=arguments)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("model_name", "task", "input_str"),
+    [
+        (
+            "patrickvonplaten/t5-tiny-random",
+            "text2text-generation",
+            "translate English to Dutch: Hello, how are you?",
+        ),
+        ("HuggingFaceM4/tiny-random-LlamaForCausalLM", "text-generation", "Hello, I like sleeping and "),
+    ],
+    ids=["text2text-generation", "text-generation"],
+)
+async def test_text_completion_streaming(model_name, task, input_str):
+    ret = {"summary_text": "test"} if task == "summarization" else {"generated_text": "test"}
+    mock_pipeline = Mock(return_value=ret)
+
+    mock_streamer = MagicMock(spec=TextIteratorStreamer)
+    mock_streamer.__iter__.return_value = iter(["mocked_text"])
+
+    with (
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.pipeline",
+            return_value=mock_pipeline,
+        ),
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.Thread",
+            side_effect=Mock(spec=Thread),
+        ),
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.TextIteratorStreamer",
+            return_value=mock_streamer,
+        ) as mock_stream,
+    ):
+        mock_stream.return_value = mock_streamer
+        service = HuggingFaceTextCompletion(service_id=model_name, ai_model_id=model_name, task=task)
+        prompt = "test prompt"
+        exec_settings = PromptExecutionSettings(service_id=model_name, extension_data={"max_new_tokens": 25})
+
+        result = []
+        async for content in service.get_streaming_text_contents(prompt, exec_settings):
+            result.append(content)
+
+        assert len(result) == 1
+        assert result[0][0].inner_content == "mocked_text"
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("model_name", "task", "input_str"),
+    [
+        (
+            "patrickvonplaten/t5-tiny-random",
+            "text2text-generation",
+            "translate English to Dutch: Hello, how are you?",
+        ),
+        ("HuggingFaceM4/tiny-random-LlamaForCausalLM", "text-generation", "Hello, I like sleeping and "),
+    ],
+    ids=["text2text-generation", "text-generation"],
+)
+async def test_text_completion_streaming_throws(model_name, task, input_str):
+    ret = {"summary_text": "test"} if task == "summarization" else {"generated_text": "test"}
+    mock_pipeline = Mock(return_value=ret)
+
+    mock_streamer = MagicMock(spec=TextIteratorStreamer)
+    mock_streamer.__iter__.return_value = Exception()
+
+    with (
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.pipeline",
+            return_value=mock_pipeline,
+        ),
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.Thread",
+            side_effect=Exception(),
+        ),
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.TextIteratorStreamer",
+            return_value=mock_streamer,
+        ) as mock_stream,
+    ):
+        mock_stream.return_value = mock_streamer
+        service = HuggingFaceTextCompletion(service_id=model_name, ai_model_id=model_name, task=task)
+        prompt = "test prompt"
+        exec_settings = PromptExecutionSettings(service_id=model_name, extension_data={"max_new_tokens": 25})
+
+        with pytest.raises(ServiceResponseException, match=("Hugging Face completion failed")):
+            async for _ in service.get_streaming_text_contents(prompt, exec_settings):
+                pass
+
+
+def test_hugging_face_text_completion_init():
+    with (
+        patch("semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.pipeline") as patched_pipeline,
+        patch(
+            "semantic_kernel.connectors.ai.hugging_face.services.hf_text_completion.torch.cuda.is_available"
+        ) as mock_torch_cuda_is_available,
+    ):
+        patched_pipeline.return_value = patched_pipeline
+        mock_torch_cuda_is_available.return_value = False
+
+        ai_model_id = "test-model"
+        task = "summarization"
+        device = -1
+
+        service = HuggingFaceTextCompletion(service_id="test", ai_model_id=ai_model_id, task=task, device=device)
+
+        assert service is not None
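The new tests can be run locally with pytest from the python/ directory; a name filter selects them regardless of file layout:

cd python
pytest -k text_completion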