Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Response Ops][Task Manager] Resource based task scheduling - 2nd attempt #189626

Merged
merged 12 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions x-pack/plugins/alerting/server/rule_type_registry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,39 @@ describe('Create Lifecycle', () => {
});
});

test('injects custom cost for certain rule types', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'recovered', {}> = {
id: 'siem.indicatorRule',
name: 'Test',
actionGroups: [
{
id: 'default',
name: 'Default',
},
],
defaultActionGroupId: 'default',
minimumLicenseRequired: 'basic',
isExportable: true,
executor: jest.fn(),
category: 'test',
producer: 'alerts',
ruleTaskTimeout: '20m',
validate: {
params: { validate: (params) => params },
},
};
const registry = new RuleTypeRegistry(ruleTypeRegistryParams);
registry.register(ruleType);
expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
expect(taskManager.registerTaskDefinitions.mock.calls[0][0]).toMatchObject({
'alerting:siem.indicatorRule': {
timeout: '20m',
title: 'Test',
cost: 10,
},
});
});

test('shallow clones the given rule type', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'recovered', {}> = {
id: 'test',
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/alerting/server/rule_type_registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Logger } from '@kbn/core/server';
import { LicensingPluginSetup } from '@kbn/licensing-plugin/server';
import { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { stateSchemaByVersion } from '@kbn/alerting-state-types';
import { TaskCost } from '@kbn/task-manager-plugin/server/task';
import { TaskRunnerFactory } from './task_runner';
import {
RuleType,
Expand All @@ -40,6 +41,9 @@ import { AlertsService } from './alerts_service/alerts_service';
import { getRuleTypeIdValidLegacyConsumers } from './rule_type_registry_deprecated_consumers';
import { AlertingConfig } from './config';

const RULE_TYPES_WITH_CUSTOM_COST: Record<string, TaskCost> = {
'siem.indicatorRule': TaskCost.ExtraLarge,
};
export interface ConstructorOptions {
config: AlertingConfig;
logger: Logger;
Expand Down Expand Up @@ -289,6 +293,8 @@ export class RuleTypeRegistry {
normalizedRuleType as unknown as UntypedNormalizedRuleType
);

const taskCost: TaskCost | undefined = RULE_TYPES_WITH_CUSTOM_COST[ruleType.id];

this.taskManager.registerTaskDefinitions({
[`alerting:${ruleType.id}`]: {
title: ruleType.name,
Expand All @@ -310,6 +316,7 @@ export class RuleTypeRegistry {
spaceId: schema.string(),
consumer: schema.maybe(schema.string()),
}),
...(taskCost ? { cost: taskCost } : {}),
},
});

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/kibana.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"task_manager"
],
"optionalPlugins": [
"cloud",
"usageCollection"
]
}
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/task_manager/server/MONITORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The root `timestamp` is the time in which the summary was exposed (either to the
Follow this step-by-step guide to make sense of the stats: https://www.elastic.co/guide/en/kibana/master/task-manager-troubleshooting.html#task-manager-diagnosing-root-cause

#### The Configuration Section
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `max_workers` which adjust in reaction to changing load on the system.
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `capacity` which adjust in reaction to changing load on the system.

These are "Hot" stats which are updated whenever a change happens in the configuration.

Expand All @@ -69,8 +69,8 @@ The `runtime` tracks Task Manager's performance as it runs, making note of task
These include:
- The time it takes a task to run (p50, p90, p95 & p99, using a configurable running average window, `50` by default)
- The average _drift_ that tasks experience (p50, p90, p95 & p99, using the same configurable running average window as above). Drift tells us how long after a task's scheduled a task typically executes.
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of workers is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of capacity is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of capacity`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The `Success | Retry | Failure ratio` by task type. This is different than the workload stats which tell you what's in the queue, but ca't keep track of retries and of non recurring tasks as they're wiped off the index when completed.

These are "Hot" stats which are updated reactively as Tasks are executed and interacted with.
3 changes: 0 additions & 3 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
Expand Down Expand Up @@ -81,7 +80,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
Expand Down Expand Up @@ -137,7 +135,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
Expand Down
16 changes: 11 additions & 5 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import { schema, TypeOf } from '@kbn/config-schema';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
export const MAX_CAPACITY = 50;
export const MIN_CAPACITY = 5;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
Expand Down Expand Up @@ -64,6 +67,8 @@ const requestTimeoutsConfig = schema.object({
export const configSchema = schema.object(
{
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to add this to kibana-docker file, right? And to the cloud allow-list?

ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
Expand All @@ -81,11 +86,12 @@ export const configSchema = schema.object(
min: 1,
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.number({
defaultValue: DEFAULT_MAX_WORKERS,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
max_workers: schema.maybe(
schema.number({
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
})
),
/* The interval at which monotonically increasing metrics counters will reset */
metrics_reset_interval: schema.number({
defaultValue: DEFAULT_METRICS_RESET_INTERVAL,
Expand Down
29 changes: 14 additions & 15 deletions x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { v4 as uuidv4 } from 'uuid';
import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskRunResult } from './task_running';
import { TaskPoolRunResult } from './task_pool';
import { TaskPoolMock } from './task_pool.mock';
import { TaskPoolMock } from './task_pool/task_pool.mock';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from './mocks';

Expand All @@ -45,7 +45,6 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
max_workers: 10,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
Expand Down Expand Up @@ -156,7 +155,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});

lifecycleEvent$.next(
Expand All @@ -179,7 +178,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});

lifecycleEvent$.next(
Expand Down Expand Up @@ -216,7 +215,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));

poolCapacity.mockReturnValue({
availableWorkers: 2,
availableCapacity: 2,
});

lifecycleEvent$.next(
Expand Down Expand Up @@ -256,9 +255,9 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity for both
poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
Expand Down Expand Up @@ -296,10 +295,10 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity in general
poolCapacity.mockReturnValue({
availableWorkers: 2,
availableCapacity: 2,
});
// but when we ask how many it has occupied by type - wee always have one worker already occupied by that type
pool.getOccupiedWorkersByType.mockReturnValue(1);
pool.getUsedCapacityByType.mockReturnValue(1);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
Expand All @@ -308,7 +307,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(pool.run).toHaveBeenCalledTimes(0);

// now we release the worker in the pool and cause another cycle in the epheemral queue
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
Expand Down Expand Up @@ -356,9 +355,9 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity for all
poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);

lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));

Expand Down Expand Up @@ -389,19 +388,19 @@ describe('EphemeralTaskLifecycle', () => {

expect(ephemeralTaskLifecycle.queuedTasks).toBe(3);
poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(2);

poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(1);

poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ export class EphemeralTaskLifecycle {
taskType && this.definitions.get(taskType)?.maxConcurrency
? Math.max(
Math.min(
this.pool.availableWorkers,
this.pool.availableCapacity(),
this.definitions.get(taskType)!.maxConcurrency! -
this.pool.getOccupiedWorkersByType(taskType)
this.pool.getUsedCapacityByType(taskType)
),
0
)
: this.pool.availableWorkers;
: this.pool.availableCapacity();

private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
Expand Down
7 changes: 4 additions & 3 deletions x-pack/plugins/task_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ export type {

export const config: PluginConfigDescriptor<TaskManagerConfig> = {
schema: configSchema,
exposeToUsage: {
max_workers: true,
},
deprecations: ({ deprecate }) => {
return [
deprecate('ephemeral_tasks.enabled', 'a future version', {
Expand All @@ -68,6 +65,10 @@ export const config: PluginConfigDescriptor<TaskManagerConfig> = {
level: 'warning',
message: `Configuring "xpack.task_manager.ephemeral_tasks.request_capacity" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`,
}),
deprecate('max_workers', 'a future version', {
level: 'warning',
message: `Configuring "xpack.task_manager.max_workers" is deprecated and will be removed in a future version. Remove this setting and use "xpack.task_manager.capacity" instead.`,
}),
(settings, fromPath, addDeprecation) => {
const taskManager = get(settings, fromPath);
if (taskManager?.index) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading