Skip to content

Commit

Permalink
Consistent scheduling when tasks run within the poll interval of thei…
Browse files Browse the repository at this point in the history
…r original time (#190093)

Resolves #189114

In this PR, I'm changing the logic to calculate the task's next run at.
Whenever the gap between the task's runAt and when it was picked up is
less than the poll interval, we'll use the `runAt` to schedule the next.
This way we don't continuously add time to the task's next run (ex:
running every 1m turns into every 1m 3s).

I've had to modify a few tests to have a more increased interval because
this made tasks run more frequently (on time), which introduced
flakiness.

## To verify
1. Create an alerting rule that runs every 10s
2. Apply the following diff to your code
```
diff --git a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
index 55d5f85e5d3..4342dcdd845 100644
--- a/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
+++ b/x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
@@ -31,5 +31,7 @@ export function getNextRunAt(
     Date.now()
   );

+  console.log(`*** Next run at: ${new Date(nextCalculatedRunAt).toISOString()}, interval=${newSchedule?.interval ?? schedule.interval}, originalRunAt=${originalRunAt.toISOString()}, startedAt=${startedAt.toISOString()}`);
+
   return new Date(nextCalculatedRunAt);
 }
```
3. Observe the logs, the gap between runAt and startedAt should be less
than the poll interval, so the next run at is based on `runAt` instead
of `startedAt`.
4. Stop Kibana for 15 seconds then start it again
5. Observe the first logs when the rule runs again and notice now that
the gap between runAt and startedAt is larger than the poll interval,
the next run at is based on `startedAt` instead of `runAt` to spread the
tasks out evenly.

---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
mikecote and elasticmachine authored Sep 16, 2024
1 parent e404a39 commit 1f673dc
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 36 deletions.
17 changes: 17 additions & 0 deletions x-pack/plugins/task_manager/server/config.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { type TaskManagerConfig, configSchema } from './config';

const createConfigMock = (overwrites: Partial<TaskManagerConfig> = {}) => {
const mocked: TaskManagerConfig = configSchema.validate(overwrites);
return mocked;
};

export const configMock = {
create: createConfigMock,
};
58 changes: 58 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_next_run_at.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { taskManagerMock } from '../mocks';

import { getNextRunAt } from './get_next_run_at';

describe('getNextRunAt', () => {
test('should use startedAt when the task delay is greater than the threshold', () => {
const now = new Date();
// Use time in the past to ensure the task delay calculation isn't relative to "now"
const fiveSecondsAgo = new Date(now.getTime() - 5000);
const fourSecondsAgo = new Date(now.getTime() - 4000);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveSecondsAgo,
startedAt: fourSecondsAgo,
}),
500
);
expect(nextRunAt).toEqual(new Date(fourSecondsAgo.getTime() + 60000));
});

test('should use runAt when the task delay is greater than the threshold', () => {
const now = new Date();
// Use time in the past to ensure the task delay calculation isn't relative to "now"
const fiveSecondsAgo = new Date(now.getTime() - 5000);
const aBitLessThanFiveSecondsAgo = new Date(now.getTime() - 4995);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveSecondsAgo,
startedAt: aBitLessThanFiveSecondsAgo,
}),
500
);
expect(nextRunAt).toEqual(new Date(fiveSecondsAgo.getTime() + 60000));
});

test('should not schedule in the past', () => {
const testStart = new Date();
const fiveMinsAgo = new Date(Date.now() - 300000);
const nextRunAt = getNextRunAt(
taskManagerMock.createTask({
schedule: { interval: '1m' },
runAt: fiveMinsAgo,
startedAt: fiveMinsAgo,
}),
0
);
expect(nextRunAt.getTime()).toBeGreaterThanOrEqual(testStart.getTime());
});
});
25 changes: 25 additions & 0 deletions x-pack/plugins/task_manager/server/lib/get_next_run_at.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { intervalFromDate } from './intervals';
import type { ConcreteTaskInstance } from '../task';

export function getNextRunAt(
{ runAt, startedAt, schedule }: Pick<ConcreteTaskInstance, 'runAt' | 'startedAt' | 'schedule'>,
taskDelayThresholdForPreciseScheduling: number = 0
): Date {
const taskDelay = startedAt!.getTime() - runAt.getTime();
const scheduleFromDate = taskDelay < taskDelayThresholdForPreciseScheduling ? runAt : startedAt!;

// Ensure we also don't schedule in the past by performing the Math.max with Date.now()
const nextCalculatedRunAt = Math.max(
intervalFromDate(scheduleFromDate, schedule!.interval)!.getTime(),
Date.now()
);

return new Date(nextCalculatedRunAt);
}
5 changes: 4 additions & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
private taskClaiming: TaskClaiming;
private bufferedStore: BufferedTaskStore;
private readonly executionContext: ExecutionContextStart;
private readonly pollIntervalConfiguration$: Observable<number>;

private logger: Logger;
public pool: TaskPool;
Expand Down Expand Up @@ -122,6 +123,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.config = config;
this.pollIntervalConfiguration$ = pollIntervalConfiguration$;

const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);

Expand Down Expand Up @@ -220,9 +222,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
defaultMaxAttempts: this.taskClaiming.maxAttempts,
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
config: this.config,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
pollIntervalConfiguration$: this.pollIntervalConfiguration$,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import _ from 'lodash';
import sinon from 'sinon';
import { secondsFromNow } from '../lib/intervals';
import { asOk, asErr } from '../lib/result_type';
import { BehaviorSubject } from 'rxjs';
import {
createTaskRunError,
TaskErrorSource,
Expand Down Expand Up @@ -41,10 +42,13 @@ import {
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY } from '../config';
import * as nextRunAtUtils from '../lib/get_next_run_at';
import { configMock } from '../config.mock';

const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);
const getNextRunAtSpy = jest.spyOn(nextRunAtUtils, 'getNextRunAt');

let fakeTimer: sinon.SinonFakeTimers;

Expand Down Expand Up @@ -977,6 +981,8 @@ describe('TaskManagerRunner', () => {
expect(instance.params).toEqual({ a: 'b' });
expect(instance.state).toEqual({ hey: 'there' });
expect(instance.enabled).not.toBeDefined();

expect(getNextRunAtSpy).not.toHaveBeenCalled();
});

test('reschedules tasks that have an schedule', async () => {
Expand Down Expand Up @@ -1007,6 +1013,8 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime());
expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
expect(instance.enabled).not.toBeDefined();

expect(getNextRunAtSpy).toHaveBeenCalled();
});

test('expiration returns time after which timeout will have elapsed from start', async () => {
Expand Down Expand Up @@ -1084,6 +1092,8 @@ describe('TaskManagerRunner', () => {
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});

expect(getNextRunAtSpy).not.toHaveBeenCalled();
});

test('reschedules tasks that return a schedule', async () => {
Expand Down Expand Up @@ -1114,6 +1124,11 @@ describe('TaskManagerRunner', () => {
expect(store.update).toHaveBeenCalledWith(expect.objectContaining({ runAt }), {
validate: true,
});

expect(getNextRunAtSpy).toHaveBeenCalledWith(
expect.objectContaining({ schedule }),
expect.any(Number)
);
});

test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => {
Expand Down Expand Up @@ -2479,12 +2494,15 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopDelayConfig: {
monitor: true,
warn_threshold: 5000,
},
config: configMock.create({
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
}),
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
pollIntervalConfiguration$: new BehaviorSubject(500),
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
38 changes: 27 additions & 11 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* rescheduling, middleware application, etc.
*/

import { Observable } from 'rxjs';
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
Expand Down Expand Up @@ -55,9 +56,10 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config';
import { CLAIM_STRATEGY_MGET, type TaskManagerConfig } from '../config';
import { TaskValidator } from '../task_validator';
import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at';
import { getNextRunAt } from '../lib/get_next_run_at';

export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };

Expand Down Expand Up @@ -108,9 +110,10 @@ type Opts = {
defaultMaxAttempts: number;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
config: TaskManagerConfig;
allowReadingInvalidState: boolean;
strategy: string;
pollIntervalConfiguration$: Observable<number>;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;

export enum TaskRunResult {
Expand Down Expand Up @@ -160,9 +163,10 @@ export class TaskManagerRunner implements TaskRunner {
private uuid: string;
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private config: TaskManagerConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;
private currentPollInterval: number;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -185,9 +189,10 @@ export class TaskManagerRunner implements TaskRunner {
onTaskEvent = identity,
executionContext,
usageCounter,
eventLoopDelayConfig,
config,
allowReadingInvalidState,
strategy,
pollIntervalConfiguration$,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
Expand All @@ -200,13 +205,17 @@ export class TaskManagerRunner implements TaskRunner {
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
this.config = config;
this.taskValidator = new TaskValidator({
logger: this.logger,
definitions: this.definitions,
allowReadingInvalidState,
});
this.claimStrategy = strategy;
this.currentPollInterval = config.poll_interval;
pollIntervalConfiguration$.subscribe((pollInterval) => {
this.currentPollInterval = pollInterval;
});
}

/**
Expand Down Expand Up @@ -335,7 +344,7 @@ export class TaskManagerRunner implements TaskRunner {
const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, {
childOf: this.instance.task.traceparent,
});
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.config.event_loop_delay);

// Validate state
const stateValidationResult = this.validateTaskState(this.instance.task);
Expand Down Expand Up @@ -637,13 +646,20 @@ export class TaskManagerRunner implements TaskRunner {
return asOk({ status: TaskStatus.ShouldDelete });
}

const { startedAt, schedule } = this.instance.task;

const updatedTaskSchedule = reschedule ?? this.instance.task.schedule;
return asOk({
runAt:
runAt || intervalFromDate(startedAt!, reschedule?.interval ?? schedule?.interval)!,
runAt ||
getNextRunAt(
{
runAt: this.instance.task.runAt,
startedAt: this.instance.task.startedAt,
schedule: updatedTaskSchedule,
},
this.currentPollInterval
),
state,
schedule: reschedule ?? schedule,
schedule: updatedTaskSchedule,
attempts,
status: TaskStatus.Idle,
});
Expand Down Expand Up @@ -791,7 +807,7 @@ export class TaskManagerRunner implements TaskRunner {

const { eventLoopBlockMs = 0 } = taskTiming;
const taskLabel = `${this.taskType} ${this.instance.task.id}`;
if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_threshold) {
if (eventLoopBlockMs > this.config.event_loop_delay.warn_threshold) {
this.logger.warn(
`event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export default function createGetActionErrorLogTests({ getService }: FtrProvider
.send(
getTestRuleData({
rule_type_id: 'test.cumulative-firing',
schedule: { interval: '5s' },
schedule: { interval: '6s' },
actions: [
{
id: createdConnector1.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const ES_TEST_OUTPUT_INDEX_NAME = `${ES_TEST_INDEX_NAME}-output`;
export const ES_TEST_DATA_STREAM_NAME = 'test-data-stream';

export const RULE_INTERVALS_TO_WRITE = 5;
export const RULE_INTERVAL_SECONDS = 4;
export const RULE_INTERVAL_SECONDS = 6;
export const RULE_INTERVAL_MILLIS = RULE_INTERVAL_SECONDS * 1000;
export const ES_GROUPS_TO_WRITE = 3;

Expand Down
Loading

0 comments on commit 1f673dc

Please sign in to comment.