From e99cc514397f217f307807e1d6e33e1aea9ada84 Mon Sep 17 00:00:00 2001
From: Andre Schumacher
Date: Sun, 13 Apr 2014 11:23:38 +0300
Subject: [PATCH] Fixing nested WriteSupport and adding tests

---
 .../sql/parquet/ParquetTableSupport.scala     | 61 +++++++++++--------
 .../spark/sql/parquet/ParquetTestData.scala   |  4 +-
 .../spark/sql/parquet/ParquetTypes.scala      | 23 ++++---
 .../spark/sql/parquet/ParquetQuerySuite.scala | 49 +++++++++++++++
 4 files changed, 99 insertions(+), 38 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index e29d416d5cfb7..567574516883d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -141,41 +141,49 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    schema match {
-      case t @ ArrayType(_) => writeArray(t, value.asInstanceOf[Row])
-      case t @ MapType(_, _) => writeMap(t, value.asInstanceOf[Map[Any, Any]])
-      case t @ StructType(_) => writeStruct(t, value.asInstanceOf[Row])
-      case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
+    if (value != null && value != Nil) {
+      schema match {
+        case t @ ArrayType(_) => writeArray(t, value.asInstanceOf[Row])
+        case t @ MapType(_, _) => writeMap(t, value.asInstanceOf[Map[Any, Any]])
+        case t @ StructType(_) => writeStruct(t, value.asInstanceOf[Row])
+        case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
+      }
     }
   }
 
   private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
-    schema match {
-      case StringType => writer.addBinary(
-        Binary.fromByteArray(
-          value.asInstanceOf[String].getBytes("utf-8")
+    if (value != null && value != Nil) {
+      schema match {
+        case StringType => writer.addBinary(
+          Binary.fromByteArray(
+            value.asInstanceOf[String].getBytes("utf-8")
+          )
         )
-      )
-      case IntegerType => writer.addInteger(value.asInstanceOf[Int])
-      case LongType => writer.addLong(value.asInstanceOf[Long])
-      case DoubleType => writer.addDouble(value.asInstanceOf[Double])
-      case FloatType => writer.addFloat(value.asInstanceOf[Float])
-      case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
-      case _ => sys.error(s"Do not know how to writer $schema to consumer")
+        case IntegerType => writer.addInteger(value.asInstanceOf[Int])
+        case LongType => writer.addLong(value.asInstanceOf[Long])
+        case DoubleType => writer.addDouble(value.asInstanceOf[Double])
+        case FloatType => writer.addFloat(value.asInstanceOf[Float])
+        case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+        case _ => sys.error(s"Do not know how to write $schema to consumer")
+      }
     }
   }
 
   private[parquet] def writeStruct(schema: StructType, struct: Row): Unit = {
-    val fields = schema.fields.toArray
-    writer.startGroup()
-    var i = 0
-    while(i < fields.size) {
-      writer.startField(fields(i).name, i)
-      writeValue(fields(i).dataType, struct(i))
-      writer.endField(fields(i).name, i)
-      i = i + 1
+    if (struct != null && struct != Nil) {
+      val fields = schema.fields.toArray
+      writer.startGroup()
+      var i = 0
+      while(i < fields.size) {
+        if (struct(i) != null && struct(i) != Nil) {
+          writer.startField(fields(i).name, i)
+          writeValue(fields(i).dataType, struct(i))
+          writer.endField(fields(i).name, i)
+        }
+        i = i + 1
+      }
+      writer.endGroup()
     }
-    writer.endGroup()
   }
 
   private[parquet] def writeArray(schema: ArrayType, array: Row): Unit = {
@@ -183,18 +191,17 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     writer.startGroup()
     if (array.size > 0) {
       writer.startField("values", 0)
-      writer.startGroup()
       var i = 0
       while(i < array.size) {
         writeValue(elementType, array(i))
         i = i + 1
       }
-      writer.endGroup()
       writer.endField("values", 0)
     }
     writer.endGroup()
   }
 
+  // TODO: this does not allow null values! Should these be supported?
   private[parquet] def writeMap(schema: MapType, map: Map[_, _]): Unit = {
     writer.startGroup()
     if (map.size > 0) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index 8a34ee3b92eff..c33d04925653f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -182,13 +182,13 @@ private[sql] object ParquetTestData {
       |optional group data1 {
       |repeated group map {
       |required binary key;
-      |optional int32 value;
+      |required int32 value;
       |}
       |}
       |required group data2 {
       |repeated group map {
       |required binary key;
-      |optional group value {
+      |required group value {
       |required int64 payload1;
       |optional binary payload2;
       |}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 50f06d5467505..bee469a5ddd19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -64,13 +64,14 @@ private[parquet] object ParquetTypesConverter {
    *
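
The pattern the ParquetTableSupport changes apply, distilled into a
standalone sketch: a nested value that is null (or Nil) must be skipped
entirely, i.e. no startField/endField pair may be emitted for it, because
Parquet marks an optional field as absent precisely by never seeing it.
In the sketch below, RecordConsumerStub and Field are hypothetical helpers
for illustration only; neither is Spark's or parquet's actual API.

object NullSkippingSketch {
  // Hypothetical stand-in for parquet's RecordConsumer; it just logs calls.
  class RecordConsumerStub {
    def startGroup(): Unit = println("startGroup")
    def endGroup(): Unit = println("endGroup")
    def startField(name: String, index: Int): Unit = println(s"startField($name, $index)")
    def endField(name: String, index: Int): Unit = println(s"endField($name, $index)")
    def addInteger(v: Int): Unit = println(s"addInteger($v)")
  }

  case class Field(name: String, value: Any)

  // Mirrors the patched writeStruct: a null (or Nil) field produces no
  // startField/endField calls at all, instead of a group with a null inside.
  def writeStruct(consumer: RecordConsumerStub, fields: Seq[Field]): Unit = {
    consumer.startGroup()
    var i = 0
    while (i < fields.size) {
      if (fields(i).value != null && fields(i).value != Nil) {
        consumer.startField(fields(i).name, i)
        consumer.addInteger(fields(i).value.asInstanceOf[Int])
        consumer.endField(fields(i).name, i)
      }
      i = i + 1
    }
    consumer.endGroup()
  }

  def main(args: Array[String]): Unit = {
    // Field "b" is null and is skipped; only "a" and "c" are written.
    writeStruct(new RecordConsumerStub,
      Seq(Field("a", 1), Field("b", null), Field("c", 3)))
  }
}

Skipping absent fields, rather than writing some null marker, also appears
to be why the test schemas in ParquetTestData change map values from
optional to required: the writer in this patch does not yet emit nulls
inside maps (see the TODO above writeMap), so the test data declares those
values as required.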