Commit

Add to JavaAPI.
marmbrus committed Apr 15, 2014
1 parent 77b512c commit 765c506
Showing 1 changed file with 58 additions and 20 deletions.
@@ -17,8 +17,11 @@

 package org.apache.spark.sql.api.java

-import java.beans.{Introspector, PropertyDescriptor}
+import java.beans.Introspector

+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -45,29 +48,42 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
     result
   }

+  /**
+   * :: Experimental ::
+   * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
+   * a table. This registered table can be used as the target of future `insertInto` operations.
+   *
+   * {{{
+   *   JavaSQLContext sqlCtx = new JavaSQLContext(...)
+   *
+   *   sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
+   *   sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
+   * }}}
+   *
+   * @param beanClass A java bean class object that will be used to determine the schema of the
+   *                  parquet file.
+   * @param path The path where the directory containing parquet metadata should be created.
+   *             Data inserted into this table will also be stored at this location.
+   * @param allowExisting When false, an exception will be thrown if this directory already exists.
+   * @param conf A Hadoop configuration object that can be used to specify options to the parquet
+   *             output format.
+   */
+  @Experimental
+  def createParquetFile(
+      beanClass: Class[_],
+      path: String,
+      allowExisting: Boolean = true,
+      conf: Configuration = new Configuration()): JavaSchemaRDD = {
+    new JavaSchemaRDD(
+      sqlContext,
+      ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
+  }
+
   /**
    * Applies a schema to an RDD of Java Beans.
    */
   def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
-    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
-    val beanInfo = Introspector.getBeanInfo(beanClass)
-
-    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
-    val schema = fields.map { property =>
-      val dataType = property.getPropertyType match {
-        case c: Class[_] if c == classOf[java.lang.String] => StringType
-        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
-        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
-        case c: Class[_] if c == java.lang.Long.TYPE => LongType
-        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
-        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
-        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
-        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
-      }
-
-      AttributeReference(property.getName, dataType, true)()
-    }
-
+    val schema = getSchema(beanClass)
     val className = beanClass.getCanonicalName
     val rowRdd = rdd.rdd.mapPartitions { iter =>
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
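
Aside (not part of the commit): the scaladoc example above references a Person bean that the diff never shows. A minimal sketch of such a bean, hypothetical but consistent with the introspection rules of getSchema added below, where every getter/setter property except the built-in `class` becomes a column:

// Hypothetical bean matching the scaladoc example. getSchema (added below)
// would infer the schema (name: StringType, age: IntegerType), both nullable.
public class Person implements java.io.Serializable {
  private String name;
  private int age;

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }

  public int getAge() { return age; }
  public void setAge(int age) { this.age = age; }
}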
@@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
   def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
     sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
   }
+
+  /** Returns a Catalyst Schema for the given java bean class. */
+  protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+    val beanInfo = Introspector.getBeanInfo(beanClass)
+
+    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+    fields.map { property =>
+      val dataType = property.getPropertyType match {
+        case c: Class[_] if c == classOf[java.lang.String] => StringType
+        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
+        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
+        case c: Class[_] if c == java.lang.Long.TYPE => LongType
+        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
+        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
+        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
+        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
+      }
+      // TODO: Nullability could be stricter.
+      AttributeReference(property.getName, dataType, nullable = true)()
+    }
+  }
 }
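
Putting the pieces together, a sketch of how the new method might be driven from Java, assuming the hypothetical Person bean above. The master URL, file path, and table name are illustrative. Note that Scala default arguments are not exposed as overloads to Java callers, so all four parameters of createParquetFile are passed explicitly:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;

public class CreateParquetFileExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "example");
    JavaSQLContext sqlCtx = new JavaSQLContext(sc);

    // Create an empty parquet file whose schema is inferred from the Person bean,
    // register it as a table, then use it as an INSERT target, per the scaladoc example.
    JavaSchemaRDD people =
      sqlCtx.createParquetFile(Person.class, "/tmp/people.parquet", true, new Configuration());
    people.registerAsTable("people");
    sqlCtx.sql("INSERT INTO people SELECT 'michael', 29");

    sc.stop();
  }
}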
