Skip to content

Commit

Permalink
Add Bigquery plugin (#789)
Browse files Browse the repository at this point in the history
* Add bigquery plugin

Signed-off-by: Kevin Su <[email protected]>

* Update dependency

Signed-off-by: Kevin Su <[email protected]>

* update get_custom

Signed-off-by: Kevin Su <[email protected]>

* Add structured dataset

Signed-off-by: Kevin Su <[email protected]>

* Add structured dataset

Signed-off-by: Kevin Su <[email protected]>

* Updated comment

Signed-off-by: Kevin Su <[email protected]>

* Add BQ in GA

Signed-off-by: Kevin Su <[email protected]>

* alphabetical order

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
pingsutw authored and eapolinario committed Jan 28, 2022
1 parent 46ce00d commit 9916e77
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 4 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]
plugin-names:
# Please maintain an alphabetical order in the following list
- flytekit-aws-athena
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-data-fsspec
- flytekit-dolt
- flytekit-greatexpectations
Expand All @@ -70,12 +72,12 @@ jobs:
- flytekit-kf-mpi
- flytekit-kf-pytorch
- flytekit-kf-tensorflow
- flytekit-modin
- flytekit-pandera
- flytekit-papermill
- flytekit-snowflake
- flytekit-spark
- flytekit-sqlalchemy
- flytekit-pandera
- flytekit-snowflake
- flytekit-modin
exclude:
# flytekit-modin depends on ray which does not have a 3.10 wheel yet.
# Issue tracked in https://github.com/ray-project/ray/issues/19116.
Expand Down
11 changes: 11 additions & 0 deletions plugins/flytekit-bigquery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Flytekit BigQuery Plugin

BigQuery enables us to build data-intensive applications without operational burden. Flyte backend can be connected with the BigQuery service. Once enabled, it can allow you to query a BigQuery table.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-bigquery
```

To configure BigQuery in the Flyte deployment's backend, follow the [configuration guide](https://docs.flyte.org/en/latest/deployment/plugin_setup/gcp/bigquery.html#deployment-plugin-setup-gcp-bigquery).
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .task import BigQueryConfig, BigQueryTask
82 changes: 82 additions & 0 deletions plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit import StructuredDataset
from flytekit.extend import SerializationSettings, SQLTask
from flytekit.models import task as _task_model


@dataclass
class BigQueryConfig(object):
"""
BigQueryConfig should be used to configure a BigQuery Task.
"""

ProjectID: str
Location: Optional[str] = None
QueryJobConfig: Optional[bigquery.QueryJobConfig] = None


class BigQueryTask(SQLTask[BigQueryConfig]):
"""
This is the simplest form of a BigQuery Task, that can be used even for tasks that do not produce any output.
"""

# This task is executed using the BigQuery handler in the backend.
# https://github.com/flyteorg/flyteplugins/blob/43623826fb189fa64dc4cb53e7025b517d911f22/go/tasks/plugins/webapi/bigquery/plugin.go#L34
_TASK_TYPE = "bigquery_query_job_task"

def __init__(
self,
name: str,
query_template: str,
task_config: Optional[BigQueryConfig],
inputs: Optional[Dict[str, Type]] = None,
output_structured_dataset_type: Optional[Type[StructuredDataset]] = None,
**kwargs,
):
"""
To be used to query BigQuery Tables.
:param name: Name of this task, should be unique in the project
:param query_template: The actual query to run. We use Flyte's Golang templating format for Query templating.
Refer to the templating documentation
:param task_config: BigQueryConfig object
:param inputs: Name and type of inputs specified as an ordered dictionary
:param output_structured_dataset_type: If some data is produced by this query, then you can specify the output StructuredDataset type
:param kwargs: All other args required by Parent type - SQLTask
"""
outputs = None
if output_structured_dataset_type is not None:
outputs = {
"results": output_structured_dataset_type,
}
super().__init__(
name=name,
task_config=task_config,
query_template=query_template,
inputs=inputs,
outputs=outputs,
task_type=self._TASK_TYPE,
**kwargs,
)
self._output_structured_dataset_type = output_structured_dataset_type

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
config = {
"Location": self.task_config.Location,
"ProjectID": self.task_config.ProjectID,
}
if self.task_config.QueryJobConfig is not None:
config.update(self.task_config.QueryJobConfig.to_api_repr()["query"])
s = Struct()
s.update(config)
return json_format.MessageToDict(s)

def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
return sql
2 changes: 2 additions & 0 deletions plugins/flytekit-bigquery/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-bigquery
202 changes: 202 additions & 0 deletions plugins/flytekit-bigquery/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
#
# pip-compile requirements.in
#
-e file:.#egg=flytekitplugins-bigquery
# via -r requirements.in
arrow==1.2.1
# via jinja2-time
binaryornot==0.4.4
# via cookiecutter
cachetools==4.2.4
# via google-auth
certifi==2021.10.8
# via requests
chardet==4.0.0
# via binaryornot
charset-normalizer==2.0.7
# via requests
checksumdir==1.2.0
# via flytekit
click==7.1.2
# via
# cookiecutter
# flytekit
cloudpickle==2.0.0
# via flytekit
cookiecutter==1.7.3
# via flytekit
croniter==1.0.15
# via flytekit
dataclasses-json==0.5.6
# via flytekit
decorator==5.1.0
# via retry
deprecated==1.2.13
# via flytekit
diskcache==5.2.1
# via flytekit
docker-image-py==0.1.12
# via flytekit
docstring-parser==0.12
# via flytekit
flyteidl==0.21.8
# via flytekit
flytekit==0.24.0
# via flytekitplugins-bigquery
google-api-core[grpc]==2.3.2
# via
# google-cloud-bigquery
# google-cloud-core
google-auth==2.3.3
# via
# google-api-core
# google-cloud-core
google-cloud-bigquery==2.31.0
# via flytekitplugins-bigquery
google-cloud-core==2.2.1
# via google-cloud-bigquery
google-crc32c==1.3.0
# via google-resumable-media
google-resumable-media==2.1.0
# via google-cloud-bigquery
googleapis-common-protos==1.54.0
# via
# google-api-core
# grpcio-status
grpcio==1.43.0
# via
# flytekit
# google-api-core
# google-cloud-bigquery
# grpcio-status
grpcio-status==1.43.0
# via google-api-core
idna==3.3
# via requests
importlib-metadata==4.8.2
# via keyring
jinja2==3.0.3
# via
# cookiecutter
# jinja2-time
jinja2-time==0.2.0
# via cookiecutter
keyring==23.2.1
# via flytekit
markupsafe==2.0.1
# via jinja2
marshmallow==3.14.0
# via
# dataclasses-json
# marshmallow-enum
# marshmallow-jsonschema
marshmallow-enum==1.5.1
# via dataclasses-json
marshmallow-jsonschema==0.13.0
# via flytekit
mypy-extensions==0.4.3
# via typing-inspect
natsort==8.0.0
# via flytekit
numpy==1.21.4
# via
# pandas
# pyarrow
packaging==21.3
# via google-cloud-bigquery
pandas==1.3.4
# via flytekit
poyo==0.5.0
# via cookiecutter
proto-plus==1.19.8
# via google-cloud-bigquery
protobuf==3.19.1
# via
# flyteidl
# flytekit
# google-api-core
# google-cloud-bigquery
# googleapis-common-protos
# grpcio-status
# proto-plus
py==1.11.0
# via retry
pyarrow==6.0.0
# via flytekit
pyasn1==0.4.8
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.2.8
# via google-auth
pyparsing==3.0.6
# via packaging
python-dateutil==2.8.1
# via
# arrow
# croniter
# flytekit
# google-cloud-bigquery
# pandas
python-json-logger==2.0.2
# via flytekit
python-slugify==5.0.2
# via cookiecutter
pytimeparse==1.1.8
# via flytekit
pytz==2018.4
# via
# flytekit
# pandas
regex==2021.11.10
# via docker-image-py
requests==2.26.0
# via
# cookiecutter
# flytekit
# google-api-core
# google-cloud-bigquery
# responses
responses==0.15.0
# via flytekit
retry==0.9.2
# via flytekit
rsa==4.8
# via google-auth
six==1.16.0
# via
# cookiecutter
# flytekit
# google-auth
# grpcio
# python-dateutil
# responses
sortedcontainers==2.4.0
# via flytekit
statsd==3.3.0
# via flytekit
text-unidecode==1.3
# via python-slugify
typing-extensions==3.10.0.2
# via typing-inspect
typing-inspect==0.7.1
# via dataclasses-json
urllib3==1.26.7
# via
# flytekit
# requests
# responses
wheel==0.37.0
# via flytekit
wrapt==1.13.3
# via
# deprecated
# flytekit
zipp==3.6.0
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# setuptools
36 changes: 36 additions & 0 deletions plugins/flytekit-bigquery/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from setuptools import setup

PLUGIN_NAME = "bigquery"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=v0.30.0b3,<1.0.0", "google-cloud-bigquery"]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="[email protected]",
description="This package holds the Bigquery plugins for flytekit",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.7",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
Empty file.
Loading

0 comments on commit 9916e77

Please sign in to comment.