Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/looker): ingest explore tags into the DataHub #10547

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
TagPropertiesClass,
TagSnapshotClass,
)
from datahub.metadata.urns import TagUrn
from datahub.utilities.lossy_collections import LossyList, LossySet
from datahub.utilities.url_util import remove_port_from_url

Expand Down Expand Up @@ -669,6 +670,7 @@ class LookerExplore:
joins: Optional[List[str]] = None
fields: Optional[List[ViewField]] = None # the fields exposed in this explore
source_file: Optional[str] = None
tags: List[str] = dataclasses_field(default_factory=list)

@validator("name")
def remove_quotes(cls, v):
Expand Down Expand Up @@ -770,6 +772,7 @@ def from_dict(
# This method is called from lookml_source's get_internal_workunits method,
# and upstream_views_file_path is not used in that code flow.
upstream_views_file_path={},
tags=cast(List, dict.get("tags")) if dict.get("tags") is not None else [],
)

@classmethod # noqa: C901
Expand All @@ -786,7 +789,6 @@ def from_api( # noqa: C901
try:
explore = client.lookml_model_explore(model, explore_name)
views: Set[str] = set()

lkml_fields: List[
LookmlModelExploreField
] = explore_field_set_to_lkml_fields(explore)
Expand Down Expand Up @@ -956,6 +958,7 @@ def from_api( # noqa: C901
),
upstream_views_file_path=upstream_views_file_path,
source_file=explore.source_file,
tags=list(explore.tags) if explore.tags is not None else [],
)
except SDKError as e:
if "<title>Looker Not Found (404)</title>" in str(e):
Expand Down Expand Up @@ -1133,6 +1136,20 @@ def _to_metadata_events( # noqa: C901
mcp,
]

# Add tags
explore_tag_urns: List[TagAssociationClass] = []
for tag in self.tags:
tag_urn = TagUrn(tag)
explore_tag_urns.append(TagAssociationClass(tag_urn.urn()))
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=tag_urn.urn(),
aspect=tag_urn.to_key_aspect(),
)
)
if explore_tag_urns:
dataset_snapshot.aspects.append(GlobalTagsClass(explore_tag_urns))

# If extracting embeds is enabled, produce an MCP for embed URL.
if extract_embed_urls:
embed_mcp = create_embed_mcp(
Expand Down
66 changes: 64 additions & 2 deletions metadata-ingestion/tests/integration/looker/test_looker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, Optional, Union, cast
from unittest import mock

import pytest
Expand All @@ -24,9 +24,12 @@
WriteQuery,
)

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.run.pipeline import Pipeline, PipelineInitError
from datahub.ingestion.source.looker import looker_common, looker_usage
from datahub.ingestion.source.looker.looker_common import LookerExplore
from datahub.ingestion.source.looker.looker_config import LookerCommonConfig
from datahub.ingestion.source.looker.looker_lib_wrapper import (
LookerAPI,
LookerAPIConfig,
Expand All @@ -37,6 +40,8 @@
UserViewField,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import GlobalTagsClass, MetadataChangeEventClass
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
Expand Down Expand Up @@ -481,7 +486,9 @@ def setup_mock_explore_unaliased_with_joins(mocked_client):


def setup_mock_explore(
mocked_client: Any, additional_lkml_fields: List[LookmlModelExploreField] = []
mocked_client: Any,
additional_lkml_fields: List[LookmlModelExploreField] = [],
**additional_explore_fields: Any,
) -> None:
mock_model = mock.MagicMock(project_name="lkml_samples")
mocked_client.lookml_model.return_value = mock_model
Expand All @@ -508,6 +515,7 @@ def setup_mock_explore(
dimensions=lkml_fields,
),
source_file="test_source_file.lkml",
**additional_explore_fields,
)


Expand Down Expand Up @@ -1058,3 +1066,57 @@ def test_upstream_cll(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
assert (
looker_explore.fields[2].upstream_fields[0] == "dataset_lineages.createdon"
)


@freeze_time(FROZEN_TIME)
def test_explore_tags(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    """Check that tags on a Looker explore end up in the emitted GlobalTags aspect.

    An explore mocked with two tags is loaded through ``LookerExplore.from_api``.
    The test asserts that the tags are kept on the resulting explore and that
    ``_to_metadata_events`` produces a ``GlobalTagsClass`` aspect whose tag URNs
    match the expected ones, in the same order.
    """
    explore_tags: List[str] = ["metrics", "all"]
    expected_tag_urns: List[str] = ["urn:li:tag:metrics", "urn:li:tag:all"]
    mocked_client = mock.MagicMock()

    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph
        mock_sdk.return_value = mocked_client

        # Extra keyword arguments are forwarded to the mocked explore.
        setup_mock_explore(mocked_client, tags=explore_tags)

        looker_explore: Optional[LookerExplore] = looker_common.LookerExplore.from_api(
            model="fake",
            explore_name="my_explore_name",
            client=mocked_client,
            reporter=mock.MagicMock(),
            source_config=mock.MagicMock(),
        )

        assert looker_explore is not None
        assert looker_explore.name == "my_explore_name"
        assert looker_explore.tags == explore_tags

        emitted: Optional[
            List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]
        ] = looker_explore._to_metadata_events(
            config=LookerCommonConfig(),
            reporter=SourceReport(),
            base_url="fake",
            extract_embed_urls=False,
        )

        actual_tag_urns: List[str] = []
        for event in emitted or []:
            # Only snapshot-style events are inspected for a GlobalTags aspect.
            if not isinstance(event, MetadataChangeEventClass):
                continue
            for aspect in event.proposedSnapshot.aspects:
                if isinstance(aspect, GlobalTagsClass):
                    # If several GlobalTags aspects appear, the last one wins.
                    actual_tag_urns = [
                        association.tag for association in aspect.tags
                    ]

        assert expected_tag_urns == actual_tag_urns
Loading