Skip to content

Commit

Permalink
Merge pull request #3795 from atlanhq/lin-160-beta2
Browse files Browse the repository at this point in the history
LIN-160 PR3 [summary lineage] added LineagePreprocessor to add and delete connectionProcess assets
  • Loading branch information
rahul-madaan authored Nov 28, 2024
2 parents dd1955f + 0363a1c commit c63597d
Showing 1 changed file with 66 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) {

private ArrayList<String> getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException{

// check connection lineage exists or not
// check if connection lineage exists
Map<String, Object> entityAttrValues = processEntity.getRelationshipAttributes();

Expand Down Expand Up @@ -383,111 +382,90 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess");

try {
// Collect all connections involved in the process being deleted
AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex);
Set<String> connectionsToCheck = new HashSet<>();

// Collect connections from inputs and outputs
collectConnectionsFromAssets(processEntity.getRelationshipAttribute("inputs"), connectionsToCheck);
collectConnectionsFromAssets(processEntity.getRelationshipAttribute("outputs"), connectionsToCheck);

// Handle connection processes
Object rawConnectionProcessQNs = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class);
if (rawConnectionProcessQNs == null) {
return;
}

Set<String> involvedConnections = new HashSet<>();

// Retrieve inputs and outputs from the process
List<AtlasObjectId> inputs = (List<AtlasObjectId>) processEntity.getRelationshipAttribute("inputs");
List<AtlasObjectId> outputs = (List<AtlasObjectId>) processEntity.getRelationshipAttribute("outputs");
Set<String> connectionProcessQNs = new HashSet<>();
if (rawConnectionProcessQNs instanceof List) {
connectionProcessQNs.addAll((List<String>) rawConnectionProcessQNs);
} else {
connectionProcessQNs.add(rawConnectionProcessQNs.toString());
}

if (inputs == null) inputs = Collections.emptyList();
if (outputs == null) outputs = Collections.emptyList();
// Process each connection process
for (String connectionProcessQn : connectionProcessQNs) {
if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) {
deleteConnectionProcess(connectionProcessQn, connectionsToCheck);
}
}

List<AtlasObjectId> allAssets = new ArrayList<>();
allAssets.addAll(inputs);
allAssets.addAll(outputs);
// Update hasLineage flags
for (String connectionQN : connectionsToCheck) {
checkAndUpdateConnectionLineage(connectionQN);
}
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

// For each asset, get its connection and add to involvedConnections
for (AtlasObjectId assetId : allAssets) {
private void collectConnectionsFromAssets(Object assets, Set<String> connections) {
if (assets instanceof List) {
for (AtlasObjectId assetId : (List<AtlasObjectId>) assets) {
try {
AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId);
Map<String, String> assetConnectionAttributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES);
if (assetConnectionAttributes != null) {
String connectionQN = assetConnectionAttributes.get(CONNECTION_QUALIFIED_NAME);
if (StringUtils.isNotEmpty(connectionQN)) {
involvedConnections.add(connectionQN);
}
Map<String, String> connectionAttr = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES);
String connectionQN = connectionAttr.get(CONNECTION_QUALIFIED_NAME);
if (StringUtils.isNotEmpty(connectionQN)) {
connections.add(connectionQN);
}
} catch (AtlasBaseException e) {
LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage());
}
}
}
}

// Collect affected connections from connection processes to be deleted
Set<String> connectionProcessQNs = new HashSet<>();

// Handle both single value and multi-value cases
try {
// Try getting as a list first
Iterable<Object> propertyValues = vertex.getPropertyValues(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class);
if (propertyValues != null) {
for (Object value : propertyValues) {
if (value != null) {
connectionProcessQNs.add(value.toString());
}
}
}
} catch (Exception e) {
// If getting as list fails, try getting as single value
try {
String singleValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, String.class);
if (StringUtils.isNotEmpty(singleValue)) {
connectionProcessQNs.add(singleValue);
}
} catch (Exception ex) {
LOG.warn("Error getting parentConnectionProcessQualifiedName property: {}", ex.getMessage());
}
}
private void deleteConnectionProcess(String connectionProcessQn, Set<String> affectedConnections) throws AtlasBaseException {
AtlasObjectId atlasObjectId = new AtlasObjectId();
atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE);
atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn));

if (connectionProcessQNs.isEmpty()) {
return;
try {
AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId);
AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex);

// Add connection QNs to affected connections
addConnectionToSet(connectionProcess.getRelationshipAttribute("input"), affectedConnections);
addConnectionToSet(connectionProcess.getRelationshipAttribute("output"), affectedConnections);

entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class));
} catch (AtlasBaseException e) {
if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) {
throw e;
}
}
}

Set<String> affectedConnections = new HashSet<>();

// Process each connection process
for (String connectionProcessQn : connectionProcessQNs) {
if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) {
AtlasObjectId atlasObjectId = new AtlasObjectId();
atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE);
atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn));

try {
// Get connection process before deletion to track affected connections
AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId);
AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex);

// Get all input and output connections
List<String> inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs");
List<String> outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs");

// Add all connections to affected set
affectedConnections.addAll(inputConnQNs);
affectedConnections.addAll(outputConnQNs);

// Delete the connection process
entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class));
} catch (AtlasBaseException exp) {
if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) {
throw exp;
}
}
private void addConnectionToSet(Object connectionAttr, Set<String> connections) {
if (connectionAttr instanceof AtlasObjectId) {
AtlasObjectId connObjectId = (AtlasObjectId) connectionAttr;
Map<String, Object> uniqueAttributes = connObjectId.getUniqueAttributes();
if (uniqueAttributes != null) {
String qn = (String) uniqueAttributes.get(QUALIFIED_NAME);
if (StringUtils.isNotEmpty(qn)) {
connections.add(qn);
}
}

// Combine involved and affected connections
Set<String> connectionsToCheck = new HashSet<>();
connectionsToCheck.addAll(involvedConnections);
connectionsToCheck.addAll(affectedConnections);

// Check and update hasLineage for all connections involved
for (String connectionQN : connectionsToCheck) {
checkAndUpdateConnectionLineage(connectionQN);
}
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

Expand Down

0 comments on commit c63597d

Please sign in to comment.