Skip to content

Commit

Permalink
[Infra UI] Convert terms aggregation to composite for field selection…
Browse files Browse the repository at this point in the history
… filtering (#47500) (#48675)

* Convert from large terms agg to multiple smaller composite

* Fixing bug in snapshot

* Reducing duplicate code for composite agg requests

* Adding filter for last 24 hours

* Creating a helper for the afterKey handlers
  • Loading branch information
simianhacker authored Oct 18, 2019
1 parent cfab2a9 commit 172e68d
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
import { InfraFrameworkRequest } from '../framework';

export interface FieldsAdapter {
getIndexFields(req: InfraFrameworkRequest, indices: string): Promise<IndexFieldDescriptor[]>;
getIndexFields(
req: InfraFrameworkRequest,
indices: string,
timefield: string
): Promise<IndexFieldDescriptor[]>;
}

export interface IndexFieldDescriptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { startsWith, uniq } from 'lodash';
import { InfraBackendFrameworkAdapter, InfraFrameworkRequest } from '../framework';
import { startsWith, uniq, first } from 'lodash';
import { idx } from '@kbn/elastic-idx';
import {
InfraBackendFrameworkAdapter,
InfraFrameworkRequest,
InfraDatabaseSearchResponse,
} from '../framework';
import { FieldsAdapter, IndexFieldDescriptor } from './adapter_types';
import { getAllowedListForPrefix } from '../../../../common/ecs_allowed_list';
import { getAllCompositeData } from '../../../utils/get_all_composite_data';
import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler';

interface Bucket {
key: string;
key: { dataset: string };
doc_count: number;
}

interface DataSetResponse {
modules: {
buckets: Bucket[];
};
dataSets: {
datasets: {
buckets: Bucket[];
after_key: {
dataset: string;
};
};
}

Expand All @@ -32,13 +39,14 @@ export class FrameworkFieldsAdapter implements FieldsAdapter {

public async getIndexFields(
request: InfraFrameworkRequest,
indices: string
indices: string,
timefield: string
): Promise<IndexFieldDescriptor[]> {
const indexPatternsService = this.framework.getIndexPatternsService(request);
const response = await indexPatternsService.getFieldsForWildcard({
pattern: indices,
});
const { dataSets, modules } = await this.getDataSetsAndModules(request, indices);
const { dataSets, modules } = await this.getDataSetsAndModules(request, indices, timefield);
const allowedList = modules.reduce(
(acc, name) => uniq([...acc, ...getAllowedListForPrefix(name)]),
[] as string[]
Expand All @@ -52,41 +60,68 @@ export class FrameworkFieldsAdapter implements FieldsAdapter {

private async getDataSetsAndModules(
request: InfraFrameworkRequest,
indices: string
indices: string,
timefield: string
): Promise<{ dataSets: string[]; modules: string[] }> {
const params = {
index: indices,
allowNoIndices: true,
ignoreUnavailable: true,
body: {
aggs: {
modules: {
terms: {
field: 'event.modules',
size: 1000,
},
size: 0,
query: {
bool: {
filter: [
{
range: {
[timefield]: {
gte: 'now-24h',
lte: 'now',
},
},
},
],
},
dataSets: {
terms: {
field: 'event.dataset',
size: 1000,
},
aggs: {
datasets: {
composite: {
sources: [
{
dataset: {
terms: {
field: 'event.dataset',
},
},
},
],
},
},
},
},
};
const response = await this.framework.callWithRequest<{}, DataSetResponse>(

const bucketSelector = (response: InfraDatabaseSearchResponse<{}, DataSetResponse>) =>
(response.aggregations && response.aggregations.datasets.buckets) || [];
const handleAfterKey = createAfterKeyHandler('body.aggs.datasets.composite.after', input =>
idx(input, _ => _.aggregations.datasets.after_key)
);

const buckets = await getAllCompositeData<DataSetResponse, Bucket>(
this.framework,
request,
'search',
params
params,
bucketSelector,
handleAfterKey
);
if (!response.aggregations) {
return { dataSets: [], modules: [] };
}
const { modules, dataSets } = response.aggregations;
return {
modules: modules.buckets.map(bucket => bucket.key),
dataSets: dataSets.buckets.map(bucket => bucket.key),
};
const dataSets = buckets.map(bucket => bucket.key.dataset);
const modules = dataSets.reduce(
(acc, dataset) => {
const module = first(dataset.split(/\./));
return module ? uniq([...acc, module]) : acc;
},
[] as string[]
);
return { modules, dataSets };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ export class InfraFieldsDomain {
request,
`${includeMetricIndices ? configuration.metricAlias : ''},${
includeLogIndices ? configuration.logAlias : ''
}`
}`,
configuration.fields.timestamp
);

return fields;
Expand Down
76 changes: 24 additions & 52 deletions x-pack/legacy/plugins/infra/server/lib/snapshot/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { idx } from '@kbn/elastic-idx';
import {
InfraSnapshotGroupbyInput,
InfraSnapshotMetricInput,
Expand All @@ -12,7 +13,11 @@ import {
InfraNodeType,
InfraSourceConfiguration,
} from '../../graphql/types';
import { InfraBackendFrameworkAdapter, InfraFrameworkRequest } from '../adapters/framework';
import {
InfraBackendFrameworkAdapter,
InfraFrameworkRequest,
InfraDatabaseSearchResponse,
} from '../adapters/framework';
import { InfraSources } from '../sources';

import { JsonObject } from '../../../common/typed_json';
Expand All @@ -31,6 +36,8 @@ import {
InfraSnapshotNodeMetricsBucket,
} from './response_helpers';
import { IP_FIELDS } from '../constants';
import { getAllCompositeData } from '../../utils/get_all_composite_data';
import { createAfterKeyHandler } from '../../utils/create_afterkey_handler';

export interface InfraSnapshotRequestOptions {
nodeType: InfraNodeType;
Expand Down Expand Up @@ -63,6 +70,14 @@ export class InfraSnapshot {
}
}

const bucketSelector = (
response: InfraDatabaseSearchResponse<{}, InfraSnapshotAggregationResponse>
) => (response.aggregations && response.aggregations.nodes.buckets) || [];

const handleAfterKey = createAfterKeyHandler('body.aggregations.nodes.composite.after', input =>
idx(input, _ => _.aggregations.nodes.after_key)
);

const requestGroupedNodes = async (
request: InfraFrameworkRequest,
options: InfraSnapshotRequestOptions,
Expand Down Expand Up @@ -112,11 +127,10 @@ const requestGroupedNodes = async (
},
};

return await getAllCompositeAggregationData<InfraSnapshotNodeGroupByBucket>(
framework,
request,
query
);
return await getAllCompositeData<
InfraSnapshotAggregationResponse,
InfraSnapshotNodeGroupByBucket
>(framework, request, query, bucketSelector, handleAfterKey);
};

const requestNodeMetrics = async (
Expand Down Expand Up @@ -174,12 +188,10 @@ const requestNodeMetrics = async (
},
},
};

return await getAllCompositeAggregationData<InfraSnapshotNodeMetricsBucket>(
framework,
request,
query
);
return await getAllCompositeData<
InfraSnapshotAggregationResponse,
InfraSnapshotNodeMetricsBucket
>(framework, request, query, bucketSelector, handleAfterKey);
};

// buckets can be InfraSnapshotNodeGroupByBucket[] or InfraSnapshotNodeMetricsBucket[]
Expand All @@ -191,46 +203,6 @@ interface InfraSnapshotAggregationResponse {
};
}

const getAllCompositeAggregationData = async <BucketType>(
framework: InfraBackendFrameworkAdapter,
request: InfraFrameworkRequest,
query: any,
previousBuckets: BucketType[] = []
): Promise<BucketType[]> => {
const response = await framework.callWithRequest<{}, InfraSnapshotAggregationResponse>(
request,
'search',
query
);

// Nothing available, return the previous buckets.
if (response.hits.total.value === 0) {
return previousBuckets;
}

// if ES doesn't return an aggregations key, something went seriously wrong.
if (!response.aggregations) {
throw new Error('Whoops!, `aggregations` key must always be returned.');
}

const currentBuckets = response.aggregations.nodes.buckets;

// if there are no currentBuckets then we are finished paginating through the results
if (currentBuckets.length === 0) {
return previousBuckets;
}

// There is possibly more data, concat previous and current buckets and call ourselves recursively.
const newQuery = { ...query };
newQuery.body.aggregations.nodes.composite.after = response.aggregations.nodes.after_key;
return getAllCompositeAggregationData(
framework,
request,
query,
previousBuckets.concat(currentBuckets)
);
};

const mergeNodeBuckets = (
nodeGroupByBuckets: InfraSnapshotNodeGroupByBucket[],
nodeMetricsBuckets: InfraSnapshotNodeMetricsBucket[],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { set } from 'lodash';
import { InfraDatabaseSearchResponse } from '../lib/adapters/framework';

export const createAfterKeyHandler = (
optionsAfterKeyPath: string | string[],
afterKeySelector: (input: InfraDatabaseSearchResponse<any, any>) => any
) => <Options>(options: Options, response: InfraDatabaseSearchResponse<any, any>): Options => {
if (!response.aggregations) {
return options;
}
const newOptions = { ...options };
const afterKey = afterKeySelector(response);
set(newOptions, optionsAfterKeyPath, afterKey);
return newOptions;
};
54 changes: 54 additions & 0 deletions x-pack/legacy/plugins/infra/server/utils/get_all_composite_data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import {
InfraBackendFrameworkAdapter,
InfraFrameworkRequest,
InfraDatabaseSearchResponse,
} from '../lib/adapters/framework';

export const getAllCompositeData = async <
Aggregation = undefined,
Bucket = {},
Options extends object = {}
>(
framework: InfraBackendFrameworkAdapter,
request: InfraFrameworkRequest,
options: Options,
bucketSelector: (response: InfraDatabaseSearchResponse<{}, Aggregation>) => Bucket[],
onAfterKey: (options: Options, response: InfraDatabaseSearchResponse<{}, Aggregation>) => Options,
previousBuckets: Bucket[] = []
): Promise<Bucket[]> => {
const response = await framework.callWithRequest<{}, Aggregation>(request, 'search', options);

// Nothing available, return the previous buckets.
if (response.hits.total.value === 0) {
return previousBuckets;
}

// if ES doesn't return an aggregations key, something went seriously wrong.
if (!response.aggregations) {
throw new Error('Whoops!, `aggregations` key must always be returned.');
}

const currentBuckets = bucketSelector(response);

// if there are no currentBuckets then we are finished paginating through the results
if (currentBuckets.length === 0) {
return previousBuckets;
}

// There is possibly more data, concat previous and current buckets and call ourselves recursively.
const newOptions = onAfterKey(options, response);
return getAllCompositeData(
framework,
request,
newOptions,
bucketSelector,
onAfterKey,
previousBuckets.concat(currentBuckets)
);
};

0 comments on commit 172e68d

Please sign in to comment.