From 0363a1c1b1db70badb59591a128742b49323e03f Mon Sep 17 00:00:00 2001 From: Rahul Madan Date: Thu, 28 Nov 2024 17:25:30 +0530 Subject: [PATCH] simplified delete --- .../lineage/LineagePreProcessor.java | 154 ++++++++---------- 1 file changed, 66 insertions(+), 88 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java index f7e5b2edfb..e36469d723 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java @@ -286,7 +286,6 @@ private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) { private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException{ - // check connection lineage exists or not // check if connection lineage exists Map entityAttrValues = processEntity.getRelationshipAttributes(); @@ -383,111 +382,90 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess"); try { - // Collect all connections involved in the process being deleted AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex); + Set connectionsToCheck = new HashSet<>(); + + // Collect connections from inputs and outputs + collectConnectionsFromAssets(processEntity.getRelationshipAttribute("inputs"), connectionsToCheck); + collectConnectionsFromAssets(processEntity.getRelationshipAttribute("outputs"), connectionsToCheck); + + // Handle connection processes + Object rawConnectionProcessQNs = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); + if (rawConnectionProcessQNs == null) { + return; + } - Set involvedConnections = new HashSet<>(); - - // Retrieve inputs and outputs from the process - List inputs = (List) processEntity.getRelationshipAttribute("inputs"); - List outputs = (List) processEntity.getRelationshipAttribute("outputs"); + Set connectionProcessQNs = new HashSet<>(); + if (rawConnectionProcessQNs instanceof List) { + connectionProcessQNs.addAll((List) rawConnectionProcessQNs); + } else { + connectionProcessQNs.add(rawConnectionProcessQNs.toString()); + } - if (inputs == null) inputs = Collections.emptyList(); - if (outputs == null) outputs = Collections.emptyList(); + // Process each connection process + for (String connectionProcessQn : connectionProcessQNs) { + if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { + deleteConnectionProcess(connectionProcessQn, connectionsToCheck); + } + } - List allAssets = new ArrayList<>(); - allAssets.addAll(inputs); - allAssets.addAll(outputs); + // Update hasLineage flags + for (String connectionQN : connectionsToCheck) { + checkAndUpdateConnectionLineage(connectionQN); + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } - // For each asset, get its connection and add to involvedConnections - for (AtlasObjectId assetId : allAssets) { + private void collectConnectionsFromAssets(Object assets, Set connections) { + if (assets instanceof List) { + for (AtlasObjectId assetId : (List) assets) { try { AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId); - Map assetConnectionAttributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); - if (assetConnectionAttributes != null) { - String connectionQN = assetConnectionAttributes.get(CONNECTION_QUALIFIED_NAME); - if (StringUtils.isNotEmpty(connectionQN)) { - involvedConnections.add(connectionQN); - } + Map connectionAttr = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); + String connectionQN = connectionAttr.get(CONNECTION_QUALIFIED_NAME); + if (StringUtils.isNotEmpty(connectionQN)) { + connections.add(connectionQN); } } catch (AtlasBaseException e) { LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage()); } } + } + } - // Collect affected connections from connection processes to be deleted - Set connectionProcessQNs = new HashSet<>(); - - // Handle both single value and multi-value cases - try { - // Try getting as a list first - Iterable propertyValues = vertex.getPropertyValues(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); - if (propertyValues != null) { - for (Object value : propertyValues) { - if (value != null) { - connectionProcessQNs.add(value.toString()); - } - } - } - } catch (Exception e) { - // If getting as list fails, try getting as single value - try { - String singleValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, String.class); - if (StringUtils.isNotEmpty(singleValue)) { - connectionProcessQNs.add(singleValue); - } - } catch (Exception ex) { - LOG.warn("Error getting parentConnectionProcessQualifiedName property: {}", ex.getMessage()); - } - } + private void deleteConnectionProcess(String connectionProcessQn, Set affectedConnections) throws AtlasBaseException { + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); - if (connectionProcessQNs.isEmpty()) { - return; + try { + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + + // Add connection QNs to affected connections + addConnectionToSet(connectionProcess.getRelationshipAttribute("input"), affectedConnections); + addConnectionToSet(connectionProcess.getRelationshipAttribute("output"), affectedConnections); + + entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; } + } + } - Set affectedConnections = new HashSet<>(); - - // Process each connection process - for (String connectionProcessQn : connectionProcessQNs) { - if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { - AtlasObjectId atlasObjectId = new AtlasObjectId(); - atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); - atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); - - try { - // Get connection process before deletion to track affected connections - AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); - AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); - - // Get all input and output connections - List inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs"); - List outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs"); - - // Add all connections to affected set - affectedConnections.addAll(inputConnQNs); - affectedConnections.addAll(outputConnQNs); - - // Delete the connection process - entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); - } catch (AtlasBaseException exp) { - if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { - throw exp; - } - } + private void addConnectionToSet(Object connectionAttr, Set connections) { + if (connectionAttr instanceof AtlasObjectId) { + AtlasObjectId connObjectId = (AtlasObjectId) connectionAttr; + Map uniqueAttributes = connObjectId.getUniqueAttributes(); + if (uniqueAttributes != null) { + String qn = (String) uniqueAttributes.get(QUALIFIED_NAME); + if (StringUtils.isNotEmpty(qn)) { + connections.add(qn); } } - - // Combine involved and affected connections - Set connectionsToCheck = new HashSet<>(); - connectionsToCheck.addAll(involvedConnections); - connectionsToCheck.addAll(affectedConnections); - - // Check and update hasLineage for all connections involved - for (String connectionQN : connectionsToCheck) { - checkAndUpdateConnectionLineage(connectionQN); - } - } finally { - RequestContext.get().endMetricRecord(metricRecorder); } }