Skip to content

Commit

Permalink
feat(tableau): add platform_instance_map, lineage_overrides in tableau source
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Aug 29, 2022
1 parent ee43262 commit 8773816
Show file tree
Hide file tree
Showing 4 changed files with 3,897 additions and 22,668 deletions.
18 changes: 14 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
)

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
PlatformKey,
Expand All @@ -36,6 +37,7 @@
from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
clean_query,
custom_sql_graphql_query,
get_field_value_in_sheet,
Expand Down Expand Up @@ -89,7 +91,7 @@
REPLACE_SLASH_CHAR = "|"


class TableauConfig(ConfigModel):
class TableauConfig(DatasetLineageProviderConfigBase):
connect_uri: str = Field(description="Tableau host URL.")
username: Optional[str] = Field(
default=None,
Expand Down Expand Up @@ -147,6 +149,11 @@ class TableauConfig(ConfigModel):
description="Environment to use in namespace when constructing URNs.",
)

lineage_overrides: Optional[TableauLineageOverrides] = Field(
default=None,
description="Mappings to change generated dataset urns. Use only if you really know what you are doing.",
)

@validator("connect_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
Expand Down Expand Up @@ -419,12 +426,15 @@ def _create_upstream_table_lineage(
f"Omitting schema for upstream table {table['id']}, schema included in table name"
)
schema = ""

table_urn = make_table_urn(
self.config.env,
upstream_db,
table.get("connectionType", ""),
schema,
table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
)

upstream_table = UpstreamClass(
Expand All @@ -447,7 +457,7 @@ def _create_upstream_table_lineage(
return upstream_tables

def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.custom_sql_ids_being_used)
count_on_query = self.config.page_size
custom_sql_filter = f"idWithin: {json.dumps(self.custom_sql_ids_being_used)}"
custom_sql_connection, total_count, has_next_page = self.get_connection_object(
custom_sql_graphql_query, "customSQLTablesConnection", custom_sql_filter
Expand Down Expand Up @@ -823,7 +833,7 @@ def emit_datasource(
)

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.datasource_ids_being_used)
count_on_query = self.config.page_size
datasource_filter = f"idWithin: {json.dumps(self.datasource_ids_being_used)}"
(
published_datasource_conn,
Expand Down
83 changes: 73 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import html
from functools import lru_cache
from typing import List
from typing import Dict, List, Optional

from pydantic.fields import Field

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
Expand All @@ -19,6 +22,13 @@
)


class TableauLineageOverrides(ConfigModel):
    """Optional remappings applied while generating upstream dataset URNs."""

    # Keys are platform names detected from the Tableau connection type;
    # values are the platform names to substitute in the emitted URNs.
    platform_override_map: Optional[Dict[str, str]] = Field(
        description="A holder for platform -> platform mappings to generate correct dataset urns",
        default=None,
    )


class MetadataQueryException(Exception):
    """Error type raised for metadata-query failures in this source."""

Expand Down Expand Up @@ -384,16 +394,13 @@ def get_tags_from_params(params: List[str] = []) -> GlobalTagsClass:
return GlobalTagsClass(tags=tags)


@lru_cache(maxsize=None)
def make_table_urn(
env: str, upstream_db: str, connection_type: str, schema: str, full_name: str
) -> str:
@lru_cache
def get_platform(connection_type: str) -> str:
# connection_type taken from
# https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_concepts_connectiontype.htm
# datahub platform mapping is found here
# https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json

final_name = full_name.replace("[", "").replace("]", "")
if connection_type in ("textscan", "textclean", "excel-direct", "excel", "csv"):
platform = "external"
elif connection_type in (
Expand All @@ -414,11 +421,23 @@ def make_table_urn(
platform = "mssql"
elif connection_type in ("athena"):
platform = "athena"
upstream_db = ""
else:
platform = connection_type
return platform


@lru_cache
def get_fully_qualified_table_name(
platform: str,
upstream_db: str,
schema: str,
full_name: str,
) -> str:
if platform == "athena":
upstream_db = ""
database_name = f"{upstream_db}." if upstream_db else ""
final_name = full_name.replace("[", "").replace("]", "")

schema_name = f"{schema}." if schema else ""

fully_qualified_table_name = f"{database_name}{schema_name}{final_name}"
Expand All @@ -430,10 +449,54 @@ def make_table_urn(
fully_qualified_table_name = (
fully_qualified_table_name.replace('\\"', "").replace('"', "").replace("\\", "")
)
# if there are more than 3 tokens, just take the final 3
fully_qualified_table_name = ".".join(fully_qualified_table_name.split(".")[-3:])

return builder.make_dataset_urn(platform, fully_qualified_table_name, env)
if platform in ("athena", "hive", "mysql"):
# it two tier database system (athena, hive, mysql), just take final 2
fully_qualified_table_name = ".".join(
fully_qualified_table_name.split(".")[-2:]
)
else:
# if there are more than 3 tokens, just take the final 3
fully_qualified_table_name = ".".join(
fully_qualified_table_name.split(".")[-3:]
)

return fully_qualified_table_name


def get_platform_instance(
    platform: str, platform_instance_map: Optional[Dict[str, str]]
) -> Optional[str]:
    """Return the configured platform instance for *platform*, if any.

    Args:
        platform: DataHub platform name to look up.
        platform_instance_map: optional platform -> instance mapping from config.

    Returns:
        The mapped instance name, or None when no map is configured or the
        platform has no entry.
    """
    if platform_instance_map is not None:
        # dict.get already yields None for a missing key; this avoids the
        # original's redundant `.keys()` membership test plus second lookup.
        return platform_instance_map.get(platform)
    return None


def make_table_urn(
    env: str,
    upstream_db: Optional[str],
    connection_type: str,
    schema: str,
    full_name: str,
    platform_instance_map: Optional[Dict[str, str]],
    lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:
    """Build a DataHub dataset URN for an upstream table seen by Tableau.

    Args:
        env: environment (fabric) component of the URN.
        upstream_db: database name reported by Tableau, if any.
        connection_type: Tableau connection type used to derive the platform.
        schema: schema component of the table name (may be empty).
        full_name: table name as reported by Tableau.
        platform_instance_map: optional platform -> platform-instance mapping.
        lineage_overrides: optional platform remapping for the emitted URN.

    Returns:
        A dataset URN string.
    """
    original_platform = platform = get_platform(connection_type)
    if lineage_overrides is not None and lineage_overrides.platform_override_map is not None:
        # Single lookup with fallback, instead of `.keys()` membership test
        # followed by a second indexing of the same dict.
        platform = lineage_overrides.platform_override_map.get(
            original_platform, platform
        )

    # Name normalization and instance lookup are keyed on the platform that
    # was actually detected, not the overridden one, so platform-specific
    # naming rules still apply when the URN platform is remapped.
    table_name = get_fully_qualified_table_name(
        original_platform, upstream_db, schema, full_name
    )
    platform_instance = get_platform_instance(original_platform, platform_instance_map)
    return builder.make_dataset_urn_with_platform_instance(
        platform, table_name, platform_instance, env
    )


def make_description_from_params(description, formula):
Expand Down
Loading

0 comments on commit 8773816

Please sign in to comment.