Skip to content

Commit

Permalink
Add commits from master branch to release/2.0.8-spark-3.2 (#590)
Browse files Browse the repository at this point in the history
* Configurable RetainCompletenessRule (#564)

* Configurable RetainCompletenessRule

* Add doc string

* Add default completeness const

* Optional specification of instance name in CustomSQL analyzer metric. (#569)

Co-authored-by: Tyler Mcdaniel <[email protected]>

* Adding Wilson Score Confidence Interval Strategy (#567)

* Configurable RetainCompletenessRule

* Add doc string

* Add default completeness const

* Add ConfidenceIntervalStrategy

* Add Separate Wilson and Wald Interval Test

* Add License information, Fix formatting

* Add License information

* formatting fix

* Update documentation

* Make WaldInterval the default strategy for now

* Formatting import to per line

* Separate group import to per line import

* CustomAggregator (#572)

* Add support for EntityTypes dqdl rule

* Add support for Conditional Aggregation Analyzer

---------

Co-authored-by: Joshua Zexter <[email protected]>

* fix typo (#574)

* Fix performance of building row-level results (#577)

* Generate row-level results with withColumns

Iteratively using withColumn (singular) causes performance
issues when iterating over a large sequence of columns.

* Add back UNIQUENESS_ID

* Replace 'withColumns' with 'select' (#582)

'withColumns' was introduced in Spark 3.3, so it won't
work for Deequ's <3.3 builds.

* Replace rdd with dataframe functions in Histogram analyzer (#586)

Co-authored-by: Shriya Vanvari <[email protected]>

* Updated version in pom.xml to 2.0.8-spark-3.2

---------

Co-authored-by: zeotuan <[email protected]>
Co-authored-by: tylermcdaniel0 <[email protected]>
Co-authored-by: Tyler Mcdaniel <[email protected]>
Co-authored-by: Joshua Zexter <[email protected]>
Co-authored-by: Joshua Zexter <[email protected]>
Co-authored-by: bojackli <[email protected]>
Co-authored-by: Josh <[email protected]>
Co-authored-by: Shriya Vanvari <[email protected]>
Co-authored-by: Shriya Vanvari <[email protected]>
  • Loading branch information
10 people authored Oct 9, 2024
1 parent 3822387 commit 3d01d7e
Show file tree
Hide file tree
Showing 18 changed files with 708 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.amazon.deequ</groupId>
<artifactId>deequ</artifactId>
<version>2.0.7-spark-3.2</version>
<version>2.0.8-spark-3.2</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/com/amazon/deequ/VerificationResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

import java.util.UUID

Expand Down Expand Up @@ -96,11 +96,10 @@ object VerificationResult {
data: DataFrame): DataFrame = {

val columnNamesToMetrics: Map[String, Column] = verificationResultToColumn(verificationResult)
val columnsAliased = columnNamesToMetrics.toSeq.map { case (name, col) => col.as(name) }

val dataWithID = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id())
columnNamesToMetrics.foldLeft(dataWithID)(
(dataWithID, newColumn: (String, Column)) =>
dataWithID.withColumn(newColumn._1, newColumn._2)).drop(UNIQUENESS_ID)
dataWithID.select(col("*") +: columnsAliased: _*).drop(UNIQUENESS_ID)
}

def checkResultsAsJson(verificationResult: VerificationResult,
Expand Down
69 changes: 69 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/CustomAggregator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/
package com.amazon.deequ.analyzers

import com.amazon.deequ.metrics.AttributeDoubleMetric
import com.amazon.deequ.metrics.Entity
import org.apache.spark.sql.DataFrame

import scala.util.Failure
import scala.util.Success
import scala.util.Try

// Define a custom state to hold aggregation results
case class AggregatedMetricState(counts: Map[String, Int], total: Int)
extends DoubleValuedState[AggregatedMetricState] {

override def sum(other: AggregatedMetricState): AggregatedMetricState = {
val combinedCounts = counts ++ other
.counts
.map { case (k, v) => k -> (v + counts.getOrElse(k, 0)) }
AggregatedMetricState(combinedCounts, total + other.total)
}

override def metricValue(): Double = counts.values.sum.toDouble / total
}

// Define the analyzer
case class CustomAggregator(aggregatorFunc: DataFrame => AggregatedMetricState,
metricName: String,
instance: String = "Dataset")
extends Analyzer[AggregatedMetricState, AttributeDoubleMetric] {

override def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None)
: Option[AggregatedMetricState] = {
Try(aggregatorFunc(data)) match {
case Success(state) => Some(state)
case Failure(_) => None
}
}

override def computeMetricFrom(state: Option[AggregatedMetricState]): AttributeDoubleMetric = {
state match {
case Some(detState) =>
val metrics = detState.counts.map { case (key, count) =>
key -> (count.toDouble / detState.total)
}
AttributeDoubleMetric(Entity.Column, metricName, instance, Success(metrics))
case None =>
AttributeDoubleMetric(Entity.Column, metricName, instance,
Failure(new RuntimeException("Metric computation failed")))
}
}

override private[deequ] def toFailureMetric(failure: Exception): AttributeDoubleMetric = {
AttributeDoubleMetric(Entity.Column, metricName, instance, Failure(failure))
}
}
14 changes: 9 additions & 5 deletions src/main/scala/com/amazon/deequ/analyzers/CustomSql.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class CustomSqlState(stateOrError: Either[Double, String]) extends DoubleVa
override def metricValue(): Double = state
}

case class CustomSql(expression: String) extends Analyzer[CustomSqlState, DoubleMetric] {
case class CustomSql(expression: String, disambiguator: String = "*") extends Analyzer[CustomSqlState, DoubleMetric] {
/**
* Compute the state (sufficient statistics) from the data
*
Expand Down Expand Up @@ -76,15 +76,19 @@ case class CustomSql(expression: String) extends Analyzer[CustomSqlState, Double
state match {
// The returned state may
case Some(theState) => theState.stateOrError match {
case Left(value) => DoubleMetric(Entity.Dataset, "CustomSQL", "*", Success(value))
case Right(error) => DoubleMetric(Entity.Dataset, "CustomSQL", "*", Failure(new RuntimeException(error)))
case Left(value) => DoubleMetric(Entity.Dataset, "CustomSQL", disambiguator,
Success(value))
case Right(error) => DoubleMetric(Entity.Dataset, "CustomSQL", disambiguator,
Failure(new RuntimeException(error)))
}
case None =>
DoubleMetric(Entity.Dataset, "CustomSQL", "*", Failure(new RuntimeException("CustomSql Failed To Run")))
DoubleMetric(Entity.Dataset, "CustomSQL", disambiguator,
Failure(new RuntimeException("CustomSql Failed To Run")))
}
}

override private[deequ] def toFailureMetric(failure: Exception) = {
DoubleMetric(Entity.Dataset, "CustomSQL", "*", Failure(new RuntimeException("CustomSql Failed To Run")))
DoubleMetric(Entity.Dataset, "CustomSQL", disambiguator,
Failure(new RuntimeException("CustomSql Failed To Run")))
}
}
21 changes: 18 additions & 3 deletions src/main/scala/com/amazon/deequ/analyzers/Histogram.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,26 @@ case class Histogram(
case Some(theState) =>
val value: Try[Distribution] = Try {

val topNRows = theState.frequencies.rdd.top(maxDetailBins)(OrderByAbsoluteCount)
val countColumnName = theState.frequencies.schema.fields
.find(field => field.dataType == LongType && field.name != column)
.map(_.name)
.getOrElse(throw new IllegalStateException(s"Count column not found in the frequencies DataFrame"))

val topNRowsDF = theState.frequencies
.orderBy(col(countColumnName).desc)
.limit(maxDetailBins)
.collect()

val binCount = theState.frequencies.count()

val histogramDetails = topNRows
.map { case Row(discreteValue: String, absolute: Long) =>
val columnName = theState.frequencies.columns
.find(_ == column)
.getOrElse(throw new IllegalStateException(s"Column $column not found"))

val histogramDetails = topNRowsDF
.map { row =>
val discreteValue = row.getAs[String](columnName)
val absolute = row.getAs[Long](countColumnName)
val ratio = absolute.toDouble / theState.numRows
discreteValue -> DistributionValue(absolute, ratio)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.amazon.deequ.examples

import com.amazon.deequ.examples.ExampleUtils.withSpark
import com.amazon.deequ.suggestions.rules.RetainCompletenessRule
import com.amazon.deequ.suggestions.rules.interval.WilsonScoreIntervalStrategy
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

private[examples] object ConstraintSuggestionExample extends App {
Expand Down Expand Up @@ -51,6 +53,10 @@ private[examples] object ConstraintSuggestionExample extends App {
val suggestionResult = ConstraintSuggestionRunner()
.onData(data)
.addConstraintRules(Rules.EXTENDED)
// We can also add our own constraint and customize constraint parameters
.addConstraintRule(
RetainCompletenessRule(intervalStrategy = WilsonScoreIntervalStrategy())
)
.run()

// We can now investigate the constraints that deequ suggested. We get a textual description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ val suggestionResult = ConstraintSuggestionRunner()
.run()
```

Alternatively, we also support customizing and adding individual constraint rule using `addConstraintRule()`
```scala
val suggestionResult = ConstraintSuggestionRunner()
.onData(data)

.addConstraintRule(
RetainCompletenessRule(intervalStrategy = WilsonScoreIntervalStrategy())
)
.run()
```

We can now investigate the constraints that deequ suggested. We get a textual description and the corresponding scala code for each suggested constraint. Note that the constraint suggestion is based on heuristic rules and assumes that the data it is shown is 'static' and correct, which might often not be the case in the real world. Therefore the suggestions should always be manually reviewed before being applied in real deployments.
```scala
suggestionResult.constraintSuggestions.foreach { case (column, suggestions) =>
Expand Down Expand Up @@ -92,3 +103,5 @@ The corresponding scala code is .isContainedIn("status", Array("DELAYED", "UNKNO
Currently, we leave it up to the user to decide whether they want to apply the suggested constraints or not, and provide the corresponding Scala code for convenience. For larger datasets, it makes sense to evaluate the suggested constraints on some held-out portion of the data to see whether they hold or not. You can test this by adding an invocation of `.useTrainTestSplitWithTestsetRatio(0.1)` to the `ConstraintSuggestionRunner`. With this configuration, it would compute constraint suggestions on 90% of the data and evaluate the suggested constraints on the remaining 10%.

Finally, we would also like to note that the constraint suggestion code provides access to the underlying [column profiles](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/data_profiling_example.md) that it computed via `suggestionResult.columnProfiles`.

An [executable and extended version of this example](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/.scala) is part of our code base.
17 changes: 17 additions & 0 deletions src/main/scala/com/amazon/deequ/metrics/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,20 @@ case class KeyedDoubleMetric(
}
}
}

case class AttributeDoubleMetric(
entity: Entity.Value,
name: String,
instance: String,
value: Try[Map[String, Double]])
extends Metric[Map[String, Double]] {

override def flatten(): Seq[DoubleMetric] = {
value match {
case Success(valuesMap) => valuesMap.map { case (key, metricValue) =>
DoubleMetric(entity, s"$name.$key", instance, Success(metricValue))
}.toSeq
case Failure(ex) => Seq(DoubleMetric(entity, name, instance, Failure(ex)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ case class ConstraintSuggestionWithValue[T](

object ConstraintSuggestions {

private[this] val CONSTRANT_SUGGESTIONS_FIELD = "constraint_suggestions"
private[this] val CONSTRAINT_SUGGESTIONS_FIELD = "constraint_suggestions"

private[suggestions] def toJson(constraintSuggestions: Seq[ConstraintSuggestion]): String = {

Expand All @@ -68,7 +68,7 @@ object ConstraintSuggestions {
constraintsJson.add(constraintJson)
}

json.add(CONSTRANT_SUGGESTIONS_FIELD, constraintsJson)
json.add(CONSTRAINT_SUGGESTIONS_FIELD, constraintsJson)

val gson = new GsonBuilder()
.setPrettyPrinting()
Expand Down Expand Up @@ -109,7 +109,7 @@ object ConstraintSuggestions {
constraintEvaluations.add(constraintEvaluation)
}

json.add(CONSTRANT_SUGGESTIONS_FIELD, constraintEvaluations)
json.add(CONSTRAINT_SUGGESTIONS_FIELD, constraintEvaluations)

val gson = new GsonBuilder()
.setPrettyPrinting()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import com.amazon.deequ.metrics.DistributionValue
import com.amazon.deequ.profiles.ColumnProfile
import com.amazon.deequ.suggestions.ConstraintSuggestion
import com.amazon.deequ.suggestions.ConstraintSuggestionWithValue
import com.amazon.deequ.suggestions.rules.interval.ConfidenceIntervalStrategy.defaultIntervalStrategy
import com.amazon.deequ.suggestions.rules.interval.ConfidenceIntervalStrategy
import org.apache.commons.lang3.StringEscapeUtils

import scala.math.BigDecimal.RoundingMode

/** If we see a categorical range for most values in a column, we suggest an IS IN (...)
* constraint that should hold for most values */
case class FractionalCategoricalRangeRule(
targetDataCoverageFraction: Double = 0.9,
categorySorter: Array[(String, DistributionValue)] => Array[(String, DistributionValue)] =
categories => categories.sortBy({ case (_, value) => value.absolute }).reverse
categories => categories.sortBy({ case (_, value) => value.absolute }).reverse,
intervalStrategy: ConfidenceIntervalStrategy = defaultIntervalStrategy
) extends ConstraintRule[ColumnProfile] {

override def shouldBeApplied(profile: ColumnProfile, numRecords: Long): Boolean = {
Expand Down Expand Up @@ -79,11 +80,8 @@ case class FractionalCategoricalRangeRule(

val p = ratioSums
val n = numRecords
val z = 1.96

// TODO this needs to be more robust for p's close to 0 or 1
val targetCompliance = BigDecimal(p - z * math.sqrt(p * (1 - p) / n))
.setScale(2, RoundingMode.DOWN).toDouble
val targetCompliance = intervalStrategy.calculateTargetConfidenceInterval(p, n).lowerBound

val description = s"'${profile.column}' has value range $categoriesSql for at least " +
s"${targetCompliance * 100}% of values"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,31 @@ import com.amazon.deequ.constraints.Constraint.completenessConstraint
import com.amazon.deequ.profiles.ColumnProfile
import com.amazon.deequ.suggestions.CommonConstraintSuggestion
import com.amazon.deequ.suggestions.ConstraintSuggestion

import scala.math.BigDecimal.RoundingMode
import com.amazon.deequ.suggestions.rules.RetainCompletenessRule._
import com.amazon.deequ.suggestions.rules.interval.ConfidenceIntervalStrategy.defaultIntervalStrategy
import com.amazon.deequ.suggestions.rules.interval.ConfidenceIntervalStrategy

/**
* If a column is incomplete in the sample, we model its completeness as a binomial variable,
* estimate a confidence interval and use this to define a lower bound for the completeness
*
* @param minCompleteness : minimum completeness threshold to determine if rule should be applied
* @param maxCompleteness : maximum completeness threshold to determine if rule should be applied
*/
case class RetainCompletenessRule() extends ConstraintRule[ColumnProfile] {

case class RetainCompletenessRule(
minCompleteness: Double = defaultMinCompleteness,
maxCompleteness: Double = defaultMaxCompleteness,
intervalStrategy: ConfidenceIntervalStrategy = defaultIntervalStrategy
) extends ConstraintRule[ColumnProfile] {
override def shouldBeApplied(profile: ColumnProfile, numRecords: Long): Boolean = {
profile.completeness > 0.2 && profile.completeness < 1.0
profile.completeness > minCompleteness && profile.completeness < maxCompleteness
}

override def candidate(profile: ColumnProfile, numRecords: Long): ConstraintSuggestion = {

val p = profile.completeness
val n = numRecords
val z = 1.96

// TODO this needs to be more robust for p's close to 0 or 1
val targetCompleteness = BigDecimal(p - z * math.sqrt(p * (1 - p) / n))
.setScale(2, RoundingMode.DOWN).toDouble
val targetCompleteness = intervalStrategy.calculateTargetConfidenceInterval(
profile.completeness,
numRecords
).lowerBound

val constraint = completenessConstraint(profile.column, _ >= targetCompleteness)

Expand All @@ -65,3 +68,8 @@ case class RetainCompletenessRule() extends ConstraintRule[ColumnProfile] {
"we model its completeness as a binomial variable, estimate a confidence interval " +
"and use this to define a lower bound for the completeness"
}

object RetainCompletenessRule {
private val defaultMinCompleteness: Double = 0.2
private val defaultMaxCompleteness: Double = 1.0
}
Loading

0 comments on commit 3d01d7e

Please sign in to comment.