
[SPARK-22825][SQL] Fix incorrect results of Casting Array to String #20024

Closed
wants to merge 12 commits

Conversation

maropu
Member

@maropu maropu commented Dec 19, 2017

What changes were proposed in this pull request?

This PR fixes incorrect results when casting arrays to strings:

scala> val df = spark.range(10).select('id.cast("integer")).agg(collect_list('id).as('ids))
scala> df.write.saveAsTable("t")
scala> sql("SELECT cast(ids as String) FROM t").show(false)
+------------------------------------------------------------------+
|ids                                                               |
+------------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@8bc285df|
+------------------------------------------------------------------+

This PR changes the result to:

+------------------------------+
|ids                           |
+------------------------------+
|[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]|
+------------------------------+

How was this patch tested?

Added tests in CastSuite and SQLQuerySuite.
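The garbage output in the before case is simply the default `Object.toString` (class name plus identity hash code), because `UnsafeArrayData` does not override it. A tiny stand-alone illustration with a stand-in class (a sketch; `OpaqueArrayData` is a hypothetical name, not Spark's actual class):

```java
// Any class that does not override toString() renders as "ClassName@hexHash",
// which is exactly the shape of the pre-fix cast output for UnsafeArrayData.
class OpaqueArrayData {
    private final int[] values;
    OpaqueArrayData(int[] values) { this.values = values; }
    // deliberately no toString() override
}

class ToStringDemo {
    public static void main(String[] args) {
        OpaqueArrayData data = new OpaqueArrayData(new int[]{0, 1, 2});
        // Prints the class name, '@', and an identity hash in hex.
        System.out.println(data);
    }
}
```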

@SparkQA

SparkQA commented Dec 19, 2017

Test build #85118 has finished for PR 20024 at commit ed7f885.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Dec 19, 2017

MapType has the same issue:

scala> Seq(Map(1 -> "a", 2 -> "b")).toDF("a").write.saveAsTable("t")
scala> sql("SELECT cast(a as String) FROM t").show(false)
+----------------------------------------------------------------+
|a                                                               |
+----------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.UnsafeMapData@38bdd75d|
+----------------------------------------------------------------+

Once the approach for this issue is settled, I'll also fix the MapType case in a follow-up.

@SparkQA

SparkQA commented Dec 19, 2017

Test build #85119 has finished for PR 20024 at commit 211e7f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val df1 = sql("SELECT CAST(ARRAY(1, 2, 3, 4) AS STRING)")
checkAnswer(df1, Row("[1, 2, 3, 4]"))
val df2 = sql("SELECT CAST(ARRAY(ARRAY(1, 2), ARRAY(3, 4, 5), ARRAY(6, 7)) AS STRING)")
checkAnswer(df2, Row("[WrappedArray(1, 2), WrappedArray(3, 4, 5), WrappedArray(6, 7)]"))
Member

Hi, @maropu .
Could you put the result after this PR into the PR description? So far, only the before result is described.

Member Author

ok, thanks!

@maropu
Member Author

maropu commented Dec 20, 2017

Could you check? Thanks! @gatorsmile @viirya

@maropu
Member Author

maropu commented Dec 22, 2017

ping

@gatorsmile
Member

cc @cloud-fan

@cloud-fan
Contributor

I feel it's not the most efficient way to cast an array to a string by deserializing the catalyst array to a Java array. Instead, I think we should have a schema-aware string-casting function, i.e., use ArrayData.get to get each element, recursively cast each element to a string, and concatenate them with `,`.
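The schema-aware recursive approach can be sketched outside Spark in plain Java (a sketch only; the types here are illustrative assumptions, not Spark's real `ArrayData`/`Cast` APIs, and the empty rendering of null elements is an assumption about the output format):

```java
class RecursiveCastSketch {
    // Recursively cast a value to a string: arrays become "[e1, e2, ...]",
    // null elements render as empty between separators (an assumption),
    // and leaf values fall back to String.valueOf.
    static String castToString(Object value) {
        if (value instanceof Object[]) {
            Object[] arr = (Object[]) value;
            StringBuilder sb = new StringBuilder("[");
            for (int i = 0; i < arr.length; i++) {
                if (i > 0) sb.append(", ");
                if (arr[i] != null) sb.append(castToString(arr[i]));
            }
            return sb.append("]").toString();
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(castToString(new Object[]{1, 2, 3}));  // [1, 2, 3]
        // Nested arrays recurse instead of falling back to Object.toString.
        System.out.println(castToString(new Object[]{new Object[]{1, 2}, new Object[]{3}}));  // [[1, 2], [3]]
    }
}
```

The key point of the suggestion is that no intermediate Java collection is materialized per element in the real implementation; each element is read positionally and rendered directly.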

@maropu
Member Author

maropu commented Dec 22, 2017

OK, I'll brush it up based on your suggestion.

@maropu
Member Author

maropu commented Jan 3, 2018

@cloud-fan How about the current implementation? (not finished yet, though)

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85630 has finished for PR 20024 at commit edd5abf.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StringBuffer

*/
public class StringBuffer {

private BufferHolder buffer;
Member Author

Currently I reused BufferHolder for the string buffer; should we make a separate buffer implementation for this purpose instead?

Member Author

Or, just use java.lang.StringBuffer?
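For comparison, a minimal grow-on-demand UTF-8 byte buffer of the kind being discussed might look like this (a sketch under assumptions: the class name, the `append`/`build` method names from the review comments below, and the doubling growth policy are illustrative, not the PR's final UTF8StringBuilder):

```java
import java.nio.charset.StandardCharsets;

class Utf8Builder {
    private byte[] buffer = new byte[16];
    private int cursor = 0;

    // Double the buffer (or jump straight to the required size) when full.
    private void grow(int needed) {
        if (cursor + needed > buffer.length) {
            byte[] bigger = new byte[Math.max(buffer.length * 2, cursor + needed)];
            System.arraycopy(buffer, 0, bigger, 0, cursor);
            buffer = bigger;
        }
    }

    public void append(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        grow(bytes.length);
        System.arraycopy(bytes, 0, buffer, cursor, bytes.length);
        cursor += bytes.length;
    }

    // Build the final string from [0, cursor); the total size stays private.
    public String build() {
        return new String(buffer, 0, cursor, StandardCharsets.UTF_8);
    }
}
```

Unlike java.lang.StringBuffer, a byte-level buffer like this avoids a char[]-to-byte[] re-encoding step when the final product is UTF-8 bytes, which is the motivation for a dedicated builder.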

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85631 has finished for PR 20024 at commit d7cf391.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StringBuffer

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85632 has finished for PR 20024 at commit e77ea6b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StringBuffer

@maropu maropu force-pushed the SPARK-22825 branch 2 times, most recently from 570f13b to 91a60c3 Compare January 3, 2018 16:35
@SparkQA

SparkQA commented Jan 3, 2018

Test build #85633 has finished for PR 20024 at commit 570f13b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StringWriterBuffer

@maropu maropu force-pushed the SPARK-22825 branch 2 times, most recently from ca6519e to 9e13bb9 Compare January 3, 2018 18:05
@SparkQA

SparkQA commented Jan 3, 2018

Test build #85634 has finished for PR 20024 at commit 91a60c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StringWriterBuffer

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85640 has finished for PR 20024 at commit 9e13bb9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class StringWriterBuffer

@maropu maropu force-pushed the SPARK-22825 branch 2 times, most recently from 3f075d8 to c2e6757 Compare January 4, 2018 02:06
}

public void append(String value) {
append(value.getBytes(StandardCharsets.UTF_8));
Contributor

This should be append(UTF8String.fromString(value)); then we can remove append(byte[] value).

return UTF8String.fromBytes(bytes);
}

public int totalSize() {
Contributor

This doesn't need to be public

public UTF8String toUTF8String() {
final int len = totalSize();
final byte[] bytes = new byte[len];
Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, bytes, Platform.BYTE_ARRAY_OFFSET, len);
Contributor

why copy? we can do UTF8String.fromBytes(buffer, 0, totalSize)

cursor += value.length;
}

public UTF8String toUTF8String() {
Contributor

nit: public UTF8String build()

@@ -206,6 +206,23 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
case ar: ArrayType =>
buildCast[ArrayData](_, array => {
val res = new UTF8StringBuilder
Contributor

nit: builder

@@ -206,6 +206,23 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
case ar: ArrayType =>
Contributor

nit: case ArrayType(et, _)

|$bufferClass $bufferTerm = new $bufferClass();
|$writeArrayToBuffer($c, $bufferTerm);
|$evPrim = $bufferTerm.toUTF8String();
""".stripMargin
Contributor

We can simplify this too

val elementToStringCode = castToStringCode(et, ctx)
val funcName = ctx.freshName("elementToString")
val elementToStringFunc = ctx.addNewFunction(funcName,
  s"""
     private UTF8String $funcName(${ctx.dataType(et)} element) {
       UTF8String elementStr = null;
       ${elementToStringCode("element", "elementStr", null /* resultIsNull won't be touched */)}
       return elementStr;
     }
   """)
...
$bufferClass $bufferTerm = new $bufferClass();
$bufferTerm.append("[");
if ($c.numElements() > 0) {
  if (!$c.isNullAt(0)) {
    $bufferTerm.append($elementToStringFunc(${ctx.getValue(c, et, "0")}));
  }
  for (int $loopIndex = 1; $loopIndex < $c.numElements(); $loopIndex++) ...
}

@SparkQA

SparkQA commented Jan 4, 2018

Test build #85676 has finished for PR 20024 at commit b0b3cd6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.unsafe.types.UTF8String;

/**
* A helper class to write `UTF8String`, `String`, and `byte[]` data into an internal byte buffer
Contributor

A helper class to write {@link UTF8String}s to an internal buffer and build the concatenated {@link UTF8String} at the end.

@@ -597,6 +619,44 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
"""
}

private[this] def codegenWriteArrayElemCode(et: DataType, ctx: CodegenContext): String = {
Contributor

It returns a function to write the array elements, maybe a better name is: writeArrayToStringBuilderFunc

Contributor

oh wait, the returned function is only called once, I think we don't need to make it a function, but just return the code, e.g.

def writeArrayToStringBuilder(ctx: CodegenContext, et: DataType, arr: String, builder: String): String

Contributor

elementToString needs a function because it's called twice.

Member Author

ok, I'll update soon.

@@ -2775,4 +2773,53 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-22825 Cast array to string") {
Contributor

I think the unit test is good enough, we don't need this end-to-end test.

@cloud-fan
Contributor

LGTM except a few minor comments

@SparkQA

SparkQA commented Jan 4, 2018

Test build #85681 has finished for PR 20024 at commit 09fd22e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jan 4, 2018

Thanks for the kind reviews at the start of the year; all the comments have been addressed.

@SparkQA

SparkQA commented Jan 5, 2018

Test build #85701 has finished for PR 20024 at commit 449e2c9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jan 5, 2018

retest this please

@@ -608,6 +665,17 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
val tz = ctx.addReferenceObj("timeZone", timeZone)
(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString(
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val bufferTerm = ctx.freshName("bufferTerm")
Contributor

super nit: In codegen we usually don't add a term postfix, just call it buffer, array, etc.

Member Author

ok

@SparkQA

SparkQA commented Jan 5, 2018

Test build #85710 has finished for PR 20024 at commit 449e2c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 5, 2018

Test build #85714 has finished for PR 20024 at commit dc15b93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/2.3!

@cloud-fan
Contributor

as a follow-up, we should do the same thing for struct and map type too.
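For the map follow-up, the same recursive idea could plausibly extend to key/value pairs. A hedged sketch (the `k -> v` rendering mirrors Scala's Map toString and is an assumption about the eventual output format, not the merged Spark behavior):

```java
import java.util.Map;

class MapCastSketch {
    // Render a map as "[k1 -> v1, k2 -> v2, ...]"; in a real schema-aware
    // implementation, keys and values would each be cast recursively by type.
    static String castMapToString(Map<?, ?> map) {
        StringBuilder sb = new StringBuilder("[");
        boolean first = true;
        for (Map.Entry<?, ?> e : map.entrySet()) {
            if (!first) sb.append(", ");
            sb.append(e.getKey()).append(" -> ").append(e.getValue());
            first = false;
        }
        return sb.append("]").toString();
    }
}
```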

asfgit pushed a commit that referenced this pull request Jan 5, 2018
## What changes were proposed in this pull request?
This PR fixes incorrect results when casting arrays to strings;
```
scala> val df = spark.range(10).select('id.cast("integer")).agg(collect_list('id).as('ids))
scala> df.write.saveAsTable("t")
scala> sql("SELECT cast(ids as String) FROM t").show(false)
+------------------------------------------------------------------+
|ids                                                               |
+------------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@8bc285df|
+------------------------------------------------------------------+
```

This PR changes the result to;
```
+------------------------------+
|ids                           |
+------------------------------+
|[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]|
+------------------------------+
```

## How was this patch tested?
Added tests in `CastSuite` and `SQLQuerySuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #20024 from maropu/SPARK-22825.

(cherry picked from commit 52fc5c1)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in 52fc5c1 Jan 5, 2018
@maropu
Member Author

maropu commented Jan 5, 2018

Thanks! Will do.
