From be949d66e43ae24e9bce4a13a4613ad00e1dce9a Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Mon, 11 Nov 2024 15:17:46 -0500 Subject: [PATCH] [Response Ops][Task Manager] Adding background task to mark removed task types as `unrecognized` (#199057) Resolves https://github.com/elastic/kibana/issues/192686 ## Summary Creates a background task to search for removed task types and mark them as unrecognized. Removes the current logic that does this during the task claim cycle for both task claim strategies. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> --- x-pack/plugins/task_manager/server/plugin.ts | 13 +- .../server/polling_lifecycle.test.ts | 1 - .../task_manager/server/polling_lifecycle.ts | 3 - .../mark_available_tasks_as_claimed.test.ts | 5 - .../mark_available_tasks_as_claimed.ts | 5 - .../server/queries/task_claiming.test.ts | 2 - .../server/queries/task_claiming.ts | 4 - ...mark_removed_tasks_as_unrecognized.test.ts | 266 ++++++++++++++ .../mark_removed_tasks_as_unrecognized.ts | 150 ++++++++ .../server/task_claimers/index.ts | 1 - .../task_claimers/strategy_mget.test.ts | 341 +----------------- .../server/task_claimers/strategy_mget.ts | 67 +--- .../strategy_update_by_query.test.ts | 133 ------- .../task_claimers/strategy_update_by_query.ts | 6 +- x-pack/plugins/task_manager/tsconfig.json | 3 +- .../task_manager_fixture/server/plugin.ts | 28 ++ .../alerting/group4/scheduled_task_id.ts | 6 + .../sample_task_plugin/server/init_routes.ts | 28 ++ .../check_registered_task_types.ts | 1 + .../task_management_removed_types.ts | 6 + .../server/init_routes.ts | 28 ++ .../task_management_removed_types.ts | 6 + 22 files changed, 547 insertions(+), 556 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.test.ts create mode 100644 x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.ts diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 45960195be216..cd820d1e70780 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -29,7 +29,7 @@ import { TaskManagerConfig } from './config'; import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware'; import { removeIfExists } from './lib/remove_if_exists'; import { setupSavedObjects, BACKGROUND_TASK_NODE_SO_NAME, TASK_SO_NAME } from './saved_objects'; -import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary'; +import { TaskDefinitionRegistry, TaskTypeDictionary } from './task_type_dictionary'; import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store'; import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskScheduling } from './task_scheduling'; @@ -45,6 +45,10 @@ import { metricsStream, Metrics } from './metrics'; import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector'; import { TaskPartitioner } from './lib/task_partitioner'; import { getDefaultCapacity } from './lib/get_default_capacity'; +import { + registerMarkRemovedTasksAsUnrecognizedDefinition, + scheduleMarkRemovedTasksAsUnrecognizedDefinition, +} from './removed_tasks/mark_removed_tasks_as_unrecognized'; export interface TaskManagerSetupContract { /** @@ -221,6 +225,11 @@ export class TaskManagerPlugin } registerDeleteInactiveNodesTaskDefinition(this.logger, core.getStartServices, this.definitions); + registerMarkRemovedTasksAsUnrecognizedDefinition( + this.logger, + core.getStartServices, + this.definitions + ); if (this.config.unsafe.exclude_task_types.length) { this.logger.warn( @@ -332,7 +341,6 @@ export class TaskManagerPlugin this.taskPollingLifecycle = new TaskPollingLifecycle({ config: this.config!, definitions: this.definitions, - unusedTypes: REMOVED_TYPES, logger: this.logger, executionContext, taskStore, @@ -384,6 +392,7 @@ export class TaskManagerPlugin }); scheduleDeleteInactiveNodesTaskDefinition(this.logger, taskScheduling).catch(() => {}); + scheduleMarkRemovedTasksAsUnrecognizedDefinition(this.logger, taskScheduling).catch(() => {}); return { fetch: (opts: SearchOpts): Promise => taskStore.fetch(opts), diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 1f244f7f4c8a5..a408bd3f634d9 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -106,7 +106,6 @@ describe('TaskPollingLifecycle', () => { }, taskStore: mockTaskStore, logger: taskManagerLogger, - unusedTypes: [], definitions: new TaskTypeDictionary(taskManagerLogger), middleware: createInitialMiddleware(), startingCapacity: 20, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 0b1710ae7fa2f..fb6776fa34f28 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -55,7 +55,6 @@ export interface ITaskEventEmitter { export type TaskPollingLifecycleOpts = { logger: Logger; definitions: TaskTypeDictionary; - unusedTypes: string[]; taskStore: TaskStore; config: TaskManagerConfig; middleware: Middleware; @@ -115,7 +114,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter this.pool.availableCapacity(taskType), taskPartitioner, diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 76df8b7ae5584..fa1d1f749985b 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -70,7 +70,6 @@ describe('mark_available_tasks_as_claimed', () => { fieldUpdates, claimableTaskTypes: definitions.getAllTypes(), skippedTaskTypes: [], - unusedTaskTypes: [], taskMaxAttempts: Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => { return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts }; }, {}), @@ -153,8 +152,6 @@ if (doc['task.runAt'].size()!=0) { ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')} - } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { - ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; }`, @@ -167,7 +164,6 @@ if (doc['task.runAt'].size()!=0) { }, claimableTaskTypes: ['sampleTask', 'otherTask'], skippedTaskTypes: [], - unusedTaskTypes: [], taskMaxAttempts: { sampleTask: 5, otherTask: 1, @@ -242,7 +238,6 @@ if (doc['task.runAt'].size()!=0) { fieldUpdates, claimableTaskTypes: ['foo', 'bar'], skippedTaskTypes: [], - unusedTaskTypes: [], taskMaxAttempts: { foo: 5, bar: 2, diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index 4e138545aec25..ec99c6ad5bf80 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -202,7 +202,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts { }; claimableTaskTypes: string[]; skippedTaskTypes: string[]; - unusedTaskTypes: string[]; taskMaxAttempts: { [field: string]: number }; } @@ -210,7 +209,6 @@ export const updateFieldsAndMarkAsFailed = ({ fieldUpdates, claimableTaskTypes, skippedTaskTypes, - unusedTaskTypes, taskMaxAttempts, }: UpdateFieldsAndMarkAsFailedOpts): ScriptClause => { const setScheduledAtScript = `if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) { @@ -227,8 +225,6 @@ export const updateFieldsAndMarkAsFailed = ({ source: ` if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) { ${setScheduledAtAndMarkAsClaimed} - } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { - ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; }`, @@ -238,7 +234,6 @@ export const updateFieldsAndMarkAsFailed = ({ fieldUpdates, claimableTaskTypes, skippedTaskTypes, - unusedTaskTypes, taskMaxAttempts, }, }; diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts index 437af8e007bdb..629e3464399c7 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts @@ -83,7 +83,6 @@ describe('TaskClaiming', () => { strategy: 'non-default', definitions, excludedTaskTypes: [], - unusedTypes: [], taskStore: taskStoreMock.create({ taskManagerId: '' }), maxAttempts: 2, getAvailableCapacity: () => 10, @@ -134,7 +133,6 @@ describe('TaskClaiming', () => { strategy: 'default', definitions, excludedTaskTypes: [], - unusedTypes: [], taskStore: taskStoreMock.create({ taskManagerId: '' }), maxAttempts: 2, getAvailableCapacity: () => 10, diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index c9bca31755408..1b1e414903628 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -34,7 +34,6 @@ export interface TaskClaimingOpts { logger: Logger; strategy: string; definitions: TaskTypeDictionary; - unusedTypes: string[]; taskStore: TaskStore; maxAttempts: number; excludedTaskTypes: string[]; @@ -92,7 +91,6 @@ export class TaskClaiming { private readonly taskClaimingBatchesByType: TaskClaimingBatches; private readonly taskMaxAttempts: Record; private readonly excludedTaskTypes: string[]; - private readonly unusedTypes: string[]; private readonly taskClaimer: TaskClaimerFn; private readonly taskPartitioner: TaskPartitioner; @@ -111,7 +109,6 @@ export class TaskClaiming { this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions); this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions)); this.excludedTaskTypes = opts.excludedTaskTypes; - this.unusedTypes = opts.unusedTypes; this.taskClaimer = getTaskClaimer(this.logger, opts.strategy); this.events$ = new Subject(); this.taskPartitioner = opts.taskPartitioner; @@ -178,7 +175,6 @@ export class TaskClaiming { taskStore: this.taskStore, events$: this.events$, getCapacity: this.getAvailableCapacity, - unusedTypes: this.unusedTypes, definitions: this.definitions, taskMaxAttempts: this.taskMaxAttempts, excludedTaskTypes: this.excludedTaskTypes, diff --git a/x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.test.ts b/x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.test.ts new file mode 100644 index 0000000000000..1485216a67f33 --- /dev/null +++ b/x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.test.ts @@ -0,0 +1,266 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { mockLogger } from '../test_utils'; +import { coreMock, elasticsearchServiceMock } from '@kbn/core/server/mocks'; +import { SCHEDULE_INTERVAL, taskRunner } from './mark_removed_tasks_as_unrecognized'; +import { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; + +const createTaskDoc = (id: string = '1'): SearchHit => ({ + _index: '.kibana_task_manager_9.0.0_001', + _id: `task:${id}`, + _score: 1, + _source: { + references: [], + type: 'task', + updated_at: '2024-11-06T14:17:55.935Z', + task: { + taskType: 'report', + params: '{}', + state: '{"foo":"test"}', + stateVersion: 1, + runAt: '2024-11-06T14:17:55.935Z', + enabled: true, + scheduledAt: '2024-11-06T14:17:55.935Z', + attempts: 0, + status: 'idle', + startedAt: null, + retryAt: null, + ownerId: null, + partition: 211, + }, + }, +}); + +describe('markRemovedTasksAsUnrecognizedTask', () => { + const logger = mockLogger(); + const coreSetup = coreMock.createSetup(); + const esClient = elasticsearchServiceMock.createStart(); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it('marks removed tasks as unrecognized', async () => { + esClient.client.asInternalUser.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_9.0.0_001', + _id: 'task:123', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + { + update: { + _index: '.kibana_task_manager_9.0.0_001', + _id: 'task:456', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + { + update: { + _index: '.kibana_task_manager_9.0.0_001', + _id: 'task:789', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + ], + }); + + coreSetup.getStartServices.mockResolvedValue([ + { + ...coreMock.createStart(), + elasticsearch: esClient, + }, + {}, + coreMock.createSetup(), + ]); + // @ts-expect-error + esClient.client.asInternalUser.search.mockResponse({ + hits: { hits: [createTaskDoc('123'), createTaskDoc('456'), createTaskDoc('789')], total: 3 }, + }); + + const runner = taskRunner(logger, coreSetup.getStartServices)(); + const result = await runner.run(); + + expect(esClient.client.asInternalUser.bulk).toHaveBeenCalledWith({ + body: [ + { update: { _id: 'task:123' } }, + { doc: { task: { status: 'unrecognized' } } }, + { update: { _id: 'task:456' } }, + { doc: { task: { status: 'unrecognized' } } }, + { update: { _id: 'task:789' } }, + { doc: { task: { status: 'unrecognized' } } }, + ], + index: '.kibana_task_manager', + refresh: false, + }); + + expect(logger.debug).toHaveBeenCalledWith(`Marked 3 removed tasks as unrecognized`); + + expect(result).toEqual({ + state: {}, + schedule: { interval: SCHEDULE_INTERVAL }, + }); + }); + + it('skips update when there are no removed task types', async () => { + coreSetup.getStartServices.mockResolvedValue([ + { + ...coreMock.createStart(), + elasticsearch: esClient, + }, + {}, + coreMock.createSetup(), + ]); + // @ts-expect-error + esClient.client.asInternalUser.search.mockResponse({ + hits: { hits: [], total: 0 }, + }); + + const runner = taskRunner(logger, coreSetup.getStartServices)(); + const result = await runner.run(); + + expect(esClient.client.asInternalUser.bulk).not.toHaveBeenCalled(); + + expect(result).toEqual({ + state: {}, + schedule: { interval: SCHEDULE_INTERVAL }, + }); + }); + + it('schedules the next run even when there is an error', async () => { + coreSetup.getStartServices.mockResolvedValue([ + { + ...coreMock.createStart(), + elasticsearch: esClient, + }, + {}, + coreMock.createSetup(), + ]); + esClient.client.asInternalUser.search.mockRejectedValueOnce(new Error('foo')); + + const runner = taskRunner(logger, coreSetup.getStartServices)(); + const result = await runner.run(); + + expect(esClient.client.asInternalUser.bulk).not.toHaveBeenCalled(); + + expect(logger.error).toHaveBeenCalledWith( + 'Failed to mark removed tasks as unrecognized. Error: foo' + ); + + expect(result).toEqual({ + state: {}, + schedule: { interval: SCHEDULE_INTERVAL }, + }); + }); + + it('handles partial errors from bulk partial update', async () => { + esClient.client.asInternalUser.bulk.mockResolvedValue({ + errors: false, + took: 0, + items: [ + { + update: { + _index: '.kibana_task_manager_9.0.0_001', + _id: 'task:123', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + { + update: { + _index: '.kibana_task_manager_9.0.0_001', + _id: 'task:456', + _version: 2, + result: 'updated', + _shards: { total: 1, successful: 1, failed: 0 }, + _seq_no: 84, + _primary_term: 1, + status: 200, + }, + }, + { + update: { + _index: '.kibana_task_manager_9.0.0_001', + _id: 'task:789', + _version: 2, + error: { + type: 'document_missing_exception', + reason: '[5]: document missing', + index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA', + shard: '0', + index: '.kibana_task_manager_9.0.0_001', + }, + status: 404, + }, + }, + ], + }); + + coreSetup.getStartServices.mockResolvedValue([ + { + ...coreMock.createStart(), + elasticsearch: esClient, + }, + {}, + coreMock.createSetup(), + ]); + // @ts-expect-error + esClient.client.asInternalUser.search.mockResponse({ + hits: { hits: [createTaskDoc('123'), createTaskDoc('456'), createTaskDoc('789')], total: 3 }, + }); + + const runner = taskRunner(logger, coreSetup.getStartServices)(); + const result = await runner.run(); + + expect(esClient.client.asInternalUser.bulk).toHaveBeenCalledWith({ + body: [ + { update: { _id: 'task:123' } }, + { doc: { task: { status: 'unrecognized' } } }, + { update: { _id: 'task:456' } }, + { doc: { task: { status: 'unrecognized' } } }, + { update: { _id: 'task:789' } }, + { doc: { task: { status: 'unrecognized' } } }, + ], + index: '.kibana_task_manager', + refresh: false, + }); + expect(logger.warn).toHaveBeenCalledWith( + `Error updating task task:789 to mark as unrecognized - {\"type\":\"document_missing_exception\",\"reason\":\"[5]: document missing\",\"index_uuid\":\"aAsFqTI0Tc2W0LCWgPNrOA\",\"shard\":\"0\",\"index\":\".kibana_task_manager_9.0.0_001\"}` + ); + + expect(logger.debug).toHaveBeenCalledWith(`Marked 2 removed tasks as unrecognized`); + + expect(result).toEqual({ + state: {}, + schedule: { interval: SCHEDULE_INTERVAL }, + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.ts b/x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.ts new file mode 100644 index 0000000000000..e28d5221e72d5 --- /dev/null +++ b/x-pack/plugins/task_manager/server/removed_tasks/mark_removed_tasks_as_unrecognized.ts @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger } from '@kbn/logging'; +import { CoreStart } from '@kbn/core-lifecycle-server'; +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { TaskScheduling } from '../task_scheduling'; +import { TaskTypeDictionary } from '../task_type_dictionary'; +import { ConcreteTaskInstance, TaskManagerStartContract } from '..'; +import { TaskStatus } from '../task'; +import { REMOVED_TYPES } from '../task_type_dictionary'; +import { TASK_MANAGER_INDEX } from '../constants'; + +export const TASK_ID = 'mark_removed_tasks_as_unrecognized'; +const TASK_TYPE = `task_manager:${TASK_ID}`; + +export const SCHEDULE_INTERVAL = '1h'; + +export async function scheduleMarkRemovedTasksAsUnrecognizedDefinition( + logger: Logger, + taskScheduling: TaskScheduling +) { + try { + await taskScheduling.ensureScheduled({ + id: TASK_ID, + taskType: TASK_TYPE, + schedule: { interval: SCHEDULE_INTERVAL }, + state: {}, + params: {}, + }); + } catch (e) { + logger.error(`Error scheduling ${TASK_ID} task, received ${e.message}`); + } +} + +export function registerMarkRemovedTasksAsUnrecognizedDefinition( + logger: Logger, + coreStartServices: () => Promise<[CoreStart, TaskManagerStartContract, unknown]>, + taskTypeDictionary: TaskTypeDictionary +) { + taskTypeDictionary.registerTaskDefinitions({ + [TASK_TYPE]: { + title: 'Mark removed tasks as unrecognized', + createTaskRunner: taskRunner(logger, coreStartServices), + }, + }); +} + +export function taskRunner( + logger: Logger, + coreStartServices: () => Promise<[CoreStart, TaskManagerStartContract, unknown]> +) { + return () => { + return { + async run() { + try { + const [{ elasticsearch }] = await coreStartServices(); + const esClient = elasticsearch.client.asInternalUser; + + const removedTasks = await queryForRemovedTasks(esClient); + + if (removedTasks.length > 0) { + await updateTasksToBeUnrecognized(esClient, logger, removedTasks); + } + + return { + state: {}, + schedule: { interval: SCHEDULE_INTERVAL }, + }; + } catch (e) { + logger.error(`Failed to mark removed tasks as unrecognized. Error: ${e.message}`); + return { + state: {}, + schedule: { interval: SCHEDULE_INTERVAL }, + }; + } + }, + }; + }; +} + +async function queryForRemovedTasks( + esClient: ElasticsearchClient +): Promise>> { + const result = await esClient.search({ + index: TASK_MANAGER_INDEX, + body: { + size: 100, + _source: false, + query: { + bool: { + must: [ + { + terms: { + 'task.taskType': REMOVED_TYPES, + }, + }, + ], + }, + }, + }, + }); + + return result.hits.hits; +} + +async function updateTasksToBeUnrecognized( + esClient: ElasticsearchClient, + logger: Logger, + removedTasks: Array> +) { + const bulkBody = []; + for (const task of removedTasks) { + bulkBody.push({ update: { _id: task._id } }); + bulkBody.push({ doc: { task: { status: TaskStatus.Unrecognized } } }); + } + + let removedCount = 0; + try { + const removeResults = await esClient.bulk({ + index: TASK_MANAGER_INDEX, + refresh: false, + body: bulkBody, + }); + for (const removeResult of removeResults.items) { + if (!removeResult.update || !removeResult.update._id) { + logger.warn( + `Error updating task with unknown to mark as unrecognized - malformed response` + ); + } else if (removeResult.update?.error) { + logger.warn( + `Error updating task ${ + removeResult.update._id + } to mark as unrecognized - ${JSON.stringify(removeResult.update.error)}` + ); + } else { + removedCount++; + } + } + logger.debug(`Marked ${removedCount} removed tasks as unrecognized`); + } catch (err) { + // don't worry too much about errors, we'll try again next time + logger.warn(`Error updating tasks to mark as unrecognized: ${err}`); + } +} diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.ts b/x-pack/plugins/task_manager/server/task_claimers/index.ts index 178ebacf68cb9..f41c489fd7550 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.ts @@ -26,7 +26,6 @@ export interface TaskClaimerOpts { events$: Subject; taskStore: TaskStore; definitions: TaskTypeDictionary; - unusedTypes: string[]; excludedTaskTypes: string[]; taskMaxAttempts: Record; logger: Logger; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index fe44ce9e94c68..07dae3c48a392 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -190,7 +190,6 @@ describe('TaskClaiming', () => { definitions, taskStore: store, excludedTaskTypes, - unusedTypes: unusedTaskTypes, maxAttempts: taskClaimingOpts.maxAttempts ?? 2, getAvailableCapacity: taskClaimingOpts.getAvailableCapacity ?? (() => 10), taskPartitioner, @@ -206,20 +205,17 @@ describe('TaskClaiming', () => { claimingOpts, hits = [generateFakeTasks(1)], excludedTaskTypes = [], - unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; claimingOpts: Omit; hits?: ConcreteTaskInstance[][]; excludedTaskTypes?: string[]; - unusedTaskTypes?: string[]; }) { const { taskClaiming, store } = initialiseTestClaiming({ storeOpts, taskClaimingOpts, excludedTaskTypes, - unusedTaskTypes, hits, }); @@ -355,7 +351,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -378,7 +373,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -440,312 +435,6 @@ describe('TaskClaiming', () => { expect(result.docs.length).toEqual(3); }); - test('should not claim tasks of removed type', async () => { - const store = taskStoreMock.create({ taskManagerId: 'test-test' }); - store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); - - const fetchedTasks = [ - mockInstance({ id: `id-1`, taskType: 'report' }), - mockInstance({ id: `id-2`, taskType: 'report' }), - mockInstance({ id: `id-3`, taskType: 'yawn' }), - ]; - - const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); - store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - - store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult)); - store.bulkPartialUpdate.mockResolvedValueOnce( - [fetchedTasks[0], fetchedTasks[1]].map(getPartialUpdateResult) - ); - - const taskClaiming = new TaskClaiming({ - logger: taskManagerLogger, - strategy: CLAIM_STRATEGY_MGET, - definitions: taskDefinitions, - taskStore: store, - excludedTaskTypes: [], - unusedTypes: ['report'], - maxAttempts: 2, - getAvailableCapacity: () => 10, - taskPartitioner, - }); - - const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }); - - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } - - const result = unwrap(resultOrErr) as ClaimOwnershipResult; - - expect(apm.startTransaction).toHaveBeenCalledWith( - TASK_MANAGER_MARK_AS_CLAIMED, - TASK_MANAGER_TRANSACTION_TYPE - ); - expect(mockApmTrans.end).toHaveBeenCalledWith('success'); - - expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 2;', - { tags: ['taskClaiming', 'claimAvailableTasksMget'] } - ); - - expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ - size: 40, - seq_no_primary_term: true, - }); - expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [ - { - id: fetchedTasks[2].id, - version: fetchedTasks[2].version, - scheduledAt: fetchedTasks[2].runAt, - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ]); - expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [ - { - id: fetchedTasks[0].id, - version: fetchedTasks[0].version, - status: 'unrecognized', - }, - { - id: fetchedTasks[1].id, - version: fetchedTasks[1].version, - status: 'unrecognized', - }, - ]); - expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); - - expect(result.stats).toEqual({ - tasksClaimed: 1, - tasksConflicted: 0, - tasksErrors: 0, - tasksUpdated: 1, - tasksLeftUnclaimed: 0, - staleTasks: 0, - }); - expect(result.docs.length).toEqual(1); - }); - - test('should log warning if error updating single removed task as unrecognized', async () => { - const store = taskStoreMock.create({ taskManagerId: 'test-test' }); - store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); - - const fetchedTasks = [ - mockInstance({ id: `id-1`, taskType: 'report' }), - mockInstance({ id: `id-2`, taskType: 'report' }), - mockInstance({ id: `id-3`, taskType: 'yawn' }), - ]; - - const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); - store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - - store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult)); - store.bulkPartialUpdate.mockResolvedValueOnce([ - asOk(fetchedTasks[0]), - asErr({ - type: 'task', - id: fetchedTasks[1].id, - status: 404, - error: { - type: 'document_missing_exception', - reason: '[5]: document missing', - index_uuid: 'aAsFqTI0Tc2W0LCWgPNrOA', - shard: '0', - index: '.kibana_task_manager_8.16.0_001', - }, - }), - ]); - - const taskClaiming = new TaskClaiming({ - logger: taskManagerLogger, - strategy: CLAIM_STRATEGY_MGET, - definitions: taskDefinitions, - taskStore: store, - excludedTaskTypes: [], - unusedTypes: ['report'], - maxAttempts: 2, - getAvailableCapacity: () => 10, - taskPartitioner, - }); - - const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }); - - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } - - const result = unwrap(resultOrErr) as ClaimOwnershipResult; - - expect(apm.startTransaction).toHaveBeenCalledWith( - TASK_MANAGER_MARK_AS_CLAIMED, - TASK_MANAGER_TRANSACTION_TYPE - ); - expect(mockApmTrans.end).toHaveBeenCalledWith('success'); - - expect(taskManagerLogger.warn).toHaveBeenCalledWith( - 'Error updating task id-2:task to mark as unrecognized during claim: {"type":"document_missing_exception","reason":"[5]: document missing","index_uuid":"aAsFqTI0Tc2W0LCWgPNrOA","shard":"0","index":".kibana_task_manager_8.16.0_001"}', - { tags: ['taskClaiming', 'claimAvailableTasksMget'] } - ); - expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 1;', - { tags: ['taskClaiming', 'claimAvailableTasksMget'] } - ); - - expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ - size: 40, - seq_no_primary_term: true, - }); - expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [ - { - id: fetchedTasks[2].id, - version: fetchedTasks[2].version, - scheduledAt: fetchedTasks[2].runAt, - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ]); - expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [ - { - id: fetchedTasks[0].id, - version: fetchedTasks[0].version, - status: 'unrecognized', - }, - { - id: fetchedTasks[1].id, - version: fetchedTasks[1].version, - status: 'unrecognized', - }, - ]); - expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); - - expect(result.stats).toEqual({ - tasksClaimed: 1, - tasksConflicted: 0, - tasksErrors: 0, - tasksUpdated: 1, - tasksLeftUnclaimed: 0, - staleTasks: 0, - }); - expect(result.docs.length).toEqual(1); - }); - - test('should log warning if error updating all removed tasks as unrecognized', async () => { - const store = taskStoreMock.create({ taskManagerId: 'test-test' }); - store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); - - const fetchedTasks = [ - mockInstance({ id: `id-1`, taskType: 'report' }), - mockInstance({ id: `id-2`, taskType: 'report' }), - mockInstance({ id: `id-3`, taskType: 'yawn' }), - ]; - - const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); - store.getDocVersions.mockResolvedValueOnce(docLatestVersions); - - store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); - store.bulkPartialUpdate.mockResolvedValueOnce([fetchedTasks[2]].map(getPartialUpdateResult)); - store.bulkPartialUpdate.mockRejectedValueOnce(new Error('Oh no')); - - const taskClaiming = new TaskClaiming({ - logger: taskManagerLogger, - strategy: CLAIM_STRATEGY_MGET, - definitions: taskDefinitions, - taskStore: store, - excludedTaskTypes: [], - unusedTypes: ['report'], - maxAttempts: 2, - getAvailableCapacity: () => 10, - taskPartitioner, - }); - - const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }); - - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } - - const result = unwrap(resultOrErr) as ClaimOwnershipResult; - - expect(apm.startTransaction).toHaveBeenCalledWith( - TASK_MANAGER_MARK_AS_CLAIMED, - TASK_MANAGER_TRANSACTION_TYPE - ); - expect(mockApmTrans.end).toHaveBeenCalledWith('success'); - - expect(taskManagerLogger.warn).toHaveBeenCalledWith( - 'Error updating tasks to mark as unrecognized during claim: Error: Oh no', - { tags: ['taskClaiming', 'claimAvailableTasksMget'] } - ); - expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 1; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', - { tags: ['taskClaiming', 'claimAvailableTasksMget'] } - ); - - expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ - size: 40, - seq_no_primary_term: true, - }); - expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); - expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); - expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(2); - expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(1, [ - { - id: fetchedTasks[2].id, - version: fetchedTasks[2].version, - scheduledAt: fetchedTasks[2].runAt, - attempts: 1, - ownerId: 'test-test', - retryAt: new Date('1970-01-01T00:05:30.000Z'), - status: 'running', - startedAt: new Date('1970-01-01T00:00:00.000Z'), - }, - ]); - expect(store.bulkPartialUpdate).toHaveBeenNthCalledWith(2, [ - { - id: fetchedTasks[0].id, - version: fetchedTasks[0].version, - status: 'unrecognized', - }, - { - id: fetchedTasks[1].id, - version: fetchedTasks[1].version, - status: 'unrecognized', - }, - ]); - - expect(result.stats).toEqual({ - tasksClaimed: 1, - tasksConflicted: 0, - tasksErrors: 0, - tasksUpdated: 1, - tasksLeftUnclaimed: 0, - staleTasks: 0, - }); - expect(result.docs.length).toEqual(1); - }); - test('should handle no tasks to claim', async () => { const store = taskStoreMock.create({ taskManagerId: 'test-test' }); store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); @@ -761,7 +450,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -828,7 +516,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -851,7 +538,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -922,7 +609,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -945,7 +631,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -1016,7 +702,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1039,7 +724,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -1116,7 +801,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1139,7 +823,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -1248,7 +932,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1271,7 +954,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.error).toHaveBeenCalledWith( @@ -1377,7 +1060,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1400,7 +1082,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.warn).toHaveBeenCalledWith( @@ -1504,7 +1186,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1619,7 +1300,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1642,7 +1322,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.error).toHaveBeenCalledWith( @@ -1753,7 +1433,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -1776,7 +1455,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.error).not.toHaveBeenCalled(); @@ -1870,7 +1549,6 @@ describe('TaskClaiming', () => { definitions: taskDefinitions, taskStore: store, excludedTaskTypes: [], - unusedTypes: [], maxAttempts: 2, getAvailableCapacity: () => 10, taskPartitioner, @@ -2488,7 +2166,6 @@ describe('TaskClaiming', () => { strategy: CLAIM_STRATEGY_MGET, definitions, excludedTaskTypes: [], - unusedTypes: [], taskStore, maxAttempts: 2, getAvailableCapacity, diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 16d9ba5c7fae7..431daab8dd2cb 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -57,7 +57,6 @@ interface OwnershipClaimingOpts { claimOwnershipUntil: Date; size: number; taskTypes: Set; - removedTypes: Set; getCapacity: (taskType?: string | undefined) => number; excludedTaskTypePatterns: string[]; taskStore: TaskStore; @@ -90,19 +89,16 @@ export async function claimAvailableTasksMget( async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { const { getCapacity, claimOwnershipUntil, batches, events$, taskStore, taskPartitioner } = opts; - const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts; + const { definitions, excludedTaskTypes, taskMaxAttempts } = opts; const logger = createWrappedLogger({ logger: opts.logger, tags: [claimAvailableTasksMget.name] }); const initialCapacity = getCapacity(); const stopTaskTimer = startTaskTimer(); - const removedTypes = new Set(unusedTypes); // REMOVED_TYPES - // get a list of candidate tasks to claim, with their version info const { docs, versionMap } = await searchAvailableTasks({ definitions, taskTypes: new Set(definitions.getAllTypes()), excludedTaskTypePatterns: excludedTaskTypes, - removedTypes, taskStore, events$, claimOwnershipUntil, @@ -125,18 +121,12 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise `task:${doc.id}`)); - // filter out stale, missing and removed tasks + // filter out stale and missing tasks const currentTasks: ConcreteTaskInstance[] = []; const staleTasks: ConcreteTaskInstance[] = []; const missingTasks: ConcreteTaskInstance[] = []; - const removedTasks: ConcreteTaskInstance[] = []; for (const searchDoc of docs) { - if (removedTypes.has(searchDoc.taskType)) { - removedTasks.push(searchDoc); - continue; - } - const searchVersion = versionMap.get(searchDoc.id); const latestVersion = docLatestVersions.get(`task:${searchDoc.id}`); if (!searchVersion || !latestVersion) { @@ -236,42 +226,8 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise 0) { - const tasksToRemove = Array.from(removedTasks); - const tasksToRemoveUpdates: PartialConcreteTaskInstance[] = []; - for (const task of tasksToRemove) { - tasksToRemoveUpdates.push({ - id: task.id, - status: TaskStatus.Unrecognized, - }); - } - - // don't worry too much about errors, we'll get them next time - try { - const removeResults = await taskStore.bulkPartialUpdate(tasksToRemoveUpdates); - for (const removeResult of removeResults) { - if (isOk(removeResult)) { - removedCount++; - } else { - const { id, type, error } = removeResult.error; - logger.warn( - `Error updating task ${id}:${type} to mark as unrecognized during claim: ${JSON.stringify( - error - )}` - ); - } - } - } catch (err) { - // swallow the error because this is unrelated to the claim cycle - logger.warn(`Error updating tasks to mark as unrecognized during claim: ${err}`); - } - } - // TODO: need a better way to generate stats - const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors}; removed: ${removedCount};`; + const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors};`; logger.debug(message); // build results @@ -306,7 +262,6 @@ export const NO_ASSIGNED_PARTITIONS_WARNING_INTERVAL = 60000; async function searchAvailableTasks({ definitions, taskTypes, - removedTypes, excludedTaskTypePatterns, taskStore, getCapacity, @@ -318,7 +273,6 @@ async function searchAvailableTasks({ const claimPartitions = buildClaimPartitions({ types: taskTypes, excludedTaskTypes, - removedTypes, getCapacity, definitions, }); @@ -352,10 +306,7 @@ async function searchAvailableTasks({ // Task must be enabled EnabledTask, // a task type that's not excluded (may be removed or not) - OneOfTaskTypes( - 'task.taskType', - claimPartitions.unlimitedTypes.concat(Array.from(removedTypes)) - ), + OneOfTaskTypes('task.taskType', claimPartitions.unlimitedTypes), // Either a task with idle status and runAt <= now or // status running or claiming with a retryAt <= now. shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), @@ -407,7 +358,6 @@ async function searchAvailableTasks({ } interface ClaimPartitions { - removedTypes: string[]; unlimitedTypes: string[]; limitedTypes: Map; } @@ -415,30 +365,23 @@ interface ClaimPartitions { interface BuildClaimPartitionsOpts { types: Set; excludedTaskTypes: Set; - removedTypes: Set; getCapacity: (taskType?: string) => number; definitions: TaskTypeDictionary; } function buildClaimPartitions(opts: BuildClaimPartitionsOpts): ClaimPartitions { const result: ClaimPartitions = { - removedTypes: [], unlimitedTypes: [], limitedTypes: new Map(), }; - const { types, excludedTaskTypes, removedTypes, getCapacity, definitions } = opts; + const { types, excludedTaskTypes, getCapacity, definitions } = opts; for (const type of types) { const definition = definitions.get(type); if (definition == null) continue; if (excludedTaskTypes.has(type)) continue; - if (removedTypes.has(type)) { - result.removedTypes.push(type); - continue; - } - if (definition.maxConcurrency == null) { result.unlimitedTypes.push(definition.type); continue; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts index 13e6faf2de0fd..623693e71c54d 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts @@ -99,14 +99,12 @@ describe('TaskClaiming', () => { hits = [generateFakeTasks(1)], versionConflicts = 2, excludedTaskTypes = [], - unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; hits?: ConcreteTaskInstance[][]; versionConflicts?: number; excludedTaskTypes?: string[]; - unusedTaskTypes?: string[]; }) { const definitions = storeOpts.definitions ?? taskDefinitions; const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId }); @@ -136,7 +134,6 @@ describe('TaskClaiming', () => { definitions, taskStore: store, excludedTaskTypes, - unusedTypes: unusedTaskTypes, maxAttempts: taskClaimingOpts.maxAttempts ?? 2, getAvailableCapacity: taskClaimingOpts.getAvailableCapacity ?? (() => 10), taskPartitioner, @@ -153,7 +150,6 @@ describe('TaskClaiming', () => { hits = [generateFakeTasks(1)], versionConflicts = 2, excludedTaskTypes = [], - unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; @@ -161,14 +157,12 @@ describe('TaskClaiming', () => { hits?: ConcreteTaskInstance[][]; versionConflicts?: number; excludedTaskTypes?: string[]; - unusedTaskTypes?: string[]; }) { const getCapacity = taskClaimingOpts.getAvailableCapacity ?? (() => 10); const { taskClaiming, store } = initialiseTestClaiming({ storeOpts, taskClaimingOpts, excludedTaskTypes, - unusedTaskTypes, hits, versionConflicts, }); @@ -471,7 +465,6 @@ if (doc['task.runAt'].size()!=0) { 'anotherLimitedToOne', 'limitedToTwo', ], - unusedTaskTypes: [], taskMaxAttempts: { unlimited: maxAttempts, }, @@ -493,7 +486,6 @@ if (doc['task.runAt'].size()!=0) { 'anotherLimitedToOne', 'limitedToTwo', ], - unusedTaskTypes: [], taskMaxAttempts: { limitedToOne: maxAttempts, }, @@ -640,7 +632,6 @@ if (doc['task.runAt'].size()!=0) { }, taskPartitioner, excludedTaskTypes: [], - unusedTypes: [], }); const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ @@ -848,129 +839,6 @@ if (doc['task.runAt'].size()!=0) { expect(firstCycle).not.toMatchObject(secondCycle); }); - test('it passes any unusedTaskTypes to script', async () => { - const maxAttempts = _.random(2, 43); - const customMaxAttempts = _.random(44, 100); - const taskManagerId = uuidv1(); - const fieldUpdates = { - ownerId: taskManagerId, - retryAt: new Date(Date.now()), - }; - const definitions = new TaskTypeDictionary(mockLogger()); - definitions.registerTaskDefinitions({ - foo: { - title: 'foo', - createTaskRunner: jest.fn(), - }, - bar: { - title: 'bar', - maxAttempts: customMaxAttempts, - createTaskRunner: jest.fn(), - }, - foobar: { - title: 'foobar', - maxAttempts: customMaxAttempts, - createTaskRunner: jest.fn(), - }, - }); - - const { - args: { - updateByQuery: [{ query, script }], - }, - } = await testClaimAvailableTasks({ - storeOpts: { - definitions, - taskManagerId, - }, - taskClaimingOpts: { - maxAttempts, - }, - claimingOpts: { - claimOwnershipUntil: new Date(), - }, - excludedTaskTypes: ['foobar'], - unusedTaskTypes: ['barfoo'], - }); - expect(query).toMatchObject({ - bool: { - must: [ - { - bool: { - must: [ - { - term: { - 'task.enabled': true, - }, - }, - ], - }, - }, - { - bool: { - should: [ - { - bool: { - must: [ - { term: { 'task.status': 'idle' } }, - { range: { 'task.runAt': { lte: 'now' } } }, - ], - }, - }, - { - bool: { - must: [ - { - bool: { - should: [ - { term: { 'task.status': 'running' } }, - { term: { 'task.status': 'claiming' } }, - ], - }, - }, - { range: { 'task.retryAt': { lte: 'now' } } }, - ], - }, - }, - ], - }, - }, - ], - filter: [ - { - bool: { - must_not: [ - { - bool: { - should: [ - { term: { 'task.status': 'running' } }, - { term: { 'task.status': 'claiming' } }, - ], - must: { range: { 'task.retryAt': { gt: 'now' } } }, - }, - }, - ], - }, - }, - ], - }, - }); - expect(script).toMatchObject({ - source: expect.any(String), - lang: 'painless', - params: { - fieldUpdates, - claimableTaskTypes: ['foo', 'bar'], - skippedTaskTypes: ['foobar'], - unusedTaskTypes: ['barfoo'], - taskMaxAttempts: { - bar: customMaxAttempts, - foo: maxAttempts, - }, - }, - }); - }); - test('it claims tasks by setting their ownerId, status and retryAt', async () => { const taskManagerId = uuidv1(); const claimOwnershipUntil = new Date(Date.now()); @@ -1356,7 +1224,6 @@ if (doc['task.runAt'].size()!=0) { strategy: 'update_by_query', definitions, excludedTaskTypes: [], - unusedTypes: [], taskStore, maxAttempts: 2, getAvailableCapacity, diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts index 5a4bccb43b984..fdfd09e07f9c7 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts @@ -51,7 +51,6 @@ interface OwnershipClaimingOpts { taskStore: TaskStore; events$: Subject; definitions: TaskTypeDictionary; - unusedTypes: string[]; excludedTaskTypes: string[]; taskMaxAttempts: Record; } @@ -60,7 +59,7 @@ export async function claimAvailableTasksUpdateByQuery( opts: TaskClaimerOpts ): Promise { const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts; - const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts; + const { definitions, excludedTaskTypes, taskMaxAttempts } = opts; const initialCapacity = getCapacity(); let accumulatedResult = getEmptyClaimOwnershipResult(); @@ -83,7 +82,6 @@ export async function claimAvailableTasksUpdateByQuery( taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes, taskStore, definitions, - unusedTypes, excludedTaskTypes, taskMaxAttempts, }); @@ -137,7 +135,6 @@ async function markAvailableTasksAsClaimed({ claimOwnershipUntil, size, taskTypes, - unusedTypes, taskMaxAttempts, }: OwnershipClaimingOpts): Promise { const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy( @@ -164,7 +161,6 @@ async function markAvailableTasksAsClaimed({ }, claimableTaskTypes: taskTypesToClaim, skippedTaskTypes: taskTypesToSkip, - unusedTaskTypes: unusedTypes, taskMaxAttempts: pick(taskMaxAttempts, taskTypesToClaim), }); diff --git a/x-pack/plugins/task_manager/tsconfig.json b/x-pack/plugins/task_manager/tsconfig.json index 55ad764c5bdcd..b11eaaf44a905 100644 --- a/x-pack/plugins/task_manager/tsconfig.json +++ b/x-pack/plugins/task_manager/tsconfig.json @@ -27,7 +27,8 @@ "@kbn/logging", "@kbn/core-lifecycle-server", "@kbn/cloud-plugin", - "@kbn/core-saved-objects-base-server-internal" + "@kbn/core-saved-objects-base-server-internal", + "@kbn/core-elasticsearch-server", ], "exclude": ["target/**/*"] } diff --git a/x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture/server/plugin.ts b/x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture/server/plugin.ts index 6e4eba065ec70..f6e3383270436 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture/server/plugin.ts @@ -95,6 +95,34 @@ export class SampleTaskManagerFixturePlugin return res.ok({ body: {} }); } ); + + router.post( + { + path: '/api/alerting_tasks/run_mark_tasks_as_unrecognized', + validate: { + body: schema.object({}), + }, + }, + async ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> => { + try { + const taskManager = await this.taskManagerStart; + await taskManager.ensureScheduled({ + id: 'mark_removed_tasks_as_unrecognized', + taskType: 'task_manager:mark_removed_tasks_as_unrecognized', + schedule: { interval: '1h' }, + state: {}, + params: {}, + }); + return res.ok({ body: await taskManager.runSoon('mark_removed_tasks_as_unrecognized') }); + } catch (err) { + return res.ok({ body: { id: 'mark_removed_tasks_as_unrecognized', error: `${err}` } }); + } + } + ); } public start(core: CoreStart, { taskManager }: SampleTaskManagerFixtureStartDeps) { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts index 0086dd2679c74..f05075be810a1 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts @@ -138,6 +138,12 @@ export default function createScheduledTaskIdTests({ getService }: FtrProviderCo // When we enable the rule, the unrecognized task should be removed and a new // task created in its place + await supertestWithoutAuth + .post('/api/alerting_tasks/run_mark_tasks_as_unrecognized') + .set('kbn-xsrf', 'foo') + .send({}) + .expect(200); + // scheduled task should exist and be unrecognized await retry.try(async () => { const taskRecordLoaded = await getScheduledTask(RULE_ID); diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts index 51b0842e60bc8..c5927d894911c 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts @@ -114,6 +114,34 @@ export function initRoutes( } ); + router.post( + { + path: `/api/sample_tasks/run_mark_removed_tasks_as_unrecognized`, + validate: { + body: schema.object({}), + }, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + try { + const taskManager = await taskManagerStart; + await taskManager.ensureScheduled({ + id: 'mark_removed_tasks_as_unrecognized', + taskType: 'task_manager:mark_removed_tasks_as_unrecognized', + schedule: { interval: '1h' }, + state: {}, + params: {}, + }); + return res.ok({ body: await taskManager.runSoon('mark_removed_tasks_as_unrecognized') }); + } catch (err) { + return res.ok({ body: { id: 'mark_removed_tasks_as_unrecognized', error: `${err}` } }); + } + } + ); + router.post( { path: `/api/sample_tasks/bulk_enable`, diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index 091e0fe01e415..c8056c2ee205e 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -168,6 +168,7 @@ export default function ({ getService }: FtrProviderContext) { 'security:telemetry-timelines', 'session_cleanup', 'task_manager:delete_inactive_background_task_nodes', + 'task_manager:mark_removed_tasks_as_unrecognized', ]); }); }); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts index aae90a52572c7..a7447353e805a 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts @@ -92,6 +92,12 @@ export default function ({ getService }: FtrProviderContext) { let scheduledTaskRuns = 0; let scheduledTaskInstanceRunAt = scheduledTask.runAt; + await request + .post('/api/sample_tasks/run_mark_removed_tasks_as_unrecognized') + .set('kbn-xsrf', 'xxx') + .send({}) + .expect(200); + await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(tasks.length).to.eql(3); diff --git a/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts b/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts index f1e697399fe09..acdbae0b00337 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts @@ -117,6 +117,34 @@ export function initRoutes( } ); + router.post( + { + path: `/api/sample_tasks/run_mark_removed_tasks_as_unrecognized`, + validate: { + body: schema.object({}), + }, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + try { + const taskManager = await taskManagerStart; + await taskManager.ensureScheduled({ + id: 'mark_removed_tasks_as_unrecognized', + taskType: 'task_manager:mark_removed_tasks_as_unrecognized', + schedule: { interval: '1h' }, + state: {}, + params: {}, + }); + return res.ok({ body: await taskManager.runSoon('mark_removed_tasks_as_unrecognized') }); + } catch (err) { + return res.ok({ body: { id: 'mark_removed_tasks_as_unrecognized', error: `${err}` } }); + } + } + ); + router.post( { path: `/api/sample_tasks/bulk_enable`, diff --git a/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management_removed_types.ts b/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management_removed_types.ts index aae90a52572c7..a7447353e805a 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management_removed_types.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management_removed_types.ts @@ -92,6 +92,12 @@ export default function ({ getService }: FtrProviderContext) { let scheduledTaskRuns = 0; let scheduledTaskInstanceRunAt = scheduledTask.runAt; + await request + .post('/api/sample_tasks/run_mark_removed_tasks_as_unrecognized') + .set('kbn-xsrf', 'xxx') + .send({}) + .expect(200); + await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(tasks.length).to.eql(3);