From e4d7dde2be0c8156b75c4297af2477ce3ca772eb Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Fri, 31 Jul 2020 14:55:29 -0400 Subject: [PATCH] [Ingest Manager] Fix limited concurrency helper (#73976) # Conflicts: # x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts # x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts --- .../server/routes/limited_concurrency.test.ts | 188 +++++++++++++++++- .../server/routes/limited_concurrency.ts | 49 +++-- 2 files changed, 218 insertions(+), 19 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts index a0bb8e9b86fbb..e5b5a83743287 100644 --- a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts +++ b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts @@ -4,8 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ -import { coreMock } from 'src/core/server/mocks'; -import { registerLimitedConcurrencyRoutes } from './limited_concurrency'; +import { coreMock, httpServerMock, httpServiceMock } from 'src/core/server/mocks'; +import { + createLimitedPreAuthHandler, + isLimitedRoute, + registerLimitedConcurrencyRoutes, +} from './limited_concurrency'; import { IngestManagerConfigType } from '../index'; describe('registerLimitedConcurrencyRoutes', () => { @@ -33,3 +37,183 @@ describe('registerLimitedConcurrencyRoutes', () => { expect(mockSetup.http.registerOnPreAuth).toHaveBeenCalledTimes(1); }); }); + +// assertions for calls to .decrease are commented out because it's called on the +// "req.events.completed$ observable (which) will never emit from a mocked request in a jest unit test environment" +// https://github.com/elastic/kibana/pull/72338#issuecomment-661908791 +describe('preAuthHandler', () => { + test(`ignores routes when !isMatch`, async () => { + const mockMaxCounter = { + increase: jest.fn(), + decrease: jest.fn(), + lessThanMax: jest.fn(), + }; + const preAuthHandler = createLimitedPreAuthHandler({ + isMatch: jest.fn().mockImplementation(() => false), + maxCounter: mockMaxCounter, + }); + + const mockRequest = httpServerMock.createKibanaRequest({ + path: '/no/match', + }); + const mockResponse = httpServerMock.createResponseFactory(); + const mockPreAuthToolkit = httpServiceMock.createOnPreAuthToolkit(); + + await preAuthHandler(mockRequest, mockResponse, mockPreAuthToolkit); + + expect(mockMaxCounter.increase).not.toHaveBeenCalled(); + expect(mockMaxCounter.decrease).not.toHaveBeenCalled(); + expect(mockMaxCounter.lessThanMax).not.toHaveBeenCalled(); + expect(mockPreAuthToolkit.next).toHaveBeenCalledTimes(1); + }); + + test(`ignores routes which don't have the correct tag`, async () => { + const mockMaxCounter = { + increase: jest.fn(), + decrease: jest.fn(), + lessThanMax: jest.fn(), + }; + const preAuthHandler = createLimitedPreAuthHandler({ + isMatch: isLimitedRoute, + maxCounter: mockMaxCounter, + }); + + const mockRequest = httpServerMock.createKibanaRequest({ + path: '/no/match', + }); + const mockResponse = httpServerMock.createResponseFactory(); + const mockPreAuthToolkit = httpServiceMock.createOnPreAuthToolkit(); + + await preAuthHandler(mockRequest, mockResponse, mockPreAuthToolkit); + + expect(mockMaxCounter.increase).not.toHaveBeenCalled(); + expect(mockMaxCounter.decrease).not.toHaveBeenCalled(); + expect(mockMaxCounter.lessThanMax).not.toHaveBeenCalled(); + expect(mockPreAuthToolkit.next).toHaveBeenCalledTimes(1); + }); + + test(`processes routes which have the correct tag`, async () => { + const mockMaxCounter = { + increase: jest.fn(), + decrease: jest.fn(), + lessThanMax: jest.fn().mockImplementation(() => true), + }; + const preAuthHandler = createLimitedPreAuthHandler({ + isMatch: isLimitedRoute, + maxCounter: mockMaxCounter, + }); + + const mockRequest = httpServerMock.createKibanaRequest({ + path: '/should/match', + routeTags: ['ingest:limited-concurrency'], + }); + const mockResponse = httpServerMock.createResponseFactory(); + const mockPreAuthToolkit = httpServiceMock.createOnPreAuthToolkit(); + + await preAuthHandler(mockRequest, mockResponse, mockPreAuthToolkit); + + // will call lessThanMax because isMatch succeeds + expect(mockMaxCounter.lessThanMax).toHaveBeenCalledTimes(1); + // will not error because lessThanMax is true + expect(mockResponse.customError).not.toHaveBeenCalled(); + expect(mockPreAuthToolkit.next).toHaveBeenCalledTimes(1); + }); + + test(`updates the counter when isMatch & lessThanMax`, async () => { + const mockMaxCounter = { + increase: jest.fn(), + decrease: jest.fn(), + lessThanMax: jest.fn().mockImplementation(() => true), + }; + const preAuthHandler = createLimitedPreAuthHandler({ + isMatch: jest.fn().mockImplementation(() => true), + maxCounter: mockMaxCounter, + }); + + const mockRequest = httpServerMock.createKibanaRequest(); + const mockResponse = httpServerMock.createResponseFactory(); + const mockPreAuthToolkit = httpServiceMock.createOnPreAuthToolkit(); + + await preAuthHandler(mockRequest, mockResponse, mockPreAuthToolkit); + + expect(mockMaxCounter.increase).toHaveBeenCalled(); + // expect(mockMaxCounter.decrease).toHaveBeenCalled(); + expect(mockPreAuthToolkit.next).toHaveBeenCalledTimes(1); + }); + + test(`lessThanMax ? next : error`, async () => { + const mockMaxCounter = { + increase: jest.fn(), + decrease: jest.fn(), + lessThanMax: jest + .fn() + // call 1 + .mockImplementationOnce(() => true) + // calls 2, 3, 4 + .mockImplementationOnce(() => false) + .mockImplementationOnce(() => false) + .mockImplementationOnce(() => false) + // calls 5+ + .mockImplementationOnce(() => true) + .mockImplementation(() => true), + }; + + const preAuthHandler = createLimitedPreAuthHandler({ + isMatch: isLimitedRoute, + maxCounter: mockMaxCounter, + }); + + function makeRequestExpectNext() { + const request = httpServerMock.createKibanaRequest({ + path: '/should/match/', + routeTags: ['ingest:limited-concurrency'], + }); + const response = httpServerMock.createResponseFactory(); + const toolkit = httpServiceMock.createOnPreAuthToolkit(); + + preAuthHandler(request, response, toolkit); + expect(toolkit.next).toHaveBeenCalledTimes(1); + expect(response.customError).not.toHaveBeenCalled(); + } + + function makeRequestExpectError() { + const request = httpServerMock.createKibanaRequest({ + path: '/should/match/', + routeTags: ['ingest:limited-concurrency'], + }); + const response = httpServerMock.createResponseFactory(); + const toolkit = httpServiceMock.createOnPreAuthToolkit(); + + preAuthHandler(request, response, toolkit); + expect(toolkit.next).not.toHaveBeenCalled(); + expect(response.customError).toHaveBeenCalledTimes(1); + expect(response.customError).toHaveBeenCalledWith({ + statusCode: 429, + body: 'Too Many Requests', + }); + } + + // request 1 succeeds + makeRequestExpectNext(); + expect(mockMaxCounter.increase).toHaveBeenCalledTimes(1); + // expect(mockMaxCounter.decrease).toHaveBeenCalledTimes(1); + + // requests 2, 3, 4 fail + makeRequestExpectError(); + makeRequestExpectError(); + makeRequestExpectError(); + + // requests 5+ succeed + makeRequestExpectNext(); + expect(mockMaxCounter.increase).toHaveBeenCalledTimes(2); + // expect(mockMaxCounter.decrease).toHaveBeenCalledTimes(2); + + makeRequestExpectNext(); + expect(mockMaxCounter.increase).toHaveBeenCalledTimes(3); + // expect(mockMaxCounter.decrease).toHaveBeenCalledTimes(3); + + makeRequestExpectNext(); + expect(mockMaxCounter.increase).toHaveBeenCalledTimes(4); + // expect(mockMaxCounter.decrease).toHaveBeenCalledTimes(4); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts index ec8e2f6c8d436..7ba8e151b726c 100644 --- a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts +++ b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts @@ -12,7 +12,8 @@ import { } from 'kibana/server'; import { LIMITED_CONCURRENCY_ROUTE_TAG } from '../../common'; import { IngestManagerConfigType } from '../index'; -class MaxCounter { + +export class MaxCounter { constructor(private readonly max: number = 1) {} private counter = 0; valueOf() { @@ -33,40 +34,54 @@ class MaxCounter { } } -function shouldHandleRequest(request: KibanaRequest) { +export type IMaxCounter = Pick; + +export function isLimitedRoute(request: KibanaRequest) { const tags = request.route.options.tags; - return tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG); + return !!tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG); } -export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) { - const max = config.fleet.maxConcurrentConnections; - if (!max) return; - - const counter = new MaxCounter(max); - core.http.registerOnPreAuth(function preAuthHandler( +export function createLimitedPreAuthHandler({ + isMatch, + maxCounter, +}: { + isMatch: (request: KibanaRequest) => boolean; + maxCounter: IMaxCounter; +}) { + return function preAuthHandler( request: KibanaRequest, response: LifecycleResponseFactory, toolkit: OnPreAuthToolkit ) { - if (!shouldHandleRequest(request)) { + if (!isMatch(request)) { return toolkit.next(); } - if (!counter.lessThanMax()) { + if (!maxCounter.lessThanMax()) { return response.customError({ body: 'Too Many Requests', statusCode: 429, }); } - counter.increase(); + maxCounter.increase(); - // requests.events.aborted$ has a bug (but has test which explicitly verifies) where it's fired even when the request completes - // https://github.com/elastic/kibana/pull/70495#issuecomment-656288766 - request.events.aborted$.toPromise().then(() => { - counter.decrease(); + request.events.completed$.toPromise().then(() => { + maxCounter.decrease(); }); return toolkit.next(); - }); + }; +} + +export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) { + const max = config.fleet.maxConcurrentConnections; + if (!max) return; + + core.http.registerOnPreAuth( + createLimitedPreAuthHandler({ + isMatch: isLimitedRoute, + maxCounter: new MaxCounter(max), + }) + ); }