Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Task Manager] Handles case where buffer receives multiple entities with the same ID (#74943) #75150

Merged
merged 1 commit into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,39 @@ describe('Buffered Task Store', () => {
);
expect(await results[2]).toMatchObject(tasks[2]);
});

test('handles multiple items with the same id', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore, {});

const duplicateIdTask = mockTask();
const tasks = [
duplicateIdTask,
mockTask(),
mockTask(),
{ ...mockTask(), id: duplicateIdTask.id },
];

taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asOk(tasks[2]),
asOk(tasks[3]),
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[3]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(await results[2]).toMatchObject(tasks[2]);
expect(await results[3]).toMatchObject(tasks[3]);
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';
import { mockLogger } from '../test_utils';

interface TaskInstance extends Entity {
attempts: number;
Expand Down Expand Up @@ -227,5 +228,38 @@ describe('Bulk Operation Buffer', () => {
done();
});
});

test('logs unknown bulk operation results', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),
errorAttempts(createTask()),
incrementAttempts(createTask()),
]);
}
);

const logger = mockLogger();

const bufferedUpdate = createBuffer(bulkUpdate, { logger });

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();

return Promise.all([
expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
expect(bufferedUpdate(task2)).rejects.toMatchObject(
asErr(new Error(`Unhandled buffered operation for entity: ${task2.id}`))
),
expect(bufferedUpdate(task3)).rejects.toMatchObject(
asErr(new Error(`Unhandled buffered operation for entity: ${task3.id}`))
),
]).then(() => {
expect(logger.warn).toHaveBeenCalledTimes(2);
done();
});
});
});
});
64 changes: 55 additions & 9 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { keyBy, map } from 'lodash';
import { map } from 'lodash';
import { Subject, race, from } from 'rxjs';
import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';
import { Logger } from '../types';

export interface BufferOptions {
bufferMaxDuration?: number;
bufferMaxOperations?: number;
logger?: Logger;
}

export interface Entity {
Expand Down Expand Up @@ -41,39 +43,76 @@ const FLUSH = true;

export function createBuffer<Input extends Entity, ErrorOutput, Output extends Entity = Input>(
bulkOperation: BulkOperation<Input, ErrorOutput, Output>,
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {}
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {}
): Operation<Input, ErrorOutput, Output> {
const flushBuffer = new Subject<void>();

const storeUpdateBuffer = new Subject<{
entity: Input;
onSuccess: (entity: Ok<Output>) => void;
onFailure: (error: Err<ErrorOutput>) => void;
onFailure: (error: Err<ErrorOutput | Error>) => void;
}>();

storeUpdateBuffer
.pipe(
bufferWhen(() => flushBuffer),
filter((tasks) => tasks.length > 0)
)
.subscribe((entities) => {
const entityById = keyBy(entities, ({ entity: { id } }) => id);
bulkOperation(map(entities, 'entity'))
.subscribe((bufferedEntities) => {
bulkOperation(map(bufferedEntities, 'entity'))
.then((results) => {
results.forEach((result) =>
either(
result,
(entity) => {
entityById[entity.id].onSuccess(asOk(entity));
either(
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
({ onSuccess }) => {
onSuccess(asOk(entity));
},
() => {
if (logger) {
logger.warn(
`Unhandled successful Bulk Operation result: ${
entity?.id ? entity.id : entity
}`
);
}
}
);
},
({ entity, error }: OperationError<Input, ErrorOutput>) => {
entityById[entity.id].onFailure(asErr(error));
either(
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
({ onFailure }) => {
onFailure(asErr(error));
},
() => {
if (logger) {
logger.warn(
`Unhandled failed Bulk Operation result: ${entity?.id ? entity.id : entity}`
);
}
}
);
}
)
);

// if any `bufferedEntities` remain in the array then there was no result we could map to them in the bulkOperation
// call their failure handler to avoid hanging the promise returned to the call site
bufferedEntities.forEach((unhandledBufferedEntity) => {
unhandledBufferedEntity.onFailure(
asErr(
new Error(
`Unhandled buffered operation for entity: ${unhandledBufferedEntity.entity.id}`
)
)
);
});
})
.catch((ex) => {
entities.forEach(({ onFailure }) => onFailure(asErr(ex)));
bufferedEntities.forEach(({ onFailure }) => onFailure(asErr(ex)));
});
});

Expand Down Expand Up @@ -120,3 +159,10 @@ function resolveIn(ms: number) {
setTimeout(resolve, ms);
});
}

function pullFirstWhere<T>(collection: T[], predicate: (entity: T) => boolean): Result<T, void> {
const indexOfFirstEntity = collection.findIndex(predicate);
return indexOfFirstEntity >= 0
? asOk(collection.splice(indexOfFirstEntity, 1)[0])
: asErr(undefined);
}
5 changes: 3 additions & 2 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export class TaskManager {

this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: opts.config.max_workers,
logger: this.logger,
});

this.pool = new TaskPool({
Expand Down Expand Up @@ -283,7 +284,7 @@ export class TaskManager {
*/
public async schedule(
taskInstance: TaskInstanceWithDeprecatedFields,
options?: object
options?: Record<string, unknown>
): Promise<ConcreteTaskInstance> {
await this.waitUntilStarted();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
Expand Down Expand Up @@ -318,7 +319,7 @@ export class TaskManager {
*/
public async ensureScheduled(
taskInstance: TaskInstanceWithId,
options?: object
options?: Record<string, unknown>
): Promise<TaskInstanceWithId> {
try {
return await this.schedule(taskInstance, options);
Expand Down