[NSE-1171] Support merge parquet schema and read missing schema #1175
Conversation
This PR could be tested in
...ard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
val nullVectors = if (hashMissingColumns) {
  val vectors =
    ArrowWritableColumnVector.allocateColumns(batchSize, requiredSchema)
It looks like these columnar vectors are allocated for the whole required schema, and then only the truly null vectors for the missing columns are kept in ArrowUtils.scala. Can this part be optimized? It seems unnecessary to create null vectors for columns that are not missing. Thanks!
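A minimal sketch of the suggested optimization, using plain Scala case classes as hypothetical stand-ins for the project's real field/vector types (`Field`, `NullVector`, and `allocateNullVectors` are illustrative names, not the PR's API): compute the set of missing fields first, then allocate null vectors only for those.

```scala
// Hypothetical stand-ins for the real schema/vector types in this PR.
case class Field(name: String)
case class NullVector(name: String)

// Allocate null vectors only for fields of the required schema that the
// Parquet file does not contain, instead of for every required field.
def allocateNullVectors(requiredSchema: Seq[Field],
                        fileSchema: Seq[Field],
                        caseSensitive: Boolean): Seq[NullVector] = {
  // Name matcher that respects the case-sensitivity setting.
  val eql: (String, String) => Boolean =
    if (caseSensitive) (a, b) => a.equals(b)
    else (a, b) => a.equalsIgnoreCase(b)
  // Keep only the required fields that no file field matches.
  val missing =
    requiredSchema.filterNot(f => fileSchema.exists(g => eql(g.name, f.name)))
  missing.map(f => NullVector(f.name))
}
```

With this shape, the non-missing columns never get a throwaway null vector allocated for them.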
Hm, good catch, I'll have a try.
...ndard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala
.getOrElse {
  // The missing column needs to be found in nullVectors
  val nullVector =
    nullVectors.find(_.getValueVector.getName.equalsIgnoreCase(field.name)).get
Just a small suggestion for code refinement. I think the different handling for the case-sensitive and case-insensitive modes can be simplified. We can define a lambda expression as below and then use this eql directly when finding the matched field, like anArray.find(x => eql(x, "MATCH_TARGET")), without separating the code logic with if/else. We can do the same refinement in other places. Right?
val eql = if (caseSensitive) {
  (a: String, b: String) => a.equals(b)
} else {
  (a: String, b: String) => a.equalsIgnoreCase(b)
}
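For illustration, the suggested helper can then serve every lookup site without an if/else branch (`makeEql`, `names`, and the lookup target below are hypothetical names, not code from the PR):

```scala
// Case-sensitivity-aware name matcher, as suggested in the review comment.
def makeEql(caseSensitive: Boolean): (String, String) => Boolean =
  if (caseSensitive) (a: String, b: String) => a.equals(b)
  else (a: String, b: String) => a.equalsIgnoreCase(b)

// One lookup expression serves both modes; no if/else at the call site.
val eql = makeEql(caseSensitive = false)
val names = Seq("id", "Name", "value")
val matched = names.find(n => eql(n, "name")) // Some("Name")
```

The same `eql` value can be passed around or captured by closures, so the case-sensitivity decision is made once instead of at each match site.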
done
@jackylee-ch could you please also add a small Scala unit test for this feature?
Sure
…project#1175)
* Support merge parquet schema and read missing schema
* fix error
* optimize null vectors
* optimize code
* optimize code
* change code
* add schema merge suite tests
* add test for struct type
* [NSE-1170] Set correct row number in batch scan w/ partition columns (#1172)
* [NSE-1171] Throw RuntimeException when reading duplicate fields in case-insensitive mode (#1173)
  * throw exception if one more columns matched in case insensitive mode
  * add schema check in arrow v2
* bump h2/pgsql version (#1176)
  * bump h2/pgsql version
  * ignore one failed test
  Signed-off-by: Yuan Zhou <[email protected]>
* [NSE-956] allow to write parquet with compression (#1014)
  This patch adds support for writing parquet with compression:
  df.coalesce(1).write.format("arrow").option("parquet.compression","zstd").save(path)
  Signed-off-by: Yuan Zhou <[email protected]>
* [NSE-1161] Support read-write parquet conversion to read-write arrow (#1162)
  * add ArrowConvertExtension
  * do not convert parquet fileformat while writing to partitioned/bucketed/sorted output
  * fix cache failed
  * care about write codec
  * disable convertor extension by default
  * add some comments
* remove wrong compress type check (#1178)
  Since compression has been supported in #1014, the extra compression check in ArrowConvertorExtension can be removed now.
* fix to use right arrow branch (#1179)
  Signed-off-by: Yuan Zhou <[email protected]>
* [NSE-1171] Support merge parquet schema and read missing schema (#1175)
  * Support merge parquet schema and read missing schema
  * fix error
  * optimize null vectors
  * optimize code
  * change code
  * add schema merge suite tests
  * add test for struct type
* to use 1.5 branch arrow
  Signed-off-by: Yuan Zhou <[email protected]>

Co-authored-by: Jacky Lee <[email protected]>
What changes were proposed in this pull request?
This PR adds support for merging Parquet schemas in ArrowFileFormat.infer_schema, and for handling missing columns and filters on missing columns when reading Parquet.
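The schema-merge behavior described above can be sketched with plain Scala pairs as a hypothetical stand-in for the real Arrow schema types (`mergeSchemas` and the `(name, type)` encoding are illustrative, not the PR's code): the inferred schema is the union of all file schemas, and any column of the merged schema that a given file lacks becomes a "missing column" that the reader fills with nulls.

```scala
// A toy model of schema-merge inference: a schema is an ordered list of
// (fieldName, typeName) pairs.
def mergeSchemas(
    schemas: Seq[Seq[(String, String)]]): Seq[(String, String)] =
  // Keep the first file's field order and append fields that later
  // files introduce; duplicates (by name) are kept once.
  schemas.foldLeft(Seq.empty[(String, String)]) { (merged, s) =>
    merged ++ s.filterNot { case (name, _) => merged.exists(_._1 == name) }
  }
```

For example, merging a file with columns `(a, b)` and a file with columns `(a, c)` yields `(a, b, c)`; `c` is then a missing column for the first file and `b` for the second, which is where the null-vector handling in this PR comes in.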
How was this patch tested?
Unit tests.