Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bigquery plugin #789

Merged
merged 9 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]
plugin-names:
# Please maintain an alphabetical order in the following list
- flytekit-aws-athena
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-data-fsspec
- flytekit-dolt
- flytekit-greatexpectations
Expand All @@ -70,12 +72,12 @@ jobs:
- flytekit-kf-mpi
- flytekit-kf-pytorch
- flytekit-kf-tensorflow
- flytekit-modin
- flytekit-pandera
- flytekit-papermill
- flytekit-snowflake
- flytekit-spark
- flytekit-sqlalchemy
- flytekit-pandera
- flytekit-snowflake
- flytekit-modin
exclude:
# flytekit-modin depends on ray which does not have a 3.10 wheel yet.
# Issue tracked in https://github.com/ray-project/ray/issues/19116.
Expand Down
11 changes: 11 additions & 0 deletions plugins/flytekit-bigquery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Flytekit BigQuery Plugin

BigQuery enables us to build data-intensive applications without operational burden. Flyte backend can be connected with the BigQuery service. Once enabled, it can allow you to query a BigQuery table.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-bigquery
```

To configure BigQuery in the Flyte deployment's backend, follow the [configuration guide](https://docs.flyte.org/en/latest/deployment/plugin_setup/gcp/bigquery.html#deployment-plugin-setup-gcp-bigquery).
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .task import BigQueryConfig, BigQueryTask
82 changes: 82 additions & 0 deletions plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit import StructuredDataset
from flytekit.extend import SerializationSettings, SQLTask
from flytekit.models import task as _task_model


@dataclass
class BigQueryConfig(object):
"""
BigQueryConfig should be used to configure a BigQuery Task.
"""

ProjectID: str
Location: Optional[str] = None
QueryJobConfig: Optional[bigquery.QueryJobConfig] = None


class BigQueryTask(SQLTask[BigQueryConfig]):
"""
This is the simplest form of a BigQuery Task, that can be used even for tasks that do not produce any output.
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
"""

# This task is executed using the BigQuery handler in the backend.
# https://github.com/flyteorg/flyteplugins/blob/43623826fb189fa64dc4cb53e7025b517d911f22/go/tasks/plugins/webapi/bigquery/plugin.go#L34
_TASK_TYPE = "bigquery_query_job_task"
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
name: str,
query_template: str,
task_config: Optional[BigQueryConfig],
inputs: Optional[Dict[str, Type]] = None,
output_structured_dataset_type: Optional[Type[StructuredDataset]] = None,
**kwargs,
):
"""
To be used to query BigQuery Tables.

:param name: Name of this task, should be unique in the project
:param query_template: The actual query to run. We use Flyte's Golang templating format for Query templating.
Refer to the templating documentation
:param task_config: BigQueryConfig object
:param inputs: Name and type of inputs specified as an ordered dictionary
:param output_structured_dataset_type: If some data is produced by this query, then you can specify the output StructuredDataset type
:param kwargs: All other args required by Parent type - SQLTask
"""
outputs = None
if output_structured_dataset_type is not None:
outputs = {
"results": output_structured_dataset_type,
}
super().__init__(
name=name,
task_config=task_config,
query_template=query_template,
inputs=inputs,
outputs=outputs,
task_type=self._TASK_TYPE,
**kwargs,
)
self._output_structured_dataset_type = output_structured_dataset_type

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
config = {
"Location": self.task_config.Location,
"ProjectID": self.task_config.ProjectID,
}
if self.task_config.QueryJobConfig is not None:
config.update(self.task_config.QueryJobConfig.to_api_repr()["query"])
s = Struct()
s.update(config)
return json_format.MessageToDict(s)

def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
return sql
2 changes: 2 additions & 0 deletions plugins/flytekit-bigquery/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-bigquery
202 changes: 202 additions & 0 deletions plugins/flytekit-bigquery/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
#
# pip-compile requirements.in
#
-e file:.#egg=flytekitplugins-bigquery
# via -r requirements.in
arrow==1.2.1
# via jinja2-time
binaryornot==0.4.4
# via cookiecutter
cachetools==4.2.4
# via google-auth
certifi==2021.10.8
# via requests
chardet==4.0.0
# via binaryornot
charset-normalizer==2.0.7
# via requests
checksumdir==1.2.0
# via flytekit
click==7.1.2
# via
# cookiecutter
# flytekit
cloudpickle==2.0.0
# via flytekit
cookiecutter==1.7.3
# via flytekit
croniter==1.0.15
# via flytekit
dataclasses-json==0.5.6
# via flytekit
decorator==5.1.0
# via retry
deprecated==1.2.13
# via flytekit
diskcache==5.2.1
# via flytekit
docker-image-py==0.1.12
# via flytekit
docstring-parser==0.12
# via flytekit
flyteidl==0.21.8
# via flytekit
flytekit==0.24.0
# via flytekitplugins-bigquery
google-api-core[grpc]==2.3.2
# via
# google-cloud-bigquery
# google-cloud-core
google-auth==2.3.3
# via
# google-api-core
# google-cloud-core
google-cloud-bigquery==2.31.0
# via flytekitplugins-bigquery
google-cloud-core==2.2.1
# via google-cloud-bigquery
google-crc32c==1.3.0
# via google-resumable-media
google-resumable-media==2.1.0
# via google-cloud-bigquery
googleapis-common-protos==1.54.0
# via
# google-api-core
# grpcio-status
grpcio==1.43.0
# via
# flytekit
# google-api-core
# google-cloud-bigquery
# grpcio-status
grpcio-status==1.43.0
# via google-api-core
idna==3.3
# via requests
importlib-metadata==4.8.2
# via keyring
jinja2==3.0.3
# via
# cookiecutter
# jinja2-time
jinja2-time==0.2.0
# via cookiecutter
keyring==23.2.1
# via flytekit
markupsafe==2.0.1
# via jinja2
marshmallow==3.14.0
# via
# dataclasses-json
# marshmallow-enum
# marshmallow-jsonschema
marshmallow-enum==1.5.1
# via dataclasses-json
marshmallow-jsonschema==0.13.0
# via flytekit
mypy-extensions==0.4.3
# via typing-inspect
natsort==8.0.0
# via flytekit
numpy==1.21.4
# via
# pandas
# pyarrow
packaging==21.3
# via google-cloud-bigquery
pandas==1.3.4
# via flytekit
poyo==0.5.0
# via cookiecutter
proto-plus==1.19.8
# via google-cloud-bigquery
protobuf==3.19.1
# via
# flyteidl
# flytekit
# google-api-core
# google-cloud-bigquery
# googleapis-common-protos
# grpcio-status
# proto-plus
py==1.11.0
# via retry
pyarrow==6.0.0
# via flytekit
pyasn1==0.4.8
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.2.8
# via google-auth
pyparsing==3.0.6
# via packaging
python-dateutil==2.8.1
# via
# arrow
# croniter
# flytekit
# google-cloud-bigquery
# pandas
python-json-logger==2.0.2
# via flytekit
python-slugify==5.0.2
# via cookiecutter
pytimeparse==1.1.8
# via flytekit
pytz==2018.4
# via
# flytekit
# pandas
regex==2021.11.10
# via docker-image-py
requests==2.26.0
# via
# cookiecutter
# flytekit
# google-api-core
# google-cloud-bigquery
# responses
responses==0.15.0
# via flytekit
retry==0.9.2
# via flytekit
rsa==4.8
# via google-auth
six==1.16.0
# via
# cookiecutter
# flytekit
# google-auth
# grpcio
# python-dateutil
# responses
sortedcontainers==2.4.0
# via flytekit
statsd==3.3.0
# via flytekit
text-unidecode==1.3
# via python-slugify
typing-extensions==3.10.0.2
# via typing-inspect
typing-inspect==0.7.1
# via dataclasses-json
urllib3==1.26.7
# via
# flytekit
# requests
# responses
wheel==0.37.0
# via flytekit
wrapt==1.13.3
# via
# deprecated
# flytekit
zipp==3.6.0
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# setuptools
36 changes: 36 additions & 0 deletions plugins/flytekit-bigquery/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from setuptools import setup

PLUGIN_NAME = "bigquery"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=v0.30.0b3,<1.0.0", "google-cloud-bigquery"]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="[email protected]",
description="This package holds the Bigquery plugins for flytekit",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.7",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
Empty file.
Loading