Skip to content

Commit

Permalink
[ML] Return 408 when open/start APIs time out
Browse files Browse the repository at this point in the history
This changes the response status code from `500` to `408` when
the following ML APIs time out:

- open anomaly detection job
- start datafeed
- start data frame analytics

Closes elastic#89585
  • Loading branch information
dimitris-athanasiou committed Sep 1, 2022
1 parent b5504ea commit 3c1d74d
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
Expand Down Expand Up @@ -792,4 +794,38 @@ private void startRealtime(String jobId, Integer maxEmptySearches) throws Except
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
}, 30, TimeUnit.SECONDS);
}

public void testStartDatafeed_GivenTimeout_Returns408() throws Exception {
client().admin().indices().prepareCreate("data-1").setMapping("time", "type=date").get();
long numDocs = 100;
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
indexDocs(logger, "data-1", numDocs, oneWeekAgo, now);

String jobId = "job-for-start-datafeed-timeout";
String datafeedId = jobId + "-datafeed";

Job.Builder job = createScheduledJob(jobId);
putJob(job);
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));

DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(
job.getId() + "-datafeed",
job.getId(),
Collections.singletonList("data-1")
);
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
putDatafeed(datafeedConfig);

StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, oneWeekAgo);
request.getParams().setTimeout(TimeValue.timeValueNanos(1L));

ElasticsearchException e = expectThrows(
ElasticsearchException.class,
() -> client().execute(StartDatafeedAction.INSTANCE, request).actionGet()
);

assertThat(e.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand All @@ -31,6 +33,7 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -150,6 +153,15 @@ public void testUsage() throws IOException {
assertEquals(1, XContentMapValues.extractValue("ml.jobs.opened.count", usage));
}

public void testOpenJob_GivenTimeout_Returns408() throws IOException {
String jobId = "test-timeout-returns-408";
createFarequoteJob(jobId);

ResponseException e = expectThrows(ResponseException.class, () -> openJob(jobId, Optional.of(TimeValue.timeValueNanos(1L))));

assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.REQUEST_TIMEOUT.getStatus()));
}

private Response createFarequoteJob(String jobId) throws IOException {
return putJob(jobId, """
{
Expand Down Expand Up @@ -960,10 +972,19 @@ private String getAliases() throws IOException {
}

private void openJob(String jobId) throws IOException {
Response response = openJob(jobId, Optional.empty());
assertThat(entityAsMap(response), hasEntry("opened", true));
}

private Response openJob(String jobId, Optional<TimeValue> timeout) throws IOException {
StringBuilder path = new StringBuilder(MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
if (timeout.isPresent()) {
path.append("?timeout=" + timeout.get().getStringRep());
}
Response openResponse = client().performRequest(
new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")
new Request("POST", path.toString())
);
assertThat(entityAsMap(openResponse), hasEntry("opened", true));
return openResponse;
}

private void closeJob(String jobId) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand All @@ -15,12 +16,15 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
Expand Down Expand Up @@ -1043,6 +1047,45 @@ public void testOutlierDetection_GivenSearchRuntimeMappings() throws Exception {
);
}

public void testStart_GivenTimeout_Returns408() throws Exception {
String sourceIndex = "test-timeout-returns-408-data";

client().admin().indices().prepareCreate(sourceIndex).setMapping("numeric_1", "type=integer", "numeric_2", "type=integer").get();

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (int i = 0; i < 5; i++) {
IndexRequest indexRequest = new IndexRequest(sourceIndex);
indexRequest.id(String.valueOf(i));
indexRequest.source("numeric_1", randomInt(), "numeric_2", randomInt());
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}

String id = "test-timeout-returns-408";
DataFrameAnalyticsConfig config = buildAnalytics(
id,
sourceIndex,
sourceIndex + "-results",
null,
new OutlierDetection.Builder().build()
);
putAnalytics(config);

StartDataFrameAnalyticsAction.Request request = new StartDataFrameAnalyticsAction.Request(id);
request.setTimeout(TimeValue.timeValueNanos(1L));
ElasticsearchException e = expectThrows(
ElasticsearchException.class,
() -> client().execute(StartDataFrameAnalyticsAction.INSTANCE, request).actionGet()
);

assertThat(e.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
}

@Override
boolean supportsInference() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,14 @@ public void onFailure(Exception e) {

@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new ElasticsearchException("Opening job [{}] timed out after [{}]", jobParams.getJob(), timeout));
listener.onFailure(
new ElasticsearchStatusException(
"Opening job [{}] timed out after [{}]",
RestStatus.REQUEST_TIMEOUT,
jobParams.getJob().getId(),
timeout
)
);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,9 @@ public void onTimeout(TimeValue timeout) {
);
} else {
listener.onFailure(
new ElasticsearchException(
new ElasticsearchStatusException(
"Starting data frame analytics [{}] timed out after [{}]",
RestStatus.REQUEST_TIMEOUT,
task.getParams().getId(),
timeout
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -407,7 +406,12 @@ public void onFailure(Exception e) {
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(
new ElasticsearchException("Starting datafeed [" + params.getDatafeedId() + "] timed out after [" + timeout + "]")
new ElasticsearchStatusException(
"Starting datafeed [{}] timed out after [{}]",
RestStatus.REQUEST_TIMEOUT,
params.getDatafeedId(),
timeout
)
);
}
}
Expand Down

0 comments on commit 3c1d74d

Please sign in to comment.