Skip to content

Commit

Permalink
Merge pull request #13 from anshu1106/feature/api-versioning
Browse files Browse the repository at this point in the history
Feature/api versioning
  • Loading branch information
bharath-techie authored Jan 4, 2023
2 parents 241bd42 + 04b7db1 commit bf5287c
Show file tree
Hide file tree
Showing 25 changed files with 601 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.cluster.routing;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -64,6 +66,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand All @@ -73,6 +76,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(0)
.get();
assertEquals(response.isAcknowledged(), true);
}
Expand Down Expand Up @@ -199,6 +203,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -270,6 +275,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -307,12 +313,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() {
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
ResourceNotFoundException exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get()
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}

public void testDeleteWeightedRouting_WeightsAreSet() {
public void testDeleteWeightedRouting_WeightsAreSet() throws IOException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
Expand All @@ -339,13 +347,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testPutAndDeleteWithVersioning() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");

Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// update weights api call with correct version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get();
assertTrue(response.isAcknowledged());

// update weights api call with incorrect version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights);
UnsupportedWeightedRoutingStateException exception = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get()
);
assertEquals(exception.status(), RestStatus.CONFLICT);

// get weights call
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();

// update weights call using version returned by get api call
weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(weightedRoutingResponse.getVersion())
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights by awareness attribute
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.setVersion(2)
.get();
assertTrue(deleteResponse.isAcknowledged());

// update weights again and make sure that version number got updated on delete
weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights
deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get();
assertTrue(deleteResponse.isAcknowledged());

// delete weights call, incorrect version number
UnsupportedWeightedRoutingStateException deleteException = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get()
);
assertEquals(RestStatus.CONFLICT, deleteException.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,142 @@

package org.opensearch.action.admin.cluster.shards.routing.weighted.delete;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchGenerationException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;

/**
* Request to delete weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest<ClusterDeleteWeightedRoutingRequest> {
public ClusterDeleteWeightedRoutingRequest() {}
private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class);

private long version;
private String awarenessAttribute;

public void setVersion(long version) {
this.version = version;
}

public ClusterDeleteWeightedRoutingRequest() {
this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE;
}

public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
version = in.readLong();
if (in.available() != 0) {
awarenessAttribute = in.readString();
}
}

public long getVersion() {
return version;
}

public String getAwarenessAttribute() {
return awarenessAttribute;
}

public void setAwarenessAttribute(String awarenessAttribute) {
this.awarenessAttribute = awarenessAttribute;
}

public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) {
this.awarenessAttribute = awarenessAttribute;
this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* @param source weights definition from request body
* @return this request
*/
public ClusterDeleteWeightedRoutingRequest source(Map<String, String> source) {
try {
if (source.isEmpty()) {
throw new OpenSearchParseException(("Empty request body"));
}
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.map(source);
setRequestBody(BytesReference.bytes(builder), builder.contentType());
} catch (IOException e) {
throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}

public void setRequestBody(BytesReference source, XContentType contentType) {
try (
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
contentType
)
) {
String versionAttr = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) {
versionAttr = parser.currentName();
} else {
throw new OpenSearchParseException(
"failed to parse delete weighted routing request body [{}], unknown type",
fieldName
);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) {
this.version = Long.parseLong(parser.text());
}
} else {
throw new OpenSearchParseException("failed to parse delete weighted routing request body");
}
}
} catch (IOException e) {
logger.error("error while parsing delete request for weighted routing request object", e);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(version);
if (awarenessAttribute != null) {
out.writeString(awarenessAttribute);
}
}

@Override
public String toString() {
return "ClusterDeleteWeightedRoutingRequest";
return "ClusterDeleteWeightedRoutingRequest{" + "version= " + version + "awarenessAttribute=" + awarenessAttribute + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo
public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) {
super(client, action, new ClusterDeleteWeightedRoutingRequest());
}

public ClusterDeleteWeightedRoutingRequestBuilder setVersion(long version) {
request.setVersion(version);
return this;
}

public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) {
request.setAwarenessAttribute(attribute);
return this;
}

}
Loading

0 comments on commit bf5287c

Please sign in to comment.