diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 11e08bf78b..917bc71936 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -207,6 +207,11 @@ public final class Constants { public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS}; + public static final String PROCESS_ENTITY_TYPE = "Process"; + + public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess"; + public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName"; + /** * The homeId field is used when saving into Atlas a copy of an object that is being imported from another * repository. The homeId will be set to a String that identifies the other repository. The specific format @@ -269,6 +274,7 @@ public final class Constants { public static final String NAME = "name"; public static final String QUALIFIED_NAME = "qualifiedName"; + public static final String CONNECTION_QUALIFIED_NAME = "connectionQualifiedName"; public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName"; public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size"; public static final String INDEX_SEARCH_TYPES_MAX_QUERY_STR_LENGTH = "atlas.graph.index.search.types.max-query-str-length"; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 9dd862b333..be882a7f28 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -84,6 +84,8 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String CONNECTION_PROCESS_INPUTS_EDGE = "__ConnectionProcess.inputs"; + private static final String CONNECTION_PROCESS_OUTPUTS_EDGE = "__ConnectionProcess.outputs"; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; @@ -178,8 +180,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); + EntityValidationResult entityValidationResult = validateEntityTypeAndCheckIfDataSet(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, entityValidationResult); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -204,20 +206,44 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { + public class EntityValidationResult { + public final boolean isProcess; + public final boolean isDataSet; + public final boolean isConnection; + public final boolean isConnectionProcess; + + public EntityValidationResult(boolean isProcess, boolean isDataSet, boolean isConnection, boolean isConnectionProcess) { + this.isProcess = isProcess; + this.isDataSet = isDataSet; + this.isConnection = isConnection; + this.isConnectionProcess = isConnectionProcess; + } + } + + + private EntityValidationResult validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + boolean isConnectionProcess = false; + boolean isDataSet = false; + boolean isConnection = false; if (!isProcess) { - boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); - if (!isDataSet) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE); + if(!isConnectionProcess){ + isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); + if (!isDataSet) { + isConnection = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_ENTITY_TYPE); + if(!isConnection){ + throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + } + } } } - return !isProcess; + return new EntityValidationResult(isProcess, isDataSet, isConnection, isConnectionProcess); } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -282,7 +308,7 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); @@ -301,12 +327,12 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger traversalOrder = new AtomicInteger(1); TimeoutChecker timeoutChecker = new TimeoutChecker(LINEAGE_TIMEOUT_MS); atlasLineageOnDemandContext.setTimeoutChecker(timeoutChecker); - if (isDataSet) { + if (entityValidationResult.isConnection || entityValidationResult.isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -314,12 +340,12 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_INPUTS_EDGE:CONNECTION_PROCESS_INPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_OUTPUTS_EDGE:CONNECTION_PROCESS_OUTPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -332,7 +358,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -363,11 +389,12 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); + EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(inGuid); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException { // Get timeout checker from context or create new one TimeoutChecker timeoutChecker = atlasLineageOnDemandContext.getTimeoutChecker(); // Check timeout before starting traversal @@ -385,8 +412,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); + Iterator incomingEdges = null; AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + if(entityValidationResult.isDataSet){ + incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + } else if (entityValidationResult.isConnection) { + incomingEdges = datasetVertex.getEdges(IN, isInput ? CONNECTION_PROCESS_OUTPUTS_EDGE : CONNECTION_PROCESS_INPUTS_EDGE).iterator(); + } RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { @@ -420,7 +452,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + Iterator outgoingEdges = null; + if(entityValidationResult.isDataSet){ + outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + } else if (entityValidationResult.isConnection) { + outgoingEdges = processVertex.getEdges(OUT, isInput ? CONNECTION_PROCESS_INPUTS_EDGE : CONNECTION_PROCESS_OUTPUTS_EDGE).iterator(); + } + RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -457,7 +495,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth + EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(getGuid(entityVertex)); + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); if (traversedEntity != null) traversedEntity.setFinishTime(traversalOrder.get()); @@ -494,7 +533,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); + boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid).isDataSet; // Get the neighbors for the current node enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; @@ -522,7 +561,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); + boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID).isDataSet; if (!lineageListContext.evaluateVertexFilter(currentVertex)) { enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; @@ -1539,7 +1578,7 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE) || edge.getLabel().equalsIgnoreCase(CONNECTION_PROCESS_INPUTS_EDGE); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index f48d206653..75e55cd8a3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -56,6 +56,11 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.AssetPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.StakeholderPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.contract.ContractPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.datamesh.StakeholderTitlePreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.lineage.LineagePreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PurposePreProcessor; @@ -1925,6 +1930,9 @@ public List getPreProcessor(String typeName) { case STAKEHOLDER_TITLE_ENTITY_TYPE: preProcessors.add(new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever)); break; + + case PROCESS_ENTITY_TYPE: + preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this)); } // The default global pre-processor for all AssetTypes diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java new file mode 100644 index 0000000000..cd24ac943d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java @@ -0,0 +1,472 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.lineage; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.DeleteType; +import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.EntityStream; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.repository.util.AtlasEntityUtils; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.Constants.NAME; +import static org.apache.atlas.repository.graph.GraphHelper.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.indexSearchPaginated; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class LineagePreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(LineagePreProcessor.class); + private static final List FETCH_ENTITY_ATTRIBUTES = Arrays.asList(CONNECTION_QUALIFIED_NAME); + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityRetriever; + private AtlasEntityStore entityStore; + protected EntityDiscoveryService discovery; + private static final String HAS_LINEAGE = "__hasLineage"; + + public LineagePreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, AtlasEntityStore entityStore) { + this.entityRetriever = entityRetriever; + this.typeRegistry = typeRegistry; + this.entityStore = entityStore; + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processAttributesForLineagePreprocessor"); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("LineageProcessPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + ArrayList connectionProcessQNs = getConnectionProcessQNsForTheGivenInputOutputs(entity); + + switch (operation) { + case CREATE: + processCreateLineageProcess(entity, connectionProcessQNs); + break; + case UPDATE: + processUpdateLineageProcess(entity, vertex, context, connectionProcessQNs); + break; + } + }catch(Exception exp){ + if (LOG.isDebugEnabled()) { + LOG.debug("Lineage preprocessor: " + exp); + } + }finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + + private void processCreateLineageProcess(AtlasEntity entity, ArrayList connectionProcessList) { + // if not exist create lineage process + // add owner connection process + if(!connectionProcessList.isEmpty()){ + entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessList); + } + } + + private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context, ArrayList connectionProcessList) { + // check if connection lineage exists + // add owner connection process + if(!connectionProcessList.isEmpty()){ + entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessList); + } + } + + private AtlasEntity createConnectionProcessEntity(Map connectionProcessInfo) throws AtlasBaseException { + AtlasEntity processEntity = new AtlasEntity(); + processEntity.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + processEntity.setAttribute(NAME, connectionProcessInfo.get("connectionProcessName")); + processEntity.setAttribute(QUALIFIED_NAME, connectionProcessInfo.get("connectionProcessQualifiedName")); + + // Set up relationship attributes for input and output connections + AtlasObjectId inputConnection = new AtlasObjectId(); + inputConnection.setTypeName(CONNECTION_ENTITY_TYPE); + inputConnection.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("input"))); + + AtlasObjectId outputConnection = new AtlasObjectId(); + outputConnection.setTypeName(CONNECTION_ENTITY_TYPE); + outputConnection.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("output"))); + + Map relationshipAttributes = new HashMap<>(); + relationshipAttributes.put("inputs", Collections.singletonList(inputConnection)); + relationshipAttributes.put("outputs", Collections.singletonList(outputConnection)); + processEntity.setRelationshipAttributes(relationshipAttributes); + + try { + RequestContext.get().setSkipAuthorizationCheck(true); + AtlasEntity.AtlasEntitiesWithExtInfo processExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + processExtInfo.addEntity(processEntity); + EntityStream entityStream = new AtlasEntityStream(processExtInfo); + entityStore.createOrUpdate(entityStream, false); + + // Update hasLineage for both connections + updateConnectionLineageFlag((String) connectionProcessInfo.get("input"), true); + updateConnectionLineageFlag((String) connectionProcessInfo.get("output"), true); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + + return processEntity; + } + + private void updateConnectionLineageFlag(String connectionQualifiedName, boolean hasLineage) throws AtlasBaseException { + AtlasObjectId connectionId = new AtlasObjectId(); + connectionId.setTypeName(CONNECTION_ENTITY_TYPE); + connectionId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionQualifiedName)); + + try { + AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId); + AtlasEntity connection = entityRetriever.toAtlasEntity(connectionVertex); + connection.setAttribute(HAS_LINEAGE, hasLineage); + + AtlasEntity.AtlasEntitiesWithExtInfo connectionExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + connectionExtInfo.addEntity(connection); + EntityStream entityStream = new AtlasEntityStream(connectionExtInfo); + + RequestContext.get().setSkipAuthorizationCheck(true); + try { + entityStore.createOrUpdate(entityStream, false); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + } + + private void checkAndUpdateConnectionLineage(String connectionQualifiedName) throws AtlasBaseException { + AtlasObjectId connectionId = new AtlasObjectId(); + connectionId.setTypeName(CONNECTION_ENTITY_TYPE); + connectionId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionQualifiedName)); + + try { + AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId); + + // Check if this connection has any active connection processes + boolean hasActiveConnectionProcess = hasActiveConnectionProcesses(connectionVertex); + + // Only update if the hasLineage status needs to change + boolean currentHasLineage = getEntityHasLineage(connectionVertex); + if (currentHasLineage != hasActiveConnectionProcess) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating hasLineage for connection {} from {} to {}", + connectionQualifiedName, currentHasLineage, hasActiveConnectionProcess); + } + + AtlasEntity connection = entityRetriever.toAtlasEntity(connectionVertex); + connection.setAttribute(HAS_LINEAGE, hasActiveConnectionProcess); + + AtlasEntity.AtlasEntitiesWithExtInfo connectionExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + connectionExtInfo.addEntity(connection); + EntityStream entityStream = new AtlasEntityStream(connectionExtInfo); + + RequestContext.get().setSkipAuthorizationCheck(true); + try { + entityStore.createOrUpdate(entityStream, false); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + } + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + } + + private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) { + // Iterate over both input and output edges connected to this connection + Iterator edges = connectionVertex.getEdges(AtlasEdgeDirection.BOTH, + new String[]{"__ConnectionProcess.inputs", "__ConnectionProcess.outputs"}).iterator(); + + while (edges.hasNext()) { + AtlasEdge edge = edges.next(); + + // Check if the edge is ACTIVE + if (getStatus(edge) == ACTIVE) { + // Get the connected process vertex (the other vertex of the edge) + AtlasVertex processVertex = edge.getOutVertex().equals(connectionVertex) ? + edge.getInVertex() : edge.getOutVertex(); + + // Check if the connected vertex is an ACTIVE ConnectionProcess + if (getStatus(processVertex) == ACTIVE && + getTypeName(processVertex).equals(CONNECTION_PROCESS_ENTITY_TYPE)) { + return true; + } + } + } + return false; + } + + private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException{ + + // check connection lineage exists or not + // check if connection lineage exists + Map entityAttrValues = processEntity.getRelationshipAttributes(); + + ArrayList inputsAssets = (ArrayList) entityAttrValues.get("inputs"); + ArrayList outputsAssets = (ArrayList) entityAttrValues.get("outputs"); + + // get connection process + Set> uniquesSetOfConnectionProcess = new HashSet<>(); + + for (AtlasObjectId input : inputsAssets){ + AtlasVertex inputVertex = entityRetriever.getEntityVertex(input); + Map inputVertexConnectionQualifiedName = fetchAttributes(inputVertex, FETCH_ENTITY_ATTRIBUTES); + for (AtlasObjectId output : outputsAssets){ + AtlasVertex outputVertex = entityRetriever.getEntityVertex(output); + Map outputVertexConnectionQualifiedName = fetchAttributes(outputVertex, FETCH_ENTITY_ATTRIBUTES); + + if(inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) == outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)){ + continue; + } + + String connectionProcessName = "(" + inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")->(" + outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")"; + String connectionProcessQualifiedName = outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + "/" + connectionProcessName; + // Create a map to store both connectionProcessName and connectionProcessQualifiedName + Map connectionProcessMap = new HashMap<>(); + connectionProcessMap.put("input", inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("output", outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("connectionProcessName", connectionProcessName); + connectionProcessMap.put("connectionProcessQualifiedName", connectionProcessQualifiedName); + + // Add the map to the set + uniquesSetOfConnectionProcess.add(connectionProcessMap); + } + } + + ArrayList connectionProcessList = new ArrayList<>(); + + // check if connection process exists + for (Map connectionProcessInfo : uniquesSetOfConnectionProcess){ + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("connectionProcessQualifiedName"))); + AtlasVertex connectionProcessVertex = null; + try { + // TODO add caching here + connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + } + catch(AtlasBaseException exp){ + if(!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)){ + throw exp; + } + } + + AtlasEntity connectionProcess; + if (connectionProcessVertex == null) { + connectionProcess = createConnectionProcessEntity(connectionProcessInfo); + } else { + // exist so retrieve and perform any update so below statement to retrieve + // TODO add caching here + connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + } + // only add in list if created + connectionProcessList.add(connectionProcess.getAttribute(QUALIFIED_NAME)); + } + + return connectionProcessList; + } + + public boolean checkIfMoreChildProcessExistForConnectionProcess(String connectionProcessQn) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("checkIfMoreChileProcessExistForConnectionProcess"); + boolean ret = false; + + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", PROCESS_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessQn))); + + Map dsl = mapOf("query", mapOf("bool", mapOf("must", mustClauseList))); + + List process = indexSearchPaginated(dsl, new HashSet<>(Arrays.asList(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME)) , this.discovery); + + if (CollectionUtils.isNotEmpty(process) && process.size()>1) { + ret = true; + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + return ret; + } + + // handle process delete logic + @Override + public void processDelete(AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess"); + + try { + // handle both soft and hard deletes + // Collect all connections involved in the process being deleted + AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex); + + Set involvedConnections = new HashSet<>(); + + // Retrieve inputs and outputs from the process + List inputs = (List) processEntity.getRelationshipAttribute("inputs"); + List outputs = (List) processEntity.getRelationshipAttribute("outputs"); + + if (inputs == null) inputs = Collections.emptyList(); + if (outputs == null) outputs = Collections.emptyList(); + + List allAssets = new ArrayList<>(); + allAssets.addAll(inputs); + allAssets.addAll(outputs); + + // For each asset, get its connection and add to involvedConnections + for (AtlasObjectId assetId : allAssets) { + try { + AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId); + Map assetConnectionAttributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); + if (assetConnectionAttributes != null) { + String connectionQN = assetConnectionAttributes.get(CONNECTION_QUALIFIED_NAME); + if (StringUtils.isNotEmpty(connectionQN)) { + involvedConnections.add(connectionQN); + } + } + } catch (AtlasBaseException e) { + LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage()); + } + } + + // Collect affected connections from connection processes to be deleted + Set connectionProcessQNs = new HashSet<>(); + + Object rawProperty = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); + + if (rawProperty instanceof List) { + // If the property is a List, cast and add all elements + List propertyList = (List) rawProperty; + connectionProcessQNs.addAll(propertyList); + } else if (rawProperty instanceof String) { + // If it's a single String, add it to the set + connectionProcessQNs.add((String) rawProperty); + } else if (rawProperty != null) { + // Handle other object types if necessary + connectionProcessQNs.add(rawProperty.toString()); + } + + if (connectionProcessQNs.isEmpty()) { + return; + } + + Set affectedConnections = new HashSet<>(); + + // Process each connection process + for (String connectionProcessQn : connectionProcessQNs) { + if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); + + try { + // Get connection process before deletion to track affected connections + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + + // Safely get connection qualified names + String inputConnQN = getConnectionQualifiedName(connectionProcess, "input"); + String outputConnQN = getConnectionQualifiedName(connectionProcess, "output"); + + // Add non-null qualified names to affected connections + if (StringUtils.isNotEmpty(inputConnQN)) { + affectedConnections.add(inputConnQN); + } + if (StringUtils.isNotEmpty(outputConnQN)) { + affectedConnections.add(outputConnQN); + } + + // Delete the connection process + entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); + } catch (AtlasBaseException exp) { + if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw exp; + } + } + } + } + + // Combine involved and affected connections + Set connectionsToCheck = new HashSet<>(); + connectionsToCheck.addAll(involvedConnections); + connectionsToCheck.addAll(affectedConnections); + + // Check and update hasLineage for all connections involved + for (String connectionQN : connectionsToCheck) { + checkAndUpdateConnectionLineage(connectionQN); + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + + // Helper method to safely get connection qualified name + private String getConnectionQualifiedName(AtlasEntity connectionProcess, String attributeName) { + try { + Object relationshipAttr = connectionProcess.getRelationshipAttribute(attributeName); + if (relationshipAttr instanceof AtlasObjectId) { + AtlasObjectId connObjectId = (AtlasObjectId) relationshipAttr; + Map uniqueAttributes = connObjectId.getUniqueAttributes(); + if (uniqueAttributes != null) { + return (String) uniqueAttributes.get(QUALIFIED_NAME); + } + } + } catch (Exception e) { + LOG.warn("Error getting {} qualified name for connection process {}: {}", + attributeName, connectionProcess.getGuid(), e.getMessage()); + } + return null; + } +}