-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/looker): Add view file-path as option in view_naming_pattern config #8713
Changes from all commits
08e5ba3
7bb90d3
de8fddd
851a711
243cc6e
554300b
55ff427
c6bef18
134cf6d
10bea8a
ffccf9a
00269af
ba313df
c3c8cfb
c3fdab9
4a8e2dc
f10c4b6
8ef9026
c239f5d
61f7899
acd56c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -7,10 +7,26 @@ | |||||
from dataclasses import dataclass, field as dataclasses_field | ||||||
from enum import Enum | ||||||
from functools import lru_cache | ||||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple, Union | ||||||
from typing import ( | ||||||
TYPE_CHECKING, | ||||||
Dict, | ||||||
Iterable, | ||||||
List, | ||||||
Optional, | ||||||
Sequence, | ||||||
Set, | ||||||
Tuple, | ||||||
Union, | ||||||
cast, | ||||||
) | ||||||
|
||||||
from looker_sdk.error import SDKError | ||||||
from looker_sdk.sdk.api40.models import LookmlModelExploreField, User, WriteQuery | ||||||
from looker_sdk.sdk.api40.models import ( | ||||||
LookmlModelExplore, | ||||||
LookmlModelExploreField, | ||||||
User, | ||||||
WriteQuery, | ||||||
) | ||||||
from pydantic.class_validators import validator | ||||||
|
||||||
import datahub.emitter.mce_builder as builder | ||||||
|
@@ -23,6 +39,7 @@ | |||||
LookerCommonConfig, | ||||||
LookerDashboardSourceConfig, | ||||||
NamingPatternMapping, | ||||||
ViewNamingPatternMapping, | ||||||
) | ||||||
from datahub.ingestion.source.looker.looker_constant import IMPORTED_PROJECTS | ||||||
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI | ||||||
|
@@ -93,14 +110,16 @@ class LookerViewId: | |||||
project_name: str | ||||||
model_name: str | ||||||
view_name: str | ||||||
file_path: str | ||||||
|
||||||
def get_mapping(self, config: LookerCommonConfig) -> NamingPatternMapping: | ||||||
return NamingPatternMapping( | ||||||
def get_mapping(self, config: LookerCommonConfig) -> ViewNamingPatternMapping: | ||||||
return ViewNamingPatternMapping( | ||||||
platform=config.platform_name, | ||||||
env=config.env.lower(), | ||||||
project=self.project_name, | ||||||
model=self.model_name, | ||||||
name=self.view_name, | ||||||
file_path=self.file_path, | ||||||
) | ||||||
|
||||||
@validator("view_name") | ||||||
|
@@ -109,10 +128,35 @@ def remove_quotes(cls, v): | |||||
v = v.replace('"', "").replace("`", "") | ||||||
return v | ||||||
|
||||||
def preprocess_file_path(self, file_path: str) -> str: | ||||||
new_file_path: str = str(file_path) | ||||||
|
||||||
str_to_remove: List[str] = [ | ||||||
"\\.view\\.lkml$", # escape the . using \ | ||||||
] | ||||||
|
||||||
for pattern in str_to_remove: | ||||||
new_file_path = re.sub(pattern, "", new_file_path) | ||||||
|
||||||
str_to_replace: Dict[str, str] = { | ||||||
f"^imported_projects/{re.escape(self.project_name)}/": "", # escape any special regex character present in project-name | ||||||
"/": ".", # / is not urn friendly | ||||||
} | ||||||
|
||||||
for pattern in str_to_replace: | ||||||
new_file_path = re.sub(pattern, str_to_replace[pattern], new_file_path) | ||||||
|
||||||
logger.debug(f"Original file path {file_path}") | ||||||
logger.debug(f"After preprocessing file path {new_file_path}") | ||||||
|
||||||
return new_file_path | ||||||
|
||||||
def get_urn(self, config: LookerCommonConfig) -> str: | ||||||
dataset_name = config.view_naming_pattern.replace_variables( | ||||||
self.get_mapping(config) | ||||||
) | ||||||
n_mapping: ViewNamingPatternMapping = self.get_mapping(config) | ||||||
|
||||||
n_mapping.file_path = self.preprocess_file_path(n_mapping.file_path) | ||||||
|
||||||
dataset_name = config.view_naming_pattern.replace_variables(n_mapping) | ||||||
|
||||||
return builder.make_dataset_urn_with_platform_instance( | ||||||
platform=config.platform_name, | ||||||
|
@@ -135,6 +179,10 @@ class ViewFieldType(Enum): | |||||
UNKNOWN = "Unknown" | ||||||
|
||||||
|
||||||
class ViewFieldValue(Enum): | ||||||
NOT_AVAILABLE = "NotAvailable" | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class ViewField: | ||||||
name: str | ||||||
|
@@ -161,6 +209,69 @@ def create_view_project_map(view_fields: List[ViewField]) -> Dict[str, str]: | |||||
return view_project_map | ||||||
|
||||||
|
||||||
def get_view_file_path( | ||||||
lkml_fields: List[LookmlModelExploreField], view_name: str | ||||||
) -> Optional[str]: | ||||||
""" | ||||||
Search for the view file path on field, if found then return the file path | ||||||
""" | ||||||
logger.debug("Entered") | ||||||
|
||||||
for field in lkml_fields: | ||||||
if field.view == view_name: | ||||||
# This path is relative to git clone directory | ||||||
logger.debug(f"Found view({view_name}) file-path {field.source_file}") | ||||||
return field.source_file | ||||||
|
||||||
logger.debug(f"Failed to find view({view_name}) file-path") | ||||||
|
||||||
return None | ||||||
|
||||||
|
||||||
def create_upstream_views_file_path_map( | ||||||
view_names: Set[str], lkml_fields: List[LookmlModelExploreField] | ||||||
) -> Dict[str, Optional[str]]: | ||||||
""" | ||||||
Create a map of view-name v/s view file path, so that later we can fetch view's file path via view-name | ||||||
""" | ||||||
|
||||||
upstream_views_file_path: Dict[str, Optional[str]] = {} | ||||||
|
||||||
for view_name in view_names: | ||||||
file_path: Optional[str] = get_view_file_path( | ||||||
lkml_fields=lkml_fields, view_name=view_name | ||||||
) | ||||||
|
||||||
upstream_views_file_path[view_name] = file_path | ||||||
|
||||||
return upstream_views_file_path | ||||||
|
||||||
|
||||||
def explore_field_set_to_lkml_fields( | ||||||
explore: LookmlModelExplore, | ||||||
) -> List[LookmlModelExploreField]: | ||||||
""" | ||||||
explore.fields has three variables i.e. dimensions, measures, parameters of same type i.e. LookmlModelExploreField. | ||||||
This method creating a list by adding all field instance to lkml_fields | ||||||
""" | ||||||
lkml_fields: List[LookmlModelExploreField] = [] | ||||||
|
||||||
if explore.fields is None: | ||||||
logger.debug(f"Explore({explore.name}) doesn't have any field") | ||||||
return lkml_fields | ||||||
|
||||||
def empty_list( | ||||||
fields: Optional[Sequence[LookmlModelExploreField]], | ||||||
) -> List[LookmlModelExploreField]: | ||||||
return list(fields) if fields is not None else [] | ||||||
|
||||||
lkml_fields.extend(empty_list(explore.fields.dimensions)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
same for below two statements. can remove nested function empty_list. It is already ensured earlier in code flow that explore.fields is not None. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. empty_list will avoid empty list creation logic three places in the function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, okay. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this case, I prefer the also, nesting functions like this is generally an antipattern There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is avoiding duplicates of |
||||||
lkml_fields.extend(empty_list(explore.fields.measures)) | ||||||
lkml_fields.extend(empty_list(explore.fields.parameters)) | ||||||
|
||||||
return lkml_fields | ||||||
|
||||||
|
||||||
class LookerUtil: | ||||||
field_type_mapping = { | ||||||
**POSTGRES_TYPES_MAP, | ||||||
|
@@ -457,6 +568,9 @@ class LookerExplore: | |||||
upstream_views: Optional[ | ||||||
List[ProjectInclude] | ||||||
] = None # captures the view name(s) this explore is derived from | ||||||
upstream_views_file_path: Dict[str, Optional[str]] = dataclasses_field( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please go through function |
||||||
default_factory=dict | ||||||
) # view_name is key and file_path is value. A single file may contains multiple views | ||||||
joins: Optional[List[str]] = None | ||||||
fields: Optional[List[ViewField]] = None # the fields exposed in this explore | ||||||
source_file: Optional[str] = None | ||||||
|
@@ -558,6 +672,9 @@ def from_dict( | |||||
description=dict.get("description"), | ||||||
upstream_views=upstream_views, | ||||||
joins=joins, | ||||||
# This method is getting called from lookml_source's get_internal_workunits method | ||||||
# & upstream_views_file_path is not in use in that code flow | ||||||
upstream_views_file_path={}, | ||||||
) | ||||||
|
||||||
@classmethod # noqa: C901 | ||||||
|
@@ -575,6 +692,10 @@ def from_api( # noqa: C901 | |||||
explore = client.lookml_model_explore(model, explore_name) | ||||||
views: Set[str] = set() | ||||||
|
||||||
lkml_fields: List[ | ||||||
LookmlModelExploreField | ||||||
] = explore_field_set_to_lkml_fields(explore) | ||||||
|
||||||
if explore.view_name is not None and explore.view_name != explore.name: | ||||||
# explore is not named after a view and is instead using a from field, which is modeled as view_name. | ||||||
aliased_explore = True | ||||||
|
@@ -685,6 +806,15 @@ def from_api( # noqa: C901 | |||||
if view_project_map: | ||||||
logger.debug(f"views and their projects: {view_project_map}") | ||||||
|
||||||
upstream_views_file_path: Dict[ | ||||||
str, Optional[str] | ||||||
] = create_upstream_views_file_path_map( | ||||||
lkml_fields=lkml_fields, | ||||||
view_names=views, | ||||||
) | ||||||
if upstream_views_file_path: | ||||||
logger.debug(f"views and their file-paths: {upstream_views_file_path}") | ||||||
|
||||||
return cls( | ||||||
name=explore_name, | ||||||
model_name=model, | ||||||
|
@@ -699,6 +829,7 @@ def from_api( # noqa: C901 | |||||
) | ||||||
for view_name in views | ||||||
), | ||||||
upstream_views_file_path=upstream_views_file_path, | ||||||
source_file=explore.source_file, | ||||||
) | ||||||
except SDKError as e: | ||||||
|
@@ -791,12 +922,20 @@ def _to_metadata_events( # noqa: C901 | |||||
upstreams = [] | ||||||
observed_lineage_ts = datetime.datetime.now(tz=datetime.timezone.utc) | ||||||
for view_ref in sorted(self.upstream_views): | ||||||
# set file_path to ViewFieldType.UNKNOWN if file_path is not available to keep backward compatibility | ||||||
# if we raise error on file_path equal to None then existing test-cases will fail as mock data doesn't have required attributes. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why can't we update the test mock data? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure whether mock data is written as per production scenario. The fields are marked as Optional in looker sdk, so to avoid any surprises in production I preferred to keep existing test as is |
||||||
file_path: str = ( | ||||||
cast(str, self.upstream_views_file_path[view_ref.include]) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this cast There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because self.upstream_views_file_path[view_ref.include] is optional, None check is there but still lint was raising error |
||||||
if self.upstream_views_file_path[view_ref.include] is not None | ||||||
else ViewFieldValue.NOT_AVAILABLE.value | ||||||
) | ||||||
view_urn = LookerViewId( | ||||||
project_name=view_ref.project | ||||||
if view_ref.project != _BASE_PROJECT_NAME | ||||||
else self.project_name, | ||||||
model_name=self.model_name, | ||||||
view_name=view_ref.include, | ||||||
file_path=file_path, | ||||||
).get_urn(config) | ||||||
|
||||||
upstreams.append( | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't do this now, but I think it will make sense to more cleanly separate the looker and lookml code paths in the future e.g. this stuff really should live in looker_source.py, not looker common
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. I think it is happening because from_api is implemented in common