From 255548d48c8718dc6721db6667745b6bed35f3cd Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Fri, 12 Jan 2024 12:02:12 +0100 Subject: [PATCH] Fixes (#776) Fixes for the 0.9 release: * Reverted the retries from the embed text component, since some models have their own retry mechanism implemented [example](https://github.com/langchain-ai/langchain/blob/9b3962fc2521ec0d6ef2ea7c0a40b9c32977671a/libs/community/langchain_community/embeddings/openai.py#L216). They can be modified through extra arguments * Reverted the docker compose error handling, since it was blocking some logs from showing up; the issue should be fleshed out more. Created a separate ticket for it #775 --- components/embed_text/README.md | 2 - components/embed_text/fondant_component.yaml | 4 -- components/embed_text/src/main.py | 6 +- .../embed_text/tests/embed_text_test.py | 1 - src/fondant/core/exceptions.py | 4 -- src/fondant/pipeline/runner.py | 13 +---- tests/pipeline/test_runner.py | 53 +++++++----------- tests/test_cli.py | 55 ++++++++----------- 8 files changed, 46 insertions(+), 92 deletions(-) diff --git a/components/embed_text/README.md b/components/embed_text/README.md index 169d0e251..030a7fc91 100644 --- a/components/embed_text/README.md +++ b/components/embed_text/README.md @@ -35,7 +35,6 @@ The component takes the following arguments to alter its behavior: | model | str | The model to generate embeddings from. Choose an available model name to pass to the model provider's langchain embedding class. | / | | api_keys | dict | The API keys to use for the model provider that are written to environment variables.Pass only the keys required by the model provider or conveniently pass all keys you will ever need. Pay attention how to name the dictionary keys so that they can be used by the model provider. | / | | auth_kwargs | dict | Additional keyword arguments required for api initialization/authentication. 
| / | -| retries | int | Number of retries to attempt when an embedding request fails. | 5 | ## Usage @@ -58,7 +57,6 @@ dataset = dataset.apply( # "model": , # "api_keys": {}, # "auth_kwargs": {}, - # "retries": 5, }, ) ``` diff --git a/components/embed_text/fondant_component.yaml b/components/embed_text/fondant_component.yaml index 3ff2cc23f..a739129da 100644 --- a/components/embed_text/fondant_component.yaml +++ b/components/embed_text/fondant_component.yaml @@ -40,10 +40,6 @@ args: Additional keyword arguments required for api initialization/authentication. type: dict default: {} - retries: - description: Number of retries to attempt when an embedding request fails. - type: int - default: 5 \ No newline at end of file diff --git a/components/embed_text/src/main.py b/components/embed_text/src/main.py index 17c7d1690..e0b7fe5e3 100644 --- a/components/embed_text/src/main.py +++ b/components/embed_text/src/main.py @@ -12,7 +12,6 @@ VertexAIEmbeddings, ) from langchain.schema.embeddings import Embeddings -from retry import retry logger = logging.getLogger(__name__) @@ -30,12 +29,10 @@ def __init__( model: str, api_keys: dict, auth_kwargs: dict, - retries: int, **kwargs, ): to_env_vars(api_keys) - self.retries = retries self.embedding_model = self.get_embedding_model( model_provider, model, @@ -65,8 +62,7 @@ def get_embedding_model( # make sure to keep trying even when api call limit is reached def get_embeddings_vectors(self, texts): - retry_wrapper = retry(tries=self.retries, logger=logger) - return retry_wrapper(self.embedding_model.embed_documents(texts.tolist())) + return self.embedding_model.embed_documents(texts.tolist()) def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["embedding"] = self.get_embeddings_vectors( diff --git a/components/embed_text/tests/embed_text_test.py b/components/embed_text/tests/embed_text_test.py index b180eb86d..52caf77a7 100644 --- a/components/embed_text/tests/embed_text_test.py +++ 
b/components/embed_text/tests/embed_text_test.py @@ -34,7 +34,6 @@ def test_run_component_test(): model="all-MiniLM-L6-v2", api_keys={}, auth_kwargs={}, - retries=5, ) dataframe = component.transform(dataframe=dataframe) diff --git a/src/fondant/core/exceptions.py b/src/fondant/core/exceptions.py index 5560b9aab..4143f389a 100644 --- a/src/fondant/core/exceptions.py +++ b/src/fondant/core/exceptions.py @@ -25,7 +25,3 @@ class InvalidTypeSchema(ValidationError, FondantException): class UnsupportedTypeAnnotation(FondantException): """Thrown when an unsupported type annotation is encountered during type inference.""" - - -class PipelineRunError(ValidationError, FondantException): - """Thrown when a pipeline run results in an error.""" diff --git a/src/fondant/pipeline/runner.py b/src/fondant/pipeline/runner.py index 4e0bb0627..92ec403ec 100644 --- a/src/fondant/pipeline/runner.py +++ b/src/fondant/pipeline/runner.py @@ -7,7 +7,6 @@ import yaml -from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.compiler import ( DockerCompiler, @@ -40,23 +39,15 @@ def _run(self, input_spec: str, *args, **kwargs): "--pull", "always", "--remove-orphans", - "--abort-on-container-exit", ] print("Starting pipeline run...") # copy the current environment with the DOCKER_DEFAULT_PLATFORM argument - output = subprocess.run( # nosec + subprocess.call( # nosec cmd, env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), - capture_output=True, - encoding="utf8", ) - - if output.returncode != 0: - msg = f"Command failed with error: '{output.stderr}'" - raise PipelineRunError(msg) - print("Finished pipeline run.") def run( @@ -65,7 +56,7 @@ def run( *, extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, build_args: t.Optional[t.List[str]] = None, - ): + ) -> None: """Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline. 
Args: diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index a5eaba1b7..c93b57915 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -6,7 +6,6 @@ from unittest import mock import pytest -from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.runner import ( DockerRunner, @@ -25,22 +24,26 @@ @pytest.fixture() -def mock_subprocess_run(): - def _mock_subprocess_run(*args, **kwargs): - class MockCompletedProcess: - returncode = 0 - - return MockCompletedProcess() - - return _mock_subprocess_run +def mock_docker_installation(monkeypatch): # noqa: PT004 + def mock_check_docker_install(self): + pass + + def mock_check_docker_compose_install(self): + pass + + monkeypatch.setattr(DockerRunner, "check_docker_install", mock_check_docker_install) + monkeypatch.setattr( + DockerRunner, + "check_docker_compose_install", + mock_check_docker_compose_install, + ) -def test_docker_runner(mock_subprocess_run): +def test_docker_runner(mock_docker_installation): """Test that the docker runner while mocking subprocess.call.""" - with mock.patch("subprocess.run") as mock_run: - mock_run.side_effect = mock_subprocess_run + with mock.patch("subprocess.call") as mock_call: DockerRunner().run("some/path") - mock_run.assert_called_once_with( + mock_call.assert_called_once_with( [ "docker", "compose", @@ -51,19 +54,15 @@ def test_docker_runner(mock_subprocess_run): "--pull", "always", "--remove-orphans", - "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), - capture_output=True, - encoding="utf8", ) -def test_docker_runner_from_pipeline(mock_subprocess_run): - with mock.patch("subprocess.run") as mock_run: - mock_run.side_effect = mock_subprocess_run +def test_docker_runner_from_pipeline(mock_docker_installation): + with mock.patch("subprocess.call") as mock_call: DockerRunner().run(PIPELINE) - mock_run.assert_called_once_with( + 
mock_call.assert_called_once_with( [ "docker", "compose", @@ -74,25 +73,11 @@ def test_docker_runner_from_pipeline(mock_subprocess_run): "--pull", "always", "--remove-orphans", - "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), - capture_output=True, - encoding="utf8", ) -def test_invalid_docker_run(): - """Test that the docker runner throws the correct error.""" - spec_path = "some/path" - resolved_spec_path = str(Path(spec_path).resolve()) - with pytest.raises( - PipelineRunError, - match=f"stat {resolved_spec_path}: no such file or directory", - ): - DockerRunner().run(spec_path) - - def test_docker_is_not_available(): expected_msg = ( "Docker is not installed or not running. Please make sure Docker is installed and is " diff --git a/tests/test_cli.py b/tests/test_cli.py index 23a4e2d0c..3acd6d08a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -25,6 +25,7 @@ from fondant.component.executor import Executor, ExecutorFactory from fondant.core.schema import CloudCredentialsMount from fondant.pipeline import Pipeline +from fondant.pipeline.runner import DockerRunner commands = [ "fondant", @@ -36,23 +37,28 @@ ] -class MyTestComponent(DaskLoadComponent): - def __init__(self, *args): +@pytest.fixture() +def mock_docker_installation(monkeypatch): # noqa: PT004 + def mock_check_docker_install(self): pass - def load(self): + def mock_check_docker_compose_install(self): pass + monkeypatch.setattr(DockerRunner, "check_docker_install", mock_check_docker_install) + monkeypatch.setattr( + DockerRunner, + "check_docker_compose_install", + mock_check_docker_compose_install, + ) -@pytest.fixture() -def mock_subprocess_run(): - def _mock_subprocess_run(*args, **kwargs): - class MockCompletedProcess: - returncode = 0 - return MockCompletedProcess() +class MyTestComponent(DaskLoadComponent): + def __init__(self, *args): + pass - return _mock_subprocess_run + def load(self): + pass @pytest.mark.parametrize("command", commands) @@ 
-273,7 +279,7 @@ def test_sagemaker_compile(tmp_path_factory): ) -def test_local_run(mock_subprocess_run): +def test_local_run(mock_docker_installation): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, @@ -286,11 +292,9 @@ def test_local_run(mock_subprocess_run): extra_volumes=[], build_arg=[], ) - - with patch("subprocess.run") as mock_run: - mock_run.side_effect = mock_subprocess_run + with patch("subprocess.call") as mock_call: run_local(args) - mock_run.assert_called_once_with( + mock_call.assert_called_once_with( [ "docker", "compose", @@ -301,15 +305,11 @@ def test_local_run(mock_subprocess_run): "--pull", "always", "--remove-orphans", - "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), - capture_output=True, - encoding="utf8", ) - with patch("subprocess.run") as mock_run: - mock_run.side_effect = mock_subprocess_run + with patch("subprocess.call") as mock_call: args1 = argparse.Namespace( local=True, vertex=False, @@ -323,7 +323,7 @@ def test_local_run(mock_subprocess_run): credentials=None, ) run_local(args1) - mock_run.assert_called_once_with( + mock_call.assert_called_once_with( [ "docker", "compose", @@ -334,15 +334,12 @@ def test_local_run(mock_subprocess_run): "--pull", "always", "--remove-orphans", - "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), - capture_output=True, - encoding="utf8", ) -def test_local_run_cloud_credentials(mock_subprocess_run): +def test_local_run_cloud_credentials(mock_docker_installation): namespace_creds_kwargs = [ {"auth_gcp": True, "auth_azure": False, "auth_aws": False}, {"auth_gcp": False, "auth_azure": True, "auth_aws": False}, @@ -353,10 +350,8 @@ def test_local_run_cloud_credentials(mock_subprocess_run): with patch( "fondant.pipeline.compiler.DockerCompiler.compile", ) as mock_compiler, patch( - "subprocess.run", + "subprocess.call", ) as mock_runner: - mock_runner.side_effect = 
mock_subprocess_run - args = argparse.Namespace( local=True, vertex=False, @@ -382,6 +377,7 @@ def test_local_run_cloud_credentials(mock_subprocess_run): output_path=".fondant/compose.yaml", build_args=[], ) + mock_runner.assert_called_once_with( [ "docker", @@ -393,11 +389,8 @@ def test_local_run_cloud_credentials(mock_subprocess_run): "--pull", "always", "--remove-orphans", - "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), - capture_output=True, - encoding="utf8", )