Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] fixing bug where analytics process starts with 0 rows #45879

Merged
1 change: 1 addition & 0 deletions x-pack/plugin/ml/qa/ml-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ integTest.runner {
'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
'ml/start_data_frame_analytics/Test start given dest index is not empty',
'ml/start_data_frame_analytics/Test start with compatible fields but no data',
'ml/start_stop_datafeed/Test start datafeed job, but not open',
'ml/start_stop_datafeed/Test start non existing datafeed',
'ml/start_stop_datafeed/Test stop non existing datafeed',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -226,10 +227,41 @@ public void onFailure(Exception e) {
}

private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {

// Step 5. Validate that there are analyzable data in the source index
ActionListener<DataFrameAnalyticsConfig> validateMappingsMergeListener = ActionListener.wrap(
config -> DataFrameDataExtractorFactory.createForSourceIndices(client,
"validate_source_index_has_rows-" + id,
config,
ActionListener.wrap(
dataFrameDataExtractorFactory ->
dataFrameDataExtractorFactory
.newExtractor(false)
.collectDataSummaryAsync(ActionListener.wrap(
dataSummary -> {
if (dataSummary.rows == 0) {
finalListener.onFailure(new ElasticsearchStatusException(
"Unable to start {} as there are no analyzable data in source indices [{}].",
RestStatus.BAD_REQUEST,
id,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())
));
} else {
finalListener.onResponse(config);
}
},
finalListener::onFailure
)),
finalListener::onFailure
))
,
finalListener::onFailure
);

// Step 4. Validate mappings can be merged
ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
mappings -> finalListener.onResponse(config), finalListener::onFailure)),
mappings -> validateMappingsMergeListener.onResponse(config), finalListener::onFailure)),
finalListener::onFailure
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchAction;
Expand Down Expand Up @@ -234,14 +235,33 @@ public List<String> getFieldNames() {
}

public DataSummary collectDataSummary() {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
}

/**
 * Asynchronously computes a summary of the data to be extracted: the total number of
 * matching rows (via a size-0 search with track_total_hits) and the number of
 * extracted fields. The search is executed with the stored request headers under the
 * ML origin; the result (or any failure) is delivered to the given listener.
 *
 * @param dataSummaryActionListener listener notified with the {@link DataSummary},
 *                                  or with the search failure
 */
public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {
    // Field count is fixed up front; only the row count comes from the search response.
    final int fieldCount = context.extractedFields.getAllFields().size();
    SearchRequestBuilder summarySearch = buildDataSummarySearchRequestBuilder();

    // Adapt the search response into a DataSummary before handing it to the caller.
    ActionListener<SearchResponse> searchListener = ActionListener.wrap(
        response -> dataSummaryActionListener.onResponse(
            new DataSummary(response.getHits().getTotalHits().value, fieldCount)),
        dataSummaryActionListener::onFailure);

    ClientHelper.executeWithHeadersAsync(
        context.headers,
        ClientHelper.ML_ORIGIN,
        client,
        SearchAction.INSTANCE,
        summarySearch.request(),
        searchListener);
}

/**
 * Builds the search used for the data summary: a size-0 query over the source
 * indices with track_total_hits enabled, so the response carries only the exact
 * total hit count and no documents.
 *
 * @return a configured {@link SearchRequestBuilder} for the summary search
 */
private SearchRequestBuilder buildDataSummarySearchRequestBuilder() {
    // NOTE(review): the pasted diff left two stale deleted lines (the old synchronous
    // executeSearchRequest/return statements) inside this body; they have been removed
    // so the helper compiles — it should only build and return the request builder.
    return new SearchRequestBuilder(client, SearchAction.INSTANCE)
        .setIndices(context.indices)
        .setSize(0)
        .setQuery(context.query)
        .setTrackTotalHits(true);
}

public Set<String> getCategoricalFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand Down Expand Up @@ -273,7 +274,15 @@ private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtr
}

dataExtractor = dataExtractorFactory.newExtractor(false);
process = createProcess(task, createProcessConfig(config, dataExtractor));
AnalyticsProcessConfig analyticsProcessConfig = createProcessConfig(config, dataExtractor);
LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
// If we have no rows, that means there is no data so no point in starting the native process
// just finish the task
if (analyticsProcessConfig.rows() == 0) {
LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
return false;
}
process = createProcess(task, analyticsProcessConfig);
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
dataExtractorFactory.newExtractor(true));
resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,34 @@
id: "foo"

---
# Verifies that starting a data frame analytics job fails with a clear error when the
# source index has compatible (numeric) fields but contains no documents.
# NOTE(review): the pasted diff stripped YAML indentation; restored here to the
# conventional Elasticsearch REST-test layout — confirm against the original file.
"Test start with compatible fields but no data":
  # Create an empty source index with an analyzable (long) field but no documents.
  - do:
      indices.create:
        index: empty-index-with-compatible-fields
        body:
          mappings:
            properties:
              long_field: { "type": "long" }

  # Configuring the job succeeds: field compatibility alone is not enough to fail PUT.
  - do:
      ml.put_data_frame_analytics:
        id: "empty-with-compatible-fields"
        body: >
          {
            "source": {
              "index": "empty-index-with-compatible-fields"
            },
            "dest": {
              "index": "empty-index-with-compatible-fields-dest"
            },
            "analysis": {"outlier_detection":{}}
          }

  # Starting must fail because the source index holds zero analyzable rows.
  - do:
      catch: /Unable to start empty-with-compatible-fields as there are no analyzable data in source indices \[empty-index-with-compatible-fields\]/
      ml.start_data_frame_analytics:
        id: "empty-with-compatible-fields"
---
"Test start with inconsistent body/param ids":

- do:
Expand Down