Merge pull request #7 from rxin/moreclean
More cleaning
marmbrus committed Jan 3, 2014
2 parents da16e45 + 52864da commit b8cd7e3
Showing 46 changed files with 901 additions and 431 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -10,7 +10,7 @@ scalaVersion := "2.10.3"

resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"

libraryDependencies += "edu.berkeley.cs.amplab" %% "shark" % "0.9.0-SNAPSHOT"
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating-SNAPSHOT"

// Hive 0.10.0 relies on a weird version of jdo that is not published anywhere... Remove when we upgrade to 0.11.0
libraryDependencies += "javax.jdo" % "jdo2-api" % "2.3-ec" from "http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar"
41 changes: 27 additions & 14 deletions src/main/scala/catalyst/analysis/Analyzer.scala
@@ -6,21 +6,28 @@ import plans.logical._
import rules._

/**
- * A trivial [[Analyzer]] with an [[EmptyCatalog]]. Used for testing when all relations are
- * already filled in and the analyser needs only to resolve attribute references.
- *
+ * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
+ * when all relations are already filled in and the analyser needs only to resolve attribute
+ * references.
*/
-object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyRegistry, true)
+object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
* a [[FunctionRegistry]].
*/
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] {

// TODO: pass this in as a parameter.
val fixedPoint = FixedPoint(100)

-  val batches = Seq(
+  val batches: Seq[Batch] = Seq(
Batch("LocalRelations", Once,
NewLocalRelationInstances),
Batch("CaseInsensitiveAttributeReferences", Once,
-      (if(caseSensitive) Nil else LowercaseAttributeReferences :: Nil):_*),
+      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences,
ResolveRelations,
@@ -60,22 +67,26 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
}
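The batches above run under a fixed-point budget (FixedPoint(100)). For orientation, here is a minimal sketch of that executor pattern in plain Scala; Rule, Batch, and the loop below are simplified stand-ins, not catalyst's actual RuleExecutor:

    trait Rule[T] { def apply(t: T): T }

    // Apply every rule in order, repeating the whole pass until the tree
    // stops changing or the iteration budget is exhausted.
    case class Batch[T](name: String, maxIterations: Int, rules: Rule[T]*) {
      def execute(input: T): T = {
        var current = input
        var iterations = 0
        var changed = true
        while (changed && iterations < maxIterations) {
          val next = rules.foldLeft(current)((t, rule) => rule(t))
          changed = next != current
          current = next
          iterations += 1
        }
        current
      }
    }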

/**
- * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from a logical plan node's children.
+ * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s
+ * from a logical plan node's children.
*/
object ResolveReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan if childIsFullyResolved(q) =>
logger.trace(s"Attempting to resolve ${q.simpleString}")
q transformExpressions {
case u @ UnresolvedAttribute(name) =>
// Leave unchanged if resolution fails. Hopefully will be resolved next round.
val result = q.resolve(name).getOrElse(u)
logger.debug(s"Resolving $u to $result")
result
}
}
}
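As a rough model of what ResolveReferences does, with toy case classes standing in for catalyst's expression types, resolution looks the name up in the child's output and leaves the node untouched on failure:

    sealed trait Expr
    case class Unresolved(name: String) extends Expr
    case class Resolved(name: String, dataType: String) extends Expr

    // Mirrors the rule above: resolve against the child's output, or leave
    // the node unchanged and hope a later round succeeds.
    def resolveAttr(e: Expr, childOutput: Seq[Resolved]): Expr = e match {
      case u @ Unresolved(n) => childOutput.find(_.name == n).getOrElse(u)
      case other             => other
    }

    // resolveAttr(Unresolved("age"), Seq(Resolved("age", "int")))
    //   ==> Resolved("age", "int")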

+/**
+ * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
+ */
object ResolveFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan =>
@@ -111,13 +122,15 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved
case p: LogicalPlan if !childIsFullyResolved(p) => p
+    // If the projection list contains Stars, expand it.
case p @ Project(projectList, child) if containsStar(projectList) =>
Project(
projectList.flatMap {
case s: Star => s.expand(child.output)
case o => o :: Nil
},
child)
+    // If the aggregate function argument contains Stars, expand it.
case a: Aggregate if containsStar(a.aggregateExpressions) =>
a.copy(
aggregateExpressions = a.aggregateExpressions.flatMap {
@@ -139,4 +152,4 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
protected def childIsFullyResolved(plan: LogicalPlan): Boolean =
(!plan.inputSet.isEmpty) && plan.inputSet.map(_.resolved).reduceLeft(_ && _)
}
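The childIsFullyResolved guard reduces the input set with &&, which on a non-empty set behaves like forall. A small stand-in (Attr is hypothetical, not catalyst's attribute type) showing the two equivalent formulations:

    case class Attr(resolved: Boolean)

    // The reduceLeft form used above...
    def viaReduce(inputSet: Set[Attr]): Boolean =
      (!inputSet.isEmpty) && inputSet.map(_.resolved).reduceLeft(_ && _)

    // ...is equivalent to requiring a non-empty, fully resolved input set.
    def viaForall(inputSet: Set[Attr]): Boolean =
      inputSet.nonEmpty && inputSet.forall(_.resolved)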
12 changes: 7 additions & 5 deletions src/main/scala/catalyst/analysis/Catalog.scala
@@ -6,14 +6,16 @@ import plans.logical.LogicalPlan
/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
*/
-abstract trait Catalog {
+trait Catalog {
def lookupRelation(name: String, alias: Option[String] = None): LogicalPlan
}

/**
- * A trivial catalog that returns an error when a relation is requested. Used for testing when all relations are
- * already filled in and the analyser needs only to resolve attribute references.
+ * A trivial catalog that returns an error when a relation is requested. Used for testing when all
+ * relations are already filled in and the analyser needs only to resolve attribute references.
*/
object EmptyCatalog extends Catalog {
-  def lookupRelation(name: String, alias: Option[String] = None) = ???
-}
+  def lookupRelation(name: String, alias: Option[String] = None) = {
+    throw new UnsupportedOperationException
+  }
+}
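Any real Catalog only has to map names to plans. The sketch below is a hypothetical in-memory variant of the trait; LogicalPlan is modeled as a plain String so the snippet stands alone:

    trait SimpleCatalog {
      def lookupRelation(name: String, alias: Option[String] = None): String
    }

    // Hypothetical map-backed catalog, for illustration only.
    class InMemoryCatalog(tables: Map[String, String]) extends SimpleCatalog {
      def lookupRelation(name: String, alias: Option[String] = None): String =
        tables.getOrElse(name, sys.error(s"Relation $name not found"))
    }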
15 changes: 11 additions & 4 deletions src/main/scala/catalyst/analysis/FunctionRegistry.scala
@@ -3,10 +3,17 @@ package analysis

import expressions._

-abstract trait FunctionRegistry {
+/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */
+trait FunctionRegistry {
def lookupFunction(name: String, children: Seq[Expression]): Expression
}

-object EmptyRegistry extends FunctionRegistry {
-  def lookupFunction(name: String, children: Seq[Expression]): Expression = ???
-}
+/**
+ * A trivial catalog that returns an error when a function is requested. Used for testing when all
+ * functions are already filled in and the analyser needs only to resolve attribute references.
+ */
+object EmptyFunctionRegistry extends FunctionRegistry {
+  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    throw new UnsupportedOperationException
+  }
+}
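In the same spirit, a usable registry maps a function name to an expression builder. This is a hedged sketch with stand-in types, not the Hive-backed registry defined in the execution package:

    sealed trait Expression
    case class Literal(value: Any) extends Expression
    case class Upper(child: Expression) extends Expression

    // Hypothetical registry: each entry builds an Expression from its args.
    object MapBackedRegistry {
      private val builders: Map[String, Seq[Expression] => Expression] =
        Map("upper" -> (children => Upper(children.head)))

      def lookupFunction(name: String, children: Seq[Expression]): Expression =
        builders.getOrElse(name.toLowerCase,
          sys.error(s"Couldn't find function $name"))(children)
    }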
15 changes: 8 additions & 7 deletions src/main/scala/catalyst/analysis/typeCoercion.scala
@@ -1,7 +1,6 @@
package catalyst
package analysis

-import errors._
import expressions._
import plans.logical._
import rules._
@@ -11,7 +10,7 @@ import types._
* Converts string "NaN"s that are in binary operators with a NaN-able types (Float / Double) to the
* appropriate numeric equivalent.
*/
-object ConvertNaNs extends Rule[LogicalPlan]{
+object ConvertNaNs extends Rule[LogicalPlan] {
val stringNaN = Literal("NaN", StringType)

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -43,10 +42,12 @@ object ConvertNaNs extends Rule[LogicalPlan]{
*
* Loosely based on rules from "Hadoop: The Definitive Guide" 2nd edition, by Tom White
*
- * The implicit conversion rules can be summarized as follows. Any integral numeric type can be
- * implicitly converted to a wider type. All the integral numeric types, FLOAT, and (perhaps
- * surprisingly) STRING can be implicitly converted to DOUBLE. TINYINT, SMALLINT, and INT can all be
- * converted to FLOAT. BOOLEAN types cannot be converted to any other type.
+ * The implicit conversion rules can be summarized as follows:
+ * $ - Any integral numeric type can be implicitly converted to a wider type.
+ * $ - All the integral numeric types, FLOAT, and (perhaps surprisingly) STRING can be implicitly
+ *     converted to DOUBLE.
+ * $ - TINYINT, SMALLINT, and INT can all be converted to FLOAT.
+ * $ - BOOLEAN types cannot be converted to any other type.
*
* String conversions are handled by the PromoteStrings rule.
*/
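The list above reads as a conversion table. The following snippet encodes exactly those stated rules, with string tags standing in for catalyst's type objects (the real rules operate on DataTypes, not strings):

    // Each type maps to the set of types it may implicitly convert to.
    val implicitConversions: Map[String, Set[String]] = Map(
      "TINYINT"  -> Set("SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE"),
      "SMALLINT" -> Set("INT", "BIGINT", "FLOAT", "DOUBLE"),
      "INT"      -> Set("BIGINT", "FLOAT", "DOUBLE"),
      "BIGINT"   -> Set("DOUBLE"),
      "FLOAT"    -> Set("DOUBLE"),
      "STRING"   -> Set("DOUBLE"),    // the "perhaps surprising" case
      "BOOLEAN"  -> Set.empty[String] // booleans never convert
    )

    def canImplicitlyConvert(from: String, to: String): Boolean =
      from == to || implicitConversions.getOrElse(from, Set.empty[String]).contains(to)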
@@ -121,4 +122,4 @@ object BooleanComparisons extends Rule[LogicalPlan] {
case p: BinaryComparison if p.left.dataType == BooleanType && p.right.dataType == BooleanType =>
p.makeCopy(Array(Cast(p.left, ByteType), Cast(p.right, ByteType)))
}
}
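BooleanComparisons rewrites a comparison whose operands are both boolean by casting each side to a byte. A toy version with stand-in case classes (not catalyst's expression hierarchy):

    sealed trait Expr { def dataType: String }
    case class Col(name: String, dataType: String) extends Expr
    case class Cast(child: Expr, dataType: String) extends Expr
    case class GreaterThan(left: Expr, right: Expr) extends Expr {
      val dataType = "BOOLEAN"
    }

    // Mirrors the rule: boolean operands are cast before comparing.
    def booleanComparisons(e: Expr): Expr = e match {
      case GreaterThan(l, r) if l.dataType == "BOOLEAN" && r.dataType == "BOOLEAN" =>
        GreaterThan(Cast(l, "BYTE"), Cast(r, "BYTE"))
      case other => other
    }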
12 changes: 8 additions & 4 deletions src/main/scala/catalyst/analysis/unresolved.scala
@@ -6,7 +6,8 @@ import plans.logical.BaseRelation
import trees.TreeNode

/**
- * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully resolved.
+ * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
+ * resolved.
*/
class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) extends
errors.OptimizationException(tree, s"Invalid call to $function on unresolved object")
@@ -16,6 +17,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
*/
case class UnresolvedRelation(name: String, alias: Option[String] = None) extends BaseRelation {
def output = Nil
+  override lazy val resolved = false
}

/**
@@ -31,7 +33,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo
def newInstance = this
def withQualifiers(newQualifiers: Seq[String]) = this

-  override def toString(): String = s"'$name"
+  override def toString: String = s"'$name"
}

case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
@@ -67,8 +69,10 @@ case class Star(
def withQualifiers(newQualifiers: Seq[String]) = this

def expand(input: Seq[Attribute]): Seq[NamedExpression] = {
-    val expandedAttributes = table match {
+    val expandedAttributes: Seq[Attribute] = table match {
+      // If there is no table specified, use all input attributes.
      case None => input
+      // If there is a table, pick out attributes that are part of this table.
case Some(table) => input.filter(_.qualifiers contains table)
}
val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map {
@@ -80,4 +84,4 @@ case class Star(
}

override def toString = table.map(_ + ".").getOrElse("") + "*"
}
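Star.expand drives SELECT * and SELECT t.* expansion: with no table qualifier every input attribute is kept, otherwise only attributes qualified by that table survive the filter. A self-contained toy version (Attribute here is a stand-in):

    case class Attribute(name: String, qualifiers: Seq[String])

    def expandStar(table: Option[String], input: Seq[Attribute]): Seq[Attribute] =
      table match {
        case None    => input                                  // SELECT *
        case Some(t) => input.filter(_.qualifiers contains t)  // SELECT t.*
      }

    // expandStar(Some("t"), Seq(Attribute("a", Seq("t")), Attribute("b", Seq("u"))))
    //   ==> keeps only Attribute("a", Seq("t"))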
54 changes: 26 additions & 28 deletions src/main/scala/catalyst/execution/FunctionRegistry.scala
@@ -1,26 +1,23 @@
package catalyst
package execution

-import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
-
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import scala.collection.JavaConversions._

+import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{AbstractPrimitiveJavaObjectInspector, PrimitiveObjectInspectorFactory}
+import org.apache.hadoop.io._

import expressions._
import types._
-import org.apache.hadoop.io._
-
-import collection.JavaConversions._
-import scala.Predef._

object HiveFunctionRegistry extends analysis.FunctionRegistry {
def lookupFunction(name: String, children: Seq[Expression]): Expression = {
-    // We only look it up to see if it exists, but do not include it in the HiveUDF since it is not always serializable.
-    val functionInfo =
-      Option(FunctionRegistry.getFunctionInfo(name))
-        .getOrElse(sys.error(s"Couldn't find function $name"))
+    // We only look it up to see if it exists, but do not include it in the HiveUDF since it is
+    // not always serializable.
+    val functionInfo: FunctionInfo = Option(FunctionRegistry.getFunctionInfo(name)).getOrElse(
+      sys.error(s"Couldn't find function $name"))

if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
val functionInfo = FunctionRegistry.getFunctionInfo(name)
@@ -31,7 +28,7 @@ object HiveFunctionRegistry extends analysis.FunctionRegistry {

HiveSimpleUdf(
name,
-      children.zip(expectedDataTypes).map { case (e, t) => Cast(e,t) }
+      children.zip(expectedDataTypes).map { case (e, t) => Cast(e, t) }
)
} else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveGenericUdf(name, IntegerType, children)
@@ -99,9 +96,7 @@ abstract class HiveUdf extends Expression with ImplementedUdf with Logging {
}
}

-case class HiveSimpleUdf(
-    name: String,
-    children: Seq[Expression]) extends HiveUdf {
+case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf {
import HiveFunctionRegistry._
type UDFType = UDF

@@ -110,30 +105,30 @@ case class HiveSimpleUdf(
@transient
lazy val dataType = javaClassToDataType(method.getReturnType)

-  lazy val wrappers = method.getParameterTypes.map { argClass =>
+  lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass =>
val primitiveClasses = Seq(
Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE,
classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long]
)
-    val matchingConstructor =
-      argClass.getConstructors.find(c =>
-        c.getParameterTypes.size == 1 &&
-        primitiveClasses.contains(c.getParameterTypes.head))
+    val matchingConstructor = argClass.getConstructors.find { c =>
+      c.getParameterTypes.size == 1 && primitiveClasses.contains(c.getParameterTypes.head)
+    }

-    val constructor =
-      matchingConstructor
-        .getOrElse(sys.error(s"No matching wrapper found, options: ${argClass.getConstructors.toSeq}."))
+    val constructor = matchingConstructor.getOrElse(
+      sys.error(s"No matching wrapper found, options: ${argClass.getConstructors.toSeq}."))

(a: Any) => {
logger.debug(s"Wrapping $a of type ${if(a == null) "null" else a.getClass.getName} using $constructor.")
logger.debug(
s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} using $constructor.")
// We must make sure that primitives get boxed java style.
-      if(a == null)
+      if (a == null) {
        null
-      else
+      } else {
        constructor.newInstance(a match {
          case i: Int => i: java.lang.Integer
          case other: AnyRef => other
        }).asInstanceOf[AnyRef]
+      }
}
}
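The wrappers logic boxes raw values into the classes the UDF's method expects, via reflection. Below is a rough, self-contained re-creation of the idea; it assumes hadoop-common is on the classpath (for IntWritable) and is an illustration, not the commit's code:

    import org.apache.hadoop.io.IntWritable

    val primitiveClasses: Seq[Class[_]] =
      Seq(Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String])

    // Find a one-argument constructor that accepts a boxed primitive, then
    // wrap raw values with it, boxing Ints java-style first.
    def wrapperFor(argClass: Class[_]): Any => AnyRef = {
      val constructor = argClass.getConstructors
        .find(c => c.getParameterTypes.length == 1 &&
                   primitiveClasses.contains(c.getParameterTypes.head))
        .getOrElse(sys.error(s"No matching wrapper found for $argClass"))
      (a: Any) =>
        if (a == null) null
        else constructor.newInstance(a match {
          case i: Int => i: java.lang.Integer
          case other: AnyRef => other
        }).asInstanceOf[AnyRef]
    }

    // wrapperFor(classOf[IntWritable])(42) yields an IntWritable holding 42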

@@ -156,7 +151,7 @@ case class HiveGenericUdf(
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
type UDFType = GenericUDF

-  lazy val inspectors = children.map(_.dataType).map {
+  lazy val inspectors: Seq[AbstractPrimitiveJavaObjectInspector] = children.map(_.dataType).map {
case StringType => PrimitiveObjectInspectorFactory.javaStringObjectInspector
case IntegerType => PrimitiveObjectInspectorFactory.javaIntObjectInspector
case DoubleType => PrimitiveObjectInspectorFactory.javaDoubleObjectInspector
Expand All @@ -165,6 +160,7 @@ case class HiveGenericUdf(
case ShortType => PrimitiveObjectInspectorFactory.javaShortObjectInspector
case ByteType => PrimitiveObjectInspectorFactory.javaByteObjectInspector
}

lazy val instance = {
function.initialize(inspectors.toArray)
function
@@ -183,7 +179,9 @@ case class HiveGenericUdf(
}

def evaluate(evaluatedChildren: Seq[Any]): Any = {
-    val args = evaluatedChildren.map(wrap).map(v => new DeferredJavaObject(v): DeferredObject).toArray
+    val args = evaluatedChildren.map(wrap).map { v =>
+      new DeferredJavaObject(v): DeferredObject
+    }.toArray
unwrap(instance.evaluate(args))
}
}
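HiveGenericUdf maps each child's catalyst type to a Hive ObjectInspector before initializing the function. The sketch below shows the shape of that mapping, assuming Hive's serde2 classes are on the classpath and with string tags standing in for catalyst's DataTypes:

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

    // One inspector per argument type, mirroring the cases shown above.
    def inspectorFor(dataType: String): ObjectInspector = dataType match {
      case "string" => PrimitiveObjectInspectorFactory.javaStringObjectInspector
      case "int"    => PrimitiveObjectInspectorFactory.javaIntObjectInspector
      case "double" => PrimitiveObjectInspectorFactory.javaDoubleObjectInspector
      case other    => sys.error(s"No inspector for type $other")
    }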