[ML-DataFrame] version data frame transform internal index #45375

Merged: 14 commits, Aug 22, 2019
@@ -80,6 +80,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen
sourceBuilder.from(request.getPageParams().getFrom())
.size(request.getPageParams().getSize());
}
sourceBuilder.trackTotalHits(true);

IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
SearchRequest searchRequest = new SearchRequest(getIndices())
@@ -88,7 +89,7 @@ protected void searchResources(AbstractGetResourcesRequest request, ActionListen
indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(),
indicesOptions))
.source(sourceBuilder.trackTotalHits(true));
.source(customSearchOptions(sourceBuilder));

executeAsyncWithOrigin(client.threadPool().getThreadContext(),
executionOrigin(),
@@ -105,8 +106,11 @@ public void onResponse(SearchResponse response) {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
Resource resource = parse(parser);
docs.add(resource);
foundResourceIds.add(extractIdFromResource(resource));
// Do not include a resource with the same ID twice
if (foundResourceIds.contains(extractIdFromResource(resource)) == false) {
docs.add(resource);
foundResourceIds.add(extractIdFromResource(resource));
}
} catch (IOException e) {
this.onFailure(e);
}
@@ -159,6 +163,10 @@ private QueryBuilder buildQuery(String[] tokens, String resourceIdField) {
return boolQuery.hasClauses() ? boolQuery : QueryBuilders.matchAllQuery();
}

protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) {
return searchSourceBuilder;
}

@Nullable
protected QueryBuilder additionalQuery() {
return null;
@@ -26,6 +26,8 @@ public final class DataFrameField {
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
@@ -65,7 +67,6 @@ public final class DataFrameField {
// strings for meta information
public static final String META_FIELDNAME = "_data_frame";
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
public static final String VERSION = "version";
public static final String CREATED = "created";
public static final String CREATED_BY = "created_by";
public static final String TRANSFORM = "transform";
@@ -24,7 +24,6 @@
public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> implements PersistentTaskParams {

public static final String NAME = DataFrameField.TASK_NAME;
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;

private final String transformId;
@@ -36,7 +35,7 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
}

@@ -90,7 +89,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(VERSION.getPreferredName(), version);
builder.field(DataFrameField.VERSION.getPreferredName(), version);
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
@@ -47,8 +47,6 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");

public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
static final int MAX_DESCRIPTION_LENGTH = 1_000;
@@ -98,8 +96,8 @@ private static ConstructingObjectParser<DataFrameTransformConfig, String> create
// on strict parsing do not allow injection of headers, transform version, or create time
if (lenient == false) {
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], VERSION.getPreferredName());
validateStrictParsingParams(args[9], DataFrameField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], DataFrameField.VERSION.getPreferredName());
}

@SuppressWarnings("unchecked")
@@ -132,8 +130,9 @@ private static ConstructingObjectParser<DataFrameTransformConfig, String> create
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
parser.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION);
parser.declareField(optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
parser.declareString(optionalConstructorArg(), VERSION);
p -> TimeUtils.parseTimeFieldToInstant(p, DataFrameField.CREATE_TIME.getPreferredName()), DataFrameField.CREATE_TIME,
ObjectParser.ValueType.VALUE);
parser.declareString(optionalConstructorArg(), DataFrameField.VERSION);
return parser;
}

@@ -256,7 +255,7 @@ public Instant getCreateTime() {
}

public DataFrameTransformConfig setCreateTime(Instant createTime) {
ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
ExceptionsHelper.requireNonNull(createTime, DataFrameField.CREATE_TIME.getPreferredName());
this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
return this;
}
@@ -332,10 +331,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description);
}
if (transformVersion != null) {
builder.field(VERSION.getPreferredName(), transformVersion);
builder.field(DataFrameField.VERSION.getPreferredName(), transformVersion);
}
if (createTime != null) {
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
builder.timeField(DataFrameField.CREATE_TIME.getPreferredName(), DataFrameField.CREATE_TIME.getPreferredName() + "_string",
createTime.toEpochMilli());
}
builder.endObject();
return builder;
@@ -42,13 +42,13 @@ public void testDeleteConfigurationLeftOver() throws IOException {
builder.endObject();
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT",
DataFrameInternalIndex.INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
DataFrameInternalIndex.LATEST_INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
req.setEntity(entity);
client().performRequest(req);
}

// refresh the index
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh")));
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_refresh")));

Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName);
Response deleteResponse = client().performRequest(deleteRequest);
@@ -385,7 +385,7 @@ public void wipeDataFrameTransforms() throws IOException {
assertTrue(transformConfigs.isEmpty());

// the configuration index should be empty
Request request = new Request("GET", DataFrameInternalIndex.INDEX_NAME + "/_search");
Request request = new Request("GET", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_search");
try {
Response searchResponse = adminClient().performRequest(request);
Map<String, Object> searchResult = entityAsMap(searchResponse);
@@ -54,7 +54,7 @@ public void testUsage() throws Exception {
stopDataFrameTransform("test_usage", false);

Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
DataFrameInternalIndex.LATEST_INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameTransformStoredDoc.NAME);
// Verify that we have one stat document
@@ -96,7 +96,7 @@ public void testUsage() throws Exception {
XContentMapValues.extractValue("data_frame.stats." + statName, statsMap));
}
// Refresh the index so that statistics are searchable
refreshIndex(DataFrameInternalIndex.INDEX_TEMPLATE_NAME);
refreshIndex(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME);
}, 60, TimeUnit.SECONDS);


@@ -186,7 +186,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
try {
templates.put(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
templates.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
} catch (IOException e) {
logger.error("Error creating data frame index template", e);
}
@@ -110,7 +110,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameTransformStoredDoc.NAME)));

SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.LATEST_INDEX_NAME)
Review comment (Member):

    this will not provide accurate data. Stats documents for various transforms could exist between various index versions. Since we use aggregations and a transform could have multiple stored docs between index versions, I am not sure we can even provide accurate stats.

    @droberts195 What do you think?

Reply from benwtrent (Member), Aug 9, 2019:

    This and DataFrameUsageTransportAction.java don't seem like critical pieces of data, and SOME information drift is OK. How we tackle this seems predicated on how we deal with old indices and how quickly they are cleaned up.

.setSize(0)
.setQuery(queryBuilder);

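To make the drift the reviewers describe concrete, here is a minimal standalone sketch, with hypothetical index names, transform ids, and counter values that are not taken from this PR: if a transform's stats document exists in both an old and a new versioned internal index, a plain sum over everything counts that transform twice, while keeping only the newest copy per transform gives the accurate total.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StatsDriftSketch {
    // A stored stats document: which internal index it lives in, which transform
    // it belongs to, and one of its counters (all values hypothetical).
    record StoredStatsDoc(String index, String transformId, long documentsProcessed) {}

    public static void main(String[] args) {
        // Transform "t1" has a stale copy in the old index and a current copy in
        // the new one; "t2" exists only in the new index.
        List<StoredStatsDoc> hits = List.of(
            new StoredStatsDoc(".data-frame-internal-1", "t1", 100),
            new StoredStatsDoc(".data-frame-internal-2", "t1", 150),
            new StoredStatsDoc(".data-frame-internal-2", "t2", 40));

        // Summing blindly across all versioned indices counts "t1" twice: 290.
        long naiveSum = hits.stream().mapToLong(StoredStatsDoc::documentsProcessed).sum();

        // Keeping only the newest copy per transform id gives the accurate total: 190.
        Map<String, Long> newestPerTransform = new LinkedHashMap<>();
        for (StoredStatsDoc doc : hits) {
            // Later entries in this list come from the newer index, so they overwrite
            // the stale value; a real implementation would compare index names instead.
            newestPerTransform.put(doc.transformId(), doc.documentsProcessed());
        }
        long dedupedSum = newestPerTransform.values().stream().mapToLong(Long::longValue).sum();

        System.out.println("naive sum = " + naiveSum + ", deduplicated sum = " + dedupedSum);
    }
}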
@@ -114,7 +114,7 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
}
);

SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.LATEST_INDEX_NAME)
Review comment (Member):

    What if a transform exists in the old index and has never been updated (thus NOT in the latest index)?

.setTrackTotalHits(true)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME))))
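One possible way to address that question, sketched only and not what this PR implements, would be to count over the whole internal index pattern and deduplicate by transform id with a cardinality aggregation, so a config that still lives only in the old index is counted once and a config present in both indices is not counted twice. This assumes the id field is mapped so that a cardinality aggregation can run on it.

// Sketch only, not part of this change; requires
// org.elasticsearch.search.aggregations.AggregationBuilders.
SearchRequest distinctTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
    .setSize(0)
    .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
        .filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
            DataFrameTransformConfig.NAME))))
    .addAggregation(AggregationBuilders.cardinality("distinct_transform_ids")
        .field(DataFrameField.ID.getPreferredName()))
    .request();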
@@ -16,6 +16,8 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
@@ -56,7 +58,7 @@ protected ParseField getResultsField() {

@Override
protected String[] getIndices() {
return new String[]{DataFrameInternalIndex.INDEX_NAME};
return new String[]{DataFrameInternalIndex.INDEX_NAME_PATTERN};
}

@Override
@@ -84,4 +86,10 @@ protected String extractIdFromResource(DataFrameTransformConfig transformConfig)
protected QueryBuilder additionalQuery() {
return QueryBuilders.termQuery(INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME);
}

@Override
protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) {
return searchSourceBuilder.sort("_index", SortOrder.DESC);
}

}
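Read together with the base-class changes above, this override appears to be what keeps the versioned indices transparent to GET: hits are sorted on _index in descending order, and the base class skips any transform id it has already seen, so when a config exists in both an old and the new internal index the copy from the newest index wins. A minimal standalone sketch of that selection behaviour, using hypothetical index names and transform ids:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NewestCopyWinsSketch {
    record Hit(String index, String transformId) {}

    public static void main(String[] args) {
        // Hits in the order a sort on "_index" descending would return them:
        // documents from the newer index come before those from the older one.
        List<Hit> sortedHits = List.of(
            new Hit(".data-frame-internal-2", "transform-a"),
            new Hit(".data-frame-internal-2", "transform-b"),
            new Hit(".data-frame-internal-1", "transform-a"),   // stale duplicate, skipped
            new Hit(".data-frame-internal-1", "transform-c"));

        Set<String> foundIds = new HashSet<>();
        List<Hit> results = new ArrayList<>();
        for (Hit hit : sortedHits) {
            // Mirrors the "contains(...) == false" guard in searchResources: the first
            // occurrence wins, and because of the descending index sort that first
            // occurrence is the copy from the newest index.
            if (foundIds.contains(hit.transformId()) == false) {
                foundIds.add(hit.transformId());
                results.add(hit);
            }
        }
        System.out.println(results);
    }
}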
@@ -183,7 +183,7 @@ protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransfo
waitResponse ->
client.admin()
.indices()
.prepareRefresh(DataFrameInternalIndex.INDEX_NAME)
.prepareRefresh(DataFrameInternalIndex.LATEST_INDEX_NAME)
.execute(ActionListener.wrap(
r -> listener.onResponse(waitResponse),
e -> {
@@ -136,7 +136,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
private void handlePrivsResponse(String username,
Request request,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermPair,
ClusterState clusterState,
HasPrivilegesResponse privilegesResponse,
ActionListener<Response> listener) {
Expand All @@ -159,7 +159,7 @@ private void handlePrivsResponse(String username,
private void validateAndUpdateDataFrame(Request request,
ClusterState clusterState,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermPair,
ActionListener<Response> listener) {
try {
SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
Expand All @@ -184,7 +184,7 @@ private void validateAndUpdateDataFrame(Request request,
}
private void updateDataFrame(Request request,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermPair,
ClusterState clusterState,
ActionListener<Response> listener) {
