Skip to content

Commit

Permalink
Moves Task manager's interval under a generic schedule field (#52727)
Browse files Browse the repository at this point in the history
This moves the interval field under a generic schedule object field in preparation for the introduction of richer scheduling options (such as cron).

It includes a migration for existing tasks, and we've ensured no existing Task Type Definitions exist in Kibana that rely on Interval.

This includes support for the deprecated interval field (which gets mapped to schedule) but that support will be removed in 8.0.0, as it's a breaking change.
  • Loading branch information
gmmorris authored Dec 17, 2019
1 parent 74e1d17 commit 2b6ef5c
Show file tree
Hide file tree
Showing 16 changed files with 271 additions and 71 deletions.
6 changes: 3 additions & 3 deletions x-pack/legacy/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ The data stored for a task instance looks something like this:
runAt: "2020-07-24T17:34:35.272Z",

// Indicates that this is a recurring task. We currently only support
// minute syntax `5m` or second syntax `10s`.
interval: '5m',
// interval syntax with either minutes such as `5m` or seconds `10s`.
schedule: { interval: '5m' },

// How many times this task has been unsuccesfully attempted,
// this will be reset to 0 if the task ever succesfully completes.
Expand Down Expand Up @@ -233,7 +233,7 @@ const taskManager = server.plugins.task_manager;
const task = await taskManager.schedule({
taskType,
runAt,
interval,
schedule,
params,
scope: ['my-fanci-app'],
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { ensureDeprecatedFieldsAreCorrected } from './correct_deprecated_fields';
import { mockLogger } from '../test_utils';

describe('ensureDeprecatedFieldsAreCorrected', () => {
test('doesnt change tasks without any schedule fields', async () => {
expect(
ensureDeprecatedFieldsAreCorrected(
{
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
},
mockLogger()
)
).toEqual({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
});
});
test('doesnt change tasks with the schedule field', async () => {
expect(
ensureDeprecatedFieldsAreCorrected(
{
id: 'my-foo-id',
taskType: 'foo',
schedule: { interval: '10m' },
params: {},
state: {},
},
mockLogger()
)
).toEqual({
id: 'my-foo-id',
taskType: 'foo',
schedule: { interval: '10m' },
params: {},
state: {},
});
});
test('corrects tasks with the deprecated inteval field', async () => {
expect(
ensureDeprecatedFieldsAreCorrected(
{
id: 'my-foo-id',
taskType: 'foo',
interval: '10m',
params: {},
state: {},
},
mockLogger()
)
).toEqual({
id: 'my-foo-id',
taskType: 'foo',
schedule: { interval: '10m' },
params: {},
state: {},
});
});
test('logs a warning when a deprecated inteval is corrected on a task', async () => {
const logger = mockLogger();
ensureDeprecatedFieldsAreCorrected(
{
taskType: 'foo',
interval: '10m',
params: {},
state: {},
},
logger
);
expect(logger.warn).toHaveBeenCalledWith(
`Task of type "foo" has been scheduled with the deprecated 'interval' field which is due to be removed in a future release`
);
});
test('logs a warning when a deprecated inteval is corrected on a task with an id', async () => {
const logger = mockLogger();
ensureDeprecatedFieldsAreCorrected(
{
id: 'my-foo-id',
taskType: 'foo',
interval: '10m',
params: {},
state: {},
},
logger
);
expect(logger.warn).toHaveBeenCalledWith(
`Task "my-foo-id" of type "foo" has been scheduled with the deprecated 'interval' field which is due to be removed in a future release`
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskInstance, TaskInstanceWithDeprecatedFields } from '../task';
import { Logger } from '../types';

export function ensureDeprecatedFieldsAreCorrected(
{ id, taskType, interval, schedule, ...taskInstance }: TaskInstanceWithDeprecatedFields,
logger: Logger
): TaskInstance {
if (interval) {
logger.warn(
`Task${
id ? ` "${id}"` : ''
} of type "${taskType}" has been scheduled with the deprecated 'interval' field which is due to be removed in a future release`
);
}
return {
id,
taskType,
...taskInstance,
schedule: schedule || (interval ? { interval } : undefined),
};
}
8 changes: 6 additions & 2 deletions x-pack/legacy/plugins/task_manager/mappings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
"retryAt": {
"type": "date"
},
"interval": {
"type": "text"
"schedule": {
"properties": {
"interval": {
"type": "keyword"
}
}
},
"attempts": {
"type": "integer"
Expand Down
20 changes: 20 additions & 0 deletions x-pack/legacy/plugins/task_manager/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,25 @@ export const migrations = {
...doc,
updated_at: new Date().toISOString(),
}),
'7.6.0': moveIntervalIntoSchedule,
},
};

function moveIntervalIntoSchedule({
attributes: { interval, ...attributes },
...doc
}: SavedObject) {
return {
...doc,
attributes: {
...attributes,
...(interval
? {
schedule: {
interval,
},
}
: {}),
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
updateFields,
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
RecuringTaskWithInterval,
TaskWithSchedule,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
} from './mark_available_tasks_as_claimed';
Expand Down Expand Up @@ -50,9 +50,9 @@ describe('mark_available_tasks_as_claimed', () => {
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an interval or the attempts < the maximum configured
// Either task has an schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsBoolClause | TermBoolClause | RangeBoolClause>(
RecuringTaskWithInterval,
TaskWithSchedule,
...Object.entries(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)
)
Expand Down Expand Up @@ -100,11 +100,11 @@ describe('mark_available_tasks_as_claimed', () => {
],
},
},
// Either task has an interval or the attempts < the maximum configured
// Either task has an recurring schedule or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.interval' } },
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import {
RangeBoolClause,
} from './query_clauses';

export const RecuringTaskWithInterval: ExistsBoolClause = { exists: { field: 'task.interval' } };
export const TaskWithSchedule: ExistsBoolClause = {
exists: { field: 'task.schedule' },
};
export function taskWithLessThanMaxAttempts(
type: string,
maxAttempts: number
Expand Down
27 changes: 24 additions & 3 deletions x-pack/legacy/plugins/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export interface TaskDefinition {
* function can return `true`, `false` or a Date. True will tell task manager
* to retry using default delay logic. False will tell task manager to stop retrying
* this task. Date will suggest when to the task manager the task should retry.
* This function isn't used for interval type tasks, those retry at the next interval.
* This function isn't used for recurring tasks, those retry as per their configured recurring schedule.
*/
getRetry?: (attempts: number, error: object) => boolean | Date;

Expand Down Expand Up @@ -176,6 +176,13 @@ export enum TaskLifecycleResult {

export type TaskLifecycle = TaskStatus | TaskLifecycleResult;

export interface IntervalSchedule {
/**
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
* */
interval: string;
}

/*
* A task instance represents all of the data required to store, fetch,
* and execute a task.
Expand Down Expand Up @@ -220,9 +227,11 @@ export interface TaskInstance {
runAt?: Date;

/**
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
* A TaskSchedule string, which specifies this as a recurring task.
*
* Currently, this supports a single format: an interval in minutes or seconds (e.g. '5m', '30s').
*/
interval?: string;
schedule?: IntervalSchedule;

/**
* A task-specific set of parameters, used by the task's run function to tailor
Expand Down Expand Up @@ -254,6 +263,18 @@ export interface TaskInstance {
ownerId?: string | null;
}

/**
* Support for the depracated interval field, this should be removed in version 8.0.0
* and marked as a breaking change, ideally nutil then all usage of `interval` will be
* replaced with use of `schedule`
*/
export interface TaskInstanceWithDeprecatedFields extends TaskInstance {
/**
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
* */
interval?: string;
}

/**
* A task instance that has an id.
*/
Expand Down
2 changes: 1 addition & 1 deletion x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ describe('TaskManager', () => {
conflicts: 'proceed',
},
body:
'{"query":{"bool":{"must":[{"term":{"type":"task"}},{"bool":{"must":[{"bool":{"should":[{"bool":{"must":[{"term":{"task.status":"idle"}},{"range":{"task.runAt":{"lte":"now"}}}]}},{"bool":{"must":[{"bool":{"should":[{"term":{"task.status":"running"}},{"term":{"task.status":"claiming"}}]}},{"range":{"task.retryAt":{"lte":"now"}}}]}}]}},{"bool":{"should":[{"exists":{"field":"task.interval"}},{"bool":{"must":[{"term":{"task.taskType":"vis_telemetry"}},{"range":{"task.attempts":{"lt":3}}}]}},{"bool":{"must":[{"term":{"task.taskType":"lens_telemetry"}},{"range":{"task.attempts":{"lt":3}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.server-log"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.slack"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.email"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.index"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.pagerduty"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.webhook"}},{"range":{"task.attempts":{"lt":1}}}]}}]}}]}}]}},"sort":{"_script":{"type":"number","order":"asc","script":{"lang":"expression","source":"doc[\'task.retryAt\'].value || doc[\'task.runAt\'].value"}}},"seq_no_primary_term":true,"script":{"source":"ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;","lang":"painless","params":{"ownerId":"kibana:5b2de169-2785-441b-ae8c-186a1936b17d","retryAt":"2019-10-31T13:35:43.579Z","status":"claiming"}}}',
'{"query":{"bool":{"must":[{"term":{"type":"task"}},{"bool":{"must":[{"bool":{"should":[{"bool":{"must":[{"term":{"task.status":"idle"}},{"range":{"task.runAt":{"lte":"now"}}}]}},{"bool":{"must":[{"bool":{"should":[{"term":{"task.status":"running"}},{"term":{"task.status":"claiming"}}]}},{"range":{"task.retryAt":{"lte":"now"}}}]}}]}},{"bool":{"should":[{"exists":{"field":"task.schedule"}},{"bool":{"must":[{"term":{"task.taskType":"vis_telemetry"}},{"range":{"task.attempts":{"lt":3}}}]}},{"bool":{"must":[{"term":{"task.taskType":"lens_telemetry"}},{"range":{"task.attempts":{"lt":3}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.server-log"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.slack"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.email"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.index"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.pagerduty"}},{"range":{"task.attempts":{"lt":1}}}]}},{"bool":{"must":[{"term":{"task.taskType":"actions:.webhook"}},{"range":{"task.attempts":{"lt":1}}}]}}]}}]}}]}},"sort":{"_script":{"type":"number","order":"asc","script":{"lang":"expression","source":"doc[\'task.retryAt\'].value || doc[\'task.runAt\'].value"}}},"seq_no_primary_term":true,"script":{"source":"ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;","lang":"painless","params":{"ownerId":"kibana:5b2de169-2785-441b-ae8c-186a1936b17d","retryAt":"2019-10-31T13:35:43.579Z","status":"claiming"}}}',
statusCode: 400,
response:
'{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".kibana_task_manager_1","node":"24A4QbjHSK6prvtopAKLKw","reason":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}}],"caused_by":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts","caused_by":{"type":"illegal_argument_exception","reason":"cannot execute [inline] scripts"}}},"status":400}',
Expand Down
10 changes: 7 additions & 3 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
ConcreteTaskInstance,
RunContext,
TaskInstanceWithId,
TaskInstance,
TaskInstanceWithDeprecatedFields,
TaskLifecycle,
TaskLifecycleResult,
TaskStatus,
Expand All @@ -53,6 +53,7 @@ import {
ClaimOwnershipResult,
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';

const VERSION_CONFLICT_STATUS = 409;

Expand Down Expand Up @@ -267,11 +268,14 @@ export class TaskManager {
* @param task - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async schedule(taskInstance: TaskInstance, options?: any): Promise<ConcreteTaskInstance> {
public async schedule(
taskInstance: TaskInstanceWithDeprecatedFields,
options?: any
): Promise<ConcreteTaskInstance> {
await this.waitUntilStarted();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance,
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
const result = await this.store.schedule(modifiedTask);
this.attemptToRun();
Expand Down
Loading

0 comments on commit 2b6ef5c

Please sign in to comment.