Skip to content

Commit

Permalink
Use updated onPreAuth from Platform (elastic#71552)
Browse files Browse the repository at this point in the history
* Use updated onPreAuth from Platform

* Add config flag. Increase default value.

* Set max connections flag default to 0 (disabled)

* Don't use limiting logic on checkin route

* Confirm preAuth handler only added when max > 0

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
John Schulz and elasticmachine committed Jul 14, 2020
1 parent c89f3f8 commit 5af7d21
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 3 deletions.
2 changes: 2 additions & 0 deletions x-pack/plugins/ingest_manager/common/constants/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export const PACKAGE_CONFIG_API_ROOT = `${API_ROOT}/package_configs`;
export const AGENT_CONFIG_API_ROOT = `${API_ROOT}/agent_configs`;
export const FLEET_API_ROOT = `${API_ROOT}/fleet`;

export const LIMITED_CONCURRENCY_ROUTE_TAG = 'ingest:limited-concurrency';

// EPM API routes
const EPM_PACKAGES_MANY = `${EPM_API_ROOT}/packages`;
const EPM_PACKAGES_ONE = `${EPM_PACKAGES_MANY}/{pkgkey}`;
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface IngestManagerConfigType {
enabled: boolean;
tlsCheckDisabled: boolean;
pollingRequestTimeout: number;
maxConcurrentConnections: number;
kibana: {
host?: string;
ca_sha256?: string;
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export {
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
INDEX_PATTERN_PLACEHOLDER_SUFFIX,
// Routes
LIMITED_CONCURRENCY_ROUTE_TAG,
PLUGIN_ID,
EPM_API_ROUTES,
DATA_STREAM_API_ROUTES,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const config = {
enabled: schema.boolean({ defaultValue: true }),
tlsCheckDisabled: schema.boolean({ defaultValue: false }),
pollingRequestTimeout: schema.number({ defaultValue: 60000 }),
maxConcurrentConnections: schema.number({ defaultValue: 0 }),
kibana: schema.object({
host: schema.maybe(schema.string()),
ca_sha256: schema.maybe(schema.string()),
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/ingest_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
} from './constants';
import { registerSavedObjects, registerEncryptedSavedObjects } from './saved_objects';
import {
registerLimitedConcurrencyRoutes,
registerEPMRoutes,
registerPackageConfigRoutes,
registerDataStreamRoutes,
Expand Down Expand Up @@ -228,6 +229,9 @@ export class IngestManagerPlugin
);
}
} else {
// we currently only use this global interceptor if fleet is enabled
// since it would run this func on *every* req (other plugins, CSS, etc)
registerLimitedConcurrencyRoutes(core, config);
registerAgentRoutes(router);
registerEnrollmentApiKeyRoutes(router);
registerInstallScriptRoutes({
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/ingest_manager/server/routes/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/

import { IRouter } from 'src/core/server';
import { PLUGIN_ID, AGENT_API_ROUTES } from '../../constants';
import { PLUGIN_ID, AGENT_API_ROUTES, LIMITED_CONCURRENCY_ROUTE_TAG } from '../../constants';
import {
GetAgentsRequestSchema,
GetOneAgentRequestSchema,
Expand Down Expand Up @@ -95,7 +95,7 @@ export const registerRoutes = (router: IRouter) => {
{
path: AGENT_API_ROUTES.ENROLL_PATTERN,
validate: PostAgentEnrollRequestSchema,
options: { tags: [] },
options: { tags: [LIMITED_CONCURRENCY_ROUTE_TAG] },
},
postAgentEnrollHandler
);
Expand All @@ -105,7 +105,7 @@ export const registerRoutes = (router: IRouter) => {
{
path: AGENT_API_ROUTES.ACKS_PATTERN,
validate: PostAgentAcksRequestSchema,
options: { tags: [] },
options: { tags: [LIMITED_CONCURRENCY_ROUTE_TAG] },
},
postAgentAcksHandlerBuilder({
acknowledgeAgentActions: AgentService.acknowledgeAgentActions,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ export { registerRoutes as registerInstallScriptRoutes } from './install_script'
export { registerRoutes as registerOutputRoutes } from './output';
export { registerRoutes as registerSettingsRoutes } from './settings';
export { registerRoutes as registerAppRoutes } from './app';
export { registerLimitedConcurrencyRoutes } from './limited_concurrency';
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { coreMock } from 'src/core/server/mocks';
import { registerLimitedConcurrencyRoutes } from './limited_concurrency';
import { IngestManagerConfigType } from '../index';

describe('registerLimitedConcurrencyRoutes', () => {
test(`doesn't call registerOnPreAuth if maxConcurrentConnections is 0`, async () => {
const mockSetup = coreMock.createSetup();
const mockConfig = { fleet: { maxConcurrentConnections: 0 } } as IngestManagerConfigType;
registerLimitedConcurrencyRoutes(mockSetup, mockConfig);

expect(mockSetup.http.registerOnPreAuth).not.toHaveBeenCalled();
});

test(`calls registerOnPreAuth once if maxConcurrentConnections is 1`, async () => {
const mockSetup = coreMock.createSetup();
const mockConfig = { fleet: { maxConcurrentConnections: 1 } } as IngestManagerConfigType;
registerLimitedConcurrencyRoutes(mockSetup, mockConfig);

expect(mockSetup.http.registerOnPreAuth).toHaveBeenCalledTimes(1);
});

test(`calls registerOnPreAuth once if maxConcurrentConnections is 1000`, async () => {
const mockSetup = coreMock.createSetup();
const mockConfig = { fleet: { maxConcurrentConnections: 1000 } } as IngestManagerConfigType;
registerLimitedConcurrencyRoutes(mockSetup, mockConfig);

expect(mockSetup.http.registerOnPreAuth).toHaveBeenCalledTimes(1);
});
});
72 changes: 72 additions & 0 deletions x-pack/plugins/ingest_manager/server/routes/limited_concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import {
CoreSetup,
KibanaRequest,
LifecycleResponseFactory,
OnPreAuthToolkit,
} from 'kibana/server';
import { LIMITED_CONCURRENCY_ROUTE_TAG } from '../../common';
import { IngestManagerConfigType } from '../index';
class MaxCounter {
constructor(private readonly max: number = 1) {}
private counter = 0;
valueOf() {
return this.counter;
}
increase() {
if (this.counter < this.max) {
this.counter += 1;
}
}
decrease() {
if (this.counter > 0) {
this.counter -= 1;
}
}
lessThanMax() {
return this.counter < this.max;
}
}

function shouldHandleRequest(request: KibanaRequest) {
const tags = request.route.options.tags;
return tags.includes(LIMITED_CONCURRENCY_ROUTE_TAG);
}

export function registerLimitedConcurrencyRoutes(core: CoreSetup, config: IngestManagerConfigType) {
const max = config.fleet.maxConcurrentConnections;
if (!max) return;

const counter = new MaxCounter(max);
core.http.registerOnPreAuth(function preAuthHandler(
request: KibanaRequest,
response: LifecycleResponseFactory,
toolkit: OnPreAuthToolkit
) {
if (!shouldHandleRequest(request)) {
return toolkit.next();
}

if (!counter.lessThanMax()) {
return response.customError({
body: 'Too Many Requests',
statusCode: 429,
});
}

counter.increase();

// requests.events.aborted$ has a bug (but has test which explicitly verifies) where it's fired even when the request completes
// https://github.com/elastic/kibana/pull/70495#issuecomment-656288766
request.events.aborted$.toPromise().then(() => {
counter.decrease();
});

return toolkit.next();
});
}

0 comments on commit 5af7d21

Please sign in to comment.