Skip to content

Commit

Permalink
[ML] remove script fields
Browse files Browse the repository at this point in the history
  • Loading branch information
darnautov committed Jun 8, 2021
1 parent f5dc9d1 commit 615deb2
Showing 1 changed file with 36 additions and 99 deletions.
135 changes: 36 additions & 99 deletions x-pack/plugins/ml/server/lib/alerts/alerting_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type AggResultsResponse = { key?: number } & {
};
};

const TIME_RANGE_PADDING = 10;

/**
* Mapping for result types and corresponding score fields.
*/
Expand All @@ -63,43 +65,6 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
};
};

const getCommonScriptedFields = () => {
return {
start: {
script: {
lang: 'painless',
source: `LocalDateTime.ofEpochSecond((doc["timestamp"].value.getMillis()-((doc["bucket_span"].value * 1000)
* params.padding)) / 1000, 0, ZoneOffset.UTC).toString()+\":00.000Z\"`,
params: {
padding: 10,
},
},
},
end: {
script: {
lang: 'painless',
source: `LocalDateTime.ofEpochSecond((doc["timestamp"].value.getMillis()+((doc["bucket_span"].value * 1000)
* params.padding)) / 1000, 0, ZoneOffset.UTC).toString()+\":00.000Z\"`,
params: {
padding: 10,
},
},
},
timestamp_epoch: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value.getMillis()/1000',
},
},
timestamp_iso8601: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value',
},
},
};
};

/**
* Builds an agg query based on the requested result type.
* @param resultType
Expand All @@ -110,9 +75,9 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
severity: number,
useInitialScore?: boolean
) => {
const influencerScoreField = `${useInitialScore ? 'initial_' : ''}influencer_score`;
const recordScoreField = `${useInitialScore ? 'initial_' : ''}record_score`;
const bucketScoreField = `${useInitialScore ? 'initial_' : ''}anomaly_score`;
const influencerScoreField = getScoreFields(ANOMALY_RESULT_TYPE.INFLUENCER, useInitialScore);
const recordScoreField = getScoreFields(ANOMALY_RESULT_TYPE.RECORD, useInitialScore);
const bucketScoreField = getScoreFields(ANOMALY_RESULT_TYPE.BUCKET, useInitialScore);

return {
influencer_results: {
Expand Down Expand Up @@ -142,25 +107,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'influencer_score',
'is_interim',
'job_id',
'bucket_span',
],
},
size: 3,
script_fields: {
...getCommonScriptedFields(),
score: {
script: {
lang: 'painless',
source: `Math.floor(doc["${influencerScoreField}"].value)`,
},
},
unique_key: {
script: {
lang: 'painless',
source:
'doc["timestamp"].value + "_" + doc["influencer_field_name"].value + "_" + doc["influencer_field_value"].value',
},
},
},
},
},
},
Expand Down Expand Up @@ -199,24 +149,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'partition_field_value',
'job_id',
'detector_index',
'bucket_span',
],
},
size: 3,
script_fields: {
...getCommonScriptedFields(),
score: {
script: {
lang: 'painless',
source: `Math.floor(doc["${recordScoreField}"].value)`,
},
},
unique_key: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value + "_" + doc["function"].value',
},
},
},
},
},
},
Expand Down Expand Up @@ -248,24 +184,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
'timestamp',
'anomaly_score',
'is_interim',
'bucket_span',
],
},
size: 1,
script_fields: {
...getCommonScriptedFields(),
score: {
script: {
lang: 'painless',
source: `Math.floor(doc["${bucketScoreField}"].value)`,
},
},
unique_key: {
script: {
lang: 'painless',
source: 'doc["timestamp"].value',
},
},
},
},
},
},
Expand All @@ -282,6 +204,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
return source.job_id;
};

const getScoreFields = (resultType: AnomalyResultType, useInitialScore?: boolean) => {
return `${useInitialScore ? 'initial_' : ''}${resultTypeScoreMapping[resultType]}`;
};

const getRecordKey = (source: AnomalyRecordDoc): string => {
let alertInstanceKey = `${source.job_id}_${source.timestamp}`;

Expand All @@ -294,18 +220,23 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
return alertInstanceKey;
};

const getResultsFormatter = (resultType: AnomalyResultType) => {
/**
* Returns a callback for formatting elasticsearch aggregation response
* to the alert context.
* @param resultType
*/
const getResultsFormatter = (resultType: AnomalyResultType, useInitialScore: boolean = false) => {
const resultsLabel = getAggResultsLabel(resultType);
return (v: AggResultsResponse): AlertExecutionResult | undefined => {
const aggTypeResults = v[resultsLabel.aggGroupLabel];
if (aggTypeResults.doc_count === 0) {
return;
}

const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits;

const topAnomaly = requestedAnomalies[0];
const alertInstanceKey = getAlertInstanceKey(topAnomaly._source);
const timestamp = topAnomaly._source.timestamp;
const bucketSpanInSeconds = topAnomaly._source.bucket_span;

return {
count: aggTypeResults.doc_count,
Expand All @@ -315,26 +246,32 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da
alertInstanceKey,
jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))],
isInterim: requestedAnomalies.some((h) => h._source.is_interim),
timestamp: topAnomaly._source.timestamp,
timestampIso8601: topAnomaly.fields.timestamp_iso8601[0],
timestampEpoch: topAnomaly.fields.timestamp_epoch[0],
score: topAnomaly.fields.score[0],
timestamp,
timestampIso8601: new Date(timestamp).toISOString(),
timestampEpoch: timestamp / 1000,
score: Math.floor(topAnomaly._source[getScoreFields(resultType, useInitialScore)]),
bucketRange: {
start: topAnomaly.fields.start[0],
end: topAnomaly.fields.end[0],
start: new Date(
timestamp - bucketSpanInSeconds * 1000 * TIME_RANGE_PADDING
).toISOString(),
end: new Date(timestamp + bucketSpanInSeconds * 1000 * TIME_RANGE_PADDING).toISOString(),
},
topRecords: v.record_results.top_record_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
score: Math.floor(
topAnomaly._source[getScoreFields(ANOMALY_RESULT_TYPE.RECORD, useInitialScore)]
),
unique_key: getRecordKey(h._source),
};
}) as RecordAnomalyAlertDoc[],
topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => {
return {
...h._source,
score: h.fields.score[0],
unique_key: h.fields.unique_key[0],
score: Math.floor(
topAnomaly._source[getScoreFields(ANOMALY_RESULT_TYPE.INFLUENCER, useInitialScore)]
),
unique_key: `${h._source.timestamp}_${h._source.influencer_field_name}_${h._source.influencer_field_value}`,
};
}) as InfluencerAnomalyAlertDoc[],
};
Expand Down Expand Up @@ -447,7 +384,7 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da

const resultsLabel = getAggResultsLabel(params.resultType);

const formatter = getResultsFormatter(params.resultType);
const formatter = getResultsFormatter(params.resultType, !!previewTimeInterval);

return (previewTimeInterval
? (result as {
Expand Down

0 comments on commit 615deb2

Please sign in to comment.