feat: Delta table partition watermarks #1694

Merged Mar 9, 2022 (19 commits). Showing changes from 13 commits.
112 changes: 107 additions & 5 deletions databuilder/databuilder/extractor/delta_lake_metadata_extractor.py
@@ -6,7 +6,7 @@
from collections import namedtuple
from datetime import datetime
from typing import ( # noqa: F401
Any, Dict, Iterator, List, Optional, Union,
Any, Dict, Iterator, List, Optional, Union, Tuple,
)

from pyhocon import ConfigFactory, ConfigTree # noqa: F401
@@ -21,6 +21,7 @@
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.models.table_last_updated import TableLastUpdated
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
from databuilder.models.watermark import Watermark

TableKey = namedtuple('TableKey', ['schema', 'table_name'])

@@ -167,7 +168,7 @@ def init(self, conf: ConfigTree) -> None:
def set_spark(self, spark: SparkSession) -> None:
self.spark = spark

def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
def extract(self) -> Union[TableMetadata, List[Tuple[Watermark, Watermark]], TableLastUpdated, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
@@ -178,12 +179,13 @@ def extract(self) -> Union[TableMetadata, TableLastUpdated, None]:
def get_scope(self) -> str:
return 'extractor.delta_lake_table_metadata'

def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
def _get_extract_iter(self) -> Iterator[Union[TableMetadata, List[Tuple[Watermark, Watermark]], TableLastUpdated,
None]]:
"""
Given either a list of schemas, or a list of exclude schemas,
it will query hive metastore and then access delta log
to get all of the metadata for your delta tables. It will produce:
- table and column metadata
- table and column metadata (including partition watermarks)
- last updated information
"""
if self.schema_list:
@@ -196,7 +198,6 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
LOGGER.info("working on %s", schemas)
tables = self.get_all_tables(schemas)
# TODO add the programmatic information as well?
# TODO add watermarks
scraped_tables = self.scrape_all_tables(tables)
for scraped_table in scraped_tables:
if not scraped_table:
@@ -206,6 +207,10 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, None]]:
continue
else:
yield self.create_table_metadata(scraped_table)
watermarks = self.create_table_watermarks(scraped_table)
if watermarks:
    for watermark in watermarks:
        yield watermark

Contributor (inline review comment): in regards to comment on line 435 I would just do:

    for watermark in watermarks:
        yield watermark
last_updated = self.create_table_last_updated(scraped_table)
if last_updated:
yield last_updated
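To make the new output shape concrete, here is a minimal driver sketch (not part of the PR) that pulls records from the extractor in a loop, the same way the unit tests further down do. The class name DeltaLakeMetadataExtractor and the config keys are my assumptions and should be checked against the module's init() and the tests' config_dict.

```python
# Hedged sketch: drive the extractor directly and inspect what it yields.
# The config key names below are placeholders, not verified against the
# extractor's defaults; see init() and the unit tests for the real ones.
from pyhocon import ConfigFactory
from pyspark.sql import SparkSession

from databuilder.extractor.delta_lake_metadata_extractor import DeltaLakeMetadataExtractor

spark = SparkSession.builder.appName('delta-watermark-sketch').getOrCreate()

extractor = DeltaLakeMetadataExtractor()
extractor.init(ConfigFactory.from_dict({
    'cluster': 'test_cluster',        # placeholder key
    'schema_list': ['test_schema2'],  # placeholder key
}))
extractor.set_spark(spark)

record = extractor.extract()
while record is not None:
    # TableMetadata, TableLastUpdated, or the new partition watermark output
    print(type(record).__name__)
    record = extractor.extract()
```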
@@ -425,3 +430,100 @@ def is_array_type(self, delta_type: Any) -> bool:

    def is_map_type(self, delta_type: Any) -> bool:
        return isinstance(delta_type, MapType)

    def create_table_watermarks(self, table: ScrapedTableMetadata) -> Optional[List[Tuple[Watermark, Watermark]]]:
        """
        Creates the watermark objects (high, low pairs) that reflect the highest and lowest values in the partition columns
        """
        def _is_show_partitions_supported(t: ScrapedTableMetadata) -> bool:
            try:
                self.spark.sql(f'show partitions {t.schema}.{t.table}')
                return True
            except Exception as e:
                # pyspark.sql.utils.AnalysisException: SHOW PARTITIONS is not allowed on a table that is not partitioned
                LOGGER.warning(e)
                return False

        def _fetch_minmax(table: ScrapedTableMetadata, partition_column: str) -> Tuple[str, str]:
            LOGGER.info(f'Fetching partition info for {partition_column} in {table.schema}.{table.table}')
            min_water = ""
            max_water = ""
            try:
                if is_show_partitions_supported:
                    LOGGER.info('Using SHOW PARTITIONS')
                    min_water = str(
                        self
                        .spark
                        .sql(f'show partitions {table.schema}.{table.table}')
                        .orderBy(partition_column, ascending=True)
                        .first()[partition_column])
                    max_water = str(
                        self
                        .spark
                        .sql(f'show partitions {table.schema}.{table.table}')
                        .orderBy(partition_column, ascending=False)
                        .first()[partition_column])
                else:
                    LOGGER.info('Using DESCRIBE EXTENDED')
                    part_info = (self
                                 .spark
                                 .sql(f'describe extended {table.schema}.{table.table} {partition_column}')
                                 .collect()
                                 )
                    minmax = {}
                    for mm in list(filter(lambda x: x['info_name'] in ['min', 'max'], part_info)):
                        minmax[mm['info_name']] = mm['info_value']
                    min_water = minmax['min']
                    max_water = minmax['max']
            except Exception as e:
                LOGGER.warning(f'Failed fetching partition watermarks: {e}')
            return max_water, min_water

        if not table.table_detail:
            LOGGER.info(f'No table details found in {table}, skipping')
            return None

        if 'partitionColumns' not in table.table_detail or len(table.table_detail['partitionColumns']) < 1:
            LOGGER.info(f'No partitions found in {table}, skipping')
            return None

        is_show_partitions_supported: bool = _is_show_partitions_supported(table)

        if not is_show_partitions_supported:
            LOGGER.info('Analyzing table, this can take a while...')
            partition_columns = ','.join(table.table_detail['partitionColumns'])
            self.spark.sql(
                f"analyze table {table.schema}.{table.table} compute statistics for columns {partition_columns}")

        # It makes little sense to get watermarks from a string value, with no concept of high and low.
        # Just imagine a dataset with a partition by country...
        valid_types = ['int', 'float', 'date', 'datetime']
        columns_with_valid_type = list(map(lambda l: l.name,
                                           filter(lambda l: str(l.data_type).lower() in valid_types, table.columns)
                                           )
                                       )

        r = []
        for partition_column in table.table_detail['partitionColumns']:
            if partition_column not in columns_with_valid_type:
                continue

            last, first = _fetch_minmax(table, partition_column)
            low = Watermark(
                create_time=table.table_detail['createdAt'],
                database=self._db,
                schema=table.schema,
                table_name=table.table,
                part_name=f'{partition_column}={first}',
                part_type='low_watermark',
                cluster=self._cluster)
            high = Watermark(
                create_time=table.table_detail['createdAt'],
                database=self._db,
                schema=table.schema,
                table_name=table.table,
                part_name=f'{partition_column}={last}',
                part_type='high_watermark',
                cluster=self._cluster)
            r.append((high, low))
        return r
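Some context for the DESCRIBE EXTENDED fallback above: once ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS has run, describe extended <table> <column> returns info_name/info_value rows whose min and max entries carry the column statistics, which is exactly what _fetch_minmax filters on. The sketch below replays that filtering against hand-written rows; the values are invented purely for illustration.

```python
# Illustrative only: simulate the rows returned by
# `describe extended <table> <column>` after column statistics exist,
# and extract min/max the same way _fetch_minmax does above.
from typing import Dict, List, Tuple


def minmax_from_describe(part_info: List[Dict[str, str]]) -> Tuple[str, str]:
    minmax = {}
    for row in filter(lambda x: x['info_name'] in ['min', 'max'], part_info):
        minmax[row['info_name']] = row['info_value']
    return minmax['max'], minmax['min']


# made-up rows, shaped like the pyspark Row objects the extractor sees
fake_part_info = [
    {'info_name': 'col_name', 'info_value': 'date'},
    {'info_name': 'data_type', 'info_value': 'date'},
    {'info_name': 'min', 'info_value': '2020-12-01'},
    {'info_name': 'max', 'info_value': '2020-12-05'},
    {'info_name': 'num_nulls', 'info_value': '0'},
]

high, low = minmax_from_describe(fake_part_info)
assert (high, low) == ('2020-12-05', '2020-12-01')
```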
93 changes: 91 additions & 2 deletions databuilder/tests/unit/extractor/test_deltalake_extractor.py
@@ -17,6 +17,7 @@
)
from databuilder.extractor.table_metadata_constants import PARTITION_BADGE
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
from databuilder.models.watermark import Watermark


class TestDeltaLakeExtractor(unittest.TestCase):
@@ -57,6 +58,18 @@ def setUpSchemas(self) -> None:
# TODO do we even need to support views and none delta tables in this case?
self.spark.sql("create view if not exists test_schema2.test_view1 as (select * from test_schema2.test_table2)")

self.spark.sql("create table if not exists "
"test_schema2.watermarks_single_partition (date date, value float) using delta partitioned by"
"(date)")
self.spark.sql("insert into test_schema2.watermarks_single_partition values "
"('2020-12-03', 1337), ('2020-12-02', 42), ('2020-12-01', 42), ('2020-12-05', 42),"
"('2020-12-04', 42)")
self.spark.sql("create table if not exists "
"test_schema2.watermarks_multi_partition (date date, spec int, value float) using delta "
"partitioned by (date, spec)")
self.spark.sql("insert into test_schema2.watermarks_multi_partition values "
"('2020-12-03', 1, 1337), ('2020-12-02', 2, 42), ('2020-12-01', 2, 42), ('2020-12-05', 3, 42),"
"('2020-12-04', 1, 42)")
# Nested/Complex schemas
self.spark.sql("create schema if not exists complex_schema")
self.spark.sql("create table if not exists complex_schema.struct_table (a int, struct_col struct<b:string,"
@@ -205,7 +218,7 @@ def test_extract(self) -> None:
while data is not None:
ret.append(data)
data = self.dExtractor.extract()
self.assertEqual(len(ret), 30)
self.assertEqual(len(ret), 37)

def test_extract_with_only_specific_schemas(self) -> None:
self.config_dict = {
@@ -222,7 +235,7 @@ def test_extract_with_only_specific_schemas(self) -> None:
while data is not None:
ret.append(data)
data = self.dExtractor.extract()
self.assertEqual(len(ret), 2)
self.assertEqual(len(ret), 9)
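The expected record counts above grow by 7 in both tests. A back-of-envelope tally (mine, not from the PR) explains the delta, assuming each scraped table yields one TableMetadata and one TableLastUpdated record plus one watermark pair per eligible partition column:

```python
# Why both expected counts increase by exactly 7 after adding the two
# partitioned test tables (assumption: no other records are affected).
new_tables = 2                     # watermarks_single_partition, watermarks_multi_partition
metadata_records = new_tables * 2  # TableMetadata + TableLastUpdated for each
watermark_pairs = 1 + 2            # date; date and spec
assert metadata_records + watermark_pairs == 7
```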

def test_extract_when_excluding(self) -> None:
self.config_dict = {
@@ -350,6 +363,82 @@ def test_scrape_complex_schema_columns(self) -> None:
self.assertEqual(len(expected), len(actual), f"{table_name} failed")
self.assertListEqual(expected, actual, f"{table_name} failed")

    def test_create_table_watermarks_single_partition(self) -> None:
        scraped_table = self.dExtractor.scrape_table(Table("watermarks_single_partition", "test_schema2", None, "delta",
                                                           False))
        found = self.dExtractor.create_table_watermarks(scraped_table)
        self.assertEqual(1, len(found))
        self.assertEqual(2, len(found[0]))
        create_time = found[0][0].create_time
        expected = [(
            Watermark(
                create_time=create_time,
                database='test_database',
                schema='test_schema2',
                table_name='watermarks_single_partition',
                part_name='date=2020-12-05',
                part_type='high_watermark',
                cluster='test_cluster'),
            Watermark(
                create_time=create_time,
                database='test_database',
                schema='test_schema2',
                table_name='watermarks_single_partition',
                part_name='date=2020-12-01',
                part_type='low_watermark',
                cluster='test_cluster')
        )]
        self.assertEqual(str(expected), str(found))

    def test_create_table_watermarks_multi_partition(self) -> None:
        scraped_table = self.dExtractor.scrape_table(Table("watermarks_multi_partition", "test_schema2", None, "delta",
                                                           False))
        found = self.dExtractor.create_table_watermarks(scraped_table)
        self.assertEqual(2, len(found))
        self.assertEqual(2, len(found[0]))
        create_time = found[0][0].create_time
        expected = [(
            Watermark(
                create_time=create_time,
                database='test_database',
                schema='test_schema2',
                table_name='watermarks_multi_partition',
                part_name='date=2020-12-05',
                part_type='high_watermark',
                cluster='test_cluster'),
            Watermark(
                create_time=create_time,
                database='test_database',
                schema='test_schema2',
                table_name='watermarks_multi_partition',
                part_name='date=2020-12-01',
                part_type='low_watermark',
                cluster='test_cluster')
        ), (
            Watermark(
                create_time=create_time,
                database='test_database',
                schema='test_schema2',
                table_name='watermarks_multi_partition',
                part_name='spec=3',
                part_type='high_watermark',
                cluster='test_cluster'),
            Watermark(
                create_time=create_time,
                database='test_database',
                schema='test_schema2',
                table_name='watermarks_multi_partition',
                part_name='spec=1',
                part_type='low_watermark',
                cluster='test_cluster')
        )]
        self.assertEqual(str(expected), str(found))

    def test_create_table_watermarks_without_partition(self) -> None:
        scraped_table = self.dExtractor.scrape_table(Table("test_table1", "test_schema1", None, "delta", False))
        found = self.dExtractor.create_table_watermarks(scraped_table)
        self.assertIsNone(found)


if __name__ == '__main__':
    unittest.main()
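To run only the new watermark tests locally, something along these lines should work once requirements-dev.txt (which now pulls in pyspark) is installed; the exact invocation may differ per setup, and the -k filter simply matches the new test names.

```python
# Convenience sketch: run just the new watermark tests programmatically.
import pytest

pytest.main([
    'databuilder/tests/unit/extractor/test_deltalake_extractor.py',
    '-k', 'watermarks',
])
```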
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -13,3 +13,4 @@ pytest-cov>=2.12.0
pytest-env>=0.6.2
pytest-mock>=3.6.1
typed-ast>=1.4.3
pyspark==3.0.1
Contributor (review comment): does it need to be a hard pin or will this work with >= ?

Member (review comment): I thought we had pyspark deps in setup.py?