Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Jan 4, 2018
1 parent b0b3cd6 commit 09fd22e
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,21 @@ private void grow(int neededSize) {
}
}

private int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
}

public void append(UTF8String value) {
grow(value.numBytes());
value.writeToMemory(buffer, cursor);
cursor += value.numBytes();
}

public void append(String value) {
append(value.getBytes(StandardCharsets.UTF_8));
}

public void append(byte[] value) {
grow(value.length);
Platform.copyMemory(value, Platform.BYTE_ARRAY_OFFSET, buffer, cursor, value.length);
cursor += value.length;
}

public UTF8String toUTF8String() {
final int len = totalSize();
final byte[] bytes = new byte[len];
Platform.copyMemory(buffer, Platform.BYTE_ARRAY_OFFSET, bytes, Platform.BYTE_ARRAY_OFFSET, len);
return UTF8String.fromBytes(bytes);
append(UTF8String.fromString(value));
}

public int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
public UTF8String build() {
return UTF8String.fromBytes(buffer, 0, totalSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,27 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
case ar: ArrayType =>
case ArrayType(et, _) =>
buildCast[ArrayData](_, array => {
val res = new UTF8StringBuilder
res.append("[")
val builder = new UTF8StringBuilder
builder.append("[")
if (array.numElements > 0) {
val toUTF8String = castToString(ar.elementType)
res.append(toUTF8String(array.get(0, ar.elementType)).asInstanceOf[UTF8String])
val toUTF8String = castToString(et)
if (!array.isNullAt(0)) {
builder.append(toUTF8String(array.get(0, et)).asInstanceOf[UTF8String])
}
var i = 1
while (i < array.numElements) {
res.append(", ")
res.append(toUTF8String(array.get(i, ar.elementType)).asInstanceOf[UTF8String])
builder.append(",")
if (!array.isNullAt(i)) {
builder.append(" ")
builder.append(toUTF8String(array.get(i, et)).asInstanceOf[UTF8String])
}
i += 1
}
}
res.append("]")
res.toUTF8String
builder.append("]")
builder.build()
})
case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
}
Expand Down Expand Up @@ -614,45 +619,37 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
"""
}

private[this] def writeElemToBufferCode(
dataType: DataType,
buffer: String,
elemTerm: String,
ctx: CodegenContext): String = dataType match {
case BinaryType | StringType => s"$buffer.append($elemTerm)"
case DateType => s"""$buffer.append(
org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($elemTerm))"""
case TimestampType => s"""$buffer.append(
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($elemTerm))"""
case ar: ArrayType => s"${codegenWriteArrayToBuffer(ar, ctx)}($elemTerm, $buffer)"
case _ => s"$buffer.append(String.valueOf($elemTerm))"
}
private[this] def codegenWriteArrayElemCode(et: DataType, ctx: CodegenContext): String = {
val elementToStringCode = castToStringCode(et, ctx)
val funcName = ctx.freshName("elementToString")
val elementToStringFunc = ctx.addNewFunction(funcName,
s"""
|private UTF8String $funcName(${ctx.javaType(et)} element) {
| UTF8String elementStr = null;
| ${elementToStringCode("element", "elementStr", null /* resultIsNull won't be used */)}
| return elementStr;
|}
""".stripMargin)

private[this] def codegenWriteArrayToBuffer(ar: ArrayType, ctx: CodegenContext): String = {
val loopIndex = ctx.freshName("loopIndex")
val writeArrayToBuffer = ctx.freshName("writeArrayToBuffer")
val arTerm = ctx.freshName("arTerm")
val bufferClass = classOf[UTF8StringBuilder].getName
val bufferTerm = ctx.freshName("bufferTerm")
def writeElemCode(elemTerm: String) = {
writeElemToBufferCode(ar.elementType, bufferTerm, elemTerm, ctx)
}
def writeToBufferCode(i: String) = {
val elemTerm = ctx.freshName("elemTerm")
s"""
|${ctx.javaType(ar.elementType)} $elemTerm = ${ctx.getValue(arTerm, ar.elementType, i)};
|${writeElemCode(elemTerm)};
""".stripMargin
}
ctx.addNewFunction(writeArrayToBuffer,
s"""
|private void $writeArrayToBuffer(ArrayData $arTerm, $bufferClass $bufferTerm) {
| $bufferTerm.append("[");
| if ($arTerm.numElements() > 0) {
| ${writeToBufferCode("0")}
| if (!$arTerm.isNullAt(0)) {
| $bufferTerm.append($elementToStringFunc(${ctx.getValue(arTerm, et, "0")}));
| }
| for (int $loopIndex = 1; $loopIndex < $arTerm.numElements(); $loopIndex++) {
| $bufferTerm.append(", ");
| ${writeToBufferCode(loopIndex)}
| $bufferTerm.append(",");
| if (!$arTerm.isNullAt($loopIndex)) {
| $bufferTerm.append(" ");
| $bufferTerm.append($elementToStringFunc(${ctx.getValue(arTerm, et, loopIndex)}));
| }
| }
| }
| $bufferTerm.append("]");
Expand All @@ -671,15 +668,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
val tz = ctx.addReferenceObj("timeZone", timeZone)
(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString(
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
case ar: ArrayType =>
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val bufferTerm = ctx.freshName("bufferTerm")
val bufferClass = classOf[UTF8StringBuilder].getName
val writeArrayToBuffer = codegenWriteArrayToBuffer(ar, ctx)
val writeArrayElemCode = codegenWriteArrayElemCode(et, ctx)
s"""
|$bufferClass $bufferTerm = new $bufferClass();
|$writeArrayToBuffer($c, $bufferTerm);
|$evPrim = $bufferTerm.toUTF8String();
|$writeArrayElemCode($c, $bufferTerm);
|$evPrim = $bufferTerm.build();
""".stripMargin
}
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,21 +859,23 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(ret1, "[1, 2, 3, 4, 5]")
val ret2 = cast(Literal.create(Array("ab", "cde", "f")), StringType)
checkEvaluation(ret2, "[ab, cde, f]")
val ret3 = cast(Literal.create(Array("ab".getBytes, "cde".getBytes, "f".getBytes)), StringType)
checkEvaluation(ret3, "[ab, cde, f]")
val ret4 = cast(
val ret3 = cast(Literal.create(Array("ab", null, "c")), StringType)
checkEvaluation(ret3, "[ab,, c]")
val ret4 = cast(Literal.create(Array("ab".getBytes, "cde".getBytes, "f".getBytes)), StringType)
checkEvaluation(ret4, "[ab, cde, f]")
val ret5 = cast(
Literal.create(Array("2014-12-03", "2014-12-04", "2014-12-06").map(Date.valueOf)),
StringType)
checkEvaluation(ret4, "[2014-12-03, 2014-12-04, 2014-12-06]")
val ret5 = cast(
checkEvaluation(ret5, "[2014-12-03, 2014-12-04, 2014-12-06]")
val ret6 = cast(
Literal.create(Array("2014-12-03 13:01:00", "2014-12-04 15:05:00").map(Timestamp.valueOf)),
StringType)
checkEvaluation(ret5, "[2014-12-03 13:01:00, 2014-12-04 15:05:00]")
val ret6 = cast(Literal.create(Array(Array(1, 2, 3), Array(4, 5))), StringType)
checkEvaluation(ret6, "[[1, 2, 3], [4, 5]]")
val ret7 = cast(
checkEvaluation(ret6, "[2014-12-03 13:01:00, 2014-12-04 15:05:00]")
val ret7 = cast(Literal.create(Array(Array(1, 2, 3), Array(4, 5))), StringType)
checkEvaluation(ret7, "[[1, 2, 3], [4, 5]]")
val ret8 = cast(
Literal.create(Array(Array(Array("a"), Array("b", "c")), Array(Array("d")))),
StringType)
checkEvaluation(ret7, "[[[a], [b, c]], [[d]]]")
checkEvaluation(ret8, "[[[a], [b, c]], [[d]]]")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2787,6 +2787,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
val df = sql("SELECT CAST(a AS STRING) FROM t")
checkAnswer(df, Row("[ab, cde, f]"))
}
withTable("t") {
Seq(Seq("ab", null, "c")).toDF("a").write.saveAsTable("t")
val df = sql("SELECT CAST(a AS STRING) FROM t")
checkAnswer(df, Row("[ab,, c]"))
}
withTable("t") {
Seq(Seq("ab".getBytes, "cde".getBytes, "f".getBytes)).toDF("a").write.saveAsTable("t")
val df = sql("SELECT CAST(a AS STRING) FROM t")
Expand Down

0 comments on commit 09fd22e

Please sign in to comment.