Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Great Expectations Plugin #495

Merged
merged 48 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
1e35de4
great expectations plugin
samhita-alla May 31, 2021
65933f6
add new lines
samhita-alla May 31, 2021
78eb2a6
modify comments
samhita-alla May 31, 2021
ec2caa3
add GEType, test cases, clean up the code
samhita-alla Jun 8, 2021
ef8964b
resolve merge conflict
samhita-alla Jun 8, 2021
2ac96ab
fixed failing multiple args test, added one expectation suite, got ri…
samhita-alla Jun 10, 2021
d79bccf
added support for flytefile and flyteschema
samhita-alla Jun 15, 2021
d4fe004
added support for flytefile and flyteschema
samhita-alla Jun 15, 2021
9bc14fc
replace FlyteContext with ctx in schema
samhita-alla Jun 15, 2021
bb004b6
resolved merge conflict
samhita-alla Jun 17, 2021
9565b17
resolve flake CI errors
samhita-alla Jun 18, 2021
cad4c2d
resolve flake CI errors
samhita-alla Jun 18, 2021
10126ea
flake8 errors
samhita-alla Jun 21, 2021
c579931
add init file in test folder
samhita-alla Jun 21, 2021
f03a7cc
fix GitHub CI
samhita-alla Jun 21, 2021
1a3d313
fix shutil.copytree() issue in Python3.7
samhita-alla Jun 21, 2021
829c0a8
fix shutil.copytree() issue in Python3.7
samhita-alla Jun 21, 2021
f421988
missed directory creation
samhita-alla Jun 21, 2021
57d60d0
remove tuple in local_file_path
samhita-alla Jul 17, 2021
320b719
modify error being captured
samhita-alla Jul 19, 2021
2cc88c3
flyteschema local_path
samhita-alla Jul 19, 2021
87ad91f
file path for FlyteSchema
samhita-alla Jul 19, 2021
6885379
modify getype get_literal_type
samhita-alla Jul 20, 2021
1c480a4
datatype of literal
samhita-alla Jul 20, 2021
95071a7
nomenclature
samhita-alla Jul 23, 2021
c00d6c0
remove unwanted char
samhita-alla Jul 23, 2021
08beb8a
resolve merge conflict
samhita-alla Jul 23, 2021
769f632
stylistic changes
samhita-alla Jul 28, 2021
9f068d4
GreatExpectations to Great Expectations, batchrequest_conf to batch_r…
samhita-alla Jul 28, 2021
3afaa3a
change type from str to Any
samhita-alla Jul 29, 2021
2e8c4dc
remove typevar
samhita-alla Aug 3, 2021
9d31702
replace download_directory with get_data, remove typing redundancy, t…
samhita-alla Aug 4, 2021
59f10f0
ge runtime
samhita-alla Aug 5, 2021
9a022e4
add is_runtime parameter, update requirements
samhita-alla Aug 5, 2021
5f41e77
Merge branch 'master' into great-expectations-plugin
samhita-alla Aug 5, 2021
43e7bf4
comments
samhita-alla Aug 5, 2021
5d78055
resolve merge conflicts
samhita-alla Aug 6, 2021
46adb10
add data_asset, modify runtime logic
samhita-alla Aug 10, 2021
a33777d
Merge remote-tracking branch 'origin/master' into great-expectations-…
samhita-alla Aug 11, 2021
583e8c1
pin GE version to 0.13.23 as test_flytekit_sagemaker_runner.py is fai…
samhita-alla Aug 11, 2021
aa2f432
Rename scripts directory to flytekit_scripts
eapolinario Aug 16, 2021
7908444
Merge remote-tracking branch 'origin/rename-flytekit-scripts-dir' int…
samhita-alla Aug 17, 2021
6825f72
unpin ge version
samhita-alla Aug 17, 2021
33f701a
resolved comments @kumare3
samhita-alla Aug 17, 2021
a43cc45
lint
samhita-alla Aug 17, 2021
3c88583
lint
samhita-alla Aug 17, 2021
d1da1e3
lint
samhita-alla Aug 17, 2021
85c408d
error handling in schema.py
samhita-alla Aug 17, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flytekit/types/file/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,9 @@
Can be used to receive or return an SVGImage. The underlying type is a FlyteFile, type. This is just a
decoration and useful for attaching content type information with the file and automatically documenting code.
"""

CSVFile = FlyteFile[typing.TypeVar("csv")]
samhita-alla marked this conversation as resolved.
Show resolved Hide resolved
"""
Can be used to receive or return a CSVFile. The underlying type is a FlyteFile type. This is just a
decoration and useful for attaching content type information with the file and automatically documenting code.
"""
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .schema import GreatExpectationsFlyteConfig, GreatExpectationsType # noqa: F401
from .task import BatchRequestConfig, GreatExpectationsTask # noqa: F401
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
import datetime
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import great_expectations as ge
from dataclasses_json import dataclass_json
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.exceptions import ValidationError

from flytekit import FlyteContext
from flytekit.extend import TypeEngine, TypeTransformer
from flytekit.models import types as _type_models
from flytekit.models.literals import Literal, Primitive, Scalar
from flytekit.models.types import LiteralType
from flytekit.types.file.file import FlyteFile, FlyteFilePathTransformer
from flytekit.types.schema.types import FlyteSchema, FlyteSchemaTransformer, SchemaOpenMode

from .task import BatchRequestConfig


@dataclass_json
@dataclass
class GreatExpectationsFlyteConfig(object):
    """
    Use this configuration to configure the GreatExpectations Plugin.

    Args:
        datasource_name: tell where your data lives and how to get it
        expectation_suite_name: suite which consists of the data expectations
        data_connector_name: connector to identify data batches
        data_asset_name: name of the data asset (to be used for RuntimeBatchRequest)
        local_file_path: dataset file path useful for FlyteFile and FlyteSchema
        checkpoint_params: optional SimpleCheckpoint parameters
        batch_request_config: batch request config
        context_root_dir: directory in which GreatExpectations' configuration resides
    """

    datasource_name: str
    expectation_suite_name: str
    data_connector_name: str
    data_asset_name: Optional[str] = None
    # local_file_path is a must in two scenarios:
    # * when using FlyteSchema
    # * when using FlyteFile for remote paths
    # This is because the base directory which has the dataset file *must* be
    # given in GreatExpectations' config file.
    local_file_path: Optional[str] = None
    checkpoint_params: Optional[Dict[str, Union[str, List[str]]]] = None
    # fix: the default is None, so the annotation must be Optional (implicit
    # Optional from a None default is disallowed by PEP 484 type checkers)
    batch_request_config: Optional[BatchRequestConfig] = None
    context_root_dir: str = "./great_expectations"


class GreatExpectationsType(object):
    """
    Use this class to send the GreatExpectationsFlyteConfig.

    Subscript the class with a (datatype, GreatExpectationsFlyteConfig) pair,
    e.g. ``GreatExpectationsType[str, GreatExpectationsFlyteConfig(...)]``.

    Args:
        config: GreatExpectations Plugin configuration

    TODO: Connect Data Docs to Flyte Console.
    """

    @classmethod
    def config(cls) -> Tuple[Type, Type[GreatExpectationsFlyteConfig]]:
        # default config: plain string dataset with an empty plugin configuration
        return (
            str,
            GreatExpectationsFlyteConfig(datasource_name="", data_connector_name="", expectation_suite_name=""),
        )

    def __class_getitem__(cls, config: Tuple[Type, GreatExpectationsFlyteConfig]) -> Any:
        # Bug fix: the original guard `not (isinstance(config, tuple) or len(config) != 2)`
        # only raised for a non-tuple whose len() happened to equal 2 (and could
        # raise TypeError on unsized objects). Reject anything that is not a 2-tuple.
        if not isinstance(config, tuple) or len(config) != 2:
            raise AssertionError("GreatExpectationsType must have both datatype and GreatExpectationsFlyteConfig")

        class _GreatExpectationsTypeClass(GreatExpectationsType):
            __origin__ = GreatExpectationsType

            @classmethod
            def config(cls) -> Tuple[Type, Type[GreatExpectationsFlyteConfig]]:
                # return the user-supplied (datatype, config) pair
                return config

        return _GreatExpectationsTypeClass


class GreatExpectationsTypeTransformer(TypeTransformer[GreatExpectationsType]):
    """
    Flyte type transformer that validates a dataset with Great Expectations
    while converting it to a Python value.

    The wrapped datatype may be ``str`` (a dataset/table name or query),
    ``FlyteFile``, or ``FlyteSchema``; validation runs inside
    ``to_python_value`` via a Great Expectations SimpleCheckpoint.
    """

    def __init__(self):
        super().__init__(name="GreatExpectations Transformer", t=GreatExpectationsType)

    @staticmethod
    def get_config(t: Type[GreatExpectationsType]) -> Tuple[Type, Type[GreatExpectationsFlyteConfig]]:
        # Returns the (datatype, GreatExpectationsFlyteConfig) pair stored on the
        # subscripted GreatExpectationsType class.
        return t.config()

    def get_literal_type(self, t: Type[GreatExpectationsType]) -> LiteralType:
        """Map the wrapped datatype (str/FlyteFile/FlyteSchema) to its Flyte literal type."""
        datatype = GreatExpectationsTypeTransformer.get_config(t)[0]

        if issubclass(datatype, str):
            return LiteralType(simple=_type_models.SimpleType.STRING, metadata={})
        elif issubclass(datatype, FlyteFile):
            return FlyteFilePathTransformer().get_literal_type(datatype)
        elif issubclass(datatype, FlyteSchema):
            return FlyteSchemaTransformer().get_literal_type(datatype)
        else:
            raise TypeError(f"{datatype} is not a supported type")

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: Union[FlyteFile, FlyteSchema, str],
        python_type: Type[GreatExpectationsType],
        expected: LiteralType,
    ) -> Literal:
        """Serialize by delegating to the transformer of the wrapped datatype."""
        datatype = GreatExpectationsTypeTransformer.get_config(python_type)[0]

        if issubclass(datatype, FlyteSchema):
            return FlyteSchemaTransformer().to_literal(ctx, python_val, datatype, expected)
        elif issubclass(datatype, FlyteFile):
            return FlyteFilePathTransformer().to_literal(ctx, python_val, datatype, expected)
        elif issubclass(datatype, str):
            return Literal(scalar=Scalar(primitive=Primitive(string_value=python_val)))
        else:
            raise TypeError(f"{datatype} is not a supported type")

    def _flyte_schema(
        self, is_runtime: bool, ctx: FlyteContext, ge_conf: GreatExpectationsFlyteConfig, lv: Literal
    ) -> Tuple[FlyteSchema, str]:
        """
        Materialize a FlyteSchema literal for validation.

        Returns a (fully-read schema frame, dataset basename) pair; the basename
        is "" when is_runtime is True, since the in-memory batch is used instead
        of a file on disk.
        """
        temp_dataset = ""

        # if data batch is to be generated, skip copying the parquet file
        if not is_runtime:
            if not ge_conf.local_file_path:
                raise ValueError("local_file_path is missing!")

            # copy parquet file to user-given directory
            ctx.file_access.get_data(lv.scalar.schema.uri, ge_conf.local_file_path, is_multipart=True)

            temp_dataset = os.path.basename(ge_conf.local_file_path)

        def downloader(x, y):
            # lazy downloader used by FlyteSchema when .open() is called
            ctx.file_access.get_data(x, y, is_multipart=True)

        return (
            FlyteSchema(
                local_path=ctx.file_access.get_random_local_directory(),
                remote_path=lv.scalar.schema.uri,
                downloader=downloader,
                supported_mode=SchemaOpenMode.READ,
            )
            .open()
            .all()
        ), temp_dataset

    def _flyte_file(self, ctx: FlyteContext, ge_conf: GreatExpectationsFlyteConfig, lv: Literal) -> Tuple[FlyteFile, str]:
        """
        Materialize a FlyteFile literal for validation.

        Remote files are downloaded into local_file_path (required in that case);
        local files are used as-is. Returns (FlyteFile, dataset basename).
        """
        uri = lv.scalar.blob.uri

        # check if the file is remote
        if ctx.file_access.is_remote(uri):
            if not ge_conf.local_file_path:
                raise ValueError("local_file_path is missing!")

            # preserve the original file name when local_file_path is a directory
            if os.path.isdir(ge_conf.local_file_path):
                local_path = os.path.join(ge_conf.local_file_path, os.path.basename(uri))
            else:
                local_path = ge_conf.local_file_path

            # download the file into local_file_path
            ctx.file_access.get_data(
                remote_path=uri,
                local_path=local_path,
            )

        temp_dataset = os.path.basename(uri)

        return FlyteFile(uri), temp_dataset

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[GreatExpectationsType],
    ) -> GreatExpectationsType:
        """
        Convert the literal to a Python value, running Great Expectations
        validation on the way. Raises ValidationError (with a column/expectation
        summary) when any expectation fails; otherwise returns the dataset
        (str, FlyteFile, or the opened FlyteSchema frame).
        """
        if not (
            lv
            and lv.scalar
            and ((lv.scalar.primitive and lv.scalar.primitive.string_value) or lv.scalar.schema or lv.scalar.blob)
        ):
            raise AssertionError("Can only validate a literal string/FlyteFile/FlyteSchema value")

        # fetch the configuration
        conf_dict = GreatExpectationsTypeTransformer.get_config(expected_python_type)[1].to_dict()

        ge_conf = GreatExpectationsFlyteConfig(**conf_dict)

        # fetch the data context
        context = ge.data_context.DataContext(ge_conf.context_root_dir)

        # determine the type of data connector
        selected_datasource = list(filter(lambda x: x["name"] == ge_conf.datasource_name, context.list_datasources()))

        if not selected_datasource:
            raise ValueError("Datasource doesn't exist!")

        data_connector_class_lookup = {
            data_connector_name: data_connector_class["class_name"]
            for data_connector_name, data_connector_class in selected_datasource[0]["data_connectors"].items()
        }

        specified_data_connector_class = data_connector_class_lookup[ge_conf.data_connector_name]

        # a RuntimeDataConnector means the batch is provided at runtime
        # (query string or in-memory batch) rather than discovered on disk
        is_runtime = False
        if specified_data_connector_class == "RuntimeDataConnector":
            is_runtime = True
            if not ge_conf.data_asset_name:
                raise ValueError("data_asset_name has to be given in a RuntimeBatchRequest")

        # file path for FlyteSchema and FlyteFile
        temp_dataset = ""

        # return value
        return_dataset = ""

        # FlyteSchema
        if lv.scalar.schema:
            return_dataset, temp_dataset = self._flyte_schema(is_runtime=is_runtime, ctx=ctx, ge_conf=ge_conf, lv=lv)

        # FlyteFile
        if lv.scalar.blob:
            return_dataset, temp_dataset = self._flyte_file(ctx=ctx, ge_conf=ge_conf, lv=lv)

        if lv.scalar.primitive:
            dataset = return_dataset = lv.scalar.primitive.string_value
        else:
            dataset = temp_dataset

        # NOTE: ge_conf was rebuilt from to_dict(), so batch_request_config is a
        # plain dict here (not a BatchRequestConfig instance) — hence the
        # subscript access below.
        batch_request_conf = ge_conf.batch_request_config

        # minimalistic batch request
        final_batch_request = {
            "data_asset_name": ge_conf.data_asset_name if is_runtime else dataset,
            "datasource_name": ge_conf.datasource_name,
            "data_connector_name": ge_conf.data_connector_name,
        }

        # Great Expectations' RuntimeBatchRequest
        if batch_request_conf and (batch_request_conf["runtime_parameters"] or is_runtime):
            final_batch_request.update(
                {
                    "runtime_parameters": batch_request_conf["runtime_parameters"]
                    if batch_request_conf["runtime_parameters"]
                    else {},
                    "batch_identifiers": batch_request_conf["batch_identifiers"],
                    "batch_spec_passthrough": batch_request_conf["batch_spec_passthrough"],
                }
            )

            # str datasets become a SQL-style "query"; schema frames become
            # in-memory "batch_data". NOTE(review): this also raises when
            # runtime_parameters are set but the connector is not runtime —
            # confirm that is intended.
            if is_runtime and lv.scalar.primitive:
                final_batch_request["runtime_parameters"]["query"] = dataset
            elif is_runtime and lv.scalar.schema:
                final_batch_request["runtime_parameters"]["batch_data"] = return_dataset
            else:
                raise AssertionError("Can only use runtime_parameters for query(str)/schema data")

        # Great Expectations' BatchRequest
        elif batch_request_conf:
            final_batch_request.update(
                {
                    "data_connector_query": batch_request_conf["data_connector_query"],
                    "batch_spec_passthrough": batch_request_conf["batch_spec_passthrough"],
                }
            )

        checkpoint_config = {
            "class_name": "SimpleCheckpoint",
            "validations": [
                {
                    "batch_request": final_batch_request,
                    "expectation_suite_name": ge_conf.expectation_suite_name,
                }
            ],
        }

        if ge_conf.checkpoint_params:
            checkpoint = SimpleCheckpoint(
                f"_tmp_checkpoint_{ge_conf.expectation_suite_name}",
                context,
                **checkpoint_config,
                **ge_conf.checkpoint_params,
            )
        else:
            checkpoint = SimpleCheckpoint(
                f"_tmp_checkpoint_{ge_conf.expectation_suite_name}", context, **checkpoint_config
            )

        # identify every run uniquely
        run_id = RunIdentifier(
            **{
                "run_name": ge_conf.datasource_name + "_run",
                "run_time": datetime.datetime.utcnow(),
            }
        )

        checkpoint_result = checkpoint.run(run_id=run_id)
        final_result = convert_to_json_serializable(checkpoint_result.list_validation_results())[0]

        # collect every failed expectation as "column -> expectation_type"
        result_string = ""
        if final_result["success"] is False:
            for every_result in final_result["results"]:
                if every_result["success"] is False:
                    result_string += (
                        every_result["expectation_config"]["kwargs"]["column"]
                        + " -> "
                        + every_result["expectation_config"]["expectation_type"]
                        + "\n"
                    )

            # raise a Great Expectations' exception
            raise ValidationError("Validation failed!\nCOLUMN\t\tFAILED EXPECTATION\n" + result_string)

        logging.info("Validation succeeded!")

        return return_dataset


# Register the transformer so Flyte's type engine routes GreatExpectationsType
# values through this plugin at (de)serialization time.
TypeEngine.register(GreatExpectationsTypeTransformer())
Loading