From 582d97ddb20e594de5300524e6c3b7851ed81f9a Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Thu, 10 Aug 2023 10:23:11 -0400 Subject: [PATCH 1/3] [Response Ops][Task Manager] Expose SLI metrics in HTTP API (#162178) Towards https://github.com/elastic/kibana/issues/160334 ## Summary Exposes a new HTTP API at `/api/task_manager/metrics` that collects SLI metrics for task manager. The following metrics are exposed: - count of task claim successes & count of task claim tries - this is a counter metric that keeps track over overall task claim success, not task claim success of individual background task workers - count of task run success & count of task runs - this is a counter metric that keeps track of overall task run successes, as well as successes grouped by task type. Alerting and action task types are rolled up into an `alerting` and an `actions` group to allow us to calculate SLIs across all alerting rules and all actions - task claim duration in milliseconds - this is a histogram counter metric that is bucketed into 100 ms buckets These counter metrics are incremented until a reset event is received, in which case the counter is reset back to 0. This allows the collection mechanism (in this case Elastic Agent) to determine the interval at which these metrics are collected as well as to collect the rate of change for these SLI metrics without having to perform complicated Elasticsearch aggregation math. In addition, the counters are reset every 30 seconds (this is configurable) to avoid providing the metrics collector with stale data in case of a collector outage. Flaky test runner: https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/2813 --------- Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- .../task_manager/server/config.test.ts | 3 + x-pack/plugins/task_manager/server/config.ts | 107 +- .../server/ephemeral_task_lifecycle.test.ts | 1 + .../managed_configuration.test.ts | 1 + .../runtime_statistics_aggregator.ts | 0 .../server/metrics/create_aggregator.test.ts | 1070 +++++++++++++++++ .../server/metrics/create_aggregator.ts | 57 + .../task_manager/server/metrics/index.ts | 26 + .../server/metrics/metrics_aggregator.mock.ts | 21 + .../server/metrics/metrics_stream.test.ts | 89 ++ .../server/metrics/metrics_stream.ts | 89 ++ .../metrics/success_rate_counter.test.ts | 49 + .../server/metrics/success_rate_counter.ts | 44 + .../task_claim_metrics_aggregator.test.ts | 102 ++ .../metrics/task_claim_metrics_aggregator.ts | 71 ++ .../task_run_metrics_aggregator.test.ts | 208 ++++ .../metrics/task_run_metrics_aggregator.ts | 85 ++ .../task_manager/server/metrics/types.ts | 15 + ...ground_task_utilization_statistics.test.ts | 2 +- .../background_task_utilization_statistics.ts | 2 +- .../configuration_statistics.test.ts | 1 + .../monitoring/configuration_statistics.ts | 2 +- .../ephemeral_task_statistics.test.ts | 2 +- .../monitoring/ephemeral_task_statistics.ts | 2 +- .../monitoring_stats_stream.test.ts | 4 +- .../monitoring/monitoring_stats_stream.ts | 4 +- .../monitoring/task_run_statistics.test.ts | 2 +- .../server/monitoring/task_run_statistics.ts | 2 +- .../server/monitoring/workload_statistics.ts | 2 +- .../task_manager/server/plugin.test.ts | 1 + x-pack/plugins/task_manager/server/plugin.ts | 15 +- .../server/polling_lifecycle.test.ts | 1 + .../task_manager/server/routes/index.ts | 1 + .../server/routes/metrics.test.ts | 82 ++ .../task_manager/server/routes/metrics.ts | 71 ++ .../server/task_running/task_runner.test.ts | 39 + .../server/task_running/task_runner.ts | 54 +- .../test_suites/task_manager/index.ts | 1 + .../test_suites/task_manager/metrics_route.ts | 227 ++++ 39 files changed, 2469 insertions(+), 86 deletions(-) rename x-pack/plugins/task_manager/server/{monitoring => lib}/runtime_statistics_aggregator.ts (100%) create mode 100644 x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/create_aggregator.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/index.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/metrics_stream.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts create mode 100644 x-pack/plugins/task_manager/server/metrics/types.ts create mode 100644 x-pack/plugins/task_manager/server/routes/metrics.test.ts create mode 100644 x-pack/plugins/task_manager/server/routes/metrics.ts create mode 100644 x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 9782d6ae08dbf..c196a334931ba 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -23,6 +23,7 @@ describe('config validation', () => { }, "max_attempts": 3, "max_workers": 10, + "metrics_reset_interval": 30000, "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_health_verbose_log": Object { "enabled": false, @@ -81,6 +82,7 @@ describe('config validation', () => { }, "max_attempts": 3, "max_workers": 10, + "metrics_reset_interval": 30000, "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_health_verbose_log": Object { "enabled": false, @@ -137,6 +139,7 @@ describe('config validation', () => { }, "max_attempts": 3, "max_workers": 10, + "metrics_reset_interval": 30000, "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_health_verbose_log": Object { "enabled": false, diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index c2d4940d36450..490d25a7bdfb0 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -20,6 +20,8 @@ export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000; export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50; export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60; +export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds + // At the default poll interval of 3sec, this averages over the last 15sec. export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5; @@ -52,46 +54,40 @@ const eventLoopDelaySchema = schema.object({ }); const requeueInvalidTasksConfig = schema.object({ - enabled: schema.boolean({ defaultValue: false }), delay: schema.number({ defaultValue: 3000, min: 0 }), + enabled: schema.boolean({ defaultValue: false }), max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }), }); export const configSchema = schema.object( { + allow_reading_invalid_state: schema.boolean({ defaultValue: true }), + ephemeral_tasks: schema.object({ + enabled: schema.boolean({ defaultValue: false }), + /* How many requests can Task Manager buffer before it rejects new requests. */ + request_capacity: schema.number({ + // a nice round contrived number, feel free to change as we learn how it behaves + defaultValue: 10, + min: 1, + max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY, + }), + }), + event_loop_delay: eventLoopDelaySchema, /* The maximum number of times a task will be attempted before being abandoned as failed */ max_attempts: schema.number({ defaultValue: 3, min: 1, }), - /* How often, in milliseconds, the task manager will look for more work. */ - poll_interval: schema.number({ - defaultValue: DEFAULT_POLL_INTERVAL, - min: 100, - }), - /* How many requests can Task Manager buffer before it rejects new requests. */ - request_capacity: schema.number({ - // a nice round contrived number, feel free to change as we learn how it behaves - defaultValue: 1000, - min: 1, - }), /* The maximum number of tasks that this Kibana instance will run simultaneously. */ max_workers: schema.number({ defaultValue: DEFAULT_MAX_WORKERS, // disable the task manager rather than trying to specify it with 0 workers min: 1, }), - /* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. */ - version_conflict_threshold: schema.number({ - defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD, - min: 50, - max: 100, - }), - /* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */ - monitored_stats_required_freshness: schema.number({ - defaultValue: (config?: unknown) => - ((config as { poll_interval: number })?.poll_interval ?? DEFAULT_POLL_INTERVAL) + 1000, - min: 100, + /* The interval at which monotonically increasing metrics counters will reset */ + metrics_reset_interval: schema.number({ + defaultValue: DEFAULT_METRICS_RESET_INTERVAL, + min: 10 * 1000, // minimum 10 seconds }), /* The rate at which we refresh monitored stats that require aggregation queries against ES. */ monitored_aggregated_stats_refresh_rate: schema.number({ @@ -99,6 +95,22 @@ export const configSchema = schema.object( /* don't run monitored stat aggregations any faster than once every 5 seconds */ min: 5000, }), + monitored_stats_health_verbose_log: schema.object({ + enabled: schema.boolean({ defaultValue: false }), + level: schema.oneOf([schema.literal('debug'), schema.literal('info')], { + defaultValue: 'debug', + }), + /* The amount of seconds we allow a task to delay before printing a warning server log */ + warn_delayed_task_start_in_seconds: schema.number({ + defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS, + }), + }), + /* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */ + monitored_stats_required_freshness: schema.number({ + defaultValue: (config?: unknown) => + ((config as { poll_interval: number })?.poll_interval ?? DEFAULT_POLL_INTERVAL) + 1000, + min: 100, + }), /* The size of the running average window for monitored stats. */ monitored_stats_running_average_window: schema.number({ defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW, @@ -107,44 +119,39 @@ export const configSchema = schema.object( }), /* Task Execution result warn & error thresholds. */ monitored_task_execution_thresholds: schema.object({ - default: taskExecutionFailureThresholdSchema, custom: schema.recordOf(schema.string(), taskExecutionFailureThresholdSchema, { defaultValue: {}, }), + default: taskExecutionFailureThresholdSchema, }), - monitored_stats_health_verbose_log: schema.object({ - enabled: schema.boolean({ defaultValue: false }), - level: schema.oneOf([schema.literal('debug'), schema.literal('info')], { - defaultValue: 'debug', - }), - /* The amount of seconds we allow a task to delay before printing a warning server log */ - warn_delayed_task_start_in_seconds: schema.number({ - defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS, - }), - }), - ephemeral_tasks: schema.object({ - enabled: schema.boolean({ defaultValue: false }), - /* How many requests can Task Manager buffer before it rejects new requests. */ - request_capacity: schema.number({ - // a nice round contrived number, feel free to change as we learn how it behaves - defaultValue: 10, - min: 1, - max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY, - }), + /* How often, in milliseconds, the task manager will look for more work. */ + poll_interval: schema.number({ + defaultValue: DEFAULT_POLL_INTERVAL, + min: 100, }), - event_loop_delay: eventLoopDelaySchema, - worker_utilization_running_average_window: schema.number({ - defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW, - max: 100, + /* How many requests can Task Manager buffer before it rejects new requests. */ + request_capacity: schema.number({ + // a nice round contrived number, feel free to change as we learn how it behaves + defaultValue: 1000, min: 1, }), + requeue_invalid_tasks: requeueInvalidTasksConfig, /* These are not designed to be used by most users. Please use caution when changing these */ unsafe: schema.object({ - exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), authenticate_background_task_utilization: schema.boolean({ defaultValue: true }), + exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), + }), + /* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. */ + version_conflict_threshold: schema.number({ + defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD, + min: 50, + max: 100, + }), + worker_utilization_running_average_window: schema.number({ + defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW, + max: 100, + min: 1, }), - requeue_invalid_tasks: requeueInvalidTasksConfig, - allow_reading_invalid_state: schema.boolean({ defaultValue: true }), }, { validate: (config) => { diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts index 863b5d986d3da..6a06ea93f3dcb 100644 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts @@ -84,6 +84,7 @@ describe('EphemeralTaskLifecycle', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, ...config, }, elasticsearchAndSOAvailability$, diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index e2d290d256ec2..f034feb154462 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -79,6 +79,7 @@ describe('managed configuration', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }); logger = context.logger.get('taskManager'); diff --git a/x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts b/x-pack/plugins/task_manager/server/lib/runtime_statistics_aggregator.ts similarity index 100% rename from x-pack/plugins/task_manager/server/monitoring/runtime_statistics_aggregator.ts rename to x-pack/plugins/task_manager/server/lib/runtime_statistics_aggregator.ts diff --git a/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts new file mode 100644 index 0000000000000..9671698329447 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts @@ -0,0 +1,1070 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import sinon from 'sinon'; +import { Subject, Observable } from 'rxjs'; +import { take, bufferCount, skip } from 'rxjs/operators'; +import { isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; +import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; +import { TaskManagerConfig } from '../config'; +import { createAggregator } from './create_aggregator'; +import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator'; +import { taskClaimFailureEvent, taskClaimSuccessEvent } from './task_claim_metrics_aggregator.test'; +import { getTaskRunFailedEvent, getTaskRunSuccessEvent } from './task_run_metrics_aggregator.test'; +import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator'; +import * as TaskClaimMetricsAggregatorModule from './task_claim_metrics_aggregator'; +import { metricsAggregatorMock } from './metrics_aggregator.mock'; + +const mockMetricsAggregator = metricsAggregatorMock.create(); +const config: TaskManagerConfig = { + allow_reading_invalid_state: false, + ephemeral_tasks: { + enabled: true, + request_capacity: 10, + }, + event_loop_delay: { + monitor: true, + warn_threshold: 5000, + }, + max_attempts: 9, + max_workers: 10, + metrics_reset_interval: 30000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_health_verbose_log: { + enabled: false, + level: 'debug' as const, + warn_delayed_task_start_in_seconds: 60, + }, + monitored_stats_required_freshness: 6000000, + monitored_stats_running_average_window: 50, + monitored_task_execution_thresholds: { + custom: {}, + default: { + error_threshold: 90, + warn_threshold: 80, + }, + }, + poll_interval: 6000000, + request_capacity: 1000, + requeue_invalid_tasks: { + enabled: false, + delay: 3000, + max_attempts: 20, + }, + unsafe: { + authenticate_background_task_utilization: true, + exclude_task_types: [], + }, + version_conflict_threshold: 80, + worker_utilization_running_average_window: 5, +}; + +describe('createAggregator', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + describe('with TaskClaimMetricsAggregator', () => { + test('returns a cumulative count of successful polling cycles and total polling cycles', async () => { + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskClaimAggregator = createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskClaimAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[1]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 2, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[2]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 3, duration: { counts: [3], values: [100] } }, + }); + expect(metrics[3]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 4, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[4]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 5, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[5]).toEqual({ + key: 'task_claim', + value: { success: 5, total: 6, duration: { counts: [5], values: [100] } }, + }); + expect(metrics[6]).toEqual({ + key: 'task_claim', + value: { success: 6, total: 7, duration: { counts: [6], values: [100] } }, + }); + expect(metrics[7]).toEqual({ + key: 'task_claim', + value: { success: 7, total: 8, duration: { counts: [7], values: [100] } }, + }); + expect(metrics[8]).toEqual({ + key: 'task_claim', + value: { success: 8, total: 9, duration: { counts: [8], values: [100] } }, + }); + expect(metrics[9]).toEqual({ + key: 'task_claim', + value: { success: 8, total: 10, duration: { counts: [8], values: [100] } }, + }); + expect(metrics[10]).toEqual({ + key: 'task_claim', + value: { success: 9, total: 11, duration: { counts: [9], values: [100] } }, + }); + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + }); + }); + + test('resets count when resetMetric$ event is received', async () => { + const resetMetrics$ = new Subject(); + const pollingCycleEvents1 = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + + const pollingCycleEvents2 = [ + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskClaimAggregator = createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskClaimAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents1.length + pollingCycleEvents2.length), + bufferCount(pollingCycleEvents1.length + pollingCycleEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[1]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 2, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[2]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 3, duration: { counts: [3], values: [100] } }, + }); + expect(metrics[3]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 4, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[4]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 5, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[5]).toEqual({ + key: 'task_claim', + value: { success: 5, total: 6, duration: { counts: [5], values: [100] } }, + }); + // reset event should have been received here + expect(metrics[6]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[7]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 2, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[8]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 3, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[9]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 4, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[10]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 5, duration: { counts: [3], values: [100] } }, + }); + resolve(); + }); + + for (const event of pollingCycleEvents1) { + events$.next(event); + } + resetMetrics$.next(true); + for (const event of pollingCycleEvents2) { + events$.next(event); + } + }); + }); + + test('resets count when configured metrics reset interval expires', async () => { + const clock = sinon.useFakeTimers(); + clock.tick(0); + const pollingCycleEvents1 = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + + const pollingCycleEvents2 = [ + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskClaimAggregator = createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config: { + ...config, + metrics_reset_interval: 10, + }, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskClaimAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents1.length + pollingCycleEvents2.length), + bufferCount(pollingCycleEvents1.length + pollingCycleEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[1]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 2, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[2]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 3, duration: { counts: [3], values: [100] } }, + }); + expect(metrics[3]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 4, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[4]).toEqual({ + key: 'task_claim', + value: { success: 4, total: 5, duration: { counts: [4], values: [100] } }, + }); + expect(metrics[5]).toEqual({ + key: 'task_claim', + value: { success: 5, total: 6, duration: { counts: [5], values: [100] } }, + }); + // reset interval should have fired here + expect(metrics[6]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 1, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[7]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 2, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[8]).toEqual({ + key: 'task_claim', + value: { success: 1, total: 3, duration: { counts: [1], values: [100] } }, + }); + expect(metrics[9]).toEqual({ + key: 'task_claim', + value: { success: 2, total: 4, duration: { counts: [2], values: [100] } }, + }); + expect(metrics[10]).toEqual({ + key: 'task_claim', + value: { success: 3, total: 5, duration: { counts: [3], values: [100] } }, + }); + resolve(); + }); + + for (const event of pollingCycleEvents1) { + events$.next(event); + } + clock.tick(20); + for (const event of pollingCycleEvents2) { + events$.next(event); + } + + clock.restore(); + }); + }); + }); + + describe('with TaskRunMetricsAggregator', () => { + test('returns a cumulative count of successful task runs and total task runs, broken down by type', async () => { + const taskRunEvents = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('telemetry'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('report'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:.index-threshold'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('actions:webhook'), + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskRunAggregator = createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(taskRunEvents.length), + bufferCount(taskRunEvents.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[1]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[2]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 3 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[3]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 4 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[4]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 5 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[5]).toEqual({ + key: 'task_run', + value: { + overall: { success: 5, total: 6 }, + by_type: { + alerting: { success: 3, total: 4 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[6]).toEqual({ + key: 'task_run', + value: { + overall: { success: 6, total: 7 }, + by_type: { + alerting: { success: 4, total: 5 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[7]).toEqual({ + key: 'task_run', + value: { + overall: { success: 6, total: 8 }, + by_type: { + alerting: { success: 4, total: 6 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 3, total: 5 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[8]).toEqual({ + key: 'task_run', + value: { + overall: { success: 7, total: 9 }, + by_type: { + alerting: { success: 5, total: 7 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 4, total: 6 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[9]).toEqual({ + key: 'task_run', + value: { + overall: { success: 7, total: 10 }, + by_type: { + actions: { success: 0, total: 1 }, + alerting: { success: 5, total: 7 }, + 'actions:webhook': { success: 0, total: 1 }, + 'alerting:.index-threshold': { success: 1, total: 1 }, + 'alerting:example': { success: 4, total: 6 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + resolve(); + }); + + for (const event of taskRunEvents) { + events$.next(event); + } + }); + }); + + test('resets count when resetMetric$ event is received', async () => { + const resetMetrics$ = new Subject(); + const taskRunEvents1 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('telemetry'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('report'), + getTaskRunFailedEvent('alerting:example'), + ]; + + const taskRunEvents2 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('actions:webhook'), + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskRunAggregator = createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(taskRunEvents1.length + taskRunEvents2.length), + bufferCount(taskRunEvents1.length + taskRunEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[1]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[2]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 3 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[3]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 4 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[4]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 5 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + // reset event should have been received here + expect(metrics[5]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[6]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[7]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 3 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[8]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 4 }, + by_type: { + alerting: { success: 3, total: 4 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[9]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 5 }, + by_type: { + actions: { success: 0, total: 1 }, + alerting: { success: 3, total: 4 }, + 'actions:webhook': { success: 0, total: 1 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + resolve(); + }); + + for (const event of taskRunEvents1) { + events$.next(event); + } + resetMetrics$.next(true); + for (const event of taskRunEvents2) { + events$.next(event); + } + }); + }); + + test('resets count when configured metrics reset interval expires', async () => { + const clock = sinon.useFakeTimers(); + clock.tick(0); + const taskRunEvents1 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('telemetry'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('report'), + getTaskRunFailedEvent('alerting:example'), + ]; + + const taskRunEvents2 = [ + getTaskRunSuccessEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('alerting:example'), + getTaskRunSuccessEvent('alerting:example'), + getTaskRunFailedEvent('actions:webhook'), + ]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const taskRunAggregator = createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config: { + ...config, + metrics_reset_interval: 10, + }, + resetMetrics$: new Subject(), + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }); + + return new Promise((resolve) => { + taskRunAggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(taskRunEvents1.length + taskRunEvents2.length), + bufferCount(taskRunEvents1.length + taskRunEvents2.length) + ) + .subscribe((metrics: Array>) => { + expect(metrics[0]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[1]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[2]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 3 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[3]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 4 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + expect(metrics[4]).toEqual({ + key: 'task_run', + value: { + overall: { success: 4, total: 5 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 1, total: 1 }, + telemetry: { success: 1, total: 1 }, + }, + }, + }); + // reset event should have been received here + expect(metrics[5]).toEqual({ + key: 'task_run', + value: { + overall: { success: 1, total: 1 }, + by_type: { + alerting: { success: 1, total: 1 }, + 'alerting:example': { success: 1, total: 1 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[6]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 2 }, + by_type: { + alerting: { success: 2, total: 2 }, + 'alerting:example': { success: 2, total: 2 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[7]).toEqual({ + key: 'task_run', + value: { + overall: { success: 2, total: 3 }, + by_type: { + alerting: { success: 2, total: 3 }, + 'alerting:example': { success: 2, total: 3 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[8]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 4 }, + by_type: { + alerting: { success: 3, total: 4 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + expect(metrics[9]).toEqual({ + key: 'task_run', + value: { + overall: { success: 3, total: 5 }, + by_type: { + actions: { success: 0, total: 1 }, + alerting: { success: 3, total: 4 }, + 'actions:webhook': { success: 0, total: 1 }, + 'alerting:example': { success: 3, total: 4 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }, + }); + resolve(); + }); + + for (const event of taskRunEvents1) { + events$.next(event); + } + clock.tick(20); + for (const event of taskRunEvents2) { + events$.next(event); + } + + clock.restore(); + }); + }); + }); + + test('should filter task lifecycle events using specified taskEventFilter', () => { + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const taskEventFilter = jest.fn().mockReturnValue(true); + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const aggregator = createAggregator({ + key: 'test', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter, + metricsAggregator: new TaskClaimMetricsAggregator(), + }); + + return new Promise((resolve) => { + aggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe(() => { + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + + expect(taskEventFilter).toHaveBeenCalledTimes(pollingCycleEvents.length); + }); + }); + + test('should call metricAggregator to process task lifecycle events', () => { + const spy = jest + .spyOn(TaskClaimMetricsAggregatorModule, 'TaskClaimMetricsAggregator') + .mockImplementation(() => mockMetricsAggregator); + + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const taskEventFilter = jest.fn().mockReturnValue(true); + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const aggregator = createAggregator({ + key: 'test', + taskPollingLifecycle, + config, + resetMetrics$: new Subject(), + taskEventFilter, + metricsAggregator: mockMetricsAggregator, + }); + + return new Promise((resolve) => { + aggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe(() => { + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + + expect(mockMetricsAggregator.initialMetric).toHaveBeenCalledTimes(1); + expect(mockMetricsAggregator.processTaskLifecycleEvent).toHaveBeenCalledTimes( + pollingCycleEvents.length + ); + expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(pollingCycleEvents.length); + expect(mockMetricsAggregator.reset).not.toHaveBeenCalled(); + spy.mockRestore(); + }); + }); + + test('should call metricAggregator reset when resetMetric$ event is received', () => { + const spy = jest + .spyOn(TaskClaimMetricsAggregatorModule, 'TaskClaimMetricsAggregator') + .mockImplementation(() => mockMetricsAggregator); + + const resetMetrics$ = new Subject(); + const pollingCycleEvents = [ + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimSuccessEvent, + taskClaimFailureEvent, + taskClaimSuccessEvent, + ]; + const taskEventFilter = jest.fn().mockReturnValue(true); + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const aggregator = createAggregator({ + key: 'test', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter, + metricsAggregator: mockMetricsAggregator, + }); + + return new Promise((resolve) => { + aggregator + .pipe( + // skip initial metric which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + take(pollingCycleEvents.length), + bufferCount(pollingCycleEvents.length) + ) + .subscribe(() => { + resolve(); + }); + + for (const event of pollingCycleEvents) { + events$.next(event); + } + + for (let i = 0; i < 5; i++) { + events$.next(pollingCycleEvents[i]); + } + resetMetrics$.next(true); + for (let i = 0; i < pollingCycleEvents.length; i++) { + events$.next(pollingCycleEvents[i]); + } + + expect(mockMetricsAggregator.initialMetric).toHaveBeenCalledTimes(1); + expect(mockMetricsAggregator.processTaskLifecycleEvent).toHaveBeenCalledTimes( + pollingCycleEvents.length + ); + expect(mockMetricsAggregator.collect).toHaveBeenCalledTimes(pollingCycleEvents.length); + expect(mockMetricsAggregator.reset).toHaveBeenCalledTimes(1); + spy.mockRestore(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/create_aggregator.ts b/x-pack/plugins/task_manager/server/metrics/create_aggregator.ts new file mode 100644 index 0000000000000..cece8c0f70b23 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/create_aggregator.ts @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { combineLatest, filter, interval, map, merge, Observable, startWith } from 'rxjs'; +import { JsonValue } from '@kbn/utility-types'; +import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle'; +import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; +import { TaskManagerConfig } from '../config'; +import { ITaskMetricsAggregator } from './types'; + +export interface CreateMetricsAggregatorOpts { + key: string; + config: TaskManagerConfig; + resetMetrics$: Observable; + taskPollingLifecycle: TaskPollingLifecycle; + taskEventFilter: (taskEvent: TaskLifecycleEvent) => boolean; + metricsAggregator: ITaskMetricsAggregator; +} + +export function createAggregator({ + key, + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter, + metricsAggregator, +}: CreateMetricsAggregatorOpts): AggregatedStatProvider { + // Resets the aggregators either when the reset interval has passed or + // a resetMetrics$ event is received + merge( + interval(config.metrics_reset_interval).pipe(map(() => true)), + resetMetrics$.pipe(map(() => true)) + ).subscribe(() => { + metricsAggregator.reset(); + }); + + const taskEvents$: Observable = taskPollingLifecycle.events.pipe( + filter((taskEvent: TaskLifecycleEvent) => taskEventFilter(taskEvent)), + map((taskEvent: TaskLifecycleEvent) => { + metricsAggregator.processTaskLifecycleEvent(taskEvent); + return metricsAggregator.collect(); + }) + ); + + return combineLatest([taskEvents$.pipe(startWith(metricsAggregator.initialMetric()))]).pipe( + map(([value]: [T]) => { + return { + key, + value, + } as AggregatedStat; + }) + ); +} diff --git a/x-pack/plugins/task_manager/server/metrics/index.ts b/x-pack/plugins/task_manager/server/metrics/index.ts new file mode 100644 index 0000000000000..5e2a73f91dd73 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/index.ts @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Observable } from 'rxjs'; +import { TaskManagerConfig } from '../config'; +import { Metrics, createMetricsAggregators, createMetricsStream } from './metrics_stream'; +import { TaskPollingLifecycle } from '../polling_lifecycle'; +export type { Metrics } from './metrics_stream'; + +export function metricsStream( + config: TaskManagerConfig, + resetMetrics$: Observable, + taskPollingLifecycle?: TaskPollingLifecycle +): Observable { + return createMetricsStream( + createMetricsAggregators({ + config, + resetMetrics$, + taskPollingLifecycle, + }) + ); +} diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts b/x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts new file mode 100644 index 0000000000000..691ba9d0290d2 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/metrics_aggregator.mock.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const createIMetricsAggregatorMock = () => { + return jest.fn().mockImplementation(() => { + return { + initialMetric: jest.fn().mockReturnValue({ count: 0 }), + reset: jest.fn(), + collect: jest.fn(), + processTaskLifecycleEvent: jest.fn(), + }; + }); +}; + +export const metricsAggregatorMock = { + create: createIMetricsAggregatorMock(), +}; diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts b/x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts new file mode 100644 index 0000000000000..5aec856a7a4f0 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/metrics_stream.test.ts @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Subject } from 'rxjs'; +import { take, bufferCount } from 'rxjs/operators'; +import { createMetricsStream } from './metrics_stream'; +import { JsonValue } from '@kbn/utility-types'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; + +beforeEach(() => { + jest.resetAllMocks(); +}); + +describe('createMetricsStream', () => { + it('incrementally updates the metrics returned by the endpoint', async () => { + const aggregatedStats$ = new Subject(); + + return new Promise((resolve) => { + createMetricsStream(aggregatedStats$) + .pipe(take(3), bufferCount(3)) + .subscribe(([initialValue, secondValue, thirdValue]) => { + expect(initialValue.metrics).toMatchObject({ + lastUpdate: expect.any(String), + metrics: {}, + }); + + expect(secondValue).toMatchObject({ + lastUpdate: expect.any(String), + metrics: { + newAggregatedStat: { + timestamp: expect.any(String), + value: { + some: { + complex: { + value: 123, + }, + }, + }, + }, + }, + }); + + expect(thirdValue).toMatchObject({ + lastUpdate: expect.any(String), + metrics: { + newAggregatedStat: { + timestamp: expect.any(String), + value: { + some: { + updated: { + value: 456, + }, + }, + }, + }, + }, + }); + }); + + aggregatedStats$.next({ + key: 'newAggregatedStat', + value: { + some: { + complex: { + value: 123, + }, + }, + } as JsonValue, + }); + + aggregatedStats$.next({ + key: 'newAggregatedStat', + value: { + some: { + updated: { + value: 456, + }, + }, + } as JsonValue, + }); + + resolve(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts b/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts new file mode 100644 index 0000000000000..29558308c5196 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/metrics_stream.ts @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { merge, of, Observable } from 'rxjs'; +import { map, scan } from 'rxjs/operators'; +import { set } from '@kbn/safer-lodash-set'; +import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle'; +import { TaskManagerConfig } from '../config'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; +import { isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events'; +import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator'; +import { createAggregator } from './create_aggregator'; +import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator'; +export interface Metrics { + last_update: string; + metrics: { + task_claim?: Metric; + task_run?: Metric; + }; +} + +export interface Metric { + timestamp: string; + value: T; +} + +interface CreateMetricsAggregatorsOpts { + config: TaskManagerConfig; + resetMetrics$: Observable; + taskPollingLifecycle?: TaskPollingLifecycle; +} +export function createMetricsAggregators({ + config, + resetMetrics$, + taskPollingLifecycle, +}: CreateMetricsAggregatorsOpts): AggregatedStatProvider { + const aggregators: AggregatedStatProvider[] = []; + if (taskPollingLifecycle) { + aggregators.push( + createAggregator({ + key: 'task_claim', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskPollingCycleEvent(taskEvent), + metricsAggregator: new TaskClaimMetricsAggregator(), + }), + createAggregator({ + key: 'task_run', + taskPollingLifecycle, + config, + resetMetrics$, + taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent), + metricsAggregator: new TaskRunMetricsAggregator(), + }) + ); + } + return merge(...aggregators); +} + +export function createMetricsStream(provider$: AggregatedStatProvider): Observable { + const initialMetrics = { + last_update: new Date().toISOString(), + metrics: {}, + }; + return merge( + // emit the initial metrics + of(initialMetrics), + // emit updated metrics whenever a provider updates a specific key on the stats + provider$.pipe( + map(({ key, value }) => { + return { + value: { timestamp: new Date().toISOString(), value }, + key, + }; + }), + scan((metrics: Metrics, { key, value }) => { + // incrementally merge stats as they come in + set(metrics.metrics, key, value); + metrics.last_update = new Date().toISOString(); + return metrics; + }, initialMetrics) + ) + ); +} diff --git a/x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts new file mode 100644 index 0000000000000..eb34f3a34c005 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.test.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SuccessRateCounter } from './success_rate_counter'; + +describe('SuccessRateCounter', () => { + let successRateCounter: SuccessRateCounter; + beforeEach(() => { + successRateCounter = new SuccessRateCounter(); + }); + + test('should correctly initialize', () => { + expect(successRateCounter.get()).toEqual({ success: 0, total: 0 }); + }); + + test('should correctly return initialMetrics', () => { + expect(successRateCounter.initialMetric()).toEqual({ success: 0, total: 0 }); + }); + + test('should correctly increment counter when success is true', () => { + successRateCounter.increment(true); + successRateCounter.increment(true); + expect(successRateCounter.get()).toEqual({ success: 2, total: 2 }); + }); + + test('should correctly increment counter when success is false', () => { + successRateCounter.increment(false); + successRateCounter.increment(false); + expect(successRateCounter.get()).toEqual({ success: 0, total: 2 }); + }); + + test('should correctly reset counter', () => { + successRateCounter.increment(true); + successRateCounter.increment(true); + successRateCounter.increment(false); + successRateCounter.increment(false); + successRateCounter.increment(true); + successRateCounter.increment(true); + successRateCounter.increment(false); + expect(successRateCounter.get()).toEqual({ success: 4, total: 7 }); + + successRateCounter.reset(); + expect(successRateCounter.get()).toEqual({ success: 0, total: 0 }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts new file mode 100644 index 0000000000000..d9c61575a2698 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/success_rate_counter.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { JsonObject } from '@kbn/utility-types'; + +export interface SuccessRate extends JsonObject { + success: number; + total: number; +} + +export class SuccessRateCounter { + private success = 0; + private total = 0; + + public initialMetric(): SuccessRate { + return { + success: 0, + total: 0, + }; + } + + public get(): SuccessRate { + return { + success: this.success, + total: this.total, + }; + } + + public increment(success: boolean) { + if (success) { + this.success++; + } + this.total++; + } + + public reset() { + this.success = 0; + this.total = 0; + } +} diff --git a/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts new file mode 100644 index 0000000000000..cfcf4bfdf8d0b --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.test.ts @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { none } from 'fp-ts/lib/Option'; +import { FillPoolResult } from '../lib/fill_pool'; +import { asOk, asErr } from '../lib/result_type'; +import { PollingError, PollingErrorType } from '../polling'; +import { asTaskPollingCycleEvent } from '../task_events'; +import { TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator'; + +export const taskClaimSuccessEvent = asTaskPollingCycleEvent( + asOk({ + result: FillPoolResult.PoolFilled, + stats: { + tasksUpdated: 0, + tasksConflicted: 0, + tasksClaimed: 0, + }, + }), + { + start: 1689698780490, + stop: 1689698780500, + } +); +export const taskClaimFailureEvent = asTaskPollingCycleEvent( + asErr( + new PollingError( + 'Failed to poll for work: Error: failed to work', + PollingErrorType.WorkError, + none + ) + ) +); + +describe('TaskClaimMetricsAggregator', () => { + let taskClaimMetricsAggregator: TaskClaimMetricsAggregator; + beforeEach(() => { + taskClaimMetricsAggregator = new TaskClaimMetricsAggregator(); + }); + + test('should correctly initialize', () => { + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 0, + total: 0, + duration: { counts: [], values: [] }, + }); + }); + + test('should correctly return initialMetrics', () => { + expect(taskClaimMetricsAggregator.initialMetric()).toEqual({ + success: 0, + total: 0, + duration: { counts: [], values: [] }, + }); + }); + + test('should correctly process task lifecycle success event', () => { + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 2, + total: 2, + duration: { counts: [2], values: [100] }, + }); + }); + + test('should correctly process task lifecycle failure event', () => { + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 0, + total: 2, + duration: { counts: [], values: [] }, + }); + }); + + test('should correctly reset counter', () => { + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimSuccessEvent); + taskClaimMetricsAggregator.processTaskLifecycleEvent(taskClaimFailureEvent); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 4, + total: 7, + duration: { counts: [4], values: [100] }, + }); + + taskClaimMetricsAggregator.reset(); + expect(taskClaimMetricsAggregator.collect()).toEqual({ + success: 0, + total: 0, + duration: { counts: [], values: [] }, + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts new file mode 100644 index 0000000000000..75c03105287fa --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_claim_metrics_aggregator.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +// @ts-expect-error +// eslint-disable-next-line import/no-extraneous-dependencies +import Histogram from 'native-hdr-histogram'; +import { isOk } from '../lib/result_type'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { TaskRun } from '../task_events'; +import { SuccessRate, SuccessRateCounter } from './success_rate_counter'; +import { ITaskMetricsAggregator } from './types'; + +const HDR_HISTOGRAM_MIN = 1; // 1 millis +const HDR_HISTOGRAM_MAX = 30000; // 30 seconds +const HDR_HISTOGRAM_BUCKET_SIZE = 100; // 100 millis + +export type TaskClaimMetric = SuccessRate & { + duration: { + counts: number[]; + values: number[]; + }; +}; + +export class TaskClaimMetricsAggregator implements ITaskMetricsAggregator { + private claimSuccessRate = new SuccessRateCounter(); + private durationHistogram = new Histogram(HDR_HISTOGRAM_MIN, HDR_HISTOGRAM_MAX); + + public initialMetric(): TaskClaimMetric { + return { + ...this.claimSuccessRate.initialMetric(), + duration: { counts: [], values: [] }, + }; + } + public collect(): TaskClaimMetric { + return { + ...this.claimSuccessRate.get(), + duration: this.serializeHistogram(), + }; + } + + public reset() { + this.claimSuccessRate.reset(); + this.durationHistogram.reset(); + } + + public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) { + const success = isOk((taskEvent as TaskRun).event); + this.claimSuccessRate.increment(success); + + if (taskEvent.timing) { + const durationInMs = taskEvent.timing.stop - taskEvent.timing.start; + this.durationHistogram.record(durationInMs); + } + } + + private serializeHistogram() { + const counts: number[] = []; + const values: number[] = []; + + for (const { count, value } of this.durationHistogram.linearcounts(HDR_HISTOGRAM_BUCKET_SIZE)) { + counts.push(count); + values.push(value); + } + + return { counts, values }; + } +} diff --git a/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts new file mode 100644 index 0000000000000..e3654fd9a21d5 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.test.ts @@ -0,0 +1,208 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import * as uuid from 'uuid'; +import { asOk, asErr } from '../lib/result_type'; +import { TaskStatus } from '../task'; +import { asTaskRunEvent, TaskPersistence } from '../task_events'; +import { TaskRunResult } from '../task_running'; +import { TaskRunMetricsAggregator } from './task_run_metrics_aggregator'; + +export const getTaskRunSuccessEvent = (type: string) => { + const id = uuid.v4(); + return asTaskRunEvent( + id, + asOk({ + task: { + id, + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: type, + params: {}, + ownerId: null, + }, + persistence: TaskPersistence.Recurring, + result: TaskRunResult.Success, + }), + { + start: 1689698780490, + stop: 1689698780500, + } + ); +}; + +export const getTaskRunFailedEvent = (type: string) => { + const id = uuid.v4(); + return asTaskRunEvent( + id, + asErr({ + error: new Error('task failed to run'), + task: { + id, + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: {}, + taskType: type, + params: {}, + ownerId: null, + }, + persistence: TaskPersistence.Recurring, + result: TaskRunResult.Failed, + }) + ); +}; + +describe('TaskRunMetricsAggregator', () => { + let taskRunMetricsAggregator: TaskRunMetricsAggregator; + beforeEach(() => { + taskRunMetricsAggregator = new TaskRunMetricsAggregator(); + }); + + test('should correctly initialize', () => { + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 0, total: 0 }, + by_type: {}, + }); + }); + + test('should correctly return initialMetrics', () => { + expect(taskRunMetricsAggregator.initialMetric()).toEqual({ + overall: { success: 0, total: 0 }, + by_type: {}, + }); + }); + + test('should correctly process task run success event', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 2, total: 2 }, + by_type: { + telemetry: { success: 2, total: 2 }, + }, + }); + }); + + test('should correctly process task run failure event', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 0, total: 2 }, + by_type: { + telemetry: { success: 0, total: 2 }, + }, + }); + }); + + test('should correctly process different task types', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 3, total: 4 }, + by_type: { + report: { success: 2, total: 2 }, + telemetry: { success: 1, total: 2 }, + }, + }); + }); + + test('should correctly group alerting and action task types', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:.email')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 11, total: 14 }, + by_type: { + actions: { success: 3, total: 3 }, + 'actions:.email': { success: 1, total: 1 }, + 'actions:webhook': { success: 2, total: 2 }, + alerting: { success: 5, total: 7 }, + 'alerting:example': { success: 3, total: 5 }, + 'alerting:.index-threshold': { success: 2, total: 2 }, + report: { success: 2, total: 2 }, + telemetry: { success: 1, total: 2 }, + }, + }); + }); + + test('should correctly reset counter', () => { + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:webhook')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('alerting:example')); + taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('actions:.email')); + taskRunMetricsAggregator.processTaskLifecycleEvent( + getTaskRunSuccessEvent('alerting:.index-threshold') + ); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 11, total: 14 }, + by_type: { + actions: { success: 3, total: 3 }, + 'actions:.email': { success: 1, total: 1 }, + 'actions:webhook': { success: 2, total: 2 }, + alerting: { success: 5, total: 7 }, + 'alerting:example': { success: 3, total: 5 }, + 'alerting:.index-threshold': { success: 2, total: 2 }, + report: { success: 2, total: 2 }, + telemetry: { success: 1, total: 2 }, + }, + }); + + taskRunMetricsAggregator.reset(); + expect(taskRunMetricsAggregator.collect()).toEqual({ + overall: { success: 0, total: 0 }, + by_type: { + actions: { success: 0, total: 0 }, + 'actions:.email': { success: 0, total: 0 }, + 'actions:webhook': { success: 0, total: 0 }, + alerting: { success: 0, total: 0 }, + 'alerting:example': { success: 0, total: 0 }, + 'alerting:.index-threshold': { success: 0, total: 0 }, + report: { success: 0, total: 0 }, + telemetry: { success: 0, total: 0 }, + }, + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts new file mode 100644 index 0000000000000..c25d80f112df1 --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/task_run_metrics_aggregator.ts @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { JsonObject } from '@kbn/utility-types'; +import { isOk, unwrap } from '../lib/result_type'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { ErroredTask, RanTask, TaskRun } from '../task_events'; +import { SuccessRate, SuccessRateCounter } from './success_rate_counter'; +import { ITaskMetricsAggregator } from './types'; + +const taskTypeGrouping = new Set(['alerting:', 'actions:']); + +export interface TaskRunMetric extends JsonObject { + overall: SuccessRate; + by_type: { + [key: string]: SuccessRate; + }; +} + +export class TaskRunMetricsAggregator implements ITaskMetricsAggregator { + private taskRunSuccessRate = new SuccessRateCounter(); + private taskRunCounter: Map = new Map(); + + public initialMetric(): TaskRunMetric { + return { + overall: this.taskRunSuccessRate.initialMetric(), + by_type: {}, + }; + } + + public collect(): TaskRunMetric { + return { + overall: this.taskRunSuccessRate.get(), + by_type: this.collectTaskTypeEntries(), + }; + } + + public reset() { + this.taskRunSuccessRate.reset(); + for (const taskType of this.taskRunCounter.keys()) { + this.taskRunCounter.get(taskType)!.reset(); + } + } + + public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) { + const { task }: RanTask | ErroredTask = unwrap((taskEvent as TaskRun).event); + const taskType = task.taskType; + + const taskTypeSuccessRate: SuccessRateCounter = + this.taskRunCounter.get(taskType) ?? new SuccessRateCounter(); + + const success = isOk((taskEvent as TaskRun).event); + this.taskRunSuccessRate.increment(success); + taskTypeSuccessRate.increment(success); + this.taskRunCounter.set(taskType, taskTypeSuccessRate); + + const taskTypeGroup = this.getTaskTypeGroup(taskType); + if (taskTypeGroup) { + const taskTypeGroupSuccessRate: SuccessRateCounter = + this.taskRunCounter.get(taskTypeGroup) ?? new SuccessRateCounter(); + taskTypeGroupSuccessRate.increment(success); + this.taskRunCounter.set(taskTypeGroup, taskTypeGroupSuccessRate); + } + } + + private collectTaskTypeEntries() { + const collected: Record = {}; + for (const [key, value] of this.taskRunCounter) { + collected[key] = value.get(); + } + return collected; + } + + private getTaskTypeGroup(taskType: string): string | undefined { + for (const group of taskTypeGrouping) { + if (taskType.startsWith(group)) { + return group.replaceAll(':', ''); + } + } + } +} diff --git a/x-pack/plugins/task_manager/server/metrics/types.ts b/x-pack/plugins/task_manager/server/metrics/types.ts new file mode 100644 index 0000000000000..7fbee1fe8abdd --- /dev/null +++ b/x-pack/plugins/task_manager/server/metrics/types.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { TaskLifecycleEvent } from '../polling_lifecycle'; + +export interface ITaskMetricsAggregator { + initialMetric: () => T; + collect: () => T; + reset: () => void; + processTaskLifecycleEvent: (taskEvent: TaskLifecycleEvent) => void; +} diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts index cdd67a07ff9e7..9507b3ab0e4cd 100644 --- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts @@ -19,7 +19,7 @@ import { import { asOk } from '../lib/result_type'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { TaskRunResult } from '../task_running'; -import { AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; import { BackgroundTaskUtilizationStat, diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts index fd116cbdd71d8..837f29c83f108 100644 --- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts @@ -20,7 +20,7 @@ import { TaskTiming, } from '../task_events'; import { MonitoredStat } from './monitoring_stats_stream'; -import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; import { createRunningAveragedStat } from './task_run_calcultors'; import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config'; diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index 98493ae89b683..689c9c882bee3 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -52,6 +52,7 @@ describe('Configuration Statistics Aggregator', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }; const managedConfig = { diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts index 6414c9e80ce06..2212affcc8db3 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.ts @@ -8,7 +8,7 @@ import { combineLatest, of } from 'rxjs'; import { pick, merge } from 'lodash'; import { map, startWith } from 'rxjs/operators'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; import { TaskManagerConfig } from '../config'; import { ManagedConfiguration } from '../lib/create_managed_configuration'; diff --git a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts index 8a2305c3076a5..8d4ef4fab2eba 100644 --- a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts @@ -26,7 +26,7 @@ import { SummarizedEphemeralTaskStat, EphemeralTaskStat, } from './ephemeral_task_statistics'; -import { AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { ephemeralTaskLifecycleMock } from '../ephemeral_task_lifecycle.mock'; import { times, takeRight, take as takeLeft } from 'lodash'; diff --git a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts index 52aa2b1eead25..8a6ade503b041 100644 --- a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts @@ -9,7 +9,7 @@ import { map, filter, startWith, buffer, share } from 'rxjs/operators'; import { JsonObject } from '@kbn/utility-types'; import { combineLatest, Observable, zip } from 'rxjs'; import { isOk, Ok } from '../lib/result_type'; -import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { isTaskRunEvent, isTaskManagerStatEvent } from '../task_events'; diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index 995db14fa09ea..daf3f2baf085d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -8,8 +8,9 @@ import { TaskManagerConfig } from '../config'; import { of, Subject } from 'rxjs'; import { take, bufferCount } from 'rxjs/operators'; -import { createMonitoringStatsStream, AggregatedStat } from './monitoring_stats_stream'; +import { createMonitoringStatsStream } from './monitoring_stats_stream'; import { JsonValue } from '@kbn/utility-types'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; beforeEach(() => { jest.resetAllMocks(); @@ -56,6 +57,7 @@ describe('createMonitoringStatsStream', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }; it('returns the initial config used to configure Task Manager', async () => { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index e1ff38d1c9607..62505a34d7f89 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -37,13 +37,11 @@ import { import { ConfigStat, createConfigurationAggregator } from './configuration_statistics'; import { TaskManagerConfig } from '../config'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; import { ManagedConfiguration } from '../lib/create_managed_configuration'; import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle'; import { CapacityEstimationStat, withCapacityEstimate } from './capacity_estimation'; import { AdHocTaskCounter } from '../lib/adhoc_task_counter'; - -export type { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; export interface MonitoringStats { last_update: string; diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index 4d69b23b699b7..91e81013b726f 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -30,7 +30,7 @@ import { TaskRunStat, SummarizedTaskRunStat, } from './task_run_statistics'; -import { AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { FillPoolResult } from '../lib/fill_pool'; import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; import { configSchema } from '../config'; diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 0c6063af19286..7b7db8cb25eed 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -10,7 +10,7 @@ import { filter, startWith, map } from 'rxjs/operators'; import { JsonObject, JsonValue } from '@kbn/utility-types'; import { isNumber, mapValues } from 'lodash'; import { Logger } from '@kbn/core/server'; -import { AggregatedStatProvider, AggregatedStat } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider, AggregatedStat } from '../lib/runtime_statistics_aggregator'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { isTaskRunEvent, diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index bacd05dcb6a06..b4d5db14a12e4 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -12,7 +12,7 @@ import { JsonObject } from '@kbn/utility-types'; import { keyBy, mapValues } from 'lodash'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { AggregationResultOf } from '@kbn/es-types'; -import { AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; import { parseIntervalAsSecond, asInterval, parseIntervalAsMillisecond } from '../lib/intervals'; import { HealthStatus } from './monitoring_stats_stream'; import { TaskStore } from '../task_store'; diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 4c0c96c7f76a6..1e7215d6d7a1b 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -77,6 +77,7 @@ const pluginInitializerContextParams = { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }; describe('TaskManagerPlugin', () => { diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index e65574cef779a..3b8ab4a54be1f 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -27,7 +27,7 @@ import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './tas import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store'; import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskScheduling } from './task_scheduling'; -import { backgroundTaskUtilizationRoute, healthRoute } from './routes'; +import { backgroundTaskUtilizationRoute, healthRoute, metricsRoute } from './routes'; import { createMonitoringStats, MonitoringStats } from './monitoring'; import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; import { EphemeralTask, ConcreteTaskInstance } from './task'; @@ -35,6 +35,7 @@ import { registerTaskManagerUsageCollector } from './usage'; import { TASK_MANAGER_INDEX } from './constants'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { setupIntervalLogging } from './lib/log_health_metrics'; +import { metricsStream, Metrics } from './metrics'; export interface TaskManagerSetupContract { /** @@ -82,6 +83,8 @@ export class TaskManagerPlugin private middleware: Middleware = createInitialMiddleware(); private elasticsearchAndSOAvailability$?: Observable; private monitoringStats$ = new Subject(); + private metrics$ = new Subject(); + private resetMetrics$ = new Subject(); private shouldRunBackgroundTasks: boolean; private readonly kibanaVersion: PluginInitializerContext['env']['packageInfo']['version']; private adHocTaskCounter: AdHocTaskCounter; @@ -155,6 +158,12 @@ export class TaskManagerPlugin getClusterClient: () => startServicesPromise.then(({ elasticsearch }) => elasticsearch.client), }); + metricsRoute({ + router, + metrics$: this.metrics$, + resetMetrics$: this.resetMetrics$, + taskManagerId: this.taskManagerId, + }); core.status.derivedStatus$.subscribe((status) => this.logger.debug(`status core.status.derivedStatus now set to ${status.level}`) @@ -276,6 +285,10 @@ export class TaskManagerPlugin this.ephemeralTaskLifecycle ).subscribe((stat) => this.monitoringStats$.next(stat)); + metricsStream(this.config!, this.resetMetrics$, this.taskPollingLifecycle).subscribe((metric) => + this.metrics$.next(metric) + ); + const taskScheduling = new TaskScheduling({ logger: this.logger, taskStore, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 62e6be589b4cf..79b153f42a88d 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -82,6 +82,7 @@ describe('TaskPollingLifecycle', () => { delay: 3000, max_attempts: 20, }, + metrics_reset_interval: 3000, }, taskStore: mockTaskStore, logger: taskManagerLogger, diff --git a/x-pack/plugins/task_manager/server/routes/index.ts b/x-pack/plugins/task_manager/server/routes/index.ts index f3ba539323f8e..372996f7cea3d 100644 --- a/x-pack/plugins/task_manager/server/routes/index.ts +++ b/x-pack/plugins/task_manager/server/routes/index.ts @@ -7,3 +7,4 @@ export { healthRoute } from './health'; export { backgroundTaskUtilizationRoute } from './background_task_utilization'; +export { metricsRoute } from './metrics'; diff --git a/x-pack/plugins/task_manager/server/routes/metrics.test.ts b/x-pack/plugins/task_manager/server/routes/metrics.test.ts new file mode 100644 index 0000000000000..a9703aa7548dd --- /dev/null +++ b/x-pack/plugins/task_manager/server/routes/metrics.test.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { of, Subject } from 'rxjs'; +import { v4 as uuidv4 } from 'uuid'; +import { httpServiceMock } from '@kbn/core/server/mocks'; +import { metricsRoute } from './metrics'; +import { mockHandlerArguments } from './_mock_handler_arguments'; + +describe('metricsRoute', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + it('registers route', async () => { + const router = httpServiceMock.createRouter(); + metricsRoute({ + router, + metrics$: of(), + resetMetrics$: new Subject(), + taskManagerId: uuidv4(), + }); + + const [config] = router.get.mock.calls[0]; + + expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`); + }); + + it('emits resetMetric$ event when route is accessed and reset query param is true', async () => { + let resetCalledTimes = 0; + const resetMetrics$ = new Subject(); + + resetMetrics$.subscribe(() => { + resetCalledTimes++; + }); + const router = httpServiceMock.createRouter(); + metricsRoute({ + router, + metrics$: of(), + resetMetrics$, + taskManagerId: uuidv4(), + }); + + const [config, handler] = router.get.mock.calls[0]; + const [context, req, res] = mockHandlerArguments({}, { query: { reset: true } }, ['ok']); + + expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`); + + await handler(context, req, res); + + expect(resetCalledTimes).toEqual(1); + }); + + it('does not emit resetMetric$ event when route is accessed and reset query param is false', async () => { + let resetCalledTimes = 0; + const resetMetrics$ = new Subject(); + + resetMetrics$.subscribe(() => { + resetCalledTimes++; + }); + const router = httpServiceMock.createRouter(); + metricsRoute({ + router, + metrics$: of(), + resetMetrics$, + taskManagerId: uuidv4(), + }); + + const [config, handler] = router.get.mock.calls[0]; + const [context, req, res] = mockHandlerArguments({}, { query: { reset: false } }, ['ok']); + + expect(config.path).toMatchInlineSnapshot(`"/api/task_manager/metrics"`); + + await handler(context, req, res); + + expect(resetCalledTimes).toEqual(0); + }); +}); diff --git a/x-pack/plugins/task_manager/server/routes/metrics.ts b/x-pack/plugins/task_manager/server/routes/metrics.ts new file mode 100644 index 0000000000000..f9dcf447fa101 --- /dev/null +++ b/x-pack/plugins/task_manager/server/routes/metrics.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + IRouter, + RequestHandlerContext, + KibanaRequest, + IKibanaResponse, + KibanaResponseFactory, +} from '@kbn/core/server'; +import { schema, TypeOf } from '@kbn/config-schema'; +import { Observable, Subject } from 'rxjs'; +import { Metrics } from '../metrics'; + +export interface NodeMetrics { + process_uuid: string; + timestamp: string; + last_update: string; + metrics: Metrics['metrics'] | null; +} + +export interface MetricsRouteParams { + router: IRouter; + metrics$: Observable; + resetMetrics$: Subject; + taskManagerId: string; +} + +const QuerySchema = schema.object({ + reset: schema.boolean({ defaultValue: true }), +}); + +export function metricsRoute(params: MetricsRouteParams) { + const { router, metrics$, resetMetrics$, taskManagerId } = params; + + let lastMetrics: NodeMetrics | null = null; + + metrics$.subscribe((metrics) => { + lastMetrics = { process_uuid: taskManagerId, timestamp: new Date().toISOString(), ...metrics }; + }); + + router.get( + { + path: `/api/task_manager/metrics`, + // Uncomment when we determine that we can restrict API usage to Global admins based on telemetry + // options: { tags: ['access:taskManager'] }, + validate: { + query: QuerySchema, + }, + }, + async function ( + _: RequestHandlerContext, + req: KibanaRequest, unknown>, + res: KibanaResponseFactory + ): Promise { + if (req.query.reset) { + resetMetrics$.next(true); + } + + return res.ok({ + body: lastMetrics + ? lastMetrics + : { process_uuid: taskManagerId, timestamp: new Date().toISOString(), metrics: {} }, + }); + } + ); +} diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 97eeb17f0cd4e..7e897840f72c7 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -1298,6 +1298,45 @@ describe('TaskManagerRunner', () => { ); }); + test('emits TaskEvent when a recurring task returns a success result with hasError=true', async () => { + const id = _.random(1, 20).toString(); + const runAt = minutesFromNow(_.random(5)); + const onTaskEvent = jest.fn(); + const { runner, instance } = await readyToRunStageSetup({ + onTaskEvent, + instance: { + id, + schedule: { interval: '1m' }, + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + return { runAt, state: {}, hasError: true }; + }, + }), + }, + }, + }); + + await runner.run(); + + expect(onTaskEvent).toHaveBeenCalledWith( + withAnyTiming( + asTaskRunEvent( + id, + asErr({ + task: instance, + persistence: TaskPersistence.Recurring, + result: TaskRunResult.Success, + error: new Error(`Alerting task failed to run.`), + }) + ) + ) + ); + }); + test('emits TaskEvent when a task run throws an error', async () => { const id = _.random(1, 20).toString(); const error = new Error('Dangit!'); diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 8ad020684e269..e8ec5cb0f2d91 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -47,6 +47,7 @@ import { FailedRunResult, FailedTaskResult, isFailedRunResult, + RunContext, SuccessfulRunResult, TaskDefinition, TaskStatus, @@ -321,9 +322,9 @@ export class TaskManagerRunner implements TaskRunner { let taskParamsValidation; if (this.requeueInvalidTasksConfig.enabled) { - taskParamsValidation = this.validateTaskParams(); + taskParamsValidation = this.validateTaskParams(modifiedContext); if (!taskParamsValidation.error) { - taskParamsValidation = await this.validateIndirectTaskParams(); + taskParamsValidation = await this.validateIndirectTaskParams(modifiedContext); } } @@ -359,9 +360,9 @@ export class TaskManagerRunner implements TaskRunner { } } - private validateTaskParams() { + private validateTaskParams({ taskInstance }: RunContext) { let error; - const { state, taskType, params, id, numSkippedRuns = 0 } = this.instance.task; + const { state, taskType, params, id, numSkippedRuns = 0 } = taskInstance; const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig; try { @@ -383,9 +384,9 @@ export class TaskManagerRunner implements TaskRunner { return { ...(error ? { error } : {}), state }; } - private async validateIndirectTaskParams() { + private async validateIndirectTaskParams({ taskInstance }: RunContext) { let error; - const { state, taskType, id, numSkippedRuns = 0 } = this.instance.task; + const { state, taskType, id, numSkippedRuns = 0 } = taskInstance; const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig; const indirectParamsSchema = this.definition.indirectParamsSchema; @@ -735,23 +736,30 @@ export class TaskManagerRunner implements TaskRunner { await eitherAsync( result, - async ({ runAt, schedule }: SuccessfulRunResult) => { - this.onTaskEvent( - asTaskRunEvent( - this.id, - asOk({ - task, - persistence: - schedule || task.schedule - ? TaskPersistence.Recurring - : TaskPersistence.NonRecurring, - result: await (runAt || schedule || task.schedule - ? this.processResultForRecurringTask(result) - : this.processResultWhenDone()), - }), - taskTiming - ) - ); + async ({ runAt, schedule, hasError }: SuccessfulRunResult) => { + const processedResult = { + task, + persistence: + schedule || task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring, + result: await (runAt || schedule || task.schedule + ? this.processResultForRecurringTask(result) + : this.processResultWhenDone()), + }; + + // Alerting task runner returns SuccessfulRunResult with hasError=true + // when the alerting task fails, so we check for this condition in order + // to emit the correct task run event for metrics collection + const taskRunEvent = hasError + ? asTaskRunEvent( + this.id, + asErr({ + ...processedResult, + error: new Error(`Alerting task failed to run.`), + }), + taskTiming + ) + : asTaskRunEvent(this.id, asOk(processedResult), taskTiming); + this.onTaskEvent(taskRunEvent); }, async ({ error }: FailedRunResult) => { this.onTaskEvent( diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts index af17d1b76ed99..420dfe795f322 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.ts @@ -10,6 +10,7 @@ import { FtrProviderContext } from '../../ftr_provider_context'; export default function ({ loadTestFile }: FtrProviderContext) { describe('task_manager', function taskManagerSuite() { loadTestFile(require.resolve('./background_task_utilization_route')); + loadTestFile(require.resolve('./metrics_route')); loadTestFile(require.resolve('./health_route')); loadTestFile(require.resolve('./task_management')); loadTestFile(require.resolve('./task_management_scheduled_at')); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts new file mode 100644 index 0000000000000..4da679b6839ac --- /dev/null +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/metrics_route.ts @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import url from 'url'; +import supertest from 'supertest'; +import { NodeMetrics } from '@kbn/task-manager-plugin/server/routes/metrics'; +import { FtrProviderContext } from '../../ftr_provider_context'; + +export default function ({ getService }: FtrProviderContext) { + const config = getService('config'); + const retry = getService('retry'); + const request = supertest(url.format(config.get('servers.kibana'))); + + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + + function getMetricsRequest(reset: boolean = false) { + return request + .get(`/api/task_manager/metrics${reset ? '' : '?reset=false'}`) + .set('kbn-xsrf', 'foo') + .expect(200) + .then((response) => response.body); + } + + function getMetrics( + reset: boolean = false, + callback: (metrics: NodeMetrics) => boolean + ): Promise { + return retry.try(async () => { + const metrics = await getMetricsRequest(reset); + + if (metrics.metrics && callback(metrics)) { + return metrics; + } + + await delay(500); + throw new Error('Expected metrics not received'); + }); + } + + describe('task manager metrics', () => { + describe('task claim', () => { + it('should increment task claim success/total counters', async () => { + // counters are reset every 30 seconds, so wait until the start of a + // fresh counter cycle to make sure values are incrementing + const initialMetrics = ( + await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1) + ).metrics; + expect(initialMetrics).not.to.be(null); + expect(initialMetrics?.task_claim).not.to.be(null); + expect(initialMetrics?.task_claim?.value).not.to.be(null); + + let previousTaskClaimSuccess = initialMetrics?.task_claim?.value.total!; + let previousTaskClaimTotal = initialMetrics?.task_claim?.value.success!; + let previousTaskClaimTimestamp: string = initialMetrics?.task_claim?.timestamp!; + + for (let i = 0; i < 5; ++i) { + const metrics = ( + await getMetrics( + false, + (m: NodeMetrics) => m.metrics?.task_claim?.timestamp !== previousTaskClaimTimestamp + ) + ).metrics; + expect(metrics).not.to.be(null); + expect(metrics?.task_claim).not.to.be(null); + expect(metrics?.task_claim?.value).not.to.be(null); + + expect(metrics?.task_claim?.value.success).to.be.greaterThan(previousTaskClaimSuccess); + expect(metrics?.task_claim?.value.total).to.be.greaterThan(previousTaskClaimTotal); + + previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!; + previousTaskClaimSuccess = metrics?.task_claim?.value.success!; + previousTaskClaimTotal = metrics?.task_claim?.value.total!; + + // check that duration histogram exists + expect(metrics?.task_claim?.value.duration).not.to.be(null); + expect(Array.isArray(metrics?.task_claim?.value.duration.counts)).to.be(true); + expect(Array.isArray(metrics?.task_claim?.value.duration.values)).to.be(true); + } + }); + + it('should reset task claim success/total counters at an interval', async () => { + const initialCounterValue = 7; + const initialMetrics = ( + await getMetrics( + false, + (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue + ) + ).metrics; + expect(initialMetrics).not.to.be(null); + expect(initialMetrics?.task_claim).not.to.be(null); + expect(initialMetrics?.task_claim?.value).not.to.be(null); + + // retry until counter value resets + const resetMetrics = ( + await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1) + ).metrics; + expect(resetMetrics).not.to.be(null); + expect(resetMetrics?.task_claim).not.to.be(null); + expect(resetMetrics?.task_claim?.value).not.to.be(null); + }); + + it('should reset task claim success/total counters on request', async () => { + const initialCounterValue = 1; + const initialMetrics = ( + await getMetrics( + false, + (metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue + ) + ).metrics; + expect(initialMetrics).not.to.be(null); + expect(initialMetrics?.task_claim).not.to.be(null); + expect(initialMetrics?.task_claim?.value).not.to.be(null); + + let previousTaskClaimTimestamp: string = initialMetrics?.task_claim?.timestamp!; + + for (let i = 0; i < 5; ++i) { + const metrics = ( + await getMetrics( + true, + (m: NodeMetrics) => m.metrics?.task_claim?.timestamp !== previousTaskClaimTimestamp + ) + ).metrics; + expect(metrics).not.to.be(null); + expect(metrics?.task_claim).not.to.be(null); + expect(metrics?.task_claim?.value).not.to.be(null); + + expect(metrics?.task_claim?.value.success).to.equal(1); + expect(metrics?.task_claim?.value.total).to.equal(1); + + previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!; + + // check that duration histogram exists + expect(metrics?.task_claim?.value.duration).not.to.be(null); + expect(Array.isArray(metrics?.task_claim?.value.duration.counts)).to.be(true); + expect(Array.isArray(metrics?.task_claim?.value.duration.values)).to.be(true); + } + }); + }); + + describe('task run test', () => { + let ruleId: string | null = null; + before(async () => { + // create a rule that fires actions + const rule = await request + .post(`/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send({ + enabled: true, + name: 'test rule', + tags: [], + rule_type_id: '.es-query', + consumer: 'alerts', + // set schedule long so we can control when it runs + schedule: { interval: '1d' }, + actions: [], + params: { + aggType: 'count', + esQuery: '{\n "query":{\n "match_all" : {}\n }\n}', + excludeHitsFromPreviousRun: false, + groupBy: 'all', + index: ['.kibana-event-log*'], + searchType: 'esQuery', + size: 100, + termSize: 5, + threshold: [0], + thresholdComparator: '>', + timeField: '@timestamp', + timeWindowSize: 5, + timeWindowUnit: 'm', + }, + }) + .expect(200) + .then((response) => response.body); + + ruleId = rule.id; + }); + + after(async () => { + // delete rule + await request.delete(`/api/alerting/rule/${ruleId}`).set('kbn-xsrf', 'foo').expect(204); + }); + + it('should increment task run success/total counters', async () => { + const initialMetrics = ( + await getMetrics( + false, + (metrics) => + metrics?.metrics?.task_run?.value.by_type.alerting?.total === 1 && + metrics?.metrics?.task_run?.value.by_type.alerting?.success === 1 + ) + ).metrics; + expect(initialMetrics).not.to.be(null); + expect(initialMetrics?.task_claim).not.to.be(null); + expect(initialMetrics?.task_claim?.value).not.to.be(null); + + for (let i = 0; i < 1; ++i) { + // run the rule and expect counters to increment + await request + .post('/api/sample_tasks/run_soon') + .set('kbn-xsrf', 'xxx') + .send({ task: { id: ruleId } }) + .expect(200); + + await getMetrics( + false, + (metrics) => + metrics?.metrics?.task_run?.value.by_type.alerting?.total === i + 2 && + metrics?.metrics?.task_run?.value.by_type.alerting?.success === i + 2 + ); + } + + // counter should reset on its own + await getMetrics( + false, + (metrics) => + metrics?.metrics?.task_run?.value.by_type.alerting?.total === 0 && + metrics?.metrics?.task_run?.value.by_type.alerting?.success === 0 + ); + }); + }); + }); +} From 17936ffd21d4b4b274d2cda90902764ed0d4ae07 Mon Sep 17 00:00:00 2001 From: Tiago Costa Date: Thu, 10 Aug 2023 15:35:34 +0100 Subject: [PATCH 2/3] fix(NA): yarn env vars for node_modules mirrors (#163549) This PR fixes the setup we have for the node_module mirrors vars that are overriding and pointing into our middle cache. The previous configuration was not working as intended as the env vars set globally on CI never ended up in the bazel managed yarn install. Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- WORKSPACE.bazel | 4 ++++ src/dev/ci_setup/setup_env.sh | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel index 0b5c0d0bc3634..baec139453143 100644 --- a/WORKSPACE.bazel +++ b/WORKSPACE.bazel @@ -57,7 +57,11 @@ yarn_install( quiet = False, frozen_lockfile = False, environment = { + "GECKODRIVER_CDNURL": "https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache", + "CHROMEDRIVER_CDNURL": "https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache", + "CHROMEDRIVER_CDNBINARIESURL": "https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache", "SASS_BINARY_SITE": "https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache/node-sass", "RE2_DOWNLOAD_MIRROR": "https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache/node-re2", + "CYPRESS_DOWNLOAD_MIRROR": "https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache/cypress", } ) diff --git a/src/dev/ci_setup/setup_env.sh b/src/dev/ci_setup/setup_env.sh index 2bcf3775183e5..c1942775c88b5 100644 --- a/src/dev/ci_setup/setup_env.sh +++ b/src/dev/ci_setup/setup_env.sh @@ -128,10 +128,10 @@ export PATH="$PATH:$yarnGlobalDir" # use a proxy to fetch chromedriver/geckodriver asset export GECKODRIVER_CDNURL="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache" -export CHROMEDRIVER_LEGACY_CDNURL="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache" export CHROMEDRIVER_CDNURL="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache" export CHROMEDRIVER_CDNBINARIESURL="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache" export RE2_DOWNLOAD_MIRROR="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache" +export SASS_BINARY_SITE="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache/node-sass" export CYPRESS_DOWNLOAD_MIRROR="https://us-central1-elastic-kibana-184716.cloudfunctions.net/kibana-ci-proxy-cache/cypress" export CHECKS_REPORTER_ACTIVE=false From ea57caed71e5a9eca5325fc9e16f0bd34ebf3c7a Mon Sep 17 00:00:00 2001 From: Tim Sullivan Date: Thu, 10 Aug 2023 07:43:05 -0700 Subject: [PATCH 3/3] unskip license type functional test (#163199) --- x-pack/test/licensing_plugin/scenario.ts | 16 ++++++++-------- x-pack/test/licensing_plugin/server/updates.ts | 3 +-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/x-pack/test/licensing_plugin/scenario.ts b/x-pack/test/licensing_plugin/scenario.ts index f7d2f5790460d..051a937d98d35 100644 --- a/x-pack/test/licensing_plugin/scenario.ts +++ b/x-pack/test/licensing_plugin/scenario.ts @@ -74,18 +74,18 @@ export function createScenario({ getService, getPageObjects }: FtrProviderContex .post('/_license/?acknowledge=true') .send({ license: { - uid: '00000000-d3ad-7357-c0d3-000000000000', + uid: '504430e6-503c-4316-85cb-b402c730ca08', type: 'enterprise', - issue_date_in_millis: 1577836800000, - start_date_in_millis: 1577836800000, - // expires 2022-12-31 - expiry_date_in_millis: 1672531199999, + issue_date_in_millis: 1669680000000, + start_date_in_millis: 1669680000000, + // expires 2024-12-31 + expiry_date_in_millis: 1735689599999, max_resource_units: 250, max_nodes: null, - issued_to: 'Elastic Internal Use (development environments)', - issuer: 'Elastic', + issued_to: 'Elastic - INTERNAL (development environments)', + issuer: 'API', signature: - 'AAAABQAAAA1gHUVis7hel8b8nNCAAAAAIAo5/x6hrsGh1GqqrJmy4qgmEC7gK0U4zQ6q5ZEMhm4jAAABAKMR+w3KZsMJfG5jNWgZXJLwRmiNqN7k94vKFgRdj1yM+gA9ufhXIn9d01OvFhPjilIqm+fxVjCxXwGKbFRiwtTWnTYjXPuNml+qCFGgUWguWEcVoIW6VU7/lYOqMJ4EB4zOMLe93P267iaDm542aelQrW1OJ69lGGuPBik8v9r1bNZzKBQ99VUr/qoosGDAm0udh2HxWzYoCL5lDML5Niy87xlVCubSSBXdUXzUgdZKKk6pKaMdHswB1gjvEfnwqPxEWAyrV0BCr/T1WehXd7U4p6/zt6sJ6cPh+34AZe9g4+3WPKrZhX4iaSHMDDHn4HNjO72CZ2oi42ZDNnJ37tA=', + 'AAAABQAAAA2h1vBafHuRhjOHREKYAAAAIAo5/x6hrsGh1GqqrJmy4qgmEC7gK0U4zQ6q5ZEMhm4jAAABAByGz9MmRW/L7vQriISa6u8Oov7zykA+Cv55BToWEthSn0c5KQUxcWG+K5Cm4/OkFsXA8TE4zFnlSgYxmQi2Eqq7IAKGdcxI/xhQfMsq5RWlSEwtfyV0M2RKJxgam8o2lvKC9EbrU76ISYr7jTkgoBl6GFSjdfXMHmxNXBSKDDm03ZeXkWkvuNNFrHJuYivf2Se9OeeB/eu4jqUI0UuNfPYF07ZcYvtKfj3KX+aysCSV2FW8wgyAjndOPEinfYcwAJ09zcl+MTig2K0DQTsYkLykXmzZnLz6qeuVVFjCTowxizDFW+5MrpzUnwkjqv8CFhLfvxG7waWQWslv8fXLUn8=', }, }) .auth('license_manager_user', 'license_manager_user-password') diff --git a/x-pack/test/licensing_plugin/server/updates.ts b/x-pack/test/licensing_plugin/server/updates.ts index 6acbe42bc1abe..ccec87dc0cdc6 100644 --- a/x-pack/test/licensing_plugin/server/updates.ts +++ b/x-pack/test/licensing_plugin/server/updates.ts @@ -17,8 +17,7 @@ export default function (ftrContext: FtrProviderContext) { const scenario = createScenario(ftrContext); - // FLAKY: https://github.com/elastic/kibana/issues/110938 - describe.skip('changes in license types', () => { + describe('changes in license types', () => { after(async () => { await scenario.teardown(); });