Skip to content

Commit

Permalink
Merge pull request #3790 from atlanhq/DG-1893
Browse files Browse the repository at this point in the history
DG-1893 Support append/remove/replace for Tags
  • Loading branch information
hr2904 authored Nov 26, 2024
2 parents 5abbf15 + 25c5bcf commit 7e6052a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
private String displayText = null;
private List<String> classificationNames = null;
private List<AtlasClassification> classifications = null;
private List<AtlasClassification> appendClassifications = null;


private List<AtlasClassification> updateClassifications = null;
private List<AtlasClassification> removeClassifications = null;
private List<String> meaningNames = null;
private List<AtlasTermAssignmentHeader> meanings = null;
private Boolean isIncomplete = Boolean.FALSE;
Expand Down Expand Up @@ -101,6 +106,29 @@ public AtlasEntityHeader(String typeName, Map<String, Object> attributes) {
setLabels(null);
}

public List<AtlasClassification> getAppendClassifications() {
return appendClassifications;
}

public void setAppendClassifications(List<AtlasClassification> appendClassifications) {
this.appendClassifications = appendClassifications;
}

public List<AtlasClassification> getUpdateClassifications() {
return updateClassifications;
}

public void setUpdateClassifications(List<AtlasClassification> updateClassifications) {
this.updateClassifications = updateClassifications;
}

public List<AtlasClassification> getRemoveClassifications() {
return removeClassifications;
}

public void setRemoveClassifications(List<AtlasClassification> removeClassifications) {
this.removeClassifications = removeClassifications;
}

public AtlasEntityHeader(String typeName, String guid, Map<String, Object> attributes) {
super(typeName, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.atlas.AtlasConfiguration.ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES;

Expand Down Expand Up @@ -154,7 +155,7 @@ public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore,
this(AtlasGraphProvider.getGraphInstance(), typeRegistry, entitiesStore, entityGraphMapper, entityChangeNotifier, instanceConverter);
}

public void setClassifications(Map<String, AtlasEntityHeader> map) throws AtlasBaseException {
public void setClassifications(Map<String, AtlasEntityHeader> map, boolean overrideClassifications) throws AtlasBaseException {
RequestContext.get().setDelayTagNotifications(true);

for (String guid : map.keySet()) {
Expand All @@ -178,8 +179,12 @@ public void setClassifications(Map<String, AtlasEntityHeader> map) throws AtlasB
if (entityToBeChanged == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
}

Map<String, List<AtlasClassification>> operationListMap = computeChanges(incomingEntityHeader, entityToBeChanged);
Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();
if(overrideClassifications) {
operationListMap = computeChanges(incomingEntityHeader, entityToBeChanged);
} else {
operationListMap = validateAndTransfer(incomingEntityHeader, entityToBeChanged);
}
try {
commitChanges(guid, typeName, operationListMap);
} catch (AtlasBaseException e) {
Expand Down Expand Up @@ -236,6 +241,44 @@ public void setClassifications(Map<String, AtlasEntityHeader> map) throws AtlasB
RequestContext.get().setDelayTagNotifications(false);
}

private Map<String, List<AtlasClassification>> validateAndTransfer(AtlasEntityHeader incomingEntityHeader, AtlasEntityHeader entityToBeChanged) throws AtlasBaseException {
Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();

Set<String> requiredClassificationKeys = Stream.concat(
incomingEntityHeader.getRemoveClassifications().stream(),
incomingEntityHeader.getUpdateClassifications().stream()
).map(this::generateClassificationComparisonKey)
.collect(Collectors.toSet());

Set<String> preExistingClassificationKeys = entityToBeChanged.getClassifications()
.stream()
.map(this::generateClassificationComparisonKey)
.collect(Collectors.toSet());

Set<String> diff = requiredClassificationKeys.stream()
.filter(key -> !preExistingClassificationKeys.contains(key))
.collect(Collectors.toSet());

if (!diff.isEmpty()) {
String firstTypeName = diff.iterator().next().split("\\|")[1];
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, firstTypeName);
}

List<AtlasClassification> filteredClassifications = incomingEntityHeader.getAppendClassifications()
.stream()
.filter(appendClassification -> !preExistingClassificationKeys.contains(generateClassificationComparisonKey(appendClassification)))
.collect(Collectors.toList());

bucket(PROCESS_DELETE, operationListMap, incomingEntityHeader.getRemoveClassifications());
bucket(PROCESS_UPDATE, operationListMap, incomingEntityHeader.getUpdateClassifications());
bucket(PROCESS_ADD, operationListMap, filteredClassifications);
return operationListMap;
}

private String generateClassificationComparisonKey(AtlasClassification classification) {
return classification.getEntityGuid() + "|" + classification.getTypeName();
}

private void commitChanges(String entityGuid, String typeName, Map<String, List<AtlasClassification>> operationListMap) throws AtlasBaseException {
if (MapUtils.isEmpty(operationListMap)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ public AtlasEntityHeaders getEntityHeaders(@QueryParam("tagUpdateStartTime") lon
@Produces(Servlets.JSON_MEDIA_TYPE)
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Timed
public void setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasBaseException {
public void setClassifications(AtlasEntityHeaders entityHeaders, @QueryParam("overrideClassifications") @DefaultValue("true") boolean overrideClassifications) throws AtlasBaseException {
AtlasPerfTracer perf = null;

try {
Expand All @@ -1274,7 +1274,7 @@ public void setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasBas
}

ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, entitiesStore, entityGraphMapper, entityChangeNotifier, instanceConverter);
associator.setClassifications(entityHeaders.getGuidHeaderMap());
associator.setClassifications(entityHeaders.getGuidHeaderMap(), overrideClassifications);
} finally {
AtlasPerfTracer.log(perf);
}
Expand Down

0 comments on commit 7e6052a

Please sign in to comment.