From 796a00289f5a219e7bcbc88ce675e2296a7bef7a Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 17 Apr 2024 16:53:31 -0700
Subject: [PATCH] fix(ingest/profiling): fix bug in profiler for samples

This bug caused incorrect numbers for the null percentages and distinct
percentages. Because of some subtle caching, we would use the full
table's row count instead of the sample's row count.
---
 .../ingestion/source/ge_data_profiler.py      | 34 +++++++++++++------
 .../source/sql/sql_generic_profiler.py        |  2 +-
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index d31b65f003426..c04f2f8e5a931 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -689,9 +689,28 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()
 
+        assert profile.rowCount is not None
+        full_row_count = profile.rowCount
+
         if self.config.use_sampling and not self.config.limit:
             self.update_dataset_batch_use_sampling(profile)
 
+        # Note that this row count may differ from full_row_count if we are using sampling.
+        row_count: int = profile.rowCount
+        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
+            # Query the exact row count of the sample. We do not use
+            # `self.config.sample_size` directly because the actual row count in
+            # the sample may differ from the configured `sample_size`. For BigQuery,
+            # we've even seen 160k rows returned for a sample size of 10k.
+            logger.debug("Recomputing row count for the sample")
+
+            # Note that we can't just call `self._get_dataset_rows(profile)` here because
+            # there's some sort of caching happening that will return the full table row count
+            # instead of the sample row count.
+            row_count = self.dataset.get_row_count(str(self.dataset._table))
+
+            profile.partitionSpec.partition += f" (sample rows {row_count})"
+
         columns_profiling_queue: List[_SingleColumnSpec] = []
         if columns_to_profile:
             for column in all_columns:
@@ -708,16 +727,6 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 2 queries")
         self.query_combiner.flush()
 
-        assert profile.rowCount is not None
-        row_count: int  # used for null counts calculation
-        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
-            # Querying exact row count of sample using `_get_dataset_rows`.
-            # We are not using `self.config.sample_size` directly as actual row count
-            # in sample may be slightly different (more or less) than configured `sample_size`.
-            self._get_dataset_rows(profile)
-
-            row_count = profile.rowCount
-
         for column_spec in columns_profiling_queue:
             column = column_spec.column
             column_profile = column_spec.column_profile
@@ -825,6 +834,10 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
         self.query_combiner.flush()
+
+        # Reset the row count to the original full-table value.
+        profile.rowCount = full_row_count
+
         return profile
 
     def init_profile(self):
@@ -1274,6 +1287,7 @@ def create_bigquery_temp_table(
     try:
         cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
         try:
+            logger.debug(f"Creating temporary table for {table_pretty_name}: {bq_sql}")
             cursor.execute(bq_sql)
         except Exception as e:
             if not instance.config.catch_exceptions:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index 4708836d3d259..365539df7a83b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -159,7 +159,7 @@ def get_profile_request(
                 rows_count=table.rows_count,
             ):
                 logger.debug(
-                    f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit"
+                    f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes, or row count limits"
                 )
                 # Profile only table level if dataset is filtered from profiling
                 # due to size limits alone
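
Reviewer note: a minimal sketch of the arithmetic behind this bug, under the
assumption that the column percentages are computed as ratios over
`profile.rowCount` (the names and numbers below are hypothetical, not DataHub
code). Dividing a sample's null count by the cached full-table row count
drastically understates the null percentage:

# Hypothetical illustration of the bug fixed above; these names and
# counts are invented for this sketch and are not part of the codebase.
FULL_TABLE_ROWS = 1_000_000  # rowCount cached from the full table
SAMPLE_ROWS = 10_000         # actual rows returned by the sample query
SAMPLE_NULLS = 2_500         # nulls observed *within the sample*

def null_fraction(null_count: int, row_count: int) -> float:
    # The null percentage is a ratio with the row count as denominator.
    return null_count / row_count

# Before the fix: the sample's null count was divided by the cached
# full-table row count, understating the percentage 100x here.
buggy = null_fraction(SAMPLE_NULLS, FULL_TABLE_ROWS)  # 0.0025 -> 0.25%

# After the fix: the denominator is the sample's own recomputed row
# count, giving the intended estimate.
fixed = null_fraction(SAMPLE_NULLS, SAMPLE_ROWS)      # 0.25 -> 25%

print(f"buggy: {buggy:.2%}, fixed: {fixed:.2%}")

The same reasoning explains why the patch restores `profile.rowCount` to
`full_row_count` before returning: the sample row count is only the right
denominator for the column statistics, while the reported table-level row
count should still describe the full table.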