Skip to content

Commit

Permalink
code review round 1
Browse files Browse the repository at this point in the history
  • Loading branch information
Liza K committed Apr 13, 2021
1 parent 6671fbe commit 07423c1
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/plugins/data/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export class SearchInterceptor {
}

protected getSerializableOptions(options?: ISearchOptions) {
const { abortSignal, sessionId, ...requestOptions } = options || {};
const { sessionId, ...requestOptions } = options || {};

const serializableOptions: ISearchOptionsSerializable = {};
const combined = {
Expand Down
13 changes: 9 additions & 4 deletions x-pack/plugins/data_enhanced/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {

private createRequestHash$(request: IKibanaSearchRequest, options: IAsyncSearchOptions) {
const { sessionId, isRestore } = options;
// Preference is used to ensure all queries go to the same set of shards and it doesn't need to be hashed
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-shard-routing.html#shard-and-node-preference
const { preference, ...params } = request.params || {};
const hashOptions = {
...params,
Expand Down Expand Up @@ -130,6 +132,9 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
savedToBackgroundSub.unsubscribe();
}
}),
// This observable is cached in the responseCache.
// Using shareReplay makes sure that future subscribers will get the final response

shareReplay(1)
);
}
Expand All @@ -151,7 +156,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {

// Create a new abort signal if one was not passed. This fake signal will never be aborted,
// So the underlaying search will not be aborted, even if the other consumers abort.
searchAbortController.addAbortSignal(options.abortSignal || new AbortController().signal);
searchAbortController.addAbortSignal(options.abortSignal ?? new AbortController().signal);
const response$ = cached?.response$ || this.runSearch$(request, options, searchAbortController);

if (requestHash && !this.responseCache.has(requestHash)) {
Expand All @@ -174,11 +179,11 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
};
const { sessionId, abortSignal } = searchOptions;

return this.createRequestHash$(request, options).pipe(
return this.createRequestHash$(request, searchOptions).pipe(
switchMap((requestHash) => {
const { searchAbortController, response$ } = this.getSearchResponse$(
request,
options,
searchOptions,
requestHash
);

Expand All @@ -199,7 +204,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
takeUntil(aborted$),
catchError((e) => {
return throwError(
this.handleSearchError(e, options, searchAbortController.isTimeout())
this.handleSearchError(e, searchOptions, searchAbortController.isTimeout())
);
}),
finalize(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { IKibanaSearchResponse } from 'src/plugins/data/public';
import { SearchAbortController } from './search_abort_controller';
import { SearchResponseCache } from './search_response_cache';

describe('', () => {
describe('SearchResponseCache', () => {
let cache: SearchResponseCache;
let searchAbortController: SearchAbortController;
const r: Array<IKibanaSearchResponse<any>> = [
Expand Down Expand Up @@ -59,7 +59,7 @@ describe('', () => {
);
}

function wrapWithAbotController(response$: Observable<IKibanaSearchResponse<any>>) {
function wrapWithAbortController(response$: Observable<IKibanaSearchResponse<any>>) {
return {
response$,
searchAbortController,
Expand All @@ -74,8 +74,8 @@ describe('', () => {
describe('Cache eviction', () => {
test('clear evicts all', () => {
const finalResult = r[r.length - 1];
cache.set('123', wrapWithAbotController(of(finalResult)));
cache.set('234', wrapWithAbotController(of(finalResult)));
cache.set('123', wrapWithAbortController(of(finalResult)));
cache.set('234', wrapWithAbortController(of(finalResult)));

cache.clear();

Expand All @@ -101,8 +101,8 @@ describe('', () => {
},
},
]);
cache.set('123', wrapWithAbotController(err$));
cache.set('234', wrapWithAbotController(res$));
cache.set('123', wrapWithAbortController(err$));
cache.set('234', wrapWithAbortController(res$));

const errHandler = jest.fn();
await err$.toPromise().catch(errHandler);
Expand Down Expand Up @@ -130,7 +130,7 @@ describe('', () => {
},
},
]);
cache.set('123', wrapWithAbotController(err$));
cache.set('123', wrapWithAbortController(err$));

const errHandler = jest.fn();
await err$.toPromise().catch(errHandler);
Expand All @@ -141,10 +141,10 @@ describe('', () => {

test('evicts oldest item if has too many cached items', async () => {
const finalResult = r[r.length - 1];
cache.set('123', wrapWithAbotController(of(finalResult)));
cache.set('234', wrapWithAbotController(of(finalResult)));
cache.set('345', wrapWithAbotController(of(finalResult)));
cache.set('456', wrapWithAbotController(of(finalResult)));
cache.set('123', wrapWithAbortController(of(finalResult)));
cache.set('234', wrapWithAbortController(of(finalResult)));
cache.set('345', wrapWithAbortController(of(finalResult)));
cache.set('456', wrapWithAbortController(of(finalResult)));

expect(cache.get('123')).toBeUndefined();
expect(cache.get('234')).not.toBeUndefined();
Expand All @@ -170,9 +170,9 @@ describe('', () => {
},
]);

cache.set('123', wrapWithAbotController(largeResult$));
cache.set('234', wrapWithAbotController(largeResult$));
cache.set('345', wrapWithAbotController(largeResult$));
cache.set('123', wrapWithAbortController(largeResult$));
cache.set('234', wrapWithAbortController(largeResult$));
cache.set('345', wrapWithAbortController(largeResult$));

await largeResult$.toPromise();

Expand All @@ -199,21 +199,21 @@ describe('', () => {
},
]);

cache.set('234', wrapWithAbotController(largeResult$));
cache.set('234', wrapWithAbortController(largeResult$));
await largeResult$.toPromise();
expect(cache.get('234')).toBeUndefined();
});

test('get updates the insertion time of an item', async () => {
const finalResult = r[r.length - 1];
cache.set('123', wrapWithAbotController(of(finalResult)));
cache.set('234', wrapWithAbotController(of(finalResult)));
cache.set('345', wrapWithAbotController(of(finalResult)));
cache.set('123', wrapWithAbortController(of(finalResult)));
cache.set('234', wrapWithAbortController(of(finalResult)));
cache.set('345', wrapWithAbortController(of(finalResult)));

cache.get('123');
cache.get('234');

cache.set('456', wrapWithAbotController(of(finalResult)));
cache.set('456', wrapWithAbortController(of(finalResult)));

expect(cache.get('123')).not.toBeUndefined();
expect(cache.get('234')).not.toBeUndefined();
Expand All @@ -225,14 +225,14 @@ describe('', () => {
describe('Observable behavior', () => {
test('caches a response and re-emits it', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbotController(s$));
cache.set('123', wrapWithAbortController(s$));
const finalRes = await cache.get('123')!.response$.toPromise();
expect(finalRes).toStrictEqual(r[r.length - 1]);
});

test('cached$ should emit same as original search$', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbotController(s$));
cache.set('123', wrapWithAbortController(s$));

const next = jest.fn();
const cached$ = cache.get('123');
Expand All @@ -252,7 +252,7 @@ describe('', () => {

test('cached$ should emit only current value and keep emitting if subscribed while search$ is running', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbotController(s$));
cache.set('123', wrapWithAbortController(s$));

const next = jest.fn();
let cached$: Observable<IKibanaSearchResponse<any>> | undefined;
Expand All @@ -278,7 +278,7 @@ describe('', () => {

test('cached$ should emit only last value if subscribed after search$ was complete 1', async () => {
const finalResult = r[r.length - 1];
const s$ = wrapWithAbotController(of(finalResult));
const s$ = wrapWithAbortController(of(finalResult));
cache.set('123', s$);

// wait for original search to complete
Expand All @@ -298,7 +298,7 @@ describe('', () => {

test('cached$ should emit only last value if subscribed after search$ was complete', async () => {
const s$ = getSearchObservable$();
cache.set('123', wrapWithAbotController(s$));
cache.set('123', wrapWithAbortController(s$));

// wait for original search to complete
await s$!.toPromise();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ interface ResponseCacheItemInternal {
subs: Subscription;
}

export const CACHE_MAX_SIZE_MB = 10;

export class SearchResponseCache {
private responseCache: Map<string, ResponseCacheItemInternal>;
private cacheSize = 0;
Expand Down Expand Up @@ -66,7 +64,7 @@ export class SearchResponseCache {
this.responseCache.size > this.maxItems ||
this.byteToMb(this.cacheSize) > this.maxCacheSizeMB
) {
const [key] = this.responseCache.entries().next().value as [string, ResponseCacheItem];
const [key] = [...this.responseCache.keys()];
this.deleteItem(key);
}
}
Expand Down Expand Up @@ -102,7 +100,7 @@ export class SearchResponseCache {
response$.subscribe({
next: (r) => {
// TODO: avoid stringiying. Get the size some other way!
const newSize = JSON.stringify(r).length;
const newSize = (new Blob([JSON.stringify(r)])).size;
if (this.byteToMb(newSize) < this.maxCacheSizeMB && !isErrorResponse(r)) {
this.setItem(key, {
...cacheItem,
Expand Down

0 comments on commit 07423c1

Please sign in to comment.