[ML] allow documents to be out of order within the same time bucket (#70468)

This commit allows documents seen within the same time bucket to be out of order.

This is already supported within the native process.

Additionally, when recording the "latest" record timestamp, we assumed that the most recently seen document was truly the latest. That does not hold when latency is used or when documents arrive out of order within the same bucket.
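For illustration, here is a minimal sketch of the new acceptance rule (a simplification of the check in AbstractDataToProcessWriter below; the bucket-floor helper stands in for Intervals.alignToFloor and is not the actual Elasticsearch code):

```java
// Simplified sketch of the post-change check, assuming non-negative timestamps.
final class OutOfOrderCheck {
    private static final long MS_IN_SECOND = 1000;

    // A record is rejected only when it precedes the start of the bucket
    // containing the latest seen timestamp, further relaxed by the latency.
    static boolean isOutOfOrder(long epochMs, long latestEpochMs,
                                long bucketSpanMs, long latencySeconds) {
        // Stand-in for Intervals.alignToFloor(latestEpochMs, bucketSpanMs).
        long latestBucketFloor = (latestEpochMs / bucketSpanMs) * bucketSpanMs;
        // Records carry epoch-seconds timestamps, so compare in seconds.
        return epochMs / MS_IN_SECOND < latestBucketFloor / MS_IN_SECOND - latencySeconds;
    }
}
```

Previously the comparison used latestEpochMs itself rather than its bucket floor, so a record older than the latest seen timestamp (beyond latency) was dropped even when it fell within the same bucket.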
benwtrent authored Mar 17, 2021
1 parent a1899ac commit 10e637d
Showing 8 changed files with 235 additions and 114 deletions.
11 changes: 6 additions & 5 deletions docs/reference/ml/ml-shared.asciidoc
@@ -1321,11 +1321,12 @@ For open jobs only, the elapsed time for which the job has been open.
end::open-time[]

tag::out-of-order-timestamp-count[]
The number of input documents that are out of time sequence and outside
of the latency window. This information is applicable only when you provide data
to the {anomaly-job} by using the <<ml-post-data,post data API>>. These out of
order documents are discarded, since jobs require time series data to be in
ascending chronological order.
The number of input documents that have a timestamp chronologically
preceding the start of the current anomaly detection bucket offset by
the latency window. This information is applicable only when you provide
data to the {anomaly-job} by using the <<ml-post-data,post data API>>.
These out of order documents are discarded, since jobs require time
series data to be in ascending chronological order.
end::out-of-order-timestamp-count[]

tag::outlier-fraction[]
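As a worked example of the new out-of-order-timestamp-count wording above (numbers invented): with a bucket span of 10s and a latency of 2s, if the latest record seen so far is stamped 15s, the current bucket starts at 10s; a document stamped before 10s - 2s = 8s is discarded and counted here, while a document stamped at 8s or later is accepted even if it arrives out of order.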
@@ -407,14 +407,6 @@ public void setLatestRecordTimeStamp(Date latestRecordTimeStamp) {
this.latestRecordTimeStamp = latestRecordTimeStamp;
}

public void updateLatestRecordTimeStamp(Date latestRecordTimeStamp) {
if (latestRecordTimeStamp != null &&
(this.latestRecordTimeStamp == null ||
latestRecordTimeStamp.after(this.latestRecordTimeStamp))) {
this.latestRecordTimeStamp = latestRecordTimeStamp;
}
}

/**
* The wall clock time the latest record was seen.
*

Large diffs are not rendered by default.

@@ -74,20 +74,23 @@ public DataCountsReporter(Job job, DataCounts counts, JobDataCountsPersister dat
* but the actual number of fields in the record
* @param recordTimeMs The time of the record written
* in milliseconds from the epoch.
* @param latestRecordTimeMs The time of the latest (in time) record written.
* May be greater than or equal to `recordTimeMs`
*/
public void reportRecordWritten(long inputFieldCount, long recordTimeMs) {
Date recordDate = new Date(recordTimeMs);
public void reportRecordWritten(long inputFieldCount, long recordTimeMs, long latestRecordTimeMs) {
final Date latestRecordDate = new Date(latestRecordTimeMs);

totalRecordStats.incrementInputFieldCount(inputFieldCount);
totalRecordStats.incrementProcessedRecordCount(1);
totalRecordStats.setLatestRecordTimeStamp(recordDate);
totalRecordStats.setLatestRecordTimeStamp(latestRecordDate);

incrementalRecordStats.incrementInputFieldCount(inputFieldCount);
incrementalRecordStats.incrementProcessedRecordCount(1);
incrementalRecordStats.setLatestRecordTimeStamp(recordDate);
incrementalRecordStats.setLatestRecordTimeStamp(latestRecordDate);

boolean isFirstReport = totalRecordStats.getEarliestRecordTimeStamp() == null;
if (isFirstReport) {
final Date recordDate = new Date(recordTimeMs);
totalRecordStats.setEarliestRecordTimeStamp(recordDate);
incrementalRecordStats.setEarliestRecordTimeStamp(recordDate);
}
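A hedged usage sketch of the new three-argument reportRecordWritten above (timestamps invented; dataCountsReporter and numberOfFieldsRead refer to the writer code in this commit):

```java
// Suppose the writer has already seen a record stamped 15_000 ms.
long latestEpochMs = 15_000;
// A record stamped 12_000 ms now arrives out of order within the same bucket.
long recordTimeMs = 12_000;
// Passing both values lets the reporter keep 15_000 as the latest record time;
// deriving it from recordTimeMs alone could rewind the stored "latest".
dataCountsReporter.reportRecordWritten(numberOfFieldsRead, recordTimeMs, latestEpochMs);
```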
@@ -33,6 +33,8 @@
import java.util.Set;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.core.ml.utils.Intervals.alignToFloor;

public abstract class AbstractDataToProcessWriter implements DataToProcessWriter {

private static final int TIME_FIELD_OUT_INDEX = 0;
@@ -48,7 +50,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter

private final Logger logger;
private final DateTransformer dateTransformer;
private long latencySeconds;
private final long bucketSpanMs;
private final long latencySeconds;

protected Map<String, Integer> inFieldIndexes;
protected List<InputOutputMap> inputOutputMap;
@@ -68,6 +71,7 @@ protected AbstractDataToProcessWriter(boolean includeControlField, boolean inclu
this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter);
this.logger = Objects.requireNonNull(logger);
this.latencySeconds = analysisConfig.getLatency() == null ? 0 : analysisConfig.getLatency().seconds();
this.bucketSpanMs = analysisConfig.getBucketSpan().getMillis();

Date date = dataCountsReporter.getLatestRecordTime();
latestEpochMsThisUpload = 0;
@@ -178,9 +182,11 @@ protected boolean transformTimeAndWrite(String[] record, long numberOfFieldsRead
}

record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND);
final long latestBucketFloor = alignToFloor(latestEpochMs, bucketSpanMs);

// Records have epoch seconds timestamp so compare for out of order in seconds
if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - latencySeconds) {
// We care only about records that are older than the current bucket according to our latest timestamp
// The native side handles random order within the same bucket without issue
if (epochMs / MS_IN_SECOND < latestBucketFloor / MS_IN_SECOND - latencySeconds) {
// out of order
dataCountsReporter.reportOutOfOrderRecord(numberOfFieldsRead);

@@ -196,7 +202,7 @@ protected boolean transformTimeAndWrite(String[] record, long numberOfFieldsRead
latestEpochMsThisUpload = latestEpochMs;

autodetectProcess.writeRecord(record);
dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs);
dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs, latestEpochMs);

return true;
}
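To make the bucket-floor check above concrete, a short walk-through with assumed values (bucket span 10s, no latency):

```java
// Assumed values, for illustration only.
long bucketSpanMs = 10_000, latencySeconds = 0, latestEpochMs = 15_000;
long latestBucketFloor = alignToFloor(latestEpochMs, bucketSpanMs); // 10_000
// Record at 12s: 12 < 10 - 0 is false -> written (same bucket, out of order).
// Record at  9s:  9 < 10 - 0 is true  -> reported as out of order.
```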
@@ -325,7 +331,7 @@ protected abstract boolean checkForMissingFields(Collection<String> inputFields,
/**
* Input and output array indexes map
*/
protected class InputOutputMap {
protected static class InputOutputMap {
int inputIndex;
int outputIndex;

@@ -33,7 +33,7 @@ public class DataCountsReporterTests extends ESTestCase {

private Job job;
private JobDataCountsPersister jobDataCountsPersister;
private TimeValue bucketSpan = TimeValue.timeValueSeconds(300);
private final TimeValue bucketSpan = TimeValue.timeValueSeconds(300);

@Before
public void setUpMocks() {
@@ -86,8 +86,8 @@ public void testResetIncrementalCounts() {

dataCountsReporter.setAnalysedFieldsPerRecord(3);

dataCountsReporter.reportRecordWritten(5, 1000);
dataCountsReporter.reportRecordWritten(5, 1000);
dataCountsReporter.reportRecordWritten(5, 1000, 1000);
dataCountsReporter.reportRecordWritten(5, 1000, 1000);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
@@ -104,8 +104,8 @@ public void testResetIncrementalCounts() {
// write some more data
// skip a bucket so there is a non-zero empty bucket count
long timeStamp = bucketSpan.millis() * 2 + 2000;
dataCountsReporter.reportRecordWritten(5, timeStamp);
dataCountsReporter.reportRecordWritten(5, timeStamp);
dataCountsReporter.reportRecordWritten(5, timeStamp, timeStamp);
dataCountsReporter.reportRecordWritten(5, timeStamp, timeStamp);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
@@ -141,14 +141,14 @@ public void testReportRecordsWritten() {
DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister);
dataCountsReporter.setAnalysedFieldsPerRecord(3);

dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 2000, 2000);
assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());

dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportRecordWritten(5, 3000, 3000);
dataCountsReporter.reportMissingField();
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
@@ -166,7 +166,7 @@ public void testReportRecordsWritten_Given9999Records() {
dataCountsReporter.setAnalysedFieldsPerRecord(3);

for (int i = 1; i <= 9999; i++) {
dataCountsReporter.reportRecordWritten(5, i);
dataCountsReporter.reportRecordWritten(5, i, i);
}

assertEquals(9999, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -183,7 +183,7 @@ public void testReportRecordsWritten_Given30000Records() {
dataCountsReporter.setAnalysedFieldsPerRecord(3);

for (int i = 1; i <= 30001; i++) {
dataCountsReporter.reportRecordWritten(5, i);
dataCountsReporter.reportRecordWritten(5, i, i);
}

assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -200,7 +200,7 @@ public void testReportRecordsWritten_Given100_000Records() {
dataCountsReporter.setAnalysedFieldsPerRecord(3);

for (int i = 1; i <= 100000; i++) {
dataCountsReporter.reportRecordWritten(5, i);
dataCountsReporter.reportRecordWritten(5, i, i);
}

assertEquals(100000, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -217,7 +217,7 @@ public void testReportRecordsWritten_Given1_000_000Records() {
dataCountsReporter.setAnalysedFieldsPerRecord(3);

for (int i = 1; i <= 1_000_000; i++) {
dataCountsReporter.reportRecordWritten(5, i);
dataCountsReporter.reportRecordWritten(5, i, i);
}

assertEquals(1_000_000, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -234,7 +234,7 @@ public void testReportRecordsWritten_Given2_000_000Records() {
dataCountsReporter.setAnalysedFieldsPerRecord(3);

for (int i = 1; i <= 2_000_000; i++) {
dataCountsReporter.reportRecordWritten(5, i);
dataCountsReporter.reportRecordWritten(5, i, i);
}

assertEquals(2000000, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -254,8 +254,8 @@ public void testFinishReporting() {
Date now = new Date();
DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000),
now, (Date) null, (Date) null, (Instant) null);
dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportRecordWritten(5, 2000, 2000);
dataCountsReporter.reportRecordWritten(5, 3000, 3000);
dataCountsReporter.reportMissingField();
dataCountsReporter.finishReporting();

@@ -86,7 +86,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
dataDescription.setTimeFormat(DataDescription.EPOCH);

Detector detector = new Detector.Builder("metric", "value").build();
analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)).build();
analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector))
.setBucketSpan(TimeValue.timeValueSeconds(1))
.build();
}

public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws IOException {
@@ -190,15 +192,54 @@ public void testWrite_GivenTimeFormatIsEpochAndAllRecordsAreOutOfOrder() throws

verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, times(2)).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong());
verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong(), anyLong());
verify(dataCountsReporter).finishReporting();
}

public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws IOException {
AnalysisConfig.Builder builder =
new AnalysisConfig.Builder(Collections.singletonList(new Detector.Builder("metric", "value").build()));
builder.setLatency(TimeValue.timeValueSeconds(2));
analysisConfig = builder.build();
analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(
new Detector.Builder("metric", "value").build()
))
.setLatency(TimeValue.timeValueSeconds(2))
.setBucketSpan(TimeValue.timeValueSeconds(1))
.build();

StringBuilder input = new StringBuilder();
input.append("time,metric,value\n");
input.append("4,foo,4.0\n");
input.append("5,foo,5.0\n");
input.append("3,foo,3.0\n");
input.append("4,bar,4.0\n");
input.append("2,bar,2.0\n");
input.append("\0");
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, null, null, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();

List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field
expectedRecords.add(new String[] { "time", "value", "." });
expectedRecords.add(new String[] { "4", "4.0", "" });
expectedRecords.add(new String[] { "5", "5.0", "" });
expectedRecords.add(new String[] { "3", "3.0", "" });
expectedRecords.add(new String[] { "4", "4.0", "" });
assertWrittenRecordsEqualTo(expectedRecords);

verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter).finishReporting();
}

public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsOutOfOrderWithinBucketSpan() throws Exception {
analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(
new Detector.Builder("metric", "value").build()
))
.setBucketSpan(TimeValue.timeValueSeconds(10))
.build();

StringBuilder input = new StringBuilder();
input.append("time,metric,value\n");
@@ -207,6 +248,8 @@ public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOu
input.append("3,foo,3.0\n");
input.append("4,bar,4.0\n");
input.append("2,bar,2.0\n");
input.append("12,bar,12.0\n");
input.append("2,bar,2.0\n");
input.append("\0");
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
@@ -221,6 +264,8 @@ public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOu
expectedRecords.add(new String[] { "5", "5.0", "" });
expectedRecords.add(new String[] { "3", "3.0", "" });
expectedRecords.add(new String[] { "4", "4.0", "" });
expectedRecords.add(new String[] { "2", "2.0", "" });
expectedRecords.add(new String[] { "12", "12.0", "" });
assertWrittenRecordsEqualTo(expectedRecords);

verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
@@ -258,10 +303,10 @@ public void testWrite_NullByte() throws IOException {
assertWrittenRecordsEqualTo(expectedRecords);

verify(dataCountsReporter, times(2)).reportMissingField();
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 2000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000, 1000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 2000, 2000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000, 3000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000, 4000);
verify(dataCountsReporter, times(1)).reportDateParseError(2);
verify(dataCountsReporter).finishReporting();
}