diff --git a/docs/changelog/100340.yaml b/docs/changelog/100340.yaml new file mode 100644 index 0000000000000..935cc15adb139 --- /dev/null +++ b/docs/changelog/100340.yaml @@ -0,0 +1,5 @@ +pr: 100340 +summary: Shutdown the task immediately when force == true +area: Transform +type: bug +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index e811c38740618..b2bef94ad14e1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -529,6 +529,8 @@ private void onSearchResponse(SearchResponse searchResponse) { if (bulkRequest.numberOfActions() > 0) { stats.markStartIndexing(); doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> { + Thread.sleep(360_000); + // TODO we should check items in the response and move after accordingly to // resume the failing buckets ? if (bulkResponse.hasFailures()) { diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 93615efdfba2b..510f44bfc0514 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.integration; +import org.apache.http.client.methods.HttpGet; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; @@ -29,20 +30,26 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; +import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; import java.io.IOException; import java.time.Instant; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.oneOf; @@ -332,7 +339,7 @@ public void testStopWaitForCheckpoint() throws Exception { }); // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop - stopTransform(transformId, false, null, true); + stopTransform(transformId, false, null, true, false); // Wait until the first checkpoint waitUntilCheckpoint(config.getId(), 1L); @@ -354,7 +361,7 @@ public void testStopWaitForCheckpoint() throws Exception { startTransformWithRetryOnConflict(config.getId(), RequestOptions.DEFAULT); boolean waitForCompletion = randomBoolean(); - stopTransform(transformId, waitForCompletion, null, true); + stopTransform(transformId, waitForCompletion, null, true, false); assertBusy(() -> { var stateAndStats = getTransformStats(config.getId()); @@ -494,6 +501,84 @@ public void testStartTransform_GivenTimeout_Returns408() throws Exception { } } + public void testForceStop() throws Exception { + String sourceIndex = "continuous-crud-reviews"; + createReviewsIndex(sourceIndex, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + String transformId = "transform-continuous-crud"; + String destIndex = transformId + "-dest"; + TransformConfig config; + { + Map groups = Map.of( + "by-day", + createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null), + "by-user", + new TermsGroupSource("user_id", null, false), + "by-business", + new TermsGroupSource("business_id", null, false) + ); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + config = createTransformConfigBuilder(transformId, destIndex, QueryConfig.matchAll(), sourceIndex).setPivotConfig( + createPivotConfig(groups, aggs) + ).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build(); + } + + for (int i = 0; i < 10; ++i) { + try { + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + assertThatListOfTransformPersistentTasks(is(empty())); + + startTransform(config.getId(), RequestOptions.DEFAULT); + assertThatListOfTransformPersistentTasks(hasSize(1)); + + Exception e = expectThrows(Exception.class, () -> deleteTransform(transformId)); + assertThat( + e.getMessage(), + containsString("Cannot delete transform [" + transformId + "] as the task is running. Stop the task first") + ); + assertThatListOfTransformPersistentTasks(hasSize(1)); + + stopTransform(transformId); + + if (randomBoolean()) { // Either force-stop and then delete OR just force-delete + stopTransform(transformId, true, null, false, true); + deleteTransform(transformId); + } else { + deleteTransform(transformId, true); + } + assertThatListOfTransformPersistentTasks(is(empty())); + } catch (AssertionError | Exception e) { + fail("Failure at iteration " + i + ": " + e.getMessage()); + } + } + } + + @SuppressWarnings("unchecked") + private void assertThatListOfTransformPersistentTasks(Matcher> matcher) { + Request request = new Request(HttpGet.METHOD_NAME, "/_tasks"); + Map parameters = Map.of( + "actions", + "data_frame/transforms[c]", + "group_by", + "none", + "timeout", + TimeValue.timeValueSeconds(10).getStringRep() + ); + request.addParameters(parameters); + try { + Response response = adminClient().performRequest(request); + Map responseAsMap = entityAsMap(response); + List tasksAsList = (List) responseAsMap.get("tasks"); + assertThat("Tasks were: " + tasksAsList, tasksAsList, matcher); + } catch (Exception e) { + throw new AssertionError("Failed to get pending tasks", e); + } + } + private void indexMoreDocs(long timestamp, long userId, String index) throws Exception { StringBuilder bulkBuilder = new StringBuilder(); for (int i = 0; i < 25; i++) { diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index f154b13b32add..f9f8789da01be 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -152,11 +152,16 @@ protected Map getIndexMapping(String index, RequestOptions optio } protected void stopTransform(String id) throws IOException { - stopTransform(id, true, null, false); + stopTransform(id, true, null, false, false); } - protected void stopTransform(String id, boolean waitForCompletion, @Nullable TimeValue timeout, boolean waitForCheckpoint) - throws IOException { + protected void stopTransform( + String id, + boolean waitForCompletion, + @Nullable TimeValue timeout, + boolean waitForCheckpoint, + boolean force + ) throws IOException { final Request stopTransformRequest = new Request("POST", TRANSFORM_ENDPOINT + id + "/_stop"); stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(waitForCompletion)); @@ -164,6 +169,9 @@ protected void stopTransform(String id, boolean waitForCompletion, @Nullable Tim if (timeout != null) { stopTransformRequest.addParameter(TransformField.TIMEOUT.getPreferredName(), timeout.getStringRep()); } + if (force) { + stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force)); + } Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } @@ -219,6 +227,7 @@ protected void deleteTransform(String id, boolean force) throws IOException { request.addParameter("force", "true"); } assertOK(adminClient().performRequest(request)); + createdTransformIds.remove(id); } protected void putTransform(String id, String config, RequestOptions options) throws IOException { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 00fa7f200a3c3..c9ded338cbf18 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -167,7 +167,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next ); } - protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener nextPhase) { + protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener nextPhase) throws InterruptedException { if (bulkResponse.hasFailures() == false) { // We don't know the of failures that have occurred (searching, processing, indexing, etc.), // but if we search, process and bulk index then we have @@ -527,6 +527,8 @@ void doSearch(Tuple namedSearchRequest, ActionListener { // check if the error has been caused by a missing search context, which could be a timed out pit