Add bigquery plugin
Signed-off-by: Kevin Su <[email protected]>
pingsutw committed Dec 22, 2021
1 parent dc05a5b commit fb18d7b
Showing 8 changed files with 359 additions and 0 deletions.
15 changes: 15 additions & 0 deletions plugins/flytekit-bigquery/README.md
@@ -0,0 +1,15 @@
# Flytekit BigQuery Plugin

BigQuery enables us to build data-intensive applications without operational burden. The Flyte backend can be connected to the BigQuery service; once enabled, it allows you to query a BigQuery table.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-bigquery
```

To configure BigQuery in the Flyte deployment's backend, follow the [configuration guide](https://docs.flyte.org/en/latest/deployment/plugin_setup/gcp/bigquery.html#deployment-plugin-setup-gcp-bigquery).

TODO: Add example

An [example]() showcasing the BigQuery service can be found in the documentation.
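
Until that example is published, here is a minimal sketch of defining and using a `BigQueryTask` (the task name, project, location, and query below are placeholders, mirroring this plugin's tests):

```python
from flytekit import kwtypes, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

bigquery_task = BigQueryTask(
    name="example.bigquery.query",  # placeholder task name
    inputs=kwtypes(ds=str),
    query_template="SELECT * FROM my_table WHERE ds = '{{ .Inputs.ds }}'",
    task_config=BigQueryConfig(ProjectID="my-project", Location="US"),
    output_schema_type=FlyteSchema,
)


@workflow
def my_wf(ds: str) -> FlyteSchema:
    return bigquery_task(ds=ds)
```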
1 change: 1 addition & 0 deletions plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py
@@ -0,0 +1 @@
from .task import BigQueryConfig, BigQueryTask
81 changes: 81 additions & 0 deletions plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py
@@ -0,0 +1,81 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type

from google.cloud import bigquery
from google.protobuf.struct_pb2 import Struct

from flytekit.extend import SerializationSettings, SQLTask
from flytekit.models import task as _task_model
from flytekit.types.schema import FlyteSchema


@dataclass
class BigQueryConfig(object):
    """
    BigQueryConfig should be used to configure a BigQuery Task.
    """

    Location: Optional[str]
    ProjectID: Optional[str]
    QueryJobConfig: Optional[bigquery.QueryJobConfig] = None
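
    # Example (illustrative values, mirroring the tests added in this commit):
    #   BigQueryConfig(ProjectID="Flyte", Location="Asia",
    #                  QueryJobConfig=bigquery.QueryJobConfig(allow_large_results=True))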


class BigQueryTask(SQLTask[BigQueryConfig]):
    """
    This is the simplest form of a BigQuery Task, which can be used even for tasks that do not produce any output.
    """

    # This task is executed using the BigQuery handler in the backend.
    _TASK_TYPE = "bigquery_query_job_task"

    def __init__(
        self,
        name: str,
        query_template: str,
        task_config: Optional[BigQueryConfig],
        inputs: Optional[Dict[str, Type]] = None,
        output_schema_type: Optional[Type[FlyteSchema]] = None,
        **kwargs,
    ):
        """
        To be used to query BigQuery Tables.

        :param name: Name of this task; should be unique in the project.
        :param query_template: The actual query to run. We use Flyte's Golang templating format for query templating.
            Refer to the templating documentation for details.
        :param task_config: A BigQueryConfig object.
        :param inputs: Name and type of inputs specified as an ordered dictionary.
        :param output_schema_type: If some data is produced by this query, then you can specify the output schema type.
        :param kwargs: All other args required by the parent type - SQLTask.
        """
        outputs = None
        if output_schema_type is not None:
            outputs = {
                "results": output_schema_type,
            }
        super().__init__(
            name=name,
            task_config=task_config,
            query_template=query_template,
            inputs=inputs,
            outputs=outputs,
            task_type=self._TASK_TYPE,
            **kwargs,
        )
        self._output_schema_type = output_schema_type

    def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
        # Pull the query-job options out of the user-supplied QueryJobConfig (this assumes
        # task_config.QueryJobConfig is set) and add the BigQuery location and project ID,
        # so the backend plugin knows where and how to run the job.
        config: dict = self.task_config.QueryJobConfig.to_api_repr()["query"]
        config.update(
            {
                "Location": self.task_config.Location,
                "ProjectID": self.task_config.ProjectID,
            }
        )
        s = Struct()
        s.update(config)
        return s
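
    # For illustration (values from this commit's tests): with ProjectID="Flyte", Location="Asia",
    # and QueryJobConfig(allow_large_results=True), the struct returned above contains
    # {"allowLargeResults": True, "Location": "Asia", "ProjectID": "Flyte"}.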

    def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
        sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
        return sql
2 changes: 2 additions & 0 deletions plugins/flytekit-bigquery/requirements.in
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-bigquery
148 changes: 148 additions & 0 deletions plugins/flytekit-bigquery/requirements.txt
@@ -0,0 +1,148 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
#
# pip-compile requirements.in
#
-e file:.#egg=flytekitplugins-bigquery
# via -r requirements.in
arrow==1.2.1
# via jinja2-time
binaryornot==0.4.4
# via cookiecutter
certifi==2021.10.8
# via requests
chardet==4.0.0
# via binaryornot
charset-normalizer==2.0.7
# via requests
checksumdir==1.2.0
# via flytekit
click==7.1.2
# via
# cookiecutter
# flytekit
cloudpickle==2.0.0
# via flytekit
cookiecutter==1.7.3
# via flytekit
croniter==1.0.15
# via flytekit
dataclasses-json==0.5.6
# via flytekit
decorator==5.1.0
# via retry
deprecated==1.2.13
# via flytekit
diskcache==5.2.1
# via flytekit
docker-image-py==0.1.12
# via flytekit
docstring-parser==0.12
# via flytekit
flyteidl==0.21.8
# via flytekit
flytekit==0.24.0
# via flytekitplugins-bigquery
grpcio==1.41.1
# via flytekit
idna==3.3
# via requests
importlib-metadata==4.8.2
# via keyring
jinja2==3.0.3
# via
# cookiecutter
# jinja2-time
jinja2-time==0.2.0
# via cookiecutter
keyring==23.2.1
# via flytekit
markupsafe==2.0.1
# via jinja2
marshmallow==3.14.0
# via
# dataclasses-json
# marshmallow-enum
# marshmallow-jsonschema
marshmallow-enum==1.5.1
# via dataclasses-json
marshmallow-jsonschema==0.13.0
# via flytekit
mypy-extensions==0.4.3
# via typing-inspect
natsort==8.0.0
# via flytekit
numpy==1.21.4
# via
# pandas
# pyarrow
pandas==1.3.4
# via flytekit
poyo==0.5.0
# via cookiecutter
protobuf==3.19.1
# via
# flyteidl
# flytekit
py==1.11.0
# via retry
pyarrow==6.0.0
# via flytekit
python-dateutil==2.8.1
# via
# arrow
# croniter
# flytekit
# pandas
python-json-logger==2.0.2
# via flytekit
python-slugify==5.0.2
# via cookiecutter
pytimeparse==1.1.8
# via flytekit
pytz==2018.4
# via
# flytekit
# pandas
regex==2021.11.10
# via docker-image-py
requests==2.26.0
# via
# cookiecutter
# flytekit
# responses
responses==0.15.0
# via flytekit
retry==0.9.2
# via flytekit
six==1.16.0
# via
# cookiecutter
# flytekit
# grpcio
# python-dateutil
# responses
sortedcontainers==2.4.0
# via flytekit
statsd==3.3.0
# via flytekit
text-unidecode==1.3
# via python-slugify
typing-extensions==3.10.0.2
# via typing-inspect
typing-inspect==0.7.1
# via dataclasses-json
urllib3==1.26.7
# via
# flytekit
# requests
# responses
wheel==0.37.0
# via flytekit
wrapt==1.13.3
# via
# deprecated
# flytekit
zipp==3.6.0
# via importlib-metadata
34 changes: 34 additions & 0 deletions plugins/flytekit-bigquery/setup.py
@@ -0,0 +1,34 @@
from setuptools import setup

PLUGIN_NAME = "bigquery"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=v0.23.0b0,<1.0.0"]

__version__ = "0.0.0+develop"

setup(
    name=microlib_name,
    version=__version__,
    author="flyteorg",
    author_email="[email protected]",
    description="This package holds the Bigquery plugins for flytekit",
    namespace_packages=["flytekitplugins"],
    packages=[f"flytekitplugins.{PLUGIN_NAME}"],
    install_requires=plugin_requires,
    license="apache2",
    python_requires=">=3.7",
    classifiers=[
        "Intended Audience :: Science/Research",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Topic :: Scientific/Engineering",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
        "Topic :: Software Development",
        "Topic :: Software Development :: Libraries",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
)
Empty file.
78 changes: 78 additions & 0 deletions plugins/flytekit-bigquery/tests/test_bigquery.py
@@ -0,0 +1,78 @@
from collections import OrderedDict

import pytest
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask
from google.cloud.bigquery import QueryJobConfig
from google.protobuf.struct_pb2 import Struct

from flytekit import kwtypes, workflow
from flytekit.extend import Image, ImageConfig, SerializationSettings, get_serializable
from flytekit.types.schema import FlyteSchema

query_template = """
insert overwrite directory '{{ .rawOutputDataPrefix }}' stored as parquet
select *
from my_table
where ds = '{{ .Inputs.ds }}'
"""


def test_serialization():
    bigquery_task = BigQueryTask(
        name="flytekit.demo.bigquery_task.query",
        inputs=kwtypes(ds=str),
        task_config=BigQueryConfig(
            ProjectID="Flyte", Location="Asia", QueryJobConfig=QueryJobConfig(allow_large_results=True)
        ),
        query_template=query_template,
        # the schema literal's backend uri will be equal to the value of .raw_output_data
        output_schema_type=FlyteSchema,
    )

    @workflow
    def my_wf(ds: str) -> FlyteSchema:
        return bigquery_task(ds=ds)

    default_img = Image(name="default", fqn="test", tag="tag")
    serialization_settings = SerializationSettings(
        project="proj",
        domain="dom",
        version="123",
        image_config=ImageConfig(default_image=default_img, images=[default_img]),
        env={},
    )

    task_spec = get_serializable(OrderedDict(), serialization_settings, bigquery_task)

    assert "{{ .rawOutputDataPrefix" in task_spec.template.sql.statement
    assert "insert overwrite directory" in task_spec.template.sql.statement
    assert task_spec.template.sql.dialect == task_spec.template.sql.Dialect.ANSI
    s = Struct()
    s.update({"ProjectID": "Flyte", "Location": "Asia", "allowLargeResults": True})
    assert task_spec.template.custom == s
    assert len(task_spec.template.interface.inputs) == 1
    assert len(task_spec.template.interface.outputs) == 1

    admin_workflow_spec = get_serializable(OrderedDict(), serialization_settings, my_wf)
    assert admin_workflow_spec.template.interface.outputs["o0"].type.schema is not None
    assert admin_workflow_spec.template.outputs[0].var == "o0"
    assert admin_workflow_spec.template.outputs[0].binding.promise.node_id == "n0"
    assert admin_workflow_spec.template.outputs[0].binding.promise.var == "results"


def test_local_exec():
    bigquery_task = BigQueryTask(
        name="flytekit.demo.snowflake_task.query2",
        inputs=kwtypes(ds=str),
        query_template=query_template,
        task_config=BigQueryConfig(ProjectID="Flyte", Location="Asia"),
        # the schema literal's backend uri will be equal to the value of .raw_output_data
        output_schema_type=FlyteSchema,
    )

    assert len(bigquery_task.interface.inputs) == 1
    assert len(bigquery_task.interface.outputs) == 1

    # will not run locally
    with pytest.raises(Exception):
        bigquery_task()
