Skip to content

Commit

Permalink
Merge pull request #2285 from atlanhq/feat/airflow-pipeline-fixes
Browse files Browse the repository at this point in the history
fix: add base entity to guidEntityMap in lineage response
  • Loading branch information
suraj5077 authored Aug 21, 2023
2 parents f56e533 + 0200269 commit d59832c
Showing 1 changed file with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,26 +284,23 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext);
AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection();
int depth = lineageConstraintsByGuid.getDepth();

AtlasLineageOnDemandInfo ret = initializeLineageOnDemandInfo(guid);

if (depth == 0) {
if (depth == 0)
depth = -1;
}

if (!ret.getRelationsOnDemand().containsKey(guid)) {
if (!ret.getRelationsOnDemand().containsKey(guid))
ret.getRelationsOnDemand().put(guid, new LineageInfoOnDemand(lineageConstraintsByGuid));
}

AtomicInteger inputEntitiesTraversed = new AtomicInteger(0);
AtomicInteger outputEntitiesTraversed = new AtomicInteger(0);
if (isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed);
}
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
Expand All @@ -317,7 +314,6 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
}
}
RequestContext.get().endMetricRecord(metricRecorder);

return ret;
}

Expand Down

0 comments on commit d59832c

Please sign in to comment.