diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5e75ff6f6e1a3..b2c302fbbbe31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
+import org.apache.hadoop.fs.PositionedReadable
 
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
@@ -275,19 +276,63 @@ class JacksonParser(
       }
     }
 
-    case _: StringType =>
-      (parser: JsonParser) => parseJsonToken[UTF8String](parser, dataType) {
+    case _: StringType => (parser: JsonParser) => {
+      // This must be enabled if we will retrieve the bytes directly from the raw content:
+      val includeSourceInLocation = JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION
+      val originalMask = if (includeSourceInLocation.enabledIn(parser.getFeatureMask)) {
+        1
+      } else {
+        0
+      }
+      parser.overrideStdFeatures(includeSourceInLocation.getMask, includeSourceInLocation.getMask)
+      val result = parseJsonToken[UTF8String](parser, dataType) {
         case VALUE_STRING =>
           UTF8String.fromString(parser.getText)
 
-        case _ =>
+        case other =>
           // Note that it always tries to convert the data as string without the case of failure.
-          val writer = new ByteArrayOutputStream()
-          Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
-            generator => generator.copyCurrentStructure(parser)
+          val startLocation = parser.currentTokenLocation()
+          def skipAhead(): Unit = {
+            other match {
+              case START_OBJECT =>
+                parser.skipChildren()
+              case START_ARRAY =>
+                parser.skipChildren()
+              case _ =>
+              // Do nothing in this case; we've already read the token
+            }
           }
-          UTF8String.fromBytes(writer.toByteArray)
-      }
+
+          // PositionedReadable
+          startLocation.contentReference().getRawContent match {
+            case byteArray: Array[Byte] if exactStringParsing =>
+              skipAhead()
+              val endLocation = parser.currentLocation.getByteOffset
+
+              UTF8String.fromBytes(
+                byteArray,
+                startLocation.getByteOffset.toInt,
+                endLocation.toInt - (startLocation.getByteOffset.toInt))
+            case positionedReadable: PositionedReadable if exactStringParsing =>
+              skipAhead()
+              val endLocation = parser.currentLocation.getByteOffset
+
+              val size = endLocation.toInt - (startLocation.getByteOffset.toInt)
+              val buffer = new Array[Byte](size)
+              positionedReadable.read(startLocation.getByteOffset, buffer, 0, size)
+              UTF8String.fromBytes(buffer, 0, size)
+            case _ =>
+              val writer = new ByteArrayOutputStream()
+              Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+                generator => generator.copyCurrentStructure(parser)
+              }
+              UTF8String.fromBytes(writer.toByteArray)
+          }
+      }
+      // Reset back to the original configuration:
+      parser.overrideStdFeatures(includeSourceInLocation.getMask, originalMask)
+      result
+    }
 
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
@@ -429,6 +474,8 @@ class JacksonParser(
 
   private val allowEmptyString = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON)
 
+  private val exactStringParsing = SQLConf.get.getConf(SQLConf.JSON_EXACT_STRING_PARSING)
+
   /**
    * This function throws an exception for failed conversion. For empty string on data types
    * except for string and binary types, this also throws an exception.
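
Reviewer aside (not part of the patch): the change relies on Jackson keeping a reference to the raw input so that token byte offsets can be used to slice the original text of a value out verbatim. Below is a minimal standalone sketch of that technique, assuming Jackson 2.13+ on the classpath; the object name ExactSliceDemo and the sample document are illustrative only.

import java.nio.charset.StandardCharsets.UTF_8

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

object ExactSliceDemo {
  def main(args: Array[String]): Unit = {
    val bytes = """{"data": {"white": "space"}}""".getBytes(UTF_8)
    val parser = new JsonFactory().createParser(bytes)
    // Off by default since Jackson 2.13; without it the location's
    // contentReference() carries no raw content to slice from.
    parser.enable(JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION)

    parser.nextToken() // START_OBJECT of the root
    parser.nextToken() // FIELD_NAME "data"
    parser.nextToken() // START_OBJECT of the nested value

    val start = parser.currentTokenLocation()
    parser.skipChildren() // advance to the matching END_OBJECT
    val end = parser.currentLocation().getByteOffset

    start.contentReference().getRawContent match {
      case raw: Array[Byte] =>
        val offset = start.getByteOffset.toInt
        // Prints the nested object verbatim: {"white": "space"}
        println(new String(raw, offset, end.toInt - offset, UTF_8))
      case _ =>
        // No raw source attached; a caller would fall back to
        // re-serializing the tokens, as the patch's last branch does.
        println("raw content unavailable")
    }
    parser.close()
  }
}

The feature check happens when the location is materialized, not when the parser is constructed, which is why the patch can toggle INCLUDE_SOURCE_IN_LOCATION per value via overrideStdFeatures and restore the original mask afterwards.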
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 54aa87260534f..e78157d611586 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4257,6 +4257,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val JSON_EXACT_STRING_PARSING =
+    buildConf("spark.sql.json.enableExactStringParsing")
+      .internal()
+      .doc("When set to true, string columns extracted from JSON objects will be extracted " +
+        "exactly as they appear in the input string, with no changes")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
     buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
       .internal()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index c17a25be8e2ae..3d0eedd2f689c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3865,6 +3865,64 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-48148: values are unchanged when read as string") {
+    withTempPath { path =>
+      def extractData(
+          jsonString: String,
+          expectedInexactData: Seq[String],
+          expectedExactData: Seq[String],
+          multiLine: Boolean = false): Unit = {
+        Seq(jsonString).toDF()
+          .repartition(1)
+          .write
+          .mode("overwrite")
+          .text(path.getAbsolutePath)
+
+        withClue("Exact string parsing") {
+          withSQLConf(SQLConf.JSON_EXACT_STRING_PARSING.key -> "true") {
+            val df = spark.read
+              .schema("data STRING")
+              .option("multiLine", multiLine.toString)
+              .json(path.getAbsolutePath)
+            checkAnswer(df, expectedExactData.map(d => Row(d)))
+          }
+        }
+
+        withClue("Inexact string parsing") {
+          withSQLConf(SQLConf.JSON_EXACT_STRING_PARSING.key -> "false") {
+            val df = spark.read
+              .schema("data STRING")
+              .option("multiLine", multiLine.toString)
+              .json(path.getAbsolutePath)
+            checkAnswer(df, expectedInexactData.map(d => Row(d)))
+          }
+        }
+      }
+      extractData(
+        """{"data": {"white": "space"}}""",
+        expectedInexactData = Seq("""{"white":"space"}"""),
+        expectedExactData = Seq("""{"white": "space"}""")
+      )
+      extractData(
+        """{"data": ["white", "space"]}""",
+        expectedInexactData = Seq("""["white","space"]"""),
+        expectedExactData = Seq("""["white", "space"]""")
+      )
+      val granularFloat = "-999.99999999999999999999999999999999995"
+      extractData(
+        s"""{"data": {"v": ${granularFloat}}}""",
+        expectedInexactData = Seq("""{"v":-1000.0}"""),
+        expectedExactData = Seq(s"""{"v": ${granularFloat}}""")
+      )
+      extractData(
+        s"""{"data": {"white":\n"space"}}""",
+        expectedInexactData = Seq("""{"white":"space"}"""),
+        expectedExactData = Seq(s"""{"white":\n"space"}"""),
+        multiLine = true
+      )
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
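
For reviewers, a short spark-shell sketch of the behavior difference the new test pins down; this assumes a build with this patch applied, and the /tmp path is illustrative only.

import spark.implicits._

Seq("""{"data": {"white": "space"}}""").toDF()
  .write.mode("overwrite").text("/tmp/spark-48148-demo")

spark.conf.set("spark.sql.json.enableExactStringParsing", "true")
spark.read.schema("data STRING").json("/tmp/spark-48148-demo").show(false)
// |{"white": "space"}|   <- raw input bytes passed through unchanged

spark.conf.set("spark.sql.json.enableExactStringParsing", "false")
spark.read.schema("data STRING").json("/tmp/spark-48148-demo").show(false)
// |{"white":"space"}|    <- re-serialized by Jackson; the original whitespace is dropped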