Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Batches the update operations in Task Manager #71470

Merged
merged 10 commits into from
Jul 21, 2020
82 changes: 82 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskStoreMock } from './task_store.mock';
import { BufferedTaskStore } from './buffered_task_store';
import { asErr, asOk } from './lib/result_type';
import { TaskStatus } from './task';

describe('Buffered Task Store', () => {
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
taskStore.bulkUpdate.mockResolvedValue([]);
const bufferedStore = new BufferedTaskStore(taskStore);

expect(bufferedStore.maxAttempts).toEqual(10);

bufferedStore.remove('1');
expect(taskStore.remove).toHaveBeenCalledWith('1');
});

describe('update', () => {
test("proxies the TaskStore's `bulkUpdate`", async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore);

const task = mockTask();

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore);

const tasks = [mockTask(), mockTask(), mockTask()];

taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asOk(tasks[2]),
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(await results[2]).toMatchObject(tasks[2]);
});
});
});

function mockTask() {
return {
id: `task_${uuid.v4()}`,
attempts: 0,
schedule: undefined,
params: { hello: 'world' },
retryAt: null,
runAt: new Date(),
scheduledAt: new Date(),
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
status: TaskStatus.Idle,
taskType: 'report',
user: undefined,
version: '123',
ownerId: '123',
};
}
32 changes: 32 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { createBuffer, Operation } from './lib/bulk_operation_buffer';
import { unwrapPromise } from './lib/result_type';

export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
constructor(private readonly taskStore: TaskStore) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>((docs) =>
taskStore.bulkUpdate(docs)
);
}

public get maxAttempts(): number {
return this.taskStore.maxAttempts;
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
}

public async remove(id: string): Promise<void> {
return this.taskStore.remove(id);
}
}
167 changes: 167 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';

interface TaskInstance extends Entity {
attempts: number;
}

const createTask = (function (): () => TaskInstance {
let counter = 0;
return () => ({
id: `task ${++counter}`,
attempts: 1,
});
})();

function incrementAttempts(task: TaskInstance): Ok<TaskInstance> {
return asOk({
...task,
attempts: task.attempts + 1,
});
}

function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Error>> {
return asErr({
entity: incrementAttempts(task).value,
error: { name: '', message: 'Oh no, something went terribly wrong', statusCode: 500 },
});
}

describe('Bulk Operation Buffer', () => {
describe('createBuffer()', () => {
test('batches up multiple Operation calls', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2]) => {
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
}
);

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();

expect(await Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)])).toMatchObject([
incrementAttempts(task1),
incrementAttempts(task2),
]);
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
});

test('batch updates are executed at most by the next Event Loop tick', async () => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
return Promise.resolve(tasks.map(incrementAttempts));
});

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();
const task4 = createTask();
const task5 = createTask();
const task6 = createTask();

return new Promise((resolve) => {
Promise.all([bufferedUpdate(task1), bufferedUpdate(task2)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
expect(bulkUpdate).toHaveBeenCalledWith([task1, task2]);
expect(bulkUpdate).not.toHaveBeenCalledWith([task3, task4]);
});

setTimeout(() => {
// on next tick
setTimeout(() => {
// on next tick
expect(bulkUpdate).toHaveBeenCalledTimes(2);
Promise.all([bufferedUpdate(task5), bufferedUpdate(task6)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(3);
expect(bulkUpdate).toHaveBeenCalledWith([task5, task6]);
resolve();
});
}, 0);

expect(bulkUpdate).toHaveBeenCalledTimes(1);
Promise.all([bufferedUpdate(task3), bufferedUpdate(task4)]).then((_) => {
expect(bulkUpdate).toHaveBeenCalledTimes(2);
expect(bulkUpdate).toHaveBeenCalledWith([task3, task4]);
});
}, 0);
});
});

test('handles both resolutions and rejections at individual task level', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),
errorAttempts(task2),
incrementAttempts(task3),
]);
}
);

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();

return Promise.all([
expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
expect(bufferedUpdate(task2)).rejects.toMatchObject(
mapErr(
(err: OperationError<TaskInstance, Error>) => asErr(err.error),
errorAttempts(task2)
)
),
expect(bufferedUpdate(task3)).resolves.toMatchObject(incrementAttempts(task3)),
]).then(() => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
done();
});
});

test('handles bulkUpdate failure', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(() => {
return Promise.reject(new Error('bulkUpdate is an illusion'));
});

const bufferedUpdate = createBuffer(bulkUpdate);

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();

return Promise.all([
expect(bufferedUpdate(task1)).rejects.toMatchInlineSnapshot(`
Object {
"error": [Error: bulkUpdate is an illusion],
"tag": "err",
}
`),
expect(bufferedUpdate(task2)).rejects.toMatchInlineSnapshot(`
Object {
"error": [Error: bulkUpdate is an illusion],
"tag": "err",
}
`),
expect(bufferedUpdate(task3)).rejects.toMatchInlineSnapshot(`
Object {
"error": [Error: bulkUpdate is an illusion],
"tag": "err",
}
`),
]).then(() => {
expect(bulkUpdate).toHaveBeenCalledTimes(1);
done();
});
});
});
});
77 changes: 77 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { keyBy, map } from 'lodash';
import { Subject } from 'rxjs';
import { bufferWhen, filter } from 'rxjs/operators';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';

export interface Entity {
id: string;
}

export interface OperationError<Input, ErrorOutput> {
entity: Input;
error: ErrorOutput;
}

export type OperationResult<Input, ErrorOutput, Output = Input> = Result<
Output,
OperationError<Input, ErrorOutput>
>;

export type Operation<Input, ErrorOutput, Output = Input> = (
entity: Input
) => Promise<Result<Output, ErrorOutput>>;

export type BulkOperation<Input, ErrorOutput, Output = Input> = (
entities: Input[]
) => Promise<Array<OperationResult<Input, ErrorOutput, Output>>>;

export function createBuffer<Input extends Entity, ErrorOutput, Output extends Entity = Input>(
bulkOperation: BulkOperation<Input, ErrorOutput, Output>
): Operation<Input, ErrorOutput, Output> {
const flushBuffer = new Subject<void>();
const storeUpdateBuffer = new Subject<{
entity: Input;
onSuccess: (entity: Ok<Output>) => void;
onFailure: (error: Err<ErrorOutput>) => void;
}>();

storeUpdateBuffer
.pipe(
bufferWhen(() => flushBuffer),
filter((tasks) => tasks.length > 0)
)
.subscribe((entities) => {
const entityById = keyBy(entities, ({ entity: { id } }) => id);
bulkOperation(map(entities, 'entity'))
.then((results) => {
results.forEach((result) =>
either(
result,
(entity) => {
entityById[entity.id].onSuccess(asOk(entity));
},
({ entity, error }: OperationError<Input, ErrorOutput>) => {
entityById[entity.id].onFailure(asErr(error));
}
)
);
})
.catch((ex) => {
entities.forEach(({ onFailure }) => onFailure(asErr(ex)));
});
});

return async function (entity: Input) {
return new Promise((resolve, reject) => {
// ensure we flush by the end of the "current" event loop tick
setImmediate(() => flushBuffer.next());
gmmorris marked this conversation as resolved.
Show resolved Hide resolved
storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject });
});
};
}
Loading