Exposing Helpful Anomaly Detection Metadata from Anomaly Strategies (i.e. Anomaly Check Range/Thresholds) through a Backwards-Compatible Function #593

Open: wants to merge 25 commits into base: master.

Commits (25)
5338fe4  added more tests to the anomaly detection with extended results changes (arsenalgunnershubert777, Sep 21, 2024)
6040cef  Add Spark 3.5 support (#514) (jhchee, Feb 8, 2024)
02ed720  fix merge conflicts (arsenalgunnershubert777, Nov 1, 2024)
a8780b7  Feature: Add Row Level Result Treatment Options for Uniqueness and Co… (eycho-am, Feb 15, 2024)
185ce01  Skip SparkTableMetricsRepositoryTest iceberg test when SupportsRowLev… (eycho-am, Feb 21, 2024)
e48f97a  Feature: Add Row Level Result Treatment Options for Miminum and Maxim… (eycho-am, Feb 21, 2024)
efeec97  Add analyzerOption to add filteredRowOutcome for isPrimaryKey Check (… (eycho-am, Feb 23, 2024)
9fa5096  Fix bug in MinLength and MaxLength analyzers where given the NullBeha… (eycho-am, Feb 26, 2024)
c7fa635  [Min/Max] Apply filtered row behavior at the row level evaluation (#543) (rdsharma26, Mar 8, 2024)
abb54bc  [MinLength/MaxLength] Apply filtered row behavior at the row level ev… (rdsharma26, Mar 10, 2024)
c2e862f  fix merge conflicts (arsenalgunnershubert777, Nov 1, 2024)
6538ef3  Fix for satisfies row level results bug (#553) (rdsharma26, Apr 3, 2024)
b69f2b8  New analyzer, RatioOfSums (#552) (scott-gunn, Apr 11, 2024)
efd33f0  Column Count Analyzer and Check (#555) (mentekid, Apr 15, 2024)
a4a8aa6  Update breeze to match spark 3.5 breeze version (#545) (zeotuan, Apr 17, 2024)
572d776  Configurable RetainCompletenessRule (#564) (zeotuan, May 6, 2024)
dc9ba7e  Optional specification of instance name in CustomSQL analyzer metric.… (tylermcdaniel0, May 24, 2024)
2a02afe  Adding Wilson Score Confidence Interval Strategy (#567) (zeotuan, May 24, 2024)
ee26d1c  CustomAggregator (#572) (joshuazexter, Jul 31, 2024)
97f7a3e  fix typo (#574) (bojackli, Aug 29, 2024)
9d92d94  Fix performance of building row-level results (#577) (marcantony, Aug 31, 2024)
0f81982  fix merge conflicts (arsenalgunnershubert777, Nov 1, 2024)
fdebce5  updating anomaly check bounds to not have defaults and require inputs… (arsenalgunnershubert777, Nov 4, 2024)
5da25c4  add accidentally removed import (arsenalgunnershubert777, Nov 4, 2024)
198a41f  update readme to be more clear about the anomalyMetricValue (arsenalgunnershubert777, Nov 8, 2024)
46 changes: 45 additions & 1 deletion src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala
@@ -16,7 +16,7 @@

package com.amazon.deequ

-import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy
+import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetectionStrategyWithExtendedResults}
import com.amazon.deequ.analyzers.Analyzer
import com.amazon.deequ.analyzers.{State, _}
import com.amazon.deequ.checks.{Check, CheckLevel}
@@ -240,6 +240,24 @@ class VerificationRunBuilderWithRepository(
anomalyDetectionStrategy, analyzer, anomalyCheckConfigOrDefault)
this
}

def addAnomalyCheckWithExtendedResults[S <: State[S]](
anomalyDetectionStrategy: AnomalyDetectionStrategyWithExtendedResults,
analyzer: Analyzer[S, Metric[Double]],
anomalyCheckConfig: Option[AnomalyCheckConfig] = None)
: this.type = {

val anomalyCheckConfigOrDefault = anomalyCheckConfig.getOrElse {

val checkDescription = s"Anomaly check for ${analyzer.toString}"

AnomalyCheckConfig(CheckLevel.Warning, checkDescription)
}

checks :+= VerificationRunBuilderHelper.getAnomalyCheckWithExtendedResults(
metricsRepository.get, anomalyDetectionStrategy, analyzer, anomalyCheckConfigOrDefault)
this
}
}
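For callers, the new builder method mirrors `addAnomalyCheck`. A minimal usage sketch (the DataFrame `df`, the in-memory repository, and the result key are illustrative, not part of this diff):

```scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

val repository = new InMemoryMetricsRepository()

// Same call shape as addAnomalyCheck, but the run result will also carry
// the extended anomaly metadata (check range, metric value, anomaly verdict).
val result = VerificationSuite()
  .onData(df) // df: an illustrative DataFrame
  .useRepository(repository)
  .saveOrAppendResult(ResultKey(System.currentTimeMillis()))
  .addAnomalyCheckWithExtendedResults(
    RelativeRateOfChangeStrategy(maxRateIncrease = Some(2.0)),
    Size())
  .run()
```

Because the existing `addAnomalyCheck` path is untouched, current callers keep compiling unchanged, which is the backwards-compatibility claim in the title.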

class VerificationRunBuilderWithSparkSession(
@@ -315,6 +333,32 @@ private[this] object VerificationRunBuilderHelper {
anomalyCheckConfig.beforeDate
)
}

/**
* Build a check using the anomaly detection with extended results methods
*
* @param metricsRepository A metrics repository to get the previous results
* @param anomalyDetectionStrategyWithExtendedResults The anomaly detection strategy with extended results
* @param analyzer The analyzer for the metric to run anomaly detection on
* @param anomalyCheckConfig Configuration settings for the Check
*/
def getAnomalyCheckWithExtendedResults[S <: State[S]](
metricsRepository: MetricsRepository,
anomalyDetectionStrategyWithExtendedResults: AnomalyDetectionStrategyWithExtendedResults,
analyzer: Analyzer[S, Metric[Double]],
anomalyCheckConfig: AnomalyCheckConfig)
: Check = {

Check(anomalyCheckConfig.level, anomalyCheckConfig.description)
.isNewestPointNonAnomalousWithExtendedResults(
metricsRepository,
anomalyDetectionStrategyWithExtendedResults,
analyzer,
anomalyCheckConfig.withTagValues,
anomalyCheckConfig.afterDate,
anomalyCheckConfig.beforeDate
)
}
}

Applicability.scala
@@ -21,7 +21,8 @@ import java.util.Calendar

import com.amazon.deequ.analyzers.{Analyzer, State}
import com.amazon.deequ.checks.Check
-import com.amazon.deequ.constraints.{AnalysisBasedConstraint, Constraint, ConstraintDecorator}
+import com.amazon.deequ.constraints.{AnalysisBasedConstraint, AnomalyExtendedResultsConstraint,
+  Constraint, ConstraintDecorator}
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -187,9 +188,13 @@ private[deequ] class Applicability(session: SparkSession) {
case (name, nc: ConstraintDecorator) => name -> nc.inner
case (name, c: Constraint) => name -> c
}
-      .collect { case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
-        val metric = constraint.analyzer.calculate(data).value
-        name -> metric
+      .collect {
+        case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
+          val metric = constraint.analyzer.calculate(data).value
+          name -> metric
+        case (name, constraint: AnomalyExtendedResultsConstraint[_, _, _]) =>
+          val metric = constraint.analyzer.calculate(data).value
+          name -> metric
}

val constraintApplicabilities = check.constraints.zip(namedMetrics).map {
AnomalyDetectionStrategy.scala
@@ -30,3 +30,17 @@ trait AnomalyDetectionStrategy {
dataSeries: Vector[Double],
searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, Anomaly)]
}

trait AnomalyDetectionStrategyWithExtendedResults {

/**
* Search for anomalies in a series of data points, returning extended results.
*
* @param dataSeries The data contained in a Vector of Doubles
* @param searchInterval The indices between which anomalies should be detected. [a, b).
* @return The indices of all data points with their corresponding anomaly extended results wrapper
* object.
*/
def detectWithExtendedResults(
dataSeries: Vector[Double],
searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, AnomalyDetectionDataPoint)]
}
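A custom strategy only has to implement this one method. A minimal sketch (FixedThresholdStrategy, its threshold, and its detail message are illustrative and not part of this PR; it assumes the `AnomalyDetectionDataPoint`, `BoundedRange`, and `Bound` shapes used later in this diff):

```scala
// Illustrative only: flags every value above a caller-supplied threshold.
case class FixedThresholdStrategy(threshold: Double)
  extends AnomalyDetectionStrategyWithExtendedResults {

  override def detectWithExtendedResults(
      dataSeries: Vector[Double],
      searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, AnomalyDetectionDataPoint)] = {

    val (start, end) = searchInterval
    dataSeries.zipWithIndex
      .slice(start, end)
      .map { case (value, index) =>
        val isAnomaly = value > threshold
        val detail =
          if (isAnomaly) Some(s"Value $value is above the threshold $threshold.") else None
        // Every point is returned, anomalous or not, together with its check range.
        (index, AnomalyDetectionDataPoint(value, value,
          BoundedRange(
            lowerBound = Bound(Double.MinValue, inclusive = true),
            upperBound = Bound(threshold, inclusive = true)),
          isAnomaly, 1.0, detail))
      }
  }
}
```

The built-in strategies gain this method through `BaseChangeStrategy` and `BatchNormalStrategy` below, so only external strategies need an implementation like this.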
AnomalyDetector.scala
@@ -56,12 +56,8 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {

val allDataPoints = sortedDataPoints :+ newPoint

-    // Run anomaly
-    val anomalies = detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue))
-      .anomalies
-
-    // Create a Detection result with all anomalies
-    DetectionResult(anomalies)
+    // Run anomaly detection and create a DetectionResult with all anomalies
+    detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue))
}

@@ -100,3 +96,86 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
DetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) })
}
}

case class AnomalyDetectorWithExtendedResults(strategy: AnomalyDetectionStrategyWithExtendedResults) {

/**
* Given a sequence of metrics and a current value, detects if there is an anomaly by using the
* given algorithm and returns extended results.
*
* @param historicalDataPoints Sequence of tuples (Points in time with corresponding Metric).
* @param newPoint The new data point to check for anomalies.
* @return ExtendedDetectionResult containing all data points with their anomaly detection metadata.
*/
def isNewPointAnomalousWithExtendedResults(
historicalDataPoints: Seq[DataPoint[Double]],
newPoint: DataPoint[Double])
: ExtendedDetectionResult = {

require(historicalDataPoints.nonEmpty, "historicalDataPoints must not be empty!")

val sortedDataPoints = historicalDataPoints.sortBy(_.time)

val firstDataPointTime = sortedDataPoints.head.time
val lastDataPointTime = sortedDataPoints.last.time

val newPointTime = newPoint.time

require(lastDataPointTime < newPointTime,
s"Can't decide which range to use for anomaly detection. New data point with time " +
s"$newPointTime is in history range ($firstDataPointTime - $lastDataPointTime)!")

val allDataPoints = sortedDataPoints :+ newPoint

// Run anomaly and create an Extended Detection result with all data points and anomaly details
detectAnomaliesInHistoryWithExtendedResults(allDataPoints, (newPoint.time, Long.MaxValue))
}


/**
* Given a strategy, detects anomalies in a time series after some preprocessing
* and returns extended results.
*
* @param dataSeries Sequence of tuples (Points in time with corresponding value).
* @param searchInterval The interval in which anomalies should be detected. [a, b).
* @return A wrapper object, containing all data points with anomaly extended results.
*/
def detectAnomaliesInHistoryWithExtendedResults(
dataSeries: Seq[DataPoint[Double]],
searchInterval: (Long, Long) = (Long.MinValue, Long.MaxValue))
: ExtendedDetectionResult = {

def findIndexForBound(sortedTimestamps: Seq[Long], boundValue: Long): Int = {
sortedTimestamps.search(boundValue).insertionPoint
}

val (searchStart, searchEnd) = searchInterval

require(searchStart <= searchEnd,
"The first interval element has to be smaller or equal to the last.")

// Remove missing values and sort series by time
val removedMissingValues = dataSeries.filter {
_.metricValue.isDefined
}
val sortedSeries = removedMissingValues.sortBy {
_.time
}
val sortedTimestamps = sortedSeries.map {
_.time
}

// Find indices of lower and upper bound
val lowerBoundIndex = findIndexForBound(sortedTimestamps, searchStart)
val upperBoundIndex = findIndexForBound(sortedTimestamps, searchEnd)

val anomalies = strategy.detectWithExtendedResults(
sortedSeries.flatMap {
_.metricValue
}.toVector, (lowerBoundIndex, upperBoundIndex))

ExtendedDetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) })
}

}
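A quick sketch of calling the new detector directly (timestamps and values are made up; it assumes `AbsoluteChangeStrategy` picks up `detectWithExtendedResults` through `BaseChangeStrategy`, as shown in the next file):

```scala
import com.amazon.deequ.anomalydetection.{
  AbsoluteChangeStrategy, AnomalyDetectorWithExtendedResults, DataPoint, ExtendedDetectionResult}

val history = (1L to 4L).map(t => DataPoint[Double](t, Some(1.0)))
val newPoint = DataPoint[Double](5L, Some(10.0))

// Unlike isNewPointAnomalous, the extended result keeps every evaluated point,
// including its check range and metric value, not only the anomalous ones.
val result: ExtendedDetectionResult =
  AnomalyDetectorWithExtendedResults(AbsoluteChangeStrategy(maxRateIncrease = Some(2.0)))
    .isNewPointAnomalousWithExtendedResults(history, newPoint)
```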
BaseChangeStrategy.scala
@@ -27,7 +27,7 @@ import breeze.linalg.DenseVector
* Set to 1 it calculates the difference between two consecutive values.
*/
trait BaseChangeStrategy
-  extends AnomalyDetectionStrategy {
+  extends AnomalyDetectionStrategy with AnomalyDetectionStrategyWithExtendedResults {

def maxRateDecrease: Option[Double]
def maxRateIncrease: Option[Double]
@@ -67,7 +67,8 @@ trait BaseChangeStrategy
}

/**
- * Search for anomalies in a series of data points.
+ * Search for anomalies in a series of data points. This function uses the
+ * detectWithExtendedResults function and then filters and maps to return only anomaly data point objects.
*
* If there aren't enough data points preceding the searchInterval,
* it may happen that the interval's first elements (depending on the specified order)
@@ -81,6 +82,30 @@
dataSeries: Vector[Double],
searchInterval: (Int, Int))
: Seq[(Int, Anomaly)] = {

detectWithExtendedResults(dataSeries, searchInterval)
.filter { case (_, anomDataPoint) => anomDataPoint.isAnomaly }
.map { case (i, anomDataPoint) =>
(i, Anomaly(Some(anomDataPoint.dataMetricValue), anomDataPoint.confidence, anomDataPoint.detail))
}
}

/**
* Search for anomalies in a series of data points, returning extended results.
*
* If there aren't enough data points preceding the searchInterval,
* it may happen that the interval's first elements (depending on the specified order)
* can't be flagged as anomalies.
*
* @param dataSeries The data contained in a Vector of Doubles
* @param searchInterval The indices between which anomalies should be detected. [a, b).
* @return The indices of all anomalies in the interval and their corresponding wrapper object
* with extended results.
*/
override def detectWithExtendedResults(
dataSeries: Vector[Double],
searchInterval: (Int, Int))
: Seq[(Int, AnomalyDetectionDataPoint)] = {
val (start, end) = searchInterval

require(start <= end,
@@ -89,15 +114,25 @@
val startPoint = Seq(start - order, 0).max
val data = diff(DenseVector(dataSeries.slice(startPoint, end): _*), order).data

-    data.zipWithIndex.filter { case (value, _) =>
-      (value < maxRateDecrease.getOrElse(Double.MinValue)
-        || value > maxRateIncrease.getOrElse(Double.MaxValue))
-    }
-      .map { case (change, index) =>
-        (index + startPoint + order, Anomaly(Option(dataSeries(index + startPoint + order)), 1.0,
-          Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" +
-            s"${maxRateDecrease.getOrElse(Double.MinValue)}, " +
-            s"${maxRateIncrease.getOrElse(Double.MaxValue)}]. Order=$order")))
+    val lowerBound = maxRateDecrease.getOrElse(Double.MinValue)
+    val upperBound = maxRateIncrease.getOrElse(Double.MaxValue)
+
+    data.zipWithIndex.map {
+      case (change, index) =>
+        val outputSequenceIndex = index + startPoint + order
+        val value = dataSeries(outputSequenceIndex)
+        val (detail, isAnomaly) = if (change < lowerBound || change > upperBound) {
+          (Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" +
+            s"$lowerBound, " +
+            s"$upperBound]. Order=$order"), true)
+        } else {
+          (None, false)
+        }
+        (outputSequenceIndex, AnomalyDetectionDataPoint(value, change,
+          BoundedRange(lowerBound = Bound(lowerBound, inclusive = true),
+            upperBound = Bound(upperBound, inclusive = true)), isAnomaly, 1.0, detail))
    }
}
}
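A small worked example of the two entry points on a change-based strategy (numbers illustrative; `AbsoluteChangeStrategy` is one of the existing subclasses of `BaseChangeStrategy`):

```scala
import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy

val strategy = AbsoluteChangeStrategy(maxRateDecrease = Some(-2.0), maxRateIncrease = Some(2.0))
val series = Vector(1.0, 2.0, 3.0, 10.0)

// With order = 1 the strategy scores consecutive differences: 1.0, 1.0 and 7.0.
val extended = strategy.detectWithExtendedResults(series, (0, series.length))
// Entries for indices 1, 2 and 3; only index 3 has isAnomaly = true,
// because its change of 7.0 falls outside [-2.0, 2.0].

val anomaliesOnly = strategy.detect(series, (0, series.length))
// Contains only index 3, matching the pre-existing behaviour.
```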
BatchNormalStrategy.scala
@@ -33,7 +33,9 @@ import breeze.stats.meanAndVariance
case class BatchNormalStrategy(
lowerDeviationFactor: Option[Double] = Some(3.0),
upperDeviationFactor: Option[Double] = Some(3.0),
-    includeInterval: Boolean = false) extends AnomalyDetectionStrategy {
+    includeInterval: Boolean = false)
+  extends AnomalyDetectionStrategy with AnomalyDetectionStrategyWithExtendedResults
+{

require(lowerDeviationFactor.isDefined || upperDeviationFactor.isDefined,
"At least one factor has to be specified.")
Expand All @@ -43,7 +45,8 @@ case class BatchNormalStrategy(


/**
- * Search for anomalies in a series of data points.
+ * Search for anomalies in a series of data points. This function uses the
+ * detectWithExtendedResults function and then filters and maps to return only anomaly objects.
*
* @param dataSeries The data contained in a Vector of Doubles
* @param searchInterval The indices between which anomalies should be detected. [a, b).
@@ -53,6 +56,25 @@
dataSeries: Vector[Double],
searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = {

detectWithExtendedResults(dataSeries, searchInterval)
.filter { case (_, anomDataPoint) => anomDataPoint.isAnomaly }
.map { case (i, anomDataPoint) =>
(i, Anomaly(Some(anomDataPoint.dataMetricValue), anomDataPoint.confidence, anomDataPoint.detail))
}
}

/**
* Search for anomalies in a series of data points, returning extended results.
*
* @param dataSeries The data contained in a Vector of Doubles
* @param searchInterval The indices between which anomalies should be detected. [a, b).
* @return The indices of all anomalies in the interval and their corresponding wrapper object
* with extended results.
*/
override def detectWithExtendedResults(
dataSeries: Vector[Double],
searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

require(searchStart <= searchEnd, "The start of the interval can't be larger than the end.")
@@ -83,13 +105,18 @@ case class BatchNormalStrategy(

dataSeries.zipWithIndex
.slice(searchStart, searchEnd)
-      .filter { case (value, _) => value > upperBound || value < lowerBound }
       .map { case (value, index) =>
-
-        val detail = Some(s"[BatchNormalStrategy]: Value $value is not in " +
-          s"bounds [$lowerBound, $upperBound].")
-
-        (index, Anomaly(Option(value), 1.0, detail))
+        val (detail, isAnomaly) = if (value > upperBound || value < lowerBound) {
+          (Some(s"[BatchNormalStrategy]: Value $value is not in " +
+            s"bounds [$lowerBound, $upperBound]."), true)
+        } else {
+          (None, false)
+        }
+        (index, AnomalyDetectionDataPoint(value, value,
+          BoundedRange(lowerBound = Bound(lowerBound, inclusive = true),
+            upperBound = Bound(upperBound, inclusive = true)), isAnomaly, 1.0, detail))
}
}


}
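A usage sketch for the batch-normal variant (numbers illustrative; as in the existing strategy, the mean and deviation bounds are estimated from the points outside the search interval):

```scala
import com.amazon.deequ.anomalydetection.BatchNormalStrategy

val strategy = BatchNormalStrategy(
  lowerDeviationFactor = Some(1.0),
  upperDeviationFactor = Some(1.0))

val series = Vector(1.0, 1.2, 0.8, 1.1, 0.9, 9.0)

// Train on indices 0 to 4, score only the last point.
val results = strategy.detectWithExtendedResults(series, (5, 6))
// One AnomalyDetectionDataPoint for index 5: value 9.0, the computed
// BoundedRange around the training mean, and isAnomaly = true.
```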