Feature/gcp #841

Merged: 17 commits, Nov 13, 2019
6 changes: 4 additions & 2 deletions docs/changelog/changelog.rst
@@ -8,6 +8,7 @@

0.8.3
-----------------
* Add support for GCS store
* Fix a bug in data-docs' rendering of mostly parameter
* Correct wording for expect_column_proportion_of_unique_values_to_be_between
* Set charset and meta tags to avoid unicode decode error in some browser/backend configurations
@@ -21,6 +22,7 @@
* Correct a packaging issue resulting in missing css files in tarball release



0.8.2
-----------------
* Add easier support for customizing data-docs css
@@ -231,7 +233,7 @@ v.0.7.0
Version 0.7 of Great Expectations is HUGE. It introduces several major new features
and a large number of improvements, including breaking API changes.

The core vocabulary of expectations remains consistent. Upgrading to
the new version of GE will primarily require changes to code that
uses data contexts; existing expectation suites will require only changes
to top-level names.
@@ -310,7 +312,7 @@ v.0.6.0
------------
* Add support for SparkDFDataset and caching (HUGE work from @cselig)
* Migrate distributional expectations to new testing framework
* Add support for two new expectations: expect_column_distinct_values_to_contain_set
and expect_column_distinct_values_to_equal_set (thanks @RoyalTS)
* FUTURE BREAKING CHANGE: The new cache mechanism for Datasets, \
when enabled, causes GE to assume that dataset does not change between evaluation of individual expectations. \
4 changes: 2 additions & 2 deletions docs/features/data_docs.rst
@@ -50,8 +50,8 @@ Users can specify

* which datasources to document (by default, all)
* whether to include expectations, validations and profiling results sections
* where the expectations and validations should be read from (filesystem or S3)
* where the HTML files should be written (filesystem or S3)
* where the expectations and validations should be read from (filesystem, S3, or GCS)
* where the HTML files should be written (filesystem, S3, or GCS)
* which renderer and view class should be used to render each section

********************************
2 changes: 1 addition & 1 deletion docs/getting_started/pipeline_integration.rst
@@ -155,7 +155,7 @@ Validation Operators
--------------------

Validation Operators and Actions make it possible to define collections of tasks together that should be done after a
validation. For example, we might store results (either on a local filesystem or to S3), send a slack notification,
validation. For example, we might store results (either on a local filesystem, to S3 or GCS), send a slack notification,
and update data documentation. The default configuration performs each of those actions. See the
:ref:`validation_operators_and_actions` for more information.

60 changes: 60 additions & 0 deletions docs/reference/integrations/bigquery.rst
@@ -0,0 +1,60 @@
.. _BigQuery:

##############
BigQuery
##############

To add a BigQuery datasource, do the following:

1. Run ``great_expectations add-datasource``
2. Choose the *SQL* option from the menu.
3. When asked which sqlalchemy driver to use, enter ``bigquery``.
4. Consult the `PyBigQuery <https://github.com/mxmzdlv/pybigquery>`_ docs
   for help building a connection string for your BigQuery cluster. It will look
   something like this:

.. code-block:: python

"bigquery://project-name"


5. Paste in this connection string and finish the remaining CLI prompts.
6. Should you need to modify your connection string, you can manually edit the
   ``great_expectations/uncommitted/config_variables.yml`` file.
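
As an optional sanity check outside of Great Expectations (a sketch, not part of the
documented workflow), the same connection string can be handed directly to SQLAlchemy
once ``pybigquery`` is installed and Google credentials are configured; the project
name below is a placeholder:

.. code-block:: python

    from sqlalchemy import create_engine

    # Placeholder project; pybigquery registers the "bigquery://" dialect with SQLAlchemy.
    engine = create_engine("bigquery://my-project")
    with engine.connect() as conn:
        # A trivial query to confirm that authentication and the dialect work.
        print(conn.execute("SELECT 1").scalar())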

Custom Queries with SQL datasource
==================================

While other backends use temporary tables to generate batches of data from
custom queries, BigQuery does not support ephemeral temporary tables. As a
work-around, GE will create or replace a *permanent table* when the user supplies
a custom query.

Users can specify a table via a Batch Kwarg called ``bigquery_temp_table``:

.. code-block:: python

batch_kwargs = {
"query": "SELECT * FROM `my-project.my_dataset.my_table`",
"bigquery_temp_table": "my_other_dataset.temp_table"
}

Otherwise, default behavior depends on how the pybigquery engine is configured:

If a default BigQuery dataset is defined in the connection string
(for example, ``bigquery://project-name/dataset-name``), and no ``bigquery_temp_table``
Batch Kwarg is supplied, then GE will create a permanent table with a random
UUID in that location (e.g. ``project-name.dataset-name.ge_tmp_1a1b6511_03e6_4e18_a1b2_d85f9e9045c3``).

If a default BigQuery dataset is not defined in the connection string
(for example, ``bigquery://project-name``) and no ``bigquery_temp_table`` Batch Kwarg
is supplied, then custom queries will fail.
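
For illustration, the two configurations described above side by side (project,
dataset, and table names are placeholders):

.. code-block:: python

    # Engine URL with a default dataset, e.g. "bigquery://project-name/dataset-name":
    # GE can create a permanent ge_tmp_<uuid> table in that dataset automatically.
    batch_kwargs = {"query": "SELECT * FROM `project-name.dataset-name.my_table`"}

    # Engine URL without a default dataset, e.g. "bigquery://project-name":
    # a target table must be supplied via bigquery_temp_table or the query will fail.
    batch_kwargs = {
        "query": "SELECT * FROM `project-name.my_dataset.my_table`",
        "bigquery_temp_table": "my_other_dataset.temp_table",
    }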


Additional Notes
=================

Follow the `Google Cloud library guide <https://googleapis.dev/python/google-api-core/latest/auth.html>`_
for authentication.
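
For example (a sketch only; the Google guide above is authoritative), credentials are
commonly supplied through a service-account key file referenced by the
``GOOGLE_APPLICATION_CREDENTIALS`` environment variable, which the GCS and BigQuery
clients pick up automatically; the path below is a placeholder:

.. code-block:: python

    import os

    # Placeholder path to a service-account key; google-cloud clients fall back to
    # these application-default credentials when none are passed explicitly.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"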

Install the ``pybigquery`` package for the BigQuery sqlalchemy dialect (``pip install pybigquery``).
1 change: 1 addition & 0 deletions great_expectations/data_context/store/__init__.py
@@ -4,6 +4,7 @@
# FilesystemStoreBackend,
FixedLengthTupleFilesystemStoreBackend,
FixedLengthTupleS3StoreBackend,
FixedLengthTupleGCSStoreBackend
)

from .store import (
83 changes: 83 additions & 0 deletions great_expectations/data_context/store/store_backend.py
@@ -414,3 +414,86 @@ def has_key(self, key):

all_keys = self.list_keys()
return key in all_keys


class FixedLengthTupleGCSStoreBackend(FixedLengthTupleStoreBackend):
"""
Uses a GCS bucket as a store.

The key to this StoreBackend must be a tuple with fixed length equal to key_length.
The filepath_template is a string template used to convert the key to a filepath.
There's a bit of regex magic in _convert_filepath_to_key that reverses this process,
so that we can write AND read using filenames as keys.
"""
def __init__(
self,
root_directory,
filepath_template,
key_length,
bucket,
prefix,
project,
forbidden_substrings=None,
platform_specific_separator=False
):
super(FixedLengthTupleGCSStoreBackend, self).__init__(
root_directory=root_directory,
filepath_template=filepath_template,
key_length=key_length,
forbidden_substrings=forbidden_substrings,
platform_specific_separator=platform_specific_separator
)
self.bucket = bucket
self.prefix = prefix
self.project = project


def _get(self, key):
gcs_object_key = os.path.join(
self.prefix,
self._convert_key_to_filepath(key)
)

from google.cloud import storage
gcs = storage.Client(project=self.project)
bucket = gcs.get_bucket(self.bucket)
gcs_response_object = bucket.get_blob(gcs_object_key)
return gcs_response_object.download_as_string().decode("utf-8")

def _set(self, key, value, content_encoding='utf-8', content_type='application/json'):
gcs_object_key = os.path.join(
self.prefix,
self._convert_key_to_filepath(key)
)

from google.cloud import storage
gcs = storage.Client(project=self.project)
bucket = gcs.get_bucket(self.bucket)
blob = bucket.blob(gcs_object_key)
blob.upload_from_string(value.encode(content_encoding), content_type=content_type)
return gcs_object_key

def list_keys(self):
key_list = []

from google.cloud import storage
gcs = storage.Client(self.project)

for blob in gcs.list_blobs(self.bucket, prefix=self.prefix):
gcs_object_name = blob.name
gcs_object_key = os.path.relpath(
gcs_object_name,
self.prefix,
)

key = self._convert_filepath_to_key(gcs_object_key)
if key:
key_list.append(key)

return key_list

def has_key(self, key):
assert isinstance(key, string_types)

all_keys = self.list_keys()
return key in all_keys
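
A minimal usage sketch of the new backend, exercising the ``_set`` and ``list_keys``
methods added above (bucket, project, and template values are hypothetical; in practice
the store would normally be configured through the DataContext rather than instantiated
by hand):

.. code-block:: python

    from great_expectations.data_context.store import FixedLengthTupleGCSStoreBackend

    backend = FixedLengthTupleGCSStoreBackend(
        root_directory="/tmp/great_expectations",  # hypothetical local root
        filepath_template="{0}/{1}.json",          # one placeholder per key element
        key_length=2,
        bucket="my-ge-artifacts",                  # hypothetical bucket name
        prefix="expectations",
        project="my-gcp-project",                  # hypothetical GCP project
    )

    # Writes gs://my-ge-artifacts/expectations/warehouse/basic_suite.json ...
    backend._set(("warehouse", "basic_suite"), '{"expectations": []}')
    # ... and lists it back as the tuple key ("warehouse", "basic_suite").
    print(backend.list_keys())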
50 changes: 41 additions & 9 deletions great_expectations/dataset/sqlalchemy_dataset.py
@@ -39,6 +39,11 @@
except ImportError:
snowflake = None

try:
import pybigquery.sqlalchemy_bigquery
except ImportError:
pybigquery = None


class MetaSqlAlchemyDataset(Dataset):

@@ -200,6 +205,9 @@ def __init__(self, table_name=None, engine=None, connection_string=None,
if custom_sql and not table_name:
# dashes are special characters in most databases so use underscores
table_name = "ge_tmp_" + str(uuid.uuid4()).replace("-", "_")
generated_table_name = table_name
else:
generated_table_name = None

if table_name is None:
raise ValueError("No table_name provided.")
@@ -226,6 +234,8 @@ def __init__(self, table_name=None, engine=None, connection_string=None,
self.dialect = import_module("snowflake.sqlalchemy.snowdialect")
elif self.engine.dialect.name.lower() == "redshift":
self.dialect = import_module("sqlalchemy_redshift.dialect")
elif self.engine.dialect.name.lower() == "bigquery":
self.dialect = import_module("pybigquery.sqlalchemy_bigquery")
else:
self.dialect = None

@@ -234,9 +244,17 @@ def __init__(self, table_name=None, engine=None, connection_string=None,
# a user-defined schema
raise ValueError("Cannot specify both schema and custom_sql.")

if custom_sql is not None and self.engine.dialect.name.lower() == "bigquery":
if generated_table_name is not None and self.engine.dialect.dataset_id is None:
raise ValueError("No BigQuery dataset specified. Use the bigquery_temp_table batch_kwarg or specify a default dataset in the engine url")

if custom_sql:
self.create_temporary_table(table_name, custom_sql)

if generated_table_name is not None and self.engine.dialect.name.lower() == "bigquery":
logger.warning("Created permanent table {table_name}".format(
table_name=table_name))

try:
insp = reflection.Inspector.from_engine(self.engine)
self.columns = insp.get_columns(table_name, schema=schema)
@@ -258,14 +276,17 @@ def head(self, n=5):
con=self.engine,
chunksize=n
))
except ValueError:
except (ValueError, NotImplementedError):
# it looks like MetaData that is used by pd.read_sql_table
# cannot work on a temp table.
# If it fails, we are trying to get the data using read_sql
head_sql_str = "select * from "
if self._table.schema:
if self._table.schema and self.engine.dialect.name.lower() != "bigquery":
head_sql_str += self._table.schema + "."
head_sql_str += self._table.name
elif self.engine.dialect.name.lower() == "bigquery":
head_sql_str += "`" + self._table.name + "`"
else:
head_sql_str += self._table.name
head_sql_str += " limit {0:d}".format(n)

df = pd.read_sql(head_sql_str, con=self.engine)
@@ -329,7 +350,7 @@ def get_column_min(self, column, parse_strings_as_datetimes=False):
sa.select([sa.func.min(sa.column(column))]).select_from(
self._table)
).scalar()

def get_column_value_counts(self, column, sort="value", collate=None):
if sort not in ["value", "count", "none"]:
raise ValueError(
@@ -461,7 +482,7 @@ def get_column_hist(self, column, bins):
)
).label("bin_" + str(len(bins)-1))
)
else:
case_conditions.append(
sa.func.sum(
sa.case(
@@ -504,7 +525,7 @@ def get_column_count_in_range(self, column, min_val=None, max_val=None, strict_m
max_condition = sa.column(column) < max_val
else:
max_condition = sa.column(column) <= max_val

if min_condition is not None and max_condition is not None:
condition = sa.and_(min_condition, max_condition)
elif min_condition is not None:
@@ -522,7 +543,7 @@ def get_column_count_in_range(self, column, min_val=None, max_val=None, strict_m
)
) \
.select_from(self._table)

return self.engine.execute(query).scalar()

def create_temporary_table(self, table_name, custom_sql):
@@ -532,13 +553,17 @@ def create_temporary_table(self, table_name, custom_sql):
It hasn't been tested in all SQL dialects, and may change based on community feedback.
:param custom_sql:
"""
if self.engine.dialect.name == "mysql":

if self.engine.dialect.name.lower() == "bigquery":
stmt = "CREATE OR REPLACE TABLE `{table_name}` AS {custom_sql}".format(
table_name=table_name, custom_sql=custom_sql)

elif self.engine.dialect.name == "mysql":
stmt = "CREATE TEMPORARY TABLE {table_name} AS {custom_sql}".format(
table_name=table_name, custom_sql=custom_sql)
else:
stmt = "CREATE TEMPORARY TABLE \"{table_name}\" AS {custom_sql}".format(
table_name=table_name, custom_sql=custom_sql)

self.engine.execute(stmt)

def column_reflection_fallback(self):
@@ -871,6 +896,13 @@ def _get_dialect_regex_fn(self, positive=True):
except (AttributeError, TypeError): # TypeError can occur if the driver was not installed and so is None
pass

try:
# Bigquery
if isinstance(self.engine.dialect, pybigquery.sqlalchemy_bigquery.BigQueryDialect):
return "REGEXP_CONTAINS" if positive else "NOT REGEXP_CONTAINS"
except (AttributeError, TypeError): # TypeError can occur if the driver was not installed and so is None
pass

@MetaSqlAlchemyDataset.column_map_expectation
def expect_column_values_to_match_regex(
self,
25 changes: 17 additions & 8 deletions great_expectations/datasource/generator/table_generator.py
@@ -128,7 +128,12 @@ def _get_iterator(self, generator_asset, **kwargs):
else:
raise ValueError("Table name must be of shape '[SCHEMA.]TABLE'. Passed: " + split_generator_asset)
tables = self.inspector.get_table_names(schema=schema_name)
tables.extend(self.inspector.get_view_names(schema=schema_name))
try:
tables.extend(self.inspector.get_view_names(schema=schema_name))
except NotImplementedError:
# Not implemented by bigquery dialect
pass

if table_name in tables:
return iter([
SqlAlchemyDatasourceTableBatchKwargs(
@@ -162,13 +167,17 @@ def get_available_data_asset_names(self):
if table_name not in known_system_tables
]
)
tables.extend(
[table_name if self.inspector.default_schema_name == schema_name else
schema_name + "." + table_name
for table_name in self.inspector.get_view_names(schema=schema_name)
if table_name not in known_system_tables
]
)
try:
tables.extend(
[table_name if self.inspector.default_schema_name == schema_name else
schema_name + "." + table_name
for table_name in self.inspector.get_view_names(schema=schema_name)
if table_name not in known_system_tables
]
)
except NotImplementedError:
# Not implemented by bigquery dialect
pass

return defined_assets + tables
