From 1e766aa6e16f9ffab097183c4c0c5c8104dced2c Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Sat, 1 Aug 2020 13:34:52 -0400 Subject: [PATCH] [7.9] [Ingest Manager] Fix limited concurrency helper (#73976) (#74043) * [Ingest Manager] Fix limited concurrency helper (#73976) # Conflicts: # x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts # x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts * Fix types --- .../server/routes/limited_concurrency.ts | 49 ++++++++++++------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts index ec8e2f6c8d436..7ba8e151b726c 100644 --- a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts +++ b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts @@ -12,7 +12,8 @@ import { } from 'kibana/server'; import { LIMITED_CONCURRENCY_ROUTE_TAG } from '../../common'; import { IngestManagerConfigType } from '../index'; -class MaxCounter { + +export class MaxCounter { constructor(private readonly max: number = 1) {} private counter = 0; valueOf() { @@ -33,40 +34,54 @@ class MaxCounter { } } -function shouldHandleRequest(request: KibanaRequest) { +export type IMaxCounter = Pick; + +export function isLimitedRoute(request: KibanaRequest) { const tags = request.route.options.tags; - return tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG); + return !!tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG); } -export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) { - const max = config.fleet.maxConcurrentConnections; - if (!max) return; - - const counter = new MaxCounter(max); - core.http.registerOnPreAuth(function preAuthHandler( +export function createLimitedPreAuthHandler({ + isMatch, + maxCounter, +}: { + isMatch: (request: KibanaRequest) => boolean; + maxCounter: IMaxCounter; +}) { + return function preAuthHandler( request: KibanaRequest, response: LifecycleResponseFactory, toolkit: OnPreAuthToolkit ) { - if (!shouldHandleRequest(request)) { + if (!isMatch(request)) { return toolkit.next(); } - if (!counter.lessThanMax()) { + if (!maxCounter.lessThanMax()) { return response.customError({ body: 'Too Many Requests', statusCode: 429, }); } - counter.increase(); + maxCounter.increase(); - // requests.events.aborted$ has a bug (but has test which explicitly verifies) where it's fired even when the request completes - // https://github.com/elastic/kibana/pull/70495#issuecomment-656288766 - request.events.aborted$.toPromise().then(() => { - counter.decrease(); + request.events.completed$.toPromise().then(() => { + maxCounter.decrease(); }); return toolkit.next(); - }); + }; +} + +export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) { + const max = config.fleet.maxConcurrentConnections; + if (!max) return; + + core.http.registerOnPreAuth( + createLimitedPreAuthHandler({ + isMatch: isLimitedRoute, + maxCounter: new MaxCounter(max), + }) + ); }