Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #776

Merged
merged 3 commits into from
Jan 12, 2024
Merged

Fixes #776

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions components/embed_text/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ The component takes the following arguments to alter its behavior:
| model | str | The model to generate embeddings from. Choose an available model name to pass to the model provider's langchain embedding class. | / |
| api_keys | dict | The API keys to use for the model provider that are written to environment variables.Pass only the keys required by the model provider or conveniently pass all keys you will ever need. Pay attention how to name the dictionary keys so that they can be used by the model provider. | / |
| auth_kwargs | dict | Additional keyword arguments required for api initialization/authentication. | / |
| retries | int | Number of retries to attempt when an embedding request fails. | 5 |

<a id="embed_text#usage"></a>
## Usage
Expand All @@ -58,7 +57,6 @@ dataset = dataset.apply(
# "model": ,
# "api_keys": {},
# "auth_kwargs": {},
# "retries": 5,
},
)
```
Expand Down
4 changes: 0 additions & 4 deletions components/embed_text/fondant_component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ args:
Additional keyword arguments required for api initialization/authentication.
type: dict
default: {}
retries:
description: Number of retries to attempt when an embedding request fails.
type: int
default: 5



6 changes: 1 addition & 5 deletions components/embed_text/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
VertexAIEmbeddings,
)
from langchain.schema.embeddings import Embeddings
from retry import retry

logger = logging.getLogger(__name__)

Expand All @@ -30,12 +29,10 @@ def __init__(
model: str,
api_keys: dict,
auth_kwargs: dict,
retries: int,
**kwargs,
):
to_env_vars(api_keys)

self.retries = retries
self.embedding_model = self.get_embedding_model(
model_provider,
model,
Expand Down Expand Up @@ -65,8 +62,7 @@ def get_embedding_model(

# make sure to keep trying even when api call limit is reached
def get_embeddings_vectors(self, texts):
retry_wrapper = retry(tries=self.retries, logger=logger)
return retry_wrapper(self.embedding_model.embed_documents(texts.tolist()))
return self.embedding_model.embed_documents(texts.tolist())

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
dataframe["embedding"] = self.get_embeddings_vectors(
Expand Down
1 change: 0 additions & 1 deletion components/embed_text/tests/embed_text_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def test_run_component_test():
model="all-MiniLM-L6-v2",
api_keys={},
auth_kwargs={},
retries=5,
)

dataframe = component.transform(dataframe=dataframe)
Expand Down
4 changes: 0 additions & 4 deletions src/fondant/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,3 @@ class InvalidTypeSchema(ValidationError, FondantException):

class UnsupportedTypeAnnotation(FondantException):
"""Thrown when an unsupported type annotation is encountered during type inference."""


class PipelineRunError(ValidationError, FondantException):
"""Thrown when a pipeline run results in an error."""
13 changes: 2 additions & 11 deletions src/fondant/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import yaml

from fondant.core.exceptions import PipelineRunError
from fondant.pipeline import Pipeline
from fondant.pipeline.compiler import (
DockerCompiler,
Expand Down Expand Up @@ -40,23 +39,15 @@ def _run(self, input_spec: str, *args, **kwargs):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
]

print("Starting pipeline run...")

# copy the current environment with the DOCKER_DEFAULT_PLATFORM argument
output = subprocess.run( # nosec
subprocess.call( # nosec
cmd,
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)

if output.returncode != 0:
msg = f"Command failed with error: '{output.stderr}'"
raise PipelineRunError(msg)

print("Finished pipeline run.")

def run(
Expand All @@ -65,7 +56,7 @@ def run(
*,
extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None,
build_args: t.Optional[t.List[str]] = None,
):
) -> None:
"""Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline.

Args:
Expand Down
53 changes: 19 additions & 34 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from unittest import mock

import pytest
from fondant.core.exceptions import PipelineRunError
from fondant.pipeline import Pipeline
from fondant.pipeline.runner import (
DockerRunner,
Expand All @@ -25,22 +24,26 @@


@pytest.fixture()
def mock_subprocess_run():
def _mock_subprocess_run(*args, **kwargs):
class MockCompletedProcess:
returncode = 0

return MockCompletedProcess()

return _mock_subprocess_run
def mock_docker_installation(monkeypatch): # noqa: PT004
def mock_check_docker_install(self):
pass

def mock_check_docker_compose_install(self):
pass

monkeypatch.setattr(DockerRunner, "check_docker_install", mock_check_docker_install)
monkeypatch.setattr(
DockerRunner,
"check_docker_compose_install",
mock_check_docker_compose_install,
)


def test_docker_runner(mock_subprocess_run):
def test_docker_runner(mock_docker_installation):
"""Test that the docker runner while mocking subprocess.call."""
with mock.patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
with mock.patch("subprocess.call") as mock_call:
DockerRunner().run("some/path")
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -51,19 +54,15 @@ def test_docker_runner(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_docker_runner_from_pipeline(mock_subprocess_run):
with mock.patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
def test_docker_runner_from_pipeline(mock_docker_installation):
with mock.patch("subprocess.call") as mock_call:
DockerRunner().run(PIPELINE)
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -74,25 +73,11 @@ def test_docker_runner_from_pipeline(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_invalid_docker_run():
"""Test that the docker runner throws the correct error."""
spec_path = "some/path"
resolved_spec_path = str(Path(spec_path).resolve())
with pytest.raises(
PipelineRunError,
match=f"stat {resolved_spec_path}: no such file or directory",
):
DockerRunner().run(spec_path)


def test_docker_is_not_available():
expected_msg = (
"Docker is not installed or not running. Please make sure Docker is installed and is "
Expand Down
55 changes: 24 additions & 31 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from fondant.component.executor import Executor, ExecutorFactory
from fondant.core.schema import CloudCredentialsMount
from fondant.pipeline import Pipeline
from fondant.pipeline.runner import DockerRunner

commands = [
"fondant",
Expand All @@ -36,23 +37,28 @@
]


class MyTestComponent(DaskLoadComponent):
def __init__(self, *args):
@pytest.fixture()
def mock_docker_installation(monkeypatch): # noqa: PT004
def mock_check_docker_install(self):
pass

def load(self):
def mock_check_docker_compose_install(self):
pass

monkeypatch.setattr(DockerRunner, "check_docker_install", mock_check_docker_install)
monkeypatch.setattr(
DockerRunner,
"check_docker_compose_install",
mock_check_docker_compose_install,
)

@pytest.fixture()
def mock_subprocess_run():
def _mock_subprocess_run(*args, **kwargs):
class MockCompletedProcess:
returncode = 0

return MockCompletedProcess()
class MyTestComponent(DaskLoadComponent):
def __init__(self, *args):
pass

return _mock_subprocess_run
def load(self):
pass


@pytest.mark.parametrize("command", commands)
Expand Down Expand Up @@ -273,7 +279,7 @@ def test_sagemaker_compile(tmp_path_factory):
)


def test_local_run(mock_subprocess_run):
def test_local_run(mock_docker_installation):
"""Test that the run command works with different arguments."""
args = argparse.Namespace(
local=True,
Expand All @@ -286,11 +292,9 @@ def test_local_run(mock_subprocess_run):
extra_volumes=[],
build_arg=[],
)

with patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
with patch("subprocess.call") as mock_call:
run_local(args)
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -301,15 +305,11 @@ def test_local_run(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)

with patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
with patch("subprocess.call") as mock_call:
args1 = argparse.Namespace(
local=True,
vertex=False,
Expand All @@ -323,7 +323,7 @@ def test_local_run(mock_subprocess_run):
credentials=None,
)
run_local(args1)
mock_run.assert_called_once_with(
mock_call.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -334,15 +334,12 @@ def test_local_run(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_local_run_cloud_credentials(mock_subprocess_run):
def test_local_run_cloud_credentials(mock_docker_installation):
namespace_creds_kwargs = [
{"auth_gcp": True, "auth_azure": False, "auth_aws": False},
{"auth_gcp": False, "auth_azure": True, "auth_aws": False},
Expand All @@ -353,10 +350,8 @@ def test_local_run_cloud_credentials(mock_subprocess_run):
with patch(
"fondant.pipeline.compiler.DockerCompiler.compile",
) as mock_compiler, patch(
"subprocess.run",
"subprocess.call",
) as mock_runner:
mock_runner.side_effect = mock_subprocess_run

args = argparse.Namespace(
local=True,
vertex=False,
Expand All @@ -382,6 +377,7 @@ def test_local_run_cloud_credentials(mock_subprocess_run):
output_path=".fondant/compose.yaml",
build_args=[],
)

mock_runner.assert_called_once_with(
[
"docker",
Expand All @@ -393,11 +389,8 @@ def test_local_run_cloud_credentials(mock_subprocess_run):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


Expand Down
Loading