Skip to content

Commit

Permalink
[ML-DataFrame] version data frame transform internal index (#45375)
Browse files Browse the repository at this point in the history
Adds index versioning for the internal data frame transform index. This allows new indices to be created and referenced; `GET` requests now query over the index pattern and take the latest doc (based on INDEX name).
  • Loading branch information
Hendrik Muhs authored and benwtrent committed Aug 22, 2019
1 parent 26323f0 commit f989beb
Show file tree
Hide file tree
Showing 22 changed files with 607 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen
sourceBuilder.from(request.getPageParams().getFrom())
.size(request.getPageParams().getSize());
}
sourceBuilder.trackTotalHits(true);

IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
SearchRequest searchRequest = new SearchRequest(getIndices())
Expand All @@ -88,7 +89,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen
indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(),
indicesOptions))
.source(sourceBuilder.trackTotalHits(true));
.source(customSearchOptions(sourceBuilder));

executeAsyncWithOrigin(client.threadPool().getThreadContext(),
executionOrigin(),
Expand All @@ -105,8 +106,12 @@ public void onResponse(SearchResponse response) {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
Resource resource = parse(parser);
docs.add(resource);
foundResourceIds.add(extractIdFromResource(resource));
String id = extractIdFromResource(resource);
// Do not include a resource with the same ID twice
if (foundResourceIds.contains(id) == false) {
docs.add(resource);
foundResourceIds.add(id);
}
} catch (IOException e) {
this.onFailure(e);
}
Expand Down Expand Up @@ -159,6 +164,10 @@ private QueryBuilder buildQuery(String[] tokens, String resourceIdField) {
return boolQuery.hasClauses() ? boolQuery : QueryBuilders.matchAllQuery();
}

/**
 * Hook applied to the search source builder right before it is attached to the search request
 * in {@code searchResources}. This default implementation returns the builder unchanged.
 * NOTE(review): being {@code protected}, this is presumably meant to be overridden by subclasses
 * that need extra search options - confirm against the subclasses.
 *
 * @param searchSourceBuilder the builder already populated by {@code searchResources}
 * @return the builder to use as the search request source
 */
protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) {
return searchSourceBuilder;
}

@Nullable
protected QueryBuilder additionalQuery() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class DataFrameField {
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
Expand Down Expand Up @@ -65,7 +67,6 @@ public final class DataFrameField {
// strings for meta information
public static final String META_FIELDNAME = "_data_frame";
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
public static final String VERSION = "version";
public static final String CREATED = "created";
public static final String CREATED_BY = "created_by";
public static final String TRANSFORM = "transform";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> implements PersistentTaskParams {

public static final String NAME = DataFrameField.TASK_NAME;
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;

private final String transformId;
Expand All @@ -36,7 +35,7 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
}

Expand Down Expand Up @@ -90,7 +89,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(VERSION.getPreferredName(), version);
builder.field(DataFrameField.VERSION.getPreferredName(), version);
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");

public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
static final int MAX_DESCRIPTION_LENGTH = 1_000;
Expand Down Expand Up @@ -98,8 +96,8 @@ private static ConstructingObjectParser<DataFrameTransformConfig, String> create
// on strict parsing do not allow injection of headers, transform version, or create time
if (lenient == false) {
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], VERSION.getPreferredName());
validateStrictParsingParams(args[9], DataFrameField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], DataFrameField.VERSION.getPreferredName());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -132,8 +130,9 @@ private static ConstructingObjectParser<DataFrameTransformConfig, String> create
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
parser.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION);
parser.declareField(optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
parser.declareString(optionalConstructorArg(), VERSION);
p -> TimeUtils.parseTimeFieldToInstant(p, DataFrameField.CREATE_TIME.getPreferredName()), DataFrameField.CREATE_TIME,
ObjectParser.ValueType.VALUE);
parser.declareString(optionalConstructorArg(), DataFrameField.VERSION);
return parser;
}

Expand Down Expand Up @@ -256,7 +255,7 @@ public Instant getCreateTime() {
}

public DataFrameTransformConfig setCreateTime(Instant createTime) {
ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
ExceptionsHelper.requireNonNull(createTime, DataFrameField.CREATE_TIME.getPreferredName());
this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
return this;
}
Expand Down Expand Up @@ -332,10 +331,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description);
}
if (transformVersion != null) {
builder.field(VERSION.getPreferredName(), transformVersion);
builder.field(DataFrameField.VERSION.getPreferredName(), transformVersion);
}
if (createTime != null) {
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
builder.timeField(DataFrameField.CREATE_TIME.getPreferredName(), DataFrameField.CREATE_TIME.getPreferredName() + "_string",
createTime.toEpochMilli());
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public void testDeleteConfigurationLeftOver() throws IOException {
builder.endObject();
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT",
DataFrameInternalIndex.INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
DataFrameInternalIndex.LATEST_INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
req.setEntity(entity);
client().performRequest(req);
}

// refresh the index
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh")));
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_refresh")));

Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName);
Response deleteResponse = client().performRequest(deleteRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public void wipeDataFrameTransforms() throws IOException {
assertTrue(transformConfigs.isEmpty());

// the configuration index should be empty
Request request = new Request("GET", DataFrameInternalIndex.INDEX_NAME + "/_search");
Request request = new Request("GET", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_search");
try {
Response searchResponse = adminClient().performRequest(request);
Map<String, Object> searchResult = entityAsMap(searchResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.integration;

import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;

import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.addDataFrameTransformsConfigMappings;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.equalTo;


/**
 * REST integration test for the versioned data frame transform internal index.
 * It seeds a transform configuration document into an index that stands in for an older
 * internal index version and verifies that updating the transform removes that document
 * and writes the configuration to the latest internal index.
 */
public class DataFrameTransformInternalIndexIT extends ESRestTestCase {

    // Index the test expects the updated configuration to end up in.
    private static final String CURRENT_INDEX = DataFrameInternalIndex.LATEST_INDEX_NAME;
    // Index name built from the internal index pattern plus "1"; plays the role of the "old" index.
    private static final String OLD_INDEX = DataFrameInternalIndex.INDEX_PATTERN + "1";

    /**
     * Stores a config document in {@link #OLD_INDEX}, updates the transform through the API and
     * checks that the document disappeared from the old index and exists in {@link #CURRENT_INDEX}.
     */
    public void testUpdateDeletesOldTransformConfig() throws Exception {
        final TestRestHighLevelClient hlrc = new TestRestHighLevelClient();
        createOldInternalIndex(hlrc);

        final String sourceIndex = "transform-index-deletes-old";
        createSourceIndex(sourceIndex);

        final String transformId = "transform-update-deletes-old-transform-config";
        final String docId = DataFrameTransformConfig.documentId(transformId);

        // Seed the "old" index with the config document and make it visible immediately.
        final IndexRequest seedRequest = new IndexRequest(OLD_INDEX)
            .id(docId)
            .source(transformConfigJson(transformId, sourceIndex), XContentType.JSON)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        hlrc.index(seedRequest, RequestOptions.DEFAULT);
        assertConfigDocExists(hlrc, OLD_INDEX, docId, true);

        // The transform must be retrievable through the API although it only lives in the old index.
        final GetDataFrameTransformRequest getTransform = new GetDataFrameTransformRequest(transformId);
        final GetDataFrameTransformResponse fetched = hlrc.dataFrame().getDataFrameTransform(getTransform, RequestOptions.DEFAULT);
        assertThat(fetched.getTransformConfigurations().get(0).getId(), equalTo(transformId));

        final DataFrameTransformConfigUpdate descriptionUpdate = DataFrameTransformConfigUpdate.builder()
            .setDescription("updated")
            .build();
        final UpdateDataFrameTransformResponse updateResponse = hlrc.dataFrame().updateDataFrameTransform(
            new UpdateDataFrameTransformRequest(descriptionUpdate, transformId),
            RequestOptions.DEFAULT);

        assertThat(updateResponse.getTransformConfiguration().getId(), equalTo(transformId));
        assertThat(updateResponse.getTransformConfiguration().getDescription(), equalTo("updated"));

        // Old should now be gone
        assertConfigDocExists(hlrc, OLD_INDEX, docId, false);

        // New should be here
        assertConfigDocExists(hlrc, CURRENT_INDEX, docId, true);
    }

    /**
     * Supplies basic-auth credentials of the test user to the low level REST client.
     */
    @Override
    protected Settings restClientSettings() {
        final byte[] credentials = ("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8);
        final String authorization = "Basic " + Base64.getEncoder().encodeToString(credentials);
        return Settings.builder()
            .put(ThreadContext.PREFIX + ".Authorization", authorization)
            .build();
    }

    /**
     * Creates {@link #OLD_INDEX} with a keyword doc-type field plus the transform config mappings.
     * The mapping does not need to actually be the "OLD" mapping: the test only checks that the
     * old document gets deleted and the new one created.
     */
    private void createOldInternalIndex(RestHighLevelClient client) throws IOException {
        try (XContentBuilder mapping = XContentFactory.jsonBuilder()) {
            mapping.startObject();
            mapping.startObject("properties");
            mapping.startObject(DataFrameField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject();
            addDataFrameTransformsConfigMappings(mapping);
            mapping.endObject();
            mapping.endObject();
            client.indices().create(new CreateIndexRequest(OLD_INDEX).mapping(mapping), RequestOptions.DEFAULT);
        }
    }

    /**
     * Builds the JSON body of the transform config document that is written to the old index.
     */
    private String transformConfigJson(String transformId, String transformIndex) {
        return "{\"dest\": {\"index\":\"bar\"},"
            + " \"source\": {\"index\":\"" + transformIndex + "\", \"query\": {\"match_all\":{}}},"
            + " \"id\": \"" + transformId + "\","
            + " \"doc_type\": \"data_frame_transform_config\","
            + " \"pivot\": {"
            + " \"group_by\": {"
            + " \"reviewer\": {"
            + " \"terms\": {"
            + " \"field\": \"user_id\""
            + " } } },"
            + " \"aggregations\": {"
            + " \"avg_rating\": {"
            + " \"avg\": {"
            + " \"field\": \"stars\""
            + " } } } },"
            + "\"frequency\":\"1s\""
            + "}";
    }

    /**
     * Fetches the document with the given id from the given index and asserts whether it exists.
     */
    private void assertConfigDocExists(RestHighLevelClient client, String index, String docId, boolean expected)
        throws IOException {
        final GetResponse lookup = client.get(new GetRequest(index, docId), RequestOptions.DEFAULT);
        assertThat(lookup.isExists(), is(expected));
    }

    /**
     * Creates an empty index with default settings to act as the transform source.
     */
    private void createSourceIndex(String index) throws IOException {
        final TestRestHighLevelClient hlrc = new TestRestHighLevelClient();
        final CreateIndexRequest createSource = new CreateIndexRequest(index);
        hlrc.indices().create(createSource, RequestOptions.DEFAULT);
    }

    /**
     * High level client wrapping the test's low level client; the close handler is a no-op.
     */
    private class TestRestHighLevelClient extends RestHighLevelClient {
        TestRestHighLevelClient() {
            super(client(), restClient -> {}, new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testUsage() throws Exception {
stopDataFrameTransform("test_usage", false);

Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
DataFrameInternalIndex.LATEST_INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameTransformStoredDoc.NAME);
// Verify that we have one stat document
Expand Down Expand Up @@ -96,7 +96,7 @@ public void testUsage() throws Exception {
XContentMapValues.extractValue("data_frame.stats." + statName, statsMap));
}
// Refresh the index so that statistics are searchable
refreshIndex(DataFrameInternalIndex.INDEX_TEMPLATE_NAME);
refreshIndex(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME);
}, 60, TimeUnit.SECONDS);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
try {
templates.put(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
templates.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
} catch (IOException e) {
logger.error("Error creating data frame index template", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameTransformStoredDoc.NAME)));

SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setSize(0)
.setQuery(queryBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
}
);

SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME))))
Expand Down
Loading

0 comments on commit f989beb

Please sign in to comment.