diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index de939eac55..df41d22328 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -343,7 +343,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI } boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); - if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { + if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { break; } else { addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext, nextLevel, traversalOrder); @@ -387,7 +387,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { + if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(baseGuid); if (entityOnDemandInfo == null) continue; @@ -414,7 +414,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { continue; } - if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction)) { + if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); if (entityOnDemandInfo == null) @@ -597,10 +597,8 @@ private static String getId(AtlasVertex vertex) { return vertex.getIdForDisplay(); } - private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasLineageOnDemandInfo.LineageDirection direction) { + private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasLineageOnDemandInfo.LineageDirection direction, Set visitedVertices) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("incrementAndCheckIfRelationsLimitReached"); - if (lineageContainsVisitedEdgeV2(ret, atlasEdge)) - return false; AtlasVertex inVertex = isInput ? atlasEdge.getOutVertex() : atlasEdge.getInVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); @@ -613,7 +611,7 @@ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, bo LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); - setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, inVertex, inGuid, outVertex, outGuid, inLineageInfo, outLineageInfo); + setHorizontalPaginationFlags(isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, inVertex, inGuid, outVertex, outGuid, inLineageInfo, outLineageInfo, visitedVertices); boolean hasRelationsLimitReached = setVerticalPaginationFlags(entitiesTraversed, inLineageInfo, outLineageInfo); if (!hasRelationsLimitReached) { @@ -640,9 +638,9 @@ private boolean setVerticalPaginationFlags(AtomicInteger entitiesTraversed, Line return hasRelationsLimitReached; } - private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, String inGuid, AtlasVertex outVertex, String outGuid, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo) { - boolean isOutVertexVisited = ret.getRelationsOnDemand().containsKey(outGuid); - boolean isInVertexVisited = ret.getRelationsOnDemand().containsKey(inGuid); + private void setHorizontalPaginationFlags(boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, AtomicInteger entitiesTraversed, AtlasVertex inVertex, String inGuid, AtlasVertex outVertex, String outGuid, LineageInfoOnDemand inLineageInfo, LineageInfoOnDemand outLineageInfo, Set visitedVertices) { + boolean isOutVertexVisited = visitedVertices.contains(getId(outVertex)); + boolean isInVertexVisited = visitedVertices.contains(getId(inVertex)); if (depth == 1 || entitiesTraversed.get() == getLineageMaxNodeAllowedCount()-1) { // is the vertex a leaf? if (isInput && ! isOutVertexVisited) setHasUpstream(atlasLineageOnDemandContext, outVertex, outLineageInfo);