Skip to content

Commit

Permalink
feat(transformers): Add domain transformer for dataset (#5456)
Browse files Browse the repository at this point in the history
Co-authored-by: MohdSiddique Bagwan <[email protected]>
  • Loading branch information
siddiquebagwan and siddiquebagwan-gslab authored Aug 4, 2022
1 parent 858b132 commit c619ba9
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 7 deletions.
20 changes: 19 additions & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import re
from abc import ABC, abstractmethod
from enum import Enum
from typing import IO, Any, Dict, List, Optional, Pattern, cast

from pydantic import BaseModel, Extra
from pydantic import BaseModel, Extra, validator
from pydantic.fields import Field


Expand All @@ -11,6 +12,23 @@ class Config:
extra = Extra.forbid


class TransformerSemantics(Enum):
"""Describes semantics for aspect changes"""

OVERWRITE = "OVERWRITE" # Apply changes blindly
PATCH = "PATCH" # Only apply differences from what exists already on the server


class TransformerSemanticsConfigModel(ConfigModel):
semantics: TransformerSemantics = TransformerSemantics.OVERWRITE

@validator("semantics", pre=True)
def ensure_semantics_is_upper_case(cls, v: str) -> str:
if isinstance(v, str):
return v.upper()
return v


class DynamicTypedConfig(ConfigModel):
type: str = Field(
description="The type of the dynamic object",
Expand Down
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import (
DatasetUsageStatisticsClass,
DomainPropertiesClass,
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
Expand Down Expand Up @@ -183,6 +184,13 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
aspect_type=OwnershipClass,
)

def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="domainProperties",
aspect_type=DomainPropertiesClass,
)

def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
DatasetPropertiesClass,
DatasetSnapshotClass,
DatasetUpstreamLineageClass,
DomainsClass,
EditableDatasetPropertiesClass,
EditableSchemaMetadataClass,
GlobalTagsClass,
Expand All @@ -41,6 +42,7 @@ class SnapshotAspectRegistry:
def __init__(self):
self.aspect_name_type_mapping = {
"ownership": OwnershipClass,
"domains": DomainsClass,
"globalTags": GlobalTagsClass,
"datasetProperties": DatasetPropertiesClass,
"editableDatasetProperties": EditableDatasetPropertiesClass,
Expand Down
169 changes: 169 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from typing import Callable, List, Optional, Union, cast

from datahub.configuration.common import (
ConfigurationError,
KeyValuePattern,
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.configuration.import_resolver import pydantic_resolve_key
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
from datahub.metadata.schema_classes import DomainsClass
from datahub.utilities.registries.domain_registry import DomainRegistry


class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
get_domains_to_add: Union[
Callable[[str], DomainsClass],
Callable[[str], DomainsClass],
]

_resolve_domain_fn = pydantic_resolve_key("get_domains_to_add")


class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
domain_urns: List[str]


class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel):
domain_pattern: KeyValuePattern = KeyValuePattern.all()


class AddDatasetDomain(DatasetDomainTransformer):
"""Transformer that adds domains to datasets according to a callback function."""

ctx: PipelineContext
config: AddDatasetDomainSemanticsConfig

def __init__(self, config: AddDatasetDomainSemanticsConfig, ctx: PipelineContext):
super().__init__()
self.ctx = ctx
self.config = config

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDomain":
config = AddDatasetDomainSemanticsConfig.parse_obj(config_dict)
return cls(config, ctx)

@staticmethod
def get_domain_class(
graph: Optional[DataHubGraph], domains: List[str]
) -> DomainsClass:
domain_registry: DomainRegistry = DomainRegistry(
cached_domains=[k for k in domains], graph=graph
)
domain_class = DomainsClass(
domains=[domain_registry.get_domain_urn(domain) for domain in domains]
)
return domain_class

@staticmethod
def get_domains_to_set(
graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass]
) -> Optional[DomainsClass]:
if not mce_domain or not mce_domain.domains:
# nothing to add, no need to consult server
return None

server_domain = graph.get_domain(entity_urn=urn)
if server_domain:
# compute patch
# we only include domain who are not present in the server domain list
domains_to_add: List[str] = []
for domain in mce_domain.domains:
if domain not in server_domain.domains:
domains_to_add.append(domain)

mce_domain.domains.extend(server_domain.domains)
mce_domain.domains.extend(domains_to_add)

return mce_domain

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:

domain_aspect: DomainsClass = DomainsClass(domains=[])
# Check if we have received existing aspect
if aspect is not None:
domain_aspect.domains.extend(cast(DomainsClass, aspect).domains)

domain_to_add = self.config.get_domains_to_add(entity_urn)

domain_aspect.domains.extend(domain_to_add.domains)

if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain.get_domains_to_set(
self.ctx.graph, entity_urn, domain_aspect
)
# This will pass the mypy lint
domain_aspect = (
patch_domain_aspect
if patch_domain_aspect is not None
else domain_aspect
)

return cast(Optional[Aspect], domain_aspect)


class SimpleAddDatasetDomain(AddDatasetDomain):
"""Transformer that adds a specified set of domains to each dataset."""

def __init__(
self, config: SimpleDatasetDomainSemanticsConfig, ctx: PipelineContext
):
if ctx.graph is None:
raise ConfigurationError(
"AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe"
)

domains = AddDatasetDomain.get_domain_class(ctx.graph, config.domain_urns)
generic_config = AddDatasetDomainSemanticsConfig(
get_domains_to_add=lambda _: domains,
semantics=config.semantics,
)
super().__init__(generic_config, ctx)

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "SimpleAddDatasetDomain":
config = SimpleDatasetDomainSemanticsConfig.parse_obj(config_dict)
return cls(config, ctx)


class PatternAddDatasetDomain(AddDatasetDomain):
"""Transformer that adds a specified set of domains to each dataset."""

def __init__(
self, config: PatternDatasetDomainSemanticsConfig, ctx: PipelineContext
):
if ctx.graph is None:
raise ConfigurationError(
"AddDatasetDomain requires a datahub_api to connect to. Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe"
)

domain_pattern = config.domain_pattern

def resolve_domain(domain_urn: str) -> DomainsClass:
domains = domain_pattern.value(domain_urn)
return self.get_domain_class(ctx.graph, domains)

generic_config = AddDatasetDomainSemanticsConfig(
get_domains_to_add=resolve_domain,
semantics=config.semantics,
)
super().__init__(generic_config, ctx)

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "PatternAddDatasetDomain":
config = PatternDatasetDomainSemanticsConfig.parse_obj(config_dict)
return cls(config, ctx)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from abc import abstractmethod
from abc import ABCMeta, abstractmethod
from typing import List, Optional

from deprecated import deprecated
Expand Down Expand Up @@ -59,11 +59,27 @@ def transform_aspect( # not marked as @abstractmethod to avoid impacting transf
)


# TODO: rename DatasetTransformerV2 to DatasetTransformer after upgrading all existing dataset transformer
class DatasetTransformerV2(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transforms sequentially on each dataset."""

def __init__(self):
super().__init__()

def entity_types(self) -> List[str]:
return ["dataset"]


class DatasetOwnershipTransformer(DatasetTransformer, SingleAspectTransformer):
def aspect_name(self) -> str:
return "ownership"


class DatasetDomainTransformer(DatasetTransformerV2, SingleAspectTransformer):
def aspect_name(self) -> str:
return "domains"


class DatasetStatusTransformer(DatasetTransformer, SingleAspectTransformer):
def aspect_name(self) -> str:
return "status"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.transformer import dataset_domain
from datahub.ingestion.transformer.add_dataset_browse_path import (
AddDatasetBrowsePathTransformer,
)
Expand Down Expand Up @@ -45,6 +46,15 @@
transform_registry.register("simple_add_dataset_ownership", SimpleAddDatasetOwnership)
transform_registry.register("pattern_add_dataset_ownership", PatternAddDatasetOwnership)

transform_registry.register("add_dataset_domain", dataset_domain.AddDatasetDomain)
transform_registry.register(
"simple_add_dataset_domain", dataset_domain.SimpleAddDatasetDomain
)
transform_registry.register(
"pattern_add_dataset_domain", dataset_domain.PatternAddDatasetDomain
)


transform_registry.register("add_dataset_tags", AddDatasetTags)
transform_registry.register("simple_add_dataset_tags", SimpleAddDatasetTags)
transform_registry.register("pattern_add_dataset_tags", PatternAddDatasetTags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ def __init__(
assert graph
# first try to check if this domain exists by urn
maybe_domain_urn = f"urn:li:domain:{domain_identifier}"
from datahub.metadata.schema_classes import DomainPropertiesClass

maybe_domain_properties = graph.get_aspect_v2(
maybe_domain_urn, DomainPropertiesClass, "domainProperties"
)
maybe_domain_properties = graph.get_domain_properties(maybe_domain_urn)
if maybe_domain_properties:
self.domain_registry[domain_identifier] = maybe_domain_urn
else:
Expand Down

0 comments on commit c619ba9

Please sign in to comment.