Skip to content

Commit

Permalink
Use getStartServices in search strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
Liza K committed Jun 4, 2020
1 parent a7c6742 commit 6a223fd
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 69 deletions.
25 changes: 17 additions & 8 deletions examples/demo_search/public/async_demo_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@
* under the License.
*/

import { Observable } from 'rxjs';
import { DataPublicPluginSetup, ISearch } from '../../../src/plugins/data/public';
import { Observable, from } from 'rxjs';
import { CoreSetup } from 'kibana/public';
import { flatMap } from 'rxjs/operators';
import { ISearch } from '../../../src/plugins/data/public';
import { ASYNC_SEARCH_STRATEGY } from '../../../x-pack/plugins/data_enhanced/public';
import { ASYNC_DEMO_SEARCH_STRATEGY, IAsyncDemoResponse } from '../common';
import { DemoDataSearchStartDependencies } from './types';

export function asyncDemoClientSearchStrategyProvider(data: DataPublicPluginSetup) {
const asyncStrategy = data.search.getSearchStrategy(ASYNC_SEARCH_STRATEGY);
export function asyncDemoClientSearchStrategyProvider(core: CoreSetup) {
const search: ISearch<typeof ASYNC_DEMO_SEARCH_STRATEGY> = (request, options) => {
return asyncStrategy.search(
{ ...request, serverStrategy: ASYNC_DEMO_SEARCH_STRATEGY },
options
) as Observable<IAsyncDemoResponse>;
return from(core.getStartServices()).pipe(
flatMap((startServices) => {
const asyncStrategy = (startServices[1] as DemoDataSearchStartDependencies).data.search.getSearchStrategy(
ASYNC_SEARCH_STRATEGY
);
return asyncStrategy.search(
{ ...request, serverStrategy: ASYNC_DEMO_SEARCH_STRATEGY },
options
) as Observable<IAsyncDemoResponse>;
})
);
};
return { search };
}
29 changes: 17 additions & 12 deletions examples/demo_search/public/demo_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
* under the License.
*/

import { Observable } from 'rxjs';
import {
DataPublicPluginSetup,
ISearch,
SYNC_SEARCH_STRATEGY,
} from '../../../src/plugins/data/public';
import { Observable, from } from 'rxjs';
import { flatMap } from 'rxjs/operators';
import { CoreSetup } from 'kibana/public';
import { ISearch, SYNC_SEARCH_STRATEGY } from '../../../src/plugins/data/public';
import { DEMO_SEARCH_STRATEGY, IDemoResponse } from '../common';
import { DemoDataSearchStartDependencies } from './types';

/**
* This demo search strategy provider simply provides a shortcut for calling the DEMO_SEARCH_STRATEGY
Expand All @@ -48,13 +47,19 @@ import { DEMO_SEARCH_STRATEGY, IDemoResponse } from '../common';
*
* and are ensured type safety in regard to the request and response objects.
*/
export function demoClientSearchStrategyProvider(data: DataPublicPluginSetup) {
const syncStrategy = data.search.getSearchStrategy(SYNC_SEARCH_STRATEGY);
export function demoClientSearchStrategyProvider(core: CoreSetup) {
const search: ISearch<typeof DEMO_SEARCH_STRATEGY> = (request, options) => {
return syncStrategy.search(
{ ...request, serverStrategy: DEMO_SEARCH_STRATEGY },
options
) as Observable<IDemoResponse>;
return from(core.getStartServices()).pipe(
flatMap((startServices) => {
const syncStrategy = (startServices[1] as DemoDataSearchStartDependencies).data.search.getSearchStrategy(
SYNC_SEARCH_STRATEGY
);
return syncStrategy.search(
{ ...request, serverStrategy: DEMO_SEARCH_STRATEGY },
options
) as Observable<IDemoResponse>;
})
);
};
return { search };
}
13 changes: 5 additions & 8 deletions examples/demo_search/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/

import { DataPublicPluginSetup } from '../../../src/plugins/data/public';
import { Plugin, CoreSetup } from '../../../src/core/public';
import {
DEMO_SEARCH_STRATEGY,
Expand All @@ -29,10 +28,7 @@ import {
} from '../common';
import { demoClientSearchStrategyProvider } from './demo_search_strategy';
import { asyncDemoClientSearchStrategyProvider } from './async_demo_search_strategy';

interface DemoDataSearchSetupDependencies {
data: DataPublicPluginSetup;
}
import { DemoDataSearchSetupDependencies, DemoDataSearchStartDependencies } from './types';

/**
* Add the typescript mappings for our search strategy to the request and
Expand All @@ -55,10 +51,11 @@ declare module '../../../src/plugins/data/public' {
}
}

export class DemoDataPlugin implements Plugin {
export class DemoDataPlugin
implements Plugin<void, void, DemoDataSearchSetupDependencies, DemoDataSearchStartDependencies> {
public setup(core: CoreSetup, { data }: DemoDataSearchSetupDependencies) {
const demoClientSearchStrategy = demoClientSearchStrategyProvider(data);
const asyncDemoClientSearchStrategy = asyncDemoClientSearchStrategyProvider(data);
const demoClientSearchStrategy = demoClientSearchStrategyProvider(core);
const asyncDemoClientSearchStrategy = asyncDemoClientSearchStrategyProvider(core);
data.search.registerSearchStrategy(DEMO_SEARCH_STRATEGY, demoClientSearchStrategy);
data.search.registerSearchStrategy(ASYNC_DEMO_SEARCH_STRATEGY, asyncDemoClientSearchStrategy);
}
Expand Down
28 changes: 28 additions & 0 deletions examples/demo_search/public/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { DataPublicPluginStart, DataPublicPluginSetup } from '../../../src/plugins/data/public';

export interface DemoDataSearchSetupDependencies {
data: DataPublicPluginSetup;
}

export interface DemoDataSearchStartDependencies {
data: DataPublicPluginStart;
}
2 changes: 1 addition & 1 deletion src/plugins/data/public/search/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ export * from './search_source/mocks';
const searchSetupMock: jest.Mocked<ISearchSetup> = {
aggs: searchAggsSetupMock(),
registerSearchStrategy: jest.fn(),
getSearchStrategy: jest.fn(),
};

const searchStartMock: jest.Mocked<ISearchStart> = {
aggs: searchAggsStartMock(),
setInterceptor: jest.fn(),
getSearchStrategy: jest.fn(),
search: jest.fn(),
searchSource: searchSourceMock,
__LEGACY: {
Expand Down
1 change: 0 additions & 1 deletion src/plugins/data/public/search/search_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ describe('Search service', () => {
expressions: expressionsPluginMock.createSetupContract(),
} as any);
expect(setup).toHaveProperty('registerSearchStrategy');
expect(setup).toHaveProperty('getSearchStrategy');
});
});
});
2 changes: 1 addition & 1 deletion src/plugins/data/public/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
types: aggTypesSetup,
},
registerSearchStrategy: this.registerSearchStrategy,
getSearchStrategy: this.getSearchStrategy,
};
}

Expand Down Expand Up @@ -165,6 +164,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
},
types: aggTypesStart,
},
getSearchStrategy: this.getSearchStrategy,
search,
searchSource: {
create: createSearchSource(dependencies.indexPatterns, searchSourceDependencies),
Expand Down
9 changes: 5 additions & 4 deletions src/plugins/data/public/search/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,17 @@ export interface ISearchSetup {
* strategies.
*/
registerSearchStrategy: TRegisterSearchStrategy;
}

export interface ISearchStart {
aggs: SearchAggsStart;
setInterceptor: (searchInterceptor: SearchInterceptor) => void;

/**
* Used if a plugin needs access to an already registered search strategy.
*/
getSearchStrategy: TGetSearchStrategy;
}

export interface ISearchStart {
aggs: SearchAggsStart;
setInterceptor: (searchInterceptor: SearchInterceptor) => void;
search: ISearchGeneric;
searchSource: {
create: (fields?: SearchSourceFields) => Promise<ISearchSource>;
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/data_enhanced/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class DataEnhancedPlugin implements Plugin {
KUERY_LANGUAGE_NAME,
setupKqlQuerySuggestionProvider(core)
);
const asyncSearchStrategy = asyncSearchStrategyProvider(core, data);
const asyncSearchStrategy = asyncSearchStrategyProvider(core);
const esSearchStrategy = enhancedEsSearchStrategyProvider(core, asyncSearchStrategy);
data.search.registerSearchStrategy(ASYNC_SEARCH_STRATEGY, asyncSearchStrategy);
data.search.registerSearchStrategy(ES_SEARCH_STRATEGY, esSearchStrategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,34 @@ import { of } from 'rxjs';
import { AbortController } from 'abort-controller';
import { CoreSetup } from '../../../../../src/core/public';
import { coreMock } from '../../../../../src/core/public/mocks';
import { DataPublicPluginSetup } from '../../../../../src/plugins/data/public';
import { DataPublicPluginStart } from '../../../../../src/plugins/data/public';
import { dataPluginMock } from '../../../../../src/plugins/data/public/mocks';
import { asyncSearchStrategyProvider } from './async_search_strategy';
import { IAsyncSearchOptions } from '.';

describe('Async search strategy', () => {
let mockCoreSetup: jest.Mocked<CoreSetup>;
let mockDataSetup: jest.Mocked<DataPublicPluginSetup>;
let mockDataStart: jest.Mocked<DataPublicPluginStart>;
const mockSearch = jest.fn();
const mockRequest = { params: {}, serverStrategy: 'foo' };
const mockOptions: IAsyncSearchOptions = { pollInterval: 0 };

beforeEach(() => {
mockCoreSetup = coreMock.createSetup();
mockDataSetup = dataPluginMock.createSetupContract();
(mockDataSetup.search.getSearchStrategy as jest.Mock).mockReturnValue({ search: mockSearch });
mockDataStart = dataPluginMock.createStartContract();
(mockDataStart.search.getSearchStrategy as jest.Mock).mockReturnValue({ search: mockSearch });
mockCoreSetup.getStartServices.mockResolvedValue([
undefined as any,
{ data: mockDataStart },
undefined,
]);
mockSearch.mockReset();
});

it('only sends one request if the first response is complete', async () => {
mockSearch.mockReturnValueOnce(of({ id: 1, total: 1, loaded: 1 }));

const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup, mockDataSetup);
const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup);

await asyncSearch.search(mockRequest, mockOptions).toPromise();

Expand All @@ -47,7 +52,7 @@ describe('Async search strategy', () => {
of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false })
);

const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup, mockDataSetup);
const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup);
expect(mockSearch).toBeCalledTimes(0);

await asyncSearch.search(mockRequest, mockOptions).toPromise();
Expand All @@ -61,7 +66,7 @@ describe('Async search strategy', () => {
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: true }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: true }));

const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup, mockDataSetup);
const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup);
expect(mockSearch).toBeCalledTimes(0);

await asyncSearch
Expand All @@ -80,7 +85,7 @@ describe('Async search strategy', () => {
of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false })
);

const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup, mockDataSetup);
const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup);

expect(mockSearch).toBeCalledTimes(0);

Expand All @@ -98,7 +103,7 @@ describe('Async search strategy', () => {
of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false })
);

const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup, mockDataSetup);
const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup);

expect(mockSearch).toBeCalledTimes(0);

Expand All @@ -115,7 +120,7 @@ describe('Async search strategy', () => {
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }))
.mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 }));

const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup, mockDataSetup);
const asyncSearch = asyncSearchStrategyProvider(mockCoreSetup);
const abortController = new AbortController();
const options = { ...mockOptions, signal: abortController.signal };

Expand Down
53 changes: 30 additions & 23 deletions x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { EMPTY, fromEvent, NEVER, throwError, timer, Observable } from 'rxjs';
import { mergeMap, expand, takeUntil } from 'rxjs/operators';
import { EMPTY, fromEvent, NEVER, throwError, timer, Observable, from } from 'rxjs';
import { mergeMap, expand, takeUntil, share, flatMap } from 'rxjs/operators';
import { CoreSetup } from '../../../../../src/core/public';
import { AbortError } from '../../../../../src/plugins/data/common';
import {
DataPublicPluginSetup,
ISearch,
ISearchStrategy,
ISyncSearchRequest,
SYNC_SEARCH_STRATEGY,
} from '../../../../../src/plugins/data/public';
import { IAsyncSearchOptions, IAsyncSearchResponse, IAsyncSearchRequest } from './types';
import { DataEnhancedStartDependencies } from '../plugin';

export const ASYNC_SEARCH_STRATEGY = 'ASYNC_SEARCH_STRATEGY';

Expand All @@ -26,10 +26,10 @@ declare module '../../../../../src/plugins/data/public' {
}

export function asyncSearchStrategyProvider(
core: CoreSetup,
data: DataPublicPluginSetup
core: CoreSetup
): ISearchStrategy<typeof ASYNC_SEARCH_STRATEGY> {
const syncSearch = data.search.getSearchStrategy(SYNC_SEARCH_STRATEGY);
const startServices$ = from(core.getStartServices()).pipe(share());

const search: ISearch<typeof ASYNC_SEARCH_STRATEGY> = (
request: ISyncSearchRequest,
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
Expand All @@ -51,27 +51,34 @@ export function asyncSearchStrategyProvider(
)
: NEVER;

return (syncSearch.search(request, options) as Observable<IAsyncSearchResponse>).pipe(
expand((response) => {
// If the response indicates of an error, stop polling and complete the observable
if (!response || (response.is_partial && !response.is_running)) {
return throwError(new AbortError());
}
return startServices$.pipe(
flatMap((startServices) => {
const syncSearch = (startServices[1] as DataEnhancedStartDependencies).data.search.getSearchStrategy(
SYNC_SEARCH_STRATEGY
);
return (syncSearch.search(request, options) as Observable<IAsyncSearchResponse>).pipe(
expand((response) => {
// If the response indicates of an error, stop polling and complete the observable
if (!response || (response.is_partial && !response.is_running)) {
return throwError(new AbortError());
}

// If the response indicates it is complete, stop polling and complete the observable
if (!response.is_running) return EMPTY;
// If the response indicates it is complete, stop polling and complete the observable
if (!response.is_running) return EMPTY;

id = response.id;
id = response.id;

// Delay by the given poll interval
return timer(pollInterval).pipe(
// Send future requests using just the ID from the response
mergeMap(() => {
return search({ id, serverStrategy }, options);
})
// Delay by the given poll interval
return timer(pollInterval).pipe(
// Send future requests using just the ID from the response
mergeMap(() => {
return search({ id, serverStrategy }, options);
})
);
}),
takeUntil(aborted$)
);
}),
takeUntil(aborted$)
})
);
};
return { search };
Expand Down

0 comments on commit 6a223fd

Please sign in to comment.