diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index d31b65f003426..c04f2f8e5a931 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -689,9 +689,28 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()
 
+        assert profile.rowCount is not None
+        full_row_count = profile.rowCount
+
         if self.config.use_sampling and not self.config.limit:
             self.update_dataset_batch_use_sampling(profile)
 
+        # Note that this row count may be different from the full_row_count if we are using sampling.
+        row_count: int = profile.rowCount
+        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
+            # Query the exact row count of the sample.
+            # We are not using `self.config.sample_size` directly as the actual row count
+            # in the sample may be different than the configured `sample_size`. For BigQuery,
+            # we've even seen 160k rows returned for a sample size of 10k.
+            logger.debug("Recomputing row count for the sample")
+
+            # Note that we can't just call `self._get_dataset_rows(profile)` here because
+            # there's some sort of caching happening that will return the full table row count
+            # instead of the sample row count.
+            row_count = self.dataset.get_row_count(str(self.dataset._table))
+
+            profile.partitionSpec.partition += f" (sample rows {row_count})"
+
         columns_profiling_queue: List[_SingleColumnSpec] = []
         if columns_to_profile:
             for column in all_columns:
@@ -708,16 +727,6 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 2 queries")
         self.query_combiner.flush()
 
-        assert profile.rowCount is not None
-        row_count: int  # used for null counts calculation
-        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
-            # Querying exact row count of sample using `_get_dataset_rows`.
-            # We are not using `self.config.sample_size` directly as actual row count
-            # in sample may be slightly different (more or less) than configured `sample_size`.
-            self._get_dataset_rows(profile)
-
-        row_count = profile.rowCount
-
         for column_spec in columns_profiling_queue:
             column = column_spec.column
             column_profile = column_spec.column_profile
@@ -825,6 +834,10 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
         self.query_combiner.flush()
 
+
+        # Reset the row count to the original (full table) value.
+        profile.rowCount = full_row_count
+
         return profile
 
     def init_profile(self):
@@ -1274,6 +1287,7 @@ def create_bigquery_temp_table(
     try:
         cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
         try:
+            logger.debug(f"Creating temporary table for {table_pretty_name}: {bq_sql}")
            cursor.execute(bq_sql)
         except Exception as e:
             if not instance.config.catch_exceptions:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index 4708836d3d259..365539df7a83b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -159,7 +159,7 @@ def get_profile_request(
             rows_count=table.rows_count,
         ):
             logger.debug(
-                f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit"
+                f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes or count of rows limit"
             )
             # Profile only table level if dataset is filtered from profiling
             # due to size limits alone
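
The bookkeeping in the patch boils down to this: column statistics are computed over the sample, so any proportion derived from them (for example a null fraction) must be divided by the sample's row count, while the profile's reported rowCount is restored to the full table count before the profile is returned. Below is a minimal, self-contained sketch of that idea; SketchProfile, profile_with_sample, and the example numbers are illustrative assumptions, not DataHub's actual classes or API.

from dataclasses import dataclass, field
from typing import Dict


@dataclass
class SketchProfile:
    # Reported row count: always the full table size, never the sample size.
    row_count: int = 0
    null_proportions: Dict[str, float] = field(default_factory=dict)


def profile_with_sample(
    full_row_count: int,
    sample_row_count: int,
    sample_null_counts: Dict[str, int],
) -> SketchProfile:
    profile = SketchProfile(row_count=full_row_count)
    for column, null_count in sample_null_counts.items():
        # Null counts were measured on the sample, so divide by the sample's
        # row count; dividing by full_row_count would understate the fraction.
        profile.null_proportions[column] = null_count / sample_row_count
    return profile


if __name__ == "__main__":
    # A 10k-row sample of a 1M-row table where 2,500 sampled rows have a NULL email.
    p = profile_with_sample(1_000_000, 10_000, {"email": 2_500})
    assert p.row_count == 1_000_000
    assert p.null_proportions["email"] == 0.25

In the diff itself this corresponds to recomputing row_count from the sampled temp table (rather than trusting config.sample_size), using that value for the per-column calculations, and then resetting profile.rowCount to full_row_count at the end of generate_dataset_profile.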