Skip to content

Commit

Permalink
Merge pull request #3789 from atlanhq/lin-160-beta2
Browse files Browse the repository at this point in the history
LIN-160 [summary lineage] added LineagePreprocessor to add and delete connectionProcess assets
  • Loading branch information
rahul-madaan authored Nov 26, 2024
2 parents 054072b + c160162 commit 5abbf15
Show file tree
Hide file tree
Showing 4 changed files with 549 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ public final class Constants {

public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS};

public static final String PROCESS_ENTITY_TYPE = "Process";

public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess";
public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName";

/**
* The homeId field is used when saving into Atlas a copy of an object that is being imported from another
* repository. The homeId will be set to a String that identifies the other repository. The specific format
Expand Down Expand Up @@ -269,6 +274,7 @@ public final class Constants {

public static final String NAME = "name";
public static final String QUALIFIED_NAME = "qualifiedName";
public static final String CONNECTION_QUALIFIED_NAME = "connectionQualifiedName";
public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName";
public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";
public static final String INDEX_SEARCH_TYPES_MAX_QUERY_STR_LENGTH = "atlas.graph.index.search.types.max-query-str-length";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class EntityLineageService implements AtlasLineageService {

private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
private static final String CONNECTION_PROCESS_INPUTS_EDGE = "__ConnectionProcess.inputs";
private static final String CONNECTION_PROCESS_OUTPUTS_EDGE = "__ConnectionProcess.outputs";
private static final String COLUMNS = "columns";
private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000;
Expand Down Expand Up @@ -178,8 +180,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand

RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes());
AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry);
boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet);
EntityValidationResult entityValidationResult = validateEntityTypeAndCheckIfDataSet(guid);
AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, entityValidationResult);
appendLineageOnDemandPayload(ret, lineageOnDemandRequest);
// filtering out on-demand relations which has input & output nodes within the limit
cleanupRelationsOnDemand(ret);
Expand All @@ -204,20 +206,44 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR
return ret;
}

private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException {
public class EntityValidationResult {
public final boolean isProcess;
public final boolean isDataSet;
public final boolean isConnection;
public final boolean isConnectionProcess;

public EntityValidationResult(boolean isProcess, boolean isDataSet, boolean isConnection, boolean isConnectionProcess) {
this.isProcess = isProcess;
this.isDataSet = isDataSet;
this.isConnection = isConnection;
this.isConnectionProcess = isConnectionProcess;
}
}


private EntityValidationResult validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException {
String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName);
}
boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);
boolean isConnectionProcess = false;
boolean isDataSet = false;
boolean isConnection = false;
if (!isProcess) {
boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE);
if(!isConnectionProcess){
isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
if (!isDataSet) {
isConnection = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_ENTITY_TYPE);
if(!isConnection){
throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName);
}
}
}
}
return !isProcess;
return new EntityValidationResult(isProcess, isDataSet, isConnection, isConnectionProcess);
}

private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) {
Expand Down Expand Up @@ -282,7 +308,7 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) {
}
}

private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException {
private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand");

LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext);
Expand All @@ -301,25 +327,25 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
AtomicInteger traversalOrder = new AtomicInteger(1);
TimeoutChecker timeoutChecker = new TimeoutChecker(LINEAGE_TIMEOUT_MS);
atlasLineageOnDemandContext.setTimeoutChecker(timeoutChecker);
if (isDataSet) {
if (entityValidationResult.isConnection || entityValidationResult.isDataSet) {
AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult);
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH)
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder);
traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult);
AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes());
setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader);
ret.getGuidEntityMap().put(guid, baseEntityHeader);
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
// make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1'
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder);
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_INPUTS_EDGE:CONNECTION_PROCESS_INPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult);
}
if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder);
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_OUTPUTS_EDGE:CONNECTION_PROCESS_OUTPUTS_EDGE).iterator();
traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult);
}
}
RequestContext.get().endMetricRecord(metricRecorder);
Expand All @@ -332,7 +358,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal
baseEntityHeader.setFinishTime(traversalOrder.get());
}

private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException {
AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT;
int nextLevel = isInput ? level - 1: level + 1;

Expand Down Expand Up @@ -363,11 +389,12 @@ private void traverseEdgesOnDemand(Iterator<AtlasEdge> processEdges, boolean isI
ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains));
}

traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder);
EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(inGuid);
traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1);
}
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException {
private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException {
// Get timeout checker from context or create new one
TimeoutChecker timeoutChecker = atlasLineageOnDemandContext.getTimeoutChecker();
// Check timeout before starting traversal
Expand All @@ -385,8 +412,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
// keep track of visited vertices to avoid circular loop
visitedVertices.add(getId(datasetVertex));

Iterator<AtlasEdge> incomingEdges = null;
AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn");
Iterator<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();
if(entityValidationResult.isDataSet){
incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator();
} else if (entityValidationResult.isConnection) {
incomingEdges = datasetVertex.getEdges(IN, isInput ? CONNECTION_PROCESS_OUTPUTS_EDGE : CONNECTION_PROCESS_INPUTS_EDGE).iterator();
}
RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn);

while (incomingEdges.hasNext()) {
Expand Down Expand Up @@ -420,7 +452,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
}

AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut");
Iterator<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
Iterator<AtlasEdge> outgoingEdges = null;
if(entityValidationResult.isDataSet){
outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator();
} else if (entityValidationResult.isConnection) {
outgoingEdges = processVertex.getEdges(OUT, isInput ? CONNECTION_PROCESS_INPUTS_EDGE : CONNECTION_PROCESS_OUTPUTS_EDGE).iterator();
}

RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut);

while (outgoingEdges.hasNext()) {
Expand Down Expand Up @@ -457,7 +495,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
setEntityLimitReachedFlag(isInput, ret);
}
if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth
EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(getGuid(entityVertex));
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); // execute inner depth
AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
if (traversedEntity != null)
traversedEntity.setFinishTime(traversalOrder.get());
Expand Down Expand Up @@ -494,7 +533,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line


AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid);
boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid).isDataSet;
// Get the neighbors for the current node
enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
int currentDepth = 0;
Expand Down Expand Up @@ -522,7 +561,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line
if (Objects.isNull(currentVertex))
throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID);

boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID);
boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID).isDataSet;
if (!lineageListContext.evaluateVertexFilter(currentVertex)) {
enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap);
continue;
Expand Down Expand Up @@ -1539,7 +1578,7 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo,
String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex);
String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex);
String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE) || edge.getLabel().equalsIgnoreCase(CONNECTION_PROCESS_INPUTS_EDGE);

if (!entities.containsKey(inGuid)) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.AssetPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.lineage.LineagePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor;
Expand Down Expand Up @@ -1925,6 +1930,9 @@ public List<PreProcessor> getPreProcessor(String typeName) {
case STAKEHOLDER_TITLE_ENTITY_TYPE:
preProcessors.add(new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever));
break;

case PROCESS_ENTITY_TYPE:
preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this));
}

// The default global pre-processor for all AssetTypes
Expand Down
Loading

0 comments on commit 5abbf15

Please sign in to comment.