feat(profiler): using approximate queries for profiling (#3752)
treff7es authored and pull[bot] committed Nov 3, 2023
1 parent b63ffb4 commit 980e0bf
Showing 2 changed files with 143 additions and 21 deletions.
50 changes: 50 additions & 0 deletions metadata-ingestion/examples/recipes/redshift_to_datahub.yml
@@ -0,0 +1,50 @@
---
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/redshift for complete documentation
source:
type: "redshift"
config:
# Coordinates
host_port: host:port
database: database_name
options:
connect_args:
sslmode: prefer
# Credentials
username: datahub
password: datahub
#include_tables: true
#include_views: true
#include_table_lineage: true
#default_schema: public
#table_lineage_mode: stl_scan_based
#include_copy_lineage: true
#start_time: 2020-12-15T20:08:23.091Z
#end_time: 2023-12-15T20:08:23.091Z
#profiling:
# enabled: true
# turn_off_expensive_profiling_metrics: false
# limit: 10
# query_combiner_enabled: true
# max_number_of_fields_to_profile: 8
# profile_table_level_only: false
# include_field_null_count: true
# include_field_min_value: true
# include_field_max_value: true
# include_field_mean_value: true
# include_field_median_value: true
# include_field_stddev_value: false
# include_field_quantiles: false
# include_field_distinct_value_frequencies: false
# include_field_histogram: false
# include_field_sample_values: false
#profile_pattern:
# allow:
# - "schema.table.column"
# deny:
# - "*.*.*"

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
114 changes: 93 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -22,6 +22,8 @@
pass

import pydantic
import sqlalchemy as sa
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
DataContextConfig,
@@ -34,6 +36,7 @@
from great_expectations.profile.base import OrderedProfilerCardinality, ProfilerDataType
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfilerBase
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.exc import ProgrammingError
from typing_extensions import Concatenate, ParamSpec

from datahub.configuration.common import AllowDenyPattern, ConfigModel
@@ -97,6 +100,59 @@ class GEProfilerRequest:
batch_kwargs: dict


def get_column_unique_count_patch(self, column):
if self.engine.dialect.name.lower() == "redshift":
element_values = self.engine.execute(
sa.select(
[sa.text(f"APPROXIMATE count(distinct {column})")] # type:ignore
).select_from(self._table)
)
return convert_to_json_serializable(element_values.fetchone()[0])
elif self.engine.dialect.name.lower() == "bigquery":
element_values = self.engine.execute(
sa.select(
[sa.text(f"APPROX_COUNT_DISTINCT ({column})")] # type:ignore
).select_from(self._table)
)
return convert_to_json_serializable(element_values.fetchone()[0])
elif self.engine.dialect.name.lower() == "snowflake":
element_values = self.engine.execute(
sa.select(
[sa.text(f"APPROX_COUNT_DISTINCT({column})")] # type:ignore
).select_from(self._table)
)
return convert_to_json_serializable(element_values.fetchone()[0])
return convert_to_json_serializable(
self.engine.execute(
sa.select([sa.func.count(sa.func.distinct(sa.column(column)))]).select_from(
self._table
)
).scalar()
)

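The patch above is what swaps GE's exact count(distinct ...) for a dialect-specific approximate aggregate on Redshift, BigQuery and Snowflake, falling back to the exact count elsewhere. As a rough illustration of the SQL each branch produces, here is a standalone sketch that compiles the same list-style sa.select([sa.text(...)]) construction against a hypothetical events table; the table and column names are made up, and it assumes the legacy SQLAlchemy 1.3/1.4 select style used in the patch.

```python
# Illustration only: show the SQL produced by the approximate-distinct-count
# branches above. "events" / "user_id" are hypothetical names.
import sqlalchemy as sa

table = sa.table("events")

approx_exprs = {
    "redshift": "APPROXIMATE count(distinct user_id)",
    "bigquery": "APPROX_COUNT_DISTINCT (user_id)",
    "snowflake": "APPROX_COUNT_DISTINCT(user_id)",
}

for dialect, expr in approx_exprs.items():
    stmt = sa.select([sa.text(expr)]).select_from(table)
    print(dialect, stmt)
    # e.g. redshift -> SELECT APPROXIMATE count(distinct user_id) FROM events

# Fallback for other dialects, matching the exact-count branch:
exact = sa.select(
    [sa.func.count(sa.func.distinct(sa.column("user_id")))]
).select_from(table)
print("default", exact)
```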

def _get_column_quantiles_bigquery_patch( # type:ignore
self, column: str, quantiles: Iterable
) -> list:
quantile_queries = list()
for quantile in quantiles:
quantile_queries.append(
sa.text(f"approx_quantiles({column}, 100) OFFSET [{round(quantile * 100)}]")
)

quantiles_query = sa.select(quantile_queries).select_from( # type:ignore
self._table
)
try:
quantiles_results = self.engine.execute(quantiles_query).fetchone()
return list(quantiles_results)

except ProgrammingError as pe:
# _treat_quantiles_exception raises a formatted exception, so no value is returned from this branch
self._treat_quantiles_exception(pe)
return list()

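For BigQuery, the quantiles patch above builds one approx_quantiles(column, 100) OFFSET [n] term per requested quantile and selects them all in a single statement, so every percentile comes back in one round trip. A small illustration of the text fragments it generates for a typical quantile list (the column name is hypothetical):

```python
# Illustration: the text fragments the BigQuery quantiles patch above builds
# for a typical quantile list. "purchase_amount" is a hypothetical column name.
quantiles = [0.05, 0.25, 0.5, 0.75, 0.95]
column = "purchase_amount"

fragments = [
    f"approx_quantiles({column}, 100) OFFSET [{round(q * 100)}]" for q in quantiles
]
for fragment in fragments:
    print(fragment)
# approx_quantiles(purchase_amount, 100) OFFSET [5]
# approx_quantiles(purchase_amount, 100) OFFSET [25]
# approx_quantiles(purchase_amount, 100) OFFSET [50]
# approx_quantiles(purchase_amount, 100) OFFSET [75]
# approx_quantiles(purchase_amount, 100) OFFSET [95]
# sa.select(fragments).select_from(table) then wraps these in a single SELECT,
# so all quantiles come back in one row.
```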

class GEProfilingConfig(ConfigModel):
enabled: bool = False
limit: Optional[int] = None
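The profiling block that is commented out in the Redshift recipe above maps directly onto this pydantic-backed config model. A minimal sketch, assuming GEProfilingConfig behaves like a standard pydantic v1 model (ConfigModel is pydantic-based), only the two fields visible in this hunk are set, and the remaining fields keep their defaults:

```python
# Minimal sketch: parse the recipe's `profiling` mapping into GEProfilingConfig.
# Only the two fields shown in this hunk are set; everything else keeps its default.
from datahub.ingestion.source.ge_data_profiler import GEProfilingConfig

profiling_section = {"enabled": True, "limit": 10}
config = GEProfilingConfig.parse_obj(profiling_section)  # pydantic v1-style parse
assert config.enabled is True and config.limit == 10
```
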
@@ -422,6 +478,7 @@ def _get_dataset_column_quantiles(

res = self.dataset.expect_column_quantile_values_to_be_between(
column,
allow_relative_error=True,
quantile_ranges={
"quantiles": quantiles,
"value_ranges": [[None, None]] * len(quantiles),
@@ -532,15 +589,23 @@ def generate_dataset_profile( # noqa: C901 (complexity)

if self.config.include_field_null_count and non_null_count is not None:
null_count = row_count - non_null_count
assert null_count >= 0
if null_count < 0:
null_count = 0

column_profile.nullCount = null_count
if row_count > 0:
column_profile.nullProportion = null_count / row_count
# Sometimes this value is bigger than 1 because of the approx queries
if column_profile.nullProportion > 1:
column_profile.nullProportion = 1

if unique_count is not None:
column_profile.uniqueCount = unique_count
if non_null_count is not None and non_null_count > 0:
column_profile.uniqueProportion = unique_count / non_null_count
# Sometimes this value is bigger than 1 because of the approx queries
if column_profile.uniqueProportion > 1:
column_profile.uniqueProportion = 1

self._get_dataset_column_sample_values(column_profile, column)

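Because the unique counts now come from approximate aggregates, the derived proportions can land slightly above 1, which is why this hunk replaces the old assert with clamping. A tiny, self-contained illustration of the behaviour being guarded against (the numbers are made up):

```python
# Illustration: approximate distinct counts can overshoot the non-null count,
# so the proportion is capped at 1 instead of asserting. Numbers are made up.
def clamped_proportion(numerator: int, denominator: int) -> float:
    if denominator <= 0:
        return 0.0
    return min(numerator / denominator, 1.0)

approx_unique_count = 1013  # e.g. APPROX_COUNT_DISTINCT overshoots the true 1000
non_null_count = 1000
print(clamped_proportion(approx_unique_count, non_null_count))  # 1.0, not 1.013
```
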
@@ -704,27 +769,34 @@ def generate_profiles(
logger.info(
f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
)
with unittest.mock.patch(
"great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count",
get_column_unique_count_patch,
):
with unittest.mock.patch(
"great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery",
_get_column_quantiles_bigquery_patch,
):
async_profiles = [
async_executor.submit(
self._generate_profile_from_request,
query_combiner,
request,
)
for request in requests
]

# Avoid using as_completed so that the results are yielded in the
# same order as the requests.
# for async_profile in concurrent.futures.as_completed(async_profiles):
for async_profile in async_profiles:
yield async_profile.result()

logger.info(
f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
)

async_profiles = [
async_executor.submit(
self._generate_profile_from_request,
query_combiner,
request,
)
for request in requests
]

# Avoid using as_completed so that the results are yielded in the
# same order as the requests.
# for async_profile in concurrent.futures.as_completed(async_profiles):
for async_profile in async_profiles:
yield async_profile.result()

logger.info(
f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
)

self.report.report_from_query_combiner(query_combiner.report)
self.report.report_from_query_combiner(query_combiner.report)

def _generate_profile_from_request(
self,

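The nested unittest.mock.patch calls in generate_profiles are what route GE's unique-count and BigQuery-quantile code paths through the approximate implementations for the duration of profiling, without forking great_expectations. The sketch below shows the same monkeypatching pattern in isolation; it patches a local stand-in class with patch.object so it runs anywhere, whereas the real code targets the two SqlAlchemyDataset methods by their dotted-string paths as shown in the diff.

```python
# Rough, self-contained sketch of the monkeypatching pattern used in
# generate_profiles above. FakeDataset is a stand-in for great_expectations'
# SqlAlchemyDataset; the real code patches it via dotted-string targets.
import unittest.mock


class FakeDataset:
    def get_column_unique_count(self, column):
        raise RuntimeError("exact COUNT(DISTINCT ...) - too expensive here")


def approx_unique_count_patch(self, column):
    # stand-in for get_column_unique_count_patch above
    return 42


with unittest.mock.patch.object(
    FakeDataset, "get_column_unique_count", approx_unique_count_patch
):
    # Inside the block, callers (like the profiler) transparently get the
    # approximate implementation.
    assert FakeDataset().get_column_unique_count("user_id") == 42
# Outside the block the original method is restored.
```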