Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/sqlglot): add optional default_dialect parameter to sqlglot lineage #10830

Merged
merged 4 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,7 @@ def parse_sql_lineage(
env: str = DEFAULT_ENV,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
default_dialect: Optional[str] = None,
) -> "SqlParsingResult":
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage

Expand All @@ -1254,6 +1255,7 @@ def parse_sql_lineage(
schema_resolver=schema_resolver,
default_db=default_db,
default_schema=default_schema,
default_dialect=default_dialect,
)

def create_tag(self, tag_name: str) -> str:
Expand Down
16 changes: 13 additions & 3 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,8 +843,14 @@ def _sqlglot_lineage_inner(
schema_resolver: SchemaResolverInterface,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
default_dialect: Optional[str] = None,
) -> SqlParsingResult:
dialect = get_dialect(schema_resolver.platform)

if not default_dialect:
dialect = get_dialect(schema_resolver.platform)
else:
dialect = get_dialect(default_dialect)

if is_dialect_instance(dialect, "snowflake"):
# in snowflake, table identifiers must be uppercased to match sqlglot's behavior.
if default_db:
Expand Down Expand Up @@ -1003,6 +1009,7 @@ def sqlglot_lineage(
schema_resolver: SchemaResolverInterface,
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
default_dialect: Optional[str] = None,
) -> SqlParsingResult:
"""Parse a SQL statement and generate lineage information.

Expand All @@ -1020,8 +1027,9 @@ def sqlglot_lineage(
can be brittle with respect to missing schema information and complex
SQL logic like UNNESTs.

The SQL dialect is inferred from the schema_resolver's platform. The
set of supported dialects is the same as sqlglot's. See their
The SQL dialect can be given as an argument called default_dialect or it can
be inferred from the schema_resolver's platform.
The set of supported dialects is the same as sqlglot's. See their
`documentation <https://sqlglot.com/sqlglot/dialects/dialect.html#Dialects>`_
for the full list.

Expand All @@ -1035,6 +1043,7 @@ def sqlglot_lineage(
schema_resolver: The schema resolver to use for resolving table schemas.
default_db: The default database to use for unqualified table names.
default_schema: The default schema to use for unqualified table names.
default_dialect: A default dialect to override the dialect provided by 'schema_resolver'.

Returns:
A SqlParsingResult object containing the parsed lineage information.
Expand All @@ -1059,6 +1068,7 @@ def sqlglot_lineage(
schema_resolver=schema_resolver,
default_db=default_db,
default_schema=default_schema,
default_dialect=default_dialect,
)
except Exception as e:
return SqlParsingResult.make_from_error(e)
Expand Down
Loading