Skip to content

Commit

Permalink
Let metadata_resolver return both artifact and type
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 627141741
  • Loading branch information
ml-metadata-team authored and tfx-copybara committed Apr 22, 2024
1 parent 8d74d23 commit b3b2732
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 109 deletions.
71 changes: 52 additions & 19 deletions ml_metadata/tools/mlmd_resolver/metadata_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""Metadata resolver for reasoning about metadata information."""

from typing import Callable, Dict, List, Optional
from typing import Callable, Dict, List, Optional, Tuple

from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2
Expand All @@ -25,6 +25,7 @@
# Supported field mask paths in LineageGraph message for get_lineage_subgraph().
_ARTIFACTS_FIELD_MASK_PATH = 'artifacts'
_EVENTS_FIELD_MASK_PATH = 'events'
_ARTIFACT_TYPES_MASK_PATH = 'artifact_types'


class MetadataResolver:
Expand Down Expand Up @@ -60,7 +61,10 @@ def get_downstream_artifacts_by_artifact_ids(
max_num_hops: int = _MAX_NUM_HOPS,
filter_query: str = '',
event_filter: Optional[Callable[[metadata_store_pb2.Event], bool]] = None,
) -> Dict[int, List[metadata_store_pb2.Artifact]]:
) -> Dict[
int,
List[Tuple[metadata_store_pb2.Artifact, metadata_store_pb2.ArtifactType]],
]:
"""Given a list of artifact ids, get their provenance successor artifacts.
For each artifact matched by a given `artifact_id`, treat it as a starting
Expand Down Expand Up @@ -115,7 +119,13 @@ def get_downstream_artifacts_by_artifact_ids(
limit=_MAX_NUM_STARTING_NODES,
)
)
return {artifact.id: [artifact] for artifact in artifacts}
artifact_type_ids = [a.type_id for a in artifacts]
artifact_types = self._store.get_artifact_types_by_id(artifact_type_ids)
artifact_type_by_id = {t.id: t for t in artifact_types}
return {
artifact.id: [(artifact, artifact_type_by_id[artifact.type_id])]
for artifact in artifacts
}

options = metadata_store_pb2.LineageSubgraphQueryOptions(
starting_artifacts=metadata_store_pb2.LineageSubgraphQueryOptions.StartingNodes(
Expand All @@ -124,16 +134,18 @@ def get_downstream_artifacts_by_artifact_ids(
max_num_hops=max_num_hops,
direction=metadata_store_pb2.LineageSubgraphQueryOptions.Direction.DOWNSTREAM,
)
field_mask_paths = (
[_ARTIFACTS_FIELD_MASK_PATH, _EVENTS_FIELD_MASK_PATH]
if not filter_query
else [_EVENTS_FIELD_MASK_PATH]
)
field_mask_paths = [
_ARTIFACTS_FIELD_MASK_PATH,
_EVENTS_FIELD_MASK_PATH,
_ARTIFACT_TYPES_MASK_PATH,
]
lineage_graph = self._store.get_lineage_subgraph(
query_options=options,
field_mask_paths=field_mask_paths,
)

artifact_type_by_id = {t.id: t for t in lineage_graph.artifact_types}

if not filter_query:
artifacts_to_subgraph = metadata_resolver_utils.get_subgraphs_by_artifact_ids(
artifact_ids,
Expand All @@ -142,7 +154,9 @@ def get_downstream_artifacts_by_artifact_ids(
event_filter,
)
return {
artifact_id: list(subgraph.artifacts)
artifact_id: [
[a, artifact_type_by_id[a.type_id]] for a in subgraph.artifacts
]
for artifact_id, subgraph in artifacts_to_subgraph.items()
}
else:
Expand Down Expand Up @@ -171,7 +185,10 @@ def get_downstream_artifacts_by_artifact_ids(
downstream_artifacts_dict = {}
for artifact_id, visited_ids in artifacts_to_visited_ids.items():
downstream_artifacts = [
artifact_id_to_artifact[id]
(
artifact_id_to_artifact[id],
artifact_type_by_id[artifact_id_to_artifact[id].type_id],
)
for id in visited_ids[metadata_resolver_utils.NodeType.ARTIFACT]
if id in artifact_id_to_artifact
]
Expand Down Expand Up @@ -249,7 +266,10 @@ def get_upstream_artifacts_by_artifact_ids(
max_num_hops: int = _MAX_NUM_HOPS,
filter_query: str = '',
event_filter: Optional[Callable[[metadata_store_pb2.Event], bool]] = None,
) -> Dict[int, List[metadata_store_pb2.Artifact]]:
) -> Dict[
int,
List[Tuple[metadata_store_pb2.Artifact, metadata_store_pb2.ArtifactType]],
]:
"""Given a list of artifact ids, get their provenance ancestor artifacts.
For each artifact matched by a given `artifact_id`, treat it as a starting
Expand Down Expand Up @@ -303,7 +323,13 @@ def get_upstream_artifacts_by_artifact_ids(
limit=_MAX_NUM_STARTING_NODES,
)
)
return {artifact.id: [artifact] for artifact in artifacts}
artifact_type_ids = [a.type_id for a in artifacts]
artifact_types = self._store.get_artifact_types_by_id(artifact_type_ids)
artifact_type_by_id = {t.id: t for t in artifact_types}
return {
artifact.id: [(artifact, artifact_type_by_id[artifact.type_id])]
for artifact in artifacts
}

options = metadata_store_pb2.LineageSubgraphQueryOptions(
starting_artifacts=metadata_store_pb2.LineageSubgraphQueryOptions.StartingNodes(
Expand All @@ -312,16 +338,18 @@ def get_upstream_artifacts_by_artifact_ids(
max_num_hops=max_num_hops,
direction=metadata_store_pb2.LineageSubgraphQueryOptions.Direction.UPSTREAM,
)
field_mask_paths = (
[_ARTIFACTS_FIELD_MASK_PATH, _EVENTS_FIELD_MASK_PATH]
if not filter_query
else [_EVENTS_FIELD_MASK_PATH]
)
field_mask_paths = [
_ARTIFACTS_FIELD_MASK_PATH,
_EVENTS_FIELD_MASK_PATH,
_ARTIFACT_TYPES_MASK_PATH,
]
lineage_graph = self._store.get_lineage_subgraph(
query_options=options,
field_mask_paths=field_mask_paths,
)

artifact_type_by_id = {t.id: t for t in lineage_graph.artifact_types}

if not filter_query:
artifacts_to_subgraph = (
metadata_resolver_utils.get_subgraphs_by_artifact_ids(
Expand All @@ -332,7 +360,9 @@ def get_upstream_artifacts_by_artifact_ids(
)
)
return {
artifact_id: list(subgraph.artifacts)
artifact_id: [
[a, artifact_type_by_id[a.type_id]] for a in subgraph.artifacts
]
for artifact_id, subgraph in artifacts_to_subgraph.items()
}
else:
Expand Down Expand Up @@ -362,7 +392,10 @@ def get_upstream_artifacts_by_artifact_ids(
upstream_artifacts_dict = {}
for artifact_id, visited_ids in artifacts_to_visited_ids.items():
upstream_artifacts = [
artifact_id_to_artifact[id]
(
artifact_id_to_artifact[id],
artifact_type_by_id[artifact_id_to_artifact[id].type_id],
)
for id in visited_ids[metadata_resolver_utils.NodeType.ARTIFACT]
if id in artifact_id_to_artifact
]
Expand Down
Loading

0 comments on commit b3b2732

Please sign in to comment.