-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/mssql): include stored procedure lineage #11912
Merged
mayurinehate
merged 14 commits into
datahub-project:master
from
mayurinehate:mssql_stored_proc_lineage
Nov 22, 2024
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
d7e9824
feat(ingest/mssql): include stored procedure lineage
mayurinehate 12002c3
lineage via job and tests
mayurinehate 1b81994
minor updates
mayurinehate 5d3d662
add new test for stored procedure
mayurinehate ac01003
fix lint
mayurinehate 397748e
tweak tests
hsheth2 dda71cd
minor cleanups
hsheth2 bf44d82
Merge branch 'master' into mssql_stored_proc_lineage
hsheth2 85b6acd
make more cll work
hsheth2 3385862
remove aggregator code
hsheth2 bf81203
fix bug in split statements
hsheth2 43aabe4
add is_temp_table check
mayurinehate 1fae14d
testcase for via temp table lineage
mayurinehate 8449f01
revert test changes
mayurinehate File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/stored_procedure_lineage.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import logging | ||
from typing import Callable, Iterable, Optional | ||
|
||
from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||
from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure | ||
from datahub.metadata.schema_classes import DataJobInputOutputClass | ||
from datahub.sql_parsing.datajob import to_datajob_input_output | ||
from datahub.sql_parsing.schema_resolver import SchemaResolver | ||
from datahub.sql_parsing.split_statements import split_statements | ||
from datahub.sql_parsing.sql_parsing_aggregator import ( | ||
ObservedQuery, | ||
SqlParsingAggregator, | ||
) | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def parse_procedure_code( | ||
*, | ||
schema_resolver: SchemaResolver, | ||
default_db: Optional[str], | ||
default_schema: Optional[str], | ||
code: str, | ||
is_temp_table: Callable[[str], bool], | ||
raise_: bool = False, | ||
) -> Optional[DataJobInputOutputClass]: | ||
aggregator = SqlParsingAggregator( | ||
platform=schema_resolver.platform, | ||
env=schema_resolver.env, | ||
schema_resolver=schema_resolver, | ||
generate_lineage=True, | ||
generate_queries=False, | ||
generate_usage_statistics=False, | ||
generate_operations=False, | ||
generate_query_subject_fields=False, | ||
generate_query_usage_statistics=False, | ||
is_temp_table=is_temp_table, | ||
) | ||
for query in split_statements(code): | ||
# TODO: We should take into account `USE x` statements. | ||
aggregator.add_observed_query( | ||
observed=ObservedQuery( | ||
default_db=default_db, | ||
default_schema=default_schema, | ||
query=query, | ||
) | ||
) | ||
if aggregator.report.num_observed_queries_failed and raise_: | ||
logger.info(aggregator.report.as_string()) | ||
raise ValueError( | ||
f"Failed to parse {aggregator.report.num_observed_queries_failed} queries." | ||
) | ||
|
||
mcps = list(aggregator.gen_metadata()) | ||
return to_datajob_input_output( | ||
mcps=mcps, | ||
ignore_extra_mcps=True, | ||
) | ||
|
||
|
||
# Is procedure handling generic enough to be added to SqlParsingAggregator? | ||
def generate_procedure_lineage( | ||
*, | ||
schema_resolver: SchemaResolver, | ||
procedure: StoredProcedure, | ||
procedure_job_urn: str, | ||
is_temp_table: Callable[[str], bool] = lambda _: False, | ||
raise_: bool = False, | ||
) -> Iterable[MetadataChangeProposalWrapper]: | ||
if procedure.code: | ||
datajob_input_output = parse_procedure_code( | ||
schema_resolver=schema_resolver, | ||
default_db=procedure.db, | ||
default_schema=procedure.schema, | ||
code=procedure.code, | ||
is_temp_table=is_temp_table, | ||
raise_=raise_, | ||
) | ||
|
||
if datajob_input_output: | ||
yield MetadataChangeProposalWrapper( | ||
entityUrn=procedure_job_urn, | ||
aspect=datajob_input_output, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unnecessarily added \n in middle of identifier names. Multi-line procedure response already contains \n character explicitly, so this additional separator is not needed.