Skip to content

Commit

Permalink
updated lineage preprocessor to check both input/output, not just 1
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul-madaan committed Nov 26, 2024
1 parent 924b588 commit c160162
Showing 1 changed file with 9 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private void checkAndUpdateConnectionLineage(String connectionQualifiedName) thr
try {
AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId);

// Check both input and output edges
// Check if this connection has any active connection processes
boolean hasActiveConnectionProcess = hasActiveConnectionProcesses(connectionVertex);

// Only update if the hasLineage status needs to change
Expand Down Expand Up @@ -226,45 +226,26 @@ private void checkAndUpdateConnectionLineage(String connectionQualifiedName) thr
}

private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) {
// Check both input and output edges
// Iterate over both input and output edges connected to this connection
Iterator<AtlasEdge> edges = connectionVertex.getEdges(AtlasEdgeDirection.BOTH,
new String[]{"__ConnectionProcess.inputs", "__ConnectionProcess.outputs"}).iterator();

while (edges.hasNext()) {
AtlasEdge edge = edges.next();

// Check if the edge is ACTIVE
if (getStatus(edge) == ACTIVE) {
AtlasVertex processVertex = edge.getLabel().equals("__ConnectionProcess.inputs") ?
edge.getOutVertex() : edge.getInVertex();
// Get the connected process vertex (the other vertex of the edge)
AtlasVertex processVertex = edge.getOutVertex().equals(connectionVertex) ?
edge.getInVertex() : edge.getOutVertex();

// If this is an active connection process
// Check if the connected vertex is an ACTIVE ConnectionProcess
if (getStatus(processVertex) == ACTIVE &&
getTypeName(processVertex).equals(CONNECTION_PROCESS_ENTITY_TYPE)) {

// Get the other connection in this process
AtlasVertex otherConnectionVertex = null;
Iterator<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.BOTH,
new String[]{"__ConnectionProcess.inputs", "__ConnectionProcess.outputs"}).iterator();

while (processEdges.hasNext()) {
AtlasEdge processEdge = processEdges.next();
if (getStatus(processEdge) == ACTIVE) {
AtlasVertex connVertex = processEdge.getInVertex();
if (!connVertex.getId().equals(connectionVertex.getId())) {
otherConnectionVertex = connVertex;
break;
}
}
}

// If the other connection is active, this connection process is valid
if (otherConnectionVertex != null && getStatus(otherConnectionVertex) == ACTIVE) {
return true;
}
return true;
}
}
}

return false;
}

Expand Down Expand Up @@ -488,4 +469,4 @@ private String getConnectionQualifiedName(AtlasEntity connectionProcess, String
}
return null;
}
}
}

0 comments on commit c160162

Please sign in to comment.