Skip to content

Commit

Permalink
[Transform] Fix _reset API when called with force=true on a faile…
Browse files Browse the repository at this point in the history
…d transform (#106574) (#106589)
  • Loading branch information
przemekwitek authored Mar 21, 2024
1 parent d1c98f3 commit c212abd
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 28 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/106574.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 106574
summary: Fix `_reset` API when called with `force=true` on a failed transform
area: Transform
type: bug
issues:
- 106573
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public class TransformResetIT extends TransformRestTestCase {
TEST_PASSWORD_SECURE_STRING
);
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String SYNC_CONFIG = """
"sync": {
"time": {
"field": "timestamp"
}
},
""";

private static boolean indicesCreated = false;

Expand Down Expand Up @@ -132,6 +139,7 @@ public void testResetDeletesDestinationIndex() throws Exception {
}

private static String createConfig(String transformDestIndex) {
boolean isContinuous = randomBoolean();
return Strings.format("""
{
"dest": {
Expand All @@ -140,6 +148,7 @@ private static String createConfig(String transformDestIndex) {
"source": {
"index": "%s"
},
%s
"pivot": {
"group_by": {
"reviewer": {
Expand All @@ -156,6 +165,6 @@ private static String createConfig(String transformDestIndex) {
}
}
}
}""", transformDestIndex, REVIEWS_INDEX_NAME);
}""", transformDestIndex, REVIEWS_INDEX_NAME, isContinuous ? SYNC_CONFIG : "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -734,4 +735,26 @@ private void logAudits() throws Exception {
}
}, 5, TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")
protected List<String> getTransformTasks() throws IOException {
final Request tasksRequest = new Request("GET", "/_tasks");
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));

Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
if (nodes == null) {
return List.of();
}

List<String> foundTasks = new ArrayList<>();
for (Map.Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
if (tasks != null) {
foundTasks.addAll(tasks.keySet());
}
}
return foundTasks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -150,28 +148,6 @@ public void testCancellingTransformTask() throws Exception {
assertThat(getTransformTasks(), is(empty()));
}

@SuppressWarnings("unchecked")
private List<String> getTransformTasks() throws IOException {
final Request tasksRequest = new Request("GET", "/_tasks");
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));

Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
if (nodes == null) {
return List.of();
}

List<String> foundTasks = new ArrayList<>();
for (Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
if (tasks != null) {
foundTasks.addAll(tasks.keySet());
}
}
return foundTasks;
}

private void beEvilAndDeleteTheTransformIndex() throws IOException {
final Request deleteRequest = new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME);
deleteRequest.setOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesRegex;
import static org.hamcrest.Matchers.nullValue;

public class TransformTaskFailedStateIT extends TransformRestTestCase {

Expand Down Expand Up @@ -61,6 +63,9 @@ public void testForceStopFailedTransform() throws Exception {
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

assertThat(getTransformTasks(), is(empty()));

startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
Expand All @@ -72,6 +77,8 @@ public void testForceStopFailedTransform() throws Exception {
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

assertThat(getTransformTasks(), hasSize(1));

// verify that we cannot stop a failed transform
ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
Expand All @@ -90,6 +97,44 @@ public void testForceStopFailedTransform() throws Exception {
awaitState(transformId, TransformStats.State.STOPPED);
fullState = getTransformStateAndStats(transformId);
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));

assertThat(getTransformTasks(), is(empty()));
}

public void testForceResetFailedTransform() throws Exception {
String transformId = "test-force-reset-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, 27, "date", false, -1, null);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

assertThat(getTransformTasks(), is(empty()));

startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "Failed to index documents into destination index due to permanent error: "
+ "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] "
+ "failures and at least 1 irrecoverable "
+ "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are "
+ "incompatible with the transform configuration.;.*";
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

assertThat(getTransformTasks(), hasSize(1));

// verify that we cannot reset a failed transform
ResponseException ex = expectThrows(ResponseException.class, () -> resetTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
is(equalTo("Cannot reset transform [test-force-reset-failed-transform] as the task is running. Stop the task first"))
);

// Verify that we can force reset a failed transform
resetTransform(transformId, true);

assertThat(getTransformTasks(), is(empty()));
}

public void testStartFailedTransform() throws Exception {
Expand All @@ -98,6 +143,9 @@ public void testStartFailedTransform() throws Exception {
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

assertThat(getTransformTasks(), is(empty()));

startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
Expand All @@ -109,6 +157,8 @@ public void testStartFailedTransform() throws Exception {
// Verify we have failed for the expected reason
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

assertThat(getTransformTasks(), hasSize(1));

final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] "
+ "as it is in a failed state with failure: \\["
+ failureReason
Expand All @@ -124,6 +174,8 @@ public void testStartFailedTransform() throws Exception {
}, 60, TimeUnit.SECONDS);

stopTransform(transformId, true);

assertThat(getTransformTasks(), is(empty()));
}

private void awaitState(String transformId, TransformStats.State state) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,14 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
stopTransformActionListener.onResponse(null);
return;
}
StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(request.getId(), true, false, null, true, false);
StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(
request.getId(),
true,
request.isForce(),
null,
true,
false
);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, stopTransformActionListener);
}

Expand Down

0 comments on commit c212abd

Please sign in to comment.