Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12506][SQL]push down WHERE clause arithmetic operator to JDBC #10750

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,120 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringContains(a.name, v.toString))

case expressions.BinaryComparison(BinaryArithmetic(left, right), Literal(v, t)) =>
translateArithemiticOPFilter (predicate)
case expressions.BinaryComparison(Literal(v, t), BinaryArithmetic(left, right)) =>
translateArithemiticOPFilter (predicate)

case _ => None
}
}

private def translateArithemiticOPFilter(predicate: Expression): Option[Filter] = {
predicate match {
case expressions.EqualTo(Add(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPEqualTo(Add(left, right), convertToScala(v, t)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As described in SPARK-10195, it looks now data sources API exposes Catalyst's internal types through its Filter interfaces. I think this might have to be hidden.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look of SPARK-10195. Looks like it deals with the issue of exposing internal data types. It uses convertToScala to convert these internal data types to scala version. Since here convertToScala is used to convert the values. I think it should not be the same problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. I see. but for me it still looks expression._ might have to hide. In this way, the expression._ can be accessed in datasource level. I believe the reason way source._ is implemented is, to hide expression._ which has been changed rapidly version by version.

case expressions.EqualTo(Literal(v, t), Add(left, right)) =>
Some(sources.ArithmeticOPEqualTo(Add(left, right), convertToScala(v, t)))

case expressions.EqualTo(Subtract(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPEqualTo(Subtract(left, right), convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), Subtract(left, right)) =>
Some(sources.ArithmeticOPEqualTo(Subtract(left, right), convertToScala(v, t)))

case expressions.EqualTo(Multiply(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPEqualTo(Multiply(left, right), convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), Multiply(left, right)) =>
Some(sources.ArithmeticOPEqualTo(Multiply(left, right), convertToScala(v, t)))

case expressions.EqualTo(Divide(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPEqualTo(Divide(left, right), convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), Divide(left, right)) =>
Some(sources.ArithmeticOPEqualTo(Divide(left, right), convertToScala(v, t)))

case expressions.GreaterThan(Add(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThan(Add(left, right), convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), Add(left, right)) =>
Some(sources.ArithmeticOPLessThan(Add(left, right), convertToScala(v, t)))

case expressions.GreaterThan(Subtract(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThan(Subtract(left, right), convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), Subtract(left, right)) =>
Some(sources.ArithmeticOPLessThan(Subtract(left, right), convertToScala(v, t)))

case expressions.GreaterThan(Multiply(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThan(Multiply(left, right), convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), Multiply(left, right)) =>
Some(sources.ArithmeticOPLessThan(Multiply(left, right), convertToScala(v, t)))

case expressions.GreaterThan(Divide(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThan(Divide(left, right), convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), Divide(left, right)) =>
Some(sources.ArithmeticOPLessThan(Divide(left, right), convertToScala(v, t)))

case expressions.LessThan(Add(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThan(Add(left, right), convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), Add(left, right)) =>
Some(sources.ArithmeticOPGreaterThan(Add(left, right), convertToScala(v, t)))

case expressions.LessThan(Subtract(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThan(Subtract(left, right), convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), Subtract(left, right)) =>
Some(sources.ArithmeticOPGreaterThan(Subtract(left, right), convertToScala(v, t)))

case expressions.LessThan(Multiply(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThan(Multiply(left, right), convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), Multiply(left, right)) =>
Some(sources.ArithmeticOPGreaterThan(Multiply(left, right), convertToScala(v, t)))

case expressions.LessThan(Divide(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThan(Divide(left, right), convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), Divide(left, right)) =>
Some(sources.ArithmeticOPGreaterThan(Divide(left, right), convertToScala(v, t)))

case expressions.GreaterThanOrEqual(Add(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Add(left, right), convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), Add(left, right)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Add(left, right), convertToScala(v, t)))

case expressions.GreaterThanOrEqual(Subtract(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Subtract(left, right), convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), Subtract(left, right)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Subtract(left, right), convertToScala(v, t)))

case expressions.GreaterThanOrEqual(Multiply(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Multiply(left, right), convertToScala(v, t)))
case expressions.GreaterThanOrEqual( Literal(v, t), Multiply(left, right)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Multiply(left, right), convertToScala(v, t)))

case expressions.GreaterThanOrEqual(Divide(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Divide(left, right), convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), Divide(left, right)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Divide(left, right), convertToScala(v, t)))


case expressions.LessThanOrEqual(Add(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Add(left, right), convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), Add(left, right)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Add(left, right), convertToScala(v, t)))

case expressions.LessThanOrEqual(Subtract(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Subtract(left, right), convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), Subtract(left, right)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Subtract(left, right), convertToScala(v, t)))

case expressions.LessThanOrEqual(Multiply(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Multiply(left, right), convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), Multiply(left, right)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Multiply(left, right), convertToScala(v, t)))

case expressions.LessThanOrEqual(Divide(left, right), Literal(v, t)) =>
Some(sources.ArithmeticOPLessThanOrEqual(Divide(left, right), convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), Divide(left, right)) =>
Some(sources.ArithmeticOPGreaterThanOrEqual(Divide(left, right), convertToScala(v, t)))
}
}

/**
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s
* and can be handled by `relation`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.apache.commons.lang3.StringUtils

import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.{expressions,InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SpecificMutableRow}
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -223,11 +223,57 @@ private[sql] object JDBCRDD extends Logging {
} else {
null
}
case ArithmeticOPEqualTo(operation, value) =>
getArithmeticString(operation).get + s" = ${compileValue(value)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When getArithmeticString returns None, you will get java.util.NoSuchElementException here.

case ArithmeticOPGreaterThan(operation, value) =>
getArithmeticString(operation).get + s" > ${compileValue(value)}"
case ArithmeticOPGreaterThanOrEqual(operation, value) =>
getArithmeticString(operation).get + s" >= ${compileValue(value)}"
case ArithmeticOPLessThan(operation, value) =>
getArithmeticString(operation).get + s" < ${compileValue(value)}"
case ArithmeticOPLessThanOrEqual(operation, value) =>
getArithmeticString(operation).get + s" <= ${compileValue(value)}"
case _ => null
})
}


private def getArithmeticString (predicate: Expression): Option[String] = {
predicate match {
case expressions.Add(left, right) => {
val add = Seq(left, right).map(getArithmeticString(_)).flatten
if (add.size == 2) {
Some(add.map(p => s"($p)").mkString(" + "))
} else {
None
}
}
case expressions.Subtract(left, right) => {
val subtract = Seq(left, right).map(getArithmeticString(_)).flatten
if (subtract.size == 2) {
Some(subtract.map(p => s"($p)").mkString(" - "))
} else {
None
}
}
case expressions.Multiply(left, right) => {
val multiply = Seq(left, right).map(getArithmeticString(_)).flatten
if (multiply.size == 2) {
Some(multiply.map(p => s"($p)").mkString(" * "))
} else {
None
}
}
case expressions.Divide(left, right) => {
val divide = Seq(left, right).map(getArithmeticString(_)).flatten
if (divide.size == 2) {
Some(divide.map(p => s"($p)").mkString(" / "))
} else {
None
}
}
case a:Attribute => Some(a.name)
}
}

/**
* Build and return JDBCRDD from the given information.
Expand Down
42 changes: 42 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.sources

import org.apache.spark.sql.catalyst.expressions.BinaryArithmetic

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines all the filters that we can push down to the data sources.
////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -142,3 +144,43 @@ case class StringEndsWith(attribute: String, value: String) extends Filter
* @since 1.3.1
*/
case class StringContains(attribute: String, value: String) extends Filter

/**
* A filter that evaluates to `true` iff the Arithmetic operation evaluates to a value
* equal to `value`.
*
* @since 2.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be 2.0.0.

*/
case class ArithmeticOPEqualTo(operation: BinaryArithmetic, value: Any) extends Filter

/**
* A filter that evaluates to `true` iff the Arithmetic operation evaluates to a value
* greater than `value`.
*
* @since 2.0
*/
case class ArithmeticOPGreaterThan(operation: BinaryArithmetic, value: Any) extends Filter

/**
* A filter that evaluates to `true` iff the Arithmetic operation evaluates to a value
* greater than or equal to `value`.
*
* @since 2.0
*/
case class ArithmeticOPGreaterThanOrEqual(operation: BinaryArithmetic, value: Any) extends Filter

/**
* A filter that evaluates to `true` iff the Arithmetic operation evaluates to a value
* less than `value`.
*
* @since 2.0
*/
case class ArithmeticOPLessThan(operation: BinaryArithmetic, value: Any) extends Filter

/**
* A filter that evaluates to `true` iff the Arithmetic operation evaluates to a value
* less than or equal to `value`.
*
* @since 2.0
*/
case class ArithmeticOPLessThanOrEqual(operation: BinaryArithmetic, value: Any) extends Filter
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class JDBCSuite extends SparkFunSuite
assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
assert(stripSparkFilter(sql("SELECT * FROM inttypes WHERE (A+C)*D-A = 15")).collect().size == 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add more tests?


// This is a test to reflect discussion in SPARK-12218.
// The older versions of spark have this kind of bugs in parquet data source.
Expand Down