diff --git a/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.md b/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.md index e756eb9b72905..d179b9d9dcd82 100644 --- a/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.md +++ b/docs/development/plugins/data/server/kibana-plugin-plugins-data-server.md @@ -60,7 +60,6 @@ | [esQuery](./kibana-plugin-plugins-data-server.esquery.md) | | | [fieldFormats](./kibana-plugin-plugins-data-server.fieldformats.md) | | | [indexPatterns](./kibana-plugin-plugins-data-server.indexpatterns.md) | | -| [search](./kibana-plugin-plugins-data-server.search.md) | | ## Type Aliases @@ -70,6 +69,5 @@ | [IFieldFormatsRegistry](./kibana-plugin-plugins-data-server.ifieldformatsregistry.md) | | | [ISearch](./kibana-plugin-plugins-data-server.isearch.md) | | | [ISearchCancel](./kibana-plugin-plugins-data-server.isearchcancel.md) | | -| [ParsedInterval](./kibana-plugin-plugins-data-server.parsedinterval.md) | | | [TSearchStrategyProvider](./kibana-plugin-plugins-data-server.tsearchstrategyprovider.md) | Search strategy provider creates an instance of a search strategy with the request handler context bound to it. This way every search strategy can use whatever information they require from the request context. | diff --git a/src/plugins/data/common/index.ts b/src/plugins/data/common/index.ts index cf8c0bfe3d434..e4a663a1599f1 100644 --- a/src/plugins/data/common/index.ts +++ b/src/plugins/data/common/index.ts @@ -26,3 +26,4 @@ export * from './query'; export * from './search'; export * from './search/aggs'; export * from './types'; +export * from './utils'; diff --git a/src/plugins/data/common/utils/abort_utils.test.ts b/src/plugins/data/common/utils/abort_utils.test.ts new file mode 100644 index 0000000000000..d2a25f2c2dd52 --- /dev/null +++ b/src/plugins/data/common/utils/abort_utils.test.ts @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { AbortError, toPromise, getCombinedSignal } from './abort_utils'; + +jest.useFakeTimers(); + +const flushPromises = () => new Promise(resolve => setImmediate(resolve)); + +describe('AbortUtils', () => { + describe('AbortError', () => { + test('should preserve `message`', () => { + const message = 'my error message'; + const error = new AbortError(message); + expect(error.message).toBe(message); + }); + + test('should have a name of "AbortError"', () => { + const error = new AbortError(); + expect(error.name).toBe('AbortError'); + }); + }); + + describe('toPromise', () => { + describe('resolves', () => { + test('should not resolve if the signal does not abort', async () => { + const controller = new AbortController(); + const promise = toPromise(controller.signal); + const whenResolved = jest.fn(); + promise.then(whenResolved); + await flushPromises(); + expect(whenResolved).not.toBeCalled(); + }); + + test('should resolve if the signal does abort', async () => { + const controller = new AbortController(); + const promise = toPromise(controller.signal); + const whenResolved = jest.fn(); + promise.then(whenResolved); + controller.abort(); + await flushPromises(); + expect(whenResolved).toBeCalled(); + }); + }); + + describe('rejects', () => { + test('should not reject if the signal does not abort', async () => { + const controller = new AbortController(); + const promise = toPromise(controller.signal, true); + const whenRejected = jest.fn(); + promise.catch(whenRejected); + await flushPromises(); + expect(whenRejected).not.toBeCalled(); + }); + + test('should reject if the signal does abort', async () => { + const controller = new AbortController(); + const promise = toPromise(controller.signal, true); + const whenRejected = jest.fn(); + promise.catch(whenRejected); + controller.abort(); + await flushPromises(); + expect(whenRejected).toBeCalled(); + }); + }); + }); + + describe('getCombinedSignal', () => { + test('should return an AbortSignal', () => { + const signal = getCombinedSignal([]); + expect(signal instanceof AbortSignal).toBe(true); + }); + + test('should not abort if none of the signals abort', async () => { + const controller1 = new AbortController(); + const controller2 = new AbortController(); + setTimeout(() => controller1.abort(), 2000); + setTimeout(() => controller2.abort(), 1000); + const signal = getCombinedSignal([controller1.signal, controller2.signal]); + expect(signal.aborted).toBe(false); + jest.advanceTimersByTime(500); + await flushPromises(); + expect(signal.aborted).toBe(false); + }); + + test('should abort when the first signal aborts', async () => { + const controller1 = new AbortController(); + const controller2 = new AbortController(); + setTimeout(() => controller1.abort(), 2000); + setTimeout(() => controller2.abort(), 1000); + const signal = getCombinedSignal([controller1.signal, controller2.signal]); + expect(signal.aborted).toBe(false); + jest.advanceTimersByTime(1000); + await flushPromises(); + expect(signal.aborted).toBe(true); + }); + }); +}); diff --git a/src/plugins/data/common/utils/abort_utils.ts b/src/plugins/data/common/utils/abort_utils.ts new file mode 100644 index 0000000000000..5051515f3a826 --- /dev/null +++ b/src/plugins/data/common/utils/abort_utils.ts @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Class used to signify that something was aborted. Useful for applications to conditionally handle + * this type of error differently than other errors. + */ +export class AbortError extends Error { + constructor(message = 'Aborted') { + super(message); + this.message = message; + this.name = 'AbortError'; + } +} + +/** + * Returns a `Promise` corresponding with when the given `AbortSignal` is aborted. Useful for + * situations when you might need to `Promise.race` multiple `AbortSignal`s, or an `AbortSignal` + * with any other expected errors (or completions). + * @param signal The `AbortSignal` to generate the `Promise` from + * @param shouldReject If `false`, the promise will be resolved, otherwise it will be rejected + */ +export function toPromise(signal: AbortSignal, shouldReject = false) { + return new Promise((resolve, reject) => { + const action = shouldReject ? reject : resolve; + if (signal.aborted) action(); + signal.addEventListener('abort', action); + }); +} + +/** + * Returns an `AbortSignal` that will be aborted when the first of the given signals aborts. + * @param signals + */ +export function getCombinedSignal(signals: AbortSignal[]) { + const promises = signals.map(signal => toPromise(signal)); + const controller = new AbortController(); + Promise.race(promises).then(() => controller.abort()); + return controller.signal; +} diff --git a/src/plugins/data/common/utils/index.ts b/src/plugins/data/common/utils/index.ts index 8b8686c51b9c1..33989f3ad50a7 100644 --- a/src/plugins/data/common/utils/index.ts +++ b/src/plugins/data/common/utils/index.ts @@ -19,3 +19,4 @@ /** @internal */ export { shortenDottedString } from './shorten_dotted_string'; +export { AbortError, toPromise, getCombinedSignal } from './abort_utils'; diff --git a/src/plugins/data/public/mocks.ts b/src/plugins/data/public/mocks.ts index c5cff1c5c68d9..e3fc0e97af09b 100644 --- a/src/plugins/data/public/mocks.ts +++ b/src/plugins/data/public/mocks.ts @@ -19,9 +19,7 @@ import { Plugin, DataPublicPluginSetup, DataPublicPluginStart, IndexPatternsContract } from '.'; import { fieldFormatsMock } from '../common/field_formats/mocks'; -import { searchSetupMock } from './search/mocks'; -import { AggTypeFieldFilters } from './search/aggs'; -import { searchAggsStartMock } from './search/aggs/mocks'; +import { searchSetupMock, searchStartMock } from './search/mocks'; import { queryServiceMock } from './query/mocks'; export type Setup = jest.Mocked>; @@ -35,59 +33,28 @@ const autocompleteMock: any = { const createSetupContract = (): Setup => { const querySetupMock = queryServiceMock.createSetupContract(); - const setupContract = { + return { autocomplete: autocompleteMock, search: searchSetupMock, fieldFormats: fieldFormatsMock as DataPublicPluginSetup['fieldFormats'], query: querySetupMock, - __LEGACY: { - esClient: { - search: jest.fn(), - msearch: jest.fn(), - }, - }, }; - - return setupContract; }; const createStartContract = (): Start => { const queryStartMock = queryServiceMock.createStartContract(); - const startContract = { + return { actions: { createFiltersFromEvent: jest.fn().mockResolvedValue(['yes']), }, autocomplete: autocompleteMock, - getSuggestions: jest.fn(), - search: { - aggs: searchAggsStartMock(), - search: jest.fn(), - __LEGACY: { - AggConfig: jest.fn() as any, - AggType: jest.fn(), - aggTypeFieldFilters: new AggTypeFieldFilters(), - FieldParamType: jest.fn(), - MetricAggType: jest.fn(), - parentPipelineAggHelper: jest.fn() as any, - siblingPipelineAggHelper: jest.fn() as any, - esClient: { - search: jest.fn(), - msearch: jest.fn(), - }, - }, - }, + search: searchStartMock, fieldFormats: fieldFormatsMock as DataPublicPluginStart['fieldFormats'], query: queryStartMock, ui: { IndexPatternSelect: jest.fn(), SearchBar: jest.fn(), }, - __LEGACY: { - esClient: { - search: jest.fn(), - msearch: jest.fn(), - }, - }, indexPatterns: ({ make: () => ({ fieldsFetcher: { @@ -97,7 +64,6 @@ const createStartContract = (): Start => { get: jest.fn().mockReturnValue(Promise.resolve({})), } as unknown) as IndexPatternsContract, }; - return startContract; }; export { searchSourceMock } from './search/mocks'; diff --git a/src/plugins/data/public/search/index.ts b/src/plugins/data/public/search/index.ts index ac72cfd6f62ca..f3d2d99af5998 100644 --- a/src/plugins/data/public/search/index.ts +++ b/src/plugins/data/public/search/index.ts @@ -56,4 +56,6 @@ export { SortDirection, } from './search_source'; +export { SearchInterceptor } from './search_interceptor'; + export { FetchOptions } from './fetch'; diff --git a/src/plugins/data/public/search/mocks.ts b/src/plugins/data/public/search/mocks.ts index 71b4eece91cef..12cf258759a99 100644 --- a/src/plugins/data/public/search/mocks.ts +++ b/src/plugins/data/public/search/mocks.ts @@ -17,7 +17,9 @@ * under the License. */ -import { searchAggsSetupMock } from './aggs/mocks'; +import { searchAggsSetupMock, searchAggsStartMock } from './aggs/mocks'; +import { AggTypeFieldFilters } from './aggs/param_types/filter'; +import { ISearchStart } from './types'; export * from './search_source/mocks'; @@ -26,3 +28,24 @@ export const searchSetupMock = { registerSearchStrategyContext: jest.fn(), registerSearchStrategyProvider: jest.fn(), }; + +export const searchStartMock: jest.Mocked = { + aggs: searchAggsStartMock(), + search: jest.fn(), + cancel: jest.fn(), + getPendingCount$: jest.fn(), + runBeyondTimeout: jest.fn(), + __LEGACY: { + AggConfig: jest.fn() as any, + AggType: jest.fn(), + aggTypeFieldFilters: new AggTypeFieldFilters(), + FieldParamType: jest.fn(), + MetricAggType: jest.fn(), + parentPipelineAggHelper: jest.fn() as any, + siblingPipelineAggHelper: jest.fn() as any, + esClient: { + search: jest.fn(), + msearch: jest.fn(), + }, + }, +}; diff --git a/src/plugins/data/public/search/request_timeout_error.ts b/src/plugins/data/public/search/request_timeout_error.ts new file mode 100644 index 0000000000000..92894deb4f0ff --- /dev/null +++ b/src/plugins/data/public/search/request_timeout_error.ts @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Class used to signify that a request timed out. Useful for applications to conditionally handle + * this type of error differently than other errors. + */ +export class RequestTimeoutError extends Error { + constructor(message = 'Request timed out') { + super(message); + this.message = message; + this.name = 'RequestTimeoutError'; + } +} diff --git a/src/plugins/data/public/search/search_interceptor.test.ts b/src/plugins/data/public/search/search_interceptor.test.ts new file mode 100644 index 0000000000000..a89d17464b9e0 --- /dev/null +++ b/src/plugins/data/public/search/search_interceptor.test.ts @@ -0,0 +1,157 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Observable, Subject } from 'rxjs'; +import { IKibanaSearchRequest } from '../../common/search'; +import { RequestTimeoutError } from './request_timeout_error'; +import { SearchInterceptor } from './search_interceptor'; + +jest.useFakeTimers(); + +const flushPromises = () => new Promise(resolve => setImmediate(resolve)); +const mockSearch = jest.fn(); +let searchInterceptor: SearchInterceptor; + +describe('SearchInterceptor', () => { + beforeEach(() => { + mockSearch.mockClear(); + searchInterceptor = new SearchInterceptor(1000); + }); + + describe('search', () => { + test('should invoke `search` with the request', () => { + mockSearch.mockReturnValue(new Observable()); + const mockRequest: IKibanaSearchRequest = {}; + searchInterceptor.search(mockSearch, mockRequest); + expect(mockSearch.mock.calls[0][0]).toBe(mockRequest); + }); + + test('should mirror the observable to completion if the request does not time out', () => { + const mockResponse = new Subject(); + mockSearch.mockReturnValue(mockResponse.asObservable()); + const response = searchInterceptor.search(mockSearch, {}); + + setTimeout(() => mockResponse.next('hi'), 250); + setTimeout(() => mockResponse.complete(), 500); + + const next = jest.fn(); + const complete = jest.fn(); + response.subscribe({ next, complete }); + + jest.advanceTimersByTime(1000); + + expect(next).toHaveBeenCalledWith('hi'); + expect(complete).toHaveBeenCalled(); + }); + + test('should mirror the observable to error if the request does not time out', () => { + const mockResponse = new Subject(); + mockSearch.mockReturnValue(mockResponse.asObservable()); + const response = searchInterceptor.search(mockSearch, {}); + + setTimeout(() => mockResponse.next('hi'), 250); + setTimeout(() => mockResponse.error('error'), 500); + + const next = jest.fn(); + const error = jest.fn(); + response.subscribe({ next, error }); + + jest.advanceTimersByTime(1000); + + expect(next).toHaveBeenCalledWith('hi'); + expect(error).toHaveBeenCalledWith('error'); + }); + + test('should return a `RequestTimeoutError` if the request times out', () => { + mockSearch.mockReturnValue(new Observable()); + const response = searchInterceptor.search(mockSearch, {}); + + const error = jest.fn(); + response.subscribe({ error }); + + jest.advanceTimersByTime(1000); + + expect(error).toHaveBeenCalled(); + expect(error.mock.calls[0][0] instanceof RequestTimeoutError).toBe(true); + }); + }); + + describe('cancelPending', () => { + test('should abort all pending requests', async () => { + mockSearch.mockReturnValue(new Observable()); + + searchInterceptor.search(mockSearch, {}); + searchInterceptor.search(mockSearch, {}); + searchInterceptor.cancelPending(); + + await flushPromises(); + + const areAllRequestsAborted = mockSearch.mock.calls.every(([, { signal }]) => signal.aborted); + expect(areAllRequestsAborted).toBe(true); + }); + }); + + describe('runBeyondTimeout', () => { + test('should prevent the request from timing out', () => { + const mockResponse = new Subject(); + mockSearch.mockReturnValue(mockResponse.asObservable()); + const response = searchInterceptor.search(mockSearch, {}); + + setTimeout(searchInterceptor.runBeyondTimeout, 500); + setTimeout(() => mockResponse.next('hi'), 250); + setTimeout(() => mockResponse.complete(), 2000); + + const next = jest.fn(); + const complete = jest.fn(); + const error = jest.fn(); + response.subscribe({ next, error, complete }); + + jest.advanceTimersByTime(2000); + + expect(next).toHaveBeenCalledWith('hi'); + expect(error).not.toHaveBeenCalled(); + expect(complete).toHaveBeenCalled(); + }); + }); + + describe('getPendingCount$', () => { + test('should observe the number of pending requests', () => { + let i = 0; + const mockResponses = [new Subject(), new Subject()]; + mockSearch.mockImplementation(() => mockResponses[i++]); + + const pendingCount$ = searchInterceptor.getPendingCount$(); + + const next = jest.fn(); + pendingCount$.subscribe(next); + + const error = jest.fn(); + searchInterceptor.search(mockSearch, {}).subscribe({ error }); + searchInterceptor.search(mockSearch, {}).subscribe({ error }); + + setTimeout(() => mockResponses[0].complete(), 250); + setTimeout(() => mockResponses[1].error('error'), 500); + + jest.advanceTimersByTime(500); + + expect(next).toHaveBeenCalled(); + expect(next.mock.calls).toEqual([[0], [1], [2], [1], [0]]); + }); + }); +}); diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts new file mode 100644 index 0000000000000..3f83214f6050c --- /dev/null +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -0,0 +1,121 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { BehaviorSubject, fromEvent, throwError } from 'rxjs'; +import { mergeMap, takeUntil, finalize } from 'rxjs/operators'; +import { getCombinedSignal } from '../../common/utils'; +import { IKibanaSearchRequest } from '../../common/search'; +import { ISearchGeneric, ISearchOptions } from './i_search'; +import { RequestTimeoutError } from './request_timeout_error'; + +export class SearchInterceptor { + /** + * `abortController` used to signal all searches to abort. + */ + private abortController = new AbortController(); + + /** + * Observable that emits when the number of pending requests changes. + */ + private pendingCount$ = new BehaviorSubject(0); + + /** + * The IDs from `setTimeout` when scheduling the automatic timeout for each request. + */ + private timeoutIds: Set = new Set(); + + /** + * This class should be instantiated with a `requestTimeout` corresponding with how many ms after + * requests are initiated that they should automatically cancel. + * @param requestTimeout Usually config value `elasticsearch.requestTimeout` + */ + constructor(private readonly requestTimeout?: number) {} + + /** + * Abort our `AbortController`, which in turn aborts any intercepted searches. + */ + public cancelPending = () => { + this.abortController.abort(); + this.abortController = new AbortController(); + }; + + /** + * Un-schedule timing out all of the searches intercepted. + */ + public runBeyondTimeout = () => { + this.timeoutIds.forEach(clearTimeout); + this.timeoutIds.clear(); + }; + + /** + * Returns an `Observable` over the current number of pending searches. This could mean that one + * of the search requests is still in flight, or that it has only received partial responses. + */ + public getPendingCount$ = () => { + return this.pendingCount$.asObservable(); + }; + + /** + * Searches using the given `search` method. Overrides the `AbortSignal` with one that will abort + * either when `cancelPending` is called, when the request times out, or when the original + * `AbortSignal` is aborted. Updates the `pendingCount` when the request is started/finalized. + */ + public search = ( + search: ISearchGeneric, + request: IKibanaSearchRequest, + options?: ISearchOptions + ) => { + // Schedule this request to automatically timeout after some interval + const timeoutController = new AbortController(); + const { signal: timeoutSignal } = timeoutController; + const timeoutId = window.setTimeout(() => { + timeoutController.abort(); + }, this.requestTimeout); + this.addTimeoutId(timeoutId); + + // Get a combined `AbortSignal` that will be aborted whenever the first of the following occurs: + // 1. The user manually aborts (via `cancelPending`) + // 2. The request times out + // 3. The passed-in signal aborts (e.g. when re-fetching, or whenever the app determines) + const signals = [this.abortController.signal, timeoutSignal, options?.signal].filter( + Boolean + ) as AbortSignal[]; + const combinedSignal = getCombinedSignal(signals); + + // If the request timed out, throw a `RequestTimeoutError` + const timeoutError$ = fromEvent(timeoutSignal, 'abort').pipe( + mergeMap(() => throwError(new RequestTimeoutError())) + ); + + return search(request as any, { ...options, signal: combinedSignal }).pipe( + takeUntil(timeoutError$), + finalize(() => this.removeTimeoutId(timeoutId)) + ); + }; + + private addTimeoutId(id: number) { + this.timeoutIds.add(id); + this.pendingCount$.next(this.timeoutIds.size); + } + + private removeTimeoutId(id: number) { + this.timeoutIds.delete(id); + this.pendingCount$.next(this.timeoutIds.size); + } +} diff --git a/src/plugins/data/public/search/search_service.ts b/src/plugins/data/public/search/search_service.ts index 691c8aa0e984d..62c7e0468bb88 100644 --- a/src/plugins/data/public/search/search_service.ts +++ b/src/plugins/data/public/search/search_service.ts @@ -25,6 +25,7 @@ import { TStrategyTypes } from './strategy_types'; import { getEsClient, LegacyApiCaller } from './es_client'; import { ES_SEARCH_STRATEGY, DEFAULT_SEARCH_STRATEGY } from '../../common/search'; import { esSearchStrategyProvider } from './es_search/es_search_strategy'; +import { SearchInterceptor } from './search_interceptor'; import { getAggTypes, AggType, @@ -91,6 +92,16 @@ export class SearchService implements Plugin { } public start(core: CoreStart): ISearchStart { + /** + * A global object that intercepts all searches and provides convenience methods for cancelling + * all pending search requests, as well as getting the number of pending search requests. + * TODO: Make this modular so that apps can opt in/out of search collection, or even provide + * their own search collector instances + */ + const searchInterceptor = new SearchInterceptor( + core.injectedMetadata.getInjectedVar('esRequestTimeout') as number + ); + const aggTypesStart = this.aggTypesRegistry.start(); return { @@ -103,13 +114,16 @@ export class SearchService implements Plugin { }, types: aggTypesStart, }, + cancel: () => searchInterceptor.cancelPending(), + getPendingCount$: () => searchInterceptor.getPendingCount$(), + runBeyondTimeout: () => searchInterceptor.runBeyondTimeout(), search: (request, options, strategyName) => { const strategyProvider = this.getSearchStrategy(strategyName || DEFAULT_SEARCH_STRATEGY); const { search } = strategyProvider({ core, getSearchStrategy: this.getSearchStrategy, }); - return search(request as any, options); + return searchInterceptor.search(search as any, request, options); }, __LEGACY: { esClient: this.esClient!, diff --git a/src/plugins/data/public/search/search_source/search_source.test.ts b/src/plugins/data/public/search/search_source/search_source.test.ts index d2b8308bfb258..fcd116a3f4121 100644 --- a/src/plugins/data/public/search/search_source/search_source.test.ts +++ b/src/plugins/data/public/search/search_source/search_source.test.ts @@ -17,7 +17,7 @@ * under the License. */ -import { SearchSource } from '../search_source'; +import { SearchSource } from './search_source'; import { IndexPattern } from '../..'; import { mockDataServices } from '../aggs/test_helpers'; diff --git a/src/plugins/data/public/search/search_strategy/default_search_strategy.test.ts b/src/plugins/data/public/search/search_strategy/default_search_strategy.test.ts index e4f492c89e0ef..210a0e5fd1ac7 100644 --- a/src/plugins/data/public/search/search_strategy/default_search_strategy.test.ts +++ b/src/plugins/data/public/search/search_strategy/default_search_strategy.test.ts @@ -18,9 +18,9 @@ */ import { IUiSettingsClient } from '../../../../../core/public'; -import { ISearchStart } from '../types'; import { SearchStrategySearchParams } from './types'; import { defaultSearchStrategy } from './default_search_strategy'; +import { searchStartMock } from '../mocks'; const { search } = defaultSearchStrategy; @@ -56,6 +56,12 @@ describe('defaultSearchStrategy', function() { searchMockResponse.abort.mockClear(); searchMock.mockClear(); + const searchService = searchStartMock; + searchService.aggs.calculateAutoTimeExpression = jest.fn().mockReturnValue('1d'); + searchService.search = newSearchMock; + searchService.__LEGACY.esClient.search = searchMock; + searchService.__LEGACY.esClient.msearch = msearchMock; + searchArgs = { searchRequests: [ { @@ -63,15 +69,7 @@ describe('defaultSearchStrategy', function() { }, ], esShardTimeout: 0, - searchService: ({ - search: newSearchMock, - __LEGACY: { - esClient: { - search: searchMock, - msearch: msearchMock, - }, - }, - } as unknown) as jest.Mocked, + searchService, }; es = searchArgs.searchService.__LEGACY.esClient; diff --git a/src/plugins/data/public/search/types.ts b/src/plugins/data/public/search/types.ts index 1732c384b1a85..1b551f978b971 100644 --- a/src/plugins/data/public/search/types.ts +++ b/src/plugins/data/public/search/types.ts @@ -17,6 +17,7 @@ * under the License. */ +import { Observable } from 'rxjs'; import { CoreStart } from 'kibana/public'; import { SearchAggsSetup, SearchAggsStart, SearchAggsStartLegacy } from './aggs'; import { ISearch, ISearchGeneric } from './i_search'; @@ -86,6 +87,9 @@ export interface ISearchSetup { export interface ISearchStart { aggs: SearchAggsStart; + cancel: () => void; + getPendingCount$: () => Observable; + runBeyondTimeout: () => void; search: ISearchGeneric; __LEGACY: ISearchStartLegacy & SearchAggsStartLegacy; } diff --git a/src/plugins/expressions/common/execution/execution.ts b/src/plugins/expressions/common/execution/execution.ts index f70a32f2f09c1..d0ab178296408 100644 --- a/src/plugins/expressions/common/execution/execution.ts +++ b/src/plugins/expressions/common/execution/execution.ts @@ -22,6 +22,7 @@ import { Executor } from '../executor'; import { createExecutionContainer, ExecutionContainer } from './container'; import { createError } from '../util'; import { Defer, now } from '../../../kibana_utils/common'; +import { AbortError } from '../../../data/common'; import { RequestAdapter, DataAdapter } from '../../../inspector/common'; import { isExpressionValueError, ExpressionValueError } from '../expression_types/specs/error'; import { @@ -190,10 +191,7 @@ export class Execution< for (const link of chainArr) { // if execution was aborted return error if (this.context.abortSignal && this.context.abortSignal.aborted) { - return createError({ - message: 'The expression was aborted.', - name: 'AbortError', - }); + return createError(new AbortError('The expression was aborted.')); } const { function: fnName, arguments: fnArgs } = link; diff --git a/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts b/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts index fa5d677a53b2a..6271d7fcbeaac 100644 --- a/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts @@ -6,6 +6,7 @@ import { EMPTY, fromEvent, NEVER, Observable, throwError, timer } from 'rxjs'; import { mergeMap, expand, takeUntil } from 'rxjs/operators'; +import { AbortError } from '../../../../../src/plugins/data/common'; import { IKibanaSearchResponse, ISearchContext, @@ -45,10 +46,7 @@ export const asyncSearchStrategyProvider: TSearchStrategyProvider { const config = await context.config$.pipe(first()).toPromise(); const defaultParams = getDefaultSearchParams(config); - const params = { ...defaultParams, trackTotalHits: true, ...request.params }; + const params = { ...defaultParams, ...request.params }; const response = await (request.indexType === 'rollup' ? rollupSearch(caller, { ...request, params }, options) @@ -45,11 +45,6 @@ export const enhancedEsSearchStrategyProvider: TSearchStrategyProvider) : (response as AsyncSearchResponse).response; - if (typeof rawResponse.hits.total !== 'number') { - // @ts-ignore This should be fixed as part of https://github.com/elastic/kibana/issues/26356 - rawResponse.hits.total = rawResponse.hits.total.value; - } - const id = (response as AsyncSearchResponse).id; const { total, failed, successful } = rawResponse._shards; const loaded = failed + successful;