diff --git a/docs/reference/rollup/apis/delete-job.asciidoc b/docs/reference/rollup/apis/delete-job.asciidoc index 37774560848c5..f649d3ee60d97 100644 --- a/docs/reference/rollup/apis/delete-job.asciidoc +++ b/docs/reference/rollup/apis/delete-job.asciidoc @@ -8,8 +8,8 @@ experimental[] -This API deletes an existing rollup job. The job can be started or stopped, in both cases it will be deleted. Attempting -to delete a non-existing job will throw an exception +This API deletes an existing rollup job. A job must be *stopped* first before it can be deleted. Attempting to delete +a started job will result in an error. Similarly, attempting to delete a nonexistent job will throw an exception. .Deleting the job does not delete rolled up data ********************************** @@ -99,12 +99,12 @@ A 404 `resource_not_found` exception will be thrown: "root_cause" : [ { "type" : "resource_not_found_exception", - "reason" : "the task with id does_not_exist doesn't exist", + "reason" : "the task with id [does_not_exist] doesn't exist", "stack_trace": ... } ], "type" : "resource_not_found_exception", - "reason" : "the task with id does_not_exist doesn't exist", + "reason" : "the task with id [does_not_exist] doesn't exist", "stack_trace": ... }, "status": 404 diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index 0ab1391faa211..e6f1c52aae891 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -51,8 +51,6 @@ */ public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject { private static final String TASKS = "tasks"; - private static final String TASK_FAILURES = "task_failures"; - private static final String NODE_FAILURES = "node_failures"; private List tasks; @@ -246,28 +244,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - private void toXContentCommon(XContentBuilder builder, Params params) throws IOException { - if (getTaskFailures() != null && getTaskFailures().size() > 0) { - builder.startArray(TASK_FAILURES); - for (TaskOperationFailure ex : getTaskFailures()){ - builder.startObject(); - builder.value(ex); - builder.endObject(); - } - builder.endArray(); - } - - if (getNodeFailures() != null && getNodeFailures().size() > 0) { - builder.startArray(NODE_FAILURES); - for (ElasticsearchException ex : getNodeFailures()) { - builder.startObject(); - ex.toXContent(builder, params); - builder.endObject(); - } - builder.endArray(); - } - } - public static ListTasksResponse fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java index 1436410bf2046..090aaf628ac52 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java @@ -25,12 +25,15 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import 
org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; @@ -41,6 +44,9 @@ * Base class for responses of task-related operations */ public class BaseTasksResponse extends ActionResponse { + protected static final String TASK_FAILURES = "task_failures"; + protected static final String NODE_FAILURES = "node_failures"; + private List taskFailures; private List nodeFailures; @@ -103,4 +109,44 @@ public void writeTo(StreamOutput out) throws IOException { exp.writeTo(out); } } + + protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException { + if (getTaskFailures() != null && getTaskFailures().size() > 0) { + builder.startArray(TASK_FAILURES); + for (TaskOperationFailure ex : getTaskFailures()){ + builder.startObject(); + builder.value(ex); + builder.endObject(); + } + builder.endArray(); + } + + if (getNodeFailures() != null && getNodeFailures().size() > 0) { + builder.startArray(NODE_FAILURES); + for (ElasticsearchException ex : getNodeFailures()) { + builder.startObject(); + ex.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BaseTasksResponse response = (BaseTasksResponse) o; + return taskFailures.equals(response.taskFailures) + && nodeFailures.equals(response.nodeFailures); + } + + @Override + public int hashCode() { + return Objects.hash(taskFailures, nodeFailures); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 8c7cb27101096..91d70b260fe80 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -68,6 +68,7 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -449,7 +450,7 @@ private void wipeClusterSettings() throws IOException { } } - private void wipeRollupJobs() throws IOException { + private void wipeRollupJobs() throws IOException, InterruptedException { Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all")); Map jobs = entityAsMap(response); @SuppressWarnings("unchecked") @@ -460,6 +461,29 @@ private void wipeRollupJobs() throws IOException { return; } + for (Map jobConfig : jobConfigs) { + @SuppressWarnings("unchecked") + String jobId = (String) ((Map) jobConfig.get("config")).get("id"); + Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop"); + request.addParameter("ignore", "404"); + logger.debug("stopping rollup job [{}]", jobId); + adminClient().performRequest(request); + } + + // TODO this is temporary until StopJob API gains the ability to block until stopped + awaitBusy(() -> { + Request request = new Request("GET", "/_xpack/rollup/job/_all"); + try { + Response jobsResponse = adminClient().performRequest(request); + String body = EntityUtils.toString(jobsResponse.getEntity()); + logger.error(body); + // If the 
body contains any of the non-stopped states, at least one job is not finished yet + return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains); + } catch (IOException e) { + return false; + } + }, 10, TimeUnit.SECONDS); + for (Map jobConfig : jobConfigs) { @SuppressWarnings("unchecked") String jobId = (String) ((Map) jobConfig.get("config")).get("id"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java index df2c70c76539e..f1c6213cd70e3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java @@ -7,22 +7,29 @@ import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.rollup.RollupField; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Objects; -public class DeleteRollupJobAction extends Action { +public class DeleteRollupJobAction extends Action { public static final DeleteRollupJobAction INSTANCE = new DeleteRollupJobAction(); public static final String NAME = "cluster:admin/xpack/rollup/delete"; @@ -32,11 +39,11 @@ private DeleteRollupJobAction() { } @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + public Response newResponse() { + return new Response(); } - public static class Request extends AcknowledgedRequest implements ToXContent { + public static class Request extends BaseTasksRequest implements ToXContentFragment { private String id; public Request(String id) { @@ -45,6 +52,11 @@ public Request(String id) { public Request() {} + @Override + public boolean match(Task task) { + return task.getDescription().equals(RollupField.NAME + "_" + id); + } + public String getId() { return id; } @@ -90,10 +102,74 @@ public boolean equals(Object obj) { } } - public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - + public static class RequestBuilder extends ActionRequestBuilder { protected RequestBuilder(ElasticsearchClient client, DeleteRollupJobAction action) { - super(client, action, new Request()); + super(client, action, new DeleteRollupJobAction.Request()); 
+ } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + + private boolean acknowledged; + + public Response(StreamInput in) throws IOException { + super(Collections.emptyList(), Collections.emptyList()); + readFrom(in); + } + + public Response(boolean acknowledged, List taskFailures, List nodeFailures) { + super(taskFailures, nodeFailures); + this.acknowledged = acknowledged; + } + + public Response(boolean acknowledged) { + super(Collections.emptyList(), Collections.emptyList()); + this.acknowledged = acknowledged; + } + + public Response() { + super(Collections.emptyList(), Collections.emptyList()); + this.acknowledged = false; + } + + public boolean isDeleted() { + return acknowledged; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + acknowledged = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + toXContentCommon(builder, params); + builder.field("acknowledged", acknowledged); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o; + return super.equals(o) && acknowledged == response.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), acknowledged); } } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java index 97b4483b1ff03..5cdc40df4d699 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java @@ -5,103 +5,101 @@ */ package org.elasticsearch.xpack.rollup.action; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; 
-import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; +import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; +import org.elasticsearch.xpack.rollup.job.RollupJobTask; -import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.io.IOException; +import java.util.List; -public class TransportDeleteRollupJobAction - extends TransportMasterNodeAction { - - private final PersistentTasksService persistentTasksService; +public class TransportDeleteRollupJobAction extends TransportTasksAction { @Inject - public TransportDeleteRollupJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - PersistentTasksService persistentTasksService, ClusterService clusterService) { - super(settings, DeleteRollupJobAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, DeleteRollupJobAction.Request::new); - this.persistentTasksService = persistentTasksService; + public TransportDeleteRollupJobAction(Settings settings, TransportService transportService, + ActionFilters actionFilters, ClusterService clusterService) { + super(settings, DeleteRollupJobAction.NAME, clusterService, transportService, actionFilters, + DeleteRollupJobAction.Request::new, DeleteRollupJobAction.Response::new, ThreadPool.Names.SAME); } @Override - protected String executor() { - return ThreadPool.Names.SAME; + protected void doExecute(Task task, DeleteRollupJobAction.Request request, ActionListener listener) { + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + + if (nodes.isLocalNodeElectedMaster()) { + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { + super.doExecute(task, request, listener); + } else { + // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this call, + // no need to go looking for the allocated task + listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist")); + } + + } else { + // Delegates DeleteJob to elected master node, so it becomes the coordinating node. + // Non-master nodes may have a stale cluster state that shows jobs which are cancelled + // on the master, which makes testing difficult. 
+ if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, DeleteRollupJobAction.Response::new)); + } + } } @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + protected void taskOperation(DeleteRollupJobAction.Request request, RollupJobTask jobTask, + ActionListener listener) { + + assert jobTask.getConfig().getId().equals(request.getId()); + IndexerState state = ((RollupJobStatus) jobTask.getStatus()).getIndexerState(); + if (state.equals(IndexerState.STOPPED) ) { + jobTask.onCancelled(); + listener.onResponse(new DeleteRollupJobAction.Response(true)); + } else { + listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId() + "] because " + + "indexer state is [" + state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion.")); + } } @Override - protected void masterOperation(DeleteRollupJobAction.Request request, ClusterState state, - ActionListener listener) throws Exception { - - String jobId = request.getId(); - TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option - - // Step 1. Cancel the persistent task - persistentTasksService.sendRemoveRequest(jobId, new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful."); - - // Step 2. Wait for the task to finish cancellation internally - persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout, - new PersistentTasksService.WaitForPersistentTaskListener() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - logger.debug("Task for Rollup job [" + jobId + "] successfully canceled."); - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void onFailure(Exception e) { - logger.error("Error while cancelling task for Rollup job [" + jobId - + "]." + e); - listener.onFailure(e); - } - - @Override - public void onTimeout(TimeValue timeout) { - String msg = "Stopping of Rollup job [" + jobId + "] timed out after [" + timeout + "]."; - logger.warn(msg); - listener.onFailure(new ElasticsearchException(msg)); - } - }); - } - - @Override - public void onFailure(Exception e) { - logger.error("Error while requesting to cancel task for Rollup job [" + jobId - + "]" + e); - listener.onFailure(e); - } - }); - + protected DeleteRollupJobAction.Response newResponse(DeleteRollupJobAction.Request request, List tasks, + List taskOperationFailures, + List failedNodeExceptions) { + // There should theoretically only be one task running the rollup job + // If there are more, in production it should be ok as long as they are acknowledge shutting down. 
+ // But in testing we'd like to know there were more than one hence the assert + assert tasks.size() + taskOperationFailures.size() == 1; + boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(DeleteRollupJobAction.Response::isDeleted); + return new DeleteRollupJobAction.Response(cancelled, taskOperationFailures, failedNodeExceptions); } @Override - protected ClusterBlockException checkBlock(DeleteRollupJobAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + protected DeleteRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException { + DeleteRollupJobAction.Response response = new DeleteRollupJobAction.Response(); + response.readFrom(in); + return response; } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index d16f47b1a3503..3fbe77b64b41c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -349,7 +349,7 @@ synchronized void shutdown() { * shut down from the inside. */ @Override - protected synchronized void onCancelled() { + public synchronized void onCancelled() { logger.info("Received cancellation request for Rollup job [" + job.getConfig().getId() + "], state: [" + indexer.getState() + "]"); if (indexer.abort()) { // there is no background job running, we can shutdown safely diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java index 140b7d9b76943..77d39a45ac52d 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; import org.elasticsearch.xpack.rollup.Rollup; @@ -31,7 +32,16 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient String id = restRequest.param(ID.getPreferredName()); DeleteRollupJobAction.Request request = new DeleteRollupJobAction.Request(id); - return channel -> client.execute(DeleteRollupJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute(DeleteRollupJobAction.INSTANCE, request, + new RestToXContentListener(channel) { + @Override + protected RestStatus getStatus(DeleteRollupJobAction.Response response) { + if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) { + return RestStatus.INTERNAL_SERVER_ERROR; + } + return RestStatus.OK; + } + }); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index 8eb4a358d15c9..ebf953c93527a 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml 
@@ -184,15 +184,18 @@ setup: - is_true: started - do: + catch: request xpack.rollup.delete_job: id: foo - - is_true: acknowledged + - is_false: acknowledged + - match: { task_failures.0.reason.type: "illegal_state_exception" } + - match: { task_failures.0.reason.reason: "Could not delete job [foo] because indexer state is [STARTED]. Job must be [STOPPED] before deletion." } --- "Test delete non-existent job": - do: - catch: /the task with id does_not_exist doesn't exist/ + catch: /the task with id \[does_not_exist\] doesn't exist/ headers: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser xpack.rollup.delete_job: diff --git a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java index fb9c665b2bf1c..3ea1b8e67471c 100644 --- a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java +++ b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.multi_node; -import org.apache.http.HttpStatus; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; @@ -20,16 +19,10 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath; -import org.junit.After; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -68,13 +61,6 @@ static Map toMap(String response) throws IOException { return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); } - @After - public void clearRollupMetadata() throws Exception { - deleteAllJobs(); - waitForPendingTasks(); - // indices will be deleted by the ESRestTestCase class - } - public void testBigRollup() throws Exception { final int numDocs = 200; String dateFormat = "strict_date_optional_time"; @@ -293,60 +279,4 @@ private Map getJob(Map jobsMap, String targetJob } return null; } - - private void waitForPendingTasks() throws Exception { - ESTestCase.assertBusy(() -> { - try { - Request request = new Request("GET", "/_cat/tasks"); - request.addParameter("detailed", "true"); - Response response = adminClient().performRequest(request); - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - try (BufferedReader responseReader = new BufferedReader( - new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { - int activeTasks = 0; - String line; - StringBuilder tasksListString = new StringBuilder(); - while ((line = responseReader.readLine()) != null) { - - // We only care about Rollup jobs, otherwise this fails too easily due to unrelated tasks - if (line.startsWith(RollupJob.NAME) == true) { - activeTasks++; - tasksListString.append(line); - tasksListString.append('\n'); - } - } - assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks); - } - } - } catch (IOException e) { - throw new AssertionError("Error getting active tasks list", e); - } - 
}); - } - - @SuppressWarnings("unchecked") - private void deleteAllJobs() throws Exception { - Request request = new Request("GET", "/_xpack/rollup/job/_all"); - Response response = adminClient().performRequest(request); - Map jobs = ESRestTestCase.entityAsMap(response); - @SuppressWarnings("unchecked") - List> jobConfigs = - (List>) XContentMapValues.extractValue("jobs", jobs); - - if (jobConfigs == null) { - return; - } - - for (Map jobConfig : jobConfigs) { - logger.debug(jobConfig); - String jobId = (String) ((Map) jobConfig.get("config")).get("id"); - logger.debug("Deleting job " + jobId); - try { - request = new Request("DELETE", "/_xpack/rollup/job/" + jobId); - adminClient().performRequest(request); - } catch (Exception e) { - // ok - } - } - } }
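
For reference, a minimal client-side sketch of the stop-then-delete flow this change enforces: stop the job, wait for its indexer to leave the non-stopped states, then issue the delete. It assumes the 6.x /_xpack/rollup/job endpoints and the low-level REST client already used in the test changes above; the helper name, the 10-second timeout, and the crude string check on the job listing are illustrative only, mirroring the cleanup logic added to ESRestTestCase.

import java.io.IOException;
import java.util.Arrays;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class RollupJobCleanupSketch {

    // Stops a rollup job, waits for it to report a stopped indexer, then deletes it.
    // With this change, a delete issued while the job is still started fails with an
    // illegal_state_exception instead of cancelling the running task.
    static void stopThenDelete(RestClient client, String jobId) throws IOException, InterruptedException {
        Request stop = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
        stop.addParameter("ignore", "404");           // tolerate a job that is already gone
        client.performRequest(stop);

        // The Stop API does not yet block until the job is fully stopped, so poll the
        // job listing and look for any of the non-stopped states (same check as the
        // test cleanup above). The 10-second deadline is an arbitrary illustrative value.
        long deadline = System.currentTimeMillis() + 10_000;
        while (System.currentTimeMillis() < deadline) {
            Response jobs = client.performRequest(new Request("GET", "/_xpack/rollup/job/" + jobId));
            String body = EntityUtils.toString(jobs.getEntity());
            boolean stillRunning = Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"})
                .anyMatch(body::contains);
            if (stillRunning == false) {
                break;
            }
            Thread.sleep(500);
        }

        // The indexer should now be STOPPED, so the delete is expected to be acknowledged.
        client.performRequest(new Request("DELETE", "/_xpack/rollup/job/" + jobId));
    }
}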