Skip to content

Commit

Permalink
[Sessions] Extract search abort controllers logic into a separate cla…
Browse files Browse the repository at this point in the history
…ss (#95688)

* simplify abort controller logic and extract it into a class

* docs

* delete unused tests

* code review

* code review

* code review
  • Loading branch information
lizozom authored Mar 30, 2021
1 parent 907b5c8 commit b58dd3e
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
<b>Signature:</b>

```typescript
protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error;
protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| e | <code>KibanaServerError &#124; AbortError</code> | |
| timeoutSignal | <code>AbortSignal</code> | |
| options | <code>ISearchOptions</code> | |
| isTimeout | <code>boolean</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export declare class SearchInterceptor
| Method | Modifiers | Description |
| --- | --- | --- |
| [getTimeoutMode()](./kibana-plugin-plugins-data-public.searchinterceptor.gettimeoutmode.md) | | |
| [handleSearchError(e, timeoutSignal, options)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
| [handleSearchError(e, options, isTimeout)](./kibana-plugin-plugins-data-public.searchinterceptor.handlesearcherror.md) | | |
| [search(request, options)](./kibana-plugin-plugins-data-public.searchinterceptor.search.md) | | Searches using the given <code>search</code> method. Overrides the <code>AbortSignal</code> with one that will abort either when the request times out, or when the original <code>AbortSignal</code> is aborted. Updates <code>pendingCount$</code> when the request is started/finalized. |
| [showError(e)](./kibana-plugin-plugins-data-public.searchinterceptor.showerror.md) | | |

14 changes: 1 addition & 13 deletions src/plugins/data/public/public.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2322,8 +2322,6 @@ export interface SearchError {
// @public (undocumented)
export class SearchInterceptor {
constructor(deps: SearchInterceptorDeps);
// @internal
protected abortController: AbortController;
// @internal (undocumented)
protected application: CoreStart['application'];
// (undocumented)
Expand All @@ -2334,22 +2332,12 @@ export class SearchInterceptor {
// Warning: (ae-forgotten-export) The symbol "AbortError" needs to be exported by the entry point index.d.ts
//
// (undocumented)
protected handleSearchError(e: KibanaServerError | AbortError, timeoutSignal: AbortSignal, options?: ISearchOptions): Error;
protected handleSearchError(e: KibanaServerError | AbortError, options?: ISearchOptions, isTimeout?: boolean): Error;
// @internal
protected pendingCount$: BehaviorSubject<number>;
// @internal (undocumented)
protected runSearch(request: IKibanaSearchRequest, options?: ISearchOptions): Promise<IKibanaSearchResponse>;
search(request: IKibanaSearchRequest, options?: ISearchOptions): Observable<IKibanaSearchResponse>;
// @internal (undocumented)
protected setupAbortSignal({ abortSignal, timeout, }: {
abortSignal?: AbortSignal;
timeout?: number;
}): {
timeoutSignal: AbortSignal;
combinedSignal: AbortSignal;
cleanup: () => void;
abort: () => void;
};
// (undocumented)
showError(e: Error): void;
}
Expand Down
86 changes: 9 additions & 77 deletions src/plugins/data/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

import { memoize } from 'lodash';
import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs';
import { BehaviorSubject, throwError, defer, from, Observable } from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';
import { PublicMethodsOf } from '@kbn/utility-types';
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
Expand All @@ -30,11 +30,7 @@ import {
getHttpError,
} from './errors';
import { toMountPoint } from '../../../kibana_react/public';
import {
AbortError,
getCombinedAbortSignal,
KibanaServerError,
} from '../../../kibana_utils/public';
import { AbortError, KibanaServerError } from '../../../kibana_utils/public';
import { ISessionService } from './session';

export interface SearchInterceptorDeps {
Expand All @@ -48,12 +44,6 @@ export interface SearchInterceptorDeps {
}

export class SearchInterceptor {
/**
* `abortController` used to signal all searches to abort.
* @internal
*/
protected abortController = new AbortController();

/**
* Observable that emits when the number of pending requests changes.
* @internal
Expand Down Expand Up @@ -98,10 +88,10 @@ export class SearchInterceptor {
*/
protected handleSearchError(
e: KibanaServerError | AbortError,
timeoutSignal: AbortSignal,
options?: ISearchOptions
options?: ISearchOptions,
isTimeout?: boolean
): Error {
if (timeoutSignal.aborted || e.message === 'Request timed out') {
if (isTimeout || e.message === 'Request timed out') {
// Handle a client or a server side timeout
const err = new SearchTimeoutError(e, this.getTimeoutMode());

Expand Down Expand Up @@ -154,60 +144,6 @@ export class SearchInterceptor {
);
}

/**
* @internal
*/
protected setupAbortSignal({
abortSignal,
timeout,
}: {
abortSignal?: AbortSignal;
timeout?: number;
}) {
// Schedule this request to automatically timeout after some interval
const timeoutController = new AbortController();
const { signal: timeoutSignal } = timeoutController;
const timeout$ = timeout ? timer(timeout) : NEVER;
const subscription = timeout$.subscribe(() => {
this.deps.usageCollector?.trackQueryTimedOut();
timeoutController.abort();
});

const selfAbortController = new AbortController();

// Get a combined `AbortSignal` that will be aborted whenever the first of the following occurs:
// 1. The internal abort controller aborts
// 2. The request times out
// 3. abort() is called on `selfAbortController`. This is used by session service to abort all pending searches that it tracks
// in the current session
// 4. The passed-in signal aborts (e.g. when re-fetching, or whenever the app determines)
const signals = [
this.abortController.signal,
timeoutSignal,
selfAbortController.signal,
...(abortSignal ? [abortSignal] : []),
];

const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedAbortSignal(
signals
);
const cleanup = () => {
subscription.unsubscribe();
combinedSignal.removeEventListener('abort', cleanup);
cleanupCombinedSignal();
};
combinedSignal.addEventListener('abort', cleanup);

return {
timeoutSignal,
combinedSignal,
cleanup,
abort: () => {
selfAbortController.abort();
},
};
}

private showTimeoutErrorToast = (e: SearchTimeoutError, sessionId?: string) => {
this.deps.toasts.addDanger({
title: 'Timed out',
Expand Down Expand Up @@ -245,25 +181,21 @@ export class SearchInterceptor {
*/
public search(
request: IKibanaSearchRequest,
options?: ISearchOptions
options: ISearchOptions = {}
): Observable<IKibanaSearchResponse> {
// Defer the following logic until `subscribe` is actually called
return defer(() => {
if (options?.abortSignal?.aborted) {
if (options.abortSignal?.aborted) {
return throwError(new AbortError());
}

const { timeoutSignal, combinedSignal, cleanup } = this.setupAbortSignal({
abortSignal: options?.abortSignal,
});
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
return from(this.runSearch(request, { ...options, abortSignal: combinedSignal })).pipe(
return from(this.runSearch(request, options)).pipe(
catchError((e: Error | AbortError) => {
return throwError(this.handleSearchError(e, timeoutSignal, options));
return throwError(this.handleSearchError(e, options));
}),
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
cleanup();
})
);
});
Expand Down
89 changes: 1 addition & 88 deletions src/plugins/kibana_utils/common/abort_utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

import { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils';
import { AbortError, abortSignalToPromise } from './abort_utils';

jest.useFakeTimers();

Expand Down Expand Up @@ -66,91 +66,4 @@ describe('AbortUtils', () => {
});
});
});

describe('getCombinedAbortSignal', () => {
test('should return an AbortSignal', () => {
const signal = getCombinedAbortSignal([]).signal;
expect(signal).toBeInstanceOf(AbortSignal);
});

test('should not abort if none of the signals abort', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(500);
await flushPromises();
expect(signal.aborted).toBe(false);
});

test('should abort when the first signal aborts', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(1000);
await flushPromises();
expect(signal.aborted).toBe(true);
});

test('should be aborted if any of the signals is already aborted', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
controller1.abort();
const signal = getCombinedAbortSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(true);
});

describe('cleanup listener', () => {
const createMockController = () => {
const controller = new AbortController();
const spyAddListener = jest.spyOn(controller.signal, 'addEventListener');
const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener');
return {
controller,
getTotalListeners: () =>
Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0),
};
};

test('cleanup should cleanup inner listeners', () => {
const controller1 = createMockController();
const controller2 = createMockController();

const { cleanup } = getCombinedAbortSignal([
controller1.controller.signal,
controller2.controller.signal,
]);

expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);

cleanup();

expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});

test('abort should cleanup inner listeners', async () => {
const controller1 = createMockController();
const controller2 = createMockController();

getCombinedAbortSignal([controller1.controller.signal, controller2.controller.signal]);

expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);

controller1.controller.abort();

await flushPromises();

expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});
});
});
});
29 changes: 0 additions & 29 deletions src/plugins/kibana_utils/common/abort_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,3 @@ export function abortSignalToPromise(

return { promise, cleanup };
}

/**
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
*
* @param signals
*/
export function getCombinedAbortSignal(
signals: AbortSignal[]
): { signal: AbortSignal; cleanup: () => void } {
const controller = new AbortController();
let cleanup = () => {};

if (signals.some((signal) => signal.aborted)) {
controller.abort();
} else {
const promises = signals.map((signal) => abortSignalToPromise(signal));
cleanup = () => {
promises.forEach((p) => p.cleanup());
controller.signal.removeEventListener('abort', cleanup);
};
controller.signal.addEventListener('abort', cleanup);
Promise.race(promises.map((p) => p.promise)).catch(() => {
cleanup();
controller.abort();
});
}

return { signal: controller.signal, cleanup };
}
2 changes: 1 addition & 1 deletion src/plugins/kibana_utils/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export * from './ui';
export * from './state_containers';
export * from './typed_json';
export * from './errors';
export { AbortError, abortSignalToPromise, getCombinedAbortSignal } from './abort_utils';
export { AbortError, abortSignalToPromise } from './abort_utils';
export { createGetterSetter, Get, Set } from './create_getter_setter';
export { distinctUntilChangedWithInitialValue } from './distinct_until_changed_with_initial_value';
export { url } from './url';
Expand Down
1 change: 0 additions & 1 deletion src/plugins/kibana_utils/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export {
fieldWildcardFilter,
fieldWildcardMatcher,
Get,
getCombinedAbortSignal,
JsonArray,
JsonObject,
JsonValue,
Expand Down
1 change: 0 additions & 1 deletion src/plugins/kibana_utils/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export {
fieldWildcardFilter,
fieldWildcardMatcher,
Get,
getCombinedAbortSignal,
Set,
url,
} from '../common';
Expand Down
Loading

0 comments on commit b58dd3e

Please sign in to comment.