[SPARK-39231][SQL] Use ConstantColumnVector instead of `On/OffHeapColumnVector` to store partition columns in `VectorizedParquetRecordReader`

### What changes were proposed in this pull request?
This PR changes `VectorizedParquetRecordReader` to use `ConstantColumnVector` to store partition columns, because a partition column vector is always a constant vector.

### Why are the changes needed?

1. A partition column vector is always a constant vector.

2. **Performance improvement**: `ConstantColumnVector` has better read and write performance than `OnHeapColumnVector` and `OffHeapColumnVector`. From the microbenchmark results, the improvement is most obvious for `StringType`: read throughput roughly doubles, and write throughput increases by more than 100x.

3. **Memory saving**: `ConstantColumnVector` uses less memory than `OnHeapColumnVector` and `OffHeapColumnVector`. For a `UTF8String` vector of length 4096 (the default `batchSize`), `ConstantColumnVector` saves more than 90% of the memory compared with `OnHeapColumnVector` (a rough sketch follows this list):

   - `ConstantColumnVector` only stores a single `UTF8String`
   - `OnHeapColumnVector` needs `arrayOffsets (int[4096])` + `arrayLengths (int[4096])` + `(UTF8String * 4096)`
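
A rough back-of-envelope sketch of that accounting (not code from this PR; it assumes 4-byte ints, ignores JVM object headers, and counts the string payload once per row for `OnHeapColumnVector`):

```java
public class MemoryEstimateSketch {
  public static void main(String[] args) {
    int batchSize = 4096;         // default batchSize
    int stringBytes = 10;         // e.g. a partition value such as "2022-06-29"

    long onHeap = 4L * batchSize                   // arrayOffsets: int[4096]
                + 4L * batchSize                   // arrayLengths: int[4096]
                + (long) stringBytes * batchSize;  // the value materialized for every row
    long constant = stringBytes;                   // a single shared UTF8String

    System.out.println("OnHeapColumnVector   ~ " + onHeap + " bytes");    // ~73,728 bytes
    System.out.println("ConstantColumnVector ~ " + constant + " bytes");  // ~10 bytes
  }
}
```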

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GitHub Actions
- Add new UTs to test the new method introduced by this PR: `ColumnVectorUtils.populate(ConstantColumnVector col, InternalRow row, int fieldIdx)`
- Add a new micro benchmark to compare the read and write performance of constant vectors (simulating the partition-column scenario) across `OnHeapColumnVector`, `OffHeapColumnVector`, and `ConstantColumnVector`; a simplified sketch follows
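
For reference, a minimal sketch of the kind of comparison that benchmark makes. This is not the benchmark code from this PR, only an illustration of the APIs involved; the class name and the partition value `"2022-06-29"` are made up:

```java
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

public class ConstantVsOnHeapSketch {
  public static void main(String[] args) {
    int batchSize = 4096;
    UTF8String value = UTF8String.fromString("2022-06-29");  // a typical partition value

    // Writing: OnHeapColumnVector must copy the value into every row ...
    OnHeapColumnVector onHeap = new OnHeapColumnVector(batchSize, DataTypes.StringType);
    for (int i = 0; i < batchSize; i++) {
      onHeap.putByteArray(i, value.getBytes());
    }

    // ... while ConstantColumnVector stores it exactly once.
    ConstantColumnVector constant = new ConstantColumnVector(batchSize, DataTypes.StringType);
    constant.setUtf8String(value);

    // Reading goes through the same ColumnVector API for both.
    assert onHeap.getUTF8String(123).equals(constant.getUTF8String(123));
  }
}
```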

Closes apache#36616 from LuciferYang/SPARK-39231.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
LuciferYang authored and sunchao committed Jun 29, 2022
1 parent feae21c commit c0a12cf
Showing 9 changed files with 1,414 additions and 14 deletions.
280 changes: 280 additions & 0 deletions sql/core/benchmarks/ConstantColumnVectorBenchmark-jdk11-results.txt

Large diffs are not rendered by default.

280 changes: 280 additions & 0 deletions sql/core/benchmarks/ConstantColumnVectorBenchmark-jdk17-results.txt

Large diffs are not rendered by default.

280 changes: 280 additions & 0 deletions sql/core/benchmarks/ConstantColumnVectorBenchmark-results.txt

Large diffs are not rendered by default.

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -37,9 +37,9 @@
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -243,18 +243,17 @@ private void initBatch(
for (StructField f: sparkSchema.fields()) {
batchSchema = batchSchema.add(f);
}
int constantColumnLength = 0;
if (partitionColumns != null) {
for (StructField f : partitionColumns.fields()) {
batchSchema = batchSchema.add(f);
}
constantColumnLength = partitionColumns.fields().length;
}

WritableColumnVector[] vectors;
if (memMode == MemoryMode.OFF_HEAP) {
vectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
} else {
vectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
}
ColumnVector[] vectors = ColumnVectorUtils.allocateColumns(
capacity, batchSchema, memMode == MemoryMode.OFF_HEAP, constantColumnLength);

columnarBatch = new ColumnarBatch(vectors);

columnVectors = new ParquetColumnVector[sparkSchema.fields().length];
@@ -264,14 +263,14 @@ private void initBatch(
defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
}
columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
vectors[i], capacity, memMode, missingColumns, true, defaultValue);
(WritableColumnVector) vectors[i], capacity, memMode, missingColumns, true, defaultValue);
}

if (partitionColumns != null) {
int partitionIdx = sparkSchema.fields().length;
for (int i = 0; i < partitionColumns.fields().length; i++) {
ColumnVectorUtils.populate(vectors[i + partitionIdx], partitionValues, i);
vectors[i + partitionIdx].setIsConstant();
ColumnVectorUtils.populate(
(ConstantColumnVector) vectors[i + partitionIdx], partitionValues, i);
}
}
}
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -30,6 +30,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
@@ -105,6 +106,61 @@ public static void populate(WritableColumnVector col, InternalRow row, int field
}
}

/**
* Populates the value of `row[fieldIdx]` into `ConstantColumnVector`.
*/
public static void populate(ConstantColumnVector col, InternalRow row, int fieldIdx) {
DataType t = col.dataType();

if (row.isNullAt(fieldIdx)) {
col.setNull();
} else {
if (t == DataTypes.BooleanType) {
col.setBoolean(row.getBoolean(fieldIdx));
} else if (t == DataTypes.BinaryType) {
col.setBinary(row.getBinary(fieldIdx));
} else if (t == DataTypes.ByteType) {
col.setByte(row.getByte(fieldIdx));
} else if (t == DataTypes.ShortType) {
col.setShort(row.getShort(fieldIdx));
} else if (t == DataTypes.IntegerType) {
col.setInt(row.getInt(fieldIdx));
} else if (t == DataTypes.LongType) {
col.setLong(row.getLong(fieldIdx));
} else if (t == DataTypes.FloatType) {
col.setFloat(row.getFloat(fieldIdx));
} else if (t == DataTypes.DoubleType) {
col.setDouble(row.getDouble(fieldIdx));
} else if (t == DataTypes.StringType) {
UTF8String v = row.getUTF8String(fieldIdx);
col.setUtf8String(v);
} else if (t instanceof DecimalType) {
DecimalType dt = (DecimalType) t;
Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale());
if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
col.setInt((int)d.toUnscaledLong());
} else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
col.setLong(d.toUnscaledLong());
} else {
final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
byte[] bytes = integer.toByteArray();
col.setBinary(bytes);
}
} else if (t instanceof CalendarIntervalType) {
// The value of `numRows` is irrelevant.
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
} else if (t instanceof DateType || t instanceof YearMonthIntervalType) {
col.setInt(row.getInt(fieldIdx));
} else if (t instanceof TimestampType || t instanceof TimestampNTZType ||
t instanceof DayTimeIntervalType) {
col.setLong(row.getLong(fieldIdx));
} else {
throw new RuntimeException(String.format("DataType %s is not supported" +
" in column vectorized reader.", t.sql()));
}
}
}

/**
* Returns the array data as the java primitive array.
* For example, an array of IntegerType will return an int[].
@@ -235,4 +291,37 @@ public static ColumnarBatch toBatch(
batch.setNumRows(n);
return batch;
}

/**
* <b>This method assumes that all constant columns are at the end of the schema
* and `constantColumnLength` represents the number of constant columns.</b>
*
* This method allocates columns to store elements of each field of the schema,
* the data columns use `OffHeapColumnVector` when `useOffHeap` is true and
* use `OnHeapColumnVector` when `useOffHeap` is false, the constant columns
* always use `ConstantColumnVector`.
*
* Capacity is the initial capacity of the vector, and it will grow as necessary.
* Capacity is in number of elements, not number of bytes.
*/
public static ColumnVector[] allocateColumns(
int capacity, StructType schema, boolean useOffHeap, int constantColumnLength) {
StructField[] fields = schema.fields();
int fieldsLength = fields.length;
ColumnVector[] vectors = new ColumnVector[fieldsLength];
if (useOffHeap) {
for (int i = 0; i < fieldsLength - constantColumnLength; i++) {
vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
}
} else {
for (int i = 0; i < fieldsLength - constantColumnLength; i++) {
vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
}
}
for (int i = fieldsLength - constantColumnLength; i < fieldsLength; i++) {
vectors[i] = new ConstantColumnVector(capacity, fields[i].dataType());
}
return vectors;
}

}
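
A hypothetical usage sketch of the contract above: data columns come first, constant (partition) columns sit at the end of the schema, and the trailing constant vector is populated once from the partition values. The schema, class name, and values below are illustrative only:

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.types.UTF8String;

public class AllocateColumnsSketch {
  public static void main(String[] args) {
    StructType batchSchema = new StructType()
        .add("id", DataTypes.LongType)        // data column
        .add("dt", DataTypes.StringType);     // partition column, must come last
    // One trailing constant column, on-heap data columns.
    ColumnVector[] vectors =
        ColumnVectorUtils.allocateColumns(4096, batchSchema, false, 1);

    // Partition values arrive as an InternalRow; populate the constant vector once.
    InternalRow partitionValues =
        new GenericInternalRow(new Object[] { UTF8String.fromString("2022-06-29") });
    ColumnVectorUtils.populate((ConstantColumnVector) vectors[1], partitionValues, 0);
  }
}
```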
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -23,6 +23,7 @@
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

/**
@@ -63,6 +64,9 @@ public ConstantColumnVector(int numRows, DataType type) {
} else if (type instanceof CalendarIntervalType) {
// Three columns. Months as int. Days as Int. Microseconds as Long.
this.childData = new ConstantColumnVector[3];
this.childData[0] = new ConstantColumnVector(1, DataTypes.IntegerType);
this.childData[1] = new ConstantColumnVector(1, DataTypes.IntegerType);
this.childData[2] = new ConstantColumnVector(1, DataTypes.LongType);
} else {
this.childData = null;
}
@@ -294,4 +298,13 @@ public ColumnVector getChild(int ordinal) {
public void setChild(int ordinal, ConstantColumnVector value) {
childData[ordinal] = value;
}

/**
* Sets the CalendarInterval `value` for all rows
*/
public void setCalendarInterval(CalendarInterval value) {
this.childData[0].setInt(value.months);
this.childData[1].setInt(value.days);
this.childData[2].setLong(value.microseconds);
}
}
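
A small sketch of how the new `setCalendarInterval` helper plays with the three child vectors initialized in the constructor above: the interval is split into months / days / microseconds once, and `ColumnVector.getInterval` reassembles it for any row. The class name and values are illustrative:

```java
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.CalendarInterval;

public class ConstantIntervalSketch {
  public static void main(String[] args) {
    ConstantColumnVector col =
        new ConstantColumnVector(4096, DataTypes.CalendarIntervalType);
    col.setCalendarInterval(new CalendarInterval(1 /* months */, 2 /* days */, 3L /* micros */));
    // Every row now reads back the same interval via the child vectors.
    CalendarInterval v = col.getInterval(0);
    assert v.months == 1 && v.days == 2 && v.microseconds == 3L;
  }
}
```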
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -176,13 +176,13 @@ class ParquetFileFormat
requiredSchema: StructType,
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]] = {
Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
Option(Seq.fill(requiredSchema.fields.length)(
if (!sqlConf.offHeapColumnVectorEnabled) {
classOf[OnHeapColumnVector].getName
} else {
classOf[OffHeapColumnVector].getName
}
))
) ++ Seq.fill(partitionSchema.fields.length)(classOf[ConstantColumnVector].getName))
}

override def isSplitable(
