Skip to content

Commit

Permalink
fix(ingestion/lookml): emit dummy sql condition for lookml custom condition tag (#11008)
Browse files Browse the repository at this point in the history

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
sid-acryl and hsheth2 authored Jul 31, 2024
1 parent e14dc91 commit 0667470
Show file tree
Hide file tree
Showing 17 changed files with 805 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import lru_cache
from typing import ClassVar, Optional, TextIO, cast
from typing import ClassVar, Optional, TextIO

from liquid import Environment
from liquid.ast import Node
Expand All @@ -25,18 +25,9 @@ def __init__(self, tok: Token, sql_or_lookml_reference: str, filter_name: str):
self.filter_name = filter_name

def render_to_output(self, context: Context, buffer: TextIO) -> Optional[bool]:
    """Render the Looker ``condition`` tag as a placeholder SQL predicate.

    A fixed dummy value is emitted instead of the real filter value so that
    the surrounding LookML SQL remains parseable even when no filter value
    is provided.
    """
    # Emit "<reference>='dummy_value'" so the SQL parser sees a valid
    # equality condition; the actual value is irrelevant for parsing.
    placeholder = f"{self.sql_or_lookml_reference}='dummy_value'"
    buffer.write(placeholder)
    return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CustomTagException,
create_template,
)
from datahub.ingestion.source.looker.lookml_config import DERIVED_VIEW_PATTERN
from datahub.ingestion.source.looker.str_functions import (
remove_extra_spaces_and_newlines,
)
Expand Down Expand Up @@ -94,6 +95,24 @@ def resolve_liquid_variable(text: str, liquid_variable: Dict[Any, Any]) -> str:
return text


def _complete_incomplete_sql(raw_view: dict, sql: str) -> str:
    """Turn a Looker SQL fragment into a complete, parseable statement.

    Looker supports sql fragments that omit the SELECT and FROM parts of the
    query; add those in if we detect that they are missing, then strip the
    ``${`` / ``}`` derived-view reference wrappers.

    :param raw_view: the raw LookML view dict; ``raw_view["name"]`` is used
        as the FROM target when the fragment has no FROM clause.
    :param sql: the (possibly partial) SQL text.
    :return: a complete SQL statement with derived-view markers removed.
    """
    sql_query: str = sql

    if not re.search(r"SELECT\s", sql_query, flags=re.I):
        # add a SELECT clause at the beginning.
        # Fix: interpolate the running sql_query variable (not the original
        # `sql` parameter) so the two branches stay consistent and order-safe.
        sql_query = f"SELECT {sql_query}"

    if not re.search(r"FROM\s", sql_query, flags=re.I):
        # add a FROM clause at the end; the view's own name is the table
        sql_query = f"{sql_query} FROM {raw_view['name']}"

    # Drop ${ and } around derived-view references
    return re.sub(DERIVED_VIEW_PATTERN, r"\1", sql_query)

def resolve_liquid_variable_in_view_dict(
raw_view: dict, liquid_variable: Dict[Any, Any]
) -> None:
Expand All @@ -102,14 +121,18 @@ def resolve_liquid_variable_in_view_dict(

for view in raw_view["views"]:
if "sql_table_name" in view:
view["sql_table_name"] = resolve_liquid_variable(
view["datahub_transformed_sql_table_name"] = resolve_liquid_variable(
text=remove_extra_spaces_and_newlines(view["sql_table_name"]),
liquid_variable=liquid_variable,
)
) # keeping original sql_table_name as is to avoid any visualization issue later

if "derived_table" in view and "sql" in view["derived_table"]:
# In sql we don't need to remove the extra spaces as sql parser takes care of extra spaces and \n
# while generating URN from sql
view["derived_table"]["sql"] = resolve_liquid_variable(
view["derived_table"]["datahub_transformed_sql"] = resolve_liquid_variable(
text=view["derived_table"]["sql"], liquid_variable=liquid_variable
) # keeping original sql as is, so that on UI sql will be shown same is it is visible on looker portal

view["derived_table"]["datahub_transformed_sql"] = _complete_incomplete_sql(
raw_view=view, sql=view["derived_table"]["datahub_transformed_sql"]
)
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,25 @@ def sql_table_name(self) -> str:
sql_table_name: Optional[str] = self._get_sql_table_name_field()
# if sql_table_name field is not set then the table name is equal to view-name
if sql_table_name is None:
return self.raw_view[NAME].lower()
sql_table_name = self.raw_view[NAME].lower()

return sql_table_name

def datahub_transformed_sql_table_name(self) -> str:
    """Return the liquid-resolved table name, normalized for URN generation."""
    resolved_name: Optional[str] = self.raw_view.get(
        "datahub_transformed_sql_table_name"
    )

    if not resolved_name:
        # No transformed name was recorded; fall back to the raw value.
        resolved_name = self.sql_table_name()

    # sql_table_name may be in the format "${view-name}.SQL_TABLE_NAME";
    # strip the extra wrapper characters.
    if self._is_dot_sql_table_name_present():
        resolved_name = re.sub(DERIVED_VIEW_PATTERN, r"\1", resolved_name)

    # Some sql_table_name fields contain quotes like: optimizely."group";
    # remove the quotes and lowercase for a canonical name.
    return resolved_name.replace('"', "").replace("`", "").lower()

def derived_table(self) -> Dict[Any, Any]:
"""
Expand All @@ -296,30 +306,21 @@ def explore_source(self) -> Dict[Any, Any]:

return derived_table["explore_source"]

def sql(self) -> str:
    """Return the derived table's SQL exactly as written in the LookML.

    This function should only be called if is_sql_based_derived_case
    returns true.
    """
    return self.derived_table()["sql"]
def datahub_transformed_sql(self) -> str:
    """Return the liquid-resolved (transformed) derived-table SQL.

    This function should only be called if is_sql_based_derived_case
    returns true.
    """
    derived = self.derived_table()
    return derived["datahub_transformed_sql"]

def name(self) -> str:
return self.raw_view[NAME]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from datahub.configuration.git import GitInfo
from datahub.configuration.source_common import EnvConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.looker.looker_config import LookerCommonConfig
from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_config import (
LookerCommonConfig,
LookerConnectionDefinition,
)
from datahub.ingestion.source.looker.looker_lib_wrapper import (
LookerAPI,
LookerAPIConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ def from_looker_dict(
view_logic = view_context.view_file.raw_file_content[:max_file_snippet_length]

if view_context.is_sql_based_derived_case():
view_logic = view_context.sql(transformed=False)
# Parse SQL to extract dependencies.
view_logic = view_context.sql()
view_details = ViewProperties(
materialized=False,
viewLogic=view_logic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,21 @@ class AbstractViewUpstream(ABC):
view_context: LookerViewContext
looker_view_id_cache: LookerViewIdCache
config: LookMLSourceConfig
reporter: LookMLSourceReport
ctx: PipelineContext

def __init__(
    self,
    view_context: LookerViewContext,
    looker_view_id_cache: LookerViewIdCache,
    config: LookMLSourceConfig,
    reporter: LookMLSourceReport,
    ctx: PipelineContext,
):
    """Store the collaborators shared by all upstream-resolution strategies."""
    self.ctx = ctx
    self.config = config
    self.reporter = reporter
    self.view_context = view_context
    self.looker_view_id_cache = looker_view_id_cache

@abstractmethod
Expand All @@ -244,9 +247,10 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
# These are the function where we need to catch the response once calculated
self._get_spr = lru_cache(maxsize=1)(self.__get_spr)
self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
Expand All @@ -259,25 +263,14 @@ def __get_spr(self) -> Optional[SqlParsingResult]:
return None

spr = create_lineage_sql_parsed_result(
query=self.view_context.sql(),
query=self.view_context.datahub_transformed_sql(),
default_schema=self.view_context.view_connection.default_schema,
default_db=self.view_context.view_connection.default_db,
platform=self.view_context.view_connection.platform,
platform_instance=self.view_context.view_connection.platform_instance,
env=self.view_context.view_connection.platform_env or self.config.env,
graph=self.ctx.graph,
)

if (
spr.debug_info.table_error is not None
or spr.debug_info.column_error is not None
):
logging.debug(
f"Failed to parsed the sql query. table_error={spr.debug_info.table_error} and "
f"column_error={spr.debug_info.column_error}"
)
return None

return spr

def __get_upstream_dataset_urn(self) -> List[Urn]:
Expand All @@ -286,6 +279,15 @@ def __get_upstream_dataset_urn(self) -> List[Urn]:
if sql_parsing_result is None:
return []

if sql_parsing_result.debug_info.table_error is not None:
self.reporter.report_warning(
title="Table Level Lineage Missing",
message="Error in parsing derived sql",
context=f"View-name: {self.view_context.name()}",
exc=sql_parsing_result.debug_info.table_error,
)
return []

upstream_dataset_urns: List[str] = [
_drop_hive_dot(urn) for urn in sql_parsing_result.in_tables
]
Expand All @@ -306,6 +308,15 @@ def create_fields(self) -> List[ViewField]:
if spr is None:
return []

if spr.debug_info.column_error is not None:
self.reporter.report_warning(
title="Column Level Lineage Missing",
message="Error in parsing derived sql for CLL",
context=f"View-name: {self.view_context.name()}",
exc=spr.debug_info.column_error,
)
return []

fields: List[ViewField] = []

column_lineages: List[ColumnLineageInfo] = (
Expand Down Expand Up @@ -336,6 +347,15 @@ def get_upstream_column_ref(
if sql_parsing_result is None:
return []

if sql_parsing_result.debug_info.column_error is not None:
self.reporter.report_warning(
title="Column Level Lineage Missing",
message="Error in parsing derived sql for CLL",
context=f"View-name: {self.view_context.name()}. "
f"Error: {sql_parsing_result.debug_info.column_error}",
)
return []

upstreams_column_refs: List[ColumnRef] = []
if sql_parsing_result.column_lineage:
for cll in sql_parsing_result.column_lineage:
Expand Down Expand Up @@ -384,9 +404,11 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)

self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
self.__get_upstream_dataset_urn
)
Expand All @@ -402,7 +424,7 @@ def __get_upstream_dataset_urn(self) -> List[str]:
base_folder_path=self.view_context.base_folder_path,
)

# Current view will always be present in cache. The assert will silence the lint
# Current view will always be present in cache. assert will silence the lint
assert current_view_id

# We're creating a "LookerExplore" just to use the urn generator.
Expand Down Expand Up @@ -467,9 +489,10 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
self.upstream_dataset_urn = None

self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
Expand All @@ -478,9 +501,9 @@ def __init__(

def __get_upstream_dataset_urn(self) -> Urn:
# In regular case view's upstream dataset is either same as view-name or mentioned in "sql_table_name" field
# view_context.sql_table_name() handle this condition to return dataset name
# view_context.datahub_transformed_sql_table_name() handle this condition to return dataset name
qualified_table_name: str = _generate_fully_qualified_name(
sql_table_name=self.view_context.sql_table_name(),
sql_table_name=self.view_context.datahub_transformed_sql_table_name(),
connection_def=self.view_context.view_connection,
reporter=self.view_context.reporter,
)
Expand Down Expand Up @@ -522,20 +545,21 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
self.upstream_dataset_urn = []

self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
self.__get_upstream_dataset_urn
)

def __get_upstream_dataset_urn(self) -> List[Urn]:
# In this case view_context.sql_table_name() refers to derived view name
# In this case view_context.datahub_transformed_sql_table_name() refers to derived view name
looker_view_id = get_derived_looker_view_id(
qualified_table_name=_generate_fully_qualified_name(
self.view_context.sql_table_name(),
self.view_context.datahub_transformed_sql_table_name(),
self.view_context.view_connection,
self.view_context.reporter,
),
Expand Down Expand Up @@ -591,6 +615,7 @@ def create_view_upstream(
return RegularViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -599,6 +624,7 @@ def create_view_upstream(
return DotSqlTableNameViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -610,6 +636,7 @@ def create_view_upstream(
return SqlBasedDerivedViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -618,6 +645,7 @@ def create_view_upstream(
return NativeDerivedViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -631,6 +659,7 @@ def create_view_upstream(
return EmptyImplementation(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Loading

0 comments on commit 0667470

Please sign in to comment.