Skip to content

Commit

Permalink
feat(ingest): bigquery-beta - Parsing view ddl definition for lineage (
Browse files Browse the repository at this point in the history
…#6187)

Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
treff7es and shirshanka authored Oct 13, 2022
1 parent c7cc0af commit 6e34cd6
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,10 @@ def _process_table(

if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
table_identifier, self.platform
project_id=project_id,
dataset_name=schema_name,
table=table,
platform=self.platform,
)

table_workunits = self.gen_table_dataset_workunits(
Expand Down Expand Up @@ -665,7 +668,10 @@ def _process_view(
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
table_identifier, self.platform
project_id=project_id,
dataset_name=dataset_name,
table=view,
platform=self.platform,
)

view_workunits = self.gen_view_dataset_workunits(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ class BigQueryV2Config(BigQueryConfig):
)

number_of_datasets_process_in_batch: int = Field(
default=50,
description="Number of table queried in batch when getting metadata. This is a low leve config propert which should be touched with care. This restriction needed because we query partitions system view which throws error if we try to touch too many tables.",
default=80,
description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.",
)
column_limit: int = Field(
default=1000,
description="Maximum number of columns to process in a table",
default=300,
description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.",
)
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
Expand All @@ -72,6 +72,10 @@ class BigQueryV2Config(BigQueryConfig):
default=False,
description="Experimental. Use sql parser to resolve view/table lineage. If there is a view being referenced then bigquery sends both the view as well as underlying tablein the references. There is no distinction between direct/base objects accessed. So doing sql parsing to ensure we only use direct objects accessed for lineage.",
)
lineage_parse_view_ddl: bool = Field(
default=True,
description="Sql parse view ddl to get lineage.",
)

@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryTable,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import (
BQ_DATE_SHARD_FORMAT,
BQ_DATETIME_FORMAT,
Expand Down Expand Up @@ -396,6 +400,10 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
continue

destination_table_str = destination_table.table_identifier.get_table_name()
destination_table_str = str(
BigQueryTableRef(table_identifier=destination_table.table_identifier)
)

if not self.config.dataset_pattern.allowed(
destination_table.table_identifier.dataset
) or not self.config.table_pattern.allowed(
Expand Down Expand Up @@ -464,6 +472,47 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
logger.info("Exiting create lineage map function")
return lineage_map

def parse_view_lineage(
self, project: str, dataset: str, view: BigqueryView
) -> List[BigqueryTableIdentifier]:
parsed_tables = set()
if view.ddl:
try:
parser = BigQuerySQLParser(view.ddl)
tables = parser.get_tables()
except Exception as ex:
logger.debug(
f"View {view.name} definination sql parsing failed on query: {view.ddl}. Edge from physical table to view won't be added. The error was {ex}."
)
return []

for table in tables:
parts = table.split(".")
if len(parts) == 1:
parsed_tables.add(
BigqueryTableIdentifier(
project_id=project, dataset=dataset, table=table
)
)
elif len(parts) == 2:
parsed_tables.add(
BigqueryTableIdentifier(
project_id=project, dataset=parts[0], table=parts[1]
)
)
elif len(parts) == 3:
parsed_tables.add(
BigqueryTableIdentifier(
project_id=parts[0], dataset=parts[1], table=parts[2]
)
)
else:
continue

return list(parsed_tables)
else:
return []

def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[str]]:
lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
config=self.config, report=self.report
Expand Down Expand Up @@ -524,11 +573,18 @@ def get_upstream_tables(
)
else:
upstreams.add(upstream_table)

return upstreams

def get_upstream_lineage_info(
self, table_identifier: BigqueryTableIdentifier, platform: str
self,
project_id: str,
dataset_name: str,
table: Union[BigqueryTable, BigqueryView],
platform: str,
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)

if table_identifier.project_id not in self.loaded_project_ids:
with PerfTimer() as timer:
self.lineage_metadata.update(
Expand All @@ -539,6 +595,21 @@ def get_upstream_lineage_info(
)
self.loaded_project_ids.append(table_identifier.project_id)

if self.config.lineage_parse_view_ddl and isinstance(table, BigqueryView):
for table_id in self.parse_view_lineage(project_id, dataset_name, table):
if table_identifier.get_table_name() in self.lineage_metadata:
self.lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
].add(str(BigQueryTableRef(table_id).get_sanitized_table_ref()))
else:
self.lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
] = {str(BigQueryTableRef(table_id).get_sanitized_table_ref())}

bq_table = BigQueryTableRef.from_bigquery_table(table_identifier)
if str(bq_table) in self.lineage_metadata:
upstream_list: List[UpstreamClass] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def generate_partition_profiler_query(
See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
"""
logger.debug(
f"generate partition profiler query for project: {project} schema: {schema} and table {table}, partition_datetime: {partition_datetime}"
f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}"
)
partition = table.max_partition_id
if partition:
Expand Down
87 changes: 87 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import datetime

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryView
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor


def test_parse_view_lineage():
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor = BigqueryLineageExtractor(config, report)

# ddl = "select * from some_dataset.sometable as a"
ddl = """CREATE VIEW `my-project.my-dataset.test_table`
AS SELECT
* REPLACE(
myrandom(something) AS something)
FROM
`my-project2.my-dataset2.test_physical_table`;
"""
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
comment="",
ddl=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
assert 1 == len(tables)
assert "my-project2.my-dataset2.test_physical_table" == tables[0].get_table_name()


def test_parse_view_lineage_with_two_part_table_name():
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor = BigqueryLineageExtractor(config, report)

ddl = "CREATE VIEW my_view as select * from some_dataset.sometable as a"
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
comment="",
ddl=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
assert 1 == len(tables)
assert "my_project.some_dataset.sometable" == tables[0].get_table_name()


def test_one_part_table():
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor = BigqueryLineageExtractor(config, report)

ddl = "CREATE VIEW my_view as select * from sometable as a"
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
comment="",
ddl=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
assert 1 == len(tables)
assert "my_project.my_dataset.sometable" == tables[0].get_table_name()


def test_create_statement_with_multiple_table():
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor = BigqueryLineageExtractor(config, report)

ddl = "CREATE VIEW my_view as select * from my_project_2.my_dataset_2.sometable union select * from my_project_2.my_dataset_2.sometable2 as a"
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
comment="",
ddl=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
tables.sort(key=lambda e: e.get_table_name())
assert 2 == len(tables)
assert "my_project_2.my_dataset_2.sometable" == tables[0].get_table_name()
assert "my_project_2.my_dataset_2.sometable2" == tables[1].get_table_name()

0 comments on commit 6e34cd6

Please sign in to comment.