feat: Delta table partition watermarks (amundsen-io#1694)
* Install pyspark for dev work

So now we can run pytest on a fresh clone.

Due to the rather old version, this will throw some DeprecationWarning
messages, but we can upgrade to 3.1 at a later stage.

Signed-off-by: Harm Weites <[email protected]>

* Read watermarks for Delta tables

Signed-off-by: Harm Weites <[email protected]>

* Include tests

Signed-off-by: Harm Weites <[email protected]>

* More proper watermark yielding

Signed-off-by: Harm Weites <[email protected]>

* Select the partition_column

Going with the first item of the returned row always yields the same
(first) column rather than the one being processed, which is not
deterministic at all (given there are multiple partition columns).
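
A minimal illustration of that pitfall, using a hypothetical Row such as
SHOW PARTITIONS might return for a table partitioned by (date, spec):

from pyspark.sql import Row

# Hypothetical partition row for a table partitioned by (date, spec)
row = Row(date='2020-12-05', spec=3)

print(row[0])       # positional: always the first partition column, '2020-12-05'
print(row['spec'])  # by name: the column actually being processed, 3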

Signed-off-by: Harm Weites <[email protected]>

* Cut the line length

Signed-off-by: Harm Weites <[email protected]>

* Only process partitions of a workable type

Since watermarking strings doesn't make much sense, stick to checking
integer/float/date/datetime types.
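
A sketch of that filter; the Column namedtuple here is a stand-in for the
extractor's ColumnMetadata:

from collections import namedtuple

# Stand-in for the extractor's ColumnMetadata
Column = namedtuple('Column', ['name', 'data_type'])

columns = [Column('date', 'date'), Column('country', 'string'), Column('spec', 'int')]
valid_types = ['int', 'float', 'date', 'datetime']

watermarkable = [c.name for c in columns if str(c.data_type).lower() in valid_types]
print(watermarkable)  # ['date', 'spec'] -- the string-typed 'country' is skipped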

Signed-off-by: Harm Weites <[email protected]>

* Updated tests

Signed-off-by: Harm Weites <[email protected]>

* Oops, the .first() returns a Row object

Signed-off-by: Harm Weites <[email protected]>

* Wrap this extraction in a try/except

There are scenarios where a dataset exists but is empty. In this case
.first() returns None, so indexing the result fails.
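
A minimal reproduction of that failure mode, assuming a local SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()

# The dataset exists but holds no rows: .first() returns None, and
# indexing None raises TypeError, hence the try/except in the extractor.
empty = spark.createDataFrame([], 'date date')
row = empty.first()
try:
    print(row['date'])
except TypeError:
    print('empty dataset, skipping watermarks')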

Signed-off-by: Harm Weites <[email protected]>

* Flake8 fixes

Signed-off-by: Harm Weites <[email protected]>

* Simplicity

Signed-off-by: Harm Weites <[email protected]>

* Revert "Simplicity"

This reverts commit 06b9fc3.

Working with this as part of job.launch() raises errors, whereas the
original code produces the desired result.

Signed-off-by: Harm Weites <[email protected]>

* Simplicity in return typing

Signed-off-by: Harm Weites <[email protected]>

* There is no complexity here :jedi_hand_wave:

Signed-off-by: Harm Weites <[email protected]>

* Pass the mypy

Signed-off-by: Harm Weites <[email protected]>

* Fix the return type here, finally

Signed-off-by: Harm Weites <[email protected]>

* Fix import sorting order

Signed-off-by: Harm Weites <[email protected]>
Signed-off-by: Zachary Ruiz <[email protected]>
harmw authored and Zachary Ruiz committed May 3, 2022
1 parent 2379b22 commit c36c4a2
Showing 3 changed files with 214 additions and 7 deletions.
117 changes: 112 additions & 5 deletions databuilder/databuilder/extractor/delta_lake_metadata_extractor.py
@@ -6,7 +6,7 @@
from collections import namedtuple
from datetime import datetime
from typing import ( # noqa: F401
-    Any, Dict, Iterator, List, Optional, Union,
+    Any, Dict, Iterator, List, Optional, Tuple, Union,
)

from pyhocon import ConfigFactory, ConfigTree # noqa: F401
@@ -21,6 +21,7 @@
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.models.table_last_updated import TableLastUpdated
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
from databuilder.models.watermark import Watermark

TableKey = namedtuple('TableKey', ['schema', 'table_name'])

@@ -167,7 +168,7 @@ def init(self, conf: ConfigTree) -> None:
def set_spark(self, spark: SparkSession) -> None:
self.spark = spark

-    def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
+    def extract(self) -> Union[TableMetadata, List[Tuple[Watermark, Watermark]], TableLastUpdated, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
@@ -178,12 +179,13 @@ def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
def get_scope(self) -> str:
return 'extractor.delta_lake_table_metadata'

-    def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
+    def _get_extract_iter(self) -> Iterator[Union[TableMetadata, Watermark, TableLastUpdated,
+                                                  None]]:
"""
Given either a list of schemas, or a list of exclude schemas,
it will query hive metastore and then access delta log
to get all of the metadata for your delta tables. It will produce:
-        - table and column metadata
+        - table and column metadata (including partition watermarks)
- last updated information
"""
if self.schema_list:
@@ -196,7 +198,6 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
LOGGER.info("working on %s", schemas)
tables = self.get_all_tables(schemas)
# TODO add the programmatic information as well?
-        # TODO add watermarks
scraped_tables = self.scrape_all_tables(tables)
for scraped_table in scraped_tables:
if not scraped_table:
@@ -206,6 +207,11 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
continue
else:
yield self.create_table_metadata(scraped_table)
watermarks = self.create_table_watermarks(scraped_table)
if watermarks:
for watermark in watermarks:
yield watermark[0]
yield watermark[1]
last_updated = self.create_table_last_updated(scraped_table)
if last_updated:
yield last_updated
@@ -425,3 +431,104 @@ def is_array_type(self, delta_type: Any) -> bool:

def is_map_type(self, delta_type: Any) -> bool:
return isinstance(delta_type, MapType)

def create_table_watermarks(self, table: ScrapedTableMetadata) -> Optional[List[Tuple[Watermark, Watermark]]]: # noqa c901
"""
Creates the watermark objects that reflect the highest and lowest values in the partition columns
"""
def _is_show_partitions_supported(t: ScrapedTableMetadata) -> bool:
try:
self.spark.sql(f'show partitions {t.schema}.{t.table}')
return True
except Exception as e:
# pyspark.sql.utils.AnalysisException: SHOW PARTITIONS is not allowed on a table that is not partitioned
LOGGER.warning(e)
return False

def _fetch_minmax(table: ScrapedTableMetadata, partition_column: str) -> Tuple[str, str]:
LOGGER.info(f'Fetching partition info for {partition_column} in {table.schema}.{table.table}')
min_water = ""
max_water = ""
try:
if is_show_partitions_supported:
LOGGER.info('Using SHOW PARTITION')
min_water = str(
self
.spark
.sql(f'show partitions {table.schema}.{table.table}')
.orderBy(partition_column, ascending=True)
.first()[partition_column])
max_water = str(
self
.spark
.sql(f'show partitions {table.schema}.{table.table}')
.orderBy(partition_column, ascending=False)
.first()[partition_column])
else:
LOGGER.info('Using DESCRIBE EXTENDED')
part_info = (self
.spark
.sql(f'describe extended {table.schema}.{table.table} {partition_column}')
.collect()
)
minmax = {}
for mm in list(filter(lambda x: x['info_name'] in ['min', 'max'], part_info)):
minmax[mm['info_name']] = mm['info_value']
min_water = minmax['min']
max_water = minmax['max']
except Exception as e:
LOGGER.warning(f'Failed fetching partition watermarks: {e}')
return max_water, min_water

if not table.table_detail:
LOGGER.info(f'No table details found in {table}, skipping')
return None

if 'partitionColumns' not in table.table_detail or len(table.table_detail['partitionColumns']) < 1:
LOGGER.info(f'No partitions found in {table}, skipping')
return None

is_show_partitions_supported: bool = _is_show_partitions_supported(table)

if not is_show_partitions_supported:
LOGGER.info('Analyzing table, this can take a while...')
partition_columns = ','.join(table.table_detail['partitionColumns'])
self.spark.sql(
f"analyze table {table.schema}.{table.table} compute statistics for columns {partition_columns}")

# It makes little sense to get watermarks from a string value, with no concept of high and low.
# Just imagine a dataset with a partition by country...
valid_types = ['int', 'float', 'date', 'datetime']
if table.columns:
_table_columns = table.columns
else:
_table_columns = []
columns_with_valid_type = list(map(lambda l: l.name,
filter(lambda l: str(l.data_type).lower() in valid_types, _table_columns)
)
)

r = []
for partition_column in table.table_detail['partitionColumns']:
if partition_column not in columns_with_valid_type:
continue

last, first = _fetch_minmax(table, partition_column)
low = Watermark(
create_time=table.table_detail['createdAt'],
database=self._db,
schema=table.schema,
table_name=table.table,
part_name=f'{partition_column}={first}',
part_type='low_watermark',
cluster=self._cluster)
high = Watermark(
create_time=table.table_detail['createdAt'],
database=self._db,
schema=table.schema,
table_name=table.table,
part_name=f'{partition_column}={last}',
part_type='high_watermark',
cluster=self._cluster)
r.append((high, low))
return r
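
For orientation, a hedged consumer sketch mirroring the extract() loop the
unit tests below use (the ConfigTree contents are elided; spark is assumed
to be an already-configured SparkSession):

extractor = DeltaLakeMetadataExtractor()
extractor.init(conf)        # ConfigTree with cluster/database/schema settings (elided)
extractor.set_spark(spark)  # inject a SparkSession, as the unit tests do

record = extractor.extract()
while record is not None:
    # each call returns one TableMetadata, Watermark, or TableLastUpdated record
    record = extractor.extract()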
103 changes: 101 additions & 2 deletions databuilder/tests/unit/extractor/test_deltalake_extractor.py
@@ -17,6 +17,7 @@
)
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
from databuilder.models.watermark import Watermark


class TestDeltaLakeExtractor(unittest.TestCase):
@@ -57,6 +58,18 @@ def setUpSchemas(self) -> None:
# TODO do we even need to support views and none delta tables in this case?
self.spark.sql("create view if not exists test_schema2.test_view1 as (select * from test_schema2.test_table2)")

self.spark.sql("create table if not exists "
"test_schema2.watermarks_single_partition (date date, value float) using delta partitioned by"
"(date)")
self.spark.sql("insert into test_schema2.watermarks_single_partition values "
"('2020-12-03', 1337), ('2020-12-02', 42), ('2020-12-01', 42), ('2020-12-05', 42),"
"('2020-12-04', 42)")
self.spark.sql("create table if not exists "
"test_schema2.watermarks_multi_partition (date date, spec int, value float) using delta "
"partitioned by (date, spec)")
self.spark.sql("insert into test_schema2.watermarks_multi_partition values "
"('2020-12-03', 1, 1337), ('2020-12-02', 2, 42), ('2020-12-01', 2, 42), ('2020-12-05', 3, 42),"
"('2020-12-04', 1, 42)")
# Nested/Complex schemas
self.spark.sql("create schema if not exists complex_schema")
self.spark.sql("create table if not exists complex_schema.struct_table (a int, struct_col struct<b:string,"
@@ -205,7 +218,7 @@ def test_extract(self) -> None:
while data is not None:
ret.append(data)
data = self.dExtractor.extract()
-        self.assertEqual(len(ret), 30)
+        self.assertEqual(len(ret), 40)

def test_extract_with_only_specific_schemas(self) -> None:
self.config_dict = {
@@ -222,7 +235,7 @@ def test_extract_with_only_specific_schemas(self) -> None:
while data is not None:
ret.append(data)
data = self.dExtractor.extract()
-        self.assertEqual(len(ret), 2)
+        self.assertEqual(len(ret), 12)

def test_extract_when_excluding(self) -> None:
self.config_dict = {
@@ -350,6 +363,92 @@ def test_scrape_complex_schema_columns(self) -> None:
self.assertEqual(len(expected), len(actual), f"{table_name} failed")
self.assertListEqual(expected, actual, f"{table_name} failed")

def test_create_table_watermarks_single_partition(self) -> None:
scraped_table = self.dExtractor.scrape_table(Table("watermarks_single_partition", "test_schema2", None, "delta",
False))
self.assertIsNotNone(scraped_table)
if scraped_table:
found = self.dExtractor.create_table_watermarks(scraped_table)
self.assertIsNotNone(found)
if found:
self.assertEqual(1, len(found))
self.assertEqual(2, len(found[0]))
create_time = found[0][0].create_time
expected = [(
Watermark(
create_time=create_time,
database='test_database',
schema='test_schema2',
table_name='watermarks_single_partition',
part_name='date=2020-12-05',
part_type='high_watermark',
cluster='test_cluster'),
Watermark(
create_time=create_time,
database='test_database',
schema='test_schema2',
table_name='watermarks_single_partition',
part_name='date=2020-12-01',
part_type='low_watermark',
cluster='test_cluster')
)]
self.assertEqual(str(expected), str(found))

def test_create_table_watermarks_multi_partition(self) -> None:
scraped_table = self.dExtractor.scrape_table(Table("watermarks_multi_partition", "test_schema2", None, "delta",
False))
self.assertIsNotNone(scraped_table)
if scraped_table:
found = self.dExtractor.create_table_watermarks(scraped_table)
self.assertIsNotNone(found)
if found:
self.assertEqual(2, len(found))
self.assertEqual(2, len(found[0]))
create_time = found[0][0].create_time
expected = [(
Watermark(
create_time=create_time,
database='test_database',
schema='test_schema2',
table_name='watermarks_multi_partition',
part_name='date=2020-12-05',
part_type='high_watermark',
cluster='test_cluster'),
Watermark(
create_time=create_time,
database='test_database',
schema='test_schema2',
table_name='watermarks_multi_partition',
part_name='date=2020-12-01',
part_type='low_watermark',
cluster='test_cluster')
), (
Watermark(
create_time=create_time,
database='test_database',
schema='test_schema2',
table_name='watermarks_multi_partition',
part_name='spec=3',
part_type='high_watermark',
cluster='test_cluster'),
Watermark(
create_time=create_time,
database='test_database',
schema='test_schema2',
table_name='watermarks_multi_partition',
part_name='spec=1',
part_type='low_watermark',
cluster='test_cluster')
)]
self.assertEqual(str(expected), str(found))

def test_create_table_watermarks_without_partition(self) -> None:
scraped_table = self.dExtractor.scrape_table(Table("test_table1", "test_schema1", None, "delta", False))
self.assertIsNotNone(scraped_table)
if scraped_table:
found = self.dExtractor.create_table_watermarks(scraped_table)
self.assertIsNone(found)


if __name__ == '__main__':
unittest.main()
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -13,3 +13,4 @@ pytest-cov>=2.12.0
pytest-env>=0.6.2
pytest-mock>=3.6.1
typed-ast>=1.4.3
pyspark==3.0.1
