Skip to content

Commit

Permalink
[8.x] Consistent scheduling when tasks run within the poll interval o…
Browse files Browse the repository at this point in the history
…f their original time (elastic#190093) (elastic#193022)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Consistent scheduling when tasks run within the poll interval of
their original time
(elastic#190093)](elastic#190093)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Mike
Côté","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-09-16T14:10:36Z","message":"Consistent
scheduling when tasks run within the poll interval of their original
time (elastic#190093)\n\nResolves
https://github.com/elastic/kibana/issues/189114\r\n\r\nIn this PR, I'm
changing the logic to calculate the task's next run at.\r\nWhenever the
gap between the task's runAt and when it was picked up is\r\nless than
the poll interval, we'll use the `runAt` to schedule the next.\r\nThis
way we don't continuously add time to the task's next run
(ex:\r\nrunning every 1m turns into every 1m 3s).\r\n\r\nI've had to
modify a few tests to have a more increased interval because\r\nthis
made tasks run more frequently (on time), which
introduced\r\nflakiness.\r\n\r\n## To verify\r\n1. Create an alerting
rule that runs every 10s\r\n2. Apply the following diff to your
code\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\nindex
55d5f85e5d3..4342dcdd845 100644\r\n---
a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\n+++
b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\n@@ -31,5
+31,7 @@ export function getNextRunAt(\r\n Date.now()\r\n );\r\n\r\n+
console.log(`*** Next run at: ${new
Date(nextCalculatedRunAt).toISOString()},
interval=${newSchedule?.interval ?? schedule.interval},
originalRunAt=${originalRunAt.toISOString()},
startedAt=${startedAt.toISOString()}`);\r\n+\r\n return new
Date(nextCalculatedRunAt);\r\n }\r\n```\r\n3. Observe the logs, the gap
between runAt and startedAt should be less\r\nthan the poll interval, so
the next run at is based on `runAt` instead\r\nof `startedAt`.\r\n4.
Stop Kibana for 15 seconds then start it again\r\n5. Observe the first
logs when the rule runs again and notice now that\r\nthe gap between
runAt and startedAt is larger than the poll interval,\r\nthe next run at
is based on `startedAt` instead of `runAt` to spread the\r\ntasks out
evenly.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"1f673dc9f12e90a6aa41a903fee8b0adafcdcaf9","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:prev-minor","v8.16.0"],"title":"Consistent
scheduling when tasks run within the poll interval of their original
time","number":190093,"url":"https://github.com/elastic/kibana/pull/190093","mergeCommit":{"message":"Consistent
scheduling when tasks run within the poll interval of their original
time (elastic#190093)\n\nResolves
https://github.com/elastic/kibana/issues/189114\r\n\r\nIn this PR, I'm
changing the logic to calculate the task's next run at.\r\nWhenever the
gap between the task's runAt and when it was picked up is\r\nless than
the poll interval, we'll use the `runAt` to schedule the next.\r\nThis
way we don't continuously add time to the task's next run
(ex:\r\nrunning every 1m turns into every 1m 3s).\r\n\r\nI've had to
modify a few tests to have a more increased interval because\r\nthis
made tasks run more frequently (on time), which
introduced\r\nflakiness.\r\n\r\n## To verify\r\n1. Create an alerting
rule that runs every 10s\r\n2. Apply the following diff to your
code\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\nindex
55d5f85e5d3..4342dcdd845 100644\r\n---
a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\n+++
b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\n@@ -31,5
+31,7 @@ export function getNextRunAt(\r\n Date.now()\r\n );\r\n\r\n+
console.log(`*** Next run at: ${new
Date(nextCalculatedRunAt).toISOString()},
interval=${newSchedule?.interval ?? schedule.interval},
originalRunAt=${originalRunAt.toISOString()},
startedAt=${startedAt.toISOString()}`);\r\n+\r\n return new
Date(nextCalculatedRunAt);\r\n }\r\n```\r\n3. Observe the logs, the gap
between runAt and startedAt should be less\r\nthan the poll interval, so
the next run at is based on `runAt` instead\r\nof `startedAt`.\r\n4.
Stop Kibana for 15 seconds then start it again\r\n5. Observe the first
logs when the rule runs again and notice now that\r\nthe gap between
runAt and startedAt is larger than the poll interval,\r\nthe next run at
is based on `startedAt` instead of `runAt` to spread the\r\ntasks out
evenly.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"1f673dc9f12e90a6aa41a903fee8b0adafcdcaf9"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/190093","number":190093,"mergeCommit":{"message":"Consistent
scheduling when tasks run within the poll interval of their original
time (elastic#190093)\n\nResolves
https://github.com/elastic/kibana/issues/189114\r\n\r\nIn this PR, I'm
changing the logic to calculate the task's next run at.\r\nWhenever the
gap between the task's runAt and when it was picked up is\r\nless than
the poll interval, we'll use the `runAt` to schedule the next.\r\nThis
way we don't continuously add time to the task's next run
(ex:\r\nrunning every 1m turns into every 1m 3s).\r\n\r\nI've had to
modify a few tests to have a more increased interval because\r\nthis
made tasks run more frequently (on time), which
introduced\r\nflakiness.\r\n\r\n## To verify\r\n1. Create an alerting
rule that runs every 10s\r\n2. Apply the following diff to your
code\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\nindex
55d5f85e5d3..4342dcdd845 100644\r\n---
a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\n+++
b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts\r\n@@ -31,5
+31,7 @@ export function getNextRunAt(\r\n Date.now()\r\n );\r\n\r\n+
console.log(`*** Next run at: ${new
Date(nextCalculatedRunAt).toISOString()},
interval=${newSchedule?.interval ?? schedule.interval},
originalRunAt=${originalRunAt.toISOString()},
startedAt=${startedAt.toISOString()}`);\r\n+\r\n return new
Date(nextCalculatedRunAt);\r\n }\r\n```\r\n3. Observe the logs, the gap
between runAt and startedAt should be less\r\nthan the poll interval, so
the next run at is based on `runAt` instead\r\nof `startedAt`.\r\n4.
Stop Kibana for 15 seconds then start it again\r\n5. Observe the first
logs when the rule runs again and notice now that\r\nthe gap between
runAt and startedAt is larger than the poll interval,\r\nthe next run at
is based on `startedAt` instead of `runAt` to spread the\r\ntasks out
evenly.\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"1f673dc9f12e90a6aa41a903fee8b0adafcdcaf9"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Mike Côté <[email protected]>
  • Loading branch information
kibanamachine and mikecote authored Sep 16, 2024
1 parent 7939f22 commit 64e5384
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 36 deletions.
17 changes: 17 additions & 0 deletions x-pack/plugins/task_manager/server/config.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { type TaskManagerConfig, configSchema } from './config';

const createConfigMock = (overwrites: Partial<TaskManagerConfig> = {}) => {
const mocked: TaskManagerConfig = configSchema.validate(overwrites);
return mocked;
};

export const configMock = {
create: createConfigMock,
};
58 changes: 58 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_next_run_at.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { taskManagerMock } from '../mocks';

import { getNextRunAt } from './get_next_run_at';

describe('getNextRunAt', () => {
test('should use startedAt when the task delay is greater than the threshold', () => {
const now = new Date();
// Use time in the past to ensure the task delay calculation isn't relative to "now"
const fiveSecondsAgo = new Date(now.getTime() - 5000);
const fourSecondsAgo = new Date(now.getTime() - 4000);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveSecondsAgo,
startedAt: fourSecondsAgo,
}),
500
);
expect(nextRunAt).toEqual(new Date(fourSecondsAgo.getTime() + 60000));
});

test('should use runAt when the task delay is greater than the threshold', () => {
const now = new Date();
// Use time in the past to ensure the task delay calculation isn't relative to "now"
const fiveSecondsAgo = new Date(now.getTime() - 5000);
const aBitLessThanFiveSecondsAgo = new Date(now.getTime() - 4995);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveSecondsAgo,
startedAt: aBitLessThanFiveSecondsAgo,
}),
500
);
expect(nextRunAt).toEqual(new Date(fiveSecondsAgo.getTime() + 60000));
});

test('should not schedule in the past', () => {
const testStart = new Date();
const fiveMinsAgo = new Date(Date.now() - 300000);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveMinsAgo,
startedAt: fiveMinsAgo,
}),
0
);
expect(nextRunAt.getTime()).toBeGreaterThanOrEqual(testStart.getTime());
});
});
25 changes: 25 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { intervalFromDate } from './intervals';
import type { ConcreteTaskInstance } from '../task';

export function getNextRunAt(
{ runAt, startedAt, schedule }: Pick<ConcreteTaskInstance, 'runAt' | 'startedAt' | 'schedule'>,
taskDelayThresholdForPreciseScheduling: number = 0
): Date {
const taskDelay = startedAt!.getTime() - runAt.getTime();
const scheduleFromDate = taskDelay < taskDelayThresholdForPreciseScheduling ? runAt : startedAt!;

// Ensure we also don't schedule in the past by performing the Math.max with Date.now()
const nextCalculatedRunAt = Math.max(
intervalFromDate(scheduleFromDate, schedule!.interval)!.getTime(),
Date.now()
);

return new Date(nextCalculatedRunAt);
}
5 changes: 4 additions & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
private taskClaiming: TaskClaiming;
private bufferedStore: BufferedTaskStore;
private readonly executionContext: ExecutionContextStart;
private readonly pollIntervalConfiguration$: Observable<number>;

private logger: Logger;
public pool: TaskPool;
Expand Down Expand Up @@ -122,6 +123,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.config = config;
this.pollIntervalConfiguration$ = pollIntervalConfiguration$;

const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);

Expand Down Expand Up @@ -220,9 +222,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
defaultMaxAttempts: this.taskClaiming.maxAttempts,
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
config: this.config,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
pollIntervalConfiguration$: this.pollIntervalConfiguration$,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import _ from 'lodash';
import sinon from 'sinon';
import { secondsFromNow } from '../lib/intervals';
import { asOk, asErr } from '../lib/result_type';
import { BehaviorSubject } from 'rxjs';
import {
createTaskRunError,
TaskErrorSource,
Expand Down Expand Up @@ -41,10 +42,13 @@ import {
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY } from '../config';
import * as nextRunAtUtils from '../lib/get_next_run_at';
import { configMock } from '../config.mock';

const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);
const getNextRunAtSpy = jest.spyOn(nextRunAtUtils, 'getNextRunAt');

let fakeTimer: sinon.SinonFakeTimers;

Expand Down Expand Up @@ -977,6 +981,8 @@ describe('TaskManagerRunner', () => {
expect(instance.params).toEqual({ a: 'b' });
expect(instance.state).toEqual({ hey: 'there' });
expect(instance.enabled).not.toBeDefined();

expect(getNextRunAtSpy).not.toHaveBeenCalled();
});

test('reschedules tasks that have an schedule', async () => {
Expand Down Expand Up @@ -1007,6 +1013,8 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime());
expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
expect(instance.enabled).not.toBeDefined();

expect(getNextRunAtSpy).toHaveBeenCalled();
});

test('expiration returns time after which timeout will have elapsed from start', async () => {
Expand Down Expand Up @@ -1084,6 +1092,8 @@ describe('TaskManagerRunner', () => {
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});

expect(getNextRunAtSpy).not.toHaveBeenCalled();
});

test('reschedules tasks that return a schedule', async () => {
Expand Down Expand Up @@ -1114,6 +1124,11 @@ describe('TaskManagerRunner', () => {
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});

expect(getNextRunAtSpy).toHaveBeenCalledWith(
expect.objectContaining({ schedule }),
expect.any(Number)
);
});

test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => {
Expand Down Expand Up @@ -2479,12 +2494,15 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopDelayConfig: {
monitor: true,
warn_threshold: 5000,
},
config: configMock.create({
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
}),
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
pollIntervalConfiguration$: new BehaviorSubject(500),
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
38 changes: 27 additions & 11 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* rescheduling, middleware application, etc.
*/

import { Observable } from 'rxjs';
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
Expand Down Expand Up @@ -55,9 +56,10 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config';
import { CLAIM_STRATEGY_MGET, type TaskManagerConfig } from '../config';
import { TaskValidator } from '../task_validator';
import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at';
import { getNextRunAt } from '../lib/get_next_run_at';

export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };

Expand Down Expand Up @@ -108,9 +110,10 @@ type Opts = {
defaultMaxAttempts: number;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
config: TaskManagerConfig;
allowReadingInvalidState: boolean;
strategy: string;
pollIntervalConfiguration$: Observable<number>;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;

export enum TaskRunResult {
Expand Down Expand Up @@ -160,9 +163,10 @@ export class TaskManagerRunner implements TaskRunner {
private uuid: string;
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private config: TaskManagerConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;
private currentPollInterval: number;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -185,9 +189,10 @@ export class TaskManagerRunner implements TaskRunner {
onTaskEvent = identity,
executionContext,
usageCounter,
eventLoopDelayConfig,
config,
allowReadingInvalidState,
strategy,
pollIntervalConfiguration$,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
Expand All @@ -200,13 +205,17 @@ export class TaskManagerRunner implements TaskRunner {
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
this.config = config;
this.taskValidator = new TaskValidator({
logger: this.logger,
definitions: this.definitions,
allowReadingInvalidState,
});
this.claimStrategy = strategy;
this.currentPollInterval = config.poll_interval;
pollIntervalConfiguration$.subscribe((pollInterval) => {
this.currentPollInterval = pollInterval;
});
}

/**
Expand Down Expand Up @@ -335,7 +344,7 @@ export class TaskManagerRunner implements TaskRunner {
const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, {
childOf: this.instance.task.traceparent,
});
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.config.event_loop_delay);

// Validate state
const stateValidationResult = this.validateTaskState(this.instance.task);
Expand Down Expand Up @@ -637,13 +646,20 @@ export class TaskManagerRunner implements TaskRunner {
return asOk({ status: TaskStatus.ShouldDelete });
}

const { startedAt, schedule } = this.instance.task;

const updatedTaskSchedule = reschedule ?? this.instance.task.schedule;
return asOk({
runAt:
runAt || intervalFromDate(startedAt!, reschedule?.interval ?? schedule?.interval)!,
runAt ||
getNextRunAt(
{
runAt: this.instance.task.runAt,
startedAt: this.instance.task.startedAt,
schedule: updatedTaskSchedule,
},
this.currentPollInterval
),
state,
schedule: reschedule ?? schedule,
schedule: updatedTaskSchedule,
attempts,
status: TaskStatus.Idle,
});
Expand Down Expand Up @@ -791,7 +807,7 @@ export class TaskManagerRunner implements TaskRunner {

const { eventLoopBlockMs = 0 } = taskTiming;
const taskLabel = `${this.taskType} ${this.instance.task.id}`;
if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_threshold) {
if (eventLoopBlockMs > this.config.event_loop_delay.warn_threshold) {
this.logger.warn(
`event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export default function createGetActionErrorLogTests({ getService }: FtrProvider
.send(
getTestRuleData({
rule_type_id: 'test.cumulative-firing',
schedule: { interval: '5s' },
schedule: { interval: '6s' },
actions: [
{
id: createdConnector1.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const ES_TEST_OUTPUT_INDEX_NAME = `${ES_TEST_INDEX_NAME}-output`;
export const ES_TEST_DATA_STREAM_NAME = 'test-data-stream';

export const RULE_INTERVALS_TO_WRITE = 5;
export const RULE_INTERVAL_SECONDS = 4;
export const RULE_INTERVAL_SECONDS = 6;
export const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000;
export const ES_GROUPS_TO_WRITE = 3;

Expand Down
Loading

0 comments on commit 64e5384

Please sign in to comment.