Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove bwc code related to 1.0 and earlier version #714

Merged
merged 1 commit into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/main/java/org/opensearch/ad/cluster/ADVersionUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,4 @@ public static String normalizeVersion(String adVersion) {
}
return normalizedVersion.toString();
}

public static boolean compatibleWithVersionOnOrAfter1_1(Version adVersion) {
return adVersion != null && adVersion.onOrAfter(Version.V_1_1_0);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HashRing.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private void buildCircles(Set<String> removedNodeIds, Set<String> addedNodeIds,
// rebuild AD version hash ring with cooldown after all new node added.
rebuildCirclesForRealtimeAD();

if (!dataMigrator.isMigrated() && circles.size() > 0 && circles.lastEntry().getKey().onOrAfter(Version.V_1_1_0)) {
if (!dataMigrator.isMigrated() && circles.size() > 0) {
// Find owning node with highest AD version to make sure the data migration logic be compatible to
// latest AD version when upgrade.
Optional<DiscoveryNode> owningNode = getOwningNodeWithHighestAdVersion(DEFAULT_HASH_RING_MODEL_ID);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/opensearch/ad/model/ADTaskProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.opensearch.Version;
import org.opensearch.ad.annotation.Generated;
import org.opensearch.ad.cluster.ADVersionUtil;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -181,7 +180,7 @@ public void writeTo(StreamOutput out, Version adVersion) throws IOException {
out.writeOptionalInt(thresholdModelTrainingDataSize);
out.writeOptionalLong(modelSizeInBytes);
out.writeOptionalString(nodeId);
if (ADVersionUtil.compatibleWithVersionOnOrAfter1_1(adVersion)) {
if (adVersion != null) {
out.writeOptionalString(taskId);
out.writeOptionalString(adTaskType);
out.writeOptionalInt(detectorTaskSlots);
Expand Down
33 changes: 8 additions & 25 deletions src/main/java/org/opensearch/ad/model/ModelProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
Expand All @@ -43,42 +42,26 @@ public ModelProfile(String modelId, Entity entity, long modelSizeInBytes) {

public ModelProfile(StreamInput in) throws IOException {
this.modelId = in.readString();
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
if (in.readBoolean()) {
this.entity = new Entity(in);
} else {
this.entity = null;
}
if (in.readBoolean()) {
this.entity = new Entity(in);
} else {
this.entity = null;
}

this.modelSizeInBytes = in.readLong();
if (!Bwc.supportMultiCategoryFields(in.getVersion())) {
// removed nodeId since Opensearch 1.1
// read it and do no assignment
in.readString();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(modelId);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
if (entity != null) {
out.writeBoolean(true);
entity.writeTo(out);
} else {
out.writeBoolean(false);
}
if (entity != null) {
out.writeBoolean(true);
entity.writeTo(out);
} else {
out.writeBoolean(false);
}

out.writeLong(modelSizeInBytes);
// removed nodeId since Opensearch 1.1
if (!Bwc.supportMultiCategoryFields(out.getVersion())) {
// write empty string for node id as we don't have it
// otherwise, we will get EOFException
out.writeString(CommonName.EMPTY_FIELD);
}
}

public String getModelId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.ad.cluster.ADVersionUtil;
import org.opensearch.ad.model.ADTaskProfile;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -50,8 +49,7 @@ public ADTaskProfile getAdTaskProfile() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (adTaskProfile != null
&& (ADVersionUtil.compatibleWithVersionOnOrAfter1_1(remoteAdVersion) || adTaskProfile.getNodeId() != null)) {
if (adTaskProfile != null && (remoteAdVersion != null || adTaskProfile.getNodeId() != null)) {
out.writeBoolean(true);
adTaskProfile.writeTo(out, remoteAdVersion);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.EntityProfileName;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -41,17 +40,8 @@ public class EntityProfileRequest extends ActionRequest implements ToXContentObj
public EntityProfileRequest(StreamInput in) throws IOException {
super(in);
adID = in.readString();
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
entityValue = new Entity(in);
} else {
// entity profile involving an old node won't work. Read
// EntityProfileTransportAction.doExecute for details. Read
// a string to not cause EOF exception.
// Cannot assign null to entityValue as old node has no logic to
// deal with a null entity.
String oldFormatEntityString = in.readString();
entityValue = Entity.createSingleAttributeEntity(CommonName.EMPTY_FIELD, oldFormatEntityString);
}
entityValue = new Entity(in);

int size = in.readVInt();
profilesToCollect = new HashSet<EntityProfileName>();
if (size != 0) {
Expand Down Expand Up @@ -84,14 +74,8 @@ public Set<EntityProfileName> getProfilesToCollect() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(adID);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
entityValue.writeTo(out);
} else {
// entity profile involving an old node won't work. Read
// EntityProfileTransportAction.doExecute for details. Write
// a string to not cause EOF exception.
out.writeString(entityValue.toString());
}
entityValue.writeTo(out);

out.writeVInt(profilesToCollect.size());
for (EntityProfileName profile : profilesToCollect) {
out.writeEnum(profile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.action.ActionResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.model.ModelProfileOnNode;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
Expand Down Expand Up @@ -82,14 +80,7 @@ public EntityProfileResponse(StreamInput in) throws IOException {
lastActiveMs = in.readLong();
totalUpdates = in.readLong();
if (in.readBoolean()) {
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
modelProfile = new ModelProfileOnNode(in);
} else {
// we don't have model information from old node
ModelProfile profile = new ModelProfile(in);
modelProfile = new ModelProfileOnNode("", profile);
}

modelProfile = new ModelProfileOnNode(in);
} else {
modelProfile = null;
}
Expand Down Expand Up @@ -118,12 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalUpdates);
if (modelProfile != null) {
out.writeBoolean(true);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
modelProfile.writeTo(out);
} else {
ModelProfile oldFormatModelProfile = modelProfile.getModelProfile();
oldFormatModelProfile.writeTo(out);
}
modelProfile.writeTo(out);
} else {
out.writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import static org.opensearch.action.ValidateActions.addValidationError;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

Expand All @@ -25,7 +24,6 @@
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.Bwc;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -46,21 +44,7 @@ public EntityResultRequest(StreamInput in) throws IOException {

// guarded with version check. Just in case we receive requests from older node where we use String
// to represent an entity
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
this.entities = in.readMap(Entity::new, StreamInput::readDoubleArray);
} else {
// receive a request from a version before OpenSearch 1.1
// the old request uses Map<String, double[]> instead of Map<Entity, double[]> to represent entities
// since it only supports one categorical field.
Map<String, double[]> oldFormatEntities = in.readMap(StreamInput::readString, StreamInput::readDoubleArray);
entities = new HashMap<>();
for (Map.Entry<String, double[]> entry : oldFormatEntities.entrySet()) {
// we don't know the category field name as we don't have access to detector config object
// so we put empty string as the category field name for now. Will handle the case
// in EntityResultTransportAciton.
entities.put(Entity.createSingleAttributeEntity(CommonName.EMPTY_FIELD, entry.getKey()), entry.getValue());
}
}
this.entities = in.readMap(Entity::new, StreamInput::readDoubleArray);

this.start = in.readLong();
this.end = in.readLong();
Expand Down Expand Up @@ -96,25 +80,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.detectorId);
// guarded with version check. Just in case we send requests to older node where we use String
// to represent an entity
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
out.writeMap(entities, (s, e) -> e.writeTo(s), StreamOutput::writeDoubleArray);
} else {
Map<String, double[]> oldFormatEntities = new HashMap<>();
for (Map.Entry<Entity, double[]> entry : entities.entrySet()) {
Map<String, String> attributes = entry.getKey().getAttributes();
if (attributes.size() != 1) {
// cannot send a multi-category field entity to old node since it will
// cause EOF exception and stop the detector. The issue
// is temporary and will be gone after upgrade completes.
// Since one EntityResultRequest is sent to one node, we can safely
// ignore the rest of the requests.
LOG.info("Skip sending multi-category entities to an incompatible node. Attributes: ", attributes);
break;
}
oldFormatEntities.put(entry.getKey().getAttributes().entrySet().iterator().next().getValue(), entry.getValue());
}
out.writeMap(oldFormatEntities, StreamOutput::writeString, StreamOutput::writeDoubleArray);
}
out.writeMap(entities, (s, e) -> e.writeTo(s), StreamOutput::writeDoubleArray);

out.writeLong(this.start);
out.writeLong(this.end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.Version;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.ad.cluster.ADVersionUtil;
import org.opensearch.ad.common.exception.ADVersionException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.model.ADTask;
Expand Down Expand Up @@ -64,11 +63,8 @@ public ForwardADTaskRequest(
Integer availableTaskSlots,
Version remoteAdVersion
) {
if (!ADVersionUtil.compatibleWithVersionOnOrAfter1_1(remoteAdVersion)) {
throw new ADVersionException(
detector.getDetectorId(),
"Can't forward AD task request to node running AD version " + remoteAdVersion
);
if (remoteAdVersion == null) {
throw new ADVersionException(detector.getDetectorId(), "Can't forward AD task request to node running null AD version ");
}
this.detector = detector;
this.detectionDateRange = detectionDateRange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.util.Bwc;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -51,7 +50,7 @@ public ProfileNodeResponse(StreamInput in) throws IOException {
shingleSize = in.readInt();
activeEntities = in.readVLong();
totalUpdates = in.readVLong();
if (Bwc.supportMultiCategoryFields(in.getVersion()) && in.readBoolean()) {
if (in.readBoolean()) {
// added after OpenSearch 1.0
modelProfiles = in.readList(ModelProfile::new);
modelCount = in.readVLong();
Expand Down Expand Up @@ -111,15 +110,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(shingleSize);
out.writeVLong(activeEntities);
out.writeVLong(totalUpdates);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
// added after OpenSearch 1.0
if (modelProfiles != null) {
out.writeBoolean(true);
out.writeList(modelProfiles);
out.writeVLong(modelCount);
} else {
out.writeBoolean(false);
}
// added after OpenSearch 1.0
if (modelProfiles != null) {
out.writeBoolean(true);
out.writeList(modelProfiles);
out.writeVLong(modelCount);
} else {
out.writeBoolean(false);
}
}

Expand Down
28 changes: 5 additions & 23 deletions src/main/java/org/opensearch/ad/transport/ProfileResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.model.ModelProfileOnNode;
import org.opensearch.ad.util.Bwc;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -65,23 +64,15 @@ public ProfileResponse(StreamInput in) throws IOException {
int size = in.readVInt();
modelProfile = new ModelProfileOnNode[size];
for (int i = 0; i < size; i++) {
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
modelProfile[i] = new ModelProfileOnNode(in);
} else {
// we don't have model information from old node
ModelProfile profile = new ModelProfile(in);
modelProfile[i] = new ModelProfileOnNode(CommonName.EMPTY_FIELD, profile);
}
modelProfile[i] = new ModelProfileOnNode(in);
}

shingleSize = in.readInt();
coordinatingNode = in.readString();
totalSizeInBytes = in.readVLong();
activeEntities = in.readVLong();
totalUpdates = in.readVLong();
if (Bwc.supportMultiCategoryFields(in.getVersion())) {
modelCount = in.readVLong();
}
modelCount = in.readVLong();
}

/**
Expand Down Expand Up @@ -140,25 +131,16 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(modelProfile.length);

if (Bwc.supportMultiCategoryFields(out.getVersion())) {
for (ModelProfileOnNode profile : modelProfile) {
profile.writeTo(out);
}
} else {
for (ModelProfileOnNode profile : modelProfile) {
ModelProfile oldFormatModelProfile = profile.getModelProfile();
oldFormatModelProfile.writeTo(out);
}
for (ModelProfileOnNode profile : modelProfile) {
profile.writeTo(out);
}

out.writeInt(shingleSize);
out.writeString(coordinatingNode);
out.writeVLong(totalSizeInBytes);
out.writeVLong(activeEntities);
out.writeVLong(totalUpdates);
if (Bwc.supportMultiCategoryFields(out.getVersion())) {
out.writeVLong(modelCount);
}
out.writeVLong(modelCount);
}

@Override
Expand Down
Loading