Commit

[task manager] provide better diagnostics when task manager performance is degraded (#109741) (#110870)

resolves #109095
resolves #106854

Changes the way task manager and alerting perform their health / status
checks:

- no longer sets an `unavailable` status; now uses `degraded` instead (a
  short sketch of the new mapping follows this list)
- changes the task manager "hot stats freshness" calculation to allow for
  staler data before signalling a problem
- changes the "Detected potential performance issue" message to sound less
  scary, adds a doc link to the task manager health monitoring docs, and
  logs it at debug level instead of warning
- adds debug logging when task manager sets a status that's not
  `available`, indicating why it set that status (in the code, whenever
  task manager uses HealthStatus.Warning or Error)
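
Below is a minimal sketch of the new status mapping, using stand-in types for Kibana's `HealthStatus`, service status levels, and `Logger`. It is illustrative only; the real change lives in `x-pack/plugins/alerting/server/health/get_state.ts` (diff below).

```ts
// Stand-ins for the Kibana types involved; enum values are illustrative.
enum HealthStatus {
  OK = 'OK',
  Warning = 'warn',
  Error = 'error',
}

type ServiceStatusLevel = 'available' | 'degraded';

interface Logger {
  debug(message: string): void;
}

// Before this change, Warning mapped to `degraded` and Error mapped to
// `unavailable`; now anything that is not OK maps to `degraded`, and the
// reason is logged at debug level rather than warning.
function toServiceStatusLevel(status: HealthStatus, logger: Logger): ServiceStatusLevel {
  if (status === HealthStatus.OK) {
    return 'available';
  }
  logger.debug(`setting degraded service status because health status is "${status}"`);
  return 'degraded';
}

// Usage: console satisfies the tiny Logger interface used here.
toServiceStatusLevel(HealthStatus.Error, console); // => 'degraded'
```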
pmuellr authored Sep 1, 2021
1 parent 2c48a2f commit 680d45d
Showing 14 changed files with 196 additions and 87 deletions.
8 changes: 4 additions & 4 deletions x-pack/plugins/alerting/server/health/get_state.test.ts
@@ -216,8 +216,8 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
})
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.level).toEqual(ServiceStatusLevels.degraded);
expect(status.summary).toEqual('Alerting framework is degraded');
expect(status.meta).toBeUndefined();
});

@@ -275,8 +275,8 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
}),
retryDelay
).subscribe((status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.level).toEqual(ServiceStatusLevels.degraded);
expect(status.summary).toEqual('Alerting framework is degraded');
expect(status.meta).toEqual({ error: err });
});

10 changes: 4 additions & 6 deletions x-pack/plugins/alerting/server/health/get_state.ts
@@ -73,9 +73,7 @@ const getHealthServiceStatus = async (
const level =
doc.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
: ServiceStatusLevels.degraded;
return {
level,
summary: LEVEL_SUMMARY[level.toString()],
@@ -102,10 +100,10 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (
);
}),
catchError((error) => {
logger.warn(`Alerting framework is unavailable due to the error: ${error}`);
logger.warn(`Alerting framework is degraded due to the error: ${error}`);
return of({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
level: ServiceStatusLevels.degraded,
summary: LEVEL_SUMMARY[ServiceStatusLevels.degraded.toString()],
meta: { error },
});
})
2 changes: 1 addition & 1 deletion x-pack/plugins/alerting/server/plugin.ts
@@ -239,7 +239,7 @@ export class AlertingPlugin {
);

const serviceStatus$ = new BehaviorSubject<ServiceStatus>({
level: ServiceStatusLevels.unavailable,
level: ServiceStatusLevels.degraded,
summary: 'Alerting is initializing',
});
core.status.set(serviceStatus$);
44 changes: 27 additions & 17 deletions x-pack/plugins/task_manager/server/lib/calculate_health_status.ts
@@ -9,32 +9,42 @@ import { isString } from 'lodash';
import { JsonValue } from '@kbn/utility-types';
import { HealthStatus, RawMonitoringStats } from '../monitoring';
import { TaskManagerConfig } from '../config';
import { Logger } from '../../../../../src/core/server';

export function calculateHealthStatus(
summarizedStats: RawMonitoringStats,
config: TaskManagerConfig
config: TaskManagerConfig,
logger: Logger
): HealthStatus {
const now = Date.now();

// if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default)
// consider the system unhealthy
const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
// if "hot" health stats are any more stale than monitored_stats_required_freshness
// times a multiplier, consider the system unhealthy
const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness * 3;

// if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy
// if "cold" health stats are any more stale than the configured refresh
// times a multiplier, consider the system unhealthy
const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5;

/**
* If the monitored stats aren't fresh, return a red status
*/
const healthStatus =
hasStatus(summarizedStats.stats, HealthStatus.Error) ||
hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) ||
hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)
? HealthStatus.Error
: hasStatus(summarizedStats.stats, HealthStatus.Warning)
? HealthStatus.Warning
: HealthStatus.OK;
return healthStatus;
if (hasStatus(summarizedStats.stats, HealthStatus.Error)) {
return HealthStatus.Error;
}

if (hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness)) {
logger.debug('setting HealthStatus.Error because of expired hot timestamps');
return HealthStatus.Error;
}

if (hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)) {
logger.debug('setting HealthStatus.Error because of expired cold timestamps');
return HealthStatus.Error;
}

if (hasStatus(summarizedStats.stats, HealthStatus.Warning)) {
return HealthStatus.Warning;
}

return HealthStatus.OK;
}

function hasStatus(stats: RawMonitoringStats['stats'], status: HealthStatus): boolean {
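
To make the relaxed "hot stats" freshness check above concrete, here is a rough worked example. It assumes the default `monitored_stats_required_freshness` of poll interval + 1 s (per the old comment in the diff) and a 3000 ms poll interval; the 3000 ms figure is an assumption, not taken from this diff.

```ts
// Worked example of the new hot-stats staleness threshold (assumed defaults).
const assumedPollIntervalMs = 3000; // assumption, not from this diff
const monitoredStatsRequiredFreshnessMs = assumedPollIntervalMs + 1000; // pollInterval + 1s buffer

// Before: stats older than ~4 s tripped an unhealthy status.
const oldThresholdMs = monitoredStatsRequiredFreshnessMs; // 4000

// After: a 3x multiplier tolerates staler data before HealthStatus.Error.
const newThresholdMs = monitoredStatsRequiredFreshnessMs * 3; // 12000

const isStale = (ageMs: number, thresholdMs: number) => ageMs > thresholdMs;

const hotStatsAgeMs = 9000; // example: hot stats are 9 seconds old
console.log(isStale(hotStatsAgeMs, oldThresholdMs)); // true  -> would have reported Error
console.log(isStale(hotStatsAgeMs, newThresholdMs)); // false -> still healthy
```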
x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts
@@ -50,12 +50,15 @@ describe('logHealthMetrics', () => {
(calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.Error);
logHealthMetrics(health, logger, config);

expect((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).toBe(
`Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
);
expect((logger as jest.Mocked<Logger>).warn.mock.calls[1][0] as string).toBe(
`Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
);
const debugCalls = (logger as jest.Mocked<Logger>).debug.mock.calls;
const performanceMessage = /^Task Manager detected a degradation in performance/;
const lastStatsMessage = /^Latest Monitored Stats: \{.*\}$/;
expect(debugCalls[0][0] as string).toMatch(lastStatsMessage);
expect(debugCalls[1][0] as string).toMatch(lastStatsMessage);
expect(debugCalls[2][0] as string).toMatch(performanceMessage);
expect(debugCalls[3][0] as string).toMatch(lastStatsMessage);
expect(debugCalls[4][0] as string).toMatch(lastStatsMessage);
expect(debugCalls[5][0] as string).toMatch(performanceMessage);
});

it('should not log a warning message to enable verbose logging when the status goes from Warning to OK', () => {
10 changes: 6 additions & 4 deletions x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
@@ -5,6 +5,8 @@
* 2.0.
*/

import { kibanaPackageJson } from '@kbn/utils';

import { isEmpty } from 'lodash';
import { Logger } from '../../../../../src/core/server';
import { HealthStatus } from '../monitoring';
@@ -36,14 +38,16 @@ export function logHealthMetrics(
capacity_estimation: undefined,
},
};
const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config);
const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config, logger);
if (statusWithoutCapacity === HealthStatus.Warning) {
logLevel = LogLevel.Warn;
} else if (statusWithoutCapacity === HealthStatus.Error && !isEmpty(monitoredHealth.stats)) {
logLevel = LogLevel.Error;
}

const message = `Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`;
const docLink = `https://www.elastic.co/guide/en/kibana/${kibanaPackageJson.branch}/task-manager-health-monitoring.html`;
const detectedProblemMessage = `Task Manager detected a degradation in performance. This is usually temporary, and Kibana can recover automatically. If the problem persists, check the docs for troubleshooting information: ${docLink} .`;
if (enabled) {
const driftInSeconds = (monitoredHealth.stats.runtime?.value.drift.p99 ?? 0) / 1000;
if (
@@ -80,9 +84,7 @@
// This is legacy support - we used to always show this
logger.debug(message);
if (logLevel !== LogLevel.Debug && lastLogLevel === LogLevel.Debug) {
logger.warn(
`Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
);
logger.debug(detectedProblemMessage);
}
}

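
A short sketch of the gate around the new `detectedProblemMessage` above: per the condition in the diff, the message is only emitted when the computed log level moves off Debug while the previous level was Debug, so it is not repeated on every health report. `LogLevel` here is a stand-in, and how `lastLogLevel` gets updated is not shown in this hunk.

```ts
// Stand-in for the module's LogLevel; values are illustrative.
enum LogLevel {
  Debug = 'debug',
  Warn = 'warn',
  Error = 'error',
}

// Mirrors the condition in the diff above: notify only on a transition
// from Debug to a non-Debug level.
function shouldLogDegradationNotice(logLevel: LogLevel, lastLogLevel: LogLevel): boolean {
  return logLevel !== LogLevel.Debug && lastLogLevel === LogLevel.Debug;
}

shouldLogDegradationNotice(LogLevel.Warn, LogLevel.Debug);  // true  -> log the notice once
shouldLogDegradationNotice(LogLevel.Warn, LogLevel.Warn);   // false -> stay quiet
shouldLogDegradationNotice(LogLevel.Debug, LogLevel.Debug); // false
```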
x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts
@@ -7,11 +7,19 @@

import { CapacityEstimationParams, estimateCapacity } from './capacity_estimation';
import { HealthStatus, RawMonitoringStats } from './monitoring_stats_stream';
import { mockLogger } from '../test_utils';

describe('estimateCapacity', () => {
const logger = mockLogger();

beforeAll(() => {
jest.resetAllMocks();
});

test('estimates the max throughput per minute based on the workload and the assumed kibana instances', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -67,6 +75,7 @@ describe('estimateCapacity', () => {
test('reduces the available capacity per kibana when average task duration exceeds the poll interval', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -124,6 +133,7 @@ describe('estimateCapacity', () => {
test('estimates the max throughput per minute when duration by persistence is empty', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -160,6 +170,7 @@ describe('estimateCapacity', () => {
test('estimates the max throughput per minute based on the workload and the assumed kibana instances when there are tasks that repeat each hour or day', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -215,6 +226,7 @@ describe('estimateCapacity', () => {
test('estimates the max throughput available when there are no active Kibana', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -271,6 +283,7 @@ describe('estimateCapacity', () => {
test('estimates the max throughput available to handle the workload when there are multiple active kibana instances', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -332,6 +345,7 @@

expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -412,6 +426,7 @@

expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -493,6 +508,7 @@ describe('estimateCapacity', () => {
test('marks estimated capacity as OK state when workload and load suggest capacity is sufficient', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -557,6 +573,7 @@ describe('estimateCapacity', () => {
test('marks estimated capacity as Warning state when capacity is insufficient for recent spikes of non-recurring workload, but sufficient for the recurring workload', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -618,6 +635,7 @@ describe('estimateCapacity', () => {
test('marks estimated capacity as Error state when workload and load suggest capacity is insufficient', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -679,6 +697,7 @@ describe('estimateCapacity', () => {
test('recommmends a 20% increase in kibana when a spike in non-recurring tasks forces recurring task capacity to zero', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
@@ -754,6 +773,7 @@ describe('estimateCapacity', () => {
test('recommmends a 20% increase in kibana when a spike in non-recurring tasks in a system with insufficient capacity even for recurring tasks', async () => {
expect(
estimateCapacity(
logger,
mockStats(
{ max_workers: 10, poll_interval: 3000 },
{
x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts
@@ -12,6 +12,7 @@ import { RawMonitoringStats, RawMonitoredStat, HealthStatus } from './monitoring
import { AveragedStat } from './task_run_calcultors';
import { TaskPersistenceTypes } from './task_run_statistics';
import { asErr, asOk, map, Result } from '../lib/result_type';
import { Logger } from '../../../../../src/core/server';

export interface CapacityEstimationStat extends JsonObject {
observed: {
@@ -44,6 +45,7 @@ function isCapacityEstimationParams(
}

export function estimateCapacity(
logger: Logger,
capacityStats: CapacityEstimationParams
): RawMonitoredStat<CapacityEstimationStat> {
const workload = capacityStats.workload.value;
@@ -183,13 +185,14 @@ export function estimateCapacity(
const assumedRequiredThroughputPerMinutePerKibana =
averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana +
averageRecurringRequiredPerMinute / assumedKibanaInstances;

const status = getHealthStatus(logger, {
assumedRequiredThroughputPerMinutePerKibana,
assumedAverageRecurringRequiredThroughputPerMinutePerKibana,
capacityPerMinutePerKibana,
});
return {
status:
assumedRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana
? HealthStatus.OK
: assumedAverageRecurringRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana
? HealthStatus.Warning
: HealthStatus.Error,
status,
timestamp: new Date().toISOString(),
value: {
observed: mapValues(
@@ -220,13 +223,43 @@
};
}

interface GetHealthStatusParams {
assumedRequiredThroughputPerMinutePerKibana: number;
assumedAverageRecurringRequiredThroughputPerMinutePerKibana: number;
capacityPerMinutePerKibana: number;
}

function getHealthStatus(logger: Logger, params: GetHealthStatusParams): HealthStatus {
const {
assumedRequiredThroughputPerMinutePerKibana,
assumedAverageRecurringRequiredThroughputPerMinutePerKibana,
capacityPerMinutePerKibana,
} = params;
if (assumedRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana) {
return HealthStatus.OK;
}

if (assumedAverageRecurringRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana) {
logger.debug(
`setting HealthStatus.Warning because assumedAverageRecurringRequiredThroughputPerMinutePerKibana (${assumedAverageRecurringRequiredThroughputPerMinutePerKibana}) < capacityPerMinutePerKibana (${capacityPerMinutePerKibana})`
);
return HealthStatus.Warning;
}

logger.debug(
`setting HealthStatus.Error because assumedRequiredThroughputPerMinutePerKibana (${assumedRequiredThroughputPerMinutePerKibana}) >= capacityPerMinutePerKibana (${capacityPerMinutePerKibana}) AND assumedAverageRecurringRequiredThroughputPerMinutePerKibana (${assumedAverageRecurringRequiredThroughputPerMinutePerKibana}) >= capacityPerMinutePerKibana (${capacityPerMinutePerKibana})`
);
return HealthStatus.Error;
}

export function withCapacityEstimate(
logger: Logger,
monitoredStats: RawMonitoringStats['stats']
): RawMonitoringStats['stats'] {
if (isCapacityEstimationParams(monitoredStats)) {
return {
...monitoredStats,
capacity_estimation: estimateCapacity(monitoredStats),
capacity_estimation: estimateCapacity(logger, monitoredStats),
};
}
return monitoredStats;
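
For reference, a condensed, self-contained version of the `getHealthStatus` classification above, with made-up throughput numbers; `HealthStatus` is a stand-in and the figures are purely illustrative.

```ts
// Stand-in for the task manager HealthStatus enum.
enum HealthStatus {
  OK = 'OK',
  Warning = 'warn',
  Error = 'error',
}

// Condensed form of getHealthStatus: compare demand against per-Kibana capacity.
function classifyCapacity(
  requiredPerMinute: number,          // assumedRequiredThroughputPerMinutePerKibana
  recurringRequiredPerMinute: number, // assumedAverageRecurringRequiredThroughputPerMinutePerKibana
  capacityPerMinute: number           // capacityPerMinutePerKibana
): HealthStatus {
  if (requiredPerMinute < capacityPerMinute) {
    return HealthStatus.OK;
  }
  if (recurringRequiredPerMinute < capacityPerMinute) {
    return HealthStatus.Warning;
  }
  return HealthStatus.Error;
}

// Made-up example: a Kibana that can work through 200 tasks per minute.
classifyCapacity(150, 120, 200); // OK      - total demand fits
classifyCapacity(250, 120, 200); // Warning - only the recurring demand fits
classifyCapacity(250, 230, 200); // Error   - even recurring demand does not fit
```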
x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts
@@ -136,14 +136,15 @@ export function createMonitoringStatsStream(
}

export function summarizeMonitoringStats(
logger: Logger,
{
// eslint-disable-next-line @typescript-eslint/naming-convention
last_update,
stats: { runtime, workload, configuration, ephemeral },
}: MonitoringStats,
config: TaskManagerConfig
): RawMonitoringStats {
const summarizedStats = withCapacityEstimate({
const summarizedStats = withCapacityEstimate(logger, {
...(configuration
? {
configuration: {
@@ -156,7 +157,7 @@ export function summarizeMonitoringStats(
? {
runtime: {
timestamp: runtime.timestamp,
...summarizeTaskRunStat(runtime.value, config),
...summarizeTaskRunStat(logger, runtime.value, config),
},
}
: {}),