Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest/biz-glossary): simplify business glossary source #7912

Merged
merged 9 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ nodes:
source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
values:
- House.Colors.Red
- House.Colors.Pink
- Housing.Colors.Red
- Housing.Colors.Pink

- name: Kitchen
description: a room or area where food is prepared and cooked.
Expand All @@ -113,7 +113,7 @@ nodes:
source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
related_terms:
- House.Kitchen
- Housing.Kitchen
knowledge_links:
- url: "https://en.wikipedia.org/wiki/Spoon"
label: Wiki link
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from hashlib import md5
from typing import Any, List, Optional, Set, Tuple

import confluent_kafka
import jsonref
from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema,
Schema,
SchemaReference,
SchemaRegistryClient,
)

from datahub.ingestion.extractor import protobuf_util, schema_util
Expand Down Expand Up @@ -45,14 +45,11 @@ def __init__(
) -> None:
self.source_config: KafkaSourceConfig = source_config
self.report: KafkaSourceReport = report
# Use the fully qualified name for SchemaRegistryClient to make it mock patchable for testing.
self.schema_registry_client = (
confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient(
{
"url": source_config.connection.schema_registry_url,
**source_config.connection.schema_registry_config,
}
)
self.schema_registry_client = SchemaRegistryClient(
{
"url": source_config.connection.schema_registry_url,
**source_config.connection.schema_registry_config,
}
)
self.known_schema_registry_subjects: List[str] = []
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,9 @@ def __init__(self, looker_api: LookerAPI):
self.looker_api_wrapper = looker_api

def get_by_id(self, id_: str) -> Optional[LookerUser]:
if not id_:
return None

hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Will get user {id_}")

raw_user: Optional[User] = self.looker_api_wrapper.get_user(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pathlib
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, TypeVar, Union

from pydantic import validator
from pydantic.fields import Field
Expand All @@ -13,25 +13,28 @@
from datahub.emitter.mce_builder import datahub_guid, make_group_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import auto_workunit_reporter
from datahub.utilities.source_helpers import (
auto_status_aspect,
auto_workunit,
auto_workunit_reporter,
)
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)

valid_status: models.StatusClass = models.StatusClass(removed=False)

# This is needed to map paths present in inherits, contains, values, and related_terms to each term's optional id
path_vs_id: Dict[str, Optional[str]] = {}
GlossaryNodeInterface = TypeVar(
"GlossaryNodeInterface", "GlossaryNodeConfig", "BusinessGlossaryConfig"
)


class Owners(ConfigModel):
Expand Down Expand Up @@ -60,43 +63,46 @@ class GlossaryTermConfig(ConfigModel):
knowledge_links: Optional[List[KnowledgeCard]]
domain: Optional[str]

# Private fields.
_urn: str


class GlossaryNodeConfig(ConfigModel):
id: Optional[str]
name: str
description: str
owners: Optional[Owners]
terms: Optional[List[GlossaryTermConfig]]
terms: Optional[List["GlossaryTermConfig"]]
nodes: Optional[List["GlossaryNodeConfig"]]
knowledge_links: Optional[List[KnowledgeCard]]


GlossaryNodeConfig.update_forward_refs()
# Private fields.
_urn: str


class DefaultConfig(ConfigModel):
"""Holds defaults for populating fields in glossary terms"""

source: str
source: Optional[str]
owners: Owners
url: Optional[str] = None
source_type: Optional[str] = "INTERNAL"
source_type: str = "INTERNAL"


class BusinessGlossarySourceConfig(ConfigModel):
file: Union[str, pathlib.Path] = Field(
description="File path or URL to business glossary file to ingest."
)
enable_auto_id: bool = Field(
description="Generate id field from GlossaryNode and GlossaryTerm's name field",
description="Generate guid urns instead of a plaintext path urn with the node/term's hierarchy.",
default=False,
)


class BusinessGlossaryConfig(DefaultConfig):
version: str
nodes: Optional[List[GlossaryNodeConfig]]
terms: Optional[List[GlossaryTermConfig]]
terms: Optional[List["GlossaryTermConfig"]]
nodes: Optional[List["GlossaryNodeConfig"]]

@validator("version")
def version_must_be_1(cls, v):
Expand Down Expand Up @@ -166,17 +172,17 @@ def get_owners(owners: Owners) -> models.OwnershipClass:

def get_mces(
glossary: BusinessGlossaryConfig,
path_vs_id: Dict[str, str],
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
path: List[str] = []
root_owners = get_owners(glossary.owners)

if glossary.nodes:
for node in glossary.nodes:
yield from get_mces_from_node(
node,
path + [node.name],
path_vs_id=path_vs_id,
parentNode=None,
parentOwners=root_owners,
defaults=glossary,
Expand All @@ -188,7 +194,7 @@ def get_mces(
for term in glossary.terms:
yield from get_mces_from_term(
term,
path + [term.name],
path_vs_id=path_vs_id,
parentNode=None,
parentOwnership=root_owners,
defaults=glossary,
Expand Down Expand Up @@ -237,16 +243,15 @@ def make_domain_mcp(

def get_mces_from_node(
glossaryNode: GlossaryNodeConfig,
path: List[str],
path_vs_id: Dict[str, str],
parentNode: Optional[str],
parentOwners: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
node_urn = make_glossary_node_urn(
path, glossaryNode.id, ingestion_config.enable_auto_id
)
node_urn = glossaryNode._urn

node_info = models.GlossaryNodeInfoClass(
definition=glossaryNode.description,
parentNode=parentNode,
Expand All @@ -259,7 +264,7 @@ def get_mces_from_node(

node_snapshot = models.GlossaryNodeSnapshotClass(
urn=node_urn,
aspects=[node_info, node_owners, valid_status],
aspects=[node_info, node_owners],
)
yield get_mce_from_snapshot(node_snapshot)

Expand All @@ -274,7 +279,7 @@ def get_mces_from_node(
for node in glossaryNode.nodes:
yield from get_mces_from_node(
node,
path + [node.name],
path_vs_id=path_vs_id,
parentNode=node_urn,
parentOwners=node_owners,
defaults=defaults,
Expand All @@ -286,7 +291,7 @@ def get_mces_from_node(
for term in glossaryNode.terms:
yield from get_mces_from_term(
glossaryTerm=term,
path=path + [term.name],
path_vs_id=path_vs_id,
parentNode=node_urn,
parentOwnership=node_owners,
defaults=defaults,
Expand All @@ -313,29 +318,28 @@ def get_domain_class(

def get_mces_from_term(
glossaryTerm: GlossaryTermConfig,
path: List[str],
path_vs_id: Dict[str, str],
parentNode: Optional[str],
parentOwnership: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]:
term_urn = make_glossary_term_urn(
path, glossaryTerm.id, ingestion_config.enable_auto_id
)
term_urn = glossaryTerm._urn

aspects: List[
Union[
models.GlossaryTermInfoClass,
models.GlossaryRelatedTermsClass,
models.OwnershipClass,
models.StatusClass,
models.GlossaryTermKeyClass,
models.StatusClass,
models.BrowsePathsClass,
]
] = []
term_info = models.GlossaryTermInfoClass(
definition=glossaryTerm.description,
termSource=glossaryTerm.term_source # type: ignore
termSource=glossaryTerm.term_source
if glossaryTerm.term_source is not None
else defaults.source_type,
sourceRef=glossaryTerm.source_ref
Expand Down Expand Up @@ -432,27 +436,46 @@ def get_mces_from_term(
yield mcp


def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> None:
path: List[str] = []
def materialize_all_node_urns(
glossary: BusinessGlossaryConfig, enable_auto_id: bool
) -> None:
"""After this runs, all nodes will have an id value that is a valid urn."""

def _process_child_terms(parent_node: GlossaryNodeConfig, path: List[str]) -> None:
path_vs_id[".".join(path + [parent_node.name])] = parent_node.id
def _process_child_terms(
parent_node: GlossaryNodeInterface, path: List[str]
) -> None:
for term in parent_node.terms or []:
term._urn = make_glossary_term_urn(
path + [term.name], term.id, enable_auto_id
)

if parent_node.terms:
for term in parent_node.terms:
path_vs_id[".".join(path + [parent_node.name] + [term.name])] = term.id
for node in parent_node.nodes or []:
node._urn = make_glossary_node_urn(
path + [node.name], node.id, enable_auto_id
)
_process_child_terms(node, path + [node.name])

if parent_node.nodes:
for node in parent_node.nodes:
_process_child_terms(node, path + [parent_node.name])
_process_child_terms(glossary, [])

if glossary.nodes:
for node in glossary.nodes:
_process_child_terms(node, path)

if glossary.terms:
for term in glossary.terms:
path_vs_id[".".join(path + [term.name])] = term.id
def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> Dict[str, str]:
    """Build a lookup from dotted glossary paths to the matching entry's urn.

    The lookup exists so that paths written in ``inherits``, ``contains``,
    ``values``, and ``related_terms`` can be translated to urns.

    Args:
        glossary: Root glossary config. The ``_urn`` attribute of every term
            and node under it is read, so it must already be populated.

    Returns:
        A dict keyed by ``"."``-joined names (root to entry) whose values are
        the ``_urn`` of the term or node found at that path.
    """
    urn_by_path: Dict[str, str] = {}

    def _collect(parent: GlossaryNodeInterface, prefix: List[str]) -> None:
        # Direct child terms are recorded before any child node is visited.
        urn_by_path.update(
            {
                ".".join([*prefix, term.name]): term._urn
                for term in parent.terms or []
            }
        )

        # Each child node is recorded, then its own subtree is walked before
        # moving on to the next sibling.
        for child in parent.nodes or []:
            child_path = [*prefix, child.name]
            urn_by_path[".".join(child_path)] = child._urn
            _collect(child, child_path)

    _collect(glossary, [])

    return urn_by_path


@platform_name("Business Glossary")
Expand All @@ -472,28 +495,36 @@ def create(cls, config_dict, ctx):
config = BusinessGlossarySourceConfig.parse_obj(config_dict)
return cls(ctx, config)

@classmethod
def load_glossary_config(
self, file_name: Union[str, pathlib.Path]
cls, file_name: Union[str, pathlib.Path]
) -> BusinessGlossaryConfig:
config = load_config_file(file_name)
glossary_cfg = BusinessGlossaryConfig.parse_obj(config)
return glossary_cfg

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(
self.report,
auto_status_aspect(
self.get_workunits_internal(),
),
)

def get_workunits_internal(
self,
) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
) -> Iterable[MetadataWorkUnit]:
glossary_config = self.load_glossary_config(self.config.file)
populate_path_vs_id(glossary_config)
for event in get_mces(
glossary_config, ingestion_config=self.config, ctx=self.ctx

materialize_all_node_urns(glossary_config, self.config.enable_auto_id)
path_vs_id = populate_path_vs_id(glossary_config)
Comment on lines +519 to +520
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do these two together? The recursion looks the same, and it seems we could just update path_vs_id right after each urn is computed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use them separately here https://github.com/acryldata/business-glossary-sync-action/blob/main/glossary-sync.py#L473, which is why I structured it this way


for event in auto_workunit(
get_mces(
glossary_config, path_vs_id, ingestion_config=self.config, ctx=self.ctx
)
):
if isinstance(event, models.MetadataChangeEventClass):
yield MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
elif isinstance(event, MetadataChangeProposalWrapper):
yield event.as_workunit()
yield event

def get_report(self):
    """Return the report object stored on this source instance."""
    current_report = self.report
    return current_report
Loading