diff --git a/x-pack/plugins/ingest_manager/common/types/index.ts b/x-pack/plugins/ingest_manager/common/types/index.ts index 7acef263f973a..69bcc498c18be 100644 --- a/x-pack/plugins/ingest_manager/common/types/index.ts +++ b/x-pack/plugins/ingest_manager/common/types/index.ts @@ -22,7 +22,8 @@ export interface IngestManagerConfigType { host?: string; ca_sha256?: string; }; - agentConfigRolloutConcurrency: number; + agentConfigRolloutRateLimitIntervalMs: number; + agentConfigRolloutRateLimitRequestPerInterval: number; }; } diff --git a/x-pack/plugins/ingest_manager/server/index.ts b/x-pack/plugins/ingest_manager/server/index.ts index 6f8c4948559d3..e2f659f54d625 100644 --- a/x-pack/plugins/ingest_manager/server/index.ts +++ b/x-pack/plugins/ingest_manager/server/index.ts @@ -35,7 +35,8 @@ export const config = { host: schema.maybe(schema.string()), ca_sha256: schema.maybe(schema.string()), }), - agentConfigRolloutConcurrency: schema.number({ defaultValue: 10 }), + agentConfigRolloutRateLimitIntervalMs: schema.number({ defaultValue: 5000 }), + agentConfigRolloutRateLimitRequestPerInterval: schema.number({ defaultValue: 5 }), }), }), }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts deleted file mode 100644 index 70207dcf325c4..0000000000000 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import * as Rx from 'rxjs'; -import { share } from 'rxjs/operators'; -import { createSubscriberConcurrencyLimiter } from './rxjs_utils'; - -function createSpyObserver(o: Rx.Observable): [Rx.Subscription, jest.Mock] { - const spy = jest.fn(); - const observer = o.subscribe(spy); - return [observer, spy]; -} - -describe('createSubscriberConcurrencyLimiter', () => { - it('should not publish to more than n concurrent subscriber', async () => { - const subject = new Rx.Subject(); - const sharedObservable = subject.pipe(share()); - - const limiter = createSubscriberConcurrencyLimiter(2); - - const [observer1, spy1] = createSpyObserver(sharedObservable.pipe(limiter())); - const [observer2, spy2] = createSpyObserver(sharedObservable.pipe(limiter())); - const [observer3, spy3] = createSpyObserver(sharedObservable.pipe(limiter())); - const [observer4, spy4] = createSpyObserver(sharedObservable.pipe(limiter())); - subject.next('test1'); - - expect(spy1).toBeCalled(); - expect(spy2).toBeCalled(); - expect(spy3).not.toBeCalled(); - expect(spy4).not.toBeCalled(); - - observer1.unsubscribe(); - expect(spy3).toBeCalled(); - expect(spy4).not.toBeCalled(); - - observer2.unsubscribe(); - expect(spy4).toBeCalled(); - - observer3.unsubscribe(); - observer4.unsubscribe(); - }); -}); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index dc0ed35207e46..dddade6841460 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -43,23 +43,37 @@ export const toPromiseAbortable = ( } }); -export function createSubscriberConcurrencyLimiter(maxConcurrency: number) { - let observers: Array<[Rx.Subscriber, any]> = []; - let activeObservers: Array> = []; +export function createRateLimiter( + ratelimitIntervalMs: number, + ratelimitRequestPerInterval: number +) { + function createCurrentInterval() { + return { + startedAt: Rx.asyncScheduler.now(), + numRequests: 0, + }; + } - function processNext() { - if (activeObservers.length >= maxConcurrency) { - return; - } - const observerValuePair = observers.shift(); + let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval(); + let observers: Array<[Rx.Subscriber, any]> = []; + let timerSubscription: Rx.Subscription | undefined; - if (!observerValuePair) { + function createTimeout() { + if (timerSubscription) { return; } - - const [observer, value] = observerValuePair; - activeObservers.push(observer); - observer.next(value); + timerSubscription = Rx.asyncScheduler.schedule(() => { + timerSubscription = undefined; + currentInterval = createCurrentInterval(); + for (const [waitingObserver, value] of observers) { + if (currentInterval.numRequests >= ratelimitRequestPerInterval) { + createTimeout(); + continue; + } + currentInterval.numRequests++; + waitingObserver.next(value); + } + }, ratelimitIntervalMs); } return function limit(): Rx.MonoTypeOperatorFunction { @@ -67,8 +81,14 @@ export function createSubscriberConcurrencyLimiter(maxConcurrency: number) { new Rx.Observable((observer) => { const subscription = observable.subscribe({ next(value) { + if (currentInterval.numRequests < ratelimitRequestPerInterval) { + currentInterval.numRequests++; + observer.next(value); + return; + } + observers = [...observers, [observer, value]]; - processNext(); + createTimeout(); }, error(err) { observer.error(err); @@ -79,10 +99,8 @@ export function createSubscriberConcurrencyLimiter(maxConcurrency: number) { }); return () => { - activeObservers = activeObservers.filter((o) => o !== observer); observers = observers.filter((o) => o[0] !== observer); subscription.unsubscribe(); - processNext(); }; }); }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 53270afe453c4..1547b6b5ea053 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants'; import { createAgentAction, getNewActionsSince } from '../actions'; import { appContextService } from '../../app_context'; -import { toPromiseAbortable, AbortError, createSubscriberConcurrencyLimiter } from './rxjs_utils'; +import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils'; function getInternalUserSOClient() { const fakeRequest = ({ @@ -134,8 +134,9 @@ export function agentCheckinStateNewActionsFactory() { const agentConfigs$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators - const concurrencyLimiter = createSubscriberConcurrencyLimiter( - appContextService.getConfig()?.fleet.agentConfigRolloutConcurrency ?? 10 + const rateLimiter = createRateLimiter( + appContextService.getConfig()?.fleet.agentConfigRolloutRateLimitIntervalMs ?? 5000, + appContextService.getConfig()?.fleet.agentConfigRolloutRateLimitRequestPerInterval ?? 50 ); async function subscribeToNewActions( @@ -158,7 +159,7 @@ export function agentCheckinStateNewActionsFactory() { const stream$ = agentConfig$.pipe( timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0), filter((config) => shouldCreateAgentConfigAction(agent, config)), - concurrencyLimiter(), + rateLimiter(), mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)), merge(newActions$), mergeMap(async (data) => {