From f989beb7399c92675d3cf3a17d801c75ffde8b30 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 22 Aug 2019 14:44:34 +0200 Subject: [PATCH] [ML-DataFrame] version data frame transform internal index (#45375) Adds index versioning for the internal data frame transform index. Allows for new indices to be created and referenced, `GET` requests now query over the index pattern and takes the latest doc (based on INDEX name). --- .../AbstractTransportGetResourcesAction.java | 15 +- .../xpack/core/dataframe/DataFrameField.java | 3 +- .../transforms/DataFrameTransform.java | 5 +- .../transforms/DataFrameTransformConfig.java | 18 +- .../DataFrameConfigurationIndexIT.java | 4 +- .../integration/DataFrameRestTestCase.java | 2 +- .../DataFrameTransformInternalIndexIT.java | 132 ++++++ .../integration/DataFrameUsageIT.java | 4 +- .../xpack/dataframe/DataFrame.java | 2 +- .../DataFrameInfoTransportAction.java | 2 +- .../DataFrameUsageTransportAction.java | 2 +- ...TransportGetDataFrameTransformsAction.java | 10 +- ...TransportStopDataFrameTransformAction.java | 2 +- ...ansportUpdateDataFrameTransformAction.java | 33 +- .../persistence/DataFrameInternalIndex.java | 52 ++- .../DataFrameTransformsConfigManager.java | 406 ++++++++++++------ .../dataframe/persistence/DataframeIndex.java | 2 +- ...FrameTransformPersistentTasksExecutor.java | 2 +- .../transforms/DataFrameTransformTask.java | 21 +- .../DataFrameSingleNodeTestCase.java | 4 +- ...DataFrameTransformsConfigManagerTests.java | 79 ++++ ...TransformPersistentTasksExecutorTests.java | 4 +- 22 files changed, 607 insertions(+), 197 deletions(-) create mode 100644 x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformInternalIndexIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportGetResourcesAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportGetResourcesAction.java index 41e2605d9dba3..f4cc279aad7bc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportGetResourcesAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/AbstractTransportGetResourcesAction.java @@ -80,6 +80,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen sourceBuilder.from(request.getPageParams().getFrom()) .size(request.getPageParams().getSize()); } + sourceBuilder.trackTotalHits(true); IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; SearchRequest searchRequest = new SearchRequest(getIndices()) @@ -88,7 +89,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen indicesOptions.expandWildcardsOpen(), indicesOptions.expandWildcardsClosed(), indicesOptions)) - .source(sourceBuilder.trackTotalHits(true)); + .source(customSearchOptions(sourceBuilder)); executeAsyncWithOrigin(client.threadPool().getThreadContext(), executionOrigin(), @@ -105,8 +106,12 @@ public void onResponse(SearchResponse response) { XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { Resource resource = parse(parser); - docs.add(resource); - foundResourceIds.add(extractIdFromResource(resource)); + String id = extractIdFromResource(resource); + // Do not include a resource with the same ID twice + if (foundResourceIds.contains(id) == false) { + docs.add(resource); + foundResourceIds.add(id); + } } catch (IOException e) { this.onFailure(e); } @@ -159,6 +164,10 @@ private QueryBuilder buildQuery(String[] tokens, String resourceIdField) { return boolQuery.hasClauses() ? boolQuery : QueryBuilders.matchAllQuery(); } + protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) { + return searchSourceBuilder; + } + @Nullable protected QueryBuilder additionalQuery() { return null; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index c0c209a9b542b..56867ee902973 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -26,6 +26,8 @@ public final class DataFrameField { public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESCRIPTION = new ParseField("description"); + public static final ParseField VERSION = new ParseField("version"); + public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField DESTINATION = new ParseField("dest"); public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField FORCE = new ParseField("force"); @@ -65,7 +67,6 @@ public final class DataFrameField { // strings for meta information public static final String META_FIELDNAME = "_data_frame"; public static final String CREATION_DATE_MILLIS = "creation_date_in_millis"; - public static final String VERSION = "version"; public static final String CREATED = "created"; public static final String CREATED_BY = "created_by"; public static final String TRANSFORM = "transform"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java index d6e37c80dfe21..4319a35c8f148 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java @@ -24,7 +24,6 @@ public class DataFrameTransform extends AbstractDiffable implements PersistentTaskParams { public static final String NAME = DataFrameField.TASK_NAME; - public static final ParseField VERSION = new ParseField(DataFrameField.VERSION); public static final ParseField FREQUENCY = DataFrameField.FREQUENCY; private final String transformId; @@ -36,7 +35,7 @@ public class DataFrameTransform extends AbstractDiffable imp static { PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID); - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.VERSION); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY); } @@ -90,7 +89,7 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(DataFrameField.ID.getPreferredName(), transformId); - builder.field(VERSION.getPreferredName(), version); + builder.field(DataFrameField.VERSION.getPreferredName(), version); if (frequency != null) { builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index fe31eaffbeffc..62865f5e1e5d3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -47,8 +47,6 @@ public class DataFrameTransformConfig extends AbstractDiffable STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); static final int MAX_DESCRIPTION_LENGTH = 1_000; @@ -98,8 +96,8 @@ private static ConstructingObjectParser create // on strict parsing do not allow injection of headers, transform version, or create time if (lenient == false) { validateStrictParsingParams(args[6], HEADERS.getPreferredName()); - validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName()); - validateStrictParsingParams(args[10], VERSION.getPreferredName()); + validateStrictParsingParams(args[9], DataFrameField.CREATE_TIME.getPreferredName()); + validateStrictParsingParams(args[10], DataFrameField.VERSION.getPreferredName()); } @SuppressWarnings("unchecked") @@ -132,8 +130,9 @@ private static ConstructingObjectParser create parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM); parser.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION); parser.declareField(optionalConstructorArg(), - p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE); - parser.declareString(optionalConstructorArg(), VERSION); + p -> TimeUtils.parseTimeFieldToInstant(p, DataFrameField.CREATE_TIME.getPreferredName()), DataFrameField.CREATE_TIME, + ObjectParser.ValueType.VALUE); + parser.declareString(optionalConstructorArg(), DataFrameField.VERSION); return parser; } @@ -256,7 +255,7 @@ public Instant getCreateTime() { } public DataFrameTransformConfig setCreateTime(Instant createTime) { - ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName()); + ExceptionsHelper.requireNonNull(createTime, DataFrameField.CREATE_TIME.getPreferredName()); this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli()); return this; } @@ -332,10 +331,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description); } if (transformVersion != null) { - builder.field(VERSION.getPreferredName(), transformVersion); + builder.field(DataFrameField.VERSION.getPreferredName(), transformVersion); } if (createTime != null) { - builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli()); + builder.timeField(DataFrameField.CREATE_TIME.getPreferredName(), DataFrameField.CREATE_TIME.getPreferredName() + "_string", + createTime.toEpochMilli()); } builder.endObject(); return builder; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java index 681599331c8af..499f62f13ea53 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java @@ -42,13 +42,13 @@ public void testDeleteConfigurationLeftOver() throws IOException { builder.endObject(); final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); Request req = new Request("PUT", - DataFrameInternalIndex.INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName)); + DataFrameInternalIndex.LATEST_INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName)); req.setEntity(entity); client().performRequest(req); } // refresh the index - assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh"))); + assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_refresh"))); Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName); Response deleteResponse = client().performRequest(deleteRequest); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 09a6f1ee56ab4..728ff55a42a06 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -385,7 +385,7 @@ public void wipeDataFrameTransforms() throws IOException { assertTrue(transformConfigs.isEmpty()); // the configuration index should be empty - Request request = new Request("GET", DataFrameInternalIndex.INDEX_NAME + "/_search"); + Request request = new Request("GET", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_search"); try { Response searchResponse = adminClient().performRequest(request); Map searchResult = entityAsMap(searchResponse); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformInternalIndexIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformInternalIndexIT.java new file mode 100644 index 0000000000000..c76ca459694a6 --- /dev/null +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformInternalIndexIT.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.integration; + +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; + +import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.addDataFrameTransformsConfigMappings; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.equalTo; + + +public class DataFrameTransformInternalIndexIT extends ESRestTestCase { + + + private static final String CURRENT_INDEX = DataFrameInternalIndex.LATEST_INDEX_NAME; + private static final String OLD_INDEX = DataFrameInternalIndex.INDEX_PATTERN + "1"; + + + public void testUpdateDeletesOldTransformConfig() throws Exception { + TestRestHighLevelClient client = new TestRestHighLevelClient(); + // The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one + // created. + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + builder.startObject("properties"); + builder.startObject(DataFrameField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject(); + addDataFrameTransformsConfigMappings(builder); + builder.endObject(); + builder.endObject(); + client.indices().create(new CreateIndexRequest(OLD_INDEX).mapping(builder), RequestOptions.DEFAULT); + } + String transformIndex = "transform-index-deletes-old"; + createSourceIndex(transformIndex); + String transformId = "transform-update-deletes-old-transform-config"; + String config = "{\"dest\": {\"index\":\"bar\"}," + + " \"source\": {\"index\":\"" + transformIndex + "\", \"query\": {\"match_all\":{}}}," + + " \"id\": \""+transformId+"\"," + + " \"doc_type\": \"data_frame_transform_config\"," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }," + + "\"frequency\":\"1s\"" + + "}"; + client.index(new IndexRequest(OLD_INDEX) + .id(DataFrameTransformConfig.documentId(transformId)) + .source(config, XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT); + GetResponse getResponse = client.get(new GetRequest(OLD_INDEX, DataFrameTransformConfig.documentId(transformId)), + RequestOptions.DEFAULT); + assertThat(getResponse.isExists(), is(true)); + + GetDataFrameTransformResponse response = client.dataFrame() + .getDataFrameTransform(new GetDataFrameTransformRequest(transformId), RequestOptions.DEFAULT); + assertThat(response.getTransformConfigurations().get(0).getId(), equalTo(transformId)); + + UpdateDataFrameTransformResponse updated = client.dataFrame().updateDataFrameTransform( + new UpdateDataFrameTransformRequest(DataFrameTransformConfigUpdate.builder().setDescription("updated").build(), transformId), + RequestOptions.DEFAULT); + + assertThat(updated.getTransformConfiguration().getId(), equalTo(transformId)); + assertThat(updated.getTransformConfiguration().getDescription(), equalTo("updated")); + + // Old should now be gone + getResponse = client.get(new GetRequest(OLD_INDEX, DataFrameTransformConfig.documentId(transformId)), RequestOptions.DEFAULT); + assertThat(getResponse.isExists(), is(false)); + + // New should be here + getResponse = client.get(new GetRequest(CURRENT_INDEX, DataFrameTransformConfig.documentId(transformId)), + RequestOptions.DEFAULT); + assertThat(getResponse.isExists(), is(true)); + } + + + @Override + protected Settings restClientSettings() { + final String token = "Basic " + + Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8)); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + private void createSourceIndex(String index) throws IOException { + TestRestHighLevelClient client = new TestRestHighLevelClient(); + client.indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT); + } + + private class TestRestHighLevelClient extends RestHighLevelClient { + TestRestHighLevelClient() { + super(client(), restClient -> {}, new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()); + } + } +} diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java index f195005165b43..c9ee2081d3d63 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java @@ -54,7 +54,7 @@ public void testUsage() throws Exception { stopDataFrameTransform("test_usage", false); Request statsExistsRequest = new Request("GET", - DataFrameInternalIndex.INDEX_NAME+"/_search?q=" + + DataFrameInternalIndex.LATEST_INDEX_NAME+"/_search?q=" + INDEX_DOC_TYPE.getPreferredName() + ":" + DataFrameTransformStoredDoc.NAME); // Verify that we have one stat document @@ -96,7 +96,7 @@ public void testUsage() throws Exception { XContentMapValues.extractValue("data_frame.stats." + statName, statsMap)); } // Refresh the index so that statistics are searchable - refreshIndex(DataFrameInternalIndex.INDEX_TEMPLATE_NAME); + refreshIndex(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME); }, 60, TimeUnit.SECONDS); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 7ecbd5b7ac9dc..383c7b5e75c88 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -186,7 +186,7 @@ public Collection createComponents(Client client, ClusterService cluster public UnaryOperator> getIndexTemplateMetaDataUpgrader() { return templates -> { try { - templates.put(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, DataFrameInternalIndex.getIndexTemplateMetaData()); + templates.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData()); } catch (IOException e) { logger.error("Error creating data frame index template", e); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameInfoTransportAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameInfoTransportAction.java index dc0e4eefd0a5f..153c2d1b2b50d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameInfoTransportAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameInfoTransportAction.java @@ -110,7 +110,7 @@ static void getStatisticSummations(Client client, ActionListener waitForStopListener(Request request, ActionList waitResponse -> client.admin() .indices() - .prepareRefresh(DataFrameInternalIndex.INDEX_NAME) + .prepareRefresh(DataFrameInternalIndex.LATEST_INDEX_NAME) .execute(ActionListener.wrap( r -> listener.onResponse(waitResponse), e -> { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java index 6f3433a84b540..ae6b0090a6b9d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.dataframe.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; @@ -19,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; @@ -59,6 +62,7 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportUpdateDataFrameTransformAction.class); private final XPackLicenseState licenseState; private final Client client; private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; @@ -109,8 +113,6 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS DataFrameTransformConfigUpdate update = request.getUpdate(); update.setHeaders(filteredHeaders); - String transformId = request.getId(); - // GET transform and attempt to update // We don't want the update to complete if the config changed between GET and INDEX dataFrameTransformsConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap( @@ -136,12 +138,12 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) private void handlePrivsResponse(String username, Request request, DataFrameTransformConfig config, - DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair, + DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ClusterState clusterState, HasPrivilegesResponse privilegesResponse, ActionListener listener) { if (privilegesResponse.isCompleteMatch()) { - updateDataFrame(request, config, seqNoPrimaryTermPair, clusterState, listener); + updateDataFrame(request, config, seqNoPrimaryTermAndIndex, clusterState, listener); } else { List indices = privilegesResponse.getIndexPrivileges() .stream() @@ -159,7 +161,7 @@ private void handlePrivsResponse(String username, private void validateAndUpdateDataFrame(Request request, ClusterState clusterState, DataFrameTransformConfig config, - DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair, + DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ActionListener listener) { try { SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation()); @@ -174,17 +176,17 @@ private void validateAndUpdateDataFrame(Request request, final String username = securityContext.getUser().principal(); HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username); ActionListener privResponseListener = ActionListener.wrap( - r -> handlePrivsResponse(username, request, config, seqNoPrimaryTermPair, clusterState, r, listener), + r -> handlePrivsResponse(username, request, config, seqNoPrimaryTermAndIndex, clusterState, r, listener), listener::onFailure); client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); } else { // No security enabled, just create the transform - updateDataFrame(request, config, seqNoPrimaryTermPair, clusterState, listener); + updateDataFrame(request, config, seqNoPrimaryTermAndIndex, clusterState, listener); } } private void updateDataFrame(Request request, DataFrameTransformConfig config, - DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair, + DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ClusterState clusterState, ActionListener listener) { @@ -194,7 +196,18 @@ private void updateDataFrame(Request request, ActionListener putTransformConfigurationListener = ActionListener.wrap( putTransformConfigurationResult -> { auditor.info(config.getId(), "updated data frame transform."); - listener.onResponse(new Response(config)); + dataFrameTransformsConfigManager.deleteOldTransformConfigurations(request.getId(), ActionListener.wrap( + r -> { + logger.trace("[{}] successfully deleted old transform configurations", request.getId()); + listener.onResponse(new Response(config)); + }, + e -> { + logger.warn( + LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", request.getId()), + e); + listener.onResponse(new Response(config)); + } + )); }, // If we failed to INDEX AND we created the destination index, the destination index will still be around // This is a similar behavior to _start @@ -204,7 +217,7 @@ private void updateDataFrame(Request request, // <2> Update our transform ActionListener createDestinationListener = ActionListener.wrap( createDestResponse -> dataFrameTransformsConfigManager.updateTransformConfiguration(config, - seqNoPrimaryTermPair, + seqNoPrimaryTermAndIndex, putTransformConfigurationListener), listener::onFailure ); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java index c24b37b29d449..cad4f0cba484d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java @@ -31,11 +31,23 @@ public final class DataFrameInternalIndex { + /* Changelog of internal index versions + * + * Please list changes, increase the version if you are 1st in this release cycle + * + * version 1 (7.2): initial + * version 2 (7.4): cleanup, add config::version, config::create_time, checkpoint::timestamp, checkpoint::time_upper_bound, + * progress::docs_processed, progress::docs_indexed, + * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, + * stats::exponential_avg_documents_processed + */ + // constants for the index - public static final String INDEX_TEMPLATE_VERSION = "1"; - public static final String INDEX_TEMPLATE_PATTERN = ".data-frame-internal-"; - public static final String INDEX_TEMPLATE_NAME = INDEX_TEMPLATE_PATTERN + INDEX_TEMPLATE_VERSION; - public static final String INDEX_NAME = INDEX_TEMPLATE_NAME; + public static final String INDEX_VERSION = "2"; + public static final String INDEX_PATTERN = ".data-frame-internal-"; + public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; + public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; + public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*"; public static final String AUDIT_TEMPLATE_VERSION = "1"; public static final String AUDIT_INDEX_PREFIX = ".data-frame-notifications-"; @@ -58,8 +70,8 @@ public final class DataFrameInternalIndex { public static final String KEYWORD = "keyword"; public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException { - IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(INDEX_TEMPLATE_NAME) - .patterns(Collections.singletonList(INDEX_TEMPLATE_NAME)) + IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(LATEST_INDEX_VERSIONED_NAME) + .patterns(Collections.singletonList(LATEST_INDEX_VERSIONED_NAME)) .version(Version.CURRENT.id) .settings(Settings.builder() // the configurations are expected to be small @@ -116,7 +128,7 @@ private static XContentBuilder auditMappings() throws IOException { return builder; } - private static XContentBuilder mappings() throws IOException { + public static XContentBuilder mappings() throws IOException { XContentBuilder builder = jsonBuilder(); builder.startObject(); @@ -133,6 +145,8 @@ private static XContentBuilder mappings() throws IOException { addDataFrameTransformsConfigMappings(builder); // add the schema for transform stats addDataFrameTransformStoredDocMappings(builder); + // add the schema for checkpoints + addDataFrameCheckpointMappings(builder); // end type builder.endObject(); // end properties @@ -225,15 +239,13 @@ private static XContentBuilder addDataFrameTransformStoredDocMappings(XContentBu .field(TYPE, DOUBLE) .endObject() .endObject() - .endObject() + .endObject(); // This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that // we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash - .startObject("checkpointing") - .field(ENABLED, false) - .endObject(); + // .startObject("checkpointing").field(ENABLED, false).endObject(); } - private static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuilder builder) throws IOException { + public static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuilder builder) throws IOException { return builder .startObject(DataFrameField.ID.getPreferredName()) .field(TYPE, KEYWORD) @@ -257,6 +269,22 @@ private static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuil .endObject() .startObject(DataFrameField.DESCRIPTION.getPreferredName()) .field(TYPE, TEXT) + .endObject() + .startObject(DataFrameField.VERSION.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() + .startObject(DataFrameField.CREATE_TIME.getPreferredName()) + .field(TYPE, DATE) + .endObject(); + } + + private static XContentBuilder addDataFrameCheckpointMappings(XContentBuilder builder) throws IOException { + return builder + .startObject(DataFrameField.TIMESTAMP_MILLIS.getPreferredName()) + .field(TYPE, DATE) + .endObject() + .startObject(DataFrameField.TIME_UPPER_BOUND_MILLIS.getPreferredName()) + .field(TYPE, DATE) .endObject(); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java index 2c8281eeab223..3d5c8b28aaa18 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java @@ -9,16 +9,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.get.GetAction; -import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; @@ -37,8 +39,11 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; @@ -54,12 +59,34 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +/** + * Place of all interactions with the internal transforms index. For configuration and mappings see @link{DataFrameInternalIndex} + * + * Versioned Index: + * + * We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".data-frame-internal-n" while + * n is the _current_ version of the index. + * + * - all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is uses + * - all puts and updates go into the _current_ version of the index, in case of updates this can leave dups behind + * + * Duplicate handling / old version cleanup + * + * As we always write to the new index, updates of older documents leave a dup in the previous versioned index behind. However, + * documents are tiny, so the impact is rather small. + * + * Nevertheless cleanup would be good, eventually we need to move old documents into new indexes after major upgrades. + * + * TODO: Provide a method that moves old docs into the current index and delete old indexes and templates + */ public class DataFrameTransformsConfigManager { private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class); @@ -84,7 +111,7 @@ public void putTransformCheckpoint(DataFrameTransformCheckpoint checkpoint, Acti try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = checkpoint.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME) + IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME) .opType(DocWriteRequest.OpType.INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .id(DataFrameTransformCheckpoint.documentId(checkpoint.getTransformId(), checkpoint.getCheckpoint())) @@ -116,30 +143,91 @@ public void putTransformConfiguration(DataFrameTransformConfig transformConfig, * but is an index operation that will fail with a version conflict * if the current document seqNo and primaryTerm is not the same as the provided version. * @param transformConfig the @link{DataFrameTransformConfig} - * @param seqNoPrimaryTermPair an object containing the believed seqNo and primaryTerm for the doc. + * @param seqNoPrimaryTermAndIndex an object containing the believed seqNo, primaryTerm and index for the doc. * Used for optimistic concurrency control * @param listener listener to call after request */ public void updateTransformConfiguration(DataFrameTransformConfig transformConfig, - SeqNoPrimaryTermPair seqNoPrimaryTermPair, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ActionListener listener) { - putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermPair, listener); + if (seqNoPrimaryTermAndIndex.getIndex().equals(DataFrameInternalIndex.LATEST_INDEX_NAME)) { + // update the config in the same, current index using optimistic concurrency control + putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermAndIndex, listener); + } else { + // create the config in the current version of the index assuming there is no existing one + // this leaves a dup behind in the old index, see dup handling on the top + putTransformConfiguration(transformConfig, DocWriteRequest.OpType.CREATE, null, listener); + } + } + + /** + * This deletes configuration documents that match the given transformId that are contained in old index versions. + * + * @param transformId The configuration ID potentially referencing configurations stored in the old indices + * @param listener listener to alert on completion + */ + public void deleteOldTransformConfigurations(String transformId, ActionListener listener) { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("_index", DataFrameInternalIndex.LATEST_INDEX_NAME)) + .filter(QueryBuilders.termQuery("_id", DataFrameTransformConfig.documentId(transformId))))) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()); + + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( + response -> { + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + listener.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())); + return; + } + listener.onResponse(true); + }, + listener::onFailure + )); + } + + /** + * This deletes stored state/stats documents for the given transformId that are contained in old index versions. + * + * @param transformId The transform ID referenced by the documents + * @param listener listener to alert on completion + */ + public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener) { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("_index", DataFrameInternalIndex.LATEST_INDEX_NAME)) + .filter(QueryBuilders.termQuery("_id", DataFrameTransformStoredDoc.documentId(transformId))))) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()); + + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap( + response -> { + if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) { + Tuple statusAndReason = getStatusAndReason(response); + listener.onFailure( + new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())); + return; + } + listener.onResponse(true); + }, + listener::onFailure + )); } private void putTransformConfiguration(DataFrameTransformConfig transformConfig, DocWriteRequest.OpType optType, - SeqNoPrimaryTermPair seqNoPrimaryTermPair, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME) + IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME) .opType(optType) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .id(DataFrameTransformConfig.documentId(transformConfig.getId())) .source(source); - if (seqNoPrimaryTermPair != null) { - indexRequest.setIfSeqNo(seqNoPrimaryTermPair.seqNo).setIfPrimaryTerm(seqNoPrimaryTermPair.primaryTerm); + if (seqNoPrimaryTermAndIndex != null) { + indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.seqNo).setIfPrimaryTerm(seqNoPrimaryTermAndIndex.primaryTerm); } executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(r -> { listener.onResponse(true); @@ -170,19 +258,25 @@ private void putTransformConfiguration(DataFrameTransformConfig transformConfig, * @param resultListener listener to call after request has been made */ public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener resultListener) { - GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, - DataFrameTransformCheckpoint.documentId(transformId, checkpoint)); - executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { - - if (getResponse.isExists() == false) { - // do not fail if checkpoint does not exist but return an empty checkpoint - logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint"); - resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY); - return; - } - BytesReference source = getResponse.getSourceAsBytesRef(); - parseCheckpointsLenientlyFromSource(source, transformId, resultListener); - }, resultListener::onFailure)); + QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformCheckpoint.documentId(transformId, checkpoint)); + SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .setQuery(queryBuilder) + // use sort to get the last + .addSort("_index", SortOrder.DESC) + .setSize(1) + .request(); + + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.wrap( + searchResponse -> { + if (searchResponse.getHits().getHits().length == 0) { + // do not fail if checkpoint does not exist but return an empty checkpoint + logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint"); + resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY); + return; + } + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + parseCheckpointsLenientlyFromSource(source, transformId, resultListener); + }, resultListener::onFailure)); } /** @@ -193,24 +287,25 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi * @param resultListener listener to call after inner request has returned */ public void getTransformConfiguration(String transformId, ActionListener resultListener) { - GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); - executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformConfig.documentId(transformId)); + SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .setQuery(queryBuilder) + // use sort to get the last + .addSort("_index", SortOrder.DESC) + .setSize(1) + .request(); - if (getResponse.isExists() == false) { - resultListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); - return; - } - BytesReference source = getResponse.getSourceAsBytesRef(); - parseTransformLenientlyFromSource(source, transformId, resultListener); - }, e -> { - if (e.getClass() == IndexNotFoundException.class) { - resultListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); - } else { - resultListener.onFailure(e); - } - })); + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, + ActionListener.wrap( + searchResponse -> { + if (searchResponse.getHits().getHits().length == 0) { + resultListener.onFailure(new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); + return; + } + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + parseTransformLenientlyFromSource(source, transformId, resultListener); + }, resultListener::onFailure)); } /** @@ -222,28 +317,30 @@ public void getTransformConfiguration(String transformId, ActionListener> configAndVersionListener) { - GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); - executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + SeqNoPrimaryTermAndIndex>> configAndVersionListener) { + QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformConfig.documentId(transformId)); + SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .setQuery(queryBuilder) + // use sort to get the last + .addSort("_index", SortOrder.DESC) + .setSize(1) + .seqNoAndPrimaryTerm(true) + .request(); - if (getResponse.isExists() == false) { - configAndVersionListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); - return; - } - BytesReference source = getResponse.getSourceAsBytesRef(); - parseTransformLenientlyFromSource(source, transformId, ActionListener.wrap( - config -> configAndVersionListener.onResponse(Tuple.tuple(config, - new SeqNoPrimaryTermPair(getResponse.getSeqNo(), getResponse.getPrimaryTerm()))), - configAndVersionListener::onFailure)); - }, e -> { - if (e.getClass() == IndexNotFoundException.class) { - configAndVersionListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); - } else { - configAndVersionListener.onFailure(e); - } - })); + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.wrap( + searchResponse -> { + if (searchResponse.getHits().getHits().length == 0) { + configAndVersionListener.onFailure(new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); + return; + } + SearchHit hit = searchResponse.getHits().getHits()[0]; + BytesReference source = hit.getSourceRef(); + parseTransformLenientlyFromSource(source, transformId, ActionListener.wrap( + config -> configAndVersionListener.onResponse(Tuple.tuple(config, + new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex()))), + configAndVersionListener::onFailure)); + }, configAndVersionListener::onFailure)); } /** @@ -263,7 +360,7 @@ public void expandTransformIds(String transformIdsExpression, String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression); QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, DataFrameTransformConfig.NAME); - SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME) + SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN) .addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC) .setFrom(pageParams.getFrom()) .setTrackTotalHits(true) @@ -275,35 +372,33 @@ public void expandTransformIds(String transformIdsExpression, final ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(idTokens, allowNoMatch); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request, - ActionListener.wrap( - searchResponse -> { - long totalHits = searchResponse.getHits().getTotalHits().value; - List ids = new ArrayList<>(searchResponse.getHits().getHits().length); - for (SearchHit hit : searchResponse.getHits().getHits()) { - BytesReference source = hit.getSourceRef(); - try (InputStream stream = source.streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, stream)) { - ids.add((String) parser.map().get(DataFrameField.ID.getPreferredName())); - } catch (IOException e) { - foundIdsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e)); - return; - } - } - requiredMatches.filterMatchedIds(ids); - if (requiredMatches.hasUnmatchedIds()) { - // some required Ids were not found - foundIdsListener.onFailure( - new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, - requiredMatches.unmatchedIdsString()))); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request, ActionListener.wrap( + searchResponse -> { + long totalHits = searchResponse.getHits().getTotalHits().value; + // important: preserve order + Set ids = new LinkedHashSet<>(searchResponse.getHits().getHits().length); + for (SearchHit hit : searchResponse.getHits().getHits()) { + BytesReference source = hit.getSourceRef(); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + ids.add((String) parser.map().get(DataFrameField.ID.getPreferredName())); + } catch (IOException e) { + foundIdsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e)); return; } - foundIdsListener.onResponse(new Tuple<>(totalHits, ids)); - }, - foundIdsListener::onFailure - ), client::search); + } + requiredMatches.filterMatchedIds(ids); + if (requiredMatches.hasUnmatchedIds()) { + // some required Ids were not found + foundIdsListener.onFailure( + new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, + requiredMatches.unmatchedIdsString()))); + return; + } + foundIdsListener.onResponse(new Tuple<>(totalHits, new ArrayList<>(ids))); + }, foundIdsListener::onFailure), client::search); } /** @@ -314,15 +409,14 @@ public void expandTransformIds(String transformIdsExpression, */ public void deleteTransform(String transformId, ActionListener listener) { DeleteByQueryRequest request = new DeleteByQueryRequest() - .setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously + .setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously - request.indices(DataFrameInternalIndex.INDEX_NAME); + request.indices(DataFrameInternalIndex.INDEX_NAME_PATTERN); QueryBuilder query = QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId); request.setQuery(query); request.setRefresh(true); executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> { - if (deleteResponse.getDeleted() == 0) { listener.onFailure(new ResourceNotFoundException( DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); @@ -343,9 +437,10 @@ public void putOrUpdateTransformStoredDoc(DataFrameTransformStoredDoc stats, Act try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = stats.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME) + IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .id(DataFrameTransformStoredDoc.documentId(stats.getId())) + .opType(DocWriteRequest.OpType.INDEX) .source(source); executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( @@ -363,51 +458,56 @@ public void putOrUpdateTransformStoredDoc(DataFrameTransformStoredDoc stats, Act } public void getTransformStoredDoc(String transformId, ActionListener resultListener) { - GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformStoredDoc.documentId(transformId)); - executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformStoredDoc.documentId(transformId)); + SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .setQuery(queryBuilder) + // use sort to get the last + .addSort("_index", SortOrder.DESC) + .setSize(1) + .request(); - if (getResponse.isExists() == false) { - resultListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId))); - return; - } - BytesReference source = getResponse.getSourceAsBytesRef(); - try (InputStream stream = source.streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { - resultListener.onResponse(DataFrameTransformStoredDoc.fromXContent(parser)); - } catch (Exception e) { - logger.error( - DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), e); - resultListener.onFailure(e); - } - }, e -> { - if (e instanceof ResourceNotFoundException) { - resultListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId))); - } else { - resultListener.onFailure(e); - } - })); + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.wrap( + searchResponse -> { + if (searchResponse.getHits().getHits().length == 0) { + resultListener.onFailure(new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId))); + return; + } + BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef(); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { + resultListener.onResponse(DataFrameTransformStoredDoc.fromXContent(parser)); + } catch (Exception e) { + logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, + transformId), e); + resultListener.onFailure(e); + } + }, resultListener::onFailure)); } public void getTransformStoredDoc(Collection transformIds, ActionListener> listener) { - QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() - .filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformIds)) - .filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformStoredDoc.NAME))); + .filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformIds)) + .filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformStoredDoc.NAME))); - SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME) - .addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC) - .setQuery(builder) - .setSize(Math.min(transformIds.size(), 10_000)) - .request(); + SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN) + .addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC) + .addSort("_index", SortOrder.DESC) + .setQuery(builder) + // the limit for getting stats and transforms is 1000, as long as we do not have 10 indices this works + .setSize(Math.min(transformIds.size(), 10_000)) + .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, searchRequest, - ActionListener.wrap( - searchResponse -> { - List stats = new ArrayList<>(); - for (SearchHit hit : searchResponse.getHits().getHits()) { + ActionListener.wrap( + searchResponse -> { + List stats = new ArrayList<>(); + String previousId = null; + for (SearchHit hit : searchResponse.getHits().getHits()) { + // skip old versions + if (hit.getId().equals(previousId) == false) { + previousId = hit.getId(); BytesReference source = hit.getSourceRef(); try (InputStream stream = source.streamInput(); XContentParser parser = XContentFactory.xContent(XContentType.JSON) @@ -419,17 +519,11 @@ public void getTransformStoredDoc(Collection transformIds, ActionListene return; } } - - listener.onResponse(stats); - }, - e -> { - if (e.getClass() == IndexNotFoundException.class) { - listener.onResponse(Collections.emptyList()); - } else { - listener.onFailure(e); - } } - ), client::search); + + listener.onResponse(stats); + }, listener::onFailure + ), client::search); } private void parseTransformLenientlyFromSource(BytesReference source, String transformId, @@ -480,13 +574,37 @@ private QueryBuilder buildQueryFromTokenizedIds(String[] idTokens, String resour return QueryBuilders.constantScoreQuery(queryBuilder); } - public static class SeqNoPrimaryTermPair { + private static Tuple getStatusAndReason(final BulkByScrollResponse response) { + RestStatus status = RestStatus.OK; + Throwable reason = new Exception("Unknown error"); + //Getting the max RestStatus is sort of arbitrary, would the user care about 5xx over 4xx? + //Unsure of a better way to return an appropriate and possibly actionable cause to the user. + for (BulkItemResponse.Failure failure : response.getBulkFailures()) { + if (failure.getStatus().getStatus() > status.getStatus()) { + status = failure.getStatus(); + reason = failure.getCause(); + } + } + + for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) { + RestStatus failureStatus = org.elasticsearch.ExceptionsHelper.status(failure.getReason()); + if (failureStatus.getStatus() > status.getStatus()) { + status = failureStatus; + reason = failure.getReason(); + } + } + return new Tuple<>(status, reason); + } + + public static class SeqNoPrimaryTermAndIndex { private final long seqNo; private final long primaryTerm; + private final String index; - public SeqNoPrimaryTermPair(long seqNo, long primaryTerm) { + public SeqNoPrimaryTermAndIndex(long seqNo, long primaryTerm, String index) { this.seqNo = seqNo; this.primaryTerm = primaryTerm; + this.index = index; } public long getSeqNo() { @@ -496,5 +614,9 @@ public long getSeqNo() { public long getPrimaryTerm() { return primaryTerm; } + + public String getIndex() { + return index; + } } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataframeIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataframeIndex.java index 005be73e8cee2..170f8e9559914 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataframeIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataframeIndex.java @@ -100,7 +100,7 @@ private static XContentBuilder addMetaData(XContentBuilder builder, String id, C .field(DataFrameField.CREATED_BY, DataFrameField.DATA_FRAME_SIGNATURE) .startObject(DataFrameField.META_FIELDNAME) .field(DataFrameField.CREATION_DATE_MILLIS, clock.millis()) - .startObject(DataFrameField.VERSION) + .startObject(DataFrameField.VERSION.getPreferredName()) .field(DataFrameField.CREATED, Version.CURRENT) .endObject() .field(DataFrameField.TRANSFORM, id) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index dc37e937ea135..593c3c6e8a560 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -104,7 +104,7 @@ static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(); String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), - DataFrameInternalIndex.INDEX_TEMPLATE_PATTERN + "*"); + DataFrameInternalIndex.INDEX_NAME_PATTERN); List unavailableIndices = new ArrayList<>(indices.length); for (String index : indices) { IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 641e3a0d1d777..313efff756917 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -58,6 +59,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -632,6 +634,7 @@ static class ClientDataFrameIndexer extends DataFrameIndexer { private volatile boolean auditBulkFailures = true; // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; + private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, @@ -896,7 +899,23 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { transformTask.shutdown(); } - next.run(); + // Only do this clean up once, if it succeeded, no reason to do the query again. + if (oldStatsCleanedUp.compareAndExchange(false, true) == false) { + transformsConfigManager.deleteOldTransformStoredDocuments(transformId, ActionListener.wrap( + nil -> { + logger.trace("[{}] deleted old transform stats and state document", transformId); + next.run(); + }, + e -> { + String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", + transformId); + logger.warn(msg, e); + // If we have failed, we should attempt the clean up again later + oldStatsCleanedUp.set(false); + next.run(); + } + )); + } }, statsExc -> { logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java index 2c2ad5ba0b336..34c16ebc9e73a 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java @@ -29,8 +29,8 @@ public abstract class DataFrameSingleNodeTestCase extends ESSingleNodeTestCase { public void waitForTemplates() throws Exception { assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().get().getState(); - assertTrue("Timed out waiting for the data frame templates to be installed", - TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, state)); + assertTrue("Timed out waiting for the data frame templates to be installed", TemplateUtils + .checkTemplateExistsAndVersionIsGTECurrentVersion(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, state)); }); } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java index 45c792f8d1166..e403d102adff0 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java @@ -8,7 +8,16 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; @@ -27,6 +36,9 @@ import java.util.List; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.mappings; +import static org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager.TO_XCONTENT_PARAMS; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -278,4 +290,71 @@ public void testGetStoredDocMultiple() throws InterruptedException { expectedDocs.sort(Comparator.comparing(DataFrameTransformStoredDoc::getId)); assertAsync(listener -> transformsConfigManager.getTransformStoredDoc(ids, listener), expectedDocs, null, null); } + + public void testDeleteOldTransformConfigurations() throws Exception { + String oldIndex = DataFrameInternalIndex.INDEX_PATTERN + "1"; + String transformId = "transform_test_delete_old_configurations"; + String docId = DataFrameTransformConfig.documentId(transformId); + DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests + .randomDataFrameTransformConfig("transform_test_delete_old_configurations"); + client().admin().indices().create(new CreateIndexRequest(oldIndex) + .mapping(MapperService.SINGLE_MAPPING_NAME, mappings())).actionGet(); + + try(XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + IndexRequest request = new IndexRequest(oldIndex) + .source(source) + .id(docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(request).actionGet(); + } + + assertAsync(listener -> transformsConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null); + + assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(true)); + assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true)); + + assertAsync(listener -> transformsConfigManager.deleteOldTransformConfigurations(transformId, listener), true, null, null); + + client().admin().indices().refresh(new RefreshRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)).actionGet(); + assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false)); + assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true)); + } + + public void testDeleteOldTransformStoredDocuments() throws Exception { + String oldIndex = DataFrameInternalIndex.INDEX_PATTERN + "1"; + String transformId = "transform_test_delete_old_stored_documents"; + String docId = DataFrameTransformStoredDoc.documentId(transformId); + DataFrameTransformStoredDoc dataFrameTransformStoredDoc = DataFrameTransformStoredDocTests + .randomDataFrameTransformStoredDoc(transformId); + client().admin().indices().create(new CreateIndexRequest(oldIndex) + .mapping(MapperService.SINGLE_MAPPING_NAME, mappings())).actionGet(); + + try(XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = dataFrameTransformStoredDoc.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + IndexRequest request = new IndexRequest(oldIndex) + .source(source) + .id(docId); + client().index(request).actionGet(); + } + + assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStoredDoc(dataFrameTransformStoredDoc, listener), + true, + null, + null); + + client().admin().indices().refresh(new RefreshRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)).actionGet(); + + assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(true)); + assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true)); + + assertAsync(listener -> transformsConfigManager.deleteOldTransformStoredDocuments(transformId, listener), + true, + null, + null); + + client().admin().indices().refresh(new RefreshRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)).actionGet(); + assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false)); + assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true)); + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java index 4abcdf5646fa4..5195873576f22 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java @@ -134,7 +134,7 @@ public void testVerifyIndicesPrimaryShardsAreActive() { metaData = new MetaData.Builder(cs.metaData()); routingTable = new RoutingTable.Builder(cs.routingTable()); - String indexToRemove = DataFrameInternalIndex.INDEX_NAME; + String indexToRemove = DataFrameInternalIndex.LATEST_INDEX_NAME; if (randomBoolean()) { routingTable.remove(indexToRemove); } else { @@ -157,7 +157,7 @@ public void testVerifyIndicesPrimaryShardsAreActive() { private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) { List indices = new ArrayList<>(); indices.add(DataFrameInternalIndex.AUDIT_INDEX); - indices.add(DataFrameInternalIndex.INDEX_NAME); + indices.add(DataFrameInternalIndex.LATEST_INDEX_NAME); for (String indexName : indices) { IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); indexMetaData.settings(Settings.builder()