1) CBOSuite was moved to StatisticsCollectionSuite
2) Reduced number of changed lines in FilterEstimation.evaluateInSet
mshtelma committed Apr 13, 2018
1 parent 74b6ebd commit 0faa789
Showing 4 changed files with 55 additions and 84 deletions.
FilterEstimation.scala
@@ -392,31 +392,34 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val dataType = attr.dataType
     var newNdv = ndv
 
+    if (ndv.toDouble == 0 || colStat.min.isEmpty || colStat.max.isEmpty) {
+      return Some(0.0)
+    }
+
     // use [min, max] to filter the original hSet
     dataType match {
       case _: NumericType | BooleanType | DateType | TimestampType =>
-        if (colStat.min.isDefined && colStat.max.isDefined) {
-          val statsInterval =
-            ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
-          val validQuerySet = hSet.filter { v =>
-            v != null && statsInterval.contains(Literal(v, dataType))
-          }
+        val statsInterval =
+          ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
+        val validQuerySet = hSet.filter { v =>
+          v != null && statsInterval.contains(Literal(v, dataType))
+        }
 
-          if (validQuerySet.isEmpty) {
-            return Some(0.0)
-          }
+        if (validQuerySet.isEmpty) {
+          return Some(0.0)
+        }
 
-          val newMax = validQuerySet.maxBy(EstimationUtils.toDouble(_, dataType))
-          val newMin = validQuerySet.minBy(EstimationUtils.toDouble(_, dataType))
-          // newNdv should not be greater than the old ndv. For example, column has only 2 values
-          // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
-          newNdv = ndv.min(BigInt(validQuerySet.size))
-          if (update) {
-            val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
-              max = Some(newMax), nullCount = Some(0))
-            colStatsMap.update(attr, newStats)
-          }
+        val newMax = validQuerySet.maxBy(EstimationUtils.toDouble(_, dataType))
+        val newMin = validQuerySet.minBy(EstimationUtils.toDouble(_, dataType))
+        // newNdv should not be greater than the old ndv. For example, column has only 2 values
+        // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
+        newNdv = ndv.min(BigInt(validQuerySet.size))
+        if (update) {
+          val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+            max = Some(newMax), nullCount = Some(0))
+          colStatsMap.update(attr, newStats)
+        }
 
       // We assume the whole set since there is no min/max information for String/Binary type
       case StringType | BinaryType =>
         newNdv = ndv.min(BigInt(hSet.size))

@@ -428,11 +431,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
 
     // return the filter selectivity. Without advanced statistics such as histograms,
     // we have to assume uniform distribution.
-    if (ndv.toDouble != 0) {
-      Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
-    } else {
-      Some(0.0)
-    }
+    Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
   }
 
   /**
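Taken together, the two hunks above replace scattered zero-ndv handling with a single early return: when the column reports no distinct values or is missing min/max statistics, evaluateInSet now answers Some(0.0) immediately, so the selectivity formula at the end no longer needs its own ndv != 0 guard against division by zero. A minimal standalone sketch of the resulting control flow, under illustrative names (estimateInSetSelectivity, minMax, and inList are not Spark's API):

// Illustrative sketch of the control flow after this change, not Spark code.
def estimateInSetSelectivity(
    ndv: BigInt,
    minMax: Option[(Double, Double)],
    inList: Set[Double]): Option[Double] = {
  // Missing statistics short-circuit to zero selectivity, so the division
  // at the end can no longer see ndv == 0.
  if (ndv == 0 || minMax.isEmpty) return Some(0.0)

  val (min, max) = minMax.get
  // Keep only the IN-list values that fall inside the column's [min, max].
  val valid = inList.filter(v => v >= min && v <= max)
  if (valid.isEmpty) return Some(0.0)

  // Uniform-distribution assumption: each distinct value is equally likely,
  // and the surviving values cannot exceed the column's distinct count.
  val newNdv = ndv.min(BigInt(valid.size))
  Some(math.min(newNdv.toDouble / ndv.toDouble, 1.0))
}

For example, with ndv = 10, a [1, 100] range, and the predicate col IN (5, 20, 999), only {5, 20} survive the range filter, giving selectivity 2 / 10 = 0.2.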
FilterEstimationSuite.scala
@@ -360,9 +360,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("evaluateInSet with all zeros") {
     validateEstimatedStats(
       Filter(InSet(attrString, Set(3, 4, 5)),
-        StatsTestPlan(Seq(attrString), 10,
+        StatsTestPlan(Seq(attrString), 0,
           AttributeMap(Seq(attrString ->
-            ColumnStat(distinctCount = Some(0), min = Some(0), max = Some(0),
+            ColumnStat(distinctCount = Some(0), min = None, max = None,
               nullCount = Some(0), avgLen = Some(0), maxLen = Some(0)))))),
       Seq(attrString -> ColumnStat(distinctCount = Some(0))),
       expectedRowCount = 0)
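The old inputs were self-contradictory: a plan claiming 10 rows whose column carries distinctCount = Some(0) and a numeric min/max on a string attribute. With the early return above, the meaningful zero case is a truly empty relation, so the test now uses rowCount 0 and leaves min and max as None, still expecting zero estimated rows. In terms of the sketch above:

// Zero distinct values and absent min/max now estimate zero selectivity.
assert(estimateInSetSelectivity(BigInt(0), None, Set(3.0, 4.0, 5.0)) == Some(0.0))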
StatisticsCollectionSuite.scala
@@ -382,4 +382,34 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Simple queries must be working, if CBO is turned on") {
+    withSQLConf(("spark.sql.cbo.enabled", "true")) {
+      withTable("TBL1", "TBL") {
+        import org.apache.spark.sql.functions._
+        val df = spark.range(1000L).select('id,
+          'id * 2 as "FLD1",
+          'id * 12 as "FLD2",
+          lit("aaa") + 'id as "fld3")
+        df.write
+          .mode(SaveMode.Overwrite)
+          .bucketBy(10, "id", "FLD1", "FLD2")
+          .sortBy("id", "FLD1", "FLD2")
+          .saveAsTable("TBL")
+        spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS ")
+        spark.sql("ANALYZE TABLE TBL COMPUTE STATISTICS FOR COLUMNS ID, FLD1, FLD2, FLD3")
+        val df2 = spark.sql(
+          """
+             SELECT t1.id, t1.fld1, t1.fld2, t1.fld3
+             FROM tbl t1
+             JOIN tbl t2 on t1.id=t2.id
+             WHERE t1.fld3 IN (-123.23,321.23)
+          """.stripMargin)
+        df2.createTempView("TBL2")
+        spark.sql("SELECT * FROM tbl2 WHERE fld3 IN ('qqq', 'qwe') ").explain()
+      }
+    }
+
+  }
+
 }
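This test, carried over from the deleted CBOSuite, runs the scenario end to end: CBO enabled, table and column statistics collected, then IN predicates planned over fld3, so a regression in FilterEstimation.evaluateInSet (such as the zero-ndv division fixed above) fails query planning instead of passing silently. Outside the test harness the same setup looks roughly like this; tbl and fld3 are illustrative names:

// Enable cost-based optimization for the session (the key the test sets via
// withSQLConf) and collect the statistics that FilterEstimation consumes.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS fld3")
// Planning this query exercises evaluateInSet on fld3's column statistics.
spark.sql("SELECT * FROM tbl WHERE fld3 IN ('qqq', 'qwe')").explain()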

CBOSuite.scala
This file was deleted (its test was moved into StatisticsCollectionSuite above).
