Skip to content

Commit

Permalink
[Search Sessions] Save all sessions, with persisted flag (#89570)
Browse files Browse the repository at this point in the history
* [data.search] Add search session methods to search service contract

* Fix types

* Fix tests and switch to cancel

* Update docs

* Fix types/tests

* Fix tests

* Update status of SO before cancelling search requests

* Add API integration test

* Fix types

* Update expiration route to use config defaultExpiration

* Fix test

* Update docs

* New logic for extend

* Remove declare module

* Search Sessions: Unskip Flaky Functional Test

* Review feedback

* fix ts

* Save all search sessions and then manage them based on their persisted state

* Get default search session expiration from config

* randomize sleep time

* fix test

* Remove test that is no longer valid

* fix test

* Make sure we poll, and dont persist, searches not in the context of a session

* Added keepalive unit tests

* fix ts

* code review @lukasolson

* ts

* More tests, rename onScreenTimeout to completedTimeout

* lint

* lint

* Delete async seaches

* Support saved object pagination
Fix get search status tests

* better PersistedSearchSessionSavedObjectAttributes ts

* test titles

* Fix undefined bug

* Remove runAt from monitoring task
Increase testing trackingInterval (caused bug)

* support workload histograms that take into account overdue tasks

* Update touched when changing session status to complete \ error

* removed test

* Updated management test data

* Rename configs

* delete tap first
add comments

* Use DataRequestHandlerContext in maps

* ts

* Fixed ts

Co-authored-by: Lukas Olson <[email protected]>
Co-authored-by: Timothy Sullivan <[email protected]>
Co-authored-by: Kibana Machine <[email protected]>
Co-authored-by: Anton Dosov <[email protected]>
Co-authored-by: Gidi Meir Morris <[email protected]>
  • Loading branch information
6 people authored Feb 3, 2021
1 parent 1b1e831 commit 7fbcf68
Show file tree
Hide file tree
Showing 35 changed files with 1,795 additions and 883 deletions.
1 change: 0 additions & 1 deletion src/plugins/data/common/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@

/** @internal */
export { shortenDottedString } from './shorten_dotted_string';
export { tapFirst } from './tap_first';
19 changes: 0 additions & 19 deletions src/plugins/data/common/utils/tap_first.test.ts

This file was deleted.

20 changes: 0 additions & 20 deletions src/plugins/data/common/utils/tap_first.ts

This file was deleted.

144 changes: 144 additions & 0 deletions src/plugins/data/server/search/search_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ import { createIndexPatternsStartMock } from '../index_patterns/mocks';
import { SearchService, SearchServiceSetupDependencies } from './search_service';
import { bfetchPluginMock } from '../../../bfetch/server/mocks';
import { of } from 'rxjs';
import {
IEsSearchRequest,
IEsSearchResponse,
IScopedSearchClient,
IScopedSearchSessionsClient,
ISearchSessionService,
ISearchStart,
ISearchStrategy,
} from '.';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { expressionsPluginMock } from '../../../expressions/public/mocks';
import { createSearchSessionsClientMock } from './mocks';

describe('Search service', () => {
let plugin: SearchService;
Expand Down Expand Up @@ -70,4 +82,136 @@ describe('Search service', () => {
expect(start).toHaveProperty('getSearchStrategy');
});
});

describe('asScopedProvider', () => {
let mockScopedClient: IScopedSearchClient;
let searcPluginStart: ISearchStart<IEsSearchRequest, IEsSearchResponse<any>>;
let mockStrategy: jest.Mocked<ISearchStrategy>;
let mockSessionService: ISearchSessionService<any>;
let mockSessionClient: jest.Mocked<IScopedSearchSessionsClient>;
const sessionId = '1234';

beforeEach(() => {
mockStrategy = { search: jest.fn().mockReturnValue(of({})) };

mockSessionClient = createSearchSessionsClientMock();
mockSessionService = {
asScopedProvider: () => (request: any) => mockSessionClient,
};

const pluginSetup = plugin.setup(mockCoreSetup, {
bfetch: bfetchPluginMock.createSetupContract(),
expressions: expressionsPluginMock.createSetupContract(),
});
pluginSetup.registerSearchStrategy('es', mockStrategy);
pluginSetup.__enhance({
defaultStrategy: 'es',
sessionService: mockSessionService,
});

searcPluginStart = plugin.start(mockCoreStart, {
fieldFormats: createFieldFormatsStartMock(),
indexPatterns: createIndexPatternsStartMock(),
});

const r: any = {};

mockScopedClient = searcPluginStart.asScoped(r);
});

describe('search', () => {
it('searches using the original request if not restoring, trackId is not called if there is no id in the response', async () => {
const searchRequest = { params: {} };
const options = { sessionId, isStored: false, isRestore: false };
mockSessionClient.trackId = jest.fn();

mockStrategy.search.mockReturnValue(
of({
rawResponse: {} as any,
})
);

await mockScopedClient.search(searchRequest, options).toPromise();

const [request, callOptions] = mockStrategy.search.mock.calls[0];

expect(callOptions).toBe(options);
expect(request).toBe(searchRequest);
expect(mockSessionClient.trackId).not.toBeCalled();
});

it('searches using the original request if `id` is provided', async () => {
const searchId = 'FnpFYlBpeXdCUTMyZXhCLTc1TWFKX0EbdDFDTzJzTE1Sck9PVTBIcW1iU05CZzo4MDA0';
const searchRequest = { id: searchId, params: {} };
const options = { sessionId, isStored: true, isRestore: true };

await mockScopedClient.search(searchRequest, options).toPromise();

const [request, callOptions] = mockStrategy.search.mock.calls[0];
expect(callOptions).toBe(options);
expect(request).toBe(searchRequest);
});

it('searches by looking up an `id` if restoring and `id` is not provided', async () => {
const searchRequest = { params: {} };
const options = { sessionId, isStored: true, isRestore: true };

mockSessionClient.getId = jest.fn().mockResolvedValueOnce('my_id');

await mockScopedClient.search(searchRequest, options).toPromise();

const [request, callOptions] = mockStrategy.search.mock.calls[0];
expect(callOptions).toBe(options);
expect(request).toStrictEqual({ ...searchRequest, id: 'my_id' });
});

it('calls `trackId` for every response, if the response contains an `id` and not restoring', async () => {
const searchRequest = { params: {} };
const options = { sessionId, isStored: false, isRestore: false };
mockSessionClient.trackId = jest.fn();

mockStrategy.search.mockReturnValue(
of(
{
id: 'my_id',
rawResponse: {} as any,
},
{
id: 'my_id',
rawResponse: {} as any,
}
)
);

await mockScopedClient.search(searchRequest, options).toPromise();

expect(mockSessionClient.trackId).toBeCalledTimes(2);

expect(mockSessionClient.trackId.mock.calls[0]).toEqual([searchRequest, 'my_id', options]);
expect(mockSessionClient.trackId.mock.calls[1]).toEqual([searchRequest, 'my_id', options]);
});

it('does not call `trackId` if restoring', async () => {
const searchRequest = { params: {} };
const options = { sessionId, isStored: true, isRestore: true };
mockSessionClient.getId = jest.fn().mockResolvedValueOnce('my_id');
mockSessionClient.trackId = jest.fn();

await mockScopedClient.search(searchRequest, options).toPromise();

expect(mockSessionClient.trackId).not.toBeCalled();
});

it('does not call `trackId` if no session id provided', async () => {
const searchRequest = { params: {} };
const options = {};
mockSessionClient.getId = jest.fn().mockResolvedValueOnce('my_id');
mockSessionClient.trackId = jest.fn();

await mockScopedClient.search(searchRequest, options).toPromise();

expect(mockSessionClient.trackId).not.toBeCalled();
});
});
});
});
7 changes: 3 additions & 4 deletions src/plugins/data/server/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
SharedGlobalConfig,
StartServicesAccessor,
} from 'src/core/server';
import { first, switchMap } from 'rxjs/operators';
import { first, switchMap, tap } from 'rxjs/operators';
import { BfetchServerSetup } from 'src/plugins/bfetch/server';
import { ExpressionsServerSetup } from 'src/plugins/expressions/server';
import type {
Expand Down Expand Up @@ -65,7 +65,6 @@ import { aggShardDelay } from '../../common/search/aggs/buckets/shard_delay_fn';
import { ConfigSchema } from '../../config';
import { ISearchSessionService, SearchSessionService } from './session';
import { KbnServerError } from '../../../kibana_utils/server';
import { tapFirst } from '../../common';
import { registerBsearchRoute } from './routes/bsearch';

type StrategyMap = Record<string, ISearchStrategy<any, any>>;
Expand Down Expand Up @@ -274,8 +273,8 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {

return from(getSearchRequest()).pipe(
switchMap((searchRequest) => strategy.search(searchRequest, options, deps)),
tapFirst((response) => {
if (request.id || !options.sessionId || !response.id || options.isRestore) return;
tap((response) => {
if (!options.sessionId || !response.id || options.isRestore) return;
deps.searchSessionsClient.trackId(request, response.id, options);
})
);
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/data/server/search/session/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface IScopedSearchSessionsClient<T = unknown> {
options: ISearchOptions
) => Promise<void>;
getSearchIdMapping: (sessionId: string) => Promise<Map<string, string>>;
save: (sessionId: string, attributes: Partial<T>) => Promise<SavedObject<T>>;
save: (sessionId: string, attributes: Partial<T>) => Promise<SavedObject<T> | undefined>;
get: (sessionId: string) => Promise<SavedObject<T>>;
find: (options: Omit<SavedObjectsFindOptions, 'type'>) => Promise<SavedObjectsFindResponse<T>>;
update: (sessionId: string, attributes: Partial<T>) => Promise<SavedObjectsUpdateResponse<T>>;
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/data_enhanced/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

export {
SEARCH_SESSION_TYPE,
ENHANCED_ES_SEARCH_STRATEGY,
EQL_SEARCH_STRATEGY,
EqlRequestParams,
Expand Down
22 changes: 17 additions & 5 deletions x-pack/plugins/data_enhanced/common/search/session/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,25 @@

import { SearchSessionStatus } from './';

export const SEARCH_SESSION_TYPE = 'search-session';
export interface SearchSessionSavedObjectAttributes {
sessionId: string;
/**
* User-facing session name to be displayed in session management
*/
name: string;
name?: string;
/**
* App that created the session. e.g 'discover'
*/
appId: string;
appId?: string;
/**
* Creation time of the session
*/
created: string;
/**
* Last touch time of the session
*/
touched: string;
/**
* Expiration time of the session. Expiration itself is managed by Elasticsearch.
*/
Expand All @@ -30,22 +36,28 @@ export interface SearchSessionSavedObjectAttributes {
/**
* urlGeneratorId
*/
urlGeneratorId: string;
urlGeneratorId?: string;
/**
* The application state that was used to create the session.
* Should be used, for example, to re-load an expired search session.
*/
initialState: Record<string, unknown>;
initialState?: Record<string, unknown>;
/**
* Application state that should be used to restore the session.
* For example, relative dates are conveted to absolute ones.
*/
restoreState: Record<string, unknown>;
restoreState?: Record<string, unknown>;
/**
* Mapping of search request hashes to their corresponsing info (async search id, etc.)
*/
idMapping: Record<string, SearchSessionRequestInfo>;

/**
* This value is true if the session was actively stored by the user. If it is false, the session may be purged by the system.
*/
persisted: boolean;
}

export interface SearchSessionRequestInfo {
/**
* ID of the async search request
Expand Down
36 changes: 35 additions & 1 deletion x-pack/plugins/data_enhanced/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,49 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const configSchema = schema.object({
search: schema.object({
sessions: schema.object({
/**
* Turns the feature on \ off (incl. removing indicator and management screens)
*/
enabled: schema.boolean({ defaultValue: false }),
/**
* pageSize controls how many search session objects we load at once while monitoring
* session completion
*/
pageSize: schema.number({ defaultValue: 10000 }),
/**
* trackingInterval controls how often we track search session objects progress
*/
trackingInterval: schema.duration({ defaultValue: '10s' }),
inMemTimeout: schema.duration({ defaultValue: '1m' }),
/**
* notTouchedTimeout controls how long do we store unpersisted search session results,
* after the last search in the session has completed
*/
notTouchedTimeout: schema.duration({ defaultValue: '5m' }),
/**
* notTouchedInProgressTimeout controls how long do allow a search session to run after
* a user has navigated away without persisting
*/
notTouchedInProgressTimeout: schema.duration({ defaultValue: '1m' }),
/**
* maxUpdateRetries controls how many retries we perform while attempting to save a search session
*/
maxUpdateRetries: schema.number({ defaultValue: 3 }),
/**
* defaultExpiration controls how long search sessions are valid for, until they are expired.
*/
defaultExpiration: schema.duration({ defaultValue: '7d' }),
management: schema.object({
/**
* maxSessions controls how many saved search sessions we display per page on the management screen.
*/
maxSessions: schema.number({ defaultValue: 10000 }),
/**
* refreshInterval controls how often we refresh the management screen.
*/
refreshInterval: schema.duration({ defaultValue: '10s' }),
/**
* refreshTimeout controls how often we refresh the management screen.
*/
refreshTimeout: schema.duration({ defaultValue: '1m' }),
expiresSoonWarning: schema.duration({ defaultValue: '1d' }),
}),
Expand Down
Loading

0 comments on commit 7fbcf68

Please sign in to comment.