Skip to content

Commit

Permalink
Merge pull request #3793 from atlanhq/lin-160-beta2
Browse files Browse the repository at this point in the history
LIN-160 PR2 [summary lineage] added LineagePreprocessor to add and delete connectionProcess assets
  • Loading branch information
rahul-madaan authored Nov 27, 2024
2 parents 7e6052a + 4dc6e57 commit dd1955f
Showing 1 changed file with 124 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,47 @@ private void processCreateLineageProcess(AtlasEntity entity, ArrayList connectio
}
}

private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context, ArrayList connectionProcessList) {
// check if connection lineage exists
// add owner connection process
if(!connectionProcessList.isEmpty()){
entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessList);
private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context, ArrayList<String> newConnectionProcessList) throws AtlasBaseException {
// Get the old parentConnectionProcessQualifiedName from the existing vertex
List<String> oldConnectionProcessList = null;
try {
Object propertyValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class);
if (propertyValue instanceof String) {
oldConnectionProcessList = Arrays.asList((String) propertyValue);
} else if (propertyValue instanceof List) {
oldConnectionProcessList = (List<String>) propertyValue;
} else if (propertyValue != null) {
oldConnectionProcessList = Collections.singletonList(propertyValue.toString());
} else {
oldConnectionProcessList = Collections.emptyList();
}
} catch (Exception e) {
oldConnectionProcessList = Collections.emptyList();
}

// Identify ConnectionProcesses to remove (present in old list but not in new list)
Set<String> connectionProcessesToRemove = new HashSet<>(oldConnectionProcessList);
connectionProcessesToRemove.removeAll(newConnectionProcessList);

// Identify ConnectionProcesses to add (present in new list but not in old list)
Set<String> connectionProcessesToAdd = new HashSet<>(newConnectionProcessList);
connectionProcessesToAdd.removeAll(oldConnectionProcessList);

// For each ConnectionProcess to remove
for (String connectionProcessQn : connectionProcessesToRemove) {
// Check if more child Processes exist for this ConnectionProcess
if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) {
// Delete the ConnectionProcess
deleteConnectionProcess(connectionProcessQn);
}
// Update __hasLineage for involved Connections
updateConnectionsHasLineageForConnectionProcess(connectionProcessQn);
}

// For new ConnectionProcesses, we've already created or retrieved them in getConnectionProcessQNsForTheGivenInputOutputs

// Update the Process entity's parentConnectionProcessQualifiedName attribute
entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, newConnectionProcessList);
}

private AtlasEntity createConnectionProcessEntity(Map<String, Object> connectionProcessInfo) throws AtlasBaseException {
Expand Down Expand Up @@ -348,7 +383,6 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess");

try {
// handle both soft and hard deletes
// Collect all connections involved in the process being deleted
AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex);

Expand Down Expand Up @@ -384,18 +418,27 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
// Collect affected connections from connection processes to be deleted
Set<String> connectionProcessQNs = new HashSet<>();

Object rawProperty = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class);

if (rawProperty instanceof List) {
// If the property is a List, cast and add all elements
List<String> propertyList = (List<String>) rawProperty;
connectionProcessQNs.addAll(propertyList);
} else if (rawProperty instanceof String) {
// If it's a single String, add it to the set
connectionProcessQNs.add((String) rawProperty);
} else if (rawProperty != null) {
// Handle other object types if necessary
connectionProcessQNs.add(rawProperty.toString());
// Handle both single value and multi-value cases
try {
// Try getting as a list first
Iterable<Object> propertyValues = vertex.getPropertyValues(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class);
if (propertyValues != null) {
for (Object value : propertyValues) {
if (value != null) {
connectionProcessQNs.add(value.toString());
}
}
}
} catch (Exception e) {
// If getting as list fails, try getting as single value
try {
String singleValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, String.class);
if (StringUtils.isNotEmpty(singleValue)) {
connectionProcessQNs.add(singleValue);
}
} catch (Exception ex) {
LOG.warn("Error getting parentConnectionProcessQualifiedName property: {}", ex.getMessage());
}
}

if (connectionProcessQNs.isEmpty()) {
Expand All @@ -416,17 +459,13 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId);
AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex);

// Safely get connection qualified names
String inputConnQN = getConnectionQualifiedName(connectionProcess, "input");
String outputConnQN = getConnectionQualifiedName(connectionProcess, "output");
// Get all input and output connections
List<String> inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs");
List<String> outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs");

// Add non-null qualified names to affected connections
if (StringUtils.isNotEmpty(inputConnQN)) {
affectedConnections.add(inputConnQN);
}
if (StringUtils.isNotEmpty(outputConnQN)) {
affectedConnections.add(outputConnQN);
}
// Add all connections to affected set
affectedConnections.addAll(inputConnQNs);
affectedConnections.addAll(outputConnQNs);

// Delete the connection process
entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class));
Expand All @@ -452,21 +491,70 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException {
}
}

// Helper method to safely get connection qualified name
private String getConnectionQualifiedName(AtlasEntity connectionProcess, String attributeName) {
private List<String> getConnectionQualifiedNames(AtlasEntity connectionProcess, String attributeName) {
List<String> connectionQualifiedNames = new ArrayList<>();
try {
Object relationshipAttr = connectionProcess.getRelationshipAttribute(attributeName);
if (relationshipAttr instanceof AtlasObjectId) {
AtlasObjectId connObjectId = (AtlasObjectId) relationshipAttr;
Map<String, Object> uniqueAttributes = connObjectId.getUniqueAttributes();
if (uniqueAttributes != null) {
return (String) uniqueAttributes.get(QUALIFIED_NAME);
if (relationshipAttr instanceof List) {
List<AtlasObjectId> connObjectIds = (List<AtlasObjectId>) relationshipAttr;
for (AtlasObjectId connObjectId : connObjectIds) {
Map<String, Object> uniqueAttributes = connObjectId.getUniqueAttributes();
if (uniqueAttributes != null) {
String qualifiedName = (String) uniqueAttributes.get(QUALIFIED_NAME);
if (StringUtils.isNotEmpty(qualifiedName)) {
connectionQualifiedNames.add(qualifiedName);
}
}
}
}
} catch (Exception e) {
LOG.warn("Error getting {} qualified name for connection process {}: {}",
attributeName, connectionProcess.getGuid(), e.getMessage());
}
return null;
return connectionQualifiedNames;
}

private void deleteConnectionProcess(String connectionProcessQn) throws AtlasBaseException {
AtlasObjectId atlasObjectId = new AtlasObjectId();
atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE);
atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn));

try {
AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId);
entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class));
} catch (AtlasBaseException exp) {
if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) {
throw exp;
}
}
}

private void updateConnectionsHasLineageForConnectionProcess(String connectionProcessQn) throws AtlasBaseException {
// Get the ConnectionProcess entity
AtlasObjectId atlasObjectId = new AtlasObjectId();
atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE);
atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn));

try {
AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId);
AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex);

// Get input and output connections
List<String> inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs");
List<String> outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs");

// For each connection, check and update __hasLineage
Set<String> connectionsToUpdate = new HashSet<>();
connectionsToUpdate.addAll(inputConnQNs);
connectionsToUpdate.addAll(outputConnQNs);

for (String connectionQN : connectionsToUpdate) {
checkAndUpdateConnectionLineage(connectionQN);
}
} catch (AtlasBaseException exp) {
if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) {
throw exp;
}
}
}
}

0 comments on commit dd1955f

Please sign in to comment.