From 8001d72309aad81fbe169300ebf143af03046147 Mon Sep 17 00:00:00 2001 From: John Schulz Date: Tue, 14 Jul 2020 09:31:19 -0400 Subject: [PATCH] Add config flag. Increase default value. --- .../ingest_manager/common/types/index.ts | 1 + x-pack/plugins/ingest_manager/server/index.ts | 1 + .../plugins/ingest_manager/server/plugin.ts | 4 +- .../server/routes/global_interceptors.ts | 63 ---------------- .../ingest_manager/server/routes/index.ts | 2 +- .../server/routes/limited_concurrency.ts | 72 +++++++++++++++++++ 6 files changed, 77 insertions(+), 66 deletions(-) delete mode 100644 x-pack/plugins/ingest_manager/server/routes/global_interceptors.ts create mode 100644 x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts diff --git a/x-pack/plugins/ingest_manager/common/types/index.ts b/x-pack/plugins/ingest_manager/common/types/index.ts index 0fce5cfa6226f..d7edc04a35799 100644 --- a/x-pack/plugins/ingest_manager/common/types/index.ts +++ b/x-pack/plugins/ingest_manager/common/types/index.ts @@ -13,6 +13,7 @@ export interface IngestManagerConfigType { enabled: boolean; tlsCheckDisabled: boolean; pollingRequestTimeout: number; + maxConcurrentConnections: number; kibana: { host?: string; ca_sha256?: string; diff --git a/x-pack/plugins/ingest_manager/server/index.ts b/x-pack/plugins/ingest_manager/server/index.ts index 1823cc3561693..10f2097a47e78 100644 --- a/x-pack/plugins/ingest_manager/server/index.ts +++ b/x-pack/plugins/ingest_manager/server/index.ts @@ -26,6 +26,7 @@ export const config = { enabled: schema.boolean({ defaultValue: true }), tlsCheckDisabled: schema.boolean({ defaultValue: false }), pollingRequestTimeout: schema.number({ defaultValue: 60000 }), + maxConcurrentConnections: schema.number({ defaultValue: 750 }), kibana: schema.object({ host: schema.maybe(schema.string()), ca_sha256: schema.maybe(schema.string()), diff --git a/x-pack/plugins/ingest_manager/server/plugin.ts b/x-pack/plugins/ingest_manager/server/plugin.ts index be96ea3fc4ef8..69af475886bb9 100644 --- a/x-pack/plugins/ingest_manager/server/plugin.ts +++ b/x-pack/plugins/ingest_manager/server/plugin.ts @@ -34,7 +34,7 @@ import { } from './constants'; import { registerSavedObjects, registerEncryptedSavedObjects } from './saved_objects'; import { - preAuthHandler, + registerLimitedConcurrencyRoutes, registerEPMRoutes, registerPackageConfigRoutes, registerDataStreamRoutes, @@ -231,7 +231,7 @@ export class IngestManagerPlugin } else { // we currently only use this global interceptor if fleet is enabled // since it would run this func on *every* req (other plugins, CSS, etc) - this.httpSetup.registerOnPreAuth(preAuthHandler); + registerLimitedConcurrencyRoutes(core, config); registerAgentRoutes(router); registerEnrollmentApiKeyRoutes(router); registerInstallScriptRoutes({ diff --git a/x-pack/plugins/ingest_manager/server/routes/global_interceptors.ts b/x-pack/plugins/ingest_manager/server/routes/global_interceptors.ts deleted file mode 100644 index a2e0cc446827e..0000000000000 --- a/x-pack/plugins/ingest_manager/server/routes/global_interceptors.ts +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { KibanaRequest, LifecycleResponseFactory, OnPreAuthToolkit } from 'kibana/server'; -import { LIMITED_CONCURRENCY_ROUTE_TAG } from '../../common'; - -class MaxCounter { - constructor(private readonly max: number = 1) {} - private counter = 0; - valueOf() { - return this.counter; - } - increase() { - if (this.counter < this.max) { - this.counter += 1; - } - } - decrease() { - this.counter += 1; - } - lessThanMax() { - return this.counter < this.max; - } -} - -function shouldHandleRequest(request: KibanaRequest) { - const tags = request.route.options.tags; - return tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG); -} - -const LIMITED_CONCURRENCY_MAX_REQUESTS = 250; -const counter = new MaxCounter(LIMITED_CONCURRENCY_MAX_REQUESTS); - -export function preAuthHandler( - request: KibanaRequest, - response: LifecycleResponseFactory, - toolkit: OnPreAuthToolkit -) { - if (!shouldHandleRequest(request)) { - return toolkit.next(); - } - - if (!counter.lessThanMax()) { - return response.customError({ - body: 'Too Many Agents', - statusCode: 503, - headers: { - 'Retry-After': '30', - }, - }); - } - - counter.increase(); - - // requests.events.aborted$ has a bug where it's fired even when the request completes... - // we can take advantage of this bug just for load testing... - request.events.aborted$.toPromise().then(() => counter.decrease()); - - return toolkit.next(); -} diff --git a/x-pack/plugins/ingest_manager/server/routes/index.ts b/x-pack/plugins/ingest_manager/server/routes/index.ts index 076ea8c07b063..87be3a80cea96 100644 --- a/x-pack/plugins/ingest_manager/server/routes/index.ts +++ b/x-pack/plugins/ingest_manager/server/routes/index.ts @@ -14,4 +14,4 @@ export { registerRoutes as registerInstallScriptRoutes } from './install_script' export { registerRoutes as registerOutputRoutes } from './output'; export { registerRoutes as registerSettingsRoutes } from './settings'; export { registerRoutes as registerAppRoutes } from './app'; -export { preAuthHandler } from './global_interceptors'; +export { registerLimitedConcurrencyRoutes } from './limited_concurrency'; diff --git a/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts new file mode 100644 index 0000000000000..ec8e2f6c8d436 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + CoreSetup, + KibanaRequest, + LifecycleResponseFactory, + OnPreAuthToolkit, +} from 'kibana/server'; +import { LIMITED_CONCURRENCY_ROUTE_TAG } from '../../common'; +import { IngestManagerConfigType } from '../index'; +class MaxCounter { + constructor(private readonly max: number = 1) {} + private counter = 0; + valueOf() { + return this.counter; + } + increase() { + if (this.counter < this.max) { + this.counter += 1; + } + } + decrease() { + if (this.counter > 0) { + this.counter -= 1; + } + } + lessThanMax() { + return this.counter < this.max; + } +} + +function shouldHandleRequest(request: KibanaRequest) { + const tags = request.route.options.tags; + return tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG); +} + +export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) { + const max = config.fleet.maxConcurrentConnections; + if (!max) return; + + const counter = new MaxCounter(max); + core.http.registerOnPreAuth(function preAuthHandler( + request: KibanaRequest, + response: LifecycleResponseFactory, + toolkit: OnPreAuthToolkit + ) { + if (!shouldHandleRequest(request)) { + return toolkit.next(); + } + + if (!counter.lessThanMax()) { + return response.customError({ + body: 'Too Many Requests', + statusCode: 429, + }); + } + + counter.increase(); + + // requests.events.aborted$ has a bug (but has test which explicitly verifies) where it's fired even when the request completes + // https://github.com/elastic/kibana/pull/70495#issuecomment-656288766 + request.events.aborted$.toPromise().then(() => { + counter.decrease(); + }); + + return toolkit.next(); + }); +}