From c87de69a6a75dfc89217da8eab2dd930f960a926 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Tue, 12 Sep 2023 14:27:11 +0200 Subject: [PATCH 1/4] Add vertex runner --- .github/workflows/pipeline.yaml | 2 +- pyproject.toml | 5 +++-- src/fondant/runner.py | 28 ++++++++++++++++++++++++++++ tests/test_runner.py | 11 ++++++++++- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 38bc56cce..11deb2329 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.8', '3.9', '3.10', '3.11'] + python-version: ['3.8', '3.9', '3.10'] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/pyproject.toml b/pyproject.toml index 1cb65ad09..c73d09685 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ classifiers = [ ] [tool.poetry.dependencies] -python = ">= 3.8 < 3.12" +python = ">= 3.8 < 3.11" dask = {extras = ["dataframe"], version = ">= 2023.4.1"} importlib-resources = { version = ">= 1.3", python = "<3.9" } jsonschema = ">= 4.18" @@ -53,13 +53,14 @@ s3fs = { version = ">= 2023.4.0", optional = true } adlfs = { version = ">= 2023.4.0", optional = true } kfp = { version = "2.0.1", optional = true } pandas = { version = ">= 1.3.5", optional = true } +google-cloud-aiplatform = "^1.32.0" [tool.poetry.extras] aws = ["fsspec", "s3fs"] azure = ["fsspec", "adlfs"] gcp = ["fsspec", "gcsfs"] kfp = ["kfp"] -vertex = ["kfp"] +vertex = ["kfp", "google-cloud-aiplatform"] [tool.poetry.group.test.dependencies] pre-commit = "^3.1.1" diff --git a/src/fondant/runner.py b/src/fondant/runner.py index ff75ae1a3..a72a697e8 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -85,3 +85,31 @@ def get_name_from_spec(self, input_spec: str): with open(input_spec) as f: spec = yaml.safe_load(f) return spec["pipelineInfo"]["name"] + + +class VertexRunner(Runner): + def __resolve_imports(self): + import google.cloud.aiplatform as aip + + self.aip = aip + + def __init__(self, project_id: str, project_region: str): + self.__resolve_imports() + self.aip.init( + project=project_id, + location=project_region, + ) + + def run(self, input_spec: str, *args, **kwargs): + job = self.aip.PipelineJob( + display_name=self.get_name_from_spec(input_spec), + template_path=input_spec, + enable_caching=False, + ) + job.submit() + + def get_name_from_spec(self, input_spec: str): + """Get the name of the pipeline from the spec.""" + with open(input_spec) as f: + spec = yaml.safe_load(f) + return spec["pipelineInfo"]["name"] diff --git a/tests/test_runner.py b/tests/test_runner.py index 975359db3..405aab87f 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -4,7 +4,7 @@ from unittest import mock import pytest -from fondant.runner import DockerRunner, KubeflowRunner +from fondant.runner import DockerRunner, KubeflowRunner, VertexRunner VALID_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/") @@ -79,3 +79,12 @@ def test_kfp_import(): sys.modules["kfp"] = None with pytest.raises(ImportError): _ = KubeflowRunner(host="some_host") + + +def test_vertex_runner(): + input_spec_path = str(VALID_PIPELINE / "kubeflow_pipeline.yml") + with mock.patch("google.cloud.aiplatform.init", return_value=None), mock.patch( + "google.cloud.aiplatform.PipelineJob", + ): + runner = VertexRunner(project_id="some_project", project_region="some_region") + runner.run(input_spec=input_spec_path) From 2538b3af4976d0c9537e38c01329ac6b4dce7d88 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Tue, 12 Sep 2023 19:27:19 +0200 Subject: [PATCH 2/4] make aiplatform dependencies optional --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c73d09685..a1ea1c88a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ s3fs = { version = ">= 2023.4.0", optional = true } adlfs = { version = ">= 2023.4.0", optional = true } kfp = { version = "2.0.1", optional = true } pandas = { version = ">= 1.3.5", optional = true } -google-cloud-aiplatform = "^1.32.0" +google-cloud-aiplatform = { version = "1.32.0", optional = true} [tool.poetry.extras] aws = ["fsspec", "s3fs"] From 3d2fde74ad5aa140e171aa261ae175168a5bff4e Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Wed, 13 Sep 2023 11:34:37 +0200 Subject: [PATCH 3/4] Add support for svc account --- src/fondant/runner.py | 9 +++++++-- tests/test_runner.py | 8 ++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/fondant/runner.py b/src/fondant/runner.py index a72a697e8..184d4915f 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -93,12 +93,14 @@ def __resolve_imports(self): self.aip = aip - def __init__(self, project_id: str, project_region: str): + def __init__(self, project_id: str, project_region: str, service_account: str = ""): self.__resolve_imports() + self.aip.init( project=project_id, location=project_region, ) + self.service_account = service_account def run(self, input_spec: str, *args, **kwargs): job = self.aip.PipelineJob( @@ -106,7 +108,10 @@ def run(self, input_spec: str, *args, **kwargs): template_path=input_spec, enable_caching=False, ) - job.submit() + if self.service_account: + job.submit(service_account=self.service_account) + else: + job.submit() def get_name_from_spec(self, input_spec: str): """Get the name of the pipeline from the spec.""" diff --git a/tests/test_runner.py b/tests/test_runner.py index 405aab87f..b73deb32f 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -88,3 +88,11 @@ def test_vertex_runner(): ): runner = VertexRunner(project_id="some_project", project_region="some_region") runner.run(input_spec=input_spec_path) + + # test with service account + runner2 = VertexRunner( + project_id="some_project", + project_region="some_region", + service_account="some_account", + ) + runner2.run(input_spec=input_spec_path) From 8931f9f03718647ddc1fade89c5434515e393405 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Wed, 13 Sep 2023 15:03:35 +0200 Subject: [PATCH 4/4] Make service account optional --- src/fondant/runner.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/fondant/runner.py b/src/fondant/runner.py index 184d4915f..59035e084 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -1,5 +1,6 @@ import logging import subprocess # nosec +import typing as t from abc import ABC, abstractmethod import yaml @@ -93,7 +94,12 @@ def __resolve_imports(self): self.aip = aip - def __init__(self, project_id: str, project_region: str, service_account: str = ""): + def __init__( + self, + project_id: str, + project_region: str, + service_account: t.Optional[str] = None, + ): self.__resolve_imports() self.aip.init( @@ -108,10 +114,7 @@ def run(self, input_spec: str, *args, **kwargs): template_path=input_spec, enable_caching=False, ) - if self.service_account: - job.submit(service_account=self.service_account) - else: - job.submit() + job.submit(service_account=self.service_account) def get_name_from_spec(self, input_spec: str): """Get the name of the pipeline from the spec."""