diff --git a/x-pack/plugins/task_manager/server/config.mock.ts b/x-pack/plugins/task_manager/server/config.mock.ts new file mode 100644 index 0000000000000..513dea71d39dd --- /dev/null +++ b/x-pack/plugins/task_manager/server/config.mock.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { type TaskManagerConfig, configSchema } from './config'; + +const createConfigMock = (overwrites: Partial = {}) => { + const mocked: TaskManagerConfig = configSchema.validate(overwrites); + return mocked; +}; + +export const configMock = { + create: createConfigMock, +}; diff --git a/x-pack/plugins/task_manager/server/lib/get_next_run_at.test.ts b/x-pack/plugins/task_manager/server/lib/get_next_run_at.test.ts new file mode 100644 index 0000000000000..efa7cf90ae15f --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/get_next_run_at.test.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { taskManagerMock } from '../mocks'; + +import { getNextRunAt } from './get_next_run_at'; + +describe('getNextRunAt', () => { + test('should use startedAt when the task delay is greater than the threshold', () => { + const now = new Date(); + // Use time in the past to ensure the task delay calculation isn't relative to "now" + const fiveSecondsAgo = new Date(now.getTime() - 5000); + const fourSecondsAgo = new Date(now.getTime() - 4000); + const nextRunAt = getNextRunAt( + taskManagerMock.createTask({ + schedule: { interval: '1m' }, + runAt: fiveSecondsAgo, + startedAt: fourSecondsAgo, + }), + 500 + ); + expect(nextRunAt).toEqual(new Date(fourSecondsAgo.getTime() + 60000)); + }); + + test('should use runAt when the task delay is greater than the threshold', () => { + const now = new Date(); + // Use time in the past to ensure the task delay calculation isn't relative to "now" + const fiveSecondsAgo = new Date(now.getTime() - 5000); + const aBitLessThanFiveSecondsAgo = new Date(now.getTime() - 4995); + const nextRunAt = getNextRunAt( + taskManagerMock.createTask({ + schedule: { interval: '1m' }, + runAt: fiveSecondsAgo, + startedAt: aBitLessThanFiveSecondsAgo, + }), + 500 + ); + expect(nextRunAt).toEqual(new Date(fiveSecondsAgo.getTime() + 60000)); + }); + + test('should not schedule in the past', () => { + const testStart = new Date(); + const fiveMinsAgo = new Date(Date.now() - 300000); + const nextRunAt = getNextRunAt( + taskManagerMock.createTask({ + schedule: { interval: '1m' }, + runAt: fiveMinsAgo, + startedAt: fiveMinsAgo, + }), + 0 + ); + expect(nextRunAt.getTime()).toBeGreaterThanOrEqual(testStart.getTime()); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts new file mode 100644 index 0000000000000..a25960e61ee29 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { intervalFromDate } from './intervals'; +import type { ConcreteTaskInstance } from '../task'; + +export function getNextRunAt( + { runAt, startedAt, schedule }: Pick, + taskDelayThresholdForPreciseScheduling: number = 0 +): Date { + const taskDelay = startedAt!.getTime() - runAt.getTime(); + const scheduleFromDate = taskDelay < taskDelayThresholdForPreciseScheduling ? runAt : startedAt!; + + // Ensure we also don't schedule in the past by performing the Math.max with Date.now() + const nextCalculatedRunAt = Math.max( + intervalFromDate(scheduleFromDate, schedule!.interval)!.getTime(), + Date.now() + ); + + return new Date(nextCalculatedRunAt); +} diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index b8d41391f1411..81a65009391f6 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -84,6 +84,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter; private logger: Logger; public pool: TaskPool; @@ -122,6 +123,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter this.events$.next(event); @@ -220,9 +222,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter secondsFromNow(mins * 60); +const getNextRunAtSpy = jest.spyOn(nextRunAtUtils, 'getNextRunAt'); let fakeTimer: sinon.SinonFakeTimers; @@ -977,6 +981,8 @@ describe('TaskManagerRunner', () => { expect(instance.params).toEqual({ a: 'b' }); expect(instance.state).toEqual({ hey: 'there' }); expect(instance.enabled).not.toBeDefined(); + + expect(getNextRunAtSpy).not.toHaveBeenCalled(); }); test('reschedules tasks that have an schedule', async () => { @@ -1007,6 +1013,8 @@ describe('TaskManagerRunner', () => { expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime()); expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime()); expect(instance.enabled).not.toBeDefined(); + + expect(getNextRunAtSpy).toHaveBeenCalled(); }); test('expiration returns time after which timeout will have elapsed from start', async () => { @@ -1084,6 +1092,8 @@ describe('TaskManagerRunner', () => { expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), { validate: true, }); + + expect(getNextRunAtSpy).not.toHaveBeenCalled(); }); test('reschedules tasks that return a schedule', async () => { @@ -1114,6 +1124,11 @@ describe('TaskManagerRunner', () => { expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), { validate: true, }); + + expect(getNextRunAtSpy).toHaveBeenCalledWith( + expect.objectContaining({ schedule }), + expect.any(Number) + ); }); test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => { @@ -2479,12 +2494,15 @@ describe('TaskManagerRunner', () => { onTaskEvent: opts.onTaskEvent, executionContext, usageCounter, - eventLoopDelayConfig: { - monitor: true, - warn_threshold: 5000, - }, + config: configMock.create({ + event_loop_delay: { + monitor: true, + warn_threshold: 5000, + }, + }), allowReadingInvalidState: opts.allowReadingInvalidState || false, strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY, + pollIntervalConfiguration$: new BehaviorSubject(500), }); if (stage === TaskRunningStage.READY_TO_RUN) { diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index f125aee5952ce..862c76f5b98a0 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -11,6 +11,7 @@ * rescheduling, middleware application, etc. */ +import { Observable } from 'rxjs'; import apm from 'elastic-apm-node'; import { v4 as uuidv4 } from 'uuid'; import { withSpan } from '@kbn/apm-utils'; @@ -54,9 +55,10 @@ import { } from '../task'; import { TaskTypeDictionary } from '../task_type_dictionary'; import { isUnrecoverableError } from './errors'; -import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config'; +import { CLAIM_STRATEGY_MGET, type TaskManagerConfig } from '../config'; import { TaskValidator } from '../task_validator'; import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at'; +import { getNextRunAt } from '../lib/get_next_run_at'; export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} }; @@ -107,9 +109,10 @@ type Opts = { defaultMaxAttempts: number; executionContext: ExecutionContextStart; usageCounter?: UsageCounter; - eventLoopDelayConfig: EventLoopDelayConfig; + config: TaskManagerConfig; allowReadingInvalidState: boolean; strategy: string; + pollIntervalConfiguration$: Observable; } & Pick; export enum TaskRunResult { @@ -159,9 +162,10 @@ export class TaskManagerRunner implements TaskRunner { private uuid: string; private readonly executionContext: ExecutionContextStart; private usageCounter?: UsageCounter; - private eventLoopDelayConfig: EventLoopDelayConfig; + private config: TaskManagerConfig; private readonly taskValidator: TaskValidator; private readonly claimStrategy: string; + private currentPollInterval: number; /** * Creates an instance of TaskManagerRunner. @@ -184,9 +188,10 @@ export class TaskManagerRunner implements TaskRunner { onTaskEvent = identity, executionContext, usageCounter, - eventLoopDelayConfig, + config, allowReadingInvalidState, strategy, + pollIntervalConfiguration$, }: Opts) { this.instance = asPending(sanitizeInstance(instance)); this.definitions = definitions; @@ -199,13 +204,17 @@ export class TaskManagerRunner implements TaskRunner { this.executionContext = executionContext; this.usageCounter = usageCounter; this.uuid = uuidv4(); - this.eventLoopDelayConfig = eventLoopDelayConfig; + this.config = config; this.taskValidator = new TaskValidator({ logger: this.logger, definitions: this.definitions, allowReadingInvalidState, }); this.claimStrategy = strategy; + this.currentPollInterval = config.poll_interval; + pollIntervalConfiguration$.subscribe((pollInterval) => { + this.currentPollInterval = pollInterval; + }); } /** @@ -334,7 +343,7 @@ export class TaskManagerRunner implements TaskRunner { const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, { childOf: this.instance.task.traceparent, }); - const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig); + const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.config.event_loop_delay); // Validate state const stateValidationResult = this.validateTaskState(this.instance.task); @@ -636,13 +645,20 @@ export class TaskManagerRunner implements TaskRunner { return asOk({ status: TaskStatus.ShouldDelete }); } - const { startedAt, schedule } = this.instance.task; - + const updatedTaskSchedule = reschedule ?? this.instance.task.schedule; return asOk({ runAt: - runAt || intervalFromDate(startedAt!, reschedule?.interval ?? schedule?.interval)!, + runAt || + getNextRunAt( + { + runAt: this.instance.task.runAt, + startedAt: this.instance.task.startedAt, + schedule: updatedTaskSchedule, + }, + this.currentPollInterval + ), state, - schedule: reschedule ?? schedule, + schedule: updatedTaskSchedule, attempts, status: TaskStatus.Idle, }); @@ -790,7 +806,7 @@ export class TaskManagerRunner implements TaskRunner { const { eventLoopBlockMs = 0 } = taskTiming; const taskLabel = `${this.taskType} ${this.instance.task.id}`; - if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_threshold) { + if (eventLoopBlockMs > this.config.event_loop_delay.warn_threshold) { this.logger.warn( `event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`, { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/get_action_error_log.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/get_action_error_log.ts index dbb8cee8673b8..2a25cf481407e 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/get_action_error_log.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group1/get_action_error_log.ts @@ -146,7 +146,7 @@ export default function createGetActionErrorLogTests({ getService }: FtrProvider .send( getTestRuleData({ rule_type_id: 'test.cumulative-firing', - schedule: { interval: '5s' }, + schedule: { interval: '6s' }, actions: [ { id: createdConnector1.id, diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/common.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/common.ts index 26d8c64a30296..a08dba15f77ba 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/common.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/common.ts @@ -20,7 +20,7 @@ export const ES_TEST_OUTPUT_INDEX_NAME = `${ES_TEST_INDEX_NAME}-output`; export const ES_TEST_DATA_STREAM_NAME = 'test-data-stream'; export const RULE_INTERVALS_TO_WRITE = 5; -export const RULE_INTERVAL_SECONDS = 4; +export const RULE_INTERVAL_SECONDS = 6; export const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000; export const ES_GROUPS_TO_WRITE = 3; diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/esql_only.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/esql_only.ts index 10aaf21a28a07..e748b56bd64cb 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/esql_only.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/esql_only.ts @@ -82,7 +82,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { }); const docs = await waitForDocs(2); - const messagePattern = /Document count is \d+ in the last 20s. Alert when greater than 0./; + const messagePattern = /Document count is \d+ in the last 30s. Alert when greater than 0./; for (let i = 0; i < docs.length; i++) { const doc = docs[i]; @@ -136,7 +136,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(name).to.be('always fire'); expect(title).to.be(`rule 'always fire' matched query`); - const messagePattern = /Document count is \d+ in the last 20s. Alert when greater than 0./; + const messagePattern = /Document count is \d+ in the last 30s. Alert when greater than 0./; expect(message).to.match(messagePattern); expect(hits).not.to.be.empty(); } @@ -156,7 +156,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(name).to.be('always fire'); expect(title).to.be(`rule 'always fire' matched query`); - const messagePattern = /Document count is \d+ in the last 20s. Alert when greater than 0./; + const messagePattern = /Document count is \d+ in the last 30s. Alert when greater than 0./; expect(message).to.match(messagePattern); expect(hits).not.to.be.empty(); } @@ -186,7 +186,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(activeTitle).to.be(`rule 'fire then recovers' matched query`); expect(activeValue).to.be('1'); expect(activeMessage).to.match( - /Document count is \d+ in the last 4s. Alert when greater than 0./ + /Document count is \d+ in the last 6s. Alert when greater than 0./ ); await createEsDocumentsInGroups(1, endDate); docs = await waitForDocs(2); @@ -200,7 +200,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(recoveredName).to.be('fire then recovers'); expect(recoveredTitle).to.be(`rule 'fire then recovers' recovered`); expect(recoveredMessage).to.match( - /Document count is \d+ in the last 4s. Alert when greater than 0./ + /Document count is \d+ in the last 6s. Alert when greater than 0./ ); }); @@ -223,7 +223,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { 'from test-data-stream | stats c = count(@timestamp) by host.hostname, host.name, host.id | where c > -1', }); - const messagePattern = /Document count is \d+ in the last 20s. Alert when greater than 0./; + const messagePattern = /Document count is \d+ in the last 30s. Alert when greater than 0./; const docs = await waitForDocs(2); for (let i = 0; i < docs.length; i++) { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/rule.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/rule.ts index d53c985f616f3..5ad588a6924de 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/rule.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group3/builtin_alert_types/es_query/rule.ts @@ -151,7 +151,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { const docs = await waitForDocs(2); const messagePattern = - /Document count is \d+.?\d* in the last 20s in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; + /Document count is \d+.?\d* in the last 30s in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; for (let i = 0; i < docs.length; i++) { const doc = docs[i]; @@ -269,7 +269,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { await initData(); const messagePattern = - /Document count is \d+.?\d* in the last 20s in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; + /Document count is \d+.?\d* in the last 30s in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; const docs = await waitForDocs(2); for (let i = 0; i < docs.length; i++) { @@ -391,7 +391,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { await initData(); const messagePattern = - /Document count is \d+.?\d* in the last 20s for group-\d+ in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; + /Document count is \d+.?\d* in the last 30s for group-\d+ in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; const titlePattern = /rule 'always fire' matched query for group group-\d/; const conditionPattern = /Number of matching documents for group "group-\d" is greater than -1/; @@ -478,7 +478,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { await initData(); const messagePattern = - /Document count is \d+.?\d* in the last 20s for group-\d+,\d+ in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; + /Document count is \d+.?\d* in the last 30s for group-\d+,\d+ in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; const titlePattern = /rule 'always fire' matched query for group group-\d+,\d+/; const conditionPattern = /Number of matching documents for group "group-\d+,\d+" is greater than -1/; @@ -608,7 +608,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { const titlePattern = /rule 'always fire' matched query for group group-\d/; expect(title).to.match(titlePattern); const messagePattern = - /Document count is \d+.?\d* in the last 20s for group-\d+ in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; + /Document count is \d+.?\d* in the last 30s for group-\d+ in .kibana-alerting-test-data (?:index|data view). Alert when greater than -1./; expect(message).to.match(messagePattern); expect(hits).not.to.be.empty(); @@ -696,7 +696,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(name).to.be('always fire'); expect(title).to.be(`rule 'always fire' matched query`); const messagePattern = - /Document count is \d+.?\d* in the last 20s in .kibana-alerting-test-data (?:index|data view). ./; + /Document count is \d+.?\d* in the last 30s in .kibana-alerting-test-data (?:index|data view). ./; expect(message).to.match(messagePattern); expect(hits).not.to.be.empty(); @@ -806,7 +806,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(name).to.be('fires once'); expect(title).to.be(`rule 'fires once' matched query`); const messagePattern = - /Document count is \d+.?\d* in the last 20s in .kibana-alerting-test-data (?:index|data view). Alert when greater than or equal to 0./; + /Document count is \d+.?\d* in the last 30s in .kibana-alerting-test-data (?:index|data view). Alert when greater than or equal to 0./; expect(message).to.match(messagePattern); expect(hits).not.to.be.empty(); expect(previousTimestamp).to.be.empty(); @@ -866,7 +866,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(name).to.be('always fire'); expect(title).to.be(`rule 'always fire' matched query`); const messagePattern = - /Document count is \d+.?\d* in the last 20s in .kibana-alerting-test-data (?:index|data view). Alert when less than 1./; + /Document count is \d+.?\d* in the last 30s in .kibana-alerting-test-data (?:index|data view). Alert when less than 1./; expect(message).to.match(messagePattern); expect(hits).to.be.empty(); @@ -944,7 +944,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(activeTitle).to.be(`rule 'fire then recovers' matched query`); expect(activeValue).to.be('0'); expect(activeMessage).to.match( - /Document count is \d+.?\d* in the last 4s in .kibana-alerting-test-data (?:index|data view). Alert when less than 1./ + /Document count is \d+.?\d* in the last 6s in .kibana-alerting-test-data (?:index|data view). Alert when less than 1./ ); await createEsDocumentsInGroups(1, endDate); @@ -959,7 +959,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(recoveredName).to.be('fire then recovers'); expect(recoveredTitle).to.be(`rule 'fire then recovers' recovered`); expect(recoveredMessage).to.match( - /Document count is \d+.?\d* in the last 4s in .kibana-alerting-test-data (?:index|data view). Alert when less than 1./ + /Document count is \d+.?\d* in the last 6s in .kibana-alerting-test-data (?:index|data view). Alert when less than 1./ ); }) ); @@ -1044,7 +1044,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { await initData(); const messagePattern = - /Document count is \d+.?\d* in the last 20s in test-data-stream (?:index|data view). Alert when greater than -1./; + /Document count is \d+.?\d* in the last 30s in test-data-stream (?:index|data view). Alert when greater than -1./; const docs = await waitForDocs(2); for (let i = 0; i < docs.length; i++) { @@ -1179,7 +1179,7 @@ export default function ruleTests({ getService }: FtrProviderContext) { expect(docs[0]._source.hits.length).greaterThan(0); const messagePattern = - /Document count is \d+.?\d* in the last 20s in .kibana-alerting-test-data (?:index|data view). Alert when greater than 0./; + /Document count is \d+.?\d* in the last 30s in .kibana-alerting-test-data (?:index|data view). Alert when greater than 0./; expect(docs[0]._source.params.message).to.match(messagePattern); expect(docs[1]._source.hits.length).to.be(0);