Skip to content

Commit

Permalink
[7.9] [Ingest Manager] Fix limited concurrency helper (#73976) (#74043)
Browse files Browse the repository at this point in the history
* [Ingest Manager] Fix limited concurrency helper (#73976)

# Conflicts:
#	x-pack/plugins/ingest_manager/server/routes/limited_concurrency.test.ts
#	x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts

* Fix types
  • Loading branch information
nchaulet authored Aug 1, 2020
1 parent 7aea62b commit 1e766aa
Showing 1 changed file with 32 additions and 17 deletions.
49 changes: 32 additions & 17 deletions x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
} from 'kibana/server';
import { LIMITED_CONCURRENCY_ROUTE_TAG } from '../../common';
import { IngestManagerConfigType } from '../index';
class MaxCounter {

export class MaxCounter {
constructor(private readonly max: number = 1) {}
private counter = 0;
valueOf() {
Expand All @@ -33,40 +34,54 @@ class MaxCounter {
}
}

function shouldHandleRequest(request: KibanaRequest) {
export type IMaxCounter = Pick<MaxCounter, 'increase' | 'decrease' | 'lessThanMax'>;

export function isLimitedRoute(request: KibanaRequest) {
const tags = request.route.options.tags;
return tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG);
return !!tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG);
}

export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) {
const max = config.fleet.maxConcurrentConnections;
if (!max) return;

const counter = new MaxCounter(max);
core.http.registerOnPreAuth(function preAuthHandler(
export function createLimitedPreAuthHandler({
isMatch,
maxCounter,
}: {
isMatch: (request: KibanaRequest) => boolean;
maxCounter: IMaxCounter;
}) {
return function preAuthHandler(
request: KibanaRequest,
response: LifecycleResponseFactory,
toolkit: OnPreAuthToolkit
) {
if (!shouldHandleRequest(request)) {
if (!isMatch(request)) {
return toolkit.next();
}

if (!counter.lessThanMax()) {
if (!maxCounter.lessThanMax()) {
return response.customError({
body: 'Too Many Requests',
statusCode: 429,
});
}

counter.increase();
maxCounter.increase();

// requests.events.aborted$ has a bug (but has test which explicitly verifies) where it's fired even when the request completes
// https://github.com/elastic/kibana/pull/70495#issuecomment-656288766
request.events.aborted$.toPromise().then(() => {
counter.decrease();
request.events.completed$.toPromise().then(() => {
maxCounter.decrease();
});

return toolkit.next();
});
};
}

export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) {
const max = config.fleet.maxConcurrentConnections;
if (!max) return;

core.http.registerOnPreAuth(
createLimitedPreAuthHandler({
isMatch: isLimitedRoute,
maxCounter: new MaxCounter(max),
})
);
}

0 comments on commit 1e766aa

Please sign in to comment.