From e36caf5a210578b49ae16353d983bf1ba0e7459c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 2 Jan 2014 23:28:05 -0800 Subject: [PATCH 1/8] Renamed Rule.name to Rule.ruleName since name is used too frequently in the code base and is shadowed often by local scope. --- src/main/scala/catalyst/rules/Rule.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/scala/catalyst/rules/Rule.scala b/src/main/scala/catalyst/rules/Rule.scala index c20eca0109d88..fb609fd527b77 100644 --- a/src/main/scala/catalyst/rules/Rule.scala +++ b/src/main/scala/catalyst/rules/Rule.scala @@ -4,13 +4,12 @@ package rules import trees._ abstract class Rule[TreeType <: TreeNode[_]] extends Logging { - val name = { + + /** Name for this rule, automatically inferred based on class name. */ + val ruleName: String = { val className = getClass.getName - if(className endsWith "$") - className.dropRight(1) - else - className + if (className endsWith "$") className.dropRight(1) else className } def apply(plan: TreeType): TreeType -} \ No newline at end of file +} From 479e0558f0e1ee294fd22bb8f1bc4ca3032001be Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 3 Jan 2014 01:02:33 -0800 Subject: [PATCH 2/8] A set of minor changes, including: - import order - limit some lines to 100 character wide - inline code comment - more scaladocs - minor spacing (i.e. add a space after if) --- .../scala/catalyst/analysis/Analyzer.scala | 41 ++++++---- .../scala/catalyst/analysis/Catalog.scala | 12 +-- .../catalyst/analysis/FunctionRegistry.scala | 15 +++- .../catalyst/analysis/typeCoercion.scala | 15 ++-- .../scala/catalyst/analysis/unresolved.scala | 6 +- .../catalyst/execution/FunctionRegistry.scala | 49 ++++++------ .../catalyst/execution/MetastoreCatalog.scala | 25 ++++--- .../scala/catalyst/execution/SharkPlan.scala | 8 +- .../scala/catalyst/expressions/Cast.scala | 3 +- .../catalyst/expressions/SortOrder.scala | 2 - .../catalyst/expressions/aggregates.scala | 7 +- .../catalyst/expressions/functions.scala | 2 +- .../expressions/namedExpressions.scala | 31 ++++---- .../catalyst/expressions/predicates.scala | 10 ++- src/main/scala/catalyst/frontend/Hive.scala | 13 ++-- src/main/scala/catalyst/plans/QueryPlan.scala | 4 +- .../catalyst/plans/logical/BaseRelation.scala | 3 +- .../catalyst/plans/logical/LogicalPlan.scala | 11 +-- .../catalyst/plans/logical/TestRelation.scala | 19 +++-- src/main/scala/catalyst/plans/package.scala | 5 +- .../scala/catalyst/rules/RuleExecutor.scala | 21 +++++- src/main/scala/catalyst/trees/TreeNode.scala | 74 ++++++++++--------- src/main/scala/catalyst/types/package.scala | 5 +- 23 files changed, 215 insertions(+), 166 deletions(-) diff --git a/src/main/scala/catalyst/analysis/Analyzer.scala b/src/main/scala/catalyst/analysis/Analyzer.scala index 335fcab3f83f5..490dcb3b29008 100644 --- a/src/main/scala/catalyst/analysis/Analyzer.scala +++ b/src/main/scala/catalyst/analysis/Analyzer.scala @@ -6,21 +6,28 @@ import plans.logical._ import rules._ /** - * A trivial [[Analyzer]] with an [[EmptyCatalog]]. Used for testing when all relations are - * already filled in and the analyser needs only to resolve attribute references. - * + * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing + * when all relations are already filled in and the analyser needs only to resolve attribute + * references. 
*/ -object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyRegistry, true) +object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true) +/** + * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and + * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and + * a [[FunctionRegistry]]. + */ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean) - extends RuleExecutor[LogicalPlan] { + extends RuleExecutor[LogicalPlan] { + + // TODO: pass this in as a parameter. val fixedPoint = FixedPoint(100) - val batches = Seq( + val batches: Seq[Batch] = Seq( Batch("LocalRelations", Once, NewLocalRelationInstances), Batch("CaseInsensitiveAttributeReferences", Once, - (if(caseSensitive) Nil else LowercaseAttributeReferences :: Nil):_*), + (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*), Batch("Resolution", fixedPoint, ResolveReferences, ResolveRelations, @@ -60,22 +67,26 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool } /** - * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from a logical plan node's children. + * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s + * from a logical plan node's children. */ object ResolveReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan if childIsFullyResolved(q) => logger.trace(s"Attempting to resolve ${q.simpleString}") q transformExpressions { - case u @ UnresolvedAttribute(name) => - // Leave unchanged if resolution fails. Hopefully will be resolved next round. - val result = q.resolve(name).getOrElse(u) - logger.debug(s"Resolving $u to $result") - result + case u @ UnresolvedAttribute(name) => + // Leave unchanged if resolution fails. Hopefully will be resolved next round. + val result = q.resolve(name).getOrElse(u) + logger.debug(s"Resolving $u to $result") + result } } } + /** + * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s. + */ object ResolveFunctions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => @@ -111,6 +122,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved case p: LogicalPlan if !childIsFullyResolved(p) => p + // If the projection list contains Star's, expand it. case p @ Project(projectList, child) if containsStar(projectList) => Project( projectList.flatMap { @@ -118,6 +130,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool case o => o :: Nil }, child) + // If the aggregate function argument contains Star's, expand it. 
case a: Aggregate if containsStar(a.aggregateExpressions) => a.copy( aggregateExpressions = a.aggregateExpressions.flatMap { @@ -139,4 +152,4 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool */ protected def childIsFullyResolved(plan: LogicalPlan): Boolean = (!plan.inputSet.isEmpty) && plan.inputSet.map(_.resolved).reduceLeft(_ && _) -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/analysis/Catalog.scala b/src/main/scala/catalyst/analysis/Catalog.scala index be931890661b7..2d4e307c8921b 100644 --- a/src/main/scala/catalyst/analysis/Catalog.scala +++ b/src/main/scala/catalyst/analysis/Catalog.scala @@ -6,14 +6,16 @@ import plans.logical.LogicalPlan /** * An interface for looking up relations by name. Used by an [[Analyzer]]. */ -abstract trait Catalog { +trait Catalog { def lookupRelation(name: String, alias: Option[String] = None): LogicalPlan } /** - * A trivial catalog that returns an error when a relation is requested. Used for testing when all relations are - * already filled in and the analyser needs only to resolve attribute references. + * A trivial catalog that returns an error when a relation is requested. Used for testing when all + * relations are already filled in and the analyser needs only to resolve attribute references. */ object EmptyCatalog extends Catalog { - def lookupRelation(name: String, alias: Option[String] = None) = ??? -} \ No newline at end of file + def lookupRelation(name: String, alias: Option[String] = None) = { + throw new UnsupportedOperationException + } +} diff --git a/src/main/scala/catalyst/analysis/FunctionRegistry.scala b/src/main/scala/catalyst/analysis/FunctionRegistry.scala index be166b0931353..9fef07b363b9f 100644 --- a/src/main/scala/catalyst/analysis/FunctionRegistry.scala +++ b/src/main/scala/catalyst/analysis/FunctionRegistry.scala @@ -3,10 +3,17 @@ package analysis import expressions._ -abstract trait FunctionRegistry { +/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */ +trait FunctionRegistry { def lookupFunction(name: String, children: Seq[Expression]): Expression } -object EmptyRegistry extends FunctionRegistry { - def lookupFunction(name: String, children: Seq[Expression]): Expression = ??? -} \ No newline at end of file +/** + * A trivial catalog that returns an error when a function is requested. Used for testing when all + * functions are already filled in and the analyser needs only to resolve attribute references. + */ +object EmptyFunctionRegistry extends FunctionRegistry { + def lookupFunction(name: String, children: Seq[Expression]): Expression = { + throw new UnsupportedOperationException + } +} diff --git a/src/main/scala/catalyst/analysis/typeCoercion.scala b/src/main/scala/catalyst/analysis/typeCoercion.scala index cdb9aa6100350..0edb6ef12d32e 100644 --- a/src/main/scala/catalyst/analysis/typeCoercion.scala +++ b/src/main/scala/catalyst/analysis/typeCoercion.scala @@ -1,7 +1,6 @@ package catalyst package analysis -import errors._ import expressions._ import plans.logical._ import rules._ @@ -11,7 +10,7 @@ import types._ * Converts string "NaN"s that are in binary operators with a NaN-able types (Float / Double) to the * appropriate numeric equivalent. 
*/ -object ConvertNaNs extends Rule[LogicalPlan]{ +object ConvertNaNs extends Rule[LogicalPlan] { val stringNaN = Literal("NaN", StringType) def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -43,10 +42,12 @@ object ConvertNaNs extends Rule[LogicalPlan]{ * * Loosely based on rules from "Hadoop: The Definitive Guide" 2nd edition, by Tom White * - * The implicit conversion rules can be summarized as follows. Any integral numeric type can be - * implicitly converted to a wider type. All the integral numeric types, FLOAT, and (perhaps - * surprisingly) STRING can be implicitly converted to DOUBLE. TINYINT, SMALLINT, and INT can all be - * converted to FLOAT. BOOLEAN types cannot be converted to any other type. + * The implicit conversion rules can be summarized as follows: + * - Any integral numeric type can be implicitly converted to a wider type. + * - All the integral numeric types, FLOAT, and (perhaps surprisingly) STRING can be implicitly + * converted to DOUBLE. + * - TINYINT, SMALLINT, and INT can all be converted to FLOAT. + * - BOOLEAN types cannot be converted to any other type. * * String conversions are handled by the PromoteStrings rule. */ @@ -121,4 +122,4 @@ object BooleanComparisons extends Rule[LogicalPlan] { case p: BinaryComparison if p.left.dataType == BooleanType && p.right.dataType == BooleanType => p.makeCopy(Array(Cast(p.left, ByteType), Cast(p.right, ByteType))) } -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/analysis/unresolved.scala b/src/main/scala/catalyst/analysis/unresolved.scala index ac942390bd8c6..0d4c45dfaa3b2 100644 --- a/src/main/scala/catalyst/analysis/unresolved.scala +++ b/src/main/scala/catalyst/analysis/unresolved.scala @@ -31,7 +31,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo def newInstance = this def withQualifiers(newQualifiers: Seq[String]) = this - override def toString(): String = s"'$name" + override def toString: String = s"'$name" } case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression { @@ -67,8 +67,10 @@ case class Star( def withQualifiers(newQualifiers: Seq[String]) = this def expand(input: Seq[Attribute]): Seq[NamedExpression] = { - val expandedAttributes = table match { + val expandedAttributes: Seq[Attribute] = table match { + // If there is no table specified, use all input attributes. case None => input + // If there is a table, pick out attributes that are part of this table. 
case Some(table) => input.filter(_.qualifiers contains table) } val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map { diff --git a/src/main/scala/catalyst/execution/FunctionRegistry.scala b/src/main/scala/catalyst/execution/FunctionRegistry.scala index 691fb0abc06d4..a86f8f0a80a14 100644 --- a/src/main/scala/catalyst/execution/FunctionRegistry.scala +++ b/src/main/scala/catalyst/execution/FunctionRegistry.scala @@ -1,26 +1,23 @@ package catalyst package execution -import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} - -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory +import scala.collection.JavaConversions._ +import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.GenericUDF import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory +import org.apache.hadoop.io._ import expressions._ import types._ -import org.apache.hadoop.io._ - -import collection.JavaConversions._ -import scala.Predef._ object HiveFunctionRegistry extends analysis.FunctionRegistry { def lookupFunction(name: String, children: Seq[Expression]): Expression = { - // We only look it up to see if it exists, but do not include it in the HiveUDF since it is not always serializable. - val functionInfo = - Option(FunctionRegistry.getFunctionInfo(name)) - .getOrElse(sys.error(s"Couldn't find function $name")) + // We only look it up to see if it exists, but do not include it in the HiveUDF since it is + // not always serializable. + val functionInfo: FunctionInfo = Option(FunctionRegistry.getFunctionInfo(name)).getOrElse( + sys.error(s"Couldn't find function $name")) if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) { val functionInfo = FunctionRegistry.getFunctionInfo(name) @@ -31,7 +28,7 @@ object HiveFunctionRegistry extends analysis.FunctionRegistry { HiveSimpleUdf( name, - children.zip(expectedDataTypes).map { case (e, t) => Cast(e,t) } + children.zip(expectedDataTypes).map { case (e, t) => Cast(e, t) } ) } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) { HiveGenericUdf(name, IntegerType, children) @@ -99,41 +96,38 @@ abstract class HiveUdf extends Expression with ImplementedUdf with Logging { } } -case class HiveSimpleUdf( - name: String, - children: Seq[Expression]) extends HiveUdf { +case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf { import HiveFunctionRegistry._ type UDFType = UDF @transient lazy val method = function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo)) @transient - lazy val dataType = javaClassToDataType(method.getReturnType) + lazy val dataType: DataType = javaClassToDataType(method.getReturnType) lazy val wrappers = method.getParameterTypes.map { argClass => val primitiveClasses = Seq( Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE, classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long] ) - val matchingConstructor = - argClass.getConstructors.find(c => - c.getParameterTypes.size == 1 && - primitiveClasses.contains(c.getParameterTypes.head)) + val matchingConstructor = argClass.getConstructors.find { c => + c.getParameterTypes.size == 1 && primitiveClasses.contains(c.getParameterTypes.head) + } - val constructor = - matchingConstructor - .getOrElse(sys.error(s"No matching wrapper found, options: ${argClass.getConstructors.toSeq}.")) 
+ val constructor = matchingConstructor.getOrElse( + sys.error(s"No matching wrapper found, options: ${argClass.getConstructors.toSeq}.")) (a: Any) => { - logger.debug(s"Wrapping $a of type ${if(a == null) "null" else a.getClass.getName} using $constructor.") + logger.debug(s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} using $constructor.") // We must make sure that primitives get boxed java style. - if(a == null) + if (a == null) { null - else + } else { constructor.newInstance(a match { case i: Int => i: java.lang.Integer case other: AnyRef => other }).asInstanceOf[AnyRef] + } } } @@ -156,7 +150,7 @@ case class HiveGenericUdf( import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ type UDFType = GenericUDF - lazy val inspectors = children.map(_.dataType).map { + lazy val inspectors = children.map(_.dataType).map { case StringType => PrimitiveObjectInspectorFactory.javaStringObjectInspector case IntegerType => PrimitiveObjectInspectorFactory.javaIntObjectInspector case DoubleType => PrimitiveObjectInspectorFactory.javaDoubleObjectInspector @@ -165,6 +159,7 @@ case class HiveGenericUdf( case ShortType => PrimitiveObjectInspectorFactory.javaShortObjectInspector case ByteType => PrimitiveObjectInspectorFactory.javaByteObjectInspector } + lazy val instance = { function.initialize(inspectors.toArray) function diff --git a/src/main/scala/catalyst/execution/MetastoreCatalog.scala b/src/main/scala/catalyst/execution/MetastoreCatalog.scala index 06b62687e88d6..46906149873ad 100644 --- a/src/main/scala/catalyst/execution/MetastoreCatalog.scala +++ b/src/main/scala/catalyst/execution/MetastoreCatalog.scala @@ -25,17 +25,19 @@ class HiveMetastoreCatalog(hiveConf: HiveConf) extends Catalog { val table = client.getTable(databaseName, tableName) val hiveQlTable = new org.apache.hadoop.hive.ql.metadata.Table(table) val partitions = - if(hiveQlTable.isPartitioned) + if (hiveQlTable.isPartitioned) { client.listPartitions(databaseName, tableName, 255).toSeq - else + } else { Nil + } // Since HiveQL is case insensitive for table names we make them all lowercase. MetastoreRelation(databaseName.toLowerCase, tableName.toLowerCase, alias)(table, partitions) } /** - * Creates any tables required for query execution. For example, because of a CREATE TABLE X AS statement. + * Creates any tables required for query execution. + * For example, because of a CREATE TABLE X AS statement. */ object CreateTables extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -81,8 +83,9 @@ object HiveMetatoreTypes { } } -case class MetastoreRelation(databaseName: String, tableName: String, alias: Option[String])(val table: Table, val partitions: Seq[Partition]) - extends BaseRelation { +case class MetastoreRelation(databaseName: String, tableName: String, alias: Option[String]) + (val table: Table, val partitions: Seq[Partition]) + extends BaseRelation { def hiveQlTable = new org.apache.hadoop.hive.ql.metadata.Table(table) def hiveQlPartitions = partitions.map(new org.apache.hadoop.hive.ql.metadata.Partition(hiveQlTable, _)) @@ -94,12 +97,12 @@ case class MetastoreRelation(databaseName: String, tableName: String, alias: Opt ) implicit class SchemaAttribute(f: FieldSchema) { - def toAttribute = - AttributeReference( - f.getName, - HiveMetatoreTypes.toDataType(f.getType), - true // Since data can be dumped in randomly with no validation, everything is nullable. 
- )(qualifiers = tableName +: alias.toSeq) + def toAttribute = AttributeReference( + f.getName, + HiveMetatoreTypes.toDataType(f.getType), + // Since data can be dumped in randomly with no validation, everything is nullable. + nullable = true + )(qualifiers = tableName +: alias.toSeq) } val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute) diff --git a/src/main/scala/catalyst/execution/SharkPlan.scala b/src/main/scala/catalyst/execution/SharkPlan.scala index 519336f217fb1..0f74b2147104d 100644 --- a/src/main/scala/catalyst/execution/SharkPlan.scala +++ b/src/main/scala/catalyst/execution/SharkPlan.scala @@ -15,14 +15,14 @@ abstract class SharkPlan extends QueryPlan[SharkPlan] with Logging { protected def buildRow(values: Seq[Any]): Row = new catalyst.expressions.GenericRow(values) } -abstract trait LeafNode extends SharkPlan with trees.LeafNode[SharkPlan] { +trait LeafNode extends SharkPlan with trees.LeafNode[SharkPlan] { self: Product => } -abstract trait UnaryNode extends SharkPlan with trees.UnaryNode[SharkPlan] { +trait UnaryNode extends SharkPlan with trees.UnaryNode[SharkPlan] { self: Product => } -abstract trait BinaryNode extends SharkPlan with trees.BinaryNode[SharkPlan] { +trait BinaryNode extends SharkPlan with trees.BinaryNode[SharkPlan] { self: Product => -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/expressions/Cast.scala b/src/main/scala/catalyst/expressions/Cast.scala index 297933581671b..9c6eb77bbe827 100644 --- a/src/main/scala/catalyst/expressions/Cast.scala +++ b/src/main/scala/catalyst/expressions/Cast.scala @@ -3,7 +3,8 @@ package expressions import types.DataType +/** Cast the child expression to the target data type. */ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { def nullable = child.nullable override def toString = s"CAST($child, $dataType)" -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/expressions/SortOrder.scala b/src/main/scala/catalyst/expressions/SortOrder.scala index cfce25cf1475f..b33ef867519c9 100644 --- a/src/main/scala/catalyst/expressions/SortOrder.scala +++ b/src/main/scala/catalyst/expressions/SortOrder.scala @@ -1,8 +1,6 @@ package catalyst package expressions -import types._ - abstract sealed class SortDirection case object Ascending extends SortDirection case object Descending extends SortDirection diff --git a/src/main/scala/catalyst/expressions/aggregates.scala b/src/main/scala/catalyst/expressions/aggregates.scala index 1422d1bd7aec6..621f6e7bd668e 100644 --- a/src/main/scala/catalyst/expressions/aggregates.scala +++ b/src/main/scala/catalyst/expressions/aggregates.scala @@ -8,10 +8,11 @@ abstract class AggregateExpression extends Expression { } /** - * A specific implementation of an aggregate function. Used to wrap a generic [[AggregateExpression]] with an - * algorithm that will be used to compute the result. + * A specific implementation of an aggregate function. Used to wrap a generic + * [[AggregateExpression]] with an algorithm that will be used to compute the result. 
*/ -abstract class AggregateFunction extends AggregateExpression with Serializable with trees.LeafNode[Expression] { +abstract class AggregateFunction + extends AggregateExpression with Serializable with trees.LeafNode[Expression] { self: Product => /** Base should return the generic aggregate expression that this function is computing */ diff --git a/src/main/scala/catalyst/expressions/functions.scala b/src/main/scala/catalyst/expressions/functions.scala index 44b058f5eda2e..c16e096c93345 100644 --- a/src/main/scala/catalyst/expressions/functions.scala +++ b/src/main/scala/catalyst/expressions/functions.scala @@ -4,6 +4,6 @@ package expressions /** * A UDF that has a native JVM implementation. */ -abstract trait ImplementedUdf { +trait ImplementedUdf { def evaluate(evaluatedChildren: Seq[Any]): Any } \ No newline at end of file diff --git a/src/main/scala/catalyst/expressions/namedExpressions.scala b/src/main/scala/catalyst/expressions/namedExpressions.scala index b3498d7575f84..1a1faf32e2c3d 100644 --- a/src/main/scala/catalyst/expressions/namedExpressions.scala +++ b/src/main/scala/catalyst/expressions/namedExpressions.scala @@ -1,7 +1,7 @@ package catalyst package expressions -import catalyst.analysis.{UnresolvedAttribute, UnresolvedException} +import catalyst.analysis.UnresolvedAttribute import types._ object NamedExpression { @@ -43,25 +43,26 @@ abstract class Attribute extends NamedExpression { * * @param child the computation being performed * @param name the name to be associated with the result of computing [[child]]. - * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this alias. - * Auto-assigned if left blank. + * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this + * alias. Auto-assigned if left blank. */ case class Alias(child: Expression, name: String) - (val exprId: ExprId = NamedExpression.newExprId, - val qualifiers: Seq[String] = Nil) + (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil) extends NamedExpression with trees.UnaryNode[Expression] { def dataType = child.dataType def nullable = child.nullable def references = child.references - def toAttribute = - if(resolved) + def toAttribute = { + if (resolved) { AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers) - else + } else { UnresolvedAttribute(name) + } + } - override def toString(): String = s"$child AS $name#${exprId.id}" + override def toString: String = s"$child AS $name#${exprId.id}" override protected final def otherCopyArgs = exprId :: qualifiers :: Nil } @@ -72,12 +73,14 @@ case class Alias(child: Expression, name: String) * @param name The name of this attribute, should only be used during analysis or for debugging. * @param dataType The [[DataType]] of this attribute. * @param nullable True if null is a valid value for this attribute. - * @param exprId A globally unique id used to check if different AttributeReferences refer to the same attribute. - * @param qualifiers a list of strings that can be used to referred to this attribute in a fully qualified way. Consider - * the examples tableName.name, subQueryAlias.name. tableName and subQueryAlias are possible qualifiers. + * @param exprId A globally unique id used to check if different AttributeReferences refer to the + * same attribute. + * @param qualifiers a list of strings that can be used to referred to this attribute in a fully + * qualified way. Consider the examples tableName.name, subQueryAlias.name. 
+ * tableName and subQueryAlias are possible qualifiers. */ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true) - (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil) + (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] { override def equals(other: Any) = other match { @@ -106,5 +109,5 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea else AttributeReference(name, dataType, nullable)(exprId, newQualifiers) - override def toString(): String = s"$name#${exprId.id}" + override def toString: String = s"$name#${exprId.id}" } \ No newline at end of file diff --git a/src/main/scala/catalyst/expressions/predicates.scala b/src/main/scala/catalyst/expressions/predicates.scala index cbb543a32d131..0600d92e34446 100644 --- a/src/main/scala/catalyst/expressions/predicates.scala +++ b/src/main/scala/catalyst/expressions/predicates.scala @@ -3,7 +3,7 @@ package expressions import types._ -abstract trait Predicate extends Expression { +trait Predicate extends Expression { self: Product => def dataType = BooleanType @@ -15,14 +15,16 @@ abstract class BinaryPredicate extends BinaryExpression with Predicate { def nullable = left.nullable || right.nullable } -case class Not(child: Expression) extends Predicate with trees.UnaryNode[Expression]{ +case class Not(child: Expression) extends Predicate with trees.UnaryNode[Expression] { def references = child.references def nullable = child.nullable override def toString = s"NOT $child" } + case class And(left: Expression, right: Expression) extends BinaryPredicate { def symbol = "&&" } + case class Or(left: Expression, right: Expression) extends BinaryPredicate { def symbol = "||" } @@ -38,12 +40,15 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison case class LessThan(left: Expression, right: Expression) extends BinaryComparison { def symbol = "<" } + case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { def symbol = "<=" } + case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison { def symbol = ">" } + case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { def symbol = ">=" } @@ -52,6 +57,7 @@ case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expr def references = child.references def nullable = false } + case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] { def references = child.references def nullable = false diff --git a/src/main/scala/catalyst/frontend/Hive.scala b/src/main/scala/catalyst/frontend/Hive.scala index 69b571a8bca3f..2d907211711e0 100644 --- a/src/main/scala/catalyst/frontend/Hive.scala +++ b/src/main/scala/catalyst/frontend/Hive.scala @@ -2,7 +2,8 @@ package catalyst package frontend package hive -import catalyst.analysis.UnresolvedRelation +import scala.collection.JavaConversions._ + import org.apache.hadoop.hive.ql.lib.Node import org.apache.hadoop.hive.ql.parse._ @@ -12,9 +13,6 @@ import plans._ import plans.logical._ import types._ -import collection.JavaConversions._ -import scala.collection.mutable - /** * A logical node that represents a non-query command to be executed by the system. For example, * commands can be used by parsers to represent DDL operations. 
@@ -25,8 +23,9 @@ abstract class Command extends LeafNode { } /** - * Used when we need to start parsing the AST before deciding that we are going to pass the command back for Hive to - * execute natively. Will be replaced with a native command that contains the cmd string. + * Used when we need to start parsing the AST before deciding that we are going to pass the command + * back for Hive to execute natively. Will be replaced with a native command that contains the + * cmd string. */ case object NativePlaceholder extends Command @@ -119,7 +118,7 @@ object HiveQl { /** * A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations similar to - * [[catalyst.tree.TreeNode]]. + * [[catalyst.trees.TreeNode]]. * * Note that this should be considered very experimental and is not indented as a replacement for TreeNode. Primarily * it should be noted ASTNodes are not immutable and do not appear to have clean copy semantics. Therefore, users of diff --git a/src/main/scala/catalyst/plans/QueryPlan.scala b/src/main/scala/catalyst/plans/QueryPlan.scala index d574954af2855..c099a777ef4aa 100644 --- a/src/main/scala/catalyst/plans/QueryPlan.scala +++ b/src/main/scala/catalyst/plans/QueryPlan.scala @@ -19,7 +19,7 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy @inline def transformExpression(e: Expression) = { val newE = e.transform(rule) - if(newE.id != e.id && newE != e) { + if (newE.id != e.id && newE != e) { changed = true newE } else { @@ -37,7 +37,7 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy case other: AnyRef => other }.toArray - if(changed) makeCopy(newArgs) else this + if (changed) makeCopy(newArgs) else this } /** Returns the result of running [[transformExpressions]] on this node and all its children */ diff --git a/src/main/scala/catalyst/plans/logical/BaseRelation.scala b/src/main/scala/catalyst/plans/logical/BaseRelation.scala index 829f0b3ff245b..16788affb3cf6 100644 --- a/src/main/scala/catalyst/plans/logical/BaseRelation.scala +++ b/src/main/scala/catalyst/plans/logical/BaseRelation.scala @@ -4,5 +4,4 @@ package logical abstract class BaseRelation extends LeafNode { self: Product => - -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/plans/logical/LogicalPlan.scala b/src/main/scala/catalyst/plans/logical/LogicalPlan.scala index a4bb80ef240ba..fdd1428601cf6 100644 --- a/src/main/scala/catalyst/plans/logical/LogicalPlan.scala +++ b/src/main/scala/catalyst/plans/logical/LogicalPlan.scala @@ -28,14 +28,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { def resolve(name: String): Option[Attribute] = { val parts = name.split("\\.") - val options = children.flatMap(_.output).filter {option => - // If the first part of the desired name matches a qualifier for this possible match, drop it. - val remainingParts = - if(option.qualifiers contains parts.head) - parts.drop(1) - else - parts - + val options = children.flatMap(_.output).filter { option => + // If the first part of the desired name matches a qualifier for this possible match, drop it. 
+ val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts option.name == remainingParts.head } diff --git a/src/main/scala/catalyst/plans/logical/TestRelation.scala b/src/main/scala/catalyst/plans/logical/TestRelation.scala index ca49f585e7f8b..88535680fa075 100644 --- a/src/main/scala/catalyst/plans/logical/TestRelation.scala +++ b/src/main/scala/catalyst/plans/logical/TestRelation.scala @@ -15,9 +15,9 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil) exten def loadData(newData: Seq[Product]) = new LocalRelation(output, data ++ newData) /** - * Returns an identical copy of this relation with new exprIds for all attributes. Different attributes are required - * when a relation is going to be included multiple times in the same query. - * @return + * Returns an identical copy of this relation with new exprIds for all attributes. Different + * attributes are required when a relation is going to be included multiple times in the same + * query. */ def newInstance: LocalRelation = { LocalRelation(output.map(_.newInstance), data) @@ -27,17 +27,20 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil) exten } /** - * If any local relation appears more than once in the query plan then the plan is updated so that each instance has - * unique expression ids for the attributes produced. + * If any local relation appears more than once in the query plan then the plan is updated so that + * each instance has unique expression ids for the attributes produced. */ object NewLocalRelationInstances extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { val localRelations = plan collect { case l: LocalRelation => l} - val multiAppearance = - localRelations.groupBy(identity[LocalRelation]).filter { case (_, ls) => ls.size > 1 }.map(_._1).toSet + val multiAppearance = localRelations + .groupBy(identity[LocalRelation]) + .filter { case (_, ls) => ls.size > 1 } + .map(_._1) + .toSet plan transform { case l: LocalRelation if multiAppearance contains l => l.newInstance } } -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/plans/package.scala b/src/main/scala/catalyst/plans/package.scala index 3b98dbb518ff8..6ac3859a2329c 100644 --- a/src/main/scala/catalyst/plans/package.scala +++ b/src/main/scala/catalyst/plans/package.scala @@ -1,6 +1,7 @@ package catalyst /** - * A a collection of common abstractions for query plans as well as a base [[logical]] plan representation. + * A a collection of common abstractions for query plans as well as + * a base logical plan representation. */ -package object plans \ No newline at end of file +package object plans diff --git a/src/main/scala/catalyst/rules/RuleExecutor.scala b/src/main/scala/catalyst/rules/RuleExecutor.scala index 7bb2497f41fa5..59c5f29c83f49 100644 --- a/src/main/scala/catalyst/rules/RuleExecutor.scala +++ b/src/main/scala/catalyst/rules/RuleExecutor.scala @@ -4,14 +4,29 @@ package rules import trees._ abstract class RuleExecutor[TreeType <: TreeNode[_]] { + + /** + * An execution strategy for rules that indicates the maximum number of executions. If the + * execution reaches fix point (i.e. converge) before maxIterations, it will stop. + */ abstract class Strategy { def maxIterations: Int } + + /** A strategy that only runs once. */ case object Once extends Strategy { val maxIterations = 1 } + + /** A strategy that runs until fix point or maxIterations times, whichever comes first. 
*/ case class FixedPoint(maxIterations: Int) extends Strategy + /** A batch of rules. */ protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*) + /** Defines a sequence of rule batches, to be overridden by the implementation. */ protected val batches: Seq[Batch] + /** + * Executes the batches of rules defined by the subclass. The batches are executed serially + * using the defined execution strategy. Within each batch, rules are also executed serially. + */ def apply(plan: TreeType): TreeType = { var curPlan = plan @@ -20,8 +35,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] { var lastPlan = curPlan curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) } - while(iteration < batch.strategy.maxIterations && - !(curPlan fastEquals lastPlan)) { + // Run until fix point (or the max number of iterations as specified in the strategy. + while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) { lastPlan = curPlan curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) } iteration += 1 @@ -30,4 +45,4 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] { curPlan } -} \ No newline at end of file +} diff --git a/src/main/scala/catalyst/trees/TreeNode.scala b/src/main/scala/catalyst/trees/TreeNode.scala index cdec810c1b680..e503a270ef3b4 100644 --- a/src/main/scala/catalyst/trees/TreeNode.scala +++ b/src/main/scala/catalyst/trees/TreeNode.scala @@ -25,8 +25,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { val id = TreeNode.nextId() /** - * Returns true if other is the same [[TreeNode]] instance. Unlike [[equals]] this function will return false for - * different instances of structurally identical trees. + * Returns true if other is the same [[TreeNode]] instance. Unlike [[equals]] this function will + * return false for different instances of structurally identical trees. */ def sameInstance(other: TreeNode[_]): Boolean = { this.id == other.id @@ -34,8 +34,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { /** * Faster version of equality which short-circuits when two treeNodes are the same instance. - * We don't just override Object.Equals, as doing so prevents the scala compiler from from generating case class - * [[equals]] methods. + * We don't just override Object.Equals, as doing so prevents the scala compiler from from + * generating case class [[equals]] methods. */ def fastEquals(other: TreeNode[_]): Boolean = { sameInstance(other) || this == other @@ -62,8 +62,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { } /** - * Returns a Seq by applying a function to all nodes in this tree and using the elements of the resulting - * collections. + * Returns a Seq by applying a function to all nodes in this tree and using the elements of the + * resulting collections. */ def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = { val ret = new collection.mutable.ArrayBuffer[A]() @@ -72,8 +72,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { } /** - * Returns a Seq containing the result of applying a partial function to all elements in this tree on which the - * function is defined. + * Returns a Seq containing the result of applying a partial function to all elements in this + * tree on which the function is defined. 
*/ def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = { val ret = new collection.mutable.ArrayBuffer[B]() @@ -91,10 +91,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = { val afterRule = rule.applyOrElse(this, identity[BaseType]) // Check if unchanged and then possibly return old copy to avoid gc churn. - if(this fastEquals afterRule) + if (this fastEquals afterRule) { transformChildren(rule) - else + } else { afterRule.transformChildren(rule) + } } /** @@ -106,16 +107,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { def transformChildren(rule: PartialFunction[BaseType, BaseType]): this.type = { var changed = false val newArgs = productIterator.map { - case arg: TreeNode[_] if(children contains arg) => - val newChild = arg.asInstanceOf[BaseType].transform(rule) - if(!(newChild fastEquals arg)) { - changed = true - newChild - } else { - arg - } + case arg: TreeNode[_] if (children contains arg) => + val newChild = arg.asInstanceOf[BaseType].transform(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } case args: Seq[_] => args.map { - case arg: TreeNode[_] if(children contains arg) => + case arg: TreeNode[_] if (children contains arg) => val newChild = arg.asInstanceOf[BaseType].transform(rule) if(!(newChild fastEquals arg)) { changed = true @@ -146,10 +147,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { */ def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") { try { - if(otherCopyArgs.isEmpty) - getClass.getConstructors.head.newInstance(newArgs: _*).asInstanceOf[this.type] - else - getClass.getConstructors.head.newInstance((newArgs ++ otherCopyArgs).toArray :_*).asInstanceOf[this.type] + val defaultCtor = getClass.getConstructors.head + if (otherCopyArgs.isEmpty) { + defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] + } else { + defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] + } } catch { case e: java.lang.IllegalArgumentException => throw new OptimizationException( @@ -157,17 +160,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { } } - /** Returns the name of this type of TreeNode. Defaults to the class name. */ def nodeName = getClass.getSimpleName - /** The arguments that should be included in the arg string. Defaults to the [[productIterator]] */ + /** + * The arguments that should be included in the arg string. Defaults to the [[productIterator]]. 
+ */ protected def stringArgs = productIterator /** Returns a string representing the arguments to this node, minus any children */ def argString: String = productIterator.flatMap { case tn: TreeNode[_] if children contains tn => Nil - case tn: TreeNode[_] if(tn.toString contains "\n") => s"(${tn.simpleString})" :: Nil + case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil case seq: Seq[_] => seq.mkString("{", ",", "}") :: Nil case other => other :: Nil }.mkString(", ") @@ -175,28 +179,28 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { /** String representation of this node without any children */ def simpleString = s"$nodeName $argString" - override def toString(): String = treeString + override def toString: String = treeString /** Returns a string representation of the nodes in this tree */ def treeString = generateTreeString(0, new StringBuilder).toString /** - * Returns a string representation of the nodes in this tree, where each operator is numbered. The numbers can be - * used with [[apply]] to easily access specific subtrees. + * Returns a string representation of the nodes in this tree, where each operator is numbered. + * The numbers can be used with [[apply]] to easily access specific subtrees. */ def numberedTreeString = - treeString.split("\n").zipWithIndex.map { case (line,i) => f"$i%02d $line" }.mkString("\n") + treeString.split("\n").zipWithIndex.map { case (line, i) => f"$i%02d $line" }.mkString("\n") def apply(depth: Int): BaseType = getNodeAtDepth(new MutableInt(depth)) protected def getNodeAtDepth(depth: MutableInt): BaseType = { - if(depth.i < 0) - return null.asInstanceOf[BaseType] - else if(depth.i == 0) + if (depth.i < 0) { + null.asInstanceOf[BaseType] + } else if (depth.i == 0) { this - else { + } else { depth.i -= 1 - children.map(_.getNodeAtDepth(depth)).collectFirst { case n if n != null => n }.getOrElse(sys.error("Invalid depth")) + children.map(_.getNodeAtDepth(depth)).find(_ != null).getOrElse(sys.error("Invalid depth")) } } diff --git a/src/main/scala/catalyst/types/package.scala b/src/main/scala/catalyst/types/package.scala index 0e26a04ccf911..872b5b1ab904a 100644 --- a/src/main/scala/catalyst/types/package.scala +++ b/src/main/scala/catalyst/types/package.scala @@ -1,6 +1,7 @@ package catalyst /** - * Contains a type system for attributes produced by relations, including complex types like structs, arrays and maps. + * Contains a type system for attributes produced by relations, including complex types like + * structs, arrays and maps. */ -package object types \ No newline at end of file +package object types From 5c421ac1421b3f04c7bcd658616b65deb3f3e31b Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 3 Jan 2014 04:58:04 -0800 Subject: [PATCH 3/8] Imported SharkEnv, SharkContext, and HadoopTableReader to remove Shark dependency. 
--- build.sbt | 2 +- .../execution/QueryExecutionException.scala | 3 + .../catalyst/execution/SharkContext.scala | 82 +++++++ .../scala/catalyst/execution/SharkEnv.scala | 43 ++++ .../catalyst/execution/SharkInstance.scala | 8 +- .../catalyst/execution/TableReader.scala | 230 ++++++++++++++++++ .../scala/catalyst/execution/TestShark.scala | 10 +- .../scala/catalyst/execution/Transform.scala | 7 +- .../scala/catalyst/execution/aggregates.scala | 2 - .../catalyst/execution/basicOperators.scala | 11 - .../catalyst/execution/hiveOperators.scala | 19 +- src/main/scala/catalyst/execution/joins.scala | 6 +- .../scala/catalyst/execution/package.scala | 2 - .../execution/planningStrategies.scala | 4 +- src/main/scala/catalyst/expressions/Row.scala | 4 +- .../catalyst/planning/QueryPlanner.scala | 20 +- .../execution/HiveComparisionTest.scala | 5 +- 17 files changed, 393 insertions(+), 65 deletions(-) create mode 100644 src/main/scala/catalyst/execution/QueryExecutionException.scala create mode 100644 src/main/scala/catalyst/execution/SharkContext.scala create mode 100644 src/main/scala/catalyst/execution/SharkEnv.scala create mode 100644 src/main/scala/catalyst/execution/TableReader.scala diff --git a/build.sbt b/build.sbt index 4a180507d74e1..963f7de8a4735 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ scalaVersion := "2.10.3" resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository" -libraryDependencies += "edu.berkeley.cs.amplab" %% "shark" % "0.9.0-SNAPSHOT" +libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating-SNAPSHOT" // Hive 0.10.0 relies on a weird version of jdo that is not published anywhere... Remove when we upgrade to 0.11.0 libraryDependencies += "javax.jdo" % "jdo2-api" % "2.3-ec" from "http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar" diff --git a/src/main/scala/catalyst/execution/QueryExecutionException.scala b/src/main/scala/catalyst/execution/QueryExecutionException.scala new file mode 100644 index 0000000000000..40ec17798c319 --- /dev/null +++ b/src/main/scala/catalyst/execution/QueryExecutionException.scala @@ -0,0 +1,3 @@ +package catalyst.execution + +class QueryExecutionException(message: String) extends Exception(message) diff --git a/src/main/scala/catalyst/execution/SharkContext.scala b/src/main/scala/catalyst/execution/SharkContext.scala new file mode 100644 index 0000000000000..c5378b39516fd --- /dev/null +++ b/src/main/scala/catalyst/execution/SharkContext.scala @@ -0,0 +1,82 @@ +package catalyst.execution + +import java.io.PrintStream +import java.util.{ArrayList => JArrayList} + +import scala.collection.Map +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.processors.CommandProcessor +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse +import org.apache.hadoop.hive.ql.session.SessionState + +import org.apache.spark.{SparkContext, SparkEnv} + + +class SharkContext( + master: String, + jobName: String, + sparkHome: String, + jars: Seq[String], + environment: Map[String, String]) + extends SparkContext(master, jobName, sparkHome, jars, environment) { + + @transient val sparkEnv = SparkEnv.get + + SharkContext.init() + import SharkContext._ + + /** + * Execute the command and return the results as a sequence. Each element + * in the sequence is one row. 
+ */ + def sql(cmd: String, maxRows: Int = 1000): Seq[String] = { + SparkEnv.set(sparkEnv) + val cmd_trimmed: String = cmd.trim() + val tokens: Array[String] = cmd_trimmed.split("\\s+") + val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() + val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf) + + SessionState.start(sessionState) + + if (proc.isInstanceOf[Driver]) { + val driver: Driver = proc.asInstanceOf[Driver] + driver.init() + + val results = new JArrayList[String] + val response: CommandProcessorResponse = driver.run(cmd) + // Throw an exception if there is an error in query processing. + if (response.getResponseCode != 0) { + driver.destroy() + throw new QueryExecutionException(response.getErrorMessage) + } + driver.setMaxRows(maxRows) + driver.getResults(results) + driver.destroy() + results + } else { + sessionState.out.println(tokens(0) + " " + cmd_1) + Seq(proc.run(cmd_1).getResponseCode.toString) + } + } +} + + +object SharkContext { + // Since we can never properly shut down Hive, we put the Hive related initializations + // here in a global singleton. + + @transient val hiveconf = new HiveConf(classOf[SessionState]) + + @transient val sessionState = new SessionState(hiveconf) + sessionState.out = new PrintStream(System.out, true, "UTF-8") + sessionState.err = new PrintStream(System.out, true, "UTF-8") + + // A dummy init to make sure the object is properly initialized. + def init() {} +} + + diff --git a/src/main/scala/catalyst/execution/SharkEnv.scala b/src/main/scala/catalyst/execution/SharkEnv.scala new file mode 100644 index 0000000000000..d316d6f0c5d0f --- /dev/null +++ b/src/main/scala/catalyst/execution/SharkEnv.scala @@ -0,0 +1,43 @@ +package catalyst.execution + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.StatsReportListener + + +/** A singleton object for the master program. The slaves should not access this. */ +object SharkEnv { + + def initWithSharkContext(jobName: String, master: String): SharkContext = { + if (sc != null) { + sc.stop() + } + + sc = new SharkContext( + if (master == null) "local" else master, + jobName, + System.getenv("SPARK_HOME"), + Nil, + executorEnvVars) + sc.addSparkListener(new StatsReportListener()) + sc.asInstanceOf[SharkContext] + } + + val executorEnvVars = new HashMap[String, String] + executorEnvVars.put("SCALA_HOME", getEnv("SCALA_HOME")) + executorEnvVars.put("SPARK_MEM", getEnv("SPARK_MEM")) + executorEnvVars.put("SPARK_CLASSPATH", getEnv("SPARK_CLASSPATH")) + executorEnvVars.put("HADOOP_HOME", getEnv("HADOOP_HOME")) + executorEnvVars.put("JAVA_HOME", getEnv("JAVA_HOME")) + executorEnvVars.put("MESOS_NATIVE_LIBRARY", getEnv("MESOS_NATIVE_LIBRARY")) + executorEnvVars.put("TACHYON_MASTER", getEnv("TACHYON_MASTER")) + executorEnvVars.put("TACHYON_WAREHOUSE_PATH", getEnv("TACHYON_WAREHOUSE_PATH")) + + val activeSessions = new HashSet[String] + + var sc: SparkContext = _ + + /** Return the value of an environmental variable as a string. 
*/ + def getEnv(varname: String) = if (System.getenv(varname) == null) "" else System.getenv(varname) +} diff --git a/src/main/scala/catalyst/execution/SharkInstance.scala b/src/main/scala/catalyst/execution/SharkInstance.scala index edd7eb93b4431..8de19de4b44be 100644 --- a/src/main/scala/catalyst/execution/SharkInstance.scala +++ b/src/main/scala/catalyst/execution/SharkInstance.scala @@ -3,8 +3,6 @@ package execution import java.io.File -import shark.{SharkConfVars, SharkContext, SharkEnv} - import analysis.{SimpleAnalyzer, Analyzer} import frontend.hive._ import optimizer.Optimize @@ -39,16 +37,12 @@ abstract class SharkInstance extends Logging { /** The SharkContext */ lazy val sc = createContext() - protected def createContext() = { + protected def createContext(): SharkContext = { SharkEnv.initWithSharkContext("catalyst.execution", master) } /** Sets up the system initially or after a RESET command */ protected def configure() { - // Use hive natively for queries that won't be executed by catalyst. This is because - // shark has dependencies on a custom version of hive that we are trying to avoid in catalyst. - SharkConfVars.setVar(SharkContext.hiveconf, SharkConfVars.EXEC_MODE, "hive") - // TODO: refactor this so we can work with other databases. runSqlHive("set javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=" + metastorePath + ";create=true") runSqlHive("set hive.metastore.warehouse.dir=" + warehousePath) diff --git a/src/main/scala/catalyst/execution/TableReader.scala b/src/main/scala/catalyst/execution/TableReader.scala new file mode 100644 index 0000000000000..58b0c8f502591 --- /dev/null +++ b/src/main/scala/catalyst/execution/TableReader.scala @@ -0,0 +1,230 @@ +package catalyst.execution + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.serde2.Deserializer +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.io.Writable +import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf, InputFormat} + +import org.apache.spark.SerializableWritable +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.{HadoopRDD, UnionRDD, EmptyRDD, RDD} + + +/** + * A trait for subclasses that handle table scans. In Shark, there is one subclass for each + * type of table storage: HeapTableReader for Shark tables in Spark's block manager, + * TachyonTableReader for tables in Tachyon, and HadoopTableReader for Hive tables in a filesystem. + */ +sealed trait TableReader { + + def makeRDDForTable(hiveTable: HiveTable): RDD[_] + + def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] + +} + + +/** + * Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the + * data warehouse directory. + */ +class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf: HiveConf) + extends TableReader { + + // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless + // it is smaller than what Spark suggests. + private val _minSplitsPerRDD = math.max( + _localHConf.getInt("mapred.map.tasks", 1), SharkEnv.sc.defaultMinSplits) + + + // TODO: set aws s3 credentials. 
+ + private val _broadcastedHiveConf = SharkEnv.sc.broadcast(new SerializableWritable(_localHConf)) + + def broadcastedHiveConf = _broadcastedHiveConf + + def hiveConf = _broadcastedHiveConf.value.value + + override def makeRDDForTable(hiveTable: HiveTable): RDD[_] = + makeRDDForTable( + hiveTable, + _tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]], + filterOpt = None) + + /** + * Creates a Hadoop RDD to read data from the target table's data directory. Returns a transformed + * RDD that contains deserialized rows. + * + * @param hiveTable Hive metadata for the table being scanned. + * @param deserializerClass Class of the SerDe used to deserialize Writables read from Hadoop. + * @param filterOpt If defined, then the filter is used to reject files contained in the data + * directory being read. If None, then all files are accepted. + */ + def makeRDDForTable( + hiveTable: HiveTable, + deserializerClass: Class[_ <: Deserializer], + filterOpt: Option[PathFilter]): RDD[_] = + { + assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table, + since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""") + + // Create local references to member variables, so that the entire `this` object won't be + // serialized in the closure below. + val tableDesc = _tableDesc + val broadcastedHiveConf = _broadcastedHiveConf + + val tablePath = hiveTable.getPath + val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) + + //logDebug("Table input: %s".format(tablePath)) + val ifc = hiveTable.getInputFormatClass + .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] + val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + + val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => + val hconf = broadcastedHiveConf.value.value + val deserializer = deserializerClass.newInstance().asInstanceOf[Deserializer] + deserializer.initialize(hconf, tableDesc.getProperties) + + // Deserialize each Writable to get the row value. + iter.map { value => + value match { + case v: Writable => deserializer.deserialize(v) + case _ => throw new RuntimeException("Failed to match " + value.toString) + } + } + } + deserializedHadoopRDD + } + + override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] = { + val partitionToDeserializer = partitions.map(part => + (part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap + makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None) + } + + /** + * Create a HadoopRDD for every partition key specified in the query. Note that for on-disk Hive + * tables, a data directory is created for each partition corresponding to keys specified using + * 'PARTITION BY'. + * + * @param partitionToDeserializer Mapping from a Hive Partition metadata object to the SerDe + * class to use to deserialize input Writables from the corresponding partition. + * @param filterOpt If defined, then the filter is used to reject files contained in the data + * subdirectory of each partition being read. If None, then all files are accepted. 
+ */ + def makeRDDForPartitionedTable( + partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], + filterOpt: Option[PathFilter]): RDD[_] = + { + val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => + val partDesc = Utilities.getPartitionDesc(partition) + val partPath = partition.getPartitionPath + val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) + val ifc = partDesc.getInputFileFormatClass + .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] + // Get partition field info + val partSpec = partDesc.getPartSpec() + val partProps = partDesc.getProperties() + + // TODO: fix this. + //val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS) + val partColsDelimited = ";" + // Partitioning columns are delimited by "/" + val partCols = partColsDelimited.trim().split("/").toSeq + // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'. + val partValues = if (partSpec == null) { + Array.fill(partCols.size)(new String) + } else { + partCols.map(col => new String(partSpec.get(col))).toArray + } + + // Create local references so that the outer object isn't serialized. + val tableDesc = _tableDesc + val broadcastedHiveConf = _broadcastedHiveConf + val localDeserializer = partDeserializer + + val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + hivePartitionRDD.mapPartitions { iter => + val hconf = broadcastedHiveConf.value.value + val rowWithPartArr = new Array[Object](2) + // Map each tuple to a row object + iter.map { value => + val deserializer = localDeserializer.newInstance() + deserializer.initialize(hconf, partProps) + val deserializedRow = deserializer.deserialize(value) // LazyStruct + rowWithPartArr.update(0, deserializedRow) + rowWithPartArr.update(1, partValues) + rowWithPartArr.asInstanceOf[Object] + } + } + }.toSeq + // Even if we don't use any partitions, we still need an empty RDD + if (hivePartitionRDDs.size == 0) { + new EmptyRDD[Object](SharkEnv.sc) + } else { + new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs) + } + } + + /** + * If `filterOpt` is defined, then it will be used to filter files from `path`. These files are + * returned in a single, comma-separated string. + */ + private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = { + filterOpt match { + case Some(filter) => { + val fs = path.getFileSystem(_localHConf) + val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString) + filteredFiles.mkString(",") + } + case None => path.toString + } + } + + /** + * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be + * applied locally on each slave. + */ + private def createHadoopRdd( + tableDesc: TableDesc, + path: String, + inputFormatClass: Class[InputFormat[Writable, Writable]]) + : RDD[Writable] = { + val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ + + val rdd = new HadoopRDD( + SharkEnv.sc, + _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + Some(initializeJobConfFunc), + inputFormatClass, + classOf[Writable], + classOf[Writable], + _minSplitsPerRDD) + + // Only take the value (skip the key) because Hive works only with values. + rdd.map(_._2) + } + +} + +object HadoopTableReader { + + /** + * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to + * instantiate a HadoopRDD. 
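+   *
+   * For example (editor's illustration, not part of this patch; the argument names follow
+   * createHadoopRdd above):
+   * {{{
+   *   val initJobConf = HadoopTableReader.initializeLocalJobConfFunc(inputPathStr, tableDesc) _
+   *   // initJobConf now has type JobConf => Unit and can be handed to a HadoopRDD.
+   * }}}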
+ */ + def initializeLocalJobConfFunc(path: String, tableDesc: TableDesc)(jobConf: JobConf) { + FileInputFormat.setInputPaths(jobConf, path) + if (tableDesc != null) { + Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) + } + val bufferSize = System.getProperty("spark.buffer.size", "65536") + jobConf.set("io.file.buffer.size", bufferSize) + } +} diff --git a/src/main/scala/catalyst/execution/TestShark.scala b/src/main/scala/catalyst/execution/TestShark.scala index e500d4c79c080..f9b150a82aa52 100644 --- a/src/main/scala/catalyst/execution/TestShark.scala +++ b/src/main/scala/catalyst/execution/TestShark.scala @@ -2,21 +2,19 @@ package catalyst package execution import java.io.File + import scala.collection.mutable +import scala.collection.JavaConversions._ +import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} import org.apache.hadoop.hive.metastore.MetaStoreUtils -import shark.SharkEnv +import org.apache.hadoop.hive.ql.exec.FunctionRegistry import analysis._ import plans.logical.LogicalPlan import frontend.hive._ import util._ -import collection.JavaConversions._ -import org.apache.hadoop.hive.ql.exec.FunctionRegistry -import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} - - /** * A locally running test instance of spark. The lifecycle for a given query is managed by the inner class * [[SharkQuery]]. A [[SharkQuery]] can either be instantiated directly or using the implicit conversion '.q'. diff --git a/src/main/scala/catalyst/execution/Transform.scala b/src/main/scala/catalyst/execution/Transform.scala index 0178efbc8c9dd..3a00fa4560920 100644 --- a/src/main/scala/catalyst/execution/Transform.scala +++ b/src/main/scala/catalyst/execution/Transform.scala @@ -1,11 +1,12 @@ package catalyst package execution -import catalyst.expressions._ -import shark.SharkContext import java.io.{InputStreamReader, BufferedReader} -import collection.JavaConversions._ +import scala.collection.JavaConversions._ + +import catalyst.expressions._ + /** * Transforms the input by forking and running the specified script. 
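 *
 * Script transforms are typically expressed using Hive's TRANSFORM syntax, e.g. (illustrative
 * query, not taken from this patch):
 * {{{
 *   SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM src
 * }}}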
diff --git a/src/main/scala/catalyst/execution/aggregates.scala b/src/main/scala/catalyst/execution/aggregates.scala index dfdb6e68fee31..8a507c81b6b72 100644 --- a/src/main/scala/catalyst/execution/aggregates.scala +++ b/src/main/scala/catalyst/execution/aggregates.scala @@ -3,8 +3,6 @@ package execution import catalyst.errors._ import catalyst.expressions._ -import catalyst.types.IntegerType -import shark.SharkContext /* Implicits */ import org.apache.spark.SparkContext._ diff --git a/src/main/scala/catalyst/execution/basicOperators.scala b/src/main/scala/catalyst/execution/basicOperators.scala index a4ea279240041..4535ccb708772 100644 --- a/src/main/scala/catalyst/execution/basicOperators.scala +++ b/src/main/scala/catalyst/execution/basicOperators.scala @@ -1,21 +1,10 @@ package catalyst package execution -import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} -import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, StructObjectInspector} -import shark.execution.HadoopTableReader -import shark.{SharkContext, SharkEnv} - import errors._ import expressions._ import types._ -import collection.JavaConversions._ -import org.apache.hadoop.hive.ql.exec.OperatorFactory -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils -import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext._ diff --git a/src/main/scala/catalyst/execution/hiveOperators.scala b/src/main/scala/catalyst/execution/hiveOperators.scala index fb9e94074dd43..47b66c1d1d71d 100644 --- a/src/main/scala/catalyst/execution/hiveOperators.scala +++ b/src/main/scala/catalyst/execution/hiveOperators.scala @@ -9,9 +9,6 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.mapred.JobConf -import shark.execution.HadoopTableReader -import shark.SharkContext - import expressions.Attribute import util._ @@ -34,8 +31,8 @@ case class HiveTableScan(attributes: Seq[Attribute], relation: MetastoreRelation * Functions that extract the requested attributes from the hive output. 
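 * For partition keys the value is read from the partition key array passed alongside each row;
 * for all other attributes it is extracted from the deserialized struct via the table's
 * ObjectInspector.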
*/ @transient - protected lazy val attributeFunctions = attributes.map { a => - if(relation.partitionKeys.contains(a)) { + protected lazy val attributeFunctions: Seq[(LazyStruct, Array[String]) => AnyRef] = attributes.map { a => + if (relation.partitionKeys.contains(a)) { val ordinal = relation.partitionKeys.indexOf(a) (struct: LazyStruct, partitionKeys: Array[String]) => partitionKeys(ordinal) } else { @@ -53,11 +50,11 @@ case class HiveTableScan(attributes: Seq[Attribute], relation: MetastoreRelation } @transient - def inputRdd = - if(!relation.hiveQlTable.isPartitioned) - hadoopReader.makeRDDForTable(relation.hiveQlTable) - else - hadoopReader.makeRDDForPartitionedTable(relation.hiveQlPartitions) + def inputRdd = if (!relation.hiveQlTable.isPartitioned) { + hadoopReader.makeRDDForTable(relation.hiveQlTable) + } else { + hadoopReader.makeRDDForPartitionedTable(relation.hiveQlPartitions) + } def execute() = { inputRdd.map { row => @@ -117,7 +114,7 @@ case class InsertIntoHiveTable(table: MetastoreRelation, partition: Map[String, s"PARTITION (${partition.map { case (k,v) => s"$k=$v" }.mkString(",")})" else "" - sc.runSql(s"LOAD DATA LOCAL INPATH '${tempDir.getCanonicalPath}/*' INTO TABLE ${table.tableName} $partitionSpec") + sc.sql(s"LOAD DATA LOCAL INPATH '${tempDir.getCanonicalPath}/*' INTO TABLE ${table.tableName} $partitionSpec") // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which diff --git a/src/main/scala/catalyst/execution/joins.scala b/src/main/scala/catalyst/execution/joins.scala index 706d52d1fea93..6fbb8a9893c4b 100644 --- a/src/main/scala/catalyst/execution/joins.scala +++ b/src/main/scala/catalyst/execution/joins.scala @@ -1,15 +1,13 @@ package catalyst package execution +import scala.collection.mutable + import org.apache.spark.rdd.RDD import errors._ import expressions._ import plans._ -import shark.SharkContext -import org.apache.spark.util.collection.BitSet -import scala.collection -import scala.collection.mutable /* Implicits */ import org.apache.spark.SparkContext._ diff --git a/src/main/scala/catalyst/execution/package.scala b/src/main/scala/catalyst/execution/package.scala index 6e366263d3511..2a073f3e49558 100644 --- a/src/main/scala/catalyst/execution/package.scala +++ b/src/main/scala/catalyst/execution/package.scala @@ -1,7 +1,5 @@ package catalyst -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo - import types._ /** diff --git a/src/main/scala/catalyst/execution/planningStrategies.scala b/src/main/scala/catalyst/execution/planningStrategies.scala index 556503b6fbbdc..2615dfff5108b 100644 --- a/src/main/scala/catalyst/execution/planningStrategies.scala +++ b/src/main/scala/catalyst/execution/planningStrategies.scala @@ -1,14 +1,12 @@ package catalyst package execution -import shark.SharkContext - import expressions._ import planning._ import plans._ import plans.logical.LogicalPlan -abstract trait PlanningStrategies { +trait PlanningStrategies { self: QueryPlanner[SharkPlan] => val sc: SharkContext diff --git a/src/main/scala/catalyst/expressions/Row.scala b/src/main/scala/catalyst/expressions/Row.scala index 391a87b1f338a..7a39508d973be 100644 --- a/src/main/scala/catalyst/expressions/Row.scala +++ b/src/main/scala/catalyst/expressions/Row.scala @@ -5,8 +5,8 @@ package expressions * Represents one row of output from a relational operator. 
Allows both generic access by ordinal, * which will incur boxing overhead for primitives, as well as native primitive access. * - * It is invalid to use the native primitive interface to retrieve a value that is null, instead a user - * must check [[isNullAt]] before attempting to retrieve a value that might be null. + * It is invalid to use the native primitive interface to retrieve a value that is null, instead a + * user must check [[isNullAt]] before attempting to retrieve a value that might be null. */ abstract class Row extends Seq[Any] with Serializable { def apply(i: Int): Any diff --git a/src/main/scala/catalyst/planning/QueryPlanner.scala b/src/main/scala/catalyst/planning/QueryPlanner.scala index 8f793564e09ca..7024ca4b75bb3 100644 --- a/src/main/scala/catalyst/planning/QueryPlanner.scala +++ b/src/main/scala/catalyst/planning/QueryPlanner.scala @@ -6,10 +6,11 @@ import plans.logical.LogicalPlan import trees._ /** - * Extended by classes that transform [[plans.logical.LogicalPlan]]s into Physical plans. Child classes are - * responsible for specifying a list of [[Strategy]] objects that each of which can return a list of possible - * physical plan options. If a given strategy is unable to plan all of the remaining operators in the tree, it can - * call [[planLater]], which returns a placeholder object that will be filled in using other available strategies. + * Extended by classes that transform [[LogicalPlan]]s into physical plans. Child classes are + * responsible for specifying a list of [[Strategy]] objects that each of which can return a list + * of possible physical plan options. If a given strategy is unable to plan all of the remaining + * operators in the tree, it can call [[planLater]], which returns a placeholder object that will + * be filled in using other available strategies. * * NOTE: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER... PLAN SPACE EXPLORATION WILL BE IMPLEMENTED LATER. * @@ -20,17 +21,18 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { def strategies: Seq[Strategy] /** - * Given a [[plan.logical.LogicalPlan]], returns a list of [[PhysicalPlans]] that can be used for execution. If this - * strategy does not apply to the give logical operation then an empty list should be returned. + * Given a [[LogicalPlan]], returns a list of [[PhysicalPlans]] that can be used for execution. + * If this strategy does not apply to the give logical operation then an empty list should be + * returned. */ abstract protected class Strategy extends Logging { def apply(plan: LogicalPlan): Seq[PhysicalPlan] - } /** - * Returns a placeholder for a physical plan that executes [[plan]]. This placeholder will be filled in automatically - * by the QueryPlanner using the other execution strategies that are available. + * Returns a placeholder for a physical plan that executes [[plan]]. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. 
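+   *
+   * For example (editor's sketch, not part of this patch; `SortOnly` is a hypothetical name), a
+   * Shark strategy that only knows how to plan sorts could look like:
+   * {{{
+   *   object SortOnly extends Strategy {
+   *     def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match {
+   *       case logical.Sort(order, child) => execution.Sort(order, planLater(child)) :: Nil
+   *       case _ => Nil
+   *     }
+   *   }
+   * }}}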
*/ protected def planLater(plan: LogicalPlan) = apply(plan).next() diff --git a/src/test/scala/catalyst/execution/HiveComparisionTest.scala b/src/test/scala/catalyst/execution/HiveComparisionTest.scala index 203a5c22b5919..e7183a94d419e 100644 --- a/src/test/scala/catalyst/execution/HiveComparisionTest.scala +++ b/src/test/scala/catalyst/execution/HiveComparisionTest.scala @@ -1,22 +1,19 @@ package catalyst package execution -import shark.{SharkContext, SharkEnv} - import java.io._ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} import frontend.hive.{ExplainCommand, Command} import util._ -import collection.JavaConversions._ - /** * Allows the creations of tests that execute the same query against both hive * and catalyst, comparing the results. * * The "golden" results from Hive are cached in [[answerCache]] to speed up testing. */ +// TODO: correct the mispelled name. abstract class HiveComaparisionTest extends FunSuite with BeforeAndAfterAll with GivenWhenThen with Logging { protected val targetDir = new File("target") protected val answerCache = new File(targetDir, "comparison-test-cache") From d48d0e1c10a9e0a46409437ab018f8760e65857b Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 3 Jan 2014 12:12:34 -0800 Subject: [PATCH 4/8] Code review feedback. --- src/main/scala/catalyst/analysis/Analyzer.scala | 4 ++-- src/main/scala/catalyst/analysis/typeCoercion.scala | 10 +++++----- .../scala/catalyst/execution/FunctionRegistry.scala | 2 +- src/main/scala/catalyst/execution/SharkContext.scala | 2 +- src/main/scala/catalyst/execution/SharkInstance.scala | 2 +- src/main/scala/catalyst/execution/hiveOperators.scala | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/scala/catalyst/analysis/Analyzer.scala b/src/main/scala/catalyst/analysis/Analyzer.scala index 490dcb3b29008..24d705ed4b5bb 100644 --- a/src/main/scala/catalyst/analysis/Analyzer.scala +++ b/src/main/scala/catalyst/analysis/Analyzer.scala @@ -122,7 +122,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved case p: LogicalPlan if !childIsFullyResolved(p) => p - // If the projection list contains Star's, expand it. + // If the projection list contains Stars, expand it. case p @ Project(projectList, child) if containsStar(projectList) => Project( projectList.flatMap { @@ -130,7 +130,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool case o => o :: Nil }, child) - // If the aggregate function argument contains Star's, expand it. + // If the aggregate function argument contains Stars, expand it. case a: Aggregate if containsStar(a.aggregateExpressions) => a.copy( aggregateExpressions = a.aggregateExpressions.flatMap { diff --git a/src/main/scala/catalyst/analysis/typeCoercion.scala b/src/main/scala/catalyst/analysis/typeCoercion.scala index 0edb6ef12d32e..968d9135632dd 100644 --- a/src/main/scala/catalyst/analysis/typeCoercion.scala +++ b/src/main/scala/catalyst/analysis/typeCoercion.scala @@ -43,11 +43,11 @@ object ConvertNaNs extends Rule[LogicalPlan] { * Loosely based on rules from "Hadoop: The Definitive Guide" 2nd edition, by Tom White * * The implicit conversion rules can be summarized as follows: - * - Any integral numeric type can be implicitly converted to a wider type. - * - All the integral numeric types, FLOAT, and (perhaps surprisingly) STRING can be implicitly - * converted to DOUBLE. 
- * - TINYINT, SMALLINT, and INT can all be converted to FLOAT. - * - BOOLEAN types cannot be converted to any other type. + * $ - Any integral numeric type can be implicitly converted to a wider type. + * $ - All the integral numeric types, FLOAT, and (perhaps surprisingly) STRING can be implicitly + * converted to DOUBLE. + * $ - TINYINT, SMALLINT, and INT can all be converted to FLOAT. + * $ - BOOLEAN types cannot be converted to any other type. * * String conversions are handled by the PromoteStrings rule. */ diff --git a/src/main/scala/catalyst/execution/FunctionRegistry.scala b/src/main/scala/catalyst/execution/FunctionRegistry.scala index a86f8f0a80a14..e46bd1163c99a 100644 --- a/src/main/scala/catalyst/execution/FunctionRegistry.scala +++ b/src/main/scala/catalyst/execution/FunctionRegistry.scala @@ -103,7 +103,7 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd @transient lazy val method = function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo)) @transient - lazy val dataType: DataType = javaClassToDataType(method.getReturnType) + lazy val dataType = javaClassToDataType(method.getReturnType) lazy val wrappers = method.getParameterTypes.map { argClass => val primitiveClasses = Seq( diff --git a/src/main/scala/catalyst/execution/SharkContext.scala b/src/main/scala/catalyst/execution/SharkContext.scala index c5378b39516fd..a56b545e666c2 100644 --- a/src/main/scala/catalyst/execution/SharkContext.scala +++ b/src/main/scala/catalyst/execution/SharkContext.scala @@ -33,7 +33,7 @@ class SharkContext( * Execute the command and return the results as a sequence. Each element * in the sequence is one row. */ - def sql(cmd: String, maxRows: Int = 1000): Seq[String] = { + def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = { SparkEnv.set(sparkEnv) val cmd_trimmed: String = cmd.trim() val tokens: Array[String] = cmd_trimmed.split("\\s+") diff --git a/src/main/scala/catalyst/execution/SharkInstance.scala b/src/main/scala/catalyst/execution/SharkInstance.scala index 8de19de4b44be..fb51587d602aa 100644 --- a/src/main/scala/catalyst/execution/SharkInstance.scala +++ b/src/main/scala/catalyst/execution/SharkInstance.scala @@ -61,7 +61,7 @@ abstract class SharkInstance extends Logging { */ def runSqlHive(sql: String): Seq[String] = { val maxResults = 100000 - val results = sc.sql(sql, 100000) + val results = sc.runHive(sql, 100000) // It is very confusing when you only get back some of the results... 
if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED") results diff --git a/src/main/scala/catalyst/execution/hiveOperators.scala b/src/main/scala/catalyst/execution/hiveOperators.scala index 47b66c1d1d71d..c3c490e611276 100644 --- a/src/main/scala/catalyst/execution/hiveOperators.scala +++ b/src/main/scala/catalyst/execution/hiveOperators.scala @@ -114,7 +114,7 @@ case class InsertIntoHiveTable(table: MetastoreRelation, partition: Map[String, s"PARTITION (${partition.map { case (k,v) => s"$k=$v" }.mkString(",")})" else "" - sc.sql(s"LOAD DATA LOCAL INPATH '${tempDir.getCanonicalPath}/*' INTO TABLE ${table.tableName} $partitionSpec") + sc.runHive(s"LOAD DATA LOCAL INPATH '${tempDir.getCanonicalPath}/*' INTO TABLE ${table.tableName} $partitionSpec") // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which From 555d8393c6c222635eebe370f516a3f8e194e9e7 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 3 Jan 2014 12:17:14 -0800 Subject: [PATCH 5/8] More cleaning ... --- .../catalyst/execution/FunctionRegistry.scala | 13 ++- .../catalyst/execution/MetastoreCatalog.scala | 13 ++- .../catalyst/execution/SharkInstance.scala | 3 +- .../scala/catalyst/execution/TestShark.scala | 81 +++++++++------- .../scala/catalyst/execution/aggregates.scala | 95 ++++++++++--------- .../catalyst/execution/basicOperators.scala | 49 +++++----- .../catalyst/execution/hiveOperators.scala | 61 +++++++----- src/main/scala/catalyst/execution/joins.scala | 29 +++--- .../scala/catalyst/execution/package.scala | 10 +- .../execution/planningStrategies.scala | 26 +++-- .../scala/catalyst/expressions/Evaluate.scala | 2 +- .../scala/catalyst/frontend/package.scala | 6 +- .../plans/logical/basicOperators.scala | 6 +- 13 files changed, 227 insertions(+), 167 deletions(-) diff --git a/src/main/scala/catalyst/execution/FunctionRegistry.scala b/src/main/scala/catalyst/execution/FunctionRegistry.scala index e46bd1163c99a..f89329ea60c0b 100644 --- a/src/main/scala/catalyst/execution/FunctionRegistry.scala +++ b/src/main/scala/catalyst/execution/FunctionRegistry.scala @@ -6,7 +6,7 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.GenericUDF import org.apache.hadoop.hive.ql.exec.UDF -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory +import org.apache.hadoop.hive.serde2.objectinspector.primitive.{AbstractPrimitiveJavaObjectInspector, PrimitiveObjectInspectorFactory} import org.apache.hadoop.io._ import expressions._ @@ -105,7 +105,7 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd @transient lazy val dataType = javaClassToDataType(method.getReturnType) - lazy val wrappers = method.getParameterTypes.map { argClass => + lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass => val primitiveClasses = Seq( Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE, classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long] @@ -118,7 +118,8 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd sys.error(s"No matching wrapper found, options: ${argClass.getConstructors.toSeq}.")) (a: Any) => { - logger.debug(s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} using $constructor.") + 
logger.debug( + s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} using $constructor.") // We must make sure that primitives get boxed java style. if (a == null) { null @@ -150,7 +151,7 @@ case class HiveGenericUdf( import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ type UDFType = GenericUDF - lazy val inspectors = children.map(_.dataType).map { + lazy val inspectors: Seq[AbstractPrimitiveJavaObjectInspector] = children.map(_.dataType).map { case StringType => PrimitiveObjectInspectorFactory.javaStringObjectInspector case IntegerType => PrimitiveObjectInspectorFactory.javaIntObjectInspector case DoubleType => PrimitiveObjectInspectorFactory.javaDoubleObjectInspector @@ -178,7 +179,9 @@ case class HiveGenericUdf( } def evaluate(evaluatedChildren: Seq[Any]): Any = { - val args = evaluatedChildren.map(wrap).map(v => new DeferredJavaObject(v): DeferredObject).toArray + val args = evaluatedChildren.map(wrap).map { v => + new DeferredJavaObject(v): DeferredObject + }.toArray unwrap(instance.evaluate(args)) } } \ No newline at end of file diff --git a/src/main/scala/catalyst/execution/MetastoreCatalog.scala b/src/main/scala/catalyst/execution/MetastoreCatalog.scala index 46906149873ad..33f1abeaaed72 100644 --- a/src/main/scala/catalyst/execution/MetastoreCatalog.scala +++ b/src/main/scala/catalyst/execution/MetastoreCatalog.scala @@ -5,6 +5,8 @@ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition, Table, StorageDescriptor, SerDeInfo} import org.apache.hadoop.hive.metastore.HiveMetaStoreClient import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde2.Deserializer +import org.apache.hadoop.mapred.InputFormat import analysis.Catalog import expressions._ @@ -88,10 +90,14 @@ case class MetastoreRelation(databaseName: String, tableName: String, alias: Opt extends BaseRelation { def hiveQlTable = new org.apache.hadoop.hive.ql.metadata.Table(table) - def hiveQlPartitions = partitions.map(new org.apache.hadoop.hive.ql.metadata.Partition(hiveQlTable, _)) + + def hiveQlPartitions = partitions.map { p => + new org.apache.hadoop.hive.ql.metadata.Partition(hiveQlTable, p) + } + val tableDesc = new TableDesc( - Class.forName(table.getSd.getSerdeInfo.getSerializationLib).asInstanceOf[Class[org.apache.hadoop.hive.serde2.Deserializer]], - Class.forName(table.getSd.getInputFormat).asInstanceOf[Class[org.apache.hadoop.mapred.InputFormat[_,_]]], + Class.forName(table.getSd.getSerdeInfo.getSerializationLib).asInstanceOf[Class[Deserializer]], + Class.forName(table.getSd.getInputFormat).asInstanceOf[Class[InputFormat[_,_]]], Class.forName(table.getSd.getOutputFormat), hiveQlTable.getSchema ) @@ -110,4 +116,3 @@ case class MetastoreRelation(databaseName: String, tableName: String, alias: Opt // Must be a stable value since new attributes are born here. val output = partitionKeys ++ table.getSd.getCols.map(_.toAttribute) } - diff --git a/src/main/scala/catalyst/execution/SharkInstance.scala b/src/main/scala/catalyst/execution/SharkInstance.scala index fb51587d602aa..1a59488aaf171 100644 --- a/src/main/scala/catalyst/execution/SharkInstance.scala +++ b/src/main/scala/catalyst/execution/SharkInstance.scala @@ -44,7 +44,8 @@ abstract class SharkInstance extends Logging { /** Sets up the system initially or after a RESET command */ protected def configure() { // TODO: refactor this so we can work with other databases. 
- runSqlHive("set javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=" + metastorePath + ";create=true") + runSqlHive("set javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=" + metastorePath + + ";create=true") runSqlHive("set hive.metastore.warehouse.dir=" + warehousePath) } diff --git a/src/main/scala/catalyst/execution/TestShark.scala b/src/main/scala/catalyst/execution/TestShark.scala index f9b150a82aa52..3cbfb80d297c2 100644 --- a/src/main/scala/catalyst/execution/TestShark.scala +++ b/src/main/scala/catalyst/execution/TestShark.scala @@ -2,6 +2,7 @@ package catalyst package execution import java.io.File +import java.util.{Set => JavaSet} import scala.collection.mutable import scala.collection.JavaConversions._ @@ -16,8 +17,9 @@ import frontend.hive._ import util._ /** - * A locally running test instance of spark. The lifecycle for a given query is managed by the inner class - * [[SharkQuery]]. A [[SharkQuery]] can either be instantiated directly or using the implicit conversion '.q'. + * A locally running test instance of spark. The lifecycle for a given query is managed by the + * inner class [[SharkQuery]]. A [[SharkQuery]] can either be instantiated directly or using the + * implicit conversion '.q'. * * {{{ * scala> val query = "SELECT key FROM src".q @@ -34,12 +36,13 @@ import util._ * res0: Array[IndexedSeq[Any]] = Array(Vector(238), Vector(86), Vector(311), ... * }}} * - * Data from [[testTables]] will be automatically loaded whenever a query is run over those tables. Calling - * [[reset]] will delete all tables and other state in the database, leaving the database in a "clean" state. + * Data from [[testTables]] will be automatically loaded whenever a query is run over those tables. + * Calling [[reset]] will delete all tables and other state in the database, leaving the database + * in a "clean" state. * - * TestShark is implemented as a singleton object because instantiating multiple copies of the hive metastore - * seems to lead to weird non-deterministic failures. Therefore, the execution of testcases that rely on TestShark - * must be serialized. + * TestShark is implemented as a singleton object because instantiating multiple copies of the hive + * metastore seems to lead to weird non-deterministic failures. Therefore, the execution of + * testcases that rely on TestShark must be serialized. */ object TestShark extends SharkInstance { self => @@ -48,7 +51,7 @@ object TestShark extends SharkInstance { lazy val warehousePath = getTempFilePath("sharkWarehouse").getCanonicalPath lazy val metastorePath = getTempFilePath("sharkMetastore").getCanonicalPath - override protected def createContext = { + override protected def createContext() = { // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. System.clearProperty("spark.driver.port") @@ -65,7 +68,10 @@ object TestShark extends SharkInstance { // Override so we can intercept relative paths and rewrite them to point at hive. 
override def runSqlHive(sql: String): Seq[String] = super.runSqlHive(rewritePaths(sql)) - /** Returns the value of specified environmental variable as a [[java.io.File]] after checking to ensure it exists */ + /** + * Returns the value of specified environmental variable as a [[java.io.File]] after checking + * to ensure it exists + */ private def envVarToFile(envVar: String): File = { assert(System.getenv(envVar) != null, s"$envVar not set") val ret = new File(System.getenv(envVar)) @@ -74,8 +80,8 @@ object TestShark extends SharkInstance { } /** - * Replaces relative paths to the parent directory "../" with hiveDevHome since this is how the hive test cases - * assume the system is set up. + * Replaces relative paths to the parent directory "../" with hiveDevHome since this is how the + * hive test cases assume the system is set up. */ private def rewritePaths(cmd: String): String = if(cmd.toUpperCase startsWith "LOAD") @@ -84,6 +90,7 @@ object TestShark extends SharkInstance { cmd val describedTable = "DESCRIBE (\\w+)".r + /** * Override SharkQuery with special debug workflow. */ @@ -105,8 +112,10 @@ object TestShark extends SharkInstance { analyze(parsed) } - /** Runs the query after interposing operators that print the result of each intermediate step. */ - def debugExec() = DebugQuery(executedPlan).execute().collect + /** + * Runs the query after interposing operators that print the result of each intermediate step. + */ + def debugExec() = DebugQuery(executedPlan).execute().collect() } class SharkSqlQuery(sql: String) extends SharkQuery { @@ -122,15 +131,17 @@ object TestShark extends SharkInstance { def q = new SharkSqlQuery(str) } - implicit override def logicalToSharkQuery(plan: LogicalPlan) = new LogicalSharkQuery { val parsed = plan } + implicit override def logicalToSharkQuery(plan: LogicalPlan) = new LogicalSharkQuery { + val parsed = plan + } case class TestTable(name: String, commands: (()=>Unit)*) implicit class SqlCmd(sql: String) { def cmd = () => sql.q.stringResult(): Unit} /** - * A list of test tables and the DDL required to initialize them. A test table is loaded on demand when a query - * are run against it. + * A list of test tables and the DDL required to initialize them. A test table is loaded on + * demand when a query are run against it. 
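+   *
+   * Editor's illustration of registering one (the table name and file path are made up):
+   * {{{
+   *   registerTestTable(TestTable("example",
+   *     "CREATE TABLE example (key INT, value STRING)".cmd,
+   *     "LOAD DATA LOCAL INPATH '../data/files/example.txt' INTO TABLE example".cmd))
+   * }}}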
*/ val testTables = new mutable.HashMap[String, TestTable]() def registerTestTable(testTable: TestTable) = testTables += (testTable.name -> testTable) @@ -175,10 +186,9 @@ object TestShark extends SharkInstance { srcThrift.getSd.setOutputFormat(classOf[SequenceFileOutputFormat[_,_]].getName) srcThrift.getSd.setSerdeInfo(new SerDeInfo) srcThrift.getSd.getSerdeInfo.setSerializationLib(classOf[ThriftDeserializer].getName) - srcThrift.getSd.getSerdeInfo.setParameters( - Map( - "serialization.class" -> classOf[Complex].getName, - "serialization.format" -> classOf[TBinaryProtocol].getName)) + srcThrift.getSd.getSerdeInfo.setParameters(Map( + "serialization.class" -> classOf[Complex].getName, + "serialization.format" -> classOf[TBinaryProtocol].getName)) catalog.client.createTable(srcThrift) @@ -189,8 +199,9 @@ object TestShark extends SharkInstance { hiveQTestUtilTables.foreach(registerTestTable) private val loadedTables = new collection.mutable.HashSet[String] + def loadTestTable(name: String) { - if(!(loadedTables contains name)) { + if (!(loadedTables contains name)) { logger.info(s"Loading test table $name") val createCmds = testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name")) @@ -203,7 +214,7 @@ object TestShark extends SharkInstance { * Records the UDFs present when the server starts, so we can delete ones that are created by * tests. */ - protected val originalUdfs = FunctionRegistry.getFunctionNames() + protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames /** * Resets the test instance by deleting any tables that have been created. @@ -212,27 +223,32 @@ object TestShark extends SharkInstance { def reset() { try { // HACK: Hive is too noisy by default. - org.apache.log4j.LogManager.getCurrentLoggers.foreach(_.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)) + org.apache.log4j.LogManager.getCurrentLoggers.foreach { logger => + logger.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN) + } - // It is important that we RESET first as broken hooks that might have been set could break other sql exec here. + // It is important that we RESET first as broken hooks that might have been set could break + // other sql exec here. runSqlHive("RESET") // For some reason, RESET does not reset the following variables... runSqlHive("set datanucleus.cache.collections=true") runSqlHive("set datanucleus.cache.collections.lazy=true") loadedTables.clear() - catalog.client.getAllTables("default").foreach(t => { + catalog.client.getAllTables("default").foreach { t => logger.debug(s"Deleting table $t") val table = catalog.client.getTable("default", t) - catalog.client.listIndexes("default", t, 255) - .foreach(i => catalog.client.dropIndex("default", t, i.getIndexName, true)) + catalog.client.listIndexes("default", t, 255).foreach { index => + catalog.client.dropIndex("default", t, index.getIndexName, true) + } - if(!MetaStoreUtils.isIndexTable(table)) + if (!MetaStoreUtils.isIndexTable(table)) { catalog.client.dropTable("default", t) - }) + } + } - catalog.client.getAllDatabases.filterNot(_ == "default").foreach {db => + catalog.client.getAllDatabases.filterNot(_ == "default").foreach { db => logger.debug(s"Dropping Database: $db") catalog.client.dropDatabase(db, true, false, true) } @@ -247,8 +263,9 @@ object TestShark extends SharkInstance { } catch { case e: Exception => logger.error(s"FATAL ERROR: Failed to reset TestDB state. 
$e") - // At this point there is really no reason to continue, but the test framework traps exits. So instead we just - // pause forever so that at least the developer can see where things started to go wrong. + // At this point there is really no reason to continue, but the test framework traps exits. + // So instead we just pause forever so that at least the developer can see where things + // started to go wrong. Thread.sleep(100000) } } diff --git a/src/main/scala/catalyst/execution/aggregates.scala b/src/main/scala/catalyst/execution/aggregates.scala index 8a507c81b6b72..bd2a38eb0903e 100644 --- a/src/main/scala/catalyst/execution/aggregates.scala +++ b/src/main/scala/catalyst/execution/aggregates.scala @@ -7,11 +7,15 @@ import catalyst.expressions._ /* Implicits */ import org.apache.spark.SparkContext._ -case class Aggregate(groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[NamedExpression], - child: SharkPlan) extends UnaryNode { +case class Aggregate( + groupingExpressions: Seq[Expression], + aggregateExpressions: Seq[NamedExpression], + child: SharkPlan) + extends UnaryNode { + + case class AverageFunction(expr: Expression, base: AggregateExpression) + extends AggregateFunction { - case class AverageFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) // Required for serialization. var count: Long = _ @@ -33,8 +37,9 @@ case class Aggregate(groupingExpressions: Seq[Expression], def apply(input: Seq[Row]): Unit = { val evaluatedExpr = expr.map(Evaluate(_, input)) - if(evaluatedExpr.map(_ != null).reduceLeft(_ || _)) - count += 1 + if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) { + count += 1 + } } def result: Any = count @@ -51,7 +56,9 @@ case class Aggregate(groupingExpressions: Seq[Expression], def result: Any = sum } - case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression) extends AggregateFunction { + case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression) + extends AggregateFunction { + def this() = this(null, null) // Required for serialization. val seen = new scala.collection.mutable.HashSet[Any]() @@ -79,47 +86,49 @@ case class Aggregate(groupingExpressions: Seq[Expression], def output = aggregateExpressions.map(_.toAttribute) def execute() = attachTree(this, "execute") { - val grouped = child.execute().map(row => (buildRow(groupingExpressions.map(Evaluate(_, Vector(row)))), row)).groupByKey() - grouped.map { - case (group, rows) => - // Replace all aggregate expressions with spark functions that will compute the result. - val aggImplementations = aggregateExpressions.map { agg => - val impl = agg transform { - case base @ Average(expr) => new AverageFunction(expr, base) - case base @ Sum(expr) => new SumFunction(expr, base) - case base @ Count(expr) => new CountFunction(expr, base) - case base @ CountDistinct(expr) => new CountDistinctFunction(expr, base) - case base @ First(expr) => new FirstFunction(expr, base) - } + val grouped = child.execute().map { row => + (buildRow(groupingExpressions.map(Evaluate(_, Vector(row)))), row) + }.groupByKey() + + grouped.map { case (group, rows) => + // Replace all aggregate expressions with spark functions that will compute the result. 
+ val aggImplementations = aggregateExpressions.map { agg => + val impl = agg transform { + case base @ Average(expr) => new AverageFunction(expr, base) + case base @ Sum(expr) => new SumFunction(expr, base) + case base @ Count(expr) => new CountFunction(expr, base) + case base @ CountDistinct(expr) => new CountDistinctFunction(expr, base) + case base @ First(expr) => new FirstFunction(expr, base) + } + + val remainingAttributes = impl.collect { case a: Attribute => a } + // If any references exist that are not inside agg functions then the must be grouping exprs + // in this case we must rebind them to the grouping tuple. + if (remainingAttributes.nonEmpty) { + val unaliasedAggregateExpr = agg transform { case Alias(c, _) => c } - val remainingAttributes = impl.collect { case a: Attribute => a } - // If any references exist that are not inside agg functions then the must be grouping exprs in this case - // we must rebind them to the grouping tuple. - if(remainingAttributes.nonEmpty) { - val unaliasedAggregateExpr = agg transform { case Alias(c, _) => c } - - // An exact match with a grouping expression - val exactGroupingExpr = groupingExpressions.indexOf(unaliasedAggregateExpr) match { - case -1 => None - case ordinal => Some(BoundReference(0, ordinal, Alias(impl, "AGGEXPR")().toAttribute) - ) - } - - exactGroupingExpr.getOrElse(sys.error(s"$agg is not in grouping expressions: $groupingExpressions")) - } else { - impl + // An exact match with a grouping expression + val exactGroupingExpr = groupingExpressions.indexOf(unaliasedAggregateExpr) match { + case -1 => None + case ordinal => Some(BoundReference(0, ordinal, Alias(impl, "AGGEXPR")().toAttribute)) } + + exactGroupingExpr.getOrElse( + sys.error(s"$agg is not in grouping expressions: $groupingExpressions")) + } else { + impl } + } - // Pull out all the functions so we can feed each row into them. - val aggFunctions = aggImplementations.flatMap(_ collect { case f: AggregateFunction => f }) - assert(aggFunctions.nonEmpty) + // Pull out all the functions so we can feed each row into them. + val aggFunctions = aggImplementations.flatMap(_ collect { case f: AggregateFunction => f }) + assert(aggFunctions.nonEmpty) - rows.foreach { row => - val input = Vector(row) - aggFunctions.foreach(_.apply(input)) - } - buildRow(aggImplementations.map(Evaluate(_, Vector(group)))) + rows.foreach { row => + val input = Vector(row) + aggFunctions.foreach(_.apply(input)) + } + buildRow(aggImplementations.map(Evaluate(_, Vector(group)))) } } } diff --git a/src/main/scala/catalyst/execution/basicOperators.scala b/src/main/scala/catalyst/execution/basicOperators.scala index 4535ccb708772..e7146b4c9f48e 100644 --- a/src/main/scala/catalyst/execution/basicOperators.scala +++ b/src/main/scala/catalyst/execution/basicOperators.scala @@ -5,9 +5,9 @@ import errors._ import expressions._ import types._ - import org.apache.spark.SparkContext._ + case class Project(projectList: Seq[NamedExpression], child: SharkPlan) extends UnaryNode { def output = projectList.map(_.toAttribute) @@ -23,7 +23,8 @@ case class Filter(condition: Expression, child: SharkPlan) extends UnaryNode { } } -case class Union(left: SharkPlan, right: SharkPlan)(@transient sc: SharkContext) extends BinaryNode { +case class Union(left: SharkPlan, right: SharkPlan)(@transient sc: SharkContext) + extends BinaryNode { // TODO: attributes output by union should be distinct for nullability purposes def output = left.output // TODO: is it more efficient to union a bunch of rdds at once? 
should union be variadic? @@ -49,7 +50,7 @@ case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode { private class SortKey(val keyValues: IndexedSeq[Any]) extends Ordered[SortKey] with Serializable { def compare(other: SortKey): Int = { var i = 0 - while(i < keyValues.size) { + while (i < keyValues.size) { val left = keyValues(i) val right = other.keyValues(i) val curDirection = directions(i) @@ -58,35 +59,33 @@ case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode { logger.debug(s"Comparing $left, $right as $curDataType order $curDirection") // TODO: Use numeric here too? val comparison = - if(left == null && right == null) + if (left == null && right == null) { 0 - else if(left == null) - if(curDirection == Ascending) - -1 - else - 1 - else if(right == null) - if(curDirection == Ascending) - 1 - else - -1 - else if(curDataType == IntegerType) - if(curDirection == Ascending) + } else if (left == null) { + if (curDirection == Ascending) -1 else 1 + } else if (right == null) { + if (curDirection == Ascending) 1 else -1 + } else if (curDataType == IntegerType) { + if (curDirection == Ascending) { left.asInstanceOf[Int] compare right.asInstanceOf[Int] - else + } else { right.asInstanceOf[Int] compare left.asInstanceOf[Int] - else if(curDataType == DoubleType) - if(curDirection == Ascending) + } + } else if (curDataType == DoubleType) { + if (curDirection == Ascending) { left.asInstanceOf[Double] compare right.asInstanceOf[Double] - else + } else { right.asInstanceOf[Double] compare left.asInstanceOf[Double] - else if(curDataType == StringType) - if(curDirection == Ascending) + } + } else if (curDataType == StringType) { + if (curDirection == Ascending) { left.asInstanceOf[String] compare right.asInstanceOf[String] - else + } else { right.asInstanceOf[String] compare left.asInstanceOf[String] - else + } + } else { sys.error(s"Comparison not yet implemented for: $curDataType") + } if(comparison != 0) return comparison i += 1 @@ -102,7 +101,7 @@ case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode { val sortKey = new SortKey(sortExprs.map(s => Evaluate(s.child, input)).toIndexedSeq) (sortKey, row) - }.sortByKey(true, numPartitions).map(_._2) + }.sortByKey(ascending = true, numPartitions).map(_._2) } def output = child.output diff --git a/src/main/scala/catalyst/execution/hiveOperators.scala b/src/main/scala/catalyst/execution/hiveOperators.scala index c3c490e611276..9827fae3bf4d7 100644 --- a/src/main/scala/catalyst/execution/hiveOperators.scala +++ b/src/main/scala/catalyst/execution/hiveOperators.scala @@ -4,16 +4,16 @@ package execution import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils import org.apache.hadoop.hive.ql.plan.FileSinkDesc +import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, StructObjectInspector} import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct -import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.mapred.JobConf import expressions.Attribute import util._ /* Implicits */ -import collection.JavaConversions._ +import scala.collection.JavaConversions._ case class HiveTableScan(attributes: Seq[Attribute], relation: MetastoreRelation) extends LeafNode { @transient @@ -31,20 +31,21 @@ case class HiveTableScan(attributes: Seq[Attribute], relation: MetastoreRelation * Functions that extract the requested attributes from the hive output. 
*/ @transient - protected lazy val attributeFunctions: Seq[(LazyStruct, Array[String]) => AnyRef] = attributes.map { a => - if (relation.partitionKeys.contains(a)) { - val ordinal = relation.partitionKeys.indexOf(a) - (struct: LazyStruct, partitionKeys: Array[String]) => partitionKeys(ordinal) - } else { - val ref = - objectInspector.getAllStructFieldRefs - .find(_.getFieldName == a.name) - .getOrElse(sys.error(s"Can't find attribute $a")) - - (struct: LazyStruct, _: Array[String]) => { - val data = objectInspector.getStructFieldData(struct, ref) - val inspector = ref.getFieldObjectInspector.asInstanceOf[PrimitiveObjectInspector] - inspector.getPrimitiveJavaObject(data) + protected lazy val attributeFunctions: Seq[(LazyStruct, Array[String]) => AnyRef] = { + attributes.map { a => + if (relation.partitionKeys.contains(a)) { + val ordinal = relation.partitionKeys.indexOf(a) + (struct: LazyStruct, partitionKeys: Array[String]) => partitionKeys(ordinal) + } else { + val ref = objectInspector.getAllStructFieldRefs + .find(_.getFieldName == a.name) + .getOrElse(sys.error(s"Can't find attribute $a")) + + (struct: LazyStruct, _: Array[String]) => { + val data = objectInspector.getStructFieldData(struct, ref) + val inspector = ref.getFieldObjectInspector.asInstanceOf[PrimitiveObjectInspector] + inspector.getPrimitiveJavaObject(data) + } } } } @@ -75,31 +76,37 @@ case class HiveTableScan(attributes: Seq[Attribute], relation: MetastoreRelation def output = attributes } -case class InsertIntoHiveTable(table: MetastoreRelation, partition: Map[String, String], child: SharkPlan) - (@transient sc: SharkContext) extends UnaryNode { +case class InsertIntoHiveTable( + table: MetastoreRelation, partition: Map[String, String], child: SharkPlan) + (@transient sc: SharkContext) + extends UnaryNode { + /** - * This file sink / record writer code is only the first step towards implementing this operator correctly and is not - * actually used yet. + * This file sink / record writer code is only the first step towards implementing this operator + * correctly and is not actually used yet. 
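+   *
+   * For now, execute() writes the child RDD out as '\001'-delimited text files in a temporary
+   * directory and then issues a Hive "LOAD DATA LOCAL INPATH ..." command to move them into the
+   * target table.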
*/ val desc = new FileSinkDesc("./", table.tableDesc, false) + val outputClass = { val serializer = table.tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] serializer.initialize(null, table.tableDesc.getProperties) serializer.getSerializedClass } - lazy val conf = new JobConf(); + lazy val conf = new JobConf() + lazy val writer = HiveFileFormatUtils.getHiveRecordWriter( conf, table.tableDesc, outputClass, desc, - new Path((new org.apache.hadoop.fs.RawLocalFileSystem).getWorkingDirectory(), "test.out"), + new Path((new org.apache.hadoop.fs.RawLocalFileSystem).getWorkingDirectory, "test.out"), null) override def otherCopyArgs = sc :: Nil def output = child.output + def execute() = { val childRdd = child.execute() assert(childRdd != null) @@ -108,12 +115,16 @@ case class InsertIntoHiveTable(table: MetastoreRelation, partition: Map[String, val tempDir = java.io.File.createTempFile("data", "tsv") tempDir.delete() tempDir.mkdir() - childRdd.map(_.map(a => stringOrNull(a.asInstanceOf[AnyRef])).mkString("\001")).saveAsTextFile(tempDir.getCanonicalPath) + childRdd.map(_.map(a => stringOrNull(a.asInstanceOf[AnyRef])).mkString("\001")) + .saveAsTextFile(tempDir.getCanonicalPath) + val partitionSpec = - if(partition.nonEmpty) + if (partition.nonEmpty) { s"PARTITION (${partition.map { case (k,v) => s"$k=$v" }.mkString(",")})" - else + } else { "" + } + sc.runHive(s"LOAD DATA LOCAL INPATH '${tempDir.getCanonicalPath}/*' INTO TABLE ${table.tableName} $partitionSpec") // It would be nice to just return the childRdd unchanged so insert operations could be chained, diff --git a/src/main/scala/catalyst/execution/joins.scala b/src/main/scala/catalyst/execution/joins.scala index 6fbb8a9893c4b..609b2ea7646ca 100644 --- a/src/main/scala/catalyst/execution/joins.scala +++ b/src/main/scala/catalyst/execution/joins.scala @@ -16,7 +16,8 @@ case class SparkEquiInnerJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], left: SharkPlan, - right: SharkPlan) extends BinaryNode { + right: SharkPlan) + extends BinaryNode { def output = left.output ++ right.output @@ -57,9 +58,13 @@ case class CartesianProduct(left: SharkPlan, right: SharkPlan) extends BinaryNod } } -case class BroadcastNestedLoopJoin(streamed: SharkPlan, broadcast: SharkPlan, joinType: JoinType, condition: Option[Expression]) - (@transient sc: SharkContext) extends BinaryNode { +case class BroadcastNestedLoopJoin( + streamed: SharkPlan, broadcast: SharkPlan, joinType: JoinType, condition: Option[Expression]) + (@transient sc: SharkContext) + extends BinaryNode { + override def otherCopyArgs = sc :: Nil + def output = left.output ++ right.output /** The Streamed Relation */ @@ -73,27 +78,28 @@ case class BroadcastNestedLoopJoin(streamed: SharkPlan, broadcast: SharkPlan, jo val streamedPlusMatches = streamed.execute().map { streamedRow => var i = 0 val matchedRows = new mutable.ArrayBuffer[Row] - val includedBroadcastTuples = new scala.collection.mutable.BitSet(broadcastedRelation.value.size) + val includedBroadcastTuples = new mutable.BitSet(broadcastedRelation.value.size) - while(i < broadcastedRelation.value.size) { + while (i < broadcastedRelation.value.size) { // TODO: One bitset per partition instead of per row. 
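        // Every broadcast-side row that matches is recorded in includedBroadcastTuples so that,
        // for RIGHT/FULL OUTER joins, the rows that never matched can be emitted afterwards.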
val broadcastedRow = broadcastedRelation.value(i) val includeRow = condition match { case None => true case Some(c) => Evaluate(c, Vector(streamedRow, broadcastedRow)).asInstanceOf[Boolean] } - if(includeRow) { + if (includeRow) { matchedRows += buildRow(streamedRow ++ broadcastedRow) includedBroadcastTuples += i } i += 1 } - val outputRows = if(matchedRows.size > 0) + val outputRows = if (matchedRows.size > 0) { matchedRows - else if(joinType == LeftOuter || joinType == FullOuter) + } else if(joinType == LeftOuter || joinType == FullOuter) { Vector(buildRow(streamedRow ++ Array.fill(right.output.size)(null))) - else + } else { Vector() + } (outputRows, includedBroadcastTuples) } @@ -105,14 +111,15 @@ case class BroadcastNestedLoopJoin(streamed: SharkPlan, broadcast: SharkPlan, jo streamedPlusMatches.map(_._2).reduce(_ ++ _) val rightOuterMatches: Seq[Row] = - if(joinType == RightOuter || joinType == FullOuter) + if (joinType == RightOuter || joinType == FullOuter) { broadcastedRelation.value.zipWithIndex.filter { case (row, i) => !allIncludedBroadcastTuples.contains(i) }.map { case (row, _) => buildRow(Vector.fill(left.output.size)(null) ++ row) } - else + } else { Vector() + } sc.union(streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches)) } diff --git a/src/main/scala/catalyst/execution/package.scala b/src/main/scala/catalyst/execution/package.scala index 2a073f3e49558..c05bdac4881fd 100644 --- a/src/main/scala/catalyst/execution/package.scala +++ b/src/main/scala/catalyst/execution/package.scala @@ -3,13 +3,13 @@ package catalyst import types._ /** - * A partial reimplementation of Shark, a Hive compatible SQL engine running on Spark, using Catalyst. + * A partial reimplementation of Shark, a Hive compatible SQL engine running on Spark. * - * This implementation uses the hive parser, metadata catalog and serdes, but performs all optimization and execution - * using catalyst and spark. + * This implementation uses the hive parser, metadata catalog and serdes, but performs all + * optimization and execution using catalyst and spark. * - * Currently functions that are not supported by this implementation are passed back to the original Shark - * implementation for execution. + * Currently functions that are not supported by this implementation are passed back to the + * original Shark implementation for execution. */ package object execution { type Row = catalyst.expressions.Row diff --git a/src/main/scala/catalyst/execution/planningStrategies.scala b/src/main/scala/catalyst/execution/planningStrategies.scala index 2615dfff5108b..5211feb45ee4a 100644 --- a/src/main/scala/catalyst/execution/planningStrategies.scala +++ b/src/main/scala/catalyst/execution/planningStrategies.scala @@ -49,7 +49,9 @@ trait PlanningStrategies { classOf[Count], classOf[Average]) - /** Returns true if [[exprs]] contains only aggregates that can be computed using Accumulators. */ + /** + * Returns true if [[exprs]] contains only aggregates that can be computed using Accumulators. 
+ */ def onlyAllowedAggregates(exprs: Seq[Expression]): Boolean = { val aggs = exprs.flatMap(_.collect { case a: AggregateExpression => a}).map(_.getClass) aggs.map(allowedAggregates contains _).reduceLeft(_ && _) @@ -66,9 +68,9 @@ trait PlanningStrategies { def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match { case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) => logger.debug(s"Considering join: ${predicates ++ condition}") - // Find equi-join predicates that can be evaluated before the join, and thus can be used as join keys. - // Note we can only mix in the conditions with other predicates because the match above ensures that this is - // and Inner join. + // Find equi-join predicates that can be evaluated before the join, and thus can be used + // as join keys. Note we can only mix in the conditions with other predicates because the + // match above ensures that this is and Inner join. val (joinPredicates, otherPredicates) = (predicates ++ condition).partition { case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || (canEvaluate(l, right) && canEvaluate(r, left)) => true @@ -81,17 +83,19 @@ trait PlanningStrategies { } // Do not consider this strategy if there are no join keys. - if(joinKeys.nonEmpty) { + if (joinKeys.nonEmpty) { val leftKeys = joinKeys.map(_._1) val rightKeys = joinKeys.map(_._2) - val joinOp = execution.SparkEquiInnerJoin(leftKeys, rightKeys, planLater(left), planLater(right)) + val joinOp = execution.SparkEquiInnerJoin( + leftKeys, rightKeys, planLater(left), planLater(right)) // Make sure other conditions are met if present. - if(otherPredicates.nonEmpty) + if (otherPredicates.nonEmpty) { execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil - else + } else { joinOp :: Nil + } } else { logger.debug(s"Avoiding spark join with no join keys.") Nil @@ -110,14 +114,16 @@ trait PlanningStrategies { object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match { case logical.Join(left, right, joinType, condition) => - execution.BroadcastNestedLoopJoin(planLater(left), planLater(right), joinType, condition)(sc) :: Nil + execution.BroadcastNestedLoopJoin( + planLater(left), planLater(right), joinType, condition)(sc) :: Nil case _ => Nil } } object CartesianProduct extends Strategy { def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match { - case logical.Join(left, right, _, None) => execution.CartesianProduct(planLater(left), planLater(right)) :: Nil + case logical.Join(left, right, _, None) => + execution.CartesianProduct(planLater(left), planLater(right)) :: Nil case logical.Join(left, right, Inner, Some(condition)) => execution.Filter(condition, execution.CartesianProduct(planLater(left), planLater(right))) :: Nil diff --git a/src/main/scala/catalyst/expressions/Evaluate.scala b/src/main/scala/catalyst/expressions/Evaluate.scala index 2a7427cfea7a7..314027fccaede 100644 --- a/src/main/scala/catalyst/expressions/Evaluate.scala +++ b/src/main/scala/catalyst/expressions/Evaluate.scala @@ -252,7 +252,7 @@ object Evaluate extends Logging { } /* Functions */ - case Rand => scala.util.Random.nextDouble + case Rand => scala.util.Random.nextDouble() /* UDFs */ case implementedFunction: ImplementedUdf => diff --git a/src/main/scala/catalyst/frontend/package.scala b/src/main/scala/catalyst/frontend/package.scala index 70952d1bedade..4d70c4b420d98 100644 --- a/src/main/scala/catalyst/frontend/package.scala +++ b/src/main/scala/catalyst/frontend/package.scala 
diff --git a/src/main/scala/catalyst/frontend/package.scala b/src/main/scala/catalyst/frontend/package.scala
index 70952d1bedade..4d70c4b420d98 100644
--- a/src/main/scala/catalyst/frontend/package.scala
+++ b/src/main/scala/catalyst/frontend/package.scala
@@ -1,7 +1,7 @@
 package catalyst

 /**
- * A collection of languages that can be parsed into a [[plans.logical.LogicalPlan]]. Currently only [[hive]]'s parser is
- * supported.
+ * A collection of languages that can be parsed into a [[catalyst.plans.logical.LogicalPlan]].
+ * Currently only [[hive]]'s parser is supported.
  */
-package object frontend
\ No newline at end of file
+package object frontend
diff --git a/src/main/scala/catalyst/plans/logical/basicOperators.scala b/src/main/scala/catalyst/plans/logical/basicOperators.scala
index 5b62eddfc8cdf..224e44d857051 100644
--- a/src/main/scala/catalyst/plans/logical/basicOperators.scala
+++ b/src/main/scala/catalyst/plans/logical/basicOperators.scala
@@ -30,7 +30,8 @@ case class Join(
   def output = left.output ++ right.output
 }

-case class InsertIntoTable(table: BaseRelation, partition: Map[String, String], child: LogicalPlan) extends LogicalPlan {
+case class InsertIntoTable(table: BaseRelation, partition: Map[String, String], child: LogicalPlan)
+  extends LogicalPlan {
   // The table being inserted into is a child for the purposes of transformations.
   def children = table :: child :: Nil
   def references = Set.empty
@@ -50,7 +51,8 @@ case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
 case class Aggregate(
     groupingExpressions: Seq[Expression],
     aggregateExpressions: Seq[NamedExpression],
-    child: LogicalPlan) extends UnaryNode {
+    child: LogicalPlan)
+  extends UnaryNode {

   def output = aggregateExpressions.map(_.toAttribute)
   def references = child.references

From b367e368544f7f4b03f228e120edc8bfbd9cdffe Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 3 Jan 2014 14:00:22 -0800
Subject: [PATCH 6/8] Replaced the use of ??? with UnsupportedOperationException.

---
 src/main/scala/catalyst/expressions/Row.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/catalyst/expressions/Row.scala b/src/main/scala/catalyst/expressions/Row.scala
index 7a39508d973be..edff359d7fca4 100644
--- a/src/main/scala/catalyst/expressions/Row.scala
+++ b/src/main/scala/catalyst/expressions/Row.scala
@@ -25,18 +25,18 @@ abstract class Row extends Seq[Any] with Serializable {
  * A row with no data. Calling any methods will result in an error. Can be used as a placeholder.
  */
 object EmptyRow extends Row {
-  def apply(i: Int): Any = ???
+  def apply(i: Int): Any = throw new UnsupportedOperationException

   def iterator = Iterator.empty
   def length = 0
-  def isNullAt(i: Int): Boolean = ???
+  def isNullAt(i: Int): Boolean = throw new UnsupportedOperationException

-  def getInt(i: Int): Int = ???
-  def getLong(i: Int): Long = ???
-  def getDouble(i: Int): Double = ???
-  def getBoolean(i: Int): Boolean = ???
-  def getShort(i: Int): Short = ???
-  def getByte(i: Int): Byte = ???
+  def getInt(i: Int): Int = throw new UnsupportedOperationException
+  def getLong(i: Int): Long = throw new UnsupportedOperationException
+  def getDouble(i: Int): Double = throw new UnsupportedOperationException
+  def getBoolean(i: Int): Boolean = throw new UnsupportedOperationException
+  def getShort(i: Int): Short = throw new UnsupportedOperationException
+  def getByte(i: Int): Byte = throw new UnsupportedOperationException
 }

 /**

From f0e1cbf0d3a19f68a7443bcbecaae086bc884f29 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 3 Jan 2014 14:01:57 -0800
Subject: [PATCH 7/8] Added resolved lazy val to LogicalPlan.
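
The intent, spelled out by the hunks below, is that a plan is resolved only when every expression it contains and every child plan is itself resolved, so a single unresolved relation or attribute anywhere in the tree keeps the whole plan unresolved until the analyzer fills it in. A rough, self-contained illustration of that bottom-up propagation follows; Node, Relation and Project are simplified stand-ins rather than the catalyst classes, and expressions are left out for brevity.

    object ResolvedPropagationSketch {
      // Simplified plan nodes (not the catalyst classes): `resolved` is derived
      // bottom-up from the children, and an unresolved leaf keeps the whole tree
      // unresolved until the analyzer replaces it.
      sealed trait Node {
        def children: Seq[Node]
        def childrenResolved: Boolean = !children.exists(!_.resolved)
        def resolved: Boolean = childrenResolved
      }
      case class UnresolvedRelation(name: String) extends Node {
        def children = Nil
        override def resolved = false
      }
      case class Relation(name: String) extends Node { def children = Nil }
      case class Project(child: Node) extends Node { def children = child :: Nil }

      def main(args: Array[String]): Unit = {
        println(Project(UnresolvedRelation("t")).resolved) // false: leaf still unresolved
        println(Project(Relation("t")).resolved)           // true: every child is resolved
      }
    }
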
---
 .../scala/catalyst/analysis/unresolved.scala       |  6 ++++--
 .../catalyst/execution/MetastoreCatalog.scala      |  1 +
 .../catalyst/expressions/Expression.scala          | 12 +++++++++---
 .../catalyst/expressions/nullFunctions.scala       | 17 ++++++++++-------
 src/main/scala/catalyst/plans/QueryPlan.scala      |  6 +++++-
 .../catalyst/plans/logical/LogicalPlan.scala       | 18 ++++++++++++------
 6 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/src/main/scala/catalyst/analysis/unresolved.scala b/src/main/scala/catalyst/analysis/unresolved.scala
index 0d4c45dfaa3b2..80b18c281ec14 100644
--- a/src/main/scala/catalyst/analysis/unresolved.scala
+++ b/src/main/scala/catalyst/analysis/unresolved.scala
@@ -6,7 +6,8 @@ import plans.logical.BaseRelation
 import trees.TreeNode

 /**
- * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully resolved.
+ * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
+ * resolved.
  */
 class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) extends
   errors.OptimizationException(tree, s"Invalid call to $function on unresolved object")
@@ -16,6 +17,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  */
 case class UnresolvedRelation(name: String, alias: Option[String] = None) extends BaseRelation {
   def output = Nil
+  override lazy val resolved = false
 }

 /**
@@ -82,4 +84,4 @@ case class Star(
   }

   override def toString = table.map(_ + ".").getOrElse("") + "*"
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/catalyst/execution/MetastoreCatalog.scala b/src/main/scala/catalyst/execution/MetastoreCatalog.scala
index 33f1abeaaed72..b18689614b242 100644
--- a/src/main/scala/catalyst/execution/MetastoreCatalog.scala
+++ b/src/main/scala/catalyst/execution/MetastoreCatalog.scala
@@ -28,6 +28,7 @@ class HiveMetastoreCatalog(hiveConf: HiveConf) extends Catalog {
     val hiveQlTable = new org.apache.hadoop.hive.ql.metadata.Table(table)

     val partitions = if (hiveQlTable.isPartitioned) {
+      // TODO: Is 255 the right number to pick?
       client.listPartitions(databaseName, tableName, 255).toSeq
     } else {
       Nil
diff --git a/src/main/scala/catalyst/expressions/Expression.scala b/src/main/scala/catalyst/expressions/Expression.scala
index db2c95aecd673..eaed3c9a494d8 100644
--- a/src/main/scala/catalyst/expressions/Expression.scala
+++ b/src/main/scala/catalyst/expressions/Expression.scala
@@ -12,12 +12,17 @@ abstract class Expression extends TreeNode[Expression] {
   def references: Set[Attribute]

   /**
-   * Returns true if this expression and all its children have been resolved to a specific schema and false if it is
-   * still contains any unresolved placeholders.
+   * Returns true if this expression and all its children have been resolved to a specific schema
+   * and false if it still contains any unresolved placeholders. Implementations of expressions
+   * should override this.
    */
   lazy val resolved: Boolean = childrenResolved

-  def childrenResolved = children.map(_.resolved).reduceLeftOption(_&&_).getOrElse(true)
+  /**
+   * Returns true if all the children of this expression have been resolved to a specific schema
+   * and false if it still contains any unresolved placeholders.
+   */
+  def childrenResolved = !children.exists(!_.resolved)
 }

 abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
@@ -36,5 +41,6 @@ abstract class LeafExpression extends Expression with trees.LeafNode[Expression]

 abstract class UnaryExpression extends Expression with trees.UnaryNode[Expression] {
   self: Product =>

+  def references = child.references
 }
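
Both childrenResolved definitions in this patch, as well as the Coalesce.nullable change below, rely on the same double-negation idiom: !xs.exists(!p(_)) is equivalent to xs.forall(p) and is vacuously true for an empty sequence, which is exactly the behaviour the old reduceLeftOption(...).getOrElse(true) spelled out by hand. A tiny stand-alone check (allResolved is an illustrative name, not catalyst code):

    object ForallIdiomSketch {
      // Same shape as childrenResolved: "no child is unresolved".
      def allResolved(flags: Seq[Boolean]): Boolean = !flags.exists(!_)

      def main(args: Array[String]): Unit = {
        println(allResolved(Seq(true, true)))  // true
        println(allResolved(Seq(true, false))) // false
        println(allResolved(Nil))              // true: no children, nothing left to resolve
      }
    }
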
diff --git a/src/main/scala/catalyst/expressions/nullFunctions.scala b/src/main/scala/catalyst/expressions/nullFunctions.scala
index f9308ab0c5c67..cab75526539f4 100644
--- a/src/main/scala/catalyst/expressions/nullFunctions.scala
+++ b/src/main/scala/catalyst/expressions/nullFunctions.scala
@@ -4,7 +4,10 @@ package expressions
 import catalyst.analysis.UnresolvedException

 case class Coalesce(children: Seq[Expression]) extends Expression {
-  def nullable = children.map(_.nullable).reduce(_&&_)
+
+  /** Coalesce is nullable if all of its children are nullable, or if it has no children. */
+  def nullable = !children.exists(!_.nullable)
+
   def references = children.flatMap(_.references).toSet

   // Only resolved if all the children are of the same type.
@@ -12,9 +15,9 @@ case class Coalesce(children: Seq[Expression]) extends Expression {

   override def toString = s"Coalesce(${children.mkString(",")})"

-  def dataType =
-    if(resolved)
-      children.head.dataType
-    else
-      throw new UnresolvedException(this, "Coalesce cannot have children of different types.")
-}
\ No newline at end of file
+  def dataType = if (resolved) {
+    children.head.dataType
+  } else {
+    throw new UnresolvedException(this, "Coalesce cannot have children of different types.")
+  }
+}
diff --git a/src/main/scala/catalyst/plans/QueryPlan.scala b/src/main/scala/catalyst/plans/QueryPlan.scala
index c099a777ef4aa..54d54e42164aa 100644
--- a/src/main/scala/catalyst/plans/QueryPlan.scala
+++ b/src/main/scala/catalyst/plans/QueryPlan.scala
@@ -9,10 +9,14 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy

   def output: Seq[Attribute]

+  /**
+   * Returns the set of attributes that are output by this node.
+   */
+  def outputSet: Set[Attribute] = output.toSet
+
   /**
    * Runs [[transform]] with [[rule]] on all expressions present in this query operator.
    * @param rule the rule to be applied to every expression in this operator.
-   * @return
    */
   def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
     var changed = false
diff --git a/src/main/scala/catalyst/plans/logical/LogicalPlan.scala b/src/main/scala/catalyst/plans/logical/LogicalPlan.scala
index fdd1428601cf6..8f6dca8a80a59 100644
--- a/src/main/scala/catalyst/plans/logical/LogicalPlan.scala
+++ b/src/main/scala/catalyst/plans/logical/LogicalPlan.scala
@@ -4,7 +4,6 @@ package logical

 import expressions.Attribute
 import errors._
-import trees._

 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
@@ -15,17 +14,24 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
    */
   def references: Set[Attribute]

-  /**
-   * Returns the set of attributes that are output by this node.
-   */
-  def outputSet: Set[Attribute] = output.toSet
-
   /**
    * Returns the set of attributes that this node takes as
    * input from its children.
    */
   def inputSet: Set[Attribute] = children.flatMap(_.output).toSet

+  /**
+   * Returns true if this plan and all its children have been resolved to a specific schema and
+   * false if it still contains any unresolved placeholders. Implementations of LogicalPlan
+   * can override this (e.g. [[UnresolvedRelation]] can set this to false).
+   */
+  lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
+
+  /**
+   * Returns true if all the children of this query plan have been resolved.
+   */
+  def childrenResolved = !children.exists(!_.resolved)
+
   def resolve(name: String): Option[Attribute] = {
     val parts = name.split("\\.")
     val options = children.flatMap(_.output).filter { option =>
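
The Coalesce change above pins down two behaviours: the expression is nullable only when every child is nullable, since it returns its first non-null child, and dataType is only well defined once all children resolve to the same type. A small sketch of the first-non-null contract using plain Options instead of catalyst expressions (coalesce here is an illustrative helper, not the class in nullFunctions.scala):

    object CoalesceSketch {
      // Coalesce returns its first non-null argument, so the result is null only
      // when every argument is null; with no arguments at all there is nothing to
      // return, which is why an empty child list is also treated as nullable.
      def coalesce[A](children: Seq[Option[A]]): Option[A] =
        children.collectFirst { case Some(v) => v }

      def main(args: Array[String]): Unit = {
        println(coalesce(Seq(None, Some(1), Some(2)))) // Some(1)
        println(coalesce(Seq(None, None)))             // None
        println(coalesce[Int](Nil))                    // None
      }
    }
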

From 52864da52a647599c8d2b6c6ba411675b3a295f4 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 3 Jan 2014 14:08:35 -0800
Subject: [PATCH 8/8] Added executeCollect method to SharkPlan.

---
 src/main/scala/catalyst/execution/SharkPlan.scala      | 8 +++++++-
 src/main/scala/catalyst/execution/aggregates.scala     | 8 +++++---
 src/main/scala/catalyst/execution/basicOperators.scala | 6 +++++-
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/catalyst/execution/SharkPlan.scala b/src/main/scala/catalyst/execution/SharkPlan.scala
index 0f74b2147104d..6116d8397b090 100644
--- a/src/main/scala/catalyst/execution/SharkPlan.scala
+++ b/src/main/scala/catalyst/execution/SharkPlan.scala
@@ -1,9 +1,10 @@
 package catalyst
 package execution

-import catalyst.plans.QueryPlan
 import org.apache.spark.rdd.RDD

+import catalyst.plans.QueryPlan
+
 abstract class SharkPlan extends QueryPlan[SharkPlan] with Logging {
   self: Product =>

@@ -12,6 +13,11 @@ abstract class SharkPlan extends QueryPlan[SharkPlan] with Logging {
    */
   def execute(): RDD[Row]

+  /**
+   * Runs this query returning the result as an array.
+   */
+  def executeCollect(): Array[Row] = execute().collect()
+
   protected def buildRow(values: Seq[Any]): Row =
     new catalyst.expressions.GenericRow(values)
 }
diff --git a/src/main/scala/catalyst/execution/aggregates.scala b/src/main/scala/catalyst/execution/aggregates.scala
index bd2a38eb0903e..3e5e65f526c7c 100644
--- a/src/main/scala/catalyst/execution/aggregates.scala
+++ b/src/main/scala/catalyst/execution/aggregates.scala
@@ -169,7 +169,7 @@ case class SparkAggregate(aggregateExprs: Seq[NamedExpression], child: SharkPlan
     def result: Any = count.value.toLong
   }

-  def execute() = attachTree(this, "SparkAggregate") {
+  override def executeCollect() = attachTree(this, "SparkAggregate") {
     // Replace all aggregate expressions with spark functions that will compute the result.
     val aggImplementations = aggregateExprs.map { _ transform {
       case base @ Average(expr) => new AverageFunction(expr, base)
@@ -185,6 +185,8 @@ case class SparkAggregate(aggregateExprs: Seq[NamedExpression], child: SharkPlan
       val input = Vector(row)
       aggFunctions.foreach(_.apply(input))
     }
-    sc.makeRDD(Seq(buildRow(aggImplementations.map(Evaluate(_, Nil)))), 1)
+    Array(buildRow(aggImplementations.map(Evaluate(_, Nil))))
   }
-}
\ No newline at end of file
+
+  def execute() = sc.makeRDD(executeCollect(), 1)
+}
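
The pattern in this commit is that executeCollect() is the driver-side entry point and defaults to execute().collect(), while operators whose results are already materialized on the driver (SparkAggregate above, StopAfter below) override executeCollect() and reduce execute() to re-parallelizing that array. A stripped-down sketch of the relationship, with RDDs replaced by ordinary collections; Plan and Limit are illustrative stand-ins, not SharkPlan and StopAfter.

    object ExecuteCollectSketch {
      type Row = Seq[Any]

      // Stand-in for SharkPlan: execute() is the distributed path (think RDD[Row]),
      // executeCollect() the driver-side path, and by default the latter just
      // collects the former.
      trait Plan {
        def execute(): Vector[Row]
        def executeCollect(): Array[Row] = execute().toArray
      }

      // Stand-in for StopAfter/SparkAggregate: the result is computed directly on
      // the driver, so executeCollect() is overridden and execute() simply
      // re-parallelizes that array (think sc.makeRDD(executeCollect(), 1)).
      class Limit(limit: Int, child: Plan) extends Plan {
        override def executeCollect(): Array[Row] = child.execute().take(limit).toArray
        def execute(): Vector[Row] = executeCollect().toVector
      }

      def main(args: Array[String]): Unit = {
        val scan = new Plan { def execute(): Vector[Row] = Vector(Seq(1), Seq(2), Seq(3)) }
        println(new Limit(2, scan).executeCollect().toList) // List(List(1), List(2))
      }
    }

Presumably the point of the override is to let collect-style consumers skip the detour through sc.makeRDD when the result is already local.
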
diff --git a/src/main/scala/catalyst/execution/basicOperators.scala b/src/main/scala/catalyst/execution/basicOperators.scala
index e7146b4c9f48e..2dee6b01e2fc5 100644
--- a/src/main/scala/catalyst/execution/basicOperators.scala
+++ b/src/main/scala/catalyst/execution/basicOperators.scala
@@ -37,8 +37,12 @@ case class StopAfter(limit: Int, child: SharkPlan)(@transient sc: SharkContext)
   override def otherCopyArgs = sc :: Nil

   def output = child.output
+
+  override def executeCollect() = child.execute().take(limit)
+
+  // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  def execute() = sc.makeRDD(child.execute().take(limit),1)
+  def execute() = sc.makeRDD(executeCollect(), 1)
 }

 case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode {