From 980e0bfb12fe4214111197c1ad5b73d077511d9b Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Wed, 5 Jan 2022 21:42:15 +0100
Subject: [PATCH] feat(profiler): using approximate queries for profiling (#3752)

---
 .../examples/recipes/redshift_to_datahub.yml |  50 ++++++++
 .../ingestion/source/ge_data_profiler.py     | 114 ++++++++++++++----
 2 files changed, 143 insertions(+), 21 deletions(-)
 create mode 100644 metadata-ingestion/examples/recipes/redshift_to_datahub.yml

diff --git a/metadata-ingestion/examples/recipes/redshift_to_datahub.yml b/metadata-ingestion/examples/recipes/redshift_to_datahub.yml
new file mode 100644
index 00000000000000..94e97ca93b2a90
--- /dev/null
+++ b/metadata-ingestion/examples/recipes/redshift_to_datahub.yml
@@ -0,0 +1,50 @@
+---
+# see https://datahubproject.io/docs/metadata-ingestion/source_docs/redshift for complete documentation
+source:
+  type: "redshift"
+  config:
+    # Coordinates
+    host_port: host:port
+    database: database_name
+    options:
+      connect_args:
+        sslmode: prefer
+    # Credentials
+    username: datahub
+    password: datahub
+    #include_tables: true
+    #include_views: true
+    #include_table_lineage: true
+    #default_schema: public
+    #table_lineage_mode: stl_scan_based
+    #include_copy_lineage: true
+    #start_time: 2020-12-15T20:08:23.091Z
+    #end_time: 2023-12-15T20:08:23.091Z
+    #profiling:
+    #  enabled: true
+    #  turn_off_expensive_profiling_metrics: false
+    #  limit: 10
+    #  query_combiner_enabled: true
+    #  max_number_of_fields_to_profile: 8
+    #  profile_table_level_only: false
+    #  include_field_null_count: true
+    #  include_field_min_value: true
+    #  include_field_max_value: true
+    #  include_field_mean_value: true
+    #  include_field_median_value: true
+    #  include_field_stddev_value: false
+    #  include_field_quantiles: false
+    #  include_field_distinct_value_frequencies: false
+    #  include_field_histogram: false
+    #  include_field_sample_values: false
+    #profile_pattern:
+    #  allow:
+    #    - "schema.table.column"
+    #  deny:
+    #    - "*.*.*"
+
+# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
+sink:
+  type: "datahub-rest"
+  config:
+    server: "http://localhost:8080"
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index fffc60cad6dacf..69536ad64653aa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -22,6 +22,8 @@
     pass
 
 import pydantic
+import sqlalchemy as sa
+from great_expectations.core.util import convert_to_json_serializable
 from great_expectations.data_context import BaseDataContext
 from great_expectations.data_context.types.base import (
     DataContextConfig,
@@ -34,6 +36,7 @@ from great_expectations.profile.base import OrderedProfilerCardinality, ProfilerDataType
 from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfilerBase
 from sqlalchemy.engine import Connection, Engine
+from sqlalchemy.exc import ProgrammingError
 from typing_extensions import Concatenate, ParamSpec
 
 from datahub.configuration.common import AllowDenyPattern, ConfigModel
@@ -97,6 +100,59 @@ class GEProfilerRequest:
     batch_kwargs: dict
 
 
+def get_column_unique_count_patch(self, column):
+    if self.engine.dialect.name.lower() == "redshift":
+        element_values = self.engine.execute(
+            sa.select(
+                [sa.text(f"APPROXIMATE count(distinct {column})")]  # type:ignore
+            ).select_from(self._table)
+        )
+        return convert_to_json_serializable(element_values.fetchone()[0])
+    elif self.engine.dialect.name.lower() == "bigquery":
+        element_values = self.engine.execute(
+            sa.select(
+                [sa.text(f"APPROX_COUNT_DISTINCT ({column})")]  # type:ignore
+            ).select_from(self._table)
+        )
+        return convert_to_json_serializable(element_values.fetchone()[0])
+    elif self.engine.dialect.name.lower() == "snowflake":
+        element_values = self.engine.execute(
+            sa.select(
+                [sa.text(f"APPROX_COUNT_DISTINCT({column})")]  # type:ignore
+            ).select_from(self._table)
+        )
+        return convert_to_json_serializable(element_values.fetchone()[0])
+    return convert_to_json_serializable(
+        self.engine.execute(
+            sa.select([sa.func.count(sa.func.distinct(sa.column(column)))]).select_from(
+                self._table
+            )
+        ).scalar()
+    )
+
+
+def _get_column_quantiles_bigquery_patch(  # type:ignore
+    self, column: str, quantiles: Iterable
+) -> list:
+    quantile_queries = list()
+    for quantile in quantiles:
+        quantile_queries.append(
+            sa.text(f"approx_quantiles({column}, 100) OFFSET [{round(quantile * 100)}]")
+        )
+
+    quantiles_query = sa.select(quantile_queries).select_from(  # type:ignore
+        self._table
+    )
+    try:
+        quantiles_results = self.engine.execute(quantiles_query).fetchone()
+        return list(quantiles_results)
+
+    except ProgrammingError as pe:
+        # _treat_quantiles_exception raises a formatted exception, so there is no return value here
+        self._treat_quantiles_exception(pe)
+        return list()
+
+
 class GEProfilingConfig(ConfigModel):
     enabled: bool = False
     limit: Optional[int] = None
@@ -422,6 +478,7 @@ def _get_dataset_column_quantiles(
 
         res = self.dataset.expect_column_quantile_values_to_be_between(
             column,
+            allow_relative_error=True,
             quantile_ranges={
                 "quantiles": quantiles,
                 "value_ranges": [[None, None]] * len(quantiles),
@@ -532,15 +589,23 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
 
             if self.config.include_field_null_count and non_null_count is not None:
                 null_count = row_count - non_null_count
-                assert null_count >= 0
+                if null_count < 0:
+                    null_count = 0
+
                 column_profile.nullCount = null_count
                 if row_count > 0:
                     column_profile.nullProportion = null_count / row_count
+                    # Sometimes this value can be bigger than 1 because of the approximate queries
+                    if column_profile.nullProportion > 1:
+                        column_profile.nullProportion = 1
 
             if unique_count is not None:
                 column_profile.uniqueCount = unique_count
                 if non_null_count is not None and non_null_count > 0:
                     column_profile.uniqueProportion = unique_count / non_null_count
+                    # Sometimes this value can be bigger than 1 because of the approximate queries
+                    if column_profile.uniqueProportion > 1:
+                        column_profile.uniqueProportion = 1
 
             self._get_dataset_column_sample_values(column_profile, column)
 
@@ -704,27 +769,34 @@ def generate_profiles(
             logger.info(
                 f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
             )
+            with unittest.mock.patch(
+                "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count",
+                get_column_unique_count_patch,
+            ):
+                with unittest.mock.patch(
+                    "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery",
+                    _get_column_quantiles_bigquery_patch,
+                ):
+                    async_profiles = [
+                        async_executor.submit(
+                            self._generate_profile_from_request,
+                            query_combiner,
+                            request,
+                        )
+                        for request in requests
+                    ]
+
+                    # Avoid using as_completed so that the results are yielded in the
+                    # same order as the requests.
+                    # for async_profile in concurrent.futures.as_completed(async_profiles):
+                    for async_profile in async_profiles:
+                        yield async_profile.result()
+
+                    logger.info(
+                        f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
+                    )
 
-            async_profiles = [
-                async_executor.submit(
-                    self._generate_profile_from_request,
-                    query_combiner,
-                    request,
-                )
-                for request in requests
-            ]
-
-            # Avoid using as_completed so that the results are yielded in the
-            # same order as the requests.
-            # for async_profile in concurrent.futures.as_completed(async_profiles):
-            for async_profile in async_profiles:
-                yield async_profile.result()
-
-            logger.info(
-                f"Profiling {len(requests)} table(s) finished in {(timer.elapsed_seconds()):.3f} seconds"
-            )
-
-        self.report.report_from_query_combiner(query_combiner.report)
+            self.report.report_from_query_combiner(query_combiner.report)
 
     def _generate_profile_from_request(
         self,
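
For reference (not part of the patch above): the dialect branches in
get_column_unique_count_patch replace the exact COUNT(DISTINCT ...) with an
engine-native approximation. A minimal sketch of the query each branch builds,
written with the same SQLAlchemy 1.x list-style select the patch uses; the
table and column names ("orders", "customer_id") are placeholders, not taken
from the PR:

    import sqlalchemy as sa

    def approx_distinct_count_query(dialect: str, table: str, column: str):
        """Build the distinct-count SELECT per dialect, mirroring the patched branches."""
        approx_exprs = {
            "redshift": f"APPROXIMATE count(distinct {column})",
            "bigquery": f"APPROX_COUNT_DISTINCT({column})",
            "snowflake": f"APPROX_COUNT_DISTINCT({column})",
        }
        if dialect in approx_exprs:
            # Approximate form: cheap and engine-native, but slightly inexact.
            return sa.select([sa.text(approx_exprs[dialect])]).select_from(sa.table(table))
        # Any other dialect keeps the exact, potentially slow, form.
        return sa.select(
            [sa.func.count(sa.func.distinct(sa.column(column)))]
        ).select_from(sa.table(table))

    print(approx_distinct_count_query("redshift", "orders", "customer_id"))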
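The proportion clamping added in generate_dataset_profile exists because an
approximate distinct count can slightly overestimate, so unique_count may end
up larger than non_null_count and the derived proportion may exceed 1. A tiny
illustrative helper, not taken from the PR, showing the same guard:

    def bounded_proportion(numerator: int, denominator: int) -> float:
        """Return numerator / denominator capped at 1.0, as the profiler now caps
        nullProportion and uniqueProportion."""
        proportion = numerator / denominator
        return 1.0 if proportion > 1 else proportion

    assert bounded_proportion(103, 100) == 1.0  # approximate count ran slightly high
    assert bounded_proportion(40, 100) == 0.4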
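The generate_profiles change relies on unittest.mock.patch used as a context
manager: it swaps Great Expectations' SqlAlchemyDataset methods for the patched
functions only while profiling runs and restores the originals afterwards. A
self-contained sketch of that pattern with toy names (Dataset and the two count
functions are illustrative, not DataHub or GE classes):

    import unittest.mock

    class Dataset:
        def get_column_unique_count(self, column: str) -> int:
            return 0  # stand-in for the exact, expensive COUNT(DISTINCT ...) path

    def get_column_unique_count_fast(self, column: str) -> int:
        return 42  # stand-in for the approximate query path

    with unittest.mock.patch(
        "__main__.Dataset.get_column_unique_count", get_column_unique_count_fast
    ):
        # Inside the with-block, every Dataset instance calls the replacement.
        assert Dataset().get_column_unique_count("customer_id") == 42
    # Outside the block the original method is restored automatically.
    assert Dataset().get_column_unique_count("customer_id") == 0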