fix(graph service): only query for entities that should have lineage [Breaking Change] #5539

@@ -147,7 +147,7 @@ export const OperationsTab = () => {
inputs: run?.inputs?.relationships.map((relationship) => relationship.entity),
outputs: run?.outputs?.relationships.map((relationship) => relationship.entity),
externalUrl: run?.externalUrl,
parentTemplate: run?.parentTemplate?.relationships?.[0].entity,
parentTemplate: run?.parentTemplate?.relationships?.[0]?.entity,
Contributor Author

Small bug fix I noticed while testing.

}));

return (
@@ -19,6 +19,8 @@ export const CompactEntityNameList = ({ entities, onClick, linkUrlParams, showTo
return (
<>
{entities.map((entity, index) => {
if (!entity) return <></>;
Contributor Author

same, small bug fix here ^

Collaborator

Is entity not a required field? Didn't realize... Nice!


const genericProps = entityRegistry.getGenericEntityProperties(entity.type, entity);
const platformLogoUrl = genericProps?.platform?.properties?.logoUrl;
const displayName = entityRegistry.getDisplayName(entity.type, entity);
@@ -1,6 +1,7 @@
package com.linkedin.metadata.models.registry;

import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.RelationshipAnnotation;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import java.util.ArrayList;
@@ -25,9 +26,11 @@
public class LineageRegistry {

private final Map<String, LineageSpec> _lineageSpecMap;
private final EntityRegistry _entityRegistry;

public LineageRegistry(EntityRegistry entityRegistry) {
_lineageSpecMap = buildLineageSpecs(entityRegistry);
_entityRegistry = entityRegistry;
}

private Map<String, LineageSpec> buildLineageSpecs(EntityRegistry entityRegistry) {
@@ -86,6 +89,14 @@ public LineageSpec getLineageSpec(String entityName) {
return _lineageSpecMap.get(entityName.toLowerCase());
}

public Set<String> getEntitiesWithLineage() {
Map<String, EntitySpec> specs = _entityRegistry.getEntitySpecs();
return _lineageSpecMap.keySet().stream().filter(key ->
Collaborator

Minor: you could maybe use _lineageSpecMap.entrySet() to avoid the .get(key) calls.

!_lineageSpecMap.get(key).getUpstreamEdges().isEmpty()
|| !_lineageSpecMap.get(key).getDownstreamEdges().isEmpty()
).map(key -> specs.get(key).getName()).collect(Collectors.toSet());
}
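A minimal sketch of the reviewer's entrySet() suggestion, assuming a simplified stand-in for LineageSpec. The class and field names below (EntitiesWithLineageSketch, Spec, upstreamEdges, downstreamEdges) are hypothetical and exist only for illustration. The sketch also returns the map key directly and omits the step in the PR that maps each key back to the registered entity name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class EntitiesWithLineageSketch {
  // Hypothetical stand-in for LineageRegistry.LineageSpec: just two edge lists.
  static final class Spec {
    final List<String> upstreamEdges;
    final List<String> downstreamEdges;

    Spec(List<String> upstreamEdges, List<String> downstreamEdges) {
      this.upstreamEdges = upstreamEdges;
      this.downstreamEdges = downstreamEdges;
    }
  }

  // Streams over entries so each spec is read once, with no repeated map lookups.
  static Set<String> entitiesWithLineage(Map<String, Spec> lineageSpecMap) {
    return lineageSpecMap.entrySet().stream()
        .filter(e -> !e.getValue().upstreamEdges.isEmpty()
            || !e.getValue().downstreamEdges.isEmpty())
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<String, Spec> specs = new LinkedHashMap<>();
    specs.put("dataset", new Spec(List.of("DownstreamOf"), List.of()));
    specs.put("tag", new Spec(List.of(), List.of()));
    // Only "dataset" has at least one lineage edge, so only it is kept.
    System.out.println(entitiesWithLineage(specs));
  }
}
```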

public List<EdgeInfo> getLineageRelationships(String entityName, LineageDirection direction) {
LineageSpec spec = getLineageSpec(entityName);
if (spec == null) {
@@ -0,0 +1,18 @@
package com.linkedin.metadata.graph;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;


@Data
@AllArgsConstructor
public class GraphFilters {
Collaborator

For other services (e.g. search service) we use PDLs to model the filter types... Any reason why we shouldn't keep that consistent here?

Contributor Author

These aren't exposed via the API yet, so I'll keep them in Java for now so they're easier to change. Once we've settled on them, let's move them to PDL.

// entity types you want to allow in your result set
public List<String> allowedEntityTypes;

public static GraphFilters emptyGraphFilters = new GraphFilters(ImmutableList.of());

public static GraphFilters defaultGraphFilters = new GraphFilters(ImmutableList.of());
}
@@ -7,6 +7,7 @@
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.search.utils.QueryUtils;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -79,11 +80,12 @@ public interface GraphService {
* - RelatedEntity("DownstreamOf", "dataset three")
*/
@Nonnull
RelatedEntitiesResult findRelatedEntities(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
RelatedEntitiesResult findRelatedEntities(@Nullable final List<String> sourceTypes, @Nonnull final Filter sourceEntityFilter,
Collaborator

Minor thing: we don't really need to remove this API; we could just have the old API call the new API with the different arguments.

RelatedEntitiesResult findRelatedEntities(final String sourceType, Filter filter, ...) {
   return findRelatedEntities(ImmutableList.of(sourceType), filter, ...);
}

RelatedEntitiesResult findRelatedEntities(final List<String> sourceType, Filter filter, ...) {

}

Contributor Author

This actually creates ambiguity if the user provides null for both fields (which is something we support), so I'll keep it as is for now.

@Nullable final List<String> destinationTypes, @Nonnull final Filter destinationEntityFilter,
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter,
final int offset, final int count);
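A small sketch of the ambiguity the author describes, using two hypothetical overloads named find rather than the real findRelatedEntities signatures. When both a String and a List<String> overload exist, a bare null argument matches both and the call does not compile unless the caller casts.

```java
import java.util.Collections;
import java.util.List;

final class OverloadAmbiguitySketch {
  // Hypothetical old-style overload taking a single type.
  static String find(String sourceType) {
    return "single:" + sourceType;
  }

  // Hypothetical new-style overload taking a list of types.
  static String find(List<String> sourceTypes) {
    return "list:" + sourceTypes;
  }

  public static void main(String[] args) {
    // find(null);  // would not compile: the reference to find is ambiguous
    System.out.println(find((String) null));       // cast selects the String overload
    System.out.println(find((List<String>) null)); // cast selects the List overload
    System.out.println(find(Collections.singletonList("dataset")));
  }
}
```

Keeping a single signature avoids forcing every caller that passes null to add a cast.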


/**
* Traverse from the entityUrn towards the input direction up to maxHops number of hops
* Abstracts away the concept of relationship types
@@ -93,6 +95,19 @@ RelatedEntitiesResult findRelatedEntities(@Nullable final String sourceType, @No
@Nonnull
default EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset,
int count, int maxHops) {
return getLineage(entityUrn, direction, new GraphFilters(new ArrayList(getLineageRegistry().getEntitiesWithLineage())), offset, count, maxHops);
}

/**
* Traverse from the entityUrn towards the input direction up to maxHops number of hops. If entityTypes is not empty,
* will only return edges to entities that are within the entity types set.
* Abstracts away the concept of relationship types
*
* Unless overridden, it uses the lineage registry to fetch valid edge types and queries for them
*/
@Nonnull
default EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
Collaborator

Is there a way we can only filter for entity types that the current URN is related to via a lineage edge?

Collaborator

It seems we are just filtering for anything that has lineage... That will solve our immediate problem with data process instance, but in the long term could this pop up again?

Contributor Author

Addressed: only relevant entity types will be pulled.

GraphFilters graphFilters, int offset, int count, int maxHops) {
if (maxHops > 1) {
maxHops = 1;
}
@@ -112,7 +127,8 @@ default EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageD
edgesByDirection.get(true).stream().map(LineageRegistry.EdgeInfo::getType).collect(Collectors.toList());
// Fetch outgoing edges
RelatedEntitiesResult outgoingEdges =
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), null, QueryUtils.EMPTY_FILTER,
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), graphFilters.getAllowedEntityTypes(),
QueryUtils.EMPTY_FILTER,
relationshipTypes, newRelationshipFilter(QueryUtils.EMPTY_FILTER, RelationshipDirection.OUTGOING), offset,
count);

@@ -137,7 +153,8 @@ relationshipTypes, newRelationshipFilter(QueryUtils.EMPTY_FILTER, RelationshipDi
List<String> relationshipTypes =
edgesByDirection.get(false).stream().map(LineageRegistry.EdgeInfo::getType).collect(Collectors.toList());
RelatedEntitiesResult incomingEdges =
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), null, QueryUtils.EMPTY_FILTER,
findRelatedEntities(null, newFilter("urn", entityUrn.toString()), graphFilters.getAllowedEntityTypes(),
QueryUtils.EMPTY_FILTER,
relationshipTypes, newRelationshipFilter(QueryUtils.EMPTY_FILTER, RelationshipDirection.INCOMING), offset,
count);
result.setTotal(result.getTotal() + incomingEdges.getTotal());
@@ -1,5 +1,6 @@
package com.linkedin.metadata.graph;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.EntityRelationship;
import com.linkedin.common.EntityRelationshipArray;
import com.linkedin.common.EntityRelationships;
@@ -38,10 +39,9 @@ public EntityRelationships getRelatedEntities(String rawUrn, List<String> relati
count = count == null ? DEFAULT_PAGE_SIZE : count;

RelatedEntitiesResult relatedEntitiesResult =
_graphService.findRelatedEntities(
"",
_graphService.findRelatedEntities(ImmutableList.of(),
QueryUtils.newFilter("urn", rawUrn),
"",
ImmutableList.of(),
EMPTY_FILTER,
relationshipTypes,
QueryUtils.newRelationshipFilter(EMPTY_FILTER, direction),
@@ -40,8 +40,8 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi
* Unless overridden, it uses the lineage registry to fetch valid edge types and queries for them
*/
@Nonnull
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset,
int count, int maxHops, boolean separateSiblings) {
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
int offset, int count, int maxHops, boolean separateSiblings) {
if (separateSiblings) {
return _graphService.getLineage(entityUrn, direction, offset, count, maxHops);
}
@@ -245,9 +245,9 @@ private static List<String> getDirectedRelationshipTypes(List<String> relationsh
return relationships;
}

protected static String getQueryForRelatedEntities(@Nullable String sourceType,
protected static String getQueryForRelatedEntities(@Nullable List<String> sourceTypes,
@Nonnull Filter sourceEntityFilter,
@Nullable String destinationType,
@Nullable List<String> destinationTypes,
@Nonnull Filter destinationEntityFilter,
@Nonnull List<String> relationshipTypes,
@Nonnull RelationshipFilter relationshipFilter,
@@ -291,16 +291,20 @@ protected static String getQueryForRelatedEntities(@Nullable String sourceType,
List<String> destinationFilterNames = new ArrayList<>();
List<String> relationshipTypeFilterNames = new ArrayList<>();

if (sourceType != null) {
if (sourceTypes != null && sourceTypes.size() > 0) {
Collaborator

nice!!

sourceTypeFilterName = "sourceType";
// TODO: escape string value
filters.add(String.format("%s as var(func: eq(<type>, \"%s\"))", sourceTypeFilterName, sourceType));
final StringJoiner joiner = new StringJoiner("\",\"", "[\"", "\"]");
sourceTypes.forEach(type -> joiner.add(type));
filters.add(String.format("%s as var(func: eq(<type>, %s))", sourceTypeFilterName, joiner.toString()));
}

if (destinationType != null) {
if (destinationTypes != null && destinationTypes.size() > 0) {
destinationTypeFilterName = "destinationType";
final StringJoiner joiner = new StringJoiner("\",\"", "[\"", "\"]");
destinationTypes.forEach(type -> joiner.add(type));
// TODO: escape string value
filters.add(String.format("%s as var(func: eq(<type>, \"%s\"))", destinationTypeFilterName, destinationType));
filters.add(String.format("%s as var(func: eq(<type>, %s))", destinationTypeFilterName, joiner.toString()));
}
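To make the new filter string concrete, here is a standalone sketch of the StringJoiner pattern from the hunk above. The class and method names (DgraphTypeFilterSketch, typeFilter) are hypothetical. As the TODO in the diff notes, the values are not escaped, so this assumes type names contain no quotes.

```java
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;

final class DgraphTypeFilterSketch {
  // Renders the types as a bracketed, quoted, comma-separated list inside an eq() filter.
  static String typeFilter(String filterName, List<String> types) {
    StringJoiner joiner = new StringJoiner("\",\"", "[\"", "\"]");
    types.forEach(joiner::add);
    return String.format("%s as var(func: eq(<type>, %s))", filterName, joiner);
  }

  public static void main(String[] args) {
    // Prints: sourceType as var(func: eq(<type>, ["dataset","chart"]))
    System.out.println(typeFilter("sourceType", List.of("dataset", "chart")));
    // With no elements the joiner yields prefix + suffix, i.e. [""], which would
    // match an empty type name; the size() > 0 guard in the diff skips this case.
    System.out.println(typeFilter("sourceType", Collections.emptyList()));
  }
}
```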

//noinspection ConstantConditions
@@ -381,9 +385,9 @@ protected static String getQueryForRelatedEntities(@Nullable String sourceType,

@Nonnull
@Override
public RelatedEntitiesResult findRelatedEntities(@Nullable String sourceType,
public RelatedEntitiesResult findRelatedEntities(@Nullable List<String> sourceTypes,
@Nonnull Filter sourceEntityFilter,
@Nullable String destinationType,
@Nullable List<String> destinationTypes,
@Nonnull Filter destinationEntityFilter,
@Nonnull List<String> relationshipTypes,
@Nonnull RelationshipFilter relationshipFilter,
@@ -394,8 +398,8 @@ public RelatedEntitiesResult findRelatedEntities(@Nullable String sourceType,
}

String query = getQueryForRelatedEntities(
sourceType, sourceEntityFilter,
destinationType, destinationEntityFilter,
sourceTypes, sourceEntityFilter,
destinationTypes, destinationEntityFilter,
relationshipTypes.stream().filter(get_schema()::hasField).collect(Collectors.toList()),
relationshipFilter,
offset, count
@@ -6,6 +6,7 @@
import com.google.common.collect.Lists;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.graph.GraphFilters;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.models.registry.LineageRegistry.EdgeInfo;
@@ -107,35 +108,35 @@ private SearchResponse executeSearchQuery(@Nonnull final QueryBuilder query, fin
}
}

public SearchResponse getSearchResponse(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
public SearchResponse getSearchResponse(@Nullable final List<String> sourceTypes, @Nonnull final Filter sourceEntityFilter,
@Nullable final List<String> destinationTypes, @Nonnull final Filter destinationEntityFilter,
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter,
final int offset, final int count) {
BoolQueryBuilder finalQuery =
buildQuery(sourceType, sourceEntityFilter, destinationType, destinationEntityFilter, relationshipTypes,
buildQuery(sourceTypes, sourceEntityFilter, destinationTypes, destinationEntityFilter, relationshipTypes,
relationshipFilter);

return executeSearchQuery(finalQuery, offset, count);
}

public static BoolQueryBuilder buildQuery(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
public static BoolQueryBuilder buildQuery(@Nullable final List<String> sourceTypes, @Nonnull final Filter sourceEntityFilter,
@Nullable final List<String> destinationTypes, @Nonnull final Filter destinationEntityFilter,
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) {
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();

final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();

// set source filter
String sourceNode = relationshipDirection == RelationshipDirection.OUTGOING ? SOURCE : DESTINATION;
if (sourceType != null && sourceType.length() > 0) {
finalQuery.must(QueryBuilders.termQuery(sourceNode + ".entityType", sourceType));
if (sourceTypes != null && sourceTypes.size() > 0) {
finalQuery.must(QueryBuilders.termsQuery(sourceNode + ".entityType", sourceTypes));
}
addFilterToQueryBuilder(sourceEntityFilter, sourceNode, finalQuery);

// set destination filter
String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? DESTINATION : SOURCE;
if (destinationType != null && destinationType.length() > 0) {
finalQuery.must(QueryBuilders.termQuery(destinationNode + ".entityType", destinationType));
if (destinationTypes != null && destinationTypes.size() > 0) {
finalQuery.must(QueryBuilders.termsQuery(destinationNode + ".entityType", destinationTypes));
}
addFilterToQueryBuilder(destinationEntityFilter, destinationNode, finalQuery);

@@ -150,7 +151,7 @@ public static BoolQueryBuilder buildQuery(@Nullable final String sourceType, @No
}

@WithSpan
public LineageResponse getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, int offset, int count,
public LineageResponse getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int offset, int count,
int maxHops) {
List<LineageRelationship> result = new ArrayList<>();
long currentTime = System.currentTimeMillis();
@@ -175,7 +176,7 @@ public LineageResponse getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirect

// Do one hop on the lineage graph
List<LineageRelationship> oneHopRelationships =
getLineageRelationshipsInBatches(currentLevel, direction, visitedEntities, i + 1, remainingTime);
getLineageRelationshipsInBatches(currentLevel, direction, graphFilters, visitedEntities, i + 1, remainingTime);
result.addAll(oneHopRelationships);
currentLevel = oneHopRelationships.stream().map(LineageRelationship::getEntity).collect(Collectors.toList());
currentTime = System.currentTimeMillis();
@@ -196,11 +197,11 @@ public LineageResponse getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirect
// Get 1-hop lineage relationships asynchronously in batches with timeout
@WithSpan
public List<LineageRelationship> getLineageRelationshipsInBatches(@Nonnull List<Urn> entityUrns,
@Nonnull LineageDirection direction, Set<Urn> visitedEntities, int numHops, long remainingTime) {
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops, long remainingTime) {
List<List<Urn>> batches = Lists.partition(entityUrns, BATCH_SIZE);
return ConcurrencyUtils.getAllCompleted(batches.stream()
.map(batchUrns -> CompletableFuture.supplyAsync(
() -> getLineageRelationships(batchUrns, direction, visitedEntities, numHops)))
() -> getLineageRelationships(batchUrns, direction, graphFilters, visitedEntities, numHops)))
.collect(Collectors.toList()), remainingTime, TimeUnit.MILLISECONDS)
.stream()
.flatMap(List::stream)
@@ -210,7 +211,7 @@ public List<LineageRelationship> getLineageRelationshipsInBatches(@Nonnull List<
// Get 1-hop lineage relationships
@WithSpan
private List<LineageRelationship> getLineageRelationships(@Nonnull List<Urn> entityUrns,
@Nonnull LineageDirection direction, Set<Urn> visitedEntities, int numHops) {
@Nonnull LineageDirection direction, GraphFilters graphFilters, Set<Urn> visitedEntities, int numHops) {
Map<String, List<Urn>> urnsPerEntityType = entityUrns.stream().collect(Collectors.groupingBy(Urn::getEntityType));
Map<String, List<EdgeInfo>> edgesPerEntityType = urnsPerEntityType.keySet()
.stream()
@@ -219,7 +220,7 @@ private List<LineageRelationship> getLineageRelationships(@Nonnull List<Urn> ent
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
// Get all relation types relevant to the set of urns to hop from
urnsPerEntityType.forEach((entityType, urns) -> finalQuery.should(
getQueryForLineage(urns, edgesPerEntityType.getOrDefault(entityType, Collections.emptyList()))));
getQueryForLineage(urns, edgesPerEntityType.getOrDefault(entityType, Collections.emptyList()), graphFilters)));
SearchResponse response = executeSearchQuery(finalQuery, 0, MAX_ELASTIC_RESULT);
Set<Urn> entityUrnSet = new HashSet<>(entityUrns);
// Get all valid edges given the set of urns to hop from
@@ -269,7 +270,7 @@ private List<LineageRelationship> extractRelationships(@Nonnull Set<Urn> entityU
}

// Get search query for given list of edges and source urns
public QueryBuilder getQueryForLineage(List<Urn> urns, List<EdgeInfo> lineageEdges) {
public QueryBuilder getQueryForLineage(List<Urn> urns, List<EdgeInfo> lineageEdges, GraphFilters graphFilters) {
BoolQueryBuilder query = QueryBuilders.boolQuery();
if (lineageEdges.isEmpty()) {
return query;
@@ -294,9 +295,22 @@ public QueryBuilder getQueryForLineage(List<Urn> urns, List<EdgeInfo> lineageEdg
incomingEdgeQuery.must(buildEdgeFilters(incomingEdges));
query.should(incomingEdgeQuery);
}

if (graphFilters != null) {
Collaborator

Minor nit: this function is getting a bit long... consider breaking it into sub-methods.

Contributor Author

done

if (graphFilters.getAllowedEntityTypes() != null && !graphFilters.getAllowedEntityTypes().isEmpty()) {
BoolQueryBuilder allowedEntityTypesFilter = QueryBuilders.boolQuery();
allowedEntityTypesFilter.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), SOURCE));
allowedEntityTypesFilter.must(buildEntityTypesFilter(graphFilters.getAllowedEntityTypes(), DESTINATION));
query.must(allowedEntityTypesFilter);
}
}
return query;
}

public QueryBuilder buildEntityTypesFilter(List<String> entityTypes, String prefix) {
return QueryBuilders.termsQuery(prefix + ".entityType", entityTypes.stream().map(Object::toString).collect(Collectors.toList()));
}

public QueryBuilder buildUrnFilters(List<Urn> urns, String prefix) {
return QueryBuilders.termsQuery(prefix + ".urn", urns.stream().map(Object::toString).collect(Collectors.toList()));
}
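The allowed-entity-types clause added to getQueryForLineage requires both the source and the destination entity type to be in the allowed list, and is skipped when the list is null or empty. A plain-Java model of that semantics, without the Elasticsearch query builders, might look like the sketch below. The Edge class and keepAllowed method are hypothetical, and the entity type names are only illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class AllowedEntityTypesSketch {
  // Hypothetical edge: only the entity types of the two endpoints matter here.
  static final class Edge {
    final String sourceType;
    final String destinationType;

    Edge(String sourceType, String destinationType) {
      this.sourceType = sourceType;
      this.destinationType = destinationType;
    }
  }

  // Keeps edges whose endpoints are both allowed; a null or empty list means no restriction.
  static List<Edge> keepAllowed(List<Edge> edges, List<String> allowedEntityTypes) {
    if (allowedEntityTypes == null || allowedEntityTypes.isEmpty()) {
      return edges;
    }
    return edges.stream()
        .filter(e -> allowedEntityTypes.contains(e.sourceType)
            && allowedEntityTypes.contains(e.destinationType))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Edge> edges = Arrays.asList(
        new Edge("dataset", "dataset"),
        new Edge("dataset", "dataProcessInstance"));
    // Only the dataset-to-dataset edge has both endpoints in the allowed list.
    System.out.println(keepAllowed(edges, Arrays.asList("dataset", "dataJob")).size());
  }
}
```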