From 9b095eb765fb08e192bc4960281f1f55d4ed826c Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 12 Dec 2024 14:39:25 -0600 Subject: [PATCH] Adding a migration reindex cancel API (#118291) This introduces the migration reindex cancel API, which cancels a migration reindex task for a given data stream name that was started with #118109. For example: ``` POST localhost:9200/_migration/reindex/my-data-stream/_cancel?pretty ``` returns ``` { "acknowledged" : true } ``` This cancels the task, and cancels any ongoing reindexing of backing indices, but does not do any cleanup. --- docs/changelog/118291.yaml | 5 ++ .../api/migrate.cancel_reindex.json | 30 +++++++ .../ReindexDataStreamTransportActionIT.java | 17 ++++ .../xpack/migrate/MigratePlugin.java | 5 ++ .../action/CancelReindexDataStreamAction.java | 90 +++++++++++++++++++ ...ancelReindexDataStreamTransportAction.java | 57 ++++++++++++ .../RestCancelReindexDataStreamAction.java | 39 ++++++++ ...indexDataStreamPersistentTaskExecutor.java | 6 +- .../migrate/task/ReindexDataStreamTask.java | 45 ++++++++-- .../CancelReindexDataStreamRequestTests.java | 32 +++++++ .../xpack/security/operator/Constants.java | 1 + .../rest-api-spec/test/migrate/10_reindex.yml | 31 ++++--- .../test/migrate/20_reindex_status.yml | 58 +++++++----- 13 files changed, 371 insertions(+), 45 deletions(-) create mode 100644 docs/changelog/118291.yaml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/migrate.cancel_reindex.json create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java create mode 100644 x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamRequestTests.java diff --git a/docs/changelog/118291.yaml b/docs/changelog/118291.yaml new file mode 100644 index 0000000000000..8001b3972e876 --- /dev/null +++ b/docs/changelog/118291.yaml @@ -0,0 +1,5 @@ +pr: 118291 +summary: Adding a migration reindex cancel API +area: Data streams +type: enhancement +issues: [] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/migrate.cancel_reindex.json b/rest-api-spec/src/main/resources/rest-api-spec/api/migrate.cancel_reindex.json new file mode 100644 index 0000000000000..a034f204edbfb --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/migrate.cancel_reindex.json @@ -0,0 +1,30 @@ +{ + "migrate.cancel_reindex":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html", + "description":"This API returns the status of a migration reindex attempt for a data stream or index" + }, + "stability":"experimental", + "visibility":"private", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_migration/reindex/{index}/_cancel", + "methods":[ + "POST" + ], + "parts":{ + "index":{ + "type":"string", + "description":"The index or data stream name" + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java index 7f2243ed76849..6e24e644cb2af 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java @@ -117,6 +117,23 @@ public void testAlreadyUpToDateDataStream() throws Exception { assertThat(status.totalIndices(), equalTo(backingIndexCount)); assertThat(status.totalIndicesToBeUpgraded(), equalTo(0)); }); + AcknowledgedResponse cancelResponse = client().execute( + CancelReindexDataStreamAction.INSTANCE, + new CancelReindexDataStreamAction.Request(dataStreamName) + ).actionGet(); + assertNotNull(cancelResponse); + assertThrows( + ResourceNotFoundException.class, + () -> client().execute(CancelReindexDataStreamAction.INSTANCE, new CancelReindexDataStreamAction.Request(dataStreamName)) + .actionGet() + ); + assertThrows( + ResourceNotFoundException.class, + () -> client().execute( + new ActionType(GetMigrationReindexStatusAction.NAME), + new GetMigrationReindexStatusAction.Request(dataStreamName) + ).actionGet() + ); } private int createDataStream(String dataStreamName) { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index 1af66a2c61d56..26f8e57102a4d 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -32,10 +32,13 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction; +import org.elasticsearch.xpack.migrate.rest.RestCancelReindexDataStreamAction; import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction; import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor; @@ -69,6 +72,7 @@ public List getRestHandlers( if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) { handlers.add(new RestMigrationReindexAction()); handlers.add(new RestGetMigrationReindexStatusAction()); + handlers.add(new RestCancelReindexDataStreamAction()); } return handlers; } @@ -79,6 +83,7 @@ public List getRestHandlers( if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) { actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class)); + actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class)); } return actions; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java new file mode 100644 index 0000000000000..635d8b8f30978 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamAction.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class CancelReindexDataStreamAction extends ActionType { + + public static final CancelReindexDataStreamAction INSTANCE = new CancelReindexDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/reindex_cancel"; + + public CancelReindexDataStreamAction() { + super(NAME); + } + + public static class Request extends ActionRequest implements IndicesRequest { + private final String index; + + public Request(String index) { + super(); + this.index = index; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.index = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean getShouldStoreResult() { + return true; + } + + public String getIndex() { + return index; + } + + @Override + public int hashCode() { + return Objects.hashCode(index); + } + + @Override + public boolean equals(Object other) { + return other instanceof Request && index.equals(((Request) other).index); + } + + public Request nodeRequest(String thisNodeId, long thisTaskId) { + Request copy = new Request(index); + copy.setParentTask(thisNodeId, thisTaskId); + return copy; + } + + @Override + public String[] indices() { + return new String[] { index }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java new file mode 100644 index 0000000000000..00a846bf7eb9a --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction.Request; + +public class CancelReindexDataStreamTransportAction extends HandledTransportAction { + private final PersistentTasksService persistentTasksService; + + @Inject + public CancelReindexDataStreamTransportAction( + TransportService transportService, + ActionFilters actionFilters, + PersistentTasksService persistentTasksService + ) { + super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.persistentTasksService = persistentTasksService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + String index = request.getIndex(); + String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index; + /* + * This removes the persistent task from the cluster state and results in the running task being cancelled (but not removed from + * the task manager). The running task is removed from the task manager in ReindexDataStreamTask::onCancelled, which is called as + * as result of this. + */ + persistentTasksService.sendRemoveRequest(persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() { + @Override + public void onResponse(PersistentTasksCustomMetadata.PersistentTask persistentTask) { + listener.onResponse(AcknowledgedResponse.TRUE); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java new file mode 100644 index 0000000000000..0bd68e8b2df73 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestCancelReindexDataStreamAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestCancelReindexDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "cancel_reindex_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_migration/reindex/{index}/_cancel")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String index = request.param("index"); + CancelReindexDataStreamAction.Request cancelTaskRequest = new CancelReindexDataStreamAction.Request(index); + return channel -> client.execute(CancelReindexDataStreamAction.INSTANCE, cancelTaskRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 7ec5014b9edff..176220a1ccae8 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -91,13 +91,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask } private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { - persistentTask.allReindexesCompleted(); - threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); + persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask)); } private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { - persistentTask.taskFailed(e); - threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); + persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e); } private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java index 72ddb87e9dea5..844f24f45ab77 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java @@ -7,9 +7,12 @@ package org.elasticsearch.xpack.migrate.task; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; @@ -21,12 +24,14 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask { private final long persistentTaskStartTime; private final int totalIndices; private final int totalIndicesToBeUpgraded; - private boolean complete = false; - private Exception exception; - private AtomicInteger inProgress = new AtomicInteger(0); - private AtomicInteger pending = new AtomicInteger(); - private List> errors = new ArrayList<>(); + private volatile boolean complete = false; + private volatile Exception exception; + private final AtomicInteger inProgress = new AtomicInteger(0); + private final AtomicInteger pending = new AtomicInteger(); + private final List> errors = new ArrayList<>(); + private final RunOnce completeTask; + @SuppressWarnings("this-escape") public ReindexDataStreamTask( long persistentTaskStartTime, int totalIndices, @@ -42,6 +47,13 @@ public ReindexDataStreamTask( this.persistentTaskStartTime = persistentTaskStartTime; this.totalIndices = totalIndices; this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded; + this.completeTask = new RunOnce(() -> { + if (exception == null) { + markAsCompleted(); + } else { + markAsFailed(exception); + } + }); } @Override @@ -58,13 +70,18 @@ public ReindexDataStreamStatus getStatus() { ); } - public void allReindexesCompleted() { + public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) { this.complete = true; + if (isCancelled()) { + completeTask.run(); + } else { + threadPool.schedule(completeTask, timeToLive, threadPool.generic()); + } } - public void taskFailed(Exception e) { - this.complete = true; + public void taskFailed(ThreadPool threadPool, TimeValue timeToLive, Exception e) { this.exception = e; + allReindexesCompleted(threadPool, timeToLive); } public void reindexSucceeded() { @@ -84,4 +101,16 @@ public void incrementInProgressIndicesCount() { public void setPendingIndicesCount(int size) { pending.set(size); } + + @Override + public void onCancelled() { + /* + * If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed + * immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of + * allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing. + */ + if (complete) { + completeTask.run(); + } + } } diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamRequestTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamRequestTests.java new file mode 100644 index 0000000000000..187561dae19b0 --- /dev/null +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamRequestTests.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction.Request; + +import java.io.IOException; + +public class CancelReindexDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(30)); + } + + @Override + protected Request mutateInstance(Request instance) throws IOException { + return new Request(instance.getIndex() + randomAlphaOfLength(5)); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index db87fdbcb8f1f..b139d1526ec20 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -639,6 +639,7 @@ public class Constants { "internal:index/metadata/migration_version/update", new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/migration/reindex_status" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex" : null, + new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex_cancel" : null, "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml index f50a7a65f53d3..9fb33b43f042f 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml @@ -104,14 +104,23 @@ setup: name: my-data-stream - is_true: acknowledged -# Uncomment once the cancel API is in place -# - do: -# migrate.reindex: -# body: | -# { -# "mode": "upgrade", -# "source": { -# "index": "my-data-stream" -# } -# } -# - match: { task: "reindex-data-stream-my-data-stream" } + - do: + migrate.reindex: + body: | + { + "mode": "upgrade", + "source": { + "index": "my-data-stream" + } + } + - match: { acknowledged: true } + + - do: + migrate.cancel_reindex: + index: "my-data-stream" + - match: { acknowledged: true } + + - do: + catch: /resource_not_found_exception/ + migrate.cancel_reindex: + index: "my-data-stream" diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml index ae343a0b4db95..c94ce8dd211ae 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml @@ -46,25 +46,39 @@ setup: name: my-data-stream - is_true: acknowledged -# Uncomment once the cancel API is in place -# - do: -# migrate.reindex: -# body: | -# { -# "mode": "upgrade", -# "source": { -# "index": "my-data-stream" -# } -# } -# - match: { acknowledged: true } -# -# - do: -# migrate.get_reindex_status: -# index: "my-data-stream" -# - match: { complete: true } -# - match: { total_indices: 1 } -# - match: { total_indices_requiring_upgrade: 0 } -# - match: { successes: 0 } -# - match: { in_progress: 0 } -# - match: { pending: 0 } -# - match: { errors: [] } + - do: + migrate.reindex: + body: | + { + "mode": "upgrade", + "source": { + "index": "my-data-stream" + } + } + - match: { acknowledged: true } + + - do: + migrate.get_reindex_status: + index: "my-data-stream" + - match: { complete: true } + - match: { total_indices: 1 } + - match: { total_indices_requiring_upgrade: 0 } + - match: { successes: 0 } + - match: { in_progress: 0 } + - match: { pending: 0 } + - match: { errors: [] } + + - do: + migrate.cancel_reindex: + index: "my-data-stream" + - match: { acknowledged: true } + + - do: + catch: /resource_not_found_exception/ + migrate.cancel_reindex: + index: "my-data-stream" + + - do: + catch: /resource_not_found_exception/ + migrate.get_reindex_status: + index: "my-data-stream"