From adaae665e3e109be7c001d10f01f7088ae7b0dd9 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 2 Nov 2022 19:50:51 -0700 Subject: [PATCH 01/10] [Reporting] use point-in-time for paging search results --- .../search/search_source/search_source.ts | 8 +- .../data/common/search/search_source/types.ts | 5 + .../es_search/es_search_strategy.ts | 21 +- .../strategies/es_search/request_utils.ts | 2 +- .../__snapshots__/generate_csv.test.ts.snap | 2 +- .../generate_csv/generate_csv.test.ts | 220 ++++++++++-------- .../generate_csv/generate_csv.ts | 167 +++++++------ 7 files changed, 242 insertions(+), 183 deletions(-) diff --git a/src/plugins/data/common/search/search_source/search_source.ts b/src/plugins/data/common/search/search_source/search_source.ts index 497a247668694..fad799c7915b1 100644 --- a/src/plugins/data/common/search/search_source/search_source.ts +++ b/src/plugins/data/common/search/search_source/search_source.ts @@ -667,6 +667,8 @@ export class SearchSource { getConfig(UI_SETTINGS.SORT_OPTIONS) ); return addToBody(key, sort); + case 'pit': + return addToRoot(key, val); case 'aggs': if ((val as unknown) instanceof AggConfigs) { return addToBody('aggs', val.toDsl()); @@ -768,7 +770,7 @@ export class SearchSource { const { getConfig } = this.dependencies; const searchRequest = this.mergeProps(); searchRequest.body = searchRequest.body || {}; - const { body, index, query, filters, highlightAll } = searchRequest; + const { body, index, query, filters, highlightAll, pit } = searchRequest; searchRequest.indexType = this.getIndexType(index); const metaFields = getConfig(UI_SETTINGS.META_FIELDS) ?? []; @@ -911,6 +913,10 @@ export class SearchSource { delete searchRequest.highlightAll; } + if (pit) { + body.pit = pit; + } + return searchRequest; } diff --git a/src/plugins/data/common/search/search_source/types.ts b/src/plugins/data/common/search/search_source/types.ts index a583a1d1112cc..b899a74efcab6 100644 --- a/src/plugins/data/common/search/search_source/types.ts +++ b/src/plugins/data/common/search/search_source/types.ts @@ -116,6 +116,11 @@ export interface SearchSourceFields { timeout?: string; terminate_after?: number; + /** + * Allow querying to use a point-in-time ID for paging results + */ + pit?: estypes.SearchPointInTimeReference; + parent?: SearchSourceFields; } diff --git a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts index 73a3b58704877..22bf32fffb2a7 100644 --- a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts @@ -6,15 +6,17 @@ * Side Public License, v 1. */ -import { firstValueFrom, from, Observable } from 'rxjs'; -import { tap } from 'rxjs/operators'; +import { estypes } from '@elastic/elasticsearch'; import type { Logger, SharedGlobalConfig } from '@kbn/core/server'; import { getKbnServerError, KbnServerError } from '@kbn/kibana-utils-plugin/server'; -import type { ISearchStrategy } from '../../types'; +import { omit } from 'lodash'; +import { firstValueFrom, from, Observable } from 'rxjs'; +import { tap } from 'rxjs/operators'; import type { SearchUsage } from '../../collectors/search'; +import { searchUsageObserver } from '../../collectors/search/usage'; +import type { ISearchStrategy } from '../../types'; import { getDefaultSearchParams, getShardTimeout } from './request_utils'; import { shimHitsTotal, toKibanaSearchResponse } from './response_utils'; -import { searchUsageObserver } from '../../collectors/search/usage'; export const esSearchStrategyProvider = ( config$: Observable, @@ -35,13 +37,22 @@ export const esSearchStrategyProvider = ( throw new KbnServerError(`Unsupported index pattern type ${request.indexType}`, 400); } + const isPit = request.params?.body?.pit != null; + const search = async () => { try { const config = await firstValueFrom(config$); // @ts-expect-error params fall back to any, but should be valid SearchRequest params const { terminateAfter, ...requestParams } = request.params ?? {}; + let defaults: estypes.IndicesOptions = await getDefaultSearchParams(uiSettingsClient); + + if (isPit) { + // Remove IndicesOptions from the request if PIT is used, these options are set in the PIT + defaults = omit(defaults, ['ignore_unavailable']); + } + const params = { - ...(await getDefaultSearchParams(uiSettingsClient)), + ...defaults, ...getShardTimeout(config), ...(terminateAfter ? { terminate_after: terminateAfter } : {}), ...requestParams, diff --git a/src/plugins/data/server/search/strategies/es_search/request_utils.ts b/src/plugins/data/server/search/strategies/es_search/request_utils.ts index 2418ccfb49a0c..fa1bfdf635f70 100644 --- a/src/plugins/data/server/search/strategies/es_search/request_utils.ts +++ b/src/plugins/data/server/search/strategies/es_search/request_utils.ts @@ -21,7 +21,7 @@ export async function getDefaultSearchParams( uiSettingsClient: Pick ): Promise<{ max_concurrent_shard_requests?: number; - ignore_unavailable: boolean; + ignore_unavailable?: boolean; track_total_hits: boolean; }> { const maxConcurrentShardRequests = await uiSettingsClient.get( diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap index 855b447d85ced..c10911d7687d3 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap @@ -73,7 +73,7 @@ exports[`keeps order of the columns during the scroll 1`] = ` " `; -exports[`uses the scrollId to page all the data 1`] = ` +exports[`uses the pit ID to page all the data 1`] = ` "date,ip,message \\"2020-12-31T00:14:28.000Z\\",\\"110.135.176.89\\",\\"hit from the initial search\\" \\"2020-12-31T00:14:28.000Z\\",\\"110.135.176.89\\",\\"hit from the initial search\\" diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index ee00ea28cc05e..2c2f343959a2a 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -85,6 +85,8 @@ const getMockConfig = (properties: DeepPartial = {}) => { return config.get('csv'); }; +const mockPitId = 'oju9fs3698s3902f02-8qg3-u9w36oiewiuyew6'; + beforeEach(async () => { content = ''; stream = { write: jest.fn((chunk) => (content += chunk)) } as unknown as typeof stream; @@ -92,6 +94,8 @@ beforeEach(async () => { mockDataClient = dataPluginMock.createStartContract().search.asScoped({} as any); mockDataClient.search = mockDataClientSearchDefault; + mockEsClient.asCurrentUser.openPointInTime = jest.fn().mockResolvedValueOnce({ id: mockPitId }); + uiSettingsClient = uiSettingsServiceMock .createStartContract() .asScopedToClient(savedObjectsClientMock.create()); @@ -117,6 +121,8 @@ beforeEach(async () => { searchSourceMock.getField = jest.fn((key: string) => { switch (key) { + case 'pit': + return { id: mockPitId }; case 'index': return { fields: { @@ -283,36 +289,40 @@ it('warns if max size was reached', async () => { expect(content).toMatchSnapshot(); }); -it('uses the scrollId to page all the data', async () => { - mockDataClient.search = jest.fn().mockImplementation(() => - Rx.of({ - rawResponse: { - _scroll_id: 'awesome-scroll-hero', - hits: { - hits: range(0, HITS_TOTAL / 10).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['hit from the initial search'], - }, - })), - total: HITS_TOTAL, +it('uses the pit ID to page all the data', async () => { + mockDataClient.search = jest + .fn() + .mockImplementationOnce(() => + Rx.of({ + rawResponse: { + hits: { + hits: range(0, HITS_TOTAL / 10).map(() => ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['hit from the initial search'], + }, + })), + total: HITS_TOTAL, + }, }, - }, - }) - ); - - mockEsClient.asCurrentUser.scroll = jest.fn().mockResolvedValue({ - hits: { - hits: range(0, HITS_TOTAL / 10).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['hit from a subsequent scroll'], + }) + ) + .mockImplementation(() => + Rx.of({ + rawResponse: { + hits: { + hits: range(0, HITS_TOTAL / 10).map(() => ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['hit from a subsequent scroll'], + }, + })), + }, }, - })), - }, - }); + }) + ); const generateCsv = new CsvGenerator( createMockJob({ columns: ['date', 'ip', 'message'] }), @@ -334,70 +344,78 @@ it('uses the scrollId to page all the data', async () => { expect(csvResult.warnings).toEqual([]); expect(content).toMatchSnapshot(); - expect(mockDataClient.search).toHaveBeenCalledTimes(1); + expect(mockDataClient.search).toHaveBeenCalledTimes(10); expect(mockDataClient.search).toBeCalledWith( - { params: { body: {}, ignore_throttled: undefined, scroll: '30s', size: 500 } }, + { params: { body: {}, ignore_throttled: undefined } }, { strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } } ); - // `scroll` and `clearScroll` must be called with scroll ID in the post body! - expect(mockEsClient.asCurrentUser.scroll).toHaveBeenCalledTimes(9); - expect(mockEsClient.asCurrentUser.scroll).toHaveBeenCalledWith({ - scroll: '30s', - scroll_id: 'awesome-scroll-hero', + expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledTimes(1); + expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith({ + ignore_unavailable: true, + index: undefined, + keep_alive: '30s', }); - expect(mockEsClient.asCurrentUser.clearScroll).toHaveBeenCalledTimes(1); - expect(mockEsClient.asCurrentUser.clearScroll).toHaveBeenCalledWith({ - scroll_id: ['awesome-scroll-hero'], + expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1); + expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledWith({ + body: { id: mockPitId }, }); }); it('keeps order of the columns during the scroll', async () => { - mockDataClient.search = jest.fn().mockImplementation(() => - Rx.of({ - rawResponse: { - _scroll_id: 'awesome-scroll-hero', - hits: { - hits: [ - { - fields: { - a: ['a1'], - b: ['b1'], + mockDataClient.search = jest + .fn() + .mockImplementationOnce(() => + Rx.of({ + rawResponse: { + hits: { + hits: [ + { + fields: { + a: ['a1'], + b: ['b1'], + }, }, - }, - ], - total: 3, + ], + total: 3, + }, }, - }, - }) - ); - - mockEsClient.asCurrentUser.scroll = jest - .fn() - .mockResolvedValueOnce({ - hits: { - hits: [ - { - fields: { - b: ['b2'], - }, + }) + ) + .mockImplementationOnce(() => + Rx.of({ + rawResponse: { + hits: { + hits: [ + { + fields: { + b: ['b2'], + }, + }, + ], + total: 3, }, - ], - }, - }) - .mockResolvedValueOnce({ - hits: { - hits: [ - { - fields: { - a: ['a3'], - c: ['c3'], - }, + }, + }) + ) + .mockImplementationOnce(() => + Rx.of({ + rawResponse: { + hits: { + hits: [ + { + fields: { + a: ['a3'], + c: ['c3'], + }, + }, + ], + total: 3, }, - ], - }, - }); + }, + }) + ); const generateCsv = new CsvGenerator( createMockJob({ searchSource: {}, columns: [] }), @@ -875,8 +893,6 @@ it('can override ignoring frozen indices', async () => { params: { body: {}, ignore_throttled: false, - scroll: '30s', - size: 500, }, }, { strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } } @@ -928,7 +944,7 @@ it('will return partial data if the scroll or search fails', async () => { expect(mockLogger.error.mock.calls).toMatchInlineSnapshot(` Array [ Array [ - "CSV export scan error: ResponseError: my error", + "CSV export search error: ResponseError: my error", ], Array [ [ResponseError: my error], @@ -978,27 +994,27 @@ it('handles unknown errors', async () => { describe('error codes', () => { it('returns the expected error code when authentication expires', async () => { - mockDataClient.search = jest.fn().mockImplementation(() => - Rx.of({ - rawResponse: { - _scroll_id: 'test', - hits: { - hits: range(0, 5).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['super cali fragile istic XPLA docious'], - }, - })), - total: 10, + mockDataClient.search = jest + .fn() + .mockImplementationOnce(() => + Rx.of({ + rawResponse: { + hits: { + hits: range(0, 5).map(() => ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['super cali fragile istic XPLA docious'], + }, + })), + total: 10, + }, }, - }, - }) - ); - - mockEsClient.asCurrentUser.scroll = jest.fn().mockImplementation(() => { - throw new esErrors.ResponseError({ statusCode: 403, meta: {} as any, warnings: [] }); - }); + }) + ) + .mockImplementationOnce(() => { + throw new esErrors.ResponseError({ statusCode: 403, meta: {} as any, warnings: [] }); + }); const generateCsv = new CsvGenerator( createMockJob({ columns: ['date', 'ip', 'message'] }), @@ -1029,7 +1045,7 @@ describe('error codes', () => { expect(mockLogger.error.mock.calls).toMatchInlineSnapshot(` Array [ Array [ - "CSV export scroll error: ResponseError: Response Error", + "CSV export search error: ResponseError: Response Error", ], Array [ [ResponseError: Response Error], diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index d287ec58530b9..bc2071e0b4ed0 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -5,15 +5,9 @@ * 2.0. */ -import { errors as esErrors } from '@elastic/elasticsearch'; -import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { errors as esErrors, estypes } from '@elastic/elasticsearch'; import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server'; -import type { - DataView, - ISearchSource, - ISearchStartSearchSource, - SearchRequest, -} from '@kbn/data-plugin/common'; +import type { ISearchSource, ISearchStartSearchSource } from '@kbn/data-plugin/common'; import { cellHasFormulas, ES_SEARCH_STRATEGY, tabifyDocs } from '@kbn/data-plugin/common'; import type { IScopedSearchClient } from '@kbn/data-plugin/server'; import type { Datatable } from '@kbn/expressions-plugin/server'; @@ -61,21 +55,65 @@ export class CsvGenerator { private stream: Writable ) {} - private async scan(index: DataView, searchSource: ISearchSource, settings: CsvExportSettings) { + private async openPointInTime(indexPatternTitle: string, settings: CsvExportSettings) { + const { duration } = settings.scroll; + let pitId: string | undefined; + this.logger.debug(`Open point-in-time for index: ${indexPatternTitle}`); + try { + // open PIT and set field format config + // use _pit API + const response = await this.clients.es.asCurrentUser.openPointInTime({ + index: indexPatternTitle, + keep_alive: duration, + ignore_unavailable: true, + }); + pitId = response.id; + } catch (err) { + this.logger.error(err); + throw err; + } + + if (!pitId) { + throw new Error(`Could not receive a point-in-time ID!`); + } + + return pitId; + } + + private async doSearch( + searchSource: ISearchSource, + settings: CsvExportSettings, + lastSortId?: estypes.SortResults + ) { const { scroll: scrollSettings, includeFrozen } = settings; - const searchBody: SearchRequest | undefined = searchSource.getSearchRequestBody(); + searchSource.setField('size', scrollSettings.size); + + { + const pitId = searchSource.getField('pit')?.id; + if (!pitId) { + throw new Error(`Unable to retrieve point-in-time ID from search source!`); + } + if (lastSortId) { + this.logger.debug( + `Executing search request with point-in-time ID: ${this.truncatePitId(pitId)}` + + `, and search_after: [${lastSortId}]` + ); + searchSource.setField('searchAfter', lastSortId); + } else { + this.logger.debug( + `Executing search request with point-in-time ID: ${this.truncatePitId(pitId)}` + ); + } + } + + const searchBody: estypes.SearchRequest = searchSource.getSearchRequestBody(); if (searchBody == null) { throw new Error('Could not retrieve the search body!'); } - this.logger.debug(`Tracking total hits with: track_total_hits=${searchBody.track_total_hits}`); - this.logger.info(`Executing search request...`); const searchParams = { params: { body: searchBody, - index: index.title, - scroll: scrollSettings.duration, - size: scrollSettings.size, ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES }, }; @@ -88,35 +126,19 @@ export class CsvGenerator { strategy: ES_SEARCH_STRATEGY, transport: { maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic - requestTimeout: this.config.scroll.duration, + requestTimeout: scrollSettings.duration, }, }) ) - ).rawResponse as estypes.SearchResponse; + ).rawResponse; } catch (err) { - this.logger.error(`CSV export scan error: ${err}`); + this.logger.error(`CSV export search error: ${err}`); throw err; } return results; } - private async scroll(scrollId: string, scrollSettings: CsvExportSettings['scroll']) { - this.logger.info(`Executing scroll request...`); - - let results: estypes.SearchResponse | undefined; - try { - results = await this.clients.es.asCurrentUser.scroll({ - scroll: scrollSettings.duration, - scroll_id: scrollId, - }); - } catch (err) { - this.logger.error(`CSV export scroll error: ${err}`); - throw err; - } - return results; - } - /* * Load field formats for each field in the list */ @@ -202,7 +224,7 @@ export class CsvGenerator { builder: MaxSizeStringBuilder, settings: CsvExportSettings ) { - this.logger.debug(`Building CSV header row...`); + this.logger.debug(`Building CSV header row`); const header = Array.from(columns).map(this.escapeValues(settings)).join(settings.separator) + '\n'; @@ -225,7 +247,7 @@ export class CsvGenerator { formatters: Record, settings: CsvExportSettings ) { - this.logger.debug(`Building ${table.rows.length} CSV data rows...`); + this.logger.debug(`Building ${table.rows.length} CSV data rows`); for (const dataTableRow of table.rows) { if (this.cancellationToken.isCancelled()) { break; @@ -293,26 +315,29 @@ export class CsvGenerator { throw new Error(`The search must have a reference to an index pattern!`); } - const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings; - + const { maxSizeBytes, bom, escapeFormulaValues, timezone } = settings; + const indexPatternTitle = index.title; const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; let currentRecord = -1; let totalRecords: number | undefined; let totalRelation = 'eq'; - let scrollId: string | undefined; + let searchAfter: estypes.SortResults | undefined; + + const pitId = await this.openPointInTime(indexPatternTitle, settings); + searchSource.setField('pit', { id: pitId, keep_alive: settings.scroll.duration }); // apply timezone from the job to all date field formatters try { index.fields.getByType('date').forEach(({ name }) => { - this.logger.debug(`setting timezone on ${name}`); + this.logger.debug(`Setting timezone on ${name}`); const format: FieldFormatConfig = { ...index.fieldFormatMap[name], id: index.fieldFormatMap[name]?.id || 'date', // allow id: date_nanos params: { ...index.fieldFormatMap[name]?.params, - timezone: settings.timezone, + timezone, }, }; index.setFieldFormat(name, format); @@ -327,24 +352,17 @@ export class CsvGenerator { if (this.cancellationToken.isCancelled()) { break; } - let results: estypes.SearchResponse | undefined; - if (scrollId == null) { - // open a scroll cursor in Elasticsearch - results = await this.scan(index, searchSource, settings); - scrollId = results?._scroll_id; - if (results?.hits?.total != null) { - const { hits } = results; - if (typeof hits.total === 'number') { - totalRecords = hits.total; - } else { - totalRecords = hits.total?.value; - totalRelation = hits.total?.relation ?? 'unknown'; - } - this.logger.info(`Total hits: [${totalRecords}].` + `Accuracy: ${totalRelation}`); + const results = await this.doSearch(searchSource, settings, searchAfter); + + const { hits } = results; + if (first && hits.total != null) { + if (typeof hits.total === 'number') { + totalRecords = hits.total; + } else { + totalRecords = hits.total?.value; + totalRelation = hits.total?.relation ?? 'unknown'; } - } else { - // use the scroll cursor in Elasticsearch - results = await this.scroll(scrollId, scrollSettings); + this.logger.info(`Total hits ${totalRelation} ${totalRecords}.`); } if (!results) { @@ -354,12 +372,11 @@ export class CsvGenerator { // TODO check for shard failures, log them and add a warning if found { - const { - hits: { hits, ...hitsMeta }, - ...header - } = results; - this.logger.debug('Results metadata: ' + JSON.stringify({ header, hitsMeta })); - } + const { hits: { hits: _hits, ...hitsMeta }, ...headerWithPit } = results; + const { pit_id: newPitId, ...header } = headerWithPit; + const meta = JSON.stringify({ header: { pit_id: `${this.truncatePitId(newPitId)}`, ...header }, hitsMeta }); + this.logger.debug(`Results metadata: ${meta}`); + } // prettier-ignore let table: Datatable | undefined; try { @@ -390,6 +407,10 @@ export class CsvGenerator { const formatters = this.getFormatters(table); await this.generateRows(columns, table, builder, formatters, settings); + // Update last sort results for next query. PIT is used, so the sort results + // automatically include _shard_doc as a tiebreaker + searchAfter = hits.hits[hits.hits.length - 1]?.sort as estypes.SortResults | undefined; + // update iterator currentRecord += table.rows.length; } while (totalRecords != null && currentRecord < totalRecords - 1); @@ -411,16 +432,12 @@ export class CsvGenerator { warnings.push(i18nTexts.unknownError(err?.message ?? err)); } } finally { - // clear scrollID - if (scrollId) { - this.logger.debug(`Executing clearScroll request`); - try { - await this.clients.es.asCurrentUser.clearScroll({ scroll_id: [scrollId] }); - } catch (err) { - this.logger.error(err); - } + // + if (pitId) { + this.logger.debug(`Closing point-in-time`); + await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } }); } else { - this.logger.warn(`No scrollId to clear!`); + this.logger.warn(`No point-in-time ID to clear!`); } } @@ -447,4 +464,8 @@ export class CsvGenerator { error_code: reportingError?.code, }; } + + private truncatePitId(pitId: string | undefined) { + return pitId?.substring(0, 12) + '...'; + } } From dcccbb577257b1016c72c28eebf179f101a8c017 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Thu, 3 Nov 2022 11:22:28 -0700 Subject: [PATCH 02/10] add new PIT tests to data plugin --- .../search/search_source/search_source.test.ts | 7 +++++++ .../data/common/search/search_source/types.ts | 8 ++++++-- .../strategies/es_search/es_search_strategy.ts | 17 +++++------------ .../strategies/es_search/request_utils.ts | 16 +++++++++++++--- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/plugins/data/common/search/search_source/search_source.test.ts b/src/plugins/data/common/search/search_source/search_source.test.ts index b5cabc654a3f7..4e6192d24e8eb 100644 --- a/src/plugins/data/common/search/search_source/search_source.test.ts +++ b/src/plugins/data/common/search/search_source/search_source.test.ts @@ -903,6 +903,13 @@ describe('SearchSource', () => { expect(Object.keys(JSON.parse(searchSourceJSON))).toEqual(['highlightAll', 'from', 'sort']); }); + test('should add pit', () => { + const pit = { id: 'flimflam', keep_alive: '1m' }; + searchSource.setField('pit', pit); + const { searchSourceJSON } = searchSource.serialize(); + expect(searchSourceJSON).toBe(JSON.stringify({ pit })); + }); + test('should serialize filters', () => { const filter = [ { diff --git a/src/plugins/data/common/search/search_source/types.ts b/src/plugins/data/common/search/search_source/types.ts index b899a74efcab6..b37794d3ed1cd 100644 --- a/src/plugins/data/common/search/search_source/types.ts +++ b/src/plugins/data/common/search/search_source/types.ts @@ -39,6 +39,9 @@ export interface ISearchStartSearchSource createEmpty: () => ISearchSource; } +/** + * @deprecated use {@link estypes.SortResults} instead. + */ export type EsQuerySearchAfter = [string | number, string | number]; export enum SortDirection { @@ -112,12 +115,13 @@ export interface SearchSourceFields { * {@link IndexPatternService} */ index?: DataView; - searchAfter?: EsQuerySearchAfter; timeout?: string; terminate_after?: number; + searchAfter?: estypes.SortResults; /** * Allow querying to use a point-in-time ID for paging results + * Requires searchAfter when the page index is > 1. */ pit?: estypes.SearchPointInTimeReference; @@ -165,7 +169,7 @@ export type SerializedSearchSourceFields = { * {@link IndexPatternService} */ index?: string | DataViewSpec; - searchAfter?: EsQuerySearchAfter; + searchAfter?: estypes.SortResults; timeout?: string; terminate_after?: number; diff --git a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts index 22bf32fffb2a7..b2aed5804f248 100644 --- a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts @@ -6,17 +6,15 @@ * Side Public License, v 1. */ -import { estypes } from '@elastic/elasticsearch'; -import type { Logger, SharedGlobalConfig } from '@kbn/core/server'; -import { getKbnServerError, KbnServerError } from '@kbn/kibana-utils-plugin/server'; -import { omit } from 'lodash'; import { firstValueFrom, from, Observable } from 'rxjs'; import { tap } from 'rxjs/operators'; -import type { SearchUsage } from '../../collectors/search'; -import { searchUsageObserver } from '../../collectors/search/usage'; +import type { Logger, SharedGlobalConfig } from '@kbn/core/server'; +import { getKbnServerError, KbnServerError } from '@kbn/kibana-utils-plugin/server'; import type { ISearchStrategy } from '../../types'; +import type { SearchUsage } from '../../collectors/search'; import { getDefaultSearchParams, getShardTimeout } from './request_utils'; import { shimHitsTotal, toKibanaSearchResponse } from './response_utils'; +import { searchUsageObserver } from '../../collectors/search/usage'; export const esSearchStrategyProvider = ( config$: Observable, @@ -44,12 +42,7 @@ export const esSearchStrategyProvider = ( const config = await firstValueFrom(config$); // @ts-expect-error params fall back to any, but should be valid SearchRequest params const { terminateAfter, ...requestParams } = request.params ?? {}; - let defaults: estypes.IndicesOptions = await getDefaultSearchParams(uiSettingsClient); - - if (isPit) { - // Remove IndicesOptions from the request if PIT is used, these options are set in the PIT - defaults = omit(defaults, ['ignore_unavailable']); - } + const defaults = await getDefaultSearchParams(uiSettingsClient, { isPit }); const params = { ...defaults, diff --git a/src/plugins/data/server/search/strategies/es_search/request_utils.ts b/src/plugins/data/server/search/strategies/es_search/request_utils.ts index fa1bfdf635f70..11fd271902e1f 100644 --- a/src/plugins/data/server/search/strategies/es_search/request_utils.ts +++ b/src/plugins/data/server/search/strategies/es_search/request_utils.ts @@ -18,7 +18,8 @@ export function getShardTimeout( } export async function getDefaultSearchParams( - uiSettingsClient: Pick + uiSettingsClient: Pick, + options = { isPit: false } ): Promise<{ max_concurrent_shard_requests?: number; ignore_unavailable?: boolean; @@ -27,10 +28,19 @@ export async function getDefaultSearchParams( const maxConcurrentShardRequests = await uiSettingsClient.get( UI_SETTINGS.COURIER_MAX_CONCURRENT_SHARD_REQUESTS ); - return { + + const defaults: Awaited> = { max_concurrent_shard_requests: maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined, - ignore_unavailable: true, // Don't fail if the index/indices don't exist track_total_hits: true, }; + + // If the request has a point-in-time ID attached, it can not include ignore_unavailable from {@link estypes.IndicesOptions}. + // ES will reject the request as that option was set when the point-in-time was created. + // Otherwise, this option allows search to not fail when the index/indices don't exist + if (!options.isPit) { + defaults.ignore_unavailable = true; + } + + return defaults; } From fec0f95233b8213673353ac7efbeb02b7072036f Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 4 Nov 2022 17:30:16 -0700 Subject: [PATCH 03/10] fix deprecation --- .../csv_searchsource/generate_csv/generate_csv.test.ts | 1 + .../export_types/csv_searchsource/generate_csv/generate_csv.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index 2c2f343959a2a..d11ce00a5a7a9 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -131,6 +131,7 @@ beforeEach(async () => { }, metaFields: ['_id', '_index', '_type', '_score'], getFormatterForField: jest.fn(), + getIndexPattern: () => 'dataview title', }; } }); diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index bc2071e0b4ed0..7d758bbd2339a 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -316,7 +316,7 @@ export class CsvGenerator { } const { maxSizeBytes, bom, escapeFormulaValues, timezone } = settings; - const indexPatternTitle = index.title; + const indexPatternTitle = index.getIndexPattern(); const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; From 6284ac226267d1c78fb16aab6370447b9fd7e231 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 4 Nov 2022 17:31:25 -0700 Subject: [PATCH 04/10] update point-in-time ID to the latest one received --- .../generate_csv/generate_csv.ts | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 7d758bbd2339a..7e94d2e8992d6 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -88,22 +88,15 @@ export class CsvGenerator { const { scroll: scrollSettings, includeFrozen } = settings; searchSource.setField('size', scrollSettings.size); - { - const pitId = searchSource.getField('pit')?.id; - if (!pitId) { - throw new Error(`Unable to retrieve point-in-time ID from search source!`); - } - if (lastSortId) { - this.logger.debug( - `Executing search request with point-in-time ID: ${this.truncatePitId(pitId)}` + - `, and search_after: [${lastSortId}]` - ); - searchSource.setField('searchAfter', lastSortId); - } else { - this.logger.debug( - `Executing search request with point-in-time ID: ${this.truncatePitId(pitId)}` - ); - } + const pitId = searchSource.getField('pit')?.id; + if (lastSortId) { + this.logger.debug( + `Executing search request with PIT ID: ${this.truncatePitId(pitId)}` + + `, and search_after: [${lastSortId}]` + ); + searchSource.setField('searchAfter', lastSortId); + } else { + this.logger.debug(`Executing search request with PIT ID: ${this.truncatePitId(pitId)}`); } const searchBody: estypes.SearchRequest = searchSource.getSearchRequestBody(); @@ -325,8 +318,7 @@ export class CsvGenerator { let totalRelation = 'eq'; let searchAfter: estypes.SortResults | undefined; - const pitId = await this.openPointInTime(indexPatternTitle, settings); - searchSource.setField('pit', { id: pitId, keep_alive: settings.scroll.duration }); + let pitId = await this.openPointInTime(indexPatternTitle, settings); // apply timezone from the job to all date field formatters try { @@ -352,8 +344,21 @@ export class CsvGenerator { if (this.cancellationToken.isCancelled()) { break; } + // set the latest pit, which could be different from the last request + searchSource.setField('pit', { id: pitId, keep_alive: settings.scroll.duration }); + const results = await this.doSearch(searchSource, settings, searchAfter); + if (!results.pit_id) { + throw new Error(`Search response did not contain a point-in-time ID!`); + } + // use the most recently received id for the next search request + this.logger.debug( + `Latest PIT ID from results: [${this.truncatePitId(results.pit_id)}]` + + ` Previous PIT ID used: [${this.truncatePitId(pitId)}]` + ); + pitId = results.pit_id; + const { hits } = results; if (first && hits.total != null) { if (typeof hits.total === 'number') { @@ -437,7 +442,7 @@ export class CsvGenerator { this.logger.debug(`Closing point-in-time`); await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } }); } else { - this.logger.warn(`No point-in-time ID to clear!`); + this.logger.warn(`No PIT ID to clear!`); } } From 73a428e67d18a564bb73fe0b3409d99da29b6a5d Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 4 Nov 2022 17:31:43 -0700 Subject: [PATCH 05/10] add warning for shard failure --- .../generate_csv/generate_csv.ts | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 7e94d2e8992d6..21ffde5b9e37e 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -375,13 +375,27 @@ export class CsvGenerator { break; } - // TODO check for shard failures, log them and add a warning if found - { - const { hits: { hits: _hits, ...hitsMeta }, ...headerWithPit } = results; - const { pit_id: newPitId, ...header } = headerWithPit; - const meta = JSON.stringify({ header: { pit_id: `${this.truncatePitId(newPitId)}`, ...header }, hitsMeta }); - this.logger.debug(`Results metadata: ${meta}`); - } // prettier-ignore + const { + hits: { hits: _hits, ...hitsMeta }, + ...headerWithPit + } = results; + + const { pit_id: newPitId, ...header } = headerWithPit; + + const logInfo = { + header: { pit_id: `${this.truncatePitId(newPitId)}`, ...header }, + hitsMeta, + }; + this.logger.debug(`Results metadata: ${JSON.stringify(logInfo)}`); + + // check for shard failures, log them and add a warning if found + const { _shards: shards } = header; + if (shards.failures) { + shards.failures.forEach(({ reason }) => { + warnings.push(`Shard failure: ${JSON.stringify(reason)}`); + this.logger.warn(JSON.stringify(reason)); + }); + } let table: Datatable | undefined; try { From 6dfecf809ca0cf723d0f684d960bd6cc37f003f2 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 4 Nov 2022 18:05:56 -0700 Subject: [PATCH 06/10] fix/cleanup csv generation test --- .../generate_csv/generate_csv.test.ts | 449 ++++++++---------- 1 file changed, 188 insertions(+), 261 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index d11ce00a5a7a9..8f97514f7f89c 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { errors as esErrors } from '@elastic/elasticsearch'; +import { errors as esErrors, estypes } from '@elastic/elasticsearch'; import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types'; import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server'; import { @@ -50,6 +50,7 @@ const searchSourceMock = { ...searchSourceInstanceMock, getSearchRequestBody: jest.fn(() => ({})), }; + const mockSearchSourceService: jest.Mocked = { create: jest.fn().mockReturnValue(searchSourceMock), createEmpty: jest.fn().mockReturnValue(searchSourceMock), @@ -58,19 +59,21 @@ const mockSearchSourceService: jest.Mocked = { extract: jest.fn(), getAllMigrations: jest.fn(), }; + +const mockPitId = 'oju9fs3698s3902f02-8qg3-u9w36oiewiuyew6'; + +const getMockRawResponse = (hits: Array> = [], total = hits.length) => ({ + took: 1, + timed_out: false, + pit_id: mockPitId, + _shards: { total: 1, successful: 1, failed: 0, skipped: 0 }, + hits: { hits, total, max_score: 0 }, +}); + const mockDataClientSearchDefault = jest.fn().mockImplementation( (): Rx.Observable<{ rawResponse: SearchResponse }> => Rx.of({ - rawResponse: { - took: 1, - timed_out: false, - _shards: { total: 1, successful: 1, failed: 0, skipped: 0 }, - hits: { - hits: [], - total: 0, - max_score: 0, - }, - }, + rawResponse: getMockRawResponse(), }) ); @@ -85,8 +88,6 @@ const getMockConfig = (properties: DeepPartial = {}) => { return config.get('csv'); }; -const mockPitId = 'oju9fs3698s3902f02-8qg3-u9w36oiewiuyew6'; - beforeEach(async () => { content = ''; stream = { write: jest.fn((chunk) => (content += chunk)) } as unknown as typeof stream; @@ -131,7 +132,7 @@ beforeEach(async () => { }, metaFields: ['_id', '_index', '_type', '_score'], getFormatterForField: jest.fn(), - getIndexPattern: () => 'dataview title', + getIndexPattern: () => 'logstash-*', }; } }); @@ -164,20 +165,15 @@ it('formats an empty search result to CSV content', async () => { it('formats a search result to CSV content', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: `["2020-12-31T00:14:28.000Z"]`, - ip: `["110.135.176.89"]`, - message: `["This is a great message!"]`, - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: `["2020-12-31T00:14:28.000Z"]`, + ip: `["110.135.176.89"]`, + message: `["This is a great message!"]`, + }, + } as unknown as estypes.SearchHit, + ]), }) ); const generateCsv = new CsvGenerator( @@ -206,16 +202,16 @@ const HITS_TOTAL = 100; it('calculates the bytes of the content', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, HITS_TOTAL).map(() => ({ - fields: { - message: ['this is a great message'], - }, - })), - total: HITS_TOTAL, - }, - }, + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL).map( + () => + ({ + fields: { + message: ['this is a great message'], + }, + } as unknown as estypes.SearchHit) + ) + ), }) ); @@ -253,18 +249,18 @@ it('warns if max size was reached', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, HITS_TOTAL).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['super cali fragile istic XPLA docious'], - }, - })), - total: HITS_TOTAL, - }, - }, + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL).map( + () => + ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['super cali fragile istic XPLA docious'], + }, + } as unknown as estypes.SearchHit) + ) + ), }) ); @@ -295,33 +291,35 @@ it('uses the pit ID to page all the data', async () => { .fn() .mockImplementationOnce(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, HITS_TOTAL / 10).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['hit from the initial search'], - }, - })), - total: HITS_TOTAL, - }, - }, + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL / 10).map( + () => + ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['hit from the initial search'], + }, + } as unknown as estypes.SearchHit) + ), + HITS_TOTAL + ), }) ) .mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, HITS_TOTAL / 10).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['hit from a subsequent scroll'], - }, - })), - }, - }, + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL / 10).map( + () => + ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['hit from a subsequent scroll'], + }, + } as unknown as estypes.SearchHit) + ) + ), }) ); @@ -354,7 +352,7 @@ it('uses the pit ID to page all the data', async () => { expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledTimes(1); expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith({ ignore_unavailable: true, - index: undefined, + index: 'logstash-*', keep_alive: '30s', }); @@ -369,52 +367,26 @@ it('keeps order of the columns during the scroll', async () => { .fn() .mockImplementationOnce(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - a: ['a1'], - b: ['b1'], - }, - }, - ], - total: 3, - }, - }, + rawResponse: getMockRawResponse( + [{ fields: { a: ['a1'], b: ['b1'] } } as unknown as estypes.SearchHit], + 3 + ), }) ) .mockImplementationOnce(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - b: ['b2'], - }, - }, - ], - total: 3, - }, - }, + rawResponse: getMockRawResponse( + [{ fields: { b: ['b2'] } } as unknown as estypes.SearchHit], + 3 + ), }) ) .mockImplementationOnce(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - a: ['a3'], - c: ['c3'], - }, - }, - ], - total: 3, - }, - }, + rawResponse: getMockRawResponse( + [{ fields: { a: ['a3'], c: ['c3'] } } as unknown as estypes.SearchHit], + 3 + ), }) ); @@ -443,21 +415,16 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { it('cells can be multi-value', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - sku: [`This is a cool SKU.`, `This is also a cool SKU.`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + sku: [`This is a cool SKU.`, `This is also a cool SKU.`], + }, }, - }, + ]), }) ); @@ -485,22 +452,17 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { it('provides top-level underscored fields as columns', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - date: ['2020-12-31T00:14:28.000Z'], - message: [`it's nice to see you`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + date: ['2020-12-31T00:14:28.000Z'], + message: [`it's nice to see you`], + }, }, - }, + ]), }) ); @@ -539,28 +501,23 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { it('sorts the fields when they are to be used as table column names', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - date: ['2020-12-31T00:14:28.000Z'], - message_z: [`test field Z`], - message_y: [`test field Y`], - message_x: [`test field X`], - message_w: [`test field W`], - message_v: [`test field V`], - message_u: [`test field U`], - message_t: [`test field T`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + date: ['2020-12-31T00:14:28.000Z'], + message_z: [`test field Z`], + message_y: [`test field Y`], + message_x: [`test field X`], + message_w: [`test field W`], + message_v: [`test field V`], + message_u: [`test field U`], + message_t: [`test field T`], + }, }, - }, + ]), }) ); @@ -600,22 +557,17 @@ describe('fields from job.columns (7.13+ generated)', () => { it('cells can be multi-value', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - product: 'coconut', - category: [`cool`, `rad`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + product: 'coconut', + category: [`cool`, `rad`], + }, }, - }, + ]), }) ); @@ -643,22 +595,17 @@ describe('fields from job.columns (7.13+ generated)', () => { it('columns can be top-level fields such as _id and _index', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - product: 'coconut', - category: [`cool`, `rad`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + product: 'coconut', + category: [`cool`, `rad`], + }, }, - }, + ]), }) ); @@ -686,22 +633,17 @@ describe('fields from job.columns (7.13+ generated)', () => { it('default column names come from tabify', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - product: 'coconut', - category: [`cool`, `rad`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + product: 'coconut', + category: [`cool`, `rad`], + }, }, - }, + ]), }) ); @@ -733,20 +675,15 @@ describe('formulas', () => { it(`escapes formula values in a cell, doesn't warn the csv contains formulas`, async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: [TEST_FORMULA], - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: [TEST_FORMULA], + }, + } as unknown as estypes.SearchHit, + ]), }) ); @@ -776,20 +713,15 @@ describe('formulas', () => { it(`escapes formula values in a header, doesn't warn the csv contains formulas`, async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - [TEST_FORMULA]: 'This is great data', - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + [TEST_FORMULA]: 'This is great data', + }, + } as unknown as estypes.SearchHit, + ]), }) ); @@ -827,20 +759,15 @@ describe('formulas', () => { }); mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: [TEST_FORMULA], - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: [TEST_FORMULA], + }, + } as unknown as estypes.SearchHit, + ]), }) ); @@ -999,18 +926,18 @@ describe('error codes', () => { .fn() .mockImplementationOnce(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, 5).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['super cali fragile istic XPLA docious'], - }, - })), - total: 10, - }, - }, + rawResponse: getMockRawResponse( + range(0, 5).map(() => ({ + _index: 'lasdf', + _id: 'lasdf123', + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['super cali fragile istic XPLA docious'], + }, + })), + 10 + ), }) ) .mockImplementationOnce(() => { From 1ada50721903434d85479c2c52c94e378760fd2e Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 4 Nov 2022 19:10:01 -0700 Subject: [PATCH 07/10] add requestTimeout to openPit request --- .../generate_csv/generate_csv.ts | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 21ffde5b9e37e..5924729d0eed9 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -58,25 +58,31 @@ export class CsvGenerator { private async openPointInTime(indexPatternTitle: string, settings: CsvExportSettings) { const { duration } = settings.scroll; let pitId: string | undefined; - this.logger.debug(`Open point-in-time for index: ${indexPatternTitle}`); + this.logger.debug(`Requesting point-in-time for: [${indexPatternTitle}]...`); try { - // open PIT and set field format config - // use _pit API - const response = await this.clients.es.asCurrentUser.openPointInTime({ - index: indexPatternTitle, - keep_alive: duration, - ignore_unavailable: true, - }); + // NOTE: if ES is overloaded, this request could time out + const response = await this.clients.es.asCurrentUser.openPointInTime( + { + index: indexPatternTitle, + keep_alive: duration, + ignore_unavailable: true, + }, + { + requestTimeout: duration, + maxRetries: 0, + } + ); pitId = response.id; } catch (err) { this.logger.error(err); - throw err; } if (!pitId) { throw new Error(`Could not receive a point-in-time ID!`); } + this.logger.debug(`Received PIT ID: ${this.truncatePitId(pitId)}`); + return pitId; } @@ -88,17 +94,16 @@ export class CsvGenerator { const { scroll: scrollSettings, includeFrozen } = settings; searchSource.setField('size', scrollSettings.size); - const pitId = searchSource.getField('pit')?.id; if (lastSortId) { - this.logger.debug( - `Executing search request with PIT ID: ${this.truncatePitId(pitId)}` + - `, and search_after: [${lastSortId}]` - ); searchSource.setField('searchAfter', lastSortId); - } else { - this.logger.debug(`Executing search request with PIT ID: ${this.truncatePitId(pitId)}`); } + const pitId = searchSource.getField('pit')?.id; + this.logger.debug( + `Executing search request with PIT ID: ${this.truncatePitId(pitId)}` + + (lastSortId ? `, and search_after: [${lastSortId}]` : '') + ); + const searchBody: estypes.SearchRequest = searchSource.getSearchRequestBody(); if (searchBody == null) { throw new Error('Could not retrieve the search body!'); @@ -352,11 +357,9 @@ export class CsvGenerator { if (!results.pit_id) { throw new Error(`Search response did not contain a point-in-time ID!`); } + // use the most recently received id for the next search request - this.logger.debug( - `Latest PIT ID from results: [${this.truncatePitId(results.pit_id)}]` + - ` Previous PIT ID used: [${this.truncatePitId(pitId)}]` - ); + this.logger.debug(`Received PIT ID: [${this.truncatePitId(results.pit_id)}]`); pitId = results.pit_id; const { hits } = results; From c5710b1770b5f5bc4aaf5fb4a00a6cb10d052868 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 4 Nov 2022 19:42:15 -0700 Subject: [PATCH 08/10] logging polishes --- .../generate_csv/generate_csv.ts | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 5924729d0eed9..f527956d5c7fa 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -81,7 +81,7 @@ export class CsvGenerator { throw new Error(`Could not receive a point-in-time ID!`); } - this.logger.debug(`Received PIT ID: ${this.truncatePitId(pitId)}`); + this.logger.debug(`Opened PIT ID: ${this.truncatePitId(pitId)}`); return pitId; } @@ -89,19 +89,19 @@ export class CsvGenerator { private async doSearch( searchSource: ISearchSource, settings: CsvExportSettings, - lastSortId?: estypes.SortResults + searchAfter?: estypes.SortResults ) { const { scroll: scrollSettings, includeFrozen } = settings; searchSource.setField('size', scrollSettings.size); - if (lastSortId) { - searchSource.setField('searchAfter', lastSortId); + if (searchAfter) { + searchSource.setField('searchAfter', searchAfter); } const pitId = searchSource.getField('pit')?.id; this.logger.debug( - `Executing search request with PIT ID: ${this.truncatePitId(pitId)}` + - (lastSortId ? `, and search_after: [${lastSortId}]` : '') + `Executing search request with PIT ID: [${this.truncatePitId(pitId)}]` + + (searchAfter ? ` search_after: [${searchAfter}]` : '') ); const searchBody: estypes.SearchRequest = searchSource.getSearchRequestBody(); @@ -354,14 +354,6 @@ export class CsvGenerator { const results = await this.doSearch(searchSource, settings, searchAfter); - if (!results.pit_id) { - throw new Error(`Search response did not contain a point-in-time ID!`); - } - - // use the most recently received id for the next search request - this.logger.debug(`Received PIT ID: [${this.truncatePitId(results.pit_id)}]`); - pitId = results.pit_id; - const { hits } = results; if (first && hits.total != null) { if (typeof hits.total === 'number') { @@ -391,6 +383,15 @@ export class CsvGenerator { }; this.logger.debug(`Results metadata: ${JSON.stringify(logInfo)}`); + // use the most recently received id for the next search request + this.logger.debug(`Received PIT ID: [${this.truncatePitId(results.pit_id)}]`); + pitId = results.pit_id ?? pitId; + + // Update last sort results for next query. PIT is used, so the sort results + // automatically include _shard_doc as a tiebreaker + searchAfter = hits.hits[hits.hits.length - 1]?.sort as estypes.SortResults | undefined; + this.logger.debug(`Received search_after: [${searchAfter}]`); + // check for shard failures, log them and add a warning if found const { _shards: shards } = header; if (shards.failures) { @@ -429,10 +430,6 @@ export class CsvGenerator { const formatters = this.getFormatters(table); await this.generateRows(columns, table, builder, formatters, settings); - // Update last sort results for next query. PIT is used, so the sort results - // automatically include _shard_doc as a tiebreaker - searchAfter = hits.hits[hits.hits.length - 1]?.sort as estypes.SortResults | undefined; - // update iterator currentRecord += table.rows.length; } while (totalRecords != null && currentRecord < totalRecords - 1); @@ -468,7 +465,7 @@ export class CsvGenerator { if (!this.maxSizeReached && this.csvRowCount !== totalRecords) { this.logger.warn( `ES scroll returned fewer total hits than expected! ` + - `Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}.` + `Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}` ); warnings.push( i18nTexts.csvRowCountError({ expected: totalRecords ?? NaN, received: this.csvRowCount }) From 19050425ac140f6c8d05ce70b9d1b24b5ea22585 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Sat, 5 Nov 2022 12:12:01 -0700 Subject: [PATCH 09/10] fix test --- .../generate_csv/generate_csv.test.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index 8f97514f7f89c..804fa4bcdd4a6 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -350,11 +350,14 @@ it('uses the pit ID to page all the data', async () => { ); expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledTimes(1); - expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith({ - ignore_unavailable: true, - index: 'logstash-*', - keep_alive: '30s', - }); + expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith( + { + ignore_unavailable: true, + index: 'logstash-*', + keep_alive: '30s', + }, + { maxRetries: 0, requestTimeout: '30s' } + ); expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1); expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledWith({ From 92ce96f3c303c2209bd524d3217b06ace2c80e68 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Mon, 7 Nov 2022 09:02:31 -0700 Subject: [PATCH 10/10] remove confusing comment --- src/plugins/data/common/search/search_source/types.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/plugins/data/common/search/search_source/types.ts b/src/plugins/data/common/search/search_source/types.ts index b37794d3ed1cd..140c2dd59a59d 100644 --- a/src/plugins/data/common/search/search_source/types.ts +++ b/src/plugins/data/common/search/search_source/types.ts @@ -117,11 +117,9 @@ export interface SearchSourceFields { index?: DataView; timeout?: string; terminate_after?: number; - searchAfter?: estypes.SortResults; /** * Allow querying to use a point-in-time ID for paging results - * Requires searchAfter when the page index is > 1. */ pit?: estypes.SearchPointInTimeReference;