SPARK-1293 [SQL] Parquet support for nested types
It should be possible to import and export data stored in Parquet's columnar format that contains nested types. For example:
```java
message AddressBook {
   required binary owner;
   optional group ownerPhoneNumbers {
      repeated binary array;
   }
   optional group contacts {
      repeated group array {
         required binary name;
         optional binary phoneNumber;
      }
   }
   optional group nameToApartmentNumber {
      repeated group map {
         required binary key;
         required int32 value;
      }
   }
}
```
The example models a type (AddressBook) that contains records made of strings (owner), lists (ownerPhoneNumbers), and a table of contacts (e.g., a list of pairs, or a map whose values may be null but whose keys must not be); a sketch of the corresponding Catalyst schema appears after the first task list below. The tasks are as follows:

<h6>Implement support for converting nested Parquet types to Spark/Catalyst types:</h6>
- [x] Structs
- [x] Lists
- [x] Maps (note: currently keys need to be Strings)
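
For orientation, here is a sketch (not part of the patch) of the Catalyst schema the AddressBook example is expected to map to, assuming Parquet `binary` fields are read as strings and that nullability follows the `required`/`optional` modifiers:

```scala
import org.apache.spark.sql.catalyst.types._

// Hypothetical Catalyst equivalent of the AddressBook Parquet schema above.
val addressBookType = StructType(Seq(
  StructField("owner", StringType, nullable = false),
  StructField("ownerPhoneNumbers", ArrayType(StringType), nullable = true),
  StructField("contacts",
    ArrayType(StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("phoneNumber", StringType, nullable = true)))),
    nullable = true),
  StructField("nameToApartmentNumber", MapType(StringType, IntegerType), nullable = true)))
```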

<h6>Implement import (via ``parquetFile``) of nested Parquet types (first version in this PR)</h6>
- [x] Initial version

<h6>Implement export (via ``saveAsParquetFile``)</h6>
- [x] Initial version

<h6>Test support for AvroParquet, etc.</h6>
- [x] Initial testing of import of avro-generated Parquet data (simple + nested)

Example:
```scala
val data = TestSQLContext
  .parquetFile("input.dir")
  .toSchemaRDD
data.registerAsTable("data")
sql("SELECT owner, contacts[1].name, nameToApartmentNumber['John'] FROM data").collect()
```
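
A sketch of the export side (the ``saveAsParquetFile`` task above); the query, table names, and output path are placeholders:

```scala
// Hypothetical round trip: write a nested result back out and read it again.
val result = sql("SELECT owner, contacts FROM data")
result.saveAsParquetFile("output.dir")
TestSQLContext
  .parquetFile("output.dir")
  .registerAsTable("reloaded")
sql("SELECT contacts[1].name FROM reloaded").collect()
```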

Author: Andre Schumacher <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes apache#360 from AndreSchumacher/nested_parquet and squashes the following commits:

30708c8 [Andre Schumacher] Taking out AvroParquet test for now to remove Avro dependency
95c1367 [Andre Schumacher] Changes to ParquetRelation and its metadata
7eceb67 [Andre Schumacher] Review feedback
94eea3a [Andre Schumacher] Scalastyle
403061f [Andre Schumacher] Fixing some issues with tests and schema metadata
b8a8b9a [Andre Schumacher] More fixes to short and byte conversion
63d1b57 [Andre Schumacher] Cleaning up and Scalastyle
88e6bdb [Andre Schumacher] Attempting to fix loss of schema
37e0a0a [Andre Schumacher] Cleaning up
14c3fd8 [Andre Schumacher] Attempting to fix Spark-Parquet schema conversion
3e1456c [Michael Armbrust] WIP: Directly serialize catalyst attributes.
f7aeba3 [Michael Armbrust] [SPARK-1982] Support for ByteType and ShortType.
3104886 [Michael Armbrust] Nested Rows should be Rows, not Seqs.
3c6b25f [Andre Schumacher] Trying to reduce no-op changes wrt master
31465d6 [Andre Schumacher] Scalastyle: fixing commented out bottom
de02538 [Andre Schumacher] Cleaning up ParquetTestData
2f5a805 [Andre Schumacher] Removing stripMargin from test schemas
191bc0d [Andre Schumacher] Changing to Seq for ArrayType, refactoring SQLParser for nested field extension
cbb5793 [Andre Schumacher] Code review feedback
32229c7 [Andre Schumacher] Removing Row nested values and placing by generic types
0ae9376 [Andre Schumacher] Doc strings and simplifying ParquetConverter.scala
a6b4f05 [Andre Schumacher] Cleaning up ArrayConverter, moving classTag to NativeType, adding NativeRow
431f00f [Andre Schumacher] Fixing problems introduced during rebase
c52ff2c [Andre Schumacher] Adding native-array converter
619c397 [Andre Schumacher] Completing Map testcase
79d81d5 [Andre Schumacher] Replacing field names for array and map in WriteSupport
f466ff0 [Andre Schumacher] Added ParquetAvro tests and revised Array conversion
adc1258 [Andre Schumacher] Optimizing imports
e99cc51 [Andre Schumacher] Fixing nested WriteSupport and adding tests
1dc5ac9 [Andre Schumacher] First version of WriteSupport for nested types
d1911dc [Andre Schumacher] Simplifying ArrayType conversion
f777b4b [Andre Schumacher] Scalastyle
824500c [Andre Schumacher] Adding attribute resolution for MapType
b539fde [Andre Schumacher] First commit for MapType
a594aed [Andre Schumacher] Scalastyle
4e25fcb [Andre Schumacher] Adding resolution of complex ArrayTypes
f8f8911 [Andre Schumacher] For primitive rows fall back to more efficient converter, code reorg
6dbc9b7 [Andre Schumacher] Fixing some problems intruduced during rebase
b7fcc35 [Andre Schumacher] Documenting conversions, bugfix, wrappers of Rows
ee70125 [Andre Schumacher] fixing one problem with arrayconverter
98219cf [Andre Schumacher] added struct converter
5d80461 [Andre Schumacher] fixing one problem with nested structs and breaking up files
1b1b3d6 [Andre Schumacher] Fixing one problem with nested arrays
ddb40d2 [Andre Schumacher] Extending tests for nested Parquet data
745a42b [Andre Schumacher] Completing testcase for nested data (Addressbook(
6125c75 [Andre Schumacher] First working nested Parquet record input
4d4892a [Andre Schumacher] First commit nested Parquet read converters
aa688fe [Andre Schumacher] Adding conversion of nested Parquet schemas
AndreSchumacher authored and rxin committed Jun 20, 2014
1 parent f397e92 commit f479cf3
Showing 14 changed files with 2,102 additions and 384 deletions.
@@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected case class Keyword(str: String)

protected implicit def asParser(k: Keyword): Parser[String] =
allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected class SqlLexical extends StdLexical {
case class FloatLit(chars: String) extends Token {
override def toString = chars
}
override lazy val token: Parser[Token] = (
identChar ~ rep( identChar | digit ) ^^
{ case first ~ rest => processIdent(first :: rest mkString "") }
| rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
case i ~ None => NumericLit(i mkString "")
case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
}
| '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
{ case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
| '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
{ case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '\"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar = letter | elem('.') | elem('_')

override def whitespace: Parser[Any] = rep(
whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
| '#' ~ rep( chrExcept(EofCh, '\n') )
| '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
| '/' ~ '*' ~ failure("unclosed comment")
)
}

override val lexical = new SqlLexical
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
@@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
this.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
.map(_.invoke(this).asInstanceOf[Keyword])

/** Generate all variations of upper and lower case of a given string */
private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
.map(_.invoke(this).asInstanceOf[Keyword].str)

lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str))

lexical.delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]"
)
override val lexical = new SqlLexical(reservedWords)

protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)

protected lazy val baseExpression: PackratParser[Expression] =
expression ~ "[" ~ expression <~ "]" ^^ {
expression ~ "[" ~ expression <~ "]" ^^ {
case base ~ _ ~ ordinal => GetItem(base, ordinal)
} |
TRUE ^^^ Literal(true, BooleanType) |
@@ -399,3 +348,55 @@
protected lazy val dataType: Parser[DataType] =
STRING ^^^ StringType
}

class SqlLexical(val keywords: Seq[String]) extends StdLexical {
case class FloatLit(chars: String) extends Token {
override def toString = chars
}

reserved ++= keywords.flatMap(w => allCaseVersions(w))

delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]"
)

override lazy val token: Parser[Token] = (
identChar ~ rep( identChar | digit ) ^^
{ case first ~ rest => processIdent(first :: rest mkString "") }
| rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
case i ~ None => NumericLit(i mkString "")
case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
}
| '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
{ case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
| '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
{ case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '\"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar = letter | elem('_') | elem('.')

override def whitespace: Parser[Any] = rep(
whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
| '#' ~ rep( chrExcept(EofCh, '\n') )
| '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
| '/' ~ '*' ~ failure("unclosed comment")
)

/** Generate all variations of upper and lower case of a given string */
def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
}
@@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
null
} else {
if (child.dataType.isInstanceOf[ArrayType]) {
// TODO: consider using Array[_] for ArrayType child to avoid
// boxing of primitives
val baseValue = value.asInstanceOf[Seq[_]]
val o = key.asInstanceOf[Int]
if (o >= baseValue.size || o < 0) {
@@ -19,35 +19,108 @@ package org.apache.spark.sql.catalyst.types

import java.sql.Timestamp

import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.parsing.combinator.RegexParsers

import org.apache.spark.sql.catalyst.expressions.Expression
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.util.Utils

/**
 * Utility object for parsing the string representation of a [[DataType]]; see `apply` below.
 */
object DataType extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"StringType" ^^^ StringType |
"FloatType" ^^^ FloatType |
"IntegerType" ^^^ IntegerType |
"ByteType" ^^^ ByteType |
"ShortType" ^^^ ShortType |
"DoubleType" ^^^ DoubleType |
"LongType" ^^^ LongType |
"BinaryType" ^^^ BinaryType |
"BooleanType" ^^^ BooleanType |
"DecimalType" ^^^ DecimalType |
"TimestampType" ^^^ TimestampType

protected lazy val arrayType: Parser[DataType] =
"ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType

protected lazy val mapType: Parser[DataType] =
"MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
case t1 ~ _ ~ t2 => MapType(t1, t2)
}

protected lazy val structField: Parser[StructField] =
("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
case name ~ tpe ~ nullable =>
StructField(name, tpe, nullable = nullable)
}

protected lazy val boolVal: Parser[Boolean] =
"true" ^^^ true |
"false" ^^^ false


protected lazy val structType: Parser[DataType] =
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
case fields => new StructType(fields)
}

protected lazy val dataType: Parser[DataType] =
arrayType |
mapType |
structType |
primitiveType

/**
* Parses a string representation of a DataType.
*
* TODO: Generate parser as pickler...
*/
def apply(asString: String): DataType = parseAll(dataType, asString) match {
case Success(result, _) => result
case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
}
}

abstract class DataType {
/** Matches any expression that evaluates to this DataType */
def unapply(a: Expression): Boolean = a match {
case e: Expression if e.dataType == this => true
case _ => false
}

def isPrimitive: Boolean = false
}

case object NullType extends DataType

trait PrimitiveType extends DataType {
override def isPrimitive = true
}

abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]

@transient val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}

case object StringType extends NativeType {
case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
case object BinaryType extends DataType {
case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
}
case object BooleanType extends NativeType {
case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
@@ -63,7 +136,7 @@ case object TimestampType extends NativeType {
}
}

abstract class NumericType extends NativeType {
abstract class NumericType extends NativeType with PrimitiveType {
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
@@ -154,6 +227,17 @@ case object FloatType extends FractionalType {
case class ArrayType(elementType: DataType) extends DataType

case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class StructType(fields: Seq[StructField]) extends DataType

object StructType {
def fromAttributes(attributes: Seq[Attribute]): StructType = {
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
}

// def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
}

case class StructType(fields: Seq[StructField]) extends DataType {
def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
}

case class MapType(keyType: DataType, valueType: DataType) extends DataType
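
As an illustration (not part of the diff), the new `DataType` string parser above could be exercised roughly as follows, assuming the default case-class `toString` is what gets stored as the serialized schema:

```scala
import org.apache.spark.sql.catalyst.types._

// Parse a schema string back into a Catalyst DataType and check the round trip.
val tpe: DataType = DataType("ArrayType(MapType(StringType,IntegerType))")
assert(tpe == ArrayType(MapType(StringType, IntegerType)))
assert(DataType(tpe.toString) == tpe)
```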
@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))
new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))

/**
* Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
new JavaSchemaRDD(
sqlContext,
ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))

/**
* Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
@@ -154,7 +154,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
// Note: overwrite=false because otherwise the metadata we just created will be deleted
InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>