[ML] Change to use batch and only retry with individual field if failed
qn895 committed Nov 4, 2021
1 parent 93016de commit 817fe2d
Showing 3 changed files with 472 additions and 39 deletions.
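In short: field-stats requests are now grouped into per-type batches of up to 10 fields, and only the fields of a failed batch are retried with one request per field, so a single problematic field no longer discards the stats of its whole batch. The following standalone sketch illustrates that batch-then-retry shape; fetchStats, Field, and FieldStats here are simplified stand-ins for illustration, not the helpers used in the diff below.

import { combineLatest, Observable, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

interface Field {
  fieldName: string;
  type: string;
}

interface FieldStats {
  fieldName: string;
  count: number;
}

// Hypothetical helper: fetch stats for a list of fields in a single request.
declare function fetchStats(fields: Field[]): Observable<FieldStats[]>;

// Request stats per batch; if a batch request fails, fall back to one request
// per field so the other fields of that batch are not lost.
function fetchStatsWithRetry(batches: Field[][]): Observable<FieldStats[]> {
  const perBatch = batches.map((batch) =>
    fetchStats(batch).pipe(
      catchError(() =>
        combineLatest(
          batch.map((field) =>
            fetchStats([field]).pipe(catchError(() => of([] as FieldStats[])))
          )
        ).pipe(map((results) => results.flat()))
      )
    )
  );
  return combineLatest(perBatch).pipe(map((results) => results.flat()));
}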
4 changes: 3 additions & 1 deletion x-pack/plugins/data_visualizer/common/types/field_stats.ts
@@ -28,6 +28,7 @@ export interface Field {
safeFieldName: string;
}

// @todo: check
export function isValidField(arg: unknown): arg is Field {
return isPopulatedObject(arg, ['fieldName', 'type']) && typeof arg.fieldName === 'string';
}
@@ -48,7 +49,8 @@ export interface Bucket {
}

export interface FieldStatsError {
fieldName?: string;
fields?: Field[];
error: Error;
}

@@ -6,13 +6,16 @@
*/

import { useCallback, useEffect, useReducer, useRef, useState } from 'react';
import { combineLatest, Observable, Subject, Subscription } from 'rxjs';
import { i18n } from '@kbn/i18n';
import { last, cloneDeep } from 'lodash';
import { switchMap } from 'rxjs/operators';
import type {
DataStatsFetchProgress,
FieldStatsSearchStrategyReturnBase,
OverallStatsSearchStrategyParams,
FieldStatsCommonRequestParams,
Field,
} from '../../../../common/types/field_stats';
import { useDataVisualizerKibana } from '../../kibana_context';
import type { FieldRequestConfig } from '../../../../common';
@@ -21,17 +24,40 @@ import {
buildBaseFilterCriteria,
getSafeAggregationName,
} from '../../../../common/utils/query_utils';
import type { FieldStats, FieldStatsError } from '../../../../common/types/field_stats';
import { getInitialProgress, getReducer } from '../progress_utils';
import { MAX_EXAMPLES_DEFAULT } from '../search_strategy/requests/constants';
import type { ISearchOptions } from '../../../../../../../src/plugins/data/common';

import { getFieldsStats } from '../search_strategy/requests/get_fields_stats';
interface FieldStatsParams {
metricConfigs: FieldRequestConfig[];
nonMetricConfigs: FieldRequestConfig[];
}

const createBatchedRequests = (fields: Field[], maxBatchSize = 10) => {
// Batch up fields by type, getting stats for multiple fields at a time.
const batches: Field[][] = [];
const batchedFields: { [key: string]: Field[][] } = {};

fields.forEach((field) => {
const fieldType = field.type;
if (batchedFields[fieldType] === undefined) {
batchedFields[fieldType] = [[]];
}
let lastArray: Field[] = last(batchedFields[fieldType]) as Field[];
if (lastArray.length === maxBatchSize) {
lastArray = [];
batchedFields[fieldType].push(lastArray);
}
lastArray.push(field);
});

Object.values(batchedFields).forEach((lists) => {
batches.push(...lists);
});
return batches;
};

export function useFieldStatsSearchStrategy(
searchStrategyParams: OverallStatsSearchStrategyParams | undefined,
fieldStatsParams: FieldStatsParams | undefined,
@@ -52,9 +78,12 @@ export function useFieldStatsSearchStrategy(

const abortCtrl = useRef(new AbortController());
const searchSubscription$ = useRef<Subscription>();
const retries$ = useRef<Subscription>();

const startFetch = useCallback(() => {
searchSubscription$.current?.unsubscribe();
retries$.current?.unsubscribe();

abortCtrl.current.abort();
abortCtrl.current = new AbortController();
setFetchState({
@@ -118,62 +147,117 @@
abortSignal: abortCtrl.current.signal,
sessionId: searchStrategyParams?.sessionId,
};

const batches = createBatchedRequests(
sortedConfigs.map((config, idx) => ({
fieldName: config.fieldName,
type: config.type,
cardinality: config.cardinality,
safeFieldName: getSafeAggregationName(config.fieldName, idx),
})),
10
);

const subTemp = combineLatest(
batches
.map((batch) => getFieldsStats(data, params, batch, searchOptions, true))
.filter((obs) => obs !== undefined) as Array<Observable<FieldStats[] | FieldStatsError>>
);

const statsMap$ = new Subject();
const fieldsToRetry$ = new Subject<Field[]>();

const onError = (error: any) => {
toasts.addError(error, {
title: i18n.translate('xpack.dataVisualizer.index.errorFetchingFieldStatisticsMessage', {
defaultMessage: 'Error fetching field statistics',
}),
});
setFetchState({
isRunning: false,
error,
});
};

const onComplete = () => {
setFetchState({
isRunning: false,
});
};

// First, attempt to fetch field stats in batches of 10
searchSubscription$.current = subTemp.subscribe({
next: (resp) => {
if (resp) {
const statsMap = new Map<string, FieldStats>();
const failedFields: Field[] = [];
resp.forEach((batchResponse) => {
if (Array.isArray(batchResponse)) {
batchResponse.forEach((f) => {
if (f.fieldName !== undefined) {
statsMap.set(f.fieldName, f);
}
});
} else {
// If an error occurred during the batch request,
// retry each field in the failed batch individually
failedFields.push(...(batchResponse.fields ?? []));
}
});

setFetchState({
loaded: (resp.length / sortedConfigs.length) * 100,
isRunning: true,
});

setFieldStats(statsMap);

statsMap$.next(statsMap);
fieldsToRetry$.next(failedFields);
}
},
error: onError,
complete: onComplete,
});

// If any of the batches failed, retry each field in those batches individually at least once
retries$.current = combineLatest([
statsMap$,
fieldsToRetry$.pipe(
switchMap((failedFields) => {
return combineLatest(
failedFields
.map((failedField) =>
getFieldsStats(data, params, [failedField], searchOptions, false)
)
.filter((obs) => obs !== undefined)
);
})
),
]).subscribe({
next: (resp) => {
const statsMap = cloneDeep(resp[0]) as Map<string, FieldStats>;
const fieldBatches = resp[1];

fieldBatches.forEach((f) => {
if (Array.isArray(f) && f.length === 1) {
statsMap.set(f[0].fieldName, f[0]);
}
});
setFieldStats(statsMap);
},
error: onError,
complete: onComplete,
});
}, [data, toasts, searchStrategyParams, fieldStatsParams, initialDataVisualizerListState]);

const cancelFetch = useCallback(() => {
searchSubscription$.current?.unsubscribe();
searchSubscription$.current = undefined;

retries$.current?.unsubscribe();
retries$.current = undefined;

abortCtrl.current.abort();
setFetchState({
isRunning: false,

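For reference, the createBatchedRequests helper added in this commit groups fields by type and splits each group into chunks of at most maxBatchSize, so a single request never mixes field types. Below is a small self-contained restatement of that behavior with illustrative field names; the Field shape is trimmed to what the grouping needs and is not the full interface from the diff above.

import { last } from 'lodash';

interface Field {
  fieldName: string;
  type: string;
}

// Group fields by type, then split each type's fields into chunks of at most maxBatchSize.
const createBatchedRequests = (fields: Field[], maxBatchSize = 10): Field[][] => {
  const batchedFields: Record<string, Field[][]> = {};
  fields.forEach((field) => {
    if (batchedFields[field.type] === undefined) {
      batchedFields[field.type] = [[]];
    }
    let lastArray = last(batchedFields[field.type]) as Field[];
    if (lastArray.length === maxBatchSize) {
      lastArray = [];
      batchedFields[field.type].push(lastArray);
    }
    lastArray.push(field);
  });
  return Object.values(batchedFields).flat();
};

// 12 keyword fields and 3 number fields produce three batches: 10 + 2 keyword, 3 number.
const fields: Field[] = [
  ...Array.from({ length: 12 }, (_, i) => ({ fieldName: `kw_${i}`, type: 'keyword' })),
  ...Array.from({ length: 3 }, (_, i) => ({ fieldName: `num_${i}`, type: 'number' })),
];
console.log(createBatchedRequests(fields).map((batch) => batch.length)); // [10, 2, 3]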