[ML] Batch requests for speed and retry failures for resiliency
qn895 committed Nov 4, 2021
1 parent 817fe2d commit 41e7cb6
Showing 9 changed files with 336 additions and 643 deletions.
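
The pattern this commit introduces — split the field list into batches, fetch each batch with a single request, and retry any fields from a failed batch one at a time — can be sketched as follows. This is a self-contained illustration rather than the actual Kibana hook: `fakeFetchStats`, the field names, and the batch size of 2 are invented for the demo (the real hook below batches by 10).

```ts
import { combineLatest, of, Observable } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface FieldStats { fieldName: string; count: number }
interface FieldStatsError { fields: string[]; error: string }

// Split a list into fixed-size batches (the real hook uses lodash `chunk`).
const chunk = <T>(arr: T[], size: number): T[][] =>
  Array.from({ length: Math.ceil(arr.length / size) }, (_, i) => arr.slice(i * size, (i + 1) * size));

// Stand-in fetcher: one request covers a whole batch of fields; any batch that
// contains a field named 'flaky' fails as a whole.
const fakeFetchStats = (fields: string[]): Observable<FieldStats[] | FieldStatsError> =>
  fields.includes('flaky')
    ? of({ fields, error: 'search failed' })
    : of(fields.map((fieldName) => ({ fieldName, count: 1 })));

const allFields = ['clientip', 'response', 'flaky', 'bytes'];
const batches = chunk(allFields, 2);

// First pass: one request per batch, all batches in parallel.
combineLatest(batches.map((batch) => fakeFetchStats(batch)))
  .pipe(
    map((results) => ({
      ok: results.filter((r): r is FieldStats[] => Array.isArray(r)).flat(),
      failed: results
        .filter((r): r is FieldStatsError => !Array.isArray(r))
        .flatMap((r) => r.fields),
    })),
    // Second pass: retry every field from a failed batch individually.
    switchMap(({ ok, failed }) =>
      failed.length === 0
        ? of(ok)
        : combineLatest(failed.map((f) => fakeFetchStats([f]))).pipe(
            map((retried) =>
              ok.concat(retried.filter((r): r is FieldStats[] => Array.isArray(r)).flat())
            )
          )
    )
  )
  .subscribe((stats) => console.log(stats.map((s) => s.fieldName)));
// -> ['clientip', 'response', 'bytes']; 'flaky' still failed on its individual retry.
```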
@@ -222,13 +222,11 @@ export const useDataVisualizerGridData = (
const existMetricFields = metricConfigs
.map((config) => {
if (config.existsInDocs === false) return;
if (config?.stats?.cardinality !== undefined) {
return {
fieldName: config.fieldName,
type: config.type,
cardinality: config.stats.cardinality,
};
}
return {
fieldName: config.fieldName,
type: config.type,
cardinality: config.stats?.cardinality ?? 0,
};
})
.filter((c) => c !== undefined) as FieldRequestConfig[];

@@ -237,13 +235,11 @@ export const useDataVisualizerGridData = (
const existNonMetricFields: FieldRequestConfig[] = nonMetricConfigs
.map((config) => {
if (config.existsInDocs === false) return;
if (config?.stats?.cardinality !== undefined) {
return {
fieldName: config.fieldName,
type: config.type,
cardinality: config.stats.cardinality,
};
}
return {
fieldName: config.fieldName,
type: config.type,
cardinality: config.stats?.cardinality ?? 0,
};
})
.filter((c) => c !== undefined) as FieldRequestConfig[];

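
The two hunks above change how fields whose stats have not loaded are handled: instead of being dropped from the request list, they now fall back to a cardinality of 0 via `config.stats?.cardinality ?? 0`. A small standalone illustration with made-up config objects:

```ts
interface FieldConfig {
  fieldName: string;
  type: string;
  existsInDocs: boolean;
  stats?: { cardinality?: number };
}
interface FieldRequestConfig { fieldName: string; type: string; cardinality: number }

// Hypothetical configs: 'clientip' exists in documents but its stats have not loaded yet.
const configs: FieldConfig[] = [
  { fieldName: 'bytes', type: 'number', existsInDocs: true, stats: { cardinality: 1543 } },
  { fieldName: 'clientip', type: 'ip', existsInDocs: true },
];

const requestConfigs = configs
  .map((config) => {
    if (config.existsInDocs === false) return;
    return {
      fieldName: config.fieldName,
      type: config.type,
      // Previously a config without a cardinality was skipped; now it defaults to 0.
      cardinality: config.stats?.cardinality ?? 0,
    };
  })
  .filter((c) => c !== undefined) as FieldRequestConfig[];

console.log(requestConfigs.map((c) => `${c.fieldName}: ${c.cardinality}`));
// -> ['bytes: 1543', 'clientip: 0']
```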
@@ -158,9 +158,9 @@ export function useFieldStatsSearchStrategy(
10
);

const subTemp = combineLatest(
const fieldStatsSub = combineLatest(
batches
.map((batch) => getFieldsStats(data, params, batch, searchOptions, true))
.map((batch) => getFieldsStats(data, params, batch, searchOptions))
.filter((obs) => obs !== undefined) as Array<Observable<FieldStats[] | FieldStatsError>>
);

@@ -186,7 +186,7 @@
};

// First, attempt to fetch field stats in batches of 10
searchSubscription$.current = subTemp.subscribe({
searchSubscription$.current = fieldStatsSub.subscribe({
next: (resp) => {
if (resp) {
const statsMap = new Map<string, FieldStats>();
@@ -206,7 +206,7 @@
});

setFetchState({
loaded: (resp.length / sortedConfigs.length) * 100,
loaded: (statsMap.size / sortedConfigs.length) * 100,
isRunning: true,
});

@@ -227,9 +227,7 @@
switchMap((failedFields) => {
return combineLatest(
failedFields
.map((failedField) =>
getFieldsStats(data, params, [failedField], searchOptions, false)
)
.map((failedField) => getFieldsStats(data, params, [failedField], searchOptions))
.filter((obs) => obs !== undefined)
);
})
@@ -239,12 +237,18 @@
const statsMap = cloneDeep(resp[0]) as Map<string, FieldStats>;
const fieldBatches = resp[1];

fieldBatches.forEach((f) => {
if (Array.isArray(f) && f.length === 1) {
statsMap.set(f[0].fieldName, f[0]);
}
});
setFieldStats(statsMap);
if (Array.isArray(fieldBatches)) {
fieldBatches.forEach((f) => {
if (Array.isArray(f) && f.length === 1) {
statsMap.set(f[0].fieldName, f[0]);
}
});
setFieldStats(statsMap);
setFetchState({
loaded: (statsMap.size / sortedConfigs.length) * 100,
isRunning: true,
});
}
},
error: onError,
complete: onComplete,
@@ -28,26 +28,26 @@ import {
} from '../../../../../../../../src/plugins/data/public';
import { extractErrorProperties } from '../../utils/error_utils';

export const getBooleanFieldStatsRequest = (
export const getBooleanFieldsStatsRequest = (
params: FieldStatsCommonRequestParams,
field: Field
fields: Field[]
) => {
const { index, query, runtimeFieldMap, samplerShardSize } = params;

const size = 0;
const aggs: Aggs = {};

const safeFieldName = field.safeFieldName;
aggs[`${safeFieldName}_value_count`] = {
filter: { exists: { field: field.fieldName } },
};
aggs[`${safeFieldName}_values`] = {
terms: {
field: field.fieldName,
size: 2,
},
};

fields.forEach((field, i) => {
const safeFieldName = field.safeFieldName;
aggs[`${safeFieldName}_value_count`] = {
filter: { exists: { field: field.fieldName } },
};
aggs[`${safeFieldName}_values`] = {
terms: {
field: field.fieldName,
size: 2,
},
};
});
const searchBody = {
query,
aggs: buildSamplerAggregation(aggs, samplerShardSize),
@@ -61,20 +61,20 @@ export const getBooleanFieldStatsRequest = (
};
};

export const fetchBooleanFieldStats = (
export const fetchBooleanFieldsStats = (
data: DataPublicPluginStart,
params: FieldStatsCommonRequestParams,
field: Field,
fields: Field[],
options: ISearchOptions
): Observable<BooleanFieldStats | FieldStatsError> => {
): Observable<BooleanFieldStats[] | FieldStatsError> => {
const { samplerShardSize } = params;
const request: estypes.SearchRequest = getBooleanFieldStatsRequest(params, field);
const request: estypes.SearchRequest = getBooleanFieldsStatsRequest(params, fields);
return data.search
.search<IKibanaSearchRequest, IKibanaSearchResponse>({ params: request }, options)
.pipe(
catchError((e) =>
of({
fieldName: field.fieldName,
fields,
error: extractErrorProperties(e),
} as FieldStatsError)
),
@@ -84,23 +84,27 @@ export const fetchBooleanFieldStats = (
const aggregations = resp.rawResponse.aggregations;
const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);

const safeFieldName = field.safeFieldName;
const stats: BooleanFieldStats = {
fieldName: field.fieldName,
count: get(aggregations, [...aggsPath, `${safeFieldName}_value_count`, 'doc_count'], 0),
trueCount: 0,
falseCount: 0,
};
const batchStats: BooleanFieldStats[] = fields.map((field, i) => {
const safeFieldName = field.safeFieldName;
const stats: BooleanFieldStats = {
fieldName: field.fieldName,
count: get(aggregations, [...aggsPath, `${safeFieldName}_value_count`, 'doc_count'], 0),
trueCount: 0,
falseCount: 0,
};

const valueBuckets: Array<{ [key: string]: number }> = get(
aggregations,
[...aggsPath, `${safeFieldName}_values`, 'buckets'],
[]
);
valueBuckets.forEach((bucket) => {
stats[`${bucket.key_as_string}Count`] = bucket.doc_count;
const valueBuckets: Array<{ [key: string]: number }> = get(
aggregations,
[...aggsPath, `${safeFieldName}_values`, 'buckets'],
[]
);
valueBuckets.forEach((bucket) => {
stats[`${bucket.key_as_string}Count`] = bucket.doc_count;
});
return stats;
});
return stats;

return batchStats;
})
);
};
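
The request builder above shows the batching pattern on the Elasticsearch side: instead of one search per field, a single request body carries one set of aggregations per field, each aggregation name prefixed with the field's safe name so the results can be attributed afterwards. A rough standalone sketch of that naming scheme (the field names are invented):

```ts
interface Field { fieldName: string; safeFieldName: string }

// One aggs object for the whole batch; each field contributes two aggregations
// whose names are prefixed with its safe name (mirrors getBooleanFieldsStatsRequest).
const buildBatchedBooleanAggs = (fields: Field[]): Record<string, unknown> => {
  const aggs: Record<string, unknown> = {};
  fields.forEach((field) => {
    aggs[`${field.safeFieldName}_value_count`] = {
      filter: { exists: { field: field.fieldName } },
    };
    aggs[`${field.safeFieldName}_values`] = {
      terms: { field: field.fieldName, size: 2 },
    };
  });
  return aggs;
};

// Two boolean fields become four uniquely named aggregations in a single search body.
console.log(
  Object.keys(
    buildBatchedBooleanAggs([
      { fieldName: 'is_active', safeFieldName: 'is_active' },
      { fieldName: 'flags.deleted', safeFieldName: 'flags_deleted' },
    ])
  )
);
// -> ['is_active_value_count', 'is_active_values', 'flags_deleted_value_count', 'flags_deleted_values']
```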
@@ -25,21 +25,26 @@ import {
import { FieldStatsError, isIKibanaSearchResponse } from '../../../../../common/types/field_stats';
import { extractErrorProperties } from '../../utils/error_utils';

export const getDateFieldStatsRequest = (params: FieldStatsCommonRequestParams, field: Field) => {
export const getDateFieldsStatsRequest = (
params: FieldStatsCommonRequestParams,
fields: Field[]
) => {
const { index, query, runtimeFieldMap, samplerShardSize } = params;

const size = 0;

const aggs: Aggs = {};
const safeFieldName = field.safeFieldName;
aggs[`${safeFieldName}_field_stats`] = {
filter: { exists: { field: field.fieldName } },
aggs: {
actual_stats: {
stats: { field: field.fieldName },
fields.forEach((field, i) => {
const safeFieldName = field.safeFieldName;
aggs[`${safeFieldName}_field_stats`] = {
filter: { exists: { field: field.fieldName } },
aggs: {
actual_stats: {
stats: { field: field.fieldName },
},
},
},
};
};
});

const searchBody = {
query,
@@ -53,45 +58,49 @@ export const getDateFieldStatsRequest = (params: FieldStatsCommonRequestParams,
};
};

export const fetchDateFieldStats = (
export const fetchDateFieldsStats = (
data: DataPublicPluginStart,
params: FieldStatsCommonRequestParams,
field: Field,
fields: Field[],
options: ISearchOptions
): Observable<DateFieldStats | FieldStatsError> => {
): Observable<DateFieldStats[] | FieldStatsError> => {
const { samplerShardSize } = params;

const request: estypes.SearchRequest = getDateFieldStatsRequest(params, field);
const request: estypes.SearchRequest = getDateFieldsStatsRequest(params, fields);
return data.search
.search<IKibanaSearchRequest, IKibanaSearchResponse>({ params: request }, options)
.pipe(
catchError((e) =>
of({
fieldName: field.fieldName,
fields,
error: extractErrorProperties(e),
} as FieldStatsError)
),
map((resp) => {
if (!isIKibanaSearchResponse(resp)) return resp;
const aggregations = resp.rawResponse.aggregations;
const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
const safeFieldName = field.safeFieldName;
const docCount = get(
aggregations,
[...aggsPath, `${safeFieldName}_field_stats`, 'doc_count'],
0
);
const fieldStatsResp = get(
aggregations,
[...aggsPath, `${safeFieldName}_field_stats`, 'actual_stats'],
{}
);
return {
fieldName: field.fieldName,
count: docCount,
earliest: get(fieldStatsResp, 'min', 0),
latest: get(fieldStatsResp, 'max', 0),
};

const batchStats: DateFieldStats[] = fields.map((field, i) => {
const safeFieldName = field.safeFieldName;
const docCount = get(
aggregations,
[...aggsPath, `${safeFieldName}_field_stats`, 'doc_count'],
0
);
const fieldStatsResp = get(
aggregations,
[...aggsPath, `${safeFieldName}_field_stats`, 'actual_stats'],
{}
);
return {
fieldName: field.fieldName,
count: docCount,
earliest: get(fieldStatsResp, 'min', 0),
latest: get(fieldStatsResp, 'max', 0),
} as DateFieldStats;
});
return batchStats;
})
);
};
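
On the response side, each fetcher now maps over the same batch of fields and pulls that field's numbers back out of the shared aggregations object by the prefixed name. A simplified sketch of the unpacking with an invented response fragment (the real code above also resolves the sampler path via getSamplerAggregationsResponsePath):

```ts
import { get } from 'lodash';

interface Field { fieldName: string; safeFieldName: string }
interface DateFieldStats { fieldName: string; count: number; earliest: number; latest: number }

// Pull each field's stats out of a single shared aggregations object.
const unpackDateStats = (fields: Field[], aggregations: object): DateFieldStats[] =>
  fields.map((field) => {
    const safeFieldName = field.safeFieldName;
    return {
      fieldName: field.fieldName,
      count: get(aggregations, [`${safeFieldName}_field_stats`, 'doc_count'], 0),
      earliest: get(aggregations, [`${safeFieldName}_field_stats`, 'actual_stats', 'min'], 0),
      latest: get(aggregations, [`${safeFieldName}_field_stats`, 'actual_stats', 'max'], 0),
    };
  });

// Invented response fragment for two date fields returned by one search request.
const aggregations = {
  timestamp_field_stats: { doc_count: 14074, actual_stats: { min: 1635724800000, max: 1636070399000 } },
  utc_time_field_stats: { doc_count: 14074, actual_stats: { min: 1635724800000, max: 1636070399000 } },
};

console.log(
  unpackDateStats(
    [
      { fieldName: 'timestamp', safeFieldName: 'timestamp' },
      { fieldName: 'utc_time', safeFieldName: 'utc_time' },
    ],
    aggregations
  )
);
```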