From 9916e77d58e8b52aa7505a9cef779788aeec0ff5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 18 Jan 2022 01:26:10 +0800 Subject: [PATCH] Add Bigquery plugin (#789) * Add bigquery plugin Signed-off-by: Kevin Su * Update dependency Signed-off-by: Kevin Su * update get_custom Signed-off-by: Kevin Su * Add structured dataset Signed-off-by: Kevin Su * Add structured dataset Signed-off-by: Kevin Su * Updated comment Signed-off-by: Kevin Su * Add BQ in GA Signed-off-by: Kevin Su * alphabetical order Signed-off-by: Kevin Su Signed-off-by: Eduardo Apolinario --- .github/workflows/pythonbuild.yml | 8 +- plugins/flytekit-bigquery/README.md | 11 + .../flytekitplugins/bigquery/__init__.py | 1 + .../flytekitplugins/bigquery/task.py | 82 +++++++ plugins/flytekit-bigquery/requirements.in | 2 + plugins/flytekit-bigquery/requirements.txt | 202 ++++++++++++++++++ plugins/flytekit-bigquery/setup.py | 36 ++++ plugins/flytekit-bigquery/tests/__init__.py | 0 .../flytekit-bigquery/tests/test_bigquery.py | 71 ++++++ plugins/setup.py | 5 +- 10 files changed, 414 insertions(+), 4 deletions(-) create mode 100644 plugins/flytekit-bigquery/README.md create mode 100644 plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py create mode 100644 plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py create mode 100644 plugins/flytekit-bigquery/requirements.in create mode 100644 plugins/flytekit-bigquery/requirements.txt create mode 100644 plugins/flytekit-bigquery/setup.py create mode 100644 plugins/flytekit-bigquery/tests/__init__.py create mode 100644 plugins/flytekit-bigquery/tests/test_bigquery.py diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index ce4b272426c..a5b8d2488d6 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -60,8 +60,10 @@ jobs: matrix: python-version: ["3.8", "3.9", "3.10"] plugin-names: + # Please maintain an alphabetical order in the following list - flytekit-aws-athena - flytekit-aws-sagemaker + - flytekit-bigquery - flytekit-data-fsspec - flytekit-dolt - flytekit-greatexpectations @@ -70,12 +72,12 @@ jobs: - flytekit-kf-mpi - flytekit-kf-pytorch - flytekit-kf-tensorflow + - flytekit-modin + - flytekit-pandera - flytekit-papermill + - flytekit-snowflake - flytekit-spark - flytekit-sqlalchemy - - flytekit-pandera - - flytekit-snowflake - - flytekit-modin exclude: # flytekit-modin depends on ray which does not have a 3.10 wheel yet. # Issue tracked in https://github.com/ray-project/ray/issues/19116. diff --git a/plugins/flytekit-bigquery/README.md b/plugins/flytekit-bigquery/README.md new file mode 100644 index 00000000000..7b8468ffc22 --- /dev/null +++ b/plugins/flytekit-bigquery/README.md @@ -0,0 +1,11 @@ +# Flytekit BigQuery Plugin + +BigQuery enables us to build data-intensive applications without operational burden. Flyte backend can be connected with the BigQuery service. Once enabled, it can allow you to query a BigQuery table. + +To install the plugin, run the following command: + +```bash +pip install flytekitplugins-bigquery +``` + +To configure BigQuery in the Flyte deployment's backend, follow the [configuration guide](https://docs.flyte.org/en/latest/deployment/plugin_setup/gcp/bigquery.html#deployment-plugin-setup-gcp-bigquery). diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py new file mode 100644 index 00000000000..cb259e4f498 --- /dev/null +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py @@ -0,0 +1 @@ +from .task import BigQueryConfig, BigQueryTask diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py new file mode 100644 index 00000000000..b7a5104dea8 --- /dev/null +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py @@ -0,0 +1,82 @@ +from dataclasses import dataclass +from typing import Any, Dict, Optional, Type + +from google.cloud import bigquery +from google.protobuf import json_format +from google.protobuf.struct_pb2 import Struct + +from flytekit import StructuredDataset +from flytekit.extend import SerializationSettings, SQLTask +from flytekit.models import task as _task_model + + +@dataclass +class BigQueryConfig(object): + """ + BigQueryConfig should be used to configure a BigQuery Task. + """ + + ProjectID: str + Location: Optional[str] = None + QueryJobConfig: Optional[bigquery.QueryJobConfig] = None + + +class BigQueryTask(SQLTask[BigQueryConfig]): + """ + This is the simplest form of a BigQuery Task, that can be used even for tasks that do not produce any output. + """ + + # This task is executed using the BigQuery handler in the backend. + # https://github.com/flyteorg/flyteplugins/blob/43623826fb189fa64dc4cb53e7025b517d911f22/go/tasks/plugins/webapi/bigquery/plugin.go#L34 + _TASK_TYPE = "bigquery_query_job_task" + + def __init__( + self, + name: str, + query_template: str, + task_config: Optional[BigQueryConfig], + inputs: Optional[Dict[str, Type]] = None, + output_structured_dataset_type: Optional[Type[StructuredDataset]] = None, + **kwargs, + ): + """ + To be used to query BigQuery Tables. + + :param name: Name of this task, should be unique in the project + :param query_template: The actual query to run. We use Flyte's Golang templating format for Query templating. + Refer to the templating documentation + :param task_config: BigQueryConfig object + :param inputs: Name and type of inputs specified as an ordered dictionary + :param output_structured_dataset_type: If some data is produced by this query, then you can specify the output StructuredDataset type + :param kwargs: All other args required by Parent type - SQLTask + """ + outputs = None + if output_structured_dataset_type is not None: + outputs = { + "results": output_structured_dataset_type, + } + super().__init__( + name=name, + task_config=task_config, + query_template=query_template, + inputs=inputs, + outputs=outputs, + task_type=self._TASK_TYPE, + **kwargs, + ) + self._output_structured_dataset_type = output_structured_dataset_type + + def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: + config = { + "Location": self.task_config.Location, + "ProjectID": self.task_config.ProjectID, + } + if self.task_config.QueryJobConfig is not None: + config.update(self.task_config.QueryJobConfig.to_api_repr()["query"]) + s = Struct() + s.update(config) + return json_format.MessageToDict(s) + + def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]: + sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI) + return sql diff --git a/plugins/flytekit-bigquery/requirements.in b/plugins/flytekit-bigquery/requirements.in new file mode 100644 index 00000000000..d01f21ac2bd --- /dev/null +++ b/plugins/flytekit-bigquery/requirements.in @@ -0,0 +1,2 @@ +. +-e file:.#egg=flytekitplugins-bigquery diff --git a/plugins/flytekit-bigquery/requirements.txt b/plugins/flytekit-bigquery/requirements.txt new file mode 100644 index 00000000000..45be4f75a08 --- /dev/null +++ b/plugins/flytekit-bigquery/requirements.txt @@ -0,0 +1,202 @@ +# +# This file is autogenerated by pip-compile with python 3.9 +# To update, run: +# +# pip-compile requirements.in +# +-e file:.#egg=flytekitplugins-bigquery + # via -r requirements.in +arrow==1.2.1 + # via jinja2-time +binaryornot==0.4.4 + # via cookiecutter +cachetools==4.2.4 + # via google-auth +certifi==2021.10.8 + # via requests +chardet==4.0.0 + # via binaryornot +charset-normalizer==2.0.7 + # via requests +checksumdir==1.2.0 + # via flytekit +click==7.1.2 + # via + # cookiecutter + # flytekit +cloudpickle==2.0.0 + # via flytekit +cookiecutter==1.7.3 + # via flytekit +croniter==1.0.15 + # via flytekit +dataclasses-json==0.5.6 + # via flytekit +decorator==5.1.0 + # via retry +deprecated==1.2.13 + # via flytekit +diskcache==5.2.1 + # via flytekit +docker-image-py==0.1.12 + # via flytekit +docstring-parser==0.12 + # via flytekit +flyteidl==0.21.8 + # via flytekit +flytekit==0.24.0 + # via flytekitplugins-bigquery +google-api-core[grpc]==2.3.2 + # via + # google-cloud-bigquery + # google-cloud-core +google-auth==2.3.3 + # via + # google-api-core + # google-cloud-core +google-cloud-bigquery==2.31.0 + # via flytekitplugins-bigquery +google-cloud-core==2.2.1 + # via google-cloud-bigquery +google-crc32c==1.3.0 + # via google-resumable-media +google-resumable-media==2.1.0 + # via google-cloud-bigquery +googleapis-common-protos==1.54.0 + # via + # google-api-core + # grpcio-status +grpcio==1.43.0 + # via + # flytekit + # google-api-core + # google-cloud-bigquery + # grpcio-status +grpcio-status==1.43.0 + # via google-api-core +idna==3.3 + # via requests +importlib-metadata==4.8.2 + # via keyring +jinja2==3.0.3 + # via + # cookiecutter + # jinja2-time +jinja2-time==0.2.0 + # via cookiecutter +keyring==23.2.1 + # via flytekit +markupsafe==2.0.1 + # via jinja2 +marshmallow==3.14.0 + # via + # dataclasses-json + # marshmallow-enum + # marshmallow-jsonschema +marshmallow-enum==1.5.1 + # via dataclasses-json +marshmallow-jsonschema==0.13.0 + # via flytekit +mypy-extensions==0.4.3 + # via typing-inspect +natsort==8.0.0 + # via flytekit +numpy==1.21.4 + # via + # pandas + # pyarrow +packaging==21.3 + # via google-cloud-bigquery +pandas==1.3.4 + # via flytekit +poyo==0.5.0 + # via cookiecutter +proto-plus==1.19.8 + # via google-cloud-bigquery +protobuf==3.19.1 + # via + # flyteidl + # flytekit + # google-api-core + # google-cloud-bigquery + # googleapis-common-protos + # grpcio-status + # proto-plus +py==1.11.0 + # via retry +pyarrow==6.0.0 + # via flytekit +pyasn1==0.4.8 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.2.8 + # via google-auth +pyparsing==3.0.6 + # via packaging +python-dateutil==2.8.1 + # via + # arrow + # croniter + # flytekit + # google-cloud-bigquery + # pandas +python-json-logger==2.0.2 + # via flytekit +python-slugify==5.0.2 + # via cookiecutter +pytimeparse==1.1.8 + # via flytekit +pytz==2018.4 + # via + # flytekit + # pandas +regex==2021.11.10 + # via docker-image-py +requests==2.26.0 + # via + # cookiecutter + # flytekit + # google-api-core + # google-cloud-bigquery + # responses +responses==0.15.0 + # via flytekit +retry==0.9.2 + # via flytekit +rsa==4.8 + # via google-auth +six==1.16.0 + # via + # cookiecutter + # flytekit + # google-auth + # grpcio + # python-dateutil + # responses +sortedcontainers==2.4.0 + # via flytekit +statsd==3.3.0 + # via flytekit +text-unidecode==1.3 + # via python-slugify +typing-extensions==3.10.0.2 + # via typing-inspect +typing-inspect==0.7.1 + # via dataclasses-json +urllib3==1.26.7 + # via + # flytekit + # requests + # responses +wheel==0.37.0 + # via flytekit +wrapt==1.13.3 + # via + # deprecated + # flytekit +zipp==3.6.0 + # via importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# setuptools diff --git a/plugins/flytekit-bigquery/setup.py b/plugins/flytekit-bigquery/setup.py new file mode 100644 index 00000000000..7be6fc7d562 --- /dev/null +++ b/plugins/flytekit-bigquery/setup.py @@ -0,0 +1,36 @@ +from setuptools import setup + +PLUGIN_NAME = "bigquery" + +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" + +plugin_requires = ["flytekit>=v0.30.0b3,<1.0.0", "google-cloud-bigquery"] + +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="flyteorg", + author_email="admin@flyte.org", + description="This package holds the Bigquery plugins for flytekit", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.7", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/plugins/flytekit-bigquery/tests/__init__.py b/plugins/flytekit-bigquery/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/plugins/flytekit-bigquery/tests/test_bigquery.py b/plugins/flytekit-bigquery/tests/test_bigquery.py new file mode 100644 index 00000000000..78d6c0893f3 --- /dev/null +++ b/plugins/flytekit-bigquery/tests/test_bigquery.py @@ -0,0 +1,71 @@ +from collections import OrderedDict + +import pytest +from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask +from google.cloud.bigquery import QueryJobConfig +from google.protobuf import json_format +from google.protobuf.struct_pb2 import Struct + +from flytekit import StructuredDataset, kwtypes, workflow +from flytekit.extend import Image, ImageConfig, SerializationSettings, get_serializable + +query_template = "SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE @version = 1 LIMIT 10" + + +def test_serialization(): + bigquery_task = BigQueryTask( + name="flytekit.demo.bigquery_task.query", + inputs=kwtypes(ds=str), + task_config=BigQueryConfig( + ProjectID="Flyte", Location="Asia", QueryJobConfig=QueryJobConfig(allow_large_results=True) + ), + query_template=query_template, + output_structured_dataset_type=StructuredDataset, + ) + + @workflow + def my_wf(ds: str) -> StructuredDataset: + return bigquery_task(ds=ds) + + default_img = Image(name="default", fqn="test", tag="tag") + serialization_settings = SerializationSettings( + project="proj", + domain="dom", + version="123", + image_config=ImageConfig(default_image=default_img, images=[default_img]), + env={}, + ) + + task_spec = get_serializable(OrderedDict(), serialization_settings, bigquery_task) + + assert "SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions`" in task_spec.template.sql.statement + assert "@version" in task_spec.template.sql.statement + assert task_spec.template.sql.dialect == task_spec.template.sql.Dialect.ANSI + s = Struct() + s.update({"ProjectID": "Flyte", "Location": "Asia", "allowLargeResults": True}) + assert task_spec.template.custom == json_format.MessageToDict(s) + assert len(task_spec.template.interface.inputs) == 1 + assert len(task_spec.template.interface.outputs) == 1 + + admin_workflow_spec = get_serializable(OrderedDict(), serialization_settings, my_wf) + assert admin_workflow_spec.template.interface.outputs["o0"].type.structured_dataset_type is not None + assert admin_workflow_spec.template.outputs[0].var == "o0" + assert admin_workflow_spec.template.outputs[0].binding.promise.node_id == "n0" + assert admin_workflow_spec.template.outputs[0].binding.promise.var == "results" + + +def test_local_exec(): + bigquery_task = BigQueryTask( + name="flytekit.demo.bigquery_task.query2", + inputs=kwtypes(ds=str), + query_template=query_template, + task_config=BigQueryConfig(ProjectID="Flyte", Location="Asia"), + output_structured_dataset_type=StructuredDataset, + ) + + assert len(bigquery_task.interface.inputs) == 1 + assert len(bigquery_task.interface.outputs) == 1 + + # will not run locally + with pytest.raises(Exception): + bigquery_task() diff --git a/plugins/setup.py b/plugins/setup.py index 4c5b69dc362..8f3cc5c2994 100644 --- a/plugins/setup.py +++ b/plugins/setup.py @@ -6,9 +6,12 @@ from setuptools.command.install import install PACKAGE_NAME = "flytekitplugins-parent" + +# Please maintain an alphabetical order in the following list SOURCES = { "flytekitplugins-athena": "flytekit-aws-athena", "flytekitplugins-awssagemaker": "flytekit-aws-sagemaker", + "flytekitplugins-bigquery": "flytekit-bigquery", "flytekitplugins-fsspec": "flytekit-data-fsspec", "flytekitplugins-dolt": "flytekit-dolt", "flytekitplugins-great_expectations": "flytekit-greatexpectations", @@ -17,12 +20,12 @@ "flytekitplugins-kfmpi": "flytekit-kf-mpi", "flytekitplugins-kfpytorch": "flytekit-kf-pytorch", "flytekitplugins-kftensorflow": "flytekit-kf-tensorflow", + "flytekitplugins-modin": "flytekit-modin", "flytekitplugins-pandera": "flytekit-pandera", "flytekitplugins-papermill": "flytekit-papermill", "flytekitplugins-snowflake": "flytekit-snowflake", "flytekitplugins-spark": "flytekit-spark", "flytekitplugins-sqlalchemy": "flytekit-sqlalchemy", - "flytekitplugins-modin": "flytekit-modin", }