From 738ff6201c2053aaa55e3cc2970ba6ff3f80bd93 Mon Sep 17 00:00:00 2001
From: Harm Weites
Date: Wed, 9 Mar 2022 18:46:55 +0100
Subject: [PATCH] feat: Delta table partition watermarks (#1694)

* Install pyspark for dev work

So now we can run pytest on a fresh clone. Due to the rather old version this
will throw some DeprecationWarning messages, but we can upgrade to 3.1 at a
later stage.

Signed-off-by: Harm Weites

* Read watermarks for Delta tables

Signed-off-by: Harm Weites

* Include tests

Signed-off-by: Harm Weites

* More proper watermark yielding

Signed-off-by: Harm Weites

* Select the partition_column

Going with the first item of the returned list will return the same column,
which is not deterministic at all (given there are multiple partitions).

Signed-off-by: Harm Weites

* Cut the line length

Signed-off-by: Harm Weites

* Only process partitions of a workable type

Since watermarking strings doesn't make much sense, keep to checking
integer/float/date/datetime types.

Signed-off-by: Harm Weites

* Updated tests

Signed-off-by: Harm Weites

* Oops, the .first() returns a Row object

Signed-off-by: Harm Weites

* Wrap this extraction in a try/except

There are scenarios where a dataset exists, but is empty. In this case
.first() will fail.

Signed-off-by: Harm Weites

* Flake8 fixes

Signed-off-by: Harm Weites

* Simplicity

Signed-off-by: Harm Weites

* Revert "Simplicity"

This reverts commit 06b9fc39f3da4546321679a66f1724e3e9756932.

Working with this as part of job.launch() brings errors, where the original
code would bring the desired result.

Signed-off-by: Harm Weites

* Simplicity in return typing

Signed-off-by: Harm Weites

* There is no complexity here :jedi_hand_wave:

Signed-off-by: Harm Weites

* Pass the mypy

Signed-off-by: Harm Weites

* Fix the return type here, finally

Signed-off-by: Harm Weites

* Fix import sorting order

Signed-off-by: Harm Weites
---
 .../delta_lake_metadata_extractor.py      | 117 +++++++++++++++++-
 .../extractor/test_deltalake_extractor.py | 103 ++++++++++++++-
 requirements-dev.txt                      |   1 +
 3 files changed, 214 insertions(+), 7 deletions(-)

diff --git a/databuilder/databuilder/extractor/delta_lake_metadata_extractor.py b/databuilder/databuilder/extractor/delta_lake_metadata_extractor.py
index 3dec6cbb2..93399b2a1 100644
--- a/databuilder/databuilder/extractor/delta_lake_metadata_extractor.py
+++ b/databuilder/databuilder/extractor/delta_lake_metadata_extractor.py
@@ -6,7 +6,7 @@
 from collections import namedtuple
 from datetime import datetime
 from typing import (  # noqa: F401
-    Any, Dict, Iterator, List, Optional, Union,
+    Any, Dict, Iterator, List, Optional, Tuple, Union,
 )
 
 from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
@@ -21,6 +21,7 @@
 from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
 from databuilder.models.table_last_updated import TableLastUpdated
 from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
+from databuilder.models.watermark import Watermark
 
 TableKey = namedtuple('TableKey', ['schema', 'table_name'])
 
@@ -167,7 +168,7 @@ def init(self, conf: ConfigTree) -> None:
     def set_spark(self, spark: SparkSession) -> None:
         self.spark = spark
 
-    def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
+    def extract(self) -> Union[TableMetadata, List[Tuple[Watermark, Watermark]], TableLastUpdated, None]:
         if not self._extract_iter:
             self._extract_iter = self._get_extract_iter()
         try:
@@ -178,12 +179,13 @@ def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
     def get_scope(self) -> str:
         return 'extractor.delta_lake_table_metadata'
 
-    def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
+    def _get_extract_iter(self) -> Iterator[Union[TableMetadata, Watermark, TableLastUpdated,
+                                                  None]]:
         """
         Given either a list of schemas, or a list of exclude schemas,
         it will query hive metastore and then access delta log
         to get all of the metadata for your delta tables. It will produce:
-         - table and column metadata
+         - table and column metadata (including partition watermarks)
         - last updated information
         """
         if self.schema_list:
@@ -196,7 +198,6 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, N
         LOGGER.info("working on %s", schemas)
         tables = self.get_all_tables(schemas)
         # TODO add the programmatic information as well?
-        # TODO add watermarks
         scraped_tables = self.scrape_all_tables(tables)
         for scraped_table in scraped_tables:
             if not scraped_table:
@@ -206,6 +207,11 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, N
                 continue
             else:
                 yield self.create_table_metadata(scraped_table)
+                watermarks = self.create_table_watermarks(scraped_table)
+                if watermarks:
+                    for watermark in watermarks:
+                        yield watermark[0]
+                        yield watermark[1]
             last_updated = self.create_table_last_updated(scraped_table)
             if last_updated:
                 yield last_updated
@@ -425,3 +431,104 @@ def is_array_type(self, delta_type: Any) -> bool:
 
     def is_map_type(self, delta_type: Any) -> bool:
         return isinstance(delta_type, MapType)
+
+    def create_table_watermarks(self, table: ScrapedTableMetadata) -> Optional[List[Tuple[Watermark, Watermark]]]:  # noqa c901
+        """
+        Creates the watermark objects that reflect the highest and lowest values in the partition columns
+        """
+        def _is_show_partitions_supported(t: ScrapedTableMetadata) -> bool:
+            try:
+                self.spark.sql(f'show partitions {t.schema}.{t.table}')
+                return True
+            except Exception as e:
+                # pyspark.sql.utils.AnalysisException: SHOW PARTITIONS is not allowed on a table that is not partitioned
+                LOGGER.warning(e)
+                return False
+
+        def _fetch_minmax(table: ScrapedTableMetadata, partition_column: str) -> Tuple[str, str]:
+            LOGGER.info(f'Fetching partition info for {partition_column} in {table.schema}.{table.table}')
+            min_water = ""
+            max_water = ""
+            try:
+                if is_show_partitions_supported:
+                    LOGGER.info('Using SHOW PARTITION')
+                    min_water = str(
+                        self
+                        .spark
+                        .sql(f'show partitions {table.schema}.{table.table}')
+                        .orderBy(partition_column, ascending=True)
+                        .first()[partition_column])
+                    max_water = str(
+                        self
+                        .spark
+                        .sql(f'show partitions {table.schema}.{table.table}')
+                        .orderBy(partition_column, ascending=False)
+                        .first()[partition_column])
+                else:
+                    LOGGER.info('Using DESCRIBE EXTENDED')
+                    part_info = (self
+                                 .spark
+                                 .sql(f'describe extended {table.schema}.{table.table} {partition_column}')
+                                 .collect()
+                                 )
+                    minmax = {}
+                    for mm in list(filter(lambda x: x['info_name'] in ['min', 'max'], part_info)):
+                        minmax[mm['info_name']] = mm['info_value']
+                    min_water = minmax['min']
+                    max_water = minmax['max']
+            except Exception as e:
+                LOGGER.warning(f'Failed fetching partition watermarks: {e}')
+            return max_water, min_water
+
+        if not table.table_detail:
+            LOGGER.info(f'No table details found in {table}, skipping')
+            return None
+
+        if 'partitionColumns' not in table.table_detail or len(table.table_detail['partitionColumns']) < 1:
+            LOGGER.info(f'No partitions found in {table}, skipping')
+            return None
+
+        is_show_partitions_supported: bool = _is_show_partitions_supported(table)
+
+        if not is_show_partitions_supported:
+            LOGGER.info('Analyzing table, this can take a while...')
+            partition_columns = ','.join(table.table_detail['partitionColumns'])
+            self.spark.sql(
+                f"analyze table {table.schema}.{table.table} compute statistics for columns {partition_columns}")
+
+        # It makes little sense to get watermarks from a string value, with no concept of high and low.
+        # Just imagine a dataset with a partition by country...
+        valid_types = ['int', 'float', 'date', 'datetime']
+        if table.columns:
+            _table_columns = table.columns
+        else:
+            _table_columns = []
+        columns_with_valid_type = list(map(lambda l: l.name,
+                                           filter(lambda l: str(l.data_type).lower() in valid_types, _table_columns)
+                                           )
+                                       )
+
+        r = []
+        for partition_column in table.table_detail['partitionColumns']:
+            if partition_column not in columns_with_valid_type:
+                continue
+
+            last, first = _fetch_minmax(table, partition_column)
+            low = Watermark(
+                create_time=table.table_detail['createdAt'],
+                database=self._db,
+                schema=table.schema,
+                table_name=table.table,
+                part_name=f'{partition_column}={first}',
+                part_type='low_watermark',
+                cluster=self._cluster)
+            high = Watermark(
+                create_time=table.table_detail['createdAt'],
+                database=self._db,
+                schema=table.schema,
+                table_name=table.table,
+                part_name=f'{partition_column}={last}',
+                part_type='high_watermark',
+                cluster=self._cluster)
+            r.append((high, low))
+        return r
diff --git a/databuilder/tests/unit/extractor/test_deltalake_extractor.py b/databuilder/tests/unit/extractor/test_deltalake_extractor.py
index 28ec4847f..a8eaa56af 100644
--- a/databuilder/tests/unit/extractor/test_deltalake_extractor.py
+++ b/databuilder/tests/unit/extractor/test_deltalake_extractor.py
@@ -17,6 +17,7 @@
 )
 from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
 from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
+from databuilder.models.watermark import Watermark
 
 
 class TestDeltaLakeExtractor(unittest.TestCase):
@@ -57,6 +58,18 @@ def setUpSchemas(self) -> None:
         # TODO do we even need to support views and none delta tables in this case?
         self.spark.sql("create view if not exists test_schema2.test_view1 as (select * from test_schema2.test_table2)")
+        self.spark.sql("create table if not exists "
+                       "test_schema2.watermarks_single_partition (date date, value float) using delta partitioned by"
+                       "(date)")
+        self.spark.sql("insert into test_schema2.watermarks_single_partition values "
+                       "('2020-12-03', 1337), ('2020-12-02', 42), ('2020-12-01', 42), ('2020-12-05', 42),"
+                       "('2020-12-04', 42)")
+        self.spark.sql("create table if not exists "
+                       "test_schema2.watermarks_multi_partition (date date, spec int, value float) using delta "
+                       "partitioned by (date, spec)")
+        self.spark.sql("insert into test_schema2.watermarks_multi_partition values "
+                       "('2020-12-03', 1, 1337), ('2020-12-02', 2, 42), ('2020-12-01', 2, 42), ('2020-12-05', 3, 42),"
+                       "('2020-12-04', 1, 42)")
         # Nested/Complex schemas
         self.spark.sql("create schema if not exists complex_schema")
         self.spark.sql("create table if not exists complex_schema.struct_table (a int, struct_col struct None:
         while data is not None:
             ret.append(data)
             data = self.dExtractor.extract()
-        self.assertEqual(len(ret), 30)
+        self.assertEqual(len(ret), 40)
 
     def test_extract_with_only_specific_schemas(self) -> None:
         self.config_dict = {
@@ -222,7 +235,7 @@ def test_extract_with_only_specific_schemas(self) -> None:
         while data is not None:
             ret.append(data)
             data = self.dExtractor.extract()
-        self.assertEqual(len(ret), 2)
+        self.assertEqual(len(ret), 12)
 
     def test_extract_when_excluding(self) -> None:
         self.config_dict = {
@@ -350,6 +363,92 @@ def test_scrape_complex_schema_columns(self) -> None:
         self.assertEqual(len(expected), len(actual), f"{table_name} failed")
         self.assertListEqual(expected, actual, f"{table_name} failed")
 
+    def test_create_table_watermarks_single_partition(self) -> None:
+        scraped_table = self.dExtractor.scrape_table(Table("watermarks_single_partition", "test_schema2", None, "delta",
+                                                            False))
+        self.assertIsNotNone(scraped_table)
+        if scraped_table:
+            found = self.dExtractor.create_table_watermarks(scraped_table)
+            self.assertIsNotNone(found)
+            if found:
+                self.assertEqual(1, len(found))
+                self.assertEqual(2, len(found[0]))
+                create_time = found[0][0].create_time
+                expected = [(
+                    Watermark(
+                        create_time=create_time,
+                        database='test_database',
+                        schema='test_schema2',
+                        table_name='watermarks_single_partition',
+                        part_name='date=2020-12-05',
+                        part_type='high_watermark',
+                        cluster='test_cluster'),
+                    Watermark(
+                        create_time=create_time,
+                        database='test_database',
+                        schema='test_schema2',
+                        table_name='watermarks_single_partition',
+                        part_name='date=2020-12-01',
+                        part_type='low_watermark',
+                        cluster='test_cluster')
+                )]
+                self.assertEqual(str(expected), str(found))
+
+    def test_create_table_watermarks_multi_partition(self) -> None:
+        scraped_table = self.dExtractor.scrape_table(Table("watermarks_multi_partition", "test_schema2", None, "delta",
+                                                            False))
+        self.assertIsNotNone(scraped_table)
+        if scraped_table:
+            found = self.dExtractor.create_table_watermarks(scraped_table)
+            self.assertIsNotNone(found)
+            if found:
+                self.assertEqual(2, len(found))
+                self.assertEqual(2, len(found[0]))
+                create_time = found[0][0].create_time
+                expected = [(
+                    Watermark(
+                        create_time=create_time,
+                        database='test_database',
+                        schema='test_schema2',
+                        table_name='watermarks_multi_partition',
+                        part_name='date=2020-12-05',
+                        part_type='high_watermark',
+                        cluster='test_cluster'),
+                    Watermark(
+                        create_time=create_time,
+                        database='test_database',
+                        schema='test_schema2',
+                        table_name='watermarks_multi_partition',
+                        part_name='date=2020-12-01',
+                        part_type='low_watermark',
+                        cluster='test_cluster')
+                ), (
+                    Watermark(
+                        create_time=create_time,
+                        database='test_database',
+                        schema='test_schema2',
+                        table_name='watermarks_multi_partition',
+                        part_name='spec=3',
+                        part_type='high_watermark',
+                        cluster='test_cluster'),
+                    Watermark(
+                        create_time=create_time,
+                        database='test_database',
+                        schema='test_schema2',
+                        table_name='watermarks_multi_partition',
+                        part_name='spec=1',
+                        part_type='low_watermark',
+                        cluster='test_cluster')
+                )]
+                self.assertEqual(str(expected), str(found))
+
+    def test_create_table_watermarks_without_partition(self) -> None:
+        scraped_table = self.dExtractor.scrape_table(Table("test_table1", "test_schema1", None, "delta", False))
+        self.assertIsNotNone(scraped_table)
+        if scraped_table:
+            found = self.dExtractor.create_table_watermarks(scraped_table)
+            self.assertIsNone(found)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/requirements-dev.txt b/requirements-dev.txt
index fa23e2768..25a97fc7a 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -13,3 +13,4 @@ pytest-cov>=2.12.0
 pytest-env>=0.6.2
 pytest-mock>=3.6.1
 typed-ast>=1.4.3
+pyspark==3.0.1
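
For context (illustration only, not part of the patch): the core of the SHOW PARTITIONS path in
create_table_watermarks above is ordering the partition listing by the partition column and taking the
first row from each end. The minimal sketch below shows that idea in isolation; the SparkSession setup and
the `demo.events` table (assumed to be a Delta table partitioned by a `date` column) are hypothetical
placeholders, and the error handling and DESCRIBE EXTENDED fallback from the real method are omitted.

    # Minimal sketch with assumed names: derive low/high partition values for a
    # Delta table partitioned by `date`, mirroring the SHOW PARTITIONS approach.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # assumes a Delta-enabled session

    partition_column = "date"
    # For a partitioned Delta table, SHOW PARTITIONS returns one column per
    # partition key, so sorting by that column and taking the first row from
    # each direction yields the low and high watermark values.
    partitions = spark.sql("show partitions demo.events")
    low = partitions.orderBy(partition_column, ascending=True).first()[partition_column]
    high = partitions.orderBy(partition_column, ascending=False).first()[partition_column]
    print(f"low_watermark: {partition_column}={low}")
    print(f"high_watermark: {partition_column}={high}")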