Skip to content

Commit

Permalink
[ML] Fix datafeed start time is incorrect when the job has trailing e…
Browse files Browse the repository at this point in the history
…mpty buckets (#71976)

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
qn895 and elasticmachine authored Jul 16, 2020
1 parent 52597b2 commit 7868a56
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface MlSummaryJob {
isSingleMetricViewerJob: boolean;
deleting?: boolean;
latestTimestampSortValue?: number;
earliestStartTimestampMs?: number;
}

export interface AuditMessage {
Expand Down
20 changes: 20 additions & 0 deletions x-pack/plugins/ml/common/util/job_utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import {
prefixDatafeedId,
getSafeAggregationName,
getLatestDataOrBucketTimestamp,
getEarliestDatafeedStartTime,
} from './job_utils';
import { CombinedJob, Job } from '../types/anomaly_detection_jobs';
import moment from 'moment';

describe('ML - job utils', () => {
describe('calculateDatafeedFrequencyDefaultSeconds', () => {
Expand Down Expand Up @@ -581,4 +583,22 @@ describe('ML - job utils', () => {
expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined);
});
});

describe('getEarliestDatafeedStartTime', () => {
test('returns expected value when no gap in data at end of bucket processing', () => {
expect(getEarliestDatafeedStartTime(1549929594000, 1549928700000)).toBe(1549929594000);
});
test('returns expected value when there is a gap in data at end of bucket processing', () => {
expect(getEarliestDatafeedStartTime(1549929594000, 1562256600000)).toBe(1562256600000);
});
test('returns expected value when bucket span is provided', () => {
expect(
getEarliestDatafeedStartTime(1549929594000, 1562256600000, moment.duration(1, 'h'))
).toBe(1562260200000);
});

test('returns expected value when job has not run', () => {
expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined);
});
});
});
19 changes: 18 additions & 1 deletion x-pack/plugins/ml/common/util/job_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import semver from 'semver';
import { Duration } from 'moment';
import moment, { Duration } from 'moment';
// @ts-ignore
import numeral from '@elastic/numeral';

Expand Down Expand Up @@ -621,6 +621,23 @@ function isValidTimeInterval(value: string | undefined): boolean {
return parseTimeIntervalForJob(value) !== null;
}

// The earliest start time for the datafeed should be the max(latest_record_timestamp, latest_bucket.timestamp + bucket_span).
export function getEarliestDatafeedStartTime(
latestRecordTimestamp: number | undefined,
latestBucketTimestamp: number | undefined,
bucketSpan?: Duration | null | undefined
): number | undefined {
if (latestRecordTimestamp !== undefined && latestBucketTimestamp !== undefined) {
// if bucket span is available (e.g. 15m) add it to the latest bucket timestamp in ms
const adjustedBucketStartTime = bucketSpan
? moment(latestBucketTimestamp).add(bucketSpan).valueOf()
: latestBucketTimestamp;
return Math.max(latestRecordTimestamp, adjustedBucketStartTime);
} else {
return latestRecordTimestamp !== undefined ? latestRecordTimestamp : latestBucketTimestamp;
}
}

// Returns the latest of the last source data and last processed bucket timestamp,
// as used for example in setting the end time of results views for cases where
// anomalies might have been raised after the point at which data ingest has stopped.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,6 @@ StartDatafeedModal.propTypes = {
};

function getLowestLatestTime(jobs) {
const times = jobs.map((j) => j.latestTimestampSortValue);
const times = jobs.map((j) => j.earliestStartTimestampMs || 0);
return moment(Math.min(...times));
}
12 changes: 9 additions & 3 deletions x-pack/plugins/ml/server/models/job_service/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { i18n } from '@kbn/i18n';
import { uniq } from 'lodash';
import Boom from 'boom';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { parseTimeIntervalForJob } from '../../../common/util/job_utils';
import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';
import {
MlSummaryJob,
Expand All @@ -24,11 +25,11 @@ import { resultsServiceProvider } from '../results_service';
import { CalendarManager, Calendar } from '../calendar';
import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils';
import {
getEarliestDatafeedStartTime,
getLatestDataOrBucketTimestamp,
isTimeSeriesViewJob,
} from '../../../common/util/job_utils';
import { groupsProvider } from './groups';

export interface MlJobsResponse {
jobs: Job[];
count: number;
Expand Down Expand Up @@ -171,6 +172,11 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) {
description: job.description || '',
groups: Array.isArray(job.groups) ? job.groups.sort() : [],
processed_record_count: job.data_counts?.processed_record_count,
earliestStartTimestampMs: getEarliestDatafeedStartTime(
dataCounts?.latest_record_timestamp,
dataCounts?.latest_bucket_timestamp,
parseTimeIntervalForJob(job.analysis_config?.bucket_span)
),
memory_status: job.model_size_stats ? job.model_size_stats.memory_status : '',
jobState: job.deleting === true ? deletingStr : job.state,
hasDatafeed,
Expand All @@ -182,8 +188,8 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) {
latestTimestampMs: dataCounts?.latest_record_timestamp,
earliestTimestampMs: dataCounts?.earliest_record_timestamp,
latestResultsTimestampMs: getLatestDataOrBucketTimestamp(
dataCounts?.latest_record_timestamp as number,
dataCounts?.latest_bucket_timestamp as number
dataCounts?.latest_record_timestamp,
dataCounts?.latest_bucket_timestamp
),
isSingleMetricViewerJob: isTimeSeriesViewJob(job),
nodeName: job.node ? job.node.name : undefined,
Expand Down

0 comments on commit 7868a56

Please sign in to comment.