diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dd70f3cfbe4e..36d6f5efbcd6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) +- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255)) - Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) - Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java index 4d9115b8962ea..b7228a75984fa 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index bba07d878a42c..ac590a17c2875 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -8,11 +8,13 @@ package org.opensearch.cluster.routing; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.rest.RestStatus; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; @@ -64,6 +66,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -73,6 +76,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(0) .get(); assertEquals(response.isAcknowledged(), true); } @@ -199,6 +203,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -270,6 +275,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -307,12 +313,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() { assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); - assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + ResourceNotFoundException exception = expectThrows( + ResourceNotFoundException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get() + ); + assertEquals(RestStatus.NOT_FOUND, exception.status()); } - public void testDeleteWeightedRouting_WeightsAreSet() { + public void testDeleteWeightedRouting_WeightsAreSet() throws IOException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -339,13 +347,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + } + + public void testPutAndDeleteWithVersioning() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> starting 6 nodes on different zones"); + int nodeCountPerAZ = 2; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // update weights api call with correct version number + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get(); + assertTrue(response.isAcknowledged()); + + // update weights api call with incorrect version number + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights); + UnsupportedWeightedRoutingStateException exception = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get() + ); + assertEquals(exception.status(), RestStatus.CONFLICT); + + // get weights call + ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .get(); + + // update weights call using version returned by get api call + weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(weightedRoutingResponse.getVersion()) + .get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights by awareness attribute + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin() + .cluster() + .prepareDeleteWeightedRouting() + .setAwarenessAttribute("zone") + .setVersion(2) + .get(); + assertTrue(deleteResponse.isAcknowledged()); + + // update weights again and make sure that version number got updated on delete + weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights + deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // delete weights call, incorrect version number + UnsupportedWeightedRoutingStateException deleteException = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get() + ); + assertEquals(RestStatus.CONFLICT, deleteException.status()); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java index 71eab8ff35a2d..12c45e12bb612 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -8,12 +8,26 @@ package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchGenerationException; +import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; import java.io.IOException; +import java.util.Map; /** * Request to delete weights for weighted round-robin shard routing policy. @@ -21,10 +35,42 @@ * @opensearch.internal */ public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest { - public ClusterDeleteWeightedRoutingRequest() {} + private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class); + + private long version; + private String awarenessAttribute; + + public void setVersion(long version) { + this.version = version; + } + + public ClusterDeleteWeightedRoutingRequest() { + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + } public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { super(in); + version = in.readLong(); + if (in.available() != 0) { + awarenessAttribute = in.readString(); + } + } + + public long getVersion() { + return version; + } + + public String getAwarenessAttribute() { + return awarenessAttribute; + } + + public void setAwarenessAttribute(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + } + + public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; } @Override @@ -32,13 +78,72 @@ public ActionRequestValidationException validate() { return null; } + /** + * @param source weights definition from request body + * @return this request + */ + public ClusterDeleteWeightedRoutingRequest source(Map source) { + try { + if (source.isEmpty()) { + throw new OpenSearchParseException(("Empty request body")); + } + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(source); + setRequestBody(BytesReference.bytes(builder), builder.contentType()); + } catch (IOException e) { + throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e); + } + return this; + } + + public void setRequestBody(BytesReference source, XContentType contentType) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) + ) { + String versionAttr = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + } else { + throw new OpenSearchParseException( + "failed to parse delete weighted routing request body [{}], unknown type", + fieldName + ); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + this.version = Long.parseLong(parser.text()); + } + } else { + throw new OpenSearchParseException("failed to parse delete weighted routing request body"); + } + } + } catch (IOException e) { + logger.error("error while parsing delete request for weighted routing request object", e); + } + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeLong(version); + if (awarenessAttribute != null) { + out.writeString(awarenessAttribute); + } } @Override public String toString() { - return "ClusterDeleteWeightedRoutingRequest"; + return "ClusterDeleteWeightedRoutingRequest{" + "version= " + version + "awarenessAttribute=" + awarenessAttribute + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java index 19976ac6b07aa..bb34fea589534 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java @@ -24,4 +24,15 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) { super(client, action, new ClusterDeleteWeightedRoutingRequest()); } + + public ClusterDeleteWeightedRoutingRequestBuilder setVersion(long version) { + request.setVersion(version); + return this; + } + + public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) { + request.setAwarenessAttribute(attribute); + return this; + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index bb77576b63d20..d0f0d46da280b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionResponse; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -34,6 +35,12 @@ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements private WeightedRouting weightedRouting; private String localNodeWeight; private static final String NODE_WEIGHT = "node_weight"; + private static final String WEIGHTS = "weights"; + private long version; + + public long getVersion() { + return version; + } public String getLocalNodeWeight() { return localNodeWeight; @@ -43,14 +50,16 @@ public String getLocalNodeWeight() { this.weightedRouting = null; } - public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) { + public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting, long version) { this.localNodeWeight = localNodeWeight; this.weightedRouting = weightedRouting; + this.version = version; } ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException { if (in.available() != 0) { this.weightedRouting = new WeightedRouting(in); + this.version = in.readLong(); } } @@ -67,6 +76,7 @@ public WeightedRouting weights() { public void writeTo(StreamOutput out) throws IOException { if (weightedRouting != null) { weightedRouting.writeTo(out); + out.writeLong(version); } } @@ -74,12 +84,15 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (this.weightedRouting != null) { + builder.startObject(WEIGHTS); for (Map.Entry entry : weightedRouting.weights().entrySet()) { builder.field(entry.getKey(), entry.getValue().toString()); } if (localNodeWeight != null) { builder.field(NODE_WEIGHT, localNodeWeight); } + builder.endObject(); + builder.field(WeightedRoutingMetadata.VERSION, version); } builder.endObject(); return builder; @@ -88,26 +101,58 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); XContentParser.Token token; - String attrKey = null, attrValue = null; + String attrKey = null, attrValue; String localNodeWeight = null; Map weights = new HashMap<>(); + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + String versionAttr = null; + String weightsAttr; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrKey = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrValue = parser.text(); - if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { - localNodeWeight = attrValue; - } else if (attrKey != null) { - weights.put(attrKey, Double.parseDouble(attrValue)); + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + continue; + } else if (fieldName != null && fieldName.equals(WEIGHTS)) { + weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object", fieldName); + } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrValue = parser.text(); + if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { + localNodeWeight = attrValue; + } else if (attrKey != null && attrKey.equals(WeightedRoutingMetadata.VERSION)) { + version = Long.parseLong(attrValue); + } else if (attrKey != null) { + weights.put(attrKey, Double.parseDouble(attrValue)); + } + } else { + throw new OpenSearchParseException("failed to parse weighted routing request attribute [{}]", attrKey); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); } } else { - throw new OpenSearchParseException("failed to parse weighted routing response"); + throw new OpenSearchParseException("failed to parse weighted routing request"); } } + WeightedRouting weightedRouting = new WeightedRouting("", weights); - return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting); + return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting, version); } @Override @@ -115,7 +160,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o; - return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight); + return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight) && version == that.version; } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java index 9421967a5df26..6ebe7e88185ed 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -100,8 +100,13 @@ protected void clusterManagerOperation( weight = weightedRouting.weights().get(attrVal).toString(); } } + } - clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting); + clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse( + weight, + weightedRouting, + weightedRoutingMetadata.getVersion() + ); } listener.onResponse(clusterGetWeightedRoutingResponse); } catch (Exception ex) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index 8adbf13a000c5..cba4d0e8e796c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; @@ -43,6 +44,15 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest< private WeightedRouting weightedRouting; private String attributeName; + private long version; + + public void version(long version) { + this.version = version; + } + + public long getVersion() { + return this.version; + } public ClusterPutWeightedRoutingRequest() {} @@ -62,13 +72,14 @@ public void attributeName(String attributeName) { public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException { super(in); weightedRouting = new WeightedRouting(in); + version = in.readLong(); } public ClusterPutWeightedRoutingRequest(String attributeName) { this.attributeName = attributeName; } - public void setWeightedRouting(Map source) { + public void setWeightedRouting(Map source) { try { if (source.isEmpty()) { throw new OpenSearchParseException(("Empty request body")); @@ -96,22 +107,56 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) XContentParser.Token token; // move to the first alias parser.nextToken(); + String versionAttr = null; + String weightsAttr; + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrValue = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrWeight = Double.parseDouble(parser.text()); - weights.put(attrValue, attrWeight); + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + continue; + } else if (fieldName != null && fieldName.equals("weights")) { + weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object [{}]", fieldName); + } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrValue = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrWeight = Double.parseDouble(parser.text()); + weights.put(attrValue, attrWeight); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); + } } else { throw new OpenSearchParseException( - "failed to parse weighted routing request attribute [{}], " + "unknown type", - attrWeight + "failed to parse weighted routing request " + "[{}], unknown " + "type", + attributeName ); } } this.weightedRouting = new WeightedRouting(this.attributeName, weights); + this.version = version; } catch (IOException e) { - logger.error("error while parsing put for weighted routing request object", e); + logger.error("error while parsing put weighted routing request object", e); } } @@ -127,6 +172,9 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } + if (version == WeightedRoutingMetadata.VERSION_UNSET_VALUE) { + validationException = addValidationError("Version is missing", validationException); + } int countValueWithZeroWeights = 0; double weight; try { @@ -164,7 +212,7 @@ public ActionRequestValidationException validate() { * @param source weights definition from request body * @return this request */ - public ClusterPutWeightedRoutingRequest source(Map source) { + public ClusterPutWeightedRoutingRequest source(Map source) { setWeightedRouting(source); return this; } @@ -173,11 +221,12 @@ public ClusterPutWeightedRoutingRequest source(Map source) { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); weightedRouting.writeTo(out); + out.writeLong(version); } @Override public String toString() { - return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}"; + return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "version= " + version + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java index b437f4c54d8d6..adfb2cf02f6d9 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java @@ -30,4 +30,8 @@ public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRoutin return this; } + public ClusterPutWeightedRoutingRequestBuilder setVersion(long version) { + request.version(version); + return this; + } } diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index 21f2a2d906602..cad5bac8acf0d 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -578,8 +578,8 @@ public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String * * @return delete weight request */ - public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest() { - return new ClusterDeleteWeightedRoutingRequest(); + public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest(String attributeName) { + return new ClusterDeleteWeightedRoutingRequest(attributeName); } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 07cdc949c4529..320f75a9f2ada 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -36,6 +36,15 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable weights = new HashMap<>(); - WeightedRouting weightedRouting = null; + WeightedRouting weightedRouting; XContentParser.Token token; - String awarenessField = null; + String awarenessField; + String versionAttr = null; + long version = VERSION_UNSET_VALUE; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - awarenessField = parser.currentName(); + String attr = parser.currentName(); + if (attr != null && attr.equals(VERSION)) { + versionAttr = parser.currentName(); + continue; + } else { + awarenessField = parser.currentName(); + } if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected " + "object", - awarenessField - ); + throw new OpenSearchParseException("failed to parse weighted routing metadata [{}], expected object", awarenessField); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { attributeName = parser.currentName(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected" + " object", + "failed to parse weighted routing metadata [{}], expected object", attributeName ); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { - attrValue = Double.parseDouble(parser.text()); - weights.put(attrKey, attrValue); + if (attrKey != null && attrKey.equals(VERSION)) { + version = Long.parseLong(parser.text()); + } else { + attrValue = Double.parseDouble(parser.text()); + weights.put(attrKey, attrValue); + } + } else { throw new OpenSearchParseException( "failed to parse weighted routing metadata attribute " + "[{}], unknown type", @@ -123,10 +146,14 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws } } } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(VERSION)) { + version = Long.parseLong(parser.text()); + } } } weightedRouting = new WeightedRouting(attributeName, weights); - return new WeightedRoutingMetadata(weightedRouting); + return new WeightedRoutingMetadata(weightedRouting, version); } @Override @@ -144,18 +171,21 @@ public int hashCode() { @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - toXContent(weightedRouting, builder); + toXContent(weightedRouting, builder, version); return builder; } - public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException { + public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder, long version) throws IOException { builder.startObject(AWARENESS); - builder.startObject(weightedRouting.attributeName()); - for (Map.Entry entry : weightedRouting.weights().entrySet()) { - builder.field(entry.getKey(), entry.getValue()); + if (weightedRouting.isSet()) { + builder.startObject(weightedRouting.attributeName()); + for (Map.Entry entry : weightedRouting.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); } builder.endObject(); - builder.endObject(); + builder.field(VERSION, version); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 9026da667ccb0..a4b4cc961fade 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -324,7 +324,7 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null) { + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index df2d8d595eaab..3117f4c7cd001 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -13,6 +13,7 @@ import org.opensearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -25,6 +26,11 @@ public class WeightedRouting implements Writeable { private String attributeName; private Map weights; + public WeightedRouting() { + this.attributeName = ""; + this.weights = new HashMap<>(); + } + public WeightedRouting(String attributeName, Map weights) { this.attributeName = attributeName; this.weights = weights; @@ -40,6 +46,11 @@ public WeightedRouting(StreamInput in) throws IOException { weights = (Map) in.readGenericValue(); } + public boolean isSet() { + if (this.attributeName.isEmpty() || this.weights.isEmpty()) return false; + return true; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(attributeName); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 895b790f8499a..ef1a196d751c3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; @@ -77,13 +78,16 @@ public WeightedRoutingService( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes ); + } public void registerWeightedRoutingMetadata( final ClusterPutWeightedRoutingRequest request, final ActionListener listener ) { - final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + final WeightedRouting newWeightedRouting = new WeightedRouting(request.getWeightedRouting()); + + final long requestVersion = request.getVersion(); clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -94,19 +98,21 @@ public ClusterState execute(ClusterState currentState) { Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); if (weightedRoutingMetadata == null) { - logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + logger.info("add weighted routing weights in metadata [{}]", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); } else { - if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) { - logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting()); + if (!checkIfSameWeightsInMetadata(newWeightedRouting, weightedRoutingMetadata.getWeightedRouting())) { + logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, weightedRoutingMetadata.getVersion() + 1); } else { + logger.info("weights are same, not updating weighted routing weights [{}] in metadata", newWeightedRouting); return currentState; } } mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); - logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting()); + logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -124,23 +130,41 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private boolean checkIfSameWeightsInMetadata( - WeightedRoutingMetadata newWeightedRoutingMetadata, - WeightedRoutingMetadata oldWeightedRoutingMetadata - ) { - return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); + private boolean checkIfSameWeightsInMetadata(WeightedRouting newWeights, WeightedRouting oldWeights) { + return newWeights.equals(oldWeights); } public void deleteWeightedRoutingMetadata( final ClusterDeleteWeightedRoutingRequest request, final ActionListener listener ) { + final long requestVersion = request.getVersion(); + final String awarenessAttribute = request.getAwarenessAttribute(); clusterService.submitStateUpdateTask("delete_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { logger.info("Deleting weighted routing metadata from the cluster state"); + Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - mdBuilder.removeCustom(WeightedRoutingMetadata.TYPE); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); + + if ((weightedRoutingMetadata != null && awarenessAttribute == null) + || (weightedRoutingMetadata != null + && weightedRoutingMetadata.getWeightedRouting().attributeName().equals(awarenessAttribute))) { + weightedRoutingMetadata = new WeightedRoutingMetadata(new WeightedRouting(), weightedRoutingMetadata.getVersion() + 1); + } else { + throw new ResourceNotFoundException( + String.format( + Locale.ROOT, + "weighted routing metadata does not have weights set for awareness attribute %s", + awarenessAttribute + ) + ); + } + + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state with weighted routing weights deleted"); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -153,7 +177,6 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("cluster weighted routing metadata change is processed by all the nodes"); - assert newState.metadata().weightedRoutingMetadata() == null; listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); } }); @@ -183,7 +206,7 @@ public void verifyAwarenessAttribute(String attributeName) { if (getAwarenessAttributes().contains(attributeName) == false) { ActionRequestValidationException validationException = null; validationException = addValidationError( - String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + String.format(Locale.ROOT, "invalid awareness attribute %s requested for weighted routing", attributeName), validationException ); throw validationException; @@ -270,4 +293,18 @@ private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, Clus ); } } + + private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetadata weightedRoutingMetadata) { + if ((weightedRoutingMetadata == null && requestedVersion != WeightedRoutingMetadata.INITIAL_VERSION) + || (weightedRoutingMetadata != null && requestedVersion != weightedRoutingMetadata.getVersion())) { + throw new UnsupportedWeightedRoutingStateException( + String.format( + Locale.ROOT, + "requested version is %s but cluster weighted routing metadata is at a " + "different version %s ", + requestedVersion, + weightedRoutingMetadata != null ? weightedRoutingMetadata.getVersion() : WeightedRoutingMetadata.INITIAL_VERSION + ) + ); + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java index 9742cc373d520..d9dedf8d14506 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java @@ -20,7 +20,8 @@ import java.io.IOException; import java.util.List; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; import static org.opensearch.rest.RestRequest.Method.DELETE; /** @@ -35,7 +36,12 @@ public class RestClusterDeleteWeightedRoutingAction extends BaseRestHandler { @Override public List routes() { - return singletonList(new Route(DELETE, "/_cluster/routing/awareness/weights")); + return unmodifiableList( + asList( + new Route(DELETE, "/_cluster/routing/awareness/weights"), + new Route(DELETE, "/_cluster/routing/awareness/{attribute}/weights") + ) + ); } @Override @@ -45,9 +51,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = createRequest(request); return channel -> client.admin() .cluster() .deleteWeightedRouting(clusterDeleteWeightedRoutingRequest, new RestToXContentListener<>(channel)); } + + public static ClusterDeleteWeightedRoutingRequest createRequest(RestRequest request) throws IOException { + ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest( + request.param("attribute") + ); + request.applyContentParser(p -> deleteWeightedRoutingRequest.source(p.mapStrings())); + return deleteWeightedRoutingRequest; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java index 1cf44e665cf84..5f845b7a66c1f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -51,7 +51,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); - request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); + request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapOrdered())); return putWeightedRoutingRequest; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index 5e456158941b8..9d0ed8e03d7f2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -19,16 +19,18 @@ public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { public void testSetWeightedRoutingWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); - assertEquals(request.getWeightedRouting(), weightedRouting); + assertEquals(weightedRouting, request.getWeightedRouting()); + assertEquals(1, request.getVersion()); } public void testValidate_ValuesAreProper() { - String reqString = "{\"us-east-1c\" : \"1\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -45,7 +47,7 @@ public void testValidate_MissingWeights() { } public void testValidate_AttributeMissing() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\": \"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -54,7 +56,7 @@ public void testValidate_AttributeMissing() { } public void testValidate_MoreThanHalfWithZeroWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + "\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -63,4 +65,13 @@ public void testValidate_MoreThanHalfWithZeroWeight() { actionRequestValidationException.getMessage().contains("Maximum expected number of routing weights having zero weight is [1]") ); } + + public void testValidate_VersionMissing() { + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\"}}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Version is missing")); + } } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java index e9add55ca774b..b17d3b789e03f 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest protected ClusterGetWeightedRoutingResponse createTestInstance() { Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); WeightedRouting weightedRouting = new WeightedRouting("", weights); - ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, 3); return response; } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java index f28e932e068ac..c8b22b200dadc 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java @@ -174,7 +174,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 81fbd7c0e332b..a942c62bd05eb 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -332,7 +332,7 @@ public void onFailure(Exception e) { private void setWeightedRoutingWeights(Map weights) { ClusterState clusterState = clusterService.state(); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index a0a9d2bd9586b..17b682618b1a8 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -20,7 +20,7 @@ public class WeightedRoutingMetadataTests extends AbstractXContentTestCase weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); return weightedRoutingMetadata; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 014f2d237a306..d64402a74fba2 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -789,7 +789,7 @@ private ClusterState clusterStateForWeightedRouting(String[] indexNames, int num private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -1158,7 +1158,7 @@ private ClusterState updateStatetoTestWeightedRouting( // add weighted routing weights in metadata Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState.metadata(metadataBuilder); clusterState.routingTable(routingTableBuilder.build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 089fb453ca2c0..1f892b993d4d6 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -175,7 +175,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -260,13 +260,13 @@ public void testDeleteWeightedRoutingMetadata() throws InterruptedException { ClusterState.Builder builder = ClusterState.builder(state); ClusterServiceUtils.setState(clusterService, builder); - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest("zone"); + clusterDeleteWeightedRoutingRequest.setVersion(0); final CountDownLatch countDownLatch = new CountDownLatch(1); ActionListener listener = new ActionListener() { @Override public void onResponse(ClusterDeleteWeightedRoutingResponse clusterDeleteWeightedRoutingResponse) { assertTrue(clusterDeleteWeightedRoutingResponse.isAcknowledged()); - assertNull(clusterService.state().metadata().weightedRoutingMetadata()); countDownLatch.countDown(); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java index a4cd6224217b7..582fbfce315b2 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java @@ -34,14 +34,15 @@ public void setupAction() { } public void testCreateRequest_SupportedRequestBody() throws IOException { - String req = "{\"us-east-1c\" : \"1\", \"us-east-1d\":\"1.0\", \"us-east-1a\":\"0.0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); ClusterPutWeightedRoutingRequest clusterPutWeightedRoutingRequest = RestClusterPutWeightedRoutingAction.createRequest(restRequest); assertEquals("zone", clusterPutWeightedRoutingRequest.getWeightedRouting().attributeName()); assertNotNull(clusterPutWeightedRoutingRequest.getWeightedRouting().weights()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1d").toString()); - assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1b").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals(1, clusterPutWeightedRoutingRequest.getVersion()); } public void testCreateRequest_UnsupportedRequestBody() throws IOException { @@ -54,7 +55,7 @@ public void testCreateRequest_UnsupportedRequestBody() throws IOException { public void testCreateRequest_MalformedRequestBody() throws IOException { Map params = new HashMap<>(); - String req = "{\"us-east-1c\" : \1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0,\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); assertThrows(JsonParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..2589d68e4cf0b --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.junit.Before; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +import java.io.IOException; + +import static java.util.Collections.singletonMap; + +public class RestClusterDeleteWeightedRoutingActionTests extends RestActionTestCase { + private RestClusterPutWeightedRoutingAction action; + + @Before + public void setupAction() { + action = new RestClusterPutWeightedRoutingAction(); + controller().registerHandler(action); + } + + public void testDeleteRequest_SupportedRequestBody() throws IOException { + String req = "{\"_version\":2}"; + RestRequest restRequest = buildRestRequest(req); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest( + restRequest + ); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + + restRequest = buildRestRequestWithAwarenessAttribute(req); + clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest(restRequest); + assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + } + + public void testDeleteRequest_BadRequest() throws IOException { + String req = "{\"_ver\":2}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } + + private RestRequest buildRestRequestWithAwarenessAttribute(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/zone/weights") + .withParams(singletonMap("attribute", "zone")) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/weights") + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + public void testCreateRequest_EmptyRequestBody() throws IOException { + String req = "{}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } +}