This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-352] Map/Array/Struct type support for Parquet reading in ArrowData Source #559

Merged 3 commits on Nov 11, 2021

Changes from all commits:

@@ -58,7 +58,6 @@ public final class ArrowWritableColumnVector extends WritableColumnVector {
private ArrowVectorAccessor accessor;
private ArrowVectorWriter writer;

private int ordinal;
private ValueVector vector;
private ValueVector dictionaryVector;
private static BufferAllocator OffRecordAllocator = SparkMemoryUtils.globalAllocator();
@@ -89,7 +88,7 @@ public static ArrowWritableColumnVector[] allocateColumns(
ArrowWritableColumnVector[] vectors =
new ArrowWritableColumnVector[fieldVectors.size()];
for (int i = 0; i < fieldVectors.size(); i++) {
vectors[i] = new ArrowWritableColumnVector(fieldVectors.get(i), i, capacity, true);
vectors[i] = new ArrowWritableColumnVector(fieldVectors.get(i), capacity, true);
}
// LOG.info("allocateColumns allocator is " + allocator);
return vectors;
@@ -107,7 +106,7 @@ public static ArrowWritableColumnVector[] loadColumns(
new ArrowWritableColumnVector[fieldVectors.size()];
for (int i = 0; i < fieldVectors.size(); i++) {
vectors[i] = new ArrowWritableColumnVector(
fieldVectors.get(i), dictionaryVectors.get(i), i, capacity, false);
fieldVectors.get(i), dictionaryVectors.get(i), capacity, false);
}
return vectors;
}
@@ -117,7 +116,7 @@ public static ArrowWritableColumnVector[] loadColumns(
ArrowWritableColumnVector[] vectors =
new ArrowWritableColumnVector[fieldVectors.size()];
for (int i = 0; i < fieldVectors.size(); i++) {
vectors[i] = new ArrowWritableColumnVector(fieldVectors.get(i), i, capacity, false);
vectors[i] = new ArrowWritableColumnVector(fieldVectors.get(i), capacity, false);
}
return vectors;
}
@@ -140,17 +139,16 @@ public static ArrowWritableColumnVector[] loadColumns(int capacity, Schema arrow

@Deprecated
public ArrowWritableColumnVector(
ValueVector vector, int ordinal, int capacity, boolean init) {
this(vector, null, ordinal, capacity, init);
ValueVector vector, int capacity, boolean init) {
this(vector, null, capacity, init);
}

public ArrowWritableColumnVector(ValueVector vector, ValueVector dicionaryVector,
int ordinal, int capacity, boolean init) {
public ArrowWritableColumnVector(ValueVector vector, ValueVector dicionaryVector, int capacity,
boolean init) {
super(capacity, ArrowUtils.fromArrowField(vector.getField()));
vectorCount.getAndIncrement();
refCnt.getAndIncrement();

this.ordinal = ordinal;
this.vector = vector;
this.dictionaryVector = dicionaryVector;
if (init) {
@@ -231,21 +229,23 @@ private void createVectorAccessor(ValueVector vector, ValueVector dictionary) {
} else if (vector instanceof TimeStampMicroVector
|| vector instanceof TimeStampMicroTZVector) {
accessor = new TimestampMicroAccessor((TimeStampVector) vector);
} else if (vector instanceof MapVector) {
MapVector mapVector = (MapVector) vector;
accessor = new MapAccessor(mapVector);
} else if (vector instanceof ListVector) {
ListVector listVector = (ListVector) vector;
accessor = new ArrayAccessor(listVector);
childColumns = new ArrowWritableColumnVector[1];
childColumns[0] = new ArrowWritableColumnVector(
listVector.getDataVector(), 0, listVector.size(), false);
listVector.getDataVector(), listVector.size(), false);
} else if (vector instanceof StructVector) {
throw new UnsupportedOperationException();
/*StructVector structVector = (StructVector) vector;
StructVector structVector = (StructVector) vector;
accessor = new StructAccessor(structVector);

childColumns = new ArrowWritableColumnVector[structVector.size()];
for (int i = 0; i < childColumns.length; ++i) {
childColumns[i] = new ArrowWritableColumnVector(structVector.getVectorById(i));
}*/
childColumns[i] = new ArrowWritableColumnVector(structVector.getVectorById(i), null, structVector.size(), false);
}
} else {
throw new UnsupportedOperationException();
}
@@ -277,6 +277,9 @@ private ArrowVectorWriter createVectorWriter(ValueVector vector) {
} else if (vector instanceof TimeStampMicroVector
|| vector instanceof TimeStampMicroTZVector) {
return new TimestampMicroWriter((TimeStampVector) vector);
} else if (vector instanceof MapVector) {
MapVector mapVector = (MapVector) vector;
return new MapWriter(mapVector);
} else if (vector instanceof ListVector) {
ListVector listVector = (ListVector) vector;
ArrowVectorWriter elementVector = createVectorWriter(listVector.getDataVector());
@@ -893,6 +896,10 @@ int getArrayLength(int rowId) {
int getArrayOffset(int rowId) {
throw new UnsupportedOperationException();
}

ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}
}

private static class BooleanAccessor extends ArrowVectorAccessor {
@@ -1224,6 +1231,40 @@ private static class StructAccessor extends ArrowVectorAccessor {
}
}

private static class MapAccessor extends ArrowVectorAccessor {
private final MapVector accessor;
private final ArrowColumnVector keys;
private final ArrowColumnVector values;

MapAccessor(MapVector vector) {
super(vector);
this.accessor = vector;
StructVector entries = (StructVector) vector.getDataVector();
this.keys = new ArrowColumnVector(entries.getChild(MapVector.KEY_NAME));
this.values = new ArrowColumnVector(entries.getChild(MapVector.VALUE_NAME));
}

@Override
final ColumnarMap getMap(int rowId) {
int index = rowId * MapVector.OFFSET_WIDTH;
int offset = accessor.getOffsetBuffer().getInt(index);
int length = accessor.getInnerValueCountAt(rowId);
return new ColumnarMap(keys, values, offset, length);
}

@Override
int getArrayOffset(int rowId) {
int index = rowId * MapVector.OFFSET_WIDTH;
return accessor.getOffsetBuffer().getInt(index);
}

@Override
int getArrayLength(int rowId) {
return accessor.getInnerValueCountAt(rowId);
}
}


/* Arrow Vector Writer */
private abstract static class ArrowVectorWriter {
private final ValueVector vector;
@@ -1885,4 +1926,10 @@ private static class StructWriter extends ArrowVectorWriter {
super(vector);
}
}

private static class MapWriter extends ArrowVectorWriter {
MapWriter(ValueVector vector) {
super(vector);
}
}
}
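
Note (reviewer sketch, not part of this diff): once the MapAccessor above is in place, map columns can be read through Spark's standard ColumnVector / ColumnarMap API. A minimal Scala sketch follows; the readMapCell helper, its argument names, and the string-to-string element types are illustrative assumptions rather than code from this PR.

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarMap}
import org.apache.spark.unsafe.types.UTF8String

// Reads one map cell out of any ColumnVector whose accessor implements getMap,
// e.g. an ArrowWritableColumnVector backed by the MapAccessor added above.
def readMapCell(vector: ColumnVector, rowId: Int): Map[UTF8String, UTF8String] = {
  val map: ColumnarMap = vector.getMap(rowId)
  val keys = map.keyArray()
  val values = map.valueArray()
  (0 until map.numElements()).map { i =>
    keys.getUTF8String(i) -> values.getUTF8String(i)
  }.toMap
}
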
@@ -26,6 +26,7 @@ import com.intel.oap.spark.sql.DataFrameWriterImplicits._
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowOptions
import com.sun.management.UnixOperatingSystemMXBean
import org.apache.commons.io.FileUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
@@ -34,14 +35,18 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
import testImplicits._

private val parquetFile1 = "parquet-1.parquet"
private val parquetFile2 = "parquet-2.parquet"
private val parquetFile3 = "parquet-3.parquet"
private val parquetFile4 = "parquet-4.parquet"
private val parquetFile5 = "parquet-5.parquet"
private val parquetFile6 = "parquet-6.parquet"

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
@@ -95,6 +100,14 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
.mode("overwrite")
.parquet(ArrowDataSourceTest.locateResourcePath(parquetFile5))

spark.range(100)
.map(i => Tuple1((i, Seq(s"val1_$i", s"val2_$i"), Map((s"ka_$i", s"va_$i"),
(s"kb_$i", s"vb_$i")))))
.write
.format("parquet")
.mode("overwrite")
.parquet(ArrowDataSourceTest.locateResourcePath(parquetFile6))

}

override def afterAll(): Unit = {
@@ -296,22 +309,33 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
assert(fdGrowth < 100)
}

test("parquet reader on data type: struct, array, map") {
val path = ArrowDataSourceTest.locateResourcePath(parquetFile6)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
.arrow(path)
frame.createOrReplaceTempView("ptab3")
val df = spark.sql("select * from ptab3")
df.explain()
df.show()
}
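
Note (reviewer sketch, not part of this diff): the test above only calls explain() and show(). If a stricter check were wanted, the nested schema could be asserted directly; the field names "_1", "_2", "_3" below assume Spark's default tuple encoding for the Tuple1((Long, Seq[String], Map[String, String])) rows written during test setup.

import org.apache.spark.sql.types.{ArrayType, MapType, StructType}

// Hypothetical extra assertions on the df built in the test above: the single
// top-level column should decode as a struct holding an array and a map.
val nested = df.schema("_1").dataType.asInstanceOf[StructType]
assert(nested("_2").dataType.isInstanceOf[ArrayType])  // from the Seq[String] field
assert(nested("_3").dataType.isInstanceOf[MapType])    // from the Map[String, String] field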

private val orcFile = "people.orc"
test("read orc file") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
verifyFrame(
spark.read
.format("arrow")
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.load(path), 2, 3)
.format("arrow")
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.load(path), 2, 3)
}

test("read orc file - programmatic API ") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
verifyFrame(
spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.arrow(path), 2, 3)
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.arrow(path), 2, 3)
}

test("create catalog table for orc") {
@@ -326,14 +350,14 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
test("simple SQL query on orc file ") {
val path = ArrowDataSourceTest.locateResourcePath(orcFile)
val frame = spark.read
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.arrow(path)
.option(ArrowOptions.KEY_ORIGINAL_FORMAT, "orc")
.arrow(path)
frame.createOrReplaceTempView("people")
val sqlFrame = spark.sql("select * from people")
assert(
sqlFrame.schema ===
StructType(Seq(StructField("name", StringType),
StructField("age", IntegerType), StructField("job", StringType))))
StructType(Seq(StructField("name", StringType),
StructField("age", IntegerType), StructField("job", StringType))))
val rows = sqlFrame.collect()
assert(rows(0).get(0) == "Jorge")
assert(rows(0).get(1) == 30)