diff --git a/docs/changelog/106989.yaml b/docs/changelog/106989.yaml new file mode 100644 index 0000000000000..47df5fe5b47d7 --- /dev/null +++ b/docs/changelog/106989.yaml @@ -0,0 +1,7 @@ +pr: 106989 +summary: Make force-stopping the transform always remove persistent task from cluster + state +area: Transform +type: bug +issues: + - 106811 diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index e7d54028caa20..4db0d0d8baaf1 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -241,38 +241,39 @@ public void testTransformLifecycleInALoop() throws Exception { long sleepAfterStartMillis = randomLongBetween(0, 5_000); boolean force = randomBoolean(); try { - // Create the continuous transform + // Create the continuous transform. putTransform(transformId, config, RequestOptions.DEFAULT); assertThat(getTransformTasks(), is(empty())); assertThat(getTransformTasksFromClusterState(transformId), is(empty())); startTransform(transformId, RequestOptions.DEFAULT); - // There is 1 transform task after start + // There is 1 transform task after start. assertThat(getTransformTasks(), hasSize(1)); assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); Thread.sleep(sleepAfterStartMillis); - // There should still be 1 transform task as the transform is continuous + // There should still be 1 transform task as the transform is continuous. 
assertThat(getTransformTasks(), hasSize(1)); assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); - // Stop the transform with force set randomly + // Stop the transform with force set randomly. stopTransform(transformId, true, null, false, force); - // After the transform is stopped, there should be no transform task left - assertThat(getTransformTasks(), is(empty())); + if (force) { + // If the "force" has been used, then the persistent task is removed from the cluster state but the local task can still + // be seen by the PersistentTasksNodeService. We need to wait until PersistentTasksNodeService reconciles the state. + assertBusy(() -> assertThat(getTransformTasks(), is(empty()))); + } else { + // If the "force" hasn't been used then we can expect the local task to be already gone. + assertThat(getTransformTasks(), is(empty())); + } + // After the transform is stopped, there should be no transform task left in the cluster state. assertThat(getTransformTasksFromClusterState(transformId), is(empty())); // Delete the transform deleteTransform(transformId); } catch (AssertionError | Exception e) { throw new AssertionError( - format( - "Failure at iteration %d (sleepAfterStartMillis=%s,force=%s): %s", - i, - sleepAfterStartMillis, - force, - e.getMessage() - ), + format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()), e ); } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index eb1a1258d5a96..4cc9a31c8eff5 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ 
b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -65,9 +65,6 @@ public abstract class TransformRestTestCase extends TransformCommonRestTestCase { - protected static final String AUTH_KEY = "Authorization"; - protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization"; - private final Set createdTransformIds = new HashSet<>(); protected void cleanUp() throws Exception { diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java index 0f807fbae45d1..4b7c42968f557 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java @@ -10,7 +10,6 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.core.Strings; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; @@ -19,6 +18,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.core.Strings.format; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -86,10 +86,10 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception { deleteTransform(transformId); } - public void testCreateAndDeleteTransformInALoop() throws IOException { + public void 
testBatchTransformLifecycleInALoop() throws IOException { createReviewsIndex(); - String transformId = "test_create_and_delete_in_a_loop"; + String transformId = "test_batch_lifecycle_in_a_loop"; String destIndex = transformId + "-dest"; for (int i = 0; i < 100; ++i) { try { @@ -108,7 +108,48 @@ public void testCreateAndDeleteTransformInALoop() throws IOException { // Delete the transform deleteTransform(transformId); } catch (AssertionError | Exception e) { - fail("Failure at iteration " + i + ": " + e.getMessage()); + throw new AssertionError(format("Failure at iteration %d: %s", i, e.getMessage()), e); + } + } + } + + public void testContinuousTransformLifecycleInALoop() throws Exception { + createReviewsIndex(); + + String transformId = "test_cont_lifecycle_in_a_loop"; + String destIndex = transformId + "-dest"; + for (int i = 0; i < 100; ++i) { + long sleepAfterStartMillis = randomLongBetween(0, 5_000); + boolean force = randomBoolean(); + try { + // Create the continuous transform. + createContinuousPivotReviewsTransform(transformId, destIndex, null); + assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); + + startTransform(transformId); + // There is 1 transform task after start. + assertThat(getTransformTasks(), hasSize(1)); + assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); + + Thread.sleep(sleepAfterStartMillis); + // There should still be 1 transform task as the transform is continuous. + assertThat(getTransformTasks(), hasSize(1)); + assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); + + // Stop the transform with force set randomly. + stopTransform(transformId, force); + // After the transform is stopped, there should be no transform task left. + assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); + + // Delete the transform. 
+ deleteTransform(transformId); + } catch (AssertionError | Exception e) { + throw new AssertionError( + format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()), + e + ); } } } @@ -168,7 +209,7 @@ private void beEvilAndDeleteTheTransformIndex() throws IOException { } private static String createConfig(String sourceIndex, String destIndex) { - return Strings.format(""" + return format(""" { "source": { "index": "%s" diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index bccd97f22b4a1..5ab65ca023506 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -65,6 +65,7 @@ public void testForceStopFailedTransform() throws Exception { createContinuousPivotReviewsTransform(transformId, transformIndex, null); assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); @@ -78,6 +79,7 @@ public void testForceStopFailedTransform() throws Exception { assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason)); assertThat(getTransformTasks(), hasSize(1)); + assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); // verify that we cannot stop a failed transform ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false)); @@ -99,6 +101,7 @@ public void 
testForceStopFailedTransform() throws Exception { assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); } public void testForceResetFailedTransform() throws Exception { @@ -109,6 +112,7 @@ public void testForceResetFailedTransform() throws Exception { createContinuousPivotReviewsTransform(transformId, transformIndex, null); assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); @@ -122,6 +126,7 @@ public void testForceResetFailedTransform() throws Exception { assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason)); assertThat(getTransformTasks(), hasSize(1)); + assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); // verify that we cannot reset a failed transform ResponseException ex = expectThrows(ResponseException.class, () -> resetTransform(transformId, false)); @@ -135,6 +140,7 @@ public void testForceResetFailedTransform() throws Exception { resetTransform(transformId, true); assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); } public void testStartFailedTransform() throws Exception { @@ -145,6 +151,7 @@ public void testStartFailedTransform() throws Exception { createContinuousPivotReviewsTransform(transformId, transformIndex, null); assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); @@ -158,6 +165,7 @@ public void testStartFailedTransform() throws Exception { assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason)); 
assertThat(getTransformTasks(), hasSize(1)); + assertThat(getTransformTasksFromClusterState(transformId), hasSize(1)); var expectedFailure = "Unable to start transform [test-force-start-failed-transform] " + "as it is in a failed state. Use force stop and then restart the transform once error is resolved. More details: [" @@ -172,6 +180,7 @@ public void testStartFailedTransform() throws Exception { stopTransform(transformId, true); assertThat(getTransformTasks(), is(empty())); + assertThat(getTransformTasksFromClusterState(transformId), is(empty())); } private void awaitState(String transformId, TransformStats.State state) throws Exception { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index b8ea1fee6e886..1996012ccdf58 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -164,18 +164,23 @@ protected void doExecute(Task task, Request request, ActionListener li state ); - final ActionListener doExecuteListener; - if (transformNodeAssignments.getWaitingForAssignment().size() > 0) { - doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments); - } else { - doExecuteListener = finalListener; - } + final ActionListener doExecuteListener = cancelTransformTasksListener( + transformNodeAssignments.getWaitingForAssignment(), + finalListener + ); - if (transformNodeAssignments.getExecutorNodes().size() > 0) { + if (request.isForce()) { + // When force==true, we **do not** fan out to individual tasks (i.e. 
taskOperation method will not be called) as we + // want to make sure that the persistent tasks will be removed from cluster state even if these tasks are no longer + // visible to the PersistentTasksService. + cancelTransformTasksListener(transformNodeAssignments.getAssigned(), doExecuteListener).onResponse( + new Response(true) + ); + } else if (transformNodeAssignments.getExecutorNodes().isEmpty()) { + doExecuteListener.onResponse(new Response(true)); + } else { request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); super.doExecute(task, request, doExecuteListener); - } else { - doExecuteListener.onResponse(new Response(true)); } }, e -> { if (e instanceof ResourceNotFoundException) { @@ -189,13 +194,10 @@ protected void doExecute(Task task, Request request, ActionListener li listener.onFailure(e); // found transforms without a config } else if (request.isForce()) { - final ActionListener doExecuteListener; - - if (transformNodeAssignments.getWaitingForAssignment().size() > 0) { - doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments); - } else { - doExecuteListener = finalListener; - } + final ActionListener doExecuteListener = cancelTransformTasksListener( + transformNodeAssignments.getWaitingForAssignment(), + finalListener + ); if (transformNodeAssignments.getExecutorNodes().size() > 0) { request.setExpandedIds(transformNodeAssignments.getAssigned()); @@ -235,7 +237,6 @@ protected void taskOperation( TransformTask transformTask, ActionListener listener ) { - Set ids = request.getExpandedIds(); if (ids == null) { listener.onFailure(new IllegalStateException("Request does not have expandedIds set")); @@ -243,20 +244,6 @@ protected void taskOperation( } if (ids.contains(transformTask.getTransformId())) { - if (request.isForce()) { - // If force==true, we skip the additional step (setShouldStopAtCheckpoint) and move directly to shutting down the task. 
- // This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners). - try { - // Here the task is deregistered in scheduler and marked as completed in persistent task service. - transformTask.shutdown(); - // Here the indexer is aborted so that its thread finishes work ASAP. - transformTask.onCancelled(); - listener.onResponse(new Response(true)); - } catch (ElasticsearchException ex) { - listener.onFailure(ex); - } - return; - } // move the call to the generic thread pool, so we do not block the network thread threadPool.generic().execute(() -> { transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> { @@ -306,7 +293,6 @@ protected StopTransformAction.Response newResponse( } private ActionListener waitForStopListener(Request request, ActionListener listener) { - ActionListener onStopListener = ActionListener.wrap( waitResponse -> transformConfigManager.refresh(ActionListener.wrap(r -> listener.onResponse(waitResponse), e -> { if ((ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) { @@ -393,6 +379,7 @@ private void waitForTransformStopped( ) { // This map is accessed in the predicate and the listener callbacks final Map exceptions = new ConcurrentHashMap<>(); + persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> { if (persistentTasksCustomMetadata == null) { return true; @@ -501,34 +488,38 @@ private void waitForTransformStopped( })); } - private ActionListener cancelTransformTasksWithNoAssignment( - final ActionListener finalListener, - final TransformNodeAssignments transformNodeAssignments + /** + * Creates and returns the listener that sends a remove request for every task in the given set. 
+ * + * @param transformTasks set of transform tasks that should be removed + * @param finalListener listener that should be called once all the given tasks are removed + * @return listener that removes given tasks in parallel + */ + private ActionListener cancelTransformTasksListener( + final Set transformTasks, + final ActionListener finalListener ) { - final ActionListener doExecuteListener = ActionListener.wrap(response -> { + if (transformTasks.isEmpty()) { + return finalListener; + } + return ActionListener.wrap(response -> { GroupedActionListener> groupedListener = new GroupedActionListener<>( - transformNodeAssignments.getWaitingForAssignment().size(), - ActionListener.wrap(r -> { - finalListener.onResponse(response); - }, finalListener::onFailure) + transformTasks.size(), + ActionListener.wrap(r -> finalListener.onResponse(response), finalListener::onFailure) ); - for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) { - persistentTasksService.sendRemoveRequest(unassignedTaskId, null, groupedListener); + for (String taskId : transformTasks) { + persistentTasksService.sendRemoveRequest(taskId, null, groupedListener); } - }, e -> { GroupedActionListener> groupedListener = new GroupedActionListener<>( - transformNodeAssignments.getWaitingForAssignment().size(), - ActionListener.wrap(r -> { - finalListener.onFailure(e); - }, finalListener::onFailure) + transformTasks.size(), + ActionListener.wrap(r -> finalListener.onFailure(e), finalListener::onFailure) ); - for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) { - persistentTasksService.sendRemoveRequest(unassignedTaskId, null, groupedListener); + for (String taskId : transformTasks) { + persistentTasksService.sendRemoveRequest(taskId, null, groupedListener); } }); - return doExecuteListener; } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java 
b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java index 7b61f0c9e8335..46f893a90aba1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java @@ -65,4 +65,18 @@ public Set getWaitingForAssignment() { public Set getStopped() { return stopped; } + + @Override + public String toString() { + return new StringBuilder("TransformNodeAssignments[").append("executorNodes=") + .append(executorNodes) + .append(",assigned=") + .append(assigned) + .append(",waitingForAssignment=") + .append(waitingForAssignment) + .append(",stopped=") + .append(stopped) + .append("]") + .toString(); + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java index f5c0b6046fbfe..2643d1bba652d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java @@ -9,8 +9,6 @@ import org.elasticsearch.test.ESTestCase; -import java.util.Arrays; -import java.util.HashSet; import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -19,10 +17,11 @@ public class TransformNodeAssignmentsTests extends ESTestCase { public void testConstructorAndGetters() { - Set executorNodes = new HashSet<>(Arrays.asList("executor-1", "executor-2")); - Set assigned = new HashSet<>(Arrays.asList("assigned-1", "assigned-2")); - Set waitingForAssignment = new HashSet<>(Arrays.asList("waiting-1", "waitingv-2")); - Set stopped = new HashSet<>(Arrays.asList("stopped-1", "stopped-2")); + Set 
executorNodes = Set.of("executor-1", "executor-2"); + Set assigned = Set.of("assigned-1", "assigned-2"); + Set waitingForAssignment = Set.of("waiting-1", "waiting-2"); + Set stopped = Set.of("stopped-1", "stopped-2"); + TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped); assertThat(assignments.getExecutorNodes(), is(equalTo(executorNodes))); @@ -30,4 +29,45 @@ public void testConstructorAndGetters() { assertThat(assignments.getWaitingForAssignment(), is(equalTo(waitingForAssignment))); assertThat(assignments.getStopped(), is(equalTo(stopped))); } + + public void testToString() { + Set executorNodes = Set.of("executor-1"); + Set assigned = Set.of("assigned-1"); + Set waitingForAssignment = Set.of("waiting-1"); + Set stopped = Set.of("stopped-1"); + + TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped); + + assertThat( + assignments.toString(), + is( + equalTo( + "TransformNodeAssignments[" + + "executorNodes=[executor-1]," + + "assigned=[assigned-1]," + + "waitingForAssignment=[waiting-1]," + + "stopped=[stopped-1]" + + "]" + ) + ) + ); + } + + public void testToString_EmptyCollections() { + Set executorNodes = Set.of(); + Set assigned = Set.of(); + Set waitingForAssignment = Set.of(); + Set stopped = Set.of(); + + TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped); + + assertThat( + assignments.toString(), + is( + equalTo( + "TransformNodeAssignments[" + "executorNodes=[]," + "assigned=[]," + "waitingForAssignment=[]," + "stopped=[]" + "]" + ) + ) + ); + } }