From 8130f489d87f2b0b3083b528c241fb076cf8e345 Mon Sep 17 00:00:00 2001
From: Patrick Mueller
Date: Wed, 1 Sep 2021 19:35:07 -0400
Subject: [PATCH] [7.14] [task manager] provide better diagnostics when task
 manager performance is degraded (#109741) (#110875)

* [task manager] provide better diagnostics when task manager performance is degraded (#109741)

resolves #109095
resolves #106854

Changes the way task manager and alerting perform their health / status checks:

- no longer set an `unavailable` status; use `degraded` instead
- change the task manager "hot stats freshness" calculation to allow for staler data before signalling a problem
- change the "Detected potential performance issue" message to sound less alarming, include a doc link to the task manager health monitoring docs, and log it at debug level instead of warning
- add debug logging when task manager sets a status other than `available`, indicating why it is setting that status (in the code, when task manager uses HealthStatus.Warning or Error)

# Conflicts:
#	x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
#	x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts
#	x-pack/plugins/task_manager/server/routes/health.test.ts

* fix backport to remove post-7.14 stuff
---
 .../alerting/server/health/get_state.test.ts  |  8 +--
 .../alerting/server/health/get_state.ts       | 10 ++--
 x-pack/plugins/alerting/server/plugin.ts      |  2 +-
 .../server/lib/calculate_health_status.ts     | 44 ++++++++++------
 .../server/lib/log_health_metrics.test.ts     | 15 +++---
 .../server/lib/log_health_metrics.ts          | 10 ++--
 .../monitoring/capacity_estimation.test.ts    | 20 +++++++
 .../server/monitoring/capacity_estimation.ts  | 46 +++++++++++++---
 .../monitoring/monitoring_stats_stream.ts     |  5 +-
 .../monitoring/task_run_statistics.test.ts    | 13 +++--
 .../server/monitoring/task_run_statistics.ts  | 34 +++++++++---
 x-pack/plugins/task_manager/server/plugin.ts  |  8 +++
 .../task_manager/server/routes/health.test.ts | 52 +++++++++++--------
 .../task_manager/server/routes/health.ts      |  8 ++-
 14 files changed, 192 insertions(+), 83 deletions(-)

diff --git a/x-pack/plugins/alerting/server/health/get_state.test.ts b/x-pack/plugins/alerting/server/health/get_state.test.ts
index 24f3c101b26b..fdb09aac5ce6 100644
--- a/x-pack/plugins/alerting/server/health/get_state.test.ts
+++ b/x-pack/plugins/alerting/server/health/get_state.test.ts
@@ -211,8 +211,8 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
       })
     ).toPromise();
 
-    expect(status.level).toEqual(ServiceStatusLevels.unavailable);
-    expect(status.summary).toEqual('Alerting framework is unavailable');
+    expect(status.level).toEqual(ServiceStatusLevels.degraded);
+    expect(status.summary).toEqual('Alerting framework is degraded');
     expect(status.meta).toBeUndefined();
   });
 
@@ -268,8 +268,8 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
       }),
       retryDelay
     ).subscribe((status) => {
-      expect(status.level).toEqual(ServiceStatusLevels.unavailable);
-      expect(status.summary).toEqual('Alerting framework is unavailable');
+      expect(status.level).toEqual(ServiceStatusLevels.degraded);
+      expect(status.summary).toEqual('Alerting framework is degraded');
       expect(status.meta).toEqual({ error: err });
     });
 
diff --git a/x-pack/plugins/alerting/server/health/get_state.ts b/x-pack/plugins/alerting/server/health/get_state.ts
index 255037d7015a..34f897ad5b73 100644
--- a/x-pack/plugins/alerting/server/health/get_state.ts
+++ b/x-pack/plugins/alerting/server/health/get_state.ts
@@ -73,9 +73,7 @@ const getHealthServiceStatus = async (
   const level =
     doc.state?.health_status === HealthStatus.OK
       ? ServiceStatusLevels.available
-      : doc.state?.health_status === HealthStatus.Warning
-      ? ServiceStatusLevels.degraded
-      : ServiceStatusLevels.unavailable;
+      : ServiceStatusLevels.degraded;
   return {
     level,
     summary: LEVEL_SUMMARY[level.toString()],
@@ -102,10 +100,10 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (
       );
     }),
     catchError((error) => {
-      logger.warn(`Alerting framework is unavailable due to the error: ${error}`);
+      logger.warn(`Alerting framework is degraded due to the error: ${error}`);
       return of({
-        level: ServiceStatusLevels.unavailable,
-        summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
+        level: ServiceStatusLevels.degraded,
+        summary: LEVEL_SUMMARY[ServiceStatusLevels.degraded.toString()],
         meta: { error },
       });
     })
diff --git a/x-pack/plugins/alerting/server/plugin.ts b/x-pack/plugins/alerting/server/plugin.ts
index df63625bf242..ba74f83eb5d3 100644
--- a/x-pack/plugins/alerting/server/plugin.ts
+++ b/x-pack/plugins/alerting/server/plugin.ts
@@ -234,7 +234,7 @@ export class AlertingPlugin {
     );
 
     const serviceStatus$ = new BehaviorSubject<ServiceStatus>({
-      level: ServiceStatusLevels.unavailable,
+      level: ServiceStatusLevels.degraded,
       summary: 'Alerting is initializing',
     });
     core.status.set(serviceStatus$);
diff --git a/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts b/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts
index 7a6bc5986210..529733668c5f 100644
--- a/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts
+++ b/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts
@@ -9,32 +9,42 @@ import { isString } from 'lodash';
 import { JsonValue } from '@kbn/common-utils';
 import { HealthStatus, RawMonitoringStats } from '../monitoring';
 import { TaskManagerConfig } from '../config';
+import { Logger } from '../../../../../src/core/server';
 
 export function calculateHealthStatus(
   summarizedStats: RawMonitoringStats,
-  config: TaskManagerConfig
+  config: TaskManagerConfig,
+  logger: Logger
 ): HealthStatus {
   const now = Date.now();
 
-  // if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default)
-  // consider the system unhealthy
-  const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
+  // if "hot" health stats are any more stale than monitored_stats_required_freshness
+  // times a multiplier, consider the system unhealthy
+  const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness * 3;
 
-  // if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy
+  // if "cold" health stats are any more stale than the configured refresh
+  // times a multiplier, consider the system unhealthy
   const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5;
 
-  /**
-   * If the monitored stats aren't fresh, return a red status
-   */
-  const healthStatus =
-    hasStatus(summarizedStats.stats, HealthStatus.Error) ||
-    hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) ||
-    hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)
-      ? HealthStatus.Error
-      : hasStatus(summarizedStats.stats, HealthStatus.Warning)
-      ? HealthStatus.Warning
-      : HealthStatus.OK;
-  return healthStatus;
+  if (hasStatus(summarizedStats.stats, HealthStatus.Error)) {
+    return HealthStatus.Error;
+  }
+
+  if (hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness)) {
+    logger.debug('setting HealthStatus.Error because of expired hot timestamps');
+    return HealthStatus.Error;
+  }
+
+  if (hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)) {
+    logger.debug('setting HealthStatus.Error because of expired cold timestamps');
+    return HealthStatus.Error;
+  }
+
+  if (hasStatus(summarizedStats.stats, HealthStatus.Warning)) {
+    return HealthStatus.Warning;
+  }
+
+  return HealthStatus.OK;
 }
 
 function hasStatus(stats: RawMonitoringStats['stats'], status: HealthStatus): boolean {
diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
index aca73a4b7743..77cdecb0e4c9 100644
--- a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
+++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
@@ -50,12 +50,15 @@ describe('logHealthMetrics', () => {
     (calculateHealthStatus as jest.Mock).mockImplementation(() => HealthStatus.Error);
     logHealthMetrics(health, logger, config);
 
-    expect((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).toBe(
-      `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
-    );
-    expect((logger as jest.Mocked<Logger>).warn.mock.calls[1][0] as string).toBe(
-      `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
-    );
+    const debugCalls = (logger as jest.Mocked<Logger>).debug.mock.calls;
+    const performanceMessage = /^Task Manager detected a degradation in performance/;
+    const lastStatsMessage = /^Latest Monitored Stats: \{.*\}$/;
+    expect(debugCalls[0][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[1][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[2][0] as string).toMatch(performanceMessage);
+    expect(debugCalls[3][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[4][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[5][0] as string).toMatch(performanceMessage);
   });
 
   it('should not log a warning message to enable verbose logging when the status goes from Warning to OK', () => {
diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
index e8511b1e8c71..d541ffb5684d 100644
--- a/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
+++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
@@ -5,6 +5,8 @@
  * 2.0.
  */
 
+import { kibanaPackageJson } from '@kbn/utils';
+
 import { isEmpty } from 'lodash';
 import { Logger } from '../../../../../src/core/server';
 import { HealthStatus } from '../monitoring';
@@ -36,7 +38,7 @@ export function logHealthMetrics(
       capacity_estimation: undefined,
     },
   };
-  const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config);
+  const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config, logger);
   if (statusWithoutCapacity === HealthStatus.Warning) {
     logLevel = LogLevel.Warn;
   } else if (statusWithoutCapacity === HealthStatus.Error && !isEmpty(monitoredHealth.stats)) {
@@ -44,6 +46,8 @@ export function logHealthMetrics(
   }
 
   const message = `Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`;
+  const docLink = `https://www.elastic.co/guide/en/kibana/${kibanaPackageJson.branch}/task-manager-health-monitoring.html`;
+  const detectedProblemMessage = `Task Manager detected a degradation in performance. This is usually temporary, and Kibana can recover automatically. If the problem persists, check the docs for troubleshooting information: ${docLink} .`;
   if (enabled) {
     const driftInSeconds = (monitoredHealth.stats.runtime?.value.drift.p99 ?? 0) / 1000;
     if (
@@ -80,9 +84,7 @@ export function logHealthMetrics(
     // This is legacy support - we used to always show this
     logger.debug(message);
     if (logLevel !== LogLevel.Debug && lastLogLevel === LogLevel.Debug) {
-      logger.warn(
-        `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
-      );
+      logger.debug(detectedProblemMessage);
     }
   }
 
diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
index bd8ecf0cc6d9..3ea286298603 100644
--- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
@@ -7,11 +7,19 @@
 
 import { CapacityEstimationParams, estimateCapacity } from './capacity_estimation';
 import { HealthStatus, RawMonitoringStats } from './monitoring_stats_stream';
+import { mockLogger } from '../test_utils';
 
 describe('estimateCapacity', () => {
+  const logger = mockLogger();
+
+  beforeAll(() => {
+    jest.resetAllMocks();
+  });
+
   test('estimates the max throughput per minute based on the workload and the assumed kibana instances', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -67,6 +75,7 @@
   test('reduces the available capacity per kibana when average task duration exceeds the poll interval', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -124,6 +133,7 @@
   test('estimates the max throughput per minute when duration by persistence is empty', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -160,6 +170,7 @@
   test('estimates the max throughput per minute based on the workload and the assumed kibana instances when there are tasks that repeat each hour or day', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -215,6 +226,7 @@
   test('estimates the max throughput available when there are no active Kibana', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -271,6 +283,7 @@
   test('estimates the max throughput available to handle the workload when there are multiple active kibana instances', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -332,6 +345,7 @@
 
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -412,6 +426,7 @@
 
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -493,6 +508,7 @@
   test('marks estimated capacity as OK state when workload and load suggest capacity is sufficient', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -557,6 +573,7 @@
   test('marks estimated capacity as Warning state when capacity is insufficient for recent spikes of non-recurring workload, but sufficient for the recurring workload', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -618,6 +635,7 @@
   test('marks estimated capacity as Error state when workload and load suggest capacity is insufficient', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -679,6 +697,7 @@
   test('recommmends a 20% increase in kibana when a spike in non-recurring tasks forces recurring task capacity to zero', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -754,6 +773,7 @@
   test('recommmends a 20% increase in kibana when a spike in non-recurring tasks in a system with insufficient capacity even for recurring tasks', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
index 90f564152c8c..70a880ee4a4a 100644
--- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
@@ -12,6 +12,7 @@ import { RawMonitoringStats, RawMonitoredStat, HealthStatus } from './monitoring
 import { AveragedStat } from './task_run_calcultors';
 import { TaskPersistenceTypes } from './task_run_statistics';
 import { asErr, asOk, map, Result } from '../lib/result_type';
+import { Logger } from '../../../../../src/core/server';
 
 export interface CapacityEstimationStat extends JsonObject {
   observed: {
@@ -44,6 +45,7 @@ function isCapacityEstimationParams(
 }
 
 export function estimateCapacity(
+  logger: Logger,
   capacityStats: CapacityEstimationParams
 ): RawMonitoredStat<CapacityEstimationStat> {
   const workload = capacityStats.workload.value;
@@ -183,13 +185,13 @@ export function estimateCapacity(
     averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana +
     averageRecurringRequiredPerMinute / assumedKibanaInstances;
 
+  const status = getHealthStatus(logger, {
+    assumedRequiredThroughputPerMinutePerKibana,
+    assumedAverageRecurringRequiredThroughputPerMinutePerKibana,
+    capacityPerMinutePerKibana,
+  });
   return {
-    status:
-      assumedRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana
-        ? HealthStatus.OK
-        : assumedAverageRecurringRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana
-        ? HealthStatus.Warning
-        : HealthStatus.Error,
+    status,
     timestamp: new Date().toISOString(),
     value: {
       observed: mapValues(
@@ -220,13 +222,43 @@ export function estimateCapacity(
   };
 }
 
+interface GetHealthStatusParams {
+  assumedRequiredThroughputPerMinutePerKibana: number;
+  assumedAverageRecurringRequiredThroughputPerMinutePerKibana: number;
+  capacityPerMinutePerKibana: number;
+}
+
+function getHealthStatus(logger: Logger, params: GetHealthStatusParams): HealthStatus {
+  const {
+    assumedRequiredThroughputPerMinutePerKibana,
+    assumedAverageRecurringRequiredThroughputPerMinutePerKibana,
+    capacityPerMinutePerKibana,
+  } = params;
+  if (assumedRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana) {
+    return HealthStatus.OK;
+  }
+
+  if (assumedAverageRecurringRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana) {
+    logger.debug(
+      `setting HealthStatus.Warning because assumedAverageRecurringRequiredThroughputPerMinutePerKibana (${assumedAverageRecurringRequiredThroughputPerMinutePerKibana}) < capacityPerMinutePerKibana (${capacityPerMinutePerKibana})`
+    );
+    return HealthStatus.Warning;
+  }
+
+  logger.debug(
+    `setting HealthStatus.Error because assumedRequiredThroughputPerMinutePerKibana (${assumedRequiredThroughputPerMinutePerKibana}) >= capacityPerMinutePerKibana (${capacityPerMinutePerKibana}) AND assumedAverageRecurringRequiredThroughputPerMinutePerKibana (${assumedAverageRecurringRequiredThroughputPerMinutePerKibana}) >= capacityPerMinutePerKibana (${capacityPerMinutePerKibana})`
+  );
+  return HealthStatus.Error;
+}
+
 export function withCapacityEstimate(
+  logger: Logger,
   monitoredStats: RawMonitoringStats['stats']
 ): RawMonitoringStats['stats'] {
   if (isCapacityEstimationParams(monitoredStats)) {
     return {
       ...monitoredStats,
-      capacity_estimation: estimateCapacity(monitoredStats),
+      capacity_estimation: estimateCapacity(logger, monitoredStats),
     };
   }
   return monitoredStats;
diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
index 0d3b6ebf56de..3daed2bc1690 100644
--- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
@@ -116,6 +116,7 @@ export function createMonitoringStatsStream(
 }
 
 export function summarizeMonitoringStats(
+  logger: Logger,
   {
     // eslint-disable-next-line @typescript-eslint/naming-convention
     last_update,
@@ -123,7 +124,7 @@ export function summarizeMonitoringStats(
   }: MonitoringStats,
   config: TaskManagerConfig
 ): RawMonitoringStats {
-  const summarizedStats = withCapacityEstimate({
+  const summarizedStats = withCapacityEstimate(logger, {
     ...(configuration
       ? {
           configuration: {
@@ -136,7 +137,7 @@ export function summarizeMonitoringStats(
       ? {
           runtime: {
             timestamp: runtime.timestamp,
-            ...summarizeTaskRunStat(runtime.value, config),
+            ...summarizeTaskRunStat(logger, runtime.value, config),
           },
         }
       : {}),
diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts
index 38fdc89278e8..61b611df8799 100644
--- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts
@@ -10,6 +10,7 @@ import { Subject, Observable } from 'rxjs';
 import stats from 'stats-lite';
 import sinon from 'sinon';
 import { take, tap, bufferCount, skip, map } from 'rxjs/operators';
+import { mockLogger } from '../test_utils';
 import { ConcreteTaskInstance, TaskStatus } from '../task';
 import {
@@ -34,9 +35,11 @@ import { configSchema } from '../config';
 
 describe('Task Run Statistics', () => {
   let fakeTimer: sinon.SinonFakeTimers;
+  const logger = mockLogger();
 
   beforeAll(() => {
     fakeTimer = sinon.useFakeTimers();
+    jest.resetAllMocks();
   });
 
   afterAll(() => fakeTimer.restore());
@@ -75,7 +78,7 @@ describe('Task Run Statistics', () => {
       // Use 'summarizeTaskRunStat' to receive summarize stats
       map(({ key, value }: AggregatedStat) => ({
         key,
-        value: summarizeTaskRunStat(value, getTaskManagerConfig()).value,
+        value: summarizeTaskRunStat(logger, value, getTaskManagerConfig()).value,
       })),
       take(runAtDrift.length),
       bufferCount(runAtDrift.length)
@@ -143,7 +146,7 @@ describe('Task Run Statistics', () => {
       // Use 'summarizeTaskRunStat' to receive summarize stats
       map(({ key, value }: AggregatedStat) => ({
         key,
-        value: summarizeTaskRunStat(value, getTaskManagerConfig()).value,
+        value: summarizeTaskRunStat(logger, value, getTaskManagerConfig()).value,
       })),
       take(runDurations.length * 2),
       bufferCount(runDurations.length * 2)
@@ -239,7 +242,7 @@ describe('Task Run Statistics', () => {
       // Use 'summarizeTaskRunStat' to receive summarize stats
       map(({ key, value }: AggregatedStat) => ({
         key,
-        value: summarizeTaskRunStat(value, getTaskManagerConfig()).value,
+        value: summarizeTaskRunStat(logger, value, getTaskManagerConfig()).value,
       })),
       take(10),
       bufferCount(10)
@@ -319,6 +322,7 @@ describe('Task Run Statistics', () => {
       map(({ key, value }: AggregatedStat) => ({
         key,
         value: summarizeTaskRunStat(
+          logger,
           value,
           getTaskManagerConfig({
             monitored_task_execution_thresholds: {
@@ -410,6 +414,7 @@ describe('Task Run Statistics', () => {
       map(({ key, value }: AggregatedStat) => ({
         key,
         value: summarizeTaskRunStat(
+          logger,
           value,
           getTaskManagerConfig({
             monitored_task_execution_thresholds: {
@@ -552,7 +557,7 @@ describe('Task Run Statistics', () => {
       // Use 'summarizeTaskRunStat' to receive summarize stats
       map(({ key, value }: AggregatedStat) => ({
         key,
-        value: summarizeTaskRunStat(value, getTaskManagerConfig()).value,
+        value: summarizeTaskRunStat(logger, value, getTaskManagerConfig()).value,
       })),
       tap(() => {
         expectedTimestamp.push(new Date().toISOString());
diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts
index da86cfad2a91..a7f6a8a09142 100644
--- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts
+++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts
@@ -35,6 +35,7 @@ import {
 import { HealthStatus } from './monitoring_stats_stream';
 import { TaskPollingLifecycle } from '../polling_lifecycle';
 import { TaskExecutionFailureThreshold, TaskManagerConfig } from '../config';
+import { Logger } from '../../../../../src/core/server';
 
 export enum TaskPersistence {
   Recurring = 'recurring',
@@ -325,6 +326,7 @@ const DEFAULT_PERSISTENCE_FREQUENCIES = {
 };
 
 export function summarizeTaskRunStat(
+  logger: Logger,
   {
     polling: {
       // eslint-disable-next-line @typescript-eslint/naming-convention
@@ -383,6 +385,7 @@ export function summarizeTaskRunStat(
         executionResultFrequency,
         (typedResultFrequencies, taskType) =>
           summarizeTaskExecutionResultFrequencyStat(
+            logger,
             {
               ...DEFAULT_TASK_RUN_FREQUENCIES,
               ...calculateFrequency(typedResultFrequencies),
@@ -398,16 +401,35 @@ export function summarizeTaskRunStat(
 }
 
 function summarizeTaskExecutionResultFrequencyStat(
+  logger: Logger,
   resultFrequencySummary: ResultFrequency,
   executionErrorThreshold: TaskExecutionFailureThreshold
 ): ResultFrequencySummary {
+  const status = getHealthStatus(logger, resultFrequencySummary, executionErrorThreshold);
   return {
     ...resultFrequencySummary,
-    status:
-      resultFrequencySummary.Failed > executionErrorThreshold.warn_threshold
-        ? resultFrequencySummary.Failed > executionErrorThreshold.error_threshold
-          ? HealthStatus.Error
-          : HealthStatus.Warning
-        : HealthStatus.OK,
+    status,
   };
 }
+
+function getHealthStatus(
+  logger: Logger,
+  resultFrequencySummary: ResultFrequency,
+  executionErrorThreshold: TaskExecutionFailureThreshold
+): HealthStatus {
+  if (resultFrequencySummary.Failed > executionErrorThreshold.warn_threshold) {
+    if (resultFrequencySummary.Failed > executionErrorThreshold.error_threshold) {
+      logger.debug(
+        `setting HealthStatus.Error because resultFrequencySummary.Failed (${resultFrequencySummary.Failed}) > error_threshold (${executionErrorThreshold.error_threshold})`
+      );
+      return HealthStatus.Error;
+    } else {
+      logger.debug(
+        `setting HealthStatus.Warning because resultFrequencySummary.Failed (${resultFrequencySummary.Failed}) > warn_threshold (${executionErrorThreshold.warn_threshold})`
+      );
+      return HealthStatus.Warning;
+    }
+  }
+
+  return HealthStatus.OK;
+}
diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts
index d3e251b751ef..4f6be60abc93 100644
--- a/x-pack/plugins/task_manager/server/plugin.ts
+++ b/x-pack/plugins/task_manager/server/plugin.ts
@@ -87,6 +87,14 @@ export class TaskManagerPlugin
       this.config!
     );
 
+    core.status.derivedStatus$.subscribe((status) =>
+      this.logger.debug(`status core.status.derivedStatus now set to ${status.level}`)
+    );
+    serviceStatus$.subscribe((status) =>
+      this.logger.debug(`status serviceStatus now set to ${status.level}`)
+    );
+
+    // here is where the system status is updated
     core.status.set(
       combineLatest([core.status.derivedStatus$, serviceStatus$]).pipe(
         map(([derivedStatus, serviceStatus]) =>
diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts
index ece91ed571f8..a850b3eaf96d 100644
--- a/x-pack/plugins/task_manager/server/routes/health.test.ts
+++ b/x-pack/plugins/task_manager/server/routes/health.test.ts
@@ -20,7 +20,7 @@ import {
   RawMonitoringStats,
   summarizeMonitoringStats,
 } from '../monitoring';
-import { ServiceStatusLevels } from 'src/core/server';
+import { ServiceStatusLevels, Logger } from 'src/core/server';
 import { configSchema, TaskManagerConfig } from '../config';
 import { calculateHealthStatusMock } from '../lib/calculate_health_status.mock';
 
@@ -29,14 +29,14 @@ jest.mock('../lib/log_health_metrics', () => ({
 }));
 
 describe('healthRoute', () => {
+  const logger = loggingSystemMock.create().get();
+
   beforeEach(() => {
     jest.resetAllMocks();
   });
 
   it('registers the route', async () => {
     const router = httpServiceMock.createRouter();
-
-    const logger = loggingSystemMock.create().get();
     healthRoute(router, of(), logger, uuid.v4(), getTaskManagerConfig());
 
     const [config] = router.get.mock.calls[0];
@@ -46,7 +46,6 @@
 
   it('logs the Task Manager stats at a fixed interval', async () => {
     const router = httpServiceMock.createRouter();
-    const logger = loggingSystemMock.create().get();
     const calculateHealthStatus = calculateHealthStatusMock.create();
     calculateHealthStatus.mockImplementation(() => HealthStatus.OK);
     const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics');
@@ -86,19 +85,22 @@
       id,
       timestamp: expect.any(String),
       status: expect.any(String),
-      ...ignoreCapacityEstimation(summarizeMonitoringStats(mockStat, getTaskManagerConfig({}))),
+      ...ignoreCapacityEstimation(
+        summarizeMonitoringStats(logger, mockStat, getTaskManagerConfig({}))
+      ),
     });
 
     expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({
       id,
       timestamp: expect.any(String),
       status: expect.any(String),
-      ...ignoreCapacityEstimation(summarizeMonitoringStats(nextMockStat, getTaskManagerConfig({}))),
+      ...ignoreCapacityEstimation(
+        summarizeMonitoringStats(logger, nextMockStat, getTaskManagerConfig({}))
+      ),
     });
   });
 
   it(`logs at a warn level if the status is warning`, async () => {
     const router = httpServiceMock.createRouter();
-    const logger = loggingSystemMock.create().get();
     const calculateHealthStatus = calculateHealthStatusMock.create();
     calculateHealthStatus.mockImplementation(() => HealthStatus.Warning);
     const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics');
@@ -137,7 +139,7 @@
       timestamp: expect.any(String),
       status: expect.any(String),
       ...ignoreCapacityEstimation(
-        summarizeMonitoringStats(warnRuntimeStat, getTaskManagerConfig({}))
+        summarizeMonitoringStats(logger, warnRuntimeStat, getTaskManagerConfig({}))
       ),
     });
     expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({
@@ -145,7 +147,7 @@
       timestamp: expect.any(String),
       status: expect.any(String),
       ...ignoreCapacityEstimation(
-        summarizeMonitoringStats(warnConfigurationStat, getTaskManagerConfig({}))
+        summarizeMonitoringStats(logger, warnConfigurationStat, getTaskManagerConfig({}))
       ),
     });
     expect(logHealthMetrics.mock.calls[2][0]).toMatchObject({
@@ -153,14 +155,13 @@
       timestamp: expect.any(String),
       status: expect.any(String),
       ...ignoreCapacityEstimation(
-        summarizeMonitoringStats(warnWorkloadStat, getTaskManagerConfig({}))
+        summarizeMonitoringStats(logger, warnWorkloadStat, getTaskManagerConfig({}))
       ),
     });
   });
 
   it(`logs at an error level if the status is error`, async () => {
     const router = httpServiceMock.createRouter();
-    const logger = loggingSystemMock.create().get();
     const calculateHealthStatus = calculateHealthStatusMock.create();
     calculateHealthStatus.mockImplementation(() => HealthStatus.Error);
     const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics');
@@ -199,7 +200,7 @@
       timestamp: expect.any(String),
       status: expect.any(String),
       ...ignoreCapacityEstimation(
-        summarizeMonitoringStats(errorRuntimeStat, getTaskManagerConfig({}))
+        summarizeMonitoringStats(logger, errorRuntimeStat, getTaskManagerConfig({}))
      ),
     });
     expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({
@@ -207,7 +208,7 @@
       timestamp: expect.any(String),
       status: expect.any(String),
       ...ignoreCapacityEstimation(
-        summarizeMonitoringStats(errorConfigurationStat, getTaskManagerConfig({}))
+        summarizeMonitoringStats(logger, errorConfigurationStat, getTaskManagerConfig({}))
       ),
     });
     expect(logHealthMetrics.mock.calls[2][0]).toMatchObject({
@@ -215,7 +216,7 @@
       timestamp: expect.any(String),
       status: expect.any(String),
       ...ignoreCapacityEstimation(
-        summarizeMonitoringStats(errorWorkloadStat, getTaskManagerConfig({}))
+        summarizeMonitoringStats(logger, errorWorkloadStat, getTaskManagerConfig({}))
       ),
     });
   });
@@ -228,7 +229,7 @@
     const serviceStatus$ = healthRoute(
       router,
       stats$,
-      loggingSystemMock.create().get(),
+      logger,
       uuid.v4(),
       getTaskManagerConfig({
         monitored_stats_required_freshness: 1000,
@@ -246,7 +247,7 @@
 
     stats$.next(
       mockHealthStats({
-        last_update: new Date(Date.now() - 1500).toISOString(),
+        last_update: new Date(Date.now() - 3001).toISOString(),
       })
     );
 
@@ -255,6 +256,7 @@
       status: 'error',
       ...ignoreCapacityEstimation(
         summarizeMonitoringStats(
+          logger,
           mockHealthStats({
             last_update: expect.any(String),
             stats: {
@@ -281,9 +283,15 @@
     });
 
     expect(await serviceStatus).toMatchObject({
-      level: ServiceStatusLevels.unavailable,
-      summary: 'Task Manager is unavailable',
+      level: ServiceStatusLevels.degraded,
+      summary: 'Task Manager is unhealthy',
     });
+    const debugCalls = (logger as jest.Mocked<Logger>).debug.mock.calls as string[][];
+    const warnMessage = /^setting HealthStatus.Warning because assumedAverageRecurringRequiredThroughputPerMinutePerKibana/;
+    const found = debugCalls
+      .map((arr) => arr[0])
+      .find((message) => message.match(warnMessage) != null);
+    expect(found).toMatch(warnMessage);
   });
 
   it('returns a error status if the workload stats have not been updated within the required cold freshness', async () => {
@@ -294,7 +302,7 @@
     healthRoute(
       router,
       stats$,
-      loggingSystemMock.create().get(),
+      logger,
       uuid.v4(),
       getTaskManagerConfig({
         monitored_stats_required_freshness: 5000,
@@ -326,6 +334,7 @@
       status: 'error',
       ...ignoreCapacityEstimation(
         summarizeMonitoringStats(
+          logger,
           mockHealthStats({
             last_update: expect.any(String),
             stats: {
@@ -359,7 +368,7 @@
     healthRoute(
       router,
       stats$,
-      loggingSystemMock.create().get(),
+      logger,
       uuid.v4(),
       getTaskManagerConfig({
         monitored_stats_required_freshness: 1000,
@@ -370,7 +379,7 @@
     await sleep(0);
 
     // eslint-disable-next-line @typescript-eslint/naming-convention
-    const last_successful_poll = new Date(Date.now() - 2000).toISOString();
+    const last_successful_poll = new Date(Date.now() - 3001).toISOString();
     stats$.next(
       mockHealthStats({
         stats: {
@@ -394,6 +403,7 @@
       status: 'error',
       ...ignoreCapacityEstimation(
         summarizeMonitoringStats(
+          logger,
           mockHealthStats({
             last_update: expect.any(String),
             stats: {
diff --git a/x-pack/plugins/task_manager/server/routes/health.ts b/x-pack/plugins/task_manager/server/routes/health.ts
index b5d8a23ba555..2f5ebaa7515e 100644
--- a/x-pack/plugins/task_manager/server/routes/health.ts
+++ b/x-pack/plugins/task_manager/server/routes/health.ts
@@ -59,8 +59,8 @@ export function healthRoute(
   const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
 
   function getHealthStatus(monitoredStats: MonitoringStats) {
-    const summarizedStats = summarizeMonitoringStats(monitoredStats, config);
-    const status = calculateHealthStatus(summarizedStats, config);
+    const summarizedStats = summarizeMonitoringStats(logger, monitoredStats, config);
+    const status = calculateHealthStatus(summarizedStats, config, logger);
     const now = Date.now();
     const timestamp = new Date(now).toISOString();
     return { id: taskManagerId, timestamp, status, ...summarizedStats };
@@ -113,9 +113,7 @@ export function withServiceStatus(
   const level =
     monitoredHealth.status === HealthStatus.OK
       ? ServiceStatusLevels.available
-      : monitoredHealth.status === HealthStatus.Warning
-      ? ServiceStatusLevels.degraded
-      : ServiceStatusLevels.unavailable;
+      : ServiceStatusLevels.degraded;
   return [
     monitoredHealth,
     {