Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Consistent scheduling when tasks run within the poll interval of their original time (#190093) #193022

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions x-pack/plugins/task_manager/server/config.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { type TaskManagerConfig, configSchema } from './config';

const createConfigMock = (overwrites: Partial<TaskManagerConfig> = {}) => {
const mocked: TaskManagerConfig = configSchema.validate(overwrites);
return mocked;
};

export const configMock = {
create: createConfigMock,
};
58 changes: 58 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_next_run_at.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { taskManagerMock } from '../mocks';

import { getNextRunAt } from './get_next_run_at';

describe('getNextRunAt', () => {
test('should use startedAt when the task delay is greater than the threshold', () => {
const now = new Date();
// Use time in the past to ensure the task delay calculation isn't relative to "now"
const fiveSecondsAgo = new Date(now.getTime() - 5000);
const fourSecondsAgo = new Date(now.getTime() - 4000);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveSecondsAgo,
startedAt: fourSecondsAgo,
}),
500
);
expect(nextRunAt).toEqual(new Date(fourSecondsAgo.getTime() + 60000));
});

test('should use runAt when the task delay is greater than the threshold', () => {
const now = new Date();
// Use time in the past to ensure the task delay calculation isn't relative to "now"
const fiveSecondsAgo = new Date(now.getTime() - 5000);
const aBitLessThanFiveSecondsAgo = new Date(now.getTime() - 4995);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveSecondsAgo,
startedAt: aBitLessThanFiveSecondsAgo,
}),
500
);
expect(nextRunAt).toEqual(new Date(fiveSecondsAgo.getTime() + 60000));
});

test('should not schedule in the past', () => {
const testStart = new Date();
const fiveMinsAgo = new Date(Date.now() - 300000);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveMinsAgo,
startedAt: fiveMinsAgo,
}),
0
);
expect(nextRunAt.getTime()).toBeGreaterThanOrEqual(testStart.getTime());
});
});
25 changes: 25 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { intervalFromDate } from './intervals';
import type { ConcreteTaskInstance } from '../task';

export function getNextRunAt(
{ runAt, startedAt, schedule }: Pick<ConcreteTaskInstance, 'runAt' | 'startedAt' | 'schedule'>,
taskDelayThresholdForPreciseScheduling: number = 0
): Date {
const taskDelay = startedAt!.getTime() - runAt.getTime();
const scheduleFromDate = taskDelay < taskDelayThresholdForPreciseScheduling ? runAt : startedAt!;

// Ensure we also don't schedule in the past by performing the Math.max with Date.now()
const nextCalculatedRunAt = Math.max(
intervalFromDate(scheduleFromDate, schedule!.interval)!.getTime(),
Date.now()
);

return new Date(nextCalculatedRunAt);
}
5 changes: 4 additions & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
private taskClaiming: TaskClaiming;
private bufferedStore: BufferedTaskStore;
private readonly executionContext: ExecutionContextStart;
private readonly pollIntervalConfiguration$: Observable<number>;

private logger: Logger;
public pool: TaskPool;
Expand Down Expand Up @@ -122,6 +123,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.config = config;
this.pollIntervalConfiguration$ = pollIntervalConfiguration$;

const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);

Expand Down Expand Up @@ -220,9 +222,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
defaultMaxAttempts: this.taskClaiming.maxAttempts,
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
config: this.config,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
pollIntervalConfiguration$: this.pollIntervalConfiguration$,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import _ from 'lodash';
import sinon from 'sinon';
import { secondsFromNow } from '../lib/intervals';
import { asOk, asErr } from '../lib/result_type';
import { BehaviorSubject } from 'rxjs';
import {
createTaskRunError,
TaskErrorSource,
Expand Down Expand Up @@ -41,10 +42,13 @@ import {
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY } from '../config';
import * as nextRunAtUtils from '../lib/get_next_run_at';
import { configMock } from '../config.mock';

const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);
const getNextRunAtSpy = jest.spyOn(nextRunAtUtils, 'getNextRunAt');

let fakeTimer: sinon.SinonFakeTimers;

Expand Down Expand Up @@ -977,6 +981,8 @@ describe('TaskManagerRunner', () => {
expect(instance.params).toEqual({ a: 'b' });
expect(instance.state).toEqual({ hey: 'there' });
expect(instance.enabled).not.toBeDefined();

expect(getNextRunAtSpy).not.toHaveBeenCalled();
});

test('reschedules tasks that have an schedule', async () => {
Expand Down Expand Up @@ -1007,6 +1013,8 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime());
expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
expect(instance.enabled).not.toBeDefined();

expect(getNextRunAtSpy).toHaveBeenCalled();
});

test('expiration returns time after which timeout will have elapsed from start', async () => {
Expand Down Expand Up @@ -1084,6 +1092,8 @@ describe('TaskManagerRunner', () => {
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});

expect(getNextRunAtSpy).not.toHaveBeenCalled();
});

test('reschedules tasks that return a schedule', async () => {
Expand Down Expand Up @@ -1114,6 +1124,11 @@ describe('TaskManagerRunner', () => {
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});

expect(getNextRunAtSpy).toHaveBeenCalledWith(
expect.objectContaining({ schedule }),
expect.any(Number)
);
});

test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => {
Expand Down Expand Up @@ -2479,12 +2494,15 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopDelayConfig: {
monitor: true,
warn_threshold: 5000,
},
config: configMock.create({
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
}),
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
pollIntervalConfiguration$: new BehaviorSubject(500),
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
38 changes: 27 additions & 11 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* rescheduling, middleware application, etc.
*/

import { Observable } from 'rxjs';
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
Expand Down Expand Up @@ -54,9 +55,10 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config';
import { CLAIM_STRATEGY_MGET, type TaskManagerConfig } from '../config';
import { TaskValidator } from '../task_validator';
import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at';
import { getNextRunAt } from '../lib/get_next_run_at';

export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };

Expand Down Expand Up @@ -107,9 +109,10 @@ type Opts = {
defaultMaxAttempts: number;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
config: TaskManagerConfig;
allowReadingInvalidState: boolean;
strategy: string;
pollIntervalConfiguration$: Observable<number>;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;

export enum TaskRunResult {
Expand Down Expand Up @@ -159,9 +162,10 @@ export class TaskManagerRunner implements TaskRunner {
private uuid: string;
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private config: TaskManagerConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;
private currentPollInterval: number;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -184,9 +188,10 @@ export class TaskManagerRunner implements TaskRunner {
onTaskEvent = identity,
executionContext,
usageCounter,
eventLoopDelayConfig,
config,
allowReadingInvalidState,
strategy,
pollIntervalConfiguration$,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
Expand All @@ -199,13 +204,17 @@ export class TaskManagerRunner implements TaskRunner {
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
this.config = config;
this.taskValidator = new TaskValidator({
logger: this.logger,
definitions: this.definitions,
allowReadingInvalidState,
});
this.claimStrategy = strategy;
this.currentPollInterval = config.poll_interval;
pollIntervalConfiguration$.subscribe((pollInterval) => {
this.currentPollInterval = pollInterval;
});
}

/**
Expand Down Expand Up @@ -334,7 +343,7 @@ export class TaskManagerRunner implements TaskRunner {
const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, {
childOf: this.instance.task.traceparent,
});
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.config.event_loop_delay);

// Validate state
const stateValidationResult = this.validateTaskState(this.instance.task);
Expand Down Expand Up @@ -636,13 +645,20 @@ export class TaskManagerRunner implements TaskRunner {
return asOk({ status: TaskStatus.ShouldDelete });
}

const { startedAt, schedule } = this.instance.task;

const updatedTaskSchedule = reschedule ?? this.instance.task.schedule;
return asOk({
runAt:
runAt || intervalFromDate(startedAt!, reschedule?.interval ?? schedule?.interval)!,
runAt ||
getNextRunAt(
{
runAt: this.instance.task.runAt,
startedAt: this.instance.task.startedAt,
schedule: updatedTaskSchedule,
},
this.currentPollInterval
),
state,
schedule: reschedule ?? schedule,
schedule: updatedTaskSchedule,
attempts,
status: TaskStatus.Idle,
});
Expand Down Expand Up @@ -790,7 +806,7 @@ export class TaskManagerRunner implements TaskRunner {

const { eventLoopBlockMs = 0 } = taskTiming;
const taskLabel = `${this.taskType} ${this.instance.task.id}`;
if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_threshold) {
if (eventLoopBlockMs > this.config.event_loop_delay.warn_threshold) {
this.logger.warn(
`event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export default function createGetActionErrorLogTests({ getService }: FtrProvider
.send(
getTestRuleData({
rule_type_id: 'test.cumulative-firing',
schedule: { interval: '5s' },
schedule: { interval: '6s' },
actions: [
{
id: createdConnector1.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const ES_TEST_OUTPUT_INDEX_NAME = `${ES_TEST_INDEX_NAME}-output`;
export const ES_TEST_DATA_STREAM_NAME = 'test-data-stream';

export const RULE_INTERVALS_TO_WRITE = 5;
export const RULE_INTERVAL_SECONDS = 4;
export const RULE_INTERVAL_SECONDS = 6;
export const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000;
export const ES_GROUPS_TO_WRITE = 3;

Expand Down
Loading