diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md index 5f8966f0227ac..f6421d65bc551 100644 --- a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md @@ -7,7 +7,7 @@ Signature: ```typescript -protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error; +protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error; ``` ## Parameters @@ -15,8 +15,8 @@ protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: Ab | Parameter | Type | Description | | --- | --- | --- | | e | KibanaServerError | AbortError | | -| timeoutSignal | AbortSignal | | | options | ISearchOptions | | +| isTimeout | boolean | | Returns: diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md index 2247813562dc7..9d18309fc07be 100644 --- a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md @@ -27,7 +27,7 @@ export declare class SearchInterceptor | Method | Modifiers | Description | | --- | --- | --- | | [getTimeoutMode()](./kibana-plugin-plugins-data-public.searchinterceptor.gettimeoutmode.md) | | | -| [handleSearchError(e, timeoutSignal, options)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | | +| [handleSearchError(e, options, isTimeout)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | | | [search(request, options)](./kibana-plugin-plugins-data-public.searchinterceptor.search.md) | | Searches using the given search method. Overrides the AbortSignal with one that will abort either when the request times out, or when the original AbortSignal is aborted. Updates pendingCount$ when the request is started/finalized. | | [showError(e)](./kibana-plugin-plugins-data-public.searchinterceptor.showerror.md) | | | diff --git a/src/plugins/data/public/public.api.md b/src/plugins/data/public/public.api.md index e219b02ff18af..cef85b2fafa02 100644 --- a/src/plugins/data/public/public.api.md +++ b/src/plugins/data/public/public.api.md @@ -2330,8 +2330,6 @@ export interface SearchError { // @public (undocumented) export class SearchInterceptor { constructor(deps: SearchInterceptorDeps); - // @internal - protected abortController: AbortController; // @internal (undocumented) protected application: CoreStart['application']; // (undocumented) @@ -2342,22 +2340,12 @@ export class SearchInterceptor { // Warning: (ae-forgotten-export) The symbol "AbortError" needs to be exported by the entry point index.d.ts // // (undocumented) - protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error; + protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error; // @internal protected pendingCount$: BehaviorSubject; // @internal (undocumented) protected runSearch(request: IKibanaSearchRequest, options?: ISearchOptions): Promise; search(request: IKibanaSearchRequest, options?: ISearchOptions): Observable; - // @internal (undocumented) - protected setupAbortSignal({ abortSignal, timeout, }: { - abortSignal?: AbortSignal; - timeout?: number; - }): { - timeoutSignal: AbortSignal; - combinedSignal: AbortSignal; - cleanup: () => void; - abort: () => void; - }; // (undocumented) showError(e: Error): void; } diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index f5a2dc0571fdc..3df2313f83798 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -7,7 +7,7 @@ */ import { memoize } from 'lodash'; -import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs'; +import { BehaviorSubject, throwError, defer, from, Observable } from 'rxjs'; import { catchError, finalize } from 'rxjs/operators'; import { PublicMethodsOf } from '@kbn/utility-types'; import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public'; @@ -30,11 +30,7 @@ import { getHttpError, } from './errors'; import { toMountPoint } from '../../../kibana_react/public'; -import { - AbortError, - getCombinedAbortSignal, - KibanaServerError, -} from '../../../kibana_utils/public'; +import { AbortError, KibanaServerError } from '../../../kibana_utils/public'; import { ISessionService } from './session'; export interface SearchInterceptorDeps { @@ -48,12 +44,6 @@ export interface SearchInterceptorDeps { } export class SearchInterceptor { - /** - * `abortController` used to signal all searches to abort. - * @internal - */ - protected abortController = new AbortController(); - /** * Observable that emits when the number of pending requests changes. * @internal @@ -98,10 +88,10 @@ export class SearchInterceptor { */ protected handleSearchError( e: KibanaServerError | AbortError, - timeoutSignal: AbortSignal, - options?: ISearchOptions + options?: ISearchOptions, + isTimeout?: boolean ): Error { - if (timeoutSignal.aborted || e.message === 'Request timed out') { + if (isTimeout || e.message === 'Request timed out') { // Handle a client or a server side timeout const err = new SearchTimeoutError(e, this.getTimeoutMode()); @@ -154,60 +144,6 @@ export class SearchInterceptor { ); } - /** - * @internal - */ - protected setupAbortSignal({ - abortSignal, - timeout, - }: { - abortSignal?: AbortSignal; - timeout?: number; - }) { - // Schedule this request to automatically timeout after some interval - const timeoutController = new AbortController(); - const { signal: timeoutSignal } = timeoutController; - const timeout$ = timeout ? timer(timeout) : NEVER; - const subscription = timeout$.subscribe(() => { - this.deps.usageCollector?.trackQueryTimedOut(); - timeoutController.abort(); - }); - - const selfAbortController = new AbortController(); - - // Get a combined `AbortSignal` that will be aborted whenever the first of the following occurs: - // 1. The internal abort controller aborts - // 2. The request times out - // 3. abort() is called on `selfAbortController`. This is used by session service to abort all pending searches that it tracks - // in the current session - // 4. The passed-in signal aborts (e.g. when re-fetching, or whenever the app determines) - const signals = [ - this.abortController.signal, - timeoutSignal, - selfAbortController.signal, - ...(abortSignal ? [abortSignal] : []), - ]; - - const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedAbortSignal( - signals - ); - const cleanup = () => { - subscription.unsubscribe(); - combinedSignal.removeEventListener('abort', cleanup); - cleanupCombinedSignal(); - }; - combinedSignal.addEventListener('abort', cleanup); - - return { - timeoutSignal, - combinedSignal, - cleanup, - abort: () => { - selfAbortController.abort(); - }, - }; - } - private showTimeoutErrorToast = (e: SearchTimeoutError, sessionId?: string) => { this.deps.toasts.addDanger({ title: 'Timed out', @@ -245,25 +181,21 @@ export class SearchInterceptor { */ public search( request: IKibanaSearchRequest, - options?: ISearchOptions + options: ISearchOptions = {} ): Observable { // Defer the following logic until `subscribe` is actually called return defer(() => { - if (options?.abortSignal?.aborted) { + if (options.abortSignal?.aborted) { return throwError(new AbortError()); } - const { timeoutSignal, combinedSignal, cleanup } = this.setupAbortSignal({ - abortSignal: options?.abortSignal, - }); this.pendingCount$.next(this.pendingCount$.getValue() + 1); - return from(this.runSearch(request, { ...options, abortSignal: combinedSignal })).pipe( + return from(this.runSearch(request, options)).pipe( catchError((e: Error | AbortError) => { - return throwError(this.handleSearchError(e, timeoutSignal, options)); + return throwError(this.handleSearchError(e, options)); }), finalize(() => { this.pendingCount$.next(this.pendingCount$.getValue() - 1); - cleanup(); }) ); }); diff --git a/src/plugins/kibana_utils/common/abort_utils.test.ts b/src/plugins/kibana_utils/common/abort_utils.test.ts index 1f8a1ef3d84c5..0d34a7852fb44 100644 --- a/src/plugins/kibana_utils/common/abort_utils.test.ts +++ b/src/plugins/kibana_utils/common/abort_utils.test.ts @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -import { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils'; +import { AbortError, abortSignalToPromise } from './abort_utils'; jest.useFakeTimers(); @@ -66,91 +66,4 @@ describe('AbortUtils', () => { }); }); }); - - describe('getCombinedAbortSignal', () => { - test('should return an AbortSignal', () => { - const signal = getCombinedAbortSignal([]).signal; - expect(signal).toBeInstanceOf(AbortSignal); - }); - - test('should not abort if none of the signals abort', async () => { - const controller1 = new AbortController(); - const controller2 = new AbortController(); - setTimeout(() => controller1.abort(), 2000); - setTimeout(() => controller2.abort(), 1000); - const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal; - expect(signal.aborted).toBe(false); - jest.advanceTimersByTime(500); - await flushPromises(); - expect(signal.aborted).toBe(false); - }); - - test('should abort when the first signal aborts', async () => { - const controller1 = new AbortController(); - const controller2 = new AbortController(); - setTimeout(() => controller1.abort(), 2000); - setTimeout(() => controller2.abort(), 1000); - const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal; - expect(signal.aborted).toBe(false); - jest.advanceTimersByTime(1000); - await flushPromises(); - expect(signal.aborted).toBe(true); - }); - - test('should be aborted if any of the signals is already aborted', async () => { - const controller1 = new AbortController(); - const controller2 = new AbortController(); - controller1.abort(); - const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal; - expect(signal.aborted).toBe(true); - }); - - describe('cleanup listener', () => { - const createMockController = () => { - const controller = new AbortController(); - const spyAddListener = jest.spyOn(controller.signal, 'addEventListener'); - const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener'); - return { - controller, - getTotalListeners: () => - Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0), - }; - }; - - test('cleanup should cleanup inner listeners', () => { - const controller1 = createMockController(); - const controller2 = createMockController(); - - const { cleanup } = getCombinedAbortSignal([ - controller1.controller.signal, - controller2.controller.signal, - ]); - - expect(controller1.getTotalListeners()).toBe(1); - expect(controller2.getTotalListeners()).toBe(1); - - cleanup(); - - expect(controller1.getTotalListeners()).toBe(0); - expect(controller2.getTotalListeners()).toBe(0); - }); - - test('abort should cleanup inner listeners', async () => { - const controller1 = createMockController(); - const controller2 = createMockController(); - - getCombinedAbortSignal([controller1.controller.signal, controller2.controller.signal]); - - expect(controller1.getTotalListeners()).toBe(1); - expect(controller2.getTotalListeners()).toBe(1); - - controller1.controller.abort(); - - await flushPromises(); - - expect(controller1.getTotalListeners()).toBe(0); - expect(controller2.getTotalListeners()).toBe(0); - }); - }); - }); }); diff --git a/src/plugins/kibana_utils/common/abort_utils.ts b/src/plugins/kibana_utils/common/abort_utils.ts index f4c750745a605..051f947b68c1b 100644 --- a/src/plugins/kibana_utils/common/abort_utils.ts +++ b/src/plugins/kibana_utils/common/abort_utils.ts @@ -45,32 +45,3 @@ export function abortSignalToPromise( return { promise, cleanup }; } - -/** - * Returns an `AbortSignal` that will be aborted when the first of the given signals aborts. - * - * @param signals - */ -export function getCombinedAbortSignal( - signals: AbortSignal[] -): { signal: AbortSignal; cleanup: () => void } { - const controller = new AbortController(); - let cleanup = () => {}; - - if (signals.some((signal) => signal.aborted)) { - controller.abort(); - } else { - const promises = signals.map((signal) => abortSignalToPromise(signal)); - cleanup = () => { - promises.forEach((p) => p.cleanup()); - controller.signal.removeEventListener('abort', cleanup); - }; - controller.signal.addEventListener('abort', cleanup); - Promise.race(promises.map((p) => p.promise)).catch(() => { - cleanup(); - controller.abort(); - }); - } - - return { signal: controller.signal, cleanup }; -} diff --git a/src/plugins/kibana_utils/common/index.ts b/src/plugins/kibana_utils/common/index.ts index 398bf1415c005..76a7cb2855c6e 100644 --- a/src/plugins/kibana_utils/common/index.ts +++ b/src/plugins/kibana_utils/common/index.ts @@ -13,7 +13,7 @@ export * from './ui'; export * from './state_containers'; export * from './typed_json'; export * from './errors'; -export { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils'; +export { AbortError, abortSignalToPromise } from './abort_utils'; export { createGetterSetter, Get, Set } from './create_getter_setter'; export { distinctUntilChangedWithInitialValue } from './distinct_until_changed_with_initial_value'; export { url } from './url'; diff --git a/src/plugins/kibana_utils/public/index.ts b/src/plugins/kibana_utils/public/index.ts index 9a94757cdcb7a..75c52e1301ea5 100644 --- a/src/plugins/kibana_utils/public/index.ts +++ b/src/plugins/kibana_utils/public/index.ts @@ -15,7 +15,6 @@ export { fieldWildcardFilter, fieldWildcardMatcher, Get, - getCombinedAbortSignal, JsonArray, JsonObject, JsonValue, diff --git a/src/plugins/kibana_utils/server/index.ts b/src/plugins/kibana_utils/server/index.ts index babc5c4a201ee..483c5aa92b45e 100644 --- a/src/plugins/kibana_utils/server/index.ts +++ b/src/plugins/kibana_utils/server/index.ts @@ -13,7 +13,6 @@ export { fieldWildcardFilter, fieldWildcardMatcher, Get, - getCombinedAbortSignal, Set, url, } from '../common'; diff --git a/x-pack/plugins/data_enhanced/public/search/search_abort_controller.test.ts b/x-pack/plugins/data_enhanced/public/search/search_abort_controller.test.ts new file mode 100644 index 0000000000000..68282c1e947f7 --- /dev/null +++ b/x-pack/plugins/data_enhanced/public/search/search_abort_controller.test.ts @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SearchAbortController } from './search_abort_controller'; + +const timeTravel = (msToRun = 0) => { + jest.advanceTimersByTime(msToRun); + return new Promise((resolve) => setImmediate(resolve)); +}; + +describe('search abort controller', () => { + test('is not aborted when empty', () => { + const sac = new SearchAbortController(); + expect(sac.getSignal().aborted).toBe(false); + }); + + test('immediately aborts when passed an aborted signal in the constructor', () => { + const controller = new AbortController(); + controller.abort(); + const sac = new SearchAbortController(controller.signal); + expect(sac.getSignal().aborted).toBe(true); + }); + + test('aborts when input signal is aborted', () => { + const controller = new AbortController(); + const sac = new SearchAbortController(controller.signal); + expect(sac.getSignal().aborted).toBe(false); + controller.abort(); + expect(sac.getSignal().aborted).toBe(true); + }); + + test('aborts when all input signals are aborted', () => { + const controller = new AbortController(); + const sac = new SearchAbortController(controller.signal); + + const controller2 = new AbortController(); + sac.addAbortSignal(controller2.signal); + expect(sac.getSignal().aborted).toBe(false); + controller.abort(); + expect(sac.getSignal().aborted).toBe(false); + controller2.abort(); + expect(sac.getSignal().aborted).toBe(true); + }); + + test('aborts explicitly even if all inputs are not aborted', () => { + const controller = new AbortController(); + const sac = new SearchAbortController(controller.signal); + + const controller2 = new AbortController(); + sac.addAbortSignal(controller2.signal); + + expect(sac.getSignal().aborted).toBe(false); + sac.abort(); + expect(sac.getSignal().aborted).toBe(true); + }); + + test('doesnt abort, if cleared', () => { + const controller = new AbortController(); + const sac = new SearchAbortController(controller.signal); + expect(sac.getSignal().aborted).toBe(false); + sac.cleanup(); + controller.abort(); + expect(sac.getSignal().aborted).toBe(false); + }); + + describe('timeout abort', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + test('doesnt abort on timeout, if cleared', () => { + const sac = new SearchAbortController(undefined, 100); + expect(sac.getSignal().aborted).toBe(false); + sac.cleanup(); + timeTravel(100); + expect(sac.getSignal().aborted).toBe(false); + }); + + test('aborts on timeout, even if no signals passed in', () => { + const sac = new SearchAbortController(undefined, 100); + expect(sac.getSignal().aborted).toBe(false); + timeTravel(100); + expect(sac.getSignal().aborted).toBe(true); + expect(sac.isTimeout()).toBe(true); + }); + + test('aborts on timeout, even if there are unaborted signals', () => { + const controller = new AbortController(); + const sac = new SearchAbortController(controller.signal, 100); + + expect(sac.getSignal().aborted).toBe(false); + timeTravel(100); + expect(sac.getSignal().aborted).toBe(true); + expect(sac.isTimeout()).toBe(true); + }); + }); +}); diff --git a/x-pack/plugins/data_enhanced/public/search/search_abort_controller.ts b/x-pack/plugins/data_enhanced/public/search/search_abort_controller.ts new file mode 100644 index 0000000000000..4482a7771dc28 --- /dev/null +++ b/x-pack/plugins/data_enhanced/public/search/search_abort_controller.ts @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Subscription, timer } from 'rxjs'; + +export enum AbortReason { + Timeout = 'timeout', +} + +export class SearchAbortController { + private inputAbortSignals: AbortSignal[] = new Array(); + private abortController: AbortController = new AbortController(); + private timeoutSub?: Subscription; + private destroyed = false; + private reason?: AbortReason; + + constructor(abortSignal?: AbortSignal, timeout?: number) { + if (abortSignal) { + this.addAbortSignal(abortSignal); + } + + if (timeout) { + this.timeoutSub = timer(timeout).subscribe(() => { + this.reason = AbortReason.Timeout; + this.abortController.abort(); + this.timeoutSub!.unsubscribe(); + }); + } + } + + private abortHandler = () => { + const allAborted = this.inputAbortSignals.every((signal) => signal.aborted); + if (allAborted) { + this.abortController.abort(); + this.cleanup(); + } + }; + + public cleanup() { + this.destroyed = true; + this.timeoutSub?.unsubscribe(); + this.inputAbortSignals.forEach((abortSignal) => { + abortSignal.removeEventListener('abort', this.abortHandler); + }); + } + + public addAbortSignal(inputSignal: AbortSignal) { + if (this.destroyed) { + return; + } + + this.inputAbortSignals.push(inputSignal); + + if (inputSignal.aborted) { + this.abortHandler(); + } else { + // abort our internal controller if the input signal aborts + inputSignal.addEventListener('abort', this.abortHandler); + } + } + + public getSignal() { + return this.abortController.signal; + } + + public abort() { + this.cleanup(); + this.abortController.abort(); + } + + public isTimeout() { + return this.reason === AbortReason.Timeout; + } +} diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts index 0dfec1a35d900..b9d8553d3dc5a 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts @@ -17,6 +17,7 @@ import { SearchSessionState, } from '../../../../../src/plugins/data/public'; import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common'; +import { SearchAbortController } from './search_abort_controller'; export class EnhancedSearchInterceptor extends SearchInterceptor { private uiSettingsSub: Subscription; @@ -47,31 +48,30 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { } public search({ id, ...request }: IKibanaSearchRequest, options: IAsyncSearchOptions = {}) { - const { combinedSignal, timeoutSignal, cleanup, abort } = this.setupAbortSignal({ - abortSignal: options.abortSignal, - timeout: this.searchTimeout, - }); - const strategy = options?.strategy ?? ENHANCED_ES_SEARCH_STRATEGY; - const searchOptions = { ...options, strategy, abortSignal: combinedSignal }; + const searchOptions = { + strategy: ENHANCED_ES_SEARCH_STRATEGY, + ...options, + }; + const { sessionId, strategy, abortSignal } = searchOptions; const search = () => this.runSearch({ id, ...request }, searchOptions); + const searchAbortController = new SearchAbortController(abortSignal, this.searchTimeout); this.pendingCount$.next(this.pendingCount$.getValue() + 1); - - const untrackSearch = - this.deps.session.isCurrentSession(options.sessionId) && - this.deps.session.trackSearch({ abort }); + const untrackSearch = this.deps.session.isCurrentSession(options.sessionId) + ? this.deps.session.trackSearch({ abort: () => searchAbortController.abort() }) + : undefined; // track if this search's session will be send to background // if yes, then we don't need to cancel this search when it is aborted let isSavedToBackground = false; const savedToBackgroundSub = - this.deps.session.isCurrentSession(options.sessionId) && + this.deps.session.isCurrentSession(sessionId) && this.deps.session.state$ .pipe( skip(1), // ignore any state, we are only interested in transition x -> BackgroundLoading filter( (state) => - this.deps.session.isCurrentSession(options.sessionId) && + this.deps.session.isCurrentSession(sessionId) && state === SearchSessionState.BackgroundLoading ), take(1) @@ -84,15 +84,18 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`); }); - return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe( + return pollSearch(search, cancel, { + ...options, + abortSignal: searchAbortController.getSignal(), + }).pipe( tap((response) => (id = response.id)), catchError((e: Error) => { cancel(); - return throwError(this.handleSearchError(e, timeoutSignal, options)); + return throwError(this.handleSearchError(e, options, searchAbortController.isTimeout())); }), finalize(() => { this.pendingCount$.next(this.pendingCount$.getValue() - 1); - cleanup(); + searchAbortController.cleanup(); if (untrackSearch && this.deps.session.isCurrentSession(options.sessionId)) { // untrack if this search still belongs to current session untrackSearch();