Merge branch 'master' of github.com:apache/spark into equals
Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
rxin committed Jun 20, 2014
2 parents bd19807 + f479cf3 commit 326b388
Showing 23 changed files with 2,409 additions and 427 deletions.

@@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected case class Keyword(str: String)

protected implicit def asParser(k: Keyword): Parser[String] =
allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected class SqlLexical extends StdLexical {
case class FloatLit(chars: String) extends Token {
override def toString = chars
}
override lazy val token: Parser[Token] = (
identChar ~ rep( identChar | digit ) ^^
{ case first ~ rest => processIdent(first :: rest mkString "") }
| rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
case i ~ None => NumericLit(i mkString "")
case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
}
| '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
{ case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
| '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
{ case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '\"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar = letter | elem('.') | elem('_')

override def whitespace: Parser[Any] = rep(
whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
| '#' ~ rep( chrExcept(EofCh, '\n') )
| '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
| '/' ~ '*' ~ failure("unclosed comment")
)
}

override val lexical = new SqlLexical
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
@@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
this.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
.map(_.invoke(this).asInstanceOf[Keyword])

/** Generate all variations of upper and lower case of a given string */
private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
.map(_.invoke(this).asInstanceOf[Keyword].str)

lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str))

lexical.delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]"
)
override val lexical = new SqlLexical(reservedWords)

protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)

protected lazy val baseExpression: PackratParser[Expression] =
expression ~ "[" ~ expression <~ "]" ^^ {
expression ~ "[" ~ expression <~ "]" ^^ {
case base ~ _ ~ ordinal => GetItem(base, ordinal)
} |
TRUE ^^^ Literal(true, BooleanType) |
@@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val dataType: Parser[DataType] =
STRING ^^^ StringType
}

class SqlLexical(val keywords: Seq[String]) extends StdLexical {
case class FloatLit(chars: String) extends Token {
override def toString = chars
}

reserved ++= keywords.flatMap(w => allCaseVersions(w))

delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]"
)

override lazy val token: Parser[Token] = (
identChar ~ rep( identChar | digit ) ^^
{ case first ~ rest => processIdent(first :: rest mkString "") }
| rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
case i ~ None => NumericLit(i mkString "")
case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
}
| '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
{ case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
| '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
{ case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '\"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar = letter | elem('_') | elem('.')

override def whitespace: Parser[Any] = rep(
whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
| '#' ~ rep( chrExcept(EofCh, '\n') )
| '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
| '/' ~ '*' ~ failure("unclosed comment")
)

/** Generate all variations of upper and lower case of a given string */
def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
}
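
For reference, here is a minimal, self-contained sketch (no Spark dependencies; the demo object name is made up) of the case-expansion logic used above, showing why the lexer ends up accepting keywords in any letter case:

```scala
object AllCaseVersionsDemo extends App {
  // Mirrors SqlLexical.allCaseVersions: every upper/lower-case combination of the input.
  def allCaseVersions(s: String, prefix: String = ""): Stream[String] =
    if (s == "") Stream(prefix)
    else allCaseVersions(s.tail, prefix + s.head.toLower) ++
         allCaseVersions(s.tail, prefix + s.head.toUpper)

  println(allCaseVersions("as").toList) // List(as, aS, As, AS)
}
```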

@@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
null
} else {
if (child.dataType.isInstanceOf[ArrayType]) {
// TODO: consider using Array[_] for ArrayType child to avoid
// boxing of primitives
val baseValue = value.asInstanceOf[Seq[_]]
val o = key.asInstanceOf[Int]
if (o >= baseValue.size || o < 0) {

@@ -60,3 +60,19 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
* @param table The table to be described.
 * @param isExtended True if "DESCRIBE EXTENDED" is used, false otherwise.
 *                   Only takes effect when the table is a Hive table.
*/
case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends Command {
override def output = Seq(
// Column names are based on Hive.
BoundReference(0, AttributeReference("col_name", StringType, nullable = false)()),
BoundReference(1, AttributeReference("data_type", StringType, nullable = false)()),
BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
}

@@ -19,35 +19,108 @@ package org.apache.spark.sql.catalyst.types

import java.sql.Timestamp

import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.parsing.combinator.RegexParsers

import org.apache.spark.sql.catalyst.expressions.Expression
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.util.Utils

/**
 * Utility for parsing the string representation of a [[DataType]] back into a
 * `DataType` instance.
 */
object DataType extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"StringType" ^^^ StringType |
"FloatType" ^^^ FloatType |
"IntegerType" ^^^ IntegerType |
"ByteType" ^^^ ByteType |
"ShortType" ^^^ ShortType |
"DoubleType" ^^^ DoubleType |
"LongType" ^^^ LongType |
"BinaryType" ^^^ BinaryType |
"BooleanType" ^^^ BooleanType |
"DecimalType" ^^^ DecimalType |
"TimestampType" ^^^ TimestampType

protected lazy val arrayType: Parser[DataType] =
"ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType

protected lazy val mapType: Parser[DataType] =
"MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
case t1 ~ _ ~ t2 => MapType(t1, t2)
}

protected lazy val structField: Parser[StructField] =
("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
case name ~ tpe ~ nullable =>
StructField(name, tpe, nullable = nullable)
}

protected lazy val boolVal: Parser[Boolean] =
"true" ^^^ true |
"false" ^^^ false


protected lazy val structType: Parser[DataType] =
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
case fields => new StructType(fields)
}

protected lazy val dataType: Parser[DataType] =
arrayType |
mapType |
structType |
primitiveType

/**
* Parses a string representation of a DataType.
*
* TODO: Generate parser as pickler...
*/
def apply(asString: String): DataType = parseAll(dataType, asString) match {
case Success(result, _) => result
case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
}
}
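
As a usage sketch (assuming the catalyst types are on the classpath, with input strings chosen to match the grammar above), the new parser turns the `toString` form of a type back into a `DataType`:

```scala
import org.apache.spark.sql.catalyst.types._

// Primitive and array types parse directly from their toString form.
val arrayOfStrings: DataType = DataType("ArrayType(StringType)")
// arrayOfStrings == ArrayType(StringType)

// Struct types round-trip as well.
val struct: DataType = DataType(
  "StructType(List(StructField(key,IntegerType,false), StructField(value,StringType,true)))")
// struct == StructType(Seq(
//   StructField("key", IntegerType, nullable = false),
//   StructField("value", StringType, nullable = true)))
```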

abstract class DataType {
/** Matches any expression that evaluates to this DataType */
def unapply(a: Expression): Boolean = a match {
case e: Expression if e.dataType == this => true
case _ => false
}

def isPrimitive: Boolean = false
}
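
A short sketch of how this `unapply` is typically used: the case objects below can act as extractors over expressions, matching on an expression's data type (the `Literal` constructor used here also appears elsewhere in this diff):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.types._

val e = Literal(42, IntegerType)

// `case IntegerType()` calls DataType.unapply, which checks e.dataType == IntegerType.
val label = e match {
  case IntegerType() => "int expression"
  case StringType()  => "string expression"
  case _             => "something else"
}
// label == "int expression"
```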

case object NullType extends DataType

trait PrimitiveType extends DataType {
override def isPrimitive = true
}

abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]

@transient val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}
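
The `classTag` above derives a `ClassTag` from the `TypeTag` via the runtime mirror; a generic, standalone sketch of the same technique (the helper name is made up):

```scala
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

object ClassTagFromTypeTag {
  // Derive a ClassTag from a TypeTag via the runtime mirror, as NativeType.classTag does.
  def classTagOf[T: TypeTag]: ClassTag[T] = {
    val mirror = runtimeMirror(getClass.getClassLoader)
    ClassTag[T](mirror.runtimeClass(typeOf[T]))
  }
}

// ClassTagFromTypeTag.classTagOf[String] yields a ClassTag[String]
```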

case object StringType extends NativeType {
case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
case object BinaryType extends DataType {
case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
}
case object BooleanType extends NativeType {
case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
@@ -63,7 +136,7 @@ case object TimestampType extends NativeType {
}
}

abstract class NumericType extends NativeType {
abstract class NumericType extends NativeType with PrimitiveType {
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
@@ -154,6 +227,17 @@ case object FloatType extends FractionalType {
case class ArrayType(elementType: DataType) extends DataType

case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class StructType(fields: Seq[StructField]) extends DataType

object StructType {
def fromAttributes(attributes: Seq[Attribute]): StructType = {
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
}

// def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
}

case class StructType(fields: Seq[StructField]) extends DataType {
def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
}

case class MapType(keyType: DataType, valueType: DataType) extends DataType
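
A brief sketch of the new `StructType` helpers (the attribute names are invented for illustration): attributes convert to a schema and back, with `toAttributes` minting fresh attribute references:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types._

// Hypothetical relation output: (key INT NOT NULL, value STRING).
val attrs = Seq(
  AttributeReference("key", IntegerType, nullable = false)(),
  AttributeReference("value", StringType, nullable = true)())

// Attributes -> schema.
val schema = StructType.fromAttributes(attrs)
// schema == StructType(Seq(
//   StructField("key", IntegerType, false), StructField("value", StringType, true)))

// Schema -> attributes (the empty second argument list supplies fresh expression ids).
val regenerated = schema.toAttributes
```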

@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))
new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))

/**
* Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].

@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
new JavaSchemaRDD(
sqlContext,
ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))

/**
* Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].

@@ -154,7 +154,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
// Note: overwrite=false because otherwise the metadata we just created will be deleted
InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>

@@ -121,3 +121,24 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:

override def output: Seq[Attribute] = Seq.empty
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
Seq(("# Registered as a temporary table", null, null)) ++
child.output.map(field => (field.name, field.dataType.toString, null))
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}
}
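
To illustrate the row shape without a running context, a self-contained sketch (column names assumed) of what `sideEffectResult` yields for a two-column child plan:

```scala
// Assumed child output: (name: StringType, age: IntegerType).
// Mirrors sideEffectResult: a header row, then (col_name, data_type, comment) per column.
val childColumns = Seq("name" -> "StringType", "age" -> "IntegerType")

val described: Seq[(String, String, String)] =
  Seq(("# Registered as a temporary table", null, null)) ++
    childColumns.map { case (n, t) => (n, t, null) }

described.foreach(println)
// (# Registered as a temporary table,null,null)
// (name,StringType,null)
// (age,IntegerType,null)
```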