Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] APM Latency Correlations: Field/value candidates prioritization #107370

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface SearchServiceParams {

/**
 * Search service parameters extended with the values needed to actually
 * run the queries: the target index and the frozen-indices flag.
 */
export interface SearchServiceFetchParams extends SearchServiceParams {
// Index (or index pattern) the queries are run against.
index: string;
// NOTE(review): presumably controls whether frozen indices are searched — confirm against the query builders.
includeFrozen: boolean;
}

export interface SearchServiceValue {
Expand All @@ -50,5 +51,4 @@ export interface AsyncSearchProviderProgress {
loadedFieldCanditates: number;
loadedFieldValuePairs: number;
loadedHistograms: number;
getOverallProgress: () => number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,45 @@
* 2.0.
*/

import { shuffle, range } from 'lodash';
import { range } from 'lodash';
import type { ElasticsearchClient } from 'src/core/server';
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import { fetchTransactionDurationFieldCandidates } from './query_field_candidates';
import { fetchTransactionDurationFieldValuePairs } from './query_field_value_pairs';
import { fetchTransactionDurationPercentiles } from './query_percentiles';
import { fetchTransactionDurationCorrelation } from './query_correlation';
import { fetchTransactionDurationHistogramRangeSteps } from './query_histogram_range_steps';
import { fetchTransactionDurationRanges, HistogramItem } from './query_ranges';
import type {
AsyncSearchProviderProgress,
SearchServiceParams,
SearchServiceFetchParams,
SearchServiceValue,
} from '../../../../common/search_strategies/correlations/types';
import { computeExpectationsAndRanges } from './utils/aggregation_utils';
import { fetchTransactionDurationFractions } from './query_fractions';

const CORRELATION_THRESHOLD = 0.3;
const KS_TEST_THRESHOLD = 0.1;

const currentTimeAsString = () => new Date().toISOString();
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import {
fetchTransactionDurationFieldCandidates,
fetchTransactionDurationFieldValuePairs,
fetchTransactionDurationFractions,
fetchTransactionDurationPercentiles,
fetchTransactionDurationHistograms,
fetchTransactionDurationHistogramRangeSteps,
fetchTransactionDurationRanges,
} from './queries';
import { computeExpectationsAndRanges } from './utils';
import { asyncSearchServiceLogProvider } from './async_search_service_log';
import { asyncSearchServiceStateProvider } from './async_search_service_state';

export const asyncSearchServiceProvider = (
esClient: ElasticsearchClient,
getApmIndices: () => Promise<ApmIndicesConfig>,
searchServiceParams: SearchServiceParams,
includeFrozen: boolean
) => {
let isCancelled = false;
let isRunning = true;
let error: Error;
let ccsWarning = false;
const log: string[] = [];
const logMessage = (message: string) =>
log.push(`${currentTimeAsString()}: ${message}`);

const progress: AsyncSearchProviderProgress = {
started: Date.now(),
loadedHistogramStepsize: 0,
loadedOverallHistogram: 0,
loadedFieldCanditates: 0,
loadedFieldValuePairs: 0,
loadedHistograms: 0,
getOverallProgress: () =>
progress.loadedHistogramStepsize * 0.025 +
progress.loadedOverallHistogram * 0.025 +
progress.loadedFieldCanditates * 0.025 +
progress.loadedFieldValuePairs * 0.025 +
progress.loadedHistograms * 0.9,
};
const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();

const values: SearchServiceValue[] = [];
let overallHistogram: HistogramItem[] | undefined;
const state = asyncSearchServiceStateProvider();

let percentileThresholdValue: number;

const cancel = () => {
logMessage(`Service cancelled.`);
isCancelled = true;
};

const fetchCorrelations = async () => {
async function fetchCorrelations() {
let params: SearchServiceFetchParams | undefined;

try {
const indices = await getApmIndices();
params = {
...searchServiceParams,
index: indices['apm_oss.transactionIndices'],
includeFrozen,
};

// Configured percentile threshold (e.g. the 95th percentile) to be displayed as a marker in the log-log chart
Expand All @@ -86,37 +55,40 @@ export const asyncSearchServiceProvider = (
params,
params.percentileThreshold ? [params.percentileThreshold] : undefined
);
percentileThresholdValue =
const percentileThresholdValue =
percentileThreshold[`${params.percentileThreshold}.0`];
state.setPercentileThresholdValue(percentileThresholdValue);

logMessage(
addLogMessage(
`Fetched ${params.percentileThreshold}th percentile value of ${percentileThresholdValue} based on ${totalDocs} documents.`
);

// finish early if we weren't able to identify the percentileThresholdValue.
if (percentileThresholdValue === undefined) {
logMessage(
addLogMessage(
`Abort service since percentileThresholdValue could not be determined.`
);
progress.loadedHistogramStepsize = 1;
progress.loadedOverallHistogram = 1;
progress.loadedFieldCanditates = 1;
progress.loadedFieldValuePairs = 1;
progress.loadedHistograms = 1;
isRunning = false;
state.setProgress({
loadedHistogramStepsize: 1,
loadedOverallHistogram: 1,
loadedFieldCanditates: 1,
loadedFieldValuePairs: 1,
loadedHistograms: 1,
});
state.setIsRunning(false);
return;
}

const histogramRangeSteps = await fetchTransactionDurationHistogramRangeSteps(
esClient,
params
);
progress.loadedHistogramStepsize = 1;
state.setProgress({ loadedHistogramStepsize: 1 });

logMessage(`Loaded histogram range steps.`);
addLogMessage(`Loaded histogram range steps.`);

if (isCancelled) {
isRunning = false;
if (state.getState().isCancelled) {
state.setIsRunning(false);
return;
}

Expand All @@ -125,13 +97,13 @@ export const asyncSearchServiceProvider = (
params,
histogramRangeSteps
);
progress.loadedOverallHistogram = 1;
overallHistogram = overallLogHistogramChartData;
state.setProgress({ loadedOverallHistogram: 1 });
state.setOverallHistogram(overallLogHistogramChartData);

logMessage(`Loaded overall histogram chart data.`);
addLogMessage(`Loaded overall histogram chart data.`);

if (isCancelled) {
isRunning = false;
if (state.getState().isCancelled) {
state.setIsRunning(false);
return;
}

Expand All @@ -142,10 +114,10 @@ export const asyncSearchServiceProvider = (
} = await fetchTransactionDurationPercentiles(esClient, params, percents);
const percentiles = Object.values(percentilesRecords);

logMessage(`Loaded percentiles.`);
addLogMessage(`Loaded percentiles.`);

if (isCancelled) {
isRunning = false;
if (state.getState().isCancelled) {
state.setIsRunning(false);
return;
}

Expand All @@ -154,21 +126,21 @@ export const asyncSearchServiceProvider = (
params
);

logMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);
addLogMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);

progress.loadedFieldCanditates = 1;
state.setProgress({ loadedFieldCanditates: 1 });

const fieldValuePairs = await fetchTransactionDurationFieldValuePairs(
esClient,
params,
fieldCandidates,
progress
state
);

logMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);
addLogMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);

if (isCancelled) {
isRunning = false;
if (state.getState().isCancelled) {
state.setIsRunning(false);
return;
}

Expand All @@ -181,114 +153,75 @@ export const asyncSearchServiceProvider = (
totalDocCount,
} = await fetchTransactionDurationFractions(esClient, params, ranges);

logMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

async function* fetchTransactionDurationHistograms() {
for (const item of shuffle(fieldValuePairs)) {
if (params === undefined || item === undefined || isCancelled) {
isRunning = false;
return;
}

// If one of the fields have an error
// We don't want to stop the whole process
try {
const {
correlation,
ksTest,
} = await fetchTransactionDurationCorrelation(
esClient,
params,
expectations,
ranges,
fractions,
totalDocCount,
item.field,
item.value
);

if (isCancelled) {
isRunning = false;
return;
}

if (
correlation !== null &&
correlation > CORRELATION_THRESHOLD &&
ksTest !== null &&
ksTest < KS_TEST_THRESHOLD
) {
const logHistogram = await fetchTransactionDurationRanges(
esClient,
params,
histogramRangeSteps,
item.field,
item.value
);
yield {
...item,
correlation,
ksTest,
histogram: logHistogram,
};
} else {
yield undefined;
}
} catch (e) {
// don't fail the whole process for individual correlation queries,
// just add the error to the internal log and check if we'd want to set the
// cross-cluster search compatibility warning to true.
logMessage(
`Failed to fetch correlation/kstest for '${item.field}/${item.value}'`
);
if (params?.index.includes(':')) {
ccsWarning = true;
}
yield undefined;
}
}
}
addLogMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

let loadedHistograms = 0;
for await (const item of fetchTransactionDurationHistograms()) {
for await (const item of fetchTransactionDurationHistograms(
esClient,
addLogMessage,
params,
state,
expectations,
ranges,
fractions,
histogramRangeSteps,
totalDocCount,
fieldValuePairs
)) {
if (item !== undefined) {
values.push(item);
state.addValue(item);
}
loadedHistograms++;
progress.loadedHistograms = loadedHistograms / fieldValuePairs.length;
state.setProgress({
loadedHistograms: loadedHistograms / fieldValuePairs.length,
});
}

logMessage(
`Identified ${values.length} significant correlations out of ${fieldValuePairs.length} field/value pairs.`
addLogMessage(
`Identified ${
state.getState().values.length
} significant correlations out of ${
fieldValuePairs.length
} field/value pairs.`
);
} catch (e) {
error = e;
state.setError(e);
}

if (error !== undefined && params?.index.includes(':')) {
ccsWarning = true;
if (state.getState().error !== undefined && params?.index.includes(':')) {
state.setCcsWarning(true);
}

isRunning = false;
};
state.setIsRunning(false);
}

fetchCorrelations();

return () => {
const sortedValues = values.sort((a, b) => b.correlation - a.correlation);
const {
ccsWarning,
error,
isRunning,
overallHistogram,
percentileThresholdValue,
progress,
} = state.getState();

return {
ccsWarning,
error,
log,
log: getLogMessages(),
isRunning,
loaded: Math.round(progress.getOverallProgress() * 100),
loaded: Math.round(state.getOverallProgress() * 100),
overallHistogram,
started: progress.started,
total: 100,
values: sortedValues,
values: state.getValuesSortedByCorrelation(),
percentileThresholdValue,
cancel,
cancel: () => {
addLogMessage(`Service cancelled.`);
state.setIsCancelled(true);
},
};
};
};
Loading