diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 9782d6ae08dbf..c196a334931ba 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -23,6 +23,7 @@ describe('config validation', () => { }, "max_attempts": 3, "max_workers": 10, + "metrics_reset_interval": 30000, "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_health_verbose_log": Object { "enabled": false, @@ -81,6 +82,7 @@ describe('config validation', () => { }, "max_attempts": 3, "max_workers": 10, + "metrics_reset_interval": 30000, "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_health_verbose_log": Object { "enabled": false, @@ -137,6 +139,7 @@ describe('config validation', () => { }, "max_attempts": 3, "max_workers": 10, + "metrics_reset_interval": 30000, "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_health_verbose_log": Object { "enabled": false, diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index c2d4940d36450..490d25a7bdfb0 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -20,6 +20,8 @@ export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000; export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50; export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60; +export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds + // At the default poll interval of 3sec, this averages over the last 15sec. export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5; @@ -52,46 +54,40 @@ const eventLoopDelaySchema = schema.object({ }); const requeueInvalidTasksConfig = schema.object({ - enabled: schema.boolean({ defaultValue: false }), delay: schema.number({ defaultValue: 3000, min: 0 }), + enabled: schema.boolean({ defaultValue: false }), max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }), }); export const configSchema = schema.object( { + allow_reading_invalid_state: schema.boolean({ defaultValue: true }), + ephemeral_tasks: schema.object({ + enabled: schema.boolean({ defaultValue: false }), + /* How many requests can Task Manager buffer before it rejects new requests. */ + request_capacity: schema.number({ + // a nice round contrived number, feel free to change as we learn how it behaves + defaultValue: 10, + min: 1, + max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY, + }), + }), + event_loop_delay: eventLoopDelaySchema, /* The maximum number of times a task will be attempted before being abandoned as failed */ max_attempts: schema.number({ defaultValue: 3, min: 1, }), - /* How often, in milliseconds, the task manager will look for more work. */ - poll_interval: schema.number({ - defaultValue: DEFAULT_POLL_INTERVAL, - min: 100, - }), - /* How many requests can Task Manager buffer before it rejects new requests. */ - request_capacity: schema.number({ - // a nice round contrived number, feel free to change as we learn how it behaves - defaultValue: 1000, - min: 1, - }), /* The maximum number of tasks that this Kibana instance will run simultaneously. */ max_workers: schema.number({ defaultValue: DEFAULT_MAX_WORKERS, // disable the task manager rather than trying to specify it with 0 workers min: 1, }), - /* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. 
*/ - version_conflict_threshold: schema.number({ - defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD, - min: 50, - max: 100, - }), - /* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */ - monitored_stats_required_freshness: schema.number({ - defaultValue: (config?: unknown) => - ((config as { poll_interval: number })?.poll_interval ?? DEFAULT_POLL_INTERVAL) + 1000, - min: 100, + /* The interval at which monotonically increasing metrics counters will reset */ + metrics_reset_interval: schema.number({ + defaultValue: DEFAULT_METRICS_RESET_INTERVAL, + min: 10 * 1000, // minimum 10 seconds }), /* The rate at which we refresh monitored stats that require aggregation queries against ES. */ monitored_aggregated_stats_refresh_rate: schema.number({ @@ -99,6 +95,22 @@ export const configSchema = schema.object( /* don't run monitored stat aggregations any faster than once every 5 seconds */ min: 5000, }), + monitored_stats_health_verbose_log: schema.object({ + enabled: schema.boolean({ defaultValue: false }), + level: schema.oneOf([schema.literal('debug'), schema.literal('info')], { + defaultValue: 'debug', + }), + /* The amount of seconds we allow a task to delay before printing a warning server log */ + warn_delayed_task_start_in_seconds: schema.number({ + defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS, + }), + }), + /* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */ + monitored_stats_required_freshness: schema.number({ + defaultValue: (config?: unknown) => + ((config as { poll_interval: number })?.poll_interval ?? DEFAULT_POLL_INTERVAL) + 1000, + min: 100, + }), /* The size of the running average window for monitored stats. */ monitored_stats_running_average_window: schema.number({ defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW, @@ -107,44 +119,39 @@ export const configSchema = schema.object( }), /* Task Execution result warn & error thresholds. */ monitored_task_execution_thresholds: schema.object({ - default: taskExecutionFailureThresholdSchema, custom: schema.recordOf(schema.string(), taskExecutionFailureThresholdSchema, { defaultValue: {}, }), + default: taskExecutionFailureThresholdSchema, }), - monitored_stats_health_verbose_log: schema.object({ - enabled: schema.boolean({ defaultValue: false }), - level: schema.oneOf([schema.literal('debug'), schema.literal('info')], { - defaultValue: 'debug', - }), - /* The amount of seconds we allow a task to delay before printing a warning server log */ - warn_delayed_task_start_in_seconds: schema.number({ - defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS, - }), - }), - ephemeral_tasks: schema.object({ - enabled: schema.boolean({ defaultValue: false }), - /* How many requests can Task Manager buffer before it rejects new requests. */ - request_capacity: schema.number({ - // a nice round contrived number, feel free to change as we learn how it behaves - defaultValue: 10, - min: 1, - max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY, - }), + /* How often, in milliseconds, the task manager will look for more work. */ + poll_interval: schema.number({ + defaultValue: DEFAULT_POLL_INTERVAL, + min: 100, }), - event_loop_delay: eventLoopDelaySchema, - worker_utilization_running_average_window: schema.number({ - defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW, - max: 100, + /* How many requests can Task Manager buffer before it rejects new requests. 
*/ + request_capacity: schema.number({ + // a nice round contrived number, feel free to change as we learn how it behaves + defaultValue: 1000, min: 1, }), + requeue_invalid_tasks: requeueInvalidTasksConfig, /* These are not designed to be used by most users. Please use caution when changing these */ unsafe: schema.object({ - exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), authenticate_background_task_utilization: schema.boolean({ defaultValue: true }), + exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), + }), + /* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. */ + version_conflict_threshold: schema.number({ + defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD, + min: 50, + max: 100, + }), + worker_utilization_running_average_window: schema.number({ + defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW, + max: 100, + min: 1, }), - requeue_invalid_tasks: requeueInvalidTasksConfig, - allow_reading_invalid_state: schema.boolean({ defaultValue: true }), }, { validate: (config) => { diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts index 863b5d986d3da..6a06ea93f3dcb 100644 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts @@ -84,6 +84,7 @@ describe('EphemeralTaskLifecycle', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, ...config, }, elasticsearchAndSOAvailability$, diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index e2d290d256ec2..f034feb154462 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -79,6 +79,7 @@ describe('managed configuration', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }); logger = context.logger.get('taskManager'); diff --git a/x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts b/x-pack/plugins/task_manager/server/lib/runtime_statistics_aggregator.ts similarity index 100% rename from x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts rename to x-pack/plugins/task_manager/server/lib/runtime_statistics_aggregator.ts diff --git a/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts new file mode 100644 index 0000000000000..9671698329447 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts @@ -0,0 +1,1070 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import sinon from 'sinon'; +import { Subject, Observable } from 'rxjs'; +import { take, bufferCount, skip } from 'rxjs/operators'; +import { isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; +import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; +import { TaskManagerConfig } from '../config'; +import { createAggregator } from './create_aggregator'; +import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator'; +import { taskClaimFailureEvent, taskClaimSuccessEvent } from './task_claim_metrics_aggregator.test'; +import { getTaskRunFailedEvent, getTaskRunSuccessEvent } from './task_run_metrics_aggregator.test'; +import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator'; +import * as TaskClaimMetricsAggregatorModule from './task_claim_metrics_aggregator'; +import { metricsAggregatorMock } from './metrics_aggregator.mock'; + +const mockMetricsAggregator = metricsAggregatorMock.create(); +const config: TaskManagerConfig = { + allow_reading_invalid_state: false, + ephemeral_tasks: { + enabled: true, + request_capacity: 10, + }, + event_loop_delay: { + monitor: true, + warn_threshold: 5000, + }, + max_attempts: 9, + max_workers: 10, + metrics_reset_interval: 30000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_health_verbose_log: { + enabled: false, + level: 'debug' as const, + warn_delayed_task_start_in_seconds: 60, + }, + monitored_stats_required_freshness: 6000000, + monitored_stats_running_average_window: 50, + monitored_task_execution_thresholds: { + custom: {}, + default: { + error_threshold: 90, + warn_threshold: 80, + }, + }, + poll_interval: 6000000, + request_capacity: 1000, + requeue_invalid_tasks: { + enabled: false, + delay: 3000, + max_attempts: 20, + }, + unsafe: { + authenticate_background_task_utilization: true, + exclude_task_types: [], + }, + version_conflict_threshold: 80, + worker_utilization_running_average_window: 5, +}; + +describe('createAggregator', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + describe('with TaskClaimMetricsAggregator', () => { + test('returns a cumulative count of successful polling cycles and total polling cycles', async () => { + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskClaimAggregator = createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskClaimAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + 
expect(metrics[1]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 2, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[2]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 3, duration: { counts: [3], values: [100] } }, + }); + expect(metrics[3]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 4, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[4]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 5, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[5]).toEqual({ + key: 'task_claim', + value: { success: 5, total: 6, duration: { counts: [5], values: [100] } }, + }); + expect(metrics[6]).toEqual({ + key: 'task_claim', + value: { success: 6, total: 7, duration: { counts: [6], values: [100] } }, + }); + expect(metrics[7]).toEqual({ + key: 'task_claim', + value: { success: 7, total: 8, duration: { counts: [7], values: [100] } }, + }); + expect(metrics[8]).toEqual({ + key: 'task_claim', + value: { success: 8, total: 9, duration: { counts: [8], values: [100] } }, + }); + expect(metrics[9]).toEqual({ + key: 'task_claim', + value: { success: 8, total: 10, duration: { counts: [8], values: [100] } }, + }); + expect(metrics[10]).toEqual({ + key: 'task_claim', + value: { success: 9, total: 11, duration: { counts: [9], values: [100] } }, + }); + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + }); + }); + + test('resets count when resetMetric$ event is received', async () => { + const resetMetrics$ = new Subject(); + const pollingCycleEvents1 = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + + const pollingCycleEvents2 = [ + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskClaimAggregator = createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskClaimAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents1.length + pollingCycleEvents2.length), + bufferCount(pollingCycleEvents1.length + pollingCycleEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[1]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 2, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[2]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 3, duration: { counts: [3], values: [100] } }, + }); + expect(metrics[3]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 4, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[4]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 5, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[5]).toEqual({ + key: 'task_claim', + value: { success: 5, total: 6, duration: { counts: [5], values: [100] } }, + }); + // reset event should have been received here + 
expect(metrics[6]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[7]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 2, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[8]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 3, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[9]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 4, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[10]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 5, duration: { counts: [3], values: [100] } }, + }); + resolve(); + }); + + for (const event of pollingCycleEvents1) { + events$.next(event); + } + resetMetrics$.next(true); + for (const event of pollingCycleEvents2) { + events$.next(event); + } + }); + }); + + test('resets count when configured metrics reset interval expires', async () => { + const clock = sinon.useFakeTimers(); + clock.tick(0); + const pollingCycleEvents1 = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + + const pollingCycleEvents2 = [ + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskClaimAggregator = createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config: { + ...config, + metrics_reset_interval: 10, + }, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskClaimAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents1.length + pollingCycleEvents2.length), + bufferCount(pollingCycleEvents1.length + pollingCycleEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[1]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 2, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[2]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 3, duration: { counts: [3], values: [100] } }, + }); + expect(metrics[3]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 4, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[4]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 5, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[5]).toEqual({ + key: 'task_claim', + value: { success: 5, total: 6, duration: { counts: [5], values: [100] } }, + }); + // reset interval should have fired here + expect(metrics[6]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[7]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 2, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[8]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 3, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[9]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 4, 
duration: { counts: [2], values: [100] } }, + }); + expect(metrics[10]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 5, duration: { counts: [3], values: [100] } }, + }); + resolve(); + }); + + for (const event of pollingCycleEvents1) { + events$.next(event); + } + clock.tick(20); + for (const event of pollingCycleEvents2) { + events$.next(event); + } + + clock.restore(); + }); + }); + }); + + describe('with TaskRunMetricsAggregator', () => { + test('returns a cumulative count of successful task runs and total task runs, broken down by type', async () => { + const taskRunEvents = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('telemetry'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('report'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:.index-threshold'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('actions:webhook'), + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskRunAggregator = createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(taskRunEvents.length), + bufferCount(taskRunEvents.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[1]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[2]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 3 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[3]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 4 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[4]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 5 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[5]).toEqual({ + key: 'task_run', + value: { + overall: { success: 5, total: 6 }, + by_type: { + alerting: { success: 3, total: 4 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[6]).toEqual({ + key: 'task_run', + value: { + overall: { success: 6, total: 7 }, + by_type: { + alerting: { success: 4, total: 5 }, + 
'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[7]).toEqual({ + key: 'task_run', + value: { + overall: { success: 6, total: 8 }, + by_type: { + alerting: { success: 4, total: 6 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 3, total: 5 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[8]).toEqual({ + key: 'task_run', + value: { + overall: { success: 7, total: 9 }, + by_type: { + alerting: { success: 5, total: 7 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 4, total: 6 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[9]).toEqual({ + key: 'task_run', + value: { + overall: { success: 7, total: 10 }, + by_type: { + actions: { success: 0, total: 1 }, + alerting: { success: 5, total: 7 }, + 'actions:webhook': { success: 0, total: 1 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 4, total: 6 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + resolve(); + }); + + for (const event of taskRunEvents) { + events$.next(event); + } + }); + }); + + test('resets count when resetMetric$ event is received', async () => { + const resetMetrics$ = new Subject(); + const taskRunEvents1 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('telemetry'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('report'), + getTaskRunFailedEvent('alerting:example'), + ]; + + const taskRunEvents2 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('actions:webhook'), + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskRunAggregator = createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(taskRunEvents1.length + taskRunEvents2.length), + bufferCount(taskRunEvents1.length + taskRunEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[1]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[2]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 3 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[3]).toEqual({ + key: 'task_run', + value: { + 
overall: { success: 4, total: 4 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[4]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 5 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + // reset event should have been received here + expect(metrics[5]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[6]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[7]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 3 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[8]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 4 }, + by_type: { + alerting: { success: 3, total: 4 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[9]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 5 }, + by_type: { + actions: { success: 0, total: 1 }, + alerting: { success: 3, total: 4 }, + 'actions:webhook': { success: 0, total: 1 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + resolve(); + }); + + for (const event of taskRunEvents1) { + events$.next(event); + } + resetMetrics$.next(true); + for (const event of taskRunEvents2) { + events$.next(event); + } + }); + }); + + test('resets count when configured metrics reset interval expires', async () => { + const clock = sinon.useFakeTimers(); + clock.tick(0); + const taskRunEvents1 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('telemetry'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('report'), + getTaskRunFailedEvent('alerting:example'), + ]; + + const taskRunEvents2 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('actions:webhook'), + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskRunAggregator = createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config: { + ...config, + metrics_reset_interval: 10, + }, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on 
combineLatest + skip(1), + take(taskRunEvents1.length + taskRunEvents2.length), + bufferCount(taskRunEvents1.length + taskRunEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[1]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[2]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 3 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[3]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 4 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[4]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 5 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + // reset event should have been received here + expect(metrics[5]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[6]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[7]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 3 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[8]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 4 }, + by_type: { + alerting: { success: 3, total: 4 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[9]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 5 }, + by_type: { + actions: { success: 0, total: 1 }, + alerting: { success: 3, total: 4 }, + 'actions:webhook': { success: 0, total: 1 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + resolve(); + }); + + for (const event of taskRunEvents1) { + events$.next(event); + } + clock.tick(20); + for (const event of taskRunEvents2) { + events$.next(event); + } + + clock.restore(); + }); + }); + }); + + test('should filter task lifecycle events using specified taskEventFilter', () => { + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + 
taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const taskEventFilter = jest.fn().mockReturnValue(true); + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const aggregator = createAggregator({ + key: 'test', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter, + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + aggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe(() => { + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + + expect(taskEventFilter).toHaveBeenCalledTimes(pollingCycleEvents.length); + }); + }); + + test('should call metricAggregator to process task lifecycle events', () => { + const spy = jest + .spyOn(TaskClaimMetricsAggregatorModule, 'TaskClaimMetricsAggregator') + .mockImplementation(() => mockMetricsAggregator); + + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const taskEventFilter = jest.fn().mockReturnValue(true); + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const aggregator = createAggregator({ + key: 'test', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter, + metricsAggregator: mockMetricsAggregator, + }); + + return new Promise((resolve) => { + aggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe(() => { + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + + expect(mockMetricsAggregator.initialMetric).toHaveBeenCalledTimes(1); + expect(mockMetricsAggregator.processTaskLifecycleEvent).toHaveBeenCalledTimes( + pollingCycleEvents.length + ); + expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(pollingCycleEvents.length); + expect(mockMetricsAggregator.reset).not.toHaveBeenCalled(); + spy.mockRestore(); + }); + }); + + test('should call metricAggregator reset when resetMetric$ event is received', () => { + const spy = jest + .spyOn(TaskClaimMetricsAggregatorModule, 'TaskClaimMetricsAggregator') + .mockImplementation(() => mockMetricsAggregator); + + const resetMetrics$ = new Subject(); + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const taskEventFilter = jest.fn().mockReturnValue(true); + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const aggregator = createAggregator({ + key: 'test', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter, + 
metricsAggregator: mockMetricsAggregator, + }); + + return new Promise((resolve) => { + aggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe(() => { + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + + for (let i = 0; i < 5; i++) { + events$.next(pollingCycleEvents[i]); + } + resetMetrics$.next(true); + for (let i = 0; i < pollingCycleEvents.length; i++) { + events$.next(pollingCycleEvents[i]); + } + + expect(mockMetricsAggregator.initialMetric).toHaveBeenCalledTimes(1); + expect(mockMetricsAggregator.processTaskLifecycleEvent).toHaveBeenCalledTimes( + pollingCycleEvents.length + ); + expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(pollingCycleEvents.length); + expect(mockMetricsAggregator.reset).toHaveBeenCalledTimes(1); + spy.mockRestore(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/create_aggregator.ts b/x-pack/plugins/task_manager/server/metrics/create_aggregator.ts new file mode 100644 index 0000000000000..cece8c0f70b23 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/create_aggregator.ts @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { combineLatest, filter, interval, map, merge, Observable, startWith } from 'rxjs'; +import { JsonValue } from '@kbn/utility-types'; +import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle'; +import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; +import { TaskManagerConfig } from '../config'; +import { ITaskMetricsAggregator } from './types'; + +export interface CreateMetricsAggregatorOpts { + key: string; + config: TaskManagerConfig; + resetMetrics$: Observable; + taskPollingLifecycle: TaskPollingLifecycle; + taskEventFilter: (taskEvent: TaskLifecycleEvent) => boolean; + metricsAggregator: ITaskMetricsAggregator; +} + +export function createAggregator({ + key, + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter, + metricsAggregator, +}: CreateMetricsAggregatorOpts): AggregatedStatProvider { + // Resets the aggregators either when the reset interval has passed or + // a resetMetrics$ event is received + merge( + interval(config.metrics_reset_interval).pipe(map(() => true)), + resetMetrics$.pipe(map(() => true)) + ).subscribe(() => { + metricsAggregator.reset(); + }); + + const taskEvents$: Observable = taskPollingLifecycle.events.pipe( + filter((taskEvent: TaskLifecycleEvent) => taskEventFilter(taskEvent)), + map((taskEvent: TaskLifecycleEvent) => { + metricsAggregator.processTaskLifecycleEvent(taskEvent); + return metricsAggregator.collect(); + }) + ); + + return combineLatest([taskEvents$.pipe(startWith(metricsAggregator.initialMetric()))]).pipe( + map(([value]: [T]) => { + return { + key, + value, + } as AggregatedStat; + }) + ); +} diff --git a/x-pack/plugins/task_manager/server/metrics/index.ts b/x-pack/plugins/task_manager/server/metrics/index.ts new file mode 100644 index 0000000000000..5e2a73f91dd73 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/index.ts @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Observable } from 'rxjs'; +import { TaskManagerConfig } from '../config'; +import { Metrics, createMetricsAggregators, createMetricsStream } from './metrics_stream'; +import { TaskPollingLifecycle } from '../polling_lifecycle'; +export type { Metrics } from './metrics_stream'; + +export function metricsStream( + config: TaskManagerConfig, + resetMetrics$: Observable, + taskPollingLifecycle?: TaskPollingLifecycle +): Observable { + return createMetricsStream( + createMetricsAggregators({ + config, + resetMetrics$, + taskPollingLifecycle, + }) + ); +} diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts b/x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts new file mode 100644 index 0000000000000..691ba9d0290d2 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const createIMetricsAggregatorMock = () => { + return jest.fn().mockImplementation(() => { + return { + initialMetric: jest.fn().mockReturnValue({ count: 0 }), + reset: jest.fn(), + collect: jest.fn(), + processTaskLifecycleEvent: jest.fn(), + }; + }); +}; + +export const metricsAggregatorMock = { + create: createIMetricsAggregatorMock(), +}; diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts b/x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts new file mode 100644 index 0000000000000..5aec856a7a4f0 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { Subject } from 'rxjs'; +import { take, bufferCount } from 'rxjs/operators'; +import { createMetricsStream } from './metrics_stream'; +import { JsonValue } from '@kbn/utility-types'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; + +beforeEach(() => { + jest.resetAllMocks(); +}); + +describe('createMetricsStream', () => { + it('incrementally updates the metrics returned by the endpoint', async () => { + const aggregatedStats$ = new Subject(); + + return new Promise((resolve) => { + createMetricsStream(aggregatedStats$) + .pipe(take(3), bufferCount(3)) + .subscribe(([initialValue, secondValue, thirdValue]) => { + expect(initialValue.metrics).toMatchObject({ + lastUpdate: expect.any(String), + metrics: {}, + }); + + expect(secondValue).toMatchObject({ + lastUpdate: expect.any(String), + metrics: { + newAggregatedStat: { + timestamp: expect.any(String), + value: { + some: { + complex: { + value: 123, + }, + }, + }, + }, + }, + }); + + expect(thirdValue).toMatchObject({ + lastUpdate: expect.any(String), + metrics: { + newAggregatedStat: { + timestamp: expect.any(String), + value: { + some: { + updated: { + value: 456, + }, + }, + }, + }, + }, + }); + }); + + aggregatedStats$.next({ + key: 'newAggregatedStat', + value: { + some: { + complex: { + value: 123, + }, + }, + } as JsonValue, + }); + + aggregatedStats$.next({ + key: 'newAggregatedStat', + value: { + some: { + updated: { + value: 456, + }, + }, + } as JsonValue, + }); + + resolve(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts b/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts new file mode 100644 index 0000000000000..29558308c5196 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { merge, of, Observable } from 'rxjs'; +import { map, scan } from 'rxjs/operators'; +import { set } from '@kbn/safer-lodash-set'; +import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle'; +import { TaskManagerConfig } from '../config'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; +import { isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events'; +import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator'; +import { createAggregator } from './create_aggregator'; +import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator'; +export interface Metrics { + last_update: string; + metrics: { + task_claim?: Metric; + task_run?: Metric; + }; +} + +export interface Metric { + timestamp: string; + value: T; +} + +interface CreateMetricsAggregatorsOpts { + config: TaskManagerConfig; + resetMetrics$: Observable; + taskPollingLifecycle?: TaskPollingLifecycle; +} +export function createMetricsAggregators({ + config, + resetMetrics$, + taskPollingLifecycle, +}: CreateMetricsAggregatorsOpts): AggregatedStatProvider { + const aggregators: AggregatedStatProvider[] = []; + if (taskPollingLifecycle) { + aggregators.push( + createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }), + createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }) + ); + } + return merge(...aggregators); +} + +export function createMetricsStream(provider$: AggregatedStatProvider): Observable { + const initialMetrics = { + last_update: new Date().toISOString(), + metrics: {}, + }; + return merge( + // emit the initial metrics + of(initialMetrics), + // emit updated metrics whenever a provider updates a specific key on the stats + provider$.pipe( + map(({ key, value }) => { + return { + value: { timestamp: new Date().toISOString(), value }, + key, + }; + }), + scan((metrics: Metrics, { key, value }) => { + // incrementally merge stats as they come in + set(metrics.metrics, key, value); + metrics.last_update = new Date().toISOString(); + return metrics; + }, initialMetrics) + ) + ); +} diff --git a/x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts new file mode 100644 index 0000000000000..eb34f3a34c005 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { SuccessRateCounter } from './success_rate_counter'; + +describe('SuccessRateCounter', () => { + let successRateCounter: SuccessRateCounter; + beforeEach(() => { + successRateCounter = new SuccessRateCounter(); + }); + + test('should correctly initialize', () => { + expect(successRateCounter.get()).toEqual({ success: 0, total: 0 }); + }); + + test('should correctly return initialMetrics', () => { + expect(successRateCounter.initialMetric()).toEqual({ success: 0, total: 0 }); + }); + + test('should correctly increment counter when success is true', () => { + successRateCounter.increment(true); + successRateCounter.increment(true); + expect(successRateCounter.get()).toEqual({ success: 2, total: 2 }); + }); + + test('should correctly increment counter when success is false', () => { + successRateCounter.increment(false); + successRateCounter.increment(false); + expect(successRateCounter.get()).toEqual({ success: 0, total: 2 }); + }); + + test('should correctly reset counter', () => { + successRateCounter.increment(true); + successRateCounter.increment(true); + successRateCounter.increment(false); + successRateCounter.increment(false); + successRateCounter.increment(true); + successRateCounter.increment(true); + successRateCounter.increment(false); + expect(successRateCounter.get()).toEqual({ success: 4, total: 7 }); + + successRateCounter.reset(); + expect(successRateCounter.get()).toEqual({ success: 0, total: 0 }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts new file mode 100644 index 0000000000000..d9c61575a2698 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { JsonObject } from '@kbn/utility-types'; + +export interface SuccessRate extends JsonObject { + success: number; + total: number; +} + +export class SuccessRateCounter { + private success = 0; + private total = 0; + + public initialMetric(): SuccessRate { + return { + success: 0, + total: 0, + }; + } + + public get(): SuccessRate { + return { + success: this.success, + total: this.total, + }; + } + + public increment(success: boolean) { + if (success) { + this.success++; + } + this.total++; + } + + public reset() { + this.success = 0; + this.total = 0; + } +} diff --git a/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts new file mode 100644 index 0000000000000..cfcf4bfdf8d0b --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { none } from 'fp-ts/lib/Option'; +import { FillPoolResult } from '../lib/fill_pool'; +import { asOk, asErr } from '../lib/result_type'; +import { PollingError, PollingErrorType } from '../polling'; +import { asTaskPollingCycleEvent } from '../task_events'; +import { TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator'; + +export const taskClaimSuccessEvent = asTaskPollingCycleEvent( + asOk({ + result: FillPoolResult.PoolFilled, + stats: { + tasksUpdated: 0, + tasksConflicted: 0, + tasksClaimed: 0, + }, + }), + { + start: 1689698780490, + stop: 1689698780500, + } +); +export const taskClaimFailureEvent = asTaskPollingCycleEvent( + asErr( + new PollingError( + 'Failed to poll for work: Error: failed to work', + PollingErrorType.WorkError, + none + ) + ) +); + +describe('TaskClaimMetricsAggregator', () => { + let taskClaimMetricsAggregator: TaskClaimMetricsAggregator; + beforeEach(() => { + taskClaimMetricsAggregator = new TaskClaimMetricsAggregator(); + }); + + test('should correctly initialize', () => { + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 0, + total: 0, + duration: { counts: [], values: [] }, + }); + }); + + test('should correctly return initialMetrics', () => { + expect(taskClaimMetricsAggregator.initialMetric()).toEqual({ + success: 0, + total: 0, + duration: { counts: [], values: [] }, + }); + }); + + test('should correctly process task lifecycle success event', () => { + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 2, + total: 2, + duration: { counts: [2], values: [100] }, + }); + }); + + test('should correctly process task lifecycle failure event', () => { + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 0, + total: 2, + duration: { counts: [], values: [] }, + }); + }); + + test('should correctly reset counter', () => { + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 4, + total: 7, + duration: { counts: [4], values: [100] }, + }); + + taskClaimMetricsAggregator.reset(); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 0, + total: 0, + duration: { counts: [], values: [] }, + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts new file mode 100644 index 0000000000000..75c03105287fa --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +// @ts-expect-error +// eslint-disable-next-line import/no-extraneous-dependencies +import Histogram from 'native-hdr-histogram'; +import { isOk } from '../lib/result_type'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { TaskRun } from '../task_events'; +import { SuccessRate, SuccessRateCounter } from './success_rate_counter'; +import { ITaskMetricsAggregator } from './types'; + +const HDR_HISTOGRAM_MIN = 1; // 1 millis +const HDR_HISTOGRAM_MAX = 30000; // 30 seconds +const HDR_HISTOGRAM_BUCKET_SIZE = 100; // 100 millis + +export type TaskClaimMetric = SuccessRate & { + duration: { + counts: number[]; + values: number[]; + }; +}; + +export class TaskClaimMetricsAggregator implements ITaskMetricsAggregator { + private claimSuccessRate = new SuccessRateCounter(); + private durationHistogram = new Histogram(HDR_HISTOGRAM_MIN, HDR_HISTOGRAM_MAX); + + public initialMetric(): TaskClaimMetric { + return { + ...this.claimSuccessRate.initialMetric(), + duration: { counts: [], values: [] }, + }; + } + public collect(): TaskClaimMetric { + return { + ...this.claimSuccessRate.get(), + duration: this.serializeHistogram(), + }; + } + + public reset() { + this.claimSuccessRate.reset(); + this.durationHistogram.reset(); + } + + public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) { + const success = isOk((taskEvent as TaskRun).event); + this.claimSuccessRate.increment(success); + + if (taskEvent.timing) { + const durationInMs = taskEvent.timing.stop - taskEvent.timing.start; + this.durationHistogram.record(durationInMs); + } + } + + private serializeHistogram() { + const counts: number[] = []; + const values: number[] = []; + + for (const { count, value } of this.durationHistogram.linearcounts(HDR_HISTOGRAM_BUCKET_SIZE)) { + counts.push(count); + values.push(value); + } + + return { counts, values }; + } +} diff --git a/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts new file mode 100644 index 0000000000000..e3654fd9a21d5 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts @@ -0,0 +1,208 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import * as uuid from 'uuid'; +import { asOk, asErr } from '../lib/result_type'; +import { TaskStatus } from '../task'; +import { asTaskRunEvent, TaskPersistence } from '../task_events'; +import { TaskRunResult } from '../task_running'; +import { TaskRunMetricsAggregator } from './task_run_metrics_aggregator'; + +export const getTaskRunSuccessEvent = (type: string) => { + const id = uuid.v4(); + return asTaskRunEvent( + id, + asOk({ + task: { + id, + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: type, + params: {}, + ownerId: null, + }, + persistence: TaskPersistence.Recurring, + result: TaskRunResult.Success, + }), + { + start: 1689698780490, + stop: 1689698780500, + } + ); +}; + +export const getTaskRunFailedEvent = (type: string) => { + const id = uuid.v4(); + return asTaskRunEvent( + id, + asErr({ + error: new Error('task failed to run'), + task: { + id, + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: type, + params: {}, + ownerId: null, + }, + persistence: TaskPersistence.Recurring, + result: TaskRunResult.Failed, + }) + ); +}; + +describe('TaskRunMetricsAggregator', () => { + let taskRunMetricsAggregator: TaskRunMetricsAggregator; + beforeEach(() => { + taskRunMetricsAggregator = new TaskRunMetricsAggregator(); + }); + + test('should correctly initialize', () => { + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 0, total: 0 }, + by_type: {}, + }); + }); + + test('should correctly return initialMetrics', () => { + expect(taskRunMetricsAggregator.initialMetric()).toEqual({ + overall: { success: 0, total: 0 }, + by_type: {}, + }); + }); + + test('should correctly process task run success event', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 2, total: 2 }, + by_type: { + telemetry: { success: 2, total: 2 }, + }, + }); + }); + + test('should correctly process task run failure event', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 0, total: 2 }, + by_type: { + telemetry: { success: 0, total: 2 }, + }, + }); + }); + + test('should correctly process different task types', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 3, total: 4 }, + by_type: { + report: { success: 2, total: 2 }, + telemetry: { success: 1, total: 2 }, + }, + }); + }); + + test('should correctly group alerting and action task types', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + 
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:.email')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 11, total: 14 }, + by_type: { + actions: { success: 3, total: 3 }, + 'actions:.email': { success: 1, total: 1 }, + 'actions:webhook': { success: 2, total: 2 }, + alerting: { success: 5, total: 7 }, + 'alerting:example': { success: 3, total: 5 }, + 'alerting:.index-threshold': { success: 2, total: 2 }, + report: { success: 2, total: 2 }, + telemetry: { success: 1, total: 2 }, + }, + }); + }); + + test('should correctly reset counter', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:.email')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 11, total: 14 }, + by_type: { + actions: { success: 3, total: 3 }, + 'actions:.email': { success: 1, total: 1 }, + 'actions:webhook': { success: 2, total: 2 }, + alerting: { success: 5, total: 7 }, + 'alerting:example': { success: 3, total: 5 }, + 'alerting:.index-threshold': { success: 2, total: 2 }, + report: { success: 
2, total: 2 },
+        telemetry: { success: 1, total: 2 },
+      },
+    });
+
+    taskRunMetricsAggregator.reset();
+    expect(taskRunMetricsAggregator.collect()).toEqual({
+      overall: { success: 0, total: 0 },
+      by_type: {
+        actions: { success: 0, total: 0 },
+        'actions:.email': { success: 0, total: 0 },
+        'actions:webhook': { success: 0, total: 0 },
+        alerting: { success: 0, total: 0 },
+        'alerting:example': { success: 0, total: 0 },
+        'alerting:.index-threshold': { success: 0, total: 0 },
+        report: { success: 0, total: 0 },
+        telemetry: { success: 0, total: 0 },
+      },
+    });
+  });
+});
diff --git a/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts
new file mode 100644
index 0000000000000..c25d80f112df1
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts
@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { JsonObject } from '@kbn/utility-types';
+import { isOk, unwrap } from '../lib/result_type';
+import { TaskLifecycleEvent } from '../polling_lifecycle';
+import { ErroredTask, RanTask, TaskRun } from '../task_events';
+import { SuccessRate, SuccessRateCounter } from './success_rate_counter';
+import { ITaskMetricsAggregator } from './types';
+
+const taskTypeGrouping = new Set(['alerting:', 'actions:']);
+
+export interface TaskRunMetric extends JsonObject {
+  overall: SuccessRate;
+  by_type: {
+    [key: string]: SuccessRate;
+  };
+}
+
+export class TaskRunMetricsAggregator implements ITaskMetricsAggregator<TaskRunMetric> {
+  private taskRunSuccessRate = new SuccessRateCounter();
+  private taskRunCounter: Map<string, SuccessRateCounter> = new Map();
+
+  public initialMetric(): TaskRunMetric {
+    return {
+      overall: this.taskRunSuccessRate.initialMetric(),
+      by_type: {},
+    };
+  }
+
+  public collect(): TaskRunMetric {
+    return {
+      overall: this.taskRunSuccessRate.get(),
+      by_type: this.collectTaskTypeEntries(),
+    };
+  }
+
+  public reset() {
+    this.taskRunSuccessRate.reset();
+    for (const taskType of this.taskRunCounter.keys()) {
+      this.taskRunCounter.get(taskType)!.reset();
+    }
+  }
+
+  public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) {
+    const { task }: RanTask | ErroredTask = unwrap((taskEvent as TaskRun).event);
+    const taskType = task.taskType;
+
+    const taskTypeSuccessRate: SuccessRateCounter =
+      this.taskRunCounter.get(taskType) ?? new SuccessRateCounter();
+
+    const success = isOk((taskEvent as TaskRun).event);
+    this.taskRunSuccessRate.increment(success);
+    taskTypeSuccessRate.increment(success);
+    this.taskRunCounter.set(taskType, taskTypeSuccessRate);
+
+    const taskTypeGroup = this.getTaskTypeGroup(taskType);
+    if (taskTypeGroup) {
+      const taskTypeGroupSuccessRate: SuccessRateCounter =
+        this.taskRunCounter.get(taskTypeGroup) ?? new SuccessRateCounter();
+      taskTypeGroupSuccessRate.increment(success);
+      this.taskRunCounter.set(taskTypeGroup, taskTypeGroupSuccessRate);
+    }
+  }
+
+  private collectTaskTypeEntries() {
+    const collected: Record<string, SuccessRate> = {};
+    for (const [key, value] of this.taskRunCounter) {
+      collected[key] = value.get();
+    }
+    return collected;
+  }
+
+  private getTaskTypeGroup(taskType: string): string | undefined {
+    for (const group of taskTypeGrouping) {
+      if (taskType.startsWith(group)) {
+        return group.replaceAll(':', '');
+      }
+    }
+  }
+}
diff --git a/x-pack/plugins/task_manager/server/metrics/types.ts b/x-pack/plugins/task_manager/server/metrics/types.ts
new file mode 100644
index 0000000000000..7fbee1fe8abdd
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/metrics/types.ts
@@ -0,0 +1,15 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { TaskLifecycleEvent } from '../polling_lifecycle';
+
+export interface ITaskMetricsAggregator<T> {
+  initialMetric: () => T;
+  collect: () => T;
+  reset: () => void;
+  processTaskLifecycleEvent: (taskEvent: TaskLifecycleEvent) => void;
+}
diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts
index cdd67a07ff9e7..9507b3ab0e4cd 100644
--- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts
@@ -19,7 +19,7 @@ import {
 import { asOk } from '../lib/result_type';
 import { TaskLifecycleEvent } from '../polling_lifecycle';
 import { TaskRunResult } from '../task_running';
-import { AggregatedStat } from './runtime_statistics_aggregator';
+import { AggregatedStat } from '../lib/runtime_statistics_aggregator';
 import { taskPollingLifecycleMock } from '../polling_lifecycle.mock';
 import {
   BackgroundTaskUtilizationStat,
diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts
index fd116cbdd71d8..837f29c83f108 100644
--- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts
@@ -20,7 +20,7 @@ import {
   TaskTiming,
 } from '../task_events';
 import { MonitoredStat } from './monitoring_stats_stream';
-import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator';
+import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
 import { createRunningAveragedStat } from './task_run_calcultors';
 import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config';
diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts
index 98493ae89b683..689c9c882bee3 100644
--- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts
@@ -52,6 +52,7 @@ describe('Configuration Statistics Aggregator', () => {
       delay: 3000,
       max_attempts: 20,
     },
+
metrics_reset_interval: 3000, }; const managedConfig = { diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts index 6414c9e80ce06..2212affcc8db3 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts @@ -8,7 +8,7 @@ import { combineLatest, of } from 'rxjs'; import { pick, merge } from 'lodash'; import { map, startWith } from 'rxjs/operators'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; import { TaskManagerConfig } from '../config'; import { ManagedConfiguration } from '../lib/create_managed_configuration'; diff --git a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts index 8a2305c3076a5..8d4ef4fab2eba 100644 --- a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts @@ -26,7 +26,7 @@ import { SummarizedEphemeralTaskStat, EphemeralTaskStat, } from './ephemeral_task_statistics'; -import { AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { ephemeralTaskLifecycleMock } from '../ephemeral_task_lifecycle.mock'; import { times, takeRight, take as takeLeft } from 'lodash'; diff --git a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts index 52aa2b1eead25..8a6ade503b041 100644 --- a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts @@ -9,7 +9,7 @@ import { map, filter, startWith, buffer, share } from 'rxjs/operators'; import { JsonObject } from '@kbn/utility-types'; import { combineLatest, Observable, zip } from 'rxjs'; import { isOk, Ok } from '../lib/result_type'; -import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { isTaskRunEvent, isTaskManagerStatEvent } from '../task_events'; diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index 995db14fa09ea..daf3f2baf085d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -8,8 +8,9 @@ import { TaskManagerConfig } from '../config'; import { of, Subject } from 'rxjs'; import { take, bufferCount } from 'rxjs/operators'; -import { createMonitoringStatsStream, AggregatedStat } from './monitoring_stats_stream'; +import { createMonitoringStatsStream } from './monitoring_stats_stream'; import { JsonValue } from '@kbn/utility-types'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; beforeEach(() => { jest.resetAllMocks(); @@ -56,6 +57,7 @@ describe('createMonitoringStatsStream', () => { delay: 
3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }; it('returns the initial config used to configure Task Manager', async () => { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index e1ff38d1c9607..62505a34d7f89 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -37,13 +37,11 @@ import { import { ConfigStat, createConfigurationAggregator } from './configuration_statistics'; import { TaskManagerConfig } from '../config'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; import { ManagedConfiguration } from '../lib/create_managed_configuration'; import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle'; import { CapacityEstimationStat, withCapacityEstimate } from './capacity_estimation'; import { AdHocTaskCounter } from '../lib/adhoc_task_counter'; - -export type { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; export interface MonitoringStats { last_update: string; diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index 4d69b23b699b7..91e81013b726f 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -30,7 +30,7 @@ import { TaskRunStat, SummarizedTaskRunStat, } from './task_run_statistics'; -import { AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { FillPoolResult } from '../lib/fill_pool'; import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; import { configSchema } from '../config'; diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 0c6063af19286..7b7db8cb25eed 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -10,7 +10,7 @@ import { filter, startWith, map } from 'rxjs/operators'; import { JsonObject, JsonValue } from '@kbn/utility-types'; import { isNumber, mapValues } from 'lodash'; import { Logger } from '@kbn/core/server'; -import { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider, AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { isTaskRunEvent, diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index bacd05dcb6a06..b4d5db14a12e4 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -12,7 +12,7 @@ import { JsonObject } from '@kbn/utility-types'; import { keyBy, mapValues } from 'lodash'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { AggregationResultOf } from '@kbn/es-types'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { 
AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
 import { parseIntervalAsSecond, asInterval, parseIntervalAsMillisecond } from '../lib/intervals';
 import { HealthStatus } from './monitoring_stats_stream';
 import { TaskStore } from '../task_store';
diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts
index 4c0c96c7f76a6..1e7215d6d7a1b 100644
--- a/x-pack/plugins/task_manager/server/plugin.test.ts
+++ b/x-pack/plugins/task_manager/server/plugin.test.ts
@@ -77,6 +77,7 @@ const pluginInitializerContextParams = {
     delay: 3000,
     max_attempts: 20,
   },
+  metrics_reset_interval: 3000,
 };
 
 describe('TaskManagerPlugin', () => {
diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts
index e65574cef779a..3b8ab4a54be1f 100644
--- a/x-pack/plugins/task_manager/server/plugin.ts
+++ b/x-pack/plugins/task_manager/server/plugin.ts
@@ -27,7 +27,7 @@ import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './tas
 import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store';
 import { createManagedConfiguration } from './lib/create_managed_configuration';
 import { TaskScheduling } from './task_scheduling';
-import { backgroundTaskUtilizationRoute, healthRoute } from './routes';
+import { backgroundTaskUtilizationRoute, healthRoute, metricsRoute } from './routes';
 import { createMonitoringStats, MonitoringStats } from './monitoring';
 import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle';
 import { EphemeralTask, ConcreteTaskInstance } from './task';
@@ -35,6 +35,7 @@ import { registerTaskManagerUsageCollector } from './usage';
 import { TASK_MANAGER_INDEX } from './constants';
 import { AdHocTaskCounter } from './lib/adhoc_task_counter';
 import { setupIntervalLogging } from './lib/log_health_metrics';
+import { metricsStream, Metrics } from './metrics';
 
 export interface TaskManagerSetupContract {
   /**
@@ -82,6 +83,8 @@ export class TaskManagerPlugin
   private middleware: Middleware = createInitialMiddleware();
   private elasticsearchAndSOAvailability$?: Observable<boolean>;
   private monitoringStats$ = new Subject<MonitoringStats>();
+  private metrics$ = new Subject<Metrics>();
+  private resetMetrics$ = new Subject<boolean>();
   private shouldRunBackgroundTasks: boolean;
   private readonly kibanaVersion: PluginInitializerContext['env']['packageInfo']['version'];
   private adHocTaskCounter: AdHocTaskCounter;
@@ -155,6 +158,12 @@ export class TaskManagerPlugin
       getClusterClient: () =>
         startServicesPromise.then(({ elasticsearch }) => elasticsearch.client),
     });
+    metricsRoute({
+      router,
+      metrics$: this.metrics$,
+      resetMetrics$: this.resetMetrics$,
+      taskManagerId: this.taskManagerId,
+    });
 
     core.status.derivedStatus$.subscribe((status) =>
       this.logger.debug(`status core.status.derivedStatus now set to ${status.level}`)
@@ -276,6 +285,10 @@ export class TaskManagerPlugin
       this.ephemeralTaskLifecycle
     ).subscribe((stat) => this.monitoringStats$.next(stat));
 
+    metricsStream(this.config!, this.resetMetrics$, this.taskPollingLifecycle).subscribe((metric) =>
+      this.metrics$.next(metric)
+    );
+
     const taskScheduling = new TaskScheduling({
       logger: this.logger,
       taskStore,
diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
index 62e6be589b4cf..79b153f42a88d 100644
--- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
+++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
@@ -82,6 +82,7 @@ describe('TaskPollingLifecycle', () => {
       delay: 3000,
       max_attempts: 20,
     },
+    metrics_reset_interval: 3000,
   },
   taskStore: mockTaskStore,
   logger: taskManagerLogger,
diff --git a/x-pack/plugins/task_manager/server/routes/index.ts b/x-pack/plugins/task_manager/server/routes/index.ts
index f3ba539323f8e..372996f7cea3d 100644
--- a/x-pack/plugins/task_manager/server/routes/index.ts
+++ b/x-pack/plugins/task_manager/server/routes/index.ts
@@ -7,3 +7,4 @@
 
 export { healthRoute } from './health';
 export { backgroundTaskUtilizationRoute } from './background_task_utilization';
+export { metricsRoute } from './metrics';
diff --git a/x-pack/plugins/task_manager/server/routes/metrics.test.ts b/x-pack/plugins/task_manager/server/routes/metrics.test.ts
new file mode 100644
index 0000000000000..a9703aa7548dd
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/routes/metrics.test.ts
@@ -0,0 +1,82 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { of, Subject } from 'rxjs';
+import { v4 as uuidv4 } from 'uuid';
+import { httpServiceMock } from '@kbn/core/server/mocks';
+import { metricsRoute } from './metrics';
+import { mockHandlerArguments } from './_mock_handler_arguments';
+
+describe('metricsRoute', () => {
+  beforeEach(() => {
+    jest.resetAllMocks();
+  });
+
+  it('registers route', async () => {
+    const router = httpServiceMock.createRouter();
+    metricsRoute({
+      router,
+      metrics$: of(),
+      resetMetrics$: new Subject<boolean>(),
+      taskManagerId: uuidv4(),
+    });
+
+    const [config] = router.get.mock.calls[0];
+
+    expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`);
+  });
+
+  it('emits resetMetric$ event when route is accessed and reset query param is true', async () => {
+    let resetCalledTimes = 0;
+    const resetMetrics$ = new Subject<boolean>();
+
+    resetMetrics$.subscribe(() => {
+      resetCalledTimes++;
+    });
+    const router = httpServiceMock.createRouter();
+    metricsRoute({
+      router,
+      metrics$: of(),
+      resetMetrics$,
+      taskManagerId: uuidv4(),
+    });
+
+    const [config, handler] = router.get.mock.calls[0];
+    const [context, req, res] = mockHandlerArguments({}, { query: { reset: true } }, ['ok']);
+
+    expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`);
+
+    await handler(context, req, res);
+
+    expect(resetCalledTimes).toEqual(1);
+  });
+
+  it('does not emit resetMetric$ event when route is accessed and reset query param is false', async () => {
+    let resetCalledTimes = 0;
+    const resetMetrics$ = new Subject<boolean>();
+
+    resetMetrics$.subscribe(() => {
+      resetCalledTimes++;
+    });
+    const router = httpServiceMock.createRouter();
+    metricsRoute({
+      router,
+      metrics$: of(),
+      resetMetrics$,
+      taskManagerId: uuidv4(),
+    });
+
+    const [config, handler] = router.get.mock.calls[0];
+    const [context, req, res] = mockHandlerArguments({}, { query: { reset: false } }, ['ok']);
+
+    expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`);
+
+    await handler(context, req, res);
+
+    expect(resetCalledTimes).toEqual(0);
+  });
+});
diff --git a/x-pack/plugins/task_manager/server/routes/metrics.ts b/x-pack/plugins/task_manager/server/routes/metrics.ts
new file mode 100644
index 0000000000000..f9dcf447fa101
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/routes/metrics.ts
@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import {
+  IRouter,
+  RequestHandlerContext,
+  KibanaRequest,
+  IKibanaResponse,
+  KibanaResponseFactory,
+} from '@kbn/core/server';
+import { schema, TypeOf } from '@kbn/config-schema';
+import { Observable, Subject } from 'rxjs';
+import { Metrics } from '../metrics';
+
+export interface NodeMetrics {
+  process_uuid: string;
+  timestamp: string;
+  last_update: string;
+  metrics: Metrics['metrics'] | null;
+}
+
+export interface MetricsRouteParams {
+  router: IRouter;
+  metrics$: Observable<Metrics>;
+  resetMetrics$: Subject<boolean>;
+  taskManagerId: string;
+}
+
+const QuerySchema = schema.object({
+  reset: schema.boolean({ defaultValue: true }),
+});
+
+export function metricsRoute(params: MetricsRouteParams) {
+  const { router, metrics$, resetMetrics$, taskManagerId } = params;
+
+  let lastMetrics: NodeMetrics | null = null;
+
+  metrics$.subscribe((metrics) => {
+    lastMetrics = { process_uuid: taskManagerId, timestamp: new Date().toISOString(), ...metrics };
+  });
+
+  router.get(
+    {
+      path: `/api/task_manager/metrics`,
+      // Uncomment when we determine that we can restrict API usage to Global admins based on telemetry
+      // options: { tags: ['access:taskManager'] },
+      validate: {
+        query: QuerySchema,
+      },
+    },
+    async function (
+      _: RequestHandlerContext,
+      req: KibanaRequest<unknown, TypeOf<typeof QuerySchema>, unknown>,
+      res: KibanaResponseFactory
+    ): Promise<IKibanaResponse> {
+      if (req.query.reset) {
+        resetMetrics$.next(true);
+      }
+
+      return res.ok({
+        body: lastMetrics
+          ? lastMetrics
+          : { process_uuid: taskManagerId, timestamp: new Date().toISOString(), metrics: {} },
+      });
+    }
+  );
+}
diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts
index 97eeb17f0cd4e..7e897840f72c7 100644
--- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts
+++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts
@@ -1298,6 +1298,45 @@ describe('TaskManagerRunner', () => {
     );
   });
 
+  test('emits TaskEvent when a recurring task returns a success result with hasError=true', async () => {
+    const id = _.random(1, 20).toString();
+    const runAt = minutesFromNow(_.random(5));
+    const onTaskEvent = jest.fn();
+    const { runner, instance } = await readyToRunStageSetup({
+      onTaskEvent,
+      instance: {
+        id,
+        schedule: { interval: '1m' },
+      },
+      definitions: {
+        bar: {
+          title: 'Bar!',
+          createTaskRunner: () => ({
+            async run() {
+              return { runAt, state: {}, hasError: true };
+            },
+          }),
+        },
+      },
+    });
+
+    await runner.run();
+
+    expect(onTaskEvent).toHaveBeenCalledWith(
+      withAnyTiming(
+        asTaskRunEvent(
+          id,
+          asErr({
+            task: instance,
+            persistence: TaskPersistence.Recurring,
+            result: TaskRunResult.Success,
+            error: new Error(`Alerting task failed to run.`),
+          })
+        )
+      )
+    );
+  });
+
   test('emits TaskEvent when a task run throws an error', async () => {
     const id = _.random(1, 20).toString();
     const error = new Error('Dangit!');
diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts
index 8ad020684e269..e8ec5cb0f2d91 100644
--- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts
+++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts
@@ -47,6 +47,7 @@ import {
   FailedRunResult,
   FailedTaskResult,
   isFailedRunResult,
+  RunContext,
   SuccessfulRunResult,
TaskDefinition, TaskStatus, @@ -321,9 +322,9 @@ export class TaskManagerRunner implements TaskRunner { let taskParamsValidation; if (this.requeueInvalidTasksConfig.enabled) { - taskParamsValidation = this.validateTaskParams(); + taskParamsValidation = this.validateTaskParams(modifiedContext); if (!taskParamsValidation.error) { - taskParamsValidation = await this.validateIndirectTaskParams(); + taskParamsValidation = await this.validateIndirectTaskParams(modifiedContext); } } @@ -359,9 +360,9 @@ export class TaskManagerRunner implements TaskRunner { } } - private validateTaskParams() { + private validateTaskParams({ taskInstance }: RunContext) { let error; - const { state, taskType, params, id, numSkippedRuns = 0 } = this.instance.task; + const { state, taskType, params, id, numSkippedRuns = 0 } = taskInstance; const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig; try { @@ -383,9 +384,9 @@ export class TaskManagerRunner implements TaskRunner { return { ...(error ? { error } : {}), state }; } - private async validateIndirectTaskParams() { + private async validateIndirectTaskParams({ taskInstance }: RunContext) { let error; - const { state, taskType, id, numSkippedRuns = 0 } = this.instance.task; + const { state, taskType, id, numSkippedRuns = 0 } = taskInstance; const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig; const indirectParamsSchema = this.definition.indirectParamsSchema; @@ -735,23 +736,30 @@ export class TaskManagerRunner implements TaskRunner { await eitherAsync( result, - async ({ runAt, schedule }: SuccessfulRunResult) => { - this.onTaskEvent( - asTaskRunEvent( - this.id, - asOk({ - task, - persistence: - schedule || task.schedule - ? TaskPersistence.Recurring - : TaskPersistence.NonRecurring, - result: await (runAt || schedule || task.schedule - ? this.processResultForRecurringTask(result) - : this.processResultWhenDone()), - }), - taskTiming - ) - ); + async ({ runAt, schedule, hasError }: SuccessfulRunResult) => { + const processedResult = { + task, + persistence: + schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring, + result: await (runAt || schedule || task.schedule + ? this.processResultForRecurringTask(result) + : this.processResultWhenDone()), + }; + + // Alerting task runner returns SuccessfulRunResult with hasError=true + // when the alerting task fails, so we check for this condition in order + // to emit the correct task run event for metrics collection + const taskRunEvent = hasError + ? 
asTaskRunEvent( + this.id, + asErr({ + ...processedResult, + error: new Error(`Alerting task failed to run.`), + }), + taskTiming + ) + : asTaskRunEvent(this.id, asOk(processedResult), taskTiming); + this.onTaskEvent(taskRunEvent); }, async ({ error }: FailedRunResult) => { this.onTaskEvent( diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts index af17d1b76ed99..420dfe795f322 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts @@ -10,6 +10,7 @@ import { FtrProviderContext } from '../../ftr_provider_context'; export default function ({ loadTestFile }: FtrProviderContext) { describe('task_manager', function taskManagerSuite() { loadTestFile(require.resolve('./background_task_utilization_route')); + loadTestFile(require.resolve('./metrics_route')); loadTestFile(require.resolve('./health_route')); loadTestFile(require.resolve('./task_management')); loadTestFile(require.resolve('./task_management_scheduled_at')); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts new file mode 100644 index 0000000000000..4da679b6839ac --- /dev/null +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import url from 'url'; +import supertest from 'supertest'; +import { NodeMetrics } from '@kbn/task-manager-plugin/server/routes/metrics'; +import { FtrProviderContext } from '../../ftr_provider_context'; + +export default function ({ getService }: FtrProviderContext) { + const config = getService('config'); + const retry = getService('retry'); + const request = supertest(url.format(config.get('servers.kibana'))); + + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + + function getMetricsRequest(reset: boolean = false) { + return request + .get(`/api/task_manager/metrics${reset ? 
'' : '?reset=false'}`)
+      .set('kbn-xsrf', 'foo')
+      .expect(200)
+      .then((response) => response.body);
+  }
+
+  function getMetrics(
+    reset: boolean = false,
+    callback: (metrics: NodeMetrics) => boolean
+  ): Promise<NodeMetrics> {
+    return retry.try(async () => {
+      const metrics = await getMetricsRequest(reset);
+
+      if (metrics.metrics && callback(metrics)) {
+        return metrics;
+      }
+
+      await delay(500);
+      throw new Error('Expected metrics not received');
+    });
+  }
+
+  describe('task manager metrics', () => {
+    describe('task claim', () => {
+      it('should increment task claim success/total counters', async () => {
+        // counters are reset every 30 seconds, so wait until the start of a
+        // fresh counter cycle to make sure values are incrementing
+        const initialMetrics = (
+          await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1)
+        ).metrics;
+        expect(initialMetrics).not.to.be(null);
+        expect(initialMetrics?.task_claim).not.to.be(null);
+        expect(initialMetrics?.task_claim?.value).not.to.be(null);
+
+        let previousTaskClaimSuccess = initialMetrics?.task_claim?.value.success!;
+        let previousTaskClaimTotal = initialMetrics?.task_claim?.value.total!;
+        let previousTaskClaimTimestamp: string = initialMetrics?.task_claim?.timestamp!;
+
+        for (let i = 0; i < 5; ++i) {
+          const metrics = (
+            await getMetrics(
+              false,
+              (m: NodeMetrics) => m.metrics?.task_claim?.timestamp !== previousTaskClaimTimestamp
+            )
+          ).metrics;
+          expect(metrics).not.to.be(null);
+          expect(metrics?.task_claim).not.to.be(null);
+          expect(metrics?.task_claim?.value).not.to.be(null);
+
+          expect(metrics?.task_claim?.value.success).to.be.greaterThan(previousTaskClaimSuccess);
+          expect(metrics?.task_claim?.value.total).to.be.greaterThan(previousTaskClaimTotal);
+
+          previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;
+          previousTaskClaimSuccess = metrics?.task_claim?.value.success!;
+          previousTaskClaimTotal = metrics?.task_claim?.value.total!;
+
+          // check that duration histogram exists
+          expect(metrics?.task_claim?.value.duration).not.to.be(null);
+          expect(Array.isArray(metrics?.task_claim?.value.duration.counts)).to.be(true);
+          expect(Array.isArray(metrics?.task_claim?.value.duration.values)).to.be(true);
+        }
+      });
+
+      it('should reset task claim success/total counters at an interval', async () => {
+        const initialCounterValue = 7;
+        const initialMetrics = (
+          await getMetrics(
+            false,
+            (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
+          )
+        ).metrics;
+        expect(initialMetrics).not.to.be(null);
+        expect(initialMetrics?.task_claim).not.to.be(null);
+        expect(initialMetrics?.task_claim?.value).not.to.be(null);
+
+        // retry until counter value resets
+        const resetMetrics = (
+          await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1)
+        ).metrics;
+        expect(resetMetrics).not.to.be(null);
+        expect(resetMetrics?.task_claim).not.to.be(null);
+        expect(resetMetrics?.task_claim?.value).not.to.be(null);
+      });
+
+      it('should reset task claim success/total counters on request', async () => {
+        const initialCounterValue = 1;
+        const initialMetrics = (
+          await getMetrics(
+            false,
+            (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
+          )
+        ).metrics;
+        expect(initialMetrics).not.to.be(null);
+        expect(initialMetrics?.task_claim).not.to.be(null);
+        expect(initialMetrics?.task_claim?.value).not.to.be(null);
+
+        let previousTaskClaimTimestamp: string = initialMetrics?.task_claim?.timestamp!;
+
+        for (let i = 0; i < 5; ++i) {
+          const metrics = (
+
await getMetrics( + true, + (m: NodeMetrics) => m.metrics?.task_claim?.timestamp !== previousTaskClaimTimestamp + ) + ).metrics; + expect(metrics).not.to.be(null); + expect(metrics?.task_claim).not.to.be(null); + expect(metrics?.task_claim?.value).not.to.be(null); + + expect(metrics?.task_claim?.value.success).to.equal(1); + expect(metrics?.task_claim?.value.total).to.equal(1); + + previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!; + + // check that duration histogram exists + expect(metrics?.task_claim?.value.duration).not.to.be(null); + expect(Array.isArray(metrics?.task_claim?.value.duration.counts)).to.be(true); + expect(Array.isArray(metrics?.task_claim?.value.duration.values)).to.be(true); + } + }); + }); + + describe('task run test', () => { + let ruleId: string | null = null; + before(async () => { + // create a rule that fires actions + const rule = await request + .post(`/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send({ + enabled: true, + name: 'test rule', + tags: [], + rule_type_id: '.es-query', + consumer: 'alerts', + // set schedule long so we can control when it runs + schedule: { interval: '1d' }, + actions: [], + params: { + aggType: 'count', + esQuery: '{\n "query":{\n "match_all" : {}\n }\n}', + excludeHitsFromPreviousRun: false, + groupBy: 'all', + index: ['.kibana-event-log*'], + searchType: 'esQuery', + size: 100, + termSize: 5, + threshold: [0], + thresholdComparator: '>', + timeField: '@timestamp', + timeWindowSize: 5, + timeWindowUnit: 'm', + }, + }) + .expect(200) + .then((response) => response.body); + + ruleId = rule.id; + }); + + after(async () => { + // delete rule + await request.delete(`/api/alerting/rule/${ruleId}`).set('kbn-xsrf', 'foo').expect(204); + }); + + it('should increment task run success/total counters', async () => { + const initialMetrics = ( + await getMetrics( + false, + (metrics) => + metrics?.metrics?.task_run?.value.by_type.alerting?.total === 1 && + metrics?.metrics?.task_run?.value.by_type.alerting?.success === 1 + ) + ).metrics; + expect(initialMetrics).not.to.be(null); + expect(initialMetrics?.task_claim).not.to.be(null); + expect(initialMetrics?.task_claim?.value).not.to.be(null); + + for (let i = 0; i < 1; ++i) { + // run the rule and expect counters to increment + await request + .post('/api/sample_tasks/run_soon') + .set('kbn-xsrf', 'xxx') + .send({ task: { id: ruleId } }) + .expect(200); + + await getMetrics( + false, + (metrics) => + metrics?.metrics?.task_run?.value.by_type.alerting?.total === i + 2 && + metrics?.metrics?.task_run?.value.by_type.alerting?.success === i + 2 + ); + } + + // counter should reset on its own + await getMetrics( + false, + (metrics) => + metrics?.metrics?.task_run?.value.by_type.alerting?.total === 0 && + metrics?.metrics?.task_run?.value.by_type.alerting?.success === 0 + ); + }); + }); + }); +}
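
For reference, below is a minimal sketch of how the new endpoint added by this diff could be polled outside the functional test suite. It is an illustration only, not part of the change: it assumes Node 18+ (global fetch), a locally running Kibana at http://localhost:5601, and placeholder basic-auth credentials (elastic:changeme); the response shape mirrors the NodeMetrics interface and the task_claim / task_run values exercised by the tests above, and reset=false is passed explicitly so the monotonically increasing counters are left intact between polls.

// Hypothetical polling client for GET /api/task_manager/metrics (illustrative only).
interface SuccessRate {
  success: number;
  total: number;
}

interface TaskManagerMetricsResponse {
  process_uuid: string;
  timestamp: string;
  last_update?: string; // present once the first metrics payload has been emitted
  metrics: {
    task_claim?: {
      timestamp: string;
      value: SuccessRate & { duration: { counts: number[]; values: number[] } };
    };
    task_run?: {
      timestamp: string;
      value: { overall: SuccessRate; by_type: Record<string, SuccessRate> };
    };
  } | null;
}

async function fetchTaskManagerMetrics(
  baseUrl = 'http://localhost:5601', // assumption: local dev Kibana
  reset = false // the route defaults to reset=true, so opt out explicitly when observing trends
): Promise<TaskManagerMetricsResponse> {
  const res = await fetch(`${baseUrl}/api/task_manager/metrics?reset=${reset}`, {
    headers: {
      // assumption: default dev credentials; use a real API key or session in practice
      Authorization: `Basic ${Buffer.from('elastic:changeme').toString('base64')}`,
    },
  });
  if (!res.ok) {
    throw new Error(`metrics request failed: ${res.status}`);
  }
  return (await res.json()) as TaskManagerMetricsResponse;
}

// Example usage: log the task claim success rate without resetting the counters.
fetchTaskManagerMetrics().then(({ metrics }) => {
  const claim = metrics?.task_claim?.value;
  if (claim) {
    console.log(`task claim success rate: ${claim.success}/${claim.total}`);
  }
});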