From 0e59098575f0e614ecac4bf22dd21da838b241de Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 11 Jan 2018 15:43:53 -0500 Subject: [PATCH 1/3] Change MapVector to NullableMapVector in ArrowColumnVector --- .../org/apache/spark/sql/vectorized/ArrowColumnVector.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java index 708333213f3f1..c697cd1cca0b9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java @@ -247,8 +247,8 @@ public ArrowColumnVector(ValueVector vector) { childColumns = new ArrowColumnVector[1]; childColumns[0] = new ArrowColumnVector(listVector.getDataVector()); - } else if (vector instanceof MapVector) { - MapVector mapVector = (MapVector) vector; + } else if (vector instanceof NullableMapVector) { + NullableMapVector mapVector = (NullableMapVector) vector; accessor = new StructAccessor(mapVector); childColumns = new ArrowColumnVector[mapVector.size()]; @@ -555,7 +555,7 @@ final int getArrayOffset(int rowId) { private static class StructAccessor extends ArrowVectorAccessor { - StructAccessor(MapVector vector) { + StructAccessor(NullableMapVector vector) { super(vector); } } From e0689666d77f0b62656c90ed11ba244c9fee4328 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 12 Jan 2018 11:36:49 -0500 Subject: [PATCH 2/3] Add comment to StructAccessor --- .../org/apache/spark/sql/vectorized/ArrowColumnVector.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java index c697cd1cca0b9..2ed5df70408ac 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java @@ -553,6 +553,13 @@ final int getArrayOffset(int rowId) { } } + /** + * This is a place holder class. Any "get" method will throw UnsupportedOperationException. + * + * Access struct values in a ArrowColumnVector doesn't use this accessor. Instead, it uses getStruct() method defined + * in the parent class. Any call to "get" method in this class is a bug in the code. + * + */ private static class StructAccessor extends ArrowVectorAccessor { StructAccessor(NullableMapVector vector) { From ab2a309ac8e900db50a73b87769537c5290c2363 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 12 Jan 2018 12:13:44 -0500 Subject: [PATCH 3/3] Add test --- .../sql/vectorized/ArrowColumnVector.java | 2 +- .../vectorized/ArrowColumnVectorSuite.scala | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java index 2ed5df70408ac..eb69001fe677e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java @@ -554,7 +554,7 @@ final int getArrayOffset(int rowId) { } /** - * This is a place holder class. Any "get" method will throw UnsupportedOperationException. + * Any call to "get" method will throw UnsupportedOperationException. * * Access struct values in a ArrowColumnVector doesn't use this accessor. Instead, it uses getStruct() method defined * in the parent class. Any call to "get" method in this class is a bug in the code. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala index 7304803a092c0..53432669e215d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala @@ -322,6 +322,42 @@ class ArrowColumnVectorSuite extends SparkFunSuite { allocator.close() } + test("non nullable struct") { + val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue) + val schema = new StructType().add("int", IntegerType).add("long", LongType) + val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null) + .createVector(allocator).asInstanceOf[NullableMapVector] + + vector.allocateNew() + val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector] + val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector] + + vector.setIndexDefined(0) + intVector.setSafe(0, 1) + longVector.setSafe(0, 1L) + + vector.setIndexDefined(1) + intVector.setSafe(1, 2) + longVector.setNull(1) + + vector.setValueCount(2) + + val columnVector = new ArrowColumnVector(vector) + assert(columnVector.dataType === schema) + assert(columnVector.numNulls === 0) + + val row0 = columnVector.getStruct(0, 2) + assert(row0.getInt(0) === 1) + assert(row0.getLong(1) === 1L) + + val row1 = columnVector.getStruct(1, 2) + assert(row1.getInt(0) === 2) + assert(row1.isNullAt(1)) + + columnVector.close() + allocator.close() + } + test("struct") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue) val schema = new StructType().add("int", IntegerType).add("long", LongType)