Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark into SPARK-1413
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Apr 8, 2014
2 parents ba09bcd + f27e56a commit 38160cb
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.trees.TreeNode

Expand All @@ -36,34 +37,41 @@ case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None) extends BaseRelation {
def output = Nil
override def output = Nil
override lazy val resolved = false
}

/**
* Holds the name of an attribute that has yet to be resolved.
*/
case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
def exprId = throw new UnresolvedException(this, "exprId")
def dataType = throw new UnresolvedException(this, "dataType")
def nullable = throw new UnresolvedException(this, "nullable")
def qualifiers = throw new UnresolvedException(this, "qualifiers")
override def exprId = throw new UnresolvedException(this, "exprId")
override def dataType = throw new UnresolvedException(this, "dataType")
override def nullable = throw new UnresolvedException(this, "nullable")
override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false

def newInstance = this
def withQualifiers(newQualifiers: Seq[String]) = this
override def newInstance = this
override def withQualifiers(newQualifiers: Seq[String]) = this

// Unresolved attributes are transient at compile time and don't get evaluated during execution.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"'$name"
}

case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
def exprId = throw new UnresolvedException(this, "exprId")
def dataType = throw new UnresolvedException(this, "dataType")
override def dataType = throw new UnresolvedException(this, "dataType")
override def foldable = throw new UnresolvedException(this, "foldable")
def nullable = throw new UnresolvedException(this, "nullable")
def qualifiers = throw new UnresolvedException(this, "qualifiers")
def references = children.flatMap(_.references).toSet
override def nullable = throw new UnresolvedException(this, "nullable")
override def references = children.flatMap(_.references).toSet
override lazy val resolved = false

// Unresolved functions are transient at compile time and don't get evaluated during execution.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString = s"'$name(${children.mkString(",")})"
}

Expand All @@ -79,15 +87,15 @@ case class Star(
mapFunction: Attribute => Expression = identity[Attribute])
extends Attribute with trees.LeafNode[Expression] {

def name = throw new UnresolvedException(this, "exprId")
def exprId = throw new UnresolvedException(this, "exprId")
def dataType = throw new UnresolvedException(this, "dataType")
def nullable = throw new UnresolvedException(this, "nullable")
def qualifiers = throw new UnresolvedException(this, "qualifiers")
override def name = throw new UnresolvedException(this, "exprId")
override def exprId = throw new UnresolvedException(this, "exprId")
override def dataType = throw new UnresolvedException(this, "dataType")
override def nullable = throw new UnresolvedException(this, "nullable")
override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false

def newInstance = this
def withQualifiers(newQualifiers: Seq[String]) = this
override def newInstance = this
override def withQualifiers(newQualifiers: Seq[String]) = this

def expand(input: Seq[Attribute]): Seq[NamedExpression] = {
val expandedAttributes: Seq[Attribute] = table match {
Expand All @@ -104,5 +112,9 @@ case class Star(
mappedAttributes
}

// Star gets expanded at runtime so we never evaluate a Star.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString = table.map(_ + ".").getOrElse("") + "*"
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ package object dsl {
implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s = sym.name }
// TODO more implicit class for literal?
implicit class DslString(val s: String) extends ImplicitOperators {
def expr: Expression = Literal(s)
override def expr: Expression = Literal(s)
def attr = analysis.UnresolvedAttribute(s)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {

private def decimalToTimestamp(d: BigDecimal) = {
val seconds = d.longValue()
val bd = (d - seconds) * (1000000000)
val bd = (d - seconds) * 1000000000
val nanos = bd.intValue()

// Convert to millis
Expand All @@ -96,18 +96,23 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {

// remaining fractional portion as nanos
t.setNanos(nanos)

t
}

private def timestampToDouble(t: Timestamp) = (t.getSeconds() + t.getNanos().toDouble / 1000)
// Timestamp to long, converting milliseconds to seconds
private def timestampToLong(ts: Timestamp) = ts.getTime / 1000

private def timestampToDouble(ts: Timestamp) = {
// First part is the seconds since the beginning of time, followed by nanosecs.
ts.getTime / 1000 + ts.getNanos.toDouble / 1000000000
}

def castToLong: Any => Any = child.dataType match {
case StringType => nullOrCast[String](_, s => try s.toLong catch {
case _: NumberFormatException => null
})
case BooleanType => nullOrCast[Boolean](_, b => if(b) 1 else 0)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToDouble(t).toLong)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToLong(t))
case DecimalType => nullOrCast[BigDecimal](_, _.toLong)
case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
}
Expand All @@ -117,7 +122,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case _: NumberFormatException => null
})
case BooleanType => nullOrCast[Boolean](_, b => if(b) 1 else 0)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToDouble(t).toInt)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToLong(t).toInt)
case DecimalType => nullOrCast[BigDecimal](_, _.toInt)
case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b)
}
Expand All @@ -127,7 +132,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case _: NumberFormatException => null
})
case BooleanType => nullOrCast[Boolean](_, b => if(b) 1 else 0)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToDouble(t).toShort)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToLong(t).toShort)
case DecimalType => nullOrCast[BigDecimal](_, _.toShort)
case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort
}
Expand All @@ -137,7 +142,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case _: NumberFormatException => null
})
case BooleanType => nullOrCast[Boolean](_, b => if(b) 1 else 0)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToDouble(t).toByte)
case TimestampType => nullOrCast[Timestamp](_, t => timestampToLong(t).toByte)
case DecimalType => nullOrCast[BigDecimal](_, _.toByte)
case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte
}
Expand All @@ -147,7 +152,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case _: NumberFormatException => null
})
case BooleanType => nullOrCast[Boolean](_, b => if(b) BigDecimal(1) else BigDecimal(0))
case TimestampType => nullOrCast[Timestamp](_, t => BigDecimal(timestampToDouble(t)))
case TimestampType =>
// Note that we lose precision here.
nullOrCast[Timestamp](_, t => BigDecimal(timestampToDouble(t)))
case x: NumericType => b => BigDecimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ abstract class Expression extends TreeNode[Expression] {
def references: Set[Attribute]

/** Returns the result of evaluating this expression on a given input Row */
def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
def eval(input: Row = null): EvaluatedType

/**
* Returns `true` if this expression and all its children have been resolved to a specific schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@

package org.apache.spark.sql.catalyst.expressions

import java.util.Random
import org.apache.spark.sql.catalyst.types.DoubleType


case object Rand extends LeafExpression {
def dataType = DoubleType
def nullable = false
def references = Set.empty
override def dataType = DoubleType
override def nullable = false
override def references = Set.empty

private[this] lazy val rand = new Random

override def eval(input: Row = null) = rand.nextDouble().asInstanceOf[EvaluatedType]

override def toString = "RAND()"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.errors.TreeNodeException

abstract sealed class SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection
Expand All @@ -26,7 +28,12 @@ case object Descending extends SortDirection
* transformations over expression will descend into its child.
*/
case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression {
def dataType = child.dataType
def nullable = child.nullable
override def dataType = child.dataType
override def nullable = child.nullable

// SortOrder itself is never evaluated.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.errors.TreeNodeException

abstract class AggregateExpression extends Expression {
self: Product =>
Expand All @@ -28,6 +29,13 @@ abstract class AggregateExpression extends Expression {
* of input rows/
*/
def newInstance(): AggregateFunction

/**
* [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
* replaced with a physical aggregate operator at runtime.
*/
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.types._

object NamedExpression {
Expand Down Expand Up @@ -58,9 +59,9 @@ abstract class Attribute extends NamedExpression {

def withQualifiers(newQualifiers: Seq[String]): Attribute

def references = Set(this)
def toAttribute = this
def newInstance: Attribute
override def references = Set(this)
}

/**
Expand All @@ -77,15 +78,15 @@ case class Alias(child: Expression, name: String)
(val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
extends NamedExpression with trees.UnaryNode[Expression] {

type EvaluatedType = Any
override type EvaluatedType = Any

override def eval(input: Row) = child.eval(input)

def dataType = child.dataType
def nullable = child.nullable
def references = child.references
override def dataType = child.dataType
override def nullable = child.nullable
override def references = child.references

def toAttribute = {
override def toAttribute = {
if (resolved) {
AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
} else {
Expand Down Expand Up @@ -127,7 +128,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
h
}

def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
override def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)

/**
* Returns a copy of this [[AttributeReference]] with changed nullability.
Expand All @@ -143,13 +144,17 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
/**
* Returns a copy of this [[AttributeReference]] with new qualifiers.
*/
def withQualifiers(newQualifiers: Seq[String]) = {
override def withQualifiers(newQualifiers: Seq[String]) = {
if (newQualifiers == qualifiers) {
this
} else {
AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
}
}

// Unresolved attributes are transient at compile time and don't get evaluated during execution.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"$name#${exprId.id}$typeSuffix"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.spark.sql.catalyst.plans.physical

import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Expression, Row, SortOrder}
import org.apache.spark.sql.catalyst.types.IntegerType

/**
Expand Down Expand Up @@ -139,12 +140,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
extends Expression
with Partitioning {

def children = expressions
def references = expressions.flatMap(_.references).toSet
def nullable = false
def dataType = IntegerType
override def children = expressions
override def references = expressions.flatMap(_.references).toSet
override def nullable = false
override def dataType = IntegerType

lazy val clusteringSet = expressions.toSet
private[this] lazy val clusteringSet = expressions.toSet

override def satisfies(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
Expand All @@ -158,6 +159,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case h: HashPartitioning if h == this => true
case _ => false
}

override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}

/**
Expand All @@ -168,17 +172,20 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
* partition.
* - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
* that are in between `min` and `max` in this `ordering` will reside in this partition.
*
* This class extends expression primarily so that transformations over expression will descend
* into its child.
*/
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
extends Expression
with Partitioning {

def children = ordering
def references = ordering.flatMap(_.references).toSet
def nullable = false
def dataType = IntegerType
override def children = ordering
override def references = ordering.flatMap(_.references).toSet
override def nullable = false
override def dataType = IntegerType

lazy val clusteringSet = ordering.map(_.child).toSet
private[this] lazy val clusteringSet = ordering.map(_.child).toSet

override def satisfies(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
Expand All @@ -195,4 +202,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case r: RangePartitioning if r == this => true
case _ => false
}

override def eval(input: Row): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
Loading

0 comments on commit 38160cb

Please sign in to comment.