Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Batches the update operations in Task Manager #71470

Merged
merged 10 commits into from
Jul 21, 2020
82 changes: 82 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { taskStoreMock } from './task_store.mock';
import { BufferedTaskStore } from './buffered_task_store';
import { asErr, asOk } from './lib/result_type';
import { TaskStatus } from './task';

describe('Buffered Task Store', () => {
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
taskStore.bulkUpdate.mockResolvedValue([]);
const bufferedStore = new BufferedTaskStore(taskStore, {});

expect(bufferedStore.maxAttempts).toEqual(10);

bufferedStore.remove('1');
expect(taskStore.remove).toHaveBeenCalledWith('1');
});

describe('update', () => {
test("proxies the TaskStore's `bulkUpdate`", async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore, {});

const task = mockTask();

taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);

expect(await bufferedStore.update(task)).toMatchObject(task);
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
});

test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore, {});

const tasks = [mockTask(), mockTask(), mockTask()];

taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asOk(tasks[2]),
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(await results[2]).toMatchObject(tasks[2]);
});
});
});

function mockTask() {
return {
id: `task_${uuid.v4()}`,
attempts: 0,
schedule: undefined,
params: { hello: 'world' },
retryAt: null,
runAt: new Date(),
scheduledAt: new Date(),
scope: undefined,
startedAt: null,
state: { foo: 'bar' },
status: TaskStatus.Idle,
taskType: 'report',
user: undefined,
version: '123',
ownerId: '123',
};
}
39 changes: 39 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskStore } from './task_store';
import { ConcreteTaskInstance } from './task';
import { Updatable } from './task_runner';
import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer';
import { unwrapPromise } from './lib/result_type';

// by default allow updates to be buffered for up to 50ms
const DEFAULT_BUFFER_MAX_DURATION = 50;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think there would be value to have this as a kibana.yml config with a default of that number?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it, but it feel very internal and I think for now we can keep it as a constant.
We can expose it later if we feel it's worth it, but I think the poll_interval and max_workers is enough cognitive load for our users. :)


export class BufferedTaskStore implements Updatable {
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(
(docs) => taskStore.bulkUpdate(docs),
{
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
...options,
}
);
}

public get maxAttempts(): number {
return this.taskStore.maxAttempts;
}

public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
return unwrapPromise(this.bufferedUpdate(doc));
}

public async remove(id: string): Promise<void> {
return this.taskStore.remove(id);
}
}
Loading