Skip to content

Commit

Permalink
[ML][Data frame] make sure that fields exist when creating progress (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent authored Jun 7, 2019
1 parent 68d0829 commit a5055f6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -44,7 +43,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameTransformProgressIT extends ESRestTestCase {

protected void createReviewsIndex() throws Exception {
Expand Down Expand Up @@ -164,6 +162,23 @@ public void testGetProgress() throws Exception {
assertThat(progress.getRemainingDocs(), equalTo(35L));
assertThat(progress.getPercentComplete(), equalTo(0.0));

histgramGroupConfig = new GroupConfig(Collections.emptyMap(),
Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", 50.0)));
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,
destConfig,
null,
pivotConfig,
null);

response = restClient.search(TransformProgressGatherer.getSearchRequest(config), RequestOptions.DEFAULT);
progress = TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response);

assertThat(progress.getTotalDocs(), equalTo(0L));
assertThat(progress.getRemainingDocs(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(100.0));

deleteIndex(REVIEWS_INDEX_NAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
.setInitialPosition(stateAndStats.getTransformState().getPosition())
.setProgress(stateAndStats.getTransformState().getProgress())
.setIndexerState(currentIndexerState(stateAndStats.getTransformState()));
logger.info("[{}] Loading existing state: [{}], position [{}]",
logger.debug("[{}] Loading existing state: [{}], position [{}]",
transformId,
stateAndStats.getTransformState(),
stateAndStats.getTransformState().getPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
Expand Down Expand Up @@ -49,10 +51,20 @@ public static void getInitialProgress(Client client,
/**
 * Builds the search request whose total hit count is used to compute a transform's progress.
 *
 * The request targets the source index of the given config, disallows partial search
 * results, returns no hits (size 0) and tracks the total hit count.
 *
 * @param config the transform configuration supplying the source index, the source query
 *               and the pivot group sources
 * @return the configured search request
 */
public static SearchRequest getSearchRequest(DataFrameTransformConfig config) {
SearchRequest request = new SearchRequest(config.getSource().getIndex());
request.allowPartialSearchResults(false);
// Collect one required (must) exists clause per group source field, so that only
// documents containing every group-by field match.
BoolQueryBuilder existsClauses = QueryBuilders.boolQuery();
config.getPivotConfig()
.getGroupConfig()
.getGroups()
.values()
// TODO change once we allow missing_buckets
.forEach(src -> existsClauses.must(QueryBuilders.existsQuery(src.getField())));

request.source(new SearchSourceBuilder()
.size(0)
.trackTotalHits(true)
.query(config.getSource().getQueryConfig().getQuery()));
// NOTE(review): this is a rendered diff without +/- markers; the .query(...) line above
// appears to be the removed pre-change line and the .query(...) below its replacement.
// The replacement applies the configured source query and the exists clauses as filters.
.query(QueryBuilders.boolQuery()
.filter(config.getSource().getQueryConfig().getQuery())
.filter(existsClauses)));
return request;
}

Expand Down

0 comments on commit a5055f6

Please sign in to comment.