Skip to content

Commit

Permalink
Rename task claimers
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecote committed Aug 14, 2024
1 parent 0e18fb3 commit 706e033
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 57 deletions.
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { configSchema, CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from './config';
import { configSchema, CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET } from './config';

describe('config validation', () => {
test('task manager defaults', () => {
Expand Down Expand Up @@ -244,7 +244,7 @@ describe('config validation', () => {
});

test('default claim strategy defaults poll interval to 3000ms', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_DEFAULT });
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_UPDATE_BY_QUERY });
expect(result.poll_interval).toEqual(3000);
});

Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
// At the default poll interval of 3sec, this averages over the last 15sec.
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;

export const CLAIM_STRATEGY_DEFAULT = 'default';
export const CLAIM_STRATEGY_MGET = 'unsafe_mget';
export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';

export const taskExecutionFailureThresholdSchema = schema.object(
{
Expand Down Expand Up @@ -168,7 +168,7 @@ export const configSchema = schema.object(
max: 100,
min: 1,
}),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_DEFAULT }),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_UPDATE_BY_QUERY }),
request_timeouts: requestTimeoutsConfig,
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ describe('managed configuration', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;

taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');

// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
Expand Down Expand Up @@ -136,15 +136,15 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];

expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');

mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
Expand Down Expand Up @@ -255,14 +255,14 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;

taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');

// task doc should still exist and be running
const task = await kibanaServer.coreStart.elasticsearch.client.asInternalUser.get<{
Expand Down Expand Up @@ -294,15 +294,15 @@ describe('switch task claiming strategies', () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
claim_strategy: 'mget',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];

expect(taskClaimingOpts.strategy).toBe('unsafe_mget');
expect(taskClaimingOpts.strategy).toBe('mget');

mockTaskTypeRunFn.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 2000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
Expand Down Expand Up @@ -148,7 +148,7 @@ describe('createManagedConfiguration()', () => {
describe('capacity configuration', () => {
function setupScenario(
startingCapacity: number,
claimStrategy: string = CLAIM_STRATEGY_DEFAULT
claimStrategy: string = CLAIM_STRATEGY_UPDATE_BY_QUERY
) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY } from '../config';
import { getDefaultCapacity } from './get_default_capacity';

describe('getDefaultCapacity', () => {
Expand Down Expand Up @@ -58,7 +58,7 @@ describe('getDefaultCapacity', () => {
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: false,
claimStrategy: CLAIM_STRATEGY_DEFAULT,
claimStrategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
})
).toBe(DEFAULT_CAPACITY);

Expand All @@ -68,7 +68,7 @@ describe('getDefaultCapacity', () => {
isCloud: true,
isServerless: false,
isBackgroundTaskNodeOnly: true,
claimStrategy: CLAIM_STRATEGY_DEFAULT,
claimStrategy: CLAIM_STRATEGY_UPDATE_BY_QUERY,
})
).toBe(DEFAULT_CAPACITY);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { pick, merge } from 'lodash';
import { map, startWith } from 'rxjs';
import { JsonObject } from '@kbn/utility-types';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { CLAIM_STRATEGY_DEFAULT, TaskManagerConfig } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, TaskManagerConfig } from '../config';
import { ManagedConfiguration } from '../lib/create_managed_configuration';
import { getCapacityInCost, getCapacityInWorkers } from '../task_pool';

Expand Down Expand Up @@ -41,7 +41,7 @@ export function createConfigurationAggregator(
): AggregatedStatProvider<ConfigStat> {
return combineLatest([
of(pick(config, ...CONFIG_FIELDS_TO_EXPOSE)),
of({ claim_strategy: config.claim_strategy ?? CLAIM_STRATEGY_DEFAULT }),
of({ claim_strategy: config.claim_strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY }),
managedConfig.pollIntervalConfiguration$.pipe(
startWith(config.poll_interval),
map<number, Pick<TaskManagerConfig, 'poll_interval'>>((pollInterval) => ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe('TaskPollingLifecycle', () => {
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
});

test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_DEFAULT', () => {
test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_UPDATE_BY_QUERY', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const capacity$ = new Subject<number>();

Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server';

import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig, CLAIM_STRATEGY_DEFAULT } from './config';
import { TaskManagerConfig, CLAIM_STRATEGY_UPDATE_BY_QUERY } from './config';

import {
TaskMarkRunning,
Expand Down Expand Up @@ -155,7 +155,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
const { poll_interval: pollInterval, claim_strategy: claimStrategy } = config;

let pollIntervalDelay$: Observable<number> | undefined;
if (claimStrategy === CLAIM_STRATEGY_DEFAULT) {
if (claimStrategy === CLAIM_STRATEGY_UPDATE_BY_QUERY) {
pollIntervalDelay$ = delayOnClaimConflicts(
capacityConfiguration$,
pollIntervalConfiguration$,
Expand Down
14 changes: 7 additions & 7 deletions x-pack/plugins/task_manager/server/task_claimers/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { getTaskClaimer } from '.';
import { mockLogger } from '../test_utils';
import { claimAvailableTasksDefault } from './strategy_default';
import { claimAvailableTasksUpdateByQuery } from './strategy_update_by_query';
import { claimAvailableTasksMget } from './strategy_mget';

const logger = mockLogger();
Expand All @@ -16,23 +16,23 @@ describe('task_claimers/index', () => {
beforeEach(() => jest.resetAllMocks());

describe('getTaskClaimer()', () => {
test('returns expected result for default', () => {
const taskClaimer = getTaskClaimer(logger, 'default');
expect(taskClaimer).toBe(claimAvailableTasksDefault);
test('returns expected result for update_by_query', () => {
const taskClaimer = getTaskClaimer(logger, 'update_by_query');
expect(taskClaimer).toBe(claimAvailableTasksUpdateByQuery);
expect(logger.warn).not.toHaveBeenCalled();
});

test('returns expected result for mget', () => {
const taskClaimer = getTaskClaimer(logger, 'unsafe_mget');
const taskClaimer = getTaskClaimer(logger, 'mget');
expect(taskClaimer).toBe(claimAvailableTasksMget);
expect(logger.warn).not.toHaveBeenCalled();
});

test('logs a warning for unsupported parameter', () => {
const taskClaimer = getTaskClaimer(logger, 'not-supported');
expect(taskClaimer).toBe(claimAvailableTasksDefault);
expect(taskClaimer).toBe(claimAvailableTasksUpdateByQuery);
expect(logger.warn).toHaveBeenCalledWith(
'Unknown task claiming strategy "not-supported", falling back to default'
'Unknown task claiming strategy "not-supported", falling back to update_by_query'
);
});
});
Expand Down
12 changes: 6 additions & 6 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import { TaskClaim, TaskTiming } from '../task_events';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskClaimingBatches } from '../queries/task_claiming';
import { ConcreteTaskInstance } from '../task';
import { claimAvailableTasksDefault } from './strategy_default';
import { claimAvailableTasksUpdateByQuery } from './strategy_update_by_query';
import { claimAvailableTasksMget } from './strategy_mget';
import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from '../config';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET } from '../config';
import { TaskPartitioner } from '../lib/task_partitioner';

export interface TaskClaimerOpts {
Expand Down Expand Up @@ -49,17 +49,17 @@ let WarnedOnInvalidClaimer = false;

export function getTaskClaimer(logger: Logger, strategy: string): TaskClaimerFn {
switch (strategy) {
case CLAIM_STRATEGY_DEFAULT:
return claimAvailableTasksDefault;
case CLAIM_STRATEGY_UPDATE_BY_QUERY:
return claimAvailableTasksUpdateByQuery;
case CLAIM_STRATEGY_MGET:
return claimAvailableTasksMget;
}

if (!WarnedOnInvalidClaimer) {
WarnedOnInvalidClaimer = true;
logger.warn(`Unknown task claiming strategy "${strategy}", falling back to default`);
logger.warn(`Unknown task claiming strategy "${strategy}", falling back to update_by_query`);
}
return claimAvailableTasksDefault;
return claimAvailableTasksUpdateByQuery;
}

export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ if (doc['task.runAt'].size()!=0) {

const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: 'default',
strategy: 'update_by_query',
definitions,
excludedTaskTypes: [],
unusedTypes: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ interface OwnershipClaimingOpts {
taskMaxAttempts: Record<string, number>;
}

export function claimAvailableTasksDefault(
export function claimAvailableTasksUpdateByQuery(
opts: TaskClaimerOpts
): Observable<ClaimOwnershipResult> {
const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts;
Expand Down
Loading

0 comments on commit 706e033

Please sign in to comment.