Skip to content

Commit

Permalink
added UnrecoverableError support for task runners nad pluged it into …
Browse files Browse the repository at this point in the history
…alerting where needed
  • Loading branch information
gmmorris committed Nov 2, 2020
1 parent 6c07f2e commit 16729cd
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 37 deletions.
20 changes: 10 additions & 10 deletions x-pack/plugins/alerts/server/task_runner/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
import sinon from 'sinon';
import { schema } from '@kbn/config-schema';
import { AlertExecutorOptions } from '../types';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import {
ConcreteTaskInstance,
isUnrecoverableError,
TaskStatus,
} from '../../../task_manager/server';
import { TaskRunnerContext } from './task_runner_factory';
import { TaskRunner } from './task_runner';
import { encryptedSavedObjectsMock } from '../../../encrypted_saved_objects/server/mocks';
Expand Down Expand Up @@ -985,7 +989,7 @@ describe('Task Runner', () => {

test('avoids rescheduling a failed Alert Task Runner when it throws due to failing to fetch the alert', async () => {
alertsClient.get.mockImplementation(() => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('task', '1');
throw SavedObjectsErrorHelpers.createGenericNotFoundError('alert', '1');
});

const taskRunner = new TaskRunner(
Expand All @@ -1003,13 +1007,9 @@ describe('Task Runner', () => {
references: [],
});

const runnerResult = await taskRunner.run();

expect(runnerResult).toMatchInlineSnapshot(`
Object {
"schedule": undefined,
"state": Object {},
}
`);
return taskRunner.run().catch((ex) => {
expect(ex).toMatchInlineSnapshot(`[Error: Saved object [alert/1] not found]`);
expect(isUnrecoverableError(ex)).toBeTruthy();
});
});
});
9 changes: 5 additions & 4 deletions x-pack/plugins/alerts/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { PublicMethodsOf } from '@kbn/utility-types';
import { pickBy, mapValues, without } from 'lodash';
import { Logger, KibanaRequest } from '../../../../../src/core/server';
import { TaskRunnerContext } from './task_runner_factory';
import { ConcreteTaskInstance } from '../../../task_manager/server';
import { ConcreteTaskInstance, throwUnrecoverableError } from '../../../task_manager/server';
import { createExecutionHandler } from './create_execution_handler';
import { AlertInstance, createAlertInstanceFactory } from '../alert_instance';
import {
Expand Down Expand Up @@ -376,9 +376,10 @@ export class TaskRunner {
}
),
schedule: resolveErr<IntervalSchedule | undefined, Error>(schedule, (error) => {
return isAlertSavedObjectNotFoundError(error, alertId)
? undefined
: { interval: taskSchedule?.interval ?? FALLBACK_RETRY_INTERVAL };
if (isAlertSavedObjectNotFoundError(error, alertId)) {
throwUnrecoverableError(error);
}
return { interval: taskSchedule?.interval ?? FALLBACK_RETRY_INTERVAL };
}),
};
}
Expand Down
33 changes: 33 additions & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,39 @@ The task runner's `run` method is expected to return a promise that resolves to

Other return values will result in a warning, but the system should continue to work.

### Task retries when the Task Runner fails
If a task runner throws an error, task manager will try to rerun the task shortly after (up to the task definition's `maxAttampts`).
Normal tasks will wait a default amount of 5m before trying again and every subsequent attempt will add an additonal 5m cool off period to avoid a stampeding herd of failed tasks from storming Elasticsearch.

Recurring tasks will also get retried, but instead of using the 5m interval for the retry, they will be retried on their nex tscheduled run.

### Force failing a task
If you wish to purposfully fail a task, you can throw an error of any kind and the retry logic will apply.
If, on the other hand, you wish not only to fail the task, but you'd also like to indicate the Task Manager that it shouldn't retry the task, you can throw an Unrecoverable Error, using the `throwUnrecoverableError` helper function.

For example:
```js
taskManager.registerTaskDefinitions({
myTask: {
/// ...
createTaskRunner(context) {
return {
async run() {
const result = ... // Do some work

if(!result) {
// No point retrying?
throwUnrecoverableError(new Error("No point retrying, this is unrecoverable"));
}

return result;
}
};
},
},
});
```

## Task instances

The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc.
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { Updatable } from './task_running';
import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer';
import { unwrapPromise } from './lib/result_type';

Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export {
RunContext,
} from './task';

export { isUnrecoverableError, throwUnrecoverableError } from './task_running';

export {
TaskManagerPlugin as TaskManager,
TaskManagerSetupContract,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ConcreteTaskInstance, TaskStatus } from '../task';
import { asTaskRunEvent, asTaskPollingCycleEvent, TaskTiming } from '../task_events';
import { asOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRunResult } from '../task_runner';
import { TaskRunResult } from '../task_running';
import {
createTaskRunAggregator,
summarizeTaskRunStat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
} from '../task_events';
import { isOk, Ok, unwrap } from '../lib/result_type';
import { ConcreteTaskInstance } from '../task';
import { TaskRunResult } from '../task_runner';
import { TaskRunResult } from '../task_running';
import { FillPoolResult } from '../lib/fill_pool';
import {
AveragedStat,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
createObservableMonitor,
} from './polling';
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import { TaskManagerRunner, TaskRunner } from './task_running';
import { TaskStore, OwnershipClaimingOpts, ClaimOwnershipResult } from './task_store';
import { identifyEsError } from './lib/identify_es_error';
import { BufferedTaskStore } from './buffered_task_store';
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ConcreteTaskInstance } from './task';
import { Result, Err } from './lib/result_type';
import { FillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { TaskRunResult } from './task_runner';
import { TaskRunResult } from './task_running';

export enum TaskEventType {
TASK_CLAIM = 'TASK_CLAIM',
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import moment, { Duration } from 'moment';
import { performance } from 'perf_hooks';
import { padStart } from 'lodash';
import { Logger } from '../../../../src/core/server';
import { TaskRunner } from './task_runner';
import { TaskRunner } from './task_running';
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';

interface Opts {
Expand Down
29 changes: 29 additions & 0 deletions x-pack/plugins/task_manager/server/task_running/errors.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { isUnrecoverableError, throwUnrecoverableError } from './errors';

describe('Error Types', () => {
describe('Unrecoverable error', () => {
it('wraps and throws normal errors', () => {
expect(() => throwUnrecoverableError(new Error('OMG'))).toThrowError('OMG');
});

it('idnentifies wrapped normal errors', async () => {
let result;
try {
throwUnrecoverableError(new Error('OMG'));
} catch (ex) {
result = ex;
}
expect(isUnrecoverableError(result)).toBeTruthy();
});

it('idnentifies normal errors', () => {
expect(isUnrecoverableError(new Error('OMG'))).toBeFalsy();
});
});
});
27 changes: 27 additions & 0 deletions x-pack/plugins/task_manager/server/task_running/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

// Unrecoverable
const CODE_UNRECOVERABLE = 'TaskManager/unrecoverable';

const code = Symbol('TaskManagerErrorCode');

export interface DecoratedError extends Error {
[code]?: string;
}

function isTaskManagerError(error: unknown): error is DecoratedError {
return Boolean(error && (error as DecoratedError)[code]);
}

export function isUnrecoverableError(error: Error | DecoratedError) {
return isTaskManagerError(error) && error[code] === CODE_UNRECOVERABLE;
}

export function throwUnrecoverableError(error: Error) {
(error as DecoratedError)[code] = CODE_UNRECOVERABLE;
throw error;
}
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/task_running/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export * from './task_runner';
export * from './errors';
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

import _ from 'lodash';
import sinon from 'sinon';
import { secondsFromNow } from './lib/intervals';
import { asOk, asErr } from './lib/result_type';
import { TaskManagerRunner, TaskRunResult } from './task_runner';
import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent, TaskRun } from './task_events';
import { ConcreteTaskInstance, TaskStatus, TaskDefinition, SuccessfulRunResult } from './task';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
import { secondsFromNow } from '../lib/intervals';
import { asOk, asErr } from '../lib/result_type';
import { TaskManagerRunner, TaskRunResult } from '../task_running';
import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent, TaskRun } from '../task_events';
import { ConcreteTaskInstance, TaskStatus, TaskDefinition, SuccessfulRunResult } from '../task';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import moment from 'moment';
import { TaskTypeDictionary } from './task_type_dictionary';
import { mockLogger } from './test_utils';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';
import { throwUnrecoverableError } from './errors';

const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);

Expand Down Expand Up @@ -222,6 +223,45 @@ describe('TaskManagerRunner', () => {
sinon.assert.calledWithMatch(store.update, { runAt });
});

test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => {
const id = _.random(1, 20).toString();
const error = new Error('Dangit!');
const onTaskEvent = jest.fn();
const { runner, store, instance: originalInstance } = testOpts({
onTaskEvent,
instance: { id, status: TaskStatus.Running, startedAt: new Date() },
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
throwUnrecoverableError(error);
},
}),
},
},
});

await runner.run();

const instance = store.update.args[0][0];
expect(instance.status).toBe('failed');

expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
asTaskRunEvent(
id,
asErr({
error,
task: originalInstance,
result: TaskRunResult.Failed,
})
)
)
);
expect(onTaskEvent).toHaveBeenCalledTimes(1);
});

test('tasks that return runAt override the schedule', async () => {
const runAt = minutesFromNow(_.random(5));
const { runner, store } = testOpts({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ import { performance } from 'perf_hooks';
import Joi from 'joi';
import { identity, defaults, flow } from 'lodash';

import { Middleware } from './lib/middleware';
import { asOk, asErr, mapErr, eitherAsync, unwrap, isOk, mapOk, Result } from './lib/result_type';
import { Middleware } from '../lib/middleware';
import { asOk, asErr, mapErr, eitherAsync, unwrap, isOk, mapOk, Result } from '../lib/result_type';
import {
TaskRun,
TaskMarkRunning,
asTaskRunEvent,
asTaskMarkRunningEvent,
startTaskTimer,
TaskTiming,
} from './task_events';
import { intervalFromDate, intervalFromNow } from './lib/intervals';
} from '../task_events';
import { intervalFromDate, intervalFromNow } from '../lib/intervals';
import {
CancelFunction,
CancellableTask,
Expand All @@ -38,8 +38,9 @@ import {
TaskDefinition,
validateRunResult,
TaskStatus,
} from './task';
import { TaskTypeDictionary } from './task_type_dictionary';
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';

const defaultBackoffPerFailure = 5 * 60 * 1000;
const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };
Expand Down Expand Up @@ -341,11 +342,11 @@ export class TaskManagerRunner implements TaskRunner {
private rescheduleFailedRun = (
failureResult: FailedRunResult
): Result<SuccessfulRunResult, FailedTaskResult> => {
if (this.shouldTryToScheduleRetry()) {
const { state, error } = failureResult;
if (this.shouldTryToScheduleRetry() && !isUnrecoverableError(error)) {
// if we're retrying, keep the number of attempts
const { schedule, attempts } = this.instance;

const { state, error } = failureResult;
const reschedule = failureResult.runAt
? { runAt: failureResult.runAt }
: failureResult.schedule
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/task_scheduling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { asErr, asOk } from './lib/result_type';
import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task';
import { createInitialMiddleware } from './lib/middleware';
import { taskStoreMock } from './task_store.mock';
import { TaskRunResult } from './task_runner';
import { TaskRunResult } from './task_running';
import { mockLogger } from './test_utils';

describe('TaskScheduling', () => {
Expand Down

0 comments on commit 16729cd

Please sign in to comment.