[SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add ArrayType and StructType support. #18655
Conversation
assert(dictionary == null);
boolean[] array = new boolean[count];
for (int i = 0; i < count; ++i) {
  array[i] = (boolData.getAccessor().get(rowId + i) == 1);
}
Can we move `boolData.getAccessor()` out of the loop if it is a loop invariant? Or, can we use `nulls`?
Ditto for other types (e.g. `getBytes()`).
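To make the hoisting suggestion concrete, here is a minimal sketch. `BitVector` and `BitAccessor` are invented stand-ins for Arrow's vector/accessor pair, not the real Arrow classes; only the shape of the change matters:

```scala
object HoistSketch {
  // Simplified stand-ins for Arrow's vector and accessor (illustrative only).
  final class BitAccessor(bits: Array[Int]) { def get(i: Int): Int = bits(i) }
  final class BitVector(bits: Array[Int]) {
    def getAccessor: BitAccessor = new BitAccessor(bits)
  }

  def getBooleans(boolData: BitVector, rowId: Int, count: Int): Array[Boolean] = {
    // Hoisted: the accessor is looked up once, not once per iteration.
    val accessor = boolData.getAccessor
    val array = new Array[Boolean](count)
    var i = 0
    while (i < count) {
      array(i) = accessor.get(rowId + i) == 1
      i += 1
    }
    array
  }
}
```

Whether the JIT would hoist the original call on its own depends on whether `getAccessor()` is trivially inlinable, so doing it by hand keeps the intent explicit.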
@Override
public boolean getBoolean(int rowId) {
  return boolData.getAccessor().get(rowId) == 1;
}
Can we use `nulls`? If so, it would be better to use another name instead of `nulls`.
Test build #79668 has finished for PR 18655 at commit
Good feature, but can we split this PR into smaller PRs for ease of review, since it looks large?
Thanks for this @ueshin. I agree with @kiszk that it would be easier to review if you can split this into smaller PRs, maybe keeping the additional type support separate? I'm all for refactoring this too, but could you elaborate with some details on why you are refactoring?
yea let's put
Thank you for your comments.
Test build #79696 has finished for PR 18655 at commit
Jenkins, retest this please.
@BryanCutler I'd like to share the motivation of the refactoring. Thanks!
Test build #79698 has finished for PR 18655 at commit
Ok, I see. By using
That's fine, but do they need to be in
Thanks for the PR @ueshin! I have some concerns about using the TaskContext in the iterator to release resources. I believe there was a bug fix in Arrow 0.4.1 for the decimal type, and I had planned to look into upgrading to support that type. Also, with the added type support here, how does it affect the Python side, and are you planning on adding tests there?
As for the refactoring, if it offers a performance improvement that is great. However, it seems a little out of order to me to refactor and move files around to support a SPIP that has not reached a consensus and not been voted on. Just my thoughts.
recordsInBatch += 1
context.addTaskCompletionListener { _ =>
  root.close()
  allocator.close()
}
It seems a little odd to me to tie an iterator to a TaskContext, why not just close resources as soon as the row iterator is consumed?
I was worried about a memory leak when an exception happens during iteration. In that case, the task will fail before the row iterator is completely consumed.
Yeah, good point. What about closing resources both ways? Have the listener close for the case that something fails; otherwise, once the row iterator is fully consumed, close immediately. I'm not really sure at what exact point the task completion listener callback is run; is it dependent on any IO?
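A minimal sketch of that dual-close idea, with `onTaskCompletion` as an invented stand-in for `TaskContext.addTaskCompletionListener` (names and shape are illustrative, not Spark's actual API):

```scala
// Wraps a row iterator so resources are released eagerly when the iterator is
// fully consumed, with a task-completion callback as a safety net on failure.
final class ClosingIterator[T](
    underlying: Iterator[T],
    close: () => Unit,
    onTaskCompletion: (() => Unit) => Unit) extends Iterator[T] {

  private var closed = false
  // Safety net: runs even if the task fails mid-iteration.
  onTaskCompletion(() => closeOnce())

  private def closeOnce(): Unit = if (!closed) { closed = true; close() }

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) closeOnce() // eager close on full consumption
    more
  }

  override def next(): T = underlying.next()
}
```

Because `closeOnce` guards with a flag, the listener firing after normal consumption is a no-op, so resources are never released twice.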
val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
Utils.tryWithSafeFinally {
  var rowId = 0
nit: maybe `rowCount` instead of `rowId`, because it is a count of how many rows are in the batch so far and not a unique id?
Thanks! I'll update it.
def setNull(): Unit
def setValue(input: SpecializedGetters, ordinal: Int): Unit
def skip(): Unit
What's the purpose of the `skip()` method?
This is for the case where the value of the struct type is null.
I believe that even if the struct value is null, its fields should still have some values for the same row.
@@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
  collectAndValidate(df, json, "floating_point-double_precision.json")
}
ignore("decimal conversion") {
Why ignore this?
Oh, I'm sorry, I should have mentioned it.
It seems like `JsonFileReader` doesn't support DecimalType, so I ignored it for now.
But now I'm thinking: if Arrow 0.4.0 has a bug for the decimal type as you said, should I remove decimal type support from this PR and add it in follow-up PRs?
That might be true, I haven't looked into it yet. I can work on adding support on the Arrow side, so I'll try to check on that and see where it stands in the upcoming 0.5 release.
Arrow integration support for DecimalType isn't slated until v0.6, so it might work, but there are no guarantees that a record batch written in Java will compare equal when that batch is read by Python/C++. Also, we can't test here until the `JsonFileReader` supports it too. I made the Arrow JIRA here: https://issues.apache.org/jira/browse/ARROW-1238
I see, I'll move files back to
Test build #79737 has finished for PR 18655 at commit
Test build #79741 has finished for PR 18655 at commit
Jenkins, retest this please.
Test build #79744 has finished for PR 18655 at commit
Test build #79786 has finished for PR 18655 at commit
Test build #79798 has finished for PR 18655 at commit
On DecimalType, I want to point out that we haven't hardened the memory format and integration tests between Java<->C++ within Arrow. It would be great if you could help with this -- we ran into a problem in C++ where we needed an extra sign bit with 16-byte high precision decimals. So we have 3 memory representations:
What is Spark's internal memory representation? cc @cpcloud
Based on this code: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala#L429-L547 It looks like there are two types:
I think Arrow's Decimal representation in Java is almost identical to this. Looking at the
There are a bunch of open JIRAs about decimals in Arrow: https://issues.apache.org/jira/issues/?filter=12334829&jql=project%20%3D%20ARROW%20AND%20status%20in%20(%22In%20Review%22%2C%20Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20text%20~%20%22decimal%22 Between these JIRAs and the mailing list, it would be good to come up with a game plan for integration tests between Java and C++ (and thus Python) so we can enable Spark to send Python decimals.
@BryanCutler @wesm @cpcloud Thank you for reviewing this.
Yes, let's leave decimal support for follow-ups.
columnWriters(i).write(row)
i += 1
context.addTaskCompletionListener { _ =>
  if (!closed) {
do we really need this? I think it's ok to close twice?
The `allocator` can be closed twice, but the `root` throws an exception after the `allocator` is closed.
is this a bug in arrow? cc @BryanCutler
The root just releases the buffers from the FieldVectors, so I would think it should be able to handle being closed twice. I'll check tomorrow if it seems reasonable.
I filed https://issues.apache.org/jira/browse/ARROW-1283 to fix this. For now, it looks like we need this.
+1 on holding off for
def writeSkip(): Unit = {
  skip()
  count += 1
}
For skipping purposes, is it enough to just do `count += 1`? E.g. with `vector.set(1, v1); vector.set(3, v3)`, value 2 is skipped.
Basically, yes, it's enough except for `StructType`, but should we set the null bit to `1` for skipped values?
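To illustrate the skip semantics under discussion, here is a toy model of a writer with a validity bitmap (this is not Arrow's actual writer API, only the idea): a skipped slot advances the row count but leaves its validity bit unset, so readers observe it as null.

```scala
final class ToyColumnWriter(capacity: Int) {
  private val valid = new Array[Boolean](capacity) // toy validity bitmap
  private val values = new Array[Int](capacity)
  private var count = 0

  def write(v: Int): Unit = { valid(count) = true; values(count) = v; count += 1 }
  // Advance only; the validity bit stays 0, so the slot reads as null.
  def writeSkip(): Unit = { count += 1 }

  def isNullAt(i: Int): Boolean = !valid(i)
  def get(i: Int): Int = values(i)
  def size: Int = count
}
```

Under this model, skipping and writing a null are indistinguishable to a reader, which is why just doing `count += 1` suffices for flat types; nested struct children need the same treatment applied recursively.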
val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
data.zipWithIndex.foreach {
  case (null, rowId) => assert(reader.isNullAt(rowId))
  case (datum, rowId) => assert(get(reader, rowId) === datum)
}
We can do something like:
dt match {
  case BooleanType => reader.getBoolean(rowId)
  case IntegerType => ...
  ...
}
Then the caller side doesn't need to pass in a `get` function.
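A runnable version of that suggestion, using simplified stand-ins for Spark's `DataType` hierarchy and the column reader (the real classes are much richer; this only shows the dispatch shape):

```scala
sealed trait DataType
case object BooleanType extends DataType
case object IntegerType extends DataType

// Minimal stand-in for a typed columnar reader such as ArrowColumnVector.
final class ToyReader(bools: Array[Boolean], ints: Array[Int]) {
  def getBoolean(rowId: Int): Boolean = bools(rowId)
  def getInt(rowId: Int): Int = ints(rowId)
}

// Dispatch on the data type, so callers no longer pass a `get` function.
def get(reader: ToyReader, dt: DataType, rowId: Int): Any = dt match {
  case BooleanType => reader.getBoolean(rowId)
  case IntegerType => reader.getInt(rowId)
}
```

The trade-off is an `Any` return type at the dispatch boundary, which is acceptable in test helpers like this suite.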
Thanks, I'll update it.
private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
  val field = vector.getField()
  ArrowUtils.fromArrowField(field) match {
Would it be better to do as below?
(ArrowUtils.fromArrowField(field), vector) match {
case (_: BooleanType, vector: NullableBitVector) => new BooleanWriter(vector)
case (_: ByteType, vector: NullableTinyIntVector) => new ByteWriter(vector)
...
Thanks, I'll modify it.
val a_arr = Seq(Seq(1, 2), Seq(3, 4), Seq(), Seq(5))
val b_arr = Seq(Some(Seq(1, 2)), None, Some(Seq()), None)
val c_arr = Seq(Seq(Some(1), Some(2)), Seq(Some(3), None), Seq(), Seq(Some(5)))
val d_arr = Seq(Seq(Seq(1, 2)), Seq(Seq(3), Seq()), Seq(), Seq(Seq(5)))
How about camelCase naming?
Thanks, I'll modify it.
Test build #79955 has finished for PR 18655 at commit
Test build #79957 has finished for PR 18655 at commit
Jenkins, retest this please.
Test build #79960 has finished for PR 18655 at commit
Test build #79995 has finished for PR 18655 at commit
Test build #79996 has finished for PR 18655 at commit
LGTM, merging to master!
@BryanCutler @wesm @cpcloud I filed a JIRA issue for decimal type support, SPARK-21552, and sent a PR for it as WIP #18754.
What changes were proposed in this pull request?
This is a refactoring of `ArrowConverters` and related classes.
- Refactor `ColumnWriter` as `ArrowWriter`.
- Add `ArrayType` and `StructType` support.
- Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation.
How was this patch tested?
Added some tests and existing tests.
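As a hedged sketch of the `ArrowWriter`-style flow the description refers to (all classes here are toy stand-ins invented for illustration; the real writer targets Arrow `ValueVector`s): per-field writers consume rows and append to columnar buffers directly, with no intermediate record-batch object.

```scala
import scala.collection.mutable.ArrayBuffer

// One writer per field; each appends the value at its ordinal to a column buffer.
trait FieldWriter { def write(row: Array[Any], ordinal: Int): Unit }

final class IntWriter extends FieldWriter {
  val values = ArrayBuffer.empty[Int]
  def write(row: Array[Any], ordinal: Int): Unit =
    values += row(ordinal).asInstanceOf[Int]
}

// Drives all field writers over incoming rows, tracking the row count.
final class ToyArrowWriter(writers: Array[FieldWriter]) {
  private var count = 0
  def write(row: Array[Any]): Unit = {
    var i = 0
    while (i < writers.length) { writers(i).write(row, i); i += 1 }
    count += 1
  }
  def rowCount: Int = count
}
```

The point of the shape is that rows are transposed into columns in a single pass, which is what lets the refactored converters skip building an `ArrowRecordBatch` as an intermediate step.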