From 7868a569eba1195e689ca1cf490c7e97238c0349 Mon Sep 17 00:00:00 2001 From: Quynh Nguyen <43350163+qn895@users.noreply.github.com> Date: Thu, 16 Jul 2020 14:42:34 -0500 Subject: [PATCH] [ML] Fix datafeed start time is incorrect when the job has trailing empty buckets (#71976) Co-authored-by: Elastic Machine --- .../anomaly_detection_jobs/summary_job.ts | 1 + .../plugins/ml/common/util/job_utils.test.ts | 20 +++++++++++++++++++ x-pack/plugins/ml/common/util/job_utils.ts | 19 +++++++++++++++++- .../start_datafeed_modal.js | 2 +- .../ml/server/models/job_service/jobs.ts | 12 ++++++++--- 5 files changed, 49 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/ml/common/types/anomaly_detection_jobs/summary_job.ts b/x-pack/plugins/ml/common/types/anomaly_detection_jobs/summary_job.ts index 6cf109dc553ae..2102673060273 100644 --- a/x-pack/plugins/ml/common/types/anomaly_detection_jobs/summary_job.ts +++ b/x-pack/plugins/ml/common/types/anomaly_detection_jobs/summary_job.ts @@ -30,6 +30,7 @@ export interface MlSummaryJob { isSingleMetricViewerJob: boolean; deleting?: boolean; latestTimestampSortValue?: number; + earliestStartTimestampMs?: number; } export interface AuditMessage { diff --git a/x-pack/plugins/ml/common/util/job_utils.test.ts b/x-pack/plugins/ml/common/util/job_utils.test.ts index 233e2c2cd19ac..a56ccd5208bab 100644 --- a/x-pack/plugins/ml/common/util/job_utils.test.ts +++ b/x-pack/plugins/ml/common/util/job_utils.test.ts @@ -18,8 +18,10 @@ import { prefixDatafeedId, getSafeAggregationName, getLatestDataOrBucketTimestamp, + getEarliestDatafeedStartTime, } from './job_utils'; import { CombinedJob, Job } from '../types/anomaly_detection_jobs'; +import moment from 'moment'; describe('ML - job utils', () => { describe('calculateDatafeedFrequencyDefaultSeconds', () => { @@ -581,4 +583,22 @@ describe('ML - job utils', () => { expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined); }); }); + + describe('getEarliestDatafeedStartTime', () => { + test('returns expected value when no gap in data at end of bucket processing', () => { + expect(getEarliestDatafeedStartTime(1549929594000, 1549928700000)).toBe(1549929594000); + }); + test('returns expected value when there is a gap in data at end of bucket processing', () => { + expect(getEarliestDatafeedStartTime(1549929594000, 1562256600000)).toBe(1562256600000); + }); + test('returns expected value when bucket span is provided', () => { + expect( + getEarliestDatafeedStartTime(1549929594000, 1562256600000, moment.duration(1, 'h')) + ).toBe(1562260200000); + }); + + test('returns expected value when job has not run', () => { + expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined); + }); + }); }); diff --git a/x-pack/plugins/ml/common/util/job_utils.ts b/x-pack/plugins/ml/common/util/job_utils.ts index 7ea4ceccf578d..bb0e351ebfec8 100644 --- a/x-pack/plugins/ml/common/util/job_utils.ts +++ b/x-pack/plugins/ml/common/util/job_utils.ts @@ -6,7 +6,7 @@ import _ from 'lodash'; import semver from 'semver'; -import { Duration } from 'moment'; +import moment, { Duration } from 'moment'; // @ts-ignore import numeral from '@elastic/numeral'; @@ -621,6 +621,23 @@ function isValidTimeInterval(value: string | undefined): boolean { return parseTimeIntervalForJob(value) !== null; } +// The earliest start time for the datafeed should be the max(latest_record_timestamp, latest_bucket.timestamp + bucket_span). +export function getEarliestDatafeedStartTime( + latestRecordTimestamp: number | undefined, + latestBucketTimestamp: number | undefined, + bucketSpan?: Duration | null | undefined +): number | undefined { + if (latestRecordTimestamp !== undefined && latestBucketTimestamp !== undefined) { + // if bucket span is available (e.g. 15m) add it to the latest bucket timestamp in ms + const adjustedBucketStartTime = bucketSpan + ? moment(latestBucketTimestamp).add(bucketSpan).valueOf() + : latestBucketTimestamp; + return Math.max(latestRecordTimestamp, adjustedBucketStartTime); + } else { + return latestRecordTimestamp !== undefined ? latestRecordTimestamp : latestBucketTimestamp; + } +} + // Returns the latest of the last source data and last processed bucket timestamp, // as used for example in setting the end time of results views for cases where // anomalies might have been raised after the point at which data ingest has stopped. diff --git a/x-pack/plugins/ml/public/application/jobs/jobs_list/components/start_datafeed_modal/start_datafeed_modal.js b/x-pack/plugins/ml/public/application/jobs/jobs_list/components/start_datafeed_modal/start_datafeed_modal.js index 9ce15fb881bd8..d0d3dc56ababf 100644 --- a/x-pack/plugins/ml/public/application/jobs/jobs_list/components/start_datafeed_modal/start_datafeed_modal.js +++ b/x-pack/plugins/ml/public/application/jobs/jobs_list/components/start_datafeed_modal/start_datafeed_modal.js @@ -222,6 +222,6 @@ StartDatafeedModal.propTypes = { }; function getLowestLatestTime(jobs) { - const times = jobs.map((j) => j.latestTimestampSortValue); + const times = jobs.map((j) => j.earliestStartTimestampMs || 0); return moment(Math.min(...times)); } diff --git a/x-pack/plugins/ml/server/models/job_service/jobs.ts b/x-pack/plugins/ml/server/models/job_service/jobs.ts index aca0c5d72a9f5..e9ed2d0941d96 100644 --- a/x-pack/plugins/ml/server/models/job_service/jobs.ts +++ b/x-pack/plugins/ml/server/models/job_service/jobs.ts @@ -8,6 +8,7 @@ import { i18n } from '@kbn/i18n'; import { uniq } from 'lodash'; import Boom from 'boom'; import { ILegacyScopedClusterClient } from 'kibana/server'; +import { parseTimeIntervalForJob } from '../../../common/util/job_utils'; import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states'; import { MlSummaryJob, @@ -24,11 +25,11 @@ import { resultsServiceProvider } from '../results_service'; import { CalendarManager, Calendar } from '../calendar'; import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils'; import { + getEarliestDatafeedStartTime, getLatestDataOrBucketTimestamp, isTimeSeriesViewJob, } from '../../../common/util/job_utils'; import { groupsProvider } from './groups'; - export interface MlJobsResponse { jobs: Job[]; count: number; @@ -171,6 +172,11 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) { description: job.description || '', groups: Array.isArray(job.groups) ? job.groups.sort() : [], processed_record_count: job.data_counts?.processed_record_count, + earliestStartTimestampMs: getEarliestDatafeedStartTime( + dataCounts?.latest_record_timestamp, + dataCounts?.latest_bucket_timestamp, + parseTimeIntervalForJob(job.analysis_config?.bucket_span) + ), memory_status: job.model_size_stats ? job.model_size_stats.memory_status : '', jobState: job.deleting === true ? deletingStr : job.state, hasDatafeed, @@ -182,8 +188,8 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) { latestTimestampMs: dataCounts?.latest_record_timestamp, earliestTimestampMs: dataCounts?.earliest_record_timestamp, latestResultsTimestampMs: getLatestDataOrBucketTimestamp( - dataCounts?.latest_record_timestamp as number, - dataCounts?.latest_bucket_timestamp as number + dataCounts?.latest_record_timestamp, + dataCounts?.latest_bucket_timestamp ), isSingleMetricViewerJob: isTimeSeriesViewJob(job), nodeName: job.node ? job.node.name : undefined,