Skip to content

Commit

Permalink
Async search observable can return a non-final response #64132 (#64155)
Browse files Browse the repository at this point in the history
* resolves #64132

* simplify condition

* added IAsyncSearchResponse type

* update and add jest tests
  • Loading branch information
Liza Katz committed Apr 22, 2020
1 parent 314c395 commit c646e58
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ describe('Async search strategy', () => {

it('stops polling when the response is complete', async () => {
mockSearch
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1, is_running: true, is_partial: true }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false }))
.mockReturnValueOnce(
of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false })
);

const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
Expand All @@ -67,10 +69,39 @@ describe('Async search strategy', () => {
expect(mockSearch).toBeCalledTimes(2);
});

it('stops polling when the response is an error', async () => {
mockSearch
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1, is_running: true, is_partial: true }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: true }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: true }));

const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
getSearchStrategy: jest.fn().mockImplementation(() => {
return () => {
return {
search: mockSearch,
};
};
}),
});

expect(mockSearch).toBeCalledTimes(0);

await asyncSearch
.search(mockRequest, mockOptions)
.toPromise()
.catch(() => {
expect(mockSearch).toBeCalledTimes(2);
});
});

it('only sends the ID and server strategy after the first request', async () => {
mockSearch
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1, is_running: true, is_partial: true }))
.mockReturnValueOnce(
of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false })
);

const asyncSearch = asyncSearchStrategyProvider({
core: mockCoreStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
SYNC_SEARCH_STRATEGY,
TSearchStrategyProvider,
} from '../../../../../src/plugins/data/public';
import { IAsyncSearchRequest, IAsyncSearchOptions } from './types';
import { IAsyncSearchRequest, IAsyncSearchOptions, IAsyncSearchResponse } from './types';

export const ASYNC_SEARCH_STRATEGY = 'ASYNC_SEARCH_STRATEGY';

Expand Down Expand Up @@ -52,9 +52,14 @@ export const asyncSearchStrategyProvider: TSearchStrategyProvider<typeof ASYNC_S
: NEVER;

return search(request, options).pipe(
expand(response => {
expand((response: IAsyncSearchResponse) => {
// If the response indicates of an error, stop polling and complete the observable
if (!response || (response.is_partial && !response.is_running)) {
return throwError(new AbortError());
}

// If the response indicates it is complete, stop polling and complete the observable
if ((response.loaded ?? 0) >= (response.total ?? 0)) return EMPTY;
if (!response.is_running) return EMPTY;

id = response.id;

Expand Down
17 changes: 16 additions & 1 deletion x-pack/plugins/data_enhanced/public/search/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { ISearchOptions, ISyncSearchRequest } from '../../../../../src/plugins/data/public';
import {
IKibanaSearchResponse,
ISearchOptions,
ISyncSearchRequest,
} from '../../../../../src/plugins/data/public';

export interface IAsyncSearchRequest extends ISyncSearchRequest {
/**
Expand All @@ -19,3 +23,14 @@ export interface IAsyncSearchOptions extends ISearchOptions {
*/
pollInterval?: number;
}

export interface IAsyncSearchResponse extends IKibanaSearchResponse {
/**
* Indicates whether async search is still in flight
*/
is_running?: boolean;
/**
* Indicates whether the results returned are complete or partial
*/
is_partial?: boolean;
}
12 changes: 10 additions & 2 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { shimHitsTotal } from './shim_hits_total';

export interface AsyncSearchResponse<T> {
id: string;
is_partial: boolean;
is_running: boolean;
response: SearchResponse<T>;
}

Expand Down Expand Up @@ -71,13 +73,19 @@ async function asyncSearch(
// Wait up to 1s for the response to return
const query = toSnakeCase({ waitForCompletionTimeout: '1s', ...queryParams });

const { response, id } = (await caller(
const { id, response, is_partial, is_running } = (await caller(
'transport.request',
{ method, path, body, query },
options
)) as AsyncSearchResponse<any>;

return { id, rawResponse: shimHitsTotal(response), ...getTotalLoaded(response._shards) };
return {
id,
is_partial,
is_running,
rawResponse: shimHitsTotal(response),
...getTotalLoaded(response._shards),
};
}

async function rollupSearch(
Expand Down

0 comments on commit c646e58

Please sign in to comment.