Skip to content

Commit

Permalink
Add per-task description to pending cluster tasks
Browse files Browse the repository at this point in the history
Deferring this until after elastic#92021 when there will be more typesafety and
better tests.
  • Loading branch information
DaveCTurner committed Dec 5, 2022
1 parent f3e9041 commit 1d062b0
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -16,15 +17,28 @@

public class PendingClusterTasksRequest extends MasterNodeReadRequest<PendingClusterTasksRequest> {

public PendingClusterTasksRequest() {}
private final boolean detailed;

public PendingClusterTasksRequest(boolean detailed) {
this.detailed = detailed;
}

public PendingClusterTasksRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
detailed = in.readBoolean();
} else {
// earlier versions don't support detailed mode
detailed = false;
}
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public boolean detailed() {
return detailed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ public class PendingClusterTasksRequestBuilder extends MasterNodeReadOperationRe
PendingClusterTasksRequestBuilder> {

public PendingClusterTasksRequestBuilder(ElasticsearchClient client, PendingClusterTasksAction action) {
super(client, action, new PendingClusterTasksRequest());
super(client, action, new PendingClusterTasksRequest(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public class PendingClusterTasksResponse extends ActionResponse implements ChunkedToXContent {

public static final String DETAILED_PARAM_NAME = "detailed";

private final List<PendingClusterTask> pendingTasks;

public PendingClusterTasksResponse(StreamInput in) throws IOException {
Expand Down Expand Up @@ -56,6 +58,7 @@ public String toString() {

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
final var detailed = params.paramAsBoolean(DETAILED_PARAM_NAME, false);
return Iterators.concat(Iterators.single((builder, p) -> {
builder.startObject();
builder.startArray(Fields.TASKS);
Expand All @@ -65,6 +68,9 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
builder.field(Fields.SOURCE, pendingClusterTask.getSource());
if (detailed) {
builder.field(Fields.TASK, pendingClusterTask.taskDescription());
}
builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
Expand All @@ -84,6 +90,7 @@ static final class Fields {
static final String INSERT_ORDER = "insert_order";
static final String PRIORITY = "priority";
static final String SOURCE = "source";
static final String TASK = "task";
static final String TIME_IN_QUEUE_MILLIS = "time_in_queue_millis";
static final String TIME_IN_QUEUE = "time_in_queue";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected void masterOperation(
ActionListener<PendingClusterTasksResponse> listener
) {
logger.trace("fetching pending tasks from cluster service");
final List<PendingClusterTask> pendingTasks = clusterService.getMasterService().pendingTasks();
final List<PendingClusterTask> pendingTasks = clusterService.getMasterService().pendingTasks(request.detailed());
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,13 @@ public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
* Returns the tasks that are pending.
*/
public List<PendingClusterTask> pendingTasks() {
return pendingTasks(false);
}

/**
* Returns the tasks that are pending.
*/
public List<PendingClusterTask> pendingTasks(boolean detailed) {
return Arrays.stream(threadPoolExecutor.getPending()).map(pending -> {
assert pending.task instanceof SourcePrioritizedRunnable
: "thread pool executor should only use SourcePrioritizedRunnable instances but found: "
Expand All @@ -622,6 +629,7 @@ public List<PendingClusterTask> pendingTasks() {
pending.insertionOrder,
pending.priority,
new Text(task.source()),
detailed && task instanceof TaskBatcher.BatchedTask batchedTask ? batchedTask.getTask().toString() : null,
task.getAgeInMillis(),
pending.executing
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster.service;

import org.elasticsearch.Version;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -17,17 +18,35 @@

import java.io.IOException;

public record PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing)
implements
Writeable {
public record PendingClusterTask(
long insertOrder,
Priority priority,
Text source,
String taskDescription,
long timeInQueue,
boolean executing
) implements Writeable {

public PendingClusterTask(StreamInput in) throws IOException {
this(in.readVLong(), Priority.readFrom(in), in.readText(), in.readLong(), in.readBoolean());
this(in.readVLong(), Priority.readFrom(in), readSource(in), readDescription(in), readTimeInQueue(in), in.readBoolean());
}

private static Text readSource(StreamInput in) throws IOException {
return in.getVersion().onOrAfter(Version.V_8_7_0) ? new Text(in.readString()) : in.readText();
}

private static String readDescription(StreamInput in) throws IOException {
return in.getVersion().onOrAfter(Version.V_8_7_0) ? in.readOptionalString() : null;
}

private static long readTimeInQueue(StreamInput in) throws IOException {
return in.getVersion().onOrAfter(Version.V_8_7_0) ? in.readVLong() : in.readLong();
}

public PendingClusterTask {
assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]";
assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]";
assert source.hasString();
}

public long getInsertOrder() {
Expand Down Expand Up @@ -58,8 +77,15 @@ public boolean isExecuting() {
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(insertOrder);
Priority.writeTo(priority, out);
out.writeText(source);
out.writeLong(timeInQueue);
if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
out.writeString(source.string());
out.writeOptionalString(taskDescription);
out.writeVLong(timeInQueue);
} else {
out.writeText(source);
// earlier versions don't support detailed mode, can just omit the task description
out.writeLong(timeInQueue);
}
out.writeBoolean(executing);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.List;

import static org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse.DETAILED_PARAM_NAME;
import static org.elasticsearch.rest.RestRequest.Method.GET;

public class RestPendingClusterTasksAction extends BaseRestHandler {
Expand All @@ -33,7 +34,9 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest(
request.paramAsBoolean(DETAILED_PARAM_NAME, false)
);
pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
return channel -> client.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected void documentation(StringBuilder sb) {

@Override
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest(false);
pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
return channel -> client.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void testPendingClusterTasksResponseChunking() throws IOException {
randomNonNegativeLong(),
randomFrom(Priority.values()),
new Text(randomAlphaOfLengthBetween(1, 10)),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 10),
randomNonNegativeLong(),
randomBoolean()
)
Expand Down

0 comments on commit 1d062b0

Please sign in to comment.