Skip to content

Commit

Permalink
Fixes (#776)
Browse files Browse the repository at this point in the history
Fixes for the 0.9 release: 

* Revert back retires from embedding text component since some models
have their own retry mechanism implemented
[example](https://github.com/langchain-ai/langchain/blob/9b3962fc2521ec0d6ef2ea7c0a40b9c32977671a/libs/community/langchain_community/embeddings/openai.py#L216).
They can be modified through extra arguments
* Reverted back the docker compose error handling since it was blocking
some logs for showing up, the issue should be fleshed out more. Created
a separate ticket for it #775
  • Loading branch information
PhilippeMoussalli authored Jan 12, 2024
1 parent 02a0df6 commit 255548d
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 92 deletions.
2 changes: 0 additions & 2 deletions components/embed_text/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ The component takes the following arguments to alter its behavior:
| model | str | The model to generate embeddings from. Choose an available model name to pass to the model provider's langchain embedding class. | / |
| api_keys | dict | The API keys to use for the model provider that are written to environment variables.Pass only the keys required by the model provider or conveniently pass all keys you will ever need. Pay attention how to name the dictionary keys so that they can be used by the model provider. | / |
| auth_kwargs | dict | Additional keyword arguments required for api initialization/authentication. | / |
| retries | int | Number of retries to attempt when an embedding request fails. | 5 |

<a id="embed_text#usage"></a>
## Usage
Expand All @@ -58,7 +57,6 @@ dataset = dataset.apply(
# "model": ,
# "api_keys": {},
# "auth_kwargs": {},
# "retries": 5,
},
)
```
Expand Down
4 changes: 0 additions & 4 deletions components/embed_text/fondant_component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ args:
Additional keyword arguments required for api initialization/authentication.
type: dict
default: {}
retries:
description: Number of retries to attempt when an embedding request fails.
type: int
default: 5



6 changes: 1 addition & 5 deletions components/embed_text/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
VertexAIEmbeddings,
)
from langchain.schema.embeddings import Embeddings
from retry import retry

logger = logging.getLogger(__name__)

Expand All @@ -30,12 +29,10 @@ def __init__(
model: str,
api_keys: dict,
auth_kwargs: dict,
retries: int,
**kwargs,
):
to_env_vars(api_keys)

self.retries = retries
self.embedding_model = self.get_embedding_model(
model_provider,
model,
Expand Down Expand Up @@ -65,8 +62,7 @@ def get_embedding_model(

# make sure to keep trying even when api call limit is reached
def get_embeddings_vectors(self, texts):
retry_wrapper = retry(tries=self.retries, logger=logger)
return retry_wrapper(self.embedding_model.embed_documents(texts.tolist()))
return self.embedding_model.embed_documents(texts.tolist())

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
dataframe["embedding"] = self.get_embeddings_vectors(
Expand Down
1 change: 0 additions & 1 deletion components/embed_text/tests/embed_text_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def test_run_component_test():
model="all-MiniLM-L6-v2",
api_keys={},
auth_kwargs={},
retries=5,
)

dataframe = component.transform(dataframe=dataframe)
Expand Down
4 changes: 0 additions & 4 deletions src/fondant/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,3 @@ class InvalidTypeSchema(ValidationError, FondantException):

class UnsupportedTypeAnnotation(FondantException):
"""Thrown when an unsupported type annotation is encountered during type inference."""


class PipelineRunError(ValidationError, FondantException):
"""Thrown when a pipeline run results in an error."""
13 changes: 2 additions & 11 deletions src/fondant/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import yaml

from fondant.core.exceptions import PipelineRunError
from fondant.pipeline import Pipeline
from fondant.pipeline.compiler import (
DockerCompiler,
Expand Down Expand Up @@ -40,23 +39,15 @@ def _run(self, input_spec: str, *args, **kwargs):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
]

print("Starting pipeline run...")

# copy the current environment with the DOCKER_DEFAULT_PLATFORM argument
output = subprocess.run( # nosec
subprocess.call( # nosec
cmd,
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)

if output.returncode != 0:
msg = f"Command failed with error: '{output.stderr}'"
raise PipelineRunError(msg)

print("Finished pipeline run.")

def run(
Expand All @@ -65,7 +56,7 @@ def run(
*,
extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None,
build_args: t.Optional[t.List[str]] = None,
):
) -> None:
"""Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline.
Args:
Expand Down
53 changes: 19 additions & 34 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from unittest import mock

import pytest
from fondant.core.exceptions import PipelineRunError
from fondant.pipeline import Pipeline
from fondant.pipeline.runner import (
DockerRunner,
Expand All @@ -25,22 +24,26 @@


@pytest.fixture()
def mock_subprocess_run():
def _mock_subprocess_run(*args, **kwargs):
class MockCompletedProcess:
returncode = 0

return MockCompletedProcess()

return _mock_subprocess_run
def mock_docker_installation(monkeypatch): # noqa: PT004
def mock_check_docker_install(self):
pass

def mock_check_docker_compose_install(self):
pass

monkeypatch.setattr(DockerRunner, "check_docker_install", mock_check_docker_install)
monkeypatch.setattr(
DockerRunner,
"check_docker_compose_install",
mock_check_docker_compose_install,
)


def test_docker_runner(mock_subprocess_run):
def test_docker_runner(mock_docker_installation):
"""Test that the docker runner while mocking subprocess.call."""
with mock.patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
with mock.patch("subprocess.call") as mock_call:
DockerRunner().run("some/path")
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -51,19 +54,15 @@ def test_docker_runner(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_docker_runner_from_pipeline(mock_subprocess_run):
with mock.patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
def test_docker_runner_from_pipeline(mock_docker_installation):
with mock.patch("subprocess.call") as mock_call:
DockerRunner().run(PIPELINE)
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -74,25 +73,11 @@ def test_docker_runner_from_pipeline(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_invalid_docker_run():
"""Test that the docker runner throws the correct error."""
spec_path = "some/path"
resolved_spec_path = str(Path(spec_path).resolve())
with pytest.raises(
PipelineRunError,
match=f"stat {resolved_spec_path}: no such file or directory",
):
DockerRunner().run(spec_path)


def test_docker_is_not_available():
expected_msg = (
"Docker is not installed or not running. Please make sure Docker is installed and is "
Expand Down
55 changes: 24 additions & 31 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from fondant.component.executor import Executor, ExecutorFactory
from fondant.core.schema import CloudCredentialsMount
from fondant.pipeline import Pipeline
from fondant.pipeline.runner import DockerRunner

commands = [
"fondant",
Expand All @@ -36,23 +37,28 @@
]


class MyTestComponent(DaskLoadComponent):
def __init__(self, *args):
@pytest.fixture()
def mock_docker_installation(monkeypatch): # noqa: PT004
def mock_check_docker_install(self):
pass

def load(self):
def mock_check_docker_compose_install(self):
pass

monkeypatch.setattr(DockerRunner, "check_docker_install", mock_check_docker_install)
monkeypatch.setattr(
DockerRunner,
"check_docker_compose_install",
mock_check_docker_compose_install,
)

@pytest.fixture()
def mock_subprocess_run():
def _mock_subprocess_run(*args, **kwargs):
class MockCompletedProcess:
returncode = 0

return MockCompletedProcess()
class MyTestComponent(DaskLoadComponent):
def __init__(self, *args):
pass

return _mock_subprocess_run
def load(self):
pass


@pytest.mark.parametrize("command", commands)
Expand Down Expand Up @@ -273,7 +279,7 @@ def test_sagemaker_compile(tmp_path_factory):
)


def test_local_run(mock_subprocess_run):
def test_local_run(mock_docker_installation):
"""Test that the run command works with different arguments."""
args = argparse.Namespace(
local=True,
Expand All @@ -286,11 +292,9 @@ def test_local_run(mock_subprocess_run):
extra_volumes=[],
build_arg=[],
)

with patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
with patch("subprocess.call") as mock_call:
run_local(args)
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -301,15 +305,11 @@ def test_local_run(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)

with patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
with patch("subprocess.call") as mock_call:
args1 = argparse.Namespace(
local=True,
vertex=False,
Expand All @@ -323,7 +323,7 @@ def test_local_run(mock_subprocess_run):
credentials=None,
)
run_local(args1)
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -334,15 +334,12 @@ def test_local_run(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_local_run_cloud_credentials(mock_subprocess_run):
def test_local_run_cloud_credentials(mock_docker_installation):
namespace_creds_kwargs = [
{"auth_gcp": True, "auth_azure": False, "auth_aws": False},
{"auth_gcp": False, "auth_azure": True, "auth_aws": False},
Expand All @@ -353,10 +350,8 @@ def test_local_run_cloud_credentials(mock_subprocess_run):
with patch(
"fondant.pipeline.compiler.DockerCompiler.compile",
) as mock_compiler, patch(
"subprocess.run",
"subprocess.call",
) as mock_runner:
mock_runner.side_effect = mock_subprocess_run

args = argparse.Namespace(
local=True,
vertex=False,
Expand All @@ -382,6 +377,7 @@ def test_local_run_cloud_credentials(mock_subprocess_run):
output_path=".fondant/compose.yaml",
build_args=[],
)

mock_runner.assert_called_once_with(
[
"docker",
Expand All @@ -393,11 +389,8 @@ def test_local_run_cloud_credentials(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


Expand Down

0 comments on commit 255548d

Please sign in to comment.