-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-16060][SQL] Support Vectorized ORC Reader #19943
Conversation
|
||
case TimestampType => | ||
val data = fromColumn.asInstanceOf[TimestampColumnVector] | ||
toColumn.appendLongs(batchSize, data.time(0) * 1000L + data.nanos(0) / 1000L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This raw calculation here should be encapsulated in a utilities method with a few comments explaining what is happening (i.e. the conversion from TimestampColumnVector's java.sql.Timestamp representation to Spark's). It is repeated (so to speak several times in this method/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's done.
} | ||
|
||
case DecimalType.Fixed(precision, scale) => | ||
val d = fromColumn.asInstanceOf[DecimalColumnVector].vector(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, the conversion from DecimalColumnVector's HiveDecimalWritable to Spark's representation should be encapsulated in a utilities method and explained. Note that HiveDecimalWritable does provide the possibility of converting that are faster than going first to BigDecimal. Having one version of the code would make that easier in the future..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's refactored first.
throw new UnsupportedOperationException(s"Unsupported Data Type: $dt") | ||
} | ||
} | ||
} else if (!field.nullable || fromColumn.noNulls) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I get how this if stmt works right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate your concern on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I get your schema says the column has no nulls (!field.nullable) that isn't a guarantee that the column returned by the reader is noNulls. There should be an assertion or perhaps better yet an exception thrown for a data error. (My other comment about selectedInUse probably ought to be a data error, too).
This method talks about ORC reader -- yet really this is general Hive to Spark vectorization code and it will in one way or another be generalized and reused. In Hive, there is work to vectorize Parquet fully. We also support vectorized reading of text files and through LLAP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Nulls can sneak in when enforcing decimal precision/scale, for example).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing data error exceptions can be nasty -- perhaps setting the value to a known value and a warning might be better. I noticed leveraging of not nullable in the Hive optimizer recently but we will see if there are any practical issues that arise...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I missed your comment here, @mmccline . Thank you for review!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mmccline . For this, I'll respect only fromColumn.noNulls
here.
if (requestedColIds(i) < 0) { | ||
toColumn.appendNulls(batchSize) | ||
} else { | ||
val fromColumn = batch.cols(requestedColIds(i)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The VectorizedRowBatch processing here is ignoring boolean batch.selectedInUse and int[] batch.selected array logical batch index processing. If you are going to assume batch.selectedInUse is false for batches read from ORC, it ought to be asserted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I'll add assertion.
Thank you for review, @mmccline ! |
@@ -378,6 +378,11 @@ object SQLConf { | |||
.checkValues(Set("hive", "native")) | |||
.createWithDefault("native") | |||
|
|||
val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.vectorizedReader.enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parquet config: spark.sql.parquet.enableVectorizedReader
. Shall we have similar config name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, @viirya . Thank you for review.
@@ -139,15 +146,25 @@ class OrcFileFormat | |||
} | |||
} | |||
|
|||
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) | |||
val enableVectorizedReader = sparkSession.sessionState.conf.orcVectorizedReaderEnabled && | |||
supportBatch(sparkSession, resultSchema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether enabling vectorized reader is not the same as supporting batch. You can enable vectorized reader but not support batch like ParquetFileFormat
does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I'll try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ur, it seems to make the implementation complicated in order to provide slower version.
If you don't mind, I'll proceed that in a next follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supportBatch
already have the condition conf.orcVectorizedReaderEnabled
, I think here we can just write
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. It's fixed.
|
||
val fs = filePath.getFileSystem(conf) | ||
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) | ||
val reader = OrcFile.createReader(filePath, readerOptions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why extract the creation of reader
from requestedColumnIds
to here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reader is used here, too. This extraction prevents redundant creation of ORC reader.
batchReader.setRequiredSchema(
OrcUtils.getFixedTypeDescription(reader.getSchema, dataSchema),
@@ -110,4 +107,21 @@ object OrcUtils extends Logging { | |||
} | |||
} | |||
} | |||
|
|||
/** | |||
* Return a fixed ORC schema with data schema information, if needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe explain what the issue to fix is?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I added.
Test build #84733 has finished for PR 19943 at commit
|
Test build #84785 has finished for PR 19943 at commit
|
Test build #84788 has finished for PR 19943 at commit
|
Retest this please |
Test build #84809 has finished for PR 19943 at commit
|
Hi, @cloud-fan , @gatorsmile , @HyukjinKwon , @viirya , @mmccline . |
A high-level question: @viirya had a PR to do this by creating a wrapper for ORC columnar batch. The parquet data source pick a different approach that writes the values to Spark columnar batch. Generally I think the wrapper approach should be faster on pure scan, but may be slower if there is computation after the scan, e.g. aggregate. Do we have a benchmark for it? |
That's a good point, @cloud-fan . I also agreed to see the result for that before (at the my initial PR.) First of all, we can compare the results with |
Also cc @kiszk , this question also applies to the table cache reader. We should think more about using a wrapper or writing to spark column vector. |
columnarBatch.close() | ||
columnarBatch = null | ||
} | ||
if (rows != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to close the orc batch?
/** | ||
* To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. | ||
*/ | ||
private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need more documents. How will this class be used? Created for each Spark task and reset(call .initialize
) for each file split?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I can add that. But, do you think we can proceed this approach?
Based on the previous advice, I think you prefer @viirya 's approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine to follow parquet and write data to Spark column vector now. Later we can try the wrapper approach and compare.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for decision, @cloud-fan . If then, I'll try to update the PR.
If i've understood Spark development process correctly, the 2.3 branch cut date is in couple of days, and if this PR doesn't get merged to master real soon, it'll have to wait until 2.4, about 6 months? @dongjoon-hyun @cloud-fan Considering that the benchmarks show almost order of magnitude improvement in performance, it would be really great to get this in for Spark 2.3, and worry about the details of copy vs wrapper approach later. Also, as this is anyway opt-in feature that needs to be enabled with config option, merging this shouldn't be "dangerous".. Thanks for your efforts & looking forward to get this PR to production! |
toColumn.appendByteArray(data.vector(0), data.start(0), data.length(0)) | ||
index += 1 | ||
} | ||
case BinaryType => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we merge two cases for StringType
and BinaryType
into one case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep.
case BooleanType => | ||
val data = fromColumn.asInstanceOf[LongColumnVector].vector | ||
var index = 0 | ||
while (index < batchSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use WritableColumn.appendBooleans()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not. data(index)
is changed at each iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought appendBooleans()
can be used since toColumn.appendLongs()
is used for LongType
. I realized data
is Long
as you pointed out .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, so the orc batch always return long as value? that seems very inefficient...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anyone knows why ORC does this? cc @mmccline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that ORC library has LongColumnVector and DoubleColumnVector for implementation simplicity.
All integral types use LongColumnVector and real types(float/double) use DoubleColumnVector. For non-numeric types, there exists corresponding column vectors.
case ByteType => | ||
val data = fromColumn.asInstanceOf[LongColumnVector].vector | ||
var index = 0 | ||
while (index < batchSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm
case ShortType => | ||
val data = fromColumn.asInstanceOf[LongColumnVector].vector | ||
var index = 0 | ||
while (index < batchSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
case IntegerType | DateType => | ||
val data = fromColumn.asInstanceOf[LongColumnVector].vector | ||
var index = 0 | ||
while (index < batchSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, data
is long[]
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. got it
case FloatType => | ||
val data = fromColumn.asInstanceOf[DoubleColumnVector].vector | ||
var index = 0 | ||
while (index < batchSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -170,6 +171,8 @@ case class FileSourceScanExec( | |||
|
|||
val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource]) { | |||
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled | |||
} else if (relation.fileFormat.isInstanceOf[OrcFileFormat]) { | |||
SparkSession.getActiveSession.get.sessionState.conf.orcVectorizedReaderEnabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Different than Parquet, for now we enable vectorized ORC reader when batch output is supported. We don't need unsafe row conversion at all for ORC. Because once it supports batch, we go batch-based approach. If it doesn't support batch, we don't enable vectorized ORC reader at all, so we don't need unsafe row conversion too.
Once we can enable vectorized ORC even batch is not supported, we need to add this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I'll revert the change on this file, @viirya .
/** | ||
* Record reader from row batch. | ||
*/ | ||
private var rows: org.apache.orc.RecordReader = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recordReader
or rowReader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. It's renamed to recordReader
.
private var columnarBatch: ColumnarBatch = _ | ||
|
||
/** | ||
* Writable columnVectors of ColumnarBatch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
columnVectors
-> ColumnVector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it's Seq[WritableColumnVector]
, I'll keep the current one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so it should be Writable column vectors
* by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns. | ||
*/ | ||
private def nextBatch(): Boolean = { | ||
if (rowsReturned >= totalRowCount) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add an assert
to make sure batch
is initialized (not null)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like Parquet's nextBatch
, let's skip that assertion here.
val toColumn = columnVectors(i) | ||
|
||
if (requestedColIds(i) < 0) { | ||
toColumn.appendNulls(batchSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The column ids not requested are fixed. We don't need to reset them and re-append nulls into them again for each batch. We only need to append nulls into them once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The column ids not requested are fixed
is done by OrcDeserializer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant, the columns not requested are not changed for each batch. For now every time you reset the batch and then re-append nulls into such columns. But those columns are not changed across different batches.
Once #20116 is merged, we reset by column vectors, not the columnar batch. So we can just reset the column vectors of the required columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, @viirya .
Oops. Without noticing your comments, I pushed another refactoring which split the functions. |
@dongjoon-hyun Thank you for testing the split methods. If anything the benchmark results look couple of percent slower now? Oh well, at least it is good to know that your code is as fast as it can be! I have no further ideas how performance could possibly be improved. Just many thanks to you and all reviewers for your hard work on this PR! |
Thank you for your help, @henrify . I think it's within margin of deviation. |
// The number of rows read and considered to be returned. | ||
private long rowsReturned = 0L; | ||
|
||
private long totalRowCount = 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need these 2? They are not used for progress reporting anymore, and we don't rely on them to detect the end of file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. We can remove them.
} | ||
} | ||
|
||
private void putNonNullBytes(int count, LongColumnVector fromColumn, WritableColumnVector toColumn) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we revert these? Since they do not help with performance, I'd like to inline these small functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I'll revert them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It almost seemed that they hurt the performance. The MR and Hive tests were up and down randomly as expected, but Vectorized tests were down in almost every benchmark.
------------------------------------------------------------------------------------------------ | ||
Native ORC MR 3364 / 3391 0.3 3208.3 1.0X | ||
Native ORC Vectorized 273 / 284 3.8 260.1 12.3X | ||
Hive built-in ORC 831 / 842 1.3 792.8 4.0X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't forget this question :) #19943 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for late response.
As we know, ORC read path evolves in two ways; ORC MR reader and ORC VectorizedRowBatch. But, the core part is ORC VectorizedRowBatch. ORC MR reader is reimplemented as a wrapper of the ORC VectorizedRowBatch.
What I mentioned previously was the difference between ORC VecterizedRowBatch and ORC MR reader (not about Hive 1.2.1 ORC read path). Maybe, the performance difference between Hive 1.2.1 ORC read path and ORC MR reader is due to the redevelopment of ORC MR reader.
Anyway, I consider both ORC MR and ORC Vectorized version as a single Native ORC
. So, I described in a vague way sometimes. If this PR is merged, Spark is going to have both native ORC implementations (MR/Vectorized). Like Parquet, we will call vectorized version as Spark's native ORC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does Hive ORC use? The MR reader or vectorized reader or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, the latest Hive shares the same library with Spark and the default is vectorized reader. That is the main goal of my efforts about Apache ORC 1.4.1.
Test build #85823 has finished for PR 19943 at commit
|
Since 91b3d66, it seems to pass again.
|
Test build #85827 has finished for PR 19943 at commit
|
Test build #85829 has finished for PR 19943 at commit
|
Test build #85837 has finished for PR 19943 at commit
|
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with minor comments.
public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { | ||
|
||
/** | ||
* The default size of batch. We use this value for both ORC and Spark consistently |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We use this value for ORC reader to make it consistent with Spark's columnar batch, because their default batch sizes are different like the following.
schema.length <= conf.wholeStageMaxNumFields && | ||
schema.forall(_.dataType.isInstanceOf[AtomicType]) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to implement vectorTypes
as ParquetFileFormat
? Seems it affects the actual columnar vector types used in generated codes, instead of general ColumnVector
. May it affect performance a bit?
Test build #85845 has finished for PR 19943 at commit
|
thanks, merging to master/2.3! Let's address the comments in follow-up. BTW @dongjoon-hyun let's keep our discussion on #19943 (comment) |
## What changes were proposed in this pull request? This PR adds an ORC columnar-batch reader to native `OrcFileFormat`. Since both Spark `ColumnarBatch` and ORC `RowBatch` are used together, it is faster than the current Spark implementation. This replaces the prior PR, #17924. Also, this PR adds `OrcReadBenchmark` to show the performance improvement. ## How was this patch tested? Pass the existing test cases. Author: Dongjoon Hyun <[email protected]> Closes #19943 from dongjoon-hyun/SPARK-16060. (cherry picked from commit f44ba91) Signed-off-by: Wenchen Fan <[email protected]>
Thank you so much, @cloud-fan , @mmccline , @viirya , @henrify , @kiszk , @HyukjinKwon ! |
Great job guys! Also, check through the spam of your public github email address for a small gift @dongjoon-hyun @cloud-fan @viirya @kiszk @HyukjinKwon @mmccline |
What changes were proposed in this pull request?
This PR adds an ORC columnar-batch reader to native
OrcFileFormat
. Since both SparkColumnarBatch
and ORCRowBatch
are used together, it is faster than the current Spark implementation. This replaces the prior PR, #17924.Also, this PR adds
OrcReadBenchmark
to show the performance improvement.How was this patch tested?
Pass the existing test cases.