Skip to content

Commit

Permalink
Parameter improvements to Cluster Health API wait for shards (#20223)
Browse files Browse the repository at this point in the history
* Params improvements to Cluster Health API wait for shards

Previously, the cluster health API used a strictly numeric value
for `wait_for_active_shards`. However, with the introduction of
ActiveShardCount and the removal of write consistency level for
replication operations, `wait_for_active_shards` is used for
write operations to represent values for ActiveShardCount. This
commit moves the cluster health API's usage of `wait_for_active_shards`
to be consistent with its usage in the write operation APIs.

This commit also changes `wait_for_relocating_shards` from a
numeric value to a simple boolean value `wait_for_no_relocating_shards`
to set whether the cluster health operation should wait for
all relocating shards to complete relocation.

* Addresses code review comments

* Don't be lenient if `wait_for_relocating_shards` is set
  • Loading branch information
Ali Beyad authored Aug 31, 2016
1 parent 6cac3e9 commit 4641254
Show file tree
Hide file tree
Showing 36 changed files with 298 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
Expand All @@ -41,8 +42,8 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
private String[] indices;
private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS);
private ClusterHealthStatus waitForStatus;
private int waitForRelocatingShards = -1;
private int waitForActiveShards = -1;
private boolean waitForNoRelocatingShards = false;
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;

Expand Down Expand Up @@ -102,24 +103,52 @@ public ClusterHealthRequest waitForYellowStatus() {
return waitForStatus(ClusterHealthStatus.YELLOW);
}

public int waitForRelocatingShards() {
return waitForRelocatingShards;
public boolean waitForNoRelocatingShards() {
return waitForNoRelocatingShards;
}

public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards) {
this.waitForRelocatingShards = waitForRelocatingShards;
/**
* Sets whether the request should wait for there to be no relocating shards before
* retrieving the cluster health status. Defaults to {@code false}, meaning the
* operation does not wait on there being no more relocating shards. Set to <code>true</code>
* to wait until the number of relocating shards in the cluster is 0.
*/
public ClusterHealthRequest waitForNoRelocatingShards(boolean waitForNoRelocatingShards) {
this.waitForNoRelocatingShards = waitForNoRelocatingShards;
return this;
}

public int waitForActiveShards() {
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

public ClusterHealthRequest waitForActiveShards(int waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
/**
* Sets the number of shard copies that must be active across all indices before getting the
* health status. Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.
* Set this value to {@link ActiveShardCount#ALL} to wait for all shards (primary and
* all replicas) to be active across all indices in the cluster. Otherwise, use
* {@link ActiveShardCount#from(int)} to set this value to any non-negative integer, up to the
* total number of shard copies to wait for.
*/
public ClusterHealthRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) {
// the default for cluster health request is 0, not 1
this.waitForActiveShards = ActiveShardCount.NONE;
} else {
this.waitForActiveShards = waitForActiveShards;
}
return this;
}

/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public ClusterHealthRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

public String waitForNodes() {
return waitForNodes;
}
Expand Down Expand Up @@ -162,8 +191,8 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
}
waitForRelocatingShards = in.readInt();
waitForActiveShards = in.readInt();
waitForNoRelocatingShards = in.readBoolean();
waitForActiveShards = ActiveShardCount.readFrom(in);
waitForNodes = in.readString();
if (in.readBoolean()) {
waitForEvents = Priority.readFrom(in);
Expand All @@ -188,8 +217,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
out.writeByte(waitForStatus.value());
}
out.writeInt(waitForRelocatingShards);
out.writeInt(waitForActiveShards);
out.writeBoolean(waitForNoRelocatingShards);
waitForActiveShards.writeTo(out);
out.writeString(waitForNodes);
if (waitForEvents == null) {
out.writeBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.health;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -64,11 +65,40 @@ public ClusterHealthRequestBuilder setWaitForYellowStatus() {
return this;
}

public ClusterHealthRequestBuilder setWaitForRelocatingShards(int waitForRelocatingShards) {
request.waitForRelocatingShards(waitForRelocatingShards);
/**
* Sets whether the request should wait for there to be no relocating shards before
* retrieving the cluster health status. Defaults to <code>false</code>, meaning the
* operation does not wait on there being no more relocating shards. Set to <code>true</code>
* to wait until the number of relocating shards in the cluster is 0.
*/
public ClusterHealthRequestBuilder setWaitForNoRelocatingShards(boolean waitForRelocatingShards) {
request.waitForNoRelocatingShards(waitForRelocatingShards);
return this;
}

/**
* Sets the number of shard copies that must be active before getting the health status.
* Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.
* Set this value to {@link ActiveShardCount#ALL} to wait for all shards (primary and
* all replicas) to be active across all indices in the cluster. Otherwise, use
* {@link ActiveShardCount#from(int)} to set this value to any non-negative integer, up to the
* total number of shard copies that would exist across all indices in the cluster.
*/
public ClusterHealthRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) {
// the default for cluster health is 0, not 1
request.waitForActiveShards(ActiveShardCount.NONE);
} else {
request.waitForActiveShards(waitForActiveShards);
}
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public ClusterHealthRequestBuilder setWaitForActiveShards(int waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -125,10 +126,10 @@ private void executeHealth(final ClusterHealthRequest request, final ActionListe
if (request.waitForStatus() == null) {
waitFor--;
}
if (request.waitForRelocatingShards() == -1) {
if (request.waitForNoRelocatingShards() == false) {
waitFor--;
}
if (request.waitForActiveShards() == -1) {
if (request.waitForActiveShards().equals(ActiveShardCount.NONE)) {
waitFor--;
}
if (request.waitForNodes().isEmpty()) {
Expand Down Expand Up @@ -203,11 +204,22 @@ private boolean prepareResponse(final ClusterHealthRequest request, final Cluste
if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
waitForCounter++;
}
if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) {
if (request.waitForNoRelocatingShards() && response.getRelocatingShards() == 0) {
waitForCounter++;
}
if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) {
waitForCounter++;
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
assert waitForActiveShards.equals(ActiveShardCount.DEFAULT) == false :
"waitForActiveShards must not be DEFAULT on the request object, instead it should be NONE";
if (waitForActiveShards.equals(ActiveShardCount.ALL)
&& response.getUnassignedShards() == 0
&& response.getInitializingShards() == 0) {
// if we are waiting for all shards to be active, then the num of unassigned and num of initializing shards must be 0
waitForCounter++;
} else if (waitForActiveShards.enoughShardsActive(response.getActiveShards())) {
// there are enough active shards to meet the requirements of the request
waitForCounter++;
}
}
if (request.indices() != null && request.indices().length > 0) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,25 @@ public static ActiveShardCount parseString(final String str) {
}
}

/**
* Returns true iff the given number of active shards is enough to meet
* the required shard count represented by this instance. This method
* should only be invoked with {@link ActiveShardCount} objects created
* from {@link #from(int)}, or {@link #NONE} or {@link #ONE}.
*/
public boolean enoughShardsActive(final int activeShardCount) {
if (this.value < 0) {
throw new IllegalStateException("not enough information to resolve to shard count");
}
if (activeShardCount < 0) {
throw new IllegalArgumentException("activeShardCount cannot be negative");
}
return this.value <= activeShardCount;
}

/**
* Returns true iff the given cluster state's routing table contains enough active
* shards to meet the required shard count represented by this instance.
* shards for the given index to meet the required shard count represented by this instance.
*/
public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) {
if (this == ActiveShardCount.NONE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -57,9 +58,17 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
if (waitForStatus != null) {
clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
}
clusterHealthRequest.waitForRelocatingShards(
request.paramAsInt("wait_for_relocating_shards", clusterHealthRequest.waitForRelocatingShards()));
clusterHealthRequest.waitForActiveShards(request.paramAsInt("wait_for_active_shards", clusterHealthRequest.waitForActiveShards()));
clusterHealthRequest.waitForNoRelocatingShards(
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
if (request.hasParam("wait_for_relocating_shards")) {
// wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
"use wait_for_no_relocating_shards [true/false] instead");
}
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
clusterHealthRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
if (request.param("wait_for_events") != null) {
clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ public void testEnoughShardsActiveLevelAll() {
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
}

public void testEnoughShardsActiveValueBased() {
// enough shards active case
int threshold = randomIntBetween(1, 50);
ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, threshold));
assertTrue(waitForActiveShards.enoughShardsActive(randomIntBetween(threshold, 50)));
// not enough shards active
waitForActiveShards = ActiveShardCount.from(randomIntBetween(threshold, 50));
assertFalse(waitForActiveShards.enoughShardsActive(randomIntBetween(0, threshold - 1)));
// wait for zero shards should always pass
assertTrue(ActiveShardCount.from(0).enoughShardsActive(randomIntBetween(0, 50)));
// invalid values
Exception e = expectThrows(IllegalStateException.class, () -> ActiveShardCount.ALL.enoughShardsActive(randomIntBetween(0, 50)));
assertEquals("not enough information to resolve to shard count", e.getMessage());
e = expectThrows(IllegalStateException.class, () -> ActiveShardCount.DEFAULT.enoughShardsActive(randomIntBetween(0, 50)));
assertEquals("not enough information to resolve to shard count", e.getMessage());
e = expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.NONE.enoughShardsActive(randomIntBetween(-10, -1)));
assertEquals("activeShardCount cannot be negative", e.getMessage());
}

private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) {
final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testSimpleAwareness() throws Exception {
assertThat(awaitBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForRelocatingShards(0).get();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForNoRelocatingShards(true).get();
if (clusterHealth.isTimedOut()) {
return false;
}
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testAwarenessZones() throws Exception {
.put("index.number_of_replicas", 1)).execute().actionGet();

logger.info("--> waiting for shards to be allocated");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForRelocatingShards(0).execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand Down Expand Up @@ -166,7 +166,7 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)).execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").setWaitForRelocatingShards(0).execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("2").setWaitForNoRelocatingShards(true).execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
Expand All @@ -186,7 +186,7 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("3").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand All @@ -208,7 +208,7 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand All @@ -229,7 +229,7 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
assertThat(counts.containsKey(noZoneNode), equalTo(false));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build()).get();

health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand Down
Loading

0 comments on commit 4641254

Please sign in to comment.