Skip to content

Commit

Permalink
Only fetching TaskManager's available tasks once per call to fillPool (
Browse files Browse the repository at this point in the history
…#61991) (#63369)

Co-authored-by: Elastic Machine <[email protected]>

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
kobelb and elasticmachine authored Apr 13, 2020
1 parent 857e08c commit 2ef0c71
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 29 deletions.
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { fillPool } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';

describe('fillPool', () => {
test('stops filling when there are no more tasks in the store', async () => {
test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
Expand All @@ -22,7 +22,7 @@ describe('fillPool', () => {

await fillPool(fetchAvailableTasks, converter, run);

expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]);
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
});

test('stops filling when the pool has no more capacity', async () => {
Expand Down
49 changes: 22 additions & 27 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
*/

import { performance } from 'perf_hooks';
import { after } from 'lodash';
import { TaskPoolRunResult } from '../task_pool';

export enum FillPoolResult {
NoTasksClaimed = 'NoTasksClaimed',
RanOutOfCapacity = 'RanOutOfCapacity',
PoolFilled = 'PoolFilled',
}

type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
Expand All @@ -35,33 +35,28 @@ export async function fillPool<TRecord, TRunner>(
run: BatchRun<TRunner>
): Promise<FillPoolResult> {
performance.mark('fillPool.start');
const markClaimedTasksOnRerunCycle = after(2, () =>
performance.mark('fillPool.claimedOnRerunCycle')
);
while (true) {
const instances = await fetchAvailableTasks();
const instances = await fetchAvailableTasks();

if (!instances.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}
markClaimedTasksOnRerunCycle();
const tasks = instances.map(converter);
if (!instances.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}
const tasks = instances.map(converter);

if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
}
performance.mark('fillPool.cycle');
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
}
performance.mark('fillPool.cycle');
return FillPoolResult.PoolFilled;
}

0 comments on commit 2ef0c71

Please sign in to comment.