From 172e68dc4efde7d7857e478ab12aa74484582111 Mon Sep 17 00:00:00 2001 From: Chris Cowan Date: Fri, 18 Oct 2019 13:18:07 -0700 Subject: [PATCH] [Infra UI] Convert terms aggregation to composite for field selection filtering (#47500) (#48675) * Convert from large terms agg to multiple smaller composite * Fixing bug in snapshot * Reducing duplicate code for composite agg requests * Adding filter for last 24 hours * Creating a helper for the afterKey handlers --- .../lib/adapters/fields/adapter_types.ts | 6 +- .../fields/framework_fields_adapter.ts | 97 +++++++++++++------ .../infra/server/lib/domains/fields_domain.ts | 3 +- .../infra/server/lib/snapshot/snapshot.ts | 76 +++++---------- .../server/utils/create_afterkey_handler.ts | 21 ++++ .../server/utils/get_all_composite_data.ts | 54 +++++++++++ 6 files changed, 172 insertions(+), 85 deletions(-) create mode 100644 x-pack/legacy/plugins/infra/server/utils/create_afterkey_handler.ts create mode 100644 x-pack/legacy/plugins/infra/server/utils/get_all_composite_data.ts diff --git a/x-pack/legacy/plugins/infra/server/lib/adapters/fields/adapter_types.ts b/x-pack/legacy/plugins/infra/server/lib/adapters/fields/adapter_types.ts index 8f12c0f22a483..66081e60e7e10 100644 --- a/x-pack/legacy/plugins/infra/server/lib/adapters/fields/adapter_types.ts +++ b/x-pack/legacy/plugins/infra/server/lib/adapters/fields/adapter_types.ts @@ -7,7 +7,11 @@ import { InfraFrameworkRequest } from '../framework'; export interface FieldsAdapter { - getIndexFields(req: InfraFrameworkRequest, indices: string): Promise; + getIndexFields( + req: InfraFrameworkRequest, + indices: string, + timefield: string + ): Promise; } export interface IndexFieldDescriptor { diff --git a/x-pack/legacy/plugins/infra/server/lib/adapters/fields/framework_fields_adapter.ts b/x-pack/legacy/plugins/infra/server/lib/adapters/fields/framework_fields_adapter.ts index 2cea001d87b00..179bfef6f1bd8 100644 --- a/x-pack/legacy/plugins/infra/server/lib/adapters/fields/framework_fields_adapter.ts +++ b/x-pack/legacy/plugins/infra/server/lib/adapters/fields/framework_fields_adapter.ts @@ -4,22 +4,29 @@ * you may not use this file except in compliance with the Elastic License. */ -import { startsWith, uniq } from 'lodash'; -import { InfraBackendFrameworkAdapter, InfraFrameworkRequest } from '../framework'; +import { startsWith, uniq, first } from 'lodash'; +import { idx } from '@kbn/elastic-idx'; +import { + InfraBackendFrameworkAdapter, + InfraFrameworkRequest, + InfraDatabaseSearchResponse, +} from '../framework'; import { FieldsAdapter, IndexFieldDescriptor } from './adapter_types'; import { getAllowedListForPrefix } from '../../../../common/ecs_allowed_list'; +import { getAllCompositeData } from '../../../utils/get_all_composite_data'; +import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler'; interface Bucket { - key: string; + key: { dataset: string }; doc_count: number; } interface DataSetResponse { - modules: { - buckets: Bucket[]; - }; - dataSets: { + datasets: { buckets: Bucket[]; + after_key: { + dataset: string; + }; }; } @@ -32,13 +39,14 @@ export class FrameworkFieldsAdapter implements FieldsAdapter { public async getIndexFields( request: InfraFrameworkRequest, - indices: string + indices: string, + timefield: string ): Promise { const indexPatternsService = this.framework.getIndexPatternsService(request); const response = await indexPatternsService.getFieldsForWildcard({ pattern: indices, }); - const { dataSets, modules } = await this.getDataSetsAndModules(request, indices); + const { dataSets, modules } = await this.getDataSetsAndModules(request, indices, timefield); const allowedList = modules.reduce( (acc, name) => uniq([...acc, ...getAllowedListForPrefix(name)]), [] as string[] @@ -52,41 +60,68 @@ export class FrameworkFieldsAdapter implements FieldsAdapter { private async getDataSetsAndModules( request: InfraFrameworkRequest, - indices: string + indices: string, + timefield: string ): Promise<{ dataSets: string[]; modules: string[] }> { const params = { index: indices, allowNoIndices: true, ignoreUnavailable: true, body: { - aggs: { - modules: { - terms: { - field: 'event.modules', - size: 1000, - }, + size: 0, + query: { + bool: { + filter: [ + { + range: { + [timefield]: { + gte: 'now-24h', + lte: 'now', + }, + }, + }, + ], }, - dataSets: { - terms: { - field: 'event.dataset', - size: 1000, + }, + aggs: { + datasets: { + composite: { + sources: [ + { + dataset: { + terms: { + field: 'event.dataset', + }, + }, + }, + ], }, }, }, }, }; - const response = await this.framework.callWithRequest<{}, DataSetResponse>( + + const bucketSelector = (response: InfraDatabaseSearchResponse<{}, DataSetResponse>) => + (response.aggregations && response.aggregations.datasets.buckets) || []; + const handleAfterKey = createAfterKeyHandler('body.aggs.datasets.composite.after', input => + idx(input, _ => _.aggregations.datasets.after_key) + ); + + const buckets = await getAllCompositeData( + this.framework, request, - 'search', - params + params, + bucketSelector, + handleAfterKey ); - if (!response.aggregations) { - return { dataSets: [], modules: [] }; - } - const { modules, dataSets } = response.aggregations; - return { - modules: modules.buckets.map(bucket => bucket.key), - dataSets: dataSets.buckets.map(bucket => bucket.key), - }; + const dataSets = buckets.map(bucket => bucket.key.dataset); + const modules = dataSets.reduce( + (acc, dataset) => { + const module = first(dataset.split(/\./)); + return module ? uniq([...acc, module]) : acc; + }, + [] as string[] + ); + return { modules, dataSets }; } } diff --git a/x-pack/legacy/plugins/infra/server/lib/domains/fields_domain.ts b/x-pack/legacy/plugins/infra/server/lib/domains/fields_domain.ts index a339fcd4f08bf..c5a3bbeb87449 100644 --- a/x-pack/legacy/plugins/infra/server/lib/domains/fields_domain.ts +++ b/x-pack/legacy/plugins/infra/server/lib/domains/fields_domain.ts @@ -28,7 +28,8 @@ export class InfraFieldsDomain { request, `${includeMetricIndices ? configuration.metricAlias : ''},${ includeLogIndices ? configuration.logAlias : '' - }` + }`, + configuration.fields.timestamp ); return fields; diff --git a/x-pack/legacy/plugins/infra/server/lib/snapshot/snapshot.ts b/x-pack/legacy/plugins/infra/server/lib/snapshot/snapshot.ts index 212a01d9e50bf..741293f61056e 100644 --- a/x-pack/legacy/plugins/infra/server/lib/snapshot/snapshot.ts +++ b/x-pack/legacy/plugins/infra/server/lib/snapshot/snapshot.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ +import { idx } from '@kbn/elastic-idx'; import { InfraSnapshotGroupbyInput, InfraSnapshotMetricInput, @@ -12,7 +13,11 @@ import { InfraNodeType, InfraSourceConfiguration, } from '../../graphql/types'; -import { InfraBackendFrameworkAdapter, InfraFrameworkRequest } from '../adapters/framework'; +import { + InfraBackendFrameworkAdapter, + InfraFrameworkRequest, + InfraDatabaseSearchResponse, +} from '../adapters/framework'; import { InfraSources } from '../sources'; import { JsonObject } from '../../../common/typed_json'; @@ -31,6 +36,8 @@ import { InfraSnapshotNodeMetricsBucket, } from './response_helpers'; import { IP_FIELDS } from '../constants'; +import { getAllCompositeData } from '../../utils/get_all_composite_data'; +import { createAfterKeyHandler } from '../../utils/create_afterkey_handler'; export interface InfraSnapshotRequestOptions { nodeType: InfraNodeType; @@ -63,6 +70,14 @@ export class InfraSnapshot { } } +const bucketSelector = ( + response: InfraDatabaseSearchResponse<{}, InfraSnapshotAggregationResponse> +) => (response.aggregations && response.aggregations.nodes.buckets) || []; + +const handleAfterKey = createAfterKeyHandler('body.aggregations.nodes.composite.after', input => + idx(input, _ => _.aggregations.nodes.after_key) +); + const requestGroupedNodes = async ( request: InfraFrameworkRequest, options: InfraSnapshotRequestOptions, @@ -112,11 +127,10 @@ const requestGroupedNodes = async ( }, }; - return await getAllCompositeAggregationData( - framework, - request, - query - ); + return await getAllCompositeData< + InfraSnapshotAggregationResponse, + InfraSnapshotNodeGroupByBucket + >(framework, request, query, bucketSelector, handleAfterKey); }; const requestNodeMetrics = async ( @@ -174,12 +188,10 @@ const requestNodeMetrics = async ( }, }, }; - - return await getAllCompositeAggregationData( - framework, - request, - query - ); + return await getAllCompositeData< + InfraSnapshotAggregationResponse, + InfraSnapshotNodeMetricsBucket + >(framework, request, query, bucketSelector, handleAfterKey); }; // buckets can be InfraSnapshotNodeGroupByBucket[] or InfraSnapshotNodeMetricsBucket[] @@ -191,46 +203,6 @@ interface InfraSnapshotAggregationResponse { }; } -const getAllCompositeAggregationData = async ( - framework: InfraBackendFrameworkAdapter, - request: InfraFrameworkRequest, - query: any, - previousBuckets: BucketType[] = [] -): Promise => { - const response = await framework.callWithRequest<{}, InfraSnapshotAggregationResponse>( - request, - 'search', - query - ); - - // Nothing available, return the previous buckets. - if (response.hits.total.value === 0) { - return previousBuckets; - } - - // if ES doesn't return an aggregations key, something went seriously wrong. - if (!response.aggregations) { - throw new Error('Whoops!, `aggregations` key must always be returned.'); - } - - const currentBuckets = response.aggregations.nodes.buckets; - - // if there are no currentBuckets then we are finished paginating through the results - if (currentBuckets.length === 0) { - return previousBuckets; - } - - // There is possibly more data, concat previous and current buckets and call ourselves recursively. - const newQuery = { ...query }; - newQuery.body.aggregations.nodes.composite.after = response.aggregations.nodes.after_key; - return getAllCompositeAggregationData( - framework, - request, - query, - previousBuckets.concat(currentBuckets) - ); -}; - const mergeNodeBuckets = ( nodeGroupByBuckets: InfraSnapshotNodeGroupByBucket[], nodeMetricsBuckets: InfraSnapshotNodeMetricsBucket[], diff --git a/x-pack/legacy/plugins/infra/server/utils/create_afterkey_handler.ts b/x-pack/legacy/plugins/infra/server/utils/create_afterkey_handler.ts new file mode 100644 index 0000000000000..559fba0799987 --- /dev/null +++ b/x-pack/legacy/plugins/infra/server/utils/create_afterkey_handler.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { set } from 'lodash'; +import { InfraDatabaseSearchResponse } from '../lib/adapters/framework'; + +export const createAfterKeyHandler = ( + optionsAfterKeyPath: string | string[], + afterKeySelector: (input: InfraDatabaseSearchResponse) => any +) => (options: Options, response: InfraDatabaseSearchResponse): Options => { + if (!response.aggregations) { + return options; + } + const newOptions = { ...options }; + const afterKey = afterKeySelector(response); + set(newOptions, optionsAfterKeyPath, afterKey); + return newOptions; +}; diff --git a/x-pack/legacy/plugins/infra/server/utils/get_all_composite_data.ts b/x-pack/legacy/plugins/infra/server/utils/get_all_composite_data.ts new file mode 100644 index 0000000000000..a5729b6004dcf --- /dev/null +++ b/x-pack/legacy/plugins/infra/server/utils/get_all_composite_data.ts @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + InfraBackendFrameworkAdapter, + InfraFrameworkRequest, + InfraDatabaseSearchResponse, +} from '../lib/adapters/framework'; + +export const getAllCompositeData = async < + Aggregation = undefined, + Bucket = {}, + Options extends object = {} +>( + framework: InfraBackendFrameworkAdapter, + request: InfraFrameworkRequest, + options: Options, + bucketSelector: (response: InfraDatabaseSearchResponse<{}, Aggregation>) => Bucket[], + onAfterKey: (options: Options, response: InfraDatabaseSearchResponse<{}, Aggregation>) => Options, + previousBuckets: Bucket[] = [] +): Promise => { + const response = await framework.callWithRequest<{}, Aggregation>(request, 'search', options); + + // Nothing available, return the previous buckets. + if (response.hits.total.value === 0) { + return previousBuckets; + } + + // if ES doesn't return an aggregations key, something went seriously wrong. + if (!response.aggregations) { + throw new Error('Whoops!, `aggregations` key must always be returned.'); + } + + const currentBuckets = bucketSelector(response); + + // if there are no currentBuckets then we are finished paginating through the results + if (currentBuckets.length === 0) { + return previousBuckets; + } + + // There is possibly more data, concat previous and current buckets and call ourselves recursively. + const newOptions = onAfterKey(options, response); + return getAllCompositeData( + framework, + request, + newOptions, + bucketSelector, + onAfterKey, + previousBuckets.concat(currentBuckets) + ); +};