Skip to content

Commit

Permalink
Cluster Health: Add max wait time for pending task and active shard p…
Browse files Browse the repository at this point in the history
…ercentage

In order to get a quick overview by simply checking the cluster state
and its corresponding cat API, the following two attributes have been added
to the cluster health response:

* task max waiting time: how long the oldest task in the pending task
  queue has been waiting
* active shards percent: the percentage of all shards that are currently
  active (100% once the cluster is green)

This makes the cluster health API a handy way to check when a fully
restarted cluster is back up and running.

Closes #10805
  • Loading branch information
spinscale committed Jun 22, 2015
1 parent 8b60083 commit 88f8d58
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTableValidation;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
Expand Down Expand Up @@ -60,6 +62,8 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
int numberOfPendingTasks = 0;
int numberOfInFlightFetch = 0;
int delayedUnassignedShards = 0;
TimeValue taskMaxWaitingTime = TimeValue.timeValueMillis(0);
double activeShardsPercent = 100;
boolean timedOut = false;
ClusterHealthStatus status = ClusterHealthStatus.RED;
private List<String> validationFailures;
Expand All @@ -70,15 +74,19 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl

/** needed for plugins BWC */
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
this(clusterName, concreteIndices, clusterState, -1, -1, -1);
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0));
}

public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
int numberOfInFlightFetch, int delayedUnassignedShards) {
int numberOfInFlightFetch, int delayedUnassignedShards, TimeValue taskMaxWaitingTime) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.delayedUnassignedShards = delayedUnassignedShards;
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.taskMaxWaitingTime = taskMaxWaitingTime;
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
validationFailures = validation.failures();
numberOfNodes = clusterState.nodes().size();
Expand Down Expand Up @@ -116,6 +124,20 @@ public ClusterHealthResponse(String clusterName, String[] concreteIndices, Clust
} else if (clusterState.blocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE)) {
status = ClusterHealthStatus.RED;
}

// shortcut on green
if (status.equals(ClusterHealthStatus.GREEN)) {
this.activeShardsPercent = 100;
} else {
List<ShardRouting> shardRoutings = clusterState.getRoutingTable().allShards();
int activeShardCount = 0;
int totalShardCount = 0;
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.active()) activeShardCount++;
totalShardCount++;
}
this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100;
}
}

public String getClusterName() {
Expand Down Expand Up @@ -200,6 +222,21 @@ public Map<String, ClusterIndexHealth> getIndices() {
return indices;
}

/**
 * Returns the task wait time carried by this health response.
 *
 * @return the maximum wait time of all tasks in the queue
 */
public TimeValue getTaskMaxWaitingTime() {
    return this.taskMaxWaitingTime;
}

/**
 * Returns the share of shards that are active, expressed as a percentage.
 *
 * @return the percentage of active shards; 100 in a green system
 */
public double getActiveShardsPercent() {
    return this.activeShardsPercent;
}

@Override
public Iterator<ClusterIndexHealth> iterator() {
return indices.values().iterator();
Expand Down Expand Up @@ -244,6 +281,9 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_1_7_0)) {
delayedUnassignedShards= in.readInt();
}

activeShardsPercent = in.readDouble();
taskMaxWaitingTime = TimeValue.readTimeValue(in);
}

@Override
Expand Down Expand Up @@ -274,6 +314,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_7_0)) {
out.writeInt(delayedUnassignedShards);
}
out.writeDouble(activeShardsPercent);
taskMaxWaitingTime.writeTo(out);
}


Expand All @@ -299,6 +341,10 @@ static final class Fields {
static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch");
static final XContentBuilderString DELAYED_UNASSIGNED_SHARDS = new XContentBuilderString("delayed_unassigned_shards");
static final XContentBuilderString TASK_MAX_WAIT_TIME_IN_QUEUE = new XContentBuilderString("task_max_waiting_in_queue");
static final XContentBuilderString TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = new XContentBuilderString("task_max_waiting_in_queue_millis");
static final XContentBuilderString ACTIVE_SHARDS_PERCENT_AS_NUMBER = new XContentBuilderString("active_shards_percent_as_number");
static final XContentBuilderString ACTIVE_SHARDS_PERCENT = new XContentBuilderString("active_shards_percent");
static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
Expand All @@ -323,6 +369,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.DELAYED_UNASSIGNED_SHARDS, getDelayedUnassignedShards());
builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch());
builder.timeValueField(Fields.TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS, Fields.TASK_MAX_WAIT_TIME_IN_QUEUE, getTaskMaxWaitingTime());
builder.percentageField(Fields.ACTIVE_SHARDS_PERCENT_AS_NUMBER, Fields.ACTIVE_SHARDS_PERCENT, getActiveShardsPercent());

String level = params.param("level", "cluster");
boolean outputIndices = "indices".equals(level) || "shards".equals(level);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -170,12 +167,14 @@ public void onTimeout(TimeValue timeout) {
}

private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMaxTaskWaitTime());
return prepareResponse(request, response, clusterState, waitFor);
}

private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMaxTaskWaitTime());
boolean valid = prepareResponse(request, response, clusterState, waitFor);
assert valid || timedOut;
// we check for a timeout here since this method might be called from the wait_for_events
Expand Down Expand Up @@ -259,20 +258,25 @@ private boolean prepareResponse(final ClusterHealthRequest request, final Cluste
}


private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks, int numberOfInFlightFetch) {
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks, int numberOfInFlightFetch,
TimeValue pendingTaskTimeInQueue) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
}

String[] concreteIndices;
try {
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
} catch (IndexMissingException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState,
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState),
pendingTaskTimeInQueue);
response.status = ClusterHealthStatus.RED;
return response;
}

return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks,
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState), pendingTaskTimeInQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,10 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
int numberOfPendingTasks();

/**
 * Returns the maximum wait time for tasks in the queue.
 *
 * @return A zero time value if the queue is empty, otherwise the time value of the oldest task waiting in the queue
 */
TimeValue getMaxTaskWaitTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void add(@Nullable final TimeValue timeout, final TimeoutClusterStateList
}
// call the post added notification on the same event thread
try {
updateTasksExecutor.execute(new TimedPrioritizedRunnable(Priority.HIGH, "_add_listener_") {
updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) {
@Override
public void run() {
if (timeout != null) {
Expand Down Expand Up @@ -312,10 +312,10 @@ public List<PendingClusterTask> pendingTasks() {
final Object task = pending.task;
if (task == null) {
continue;
} else if (task instanceof TimedPrioritizedRunnable) {
TimedPrioritizedRunnable runnable = (TimedPrioritizedRunnable) task;
} else if (task instanceof UpdateTask) {
UpdateTask runnable = (UpdateTask) task;
source = runnable.source();
timeInQueue = runnable.timeSinceCreatedInMillis();
timeInQueue = runnable.getAgeInMillis();
} else {
assert false : "expected TimedPrioritizedRunnable got " + task.getClass();
source = "unknown [" + task.getClass() + "]";
Expand All @@ -332,37 +332,26 @@ public int numberOfPendingTasks() {
return updateTasksExecutor.getNumberOfPendingTasks();
}

// Delegates to the update-tasks executor, which owns the queue of pending tasks.
@Override
public TimeValue getMaxTaskWaitTime() {
return updateTasksExecutor.getMaxTaskWaitTime();
}

class UpdateTask extends PrioritizedRunnable {

static abstract class TimedPrioritizedRunnable extends PrioritizedRunnable {
private final long creationTimeNS;
public final ClusterStateUpdateTask updateTask;
protected final String source;

protected TimedPrioritizedRunnable(Priority priority, String source) {

UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority);
this.updateTask = updateTask;
this.source = source;
this.creationTimeNS = System.nanoTime();
}

public long timeSinceCreatedInMillis() {
// max with 0 to make sure we always return a non negative number
// even if time shifts.
return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - creationTimeNS));
}

public String source() {
return source;
}
}

class UpdateTask extends TimedPrioritizedRunnable {

public final ClusterStateUpdateTask updateTask;


UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority, source);
this.updateTask = updateTask;
}

@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.elasticsearch.common.util.concurrent;


import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
Expand Down Expand Up @@ -67,8 +70,8 @@ protected synchronized void terminated() {
}
}

public static interface ShutdownListener {
public void onTerminated();
public interface ShutdownListener {
void onTerminated();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*/
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {

private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0);
private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();

Expand All @@ -56,6 +57,26 @@ public int getNumberOfPendingTasks() {
return size;
}

/**
 * Returns how long the oldest task currently in the queue has been waiting.
 * <p>
 * Only queued {@link PrioritizedRunnable} instances are considered; other
 * runnables in the queue are ignored.
 *
 * @return a zero time value if the queue is empty, otherwise the age of the
 *         oldest queued task (never negative)
 */
public TimeValue getMaxTaskWaitTime() {
    if (getQueue().isEmpty()) {
        return NO_WAIT_TIME_VALUE;
    }

    final long now = System.nanoTime();
    long maxAgeInNanos = 0;
    for (Runnable queuedRunnable : getQueue()) {
        if (queuedRunnable instanceof PrioritizedRunnable) {
            // System.nanoTime() readings are only meaningful as differences, so
            // track the largest elapsed time instead of the smallest raw reading.
            long ageInNanos = now - ((PrioritizedRunnable) queuedRunnable).getCreationDateInNanos();
            // a task created after 'now' was sampled yields a negative age; clamp via the 0 seed
            maxAgeInNanos = Math.max(maxAgeInNanos, ageInNanos);
        }
    }

    return TimeValue.timeValueNanos(maxAgeInNanos);
}

private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {
Expand Down Expand Up @@ -191,7 +212,6 @@ private void runAndClean(Runnable run) {
timeoutFuture = null;
}
}

}

private final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

/**
*
*/
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {

private final Priority priority;
private final long creationDate;

public static PrioritizedRunnable wrap(Runnable runnable, Priority priority) {
return new Wrapped(runnable, priority);
}

protected PrioritizedRunnable(Priority priority) {
this.priority = priority;
creationDate = System.nanoTime();
}

/**
 * Returns the {@link System#nanoTime()} reading stored as this runnable's creation date.
 * <p>
 * The value is not a wall-clock timestamp; it is only meaningful when
 * compared against another {@code System.nanoTime()} reading.
 */
public long getCreationDateInNanos() {
    return this.creationDate;
}

/**
 * Returns the time elapsed since this runnable was created, in milliseconds.
 *
 * @return the age in milliseconds; never negative
 */
public long getAgeInMillis() {
    // creationDate is a System.nanoTime() reading: 1 ms = 1,000,000 ns
    // (dividing by 1000 would report microseconds instead of milliseconds).
    final long elapsedNanos = System.nanoTime() - creationDate;
    return Math.max(0, elapsedNanos / 1000000L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.math.RoundingMode;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.Map;

/**
Expand Down Expand Up @@ -957,6 +958,14 @@ public XContentBuilder byteSizeField(XContentBuilderString rawFieldName, XConten
return this;
}

/**
 * Writes a percentage value as a raw numeric field and, when human-readable
 * output is enabled, additionally as a formatted string field.
 * <p>
 * The readable form uses one decimal place followed by a percent sign
 * (format {@code "%1.1f%%"} with {@link Locale#ROOT}) and is written before
 * the raw field.
 *
 * @param rawFieldName      name of the field holding the numeric value
 * @param readableFieldName name of the field holding the formatted string
 * @param percentage        the percentage value to write
 * @return this builder
 * @throws IOException if writing a field fails
 */
public XContentBuilder percentageField(XContentBuilderString rawFieldName, XContentBuilderString readableFieldName, double percentage) throws IOException {
    if (humanReadable) {
        final String formatted = String.format(Locale.ROOT, "%1.1f%%", percentage);
        field(readableFieldName, formatted);
    }
    field(rawFieldName, percentage);
    return this;
}

public XContentBuilder value(Boolean value) throws IOException {
if (value == null) {
return nullValue();
Expand Down
Loading

0 comments on commit 88f8d58

Please sign in to comment.