From 02d92cdda891e523f0dd8529ab0e3d92078cd6e2 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Tue, 18 Aug 2020 17:33:31 +0100 Subject: [PATCH] [Task Manager] Handles case where buffer receives multiple entities with the same ID (#74943) (#75150) Handles the case where two operations for the same entity make it into a single batched bulk operation and avoid the clashing ID issue that could cause the poller to hang and stop poling for work). --- .../server/buffered_task_store.test.ts | 33 ++++++++++ .../server/lib/bulk_operation_buffer.test.ts | 34 ++++++++++ .../server/lib/bulk_operation_buffer.ts | 64 ++++++++++++++++--- .../task_manager/server/task_manager.ts | 5 +- 4 files changed, 125 insertions(+), 11 deletions(-) diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts index 8e18405c79ed2..8ddb9f81c2a8f 100644 --- a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts +++ b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts @@ -58,6 +58,39 @@ describe('Buffered Task Store', () => { ); expect(await results[2]).toMatchObject(tasks[2]); }); + + test('handles multiple items with the same id', async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + const bufferedStore = new BufferedTaskStore(taskStore, {}); + + const duplicateIdTask = mockTask(); + const tasks = [ + duplicateIdTask, + mockTask(), + mockTask(), + { ...mockTask(), id: duplicateIdTask.id }, + ]; + + taskStore.bulkUpdate.mockResolvedValueOnce([ + asOk(tasks[0]), + asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }), + asOk(tasks[2]), + asOk(tasks[3]), + ]); + + const results = [ + bufferedStore.update(tasks[0]), + bufferedStore.update(tasks[1]), + bufferedStore.update(tasks[2]), + bufferedStore.update(tasks[3]), + ]; + expect(await results[0]).toMatchObject(tasks[0]); + expect(results[1]).rejects.toMatchInlineSnapshot( + `[Error: Oh no, something went terribly wrong]` + ); + expect(await results[2]).toMatchObject(tasks[2]); + expect(await results[3]).toMatchObject(tasks[3]); + }); }); }); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts index f32a755515a95..25abd92b32a26 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts @@ -6,6 +6,7 @@ import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer'; import { mapErr, asOk, asErr, Ok, Err } from './result_type'; +import { mockLogger } from '../test_utils'; interface TaskInstance extends Entity { attempts: number; @@ -227,5 +228,38 @@ describe('Bulk Operation Buffer', () => { done(); }); }); + + test('logs unknown bulk operation results', async (done) => { + const bulkUpdate: jest.Mocked> = jest.fn( + ([task1, task2, task3]) => { + return Promise.resolve([ + incrementAttempts(task1), + errorAttempts(createTask()), + incrementAttempts(createTask()), + ]); + } + ); + + const logger = mockLogger(); + + const bufferedUpdate = createBuffer(bulkUpdate, { logger }); + + const task1 = createTask(); + const task2 = createTask(); + const task3 = createTask(); + + return Promise.all([ + expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)), + expect(bufferedUpdate(task2)).rejects.toMatchObject( + asErr(new Error(`Unhandled buffered operation for entity: ${task2.id}`)) + ), + expect(bufferedUpdate(task3)).rejects.toMatchObject( + asErr(new Error(`Unhandled buffered operation for entity: ${task3.id}`)) + ), + ]).then(() => { + expect(logger.warn).toHaveBeenCalledTimes(2); + done(); + }); + }); }); }); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts index c8e5b837fa36c..57a14c2f8a56b 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts @@ -4,14 +4,16 @@ * you may not use this file except in compliance with the Elastic License. */ -import { keyBy, map } from 'lodash'; +import { map } from 'lodash'; import { Subject, race, from } from 'rxjs'; import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators'; import { either, Result, asOk, asErr, Ok, Err } from './result_type'; +import { Logger } from '../types'; export interface BufferOptions { bufferMaxDuration?: number; bufferMaxOperations?: number; + logger?: Logger; } export interface Entity { @@ -41,14 +43,14 @@ const FLUSH = true; export function createBuffer( bulkOperation: BulkOperation, - { bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {} + { bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {} ): Operation { const flushBuffer = new Subject(); const storeUpdateBuffer = new Subject<{ entity: Input; onSuccess: (entity: Ok) => void; - onFailure: (error: Err) => void; + onFailure: (error: Err) => void; }>(); storeUpdateBuffer @@ -56,24 +58,61 @@ export function createBuffer flushBuffer), filter((tasks) => tasks.length > 0) ) - .subscribe((entities) => { - const entityById = keyBy(entities, ({ entity: { id } }) => id); - bulkOperation(map(entities, 'entity')) + .subscribe((bufferedEntities) => { + bulkOperation(map(bufferedEntities, 'entity')) .then((results) => { results.forEach((result) => either( result, (entity) => { - entityById[entity.id].onSuccess(asOk(entity)); + either( + pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id), + ({ onSuccess }) => { + onSuccess(asOk(entity)); + }, + () => { + if (logger) { + logger.warn( + `Unhandled successful Bulk Operation result: ${ + entity?.id ? entity.id : entity + }` + ); + } + } + ); }, ({ entity, error }: OperationError) => { - entityById[entity.id].onFailure(asErr(error)); + either( + pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id), + ({ onFailure }) => { + onFailure(asErr(error)); + }, + () => { + if (logger) { + logger.warn( + `Unhandled failed Bulk Operation result: ${entity?.id ? entity.id : entity}` + ); + } + } + ); } ) ); + + // if any `bufferedEntities` remain in the array then there was no result we could map to them in the bulkOperation + // call their failure handler to avoid hanging the promise returned to the call site + bufferedEntities.forEach((unhandledBufferedEntity) => { + unhandledBufferedEntity.onFailure( + asErr( + new Error( + `Unhandled buffered operation for entity: ${unhandledBufferedEntity.entity.id}` + ) + ) + ); + }); }) .catch((ex) => { - entities.forEach(({ onFailure }) => onFailure(asErr(ex))); + bufferedEntities.forEach(({ onFailure }) => onFailure(asErr(ex))); }); }); @@ -120,3 +159,10 @@ function resolveIn(ms: number) { setTimeout(resolve, ms); }); } + +function pullFirstWhere(collection: T[], predicate: (entity: T) => boolean): Result { + const indexOfFirstEntity = collection.findIndex(predicate); + return indexOfFirstEntity >= 0 + ? asOk(collection.splice(indexOfFirstEntity, 1)[0]) + : asErr(undefined); +} diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 7165fd28678c1..9c194b3fb9dd2 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -146,6 +146,7 @@ export class TaskManager { this.bufferedStore = new BufferedTaskStore(this.store, { bufferMaxOperations: opts.config.max_workers, + logger: this.logger, }); this.pool = new TaskPool({ @@ -283,7 +284,7 @@ export class TaskManager { */ public async schedule( taskInstance: TaskInstanceWithDeprecatedFields, - options?: object + options?: Record ): Promise { await this.waitUntilStarted(); const { taskInstance: modifiedTask } = await this.middleware.beforeSave({ @@ -318,7 +319,7 @@ export class TaskManager { */ public async ensureScheduled( taskInstance: TaskInstanceWithId, - options?: object + options?: Record ): Promise { try { return await this.schedule(taskInstance, options);