
feat: Delta table partition watermarks #1694

Merged
merged 19 commits into amundsen-io:main from more-delta-lake on Mar 9, 2022

Conversation

@harmw (Contributor) commented Feb 1, 2022

Summary of Changes

Introducing watermark detection on Delta tables. I lost track of the original attempt [1] around the time of the move to the monorepo, so here it is once more 🙈

[1] amundsen-io/amundsendatabuilder#427
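For context, the core idea is to take the min and max of a table's partition column and report them as low/high watermarks. A minimal sketch, assuming a running SparkSession and illustrative table/column names (this is not the extractor's actual code):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()

# Hypothetical partitioned Delta table and partition column, for illustration only.
df = spark.table("my_schema.my_delta_table")
partition_column = "ds"

# Low/high watermarks are simply the min/max values of the partition column.
bounds = df.select(
    F.min(partition_column).alias("low"),
    F.max(partition_column).alias("high"),
).first()

if bounds is not None:
    print(bounds["low"], bounds["high"])  # these values would feed the Watermark models downstream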

Tests

Testing for watermarks.

Documentation

Nothing.

CheckList

Make sure you have checked all steps below to ensure a timely review.

  • PR title addresses the issue accurately and concisely. Example: "Updates the version of Flask to v1.0.2"
  • PR includes a summary of changes.
  • PR adds unit tests, updates existing unit tests, OR documents why no test additions or modifications are needed.
  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All public functions and classes in the PR contain docstrings that explain what they do

@harmw requested a review from a team as a code owner February 1, 2022 13:52
@boring-cyborg bot added the area:databuilder and category:models labels Feb 1, 2022
@boring-cyborg bot commented Feb 1, 2022

Congratulations on your first Pull Request and welcome to the Amundsen community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/amundsen-io/amundsen/blob/main/CONTRIBUTING.md)

* Install pyspark for dev work

So now we can run pytest on a fresh clone. Due to the rather old version this will throw some DeprecationWarning messages, but we can upgrade to 3.1 at a later stage.

* Select the partition_column

Going with the first item of the returned list will return the same column, which is not deterministic at all (given there are multiple partitions).

* Only process partitions of a workable type

Since watermarking strings doesn't make much sense, keep to checking integer/float/date/datetime types.

Signed-off-by: Harm Weites <[email protected]>
@@ -13,3 +13,4 @@ pytest-cov>=2.12.0
pytest-env>=0.6.2
pytest-mock>=3.6.1
typed-ast>=1.4.3
pyspark==3.0.1

Contributor:

does it need to be a hard pin or will this work with >= ?
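For illustration, a relaxed constraint could look like the line below (a sketch, not what was merged; the upper bound is an assumption to stay on the tested 3.0.x line):

pyspark>=3.0.1,<3.1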

Member:

I thought we had pyspark deps in setup.py?

@@ -425,3 +431,96 @@ def is_array_type(self, delta_type: Any) -> bool:

def is_map_type(self, delta_type: Any) -> bool:
    return isinstance(delta_type, MapType)

def create_table_watermarks(self, table: ScrapedTableMetadata) -> Union[List[Tuple[Optional[Watermark],

Contributor:

I think it doesn't really matter which watermark is high and which is low, so this method could just return Optional[List[Watermark]], which would be more readable and simpler. Wdyt?
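A sketch of the simpler signature being suggested (illustrative only; the docstring and elided body are assumptions, and Watermark/ScrapedTableMetadata are assumed to be imported in the surrounding module):

from typing import List, Optional

def create_table_watermarks(self, table: ScrapedTableMetadata) -> Optional[List[Watermark]]:
    """Return the low and high watermarks for the table's partition column(s),
    or None if there is no usable partition column."""
    ...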

@@ -206,6 +207,11 @@ def _get_extract_iter(self) -> Iterator[Union[TableMetadata, TableLastUpdated, N
                continue
            else:
                yield self.create_table_metadata(scraped_table)
                watermarks = self.create_table_watermarks(scraped_table)
                if watermarks:

Contributor:

Regarding the comment on line 435, I would just do:

for watermark in watermarks:
    yield watermark
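(Equivalently, Python's yield from watermarks collapses that loop into a single statement.)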

@mgorsk1 (Contributor) commented Feb 1, 2022

Going through this code, I realized this is actually just a SparkCatalogExtractor, not particularly a DeltaExtractor. Wdyt about generalizing it, @harmw @feng-tao @samshuster?

@feng-tao (Member) commented Feb 1, 2022

@mgorsk1 I don't think so, given the extractor is for Delta only, which requires a Databricks cluster to execute. I think we could have a separate extractor for standalone Spark, but let's not change the scope of this PR.

@mgorsk1 (Contributor) commented Feb 1, 2022

I didn't mean it to be in the scope of this PR; there just isn't much code here that makes it Delta-specific, only iterating over sparkCatalog databases and tables, which might as well serve an alternative Hive metastore extraction.

@samshuster (Contributor) commented Feb 1, 2022 via email

@harmw (Contributor, author) commented Feb 9, 2022

Hm, cool, interesting 🤔 I simply picked up where I left off over a year ago: get the watermarks in and make it (Amundsen) more valuable for how we're using it in our shop 😂

Dropping the changes in this PR in favour of something new (extracting-without-spark) sounds pretty reasonable, but I personally don't have time to do that in a timely fashion :(

Would it make sense to get these changes in (after resolving the code review notes) and work on the break-out in a separate PR? And if yes, would that necessitate an RFC of some sort?

@samshuster (Contributor) commented Feb 9, 2022 via email

@feng-tao (Member) commented:

Agree, we shouldn't block this PR; instead we should just file a GitHub issue for the enhancement.

btw, could you fix the lint:

flake8 .
./tests/unit/extractor/test_deltalake_extractor.py:62: [E501] line too long (125 > 120 characters)
./tests/unit/extractor/test_deltalake_extractor.py:366: [E501] line too long (128 > 120 characters)
./tests/unit/extractor/test_deltalake_extractor.py:392: [E501] line too long (127 > 120 characters)
./databuilder/extractor/delta_lake_metadata_extractor.py:435: [C901] 'DeltaLakeMetadataExtractor.create_table_watermarks' is too complex (12)
make: *** [lint] Error 1
Makefile:11: recipe for target 'lint' failed

I will take a look

* Wrap this extraction in a try/except

There are scenarios where a dataset exists but is empty; in this case .first() will fail.

* Revert "Simplicity"

This reverts commit 06b9fc3. Working with this as part of job.launch() brings errors, where the original code would bring the desired result.

Signed-off-by: Harm Weites <[email protected]>
@feng-tao (Member) commented:

flake8 .
mypy .
databuilder/extractor/delta_lake_metadata_extractor.py:213: error: Value of type "Watermark" is not indexable
databuilder/extractor/delta_lake_metadata_extractor.py:214: error: Value of type "Watermark" is not indexable
databuilder/extractor/delta_lake_metadata_extractor.py:503: error: Argument 2 to "filter" has incompatible type "Optional[List[ScrapedColumnMetadata]]"; expected "Iterable[ScrapedColumnMetadata]"
databuilder/extractor/delta_lake_metadata_extractor.py:530: error: Incompatible return value type (got "List[Tuple[Watermark, Watermark]]", expected "Optional[List[Watermark]]")
tests/unit/extractor/test_deltalake_extractor.py:369: error: Argument 1 to "create_table_watermarks" of "DeltaLakeMetadataExtractor" has incompatible type "Optional[ScrapedTableMetadata]"; expected "ScrapedTableMetadata"
tests/unit/extractor/test_deltalake_extractor.py:370: error: Argument 1 to "len" has incompatible type "Optional[List[Watermark]]"; expected "Sized"
tests/unit/extractor/test_deltalake_extractor.py:371: error: Value of type "Optional[List[Watermark]]" is not indexable
tests/unit/extractor/test_deltalake_extractor.py:371: error: Argument 1 to "len" has incompatible type "Union[Watermark, Any]"; expected "Sized"
tests/unit/extractor/test_deltalake_extractor.py:372: error: Value of type "Optional[List[Watermark]]" is not indexable
tests/unit/extractor/test_deltalake_extractor.py:372: error: Value of type "Union[Watermark, Any]" is not indexable
tests/unit/extractor/test_deltalake_extractor.py:396: error: Argument 1 to "create_table_watermarks" of "DeltaLakeMetadataExtractor" has incompatible type "Optional[ScrapedTableMetadata]"; expected "ScrapedTableMetadata"
tests/unit/extractor/test_deltalake_extractor.py:397: error: Argument 1 to "len" has incompatible type "Optional[List[Watermark]]"; expected "Sized"
tests/unit/extractor/test_deltalake_extractor.py:398: error: Value of type "Optional[List[Watermark]]" is not indexable
tests/unit/extractor/test_deltalake_extractor.py:398: error: Argument 1 to "len" has incompatible type "Union[Watermark, Any]"; expected "Sized"
tests/unit/extractor/test_deltalake_extractor.py:399: error: Value of type "Optional[List[Watermark]]" is not indexable
tests/unit/extractor/test_deltalake_extractor.py:399: error: Value of type "Union[Watermark, Any]" is not indexable
tests/unit/extractor/test_deltalake_extractor.py:439: error: Argument 1 to "create_table_watermarks" of "DeltaLakeMetadataExtractor" has incompatible type "Optional[ScrapedTableMetadata]"; expected "ScrapedTableMetadata"
Found 17 errors in 2 files (checked 361 source files)
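For illustration, the return-type error on line 530 goes away once the tuple pairs are flattened into a single list that matches Optional[List[Watermark]] (a sketch inside create_table_watermarks; pairs, low and high are hypothetical names):

watermarks: List[Watermark] = []
for low, high in pairs:  # pairs: hypothetical list of (low, high) Watermark tuples
    watermarks.extend([low, high])
return watermarks or None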

@harmw (Contributor, author) commented Mar 1, 2022

@feng-tao should be all good now, finally 😅

First-time contributors need a maintainer to approve running workflows.

Is this new?

@feng-tao (Member) commented Mar 2, 2022

@harmw , almost there :) :

 from collections import namedtuple
 from datetime import datetime
 from typing import (  # noqa: F401
-    Any, Dict, Iterator, List, Optional, Union, Tuple,
+    Any, Dict, Iterator, List, Optional, Tuple, Union,
 )
 
 from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
Skipped 2 files
make: *** [isort_check] Error 1

@harmw (Contributor, author) commented Mar 2, 2022

Completely ignored the contribution guidelines, my bad - should be good now, the steps in the Makefile succeeded. A future addition could be make all support, but whatever, all good now 🙂

@feng-tao (Member) commented Mar 8, 2022

Triggered the CI again, will merge once it is green.

@feng-tao merged commit f9c0eeb into amundsen-io:main Mar 9, 2022
@boring-cyborg bot commented Mar 9, 2022

Awesome work, congrats on your first merged pull request!

@harmw deleted the more-delta-lake branch March 10, 2022 08:22
ozandogrultan pushed a commit to deliveryhero/amundsen that referenced this pull request Apr 28, 2022
* Install pyspark for dev work

So now we can run pytest on a fresh clone.

Due to the rather old version this will throw some DeprecationWarning
messages, but we can upgrade to 3.1 at a later stage.

Signed-off-by: Harm Weites <[email protected]>

* Read watermarks for Delta tables

Signed-off-by: Harm Weites <[email protected]>

* Include tests

Signed-off-by: Harm Weites <[email protected]>

* More proper watermark yielding

Signed-off-by: Harm Weites <[email protected]>

* Select the partition_column

Going with the first item of the returned list will return the same
column, which is not deterministic at all (given there are multiple
partitions).

Signed-off-by: Harm Weites <[email protected]>

* Cut the line length

Signed-off-by: Harm Weites <[email protected]>

* Only process partitions of a workable type

Since watermarking strings doesn't make much sense, keep to checking
integer/float/date/datetime types.

Signed-off-by: Harm Weites <[email protected]>

* Updated tests

Signed-off-by: Harm Weites <[email protected]>

* Oops, the .first() returns a Row object

Signed-off-by: Harm Weites <[email protected]>

* Wrap this extraction in a try/except

There are scenarios where a dataset exists, but is empty. In this case
.first() will fail.

Signed-off-by: Harm Weites <[email protected]>

* Flake8 fixes

Signed-off-by: Harm Weites <[email protected]>

* Simplicity

Signed-off-by: Harm Weites <[email protected]>

* Revert "Simplicity"

This reverts commit 06b9fc3.

Working with this as part of job.launch() brings errors, where the
original code would bring the desired result.

Signed-off-by: Harm Weites <[email protected]>

* Simplicity in return typing

Signed-off-by: Harm Weites <[email protected]>

* There is no complexity here :jedi_hand_wave:

Signed-off-by: Harm Weites <[email protected]>

* Pass the mypy

Signed-off-by: Harm Weites <[email protected]>

* Fix the return type here, finally

Signed-off-by: Harm Weites <[email protected]>

* Fix import sorting order

Signed-off-by: Harm Weites <[email protected]>
Signed-off-by: Ozan Dogrultan <[email protected]>
zacr pushed a commit to SaltIO/amundsen that referenced this pull request May 3, 2022
zacr pushed a commit to SaltIO/amundsen that referenced this pull request May 13, 2022
hansadriaans pushed a commit to DataChefHQ/amundsen that referenced this pull request Jun 30, 2022
Labels: area:databuilder
Projects: None yet
Participants: 4