From 6ab7e706395ffc36d9a07212ee3ed240f0dc1cb6 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 15 Nov 2022 13:15:37 +0530 Subject: [PATCH 01/18] Initial commit for api versioning Signed-off-by: Anshu Agarwal --- .../ClusterGetWeightedRoutingResponse.java | 13 ++++- .../TransportGetWeightedRoutingAction.java | 7 ++- .../put/ClusterPutWeightedRoutingRequest.java | 11 ++++ ...usterPutWeightedRoutingRequestBuilder.java | 4 ++ .../metadata/WeightedRoutingMetadata.java | 44 ++++++++++++--- .../routing/WeightedRoutingService.java | 56 ++++++++++++++----- .../RestClusterPutWeightedRoutingAction.java | 1 + ...lusterGetWeightedRoutingResponseTests.java | 2 +- ...ransportGetWeightedRoutingActionTests.java | 2 +- .../DecommissionServiceTests.java | 2 +- .../WeightedRoutingMetadataTests.java | 2 +- .../routing/OperationRoutingTests.java | 4 +- .../routing/WeightedRoutingServiceTests.java | 2 +- 13 files changed, 118 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index bb77576b63d20..9fcf29f1c41af 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionResponse; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -34,6 +35,7 @@ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements private WeightedRouting weightedRouting; private String localNodeWeight; private static final String NODE_WEIGHT = "node_weight"; + private long version; public String getLocalNodeWeight() { return localNodeWeight; @@ -43,14 +45,16 @@ public String getLocalNodeWeight() { this.weightedRouting = null; } - public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) { + public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting, long version) { this.localNodeWeight = localNodeWeight; this.weightedRouting = weightedRouting; + this.version = version; } ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException { if (in.available() != 0) { this.weightedRouting = new WeightedRouting(in); + this.version = in.readLong(); } } @@ -67,6 +71,7 @@ public WeightedRouting weights() { public void writeTo(StreamOutput out) throws IOException { if (weightedRouting != null) { weightedRouting.writeTo(out); + out.writeLong(version); } } @@ -80,6 +85,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (localNodeWeight != null) { builder.field(NODE_WEIGHT, localNodeWeight); } + builder.field(WeightedRoutingMetadata.VERSION, version); } builder.endObject(); return builder; @@ -91,6 +97,7 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars String attrKey = null, attrValue = null; String localNodeWeight = null; Map weights = new HashMap<>(); + long version = -1; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -99,6 +106,8 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars attrValue = parser.text(); if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { localNodeWeight = attrValue; + } else if (attrKey != null && attrKey.equals(WeightedRoutingMetadata.VERSION)) { + version = Long.parseLong(attrValue); } else if (attrKey != null) { weights.put(attrKey, Double.parseDouble(attrValue)); } @@ -107,7 +116,7 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars } } WeightedRouting weightedRouting = new WeightedRouting("", weights); - return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting); + return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting, version); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java index 9421967a5df26..6ebe7e88185ed 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -100,8 +100,13 @@ protected void clusterManagerOperation( weight = weightedRouting.weights().get(attrVal).toString(); } } + } - clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting); + clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse( + weight, + weightedRouting, + weightedRoutingMetadata.getVersion() + ); } listener.onResponse(clusterGetWeightedRoutingResponse); } catch (Exception ex) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index af229fb12b4f0..76b09a419f720 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -43,6 +43,15 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest< private WeightedRouting weightedRouting; private String attributeName; + private long version; + + public void version(long version) { + this.version = version; + } + + public long getVersion() { + return this.version; + } public ClusterPutWeightedRoutingRequest() {} @@ -62,6 +71,7 @@ public void attributeName(String attributeName) { public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException { super(in); weightedRouting = new WeightedRouting(in); + version = in.readLong(); } public ClusterPutWeightedRoutingRequest(String attributeName) { @@ -163,6 +173,7 @@ public ClusterPutWeightedRoutingRequest source(Map source) { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); weightedRouting.writeTo(out); + out.writeLong(version); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java index b437f4c54d8d6..adfb2cf02f6d9 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java @@ -30,4 +30,8 @@ public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRoutin return this; } + public ClusterPutWeightedRoutingRequestBuilder setVersion(long version) { + request.version(version); + return this; + } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 07cdc949c4529..f9cabdc299e6c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -36,6 +36,14 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable entry : weightedRouting.weights().entrySet()) { @@ -156,6 +185,7 @@ public static void toXContent(WeightedRouting weightedRouting, XContentBuilder b } builder.endObject(); builder.endObject(); + builder.field(VERSION, version); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 6acb4a1e832cb..759eafe80cc29 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.WeightedRoutingVersionMismatchException; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; @@ -66,28 +67,56 @@ public void registerWeightedRoutingMetadata( final ClusterPutWeightedRoutingRequest request, final ActionListener listener ) { - final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + final WeightedRouting newWeightedRouting = new WeightedRouting(request.getWeightedRouting()); + + final long requestVersion = request.getVersion(); clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override - public ClusterState execute(ClusterState currentState) { + public ClusterState execute(ClusterState currentState) throws Exception { // verify currently no decommission action is ongoing ensureNoOngoingDecommissionAction(currentState); - Metadata metadata = currentState.metadata(); - Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + Metadata metadata; + Metadata.Builder mdBuilder; + + metadata = currentState.metadata(); + mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + if (weightedRoutingMetadata == null) { - logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + logger.info("put weighted routing weights in metadata [{}]", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata( + newWeightedRouting, + WeightedRoutingMetadata.INITIAL_VERSION + ); + } else if (weightedRoutingMetadata.getVersion() != requestVersion) { + throw new WeightedRoutingVersionMismatchException( + String.format( + Locale.ROOT, + "weighted routing " + + "version in request is %s but cluster weighted routing metadata is at a different version %s ", + requestVersion, + weightedRoutingMetadata.getVersion() + ) + ); + } else { - if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) { - logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting()); + if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) { + logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata( + newWeightedRouting, + weightedRoutingMetadata.getVersion() + 1 + ); } else { + logger.info( + "weights are same, not updating weighted routing weights [{}] in metadata", + newWeightedRouting + ); return currentState; } } + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); - logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting()); + logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -105,11 +134,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private boolean checkIfSameWeightsInMetadata( - WeightedRoutingMetadata newWeightedRoutingMetadata, - WeightedRoutingMetadata oldWeightedRoutingMetadata - ) { - return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); + private boolean checkIfSameWeightsInMetadata(WeightedRouting newWeights, WeightedRouting oldWeights) { + return newWeights.equals(oldWeights); } public void deleteWeightedRoutingMetadata( diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java index 1cf44e665cf84..954a2f1901f0e 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -51,6 +51,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); + putWeightedRoutingRequest.version(request.paramAsLong("_version", -1)); request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); return putWeightedRoutingRequest; } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java index e9add55ca774b..3127eae7c5d8d 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest protected ClusterGetWeightedRoutingResponse createTestInstance() { Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); WeightedRouting weightedRouting = new WeightedRouting("", weights); - ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, 1); return response; } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java index f28e932e068ac..c8b22b200dadc 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java @@ -174,7 +174,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 7fe58d75932a1..9338c91e85e11 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -396,7 +396,7 @@ public void onFailure(Exception e) { private void setWeightedRoutingWeights(Map weights) { ClusterState clusterState = clusterService.state(); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index a0a9d2bd9586b..a2fda8902c6d7 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -20,7 +20,7 @@ public class WeightedRoutingMetadataTests extends AbstractXContentTestCase weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); return weightedRoutingMetadata; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 87cab4a006a63..42e2c60326989 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -789,7 +789,7 @@ private ClusterState clusterStateForWeightedRouting(String[] indexNames, int num private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -1153,7 +1153,7 @@ private ClusterState updateStatetoTestWeightedRouting( // add weighted routing weights in metadata Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState.metadata(metadataBuilder); clusterState.routingTable(routingTableBuilder.build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 91b8703cacf5c..ff3c153655f1e 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -175,7 +175,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); From 71105fd1a3ee39a89be6de58aba00b30dec9fe95 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 15 Nov 2022 16:47:51 +0530 Subject: [PATCH 02/18] Add tests Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 62 +++++++++++++++++++ ...ightedRoutingVersionMismatchException.java | 39 ++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index bba07d878a42c..e0a2443d8330f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -12,12 +12,15 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.cluster.ClusterState; import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import static org.hamcrest.Matchers.equalTo; @@ -348,4 +351,63 @@ public void testDeleteWeightedRouting_WeightsAreSet() { assertTrue(deleteResponse.isAcknowledged()); assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); } + + public void testPutWeightedRoutingWithVersioning() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> starting 6 nodes on different zones"); + int nodeCountPerAZ = 2; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + + Future response[] = new Future[50]; + for (int i = 0; i < 50; i++) { + Map weights = new HashMap<>(); + weights.put("a", 1.0); + weights.put("b", 2.0); + double weightc = (double) i; + weights.put("c", weightc); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + response[i] = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).execute(); + } + + for (int i = 0; i < 50; i++) { + try { + ClusterPutWeightedRoutingResponse weightedRoutingResponse = response[i].get(); + logger.info("response from request -" + i); + logger.info(weightedRoutingResponse.isAcknowledged()); + } catch (Exception t) { + logger.info("Exception is hit"); + } + } + + ClusterState stateafter = internalCluster().clusterService().state(); + + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java new file mode 100644 index 0000000000000..13067363ac59c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; + +public class WeightedRoutingVersionMismatchException extends OpenSearchException { + + public WeightedRoutingVersionMismatchException() { + super(""); + } + + public WeightedRoutingVersionMismatchException(Throwable cause) { + super(cause); + } + + public WeightedRoutingVersionMismatchException(String message) { + super(message); + } + + @Override + public RestStatus status() { + return RestStatus.CONFLICT; + } + + public WeightedRoutingVersionMismatchException(StreamInput in) throws IOException { + super(in); + } +} From 6b18467ce4fb4a20d936b529f432c3e21db11f66 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 13 Dec 2022 17:21:28 +0530 Subject: [PATCH 03/18] Change default version Signed-off-by: Anshu Agarwal --- .../ClusterGetWeightedRoutingResponse.java | 2 +- .../metadata/WeightedRoutingMetadata.java | 4 ++-- .../routing/WeightedRoutingService.java | 22 +++++-------------- .../RestClusterPutWeightedRoutingAction.java | 3 ++- 4 files changed, 11 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index 9fcf29f1c41af..11c1ea998e89a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -97,7 +97,7 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars String attrKey = null, attrValue = null; String localNodeWeight = null; Map weights = new HashMap<>(); - long version = -1; + long version = WeightedRoutingMetadata.INITIAL_VERSION; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index f9cabdc299e6c..de1cdc0e2d384 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -37,7 +37,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable putWeightedRoutingRequest.source(p.mapStrings())); return putWeightedRoutingRequest; } From b39369c6bc43a3bf9e23e73bf7ef6c910c5ec96d Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 13 Dec 2022 20:42:00 +0530 Subject: [PATCH 04/18] Fix tests Signed-off-by: Anshu Agarwal --- .../weighted/get/ClusterGetWeightedRoutingResponse.java | 4 ++-- .../weighted/get/ClusterGetWeightedRoutingResponseTests.java | 2 +- .../cluster/metadata/WeightedRoutingMetadataTests.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index 11c1ea998e89a..0a056e74c365f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -85,7 +85,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (localNodeWeight != null) { builder.field(NODE_WEIGHT, localNodeWeight); } - builder.field(WeightedRoutingMetadata.VERSION, version); + builder.field(WeightedRoutingMetadata.VERSION, String.valueOf(version)); } builder.endObject(); return builder; @@ -124,7 +124,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o; - return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight); + return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight) && version == that.version; } @Override diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java index 3127eae7c5d8d..148f3e1228fcc 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest protected ClusterGetWeightedRoutingResponse createTestInstance() { Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); WeightedRouting weightedRouting = new WeightedRouting("", weights); - ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, 1); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, -1); return response; } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index a2fda8902c6d7..17b682618b1a8 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -20,7 +20,7 @@ public class WeightedRoutingMetadataTests extends AbstractXContentTestCase weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); return weightedRoutingMetadata; } From 04ff159fbadda261563998ad59f4781e9965f775 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Thu, 22 Dec 2022 13:12:58 +0530 Subject: [PATCH 05/18] Change request body schema Signed-off-by: Anshu Agarwal --- .../put/ClusterPutWeightedRoutingRequest.java | 50 ++++++++++++++++--- .../routing/WeightedRoutingService.java | 45 +++++++++-------- .../RestClusterPutWeightedRoutingAction.java | 2 - ...ClusterPutWeightedRoutingRequestTests.java | 19 +++++-- 4 files changed, 84 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index a25574c3e22b9..b5eedc5ff06ec 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -44,6 +44,9 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest< private String attributeName; private long version; + public static final String VERSION = "_version"; + public static final long VERSION_UNSET_VALUE = -2; + public void version(long version) { this.version = version; } @@ -105,20 +108,52 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) XContentParser.Token token; // move to the first alias parser.nextToken(); + String versionAttr = null; + String weightsAttr = null; + long version = VERSION_UNSET_VALUE; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrValue = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrWeight = Double.parseDouble(parser.text()); - weights.put(attrValue, attrWeight); + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(VERSION)) { + versionAttr = parser.currentName(); + continue; + } else { + weightsAttr = parser.currentName(); + } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected " + "object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrValue = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrWeight = Double.parseDouble(parser.text()); + weights.put(attrValue, attrWeight); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr.equals(VERSION)) { + version = parser.longValue(); + } } else { throw new OpenSearchParseException( - "failed to parse weighted routing request attribute [{}], " + "unknown type", - attrWeight + "failed to parse weighted routing request " + "[{}], unknown " + "type", + attributeName ); } } this.weightedRouting = new WeightedRouting(this.attributeName, weights); + this.version = version; } catch (IOException e) { logger.error("error while parsing put for weighted routing request object", e); } @@ -136,6 +171,9 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } + if (version == VERSION_UNSET_VALUE) { + validationException = addValidationError("Version is missing", validationException); + } try { for (Object value : weightedRouting.weights().values()) { if (value == null) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 4a9d1f61c3e37..d984c657dde18 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -93,29 +93,34 @@ public ClusterState execute(ClusterState currentState) { Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); - if (weightedRoutingMetadata == null) { + if (weightedRoutingMetadata == null && requestVersion == WeightedRoutingMetadata.INITIAL_VERSION) { logger.info("put weighted routing weights in metadata [{}]", newWeightedRouting); - weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, WeightedRoutingMetadata.INITIAL_VERSION); - } else if (weightedRoutingMetadata.getVersion() != requestVersion) { - throw new UnsupportedWeightedRoutingStateException( - String.format( - Locale.ROOT, - "weighted routing " - + "version in request is %s but cluster weighted routing metadata is at a different version %s ", - requestVersion, - weightedRoutingMetadata.getVersion() - ) - ); - - } else { - if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) { - logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); - weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, weightedRoutingMetadata.getVersion() + 1); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); + } else if ((weightedRoutingMetadata == null && requestVersion != WeightedRoutingMetadata.INITIAL_VERSION) + || weightedRoutingMetadata.getVersion() != requestVersion) { + throw new UnsupportedWeightedRoutingStateException( + String.format( + Locale.ROOT, + "weighted routing " + + "version in request is %s but cluster weighted routing metadata is at a different version %s ", + requestVersion, + weightedRoutingMetadata != null + ? weightedRoutingMetadata.getVersion() + : WeightedRoutingMetadata.INITIAL_VERSION + ) + ); } else { - logger.info("weights are same, not updating weighted routing weights [{}] in metadata", newWeightedRouting); - return currentState; + if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) { + logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata( + newWeightedRouting, + weightedRoutingMetadata.getVersion() + 1 + ); + } else { + logger.info("weights are same, not updating weighted routing weights [{}] in metadata", newWeightedRouting); + return currentState; + } } - } mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java index 021589d6cc218..1cf44e665cf84 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -13,7 +13,6 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.client.Requests; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; @@ -52,7 +51,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); - putWeightedRoutingRequest.version(request.paramAsLong("if_version", WeightedRoutingMetadata.INITIAL_VERSION)); request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); return putWeightedRoutingRequest; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index cdec66d6683eb..1d236d4d82fa0 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -19,16 +19,18 @@ public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { public void testSetWeightedRoutingWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" },\"_version\": 1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); - assertEquals(request.getWeightedRouting(), weightedRouting); + assertEquals(weightedRouting, request.getWeightedRouting()); + assertEquals(1, request.getVersion()); } public void testValidate_ValuesAreProper() { - String reqString = "{\"us-east-1c\" : \"1\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" },\"_version\": 1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -45,7 +47,7 @@ public void testValidate_MissingWeights() { } public void testValidate_AttributeMissing() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" },\"_version\": 1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -53,4 +55,13 @@ public void testValidate_AttributeMissing() { assertTrue(actionRequestValidationException.getMessage().contains("Attribute name is missing")); } + public void testValidate_VersionMissing() { + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" }}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Version is missing")); + } + } From b985aa3b3780f5edf7ec791d2f17d7613303c5b9 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 26 Dec 2022 20:31:54 +0530 Subject: [PATCH 06/18] Add version support for delete api Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 88 ++++++++++++----- ...ightedRoutingVersionMismatchException.java | 39 -------- .../ClusterDeleteWeightedRoutingRequest.java | 99 ++++++++++++++++++- ...erDeleteWeightedRoutingRequestBuilder.java | 11 +++ .../ClusterGetWeightedRoutingResponse.java | 60 ++++++++--- .../java/org/opensearch/client/Requests.java | 4 +- .../metadata/WeightedRoutingMetadata.java | 10 +- .../cluster/routing/WeightedRouting.java | 11 +++ .../routing/WeightedRoutingService.java | 75 ++++++++------ ...estClusterDeleteWeightedRoutingAction.java | 20 +++- ...sterDeleteWeightedRoutingRequestTests.java | 15 +++ ...lusterGetWeightedRoutingResponseTests.java | 2 +- ...usterDeleteWeightedRoutingActionTests.java | 59 +++++++++++ 13 files changed, 379 insertions(+), 114 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index e0a2443d8330f..2174919a8f15e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -12,15 +12,14 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; -import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.rest.RestStatus; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; import static org.hamcrest.Matchers.equalTo; @@ -67,6 +66,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -76,6 +76,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(0) .get(); assertEquals(response.isAcknowledged(), true); } @@ -202,6 +203,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -273,6 +275,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -342,6 +345,7 @@ public void testDeleteWeightedRouting_WeightsAreSet() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); @@ -386,28 +390,62 @@ public void testPutWeightedRoutingWithVersioning() { logger.info("--> setting shard routing weights for weighted round robin"); - Future response[] = new Future[50]; - for (int i = 0; i < 50; i++) { - Map weights = new HashMap<>(); - weights.put("a", 1.0); - weights.put("b", 2.0); - double weightc = (double) i; - weights.put("c", weightc); - WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - response[i] = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).execute(); - } - - for (int i = 0; i < 50; i++) { - try { - ClusterPutWeightedRoutingResponse weightedRoutingResponse = response[i].get(); - logger.info("response from request -" + i); - logger.info(weightedRoutingResponse.isAcknowledged()); - } catch (Exception t) { - logger.info("Exception is hit"); - } - } - - ClusterState stateafter = internalCluster().clusterService().state(); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get(); + assertEquals(true, response.isAcknowledged()); + + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights); + UnsupportedWeightedRoutingStateException exception = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get() + ); + assertEquals(exception.status(), RestStatus.CONFLICT); + // assertTrue(exception.getMessage().contains("weighted routing metadata does not have weights set for awareness attribute zone")); + + ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .get(); + + weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(weightedRoutingResponse.getVersion()) + .get(); + assertEquals(response.isAcknowledged(), true); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin() + .cluster() + .prepareDeleteWeightedRouting() + .setAwarenessAttribute("zone") + .setVersion(2) + .get(); + assertTrue(deleteResponse.isAcknowledged()); + + WeightedRoutingMetadata metadata = internalCluster().clusterService().state().metadata().weightedRoutingMetadata(); + + weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); + assertEquals(response.isAcknowledged(), true); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java deleted file mode 100644 index 13067363ac59c..0000000000000 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/WeightedRoutingVersionMismatchException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.admin.cluster.shards.routing.weighted; - -import org.opensearch.OpenSearchException; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.rest.RestStatus; - -import java.io.IOException; - -public class WeightedRoutingVersionMismatchException extends OpenSearchException { - - public WeightedRoutingVersionMismatchException() { - super(""); - } - - public WeightedRoutingVersionMismatchException(Throwable cause) { - super(cause); - } - - public WeightedRoutingVersionMismatchException(String message) { - super(message); - } - - @Override - public RestStatus status() { - return RestStatus.CONFLICT; - } - - public WeightedRoutingVersionMismatchException(StreamInput in) throws IOException { - super(in); - } -} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java index 71eab8ff35a2d..5b63f90ef93ff 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -8,12 +8,26 @@ package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchGenerationException; +import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; import java.io.IOException; +import java.util.Map; /** * Request to delete weights for weighted round-robin shard routing policy. @@ -21,10 +35,37 @@ * @opensearch.internal */ public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest { + private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class); + + public void setVersion(long version) { + this.version = version; + } + + private long version; + private String awarenessAttribute; + public ClusterDeleteWeightedRoutingRequest() {} public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { super(in); + version = in.readLong(); + awarenessAttribute = in.readString(); + } + + public long getVersion() { + return version; + } + + public String getAwarenessAttribute() { + return awarenessAttribute; + } + + public void setAwarenessAttribute(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + } + + public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; } @Override @@ -32,13 +73,69 @@ public ActionRequestValidationException validate() { return null; } + /** + * @param source weights definition from request body + * @return this request + */ + public ClusterDeleteWeightedRoutingRequest source(Map source) { + try { + if (source.isEmpty()) { + throw new OpenSearchParseException(("Empty request body")); + } + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(source); + setRequestBody(BytesReference.bytes(builder), builder.contentType()); + } catch (IOException e) { + throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e); + } + return this; + } + + public void setRequestBody(BytesReference source, XContentType contentType) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) + ) { + String versionAttr = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + continue; + } else if (token == XContentParser.Token.VALUE_STRING) { + if (versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + this.version = Long.parseLong(parser.text()); + } + } else { + throw new OpenSearchParseException( + "failed to parse delete weighted routing request body [{}], " + "unknown type", + versionAttr + ); + } + } + } + } catch (IOException e) { + logger.error("error while parsing delete for weighted routing request object", e); + } + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeLong(version); + out.writeString(awarenessAttribute); } @Override public String toString() { - return "ClusterDeleteWeightedRoutingRequest"; + return "ClusterDeleteWeightedRoutingRequest{" + "version= " + version + "awarenessAttribute=" + awarenessAttribute + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java index 19976ac6b07aa..bb34fea589534 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java @@ -24,4 +24,15 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) { super(client, action, new ClusterDeleteWeightedRoutingRequest()); } + + public ClusterDeleteWeightedRoutingRequestBuilder setVersion(long version) { + request.setVersion(version); + return this; + } + + public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) { + request.setAwarenessAttribute(attribute); + return this; + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index 0a056e74c365f..e88946326e4cd 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -35,6 +35,11 @@ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements private WeightedRouting weightedRouting; private String localNodeWeight; private static final String NODE_WEIGHT = "node_weight"; + + public long getVersion() { + return version; + } + private long version; public String getLocalNodeWeight() { @@ -79,13 +84,15 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (this.weightedRouting != null) { + builder.startObject("weights"); for (Map.Entry entry : weightedRouting.weights().entrySet()) { builder.field(entry.getKey(), entry.getValue().toString()); } if (localNodeWeight != null) { builder.field(NODE_WEIGHT, localNodeWeight); } - builder.field(WeightedRoutingMetadata.VERSION, String.valueOf(version)); + builder.endObject(); + builder.field(WeightedRoutingMetadata.VERSION, version); } builder.endObject(); return builder; @@ -98,23 +105,54 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars String localNodeWeight = null; Map weights = new HashMap<>(); long version = WeightedRoutingMetadata.INITIAL_VERSION; + String versionAttr = null; + String weightsAttr = null; + Double attrWeight = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrKey = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrValue = parser.text(); - if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { - localNodeWeight = attrValue; - } else if (attrKey != null && attrKey.equals(WeightedRoutingMetadata.VERSION)) { - version = Long.parseLong(attrValue); - } else if (attrKey != null) { - weights.put(attrKey, Double.parseDouble(attrValue)); + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + continue; + } else { + weightsAttr = parser.currentName(); + } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected " + "object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrValue = parser.text(); + if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { + localNodeWeight = attrValue; + } else if (attrKey != null && attrKey.equals(WeightedRoutingMetadata.VERSION)) { + version = Long.parseLong(attrValue); + } else if (attrKey != null) { + weights.put(attrKey, Double.parseDouble(attrValue)); + } + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); } } else { - throw new OpenSearchParseException("failed to parse weighted routing response"); + throw new OpenSearchParseException("failed to parse weighted routing request " + "[{}], unknown " + "type", "fail"); } } + WeightedRouting weightedRouting = new WeightedRouting("", weights); return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting, version); } diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index 21f2a2d906602..cad5bac8acf0d 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -578,8 +578,8 @@ public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String * * @return delete weight request */ - public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest() { - return new ClusterDeleteWeightedRoutingRequest(); + public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest(String attributeName) { + return new ClusterDeleteWeightedRoutingRequest(attributeName); } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index de1cdc0e2d384..4efe6c62cf93b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -179,12 +179,14 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder, long version) throws IOException { builder.startObject(AWARENESS); - builder.startObject(weightedRouting.attributeName()); - for (Map.Entry entry : weightedRouting.weights().entrySet()) { - builder.field(entry.getKey(), entry.getValue()); + if (weightedRouting.isSet()) { + builder.startObject(weightedRouting.attributeName()); + for (Map.Entry entry : weightedRouting.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); } builder.endObject(); - builder.endObject(); builder.field(VERSION, version); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index df2d8d595eaab..3117f4c7cd001 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -13,6 +13,7 @@ import org.opensearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -25,6 +26,11 @@ public class WeightedRouting implements Writeable { private String attributeName; private Map weights; + public WeightedRouting() { + this.attributeName = ""; + this.weights = new HashMap<>(); + } + public WeightedRouting(String attributeName, Map weights) { this.attributeName = attributeName; this.weights = weights; @@ -40,6 +46,11 @@ public WeightedRouting(StreamInput in) throws IOException { weights = (Map) in.readGenericValue(); } + public boolean isSet() { + if (this.attributeName.isEmpty() || this.weights.isEmpty()) return false; + return true; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(attributeName); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index d984c657dde18..b8e14d86aa391 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; @@ -93,35 +94,19 @@ public ClusterState execute(ClusterState currentState) { Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); - if (weightedRoutingMetadata == null && requestVersion == WeightedRoutingMetadata.INITIAL_VERSION) { + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); + if (weightedRoutingMetadata == null) { logger.info("put weighted routing weights in metadata [{}]", newWeightedRouting); weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); - } else if ((weightedRoutingMetadata == null && requestVersion != WeightedRoutingMetadata.INITIAL_VERSION) - || weightedRoutingMetadata.getVersion() != requestVersion) { - throw new UnsupportedWeightedRoutingStateException( - String.format( - Locale.ROOT, - "weighted routing " - + "version in request is %s but cluster weighted routing metadata is at a different version %s ", - requestVersion, - weightedRoutingMetadata != null - ? weightedRoutingMetadata.getVersion() - : WeightedRoutingMetadata.INITIAL_VERSION - ) - ); + } else { + if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) { + logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, weightedRoutingMetadata.getVersion() + 1); } else { - if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) { - logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); - weightedRoutingMetadata = new WeightedRoutingMetadata( - newWeightedRouting, - weightedRoutingMetadata.getVersion() + 1 - ); - } else { - logger.info("weights are same, not updating weighted routing weights [{}] in metadata", newWeightedRouting); - return currentState; - } + logger.info("weights are same, not updating weighted routing weights [{}] in metadata", newWeightedRouting); + return currentState; } - + } mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting); return ClusterState.builder(currentState).metadata(mdBuilder).build(); @@ -149,12 +134,33 @@ public void deleteWeightedRoutingMetadata( final ClusterDeleteWeightedRoutingRequest request, final ActionListener listener ) { + final long requestVersion = request.getVersion(); + final String awarenessAttribute = request.getAwarenessAttribute(); clusterService.submitStateUpdateTask("delete_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { logger.info("Deleting weighted routing metadata from the cluster state"); + Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - mdBuilder.removeCustom(WeightedRoutingMetadata.TYPE); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); + + if ((weightedRoutingMetadata != null && awarenessAttribute == null) + || (weightedRoutingMetadata != null + && weightedRoutingMetadata.getWeightedRouting().attributeName().equals(awarenessAttribute))) { + weightedRoutingMetadata = new WeightedRoutingMetadata(new WeightedRouting(), weightedRoutingMetadata.getVersion() + 1); + } else { + throw new ResourceNotFoundException( + String.format( + Locale.ROOT, + "weighted routing metadata does not have weights set for awareness attribute %s", + awarenessAttribute + ) + ); + } + + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state with weighted routing weights deleted"); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -167,7 +173,6 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("cluster weighted routing metadata change is processed by all the nodes"); - assert newState.metadata().weightedRoutingMetadata() == null; listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); } }); @@ -197,7 +202,7 @@ public void verifyAwarenessAttribute(String attributeName) { if (getAwarenessAttributes().contains(attributeName) == false) { ActionRequestValidationException validationException = null; validationException = addValidationError( - String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + String.format(Locale.ROOT, "invalid awareness attribute %s requested for weighted routing", attributeName), validationException ); throw validationException; @@ -263,4 +268,18 @@ private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, Clus ); } } + + private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetadata weightedRoutingMetadata) { + if ((weightedRoutingMetadata == null && requestedVersion != WeightedRoutingMetadata.INITIAL_VERSION) + || (weightedRoutingMetadata != null && requestedVersion != weightedRoutingMetadata.getVersion())) { + throw new UnsupportedWeightedRoutingStateException( + String.format( + Locale.ROOT, + "weighted routing " + "version in request is %s but cluster weighted routing metadata is at a different version %s ", + requestedVersion, + weightedRoutingMetadata != null ? weightedRoutingMetadata.getVersion() : WeightedRoutingMetadata.INITIAL_VERSION + ) + ); + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java index 9742cc373d520..d9dedf8d14506 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java @@ -20,7 +20,8 @@ import java.io.IOException; import java.util.List; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; import static org.opensearch.rest.RestRequest.Method.DELETE; /** @@ -35,7 +36,12 @@ public class RestClusterDeleteWeightedRoutingAction extends BaseRestHandler { @Override public List routes() { - return singletonList(new Route(DELETE, "/_cluster/routing/awareness/weights")); + return unmodifiableList( + asList( + new Route(DELETE, "/_cluster/routing/awareness/weights"), + new Route(DELETE, "/_cluster/routing/awareness/{attribute}/weights") + ) + ); } @Override @@ -45,9 +51,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = createRequest(request); return channel -> client.admin() .cluster() .deleteWeightedRouting(clusterDeleteWeightedRoutingRequest, new RestToXContentListener<>(channel)); } + + public static ClusterDeleteWeightedRoutingRequest createRequest(RestRequest request) throws IOException { + ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest( + request.param("attribute") + ); + request.applyContentParser(p -> deleteWeightedRoutingRequest.source(p.mapStrings())); + return deleteWeightedRoutingRequest; + } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java new file mode 100644 index 0000000000000..bff4a7b8a5f51 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; + +import org.opensearch.test.OpenSearchTestCase; + +public class ClusterDeleteWeightedRoutingRequestTests extends OpenSearchTestCase { + +} diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java index 148f3e1228fcc..b17d3b789e03f 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest protected ClusterGetWeightedRoutingResponse createTestInstance() { Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); WeightedRouting weightedRouting = new WeightedRouting("", weights); - ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, -1); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting, 3); return response; } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..00059dcc39517 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.junit.Before; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +import java.io.IOException; + +import static java.util.Collections.singletonMap; + +public class RestClusterDeleteWeightedRoutingActionTests extends RestActionTestCase { + private RestClusterPutWeightedRoutingAction action; + + @Before + public void setupAction() { + action = new RestClusterPutWeightedRoutingAction(); + controller().registerHandler(action); + } + + public void testDeleteRequest_SupportedRequestBody() throws IOException { + String req = "{\"_version\":2}"; + RestRequest restRequest = buildRestRequest(req); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest( + restRequest + ); + assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + } + + public void testDeleteRequest_VersionNotProvided() throws IOException { + String req = "{\"_ver\":2}"; + RestRequest restRequest = buildRestRequest(req); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest( + restRequest + ); + assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + } + + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/zone/weights") + .withParams(singletonMap("attribute", "zone")) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } +} From 3c286e65f4559036c9361c27d5b25f9e058ace0a Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 26 Dec 2022 22:34:07 +0530 Subject: [PATCH 07/18] Fix precommit build error Signed-off-by: Anshu Agarwal --- .../put/ClusterPutWeightedRoutingRequestTests.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index 1d236d4d82fa0..a0071506390a8 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -19,7 +19,7 @@ public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { public void testSetWeightedRoutingWeight() { - String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" },\"_version\": 1}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); @@ -30,7 +30,7 @@ public void testSetWeightedRoutingWeight() { } public void testValidate_ValuesAreProper() { - String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" },\"_version\": 1}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -47,7 +47,7 @@ public void testValidate_MissingWeights() { } public void testValidate_AttributeMissing() { - String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" },\"_version\": 1}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\": \"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -56,7 +56,7 @@ public void testValidate_AttributeMissing() { } public void testValidate_VersionMissing() { - String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\" }}"; + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\"}}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); From 68fc34dc1db17de11262669a67cba162f546125e Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 27 Dec 2022 04:50:08 +0530 Subject: [PATCH 08/18] Fix tests Signed-off-by: Anshu Agarwal --- .../AwarenessAttributeDecommissionRestIT.java | 1 + .../ClusterDeleteWeightedRoutingRequest.java | 20 ++++++----- .../routing/WeightedRoutingServiceTests.java | 4 +-- ...usterDeleteWeightedRoutingActionTests.java | 34 +++++++++++++++---- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java index 4d9115b8962ea..b7228a75984fa 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java index 5b63f90ef93ff..cb40983b53310 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -44,7 +44,9 @@ public void setVersion(long version) { private long version; private String awarenessAttribute; - public ClusterDeleteWeightedRoutingRequest() {} + public ClusterDeleteWeightedRoutingRequest() { + + } public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { super(in); @@ -66,6 +68,7 @@ public void setAwarenessAttribute(String awarenessAttribute) { public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) { this.awarenessAttribute = awarenessAttribute; + this.version = WeightedRoutingMetadata.INITIAL_VERSION; } @Override @@ -109,17 +112,18 @@ public void setRequestBody(BytesReference source, XContentType contentType) { String fieldName = parser.currentName(); if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { versionAttr = parser.currentName(); - continue; - } else if (token == XContentParser.Token.VALUE_STRING) { - if (versionAttr.equals(WeightedRoutingMetadata.VERSION)) { - this.version = Long.parseLong(parser.text()); - } } else { throw new OpenSearchParseException( - "failed to parse delete weighted routing request body [{}], " + "unknown type", - versionAttr + "failed to parse delete weighted routing request body [{}], unknown type", + fieldName ); } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + this.version = Long.parseLong(parser.text()); + } + } else { + throw new OpenSearchParseException("failed to parse delete weighted routing request body"); } } } catch (IOException e) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 3bb91c7e7f406..26b484f01e826 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -260,13 +260,13 @@ public void testDeleteWeightedRoutingMetadata() throws InterruptedException { ClusterState.Builder builder = ClusterState.builder(state); ClusterServiceUtils.setState(clusterService, builder); - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest("zone"); + clusterDeleteWeightedRoutingRequest.setVersion(0); final CountDownLatch countDownLatch = new CountDownLatch(1); ActionListener listener = new ActionListener() { @Override public void onResponse(ClusterDeleteWeightedRoutingResponse clusterDeleteWeightedRoutingResponse) { assertTrue(clusterDeleteWeightedRoutingResponse.isAcknowledged()); - assertNull(clusterService.state().metadata().weightedRoutingMetadata()); countDownLatch.countDown(); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java index 00059dcc39517..2589d68e4cf0b 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java @@ -9,6 +9,7 @@ package org.opensearch.rest.action.admin.cluster; import org.junit.Before; +import org.opensearch.OpenSearchParseException; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.xcontent.XContentType; @@ -35,25 +36,44 @@ public void testDeleteRequest_SupportedRequestBody() throws IOException { ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest( restRequest ); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + + restRequest = buildRestRequestWithAwarenessAttribute(req); + clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest(restRequest); assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); } - public void testDeleteRequest_VersionNotProvided() throws IOException { + public void testDeleteRequest_BadRequest() throws IOException { String req = "{\"_ver\":2}"; RestRequest restRequest = buildRestRequest(req); - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest( - restRequest - ); - assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); - assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); } - private RestRequest buildRestRequest(String content) { + private RestRequest buildRestRequestWithAwarenessAttribute(String content) { return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) .withPath("/_cluster/routing/awareness/zone/weights") .withParams(singletonMap("attribute", "zone")) .withContent(new BytesArray(content), XContentType.JSON) .build(); } + + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/weights") + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + public void testCreateRequest_EmptyRequestBody() throws IOException { + String req = "{}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } } From ff0a144bf13d22179912773ebef5f9d1b1a95416 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 27 Dec 2022 05:07:54 +0530 Subject: [PATCH 09/18] Refactor code Signed-off-by: Anshu Agarwal --- .../put/ClusterPutWeightedRoutingRequest.java | 12 +++++------- .../cluster/metadata/WeightedRoutingMetadata.java | 1 + 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index b5eedc5ff06ec..d3c2823f2153c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; @@ -44,9 +45,6 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest< private String attributeName; private long version; - public static final String VERSION = "_version"; - public static final long VERSION_UNSET_VALUE = -2; - public void version(long version) { this.version = version; } @@ -110,12 +108,12 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) parser.nextToken(); String versionAttr = null; String weightsAttr = null; - long version = VERSION_UNSET_VALUE; + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String fieldName = parser.currentName(); - if (fieldName != null && fieldName.equals(VERSION)) { + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { versionAttr = parser.currentName(); continue; } else { @@ -142,7 +140,7 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) } } } else if (token == XContentParser.Token.VALUE_NUMBER) { - if (versionAttr.equals(VERSION)) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { version = parser.longValue(); } } else { @@ -171,7 +169,7 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } - if (version == VERSION_UNSET_VALUE) { + if (version == WeightedRoutingMetadata.VERSION_UNSET_VALUE) { validationException = addValidationError("Version is missing", validationException); } try { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 4efe6c62cf93b..8d3562a51a3e2 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -38,6 +38,7 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable Date: Tue, 27 Dec 2022 05:45:01 +0530 Subject: [PATCH 10/18] Fix tests Signed-off-by: Anshu Agarwal --- .../put/ClusterPutWeightedRoutingRequest.java | 4 ++-- .../cluster/RestClusterPutWeightedRoutingAction.java | 2 +- .../RestClusterAddWeightedRoutingActionTests.java | 11 ++++++----- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index d3c2823f2153c..6d0d2ab5899a8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -78,7 +78,7 @@ public ClusterPutWeightedRoutingRequest(String attributeName) { this.attributeName = attributeName; } - public void setWeightedRouting(Map source) { + public void setWeightedRouting(Map source) { try { if (source.isEmpty()) { throw new OpenSearchParseException(("Empty request body")); @@ -190,7 +190,7 @@ public ActionRequestValidationException validate() { * @param source weights definition from request body * @return this request */ - public ClusterPutWeightedRoutingRequest source(Map source) { + public ClusterPutWeightedRoutingRequest source(Map source) { setWeightedRouting(source); return this; } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java index 1cf44e665cf84..5f845b7a66c1f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -51,7 +51,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); - request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); + request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapOrdered())); return putWeightedRoutingRequest; } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java index a4cd6224217b7..582fbfce315b2 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java @@ -34,14 +34,15 @@ public void setupAction() { } public void testCreateRequest_SupportedRequestBody() throws IOException { - String req = "{\"us-east-1c\" : \"1\", \"us-east-1d\":\"1.0\", \"us-east-1a\":\"0.0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); ClusterPutWeightedRoutingRequest clusterPutWeightedRoutingRequest = RestClusterPutWeightedRoutingAction.createRequest(restRequest); assertEquals("zone", clusterPutWeightedRoutingRequest.getWeightedRouting().attributeName()); assertNotNull(clusterPutWeightedRoutingRequest.getWeightedRouting().weights()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1d").toString()); - assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1b").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals(1, clusterPutWeightedRoutingRequest.getVersion()); } public void testCreateRequest_UnsupportedRequestBody() throws IOException { @@ -54,7 +55,7 @@ public void testCreateRequest_UnsupportedRequestBody() throws IOException { public void testCreateRequest_MalformedRequestBody() throws IOException { Map params = new HashMap<>(); - String req = "{\"us-east-1c\" : \1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0,\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); assertThrows(JsonParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); } From b91cce1d974485b48445610ba6ef04d054543bc1 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 27 Dec 2022 15:22:30 +0530 Subject: [PATCH 11/18] Refactor code Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 60 ++++++++++--------- .../ClusterDeleteWeightedRoutingRequest.java | 20 ++++--- .../ClusterGetWeightedRoutingResponse.java | 28 ++++----- .../put/ClusterPutWeightedRoutingRequest.java | 12 ++-- .../metadata/WeightedRoutingMetadata.java | 17 +++--- .../routing/WeightedRoutingService.java | 6 +- ...sterDeleteWeightedRoutingRequestTests.java | 15 ----- 7 files changed, 75 insertions(+), 83 deletions(-) delete mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 2174919a8f15e..ee647dfcecf85 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -8,11 +8,11 @@ package org.opensearch.cluster.routing; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; -import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.rest.RestStatus; import org.opensearch.test.OpenSearchIntegTestCase; @@ -313,9 +313,11 @@ public void testDeleteWeightedRouting_WeightsNotSet() { assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); - assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + ResourceNotFoundException exception = expectThrows( + ResourceNotFoundException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get() + ); + assertEquals(RestStatus.NOT_FOUND, exception.status()); } public void testDeleteWeightedRouting_WeightsAreSet() { @@ -347,16 +349,15 @@ public void testDeleteWeightedRouting_WeightsAreSet() { .setWeightedRouting(weightedRouting) .setVersion(-1) .get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); } - public void testPutWeightedRoutingWithVersioning() { + public void testPutAndDeleteWithVersioning() { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -369,18 +370,9 @@ public void testPutWeightedRoutingWithVersioning() { internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); - List nodes_in_zone_a = internalCluster().startDataOnlyNodes( - nodeCountPerAZ, - Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() - ); - List nodes_in_zone_b = internalCluster().startDataOnlyNodes( - nodeCountPerAZ, - Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() - ); - List nodes_in_zone_c = internalCluster().startDataOnlyNodes( - nodeCountPerAZ, - Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() - ); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); logger.info("--> waiting for nodes to form a cluster"); ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); @@ -398,14 +390,16 @@ public void testPutWeightedRoutingWithVersioning() { .setWeightedRouting(weightedRouting) .setVersion(-1) .get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + // update weights api call with correct version number weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); weightedRouting = new WeightedRouting("zone", weights); response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get(); - assertEquals(true, response.isAcknowledged()); + assertTrue(response.isAcknowledged()); + // update weights api call with incorrect version number weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights); UnsupportedWeightedRoutingStateException exception = expectThrows( @@ -413,14 +407,15 @@ public void testPutWeightedRoutingWithVersioning() { () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get() ); assertEquals(exception.status(), RestStatus.CONFLICT); - // assertTrue(exception.getMessage().contains("weighted routing metadata does not have weights set for awareness attribute zone")); + // get weights call ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() .cluster() .prepareGetWeightedRouting() .setAwarenessAttribute("zone") .get(); + // update weights call using version returned by get api call weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0); weightedRouting = new WeightedRouting("zone", weights); response = client().admin() @@ -429,9 +424,10 @@ public void testPutWeightedRoutingWithVersioning() { .setWeightedRouting(weightedRouting) .setVersion(weightedRoutingResponse.getVersion()) .get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + // delete weights by awareness attribute ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin() .cluster() .prepareDeleteWeightedRouting() @@ -440,12 +436,22 @@ public void testPutWeightedRoutingWithVersioning() { .get(); assertTrue(deleteResponse.isAcknowledged()); - WeightedRoutingMetadata metadata = internalCluster().clusterService().state().metadata().weightedRoutingMetadata(); - + // update weights again and make sure that version number got updated on delete weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); weightedRouting = new WeightedRouting("zone", weights); response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights + deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // delete weights call, incorrect version number + UnsupportedWeightedRoutingStateException deleteException = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get() + ); + assertEquals(RestStatus.CONFLICT, deleteException.status()); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java index cb40983b53310..12c45e12bb612 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -37,21 +37,23 @@ public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest { private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class); + private long version; + private String awarenessAttribute; + public void setVersion(long version) { this.version = version; } - private long version; - private String awarenessAttribute; - public ClusterDeleteWeightedRoutingRequest() { - + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; } public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { super(in); version = in.readLong(); - awarenessAttribute = in.readString(); + if (in.available() != 0) { + awarenessAttribute = in.readString(); + } } public long getVersion() { @@ -68,7 +70,7 @@ public void setAwarenessAttribute(String awarenessAttribute) { public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) { this.awarenessAttribute = awarenessAttribute; - this.version = WeightedRoutingMetadata.INITIAL_VERSION; + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; } @Override @@ -127,7 +129,7 @@ public void setRequestBody(BytesReference source, XContentType contentType) { } } } catch (IOException e) { - logger.error("error while parsing delete for weighted routing request object", e); + logger.error("error while parsing delete request for weighted routing request object", e); } } @@ -135,7 +137,9 @@ public void setRequestBody(BytesReference source, XContentType contentType) { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(version); - out.writeString(awarenessAttribute); + if (awarenessAttribute != null) { + out.writeString(awarenessAttribute); + } } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index e88946326e4cd..d0f0d46da280b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -35,13 +35,13 @@ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements private WeightedRouting weightedRouting; private String localNodeWeight; private static final String NODE_WEIGHT = "node_weight"; + private static final String WEIGHTS = "weights"; + private long version; public long getVersion() { return version; } - private long version; - public String getLocalNodeWeight() { return localNodeWeight; } @@ -84,7 +84,7 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (this.weightedRouting != null) { - builder.startObject("weights"); + builder.startObject(WEIGHTS); for (Map.Entry entry : weightedRouting.weights().entrySet()) { builder.field(entry.getKey(), entry.getValue().toString()); } @@ -101,13 +101,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); XContentParser.Token token; - String attrKey = null, attrValue = null; + String attrKey = null, attrValue; String localNodeWeight = null; Map weights = new HashMap<>(); - long version = WeightedRoutingMetadata.INITIAL_VERSION; + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; String versionAttr = null; - String weightsAttr = null; - Double attrWeight = null; + String weightsAttr; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -115,12 +114,14 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { versionAttr = parser.currentName(); continue; - } else { + } else if (fieldName != null && fieldName.equals(WEIGHTS)) { weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object", fieldName); } if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse weighted routing request object [{}], expected " + "object", + "failed to parse weighted routing request object [{}], expected object", weightsAttr ); } @@ -138,18 +139,15 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars weights.put(attrKey, Double.parseDouble(attrValue)); } } else { - throw new OpenSearchParseException( - "failed to parse weighted routing request attribute [{}], " + "unknown type", - attrWeight - ); + throw new OpenSearchParseException("failed to parse weighted routing request attribute [{}]", attrKey); } } } else if (token == XContentParser.Token.VALUE_NUMBER) { - if (versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { version = parser.longValue(); } } else { - throw new OpenSearchParseException("failed to parse weighted routing request " + "[{}], unknown " + "type", "fail"); + throw new OpenSearchParseException("failed to parse weighted routing request"); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index 6d0d2ab5899a8..c8e631db20c97 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -107,7 +107,7 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) // move to the first alias parser.nextToken(); String versionAttr = null; - String weightsAttr = null; + String weightsAttr; long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { @@ -116,12 +116,14 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { versionAttr = parser.currentName(); continue; - } else { + } else if (fieldName != null && fieldName.equals("weights")) { weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object [{}]", fieldName); } if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse weighted routing request object [{}], expected " + "object", + "failed to parse weighted routing request object [{}], expected object", weightsAttr ); } @@ -153,7 +155,7 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) this.weightedRouting = new WeightedRouting(this.attributeName, weights); this.version = version; } catch (IOException e) { - logger.error("error while parsing put for weighted routing request object", e); + logger.error("error while parsing put weighted routing request object", e); } } @@ -204,7 +206,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}"; + return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "version= " + version + "}"; } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 8d3562a51a3e2..763917a174880 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -100,11 +100,11 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws Double attrValue; String attributeName = null; Map weights = new HashMap<>(); - WeightedRouting weightedRouting = null; + WeightedRouting weightedRouting; XContentParser.Token token; - String awarenessField = null; + String awarenessField; String versionAttr = null; - long version = INITIAL_VERSION; + long version = VERSION_UNSET_VALUE; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -116,16 +116,13 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws awarenessField = parser.currentName(); } if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected " + "object", - awarenessField - ); + throw new OpenSearchParseException("failed to parse weighted routing metadata [{}], expected object", awarenessField); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { attributeName = parser.currentName(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected" + " object", + "failed to parse weighted routing metadata [{}], expected object", attributeName ); } @@ -134,7 +131,7 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws attrKey = parser.currentName(); } else if (token == XContentParser.Token.VALUE_NUMBER) { - if (attrKey != null && attrKey.equals("_version")) { + if (attrKey != null && attrKey.equals(VERSION)) { version = Long.parseLong(parser.text()); } else { attrValue = Double.parseDouble(parser.text()); @@ -150,7 +147,7 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws } } } else if (token == XContentParser.Token.VALUE_STRING) { - if (versionAttr.equals(VERSION)) { + if (versionAttr != null && versionAttr.equals(VERSION)) { version = Long.parseLong(parser.text()); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index b8e14d86aa391..e83d92332678e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -96,10 +96,10 @@ public ClusterState execute(ClusterState currentState) { WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); if (weightedRoutingMetadata == null) { - logger.info("put weighted routing weights in metadata [{}]", newWeightedRouting); + logger.info("add weighted routing weights in metadata [{}]", newWeightedRouting); weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); } else { - if (!checkIfSameWeightsInMetadata(request.getWeightedRouting(), weightedRoutingMetadata.getWeightedRouting())) { + if (!checkIfSameWeightsInMetadata(newWeightedRouting, weightedRoutingMetadata.getWeightedRouting())) { logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, weightedRoutingMetadata.getVersion() + 1); } else { @@ -275,7 +275,7 @@ private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetad throw new UnsupportedWeightedRoutingStateException( String.format( Locale.ROOT, - "weighted routing " + "version in request is %s but cluster weighted routing metadata is at a different version %s ", + "requested version is %s but cluster weighted routing metadata is at a " + "different version %s ", requestedVersion, weightedRoutingMetadata != null ? weightedRoutingMetadata.getVersion() : WeightedRoutingMetadata.INITIAL_VERSION ) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java deleted file mode 100644 index bff4a7b8a5f51..0000000000000 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestTests.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; - -import org.opensearch.test.OpenSearchTestCase; - -public class ClusterDeleteWeightedRoutingRequestTests extends OpenSearchTestCase { - -} From fdf3f442519cc25d772e1aace1e53fd173ab4e7e Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 27 Dec 2022 15:26:18 +0530 Subject: [PATCH 12/18] Fix condition to check for weighted shard routing Signed-off-by: Anshu Agarwal --- .../java/org/opensearch/cluster/routing/OperationRouting.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 9026da667ccb0..a4b4cc961fade 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -324,7 +324,7 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null) { + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, From 63316b5829a9fc246c0464caa0833d7cc6bd5c24 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 3 Jan 2023 12:22:46 +0530 Subject: [PATCH 13/18] GC delete for weights api Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 15 ++-- .../routing/WeightedRoutingService.java | 78 +++++++++++++++++++ .../common/settings/ClusterSettings.java | 2 + 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index ee647dfcecf85..3e5aa80743f2b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; @@ -361,6 +362,7 @@ public void testPutAndDeleteWithVersioning() { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("weighted_routing.gc_deletes", 10, TimeUnit.SECONDS) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -436,17 +438,20 @@ public void testPutAndDeleteWithVersioning() { .get(); assertTrue(deleteResponse.isAcknowledged()); + // sleeping for 10 sec to ensure that weighted routing weights is gc deleted ie version is reset to -1 + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // update weights again and make sure that version number got updated on delete weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); weightedRouting = new WeightedRouting("zone", weights); - response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(-1).get(); assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); - // delete weights - deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get(); - assertTrue(deleteResponse.isAcknowledged()); - // delete weights call, incorrect version number UnsupportedWeightedRoutingStateException deleteException = expectThrows( UnsupportedWeightedRoutingStateException.class, diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index e83d92332678e..1e94102495584 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -31,7 +31,10 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.util.HashMap; @@ -41,6 +44,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import static org.opensearch.action.ValidateActions.addValidationError; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING; @@ -55,6 +59,22 @@ public class WeightedRoutingService { private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; private static final Double DECOMMISSIONED_AWARENESS_VALUE_WEIGHT = 0.0; + volatile Scheduler.Cancellable cancellable; + + /** + * setting to enable / disable weighted routing deletes garbage collection. + * This setting is realtime updatable + */ + public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); + public static final Setting WEIGHTS_GC_DELETES_SETTING = Setting.timeSetting( + "weighted_routing.gc_deletes", + DEFAULT_GC_DELETES, + new TimeValue(-1, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); @Inject public WeightedRoutingService( @@ -75,6 +95,17 @@ public WeightedRoutingService( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes ); + + this.gcDeletesInMillis = WEIGHTS_GC_DELETES_SETTING.get(settings).getMillis(); + clusterSettings.addSettingsUpdateConsumer(WEIGHTS_GC_DELETES_SETTING, this::setGCDeletes); + } + + private void setGCDeletes(TimeValue timeValue) { + this.gcDeletesInMillis = timeValue.getMillis(); + } + + public long getGcDeletesInMillis() { + return gcDeletesInMillis; } public void registerWeightedRoutingMetadata( @@ -174,6 +205,7 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("cluster weighted routing metadata change is processed by all the nodes"); listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); + scheduleVersionReset(); } }); } @@ -282,4 +314,50 @@ private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetad ); } } + + private void scheduleVersionReset() { + cancellable = this.threadPool.schedule( + resetWeightedRoutingVersion(), + TimeValue.timeValueMillis(getGcDeletesInMillis()), + ThreadPool.Names.SAME + ); + + } + + private Runnable resetWeightedRoutingVersion() { + clusterService.submitStateUpdateTask("reset_weighted_routing_version", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + logger.info("Resetting weighted routing weights version in cluster state metadata"); + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) { + weightedRoutingMetadata = new WeightedRoutingMetadata( + weightedRoutingMetadata.getWeightedRouting(), + WeightedRoutingMetadata.INITIAL_VERSION + ); + } + + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state after resetting weighted routing version "); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("failed to reset weighted routing weights version in cluster metadata", e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.debug("cluster weighted routing version reset is processed by all the nodes"); + if (cancellable != null) { + cancellable.cancel(); + } + } + }); + + return null; + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7e9c7bd3123c5..5e0642bfcb4fb 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -32,6 +32,7 @@ package org.opensearch.common.settings; import org.apache.logging.log4j.LogManager; +import org.opensearch.cluster.routing.WeightedRoutingService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.action.search.CreatePitController; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; @@ -538,6 +539,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING, OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, + WeightedRoutingService.WEIGHTS_GC_DELETES_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, From e4b8da01e4f17e9a485bfc6fade31e9caca3d38b Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 3 Jan 2023 12:58:42 +0530 Subject: [PATCH 14/18] Fix precommit build error Signed-off-by: Anshu Agarwal --- .../opensearch/cluster/routing/WeightedRoutingIT.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 3e5aa80743f2b..2bfb5e5015b77 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -321,7 +321,7 @@ public void testDeleteWeightedRouting_WeightsNotSet() { assertEquals(RestStatus.NOT_FOUND, exception.status()); } - public void testDeleteWeightedRouting_WeightsAreSet() { + public void testDeleteWeightedRouting_WeightsAreSet() throws IOException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -358,7 +358,7 @@ public void testDeleteWeightedRouting_WeightsAreSet() { assertTrue(deleteResponse.isAcknowledged()); } - public void testPutAndDeleteWithVersioning() { + public void testPutAndDeleteWithVersioning() throws IOException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -439,11 +439,7 @@ public void testPutAndDeleteWithVersioning() { assertTrue(deleteResponse.isAcknowledged()); // sleeping for 10 sec to ensure that weighted routing weights is gc deleted ie version is reset to -1 - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + Thread.sleep(10000); // update weights again and make sure that version number got updated on delete weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); From a87fe4c13a4149aced329daae99933f1caab0508 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 3 Jan 2023 13:56:44 +0530 Subject: [PATCH 15/18] Add changelog Signed-off-by: Anshu Agarwal --- CHANGELOG.md | 1 + .../java/org/opensearch/cluster/routing/WeightedRoutingIT.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e589cf4cc866f..60d10d2906c01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) +- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 2bfb5e5015b77..83d802c3b7888 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -358,7 +358,7 @@ public void testDeleteWeightedRouting_WeightsAreSet() throws IOException { assertTrue(deleteResponse.isAcknowledged()); } - public void testPutAndDeleteWithVersioning() throws IOException { + public void testPutAndDeleteWithVersioning() throws Exception { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") From f7d230b78bef4f09fbf3df4f83bcd61e9f777e3d Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 3 Jan 2023 15:15:25 +0530 Subject: [PATCH 16/18] Fix test Signed-off-by: Anshu Agarwal --- .../opensearch/cluster/metadata/WeightedRoutingMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 763917a174880..320f75a9f2ada 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -146,7 +146,7 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws } } } - } else if (token == XContentParser.Token.VALUE_STRING) { + } else if (token == XContentParser.Token.VALUE_NUMBER) { if (versionAttr != null && versionAttr.equals(VERSION)) { version = Long.parseLong(parser.text()); } From 1efc435fede7e160e8ff70a18f306ea8f8913cd4 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Wed, 4 Jan 2023 13:46:20 +0530 Subject: [PATCH 17/18] Fix tests Signed-off-by: Anshu Agarwal --- .../weighted/put/ClusterPutWeightedRoutingRequestTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index 4f3ae77e805c1..ac7b664721fd4 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -56,7 +56,8 @@ public void testValidate_AttributeMissing() { } public void testValidate_MoreThanHalfWithZeroWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + + "\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); From 04b7db1063155aee369dc0121e59193d574ea833 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Wed, 4 Jan 2023 13:57:06 +0530 Subject: [PATCH 18/18] Revert version reset on delete Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 11 ++- .../routing/WeightedRoutingService.java | 77 ------------------- .../common/settings/ClusterSettings.java | 2 - ...ClusterPutWeightedRoutingRequestTests.java | 3 +- 4 files changed, 6 insertions(+), 87 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 83d802c3b7888..ac590a17c2875 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; @@ -362,7 +361,6 @@ public void testPutAndDeleteWithVersioning() throws Exception { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") - .put("weighted_routing.gc_deletes", 10, TimeUnit.SECONDS) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -438,16 +436,17 @@ public void testPutAndDeleteWithVersioning() throws Exception { .get(); assertTrue(deleteResponse.isAcknowledged()); - // sleeping for 10 sec to ensure that weighted routing weights is gc deleted ie version is reset to -1 - Thread.sleep(10000); - // update weights again and make sure that version number got updated on delete weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); weightedRouting = new WeightedRouting("zone", weights); - response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(-1).get(); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + // delete weights + deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get(); + assertTrue(deleteResponse.isAcknowledged()); + // delete weights call, incorrect version number UnsupportedWeightedRoutingStateException deleteException = expectThrows( UnsupportedWeightedRoutingStateException.class, diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index d6c55f38e0dd4..ef1a196d751c3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -33,10 +33,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.util.HashMap; @@ -46,7 +43,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.action.ValidateActions.addValidationError; @@ -62,22 +58,6 @@ public class WeightedRoutingService { private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; private static final Double DECOMMISSIONED_AWARENESS_VALUE_WEIGHT = 0.0; - volatile Scheduler.Cancellable cancellable; - - /** - * setting to enable / disable weighted routing deletes garbage collection. - * This setting is realtime updatable - */ - public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60); - public static final Setting WEIGHTS_GC_DELETES_SETTING = Setting.timeSetting( - "weighted_routing.gc_deletes", - DEFAULT_GC_DELETES, - new TimeValue(-1, TimeUnit.MILLISECONDS), - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - private volatile long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); @Inject public WeightedRoutingService( @@ -99,16 +79,6 @@ public WeightedRoutingService( this::setForcedAwarenessAttributes ); - this.gcDeletesInMillis = WEIGHTS_GC_DELETES_SETTING.get(settings).getMillis(); - clusterSettings.addSettingsUpdateConsumer(WEIGHTS_GC_DELETES_SETTING, this::setGCDeletes); - } - - private void setGCDeletes(TimeValue timeValue) { - this.gcDeletesInMillis = timeValue.getMillis(); - } - - public long getGcDeletesInMillis() { - return gcDeletesInMillis; } public void registerWeightedRoutingMetadata( @@ -208,7 +178,6 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("cluster weighted routing metadata change is processed by all the nodes"); listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); - scheduleVersionReset(); } }); } @@ -338,50 +307,4 @@ private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetad ); } } - - private void scheduleVersionReset() { - cancellable = this.threadPool.schedule( - resetWeightedRoutingVersion(), - TimeValue.timeValueMillis(getGcDeletesInMillis()), - ThreadPool.Names.SAME - ); - - } - - private Runnable resetWeightedRoutingVersion() { - clusterService.submitStateUpdateTask("reset_weighted_routing_version", new ClusterStateUpdateTask(Priority.URGENT) { - @Override - public ClusterState execute(ClusterState currentState) { - logger.info("Resetting weighted routing weights version in cluster state metadata"); - Metadata metadata = currentState.metadata(); - Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) { - weightedRoutingMetadata = new WeightedRoutingMetadata( - weightedRoutingMetadata.getWeightedRouting(), - WeightedRoutingMetadata.INITIAL_VERSION - ); - } - - mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); - logger.info("building cluster state after resetting weighted routing version "); - return ClusterState.builder(currentState).metadata(mdBuilder).build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("failed to reset weighted routing weights version in cluster metadata", e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - logger.debug("cluster weighted routing version reset is processed by all the nodes"); - if (cancellable != null) { - cancellable.cancel(); - } - } - }); - - return null; - } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5e0642bfcb4fb..7e9c7bd3123c5 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -32,7 +32,6 @@ package org.opensearch.common.settings; import org.apache.logging.log4j.LogManager; -import org.opensearch.cluster.routing.WeightedRoutingService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.action.search.CreatePitController; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; @@ -539,7 +538,6 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING, OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, - WeightedRoutingService.WEIGHTS_GC_DELETES_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index ac7b664721fd4..9d0ed8e03d7f2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -56,8 +56,7 @@ public void testValidate_AttributeMissing() { } public void testValidate_MoreThanHalfWithZeroWeight() { - String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + - "\"_version\":1}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + "\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate();