Skip to content

Commit

Permalink
refactor(ingest/biz-glossary): simplify business glossary source (#7912)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored May 4, 2023
1 parent a9e0038 commit ca5dffa
Show file tree
Hide file tree
Showing 8 changed files with 689 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ nodes:
source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
values:
- House.Colors.Red
- House.Colors.Pink
- Housing.Colors.Red
- Housing.Colors.Pink

- name: Kitchen
description: a room or area where food is prepared and cooked.
Expand All @@ -113,7 +113,7 @@ nodes:
source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
related_terms:
- House.Kitchen
- Housing.Kitchen
knowledge_links:
- url: "https://en.wikipedia.org/wiki/Spoon"
label: Wiki link
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from hashlib import md5
from typing import Any, List, Optional, Set, Tuple

import confluent_kafka
import jsonref
from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema,
Schema,
SchemaReference,
SchemaRegistryClient,
)

from datahub.ingestion.extractor import protobuf_util, schema_util
Expand Down Expand Up @@ -45,14 +45,11 @@ def __init__(
) -> None:
self.source_config: KafkaSourceConfig = source_config
self.report: KafkaSourceReport = report
# Use the fully qualified name for SchemaRegistryClient to make it mock patchable for testing.
self.schema_registry_client = (
confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient(
{
"url": source_config.connection.schema_registry_url,
**source_config.connection.schema_registry_config,
}
)
self.schema_registry_client = SchemaRegistryClient(
{
"url": source_config.connection.schema_registry_url,
**source_config.connection.schema_registry_config,
}
)
self.known_schema_registry_subjects: List[str] = []
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,9 @@ def __init__(self, looker_api: LookerAPI):
self.looker_api_wrapper = looker_api

def get_by_id(self, id_: str) -> Optional[LookerUser]:
if not id_:
return None

logger.debug(f"Will get user {id_}")

raw_user: Optional[User] = self.looker_api_wrapper.get_user(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pathlib
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, TypeVar, Union

from pydantic import validator
from pydantic.fields import Field
Expand All @@ -13,25 +13,28 @@
from datahub.emitter.mce_builder import datahub_guid, make_group_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import auto_workunit_reporter
from datahub.utilities.source_helpers import (
auto_status_aspect,
auto_workunit,
auto_workunit_reporter,
)
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)

valid_status: models.StatusClass = models.StatusClass(removed=False)

# This needed to map path presents in inherits, contains, values, and related_terms to terms' optional id
path_vs_id: Dict[str, Optional[str]] = {}
GlossaryNodeInterface = TypeVar(
"GlossaryNodeInterface", "GlossaryNodeConfig", "BusinessGlossaryConfig"
)


class Owners(ConfigModel):
Expand Down Expand Up @@ -60,43 +63,46 @@ class GlossaryTermConfig(ConfigModel):
knowledge_links: Optional[List[KnowledgeCard]]
domain: Optional[str]

# Private fields.
_urn: str


class GlossaryNodeConfig(ConfigModel):
id: Optional[str]
name: str
description: str
owners: Optional[Owners]
terms: Optional[List[GlossaryTermConfig]]
terms: Optional[List["GlossaryTermConfig"]]
nodes: Optional[List["GlossaryNodeConfig"]]
knowledge_links: Optional[List[KnowledgeCard]]


GlossaryNodeConfig.update_forward_refs()
# Private fields.
_urn: str


class DefaultConfig(ConfigModel):
"""Holds defaults for populating fields in glossary terms"""

source: str
source: Optional[str]
owners: Owners
url: Optional[str] = None
source_type: Optional[str] = "INTERNAL"
source_type: str = "INTERNAL"


class BusinessGlossarySourceConfig(ConfigModel):
file: Union[str, pathlib.Path] = Field(
description="File path or URL to business glossary file to ingest."
)
enable_auto_id: bool = Field(
description="Generate id field from GlossaryNode and GlossaryTerm's name field",
description="Generate guid urns instead of a plaintext path urn with the node/term's hierarchy.",
default=False,
)


class BusinessGlossaryConfig(DefaultConfig):
version: str
nodes: Optional[List[GlossaryNodeConfig]]
terms: Optional[List[GlossaryTermConfig]]
terms: Optional[List["GlossaryTermConfig"]]
nodes: Optional[List["GlossaryNodeConfig"]]

@validator("version")
def version_must_be_1(cls, v):
Expand Down Expand Up @@ -166,17 +172,17 @@ def get_owners(owners: Owners) -> models.OwnershipClass:

def get_mces(
glossary: BusinessGlossaryConfig,
path_vs_id: Dict[str, str],
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
path: List[str] = []
root_owners = get_owners(glossary.owners)

if glossary.nodes:
for node in glossary.nodes:
yield from get_mces_from_node(
node,
path + [node.name],
path_vs_id=path_vs_id,
parentNode=None,
parentOwners=root_owners,
defaults=glossary,
Expand All @@ -188,7 +194,7 @@ def get_mces(
for term in glossary.terms:
yield from get_mces_from_term(
term,
path + [term.name],
path_vs_id=path_vs_id,
parentNode=None,
parentOwnership=root_owners,
defaults=glossary,
Expand Down Expand Up @@ -237,16 +243,15 @@ def make_domain_mcp(

def get_mces_from_node(
glossaryNode: GlossaryNodeConfig,
path: List[str],
path_vs_id: Dict[str, str],
parentNode: Optional[str],
parentOwners: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
node_urn = make_glossary_node_urn(
path, glossaryNode.id, ingestion_config.enable_auto_id
)
node_urn = glossaryNode._urn

node_info = models.GlossaryNodeInfoClass(
definition=glossaryNode.description,
parentNode=parentNode,
Expand All @@ -259,7 +264,7 @@ def get_mces_from_node(

node_snapshot = models.GlossaryNodeSnapshotClass(
urn=node_urn,
aspects=[node_info, node_owners, valid_status],
aspects=[node_info, node_owners],
)
yield get_mce_from_snapshot(node_snapshot)

Expand All @@ -274,7 +279,7 @@ def get_mces_from_node(
for node in glossaryNode.nodes:
yield from get_mces_from_node(
node,
path + [node.name],
path_vs_id=path_vs_id,
parentNode=node_urn,
parentOwners=node_owners,
defaults=defaults,
Expand All @@ -286,7 +291,7 @@ def get_mces_from_node(
for term in glossaryNode.terms:
yield from get_mces_from_term(
glossaryTerm=term,
path=path + [term.name],
path_vs_id=path_vs_id,
parentNode=node_urn,
parentOwnership=node_owners,
defaults=defaults,
Expand All @@ -313,29 +318,28 @@ def get_domain_class(

def get_mces_from_term(
glossaryTerm: GlossaryTermConfig,
path: List[str],
path_vs_id: Dict[str, str],
parentNode: Optional[str],
parentOwnership: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]:
term_urn = make_glossary_term_urn(
path, glossaryTerm.id, ingestion_config.enable_auto_id
)
term_urn = glossaryTerm._urn

aspects: List[
Union[
models.GlossaryTermInfoClass,
models.GlossaryRelatedTermsClass,
models.OwnershipClass,
models.StatusClass,
models.GlossaryTermKeyClass,
models.StatusClass,
models.BrowsePathsClass,
]
] = []
term_info = models.GlossaryTermInfoClass(
definition=glossaryTerm.description,
termSource=glossaryTerm.term_source # type: ignore
termSource=glossaryTerm.term_source
if glossaryTerm.term_source is not None
else defaults.source_type,
sourceRef=glossaryTerm.source_ref
Expand Down Expand Up @@ -432,27 +436,46 @@ def get_mces_from_term(
yield mcp


def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> None:
path: List[str] = []
def materialize_all_node_urns(
glossary: BusinessGlossaryConfig, enable_auto_id: bool
) -> None:
"""After this runs, all nodes will have an id value that is a valid urn."""

def _process_child_terms(parent_node: GlossaryNodeConfig, path: List[str]) -> None:
path_vs_id[".".join(path + [parent_node.name])] = parent_node.id
def _process_child_terms(
parent_node: GlossaryNodeInterface, path: List[str]
) -> None:
for term in parent_node.terms or []:
term._urn = make_glossary_term_urn(
path + [term.name], term.id, enable_auto_id
)

if parent_node.terms:
for term in parent_node.terms:
path_vs_id[".".join(path + [parent_node.name] + [term.name])] = term.id
for node in parent_node.nodes or []:
node._urn = make_glossary_node_urn(
path + [node.name], node.id, enable_auto_id
)
_process_child_terms(node, path + [node.name])

if parent_node.nodes:
for node in parent_node.nodes:
_process_child_terms(node, path + [parent_node.name])
_process_child_terms(glossary, [])

if glossary.nodes:
for node in glossary.nodes:
_process_child_terms(node, path)

if glossary.terms:
for term in glossary.terms:
path_vs_id[".".join(path + [term.name])] = term.id
def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> Dict[str, str]:
    """Build a lookup from dotted name paths to the ``_urn`` of each term and node.

    The key is the chain of names from the glossary root down to the entry,
    joined with ".". The value is whatever ``_urn`` currently holds on that
    entry. This lookup is needed to resolve the paths used in inherits,
    contains, values, and related_terms.

    Within one parent, terms are recorded before child nodes, so when a term
    and a node share the same path the node's urn is the one that is kept.
    """
    mapping: Dict[str, str] = {}

    def _record(container: GlossaryNodeInterface, prefix: List[str]) -> None:
        # Terms directly under this container come first.
        mapping.update(
            {".".join([*prefix, term.name]): term._urn for term in container.terms or []}
        )

        # Then each child node, immediately followed by its own subtree.
        for child in container.nodes or []:
            child_path = [*prefix, child.name]
            mapping[".".join(child_path)] = child._urn
            _record(child, child_path)

    _record(glossary, [])
    return mapping


@platform_name("Business Glossary")
Expand All @@ -472,28 +495,36 @@ def create(cls, config_dict, ctx):
config = BusinessGlossarySourceConfig.parse_obj(config_dict)
return cls(ctx, config)

@classmethod
def load_glossary_config(
self, file_name: Union[str, pathlib.Path]
cls, file_name: Union[str, pathlib.Path]
) -> BusinessGlossaryConfig:
config = load_config_file(file_name)
glossary_cfg = BusinessGlossaryConfig.parse_obj(config)
return glossary_cfg

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(
self.report,
auto_status_aspect(
self.get_workunits_internal(),
),
)

def get_workunits_internal(
self,
) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
) -> Iterable[MetadataWorkUnit]:
glossary_config = self.load_glossary_config(self.config.file)
populate_path_vs_id(glossary_config)
for event in get_mces(
glossary_config, ingestion_config=self.config, ctx=self.ctx

materialize_all_node_urns(glossary_config, self.config.enable_auto_id)
path_vs_id = populate_path_vs_id(glossary_config)

for event in auto_workunit(
get_mces(
glossary_config, path_vs_id, ingestion_config=self.config, ctx=self.ctx
)
):
if isinstance(event, models.MetadataChangeEventClass):
yield MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
elif isinstance(event, MetadataChangeProposalWrapper):
yield event.as_workunit()
yield event

def get_report(self):
return self.report
Loading

0 comments on commit ca5dffa

Please sign in to comment.