From 1df44e5c1735ac203d1bd6afb18ec192a444d511 Mon Sep 17 00:00:00 2001 From: Tyler Smalley Date: Tue, 21 Jul 2020 12:13:19 -0700 Subject: [PATCH] Revert "[Task Manager] Batches the update operations in Task Manager (#71470) (#72626)" This reverts commit 63786472f52c74bdf569e46062a0c52e7e138127. --- .../server/buffered_task_store.test.ts | 82 ----- .../server/buffered_task_store.ts | 39 --- .../server/lib/bulk_operation_buffer.test.ts | 288 ------------------ .../server/lib/bulk_operation_buffer.ts | 129 -------- .../server/lib/result_type.test.ts | 27 -- .../task_manager/server/lib/result_type.ts | 19 -- .../task_manager/server/task_manager.ts | 10 +- .../task_manager/server/task_runner.ts | 2 +- .../task_manager/server/task_store.mock.ts | 31 -- .../plugins/task_manager/server/task_store.ts | 65 +--- 10 files changed, 7 insertions(+), 685 deletions(-) delete mode 100644 x-pack/plugins/task_manager/server/buffered_task_store.test.ts delete mode 100644 x-pack/plugins/task_manager/server/buffered_task_store.ts delete mode 100644 x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts delete mode 100644 x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts delete mode 100644 x-pack/plugins/task_manager/server/lib/result_type.test.ts delete mode 100644 x-pack/plugins/task_manager/server/task_store.mock.ts diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts deleted file mode 100644 index 8e18405c79ed2..0000000000000 --- a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import uuid from 'uuid'; -import { taskStoreMock } from './task_store.mock'; -import { BufferedTaskStore } from './buffered_task_store'; -import { asErr, asOk } from './lib/result_type'; -import { TaskStatus } from './task'; - -describe('Buffered Task Store', () => { - test('proxies the TaskStore for `maxAttempts` and `remove`', async () => { - const taskStore = taskStoreMock.create({ maxAttempts: 10 }); - taskStore.bulkUpdate.mockResolvedValue([]); - const bufferedStore = new BufferedTaskStore(taskStore, {}); - - expect(bufferedStore.maxAttempts).toEqual(10); - - bufferedStore.remove('1'); - expect(taskStore.remove).toHaveBeenCalledWith('1'); - }); - - describe('update', () => { - test("proxies the TaskStore's `bulkUpdate`", async () => { - const taskStore = taskStoreMock.create({ maxAttempts: 10 }); - const bufferedStore = new BufferedTaskStore(taskStore, {}); - - const task = mockTask(); - - taskStore.bulkUpdate.mockResolvedValue([asOk(task)]); - - expect(await bufferedStore.update(task)).toMatchObject(task); - expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]); - }); - - test('handles partially successfull bulkUpdates resolving each call appropriately', async () => { - const taskStore = taskStoreMock.create({ maxAttempts: 10 }); - const bufferedStore = new BufferedTaskStore(taskStore, {}); - - const tasks = [mockTask(), mockTask(), mockTask()]; - - taskStore.bulkUpdate.mockResolvedValueOnce([ - asOk(tasks[0]), - asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }), - asOk(tasks[2]), - ]); - - const results = [ - bufferedStore.update(tasks[0]), - bufferedStore.update(tasks[1]), - bufferedStore.update(tasks[2]), - ]; - expect(await results[0]).toMatchObject(tasks[0]); - expect(results[1]).rejects.toMatchInlineSnapshot( - `[Error: Oh no, something went terribly wrong]` - ); - expect(await results[2]).toMatchObject(tasks[2]); - }); - }); -}); - -function mockTask() { - return { - id: `task_${uuid.v4()}`, - attempts: 0, - schedule: undefined, - params: { hello: 'world' }, - retryAt: null, - runAt: new Date(), - scheduledAt: new Date(), - scope: undefined, - startedAt: null, - state: { foo: 'bar' }, - status: TaskStatus.Idle, - taskType: 'report', - user: undefined, - version: '123', - ownerId: '123', - }; -} diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.ts b/x-pack/plugins/task_manager/server/buffered_task_store.ts deleted file mode 100644 index e1e5f802204c1..0000000000000 --- a/x-pack/plugins/task_manager/server/buffered_task_store.ts +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { TaskStore } from './task_store'; -import { ConcreteTaskInstance } from './task'; -import { Updatable } from './task_runner'; -import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer'; -import { unwrapPromise } from './lib/result_type'; - -// by default allow updates to be buffered for up to 50ms -const DEFAULT_BUFFER_MAX_DURATION = 50; - -export class BufferedTaskStore implements Updatable { - private bufferedUpdate: Operation; - constructor(private readonly taskStore: TaskStore, options: BufferOptions) { - this.bufferedUpdate = createBuffer( - (docs) => taskStore.bulkUpdate(docs), - { - bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION, - ...options, - } - ); - } - - public get maxAttempts(): number { - return this.taskStore.maxAttempts; - } - - public async update(doc: ConcreteTaskInstance): Promise { - return unwrapPromise(this.bufferedUpdate(doc)); - } - - public async remove(id: string): Promise { - return this.taskStore.remove(id); - } -} diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts deleted file mode 100644 index 9293656233026..0000000000000 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer'; -import { mapErr, asOk, asErr, Ok, Err } from './result_type'; - -interface TaskInstance extends Entity { - attempts: number; -} - -const createTask = (function (): () => TaskInstance { - let counter = 0; - return () => ({ - id: `task ${++counter}`, - attempts: 1, - }); -})(); - -function incrementAttempts(task: TaskInstance): Ok { - return asOk({ - ...task, - attempts: task.attempts + 1, - }); -} - -function errorAttempts(task: TaskInstance): Err> { - return asErr({ - entity: incrementAttempts(task).value, - error: { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 }, - }); -} - -describe('Bulk Operation Buffer', () => { - describe('createBuffer()', () => { - test('batches up multiple Operation calls', async () => { - const bulkUpdate: jest.Mocked> = jest.fn( - ([task1, task2]) => { - return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]); - } - ); - - const bufferedUpdate = createBuffer(bulkUpdate); - - const task1 = createTask(); - const task2 = createTask(); - - expect(await Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)])).toMatchObject([ - incrementAttempts(task1), - incrementAttempts(task2), - ]); - expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); - }); - - test('batch updates are executed at most by the next Event Loop tick by default', async () => { - const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { - return Promise.resolve(tasks.map(incrementAttempts)); - }); - - const bufferedUpdate = createBuffer(bulkUpdate); - - const task1 = createTask(); - const task2 = createTask(); - const task3 = createTask(); - const task4 = createTask(); - const task5 = createTask(); - const task6 = createTask(); - - return new Promise((resolve) => { - Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(1); - expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); - expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]); - }); - - setTimeout(() => { - // on next tick - setTimeout(() => { - // on next tick - expect(bulkUpdate).toHaveBeenCalledTimes(2); - Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(3); - expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]); - resolve(); - }); - }, 0); - - expect(bulkUpdate).toHaveBeenCalledTimes(1); - Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(2); - expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); - }); - }, 0); - }); - }); - - test('batch updates can be customised to execute after a certain period', async () => { - const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { - return Promise.resolve(tasks.map(incrementAttempts)); - }); - - const bufferMaxDuration = 50; - const bufferedUpdate = createBuffer(bulkUpdate, { bufferMaxDuration }); - - const task1 = createTask(); - const task2 = createTask(); - const task3 = createTask(); - const task4 = createTask(); - const task5 = createTask(); - const task6 = createTask(); - - return new Promise((resolve) => { - Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(1); - expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); - expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]); - }); - - setTimeout(() => { - // on next tick - setTimeout(() => { - // on next tick - expect(bulkUpdate).toHaveBeenCalledTimes(2); - Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(3); - expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]); - resolve(); - }); - }, bufferMaxDuration + 1); - - expect(bulkUpdate).toHaveBeenCalledTimes(1); - Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(2); - expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); - }); - }, bufferMaxDuration + 1); - }); - }); - - test('batch updates are executed once queue hits a certain bound', async () => { - const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { - return Promise.resolve(tasks.map(incrementAttempts)); - }); - - const bufferedUpdate = createBuffer(bulkUpdate, { - bufferMaxDuration: 100, - bufferMaxOperations: 2, - }); - - const task1 = createTask(); - const task2 = createTask(); - const task3 = createTask(); - const task4 = createTask(); - const task5 = createTask(); - - return new Promise((resolve) => { - bufferedUpdate(task1); - bufferedUpdate(task2); - bufferedUpdate(task3); - bufferedUpdate(task4); - - setTimeout(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(2); - expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); - expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); - - setTimeout(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(2); - bufferedUpdate(task5).then((_) => { - expect(bulkUpdate).toHaveBeenCalledTimes(3); - expect(bulkUpdate).toHaveBeenCalledWith([task5]); - resolve(); - }); - }, 50); - }, 50); - }); - }); - - test('queue upper bound is reset after each flush', async () => { - const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { - return Promise.resolve(tasks.map(incrementAttempts)); - }); - - const bufferMaxDuration = 100; - const bufferedUpdate = createBuffer(bulkUpdate, { - bufferMaxDuration, - bufferMaxOperations: 3, - }); - - const task1 = createTask(); - const task2 = createTask(); - const task3 = createTask(); - const task4 = createTask(); - - return new Promise((resolve) => { - bufferedUpdate(task1); - bufferedUpdate(task2); - - setTimeout(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(1); - expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]); - - bufferedUpdate(task3); - bufferedUpdate(task4); - - setTimeout(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(1); - - setTimeout(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(2); - expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]); - resolve(); - }, bufferMaxDuration / 2); - }, bufferMaxDuration / 2); - }, bufferMaxDuration + 1); - }); - }); - test('handles both resolutions and rejections at individual task level', async (done) => { - const bulkUpdate: jest.Mocked> = jest.fn( - ([task1, task2, task3]) => { - return Promise.resolve([ - incrementAttempts(task1), - errorAttempts(task2), - incrementAttempts(task3), - ]); - } - ); - - const bufferedUpdate = createBuffer(bulkUpdate); - - const task1 = createTask(); - const task2 = createTask(); - const task3 = createTask(); - - return Promise.all([ - expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)), - expect(bufferedUpdate(task2)).rejects.toMatchObject( - mapErr( - (err: OperationError) => asErr(err.error), - errorAttempts(task2) - ) - ), - expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)), - ]).then(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(1); - done(); - }); - }); - - test('handles bulkUpdate failure', async (done) => { - const bulkUpdate: jest.Mocked> = jest.fn(() => { - return Promise.reject(new Error('bulkUpdate is an illusion')); - }); - - const bufferedUpdate = createBuffer(bulkUpdate); - - const task1 = createTask(); - const task2 = createTask(); - const task3 = createTask(); - - return Promise.all([ - expect(bufferedUpdate(task1)).rejects.toMatchInlineSnapshot(` - Object { - "error": [Error: bulkUpdate is an illusion], - "tag": "err", - } - `), - expect(bufferedUpdate(task2)).rejects.toMatchInlineSnapshot(` - Object { - "error": [Error: bulkUpdate is an illusion], - "tag": "err", - } - `), - expect(bufferedUpdate(task3)).rejects.toMatchInlineSnapshot(` - Object { - "error": [Error: bulkUpdate is an illusion], - "tag": "err", - } - `), - ]).then(() => { - expect(bulkUpdate).toHaveBeenCalledTimes(1); - done(); - }); - }); - }); -}); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts deleted file mode 100644 index fca7ce02e0cd7..0000000000000 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { keyBy, map } from 'lodash'; -import { Subject, race, from } from 'rxjs'; -import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators'; -import { either, Result, asOk, asErr, Ok, Err } from './result_type'; - -export interface BufferOptions { - bufferMaxDuration?: number; - bufferMaxOperations?: number; -} - -export interface Entity { - id: string; -} - -export interface OperationError { - entity: Input; - error: ErrorOutput; -} - -export type OperationResult = Result< - Output, - OperationError ->; - -export type Operation = ( - entity: Input -) => Promise>; - -export type BulkOperation = ( - entities: Input[] -) => Promise>>; - -const DONT_FLUSH = false; -const FLUSH = true; - -export function createBuffer( - bulkOperation: BulkOperation, - { bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {} -): Operation { - const flushBuffer = new Subject(); - - const storeUpdateBuffer = new Subject<{ - entity: Input; - onSuccess: (entity: Ok) => void; - onFailure: (error: Err) => void; - }>(); - - storeUpdateBuffer - .pipe( - bufferWhen(() => flushBuffer), - filter((tasks) => tasks.length > 0) - ) - .subscribe((entities) => { - const entityById = keyBy(entities, ({ entity: { id } }) => id); - bulkOperation(map(entities, 'entity')) - .then((results) => { - results.forEach((result) => - either( - result, - (entity) => { - entityById[entity.id].onSuccess(asOk(entity)); - }, - ({ entity, error }: OperationError) => { - entityById[entity.id].onFailure(asErr(error)); - } - ) - ); - }) - .catch((ex) => { - entities.forEach(({ onFailure }) => onFailure(asErr(ex))); - }); - }); - - let countInBuffer = 0; - const flushAndResetCounter = () => { - countInBuffer = 0; - flushBuffer.next(); - }; - storeUpdateBuffer - .pipe( - // complete once the buffer has either filled to `bufferMaxOperations` or - // a `bufferMaxDuration` has passed. Default to `bufferMaxDuration` being the - // current event loop tick rather than a fixed duration - flatMap(() => { - return ++countInBuffer === 1 - ? race([ - // the race is started in response to the first operation into the buffer - // so we flush once the remaining operations come in (which is `bufferMaxOperations - 1`) - storeUpdateBuffer.pipe(bufferCount(bufferMaxOperations - 1)), - bufferMaxDuration - ? // if theres a max duration, flush buffer based on that - from(resolveIn(bufferMaxDuration)) - : // ensure we flush by the end of the "current" event loop tick - from(resolveImmediate()), - ]).pipe(first(), mapTo(FLUSH)) - : from([DONT_FLUSH]); - }), - filter((shouldFlush) => shouldFlush) - ) - .subscribe({ - next: flushAndResetCounter, - // As this stream is just trying to decide when to flush - // there's no data to lose, so in the case that an error - // is thrown, lets just flush - error: flushAndResetCounter, - }); - - return async function (entity: Input) { - return new Promise((resolve, reject) => { - storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject }); - }); - }; -} - -function resolveImmediate() { - return new Promise(setImmediate); -} - -function resolveIn(ms: number) { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); -} diff --git a/x-pack/plugins/task_manager/server/lib/result_type.test.ts b/x-pack/plugins/task_manager/server/lib/result_type.test.ts deleted file mode 100644 index 480a732f1f617..0000000000000 --- a/x-pack/plugins/task_manager/server/lib/result_type.test.ts +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import uuid from 'uuid'; -import { unwrapPromise, asOk, asErr } from './result_type'; - -describe(`Result`, () => { - describe(`unwrapPromise`, () => { - test(`unwraps OKs from the result`, async () => { - const uniqueId = uuid.v4(); - expect(await unwrapPromise(Promise.resolve(asOk(uniqueId)))).toEqual(uniqueId); - }); - - test(`unwraps Errs from the result`, async () => { - const uniqueId = uuid.v4(); - expect(unwrapPromise(Promise.resolve(asErr(uniqueId)))).rejects.toEqual(uniqueId); - }); - - test(`unwraps Errs from the result when promise rejects`, async () => { - const uniqueId = uuid.v4(); - expect(unwrapPromise(Promise.reject(asErr(uniqueId)))).rejects.toEqual(uniqueId); - }); - }); -}); diff --git a/x-pack/plugins/task_manager/server/lib/result_type.ts b/x-pack/plugins/task_manager/server/lib/result_type.ts index d21c17d3bb5b3..edf4d84dd226d 100644 --- a/x-pack/plugins/task_manager/server/lib/result_type.ts +++ b/x-pack/plugins/task_manager/server/lib/result_type.ts @@ -47,25 +47,6 @@ export async function promiseResult(future: Promise): Promise(future: Promise>): Promise { - return future - .catch( - // catch rejection as we expect the result of the rejected promise - // to be wrapped in a Result - sadly there's no way to "Type" this - // requirment in Typescript as Promises do not enfore a type on their - // rejection - // The `then` will then unwrap the Result from around `ex` for us - (ex: Err) => ex - ) - .then((result: Result) => - map( - result, - (value: T) => Promise.resolve(value), - (err: E) => Promise.reject(err) - ) - ); -} - export function unwrap(result: Result): T | E { return isOk(result) ? result.value : result.error; } diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 35ca439bb9130..23cb33cfac6c2 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -57,7 +57,6 @@ import { } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; -import { BufferedTaskStore } from './buffered_task_store'; const VERSION_CONFLICT_STATUS = 409; @@ -91,10 +90,7 @@ export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRun */ export class TaskManager { private definitions: TaskDictionary = {}; - private store: TaskStore; - private bufferedStore: BufferedTaskStore; - private logger: Logger; private pool: TaskPool; // all task related events (task claimed, task marked as running, etc.) are emitted through events$ @@ -143,10 +139,6 @@ export class TaskManager { // pipe store events into the TaskManager's event stream this.store.events.subscribe((event) => this.events$.next(event)); - this.bufferedStore = new BufferedTaskStore(this.store, { - bufferMaxOperations: opts.config.max_workers, - }); - this.pool = new TaskPool({ logger: this.logger, maxWorkers: opts.config.max_workers, @@ -173,7 +165,7 @@ export class TaskManager { return new TaskManagerRunner({ logger: this.logger, instance, - store: this.bufferedStore, + store: this.store, definitions: this.definitions, beforeRun: this.middleware.beforeRun, beforeMarkRunning: this.middleware.beforeMarkRunning, diff --git a/x-pack/plugins/task_manager/server/task_runner.ts b/x-pack/plugins/task_manager/server/task_runner.ts index ebf13fac2f311..4c690a5675f61 100644 --- a/x-pack/plugins/task_manager/server/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_runner.ts @@ -49,7 +49,7 @@ export interface TaskRunner { toString: () => string; } -export interface Updatable { +interface Updatable { readonly maxAttempts: number; update(doc: ConcreteTaskInstance): Promise; remove(id: string): Promise; diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts deleted file mode 100644 index 86db695bc5e2c..0000000000000 --- a/x-pack/plugins/task_manager/server/task_store.mock.ts +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { TaskStore } from './task_store'; - -interface TaskStoreOptions { - maxAttempts?: number; - index?: string; - taskManagerId?: string; -} -export const taskStoreMock = { - create({ maxAttempts = 0, index = '', taskManagerId = '' }: TaskStoreOptions) { - const mocked = ({ - update: jest.fn(), - remove: jest.fn(), - schedule: jest.fn(), - claimAvailableTasks: jest.fn(), - bulkUpdate: jest.fn(), - get: jest.fn(), - getLifecycle: jest.fn(), - fetch: jest.fn(), - maxAttempts, - index, - taskManagerId, - } as unknown) as jest.Mocked; - return mocked; - }, -}; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 7ec3db5c99aa7..4a691e17011e8 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -17,10 +17,9 @@ import { SavedObjectsSerializer, SavedObjectsRawDoc, ISavedObjectsRepository, - SavedObjectsUpdateResponse, } from '../../../../src/core/server'; -import { asOk, asErr, Result } from './lib/result_type'; +import { asOk, asErr } from './lib/result_type'; import { ConcreteTaskInstance, @@ -99,10 +98,10 @@ export interface ClaimOwnershipResult { docs: ConcreteTaskInstance[]; } -export type BulkUpdateResult = Result< - ConcreteTaskInstance, - { entity: ConcreteTaskInstance; error: Error } ->; +export interface BulkUpdateTaskFailureResult { + error: NonNullable; + task: ConcreteTaskInstance; +} export interface UpdateByQueryResult { updated: number; @@ -333,54 +332,6 @@ export class TaskStore { ); } - /** - * Updates the specified docs in the index, returning the docs - * with their versions up to date. - * - * @param {Array} docs - * @returns {Promise>} - */ - public async bulkUpdate(docs: ConcreteTaskInstance[]): Promise { - const attributesByDocId = docs.reduce((attrsById, doc) => { - attrsById.set(doc.id, taskInstanceToAttributes(doc)); - return attrsById; - }, new Map()); - - const updatedSavedObjects: Array = ( - await this.savedObjectsRepository.bulkUpdate( - docs.map((doc) => ({ - type: 'task', - id: doc.id, - options: { version: doc.version }, - attributes: attributesByDocId.get(doc.id)!, - })), - { - refresh: false, - } - ) - ).saved_objects; - - return updatedSavedObjects.map((updatedSavedObject, index) => - isSavedObjectsUpdateResponse(updatedSavedObject) - ? asOk( - savedObjectToConcreteTaskInstance({ - ...updatedSavedObject, - attributes: defaults( - updatedSavedObject.attributes, - attributesByDocId.get(updatedSavedObject.id)! - ), - }) - ) - : asErr({ - // The SavedObjectsRepository maintains the order of the docs - // so we can rely on the index in the `docs` to match an error - // on the same index in the `bulkUpdate` result - entity: docs[index], - error: updatedSavedObject, - }) - ); - } - /** * Removes the specified task from the index. * @@ -517,9 +468,3 @@ function ensureQueryOnlyReturnsTaskObjects(opts: SearchOpts): SearchOpts { query, }; } - -function isSavedObjectsUpdateResponse( - result: SavedObjectsUpdateResponse | Error -): result is SavedObjectsUpdateResponse { - return result && typeof (result as SavedObjectsUpdateResponse).id === 'string'; -}