diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index d372b063252b0..2e1285d4989ed 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -1321,11 +1321,12 @@ For open jobs only, the elapsed time for which the job has been open. end::open-time[] tag::out-of-order-timestamp-count[] -The number of input documents that are out of time sequence and outside -of the latency window. This information is applicable only when you provide data -to the {anomaly-job} by using the <<ml-post-data,post data API>>. These out of -order documents are discarded, since jobs require time series data to be in -ascending chronological order. +The number of input documents that have a timestamp chronologically +preceding the start of the current anomaly detection bucket offset by +the latency window. This information is applicable only when you provide +data to the {anomaly-job} by using the <<ml-post-data,post data API>>. +These out of order documents are discarded, since jobs require time +series data to be in ascending chronological order. 
end::out-of-order-timestamp-count[] tag::outlier-fraction[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/DataCounts.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/DataCounts.java index fb6d04db07213..446bafa0b4480 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/DataCounts.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/DataCounts.java @@ -407,14 +407,6 @@ public void setLatestRecordTimeStamp(Date latestRecordTimeStamp) { this.latestRecordTimeStamp = latestRecordTimeStamp; } - public void updateLatestRecordTimeStamp(Date latestRecordTimeStamp) { - if (latestRecordTimeStamp != null && - (this.latestRecordTimeStamp == null || - latestRecordTimeStamp.after(this.latestRecordTimeStamp))) { - this.latestRecordTimeStamp = latestRecordTimeStamp; - } - } - /** * The wall clock time the latest record was seen. 
* diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 86d5093227503..440e3136f96a7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -46,6 +46,10 @@ public class MlJobIT extends ESRestTestCase { private static final String BASIC_AUTH_VALUE = UsernamePasswordToken.basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); + private static final RequestOptions POST_DATA = RequestOptions.DEFAULT.toBuilder() + .setWarningsHandler(warnings -> Collections.singletonList( + "Posting data directly to anomaly detection jobs is deprecated, " + + "in a future major version it will be compulsory to use a datafeed").equals(warnings) == false).build(); @Override protected Settings restClientSettings() { @@ -127,8 +131,7 @@ public void testUsage() throws IOException { Map usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage"))); assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); - Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open")); - assertThat(entityAsMap(openResponse), hasEntry("opened", true)); + openJob("job-1"); usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage"))); assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); @@ -136,20 +139,17 @@ public void testUsage() throws IOException { } private 
Response createFarequoteJob(String jobId) throws IOException { - Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); - request.setJsonEntity( - "{\n" - + " \"description\":\"Analysis of response time by airline\",\n" - + " \"analysis_config\" : {\n" - + " \"bucket_span\": \"3600s\",\n" - + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" - + " },\n" + " \"data_description\" : {\n" - + " \"field_delimiter\":\",\",\n" - + " \"time_field\":\"time\",\n" - + " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" - + " }\n" - + "}"); - return client().performRequest(request); + return putJob(jobId, "{\n" + + " \"description\":\"Analysis of response time by airline\",\n" + + " \"analysis_config\" : {\n" + + " \"bucket_span\": \"3600s\",\n" + + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" + + " },\n" + " \"data_description\" : {\n" + + " \"field_delimiter\":\",\",\n" + + " \"time_field\":\"time\",\n" + + " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" + + " }\n" + + "}"); } public void testCantCreateJobWithSameID() throws Exception { @@ -161,13 +161,10 @@ public void testCantCreateJobWithSameID() throws Exception { " \"results_index_name\" : \"%s\"}"; String jobId = "cant-create-job-with-same-id-job"; - Request createJob1 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); - createJob1.setJsonEntity(String.format(Locale.ROOT, jobTemplate, "index-1")); - client().performRequest(createJob1); - - Request createJob2 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); - createJob2.setJsonEntity(String.format(Locale.ROOT, jobTemplate, "index-2")); - ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createJob2)); + putJob(jobId, String.format(Locale.ROOT, jobTemplate, "index-1")); + ResponseException e = 
expectThrows(ResponseException.class, + () -> putJob(jobId, String.format(Locale.ROOT, jobTemplate, "index-2")) + ); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); assertThat(e.getMessage(), containsString("The job cannot be created with the Id '" + jobId + "'. The Id is already used.")); @@ -183,14 +180,10 @@ public void testCreateJobsWithIndexNameOption() throws Exception { String jobId1 = "create-jobs-with-index-name-option-job-1"; String indexName = "non-default-index"; - Request createJob1 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1); - createJob1.setJsonEntity(String.format(Locale.ROOT, jobTemplate, indexName)); - client().performRequest(createJob1); + putJob(jobId1, String.format(Locale.ROOT, jobTemplate, indexName)); String jobId2 = "create-jobs-with-index-name-option-job-2"; - Request createJob2 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2); - createJob2.setEntity(createJob1.getEntity()); - client().performRequest(createJob2); + putJob(jobId2, String.format(Locale.ROOT, jobTemplate, indexName)); // With security enabled GET _aliases throws an index_not_found_exception // if no aliases have been created. 
In multi-node tests the alias may not @@ -313,9 +306,7 @@ public void testCreateJobInSharedIndexUpdatesMapping() throws Exception { String jobId2 = "create-job-in-shared-index-updates-mapping-job-2"; String byFieldName2 = "cpu-usage"; - Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1); - createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1)); - client().performRequest(createJob1Request); + putJob(jobId1, String.format(Locale.ROOT, jobTemplate, byFieldName1)); // Check the index mapping contains the first by_field_name Request getResultsMappingRequest = new Request("GET", @@ -325,10 +316,7 @@ public void testCreateJobInSharedIndexUpdatesMapping() throws Exception { assertThat(resultsMappingAfterJob1, containsString(byFieldName1)); assertThat(resultsMappingAfterJob1, not(containsString(byFieldName2))); - Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2); - createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2)); - client().performRequest(createJob2Request); - + putJob(jobId2, String.format(Locale.ROOT, jobTemplate, byFieldName2)); // Check the index mapping now contains both fields String resultsMappingAfterJob2 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity()); assertThat(resultsMappingAfterJob2, containsString(byFieldName1)); @@ -348,9 +336,7 @@ public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception { String jobId2 = "create-job-in-custom-shared-index-updates-mapping-job-2"; String byFieldName2 = "cpu-usage"; - Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1); - createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1)); - client().performRequest(createJob1Request); + putJob(jobId1, String.format(Locale.ROOT, jobTemplate, byFieldName1)); // Check the 
index mapping contains the first by_field_name Request getResultsMappingRequest = new Request("GET", @@ -360,9 +346,7 @@ public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception { assertThat(resultsMappingAfterJob1, containsString(byFieldName1)); assertThat(resultsMappingAfterJob1, not(containsString(byFieldName2))); - Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2); - createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2)); - client().performRequest(createJob2Request); + putJob(jobId2, String.format(Locale.ROOT, jobTemplate, byFieldName2)); // Check the index mapping now contains both fields String resultsMappingAfterJob2 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity()); @@ -391,13 +375,10 @@ public void testCreateJob_WithClashingFieldMappingsFails() throws Exception { byFieldName2 = "response"; } - Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1); - createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1)); - client().performRequest(createJob1Request); + putJob(jobId1, String.format(Locale.ROOT, jobTemplate, byFieldName1)); - Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2); - createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2)); - ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createJob2Request)); + ResponseException e = expectThrows(ResponseException.class, + () -> putJob(jobId2, String.format(Locale.ROOT, jobTemplate, byFieldName2))); assertThat(e.getMessage(), containsString("This job would cause a mapping clash with existing field [response] - " + "avoid the clash by assigning a dedicated results index")); @@ -419,8 +400,8 @@ public void testOpenJobFailsWhenPersistentTaskAssignmentDisabled() throws 
Except try { ResponseException exception = expectThrows( ResponseException.class, - () -> client().performRequest( - new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"))); + () -> openJob(jobId) + ); assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(429)); assertThat(EntityUtils.toString(exception.getResponse().getEntity()), containsString("Cannot open jobs because persistent task assignment is disabled by the " + @@ -463,6 +444,41 @@ public void testDeleteJob() throws Exception { client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); } + public void testOutOfOrderData() throws Exception { + String jobId = "job-with-out-of-order-docs"; + createFarequoteJob(jobId); + + openJob(jobId); + + Request postDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); + // Post data is deprecated, so expect a deprecation warning + postDataRequest.setOptions(POST_DATA); + // Bucket span is 1h (3600s). 
So, posting data within the same hour should not result in out of order data + postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"responsetime\":100, \"time\":\"2019-07-01 00:00:00Z\" }"); + client().performRequest(postDataRequest); + postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"responsetime\":100, \"time\":\"2019-07-01 00:30:00Z\" }"); + client().performRequest(postDataRequest); + // out of order, but in the same time bucket + postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"responsetime\":100, \"time\":\"2019-07-01 00:10:00Z\" }"); + client().performRequest(postDataRequest); + + Response flushResponse = + client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_flush")); + assertThat(entityAsMap(flushResponse), hasEntry("flushed", true)); + + closeJob(jobId); + + String stats = EntityUtils.toString( + client().performRequest(new Request("GET", "_ml/anomaly_detectors/" + jobId + "/_stats")).getEntity() + ); + //assert 2019-07-01 00:30:00Z + assertThat(stats, containsString("\"latest_record_timestamp\":1561941000000")); + assertThat(stats, containsString("\"out_of_order_timestamp_count\":0")); + assertThat(stats, containsString("\"processed_record_count\":3")); + + client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId)); + } + public void testDeleteJob_TimingStatsDocumentIsDeleted() throws Exception { String jobId = "delete-job-with-timing-stats-document-job"; String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT; @@ -472,15 +488,11 @@ public void testDeleteJob_TimingStatsDocumentIsDeleted() throws Exception { EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity()), containsString("\"count\":0")); // documents related to the job do not exist yet - Response openResponse = - client().performRequest(new Request("POST", 
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); - assertThat(entityAsMap(openResponse), hasEntry("opened", true)); + openJob(jobId); Request postDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); // Post data is deprecated, so expect a deprecation warning - postDataRequest.setOptions(RequestOptions.DEFAULT.toBuilder() - .setWarningsHandler(warnings -> Collections.singletonList("Posting data directly to anomaly detection jobs is deprecated, " + - "in a future major version it will be compulsory to use a datafeed").equals(warnings) == false)); + postDataRequest.setOptions(POST_DATA); postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"response_time\":100, \"time\":\"2019-07-01 00:00:00Z\" }"); client().performRequest(postDataRequest); postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"response_time\":100, \"time\":\"2019-07-01 02:00:00Z\" }"); @@ -490,9 +502,7 @@ public void testDeleteJob_TimingStatsDocumentIsDeleted() throws Exception { client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_flush")); assertThat(entityAsMap(flushResponse), hasEntry("flushed", true)); - Response closeResponse = - client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close")); - assertThat(entityAsMap(closeResponse), hasEntry("closed", true)); + closeJob(jobId); String timingStatsDoc = EntityUtils.toString( @@ -840,6 +850,28 @@ private String getAliases() throws IOException { return EntityUtils.toString(response.getEntity()); } + private void openJob(String jobId) throws IOException { + Response openResponse = client().performRequest(new Request( + "POST", + MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open" + )); + assertThat(entityAsMap(openResponse), hasEntry("opened", true)); + } + + private void closeJob(String jobId) throws IOException { + Response openResponse = 
client().performRequest(new Request( + "POST", + MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close" + )); + assertThat(entityAsMap(openResponse), hasEntry("closed", true)); + } + + private Response putJob(String jobId, String jsonBody) throws IOException { + Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + request.setJsonEntity(jsonBody); + return client().performRequest(request); + } + @After public void clearMlState() throws Exception { new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 93caac71e3897..4c1df15799b49 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -74,20 +74,23 @@ public DataCountsReporter(Job job, DataCounts counts, JobDataCountsPersister dat * but the actual number of fields in the record * @param recordTimeMs The time of the record written * in milliseconds from the epoch. + * @param latestRecordTimeMs The time of the latest (in time) record written. 
+ * May be greater than or equal to `recordTimeMs` */ - public void reportRecordWritten(long inputFieldCount, long recordTimeMs) { - Date recordDate = new Date(recordTimeMs); + public void reportRecordWritten(long inputFieldCount, long recordTimeMs, long latestRecordTimeMs) { + final Date latestRecordDate = new Date(latestRecordTimeMs); totalRecordStats.incrementInputFieldCount(inputFieldCount); totalRecordStats.incrementProcessedRecordCount(1); - totalRecordStats.setLatestRecordTimeStamp(recordDate); + totalRecordStats.setLatestRecordTimeStamp(latestRecordDate); incrementalRecordStats.incrementInputFieldCount(inputFieldCount); incrementalRecordStats.incrementProcessedRecordCount(1); - incrementalRecordStats.setLatestRecordTimeStamp(recordDate); + incrementalRecordStats.setLatestRecordTimeStamp(latestRecordDate); boolean isFirstReport = totalRecordStats.getEarliestRecordTimeStamp() == null; if (isFirstReport) { + final Date recordDate = new Date(recordTimeMs); totalRecordStats.setEarliestRecordTimeStamp(recordDate); incrementalRecordStats.setEarliestRecordTimeStamp(recordDate); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index 9dc261abffb32..f228bc97ef1fa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -33,6 +33,8 @@ import java.util.Set; import java.util.function.BiConsumer; +import static org.elasticsearch.xpack.core.ml.utils.Intervals.alignToFloor; + public abstract class AbstractDataToProcessWriter implements DataToProcessWriter { private static final int TIME_FIELD_OUT_INDEX = 0; @@ -48,7 +50,8 @@ public abstract class 
AbstractDataToProcessWriter implements DataToProcessWriter private final Logger logger; private final DateTransformer dateTransformer; - private long latencySeconds; + private final long bucketSpanMs; + private final long latencySeconds; protected Map inFieldIndexes; protected List inputOutputMap; @@ -68,6 +71,7 @@ protected AbstractDataToProcessWriter(boolean includeControlField, boolean inclu this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter); this.logger = Objects.requireNonNull(logger); this.latencySeconds = analysisConfig.getLatency() == null ? 0 : analysisConfig.getLatency().seconds(); + this.bucketSpanMs = analysisConfig.getBucketSpan().getMillis(); Date date = dataCountsReporter.getLatestRecordTime(); latestEpochMsThisUpload = 0; @@ -178,9 +182,11 @@ protected boolean transformTimeAndWrite(String[] record, long numberOfFieldsRead } record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND); + final long latestBucketFloor = alignToFloor(latestEpochMs, bucketSpanMs); - // Records have epoch seconds timestamp so compare for out of order in seconds - if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - latencySeconds) { + // We care only about records that are older than the current bucket according to our latest timestamp + // The native side handles random order within the same bucket without issue + if (epochMs / MS_IN_SECOND < latestBucketFloor / MS_IN_SECOND - latencySeconds) { // out of order dataCountsReporter.reportOutOfOrderRecord(numberOfFieldsRead); @@ -196,7 +202,7 @@ protected boolean transformTimeAndWrite(String[] record, long numberOfFieldsRead latestEpochMsThisUpload = latestEpochMs; autodetectProcess.writeRecord(record); - dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs); + dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs, latestEpochMs); return true; } @@ -325,7 +331,7 @@ protected abstract boolean checkForMissingFields(Collection inputFields, /** * Input and output array 
indexes map */ - protected class InputOutputMap { + protected static class InputOutputMap { int inputIndex; int outputIndex; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index 3b8f8f7216449..dc809950c3906 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -33,7 +33,7 @@ public class DataCountsReporterTests extends ESTestCase { private Job job; private JobDataCountsPersister jobDataCountsPersister; - private TimeValue bucketSpan = TimeValue.timeValueSeconds(300); + private final TimeValue bucketSpan = TimeValue.timeValueSeconds(300); @Before public void setUpMocks() { @@ -86,8 +86,8 @@ public void testResetIncrementalCounts() { dataCountsReporter.setAnalysedFieldsPerRecord(3); - dataCountsReporter.reportRecordWritten(5, 1000); - dataCountsReporter.reportRecordWritten(5, 1000); + dataCountsReporter.reportRecordWritten(5, 1000, 1000); + dataCountsReporter.reportRecordWritten(5, 1000, 1000); assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); @@ -104,8 +104,8 @@ public void testResetIncrementalCounts() { // write some more data // skip a bucket so there is a non-zero empty bucket count long timeStamp = bucketSpan.millis() * 2 + 2000; - dataCountsReporter.reportRecordWritten(5, timeStamp); - dataCountsReporter.reportRecordWritten(5, timeStamp); + dataCountsReporter.reportRecordWritten(5, timeStamp, timeStamp); + dataCountsReporter.reportRecordWritten(5, timeStamp, timeStamp); assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); 
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); @@ -141,14 +141,14 @@ public void testReportRecordsWritten() { DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister); dataCountsReporter.setAnalysedFieldsPerRecord(3); - dataCountsReporter.reportRecordWritten(5, 2000); + dataCountsReporter.reportRecordWritten(5, 2000, 2000); assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount()); assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount()); assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount()); assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount()); assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); - dataCountsReporter.reportRecordWritten(5, 3000); + dataCountsReporter.reportRecordWritten(5, 3000, 3000); dataCountsReporter.reportMissingField(); assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); @@ -166,7 +166,7 @@ public void testReportRecordsWritten_Given9999Records() { dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 9999; i++) { - dataCountsReporter.reportRecordWritten(5, i); + dataCountsReporter.reportRecordWritten(5, i, i); } assertEquals(9999, dataCountsReporter.incrementalStats().getInputRecordCount()); @@ -183,7 +183,7 @@ public void testReportRecordsWritten_Given30000Records() { dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 30001; i++) { - dataCountsReporter.reportRecordWritten(5, i); + dataCountsReporter.reportRecordWritten(5, i, i); } assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount()); @@ -200,7 +200,7 @@ public void testReportRecordsWritten_Given100_000Records() { 
dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 100000; i++) { - dataCountsReporter.reportRecordWritten(5, i); + dataCountsReporter.reportRecordWritten(5, i, i); } assertEquals(100000, dataCountsReporter.incrementalStats().getInputRecordCount()); @@ -217,7 +217,7 @@ public void testReportRecordsWritten_Given1_000_000Records() { dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 1_000_000; i++) { - dataCountsReporter.reportRecordWritten(5, i); + dataCountsReporter.reportRecordWritten(5, i, i); } assertEquals(1_000_000, dataCountsReporter.incrementalStats().getInputRecordCount()); @@ -234,7 +234,7 @@ public void testReportRecordsWritten_Given2_000_000Records() { dataCountsReporter.setAnalysedFieldsPerRecord(3); for (int i = 1; i <= 2_000_000; i++) { - dataCountsReporter.reportRecordWritten(5, i); + dataCountsReporter.reportRecordWritten(5, i, i); } assertEquals(2000000, dataCountsReporter.incrementalStats().getInputRecordCount()); @@ -254,8 +254,8 @@ public void testFinishReporting() { Date now = new Date(); DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000), now, (Date) null, (Date) null, (Instant) null); - dataCountsReporter.reportRecordWritten(5, 2000); - dataCountsReporter.reportRecordWritten(5, 3000); + dataCountsReporter.reportRecordWritten(5, 2000, 2000); + dataCountsReporter.reportRecordWritten(5, 3000, 3000); dataCountsReporter.reportMissingField(); dataCountsReporter.finishReporting(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java index 760f1bdd99812..e37354171883f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java @@ -86,7 +86,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable { dataDescription.setTimeFormat(DataDescription.EPOCH); Detector detector = new Detector.Builder("metric", "value").build(); - analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)).build(); + analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)) + .setBucketSpan(TimeValue.timeValueSeconds(1)) + .build(); } public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws IOException { @@ -190,15 +192,54 @@ public void testWrite_GivenTimeFormatIsEpochAndAllRecordsAreOutOfOrder() throws verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2); verify(dataCountsReporter, times(2)).reportLatestTimeIncrementalStats(anyLong()); - verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong()); + verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong(), anyLong()); verify(dataCountsReporter).finishReporting(); } public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws IOException { - AnalysisConfig.Builder builder = - new AnalysisConfig.Builder(Collections.singletonList(new Detector.Builder("metric", "value").build())); - builder.setLatency(TimeValue.timeValueSeconds(2)); - analysisConfig = builder.build(); + analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList( + new Detector.Builder("metric", "value").build() + )) + .setLatency(TimeValue.timeValueSeconds(2)) + .setBucketSpan(TimeValue.timeValueSeconds(1)) + .build(); + + StringBuilder input = new StringBuilder(); + input.append("time,metric,value\n"); + input.append("4,foo,4.0\n"); + input.append("5,foo,5.0\n"); + input.append("3,foo,3.0\n"); + input.append("4,bar,4.0\n"); + input.append("2,bar,2.0\n"); + input.append("\0"); + InputStream inputStream = 
createInputStream(input.toString()); + CsvDataToProcessWriter writer = createWriter(); + writer.writeHeader(); + writer.write(inputStream, null, null, (r, e) -> {}); + verify(dataCountsReporter, times(1)).startNewIncrementalCount(); + + List expectedRecords = new ArrayList<>(); + // The final field is the control field + expectedRecords.add(new String[] { "time", "value", "." }); + expectedRecords.add(new String[] { "4", "4.0", "" }); + expectedRecords.add(new String[] { "5", "5.0", "" }); + expectedRecords.add(new String[] { "3", "3.0", "" }); + expectedRecords.add(new String[] { "4", "4.0", "" }); + assertWrittenRecordsEqualTo(expectedRecords); + + verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2); + verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong()); + verify(dataCountsReporter).finishReporting(); + } + + public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsOutOfOrderWithinBucketSpan() throws Exception { + analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList( + new Detector.Builder("metric", "value").build() + )) + .setBucketSpan(TimeValue.timeValueSeconds(10)) + .build(); StringBuilder input = new StringBuilder(); input.append("time,metric,value\n"); @@ -207,6 +248,8 @@ public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOu input.append("3,foo,3.0\n"); input.append("4,bar,4.0\n"); input.append("2,bar,2.0\n"); + input.append("12,bar,12.0\n"); + input.append("2,bar,2.0\n"); input.append("\0"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); @@ -221,6 +264,8 @@ public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOu expectedRecords.add(new String[] { "5", "5.0", "" }); expectedRecords.add(new String[] { "3", "3.0", "" }); expectedRecords.add(new String[] { "4", "4.0", "" }); + expectedRecords.add(new String[] { "2", "2.0", "" }); + expectedRecords.add(new String[] { "12", "12.0", "" 
}); assertWrittenRecordsEqualTo(expectedRecords); verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2); @@ -258,10 +303,10 @@ public void testWrite_NullByte() throws IOException { assertWrittenRecordsEqualTo(expectedRecords); verify(dataCountsReporter, times(2)).reportMissingField(); - verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000); - verify(dataCountsReporter, times(1)).reportRecordWritten(2, 2000); - verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000); - verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000); + verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000, 1000); + verify(dataCountsReporter, times(1)).reportRecordWritten(2, 2000, 2000); + verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000, 3000); + verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000, 4000); verify(dataCountsReporter, times(1)).reportDateParseError(2); verify(dataCountsReporter).finishReporting(); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java index 229f7f260a634..9ce79ccb5a011 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java @@ -87,7 +87,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable { dataDescription.setTimeFormat(DataDescription.EPOCH); Detector detector = new Detector.Builder("metric", "value").build(); - analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)).build(); + analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)) + .setBucketSpan(TimeValue.timeValueSeconds(1)) + .build(); } public void 
testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws Exception { @@ -168,11 +170,51 @@ public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder() throws verify(dataCountsReporter).finishReporting(); } + public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsOutOfOrderWithinBucketSpan() throws Exception { + analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList( + new Detector.Builder("metric", "value").build() + )) + .setBucketSpan(TimeValue.timeValueSeconds(10)) + .build(); + + StringBuilder input = new StringBuilder(); + input.append("{\"time\":\"4\", \"metric\":\"foo\", \"value\":\"4.0\"}"); + input.append("{\"time\":\"5\", \"metric\":\"foo\", \"value\":\"5.0\"}"); + input.append("{\"time\":\"3\", \"metric\":\"bar\", \"value\":\"3.0\"}"); + input.append("{\"time\":\"4\", \"metric\":\"bar\", \"value\":\"4.0\"}"); + input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}"); + input.append("{\"time\":\"12\", \"metric\":\"bar\", \"value\":\"12.0\"}"); + input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}"); + InputStream inputStream = createInputStream(input.toString()); + JsonDataToProcessWriter writer = createWriter(); + writer.writeHeader(); + writer.write(inputStream, null, XContentType.JSON, (r, e) -> {}); + + List expectedRecords = new ArrayList<>(); + // The final field is the control field + expectedRecords.add(new String[]{"time", "value", "."}); + expectedRecords.add(new String[]{"4", "4.0", ""}); + expectedRecords.add(new String[]{"5", "5.0", ""}); + expectedRecords.add(new String[]{"3", "3.0", ""}); + expectedRecords.add(new String[]{"4", "4.0", ""}); + expectedRecords.add(new String[]{"2", "2.0", ""}); + expectedRecords.add(new String[]{"12", "12.0", ""}); + assertWrittenRecordsEqualTo(expectedRecords); + + verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2); + verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong()); + 
verify(dataCountsReporter).finishReporting(); + } + public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws Exception { - AnalysisConfig.Builder builder = - new AnalysisConfig.Builder(Collections.singletonList(new Detector.Builder("metric", "value").build())); - builder.setLatency(TimeValue.timeValueSeconds(2)); - analysisConfig = builder.build(); + analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList( + new Detector.Builder("metric", "value").build() + )) + .setLatency(TimeValue.timeValueSeconds(2)) + .setBucketSpan(TimeValue.timeValueSeconds(1)).setLatency(TimeValue.timeValueSeconds(2)) + .build(); StringBuilder input = new StringBuilder(); input.append("{\"time\":\"4\", \"metric\":\"foo\", \"value\":\"4.0\"}"); @@ -326,10 +368,10 @@ public void testWrite_GivenJsonWithMissingFields() throws Exception { assertWrittenRecordsEqualTo(expectedRecords); verify(dataCountsReporter, times(1)).reportMissingFields(1L); - verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000); - verify(dataCountsReporter, times(1)).reportRecordWritten(1, 2000); - verify(dataCountsReporter, times(1)).reportRecordWritten(1, 3000); - verify(dataCountsReporter, times(1)).reportRecordWritten(1, 4000); + verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000, 1000); + verify(dataCountsReporter, times(1)).reportRecordWritten(1, 2000, 2000); + verify(dataCountsReporter, times(1)).reportRecordWritten(1, 3000, 3000); + verify(dataCountsReporter, times(1)).reportRecordWritten(1, 4000, 4000); verify(dataCountsReporter, times(1)).reportDateParseError(0); verify(dataCountsReporter).finishReporting(); }