Merge pull request #3493 from atlanhq/temp-sync-beta
sync beta with master
nikhilbonte21 authored Sep 9, 2024
2 parents 82b11a9 + 5d01988 commit d21f19f
Showing 1 changed file with 16 additions and 9 deletions.
@@ -3132,7 +3132,6 @@ private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, At
        }
    }

-
    public void cleanUpClassificationPropagation(String classificationName, int batchLimit) throws AtlasBaseException {
        int CLEANUP_MAX = batchLimit <= 0 ? CLEANUP_BATCH_SIZE : batchLimit * CLEANUP_BATCH_SIZE;
        int cleanedUpCount = 0;
@@ -3148,7 +3147,7 @@ public void cleanUpClassificationPropagation(String classificationName, int batchLimit) throws AtlasBaseException {

            while (tagVertices.hasNext() && currentAssetVerticesBatch.size() < CLEANUP_BATCH_SIZE) {
                AtlasVertex tagVertex = tagVertices.next();
-                LOG.info("Currently processing tagVertice : {}", tagVertex.getIdForDisplay());
+
                int availableSlots = CLEANUP_BATCH_SIZE - currentAssetVerticesBatch.size();
                long assetCountForCurrentTagVertex = GraphHelper.getAssetsCountOfClassificationVertex(tagVertex);
                currentAssetVerticesBatch.addAll(GraphHelper.getAllAssetsWithClassificationVertex(tagVertex, availableSlots));
@@ -3168,11 +3167,20 @@ public void cleanUpClassificationPropagation(String classificationName, int batchLimit) throws AtlasBaseException {
                        List<AtlasVertex> entityVertices = currentAssetVerticesBatch.subList(offset, toIndex);
                        List<String> impactedGuids = entityVertices.stream().map(GraphHelper::getGuid).collect(Collectors.toList());
                        GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
-                        entityVertices.forEach(vertex ->
-                        {
-                            detachAndRepairTagEdges(classificationName, vertex);
-                        });
-                        // entityVertices.stream().parallel().forEach(vertex -> detachAndRepairTagEdges(classificationName, vertex));
+
+                        for (AtlasVertex vertex : entityVertices) {
+                            List<AtlasClassification> deletedClassifications = new ArrayList<>();
+                            List<AtlasEdge> classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName);
+                            for (AtlasEdge edge : classificationEdges) {
+                                AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
+                                deletedClassifications.add(classification);
+                                deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex);
+                            }
+
+                            AtlasEntity entity = repairClassificationMappings(vertex);
+
+                            entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);
+                        }

                        transactionInterceptHelper.intercept();

@@ -3182,7 +3190,7 @@ public void cleanUpClassificationPropagation(String classificationName, int batchLimit) throws AtlasBaseException {
                    }

                } while (offset < currentAssetsBatchSize);
-                LOG.info("{} - Now deleting {} tagVertices", classificationName, tagVerticesProcessed.size());
+
                for (AtlasVertex classificationVertex : tagVerticesProcessed) {
                    try {
                        deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
@@ -3197,7 +3205,6 @@ public void cleanUpClassificationPropagation(String classificationName, int batchLimit) throws AtlasBaseException {
                tagVerticesProcessed.clear();
            }
            tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE);
-            LOG.info("{} - tagVertices.hasNext() : {}", classificationName, tagVertices.hasNext());
        }

        LOG.info("Completed cleaning up classification {}", classificationName);
