From 4fb0186073f28c0075f0ea20b914989fee8f2857 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Thu, 4 Mar 2021 15:55:32 -0500 Subject: [PATCH] [Alerting] Improving health status check (#93282) (#93639) * wip * Moving catchError so observable stream does not complete. Adding retry on failure * Using retryWhen. Updating unit tests * PR fixes Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: ymao1 --- .../alerts/server/health/get_state.test.ts | 223 +++++++++++++----- .../plugins/alerts/server/health/get_state.ts | 72 ++++-- x-pack/plugins/alerts/server/plugin.ts | 5 +- 3 files changed, 220 insertions(+), 80 deletions(-) diff --git a/x-pack/plugins/alerts/server/health/get_state.test.ts b/x-pack/plugins/alerts/server/health/get_state.test.ts index 5dd58aba23911..7b36bf34377f7 100644 --- a/x-pack/plugins/alerts/server/health/get_state.test.ts +++ b/x-pack/plugins/alerts/server/health/get_state.test.ts @@ -5,73 +5,182 @@ * 2.0. */ +import { ServiceStatusLevels } from '../../../../../src/core/server'; import { taskManagerMock } from '../../../task_manager/server/mocks'; -import { getHealthStatusStream } from '.'; -import { TaskStatus } from '../../../task_manager/server'; +import { + getHealthStatusStream, + getHealthServiceStatusWithRetryAndErrorHandling, + MAX_RETRY_ATTEMPTS, +} from './get_state'; +import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server'; import { HealthStatus } from '../types'; -describe('getHealthStatusStream()', () => { - const mockTaskManager = taskManagerMock.createStart(); - - it('should return an object with the "unavailable" level and proper summary of "Alerting framework is unhealthy"', async () => { - mockTaskManager.get.mockReturnValue( - new Promise((_resolve, _reject) => { - return { - id: 'test', - attempts: 0, - status: TaskStatus.Running, - version: '123', - runAt: new Date(), - scheduledAt: new Date(), - startedAt: new Date(), - retryAt: new Date(Date.now() + 5 * 60 * 1000), - state: { - runs: 1, - health_status: HealthStatus.Warning, - }, - taskType: 'alerting:alerting_health_check', - params: { - alertId: '1', - }, - ownerId: null, - }; +const tick = () => new Promise((resolve) => setImmediate(resolve)); + +const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => ({ + id: 'test', + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(), + scheduledAt: new Date(), + startedAt: new Date(), + retryAt: new Date(Date.now() + 5 * 60 * 1000), + state: { + runs: 1, + health_status: HealthStatus.OK, + }, + taskType: 'alerting:alerting_health_check', + params: { + alertId: '1', + }, + ownerId: null, + ...overrides, +}); + +describe('getHealthServiceStatusWithRetryAndErrorHandling', () => { + beforeEach(() => jest.useFakeTimers()); + + it('should get status at each interval', async () => { + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get.mockResolvedValue(getHealthCheckTask()); + const pollInterval = 100; + const halfInterval = Math.floor(pollInterval / 2); + + getHealthStatusStream(mockTaskManager, pollInterval).subscribe(); + + // shouldn't fire before poll interval passes + // should fire once each poll interval + jest.advanceTimersByTime(halfInterval); + expect(mockTaskManager.get).toHaveBeenCalledTimes(0); + jest.advanceTimersByTime(halfInterval); + expect(mockTaskManager.get).toHaveBeenCalledTimes(1); + jest.advanceTimersByTime(pollInterval); + expect(mockTaskManager.get).toHaveBeenCalledTimes(2); + jest.advanceTimersByTime(pollInterval); + expect(mockTaskManager.get).toHaveBeenCalledTimes(3); + }); + + it('should retry on error', async () => { + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get.mockRejectedValue(new Error('Failure')); + const retryDelay = 10; + const pollInterval = 100; + const halfInterval = Math.floor(pollInterval / 2); + + getHealthStatusStream(mockTaskManager, pollInterval, retryDelay).subscribe(); + + jest.advanceTimersByTime(halfInterval); + expect(mockTaskManager.get).toHaveBeenCalledTimes(0); + jest.advanceTimersByTime(halfInterval); + expect(mockTaskManager.get).toHaveBeenCalledTimes(1); + + // Retry on failure + let numTimesCalled = 1; + for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) { + await tick(); + jest.advanceTimersByTime(retryDelay); + expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled++ + 1); + } + + // Once we've exceeded max retries, should not try again + await tick(); + jest.advanceTimersByTime(retryDelay); + expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled); + + // Once another poll interval passes, should call fn again + await tick(); + jest.advanceTimersByTime(pollInterval - MAX_RETRY_ATTEMPTS * retryDelay); + expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 1); + }); + + it('should return healthy status when health status is "ok"', async () => { + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get.mockResolvedValue(getHealthCheckTask()); + + const status = await getHealthServiceStatusWithRetryAndErrorHandling( + mockTaskManager + ).toPromise(); + + expect(status.level).toEqual(ServiceStatusLevels.available); + expect(status.summary).toEqual('Alerting framework is available'); + }); + + it('should return degraded status when health status is "warn"', async () => { + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get.mockResolvedValue( + getHealthCheckTask({ + state: { + runs: 1, + health_status: HealthStatus.Warning, + }, }) ); - getHealthStatusStream(mockTaskManager).subscribe( - (val: { level: Readonly; summary: string }) => { - expect(val.level).toBe(false); - } - ); + + const status = await getHealthServiceStatusWithRetryAndErrorHandling( + mockTaskManager + ).toPromise(); + + expect(status.level).toEqual(ServiceStatusLevels.degraded); + expect(status.summary).toEqual('Alerting framework is degraded'); }); - it('should return an object with the "available" level and proper summary of "Alerting framework is healthy"', async () => { - mockTaskManager.get.mockReturnValue( - new Promise((_resolve, _reject) => { - return { - id: 'test', - attempts: 0, - status: TaskStatus.Running, - version: '123', - runAt: new Date(), - scheduledAt: new Date(), - startedAt: new Date(), - retryAt: new Date(Date.now() + 5 * 60 * 1000), - state: { - runs: 1, - health_status: HealthStatus.OK, - }, - taskType: 'alerting:alerting_health_check', - params: { - alertId: '1', - }, - ownerId: null, - }; + it('should return unavailable status when health status is "error"', async () => { + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get.mockResolvedValue( + getHealthCheckTask({ + state: { + runs: 1, + health_status: HealthStatus.Error, + }, }) ); - getHealthStatusStream(mockTaskManager).subscribe( - (val: { level: Readonly; summary: string }) => { - expect(val.level).toBe(true); + + const status = await getHealthServiceStatusWithRetryAndErrorHandling( + mockTaskManager + ).toPromise(); + + expect(status.level).toEqual(ServiceStatusLevels.unavailable); + expect(status.summary).toEqual('Alerting framework is unavailable'); + expect(status.meta).toBeUndefined(); + }); + + it('should retry on error and return healthy status if retry succeeds', async () => { + const retryDelay = 10; + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get + .mockRejectedValueOnce(new Error('Failure')) + .mockResolvedValue(getHealthCheckTask()); + + getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe( + (status) => { + expect(status.level).toEqual(ServiceStatusLevels.available); + expect(status.summary).toEqual('Alerting framework is available'); + } + ); + + await tick(); + jest.advanceTimersByTime(retryDelay * 2); + }); + + it('should retry on error and return unavailable status if retry fails', async () => { + const retryDelay = 10; + const err = new Error('Failure'); + const mockTaskManager = taskManagerMock.createStart(); + mockTaskManager.get.mockRejectedValue(err); + + getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe( + (status) => { + expect(status.level).toEqual(ServiceStatusLevels.unavailable); + expect(status.summary).toEqual('Alerting framework is unavailable'); + expect(status.meta).toEqual({ error: err }); } ); + + for (let i = 0; i < MAX_RETRY_ATTEMPTS + 1; i++) { + await tick(); + jest.advanceTimersByTime(retryDelay); + } + expect(mockTaskManager.get).toHaveBeenCalledTimes(MAX_RETRY_ATTEMPTS + 1); }); }); diff --git a/x-pack/plugins/alerts/server/health/get_state.ts b/x-pack/plugins/alerts/server/health/get_state.ts index 4f9f2f73b63bd..49604462662d5 100644 --- a/x-pack/plugins/alerts/server/health/get_state.ts +++ b/x-pack/plugins/alerts/server/health/get_state.ts @@ -6,13 +6,17 @@ */ import { i18n } from '@kbn/i18n'; -import { interval, Observable } from 'rxjs'; -import { catchError, switchMap } from 'rxjs/operators'; +import { defer, of, interval, Observable, throwError, timer } from 'rxjs'; +import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators'; import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server'; import { TaskManagerStartContract } from '../../../task_manager/server'; import { HEALTH_TASK_ID } from './task'; import { HealthStatus } from '../types'; +export const MAX_RETRY_ATTEMPTS = 3; +const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes +const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors + async function getLatestTaskState(taskManager: TaskManagerStartContract) { try { const result = await taskManager.get(HEALTH_TASK_ID); @@ -48,27 +52,53 @@ const LEVEL_SUMMARY = { ), }; -export const getHealthStatusStream = ( +const getHealthServiceStatus = async ( taskManager: TaskManagerStartContract +): Promise> => { + const doc = await getLatestTaskState(taskManager); + const level = + doc?.state?.health_status === HealthStatus.OK + ? ServiceStatusLevels.available + : doc?.state?.health_status === HealthStatus.Warning + ? ServiceStatusLevels.degraded + : ServiceStatusLevels.unavailable; + return { + level, + summary: LEVEL_SUMMARY[level.toString()], + }; +}; + +export const getHealthServiceStatusWithRetryAndErrorHandling = ( + taskManager: TaskManagerStartContract, + retryDelay?: number ): Observable> => { - return interval(60000 * 5).pipe( - switchMap(async () => { - const doc = await getLatestTaskState(taskManager); - const level = - doc?.state?.health_status === HealthStatus.OK - ? ServiceStatusLevels.available - : doc?.state?.health_status === HealthStatus.Warning - ? ServiceStatusLevels.degraded - : ServiceStatusLevels.unavailable; - return { - level, - summary: LEVEL_SUMMARY[level.toString()], - }; + return defer(() => getHealthServiceStatus(taskManager)).pipe( + retryWhen((errors) => { + return errors.pipe( + mergeMap((error, i) => { + const retryAttempt = i + 1; + if (retryAttempt > MAX_RETRY_ATTEMPTS) { + return throwError(error); + } + return timer(retryDelay ?? RETRY_DELAY); + }) + ); }), - catchError(async (error) => ({ - level: ServiceStatusLevels.unavailable, - summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()], - meta: { error }, - })) + catchError((error) => { + return of({ + level: ServiceStatusLevels.unavailable, + summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()], + meta: { error }, + }); + }) ); }; + +export const getHealthStatusStream = ( + taskManager: TaskManagerStartContract, + healthStatusInterval?: number, + retryDelay?: number +): Observable> => + interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe( + switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay)) + ); diff --git a/x-pack/plugins/alerts/server/plugin.ts b/x-pack/plugins/alerts/server/plugin.ts index 5a0745d3f00b7..d85622f301171 100644 --- a/x-pack/plugins/alerts/server/plugin.ts +++ b/x-pack/plugins/alerts/server/plugin.ts @@ -6,7 +6,7 @@ */ import type { PublicMethodsOf } from '@kbn/utility-types'; -import { first, map } from 'rxjs/operators'; +import { first, map, share } from 'rxjs/operators'; import { Observable } from 'rxjs'; import { UsageCollectionSetup } from 'src/plugins/usage_collection/server'; import { combineLatest } from 'rxjs'; @@ -251,7 +251,8 @@ export class AlertingPlugin { } else { return derivedStatus; } - }) + }), + share() ) ); });