diff --git a/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts b/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts index dc3aaaf5cf23c..3b05fe0f0c74e 100644 --- a/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts +++ b/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts @@ -195,24 +195,25 @@ describe('create()', () => { `); expect(taskManager.schedule).toHaveBeenCalledTimes(1); expect(taskManager.schedule.mock.calls[0]).toMatchInlineSnapshot(` - Array [ - Object { - "params": Object { - "alertId": "1", - "spaceId": "default", - }, - "scope": Array [ - "alerting", - ], - "state": Object { - "alertInstances": Object {}, - "alertTypeState": Object {}, - "previousStartedAt": null, - }, - "taskType": "alerting:123", - }, - ] - `); + Array [ + Object { + "interval": "10s", + "params": Object { + "alertId": "1", + "spaceId": "default", + }, + "scope": Array [ + "alerting", + ], + "state": Object { + "alertInstances": Object {}, + "alertTypeState": Object {}, + "previousStartedAt": null, + }, + "taskType": "alerting:123", + }, + ] + `); expect(savedObjectsClient.update).toHaveBeenCalledTimes(1); expect(savedObjectsClient.update.mock.calls[0]).toHaveLength(3); expect(savedObjectsClient.update.mock.calls[0][0]).toEqual('alert'); @@ -583,6 +584,7 @@ describe('enable()', () => { ); expect(taskManager.schedule).toHaveBeenCalledWith({ taskType: `alerting:2`, + interval: '10s', params: { alertId: '1', spaceId: 'default', @@ -664,6 +666,7 @@ describe('enable()', () => { ); expect(taskManager.schedule).toHaveBeenCalledWith({ taskType: `alerting:2`, + interval: '10s', params: { alertId: '1', spaceId: 'default', @@ -1262,6 +1265,154 @@ describe('update()', () => { `); }); + test('reschedules the Alert Task if the interval is changed', async () => { + const alertsClient = new AlertsClient(alertsClientParams); + alertTypeRegistry.get.mockReturnValueOnce({ + id: '123', + name: 'Test', + actionGroups: ['default'], + async executor() {}, + }); + savedObjectsClient.get.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + enabled: true, + alertTypeId: '123', + scheduledTaskId: 'task-123', + }, + references: [], + version: '123', + }); + savedObjectsClient.update.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + enabled: true, + interval: '10s', + alertTypeParams: { + bar: true, + }, + actions: [ + { + group: 'default', + actionRef: 'action_0', + params: { + foo: true, + }, + }, + ], + scheduledTaskId: 'task-123', + }, + references: [ + { + name: 'action_0', + type: 'action', + id: '1', + }, + ], + }); + await alertsClient.update({ + id: '1', + data: { + interval: '100s', + name: 'abc', + tags: ['foo'], + alertTypeParams: { + bar: true, + }, + actions: [ + { + group: 'default', + id: '1', + params: { + foo: true, + }, + }, + ], + }, + }); + + expect(taskManager.reschedule.mock.calls[0][0]).toMatchInlineSnapshot(` + Object { + "id": "task-123", + "interval": "100s", + } + `); + }); + + test('does not reschedule the Alert Task if the interval is unchanged', async () => { + const alertsClient = new AlertsClient(alertsClientParams); + alertTypeRegistry.get.mockReturnValueOnce({ + id: '123', + name: 'Test', + actionGroups: ['default'], + async executor() {}, + }); + savedObjectsClient.get.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + enabled: true, + interval: '10s', + alertTypeId: '123', + scheduledTaskId: 'task-123', + }, + references: [], + version: '123', + }); + savedObjectsClient.update.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + enabled: true, + interval: '10s', + alertTypeParams: { + bar: true, + }, + actions: [ + { + group: 'default', + actionRef: 'action_0', + params: { + foo: true, + }, + }, + ], + scheduledTaskId: 'task-123', + }, + references: [ + { + name: 'action_0', + type: 'action', + id: '1', + }, + ], + }); + await alertsClient.update({ + id: '1', + data: { + interval: '10s', + name: 'abc', + tags: ['foo'], + alertTypeParams: { + bar: true, + }, + actions: [ + { + group: 'default', + id: '1', + params: { + foo: true, + }, + }, + ], + }, + }); + + expect(taskManager.reschedule).not.toHaveBeenCalled(); + }); + it('calls the createApiKey function', async () => { const alertsClient = new AlertsClient(alertsClientParams); alertTypeRegistry.get.mockReturnValueOnce({ diff --git a/x-pack/legacy/plugins/alerting/server/alerts_client.ts b/x-pack/legacy/plugins/alerting/server/alerts_client.ts index c260a754e4594..5b989cd16b1d6 100644 --- a/x-pack/legacy/plugins/alerting/server/alerts_client.ts +++ b/x-pack/legacy/plugins/alerting/server/alerts_client.ts @@ -139,7 +139,7 @@ export class AlertsClient { scheduledTask = await this.scheduleAlert( createdAlert.id, rawAlert.alertTypeId, - rawAlert.interval + data.interval ); } catch (e) { // Cleanup data, something went wrong scheduling the task @@ -223,6 +223,10 @@ export class AlertsClient { references, } ); + if (data.interval !== attributes.interval) { + // if interval has changed, we should reschedule the task + await this.rescheduleAlert(attributes.scheduledTaskId, data.interval); + } return this.getAlertFromRaw(id, updatedObject.attributes, updatedObject.references); } @@ -358,6 +362,7 @@ export class AlertsClient { private async scheduleAlert(id: string, alertTypeId: string, interval: string) { return await this.taskManager.schedule({ taskType: `alerting:${alertTypeId}`, + interval, params: { alertId: id, spaceId: this.spaceId, @@ -371,6 +376,13 @@ export class AlertsClient { }); } + private async rescheduleAlert(scheduledTaskId: string, interval: string) { + return await this.taskManager.reschedule({ + id: scheduledTaskId, + interval, + }); + } + private extractReferences(actions: Alert['actions']) { const references: SavedObjectReference[] = []; const rawActions = actions.map((action, i) => { diff --git a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.test.ts b/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.test.ts deleted file mode 100644 index 852e412689b35..0000000000000 --- a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.test.ts +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { getNextRunAt } from './get_next_run_at'; - -const mockedNow = new Date('2019-06-03T18:55:25.982Z'); -(global as any).Date = class Date extends global.Date { - static now() { - return mockedNow.getTime(); - } -}; - -test('Adds interface to given date when result is > Date.now()', () => { - const currentRunAt = new Date('2019-06-03T18:55:23.982Z'); - const result = getNextRunAt(currentRunAt, '10s'); - expect(result).toEqual(new Date('2019-06-03T18:55:33.982Z')); -}); - -test('Uses Date.now() when the result would of been a date in the past', () => { - const currentRunAt = new Date('2019-06-03T18:55:13.982Z'); - const result = getNextRunAt(currentRunAt, '10s'); - expect(result).toEqual(mockedNow); -}); diff --git a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.ts b/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.ts deleted file mode 100644 index 901b614b4d68c..0000000000000 --- a/x-pack/legacy/plugins/alerting/server/lib/get_next_run_at.ts +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { parseDuration } from './parse_duration'; - -export function getNextRunAt(currentRunAt: Date, interval: string) { - let nextRunAt = currentRunAt.getTime() + parseDuration(interval); - if (nextRunAt < Date.now()) { - // To prevent returning dates in the past, we'll return now instead - nextRunAt = Date.now(); - } - return new Date(nextRunAt); -} diff --git a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts b/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts index dcc74ed9488ce..92572e625b355 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts +++ b/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.test.ts @@ -134,7 +134,6 @@ test('successfully executes the task', async () => { const runnerResult = await taskRunner.run(); expect(runnerResult).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:00:10.000Z, "state": Object { "alertInstances": Object {}, "alertTypeState": undefined, diff --git a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts b/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts index 66d445f57fe73..a97932b776196 100644 --- a/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts +++ b/x-pack/legacy/plugins/alerting/server/lib/task_runner_factory.ts @@ -9,7 +9,6 @@ import { RunContext } from '../../../task_manager'; import { createExecutionHandler } from './create_execution_handler'; import { createAlertInstanceFactory } from './create_alert_instance_factory'; import { AlertInstance } from './alert_instance'; -import { getNextRunAt } from './get_next_run_at'; import { validateAlertTypeParams } from './validate_alert_type_params'; import { PluginStartContract as EncryptedSavedObjectsStartContract } from '../../../../../plugins/encrypted_saved_objects/server'; import { PluginStartContract as ActionsPluginStartContract } from '../../../actions'; @@ -94,7 +93,7 @@ export class TaskRunnerFactory { const services = getServices(fakeRequest); // Ensure API key is still valid and user has access const { - attributes: { alertTypeParams, actions, interval, throttle, muteAll, mutedInstanceIds }, + attributes: { alertTypeParams, actions, throttle, muteAll, mutedInstanceIds }, references, } = await services.savedObjectsClient.get('alert', alertId); @@ -167,15 +166,12 @@ export class TaskRunnerFactory { }) ); - const nextRunAt = getNextRunAt(new Date(taskInstance.startedAt!), interval); - return { state: { alertTypeState, alertInstances, previousStartedAt: taskInstance.startedAt!, }, - runAt: nextRunAt, }; }, }; diff --git a/x-pack/legacy/plugins/alerting/server/shim.ts b/x-pack/legacy/plugins/alerting/server/shim.ts index 0ee1ef843d7d0..2d844f2c7222a 100644 --- a/x-pack/legacy/plugins/alerting/server/shim.ts +++ b/x-pack/legacy/plugins/alerting/server/shim.ts @@ -40,7 +40,10 @@ export interface Server extends Legacy.Server { /** * Shim what we're thinking setup and start contracts will look like */ -export type TaskManagerStartContract = Pick; +export type TaskManagerStartContract = Pick< + TaskManager, + 'schedule' | 'reschedule' | 'fetch' | 'remove' +>; export type SecurityPluginSetupContract = Pick; export type SecurityPluginStartContract = Pick; export type XPackMainPluginSetupContract = Pick; diff --git a/x-pack/legacy/plugins/task_manager/README.md b/x-pack/legacy/plugins/task_manager/README.md index 744643458e136..49d8a4824c7fc 100644 --- a/x-pack/legacy/plugins/task_manager/README.md +++ b/x-pack/legacy/plugins/task_manager/README.md @@ -265,6 +265,19 @@ The danger is that in such a situation, a Task with that same `id` might already To achieve this you should use the `ensureScheduling` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation. +### reschedule +The `reschedule` api allows a developer to reconfigure an existing Task's `runAt` and `interval` fields. +Given the `id` of an existing Task, you may specify either, or both, of these fields and the Task's fields will be updated accordingly, returning the updated Task. + +One thing to note is that this api behaves mildly differently depending on the state of the Task. + +If a request is made to `reschedule` the `interval` field of an `idle` task, the task's field will be updated and the task will run immediately and the new `interval` will be applied to scheduling the _next_ run. +If a request is made to `reschedule` the `runAt` field of an `idle` task, irrespective of whether the `interval` field is also specified, the task's field(s) will be updated and the task will only run when the newly specified `runAt` expires (after which, any new `interval` that may also have been specified, will be applied to the _next_ scheduled run). +These behaviors mirrors how `schedule` treats these two fields. + +Where this behavior diverges is if a request is made to `reschedule` a non `idle` task, such as a `running` task or a `failed` task. +In such a case, `reschedule` will avoid changing the `runAt` field, as it is used internally to manage Task Manager lifecycles and changing that field in such a case could be considered dangerous. `interval` on the other hand, will be updated in such a case, and we recommend using the Task returned by the `reschedule` api to assess whether the fields have been updated as expected. + ### more options More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. diff --git a/x-pack/legacy/plugins/task_manager/lib/async_retry.test.ts b/x-pack/legacy/plugins/task_manager/lib/async_retry.test.ts new file mode 100644 index 0000000000000..c395de9611749 --- /dev/null +++ b/x-pack/legacy/plugins/task_manager/lib/async_retry.test.ts @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import { asyncRetry } from './async_retry'; + +describe('asyncRetry', () => { + test('wraps an async function and resolves when it resolves', async () => { + async function performAsyncAction() { + return true; + } + + return expect(await asyncRetry(performAsyncAction)()).toEqual(true); + }); + + test('catches an error and tried again', async () => { + let count = 0; + async function performAsyncActionFailOnFirst() { + if (++count === 1) { + throw new Error('fail'); + } + return true; + } + + expect(await asyncRetry(performAsyncActionFailOnFirst)()).toEqual(true); + }); + + test('only retries once by default', async () => { + let count = 0; + async function performAsyncActionFailOnFirst() { + if (++count < 3) { + throw new Error('fail'); + } + return true; + } + + return expect(asyncRetry(performAsyncActionFailOnFirst)()).rejects.toMatchInlineSnapshot( + `[Error: fail]` + ); + }); + + test('it takes a custom number of retries', async () => { + let count = 0; + async function performAsyncActionSucceedOnFifth() { + if (++count < 5) { + throw new Error('fail'); + } + return true; + } + + const retries = 4; + + return expect( + await asyncRetry( + performAsyncActionSucceedOnFifth, + _.matchesProperty('message', 'fail'), + retries + )() + ).toEqual(true); + }); + + test('it takes a custom retryOn function which returns when to retry', async () => { + let count = 0; + async function performAsyncActionFailOnFirst() { + if (count++ < 2) { + throw new Error('retry'); + } + return true; + } + + const retryOn = _.matchesProperty('message', 'retry'); + + return expect(await asyncRetry(performAsyncActionFailOnFirst, retryOn, 2)()).toEqual(true); + }); + + test('it throws even if retryOn returns true when retries number has expired', async () => { + async function performAsyncActionFailOnFirst() { + throw new Error('retry'); + } + + const retryOn = _.matchesProperty('message', 'retry'); + + return expect( + asyncRetry(performAsyncActionFailOnFirst, retryOn, 2)() + ).rejects.toMatchInlineSnapshot(`[Error: retry]`); + }); +}); diff --git a/x-pack/legacy/plugins/task_manager/lib/async_retry.ts b/x-pack/legacy/plugins/task_manager/lib/async_retry.ts new file mode 100644 index 0000000000000..8f70d5ea8f832 --- /dev/null +++ b/x-pack/legacy/plugins/task_manager/lib/async_retry.ts @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { after, negate, constant } from 'lodash'; + +const DEFAULT_RETRIES = 1; +const returnFalseAfterXRetries = (retries: number) => negate(after(retries, () => true)); + +type AsyncNullaryFunction = () => Promise; + +export function asyncRetry( + fn: AsyncNullaryFunction, + retryOn: (error: E) => boolean = constant(true), + retries: number = DEFAULT_RETRIES +): AsyncNullaryFunction { + const shouldRetry = returnFalseAfterXRetries(retries + 1); + return async function retry(): Promise { + try { + return await fn(); + } catch (ex) { + if (retryOn(ex) && shouldRetry()) { + return await retry(); + } + throw ex; + } + }; +} diff --git a/x-pack/legacy/plugins/task_manager/plugin.test.ts b/x-pack/legacy/plugins/task_manager/plugin.test.ts index 4f2effb5da3a8..eaeb9ccc5c174 100644 --- a/x-pack/legacy/plugins/task_manager/plugin.test.ts +++ b/x-pack/legacy/plugins/task_manager/plugin.test.ts @@ -46,6 +46,7 @@ describe('Task Manager Plugin', () => { "fetch": [Function], "registerTaskDefinitions": [Function], "remove": [Function], + "reschedule": [Function], "schedule": [Function], } `); diff --git a/x-pack/legacy/plugins/task_manager/plugin.ts b/x-pack/legacy/plugins/task_manager/plugin.ts index 3e1514bd5234f..0fc2101a9ca3f 100644 --- a/x-pack/legacy/plugins/task_manager/plugin.ts +++ b/x-pack/legacy/plugins/task_manager/plugin.ts @@ -11,6 +11,7 @@ export interface PluginSetupContract { fetch: TaskManager['fetch']; remove: TaskManager['remove']; schedule: TaskManager['schedule']; + reschedule: TaskManager['reschedule']; ensureScheduled: TaskManager['ensureScheduled']; addMiddleware: TaskManager['addMiddleware']; registerTaskDefinitions: TaskManager['registerTaskDefinitions']; @@ -60,6 +61,7 @@ export class Plugin { fetch: (...args) => taskManager.fetch(...args), remove: (...args) => taskManager.remove(...args), schedule: (...args) => taskManager.schedule(...args), + reschedule: (...args) => taskManager.reschedule(...args), ensureScheduled: (...args) => taskManager.ensureScheduled(...args), addMiddleware: (...args) => taskManager.addMiddleware(...args), registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args), diff --git a/x-pack/legacy/plugins/task_manager/task_manager.mock.ts b/x-pack/legacy/plugins/task_manager/task_manager.mock.ts index 515099a8bd479..da99c36d439e0 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.mock.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.mock.ts @@ -12,6 +12,7 @@ const createTaskManagerMock = () => { addMiddleware: jest.fn(), ensureScheduled: jest.fn(), schedule: jest.fn(), + reschedule: jest.fn(), fetch: jest.fn(), remove: jest.fn(), start: jest.fn(), diff --git a/x-pack/legacy/plugins/task_manager/task_manager.test.ts b/x-pack/legacy/plugins/task_manager/task_manager.test.ts index 0b4a22910e611..ba80a0a6b042a 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.test.ts @@ -146,7 +146,7 @@ describe('TaskManager', () => { expect(result.id).toEqual('my-foo-id'); }); - test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => { + test('doesnt ignore failure to schedule existing tasks for reasons other than already being scheduled', async () => { const client = new TaskManager(taskManagerOpts); client.registerTaskDefinitions({ foo: { @@ -200,6 +200,79 @@ describe('TaskManager', () => { }); }); + test('allows and queues rescheduling tasks before starting', async () => { + const savedObjectsRepository = savedObjectsClientMock.create(); + const client = new TaskManager({ + ...taskManagerOpts, + savedObjectsRepository, + }); + + const task = { + id: '1', + state: {}, + }; + + const storedTask = { + id: '1', + type: 'task', + attributes: { + scheduledAt: Date.now(), + attempts: 0, + status: 'idle', + runAt: Date.now(), + state: '{}', + }, + references: [], + }; + savedObjectsRepository.get.mockResolvedValueOnce(storedTask); + savedObjectsRepository.update.mockResolvedValueOnce(storedTask); + + const promise = client.reschedule(task); + + expect(savedObjectsRepository.get).not.toHaveBeenCalled(); + + client.start(); + + await promise; + + expect(savedObjectsRepository.get).toHaveBeenCalledWith('task', task.id); + }); + + test('allows rescheduling tasks after starting', async () => { + const savedObjectsRepository = savedObjectsClientMock.create(); + const client = new TaskManager({ + ...taskManagerOpts, + savedObjectsRepository, + }); + + client.start(); + + const task = { + id: '1', + state: {}, + }; + + const storedTask = { + id: '1', + type: 'task', + attributes: { + scheduledAt: Date.now(), + attempts: 0, + status: 'idle', + runAt: Date.now(), + state: '{}', + }, + references: [], + }; + + savedObjectsRepository.get.mockResolvedValueOnce(storedTask); + savedObjectsRepository.update.mockResolvedValueOnce(storedTask); + + await client.reschedule(task); + + expect(savedObjectsRepository.get).toHaveBeenCalledWith('task', task.id); + }); + test('allows and queues removing tasks before starting', async () => { const client = new TaskManager(taskManagerOpts); savedObjectsClient.delete.mockResolvedValueOnce({}); diff --git a/x-pack/legacy/plugins/task_manager/task_manager.ts b/x-pack/legacy/plugins/task_manager/task_manager.ts index 269d7ff67384b..18aac81eb7a7d 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.ts @@ -26,6 +26,7 @@ import { FetchResult, TaskStore, OwnershipClaimingOpts, + TaskReschedulingOpts, ClaimOwnershipResult, } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; @@ -217,7 +218,20 @@ export class TaskManager { ...options, taskInstance, }); - const result = await this.store.schedule(modifiedTask); + const result = await this.store.scheduleTask(modifiedTask); + this.poller.attemptWork(); + return result; + } + + /** + * Reschedules a task. + * + * @param taskInstanceUpdate - The task being rescheduled with its id and any fields you wish to update. + * @returns {Promise} + */ + public async reschedule(taskInstance: TaskReschedulingOpts): Promise { + await this.waitUntilStarted(); + const result = await this.store.rescheduleTask(taskInstance); this.poller.attemptWork(); return result; } @@ -250,7 +264,7 @@ export class TaskManager { */ public async fetch(opts: FetchOpts): Promise { await this.waitUntilStarted(); - return this.store.fetch(opts); + return this.store.fetchTasks(opts); } /** @@ -261,7 +275,7 @@ export class TaskManager { */ public async remove(id: string): Promise { await this.waitUntilStarted(); - return this.store.remove(id); + return this.store.removeTask(id); } /** diff --git a/x-pack/legacy/plugins/task_manager/task_runner.test.ts b/x-pack/legacy/plugins/task_manager/task_runner.test.ts index 578b86ba0b3f6..dc546910d65f7 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.test.ts @@ -75,8 +75,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; + sinon.assert.calledOnce(store.updateTask); + const instance = store.updateTask.args[0][0]; expect(instance.id).toEqual(id); expect(instance.runAt.getTime()).toEqual(minutesFromNow(initialAttempts * 5).getTime()); @@ -104,8 +104,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; + sinon.assert.calledOnce(store.updateTask); + const instance = store.updateTask.args[0][0]; expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime()); expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime()); @@ -127,8 +127,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); - sinon.assert.calledWithMatch(store.update, { runAt }); + sinon.assert.calledOnce(store.updateTask); + sinon.assert.calledWithMatch(store.updateTask, { runAt }); }); test('tasks that return runAt override interval', async () => { @@ -150,8 +150,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); - sinon.assert.calledWithMatch(store.update, { runAt }); + sinon.assert.calledOnce(store.updateTask); + sinon.assert.calledWithMatch(store.updateTask, { runAt }); }); test('removes non-recurring tasks after they complete', async () => { @@ -174,8 +174,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.remove); - sinon.assert.calledWith(store.remove, id); + sinon.assert.calledOnce(store.removeTask); + sinon.assert.calledWith(store.removeTask, id); }); test('cancel cancels the task runner, if it is cancellable', async () => { @@ -249,8 +249,8 @@ describe('TaskManagerRunner', () => { await runner.markTaskAsRunning(); - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; + sinon.assert.calledOnce(store.updateTask); + const instance = store.updateTask.args[0][0]; expect(instance.attempts).toEqual(initialAttempts + 1); expect(instance.status).toBe('running'); @@ -285,9 +285,9 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.calledWith(getRetryStub, initialAttempts, error); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; expect(instance.runAt.getTime()).toEqual(nextRetry.getTime()); }); @@ -316,9 +316,9 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.calledWith(getRetryStub, initialAttempts, error); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; const expectedRunAt = new Date(Date.now() + initialAttempts * 5 * 60 * 1000); expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime()); @@ -348,9 +348,9 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.calledWith(getRetryStub, initialAttempts, error); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; expect(instance.status).toBe('failed'); }); @@ -381,9 +381,9 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.notCalled(getRetryStub); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; const nextIntervalDelay = 60000; // 1m const expectedRunAt = new Date(Date.now() + nextIntervalDelay); @@ -415,9 +415,9 @@ describe('TaskManagerRunner', () => { await runner.markTaskAsRunning(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.calledWith(getRetryStub, initialAttempts + 1); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; expect(instance.retryAt.getTime()).toEqual( new Date(nextRetry.getTime() + timeoutMinutes * 60 * 1000).getTime() @@ -447,7 +447,7 @@ describe('TaskManagerRunner', () => { }, }); - store.update = sinon + store.updateTask = sinon .stub() .throws( SavedObjectsErrorHelpers.decorateConflictError(new Error('repo error')).output.payload @@ -479,7 +479,7 @@ describe('TaskManagerRunner', () => { }, }); - store.update = sinon + store.updateTask = sinon .stub() .throws(SavedObjectsErrorHelpers.createGenericNotFoundError('type', 'id').output.payload); @@ -516,9 +516,9 @@ describe('TaskManagerRunner', () => { await runner.markTaskAsRunning(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.calledWith(getRetryStub, initialAttempts + 1); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; const attemptDelay = (initialAttempts + 1) * 5 * 60 * 1000; const timeoutDelay = timeoutMinutes * 60 * 1000; @@ -551,9 +551,9 @@ describe('TaskManagerRunner', () => { await runner.markTaskAsRunning(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.calledWith(getRetryStub, initialAttempts + 1); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; expect(instance.retryAt).toBeNull(); expect(instance.status).toBe('running'); @@ -584,9 +584,9 @@ describe('TaskManagerRunner', () => { await runner.markTaskAsRunning(); - sinon.assert.calledOnce(store.update); + sinon.assert.calledOnce(store.updateTask); sinon.assert.notCalled(getRetryStub); - const instance = store.update.args[0][0]; + const instance = store.updateTask.args[0][0]; const timeoutDelay = timeoutMinutes * 60 * 1000; expect(instance.retryAt.getTime()).toEqual(new Date(Date.now() + timeoutDelay).getTime()); @@ -615,8 +615,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; + sinon.assert.calledOnce(store.updateTask); + const instance = store.updateTask.args[0][0]; expect(instance.attempts).toEqual(3); expect(instance.status).toEqual('failed'); expect(instance.retryAt).toBeNull(); @@ -648,8 +648,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - sinon.assert.calledOnce(store.update); - const instance = store.update.args[0][0]; + sinon.assert.calledOnce(store.updateTask); + const instance = store.updateTask.args[0][0]; expect(instance.attempts).toEqual(3); expect(instance.status).toEqual('idle'); expect(instance.runAt.getTime()).toEqual( @@ -667,8 +667,8 @@ describe('TaskManagerRunner', () => { const createTaskRunner = sinon.stub(); const logger = mockLogger(); const store = { - update: sinon.stub(), - remove: sinon.stub(), + updateTask: sinon.stub(), + removeTask: sinon.stub(), maxAttempts: 5, }; const runner = new TaskManagerRunner({ diff --git a/x-pack/legacy/plugins/task_manager/task_runner.ts b/x-pack/legacy/plugins/task_manager/task_runner.ts index 9d1431ed004e3..27e28e15d4b58 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.ts @@ -38,8 +38,8 @@ export interface TaskRunner { interface Updatable { readonly maxAttempts: number; - update(doc: ConcreteTaskInstance): Promise; - remove(id: string): Promise; + updateTask(doc: ConcreteTaskInstance): Promise; + removeTask(id: string): Promise; } interface Opts { @@ -181,7 +181,7 @@ export class TaskManagerRunner implements TaskRunner { ); } - this.instance = await this.bufferedTaskStore.update({ + this.instance = await this.bufferedTaskStore.updateTask({ ...taskInstance, status: 'running', startedAt: now, @@ -274,7 +274,7 @@ export class TaskManagerRunner implements TaskRunner { runAt = intervalFromDate(startedAt, this.instance.interval)!; } - await this.bufferedTaskStore.update({ + await this.bufferedTaskStore.updateTask({ ...this.instance, runAt, state, @@ -291,7 +291,7 @@ export class TaskManagerRunner implements TaskRunner { private async processResultWhenDone(result: RunResult): Promise { // not a recurring task: clean up by removing the task instance from store try { - await this.bufferedTaskStore.remove(this.instance.id); + await this.bufferedTaskStore.removeTask(this.instance.id); } catch (err) { if (err.statusCode === 404) { this.logger.warn(`Task cleanup of ${this} failed in processing. Was remove called twice?`); diff --git a/x-pack/legacy/plugins/task_manager/task_store.test.ts b/x-pack/legacy/plugins/task_manager/task_store.test.ts index 46efc4bb57ba7..672c51fbbbdbd 100644 --- a/x-pack/legacy/plugins/task_manager/task_store.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_store.test.ts @@ -7,10 +7,29 @@ import _ from 'lodash'; import sinon from 'sinon'; import uuid from 'uuid'; -import { TaskDictionary, TaskDefinition, TaskInstance, TaskStatus } from './task'; -import { FetchOpts, StoreOpts, OwnershipClaimingOpts, TaskStore } from './task_store'; +import { + TaskDictionary, + TaskDefinition, + TaskInstance, + TaskStatus, + ConcreteTaskInstance, +} from './task'; + +import { + FetchOpts, + StoreOpts, + OwnershipClaimingOpts, + TaskReschedulingOpts, + TaskStore, +} from './task_store'; import { savedObjectsClientMock } from 'src/core/server/mocks'; -import { SavedObjectsSerializer, SavedObjectsSchema, SavedObjectAttributes } from 'src/core/server'; +import { SavedObjectsErrorHelpers } from 'src/core/server/saved_objects/service/lib/errors'; +import { + SavedObjectsSerializer, + SavedObjectsSchema, + SavedObjectAttributes, + SavedObject, +} from 'src/core/server'; const taskDefinitions: TaskDictionary = { report: { @@ -36,9 +55,19 @@ const serializer = new SavedObjectsSerializer(new SavedObjectsSchema()); beforeEach(() => jest.resetAllMocks()); const mockedDate = new Date('2019-02-12T21:01:22.479Z'); + +const anHourAgo = new Date(); +anHourAgo.setHours(anHourAgo.getHours() - 1); + +const anHourInTheFuture = new Date(); +anHourInTheFuture.setHours(anHourInTheFuture.getHours() + 1); + +const NodeDate = (global as any).Date; + (global as any).Date = class Date { - constructor() { - return mockedDate; + constructor(date?: string) { + // use mock for empty Date()s, but not when wrapping a date string + return date ? new NodeDate(date) : mockedDate; } static now() { return mockedDate.getTime(); @@ -67,7 +96,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, }); - const result = await store.schedule(task); + const result = await store.scheduleTask(task); expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); @@ -159,6 +188,402 @@ describe('TaskStore', () => { }); }); + describe('reschedule', () => { + async function testReschedule( + taskBeingRescheduled: TaskReschedulingOpts, + taskCurrentlyInStore: ConcreteTaskInstance, + taskInStoreAfterUpdate: ConcreteTaskInstance = taskCurrentlyInStore + ) { + const callCluster = jest.fn(); + + savedObjectsClient.get.mockImplementation(async (type: string, id: string) => + concreteTaskInstanceAsSavedObject(id, type, taskCurrentlyInStore) + ); + + savedObjectsClient.update.mockImplementation( + async (type: string, id: string, attributes: SavedObjectAttributes) => { + return concreteTaskInstanceAsSavedObject(id, type, taskInStoreAfterUpdate); + } + ); + + const store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster, + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + + const result = await store.rescheduleTask(taskBeingRescheduled); + + expect(savedObjectsClient.get).toHaveBeenCalledTimes(1); + expect(savedObjectsClient.update).toHaveBeenCalledTimes(1); + + return result; + } + + test('rescheduling returns the updated task', async () => { + const taskReturnedByUpdate: ConcreteTaskInstance = { + id: 'myTask', + taskType: 'foo', + version: '1f672d50-9fd8-44fa-b42c-1a5d9982dec7', + scheduledAt: mockedDate, + attempts: 0, + status: 'idle', + runAt: mockedDate, + startedAt: mockedDate, + retryAt: null, + interval: '10m', + state: { + field: 123, + }, + params: {}, + ownerId: null, + }; + + const result = await testReschedule( + // Task Reschedule updated fields + { + id: 'myTask', + interval: '10m', + }, + // Current Task in Store + { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: mockedDate, + attempts: 0, + status: 'idle', + interval: '5m', + runAt: mockedDate, + startedAt: mockedDate, + retryAt: null, + state: { field: 456 }, + params: {}, + ownerId: null, + }, + // Return from update of Task in Store + taskReturnedByUpdate + ); + + expect(result).toMatchObject(taskReturnedByUpdate); + }); + + test('resets runAt when an interval is specified on rescheduled task, even if no runAt has been specified', async () => { + const updatedInterval = 90; + + await testReschedule( + // Task Reschedule updated fields + { + id: 'myTask', + interval: `${updatedInterval}m`, + }, + // Current Task in Store + { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: anHourAgo, + attempts: 0, + status: 'idle', + interval: undefined, + runAt: anHourInTheFuture, + startedAt: null, + retryAt: null, + state: {}, + params: {}, + ownerId: null, + } + ); + + const [, , attributes] = savedObjectsClient.update.mock.calls[0]; + + expect(new Date(attributes.runAt as string).getTime()).toEqual(mockedDate.getTime()); + }); + + test('when both a runAt and an interval are specified on rescheduled task, the runAt will be used', async () => { + await testReschedule( + // Task Reschedule updated fields + { + id: 'myTask', + interval: `90m`, + runAt: anHourInTheFuture, + }, + // Current Task in Store + { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: mockedDate, + attempts: 0, + status: 'idle', + interval: undefined, + runAt: anHourInTheFuture, + startedAt: null, + retryAt: null, + state: {}, + params: {}, + ownerId: null, + } + ); + + const [, , attributes] = savedObjectsClient.update.mock.calls[0]; + + expect(new Date(attributes.runAt as string).getTime()).toEqual(anHourInTheFuture.getTime()); + expect(attributes.interval as string).toEqual(`90m`); + }); + + test('rescheduling a running task retains the current runAt', async () => { + await testReschedule( + // Task Reschedule updated fields + { + id: 'myTask', + interval: '15m', + runAt: anHourInTheFuture, + }, + // Current Task in Store + { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: anHourAgo, + attempts: 0, + status: 'running', + interval: '5m', + runAt: anHourAgo, + startedAt: anHourAgo, + retryAt: anHourInTheFuture, + state: { field: 456 }, + params: {}, + ownerId: null, + } + ); + + const [, id, attributes] = savedObjectsClient.update.mock.calls[0]; + expect(id).toEqual('myTask'); + expect(attributes).toMatchObject({ + interval: '15m', + runAt: anHourAgo.toISOString(), + startedAt: anHourAgo.toISOString(), + retryAt: anHourInTheFuture.toISOString(), + }); + }); + + test('when rescheduling a task fails, it retries', async () => { + const taskBeingRescheduled: TaskReschedulingOpts = { + id: 'myTask', + interval: '15m', + runAt: anHourInTheFuture, + }; + const taskCurrentlyInStore: ConcreteTaskInstance = { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: anHourAgo, + attempts: 0, + status: 'running', + interval: '5m', + runAt: anHourAgo, + startedAt: anHourAgo, + retryAt: anHourInTheFuture, + state: { field: 456 }, + params: {}, + ownerId: null, + }; + + const taskReturnedByUpdate: ConcreteTaskInstance = { + id: 'myTask', + taskType: 'foo', + version: '1f672d50-9fd8-44fa-b42c-1a5d9982dec7', + scheduledAt: mockedDate, + attempts: 0, + status: 'idle', + runAt: mockedDate, + startedAt: mockedDate, + retryAt: null, + interval: '10m', + state: { + field: 123, + }, + params: {}, + ownerId: null, + }; + + savedObjectsClient.get.mockImplementation(async (type: string, id: string) => + concreteTaskInstanceAsSavedObject(id, type, taskCurrentlyInStore) + ); + + savedObjectsClient.update + // throw version conflict on first update + .mockImplementationOnce(async (type: string, id: string) => { + throw SavedObjectsErrorHelpers.decorateConflictError(new Error('version conflict')).output + .payload; + }) + // succeed on second update + .mockImplementationOnce(async (type: string, id: string) => + concreteTaskInstanceAsSavedObject(id, type, taskReturnedByUpdate) + ); + + const store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + + const result = await store.rescheduleTask(taskBeingRescheduled); + + expect(savedObjectsClient.update).toHaveBeenCalledTimes(2); + + expect(result).toMatchObject(taskReturnedByUpdate); + }); + + test('when rescheduling a task fails, it only retries twice, then gives up', async () => { + savedObjectsClient.get.mockImplementation(async (type: string, id: string) => + concreteTaskInstanceAsSavedObject(id, type, { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: anHourAgo, + attempts: 0, + status: 'running', + interval: '5m', + runAt: anHourAgo, + startedAt: anHourAgo, + retryAt: anHourInTheFuture, + state: { field: 456 }, + params: {}, + ownerId: null, + }) + ); + + savedObjectsClient.update + // throw version conflict on all updates + .mockImplementation(async (type: string, id: string) => { + throw SavedObjectsErrorHelpers.decorateConflictError(new Error('version conflict')).output + .payload; + }); + + const store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + + expect.assertions(2); + try { + await store.rescheduleTask({ + id: 'myTask', + interval: '15m', + runAt: anHourInTheFuture, + }); + } catch (ex) { + expect(savedObjectsClient.update).toHaveBeenCalledTimes(3); + expect(ex).toMatchInlineSnapshot(` + Object { + "error": "Conflict", + "message": "version conflict", + "statusCode": 409, + } + `); + } + }); + + test('when rescheduling a task fails, it doesnt retry for non version conflict errors', async () => { + savedObjectsClient.get.mockImplementation(async (type: string, id: string) => + concreteTaskInstanceAsSavedObject(id, type, { + id: 'myTask', + taskType: 'foo', + version: '5a2a3c3e-a5a9-44ff-8aec-c3790c69b081', + scheduledAt: anHourAgo, + attempts: 0, + status: 'running', + interval: '5m', + runAt: anHourAgo, + startedAt: anHourAgo, + retryAt: anHourInTheFuture, + state: { field: 456 }, + params: {}, + ownerId: null, + }) + ); + + savedObjectsClient.update + // throw server error + .mockImplementation(async (type: string, id: string) => { + throw SavedObjectsErrorHelpers.decorateEsUnavailableError(new Error('ES unavailable')) + .output.payload; + }); + + const store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster: jest.fn(), + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + + expect.assertions(2); + try { + await store.rescheduleTask({ + id: 'myTask', + interval: '15m', + runAt: anHourInTheFuture, + }); + } catch (ex) { + expect(savedObjectsClient.update).toHaveBeenCalledTimes(1); + expect(ex).toMatchInlineSnapshot(` + Object { + "error": "Service Unavailable", + "message": "ES unavailable", + "statusCode": 503, + } + `); + } + }); + }); + + describe('getTask', () => { + test(`gets a specific task by it's ID`, async () => { + const id = `id-${_.random(1, 20)}`; + const callCluster = jest.fn(); + const store = new TaskStore({ + index: 'tasky', + taskManagerId: '', + serializer, + callCluster, + maxAttempts: 2, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + }); + + savedObjectsClient.get.mockResolvedValueOnce( + Promise.resolve({ + id, + type: 'task', + attributes: {}, + references: [], + }) + ); + + const result: ConcreteTaskInstance = await store.getTask(id); + expect(result).toMatchObject({ id }); + expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id); + }); + }); + describe('fetch', () => { async function testFetch(opts?: FetchOpts, hits: any[] = []) { const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits } })); @@ -172,7 +597,7 @@ describe('TaskStore', () => { savedObjectsRepository: savedObjectsClient, }); - const result = await store.fetch(opts); + const result = await store.fetchTasks(opts); sinon.assert.calledOnce(callCluster); sinon.assert.calledWith(callCluster, 'search'); @@ -683,7 +1108,7 @@ describe('TaskStore', () => { savedObjectsRepository: savedObjectsClient, }); - const result = await store.update(task); + const result = await store.updateTask(task); expect(savedObjectsClient.update).toHaveBeenCalledWith( 'task', @@ -731,7 +1156,7 @@ describe('TaskStore', () => { definitions: taskDefinitions, savedObjectsRepository: savedObjectsClient, }); - const result = await store.remove(id); + const result = await store.removeTask(id); expect(result).toBeUndefined(); expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id); }); @@ -750,3 +1175,28 @@ function generateFakeTasks(count: number = 1) { sort: ['a', _.random(1, 5)], })); } + +function concreteTaskInstanceAsSavedObject( + id: string, + type: string, + taskInstance: ConcreteTaskInstance +): SavedObject { + return { + id, + type: 'task', + references: [], + version: taskInstance.version, + attributes: { + ...taskInstance, + ..._.mapValues( + _.pick(taskInstance, ['scheduledAt', 'startedAt', 'retryAt', 'runAt']), + (value: Date) => { + return value ? value.toISOString() : value; + } + ), + ..._.mapValues(_.pick(taskInstance, ['state', 'params']), (data: Record) => { + return JSON.stringify(data); + }), + }, + }; +} diff --git a/x-pack/legacy/plugins/task_manager/task_store.ts b/x-pack/legacy/plugins/task_manager/task_store.ts index 58bffd2269eb6..83931f86505ab 100644 --- a/x-pack/legacy/plugins/task_manager/task_store.ts +++ b/x-pack/legacy/plugins/task_manager/task_store.ts @@ -8,7 +8,7 @@ * This module contains helpers for managing the task manager storage layer. */ -import { omit } from 'lodash'; +import { omit, pick, defaults, mapValues, isPlainObject, isDate, flow } from 'lodash'; import { SavedObjectsClientContract, SavedObject, @@ -22,8 +22,15 @@ import { TaskDefinition, TaskDictionary, TaskInstance, + TaskInstanceWithId, } from './task'; +import { asyncRetry } from './lib/async_retry'; + +interface DecoratedError { + statusCode: number; +} + export interface StoreOpts { callCluster: ElasticJs; index: string; @@ -81,6 +88,11 @@ export interface UpdateByQueryResult { total: number; } +/** + * The Scheduling fields of a task instance that has an id. + */ +export type TaskReschedulingOpts = Pick; + /** * Wraps an elasticsearch connection and provides a task manager-specific * interface into the index. @@ -119,7 +131,7 @@ export class TaskStore { * * @param task - The task being scheduled. */ - public async schedule(taskInstance: TaskInstance): Promise { + public async scheduleTask(taskInstance: TaskInstance): Promise { if (!this.definitions[taskInstance.taskType]) { throw new Error( `Unsupported task type "${taskInstance.taskType}". Supported types are ${Object.keys( @@ -137,12 +149,57 @@ export class TaskStore { return savedObjectToConcreteTaskInstance(savedObject); } + /** + * Reschedules a previously scheduled task. + * + * @param task - The task being rescheduled. + */ + public async rescheduleTask( + taskInstanceScheduling: TaskReschedulingOpts + ): Promise { + return asyncRetry( + () => this.getTaskAndReschedule(taskInstanceScheduling), + // if we experience a document version conflict, we'll give it another attempt + error => error.statusCode === 409, + // after two retries, let the error bubble up + 2 + )(); + } + + private async getTaskAndReschedule( + taskInstanceScheduling: TaskReschedulingOpts + ): Promise { + const taskInstance = await this.getTask(taskInstanceScheduling.id); + return await this.updateTask( + applyConcreteTaskInstanceDefaults( + taskInstance.status === 'idle' + ? { + ...omit(taskInstance, 'runAt'), + ...taskInstanceScheduling, + } + : { + ...taskInstance, + ...omit(taskInstanceScheduling, 'runAt'), + } + ) + ); + } + + /** + * Get a task by id + * + * @param task - The task being scheduled. + */ + public async getTask(id: string): Promise { + return savedObjectToConcreteTaskInstance(await this.savedObjectsRepository.get('task', id)); + } + /** * Fetches a paginatable list of scheduled tasks. * * @param opts - The query options used to filter tasks */ - public async fetch(opts: FetchOpts = {}): Promise { + public async fetchTasks(opts: FetchOpts = {}): Promise { const sort = paginatableSort(opts.sort); return this.search({ sort, @@ -305,7 +362,7 @@ export class TaskStore { * @param {TaskDoc} doc * @returns {Promise} */ - public async update(doc: ConcreteTaskInstance): Promise { + public async updateTask(doc: ConcreteTaskInstance): Promise { const updatedSavedObject = await this.savedObjectsRepository.update( 'task', doc.id, @@ -325,7 +382,7 @@ export class TaskStore { * @param {string} id * @returns {Promise} */ - public async remove(id: string): Promise { + public async removeTask(id: string): Promise { await this.savedObjectsRepository.delete('task', id); } @@ -392,20 +449,35 @@ function paginatableSort(sort: any[] = []) { return [...sort, sortById]; } -function taskInstanceToAttributes(doc: TaskInstance): SavedObjectAttributes { +const taskInstanceToAttributes = flow( + applyConcreteTaskInstanceDefaults, + serializeTaskInstanceFields +); + +function serializeTaskInstanceFields(doc: ConcreteTaskInstance): SavedObjectAttributes { + const { scheduledAt, runAt, startedAt, retryAt, params, state } = doc; return { ...omit(doc, 'id', 'version'), - params: JSON.stringify(doc.params || {}), - state: JSON.stringify(doc.state || {}), - attempts: (doc as ConcreteTaskInstance).attempts || 0, - scheduledAt: (doc.scheduledAt || new Date()).toISOString(), - startedAt: (doc.startedAt && doc.startedAt.toISOString()) || null, - retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null, - runAt: (doc.runAt || new Date()).toISOString(), - status: (doc as ConcreteTaskInstance).status || 'idle', + ...mapValues(pick({ params, state }, isPlainObject), objectProp => JSON.stringify(objectProp)), + ...mapValues(pick({ scheduledAt, runAt, startedAt, retryAt }, isDate), (dateProp: Date) => + dateProp.toISOString() + ), }; } +function applyConcreteTaskInstanceDefaults(doc: TaskInstance): ConcreteTaskInstance { + return defaults(doc, { + params: {}, + state: {}, + attempts: 0, + scheduledAt: new Date(), + runAt: new Date(), + startedAt: null, + retryAt: null, + status: 'idle', + }); +} + function savedObjectToConcreteTaskInstance( savedObject: Omit ): ConcreteTaskInstance { diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts index c43e159bbe8ca..4f7c8d92e731c 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/alerts.ts @@ -247,6 +247,57 @@ export default function alertTests({ getService }: FtrProviderContext) { } }); + it('should set the Alert interval on the scheduled Task when appropriate', async () => { + const reference = alertUtils.generateReference(); + const response = await alertUtils.createAlwaysFiringAction({ + reference, + overwrites: { interval: '1m' }, + }); + + switch (scenario.id) { + case 'no_kibana_privileges at space1': + case 'global_read at space1': + case 'space_1_all at space2': + expect(response.statusCode).to.eql(404); + expect(response.body).to.eql({ + statusCode: 404, + error: 'Not Found', + message: 'Not Found', + }); + break; + case 'superuser at space1': + case 'space_1_all at space1': + expect(response.statusCode).to.eql(200); + + // Wait for the task to be attempted once and idle + const scheduledActionTask = await retry.try(async () => { + const searchResult = await es.search({ + index: '.kibana_task_manager', + body: { + query: { + bool: { + must: [ + { + term: { + 'task.taskType': 'alerting:test.always-firing', + }, + }, + ], + }, + }, + }, + }); + expect(searchResult.hits.total.value).to.eql(1); + return searchResult.hits.hits[0]; + }); + + expect(scheduledActionTask._source.task.interval).to.eql('1m'); + break; + default: + throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); + } + }); + it('should have proper callCluster and savedObjectsClient authorization for alert type executor when appropriate', async () => { let searchResult: any; const testStart = new Date(); diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts index 28634c46b6350..57fdd9a998fb3 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts.ts @@ -100,6 +100,33 @@ export default function alertTests({ getService }: FtrProviderContext) { }); }); + it('should set the Alert interval on the scheduled Task', async () => { + const reference = alertUtils.generateReference(); + await alertUtils.createAlwaysFiringAction({ reference }); + + const scheduledActionTask = await retry.try(async () => { + const searchResult = await es.search({ + index: '.kibana_task_manager', + body: { + query: { + bool: { + must: [ + { + term: { + 'task.taskType': 'alerting:test.always-firing', + }, + }, + ], + }, + }, + }, + }); + expect(searchResult.hits.total.value).to.eql(1); + return searchResult.hits.hits[0]; + }); + expect(scheduledActionTask._source.task.interval).to.eql('1m'); + }); + it('should handle custom retry logic', async () => { // We'll use this start time to query tasks created after this point const testStart = new Date(); diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js index a9dfabae6d609..cd6558e3210a2 100644 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js @@ -27,7 +27,7 @@ export function initRoutes(server) { const taskManager = server.plugins.task_manager; server.route({ - path: '/api/sample_tasks', + path: '/api/sample_tasks/schedule', method: 'POST', config: { validate: { @@ -39,27 +39,73 @@ export function initRoutes(server) { state: Joi.object().optional(), id: Joi.string().optional() }), - ensureScheduled: Joi.boolean() - .default(false) - .optional(), }), }, }, async handler(request) { try { - const { ensureScheduled = false, task: taskFields } = request.payload; + const { task: taskFields } = request.payload; + const task = { + ...taskFields, + scope: [scope], + }; + + return await taskManager.schedule(task, { request }); + } catch (err) { + return err; + } + }, + }); + + server.route({ + path: '/api/sample_tasks/ensure', + method: 'POST', + config: { + validate: { + payload: Joi.object({ + task: Joi.object({ + taskType: Joi.string().required(), + interval: Joi.string().optional(), + params: Joi.object().required(), + state: Joi.object().optional(), + id: Joi.string().optional() + }) + }), + }, + }, + async handler(request) { + try { + const { task: taskFields } = request.payload; const task = { ...taskFields, scope: [scope], }; - const taskResult = await ( - ensureScheduled - ? taskManager.ensureScheduled(task, { request }) - : taskManager.schedule(task, { request }) - ); + return await taskManager.ensureScheduled(task, { request }); + } catch (err) { + return err; + } + }, + }); + + server.route({ + path: '/api/sample_tasks/reschedule', + method: 'POST', + config: { + validate: { + payload: Joi.object({ + task: Joi.object({ + interval: Joi.string().optional(), + id: Joi.string().required() + }) + }), + }, + }, + async handler(request) { + try { + const { task } = request.payload; - return taskResult; + return await taskManager.reschedule(task); } catch (err) { return err; } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 9b4297e995cbd..1b7a998316d42 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -58,7 +58,7 @@ export default function ({ getService }) { } function scheduleTask(task) { - return supertest.post('/api/sample_tasks') + return supertest.post('/api/sample_tasks/schedule') .set('kbn-xsrf', 'xxx') .send({ task }) .expect(200) @@ -66,9 +66,17 @@ export default function ({ getService }) { } function scheduleTaskIfNotExists(task) { - return supertest.post('/api/sample_tasks') + return supertest.post('/api/sample_tasks/ensure') .set('kbn-xsrf', 'xxx') - .send({ task, ensureScheduled: true }) + .send({ task }) + .expect(200) + .then((response) => response.body); + } + + function rescheduleTask(task) { + return supertest.post('/api/sample_tasks/reschedule') + .set('kbn-xsrf', 'xxx') + .send({ task }) .expect(200) .then((response) => response.body); } @@ -177,7 +185,7 @@ export default function ({ getService }) { }); }); - it('should reschedule if task has an interval', async () => { + it('should reschedule after the first run if the task has an interval', async () => { const interval = _.random(5, 200); const intervalMilliseconds = interval * 60000; @@ -187,14 +195,53 @@ export default function ({ getService }) { params: { }, }); - await retry.try(async () => { + return await retry.try(async () => { expect((await historyDocs()).length).to.eql(1); const [task] = (await currentTasks()).docs; expect(task.attempts).to.eql(0); expect(task.state.count).to.eql(1); - expectReschedule(originalTask, task, intervalMilliseconds); + return expectReschedule(originalTask, task, intervalMilliseconds); + }); + }); + + it('should rerun and adjust the scheduled interval when rescheduling an idle task', async () => { + const interval = _.random(150, 200); + const intervalMilliseconds = interval * 60000; + + const originalTask = await scheduleTask({ + taskType: 'sampleTask', + interval: `${interval}m`, + params: { }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [automaticallyRescheduledTask] = (await currentTasks()).docs; + expect(automaticallyRescheduledTask.state.count).to.eql(1); + + return expectReschedule(originalTask, automaticallyRescheduledTask, intervalMilliseconds); + }); + + const updatedInterval = _.random(50, 100); + const updatedIntervalMilliseconds = updatedInterval * 60000; + + await rescheduleTask({ + id: originalTask.id, + interval: `${updatedInterval}m` + }); + + return await retry.try(async () => { + expect((await historyDocs()).length).to.eql(2); + + const [manuallyRescheduledTask] = (await currentTasks()).docs; + + expect(manuallyRescheduledTask.interval).to.eql(`${updatedInterval}m`); + expect(manuallyRescheduledTask.state.count).to.eql(2); + + return expectReschedule(originalTask, manuallyRescheduledTask, updatedIntervalMilliseconds); }); }); @@ -204,5 +251,6 @@ export default function ({ getService }) { expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer); expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer); } + }); }